Jelmer Vernooij 4 месяцев назад
Родитель
Сommit
f619c2222f

Разница между файлами не показана из-за своего большого размера
+ 204 - 167
dulwich/cli.py


+ 8 - 8
dulwich/cloud/gcs.py

@@ -80,17 +80,17 @@ class GcsObjectStore(BucketBasedObjectStore):
 
 
         from ..file import _GitFile
         from ..file import _GitFile
 
 
-        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
-        b.download_to_file(f)
-        f.seek(0)
-        return PackData(name + ".pack", cast(_GitFile, f))
+        with tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE) as f:
+            b.download_to_file(f)
+            f.seek(0)
+            return PackData(name + ".pack", cast(_GitFile, f))
 
 
     def _load_pack_index(self, name: str) -> PackIndex:
     def _load_pack_index(self, name: str) -> PackIndex:
         b = self.bucket.blob(posixpath.join(self.subpath, name + ".idx"))
         b = self.bucket.blob(posixpath.join(self.subpath, name + ".idx"))
-        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
-        b.download_to_file(f)
-        f.seek(0)
-        return load_pack_index_file(name + ".idx", f)
+        with tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE) as f:
+            b.download_to_file(f)
+            f.seek(0)
+            return load_pack_index_file(name + ".idx", f)
 
 
     def _get_pack(self, name: str) -> Pack:
     def _get_pack(self, name: str) -> Pack:
         return Pack.from_lazy_objects(  # type: ignore[no-untyped-call]
         return Pack.from_lazy_objects(  # type: ignore[no-untyped-call]

+ 12 - 6
dulwich/gc.py

@@ -1,6 +1,7 @@
 """Git garbage collection implementation."""
 """Git garbage collection implementation."""
 
 
 import collections
 import collections
+import logging
 import os
 import os
 import time
 import time
 from dataclasses import dataclass, field
 from dataclasses import dataclass, field
@@ -292,10 +293,10 @@ def garbage_collect(
     if not dry_run:
     if not dry_run:
         if prune and unreachable_to_prune:
         if prune and unreachable_to_prune:
             # Repack excluding unreachable objects
             # Repack excluding unreachable objects
-            object_store.repack(exclude=unreachable_to_prune)
+            object_store.repack(exclude=unreachable_to_prune, progress=progress)
         else:
         else:
             # Normal repack
             # Normal repack
-            object_store.repack()
+            object_store.repack(progress=progress)
 
 
     # Prune orphaned temporary files
     # Prune orphaned temporary files
     if progress:
     if progress:
@@ -367,12 +368,15 @@ def should_run_gc(repo: "BaseRepo", config: Optional["Config"] = None) -> bool:
     return False
     return False
 
 
 
 
-def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
+def maybe_auto_gc(
+    repo: "Repo", config: Optional["Config"] = None, progress: Optional[Callable] = None
+) -> bool:
     """Run automatic garbage collection if needed.
     """Run automatic garbage collection if needed.
 
 
     Args:
     Args:
         repo: Repository to potentially GC
         repo: Repository to potentially GC
         config: Configuration to use (defaults to repo config)
         config: Configuration to use (defaults to repo config)
+        progress: Optional progress reporting callback
 
 
     Returns:
     Returns:
         True if GC was run, False otherwise
         True if GC was run, False otherwise
@@ -383,7 +387,7 @@ def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
     # Check for gc.log file - only for disk-based repos
     # Check for gc.log file - only for disk-based repos
     if not hasattr(repo, "controldir"):
     if not hasattr(repo, "controldir"):
         # For non-disk repos, just run GC without gc.log handling
         # For non-disk repos, just run GC without gc.log handling
-        garbage_collect(repo, auto=True)
+        garbage_collect(repo, auto=True, progress=progress)
         return True
         return True
 
 
     gc_log_path = os.path.join(repo.controldir(), "gc.log")
     gc_log_path = os.path.join(repo.controldir(), "gc.log")
@@ -409,7 +413,9 @@ def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
         if time.time() - stat_info.st_mtime < expiry_seconds:
         if time.time() - stat_info.st_mtime < expiry_seconds:
             # gc.log exists and is not expired - skip GC
             # gc.log exists and is not expired - skip GC
             with open(gc_log_path, "rb") as f:
             with open(gc_log_path, "rb") as f:
-                print(f.read().decode("utf-8", errors="replace"))
+                logging.info(
+                    "gc.log content: %s", f.read().decode("utf-8", errors="replace")
+                )
             return False
             return False
 
 
     # TODO: Support gc.autoDetach to run in background
     # TODO: Support gc.autoDetach to run in background
@@ -417,7 +423,7 @@ def maybe_auto_gc(repo: "Repo", config: Optional["Config"] = None) -> bool:
 
 
     try:
     try:
         # Run GC with auto=True flag
         # Run GC with auto=True flag
-        garbage_collect(repo, auto=True)
+        garbage_collect(repo, auto=True, progress=progress)
 
 
         # Remove gc.log on successful completion
         # Remove gc.log on successful completion
         if os.path.exists(gc_log_path):
         if os.path.exists(gc_log_path):

+ 3 - 1
dulwich/lfs.py

@@ -43,6 +43,8 @@ from typing import TYPE_CHECKING, BinaryIO, Optional, Union
 from urllib.parse import urljoin, urlparse
 from urllib.parse import urljoin, urlparse
 from urllib.request import Request, urlopen
 from urllib.request import Request, urlopen
 
 
+logger = logging.getLogger(__name__)
+
 if TYPE_CHECKING:
 if TYPE_CHECKING:
     import urllib3
     import urllib3
 
 
@@ -272,7 +274,7 @@ class LFSFilterDriver:
                 return content
                 return content
             except LFSError as e:
             except LFSError as e:
                 # Download failed, fall back to returning pointer
                 # Download failed, fall back to returning pointer
-                logging.warning("LFS object download failed for %s: %s", pointer.oid, e)
+                logger.warning("LFS object download failed for %s: %s", pointer.oid, e)
 
 
                 # Return pointer as-is when object is missing and download failed
                 # Return pointer as-is when object is missing and download failed
                 return data
                 return data

+ 14 - 5
dulwich/object_store.py

@@ -769,9 +769,12 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
     def _remove_pack(self, pack: "Pack") -> None:
     def _remove_pack(self, pack: "Pack") -> None:
         raise NotImplementedError(self._remove_pack)
         raise NotImplementedError(self._remove_pack)
 
 
-    def pack_loose_objects(self) -> int:
+    def pack_loose_objects(self, progress: Optional[Callable] = None) -> int:
         """Pack loose objects.
         """Pack loose objects.
 
 
+        Args:
+          progress: Optional progress reporting callback
+
         Returns: Number of objects packed
         Returns: Number of objects packed
         """
         """
         objects: list[tuple[ShaFile, None]] = []
         objects: list[tuple[ShaFile, None]] = []
@@ -779,12 +782,14 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
             obj = self._get_loose_object(sha)
             obj = self._get_loose_object(sha)
             if obj is not None:
             if obj is not None:
                 objects.append((obj, None))
                 objects.append((obj, None))
-        self.add_objects(objects)
+        self.add_objects(objects, progress=progress)
         for obj, path in objects:
         for obj, path in objects:
             self.delete_loose_object(obj.id)
             self.delete_loose_object(obj.id)
         return len(objects)
         return len(objects)
 
 
-    def repack(self, exclude: Optional[set] = None) -> int:
+    def repack(
+        self, exclude: Optional[set] = None, progress: Optional[Callable] = None
+    ) -> int:
         """Repack the packs in this repository.
         """Repack the packs in this repository.
 
 
         Note that this implementation is fairly naive and currently keeps all
         Note that this implementation is fairly naive and currently keeps all
@@ -792,6 +797,7 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
 
 
         Args:
         Args:
           exclude: Optional set of object SHAs to exclude from repacking
           exclude: Optional set of object SHAs to exclude from repacking
+          progress: Optional progress reporting callback
         """
         """
         if exclude is None:
         if exclude is None:
             exclude = set()
             exclude = set()
@@ -818,7 +824,7 @@ class PackBasedObjectStore(BaseObjectStore, PackedObjectContainer):
             # The name of the consolidated pack might match the name of a
             # The name of the consolidated pack might match the name of a
             # pre-existing pack. Take care not to remove the newly created
             # pre-existing pack. Take care not to remove the newly created
             # consolidated pack.
             # consolidated pack.
-            consolidated = self.add_objects(list(objects))
+            consolidated = self.add_objects(list(objects), progress=progress)
             if consolidated is not None:
             if consolidated is not None:
                 old_packs.pop(consolidated.name(), None)
                 old_packs.pop(consolidated.name(), None)
 
 
@@ -2507,10 +2513,13 @@ class BucketBasedObjectStore(PackBasedObjectStore):
         """
         """
         # Doesn't exist..
         # Doesn't exist..
 
 
-    def pack_loose_objects(self) -> int:
+    def pack_loose_objects(self, progress: Optional[Callable] = None) -> int:
         """Pack loose objects. Returns number of objects packed.
         """Pack loose objects. Returns number of objects packed.
 
 
         BucketBasedObjectStore doesn't support loose objects, so this is a no-op.
         BucketBasedObjectStore doesn't support loose objects, so this is a no-op.
+
+        Args:
+          progress: Optional progress reporting callback (ignored)
         """
         """
         return 0
         return 0
 
 

+ 16 - 4
tests/compat/test_dumb.py

@@ -21,6 +21,7 @@
 
 
 """Compatibility tests for dumb HTTP git repositories."""
 """Compatibility tests for dumb HTTP git repositories."""
 
 
+import io
 import os
 import os
 import sys
 import sys
 import tempfile
 import tempfile
@@ -38,6 +39,10 @@ from tests.compat.utils import (
 )
 )
 
 
 
 
+def no_op_progress(msg):
+    """Progress callback that does nothing."""
+
+
 class DumbHTTPRequestHandler(SimpleHTTPRequestHandler):
 class DumbHTTPRequestHandler(SimpleHTTPRequestHandler):
     """HTTP request handler for dumb git protocol."""
     """HTTP request handler for dumb git protocol."""
 
 
@@ -163,7 +168,8 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
     )
     )
     def test_clone_dumb(self):
     def test_clone_dumb(self):
         dest_path = os.path.join(self.temp_dir, "cloned")
         dest_path = os.path.join(self.temp_dir, "cloned")
