Răsfoiți Sursa

Fix Git filter protocol to handle two-phase response format (#1911)

The Git long-running process filter protocol specifies a two-phase
response format where filters send:
1. Initial headers with status
2. Content data
3. Final headers with status (potentially different from initial)

The previous implementation only read the initial headers and content,
causing protocol sync issues when working with filters like Git LFS that
properly implement the two-phase protocol.

Fixes #1889
Jelmer Vernooij 3 luni în urmă
părinte
comite
fdb06440d3
4 a modificat fișierele cu 524 adăugiri și 3 ștergeri
  1. 6 0
      NEWS
  2. 18 1
      dulwich/filters.py
  3. 105 0
      tests/compat/test_lfs.py
  4. 395 2
      tests/test_filters.py

+ 6 - 0
NEWS

@@ -9,6 +9,12 @@
    returns (payload, signature, signature_type) tuple. Supports both PGP and SSH
    signature detection. (Jelmer Vernooij)
 
+ * Fix Git filter protocol implementation to properly handle the two-phase
+   response format (initial headers, content, final headers) as specified in
+   the Git protocol documentation. This fixes compatibility with Git LFS and
+   other filters that send status messages in final headers.
+   (Jelmer Vernooij, #1889)
+
 0.24.2	2025-09-25
 
  * Added ``porcelain.shortlog`` function to summarize commits by author,

+ 18 - 1
dulwich/filters.py

@@ -249,7 +249,7 @@ class ProcessFilterDriver:
                     self._protocol.write_pkt_line(chunk)
                 self._protocol.write_pkt_line(None)  # flush packet to end data
 
-                # Read response
+                # Read response (initial headers)
                 response_headers = {}
                 while True:
                     pkt = self._protocol.read_pkt_line()
@@ -271,6 +271,23 @@ class ProcessFilterDriver:
                         break
                     result_chunks.append(pkt)
 
+                # Read final headers per Git filter protocol
+                # Filters send: headers + flush + content + flush + final_headers + flush
+                final_headers = {}
+                while True:
+                    pkt = self._protocol.read_pkt_line()
+                    if pkt is None:  # flush packet ends final headers
+                        break
+                    key, _, value = pkt.decode().rstrip("\n\r").partition("=")
+                    final_headers[key] = value
+
+                # Check final status (if provided, it overrides the initial status)
+                final_status = final_headers.get("status", status)
+                if final_status != "success":
+                    raise FilterError(
+                        f"Process filter {operation} failed with final status: {final_status}"
+                    )
+
                 return b"".join(result_chunks)
 
             except (OSError, subprocess.SubprocessError, ValueError) as e:

+ 105 - 0
tests/compat/test_lfs.py

@@ -325,6 +325,111 @@ class LFSFilterCompatTest(LFSCompatTestCase):
         self.assertEqual(smudged, test_content)
 
 
+class LFSStatusCompatTest(LFSCompatTestCase):
+    """Tests for git status with LFS files (issue #1889)."""
+
+    def test_status_with_lfs_files(self):
+        """Test git status works correctly with LFS files.
+
+        This reproduces issue #1889 where git status with LFS files
+        would fail due to incorrect handling of the two-phase filter
+        protocol response.
+        """
+        repo_dir = self.make_temp_dir()
+        run_git_or_fail(["init"], cwd=repo_dir)
+        # Disable autocrlf to avoid line ending issues on Windows
+        run_git_or_fail(["config", "core.autocrlf", "false"], cwd=repo_dir)
+        run_git_or_fail(["lfs", "install", "--local"], cwd=repo_dir)
+        run_git_or_fail(["lfs", "track", "*.bin"], cwd=repo_dir)
+        run_git_or_fail(["add", ".gitattributes"], cwd=repo_dir)
+        run_git_or_fail(["commit", "-m", "Track .bin files"], cwd=repo_dir)
+
+        # Add an LFS file
+        test_file = os.path.join(repo_dir, "test.bin")
+        test_content = b"x" * 1024 * 1024  # 1MB
+        with open(test_file, "wb") as f:
+            f.write(test_content)
+        run_git_or_fail(["add", "test.bin"], cwd=repo_dir)
+        run_git_or_fail(["commit", "-m", "Add LFS file"], cwd=repo_dir)
+
+        # Now check status with dulwich - this should not raise FilterError
+        repo = porcelain.open_repo(repo_dir)
+        self.addCleanup(repo.close)
+
+        # This should work without raising FilterError
+        # Before the fix, this would fail with:
+        # dulwich.filters.FilterError: Process filter smudge failed: error
+        status = porcelain.status(repo_dir, untracked_files="no")
+
+        # Verify status shows clean working tree
+        self.assertEqual(status.staged["add"], [])
+        self.assertEqual(status.staged["delete"], [])
+        self.assertEqual(status.staged["modify"], [])
+        self.assertEqual(status.unstaged, [])
+
+    def test_status_with_modified_lfs_file(self):
+        """Test git status with modified LFS files."""
+        repo_dir = self.make_temp_dir()
+        run_git_or_fail(["init"], cwd=repo_dir)
+        # Disable autocrlf to avoid line ending issues on Windows
+        run_git_or_fail(["config", "core.autocrlf", "false"], cwd=repo_dir)
+        run_git_or_fail(["lfs", "install", "--local"], cwd=repo_dir)
+        run_git_or_fail(["lfs", "track", "*.bin"], cwd=repo_dir)
+        run_git_or_fail(["add", ".gitattributes"], cwd=repo_dir)
+        run_git_or_fail(["commit", "-m", "Track .bin files"], cwd=repo_dir)
+
+        # Add an LFS file
+        test_file = os.path.join(repo_dir, "test.bin")
+        with open(test_file, "wb") as f:
+            f.write(b"original content\n")
+        run_git_or_fail(["add", "test.bin"], cwd=repo_dir)
+        run_git_or_fail(["commit", "-m", "Add LFS file"], cwd=repo_dir)
+
+        # Modify the file
+        with open(test_file, "wb") as f:
+            f.write(b"modified content\n")
+
+        # Check status - should show file as modified
+        repo = porcelain.open_repo(repo_dir)
+        self.addCleanup(repo.close)
+
+        status = porcelain.status(repo_dir, untracked_files="no")
+
+        # File should be in unstaged changes
+        self.assertIn(b"test.bin", status.unstaged)
+
+    def test_status_with_multiple_lfs_files(self):
+        """Test git status with multiple LFS files."""
+        repo_dir = self.make_temp_dir()
+        run_git_or_fail(["init"], cwd=repo_dir)
+        # Disable autocrlf to avoid line ending issues on Windows
+        run_git_or_fail(["config", "core.autocrlf", "false"], cwd=repo_dir)
+        run_git_or_fail(["lfs", "install", "--local"], cwd=repo_dir)
+        run_git_or_fail(["lfs", "track", "*.bin"], cwd=repo_dir)
+        run_git_or_fail(["add", ".gitattributes"], cwd=repo_dir)
+        run_git_or_fail(["commit", "-m", "Track .bin files"], cwd=repo_dir)
+
+        # Add multiple LFS files
+        for i in range(3):
+            test_file = os.path.join(repo_dir, f"test{i}.bin")
+            with open(test_file, "wb") as f:
+                f.write(b"content" * 1000)
+        run_git_or_fail(["add", "*.bin"], cwd=repo_dir)
+        run_git_or_fail(["commit", "-m", "Add LFS files"], cwd=repo_dir)
+
+        # Check status - should handle multiple files correctly
+        repo = porcelain.open_repo(repo_dir)
+        self.addCleanup(repo.close)
+
+        status = porcelain.status(repo_dir, untracked_files="no")
+
+        # All files should be clean
+        self.assertEqual(status.staged["add"], [])
+        self.assertEqual(status.staged["delete"], [])
+        self.assertEqual(status.staged["modify"], [])
+        self.assertEqual(status.unstaged, [])
+
+
 class LFSCloneCompatTest(LFSCompatTestCase):
     """Tests for cloning repositories with LFS files."""
 

+ 395 - 2
tests/test_filters.py

@@ -469,12 +469,15 @@ while True:
     # Send response
     write_pkt(b"status=success")
     write_pkt(None)
-    
+
     # Send result
     chunk_size = 65516
     for i in range(0, len(result), chunk_size):
         write_pkt(result[i:i+chunk_size])
     write_pkt(None)
+
+    # Send final headers (empty list to keep status=success)
+    write_pkt(None)
 """
 
         # Create temporary file
@@ -915,12 +918,15 @@ while True:
     # Send response
     write_pkt(b"status=success")
     write_pkt(None)
-    
+
     # Send result
     chunk_size = 65516
     for i in range(0, len(result), chunk_size):
         write_pkt(result[i:i+chunk_size])
     write_pkt(None)
+
+    # Send final headers (empty list to keep status=success)
+    write_pkt(None)
 """
 
         fd, path = tempfile.mkstemp(suffix=".py", prefix="test_filter_spec_")
@@ -1183,3 +1189,390 @@ protocol.write_pkt_line(None)
             driver.clean(b"test data")
 
         self.assertIn("Failed to start process filter", str(cm.exception))
+
+    def test_two_phase_response_protocol(self):
+        """Test filter protocol with two-phase response (initial + final headers).
+
+        This test verifies that the filter correctly handles the Git LFS protocol
+        where filters send:
+        1. Initial headers with status
+        2. Content data
+        3. Final headers with status
+
+        This is the format used by git-lfs and documented in the Git filter protocol.
+        """
+        import sys
+        import tempfile
+
+        # Create a filter that follows the two-phase protocol
+        filter_script = """import sys
+
+def read_exact(n):
+    data = b""
+    while len(data) < n:
+        chunk = sys.stdin.buffer.read(n - len(data))
+        if not chunk:
+            break
+        data += chunk
+    return data
+
+def write_pkt(data):
+    if data is None:
+        sys.stdout.buffer.write(b"0000")
+    else:
+        length = len(data) + 4
+        sys.stdout.buffer.write(("{:04x}".format(length)).encode())
+        sys.stdout.buffer.write(data)
+    sys.stdout.buffer.flush()
+
+def read_pkt():
+    size_bytes = read_exact(4)
+    if not size_bytes:
+        return None
+    size = int(size_bytes.decode(), 16)
+    if size == 0:
+        return None
+    return read_exact(size - 4)
+
+# Handshake
+client_hello = read_pkt()
+version = read_pkt()
+flush = read_pkt()
+
+write_pkt(b"git-filter-server")
+write_pkt(b"version=2")
+write_pkt(None)
+
+# Read and echo capabilities
+caps = []
+while True:
+    cap = read_pkt()
+    if cap is None:
+        break
+    caps.append(cap)
+
+for cap in caps:
+    write_pkt(cap)
+write_pkt(None)
+
+# Process commands
+while True:
+    headers = {}
+    while True:
+        line = read_pkt()
+        if line is None:
+            break
+        if b"=" in line:
+            k, v = line.split(b"=", 1)
+            headers[k.decode()] = v.decode()
+
+    if not headers:
+        break
+
+    # Read data
+    data_chunks = []
+    while True:
+        chunk = read_pkt()
+        if chunk is None:
+            break
+        data_chunks.append(chunk)
+
+    data = b"".join(data_chunks)
+
+    # Process
+    if headers.get("command") == "clean":
+        result = data.upper()
+    elif headers.get("command") == "smudge":
+        result = data.lower()
+    else:
+        result = data
+
+    # TWO-PHASE RESPONSE: Send initial headers
+    write_pkt(b"status=success")
+    write_pkt(None)
+
+    # Send result data
+    chunk_size = 65516
+    for i in range(0, len(result), chunk_size):
+        write_pkt(result[i:i+chunk_size])
+    write_pkt(None)
+
+    # TWO-PHASE RESPONSE: Send final headers (empty list to keep status=success)
+    write_pkt(None)
+"""
+
+        fd, filter_path = tempfile.mkstemp(
+            suffix=".py", prefix="test_filter_two_phase_"
+        )
+        try:
+            os.write(fd, filter_script.encode())
+            os.close(fd)
+
+            if os.name != "nt":
+                os.chmod(filter_path, 0o755)
+
+            driver = ProcessFilterDriver(
+                process_cmd=f"{sys.executable} {filter_path}", required=True
+            )
+
+            # Test clean operation
+            test_data = b"hello world"
+            result = driver.clean(test_data)
+            self.assertEqual(result, b"HELLO WORLD")
+
+            # Test smudge operation
+            result = driver.smudge(b"HELLO WORLD", b"test.txt")
+            self.assertEqual(result, b"hello world")
+
+            driver.cleanup()
+
+        finally:
+            if os.path.exists(filter_path):
+                os.unlink(filter_path)
+
+    def test_two_phase_response_with_status_messages(self):
+        """Test filter that sends status messages in final headers.
+
+        Some filters (like git-lfs) may send progress or status messages
+        in the final headers. This test verifies that we can handle those.
+        """
+        import sys
+        import tempfile
+
+        # Create a filter that sends extra status info in final headers
+        filter_script = """import sys
+
+def read_exact(n):
+    data = b""
+    while len(data) < n:
+        chunk = sys.stdin.buffer.read(n - len(data))
+        if not chunk:
+            break
+        data += chunk
+    return data
+
+def write_pkt(data):
+    if data is None:
+        sys.stdout.buffer.write(b"0000")
+    else:
+        length = len(data) + 4
+        sys.stdout.buffer.write(("{:04x}".format(length)).encode())
+        sys.stdout.buffer.write(data)
+    sys.stdout.buffer.flush()
+
+def read_pkt():
+    size_bytes = read_exact(4)
+    if not size_bytes:
+        return None
+    size = int(size_bytes.decode(), 16)
+    if size == 0:
+        return None
+    return read_exact(size - 4)
+
+# Handshake
+client_hello = read_pkt()
+version = read_pkt()
+flush = read_pkt()
+
+write_pkt(b"git-filter-server")
+write_pkt(b"version=2")
+write_pkt(None)
+
+# Read and echo capabilities
+caps = []
+while True:
+    cap = read_pkt()
+    if cap is None:
+        break
+    caps.append(cap)
+
+for cap in caps:
+    write_pkt(cap)
+write_pkt(None)
+
+# Process commands
+while True:
+    headers = {}
+    while True:
+        line = read_pkt()
+        if line is None:
+            break
+        if b"=" in line:
+            k, v = line.split(b"=", 1)
+            headers[k.decode()] = v.decode()
+
+    if not headers:
+        break
+
+    # Read data
+    data_chunks = []
+    while True:
+        chunk = read_pkt()
+        if chunk is None:
+            break
+        data_chunks.append(chunk)
+
+    data = b"".join(data_chunks)
+
+    # Process
+    result = data.upper()
+
+    # Send initial headers
+    write_pkt(b"status=success")
+    write_pkt(None)
+
+    # Send result data
+    chunk_size = 65516
+    for i in range(0, len(result), chunk_size):
+        write_pkt(result[i:i+chunk_size])
+    write_pkt(None)
+
+    # Send final headers with progress messages (like git-lfs does)
+    write_pkt(b"status=success")
+    write_pkt(None)
+"""
+
+        fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_status_")
+        try:
+            os.write(fd, filter_script.encode())
+            os.close(fd)
+
+            if os.name != "nt":
+                os.chmod(filter_path, 0o755)
+
+            driver = ProcessFilterDriver(
+                process_cmd=f"{sys.executable} {filter_path}", required=True
+            )
+
+            # Test clean operation with status messages
+            test_data = b"test data with status"
+            result = driver.clean(test_data)
+            self.assertEqual(result, b"TEST DATA WITH STATUS")
+
+            driver.cleanup()
+
+        finally:
+            if os.path.exists(filter_path):
+                os.unlink(filter_path)
+
+    def test_two_phase_response_with_final_error(self):
+        """Test filter that reports error in final headers.
+
+        The Git protocol allows filters to report success initially,
+        then report an error in the final headers. This test ensures
+        we handle that correctly.
+        """
+        import sys
+        import tempfile
+
+        # Create a filter that sends error in final headers
+        filter_script = """import sys
+
+def read_exact(n):
+    data = b""
+    while len(data) < n:
+        chunk = sys.stdin.buffer.read(n - len(data))
+        if not chunk:
+            break
+        data += chunk
+    return data
+
+def write_pkt(data):
+    if data is None:
+        sys.stdout.buffer.write(b"0000")
+    else:
+        length = len(data) + 4
+        sys.stdout.buffer.write(("{:04x}".format(length)).encode())
+        sys.stdout.buffer.write(data)
+    sys.stdout.buffer.flush()
+
+def read_pkt():
+    size_bytes = read_exact(4)
+    if not size_bytes:
+        return None
+    size = int(size_bytes.decode(), 16)
+    if size == 0:
+        return None
+    return read_exact(size - 4)
+
+# Handshake
+client_hello = read_pkt()
+version = read_pkt()
+flush = read_pkt()
+
+write_pkt(b"git-filter-server")
+write_pkt(b"version=2")
+write_pkt(None)
+
+# Read and echo capabilities
+caps = []
+while True:
+    cap = read_pkt()
+    if cap is None:
+        break
+    caps.append(cap)
+
+for cap in caps:
+    write_pkt(cap)
+write_pkt(None)
+
+# Process commands
+while True:
+    headers = {}
+    while True:
+        line = read_pkt()
+        if line is None:
+            break
+        if b"=" in line:
+            k, v = line.split(b"=", 1)
+            headers[k.decode()] = v.decode()
+
+    if not headers:
+        break
+
+    # Read data
+    data_chunks = []
+    while True:
+        chunk = read_pkt()
+        if chunk is None:
+            break
+        data_chunks.append(chunk)
+
+    data = b"".join(data_chunks)
+
+    # Send initial headers with success
+    write_pkt(b"status=success")
+    write_pkt(None)
+
+    # Send partial result
+    write_pkt(b"PARTIAL")
+    write_pkt(None)
+
+    # Send final headers with error (simulating processing failure)
+    write_pkt(b"status=error")
+    write_pkt(None)
+"""
+
+        fd, filter_path = tempfile.mkstemp(suffix=".py", prefix="test_filter_error_")
+        try:
+            os.write(fd, filter_script.encode())
+            os.close(fd)
+
+            if os.name != "nt":
+                os.chmod(filter_path, 0o755)
+
+            driver = ProcessFilterDriver(
+                process_cmd=f"{sys.executable} {filter_path}", required=True
+            )
+
+            # Should raise FilterError due to final status being error
+            with self.assertRaises(FilterError) as cm:
+                driver.clean(b"test data")
+
+            self.assertIn("final status: error", str(cm.exception))
+
+            driver.cleanup()
+
+        finally:
+            if os.path.exists(filter_path):
+                os.unlink(filter_path)