浏览代码

Add support for dulwich merge-base command

Jelmer Vernooij 3 月之前
父节点
当前提交
983db89c0f
共有 7 个文件被更改,包括 530 次插入和 0 次删除
  1. 2 0
      NEWS
  2. 67 0
      dulwich/cli.py
  3. 35 0
      dulwich/graph.py
  4. 116 0
      dulwich/porcelain.py
  5. 82 0
      tests/test_cli.py
  6. 54 0
      tests/test_graph.py
  7. 174 0
      tests/test_porcelain.py

+ 2 - 0
NEWS

@@ -1,5 +1,7 @@
 0.24.3	UNRELEASED
 
+ * Add ``dulwich merge-base`` command. (Jelmer Vernooij, #1831)
+
  * Add support for ``git var`` command to display Git's logical variables
    (GIT_AUTHOR_IDENT, GIT_COMMITTER_IDENT, GIT_EDITOR, GIT_SEQUENCE_EDITOR,
    GIT_PAGER, GIT_DEFAULT_BRANCH). Available as ``porcelain.var()`` and

+ 67 - 0
dulwich/cli.py

@@ -2953,6 +2953,72 @@ class cmd_merge(Command):
             return 1
 
 
+class cmd_merge_base(Command):
+    """Implement ``dulwich merge-base``: report common ancestors of commits."""
+
+    def run(self, args: Sequence[str]) -> Optional[int]:
+        """Parse ``args`` and run the selected merge-base mode.
+
+        ``--is-ancestor`` is checked before ``--independent``; when neither
+        flag is given the merge base(s) of the commits are printed, one id
+        per line. All modes operate on the repository in the current
+        directory.
+
+        Args:
+            args: Command line arguments
+
+        Returns:
+            0 on success (for ``--is-ancestor``: the first commit is an
+            ancestor of the second); 1 when the ancestry check fails, when
+            too few/many commits were given, or when a ``ValueError`` or
+            ``KeyError`` was raised while running the command.
+        """
+        parser = argparse.ArgumentParser(
+            prog="dulwich merge-base",
+            description="Find the best common ancestor between commits",
+        )
+        parser.add_argument("commits", nargs="+", help="Commits to find merge base for")
+        # The boolean switches only differ in name and help text; register
+        # them in the same order as they should appear in --help.
+        switches = (
+            ("--all", "Output all merge bases"),
+            ("--octopus", "Compute common ancestor of all commits"),
+            ("--is-ancestor", "Check if first commit is ancestor of second"),
+            ("--independent", "List commits not reachable from others"),
+        )
+        for flag, help_text in switches:
+            parser.add_argument(flag, action="store_true", help=help_text)
+        opts = parser.parse_args(args)
+        revs = opts.commits
+
+        try:
+            if opts.is_ancestor:
+                if len(revs) != 2:
+                    logger.error("--is-ancestor requires exactly two commits")
+                    return 1
+                # The answer is conveyed through the exit status only.
+                if porcelain.is_ancestor(".", ancestor=revs[0], descendant=revs[1]):
+                    return 0
+                return 1
+
+            if opts.independent:
+                found = porcelain.independent_commits(".", revs)
+            else:
+                if len(revs) < 2:
+                    logger.error("At least two commits are required")
+                    return 1
+                found = porcelain.merge_base(
+                    ".",
+                    revs,
+                    all=opts.all,
+                    octopus=opts.octopus,
+                )
+
+            for sha in found:
+                print(sha.decode())
+            return 0
+        except (ValueError, KeyError) as e:
+            logger.error("%s", e)
+            return 1
+
+
 class cmd_notes_add(Command):
     """Add notes to a commit."""
 
@@ -4613,6 +4679,7 @@ commands = {
     "ls-remote": cmd_ls_remote,
     "ls-tree": cmd_ls_tree,
     "merge": cmd_merge,
+    "merge-base": cmd_merge_base,
     "merge-tree": cmd_merge_tree,
     "notes": cmd_notes,
     "pack-objects": cmd_pack_objects,

+ 35 - 0
dulwich/graph.py

@@ -350,3 +350,38 @@ def can_fast_forward(repo: "BaseRepo", c1: bytes, c2: bytes) -> bool:
         shallows=parents_provider.shallows,
     )
     return lcas == [c1]
+
+
+def independent(repo: "BaseRepo", commit_ids: Sequence[ObjectID]) -> list[ObjectID]:
+    """Filter commits to only those that are not reachable from others.
+
+    Duplicate ids in ``commit_ids`` are collapsed to their first occurrence
+    before filtering, so each surviving commit appears exactly once and the
+    result keeps the order in which commits were first listed.
+
+    Args:
+      repo: Repository object
+      commit_ids: list of commit ids to filter
+
+    Returns:
+      list of commit ids that are not ancestors of any other commits in the list
+    """
+    # Collapse duplicates (dict preserves insertion order). Without this a
+    # commit listed twice would be compared against its own copy; the merge
+    # base of a commit with itself is presumably that commit, which the check
+    # below would read as "ancestor of another commit" and drop every copy.
+    candidates = list(dict.fromkeys(commit_ids))
+    if len(candidates) < 2:
+        # Nothing to compare against: zero or one distinct commit.
+        return candidates
+
+    def _is_ancestor_of_another(commit_id: ObjectID) -> bool:
+        # If the merge base of (commit_id, other_id) is commit_id itself,
+        # then commit_id is an ancestor of other_id. any() stops at the
+        # first such other commit.
+        return any(
+            find_merge_base(repo, [commit_id, other_id]) == [commit_id]
+            for other_id in candidates
+            if other_id != commit_id
+        )
+
+    return [
+        commit_id
+        for commit_id in candidates
+        if not _is_ancestor_of_another(commit_id)
+    ]

+ 116 - 0
dulwich/porcelain.py

@@ -7167,3 +7167,119 @@ def worktree_repair(
 
     with open_repo_closing(repo) as r:
         return repair_worktree(r, paths=paths)
+
+
+def merge_base(
+    repo: RepoPath = ".",
+    committishes: Optional[Sequence[Union[str, bytes]]] = None,
+    all: bool = False,
+    octopus: bool = False,
+) -> list[bytes]:
+    """Compute the merge base(s) of a set of commits.
+
+    Args:
+        repo: Path to repository
+        committishes: Commit references to resolve; at least two are needed
+        all: When true, return every merge base found instead of only the first
+        octopus: When true, use the octopus (n-way) base search instead of the
+            regular merge-base search
+
+    Returns:
+        List of commit IDs that are merge bases (possibly empty)
+
+    Raises:
+        ValueError: If fewer than two committishes are given, or one of them
+            resolves to an object that is not a commit
+    """
+    from .graph import find_merge_base, find_octopus_base
+    from .objects import Commit
+    from .objectspec import parse_object
+
+    if committishes is None or len(committishes) < 2:
+        raise ValueError("At least two commits are required")
+
+    # Pick the search routine up front; both take (repo, commit_ids).
+    search = find_octopus_base if octopus else find_merge_base
+
+    with open_repo_closing(repo) as r:
+
+        def _commit_id(spec: Union[str, bytes]) -> bytes:
+            # Resolve one committish and insist that it names a commit.
+            obj = parse_object(r, spec)
+            if not isinstance(obj, Commit):
+                raise ValueError(f"Expected commit, got {obj.type_name.decode()}")
+            return obj.id
+
+        bases = search(r, [_commit_id(spec) for spec in committishes])
+
+        # Hand back everything when asked for all bases or when there is
+        # nothing to trim; otherwise keep only the first base.
+        if all or not bases:
+            return bases
+        return [bases[0]]
+
+
+def is_ancestor(
+    repo: RepoPath = ".",
+    ancestor: Optional[Union[str, bytes]] = None,
+    descendant: Optional[Union[str, bytes]] = None,
+) -> bool:
+    """Tell whether ``ancestor`` is an ancestor of ``descendant``.
+
+    Args:
+        repo: Path to repository
+        ancestor: Reference to the commit that might be the ancestor
+        descendant: Reference to the commit that might be the descendant
+
+    Returns:
+        True if the merge base of the two commits is exactly ``ancestor``,
+        False otherwise
+
+    Raises:
+        ValueError: If either argument is missing, or resolves to an object
+            that is not a commit
+    """
+    from .graph import find_merge_base
+    from .objects import Commit
+    from .objectspec import parse_object
+
+    if ancestor is None or descendant is None:
+        raise ValueError("Both ancestor and descendant are required")
+
+    with open_repo_closing(repo) as r:
+
+        def _commit_id(spec: Union[str, bytes]) -> bytes:
+            # Resolve one committish and insist that it names a commit.
+            obj = parse_object(r, spec)
+            if not isinstance(obj, Commit):
+                raise ValueError(f"Expected commit, got {obj.type_name.decode()}")
+            return obj.id
+
+        # Resolve (and validate) the ancestor before the descendant.
+        older_id = _commit_id(ancestor)
+        newer_id = _commit_id(descendant)
+
+        # The ancestor relation holds when the merge base of the pair is
+        # the would-be ancestor itself.
+        return find_merge_base(r, [older_id, newer_id]) == [older_id]
+
+
+def independent_commits(
+    repo: RepoPath = ".",
+    committishes: Optional[Sequence[Union[str, bytes]]] = None,
+) -> list[bytes]:
+    """Resolve commit references and drop those that are ancestors of others.
+
+    Args:
+        repo: Path to repository
+        committishes: Commit references to filter
+
+    Returns:
+        List of commit IDs that are not ancestors of any other commits in the
+        list; an empty list when no committishes are given
+
+    Raises:
+        ValueError: If a committish resolves to an object that is not a commit
+    """
+    from .graph import independent
+    from .objects import Commit
+    from .objectspec import parse_object
+
+    # No input means no output; the repository is not even opened.
+    if committishes is None:
+        return []
+    if len(committishes) == 0:
+        return []
+
+    with open_repo_closing(repo) as r:
+
+        def _commit_id(spec: Union[str, bytes]) -> bytes:
+            # Resolve one committish and insist that it names a commit.
+            obj = parse_object(r, spec)
+            if not isinstance(obj, Commit):
+                raise ValueError(f"Expected commit, got {obj.type_name.decode()}")
+            return obj.id
+
+        resolved = [_commit_id(spec) for spec in committishes]
+        return independent(r, resolved)

+ 82 - 0
tests/test_cli.py

@@ -3015,5 +3015,87 @@ class WorktreeCliTests(DulwichCliTestCase):
                 cmd.run(["invalid"])
 
 
+class MergeBaseCommandTest(DulwichCliTestCase):
+    """Tests for merge-base command."""
+
+    def _create_commits(self):
+        """Create three commits in a linear history, one new file per commit."""
+        for i in range(1, 4):
+            test_file = os.path.join(self.repo_path, f"file{i}.txt")
+            with open(test_file, "w") as f:
+                f.write(f"content{i}")
+            self._run_cli("add", f"file{i}.txt")
+            self._run_cli("commit", f"--message=Commit {i}")
+
+    def test_merge_base_linear_history(self):
+        """Test merge-base of HEAD and HEAD~1 prints the id of HEAD~1."""
+        self._create_commits()
+
+        result, stdout, _stderr = self._run_cli("merge-base", "HEAD", "HEAD~1")
+        self.assertEqual(result, 0)
+
+        # Resolve HEAD~1 directly from the repository so the test pins the
+        # actual merge base, not merely the shape of the output.
+        head = self.repo.refs[b"HEAD"]
+        head_1 = self.repo[head].parents[0]
+        self.assertEqual(stdout.strip(), head_1.decode())
+
+    def test_merge_base_is_ancestor_true(self):
+        """Test merge-base --is-ancestor when true."""
+        self._create_commits()
+
+        result, _stdout, _stderr = self._run_cli(
+            "merge-base", "--is-ancestor", "HEAD~1", "HEAD"
+        )
+        self.assertEqual(result, 0)  # Exit code 0 means true
+
+    def test_merge_base_is_ancestor_false(self):
+        """Test merge-base --is-ancestor when false."""
+        self._create_commits()
+
+        result, _stdout, _stderr = self._run_cli(
+            "merge-base", "--is-ancestor", "HEAD", "HEAD~1"
+        )
+        self.assertEqual(result, 1)  # Exit code 1 means false
+
+    def test_merge_base_independent(self):
+        """Test merge-base --independent."""
+        self._create_commits()
+
+        # All three commits in linear history - only HEAD should be independent
+        head = self.repo.refs[b"HEAD"]
+        head_1 = self.repo[head].parents[0]
+        head_2 = self.repo[head_1].parents[0]
+
+        result, stdout, _stderr = self._run_cli(
+            "merge-base",
+            "--independent",
+            head.decode(),
+            head_1.decode(),
+            head_2.decode(),
+        )
+        self.assertEqual(result, 0)
+
+        # Only HEAD should be in output (as it's the only independent commit)
+        lines = stdout.strip().split("\n")
+        self.assertEqual(len(lines), 1)
+        self.assertEqual(lines[0], head.decode())
+
+    def test_merge_base_requires_two_commits(self):
+        """Test merge-base requires at least two commits."""
+        self._create_commits()
+
+        result, _stdout, _stderr = self._run_cli("merge-base", "HEAD")
+        self.assertEqual(result, 1)
+
+    def test_merge_base_is_ancestor_requires_two_commits(self):
+        """Test merge-base --is-ancestor requires exactly two commits."""
+        self._create_commits()
+
+        result, _stdout, _stderr = self._run_cli("merge-base", "--is-ancestor", "HEAD")
+        self.assertEqual(result, 1)
+
+
 if __name__ == "__main__":
     unittest.main()

+ 54 - 0
tests/test_graph.py

@@ -26,6 +26,7 @@ from dulwich.graph import (
     can_fast_forward,
     find_merge_base,
     find_octopus_base,
+    independent,
 )
 from dulwich.repo import MemoryRepo
 from dulwich.tests.utils import make_commit
@@ -546,3 +547,56 @@ class WorkListTest(TestCase):
         # get should raise IndexError when heap is empty
         with self.assertRaises(IndexError):
             wlst.get()
+
+
+class IndependentTests(TestCase):
+    """Exercise ``independent`` on small in-memory commit graphs."""
+
+    @staticmethod
+    def _repo_with(*commits) -> MemoryRepo:
+        """Return a fresh MemoryRepo whose object store holds ``commits``."""
+        repo = MemoryRepo()
+        repo.object_store.add_objects([(commit, None) for commit in commits])
+        return repo
+
+    def test_independent_empty(self) -> None:
+        # No commits in, no commits out.
+        self.assertEqual([], independent(MemoryRepo(), []))
+
+    def test_independent_single(self) -> None:
+        only = make_commit()
+        repo = self._repo_with(only)
+        # A lone commit has nothing to be an ancestor of.
+        self.assertEqual([only.id], independent(repo, [only.id]))
+
+    def test_independent_linear(self) -> None:
+        root = make_commit()
+        middle = make_commit(parents=[root.id])
+        tip = make_commit(parents=[middle.id])
+        repo = self._repo_with(root, middle, tip)
+        # On a single line of history only the newest commit survives.
+        self.assertEqual([tip.id], independent(repo, [root.id, middle.id, tip.id]))
+
+    def test_independent_diverged(self) -> None:
+        root = make_commit()
+        fork = make_commit(parents=[root.id])
+        left = make_commit(parents=[fork.id], message=b"2a")
+        right = make_commit(parents=[fork.id], message=b"2b")
+        repo = self._repo_with(root, fork, left, right)
+        # Siblings are not ancestors of each other, so both are kept.
+        kept = independent(repo, [left.id, right.id])
+        self.assertEqual(2, len(kept))
+        for expected in (left.id, right.id):
+            self.assertIn(expected, kept)
+
+    def test_independent_mixed(self) -> None:
+        root = make_commit()
+        fork = make_commit(parents=[root.id])
+        left = make_commit(parents=[fork.id], message=b"2a")
+        right = make_commit(parents=[fork.id], message=b"2b")
+        left_tip = make_commit(parents=[left.id], message=b"3a")
+        repo = self._repo_with(root, fork, left, right, left_tip)
+        # left is an ancestor of left_tip and must be dropped; right and
+        # left_tip are unrelated to each other and must both be kept.
+        kept = independent(repo, [left.id, right.id, left_tip.id])
+        self.assertEqual(2, len(kept))
+        for expected in (right.id, left_tip.id):
+            self.assertIn(expected, kept)
+        self.assertNotIn(left.id, kept)

+ 174 - 0
tests/test_porcelain.py

@@ -9867,3 +9867,177 @@ class VarTests(PorcelainTestCase):
         """Test requesting an unknown variable."""
         with self.assertRaises(KeyError):
             porcelain.var(self.repo_path, variable="UNKNOWN_VARIABLE")
+
+
+class MergeBaseTests(PorcelainTestCase):
+    """Cover porcelain merge_base, is_ancestor and independent_commits."""
+
+    def test_merge_base_linear_history(self):
+        """On one line of history the older commit is the merge base."""
+        # History: root <- middle <- tip
+        _root, middle, tip = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 2]]
+        )
+        self.repo.refs[b"refs/heads/branch1"] = middle.id
+        self.repo.refs[b"refs/heads/branch2"] = tip.id
+
+        bases = porcelain.merge_base(
+            self.repo.path, committishes=["refs/heads/branch1", "refs/heads/branch2"]
+        )
+        self.assertEqual([middle.id], bases)
+
+    def test_merge_base_diverged(self):
+        """Two branches forked from one commit share that commit as base."""
+        # History: root <- left, root <- right
+        root, left, right = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 1]]
+        )
+        self.repo.refs[b"refs/heads/branch-a"] = left.id
+        self.repo.refs[b"refs/heads/branch-b"] = right.id
+
+        bases = porcelain.merge_base(
+            self.repo.path, committishes=["refs/heads/branch-a", "refs/heads/branch-b"]
+        )
+        self.assertEqual([root.id], bases)
+
+    def test_merge_base_all(self):
+        """all=False yields a single base, all=True yields every base."""
+        # Two merge commits that both have the same two parents, giving
+        # two equally good common ancestors.
+        _root, left, right, merge_a, merge_b = build_commit_graph(
+            self.repo.object_store,
+            [[1], [2, 1], [3, 1], [4, 2, 3], [5, 2, 3]],
+        )
+        self.repo.refs[b"refs/heads/branch1"] = merge_a.id
+        self.repo.refs[b"refs/heads/branch2"] = merge_b.id
+        branches = ["refs/heads/branch1", "refs/heads/branch2"]
+
+        # Default mode: only one base is reported.
+        single = porcelain.merge_base(
+            self.repo.path,
+            committishes=branches,
+            all=False,
+        )
+        self.assertEqual(1, len(single))
+
+        # all=True: both parents are reported.
+        every = porcelain.merge_base(
+            self.repo.path,
+            committishes=branches,
+            all=True,
+        )
+        self.assertEqual(2, len(every))
+        for expected in (left.id, right.id):
+            self.assertIn(expected, every)
+
+    def test_merge_base_octopus(self):
+        """octopus=True finds the base shared by three diverged branches."""
+        # History: root with three direct children.
+        root, first, second, third = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 1], [4, 1]]
+        )
+        self.repo.refs[b"refs/heads/a"] = first.id
+        self.repo.refs[b"refs/heads/b"] = second.id
+        self.repo.refs[b"refs/heads/c"] = third.id
+
+        bases = porcelain.merge_base(
+            self.repo.path,
+            committishes=["refs/heads/a", "refs/heads/b", "refs/heads/c"],
+            octopus=True,
+        )
+        self.assertEqual([root.id], bases)
+
+    def test_merge_base_requires_two_commits(self):
+        """A single committish is rejected with ValueError."""
+        with self.assertRaises(ValueError):
+            porcelain.merge_base(self.repo.path, committishes=["HEAD"])
+
+    def test_is_ancestor_true(self):
+        """Every earlier commit on a line of history is an ancestor of its tip."""
+        # History: root <- middle <- tip
+        root, middle, tip = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 2]]
+        )
+        self.repo.refs[b"refs/heads/main"] = tip.id
+
+        # Check root first, then middle, against the tip.
+        for older in (root, middle):
+            answer = porcelain.is_ancestor(
+                self.repo.path, ancestor=older.id.decode(), descendant=tip.id.decode()
+            )
+            self.assertTrue(answer)
+
+    def test_is_ancestor_false(self):
+        """Sibling commits are not ancestors of each other, in either order."""
+        # History: root <- left, root <- right
+        _root, left, right = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 1]]
+        )
+
+        for older, newer in ((left, right), (right, left)):
+            answer = porcelain.is_ancestor(
+                self.repo.path, ancestor=older.id.decode(), descendant=newer.id.decode()
+            )
+            self.assertFalse(answer)
+
+    def test_is_ancestor_requires_both(self):
+        """Leaving out either commit raises ValueError."""
+        incomplete = (
+            {"ancestor": "HEAD", "descendant": None},
+            {"ancestor": None, "descendant": "HEAD"},
+        )
+        for kwargs in incomplete:
+            with self.assertRaises(ValueError):
+                porcelain.is_ancestor(self.repo.path, **kwargs)
+
+    def test_independent_commits_linear(self):
+        """On one line of history only the tip is independent."""
+        # History: root <- middle <- tip
+        root, middle, tip = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 2]]
+        )
+
+        kept = porcelain.independent_commits(
+            self.repo.path,
+            committishes=[root.id.decode(), middle.id.decode(), tip.id.decode()],
+        )
+        self.assertEqual([tip.id], kept)
+
+    def test_independent_commits_diverged(self):
+        """Sibling commits are both independent."""
+        # History: root <- left, root <- right
+        _root, left, right = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 1]]
+        )
+
+        kept = porcelain.independent_commits(
+            self.repo.path, committishes=[left.id.decode(), right.id.decode()]
+        )
+        self.assertEqual(2, len(kept))
+        for expected in (left.id, right.id):
+            self.assertIn(expected, kept)
+
+    def test_independent_commits_mixed(self):
+        """An ancestor is dropped while unrelated commits are kept."""
+        # History: root <- left <- left_tip, root <- right
+        _root, left, right, left_tip = build_commit_graph(
+            self.repo.object_store, [[1], [2, 1], [3, 1], [4, 2]]
+        )
+
+        kept = porcelain.independent_commits(
+            self.repo.path,
+            committishes=[left.id.decode(), right.id.decode(), left_tip.id.decode()],
+        )
+        self.assertEqual(2, len(kept))
+        for expected in (right.id, left_tip.id):
+            self.assertIn(expected, kept)
+        # left is an ancestor of left_tip, so it must not survive.
+        self.assertNotIn(left.id, kept)
+
+    def test_independent_commits_empty(self):
+        """An empty committish list yields an empty result."""
+        kept = porcelain.independent_commits(self.repo.path, committishes=[])
+        self.assertEqual([], kept)