Explorar el Código

Fix Repo resource leaks in test_gc.py

Jelmer Vernooij hace 2 semanas
padre
commit
844115660a
Se han modificado 1 ficheros con 176 adiciones y 178 borrados
  1. 176 178
      tests/test_gc.py

+ 176 - 178
tests/test_gc.py

@@ -491,94 +491,92 @@ class AutoGCTestCase(TestCase):
     def test_should_run_gc_with_loose_objects(self):
         """Test that auto GC triggers based on loose object count."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"10")  # Low threshold for testing
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"10")  # Low threshold for testing
 
-            # Add some loose objects
-            for i in range(15):
-                blob = Blob()
-                blob.data = f"test blob {i}".encode()
-                r.object_store.add_object(blob)
+                # Add some loose objects
+                for i in range(15):
+                    blob = Blob()
+                    blob.data = f"test blob {i}".encode()
+                    r.object_store.add_object(blob)
 
-            self.assertTrue(should_run_gc(r, config))
+                self.assertTrue(should_run_gc(r, config))
 
     def test_should_run_gc_with_pack_limit(self):
         """Test that auto GC triggers based on pack file count."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"autoPackLimit", b"2")  # Low threshold for testing
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"autoPackLimit", b"2")  # Low threshold for testing
 
-            # Create some pack files by repacking
-            for i in range(3):
-                blob = Blob()
-                blob.data = f"test blob {i}".encode()
-                r.object_store.add_object(blob)
-                r.object_store.pack_loose_objects(progress=no_op_progress)
+                # Create some pack files by repacking
+                for i in range(3):
+                    blob = Blob()
+                    blob.data = f"test blob {i}".encode()
+                    r.object_store.add_object(blob)
+                    r.object_store.pack_loose_objects(progress=no_op_progress)
 
-            # Force re-enumeration of packs
-            r.object_store._update_pack_cache()
+                # Force re-enumeration of packs
+                r.object_store._update_pack_cache()
 
-            self.assertTrue(should_run_gc(r, config))
+                self.assertTrue(should_run_gc(r, config))
 
     def test_count_loose_objects(self):
         """Test counting loose objects."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-
-            # Initially should have no loose objects
-            count = r.object_store.count_loose_objects()
-            self.assertEqual(0, count)
+            with Repo.init(tmpdir) as r:
+                # Initially should have no loose objects
+                count = r.object_store.count_loose_objects()
+                self.assertEqual(0, count)
 
-            # Add some loose objects
-            for i in range(5):
-                blob = Blob()
-                blob.data = f"test blob {i}".encode()
-                r.object_store.add_object(blob)
+                # Add some loose objects
+                for i in range(5):
+                    blob = Blob()
+                    blob.data = f"test blob {i}".encode()
+                    r.object_store.add_object(blob)
 
-            count = r.object_store.count_loose_objects()
-            self.assertEqual(5, count)
+                count = r.object_store.count_loose_objects()
+                self.assertEqual(5, count)
 
     def test_count_pack_files(self):
         """Test counting pack files."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-
-            # Initially should have no packs
-            count = r.object_store.count_pack_files()
-            self.assertEqual(0, count)
+            with Repo.init(tmpdir) as r:
+                # Initially should have no packs
+                count = r.object_store.count_pack_files()
+                self.assertEqual(0, count)
 
-            # Create a pack
-            blob = Blob()
-            blob.data = b"test blob"
-            r.object_store.add_object(blob)
-            r.object_store.pack_loose_objects(progress=no_op_progress)
+                # Create a pack
+                blob = Blob()
+                blob.data = b"test blob"
+                r.object_store.add_object(blob)
+                r.object_store.pack_loose_objects(progress=no_op_progress)
 
-            # Force re-enumeration of packs
-            r.object_store._update_pack_cache()
+                # Force re-enumeration of packs
+                r.object_store._update_pack_cache()
 
