|
|
@@ -447,6 +447,7 @@ class AutoGCTestCase(TestCase):
|
|
|
def test_should_run_gc_disabled(self):
|
|
|
"""Test that auto GC doesn't run when gc.auto is 0."""
|
|
|
r = MemoryRepo()
|
|
|
+ self.addCleanup(r.close)
|
|
|
config = ConfigDict()
|
|
|
config.set(b"gc", b"auto", b"0")
|
|
|
|
|
|
@@ -455,6 +456,7 @@ class AutoGCTestCase(TestCase):
|
|
|
def test_should_run_gc_disabled_by_env_var(self):
|
|
|
"""Test that auto GC doesn't run when GIT_AUTO_GC environment variable is 0."""
|
|
|
r = MemoryRepo()
|
|
|
+ self.addCleanup(r.close)
|
|
|
config = ConfigDict()
|
|
|
config.set(b"gc", b"auto", b"10") # Should normally run
|
|
|
|
|
|
@@ -464,6 +466,7 @@ class AutoGCTestCase(TestCase):
|
|
|
def test_should_run_gc_disabled_programmatically(self):
|
|
|
"""Test that auto GC doesn't run when disabled via _autogc_disabled attribute."""
|
|
|
r = MemoryRepo()
|
|
|
+ self.addCleanup(r.close)
|
|
|
config = ConfigDict()
|
|
|
config.set(b"gc", b"auto", b"10") # Should normally run
|
|
|
|
|
|
@@ -479,6 +482,7 @@ class AutoGCTestCase(TestCase):
|
|
|
def test_should_run_gc_default_values(self):
|
|
|
"""Test auto GC with default configuration values."""
|
|
|
r = MemoryRepo()
|
|
|
+ self.addCleanup(r.close)
|
|
|
config = ConfigDict()
|
|
|
|
|
|
# Should not run with empty repo
|
|
|
@@ -487,98 +491,97 @@ class AutoGCTestCase(TestCase):
|
|
|
def test_should_run_gc_with_loose_objects(self):
|
|
|
"""Test that auto GC triggers based on loose object count."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"10") # Low threshold for testing
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"10") # Low threshold for testing
|
|
|
|
|
|
- # Add some loose objects
|
|
|
- for i in range(15):
|
|
|
- blob = Blob()
|
|
|
- blob.data = f"test blob {i}".encode()
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add some loose objects
|
|
|
+ for i in range(15):
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = f"test blob {i}".encode()
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- self.assertTrue(should_run_gc(r, config))
|
|
|
+ self.assertTrue(should_run_gc(r, config))
|
|
|
|
|
|
def test_should_run_gc_with_pack_limit(self):
|
|
|
"""Test that auto GC triggers based on pack file count."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"autoPackLimit", b"2") # Low threshold for testing
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"autoPackLimit", b"2") # Low threshold for testing
|
|
|
|
|
|
- # Create some pack files by repacking
|
|
|
- for i in range(3):
|
|
|
- blob = Blob()
|
|
|
- blob.data = f"test blob {i}".encode()
|
|
|
- r.object_store.add_object(blob)
|
|
|
- r.object_store.pack_loose_objects(progress=no_op_progress)
|
|
|
+ # Create some pack files by repacking
|
|
|
+ for i in range(3):
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = f"test blob {i}".encode()
|
|
|
+ r.object_store.add_object(blob)
|
|
|
+ r.object_store.pack_loose_objects(progress=no_op_progress)
|
|
|
|
|
|
- # Force re-enumeration of packs
|
|
|
- r.object_store._update_pack_cache()
|
|
|
+ # Force re-enumeration of packs
|
|
|
+ r.object_store._update_pack_cache()
|
|
|
|
|
|
- self.assertTrue(should_run_gc(r, config))
|
|
|
+ self.assertTrue(should_run_gc(r, config))
|
|
|
|
|
|
def test_count_loose_objects(self):
|
|
|
"""Test counting loose objects."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
-
|
|
|
- # Initially should have no loose objects
|
|
|
- count = r.object_store.count_loose_objects()
|
|
|
- self.assertEqual(0, count)
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ # Initially should have no loose objects
|
|
|
+ count = r.object_store.count_loose_objects()
|
|
|
+ self.assertEqual(0, count)
|
|
|
|
|
|
- # Add some loose objects
|
|
|
- for i in range(5):
|
|
|
- blob = Blob()
|
|
|
- blob.data = f"test blob {i}".encode()
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add some loose objects
|
|
|
+ for i in range(5):
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = f"test blob {i}".encode()
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- count = r.object_store.count_loose_objects()
|
|
|
- self.assertEqual(5, count)
|
|
|
+ count = r.object_store.count_loose_objects()
|
|
|
+ self.assertEqual(5, count)
|
|
|
|
|
|
def test_count_pack_files(self):
|
|
|
"""Test counting pack files."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
-
|
|
|
- # Initially should have no packs
|
|
|
- count = r.object_store.count_pack_files()
|
|
|
- self.assertEqual(0, count)
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ # Initially should have no packs
|
|
|
+ count = r.object_store.count_pack_files()
|
|
|
+ self.assertEqual(0, count)
|
|
|
|
|
|
- # Create a pack
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test blob"
|
|
|
- r.object_store.add_object(blob)
|
|
|
- r.object_store.pack_loose_objects(progress=no_op_progress)
|
|
|
+ # Create a pack
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = b"test blob"
|
|
|
+ r.object_store.add_object(blob)
|
|
|
+ r.object_store.pack_loose_objects(progress=no_op_progress)
|
|
|
|
|
|
- # Force re-enumeration of packs
|
|
|
- r.object_store._update_pack_cache()
|
|
|
+ # Force re-enumeration of packs
|
|
|
+ r.object_store._update_pack_cache()
|
|
|
|
|
|
- count = r.object_store.count_pack_files()
|
|
|
- self.assertEqual(1, count)
|
|
|
+ count = r.object_store.count_pack_files()
|
|
|
+ self.assertEqual(1, count)
|
|
|
|
|
|
def test_maybe_auto_gc_runs_when_needed(self):
|
|
|
"""Test that auto GC runs when thresholds are exceeded."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"5") # Low threshold for testing
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"5") # Low threshold for testing
|
|
|
|
|
|
- # Add enough loose objects to trigger GC
|
|
|
- for i in range(10):
|
|
|
- blob = Blob()
|
|
|
- blob.data = f"test blob {i}".encode()
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add enough loose objects to trigger GC
|
|
|
+ for i in range(10):
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = f"test blob {i}".encode()
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- with patch("dulwich.gc.garbage_collect") as mock_gc:
|
|
|
- result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
+ with patch("dulwich.gc.garbage_collect") as mock_gc:
|
|
|
+ result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
|
|
|
- self.assertTrue(result)
|
|
|
- mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
+ self.assertTrue(result)
|
|
|
+ mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
|
|
|
def test_maybe_auto_gc_skips_when_not_needed(self):
|
|
|
"""Test that auto GC doesn't run when thresholds are not exceeded."""
|
|
|
r = MemoryRepo()
|
|
|
+ self.addCleanup(r.close)
|
|
|
config = ConfigDict()
|
|
|
|
|
|
with patch("dulwich.gc.garbage_collect") as mock_gc:
|
|
|
@@ -590,151 +593,152 @@ class AutoGCTestCase(TestCase):
|
|
|
def test_maybe_auto_gc_with_gc_log(self):
|
|
|
"""Test that auto GC is skipped when gc.log exists and is recent."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
|
|
|
- # Create gc.log file
|
|
|
- gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
- with open(gc_log_path, "wb") as f:
|
|
|
- f.write(b"Previous GC failed\n")
|
|
|
+ # Create gc.log file
|
|
|
+ gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
+ with open(gc_log_path, "wb") as f:
|
|
|
+ f.write(b"Previous GC failed\n")
|
|
|
|
|
|
- # Add objects to trigger GC
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test"
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add objects to trigger GC
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = b"test"
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- # Capture log messages
|
|
|
- import logging
|
|
|
+ # Capture log messages
|
|
|
+ import logging
|
|
|
|
|
|
- with self.assertLogs(level=logging.INFO) as cm:
|
|
|
- result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
+ with self.assertLogs(level=logging.INFO) as cm:
|
|
|
+ result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
|
|
|
- self.assertFalse(result)
|
|
|
- # Verify gc.log contents were logged
|
|
|
- self.assertTrue(any("Previous GC failed" in msg for msg in cm.output))
|
|
|
+ self.assertFalse(result)
|
|
|
+ # Verify gc.log contents were logged
|
|
|
+ self.assertTrue(any("Previous GC failed" in msg for msg in cm.output))
|
|
|
|
|
|
def test_maybe_auto_gc_with_expired_gc_log(self):
|
|
|
"""Test that auto GC runs when gc.log exists but is expired."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
- config.set(b"gc", b"logExpiry", b"0.days") # Expire immediately
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
+ config.set(b"gc", b"logExpiry", b"0.days") # Expire immediately
|
|
|
|
|
|
- # Create gc.log file
|
|
|
- gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
- with open(gc_log_path, "wb") as f:
|
|
|
- f.write(b"Previous GC failed\n")
|
|
|
+ # Create gc.log file
|
|
|
+ gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
+ with open(gc_log_path, "wb") as f:
|
|
|
+ f.write(b"Previous GC failed\n")
|
|
|
|
|
|
- # Make the file old
|
|
|
- old_time = time.time() - 86400 # 1 day ago
|
|
|
- os.utime(gc_log_path, (old_time, old_time))
|
|
|
+ # Make the file old
|
|
|
+ old_time = time.time() - 86400 # 1 day ago
|
|
|
+ os.utime(gc_log_path, (old_time, old_time))
|
|
|
|
|
|
- # Add objects to trigger GC
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test"
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add objects to trigger GC
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = b"test"
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- with patch("dulwich.gc.garbage_collect") as mock_gc:
|
|
|
- result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
+ with patch("dulwich.gc.garbage_collect") as mock_gc:
|
|
|
+ result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
|
|
|
- self.assertTrue(result)
|
|
|
- mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
- # gc.log should be removed after successful GC
|
|
|
- self.assertFalse(os.path.exists(gc_log_path))
|
|
|
+ self.assertTrue(result)
|
|
|
+ mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
+ # gc.log should be removed after successful GC
|
|
|
+ self.assertFalse(os.path.exists(gc_log_path))
|
|
|
|
|
|
def test_maybe_auto_gc_handles_gc_failure(self):
|
|
|
"""Test that auto GC handles failures gracefully."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
|
|
|
- # Add objects to trigger GC
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test"
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add objects to trigger GC
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = b"test"
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- with patch(
|
|
|
- "dulwich.gc.garbage_collect", side_effect=OSError("GC failed")
|
|
|
- ) as mock_gc:
|
|
|
- result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
+ with patch(
|
|
|
+ "dulwich.gc.garbage_collect", side_effect=OSError("GC failed")
|
|
|
+ ) as mock_gc:
|
|
|
+ result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
|
|
|
- self.assertFalse(result)
|
|
|
- mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
+ self.assertFalse(result)
|
|
|
+ mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
|
|
|
- # Check that error was written to gc.log
|
|
|
- gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
- self.assertTrue(os.path.exists(gc_log_path))
|
|
|
- with open(gc_log_path, "rb") as f:
|
|
|
- content = f.read()
|
|
|
- self.assertIn(b"Auto GC failed: GC failed", content)
|
|
|
+ # Check that error was written to gc.log
|
|
|
+ gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
+ self.assertTrue(os.path.exists(gc_log_path))
|
|
|
+ with open(gc_log_path, "rb") as f:
|
|
|
+ content = f.read()
|
|
|
+ self.assertIn(b"Auto GC failed: GC failed", content)
|
|
|
|
|
|
def test_gc_log_expiry_singular_day(self):
|
|
|
"""Test that gc.logExpiry supports singular '.day' format."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
- config.set(b"gc", b"logExpiry", b"1.day") # Singular form
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
+ config.set(b"gc", b"logExpiry", b"1.day") # Singular form
|
|
|
|
|
|
- # Create gc.log file
|
|
|
- gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
- with open(gc_log_path, "wb") as f:
|
|
|
- f.write(b"Previous GC failed\n")
|
|
|
+ # Create gc.log file
|
|
|
+ gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
+ with open(gc_log_path, "wb") as f:
|
|
|
+ f.write(b"Previous GC failed\n")
|
|
|
|
|
|
- # Make the file 2 days old (older than 1 day expiry)
|
|
|
- old_time = time.time() - (2 * 86400)
|
|
|
- os.utime(gc_log_path, (old_time, old_time))
|
|
|
+ # Make the file 2 days old (older than 1 day expiry)
|
|
|
+ old_time = time.time() - (2 * 86400)
|
|
|
+ os.utime(gc_log_path, (old_time, old_time))
|
|
|
|
|
|
- # Add objects to trigger GC
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test"
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add objects to trigger GC
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = b"test"
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- with patch("dulwich.gc.garbage_collect") as mock_gc:
|
|
|
- result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
+ with patch("dulwich.gc.garbage_collect") as mock_gc:
|
|
|
+ result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
|
|
|
- self.assertTrue(result)
|
|
|
- mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
+ self.assertTrue(result)
|
|
|
+ mock_gc.assert_called_once_with(r, auto=True, progress=no_op_progress)
|
|
|
|
|
|
def test_gc_log_expiry_invalid_format(self):
|
|
|
"""Test that invalid gc.logExpiry format defaults to 1 day."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
- config.set(b"gc", b"logExpiry", b"invalid") # Invalid format
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
+ config.set(b"gc", b"logExpiry", b"invalid") # Invalid format
|
|
|
|
|
|
- # Create gc.log file
|
|
|
- gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
- with open(gc_log_path, "wb") as f:
|
|
|
- f.write(b"Previous GC failed\n")
|
|
|
+ # Create gc.log file
|
|
|
+ gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
+ with open(gc_log_path, "wb") as f:
|
|
|
+ f.write(b"Previous GC failed\n")
|
|
|
|
|
|
- # Make the file recent (within default 1 day)
|
|
|
- recent_time = time.time() - 3600 # 1 hour ago
|
|
|
- os.utime(gc_log_path, (recent_time, recent_time))
|
|
|
+ # Make the file recent (within default 1 day)
|
|
|
+ recent_time = time.time() - 3600 # 1 hour ago
|
|
|
+ os.utime(gc_log_path, (recent_time, recent_time))
|
|
|
|
|
|
- # Add objects to trigger GC
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test"
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add objects to trigger GC
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = b"test"
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- # Capture log messages
|
|
|
- import logging
|
|
|
+ # Capture log messages
|
|
|
+ import logging
|
|
|
|
|
|
- with self.assertLogs(level=logging.INFO) as cm:
|
|
|
- result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
+ with self.assertLogs(level=logging.INFO) as cm:
|
|
|
+ result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
|
|
|
- # Should not run GC because gc.log is recent (within default 1 day)
|
|
|
- self.assertFalse(result)
|
|
|
- # Check that gc.log content was logged
|
|
|
- self.assertTrue(any("gc.log content:" in msg for msg in cm.output))
|
|
|
+ # Should not run GC because gc.log is recent (within default 1 day)
|
|
|
+ self.assertFalse(result)
|
|
|
+ # Check that gc.log content was logged
|
|
|
+ self.assertTrue(any("gc.log content:" in msg for msg in cm.output))
|
|
|
|
|
|
def test_maybe_auto_gc_non_disk_repo(self):
|
|
|
"""Test auto GC on non-disk repository (MemoryRepo)."""
|
|
|
r = MemoryRepo()
|
|
|
+ self.addCleanup(r.close)
|
|
|
config = ConfigDict()
|
|
|
config.set(b"gc", b"auto", b"1") # Would trigger if it were disk-based
|
|
|
|
|
|
@@ -752,27 +756,27 @@ class AutoGCTestCase(TestCase):
|
|
|
def test_gc_removes_existing_gc_log_on_success(self):
|
|
|
"""Test that successful GC removes pre-existing gc.log file."""
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
- r = Repo.init(tmpdir)
|
|
|
- config = ConfigDict()
|
|
|
- config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
+ with Repo.init(tmpdir) as r:
|
|
|
+ config = ConfigDict()
|
|
|
+ config.set(b"gc", b"auto", b"1") # Low threshold
|
|
|
|
|
|
- # Create gc.log file from previous failure
|
|
|
- gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
- with open(gc_log_path, "wb") as f:
|
|
|
- f.write(b"Previous GC failed\n")
|
|
|
+ # Create gc.log file from previous failure
|
|
|
+ gc_log_path = os.path.join(r.controldir(), "gc.log")
|
|
|
+ with open(gc_log_path, "wb") as f:
|
|
|
+ f.write(b"Previous GC failed\n")
|
|
|
|
|
|
- # Make it old enough to be expired
|
|
|
- old_time = time.time() - (2 * 86400) # 2 days ago
|
|
|
- os.utime(gc_log_path, (old_time, old_time))
|
|
|
+ # Make it old enough to be expired
|
|
|
+ old_time = time.time() - (2 * 86400) # 2 days ago
|
|
|
+ os.utime(gc_log_path, (old_time, old_time))
|
|
|
|
|
|
- # Add objects to trigger GC
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test"
|
|
|
- r.object_store.add_object(blob)
|
|
|
+ # Add objects to trigger GC
|
|
|
+ blob = Blob()
|
|
|
+ blob.data = b"test"
|
|
|
+ r.object_store.add_object(blob)
|
|
|
|
|
|
- # Run auto GC
|
|
|
- result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
+ # Run auto GC
|
|
|
+ result = maybe_auto_gc(r, config, progress=no_op_progress)
|
|
|
|
|
|
- self.assertTrue(result)
|
|
|
- # gc.log should be removed after successful GC
|
|
|
- self.assertFalse(os.path.exists(gc_log_path))
|
|
|
+ self.assertTrue(result)
|
|
|
+ # gc.log should be removed after successful GC
|
|
|
+ self.assertFalse(os.path.exists(gc_log_path))
|