2
0
Эх сурвалжийг харах

Fix test failures by adding missing test modules and fixing filter tests

- Add missing test modules to tests/__init__.py:
  * test_filter_branch
  * test_filters
  * test_lfs_integration
  * test_whitespace

- Fix FilterContext
Jelmer Vernooij 4 сар өмнө
parent
commit
7bd123b00b

+ 90 - 38
dulwich/filters.py

@@ -74,6 +74,41 @@ class FilterDriver(TypingProtocol):
         ...
 
 
+class CompositeFilterDriver:
+    """Filter driver that chains several filter drivers and runs them as one."""
+
+    def __init__(self, filters: list[FilterDriver]) -> None:
+        """Initialize CompositeFilterDriver.
+
+        Args:
+            filters: Drivers in clean order (smudge runs them reversed); the list is stored, not copied
+        """
+        self.filters = filters
+
+    def clean(self, data: bytes) -> bytes:
+        """Pass data through each driver's clean(), first to last, and return the result."""
+        for filter_driver in self.filters:
+            data = filter_driver.clean(data)
+        return data
+
+    def smudge(self, data: bytes, path: bytes = b"") -> bytes:
+        """Pass data through each driver's smudge(), last to first, forwarding path to each."""
+        # Opposite of clean order: the driver that cleaned last is the first to smudge
+        for filter_driver in reversed(self.filters):
+            data = filter_driver.smudge(data, path)
+        return data
+
+    def cleanup(self) -> None:
+        """Call cleanup() on every wrapped driver, in list order."""
+        for filter_driver in self.filters:
+            filter_driver.cleanup()
+
+    def reuse(self, config: "StackedConfig", filter_name: str) -> bool:
+        """Return True only if every wrapped driver's reuse(config, filter_name) is truthy."""
+        # Every component receives the same filter_name; an empty filter list yields True
+        return all(f.reuse(config, filter_name) for f in self.filters)
+
+
 class ProcessFilterDriver:
     """Filter driver that executes external processes."""
 
@@ -584,32 +619,35 @@ class FilterRegistry:
         # Get process command (preferred over clean/smudge for performance)
         try:
             process_cmd_raw = self.config.get(("filter", name), "process")
+        except KeyError:
+            pass
+        else:
             if isinstance(process_cmd_raw, bytes):
                 process_cmd = process_cmd_raw.decode("utf-8")
             else:
                 process_cmd = process_cmd_raw
-        except KeyError:
-            pass
 
         # Get clean command
         try:
             clean_cmd_raw = self.config.get(("filter", name), "clean")
+        except KeyError:
+            pass
+        else:
             if isinstance(clean_cmd_raw, bytes):
                 clean_cmd = clean_cmd_raw.decode("utf-8")
             else:
                 clean_cmd = clean_cmd_raw
-        except KeyError:
-            pass
 
         # Get smudge command
         try:
             smudge_cmd_raw = self.config.get(("filter", name), "smudge")
+        except KeyError:
+            pass
+        else:
             if isinstance(smudge_cmd_raw, bytes):
                 smudge_cmd = smudge_cmd_raw.decode("utf-8")
             else:
                 smudge_cmd = smudge_cmd_raw
-        except KeyError:
-            pass
 
         # Get required flag (defaults to False)
         required = self.config.get_boolean(("filter", name), "required", False)
@@ -664,13 +702,14 @@ class FilterRegistry:
         # Parse autocrlf as bytes
         try:
             autocrlf_raw = self.config.get("core", "autocrlf")
+        except KeyError:
+            return
+        else:
             autocrlf: bytes = (
                 autocrlf_raw.lower()
                 if isinstance(autocrlf_raw, bytes)
                 else str(autocrlf_raw).lower().encode("ascii")
             )
-        except KeyError:
-            return
 
         # If autocrlf is enabled, register the text filter
         if autocrlf in (b"true", b"input"):