-            count = r.object_store.count_pack_files()
-            self.assertEqual(1, count)
+                count = r.object_store.count_pack_files()
+                self.assertEqual(1, count)
 
     def test_maybe_auto_gc_runs_when_needed(self):
         """Test that auto GC runs when thresholds are exceeded."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"5")  # Low threshold for testing
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"5")  # Low threshold for testing
 
-            # Add enough loose objects to trigger GC
-            for i in range(10):
-                blob = Blob()
-                blob.data = f"test blob {i}".encode()
-                r.object_store.add_object(blob)
+                # Add enough loose objects to trigger GC
+                for i in range(10):
+                    blob = Blob()
+                    blob.data = f"test blob {i}".encode()
+                    r.object_store.add_object(blob)
 
-            with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config, progress=no_op_progress)
+                with patch("dulwich.gc.garbage_collect") as mock_gc:
+                    result = maybe_auto_gc(r, config, progress=no_op_progress)
 
-            self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
+                self.assertTrue(result)
+                mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
     def test_maybe_auto_gc_skips_when_not_needed(self):
         """Test that auto GC doesn't run when thresholds are not exceeded."""
@@ -595,147 +593,147 @@ class AutoGCTestCase(TestCase):
     def test_maybe_auto_gc_with_gc_log(self):
         """Test that auto GC is skipped when gc.log exists and is recent."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"1")  # Low threshold
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"1")  # Low threshold
 
-            # Create gc.log file
-            gc_log_path = os.path.join(r.controldir(), "gc.log")
-            with open(gc_log_path, "wb") as f:
-                f.write(b"Previous GC failed\n")
+                # Create gc.log file
+                gc_log_path = os.path.join(r.controldir(), "gc.log")
+                with open(gc_log_path, "wb") as f:
+                    f.write(b"Previous GC failed\n")
 
-            # Add objects to trigger GC
-            blob = Blob()
-            blob.data = b"test"
-            r.object_store.add_object(blob)
+                # Add objects to trigger GC
+                blob = Blob()
+                blob.data = b"test"
+                r.object_store.add_object(blob)
 
-            # Capture log messages
-            import logging
+                # Capture log messages
+                import logging
 
-            with self.assertLogs(level=logging.INFO) as cm:
-                result = maybe_auto_gc(r, config, progress=no_op_progress)
+                with self.assertLogs(level=logging.INFO) as cm:
+                    result = maybe_auto_gc(r, config, progress=no_op_progress)
 
-            self.assertFalse(result)
-            # Verify gc.log contents were logged
-            self.assertTrue(any("Previous GC failed" in msg for msg in cm.output))
+                self.assertFalse(result)
+                # Verify gc.log contents were logged
+                self.assertTrue(any("Previous GC failed" in msg for msg in cm.output))
 
     def test_maybe_auto_gc_with_expired_gc_log(self):
         """Test that auto GC runs when gc.log exists but is expired."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"1")  # Low threshold
-            config.set(b"gc", b"logExpiry", b"0.days")  # Expire immediately
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"1")  # Low threshold
+                config.set(b"gc", b"logExpiry", b"0.days")  # Expire immediately
 
-            # Create gc.log file
-            gc_log_path = os.path.join(r.controldir(), "gc.log")
-            with open(gc_log_path, "wb") as f:
-                f.write(b"Previous GC failed\n")
+                # Create gc.log file
+                gc_log_path = os.path.join(r.controldir(), "gc.log")
+                with open(gc_log_path, "wb") as f:
+                    f.write(b"Previous GC failed\n")
 
-            # Make the file old
-            old_time = time.time() - 86400  # 1 day ago
-            os.utime(gc_log_path, (old_time, old_time))
+                # Make the file old
+                old_time = time.time() - 86400  # 1 day ago
+                os.utime(gc_log_path, (old_time, old_time))
 
-            # Add objects to trigger GC
-            blob = Blob()
-            blob.data = b"test"
-            r.object_store.add_object(blob)
+                # Add objects to trigger GC
+                blob = Blob()
+                blob.data = b"test"
+                r.object_store.add_object(blob)
 
-            with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config, progress=no_op_progress)
+                with patch("dulwich.gc.garbage_collect") as mock_gc:
+                    result = maybe_auto_gc(r, config, progress=no_op_progress)
 