-        repo = clone(self.server.url, dest_path)
+        # Use a dummy errstream to suppress progress output
+        repo = clone(self.server.url, dest_path, errstream=io.BytesIO())
         assert b"HEAD" in repo
         assert b"HEAD" in repo
 
 
     def test_clone_from_dumb_http(self):
     def test_clone_from_dumb_http(self):
@@ -183,7 +189,9 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
                     sha for ref, sha in refs.items() if ref.startswith(b"refs/heads/")
                     sha for ref, sha in refs.items() if ref.startswith(b"refs/heads/")
                 ]
                 ]
 
 
-            result = client.fetch("/", dest_repo, determine_wants=determine_wants)
+            result = client.fetch(
+                "/", dest_repo, determine_wants=determine_wants, progress=no_op_progress
+            )
 
 
             # Update refs
             # Update refs
             for ref, sha in result.refs.items():
             for ref, sha in result.refs.items():
@@ -237,7 +245,9 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
                         wants.append(sha)
                         wants.append(sha)
                 return wants
                 return wants
 
 
-            result = client.fetch("/", dest_repo, determine_wants=determine_wants)
+            result = client.fetch(
+                "/", dest_repo, determine_wants=determine_wants, progress=no_op_progress
+            )
 
 
             # Update refs
             # Update refs
             for ref, sha in result.refs.items():
             for ref, sha in result.refs.items():
@@ -282,7 +292,9 @@ class DumbHTTPClientNoPackTests(CompatTestCase):
                     if ref.startswith((b"refs/heads/", b"refs/tags/"))
                     if ref.startswith((b"refs/heads/", b"refs/tags/"))
                 ]
                 ]
 
 
-            result = client.fetch("/", dest_repo, determine_wants=determine_wants)
+            result = client.fetch(
+                "/", dest_repo, determine_wants=determine_wants, progress=no_op_progress
+            )
 
 
             # Update refs
             # Update refs
             for ref, sha in result.refs.items():
             for ref, sha in result.refs.items():

+ 14 - 3
tests/compat/test_server.py

@@ -58,9 +58,20 @@ class GitServerTestCase(ServerTests, CompatTestCase):
         backend = DictBackend({b"/": repo})
         backend = DictBackend({b"/": repo})
         dul_server = TCPGitServer(backend, b"localhost", 0, handlers=self._handlers())
         dul_server = TCPGitServer(backend, b"localhost", 0, handlers=self._handlers())
         self._check_server(dul_server)
         self._check_server(dul_server)
-        self.addCleanup(dul_server.shutdown)
-        self.addCleanup(dul_server.server_close)
-        threading.Thread(target=dul_server.serve).start()
+
+        # Start server in a thread
+        server_thread = threading.Thread(target=dul_server.serve)
+        server_thread.daemon = True  # Make thread daemon so it dies with main thread
+        server_thread.start()
+
+        # Add cleanup in the correct order
+        def cleanup_server():
+            dul_server.shutdown()
+            dul_server.server_close()
+            # Give thread a moment to exit cleanly
+            server_thread.join(timeout=1.0)
+
+        self.addCleanup(cleanup_server)
         self._server = dul_server
         self._server = dul_server
         _, port = self._server.socket.getsockname()
         _, port = self._server.socket.getsockname()
         return port
         return port

+ 153 - 147
tests/test_cli.py

@@ -81,6 +81,7 @@ class DulwichCliTestCase(TestCase):
         old_stdout = sys.stdout
         old_stdout = sys.stdout
         old_stderr = sys.stderr
         old_stderr = sys.stderr
         old_cwd = os.getcwd()
         old_cwd = os.getcwd()
+
         try:
         try:
             # Use custom stdout_stream if provided, otherwise use MockStream
             # Use custom stdout_stream if provided, otherwise use MockStream
             if stdout_stream:
             if stdout_stream:
@@ -691,13 +692,15 @@ class FilterBranchCommandTest(DulwichCliTestCase):
     def test_filter_branch_subdirectory_filter(self):
     def test_filter_branch_subdirectory_filter(self):
         """Test filter-branch with subdirectory filter."""
         """Test filter-branch with subdirectory filter."""
         # Run filter-branch to extract only the subdir
         # Run filter-branch to extract only the subdir
-        result, stdout, stderr = self._run_cli(
-            "filter-branch", "--subdirectory-filter", "subdir"
-        )
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "filter-branch", "--subdirectory-filter", "subdir"
+            )
 
 
-        # Check that the operation succeeded
-        self.assertEqual(result, 0)
-        self.assertIn("Rewrite HEAD", stdout)
+            # Check that the operation succeeded
+            self.assertEqual(result, 0)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Rewrite HEAD", log_output)
 
 
         # filter-branch rewrites history but doesn't update working tree
         # filter-branch rewrites history but doesn't update working tree
         # We need to check the commit contents, not the working tree
         # We need to check the commit contents, not the working tree
@@ -786,12 +789,14 @@ class FilterBranchCommandTest(DulwichCliTestCase):
         self.assertTrue(len(original_refs) > 0, "No original refs found")
         self.assertTrue(len(original_refs) > 0, "No original refs found")
 
 
         # Run again without force - should fail
         # Run again without force - should fail
