123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- # test_merge_drivers.py -- Tests for merge driver support
- # Copyright (C) 2025 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Tests for merge driver support."""
- import sys
- import unittest
- from typing import Optional
- from dulwich.attrs import GitAttributes, Pattern
- from dulwich.config import ConfigDict
- from dulwich.merge import merge_blobs
- from dulwich.merge_drivers import (
- MergeDriverRegistry,
- ProcessMergeDriver,
- get_merge_driver_registry,
- )
- from dulwich.objects import Blob
class _TestMergeDriver:
    """In-memory merge driver stub that records how it was invoked."""

    def __init__(self, name: str = "test"):
        self.name = name
        # Bookkeeping that the tests inspect after a merge has been run.
        self.called = False
        self.last_args = None

    def merge(
        self,
        ancestor: bytes,
        ours: bytes,
        theirs: bytes,
        path: Optional[str] = None,
        marker_size: int = 7,
    ) -> tuple[bytes, bool]:
        """Record the call and produce a synthetic merge result.

        Args:
            ancestor: Content of the common ancestor version.
            ours: Content of our version.
            theirs: Content of their version.
            path: Optional path of the file being merged.
            marker_size: Conflict marker size passed by the caller.

        Returns:
            A ``(content, success)`` tuple. ``content`` is a fixed banner
            followed by one labelled line per input version. ``success`` is
            True only when both ``ours`` and ``theirs`` differ from
            ``ancestor`` (``ours`` and ``theirs`` are not compared with
            each other).
        """
        self.called = True
        self.last_args = dict(
            ancestor=ancestor,
            ours=ours,
            theirs=theirs,
            path=path,
            marker_size=marker_size,
        )

        # Banner first, then the three versions, each on its own line.
        sections = (
            b"TEST MERGE OUTPUT\n",
            b"Ancestor: " + ancestor + b"\n",
            b"Ours: " + ours + b"\n",
            b"Theirs: " + theirs + b"\n",
        )
        merged = b"".join(sections)

        both_sides_changed = not (ancestor == ours or ancestor == theirs)
        return merged, both_sides_changed
class MergeDriverRegistryTests(unittest.TestCase):
    """Exercise registration and lookup on MergeDriverRegistry."""

    def test_register_driver(self):
        """A driver registered by name is handed back as the same object."""
        reg = MergeDriverRegistry()
        stub = _TestMergeDriver("mydriver")
        reg.register_driver("mydriver", stub)

        self.assertIs(reg.get_driver("mydriver"), stub)

    def test_register_factory(self):
        """A registered factory is used to build the driver on lookup."""
        reg = MergeDriverRegistry()
        reg.register_factory("factory", lambda: _TestMergeDriver("factory_driver"))

        first = reg.get_driver("factory")
        self.assertIsInstance(first, _TestMergeDriver)
        self.assertEqual(first.name, "factory_driver")

        # A repeated lookup must yield the very same instance.
        self.assertIs(reg.get_driver("factory"), first)

    def test_get_nonexistent_driver(self):
        """Looking up an unknown driver name yields None."""
        reg = MergeDriverRegistry()
        self.assertIsNone(reg.get_driver("nonexistent"))

    def test_create_from_config(self):
        """A ``merge.<name>.driver`` config entry yields a ProcessMergeDriver."""
        cfg = ConfigDict()
        cfg.set((b"merge", b"xmlmerge"), b"driver", b"xmlmerge %O %A %B")
        reg = MergeDriverRegistry(cfg)

        configured = reg.get_driver("xmlmerge")
        self.assertIsInstance(configured, ProcessMergeDriver)
        self.assertEqual(configured.name, "xmlmerge")
        self.assertEqual(configured.command, "xmlmerge %O %A %B")