-            self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
-            # gc.log should be removed after successful GC
-            self.assertFalse(os.path.exists(gc_log_path))
+                self.assertTrue(result)
+                mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
+                # gc.log should be removed after successful GC
+                self.assertFalse(os.path.exists(gc_log_path))
 
     def test_maybe_auto_gc_handles_gc_failure(self):
         """Test that auto GC handles failures gracefully."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"1")  # Low threshold
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"1")  # Low threshold
 
-            # Add objects to trigger GC
-            blob = Blob()
-            blob.data = b"test"
-            r.object_store.add_object(blob)
+                # Add objects to trigger GC
+                blob = Blob()
+                blob.data = b"test"
+                r.object_store.add_object(blob)
 
-            with patch(
-                "dulwich.gc.garbage_collect", side_effect=OSError("GC failed")
-            ) as mock_gc:
-                result = maybe_auto_gc(r, config, progress=no_op_progress)
+                with patch(
+                    "dulwich.gc.garbage_collect", side_effect=OSError("GC failed")
+                ) as mock_gc:
+                    result = maybe_auto_gc(r, config, progress=no_op_progress)
 
-            self.assertFalse(result)
-            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
+                self.assertFalse(result)
+                mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
-            # Check that error was written to gc.log
-            gc_log_path = os.path.join(r.controldir(), "gc.log")
-            self.assertTrue(os.path.exists(gc_log_path))
-            with open(gc_log_path, "rb") as f:
-                content = f.read()
-                self.assertIn(b"Auto GC failed: GC failed", content)
+                # Check that error was written to gc.log
+                gc_log_path = os.path.join(r.controldir(), "gc.log")
+                self.assertTrue(os.path.exists(gc_log_path))
+                with open(gc_log_path, "rb") as f:
+                    content = f.read()
+                    self.assertIn(b"Auto GC failed: GC failed", content)
 
     def test_gc_log_expiry_singular_day(self):
         """Test that gc.logExpiry supports singular '.day' format."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"1")  # Low threshold
-            config.set(b"gc", b"logExpiry", b"1.day")  # Singular form
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"1")  # Low threshold
+                config.set(b"gc", b"logExpiry", b"1.day")  # Singular form
 
-            # Create gc.log file
-            gc_log_path = os.path.join(r.controldir(), "gc.log")
-            with open(gc_log_path, "wb") as f:
-                f.write(b"Previous GC failed\n")
+                # Create gc.log file
+                gc_log_path = os.path.join(r.controldir(), "gc.log")
+                with open(gc_log_path, "wb") as f:
+                    f.write(b"Previous GC failed\n")
 
-            # Make the file 2 days old (older than 1 day expiry)
-            old_time = time.time() - (2 * 86400)
-            os.utime(gc_log_path, (old_time, old_time))
+                # Make the file 2 days old (older than 1 day expiry)
+                old_time = time.time() - (2 * 86400)
+                os.utime(gc_log_path, (old_time, old_time))
 
-            # Add objects to trigger GC
-            blob = Blob()
-            blob.data = b"test"
-            r.object_store.add_object(blob)
+                # Add objects to trigger GC
+                blob = Blob()
+                blob.data = b"test"
+                r.object_store.add_object(blob)
 
-            with patch("dulwich.gc.garbage_collect") as mock_gc:
-                result = maybe_auto_gc(r, config, progress=no_op_progress)
+                with patch("dulwich.gc.garbage_collect") as mock_gc:
+                    result = maybe_auto_gc(r, config, progress=no_op_progress)
 
