test_merge.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. """Tests for merge functionality."""
  2. import importlib.util
  3. import unittest
  4. from dulwich.merge import MergeConflict, Merger, three_way_merge
  5. from dulwich.objects import Blob, Commit, Tree
  6. from dulwich.repo import MemoryRepo
  7. from . import DependencyMissing
  8. class MergeTests(unittest.TestCase):
  9. """Tests for merge functionality."""
  10. def setUp(self):
  11. self.repo = MemoryRepo()
  12. # Check if merge3 module is available
  13. if importlib.util.find_spec("merge3") is None:
  14. raise DependencyMissing("merge3")
  15. self.merger = Merger(self.repo.object_store)
  16. def test_merge_blobs_no_conflict(self):
  17. """Test merging blobs without conflicts."""
  18. # Create base blob
  19. base_blob = Blob.from_string(b"line1\nline2\nline3\n")
  20. # Create modified versions - currently our algorithm treats changes to different line groups as conflicts
  21. # This is a simple implementation - Git's merge is more sophisticated
  22. ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
  23. theirs_blob = Blob.from_string(b"line1\nline2\nmodified line3\n")
  24. # Add blobs to object store
  25. self.repo.object_store.add_object(base_blob)
  26. self.repo.object_store.add_object(ours_blob)
  27. self.repo.object_store.add_object(theirs_blob)
  28. # Merge - this will result in a conflict with our simple algorithm
  29. result, has_conflicts = self.merger.merge_blobs(
  30. base_blob, ours_blob, theirs_blob
  31. )
  32. # For now, expect conflicts since both sides changed (even different lines)
  33. self.assertTrue(has_conflicts)
  34. self.assertIn(b"<<<<<<< ours", result)
  35. self.assertIn(b">>>>>>> theirs", result)
  36. def test_merge_blobs_clean_merge(self):
  37. """Test merging blobs with a clean merge (one side unchanged)."""
  38. # Create base blob
  39. base_blob = Blob.from_string(b"line1\nline2\nline3\n")
  40. # Only ours modifies
  41. ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
  42. theirs_blob = base_blob # unchanged
  43. # Add blobs to object store
  44. self.repo.object_store.add_object(base_blob)
  45. self.repo.object_store.add_object(ours_blob)
  46. # Merge
  47. result, has_conflicts = self.merger.merge_blobs(
  48. base_blob, ours_blob, theirs_blob
  49. )
  50. self.assertFalse(has_conflicts)
  51. self.assertEqual(result, b"line1\nmodified line2\nline3\n")
  52. def test_merge_blobs_with_conflict(self):
  53. """Test merging blobs with conflicts."""
  54. # Create base blob
  55. base_blob = Blob.from_string(b"line1\nline2\nline3\n")
  56. # Create conflicting modifications
  57. ours_blob = Blob.from_string(b"line1\nours line2\nline3\n")
  58. theirs_blob = Blob.from_string(b"line1\ntheirs line2\nline3\n")
  59. # Add blobs to object store
  60. self.repo.object_store.add_object(base_blob)
  61. self.repo.object_store.add_object(ours_blob)
  62. self.repo.object_store.add_object(theirs_blob)
  63. # Merge
  64. result, has_conflicts = self.merger.merge_blobs(
  65. base_blob, ours_blob, theirs_blob
  66. )
  67. self.assertTrue(has_conflicts)
  68. self.assertIn(b"<<<<<<< ours", result)
  69. self.assertIn(b"=======", result)
  70. self.assertIn(b">>>>>>> theirs", result)
  71. def test_merge_blobs_identical(self):
  72. """Test merging identical blobs."""
  73. blob = Blob.from_string(b"same content\n")
  74. self.repo.object_store.add_object(blob)
  75. result, has_conflicts = self.merger.merge_blobs(blob, blob, blob)
  76. self.assertFalse(has_conflicts)
  77. self.assertEqual(result, b"same content\n")
  78. def test_merge_blobs_one_side_unchanged(self):
  79. """Test merging when one side is unchanged."""
  80. base_blob = Blob.from_string(b"original\n")
  81. modified_blob = Blob.from_string(b"modified\n")
  82. self.repo.object_store.add_object(base_blob)
  83. self.repo.object_store.add_object(modified_blob)
  84. # Test ours unchanged, theirs modified
  85. result, has_conflicts = self.merger.merge_blobs(
  86. base_blob, base_blob, modified_blob
  87. )
  88. self.assertFalse(has_conflicts)
  89. self.assertEqual(result, b"modified\n")
  90. # Test theirs unchanged, ours modified
  91. result, has_conflicts = self.merger.merge_blobs(
  92. base_blob, modified_blob, base_blob
  93. )
  94. self.assertFalse(has_conflicts)
  95. self.assertEqual(result, b"modified\n")
  96. def test_merge_blobs_deletion_no_conflict(self):
  97. """Test merging with deletion where no conflict occurs."""
  98. base_blob = Blob.from_string(b"content\n")
  99. self.repo.object_store.add_object(base_blob)
  100. # Both delete
  101. result, has_conflicts = self.merger.merge_blobs(base_blob, None, None)
  102. self.assertFalse(has_conflicts)
  103. self.assertEqual(result, b"")
  104. # One deletes, other unchanged
  105. result, has_conflicts = self.merger.merge_blobs(base_blob, None, base_blob)
  106. self.assertFalse(has_conflicts)
  107. self.assertEqual(result, b"")
  108. def test_merge_blobs_deletion_with_conflict(self):
  109. """Test merging with deletion that causes conflict."""
  110. base_blob = Blob.from_string(b"content\n")
  111. modified_blob = Blob.from_string(b"modified content\n")
  112. self.repo.object_store.add_object(base_blob)
  113. self.repo.object_store.add_object(modified_blob)
  114. # We delete, they modify
  115. _result, has_conflicts = self.merger.merge_blobs(base_blob, None, modified_blob)
  116. self.assertTrue(has_conflicts)
  117. def test_merge_blobs_no_base(self):
  118. """Test merging blobs with no common ancestor."""
  119. blob1 = Blob.from_string(b"content1\n")
  120. blob2 = Blob.from_string(b"content2\n")
  121. self.repo.object_store.add_object(blob1)
  122. self.repo.object_store.add_object(blob2)
  123. # Different content added in both - conflict
  124. result, has_conflicts = self.merger.merge_blobs(None, blob1, blob2)
  125. self.assertTrue(has_conflicts)
  126. # Same content added in both - no conflict
  127. result, has_conflicts = self.merger.merge_blobs(None, blob1, blob1)
  128. self.assertFalse(has_conflicts)
  129. self.assertEqual(result, b"content1\n")
  130. def test_merge_trees_simple(self):
  131. """Test simple tree merge."""
  132. # Create base tree
  133. base_tree = Tree()
  134. blob1 = Blob.from_string(b"file1 content\n")
  135. blob2 = Blob.from_string(b"file2 content\n")
  136. self.repo.object_store.add_object(blob1)
  137. self.repo.object_store.add_object(blob2)
  138. base_tree.add(b"file1.txt", 0o100644, blob1.id)
  139. base_tree.add(b"file2.txt", 0o100644, blob2.id)
  140. self.repo.object_store.add_object(base_tree)
  141. # Create ours tree (modify file1)
  142. ours_tree = Tree()
  143. ours_blob1 = Blob.from_string(b"file1 modified by ours\n")
  144. self.repo.object_store.add_object(ours_blob1)
  145. ours_tree.add(b"file1.txt", 0o100644, ours_blob1.id)
  146. ours_tree.add(b"file2.txt", 0o100644, blob2.id)
  147. self.repo.object_store.add_object(ours_tree)
  148. # Create theirs tree (modify file2)
  149. theirs_tree = Tree()
  150. theirs_blob2 = Blob.from_string(b"file2 modified by theirs\n")
  151. self.repo.object_store.add_object(theirs_blob2)
  152. theirs_tree.add(b"file1.txt", 0o100644, blob1.id)
  153. theirs_tree.add(b"file2.txt", 0o100644, theirs_blob2.id)
  154. self.repo.object_store.add_object(theirs_tree)
  155. # Merge
  156. merged_tree, conflicts = self.merger.merge_trees(
  157. base_tree, ours_tree, theirs_tree
  158. )
  159. self.assertEqual(len(conflicts), 0)
  160. self.assertIn(b"file1.txt", [item.path for item in merged_tree.items()])
  161. self.assertIn(b"file2.txt", [item.path for item in merged_tree.items()])
  162. def test_merge_trees_with_conflict(self):
  163. """Test tree merge with conflicting changes."""
  164. # Create base tree
  165. base_tree = Tree()
  166. blob1 = Blob.from_string(b"original content\n")
  167. self.repo.object_store.add_object(blob1)
  168. base_tree.add(b"conflict.txt", 0o100644, blob1.id)
  169. self.repo.object_store.add_object(base_tree)
  170. # Create ours tree
  171. ours_tree = Tree()
  172. ours_blob = Blob.from_string(b"ours content\n")
  173. self.repo.object_store.add_object(ours_blob)
  174. ours_tree.add(b"conflict.txt", 0o100644, ours_blob.id)
  175. self.repo.object_store.add_object(ours_tree)
  176. # Create theirs tree
  177. theirs_tree = Tree()
  178. theirs_blob = Blob.from_string(b"theirs content\n")
  179. self.repo.object_store.add_object(theirs_blob)
  180. theirs_tree.add(b"conflict.txt", 0o100644, theirs_blob.id)
  181. self.repo.object_store.add_object(theirs_tree)
  182. # Merge
  183. _merged_tree, conflicts = self.merger.merge_trees(
  184. base_tree, ours_tree, theirs_tree
  185. )
  186. self.assertEqual(len(conflicts), 1)
  187. self.assertEqual(conflicts[0], b"conflict.txt")
  188. def test_three_way_merge(self):
  189. """Test three-way merge between commits."""
  190. # Create base commit
  191. base_tree = Tree()
  192. blob = Blob.from_string(b"base content\n")
  193. self.repo.object_store.add_object(blob)
  194. base_tree.add(b"file.txt", 0o100644, blob.id)
  195. self.repo.object_store.add_object(base_tree)
  196. base_commit = Commit()
  197. base_commit.tree = base_tree.id
  198. base_commit.author = b"Test Author <test@example.com>"
  199. base_commit.committer = b"Test Author <test@example.com>"
  200. base_commit.message = b"Base commit"
  201. base_commit.commit_time = base_commit.author_time = 12345
  202. base_commit.commit_timezone = base_commit.author_timezone = 0
  203. self.repo.object_store.add_object(base_commit)
  204. # Create ours commit
  205. ours_tree = Tree()
  206. ours_blob = Blob.from_string(b"ours content\n")
  207. self.repo.object_store.add_object(ours_blob)
  208. ours_tree.add(b"file.txt", 0o100644, ours_blob.id)
  209. self.repo.object_store.add_object(ours_tree)
  210. ours_commit = Commit()
  211. ours_commit.tree = ours_tree.id
  212. ours_commit.parents = [base_commit.id]
  213. ours_commit.author = b"Test Author <test@example.com>"
  214. ours_commit.committer = b"Test Author <test@example.com>"
  215. ours_commit.message = b"Ours commit"
  216. ours_commit.commit_time = ours_commit.author_time = 12346
  217. ours_commit.commit_timezone = ours_commit.author_timezone = 0
  218. self.repo.object_store.add_object(ours_commit)
  219. # Create theirs commit
  220. theirs_tree = Tree()
  221. theirs_blob = Blob.from_string(b"theirs content\n")
  222. self.repo.object_store.add_object(theirs_blob)
  223. theirs_tree.add(b"file.txt", 0o100644, theirs_blob.id)
  224. self.repo.object_store.add_object(theirs_tree)
  225. theirs_commit = Commit()
  226. theirs_commit.tree = theirs_tree.id
  227. theirs_commit.parents = [base_commit.id]
  228. theirs_commit.author = b"Test Author <test@example.com>"
  229. theirs_commit.committer = b"Test Author <test@example.com>"
  230. theirs_commit.message = b"Theirs commit"
  231. theirs_commit.commit_time = theirs_commit.author_time = 12347
  232. theirs_commit.commit_timezone = theirs_commit.author_timezone = 0
  233. self.repo.object_store.add_object(theirs_commit)
  234. # Perform three-way merge
  235. _merged_tree, conflicts = three_way_merge(
  236. self.repo.object_store, base_commit, ours_commit, theirs_commit
  237. )
  238. # Should have conflict since both modified the same file differently
  239. self.assertEqual(len(conflicts), 1)
  240. self.assertEqual(conflicts[0], b"file.txt")
  241. def test_merge_exception(self):
  242. """Test MergeConflict exception."""
  243. exc = MergeConflict(b"test/path", "test message")
  244. self.assertEqual(exc.path, b"test/path")
  245. self.assertIn("test/path", str(exc))
  246. self.assertIn("test message", str(exc))