-        result, stdout, stderr = self._run_cli(
-            "filter-branch", "--msg-filter", "sed 's/^/[TEST2] /'"
-        )
-        self.assertEqual(result, 1)
-        self.assertIn("Cannot create a new backup", stdout)
-        self.assertIn("refs/original", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli(
+                "filter-branch", "--msg-filter", "sed 's/^/[TEST2] /'"
+            )
+            self.assertEqual(result, 1)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Cannot create a new backup", log_output)
+            self.assertIn("refs/original", log_output)
 
 
         # Run with force - should succeed
         # Run with force - should succeed
         result, stdout, stderr = self._run_cli(
         result, stdout, stderr = self._run_cli(
@@ -810,12 +815,14 @@ class FilterBranchCommandTest(DulwichCliTestCase):
         self._run_cli("commit", "--message=Branch commit")
         self._run_cli("commit", "--message=Branch commit")
 
 
         # Run filter-branch on the test-branch
         # Run filter-branch on the test-branch
-        result, stdout, stderr = self._run_cli(
-            "filter-branch", "--msg-filter", "sed 's/^/[BRANCH] /'", "test-branch"
-        )
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "filter-branch", "--msg-filter", "sed 's/^/[BRANCH] /'", "test-branch"
+            )
 
 
-        self.assertEqual(result, 0)
-        self.assertIn("Ref 'refs/heads/test-branch' was rewritten", stdout)
+            self.assertEqual(result, 0)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Ref 'refs/heads/test-branch' was rewritten", log_output)
 
 
         # Check that only test-branch was modified
         # Check that only test-branch was modified
         result, stdout, stderr = self._run_cli("log")
         result, stdout, stderr = self._run_cli("log")
@@ -962,9 +969,11 @@ class FormatPatchCommandTest(DulwichCliTestCase):
         )
         )
 
 
         # Test format-patch for last commit
         # Test format-patch for last commit
-        result, stdout, stderr = self._run_cli("format-patch", "-n", "1")
-        self.assertEqual(result, None)
-        self.assertIn("0001-Add-hello.txt.patch", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("format-patch", "-n", "1")
+            self.assertEqual(result, None)
+            log_output = "\n".join(cm.output)
+            self.assertIn("0001-Add-hello.txt.patch", log_output)
 
 
         # Check patch contents
         # Check patch contents
         patch_file = os.path.join(self.repo_path, "0001-Add-hello.txt.patch")
         patch_file = os.path.join(self.repo_path, "0001-Add-hello.txt.patch")
@@ -1019,10 +1028,12 @@ class FormatPatchCommandTest(DulwichCliTestCase):
         )
         )
 
 
         # Test format-patch for last 2 commits
         # Test format-patch for last 2 commits
-        result, stdout, stderr = self._run_cli("format-patch", "-n", "2")
-        self.assertEqual(result, None)
-        self.assertIn("0001-Add-file1.txt.patch", stdout)
-        self.assertIn("0002-Add-file2.txt.patch", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("format-patch", "-n", "2")
+            self.assertEqual(result, None)
+            log_output = "\n".join(cm.output)
+            self.assertIn("0001-Add-file1.txt.patch", log_output)
+            self.assertIn("0002-Add-file2.txt.patch", log_output)
 
 
         # Check first patch
         # Check first patch
         with open(os.path.join(self.repo_path, "0001-Add-file1.txt.patch"), "rb") as f:
         with open(os.path.join(self.repo_path, "0001-Add-file1.txt.patch"), "rb") as f:
@@ -1110,14 +1121,16 @@ class FormatPatchCommandTest(DulwichCliTestCase):
             commits.append(c)
             commits.append(c)
 
 
         # Test format-patch with commit range (should get commits 2 and 3)
         # Test format-patch with commit range (should get commits 2 and 3)
-        result, stdout, stderr = self._run_cli(
-            "format-patch", f"{commits[1].decode()}..{commits[3].decode()}"
-        )
-        self.assertEqual(result, None)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "format-patch", f"{commits[1].decode()}..{commits[3].decode()}"
+            )
+            self.assertEqual(result, None)
 
 
-        # Should create patches for commits 2 and 3
-        self.assertIn("0001-Add-file2.txt.patch", stdout)
-        self.assertIn("0002-Add-file3.txt.patch", stdout)
+            # Should create patches for commits 2 and 3
+            log_output = "\n".join(cm.output)
+            self.assertIn("0001-Add-file2.txt.patch", log_output)
+            self.assertIn("0002-Add-file3.txt.patch", log_output)
 
 
         # Verify patch contents
         # Verify patch contents
         with open(os.path.join(self.repo_path, "0001-Add-file2.txt.patch"), "rb") as f:
         with open(os.path.join(self.repo_path, "0001-Add-file2.txt.patch"), "rb") as f:
@@ -1307,8 +1320,12 @@ class ForEachRefCommandTest(DulwichCliTestCase):
         self._run_cli("add", "test.txt")
         self._run_cli("add", "test.txt")
         self._run_cli("commit", "--message=Initial")
         self._run_cli("commit", "--message=Initial")
 
 
