test_merge_drivers.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. # test_merge_drivers.py -- Tests for merge driver support
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Tests for merge driver support."""
  21. import sys
  22. import unittest
  23. from typing import Optional
  24. from dulwich.attrs import GitAttributes, Pattern
  25. from dulwich.config import ConfigDict
  26. from dulwich.merge import merge_blobs
  27. from dulwich.merge_drivers import (
  28. MergeDriverRegistry,
  29. ProcessMergeDriver,
  30. get_merge_driver_registry,
  31. )
  32. from dulwich.objects import Blob
  33. class _TestMergeDriver:
  34. """Test merge driver implementation."""
  35. def __init__(self, name: str = "test"):
  36. self.name = name
  37. self.called = False
  38. self.last_args = None
  39. def merge(
  40. self,
  41. ancestor: bytes,
  42. ours: bytes,
  43. theirs: bytes,
  44. path: Optional[str] = None,
  45. marker_size: int = 7,
  46. ) -> tuple[bytes, bool]:
  47. """Test merge implementation."""
  48. self.called = True
  49. self.last_args = {
  50. "ancestor": ancestor,
  51. "ours": ours,
  52. "theirs": theirs,
  53. "path": path,
  54. "marker_size": marker_size,
  55. }
  56. # Simple test merge: combine all three versions
  57. result = b"TEST MERGE OUTPUT\n"
  58. result += b"Ancestor: " + ancestor + b"\n"
  59. result += b"Ours: " + ours + b"\n"
  60. result += b"Theirs: " + theirs + b"\n"
  61. # Return success if all three are different
  62. success = ancestor != ours and ancestor != theirs
  63. return result, success
  class MergeDriverRegistryTests(unittest.TestCase):
      """Exercise registration and lookup on MergeDriverRegistry."""

      def test_register_driver(self):
          """A driver registered by name comes back as the very same object."""
          stub = _TestMergeDriver("mydriver")
          reg = MergeDriverRegistry()
          reg.register_driver("mydriver", stub)

          self.assertIs(reg.get_driver("mydriver"), stub)

      def test_register_factory(self):
          """A registered factory builds the driver, which is then reused."""
          reg = MergeDriverRegistry()
          reg.register_factory("factory", lambda: _TestMergeDriver("factory_driver"))

          first = reg.get_driver("factory")
          self.assertIsInstance(first, _TestMergeDriver)
          self.assertEqual(first.name, "factory_driver")

          # A repeated lookup must hand back the instance built the first time.
          self.assertIs(reg.get_driver("factory"), first)

      def test_get_nonexistent_driver(self):
          """Looking up an unknown name yields None."""
          self.assertIsNone(MergeDriverRegistry().get_driver("nonexistent"))

      def test_create_from_config(self):
          """A ``merge.<name>.driver`` config entry produces a ProcessMergeDriver."""
          cfg = ConfigDict()
          cfg.set((b"merge", b"xmlmerge"), b"driver", b"xmlmerge %O %A %B")

          built = MergeDriverRegistry(cfg).get_driver("xmlmerge")

          self.assertIsInstance(built, ProcessMergeDriver)
          self.assertEqual(built.name, "xmlmerge")
          self.assertEqual(built.command, "xmlmerge %O %A %B")
  class ProcessMergeDriverTests(unittest.TestCase):
      """Run ProcessMergeDriver against small shell commands."""

      @staticmethod
      def _echo_output(text: bytes) -> bytes:
          """Return the bytes expected from ``echo <text> > file`` on this platform.

          The win32 expectation carries a trailing space and CRLF; every other
          platform expects a bare LF.
          """
          if sys.platform == "win32":
              return text + b" \r\n"
          return text + b"\n"

      def test_merge_with_echo(self):
          """An echo redirected into %A yields the echoed text as the result."""
          echo_driver = ProcessMergeDriver("echo merged > %A", "echo_driver")

          merged, ok = echo_driver.merge(
              b"ancestor content", b"our content", b"their content", "test.txt", 7
          )

          self.assertEqual(merged, self._echo_output(b"merged"))
          self.assertTrue(ok)  # echo returns 0

      def test_merge_with_cat(self):
          """Appending %O and %B to %A yields ours, then ancestor, then theirs."""
          cat_driver = ProcessMergeDriver("cat %O %B >> %A", "cat_driver")

          merged, ok = cat_driver.merge(b"ancestor\n", b"ours\n", b"theirs\n")

          # Our side comes first; the ancestor and their side follow it.
          self.assertEqual(merged, b"ours\nancestor\ntheirs\n")
          self.assertTrue(ok)

      def test_merge_with_failure(self):
          """A command that exits non-zero reports failure."""
          # `false` always returns 1.
          fail_driver = ProcessMergeDriver("false", "fail_driver")

          merged, ok = fail_driver.merge(b"a", b"b", b"c")

          # On failure the expected result is our side, untouched.
          self.assertEqual(merged, b"b")
          self.assertFalse(ok)

      def test_merge_with_markers(self):
          """The %L placeholder expands to the requested conflict marker size."""
          marker_driver = ProcessMergeDriver(
              "echo marker size: %L > %A", "marker_driver"
          )

          merged, ok = marker_driver.merge(b"a", b"b", b"c", marker_size=15)

          self.assertEqual(merged, self._echo_output(b"marker size: 15"))
          self.assertTrue(ok)

      def test_merge_with_path(self):
          """The %P placeholder expands to the path passed to merge()."""
          path_driver = ProcessMergeDriver("echo path: %P > %A", "path_driver")

          merged, ok = path_driver.merge(b"a", b"b", b"c", path="dir/file.xml")

          self.assertEqual(merged, self._echo_output(b"path: dir/file.xml"))
          self.assertTrue(ok)
  class MergeBlobsWithDriversTests(unittest.TestCase):
      """Tests for merge_blobs with merge drivers."""

      def setUp(self):
          """Start each test without a global registry; restore it afterwards."""
          from dulwich import merge_drivers

          # test_merge_blobs_with_custom_driver registers a driver on the shared
          # registry, so put back whatever was installed before this test ran
          # instead of leaking that state to whichever test runs next.
          previous = getattr(merge_drivers, "_merge_driver_registry", None)
          merge_drivers._merge_driver_registry = None
          self.addCleanup(setattr, merge_drivers, "_merge_driver_registry", previous)

      def test_merge_blobs_without_driver(self):
          """Test merge_blobs without any merge driver."""
          base = Blob.from_string(b"base\ncontent\n")
          ours = Blob.from_string(b"base\nour change\n")
          theirs = Blob.from_string(b"base\ntheir change\n")

          result, has_conflicts = merge_blobs(base, ours, theirs)

          # Both sides changed the same line, so the default merge must
          # report a conflict and emit markers for each side.
          self.assertTrue(has_conflicts)
          self.assertIn(b"<<<<<<< ours", result)
          self.assertIn(b">>>>>>> theirs", result)

      def test_merge_blobs_with_text_driver(self):
          """Test merge_blobs with 'text' merge driver (default)."""
          base = Blob.from_string(b"base\ncontent\n")
          ours = Blob.from_string(b"base\nour change\n")
          theirs = Blob.from_string(b"base\ntheir change\n")

          # Attribute every *.txt path to the "text" driver.
          patterns = [(Pattern(b"*.txt"), {b"merge": b"text"})]
          gitattributes = GitAttributes(patterns)

          result, has_conflicts = merge_blobs(
              base, ours, theirs, b"file.txt", gitattributes
          )

          # "text" is expected to behave like the default merge.
          self.assertTrue(has_conflicts)
          self.assertIn(b"<<<<<<< ours", result)

      def test_merge_blobs_with_custom_driver(self):
          """Test merge_blobs with custom merge driver."""
          # Register a recording stub on the global registry.
          registry = get_merge_driver_registry()
          test_driver = _TestMergeDriver("custom")
          registry.register_driver("custom", test_driver)

          base = Blob.from_string(b"base content")
          ours = Blob.from_string(b"our content")
          theirs = Blob.from_string(b"their content")

          # Attribute every *.xml path to the stub registered above.
          patterns = [(Pattern(b"*.xml"), {b"merge": b"custom"})]
          gitattributes = GitAttributes(patterns)

          result, has_conflicts = merge_blobs(
              base, ours, theirs, b"file.xml", gitattributes
          )

          # The stub must have been invoked with the three blob contents and
          # with the path as a str.
          self.assertTrue(test_driver.called)
          self.assertEqual(test_driver.last_args["ancestor"], b"base content")
          self.assertEqual(test_driver.last_args["ours"], b"our content")
          self.assertEqual(test_driver.last_args["theirs"], b"their content")
          self.assertEqual(test_driver.last_args["path"], "file.xml")

          self.assertIn(b"TEST MERGE OUTPUT", result)
          # The stub reports success because both sides differ from the base,
          # so merge_blobs must not flag a conflict.
          self.assertFalse(has_conflicts)

      def test_merge_blobs_with_process_driver(self):
          """Test merge_blobs with process-based merge driver."""
          # Define the driver purely through configuration.
          config = ConfigDict()
          config.set(
              (b"merge", b"union"), b"driver", b"echo process merge worked > %A"
          )

          base = Blob.from_string(b"base")
          ours = Blob.from_string(b"ours")
          theirs = Blob.from_string(b"theirs")

          # Attribute every *.list path to the configured driver.
          patterns = [(Pattern(b"*.list"), {b"merge": b"union"})]
          gitattributes = GitAttributes(patterns)

          result, has_conflicts = merge_blobs(
              base, ours, theirs, b"file.list", gitattributes, config
          )

          # The expected echo output differs between Windows and Unix.
          if sys.platform == "win32":
              expected = b"process merge worked \r\n"
          else:
              expected = b"process merge worked\n"
          self.assertEqual(result, expected)
          self.assertFalse(has_conflicts)  # echo returns 0

      def test_merge_blobs_driver_not_found(self):
          """Test merge_blobs when specified driver is not found."""
          base = Blob.from_string(b"base")
          ours = Blob.from_string(b"ours")
          theirs = Blob.from_string(b"theirs")

          # Point *.dat at a driver name that nothing registers or configures.
          patterns = [(Pattern(b"*.dat"), {b"merge": b"nonexistent"})]
          gitattributes = GitAttributes(patterns)

          result, has_conflicts = merge_blobs(
              base, ours, theirs, b"file.dat", gitattributes
          )

          # Should fall back to the default merge.
          self.assertTrue(has_conflicts)
          self.assertIn(b"<<<<<<< ours", result)
  class GlobalRegistryTests(unittest.TestCase):
      """Tests for global merge driver registry."""

      def setUp(self):
          """Start each test without a global registry; restore it afterwards."""
          from dulwich import merge_drivers

          # These tests create the module-level singleton; put back whatever
          # was installed before so the state does not leak to later tests.
          previous = getattr(merge_drivers, "_merge_driver_registry", None)
          merge_drivers._merge_driver_registry = None
          self.addCleanup(setattr, merge_drivers, "_merge_driver_registry", previous)

      def test_get_merge_driver_registry_singleton(self):
          """Test that get_merge_driver_registry returns singleton."""
          registry1 = get_merge_driver_registry()
          registry2 = get_merge_driver_registry()
          self.assertIs(registry1, registry2)

      def test_get_merge_driver_registry_with_config(self):
          """Test updating config on existing registry."""
          # First call, without a config: the registry has none.
          registry = get_merge_driver_registry()
          self.assertIsNone(registry._config)

          # Second call, with a config: same registry object, config now set.
          config = ConfigDict()
          registry2 = get_merge_driver_registry(config)
          self.assertIs(registry2, registry)
          self.assertIsNotNone(registry._config)
  # Run the test suite when this module is executed directly as a script.
  if __name__ == "__main__":
      unittest.main()