# test_walk.py -- Tests for commit walking functionality.
# Copyright (C) 2010 Google, Inc.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for commit walking functionality."""

from itertools import permutations
from unittest import expectedFailure

from dulwich.diff_tree import CHANGE_MODIFY, CHANGE_RENAME, RenameDetector, TreeChange
from dulwich.errors import MissingCommitError
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob, Commit
from dulwich.tests.utils import F, build_commit_graph, make_object, make_tag
from dulwich.walk import ORDER_TOPO, WalkEntry, Walker, _topo_reorder

from . import TestCase


class TestWalkEntry:
    """Expected-value stand-in that compares equal to a matching ``WalkEntry``.

    ``changes`` may be ``None``, in which case only the commit is compared;
    otherwise it is compared against the result of ``WalkEntry.changes()``.
    """

    # The name starts with "Test" but this is a helper, not a test case; the
    # flag keeps pytest from trying (and failing, with a warning) to collect it.
    __test__ = False

    def __init__(self, commit, changes) -> None:
        self.commit = commit
        self.changes = changes

    def __repr__(self) -> str:
        # Shown in assertEqual failure output, so it has to identify the entry.
        # ``!r`` is used for the id so bytes are never implicitly str()-ed.
        return f"<TestWalkEntry commit={self.commit.id!r}, changes={self.changes!r}>"

    def __eq__(self, other):
        """Return True if ``other`` is a WalkEntry for the same commit/changes."""
        if not isinstance(other, WalkEntry) or self.commit != other.commit:
            return False
        if self.changes is None:
            # No expectation about changes was given; the commit match suffices.
            return True
        return self.changes == other.changes()


class _CommitGraphTestCase(TestCase):
    """Shared fixture: an in-memory object store plus commit-graph builders."""

    def setUp(self) -> None:
        super().setUp()
        self.store = MemoryObjectStore()

    def make_commits(self, commit_spec, **kwargs):
        """Build a commit graph in ``self.store`` and return its commits.

        Args:
          commit_spec: Graph specification, passed to ``build_commit_graph``.
          times: Optional keyword; the i-th value (0-based) is stored as the
            ``commit_time`` attribute for commit number ``i + 1``.
          attrs: Optional keyword; per-commit attribute dicts keyed by commit
            number. Entries from ``times`` are merged into it.
          **kwargs: Remaining keywords are forwarded to ``build_commit_graph``.
        """
        times = kwargs.pop("times", [])
        attrs = kwargs.pop("attrs", {})
        for i, t in enumerate(times):
            attrs.setdefault(i + 1, {})["commit_time"] = t
        return build_commit_graph(self.store, commit_spec, attrs=attrs, **kwargs)

    def make_linear_commits(self, num_commits, **kwargs):
        """Build ``num_commits`` commits where each has the previous as parent."""
        commit_spec = []
        for i in range(1, num_commits + 1):
            c = [i]
            if i > 1:
                c.append(i - 1)
            commit_spec.append(c)
        return self.make_commits(commit_spec, **kwargs)


