فهرست منبع

Add merge command (#1554)

Jelmer Vernooij 2 هفته پیش
والد
کامیت
1f7c55b7df
9فایلهای تغییر یافته به همراه1409 افزوده شده و 1 حذف شده
  1. 5 0
      NEWS
  2. 44 0
      dulwich/cli.py
  3. 1 1
      dulwich/client.py
  4. 381 0
      dulwich/merge.py
  5. 128 0
      dulwich/porcelain.py
  6. 1 0
      pyproject.toml
  7. 275 0
      tests/test_cli_merge.py
  8. 297 0
      tests/test_merge.py
  9. 277 0
      tests/test_porcelain_merge.py

+ 5 - 0
NEWS

@@ -19,6 +19,11 @@
    when the subprocess hasn't terminated when closing
    the channel. (Jelmer Vernooij)
 
+ * Add type hint for ``dulwich.client.get_ssh_vendor``.
+   (Jelmer Vernooij, #1471)
+
+ * Add basic merge command. (Jelmer Vernooij)
+
 * Fix typing for ``dulwich.client`` methods that take repositories.
    (Jelmer Vernooij, #1521)
 

+ 44 - 0
dulwich/cli.py

@@ -835,6 +835,49 @@ class cmd_describe(Command):
         print(porcelain.describe("."))
 
 
+class cmd_merge(Command):
+    def run(self, args) -> None:
+        parser = argparse.ArgumentParser()
+        parser.add_argument("commit", type=str, help="Commit to merge")
+        parser.add_argument(
+            "--no-commit", action="store_true", help="Do not create a merge commit"
+        )
+        parser.add_argument(
+            "--no-ff", action="store_true", help="Force create a merge commit"
+        )
+        parser.add_argument("-m", "--message", type=str, help="Merge commit message")
+        args = parser.parse_args(args)
+
+        try:
+            merge_commit_id, conflicts = porcelain.merge(
+                ".",
+                args.commit,
+                no_commit=args.no_commit,
+                no_ff=args.no_ff,
+                message=args.message,
+            )
+
+            if conflicts:
+                print(f"Merge conflicts in {len(conflicts)} file(s):")
+                for conflict_path in conflicts:
+                    print(f"  {conflict_path.decode()}")
+                print(
+                    "\nAutomatic merge failed; fix conflicts and then commit the result."
+                )
+                sys.exit(1)
+            elif merge_commit_id is None:
+                print("Already up to date.")
+            elif args.no_commit:
+                print("Automatic merge successful; not committing as requested.")
+            else:
+                print(
+                    f"Merge successful. Created merge commit {merge_commit_id.decode()}"
+                )
+        except porcelain.Error as e:
+            print(f"Error: {e}")
+            sys.exit(1)
+
+
 class cmd_help(Command):
     def run(self, args) -> None:
         parser = optparse.OptionParser()
@@ -888,6 +931,7 @@ commands = {
     "ls-files": cmd_ls_files,
     "ls-remote": cmd_ls_remote,
     "ls-tree": cmd_ls_tree,
+    "merge": cmd_merge,
     "pack-objects": cmd_pack_objects,
     "pack-refs": cmd_pack_refs,
     "pull": cmd_pull,

+ 1 - 1
dulwich/client.py

@@ -2111,7 +2111,7 @@ def ParamikoSSHVendor(**kwargs):
 
 
 # Can be overridden by users
-get_ssh_vendor = SubprocessSSHVendor
+get_ssh_vendor: Callable[[], SSHVendor] = SubprocessSSHVendor
 
 
 class SSHGitClient(TraditionalGitClient):

+ 381 - 0
dulwich/merge.py

@@ -0,0 +1,381 @@
+"""Git merge implementation."""
+
+from typing import Optional, cast
+
+try:
+    import merge3
+except ImportError:
+    merge3 = None
+
+from dulwich.object_store import BaseObjectStore
+from dulwich.objects import S_ISGITLINK, Blob, Commit, Tree
+
+
+class MergeConflict(Exception):
+    """Raised when a merge conflict occurs."""
+
+    def __init__(self, path: bytes, message: str):
+        self.path = path
+        super().__init__(f"Merge conflict in {path!r}: {message}")
+
+
+class Merger:
+    """Handles git merge operations."""
+
+    def __init__(self, object_store: BaseObjectStore):
+        """Initialize merger.
+
+        Args:
+            object_store: Object store to read objects from
+        """
+        self.object_store = object_store
+
+    def merge_blobs(
+        self,
+        base_blob: Optional[Blob],
+        ours_blob: Optional[Blob],
+        theirs_blob: Optional[Blob],
+    ) -> tuple[bytes, bool]:
+        """Perform three-way merge on blob contents.
+
+        Args:
+            base_blob: Common ancestor blob (can be None)
+            ours_blob: Our version of the blob (can be None)
+            theirs_blob: Their version of the blob (can be None)
+
+        Returns:
+            Tuple of (merged_content, had_conflicts)
+        """
+        if merge3 is None:
+            raise ImportError(
+                "merge3 is required for merging. Install with: pip install dulwich[merge]"
+            )
+
+        # Handle deletion cases
+        if ours_blob is None and theirs_blob is None:
+            return b"", False
+
+        if base_blob is None:
+            # No common ancestor
+            if ours_blob is None:
+                assert theirs_blob is not None
+                return theirs_blob.data, False
+            elif theirs_blob is None:
+                return ours_blob.data, False
+            elif ours_blob.data == theirs_blob.data:
+                return ours_blob.data, False
+            else:
+                # Both added different content - conflict
+                m = merge3.Merge3(
+                    [],
+                    ours_blob.data.splitlines(True),
+                    theirs_blob.data.splitlines(True),
+                )
+                return self._merge3_to_bytes(m), True
+
+        # Get content for each version
+        base_content = base_blob.data if base_blob else b""
+        ours_content = ours_blob.data if ours_blob else b""
+        theirs_content = theirs_blob.data if theirs_blob else b""
+
+        # Check if either side deleted
+        if ours_blob is None or theirs_blob is None:
+            if ours_blob is None and theirs_blob is None:
+                return b"", False
+            elif ours_blob is None:
+                # We deleted, check if they modified
+                if base_content == theirs_content:
+                    return b"", False  # They didn't modify, accept deletion
+                else:
+                    # Conflict: we deleted, they modified
+                    m = merge3.Merge3(
+                        base_content.splitlines(True),
+                        [],
+                        theirs_content.splitlines(True),
+                    )
+                    return self._merge3_to_bytes(m), True
+            else:
+                # They deleted, check if we modified
+                if base_content == ours_content:
+                    return b"", False  # We didn't modify, accept deletion
+                else:
+                    # Conflict: they deleted, we modified
+                    m = merge3.Merge3(
+                        base_content.splitlines(True),
+                        ours_content.splitlines(True),
+                        [],
+                    )
+                    return self._merge3_to_bytes(m), True
+
+        # Both sides exist, check if merge is needed
+        if ours_content == theirs_content:
+            return ours_content, False
+        elif base_content == ours_content:
+            return theirs_content, False
+        elif base_content == theirs_content:
+            return ours_content, False
+
+        # Perform three-way merge
+        m = merge3.Merge3(
+            base_content.splitlines(True),
+            ours_content.splitlines(True),
+            theirs_content.splitlines(True),
+        )
+
+        # Check for conflicts and generate merged content
+        merged_content = self._merge3_to_bytes(m)
+        has_conflicts = b"<<<<<<< ours" in merged_content
+
+        return merged_content, has_conflicts
+
+    def _merge3_to_bytes(self, m: merge3.Merge3) -> bytes:
+        """Convert merge3 result to bytes with conflict markers.
+
+        Args:
+            m: Merge3 object
+
+        Returns:
+            Merged content as bytes
+        """
+        result = []
+        for group in m.merge_groups():
+            if group[0] == "unchanged":
+                result.extend(group[1])
+            elif group[0] == "a":
+                result.extend(group[1])
+            elif group[0] == "b":
+                result.extend(group[1])
+            elif group[0] == "same":
+                result.extend(group[1])
+            elif group[0] == "conflict":
+                # Check if this is a real conflict or just different changes
+                base_lines, a_lines, b_lines = group[1], group[2], group[3]
+
+                # Try to merge line by line
+                if self._can_merge_lines(base_lines, a_lines, b_lines):
+                    merged_lines = self._merge_lines(base_lines, a_lines, b_lines)
+                    result.extend(merged_lines)
+                else:
+                    # Real conflict - add conflict markers
+                    result.append(b"<<<<<<< ours\n")
+                    result.extend(a_lines)
+                    result.append(b"=======\n")
+                    result.extend(b_lines)
+                    result.append(b">>>>>>> theirs\n")
+
+        return b"".join(result)
+
+    def _can_merge_lines(
+        self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
+    ) -> bool:
+        """Check if lines can be merged without conflict."""
+        # If one side is unchanged, we can take the other side
+        if base_lines == a_lines:
+            return True
+        elif base_lines == b_lines:
+            return True
+        else:
+            # For now, treat any difference as a conflict
+            # A more sophisticated algorithm would check for non-overlapping changes
+            return False
+
+    def _merge_lines(
+        self, base_lines: list[bytes], a_lines: list[bytes], b_lines: list[bytes]
+    ) -> list[bytes]:
+        """Merge lines when possible."""
+        if base_lines == a_lines:
+            return b_lines
+        elif base_lines == b_lines:
+            return a_lines
+        else:
+            # This shouldn't happen if _can_merge_lines returned True
+            return a_lines
+
+    def merge_trees(
+        self, base_tree: Optional[Tree], ours_tree: Tree, theirs_tree: Tree
+    ) -> tuple[Tree, list[bytes]]:
+        """Perform three-way merge on trees.
+
+        Args:
+            base_tree: Common ancestor tree (can be None for no common ancestor)
+            ours_tree: Our version of the tree
+            theirs_tree: Their version of the tree
+
+        Returns:
+            tuple of (merged_tree, list_of_conflicted_paths)
+        """
+        conflicts = []
+        merged_entries = {}
+
+        # Get all paths from all trees
+        all_paths = set()
+
+        if base_tree:
+            for entry in base_tree.items():
+                all_paths.add(entry.path)
+
+        for entry in ours_tree.items():
+            all_paths.add(entry.path)
+
+        for entry in theirs_tree.items():
+            all_paths.add(entry.path)
+
+        # Process each path
+        for path in sorted(all_paths):
+            base_entry = None
+            if base_tree:
+                try:
+                    base_entry = base_tree.lookup_path(
+                        self.object_store.__getitem__, path
+                    )
+                except KeyError:
+                    pass
+
+            try:
+                ours_entry = ours_tree.lookup_path(self.object_store.__getitem__, path)
+            except KeyError:
+                ours_entry = None
+
+            try:
+                theirs_entry = theirs_tree.lookup_path(
+                    self.object_store.__getitem__, path
+                )
+            except KeyError:
+                theirs_entry = None
+
+            # Extract mode and sha
+            base_mode, base_sha = base_entry if base_entry else (None, None)
+            ours_mode, ours_sha = ours_entry if ours_entry else (None, None)
+            theirs_mode, theirs_sha = theirs_entry if theirs_entry else (None, None)
+
+            # Handle deletions
+            if ours_sha is None and theirs_sha is None:
+                continue  # Deleted in both
+
+            # Handle additions
+            if base_sha is None:
+                if ours_sha == theirs_sha and ours_mode == theirs_mode:
+                    # Same addition in both
+                    merged_entries[path] = (ours_mode, ours_sha)
+                elif ours_sha is None:
+                    # Added only in theirs
+                    merged_entries[path] = (theirs_mode, theirs_sha)
+                elif theirs_sha is None:
+                    # Added only in ours
+                    merged_entries[path] = (ours_mode, ours_sha)
+                else:
+                    # Different additions - conflict
+                    conflicts.append(path)
+                    # For now, keep ours
+                    merged_entries[path] = (ours_mode, ours_sha)
+                continue
+
+            # Check for mode conflicts
+            if (
+                ours_mode != theirs_mode
+                and ours_mode is not None
+                and theirs_mode is not None
+            ):
+                conflicts.append(path)
+                # For now, keep ours
+                merged_entries[path] = (ours_mode, ours_sha)
+                continue
+
+            # Handle modifications
+            if ours_sha == theirs_sha:
+                # Same modification or no change
+                if ours_sha is not None:
+                    merged_entries[path] = (ours_mode, ours_sha)
+            elif base_sha == ours_sha and theirs_sha is not None:
+                # Only theirs modified
+                merged_entries[path] = (theirs_mode, theirs_sha)
+            elif base_sha == theirs_sha and ours_sha is not None:
+                # Only ours modified
+                merged_entries[path] = (ours_mode, ours_sha)
+            elif ours_sha is None:
+                # We deleted
+                if base_sha == theirs_sha:
+                    # They didn't modify, accept deletion
+                    pass
+                else:
+                    # They modified, we deleted - conflict
+                    conflicts.append(path)
+            elif theirs_sha is None:
+                # They deleted
+                if base_sha == ours_sha:
+                    # We didn't modify, accept deletion
+                    pass
+                else:
+                    # We modified, they deleted - conflict
+                    conflicts.append(path)
+                    merged_entries[path] = (ours_mode, ours_sha)
+            else:
+                # Both modified differently
+                # For trees and submodules, this is a conflict
+                if S_ISGITLINK(ours_mode or 0) or S_ISGITLINK(theirs_mode or 0):
+                    conflicts.append(path)
+                    merged_entries[path] = (ours_mode, ours_sha)
+                elif (ours_mode or 0) & 0o170000 == 0o040000 or (
+                    theirs_mode or 0
+                ) & 0o170000 == 0o040000:
+                    # Tree conflict
+                    conflicts.append(path)
+                    merged_entries[path] = (ours_mode, ours_sha)
+                else:
+                    # Try to merge blobs
+                    base_blob = (
+                        cast(Blob, self.object_store[base_sha]) if base_sha else None
+                    )
+                    ours_blob = (
+                        cast(Blob, self.object_store[ours_sha]) if ours_sha else None
+                    )
+                    theirs_blob = (
+                        cast(Blob, self.object_store[theirs_sha])
+                        if theirs_sha
+                        else None
+                    )
+
+                    merged_content, had_conflict = self.merge_blobs(
+                        base_blob, ours_blob, theirs_blob
+                    )
+
+                    if had_conflict:
+                        conflicts.append(path)
+
+                    # Store merged blob
+                    merged_blob = Blob.from_string(merged_content)
+                    self.object_store.add_object(merged_blob)
+                    merged_entries[path] = (ours_mode or theirs_mode, merged_blob.id)
+
+        # Build merged tree
+        merged_tree = Tree()
+        for path, (mode, sha) in sorted(merged_entries.items()):
+            merged_tree.add(path, mode, sha)
+
+        return merged_tree, conflicts
+
+
+def three_way_merge(
+    object_store: BaseObjectStore,
+    base_commit: Optional[Commit],
+    ours_commit: Commit,
+    theirs_commit: Commit,
+) -> tuple[Tree, list[bytes]]:
+    """Perform a three-way merge between commits.
+
+    Args:
+        object_store: Object store to read/write objects
+        base_commit: Common ancestor commit (None if no common ancestor)
+        ours_commit: Our commit
+        theirs_commit: Their commit
+
+    Returns:
+        tuple of (merged_tree, list_of_conflicted_paths)
+    """
+    merger = Merger(object_store)
+
+    base_tree = cast(Tree, object_store[base_commit.tree]) if base_commit else None
+    ours_tree = cast(Tree, object_store[ours_commit.tree])
+    theirs_tree = cast(Tree, object_store[theirs_commit.tree])
+
+    return merger.merge_trees(base_tree, ours_tree, theirs_tree)

+ 128 - 0
dulwich/porcelain.py

@@ -40,6 +40,7 @@ Currently implemented:
  * ls_files
  * ls_remote
  * ls_tree
+ * merge
  * pull
  * push
  * rm
@@ -2444,3 +2445,130 @@ def write_tree(repo):
     """
     with open_repo_closing(repo) as r:
         return r.open_index().commit(r.object_store)
+
+
+def merge(
+    repo,
+    committish,
+    no_commit=False,
+    no_ff=False,
+    message=None,
+    author=None,
+    committer=None,
+):
+    """Merge a commit into the current branch.
+
+    Args:
+      repo: Repository to merge into
+      committish: Commit to merge
+      no_commit: If True, do not create a merge commit
+      no_ff: If True, force creation of a merge commit
+      message: Optional merge commit message
+      author: Optional author for merge commit
+      committer: Optional committer for merge commit
+
+    Returns:
+      Tuple of (merge_commit_sha, conflicts) where merge_commit_sha is None
+      if no_commit=True or there were conflicts
+
+    Raises:
+      Error: If there is no HEAD reference or commit cannot be found
+    """
+    from .graph import find_merge_base
+    from .index import build_index_from_tree
+    from .merge import three_way_merge
+
+    with open_repo_closing(repo) as r:
+        # Get HEAD commit
+        try:
+            head_commit_id = r.refs[b"HEAD"]
+        except KeyError:
+            raise Error("No HEAD reference found")
+
+        # Parse the commit to merge
+        try:
+            merge_commit_id = parse_commit(r, committish)
+        except KeyError:
+            raise Error(f"Cannot find commit '{committish}'")
+
+        head_commit = r[head_commit_id]
+        merge_commit = r[merge_commit_id]
+
+        # Check if fast-forward is possible
+        merge_bases = find_merge_base(r, [head_commit_id, merge_commit_id])
+
+        if not merge_bases:
+            raise Error("No common ancestor found")
+
+        # Use the first merge base
+        base_commit_id = merge_bases[0]
+
+        # Check for fast-forward
+        if base_commit_id == head_commit_id and not no_ff:
+            # Fast-forward merge
+            r.refs[b"HEAD"] = merge_commit_id
+            # Update the working directory
+            index = r.open_index()
+            tree = r[merge_commit.tree]
+            build_index_from_tree(r.path, index, r.object_store, tree.id)
+            index.write()
+            return (merge_commit_id, [])
+
+        if base_commit_id == merge_commit_id:
+            # Already up to date
+            return (None, [])
+
+        # Perform three-way merge
+        base_commit = r[base_commit_id]
+        merged_tree, conflicts = three_way_merge(
+            r.object_store, base_commit, head_commit, merge_commit
+        )
+
+        # Add merged tree to object store
+        r.object_store.add_object(merged_tree)
+
+        # Update index and working directory
+        index = r.open_index()
+        build_index_from_tree(r.path, index, r.object_store, merged_tree.id)
+        index.write()
+
+        if conflicts or no_commit:
+            # Don't create a commit if there are conflicts or no_commit is True
+            return (None, conflicts)
+
+        # Create merge commit
+        merge_commit_obj = Commit()
+        merge_commit_obj.tree = merged_tree.id
+        merge_commit_obj.parents = [head_commit_id, merge_commit_id]
+
+        # Set author/committer
+        if author is None:
+            author = get_user_identity(r.get_config_stack())
+        if committer is None:
+            committer = author
+
+        merge_commit_obj.author = author
+        merge_commit_obj.committer = committer
+
+        # Set timestamps
+        timestamp = int(time())
+        timezone = 0  # UTC
+        merge_commit_obj.author_time = timestamp
+        merge_commit_obj.author_timezone = timezone
+        merge_commit_obj.commit_time = timestamp
+        merge_commit_obj.commit_timezone = timezone
+
+        # Set commit message
+        if message is None:
+            message = f"Merge commit '{merge_commit_id.decode()[:7]}'\n"
+        merge_commit_obj.message = (
+            message.encode() if isinstance(message, str) else message
+        )
+
+        # Add commit to object store
+        r.object_store.add_object(merge_commit_obj)
+
+        # Update HEAD
+        r.refs[b"HEAD"] = merge_commit_obj.id
+
+        return (merge_commit_obj.id, [])

+ 1 - 0
pyproject.toml

@@ -44,6 +44,7 @@ dev = [
     "ruff==0.11.11",
     "mypy==1.15.0"
 ]
+merge = ["merge3"]
 
 [project.scripts]
 dulwich = "dulwich.cli:main"

+ 275 - 0
tests/test_cli_merge.py

@@ -0,0 +1,275 @@
+# test_cli_merge.py -- Tests for dulwich merge CLI command
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for dulwich merge CLI command."""
+
+import io
+import os
+import tempfile
+import unittest
+from unittest.mock import patch
+
+from dulwich import porcelain
+from dulwich.cli import main
+from dulwich.tests import TestCase
+
+
+class CLIMergeTests(TestCase):
+    """Tests for the dulwich merge CLI command."""
+
+    def test_merge_fast_forward(self):
+        """Test CLI merge with fast-forward."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Test merge via CLI
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Merge successful", output)
+
+                # Check that file2.txt exists
+                self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_with_conflicts(self):
+        """Test CLI merge with conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch and modify file1
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file1 in feature")
+
+            # Go back to master and modify file1 differently
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file1 in master")
+
+            # Test merge via CLI - should exit with error
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    with patch("sys.exit") as mock_exit:
+                        main(["merge", "feature"])
+                        mock_exit.assert_called_with(1)
+                    output = mock_stdout.getvalue()
+
+                self.assertIn("Merge conflicts", output)
+                self.assertIn("file1.txt", output)
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_already_up_to_date(self):
+        """Test CLI merge when already up to date."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Test merge via CLI
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "HEAD"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Already up to date", output)
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_no_commit(self):
+        """Test CLI merge with --no-commit."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master and add another file
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file3.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file3.txt"])
+            porcelain.commit(tmpdir, message=b"Add file3")
+
+            # Test merge via CLI with --no-commit
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "--no-commit", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("not committing", output)
+
+                # Check that files are merged
+                self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+                self.assertTrue(os.path.exists(os.path.join(tmpdir, "file3.txt")))
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_no_ff(self):
+        """Test CLI merge with --no-ff."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Test merge via CLI with --no-ff
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "--no-ff", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Merge successful", output)
+                self.assertIn("Created merge commit", output)
+            finally:
+                os.chdir(old_cwd)
+
+    def test_merge_with_message(self):
+        """Test CLI merge with custom message."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master and add another file
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file3.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file3.txt"])
+            porcelain.commit(tmpdir, message=b"Add file3")
+
+            # Test merge via CLI with custom message
+            old_cwd = os.getcwd()
+            try:
+                os.chdir(tmpdir)
+                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                    ret = main(["merge", "-m", "Custom merge message", "feature"])
+                    output = mock_stdout.getvalue()
+
+                self.assertEqual(ret, None)  # Success
+                self.assertIn("Merge successful", output)
+            finally:
+                os.chdir(old_cwd)
+
+
+if __name__ == "__main__":
+    unittest.main()

+ 297 - 0
tests/test_merge.py

@@ -0,0 +1,297 @@
+"""Tests for merge functionality."""
+
+import unittest
+
+from dulwich.merge import MergeConflict, Merger, three_way_merge
+from dulwich.objects import Blob, Commit, Tree
+from dulwich.repo import MemoryRepo
+
+
+class MergeTests(unittest.TestCase):
+    """Tests for merge functionality."""
+
+    def setUp(self):
+        self.repo = MemoryRepo()
+        self.merger = Merger(self.repo.object_store)
+
+    def test_merge_blobs_no_conflict(self):
+        """Test merging blobs without conflicts."""
+        # Create base blob
+        base_blob = Blob.from_string(b"line1\nline2\nline3\n")
+
+        # Create modified versions - currently our algorithm treats changes to different line groups as conflicts
+        # This is a simple implementation - Git's merge is more sophisticated
+        ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
+        theirs_blob = Blob.from_string(b"line1\nline2\nmodified line3\n")
+
+        # Add blobs to object store
+        self.repo.object_store.add_object(base_blob)
+        self.repo.object_store.add_object(ours_blob)
+        self.repo.object_store.add_object(theirs_blob)
+
+        # Merge - this will result in a conflict with our simple algorithm
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, ours_blob, theirs_blob
+        )
+
+        # For now, expect conflicts since both sides changed (even different lines)
+        self.assertTrue(has_conflicts)
+        self.assertIn(b"<<<<<<< ours", result)
+        self.assertIn(b">>>>>>> theirs", result)
+
+    def test_merge_blobs_clean_merge(self):
+        """Test merging blobs with a clean merge (one side unchanged)."""
+        # Create base blob
+        base_blob = Blob.from_string(b"line1\nline2\nline3\n")
+
+        # Only ours modifies
+        ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
+        theirs_blob = base_blob  # unchanged
+
+        # Add blobs to object store
+        self.repo.object_store.add_object(base_blob)
+        self.repo.object_store.add_object(ours_blob)
+
+        # Merge
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, ours_blob, theirs_blob
+        )
+
+        self.assertFalse(has_conflicts)
+        self.assertEqual(result, b"line1\nmodified line2\nline3\n")
+
+    def test_merge_blobs_with_conflict(self):
+        """Test merging blobs with conflicts."""
+        # Create base blob
+        base_blob = Blob.from_string(b"line1\nline2\nline3\n")
+
+        # Create conflicting modifications
+        ours_blob = Blob.from_string(b"line1\nours line2\nline3\n")
+        theirs_blob = Blob.from_string(b"line1\ntheirs line2\nline3\n")
+
+        # Add blobs to object store
+        self.repo.object_store.add_object(base_blob)
+        self.repo.object_store.add_object(ours_blob)
+        self.repo.object_store.add_object(theirs_blob)
+
+        # Merge
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, ours_blob, theirs_blob
+        )
+
+        self.assertTrue(has_conflicts)
+        self.assertIn(b"<<<<<<< ours", result)
+        self.assertIn(b"=======", result)
+        self.assertIn(b">>>>>>> theirs", result)
+
+    def test_merge_blobs_identical(self):
+        """Test merging identical blobs."""
+        blob = Blob.from_string(b"same content\n")
+        self.repo.object_store.add_object(blob)
+
+        result, has_conflicts = self.merger.merge_blobs(blob, blob, blob)
+
+        self.assertFalse(has_conflicts)
+        self.assertEqual(result, b"same content\n")
+
+    def test_merge_blobs_one_side_unchanged(self):
+        """Test merging when one side is unchanged."""
+        base_blob = Blob.from_string(b"original\n")
+        modified_blob = Blob.from_string(b"modified\n")
+
+        self.repo.object_store.add_object(base_blob)
+        self.repo.object_store.add_object(modified_blob)
+
+        # Test ours unchanged, theirs modified
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, base_blob, modified_blob
+        )
+        self.assertFalse(has_conflicts)
+        self.assertEqual(result, b"modified\n")
+
+        # Test theirs unchanged, ours modified
+        result, has_conflicts = self.merger.merge_blobs(
+            base_blob, modified_blob, base_blob
+        )
+        self.assertFalse(has_conflicts)
+        self.assertEqual(result, b"modified\n")
+
+    def test_merge_blobs_deletion_no_conflict(self):
+        """Test merging with deletion where no conflict occurs."""
+        base_blob = Blob.from_string(b"content\n")
+        self.repo.object_store.add_object(base_blob)
+
+        # Both delete
+        result, has_conflicts = self.merger.merge_blobs(base_blob, None, None)
+        self.assertFalse(has_conflicts)
+        self.assertEqual(result, b"")
+
+        # One deletes, other unchanged
+        result, has_conflicts = self.merger.merge_blobs(base_blob, None, base_blob)
+        self.assertFalse(has_conflicts)
+        self.assertEqual(result, b"")
+
+    def test_merge_blobs_deletion_with_conflict(self):
+        """Test merging with deletion that causes conflict."""
+        base_blob = Blob.from_string(b"content\n")
+        modified_blob = Blob.from_string(b"modified content\n")
+
+        self.repo.object_store.add_object(base_blob)
+        self.repo.object_store.add_object(modified_blob)
+
+        # We delete, they modify
+        result, has_conflicts = self.merger.merge_blobs(base_blob, None, modified_blob)
+        self.assertTrue(has_conflicts)
+
+    def test_merge_blobs_no_base(self):
+        """Test merging blobs with no common ancestor."""
+        blob1 = Blob.from_string(b"content1\n")
+        blob2 = Blob.from_string(b"content2\n")
+
+        self.repo.object_store.add_object(blob1)
+        self.repo.object_store.add_object(blob2)
+
+        # Different content added in both - conflict
+        result, has_conflicts = self.merger.merge_blobs(None, blob1, blob2)
+        self.assertTrue(has_conflicts)
+
+        # Same content added in both - no conflict
+        result, has_conflicts = self.merger.merge_blobs(None, blob1, blob1)
+        self.assertFalse(has_conflicts)
+        self.assertEqual(result, b"content1\n")
+
+    def test_merge_trees_simple(self):
+        """Test simple tree merge."""
+        # Create base tree
+        base_tree = Tree()
+        blob1 = Blob.from_string(b"file1 content\n")
+        blob2 = Blob.from_string(b"file2 content\n")
+        self.repo.object_store.add_object(blob1)
+        self.repo.object_store.add_object(blob2)
+        base_tree.add(b"file1.txt", 0o100644, blob1.id)
+        base_tree.add(b"file2.txt", 0o100644, blob2.id)
+        self.repo.object_store.add_object(base_tree)
+
+        # Create ours tree (modify file1)
+        ours_tree = Tree()
+        ours_blob1 = Blob.from_string(b"file1 modified by ours\n")
+        self.repo.object_store.add_object(ours_blob1)
+        ours_tree.add(b"file1.txt", 0o100644, ours_blob1.id)
+        ours_tree.add(b"file2.txt", 0o100644, blob2.id)
+        self.repo.object_store.add_object(ours_tree)
+
+        # Create theirs tree (modify file2)
+        theirs_tree = Tree()
+        theirs_blob2 = Blob.from_string(b"file2 modified by theirs\n")
+        self.repo.object_store.add_object(theirs_blob2)
+        theirs_tree.add(b"file1.txt", 0o100644, blob1.id)
+        theirs_tree.add(b"file2.txt", 0o100644, theirs_blob2.id)
+        self.repo.object_store.add_object(theirs_tree)
+
+        # Merge
+        merged_tree, conflicts = self.merger.merge_trees(
+            base_tree, ours_tree, theirs_tree
+        )
+
+        self.assertEqual(len(conflicts), 0)
+        self.assertIn(b"file1.txt", [item.path for item in merged_tree.items()])
+        self.assertIn(b"file2.txt", [item.path for item in merged_tree.items()])
+
+    def test_merge_trees_with_conflict(self):
+        """Test tree merge with conflicting changes."""
+        # Create base tree
+        base_tree = Tree()
+        blob1 = Blob.from_string(b"original content\n")
+        self.repo.object_store.add_object(blob1)
+        base_tree.add(b"conflict.txt", 0o100644, blob1.id)
+        self.repo.object_store.add_object(base_tree)
+
+        # Create ours tree
+        ours_tree = Tree()
+        ours_blob = Blob.from_string(b"ours content\n")
+        self.repo.object_store.add_object(ours_blob)
+        ours_tree.add(b"conflict.txt", 0o100644, ours_blob.id)
+        self.repo.object_store.add_object(ours_tree)
+
+        # Create theirs tree
+        theirs_tree = Tree()
+        theirs_blob = Blob.from_string(b"theirs content\n")
+        self.repo.object_store.add_object(theirs_blob)
+        theirs_tree.add(b"conflict.txt", 0o100644, theirs_blob.id)
+        self.repo.object_store.add_object(theirs_tree)
+
+        # Merge
+        merged_tree, conflicts = self.merger.merge_trees(
+            base_tree, ours_tree, theirs_tree
+        )
+
+        self.assertEqual(len(conflicts), 1)
+        self.assertEqual(conflicts[0], b"conflict.txt")
+
+    def test_three_way_merge(self):
+        """Test three-way merge between commits."""
+        # Create base commit
+        base_tree = Tree()
+        blob = Blob.from_string(b"base content\n")
+        self.repo.object_store.add_object(blob)
+        base_tree.add(b"file.txt", 0o100644, blob.id)
+        self.repo.object_store.add_object(base_tree)
+
+        base_commit = Commit()
+        base_commit.tree = base_tree.id
+        base_commit.author = b"Test Author <test@example.com>"
+        base_commit.committer = b"Test Author <test@example.com>"
+        base_commit.message = b"Base commit"
+        base_commit.commit_time = base_commit.author_time = 12345
+        base_commit.commit_timezone = base_commit.author_timezone = 0
+        self.repo.object_store.add_object(base_commit)
+
+        # Create ours commit
+        ours_tree = Tree()
+        ours_blob = Blob.from_string(b"ours content\n")
+        self.repo.object_store.add_object(ours_blob)
+        ours_tree.add(b"file.txt", 0o100644, ours_blob.id)
+        self.repo.object_store.add_object(ours_tree)
+
+        ours_commit = Commit()
+        ours_commit.tree = ours_tree.id
+        ours_commit.parents = [base_commit.id]
+        ours_commit.author = b"Test Author <test@example.com>"
+        ours_commit.committer = b"Test Author <test@example.com>"
+        ours_commit.message = b"Ours commit"
+        ours_commit.commit_time = ours_commit.author_time = 12346
+        ours_commit.commit_timezone = ours_commit.author_timezone = 0
+        self.repo.object_store.add_object(ours_commit)
+
+        # Create theirs commit
+        theirs_tree = Tree()
+        theirs_blob = Blob.from_string(b"theirs content\n")
+        self.repo.object_store.add_object(theirs_blob)
+        theirs_tree.add(b"file.txt", 0o100644, theirs_blob.id)
+        self.repo.object_store.add_object(theirs_tree)
+
+        theirs_commit = Commit()
+        theirs_commit.tree = theirs_tree.id
+        theirs_commit.parents = [base_commit.id]
+        theirs_commit.author = b"Test Author <test@example.com>"
+        theirs_commit.committer = b"Test Author <test@example.com>"
+        theirs_commit.message = b"Theirs commit"
+        theirs_commit.commit_time = theirs_commit.author_time = 12347
+        theirs_commit.commit_timezone = theirs_commit.author_timezone = 0
+        self.repo.object_store.add_object(theirs_commit)
+
+        # Perform three-way merge
+        merged_tree, conflicts = three_way_merge(
+            self.repo.object_store, base_commit, ours_commit, theirs_commit
+        )
+
+        # Should have conflict since both modified the same file differently
+        self.assertEqual(len(conflicts), 1)
+        self.assertEqual(conflicts[0], b"file.txt")
+
+    def test_merge_exception(self):
+        """Test MergeConflict exception."""
+        exc = MergeConflict(b"test/path", "test message")
+        self.assertEqual(exc.path, b"test/path")
+        self.assertIn("test/path", str(exc))
+        self.assertIn("test message", str(exc))

