sparse_patterns.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. # sparse_patterns.py -- Sparse checkout pattern handling.
  2. # Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Sparse checkout pattern handling."""
  22. import os
  23. from fnmatch import fnmatch
  24. from typing import Any, Union, cast
  25. from .file import ensure_dir_exists
  26. from .index import IndexEntry
  27. from .repo import Repo
  28. class SparseCheckoutConflictError(Exception):
  29. """Raised when local modifications would be overwritten by a sparse checkout operation."""
  30. class BlobNotFoundError(Exception):
  31. """Raised when a requested blob is not found in the repository's object store."""
  32. def determine_included_paths(
  33. repo: Union[str, Repo], lines: list[str], cone: bool
  34. ) -> set[str]:
  35. """Determine which paths in the index should be included based on either
  36. a full-pattern match or a cone-mode approach.
  37. Args:
  38. repo: A path to the repository or a Repo object.
  39. lines: A list of pattern lines (strings) from sparse-checkout config.
  40. cone: A bool indicating cone mode.
  41. Returns:
  42. A set of included path strings.
  43. """
  44. if cone:
  45. return compute_included_paths_cone(repo, lines)
  46. else:
  47. return compute_included_paths_full(repo, lines)
  48. def compute_included_paths_full(repo: Union[str, Repo], lines: list[str]) -> set[str]:
  49. """Use .gitignore-style parsing and matching to determine included paths.
  50. Each file path in the index is tested against the parsed sparse patterns.
  51. If it matches the final (most recently applied) positive pattern, it is included.
  52. Args:
  53. repo: A path to the repository or a Repo object.
  54. lines: A list of pattern lines (strings) from sparse-checkout config.
  55. Returns:
  56. A set of included path strings.
  57. """
  58. parsed = parse_sparse_patterns(lines)
  59. if isinstance(repo, str):
  60. from .porcelain import open_repo
  61. repo_obj = open_repo(repo)
  62. else:
  63. repo_obj = repo
  64. index = repo_obj.open_index()
  65. included = set()
  66. for path_bytes, entry in index.items():
  67. path_str = path_bytes.decode("utf-8")
  68. # For .gitignore logic, match_gitignore_patterns returns True if 'included'
  69. if match_gitignore_patterns(path_str, parsed, path_is_dir=False):
  70. included.add(path_str)
  71. return included
  72. def compute_included_paths_cone(repo: Union[str, Repo], lines: list[str]) -> set[str]:
  73. """Implement a simplified 'cone' approach for sparse-checkout.
  74. By default, this can include top-level files, exclude all subdirectories,
  75. and re-include specified directories. The logic is less comprehensive than
  76. Git's built-in cone mode (recursive vs parent) and is essentially an implementation
  77. of the recursive cone mode.
  78. Args:
  79. repo: A path to the repository or a Repo object.
  80. lines: A list of pattern lines (strings), typically including entries like
  81. "/*", "!/*/", or "/mydir/".
  82. Returns:
  83. A set of included path strings.
  84. """
  85. include_top_level = False
  86. exclude_subdirs = False
  87. reinclude_dirs = set()
  88. for pat in lines:
  89. if pat == "/*":
  90. include_top_level = True
  91. elif pat == "!/*/":
  92. exclude_subdirs = True
  93. elif pat.startswith("/"):
  94. # strip leading '/' and trailing '/'
  95. d = pat.strip("/")
  96. if d:
  97. reinclude_dirs.add(d)
  98. if isinstance(repo, str):
  99. from .porcelain import open_repo
  100. repo_obj = open_repo(repo)
  101. else:
  102. repo_obj = repo
  103. index = repo_obj.open_index()
  104. included = set()
  105. for path_bytes, entry in index.items():
  106. path_str = path_bytes.decode("utf-8")
  107. # Check if this is top-level (no slash) or which top_dir it belongs to
  108. if "/" not in path_str:
  109. # top-level file
  110. if include_top_level:
  111. included.add(path_str)
  112. continue
  113. top_dir = path_str.split("/", 1)[0]
  114. if exclude_subdirs:
  115. # subdirs are excluded unless they appear in reinclude_dirs
  116. if top_dir in reinclude_dirs:
  117. included.add(path_str)
  118. else:
  119. # if we never set exclude_subdirs, we might include everything by default
  120. # or handle partial subdir logic. For now, let's assume everything is included
  121. included.add(path_str)
  122. return included
  123. def apply_included_paths(
  124. repo: Union[str, Repo], included_paths: set[str], force: bool = False
  125. ) -> None:
  126. """Apply the sparse-checkout inclusion set to the index and working tree.
  127. This function updates skip-worktree bits in the index based on whether each
  128. path is included or not. It then adds or removes files in the working tree
  129. accordingly. If ``force=False``, files that have local modifications
  130. will cause an error instead of being removed.
  131. Args:
  132. repo: A path to the repository or a Repo object.
  133. included_paths: A set of paths (strings) that should remain included.
  134. force: Whether to forcibly remove locally modified files (default False).
  135. Returns:
  136. None
  137. """
  138. if isinstance(repo, str):
  139. from .porcelain import open_repo
  140. repo_obj = open_repo(repo)
  141. else:
  142. repo_obj = repo
  143. index = repo_obj.open_index()
  144. if not hasattr(repo_obj, "get_blob_normalizer"):
  145. raise ValueError("Repository must support get_blob_normalizer")
  146. normalizer = repo_obj.get_blob_normalizer()
  147. def local_modifications_exist(full_path: str, index_entry: IndexEntry) -> bool:
  148. if not os.path.exists(full_path):
  149. return False
  150. with open(full_path, "rb") as f:
  151. disk_data = f.read()
  152. try:
  153. blob_obj = repo_obj.object_store[index_entry.sha]
  154. except KeyError:
  155. return True
  156. norm_data = normalizer.checkin_normalize(disk_data, full_path)
  157. from .objects import Blob
  158. if not isinstance(blob_obj, Blob):
  159. return True
  160. return norm_data != blob_obj.data
  161. # 1) Update skip-worktree bits
  162. for path_bytes, entry in list(index.items()):
  163. if not isinstance(entry, IndexEntry):
  164. continue # Skip conflicted entries
  165. path_str = path_bytes.decode("utf-8")
  166. if path_str in included_paths:
  167. entry.set_skip_worktree(False)
  168. else:
  169. entry.set_skip_worktree(True)
  170. index[path_bytes] = entry
  171. index.write()
  172. # 2) Reflect changes in the working tree
  173. for path_bytes, entry in list(index.items()):
  174. if not isinstance(entry, IndexEntry):
  175. continue # Skip conflicted entries
  176. if not hasattr(repo_obj, "path"):
  177. raise ValueError("Repository must have a path attribute")
  178. full_path = os.path.join(cast(Any, repo_obj).path, path_bytes.decode("utf-8"))
  179. if entry.skip_worktree:
  180. # Excluded => remove if safe
  181. if os.path.exists(full_path):
  182. if not force and local_modifications_exist(full_path, entry):
  183. raise SparseCheckoutConflictError(
  184. f"Local modifications in {full_path} would be overwritten "
  185. "by sparse checkout. Use force=True to override."
  186. )
  187. try:
  188. os.remove(full_path)
  189. except IsADirectoryError:
  190. pass
  191. except FileNotFoundError:
  192. pass
  193. except PermissionError:
  194. if not force:
  195. raise
  196. else:
  197. # Included => materialize if missing
  198. if not os.path.exists(full_path):
  199. try:
  200. blob = repo_obj.object_store[entry.sha]
  201. except KeyError:
  202. raise BlobNotFoundError(
  203. f"Blob {entry.sha.hex()} not found for {path_bytes.decode('utf-8')}."
  204. )
  205. ensure_dir_exists(os.path.dirname(full_path))
  206. from .objects import Blob
  207. with open(full_path, "wb") as f:
  208. if isinstance(blob, Blob):
  209. f.write(blob.data)
  210. def parse_sparse_patterns(lines: list[str]) -> list[tuple[str, bool, bool, bool]]:
  211. """Parse pattern lines from a sparse-checkout file (.git/info/sparse-checkout).
  212. This simplified parser:
  213. 1. Strips comments (#...) and empty lines.
  214. 2. Returns a list of (pattern, is_negation, is_dir_only, anchored) tuples.
  215. These lines are similar to .gitignore patterns but are used for sparse-checkout
  216. logic. This function strips comments and blank lines, identifies negation,
  217. anchoring, and directory-only markers, and returns data suitable for matching.
  218. Example:
  219. ``line = "/*.txt" -> ("/.txt", False, False, True)``
  220. ``line = "!/docs/" -> ("/docs/", True, True, True)``
  221. ``line = "mydir/" -> ("mydir/", False, True, False)`` not anchored, no leading "/"
  222. Args:
  223. lines: A list of raw lines (strings) from the sparse-checkout file.
  224. Returns:
  225. A list of tuples (pattern, negation, dir_only, anchored), representing
  226. the essential details needed to perform matching.
  227. """
  228. results = []
  229. for raw_line in lines:
  230. line = raw_line.strip()
  231. if not line or line.startswith("#"):
  232. continue # ignore comments and blank lines
  233. negation = line.startswith("!")
  234. if negation:
  235. line = line[1:] # remove leading '!'
  236. anchored = line.startswith("/")
  237. if anchored:
  238. line = line[1:] # remove leading '/'
  239. # If pattern ends with '/', we consider it directory-only
  240. # (like "docs/"). Real Git might treat it slightly differently,
  241. # but we'll simplify and mark it as "dir_only" if it ends in "/".
  242. dir_only = False
  243. if line.endswith("/"):
  244. dir_only = True
  245. line = line[:-1]
  246. results.append((line, negation, dir_only, anchored))
  247. return results
  248. def match_gitignore_patterns(
  249. path_str: str,
  250. parsed_patterns: list[tuple[str, bool, bool, bool]],
  251. path_is_dir: bool = False,
  252. ) -> bool:
  253. """Check whether a path is included based on .gitignore-style patterns.
  254. This is a simplified approach that:
  255. 1. Iterates over patterns in order.
  256. 2. If a pattern matches, we set the "include" state depending on negation.
  257. 3. Later matches override earlier ones.
  258. In a .gitignore sense, lines that do not start with '!' are "ignore" patterns,
  259. lines that start with '!' are "unignore" (re-include). But in sparse checkout,
  260. it's effectively reversed: a non-negation line is "include," negation is "exclude."
  261. However, many flows still rely on the same final logic: the last matching pattern
  262. decides "excluded" vs. "included."
  263. We'll interpret "include" as returning True, "exclude" as returning False.
  264. Each pattern can include negation (!), directory-only markers, or be anchored
  265. to the start of the path. The last matching pattern determines whether the
  266. path is ultimately included or excluded.
  267. Args:
  268. path_str: The path (string) to test.
  269. parsed_patterns: A list of (pattern, negation, dir_only, anchored) tuples
  270. as returned by parse_sparse_patterns.
  271. path_is_dir: Whether to treat the path as a directory (default False).
  272. Returns:
  273. True if the path is included by the last matching pattern, False otherwise.
  274. """
  275. # Start by assuming "excluded" (like a .gitignore starts by including everything
  276. # until matched, but for sparse-checkout we often treat unmatched as "excluded").
  277. # We will flip if we match an "include" pattern.
  278. is_included = False
  279. for pattern, negation, dir_only, anchored in parsed_patterns:
  280. forbidden_path = dir_only and not path_is_dir
  281. if path_str == pattern:
  282. if forbidden_path:
  283. continue
  284. else:
  285. matched = True
  286. else:
  287. matched = False
  288. # If dir_only is True and path_is_dir is False, we skip matching
  289. if dir_only and not matched:
  290. if path_str == pattern + "/":
  291. matched = not forbidden_path
  292. elif fnmatch(path_str, f"{pattern}/*"):
  293. matched = True # root subpath (anchored or unanchored)
  294. elif not anchored:
  295. matched = fnmatch(path_str, f"*/{pattern}/*") # unanchored subpath
  296. # If anchored is True, pattern should match from the start of path_str.
  297. # If not anchored, we can match anywhere.
  298. if anchored and not matched:
  299. # We match from the beginning. For example, pattern = "docs"
  300. # path_str = "docs/readme.md" -> start is "docs"
  301. # We'll just do a prefix check or prefix + slash check
  302. # Or you can do a partial fnmatch. We'll do a manual approach:
  303. if pattern == "":
  304. # Means it was just "/", which can happen if line was "/"
  305. # That might represent top-level only?
  306. # We'll skip for simplicity or treat it as a special case.
  307. continue
  308. elif path_str == pattern:
  309. matched = True
  310. elif path_str.startswith(pattern + "/"):
  311. matched = True
  312. else:
  313. matched = False
  314. elif not matched:
  315. # Not anchored: we can do a simple wildcard match or a substring match.
  316. # For simplicity, let's use Python's fnmatch:
  317. matched = fnmatch(path_str, pattern) or fnmatch(path_str, f"*/{pattern}")
  318. if matched:
  319. # If negation is True, that means 'exclude'. If negation is False, 'include'.
  320. is_included = not negation
  321. # The last matching pattern overrides, so we continue checking until the end.
  322. return is_included