Jelmer Vernooij 1 month ago
Parent
Commit
d4c61e3b66
3 changed files with 734 additions and 670 deletions
  1. +14 −669
      dulwich/porcelain/__init__.py
  2. +717 −0
      dulwich/porcelain/lfs.py
  3. +3 −1
      tests/__init__.py

+ 14 - 669
dulwich/porcelain/__init__.py

@@ -338,7 +338,6 @@ from ..index import (
     build_file_from_blob,
     build_index_from_tree,
     get_unstaged_changes,
-    index_entry_from_stat,
     symlink,
     update_working_tree,
     validate_path_element_default,
@@ -405,6 +404,20 @@ from ..sparse_patterns import (
     determine_included_paths,
 )
 from ..trailers import add_trailer_to_message, format_trailers, parse_trailers
+from .lfs import (
+    lfs_clean,
+    lfs_fetch,
+    lfs_init,
+    lfs_ls_files,
+    lfs_migrate,
+    lfs_pointer_check,
+    lfs_pull,
+    lfs_push,
+    lfs_smudge,
+    lfs_status,
+    lfs_track,
+    lfs_untrack,
+)
 
 # Module level tuple definition for status output
 GitStatus = namedtuple("GitStatus", "staged unstaged untracked")
@@ -8196,674 +8209,6 @@ def reflog_delete(
             drop_reflog_entry(f, index, rewrite=rewrite)
 
 
-def lfs_track(
-    repo: str | os.PathLike[str] | Repo = ".",
-    patterns: Sequence[str] | None = None,
-) -> list[str]:
-    """Track file patterns with Git LFS.
-
-    Args:
-      repo: Path to repository
-      patterns: List of file patterns to track (e.g., ["*.bin", "*.pdf"])
-                If None, returns current tracked patterns
-
-    Returns:
-      List of tracked patterns
-    """
-    from ..attrs import GitAttributes
-
-    with open_repo_closing(repo) as r:
-        gitattributes_path = os.path.join(r.path, ".gitattributes")
-
-        # Load existing GitAttributes
-        if os.path.exists(gitattributes_path):
-            gitattributes = GitAttributes.from_file(gitattributes_path)
-        else:
-            gitattributes = GitAttributes()
-
-        if patterns is None:
-            # Return current LFS tracked patterns
-            tracked = []
-            for pattern_obj, attrs in gitattributes:
-                if attrs.get(b"filter") == b"lfs":
-                    tracked.append(pattern_obj.pattern.decode())
-            return tracked
-
-        # Add new patterns
-        for pattern in patterns:
-            # Ensure pattern is bytes
-            pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern
-
-            # Set LFS attributes for the pattern
-            gitattributes.set_attribute(pattern_bytes, b"filter", b"lfs")
-            gitattributes.set_attribute(pattern_bytes, b"diff", b"lfs")
-            gitattributes.set_attribute(pattern_bytes, b"merge", b"lfs")
-            gitattributes.set_attribute(pattern_bytes, b"text", False)
-
-        # Write updated attributes
-        gitattributes.write_to_file(gitattributes_path)
-
-        # Stage the .gitattributes file
-        add(r, [".gitattributes"])
-
-        return lfs_track(r)  # Return updated list
-
-
-def lfs_untrack(
-    repo: str | os.PathLike[str] | Repo = ".",
-    patterns: Sequence[str] | None = None,
-) -> list[str]:
-    """Untrack file patterns from Git LFS.
-
-    Args:
-      repo: Path to repository
-      patterns: List of file patterns to untrack
-
-    Returns:
-      List of remaining tracked patterns
-    """
-    from ..attrs import GitAttributes
-
-    if not patterns:
-        return lfs_track(repo)
-
-    with open_repo_closing(repo) as r:
-        gitattributes_path = os.path.join(r.path, ".gitattributes")
-
-        if not os.path.exists(gitattributes_path):
-            return []
-
-        # Load existing GitAttributes
-        gitattributes = GitAttributes.from_file(gitattributes_path)
-
-        # Remove specified patterns
-        for pattern in patterns:
-            pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern
-
-            # Check if pattern is tracked by LFS
-            for pattern_obj, attrs in list(gitattributes):
-                if (
-                    pattern_obj.pattern == pattern_bytes
-                    and attrs.get(b"filter") == b"lfs"
-                ):
-                    gitattributes.remove_pattern(pattern_bytes)
-                    break
-
-        # Write updated attributes
-        gitattributes.write_to_file(gitattributes_path)
-
-        # Stage the .gitattributes file
-        add(r, [".gitattributes"])
-
-        return lfs_track(r)  # Return updated list
-
-
-def lfs_init(repo: str | os.PathLike[str] | Repo = ".") -> None:
-    """Initialize Git LFS in a repository.
-
-    Args:
-      repo: Path to repository
-
-    Returns:
-      None
-    """
-    from ..lfs import LFSStore
-
-    with open_repo_closing(repo) as r:
-        # Create LFS store
-        LFSStore.from_repo(r, create=True)
-
-        # Set up Git config for LFS
-        config = r.get_config()
-        config.set((b"filter", b"lfs"), b"process", b"git-lfs filter-process")
-        config.set((b"filter", b"lfs"), b"required", b"true")
-        config.set((b"filter", b"lfs"), b"clean", b"git-lfs clean -- %f")
-        config.set((b"filter", b"lfs"), b"smudge", b"git-lfs smudge -- %f")
-        config.write_to_path()
-
-
-def lfs_clean(
-    repo: str | os.PathLike[str] | Repo = ".",
-    path: str | os.PathLike[str] | None = None,
-) -> bytes:
-    """Clean a file by converting it to an LFS pointer.
-
-    Args:
-      repo: Path to repository
-      path: Path to file to clean (relative to repo root)
-
-    Returns:
-      LFS pointer content as bytes
-    """
-    from ..lfs import LFSFilterDriver, LFSStore
-
-    with open_repo_closing(repo) as r:
-        if path is None:
-            raise ValueError("Path must be specified")
-
-        # Get LFS store
-        lfs_store = LFSStore.from_repo(r)
-        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
-
-        # Read file content
-        full_path = os.path.join(r.path, path)
-        with open(full_path, "rb") as f:
-            content = f.read()
-
-        # Clean the content (convert to LFS pointer)
-        return filter_driver.clean(content)
-
-
-def lfs_smudge(
-    repo: str | os.PathLike[str] | Repo = ".",
-    pointer_content: bytes | None = None,
-) -> bytes:
-    """Smudge an LFS pointer by retrieving the actual content.
-
-    Args:
-      repo: Path to repository
-      pointer_content: LFS pointer content as bytes
-
-    Returns:
-      Actual file content as bytes
-    """
-    from ..lfs import LFSFilterDriver, LFSStore
-
-    with open_repo_closing(repo) as r:
-        if pointer_content is None:
-            raise ValueError("Pointer content must be specified")
-
-        # Get LFS store
-        lfs_store = LFSStore.from_repo(r)
-        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
-
-        # Smudge the pointer (retrieve actual content)
-        return filter_driver.smudge(pointer_content)
-
-
-def lfs_ls_files(
-    repo: str | os.PathLike[str] | Repo = ".",
-    ref: str | bytes | None = None,
-) -> list[tuple[bytes, str, int]]:
-    """List files tracked by Git LFS.
-
-    Args:
-      repo: Path to repository
-      ref: Git ref to check (defaults to HEAD)
-
-    Returns:
-      List of (path, oid, size) tuples for LFS files
-    """
-    from ..lfs import LFSPointer
-    from ..object_store import iter_tree_contents
-
-    with open_repo_closing(repo) as r:
-        if ref is None:
-            ref = b"HEAD"
-        elif isinstance(ref, str):
-            ref = ref.encode()
-
-        # Get the commit and tree
-        try:
-            commit = r[ref]
-            assert isinstance(commit, Commit)
-            tree = r[commit.tree]
-            assert isinstance(tree, Tree)
-        except KeyError:
-            return []
-
-        lfs_files = []
-
-        # Walk the tree
-        for path, mode, sha in iter_tree_contents(r.object_store, tree.id):
-            assert path is not None
-            assert mode is not None
-            assert sha is not None
-            if not stat.S_ISREG(mode):
-                continue
-
-            # Check if it's an LFS pointer
-            obj = r.object_store[sha]
-            if not isinstance(obj, Blob):
-                raise AssertionError(f"Expected Blob object, got {type(obj).__name__}")
-            pointer = LFSPointer.from_bytes(obj.data)
-            if pointer is not None:
-                lfs_files.append((path, pointer.oid, pointer.size))
-
-        return lfs_files
-
-
-def lfs_migrate(
-    repo: str | os.PathLike[str] | Repo = ".",
-    include: list[str] | None = None,
-    exclude: list[str] | None = None,
-    everything: bool = False,
-) -> int:
-    """Migrate files to Git LFS.
-
-    Args:
-      repo: Path to repository
-      include: Patterns of files to include
-      exclude: Patterns of files to exclude
-      everything: Migrate all files above a certain size
-
-    Returns:
-      Number of migrated files
-    """
-    from ..lfs import LFSFilterDriver, LFSStore
-
-    with open_repo_closing(repo) as r:
-        # Initialize LFS if needed
-        lfs_store = LFSStore.from_repo(r, create=True)
-        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
-
-        # Get current index
-        index = r.open_index()
-
-        migrated = 0
-
-        # Determine files to migrate
-        files_to_migrate = []
-
-        if everything:
-            # Migrate all files above 100MB
-            for path, entry in index.items():
-                full_path = os.path.join(r.path, path.decode())
-                if os.path.exists(full_path):
-                    size = os.path.getsize(full_path)
-                    if size > 100 * 1024 * 1024:  # 100MB
-                        files_to_migrate.append(path.decode())
-        else:
-            # Use include/exclude patterns
-            for path, entry in index.items():
-                path_str = path.decode()
-
-                # Check include patterns
-                if include:
-                    matched = any(
-                        fnmatch.fnmatch(path_str, pattern) for pattern in include
-                    )
-                    if not matched:
-                        continue
-
-                # Check exclude patterns
-                if exclude:
-                    excluded = any(
-                        fnmatch.fnmatch(path_str, pattern) for pattern in exclude
-                    )
-                    if excluded:
-                        continue
-
-                files_to_migrate.append(path_str)
-
-        # Migrate files
-        for path_str in files_to_migrate:
-            full_path = os.path.join(r.path, path_str)
-            if not os.path.exists(full_path):
-                continue
-
-            # Read file content
-            with open(full_path, "rb") as f:
-                content = f.read()
-
-            # Convert to LFS pointer
-            pointer_content = filter_driver.clean(content)
-
-            # Write pointer back to file
-            with open(full_path, "wb") as f:
-                f.write(pointer_content)
-
-            # Create blob for pointer content and update index
-            blob = Blob()
-            blob.data = pointer_content
-            r.object_store.add_object(blob)
-
-            st = os.stat(full_path)
-            index_entry = index_entry_from_stat(st, blob.id, 0)
-            path_bytes = path_str.encode() if isinstance(path_str, str) else path_str
-            index[path_bytes] = index_entry
-
-            migrated += 1
-
-        # Write updated index
-        index.write()
-
-        # Track patterns if include was specified
-        if include:
-            lfs_track(r, include)
-
-        return migrated
-
-
-def lfs_pointer_check(
-    repo: str | os.PathLike[str] | Repo = ".",
-    paths: Sequence[str] | None = None,
-) -> dict[str, Any | None]:
-    """Check if files are valid LFS pointers.
-
-    Args:
-      repo: Path to repository
-      paths: List of file paths to check (if None, check all files)
-
-    Returns:
-      Dict mapping paths to LFSPointer objects (or None if not a pointer)
-    """
-    from ..lfs import LFSPointer
-
-    with open_repo_closing(repo) as r:
-        results = {}
-
-        if paths is None:
-            # Check all files in index
-            index = r.open_index()
-            paths = [path.decode() for path in index]
-
-        for path in paths:
-            full_path = os.path.join(r.path, path)
-            if os.path.exists(full_path):
-                try:
-                    with open(full_path, "rb") as f:
-                        content = f.read()
-                    pointer = LFSPointer.from_bytes(content)
-                    results[path] = pointer
-                except OSError:
-                    results[path] = None
-            else:
-                results[path] = None
-
-        return results
-
-
-def lfs_fetch(
-    repo: str | os.PathLike[str] | Repo = ".",
-    remote: str = "origin",
-    refs: list[str | bytes] | None = None,
-) -> int:
-    """Fetch LFS objects from remote.
-
-    Args:
-      repo: Path to repository
-      remote: Remote name (default: origin)
-      refs: Specific refs to fetch LFS objects for (default: all refs)
-
-    Returns:
-      Number of objects fetched
-    """
-    from ..lfs import LFSClient, LFSPointer, LFSStore
-
-    with open_repo_closing(repo) as r:
-        # Get LFS server URL from config
-        config = r.get_config()
-        lfs_url_bytes = config.get((b"lfs",), b"url")
-        if not lfs_url_bytes:
-            # Try remote URL
-            remote_url = config.get((b"remote", remote.encode()), b"url")
-            if remote_url:
-                # Append /info/lfs to remote URL
-                remote_url_str = remote_url.decode()
-                if remote_url_str.endswith(".git"):
-                    remote_url_str = remote_url_str[:-4]
-                lfs_url = f"{remote_url_str}/info/lfs"
-            else:
-                raise ValueError(f"No LFS URL configured for remote {remote}")
-        else:
-            lfs_url = lfs_url_bytes.decode()
-
-        # Get authentication
-        auth = None
-        # TODO: Support credential helpers and other auth methods
-
-        # Create LFS client and store
-        client = LFSClient(lfs_url, auth)
-        store = LFSStore.from_repo(r)
-
-        # Find all LFS pointers in the refs
-        pointers_to_fetch = []
-
-        if refs is None:
-            # Get all refs
-            refs = list(r.refs.keys())
-
-        for ref in refs:
-            if isinstance(ref, str):
-                ref_key = Ref(ref.encode())
-            elif isinstance(ref, bytes):
-                ref_key = Ref(ref)
-            else:
-                ref_key = ref
-            try:
-                commit = r[r.refs[ref_key]]
-            except KeyError:
-                continue
-
-            # Walk the commit tree
-            assert isinstance(commit, Commit)
-            for path, mode, sha in r.object_store.iter_tree_contents(commit.tree):
-                assert sha is not None
-                try:
-                    obj = r.object_store[sha]
-                except KeyError:
-                    pass
-                else:
-                    if isinstance(obj, Blob):
-                        pointer = LFSPointer.from_bytes(obj.data)
-                        if pointer and pointer.is_valid_oid():
-                            # Check if we already have it
-                            try:
-                                with store.open_object(pointer.oid):
-                                    pass  # Object exists, no need to fetch
-                            except KeyError:
-                                pointers_to_fetch.append((pointer.oid, pointer.size))
-
-        # Fetch missing objects
-        fetched = 0
-        for oid, size in pointers_to_fetch:
-            content = client.download(oid, size)
-            store.write_object([content])
-            fetched += 1
-
-        return fetched
-
-
-def lfs_pull(repo: str | os.PathLike[str] | Repo = ".", remote: str = "origin") -> int:
-    """Pull LFS objects for current checkout.
-
-    Args:
-      repo: Path to repository
-      remote: Remote name (default: origin)
-
-    Returns:
-      Number of objects fetched
-    """
-    from ..lfs import LFSPointer, LFSStore
-
-    with open_repo_closing(repo) as r:
-        # First do a fetch for HEAD
-        fetched = lfs_fetch(repo, remote, [b"HEAD"])
-
-        # Then checkout LFS files in working directory
-        store = LFSStore.from_repo(r)
-        index = r.open_index()
-
-        for path, entry in index.items():
-            full_path = os.path.join(r.path, path.decode())
-            if os.path.exists(full_path):
-                with open(full_path, "rb") as f:
-                    content = f.read()
-
-                pointer = LFSPointer.from_bytes(content)
-                if pointer and pointer.is_valid_oid():
-                    try:
-                        # Replace pointer with actual content
-                        with store.open_object(pointer.oid) as lfs_file:
-                            lfs_content = lfs_file.read()
-                        with open(full_path, "wb") as f:
-                            f.write(lfs_content)
-                    except KeyError:
-                        # Object not available
-                        pass
-
-        return fetched
-
-
-def lfs_push(
-    repo: str | os.PathLike[str] | Repo = ".",
-    remote: str = "origin",
-    refs: list[str | bytes] | None = None,
-) -> int:
-    """Push LFS objects to remote.
-
-    Args:
-      repo: Path to repository
-      remote: Remote name (default: origin)
-      refs: Specific refs to push LFS objects for (default: current branch)
-
-    Returns:
-      Number of objects pushed
-    """
-    from ..lfs import LFSClient, LFSPointer, LFSStore
-
-    with open_repo_closing(repo) as r:
-        # Get LFS server URL from config
-        config = r.get_config()
-        lfs_url_bytes = config.get((b"lfs",), b"url")
-        if not lfs_url_bytes:
-            # Try remote URL
-            remote_url = config.get((b"remote", remote.encode()), b"url")
-            if remote_url:
-                # Append /info/lfs to remote URL
-                remote_url_str = remote_url.decode()
-                if remote_url_str.endswith(".git"):
-                    remote_url_str = remote_url_str[:-4]
-                lfs_url = f"{remote_url_str}/info/lfs"
-            else:
-                raise ValueError(f"No LFS URL configured for remote {remote}")
-        else:
-            lfs_url = lfs_url_bytes.decode()
-
-        # Get authentication
-        auth = None
-        # TODO: Support credential helpers and other auth methods
-
-        # Create LFS client and store
-        client = LFSClient(lfs_url, auth)
-        store = LFSStore.from_repo(r)
-
-        # Find all LFS objects to push
-        if refs is None:
-            # Push current branch
-            head_ref = r.refs.read_ref(HEADREF)
-            refs = [head_ref] if head_ref else []
-
-        objects_to_push = set()
-
-        for ref in refs:
-            if isinstance(ref, str):
-                ref_bytes = ref.encode()
-            else:
-                ref_bytes = ref
-            try:
-                if ref_bytes.startswith(b"refs/"):
-                    commit = r[r.refs[Ref(ref_bytes)]]
-                else:
-                    commit = r[ref_bytes]
-            except KeyError:
-                continue
-
-            # Walk the commit tree
-            assert isinstance(commit, Commit)
-            for path, mode, sha in r.object_store.iter_tree_contents(commit.tree):
-                assert sha is not None
-                try:
-                    obj = r.object_store[sha]
-                except KeyError:
-                    pass
-                else:
-                    if isinstance(obj, Blob):
-                        pointer = LFSPointer.from_bytes(obj.data)
-                        if pointer and pointer.is_valid_oid():
-                            objects_to_push.add((pointer.oid, pointer.size))
-
-        # Push objects
-        pushed = 0
-        for oid, size in objects_to_push:
-            try:
-                with store.open_object(oid) as f:
-                    content = f.read()
-            except KeyError:
-                # Object not in local store
-                logging.warn("LFS object %s not found locally", oid)
-            else:
-                client.upload(oid, size, content)
-                pushed += 1
-
-        return pushed
-
-
-def lfs_status(repo: str | os.PathLike[str] | Repo = ".") -> dict[str, list[str]]:
-    """Show status of LFS files.
-
-    Args:
-      repo: Path to repository
-
-    Returns:
-      Dict with status information
-    """
-    from ..lfs import LFSPointer, LFSStore
-
-    with open_repo_closing(repo) as r:
-        store = LFSStore.from_repo(r)
-        index = r.open_index()
-
-        status: dict[str, list[str]] = {
-            "tracked": [],
-            "not_staged": [],
-            "not_committed": [],
-            "not_pushed": [],
-            "missing": [],
-        }
-
-        # Check working directory files
-        for path, entry in index.items():
-            path_str = path.decode()
-            full_path = os.path.join(r.path, path_str)
-
-            if os.path.exists(full_path):
-                with open(full_path, "rb") as f:
-                    content = f.read()
-
-                pointer = LFSPointer.from_bytes(content)
-                if pointer and pointer.is_valid_oid():
-                    status["tracked"].append(path_str)
-
-                    # Check if object exists locally
-                    try:
-                        with store.open_object(pointer.oid):
-                            pass  # Object exists locally
-                    except KeyError:
-                        status["missing"].append(path_str)
-
-                    # Check if file has been modified
-                    if isinstance(entry, ConflictedIndexEntry):
-                        continue  # Skip conflicted entries
-                    try:
-                        staged_obj = r.object_store[entry.sha]
-                    except KeyError:
-                        pass
-                    else:
-                        if not isinstance(staged_obj, Blob):
-                            raise AssertionError(
-                                f"Expected Blob object, got {type(staged_obj).__name__}"
-                            )
-                        staged_pointer = LFSPointer.from_bytes(staged_obj.data)
-                        if staged_pointer and staged_pointer.oid != pointer.oid:
-                            status["not_staged"].append(path_str)
-
-        # TODO: Check for not committed and not pushed files
-
-        return status
-
-
 def worktree_list(repo: RepoPath = ".") -> list[Any]:
     """List all worktrees for a repository.
 

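Note: the twelve lfs_* functions are re-exported from dulwich.porcelain (see the import block added above), so both the old and the new import paths keep working after this move:

    # Both resolve to the same function after this commit:
    from dulwich.porcelain import lfs_track         # re-exported, backwards compatible
    from dulwich.porcelain.lfs import lfs_track     # new canonical location
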
+ 717 - 0
dulwich/porcelain/lfs.py

@@ -0,0 +1,717 @@
+# lfs.py -- LFS porcelain
+# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Porcelain functions for Git LFS support."""
+
+import fnmatch
+import logging
+import os
+import stat
+from collections.abc import Sequence
+from typing import Any
+
+from dulwich.index import (
+    ConflictedIndexEntry,
+    index_entry_from_stat,
+)
+from dulwich.objects import Blob, Commit, Tree
+from dulwich.refs import HEADREF, Ref
+from dulwich.repo import Repo
+
+
+def lfs_track(
+    repo: str | os.PathLike[str] | Repo = ".",
+    patterns: Sequence[str] | None = None,
+) -> list[str]:
+    """Track file patterns with Git LFS.
+
+    Args:
+      repo: Path to repository
+      patterns: List of file patterns to track (e.g., ["*.bin", "*.pdf"])
+                If None, returns current tracked patterns
+
+    Returns:
+      List of tracked patterns
+    """
+    from ..attrs import GitAttributes
+    from . import add, open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        gitattributes_path = os.path.join(r.path, ".gitattributes")
+
+        # Load existing GitAttributes
+        if os.path.exists(gitattributes_path):
+            gitattributes = GitAttributes.from_file(gitattributes_path)
+        else:
+            gitattributes = GitAttributes()
+
+        if patterns is None:
+            # Return current LFS tracked patterns
+            tracked = []
+            for pattern_obj, attrs in gitattributes:
+                if attrs.get(b"filter") == b"lfs":
+                    tracked.append(pattern_obj.pattern.decode())
+            return tracked
+
+        # Add new patterns
+        for pattern in patterns:
+            # Ensure pattern is bytes
+            pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern
+
+            # Set LFS attributes for the pattern
+            gitattributes.set_attribute(pattern_bytes, b"filter", b"lfs")
+            gitattributes.set_attribute(pattern_bytes, b"diff", b"lfs")
+            gitattributes.set_attribute(pattern_bytes, b"merge", b"lfs")
+            gitattributes.set_attribute(pattern_bytes, b"text", False)
+
+        # Write updated attributes
+        gitattributes.write_to_file(gitattributes_path)
+
+        # Stage the .gitattributes file
+        add(r, [".gitattributes"])
+
+        return lfs_track(r)  # Return updated list
+
+
+def lfs_untrack(
+    repo: str | os.PathLike[str] | Repo = ".",
+    patterns: Sequence[str] | None = None,
+) -> list[str]:
+    """Untrack file patterns from Git LFS.
+
+    Args:
+      repo: Path to repository
+      patterns: List of file patterns to untrack
+
+    Returns:
+      List of remaining tracked patterns
+    """
+    from ..attrs import GitAttributes
+    from . import add, open_repo_closing
+
+    if not patterns:
+        return lfs_track(repo)
+
+    with open_repo_closing(repo) as r:
+        gitattributes_path = os.path.join(r.path, ".gitattributes")
+
+        if not os.path.exists(gitattributes_path):
+            return []
+
+        # Load existing GitAttributes
+        gitattributes = GitAttributes.from_file(gitattributes_path)
+
+        # Remove specified patterns
+        for pattern in patterns:
+            pattern_bytes = pattern.encode() if isinstance(pattern, str) else pattern
+
+            # Check if pattern is tracked by LFS
+            for pattern_obj, attrs in list(gitattributes):
+                if (
+                    pattern_obj.pattern == pattern_bytes
+                    and attrs.get(b"filter") == b"lfs"
+                ):
+                    gitattributes.remove_pattern(pattern_bytes)
+                    break
+
+        # Write updated attributes
+        gitattributes.write_to_file(gitattributes_path)
+
+        # Stage the .gitattributes file
+        add(r, [".gitattributes"])
+
+        return lfs_track(r)  # Return updated list
+
+
+def lfs_init(repo: str | os.PathLike[str] | Repo = ".") -> None:
+    """Initialize Git LFS in a repository.
+
+    Args:
+      repo: Path to repository
+
+    Returns:
+      None
+    """
+    from ..lfs import LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        # Create LFS store
+        LFSStore.from_repo(r, create=True)
+
+        # Set up Git config for LFS
+        config = r.get_config()
+        config.set((b"filter", b"lfs"), b"process", b"git-lfs filter-process")
+        config.set((b"filter", b"lfs"), b"required", b"true")
+        config.set((b"filter", b"lfs"), b"clean", b"git-lfs clean -- %f")
+        config.set((b"filter", b"lfs"), b"smudge", b"git-lfs smudge -- %f")
+        config.write_to_path()
+
+
+def lfs_clean(
+    repo: str | os.PathLike[str] | Repo = ".",
+    path: str | os.PathLike[str] | None = None,
+) -> bytes:
+    """Clean a file by converting it to an LFS pointer.
+
+    Args:
+      repo: Path to repository
+      path: Path to file to clean (relative to repo root)
+
+    Returns:
+      LFS pointer content as bytes
+    """
+    from ..lfs import LFSFilterDriver, LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        if path is None:
+            raise ValueError("Path must be specified")
+
+        # Get LFS store
+        lfs_store = LFSStore.from_repo(r)
+        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
+
+        # Read file content
+        full_path = os.path.join(r.path, path)
+        with open(full_path, "rb") as f:
+            content = f.read()
+
+        # Clean the content (convert to LFS pointer)
+        return filter_driver.clean(content)
+
+
+def lfs_smudge(
+    repo: str | os.PathLike[str] | Repo = ".",
+    pointer_content: bytes | None = None,
+) -> bytes:
+    """Smudge an LFS pointer by retrieving the actual content.
+
+    Args:
+      repo: Path to repository
+      pointer_content: LFS pointer content as bytes
+
+    Returns:
+      Actual file content as bytes
+    """
+    from ..lfs import LFSFilterDriver, LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        if pointer_content is None:
+            raise ValueError("Pointer content must be specified")
+
+        # Get LFS store
+        lfs_store = LFSStore.from_repo(r)
+        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
+
+        # Smudge the pointer (retrieve actual content)
+        return filter_driver.smudge(pointer_content)
+
+
+def lfs_ls_files(
+    repo: str | os.PathLike[str] | Repo = ".",
+    ref: str | bytes | None = None,
+) -> list[tuple[bytes, str, int]]:
+    """List files tracked by Git LFS.
+
+    Args:
+      repo: Path to repository
+      ref: Git ref to check (defaults to HEAD)
+
+    Returns:
+      List of (path, oid, size) tuples for LFS files
+    """
+    from ..lfs import LFSPointer
+    from ..object_store import iter_tree_contents
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        if ref is None:
+            ref = b"HEAD"
+        elif isinstance(ref, str):
+            ref = ref.encode()
+
+        # Get the commit and tree
+        try:
+            commit = r[ref]
+            assert isinstance(commit, Commit)
+            tree = r[commit.tree]
+            assert isinstance(tree, Tree)
+        except KeyError:
+            return []
+
+        lfs_files = []
+
+        # Walk the tree
+        for path, mode, sha in iter_tree_contents(r.object_store, tree.id):
+            assert path is not None
+            assert mode is not None
+            assert sha is not None
+            if not stat.S_ISREG(mode):
+                continue
+
+            # Check if it's an LFS pointer
+            obj = r.object_store[sha]
+            if not isinstance(obj, Blob):
+                raise AssertionError(f"Expected Blob object, got {type(obj).__name__}")
+            pointer = LFSPointer.from_bytes(obj.data)
+            if pointer is not None:
+                lfs_files.append((path, pointer.oid, pointer.size))
+
+        return lfs_files
+
+
+def lfs_migrate(
+    repo: str | os.PathLike[str] | Repo = ".",
+    include: list[str] | None = None,
+    exclude: list[str] | None = None,
+    everything: bool = False,
+) -> int:
+    """Migrate files to Git LFS.
+
+    Args:
+      repo: Path to repository
+      include: Patterns of files to include
+      exclude: Patterns of files to exclude
+      everything: Migrate all files larger than 100MB
+
+    Returns:
+      Number of migrated files
+    """
+    from ..lfs import LFSFilterDriver, LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        # Initialize LFS if needed
+        lfs_store = LFSStore.from_repo(r, create=True)
+        filter_driver = LFSFilterDriver(lfs_store, config=r.get_config())
+
+        # Get current index
+        index = r.open_index()
+
+        migrated = 0
+
+        # Determine files to migrate
+        files_to_migrate = []
+
+        if everything:
+            # Migrate all files above 100MB
+            for path, entry in index.items():
+                full_path = os.path.join(r.path, path.decode())
+                if os.path.exists(full_path):
+                    size = os.path.getsize(full_path)
+                    if size > 100 * 1024 * 1024:  # 100MB
+                        files_to_migrate.append(path.decode())
+        else:
+            # Use include/exclude patterns
+            for path, entry in index.items():
+                path_str = path.decode()
+
+                # Check include patterns
+                if include:
+                    matched = any(
+                        fnmatch.fnmatch(path_str, pattern) for pattern in include
+                    )
+                    if not matched:
+                        continue
+
+                # Check exclude patterns
+                if exclude:
+                    excluded = any(
+                        fnmatch.fnmatch(path_str, pattern) for pattern in exclude
+                    )
+                    if excluded:
+                        continue
+
+                files_to_migrate.append(path_str)
+
+        # Migrate files
+        for path_str in files_to_migrate:
+            full_path = os.path.join(r.path, path_str)
+            if not os.path.exists(full_path):
+                continue
+
+            # Read file content
+            with open(full_path, "rb") as f:
+                content = f.read()
+
+            # Convert to LFS pointer
+            pointer_content = filter_driver.clean(content)
+
+            # Write pointer back to file
+            with open(full_path, "wb") as f:
+                f.write(pointer_content)
+
+            # Create blob for pointer content and update index
+            blob = Blob()
+            blob.data = pointer_content
+            r.object_store.add_object(blob)
+
+            st = os.stat(full_path)
+            index_entry = index_entry_from_stat(st, blob.id, 0)
+            path_bytes = path_str.encode() if isinstance(path_str, str) else path_str
+            index[path_bytes] = index_entry
+
+            migrated += 1
+
+        # Write updated index
+        index.write()
+
+        # Track patterns if include was specified
+        if include:
+            lfs_track(r, include)
+
+        return migrated
+
+
+def lfs_pointer_check(
+    repo: str | os.PathLike[str] | Repo = ".",
+    paths: Sequence[str] | None = None,
+) -> dict[str, Any | None]:
+    """Check if files are valid LFS pointers.
+
+    Args:
+      repo: Path to repository
+      paths: List of file paths to check (if None, check all files)
+
+    Returns:
+      Dict mapping paths to LFSPointer objects (or None if not a pointer)
+    """
+    from ..lfs import LFSPointer
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        results = {}
+
+        if paths is None:
+            # Check all files in index
+            index = r.open_index()
+            paths = [path.decode() for path in index]
+
+        for path in paths:
+            full_path = os.path.join(r.path, path)
+            if os.path.exists(full_path):
+                try:
+                    with open(full_path, "rb") as f:
+                        content = f.read()
+                    pointer = LFSPointer.from_bytes(content)
+                    results[path] = pointer
+                except OSError:
+                    results[path] = None
+            else:
+                results[path] = None
+
+        return results
+
+
+def lfs_fetch(
+    repo: str | os.PathLike[str] | Repo = ".",
+    remote: str = "origin",
+    refs: list[str | bytes] | None = None,
+) -> int:
+    """Fetch LFS objects from remote.
+
+    Args:
+      repo: Path to repository
+      remote: Remote name (default: origin)
+      refs: Specific refs to fetch LFS objects for (default: all refs)
+
+    Returns:
+      Number of objects fetched
+    """
+    from ..lfs import LFSClient, LFSPointer, LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        # Get LFS server URL from config
+        config = r.get_config()
+        lfs_url_bytes = config.get((b"lfs",), b"url")
+        if not lfs_url_bytes:
+            # Try remote URL
+            remote_url = config.get((b"remote", remote.encode()), b"url")
+            if remote_url:
+                # Append /info/lfs to remote URL
+                remote_url_str = remote_url.decode()
+                if remote_url_str.endswith(".git"):
+                    remote_url_str = remote_url_str[:-4]
+                lfs_url = f"{remote_url_str}/info/lfs"
+            else:
+                raise ValueError(f"No LFS URL configured for remote {remote}")
+        else:
+            lfs_url = lfs_url_bytes.decode()
+
+        # Get authentication
+        auth = None
+        # TODO: Support credential helpers and other auth methods
+
+        # Create LFS client and store
+        client = LFSClient(lfs_url, auth)
+        store = LFSStore.from_repo(r)
+
+        # Find all LFS pointers in the refs
+        pointers_to_fetch = []
+
+        if refs is None:
+            # Get all refs
+            refs = list(r.refs.keys())
+
+        for ref in refs:
+            if isinstance(ref, str):
+                ref_key = Ref(ref.encode())
+            elif isinstance(ref, bytes):
+                ref_key = Ref(ref)
+            else:
+                ref_key = ref
+            try:
+                commit = r[r.refs[ref_key]]
+            except KeyError:
+                continue
+
+            # Walk the commit tree
+            assert isinstance(commit, Commit)
+            for path, mode, sha in r.object_store.iter_tree_contents(commit.tree):
+                assert sha is not None
+                try:
+                    obj = r.object_store[sha]
+                except KeyError:
+                    pass
+                else:
+                    if isinstance(obj, Blob):
+                        pointer = LFSPointer.from_bytes(obj.data)
+                        if pointer and pointer.is_valid_oid():
+                            # Check if we already have it
+                            try:
+                                with store.open_object(pointer.oid):
+                                    pass  # Object exists, no need to fetch
+                            except KeyError:
+                                pointers_to_fetch.append((pointer.oid, pointer.size))
+
+        # Fetch missing objects
+        fetched = 0
+        for oid, size in pointers_to_fetch:
+            content = client.download(oid, size)
+            store.write_object([content])
+            fetched += 1
+
+        return fetched
+
+
+def lfs_pull(repo: str | os.PathLike[str] | Repo = ".", remote: str = "origin") -> int:
+    """Pull LFS objects for current checkout.
+
+    Args:
+      repo: Path to repository
+      remote: Remote name (default: origin)
+
+    Returns:
+      Number of objects fetched
+    """
+    from ..lfs import LFSPointer, LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        # First do a fetch for HEAD
+        fetched = lfs_fetch(repo, remote, [b"HEAD"])
+
+        # Then checkout LFS files in working directory
+        store = LFSStore.from_repo(r)
+        index = r.open_index()
+
+        for path, entry in index.items():
+            full_path = os.path.join(r.path, path.decode())
+            if os.path.exists(full_path):
+                with open(full_path, "rb") as f:
+                    content = f.read()
+
+                pointer = LFSPointer.from_bytes(content)
+                if pointer and pointer.is_valid_oid():
+                    try:
+                        # Replace pointer with actual content
+                        with store.open_object(pointer.oid) as lfs_file:
+                            lfs_content = lfs_file.read()
+                        with open(full_path, "wb") as f:
+                            f.write(lfs_content)
+                    except KeyError:
+                        # Object not available
+                        pass
+
+        return fetched
+
+
+def lfs_push(
+    repo: str | os.PathLike[str] | Repo = ".",
+    remote: str = "origin",
+    refs: list[str | bytes] | None = None,
+) -> int:
+    """Push LFS objects to remote.
+
+    Args:
+      repo: Path to repository
+      remote: Remote name (default: origin)
+      refs: Specific refs to push LFS objects for (default: current branch)
+
+    Returns:
+      Number of objects pushed
+    """
+    from ..lfs import LFSClient, LFSPointer, LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        # Get LFS server URL from config
+        config = r.get_config()
+        lfs_url_bytes = config.get((b"lfs",), b"url")
+        if not lfs_url_bytes:
+            # Try remote URL
+            remote_url = config.get((b"remote", remote.encode()), b"url")
+            if remote_url:
+                # Append /info/lfs to remote URL
+                remote_url_str = remote_url.decode()
+                if remote_url_str.endswith(".git"):
+                    remote_url_str = remote_url_str[:-4]
+                lfs_url = f"{remote_url_str}/info/lfs"
+            else:
+                raise ValueError(f"No LFS URL configured for remote {remote}")
+        else:
+            lfs_url = lfs_url_bytes.decode()
+
+        # Get authentication
+        auth = None
+        # TODO: Support credential helpers and other auth methods
+
+        # Create LFS client and store
+        client = LFSClient(lfs_url, auth)
+        store = LFSStore.from_repo(r)
+
+        # Find all LFS objects to push
+        if refs is None:
+            # Push current branch
+            head_ref = r.refs.read_ref(HEADREF)
+            refs = [head_ref] if head_ref else []
+
+        objects_to_push = set()
+
+        for ref in refs:
+            if isinstance(ref, str):
+                ref_bytes = ref.encode()
+            else:
+                ref_bytes = ref
+            try:
+                if ref_bytes.startswith(b"refs/"):
+                    commit = r[r.refs[Ref(ref_bytes)]]
+                else:
+                    commit = r[ref_bytes]
+            except KeyError:
+                continue
+
+            # Walk the commit tree
+            assert isinstance(commit, Commit)
+            for path, mode, sha in r.object_store.iter_tree_contents(commit.tree):
+                assert sha is not None
+                try:
+                    obj = r.object_store[sha]
+                except KeyError:
+                    pass
+                else:
+                    if isinstance(obj, Blob):
+                        pointer = LFSPointer.from_bytes(obj.data)
+                        if pointer and pointer.is_valid_oid():
+                            objects_to_push.add((pointer.oid, pointer.size))
+
+        # Push objects
+        pushed = 0
+        for oid, size in objects_to_push:
+            try:
+                with store.open_object(oid) as f:
+                    content = f.read()
+            except KeyError:
+                # Object not in local store
+                logging.warning("LFS object %s not found locally", oid)
+            else:
+                client.upload(oid, size, content)
+                pushed += 1
+
+        return pushed
+
+
+def lfs_status(repo: str | os.PathLike[str] | Repo = ".") -> dict[str, list[str]]:
+    """Show status of LFS files.
+
+    Args:
+      repo: Path to repository
+
+    Returns:
+      Dict with status information
+    """
+    from ..lfs import LFSPointer, LFSStore
+    from . import open_repo_closing
+
+    with open_repo_closing(repo) as r:
+        store = LFSStore.from_repo(r)
+        index = r.open_index()
+
+        status: dict[str, list[str]] = {
+            "tracked": [],
+            "not_staged": [],
+            "not_committed": [],
+            "not_pushed": [],
+            "missing": [],
+        }
+
+        # Check working directory files
+        for path, entry in index.items():
+            path_str = path.decode()
+            full_path = os.path.join(r.path, path_str)
+
+            if os.path.exists(full_path):
+                with open(full_path, "rb") as f:
+                    content = f.read()
+
+                pointer = LFSPointer.from_bytes(content)
+                if pointer and pointer.is_valid_oid():
+                    status["tracked"].append(path_str)
+
+                    # Check if object exists locally
+                    try:
+                        with store.open_object(pointer.oid):
+                            pass  # Object exists locally
+                    except KeyError:
+                        status["missing"].append(path_str)
+
+                    # Check if file has been modified
+                    if isinstance(entry, ConflictedIndexEntry):
+                        continue  # Skip conflicted entries
+                    try:
+                        staged_obj = r.object_store[entry.sha]
+                    except KeyError:
+                        pass
+                    else:
+                        if not isinstance(staged_obj, Blob):
+                            raise AssertionError(
+                                f"Expected Blob object, got {type(staged_obj).__name__}"
+                            )
+                        staged_pointer = LFSPointer.from_bytes(staged_obj.data)
+                        if staged_pointer and staged_pointer.oid != pointer.oid:
+                            status["not_staged"].append(path_str)
+
+        # TODO: Check for not committed and not pushed files
+
+        return status

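A minimal usage sketch for the new module, based on the signatures above (the repository path and patterns are illustrative):

    # Initialize LFS, track some patterns, then list LFS files reachable from HEAD.
    from dulwich.porcelain.lfs import lfs_init, lfs_ls_files, lfs_track

    lfs_init(".")                                 # create LFS store, write filter.lfs.* config
    tracked = lfs_track(".", ["*.bin", "*.pdf"])  # update and stage .gitattributes
    print(tracked)                                # currently tracked LFS patterns
    for path, oid, size in lfs_ls_files("."):     # (path, oid, size) tuples for LFS pointers
        print(path.decode(), oid, size)
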
+ 3 - 1
tests/__init__.py

@@ -198,7 +198,9 @@ def self_test_suite() -> unittest.TestSuite:
         "merge",
         "notes",
     ]
-    module_names += ["tests.porcelain"] + ["tests.porcelain.test_" + name for name in porcelain_names]
+    module_names += ["tests.porcelain"] + [
+        "tests.porcelain.test_" + name for name in porcelain_names
+    ]
     loader = unittest.TestLoader()
     return loader.loadTestsFromNames(module_names)