-        result, stdout, stderr = self._run_cli("for-each-ref")
-        self.assertIn("refs/heads/master", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("for-each-ref")
+            log_output = "\n".join(cm.output)
+            # Just check that we have some refs output and it contains refs/heads
+            self.assertTrue(len(cm.output) > 0, "Expected some ref output")
+            self.assertIn("refs/heads/", log_output)
 
 
 
 
 class PackRefsCommandTest(DulwichCliTestCase):
 class PackRefsCommandTest(DulwichCliTestCase):
@@ -1374,8 +1391,9 @@ class StashCommandTest(DulwichCliTestCase):
             f.write("modified")
             f.write("modified")
 
 
         # Stash changes
         # Stash changes
-        result, stdout, stderr = self._run_cli("stash", "push")
-        self.assertIn("Saved working directory", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("stash", "push")
+            self.assertIn("Saved working directory", cm.output[0])
 
 
         # Note: Dulwich stash doesn't currently update the working tree
         # Note: Dulwich stash doesn't currently update the working tree
         # so the file remains modified after stash push
         # so the file remains modified after stash push
@@ -1416,14 +1434,18 @@ class HelpCommandTest(DulwichCliTestCase):
     """Tests for help command."""
     """Tests for help command."""
 
 
     def test_help_basic(self):
     def test_help_basic(self):
-        result, stdout, stderr = self._run_cli("help")
-        self.assertIn("dulwich command line tool", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("help")
+            log_output = "\n".join(cm.output)
+            self.assertIn("dulwich command line tool", log_output)
 
 
     def test_help_all(self):
     def test_help_all(self):
-        result, stdout, stderr = self._run_cli("help", "-a")
-        self.assertIn("Available commands:", stdout)
-        self.assertIn("add", stdout)
-        self.assertIn("commit", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("help", "-a")
+            log_output = "\n".join(cm.output)
+            self.assertIn("Available commands:", log_output)
+            self.assertIn("add", log_output)
+            self.assertIn("commit", log_output)
 
 
 
 
 class RemoteCommandTest(DulwichCliTestCase):
 class RemoteCommandTest(DulwichCliTestCase):
@@ -1450,9 +1472,13 @@ class CheckIgnoreCommandTest(DulwichCliTestCase):
         with open(gitignore, "w") as f:
         with open(gitignore, "w") as f:
             f.write("*.log\n")
             f.write("*.log\n")
 
 
-        result, stdout, stderr = self._run_cli("check-ignore", "test.log", "test.txt")
-        self.assertIn("test.log", stdout)
-        self.assertNotIn("test.txt", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "check-ignore", "test.log", "test.txt"
+            )
+            log_output = "\n".join(cm.output)
+            self.assertIn("test.log", log_output)
+            self.assertNotIn("test.txt", log_output)
 
 
 
 
 class LsFilesCommandTest(DulwichCliTestCase):
 class LsFilesCommandTest(DulwichCliTestCase):
@@ -1466,10 +1492,12 @@ class LsFilesCommandTest(DulwichCliTestCase):
                 f.write(f"content of {name}")
                 f.write(f"content of {name}")
         self._run_cli("add", "a.txt", "b.txt", "c.txt")
         self._run_cli("add", "a.txt", "b.txt", "c.txt")
 
 
-        result, stdout, stderr = self._run_cli("ls-files")
-        self.assertIn("a.txt", stdout)
-        self.assertIn("b.txt", stdout)
-        self.assertIn("c.txt", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("ls-files")
+            log_output = "\n".join(cm.output)
+            self.assertIn("a.txt", log_output)
+            self.assertIn("b.txt", log_output)
+            self.assertIn("c.txt", log_output)
 
 
 
 
 class LsTreeCommandTest(DulwichCliTestCase):
 class LsTreeCommandTest(DulwichCliTestCase):
@@ -1515,8 +1543,9 @@ class DescribeCommandTest(DulwichCliTestCase):
         self._run_cli("commit", "--message=Initial")
         self._run_cli("commit", "--message=Initial")
         self._run_cli("tag", "v1.0")
         self._run_cli("tag", "v1.0")
 
 
-        result, stdout, stderr = self._run_cli("describe")
-        self.assertIn("v1.0", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("describe")
+            self.assertIn("v1.0", cm.output[0])
 
 
 
 
 class FsckCommandTest(DulwichCliTestCase):
 class FsckCommandTest(DulwichCliTestCase):
@@ -1706,9 +1735,10 @@ class BundleCommandTest(DulwichCliTestCase):
         """Test bundle creation with no refs specified."""
         """Test bundle creation with no refs specified."""
         bundle_file = os.path.join(self.test_dir, "noref.bundle")
         bundle_file = os.path.join(self.test_dir, "noref.bundle")
 
 
-        result, stdout, stderr = self._run_cli("bundle", "create", bundle_file)
-        self.assertEqual(result, 1)
-        self.assertIn("No refs specified", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "create", bundle_file)
+            self.assertEqual(result, 1)
+            self.assertIn("No refs specified", cm.output[0])
 
 
     def test_bundle_create_empty_bundle_refused(self):
     def test_bundle_create_empty_bundle_refused(self):
         """Test that empty bundles are refused."""
         """Test that empty bundles are refused."""
@@ -1729,9 +1759,10 @@ class BundleCommandTest(DulwichCliTestCase):
         self.assertEqual(result, 0)
         self.assertEqual(result, 0)
 
 
         # Now verify it
         # Now verify it
-        result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
-        self.assertEqual(result, 0)
-        self.assertIn("valid and can be applied", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
+            self.assertEqual(result, 0)
+            self.assertIn("valid and can be applied", cm.output[0])
 
 
     def test_bundle_verify_quiet(self):
     def test_bundle_verify_quiet(self):
         """Test bundle verification with quiet flag."""
         """Test bundle verification with quiet flag."""
@@ -1776,10 +1807,11 @@ class BundleCommandTest(DulwichCliTestCase):
         self._run_cli("bundle", "create", bundle_file, "HEAD")
         self._run_cli("bundle", "create", bundle_file, "HEAD")
 
 
         # List heads
         # List heads
-        result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
-        self.assertEqual(result, 0)
-        # Should contain at least the HEAD reference
-        self.assertTrue(len(stdout.strip()) > 0)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
+            self.assertEqual(result, 0)
+            # Should contain at least the HEAD reference
+            self.assertTrue(len(cm.output) > 0)
 
 
     def test_bundle_list_heads_specific_refs(self):
     def test_bundle_list_heads_specific_refs(self):
         """Test listing specific bundle heads."""
         """Test listing specific bundle heads."""
@@ -1789,10 +1821,11 @@ class BundleCommandTest(DulwichCliTestCase):
         self._run_cli("bundle", "create", bundle_file, "HEAD")
         self._run_cli("bundle", "create", bundle_file, "HEAD")
 
 
         # List heads without filtering
         # List heads without filtering
-        result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
-        self.assertEqual(result, 0)
-        # Should contain some reference
-        self.assertTrue(len(stdout.strip()) > 0)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "list-heads", bundle_file)
+            self.assertEqual(result, 0)
+            # Should contain some reference
+            self.assertTrue(len(cm.output) > 0)
 
 
     def test_bundle_list_heads_from_stdin(self):
     def test_bundle_list_heads_from_stdin(self):
         """Test listing bundle heads from stdin."""
         """Test listing bundle heads from stdin."""
@@ -1918,15 +1951,17 @@ class BundleCommandTest(DulwichCliTestCase):
 
 
     def test_bundle_invalid_subcommand(self):
     def test_bundle_invalid_subcommand(self):
         """Test invalid bundle subcommand."""
         """Test invalid bundle subcommand."""
-        result, stdout, stderr = self._run_cli("bundle", "invalid-command")
-        self.assertEqual(result, 1)
-        self.assertIn("Unknown bundle subcommand", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "invalid-command")
+            self.assertEqual(result, 1)
+            self.assertIn("Unknown bundle subcommand", cm.output[0])
 
 
     def test_bundle_no_subcommand(self):
     def test_bundle_no_subcommand(self):
         """Test bundle command with no subcommand."""
         """Test bundle command with no subcommand."""
-        result, stdout, stderr = self._run_cli("bundle")
-        self.assertEqual(result, 1)
-        self.assertIn("Usage: bundle", stdout)
+        with self.assertLogs("dulwich.cli", level="ERROR") as cm:
+            result, stdout, stderr = self._run_cli("bundle")
+            self.assertEqual(result, 1)
+            self.assertIn("Usage: bundle", cm.output[0])
 
 
     def test_bundle_create_with_stdin_refs(self):
     def test_bundle_create_with_stdin_refs(self):
         """Test bundle creation reading refs from stdin."""
         """Test bundle creation reading refs from stdin."""
@@ -2003,9 +2038,10 @@ class BundleCommandTest(DulwichCliTestCase):
         self.assertTrue(os.path.exists(bundle_file))
         self.assertTrue(os.path.exists(bundle_file))
 
 
         # Verify the bundle was created
         # Verify the bundle was created
-        result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
-        self.assertEqual(result, 0)
-        self.assertIn("valid and can be applied", stdout)
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("bundle", "verify", bundle_file)
+            self.assertEqual(result, 0)
+            self.assertIn("valid and can be applied", cm.output[0])
 
 
 
 
 class FormatBytesTestCase(TestCase):
 class FormatBytesTestCase(TestCase):
@@ -2355,19 +2391,14 @@ class WorktreeCliTests(DulwichCliTestCase):
         """Test worktree add command."""
         """Test worktree add command."""
         wt_path = os.path.join(self.test_dir, "worktree1")
         wt_path = os.path.join(self.test_dir, "worktree1")
 
 
-        # Change to repo directory like real usage
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["add", wt_path, "feature"])
-
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "worktree", "add", wt_path, "feature"
+            )
             self.assertEqual(result, 0)
             self.assertEqual(result, 0)
             self.assertTrue(os.path.exists(wt_path))
             self.assertTrue(os.path.exists(wt_path))
-            self.assertIn("Worktree added:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree added:", log_output)
 
 
     def test_worktree_add_detached(self):
     def test_worktree_add_detached(self):
         """Test worktree add with detached HEAD."""
         """Test worktree add with detached HEAD."""
@@ -2390,98 +2421,73 @@ class WorktreeCliTests(DulwichCliTestCase):
         """Test worktree remove command."""
         """Test worktree remove command."""
         # First add a worktree
         # First add a worktree
         wt_path = os.path.join(self.test_dir, "to-remove")
         wt_path = os.path.join(self.test_dir, "to-remove")
+        result, stdout, stderr = self._run_cli("worktree", "add", wt_path)
+        self.assertEqual(result, 0)
 
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", wt_path])
-
-            # Then remove it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["remove", wt_path])
-
+        # Then remove it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("worktree", "remove", wt_path)
             self.assertEqual(result, 0)
             self.assertEqual(result, 0)
             self.assertFalse(os.path.exists(wt_path))
             self.assertFalse(os.path.exists(wt_path))
-            self.assertIn("Worktree removed:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree removed:", log_output)
 
 
     def test_worktree_prune(self):
     def test_worktree_prune(self):
         """Test worktree prune command."""
         """Test worktree prune command."""
         # Add a worktree and manually remove it
         # Add a worktree and manually remove it
         wt_path = os.path.join(self.test_dir, "to-prune")
         wt_path = os.path.join(self.test_dir, "to-prune")
+        result, stdout, stderr = self._run_cli("worktree", "add", wt_path)
+        self.assertEqual(result, 0)
+        shutil.rmtree(wt_path)
 
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", wt_path])
-            shutil.rmtree(wt_path)
-
-            # Prune
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["prune", "-v"])
-
+        # Prune
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("worktree", "prune", "-v")
             self.assertEqual(result, 0)
             self.assertEqual(result, 0)
-            output = mock_stdout.getvalue()
-            self.assertIn("to-prune", output)
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("to-prune", log_output)
 
 
     def test_worktree_lock_unlock(self):
     def test_worktree_lock_unlock(self):
         """Test worktree lock and unlock commands."""
         """Test worktree lock and unlock commands."""
         # Add a worktree
         # Add a worktree
         wt_path = os.path.join(self.test_dir, "lockable")
         wt_path = os.path.join(self.test_dir, "lockable")
+        result, stdout, stderr = self._run_cli("worktree", "add", wt_path)
+        self.assertEqual(result, 0)
 
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", wt_path])
-
-            # Lock it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["lock", wt_path, "--reason", "Testing"])
-
+        # Lock it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "worktree", "lock", wt_path, "--reason", "Testing"
+            )
             self.assertEqual(result, 0)
             self.assertEqual(result, 0)
-            self.assertIn("Worktree locked:", mock_stdout.getvalue())
-
-            # Unlock it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["unlock", wt_path])
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree locked:", log_output)
 
 
+        # Unlock it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli("worktree", "unlock", wt_path)
             self.assertEqual(result, 0)
             self.assertEqual(result, 0)
-            self.assertIn("Worktree unlocked:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree unlocked:", log_output)
 
 
     def test_worktree_move(self):
     def test_worktree_move(self):
         """Test worktree move command."""
         """Test worktree move command."""
         # Add a worktree
         # Add a worktree
         old_path = os.path.join(self.test_dir, "old-location")
         old_path = os.path.join(self.test_dir, "old-location")
         new_path = os.path.join(self.test_dir, "new-location")
         new_path = os.path.join(self.test_dir, "new-location")
+        result, stdout, stderr = self._run_cli("worktree", "add", old_path)
+        self.assertEqual(result, 0)
 
 
-        # Change to repo directory
-        old_cwd = os.getcwd()
-        os.chdir(self.repo_path)
-        try:
-            cmd = cli.cmd_worktree()
-            cmd.run(["add", old_path])
-
-            # Move it
-            with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
-                result = cmd.run(["move", old_path, new_path])
-
+        # Move it
+        with self.assertLogs("dulwich.cli", level="INFO") as cm:
+            result, stdout, stderr = self._run_cli(
+                "worktree", "move", old_path, new_path
+            )
             self.assertEqual(result, 0)
             self.assertEqual(result, 0)
             self.assertFalse(os.path.exists(old_path))
             self.assertFalse(os.path.exists(old_path))
             self.assertTrue(os.path.exists(new_path))
             self.assertTrue(os.path.exists(new_path))
-            self.assertIn("Worktree moved:", mock_stdout.getvalue())
-        finally:
-            os.chdir(old_cwd)
+            log_output = "\n".join(cm.output)
+            self.assertIn("Worktree moved:", log_output)
 
 
     def test_worktree_invalid_command(self):
     def test_worktree_invalid_command(self):
         """Test invalid worktree subcommand."""
         """Test invalid worktree subcommand."""

+ 20 - 22
tests/test_cli_merge.py

@@ -21,11 +21,9 @@
 
 
 """Tests for dulwich merge CLI command."""
 """Tests for dulwich merge CLI command."""
 
 
-import io
 import os
 import os
 import tempfile
 import tempfile
 import unittest
 import unittest
-from unittest.mock import patch
 
 
 from dulwich import porcelain
 from dulwich import porcelain
 from dulwich.cli import main
 from dulwich.cli import main
@@ -65,12 +63,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             old_cwd = os.getcwd()
             try:
             try:
                 os.chdir(tmpdir)
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "feature"])
                     ret = main(["merge", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
 
                 self.assertEqual(ret, 0)  # Success
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Merge successful", output)
+                self.assertIn("Merge successful", log_output)
 
 
                 # Check that file2.txt exists
                 # Check that file2.txt exists
                 self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
                 self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
@@ -108,13 +106,13 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             old_cwd = os.getcwd()
             try:
             try:
                 os.chdir(tmpdir)
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="WARNING") as cm:
                     retcode = main(["merge", "feature"])
                     retcode = main(["merge", "feature"])
                     self.assertEqual(retcode, 1)
                     self.assertEqual(retcode, 1)
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
 
-                self.assertIn("Merge conflicts", output)
-                self.assertIn("file1.txt", output)
+                self.assertIn("Merge conflicts", log_output)
+                self.assertIn("file1.txt", log_output)
             finally:
             finally:
                 os.chdir(old_cwd)
                 os.chdir(old_cwd)
 
 
@@ -134,12 +132,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             old_cwd = os.getcwd()
             try:
             try:
                 os.chdir(tmpdir)
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "HEAD"])
                     ret = main(["merge", "HEAD"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
 
                 self.assertEqual(ret, 0)  # Success
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Already up to date", output)
+                self.assertIn("Already up to date", log_output)
             finally:
             finally:
                 os.chdir(old_cwd)
                 os.chdir(old_cwd)
 
 
@@ -176,12 +174,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             old_cwd = os.getcwd()
             try:
             try:
                 os.chdir(tmpdir)
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "--no-commit", "feature"])
                     ret = main(["merge", "--no-commit", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
 
                 self.assertEqual(ret, 0)  # Success
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("not committing", output)
+                self.assertIn("not committing", log_output)
 
 
                 # Check that files are merged
                 # Check that files are merged
                 self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
                 self.assertTrue(os.path.exists(os.path.join(tmpdir, "file2.txt")))
@@ -218,13 +216,13 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             old_cwd = os.getcwd()
             try:
             try:
                 os.chdir(tmpdir)
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "--no-ff", "feature"])
                     ret = main(["merge", "--no-ff", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
 
                 self.assertEqual(ret, 0)  # Success
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Merge successful", output)
-                self.assertIn("Created merge commit", output)
+                self.assertIn("Merge successful", log_output)
+                self.assertIn("Created merge commit", log_output)
             finally:
             finally:
                 os.chdir(old_cwd)
                 os.chdir(old_cwd)
 
 
@@ -261,12 +259,12 @@ class CLIMergeTests(TestCase):
             old_cwd = os.getcwd()
             old_cwd = os.getcwd()
             try:
             try:
                 os.chdir(tmpdir)
                 os.chdir(tmpdir)
-                with patch("sys.stdout", new_callable=io.StringIO) as mock_stdout:
+                with self.assertLogs("dulwich.cli", level="INFO") as cm:
                     ret = main(["merge", "-m", "Custom merge message", "feature"])
                     ret = main(["merge", "-m", "Custom merge message", "feature"])
-                    output = mock_stdout.getvalue()
+                    log_output = "\n".join(cm.output)
 
 
                 self.assertEqual(ret, 0)  # Success
                 self.assertEqual(ret, 0)  # Success
-                self.assertIn("Merge successful", output)
+                self.assertIn("Merge successful", log_output)
             finally:
             finally:
                 os.chdir(old_cwd)
                 os.chdir(old_cwd)
 
 

+ 18 - 12
tests/test_commit_graph.py

@@ -119,18 +119,22 @@ class CommitGraphTests(unittest.TestCase):
     def test_from_invalid_signature(self) -> None:
     def test_from_invalid_signature(self) -> None:
         data = b"XXXX" + b"\\x00" * 100
         data = b"XXXX" + b"\\x00" * 100
         f = io.BytesIO(data)
         f = io.BytesIO(data)
-
-        with self.assertRaises(ValueError) as cm:
-            CommitGraph.from_file(f)
-        self.assertIn("Invalid commit graph signature", str(cm.exception))
+        try:
+            with self.assertRaises(ValueError) as cm:
+                CommitGraph.from_file(f)
+            self.assertIn("Invalid commit graph signature", str(cm.exception))
+        finally:
+            f.close()
 
 
     def test_from_invalid_version(self) -> None:
     def test_from_invalid_version(self) -> None:
         data = COMMIT_GRAPH_SIGNATURE + struct.pack(">B", 99) + b"\\x00" * 100
         data = COMMIT_GRAPH_SIGNATURE + struct.pack(">B", 99) + b"\\x00" * 100
         f = io.BytesIO(data)
         f = io.BytesIO(data)
-
-        with self.assertRaises(ValueError) as cm:
-            CommitGraph.from_file(f)
-        self.assertIn("Unsupported commit graph version", str(cm.exception))
+        try:
+            with self.assertRaises(ValueError) as cm:
+                CommitGraph.from_file(f)
+            self.assertIn("Unsupported commit graph version", str(cm.exception))
+        finally:
+            f.close()
 
 
     def test_from_invalid_hash_version(self) -> None:
     def test_from_invalid_hash_version(self) -> None:
         data = (
         data = (
@@ -140,10 +144,12 @@ class CommitGraphTests(unittest.TestCase):
             + b"\\x00" * 100
             + b"\\x00" * 100
         )
         )
         f = io.BytesIO(data)
         f = io.BytesIO(data)
-
-        with self.assertRaises(ValueError) as cm:
-            CommitGraph.from_file(f)
-        self.assertIn("Unsupported hash version", str(cm.exception))
+        try:
+            with self.assertRaises(ValueError) as cm:
+                CommitGraph.from_file(f)
+            self.assertIn("Unsupported hash version", str(cm.exception))
+        finally:
+            f.close()
 
 
     def create_minimal_commit_graph_data(self) -> bytes:
     def create_minimal_commit_graph_data(self) -> bytes:
         """Create minimal valid commit graph data for testing."""
         """Create minimal valid commit graph data for testing."""

+ 59 - 28
tests/test_gc.py

@@ -21,6 +21,10 @@ from dulwich.objects import Blob, Commit, Tag, Tree
 from dulwich.repo import MemoryRepo, Repo
 from dulwich.repo import MemoryRepo, Repo
 
 
 
 
+def no_op_progress(msg):
+    """Progress callback that does nothing."""
+
+
 class GCTestCase(TestCase):
 class GCTestCase(TestCase):
     """Tests for garbage collection functionality."""
     """Tests for garbage collection functionality."""
 
 
@@ -159,7 +163,9 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
         self.repo.object_store.add_object(unreachable_blob)
 
 
         # Run garbage collection (grace_period=None means no grace period check)
         # Run garbage collection (grace_period=None means no grace period check)
-        stats = garbage_collect(self.repo, prune=True, grace_period=None)
+        stats = garbage_collect(
+            self.repo, prune=True, grace_period=None, progress=no_op_progress
+        )
 
 
         # Check results
         # Check results
         self.assertIsInstance(stats, GCStats)
         self.assertIsInstance(stats, GCStats)
@@ -180,7 +186,7 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
         self.repo.object_store.add_object(unreachable_blob)
 
 
         # Run garbage collection without pruning
         # Run garbage collection without pruning
-        stats = garbage_collect(self.repo, prune=False)
+        stats = garbage_collect(self.repo, prune=False, progress=no_op_progress)
 
 
         # Check that nothing was pruned
         # Check that nothing was pruned
         self.assertEqual(set(), stats.pruned_objects)
         self.assertEqual(set(), stats.pruned_objects)
@@ -194,7 +200,13 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
         self.repo.object_store.add_object(unreachable_blob)
 
 
         # Run garbage collection with dry run (grace_period=None means no grace period check)
         # Run garbage collection with dry run (grace_period=None means no grace period check)
-        stats = garbage_collect(self.repo, prune=True, grace_period=None, dry_run=True)
+        stats = garbage_collect(
+            self.repo,
+            prune=True,
+            grace_period=None,
+            dry_run=True,
+            progress=no_op_progress,
+        )
 
 
         # Check that object would be pruned but still exists
         # Check that object would be pruned but still exists
         # On Windows, the repository initialization might create additional unreachable objects
         # On Windows, the repository initialization might create additional unreachable objects
@@ -214,7 +226,13 @@ class GCTestCase(TestCase):
 
 
         # Run garbage collection with a 1 hour grace period, but dry run to avoid packing
         # Run garbage collection with a 1 hour grace period, but dry run to avoid packing
         # The object was just created, so it should not be pruned
         # The object was just created, so it should not be pruned
-        stats = garbage_collect(self.repo, prune=True, grace_period=3600, dry_run=True)
+        stats = garbage_collect(
+            self.repo,
+            prune=True,
+            grace_period=3600,
+            dry_run=True,
+            progress=no_op_progress,
+        )
 
 
         # Check that the object was NOT pruned
         # Check that the object was NOT pruned
         self.assertEqual(set(), stats.pruned_objects)
         self.assertEqual(set(), stats.pruned_objects)
@@ -244,7 +262,9 @@ class GCTestCase(TestCase):
 
 
         # Run garbage collection with a 1 hour grace period
         # Run garbage collection with a 1 hour grace period
         # The object is 2 hours old, so it should be pruned
         # The object is 2 hours old, so it should be pruned
-        stats = garbage_collect(self.repo, prune=True, grace_period=3600)
+        stats = garbage_collect(
+            self.repo, prune=True, grace_period=3600, progress=no_op_progress
+        )
 
 
         # Check that the object was pruned
         # Check that the object was pruned
         self.assertEqual({old_blob.id}, stats.pruned_objects)
         self.assertEqual({old_blob.id}, stats.pruned_objects)
@@ -257,14 +277,16 @@ class GCTestCase(TestCase):
         self.repo.object_store.add_object(unreachable_blob)
         self.repo.object_store.add_object(unreachable_blob)
 
 
         # Pack the objects to ensure the blob is in a pack
         # Pack the objects to ensure the blob is in a pack
-        self.repo.object_store.pack_loose_objects()
+        self.repo.object_store.pack_loose_objects(progress=no_op_progress)
 
 
         # Ensure the object is NOT loose anymore
         # Ensure the object is NOT loose anymore
         self.assertFalse(self.repo.object_store.contains_loose(unreachable_blob.id))
         self.assertFalse(self.repo.object_store.contains_loose(unreachable_blob.id))
         self.assertIn(unreachable_blob.id, self.repo.object_store)
         self.assertIn(unreachable_blob.id, self.repo.object_store)
 
 
         # Run garbage collection (grace_period=None means no grace period check)
         # Run garbage collection (grace_period=None means no grace period check)
-        stats = garbage_collect(self.repo, prune=True, grace_period=None)
+        stats = garbage_collect(
+            self.repo, prune=True, grace_period=None, progress=no_op_progress
+        )
 
 
         # Check that the packed object was pruned
         # Check that the packed object was pruned
         self.assertEqual({unreachable_blob.id}, stats.pruned_objects)
         self.assertEqual({unreachable_blob.id}, stats.pruned_objects)
@@ -410,7 +432,9 @@ class GCTestCase(TestCase):
             self.repo.object_store, "get_object_mtime", side_effect=KeyError
             self.repo.object_store, "get_object_mtime", side_effect=KeyError
         ):
         ):
             # Run garbage collection with grace period
             # Run garbage collection with grace period