class WalkerTest(_CommitGraphTestCase):
    """Tests for ``Walker`` ordering, filtering and change reporting."""

    def assertWalkYields(self, expected, *args, **kwargs) -> None:
        """Assert that ``Walker(self.store, *args, **kwargs)`` yields ``expected``.

        Bare ``Commit`` objects in ``expected`` are wrapped in
        ``TestWalkEntry(commit, None)``, i.e. compared by commit only.
        """
        walker = Walker(self.store, *args, **kwargs)
        expected = [
            TestWalkEntry(entry, None) if isinstance(entry, Commit) else entry
            for entry in expected
        ]
        actual = list(walker)
        self.assertEqual(expected, actual)

    def test_tag(self) -> None:
        c1, c2, c3 = self.make_linear_commits(3)
        t2 = make_tag(target=c2)
        self.store.add_object(t2)
        # Walking from the tag's id yields the history of the tagged commit.
        self.assertWalkYields([c2, c1], [t2.id])

    def test_linear(self) -> None:
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c1], [c1.id])
        self.assertWalkYields([c2, c1], [c2.id])
        self.assertWalkYields([c3, c2, c1], [c3.id])
        self.assertWalkYields([c3, c2, c1], [c3.id, c1.id])
        self.assertWalkYields([c3, c2], [c3.id], exclude=[c1.id])
        self.assertWalkYields([c3, c2], [c3.id, c1.id], exclude=[c1.id])
        self.assertWalkYields([c3], [c3.id, c1.id], exclude=[c2.id])

    def test_missing(self) -> None:
        cs = list(reversed(self.make_linear_commits(20)))
        self.assertWalkYields(cs, [cs[0].id])

        # Exactly how close we can get to a missing commit depends on our
        # implementation (in particular the choice of _MAX_EXTRA_COMMITS), but
        # we should at least be able to walk some history in a broken repo.
        del self.store[cs[-1].id]
        for i in range(1, 11):
            self.assertWalkYields(cs[:i], [cs[0].id], max_entries=i)
        self.assertRaises(MissingCommitError, Walker, self.store, [cs[-1].id])

    def test_branch(self) -> None:
        c1, x2, x3, y4 = self.make_commits([[1], [2, 1], [3, 2], [4, 1]])
        self.assertWalkYields([x3, x2, c1], [x3.id])
        self.assertWalkYields([y4, c1], [y4.id])
        self.assertWalkYields([y4, x2, c1], [y4.id, x2.id])
        self.assertWalkYields([y4, x2], [y4.id, x2.id], exclude=[c1.id])
        self.assertWalkYields([y4, x3], [y4.id, x3.id], exclude=[x2.id])
        self.assertWalkYields([y4], [y4.id], exclude=[x3.id])
        self.assertWalkYields([x3, x2], [x3.id], exclude=[y4.id])

    def test_merge(self) -> None:
        c1, c2, c3, c4 = self.make_commits([[1], [2, 1], [3, 1], [4, 2, 3]])
        self.assertWalkYields([c4, c3, c2, c1], [c4.id])
        self.assertWalkYields([c3, c1], [c3.id])
        self.assertWalkYields([c2, c1], [c2.id])
        self.assertWalkYields([c4, c3], [c4.id], exclude=[c2.id])
        self.assertWalkYields([c4, c2], [c4.id], exclude=[c3.id])

    def test_merge_of_new_branch_from_old_base(self) -> None:
        # The commit on the branch was made at a time after any of the
        # commits on master, but the branch was from an older commit.
        # See also test_merge_of_old_branch
        self.maxDiff = None
        c1, c2, c3, c4, c5 = self.make_commits(
            [[1], [2, 1], [3, 2], [4, 1], [5, 3, 4]],
            times=[1, 2, 3, 4, 5],
        )
        self.assertWalkYields([c5, c4, c3, c2, c1], [c5.id])
        self.assertWalkYields([c3, c2, c1], [c3.id])
        self.assertWalkYields([c2, c1], [c2.id])

    @expectedFailure
    def test_merge_of_old_branch(self) -> None:
        # The commit on the branch was made at a time before any of
        # the commits on master, but it was merged into master after
        # those commits.
        # See also test_merge_of_new_branch_from_old_base
        self.maxDiff = None
        c1, c2, c3, c4, c5 = self.make_commits(
            [[1], [2, 1], [3, 2], [4, 1], [5, 3, 4]],
            times=[1, 3, 4, 2, 5],
        )
        self.assertWalkYields([c5, c4, c3, c2, c1], [c5.id])
        self.assertWalkYields([c3, c2, c1], [c3.id])
        self.assertWalkYields([c2, c1], [c2.id])

    def test_reverse(self) -> None:
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c1, c2, c3], [c3.id], reverse=True)

    def test_max_entries(self) -> None:
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c3, c2, c1], [c3.id], max_entries=3)
        self.assertWalkYields([c3, c2], [c3.id], max_entries=2)
        self.assertWalkYields([c3], [c3.id], max_entries=1)

    def test_reverse_after_max_entries(self) -> None:
        # max_entries selects the newest commits first; reverse then flips
        # the order of that selection.
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c1, c2, c3], [c3.id], max_entries=3, reverse=True)
        self.assertWalkYields([c2, c3], [c3.id], max_entries=2, reverse=True)
        self.assertWalkYields([c3], [c3.id], max_entries=1, reverse=True)

    def test_changes_one_parent(self) -> None:
        blob_a1 = make_object(Blob, data=b"a1")
        blob_a2 = make_object(Blob, data=b"a2")
        blob_b2 = make_object(Blob, data=b"b2")
        c1, c2 = self.make_linear_commits(
            2,
            trees={
                1: [(b"a", blob_a1)],
                2: [(b"a", blob_a2), (b"b", blob_b2)],
            },
        )
        e1 = TestWalkEntry(c1, [TreeChange.add((b"a", F, blob_a1.id))])
        e2 = TestWalkEntry(
            c2,
            [
                TreeChange(CHANGE_MODIFY, (b"a", F, blob_a1.id), (b"a", F, blob_a2.id)),
                TreeChange.add((b"b", F, blob_b2.id)),
            ],
        )
        self.assertWalkYields([e2, e1], [c2.id])

    def test_changes_multiple_parents(self) -> None:
        blob_a1 = make_object(Blob, data=b"a1")
        blob_b2 = make_object(Blob, data=b"b2")
        blob_a3 = make_object(Blob, data=b"a3")
        c1, c2, c3 = self.make_commits(
            [[1], [2], [3, 1, 2]],
            trees={
                1: [(b"a", blob_a1)],
                2: [(b"b", blob_b2)],
                3: [(b"a", blob_a3), (b"b", blob_b2)],
            },
        )
        # a is a modify/add conflict and b is not conflicted.
        changes = [
            [
                TreeChange(CHANGE_MODIFY, (b"a", F, blob_a1.id), (b"a", F, blob_a3.id)),
                TreeChange.add((b"a", F, blob_a3.id)),
            ]
        ]
        self.assertWalkYields(
            [TestWalkEntry(c3, changes)], [c3.id], exclude=[c1.id, c2.id]
        )

    def test_path_matches(self) -> None:
        # A configured path matches itself and anything below it, but not
        # siblings that merely share a string prefix (e.g. b"fool").
        walker = Walker(None, [], paths=[b"foo", b"bar", b"baz/quux"])
        self.assertTrue(walker._path_matches(b"foo"))
        self.assertTrue(walker._path_matches(b"foo/a"))
        self.assertTrue(walker._path_matches(b"foo/a/b"))
        self.assertTrue(walker._path_matches(b"bar"))
        self.assertTrue(walker._path_matches(b"baz/quux"))
        self.assertTrue(walker._path_matches(b"baz/quux/a"))

        self.assertFalse(walker._path_matches(None))
        self.assertFalse(walker._path_matches(b"oops"))
        self.assertFalse(walker._path_matches(b"fool"))
        self.assertFalse(walker._path_matches(b"baz"))
        self.assertFalse(walker._path_matches(b"baz/quu"))

    def test_paths(self) -> None:
        blob_a1 = make_object(Blob, data=b"a1")
        blob_b2 = make_object(Blob, data=b"b2")
        blob_a3 = make_object(Blob, data=b"a3")
        blob_b3 = make_object(Blob, data=b"b3")
        c1, c2, c3 = self.make_linear_commits(
            3,
            trees={
                1: [(b"a", blob_a1)],
                2: [(b"a", blob_a1), (b"x/b", blob_b2)],
                3: [(b"a", blob_a3), (b"x/b", blob_b3)],
            },
        )

        self.assertWalkYields([c3, c2, c1], [c3.id])
        self.assertWalkYields([c3, c1], [c3.id], paths=[b"a"])
        self.assertWalkYields([c3, c2], [c3.id], paths=[b"x/b"])

        # All changes are included, not just for requested paths.
        changes = [
            TreeChange(CHANGE_MODIFY, (b"a", F, blob_a1.id), (b"a", F, blob_a3.id)),
            TreeChange(CHANGE_MODIFY, (b"x/b", F, blob_b2.id), (b"x/b", F, blob_b3.id)),
        ]
        self.assertWalkYields(
            [TestWalkEntry(c3, changes)], [c3.id], max_entries=1, paths=[b"a"]
        )

    def test_paths_subtree(self) -> None:
        blob_a = make_object(Blob, data=b"a")
        blob_b = make_object(Blob, data=b"b")
        c1, c2, c3 = self.make_linear_commits(
            3,
            trees={
                1: [(b"x/a", blob_a)],
                2: [(b"b", blob_b), (b"x/a", blob_a)],
                3: [(b"b", blob_b), (b"x/a", blob_a), (b"x/b", blob_b)],
            },
        )
        self.assertWalkYields([c2], [c3.id], paths=[b"b"])
        self.assertWalkYields([c3, c1], [c3.id], paths=[b"x"])

    def test_paths_max_entries(self) -> None:
        blob_a = make_object(Blob, data=b"a")
        blob_b = make_object(Blob, data=b"b")
        c1, c2 = self.make_linear_commits(
            2, trees={1: [(b"a", blob_a)], 2: [(b"a", blob_a), (b"b", blob_b)]}
        )
        self.assertWalkYields([c2], [c2.id], paths=[b"b"], max_entries=1)
        self.assertWalkYields([c1], [c1.id], paths=[b"a"], max_entries=1)

    def test_paths_merge(self) -> None:
        blob_a1 = make_object(Blob, data=b"a1")
        blob_a2 = make_object(Blob, data=b"a2")
        blob_a3 = make_object(Blob, data=b"a3")
        x1, y2, m3, m4 = self.make_commits(
            [[1], [2], [3, 1, 2], [4, 1, 2]],
            trees={
                1: [(b"a", blob_a1)],
                2: [(b"a", blob_a2)],
                3: [(b"a", blob_a3)],
                4: [(b"a", blob_a1)],
            },
        )
        # Non-conflicting
        self.assertWalkYields([m3, y2, x1], [m3.id], paths=[b"a"])
        self.assertWalkYields([y2, x1], [m4.id], paths=[b"a"])

    def test_changes_with_renames(self) -> None:
        blob = make_object(Blob, data=b"blob")
        c1, c2 = self.make_linear_commits(
            2, trees={1: [(b"a", blob)], 2: [(b"b", blob)]}
        )
        entry_a = (b"a", F, blob.id)
        entry_b = (b"b", F, blob.id)
        changes_without_renames = [
            TreeChange.delete(entry_a),
            TreeChange.add(entry_b),
        ]
        changes_with_renames = [TreeChange(CHANGE_RENAME, entry_a, entry_b)]
        self.assertWalkYields(
            [TestWalkEntry(c2, changes_without_renames)],
            [c2.id],
            max_entries=1,
        )
        detector = RenameDetector(self.store)
        self.assertWalkYields(
            [TestWalkEntry(c2, changes_with_renames)],
            [c2.id],
            max_entries=1,
            rename_detector=detector,
        )

    def test_follow_rename(self) -> None:
        blob = make_object(Blob, data=b"blob")
        names = [b"a", b"a", b"b", b"b", b"c", b"c"]

        trees = {i + 1: [(n, blob, F)] for i, n in enumerate(names)}
        c1, c2, c3, c4, c5, c6 = self.make_linear_commits(6, trees=trees)
        # Without follow, only the commit that introduced path c is reported.
        self.assertWalkYields([c5], [c6.id], paths=[b"c"])

        def e(n):
            return (n, F, blob.id)

        self.assertWalkYields(
            [
                TestWalkEntry(c5, [TreeChange(CHANGE_RENAME, e(b"b"), e(b"c"))]),
                TestWalkEntry(c3, [TreeChange(CHANGE_RENAME, e(b"a"), e(b"b"))]),
                TestWalkEntry(c1, [TreeChange.add(e(b"a"))]),
            ],
            [c6.id],
            paths=[b"c"],
            follow=True,
        )

    def test_follow_rename_remove_path(self) -> None:
        blob = make_object(Blob, data=b"blob")
        _, _, _, c4, c5, c6 = self.make_linear_commits(
            6,
            trees={
                1: [(b"a", blob), (b"c", blob)],
                2: [],
                3: [],
                4: [(b"b", blob)],
                5: [(b"a", blob)],
                6: [(b"c", blob)],
            },
        )

        def e(n):
            return (n, F, blob.id)

        # Once the path changes to b, we aren't interested in a or c anymore.
        self.assertWalkYields(
            [
                TestWalkEntry(c6, [TreeChange(CHANGE_RENAME, e(b"a"), e(b"c"))]),
                TestWalkEntry(c5, [TreeChange(CHANGE_RENAME, e(b"b"), e(b"a"))]),
                TestWalkEntry(c4, [TreeChange.add(e(b"b"))]),
            ],
            [c6.id],
            paths=[b"c"],
            follow=True,
        )

    def test_since(self) -> None:
        # NOTE(review): the boundaries asserted here imply commit times of
        # 0, 100 and 200 for c1..c3, presumably the build_commit_graph
        # defaults -- confirm against dulwich.tests.utils.
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([c3, c2, c1], [c3.id], since=-1)
        self.assertWalkYields([c3, c2, c1], [c3.id], since=0)
        self.assertWalkYields([c3, c2], [c3.id], since=1)
        self.assertWalkYields([c3, c2], [c3.id], since=99)
        self.assertWalkYields([c3, c2], [c3.id], since=100)
        self.assertWalkYields([c3], [c3.id], since=101)
        self.assertWalkYields([c3], [c3.id], since=199)
        self.assertWalkYields([c3], [c3.id], since=200)
        self.assertWalkYields([], [c3.id], since=201)
        self.assertWalkYields([], [c3.id], since=300)

    def test_until(self) -> None:
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([], [c3.id], until=-1)
        self.assertWalkYields([c1], [c3.id], until=0)
        self.assertWalkYields([c1], [c3.id], until=1)
        self.assertWalkYields([c1], [c3.id], until=99)
        self.assertWalkYields([c2, c1], [c3.id], until=100)
        self.assertWalkYields([c2, c1], [c3.id], until=101)
        self.assertWalkYields([c2, c1], [c3.id], until=199)
        self.assertWalkYields([c3, c2, c1], [c3.id], until=200)
        self.assertWalkYields([c3, c2, c1], [c3.id], until=201)
        self.assertWalkYields([c3, c2, c1], [c3.id], until=300)

    def test_since_until(self) -> None:
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([], [c3.id], since=100, until=99)
        self.assertWalkYields([c3, c2, c1], [c3.id], since=-1, until=201)
        self.assertWalkYields([c2], [c3.id], since=100, until=100)
        self.assertWalkYields([c2], [c3.id], since=50, until=150)

    def test_since_over_scan(self) -> None:
        commits = self.make_linear_commits(11, times=[9, 0, 1, 2, 3, 4, 5, 8, 6, 7, 9])
        c8, _, c10, c11 = commits[-4:]
        del self.store[commits[0].id]
        # c9 is older than we want to walk, but is out of order with its
        # parent, so we need to walk past it to get to c8.
        # c1 would also match, but we've deleted it, and it should get pruned
        # even with over-scanning.
        self.assertWalkYields([c11, c10, c8], [c11.id], since=7)

    def assertTopoOrderEqual(self, expected_commits, commits) -> None:
        """Assert ``_topo_reorder`` turns ``commits`` into ``expected_commits``.

        ``commits`` are wrapped in ``TestWalkEntry`` objects before being
        passed to ``_topo_reorder``; the comparison is done on commit ids.
        """
        entries = [TestWalkEntry(c, None) for c in commits]
        actual_ids = [e.commit.id for e in _topo_reorder(entries)]
        self.assertEqual([c.id for c in expected_commits], actual_ids)

    def test_topo_reorder_linear(self) -> None:
        commits = self.make_linear_commits(5)
        commits.reverse()
        # Every input permutation must come out newest-first.
        for perm in permutations(commits):
            self.assertTopoOrderEqual(commits, perm)

    def test_topo_reorder_multiple_parents(self) -> None:
        c1, c2, c3 = self.make_commits([[1], [2], [3, 1, 2]])
        # Already sorted, so totally FIFO.
        self.assertTopoOrderEqual([c3, c2, c1], [c3, c2, c1])
        self.assertTopoOrderEqual([c3, c1, c2], [c3, c1, c2])

        # c3 causes one parent to be yielded.
        self.assertTopoOrderEqual([c3, c2, c1], [c2, c3, c1])
        self.assertTopoOrderEqual([c3, c1, c2], [c1, c3, c2])

        # c3 causes both parents to be yielded.
        self.assertTopoOrderEqual([c3, c2, c1], [c1, c2, c3])
        self.assertTopoOrderEqual([c3, c2, c1], [c2, c1, c3])

    def test_topo_reorder_multiple_children(self) -> None:
        c1, c2, c3 = self.make_commits([[1], [2, 1], [3, 1]])

        # c2 and c3 are FIFO but c1 moves to the end.
        self.assertTopoOrderEqual([c3, c2, c1], [c3, c2, c1])
        self.assertTopoOrderEqual([c3, c2, c1], [c3, c1, c2])
        self.assertTopoOrderEqual([c3, c2, c1], [c1, c3, c2])

        self.assertTopoOrderEqual([c2, c3, c1], [c2, c3, c1])
        self.assertTopoOrderEqual([c2, c3, c1], [c2, c1, c3])
        self.assertTopoOrderEqual([c2, c3, c1], [c1, c2, c3])

    def test_out_of_order_children(self) -> None:
        c1, c2, c3, c4, c5 = self.make_commits(
            [[1], [2, 1], [3, 2], [4, 1], [5, 3, 4]], times=[2, 1, 3, 4, 5]
        )
        # Default order follows commit time, so c1 (time 2) precedes its
        # child c2 (time 1); ORDER_TOPO puts children before parents.
        self.assertWalkYields([c5, c4, c3, c1, c2], [c5.id])
        self.assertWalkYields([c5, c4, c3, c2, c1], [c5.id], order=ORDER_TOPO)

    def test_out_of_order_with_exclude(self) -> None:
        # Create the following graph:
        # c1-------x2---m6
        #   \          /
        #    \-y3--y4-/--y5
        # Due to skew, y5 is the oldest commit.
        c1, x2, y3, y4, y5, m6 = self.make_commits(
            [[1], [2, 1], [3, 1], [4, 3], [5, 4], [6, 2, 4]],
            times=[2, 3, 4, 5, 1, 6],
        )
        self.assertWalkYields([m6, y4, y3, x2, c1], [m6.id])
        # Ensure that c1..y4 get excluded even though they're popped from the
        # priority queue long before y5.
        self.assertWalkYields([m6, x2], [m6.id], exclude=[y5.id])

    def test_empty_walk(self) -> None:
        c1, c2, c3 = self.make_linear_commits(3)
        self.assertWalkYields([], [c3.id], exclude=[c3.id])