class ProcessMergeDriverTests(unittest.TestCase):
    """Tests for ProcessMergeDriver.

    These tests run real shell commands through the driver, so expected
    output is platform dependent and some tests need external tools.
    """

    @staticmethod
    def _echo_output(text: bytes) -> bytes:
        """Return the bytes ``echo <text> > file`` is expected to write.

        On win32 the expected output carries a trailing space and a CRLF
        line ending; elsewhere it is the text followed by a single LF.
        """
        if sys.platform == "win32":
            return text + b" \r\n"
        return text + b"\n"

    def test_merge_with_echo(self):
        """Test merge driver using echo command."""
        # Use a simple echo command that writes to the output file
        command = "echo merged > %A"
        driver = ProcessMergeDriver(command, "echo_driver")

        ancestor = b"ancestor content"
        ours = b"our content"
        theirs = b"their content"

        result, success = driver.merge(ancestor, ours, theirs, "test.txt", 7)

        self.assertEqual(result, self._echo_output(b"merged"))
        self.assertTrue(success)  # echo returns 0

    def test_merge_with_cat(self):
        """Test merge driver using cat command."""
        import shutil

        # ``cat`` is an external tool that is not guaranteed to exist
        # (notably on Windows); skip instead of failing for an
        # environmental reason.
        if shutil.which("cat") is None:
            self.skipTest("cat executable not available on PATH")

        # Append the ancestor and their version to our version's file
        command = "cat %O %B >> %A"
        driver = ProcessMergeDriver(command, "cat_driver")

        ancestor = b"ancestor\n"
        ours = b"ours\n"
        theirs = b"theirs\n"

        result, success = driver.merge(ancestor, ours, theirs)

        self.assertEqual(result, b"ours\nancestor\ntheirs\n")
        self.assertTrue(success)

    def test_merge_with_failure(self):
        """Test merge driver that fails."""
        # Use false command which always returns 1
        command = "false"
        driver = ProcessMergeDriver(command, "fail_driver")

        result, success = driver.merge(b"a", b"b", b"c")

        # Should return original content on failure
        self.assertEqual(result, b"b")
        self.assertFalse(success)

    def test_merge_with_markers(self):
        """Test merge driver with conflict marker size."""
        # Echo the marker size
        command = "echo marker size: %L > %A"
        driver = ProcessMergeDriver(command, "marker_driver")

        result, success = driver.merge(b"a", b"b", b"c", marker_size=15)

        self.assertEqual(result, self._echo_output(b"marker size: 15"))
        self.assertTrue(success)

    def test_merge_with_path(self):
        """Test merge driver with file path."""
        # Echo the path
        command = "echo path: %P > %A"
        driver = ProcessMergeDriver(command, "path_driver")

        result, success = driver.merge(b"a", b"b", b"c", path="dir/file.xml")

        self.assertEqual(result, self._echo_output(b"path: dir/file.xml"))
        self.assertTrue(success)