+ 277 - 0
tests/test_porcelain_merge.py

@@ -0,0 +1,277 @@
+# test_porcelain_merge.py -- Tests for porcelain merge functionality
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain merge functionality."""
+
+import os
+import tempfile
+import unittest
+
+from dulwich import porcelain
+from dulwich.repo import Repo
+from dulwich.tests import TestCase
+
+
+class PorcelainMergeTests(TestCase):
+    """Tests for the porcelain merge functionality."""
+
+    def test_merge_fast_forward(self):
+        """Test fast-forward merge."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            feature_commit = porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Merge feature branch (should fast-forward)
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature")
+
+            self.assertEqual(merge_commit, feature_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check that file2.txt exists
+            self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+
+    def test_merge_already_up_to_date(self):
+        """Test merge when already up to date."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Try to merge the same commit
+            merge_commit, conflicts = porcelain.merge(tmpdir, "HEAD")
+
+            self.assertIsNone(merge_commit)
+            self.assertEqual(conflicts, [])
+
+    def test_merge_no_ff(self):
+        """Test merge with --no-ff flag."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            feature_commit = porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master
+            porcelain.checkout_branch(tmpdir, "master")
+
+            # Merge feature branch with no-ff
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature", no_ff=True)
+
+            # Should create a new merge commit
+            self.assertIsNotNone(merge_commit)
+            self.assertNotEqual(merge_commit, feature_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check that it's a merge commit with two parents
+            with Repo(tmpdir) as repo:
+                commit = repo[merge_commit]
+                self.assertEqual(len(commit.parents), 2)
+
+    def test_merge_three_way(self):
+        """Test three-way merge without conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Initial file2\n")
+            porcelain.add(tmpdir, paths=["file1.txt", "file2.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch and modify file1
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file1 in feature")
+
+            # Go back to master and modify file2
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Master file2\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file2 in master")
+
+            # Merge feature branch
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature")
+
+            self.assertIsNotNone(merge_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check both modifications are present
+            with open(os.path.join(tmpdir, "file1.txt")) as f:
+                self.assertEqual(f.read(), "Feature content\n")
+            with open(os.path.join(tmpdir, "file2.txt")) as f:
+                self.assertEqual(f.read(), "Master file2\n")
+
+    def test_merge_with_conflicts(self):
+        """Test merge with conflicts."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch and modify file1
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file1 in feature")
+
+            # Go back to master and modify file1 differently
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Modify file1 in master")
+
+            # Merge feature branch - should have conflicts
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature")
+
+            self.assertIsNone(merge_commit)
+            self.assertEqual(len(conflicts), 1)
+            self.assertEqual(conflicts[0], b"file1.txt")
+
+            # Check conflict markers in file
+            with open(os.path.join(tmpdir, "file1.txt"), "rb") as f:
+                content = f.read()
+                self.assertIn(b"<<<<<<< ours", content)
+                self.assertIn(b"=======", content)
+                self.assertIn(b">>>>>>> theirs", content)
+
+    def test_merge_no_commit(self):
+        """Test merge with no_commit flag."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Create a branch
+            porcelain.branch_create(tmpdir, "feature")
+            porcelain.checkout_branch(tmpdir, "feature")
+
+            # Add a file on feature branch
+            with open(os.path.join(tmpdir, "file2.txt"), "w") as f:
+                f.write("Feature content\n")
+            porcelain.add(tmpdir, paths=["file2.txt"])
+            porcelain.commit(tmpdir, message=b"Add feature")
+
+            # Go back to master and add another file
+            porcelain.checkout_branch(tmpdir, "master")
+            with open(os.path.join(tmpdir, "file3.txt"), "w") as f:
+                f.write("Master content\n")
+            porcelain.add(tmpdir, paths=["file3.txt"])
+            master_commit = porcelain.commit(tmpdir, message=b"Add file3")
+
+            # Merge feature branch with no_commit
+            merge_commit, conflicts = porcelain.merge(tmpdir, "feature", no_commit=True)
+
+            self.assertIsNone(merge_commit)
+            self.assertEqual(conflicts, [])
+
+            # Check that files are merged but no commit was created
+            self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
+            self.assertTrue(os.path.exists(os.path.join(tmpdir, "file3.txt")))
+
+            # HEAD should still point to master_commit
+            with Repo(tmpdir) as repo:
+                self.assertEqual(repo.refs[b"HEAD"], master_commit)
+
+    def test_merge_no_head(self):
+        """Test merge with no HEAD reference."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo without any commits
+            porcelain.init(tmpdir)
+
+            # Try to merge - should fail with no HEAD
+            self.assertRaises(porcelain.Error, porcelain.merge, tmpdir, "nonexistent")
+
+    def test_merge_invalid_commit(self):
+        """Test merge with invalid commit reference."""
+        with tempfile.TemporaryDirectory() as tmpdir:
+            # Initialize repo
+            porcelain.init(tmpdir)
+
+            # Create initial commit
+            with open(os.path.join(tmpdir, "file1.txt"), "w") as f:
+                f.write("Initial content\n")
+            porcelain.add(tmpdir, paths=["file1.txt"])
+            porcelain.commit(tmpdir, message=b"Initial commit")
+
+            # Try to merge nonexistent commit
+            self.assertRaises(porcelain.Error, porcelain.merge, tmpdir, "nonexistent")
+
+
+if __name__ == "__main__":
+    unittest.main()