
Add LFS support to porcelain layer

- Integrate LFS into high-level porcelain commands
- Add LFS-specific porcelain functions (a usage sketch follows below)
Jelmer Vernooij, 1 month ago
Parent commit: 2fd4b52d98
4 changed files with 1392 additions and 30 deletions

  1. dulwich/porcelain.py  (+704, -30)
  2. tests/__init__.py  (+2, -0)
  3. tests/test_porcelain_filters.py  (+442, -0)
  4. tests/test_porcelain_lfs.py  (+244, -0)
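
Before the diff itself, a minimal sketch of how the new entry points compose; the repository path is hypothetical, but the function names and signatures are the ones this commit adds to dulwich/porcelain.py:

    from dulwich import porcelain

    repo = "/path/to/repo"  # hypothetical: any existing repository

    porcelain.lfs_init(repo)                        # create .git/lfs and set filter.lfs config
    porcelain.lfs_track(repo, ["*.bin", "*.pdf"])   # write and stage .gitattributes
    pointer = porcelain.lfs_clean(repo, "big.bin")  # assumes big.bin exists in the work tree
    print(porcelain.lfs_ls_files(repo))             # [(path, oid, size), ...] from HEAD

    # checkout() also gains path restores in this commit (like `git checkout -- <path>`):
    porcelain.checkout(repo, paths=["README.md"])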

+ 704 - 30
dulwich/porcelain.py

@@ -107,14 +107,22 @@ from .errors import SendPackError
 from .graph import can_fast_forward
 from .ignore import IgnoreFilterManager
 from .index import (
+    IndexEntry,
     _fs_to_tree_path,
     blob_from_path_and_stat,
     build_file_from_blob,
+    build_index_from_tree,
     get_unstaged_changes,
+    index_entry_from_stat,
+    symlink,
     update_working_tree,
+    validate_path_element_default,
+    validate_path_element_hfs,
+    validate_path_element_ntfs,
 )
 from .object_store import tree_lookup_path
 from .objects import (
+    Blob,
     Commit,
     Tag,
     Tree,
@@ -130,7 +138,13 @@ from .objectspec import (
     parse_tree,
 )
 from .pack import write_pack_from_container, write_pack_index
-from .patch import write_commit_patch, write_tree_diff
+from .patch import (
+    get_summary,
+    write_blob_diff,
+    write_commit_patch,
+    write_object_diff,
+    write_tree_diff,
+)
 from .protocol import ZERO_SHA, Protocol
 from .refs import (
     LOCAL_BRANCH_PREFIX,
@@ -1255,7 +1269,6 @@ def diff(
       outstream: Stream to write to
     """
     from . import diff as diff_module
-    from .objectspec import parse_commit
 
     with open_repo_closing(repo) as r:
         # Normalize paths to bytes
@@ -1272,8 +1285,6 @@ def diff(
 
         # Resolve commit refs to SHAs if provided
         if commit is not None:
-            from .objects import Commit
-
             if isinstance(commit, Commit):
                 # Already a Commit object
                 commit_sha = commit.id
@@ -1288,9 +1299,6 @@ def diff(
 
         if commit2 is not None:
             # Compare two commits
-            from .objects import Commit
-            from .patch import write_object_diff
-
             if isinstance(commit2, Commit):
                 commit2_obj = commit2
             else:
@@ -1416,8 +1424,6 @@ def submodule_update(repo, paths=None, init=False, force=False, errstream=None)
       init: If True, initialize submodules first
       force: Force update even if local changes exist
     """
-    from .client import get_transport_and_path
-    from .index import build_index_from_tree
     from .submodule import iter_cached_submodules
 
     with open_repo_closing(repo) as r:
@@ -1809,7 +1815,6 @@ def reset(repo, mode, treeish: Union[str, bytes, Commit, Tree, Tag] = "HEAD") ->
 
         elif mode == "mixed":
             # Mixed reset: update HEAD and index, but leave working tree unchanged
-            from .index import IndexEntry
             from .object_store import iter_tree_contents
 
             # Open the index
@@ -1852,13 +1857,6 @@ def reset(repo, mode, treeish: Union[str, bytes, Commit, Tree, Tag] = "HEAD") ->
             config = r.get_config()
             honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
 
-            # Import validation functions
-            from .index import (
-                validate_path_element_default,
-                validate_path_element_hfs,
-                validate_path_element_ntfs,
-            )
-
             if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
                 validate_path_element = validate_path_element_ntfs
             elif config.get_boolean(
@@ -1869,9 +1867,6 @@ def reset(repo, mode, treeish: Union[str, bytes, Commit, Tree, Tag] = "HEAD") ->
                 validate_path_element = validate_path_element_default
 
             if config.get_boolean(b"core", b"symlinks", True):
-                # Import symlink function
-                from .index import symlink
-
                 symlink_fn = symlink
             else:
 
@@ -3074,9 +3069,10 @@ def update_head(repo, target, detached=False, new_branch=None) -> None:
 
 def checkout(
     repo,
-    target: Union[str, bytes, Commit, Tag],
+    target: Optional[Union[str, bytes, Commit, Tag]] = None,
     force: bool = False,
     new_branch: Optional[Union[bytes, str]] = None,
+    paths: Optional[list[Union[bytes, str]]] = None,
 ) -> None:
     """Switch to a branch or commit, updating both HEAD and the working tree.
 
@@ -3086,9 +3082,12 @@ def checkout(
 
     Args:
       repo: Path to repository or repository object
-      target: Branch name, tag, or commit SHA to checkout
+      target: Branch name, tag, or commit SHA to checkout. If None and paths is specified,
+              restores files from HEAD
       force: Force checkout even if there are local changes
       new_branch: Create a new branch at target (like git checkout -b)
+      paths: List of specific paths to checkout. If specified, only these paths are updated
+             and HEAD is not changed
 
     Raises:
       CheckoutError: If checkout cannot be performed due to conflicts
@@ -3097,6 +3096,77 @@ def checkout(
     with open_repo_closing(repo) as r:
         # Store the original target for later reference checks
         original_target = target
+        # Handle path-specific checkout (like git checkout -- <paths>)
+        if paths is not None:
+            # Convert paths to bytes
+            byte_paths = []
+            for path in paths:
+                if isinstance(path, str):
+                    byte_paths.append(path.encode(DEFAULT_ENCODING))
+                else:
+                    byte_paths.append(path)
+
+            # If no target specified, use HEAD
+            if target is None:
+                try:
+                    target = r.refs[b"HEAD"]
+                except KeyError:
+                    raise CheckoutError("No HEAD reference found")
+            else:
+                if isinstance(target, str):
+                    target = target.encode(DEFAULT_ENCODING)
+
+            # Get the target commit and tree
+            target_commit = parse_commit(r, target)
+            target_tree = r[target_commit.tree]
+
+            # Get blob normalizer for line ending conversion
+            blob_normalizer = r.get_blob_normalizer()
+
+            # Restore specified paths from target tree
+            for path in byte_paths:
+                try:
+                    # Look up the path in the target tree
+                    mode, sha = target_tree.lookup_path(
+                        r.object_store.__getitem__, path
+                    )
+                    obj = r[sha]
+
+                    # Create directories if needed
+                    # Handle path as string
+                    if isinstance(path, bytes):
+                        path_str = path.decode(DEFAULT_ENCODING)
+                    else:
+                        path_str = path
+                    file_path = os.path.join(r.path, path_str)
+                    os.makedirs(os.path.dirname(file_path), exist_ok=True)
+
+                    # Write the file content
+                    if stat.S_ISREG(mode):
+                        # Apply checkout filters (smudge)
+                        if blob_normalizer:
+                            obj = blob_normalizer.checkout_normalize(obj, path)
+
+                        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
+                        if sys.platform == "win32":
+                            flags |= os.O_BINARY
+
+                        with os.fdopen(os.open(file_path, flags, mode), "wb") as f:
+                            f.write(obj.data)
+
+                    # Update the index
+                    r.stage(path)
+
+                except KeyError:
+                    # Path doesn't exist in target tree
+                    pass
+
+            return
+
+        # Normal checkout (switching branches/commits)
+        if target is None:
+            raise ValueError("Target must be specified for branch/commit checkout")
+
         if isinstance(target, str):
             target_bytes = target.encode(DEFAULT_ENCODING)
         elif isinstance(target, bytes):
@@ -3153,18 +3223,12 @@ def checkout(
         config = r.get_config()
         honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
 
-        # Import validation functions
-        from .index import validate_path_element_default, validate_path_element_ntfs
-
         if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
             validate_path_element = validate_path_element_ntfs
         else:
             validate_path_element = validate_path_element_default
 
         if config.get_boolean(b"core", b"symlinks", True):
-            # Import symlink function
-            from .index import symlink
-
             symlink_fn = symlink
         else:
 
@@ -4621,8 +4685,6 @@ def format_patch(
                 )
             else:
                 # Generate filename
-                from .patch import get_summary
-
                 summary = get_summary(commit)
                 filename = os.path.join(outdir, f"{i:04d}-{summary}.patch")
 
@@ -4865,3 +4927,615 @@ def reflog(repo=".", ref=b"HEAD", all=False):
                 # Read the reflog entries for this ref
                 for entry in r.read_reflog(ref_bytes):
                     yield (ref_bytes, entry)
+
+
+def lfs_track(repo=".", patterns=None):
+    """Track file patterns with Git LFS.
+
+    Args:
+      repo: Path to repository
+      patterns: List of file patterns to track (e.g., ["*.bin", "*.pdf"])
+                If None, returns current tracked patterns
+
+    Returns:
+      List of tracked patterns
+    """
+    from .attrs import GitAttributes
+
+    with open_repo_closing(repo) as r:
+        gitattributes_path = os.path.join(r.path, ".gitattributes")
+
+        # Load existing GitAttributes
+        if os.path.exists(gitattributes_path):
+            gitattributes = GitAttributes.from_file(gitattributes_path)
+        else:
+            gitattributes = GitAttributes()
+
+        if patterns is None:
+            # Return current LFS tracked patterns
+            tracked = []
+            for pattern_obj, attrs in gitattributes:
+                if attrs.get(b"filter") == b"lfs":
+                    tracked.append(pattern_obj.pattern.decode())
+            return tracked
+
+        # Add new patterns
+        for pattern in patterns:
+            # Ensure pattern is bytes
+            if isinstance(pattern, str):
+                pattern = pattern.encode()
+
+            # Set LFS attributes for the pattern
+            gitattributes.set_attribute(pattern, b"filter", b"lfs")
+            gitattributes.set_attribute(pattern, b"diff", b"lfs")
+            gitattributes.set_attribute(pattern, b"merge", b"lfs")
+            gitattributes.set_attribute(pattern, b"text", False)
+
+        # Write updated attributes
+        gitattributes.write_to_file(gitattributes_path)
+
+        # Stage the .gitattributes file
+        add(r, [".gitattributes"])
+
+        return lfs_track(r)  # Return updated list
+
+
+def lfs_untrack(repo=".", patterns=None):
+    """Untrack file patterns from Git LFS.
+
+    Args:
+      repo: Path to repository
+      patterns: List of file patterns to untrack
+
+    Returns:
+      List of remaining tracked patterns
+    """
+    from .attrs import GitAttributes
+
+    if not patterns:
+        return lfs_track(repo)
+
+    with open_repo_closing(repo) as r:
+        gitattributes_path = os.path.join(r.path, ".gitattributes")
+
+        if not os.path.exists(gitattributes_path):
+            return []
+
+        # Load existing GitAttributes
+        gitattributes = GitAttributes.from_file(gitattributes_path)
+
+        # Remove specified patterns
+        for pattern in patterns:
+            if isinstance(pattern, str):
+                pattern = pattern.encode()
+
+            # Check if pattern is tracked by LFS
+            for pattern_obj, attrs in list(gitattributes):
+                if pattern_obj.pattern == pattern and attrs.get(b"filter") == b"lfs":
+                    gitattributes.remove_pattern(pattern)
+                    break
+
+        # Write updated attributes
+        gitattributes.write_to_file(gitattributes_path)
+
+        # Stage the .gitattributes file
+        add(r, [".gitattributes"])
+
+        return lfs_track(r)  # Return updated list
+
+
+def lfs_init(repo="."):
+    """Initialize Git LFS in a repository.
+
+    Args:
+      repo: Path to repository
+
+    Returns:
+      None
+    """
+    from .lfs import LFSStore
+
+    with open_repo_closing(repo) as r:
+        # Create LFS store
+        LFSStore.from_repo(r, create=True)
+
+        # Set up Git config for LFS
+        config = r.get_config()
+        config.set((b"filter", b"lfs"), b"process", b"git-lfs filter-process")
+        config.set((b"filter", b"lfs"), b"required", b"true")
+        config.set((b"filter", b"lfs"), b"clean", b"git-lfs clean -- %f")
+        config.set((b"filter", b"lfs"), b"smudge", b"git-lfs smudge -- %f")
+        config.write_to_path()
+
+
+def lfs_clean(repo=".", path=None):
+    """Clean a file by converting it to an LFS pointer.
+
+    Args:
+      repo: Path to repository
+      path: Path to file to clean (relative to repo root)
+
+    Returns:
+      LFS pointer content as bytes
+    """
+    from .lfs import LFSFilterDriver, LFSStore
+
+    with open_repo_closing(repo) as r:
+        if path is None:
+            raise ValueError("Path must be specified")
+
+        # Get LFS store
+        lfs_store = LFSStore.from_repo(r)
+        filter_driver = LFSFilterDriver(lfs_store)
+
+        # Read file content
+        full_path = os.path.join(r.path, path)
+        with open(full_path, "rb") as f:
+            content = f.read()
+
+        # Clean the content (convert to LFS pointer)
+        return filter_driver.clean(content)
+
+
+def lfs_smudge(repo=".", pointer_content=None):
+    """Smudge an LFS pointer by retrieving the actual content.
+
+    Args:
+      repo: Path to repository
+      pointer_content: LFS pointer content as bytes
+
+    Returns:
+      Actual file content as bytes
+    """
+    from .lfs import LFSFilterDriver, LFSStore
+
+    with open_repo_closing(repo) as r:
+        if pointer_content is None:
+            raise ValueError("Pointer content must be specified")
+
+        # Get LFS store
+        lfs_store = LFSStore.from_repo(r)
+        filter_driver = LFSFilterDriver(lfs_store)
+
+        # Smudge the pointer (retrieve actual content)
+        return filter_driver.smudge(pointer_content)
+
+
+def lfs_ls_files(repo=".", ref=None):
+    """List files tracked by Git LFS.
+
+    Args:
+      repo: Path to repository
+      ref: Git ref to check (defaults to HEAD)
+
+    Returns:
+      List of (path, oid, size) tuples for LFS files
+    """
+    from .lfs import LFSPointer
+    from .object_store import iter_tree_contents
+
+    with open_repo_closing(repo) as r:
+        if ref is None:
+            ref = b"HEAD"
+        elif isinstance(ref, str):
+            ref = ref.encode()
+
+        # Get the commit and tree
+        try:
+            commit = r[ref]
+            tree = r[commit.tree]
+        except KeyError:
+            return []
+
+        lfs_files = []
+
+        # Walk the tree
+        for path, mode, sha in iter_tree_contents(r.object_store, tree.id):
+            if not stat.S_ISREG(mode):
+                continue
+
+            # Check if it's an LFS pointer
+            obj = r.object_store[sha]
+            pointer = LFSPointer.from_bytes(obj.data)
+            if pointer is not None:
+                lfs_files.append((path.decode(), pointer.oid, pointer.size))
+
+        return lfs_files
+
+
+def lfs_migrate(repo=".", include=None, exclude=None, everything=False):
+    """Migrate files to Git LFS.
+
+    Args:
+      repo: Path to repository
+      include: Patterns of files to include
+      exclude: Patterns of files to exclude
+      everything: Migrate all files larger than 100MB, regardless of patterns
+
+    Returns:
+      Number of migrated files
+    """
+    from .lfs import LFSFilterDriver, LFSStore
+
+    with open_repo_closing(repo) as r:
+        # Initialize LFS if needed
+        lfs_store = LFSStore.from_repo(r, create=True)
+        filter_driver = LFSFilterDriver(lfs_store)
+
+        # Get current index
+        index = r.open_index()
+
+        migrated = 0
+
+        # Determine files to migrate
+        files_to_migrate = []
+
+        if everything:
+            # Migrate all files above 100MB
+            for path, entry in index.items():
+                full_path = os.path.join(r.path, path.decode())
+                if os.path.exists(full_path):
+                    size = os.path.getsize(full_path)
+                    if size > 100 * 1024 * 1024:  # 100MB
+                        files_to_migrate.append(path.decode())
+        else:
+            # Use include/exclude patterns
+            for path, entry in index.items():
+                path_str = path.decode()
+
+                # Check include patterns
+                if include:
+                    matched = any(
+                        fnmatch.fnmatch(path_str, pattern) for pattern in include
+                    )
+                    if not matched:
+                        continue
+
+                # Check exclude patterns
+                if exclude:
+                    excluded = any(
+                        fnmatch.fnmatch(path_str, pattern) for pattern in exclude
+                    )
+                    if excluded:
+                        continue
+
+                files_to_migrate.append(path_str)
+
+        # Migrate files
+        for path in files_to_migrate:
+            full_path = os.path.join(r.path, path)
+            if not os.path.exists(full_path):
+                continue
+
+            # Read file content
+            with open(full_path, "rb") as f:
+                content = f.read()
+
+            # Convert to LFS pointer
+            pointer_content = filter_driver.clean(content)
+
+            # Write pointer back to file
+            with open(full_path, "wb") as f:
+                f.write(pointer_content)
+
+            # Create blob for pointer content and update index
+            blob = Blob()
+            blob.data = pointer_content
+            r.object_store.add_object(blob)
+
+            st = os.stat(full_path)
+            index_entry = index_entry_from_stat(st, blob.id, 0)
+            index[path.encode()] = index_entry
+
+            migrated += 1
+
+        # Write updated index
+        index.write()
+
+        # Track patterns if include was specified
+        if include:
+            lfs_track(r, include)
+
+        return migrated
+
+
+def lfs_pointer_check(repo=".", paths=None):
+    """Check if files are valid LFS pointers.
+
+    Args:
+      repo: Path to repository
+      paths: List of file paths to check (if None, check all files)
+
+    Returns:
+      Dict mapping paths to LFSPointer objects (or None if not a pointer)
+    """
+    from .lfs import LFSPointer
+
+    with open_repo_closing(repo) as r:
+        results = {}
+
+        if paths is None:
+            # Check all files in index
+            index = r.open_index()
+            paths = [path.decode() for path in index]
+
+        for path in paths:
+            full_path = os.path.join(r.path, path)
+            if os.path.exists(full_path):
+                try:
+                    with open(full_path, "rb") as f:
+                        content = f.read()
+                    pointer = LFSPointer.from_bytes(content)
+                    results[path] = pointer
+                except OSError:
+                    results[path] = None
+            else:
+                results[path] = None
+
+        return results
+
+
+def lfs_fetch(repo=".", remote="origin", refs=None):
+    """Fetch LFS objects from remote.
+
+    Args:
+      repo: Path to repository
+      remote: Remote name (default: origin)
+      refs: Specific refs to fetch LFS objects for (default: all refs)
+
+    Returns:
+      Number of objects fetched
+    """
+    from .lfs import LFSClient, LFSPointer, LFSStore
+
+    with open_repo_closing(repo) as r:
+        # Get LFS server URL from config
+        config = r.get_config()
+        try:
+            lfs_url = config.get((b"lfs",), b"url").decode()
+        except KeyError:
+            # Fall back to deriving the URL from the remote, appending /info/lfs
+            try:
+                remote_url = config.get((b"remote", remote.encode()), b"url").decode()
+            except KeyError:
+                raise ValueError(f"No LFS URL configured for remote {remote}")
+            if remote_url.endswith(".git"):
+                remote_url = remote_url[:-4]
+            lfs_url = f"{remote_url}/info/lfs"
+
+        # Get authentication
+        auth = None
+        # TODO: Support credential helpers and other auth methods
+
+        # Create LFS client and store
+        client = LFSClient(lfs_url, auth)
+        store = LFSStore.from_repo(r)
+
+        # Find all LFS pointers in the refs
+        pointers_to_fetch = []
+
+        if refs is None:
+            # Get all refs
+            refs = list(r.refs.keys())
+
+        for ref in refs:
+            if isinstance(ref, str):
+                ref = ref.encode()
+            try:
+                commit = r[r.refs[ref]]
+            except KeyError:
+                continue
+
+            # Walk the commit tree
+            for entry in r.object_store.iter_tree_contents(commit.tree):
+                try:
+                    obj = r.object_store[entry.sha]
+                    if obj.type_name == b"blob":
+                        pointer = LFSPointer.from_bytes(obj.data)
+                        if pointer and pointer.is_valid_oid():
+                            # Check if we already have it
+                            try:
+                                store.open_object(pointer.oid)
+                            except KeyError:
+                                pointers_to_fetch.append((pointer.oid, pointer.size))
+                except KeyError:
+                    pass
+
+        # Fetch missing objects
+        fetched = 0
+        for oid, size in pointers_to_fetch:
+            try:
+                content = client.download(oid, size)
+                store.write_object([content])
+                fetched += 1
+            except Exception as e:
+                # Log error but continue
+                print(f"Failed to fetch {oid}: {e}")
+
+        return fetched
+
+
+def lfs_pull(repo=".", remote="origin"):
+    """Pull LFS objects for current checkout.
+
+    Args:
+      repo: Path to repository
+      remote: Remote name (default: origin)
+
+    Returns:
+      Number of objects fetched
+    """
+    from .lfs import LFSPointer, LFSStore
+
+    with open_repo_closing(repo) as r:
+        # First do a fetch for HEAD
+        fetched = lfs_fetch(repo, remote, [b"HEAD"])
+
+        # Then checkout LFS files in working directory
+        store = LFSStore.from_repo(r)
+        index = r.open_index()
+
+        for path, entry in index.items():
+            full_path = os.path.join(r.path, path.decode())
+            if os.path.exists(full_path):
+                with open(full_path, "rb") as f:
+                    content = f.read()
+
+                pointer = LFSPointer.from_bytes(content)
+                if pointer and pointer.is_valid_oid():
+                    try:
+                        # Replace pointer with actual content
+                        with store.open_object(pointer.oid) as lfs_file:
+                            lfs_content = lfs_file.read()
+                        with open(full_path, "wb") as f:
+                            f.write(lfs_content)
+                    except KeyError:
+                        # Object not available
+                        pass
+
+        return fetched
+
+
+def lfs_push(repo=".", remote="origin", refs=None):
+    """Push LFS objects to remote.
+
+    Args:
+      repo: Path to repository
+      remote: Remote name (default: origin)
+      refs: Specific refs to push LFS objects for (default: current branch)
+
+    Returns:
+      Number of objects pushed
+    """
+    from .lfs import LFSClient, LFSPointer, LFSStore
+
+    with open_repo_closing(repo) as r:
+        # Get LFS server URL from config
+        config = r.get_config()
+        try:
+            lfs_url = config.get((b"lfs",), b"url").decode()
+        except KeyError:
+            # Fall back to deriving the URL from the remote, appending /info/lfs
+            try:
+                remote_url = config.get((b"remote", remote.encode()), b"url").decode()
+            except KeyError:
+                raise ValueError(f"No LFS URL configured for remote {remote}")
+            if remote_url.endswith(".git"):
+                remote_url = remote_url[:-4]
+            lfs_url = f"{remote_url}/info/lfs"
+
+        # Get authentication
+        auth = None
+        # TODO: Support credential helpers and other auth methods
+
+        # Create LFS client and store
+        client = LFSClient(lfs_url, auth)
+        store = LFSStore.from_repo(r)
+
+        # Find all LFS objects to push
+        if refs is None:
+            # Push current branch
+            refs = [b"HEAD"]
+
+        objects_to_push = set()
+
+        for ref in refs:
+            if isinstance(ref, str):
+                ref = ref.encode()
+            try:
+                if ref.startswith(b"refs/"):
+                    commit = r[r.refs[ref]]
+                else:
+                    commit = r[ref]
+            except KeyError:
+                continue
+
+            # Walk the commit tree
+            for entry in r.object_store.iter_tree_contents(commit.tree):
+                try:
+                    obj = r.object_store[entry.sha]
+                    if obj.type_name == b"blob":
+                        pointer = LFSPointer.from_bytes(obj.data)
+                        if pointer and pointer.is_valid_oid():
+                            objects_to_push.add((pointer.oid, pointer.size))
+                except KeyError:
+                    pass
+
+        # Push objects
+        pushed = 0
+        for oid, size in objects_to_push:
+            try:
+                with store.open_object(oid) as f:
+                    content = f.read()
+                client.upload(oid, size, content)
+                pushed += 1
+            except KeyError:
+                # Object not in local store
+                print(f"Warning: LFS object {oid} not found locally")
+            except Exception as e:
+                # Log error but continue
+                print(f"Failed to push {oid}: {e}")
+
+        return pushed
+
+
+def lfs_status(repo="."):
+    """Show status of LFS files.
+
+    Args:
+      repo: Path to repository
+
+    Returns:
+      Dict with status information
+    """
+    from .lfs import LFSPointer, LFSStore
+
+    with open_repo_closing(repo) as r:
+        store = LFSStore.from_repo(r)
+        index = r.open_index()
+
+        status = {
+            "tracked": [],
+            "not_staged": [],
+            "not_committed": [],
+            "not_pushed": [],
+            "missing": [],
+        }
+
+        # Check working directory files
+        for path, entry in index.items():
+            path_str = path.decode()
+            full_path = os.path.join(r.path, path_str)
+
+            if os.path.exists(full_path):
+                with open(full_path, "rb") as f:
+                    content = f.read()
+
+                pointer = LFSPointer.from_bytes(content)
+                if pointer and pointer.is_valid_oid():
+                    status["tracked"].append(path_str)
+
+                    # Check if object exists locally
+                    try:
+                        store.open_object(pointer.oid)
+                    except KeyError:
+                        status["missing"].append(path_str)
+
+                    # Check if file has been modified
+                    try:
+                        staged_obj = r.object_store[entry.sha]
+                        staged_pointer = LFSPointer.from_bytes(staged_obj.data)
+                        if staged_pointer and staged_pointer.oid != pointer.oid:
+                            status["not_staged"].append(path_str)
+                    except KeyError:
+                        pass
+
+        # TODO: Check for not committed and not pushed files
+
+        return status
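

For reference, the pointer blobs that lfs_clean() emits and lfs_ls_files()/lfs_status() parse follow the Git LFS v1 pointer format: three lines of key/value text. A small parsing sketch using dulwich.lfs.LFSPointer, whose from_bytes() API is exercised by the tests below; the oid shown is the sha256 of empty input, used purely as a placeholder:

    from dulwich.lfs import LFSPointer

    raw = (
        b"version https://git-lfs.github.com/spec/v1\n"
        b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
        b"size 1024\n"
    )

    pointer = LFSPointer.from_bytes(raw)  # returns None for content that is not a pointer
    assert pointer is not None
    assert pointer.size == 1024
    print(pointer.oid)  # hex digest of the actual content the pointer stands for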

+ 2 - 0
tests/__init__.py

@@ -155,6 +155,8 @@ def self_test_suite():
         "patch",
         "porcelain",
         "porcelain_cherry_pick",
+        "porcelain_filters",
+        "porcelain_lfs",
         "porcelain_merge",
         "porcelain_notes",
         "protocol",

+ 442 - 0
tests/test_porcelain_filters.py

@@ -0,0 +1,442 @@
+# test_porcelain_filters.py -- Tests for porcelain filter integration
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for porcelain filter integration."""
+
+import os
+import shutil
+import tempfile
+
+from dulwich import porcelain
+from dulwich.repo import Repo
+
+from . import TestCase
+
+
+class PorcelainFilterTests(TestCase):
+    """Test filter integration in porcelain commands."""
+
+    def setUp(self) -> None:
+        super().setUp()
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.test_dir)
+        self.repo = Repo.init(self.test_dir)
+
+    def test_add_with_autocrlf(self) -> None:
+        """Test adding files with autocrlf enabled."""
+        # Configure autocrlf
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"true")
+        config.write_to_path()
+
+        # Create a file with CRLF line endings
+        test_file = os.path.join(self.test_dir, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"line1\r\nline2\r\nline3\r\n")
+
+        # Add the file
+        porcelain.add(self.repo, paths=["test.txt"])
+
+        # Check that the blob in the index has LF line endings
+        index = self.repo.open_index()
+        entry = index[b"test.txt"]
+        blob = self.repo.object_store[entry.sha]
+        self.assertEqual(blob.data, b"line1\nline2\nline3\n")
+
+    def test_checkout_with_autocrlf(self) -> None:
+        """Test checkout with autocrlf enabled."""
+        # First, add a file with LF line endings to the repo
+        test_file = os.path.join(self.test_dir, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"line1\nline2\nline3\n")
+
+        porcelain.add(self.repo, paths=["test.txt"])
+        porcelain.commit(self.repo, message=b"Add test file")
+
+        # Remove the file
+        os.remove(test_file)
+
+        # Configure autocrlf
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"true")
+        config.write_to_path()
+
+        # Checkout the file
+        porcelain.checkout(self.repo, paths=["test.txt"])
+
+        # On Windows or with autocrlf=true, file should have CRLF line endings
+        with open(test_file, "rb") as f:
+            content = f.read()
+            # The checkout should apply the smudge filter
+            self.assertEqual(content, b"line1\r\nline2\r\nline3\r\n")
+
+    def test_status_with_filters(self) -> None:
+        """Test status command with filters applied."""
+        # Configure autocrlf
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"input")
+        config.write_to_path()
+
+        # Create a file with CRLF line endings
+        test_file = os.path.join(self.test_dir, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"line1\r\nline2\r\n")
+
+        # Add and commit with normalized line endings
+        porcelain.add(self.repo, paths=["test.txt"])
+        porcelain.commit(self.repo, message=b"Initial commit")
+
+        # Modify the file with CRLF line endings
+        with open(test_file, "wb") as f:
+            f.write(b"line1\r\nline2\r\nline3\r\n")
+
+        # Status should detect the change after normalizing
+        results = porcelain.status(self.repo)
+        self.assertIn(b"test.txt", results.unstaged)
+
+    def test_diff_with_filters(self) -> None:
+        """Test diff command with filters applied."""
+        # Configure autocrlf
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"true")
+        config.write_to_path()
+
+        # Create and commit a file
+        test_file = os.path.join(self.test_dir, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"line1\r\nline2\r\n")
+
+        porcelain.add(self.repo, paths=["test.txt"])
+        porcelain.commit(self.repo, message=b"Initial commit")
+
+        # Modify the file
+        with open(test_file, "wb") as f:
+            f.write(b"line1\r\nmodified\r\nline3\r\n")
+
+        # Get diff - should normalize line endings for comparison
+        diff_output = b"".join(porcelain.diff(self.repo))
+        self.assertIn(b"-line2", diff_output)
+        self.assertIn(b"+modified", diff_output)
+        self.assertIn(b"+line3", diff_output)
+
+    def test_add_with_gitattributes(self) -> None:
+        """Test adding files with gitattributes filters."""
+        # Create .gitattributes with text attribute
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.txt text\n")
+            f.write(b"*.bin -text\n")
+
+        # Add .gitattributes
+        porcelain.add(self.repo, paths=[".gitattributes"])
+
+        # Create text file with CRLF
+        text_file = os.path.join(self.test_dir, "test.txt")
+        with open(text_file, "wb") as f:
+            f.write(b"text\r\nfile\r\n")
+
+        # Create binary file with CRLF (should not be converted)
+        bin_file = os.path.join(self.test_dir, "test.bin")
+        with open(bin_file, "wb") as f:
+            f.write(b"binary\r\nfile\r\n")
+
+        # Add both files
+        porcelain.add(self.repo, paths=["test.txt", "test.bin"])
+
+        # Check text file was normalized
+        index = self.repo.open_index()
+        text_entry = index[b"test.txt"]
+        text_blob = self.repo.object_store[text_entry.sha]
+        self.assertEqual(text_blob.data, b"text\nfile\n")
+
+        # Check binary file was not normalized
+        bin_entry = index[b"test.bin"]
+        bin_blob = self.repo.object_store[bin_entry.sha]
+        self.assertEqual(bin_blob.data, b"binary\r\nfile\r\n")
+
+    def test_clone_with_filters(self) -> None:
+        """Test cloning a repository with filters."""
+        # Create a source repository
+        source_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, source_dir)
+        source_repo = Repo.init(source_dir)
+
+        # Add a file with LF endings
+        test_file = os.path.join(source_dir, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"line1\nline2\n")
+
+        porcelain.add(source_repo, paths=["test.txt"])
+        porcelain.commit(source_repo, message=b"Initial commit")
+
+        # Clone the repository without checkout
+        target_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, target_dir)
+
+        # Clone without checkout first
+        target_repo = porcelain.clone(source_dir, target_dir, checkout=False)
+
+        # Configure autocrlf in target repo
+        target_config = target_repo.get_config()
+        target_config.set((b"core",), b"autocrlf", b"true")
+        target_config.write_to_path()
+
+        # Now checkout the files with autocrlf enabled
+        target_repo.reset_index()
+
+        # Check that the working tree file has CRLF endings
+        target_file = os.path.join(target_dir, "test.txt")
+        with open(target_file, "rb") as f:
+            content = f.read()
+            # The checkout should apply the smudge filter
+            self.assertIn(b"\r\n", content)
+
+    def test_commit_with_clean_filter(self) -> None:
+        """Test committing with a clean filter."""
+        # Set up a custom filter in git config
+        config = self.repo.get_config()
+        config.set((b"filter", b"testfilter"), b"clean", b"sed 's/SECRET/REDACTED/g'")
+        config.write_to_path()
+
+        # Create .gitattributes to use the filter
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.secret filter=testfilter\n")
+
+        porcelain.add(self.repo, paths=[".gitattributes"])
+        porcelain.commit(self.repo, message=b"Add gitattributes")
+
+        # Create a file with sensitive content
+        secret_file = os.path.join(self.test_dir, "config.secret")
+        with open(secret_file, "wb") as f:
+            f.write(b"password=SECRET123\n")
+
+        # Add the file
+        porcelain.add(self.repo, paths=["config.secret"])
+
+        # The committed blob should have filtered content
+        # (Note: actual filter execution requires process filter support)
+
+    def test_ls_files_with_filters(self) -> None:
+        """Test ls-files respects filter settings."""
+        # Configure autocrlf
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"true")
+        config.write_to_path()
+
+        # Create files with different line endings
+        file1 = os.path.join(self.test_dir, "unix.txt")
+        with open(file1, "wb") as f:
+            f.write(b"unix\nfile\n")
+
+        file2 = os.path.join(self.test_dir, "windows.txt")
+        with open(file2, "wb") as f:
+            f.write(b"windows\r\nfile\r\n")
+
+        # Add files
+        porcelain.add(self.repo, paths=["unix.txt", "windows.txt"])
+
+        # List files
+        files = list(porcelain.ls_files(self.repo))
+        self.assertIn(b"unix.txt", files)
+        self.assertIn(b"windows.txt", files)
+
+        # Both files should be normalized in the index
+        index = self.repo.open_index()
+        for filename in [b"unix.txt", b"windows.txt"]:
+            entry = index[filename]
+            blob = self.repo.object_store[entry.sha]
+            # Both should have LF line endings in the repository
+            self.assertNotIn(b"\r\n", blob.data)
+
+
+class PorcelainLFSIntegrationTests(TestCase):
+    """Test LFS integration in porcelain commands."""
+
+    def setUp(self) -> None:
+        super().setUp()
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.test_dir)
+        self.repo = Repo.init(self.test_dir)
+
+        # Set up LFS
+        lfs_dir = os.path.join(self.test_dir, ".git", "lfs")
+        os.makedirs(lfs_dir, exist_ok=True)
+
+    def test_add_large_file_with_lfs(self) -> None:
+        """Test adding large files with LFS filter."""
+        # Configure LFS filter
+        config = self.repo.get_config()
+        config.set((b"filter", b"lfs"), b"clean", b"git-lfs clean -- %f")
+        config.set((b"filter", b"lfs"), b"smudge", b"git-lfs smudge -- %f")
+        config.set((b"filter", b"lfs"), b"process", b"git-lfs filter-process")
+        config.set((b"filter", b"lfs"), b"required", b"true")
+        config.write_to_path()
+
+        # Create .gitattributes for LFS
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.bin filter=lfs diff=lfs merge=lfs -text\n")
+
+        porcelain.add(self.repo, paths=[".gitattributes"])
+        porcelain.commit(self.repo, message=b"Add LFS attributes")
+
+        # Create a large binary file
+        large_file = os.path.join(self.test_dir, "large.bin")
+        content = b"X" * (1024 * 1024)  # 1MB file
+        with open(large_file, "wb") as f:
+            f.write(content)
+
+        # Add the large file
+        # Note: actual LFS handling requires git-lfs to be installed
+        # This test verifies the filter infrastructure is in place
+        porcelain.add(self.repo, paths=["large.bin"])
+
+        # Check that something was added to the index
+        index = self.repo.open_index()
+        self.assertIn(b"large.bin", index)
+
+    def test_status_with_lfs_files(self) -> None:
+        """Test status command with LFS files."""
+        # Set up LFS attributes
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.bin filter=lfs diff=lfs merge=lfs -text\n")
+
+        porcelain.add(self.repo, paths=[".gitattributes"])
+        porcelain.commit(self.repo, message=b"Add LFS attributes")
+
+        # Create an LFS pointer file manually
+        from dulwich.lfs import LFSPointer
+
+        pointer = LFSPointer(
+            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 1024
+        )
+        lfs_file = os.path.join(self.test_dir, "data.bin")
+        with open(lfs_file, "wb") as f:
+            f.write(pointer.to_bytes())
+
+        # Add and commit the pointer
+        porcelain.add(self.repo, paths=["data.bin"])
+        porcelain.commit(self.repo, message=b"Add LFS file")
+
+        # Modify the pointer file
+        with open(lfs_file, "ab") as f:
+            f.write(b"modified\n")
+
+        # Status should detect the change
+        results = porcelain.status(self.repo)
+        self.assertIn(b"data.bin", results.unstaged)
+
+
+class FilterEdgeCaseTests(TestCase):
+    """Test edge cases in filter handling."""
+
+    def setUp(self) -> None:
+        super().setUp()
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.test_dir)
+        self.repo = Repo.init(self.test_dir)
+
+    def test_mixed_line_endings(self) -> None:
+        """Test handling files with mixed line endings."""
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"true")
+        config.write_to_path()
+
+        # Create file with mixed line endings
+        mixed_file = os.path.join(self.test_dir, "mixed.txt")
+        with open(mixed_file, "wb") as f:
+            f.write(b"line1\r\nline2\nline3\r\nline4")
+
+        porcelain.add(self.repo, paths=["mixed.txt"])
+
+        # Check normalization
+        index = self.repo.open_index()
+        entry = index[b"mixed.txt"]
+        blob = self.repo.object_store[entry.sha]
+        # Should normalize all to LF
+        self.assertEqual(blob.data, b"line1\nline2\nline3\nline4")
+
+    def test_binary_detection(self) -> None:
+        """Test binary file detection in filters."""
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"true")
+        config.write_to_path()
+
+        # Create a file with binary content
+        binary_file = os.path.join(self.test_dir, "binary.dat")
+        with open(binary_file, "wb") as f:
+            f.write(b"\x00\x01\x02\r\n\x03\x04\r\n")
+
+        porcelain.add(self.repo, paths=["binary.dat"])
+
+        # Binary files should not be converted
+        index = self.repo.open_index()
+        entry = index[b"binary.dat"]
+        blob = self.repo.object_store[entry.sha]
+        self.assertEqual(blob.data, b"\x00\x01\x02\r\n\x03\x04\r\n")
+
+    def test_empty_file_handling(self) -> None:
+        """Test filter handling of empty files."""
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"true")
+        config.write_to_path()
+
+        # Create empty file
+        empty_file = os.path.join(self.test_dir, "empty.txt")
+        with open(empty_file, "wb") as f:
+            f.write(b"")
+
+        porcelain.add(self.repo, paths=["empty.txt"])
+
+        # Empty files should pass through unchanged
+        index = self.repo.open_index()
+        entry = index[b"empty.txt"]
+        blob = self.repo.object_store[entry.sha]
+        self.assertEqual(blob.data, b"")
+
+    def test_gitattributes_precedence(self) -> None:
+        """Test that gitattributes takes precedence over config."""
+        # Set autocrlf=false in config
+        config = self.repo.get_config()
+        config.set((b"core",), b"autocrlf", b"false")
+        config.write_to_path()
+
+        # But force text conversion via gitattributes
+        gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
+        with open(gitattributes_path, "wb") as f:
+            f.write(b"*.txt text\n")
+
+        porcelain.add(self.repo, paths=[".gitattributes"])
+
+        # Create file with CRLF
+        text_file = os.path.join(self.test_dir, "test.txt")
+        with open(text_file, "wb") as f:
+            f.write(b"line1\r\nline2\r\n")
+
+        porcelain.add(self.repo, paths=["test.txt"])
+
+        # Should be normalized despite autocrlf=false
+        index = self.repo.open_index()
+        entry = index[b"test.txt"]
+        blob = self.repo.object_store[entry.sha]
+        self.assertEqual(blob.data, b"line1\nline2\n")

+ 244 - 0
tests/test_porcelain_lfs.py

@@ -0,0 +1,244 @@
+# test_porcelain_lfs.py -- Tests for LFS porcelain functions
+# Copyright (C) 2024 Jelmer Vernooij
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Tests for LFS porcelain functions."""
+
+import os
+import tempfile
+import unittest
+
+from dulwich import porcelain
+from dulwich.lfs import LFSPointer, LFSStore
+from dulwich.repo import Repo
+from tests import TestCase
+
+
+class LFSPorcelainTestCase(TestCase):
+    """Test case for LFS porcelain functions."""
+
+    def setUp(self):
+        super().setUp()
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(self._cleanup_test_dir)
+        self.repo = Repo.init(self.test_dir)
+        self.addCleanup(self.repo.close)
+
+    def _cleanup_test_dir(self):
+        """Clean up test directory recursively."""
+        import shutil
+
+        shutil.rmtree(self.test_dir, ignore_errors=True)
+
+    def test_lfs_init(self):
+        """Test LFS initialization."""
+        porcelain.lfs_init(self.repo)
+
+        # Check that LFS store was created
+        lfs_dir = os.path.join(self.repo.controldir(), "lfs")
+        self.assertTrue(os.path.exists(lfs_dir))
+        self.assertTrue(os.path.exists(os.path.join(lfs_dir, "objects")))
+        self.assertTrue(os.path.exists(os.path.join(lfs_dir, "tmp")))
+
+        # Check that config was set
+        config = self.repo.get_config()
+        self.assertEqual(
+            config.get((b"filter", b"lfs"), b"process"), b"git-lfs filter-process"
+        )
+        self.assertEqual(config.get((b"filter", b"lfs"), b"required"), b"true")
+
+    def test_lfs_track(self):
+        """Test tracking patterns with LFS."""
+        # Track some patterns
+        patterns = ["*.bin", "*.pdf"]
+        tracked = porcelain.lfs_track(self.repo, patterns)
+
+        self.assertEqual(set(tracked), set(patterns))
+
+        # Check .gitattributes was created
+        gitattributes_path = os.path.join(self.repo.path, ".gitattributes")
+        self.assertTrue(os.path.exists(gitattributes_path))
+
+        # Read and verify content
+        with open(gitattributes_path, "rb") as f:
+            content = f.read()
+
+        self.assertIn(b"*.bin diff=lfs filter=lfs merge=lfs -text", content)
+        self.assertIn(b"*.pdf diff=lfs filter=lfs merge=lfs -text", content)
+
+        # Test listing tracked patterns
+        tracked = porcelain.lfs_track(self.repo)
+        self.assertEqual(set(tracked), set(patterns))
+
+    def test_lfs_untrack(self):
+        """Test untracking patterns from LFS."""
+        # First track some patterns
+        patterns = ["*.bin", "*.pdf", "*.zip"]
+        porcelain.lfs_track(self.repo, patterns)
+
+        # Untrack one pattern
+        remaining = porcelain.lfs_untrack(self.repo, ["*.pdf"])
+        self.assertEqual(set(remaining), {"*.bin", "*.zip"})
+
+        # Verify .gitattributes
+        with open(os.path.join(self.repo.path, ".gitattributes"), "rb") as f:
+            content = f.read()
+
+        self.assertIn(b"*.bin diff=lfs filter=lfs merge=lfs -text", content)
+        self.assertNotIn(b"*.pdf diff=lfs filter=lfs merge=lfs -text", content)
+        self.assertIn(b"*.zip diff=lfs filter=lfs merge=lfs -text", content)
+
+    def test_lfs_clean(self):
+        """Test cleaning a file to LFS pointer."""
+        # Initialize LFS
+        porcelain.lfs_init(self.repo)
+
+        # Create a test file
+        test_content = b"This is test content for LFS"
+        test_file = os.path.join(self.repo.path, "test.bin")
+        with open(test_file, "wb") as f:
+            f.write(test_content)
+
+        # Clean the file
+        pointer_content = porcelain.lfs_clean(self.repo, "test.bin")
+
+        # Verify it's a valid LFS pointer
+        pointer = LFSPointer.from_bytes(pointer_content)
+        self.assertIsNotNone(pointer)
+        self.assertEqual(pointer.size, len(test_content))
+
+        # Verify the content was stored in LFS
+        lfs_store = LFSStore.from_repo(self.repo)
+        with lfs_store.open_object(pointer.oid) as f:
+            stored_content = f.read()
+        self.assertEqual(stored_content, test_content)
+
+    def test_lfs_smudge(self):
+        """Test smudging an LFS pointer to content."""
+        # Initialize LFS
+        porcelain.lfs_init(self.repo)
+
+        # Create test content and store it
+        test_content = b"This is test content for smudging"
+        lfs_store = LFSStore.from_repo(self.repo)
+        oid = lfs_store.write_object([test_content])
+
+        # Create LFS pointer
+        pointer = LFSPointer(oid, len(test_content))
+        pointer_content = pointer.to_bytes()
+
+        # Smudge the pointer
+        smudged_content = porcelain.lfs_smudge(self.repo, pointer_content)
+
+        self.assertEqual(smudged_content, test_content)
+
+    def test_lfs_ls_files(self):
+        """Test listing LFS files."""
+        # Initialize repo with some LFS files
+        porcelain.lfs_init(self.repo)
+
+        # Create a test file and convert to LFS
+        test_content = b"Large file content"
+        test_file = os.path.join(self.repo.path, "large.bin")
+        with open(test_file, "wb") as f:
+            f.write(test_content)
+
+        # Clean to LFS pointer
+        pointer_content = porcelain.lfs_clean(self.repo, "large.bin")
+        with open(test_file, "wb") as f:
+            f.write(pointer_content)
+
+        # Add and commit
+        porcelain.add(self.repo, paths=["large.bin"])
+        porcelain.commit(self.repo, message=b"Add LFS file")
+
+        # List LFS files
+        lfs_files = porcelain.lfs_ls_files(self.repo)
+
+        self.assertEqual(len(lfs_files), 1)
+        path, oid, size = lfs_files[0]
+        self.assertEqual(path, "large.bin")
+        self.assertEqual(size, len(test_content))
+
+    def test_lfs_migrate(self):
+        """Test migrating files to LFS."""
+        # Create some files
+        files = {
+            "small.txt": b"Small file",
+            "large1.bin": b"X" * 1000,
+            "large2.dat": b"Y" * 2000,
+            "exclude.bin": b"Z" * 1500,
+        }
+
+        for filename, content in files.items():
+            path = os.path.join(self.repo.path, filename)
+            with open(path, "wb") as f:
+                f.write(content)
+
+        # Add files to index
+        porcelain.add(self.repo, paths=list(files.keys()))
+
+        # Migrate with patterns
+        count = porcelain.lfs_migrate(
+            self.repo, include=["*.bin", "*.dat"], exclude=["exclude.*"]
+        )
+
+        self.assertEqual(count, 2)  # large1.bin and large2.dat
+
+        # Verify files were converted to LFS pointers
+        for filename in ["large1.bin", "large2.dat"]:
+            path = os.path.join(self.repo.path, filename)
+            with open(path, "rb") as f:
+                content = f.read()
+            pointer = LFSPointer.from_bytes(content)
+            self.assertIsNotNone(pointer)
+
+    def test_lfs_pointer_check(self):
+        """Test checking if files are LFS pointers."""
+        # Initialize LFS
+        porcelain.lfs_init(self.repo)
+
+        # Create an LFS pointer file
+        test_content = b"LFS content"
+        lfs_file = os.path.join(self.repo.path, "lfs.bin")
+        # First create the file
+        with open(lfs_file, "wb") as f:
+            f.write(test_content)
+        pointer_content = porcelain.lfs_clean(self.repo, "lfs.bin")
+        with open(lfs_file, "wb") as f:
+            f.write(pointer_content)
+
+        # Create a regular file
+        regular_file = os.path.join(self.repo.path, "regular.txt")
+        with open(regular_file, "wb") as f:
+            f.write(b"Regular content")
+
+        # Check both files
+        results = porcelain.lfs_pointer_check(
+            self.repo, paths=["lfs.bin", "regular.txt", "nonexistent.txt"]
+        )
+
+        self.assertIsNotNone(results["lfs.bin"])
+        self.assertIsNone(results["regular.txt"])
+        self.assertIsNone(results["nonexistent.txt"])
+
+
+if __name__ == "__main__":
+    unittest.main()
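
Both new test modules are registered in self_test_suite() further up; to run just the two of them from a source checkout, something like this works:

    import unittest

    # Load and run only the test modules added in this commit.
    suite = unittest.defaultTestLoader.loadTestsFromNames(
        ["tests.test_porcelain_filters", "tests.test_porcelain_lfs"]
    )
    unittest.TextTestRunner(verbosity=2).run(suite)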