# test_sparse_patterns.py -- Sparse checkout (full and cone mode) pattern handling
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for dulwich.sparse_patterns."""

import os
import shutil
import sys
import tempfile
import time

from dulwich.index import IndexEntry
from dulwich.objects import Blob
from dulwich.repo import Repo
from dulwich.sparse_patterns import (
    BlobNotFoundError,
    SparseCheckoutConflictError,
    apply_included_paths,
    compute_included_paths_cone,
    compute_included_paths_full,
    determine_included_paths,
    match_sparse_patterns,
    parse_sparse_patterns,
)

from . import TestCase
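
# For orientation, a hedged sketch of what the pattern lines exercised below
# would look like in an actual sparse-checkout file (.git/info/sparse-checkout).
# Full (non-cone) mode reads like a .gitignore, with the last matching pattern
# winning; cone mode only uses the directory-oriented forms:
#
#     full (non-cone) mode        cone mode
#     --------------------        ---------
#     *.py                        /*         (include top-level files)
#     !docs/images/               !/*/       (exclude all subdirectories)
#     /docs/                      /docs/     (re-include the docs directory)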


class ParseSparsePatternsTests(TestCase):
    """Test parse_sparse_patterns function."""

    def test_empty_and_comment_lines(self):
        lines = [
            "",
            "# comment here",
            " ",
            "# another comment",
        ]
        parsed = parse_sparse_patterns(lines)
        self.assertEqual(parsed, [])

    def test_sparse_pattern_combos(self):
        lines = [
            "*.py",  # Python files anywhere
            "!*.md",  # no markdown files anywhere
            "/docs/",  # root docs dir
            "!/docs/images/",  # no root docs/images subdir
            "src/",  # src dir anywhere
            "/*.toml",  # root TOML files
            "!/*.bak",  # no root backup files
            "!data/",  # no data dirs anywhere
        ]
        parsed = parse_sparse_patterns(lines)
        self.assertEqual(len(parsed), 8)
        # Each entry is a 4-tuple: (pattern, negation, dir_only, anchored)
        self.assertEqual(parsed[0], ("*.py", False, False, False))  # _,_,_
        self.assertEqual(parsed[1], ("*.md", True, False, False))  # N,_,_
        self.assertEqual(parsed[2], ("docs", False, True, True))  # _,D,A
        self.assertEqual(parsed[3], ("docs/images", True, True, True))  # N,D,A
        self.assertEqual(parsed[4], ("src", False, True, False))  # _,D,_
        self.assertEqual(parsed[5], ("*.toml", False, False, True))  # _,_,A
        self.assertEqual(parsed[6], ("*.bak", True, False, True))  # N,_,A
        self.assertEqual(parsed[7], ("data", True, True, False))  # N,D,_


class MatchSparsePatternsTests(TestCase):
    """Test the match_sparse_patterns function."""

    # Signature under test:
    #   match_sparse_patterns(path_str, parsed_patterns, path_is_dir=False)

    def test_no_patterns_returns_excluded(self):
        """If no patterns are provided, by default we treat the path as excluded."""
        self.assertFalse(match_sparse_patterns("foo.py", [], path_is_dir=False))
        self.assertFalse(match_sparse_patterns("A/", [], path_is_dir=True))
        self.assertFalse(match_sparse_patterns("A/B/", [], path_is_dir=True))
        self.assertFalse(match_sparse_patterns("A/B/bar.md", [], path_is_dir=False))

    def test_last_match_wins(self):
        """Checks that the last pattern to match determines included vs. excluded."""
        parsed = parse_sparse_patterns(
            [
                "*.py",  # include
                "!foo.py",  # exclude
            ]
        )
        # "foo.py" matches the first pattern (=> included), then the second
        # (=> excluded); the later match wins.
        self.assertFalse(match_sparse_patterns("foo.py", parsed))
        self.assertFalse(match_sparse_patterns("A/foo.py", parsed))
        self.assertFalse(match_sparse_patterns("A/B/foo.py", parsed))
        self.assertTrue(match_sparse_patterns("bar.py", parsed))
        self.assertTrue(match_sparse_patterns("A/bar.py", parsed))
        self.assertTrue(match_sparse_patterns("A/B/bar.py", parsed))
        self.assertFalse(match_sparse_patterns("bar.md", parsed))
        self.assertFalse(match_sparse_patterns("A/bar.md", parsed))
        self.assertFalse(match_sparse_patterns("A/B", parsed, path_is_dir=True))
        self.assertFalse(match_sparse_patterns(".cache", parsed, path_is_dir=True))

    def test_dir_only(self):
        """A pattern with a trailing slash should only match directories and their contents."""
        parsed = parse_sparse_patterns(["docs/"])
        # The directory pattern is not anchored, so it can match at any level.
        self.assertTrue(match_sparse_patterns("docs", parsed, path_is_dir=True))
        self.assertTrue(match_sparse_patterns("A/docs", parsed, path_is_dir=True))
        self.assertTrue(match_sparse_patterns("A/B/docs", parsed, path_is_dir=True))
        # Even if the path name is "docs", a file won't match a dir-only pattern:
        self.assertFalse(match_sparse_patterns("docs", parsed, path_is_dir=False))
        self.assertFalse(match_sparse_patterns("A/docs", parsed, path_is_dir=False))
        self.assertFalse(match_sparse_patterns("A/B/docs", parsed, path_is_dir=False))
        # Files and subdirectories inside the included dir should match.
        self.assertTrue(match_sparse_patterns("docs/x.md", parsed))
        self.assertTrue(match_sparse_patterns("docs/A/x.md", parsed))
        self.assertTrue(match_sparse_patterns("docs/A/B/x.md", parsed))
        self.assertTrue(match_sparse_patterns("docs/A", parsed, path_is_dir=True))
        self.assertTrue(match_sparse_patterns("docs/A/B", parsed, path_is_dir=True))

    def test_anchored(self):
        """Anchored patterns match from the start of the path only."""
        parsed = parse_sparse_patterns(["/foo"])  # can be file or dir, must be at root
        self.assertTrue(match_sparse_patterns("foo", parsed))
        self.assertTrue(match_sparse_patterns("foo", parsed, path_is_dir=True))
        # "A/foo" doesn't match because an anchored pattern must match from the root.
        self.assertFalse(match_sparse_patterns("A/foo", parsed))
        self.assertFalse(match_sparse_patterns("A/foo", parsed, path_is_dir=True))

    def test_unanchored(self):
        parsed = parse_sparse_patterns(["foo"])
        self.assertTrue(match_sparse_patterns("foo", parsed))
        self.assertTrue(match_sparse_patterns("foo", parsed, path_is_dir=True))
        # Unlike the anchored case, "A/foo" does match: an unanchored pattern
        # can match at any depth.
        self.assertTrue(match_sparse_patterns("A/foo", parsed))
        self.assertTrue(match_sparse_patterns("A/foo", parsed, path_is_dir=True))
        self.assertFalse(match_sparse_patterns("bar", parsed))
        self.assertFalse(match_sparse_patterns("A/bar", parsed))

    def test_anchored_empty_pattern(self):
        """Test handling of an empty pattern with anchoring (e.g., '/')."""
        # '/' should recursively match all files.
        parsed = parse_sparse_patterns(["/"])
        self.assertEqual(parsed, [("", False, False, True)])  # anchored
        self.assertTrue(match_sparse_patterns("", parsed, path_is_dir=True))
        self.assertTrue(match_sparse_patterns("A", parsed, path_is_dir=True))
        self.assertTrue(match_sparse_patterns("A/B", parsed, path_is_dir=True))
        self.assertTrue(match_sparse_patterns("foo", parsed))
        self.assertTrue(match_sparse_patterns("A/foo", parsed))
        self.assertTrue(match_sparse_patterns("A/B/foo", parsed))

    def test_anchored_dir_only(self):
        """Test anchored directory-only patterns."""
        parsed = parse_sparse_patterns(["/docs/"])
        self.assertTrue(match_sparse_patterns("docs", parsed, path_is_dir=True))
        self.assertFalse(match_sparse_patterns("docs", parsed))  # a file named "docs"
        self.assertFalse(match_sparse_patterns("A", parsed, path_is_dir=True))
        self.assertFalse(match_sparse_patterns("A/B", parsed, path_is_dir=True))
        self.assertFalse(match_sparse_patterns("A/docs", parsed, path_is_dir=True))
        self.assertFalse(match_sparse_patterns("A/docs", parsed))
        self.assertFalse(match_sparse_patterns("A/B/docs", parsed))
        self.assertFalse(match_sparse_patterns("A/B/docs", parsed, path_is_dir=True))

    def test_anchored_subpath(self):
        """Test anchored subpath pattern matching."""
        parsed = parse_sparse_patterns(["/A/B"])
        # TODO: should this also match the dir "A" (positively)?
        # self.assertTrue(match_sparse_patterns("A", parsed, path_is_dir=True))
        # self.assertFalse(match_sparse_patterns("A", parsed, path_is_dir=False))
        # Exact match (both as file and dir; not a dir-only pattern)
        self.assertTrue(match_sparse_patterns("A/B", parsed))
        self.assertTrue(match_sparse_patterns("A/B", parsed, path_is_dir=True))
        # Paths under the match (file and dir)
        self.assertTrue(match_sparse_patterns("A/B/file.txt", parsed))
        self.assertTrue(match_sparse_patterns("A/B/C", parsed, path_is_dir=True))
        # Non-matching paths
        self.assertFalse(match_sparse_patterns("X", parsed, path_is_dir=True))
        self.assertFalse(match_sparse_patterns("X/Y", parsed, path_is_dir=True))


class ComputeIncludedPathsFullTests(TestCase):
    """Test compute_included_paths_full using a real ephemeral repo index."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath, content=b"test"):
        """Write a file under the temp repo and stage it in the index."""
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        # Stage in the index
        self.repo.get_worktree().stage([relpath])

    def test_basic_inclusion_exclusion(self):
        """Given patterns, check the correct set of included paths."""
        self._add_file_to_index("foo.py", b"print(1)")
        self._add_file_to_index("bar.md", b"markdown")
        self._add_file_to_index("docs/readme", b"# docs")
        lines = [
            "*.py",  # include all .py files
            "!bar.*",  # exclude bar.md
            "docs/",  # include docs dir
        ]
        included = compute_included_paths_full(self.repo.open_index(), lines)
        self.assertEqual(included, {"foo.py", "docs/readme"})

    def test_full_with_utf8_paths(self):
        """Test that UTF-8 encoded paths are handled correctly."""
        self._add_file_to_index("unicode/文件.txt", b"unicode content")
        self._add_file_to_index("unicode/другой.md", b"more unicode")
        # Include only .txt files
        lines = ["*.txt"]
        included = compute_included_paths_full(self.repo.open_index(), lines)
        self.assertEqual(included, {"unicode/文件.txt"})
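
# Note: compute_included_paths_full (above) checks every index path against the
# gitignore-style rules, while compute_included_paths_cone (below) only reasons
# about the directory-oriented cone patterns, mirroring Git's cone mode, which
# restricts the pattern language for faster matching.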


class ComputeIncludedPathsConeTests(TestCase):
    """Test compute_included_paths_cone with an ephemeral repo to see included vs. excluded paths."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath, content=b"test"):
        """Write a file under the temp repo and stage it in the index."""
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        self.repo.get_worktree().stage([relpath])

    def test_cone_mode_patterns(self):
        """Simpler pattern handling in cone mode.

        Lines in 'cone' style typically look like:
          - /*      -> include top-level files
          - !/*/    -> exclude all subdirectories
          - /docs/  -> re-include the 'docs' directory
        """
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        lines = [
            "/*",
            "!/*/",
            "/docs/",
        ]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Top level => includes 'topfile'; subdirs => excluded, except docs/.
        self.assertEqual(included, {"topfile", "docs/readme.md"})

    def test_cone_mode_with_empty_pattern(self):
        """Test cone mode with an empty re-include directory."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        lines = [
            "/*",
            "!/*/",
            "/",  # empty directory pattern; should be skipped
        ]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Only topfile is included, since the empty pattern is skipped.
        self.assertEqual(included, {"topfile"})

    def test_no_exclude_subdirs(self):
        """If the lines never specify '!/*/', we include everything by default."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        lines = [
            "/*",  # top-level files
            "/docs/",  # re-include docs (redundant here)
        ]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Because subdirectories were never excluded, everything is included:
        self.assertEqual(
            included,
            {"topfile", "docs/readme.md", "lib/code.py"},
        )

    def test_only_reinclude_dirs(self):
        """Test cone mode when only re-include directories are specified."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        # Exclude all subdirs, then re-include docs; no top-level include.
        lines = ["!/*/", "/docs/"]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Only docs/* is included, not topfile or lib/*.
        self.assertEqual(included, {"docs/readme.md"})

    def test_exclude_subdirs_no_toplevel(self):
        """Test with exclude_subdirs but without the top-level include pattern."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        # Exclude subdirs and re-include docs only.
        lines = ["!/*/", "/docs/"]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Only docs/* is included since the top level was never included.
        self.assertEqual(included, {"docs/readme.md"})


class DetermineIncludedPathsTests(TestCase):
    """Test the top-level determine_included_paths function."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath):
        """Write a small file under the temp repo and stage it in the index."""
        path = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            f.write(b"data")
        self.repo.get_worktree().stage([relpath])

    def test_full_mode(self):
        self._add_file_to_index("foo.py")
        self._add_file_to_index("bar.md")
        lines = ["*.py", "!bar.*"]
        index = self.repo.open_index()
        included = determine_included_paths(index, lines, cone=False)
        self.assertEqual(included, {"foo.py"})

    def test_cone_mode(self):
        self._add_file_to_index("topfile")
        self._add_file_to_index("subdir/anotherfile")
        lines = ["/*", "!/*/"]
        index = self.repo.open_index()
        included = determine_included_paths(index, lines, cone=True)
        self.assertEqual(included, {"topfile"})
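
# Background for the tests below: apply_included_paths marks excluded index
# entries with the skip-worktree bit (Git's way of saying "treat the index copy
# as up to date and leave the working tree alone"), removes excluded files from
# disk, and re-materializes included paths from their blobs when they are
# missing from the working tree.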


class ApplyIncludedPathsTests(TestCase):
    """Integration tests for apply_included_paths, verifying skip-worktree bits and file removal."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)
        # For testing the local_modifications_exist logic, we'll need the
        # normalizer plus some real content in the object store.

    def _commit_blob(self, relpath, content=b"hello"):
        """Create a blob object in the object store and stage an index entry for it."""
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        self.repo.get_worktree().stage([relpath])
        # Actually commit so the object is in the store.
        self.repo.get_worktree().commit(
            message=b"Commit " + relpath.encode(),
        )

    def test_set_skip_worktree_bits(self):
        """If a path is not in included_paths, its skip-worktree bit is set."""
        self._commit_blob("keep.py", b"print('keep')")
        self._commit_blob("exclude.md", b"# exclude")
        included = {"keep.py"}
        apply_included_paths(self.repo, included_paths=included, force=False)
        idx = self.repo.open_index()
        self.assertIn(b"keep.py", idx)
        self.assertFalse(idx[b"keep.py"].skip_worktree)
        self.assertIn(b"exclude.md", idx)
        self.assertTrue(idx[b"exclude.md"].skip_worktree)
        # The excluded file should also be removed from the working tree.
        exclude_path = os.path.join(self.temp_dir, "exclude.md")
        self.assertFalse(os.path.exists(exclude_path))

    def test_conflict_with_local_modifications_no_force(self):
        """If local modifications exist for an excluded path, raise SparseCheckoutConflictError."""
        self._commit_blob("foo.txt", b"original")
        # Modify foo.txt on disk.
        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
            f.write(b" local changes")
        with self.assertRaises(SparseCheckoutConflictError):
            apply_included_paths(self.repo, included_paths=set(), force=False)

    def test_conflict_with_local_modifications_forced_removal(self):
        """With force=True, local modifications are discarded and the file is skip-worktree'd."""
        self._commit_blob("foo.txt", b"original")
        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
            f.write(b" local changes")
        # This time, pass force=True => the file is removed.
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # Check the skip-worktree bit in the index.
        idx = self.repo.open_index()
        self.assertTrue(idx[b"foo.txt"].skip_worktree)
        # The working tree file was removed.
        self.assertFalse(os.path.exists(os.path.join(self.temp_dir, "foo.txt")))

    def test_materialize_included_file_if_missing(self):
        """If a path is included but missing from disk, restore it from the blob in the store."""
        self._commit_blob("restored.txt", b"some content")
        # Manually remove the file from the working tree.
        os.remove(os.path.join(self.temp_dir, "restored.txt"))
        apply_included_paths(self.repo, included_paths={"restored.txt"}, force=False)
        # "restored.txt" should have been re-created from the blob.
        restored_path = os.path.join(self.temp_dir, "restored.txt")
        self.assertTrue(os.path.exists(restored_path))
        with open(restored_path, "rb") as f:
            self.assertEqual(f.read(), b"some content")

    def test_blob_not_found_raises(self):
        """If the object store lacks the blob for an included path, raise BlobNotFoundError."""
        # Create an index entry that references a nonexistent sha.
        idx = self.repo.open_index()
        fake_sha = b"ab" * 20
        e = IndexEntry(
            ctime=(int(time.time()), 0),  # (seconds, nanoseconds)
            mtime=(int(time.time()), 0),  # (seconds, nanoseconds)
            dev=0,
            ino=0,
            mode=0o100644,
            uid=0,
            gid=0,
            size=0,
            sha=fake_sha,
            flags=0,
            extended_flags=0,
        )
        e.set_skip_worktree(False)
        idx[b"missing_file"] = e
        idx.write()
        with self.assertRaises(BlobNotFoundError):
            apply_included_paths(
                self.repo, included_paths={"missing_file"}, force=False
            )

    def test_directory_removal(self):
        """Test handling of directories when removing excluded files."""
        # Create a directory with a file.
        dir_path = os.path.join(self.temp_dir, "dir")
        os.makedirs(dir_path, exist_ok=True)
        self._commit_blob("dir/file.txt", b"content")
        # Make sure it exists before we proceed.
        self.assertTrue(os.path.exists(os.path.join(dir_path, "file.txt")))
        # Exclude everything.
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # The file should be removed, though the directory itself may remain.
        self.assertFalse(os.path.exists(os.path.join(dir_path, "file.txt")))
        # Now test when the indexed file is actually a directory on disk,
        # which should hit the IsADirectoryError code path.
        another_dir_path = os.path.join(self.temp_dir, "another_dir")
        os.makedirs(another_dir_path, exist_ok=True)
        self._commit_blob("another_dir/subfile.txt", b"content")
        # Replace the file with a same-named directory to trigger IsADirectoryError.
        subfile_dir_path = os.path.join(another_dir_path, "subfile.txt")
        if os.path.exists(subfile_dir_path):
            os.remove(subfile_dir_path)
        os.makedirs(subfile_dir_path, exist_ok=True)
        # Applying the sparse checkout should hit IsADirectoryError but not fail.
        apply_included_paths(self.repo, included_paths=set(), force=True)

    def test_handling_removed_files(self):
        """Files already removed from disk are handled gracefully during exclusion."""
        self._commit_blob("test_file.txt", b"test content")
        # Remove the file manually.
        os.remove(os.path.join(self.temp_dir, "test_file.txt"))
        # Excluding this file should not raise any errors.
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # Verify the skip-worktree bit is set in the index.
        idx = self.repo.open_index()
        self.assertTrue(idx[b"test_file.txt"].skip_worktree)

    def test_local_modifications_ioerror(self):
        """Test handling of PermissionError/OSError when checking for local modifications."""
        self._commit_blob("special_file.txt", b"content")
        file_path = os.path.join(self.temp_dir, "special_file.txt")
        # On Windows, chmod(0) doesn't make files unreadable the same way, so
        # skip this test there; the permission model is different.
        if sys.platform == "win32":
            self.skipTest("File permissions work differently on Windows")
        # Make the file unreadable on Unix-like systems.
        os.chmod(file_path, 0)

        # Restore permissions on cleanup, but only if the file still exists.
        def safe_chmod_cleanup():
            if os.path.exists(file_path):
                try:
                    os.chmod(file_path, 0o644)
                except (FileNotFoundError, PermissionError):
                    pass

        self.addCleanup(safe_chmod_cleanup)
        # With an unreadable file and force=False, a permission error is raised.
        with self.assertRaises((PermissionError, OSError)):
            apply_included_paths(self.repo, included_paths=set(), force=False)
        # With force=True, the file is removed anyway.
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # Verify the file is gone and the skip-worktree bit is set.
        self.assertFalse(os.path.exists(file_path))
        idx = self.repo.open_index()
        self.assertTrue(idx[b"special_file.txt"].skip_worktree)

    def test_checkout_normalization_applied(self):
        """Checkout normalization is applied when materializing files during sparse checkout."""

        # A simple filter that upper-cases content on checkout (smudge) and
        # lower-cases it on checkin (clean).
        class UppercaseFilter:
            def smudge(self, input_bytes, path=b""):
                return input_bytes.upper()

            def clean(self, input_bytes):
                return input_bytes.lower()

            def cleanup(self):
                pass

            def reuse(self, config, filter_name):
                return False

        # Create a .gitattributes file mapping *.txt to the filter.
        gitattributes_path = os.path.join(self.temp_dir, ".gitattributes")
        with open(gitattributes_path, "w") as f:
            f.write("*.txt filter=uppercase\n")
        # Add and commit .gitattributes.
        self.repo.get_worktree().stage([b".gitattributes"])
        self.repo.get_worktree().commit(
            b"Add gitattributes", committer=b"Test <test@example.com>"
        )
        # Initialize the filter context, then register the filter with the
        # cached filter context.
        _ = self.repo.get_blob_normalizer()
        uppercase_filter = UppercaseFilter()
        self.repo.filter_context.filter_registry.register_driver(
            "uppercase", uppercase_filter
        )
        # Commit a file with lowercase content.
        self._commit_blob("test.txt", b"hello world")
        # Remove the file from the working tree to force materialization.
        os.remove(os.path.join(self.temp_dir, "test.txt"))
        # Applying the sparse checkout calls get_blob_normalizer() internally,
        # which uses the cached filter_context with our registered filter.
        apply_included_paths(self.repo, included_paths={"test.txt"}, force=False)
        # The file should be materialized with uppercase content, i.e. with
        # checkout normalization applied.
        with open(os.path.join(self.temp_dir, "test.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, b"HELLO WORLD")

    def test_checkout_normalization_with_lf_to_crlf(self):
        """Line-ending normalization is applied during sparse checkout."""
        # Commit a file with LF line endings.
        self._commit_blob("unix_file.txt", b"line1\nline2\nline3\n")
        # Remove the file from the working tree.
        os.remove(os.path.join(self.temp_dir, "unix_file.txt"))

        # A normalizer that converts LF to CRLF on checkout.
        class CRLFNormalizer:
            def checkin_normalize(self, data, path):
                # For checkin, return unchanged.
                return data

            def checkout_normalize(self, blob, path):
                if isinstance(blob, Blob):
                    # Convert LF to CRLF.
                    new_blob = Blob()
                    new_blob.data = blob.data.replace(b"\n", b"\r\n")
                    return new_blob
                return blob

        # Monkey-patch the repo to use our normalizer, restoring it on cleanup
        # (a trailing restore would be skipped if an assertion failed).
        original_get_blob_normalizer = self.repo.get_blob_normalizer
        self.repo.get_blob_normalizer = lambda: CRLFNormalizer()
        self.addCleanup(
            setattr, self.repo, "get_blob_normalizer", original_get_blob_normalizer
        )
        # Apply the sparse checkout.
        apply_included_paths(self.repo, included_paths={"unix_file.txt"}, force=False)
        # The file should be materialized with CRLF line endings.
        with open(os.path.join(self.temp_dir, "unix_file.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, b"line1\r\nline2\r\nline3\r\n")

    def test_checkout_normalization_not_applied_without_normalizer(self):
        """When the normalizer returns the original blob, no transformation occurs."""
        # Commit a file with specific content.
        original_content = b"original content\nwith newlines\n"
        self._commit_blob("no_norm.txt", original_content)
        # Remove the file from the working tree.
        os.remove(os.path.join(self.temp_dir, "no_norm.txt"))

        # A normalizer that returns the blob unchanged.
        class NoOpNormalizer:
            def checkin_normalize(self, data, path):
                return data

            def checkout_normalize(self, blob, path):
                # Return the blob unchanged.
                return blob

        # Monkey-patch the repo to use our no-op normalizer, restoring on cleanup.
        original_get_blob_normalizer = self.repo.get_blob_normalizer
        self.repo.get_blob_normalizer = lambda: NoOpNormalizer()
        self.addCleanup(
            setattr, self.repo, "get_blob_normalizer", original_get_blob_normalizer
        )
        # Apply the sparse checkout.
        apply_included_paths(self.repo, included_paths={"no_norm.txt"}, force=False)
        # The file should be materialized with its original content.
        with open(os.path.join(self.temp_dir, "no_norm.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, original_content)