Selaa lähdekoodia

reflog: add drop_reflog_entry method

Peter Rowlands 4 vuotta sitten
vanhempi
commit
231eca441c
2 muutettua tiedostoa jossa 121 lisäystä ja 0 poistoa
  1. 59 0
      dulwich/reflog.py
  2. 62 0
      dulwich/tests/test_reflog.py

+ 59 - 0
dulwich/reflog.py

@@ -93,3 +93,62 @@ def read_reflog(f):
     """
     for line in f:
         yield parse_reflog_line(line)
+
+
+def drop_reflog_entry(f, index, rewrite=False):
+    """Drop the specified reflog entry.
+
+    Args:
+        f: File-like object
+        index: Reflog entry index (in Git reflog reverse 0-indexed order)
+        rewrite: If a reflog entry's predecessor is removed, set its
+            old SHA to the new SHA of the entry that now precedes it
+    """
+    if index < 0:
+        raise ValueError("Invalid reflog index %d" % index)
+
+    log = []
+    offset = f.tell()
+    for line in f:
+        log.append((offset, parse_reflog_line(line)))
+        offset = f.tell()
+
+    inverse_index = len(log) - index - 1
+    write_offset = log[inverse_index][0]
+    f.seek(write_offset)
+
+    if index == 0:
+        f.truncate()
+        return
+
+    del log[inverse_index]
+    if rewrite and index > 0 and log:
+        if inverse_index == 0:
+            previous_new = ZERO_SHA
+        else:
+            previous_new = log[inverse_index - 1][1].new_sha
+        offset, entry = log[inverse_index]
+        log[inverse_index] = (
+            offset,
+            Entry(
+                previous_new,
+                entry.new_sha,
+                entry.committer,
+                entry.timestamp,
+                entry.timezone,
+                entry.message,
+            ),
+        )
+
+    for _, entry in log[inverse_index:]:
+        f.write(
+            format_reflog_line(
+                entry.old_sha,
+                entry.new_sha,
+                entry.committer,
+                entry.timestamp,
+                entry.timezone,
+                entry.message,
+            )
+        )
+    f.truncate()

+ 62 - 0
dulwich/tests/test_reflog.py

@@ -21,10 +21,14 @@
 
 """Tests for dulwich.reflog."""
 
+from io import BytesIO
 
+from dulwich.objects import ZERO_SHA
 from dulwich.reflog import (
+    drop_reflog_entry,
     format_reflog_line,
     parse_reflog_line,
+    read_reflog,
 )
 
 from dulwich.tests import (
@@ -82,3 +86,61 @@ class ReflogLineTests(TestCase):
             ),
             parse_reflog_line(reflog_line),
         )
+
+
+_TEST_REFLOG = (
+    b"0000000000000000000000000000000000000000 "
+    b"49030649db3dfec5a9bc03e5dde4255a14499f16 Jelmer Vernooij "
+    b"<jelmer@jelmer.uk> 1446552482 +0000	"
+    b"clone: from git://jelmer.uk/samba\n"
+    b"49030649db3dfec5a9bc03e5dde4255a14499f16 "
+    b"42d06bd4b77fed026b154d16493e5deab78f02ec Jelmer Vernooij "
+    b"<jelmer@jelmer.uk> 1446552483 +0000	"
+    b"clone: from git://jelmer.uk/samba\n"
+    b"42d06bd4b77fed026b154d16493e5deab78f02ec "
+    b"df6800012397fb85c56e7418dd4eb9405dee075c Jelmer Vernooij "
+    b"<jelmer@jelmer.uk> 1446552484 +0000	"
+    b"clone: from git://jelmer.uk/samba\n"
+)
+
+
+class ReflogDropTests(TestCase):
+    def setUp(self):
+        TestCase.setUp(self)
+        self.f = BytesIO(_TEST_REFLOG)
+        self.original_log = list(read_reflog(self.f))
+        self.f.seek(0)
+
+    def _read_log(self):
+        self.f.seek(0)
+        return list(read_reflog(self.f))
+
+    def test_invalid(self):
+        self.assertRaises(ValueError, drop_reflog_entry, self.f, -1)
+
+    def test_drop_entry(self):
+        drop_reflog_entry(self.f, 0)
+        log = self._read_log()
+        self.assertEqual(len(log), 2)
+        self.assertEqual(self.original_log[0:2], log)
+
+        self.f.seek(0)
+        drop_reflog_entry(self.f, 1)
+        log = self._read_log()
+        self.assertEqual(len(log), 1)
+        self.assertEqual(self.original_log[1], log[0])
+
+    def test_drop_entry_with_rewrite(self):
+        drop_reflog_entry(self.f, 1, True)
+        log = self._read_log()
+        self.assertEqual(len(log), 2)
+        self.assertEqual(self.original_log[0], log[0])
+        self.assertEqual(self.original_log[0].new_sha, log[1].old_sha)
+        self.assertEqual(self.original_log[2].new_sha, log[1].new_sha)
+
+        self.f.seek(0)
+        drop_reflog_entry(self.f, 1, True)
+        log = self._read_log()
+        self.assertEqual(len(log), 1)
+        self.assertEqual(ZERO_SHA, log[0].old_sha)
+        self.assertEqual(self.original_log[2].new_sha, log[0].new_sha)