# test_sparse_patterns.py -- Sparse checkout (full and cone mode) pattern handling
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for dulwich.sparse_patterns."""

import os
import shutil
import sys
import tempfile
import time

from dulwich.attrs import GitAttributes, Pattern
from dulwich.filters import FilterBlobNormalizer, FilterRegistry
from dulwich.index import IndexEntry
from dulwich.objects import Blob
from dulwich.repo import Repo
from dulwich.sparse_patterns import (
    BlobNotFoundError,
    SparseCheckoutConflictError,
    apply_included_paths,
    compute_included_paths_cone,
    compute_included_paths_full,
    determine_included_paths,
    match_gitignore_patterns,
    parse_sparse_patterns,
)

from . import TestCase
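
# Throughout these tests, parse_sparse_patterns is expected to return
# 4-tuples of the form (pattern, negation, dir_only, anchored), e.g.
# (as exercised below):
#   "*.py"           -> ("*.py", False, False, False)
#   "!/docs/images/" -> ("docs/images", True, True, True)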


class ParseSparsePatternsTests(TestCase):
    """Test parse_sparse_patterns function."""

    def test_empty_and_comment_lines(self):
        lines = [
            "",
            "# comment here",
            " ",
            "# another comment",
        ]
        parsed = parse_sparse_patterns(lines)
        self.assertEqual(parsed, [])

    def test_simple_patterns(self):
        lines = [
            "*.py",
            "!*.md",
            "/docs/",
            "!/docs/images/",
        ]
        parsed = parse_sparse_patterns(lines)
        self.assertEqual(len(parsed), 4)
        self.assertEqual(parsed[0], ("*.py", False, False, False))  # include *.py
        self.assertEqual(parsed[1], ("*.md", True, False, False))  # exclude *.md
        self.assertEqual(parsed[2], ("docs", False, True, True))  # anchored, dir_only
        self.assertEqual(parsed[3], ("docs/images", True, True, True))

    def test_trailing_slash_dir(self):
        lines = [
            "src/",
        ]
        parsed = parse_sparse_patterns(lines)
        # "src/" => (pattern="src", negation=False, dir_only=True, anchored=False)
        self.assertEqual(parsed, [("src", False, True, False)])

    def test_negation_anchor(self):
        lines = [
            "!/foo.txt",
        ]
        parsed = parse_sparse_patterns(lines)
        # => (pattern="foo.txt", negation=True, dir_only=False, anchored=True)
        self.assertEqual(parsed, [("foo.txt", True, False, True)])


class MatchGitignorePatternsTests(TestCase):
    """Test the match_gitignore_patterns function."""

    def test_no_patterns_returns_excluded(self):
        """If no patterns are provided, by default we treat the path as excluded."""
        self.assertFalse(match_gitignore_patterns("anyfile.py", []))

    def test_last_match_wins(self):
        """Checks that the last pattern to match determines included vs excluded."""
        parsed = parse_sparse_patterns(
            [
                "*.py",  # include
                "!foo.py",  # exclude
            ]
        )
        # "foo.py" matches the first pattern => included,
        # then matches the second pattern => excluded
        self.assertFalse(match_gitignore_patterns("foo.py", parsed))

    def test_dir_only(self):
        """A pattern with a trailing slash should only match directories and subdirectories."""
        parsed = parse_sparse_patterns(["docs/"])
        # A file under the docs/ directory matches, even with path_is_dir=False
        self.assertTrue(
            match_gitignore_patterns("docs/readme.md", parsed, path_is_dir=False)
        )
        self.assertTrue(match_gitignore_patterns("docs", parsed, path_is_dir=True))
        # Even if the path name is "docs", if it's a file, it won't match:
        self.assertFalse(match_gitignore_patterns("docs", parsed, path_is_dir=False))

    def test_anchored(self):
        """Anchored patterns match from the start of the path only."""
        parsed = parse_sparse_patterns(["/foo"])
        self.assertTrue(match_gitignore_patterns("foo", parsed))
        # "some/foo" doesn't match: anchoring requires a match from the
        # start of the path
        self.assertFalse(match_gitignore_patterns("some/foo", parsed))

    def test_unanchored_uses_fnmatch(self):
        parsed = parse_sparse_patterns(["foo"])
        self.assertTrue(match_gitignore_patterns("some/foo", parsed))
        self.assertFalse(match_gitignore_patterns("some/bar", parsed))

    def test_anchored_empty_pattern(self):
        """Test handling of an empty pattern with anchoring (e.g., "/")."""
        parsed = parse_sparse_patterns(["/"])
        # Check the structure of the parsed empty pattern first
        self.assertEqual(parsed, [("", False, False, True)])
        # An anchored empty pattern is skipped in match_gitignore_patterns
        # for non-empty paths, so "foo" does not match ...
        self.assertFalse(match_gitignore_patterns("foo", parsed))
        # ... but an empty path matches the empty pattern via empty-string
        # comparison (an implementation detail)
        self.assertTrue(match_gitignore_patterns("", parsed))

    def test_anchored_dir_only_exact_match(self):
        """Test anchored directory-only patterns with exact matching."""
        parsed = parse_sparse_patterns(["/docs/"])
        # Test with exact match "docs" and path_is_dir=True
        self.assertTrue(match_gitignore_patterns("docs", parsed, path_is_dir=True))
        # Test with "docs/" (exact match + trailing slash)
        self.assertTrue(match_gitignore_patterns("docs/", parsed, path_is_dir=True))

    def test_complex_anchored_patterns(self):
        """Test more complex anchored pattern matching."""
        parsed = parse_sparse_patterns(["/dir/subdir"])
        # Test exact match
        self.assertTrue(match_gitignore_patterns("dir/subdir", parsed))
        # Test subdirectory path
        self.assertTrue(match_gitignore_patterns("dir/subdir/file.txt", parsed))
        # Test non-matching path
        self.assertFalse(match_gitignore_patterns("otherdir/subdir", parsed))

    def test_pattern_matching_edge_cases(self):
        """Test various edge cases in pattern matching."""
        # Test exact equality with an anchored pattern
        parsed = parse_sparse_patterns(["/foo"])
        self.assertTrue(match_gitignore_patterns("foo", parsed))
        # Test with path_is_dir=True
        self.assertTrue(match_gitignore_patterns("foo", parsed, path_is_dir=True))
        # Test an exact match against a dir_only pattern
        parsed = parse_sparse_patterns(["/bar/"])
        self.assertTrue(match_gitignore_patterns("bar", parsed, path_is_dir=True))
        # Test a startswith match for an anchored pattern
        parsed = parse_sparse_patterns(["/prefix"])
        self.assertTrue(
            match_gitignore_patterns("prefix/subdirectory/file.txt", parsed)
        )


class ComputeIncludedPathsFullTests(TestCase):
    """Test compute_included_paths_full using a real ephemeral repo index."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath, content=b"test"):
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        # Stage in the index
        self.repo.get_worktree().stage([relpath])

    def test_basic_inclusion_exclusion(self):
        """Given patterns, check correct set of included paths."""
        self._add_file_to_index("foo.py", b"print(1)")
        self._add_file_to_index("bar.md", b"markdown")
        self._add_file_to_index("docs/readme", b"# docs")
        lines = [
            "*.py",  # include all .py
            "!bar.*",  # exclude bar.md
            "docs/",  # include docs dir
        ]
        included = compute_included_paths_full(self.repo.open_index(), lines)
        self.assertEqual(included, {"foo.py", "docs/readme"})

    def test_full_with_utf8_paths(self):
        """Test that UTF-8 encoded paths are handled correctly."""
        self._add_file_to_index("unicode/文件.txt", b"unicode content")
        self._add_file_to_index("unicode/другой.md", b"more unicode")
        # Include all text files
        lines = ["*.txt"]
        included = compute_included_paths_full(self.repo.open_index(), lines)
        self.assertEqual(included, {"unicode/文件.txt"})


class ComputeIncludedPathsConeTests(TestCase):
    """Test compute_included_paths_cone with ephemeral repo to see included vs excluded."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath, content=b"test"):
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        self.repo.get_worktree().stage([relpath])

    def test_cone_mode_patterns(self):
        """Simpler pattern handling in cone mode.

        Lines in 'cone' style typically look like:
          - /*     -> include top-level
          - !/*/   -> exclude all subdirs
          - /docs/ -> reinclude 'docs' directory
        """
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        lines = [
            "/*",
            "!/*/",
            "/docs/",
        ]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # top-level => includes 'topfile'
        # subdirs => excluded, except docs/
        self.assertEqual(included, {"topfile", "docs/readme.md"})

    def test_cone_mode_with_empty_pattern(self):
        """Test cone mode with an empty reinclude directory."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        # Include an empty pattern that should be skipped
        lines = [
            "/*",
            "!/*/",
            "/",  # This empty pattern should be skipped
        ]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Only topfile should be included since the empty pattern is skipped
        self.assertEqual(included, {"topfile"})

    def test_no_exclude_subdirs(self):
        """If lines never specify '!/*/', we include everything by default."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        lines = [
            "/*",  # top-level
            "/docs/",  # re-include docs (already included)
        ]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Because exclude_subdirs was never set, everything is included:
        self.assertEqual(
            included,
            {"topfile", "docs/readme.md", "lib/code.py"},
        )

    def test_only_reinclude_dirs(self):
        """Test cone mode when only reinclude directories are specified."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        # Only specify reinclude_dirs; subdirs must be excluded explicitly
        lines = ["!/*/", "/docs/"]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Only docs/* should be included, not topfile or lib/*
        self.assertEqual(included, {"docs/readme.md"})

    def test_exclude_subdirs_no_toplevel(self):
        """Test with exclude_subdirs but without toplevel files."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")
        # Only exclude subdirs and reinclude docs
        lines = ["!/*/", "/docs/"]
        included = compute_included_paths_cone(self.repo.open_index(), lines)
        # Only docs/* should be included since we didn't include the top level
        self.assertEqual(included, {"docs/readme.md"})


class DetermineIncludedPathsTests(TestCase):
    """Test the top-level determine_included_paths function."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath):
        path = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            f.write(b"data")
        self.repo.get_worktree().stage([relpath])

    def test_full_mode(self):
        self._add_file_to_index("foo.py")
        self._add_file_to_index("bar.md")
        lines = ["*.py", "!bar.*"]
        index = self.repo.open_index()
        included = determine_included_paths(index, lines, cone=False)
        self.assertEqual(included, {"foo.py"})

    def test_cone_mode(self):
        self._add_file_to_index("topfile")
        self._add_file_to_index("subdir/anotherfile")
        lines = ["/*", "!/*/"]
        index = self.repo.open_index()
        included = determine_included_paths(index, lines, cone=True)
        self.assertEqual(included, {"topfile"})


class ApplyIncludedPathsTests(TestCase):
    """Integration tests for apply_included_paths, verifying skip-worktree bits and file removal."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)
        # For testing local_modifications_exist logic, we'll need the normalizer
        # plus some real content in the object store.

    def _commit_blob(self, relpath, content=b"hello"):
        """Create a blob object in object_store, stage an index entry for it."""
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        self.repo.get_worktree().stage([relpath])
        # Actually commit so the object is in the store
        self.repo.get_worktree().commit(
            message=b"Commit " + relpath.encode(),
        )

    def test_set_skip_worktree_bits(self):
        """If a path is not in included_paths, the skip_worktree bit is set."""
        self._commit_blob("keep.py", b"print('keep')")
        self._commit_blob("exclude.md", b"# exclude")
        included = {"keep.py"}
        apply_included_paths(self.repo, included_paths=included, force=False)
        idx = self.repo.open_index()
        self.assertIn(b"keep.py", idx)
        self.assertFalse(idx[b"keep.py"].skip_worktree)
        self.assertIn(b"exclude.md", idx)
        self.assertTrue(idx[b"exclude.md"].skip_worktree)
        # Also check that the exclude.md file was removed from the working tree
        exclude_path = os.path.join(self.temp_dir, "exclude.md")
        self.assertFalse(os.path.exists(exclude_path))

    def test_conflict_with_local_modifications_no_force(self):
        """If local modifications exist for an excluded path, raise SparseCheckoutConflictError."""
        self._commit_blob("foo.txt", b"original")
        # Modify foo.txt on disk
        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
            f.write(b" local changes")
        with self.assertRaises(SparseCheckoutConflictError):
            apply_included_paths(self.repo, included_paths=set(), force=False)

    def test_conflict_with_local_modifications_forced_removal(self):
        """With force=True, we remove local modifications and skip_worktree the file."""
        self._commit_blob("foo.txt", b"original")
        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
            f.write(b" local changes")
        # This time, pass force=True => file is removed
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # Check skip-worktree in index
        idx = self.repo.open_index()
        self.assertTrue(idx[b"foo.txt"].skip_worktree)
        # Working tree file removed
        self.assertFalse(os.path.exists(os.path.join(self.temp_dir, "foo.txt")))

    def test_materialize_included_file_if_missing(self):
        """If a path is included but missing from disk, we restore it from the blob in the store."""
        self._commit_blob("restored.txt", b"some content")
        # Manually remove the file from the working tree
        os.remove(os.path.join(self.temp_dir, "restored.txt"))
        apply_included_paths(self.repo, included_paths={"restored.txt"}, force=False)
        # Should have re-created "restored.txt" from the blob
        self.assertTrue(os.path.exists(os.path.join(self.temp_dir, "restored.txt")))
        with open(os.path.join(self.temp_dir, "restored.txt"), "rb") as f:
            self.assertEqual(f.read(), b"some content")

    def test_blob_not_found_raises(self):
        """If the object store is missing the blob for an included path, raise BlobNotFoundError."""
        # Create an index entry that references a nonexistent sha
        idx = self.repo.open_index()
        fake_sha = b"ab" * 20  # 40 hex chars that match no object in the store
        e = IndexEntry(
            ctime=(int(time.time()), 0),  # (seconds, nanoseconds)
            mtime=(int(time.time()), 0),  # (seconds, nanoseconds)
            dev=0,
            ino=0,
            mode=0o100644,
            uid=0,
            gid=0,
            size=0,
            sha=fake_sha,
            flags=0,
            extended_flags=0,
        )
        e.set_skip_worktree(False)
        idx[b"missing_file"] = e
        idx.write()
        with self.assertRaises(BlobNotFoundError):
            apply_included_paths(
                self.repo, included_paths={"missing_file"}, force=False
            )

    def test_directory_removal(self):
        """Test handling of directories when removing excluded files."""
        # Create a directory with a file
        dir_path = os.path.join(self.temp_dir, "dir")
        os.makedirs(dir_path, exist_ok=True)
        self._commit_blob("dir/file.txt", b"content")
        # Make sure it exists before we proceed
        self.assertTrue(os.path.exists(os.path.join(dir_path, "file.txt")))
        # Exclude everything
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # The file should be removed, but the directory might remain
        self.assertFalse(os.path.exists(os.path.join(dir_path, "file.txt")))
        # Test when the "file" is actually a directory - should hit the
        # IsADirectoryError case
        another_dir_path = os.path.join(self.temp_dir, "another_dir")
        os.makedirs(another_dir_path, exist_ok=True)
        self._commit_blob("another_dir/subfile.txt", b"content")
        # Replace the file with a same-named directory to trigger IsADirectoryError
        subfile_dir_path = os.path.join(another_dir_path, "subfile.txt")
        if os.path.exists(subfile_dir_path):
            # Remove any existing file first
            os.remove(subfile_dir_path)
        os.makedirs(subfile_dir_path, exist_ok=True)
        # Applying sparse checkout should trigger IsADirectoryError but not fail
        apply_included_paths(self.repo, included_paths=set(), force=True)

    def test_handling_removed_files(self):
        """Test that files already removed from disk are handled correctly during exclusion."""
        self._commit_blob("test_file.txt", b"test content")
        # Remove the file manually
        os.remove(os.path.join(self.temp_dir, "test_file.txt"))
        # Should not raise any errors when excluding this file
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # Verify the skip-worktree bit is set in the index
        idx = self.repo.open_index()
        self.assertTrue(idx[b"test_file.txt"].skip_worktree)

    def test_local_modifications_ioerror(self):
        """Test handling of PermissionError/OSError when checking for local modifications."""
        self._commit_blob("special_file.txt", b"content")
        file_path = os.path.join(self.temp_dir, "special_file.txt")
        # On Windows, chmod with 0 doesn't make files unreadable the same way,
        # so skip this test there; the permission model is different.
        if sys.platform == "win32":
            self.skipTest("File permissions work differently on Windows")
        # Make the file unreadable on Unix-like systems
        os.chmod(file_path, 0)

        # Add a cleanup that checks whether the file still exists first
        def safe_chmod_cleanup():
            if os.path.exists(file_path):
                try:
                    os.chmod(file_path, 0o644)
                except (FileNotFoundError, PermissionError):
                    pass

        self.addCleanup(safe_chmod_cleanup)
        # Should raise PermissionError/OSError with an unreadable file and force=False
        with self.assertRaises((PermissionError, OSError)):
            apply_included_paths(self.repo, included_paths=set(), force=False)
        # With force=True, the file should be removed anyway
        apply_included_paths(self.repo, included_paths=set(), force=True)
        # Verify the file is gone and the skip-worktree bit is set
        self.assertFalse(os.path.exists(file_path))
        idx = self.repo.open_index()
        self.assertTrue(idx[b"special_file.txt"].skip_worktree)

    def test_checkout_normalization_applied(self):
        """Test that checkout normalization is applied when materializing files during sparse checkout."""

        # A simple filter that converts content to uppercase
        class UppercaseFilter:
            def smudge(self, input_bytes, path=b""):
                return input_bytes.upper()

            def clean(self, input_bytes):
                return input_bytes.lower()
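
        # Naming convention assumed here (mirroring git filter drivers):
        # smudge runs on checkout (object store -> working tree) and clean
        # runs on checkin, so the smudged, uppercased bytes are what should
        # land on disk below.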

        # Set up filter registry and normalizer
        filter_registry = FilterRegistry()
        filter_registry.register_driver("uppercase", UppercaseFilter())
        # Create a gitattributes object mapping *.txt to the filter
        patterns = [(Pattern(b"*.txt"), {b"filter": b"uppercase"})]
        gitattributes = GitAttributes(patterns)
        # Monkey-patch the repo to use our filter registry
        original_get_blob_normalizer = self.repo.get_blob_normalizer

        def get_blob_normalizer_with_filters():
            return FilterBlobNormalizer(None, gitattributes, filter_registry)

        self.repo.get_blob_normalizer = get_blob_normalizer_with_filters
        # Commit a file with lowercase content
        self._commit_blob("test.txt", b"hello world")
        # Remove the file from the working tree to force materialization
        os.remove(os.path.join(self.temp_dir, "test.txt"))
        # Apply sparse checkout
        apply_included_paths(self.repo, included_paths={"test.txt"}, force=False)
        # Verify the file was materialized with uppercase content
        # (checkout normalization applied)
        with open(os.path.join(self.temp_dir, "test.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, b"HELLO WORLD")
        # Restore the original method
        self.repo.get_blob_normalizer = original_get_blob_normalizer

    def test_checkout_normalization_with_lf_to_crlf(self):
        """Test that line ending normalization is applied during sparse checkout."""
        # Commit a file with LF line endings
        self._commit_blob("unix_file.txt", b"line1\nline2\nline3\n")
        # Remove the file from the working tree
        os.remove(os.path.join(self.temp_dir, "unix_file.txt"))

        # A normalizer that converts LF to CRLF on checkout
        class CRLFNormalizer:
            def checkin_normalize(self, data, path):
                # For checkin, return unchanged
                return data

            def checkout_normalize(self, blob, path):
                if isinstance(blob, Blob):
                    # Convert LF to CRLF in a fresh Blob, leaving the
                    # original object untouched
                    new_blob = Blob()
                    new_blob.data = blob.data.replace(b"\n", b"\r\n")
                    return new_blob
                return blob
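
        # Contract assumed by these tests: checkout_normalize receives the
        # Blob pulled from the object store and must return a Blob whose
        # .data is what gets written to the working tree.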

        # Monkey-patch the repo to use our normalizer
        original_get_blob_normalizer = self.repo.get_blob_normalizer
        self.repo.get_blob_normalizer = lambda: CRLFNormalizer()
        # Apply sparse checkout
        apply_included_paths(self.repo, included_paths={"unix_file.txt"}, force=False)
        # Verify the file was materialized with CRLF line endings
        with open(os.path.join(self.temp_dir, "unix_file.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, b"line1\r\nline2\r\nline3\r\n")
        # Restore the original method
        self.repo.get_blob_normalizer = original_get_blob_normalizer

    def test_checkout_normalization_not_applied_without_normalizer(self):
        """Test that when the normalizer returns the original blob, no transformation occurs."""
        # Commit a file with specific content
        original_content = b"original content\nwith newlines\n"
        self._commit_blob("no_norm.txt", original_content)
        # Remove the file from the working tree
        os.remove(os.path.join(self.temp_dir, "no_norm.txt"))

        # A normalizer that returns the blob unchanged
        class NoOpNormalizer:
            def checkin_normalize(self, data, path):
                return data

            def checkout_normalize(self, blob, path):
                # Return the blob unchanged
                return blob

        # Monkey-patch the repo to use our no-op normalizer
        original_get_blob_normalizer = self.repo.get_blob_normalizer
        self.repo.get_blob_normalizer = lambda: NoOpNormalizer()
        # Apply sparse checkout
        apply_included_paths(self.repo, included_paths={"no_norm.txt"}, force=False)
        # Verify the file was materialized with the original content (no normalization)
        with open(os.path.join(self.temp_dir, "no_norm.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, original_content)
        # Restore the original method
        self.repo.get_blob_normalizer = original_get_blob_normalizer