test_graph.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. # test_graph.py -- Tests for merge base
  2. # Copyright (c) 2020 Kevin B. Hendricks, Stratford Ontario Canada
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. """Tests for dulwich.graph."""
  20. from dulwich.graph import WorkList, _find_lcas, can_fast_forward
  21. from dulwich.repo import MemoryRepo
  22. from dulwich.tests.utils import make_commit
  23. from . import TestCase
  24. class FindMergeBaseTests(TestCase):
  25. @staticmethod
  26. def run_test(dag, inputs):
  27. def lookup_parents(commit_id):
  28. return dag[commit_id]
  29. def lookup_stamp(commit_id):
  30. # any constant timestamp value here will work to force
  31. # this test to test the same behaviour as done previously
  32. return 100
  33. c1 = inputs[0]
  34. c2s = inputs[1:]
  35. return set(_find_lcas(lookup_parents, c1, c2s, lookup_stamp))
  36. def test_multiple_lca(self):
  37. # two lowest common ancestors
  38. graph = {
  39. "5": ["1", "2"],
  40. "4": ["3", "1"],
  41. "3": ["2"],
  42. "2": ["0"],
  43. "1": [],
  44. "0": [],
  45. }
  46. self.assertEqual(self.run_test(graph, ["4", "5"]), {"1", "2"})
  47. def test_no_common_ancestor(self):
  48. # no common ancestor
  49. graph = {
  50. "4": ["2"],
  51. "3": ["1"],
  52. "2": [],
  53. "1": ["0"],
  54. "0": [],
  55. }
  56. self.assertEqual(self.run_test(graph, ["4", "3"]), set())
  57. def test_ancestor(self):
  58. # ancestor
  59. graph = {
  60. "G": ["D", "F"],
  61. "F": ["E"],
  62. "D": ["C"],
  63. "C": ["B"],
  64. "E": ["B"],
  65. "B": ["A"],
  66. "A": [],
  67. }
  68. self.assertEqual(self.run_test(graph, ["D", "C"]), {"C"})
  69. def test_direct_parent(self):
  70. # parent
  71. graph = {
  72. "G": ["D", "F"],
  73. "F": ["E"],
  74. "D": ["C"],
  75. "C": ["B"],
  76. "E": ["B"],
  77. "B": ["A"],
  78. "A": [],
  79. }
  80. self.assertEqual(self.run_test(graph, ["G", "D"]), {"D"})
  81. def test_another_crossover(self):
  82. # Another cross over
  83. graph = {
  84. "G": ["D", "F"],
  85. "F": ["E", "C"],
  86. "D": ["C", "E"],
  87. "C": ["B"],
  88. "E": ["B"],
  89. "B": ["A"],
  90. "A": [],
  91. }
  92. self.assertEqual(self.run_test(graph, ["D", "F"]), {"E", "C"})
  93. def test_three_way_merge_lca(self):
  94. # three way merge commit straight from git docs
  95. graph = {
  96. "C": ["C1"],
  97. "C1": ["C2"],
  98. "C2": ["C3"],
  99. "C3": ["C4"],
  100. "C4": ["2"],
  101. "B": ["B1"],
  102. "B1": ["B2"],
  103. "B2": ["B3"],
  104. "B3": ["1"],
  105. "A": ["A1"],
  106. "A1": ["A2"],
  107. "A2": ["A3"],
  108. "A3": ["1"],
  109. "1": ["2"],
  110. "2": [],
  111. }
  112. # assumes a theoretical merge M exists that merges B and C first
  113. # which actually means find the first LCA from either of B OR C with A
  114. self.assertEqual(self.run_test(graph, ["A", "B", "C"]), {"1"})
  115. def test_octopus(self):
  116. # octopus algorithm test
  117. # test straight from git docs of A, B, and C
  118. # but this time use octopus to find lcas of A, B, and C simultaneously
  119. graph = {
  120. "C": ["C1"],
  121. "C1": ["C2"],
  122. "C2": ["C3"],
  123. "C3": ["C4"],
  124. "C4": ["2"],
  125. "B": ["B1"],
  126. "B1": ["B2"],
  127. "B2": ["B3"],
  128. "B3": ["1"],
  129. "A": ["A1"],
  130. "A1": ["A2"],
  131. "A2": ["A3"],
  132. "A3": ["1"],
  133. "1": ["2"],
  134. "2": [],
  135. }
  136. def lookup_parents(cid):
  137. return graph[cid]
  138. def lookup_stamp(commit_id):
  139. # any constant timestamp value here will work to force
  140. # this test to test the same behaviour as done previously
  141. return 100
  142. lcas = ["A"]
  143. others = ["B", "C"]
  144. for cmt in others:
  145. next_lcas = []
  146. for ca in lcas:
  147. res = _find_lcas(lookup_parents, cmt, [ca], lookup_stamp)
  148. next_lcas.extend(res)
  149. lcas = next_lcas[:]
  150. self.assertEqual(set(lcas), {"2"})
  151. class CanFastForwardTests(TestCase):
  152. def test_ff(self):
  153. r = MemoryRepo()
  154. base = make_commit()
  155. c1 = make_commit(parents=[base.id])
  156. c2 = make_commit(parents=[c1.id])
  157. r.object_store.add_objects([(base, None), (c1, None), (c2, None)])
  158. self.assertTrue(can_fast_forward(r, c1.id, c1.id))
  159. self.assertTrue(can_fast_forward(r, base.id, c1.id))
  160. self.assertTrue(can_fast_forward(r, c1.id, c2.id))
  161. self.assertFalse(can_fast_forward(r, c2.id, c1.id))
  162. def test_diverged(self):
  163. r = MemoryRepo()
  164. base = make_commit()
  165. c1 = make_commit(parents=[base.id])
  166. c2a = make_commit(parents=[c1.id], message=b"2a")
  167. c2b = make_commit(parents=[c1.id], message=b"2b")
  168. r.object_store.add_objects([(base, None), (c1, None), (c2a, None), (c2b, None)])
  169. self.assertTrue(can_fast_forward(r, c1.id, c2a.id))
  170. self.assertTrue(can_fast_forward(r, c1.id, c2b.id))
  171. self.assertFalse(can_fast_forward(r, c2a.id, c2b.id))
  172. self.assertFalse(can_fast_forward(r, c2b.id, c2a.id))
  173. class WorkListTest(TestCase):
  174. def test_WorkList(self):
  175. # tuples of (timestamp, value) are stored in a Priority MaxQueue
  176. # repeated use of get should return them in maxheap timestamp
  177. # order: largest time value (most recent in time) first then earlier/older
  178. wlst = WorkList()
  179. wlst.add((100, "Test Value 1"))
  180. wlst.add((50, "Test Value 2"))
  181. wlst.add((200, "Test Value 3"))
  182. self.assertEqual(wlst.get(), (200, "Test Value 3"))
  183. self.assertEqual(wlst.get(), (100, "Test Value 1"))
  184. wlst.add((150, "Test Value 4"))
  185. self.assertEqual(wlst.get(), (150, "Test Value 4"))
  186. self.assertEqual(wlst.get(), (50, "Test Value 2"))