test_graph.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. # test_graph.py -- Tests for merge base
  2. # Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. """Tests for dulwich.graph."""
  21. from dulwich.graph import (
  22. WorkList,
  23. _find_lcas,
  24. can_fast_forward,
  25. find_merge_base,
  26. find_octopus_base,
  27. )
  28. from dulwich.repo import MemoryRepo
  29. from dulwich.tests.utils import make_commit
  30. from . import TestCase
  31. class FindMergeBaseTests(TestCase):
  32. @staticmethod
  33. def run_test(dag, inputs):
  34. def lookup_parents(commit_id):
  35. return dag[commit_id]
  36. def lookup_stamp(commit_id) -> int:
  37. # any constant timestamp value here will work to force
  38. # this test to test the same behaviour as done previously
  39. return 100
  40. c1 = inputs[0]
  41. c2s = inputs[1:]
  42. return set(_find_lcas(lookup_parents, c1, c2s, lookup_stamp))
  43. def test_multiple_lca(self) -> None:
  44. # two lowest common ancestors
  45. graph = {
  46. "5": ["1", "2"],
  47. "4": ["3", "1"],
  48. "3": ["2"],
  49. "2": ["0"],
  50. "1": [],
  51. "0": [],
  52. }
  53. self.assertEqual(self.run_test(graph, ["4", "5"]), {"1", "2"})
  54. def test_no_common_ancestor(self) -> None:
  55. # no common ancestor
  56. graph = {
  57. "4": ["2"],
  58. "3": ["1"],
  59. "2": [],
  60. "1": ["0"],
  61. "0": [],
  62. }
  63. self.assertEqual(self.run_test(graph, ["4", "3"]), set())
  64. def test_ancestor(self) -> None:
  65. # ancestor
  66. graph = {
  67. "G": ["D", "F"],
  68. "F": ["E"],
  69. "D": ["C"],
  70. "C": ["B"],
  71. "E": ["B"],
  72. "B": ["A"],
  73. "A": [],
  74. }
  75. self.assertEqual(self.run_test(graph, ["D", "C"]), {"C"})
  76. def test_direct_parent(self) -> None:
  77. # parent
  78. graph = {
  79. "G": ["D", "F"],
  80. "F": ["E"],
  81. "D": ["C"],
  82. "C": ["B"],
  83. "E": ["B"],
  84. "B": ["A"],
  85. "A": [],
  86. }
  87. self.assertEqual(self.run_test(graph, ["G", "D"]), {"D"})
  88. def test_another_crossover(self) -> None:
  89. # Another cross over
  90. graph = {
  91. "G": ["D", "F"],
  92. "F": ["E", "C"],
  93. "D": ["C", "E"],
  94. "C": ["B"],
  95. "E": ["B"],
  96. "B": ["A"],
  97. "A": [],
  98. }
  99. self.assertEqual(self.run_test(graph, ["D", "F"]), {"E", "C"})
  100. def test_three_way_merge_lca(self) -> None:
  101. # three way merge commit straight from git docs
  102. graph = {
  103. "C": ["C1"],
  104. "C1": ["C2"],
  105. "C2": ["C3"],
  106. "C3": ["C4"],
  107. "C4": ["2"],
  108. "B": ["B1"],
  109. "B1": ["B2"],
  110. "B2": ["B3"],
  111. "B3": ["1"],
  112. "A": ["A1"],
  113. "A1": ["A2"],
  114. "A2": ["A3"],
  115. "A3": ["1"],
  116. "1": ["2"],
  117. "2": [],
  118. }
  119. # assumes a theoretical merge M exists that merges B and C first
  120. # which actually means find the first LCA from either of B OR C with A
  121. self.assertEqual(self.run_test(graph, ["A", "B", "C"]), {"1"})
  122. def test_octopus(self) -> None:
  123. # octopus algorithm test
  124. # test straight from git docs of A, B, and C
  125. # but this time use octopus to find lcas of A, B, and C simultaneously
  126. graph = {
  127. "C": ["C1"],
  128. "C1": ["C2"],
  129. "C2": ["C3"],
  130. "C3": ["C4"],
  131. "C4": ["2"],
  132. "B": ["B1"],
  133. "B1": ["B2"],
  134. "B2": ["B3"],
  135. "B3": ["1"],
  136. "A": ["A1"],
  137. "A1": ["A2"],
  138. "A2": ["A3"],
  139. "A3": ["1"],
  140. "1": ["2"],
  141. "2": [],
  142. }
  143. def lookup_parents(cid):
  144. return graph[cid]
  145. def lookup_stamp(commit_id) -> int:
  146. # any constant timestamp value here will work to force
  147. # this test to test the same behaviour as done previously
  148. return 100
  149. lcas = ["A"]
  150. others = ["B", "C"]
  151. for cmt in others:
  152. next_lcas = []
  153. for ca in lcas:
  154. res = _find_lcas(lookup_parents, cmt, [ca], lookup_stamp)
  155. next_lcas.extend(res)
  156. lcas = next_lcas[:]
  157. self.assertEqual(set(lcas), {"2"})
  158. class FindMergeBaseFunctionTests(TestCase):
  159. def test_find_merge_base_empty(self) -> None:
  160. r = MemoryRepo()
  161. # Empty list of commits
  162. self.assertEqual([], find_merge_base(r, []))
  163. def test_find_merge_base_single(self) -> None:
  164. r = MemoryRepo()
  165. base = make_commit()
  166. r.object_store.add_objects([(base, None)])
  167. # Single commit returns itself
  168. self.assertEqual([base.id], find_merge_base(r, [base.id]))
  169. def test_find_merge_base_identical(self) -> None:
  170. r = MemoryRepo()
  171. base = make_commit()
  172. r.object_store.add_objects([(base, None)])
  173. # When the same commit is in both positions
  174. self.assertEqual([base.id], find_merge_base(r, [base.id, base.id]))
  175. def test_find_merge_base_linear(self) -> None:
  176. r = MemoryRepo()
  177. base = make_commit()
  178. c1 = make_commit(parents=[base.id])
  179. c2 = make_commit(parents=[c1.id])
  180. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  181. # Base of c1 and c2 is c1
  182. self.assertEqual([c1.id], find_merge_base(r, [c1.id, c2.id]))
  183. # Base of c2 and c1 is c1
  184. self.assertEqual([c1.id], find_merge_base(r, [c2.id, c1.id]))
  185. def test_find_merge_base_diverged(self) -> None:
  186. r = MemoryRepo()
  187. base = make_commit()
  188. c1 = make_commit(parents=[base.id])
  189. c2a = make_commit(parents=[c1.id], message=b"2a")
  190. c2b = make_commit(parents=[c1.id], message=b"2b")
  191. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  192. # Merge base of two diverged commits is their common parent
  193. self.assertEqual([c1.id], find_merge_base(r, [c2a.id, c2b.id]))
  194. class FindOctopusBaseTests(TestCase):
  195. def test_find_octopus_base_empty(self) -> None:
  196. r = MemoryRepo()
  197. # Empty list of commits
  198. self.assertEqual([], find_octopus_base(r, []))
  199. def test_find_octopus_base_single(self) -> None:
  200. r = MemoryRepo()
  201. base = make_commit()
  202. r.object_store.add_objects([(base, None)])
  203. # Single commit returns itself
  204. self.assertEqual([base.id], find_octopus_base(r, [base.id]))
  205. def test_find_octopus_base_two_commits(self) -> None:
  206. r = MemoryRepo()
  207. base = make_commit()
  208. c1 = make_commit(parents=[base.id])
  209. c2 = make_commit(parents=[c1.id])
  210. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  211. # With two commits it should call find_merge_base
  212. self.assertEqual([c1.id], find_octopus_base(r, [c1.id, c2.id]))
  213. def test_find_octopus_base_multiple(self) -> None:
  214. r = MemoryRepo()
  215. base = make_commit()
  216. c1 = make_commit(parents=[base.id])
  217. c2a = make_commit(parents=[c1.id], message=b"2a")
  218. c2b = make_commit(parents=[c1.id], message=b"2b")
  219. c2c = make_commit(parents=[c1.id], message=b"2c")
  220. r.object_store.add_objects(
  221. [(base, None), (c1, None), (c2a, None), (c2b, None), (c2c, None)]
  222. )
  223. # Common ancestor of all three branches
  224. self.assertEqual([c1.id], find_octopus_base(r, [c2a.id, c2b.id, c2c.id]))
  225. class CanFastForwardTests(TestCase):
  226. def test_ff(self) -> None:
  227. r = MemoryRepo()
  228. base = make_commit()
  229. c1 = make_commit(parents=[base.id])
  230. c2 = make_commit(parents=[c1.id])
  231. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  232. self.assertTrue(can_fast_forward(r, c1.id, c1.id))
  233. self.assertTrue(can_fast_forward(r, base.id, c1.id))
  234. self.assertTrue(can_fast_forward(r, c1.id, c2.id))
  235. self.assertFalse(can_fast_forward(r, c2.id, c1.id))
  236. def test_diverged(self) -> None:
  237. r = MemoryRepo()
  238. base = make_commit()
  239. c1 = make_commit(parents=[base.id])
  240. c2a = make_commit(parents=[c1.id], message=b"2a")
  241. c2b = make_commit(parents=[c1.id], message=b"2b")
  242. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  243. self.assertTrue(can_fast_forward(r, c1.id, c2a.id))
  244. self.assertTrue(can_fast_forward(r, c1.id, c2b.id))
  245. self.assertFalse(can_fast_forward(r, c2a.id, c2b.id))
  246. self.assertFalse(can_fast_forward(r, c2b.id, c2a.id))
  247. class WorkListTest(TestCase):
  248. def test_WorkList(self) -> None:
  249. # tuples of (timestamp, value) are stored in a Priority MaxQueue
  250. # repeated use of get should return them in maxheap timestamp
  251. # order: largest time value (most recent in time) first then earlier/older
  252. wlst = WorkList()
  253. wlst.add((100, "Test Value 1"))
  254. wlst.add((50, "Test Value 2"))
  255. wlst.add((200, "Test Value 3"))
  256. self.assertEqual(wlst.get(), (200, "Test Value 3"))
  257. self.assertEqual(wlst.get(), (100, "Test Value 1"))
  258. wlst.add((150, "Test Value 4"))
  259. self.assertEqual(wlst.get(), (150, "Test Value 4"))
  260. self.assertEqual(wlst.get(), (50, "Test Value 2"))
  261. def test_WorkList_iter(self) -> None:
  262. # Test the iter method of WorkList
  263. wlst = WorkList()
  264. wlst.add((100, "Value 1"))
  265. wlst.add((200, "Value 2"))
  266. wlst.add((50, "Value 3"))
  267. # Collect all items from iter
  268. items = list(wlst.iter())
  269. # Items should be in their original order, not sorted
  270. self.assertEqual(len(items), 3)
  271. # Check the values are present with correct timestamps
  272. timestamps = [dt for dt, _ in items]
  273. values = [val for _, val in items]
  274. self.assertIn(100, timestamps)
  275. self.assertIn(200, timestamps)
  276. self.assertIn(50, timestamps)
  277. self.assertIn("Value 1", values)
  278. self.assertIn("Value 2", values)
  279. self.assertIn("Value 3", values)
  280. def test_WorkList_empty_get(self) -> None:
  281. # Test getting from an empty WorkList
  282. wlst = WorkList()
  283. with self.assertRaises(IndexError):
  284. wlst.get()