2
0

test_graph.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. # test_graph.py -- Tests for merge base
  2. # Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. """Tests for dulwich.graph."""
  21. from dulwich.graph import (
  22. WorkList,
  23. _find_lcas,
  24. can_fast_forward,
  25. find_merge_base,
  26. find_octopus_base,
  27. )
  28. from dulwich.repo import MemoryRepo
  29. from dulwich.tests.utils import make_commit
  30. from . import TestCase
  class FindMergeBaseTests(TestCase):
      """Exercise ``_find_lcas`` on small hand-written commit graphs.

      Every graph is a dict mapping a commit id to the list of its parent ids.
      """

      @staticmethod
      def run_test(dag, inputs):
          # The first input is compared against all remaining inputs.
          # Any constant timestamp works here: it forces these tests to
          # exercise the same behaviour as they did previously.
          return set(
              _find_lcas(
                  lambda commit_id: dag[commit_id],
                  inputs[0],
                  inputs[1:],
                  lambda commit_id: 100,
              )
          )

      @staticmethod
      def _git_docs_graph():
          # Three branches A, B and C, taken straight from the git docs.
          # A fresh dict is built on every call so tests never share state.
          return {
              "C": ["C1"],
              "C1": ["C2"],
              "C2": ["C3"],
              "C3": ["C4"],
              "C4": ["2"],
              "B": ["B1"],
              "B1": ["B2"],
              "B2": ["B3"],
              "B3": ["1"],
              "A": ["A1"],
              "A1": ["A2"],
              "A2": ["A3"],
              "A3": ["1"],
              "1": ["2"],
              "2": [],
          }

      def test_multiple_lca(self) -> None:
          # two lowest common ancestors
          dag = {
              "5": ["1", "2"],
              "4": ["3", "1"],
              "3": ["2"],
              "2": ["0"],
              "1": [],
              "0": [],
          }
          found = self.run_test(dag, ["4", "5"])
          self.assertEqual(found, {"1", "2"})

      def test_no_common_ancestor(self) -> None:
          # two histories that never meet
          dag = {
              "4": ["2"],
              "3": ["1"],
              "2": [],
              "1": ["0"],
              "0": [],
          }
          found = self.run_test(dag, ["4", "3"])
          self.assertEqual(found, set())

      def test_ancestor(self) -> None:
          # one input is an ancestor of the other
          dag = {
              "G": ["D", "F"],
              "F": ["E"],
              "D": ["C"],
              "C": ["B"],
              "E": ["B"],
              "B": ["A"],
              "A": [],
          }
          found = self.run_test(dag, ["D", "C"])
          self.assertEqual(found, {"C"})

      def test_direct_parent(self) -> None:
          # one input is a direct parent of the other
          dag = {
              "G": ["D", "F"],
              "F": ["E"],
              "D": ["C"],
              "C": ["B"],
              "E": ["B"],
              "B": ["A"],
              "A": [],
          }
          found = self.run_test(dag, ["G", "D"])
          self.assertEqual(found, {"D"})

      def test_another_crossover(self) -> None:
          # criss-cross: D and F each merge both C and E
          dag = {
              "G": ["D", "F"],
              "F": ["E", "C"],
              "D": ["C", "E"],
              "C": ["B"],
              "E": ["B"],
              "B": ["A"],
              "A": [],
          }
          found = self.run_test(dag, ["D", "F"])
          self.assertEqual(found, {"E", "C"})

      def test_three_way_merge_lca(self) -> None:
          # three way merge commit straight from git docs
          dag = self._git_docs_graph()
          # assumes a theoretical merge M exists that merges B and C first
          # which actually means find the first LCA from either of B OR C with A
          found = self.run_test(dag, ["A", "B", "C"])
          self.assertEqual(found, {"1"})

      def test_octopus(self) -> None:
          # octopus algorithm test
          # test straight from git docs of A, B, and C
          # but this time use octopus to find lcas of A, B, and C simultaneously
          dag = self._git_docs_graph()

          def parents_of(cid):
              return dag[cid]

          def constant_stamp(commit_id) -> int:
              # any constant timestamp value here will work to force
              # this test to test the same behaviour as done previously
              return 100

          # Fold each remaining head into the running list of candidates.
          lcas = ["A"]
          for cmt in ("B", "C"):
              lcas = [
                  found
                  for ca in lcas
                  for found in _find_lcas(parents_of, cmt, [ca], constant_stamp)
              ]
          self.assertEqual(set(lcas), {"2"})
  class FindMergeBaseFunctionTests(TestCase):
      """Tests for ``find_merge_base`` run against a ``MemoryRepo``."""

      @staticmethod
      def _repo_with(*commits):
          # Build a fresh in-memory repo whose object store holds ``commits``.
          repo = MemoryRepo()
          repo.object_store.add_objects([(commit, None) for commit in commits])
          return repo

      def test_find_merge_base_empty(self) -> None:
          # Empty list of commits
          repo = MemoryRepo()
          self.assertEqual([], find_merge_base(repo, []))

      def test_find_merge_base_single(self) -> None:
          root = make_commit()
          repo = self._repo_with(root)
          # Single commit returns itself
          self.assertEqual([root.id], find_merge_base(repo, [root.id]))

      def test_find_merge_base_identical(self) -> None:
          root = make_commit()
          repo = self._repo_with(root)
          # When the same commit is in both positions
          self.assertEqual([root.id], find_merge_base(repo, [root.id, root.id]))

      def test_find_merge_base_linear(self) -> None:
          root = make_commit()
          first = make_commit(parents=[root.id])
          second = make_commit(parents=[first.id])
          repo = self._repo_with(root, first, second)
          # Base of c1 and c2 is c1
          self.assertEqual([first.id], find_merge_base(repo, [first.id, second.id]))
          # Base of c2 and c1 is c1
          self.assertEqual([first.id], find_merge_base(repo, [second.id, first.id]))

      def test_find_merge_base_diverged(self) -> None:
          root = make_commit()
          fork = make_commit(parents=[root.id])
          left = make_commit(parents=[fork.id], message=b"2a")
          right = make_commit(parents=[fork.id], message=b"2b")
          repo = self._repo_with(root, fork, left, right)
          # Merge base of two diverged commits is their common parent
          self.assertEqual([fork.id], find_merge_base(repo, [left.id, right.id]))

      def test_find_merge_base_with_min_stamp(self) -> None:
          root = make_commit(commit_time=100)
          first = make_commit(parents=[root.id], commit_time=200)
          second = make_commit(parents=[first.id], commit_time=300)
          repo = self._repo_with(root, first, second)
          # Normal merge base finding works
          self.assertEqual([first.id], find_merge_base(repo, [first.id, second.id]))

      def test_find_merge_base_multiple_common_ancestors(self) -> None:
          root = make_commit(commit_time=100)
          side_a = make_commit(parents=[root.id], commit_time=200, message=b"c1a")
          side_b = make_commit(parents=[root.id], commit_time=201, message=b"c1b")
          merge_one = make_commit(parents=[side_a.id, side_b.id], commit_time=300)
          merge_two = make_commit(parents=[side_a.id, side_b.id], commit_time=301)
          repo = self._repo_with(root, side_a, side_b, merge_one, merge_two)
          # Merge base should include both c1a and c1b since both are common ancestors
          bases = find_merge_base(repo, [merge_one.id, merge_two.id])
          self.assertEqual(2, len(bases))
          self.assertIn(side_a.id, bases)
          self.assertIn(side_b.id, bases)
  class FindOctopusBaseTests(TestCase):
      """Tests for ``find_octopus_base`` run against a ``MemoryRepo``."""

      @staticmethod
      def _repo_with(*commits):
          # Build a fresh in-memory repo whose object store holds ``commits``.
          repo = MemoryRepo()
          repo.object_store.add_objects([(commit, None) for commit in commits])
          return repo

      def test_find_octopus_base_empty(self) -> None:
          # Empty list of commits
          repo = MemoryRepo()
          self.assertEqual([], find_octopus_base(repo, []))

      def test_find_octopus_base_single(self) -> None:
          root = make_commit()
          repo = self._repo_with(root)
          # Single commit returns itself
          self.assertEqual([root.id], find_octopus_base(repo, [root.id]))

      def test_find_octopus_base_two_commits(self) -> None:
          root = make_commit()
          first = make_commit(parents=[root.id])
          second = make_commit(parents=[first.id])
          repo = self._repo_with(root, first, second)
          # With two commits it should call find_merge_base
          self.assertEqual(
              [first.id], find_octopus_base(repo, [first.id, second.id])
          )

      def test_find_octopus_base_multiple(self) -> None:
          root = make_commit()
          fork = make_commit(parents=[root.id])
          branch_a = make_commit(parents=[fork.id], message=b"2a")
          branch_b = make_commit(parents=[fork.id], message=b"2b")
          branch_c = make_commit(parents=[fork.id], message=b"2c")
          repo = self._repo_with(root, fork, branch_a, branch_b, branch_c)
          # Common ancestor of all three branches
          heads = [branch_a.id, branch_b.id, branch_c.id]
          self.assertEqual([fork.id], find_octopus_base(repo, heads))
  class CanFastForwardTests(TestCase):
      """Tests for ``can_fast_forward`` run against a ``MemoryRepo``."""

      def test_ff(self) -> None:
          root = make_commit()
          first = make_commit(parents=[root.id])
          second = make_commit(parents=[first.id])
          repo = MemoryRepo()
          repo.object_store.add_objects(
              [(commit, None) for commit in (root, first, second)]
          )
          # (old, new) pairs on a linear history, checked in order.
          allowed = (
              (first.id, first.id),
              (root.id, first.id),
              (first.id, second.id),
          )
          for old, new in allowed:
              self.assertTrue(can_fast_forward(repo, old, new))
          self.assertFalse(can_fast_forward(repo, second.id, first.id))

      def test_diverged(self) -> None:
          root = make_commit()
          fork = make_commit(parents=[root.id])
          left = make_commit(parents=[fork.id], message=b"2a")
          right = make_commit(parents=[fork.id], message=b"2b")
          repo = MemoryRepo()
          repo.object_store.add_objects(
              [(commit, None) for commit in (root, fork, left, right)]
          )
          # The shared parent can move forward to either child ...
          for tip in (left.id, right.id):
              self.assertTrue(can_fast_forward(repo, fork.id, tip))
          # ... but the two children cannot fast-forward to each other.
          for old, new in ((left.id, right.id), (right.id, left.id)):
              self.assertFalse(can_fast_forward(repo, old, new))

      def test_shallow_repository(self) -> None:
          repo = MemoryRepo()
          # Create a shallow repository structure:
          #   base (missing) -> c1 -> c2
          # We only have c1 and c2, base is missing (shallow boundary at c1)

          # Fake 20-byte base commit id that is never added to the repo
          missing_base = b"1" * 20
          first = make_commit(parents=[missing_base], commit_time=2000)
          second = make_commit(parents=[first.id], commit_time=3000)

          # Only add c1 and c2 to the repo (base is missing)
          repo.object_store.add_objects([(first, None), (second, None)])

          # Mark c1 as shallow using the proper API
          repo.update_shallow([first.id], [])

          # Should be able to fast-forward from c1 to c2
          self.assertTrue(can_fast_forward(repo, first.id, second.id))

          # Should return False when trying to check against missing parent
          # (not raise KeyError)
          for tip in (first.id, second.id):
              self.assertFalse(can_fast_forward(repo, missing_base, tip))
  class FindLCAsTests(TestCase):
      """Tests for _find_lcas function with shallow repository support."""

      @staticmethod
      def _lookups(commits):
          # ``commits`` maps sha -> (timestamp, [parent shas]).
          # Indexing the dict raises KeyError(sha) for an unknown sha, so
          # both callbacks raise KeyError for commits that are missing.
          def parents_of(sha):
              return commits[sha][1]

          def stamp_of(sha):
              return commits[sha][0]

          return parents_of, stamp_of

      def test_find_lcas_normal(self) -> None:
          """Test _find_lcas works normally without shallow commits."""
          # Set up a simple repository structure:
          #     base
          #    /    \
          #  c1      c2
          parents_of, stamp_of = self._lookups(
              {
                  b"base": (1000, []),
                  b"c1": (2000, [b"base"]),
                  b"c2": (3000, [b"base"]),
              }
          )

          # Find LCA of c1 and c2, should be base
          lcas = _find_lcas(parents_of, b"c1", [b"c2"], stamp_of)
          self.assertEqual(lcas, [b"base"])

      def test_find_lcas_with_shallow_missing_c1(self) -> None:
          """Test _find_lcas when c1 doesn't exist in shallow clone."""
          # Only have c2 and base, c1 is missing (shallow boundary)
          parents_of, stamp_of = self._lookups(
              {
                  b"base": (1000, []),
                  b"c2": (3000, [b"base"]),
              }
          )
          boundary = {b"c2"}  # c2 is at shallow boundary

          # c1 doesn't exist, but we have shallow commits
          lcas = _find_lcas(
              parents_of, b"c1", [b"c2"], stamp_of, shallows=boundary
          )

          # Should handle gracefully
          self.assertEqual(lcas, [])

      def test_find_lcas_with_shallow_missing_parent(self) -> None:
          """Test _find_lcas when parent commits are missing in shallow clone."""
          # Have c1 and c2, but base is missing
          parents_of, stamp_of = self._lookups(
              {
                  b"c1": (2000, [b"base"]),  # base doesn't exist
                  b"c2": (3000, [b"base"]),  # base doesn't exist
              }
          )
          boundary = {b"c1", b"c2"}  # Both at shallow boundary

          # Should handle missing parent gracefully
          lcas = _find_lcas(
              parents_of, b"c1", [b"c2"], stamp_of, shallows=boundary
          )

          # Can't find LCA because parents are missing
          self.assertEqual(lcas, [])

      def test_find_lcas_with_shallow_partial_history(self) -> None:
          """Test _find_lcas with partial history in shallow clone."""
          # Complex structure where some history is missing:
          #   base (missing)
          #    /    \
          #  c1      c2
          #   |       |
          #  c3      c4
          parents_of, stamp_of = self._lookups(
              {
                  b"c1": (2000, [b"base"]),  # base missing
                  b"c2": (2500, [b"base"]),  # base missing
                  b"c3": (3000, [b"c1"]),
                  b"c4": (3500, [b"c2"]),
              }
          )
          boundary = {b"c1", b"c2"}  # c1 and c2 are at shallow boundary

          # Find LCA of c3 and c4
          lcas = _find_lcas(
              parents_of, b"c3", [b"c4"], stamp_of, shallows=boundary
          )

          # Can't determine LCA because base is missing
          self.assertEqual(lcas, [])

      def test_find_lcas_without_shallows_raises_keyerror(self) -> None:
          """Test _find_lcas raises KeyError when commit missing without shallows."""
          parents_of, stamp_of = self._lookups(
              {
                  b"c2": (3000, [b"base"]),
              }
          )

          # Without shallows parameter, should raise KeyError
          with self.assertRaises(KeyError):
              _find_lcas(parents_of, b"c1", [b"c2"], stamp_of)

      def test_find_lcas_octopus_with_shallow(self) -> None:
          """Test _find_lcas with multiple commits in shallow clone."""
          # Structure:
          #    base (missing)
          #    /   |   \
          #  c1   c2   c3
          parents_of, stamp_of = self._lookups(
              {
                  b"c1": (2000, [b"base"]),
                  b"c2": (2100, [b"base"]),
                  b"c3": (2200, [b"base"]),
              }
          )
          boundary = {b"c1", b"c2", b"c3"}

          # Find LCA of c1 with c2 and c3
          lcas = _find_lcas(
              parents_of, b"c1", [b"c2", b"c3"], stamp_of, shallows=boundary
          )

          # Can't find LCA because base is missing
          self.assertEqual(lcas, [])
  class WorkListTest(TestCase):
      """Tests for the ``WorkList`` queue of (timestamp, value) tuples."""

      def test_WorkList(self) -> None:
          # tuples of (timestamp, value) are stored in a Priority MaxQueue
          # repeated use of get should return them in maxheap timestamp
          # order: largest time value (most recent in time) first then earlier/older
          queue = WorkList()
          initial = [
              (100, "Test Value 1"),
              (50, "Test Value 2"),
              (200, "Test Value 3"),
          ]
          for entry in initial:
              queue.add(entry)
          self.assertEqual(queue.get(), (200, "Test Value 3"))
          self.assertEqual(queue.get(), (100, "Test Value 1"))
          # An entry added mid-way is still returned in timestamp order.
          queue.add((150, "Test Value 4"))
          self.assertEqual(queue.get(), (150, "Test Value 4"))
          self.assertEqual(queue.get(), (50, "Test Value 2"))

      def test_WorkList_iter(self) -> None:
          # Test the iter method of WorkList
          queue = WorkList()
          for entry in [(100, "Value 1"), (200, "Value 2"), (50, "Value 3")]:
              queue.add(entry)

          # Collect all items from iter
          items = list(queue.iter())

          # Only the count and membership are checked, not the order
          self.assertEqual(len(items), 3)

          # Check the values are present with correct timestamps
          timestamps = [stamp for stamp, _ in items]
          values = [value for _, value in items]
          for expected_stamp in (100, 200, 50):
              self.assertIn(expected_stamp, timestamps)
          for expected_value in ("Value 1", "Value 2", "Value 3"):
              self.assertIn(expected_value, values)

      def test_WorkList_empty_get(self) -> None:
          # Test getting from an empty WorkList
          queue = WorkList()
          with self.assertRaises(IndexError):
              queue.get()

      def test_WorkList_empty_iter(self) -> None:
          # Test iterating over an empty WorkList
          queue = WorkList()
          self.assertEqual([], list(queue.iter()))

      def test_WorkList_empty_heap(self) -> None:
          # The current implementation raises IndexError when the heap is empty
          queue = WorkList()
          # Ensure pq is empty
          queue.pq = []
          # get should raise IndexError when heap is empty
          with self.assertRaises(IndexError):
              queue.get()