|
|
@@ -24,6 +24,8 @@
|
|
|
import os
|
|
|
import tempfile
|
|
|
import threading
|
|
|
+from collections.abc import Iterator
|
|
|
+from contextlib import contextmanager
|
|
|
|
|
|
from dulwich import porcelain
|
|
|
from dulwich.filters import (
|
|
|
@@ -100,7 +102,8 @@ class GitAttributesFilterIntegrationTests(TestCase):
|
|
|
|
|
|
filter_script = os.path.join(self.test_dir, "redact_filter.py")
|
|
|
with open(filter_script, "w") as f:
|
|
|
- f.write("""#!/usr/bin/env python3
|
|
|
+ f.write(
|
|
|
+ """#!/usr/bin/env python3
|
|
|
import sys
|
|
|
data = sys.stdin.buffer.read()
|
|
|
# Replace all digits with X
|
|
|
@@ -111,7 +114,8 @@ for b in data:
|
|
|
else:
|
|
|
result.append(b)
|
|
|
sys.stdout.buffer.write(result)
|
|
|
-""")
|
|
|
+"""
|
|
|
+ )
|
|
|
os.chmod(filter_script, 0o755)
|
|
|
|
|
|
# Create .gitattributes with custom filter
|
|
|
@@ -187,13 +191,15 @@ sys.stdout.buffer.write(result)
|
|
|
|
|
|
filter_script = os.path.join(self.test_dir, "uppercase_filter.py")
|
|
|
with open(filter_script, "w") as f:
|
|
|
- f.write("""#!/usr/bin/env python3
|
|
|
+ f.write(
|
|
|
+ """#!/usr/bin/env python3
|
|
|
import sys
|
|
|
data = sys.stdin.buffer.read()
|
|
|
# Convert bytes to string, uppercase, then back to bytes
|
|
|
result = data.decode('utf-8', errors='replace').upper().encode('utf-8')
|
|
|
sys.stdout.buffer.write(result)
|
|
|
-""")
|
|
|
+"""
|
|
|
+ )
|
|
|
os.chmod(filter_script, 0o755)
|
|
|
|
|
|
# Create .gitattributes with both text and filter
|
|
|
@@ -712,14 +718,7 @@ class FilterContextTests(TestCase):
|
|
|
self.addCleanup(lambda: __import__("shutil").rmtree(test_dir))
|
|
|
|
|
|
# Create a simple test filter that just passes data through
|
|
|
- filter_script = """import sys
|
|
|
-while True:
|
|
|
- line = sys.stdin.buffer.read()
|
|
|
- if not line:
|
|
|
- break
|
|
|
- sys.stdout.buffer.write(line)
|
|
|
- sys.stdout.buffer.flush()
|
|
|
-"""
|
|
|
+ filter_script = _PASSTHROUGH_FILTER_SCRIPT
|
|
|
filter_path = os.path.join(test_dir, "simple_filter.py")
|
|
|
with open(filter_path, "w") as f:
|
|
|
f.write(filter_script)
|
|
|
@@ -1044,23 +1043,33 @@ while True:
|
|
|
"""Test paths with special characters are handled correctly."""
|
|
|
import sys
|
|
|
|
|
|
- driver = ProcessFilterDriver(
|
|
|
- process_cmd=f"{sys.executable} {self.test_filter_path}", required=True
|
|
|
- )
|
|
|
-
|
|
|
# Test various special characters in paths
|
|
|
special_paths = [
|
|
|
b"file with spaces.txt",
|
|
|
b"path/with/slashes.txt",
|
|
|
b"file=with=equals.txt",
|
|
|
b"file\nwith\nnewlines.txt",
|
|
|
+ b"filew&with&ersand.txt",
|
|
|
]
|
|
|
|
|
|
test_data = b"test data"
|
|
|
|
|
|
- for path in special_paths:
|
|
|
- result = driver.smudge(test_data, path)
|
|
|
- self.assertEqual(result, b"test data")
|
|
|
+ with create_passthrough_filter() as passthrough_filter_path:
|
|
|
+ for process_cmd, smudge_cmd in [
|
|
|
+ (f"{sys.executable} {self.test_filter_path}", None),
|
|
|
+ (None, f"{sys.executable} {passthrough_filter_path} %f"),
|
|
|
+ ]:
|
|
|
+ driver = ProcessFilterDriver(
|
|
|
+ process_cmd=process_cmd,
|
|
|
+ smudge_cmd=smudge_cmd,
|
|
|
+ required=True,
|
|
|
+ )
|
|
|
+ for path in special_paths:
|
|
|
+ with self.subTest(
|
|
|
+ process_cmd=process_cmd, smudge_cmd=smudge_cmd, path=path
|
|
|
+ ):
|
|
|
+ result = driver.smudge(test_data, path)
|
|
|
+ self.assertEqual(result, b"test data")
|
|
|
|
|
|
def test_process_crash_recovery(self):
|
|
|
"""Test that process is properly restarted after crash."""
|
|
|
@@ -1594,3 +1603,33 @@ while True:
|
|
|
finally:
|
|
|
if os.path.exists(filter_path):
|
|
|
os.unlink(filter_path)
|
|
|
+
|
|
|
+
|
|
|
+_PASSTHROUGH_FILTER_SCRIPT = """import sys
|
|
|
+while True:
|
|
|
+ line = sys.stdin.buffer.read()
|
|
|
+ if not line:
|
|
|
+ break
|
|
|
+ sys.stdout.buffer.write(line)
|
|
|
+ sys.stdout.buffer.flush()
|
|
|
+"""
|
|
|
+
|
|
|
+
|
|
|
+@contextmanager
|
|
|
+def create_passthrough_filter() -> Iterator[str]:
|
|
|
+ filter_script = _PASSTHROUGH_FILTER_SCRIPT
|
|
|
+ with tempfile.NamedTemporaryFile(
|
|
|
+ suffix=".py", delete=False, prefix="test_filter_passthrough_"
|
|
|
+ ) as f:
|
|
|
+ f.write(filter_script.encode())
|
|
|
+ path = f.name
|
|
|
+
|
|
|
+ try:
|
|
|
+ if os.name != "nt": # Not Windows
|
|
|
+ os.chmod(path, 0o755)
|
|
|
+ yield path
|
|
|
+ finally:
|
|
|
+ try:
|
|
|
+ os.unlink(path)
|
|
|
+ except FileNotFoundError:
|
|
|
+ pass
|