Sfoglia il codice sorgente

Fix git-lfs smudge by quoting special characters path

Petr Chmelar 4 mesi fa
parent
commit
035b7afe90
3 ha cambiato i file con 74 aggiunte e 22 eliminazioni
  1. 3 0
      NEWS
  2. 13 3
      dulwich/filters.py
  3. 58 19
      tests/test_filters.py

+ 3 - 0
NEWS

@@ -324,6 +324,9 @@ compatible.
    parallel stat operations when checking for unstaged changes. This improves
    performance on slow filesystems like NFS. (Jelmer Vernooij, #1851)
 
+ * Fix smudge filter subprocess fallback for special characters in path.
+   (Petr Chmelar, #1878)
+
 0.24.1	2025-08-01
 
  * Require ``typing_extensions`` on Python 3.10.

+ 13 - 3
dulwich/filters.py

@@ -33,6 +33,7 @@ __all__ = [
 ]
 
 import logging
+import shlex
 import subprocess
 import threading
 from collections.abc import Callable
@@ -352,6 +353,8 @@ class ProcessFilterDriver:
 
     def smudge(self, data: bytes, path: bytes = b"") -> bytes:
         """Apply smudge filter using external process."""
+        import os
+
         path_str = path.decode("utf-8", errors="replace")
 
         # Try process filter first (much faster)
@@ -369,8 +372,13 @@ class ProcessFilterDriver:
                 raise FilterError("Smudge command is required but not configured")
             return data
 
-        # Substitute %f placeholder with file path
-        cmd = self.smudge_cmd.replace("%f", path_str)
+        # quote path according to platform and Substitute %f placeholder with file path
+        quoted_path = (
+            subprocess.list2cmdline([path_str])
+            if os.name == "nt"
+            else shlex.quote(path_str)
+        )
+        cmd = self.smudge_cmd.replace("%f", quoted_path)
 
         try:
             result = subprocess.run(
@@ -384,7 +392,9 @@ class ProcessFilterDriver:
             return result.stdout
         except subprocess.CalledProcessError as e:
             if self.required:
-                raise FilterError(f"Required smudge filter failed: {e}")
+                raise FilterError(
+                    f"Required smudge filter failed: {e} {e.stderr} {e.stdout}"
+                )
             # If not required, log warning and return original data on failure
             logging.warning("Optional smudge filter failed: %s", e)
             return data

+ 58 - 19
tests/test_filters.py

@@ -24,6 +24,8 @@
 import os
 import tempfile
 import threading
+from collections.abc import Iterator
+from contextlib import contextmanager
 
 from dulwich import porcelain
 from dulwich.filters import (
@@ -100,7 +102,8 @@ class GitAttributesFilterIntegrationTests(TestCase):
 
         filter_script = os.path.join(self.test_dir, "redact_filter.py")
         with open(filter_script, "w") as f:
-            f.write("""#!/usr/bin/env python3
+            f.write(
+                """#!/usr/bin/env python3
 import sys
 data = sys.stdin.buffer.read()
 # Replace all digits with X
@@ -111,7 +114,8 @@ for b in data:
     else:
         result.append(b)
 sys.stdout.buffer.write(result)
-""")
+"""
+            )
         os.chmod(filter_script, 0o755)
 
         # Create .gitattributes with custom filter
@@ -187,13 +191,15 @@ sys.stdout.buffer.write(result)
 
         filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
         with open(filter_script, "w") as f:
-            f.write("""#!/usr/bin/env python3
+            f.write(
+                """#!/usr/bin/env python3
 import sys
 data = sys.stdin.buffer.read()
 # Convert bytes to string, uppercase, then back to bytes
 result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
 sys.stdout.buffer.write(result)
-""")
+"""
+            )
         os.chmod(filter_script, 0o755)
 
         # Create .gitattributes with both text and filter
@@ -712,14 +718,7 @@ class FilterContextTests(TestCase):
         self.addCleanup(lambda: __import__("shutil").rmtree(test_dir))
 
         # Create a simple test filter that just passes data through
-        filter_script = """import sys
-while True:
-    line = sys.stdin.buffer.read()
-    if not line:
-        break
-    sys.stdout.buffer.write(line)
-    sys.stdout.buffer.flush()
-"""
+        filter_script = _PASSTHROUGH_FILTER_SCRIPT
         filter_path = os.path.join(test_dir, "simple_filter.py")
         with open(filter_path, "w") as f:
             f.write(filter_script)
@@ -1044,23 +1043,33 @@ while True:
         """Test paths with special characters are handled correctly."""
         import sys
 
-        driver = ProcessFilterDriver(
-            process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
-        )
-
         # Test various special characters in paths
         special_paths = [
             b"file with spaces.txt",
             b"path/with/slashes.txt",
             b"file=with=equals.txt",
             b"file\nwith\nnewlines.txt",
+            b"filew&with&ampersand.txt",
         ]
 
         test_data = b"test data"
 
-        for path in special_paths:
-            result = driver.smudge(test_data, path)
-            self.assertEqual(result, b"test data")
+        with create_passthrough_filter() as passthrough_filter_path:
+            for process_cmd, smudge_cmd in [
+                (f"{sys.executable} {self.test_filter_path}", None),
+                (None, f"{sys.executable} {passthrough_filter_path} %f"),
+            ]:
+                driver = ProcessFilterDriver(
+                    process_cmd=process_cmd,
+                    smudge_cmd=smudge_cmd,
+                    required=True,
+                )
+                for path in special_paths:
+                    with self.subTest(
+                        process_cmd=process_cmd, smudge_cmd=smudge_cmd, path=path
+                    ):
+                        result = driver.smudge(test_data, path)
+                        self.assertEqual(result, b"test data")
 
     def test_process_crash_recovery(self):
         """Test that process is properly restarted after crash."""
@@ -1594,3 +1603,33 @@ while True:
         finally:
             if os.path.exists(filter_path):
                 os.unlink(filter_path)
+
+
+_PASSTHROUGH_FILTER_SCRIPT = """import sys
+while True:
+    line = sys.stdin.buffer.read()
+    if not line:
+        break
+    sys.stdout.buffer.write(line)
+    sys.stdout.buffer.flush()
+"""
+
+
+@contextmanager
+def create_passthrough_filter() -> Iterator[str]:
+    filter_script = _PASSTHROUGH_FILTER_SCRIPT
+    with tempfile.NamedTemporaryFile(
+        suffix=".py", delete=False, prefix="test_filter_passthrough_"
+    ) as f:
+        f.write(filter_script.encode())
+        path = f.name
+
+    try:
+        if os.name != "nt":  # Not Windows
+            os.chmod(path, 0o755)
+        yield path
+    finally:
+        try:
+            os.unlink(path)
+        except FileNotFoundError:
+            pass