2
0
Эх сурвалжийг харах

Fix subprocess filter performance issues (#1789) (#1868)

The ProcessFilterDriver was spawning a new subprocess for every file
operation, causing severe performance degradation for large repositories
(minutes instead of milliseconds for git status).

This commit implements the Git filter process protocol to use a single
long-running process for all filter operations:

- Add support for filter.<driver>.process configuration
- Implement Git filter process protocol (version 2) with proper pkt-line
format

Fixes #1789
Jelmer Vernooij 4 сар өмнө
parent
commit
2a8b0cb23b

+ 2 - 0
NEWS

@@ -43,6 +43,8 @@
    performance in repositories with many LFS-tracked files.
    (Jelmer Vernooij, #1789)
 
+ * Add filter server support. (Jelmer Vernooij, #1789)
+
  * Add support for ``patiencediff`` algorithm in diff.
    (Jelmer Vernooij, #1795)
 

+ 471 - 18
dulwich/filters.py

@@ -23,13 +23,16 @@
 
 import logging
 import subprocess
-from typing import TYPE_CHECKING, Callable, Optional, Protocol
+import threading
+from typing import TYPE_CHECKING, Callable, Optional
+from typing import Protocol as TypingProtocol
 
 from .attrs import GitAttributes
 from .objects import Blob
 
 if TYPE_CHECKING:
     from .config import StackedConfig
+    from .protocol import Protocol
     from .repo import BaseRepo
 
 
@@ -37,7 +40,7 @@ class FilterError(Exception):
     """Exception raised when filter operations fail."""
 
 
-class FilterDriver(Protocol):
+class FilterDriver(TypingProtocol):
     """Protocol for filter drivers."""
 
     def clean(self, data: bytes) -> bytes:
@@ -48,6 +51,28 @@ class FilterDriver(Protocol):
         """Apply smudge filter (repository → working tree)."""
         ...
 
+    def cleanup(self) -> None:
+        """Clean up any resources held by this filter driver."""
+        ...
+
+    def reuse(self, config: "StackedConfig", filter_name: str) -> bool:
+        """Check if this filter driver should be reused with the given configuration.
+
+        This method determines whether a cached filter driver instance should continue
+        to be used or if it should be recreated. Only filters that are expensive to
+        create (like long-running process filters) and whose configuration hasn't
+        changed should return True. Lightweight filters should return False to ensure
+        they always use the latest configuration.
+
+        Args:
+            config: The current configuration stack
+            filter_name: The name of the filter in config
+
+        Returns:
+            True if the filter should be reused, False if it should be recreated
+        """
+        ...
+
 
 class ProcessFilterDriver:
     """Filter driver that executes external processes."""
@@ -58,6 +83,7 @@ class ProcessFilterDriver:
         smudge_cmd: Optional[str] = None,
         required: bool = False,
         cwd: Optional[str] = None,
+        process_cmd: Optional[str] = None,
     ) -> None:
         """Initialize ProcessFilterDriver.
 
@@ -66,14 +92,165 @@ class ProcessFilterDriver:
           smudge_cmd: Command to run for smudge filter
           required: Whether the filter is required
           cwd: Working directory for filter execution
+          process_cmd: Command to run for process filter (preferred for performance)
         """
         self.clean_cmd = clean_cmd
         self.smudge_cmd = smudge_cmd
         self.required = required
         self.cwd = cwd
+        self.process_cmd = process_cmd
+        self._process: Optional[subprocess.Popen] = None
+        self._protocol: Optional[Protocol] = None
+        self._capabilities: set[bytes] = set()
+        self._process_lock = threading.Lock()
+
+    def _get_or_start_process(self):
+        """Get or start the long-running process filter."""
+        if self._process is None and self.process_cmd:
+            from .errors import HangupException
+            from .protocol import Protocol
+
+            try:
+                self._process = subprocess.Popen(
+                    self.process_cmd,
+                    shell=True,
+                    stdin=subprocess.PIPE,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.PIPE,
+                    cwd=self.cwd,
+                    text=False,  # Use bytes
+                )
+
+                # Check if process started successfully
+                if self._process.poll() is not None:
+                    # Process already terminated
+                    raise OSError(
+                        f"Process terminated immediately with code {self._process.returncode}"
+                    )
+
+                # Create protocol wrapper
+                def write_func(data):
+                    n = self._process.stdin.write(data)
+                    self._process.stdin.flush()
+                    return n
+
+                def read_func(size):
+                    return self._process.stdout.read(size)
+
+                self._protocol = Protocol(read_func, write_func)
+
+                # Send handshake using pkt-line format
+                self._protocol.write_pkt_line(b"git-filter-client")
+                self._protocol.write_pkt_line(b"version=2")
+                self._protocol.write_pkt_line(None)  # flush packet
+
+                # Read handshake response
+                welcome = self._protocol.read_pkt_line()
+                version = self._protocol.read_pkt_line()
+                flush = self._protocol.read_pkt_line()
+
+                # Verify handshake (be liberal - accept with or without newlines)
+                if welcome and welcome.rstrip(b"\n\r") != b"git-filter-server":
+                    raise FilterError(f"Invalid welcome message: {welcome}")
+                if version and version.rstrip(b"\n\r") != b"version=2":
+                    raise FilterError(f"Invalid version: {version}")
+                if flush is not None:
+                    raise FilterError("Expected flush packet after handshake")
+
+                # Send capabilities
+                self._protocol.write_pkt_line(b"capability=clean")
+                self._protocol.write_pkt_line(b"capability=smudge")
+                self._protocol.write_pkt_line(None)  # flush packet
+
+                # Read capability response
+                capabilities = []
+                while True:
+                    pkt = self._protocol.read_pkt_line()
+                    if pkt is None:  # flush packet
+                        break
+                    capabilities.append(pkt)
+
+                # Store supported capabilities
+                self._capabilities = set()
+                for cap in capabilities:
+                    cap = cap.rstrip(b"\n\r")  # Be liberal - strip any line endings
+                    if cap.startswith(b"capability="):
+                        self._capabilities.add(cap[11:])  # Remove "capability=" prefix
+
+            except (OSError, subprocess.SubprocessError, HangupException) as e:
+                self.cleanup()
+                raise FilterError(f"Failed to start process filter: {e}")
+        return self._process
+
+    def _use_process_filter(self, data: bytes, operation: str, path: str = "") -> bytes:
+        """Use the long-running process filter for the operation."""
+        with self._process_lock:
+            try:
+                proc = self._get_or_start_process()
+                if proc is None:
+                    return data
+
+                operation_bytes = operation.encode()
+                if operation_bytes not in self._capabilities:
+                    raise FilterError(f"Operation {operation} not supported by filter")
+
+                if not self._protocol:
+                    raise FilterError("Protocol not initialized")
+
+                # Send request using pkt-line format
+                self._protocol.write_pkt_line(f"command={operation}".encode())
+                self._protocol.write_pkt_line(f"pathname={path}".encode())
+                self._protocol.write_pkt_line(None)  # flush packet
+
+                # Send data
+                # Split data into chunks if needed (max pkt-line payload is 65516 bytes)
+                chunk_size = 65516
+                for i in range(0, len(data), chunk_size):
+                    chunk = data[i : i + chunk_size]
+                    self._protocol.write_pkt_line(chunk)
+                self._protocol.write_pkt_line(None)  # flush packet to end data
+
+                # Read response
+                response_headers = {}
+                while True:
+                    pkt = self._protocol.read_pkt_line()
+                    if pkt is None:  # flush packet ends headers
+                        break
+                    key, _, value = pkt.decode().rstrip("\n\r").partition("=")
+                    response_headers[key] = value
+
+                # Check status
+                status = response_headers.get("status", "error")
+                if status != "success":
+                    raise FilterError(f"Process filter {operation} failed: {status}")
+
+                # Read result data
+                result_chunks = []
+                while True:
+                    pkt = self._protocol.read_pkt_line()
+                    if pkt is None:  # flush packet ends data
+                        break
+                    result_chunks.append(pkt)
+
+                return b"".join(result_chunks)
+
+            except (OSError, subprocess.SubprocessError, ValueError) as e:
+                # Clean up broken process
+                self.cleanup()
+                raise FilterError(f"Process filter failed: {e}")
 
     def clean(self, data: bytes) -> bytes:
         """Apply clean filter using external process."""
+        # Try process filter first (much faster)
+        if self.process_cmd:
+            try:
+                return self._use_process_filter(data, "clean")
+            except FilterError as e:
+                if self.required:
+                    raise
+                logging.warning(f"Process filter failed, falling back: {e}")
+
+        # Fall back to clean command
         if not self.clean_cmd:
             if self.required:
                 raise FilterError("Clean command is required but not configured")
@@ -98,13 +275,25 @@ class ProcessFilterDriver:
 
     def smudge(self, data: bytes, path: bytes = b"") -> bytes:
         """Apply smudge filter using external process."""
+        path_str = path.decode("utf-8", errors="replace")
+
+        # Try process filter first (much faster)
+        if self.process_cmd:
+            try:
+                return self._use_process_filter(data, "smudge", path_str)
+            except FilterError as e:
+                if self.required:
+                    raise
+                logging.warning(f"Process filter failed, falling back: {e}")
+
+        # Fall back to smudge command
         if not self.smudge_cmd:
             if self.required:
                 raise FilterError("Smudge command is required but not configured")
             return data
 
         # Substitute %f placeholder with file path
-        cmd = self.smudge_cmd.replace("%f", path.decode("utf-8", errors="replace"))
+        cmd = self.smudge_cmd.replace("%f", path_str)
 
         try:
             result = subprocess.run(
@@ -123,6 +312,194 @@ class ProcessFilterDriver:
             logging.warning(f"Optional smudge filter failed: {e}")
             return data
 
+    def cleanup(self):
+        """Clean up the process filter."""
+        if self._process:
+            # Close stdin first to signal the process to quit cleanly
+            if self._process.stdin and not self._process.stdin.closed:
+                try:
+                    self._process.stdin.close()
+                except BrokenPipeError:
+                    pass
+
+            # Try to terminate gracefully first
+            if self._process.poll() is None:  # Still running
+                try:
+                    self._process.terminate()
+                    self._process.wait(timeout=2)
+                except subprocess.TimeoutExpired:
+                    # Force kill if terminate didn't work
+                    try:
+                        self._process.kill()
+                        self._process.wait(timeout=3)
+                    except subprocess.TimeoutExpired:
+                        # On Windows, sometimes we need to be more aggressive
+                        import os
+
+                        if os.name == "nt":
+                            try:
+                                subprocess.run(
+                                    [
+                                        "taskkill",
+                                        "/F",
+                                        "/T",
+                                        "/PID",
+                                        str(self._process.pid),
+                                    ],
+                                    capture_output=True,
+                                    timeout=5,
+                                )
+                                self._process.wait(timeout=1)
+                            except (
+                                subprocess.CalledProcessError,
+                                subprocess.TimeoutExpired,
+                            ):
+                                pass
+                        else:
+                            try:
+                                import signal
+
+                                os.kill(self._process.pid, signal.SIGKILL)
+                                self._process.wait(timeout=1)
+                            except (ProcessLookupError, subprocess.TimeoutExpired):
+                                pass
+                except ProcessLookupError:
+                    # Process already dead
+                    pass
+        self._process = None
+        self._protocol = None
+
+    def reuse(self, config: "StackedConfig", filter_name: str) -> bool:
+        """Check if this filter driver should be reused with the given configuration."""
+        # Only reuse if it's a long-running process filter AND config hasn't changed
+        if self.process_cmd is None:
+            # Not a long-running filter, don't cache
+            return False
+
+        # Check if the filter commands in config match our current commands
+        try:
+            clean_cmd = config.get(("filter", filter_name), "clean")
+        except KeyError:
+            clean_cmd = None
+        if clean_cmd != self.clean_cmd:
+            return False
+
+        try:
+            smudge_cmd = config.get(("filter", filter_name), "smudge")
+        except KeyError:
+            smudge_cmd = None
+        if smudge_cmd != self.smudge_cmd:
+            return False
+
+        try:
+            process_cmd = config.get(("filter", filter_name), "process")
+        except KeyError:
+            process_cmd = None
+        if process_cmd != self.process_cmd:
+            return False
+
+        required = config.get_boolean(("filter", filter_name), "required", False)
+        if required != self.required:
+            return False
+
+        return True
+
+    def __del__(self):
+        """Clean up the process filter on destruction."""
+        self.cleanup()
+
+
+class FilterContext:
+    """Context for managing stateful filter resources.
+
+    This class manages the runtime state for filters, including:
+    - Cached filter driver instances that maintain long-running state
+    - Resource lifecycle management
+
+    It works in conjunction with FilterRegistry to provide complete
+    filter functionality while maintaining proper separation of concerns.
+    """
+
+    def __init__(self, filter_registry: "FilterRegistry") -> None:
+        """Initialize FilterContext.
+
+        Args:
+            filter_registry: The filter registry to use for driver lookups
+        """
+        self.filter_registry = filter_registry
+        self._active_drivers: dict[str, FilterDriver] = {}
+
+    def get_driver(self, name: str) -> Optional[FilterDriver]:
+        """Get a filter driver by name, managing stateful instances.
+
+        This method handles driver instantiation and caching. Only drivers
+        that should be reused are cached.
+
+        Args:
+            name: The filter name
+
+        Returns:
+            FilterDriver instance or None
+        """
+        driver: Optional[FilterDriver] = None
+        # Check if we have a cached instance that should be reused
+        if name in self._active_drivers:
+            driver = self._active_drivers[name]
+            # Check if the cached driver should still be reused
+            if self.filter_registry.config and driver.reuse(
+                self.filter_registry.config, name
+            ):
+                return driver
+            else:
+                # Driver shouldn't be reused, clean it up and remove from cache
+                driver.cleanup()
+                del self._active_drivers[name]
+
+        # Get driver from registry
+        driver = self.filter_registry.get_driver(name)
+        if driver is not None and self.filter_registry.config:
+            # Only cache drivers that should be reused
+            if driver.reuse(self.filter_registry.config, name):
+                self._active_drivers[name] = driver
+
+        return driver
+
+    def close(self) -> None:
+        """Close all active filter resources."""
+        # Clean up active drivers
+        for driver in self._active_drivers.values():
+            driver.cleanup()
+        self._active_drivers.clear()
+
+        # Also close the registry
+        self.filter_registry.close()
+
+    def refresh_config(self, config: "StackedConfig") -> None:
+        """Refresh the configuration used by the filter registry.
+
+        This should be called when the configuration has changed to ensure
+        filters use the latest settings.
+
+        Args:
+            config: The new configuration stack
+        """
+        # Update the registry's config
+        self.filter_registry.config = config
+
+        # Re-setup line ending filter with new config
+        # This will update the text filter factory to use new autocrlf settings
+        self.filter_registry._setup_line_ending_filter()
+
+        # The get_driver method will now handle checking reuse() for cached drivers
+
+    def __del__(self) -> None:
+        """Clean up on destruction."""
+        try:
+            self.close()
+        except Exception:
+            # Don't raise exceptions in __del__
+            pass
+
 
 class FilterRegistry:
     """Registry for filter drivers."""
@@ -181,6 +558,20 @@ class FilterRegistry:
 
         return None
 
+    def close(self) -> None:
+        """Close all filter drivers, ensuring process cleanup."""
+        for driver in self._drivers.values():
+            driver.cleanup()
+        self._drivers.clear()
+
+    def __del__(self) -> None:
+        """Clean up filter drivers on destruction."""
+        try:
+            self.close()
+        except Exception:
+            # Don't raise exceptions in __del__
+            pass
+
     def _create_from_config(self, name: str) -> Optional[FilterDriver]:
         """Create a filter driver from config."""
         if self.config is None:
@@ -188,6 +579,17 @@ class FilterRegistry:
 
         clean_cmd: Optional[str] = None
         smudge_cmd: Optional[str] = None
+        process_cmd: Optional[str] = None
+
+        # Get process command (preferred over clean/smudge for performance)
+        try:
+            process_cmd_raw = self.config.get(("filter", name), "process")
+            if isinstance(process_cmd_raw, bytes):
+                process_cmd = process_cmd_raw.decode("utf-8")
+            else:
+                process_cmd = process_cmd_raw
+        except KeyError:
+            pass
 
         # Get clean command
         try:
@@ -212,14 +614,16 @@ class FilterRegistry:
         # Get required flag (defaults to False)
         required = self.config.get_boolean(("filter", name), "required", False)
 
-        if clean_cmd or smudge_cmd:
+        if process_cmd or clean_cmd or smudge_cmd:
             # Get repository working directory (only for Repo, not BaseRepo)
             from .repo import Repo
 
             repo_path = (
                 self.repo.path if self.repo and isinstance(self.repo, Repo) else None
             )
-            return ProcessFilterDriver(clean_cmd, smudge_cmd, required, repo_path)
+            return ProcessFilterDriver(
+                clean_cmd, smudge_cmd, required, repo_path, process_cmd
+            )
 
         return None
 
@@ -321,18 +725,30 @@ class FilterRegistry:
 def get_filter_for_path(
     path: bytes,
     gitattributes: "GitAttributes",
-    filter_registry: FilterRegistry,
+    filter_registry: Optional[FilterRegistry] = None,
+    filter_context: Optional[FilterContext] = None,
 ) -> Optional[FilterDriver]:
     """Get the appropriate filter driver for a given path.
 
     Args:
         path: Path to check
         gitattributes: GitAttributes object with parsed patterns
-        filter_registry: Registry of filter drivers
+        filter_registry: Registry of filter drivers (deprecated, use filter_context)
+        filter_context: Context for managing filter state
 
     Returns:
         FilterDriver instance or None
     """
+    # Use filter_context if provided, otherwise fall back to registry
+    if filter_context is not None:
+        registry = filter_context.filter_registry
+        get_driver = filter_context.get_driver
+    elif filter_registry is not None:
+        registry = filter_registry
+        get_driver = filter_registry.get_driver
+    else:
+        raise ValueError("Either filter_registry or filter_context must be provided")
+
     # Get all attributes for this path
     attributes = gitattributes.match_path(path)
 
@@ -343,11 +759,11 @@ def get_filter_for_path(
             return None
         if isinstance(filter_name, bytes):
             filter_name_str = filter_name.decode("utf-8")
-            driver = filter_registry.get_driver(filter_name_str)
+            driver = get_driver(filter_name_str)
 
             # Check if filter is required but missing
-            if driver is None and filter_registry.config is not None:
-                required = filter_registry.config.get_boolean(
+            if driver is None and registry.config is not None:
+                required = registry.config.get_boolean(
                     ("filter", filter_name_str), "required", False
                 )
                 if required:
@@ -362,16 +778,16 @@ def get_filter_for_path(
     text_attr = attributes.get(b"text")
     if text_attr is True:
         # Use the text filter for line ending conversion
-        return filter_registry.get_driver("text")
+        return get_driver("text")
     elif text_attr is False:
         # -text means binary, no conversion
         return None
 
     # If no explicit text attribute, check if autocrlf is enabled
     # When autocrlf is true/input, files are treated as text by default
-    if filter_registry.config is not None:
+    if registry.config is not None:
         try:
-            autocrlf_raw = filter_registry.config.get("core", "autocrlf")
+            autocrlf_raw = registry.config.get("core", "autocrlf")
             autocrlf: bytes = (
                 autocrlf_raw.lower()
                 if isinstance(autocrlf_raw, bytes)
@@ -379,7 +795,7 @@ def get_filter_for_path(
             )
             if autocrlf in (b"true", b"input"):
                 # Use text filter for files without explicit attributes
-                return filter_registry.get_driver("text")
+                return get_driver("text")
         except KeyError:
             pass
 
@@ -398,24 +814,47 @@ class FilterBlobNormalizer:
         gitattributes: GitAttributes,
         filter_registry: Optional[FilterRegistry] = None,
         repo: Optional["BaseRepo"] = None,
+        filter_context: Optional[FilterContext] = None,
     ) -> None:
         """Initialize FilterBlobNormalizer.
 
         Args:
           config_stack: Git configuration stack
           gitattributes: GitAttributes instance
-          filter_registry: Optional filter registry to use
+          filter_registry: Optional filter registry to use (deprecated, use filter_context)
           repo: Optional repository instance
+          filter_context: Optional filter context to use for managing filter state
         """
         self.config_stack = config_stack
         self.gitattributes = gitattributes
-        self.filter_registry = filter_registry or FilterRegistry(config_stack, repo)
+        self._owns_context = False  # Track if we created our own context
+
+        # Support both old and new API
+        if filter_context is not None:
+            self.filter_context = filter_context
+            self.filter_registry = filter_context.filter_registry
+            self._owns_context = False  # We're using an external context
+        else:
+            if filter_registry is not None:
+                import warnings
+
+                warnings.warn(
+                    "Passing filter_registry to FilterBlobNormalizer is deprecated. "
+                    "Pass a FilterContext instead.",
+                    DeprecationWarning,
+                    stacklevel=2,
+                )
+                self.filter_registry = filter_registry
+            else:
+                self.filter_registry = FilterRegistry(config_stack, repo)
+            self.filter_context = FilterContext(self.filter_registry)
+            self._owns_context = True  # We created our own context
 
     def checkin_normalize(self, blob: Blob, path: bytes) -> Blob:
         """Apply clean filter during checkin (working tree -> repository)."""
         # Get filter for this path
         filter_driver = get_filter_for_path(
-            path, self.gitattributes, self.filter_registry
+            path, self.gitattributes, filter_context=self.filter_context
         )
         if filter_driver is None:
             return blob
@@ -434,7 +873,7 @@ class FilterBlobNormalizer:
         """Apply smudge filter during checkout (repository -> working tree)."""
         # Get filter for this path
         filter_driver = get_filter_for_path(
-            path, self.gitattributes, self.filter_registry
+            path, self.gitattributes, filter_context=self.filter_context
         )
         if filter_driver is None:
             return blob
@@ -448,3 +887,17 @@ class FilterBlobNormalizer:
         new_blob = Blob()
         new_blob.data = filtered_data
         return new_blob
+
+    def close(self) -> None:
+        """Close all filter drivers, ensuring process cleanup."""
+        # Only close the filter context if we created it ourselves
+        if self._owns_context:
+            self.filter_context.close()
+
+    def __del__(self) -> None:
+        """Clean up filter drivers on destruction."""
+        try:
+            self.close()
+        except Exception:
+            # Don't raise exceptions in __del__
+            pass

+ 9 - 0
dulwich/lfs.py

@@ -324,6 +324,15 @@ class LFSFilterDriver:
 
         return content
 
+    def cleanup(self) -> None:
+        """Clean up any resources held by this filter driver."""
+        # LFSFilterDriver doesn't hold any resources that need cleanup
+
+    def reuse(self, config, filter_name: str) -> bool:
+        """Check if this filter driver should be reused with the given configuration."""
+        # LFSFilterDriver is stateless and lightweight, no need to cache
+        return False
+
 
 def _get_lfs_user_agent(config: Optional["Config"]) -> str:
     """Get User-Agent string for LFS requests, respecting git config."""

+ 10 - 0
dulwich/line_ending.py

@@ -190,6 +190,16 @@ class LineEndingFilter(FilterDriver):
 
         return self.smudge_conversion(data)
 
+    def cleanup(self) -> None:
+        """Clean up any resources held by this filter driver."""
+        # LineEndingFilter doesn't hold any resources that need cleanup
+
+    def reuse(self, config, filter_name: str) -> bool:
+        """Check if this filter driver should be reused with the given configuration."""
+        # LineEndingFilter is lightweight and should always be recreated
+        # to ensure it uses the latest configuration
+        return False
+
 
 def convert_crlf_to_lf(text_hunk: bytes) -> bytes:
     """Convert CRLF in text hunk into LF.

+ 43 - 14
dulwich/repo.py

@@ -1301,6 +1301,9 @@ class Repo(BaseRepo):
         self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
         self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
 
+        # Initialize filter context as None, will be created lazily
+        self.filter_context = None
+
     def get_worktree(self) -> "WorkTree":
         """Get the working tree for this repository.
 
@@ -1969,6 +1972,10 @@ class Repo(BaseRepo):
     def close(self) -> None:
         """Close any files opened by this repository."""
         self.object_store.close()
+        # Clean up filter context if it was created
+        if self.filter_context is not None:
+            self.filter_context.close()
+            self.filter_context = None
 
     def __enter__(self):
         """Enter context manager."""
@@ -2019,17 +2026,24 @@ class Repo(BaseRepo):
 
     def get_blob_normalizer(self):
         """Return a BlobNormalizer object."""
-        from .filters import FilterBlobNormalizer, FilterRegistry
+        from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
 
-        # Get proper GitAttributes object
-        git_attributes = self.get_gitattributes()
+        # Get fresh configuration and GitAttributes
         config_stack = self.get_config_stack()
+        git_attributes = self.get_gitattributes()
 
-        # Create FilterRegistry with repo reference
-        filter_registry = FilterRegistry(config_stack, self)
+        # Lazily create FilterContext if needed
+        if self.filter_context is None:
+            filter_registry = FilterRegistry(config_stack, self)
+            self.filter_context = FilterContext(filter_registry)
+        else:
+            # Refresh the context with current config to handle config changes
+            self.filter_context.refresh_config(config_stack)
 
-        # Return FilterBlobNormalizer which handles all filters including line endings
-        return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
+        # Return a new FilterBlobNormalizer with the context
+        return FilterBlobNormalizer(
+            config_stack, git_attributes, filter_context=self.filter_context
+        )
 
     def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
         """Read gitattributes for the repository.
@@ -2162,6 +2176,7 @@ class MemoryRepo(BaseRepo):
         self.bare = True
         self._config = ConfigFile()
         self._description = None
+        self.filter_context = None
 
     def _append_reflog(self, *args) -> None:
         self._reflog.append(args)
@@ -2254,17 +2269,24 @@ class MemoryRepo(BaseRepo):
 
     def get_blob_normalizer(self):
         """Return a BlobNormalizer object for checkin/checkout operations."""
-        from .filters import FilterBlobNormalizer, FilterRegistry
+        from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
 
-        # Get GitAttributes object
-        git_attributes = self.get_gitattributes()
+        # Get fresh configuration and GitAttributes
         config_stack = self.get_config_stack()
+        git_attributes = self.get_gitattributes()
 
-        # Create FilterRegistry with repo reference
-        filter_registry = FilterRegistry(config_stack, self)
+        # Lazily create FilterContext if needed
+        if self.filter_context is None:
+            filter_registry = FilterRegistry(config_stack, self)
+            self.filter_context = FilterContext(filter_registry)
+        else:
+            # Refresh the context with current config to handle config changes
+            self.filter_context.refresh_config(config_stack)
 
-        # Return FilterBlobNormalizer which handles all filters
-        return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
+        # Return a new FilterBlobNormalizer with the context
+        return FilterBlobNormalizer(
+            config_stack, git_attributes, filter_context=self.filter_context
+        )
 
     def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
         """Read gitattributes for the repository."""
@@ -2274,6 +2296,13 @@ class MemoryRepo(BaseRepo):
         # Return empty GitAttributes
         return GitAttributes([])
 
+    def close(self) -> None:
+        """Close any resources opened by this repository."""
+        # Clean up filter context if it was created
+        if self.filter_context is not None:
+            self.filter_context.close()
+            self.filter_context = None
+
     def do_commit(
         self,
         message: Optional[bytes] = None,

+ 7 - 1
dulwich/sparse_patterns.py

@@ -164,7 +164,13 @@ def apply_included_paths(
             blob_obj = repo.object_store[index_entry.sha]
         except KeyError:
             return True
-        norm_data = normalizer.checkin_normalize(disk_data, full_path)
+        # Create a temporary blob for normalization
+        temp_blob = Blob()
+        temp_blob.data = disk_data
+        norm_blob = normalizer.checkin_normalize(
+            temp_blob, os.path.relpath(full_path, repo.path).encode()
+        )
+        norm_data = norm_blob.data
         if not isinstance(blob_obj, Blob):
             return True
         return bool(norm_data != blob_obj.data)

+ 787 - 1
tests/test_filters.py

@@ -23,10 +23,16 @@
 
 import os
 import tempfile
+import threading
 import unittest
 
 from dulwich import porcelain
-from dulwich.filters import FilterError
+from dulwich.filters import (
+    FilterContext,
+    FilterError,
+    FilterRegistry,
+    ProcessFilterDriver,
+)
 from dulwich.repo import Repo
 
 from . import TestCase
@@ -317,3 +323,783 @@ class GitAttributesFilterIntegrationTests(TestCase):
         entry = index[b"test.txt"]
         blob = self.repo.object_store[entry.sha]
         self.assertEqual(blob.data, b"test content\n")
+
+
+class ProcessFilterDriverTests(TestCase):
+    """Tests for ProcessFilterDriver with real process filter."""
+
+    def setUp(self):
+        super().setUp()
+        # Create a temporary test filter process dynamically
+        self.test_filter_path = self._create_test_filter()
+
+    def tearDown(self):
+        # Clean up the test filter
+        if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
+            os.unlink(self.test_filter_path)
+        super().tearDown()
+
+    def _create_test_filter(self):
+        """Create a simple test filter process that works on all platforms."""
+        import tempfile
+
+        # Create filter script that uppercases on clean, lowercases on smudge
+        filter_script = """import sys
+import os
+
+# Simple filter that doesn't use any external dependencies
+def read_exact(n):
+    data = b""
+    while len(data) < n:
+        chunk = sys.stdin.buffer.read(n - len(data))
+        if not chunk:
+            break
+        data += chunk
+    return data
+
+def write_pkt(data):
+    if data is None:
+        sys.stdout.buffer.write(b"0000")
+    else:
+        length = len(data) + 4
+        sys.stdout.buffer.write(("{:04x}".format(length)).encode())
+        sys.stdout.buffer.write(data)
+    sys.stdout.buffer.flush()
+
+def read_pkt():
+    size_bytes = read_exact(4)
+    if not size_bytes:
+        return None
+    size = int(size_bytes.decode(), 16)
+    if size == 0:
+        return None
+    return read_exact(size - 4)
+
+# Handshake
+client_hello = read_pkt()
+version = read_pkt()
+flush = read_pkt()
+
+write_pkt(b"git-filter-server")
+write_pkt(b"version=2")
+write_pkt(None)
+
+# Read and echo capabilities
+caps = []
+while True:
+    cap = read_pkt()
+    if cap is None:
+        break
+    caps.append(cap)
+
+for cap in caps:
+    write_pkt(cap)
+write_pkt(None)
+
+# Process commands
+while True:
+    headers = {}
+    while True:
+        line = read_pkt()
+        if line is None:
+            break
+        if b"=" in line:
+            k, v = line.split(b"=", 1)
+            headers[k.decode()] = v.decode()
+    
+    if not headers:
+        break
+    
+    # Read data
+    data_chunks = []
+    while True:
+        chunk = read_pkt()
+        if chunk is None:
+            break
+        data_chunks.append(chunk)
+    
+    data = b"".join(data_chunks)
+    
+    # Process (uppercase for clean, lowercase for smudge)
+    if headers.get("command") == "clean":
+        result = data.upper()
+    elif headers.get("command") == "smudge":
+        result = data.lower()
+    else:
+        result = data
+    
+    # Send response
+    write_pkt(b"status=success")
+    write_pkt(None)
+    
+    # Send result
+    chunk_size = 65516
+    for i in range(0, len(result), chunk_size):
+        write_pkt(result[i:i+chunk_size])
+    write_pkt(None)
+"""
+
+        # Create temporary file
+        fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_")
+        try:
+            os.write(fd, filter_script.encode())
+            os.close(fd)
+
+            # Make executable on Unix-like systems
+            if os.name != "nt":  # Not Windows
+                os.chmod(path, 0o755)
+
+            return path
+        except:
+            if os.path.exists(path):
+                os.unlink(path)
+            raise
+
+    def test_process_filter_clean_operation(self):
+        """Test clean operation using real process filter."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
+        )
+
+        test_data = b"hello world"
+        result = driver.clean(test_data)
+
+        # Our test filter uppercases on clean
+        self.assertEqual(result, b"HELLO WORLD")
+
+    def test_process_filter_smudge_operation(self):
+        """Test smudge operation using real process filter."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
+        )
+
+        test_data = b"HELLO WORLD"
+        result = driver.smudge(test_data, b"test.txt")
+
+        # Our test filter lowercases on smudge
+        self.assertEqual(result, b"hello world")
+
+    def test_process_filter_large_data(self):
+        """Test process filter with data larger than single pkt-line."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
+        )
+
+        # Create data larger than max pkt-line payload (65516 bytes)
+        test_data = b"a" * 70000
+        result = driver.clean(test_data)
+
+        # Should be uppercased
+        self.assertEqual(result, b"A" * 70000)
+
+    def test_fallback_to_individual_commands(self):
+        """Test fallback when process filter fails."""
+        driver = ProcessFilterDriver(
+            clean_cmd="tr '[:lower:]' '[:upper:]'",  # Shell command to uppercase
+            process_cmd="/nonexistent/command",  # This should fail
+            required=False,
+        )
+
+        test_data = b"hello world\n"
+        result = driver.clean(test_data)
+
+        # Should fallback to tr command and uppercase
+        self.assertEqual(result, b"HELLO WORLD\n")
+
+    def test_process_reuse(self):
+        """Test that process is reused across multiple operations."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
+        )
+
+        # First operation
+        result1 = driver.clean(b"test1")
+        self.assertEqual(result1, b"TEST1")
+
+        # Second operation should reuse the same process
+        result2 = driver.clean(b"test2")
+        self.assertEqual(result2, b"TEST2")
+
+        # Process should still be alive
+        self.assertIsNotNone(driver._process)
+        self.assertIsNone(driver._process.poll())  # None means still running
+
+    def test_error_handling_invalid_command(self):
+        """Test error handling with invalid filter command."""
+        driver = ProcessFilterDriver(process_cmd="/nonexistent/command", required=True)
+
+        with self.assertRaises(FilterError) as cm:
+            driver.clean(b"test data")
+
+        self.assertIn("Failed to start process filter", str(cm.exception))
+
+
+class FilterContextTests(TestCase):
+    """Tests for FilterContext class."""
+
+    def test_filter_context_caches_long_running_drivers(self):
+        """Test that FilterContext caches only long-running drivers."""
+
+        # Create real filter drivers
+        class UppercaseFilter:
+            def clean(self, data):
+                return data.upper()
+
+            def smudge(self, data, path=b""):
+                return data.lower()
+
+            def cleanup(self):
+                pass
+
+            def reuse(self, config, filter_name):
+                # Pretend it's a long-running filter that should be cached
+                return True
+
+        class IdentityFilter:
+            def clean(self, data):
+                return data
+
+            def smudge(self, data, path=b""):
+                return data
+
+            def cleanup(self):
+                pass
+
+            def reuse(self, config, filter_name):
+                # Lightweight filter, don't cache
+                return False
+
+        # Create registry and context
+        registry = FilterRegistry()
+        context = FilterContext(registry)
+
+        # Register drivers
+        long_running = UppercaseFilter()
+        stateless = IdentityFilter()
+        registry.register_driver("uppercase", long_running)
+        registry.register_driver("identity", stateless)
+
+        # Get drivers through context
+        driver1 = context.get_driver("uppercase")
+        driver2 = context.get_driver("uppercase")
+
+        # Long-running driver should be cached
+        self.assertIs(driver1, driver2)
+        self.assertIs(driver1, long_running)
+
+        # Get stateless driver
+        stateless1 = context.get_driver("identity")
+        stateless2 = context.get_driver("identity")
+
+        # Stateless driver comes from registry but isn't cached in context
+        self.assertIs(stateless1, stateless)
+        self.assertIs(stateless2, stateless)
+        self.assertNotIn("identity", context._active_drivers)
+        self.assertIn("uppercase", context._active_drivers)
+
+    def test_filter_context_cleanup(self):
+        """Test that FilterContext properly cleans up resources."""
+        cleanup_called = []
+
+        class TrackableFilter:
+            def __init__(self, name):
+                self.name = name
+
+            def clean(self, data):
+                return data
+
+            def smudge(self, data, path=b""):
+                return data
+
+            def cleanup(self):
+                cleanup_called.append(self.name)
+
+            def is_long_running(self):
+                return True
+
+        # Create registry and context
+        registry = FilterRegistry()
+        context = FilterContext(registry)
+
+        # Register and use drivers
+        filter1 = TrackableFilter("filter1")
+        filter2 = TrackableFilter("filter2")
+        filter3 = TrackableFilter("filter3")
+        registry.register_driver("filter1", filter1)
+        registry.register_driver("filter2", filter2)
+        registry.register_driver("filter3", filter3)
+
+        # Get only some drivers to cache them
+        context.get_driver("filter1")
+        context.get_driver("filter2")
+        # Don't get filter3
+
+        # Close context
+        context.close()
+
+        # Verify cleanup was called for all drivers (context closes registry too)
+        self.assertEqual(set(cleanup_called), {"filter1", "filter2", "filter3"})
+
+    def test_filter_context_get_driver_returns_none_for_missing(self):
+        """Test that get_driver returns None for non-existent drivers."""
+        registry = FilterRegistry()
+        context = FilterContext(registry)
+
+        result = context.get_driver("nonexistent")
+        self.assertIsNone(result)
+
+    def test_filter_context_with_real_process_filter(self):
+        """Test FilterContext with real ProcessFilterDriver instances."""
+        import sys
+
+        # Use existing test filter from ProcessFilterDriverTests
+        test_dir = tempfile.mkdtemp()
+        self.addCleanup(lambda: __import__("shutil").rmtree(test_dir))
+
+        # Create a simple test filter that just passes data through
+        filter_script = """import sys
+while True:
+    line = sys.stdin.buffer.read()
+    if not line:
+        break
+    sys.stdout.buffer.write(line)
+    sys.stdout.buffer.flush()
+"""
+        filter_path = os.path.join(test_dir, "simple_filter.py")
+        with open(filter_path, "w") as f:
+            f.write(filter_script)
+
+        # Create ProcessFilterDriver instances
+        # One with process_cmd (long-running)
+        process_driver = ProcessFilterDriver(
+            process_cmd=None,  # Don't use actual process to avoid complexity
+            clean_cmd=f"{sys.executable} {filter_path}",
+            smudge_cmd=f"{sys.executable} {filter_path}",
+        )
+
+        # Register in context
+        registry = FilterRegistry()
+        context = FilterContext(registry)
+        registry.register_driver("process", process_driver)
+
+        # Get driver - should not be cached since it's not long-running
+        driver1 = context.get_driver("process")
+        self.assertIsNotNone(driver1)
+        self.assertFalse(driver1.is_long_running())
+        self.assertNotIn("process", context._active_drivers)
+
+        # Test with a long-running driver (has process_cmd)
+        long_process_driver = ProcessFilterDriver()
+        long_process_driver.process_cmd = "dummy"  # Just to make it long-running
+        registry.register_driver("long_process", long_process_driver)
+
+        driver2 = context.get_driver("long_process")
+        self.assertTrue(driver2.is_long_running())
+        self.assertIn("long_process", context._active_drivers)
+
+        context.close()
+
+    def test_filter_context_closes_registry(self):
+        """Test that closing FilterContext also closes the registry."""
+        # Track if registry.close() is called
+        registry_closed = []
+
+        class TrackingRegistry(FilterRegistry):
+            def close(self):
+                registry_closed.append(True)
+                super().close()
+
+        registry = TrackingRegistry()
+        context = FilterContext(registry)
+
+        # Close context should also close registry
+        context.close()
+        self.assertTrue(registry_closed)
+
+
+class ProcessFilterProtocolTests(TestCase):
+    """Tests for ProcessFilterDriver protocol compliance."""
+
+    def setUp(self):
+        super().setUp()
+        # Create a spec-compliant test filter process dynamically
+        self.test_filter_path = self._create_spec_compliant_filter()
+
+    def tearDown(self):
+        # Clean up the test filter
+        if hasattr(self, "test_filter_path") and os.path.exists(self.test_filter_path):
+            os.unlink(self.test_filter_path)
+        super().tearDown()
+
+    def _create_spec_compliant_filter(self):
+        """Create a spec-compliant test filter that works on all platforms."""
+        import tempfile
+
+        # This filter strictly follows Git spec - no newlines in packets
+        filter_script = """import sys
+
+def read_exact(n):
+    data = b""
+    while len(data) < n:
+        chunk = sys.stdin.buffer.read(n - len(data))
+        if not chunk:
+            break
+        data += chunk
+    return data
+
+def write_pkt(data):
+    if data is None:
+        sys.stdout.buffer.write(b"0000")
+    else:
+        length = len(data) + 4
+        sys.stdout.buffer.write(("{:04x}".format(length)).encode())
+        sys.stdout.buffer.write(data)
+    sys.stdout.buffer.flush()
+
+def read_pkt():
+    size_bytes = read_exact(4)
+    if not size_bytes:
+        return None
+    size = int(size_bytes.decode(), 16)
+    if size == 0:
+        return None
+    return read_exact(size - 4)
+
+# Handshake - exact format, no newlines
+client_hello = read_pkt()
+version = read_pkt()
+flush = read_pkt()
+
+if client_hello != b"git-filter-client":
+    sys.exit(1)
+if version != b"version=2":
+    sys.exit(1)
+
+write_pkt(b"git-filter-server")  # No newline
+write_pkt(b"version=2")  # No newline
+write_pkt(None)
+
+# Read and echo capabilities
+caps = []
+while True:
+    cap = read_pkt()
+    if cap is None:
+        break
+    caps.append(cap)
+
+for cap in caps:
+    if cap in [b"capability=clean", b"capability=smudge"]:
+        write_pkt(cap)
+write_pkt(None)
+
+# Process commands
+while True:
+    headers = {}
+    while True:
+        line = read_pkt()
+        if line is None:
+            break
+        if b"=" in line:
+            k, v = line.split(b"=", 1)
+            headers[k.decode()] = v.decode()
+    
+    if not headers:
+        break
+    
+    # Read data
+    data_chunks = []
+    while True:
+        chunk = read_pkt()
+        if chunk is None:
+            break
+        data_chunks.append(chunk)
+    
+    data = b"".join(data_chunks)
+    
+    # Process
+    if headers.get("command") == "clean":
+        result = data.upper()
+    elif headers.get("command") == "smudge":
+        result = data.lower()
+    else:
+        result = data
+    
+    # Send response
+    write_pkt(b"status=success")
+    write_pkt(None)
+    
+    # Send result
+    chunk_size = 65516
+    for i in range(0, len(result), chunk_size):
+        write_pkt(result[i:i+chunk_size])
+    write_pkt(None)
+"""
+
+        fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
+        try:
+            os.write(fd, filter_script.encode())
+            os.close(fd)
+
+            if os.name != "nt":  # Not Windows
+                os.chmod(path, 0o755)
+
+            return path
+        except:
+            if os.path.exists(path):
+                os.unlink(path)
+            raise
+
+    def test_protocol_handshake_exact_format(self):
+        """Test that handshake uses exact format without newlines."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}",
+            required=True,  # Require success to test protocol compliance
+        )
+
+        # This should work with exact protocol format
+        test_data = b"hello world"
+        result = driver.clean(test_data)
+
+        # Our test filter uppercases on clean
+        self.assertEqual(result, b"HELLO WORLD")
+
+    def test_capability_negotiation_exact_format(self):
+        """Test that capabilities are sent and received in exact format."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
+        )
+
+        # Force capability negotiation by using both clean and smudge
+        clean_result = driver.clean(b"test")
+        smudge_result = driver.smudge(b"TEST", b"test.txt")
+
+        self.assertEqual(clean_result, b"TEST")
+        self.assertEqual(smudge_result, b"test")
+
+    def test_binary_data_handling(self):
+        """Test handling of binary data through the protocol."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
+        )
+
+        # Binary data with null bytes, high bytes, etc.
+        binary_data = bytes(range(256))
+
+        try:
+            result = driver.clean(binary_data)
+            # Should handle binary data without crashing
+            self.assertIsInstance(result, bytes)
+            # Our test filter uppercases, which may not work for all binary data
+            # but should not crash
+        except UnicodeDecodeError:
+            # This might happen with binary data - acceptable
+            pass
+
+    def test_large_file_chunking(self):
+        """Test proper chunking of large files."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
+        )
+
+        # Create data larger than max pkt-line payload (65516 bytes)
+        large_data = b"a" * 100000
+        result = driver.clean(large_data)
+
+        # Should be properly processed (uppercased)
+        expected = b"A" * 100000
+        self.assertEqual(result, expected)
+
+    def test_empty_file_handling(self):
+        """Test handling of empty files."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
+        )
+
+        result = driver.clean(b"")
+        self.assertEqual(result, b"")
+
+    def test_special_characters_in_pathname(self):
+        """Test paths with special characters are handled correctly."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
+        )
+
+        # Test various special characters in paths
+        special_paths = [
+            b"file with spaces.txt",
+            b"path/with/slashes.txt",
+            b"file=with=equals.txt",
+            b"file\nwith\nnewlines.txt",
+        ]
+
+        test_data = b"test data"
+
+        for path in special_paths:
+            result = driver.smudge(test_data, path)
+            self.assertEqual(result, b"test data")
+
+    def test_process_crash_recovery(self):
+        """Test that process is properly restarted after crash."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
+        )
+
+        # First operation
+        result = driver.clean(b"test1")
+        self.assertEqual(result, b"TEST1")
+
+        # Kill the process
+        if driver._process:
+            driver._process.kill()
+            driver._process.wait()
+        driver.cleanup()
+
+        # Should restart and work again
+        result = driver.clean(b"test2")
+        self.assertEqual(result, b"TEST2")
+
+    def test_malformed_process_response_handling(self):
+        """Test handling of malformed responses from process."""
+        # Create a filter that sends malformed responses
+        malformed_filter = """#!/usr/bin/env python3
+import sys
+import os
+sys.path.insert(0, os.path.dirname(__file__))
+from dulwich.protocol import Protocol
+
+protocol = Protocol(
+    lambda n: sys.stdin.buffer.read(n),
+    lambda d: sys.stdout.buffer.write(d) or len(d)
+)
+
+# Read handshake
+protocol.read_pkt_line()
+protocol.read_pkt_line()
+protocol.read_pkt_line()
+
+# Send invalid handshake
+protocol.write_pkt_line(b"invalid-welcome")
+protocol.write_pkt_line(b"version=2")
+protocol.write_pkt_line(None)
+"""
+
+        import tempfile
+
+        fd, script_path = tempfile.mkstemp(suffix=".py")
+        try:
+            os.write(fd, malformed_filter.encode())
+            os.close(fd)
+            os.chmod(script_path, 0o755)
+
+            driver = ProcessFilterDriver(
+                process_cmd=f"python3 {script_path}",
+                clean_cmd="cat",  # Fallback
+                required=False,
+            )
+
+            # Should fallback to clean_cmd when process fails
+            result = driver.clean(b"test data")
+            self.assertEqual(result, b"test data")
+
+        finally:
+            os.unlink(script_path)
+
+    def test_concurrent_filter_operations(self):
+        """Test that concurrent operations work correctly."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
+        )
+
+        results = []
+        errors = []
+
+        def worker(data):
+            try:
+                result = driver.clean(data)
+                results.append(result)
+            except Exception as e:
+                errors.append(e)
+
+        # Start 5 concurrent operations
+        threads = []
+        test_data = [f"test{i}".encode() for i in range(5)]
+
+        for data in test_data:
+            t = threading.Thread(target=worker, args=(data,))
+            threads.append(t)
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        # Should have no errors
+        self.assertEqual(len(errors), 0, f"Errors: {errors}")
+        self.assertEqual(len(results), 5)
+
+        # All results should be uppercase versions
+        expected = [data.upper() for data in test_data]
+        self.assertEqual(sorted(results), sorted(expected))
+
+    def test_process_resource_cleanup(self):
+        """Test that process resources are properly cleaned up."""
+        import sys
+
+        driver = ProcessFilterDriver(
+            process_cmd=f"{sys.executable} {self.test_filter_path}", required=False
+        )
+
+        # Use the driver
+        result = driver.clean(b"test")
+        self.assertEqual(result, b"TEST")
+
+        # Process should be running
+        self.assertIsNotNone(driver._process)
+        self.assertIsNone(driver._process.poll())  # None means still running
+
+        # Remember the old process to check it was terminated
+        old_process = driver._process
+
+        # Manually clean up (simulates __del__)
+        driver.cleanup()
+
+        # Process reference should be cleared
+        self.assertIsNone(driver._process)
+        self.assertIsNone(driver._protocol)
+
+        # Old process should be terminated
+        self.assertIsNotNone(old_process.poll())  # Not None means terminated
+
+    def test_required_filter_error_propagation(self):
+        """Test that errors are properly propagated when filter is required."""
+        driver = ProcessFilterDriver(
+            process_cmd="/definitely/nonexistent/command", required=True
+        )
+
+        with self.assertRaises(FilterError) as cm:
+            driver.clean(b"test data")
+
+        self.assertIn("Failed to start process filter", str(cm.exception))

+ 6 - 0
tests/test_line_ending.py

@@ -519,6 +519,12 @@ class LineEndingIntegrationTests(TestCase):
             def smudge(self, data):
                 return b"LFS content"
 
+            def cleanup(self):
+                pass
+
+            def reuse(self, config, filter_name):
+                return False
+
         self.registry.register_driver("lfs", MockLFSFilter())
 
         # Different files use different filters

+ 20 - 17
tests/test_sparse_patterns.py

@@ -27,7 +27,6 @@ import shutil
 import tempfile
 import time
 
-from dulwich.filters import FilterBlobNormalizer, FilterRegistry
 from dulwich.index import IndexEntry
 from dulwich.objects import Blob
 from dulwich.repo import Repo
@@ -553,23 +552,29 @@ class ApplyIncludedPathsTests(TestCase):
             def clean(self, input_bytes):
                 return input_bytes.lower()
 
-        # Set up filter registry and normalizer
-        filter_registry = FilterRegistry()
-        filter_registry.register_driver("uppercase", UppercaseFilter())
+            def cleanup(self):
+                pass
 
-        # Create gitattributes object
-        from dulwich.attrs import GitAttributes, Pattern
+            def reuse(self, config, filter_name):
+                return False
 
-        patterns = [(Pattern(b"*.txt"), {b"filter": b"uppercase"})]
-        gitattributes = GitAttributes(patterns)
+        # Create .gitattributes file
+        gitattributes_path = os.path.join(self.temp_dir, ".gitattributes")
+        with open(gitattributes_path, "w") as f:
+            f.write("*.txt filter=uppercase\n")
 
-        # Monkey patch the repo to use our filter registry
-        original_get_blob_normalizer = self.repo.get_blob_normalizer
+        # Add and commit .gitattributes
+        self.repo.get_worktree().stage([b".gitattributes"])
+        self.repo.do_commit(b"Add gitattributes", committer=b"Test <test@example.com>")
 
-        def get_blob_normalizer_with_filters():
-            return FilterBlobNormalizer(None, gitattributes, filter_registry)
+        # Initialize the filter context and register the filter
+        _ = self.repo.get_blob_normalizer()
 
-        self.repo.get_blob_normalizer = get_blob_normalizer_with_filters
+        # Register the filter with the cached filter context
+        uppercase_filter = UppercaseFilter()
+        self.repo.filter_context.filter_registry.register_driver(
+            "uppercase", uppercase_filter
+        )
 
         # Commit a file with lowercase content
         self._commit_blob("test.txt", b"hello world")
@@ -577,7 +582,8 @@ class ApplyIncludedPathsTests(TestCase):
         # Remove the file from working tree to force materialization
         os.remove(os.path.join(self.temp_dir, "test.txt"))
 
-        # Apply sparse checkout
+        # Apply sparse checkout - this will call get_blob_normalizer() internally
+        # which will use the cached filter_context with our registered filter
         apply_included_paths(self.repo, included_paths={"test.txt"}, force=False)
 
         # Verify file was materialized with uppercase content (checkout normalization applied)
@@ -585,9 +591,6 @@ class ApplyIncludedPathsTests(TestCase):
             content = f.read()
             self.assertEqual(content, b"HELLO WORLD")
 
-        # Restore original method
-        self.repo.get_blob_normalizer = original_get_blob_normalizer
-
     def test_checkout_normalization_with_lf_to_crlf(self):
         """Test that line ending normalization is applied during sparse checkout."""
         # Commit a file with LF line endings