-            self.assertTrue(result)
-            mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
+                self.assertTrue(result)
+                mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
 
     def test_gc_log_expiry_invalid_format(self):
         """Test that invalid gc.logExpiry format defaults to 1 day."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"1")  # Low threshold
-            config.set(b"gc", b"logExpiry", b"invalid")  # Invalid format
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"1")  # Low threshold
+                config.set(b"gc", b"logExpiry", b"invalid")  # Invalid format
 
-            # Create gc.log file
-            gc_log_path = os.path.join(r.controldir(), "gc.log")
-            with open(gc_log_path, "wb") as f:
-                f.write(b"Previous GC failed\n")
+                # Create gc.log file
+                gc_log_path = os.path.join(r.controldir(), "gc.log")
+                with open(gc_log_path, "wb") as f:
+                    f.write(b"Previous GC failed\n")
 
-            # Make the file recent (within default 1 day)
-            recent_time = time.time() - 3600  # 1 hour ago
-            os.utime(gc_log_path, (recent_time, recent_time))
+                # Make the file recent (within default 1 day)
+                recent_time = time.time() - 3600  # 1 hour ago
+                os.utime(gc_log_path, (recent_time, recent_time))
 
-            # Add objects to trigger GC
-            blob = Blob()
-            blob.data = b"test"
-            r.object_store.add_object(blob)
+                # Add objects to trigger GC
+                blob = Blob()
+                blob.data = b"test"
+                r.object_store.add_object(blob)
 
-            # Capture log messages
-            import logging
+                # Capture log messages
+                import logging
 
-            with self.assertLogs(level=logging.INFO) as cm:
-                result = maybe_auto_gc(r, config, progress=no_op_progress)
+                with self.assertLogs(level=logging.INFO) as cm:
+                    result = maybe_auto_gc(r, config, progress=no_op_progress)
 
-            # Should not run GC because gc.log is recent (within default 1 day)
-            self.assertFalse(result)
-            # Check that gc.log content was logged
-            self.assertTrue(any("gc.log content:" in msg for msg in cm.output))
+                # Should not run GC because gc.log is recent (within default 1 day)
+                self.assertFalse(result)
+                # Check that gc.log content was logged
+                self.assertTrue(any("gc.log content:" in msg for msg in cm.output))
 
     def test_maybe_auto_gc_non_disk_repo(self):
         """Test auto GC on non-disk repository (MemoryRepo)."""
@@ -758,27 +756,27 @@ class AutoGCTestCase(TestCase):
     def test_gc_removes_existing_gc_log_on_success(self):
         """Test that successful GC removes pre-existing gc.log file."""
         with tempfile.TemporaryDirectory() as tmpdir:
-            r = Repo.init(tmpdir)
-            config = ConfigDict()
-            config.set(b"gc", b"auto", b"1")  # Low threshold
+            with Repo.init(tmpdir) as r:
+                config = ConfigDict()
+                config.set(b"gc", b"auto", b"1")  # Low threshold
 
-            # Create gc.log file from previous failure
-            gc_log_path = os.path.join(r.controldir(), "gc.log")
-            with open(gc_log_path, "wb") as f:
-                f.write(b"Previous GC failed\n")
+                # Create gc.log file from previous failure
+                gc_log_path = os.path.join(r.controldir(), "gc.log")
+                with open(gc_log_path, "wb") as f:
+                    f.write(b"Previous GC failed\n")
 
-            # Make it old enough to be expired
-            old_time = time.time() - (2 * 86400)  # 2 days ago
-            os.utime(gc_log_path, (old_time, old_time))
+                # Make it old enough to be expired
+                old_time = time.time() - (2 * 86400)  # 2 days ago
+                os.utime(gc_log_path, (old_time, old_time))
 
-            # Add objects to trigger GC
-            blob = Blob()
-            blob.data = b"test"
-            r.object_store.add_object(blob)
+                # Add objects to trigger GC
+                blob = Blob()
+                blob.data = b"test"
+                r.object_store.add_object(blob)
 
-            # Run auto GC
-            result = maybe_auto_gc(r, config, progress=no_op_progress)
+                # Run auto GC
+                result = maybe_auto_gc(r, config, progress=no_op_progress)
 
-            self.assertTrue(result)
-            # gc.log should be removed after successful GC
-            self.assertFalse(os.path.exists(gc_log_path))
+                self.assertTrue(result)
+                # gc.log should be removed after successful GC
+                self.assertFalse(os.path.exists(gc_log_path))