Procházet zdrojové kódy

feat(cone-mode): sparse checkout cone mode support (#1497)

Following up on my previous submission #1495 (from #405) which laid the
groundwork for cone mode I wanted to complete the task and now have got
the test cases all working.

It's once again pretty late by the time I'm finishing up, so I'll review
with fresh eyes, but I have at least run all the linters this time :-)
Tests pass and are named normally.

---------

Co-authored-by: Jelmer Vernooij <jvernooij@jelmer.uk>
Louis Maddox před 3 týdny
rodič
revize
081d6b4c72
5 změnil soubory, kde provedl 1083 přidání a 99 odebrání
  1. 124 99
      dulwich/porcelain.py
  2. 33 0
      dulwich/repo.py
  3. 342 0
      dulwich/sparse_patterns.py
  4. 225 0
      tests/test_porcelain.py
  5. 359 0
      tests/test_sparse_patterns.py

+ 124 - 99
dulwich/porcelain.py

@@ -28,6 +28,7 @@ Currently implemented:
  * check-ignore
  * checkout_branch
  * clone
+ * cone mode{_init, _set, _add}
  * commit
  * commit-tree
  * daemon
@@ -95,7 +96,6 @@ from .file import ensure_dir_exists
 from .graph import can_fast_forward
 from .ignore import IgnoreFilterManager
 from .index import (
-    EXTENDED_FLAG_SKIP_WORKTREE,
     _fs_to_tree_path,
     blob_from_path_and_stat,
     build_file_from_blob,
@@ -136,6 +136,11 @@ from .server import (
     UploadPackHandler,
 )
 from .server import update_server_info as server_update_server_info
+from .sparse_patterns import (
+    SparseCheckoutConflictError,
+    apply_included_paths,
+    determine_included_paths,
+)
 
 # Module level tuple definition for status output
 GitStatus = namedtuple("GitStatus", "staged unstaged untracked")
@@ -2114,113 +2119,133 @@ def checkout_branch(repo, target: Union[bytes, str], force: bool = False) -> Non
                 dir_path = os.path.dirname(dir_path)
 
 
-def sparse_checkout(repo, patterns=None, force=False):
-    """Perform a sparse checkout by excluding certain paths via skip-worktree bits.
+def sparse_checkout(
+    repo, patterns=None, force: bool = False, cone: Union[bool, None] = None
+):
+    """Perform a sparse checkout in the repository (either 'full' or 'cone mode').
+
+    Perform sparse checkout in either 'cone' (directory-based) mode or
+    'full pattern' (.gitignore) mode, depending on the ``cone`` parameter.
 
-    Mark any paths not matching the given patterns with skip-worktree in the index and
-    remove them from the working tree.  If `force=False` and a file has local
-    modifications, a `CheckoutError` is raised to prevent accidental data loss.
+    If ``cone`` is ``None``, the mode is inferred from the repository's
+    ``core.sparseCheckoutCone`` config setting.
 
-    By default, patterns are stored in or read from `.git/info/sparse-checkout`, and
-    follow standard Gitignore/fnmatch rules.
+    Steps:
+      1) If ``patterns`` is provided, write them to ``.git/info/sparse-checkout``.
+      2) Determine which paths in the index are included vs. excluded.
+         - If ``cone=True``, use "cone-compatible" directory-based logic.
+         - If ``cone=False``, use standard .gitignore-style matching.
+      3) Update the index's skip-worktree bits and add/remove files in
+         the working tree accordingly.
+      4) If ``force=False``, refuse to remove files that have local modifications.
 
     Args:
-      repo: A path to a repository or a Repo instance.
-      patterns: A list of Gitignore-style patterns to include.
-      force: Whether to allow destructive removals of uncommitted changes
-             in newly excluded paths.
+      repo: Path to the repository or a Repo object.
+      patterns: Optional list of sparse-checkout patterns to write.
+      force: Whether to force removal of locally modified files (default False).
+      cone: Boolean indicating cone mode (True/False). If None, read from config.
 