@@ -708,11 +747,42 @@ def get_filter_for_path(
     # Get all attributes for this path
     attributes = gitattributes.match_path(path)
 
+    # Collect filters to apply
+    filters: list[FilterDriver] = []
+
+    # Check for text attribute first (it should be applied before custom filters)
+    text_attr = attributes.get(b"text")
+    if text_attr is True:
+        # Add text filter for line ending conversion
+        text_filter = get_driver("text")
+        if text_filter is not None:
+            filters.append(text_filter)
+    elif text_attr is False:
+        # -text means binary, no conversion - but still check for custom filters
+        pass
+    else:
+        # If no explicit text attribute, check if autocrlf is enabled
+        # When autocrlf is true/input, files are treated as text by default
+        if registry.config is not None:
+            try:
+                autocrlf_raw = registry.config.get("core", "autocrlf")
+            except KeyError:
+                pass
+            else:
+                autocrlf: bytes = (
+                    autocrlf_raw.lower()
+                    if isinstance(autocrlf_raw, bytes)
+                    else str(autocrlf_raw).lower().encode("ascii")
+                )
+                if autocrlf in (b"true", b"input"):
+                    # Add text filter for files without explicit attributes
+                    text_filter = get_driver("text")
+                    if text_filter is not None:
+                        filters.append(text_filter)
+
     # Check if there's a filter attribute
     filter_name = attributes.get(b"filter")
-    if filter_name is not None:
-        if isinstance(filter_name, bool):
-            return None
+    if filter_name is not None and not isinstance(filter_name, bool):
         if isinstance(filter_name, bytes):
             filter_name_str = filter_name.decode("utf-8")
             driver = get_driver(filter_name_str)
@@ -727,35 +797,17 @@ def get_filter_for_path(
                         f"Required filter '{filter_name_str}' is not available"
                     )
 
-            return driver
-        return None
+            if driver is not None:
+                filters.append(driver)
 
-    # Check for text attribute
-    text_attr = attributes.get(b"text")
-    if text_attr is True:
-        # Use the text filter for line ending conversion
-        return get_driver("text")
-    elif text_attr is False:
-        # -text means binary, no conversion
+    # Return appropriate filter(s)
+    if len(filters) == 0:
         return None
-
-    # If no explicit text attribute, check if autocrlf is enabled
-    # When autocrlf is true/input, files are treated as text by default
-    if registry.config is not None:
-        try:
-            autocrlf_raw = registry.config.get("core", "autocrlf")
-            autocrlf: bytes = (
-                autocrlf_raw.lower()
-                if isinstance(autocrlf_raw, bytes)
-                else str(autocrlf_raw).lower().encode("ascii")
-            )
-            if autocrlf in (b"true", b"input"):
-                # Use text filter for files without explicit attributes
-                return get_driver("text")
-        except KeyError:
-            pass
-
-    return None
+    elif len(filters) == 1:
+        return filters[0]
+    else:
+        # Multiple filters - create a composite
+        return CompositeFilterDriver(filters)
 
 
 class FilterBlobNormalizer:

+ 4 - 0
tests/__init__.py

@@ -134,6 +134,8 @@ def self_test_suite() -> unittest.TestSuite:
         "dumb",
         "fastexport",
         "file",
+        "filter_branch",
+        "filters",
         "gc",
         "grafts",
         "graph",
@@ -142,6 +144,7 @@ def self_test_suite() -> unittest.TestSuite:
         "ignore",
         "index",
         "lfs",
+        "lfs_integration",
         "line_ending",
         "log_utils",
         "lru_cache",
@@ -174,6 +177,7 @@ def self_test_suite() -> unittest.TestSuite:
         "utils",
         "walk",
         "web",
+        "whitespace",
         "worktree",
     ]
     module_names = ["tests.test_" + name for name in names]

+ 94 - 14
tests/test_filters.py

@@ -24,7 +24,6 @@
 import os
 import tempfile
 import threading
