test_graph.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. # test_graph.py -- Tests for merge base
  2. # Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. """Tests for dulwich.graph."""
  21. from dulwich.graph import (
  22. WorkList,
  23. _find_lcas,
  24. can_fast_forward,
  25. find_merge_base,
  26. find_octopus_base,
  27. independent,
  28. )
  29. from dulwich.repo import MemoryRepo
  30. from dulwich.tests.utils import make_commit
  31. from . import TestCase
  32. class FindMergeBaseTests(TestCase):
  33. @staticmethod
  34. def run_test(dag, inputs):
  35. def lookup_parents(commit_id):
  36. return dag[commit_id]
  37. def lookup_stamp(commit_id) -> int:
  38. # any constant timestamp value here will work to force
  39. # this test to test the same behaviour as done previously
  40. return 100
  41. c1 = inputs[0]
  42. c2s = inputs[1:]
  43. return set(_find_lcas(lookup_parents, c1, c2s, lookup_stamp))
  44. def test_multiple_lca(self) -> None:
  45. # two lowest common ancestors
  46. graph = {
  47. "5": ["1", "2"],
  48. "4": ["3", "1"],
  49. "3": ["2"],
  50. "2": ["0"],
  51. "1": [],
  52. "0": [],
  53. }
  54. self.assertEqual(self.run_test(graph, ["4", "5"]), {"1", "2"})
  55. def test_no_common_ancestor(self) -> None:
  56. # no common ancestor
  57. graph = {
  58. "4": ["2"],
  59. "3": ["1"],
  60. "2": [],
  61. "1": ["0"],
  62. "0": [],
  63. }
  64. self.assertEqual(self.run_test(graph, ["4", "3"]), set())
  65. def test_ancestor(self) -> None:
  66. # ancestor
  67. graph = {
  68. "G": ["D", "F"],
  69. "F": ["E"],
  70. "D": ["C"],
  71. "C": ["B"],
  72. "E": ["B"],
  73. "B": ["A"],
  74. "A": [],
  75. }
  76. self.assertEqual(self.run_test(graph, ["D", "C"]), {"C"})
  77. def test_direct_parent(self) -> None:
  78. # parent
  79. graph = {
  80. "G": ["D", "F"],
  81. "F": ["E"],
  82. "D": ["C"],
  83. "C": ["B"],
  84. "E": ["B"],
  85. "B": ["A"],
  86. "A": [],
  87. }
  88. self.assertEqual(self.run_test(graph, ["G", "D"]), {"D"})
  89. def test_another_crossover(self) -> None:
  90. # Another cross over
  91. graph = {
  92. "G": ["D", "F"],
  93. "F": ["E", "C"],
  94. "D": ["C", "E"],
  95. "C": ["B"],
  96. "E": ["B"],
  97. "B": ["A"],
  98. "A": [],
  99. }
  100. self.assertEqual(self.run_test(graph, ["D", "F"]), {"E", "C"})
  101. def test_three_way_merge_lca(self) -> None:
  102. # three way merge commit straight from git docs
  103. graph = {
  104. "C": ["C1"],
  105. "C1": ["C2"],
  106. "C2": ["C3"],
  107. "C3": ["C4"],
  108. "C4": ["2"],
  109. "B": ["B1"],
  110. "B1": ["B2"],
  111. "B2": ["B3"],
  112. "B3": ["1"],
  113. "A": ["A1"],
  114. "A1": ["A2"],
  115. "A2": ["A3"],
  116. "A3": ["1"],
  117. "1": ["2"],
  118. "2": [],
  119. }
  120. # assumes a theoretical merge M exists that merges B and C first
  121. # which actually means find the first LCA from either of B OR C with A
  122. self.assertEqual(self.run_test(graph, ["A", "B", "C"]), {"1"})
  123. def test_octopus(self) -> None:
  124. # octopus algorithm test
  125. # test straight from git docs of A, B, and C
  126. # but this time use octopus to find lcas of A, B, and C simultaneously
  127. graph = {
  128. "C": ["C1"],
  129. "C1": ["C2"],
  130. "C2": ["C3"],
  131. "C3": ["C4"],
  132. "C4": ["2"],
  133. "B": ["B1"],
  134. "B1": ["B2"],
  135. "B2": ["B3"],
  136. "B3": ["1"],
  137. "A": ["A1"],
  138. "A1": ["A2"],
  139. "A2": ["A3"],
  140. "A3": ["1"],
  141. "1": ["2"],
  142. "2": [],
  143. }
  144. def lookup_parents(cid):
  145. return graph[cid]
  146. def lookup_stamp(commit_id) -> int:
  147. # any constant timestamp value here will work to force
  148. # this test to test the same behaviour as done previously
  149. return 100
  150. lcas = ["A"]
  151. others = ["B", "C"]
  152. for cmt in others:
  153. next_lcas = []
  154. for ca in lcas:
  155. res = _find_lcas(lookup_parents, cmt, [ca], lookup_stamp)
  156. next_lcas.extend(res)
  157. lcas = next_lcas[:]
  158. self.assertEqual(set(lcas), {"2"})
  159. class FindMergeBaseFunctionTests(TestCase):
  160. def test_find_merge_base_empty(self) -> None:
  161. r = MemoryRepo()
  162. self.addCleanup(r.close)
  163. # Empty list of commits
  164. self.assertEqual([], find_merge_base(r, []))
  165. def test_find_merge_base_single(self) -> None:
  166. r = MemoryRepo()
  167. self.addCleanup(r.close)
  168. base = make_commit()
  169. r.object_store.add_objects([(base, None)])
  170. # Single commit returns itself
  171. self.assertEqual([base.id], find_merge_base(r, [base.id]))
  172. def test_find_merge_base_identical(self) -> None:
  173. r = MemoryRepo()
  174. self.addCleanup(r.close)
  175. base = make_commit()
  176. r.object_store.add_objects([(base, None)])
  177. # When the same commit is in both positions
  178. self.assertEqual([base.id], find_merge_base(r, [base.id, base.id]))
  179. def test_find_merge_base_linear(self) -> None:
  180. r = MemoryRepo()
  181. self.addCleanup(r.close)
  182. base = make_commit()
  183. c1 = make_commit(parents=[base.id])
  184. c2 = make_commit(parents=[c1.id])
  185. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  186. # Base of c1 and c2 is c1
  187. self.assertEqual([c1.id], find_merge_base(r, [c1.id, c2.id]))
  188. # Base of c2 and c1 is c1
  189. self.assertEqual([c1.id], find_merge_base(r, [c2.id, c1.id]))
  190. def test_find_merge_base_diverged(self) -> None:
  191. r = MemoryRepo()
  192. self.addCleanup(r.close)
  193. base = make_commit()
  194. c1 = make_commit(parents=[base.id])
  195. c2a = make_commit(parents=[c1.id], message=b"2a")
  196. c2b = make_commit(parents=[c1.id], message=b"2b")
  197. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  198. # Merge base of two diverged commits is their common parent
  199. self.assertEqual([c1.id], find_merge_base(r, [c2a.id, c2b.id]))
  200. def test_find_merge_base_with_min_stamp(self) -> None:
  201. r = MemoryRepo()
  202. self.addCleanup(r.close)
  203. base = make_commit(commit_time=100)
  204. c1 = make_commit(parents=[base.id], commit_time=200)
  205. c2 = make_commit(parents=[c1.id], commit_time=300)
  206. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  207. # Normal merge base finding works
  208. self.assertEqual([c1.id], find_merge_base(r, [c1.id, c2.id]))
  209. def test_find_merge_base_multiple_common_ancestors(self) -> None:
  210. r = MemoryRepo()
  211. self.addCleanup(r.close)
  212. base = make_commit(commit_time=100)
  213. c1a = make_commit(parents=[base.id], commit_time=200, message=b"c1a")
  214. c1b = make_commit(parents=[base.id], commit_time=201, message=b"c1b")
  215. c2 = make_commit(parents=[c1a.id, c1b.id], commit_time=300)
  216. c3 = make_commit(parents=[c1a.id, c1b.id], commit_time=301)
  217. r.object_store.add_objects(
  218. [(base, None), (c1a, None), (c1b, None), (c2, None), (c3, None)]
  219. )
  220. # Merge base should include both c1a and c1b since both are common ancestors
  221. bases = find_merge_base(r, [c2.id, c3.id])
  222. self.assertEqual(2, len(bases))
  223. self.assertIn(c1a.id, bases)
  224. self.assertIn(c1b.id, bases)
  225. class FindOctopusBaseTests(TestCase):
  226. def test_find_octopus_base_empty(self) -> None:
  227. r = MemoryRepo()
  228. self.addCleanup(r.close)
  229. # Empty list of commits
  230. self.assertEqual([], find_octopus_base(r, []))
  231. def test_find_octopus_base_single(self) -> None:
  232. r = MemoryRepo()
  233. self.addCleanup(r.close)
  234. base = make_commit()
  235. r.object_store.add_objects([(base, None)])
  236. # Single commit returns itself
  237. self.assertEqual([base.id], find_octopus_base(r, [base.id]))
  238. def test_find_octopus_base_two_commits(self) -> None:
  239. r = MemoryRepo()
  240. self.addCleanup(r.close)
  241. base = make_commit()
  242. c1 = make_commit(parents=[base.id])
  243. c2 = make_commit(parents=[c1.id])
  244. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  245. # With two commits it should call find_merge_base
  246. self.assertEqual([c1.id], find_octopus_base(r, [c1.id, c2.id]))
  247. def test_find_octopus_base_multiple(self) -> None:
  248. r = MemoryRepo()
  249. self.addCleanup(r.close)
  250. base = make_commit()
  251. c1 = make_commit(parents=[base.id])
  252. c2a = make_commit(parents=[c1.id], message=b"2a")
  253. c2b = make_commit(parents=[c1.id], message=b"2b")
  254. c2c = make_commit(parents=[c1.id], message=b"2c")
  255. r.object_store.add_objects(
  256. [(base, None), (c1, None), (c2a, None), (c2b, None), (c2c, None)]
  257. )
  258. # Common ancestor of all three branches
  259. self.assertEqual([c1.id], find_octopus_base(r, [c2a.id, c2b.id, c2c.id]))
  260. class CanFastForwardTests(TestCase):
  261. def test_ff(self) -> None:
  262. r = MemoryRepo()
  263. self.addCleanup(r.close)
  264. base = make_commit()
  265. c1 = make_commit(parents=[base.id])
  266. c2 = make_commit(parents=[c1.id])
  267. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  268. self.assertTrue(can_fast_forward(r, c1.id, c1.id))
  269. self.assertTrue(can_fast_forward(r, base.id, c1.id))
  270. self.assertTrue(can_fast_forward(r, c1.id, c2.id))
  271. self.assertFalse(can_fast_forward(r, c2.id, c1.id))
  272. def test_diverged(self) -> None:
  273. r = MemoryRepo()
  274. self.addCleanup(r.close)
  275. base = make_commit()
  276. c1 = make_commit(parents=[base.id])
  277. c2a = make_commit(parents=[c1.id], message=b"2a")
  278. c2b = make_commit(parents=[c1.id], message=b"2b")
  279. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  280. self.assertTrue(can_fast_forward(r, c1.id, c2a.id))
  281. self.assertTrue(can_fast_forward(r, c1.id, c2b.id))
  282. self.assertFalse(can_fast_forward(r, c2a.id, c2b.id))
  283. self.assertFalse(can_fast_forward(r, c2b.id, c2a.id))
  284. def test_shallow_repository(self) -> None:
  285. r = MemoryRepo()
  286. self.addCleanup(r.close)
  287. # Create a shallow repository structure:
  288. # base (missing) -> c1 -> c2
  289. # We only have c1 and c2, base is missing (shallow boundary at c1)
  290. # Create a fake base commit ID (won't exist in repo)
  291. base_sha = b"1" * 20 # Valid SHA format but doesn't exist (20 bytes)
  292. c1 = make_commit(parents=[base_sha], commit_time=2000)
  293. c2 = make_commit(parents=[c1.id], commit_time=3000)
  294. # Only add c1 and c2 to the repo (base is missing)
  295. r.object_store.add_objects([(c1, None), (c2, None)])
  296. # Mark c1 as shallow using the proper API
  297. r.update_shallow([c1.id], [])
  298. # Should be able to fast-forward from c1 to c2
  299. self.assertTrue(can_fast_forward(r, c1.id, c2.id))
  300. # Should return False when trying to check against missing parent
  301. # (not raise KeyError)
  302. self.assertFalse(can_fast_forward(r, base_sha, c1.id))
  303. self.assertFalse(can_fast_forward(r, base_sha, c2.id))
  304. class FindLCAsTests(TestCase):
  305. """Tests for _find_lcas function with shallow repository support."""
  306. def test_find_lcas_normal(self) -> None:
  307. """Test _find_lcas works normally without shallow commits."""
  308. # Set up a simple repository structure:
  309. # base
  310. # / \
  311. # c1 c2
  312. commits = {
  313. b"base": (1000, []),
  314. b"c1": (2000, [b"base"]),
  315. b"c2": (3000, [b"base"]),
  316. }
  317. def lookup_parents(sha):
  318. return commits[sha][1]
  319. def lookup_stamp(sha):
  320. return commits[sha][0]
  321. # Find LCA of c1 and c2, should be base
  322. lcas = _find_lcas(lookup_parents, b"c1", [b"c2"], lookup_stamp)
  323. self.assertEqual(lcas, [b"base"])
  324. def test_find_lcas_with_shallow_missing_c1(self) -> None:
  325. """Test _find_lcas when c1 doesn't exist in shallow clone."""
  326. # Only have c2 and base, c1 is missing (shallow boundary)
  327. commits = {
  328. b"base": (1000, []),
  329. b"c2": (3000, [b"base"]),
  330. }
  331. shallows = {b"c2"} # c2 is at shallow boundary
  332. def lookup_parents(sha):
  333. return commits[sha][1]
  334. def lookup_stamp(sha):
  335. if sha not in commits:
  336. raise KeyError(sha)
  337. return commits[sha][0]
  338. # c1 doesn't exist, but we have shallow commits
  339. lcas = _find_lcas(
  340. lookup_parents, b"c1", [b"c2"], lookup_stamp, shallows=shallows
  341. )
  342. # Should handle gracefully
  343. self.assertEqual(lcas, [])
  344. def test_find_lcas_with_shallow_missing_parent(self) -> None:
  345. """Test _find_lcas when parent commits are missing in shallow clone."""
  346. # Have c1 and c2, but base is missing
  347. commits = {
  348. b"c1": (2000, [b"base"]), # base doesn't exist
  349. b"c2": (3000, [b"base"]), # base doesn't exist
  350. }
  351. shallows = {b"c1", b"c2"} # Both at shallow boundary
  352. def lookup_parents(sha):
  353. if sha not in commits:
  354. raise KeyError(sha)
  355. return commits[sha][1]
  356. def lookup_stamp(sha):
  357. if sha not in commits:
  358. raise KeyError(sha)
  359. return commits[sha][0]
  360. # Should handle missing parent gracefully
  361. lcas = _find_lcas(
  362. lookup_parents, b"c1", [b"c2"], lookup_stamp, shallows=shallows
  363. )
  364. # Can't find LCA because parents are missing
  365. self.assertEqual(lcas, [])
  366. def test_find_lcas_with_shallow_partial_history(self) -> None:
  367. """Test _find_lcas with partial history in shallow clone."""
  368. # Complex structure where some history is missing:
  369. # base (missing)
  370. # / \
  371. # c1 c2
  372. # | |
  373. # c3 c4
  374. commits = {
  375. b"c1": (2000, [b"base"]), # base missing
  376. b"c2": (2500, [b"base"]), # base missing
  377. b"c3": (3000, [b"c1"]),
  378. b"c4": (3500, [b"c2"]),
  379. }
  380. shallows = {b"c1", b"c2"} # c1 and c2 are at shallow boundary
  381. def lookup_parents(sha):
  382. if sha not in commits:
  383. raise KeyError(sha)
  384. return commits[sha][1]
  385. def lookup_stamp(sha):
  386. if sha not in commits:
  387. raise KeyError(sha)
  388. return commits[sha][0]
  389. # Find LCA of c3 and c4
  390. lcas = _find_lcas(
  391. lookup_parents, b"c3", [b"c4"], lookup_stamp, shallows=shallows
  392. )
  393. # Can't determine LCA because base is missing
  394. self.assertEqual(lcas, [])
  395. def test_find_lcas_without_shallows_raises_keyerror(self) -> None:
  396. """Test _find_lcas raises KeyError when commit missing without shallows."""
  397. commits = {
  398. b"c2": (3000, [b"base"]),
  399. }
  400. def lookup_parents(sha):
  401. return commits[sha][1]
  402. def lookup_stamp(sha):
  403. if sha not in commits:
  404. raise KeyError(sha)
  405. return commits[sha][0]
  406. # Without shallows parameter, should raise KeyError
  407. with self.assertRaises(KeyError):
  408. _find_lcas(lookup_parents, b"c1", [b"c2"], lookup_stamp)
  409. def test_find_lcas_octopus_with_shallow(self) -> None:
  410. """Test _find_lcas with multiple commits in shallow clone."""
  411. # Structure:
  412. # base (missing)
  413. # / | \
  414. # c1 c2 c3
  415. commits = {
  416. b"c1": (2000, [b"base"]),
  417. b"c2": (2100, [b"base"]),
  418. b"c3": (2200, [b"base"]),
  419. }
  420. shallows = {b"c1", b"c2", b"c3"}
  421. def lookup_parents(sha):
  422. if sha not in commits:
  423. raise KeyError(sha)
  424. return commits[sha][1]
  425. def lookup_stamp(sha):
  426. if sha not in commits:
  427. raise KeyError(sha)
  428. return commits[sha][0]
  429. # Find LCA of c1 with c2 and c3
  430. lcas = _find_lcas(
  431. lookup_parents, b"c1", [b"c2", b"c3"], lookup_stamp, shallows=shallows
  432. )
  433. # Can't find LCA because base is missing
  434. self.assertEqual(lcas, [])
  435. class WorkListTest(TestCase):
  436. def test_WorkList(self) -> None:
  437. # tuples of (timestamp, value) are stored in a Priority MaxQueue
  438. # repeated use of get should return them in maxheap timestamp
  439. # order: largest time value (most recent in time) first then earlier/older
  440. wlst = WorkList()
  441. wlst.add((100, "Test Value 1"))
  442. wlst.add((50, "Test Value 2"))
  443. wlst.add((200, "Test Value 3"))
  444. self.assertEqual(wlst.get(), (200, "Test Value 3"))
  445. self.assertEqual(wlst.get(), (100, "Test Value 1"))
  446. wlst.add((150, "Test Value 4"))
  447. self.assertEqual(wlst.get(), (150, "Test Value 4"))
  448. self.assertEqual(wlst.get(), (50, "Test Value 2"))
  449. def test_WorkList_iter(self) -> None:
  450. # Test the iter method of WorkList
  451. wlst = WorkList()
  452. wlst.add((100, "Value 1"))
  453. wlst.add((200, "Value 2"))
  454. wlst.add((50, "Value 3"))
  455. # Collect all items from iter
  456. items = list(wlst.iter())
  457. # Items should be in their original order, not sorted
  458. self.assertEqual(len(items), 3)
  459. # Check the values are present with correct timestamps
  460. timestamps = [dt for dt, _ in items]
  461. values = [val for _, val in items]
  462. self.assertIn(100, timestamps)
  463. self.assertIn(200, timestamps)
  464. self.assertIn(50, timestamps)
  465. self.assertIn("Value 1", values)
  466. self.assertIn("Value 2", values)
  467. self.assertIn("Value 3", values)
  468. def test_WorkList_empty_get(self) -> None:
  469. # Test getting from an empty WorkList
  470. wlst = WorkList()
  471. with self.assertRaises(IndexError):
  472. wlst.get()
  473. def test_WorkList_empty_iter(self) -> None:
  474. # Test iterating over an empty WorkList
  475. wlst = WorkList()
  476. items = list(wlst.iter())
  477. self.assertEqual([], items)
  478. def test_WorkList_empty_heap(self) -> None:
  479. # The current implementation raises IndexError when the heap is empty
  480. wlst = WorkList()
  481. # Ensure pq is empty
  482. wlst.pq = []
  483. # get should raise IndexError when heap is empty
  484. with self.assertRaises(IndexError):
  485. wlst.get()
  486. class IndependentTests(TestCase):
  487. def test_independent_empty(self) -> None:
  488. r = MemoryRepo()
  489. self.addCleanup(r.close)
  490. # Empty list of commits
  491. self.assertEqual([], independent(r, []))
  492. def test_independent_single(self) -> None:
  493. r = MemoryRepo()
  494. self.addCleanup(r.close)
  495. base = make_commit()
  496. r.object_store.add_objects([(base, None)])
  497. # Single commit is independent
  498. self.assertEqual([base.id], independent(r, [base.id]))
  499. def test_independent_linear(self) -> None:
  500. r = MemoryRepo()
  501. self.addCleanup(r.close)
  502. base = make_commit()
  503. c1 = make_commit(parents=[base.id])
  504. c2 = make_commit(parents=[c1.id])
  505. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  506. # In linear history, only the tip is independent
  507. self.assertEqual([c2.id], independent(r, [base.id, c1.id, c2.id]))
  508. def test_independent_diverged(self) -> None:
  509. r = MemoryRepo()
  510. self.addCleanup(r.close)
  511. base = make_commit()
  512. c1 = make_commit(parents=[base.id])
  513. c2a = make_commit(parents=[c1.id], message=b"2a")
  514. c2b = make_commit(parents=[c1.id], message=b"2b")
  515. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  516. # c2a and c2b are independent from each other
  517. result = independent(r, [c2a.id, c2b.id])
  518. self.assertEqual(2, len(result))
  519. self.assertIn(c2a.id, result)
  520. self.assertIn(c2b.id, result)
  521. def test_independent_mixed(self) -> None:
  522. r = MemoryRepo()
  523. self.addCleanup(r.close)
  524. base = make_commit()
  525. c1 = make_commit(parents=[base.id])
  526. c2a = make_commit(parents=[c1.id], message=b"2a")
  527. c2b = make_commit(parents=[c1.id], message=b"2b")
  528. c3a = make_commit(parents=[c2a.id], message=b"3a")
  529. r.object_store.add_objects(
  530. [(base, None), (c1, None), (c2a, None), (c2b, None), (c3a, None)]
  531. )
  532. # c3a and c2b are independent; c2a is ancestor of c3a
  533. result = independent(r, [c2a.id, c2b.id, c3a.id])
  534. self.assertEqual(2, len(result))
  535. self.assertIn(c2b.id, result)
  536. self.assertIn(c3a.id, result)
  537. self.assertNotIn(c2a.id, result) # c2a is ancestor of c3a