Explorar o código

filters: Support ``required`` setting (#1689)

Jelmer Vernooij hai 1 mes
pai
achega
9f24e6bef9
Modificáronse 2 ficheiros con 181 adicións e 19 borrados
  1. 61 19
      dulwich/filters.py
  2. 120 0
      tests/test_filters.py

+ 61 - 19
dulwich/filters.py

@@ -21,6 +21,7 @@
 
 """Implementation of Git filter drivers (clean/smudge filters)."""
 
+import logging
 import subprocess
 from typing import TYPE_CHECKING, Callable, Optional, Protocol
 
@@ -31,6 +32,10 @@ if TYPE_CHECKING:
     from .config import StackedConfig
 
 
+class FilterError(Exception):
+    """Exception raised when filter operations fail."""
+
+
 class FilterDriver(Protocol):
     """Protocol for filter drivers."""
 
@@ -47,38 +52,60 @@ class ProcessFilterDriver:
     """Filter driver that executes external processes."""
 
     def __init__(
-        self, clean_cmd: Optional[str] = None, smudge_cmd: Optional[str] = None
+        self,
+        clean_cmd: Optional[str] = None,
+        smudge_cmd: Optional[str] = None,
+        required: bool = False,
     ) -> None:
         self.clean_cmd = clean_cmd
         self.smudge_cmd = smudge_cmd
+        self.required = required
 
     def clean(self, data: bytes) -> bytes:
         """Apply clean filter using external process."""
         if not self.clean_cmd:
+            if self.required:
+                raise FilterError("Clean command is required but not configured")
             return data
 
-        result = subprocess.run(
-            self.clean_cmd,
-            shell=True,
-            input=data,
-            capture_output=True,
-            check=True,
-        )
-        return result.stdout
+        try:
+            result = subprocess.run(
+                self.clean_cmd,
+                shell=True,
+                input=data,
+                capture_output=True,
+                check=True,
+            )
+            return result.stdout
+        except subprocess.CalledProcessError as e:
+            if self.required:
+                raise FilterError(f"Required clean filter failed: {e}")
+            # If not required, log warning and return original data on failure
+            logging.warning(f"Optional clean filter failed: {e}")
+            return data
 
     def smudge(self, data: bytes) -> bytes:
         """Apply smudge filter using external process."""
         if not self.smudge_cmd:
+            if self.required:
+                raise FilterError("Smudge command is required but not configured")
             return data
 
-        result = subprocess.run(
-            self.smudge_cmd,
-            shell=True,
-            input=data,
-            capture_output=True,
-            check=True,
-        )
-        return result.stdout
+        try:
+            result = subprocess.run(
+                self.smudge_cmd,
+                shell=True,
+                input=data,
+                capture_output=True,
+                check=True,
+            )
+            return result.stdout
+        except subprocess.CalledProcessError as e:
+            if self.required:
+                raise FilterError(f"Required smudge filter failed: {e}")
+            # If not required, log warning and return original data on failure
+            logging.warning(f"Optional smudge filter failed: {e}")
+            return data
 
 
 class FilterRegistry:
@@ -156,8 +183,11 @@ class FilterRegistry:
         except KeyError:
             pass
 
+        # Get required flag (defaults to False)
+        required = self.config.get_boolean(("filter", name), "required", False)
+
         if clean_cmd or smudge_cmd:
-            return ProcessFilterDriver(clean_cmd, smudge_cmd)
+            return ProcessFilterDriver(clean_cmd, smudge_cmd, required)
 
         return None
 
@@ -278,7 +308,19 @@ def get_filter_for_path(
             return None
         if isinstance(filter_name, bytes):
             filter_name_str = filter_name.decode("utf-8")
-            return filter_registry.get_driver(filter_name_str)
+            driver = filter_registry.get_driver(filter_name_str)
+
+            # Check if filter is required but missing
+            if driver is None and filter_registry.config is not None:
+                required = filter_registry.config.get_boolean(
+                    ("filter", filter_name_str), "required", False
+                )
+                if required:
+                    raise FilterError(
+                        f"Required filter '{filter_name_str}' is not available"
+                    )
+
+            return driver
         return None
 
     # Check for text attribute

+ 120 - 0
tests/test_filters.py

@@ -26,6 +26,7 @@ import tempfile
 import unittest
 
 from dulwich import porcelain
+from dulwich.filters import FilterError
 from dulwich.repo import Repo
 
 from . import TestCase
@@ -197,3 +198,122 @@ class GitAttributesFilterIntegrationTests(TestCase):
         # Check it has access to gitattributes
         self.assertIsNotNone(normalizer.gitattributes)
         self.assertIsNotNone(normalizer.filter_registry)
+
+    def test_required_filter_missing(self) -> None:
+        """Test that missing required filter raises an error."""
+        # Create .gitattributes with required filter
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.secret filter=required_filter\n")
+
+        # Configure filter as required but without commands
+        config = self.repo.get_config()
+        config.set((b"filter", b"required_filter"), b"required", b"true")
+        config.write_to_path()
+
+        # Add .gitattributes
+        porcelain.add(self.repo, paths=[".gitattributes"])
+
+        # Create file that would use the filter
+        secret_file = os.path.join(self.test_dir, "test.secret")
+        with open(secret_file, "wb") as f:
+            f.write(b"test content\n")
+
+        # Adding file should raise error due to missing required filter
+        with self.assertRaises(FilterError) as cm:
+            porcelain.add(self.repo, paths=["test.secret"])
+        self.assertIn(
+            "Required filter 'required_filter' is not available", str(cm.exception)
+        )
+
+    def test_required_filter_clean_command_fails(self) -> None:
+        """Test that required filter failure during clean raises an error."""
+        # Create .gitattributes with required filter
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.secret filter=failing_filter\n")
+
+        # Configure filter as required with failing command
+        config = self.repo.get_config()
+        config.set(
+            (b"filter", b"failing_filter"), b"clean", b"false"
+        )  # false command always fails
+        config.set((b"filter", b"failing_filter"), b"required", b"true")
+        config.write_to_path()
+
+        # Add .gitattributes
+        porcelain.add(self.repo, paths=[".gitattributes"])
+
+        # Create file that would use the filter
+        secret_file = os.path.join(self.test_dir, "test.secret")
+        with open(secret_file, "wb") as f:
+            f.write(b"test content\n")
+
+        # Adding file should raise error due to failing required filter
+        with self.assertRaises(FilterError) as cm:
+            porcelain.add(self.repo, paths=["test.secret"])
+        self.assertIn("Required clean filter failed", str(cm.exception))
+
+    def test_required_filter_success(self) -> None:
+        """Test that required filter works when properly configured."""
+        # Create .gitattributes with required filter
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.secret filter=working_filter\n")
+
+        # Configure filter as required with working command
+        config = self.repo.get_config()
+        config.set(
+            (b"filter", b"working_filter"), b"clean", b"tr 'a-z' 'A-Z'"
+        )  # uppercase
+        config.set((b"filter", b"working_filter"), b"required", b"true")
+        config.write_to_path()
+
+        # Add .gitattributes
+        porcelain.add(self.repo, paths=[".gitattributes"])
+
+        # Create file that would use the filter
+        secret_file = os.path.join(self.test_dir, "test.secret")
+        with open(secret_file, "wb") as f:
+            f.write(b"hello world\n")
+
+        # Adding file should work and apply filter
+        porcelain.add(self.repo, paths=["test.secret"])
+
+        # Check that content was filtered
+        index = self.repo.open_index()
+        entry = index[b"test.secret"]
+        blob = self.repo.object_store[entry.sha]
+        self.assertEqual(blob.data, b"HELLO WORLD\n")
+
+    def test_optional_filter_failure_fallback(self) -> None:
+        """Test that optional filter failure falls back to original data."""
+        # Create .gitattributes with optional filter
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.txt filter=optional_filter\n")
+
+        # Configure filter as optional (required=false) with failing command
+        config = self.repo.get_config()
+        config.set(
+            (b"filter", b"optional_filter"), b"clean", b"false"
+        )  # false command always fails
+        config.set((b"filter", b"optional_filter"), b"required", b"false")
+        config.write_to_path()
+
+        # Add .gitattributes
+        porcelain.add(self.repo, paths=[".gitattributes"])
+
+        # Create file that would use the filter
+        test_file = os.path.join(self.test_dir, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"test content\n")
+
+        # Adding file should work and fallback to original content
+        porcelain.add(self.repo, paths=["test.txt"])
+
+        # Check that original content was preserved
+        index = self.repo.open_index()
+        entry = index[b"test.txt"]
+        blob = self.repo.object_store[entry.sha]
+        self.assertEqual(blob.data, b"test content\n")