Quellcode durchsuchen

Various LFS fixes (#1691)

Jelmer Vernooij vor 2 Monaten
Ursprung
Commit
52db9ad387

+ 23 - 13
dulwich/filters.py

@@ -43,7 +43,7 @@ class FilterDriver(Protocol):
         """Apply clean filter (working tree → repository)."""
         ...
 
-    def smudge(self, data: bytes) -> bytes:
+    def smudge(self, data: bytes, path: bytes = b"") -> bytes:
         """Apply smudge filter (repository → working tree)."""
         ...
 
@@ -56,10 +56,12 @@ class ProcessFilterDriver:
         clean_cmd: Optional[str] = None,
         smudge_cmd: Optional[str] = None,
         required: bool = False,
+        cwd: Optional[str] = None,
     ) -> None:
         self.clean_cmd = clean_cmd
         self.smudge_cmd = smudge_cmd
         self.required = required
+        self.cwd = cwd
 
     def clean(self, data: bytes) -> bytes:
         """Apply clean filter using external process."""
@@ -75,6 +77,7 @@ class ProcessFilterDriver:
                 input=data,
                 capture_output=True,
                 check=True,
+                cwd=self.cwd,
             )
             return result.stdout
         except subprocess.CalledProcessError as e:
@@ -84,20 +87,24 @@ class ProcessFilterDriver:
             logging.warning(f"Optional clean filter failed: {e}")
             return data
 
-    def smudge(self, data: bytes) -> bytes:
+    def smudge(self, data: bytes, path: bytes = b"") -> bytes:
         """Apply smudge filter using external process."""
         if not self.smudge_cmd:
             if self.required:
                 raise FilterError("Smudge command is required but not configured")
             return data
 
+        # Substitute %f placeholder with file path
+        cmd = self.smudge_cmd.replace("%f", path.decode("utf-8", errors="replace"))
+
         try:
             result = subprocess.run(
-                self.smudge_cmd,
+                cmd,
                 shell=True,
                 input=data,
                 capture_output=True,
                 check=True,
+                cwd=self.cwd,
             )
             return result.stdout
         except subprocess.CalledProcessError as e:
@@ -140,19 +147,19 @@ class FilterRegistry:
         if name in self._drivers:
             return self._drivers[name]
 
-        # Try to create from factory
-        if name in self._factories:
-            factory_driver = self._factories[name](self)
-            self._drivers[name] = factory_driver
-            return factory_driver
-
-        # Try to create from config
+        # Try to create from config first (respect user configuration)
         if self.config is not None:
             config_driver = self._create_from_config(name)
             if config_driver is not None:
                 self._drivers[name] = config_driver
                 return config_driver
 
+        # Try to create from factory as fallback
+        if name in self._factories:
+            factory_driver = self._factories[name](self)
+            self._drivers[name] = factory_driver
+            return factory_driver
+
         return None
 
     def _create_from_config(self, name: str) -> Optional[FilterDriver]:
@@ -187,7 +194,9 @@ class FilterRegistry:
         required = self.config.get_boolean(("filter", name), "required", False)
 
         if clean_cmd or smudge_cmd:
-            return ProcessFilterDriver(clean_cmd, smudge_cmd, required)
+            # Get repository working directory
+            repo_path = self.repo.path if self.repo else None
+            return ProcessFilterDriver(clean_cmd, smudge_cmd, required, repo_path)
 
         return None
 
@@ -205,7 +214,8 @@ class FilterRegistry:
             lfs_dir = tempfile.mkdtemp(prefix="dulwich-lfs-")
             lfs_store = LFSStore.create(lfs_dir)
 
