test_merge.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. """Tests for merge functionality."""
  2. import importlib.util
  3. import unittest
  4. from dulwich.merge import MergeConflict, Merger, three_way_merge
  5. from dulwich.objects import Blob, Commit, Tree
  6. from dulwich.repo import MemoryRepo
  7. class MergeTests(unittest.TestCase):
  8. """Tests for merge functionality."""
  9. def setUp(self):
  10. self.repo = MemoryRepo()
  11. # Check if merge3 module is available
  12. if importlib.util.find_spec("merge3") is None:
  13. raise unittest.SkipTest("merge3 module not available, skipping merge tests")
  14. self.merger = Merger(self.repo.object_store)
  15. def test_merge_blobs_no_conflict(self):
  16. """Test merging blobs without conflicts."""
  17. # Create base blob
  18. base_blob = Blob.from_string(b"line1\nline2\nline3\n")
  19. # Create modified versions - currently our algorithm treats changes to different line groups as conflicts
  20. # This is a simple implementation - Git's merge is more sophisticated
  21. ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
  22. theirs_blob = Blob.from_string(b"line1\nline2\nmodified line3\n")
  23. # Add blobs to object store
  24. self.repo.object_store.add_object(base_blob)
  25. self.repo.object_store.add_object(ours_blob)
  26. self.repo.object_store.add_object(theirs_blob)
  27. # Merge - this will result in a conflict with our simple algorithm
  28. result, has_conflicts = self.merger.merge_blobs(
  29. base_blob, ours_blob, theirs_blob
  30. )
  31. # For now, expect conflicts since both sides changed (even different lines)
  32. self.assertTrue(has_conflicts)
  33. self.assertIn(b"<<<<<<< ours", result)
  34. self.assertIn(b">>>>>>> theirs", result)
  35. def test_merge_blobs_clean_merge(self):
  36. """Test merging blobs with a clean merge (one side unchanged)."""
  37. # Create base blob
  38. base_blob = Blob.from_string(b"line1\nline2\nline3\n")
  39. # Only ours modifies
  40. ours_blob = Blob.from_string(b"line1\nmodified line2\nline3\n")
  41. theirs_blob = base_blob # unchanged
  42. # Add blobs to object store
  43. self.repo.object_store.add_object(base_blob)
  44. self.repo.object_store.add_object(ours_blob)
  45. # Merge
  46. result, has_conflicts = self.merger.merge_blobs(
  47. base_blob, ours_blob, theirs_blob
  48. )
  49. self.assertFalse(has_conflicts)
  50. self.assertEqual(result, b"line1\nmodified line2\nline3\n")
  51. def test_merge_blobs_with_conflict(self):
  52. """Test merging blobs with conflicts."""
  53. # Create base blob
  54. base_blob = Blob.from_string(b"line1\nline2\nline3\n")
  55. # Create conflicting modifications
  56. ours_blob = Blob.from_string(b"line1\nours line2\nline3\n")
  57. theirs_blob = Blob.from_string(b"line1\ntheirs line2\nline3\n")
  58. # Add blobs to object store
  59. self.repo.object_store.add_object(base_blob)
  60. self.repo.object_store.add_object(ours_blob)
  61. self.repo.object_store.add_object(theirs_blob)
  62. # Merge
  63. result, has_conflicts = self.merger.merge_blobs(
  64. base_blob, ours_blob, theirs_blob
  65. )
  66. self.assertTrue(has_conflicts)
  67. self.assertIn(b"<<<<<<< ours", result)
  68. self.assertIn(b"=======", result)
  69. self.assertIn(b">>>>>>> theirs", result)
  70. def test_merge_blobs_identical(self):
  71. """Test merging identical blobs."""
  72. blob = Blob.from_string(b"same content\n")
  73. self.repo.object_store.add_object(blob)
  74. result, has_conflicts = self.merger.merge_blobs(blob, blob, blob)
  75. self.assertFalse(has_conflicts)
  76. self.assertEqual(result, b"same content\n")
  77. def test_merge_blobs_one_side_unchanged(self):
  78. """Test merging when one side is unchanged."""
  79. base_blob = Blob.from_string(b"original\n")
  80. modified_blob = Blob.from_string(b"modified\n")
  81. self.repo.object_store.add_object(base_blob)
  82. self.repo.object_store.add_object(modified_blob)
  83. # Test ours unchanged, theirs modified
  84. result, has_conflicts = self.merger.merge_blobs(
  85. base_blob, base_blob, modified_blob
  86. )
  87. self.assertFalse(has_conflicts)
  88. self.assertEqual(result, b"modified\n")
  89. # Test theirs unchanged, ours modified
  90. result, has_conflicts = self.merger.merge_blobs(
  91. base_blob, modified_blob, base_blob
  92. )
  93. self.assertFalse(has_conflicts)
  94. self.assertEqual(result, b"modified\n")
  95. def test_merge_blobs_deletion_no_conflict(self):
  96. """Test merging with deletion where no conflict occurs."""
  97. base_blob = Blob.from_string(b"content\n")
  98. self.repo.object_store.add_object(base_blob)
  99. # Both delete
  100. result, has_conflicts = self.merger.merge_blobs(base_blob, None, None)
  101. self.assertFalse(has_conflicts)
  102. self.assertEqual(result, b"")
  103. # One deletes, other unchanged
  104. result, has_conflicts = self.merger.merge_blobs(base_blob, None, base_blob)
  105. self.assertFalse(has_conflicts)
  106. self.assertEqual(result, b"")
  107. def test_merge_blobs_deletion_with_conflict(self):
  108. """Test merging with deletion that causes conflict."""
  109. base_blob = Blob.from_string(b"content\n")
  110. modified_blob = Blob.from_string(b"modified content\n")
  111. self.repo.object_store.add_object(base_blob)
  112. self.repo.object_store.add_object(modified_blob)
  113. # We delete, they modify
  114. result, has_conflicts = self.merger.merge_blobs(base_blob, None, modified_blob)
  115. self.assertTrue(has_conflicts)
  116. def test_merge_blobs_no_base(self):
  117. """Test merging blobs with no common ancestor."""
  118. blob1 = Blob.from_string(b"content1\n")
  119. blob2 = Blob.from_string(b"content2\n")
  120. self.repo.object_store.add_object(blob1)
  121. self.repo.object_store.add_object(blob2)
  122. # Different content added in both - conflict
  123. result, has_conflicts = self.merger.merge_blobs(None, blob1, blob2)
  124. self.assertTrue(has_conflicts)
  125. # Same content added in both - no conflict
  126. result, has_conflicts = self.merger.merge_blobs(None, blob1, blob1)
  127. self.assertFalse(has_conflicts)
  128. self.assertEqual(result, b"content1\n")
  129. def test_merge_trees_simple(self):
  130. """Test simple tree merge."""
  131. # Create base tree
  132. base_tree = Tree()
  133. blob1 = Blob.from_string(b"file1 content\n")
  134. blob2 = Blob.from_string(b"file2 content\n")
  135. self.repo.object_store.add_object(blob1)
  136. self.repo.object_store.add_object(blob2)
  137. base_tree.add(b"file1.txt", 0o100644, blob1.id)
  138. base_tree.add(b"file2.txt", 0o100644, blob2.id)
  139. self.repo.object_store.add_object(base_tree)
  140. # Create ours tree (modify file1)
  141. ours_tree = Tree()
  142. ours_blob1 = Blob.from_string(b"file1 modified by ours\n")
  143. self.repo.object_store.add_object(ours_blob1)
  144. ours_tree.add(b"file1.txt", 0o100644, ours_blob1.id)
  145. ours_tree.add(b"file2.txt", 0o100644, blob2.id)
  146. self.repo.object_store.add_object(ours_tree)
  147. # Create theirs tree (modify file2)
  148. theirs_tree = Tree()
  149. theirs_blob2 = Blob.from_string(b"file2 modified by theirs\n")
  150. self.repo.object_store.add_object(theirs_blob2)
  151. theirs_tree.add(b"file1.txt", 0o100644, blob1.id)
  152. theirs_tree.add(b"file2.txt", 0o100644, theirs_blob2.id)
  153. self.repo.object_store.add_object(theirs_tree)
  154. # Merge
  155. merged_tree, conflicts = self.merger.merge_trees(
  156. base_tree, ours_tree, theirs_tree
  157. )
  158. self.assertEqual(len(conflicts), 0)
  159. self.assertIn(b"file1.txt", [item.path for item in merged_tree.items()])
  160. self.assertIn(b"file2.txt", [item.path for item in merged_tree.items()])
  161. def test_merge_trees_with_conflict(self):
  162. """Test tree merge with conflicting changes."""
  163. # Create base tree
  164. base_tree = Tree()
  165. blob1 = Blob.from_string(b"original content\n")
  166. self.repo.object_store.add_object(blob1)
  167. base_tree.add(b"conflict.txt", 0o100644, blob1.id)
  168. self.repo.object_store.add_object(base_tree)
  169. # Create ours tree
  170. ours_tree = Tree()
  171. ours_blob = Blob.from_string(b"ours content\n")
  172. self.repo.object_store.add_object(ours_blob)
  173. ours_tree.add(b"conflict.txt", 0o100644, ours_blob.id)
  174. self.repo.object_store.add_object(ours_tree)
  175. # Create theirs tree
  176. theirs_tree = Tree()
  177. theirs_blob = Blob.from_string(b"theirs content\n")
  178. self.repo.object_store.add_object(theirs_blob)
  179. theirs_tree.add(b"conflict.txt", 0o100644, theirs_blob.id)
  180. self.repo.object_store.add_object(theirs_tree)
  181. # Merge
  182. merged_tree, conflicts = self.merger.merge_trees(
  183. base_tree, ours_tree, theirs_tree
  184. )
  185. self.assertEqual(len(conflicts), 1)
  186. self.assertEqual(conflicts[0], b"conflict.txt")
  187. def test_three_way_merge(self):
  188. """Test three-way merge between commits."""
  189. # Create base commit
  190. base_tree = Tree()
  191. blob = Blob.from_string(b"base content\n")
  192. self.repo.object_store.add_object(blob)
  193. base_tree.add(b"file.txt", 0o100644, blob.id)
  194. self.repo.object_store.add_object(base_tree)
  195. base_commit = Commit()
  196. base_commit.tree = base_tree.id
  197. base_commit.author = b"Test Author <test@example.com>"
  198. base_commit.committer = b"Test Author <test@example.com>"
  199. base_commit.message = b"Base commit"
  200. base_commit.commit_time = base_commit.author_time = 12345
  201. base_commit.commit_timezone = base_commit.author_timezone = 0
  202. self.repo.object_store.add_object(base_commit)
  203. # Create ours commit
  204. ours_tree = Tree()
  205. ours_blob = Blob.from_string(b"ours content\n")
  206. self.repo.object_store.add_object(ours_blob)
  207. ours_tree.add(b"file.txt", 0o100644, ours_blob.id)
  208. self.repo.object_store.add_object(ours_tree)
  209. ours_commit = Commit()
  210. ours_commit.tree = ours_tree.id
  211. ours_commit.parents = [base_commit.id]
  212. ours_commit.author = b"Test Author <test@example.com>"
  213. ours_commit.committer = b"Test Author <test@example.com>"
  214. ours_commit.message = b"Ours commit"
  215. ours_commit.commit_time = ours_commit.author_time = 12346
  216. ours_commit.commit_timezone = ours_commit.author_timezone = 0
  217. self.repo.object_store.add_object(ours_commit)
  218. # Create theirs commit
  219. theirs_tree = Tree()
  220. theirs_blob = Blob.from_string(b"theirs content\n")
  221. self.repo.object_store.add_object(theirs_blob)
  222. theirs_tree.add(b"file.txt", 0o100644, theirs_blob.id)
  223. self.repo.object_store.add_object(theirs_tree)
  224. theirs_commit = Commit()
  225. theirs_commit.tree = theirs_tree.id
  226. theirs_commit.parents = [base_commit.id]
  227. theirs_commit.author = b"Test Author <test@example.com>"
  228. theirs_commit.committer = b"Test Author <test@example.com>"
  229. theirs_commit.message = b"Theirs commit"
  230. theirs_commit.commit_time = theirs_commit.author_time = 12347
  231. theirs_commit.commit_timezone = theirs_commit.author_timezone = 0
  232. self.repo.object_store.add_object(theirs_commit)
  233. # Perform three-way merge
  234. merged_tree, conflicts = three_way_merge(
  235. self.repo.object_store, base_commit, ours_commit, theirs_commit
  236. )
  237. # Should have conflict since both modified the same file differently
  238. self.assertEqual(len(conflicts), 1)
  239. self.assertEqual(conflicts[0], b"file.txt")
  240. def test_merge_exception(self):
  241. """Test MergeConflict exception."""
  242. exc = MergeConflict(b"test/path", "test message")
  243. self.assertEqual(exc.path, b"test/path")
  244. self.assertIn("test/path", str(exc))
  245. self.assertIn("test message", str(exc))