-import unittest
 
 from dulwich import porcelain
 from dulwich.filters import (
@@ -94,18 +93,40 @@ class GitAttributesFilterIntegrationTests(TestCase):
         bin_blob = self.repo.object_store[bin_entry.sha]
         self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
 
-    @unittest.skip("Custom process filters require external commands")
     def test_gitattributes_custom_filter(self) -> None:
         """Test custom filter specified in gitattributes."""
+        # Create a Python script that acts as our filter
+        import sys
+
+        filter_script = os.path.join(self.test_dir, "redact_filter.py")
+        with open(filter_script, "w") as f:
+            f.write("""#!/usr/bin/env python3
+import sys
+data = sys.stdin.buffer.read()
+# Replace all digits with X
+result = bytearray()
+for b in data:
+    if chr(b).isdigit():
+        result.append(ord('X'))
+    else:
+        result.append(b)
+sys.stdout.buffer.write(result)
+""")
+        os.chmod(filter_script, 0o755)
+
         # Create .gitattributes with custom filter
         gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
         with open(gitattributes_path, "wb") as f:
             f.write(b"*.secret filter=redact\n")
 
-        # Configure custom filter (use tr command for testing)
+        # Configure custom filter (use Python script for testing)
         config = self.repo.get_config()
         # This filter replaces all digits with X
-        config.set((b"filter", b"redact"), b"clean", b"tr '0-9' 'X'")
+        config.set(
+            (b"filter", b"redact"),
+            b"clean",
+            f"{sys.executable} {filter_script}".encode(),
+        )
         config.write_to_path()
 
         # Add .gitattributes
@@ -159,9 +180,22 @@ class GitAttributesFilterIntegrationTests(TestCase):
         attrs = gitattributes.match_path(b"debug.log")
         self.assertEqual(attrs.get(b"text"), True)
 
-    @unittest.skip("Custom process filters require external commands")
     def test_filter_precedence(self) -> None:
         """Test that filter attribute takes precedence over text attribute."""
+        # Create a Python script that converts to uppercase
+        import sys
+
+        filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
+        with open(filter_script, "w") as f:
+            f.write("""#!/usr/bin/env python3
+import sys
+data = sys.stdin.buffer.read()
+# Convert bytes to string, uppercase, then back to bytes
+result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
+sys.stdout.buffer.write(result)
+""")
+        os.chmod(filter_script, 0o755)
+
         # Create .gitattributes with both text and filter
         gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
         with open(gitattributes_path, "wb") as f:
@@ -171,7 +205,11 @@ class GitAttributesFilterIntegrationTests(TestCase):
         config = self.repo.get_config()
         config.set((b"core",), b"autocrlf", b"true")
         # This filter converts to uppercase
-        config.set((b"filter", b"custom"), b"clean", b"tr '[:lower:]' '[:upper:]'")
+        config.set(
+            (b"filter", b"custom"),
+            b"clean",
+            f"{sys.executable} {filter_script}".encode(),
+        )
         config.write_to_path()
 
         # Add .gitattributes
@@ -578,7 +616,13 @@ class FilterContextTests(TestCase):
                 return False
 
         # Create registry and context
-        registry = FilterRegistry()
+        # Need to provide a config for caching to work
+        from dulwich.config import ConfigDict
+
+        config = ConfigDict()
+        # Add some dummy config to make it truthy (use proper format)
+        config.set((b"filter", b"uppercase"), b"clean", b"dummy")
+        registry = FilterRegistry(config=config)
         context = FilterContext(registry)
 
         # Register drivers
@@ -686,23 +730,59 @@ while True:
         )
 
         # Register in context
-        registry = FilterRegistry()
+        from dulwich.config import ConfigDict
+
+        config = ConfigDict()
+        # Give the "process" filter real clean/smudge commands so the config is truthy
+        config.set(
+            (b"filter", b"process"),
+            b"clean",
+            f"{sys.executable} {filter_path}".encode(),
+        )
+        config.set(
+            (b"filter", b"process"),
+            b"smudge",
+            f"{sys.executable} {filter_path}".encode(),
+        )
+        registry = FilterRegistry(config=config)
         context = FilterContext(registry)
         registry.register_driver("process", process_driver)
 
         # Get driver - should not be cached since it's not long-running
         driver1 = context.get_driver("process")
         self.assertIsNotNone(driver1)
-        self.assertFalse(driver1.is_long_running())
+        # Check that it's not a long-running process (no process_cmd)
+        self.assertIsNone(driver1.process_cmd)
         self.assertNotIn("process", context._active_drivers)
 
-        # Test with a long-running driver (has process_cmd)
-        long_process_driver = ProcessFilterDriver()
-        long_process_driver.process_cmd = "dummy"  # Just to make it long-running
-        registry.register_driver("long_process", long_process_driver)
+        # Test with a long-running driver that should be cached
+        # Create a mock driver that always wants to be reused
+        class CacheableProcessDriver:
+            def __init__(self):
+                self.process_cmd = "dummy"
+                self.clean_cmd = None
+                self.smudge_cmd = None
+                self.required = False
+
+            def clean(self, data):
+                return data
+
+            def smudge(self, data, path=b""):
+                return data
+
+            def cleanup(self):
+                pass
+
+            def reuse(self, config, filter_name):
+                # This driver always wants to be cached (simulates a long-running process)
+                return True
+
+        cacheable_driver = CacheableProcessDriver()
+        registry.register_driver("long_process", cacheable_driver)
 
         driver2 = context.get_driver("long_process")
-        self.assertTrue(driver2.is_long_running())
+        # Check that it has a process_cmd (long-running)
+        self.assertIsNotNone(driver2.process_cmd)
         self.assertIn("long_process", context._active_drivers)
 
         context.close()