class WalkEntryTest(_CommitGraphTestCase):
    """Tests for ``WalkEntry.changes()`` with and without a path prefix."""

    def test_all_changes(self) -> None:
        # Construct a commit with 2 files in different subdirectories.
        blob_a = make_object(Blob, data=b"a")
        blob_b = make_object(Blob, data=b"b")
        c1 = self.make_linear_commits(
            1,
            trees={1: [(b"x/a", blob_a), (b"y/b", blob_b)]},
        )[0]

        # Get the WalkEntry for the commit.
        walker = Walker(self.store, c1.id)
        walker_entry = next(iter(walker))
        changes = walker_entry.changes()

        # Compare the changes with the expected values.
        entry_a = (b"x/a", F, blob_a.id)
        entry_b = (b"y/b", F, blob_b.id)
        self.assertEqual(
            [TreeChange.add(entry_a), TreeChange.add(entry_b)],
            changes,
        )

    def test_all_with_merge(self) -> None:
        blob_a = make_object(Blob, data=b"a")
        blob_a2 = make_object(Blob, data=b"a2")
        blob_b = make_object(Blob, data=b"b")
        blob_b2 = make_object(Blob, data=b"b2")
        x1, y2, m3 = self.make_commits(
            [[1], [2], [3, 1, 2]],
            trees={
                1: [(b"x/a", blob_a)],
                2: [(b"y/b", blob_b)],
                3: [(b"x/a", blob_a2), (b"y/b", blob_b2)],
            },
        )

        # Get the WalkEntry for the merge commit.
        walker = Walker(self.store, m3.id)
        entries = list(walker)
        walker_entry = entries[0]
        self.assertEqual(walker_entry.commit.id, m3.id)
        changes = walker_entry.changes()
        self.assertEqual(2, len(changes))

        entry_a = (b"x/a", F, blob_a.id)
        entry_a2 = (b"x/a", F, blob_a2.id)
        entry_b = (b"y/b", F, blob_b.id)
        entry_b2 = (b"y/b", F, blob_b2.id)
        # One inner list per changed path, one element per parent.
        self.assertEqual(
            [
                [
                    TreeChange(CHANGE_MODIFY, entry_a, entry_a2),
                    TreeChange.add(entry_a2),
                ],
                [
                    TreeChange.add(entry_b2),
                    TreeChange(CHANGE_MODIFY, entry_b, entry_b2),
                ],
            ],
            changes,
        )

    def test_filter_changes(self) -> None:
        # Construct a commit with 2 files in different subdirectories.
        blob_a = make_object(Blob, data=b"a")
        blob_b = make_object(Blob, data=b"b")
        c1 = self.make_linear_commits(
            1,
            trees={1: [(b"x/a", blob_a), (b"y/b", blob_b)]},
        )[0]

        # Get the WalkEntry for the commit.
        walker = Walker(self.store, c1.id)
        walker_entry = next(iter(walker))
        changes = walker_entry.changes(path_prefix=b"x")

        # Compare the changes with the expected values.
        # Paths in the result are relative to the requested prefix.
        entry_a = (b"a", F, blob_a.id)
        self.assertEqual(
            [TreeChange.add(entry_a)],
            changes,
        )

    def test_filter_with_merge(self) -> None:
        blob_a = make_object(Blob, data=b"a")
        blob_a2 = make_object(Blob, data=b"a2")
        blob_b = make_object(Blob, data=b"b")
        blob_b2 = make_object(Blob, data=b"b2")
        x1, y2, m3 = self.make_commits(
            [[1], [2], [3, 1, 2]],
            trees={
                1: [(b"x/a", blob_a)],
                2: [(b"y/b", blob_b)],
                3: [(b"x/a", blob_a2), (b"y/b", blob_b2)],
            },
        )

        # Get the WalkEntry for the merge commit.
        walker = Walker(self.store, m3.id)
        entries = list(walker)
        walker_entry = entries[0]
        self.assertEqual(walker_entry.commit.id, m3.id)
        changes = walker_entry.changes(b"x")
        self.assertEqual(1, len(changes))

        entry_a = (b"a", F, blob_a.id)
        entry_a2 = (b"a", F, blob_a2.id)
        self.assertEqual(
            [[TreeChange(CHANGE_MODIFY, entry_a, entry_a2)]],
            changes,
        )