-        return LFSFilterDriver(lfs_store)
+        config = registry.repo.get_config_stack() if registry.repo else None
+        return LFSFilterDriver(lfs_store, config=config)
 
     def _create_text_filter(self, registry: "FilterRegistry") -> FilterDriver:
         """Create text filter driver for line ending conversion.
@@ -397,7 +407,7 @@ class FilterBlobNormalizer:
             return blob
 
         # Apply smudge filter
-        filtered_data = filter_driver.smudge(blob.data)
+        filtered_data = filter_driver.smudge(blob.data, path)
         if filtered_data == blob.data:
             return blob
 

+ 151 - 34
dulwich/lfs.py

@@ -21,16 +21,17 @@
 
 import hashlib
 import json
+import logging
 import os
 import tempfile
 from collections.abc import Iterable
 from dataclasses import dataclass
 from typing import TYPE_CHECKING, BinaryIO, Optional, Union
-from urllib.error import HTTPError
-from urllib.parse import urljoin
+from urllib.parse import urljoin, urlparse
 from urllib.request import Request, urlopen
 
 if TYPE_CHECKING:
+    from .config import Config
     from .repo import Repo
 
 
@@ -96,6 +97,13 @@ class LFSStore:
             return cls.create(lfs_dir)
         return cls(lfs_dir)
 
+    @classmethod
+    def from_controldir(cls, controldir: str, create: bool = False) -> "LFSStore":
+        """Return the LFS store kept in the ``lfs`` subdirectory of *controldir*.
+
+        Args:
+            controldir: Path of the directory that contains the ``lfs`` folder
+            create: When true, build the store with ``cls.create`` instead of
+                only wrapping the path
+
+        Returns:
+            Store object for ``<controldir>/lfs``
+        """
+        store_path = os.path.join(controldir, "lfs")
+        return cls.create(store_path) if create else cls(store_path)
+
     def _sha_path(self, sha: str) -> str:
         return os.path.join(self.path, "objects", sha[0:2], sha[2:4], sha)
 
@@ -200,8 +208,11 @@ class LFSPointer:
 class LFSFilterDriver:
     """LFS filter driver implementation."""
 
-    def __init__(self, lfs_store: "LFSStore") -> None:
+    def __init__(
+        self, lfs_store: "LFSStore", config: Optional["Config"] = None
+    ) -> None:
         self.lfs_store = lfs_store
+        self.config = config
 
     def clean(self, data: bytes) -> bytes:
         """Convert file content to LFS pointer (clean filter)."""
@@ -217,7 +228,7 @@ class LFSFilterDriver:
         pointer = LFSPointer(sha, len(data))
         return pointer.to_bytes()
 
-    def smudge(self, data: bytes) -> bytes:
+    def smudge(self, data: bytes, path: bytes = b"") -> bytes:
         """Convert LFS pointer to file content (smudge filter)."""
         # Try to parse as LFS pointer
         pointer = LFSPointer.from_bytes(data)
@@ -234,23 +245,133 @@ class LFSFilterDriver:
             with self.lfs_store.open_object(pointer.oid) as f:
                 return f.read()
         except KeyError:
-            # Object not found in LFS store, return pointer as-is
-            # This matches Git LFS behavior when object is missing
-            return data
+            # Object not found in LFS store, try to download it
+            try:
+                content = self._download_object(pointer)
+                return content
+            except LFSError as e:
+                # Download failed, fall back to returning pointer
+                logging.warning("LFS object download failed for %s: %s", pointer.oid, e)
+
+                # Return pointer as-is when object is missing and download failed
+                return data
+
+    def _download_object(self, pointer: LFSPointer) -> bytes:
+        """Download an LFS object from the server and cache it in the local store.
+
+        Args:
+            pointer: LFS pointer containing OID and size
+
+        Returns:
+            Downloaded content
+
+        Raises:
+            LFSError: If download fails for any reason
+        """
+        if self.config is None:
+            raise LFSError("No configuration available for LFS download")
+
+        # Create LFS client and download
+        client = LFSClient.from_config(self.config)
+        if client is None:
+            raise LFSError("No LFS client available from configuration")
+
+        try:
+            content = client.download(pointer.oid, pointer.size)
+        except LFSError:
+            raise
+        except (ValueError, OSError) as e:
+            # The client reports HTTP errors and empty batch responses as
+            # ValueError. Translate them so callers that only handle LFSError
+            # (such as smudge) can still fall back to returning the pointer.
+            raise LFSError(f"LFS download failed for {pointer.oid}: {e}") from e
+
+        # Check the content hash before anything is written, so data that does
+        # not match the pointer never ends up in the local store
+        actual_oid = hashlib.sha256(content).hexdigest()
+        if actual_oid != pointer.oid:
+            raise LFSError(
+                f"Downloaded OID mismatch: expected {pointer.oid}, got {actual_oid}"
+            )
+
+        # Store the verified content in local LFS store
+        self.lfs_store.write_object([content])
+
+        return content
+
+
+def _get_lfs_user_agent(config):
+    """Build the User-Agent header value for LFS requests.
+
+    Args:
+        config: Git config to consult, or a falsy value to skip the lookup
+
+    Returns:
+        The ``http.useragent`` setting verbatim when it is present, otherwise
+        ``git-lfs/dulwich/<version>`` built from the package version tuple.
+    """
+    if config:
+        try:
+            # A configured user agent is used exactly as written
+            return config.get(b"http", b"useragent").decode()
+        except KeyError:
+            pass
+
+    # Default LFS user agent (similar to git-lfs format)
+    from . import __version__
+
+    return "git-lfs/dulwich/" + ".".join(map(str, __version__))
 
 
 class LFSClient:
     """LFS client for network operations."""
 
-    def __init__(self, url: str, auth: Optional[tuple[str, str]] = None) -> None:
+    def __init__(self, url: str, config: Optional["Config"] = None) -> None:
         """Initialize LFS client.
 
         Args:
             url: LFS server URL