class MergeBlobsWithDriversTests(unittest.TestCase):
    """Tests for merge_blobs with merge drivers."""

    def setUp(self):
        """Start each test with a fresh global merge driver registry.

        The registry that was installed before the test is put back
        afterwards, so drivers and config registered here do not leak into
        tests that run later in the same process.
        """
        from dulwich import merge_drivers

        self.addCleanup(
            setattr,
            merge_drivers,
            "_merge_driver_registry",
            merge_drivers._merge_driver_registry,
        )
        merge_drivers._merge_driver_registry = None

    def test_merge_blobs_without_driver(self):
        """Test merge_blobs without any merge driver."""
        base = Blob.from_string(b"base\ncontent\n")
        ours = Blob.from_string(b"base\nour change\n")
        theirs = Blob.from_string(b"base\ntheir change\n")

        result, has_conflicts = merge_blobs(base, ours, theirs)

        # Should use default merge and have conflicts
        self.assertTrue(has_conflicts)
        self.assertIn(b"<<<<<<< ours", result)
        self.assertIn(b">>>>>>> theirs", result)

    def test_merge_blobs_with_text_driver(self):
        """Test merge_blobs with 'text' merge driver (default)."""
        base = Blob.from_string(b"base\ncontent\n")
        ours = Blob.from_string(b"base\nour change\n")
        theirs = Blob.from_string(b"base\ntheir change\n")

        # Set up gitattributes
        patterns = [(Pattern(b"*.txt"), {b"merge": b"text"})]
        gitattributes = GitAttributes(patterns)

        result, has_conflicts = merge_blobs(
            base, ours, theirs, b"file.txt", gitattributes
        )

        # Should use default merge (text is the default)
        self.assertTrue(has_conflicts)
        self.assertIn(b"<<<<<<< ours", result)

    def test_merge_blobs_with_custom_driver(self):
        """Test merge_blobs with custom merge driver."""
        # Register a test driver on the (freshly reset) global registry
        registry = get_merge_driver_registry()
        test_driver = _TestMergeDriver("custom")
        registry.register_driver("custom", test_driver)

        base = Blob.from_string(b"base content")
        ours = Blob.from_string(b"our content")
        theirs = Blob.from_string(b"their content")

        # Set up gitattributes
        patterns = [(Pattern(b"*.xml"), {b"merge": b"custom"})]
        gitattributes = GitAttributes(patterns)

        result, has_conflicts = merge_blobs(
            base, ours, theirs, b"file.xml", gitattributes
        )

        # Check that our test driver was called
        self.assertTrue(test_driver.called)
        self.assertEqual(test_driver.last_args["ancestor"], b"base content")
        self.assertEqual(test_driver.last_args["ours"], b"our content")
        self.assertEqual(test_driver.last_args["theirs"], b"their content")
        self.assertEqual(test_driver.last_args["path"], "file.xml")

        # Check result
        self.assertIn(b"TEST MERGE OUTPUT", result)
        # The test driver reports success when both sides differ from the
        # ancestor, so no conflicts are expected here.
        self.assertFalse(has_conflicts)

    def test_merge_blobs_with_process_driver(self):
        """Test merge_blobs with process-based merge driver."""
        # Set up config with merge driver
        config = ConfigDict()
        config.set((b"merge", b"union"), b"driver", b"echo process merge worked > %A")

        base = Blob.from_string(b"base")
        ours = Blob.from_string(b"ours")
        theirs = Blob.from_string(b"theirs")

        # Set up gitattributes
        patterns = [(Pattern(b"*.list"), {b"merge": b"union"})]
        gitattributes = GitAttributes(patterns)

        result, has_conflicts = merge_blobs(
            base, ours, theirs, b"file.list", gitattributes, config
        )

        # Check that the process driver was executed
        # Expect different line endings on Windows vs Unix
        if sys.platform == "win32":
            expected = b"process merge worked \r\n"
        else:
            expected = b"process merge worked\n"
        self.assertEqual(result, expected)
        self.assertFalse(has_conflicts)  # echo returns 0

    def test_merge_blobs_driver_not_found(self):
        """Test merge_blobs when specified driver is not found."""
        base = Blob.from_string(b"base")
        ours = Blob.from_string(b"ours")
        theirs = Blob.from_string(b"theirs")

        # Set up gitattributes with non-existent driver
        patterns = [(Pattern(b"*.dat"), {b"merge": b"nonexistent"})]
        gitattributes = GitAttributes(patterns)

        result, has_conflicts = merge_blobs(
            base, ours, theirs, b"file.dat", gitattributes
        )

        # Should fall back to default merge
        self.assertTrue(has_conflicts)
        self.assertIn(b"<<<<<<< ours", result)
class GlobalRegistryTests(unittest.TestCase):
    """Tests for global merge driver registry."""

    def setUp(self):
        """Reset the global registry before each test.

        The previously installed registry is restored once the test has
        finished, so the reset does not affect tests that run afterwards.
        """
        from dulwich import merge_drivers

        self.addCleanup(
            setattr,
            merge_drivers,
            "_merge_driver_registry",
            merge_drivers._merge_driver_registry,
        )
        merge_drivers._merge_driver_registry = None

    def test_get_merge_driver_registry_singleton(self):
        """Test that get_merge_driver_registry returns singleton."""
        registry1 = get_merge_driver_registry()
        registry2 = get_merge_driver_registry()
        self.assertIs(registry1, registry2)

    def test_get_merge_driver_registry_with_config(self):
        """Test updating config on existing registry."""
        # Get registry without config
        registry = get_merge_driver_registry()
        self.assertIsNone(registry._config)

        # Get with config: the same registry object must be returned, now
        # carrying a config
        config = ConfigDict()
        registry2 = get_merge_driver_registry(config)
        self.assertIs(registry2, registry)
        self.assertIsNotNone(registry._config)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|