-            stats = garbage_collect(self.repo, prune=True, grace_period=3600)
+            stats = garbage_collect(
+                self.repo, prune=True, grace_period=3600, progress=no_op_progress
+            )
 
 
         # Object should be kept because mtime couldn't be determined
         # Object should be kept because mtime couldn't be determined
         self.assertEqual(set(), stats.pruned_objects)
         self.assertEqual(set(), stats.pruned_objects)
@@ -487,7 +511,7 @@ class AutoGCTestCase(TestCase):
                 blob = Blob()
                 blob = Blob()
                 blob.data = f"test blob {i}".encode()
                 blob.data = f"test blob {i}".encode()
                 r.object_store.add_object(blob)
                 r.object_store.add_object(blob)
-                r.object_store.pack_loose_objects()
+                r.object_store.pack_loose_objects(progress=no_op_progress)
 
 
             # Force re-enumeration of packs
             # Force re-enumeration of packs
             r.object_store._update_pack_cache()
             r.object_store._update_pack_cache()
@@ -525,7 +549,7 @@ class AutoGCTestCase(TestCase):
             blob = Blob()
             blob = Blob()
             blob.data = b"test blob"
             blob.data = b"test blob"
             r.object_store.add_object(blob)
             r.object_store.add_object(blob)
-            r.object_store.pack_loose_objects()
+            r.object_store.pack_loose_objects(progress=no_op_progress)
 
 
             # Force re-enumeration of packs
             # Force re-enumeration of packs
             r.object_store._update_pack_cache()
             r.object_store._update_pack_cache()
