test_graph.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. # test_graph.py -- Tests for merge base
  2. # Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. """Tests for dulwich.graph."""
  21. from dulwich.graph import WorkList, _find_lcas, can_fast_forward
  22. from dulwich.repo import MemoryRepo
  23. from dulwich.tests.utils import make_commit
  24. from . import TestCase
  25. class FindMergeBaseTests(TestCase):
  26. @staticmethod
  27. def run_test(dag, inputs):
  28. def lookup_parents(commit_id):
  29. return dag[commit_id]
  30. def lookup_stamp(commit_id) -> int:
  31. # any constant timestamp value here will work to force
  32. # this test to test the same behaviour as done previously
  33. return 100
  34. c1 = inputs[0]
  35. c2s = inputs[1:]
  36. return set(_find_lcas(lookup_parents, c1, c2s, lookup_stamp))
  37. def test_multiple_lca(self) -> None:
  38. # two lowest common ancestors
  39. graph = {
  40. "5": ["1", "2"],
  41. "4": ["3", "1"],
  42. "3": ["2"],
  43. "2": ["0"],
  44. "1": [],
  45. "0": [],
  46. }
  47. self.assertEqual(self.run_test(graph, ["4", "5"]), {"1", "2"})
  48. def test_no_common_ancestor(self) -> None:
  49. # no common ancestor
  50. graph = {
  51. "4": ["2"],
  52. "3": ["1"],
  53. "2": [],
  54. "1": ["0"],
  55. "0": [],
  56. }
  57. self.assertEqual(self.run_test(graph, ["4", "3"]), set())
  58. def test_ancestor(self) -> None:
  59. # ancestor
  60. graph = {
  61. "G": ["D", "F"],
  62. "F": ["E"],
  63. "D": ["C"],
  64. "C": ["B"],
  65. "E": ["B"],
  66. "B": ["A"],
  67. "A": [],
  68. }
  69. self.assertEqual(self.run_test(graph, ["D", "C"]), {"C"})
  70. def test_direct_parent(self) -> None:
  71. # parent
  72. graph = {
  73. "G": ["D", "F"],
  74. "F": ["E"],
  75. "D": ["C"],
  76. "C": ["B"],
  77. "E": ["B"],
  78. "B": ["A"],
  79. "A": [],
  80. }
  81. self.assertEqual(self.run_test(graph, ["G", "D"]), {"D"})
  82. def test_another_crossover(self) -> None:
  83. # Another cross over
  84. graph = {
  85. "G": ["D", "F"],
  86. "F": ["E", "C"],
  87. "D": ["C", "E"],
  88. "C": ["B"],
  89. "E": ["B"],
  90. "B": ["A"],
  91. "A": [],
  92. }
  93. self.assertEqual(self.run_test(graph, ["D", "F"]), {"E", "C"})
  94. def test_three_way_merge_lca(self) -> None:
  95. # three way merge commit straight from git docs
  96. graph = {
  97. "C": ["C1"],
  98. "C1": ["C2"],
  99. "C2": ["C3"],
  100. "C3": ["C4"],
  101. "C4": ["2"],
  102. "B": ["B1"],
  103. "B1": ["B2"],
  104. "B2": ["B3"],
  105. "B3": ["1"],
  106. "A": ["A1"],
  107. "A1": ["A2"],
  108. "A2": ["A3"],
  109. "A3": ["1"],
  110. "1": ["2"],
  111. "2": [],
  112. }
  113. # assumes a theoretical merge M exists that merges B and C first
  114. # which actually means find the first LCA from either of B OR C with A
  115. self.assertEqual(self.run_test(graph, ["A", "B", "C"]), {"1"})
  116. def test_octopus(self) -> None:
  117. # octopus algorithm test
  118. # test straight from git docs of A, B, and C
  119. # but this time use octopus to find lcas of A, B, and C simultaneously
  120. graph = {
  121. "C": ["C1"],
  122. "C1": ["C2"],
  123. "C2": ["C3"],
  124. "C3": ["C4"],
  125. "C4": ["2"],
  126. "B": ["B1"],
  127. "B1": ["B2"],
  128. "B2": ["B3"],
  129. "B3": ["1"],
  130. "A": ["A1"],
  131. "A1": ["A2"],
  132. "A2": ["A3"],
  133. "A3": ["1"],
  134. "1": ["2"],
  135. "2": [],
  136. }
  137. def lookup_parents(cid):
  138. return graph[cid]
  139. def lookup_stamp(commit_id) -> int:
  140. # any constant timestamp value here will work to force
  141. # this test to test the same behaviour as done previously
  142. return 100
  143. lcas = ["A"]
  144. others = ["B", "C"]
  145. for cmt in others:
  146. next_lcas = []
  147. for ca in lcas:
  148. res = _find_lcas(lookup_parents, cmt, [ca], lookup_stamp)
  149. next_lcas.extend(res)
  150. lcas = next_lcas[:]
  151. self.assertEqual(set(lcas), {"2"})
  152. class CanFastForwardTests(TestCase):
  153. def test_ff(self) -> None:
  154. r = MemoryRepo()
  155. base = make_commit()
  156. c1 = make_commit(parents=[base.id])
  157. c2 = make_commit(parents=[c1.id])
  158. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  159. self.assertTrue(can_fast_forward(r, c1.id, c1.id))
  160. self.assertTrue(can_fast_forward(r, base.id, c1.id))
  161. self.assertTrue(can_fast_forward(r, c1.id, c2.id))
  162. self.assertFalse(can_fast_forward(r, c2.id, c1.id))
  163. def test_diverged(self) -> None:
  164. r = MemoryRepo()
  165. base = make_commit()
  166. c1 = make_commit(parents=[base.id])
  167. c2a = make_commit(parents=[c1.id], message=b"2a")
  168. c2b = make_commit(parents=[c1.id], message=b"2b")
  169. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  170. self.assertTrue(can_fast_forward(r, c1.id, c2a.id))
  171. self.assertTrue(can_fast_forward(r, c1.id, c2b.id))
  172. self.assertFalse(can_fast_forward(r, c2a.id, c2b.id))
  173. self.assertFalse(can_fast_forward(r, c2b.id, c2a.id))
  174. class WorkListTest(TestCase):
  175. def test_WorkList(self) -> None:
  176. # tuples of (timestamp, value) are stored in a Priority MaxQueue
  177. # repeated use of get should return them in maxheap timestamp
  178. # order: largest time value (most recent in time) first then earlier/older
  179. wlst = WorkList()
  180. wlst.add((100, "Test Value 1"))
  181. wlst.add((50, "Test Value 2"))
  182. wlst.add((200, "Test Value 3"))
  183. self.assertEqual(wlst.get(), (200, "Test Value 3"))
  184. self.assertEqual(wlst.get(), (100, "Test Value 1"))
  185. wlst.add((150, "Test Value 4"))
  186. self.assertEqual(wlst.get(), (150, "Test Value 4"))
  187. self.assertEqual(wlst.get(), (50, "Test Value 2"))