Browse Source

Add support for Git filter drivers (clean/smudge)

This implements the infrastructure for Git filter drivers, which enable
custom transformations when files move between the working tree and the
repository. Key components:

- FilterDriver protocol defining the clean/smudge interface
- ProcessFilterDriver for executing external filter commands
- FilterRegistry for managing and creating filter drivers
- FilterBlobNormalizer for applying filters during checkin/checkout
- Support for filter configuration via gitconfig
- Basic gitattributes pattern matching for filter assignment

This provides the foundation for implementing features like Git LFS,
keyword expansion, and custom file transformations.
Jelmer Vernooij 1 month ago
parent
commit
c715d007ec
2 changed files with 501 additions and 0 deletions
  1. 244 0
      dulwich/filters.py
  2. 257 0
      tests/test_filters.py

+ 244 - 0
dulwich/filters.py

@@ -0,0 +1,244 @@
+# filters.py -- Git filter drivers (clean/smudge) implementation
+# Copyright (C) 2024 Jelmer Vernooij
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Implementation of Git filter drivers (clean/smudge filters)."""
+
+import fnmatch
+import subprocess
+from typing import TYPE_CHECKING, Callable, Optional, Protocol
+
+from .objects import Blob
+
+if TYPE_CHECKING:
+    from .config import StackedConfig
+
+
+class FilterDriver(Protocol):
+    """Protocol for filter drivers."""
+
+    def clean(self, data: bytes) -> bytes:
+        """Apply clean filter (working tree → repository)."""
+        ...
+
+    def smudge(self, data: bytes) -> bytes:
+        """Apply smudge filter (repository → working tree)."""
+        ...
+
+
+class ProcessFilterDriver:
+    """Filter driver that executes external processes."""
+
+    def __init__(
+        self, clean_cmd: Optional[str] = None, smudge_cmd: Optional[str] = None
+    ) -> None:
+        self.clean_cmd = clean_cmd
+        self.smudge_cmd = smudge_cmd
+
+    def clean(self, data: bytes) -> bytes:
+        """Apply clean filter using external process."""
+        if not self.clean_cmd:
+            return data
+
+        result = subprocess.run(
+            self.clean_cmd,
+            shell=True,
+            input=data,
+            capture_output=True,
+            check=True,
+        )
+        return result.stdout
+
+    def smudge(self, data: bytes) -> bytes:
+        """Apply smudge filter using external process."""
+        if not self.smudge_cmd:
+            return data
+
+        result = subprocess.run(
+            self.smudge_cmd,
+            shell=True,
+            input=data,
+            capture_output=True,
+            check=True,
+        )
+        return result.stdout
+
+
+class FilterRegistry:
+    """Registry for filter drivers."""
+
+    def __init__(self, config: Optional["StackedConfig"] = None, repo=None) -> None:
+        self.config = config
+        self.repo = repo
+        self._drivers: dict[str, FilterDriver] = {}
+        self._factories: dict[str, Callable[[FilterRegistry], FilterDriver]] = {}
+
+    def register_factory(
+        self, name: str, factory: Callable[["FilterRegistry"], FilterDriver]
+    ) -> None:
+        """Register a filter driver factory."""
+        self._factories[name] = factory
+
+    def register_driver(self, name: str, driver: FilterDriver) -> None:
+        """Register a filter driver instance."""
+        self._drivers[name] = driver
+
+    def get_driver(self, name: str) -> Optional[FilterDriver]:
+        """Get a filter driver by name."""
+        # Check if we already have an instance
+        if name in self._drivers:
+            return self._drivers[name]
+
+        # Try to create from factory
+        if name in self._factories:
+            driver = self._factories[name](self)
+            self._drivers[name] = driver
+            return driver
+
+        # Try to create from config
+        if self.config is not None:
+            config_driver = self._create_from_config(name)
+            if config_driver is not None:
+                self._drivers[name] = config_driver
+                return config_driver
+
+        return None
+
+    def _create_from_config(self, name: str) -> Optional[FilterDriver]:
+        """Create a filter driver from config."""
+        if self.config is None:
+            return None
+
+        clean_cmd: Optional[str] = None
+        smudge_cmd: Optional[str] = None
+
+        # Get clean command
+        try:
+            clean_value = self.config.get(("filter", name), "clean")
+            if isinstance(clean_value, bytes):
+                clean_cmd = clean_value.decode("utf-8")
+            else:
+                clean_cmd = clean_value
+        except KeyError:
+            pass
+
+        # Get smudge command
+        try:
+            smudge_value = self.config.get(("filter", name), "smudge")
+            if isinstance(smudge_value, bytes):
+                smudge_cmd = smudge_value.decode("utf-8")
+            else:
+                smudge_cmd = smudge_value
+        except KeyError:
+            pass
+
+        if clean_cmd or smudge_cmd:
+            return ProcessFilterDriver(clean_cmd, smudge_cmd)
+
+        return None
+
+
+def get_filter_for_path(
+    path: bytes,
+    gitattributes: dict[bytes, dict[bytes, bytes]],
+    filter_registry: FilterRegistry,
+) -> Optional[FilterDriver]:
+    """Get the appropriate filter driver for a given path.
+
+    Args:
+        path: Path to check
+        gitattributes: Parsed gitattributes (pattern -> attributes mapping)
+        filter_registry: Registry of filter drivers
+
+    Returns:
+        FilterDriver instance or None
+    """
+    # For now, this is a simple implementation that does exact path matching
+    # In a real implementation, we'd need to handle glob patterns
+
+    # Check each pattern in gitattributes
+    for pattern, attrs in gitattributes.items():
+        # Simple implementation: just check if path matches pattern exactly
+        # TODO: Implement proper gitattributes pattern matching
+        if pattern == path or (pattern.startswith(b"*") and path.endswith(pattern[1:])):
+            filter_name_bytes = attrs.get(b"filter")
+            if filter_name_bytes is not None:
+                if isinstance(filter_name_bytes, bytes):
+                    filter_name_str = filter_name_bytes.decode("utf-8")
+                else:
+                    filter_name_str = filter_name_bytes
+                return filter_registry.get_driver(filter_name_str)
+
+    return None
+
+
+class FilterBlobNormalizer:
+    """Blob normalizer that applies clean/smudge filters based on gitattributes.
+
+    This can be used in addition to or instead of line ending normalization.
+    """
+
+    def __init__(
+        self,
+        config_stack: Optional["StackedConfig"],
+        gitattributes: dict[bytes, dict[bytes, bytes]],
+        filter_registry: Optional[FilterRegistry] = None,
+        repo=None,
+    ) -> None:
+        self.config_stack = config_stack
+        self.gitattributes = gitattributes
+        self.filter_registry = filter_registry or FilterRegistry(config_stack, repo)
+
+    def checkin_normalize(self, blob: Blob, path: bytes) -> Blob:
+        """Apply clean filter during checkin (working tree -> repository)."""
+        # Get filter for this path
+        filter_driver = get_filter_for_path(
+            path, self.gitattributes, self.filter_registry
+        )
+        if filter_driver is None:
+            return blob
+
+        # Apply clean filter
+        filtered_data = filter_driver.clean(blob.data)
+        if filtered_data == blob.data:
+            return blob
+
+        # Create new blob with filtered data
+        new_blob = Blob()
+        new_blob.data = filtered_data
+        return new_blob
+
+    def checkout_normalize(self, blob: Blob, path: bytes) -> Blob:
+        """Apply smudge filter during checkout (repository -> working tree)."""
+        # Get filter for this path
+        filter_driver = get_filter_for_path(
+            path, self.gitattributes, self.filter_registry
+        )
+        if filter_driver is None:
+            return blob
+
+        # Apply smudge filter
+        filtered_data = filter_driver.smudge(blob.data)
+        if filtered_data == blob.data:
+            return blob
+
+        # Create new blob with filtered data
+        new_blob = Blob()
+        new_blob.data = filtered_data
+        return new_blob

+ 257 - 0
tests/test_filters.py

@@ -0,0 +1,257 @@
+# test_filters.py -- tests for filter drivers
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for filter drivers support."""
+
+import sys
+from unittest import skipIf
+
+from dulwich.config import ConfigDict
+from dulwich.filters import (
+    FilterBlobNormalizer,
+    FilterRegistry,
+    ProcessFilterDriver,
+    get_filter_for_path,
+)
+from dulwich.objects import Blob
+
+from . import TestCase
+
+
+class ProcessFilterDriverTests(TestCase):
+    @skipIf(sys.platform == "win32", "Unix shell commands")
+    def test_clean_filter(self) -> None:
+        """Test clean filter with external command."""
+        # Use a simple command that converts to uppercase
+        driver = ProcessFilterDriver(clean_cmd="tr '[:lower:]' '[:upper:]'")
+        result = driver.clean(b"hello world")
+        self.assertEqual(result, b"HELLO WORLD")
+
+    @skipIf(sys.platform == "win32", "Unix shell commands")
+    def test_smudge_filter(self) -> None:
+        """Test smudge filter with external command."""
+        # Use a simple command that converts to lowercase
+        driver = ProcessFilterDriver(smudge_cmd="tr '[:upper:]' '[:lower:]'")
+        result = driver.smudge(b"HELLO WORLD")
+        self.assertEqual(result, b"hello world")
+
+    def test_no_filters(self) -> None:
+        """Test driver with no filters configured."""
+        driver = ProcessFilterDriver()
+        data = b"test data"
+        self.assertEqual(driver.clean(data), data)
+        self.assertEqual(driver.smudge(data), data)
+
+    @skipIf(sys.platform == "win32", "Unix shell commands")
+    def test_failing_filter(self) -> None:
+        """Test that failing filter propagates the error."""
+        import subprocess
+
+        # Use a command that will fail
+        driver = ProcessFilterDriver(clean_cmd="false")
+        data = b"test data"
+        # Should raise CalledProcessError
+        with self.assertRaises(subprocess.CalledProcessError):
+            driver.clean(data)
+
+        # Test smudge filter too
+        driver = ProcessFilterDriver(smudge_cmd="false")
+        with self.assertRaises(subprocess.CalledProcessError):
+            driver.smudge(data)
+
+
+class FilterRegistryTests(TestCase):
+    def setUp(self) -> None:
+        super().setUp()
+        self.config = ConfigDict()
+        self.registry = FilterRegistry(self.config)
+
+    def test_register_and_get_driver(self) -> None:
+        """Test registering and retrieving a driver."""
+        driver = ProcessFilterDriver(clean_cmd="cat")
+        self.registry.register_driver("test", driver)
+
+        retrieved = self.registry.get_driver("test")
+        self.assertIs(retrieved, driver)
+
+    def test_get_nonexistent_driver(self) -> None:
+        """Test getting a non-existent driver."""
+        result = self.registry.get_driver("nonexistent")
+        self.assertIsNone(result)
+
+    def test_register_factory(self) -> None:
+        """Test registering a driver factory."""
+        created_driver = ProcessFilterDriver(clean_cmd="cat")
+
+        def factory(registry):
+            return created_driver
+
+        self.registry.register_factory("test", factory)
+
+        # Getting driver should invoke factory
+        retrieved = self.registry.get_driver("test")
+        self.assertIs(retrieved, created_driver)
+
+        # Second get should return cached instance
+        retrieved2 = self.registry.get_driver("test")
+        self.assertIs(retrieved2, created_driver)
+
+    def test_create_from_config(self) -> None:
+        """Test creating driver from config."""
+        # Set up config using the proper Config interface
+        self.config.set(("filter", "test"), "clean", b"cat")
+        self.config.set(("filter", "test"), "smudge", b"tac")
+
+        # Get driver (should be created from config)
+        driver = self.registry.get_driver("test")
+        self.assertIsNotNone(driver)
+        self.assertIsInstance(driver, ProcessFilterDriver)
+        self.assertEqual(driver.clean_cmd, "cat")
+        self.assertEqual(driver.smudge_cmd, "tac")
+
+    def test_builtin_lfs_factory(self) -> None:
+        """Test that LFS filter is available as a built-in."""
+        from dulwich.lfs import LFSFilterDriver
+
+        # Should be able to get LFS filter without explicit registration
+        driver = self.registry.get_driver("lfs")
+        self.assertIsNotNone(driver)
+        self.assertIsInstance(driver, LFSFilterDriver)
+
+
+class GetFilterForPathTests(TestCase):
+    def setUp(self) -> None:
+        super().setUp()
+        self.registry = FilterRegistry()
+        self.driver = ProcessFilterDriver(clean_cmd="cat")
+        self.registry.register_driver("test", self.driver)
+
+    def test_get_filter_for_path(self) -> None:
+        """Test getting filter for a path with filter attribute."""
+        gitattributes = {
+            b"*.txt": {b"filter": b"test"},
+        }
+
+        result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
+        self.assertIs(result, self.driver)
+
+    def test_no_filter_attribute(self) -> None:
+        """Test path with no filter attribute."""
+        gitattributes = {
+            b"*.txt": {b"text": b"auto"},
+        }
+
+        result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
+        self.assertIsNone(result)
+
+    def test_no_matching_pattern(self) -> None:
+        """Test path with no matching pattern."""
+        gitattributes = {
+            b"*.jpg": {b"filter": b"test"},
+        }
+
+        result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
+        self.assertIsNone(result)
+
+    def test_filter_not_registered(self) -> None:
+        """Test path with filter that's not registered."""
+        gitattributes = {
+            b"*.txt": {b"filter": b"nonexistent"},
+        }
+
+        result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
+        self.assertIsNone(result)
+
+
+class FilterBlobNormalizerTests(TestCase):
+    def setUp(self) -> None:
+        super().setUp()
+        self.config = ConfigDict()
+        self.registry = FilterRegistry(self.config)
+        self.gitattributes = {}
+        self.normalizer = FilterBlobNormalizer(
+            self.config, self.gitattributes, self.registry
+        )
+
+    def test_no_filter(self) -> None:
+        """Test normalizer with no filter defined."""
+        blob = Blob()
+        blob.data = b"test content"
+
+        # Both checkin and checkout should return blob unchanged
+        result = self.normalizer.checkin_normalize(blob, b"file.txt")
+        self.assertIs(result, blob)
+
+        result = self.normalizer.checkout_normalize(blob, b"file.txt")
+        self.assertIs(result, blob)
+
+    def test_with_filter(self) -> None:
+        """Test normalizer with a filter defined."""
+
+        # Create a simple filter that converts to uppercase on clean
+        # and lowercase on smudge
+        class TestFilter:
+            def clean(self, data):
+                return data.upper()
+
+            def smudge(self, data):
+                return data.lower()
+
+        # Register the filter and set it in gitattributes
+        self.registry.register_driver("test", TestFilter())
+        self.gitattributes[b"*.txt"] = {b"filter": b"test"}
+
+        blob = Blob()
+        blob.data = b"Test Content"
+
+        # Checkin should uppercase
+        result = self.normalizer.checkin_normalize(blob, b"file.txt")
+        self.assertEqual(result.data, b"TEST CONTENT")
+        self.assertIsNot(result, blob)  # Should be a new blob
+
+        # Checkout should lowercase
+        result = self.normalizer.checkout_normalize(blob, b"file.txt")
+        self.assertEqual(result.data, b"test content")
+        self.assertIsNot(result, blob)  # Should be a new blob
+
+    def test_filter_returns_same_data(self) -> None:
+        """Test that normalizer returns same blob if filter doesn't change data."""
+
+        # Create a filter that returns data unchanged
+        class NoOpFilter:
+            def clean(self, data):
+                return data
+
+            def smudge(self, data):
+                return data
+
+        self.registry.register_driver("noop", NoOpFilter())
+        self.gitattributes[b"*.txt"] = {b"filter": b"noop"}
+
+        blob = Blob()
+        blob.data = b"unchanged content"
+
+        # Both operations should return the same blob instance
+        result = self.normalizer.checkin_normalize(blob, b"file.txt")
+        self.assertIs(result, blob)
+
+        result = self.normalizer.checkout_normalize(blob, b"file.txt")
+        self.assertIs(result, blob)