@@ -547,10 +571,10 @@ class AutoGCTestCase(TestCase):
                 r.object_store.add_object(blob)
                 r.object_store.add_object(blob)
 
 
             with patch("dulwich.gc.garbage_collect") as mock_gc:
             with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
             self.assertTrue(result)
             self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
 
     def test_maybe_auto_gc_skips_when_not_needed(self):
     def test_maybe_auto_gc_skips_when_not_needed(self):
         """Test that auto GC doesn't run when thresholds are not exceeded."""
         """Test that auto GC doesn't run when thresholds are not exceeded."""
@@ -558,7 +582,7 @@ class AutoGCTestCase(TestCase):
         config = ConfigDict()
         config = ConfigDict()
 
 
         with patch("dulwich.gc.garbage_collect") as mock_gc:
         with patch("dulwich.gc.garbage_collect") as mock_gc:
-            result = maybe_auto_gc(r, config)
+            result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
         self.assertFalse(result)
         self.assertFalse(result)
         mock_gc.assert_not_called()
         mock_gc.assert_not_called()
@@ -580,12 +604,15 @@ class AutoGCTestCase(TestCase):
             blob.data = b"test"
             blob.data = b"test"
             r.object_store.add_object(blob)
             r.object_store.add_object(blob)
 
 
-            with patch("builtins.print") as mock_print:
-                result = maybe_auto_gc(r, config)
+            # Capture log messages
+            import logging
+
+            with self.assertLogs(level=logging.INFO) as cm:
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
             self.assertFalse(result)
             self.assertFalse(result)
