Răsfoiți Sursa

Add basic merge command

Jelmer Vernooij 2 săptămâni în urmă
părinte
comite
5c5095b375
7 a modificat fișierele cu 827 adăugiri și 63 ștergeri
  1. 2 0
      NEWS
  2. 44 0
      dulwich/cli.py
  3. 34 16
      dulwich/merge.py
  4. 128 0
      dulwich/porcelain.py
  5. 277 0
      tests/test_cli_merge.py
  6. 61 47
      tests/test_merge.py
  7. 281 0
      tests/test_porcelain_merge.py

+ 2 - 0
NEWS

@@ -22,6 +22,8 @@
  * Add type hint for ``dulwich.client.get_ssh_vendor``.
    (Jelmer Vernooij, #1471)
 
+ * Add basic merge command. (Jelmer Vernooij)
+
 0.22.8	2025-03-02
 
  * Allow passing in plain strings to ``dulwich.porcelain.tag_create``

+ 44 - 0
dulwich/cli.py

@@ -835,6 +835,49 @@ class cmd_describe(Command):
         print(porcelain.describe("."))
 
 
+class cmd_merge(Command):
+    def run(self, args) -> None:
+        parser = argparse.ArgumentParser()
+        parser.add_argument("commit", type=str, help="Commit to merge")
+        parser.add_argument(
+            "--no-commit", action="store_true", help="Do not create a merge commit"
+        )
+        parser.add_argument(
+            "--no-ff", action="store_true", help="Force create a merge commit"
+        )
+        parser.add_argument("-m", "--message", type=str, help="Merge commit message")
+        args = parser.parse_args(args)
+
+        try:
+            merge_commit_id, conflicts = porcelain.merge(
+                ".",
+                args.commit,
+                no_commit=args.no_commit,
+                no_ff=args.no_ff,
+                message=args.message,
+            )
+
+            if conflicts:
+                print(f"Merge conflicts in {len(conflicts)} file(s):")
+                for conflict_path in conflicts:
+                    print(f"  {conflict_path.decode()}")
+                print(
+                    "\nAutomatic merge failed; fix conflicts and then commit the result."
+                )
+                sys.exit(1)
+            elif merge_commit_id is None:
+                print("Already up to date.")
+            elif args.no_commit:
+                print("Automatic merge successful; not committing as requested.")
+            else:
+                print(
+                    f"Merge successful. Created merge commit {merge_commit_id.decode()}"
+                )
+        except porcelain.Error as e:
+            print(f"Error: {e}")
+            sys.exit(1)
+
+
 class cmd_help(Command):
     def run(self, args) -> None:
         parser = optparse.OptionParser()
@@ -888,6 +931,7 @@ commands = {
     "ls-files": cmd_ls_files,
     "ls-remote": cmd_ls_remote,
     "ls-tree": cmd_ls_tree,
+    "merge": cmd_merge,
     "pack-objects": cmd_pack_objects,
     "pack-refs": cmd_pack_refs,
     "pull": cmd_pull,

+ 34 - 16
dulwich/merge.py

@@ -47,8 +47,10 @@ class Merger:
             Tuple of (merged_content, had_conflicts)
         """
         if merge3 is None:
-            raise ImportError("merge3 is required for merging. Install with: pip install dulwich[merge]")
-            
+            raise ImportError(
+                "merge3 is required for merging. Install with: pip install dulwich[merge]"
+            )
+
         # Handle deletion cases
         if ours_blob is None and theirs_blob is None:
             return b"", False
@@ -123,7 +125,7 @@ class Merger:
         # Check for conflicts and generate merged content
         merged_content = self._merge3_to_bytes(m)
         has_conflicts = b"<<<<<<< ours" in merged_content
-        
+
         return merged_content, has_conflicts
 
     def _merge3_to_bytes(self, m: merge3.Merge3) -> bytes:
@@ -141,14 +143,14 @@ class Merger:
                 result.extend(group[1])
             elif group[0] == "a":
                 result.extend(group[1])
-            elif group[0] == "b": 
+            elif group[0] == "b":
                 result.extend(group[1])
             elif group[0] == "same":
                 result.extend(group[1])
             elif group[0] == "conflict":
                 # Check if this is a real conflict or just different changes
                 base_lines, a_lines, b_lines = group[1], group[2], group[3]
-                
+
                 # Try to merge line by line
                 if self._can_merge_lines(base_lines, a_lines, b_lines):
                     merged_lines = self._merge_lines(base_lines, a_lines, b_lines)
@@ -162,8 +164,10 @@ class Merger:
                     result.append(b">>>>>>> theirs\n")
 
         return b"".join(result)
-        
-    def _can_merge_lines(self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]) -> bool:
+
+    def _can_merge_lines(
+        self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
+    ) -> bool:
         """Check if lines can be merged without conflict."""
         # If one side is unchanged, we can take the other side
         if base_lines == a_lines:
@@ -174,8 +178,10 @@ class Merger:
             # For now, treat any difference as a conflict
             # A more sophisticated algorithm would check for non-overlapping changes
             return False
-        
-    def _merge_lines(self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]) -> list[bytes]:
+
+    def _merge_lines(
+        self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
+    ) -> list[bytes]:
         """Merge lines when possible."""
         if base_lines == a_lines:
             return b_lines
@@ -219,17 +225,21 @@ class Merger:
             base_entry = None
             if base_tree:
                 try:
-                    base_entry = base_tree.lookup_path(self.object_store.__getitem__, path)
+                    base_entry = base_tree.lookup_path(
+                        self.object_store.__getitem__, path
+                    )
                 except KeyError:
                     pass
-            
+
             try:
                 ours_entry = ours_tree.lookup_path(self.object_store.__getitem__, path)
             except KeyError:
                 ours_entry = None
-                
+
             try:
-                theirs_entry = theirs_tree.lookup_path(self.object_store.__getitem__, path)
+                theirs_entry = theirs_tree.lookup_path(
+                    self.object_store.__getitem__, path
+                )
             except KeyError:
                 theirs_entry = None
 
@@ -313,9 +323,17 @@ class Merger:
                     merged_entries[path] = (ours_mode, ours_sha)
                 else:
                     # Try to merge blobs
-                    base_blob = cast(Blob, self.object_store[base_sha]) if base_sha else None
-                    ours_blob = cast(Blob, self.object_store[ours_sha]) if ours_sha else None
-                    theirs_blob = cast(Blob, self.object_store[theirs_sha]) if theirs_sha else None
+                    base_blob = (
+                        cast(Blob, self.object_store[base_sha]) if base_sha else None
+                    )
+                    ours_blob = (
+                        cast(Blob, self.object_store[ours_sha]) if ours_sha else None
+                    )
+                    theirs_blob = (
+                        cast(Blob, self.object_store[theirs_sha])
+                        if theirs_sha
+                        else None
+                    )
 
                     merged_content, had_conflict = self.merge_blobs(
                         base_blob, ours_blob, theirs_blob

+ 128 - 0
dulwich/porcelain.py

@@ -40,6 +40,7 @@ Currently implemented:
  * ls_files
  * ls_remote
  * ls_tree
+ * merge
  * pull
  * push
  * rm
@@ -2444,3 +2445,130 @@ def write_tree(repo):
     """
     with open_repo_closing(repo) as r:
         return r.open_index().commit(r.object_store)
+
+
+def merge(
+    repo,
+    committish,
+    no_commit=False,
+    no_ff=False,
+    message=None,
+    author=None,
+    committer=None,
+):
+    """Merge a commit into the current branch.
+
+    Args:
+      repo: Repository to merge into
+      committish: Commit to merge
+      no_commit: If True, do not create a merge commit
+      no_ff: If True, force creation of a merge commit
+      message: Optional merge commit message
+      author: Optional author for merge commit
+      committer: Optional committer for merge commit
+
+    Returns:
+      Tuple of (merge_commit_sha, conflicts) where merge_commit_sha is None
+      if no_commit=True or there were conflicts
+
+    Raises:
+      Error: If there is no HEAD reference or commit cannot be found
+    """
+    from .graph import find_merge_base
+    from .index import build_index_from_tree
+    from .merge import three_way_merge
+
+    with open_repo_closing(repo) as r:
+        # Get HEAD commit
+        try:
+            head_commit_id = r.refs[b"HEAD"]
+        except KeyError:
+            raise Error("No HEAD reference found")
+
+        # Parse the commit to merge
+        try:
+            merge_commit_id = parse_commit(r, committish)
+        except KeyError:
+            raise Error(f"Cannot find commit '{committish}'")
+
+        head_commit = r[head_commit_id]
+        merge_commit = r[merge_commit_id]
+
+        # Check if fast-forward is possible
+        merge_bases = find_merge_base(r, [head_commit_id, merge_commit_id])
+
+        if not merge_bases:
+            raise Error("No common ancestor found")
+
+        # Use the first merge base
+        base_commit_id = merge_bases[0]
+
+        # Check for fast-forward
+        if base_commit_id == head_commit_id and not no_ff:
+            # Fast-forward merge
+            r.refs[b"HEAD"] = merge_commit_id
+            # Update the working directory
+            index = r.open_index()
+            tree = r[merge_commit.tree]
+            build_index_from_tree(r.path, index, r.object_store, tree.id)
+            index.write()
+            return (merge_commit_id, [])
+
+        if base_commit_id == merge_commit_id:
+            # Already up to date
+            return (None, [])
+
+        # Perform three-way merge
+        base_commit = r[base_commit_id]
+        merged_tree, conflicts = three_way_merge(
+            r.object_store, base_commit, head_commit, merge_commit
+        )
+
+        # Add merged tree to object store
+        r.object_store.add_object(merged_tree)
+
+        # Update index and working directory
+        index = r.open_index()
+        build_index_from_tree(r.path, index, r.object_store, merged_tree.id)
+        index.write()
+
+        if conflicts or no_commit:
+            # Don't create a commit if there are conflicts or no_commit is True
+            return (None, conflicts)
+
+        # Create merge commit
+        merge_commit_obj = Commit()
+        merge_commit_obj.tree = merged_tree.id
+        merge_commit_obj.parents = [head_commit_id, merge_commit_id]
+
+        # Set author/committer
+        if author is None:
+            author = get_user_identity(r.get_config_stack())
+        if committer is None:
+            committer = author
+
+        merge_commit_obj.author = author
+        merge_commit_obj.committer = committer
+
+        # Set timestamps
+        timestamp = int(time())
+        timezone = 0  # UTC
+        merge_commit_obj.author_time = timestamp
+        merge_commit_obj.author_timezone = timezone
+        merge_commit_obj.commit_time = timestamp
+        merge_commit_obj.commit_timezone = timezone
+
+        # Set commit message
+        if message is None:
+            message = f"Merge commit '{merge_commit_id.decode()[:7]}'\n"
+        merge_commit_obj.message = (
+            message.encode() if isinstance(message, str) else message
+        )
+
+        # Add commit to object store
+        r.object_store.add_object(merge_commit_obj)
+
+        # Update HEAD
+        r.refs[b"HEAD"] = merge_commit_obj.id
+
+        return (merge_commit_obj.id, [])

+ 277 - 0
tests/test_cli_merge.py

@@ -0,0 +1,277 @@
+# test_cli_merge.py -- Tests for dulwich merge CLI command
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for dulwich merge CLI command."""
+
+import io
+import os
+import tempfile
+import unittest
+from unittest.mock import patch
+
+from dulwich import porcelain
+from dulwich.cli import main
+from dulwich.tests import TestCase
+
+
+class CLIMergeTests(TestCase):
+    """Tests for the dulwich merge CLI command."""
+
+    def test_merge_fast_forward(self):
+        """Test CLI merge with fast-forward."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Test merge via CLI
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Merge successful", output)
+
+                # Check that file2.txt exists
+                self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_with_conflicts(self):
+        """Test CLI merge with conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch and modify file1
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(
+                tmpdir, message=b"Modify file1 in feature"
+            )
+
+            # Go back to master and modify file1 differently
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file1 in master")
+
+            # Test merge via CLI - should exit with error
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    with patch("sys.exit") as mock_exit:
+                        main(["merge", "feature"])
+                        mock_exit.assert_called_with(1)
+                    output = mock_stdout.getvalue()
+
+                self.assertIn("Merge conflicts", output)
+                self.assertIn("file1.txt", output)
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_already_up_to_date(self):
+        """Test CLI merge when already up to date."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Test merge via CLI
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "HEAD"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Already up to date", output)
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_no_commit(self):
+        """Test CLI merge with --no-commit."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master and add another file
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file3.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file3.txt"])
+            porcelain.commit(tmpdir, message=b"Add file3")
+
+            # Test merge via CLI with --no-commit
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "--no-commit", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("not committing", output)
+
+                # Check that files are merged
+                self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+                self.assertTrue(os.path.exists(os.path.join(tmpdir, "file3.txt")))
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_no_ff(self):
+        """Test CLI merge with --no-ff."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Test merge via CLI with --no-ff
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "--no-ff", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Merge successful", output)
+                self.assertIn("Created merge commit", output)
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_with_message(self):
+        """Test CLI merge with custom message."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master and add another file
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file3.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file3.txt"])
+            porcelain.commit(tmpdir, message=b"Add file3")
+
+            # Test merge via CLI with custom message
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "-m", "Custom merge message", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Merge successful", output)
+            finally:
+                os.chdir(old_cwd)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 61 - 47
tests/test_merge.py

@@ -18,20 +18,22 @@ class MergeTests(unittest.TestCase):
         """Test merging blobs without conflicts."""
         # Create base blob
         base_blob = Blob.from_string(b"line1\nline2\nline3\n")
-        
+
         # Create modified versions - currently our algorithm treats changes to different line groups as conflicts
         # This is a simple implementation - Git's merge is more sophisticated
         ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
         theirs_blob = Blob.from_string(b"line1\nline2\nmodified line3\n")
-        
+
         # Add blobs to object store
         self.repo.object_store.add_object(base_blob)
         self.repo.object_store.add_object(ours_blob)
         self.repo.object_store.add_object(theirs_blob)
-        
+
         # Merge - this will result in a conflict with our simple algorithm
-        result, has_conflicts = self.merger.merge_blobs(base_blob, ours_blob, theirs_blob)
-        
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, ours_blob, theirs_blob
+        )
+
         # For now, expect conflicts since both sides changed (even different lines)
         self.assertTrue(has_conflicts)
         self.assertIn(b"<<<<<<< ours", result)
@@ -41,18 +43,20 @@ class MergeTests(unittest.TestCase):
         """Test merging blobs with a clean merge (one side unchanged)."""
         # Create base blob
         base_blob = Blob.from_string(b"line1\nline2\nline3\n")
-        
+
         # Only ours modifies
         ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
         theirs_blob = base_blob  # unchanged
-        
+
         # Add blobs to object store
         self.repo.object_store.add_object(base_blob)
         self.repo.object_store.add_object(ours_blob)
-        
+
         # Merge
-        result, has_conflicts = self.merger.merge_blobs(base_blob, ours_blob, theirs_blob)
-        
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, ours_blob, theirs_blob
+        )
+
         self.assertFalse(has_conflicts)
         self.assertEqual(result, b"line1\nmodified line2\nline3\n")
 
@@ -60,19 +64,21 @@ class MergeTests(unittest.TestCase):
         """Test merging blobs with conflicts."""
         # Create base blob
         base_blob = Blob.from_string(b"line1\nline2\nline3\n")
-        
+
         # Create conflicting modifications
         ours_blob = Blob.from_string(b"line1\nours line2\nline3\n")
         theirs_blob = Blob.from_string(b"line1\ntheirs line2\nline3\n")
-        
+
         # Add blobs to object store
         self.repo.object_store.add_object(base_blob)
         self.repo.object_store.add_object(ours_blob)
         self.repo.object_store.add_object(theirs_blob)
-        
+
         # Merge
-        result, has_conflicts = self.merger.merge_blobs(base_blob, ours_blob, theirs_blob)
-        
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, ours_blob, theirs_blob
+        )
+
         self.assertTrue(has_conflicts)
         self.assertIn(b"<<<<<<< ours", result)
         self.assertIn(b"=======", result)
@@ -82,9 +88,9 @@ class MergeTests(unittest.TestCase):
         """Test merging identical blobs."""
         blob = Blob.from_string(b"same content\n")
         self.repo.object_store.add_object(blob)
-        
+
         result, has_conflicts = self.merger.merge_blobs(blob, blob, blob)
-        
+
         self.assertFalse(has_conflicts)
         self.assertEqual(result, b"same content\n")
 
@@ -92,17 +98,21 @@ class MergeTests(unittest.TestCase):
         """Test merging when one side is unchanged."""
         base_blob = Blob.from_string(b"original\n")
         modified_blob = Blob.from_string(b"modified\n")
-        
+
         self.repo.object_store.add_object(base_blob)
         self.repo.object_store.add_object(modified_blob)
-        
+
         # Test ours unchanged, theirs modified
-        result, has_conflicts = self.merger.merge_blobs(base_blob, base_blob, modified_blob)
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, base_blob, modified_blob
+        )
         self.assertFalse(has_conflicts)
         self.assertEqual(result, b"modified\n")
-        
+
         # Test theirs unchanged, ours modified
-        result, has_conflicts = self.merger.merge_blobs(base_blob, modified_blob, base_blob)
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, modified_blob, base_blob
+        )
         self.assertFalse(has_conflicts)
         self.assertEqual(result, b"modified\n")
 
@@ -110,12 +120,12 @@ class MergeTests(unittest.TestCase):
         """Test merging with deletion where no conflict occurs."""
         base_blob = Blob.from_string(b"content\n")
         self.repo.object_store.add_object(base_blob)
-        
+
         # Both delete
         result, has_conflicts = self.merger.merge_blobs(base_blob, None, None)
         self.assertFalse(has_conflicts)
         self.assertEqual(result, b"")
-        
+
         # One deletes, other unchanged
         result, has_conflicts = self.merger.merge_blobs(base_blob, None, base_blob)
         self.assertFalse(has_conflicts)
@@ -125,10 +135,10 @@ class MergeTests(unittest.TestCase):
         """Test merging with deletion that causes conflict."""
         base_blob = Blob.from_string(b"content\n")
         modified_blob = Blob.from_string(b"modified content\n")
-        
+
         self.repo.object_store.add_object(base_blob)
         self.repo.object_store.add_object(modified_blob)
-        
+
         # We delete, they modify
         result, has_conflicts = self.merger.merge_blobs(base_blob, None, modified_blob)
         self.assertTrue(has_conflicts)
@@ -137,14 +147,14 @@ class MergeTests(unittest.TestCase):
         """Test merging blobs with no common ancestor."""
         blob1 = Blob.from_string(b"content1\n")
         blob2 = Blob.from_string(b"content2\n")
-        
+
         self.repo.object_store.add_object(blob1)
         self.repo.object_store.add_object(blob2)
-        
+
         # Different content added in both - conflict
         result, has_conflicts = self.merger.merge_blobs(None, blob1, blob2)
         self.assertTrue(has_conflicts)
-        
+
         # Same content added in both - no conflict
         result, has_conflicts = self.merger.merge_blobs(None, blob1, blob1)
         self.assertFalse(has_conflicts)
@@ -161,7 +171,7 @@ class MergeTests(unittest.TestCase):
         base_tree.add(b"file1.txt", 0o100644, blob1.id)
         base_tree.add(b"file2.txt", 0o100644, blob2.id)
         self.repo.object_store.add_object(base_tree)
-        
+
         # Create ours tree (modify file1)
         ours_tree = Tree()
         ours_blob1 = Blob.from_string(b"file1 modified by ours\n")
@@ -169,7 +179,7 @@ class MergeTests(unittest.TestCase):
         ours_tree.add(b"file1.txt", 0o100644, ours_blob1.id)
         ours_tree.add(b"file2.txt", 0o100644, blob2.id)
         self.repo.object_store.add_object(ours_tree)
-        
+
         # Create theirs tree (modify file2)
         theirs_tree = Tree()
         theirs_blob2 = Blob.from_string(b"file2 modified by theirs\n")
@@ -177,10 +187,12 @@ class MergeTests(unittest.TestCase):
         theirs_tree.add(b"file1.txt", 0o100644, blob1.id)
         theirs_tree.add(b"file2.txt", 0o100644, theirs_blob2.id)
         self.repo.object_store.add_object(theirs_tree)
-        
+
         # Merge
-        merged_tree, conflicts = self.merger.merge_trees(base_tree, ours_tree, theirs_tree)
-        
+        merged_tree, conflicts = self.merger.merge_trees(
+            base_tree, ours_tree, theirs_tree
+        )
+
         self.assertEqual(len(conflicts), 0)
         self.assertIn(b"file1.txt", [item.path for item in merged_tree.items()])
         self.assertIn(b"file2.txt", [item.path for item in merged_tree.items()])
@@ -193,24 +205,26 @@ class MergeTests(unittest.TestCase):
         self.repo.object_store.add_object(blob1)
         base_tree.add(b"conflict.txt", 0o100644, blob1.id)
         self.repo.object_store.add_object(base_tree)
-        
+
         # Create ours tree
         ours_tree = Tree()
         ours_blob = Blob.from_string(b"ours content\n")
         self.repo.object_store.add_object(ours_blob)
         ours_tree.add(b"conflict.txt", 0o100644, ours_blob.id)
         self.repo.object_store.add_object(ours_tree)
-        
+
         # Create theirs tree
         theirs_tree = Tree()
         theirs_blob = Blob.from_string(b"theirs content\n")
         self.repo.object_store.add_object(theirs_blob)
         theirs_tree.add(b"conflict.txt", 0o100644, theirs_blob.id)
         self.repo.object_store.add_object(theirs_tree)
-        
+
         # Merge
-        merged_tree, conflicts = self.merger.merge_trees(base_tree, ours_tree, theirs_tree)
-        
+        merged_tree, conflicts = self.merger.merge_trees(
+            base_tree, ours_tree, theirs_tree
+        )
+
         self.assertEqual(len(conflicts), 1)
         self.assertEqual(conflicts[0], b"conflict.txt")
 
@@ -222,7 +236,7 @@ class MergeTests(unittest.TestCase):
         self.repo.object_store.add_object(blob)
         base_tree.add(b"file.txt", 0o100644, blob.id)
         self.repo.object_store.add_object(base_tree)
-        
+
         base_commit = Commit()
         base_commit.tree = base_tree.id
         base_commit.author = b"Test Author <test@example.com>"
@@ -231,14 +245,14 @@ class MergeTests(unittest.TestCase):
         base_commit.commit_time = base_commit.author_time = 12345
         base_commit.commit_timezone = base_commit.author_timezone = 0
         self.repo.object_store.add_object(base_commit)
-        
+
         # Create ours commit
         ours_tree = Tree()
         ours_blob = Blob.from_string(b"ours content\n")
         self.repo.object_store.add_object(ours_blob)
         ours_tree.add(b"file.txt", 0o100644, ours_blob.id)
         self.repo.object_store.add_object(ours_tree)
-        
+
         ours_commit = Commit()
         ours_commit.tree = ours_tree.id
         ours_commit.parents = [base_commit.id]
@@ -248,14 +262,14 @@ class MergeTests(unittest.TestCase):
         ours_commit.commit_time = ours_commit.author_time = 12346
         ours_commit.commit_timezone = ours_commit.author_timezone = 0
         self.repo.object_store.add_object(ours_commit)
-        
+
         # Create theirs commit
         theirs_tree = Tree()
         theirs_blob = Blob.from_string(b"theirs content\n")
         self.repo.object_store.add_object(theirs_blob)
         theirs_tree.add(b"file.txt", 0o100644, theirs_blob.id)
         self.repo.object_store.add_object(theirs_tree)
-        
+
         theirs_commit = Commit()
         theirs_commit.tree = theirs_tree.id
         theirs_commit.parents = [base_commit.id]
@@ -265,12 +279,12 @@ class MergeTests(unittest.TestCase):
         theirs_commit.commit_time = theirs_commit.author_time = 12347
         theirs_commit.commit_timezone = theirs_commit.author_timezone = 0
         self.repo.object_store.add_object(theirs_commit)
-        
+
         # Perform three-way merge
         merged_tree, conflicts = three_way_merge(
             self.repo.object_store, base_commit, ours_commit, theirs_commit
         )
-        
+
         # Should have conflict since both modified the same file differently
         self.assertEqual(len(conflicts), 1)
         self.assertEqual(conflicts[0], b"file.txt")
@@ -280,4 +294,4 @@ class MergeTests(unittest.TestCase):
         exc = MergeConflict(b"test/path", "test message")
         self.assertEqual(exc.path, b"test/path")
         self.assertIn("test/path", str(exc))
-        self.assertIn("test message", str(exc))
+        self.assertIn("test message", str(exc))

+ 281 - 0
tests/test_porcelain_merge.py

@@ -0,0 +1,281 @@
+# test_porcelain_merge.py -- Tests for porcelain merge functionality
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain merge functionality."""
+
+import os
+import tempfile
+import unittest
+
+from dulwich import porcelain
+from dulwich.repo import Repo
+from dulwich.tests import TestCase
+
+
+class PorcelainMergeTests(TestCase):
+    """Tests for the porcelain merge functionality."""
+
+    def test_merge_fast_forward(self):
+        """Test fast-forward merge."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            feature_commit = porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Merge feature branch (should fast-forward)
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature")
+
+            self.assertEqual(merge_commit, feature_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check that file2.txt exists
+            self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+
+    def test_merge_already_up_to_date(self):
+        """Test merge when already up to date."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Try to merge the same commit
+            merge_commit, conflicts = porcelain.merge(tmpdir, "HEAD")
+
+            self.assertIsNone(merge_commit)
+            self.assertEqual(conflicts, [])
+
+    def test_merge_no_ff(self):
+        """Test merge with --no-ff flag."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            feature_commit = porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Merge feature branch with no-ff
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature", no_ff=True)
+
+            # Should create a new merge commit
+            self.assertIsNotNone(merge_commit)
+            self.assertNotEqual(merge_commit, feature_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check that it's a merge commit with two parents
+            with Repo(tmpdir) as repo:
+                commit = repo[merge_commit]
+                self.assertEqual(len(commit.parents), 2)
+
+    def test_merge_three_way(self):
+        """Test three-way merge without conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Initial file2\n")
+            porcelain.add(tmpdir, paths=["file1.txt", "file2.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch and modify file1
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(
+                tmpdir, message=b"Modify file1 in feature"
+            )
+
+            # Go back to master and modify file2
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Master file2\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file2 in master")
+
+            # Merge feature branch
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature")
+
+            self.assertIsNotNone(merge_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check both modifications are present
+            with open(os.path.join(tmpdir, "file1.txt")) as f:
+                self.assertEqual(f.read(), "Feature content\n")
+            with open(os.path.join(tmpdir, "file2.txt")) as f:
+                self.assertEqual(f.read(), "Master file2\n")
+
+    def test_merge_with_conflicts(self):
+        """Test merge with conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch and modify file1
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(
+                tmpdir, message=b"Modify file1 in feature"
+            )
+
+            # Go back to master and modify file1 differently
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file1 in master")
+
+            # Merge feature branch - should have conflicts
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature")
+
+            self.assertIsNone(merge_commit)
+            self.assertEqual(len(conflicts), 1)
+            self.assertEqual(conflicts[0], b"file1.txt")
+
+            # Check conflict markers in file
+            with open(os.path.join(tmpdir, "file1.txt"), "rb") as f:
+                content = f.read()
+                self.assertIn(b"<<<<<<< ours", content)
+                self.assertIn(b"=======", content)
+                self.assertIn(b">>>>>>> theirs", content)
+
+    def test_merge_no_commit(self):
+        """Test merge with no_commit flag."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master and add another file
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file3.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file3.txt"])
+            master_commit = porcelain.commit(tmpdir, message=b"Add file3")
+
+            # Merge feature branch with no_commit
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature", no_commit=True)
+
+            self.assertIsNone(merge_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check that files are merged but no commit was created
+            self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+            self.assertTrue(os.path.exists(os.path.join(tmpdir, "file3.txt")))
+
+            # HEAD should still point to master_commit
+            with Repo(tmpdir) as repo:
+                self.assertEqual(repo.refs[b"HEAD"], master_commit)
+
+    def test_merge_no_head(self):
+        """Test merge with no HEAD reference."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo without any commits
+            porcelain.init(tmpdir)
+
+            # Try to merge - should fail with no HEAD
+            self.assertRaises(porcelain.Error, porcelain.merge, tmpdir, "nonexistent")
+
+    def test_merge_invalid_commit(self):
+        """Test merge with invalid commit reference."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Try to merge nonexistent commit
+            self.assertRaises(porcelain.Error, porcelain.merge, tmpdir, "nonexistent")
+
+
+if __name__ == "__main__":
+    unittest.main()