Explorar el Código

Merge branch 'master' into lfs-performance

Jelmer Vernooij hace 4 meses
padre
commit
ebbef3f851

+ 6 - 0
NEWS

@@ -40,6 +40,12 @@
    performance in repositories with many LFS-tracked files.
    (Jelmer Vernooij, #1789)
 
+ * Add support for ``patiencediff`` algorithm in diff.
+   (Jelmer Vernooij, #1795)
+
+ * Add IPv6 support for git:// protocol URLs.
+   (Jelmer Vernooij, #1796)
+
 0.24.1	2025-08-01
 
  * Require ``typing_extensions`` on Python 3.10.

La diferencia del archivo ha sido suprimido porque es demasiado grande
+ 242 - 184
dulwich/cli.py


+ 6 - 1
dulwich/client.py

@@ -1708,7 +1708,12 @@ class TCPGitClient(TraditionalGitClient):
         Returns:
           ``git://`` URL for the path
         """
-        netloc = self._host
+        # IPv6 addresses contain colons and need to be wrapped in brackets
+        if ":" in self._host:
+            netloc = f"[{self._host}]"
+        else:
+            netloc = self._host
+
         if self._port is not None and self._port != TCP_GIT_PORT:
             netloc += f":{self._port}"
         return urlunsplit(("git", netloc, path, "", ""))

+ 8 - 8
dulwich/cloud/gcs.py

@@ -80,17 +80,17 @@ class GcsObjectStore(BucketBasedObjectStore):
 
         from ..file import _GitFile
 
-        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
-        b.download_to_file(f)
-        f.seek(0)
-        return PackData(name + ".pack", cast(_GitFile, f))
+        with tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE) as f:
+            b.download_to_file(f)
+            f.seek(0)
+            return PackData(name + ".pack", cast(_GitFile, f))
 
     def _load_pack_index(self, name: str) -> PackIndex:
         b = self.bucket.blob(posixpath.join(self.subpath, name + ".idx"))
-        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
-        b.download_to_file(f)
-        f.seek(0)
-        return load_pack_index_file(name + ".idx", f)
+        with tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE) as f:
+            b.download_to_file(f)
+            f.seek(0)
+            return load_pack_index_file(name + ".idx", f)
 
     def _get_pack(self, name: str) -> Pack:
         return Pack.from_lazy_objects(  # type: ignore[no-untyped-call]

+ 10 - 1
dulwich/diff.py

@@ -78,6 +78,7 @@ def diff_index_to_tree(
     outstream: BinaryIO,
     commit_sha: Optional[bytes] = None,
     paths: Optional[list[bytes]] = None,
+    diff_algorithm: Optional[str] = None,
 ) -> None:
     """Show staged changes (index vs commit).
 
@@ -86,6 +87,7 @@ def diff_index_to_tree(
         outstream: Stream to write diff to
         commit_sha: SHA of commit to compare against, or None for HEAD
         paths: Optional list of paths to filter (as bytes)
+        diff_algorithm: Algorithm to use for diffing ("myers" or "patience"), defaults to DEFAULT_DIFF_ALGORITHM if None
     """
     if commit_sha is None:
         try:
@@ -112,6 +114,7 @@ def diff_index_to_tree(
             repo.object_store,
             (oldpath, oldmode, oldsha),
             (newpath, newmode, newsha),
+            diff_algorithm=diff_algorithm,
         )
 
 
@@ -120,6 +123,7 @@ def diff_working_tree_to_tree(
     outstream: BinaryIO,
     commit_sha: bytes,
     paths: Optional[list[bytes]] = None,
+    diff_algorithm: Optional[str] = None,
 ) -> None:
     """Compare working tree to a specific commit.
 
@@ -128,6 +132,7 @@ def diff_working_tree_to_tree(
         outstream: Stream to write diff to
         commit_sha: SHA of commit to compare against
         paths: Optional list of paths to filter (as bytes)
+        diff_algorithm: Algorithm to use for diffing ("myers" or "patience"), defaults to DEFAULT_DIFF_ALGORITHM if None
     """
     commit = repo[commit_sha]
     assert isinstance(commit, Commit)
@@ -357,7 +362,10 @@ def diff_working_tree_to_tree(
 
 
 def diff_working_tree_to_index(
-    repo: Repo, outstream: BinaryIO, paths: Optional[list[bytes]] = None
+    repo: Repo,
+    outstream: BinaryIO,
+    paths: Optional[list[bytes]] = None,
+    diff_algorithm: Optional[str] = None,
 ) -> None:
     """Compare working tree to index.
 
@@ -365,6 +373,7 @@ def diff_working_tree_to_index(
         repo: Repository object
         outstream: Stream to write diff to
         paths: Optional list of paths to filter (as bytes)
+        diff_algorithm: Algorithm to use for diffing ("myers" or "patience"), defaults to DEFAULT_DIFF_ALGORITHM if None
     """
     index = repo.open_index()
     normalizer = repo.get_blob_normalizer()

+ 12 - 6
dulwich/gc.py

@@ -1,6 +1,7 @@
 """Git garbage collection implementation."""
 
 import collections
+import logging
 import os
 import time
 from dataclasses import dataclass, field
@@ -292,10 +293,10 @@ def garbage_collect(
     if not dry_run:
         if prune and unreachable_to_prune:
             # Repack excluding unreachable objects
-            object_store.repack(exclude=unreachable_to_prune)
+            object_store.repack(exclude=unreachable_to_prune, progress=progress)
         else:
             # Normal repack
-            object_store.repack()
+            object_store.repack(progress=progress)
 
     # Prune orphaned temporary files
     if progress:
@@ -367,12 +368,15 @@ def should_run_gc(repo: "BaseRepo", config: Optional["Config"] = None) -> bool:
     return False
 
 
-def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
+def maybe_auto_gc(
+    repo: "Repo", config: Optional["Config"] = None, progress: Optional[Callable] = None
+) -> bool:
     """Run automatic garbage collection if needed.
 
     Args:
         repo: Repository to potentially GC
         config: Configuration to use (defaults to repo config)
+        progress: Optional progress reporting callback
 
     Returns:
         True if GC was run, False otherwise
@@ -383,7 +387,7 @@ def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
     # Check for gc.log file - only for disk-based repos
     if not hasattr(repo, "controldir"):
         # For non-disk repos, just run GC without gc.log handling
-        garbage_collect(repo, auto=True)
+        garbage_collect(repo, auto=True, progress=progress)
         return True
 
     gc_log_path = os.path.join(repo.controldir(), "gc.log")
@@ -409,7 +413,9 @@ def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
         if time.time() - stat_info.st_mtime < expiry_seconds:
             # gc.log exists and is not expired - skip GC
             with open(gc_log_path, "rb") as f:
-                print(f.read().decode("utf-8", errors="replace"))
+                logging.info(
+                    "gc.log content: %s", f.read().decode("utf-8", errors="replace")
+                )
             return False
 
     # TODO: Support gc.autoDetach to run in background
@@ -417,7 +423,7 @@ def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
 
     try:
         # Run GC with auto=True flag
-        garbage_collect(repo, auto=True)
+        garbage_collect(repo, auto=True, progress=progress)
 
         # Remove gc.log on successful completion
         if os.path.exists(gc_log_path):

+ 3 - 1
dulwich/lfs.py

@@ -43,6 +43,8 @@ from typing import TYPE_CHECKING, BinaryIO, Optional, Union
 from urllib.parse import urljoin, urlparse
 from urllib.request import Request, urlopen
 
+logger = logging.getLogger(__name__)
+
 if TYPE_CHECKING:
     import urllib3
 
@@ -285,7 +287,7 @@ class LFSFilterDriver:
                 return content
             except LFSError as e:
                 # Download failed, fall back to returning pointer
-                logging.warning("LFS object download failed for %s: %s", pointer.oid, e)
+                logger.warning("LFS object download failed for %s: %s", pointer.oid, e)
 
                 # Return pointer as-is when object is missing and download failed
                 return data

+ 14 - 5
dulwich/object_store.py

@@ -769,9 +769,12 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
     def _remove_pack(self, pack: "Pack") -> None:
         raise NotImplementedError(self._remove_pack)
 
-    def pack_loose_objects(self) -> int:
+    def pack_loose_objects(self, progress: Optional[Callable] = None) -> int:
         """Pack loose objects.
 
+        Args:
+          progress: Optional progress reporting callback
+
         Returns: Number of objects packed
         """
         objects: list[tuple[ShaFile, None]] = []
@@ -779,12 +782,14 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
             obj = self._get_loose_object(sha)
             if obj is not None:
                 objects.append((obj, None))
-        self.add_objects(objects)
+        self.add_objects(objects, progress=progress)
         for obj, path in objects:
             self.delete_loose_object(obj.id)
         return len(objects)
 
-    def repack(self, exclude: Optional[set] = None) -> int:
+    def repack(
+        self, exclude: Optional[set] = None, progress: Optional[Callable] = None
+    ) -> int:
         """Repack the packs in this repository.
 
         Note that this implementation is fairly naive and currently keeps all
@@ -792,6 +797,7 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
 
         Args:
           exclude: Optional set of object SHAs to exclude from repacking
+          progress: Optional progress reporting callback
         """
         if exclude is None:
             exclude = set()
@@ -818,7 +824,7 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
             # The name of the consolidated pack might match the name of a
             # pre-existing pack. Take care not to remove the newly created
             # consolidated pack.
-            consolidated = self.add_objects(list(objects))
+            consolidated = self.add_objects(list(objects), progress=progress)
             if consolidated is not None:
                 old_packs.pop(consolidated.name(), None)
 
@@ -2507,10 +2513,13 @@ class BucketBasedObjectStore(PackBasedObjectStore):
         """
         # Doesn't exist..
 
-    def pack_loose_objects(self) -> int:
+    def pack_loose_objects(self, progress: Optional[Callable] = None) -> int:
         """Pack loose objects. Returns number of objects packed.
 
         BucketBasedObjectStore doesn't support loose objects, so this is a no-op.
+
+        Args:
+          progress: Optional progress reporting callback (ignored)
         """
         return 0
 

+ 141 - 2
dulwich/patch.py

@@ -47,6 +47,30 @@ from .objects import S_ISGITLINK, Blob, Commit
 
 FIRST_FEW_BYTES = 8000
 
+DEFAULT_DIFF_ALGORITHM = "myers"
+
+
+class DiffAlgorithmNotAvailable(Exception):
+    """Raised when a requested diff algorithm is not available."""
+
+    def __init__(self, algorithm: str, install_hint: str = "") -> None:
+        """Initialize exception.
+
+        Args:
+            algorithm: Name of the unavailable algorithm
+            install_hint: Optional installation hint
+        """
+        self.algorithm = algorithm
+        self.install_hint = install_hint
+        if install_hint:
+            super().__init__(
+                f"Diff algorithm '{algorithm}' requested but not available. {install_hint}"
+            )
+        else:
+            super().__init__(
+                f"Diff algorithm '{algorithm}' requested but not available."
+            )
+
 
 def write_commit_patch(
     f: IO[bytes],
@@ -191,6 +215,107 @@ def unified_diff(
                     yield b"+" + line
 
 
+def _get_sequence_matcher(algorithm: str, a: list[bytes], b: list[bytes]):
+    """Get appropriate sequence matcher for the given algorithm.
+
+    Args:
+        algorithm: Diff algorithm ("myers" or "patience")
+        a: First sequence
+        b: Second sequence
+
+    Returns:
+        Configured sequence matcher instance
+
+    Raises:
+        DiffAlgorithmNotAvailable: If patience requested but not available
+    """
+    if algorithm == "patience":
+        try:
+            from patiencediff import PatienceSequenceMatcher
+
+            return PatienceSequenceMatcher(None, a, b)
+        except ImportError:
+            raise DiffAlgorithmNotAvailable(
+                "patience", "Install with: pip install 'dulwich[patiencediff]'"
+            )
+    else:
+        return SequenceMatcher(a=a, b=b)
+
+
+def unified_diff_with_algorithm(
+    a: list[bytes],
+    b: list[bytes],
+    fromfile: bytes = b"",
+    tofile: bytes = b"",
+    fromfiledate: str = "",
+    tofiledate: str = "",
+    n: int = 3,
+    lineterm: str = "\n",
+    tree_encoding: str = "utf-8",
+    output_encoding: str = "utf-8",
+    algorithm: Optional[str] = None,
+) -> Generator[bytes, None, None]:
+    """Generate unified diff with specified algorithm.
+
+    Args:
+        a: First sequence of lines
+        b: Second sequence of lines
+        fromfile: Name of first file
+        tofile: Name of second file
+        fromfiledate: Date of first file
+        tofiledate: Date of second file
+        n: Number of context lines
+        lineterm: Line terminator
+        tree_encoding: Encoding for tree paths
+        output_encoding: Encoding for output
+        algorithm: Diff algorithm to use ("myers" or "patience")
+
+    Returns:
+        Generator yielding diff lines
+
+    Raises:
+        DiffAlgorithmNotAvailable: If patience algorithm requested but patiencediff not available
+    """
+    if algorithm is None:
+        algorithm = DEFAULT_DIFF_ALGORITHM
+
+    matcher = _get_sequence_matcher(algorithm, a, b)
+
+    started = False
+    for group in matcher.get_grouped_opcodes(n):
+        if not started:
+            started = True
+            fromdate = f"\t{fromfiledate}" if fromfiledate else ""
+            todate = f"\t{tofiledate}" if tofiledate else ""
+            yield f"--- {fromfile.decode(tree_encoding)}{fromdate}{lineterm}".encode(
+                output_encoding
+            )
+            yield f"+++ {tofile.decode(tree_encoding)}{todate}{lineterm}".encode(
+                output_encoding
+            )
+
+        first, last = group[0], group[-1]
+        file1_range = _format_range_unified(first[1], last[2])
+        file2_range = _format_range_unified(first[3], last[4])
+        yield f"@@ -{file1_range} +{file2_range} @@{lineterm}".encode(output_encoding)
+
+        for tag, i1, i2, j1, j2 in group:
+            if tag == "equal":
+                for line in a[i1:i2]:
+                    yield b" " + line
+                continue
+            if tag in ("replace", "delete"):
+                for line in a[i1:i2]:
+                    if not line[-1:] == b"\n":
+                        line += b"\n\\ No newline at end of file\n"
+                    yield b"-" + line
+            if tag in ("replace", "insert"):
+                for line in b[j1:j2]:
+                    if not line[-1:] == b"\n":
+                        line += b"\n\\ No newline at end of file\n"
+                    yield b"+" + line
+
+
 def is_binary(content: bytes) -> bool:
     """See if the first few bytes contain any null characters.
 
@@ -237,6 +362,7 @@ def write_object_diff(
     old_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
     new_file: tuple[Optional[bytes], Optional[int], Optional[bytes]],
     diff_binary: bool = False,
+    diff_algorithm: Optional[str] = None,
 ) -> None:
     """Write the diff for an object.
 
@@ -247,6 +373,7 @@ def write_object_diff(
       new_file: (path, mode, hexsha) tuple
       diff_binary: Whether to diff files even if they
         are considered binary files by is_binary().
+      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
 
     Note: the tuple elements should be None for nonexistent files
     """
@@ -307,11 +434,12 @@ def write_object_diff(
         f.write(binary_diff)
     else:
         f.writelines(
-            unified_diff(
+            unified_diff_with_algorithm(
                 lines(old_content),
                 lines(new_content),
                 patched_old_path,
                 patched_new_path,
+                algorithm=diff_algorithm,
             )
         )
 
@@ -358,6 +486,7 @@ def write_blob_diff(
     f: IO[bytes],
     old_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
     new_file: tuple[Optional[bytes], Optional[int], Optional["Blob"]],
+    diff_algorithm: Optional[str] = None,
 ) -> None:
     """Write blob diff.
 
@@ -365,6 +494,7 @@ def write_blob_diff(
       f: File-like object to write to
       old_file: (path, mode, hexsha) tuple (None if nonexisting)
       new_file: (path, mode, hexsha) tuple (None if nonexisting)
+      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
 
     Note: The use of write_object_diff is recommended over this function.
     """
@@ -397,7 +527,13 @@ def write_blob_diff(
     old_contents = lines(old_blob)
     new_contents = lines(new_blob)
     f.writelines(
-        unified_diff(old_contents, new_contents, patched_old_path, patched_new_path)
+        unified_diff_with_algorithm(
+            old_contents,
+            new_contents,
+            patched_old_path,
+            patched_new_path,
+            algorithm=diff_algorithm,
+        )
     )
 
 
@@ -407,6 +543,7 @@ def write_tree_diff(
     old_tree: Optional[bytes],
     new_tree: Optional[bytes],
     diff_binary: bool = False,
+    diff_algorithm: Optional[str] = None,
 ) -> None:
     """Write tree diff.
 
@@ -417,6 +554,7 @@ def write_tree_diff(
       new_tree: New tree id
       diff_binary: Whether to diff files even if they
         are considered binary files by is_binary().
+      diff_algorithm: Algorithm to use for diffing ("myers" or "patience")
     """
     changes = store.tree_changes(old_tree, new_tree)
     for (oldpath, newpath), (oldmode, newmode), (oldsha, newsha) in changes:
@@ -426,6 +564,7 @@ def write_tree_diff(
             (oldpath, oldmode, oldsha),
             (newpath, newmode, newsha),
             diff_binary=diff_binary,
+            diff_algorithm=diff_algorithm,
         )
 
 

+ 13 - 3
dulwich/porcelain.py

@@ -1560,6 +1560,7 @@ def diff(
     staged: bool = False,
     paths: Optional[list[Union[str, bytes]]] = None,
     outstream: BinaryIO = default_bytes_out_stream,
+    diff_algorithm: Optional[str] = None,
 ) -> None:
     """Show diff.
 
@@ -1576,6 +1577,8 @@ def diff(
               Ignored if commit2 is provided.
       paths: Optional list of paths to limit diff
       outstream: Stream to write to
+      diff_algorithm: Algorithm to use for diffing ("myers" or "patience"),
+                      defaults to the underlying function's default if None
     """
     from . import diff as diff_module
 
@@ -1637,19 +1640,26 @@ def diff(
                     r.object_store,
                     (oldpath, oldmode, oldsha),
                     (newpath, newmode, newsha),
+                    diff_algorithm=diff_algorithm,
                 )
         elif staged:
             # Show staged changes (index vs commit)
-            diff_module.diff_index_to_tree(r, outstream, commit_sha, byte_paths)
+            diff_module.diff_index_to_tree(
+                r, outstream, commit_sha, byte_paths, diff_algorithm=diff_algorithm
+            )
         elif commit is not None:
             # Compare working tree to a specific commit
             assert (
                 commit_sha is not None
             )  # mypy: commit_sha is set when commit is not None
-            diff_module.diff_working_tree_to_tree(r, outstream, commit_sha, byte_paths)
+            diff_module.diff_working_tree_to_tree(
+                r, outstream, commit_sha, byte_paths, diff_algorithm=diff_algorithm
+            )
         else:
             # Compare working tree to index
-            diff_module.diff_working_tree_to_index(r, outstream, byte_paths)
+            diff_module.diff_working_tree_to_index(
+                r, outstream, byte_paths, diff_algorithm=diff_algorithm
+            )
 
 
 def rev_list(

+ 1 - 0
pyproject.toml

@@ -49,6 +49,7 @@ dev = [
 ]
 merge = ["merge3"]
 fuzzing = ["atheris"]
+patiencediff = ["patiencediff"]
 
 [project.scripts]
 dulwich = "dulwich.cli:main"

+ 16 - 4
tests/compat/test_dumb.py

@@ -21,6 +21,7 @@
 
 """Compatibility tests for dumb HTTP git repositories."""
 
+import io
 import os
 import sys
 import tempfile
@@ -38,6 +39,10 @@ from tests.compat.utils import (
 )
 
 
+def no_op_progress(msg):
+    """Progress callback that does nothing."""
+
+
 class DumbHTTPRequestHandler(SimpleHTTPRequestHandler):
     """HTTP request handler for dumb git protocol."""
 
@@ -163,7 +168,8 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
     )
     def test_clone_dumb(self):
         dest_path = os.path.join(self.temp_dir, "cloned")
-        repo = clone(self.server.url, dest_path)
+        # Use a dummy errstream to suppress progress output
+        repo = clone(self.server.url, dest_path, errstream=io.BytesIO())
         assert b"HEAD" in repo
 
     def test_clone_from_dumb_http(self):
@@ -183,7 +189,9 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
                     sha for ref, sha in refs.items() if ref.startswith(b"refs/heads/")
                 ]
 
-            result = client.fetch("/", dest_repo, determine_wants=determine_wants)
+            result = client.fetch(
+                "/", dest_repo, determine_wants=determine_wants, progress=no_op_progress
+            )
 
             # Update refs
             for ref, sha in result.refs.items():
@@ -237,7 +245,9 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
                         wants.append(sha)
                 return wants
 
-            result = client.fetch("/", dest_repo, determine_wants=determine_wants)
+            result = client.fetch(
+                "/", dest_repo, determine_wants=determine_wants, progress=no_op_progress
+            )
 
             # Update refs
             for ref, sha in result.refs.items():
@@ -282,7 +292,9 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
                     if ref.startswith((b"refs/heads/", b"refs/tags/"))
                 ]
 
-            result = client.fetch("/", dest_repo, determine_wants=determine_wants)
+            result = client.fetch(
+                "/", dest_repo, determine_wants=determine_wants, progress=no_op_progress
+            )
 
             # Update refs
             for ref, sha in result.refs.items():

+ 14 - 3
tests/compat/test_server.py

@@ -58,9 +58,20 @@ class GitServerTestCase(ServerTests, CompatTestCase):
         backend = DictBackend({b"/": repo})
         dul_server = TCPGitServer(backend, b"localhost", 0, handlers=self._handlers())
         self._check_server(dul_server)
-        self.addCleanup(dul_server.shutdown)
-        self.addCleanup(dul_server.server_close)
-        threading.Thread(target=dul_server.serve).start()
+
+        # Start server in a thread
+        server_thread = threading.Thread(target=dul_server.serve)
+        server_thread.daemon = True  # Make thread daemon so it dies with main thread
+        server_thread.start()
+
+        # Add cleanup in the correct order
+        def cleanup_server():
+            dul_server.shutdown()
+            dul_server.server_close()
+            # Give thread a moment to exit cleanly
+            server_thread.join(timeout=1.0)
+
+        self.addCleanup(cleanup_server)
         self._server = dul_server
         _, port = self._server.socket.getsockname()
         return port

+ 153 - 147
tests/test_cli.py

@@ -81,6 +81,7 @@ class DulwichCliTestCase(TestCase):
         old_stdout = sys.stdout
         old_stderr = sys.stderr
         old_cwd = os.getcwd()
+
         try:
             # Use custom stdout_stream if provided, otherwise use MockStream
             if stdout_stream:
@@ -691,13 +692,15 @@ class FilterBranchCommandTest(DulwichCliTestCase):
     def test_filter_branch_subdirectory_filter(self):
         """Test filter-branch with subdirectory filter."""
         # Run filter-branch to extract only the subdir
-        result, stdout, stderr = self._run_cli(
-            "filter-branch", "--subdirectory-filter", "subdir"
-        )
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "filter-branch", "--subdirectory-filter", "subdir"
+            )
 
-        # Check that the operation succeeded
-        self.assertEqual(result, 0)
-        self.assertIn("Rewrite HEAD", stdout)
+            # Check that the operation succeeded
+            self.assertEqual(result, 0)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Rewrite HEAD", log_output)
 
         # filter-branch rewrites history but doesn't update working tree
         # We need to check the commit contents, not the working tree
@@ -786,12 +789,14 @@ class FilterBranchCommandTest(DulwichCliTestCase):
         self.assertTrue(len(original_refs) > 0, "No original refs found")
 
         # Run again without force - should fail
-        result, stdout, stderr = self._run_cli(
-            "filter-branch", "--msg-filter", "sed 's/^/[TEST2] /'"
-        )
-        self.assertEqual(result, 1)
-        self.assertIn("Cannot create a new backup", stdout)
-        self.assertIn("refs/original", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli(
+                "filter-branch", "--msg-filter", "sed 's/^/[TEST2] /'"
+            )
+            self.assertEqual(result, 1)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Cannot create a new backup", log_output)
+            self.assertIn("refs/original", log_output)
 
         # Run with force - should succeed
         result, stdout, stderr = self._run_cli(
@@ -810,12 +815,14 @@ class FilterBranchCommandTest(DulwichCliTestCase):
         self._run_cli("commit", "--message=Branch commit")
 
         # Run filter-branch on the test-branch
-        result, stdout, stderr = self._run_cli(
-            "filter-branch", "--msg-filter", "sed 's/^/[BRANCH] /'", "test-branch"
-        )
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "filter-branch", "--msg-filter", "sed 's/^/[BRANCH] /'", "test-branch"
+            )
 
-        self.assertEqual(result, 0)
-        self.assertIn("Ref 'refs/heads/test-branch' was rewritten", stdout)
+            self.assertEqual(result, 0)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Ref 'refs/heads/test-branch' was rewritten", log_output)
 
         # Check that only test-branch was modified
         result, stdout, stderr = self._run_cli("log")
@@ -962,9 +969,11 @@ class FormatPatchCommandTest(DulwichCliTestCase):
         )
 
         # Test format-patch for last commit
-        result, stdout, stderr = self._run_cli("format-patch", "-n", "1")
-        self.assertEqual(result, None)
-        self.assertIn("0001-Add-hello.txt.patch", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("format-patch", "-n", "1")
+            self.assertEqual(result, None)
+            log_output = "\n".join(cm.output)
+            self.assertIn("0001-Add-hello.txt.patch", log_output)
 
         # Check patch contents
         patch_file = os.path.join(self.repo_path, "0001-Add-hello.txt.patch")
@@ -1019,10 +1028,12 @@ class FormatPatchCommandTest(DulwichCliTestCase):
         )
 
         # Test format-patch for last 2 commits
-        result, stdout, stderr = self._run_cli("format-patch", "-n", "2")
-        self.assertEqual(result, None)
-        self.assertIn("0001-Add-file1.txt.patch", stdout)
-        self.assertIn("0002-Add-file2.txt.patch", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("format-patch", "-n", "2")
+            self.assertEqual(result, None)
+            log_output = "\n".join(cm.output)
+            self.assertIn("0001-Add-file1.txt.patch", log_output)
+            self.assertIn("0002-Add-file2.txt.patch", log_output)
 
         # Check first patch
         with open(os.path.join(self.repo_path, "0001-Add-file1.txt.patch"), "rb") as f:
@@ -1110,14 +1121,16 @@ class FormatPatchCommandTest(DulwichCliTestCase):
             commits.append(c)
 
         # Test format-patch with commit range (should get commits 2 and 3)
-        result, stdout, stderr = self._run_cli(
-            "format-patch", f"{commits[1].decode()}..{commits[3].decode()}"
-        )
-        self.assertEqual(result, None)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "format-patch", f"{commits[1].decode()}..{commits[3].decode()}"
+            )
+            self.assertEqual(result, None)
 
-        # Should create patches for commits 2 and 3
-        self.assertIn("0001-Add-file2.txt.patch", stdout)
-        self.assertIn("0002-Add-file3.txt.patch", stdout)
+            # Should create patches for commits 2 and 3
+            log_output = "\n".join(cm.output)
+            self.assertIn("0001-Add-file2.txt.patch", log_output)
+            self.assertIn("0002-Add-file3.txt.patch", log_output)
 
         # Verify patch contents
         with open(os.path.join(self.repo_path, "0001-Add-file2.txt.patch"), "rb") as f:
@@ -1307,8 +1320,12 @@ class ForEachRefCommandTest(DulwichCliTestCase):
         self._run_cli("add", "test.txt")
         self._run_cli("commit", "--message=Initial")
 
-        result, stdout, stderr = self._run_cli("for-each-ref")
-        self.assertIn("refs/heads/master", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("for-each-ref")
+            log_output = "\n".join(cm.output)
+            # Just check that we have some refs output and it contains refs/heads
+            self.assertTrue(len(cm.output) > 0, "Expected some ref output")
+            self.assertIn("refs/heads/", log_output)
 
 
 class PackRefsCommandTest(DulwichCliTestCase):
@@ -1374,8 +1391,9 @@ class StashCommandTest(DulwichCliTestCase):
             f.write("modified")
 
         # Stash changes
-        result, stdout, stderr = self._run_cli("stash", "push")
-        self.assertIn("Saved working directory", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("stash", "push")
+            self.assertIn("Saved working directory", cm.output[0])
 
         # Note: Dulwich stash doesn't currently update the working tree
         # so the file remains modified after stash push
@@ -1416,14 +1434,18 @@ class HelpCommandTest(DulwichCliTestCase):
     """Tests for help command."""
 
     def test_help_basic(self):
-        result, stdout, stderr = self._run_cli("help")
-        self.assertIn("dulwich command line tool", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("help")
+            log_output = "\n".join(cm.output)
+            self.assertIn("dulwich command line tool", log_output)
 
     def test_help_all(self):
-        result, stdout, stderr = self._run_cli("help", "-a")
-        self.assertIn("Available commands:", stdout)
-        self.assertIn("add", stdout)
-        self.assertIn("commit", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("help", "-a")
+            log_output = "\n".join(cm.output)
+            self.assertIn("Available commands:", log_output)
+            self.assertIn("add", log_output)
+            self.assertIn("commit", log_output)
 
 
 class RemoteCommandTest(DulwichCliTestCase):
@@ -1450,9 +1472,13 @@ class CheckIgnoreCommandTest(DulwichCliTestCase):
         with open(gitignore, "w") as f:
             f.write("*.log\n")
 
-        result, stdout, stderr = self._run_cli("check-ignore", "test.log", "test.txt")
-        self.assertIn("test.log", stdout)
-        self.assertNotIn("test.txt", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "check-ignore", "test.log", "test.txt"
+            )
+            log_output = "\n".join(cm.output)
+            self.assertIn("test.log", log_output)
+            self.assertNotIn("test.txt", log_output)
 
 
 class LsFilesCommandTest(DulwichCliTestCase):
@@ -1466,10 +1492,12 @@ class LsFilesCommandTest(DulwichCliTestCase):
                 f.write(f"content of {name}")
         self._run_cli("add", "a.txt", "b.txt", "c.txt")
 
-        result, stdout, stderr = self._run_cli("ls-files")
-        self.assertIn("a.txt", stdout)
-        self.assertIn("b.txt", stdout)
-        self.assertIn("c.txt", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("ls-files")
+            log_output = "\n".join(cm.output)
+            self.assertIn("a.txt", log_output)
+            self.assertIn("b.txt", log_output)
+            self.assertIn("c.txt", log_output)
 
 
 class LsTreeCommandTest(DulwichCliTestCase):
@@ -1515,8 +1543,9 @@ class DescribeCommandTest(DulwichCliTestCase):
         self._run_cli("commit", "--message=Initial")
         self._run_cli("tag", "v1.0")
 
-        result, stdout, stderr = self._run_cli("describe")
-        self.assertIn("v1.0", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("describe")
+            self.assertIn("v1.0", cm.output[0])
 
 
 class FsckCommandTest(DulwichCliTestCase):
@@ -1706,9 +1735,10 @@ class BundleCommandTest(DulwichCliTestCase):
         """Test bundle creation with no refs specified."""
         bundle_file = os.path.join(self.test_dir, "noref.bundle")
 
-        result, stdout, stderr = self._run_cli("bundle", "create", bundle_file)
-        self.assertEqual(result, 1)
-        self.assertIn("No refs specified", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "create", bundle_file)
+            self.assertEqual(result, 1)
+            self.assertIn("No refs specified", cm.output[0])
 
     def test_bundle_create_empty_bundle_refused(self):
         """Test that empty bundles are refused."""
@@ -1729,9 +1759,10 @@ class BundleCommandTest(DulwichCliTestCase):
         self.assertEqual(result, 0)
 
         # Now verify it
-        result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
-        self.assertEqual(result, 0)
-        self.assertIn("valid and can be applied", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
+            self.assertEqual(result, 0)
+            self.assertIn("valid and can be applied", cm.output[0])
 
     def test_bundle_verify_quiet(self):
         """Test bundle verification with quiet flag."""
@@ -1776,10 +1807,11 @@ class BundleCommandTest(DulwichCliTestCase):
         self._run_cli("bundle", "create", bundle_file, "HEAD")
 
         # List heads
-        result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
-        self.assertEqual(result, 0)
-        # Should contain at least the HEAD reference
-        self.assertTrue(len(stdout.strip()) > 0)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
+            self.assertEqual(result, 0)
+            # Should contain at least the HEAD reference
+            self.assertTrue(len(cm.output) > 0)
 
     def test_bundle_list_heads_specific_refs(self):
         """Test listing specific bundle heads."""
@@ -1789,10 +1821,11 @@ class BundleCommandTest(DulwichCliTestCase):
         self._run_cli("bundle", "create", bundle_file, "HEAD")
 
         # List heads without filtering
-        result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
-        self.assertEqual(result, 0)
-        # Should contain some reference
-        self.assertTrue(len(stdout.strip()) > 0)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
+            self.assertEqual(result, 0)
+            # Should contain some reference
+            self.assertTrue(len(cm.output) > 0)
 
     def test_bundle_list_heads_from_stdin(self):
         """Test listing bundle heads from stdin."""
@@ -1918,15 +1951,17 @@ class BundleCommandTest(DulwichCliTestCase):
 
     def test_bundle_invalid_subcommand(self):
         """Test invalid bundle subcommand."""
-        result, stdout, stderr = self._run_cli("bundle", "invalid-command")
-        self.assertEqual(result, 1)
-        self.assertIn("Unknown bundle subcommand", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "invalid-command")
+            self.assertEqual(result, 1)
+            self.assertIn("Unknown bundle subcommand", cm.output[0])
 
     def test_bundle_no_subcommand(self):
         """Test bundle command with no subcommand."""
-        result, stdout, stderr = self._run_cli("bundle")
-        self.assertEqual(result, 1)
-        self.assertIn("Usage: bundle", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli("bundle")
+            self.assertEqual(result, 1)
+            self.assertIn("Usage: bundle", cm.output[0])
 
     def test_bundle_create_with_stdin_refs(self):
         """Test bundle creation reading refs from stdin."""
@@ -2003,9 +2038,10 @@ class BundleCommandTest(DulwichCliTestCase):
         self.assertTrue(os.path.exists(bundle_file))
 
         # Verify the bundle was created
-        result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
-        self.assertEqual(result, 0)
-        self.assertIn("valid and can be applied", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
+            self.assertEqual(result, 0)
+            self.assertIn("valid and can be applied", cm.output[0])
 
 
 class FormatBytesTestCase(TestCase):
@@ -2355,19 +2391,14 @@ class WorktreeCliTests(DulwichCliTestCase):
         """Test worktree add command."""
         wt_path = os.path.join(self.test_dir, "worktree1")
 
-        # Change to repo directory like real usage
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["add", wt_path, "feature"])
-
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "worktree", "add", wt_path, "feature"
+            )
             self.assertEqual(result, 0)
             self.assertTrue(os.path.exists(wt_path))
-            self.assertIn("Worktree added:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree added:", log_output)
 
     def test_worktree_add_detached(self):
         """Test worktree add with detached HEAD."""
@@ -2390,98 +2421,73 @@ class WorktreeCliTests(DulwichCliTestCase):
         """Test worktree remove command."""
         # First add a worktree
         wt_path = os.path.join(self.test_dir, "to-remove")
+        result, stdout, stderr = self._run_cli("worktree", "add", wt_path)
+        self.assertEqual(result, 0)
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", wt_path])
-
-            # Then remove it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["remove", wt_path])
-
+        # Then remove it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("worktree", "remove", wt_path)
             self.assertEqual(result, 0)
             self.assertFalse(os.path.exists(wt_path))
-            self.assertIn("Worktree removed:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree removed:", log_output)
 
     def test_worktree_prune(self):
         """Test worktree prune command."""
         # Add a worktree and manually remove it
         wt_path = os.path.join(self.test_dir, "to-prune")
+        result, stdout, stderr = self._run_cli("worktree", "add", wt_path)
+        self.assertEqual(result, 0)
+        shutil.rmtree(wt_path)
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", wt_path])
-            shutil.rmtree(wt_path)
-
-            # Prune
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["prune", "-v"])
-
+        # Prune
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("worktree", "prune", "-v")
             self.assertEqual(result, 0)
-            output = mock_stdout.getvalue()
-            self.assertIn("to-prune", output)
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("to-prune", log_output)
 
     def test_worktree_lock_unlock(self):
         """Test worktree lock and unlock commands."""
         # Add a worktree
         wt_path = os.path.join(self.test_dir, "lockable")
+        result, stdout, stderr = self._run_cli("worktree", "add", wt_path)
+        self.assertEqual(result, 0)
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", wt_path])
-
-            # Lock it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["lock", wt_path, "--reason", "Testing"])
-
+        # Lock it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "worktree", "lock", wt_path, "--reason", "Testing"
+            )
             self.assertEqual(result, 0)
-            self.assertIn("Worktree locked:", mock_stdout.getvalue())
-
-            # Unlock it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["unlock", wt_path])
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree locked:", log_output)
 
+        # Unlock it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("worktree", "unlock", wt_path)
             self.assertEqual(result, 0)
-            self.assertIn("Worktree unlocked:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree unlocked:", log_output)
 
     def test_worktree_move(self):
         """Test worktree move command."""
         # Add a worktree
         old_path = os.path.join(self.test_dir, "old-location")
         new_path = os.path.join(self.test_dir, "new-location")
+        result, stdout, stderr = self._run_cli("worktree", "add", old_path)
+        self.assertEqual(result, 0)
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", old_path])
-
-            # Move it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["move", old_path, new_path])
-
+        # Move it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "worktree", "move", old_path, new_path
+            )
             self.assertEqual(result, 0)
             self.assertFalse(os.path.exists(old_path))
             self.assertTrue(os.path.exists(new_path))
-            self.assertIn("Worktree moved:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree moved:", log_output)
 
     def test_worktree_invalid_command(self):
         """Test invalid worktree subcommand."""

+ 20 - 22
tests/test_cli_merge.py

@@ -21,11 +21,9 @@
 
 """Tests for dulwich merge CLI command."""
 
-import io
 import os
 import tempfile
 import unittest
-from unittest.mock import patch
 
 from dulwich import porcelain
 from dulwich.cli import main
@@ -65,12 +63,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             try:
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Merge successful", output)
+                self.assertIn("Merge successful", log_output)
 
                 # Check that file2.txt exists
                 self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
@@ -108,13 +106,13 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             try:
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="WARNING") as cm:
                     retcode = main(["merge", "feature"])
                     self.assertEqual(retcode, 1)
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
-                self.assertIn("Merge conflicts", output)
-                self.assertIn("file1.txt", output)
+                self.assertIn("Merge conflicts", log_output)
+                self.assertIn("file1.txt", log_output)
             finally:
                 os.chdir(old_cwd)
 
@@ -134,12 +132,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             try:
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "HEAD"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Already up to date", output)
+                self.assertIn("Already up to date", log_output)
             finally:
                 os.chdir(old_cwd)
 
@@ -176,12 +174,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             try:
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "--no-commit", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("not committing", output)
+                self.assertIn("not committing", log_output)
 
                 # Check that files are merged
                 self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
@@ -218,13 +216,13 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             try:
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "--no-ff", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Merge successful", output)
-                self.assertIn("Created merge commit", output)
+                self.assertIn("Merge successful", log_output)
+                self.assertIn("Created merge commit", log_output)
             finally:
                 os.chdir(old_cwd)
 
@@ -261,12 +259,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             try:
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "-m", "Custom merge message", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Merge successful", output)
+                self.assertIn("Merge successful", log_output)
             finally:
                 os.chdir(old_cwd)
 

+ 40 - 0
tests/test_client.py

@@ -424,6 +424,20 @@ class TestGetTransportAndPath(TestCase):
         self.assertEqual(1234, c._port)
         self.assertEqual("/bar/baz", path)
 
+    def test_tcp_ipv6(self) -> None:
+        c, path = get_transport_and_path("git://[::1]/bar/baz")
+        self.assertIsInstance(c, TCPGitClient)
+        self.assertEqual("::1", c._host)
+        self.assertEqual(TCP_GIT_PORT, c._port)
+        self.assertEqual("/bar/baz", path)
+
+    def test_tcp_ipv6_port(self) -> None:
+        c, path = get_transport_and_path("git://[2001:db8::1]:1234/bar/baz")
+        self.assertIsInstance(c, TCPGitClient)
+        self.assertEqual("2001:db8::1", c._host)
+        self.assertEqual(1234, c._port)
+        self.assertEqual("/bar/baz", path)
+
     def test_git_ssh_explicit(self) -> None:
         c, path = get_transport_and_path("git+ssh://foo.com/bar/baz")
         self.assertIsInstance(c, SSHGitClient)
@@ -1642,6 +1656,32 @@ class TCPGitClientTests(TestCase):
         url = c.get_url(path)
         self.assertEqual("git://github.com:9090/jelmer/dulwich", url)
 
+    def test_get_url_with_ipv6(self) -> None:
+        host = "::1"
+        path = "/jelmer/dulwich"
+        c = TCPGitClient(host)
+
+        url = c.get_url(path)
+        self.assertEqual("git://[::1]/jelmer/dulwich", url)
+
+    def test_get_url_with_ipv6_and_port(self) -> None:
+        host = "2001:db8::1"
+        path = "/jelmer/dulwich"
+        port = 9090
+        c = TCPGitClient(host, port=port)
+
+        url = c.get_url(path)
+        self.assertEqual("git://[2001:db8::1]:9090/jelmer/dulwich", url)
+
+    def test_get_url_with_ipv6_default_port(self) -> None:
+        host = "2001:db8::1"
+        path = "/jelmer/dulwich"
+        port = TCP_GIT_PORT  # Default port should not be included in URL
+        c = TCPGitClient(host, port=port)
+
+        url = c.get_url(path)
+        self.assertEqual("git://[2001:db8::1]/jelmer/dulwich", url)
+
 
 class DefaultUrllib3ManagerTest(TestCase):
     def test_no_config(self) -> None:

+ 18 - 12
tests/test_commit_graph.py

@@ -119,18 +119,22 @@ class CommitGraphTests(unittest.TestCase):
     def test_from_invalid_signature(self) -> None:
         data = b"XXXX" + b"\\x00" * 100
         f = io.BytesIO(data)
-
-        with self.assertRaises(ValueError) as cm:
-            CommitGraph.from_file(f)
-        self.assertIn("Invalid commit graph signature", str(cm.exception))
+        try:
+            with self.assertRaises(ValueError) as cm:
+                CommitGraph.from_file(f)
+            self.assertIn("Invalid commit graph signature", str(cm.exception))
+        finally:
+            f.close()
 
     def test_from_invalid_version(self) -> None:
         data = COMMIT_GRAPH_SIGNATURE + struct.pack(">B", 99) + b"\\x00" * 100
         f = io.BytesIO(data)
-
-        with self.assertRaises(ValueError) as cm:
-            CommitGraph.from_file(f)
-        self.assertIn("Unsupported commit graph version", str(cm.exception))
+        try:
+            with self.assertRaises(ValueError) as cm:
+                CommitGraph.from_file(f)
+            self.assertIn("Unsupported commit graph version", str(cm.exception))
+        finally:
+            f.close()
 
     def test_from_invalid_hash_version(self) -> None:
         data = (
@@ -140,10 +144,12 @@ class CommitGraphTests(unittest.TestCase):
             + b"\\x00" * 100
         )
         f = io.BytesIO(data)
-
-        with self.assertRaises(ValueError) as cm:
-            CommitGraph.from_file(f)
-        self.assertIn("Unsupported hash version", str(cm.exception))
+        try:
+            with self.assertRaises(ValueError) as cm:
+                CommitGraph.from_file(f)
+            self.assertIn("Unsupported hash version", str(cm.exception))
+        finally:
+            f.close()
 
     def create_minimal_commit_graph_data(self) -> bytes:
         """Create minimal valid commit graph data for testing."""

+ 59 - 28
tests/test_gc.py

@@ -21,6 +21,10 @@ from dulwich.objects import Blob, Commit, Tag, Tree
 from dulwich.repo import MemoryRepo, Repo
 
 
+def no_op_progress(msg):
+    """Progress callback that does nothing."""
+
+
 class GCTestCase(TestCase):
     """Tests for garbage collection functionality."""
 
@@ -159,7 +163,9 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
 
         # Run garbage collection (grace_period=None means no grace period check)
-        stats = garbage_collect(self.repo, prune=True, grace_period=None)
+        stats = garbage_collect(
+            self.repo, prune=True, grace_period=None, progress=no_op_progress
+        )
 
         # Check results
         self.assertIsInstance(stats, GCStats)
@@ -180,7 +186,7 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
 
         # Run garbage collection without pruning
-        stats = garbage_collect(self.repo, prune=False)
+        stats = garbage_collect(self.repo, prune=False, progress=no_op_progress)
 
         # Check that nothing was pruned
         self.assertEqual(set(), stats.pruned_objects)
@@ -194,7 +200,13 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
 
         # Run garbage collection with dry run (grace_period=None means no grace period check)
-        stats = garbage_collect(self.repo, prune=True, grace_period=None, dry_run=True)
+        stats = garbage_collect(
+            self.repo,
+            prune=True,
+            grace_period=None,
+            dry_run=True,
+            progress=no_op_progress,
+        )
 
         # Check that object would be pruned but still exists
         # On Windows, the repository initialization might create additional unreachable objects
@@ -214,7 +226,13 @@ class GCTestCase(TestCase):
 
         # Run garbage collection with a 1 hour grace period, but dry run to avoid packing
         # The object was just created, so it should not be pruned
-        stats = garbage_collect(self.repo, prune=True, grace_period=3600, dry_run=True)
+        stats = garbage_collect(
+            self.repo,
+            prune=True,
+            grace_period=3600,
+            dry_run=True,
+            progress=no_op_progress,
+        )
 
         # Check that the object was NOT pruned
         self.assertEqual(set(), stats.pruned_objects)
@@ -244,7 +262,9 @@ class GCTestCase(TestCase):
 
         # Run garbage collection with a 1 hour grace period
         # The object is 2 hours old, so it should be pruned
-        stats = garbage_collect(self.repo, prune=True, grace_period=3600)
+        stats = garbage_collect(
+            self.repo, prune=True, grace_period=3600, progress=no_op_progress
+        )
 
         # Check that the object was pruned
         self.assertEqual({old_blob.id}, stats.pruned_objects)
@@ -257,14 +277,16 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
 
         # Pack the objects to ensure the blob is in a pack
-        self.repo.object_store.pack_loose_objects()
+        self.repo.object_store.pack_loose_objects(progress=no_op_progress)
 
         # Ensure the object is NOT loose anymore
         self.assertFalse(self.repo.object_store.contains_loose(unreachable_blob.id))
         self.assertIn(unreachable_blob.id, self.repo.object_store)
 
         # Run garbage collection (grace_period=None means no grace period check)
-        stats = garbage_collect(self.repo, prune=True, grace_period=None)
+        stats = garbage_collect(
+            self.repo, prune=True, grace_period=None, progress=no_op_progress
+        )
 
         # Check that the packed object was pruned
         self.assertEqual({unreachable_blob.id}, stats.pruned_objects)
@@ -410,7 +432,9 @@ class GCTestCase(TestCase):
             self.repo.object_store, "get_object_mtime", side_effect=KeyError
         ):
             # Run garbage collection with grace period
-            stats = garbage_collect(self.repo, prune=True, grace_period=3600)
+            stats = garbage_collect(
+                self.repo, prune=True, grace_period=3600, progress=no_op_progress
+            )
 
         # Object should be kept because mtime couldn't be determined
         self.assertEqual(set(), stats.pruned_objects)
@@ -487,7 +511,7 @@ class AutoGCTestCase(TestCase):
                 blob = Blob()
                 blob.data = f"test blob {i}".encode()
                 r.object_store.add_object(blob)
-                r.object_store.pack_loose_objects()
+                r.object_store.pack_loose_objects(progress=no_op_progress)
 
             # Force re-enumeration of packs
             r.object_store._update_pack_cache()
@@ -525,7 +549,7 @@ class AutoGCTestCase(TestCase):
             blob = Blob()
             blob.data = b"test blob"
             r.object_store.add_object(blob)
-            r.object_store.pack_loose_objects()
+            r.object_store.pack_loose_objects(progress=no_op_progress)
 
             # Force re-enumeration of packs
             r.object_store._update_pack_cache()
@@ -547,10 +571,10 @@ class AutoGCTestCase(TestCase):
                 r.object_store.add_object(blob)
 
             with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
             self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
     def test_maybe_auto_gc_skips_when_not_needed(self):
         """Test that auto GC doesn't run when thresholds are not exceeded."""
@@ -558,7 +582,7 @@ class AutoGCTestCase(TestCase):
         config = ConfigDict()
 
         with patch("dulwich.gc.garbage_collect") as mock_gc:
-            result = maybe_auto_gc(r, config)
+            result = maybe_auto_gc(r, config, progress=no_op_progress)
 
         self.assertFalse(result)
         mock_gc.assert_not_called()
@@ -580,12 +604,15 @@ class AutoGCTestCase(TestCase):
             blob.data = b"test"
             r.object_store.add_object(blob)
 
-            with patch("builtins.print") as mock_print:
-                result = maybe_auto_gc(r, config)
+            # Capture log messages
+            import logging
+
+            with self.assertLogs(level=logging.INFO) as cm:
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
             self.assertFalse(result)
-            # Verify gc.log contents were printed
-            mock_print.assert_called_once_with("Previous GC failed\n")
+            # Verify gc.log contents were logged
+            self.assertTrue(any("Previous GC failed" in msg for msg in cm.output))
 
     def test_maybe_auto_gc_with_expired_gc_log(self):
         """Test that auto GC runs when gc.log exists but is expired."""
@@ -610,10 +637,10 @@ class AutoGCTestCase(TestCase):
             r.object_store.add_object(blob)
 
             with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
             self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
             # gc.log should be removed after successful GC
             self.assertFalse(os.path.exists(gc_log_path))
 
@@ -632,10 +659,10 @@ class AutoGCTestCase(TestCase):
             with patch(
                 "dulwich.gc.garbage_collect", side_effect=OSError("GC failed")
             ) as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
             self.assertFalse(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
             # Check that error was written to gc.log
             gc_log_path = os.path.join(r.controldir(), "gc.log")
@@ -667,10 +694,10 @@ class AutoGCTestCase(TestCase):
             r.object_store.add_object(blob)
 
             with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
             self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
     def test_gc_log_expiry_invalid_format(self):
         """Test that invalid gc.logExpiry format defaults to 1 day."""
@@ -694,12 +721,16 @@ class AutoGCTestCase(TestCase):
             blob.data = b"test"
             r.object_store.add_object(blob)
 
-            with patch("builtins.print") as mock_print:
-                result = maybe_auto_gc(r, config)
+            # Capture log messages
+            import logging
+
+            with self.assertLogs(level=logging.INFO) as cm:
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
             # Should not run GC because gc.log is recent (within default 1 day)
             self.assertFalse(result)
-            mock_print.assert_called_once()
+            # Check that gc.log content was logged
+            self.assertTrue(any("gc.log content:" in msg for msg in cm.output))
 
     def test_maybe_auto_gc_non_disk_repo(self):
         """Test auto GC on non-disk repository (MemoryRepo)."""
@@ -715,7 +746,7 @@ class AutoGCTestCase(TestCase):
 
         # For non-disk repos, should_run_gc returns False
         # because it can't count loose objects
-        result = maybe_auto_gc(r, config)
+        result = maybe_auto_gc(r, config, progress=no_op_progress)
         self.assertFalse(result)
 
     def test_gc_removes_existing_gc_log_on_success(self):
@@ -740,7 +771,7 @@ class AutoGCTestCase(TestCase):
             r.object_store.add_object(blob)
 
             # Run auto GC
-            result = maybe_auto_gc(r, config)
+            result = maybe_auto_gc(r, config, progress=no_op_progress)
 
             self.assertTrue(result)
             # gc.log should be removed after successful GC

+ 39 - 4
tests/test_lfs.py

@@ -36,10 +36,22 @@ from . import TestCase
 class LFSTests(TestCase):
     def setUp(self) -> None:
         super().setUp()
+        # Suppress LFS warnings during these tests
+        import logging
+
+        self._old_level = logging.getLogger("dulwich.lfs").level
+        logging.getLogger("dulwich.lfs").setLevel(logging.ERROR)
         self.test_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.test_dir)
         self.lfs = LFSStore.create(self.test_dir)
 
+    def tearDown(self) -> None:
+        # Restore original logging level
+        import logging
+
+        logging.getLogger("dulwich.lfs").setLevel(self._old_level)
+        super().tearDown()
+
     def test_create(self) -> None:
         sha = self.lfs.write_object([b"a", b"b"])
         with self.lfs.open_object(sha) as f:
@@ -209,19 +221,30 @@ class LFSIntegrationTests(TestCase):
 
     def setUp(self) -> None:
         super().setUp()
-        import os
+        # Suppress LFS warnings during these integration tests
+        import logging
 
-        from dulwich.repo import Repo
+        self._old_level = logging.getLogger("dulwich.lfs").level
+        logging.getLogger("dulwich.lfs").setLevel(logging.ERROR)
 
         # Create temporary directory for test repo
         self.test_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.test_dir)
 
         # Initialize repo
+        from dulwich.repo import Repo
+
         self.repo = Repo.init(self.test_dir)
         self.lfs_dir = os.path.join(self.test_dir, ".git", "lfs")
         self.lfs_store = LFSStore.create(self.lfs_dir)
 
+    def tearDown(self) -> None:
+        # Restore original logging level
+        import logging
+
+        logging.getLogger("dulwich.lfs").setLevel(self._old_level)
+        super().tearDown()
+
     def test_lfs_with_gitattributes(self) -> None:
         """Test LFS integration with .gitattributes."""
         import os
@@ -701,7 +724,13 @@ class LFSServerTests(TestCase):
         self.server_thread = threading.Thread(target=self.server.serve_forever)
         self.server_thread.daemon = True
         self.server_thread.start()
-        self.addCleanup(self.server.shutdown)
+
+        def cleanup_server():
+            self.server.shutdown()
+            self.server.server_close()
+            self.server_thread.join(timeout=1.0)
+
+        self.addCleanup(cleanup_server)
 
     def test_server_batch_endpoint(self) -> None:
         """Test the batch endpoint directly."""
@@ -974,7 +1003,13 @@ class LFSClientTests(TestCase):
         self.server_thread = threading.Thread(target=self.server.serve_forever)
         self.server_thread.daemon = True
         self.server_thread.start()
-        self.addCleanup(self.server.shutdown)
+
+        def cleanup_server():
+            self.server.shutdown()
+            self.server.server_close()
+            self.server_thread.join(timeout=1.0)
+
+        self.addCleanup(cleanup_server)
 
         # Create LFS client pointing to our test server
         self.client = LFSClient(self.server_url)

+ 12 - 0
tests/test_lfs_integration.py

@@ -35,6 +35,11 @@ from . import TestCase
 class LFSFilterIntegrationTests(TestCase):
     def setUp(self) -> None:
         super().setUp()
+        # Suppress LFS warnings during these integration tests
+        import logging
+
+        self._old_level = logging.getLogger("dulwich.lfs").level
+        logging.getLogger("dulwich.lfs").setLevel(logging.ERROR)
         # Create temporary directory for LFS store
         self.test_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.test_dir)
@@ -60,6 +65,13 @@ class LFSFilterIntegrationTests(TestCase):
             self.config, self.gitattributes, self.registry
         )
 
+    def tearDown(self) -> None:
+        # Restore original logging level
+        import logging
+
+        logging.getLogger("dulwich.lfs").setLevel(self._old_level)
+        super().tearDown()
+
     def test_lfs_round_trip(self) -> None:
         """Test complete LFS round trip through filter normalizer."""
         # Create a blob with binary content

+ 51 - 39
tests/test_pack.py

@@ -430,33 +430,39 @@ class TestPackData(PackTests):
 
     def test_compute_file_sha(self) -> None:
         f = BytesIO(b"abcd1234wxyz")
-        self.assertEqual(
-            sha1(b"abcd1234wxyz").hexdigest(), compute_file_sha(f).hexdigest()
-        )
-        self.assertEqual(
-            sha1(b"abcd1234wxyz").hexdigest(),
-            compute_file_sha(f, buffer_size=5).hexdigest(),
-        )
-        self.assertEqual(
-            sha1(b"abcd1234").hexdigest(),
-            compute_file_sha(f, end_ofs=-4).hexdigest(),
-        )
-        self.assertEqual(
-            sha1(b"1234wxyz").hexdigest(),
-            compute_file_sha(f, start_ofs=4).hexdigest(),
-        )
-        self.assertEqual(
-            sha1(b"1234").hexdigest(),
-            compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest(),
-        )
+        try:
+            self.assertEqual(
+                sha1(b"abcd1234wxyz").hexdigest(), compute_file_sha(f).hexdigest()
+            )
+            self.assertEqual(
+                sha1(b"abcd1234wxyz").hexdigest(),
+                compute_file_sha(f, buffer_size=5).hexdigest(),
+            )
+            self.assertEqual(
+                sha1(b"abcd1234").hexdigest(),
+                compute_file_sha(f, end_ofs=-4).hexdigest(),
+            )
+            self.assertEqual(
+                sha1(b"1234wxyz").hexdigest(),
+                compute_file_sha(f, start_ofs=4).hexdigest(),
+            )
+            self.assertEqual(
+                sha1(b"1234").hexdigest(),
+                compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest(),
+            )
+        finally:
+            f.close()
 
     def test_compute_file_sha_short_file(self) -> None:
         f = BytesIO(b"abcd1234wxyz")
-        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
-        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
-        self.assertRaises(
-            AssertionError, compute_file_sha, f, start_ofs=10, end_ofs=-12
-        )
+        try:
+            self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
+            self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
+            self.assertRaises(
+                AssertionError, compute_file_sha, f, start_ofs=10, end_ofs=-12
+            )
+        finally:
+            f.close()
 
 
 class TestPack(PackTests):
@@ -729,24 +735,30 @@ class TestThinPack(PackTests):
 class WritePackTests(TestCase):
     def test_write_pack_header(self) -> None:
         f = BytesIO()
-        write_pack_header(f.write, 42)
-        self.assertEqual(b"PACK\x00\x00\x00\x02\x00\x00\x00*", f.getvalue())
+        try:
+            write_pack_header(f.write, 42)
+            self.assertEqual(b"PACK\x00\x00\x00\x02\x00\x00\x00*", f.getvalue())
+        finally:
+            f.close()
 
     def test_write_pack_object(self) -> None:
         f = BytesIO()
-        f.write(b"header")
-        offset = f.tell()
-        crc32 = write_pack_object(f.write, Blob.type_num, b"blob")
-        self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xFFFFFFFF)
-
-        f.write(b"x")  # unpack_object needs extra trailing data.
-        f.seek(offset)
-        unpacked, unused = unpack_object(f.read, compute_crc32=True)
-        self.assertEqual(Blob.type_num, unpacked.pack_type_num)
-        self.assertEqual(Blob.type_num, unpacked.obj_type_num)
-        self.assertEqual([b"blob"], unpacked.decomp_chunks)
-        self.assertEqual(crc32, unpacked.crc32)
-        self.assertEqual(b"x", unused)
+        try:
+            f.write(b"header")
+            offset = f.tell()
+            crc32 = write_pack_object(f.write, Blob.type_num, b"blob")
+            self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xFFFFFFFF)
+
+            f.write(b"x")  # unpack_object needs extra trailing data.
+            f.seek(offset)
+            unpacked, unused = unpack_object(f.read, compute_crc32=True)
+            self.assertEqual(Blob.type_num, unpacked.pack_type_num)
+            self.assertEqual(Blob.type_num, unpacked.obj_type_num)
+            self.assertEqual([b"blob"], unpacked.decomp_chunks)
+            self.assertEqual(crc32, unpacked.crc32)
+            self.assertEqual(b"x", unused)
+        finally:
+            f.close()
 
     def test_write_pack_object_sha(self) -> None:
         f = BytesIO()

+ 162 - 0
tests/test_patch.py

@@ -27,8 +27,10 @@ from typing import NoReturn
 from dulwich.object_store import MemoryObjectStore
 from dulwich.objects import S_IFGITLINK, Blob, Commit, Tree
 from dulwich.patch import (
+    DiffAlgorithmNotAvailable,
     get_summary,
     git_am_patch_split,
+    unified_diff_with_algorithm,
     write_blob_diff,
     write_commit_patch,
     write_object_diff,
@@ -635,3 +637,163 @@ class GetSummaryTests(TestCase):
         c.message = b"This is the first line\nAnd this is the second line.\n"
         c.tree = Tree().id
         self.assertEqual("This-is-the-first-line", get_summary(c))
+
+
+class DiffAlgorithmTests(TestCase):
+    """Tests for diff algorithm selection."""
+
+    def test_unified_diff_with_myers(self) -> None:
+        """Test unified_diff_with_algorithm with default myers algorithm."""
+        a = [b"line1\n", b"line2\n", b"line3\n"]
+        b = [b"line1\n", b"line2 modified\n", b"line3\n"]
+
+        result = list(
+            unified_diff_with_algorithm(
+                a, b, fromfile=b"a.txt", tofile=b"b.txt", algorithm="myers"
+            )
+        )
+
+        # Should contain diff headers and the change
+        self.assertTrue(any(b"---" in line for line in result))
+        self.assertTrue(any(b"+++" in line for line in result))
+        self.assertTrue(any(b"-line2" in line for line in result))
+        self.assertTrue(any(b"+line2 modified" in line for line in result))
+
+    def test_unified_diff_with_patience_not_available(self) -> None:
+        """Test that DiffAlgorithmNotAvailable is raised when patience not available."""
+        # Temporarily mock _get_sequence_matcher to simulate ImportError
+        import dulwich.patch
+
+        original = dulwich.patch._get_sequence_matcher
+
+        def mock_get_sequence_matcher(algorithm, a, b):
+            if algorithm == "patience":
+                raise DiffAlgorithmNotAvailable(
+                    "patience", "Install with: pip install 'dulwich[patiencediff]'"
+                )
+            return original(algorithm, a, b)
+
+        try:
+            dulwich.patch._get_sequence_matcher = mock_get_sequence_matcher
+
+            a = [b"line1\n", b"line2\n", b"line3\n"]
+            b = [b"line1\n", b"line2 modified\n", b"line3\n"]
+
+            with self.assertRaises(DiffAlgorithmNotAvailable) as cm:
+                list(
+                    unified_diff_with_algorithm(
+                        a, b, fromfile=b"a.txt", tofile=b"b.txt", algorithm="patience"
+                    )
+                )
+
+            self.assertIn("patience", str(cm.exception))
+            self.assertIn("pip install", str(cm.exception))
+        finally:
+            dulwich.patch._get_sequence_matcher = original
+
+
+class PatienceDiffTests(TestCase):
+    """Tests for patience diff algorithm support."""
+
+    def setUp(self) -> None:
+        super().setUp()
+        # Skip all patience diff tests if patiencediff is not available
+        try:
+            import patiencediff  # noqa: F401
+        except ImportError:
+            raise SkipTest("patiencediff not available")
+
+    def test_unified_diff_with_patience_available(self) -> None:
+        """Test unified_diff_with_algorithm with patience if available."""
+        a = [b"line1\n", b"line2\n", b"line3\n"]
+        b = [b"line1\n", b"line2 modified\n", b"line3\n"]
+
+        result = list(
+            unified_diff_with_algorithm(
+                a, b, fromfile=b"a.txt", tofile=b"b.txt", algorithm="patience"
+            )
+        )
+
+        # Should contain diff headers and the change
+        self.assertTrue(any(b"---" in line for line in result))
+        self.assertTrue(any(b"+++" in line for line in result))
+        self.assertTrue(any(b"-line2" in line for line in result))
+        self.assertTrue(any(b"+line2 modified" in line for line in result))
+
+    def test_unified_diff_with_patience_not_available(self) -> None:
+        """Test that DiffAlgorithmNotAvailable is raised when patience not available."""
+        # Temporarily mock _get_sequence_matcher to simulate ImportError
+        import dulwich.patch
+
+        original = dulwich.patch._get_sequence_matcher
+
+        def mock_get_sequence_matcher(algorithm, a, b):
+            if algorithm == "patience":
+                raise DiffAlgorithmNotAvailable(
+                    "patience", "Install with: pip install 'dulwich[patiencediff]'"
+                )
+            return original(algorithm, a, b)
+
+        try:
+            dulwich.patch._get_sequence_matcher = mock_get_sequence_matcher
+
+            a = [b"line1\n", b"line2\n", b"line3\n"]
+            b = [b"line1\n", b"line2 modified\n", b"line3\n"]
+
+            with self.assertRaises(DiffAlgorithmNotAvailable) as cm:
+                list(
+                    unified_diff_with_algorithm(
+                        a, b, fromfile=b"a.txt", tofile=b"b.txt", algorithm="patience"
+                    )
+                )
+
+            self.assertIn("patience", str(cm.exception))
+            self.assertIn("pip install", str(cm.exception))
+        finally:
+            dulwich.patch._get_sequence_matcher = original
+
+    def test_write_blob_diff_with_patience(self) -> None:
+        """Test write_blob_diff with patience algorithm if available."""
+        f = BytesIO()
+        old_blob = Blob()
+        old_blob.data = b"line1\nline2\nline3\n"
+        new_blob = Blob()
+        new_blob.data = b"line1\nline2 modified\nline3\n"
+
+        write_blob_diff(
+            f,
+            (b"file.txt", 0o100644, old_blob),
+            (b"file.txt", 0o100644, new_blob),
+            diff_algorithm="patience",
+        )
+
+        diff = f.getvalue()
+        self.assertIn(b"diff --git", diff)
+        self.assertIn(b"-line2", diff)
+        self.assertIn(b"+line2 modified", diff)
+
+    def test_write_object_diff_with_patience(self) -> None:
+        """Test write_object_diff with patience algorithm if available."""
+        f = BytesIO()
+        store = MemoryObjectStore()
+
+        old_blob = Blob()
+        old_blob.data = b"line1\nline2\nline3\n"
+        store.add_object(old_blob)
+
+        new_blob = Blob()
+        new_blob.data = b"line1\nline2 modified\nline3\n"
+        store.add_object(new_blob)
+
+        write_object_diff(
+            f,
+            store,
+            (b"file.txt", 0o100644, old_blob.id),
+            (b"file.txt", 0o100644, new_blob.id),
+            diff_algorithm="patience",
+        )
+
+        diff = f.getvalue()
+        self.assertIn(b"diff --git", diff)
+        self.assertIn(b"-line2", diff)
+        self.assertIn(b"+line2 modified", diff)

Algunos archivos no se mostraron porque demasiados archivos cambiaron en este cambio