Browse Source

Add merge-tree plumbing command

This implements the merge-tree plumbing command in dulwich.porcelain and CLI,
similar to git merge-tree. The command performs three-way tree merges without
touching the working directory or creating commits.

The implementation leverages the existing Merger class from dulwich.merge
and uses parse_tree from objectspec to resolve tree-ish arguments.
Jelmer Vernooij 1 tháng trước cách đây
mục cha
commit
fa028bc327
5 tập tin đã thay đổi với 309 bổ sung6 xóa
  1. 5 0
      NEWS
  2. 67 3
      dulwich/cli.py
  3. 39 0
      dulwich/porcelain.py
  4. 2 3
      tests/test_cli_merge.py
  5. 196 0
      tests/test_porcelain_merge.py

+ 5 - 0
NEWS

@@ -6,6 +6,11 @@
    Available in both ``dulwich.porcelain.unpack_objects()`` and as a CLI
    command ``dulwich unpack-objects``. (Jelmer Vernooij)
 
+ * Add ``merge-tree`` plumbing command to ``dulwich.porcelain`` and CLI.
+   This command performs three-way tree merges without touching the working
+   directory or creating commits, similar to ``git merge-tree``. It outputs
+   the merged tree SHA and lists any conflicted paths. (Jelmer Vernooij)
+
  * Add support for pack index format version 3. This format supports variable
    hash sizes to enable future SHA-256 support. The implementation includes
    reading and writing v3 indexes with proper hash algorithm identification

+ 67 - 3
dulwich/cli.py

@@ -874,7 +874,7 @@ class cmd_describe(Command):
 
 
 class cmd_merge(Command):