-            auth: Optional (username, password) tuple for authentication
+            config: Optional git config for authentication/proxy settings
         """
-        self.url = url.rstrip("/")
-        self.auth = auth
+        self._base_url = url.rstrip("/") + "/"  # Ensure trailing slash for urljoin
+        self.config = config
+        self._pool_manager = None
+
+    @classmethod
+    def from_config(cls, config: "Config") -> Optional["LFSClient"]:
+        """Create LFS client from git config.
+
+        The endpoint is ``lfs.url`` when that is set. Otherwise it is derived
+        from ``remote.origin.url`` by appending ``.git`` (if missing) and
+        ``/info/lfs``.
+
+        Args:
+            config: Git configuration to read the LFS or remote URL from
+
+        Returns:
+            An instance of ``cls``, or None when neither URL is configured or
+            the derived URL has no scheme or host
+        """
+        # An explicitly configured LFS URL always wins
+        try:
+            url = config.get((b"lfs",), b"url").decode()
+        except KeyError:
+            pass
+        else:
+            return cls(url, config)
+
+        # Fall back to deriving from remote URL (same as git-lfs)
+        try:
+            remote_url = config.get((b"remote", b"origin"), b"url").decode()
+        except KeyError:
+            return None
+
+        # Convert git@host:user/repo.git to https://host/user/repo.git
+        if remote_url.startswith("git@") and ":" in remote_url and "/" in remote_url:
+            host, path = remote_url[len("git@") :].split(":", 1)
+            remote_url = f"https://{host}/{path}"
+
+        # Drop trailing slashes so ".git" extends the repository name instead
+        # of becoming a separate path segment ("repo/.git")
+        remote_url = remote_url.rstrip("/")
+
+        # Ensure URL ends with .git for consistent LFS endpoint
+        if not remote_url.endswith(".git"):
+            remote_url = f"{remote_url}.git"
+
+        # Standard LFS endpoint is remote_url + "/info/lfs"
+        lfs_url = f"{remote_url}/info/lfs"
+
+        parsed = urlparse(lfs_url)
+        if not parsed.scheme or not parsed.netloc:
+            return None
+
+        # Build through cls, like the lfs.url branch, so subclasses get
+        # instances of themselves
+        return cls(lfs_url, config)
+
+    @property
+    def url(self) -> str:
+        """Return the LFS server URL with trailing slashes removed."""
+        base = self._base_url
+        return base.rstrip("/")
+
+    def _get_pool_manager(self):
+        """Return the pool manager, creating it from ``self.config`` on first use."""
+        manager = self._pool_manager
+        if manager is None:
+            # Imported here rather than at module level, as in the original
+            from dulwich.client import default_urllib3_manager
+
+            manager = default_urllib3_manager(self.config)
+            self._pool_manager = manager
+        return manager
 
     def _make_request(
         self,
@@ -260,29 +381,23 @@ class LFSClient:
         headers: Optional[dict[str, str]] = None,
     ) -> bytes:
         """Make an HTTP request to the LFS server."""
-        url = urljoin(self.url, path)
+        url = urljoin(self._base_url, path)
         req_headers = {
             "Accept": "application/vnd.git-lfs+json",
             "Content-Type": "application/vnd.git-lfs+json",
+            "User-Agent": _get_lfs_user_agent(self.config),
         }
         if headers:
             req_headers.update(headers)
 
-        req = Request(url, data=data, headers=req_headers, method=method)
-
-        if self.auth:
-            import base64
-
-            auth_str = f"{self.auth[0]}:{self.auth[1]}"
-            b64_auth = base64.b64encode(auth_str.encode()).decode("ascii")
-            req.add_header("Authorization", f"Basic {b64_auth}")
-
-        try:
-            with urlopen(req) as response:
-                return response.read()
-        except HTTPError as e:
-            error_body = e.read().decode("utf-8", errors="ignore")
-            raise LFSError(f"LFS server error {e.code}: {error_body}")
+        # Use urllib3 pool manager with git config applied
+        pool_manager = self._get_pool_manager()
+        response = pool_manager.request(method, url, headers=req_headers, body=data)
+        if response.status >= 400:
+            raise ValueError(
+                f"HTTP {response.status}: {response.data.decode('utf-8', errors='ignore')}"
+            )
+        return response.data
 
     def batch(
         self,
@@ -311,8 +426,10 @@ class LFSClient:
             data["ref"] = {"name": ref}
 
         response = self._make_request(
-            "POST", "/objects/batch", json.dumps(data).encode("utf-8")
+            "POST", "objects/batch", json.dumps(data).encode("utf-8")
         )
+        if not response:
+            raise ValueError("Empty response from LFS server")
         response_data = json.loads(response)
         return self._parse_batch_response(response_data)
 
@@ -378,14 +495,14 @@ class LFSClient:
         download_action = obj.actions["download"]
         download_url = download_action.href
 
-        # Download the object
-        req = Request(download_url)
+        # Download the object using urllib3 with git config
+        download_headers = {"User-Agent": _get_lfs_user_agent(self.config)}
         if download_action.header:
-            for name, value in download_action.header.items():
-                req.add_header(name, value)
+            download_headers.update(download_action.header)
 
-        with urlopen(req) as response:
-            content = response.read()
+        pool_manager = self._get_pool_manager()
+        response = pool_manager.request("GET", download_url, headers=download_headers)
+        content = response.data
 
         # Verify size
         if len(content) != size:

+ 1 - 1
dulwich/line_ending.py

@@ -178,7 +178,7 @@ class LineEndingFilter(FilterDriver):
 
         return self.clean_conversion(data)
 
-    def smudge(self, data: bytes) -> bytes:
+    def smudge(self, data: bytes, path: bytes = b"") -> bytes:
         """Apply line ending conversion for checkout (repository -> working tree)."""
         if self.smudge_conversion is None:
             return data

+ 3 - 3
dulwich/porcelain.py

@@ -5068,7 +5068,7 @@ def lfs_clean(repo=".", path=None):
 
         # Get LFS store
         lfs_store = LFSStore.from_repo(r)
-        filter_driver = LFSFilterDriver(lfs_store)
+        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
 
         # Read file content
         full_path = os.path.join(r.path, path)
@@ -5097,7 +5097,7 @@ def lfs_smudge(repo=".", pointer_content=None):
 
         # Get LFS store
         lfs_store = LFSStore.from_repo(r)
-        filter_driver = LFSFilterDriver(lfs_store)
+        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
 
         # Smudge the pointer (retrieve actual content)
         return filter_driver.smudge(pointer_content)
@@ -5162,7 +5162,7 @@ def lfs_migrate(repo=".", include=None, exclude=None, everything=False):
     with open_repo_closing(repo) as r:
         # Initialize LFS if needed
         lfs_store = LFSStore.from_repo(r, create=True)
-        filter_driver = LFSFilterDriver(lfs_store)
+        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
 
         # Get current index
         index = r.open_index()

+ 15 - 4
tests/compat/test_lfs.py

@@ -355,14 +355,25 @@ class LFSCloneCompatTest(LFSCompatTestCase):
         cloned_repo = porcelain.clone(source_dir, target_dir)
         self.addCleanup(cloned_repo.close)
 
-        # Verify LFS file exists as pointer
+        # Verify LFS file exists
         cloned_file = os.path.join(target_dir, "test.bin")
         with open(cloned_file, "rb") as f:
             content = f.read()
 
-        # Should be a pointer, not the full content
-        self.assertLess(len(content), 1000)  # Pointer is much smaller
-        self.assertIn(b"version https://git-lfs.github.com/spec/v1", content)
+        # Check if filter.lfs.smudge is configured
+        cloned_config = cloned_repo.get_config()
+        try:
+            lfs_smudge = cloned_config.get((b"filter", b"lfs"), b"smudge")
+            has_lfs_config = bool(lfs_smudge)
+        except KeyError:
+            has_lfs_config = False
+
+        if has_lfs_config:
+            # git-lfs smudge filter should have converted it
+            self.assertEqual(content, test_content)
+        else:
+            # No git-lfs config (uses built-in filter), should be a pointer
+            self.assertIn(b"version https://git-lfs.github.com/spec/v1", content)
 
 
 if __name__ == "__main__":

+ 105 - 1
tests/test_lfs.py

@@ -22,10 +22,13 @@
 """Tests for LFS support."""
 
 import json
+import os
 import shutil
 import tempfile
 
+from dulwich import porcelain
 from dulwich.lfs import LFSFilterDriver, LFSPointer, LFSStore
+from dulwich.repo import Repo
 
 from . import TestCase
 
@@ -306,6 +309,107 @@ class LFSIntegrationTests(TestCase):
                 f"Failed for content: {content!r}",
             )
 
+    def test_builtin_lfs_clone_no_config(self) -> None:
+        """Test cloning with LFS when no git-lfs commands are configured."""
+        # Create source repository
+        source_dir = os.path.join(self.test_dir, "source")
+        os.makedirs(source_dir)
+        source_repo = Repo.init(source_dir)
+
+        # Close the source repository even if the setup below fails
+        try:
+            # Create empty config (no LFS commands)
+            config = source_repo.get_config()
+            config.write_to_path()
+
+            # Create .gitattributes with LFS filter
+            gitattributes_path = os.path.join(source_dir, ".gitattributes")
+            with open(gitattributes_path, "wb") as f:
+                f.write(b"*.bin filter=lfs\n")
+
+            # Create test content and store in LFS
+            test_content = b"Test binary content"
+            test_oid = LFSStore.from_repo(source_repo, create=True).write_object(
+                [test_content]
+            )
+
+            # Create LFS pointer file
+            pointer = LFSPointer(test_oid, len(test_content))
+            pointer_file = os.path.join(source_dir, "test.bin")
+            with open(pointer_file, "wb") as f:
+                f.write(pointer.to_bytes())
+
+            # Commit files
+            porcelain.add(source_repo, paths=[".gitattributes", "test.bin"])
+            porcelain.commit(source_repo, message=b"Add LFS tracked file")
+        finally:
+            source_repo.close()
+
+        # Clone the repository
+        target_dir = os.path.join(self.test_dir, "target")
+        target_repo = porcelain.clone(source_dir, target_dir)
+
+        # Close the clone even when one of the assertions below fails
+        try:
+            # Verify no LFS commands in config
+            target_config = target_repo.get_config_stack()
+            with self.assertRaises(KeyError):
+                target_config.get((b"filter", b"lfs"), b"smudge")
+
+            # Check the cloned file
+            cloned_file = os.path.join(target_dir, "test.bin")
+            with open(cloned_file, "rb") as f:
+                content = f.read()
+
+            # Should still be a pointer (LFS object not in target's store)
+            self.assertTrue(
+                content.startswith(b"version https://git-lfs.github.com/spec/v1")
+            )
+            self.assertIn(test_oid.encode(), content)
+        finally:
+            target_repo.close()
+
+    def test_builtin_lfs_with_local_objects(self) -> None:
+        """Check that checkout smudges a pointer whose object is stored locally."""
+        payload = b"Hello from LFS!"
+        worktree = self.test_dir
+
+        # Write out the config without adding any LFS filter commands
+        self.repo.get_config().write_to_path()
+
+        # Route *.dat files through the lfs filter
+        attrs_file = os.path.join(worktree, ".gitattributes")
+        with open(attrs_file, "wb") as attrs:
+            attrs.write(b"*.dat filter=lfs\n")
+
+        # Put the real content into the repository's LFS store
+        oid = LFSStore.from_repo(self.repo, create=True).write_object([payload])
+
+        # The working-tree file initially holds only the pointer
+        tracked_file = os.path.join(worktree, "data.dat")
+        with open(tracked_file, "wb") as out:
+            out.write(LFSPointer(oid, len(payload)).to_bytes())
+
+        # Record both files in a commit
+        porcelain.add(self.repo, paths=[".gitattributes", "data.dat"])
+        porcelain.commit(self.repo, message=b"Add LFS file")
+
+        # Resetting the index checks the files out again through the filter
+        self.repo.reset_index()
+
+        with open(tracked_file, "rb") as checked_out:
+            on_disk = checked_out.read()
+
+        # The pointer must have been replaced by the stored content
+        self.assertEqual(on_disk, payload)
+
+    def test_builtin_lfs_filter_used(self) -> None:
+        """Check that the "lfs" driver is the built-in one when no config exists."""
+        # Look the driver up through the repository's blob normalizer
+        registry = self.repo.get_blob_normalizer().filter_registry
+        driver = registry.get_driver("lfs")
+
+        # It must be the implementation that lives in dulwich.lfs
+        self.assertIsInstance(driver, LFSFilterDriver)
+        self.assertEqual(type(driver).__module__, "dulwich.lfs")
+
 
 class LFSFilterDriverTests(TestCase):
     def setUp(self) -> None:
@@ -873,7 +977,7 @@ class LFSClientTests(TestCase):
         self.addCleanup(self.server.shutdown)
 
         # Create LFS client pointing to our test server
-        self.client = LFSClient(f"{self.server_url}/objects")
+        self.client = LFSClient(self.server_url)
 
     def test_client_url_normalization(self) -> None:
         """Test that client URL is normalized correctly."""

+ 138 - 1
tests/test_porcelain_filters.py

@@ -21,6 +21,7 @@
 
 """Tests for porcelain filter integration."""
 
+import hashlib
 import os
 import tempfile
 from io import BytesIO
@@ -215,11 +216,66 @@ class PorcelainFilterTests(TestCase):
             # The checkout should apply the smudge filter
             self.assertIn(b"\r\n", content)
 
+    def test_process_filter_priority(self) -> None:
+        """Check that a filter set in git config yields a ProcessFilterDriver."""
+        import sys
+
+        from dulwich.filters import FilterRegistry, ProcessFilterDriver
+
+        # Pick a smudge command that prints FILTERED on the current platform
+        if sys.platform != "win32":
+            # On Unix, run a small executable shell script
+            script_path = os.path.join(self.test_dir, "test-filter.sh")
+            with open(script_path, "w") as script:
+                script.write("#!/bin/sh\necho 'FILTERED'")
+            os.chmod(script_path, 0o755)
+            smudge_command = script_path
+        else:
+            # On Windows, echo works directly as the command
+            smudge_command = "echo FILTERED"
+
+        # Register the command as the smudge side of filter "test"
+        repo_config = self.repo.get_config()
+        repo_config.set((b"filter", b"test"), b"smudge", smudge_command.encode())
+        repo_config.write_to_path()
+
+        # Attach the filter to *.txt files
+        attrs_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(attrs_path, "wb") as attrs:
+            attrs.write(b"*.txt filter=test\n")
+
+        # The registry has to build the driver from the configuration
+        driver = FilterRegistry(repo_config, self.repo).get_driver("test")
+        self.assertIsInstance(driver, ProcessFilterDriver)
+
+        # Line endings differ between platforms, so compare the stripped output
+        smudged = driver.smudge(b"original", b"test.txt")
+        self.assertEqual(smudged.rstrip(), b"FILTERED")
+
     def test_commit_with_clean_filter(self) -> None:
         """Test committing with a clean filter."""
         # Set up a custom filter in git config
         config = self.repo.get_config()
-        config.set((b"filter", b"testfilter"), b"clean", b"sed 's/SECRET/REDACTED/g'")
+        import sys
+
+        if sys.platform == "win32":
+            # On Windows, use PowerShell for string replacement
+            config.set(
+                (b"filter", b"testfilter"),
+                b"clean",
+                b"powershell -Command \"$input -replace 'SECRET', 'REDACTED'\"",
+            )
+        else:
+            # On Unix, use sed
+            config.set(
+                (b"filter", b"testfilter"), b"clean", b"sed 's/SECRET/REDACTED/g'"
+            )
         config.write_to_path()
 
         # Create .gitattributes to use the filter
@@ -241,6 +297,87 @@ class PorcelainFilterTests(TestCase):
         # The committed blob should have filtered content
         # (Note: actual filter execution requires process filter support)
 
+    def test_clone_with_builtin_lfs_filter(self) -> None:
+        """Clone a repository holding an LFS pointer without git-lfs configured."""
+        from dulwich.lfs import LFSPointer, LFSStore
+
+        # SHA-256 of the empty byte string, used as the pointer's OID
+        empty_oid = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+
+        # Source repository, cleaned up after the test
+        origin_dir = tempfile.mkdtemp()
+        self.addCleanup(rmtree_ro, origin_dir)
+        origin = Repo.init(origin_dir)
+        self.addCleanup(origin.close)
+
+        # Route *.bin files through the lfs filter
+        with open(os.path.join(origin_dir, ".gitattributes"), "wb") as attrs:
+            attrs.write(b"*.bin filter=lfs\n")
+
+        # Hand-written pointer file for the empty object
+        with open(os.path.join(origin_dir, "empty.bin"), "wb") as out:
+            out.write(LFSPointer(empty_oid, 0).to_bytes())
+
+        # Matching (empty) object in the source repository's LFS store
+        LFSStore.from_repo(origin, create=True).write_object([b""])
+
+        # Commit both files
+        porcelain.add(origin, paths=[".gitattributes", "empty.bin"])
+        porcelain.commit(origin, message=b"Add LFS file")
+
+        # Clone; no git-lfs commands are configured for the clone
+        clone_dir = tempfile.mkdtemp()
+        self.addCleanup(rmtree_ro, clone_dir)
+        clone = porcelain.clone(origin_dir, clone_dir)
+        self.addCleanup(clone.close)
+
+        with open(os.path.join(clone_dir, "empty.bin"), "rb") as checked_out:
+            on_disk = checked_out.read()
+
+        # The object is not in the clone's own LFS store, so the checked-out
+        # file is expected to still be the pointer
+        self.assertIn(b"version https://git-lfs", on_disk)
+        self.assertIn(
+            b"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", on_disk
+        )
+
+    def test_builtin_lfs_filter_with_object(self) -> None:
+        """Smudge a pointer whose object is already present in the LFS store."""
+        from dulwich.filters import FilterRegistry
+        from dulwich.lfs import LFSPointer, LFSStore
+
+        # Content and the pointer that refers to it by SHA-256
+        payload = b"Hello, LFS!"
+        digest = hashlib.sha256(payload).hexdigest()
+        pointer_bytes = LFSPointer(digest, len(payload)).to_bytes()
+
+        # Make the object available in the repository's LFS store
+        LFSStore.from_repo(self.repo, create=True).write_object([payload])
+
+        # Fetch the lfs driver from a registry built on the repo's config stack
+        registry = FilterRegistry(self.repo.get_config_stack(), self.repo)
+        driver = registry.get_driver("lfs")
+
+        # With the object in the store, smudge returns the real content
+        self.assertEqual(driver.smudge(pointer_bytes, b"test.txt"), payload)
+
     def test_ls_files_with_filters(self) -> None:
         """Test ls-files respects filter settings."""
         # Configure autocrlf

+ 186 - 0
tests/test_porcelain_lfs.py

@@ -239,6 +239,192 @@ class LFSPorcelainTestCase(TestCase):
         self.assertIsNone(results["regular.txt"])
         self.assertIsNone(results["nonexistent.txt"])
 
+    def test_clone_with_builtin_lfs_no_config(self):
+        """Clone without git-lfs config: built-in filter is used, pointer is kept."""
+        # Create a source repo with LFS content
+        source_dir = tempfile.mkdtemp()
+        self.addCleanup(lambda: self._cleanup_test_dir_path(source_dir))
+        source_repo = Repo.init(source_dir)
+        # Close via cleanup (runs before the rmtree above) even if an assert fails
+        self.addCleanup(source_repo.close)
+
+        # Create .gitattributes
+        gitattributes_path = os.path.join(source_dir, ".gitattributes")
+        with open(gitattributes_path, "w") as f:
+            f.write("*.bin filter=lfs diff=lfs merge=lfs -text\n")
+
+        # Create test content and store in LFS
+        # LFSStore.from_repo with create=True will create the directories
+        test_content = b"This is test content for LFS"
+        lfs_store = LFSStore.from_repo(source_repo, create=True)
+        oid = lfs_store.write_object([test_content])
+
+        # Create LFS pointer file
+        pointer = LFSPointer(oid, len(test_content))
+        test_file = os.path.join(source_dir, "test.bin")
+        with open(test_file, "wb") as f:
+            f.write(pointer.to_bytes())
+
+        # Add and commit
+        porcelain.add(source_repo, paths=[".gitattributes", "test.bin"])
+        porcelain.commit(source_repo, message=b"Add LFS file")
+
+        # Clone with empty config (no git-lfs commands)
+        clone_dir = tempfile.mkdtemp()
+        self.addCleanup(lambda: self._cleanup_test_dir_path(clone_dir))
+
+        # Verify source repo has no LFS filter config
+        config = source_repo.get_config()
+        with self.assertRaises(KeyError):
+            config.get((b"filter", b"lfs"), b"smudge")
+
+        # Clone the repository
+        cloned_repo = porcelain.clone(source_dir, clone_dir)
+        self.addCleanup(cloned_repo.close)
+
+        # Verify the built-in LFS driver is used (skipped without filter_registry)
+        normalizer = cloned_repo.get_blob_normalizer()
+        if hasattr(normalizer, "filter_registry"):
+            lfs_driver = normalizer.filter_registry.get_driver("lfs")
+            # Should be the built-in LFSFilterDriver
+            self.assertEqual(type(lfs_driver).__name__, "LFSFilterDriver")
+            self.assertEqual(type(lfs_driver).__module__, "dulwich.lfs")
+
+        # Check that the file remains as a pointer (expected behavior)
+        # The built-in LFS filter preserves pointers when objects aren't available
+        cloned_file = os.path.join(clone_dir, "test.bin")
+        with open(cloned_file, "rb") as f:
+            content = f.read()
+
+        # Should still be a pointer since objects weren't transferred
+        self.assertTrue(
+            content.startswith(b"version https://git-lfs.github.com/spec/v1")
+        )
+        cloned_pointer = LFSPointer.from_bytes(content)
+        self.assertIsNotNone(cloned_pointer)
+        self.assertEqual(cloned_pointer.oid, pointer.oid)
+        self.assertEqual(cloned_pointer.size, pointer.size)
+
+    def _cleanup_test_dir_path(self, path):
+        """Remove the directory tree at ``path``, ignoring any errors."""
+        # Function-scope import; ``ignore_errors`` keeps the removal best-effort.
+        from shutil import rmtree
+        rmtree(path, ignore_errors=True)
+
+    def test_add_applies_clean_filter(self):
+        """Adding a ``*.bin`` file stores an LFS pointer and keeps the payload."""
+        # The store is created directly rather than through lfs_init, so that
+        # no git-lfs commands get configured for this repository.
+        store = LFSStore.from_repo(self.repo, create=True)
+        worktree = self.repo.path
+
+        # Route every *.bin path through the "lfs" filter.
+        with open(os.path.join(worktree, ".gitattributes"), "w") as attrs:
+            attrs.write("*.bin filter=lfs diff=lfs merge=lfs -text\n")
+
+        # Write a raw payload to a path matching that pattern.
+        payload = b"This is large file content that should be stored in LFS"
+        target = os.path.join(worktree, "large.bin")
+        with open(target, "wb") as out:
+            out.write(payload)
+
+        # Staging the file is where the clean filter is expected to run, so the
+        # index entry should afterwards refer to a pointer blob.
+        porcelain.add(self.repo, paths=["large.bin"])
+
+        # Fetch the blob that the index entry for the file refers to.
+        entry = self.repo.open_index()[b"large.bin"]
+        staged = self.repo.get_object(entry.sha).data
+
+        # The staged blob must start with the LFS spec header ...
+        header = b"version https://git-lfs.github.com/spec/v1"
+        self.assertTrue(staged.startswith(header))
+
+        # ... and parse as a pointer that records the payload's size.
+        pointer = LFSPointer.from_bytes(staged)
+        self.assertIsNotNone(pointer)
+        self.assertEqual(pointer.size, len(payload))
+
+        # The payload itself must be readable from the LFS store under the
+        # oid named by the pointer.
+        with store.open_object(pointer.oid) as stored:
+            kept = stored.read()
+
+        self.assertEqual(kept, payload)
+
+    def test_checkout_applies_smudge_filter(self):
+        """Checking out a committed LFS pointer writes the stored payload."""
+        worktree = self.repo.path
+        payload = b"This is the actual file content from LFS"
+
+        # Store the payload in the repository's LFS store; the returned oid
+        # is what the pointer file refers to.
+        store = LFSStore.from_repo(self.repo, create=True)
+        oid = store.write_object([payload])
+
+        # Route every *.bin path through the "lfs" filter.
+        with open(os.path.join(worktree, ".gitattributes"), "w") as attrs:
+            attrs.write("*.bin filter=lfs diff=lfs merge=lfs -text\n")
+
+        # Put the pointer, not the payload, into the working tree file.
+        target = os.path.join(worktree, "data.bin")
+        pointer_bytes = LFSPointer(oid, len(payload)).to_bytes()
+        with open(target, "wb") as out:
+            out.write(pointer_bytes)
+
+        # Commit the attributes file together with the pointer.
+        porcelain.add(self.repo, paths=[".gitattributes", "data.bin"])
+        porcelain.commit(self.repo, message=b"Add LFS file")
+
+        # Delete the working copy so that checkout has to recreate it.
+        os.remove(target)
+
+        # Restoring the path is where the smudge filter is expected to run.
+        porcelain.checkout(self.repo, paths=["data.bin"])
+
+        # The recreated file must hold the payload instead of the pointer.
+        with open(target, "rb") as restored:
+            content = restored.read()
+
+        self.assertEqual(content, payload)
+
+    def test_reset_hard_applies_smudge_filter(self):
+        """A hard reset rewrites a modified LFS file with the stored payload."""
+        worktree = self.repo.path
+        payload = b"Content that should be restored by reset"
+
+        # Store the payload in the repository's LFS store; the returned oid
+        # is what the pointer file refers to.
+        store = LFSStore.from_repo(self.repo, create=True)
+        oid = store.write_object([payload])
+
+        # Route every *.bin path through the "lfs" filter.
+        with open(os.path.join(worktree, ".gitattributes"), "w") as attrs:
+            attrs.write("*.bin filter=lfs diff=lfs merge=lfs -text\n")
+
+        # Put the pointer, not the payload, into the working tree file.
+        target = os.path.join(worktree, "reset-test.bin")
+        pointer_bytes = LFSPointer(oid, len(payload)).to_bytes()
+        with open(target, "wb") as out:
+            out.write(pointer_bytes)
+
+        # Commit both files and remember the commit to reset back to.
+        porcelain.add(self.repo, paths=[".gitattributes", "reset-test.bin"])
+        head = porcelain.commit(self.repo, message=b"Add LFS file for reset test")
+
+        # Overwrite the working copy with unrelated bytes.
+        with open(target, "wb") as out:
+            out.write(b"Modified content that should be discarded")
+
+        # The hard reset should discard that edit and run the smudge filter.
+        porcelain.reset(self.repo, mode="hard", treeish=head)
+
+        # The file must now hold the payload, not the edit or the pointer.
+        with open(target, "rb") as restored:
+            content = restored.read()
+
+        self.assertEqual(content, payload)
+
 
 if __name__ == "__main__":
     unittest.main()

+ 1 - 1
tests/test_sparse_patterns.py

@@ -543,7 +543,7 @@ class ApplyIncludedPathsTests(TestCase):
 
         # Create a simple filter that converts content to uppercase
         class UppercaseFilter:
-            def smudge(self, input_bytes):
+            def smudge(self, input_bytes, path=b""):
                 return input_bytes.upper()
 
             def clean(self, input_bytes):