test_merge_drivers.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. # test_merge_drivers.py -- Tests for merge driver support
  2. # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Tests for merge driver support."""
  21. import importlib.util
  22. import sys
  23. import unittest
  24. from typing import Optional
  25. from dulwich.attrs import GitAttributes, Pattern
  26. from dulwich.config import ConfigDict
  27. from dulwich.merge import merge_blobs
  28. from dulwich.merge_drivers import (
  29. MergeDriverRegistry,
  30. ProcessMergeDriver,
  31. get_merge_driver_registry,
  32. )
  33. from dulwich.objects import Blob
  34. from . import DependencyMissing
  35. class _TestMergeDriver:
  36. """Test merge driver implementation."""
  37. def __init__(self, name: str = "test"):
  38. self.name = name
  39. self.called = False
  40. self.last_args = None
  41. def merge(
  42. self,
  43. ancestor: bytes,
  44. ours: bytes,
  45. theirs: bytes,
  46. path: Optional[str] = None,
  47. marker_size: int = 7,
  48. ) -> tuple[bytes, bool]:
  49. """Test merge implementation."""
  50. self.called = True
  51. self.last_args = {
  52. "ancestor": ancestor,
  53. "ours": ours,
  54. "theirs": theirs,
  55. "path": path,
  56. "marker_size": marker_size,
  57. }
  58. # Simple test merge: combine all three versions
  59. result = b"TEST MERGE OUTPUT\n"
  60. result += b"Ancestor: " + ancestor + b"\n"
  61. result += b"Ours: " + ours + b"\n"
  62. result += b"Theirs: " + theirs + b"\n"
  63. # Return success if all three are different
  64. success = ancestor != ours and ancestor != theirs
  65. return result, success
  66. class MergeDriverRegistryTests(unittest.TestCase):
  67. """Tests for MergeDriverRegistry."""
  68. def test_register_driver(self):
  69. """Test registering a merge driver."""
  70. registry = MergeDriverRegistry()
  71. driver = _TestMergeDriver("mydriver")
  72. registry.register_driver("mydriver", driver)
  73. retrieved = registry.get_driver("mydriver")
  74. self.assertIs(retrieved, driver)
  75. def test_register_factory(self):
  76. """Test registering a merge driver factory."""
  77. registry = MergeDriverRegistry()
  78. def create_driver():
  79. return _TestMergeDriver("factory_driver")
  80. registry.register_factory("factory", create_driver)
  81. driver = registry.get_driver("factory")
  82. self.assertIsInstance(driver, _TestMergeDriver)
  83. self.assertEqual(driver.name, "factory_driver")
  84. # Second call should return the same instance
  85. driver2 = registry.get_driver("factory")
  86. self.assertIs(driver2, driver)
  87. def test_get_nonexistent_driver(self):
  88. """Test getting a non-existent driver returns None."""
  89. registry = MergeDriverRegistry()
  90. driver = registry.get_driver("nonexistent")
  91. self.assertIsNone(driver)
  92. def test_create_from_config(self):
  93. """Test creating a merge driver from configuration."""
  94. config = ConfigDict()
  95. config.set((b"merge", b"xmlmerge"), b"driver", b"xmlmerge %O %A %B")
  96. registry = MergeDriverRegistry(config)
  97. driver = registry.get_driver("xmlmerge")
  98. self.assertIsInstance(driver, ProcessMergeDriver)
  99. self.assertEqual(driver.name, "xmlmerge")
  100. self.assertEqual(driver.command, "xmlmerge %O %A %B")
  101. class ProcessMergeDriverTests(unittest.TestCase):
  102. """Tests for ProcessMergeDriver."""
  103. def test_merge_with_echo(self):
  104. """Test merge driver using echo command."""
  105. # Use a simple echo command that writes to the output file
  106. command = "echo merged > %A"
  107. driver = ProcessMergeDriver(command, "echo_driver")
  108. ancestor = b"ancestor content"
  109. ours = b"our content"
  110. theirs = b"their content"
  111. result, success = driver.merge(ancestor, ours, theirs, "test.txt", 7)
  112. # Expect different line endings on Windows vs Unix
  113. if sys.platform == "win32":
  114. expected = b"merged \r\n"
  115. else:
  116. expected = b"merged\n"
  117. self.assertEqual(result, expected)
  118. self.assertTrue(success) # echo returns 0
  119. def test_merge_with_cat(self):
  120. """Test merge driver using cat command."""
  121. # Cat all three files together
  122. command = "cat %O %B >> %A"
  123. driver = ProcessMergeDriver(command, "cat_driver")
  124. ancestor = b"ancestor\n"
  125. ours = b"ours\n"
  126. theirs = b"theirs\n"
  127. result, success = driver.merge(ancestor, ours, theirs)
  128. self.assertEqual(result, b"ours\nancestor\ntheirs\n")
  129. self.assertTrue(success)
  130. def test_merge_with_failure(self):
  131. """Test merge driver that fails."""
  132. # Use false command which always returns 1
  133. command = "false"
  134. driver = ProcessMergeDriver(command, "fail_driver")
  135. result, success = driver.merge(b"a", b"b", b"c")
  136. # Should return original content on failure
  137. self.assertEqual(result, b"b")
  138. self.assertFalse(success)
  139. def test_merge_with_markers(self):
  140. """Test merge driver with conflict marker size."""
  141. # Echo the marker size
  142. command = "echo marker size: %L > %A"
  143. driver = ProcessMergeDriver(command, "marker_driver")
  144. result, success = driver.merge(b"a", b"b", b"c", marker_size=15)
  145. # Expect different line endings on Windows vs Unix
  146. if sys.platform == "win32":
  147. expected = b"marker size: 15 \r\n"
  148. else:
  149. expected = b"marker size: 15\n"
  150. self.assertEqual(result, expected)
  151. self.assertTrue(success)
  152. def test_merge_with_path(self):
  153. """Test merge driver with file path."""
  154. # Echo the path
  155. command = "echo path: %P > %A"
  156. driver = ProcessMergeDriver(command, "path_driver")
  157. result, success = driver.merge(b"a", b"b", b"c", path="dir/file.xml")
  158. # Expect different line endings on Windows vs Unix
  159. if sys.platform == "win32":
  160. expected = b"path: dir/file.xml \r\n"
  161. else:
  162. expected = b"path: dir/file.xml\n"
  163. self.assertEqual(result, expected)
  164. self.assertTrue(success)
  165. class MergeBlobsWithDriversTests(unittest.TestCase):
  166. """Tests for merge_blobs with merge drivers."""
  167. def setUp(self):
  168. """Set up test fixtures."""
  169. # Check if merge3 module is available
  170. if importlib.util.find_spec("merge3") is None:
  171. raise DependencyMissing("merge3")
  172. # Reset global registry
  173. global _merge_driver_registry
  174. from dulwich import merge_drivers
  175. merge_drivers._merge_driver_registry = None
  176. def test_merge_blobs_without_driver(self):
  177. """Test merge_blobs without any merge driver."""
  178. base = Blob.from_string(b"base\ncontent\n")
  179. ours = Blob.from_string(b"base\nour change\n")
  180. theirs = Blob.from_string(b"base\ntheir change\n")
  181. result, has_conflicts = merge_blobs(base, ours, theirs)
  182. # Should use default merge and have conflicts
  183. self.assertTrue(has_conflicts)
  184. self.assertIn(b"<<<<<<< ours", result)
  185. self.assertIn(b">>>>>>> theirs", result)
  186. def test_merge_blobs_with_text_driver(self):
  187. """Test merge_blobs with 'text' merge driver (default)."""
  188. base = Blob.from_string(b"base\ncontent\n")
  189. ours = Blob.from_string(b"base\nour change\n")
  190. theirs = Blob.from_string(b"base\ntheir change\n")
  191. # Set up gitattributes
  192. patterns = [(Pattern(b"*.txt"), {b"merge": b"text"})]
  193. gitattributes = GitAttributes(patterns)
  194. result, has_conflicts = merge_blobs(
  195. base, ours, theirs, b"file.txt", gitattributes
  196. )
  197. # Should use default merge (text is the default)
  198. self.assertTrue(has_conflicts)
  199. self.assertIn(b"<<<<<<< ours", result)
  200. def test_merge_blobs_with_custom_driver(self):
  201. """Test merge_blobs with custom merge driver."""
  202. # Register a test driver
  203. registry = get_merge_driver_registry()
  204. test_driver = _TestMergeDriver("custom")
  205. registry.register_driver("custom", test_driver)
  206. base = Blob.from_string(b"base content")
  207. ours = Blob.from_string(b"our content")
  208. theirs = Blob.from_string(b"their content")
  209. # Set up gitattributes
  210. patterns = [(Pattern(b"*.xml"), {b"merge": b"custom"})]
  211. gitattributes = GitAttributes(patterns)
  212. result, has_conflicts = merge_blobs(
  213. base, ours, theirs, b"file.xml", gitattributes
  214. )
  215. # Check that our test driver was called
  216. self.assertTrue(test_driver.called)
  217. self.assertEqual(test_driver.last_args["ancestor"], b"base content")
  218. self.assertEqual(test_driver.last_args["ours"], b"our content")
  219. self.assertEqual(test_driver.last_args["theirs"], b"their content")
  220. self.assertEqual(test_driver.last_args["path"], "file.xml")
  221. # Check result
  222. self.assertIn(b"TEST MERGE OUTPUT", result)
  223. self.assertFalse(
  224. has_conflicts
  225. ) # Our test driver returns success=True when all differ, so had_conflicts=False
  226. def test_merge_blobs_with_process_driver(self):
  227. """Test merge_blobs with process-based merge driver."""
  228. # Set up config with merge driver
  229. config = ConfigDict()
  230. config.set((b"merge", b"union"), b"driver", b"echo process merge worked > %A")
  231. base = Blob.from_string(b"base")
  232. ours = Blob.from_string(b"ours")
  233. theirs = Blob.from_string(b"theirs")
  234. # Set up gitattributes
  235. patterns = [(Pattern(b"*.list"), {b"merge": b"union"})]
  236. gitattributes = GitAttributes(patterns)
  237. result, has_conflicts = merge_blobs(
  238. base, ours, theirs, b"file.list", gitattributes, config
  239. )
  240. # Check that the process driver was executed
  241. # Expect different line endings on Windows vs Unix
  242. if sys.platform == "win32":
  243. expected = b"process merge worked \r\n"
  244. else:
  245. expected = b"process merge worked\n"
  246. self.assertEqual(result, expected)
  247. self.assertFalse(has_conflicts) # echo returns 0
  248. def test_merge_blobs_driver_not_found(self):
  249. """Test merge_blobs when specified driver is not found."""
  250. base = Blob.from_string(b"base")
  251. ours = Blob.from_string(b"ours")
  252. theirs = Blob.from_string(b"theirs")
  253. # Set up gitattributes with non-existent driver
  254. patterns = [(Pattern(b"*.dat"), {b"merge": b"nonexistent"})]
  255. gitattributes = GitAttributes(patterns)
  256. result, has_conflicts = merge_blobs(
  257. base, ours, theirs, b"file.dat", gitattributes
  258. )
  259. # Should fall back to default merge
  260. self.assertTrue(has_conflicts)
  261. self.assertIn(b"<<<<<<< ours", result)
  262. class GlobalRegistryTests(unittest.TestCase):
  263. """Tests for global merge driver registry."""
  264. def setUp(self):
  265. """Reset global registry before each test."""
  266. global _merge_driver_registry
  267. from dulwich import merge_drivers
  268. merge_drivers._merge_driver_registry = None
  269. def test_get_merge_driver_registry_singleton(self):
  270. """Test that get_merge_driver_registry returns singleton."""
  271. registry1 = get_merge_driver_registry()
  272. registry2 = get_merge_driver_registry()
  273. self.assertIs(registry1, registry2)
  274. def test_get_merge_driver_registry_with_config(self):
  275. """Test updating config on existing registry."""
  276. # Get registry without config
  277. registry = get_merge_driver_registry()
  278. self.assertIsNone(registry._config)
  279. # Get with config
  280. config = ConfigDict()
  281. registry2 = get_merge_driver_registry(config)
  282. self.assertIs(registry2, registry)
  283. self.assertIsNotNone(registry._config)
  284. if __name__ == "__main__":
  285. unittest.main()