# test_graph.py -- Tests for merge base # Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada # # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. """Tests for dulwich.graph.""" from dulwich.graph import WorkList, _find_lcas, can_fast_forward from dulwich.repo import MemoryRepo from dulwich.tests.utils import make_commit from . import TestCase class FindMergeBaseTests(TestCase): @staticmethod def run_test(dag, inputs): def lookup_parents(commit_id): return dag[commit_id] def lookup_stamp(commit_id) -> int: # any constant timestamp value here will work to force # this test to test the same behaviour as done previously return 100 c1 = inputs[0] c2s = inputs[1:] return set(_find_lcas(lookup_parents, c1, c2s, lookup_stamp)) def test_multiple_lca(self) -> None: # two lowest common ancestors graph = { "5": ["1", "2"], "4": ["3", "1"], "3": ["2"], "2": ["0"], "1": [], "0": [], } self.assertEqual(self.run_test(graph, ["4", "5"]), {"1", "2"}) def test_no_common_ancestor(self) -> None: # no common ancestor graph = { "4": ["2"], "3": ["1"], "2": [], "1": ["0"], "0": [], } self.assertEqual(self.run_test(graph, ["4", "3"]), set()) def test_ancestor(self) -> None: # ancestor graph = { "G": ["D", "F"], "F": ["E"], "D": ["C"], "C": ["B"], "E": ["B"], "B": ["A"], "A": [], } self.assertEqual(self.run_test(graph, ["D", "C"]), {"C"}) def test_direct_parent(self) -> None: # parent graph = { "G": ["D", "F"], "F": ["E"], "D": ["C"], "C": ["B"], "E": ["B"], "B": ["A"], "A": [], } self.assertEqual(self.run_test(graph, ["G", "D"]), {"D"}) def test_another_crossover(self) -> None: # Another cross over graph = { "G": ["D", "F"], "F": ["E", "C"], "D": ["C", "E"], "C": ["B"], "E": ["B"], "B": ["A"], "A": [], } self.assertEqual(self.run_test(graph, ["D", "F"]), {"E", "C"}) def test_three_way_merge_lca(self) -> None: # three way merge commit straight from git docs graph = { "C": ["C1"], "C1": ["C2"], "C2": ["C3"], "C3": ["C4"], "C4": ["2"], "B": ["B1"], "B1": ["B2"], "B2": ["B3"], "B3": ["1"], "A": ["A1"], "A1": ["A2"], "A2": ["A3"], "A3": ["1"], "1": ["2"], "2": [], } # assumes a theoretical merge M exists that merges B and C first # which actually means find the first LCA from either of B OR C with A self.assertEqual(self.run_test(graph, ["A", "B", "C"]), {"1"}) def test_octopus(self) -> None: # octopus algorithm test # test straight from git docs of A, B, and C # but this time use octopus to find lcas of A, B, and C simultaneously graph = { "C": ["C1"], "C1": ["C2"], "C2": ["C3"], "C3": ["C4"], "C4": ["2"], "B": ["B1"], "B1": ["B2"], "B2": ["B3"], "B3": ["1"], "A": ["A1"], "A1": ["A2"], "A2": ["A3"], "A3": ["1"], "1": ["2"], "2": [], } def lookup_parents(cid): return graph[cid] def lookup_stamp(commit_id) -> int: # any constant timestamp value here will work to force # this test to test the same behaviour as done previously return 100 lcas = ["A"] others = ["B", "C"] for cmt in others: next_lcas = [] for ca in lcas: res = _find_lcas(lookup_parents, cmt, [ca], lookup_stamp) next_lcas.extend(res) lcas = next_lcas[:] self.assertEqual(set(lcas), {"2"}) class CanFastForwardTests(TestCase): def test_ff(self) -> None: r = MemoryRepo() base = make_commit() c1 = make_commit(parents=[base.id]) c2 = make_commit(parents=[c1.id]) r.object_store.add_objects([(base, None), (c1, None), (c2, None)]) self.assertTrue(can_fast_forward(r, c1.id, c1.id)) self.assertTrue(can_fast_forward(r, base.id, c1.id)) self.assertTrue(can_fast_forward(r, c1.id, c2.id)) self.assertFalse(can_fast_forward(r, c2.id, c1.id)) def test_diverged(self) -> None: r = MemoryRepo() base = make_commit() c1 = make_commit(parents=[base.id]) c2a = make_commit(parents=[c1.id], message=b"2a") c2b = make_commit(parents=[c1.id], message=b"2b") r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)]) self.assertTrue(can_fast_forward(r, c1.id, c2a.id)) self.assertTrue(can_fast_forward(r, c1.id, c2b.id)) self.assertFalse(can_fast_forward(r, c2a.id, c2b.id)) self.assertFalse(can_fast_forward(r, c2b.id, c2a.id)) class WorkListTest(TestCase): def test_WorkList(self) -> None: # tuples of (timestamp, value) are stored in a Priority MaxQueue # repeated use of get should return them in maxheap timestamp # order: largest time value (most recent in time) first then earlier/older wlst = WorkList() wlst.add((100, "Test Value 1")) wlst.add((50, "Test Value 2")) wlst.add((200, "Test Value 3")) self.assertEqual(wlst.get(), (200, "Test Value 3")) self.assertEqual(wlst.get(), (100, "Test Value 1")) wlst.add((150, "Test Value 4")) self.assertEqual(wlst.get(), (150, "Test Value 4")) self.assertEqual(wlst.get(), (50, "Test Value 2"))