test_merge_drivers.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
# test_merge_drivers.py -- Tests for merge driver support
# Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
#
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
  20. """Tests for merge driver support."""
  21. import importlib.util
  22. import sys
  23. import unittest
  24. from dulwich.attrs import GitAttributes, Pattern
  25. from dulwich.config import ConfigDict
  26. from dulwich.merge import merge_blobs
  27. from dulwich.merge_drivers import (
  28. MergeDriverRegistry,
  29. ProcessMergeDriver,
  30. get_merge_driver_registry,
  31. )
  32. from dulwich.objects import Blob
  33. from . import DependencyMissing
  34. class _TestMergeDriver:
  35. """Test merge driver implementation."""
  36. def __init__(self, name: str = "test"):
  37. self.name = name
  38. self.called = False
  39. self.last_args = None
  40. def merge(
  41. self,
  42. ancestor: bytes,
  43. ours: bytes,
  44. theirs: bytes,
  45. path: str | None = None,
  46. marker_size: int = 7,
  47. ) -> tuple[bytes, bool]:
  48. """Test merge implementation."""
  49. self.called = True
  50. self.last_args = {
  51. "ancestor": ancestor,
  52. "ours": ours,
  53. "theirs": theirs,
  54. "path": path,
  55. "marker_size": marker_size,
  56. }
  57. # Simple test merge: combine all three versions
  58. result = b"TEST MERGE OUTPUT\n"
  59. result += b"Ancestor: " + ancestor + b"\n"
  60. result += b"Ours: " + ours + b"\n"
  61. result += b"Theirs: " + theirs + b"\n"
  62. # Return success if all three are different
  63. success = ancestor != ours and ancestor != theirs
  64. return result, success
  65. class MergeDriverRegistryTests(unittest.TestCase):
  66. """Tests for MergeDriverRegistry."""
  67. def test_register_driver(self):
  68. """Test registering a merge driver."""
  69. registry = MergeDriverRegistry()
  70. driver = _TestMergeDriver("mydriver")
  71. registry.register_driver("mydriver", driver)
  72. retrieved = registry.get_driver("mydriver")
  73. self.assertIs(retrieved, driver)
  74. def test_register_factory(self):
  75. """Test registering a merge driver factory."""
  76. registry = MergeDriverRegistry()
  77. def create_driver():
  78. return _TestMergeDriver("factory_driver")
  79. registry.register_factory("factory", create_driver)
  80. driver = registry.get_driver("factory")
  81. self.assertIsInstance(driver, _TestMergeDriver)
  82. self.assertEqual(driver.name, "factory_driver")
  83. # Second call should return the same instance
  84. driver2 = registry.get_driver("factory")
  85. self.assertIs(driver2, driver)
  86. def test_get_nonexistent_driver(self):
  87. """Test getting a non-existent driver returns None."""
  88. registry = MergeDriverRegistry()
  89. driver = registry.get_driver("nonexistent")
  90. self.assertIsNone(driver)
  91. def test_create_from_config(self):
  92. """Test creating a merge driver from configuration."""
  93. config = ConfigDict()
  94. config.set((b"merge", b"xmlmerge"), b"driver", b"xmlmerge %O %A %B")
  95. registry = MergeDriverRegistry(config)
  96. driver = registry.get_driver("xmlmerge")
  97. self.assertIsInstance(driver, ProcessMergeDriver)
  98. self.assertEqual(driver.name, "xmlmerge")
  99. self.assertEqual(driver.command, "xmlmerge %O %A %B")
  100. class ProcessMergeDriverTests(unittest.TestCase):
  101. """Tests for ProcessMergeDriver."""
  102. def test_merge_with_echo(self):
  103. """Test merge driver using echo command."""
  104. # Use a simple echo command that writes to the output file
  105. command = "echo merged > %A"
  106. driver = ProcessMergeDriver(command, "echo_driver")
  107. ancestor = b"ancestor content"
  108. ours = b"our content"
  109. theirs = b"their content"
  110. result, success = driver.merge(ancestor, ours, theirs, "test.txt", 7)
  111. # Expect different line endings on Windows vs Unix
  112. if sys.platform == "win32":
  113. expected = b"merged \r\n"
  114. else:
  115. expected = b"merged\n"
  116. self.assertEqual(result, expected)
  117. self.assertTrue(success) # echo returns 0
  118. def test_merge_with_cat(self):
  119. """Test merge driver using cat command."""
  120. # Cat all three files together
  121. command = "cat %O %B >> %A"
  122. driver = ProcessMergeDriver(command, "cat_driver")
  123. ancestor = b"ancestor\n"
  124. ours = b"ours\n"
  125. theirs = b"theirs\n"
  126. result, success = driver.merge(ancestor, ours, theirs)
  127. self.assertEqual(result, b"ours\nancestor\ntheirs\n")
  128. self.assertTrue(success)
  129. def test_merge_with_failure(self):
  130. """Test merge driver that fails."""
  131. # Use false command which always returns 1
  132. command = "false"
  133. driver = ProcessMergeDriver(command, "fail_driver")
  134. result, success = driver.merge(b"a", b"b", b"c")
  135. # Should return original content on failure
  136. self.assertEqual(result, b"b")
  137. self.assertFalse(success)
  138. def test_merge_with_markers(self):
  139. """Test merge driver with conflict marker size."""
  140. # Echo the marker size
  141. command = "echo marker size: %L > %A"
  142. driver = ProcessMergeDriver(command, "marker_driver")
  143. result, success = driver.merge(b"a", b"b", b"c", marker_size=15)
  144. # Expect different line endings on Windows vs Unix
  145. if sys.platform == "win32":
  146. expected = b"marker size: 15 \r\n"
  147. else:
  148. expected = b"marker size: 15\n"
  149. self.assertEqual(result, expected)
  150. self.assertTrue(success)
  151. def test_merge_with_path(self):
  152. """Test merge driver with file path."""
  153. # Echo the path
  154. command = "echo path: %P > %A"
  155. driver = ProcessMergeDriver(command, "path_driver")
  156. result, success = driver.merge(b"a", b"b", b"c", path="dir/file.xml")
  157. # Expect different line endings on Windows vs Unix
  158. if sys.platform == "win32":
  159. expected = b"path: dir/file.xml \r\n"
  160. else:
  161. expected = b"path: dir/file.xml\n"
  162. self.assertEqual(result, expected)
  163. self.assertTrue(success)
  164. class MergeBlobsWithDriversTests(unittest.TestCase):
  165. """Tests for merge_blobs with merge drivers."""
  166. def setUp(self):
  167. """Set up test fixtures."""
  168. # Check if merge3 module is available
  169. if importlib.util.find_spec("merge3") is None:
  170. raise DependencyMissing("merge3")
  171. # Reset global registry
  172. global _merge_driver_registry
  173. from dulwich import merge_drivers
  174. merge_drivers._merge_driver_registry = None
  175. def test_merge_blobs_without_driver(self):
  176. """Test merge_blobs without any merge driver."""
  177. base = Blob.from_string(b"base\ncontent\n")
  178. ours = Blob.from_string(b"base\nour change\n")
  179. theirs = Blob.from_string(b"base\ntheir change\n")
  180. result, has_conflicts = merge_blobs(base, ours, theirs)
  181. # Should use default merge and have conflicts
  182. self.assertTrue(has_conflicts)
  183. self.assertIn(b"<<<<<<< ours", result)
  184. self.assertIn(b">>>>>>> theirs", result)
  185. def test_merge_blobs_with_text_driver(self):
  186. """Test merge_blobs with 'text' merge driver (default)."""
  187. base = Blob.from_string(b"base\ncontent\n")
  188. ours = Blob.from_string(b"base\nour change\n")
  189. theirs = Blob.from_string(b"base\ntheir change\n")
  190. # Set up gitattributes
  191. patterns = [(Pattern(b"*.txt"), {b"merge": b"text"})]
  192. gitattributes = GitAttributes(patterns)
  193. result, has_conflicts = merge_blobs(
  194. base, ours, theirs, b"file.txt", gitattributes
  195. )
  196. # Should use default merge (text is the default)
  197. self.assertTrue(has_conflicts)
  198. self.assertIn(b"<<<<<<< ours", result)
  199. def test_merge_blobs_with_custom_driver(self):
  200. """Test merge_blobs with custom merge driver."""
  201. # Register a test driver
  202. registry = get_merge_driver_registry()
  203. test_driver = _TestMergeDriver("custom")
  204. registry.register_driver("custom", test_driver)
  205. base = Blob.from_string(b"base content")
  206. ours = Blob.from_string(b"our content")
  207. theirs = Blob.from_string(b"their content")
  208. # Set up gitattributes
  209. patterns = [(Pattern(b"*.xml"), {b"merge": b"custom"})]
  210. gitattributes = GitAttributes(patterns)
  211. result, has_conflicts = merge_blobs(
  212. base, ours, theirs, b"file.xml", gitattributes
  213. )
  214. # Check that our test driver was called
  215. self.assertTrue(test_driver.called)
  216. self.assertEqual(test_driver.last_args["ancestor"], b"base content")
  217. self.assertEqual(test_driver.last_args["ours"], b"our content")
  218. self.assertEqual(test_driver.last_args["theirs"], b"their content")
  219. self.assertEqual(test_driver.last_args["path"], "file.xml")
  220. # Check result
  221. self.assertIn(b"TEST MERGE OUTPUT", result)
  222. self.assertFalse(
  223. has_conflicts
  224. ) # Our test driver returns success=True when all differ, so had_conflicts=False
  225. def test_merge_blobs_with_process_driver(self):
  226. """Test merge_blobs with process-based merge driver."""
  227. # Set up config with merge driver
  228. config = ConfigDict()
  229. config.set((b"merge", b"union"), b"driver", b"echo process merge worked > %A")
  230. base = Blob.from_string(b"base")
  231. ours = Blob.from_string(b"ours")
  232. theirs = Blob.from_string(b"theirs")
  233. # Set up gitattributes
  234. patterns = [(Pattern(b"*.list"), {b"merge": b"union"})]
  235. gitattributes = GitAttributes(patterns)
  236. result, has_conflicts = merge_blobs(
  237. base, ours, theirs, b"file.list", gitattributes, config
  238. )
  239. # Check that the process driver was executed
  240. # Expect different line endings on Windows vs Unix
  241. if sys.platform == "win32":
  242. expected = b"process merge worked \r\n"
  243. else:
  244. expected = b"process merge worked\n"
  245. self.assertEqual(result, expected)
  246. self.assertFalse(has_conflicts) # echo returns 0
  247. def test_merge_blobs_driver_not_found(self):
  248. """Test merge_blobs when specified driver is not found."""
  249. base = Blob.from_string(b"base")
  250. ours = Blob.from_string(b"ours")
  251. theirs = Blob.from_string(b"theirs")
  252. # Set up gitattributes with non-existent driver
  253. patterns = [(Pattern(b"*.dat"), {b"merge": b"nonexistent"})]
  254. gitattributes = GitAttributes(patterns)
  255. result, has_conflicts = merge_blobs(
  256. base, ours, theirs, b"file.dat", gitattributes
  257. )
  258. # Should fall back to default merge
  259. self.assertTrue(has_conflicts)
  260. self.assertIn(b"<<<<<<< ours", result)
  261. class GlobalRegistryTests(unittest.TestCase):
  262. """Tests for global merge driver registry."""
  263. def setUp(self):
  264. """Reset global registry before each test."""
  265. global _merge_driver_registry
  266. from dulwich import merge_drivers
  267. merge_drivers._merge_driver_registry = None
  268. def test_get_merge_driver_registry_singleton(self):
  269. """Test that get_merge_driver_registry returns singleton."""
  270. registry1 = get_merge_driver_registry()
  271. registry2 = get_merge_driver_registry()
  272. self.assertIs(registry1, registry2)
  273. def test_get_merge_driver_registry_with_config(self):
  274. """Test updating config on existing registry."""
  275. # Get registry without config
  276. registry = get_merge_driver_registry()
  277. self.assertIsNone(registry._config)
  278. # Get with config
  279. config = ConfigDict()
  280. registry2 = get_merge_driver_registry(config)
  281. self.assertIs(registry2, registry)
  282. self.assertIsNotNone(registry._config)
  283. if __name__ == "__main__":
  284. unittest.main()