| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- # test_reflog.py -- tests for reflog.py
- # Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Tests for dulwich.reflog."""
- import tempfile
- from io import BytesIO
- from dulwich.objects import ZERO_SHA, Blob, Commit, Tree
- from dulwich.reflog import (
- drop_reflog_entry,
- expire_reflog,
- format_reflog_line,
- iter_reflogs,
- parse_reflog_line,
- parse_reflog_spec,
- read_reflog,
- )
- from dulwich.repo import Repo
- from . import TestCase
- class ReflogSpecTests(TestCase):
- def test_parse_reflog_spec_basic(self) -> None:
- # Test basic reflog spec
- ref, index = parse_reflog_spec("HEAD@{1}")
- self.assertEqual(b"HEAD", ref)
- self.assertEqual(1, index)
- def test_parse_reflog_spec_with_full_ref(self) -> None:
- # Test with full ref name
- ref, index = parse_reflog_spec("refs/heads/master@{5}")
- self.assertEqual(b"refs/heads/master", ref)
- self.assertEqual(5, index)
- def test_parse_reflog_spec_bytes(self) -> None:
- # Test with bytes input
- ref, index = parse_reflog_spec(b"develop@{0}")
- self.assertEqual(b"develop", ref)
- self.assertEqual(0, index)
- def test_parse_reflog_spec_no_ref(self) -> None:
- # Test with no ref (defaults to HEAD)
- ref, index = parse_reflog_spec("@{2}")
- self.assertEqual(b"HEAD", ref)
- self.assertEqual(2, index)
- def test_parse_reflog_spec_invalid_no_brace(self) -> None:
- # Test invalid spec without @{
- with self.assertRaises(ValueError) as cm:
- parse_reflog_spec("HEAD")
- self.assertIn("Expected format: ref@{n}", str(cm.exception))
- def test_parse_reflog_spec_invalid_no_closing_brace(self) -> None:
- # Test invalid spec without closing brace
- with self.assertRaises(ValueError) as cm:
- parse_reflog_spec("HEAD@{1")
- self.assertIn("Expected format: ref@{n}", str(cm.exception))
- def test_parse_reflog_spec_invalid_non_numeric(self) -> None:
- # Test invalid spec with non-numeric index
- with self.assertRaises(ValueError) as cm:
- parse_reflog_spec("HEAD@{foo}")
- self.assertIn("Expected integer", str(cm.exception))
- class ReflogLineTests(TestCase):
- def test_format(self) -> None:
- self.assertEqual(
- b"0000000000000000000000000000000000000000 "
- b"49030649db3dfec5a9bc03e5dde4255a14499f16 Jelmer Vernooij "
- b"<jelmer@jelmer.uk> 1446552482 +0000 "
- b"clone: from git://jelmer.uk/samba",
- format_reflog_line(
- b"0000000000000000000000000000000000000000",
- b"49030649db3dfec5a9bc03e5dde4255a14499f16",
- b"Jelmer Vernooij <jelmer@jelmer.uk>",
- 1446552482,
- 0,
- b"clone: from git://jelmer.uk/samba",
- ),
- )
- self.assertEqual(
- b"0000000000000000000000000000000000000000 "
- b"49030649db3dfec5a9bc03e5dde4255a14499f16 Jelmer Vernooij "
- b"<jelmer@jelmer.uk> 1446552482 +0000 "
- b"clone: from git://jelmer.uk/samba",
- format_reflog_line(
- None,
- b"49030649db3dfec5a9bc03e5dde4255a14499f16",
- b"Jelmer Vernooij <jelmer@jelmer.uk>",
- 1446552482,
- 0,
- b"clone: from git://jelmer.uk/samba",
- ),
- )
- def test_parse(self) -> None:
- reflog_line = (
- b"0000000000000000000000000000000000000000 "
- b"49030649db3dfec5a9bc03e5dde4255a14499f16 Jelmer Vernooij "
- b"<jelmer@jelmer.uk> 1446552482 +0000 "
- b"clone: from git://jelmer.uk/samba"
- )
- self.assertEqual(
- (
- b"0000000000000000000000000000000000000000",
- b"49030649db3dfec5a9bc03e5dde4255a14499f16",
- b"Jelmer Vernooij <jelmer@jelmer.uk>",
- 1446552482,
- 0,
- b"clone: from git://jelmer.uk/samba",
- ),
- parse_reflog_line(reflog_line),
- )
- _TEST_REFLOG = (
- b"0000000000000000000000000000000000000000 "
- b"49030649db3dfec5a9bc03e5dde4255a14499f16 Jelmer Vernooij "
- b"<jelmer@jelmer.uk> 1446552482 +0000 "
- b"clone: from git://jelmer.uk/samba\n"
- b"49030649db3dfec5a9bc03e5dde4255a14499f16 "
- b"42d06bd4b77fed026b154d16493e5deab78f02ec Jelmer Vernooij "
- b"<jelmer@jelmer.uk> 1446552483 +0000 "
- b"clone: from git://jelmer.uk/samba\n"
- b"42d06bd4b77fed026b154d16493e5deab78f02ec "
- b"df6800012397fb85c56e7418dd4eb9405dee075c Jelmer Vernooij "
- b"<jelmer@jelmer.uk> 1446552484 +0000 "
- b"clone: from git://jelmer.uk/samba\n"
- )
- class ReflogDropTests(TestCase):
- def setUp(self) -> None:
- TestCase.setUp(self)
- self.f = BytesIO(_TEST_REFLOG)
- self.original_log = list(read_reflog(self.f))
- self.f.seek(0)
- def _read_log(self):
- self.f.seek(0)
- return list(read_reflog(self.f))
- def test_invalid(self) -> None:
- self.assertRaises(ValueError, drop_reflog_entry, self.f, -1)
- def test_drop_entry(self) -> None:
- drop_reflog_entry(self.f, 0)
- log = self._read_log()
- self.assertEqual(len(log), 2)
- self.assertEqual(self.original_log[0:2], log)
- self.f.seek(0)
- drop_reflog_entry(self.f, 1)
- log = self._read_log()
- self.assertEqual(len(log), 1)
- self.assertEqual(self.original_log[1], log[0])
- def test_drop_entry_with_rewrite(self) -> None:
- drop_reflog_entry(self.f, 1, True)
- log = self._read_log()
- self.assertEqual(len(log), 2)
- self.assertEqual(self.original_log[0], log[0])
- self.assertEqual(self.original_log[0].new_sha, log[1].old_sha)
- self.assertEqual(self.original_log[2].new_sha, log[1].new_sha)
- self.f.seek(0)
- drop_reflog_entry(self.f, 1, True)
- log = self._read_log()
- self.assertEqual(len(log), 1)
- self.assertEqual(ZERO_SHA, log[0].old_sha)
- self.assertEqual(self.original_log[2].new_sha, log[0].new_sha)
- class RepoReflogTests(TestCase):
- def setUp(self) -> None:
- TestCase.setUp(self)
- self.test_dir = tempfile.mkdtemp()
- self.repo = Repo.init(self.test_dir)
- def tearDown(self) -> None:
- TestCase.tearDown(self)
- import shutil
- shutil.rmtree(self.test_dir)
- def test_read_reflog_nonexistent(self) -> None:
- # Reading a reflog that doesn't exist should return empty
- entries = list(self.repo.read_reflog(b"refs/heads/nonexistent"))
- self.assertEqual([], entries)
- def test_read_reflog_head(self) -> None:
- # Create a commit to generate a reflog entry
- blob = Blob.from_string(b"test content")
- self.repo.object_store.add_object(blob)
- tree = Tree()
- tree.add(b"test", 0o100644, blob.id)
- self.repo.object_store.add_object(tree)
- commit = Commit()
- commit.tree = tree.id
- commit.author = b"Test Author <test@example.com>"
- commit.committer = b"Test Author <test@example.com>"
- commit.commit_time = 1234567890
- commit.commit_timezone = 0
- commit.author_time = 1234567890
- commit.author_timezone = 0
- commit.message = b"Initial commit"
- self.repo.object_store.add_object(commit)
- # Manually write a reflog entry
- self.repo._write_reflog(
- b"HEAD",
- ZERO_SHA,
- commit.id,
- b"Test Author <test@example.com>",
- 1234567890,
- 0,
- b"commit (initial): Initial commit",
- )
- # Read the reflog
- entries = list(self.repo.read_reflog(b"HEAD"))
- self.assertEqual(1, len(entries))
- self.assertEqual(ZERO_SHA, entries[0].old_sha)
- self.assertEqual(commit.id, entries[0].new_sha)
- self.assertEqual(b"Test Author <test@example.com>", entries[0].committer)
- self.assertEqual(1234567890, entries[0].timestamp)
- self.assertEqual(0, entries[0].timezone)
- self.assertEqual(b"commit (initial): Initial commit", entries[0].message)
- def test_iter_reflogs(self) -> None:
- # Create commits and reflog entries
- blob = Blob.from_string(b"test content")
- self.repo.object_store.add_object(blob)
- tree = Tree()
- tree.add(b"test", 0o100644, blob.id)
- self.repo.object_store.add_object(tree)
- commit = Commit()
- commit.tree = tree.id
- commit.author = b"Test Author <test@example.com>"
- commit.committer = b"Test Author <test@example.com>"
- commit.commit_time = 1234567890
- commit.commit_timezone = 0
- commit.author_time = 1234567890
- commit.author_timezone = 0
- commit.message = b"Initial commit"
- self.repo.object_store.add_object(commit)
- # Write reflog entries for multiple refs
- self.repo._write_reflog(
- b"HEAD",
- ZERO_SHA,
- commit.id,
- b"Test Author <test@example.com>",
- 1234567890,
- 0,
- b"commit (initial): Initial commit",
- )
- self.repo._write_reflog(
- b"refs/heads/master",
- ZERO_SHA,
- commit.id,
- b"Test Author <test@example.com>",
- 1234567891,
- 0,
- b"branch: Created from HEAD",
- )
- self.repo._write_reflog(
- b"refs/heads/develop",
- ZERO_SHA,
- commit.id,
- b"Test Author <test@example.com>",
- 1234567892,
- 0,
- b"branch: Created from HEAD",
- )
- # Use iter_reflogs to get all reflogs
- import os
- logs_dir = os.path.join(self.repo.controldir(), "logs")
- reflogs = list(iter_reflogs(logs_dir))
- # Should have at least HEAD, refs/heads/master, and refs/heads/develop
- self.assertIn(b"HEAD", reflogs)
- self.assertIn(b"refs/heads/master", reflogs)
- self.assertIn(b"refs/heads/develop", reflogs)
- class ReflogExpireTests(TestCase):
- def setUp(self) -> None:
- TestCase.setUp(self)
- # Create a reflog with entries at different timestamps
- self.f = BytesIO()
- # Old entry (timestamp: 1000000000)
- self.f.write(
- b"0000000000000000000000000000000000000000 "
- b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa "
- b"Test <test@example.com> 1000000000 +0000\told entry\n"
- )
- # Medium entry (timestamp: 1500000000)
- self.f.write(
- b"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa "
- b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb "
- b"Test <test@example.com> 1500000000 +0000\tmedium entry\n"
- )
- # Recent entry (timestamp: 2000000000)
- self.f.write(
- b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb "
- b"cccccccccccccccccccccccccccccccccccccccc "
- b"Test <test@example.com> 2000000000 +0000\trecent entry\n"
- )
- self.f.seek(0)
- def _read_log(self):
- self.f.seek(0)
- return list(read_reflog(self.f))
- def test_expire_no_criteria(self) -> None:
- # If no expiration criteria, nothing should be expired
- count = expire_reflog(self.f)
- self.assertEqual(0, count)
- log = self._read_log()
- self.assertEqual(3, len(log))
- def test_expire_by_time(self) -> None:
- # Expire entries older than timestamp 1600000000
- # Should remove the first two entries
- count = expire_reflog(self.f, expire_time=1600000000)
- self.assertEqual(2, count)
- log = self._read_log()
- self.assertEqual(1, len(log))
- self.assertEqual(b"recent entry", log[0].message)
- def test_expire_unreachable(self) -> None:
- # Test expiring unreachable entries
- # Mark the middle entry as unreachable
- def reachable_checker(sha):
- return sha != b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
- count = expire_reflog(
- self.f,
- expire_unreachable_time=1600000000,
- reachable_checker=reachable_checker,
- )
- self.assertEqual(1, count)
- log = self._read_log()
- self.assertEqual(2, len(log))
- # First and third entries should remain
- self.assertEqual(b"old entry", log[0].message)
- self.assertEqual(b"recent entry", log[1].message)
- def test_expire_mixed(self) -> None:
- # Test with both expire_time and expire_unreachable_time
- def reachable_checker(sha):
- # Only the most recent entry is reachable
- return sha == b"cccccccccccccccccccccccccccccccccccccccc"
- count = expire_reflog(
- self.f,
- expire_time=1800000000, # Would expire first two if reachable
- expire_unreachable_time=1200000000, # Would expire first if unreachable
- reachable_checker=reachable_checker,
- )
- # First entry is unreachable and old enough -> expired
- # Second entry is unreachable but not old enough -> kept
- # Third entry is reachable and recent -> kept
- self.assertEqual(1, count)
- log = self._read_log()
- self.assertEqual(2, len(log))
- self.assertEqual(b"medium entry", log[0].message)
- self.assertEqual(b"recent entry", log[1].message)
- def test_expire_all_entries(self) -> None:
- # Expire all entries
- count = expire_reflog(self.f, expire_time=3000000000)
- self.assertEqual(3, count)
- log = self._read_log()
- self.assertEqual(0, len(log))
|