test_graph.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. # test_graph.py -- Tests for merge base
  2. # Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. """Tests for dulwich.graph."""
  21. from dulwich.graph import (
  22. WorkList,
  23. _find_lcas,
  24. can_fast_forward,
  25. find_merge_base,
  26. find_octopus_base,
  27. )
  28. from dulwich.repo import MemoryRepo
  29. from dulwich.tests.utils import make_commit
  30. from . import TestCase
  31. class FindMergeBaseTests(TestCase):
  32. @staticmethod
  33. def run_test(dag, inputs):
  34. def lookup_parents(commit_id):
  35. return dag[commit_id]
  36. def lookup_stamp(commit_id) -> int:
  37. # any constant timestamp value here will work to force
  38. # this test to test the same behaviour as done previously
  39. return 100
  40. c1 = inputs[0]
  41. c2s = inputs[1:]
  42. return set(_find_lcas(lookup_parents, c1, c2s, lookup_stamp))
  43. def test_multiple_lca(self) -> None:
  44. # two lowest common ancestors
  45. graph = {
  46. "5": ["1", "2"],
  47. "4": ["3", "1"],
  48. "3": ["2"],
  49. "2": ["0"],
  50. "1": [],
  51. "0": [],
  52. }
  53. self.assertEqual(self.run_test(graph, ["4", "5"]), {"1", "2"})
  54. def test_no_common_ancestor(self) -> None:
  55. # no common ancestor
  56. graph = {
  57. "4": ["2"],
  58. "3": ["1"],
  59. "2": [],
  60. "1": ["0"],
  61. "0": [],
  62. }
  63. self.assertEqual(self.run_test(graph, ["4", "3"]), set())
  64. def test_ancestor(self) -> None:
  65. # ancestor
  66. graph = {
  67. "G": ["D", "F"],
  68. "F": ["E"],
  69. "D": ["C"],
  70. "C": ["B"],
  71. "E": ["B"],
  72. "B": ["A"],
  73. "A": [],
  74. }
  75. self.assertEqual(self.run_test(graph, ["D", "C"]), {"C"})
  76. def test_direct_parent(self) -> None:
  77. # parent
  78. graph = {
  79. "G": ["D", "F"],
  80. "F": ["E"],
  81. "D": ["C"],
  82. "C": ["B"],
  83. "E": ["B"],
  84. "B": ["A"],
  85. "A": [],
  86. }
  87. self.assertEqual(self.run_test(graph, ["G", "D"]), {"D"})
  88. def test_another_crossover(self) -> None:
  89. # Another cross over
  90. graph = {
  91. "G": ["D", "F"],
  92. "F": ["E", "C"],
  93. "D": ["C", "E"],
  94. "C": ["B"],
  95. "E": ["B"],
  96. "B": ["A"],
  97. "A": [],
  98. }
  99. self.assertEqual(self.run_test(graph, ["D", "F"]), {"E", "C"})
  100. def test_three_way_merge_lca(self) -> None:
  101. # three way merge commit straight from git docs
  102. graph = {
  103. "C": ["C1"],
  104. "C1": ["C2"],
  105. "C2": ["C3"],
  106. "C3": ["C4"],
  107. "C4": ["2"],
  108. "B": ["B1"],
  109. "B1": ["B2"],
  110. "B2": ["B3"],
  111. "B3": ["1"],
  112. "A": ["A1"],
  113. "A1": ["A2"],
  114. "A2": ["A3"],
  115. "A3": ["1"],
  116. "1": ["2"],
  117. "2": [],
  118. }
  119. # assumes a theoretical merge M exists that merges B and C first
  120. # which actually means find the first LCA from either of B OR C with A
  121. self.assertEqual(self.run_test(graph, ["A", "B", "C"]), {"1"})
  122. def test_octopus(self) -> None:
  123. # octopus algorithm test
  124. # test straight from git docs of A, B, and C
  125. # but this time use octopus to find lcas of A, B, and C simultaneously
  126. graph = {
  127. "C": ["C1"],
  128. "C1": ["C2"],
  129. "C2": ["C3"],
  130. "C3": ["C4"],
  131. "C4": ["2"],
  132. "B": ["B1"],
  133. "B1": ["B2"],
  134. "B2": ["B3"],
  135. "B3": ["1"],
  136. "A": ["A1"],
  137. "A1": ["A2"],
  138. "A2": ["A3"],
  139. "A3": ["1"],
  140. "1": ["2"],
  141. "2": [],
  142. }
  143. def lookup_parents(cid):
  144. return graph[cid]
  145. def lookup_stamp(commit_id) -> int:
  146. # any constant timestamp value here will work to force
  147. # this test to test the same behaviour as done previously
  148. return 100
  149. lcas = ["A"]
  150. others = ["B", "C"]
  151. for cmt in others:
  152. next_lcas = []
  153. for ca in lcas:
  154. res = _find_lcas(lookup_parents, cmt, [ca], lookup_stamp)
  155. next_lcas.extend(res)
  156. lcas = next_lcas[:]
  157. self.assertEqual(set(lcas), {"2"})
  158. class FindMergeBaseFunctionTests(TestCase):
  159. def test_find_merge_base_empty(self) -> None:
  160. r = MemoryRepo()
  161. # Empty list of commits
  162. self.assertEqual([], find_merge_base(r, []))
  163. def test_find_merge_base_single(self) -> None:
  164. r = MemoryRepo()
  165. base = make_commit()
  166. r.object_store.add_objects([(base, None)])
  167. # Single commit returns itself
  168. self.assertEqual([base.id], find_merge_base(r, [base.id]))
  169. def test_find_merge_base_identical(self) -> None:
  170. r = MemoryRepo()
  171. base = make_commit()
  172. r.object_store.add_objects([(base, None)])
  173. # When the same commit is in both positions
  174. self.assertEqual([base.id], find_merge_base(r, [base.id, base.id]))
  175. def test_find_merge_base_linear(self) -> None:
  176. r = MemoryRepo()
  177. base = make_commit()
  178. c1 = make_commit(parents=[base.id])
  179. c2 = make_commit(parents=[c1.id])
  180. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  181. # Base of c1 and c2 is c1
  182. self.assertEqual([c1.id], find_merge_base(r, [c1.id, c2.id]))
  183. # Base of c2 and c1 is c1
  184. self.assertEqual([c1.id], find_merge_base(r, [c2.id, c1.id]))
  185. def test_find_merge_base_diverged(self) -> None:
  186. r = MemoryRepo()
  187. base = make_commit()
  188. c1 = make_commit(parents=[base.id])
  189. c2a = make_commit(parents=[c1.id], message=b"2a")
  190. c2b = make_commit(parents=[c1.id], message=b"2b")
  191. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  192. # Merge base of two diverged commits is their common parent
  193. self.assertEqual([c1.id], find_merge_base(r, [c2a.id, c2b.id]))
  194. def test_find_merge_base_with_min_stamp(self) -> None:
  195. r = MemoryRepo()
  196. base = make_commit(commit_time=100)
  197. c1 = make_commit(parents=[base.id], commit_time=200)
  198. c2 = make_commit(parents=[c1.id], commit_time=300)
  199. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  200. # Normal merge base finding works
  201. self.assertEqual([c1.id], find_merge_base(r, [c1.id, c2.id]))
  202. def test_find_merge_base_multiple_common_ancestors(self) -> None:
  203. r = MemoryRepo()
  204. base = make_commit(commit_time=100)
  205. c1a = make_commit(parents=[base.id], commit_time=200, message=b"c1a")
  206. c1b = make_commit(parents=[base.id], commit_time=201, message=b"c1b")
  207. c2 = make_commit(parents=[c1a.id, c1b.id], commit_time=300)
  208. c3 = make_commit(parents=[c1a.id, c1b.id], commit_time=301)
  209. r.object_store.add_objects(
  210. [(base, None), (c1a, None), (c1b, None), (c2, None), (c3, None)]
  211. )
  212. # Merge base should include both c1a and c1b since both are common ancestors
  213. bases = find_merge_base(r, [c2.id, c3.id])
  214. self.assertEqual(2, len(bases))
  215. self.assertIn(c1a.id, bases)
  216. self.assertIn(c1b.id, bases)
  217. class FindOctopusBaseTests(TestCase):
  218. def test_find_octopus_base_empty(self) -> None:
  219. r = MemoryRepo()
  220. # Empty list of commits
  221. self.assertEqual([], find_octopus_base(r, []))
  222. def test_find_octopus_base_single(self) -> None:
  223. r = MemoryRepo()
  224. base = make_commit()
  225. r.object_store.add_objects([(base, None)])
  226. # Single commit returns itself
  227. self.assertEqual([base.id], find_octopus_base(r, [base.id]))
  228. def test_find_octopus_base_two_commits(self) -> None:
  229. r = MemoryRepo()
  230. base = make_commit()
  231. c1 = make_commit(parents=[base.id])
  232. c2 = make_commit(parents=[c1.id])
  233. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  234. # With two commits it should call find_merge_base
  235. self.assertEqual([c1.id], find_octopus_base(r, [c1.id, c2.id]))
  236. def test_find_octopus_base_multiple(self) -> None:
  237. r = MemoryRepo()
  238. base = make_commit()
  239. c1 = make_commit(parents=[base.id])
  240. c2a = make_commit(parents=[c1.id], message=b"2a")
  241. c2b = make_commit(parents=[c1.id], message=b"2b")
  242. c2c = make_commit(parents=[c1.id], message=b"2c")
  243. r.object_store.add_objects(
  244. [(base, None), (c1, None), (c2a, None), (c2b, None), (c2c, None)]
  245. )
  246. # Common ancestor of all three branches
  247. self.assertEqual([c1.id], find_octopus_base(r, [c2a.id, c2b.id, c2c.id]))
  248. class CanFastForwardTests(TestCase):
  249. def test_ff(self) -> None:
  250. r = MemoryRepo()
  251. base = make_commit()
  252. c1 = make_commit(parents=[base.id])
  253. c2 = make_commit(parents=[c1.id])
  254. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  255. self.assertTrue(can_fast_forward(r, c1.id, c1.id))
  256. self.assertTrue(can_fast_forward(r, base.id, c1.id))
  257. self.assertTrue(can_fast_forward(r, c1.id, c2.id))
  258. self.assertFalse(can_fast_forward(r, c2.id, c1.id))
  259. def test_diverged(self) -> None:
  260. r = MemoryRepo()
  261. base = make_commit()
  262. c1 = make_commit(parents=[base.id])
  263. c2a = make_commit(parents=[c1.id], message=b"2a")
  264. c2b = make_commit(parents=[c1.id], message=b"2b")
  265. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  266. self.assertTrue(can_fast_forward(r, c1.id, c2a.id))
  267. self.assertTrue(can_fast_forward(r, c1.id, c2b.id))
  268. self.assertFalse(can_fast_forward(r, c2a.id, c2b.id))
  269. self.assertFalse(can_fast_forward(r, c2b.id, c2a.id))
  270. class WorkListTest(TestCase):
  271. def test_WorkList(self) -> None:
  272. # tuples of (timestamp, value) are stored in a Priority MaxQueue
  273. # repeated use of get should return them in maxheap timestamp
  274. # order: largest time value (most recent in time) first then earlier/older
  275. wlst = WorkList()
  276. wlst.add((100, "Test Value 1"))
  277. wlst.add((50, "Test Value 2"))
  278. wlst.add((200, "Test Value 3"))
  279. self.assertEqual(wlst.get(), (200, "Test Value 3"))
  280. self.assertEqual(wlst.get(), (100, "Test Value 1"))
  281. wlst.add((150, "Test Value 4"))
  282. self.assertEqual(wlst.get(), (150, "Test Value 4"))
  283. self.assertEqual(wlst.get(), (50, "Test Value 2"))
  284. def test_WorkList_iter(self) -> None:
  285. # Test the iter method of WorkList
  286. wlst = WorkList()
  287. wlst.add((100, "Value 1"))
  288. wlst.add((200, "Value 2"))
  289. wlst.add((50, "Value 3"))
  290. # Collect all items from iter
  291. items = list(wlst.iter())
  292. # Items should be in their original order, not sorted
  293. self.assertEqual(len(items), 3)
  294. # Check the values are present with correct timestamps
  295. timestamps = [dt for dt, _ in items]
  296. values = [val for _, val in items]
  297. self.assertIn(100, timestamps)
  298. self.assertIn(200, timestamps)
  299. self.assertIn(50, timestamps)
  300. self.assertIn("Value 1", values)
  301. self.assertIn("Value 2", values)
  302. self.assertIn("Value 3", values)
  303. def test_WorkList_empty_get(self) -> None:
  304. # Test getting from an empty WorkList
  305. wlst = WorkList()
  306. with self.assertRaises(IndexError):
  307. wlst.get()
  308. def test_WorkList_empty_iter(self) -> None:
  309. # Test iterating over an empty WorkList
  310. wlst = WorkList()
  311. items = list(wlst.iter())
  312. self.assertEqual([], items)
  313. def test_WorkList_empty_heap(self) -> None:
  314. # The current implementation raises IndexError when the heap is empty
  315. wlst = WorkList()
  316. # Ensure pq is empty
  317. wlst.pq = []
  318. # get should raise IndexError when heap is empty
  319. with self.assertRaises(IndexError):
  320. wlst.get()