# test_sparse_patterns.py -- Sparse checkout (full and cone mode) pattern handling
# Copyright (C) 2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for dulwich.sparse_patterns."""
import os
import shutil
import tempfile
import time

from dulwich.filters import FilterBlobNormalizer, FilterRegistry
from dulwich.index import IndexEntry
from dulwich.objects import Blob
from dulwich.repo import Repo
from dulwich.sparse_patterns import (
    BlobNotFoundError,
    SparseCheckoutConflictError,
    apply_included_paths,
    compute_included_paths_cone,
    compute_included_paths_full,
    determine_included_paths,
    match_gitignore_patterns,
    parse_sparse_patterns,
)

from . import TestCase
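
# Shape note for readers: parse_sparse_patterns maps each non-comment line to
# a 4-tuple (pattern, negation, dir_only, anchored), stripping the leading "!",
# the leading "/", and the trailing "/" into the three flags (see the
# assertions in the tests below).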


class ParseSparsePatternsTests(TestCase):
    """Test parse_sparse_patterns function."""

    def test_empty_and_comment_lines(self):
        lines = [
            "",
            "# comment here",
            " ",
            "# another comment",
        ]
        parsed = parse_sparse_patterns(lines)
        self.assertEqual(parsed, [])

    def test_simple_patterns(self):
        lines = [
            "*.py",
            "!*.md",
            "/docs/",
            "!/docs/images/",
        ]
        parsed = parse_sparse_patterns(lines)
        self.assertEqual(len(parsed), 4)
        self.assertEqual(parsed[0], ("*.py", False, False, False))  # include *.py
        self.assertEqual(parsed[1], ("*.md", True, False, False))  # exclude *.md
        self.assertEqual(parsed[2], ("docs", False, True, True))  # anchored, dir_only
        self.assertEqual(parsed[3], ("docs/images", True, True, True))

    def test_trailing_slash_dir(self):
        lines = [
            "src/",
        ]
        parsed = parse_sparse_patterns(lines)
        # "src/" => (pattern="src", negation=False, dir_only=True, anchored=False)
        self.assertEqual(parsed, [("src", False, True, False)])

    def test_negation_anchor(self):
        lines = [
            "!/foo.txt",
        ]
        parsed = parse_sparse_patterns(lines)
        # => (pattern="foo.txt", negation=True, dir_only=False, anchored=True)
        self.assertEqual(parsed, [("foo.txt", True, False, True)])


class MatchGitignorePatternsTests(TestCase):
    """Test the match_gitignore_patterns function."""

    def test_no_patterns_returns_excluded(self):
        """If no patterns are provided, by default we treat the path as excluded."""
        self.assertFalse(match_gitignore_patterns("anyfile.py", []))

    def test_last_match_wins(self):
        """Checks that the last pattern to match determines included vs excluded."""
        parsed = parse_sparse_patterns(
            [
                "*.py",  # include
                "!foo.py",  # exclude
            ]
        )
        # "foo.py" matches first pattern => included
        # then matches second pattern => excluded
        self.assertFalse(match_gitignore_patterns("foo.py", parsed))

    def test_dir_only(self):
        """A pattern with a trailing slash matches directories and the paths beneath them."""
        parsed = parse_sparse_patterns(["docs/"])
        # Paths *inside* the directory match even with path_is_dir=False,
        # because they are files under "docs".
        self.assertTrue(
            match_gitignore_patterns("docs/readme.md", parsed, path_is_dir=False)
        )
        self.assertTrue(match_gitignore_patterns("docs", parsed, path_is_dir=True))
        # But a plain *file* named "docs" does not match a dir-only pattern:
        self.assertFalse(match_gitignore_patterns("docs", parsed, path_is_dir=False))

    def test_anchored(self):
        """Anchored patterns match from the start of the path only."""
        parsed = parse_sparse_patterns(["/foo"])
        self.assertTrue(match_gitignore_patterns("foo", parsed))
        # But "some/foo" doesn't match because anchored requires start
        self.assertFalse(match_gitignore_patterns("some/foo", parsed))

    def test_unanchored_uses_fnmatch(self):
        parsed = parse_sparse_patterns(["foo"])
        self.assertTrue(match_gitignore_patterns("some/foo", parsed))
        self.assertFalse(match_gitignore_patterns("some/bar", parsed))

    def test_anchored_empty_pattern(self):
        """Test handling of an empty pattern with anchoring (e.g. a bare '/')."""
        parsed = parse_sparse_patterns(["/"])
        # Check the structure of the parsed empty pattern first
        self.assertEqual(parsed, [("", False, False, True)])
        # An empty anchored pattern is skipped for non-empty paths...
        self.assertFalse(match_gitignore_patterns("foo", parsed))
        # ...but an empty path matches the empty pattern (implementation detail)
        self.assertTrue(match_gitignore_patterns("", parsed))

    def test_anchored_dir_only_exact_match(self):
        """Test anchored directory-only patterns with exact matching."""
        parsed = parse_sparse_patterns(["/docs/"])
        # Test with exact match "docs" and path_is_dir=True
        self.assertTrue(match_gitignore_patterns("docs", parsed, path_is_dir=True))
        # Test with "docs/" (exact match + trailing slash)
        self.assertTrue(match_gitignore_patterns("docs/", parsed, path_is_dir=True))

    def test_complex_anchored_patterns(self):
        """Test more complex anchored pattern matching."""
        parsed = parse_sparse_patterns(["/dir/subdir"])
        # Test exact match
        self.assertTrue(match_gitignore_patterns("dir/subdir", parsed))
        # Test subdirectory path
        self.assertTrue(match_gitignore_patterns("dir/subdir/file.txt", parsed))
        # Test non-matching path
        self.assertFalse(match_gitignore_patterns("otherdir/subdir", parsed))

    def test_pattern_matching_edge_cases(self):
        """Test various edge cases in pattern matching."""
        # Test exact equality with an anchored pattern
        parsed = parse_sparse_patterns(["/foo"])
        self.assertTrue(match_gitignore_patterns("foo", parsed))
        # Test with path_is_dir=True
        self.assertTrue(match_gitignore_patterns("foo", parsed, path_is_dir=True))
        # Test exact match with a dir_only=True pattern
        parsed = parse_sparse_patterns(["/bar/"])
        self.assertTrue(match_gitignore_patterns("bar", parsed, path_is_dir=True))
        # Test startswith match for anchored pattern
        parsed = parse_sparse_patterns(["/prefix"])
        self.assertTrue(
            match_gitignore_patterns("prefix/subdirectory/file.txt", parsed)
        )
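
# The semantics exercised above mirror gitignore evaluation: the last matching
# pattern wins. A sketch (not executed as a test):
#
#     parsed = parse_sparse_patterns(["*.py", "!foo.py", "foo.py"])
#     match_gitignore_patterns("foo.py", parsed)  # True: the final include wins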


class ComputeIncludedPathsFullTests(TestCase):
    """Test compute_included_paths_full using a real ephemeral repo index."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath, content=b"test"):
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        # Stage in the index
        self.repo.stage([relpath])

    def test_basic_inclusion_exclusion(self):
        """Given patterns, check correct set of included paths."""
        self._add_file_to_index("foo.py", b"print(1)")
        self._add_file_to_index("bar.md", b"markdown")
        self._add_file_to_index("docs/readme", b"# docs")

        lines = [
            "*.py",  # include all .py
            "!bar.*",  # exclude bar.md
            "docs/",  # include docs dir
        ]
        included = compute_included_paths_full(self.repo, lines)
        self.assertEqual(included, {"foo.py", "docs/readme"})

    def test_full_with_utf8_paths(self):
        """Test that UTF-8 encoded paths are handled correctly."""
        self._add_file_to_index("unicode/文件.txt", b"unicode content")
        self._add_file_to_index("unicode/другой.md", b"more unicode")

        # Include all text files
        lines = ["*.txt"]
        included = compute_included_paths_full(self.repo, lines)
        self.assertEqual(included, {"unicode/文件.txt"})
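
# Full mode (above) runs every index path through gitignore-style matching;
# cone mode (below) only distinguishes top-level files from whole directory
# cones, which is what keeps its pattern vocabulary small.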


class ComputeIncludedPathsConeTests(TestCase):
    """Test compute_included_paths_cone with ephemeral repo to see included vs excluded."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath, content=b"test"):
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        self.repo.stage([relpath])

    def test_cone_mode_patterns(self):
        """Simpler pattern handling in cone mode.

        Lines in 'cone' style typically look like:
          - /*     -> include top-level
          - !/*/   -> exclude all subdirs
          - /docs/ -> reinclude 'docs' directory
        """
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")

        lines = [
            "/*",
            "!/*/",
            "/docs/",
        ]
        included = compute_included_paths_cone(self.repo, lines)
        # top-level => includes 'topfile'
        # subdirs => excluded, except docs/
        self.assertEqual(included, {"topfile", "docs/readme.md"})
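
    # These three lines ("/*", "!/*/", "/<dir>/") are the canonical cone-mode
    # stanza that upstream Git writes to .git/info/sparse-checkout; the file
    # location is a Git convention, not something this test asserts.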

    def test_cone_mode_with_empty_pattern(self):
        """Test cone mode with an empty reinclude directory."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")

        # Include an empty pattern that should be skipped
        lines = [
            "/*",
            "!/*/",
            "/",  # This empty pattern should be skipped
        ]
        included = compute_included_paths_cone(self.repo, lines)
        # Only topfile should be included since the empty pattern is skipped
        self.assertEqual(included, {"topfile"})

    def test_no_exclude_subdirs(self):
        """If lines never specify '!/*/', we include everything by default."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")

        lines = [
            "/*",  # top-level
            "/docs/",  # re-include docs (redundant without an exclude line)
        ]
        included = compute_included_paths_cone(self.repo, lines)
        # Because exclude_subdirs was never set, everything is included:
        self.assertEqual(
            included,
            {"topfile", "docs/readme.md", "lib/code.py"},
        )

    def test_only_reinclude_dirs(self):
        """Test cone mode when only reinclude directories are specified."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")

        # Only specify reinclude dirs; subdirs must be excluded explicitly
        lines = ["!/*/", "/docs/"]
        included = compute_included_paths_cone(self.repo, lines)
        # Only docs/* should be included, not topfile or lib/*
        self.assertEqual(included, {"docs/readme.md"})

    def test_exclude_subdirs_no_toplevel(self):
        """Test with exclude_subdirs but without toplevel files."""
        self._add_file_to_index("topfile", b"hi")
        self._add_file_to_index("docs/readme.md", b"stuff")
        self._add_file_to_index("lib/code.py", b"stuff")

        # Only exclude subdirs and reinclude docs
        lines = ["!/*/", "/docs/"]
        included = compute_included_paths_cone(self.repo, lines)
        # Only docs/* should be included since we didn't include top level
        self.assertEqual(included, {"docs/readme.md"})


class DetermineIncludedPathsTests(TestCase):
    """Test the top-level determine_included_paths function."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)

    def _add_file_to_index(self, relpath):
        path = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            f.write(b"data")
        self.repo.stage([relpath])

    def test_full_mode(self):
        self._add_file_to_index("foo.py")
        self._add_file_to_index("bar.md")

        lines = ["*.py", "!bar.*"]
        included = determine_included_paths(self.repo, lines, cone=False)
        self.assertEqual(included, {"foo.py"})

    def test_cone_mode(self):
        self._add_file_to_index("topfile")
        self._add_file_to_index("subdir/anotherfile")

        lines = ["/*", "!/*/"]
        included = determine_included_paths(self.repo, lines, cone=True)
        self.assertEqual(included, {"topfile"})
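
# A minimal end-to-end sketch of the API tested in this module, assuming an
# existing Repo whose sparse-checkout file lives at Git's conventional
# location (that path is an assumption, not asserted by these tests):
#
#     path = os.path.join(repo.controldir(), "info", "sparse-checkout")
#     with open(path) as f:
#         lines = f.read().splitlines()
#     included = determine_included_paths(repo, lines, cone=True)
#     apply_included_paths(repo, included_paths=included, force=False)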


class ApplyIncludedPathsTests(TestCase):
    """Integration tests for apply_included_paths, verifying skip-worktree bits and file removal."""

    def setUp(self):
        super().setUp()
        self.temp_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.temp_dir)
        self.repo = Repo.init(self.temp_dir)
        # For testing local_modifications_exist logic, we'll need the normalizer
        # plus some real content in the object store.

    def _commit_blob(self, relpath, content=b"hello"):
        """Create a blob object in object_store, stage an index entry for it."""
        full = os.path.join(self.temp_dir, relpath)
        os.makedirs(os.path.dirname(full), exist_ok=True)
        with open(full, "wb") as f:
            f.write(content)
        self.repo.stage([relpath])
        # Actually commit so the object is in the store
        self.repo.do_commit(message=b"Commit " + relpath.encode())

    def test_set_skip_worktree_bits(self):
        """If a path is not in included_paths, its skip_worktree bit is set."""
        self._commit_blob("keep.py", b"print('keep')")
        self._commit_blob("exclude.md", b"# exclude")

        included = {"keep.py"}
        apply_included_paths(self.repo, included_paths=included, force=False)

        idx = self.repo.open_index()
        self.assertIn(b"keep.py", idx)
        self.assertFalse(idx[b"keep.py"].skip_worktree)
        self.assertIn(b"exclude.md", idx)
        self.assertTrue(idx[b"exclude.md"].skip_worktree)

        # Also check that the exclude.md file was removed from the working tree
        exclude_path = os.path.join(self.temp_dir, "exclude.md")
        self.assertFalse(os.path.exists(exclude_path))
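
    # skip_worktree is the index bit Git's sparse checkout relies on: the
    # entry stays in the index, while the file is absent from the worktree.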

    def test_conflict_with_local_modifications_no_force(self):
        """If local modifications exist for an excluded path, raise SparseCheckoutConflictError."""
        self._commit_blob("foo.txt", b"original")

        # Modify foo.txt on disk
        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
            f.write(b" local changes")

        with self.assertRaises(SparseCheckoutConflictError):
            apply_included_paths(self.repo, included_paths=set(), force=False)

    def test_conflict_with_local_modifications_forced_removal(self):
        """With force=True, we remove local modifications and skip_worktree the file."""
        self._commit_blob("foo.txt", b"original")
        with open(os.path.join(self.temp_dir, "foo.txt"), "ab") as f:
            f.write(b" local changes")

        # This time, pass force=True => file is removed
        apply_included_paths(self.repo, included_paths=set(), force=True)

        # Check skip-worktree in index
        idx = self.repo.open_index()
        self.assertTrue(idx[b"foo.txt"].skip_worktree)
        # Working tree file removed
        self.assertFalse(os.path.exists(os.path.join(self.temp_dir, "foo.txt")))

    def test_materialize_included_file_if_missing(self):
        """If a path is included but missing from disk, we restore it from the blob in the store."""
        self._commit_blob("restored.txt", b"some content")

        # Manually remove the file from the working tree
        os.remove(os.path.join(self.temp_dir, "restored.txt"))

        apply_included_paths(self.repo, included_paths={"restored.txt"}, force=False)

        # Should have re-created "restored.txt" from the blob
        self.assertTrue(os.path.exists(os.path.join(self.temp_dir, "restored.txt")))
        with open(os.path.join(self.temp_dir, "restored.txt"), "rb") as f:
            self.assertEqual(f.read(), b"some content")

    def test_blob_not_found_raises(self):
        """If the object store is missing the blob for an included path, raise BlobNotFoundError."""
        # Create an index entry that references a nonexistent sha
        idx = self.repo.open_index()
        fake_sha = b"ab" * 20
        e = IndexEntry(
            ctime=(int(time.time()), 0),  # (seconds, nanoseconds)
            mtime=(int(time.time()), 0),  # (seconds, nanoseconds)
            dev=0,
            ino=0,
            mode=0o100644,
            uid=0,
            gid=0,
            size=0,
            sha=fake_sha,
            flags=0,
            extended_flags=0,
        )
        e.set_skip_worktree(False)
        idx[b"missing_file"] = e
        idx.write()

        with self.assertRaises(BlobNotFoundError):
            apply_included_paths(
                self.repo, included_paths={"missing_file"}, force=False
            )

    def test_directory_removal(self):
        """Test handling of directories when removing excluded files."""
        # Create a directory with a file
        dir_path = os.path.join(self.temp_dir, "dir")
        os.makedirs(dir_path, exist_ok=True)
        self._commit_blob("dir/file.txt", b"content")

        # Make sure it exists before we proceed
        self.assertTrue(os.path.exists(os.path.join(dir_path, "file.txt")))

        # Exclude everything
        apply_included_paths(self.repo, included_paths=set(), force=True)

        # The file should be removed, but the directory might remain
        self.assertFalse(os.path.exists(os.path.join(dir_path, "file.txt")))

        # When the working-tree path is itself a directory, removal should hit
        # (and tolerate) the IsADirectoryError case rather than crash.
        another_dir_path = os.path.join(self.temp_dir, "another_dir")
        os.makedirs(another_dir_path, exist_ok=True)
        self._commit_blob("another_dir/subfile.txt", b"content")

        # Replace the committed file with a same-named directory to trigger
        # IsADirectoryError
        subfile_dir_path = os.path.join(another_dir_path, "subfile.txt")
        if os.path.exists(subfile_dir_path):
            # Remove any existing file first
            os.remove(subfile_dir_path)
        os.makedirs(subfile_dir_path, exist_ok=True)

        # Applying sparse checkout should hit IsADirectoryError but not fail
        apply_included_paths(self.repo, included_paths=set(), force=True)

    def test_handling_removed_files(self):
        """Test that files already removed from disk are handled correctly during exclusion."""
        self._commit_blob("test_file.txt", b"test content")

        # Remove the file manually
        os.remove(os.path.join(self.temp_dir, "test_file.txt"))

        # Should not raise any errors when excluding this file
        apply_included_paths(self.repo, included_paths=set(), force=True)

        # Verify skip-worktree bit is set in index
        idx = self.repo.open_index()
        self.assertTrue(idx[b"test_file.txt"].skip_worktree)

    def test_local_modifications_ioerror(self):
        """Test handling of PermissionError/OSError when checking for local modifications."""
        import sys

        self._commit_blob("special_file.txt", b"content")
        file_path = os.path.join(self.temp_dir, "special_file.txt")

        # On Windows, chmod(path, 0) doesn't make files unreadable the same
        # way, so skip this test there; the permission model is different.
        if sys.platform == "win32":
            self.skipTest("File permissions work differently on Windows")

        # Make the file unreadable on Unix-like systems
        os.chmod(file_path, 0)

        # Add a cleanup that restores permissions only if the file still exists
        def safe_chmod_cleanup():
            if os.path.exists(file_path):
                try:
                    os.chmod(file_path, 0o644)
                except (FileNotFoundError, PermissionError):
                    pass

        self.addCleanup(safe_chmod_cleanup)

        # Should raise PermissionError/OSError with an unreadable file and force=False
        with self.assertRaises((PermissionError, OSError)):
            apply_included_paths(self.repo, included_paths=set(), force=False)

        # With force=True, should remove the file anyway
        apply_included_paths(self.repo, included_paths=set(), force=True)

        # Verify file is gone and skip-worktree bit is set
        self.assertFalse(os.path.exists(file_path))
        idx = self.repo.open_index()
        self.assertTrue(idx[b"special_file.txt"].skip_worktree)

    def test_checkout_normalization_applied(self):
        """Test that checkout normalization is applied when materializing files during sparse checkout."""

        # Create a simple filter that converts content to uppercase
        class UppercaseFilter:
            def smudge(self, input_bytes):
                return input_bytes.upper()

            def clean(self, input_bytes):
                return input_bytes.lower()

        # Set up filter registry and normalizer
        filter_registry = FilterRegistry()
        filter_registry.register_driver("uppercase", UppercaseFilter())

        # Create gitattributes dict
        gitattributes = {b"*.txt": {b"filter": b"uppercase"}}

        # Monkey-patch the repo to use our filter registry
        original_get_blob_normalizer = self.repo.get_blob_normalizer

        def get_blob_normalizer_with_filters():
            return FilterBlobNormalizer(None, gitattributes, filter_registry)

        self.repo.get_blob_normalizer = get_blob_normalizer_with_filters

        # Commit a file with lowercase content
        self._commit_blob("test.txt", b"hello world")

        # Remove the file from the working tree to force materialization
        os.remove(os.path.join(self.temp_dir, "test.txt"))

        # Apply sparse checkout
        apply_included_paths(self.repo, included_paths={"test.txt"}, force=False)

        # Verify the file was materialized with uppercase content
        # (checkout normalization applied)
        with open(os.path.join(self.temp_dir, "test.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, b"HELLO WORLD")

        # Restore original method
        self.repo.get_blob_normalizer = original_get_blob_normalizer
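
    # In gitattributes filter terms, "smudge" runs on checkout (applied above
    # when the file is materialized) and "clean" runs on checkin.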

    def test_checkout_normalization_with_lf_to_crlf(self):
        """Test that line-ending normalization is applied during sparse checkout."""
        # Commit a file with LF line endings
        self._commit_blob("unix_file.txt", b"line1\nline2\nline3\n")

        # Remove the file from the working tree
        os.remove(os.path.join(self.temp_dir, "unix_file.txt"))

        # Create a normalizer that converts LF to CRLF on checkout
        class CRLFNormalizer:
            def checkin_normalize(self, data, path):
                # For checkin, return unchanged
                return data

            def checkout_normalize(self, blob, path):
                if isinstance(blob, Blob):
                    # Convert LF to CRLF
                    new_blob = Blob()
                    new_blob.data = blob.data.replace(b"\n", b"\r\n")
                    return new_blob
                return blob

        # Monkey-patch the repo to use our normalizer
        original_get_blob_normalizer = self.repo.get_blob_normalizer
        self.repo.get_blob_normalizer = lambda: CRLFNormalizer()

        # Apply sparse checkout
        apply_included_paths(self.repo, included_paths={"unix_file.txt"}, force=False)

        # Verify the file was materialized with CRLF line endings
        with open(os.path.join(self.temp_dir, "unix_file.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, b"line1\r\nline2\r\nline3\r\n")

        # Restore original method
        self.repo.get_blob_normalizer = original_get_blob_normalizer

    def test_checkout_normalization_not_applied_without_normalizer(self):
        """Test that when the normalizer returns the original blob, no transformation occurs."""
        # Commit a file with specific content
        original_content = b"original content\nwith newlines\n"
        self._commit_blob("no_norm.txt", original_content)

        # Remove the file from the working tree
        os.remove(os.path.join(self.temp_dir, "no_norm.txt"))

        # Create a normalizer that returns the blob unchanged
        class NoOpNormalizer:
            def checkin_normalize(self, data, path):
                return data

            def checkout_normalize(self, blob, path):
                # Return the blob unchanged
                return blob

        # Monkey-patch the repo to use our no-op normalizer
        original_get_blob_normalizer = self.repo.get_blob_normalizer
        self.repo.get_blob_normalizer = lambda: NoOpNormalizer()

        # Apply sparse checkout
        apply_included_paths(self.repo, included_paths={"no_norm.txt"}, force=False)

        # Verify the file was materialized with its original content (no normalization)
        with open(os.path.join(self.temp_dir, "no_norm.txt"), "rb") as f:
            content = f.read()
        self.assertEqual(content, original_content)

        # Restore original method
        self.repo.get_blob_normalizer = original_get_blob_normalizer