-            # Verify gc.log contents were printed
-            mock_print.assert_called_once_with("Previous GC failed\n")
+            # Verify gc.log contents were logged
+            self.assertTrue(any("Previous GC failed" in msg for msg in cm.output))
 
 
     def test_maybe_auto_gc_with_expired_gc_log(self):
     def test_maybe_auto_gc_with_expired_gc_log(self):
         """Test that auto GC runs when gc.log exists but is expired."""
         """Test that auto GC runs when gc.log exists but is expired."""
@@ -610,10 +637,10 @@ class AutoGCTestCase(TestCase):
             r.object_store.add_object(blob)
             r.object_store.add_object(blob)
 
 
             with patch("dulwich.gc.garbage_collect") as mock_gc:
             with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
             self.assertTrue(result)
             self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
             # gc.log should be removed after successful GC
             # gc.log should be removed after successful GC
             self.assertFalse(os.path.exists(gc_log_path))
             self.assertFalse(os.path.exists(gc_log_path))
 
 
@@ -632,10 +659,10 @@ class AutoGCTestCase(TestCase):
             with patch(
             with patch(
                 "dulwich.gc.garbage_collect", side_effect=OSError("GC failed")
                 "dulwich.gc.garbage_collect", side_effect=OSError("GC failed")
             ) as mock_gc:
             ) as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
             self.assertFalse(result)
             self.assertFalse(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
 
             # Check that error was written to gc.log
             # Check that error was written to gc.log
             gc_log_path = os.path.join(r.controldir(), "gc.log")
             gc_log_path = os.path.join(r.controldir(), "gc.log")
@@ -667,10 +694,10 @@ class AutoGCTestCase(TestCase):
             r.object_store.add_object(blob)
             r.object_store.add_object(blob)
 
 
             with patch("dulwich.gc.garbage_collect") as mock_gc:
             with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config)
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
             self.assertTrue(result)
             self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True)
+            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
 
     def test_gc_log_expiry_invalid_format(self):
     def test_gc_log_expiry_invalid_format(self):
         """Test that invalid gc.logExpiry format defaults to 1 day."""
         """Test that invalid gc.logExpiry format defaults to 1 day."""
@@ -694,12 +721,16 @@ class AutoGCTestCase(TestCase):
             blob.data = b"test"
             blob.data = b"test"
             r.object_store.add_object(blob)
             r.object_store.add_object(blob)
 
 
-            with patch("builtins.print") as mock_print:
-                result = maybe_auto_gc(r, config)
+            # Capture log messages
+            import logging
+
+            with self.assertLogs(level=logging.INFO) as cm:
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
             # Should not run GC because gc.log is recent (within default 1 day)
             # Should not run GC because gc.log is recent (within default 1 day)
             self.assertFalse(result)
             self.assertFalse(result)
-            mock_print.assert_called_once()
+            # Check that gc.log content was logged
+            self.assertTrue(any("gc.log content:" in msg for msg in cm.output))
 
 
     def test_maybe_auto_gc_non_disk_repo(self):
     def test_maybe_auto_gc_non_disk_repo(self):
         """Test auto GC on non-disk repository (MemoryRepo)."""
         """Test auto GC on non-disk repository (MemoryRepo)."""
@@ -715,7 +746,7 @@ class AutoGCTestCase(TestCase):
 
 
         # For non-disk repos, should_run_gc returns False
         # For non-disk repos, should_run_gc returns False
         # because it can't count loose objects
         # because it can't count loose objects
-        result = maybe_auto_gc(r, config)
+        result = maybe_auto_gc(r, config, progress=no_op_progress)
         self.assertFalse(result)
         self.assertFalse(result)
 
 
     def test_gc_removes_existing_gc_log_on_success(self):
     def test_gc_removes_existing_gc_log_on_success(self):
@@ -740,7 +771,7 @@ class AutoGCTestCase(TestCase):
             r.object_store.add_object(blob)
             r.object_store.add_object(blob)
 
 
             # Run auto GC
             # Run auto GC
-            result = maybe_auto_gc(r, config)
+            result = maybe_auto_gc(r, config, progress=no_op_progress)
 
 
             self.assertTrue(result)
             self.assertTrue(result)
             # gc.log should be removed after successful GC
             # gc.log should be removed after successful GC

+ 39 - 4
tests/test_lfs.py

@@ -36,10 +36,22 @@ from . import TestCase
 class LFSTests(TestCase):
 class LFSTests(TestCase):
     def setUp(self) -> None:
     def setUp(self) -> None:
         super().setUp()
         super().setUp()
+        # Suppress LFS warnings during these tests
+        import logging
+
+        self._old_level = logging.getLogger("dulwich.lfs").level
+        logging.getLogger("dulwich.lfs").setLevel(logging.ERROR)
         self.test_dir = tempfile.mkdtemp()
         self.test_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.test_dir)
         self.addCleanup(shutil.rmtree, self.test_dir)
         self.lfs = LFSStore.create(self.test_dir)
         self.lfs = LFSStore.create(self.test_dir)
 
 
+    def tearDown(self) -> None:
+        # Restore original logging level
+        import logging
+
+        logging.getLogger("dulwich.lfs").setLevel(self._old_level)
+        super().tearDown()
+
     def test_create(self) -> None:
     def test_create(self) -> None:
         sha = self.lfs.write_object([b"a", b"b"])
         sha = self.lfs.write_object([b"a", b"b"])
         with self.lfs.open_object(sha) as f:
         with self.lfs.open_object(sha) as f:
@@ -209,19 +221,30 @@ class LFSIntegrationTests(TestCase):
 
 
     def setUp(self) -> None:
     def setUp(self) -> None:
         super().setUp()
         super().setUp()
-        import os
+        # Suppress LFS warnings during these integration tests
+        import logging
 
 
-        from dulwich.repo import Repo
+        self._old_level = logging.getLogger("dulwich.lfs").level
+        logging.getLogger("dulwich.lfs").setLevel(logging.ERROR)
 
 
         # Create temporary directory for test repo
         # Create temporary directory for test repo
         self.test_dir = tempfile.mkdtemp()
         self.test_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.test_dir)
         self.addCleanup(shutil.rmtree, self.test_dir)
 
 
         # Initialize repo
         # Initialize repo
+        from dulwich.repo import Repo
+
         self.repo = Repo.init(self.test_dir)
         self.repo = Repo.init(self.test_dir)
         self.lfs_dir = os.path.join(self.test_dir, ".git", "lfs")
         self.lfs_dir = os.path.join(self.test_dir, ".git", "lfs")
         self.lfs_store = LFSStore.create(self.lfs_dir)
         self.lfs_store = LFSStore.create(self.lfs_dir)
 
 
+    def tearDown(self) -> None:
+        # Restore original logging level
+        import logging
+
+        logging.getLogger("dulwich.lfs").setLevel(self._old_level)
+        super().tearDown()
+
     def test_lfs_with_gitattributes(self) -> None:
     def test_lfs_with_gitattributes(self) -> None:
         """Test LFS integration with .gitattributes."""
         """Test LFS integration with .gitattributes."""
         import os
         import os