-    def run(self, args) -> None:
+    def run(self, args) -> Optional[int]:
         parser = argparse.ArgumentParser()
         parser.add_argument("commit", type=str, help="Commit to merge")
         parser.add_argument(
@@ -902,7 +902,7 @@ class cmd_merge(Command):
                 print(
                     "\nAutomatic merge failed; fix conflicts and then commit the result."
                 )
-                sys.exit(1)
+                return 1
             elif merge_commit_id is None and not args.no_commit:
                 print("Already up to date.")
             elif args.no_commit:
@@ -911,9 +911,72 @@ class cmd_merge(Command):
                 print(
                     f"Merge successful. Created merge commit {merge_commit_id.decode()}"
                 )
+            return None
         except porcelain.Error as e:
             print(f"Error: {e}")
-            sys.exit(1)
+            return 1
+
+
+class cmd_merge_tree(Command):
+    def run(self, args) -> Optional[int]:
+        parser = argparse.ArgumentParser(
+            description="Perform a tree-level merge without touching the working directory"
+        )
+        parser.add_argument(
+            "base_tree",
+            nargs="?",
+            help="The common ancestor tree (optional, defaults to empty tree)",
+        )
+        parser.add_argument("our_tree", help="Our side of the merge")
+        parser.add_argument("their_tree", help="Their side of the merge")
+        parser.add_argument(
+            "-z",
+            "--name-only",
+            action="store_true",
+            help="Output only conflict paths, null-terminated",
+        )
+        args = parser.parse_args(args)
+
+        try:
+            # Determine base tree - if only two args provided, base is None
+            if args.base_tree is None:
+                # Only two arguments provided
+                base_tree = None
+                our_tree = args.our_tree
+                their_tree = args.their_tree
+            else:
+                # Three arguments provided
+                base_tree = args.base_tree
+                our_tree = args.our_tree
+                their_tree = args.their_tree
+
+            merged_tree_id, conflicts = porcelain.merge_tree(
+                ".", base_tree, our_tree, their_tree
+            )
+
+            if args.name_only:
+                # Output only conflict paths, null-terminated
+                for conflict_path in conflicts:
+                    sys.stdout.buffer.write(conflict_path)
+                    sys.stdout.buffer.write(b"\0")
+            else:
+                # Output the merged tree SHA
+                print(merged_tree_id.decode("ascii"))
+
+                # Output conflict information
+                if conflicts:
+                    print(f"\nConflicts in {len(conflicts)} file(s):")
+                    for conflict_path in conflicts:
+                        print(f"  {conflict_path.decode()}")
+
+            return None
+
+        except porcelain.Error as e:
+            print(f"Error: {e}", file=sys.stderr)
+            return 1
+        except KeyError as e:
+            print(f"Error: Object not found: {e}", file=sys.stderr)
+            return 1
 
 
 class cmd_help(Command):
@@ -969,6 +1032,7 @@ commands = {
     "ls-remote": cmd_ls_remote,
     "ls-tree": cmd_ls_tree,
     "merge": cmd_merge,
+    "merge-tree": cmd_merge_tree,
     "pack-objects": cmd_pack_objects,
     "pack-refs": cmd_pack_refs,
     "pull": cmd_pull,

+ 39 - 0
dulwich/porcelain.py

@@ -42,6 +42,7 @@ Currently implemented:
  * ls_remote
  * ls_tree
  * merge
+ * merge_tree
  * pull
  * push
  * rm
@@ -2854,3 +2855,41 @@ def unpack_objects(pack_path, target="."):
                 r.object_store.add_object(obj)
                 count += 1
             return count
+
+
+def merge_tree(repo, base_tree, our_tree, their_tree):
+    """Perform a three-way tree merge without touching the working directory.
+
+    This is similar to git merge-tree, performing a merge at the tree level
+    without creating commits or updating any references.
+
+    Args:
+      repo: Repository containing the trees
+      base_tree: Tree-ish of the common ancestor (or None for no common ancestor)
+      our_tree: Tree-ish of our side of the merge
+      their_tree: Tree-ish of their side of the merge
+
+    Returns:
+      Tuple of (merged_tree_id, conflicts) where:
+        - merged_tree_id is the SHA-1 of the merged tree
+        - conflicts is a list of paths (as bytes) that had conflicts
+
+    Raises:
+      KeyError: If any of the tree-ish arguments cannot be resolved
+    """
+    from .merge import Merger
+
+    with open_repo_closing(repo) as r:
+        # Resolve tree-ish arguments to actual trees
+        base = parse_tree(r, base_tree) if base_tree else None
+        ours = parse_tree(r, our_tree)
+        theirs = parse_tree(r, their_tree)
+
+        # Perform the merge
+        merger = Merger(r.object_store)
+        merged_tree, conflicts = merger.merge_trees(base, ours, theirs)
+
+        # Add the merged tree to the object store
+        r.object_store.add_object(merged_tree)
+
+        return merged_tree.id, conflicts

+ 2 - 3
tests/test_cli_merge.py

@@ -109,9 +109,8 @@ class CLIMergeTests(TestCase):
             try:
                 os.chdir(tmpdir)
                 with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                    with patch("sys.exit") as mock_exit:
-                        main(["merge", "feature"])
-                        mock_exit.assert_called_with(1)
+                    exit_code = main(["merge", "feature"])
+                    self.assertEqual(1, exit_code)
                     output = mock_stdout.getvalue()
 
                 self.assertIn("Merge conflicts", output)

+ 196 - 0
tests/test_porcelain_merge.py

@@ -274,5 +274,201 @@ class PorcelainMergeTests(TestCase):
             self.assertRaises(porcelain.Error, porcelain.merge, tmpdir, "nonexistent")
 
 
+class PorcelainMergeTreeTests(TestCase):
+    """Tests for the porcelain merge_tree functionality."""
+
+    def test_merge_tree_no_conflicts(self):
+        """Test merge_tree with no conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+            repo = Repo(tmpdir)
+
+            # Create base tree
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Base content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            base_commit = porcelain.commit(tmpdir, message=b"Base commit")
+
+            # Create our branch
+            porcelain.branch_create(tmpdir, "ours")
+            porcelain.checkout_branch(tmpdir, "ours")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Our content\n")
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Our new file\n")
+            porcelain.add(tmpdir, paths=["file1.txt", "file2.txt"])
+            our_commit = porcelain.commit(tmpdir, message=b"Our commit")
+
+            # Create their branch
+            porcelain.checkout_branch(tmpdir, b"master")
+            porcelain.branch_create(tmpdir, "theirs")
+            porcelain.checkout_branch(tmpdir, "theirs")
+            with open(os.path.join(tmpdir, "file3.txt"), "w") as f:
+                f.write("Their new file\n")
+            porcelain.add(tmpdir, paths=["file3.txt"])
+            their_commit = porcelain.commit(tmpdir, message=b"Their commit")
+
+            # Perform merge_tree
+            merged_tree_id, conflicts = porcelain.merge_tree(
+                tmpdir, base_commit, our_commit, their_commit
+            )
+
+            # Should have no conflicts
+            self.assertEqual(conflicts, [])
+
+            # Check merged tree contains all files
+            merged_tree = repo[merged_tree_id]
+            self.assertIn(b"file1.txt", merged_tree)
+            self.assertIn(b"file2.txt", merged_tree)
+            self.assertIn(b"file3.txt", merged_tree)
+
+    def test_merge_tree_with_conflicts(self):
+        """Test merge_tree with conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+            repo = Repo(tmpdir)
+
+            # Create base tree
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Base content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            base_commit = porcelain.commit(tmpdir, message=b"Base commit")
+
+            # Create our branch with changes
+            porcelain.branch_create(tmpdir, "ours")
+            porcelain.checkout_branch(tmpdir, "ours")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Our content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            our_commit = porcelain.commit(tmpdir, message=b"Our commit")
+
+            # Create their branch with conflicting changes
+            porcelain.checkout_branch(tmpdir, b"master")
+            porcelain.branch_create(tmpdir, "theirs")
+            porcelain.checkout_branch(tmpdir, "theirs")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Their content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            their_commit = porcelain.commit(tmpdir, message=b"Their commit")
+
+            # Perform merge_tree
+            merged_tree_id, conflicts = porcelain.merge_tree(
+                tmpdir, base_commit, our_commit, their_commit
+            )
+
+            # Should have conflicts
+            self.assertEqual(conflicts, [b"file1.txt"])
+
+            # Check merged tree exists and contains conflict markers
+            merged_tree = repo[merged_tree_id]
+            self.assertIn(b"file1.txt", merged_tree)
+
+            # Get the merged blob content
+            file_mode, file_sha = merged_tree[b"file1.txt"]
+            merged_blob = repo[file_sha]
+            content = merged_blob.data
+
+            # Should contain conflict markers
+            self.assertIn(b"<<<<<<< ours", content)
+            self.assertIn(b"=======", content)
+            self.assertIn(b">>>>>>> theirs", content)
+
+    def test_merge_tree_no_base(self):
+        """Test merge_tree without a base commit."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+            repo = Repo(tmpdir)
+
+            # Create our tree
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Our content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            our_commit = porcelain.commit(tmpdir, message=b"Our commit")
+
+            # Create their tree (independent)
+            os.remove(os.path.join(tmpdir, "file1.txt"))
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Their content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            their_commit = porcelain.commit(tmpdir, message=b"Their commit")
+
+            # Perform merge_tree without base
+            merged_tree_id, conflicts = porcelain.merge_tree(
+                tmpdir, None, our_commit, their_commit
+            )
+
+            # Should have no conflicts (different files)
+            self.assertEqual(conflicts, [])
+
+            # Check merged tree contains both files
+            merged_tree = repo[merged_tree_id]
+            self.assertIn(b"file1.txt", merged_tree)
+            self.assertIn(b"file2.txt", merged_tree)
+
+    def test_merge_tree_with_tree_objects(self):
+        """Test merge_tree with tree objects instead of commits."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+            repo = Repo(tmpdir)
+
+            # Create base tree
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Base content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            base_commit_id = porcelain.commit(tmpdir, message=b"Base commit")
+            base_tree_id = repo[base_commit_id].tree
+
+            # Create our tree
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Our content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            our_commit_id = porcelain.commit(tmpdir, message=b"Our commit")
+            our_tree_id = repo[our_commit_id].tree
+
+            # Create their tree
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Their content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            their_commit_id = porcelain.commit(tmpdir, message=b"Their commit")
+            their_tree_id = repo[their_commit_id].tree
+
+            # Perform merge_tree with tree SHAs
+            merged_tree_id, conflicts = porcelain.merge_tree(
+                tmpdir,
+                base_tree_id if base_tree_id else None,
+                our_tree_id,
+                their_tree_id,
+            )
+
+            # Should have conflicts
+            self.assertEqual(conflicts, [b"file1.txt"])
+
+    def test_merge_tree_invalid_object(self):
+        """Test merge_tree with invalid object reference."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create a commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            commit_id = porcelain.commit(tmpdir, message=b"Commit")
+
+            # Try to merge with nonexistent object
+            self.assertRaises(
+                KeyError,
+                porcelain.merge_tree,
+                tmpdir,
+                None,
+                commit_id,
+                "0" * 40,  # Invalid SHA
+            )
+
+
 if __name__ == "__main__":
     unittest.main()