test_merge_drivers.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. # test_merge_drivers.py -- Tests for merge driver support
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Tests for merge driver support."""
  21. import importlib.util
  22. import sys
  23. import unittest
  24. from typing import Optional
  25. from dulwich.attrs import GitAttributes, Pattern
  26. from dulwich.config import ConfigDict
  27. from dulwich.merge import merge_blobs
  28. from dulwich.merge_drivers import (
  29. MergeDriverRegistry,
  30. ProcessMergeDriver,
  31. get_merge_driver_registry,
  32. )
  33. from dulwich.objects import Blob
  34. class _TestMergeDriver:
  35. """Test merge driver implementation."""
  36. def __init__(self, name: str = "test"):
  37. self.name = name
  38. self.called = False
  39. self.last_args = None
  40. def merge(
  41. self,
  42. ancestor: bytes,
  43. ours: bytes,
  44. theirs: bytes,
  45. path: Optional[str] = None,
  46. marker_size: int = 7,
  47. ) -> tuple[bytes, bool]:
  48. """Test merge implementation."""
  49. self.called = True
  50. self.last_args = {
  51. "ancestor": ancestor,
  52. "ours": ours,
  53. "theirs": theirs,
  54. "path": path,
  55. "marker_size": marker_size,
  56. }
  57. # Simple test merge: combine all three versions
  58. result = b"TEST MERGE OUTPUT\n"
  59. result += b"Ancestor: " + ancestor + b"\n"
  60. result += b"Ours: " + ours + b"\n"
  61. result += b"Theirs: " + theirs + b"\n"
  62. # Return success if all three are different
  63. success = ancestor != ours and ancestor != theirs
  64. return result, success
  65. class MergeDriverRegistryTests(unittest.TestCase):
  66. """Tests for MergeDriverRegistry."""
  67. def test_register_driver(self):
  68. """Test registering a merge driver."""
  69. registry = MergeDriverRegistry()
  70. driver = _TestMergeDriver("mydriver")
  71. registry.register_driver("mydriver", driver)
  72. retrieved = registry.get_driver("mydriver")
  73. self.assertIs(retrieved, driver)
  74. def test_register_factory(self):
  75. """Test registering a merge driver factory."""
  76. registry = MergeDriverRegistry()
  77. def create_driver():
  78. return _TestMergeDriver("factory_driver")
  79. registry.register_factory("factory", create_driver)
  80. driver = registry.get_driver("factory")
  81. self.assertIsInstance(driver, _TestMergeDriver)
  82. self.assertEqual(driver.name, "factory_driver")
  83. # Second call should return the same instance
  84. driver2 = registry.get_driver("factory")
  85. self.assertIs(driver2, driver)
  86. def test_get_nonexistent_driver(self):
  87. """Test getting a non-existent driver returns None."""
  88. registry = MergeDriverRegistry()
  89. driver = registry.get_driver("nonexistent")
  90. self.assertIsNone(driver)
  91. def test_create_from_config(self):
  92. """Test creating a merge driver from configuration."""
  93. config = ConfigDict()
  94. config.set((b"merge", b"xmlmerge"), b"driver", b"xmlmerge %O %A %B")
  95. registry = MergeDriverRegistry(config)
  96. driver = registry.get_driver("xmlmerge")
  97. self.assertIsInstance(driver, ProcessMergeDriver)
  98. self.assertEqual(driver.name, "xmlmerge")
  99. self.assertEqual(driver.command, "xmlmerge %O %A %B")
  100. class ProcessMergeDriverTests(unittest.TestCase):
  101. """Tests for ProcessMergeDriver."""
  102. def test_merge_with_echo(self):
  103. """Test merge driver using echo command."""
  104. # Use a simple echo command that writes to the output file
  105. command = "echo merged > %A"
  106. driver = ProcessMergeDriver(command, "echo_driver")
  107. ancestor = b"ancestor content"
  108. ours = b"our content"
  109. theirs = b"their content"
  110. result, success = driver.merge(ancestor, ours, theirs, "test.txt", 7)
  111. # Expect different line endings on Windows vs Unix
  112. if sys.platform == "win32":
  113. expected = b"merged \r\n"
  114. else:
  115. expected = b"merged\n"
  116. self.assertEqual(result, expected)
  117. self.assertTrue(success) # echo returns 0
  118. def test_merge_with_cat(self):
  119. """Test merge driver using cat command."""
  120. # Cat all three files together
  121. command = "cat %O %B >> %A"
  122. driver = ProcessMergeDriver(command, "cat_driver")
  123. ancestor = b"ancestor\n"
  124. ours = b"ours\n"
  125. theirs = b"theirs\n"
  126. result, success = driver.merge(ancestor, ours, theirs)
  127. self.assertEqual(result, b"ours\nancestor\ntheirs\n")
  128. self.assertTrue(success)
  129. def test_merge_with_failure(self):
  130. """Test merge driver that fails."""
  131. # Use false command which always returns 1
  132. command = "false"
  133. driver = ProcessMergeDriver(command, "fail_driver")
  134. result, success = driver.merge(b"a", b"b", b"c")
  135. # Should return original content on failure
  136. self.assertEqual(result, b"b")
  137. self.assertFalse(success)
  138. def test_merge_with_markers(self):
  139. """Test merge driver with conflict marker size."""
  140. # Echo the marker size
  141. command = "echo marker size: %L > %A"
  142. driver = ProcessMergeDriver(command, "marker_driver")
  143. result, success = driver.merge(b"a", b"b", b"c", marker_size=15)
  144. # Expect different line endings on Windows vs Unix
  145. if sys.platform == "win32":
  146. expected = b"marker size: 15 \r\n"
  147. else:
  148. expected = b"marker size: 15\n"
  149. self.assertEqual(result, expected)
  150. self.assertTrue(success)
  151. def test_merge_with_path(self):
  152. """Test merge driver with file path."""
  153. # Echo the path
  154. command = "echo path: %P > %A"
  155. driver = ProcessMergeDriver(command, "path_driver")
  156. result, success = driver.merge(b"a", b"b", b"c", path="dir/file.xml")
  157. # Expect different line endings on Windows vs Unix
  158. if sys.platform == "win32":
  159. expected = b"path: dir/file.xml \r\n"
  160. else:
  161. expected = b"path: dir/file.xml\n"
  162. self.assertEqual(result, expected)
  163. self.assertTrue(success)
  164. class MergeBlobsWithDriversTests(unittest.TestCase):
  165. """Tests for merge_blobs with merge drivers."""
  166. def setUp(self):
  167. """Set up test fixtures."""
  168. # Check if merge3 module is available
  169. if importlib.util.find_spec("merge3") is None:
  170. raise unittest.SkipTest("merge3 module not available, skipping merge tests")
  171. # Reset global registry
  172. global _merge_driver_registry
  173. from dulwich import merge_drivers
  174. merge_drivers._merge_driver_registry = None
  175. def test_merge_blobs_without_driver(self):
  176. """Test merge_blobs without any merge driver."""
  177. base = Blob.from_string(b"base\ncontent\n")
  178. ours = Blob.from_string(b"base\nour change\n")
  179. theirs = Blob.from_string(b"base\ntheir change\n")
  180. result, has_conflicts = merge_blobs(base, ours, theirs)
  181. # Should use default merge and have conflicts
  182. self.assertTrue(has_conflicts)
  183. self.assertIn(b"<<<<<<< ours", result)
  184. self.assertIn(b">>>>>>> theirs", result)
  185. def test_merge_blobs_with_text_driver(self):
  186. """Test merge_blobs with 'text' merge driver (default)."""
  187. base = Blob.from_string(b"base\ncontent\n")
  188. ours = Blob.from_string(b"base\nour change\n")
  189. theirs = Blob.from_string(b"base\ntheir change\n")
  190. # Set up gitattributes
  191. patterns = [(Pattern(b"*.txt"), {b"merge": b"text"})]
  192. gitattributes = GitAttributes(patterns)
  193. result, has_conflicts = merge_blobs(
  194. base, ours, theirs, b"file.txt", gitattributes
  195. )
  196. # Should use default merge (text is the default)
  197. self.assertTrue(has_conflicts)
  198. self.assertIn(b"<<<<<<< ours", result)
  199. def test_merge_blobs_with_custom_driver(self):
  200. """Test merge_blobs with custom merge driver."""
  201. # Register a test driver
  202. registry = get_merge_driver_registry()
  203. test_driver = _TestMergeDriver("custom")
  204. registry.register_driver("custom", test_driver)
  205. base = Blob.from_string(b"base content")
  206. ours = Blob.from_string(b"our content")
  207. theirs = Blob.from_string(b"their content")
  208. # Set up gitattributes
  209. patterns = [(Pattern(b"*.xml"), {b"merge": b"custom"})]
  210. gitattributes = GitAttributes(patterns)
  211. result, has_conflicts = merge_blobs(
  212. base, ours, theirs, b"file.xml", gitattributes
  213. )
  214. # Check that our test driver was called
  215. self.assertTrue(test_driver.called)
  216. self.assertEqual(test_driver.last_args["ancestor"], b"base content")
  217. self.assertEqual(test_driver.last_args["ours"], b"our content")
  218. self.assertEqual(test_driver.last_args["theirs"], b"their content")
  219. self.assertEqual(test_driver.last_args["path"], "file.xml")
  220. # Check result
  221. self.assertIn(b"TEST MERGE OUTPUT", result)
  222. self.assertFalse(
  223. has_conflicts
  224. ) # Our test driver returns success=True when all differ, so had_conflicts=False
  225. def test_merge_blobs_with_process_driver(self):
  226. """Test merge_blobs with process-based merge driver."""
  227. # Set up config with merge driver
  228. config = ConfigDict()
  229. config.set((b"merge", b"union"), b"driver", b"echo process merge worked > %A")
  230. base = Blob.from_string(b"base")
  231. ours = Blob.from_string(b"ours")
  232. theirs = Blob.from_string(b"theirs")
  233. # Set up gitattributes
  234. patterns = [(Pattern(b"*.list"), {b"merge": b"union"})]
  235. gitattributes = GitAttributes(patterns)
  236. result, has_conflicts = merge_blobs(
  237. base, ours, theirs, b"file.list", gitattributes, config
  238. )
  239. # Check that the process driver was executed
  240. # Expect different line endings on Windows vs Unix
  241. if sys.platform == "win32":
  242. expected = b"process merge worked \r\n"
  243. else:
  244. expected = b"process merge worked\n"
  245. self.assertEqual(result, expected)
  246. self.assertFalse(has_conflicts) # echo returns 0
  247. def test_merge_blobs_driver_not_found(self):
  248. """Test merge_blobs when specified driver is not found."""
  249. base = Blob.from_string(b"base")
  250. ours = Blob.from_string(b"ours")
  251. theirs = Blob.from_string(b"theirs")
  252. # Set up gitattributes with non-existent driver
  253. patterns = [(Pattern(b"*.dat"), {b"merge": b"nonexistent"})]
  254. gitattributes = GitAttributes(patterns)
  255. result, has_conflicts = merge_blobs(
  256. base, ours, theirs, b"file.dat", gitattributes
  257. )
  258. # Should fall back to default merge
  259. self.assertTrue(has_conflicts)
  260. self.assertIn(b"<<<<<<< ours", result)
  261. class GlobalRegistryTests(unittest.TestCase):
  262. """Tests for global merge driver registry."""
  263. def setUp(self):
  264. """Reset global registry before each test."""
  265. global _merge_driver_registry
  266. from dulwich import merge_drivers
  267. merge_drivers._merge_driver_registry = None
  268. def test_get_merge_driver_registry_singleton(self):
  269. """Test that get_merge_driver_registry returns singleton."""
  270. registry1 = get_merge_driver_registry()
  271. registry2 = get_merge_driver_registry()
  272. self.assertIs(registry1, registry2)
  273. def test_get_merge_driver_registry_with_config(self):
  274. """Test updating config on existing registry."""
  275. # Get registry without config
  276. registry = get_merge_driver_registry()
  277. self.assertIsNone(registry._config)
  278. # Get with config
  279. config = ConfigDict()
  280. registry2 = get_merge_driver_registry(config)
  281. self.assertIs(registry2, registry)
  282. self.assertIsNotNone(registry._config)
  283. if __name__ == "__main__":
  284. unittest.main()