-    Raises:
-      CheckoutError: If local modifications would be discarded without force=True.
-      Error: If no patterns are given or an I/O failure occurs.
+    Returns:
+      None
     """
-    repo = Repo(repo) if not isinstance(repo, Repo) else repo
+    with open_repo_closing(repo) as repo_obj:
+        # --- 0) Possibly infer 'cone' from config ---
+        if cone is None:
+            cone = repo_obj.infer_cone_mode()
 
-    # 1) Read or write the sparse-checkout file
-    if patterns is not None:
-        repo.set_sparse_checkout_patterns(patterns)
-    else:
-        patterns = repo.get_sparse_checkout_patterns()
+        # --- 1) Read or write patterns ---
         if patterns is None:
-            raise Error("No sparse checkout patterns provided and no file found.")
-
-    # 2) Preprocess patterns: "docs/" -> "docs/*", unify path separators
-    processed_pats = []
-    for pat in patterns:
-        if pat.endswith("/"):
-            pat += "*"
-        processed_pats.append(pat)
-    patterns = processed_pats
-
-    def matches_any_pattern(index_path):
-        forward_path = index_path.replace("\\", "/")
-        for pat in patterns:
-            if fnmatch.fnmatch(forward_path, pat):
-                return True
-        return False
-
-    # 3) Helper to detect local modifications
-    normalizer = repo.get_blob_normalizer()
-
-    def local_modifications_exist(full_path, index_entry):
-        if not os.path.exists(full_path):
-            return False
-        try:
-            with open(full_path, "rb") as f:
-                disk_data = f.read()
-        except OSError:
-            return True
-        try:
-            blob = repo.object_store[index_entry.sha]
-        except KeyError:
-            return True
-        norm_data = normalizer.checkin_normalize(disk_data, full_path)
-        return norm_data != blob.data
-
-    # 4) Update skip-worktree bits in the index
-    index = repo.open_index()
-    for path, entry in list(index.items()):
-        path_str = path.decode("utf-8")
-        # If the file matches any pattern => included => clear skip-worktree
-        if matches_any_pattern(path_str):
-            entry.set_skip_worktree(False)
-        else:
-            entry.set_skip_worktree(True)
-        index[path] = entry
-    index.write()
-
-    # 5) Update the working tree to reflect skip-worktree bits
-    for path, entry in list(index.items()):
-        path_str = path.decode("utf-8")
-        full_path = os.path.join(repo.path, path_str)
-        skip_bit_set = bool(entry.extended_flags & EXTENDED_FLAG_SKIP_WORKTREE)
-
-        if skip_bit_set:
-            # The file is excluded
-            if os.path.exists(full_path):
-                # If force is False and local modifications exist, fail
-                if not force and local_modifications_exist(full_path, entry):
-                    raise CheckoutError(
-                        f"Local modifications in {path_str} would be overwritten "
-                        f"by sparse checkout. Use force=True to override."
-                    )
-                try:
-                    os.remove(full_path)
-                except OSError as e:
-                    raise Error(f"Failed to remove excluded file {path_str}: {e}")
+            lines = repo_obj.get_sparse_checkout_patterns()
+            if lines is None:
+                raise Error("No sparse checkout patterns found.")
         else:
-            # The file is included
-            if not os.path.exists(full_path):
-                try:
-                    blob = repo.object_store[entry.sha]
-                except KeyError:
-                    raise Error(
-                        f"Blob {entry.sha} not found in object store for {path_str}."
-                    )
-                ensure_dir_exists(os.path.dirname(full_path))
-                with open(full_path, "wb") as f:
-                    f.write(blob.data)
-    return
+            lines = patterns
+            repo_obj.set_sparse_checkout_patterns(patterns)
+
+        # --- 2) Determine the set of included paths ---
+        included_paths = determine_included_paths(repo_obj, lines, cone)
+
+        # --- 3) Apply those results to the index & working tree ---
+        try:
+            apply_included_paths(repo_obj, included_paths, force=force)
+        except SparseCheckoutConflictError as exc:
+            raise CheckoutError(*exc.args) from exc
+
+
def cone_mode_init(repo):
    """Enable cone-mode sparse checkout with only top-level files included.

    Turns on ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` in the
    repository configuration, writes the minimal cone-mode pattern set to
    ``.git/info/sparse-checkout`` (``/*`` keeps top-level files, ``!/*/``
    drops every subdirectory), and updates the working tree to match.

    When no directories are selected, cone mode includes only top-level
    files: https://git-scm.com/docs/git-sparse-checkout#_internalscone_mode_handling

    Args:
      repo: Path to the repository or a Repo object.

    Returns:
      None
    """
    # Minimal cone-mode pattern set: top-level files only, no subdirectories.
    base_patterns = ["/*", "!/*/"]
    with open_repo_closing(repo) as repo_obj:
        repo_obj.configure_for_cone_mode()
        sparse_checkout(repo_obj, base_patterns, force=True, cone=True)
+
+
def cone_mode_set(repo, dirs, force=False):
    """Replace the cone-mode sparse-checkout directory set.

    Ensures ``core.sparseCheckout`` and ``core.sparseCheckoutCone`` are
    enabled, rewrites ``.git/info/sparse-checkout`` so that only top-level
    files plus the given directories remain included, and re-applies the
    sparse checkout to the working tree.

    Args:
      repo: Path to the repository or a Repo object.
      dirs: List of directory names to include.
      force: Whether to forcibly discard local modifications (default False).

    Returns:
      None
    """
    with open_repo_closing(repo) as repo_obj:
        repo_obj.configure_for_cone_mode()
        repo_obj.set_cone_mode_patterns(dirs=dirs)
        # Re-read the patterns we just wrote and apply them to the tree.
        sparse_checkout(
            repo_obj,
            repo_obj.get_sparse_checkout_patterns(),
            force=force,
            cone=True,
        )
+
+
def cone_mode_add(repo, dirs, force=False):
    """Extend the cone-mode sparse checkout with additional directories.

    Recovers the directories already recorded in
    ``.git/info/sparse-checkout``, appends the requested ones, rewrites the
    pattern file, and re-applies the sparse checkout to the working tree.

    Args:
      repo: Path to the repository or a Repo object.
      dirs: List of directory names to add to the sparse-checkout.
      force: Whether to forcibly discard local modifications (default False).

    Returns:
      None
    """
    # These two lines are the fixed cone-mode preamble, not directories.
    base_patterns = ["/*", "!/*/"]
    with open_repo_closing(repo) as repo_obj:
        repo_obj.configure_for_cone_mode()
        # Turn the existing "/<dir>/" pattern lines back into bare names.
        existing_dirs = [
            line.strip("/")
            for line in repo_obj.get_sparse_checkout_patterns()
            if line not in base_patterns
        ]
        combined_dirs = existing_dirs + (dirs or [])
        repo_obj.set_cone_mode_patterns(dirs=combined_dirs)
        sparse_checkout(
            repo_obj,
            patterns=repo_obj.get_sparse_checkout_patterns(),
            force=force,
            cone=True,
        )
 
 
 def check_mailmap(repo, contact):

+ 33 - 0
dulwich/repo.py

@@ -1842,6 +1842,23 @@ class Repo(BaseRepo):
         """Return the path of the sparse-checkout file in this repo's control dir."""
         return os.path.join(self.controldir(), "info", "sparse-checkout")
 
+    def configure_for_cone_mode(self) -> None:
+        """Ensure the repository is configured for cone-mode sparse-checkout."""
+        config = self.get_config()
+        config.set((b"core",), b"sparseCheckout", b"true")
+        config.set((b"core",), b"sparseCheckoutCone", b"true")
+        config.write_to_path()
+
+    def infer_cone_mode(self) -> bool:
+        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
+        config = self.get_config()
+        try:
+            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
+            return sc_cone == b"true"
+        except KeyError:
+            # If core.sparseCheckoutCone is not set, default to False
+            return False
+
     def get_sparse_checkout_patterns(self) -> list[str]:
         """Return a list of sparse-checkout patterns from info/sparse-checkout.
 
@@ -1871,6 +1888,22 @@ class Repo(BaseRepo):
             for pat in patterns:
                 f.write(pat + "\n")
 
+    def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
+        """Write the given cone-mode directory patterns into info/sparse-checkout.
+
+        For each directory to include, add an inclusion line that "undoes" the prior
+        ``!/*/`` 'exclude' that re-includes that directory and everything under it.
+        Never add the same line twice.
+        """
+        patterns = ["/*", "!/*/"]
+        if dirs:
+            for d in dirs:
+                d = d.strip("/")
+                line = f"/{d}/"
+                if d and line not in patterns:
+                    patterns.append(line)
+        self.set_sparse_checkout_patterns(patterns)
+
 
 class MemoryRepo(BaseRepo):
     """Repo that stores refs, objects, and named files in memory.

+ 342 - 0
dulwich/sparse_patterns.py

@@ -0,0 +1,342 @@
+# sparse_patterns.py -- Sparse checkout pattern handling.
+# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Sparse checkout pattern handling."""
+
+import os
+from fnmatch import fnmatch
+
+from .file import ensure_dir_exists
+
+
class SparseCheckoutConflictError(Exception):
    """Raised when local modifications would be overwritten by a sparse checkout operation.

    Callers may catch this to present a user-facing error (porcelain
    re-raises it as CheckoutError); passing force=True avoids it.
    """


class BlobNotFoundError(Exception):
    """Raised when a requested blob is not found in the repository's object store."""
+
+
def determine_included_paths(repo, lines, cone):
    """Resolve the set of index paths selected by the sparse-checkout patterns.

    Dispatches to the cone-mode (directory-based) matcher or the
    full-pattern (.gitignore-style) matcher depending on ``cone``.

    Args:
      repo: A path to the repository or a Repo object.
      lines: A list of pattern lines (strings) from sparse-checkout config.
      cone: A bool indicating cone mode.

    Returns:
      A set of included path strings.
    """
    matcher = compute_included_paths_cone if cone else compute_included_paths_full
    return matcher(repo, lines)
+
+
def compute_included_paths_full(repo, lines):
    """Select index paths using .gitignore-style pattern matching.

    Every path in the index is tested against the parsed sparse-checkout
    patterns; the last matching pattern decides whether it is included.

    Args:
      repo: A path to the repository or a Repo object.
      lines: A list of pattern lines (strings) from sparse-checkout config.

    Returns:
      A set of included path strings.
    """
    parsed = parse_sparse_patterns(lines)
    included = set()
    for path_bytes, _entry in repo.open_index().items():
        decoded = path_bytes.decode("utf-8")
        # match_gitignore_patterns returns True when the path is included.
        if match_gitignore_patterns(decoded, parsed, path_is_dir=False):
            included.add(decoded)
    return included
+
+
def compute_included_paths_cone(repo, lines):
    """Select index paths using a simplified cone-mode interpretation.

    Recognizes three kinds of pattern lines: ``/*`` (include top-level
    files), ``!/*/`` (exclude all subdirectories) and ``/<dir>/``
    (re-include a directory and everything under it).  This is essentially
    Git's recursive cone mode, without the recursive/parent distinction.

    Args:
      repo: A path to the repository or a Repo object.
      lines: A list of pattern lines (strings), typically including entries
        like "/*", "!/*/", or "/mydir/".

    Returns:
      A set of included path strings.
    """
    include_top_level = "/*" in lines
    exclude_subdirs = "!/*/" in lines
    reinclude_dirs = set()
    for pat in lines:
        # Anything else that is anchored names a re-included directory.
        if pat in ("/*", "!/*/") or not pat.startswith("/"):
            continue
        name = pat.strip("/")
        if name:
            reinclude_dirs.add(name)

    included = set()
    for path_bytes, _entry in repo.open_index().items():
        path_str = path_bytes.decode("utf-8")
        top_dir, sep, _rest = path_str.partition("/")

        if not sep:
            # No slash: a top-level file, kept only when "/*" was given.
            if include_top_level:
                included.add(path_str)
        elif not exclude_subdirs or top_dir in reinclude_dirs:
            # Without "!/*/" every nested path is kept by default; with it,
            # only paths under a re-included directory survive.
            included.add(path_str)

    return included
+
+
def apply_included_paths(repo, included_paths, force=False):
    """Apply the sparse-checkout inclusion set to the index and working tree.

    Sets or clears the skip-worktree bit on every index entry depending on
    whether its path is in ``included_paths``, then removes newly excluded
    files from disk and materializes newly included ones from the object
    store.  If ``force=False``, files that have local modifications raise
    an error instead of being removed.

    Args:
      repo: A Repo object (must expose ``open_index``, ``object_store``,
        ``get_blob_normalizer`` and ``path``).
      included_paths: A set of paths (strings) that should remain included.
      force: Whether to forcibly remove locally modified files (default False).

    Raises:
      SparseCheckoutConflictError: If an excluded file has local
        modifications and ``force`` is False.
      BlobNotFoundError: If an included entry's blob is missing from the
        object store.

    Returns:
      None
    """
    index = repo.open_index()
    normalizer = repo.get_blob_normalizer()

    def local_modifications_exist(full_path, index_entry):
        # A missing file cannot be modified; unreadable files and entries
        # whose blob is absent are conservatively treated as modified.
        if not os.path.exists(full_path):
            return False
        try:
            with open(full_path, "rb") as f:
                disk_data = f.read()
        except OSError:
            return True
        try:
            blob = repo.object_store[index_entry.sha]
        except KeyError:
            return True
        # Normalize (e.g. line endings) before comparing against the blob.
        norm_data = normalizer.checkin_normalize(disk_data, full_path)
        return norm_data != blob.data

    # 1) Update skip-worktree bits in the index.
    for path_bytes, entry in list(index.items()):
        path_str = path_bytes.decode("utf-8")
        entry.set_skip_worktree(path_str not in included_paths)
        index[path_bytes] = entry
    index.write()

    # 2) Reflect the new bits in the working tree.
    for path_bytes, entry in list(index.items()):
        full_path = os.path.join(repo.path, path_bytes.decode("utf-8"))

        if entry.skip_worktree:
            # Excluded => remove from disk, refusing to discard local edits
            # unless force=True.
            if os.path.exists(full_path):
                if not force and local_modifications_exist(full_path, entry):
                    raise SparseCheckoutConflictError(
                        f"Local modifications in {full_path} would be overwritten "
                        "by sparse checkout. Use force=True to override."
                    )
                try:
                    os.remove(full_path)
                except (IsADirectoryError, FileNotFoundError):
                    # Directories are left alone; a concurrent removal is fine.
                    pass
        else:
            # Included => materialize the file if it is missing.
            if not os.path.exists(full_path):
                try:
                    blob = repo.object_store[entry.sha]
                except KeyError as exc:
                    # Chain the lookup failure for easier debugging.
                    raise BlobNotFoundError(
                        f"Blob {entry.sha} not found for {path_bytes}."
                    ) from exc
                ensure_dir_exists(os.path.dirname(full_path))
                with open(full_path, "wb") as f:
                    f.write(blob.data)
+
+
def parse_sparse_patterns(lines):
    """Parse pattern lines from a sparse-checkout file (.git/info/sparse-checkout).

    This simplified parser:
      1. Strips comments (#...) and empty lines.
      2. Returns a list of (pattern, is_negation, is_dir_only, anchored) tuples.

    These lines are similar to .gitignore patterns but are used for sparse-checkout
    logic. This function strips comments and blank lines, identifies negation
    (leading ``!``), anchoring (leading ``/``), and directory-only markers
    (trailing ``/``), and removes those markers from the stored pattern.

    Example:
      ``line = "/*.txt" -> ("*.txt", False, False, True)``
      ``line = "!/docs/" -> ("docs", True, True, True)``
      ``line = "mydir/" -> ("mydir", False, True, False)`` not anchored, no leading "/"

    Args:
      lines: A list of raw lines (strings) from the sparse-checkout file.

    Returns:
      A list of tuples (pattern, negation, dir_only, anchored), representing
      the essential details needed to perform matching.
    """
    results = []
    for raw_line in lines:
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # ignore comments and blank lines

        negation = line.startswith("!")
        if negation:
            line = line[1:]  # remove leading '!'

        anchored = line.startswith("/")
        if anchored:
            line = line[1:]  # remove leading '/'

        # If pattern ends with '/', we consider it directory-only
        # (like "docs/"). Real Git might treat it slightly differently,
        # but we'll simplify and mark it as "dir_only" if it ends in "/".
        dir_only = False
        if line.endswith("/"):
            dir_only = True
            line = line[:-1]

        results.append((line, negation, dir_only, anchored))
    return results
+
+
def match_gitignore_patterns(path_str, parsed_patterns, path_is_dir=False):
    """Check whether a path is included based on .gitignore-style patterns.

    Patterns are applied in order and the last matching pattern wins.  In
    sparse-checkout semantics a non-negated line means "include" (return
    True) and a negated (``!``) line means "exclude" (return False); an
    unmatched path defaults to excluded.

    Each pattern may carry a negation flag, a directory-only marker
    (trailing ``/`` in the raw line) and an anchor (leading ``/``), as
    produced by ``parse_sparse_patterns``.

    Fix over the previous version: anchored patterns containing wildcards
    (e.g. ``/*.txt`` or ``/*``) are now matched with ``fnmatch`` — the old
    code only performed exact and prefix comparisons for anchored patterns,
    so such lines never matched anything.

    Args:
      path_str: The path (string) to test.
      parsed_patterns: A list of (pattern, negation, dir_only, anchored)
        tuples as returned by parse_sparse_patterns.
      path_is_dir: Whether to treat the path as a directory (default False).

    Returns:
      True if the path is included by the last matching pattern, False otherwise.
    """
    # Default to "excluded" until an include pattern matches; each later
    # matching pattern overrides the current state.
    is_included = False

    for pattern, negation, dir_only, anchored in parsed_patterns:
        # A directory-only pattern cannot match a non-directory path itself
        # (though it can still match paths *under* that directory).
        forbidden_path = dir_only and not path_is_dir

        if path_str == pattern:
            if forbidden_path:
                continue
            matched = True
        else:
            matched = False

        # Directory-only patterns also match the directory's contents.
        if dir_only and not matched:
            if path_str == pattern + "/":
                matched = not forbidden_path
            elif fnmatch(path_str, f"{pattern}/*"):
                matched = True  # root subpath (anchored or unanchored)
            elif not anchored:
                matched = fnmatch(path_str, f"*/{pattern}/*")  # unanchored subpath

        if anchored and not matched:
            if pattern == "":
                # A bare "/" line; treat as non-matching.
                continue
            if path_str == pattern or path_str.startswith(pattern + "/"):
                matched = True
            elif not forbidden_path and fnmatch(path_str, pattern):
                # FIX: honor wildcards in anchored patterns such as "/*.txt".
                matched = True
        elif not matched:
            # Unanchored: the pattern may match anywhere in the tree.
            matched = fnmatch(path_str, pattern) or fnmatch(path_str, f"*/{pattern}")

        if matched:
            # Non-negated => include; negated => exclude.  Keep scanning so
            # the last matching pattern decides.
            is_included = not negation

    return is_included

+ 225 - 0
tests/test_porcelain.py

@@ -3911,3 +3911,228 @@ class SparseCheckoutTests(PorcelainTestCase):
         with open(sc_file) as f:
             lines = f.read().strip().split()
             self.assertIn("src/foo*.py", lines)
+
+
+class ConeModeTests(PorcelainTestCase):
+    """Provide integration tests for Dulwich's cone mode sparse checkout.
+
+    This test suite verifies the expected behavior for:
+      * cone_mode_init
+      * cone_mode_set
+      * cone_mode_add
+    These tests were written ahead of the implementation and now exercise
+    the cone mode support added in this change.
+    """
+
    def setUp(self):
        """Set up a fresh repository for each test.

        This method creates a new empty repo_path and Repo object
        as provided by the PorcelainTestCase base class.
        """
        super().setUp()
+
+    def _commit_file(self, rel_path, content=b"contents"):
+        """Add a file at the given relative path and commit it.
+
+        Creates necessary directories, writes the file content,
+        stages, and commits. The commit message and author/committer
+        are also provided.
+        """
+        full_path = os.path.join(self.repo_path, rel_path)
+        os.makedirs(os.path.dirname(full_path), exist_ok=True)
+        with open(full_path, "wb") as f:
+            f.write(content)
+        porcelain.add(self.repo_path, paths=[full_path])
+        porcelain.commit(
+            self.repo_path,
+            message=b"Adding " + rel_path.encode("utf-8"),
+            author=b"Test Author <author@example.com>",
+            committer=b"Test Committer <committer@example.com>",
+        )
+
+    def _list_wtree_files(self):
+        """Return a set of all file paths relative to the repository root.
+
+        Walks the working tree, skipping the .git directory.
+        """
+        found_files = set()
+        for root, dirs, files in os.walk(self.repo_path):
+            if ".git" in dirs:
+                dirs.remove(".git")
+            for fn in files:
+                relp = os.path.relpath(os.path.join(root, fn), self.repo_path)
+                found_files.add(relp)
+        return found_files
+
    def test_init_excludes_everything(self):
        """Verify that cone_mode_init writes minimal patterns and empties the working tree.

        Make some dummy files, commit them, then call cone_mode_init. Confirm
        that the working tree is empty, the sparse-checkout file has the
        minimal patterns (/*, !/*/), and the relevant config values are set.
        """
        self._commit_file("docs/readme.md", b"# doc\n")
        self._commit_file("src/main.py", b"print('hello')\n")

        porcelain.cone_mode_init(self.repo)

        # Committed content should survive only in the object store, not on disk.
        actual_files = self._list_wtree_files()
        self.assertEqual(
            set(),
            actual_files,
            "cone_mode_init did not exclude all files from the working tree.",
        )

        sp_path = os.path.join(self.repo_path, ".git", "info", "sparse-checkout")
        with open(sp_path) as f:
            lines = [ln.strip() for ln in f if ln.strip()]

        self.assertIn("/*", lines)
        self.assertIn("!/*/", lines)

        # Both config flags must be enabled for cone mode.
        config = self.repo.get_config()
        self.assertEqual(config.get((b"core",), b"sparseCheckout"), b"true")
        self.assertEqual(config.get((b"core",), b"sparseCheckoutCone"), b"true")
+
    def test_set_specific_dirs(self):
        """Verify that cone_mode_set overwrites the included directories to only the specified ones.

        Initializes cone mode, commits some files, then calls cone_mode_set with
        a list of directories. Expects that only those directories remain in the
        working tree.
        """
        porcelain.cone_mode_init(self.repo)
        self._commit_file("docs/readme.md", b"# doc\n")
        self._commit_file("src/main.py", b"print('hello')\n")
        self._commit_file("tests/test_foo.py", b"# tests\n")

        # Everything is still excluded initially by init.

        porcelain.cone_mode_set(self.repo, dirs=["docs", "src"])

        actual_files = self._list_wtree_files()
        expected_files = {
            os.path.join("docs", "readme.md"),
            os.path.join("src", "main.py"),
        }
        self.assertEqual(
            expected_files,
            actual_files,
            "Did not see only the 'docs/' and 'src/' dirs in the working tree.",
        )

        sp_path = os.path.join(self.repo_path, ".git", "info", "sparse-checkout")
        with open(sp_path) as f:
            lines = [ln.strip() for ln in f if ln.strip()]

        # For standard cone mode, we'd expect lines like:
        #    /*           (include top-level files)
        #    !/*/         (exclude subdirectories)
        #    /docs/       (re-include docs)
        #    /src/        (re-include src)
        # Instead of the wildcard-based lines the old test used.
        self.assertIn("/*", lines)
        self.assertIn("!/*/", lines)
        self.assertIn("/docs/", lines)
        self.assertIn("/src/", lines)
        self.assertNotIn("/tests/", lines)
+
+    def test_set_overwrites_old_dirs(self):
+        """Ensure that calling cone_mode_set again overwrites old includes.
+
+        Initializes cone mode, includes two directories, then calls
+        cone_mode_set again with a different directory to confirm the
+        new set of includes replaces the old.
+        """
+        porcelain.cone_mode_init(self.repo)
+        self._commit_file("docs/readme.md")
+        self._commit_file("src/main.py")
+        self._commit_file("tests/test_bar.py")
+
+        porcelain.cone_mode_set(self.repo, dirs=["docs", "src"])
+        self.assertEqual(
+            {os.path.join("docs", "readme.md"), os.path.join("src", "main.py")},
+            self._list_wtree_files(),
+        )
+
+        # Overwrite includes, now only 'tests'
+        porcelain.cone_mode_set(self.repo, dirs=["tests"], force=True)
+
+        actual_files = self._list_wtree_files()
+        expected_files = {os.path.join("tests", "test_bar.py")}
+        self.assertEqual(expected_files, actual_files)
+
+    def test_force_removal_of_local_mods(self):
+        """Confirm that force=True removes local changes in excluded paths.
+
+        cone_mode_init and cone_mode_set are called, a file is locally modified,
+        and then cone_mode_set is called again with force=True to exclude that path.
+        The excluded file should be removed with no CheckoutError.
+        """
+        porcelain.cone_mode_init(self.repo)
+        porcelain.cone_mode_set(self.repo, dirs=["docs"])
+
+        self._commit_file("docs/readme.md", b"Docs stuff\n")
+        self._commit_file("src/main.py", b"print('hello')\n")
+
+        # Modify src/main.py
+        with open(os.path.join(self.repo_path, "src/main.py"), "ab") as f:
+            f.write(b"extra line\n")
+
+        # Exclude src/ with force=True
+        porcelain.cone_mode_set(self.repo, dirs=["docs"], force=True)
+
+        actual_files = self._list_wtree_files()
+        expected_files = {os.path.join("docs", "readme.md")}
+        self.assertEqual(expected_files, actual_files)
+
+    def test_add_and_merge_dirs(self):
+        """Verify that cone_mode_add merges new directories instead of overwriting them.
+
+        After initializing cone mode and including a single directory, call
+        cone_mode_add with a new directory. Confirm that both directories
+        remain included. Repeat for an additional directory to ensure it
+        is merged, not overwritten.
+        """
+        porcelain.cone_mode_init(self.repo)
+        self._commit_file("docs/readme.md", b"# doc\n")
+        self._commit_file("src/main.py", b"print('hello')\n")
+        self._commit_file("tests/test_bar.py", b"# tests\n")
+
+        # Include "docs" only
+        porcelain.cone_mode_set(self.repo, dirs=["docs"])
+        self.assertEqual({os.path.join("docs", "readme.md")}, self._list_wtree_files())
+
+        # Add "src"
+        porcelain.cone_mode_add(self.repo, dirs=["src"])
+        actual_files = self._list_wtree_files()
+        self.assertEqual(
+            {os.path.join("docs", "readme.md"), os.path.join("src", "main.py")},
+            actual_files,
+        )
+
+        # Add "tests" as well
+        porcelain.cone_mode_add(self.repo, dirs=["tests"])
+        actual_files = self._list_wtree_files()
+        expected_files = {
+            os.path.join("docs", "readme.md"),
+            os.path.join("src", "main.py"),
+            os.path.join("tests", "test_bar.py"),
+        }
+        self.assertEqual(expected_files, actual_files)
+
+        # Check .git/info/sparse-checkout
+        sp_path = os.path.join(self.repo_path, ".git", "info", "sparse-checkout")
+        with open(sp_path) as f:
+            lines = [ln.strip() for ln in f if ln.strip()]
+
+        # Standard cone mode lines:
+        # "/*"    -> include top-level
+        # "!/*/"  -> exclude subdirectories
+        # "!/docs/", "!/src/", "!/tests/" -> re-include the directories we added
+        self.assertIn("/*", lines)
+        self.assertIn("!/*/", lines)
+        self.assertIn("/docs/", lines)
+        self.assertIn("/src/", lines)
+        self.assertIn("/tests/", lines)

+ 359 - 0
tests/test_sparse_patterns.py

@@ -0,0 +1,359 @@
+# test_sparse_patterns.py -- Sparse checkout (full and cone mode) pattern handling
+# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+
+"""Tests for dulwich.sparse_patterns."""
+
+import os
+import shutil
+import tempfile
+import time
+
+from dulwich.index import IndexEntry
+from dulwich.repo import Repo
+from dulwich.sparse_patterns import (
+    BlobNotFoundError,
+    SparseCheckoutConflictError,
+    apply_included_paths,
+    compute_included_paths_cone,
+    compute_included_paths_full,
+    determine_included_paths,
+    match_gitignore_patterns,
+    parse_sparse_patterns,
+)
+
+from . import TestCase
+
+
+class ParseSparsePatternsTests(TestCase):
+    """Test parse_sparse_patterns function."""
+
+    def test_empty_and_comment_lines(self):
+        lines = [
+            "",
+            "# comment here",
+            "   ",
+            "# another comment",
+        ]
+        parsed = parse_sparse_patterns(lines)
+        self.assertEqual(parsed, [])
+
+    def test_simple_patterns(self):
+        lines = [
+            "*.py",
+            "!*.md",
+            "/docs/",
+            "!/docs/images/",
+        ]
+        parsed = parse_sparse_patterns(lines)
+        self.assertEqual(len(parsed), 4)
+
+        self.assertEqual(parsed[0], ("*.py", False, False, False))  # include *.py
+        self.assertEqual(parsed[1], ("*.md", True, False, False))  # exclude *.md
+        self.assertEqual(parsed[2], ("docs", False, True, True))  # anchored, dir_only
+        self.assertEqual(parsed[3], ("docs/images", True, True, True))
+
+    def test_trailing_slash_dir(self):
+        lines = [
+            "src/",
+        ]
+        parsed = parse_sparse_patterns(lines)
+        # "src/" => (pattern="src", negation=False, dir_only=True, anchored=False)
+        self.assertEqual(parsed, [("src", False, True, False)])
+
+    def test_negation_anchor(self):
+        lines = [
+            "!/foo.txt",
+        ]
+        parsed = parse_sparse_patterns(lines)
+        # => (pattern="foo.txt", negation=True, dir_only=False, anchored=True)
+        self.assertEqual(parsed, [("foo.txt", True, False, True)])
+
+
+class MatchGitignorePatternsTests(TestCase):
+    """Test the match_gitignore_patterns function."""
+
+    def test_no_patterns_returns_excluded(self):
+        """If no patterns are provided, by default we treat the path as excluded."""
+        self.assertFalse(match_gitignore_patterns("anyfile.py", []))
+
+    def test_last_match_wins(self):
+        """Checks that the last pattern to match determines included vs excluded."""
+        parsed = parse_sparse_patterns(
+            [
+                "*.py",  # include
+                "!foo.py",  # exclude
+            ]
+        )
+        # "foo.py" matches first pattern => included
+        # then matches second pattern => excluded
+        self.assertFalse(match_gitignore_patterns("foo.py", parsed))
+
+    def test_dir_only(self):
+        """A pattern with a trailing slash should only match directories and subdirectories."""
+        parsed = parse_sparse_patterns(["docs/"])
+        # A file located inside "docs/" still matches the dir-only pattern, even with path_is_dir=False
+        self.assertTrue(
+            match_gitignore_patterns("docs/readme.md", parsed, path_is_dir=False)
+        )
+        self.assertTrue(match_gitignore_patterns("docs", parsed, path_is_dir=True))
+        # Even if the path name is "docs", if it's a file, won't match:
+        self.assertFalse(match_gitignore_patterns("docs", parsed, path_is_dir=False))
+
+    def test_anchored(self):
+        """Anchored patterns match from the start of the path only."""
+        parsed = parse_sparse_patterns(["/foo"])
+        self.assertTrue(match_gitignore_patterns("foo", parsed))
+        # But "some/foo" doesn't match because anchored requires start
+        self.assertFalse(match_gitignore_patterns("some/foo", parsed))
+
+    def test_unanchored_uses_fnmatch(self):
+        parsed = parse_sparse_patterns(["foo"])
+        self.assertTrue(match_gitignore_patterns("some/foo", parsed))
+        self.assertFalse(match_gitignore_patterns("some/bar", parsed))
+
+
+class ComputeIncludedPathsFullTests(TestCase):
+    """Test compute_included_paths_full using a real ephemeral repo index."""
+
+    def setUp(self):
+        super().setUp()
+        self.temp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.temp_dir)
+        self.repo = Repo.init(self.temp_dir)
+
+    def _add_file_to_index(self, relpath, content=b"test"):
+        full = os.path.join(self.temp_dir, relpath)
+        os.makedirs(os.path.dirname(full), exist_ok=True)
+        with open(full, "wb") as f:
+            f.write(content)
+        # Stage in the index
+        self.repo.stage([relpath])
+
+    def test_basic_inclusion_exclusion(self):
+        """Given patterns, check correct set of included paths."""
+        self._add_file_to_index("foo.py", b"print(1)")
+        self._add_file_to_index("bar.md", b"markdown")
+        self._add_file_to_index("docs/readme", b"# docs")
+
+        lines = [
+            "*.py",  # include all .py
+            "!bar.*",  # exclude bar.md
+            "docs/",  # include docs dir
+        ]
+        included = compute_included_paths_full(self.repo, lines)
+        self.assertEqual(included, {"foo.py", "docs/readme"})
+
+
+class ComputeIncludedPathsConeTests(TestCase):
+    """Test compute_included_paths_cone with ephemeral repo to see included vs excluded."""
+
+    def setUp(self):
+        super().setUp()
+        self.temp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.temp_dir)
+        self.repo = Repo.init(self.temp_dir)
+
+    def _add_file_to_index(self, relpath, content=b"test"):
+        full = os.path.join(self.temp_dir, relpath)
+        os.makedirs(os.path.dirname(full), exist_ok=True)
+        with open(full, "wb") as f:
+            f.write(content)
+        self.repo.stage([relpath])
+
+    def test_cone_mode_patterns(self):
+        """Simpler pattern handling in cone mode.
+
+        Lines in 'cone' style typically look like:
+          - /*     -> include top-level
+          - !/*/   -> exclude all subdirs
+          - /docs/ -> reinclude 'docs' directory
+        """
+        self._add_file_to_index("topfile", b"hi")
+        self._add_file_to_index("docs/readme.md", b"stuff")
+        self._add_file_to_index("lib/code.py", b"stuff")
+
+        lines = [
+            "/*",
+            "!/*/",
+            "/docs/",
+        ]
+        included = compute_included_paths_cone(self.repo, lines)
+        # top-level => includes 'topfile'
+        # subdirs => excluded, except docs/
+        self.assertEqual(included, {"topfile", "docs/readme.md"})
+
+    def test_no_exclude_subdirs(self):
+        """If lines never specify '!/*/', we include everything by default."""
+        self._add_file_to_index("topfile", b"hi")
+        self._add_file_to_index("docs/readme.md", b"stuff")
+        self._add_file_to_index("lib/code.py", b"stuff")
+
+        lines = [
+            "/*",  # top-level
+            "/docs/",  # re-include docs?
+        ]
+        included = compute_included_paths_cone(self.repo, lines)
+        # Because exclude_subdirs was never set, everything is included:
+        self.assertEqual(
+            included,
+            {"topfile", "docs/readme.md", "lib/code.py"},
+        )
+
+
+class DetermineIncludedPathsTests(TestCase):
+    """Test the top-level determine_included_paths function."""
+
+    def setUp(self):
+        super().setUp()
+        self.temp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.temp_dir)
+        self.repo = Repo.init(self.temp_dir)
+
+    def _add_file_to_index(self, relpath):
+        path = os.path.join(self.temp_dir, relpath)
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+        with open(path, "wb") as f:
+            f.write(b"data")
+        self.repo.stage([relpath])
+
+    def test_full_mode(self):
+        self._add_file_to_index("foo.py")
+        self._add_file_to_index("bar.md")
+
+        lines = ["*.py", "!bar.*"]
+        included = determine_included_paths(self.repo, lines, cone=False)
+        self.assertEqual(included, {"foo.py"})
+
+    def test_cone_mode(self):
+        self._add_file_to_index("topfile")
+        self._add_file_to_index("subdir/anotherfile")
+
+        lines = ["/*", "!/*/"]
+        included = determine_included_paths(self.repo, lines, cone=True)
+        self.assertEqual(included, {"topfile"})
+
+
+class ApplyIncludedPathsTests(TestCase):
+    """Integration tests for apply_included_paths, verifying skip-worktree bits and file removal."""
+
+    def setUp(self):
+        super().setUp()
+        self.temp_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.temp_dir)
+        self.repo = Repo.init(self.temp_dir)
+        # For testing local_modifications_exist logic, we'll need the normalizer
+        # plus some real content in the object store.
+
+    def _commit_blob(self, relpath, content=b"hello"):
+        """Create a blob object in object_store, stage an index entry for it."""
+        full = os.path.join(self.temp_dir, relpath)
+        os.makedirs(os.path.dirname(full), exist_ok=True)
+        with open(full, "wb") as f:
+            f.write(content)
+        self.repo.stage([relpath])
+        # Actually commit so the object is in the store
+        self.repo.do_commit(message=b"Commit " + relpath.encode())
+
+    def test_set_skip_worktree_bits(self):
+        """If a path is not in included_paths, skip_worktree bit is set."""
+        self._commit_blob("keep.py", b"print('keep')")
+        self._commit_blob("exclude.md", b"# exclude")
+
+        included = {"keep.py"}
+        apply_included_paths(self.repo, included_paths=included, force=False)
+
+        idx = self.repo.open_index()
+        self.assertIn(b"keep.py", idx)
+        self.assertFalse(idx[b"keep.py"].skip_worktree)
+
+        self.assertIn(b"exclude.md", idx)
+        self.assertTrue(idx[b"exclude.md"].skip_worktree)
+
+        # Also check that the exclude.md file was removed from the working tree
+        exclude_path = os.path.join(self.temp_dir, "exclude.md")
+        self.assertFalse(os.path.exists(exclude_path))
+
+    def test_conflict_with_local_modifications_no_force(self):
+        """If local modifications exist for an excluded path, raise SparseCheckoutConflictError."""
+        self._commit_blob("foo.txt", b"original")
+
+        # Modify foo.txt on disk
+        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
+            f.write(b" local changes")
+
+        with self.assertRaises(SparseCheckoutConflictError):
+            apply_included_paths(self.repo, included_paths=set(), force=False)
+
+    def test_conflict_with_local_modifications_forced_removal(self):
+        """With force=True, we remove local modifications and skip_worktree the file."""
+        self._commit_blob("foo.txt", b"original")
+        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
+            f.write(b" local changes")
+
+        # This time, pass force=True => file is removed
+        apply_included_paths(self.repo, included_paths=set(), force=True)
+
+        # Check skip-worktree in index
+        idx = self.repo.open_index()
+        self.assertTrue(idx[b"foo.txt"].skip_worktree)
+        # Working tree file removed
+        self.assertFalse(os.path.exists(os.path.join(self.temp_dir, "foo.txt")))
+
+    def test_materialize_included_file_if_missing(self):
+        """If a path is included but missing from disk, we restore it from the blob in the store."""
+        self._commit_blob("restored.txt", b"some content")
+        # Manually remove the file from the working tree
+        os.remove(os.path.join(self.temp_dir, "restored.txt"))
+
+        apply_included_paths(self.repo, included_paths={"restored.txt"}, force=False)
+        # Should have re-created "restored.txt" from the blob
+        self.assertTrue(os.path.exists(os.path.join(self.temp_dir, "restored.txt")))
+        with open(os.path.join(self.temp_dir, "restored.txt"), "rb") as f:
+            self.assertEqual(f.read(), b"some content")
+
+    def test_blob_not_found_raises(self):
+        """If the object store is missing the blob for an included path, raise BlobNotFoundError."""
+        # We'll create an entry in the index that references a nonexistent sha
+        idx = self.repo.open_index()
+        fake_sha = b"ab" * 20
+        e = IndexEntry(
+            ctime=(int(time.time()), 0),  # ctime (s, ns)
+            mtime=(int(time.time()), 0),  # mtime (s, ns)
+            dev=0,  # dev
+            ino=0,  # ino
+            mode=0o100644,  # mode
+            uid=0,  # uid
+            gid=0,  # gid
+            size=0,  # size
+            sha=fake_sha,  # sha
+            flags=0,  # flags
+            extended_flags=0,
+        )
+        e.set_skip_worktree(False)
+        e.sha = fake_sha
+        idx[(b"missing_file")] = e
+        idx.write()
+
+        with self.assertRaises(BlobNotFoundError):
+            apply_included_paths(
+                self.repo, included_paths={"missing_file"}, force=False
+            )