@@ -701,7 +724,13 @@ class LFSServerTests(TestCase):
         self.server_thread = threading.Thread(target=self.server.serve_forever)
         self.server_thread = threading.Thread(target=self.server.serve_forever)
         self.server_thread.daemon = True
         self.server_thread.daemon = True
         self.server_thread.start()
         self.server_thread.start()
-        self.addCleanup(self.server.shutdown)
+
+        def cleanup_server():
+            self.server.shutdown()
+            self.server.server_close()
+            self.server_thread.join(timeout=1.0)
+
+        self.addCleanup(cleanup_server)
 
 
     def test_server_batch_endpoint(self) -> None:
     def test_server_batch_endpoint(self) -> None:
         """Test the batch endpoint directly."""
         """Test the batch endpoint directly."""
@@ -974,7 +1003,13 @@ class LFSClientTests(TestCase):
         self.server_thread = threading.Thread(target=self.server.serve_forever)
         self.server_thread = threading.Thread(target=self.server.serve_forever)
         self.server_thread.daemon = True
         self.server_thread.daemon = True
         self.server_thread.start()
         self.server_thread.start()
-        self.addCleanup(self.server.shutdown)
+
+        def cleanup_server():
+            self.server.shutdown()
+            self.server.server_close()
+            self.server_thread.join(timeout=1.0)
+
+        self.addCleanup(cleanup_server)
 
 
         # Create LFS client pointing to our test server
         # Create LFS client pointing to our test server
         self.client = LFSClient(self.server_url)
         self.client = LFSClient(self.server_url)

+ 12 - 0
tests/test_lfs_integration.py

@@ -35,6 +35,11 @@ from . import TestCase
 class LFSFilterIntegrationTests(TestCase):
 class LFSFilterIntegrationTests(TestCase):
     def setUp(self) -> None:
     def setUp(self) -> None:
         super().setUp()
         super().setUp()
+        # Suppress LFS warnings during these integration tests
+        import logging
+
+        self._old_level = logging.getLogger("dulwich.lfs").level
+        logging.getLogger("dulwich.lfs").setLevel(logging.ERROR)
         # Create temporary directory for LFS store
         # Create temporary directory for LFS store
         self.test_dir = tempfile.mkdtemp()
         self.test_dir = tempfile.mkdtemp()
         self.addCleanup(shutil.rmtree, self.test_dir)
         self.addCleanup(shutil.rmtree, self.test_dir)
@@ -60,6 +65,13 @@ class LFSFilterIntegrationTests(TestCase):
             self.config, self.gitattributes, self.registry
             self.config, self.gitattributes, self.registry
         )
         )
 
 
+    def tearDown(self) -> None:
+        # Restore original logging level
+        import logging
+
+        logging.getLogger("dulwich.lfs").setLevel(self._old_level)
+        super().tearDown()
+
     def test_lfs_round_trip(self) -> None:
     def test_lfs_round_trip(self) -> None:
         """Test complete LFS round trip through filter normalizer."""
         """Test complete LFS round trip through filter normalizer."""
         # Create a blob with binary content
         # Create a blob with binary content

+ 51 - 39
tests/test_pack.py

@@ -430,33 +430,39 @@ class TestPackData(PackTests):
 
 
     def test_compute_file_sha(self) -> None:
     def test_compute_file_sha(self) -> None:
         f = BytesIO(b"abcd1234wxyz")
         f = BytesIO(b"abcd1234wxyz")
-        self.assertEqual(
-            sha1(b"abcd1234wxyz").hexdigest(), compute_file_sha(f).hexdigest()
-        )
-        self.assertEqual(
-            sha1(b"abcd1234wxyz").hexdigest(),
-            compute_file_sha(f, buffer_size=5).hexdigest(),
-        )
-        self.assertEqual(
-            sha1(b"abcd1234").hexdigest(),
-            compute_file_sha(f, end_ofs=-4).hexdigest(),
-        )
-        self.assertEqual(
-            sha1(b"1234wxyz").hexdigest(),
-            compute_file_sha(f, start_ofs=4).hexdigest(),
-        )
-        self.assertEqual(
-            sha1(b"1234").hexdigest(),
-            compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest(),
-        )
+        try:
+            self.assertEqual(
+                sha1(b"abcd1234wxyz").hexdigest(), compute_file_sha(f).hexdigest()
+            )
+            self.assertEqual(
+                sha1(b"abcd1234wxyz").hexdigest(),
+                compute_file_sha(f, buffer_size=5).hexdigest(),
+            )
+            self.assertEqual(
+                sha1(b"abcd1234").hexdigest(),
+                compute_file_sha(f, end_ofs=-4).hexdigest(),
+            )
+            self.assertEqual(
+                sha1(b"1234wxyz").hexdigest(),
+                compute_file_sha(f, start_ofs=4).hexdigest(),
+            )
+            self.assertEqual(
+                sha1(b"1234").hexdigest(),
+                compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest(),
+            )
+        finally:
+            f.close()
 
 
     def test_compute_file_sha_short_file(self) -> None:
     def test_compute_file_sha_short_file(self) -> None:
         f = BytesIO(b"abcd1234wxyz")
         f = BytesIO(b"abcd1234wxyz")
-        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
-        self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
-        self.assertRaises(
-            AssertionError, compute_file_sha, f, start_ofs=10, end_ofs=-12
-        )
+        try:
+            self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
+            self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
+            self.assertRaises(
+                AssertionError, compute_file_sha, f, start_ofs=10, end_ofs=-12
+            )
+        finally:
+            f.close()
 
 
 
 
 class TestPack(PackTests):
 class TestPack(PackTests):
@@ -729,24 +735,30 @@ class TestThinPack(PackTests):
 class WritePackTests(TestCase):
 class WritePackTests(TestCase):
     def test_write_pack_header(self) -> None:
     def test_write_pack_header(self) -> None:
         f = BytesIO()
         f = BytesIO()
-        write_pack_header(f.write, 42)
-        self.assertEqual(b"PACK\x00\x00\x00\x02\x00\x00\x00*", f.getvalue())
+        try:
+            write_pack_header(f.write, 42)
+            self.assertEqual(b"PACK\x00\x00\x00\x02\x00\x00\x00*", f.getvalue())
+        finally:
+            f.close()
 
 
     def test_write_pack_object(self) -> None:
     def test_write_pack_object(self) -> None:
         f = BytesIO()
         f = BytesIO()
-        f.write(b"header")
-        offset = f.tell()
-        crc32 = write_pack_object(f.write, Blob.type_num, b"blob")
-        self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xFFFFFFFF)
-
-        f.write(b"x")  # unpack_object needs extra trailing data.
-        f.seek(offset)
-        unpacked, unused = unpack_object(f.read, compute_crc32=True)
-        self.assertEqual(Blob.type_num, unpacked.pack_type_num)
-        self.assertEqual(Blob.type_num, unpacked.obj_type_num)
-        self.assertEqual([b"blob"], unpacked.decomp_chunks)
-        self.assertEqual(crc32, unpacked.crc32)
-        self.assertEqual(b"x", unused)
+        try:
+            f.write(b"header")
+            offset = f.tell()
+            crc32 = write_pack_object(f.write, Blob.type_num, b"blob")
+            self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xFFFFFFFF)
+
+            f.write(b"x")  # unpack_object needs extra trailing data.
+            f.seek(offset)
+            unpacked, unused = unpack_object(f.read, compute_crc32=True)
+            self.assertEqual(Blob.type_num, unpacked.pack_type_num)
+            self.assertEqual(Blob.type_num, unpacked.obj_type_num)
+            self.assertEqual([b"blob"], unpacked.decomp_chunks)
+            self.assertEqual(crc32, unpacked.crc32)
+            self.assertEqual(b"x", unused)
+        finally:
+            f.close()
 
 
     def test_write_pack_object_sha(self) -> None:
     def test_write_pack_object_sha(self) -> None:
         f = BytesIO()
         f = BytesIO()

Некоторые файлы не были показаны из-за большого количества измененных файлов