test_lfs.py 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083
  1. # test_lfs.py -- tests for LFS
  2. # Copyright (C) 2020 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for LFS support."""
  22. import json
  23. import os
  24. import shutil
  25. import tempfile
  26. from dulwich import porcelain
  27. from dulwich.lfs import LFSFilterDriver, LFSPointer, LFSStore
  28. from dulwich.repo import Repo
  29. from . import TestCase
  30. class LFSTests(TestCase):
  31. def setUp(self) -> None:
  32. super().setUp()
  33. self.test_dir = tempfile.mkdtemp()
  34. self.addCleanup(shutil.rmtree, self.test_dir)
  35. self.lfs = LFSStore.create(self.test_dir)
  36. def test_create(self) -> None:
  37. sha = self.lfs.write_object([b"a", b"b"])
  38. with self.lfs.open_object(sha) as f:
  39. self.assertEqual(b"ab", f.read())
  40. def test_missing(self) -> None:
  41. self.assertRaises(KeyError, self.lfs.open_object, "abcdeabcdeabcdeabcde")
  42. def test_write_object_empty(self) -> None:
  43. """Test writing an empty object."""
  44. sha = self.lfs.write_object([])
  45. with self.lfs.open_object(sha) as f:
  46. self.assertEqual(b"", f.read())
  47. def test_write_object_multiple_chunks(self) -> None:
  48. """Test writing an object with multiple chunks."""
  49. chunks = [b"chunk1", b"chunk2", b"chunk3"]
  50. sha = self.lfs.write_object(chunks)
  51. with self.lfs.open_object(sha) as f:
  52. self.assertEqual(b"".join(chunks), f.read())
  53. def test_sha_path_calculation(self) -> None:
  54. """Test the internal sha path calculation."""
  55. # The implementation splits the sha into parts for directory structure
  56. # Write and verify we can read it back
  57. sha = self.lfs.write_object([b"test data"])
  58. self.assertEqual(len(sha), 64) # SHA-256 is 64 hex chars
  59. # Open should succeed, which verifies the path calculation works
  60. with self.lfs.open_object(sha) as f:
  61. self.assertEqual(b"test data", f.read())
  62. def test_create_lfs_dir(self) -> None:
  63. """Test creating an LFS directory when it doesn't exist."""
  64. import os
  65. # Create a temporary directory for the test
  66. lfs_parent_dir = tempfile.mkdtemp()
  67. self.addCleanup(shutil.rmtree, lfs_parent_dir)
  68. # Create a path for the LFS directory
  69. lfs_dir = os.path.join(lfs_parent_dir, "lfs")
  70. # Create the LFS store
  71. LFSStore.create(lfs_dir)
  72. # Verify the directories were created
  73. self.assertTrue(os.path.isdir(lfs_dir))
  74. self.assertTrue(os.path.isdir(os.path.join(lfs_dir, "tmp")))
  75. self.assertTrue(os.path.isdir(os.path.join(lfs_dir, "objects")))
  76. class LFSPointerTests(TestCase):
  77. def test_from_bytes_valid(self) -> None:
  78. """Test parsing a valid LFS pointer."""
  79. pointer_data = (
  80. b"version https://git-lfs.github.com/spec/v1\n"
  81. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  82. b"size 0\n"
  83. )
  84. pointer = LFSPointer.from_bytes(pointer_data)
  85. self.assertIsNotNone(pointer)
  86. self.assertEqual(
  87. pointer.oid,
  88. "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
  89. )
  90. self.assertEqual(pointer.size, 0)
  91. def test_from_bytes_with_extra_fields(self) -> None:
  92. """Test parsing LFS pointer with extra fields (should still work)."""
  93. pointer_data = (
  94. b"version https://git-lfs.github.com/spec/v1\n"
  95. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  96. b"size 1234\n"
  97. b"x-custom-field value\n"
  98. )
  99. pointer = LFSPointer.from_bytes(pointer_data)
  100. self.assertIsNotNone(pointer)
  101. self.assertEqual(pointer.size, 1234)
  102. def test_from_bytes_invalid_version(self) -> None:
  103. """Test parsing with invalid version line."""
  104. pointer_data = (
  105. b"version https://invalid.com/spec/v1\n"
  106. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  107. b"size 0\n"
  108. )
  109. pointer = LFSPointer.from_bytes(pointer_data)
  110. self.assertIsNone(pointer)
  111. def test_from_bytes_missing_oid(self) -> None:
  112. """Test parsing with missing OID."""
  113. pointer_data = b"version https://git-lfs.github.com/spec/v1\nsize 0\n"
  114. pointer = LFSPointer.from_bytes(pointer_data)
  115. self.assertIsNone(pointer)
  116. def test_from_bytes_missing_size(self) -> None:
  117. """Test parsing with missing size."""
  118. pointer_data = (
  119. b"version https://git-lfs.github.com/spec/v1\n"
  120. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  121. )
  122. pointer = LFSPointer.from_bytes(pointer_data)
  123. self.assertIsNone(pointer)
  124. def test_from_bytes_invalid_size(self) -> None:
  125. """Test parsing with invalid size."""
  126. pointer_data = (
  127. b"version https://git-lfs.github.com/spec/v1\n"
  128. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  129. b"size not_a_number\n"
  130. )
  131. pointer = LFSPointer.from_bytes(pointer_data)
  132. self.assertIsNone(pointer)
  133. def test_from_bytes_binary_data(self) -> None:
  134. """Test parsing binary data (not an LFS pointer)."""
  135. binary_data = b"\x00\x01\x02\x03\x04"
  136. pointer = LFSPointer.from_bytes(binary_data)
  137. self.assertIsNone(pointer)
  138. def test_to_bytes(self) -> None:
  139. """Test converting LFS pointer to bytes."""
  140. pointer = LFSPointer(
  141. "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 1234
  142. )
  143. data = pointer.to_bytes()
  144. expected = (
  145. b"version https://git-lfs.github.com/spec/v1\n"
  146. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  147. b"size 1234\n"
  148. )
  149. self.assertEqual(data, expected)
  150. def test_round_trip(self) -> None:
  151. """Test converting to bytes and back."""
  152. original = LFSPointer(
  153. "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 9876
  154. )
  155. data = original.to_bytes()
  156. parsed = LFSPointer.from_bytes(data)
  157. self.assertIsNotNone(parsed)
  158. self.assertEqual(parsed.oid, original.oid)
  159. self.assertEqual(parsed.size, original.size)
  160. def test_is_valid_oid(self) -> None:
  161. """Test OID validation."""
  162. # Valid SHA256
  163. valid_pointer = LFSPointer(
  164. "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0
  165. )
  166. self.assertTrue(valid_pointer.is_valid_oid())
  167. # Too short
  168. short_pointer = LFSPointer("e3b0c44298fc1c14", 0)
  169. self.assertFalse(short_pointer.is_valid_oid())
  170. # Invalid hex characters
  171. invalid_pointer = LFSPointer(
  172. "g3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0
  173. )
  174. self.assertFalse(invalid_pointer.is_valid_oid())
  175. class LFSIntegrationTests(TestCase):
  176. """Integration tests for LFS with Git operations."""
  177. def setUp(self) -> None:
  178. super().setUp()
  179. import os
  180. from dulwich.repo import Repo
  181. # Create temporary directory for test repo
  182. self.test_dir = tempfile.mkdtemp()
  183. self.addCleanup(shutil.rmtree, self.test_dir)
  184. # Initialize repo
  185. self.repo = Repo.init(self.test_dir)
  186. self.lfs_dir = os.path.join(self.test_dir, ".git", "lfs")
  187. self.lfs_store = LFSStore.create(self.lfs_dir)
  188. def test_lfs_with_gitattributes(self) -> None:
  189. """Test LFS integration with .gitattributes."""
  190. import os
  191. # Create .gitattributes file
  192. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  193. with open(gitattributes_path, "wb") as f:
  194. f.write(b"*.bin filter=lfs diff=lfs merge=lfs -text\n")
  195. # Create a binary file
  196. bin_path = os.path.join(self.test_dir, "large.bin")
  197. large_content = b"Large binary content" * 1000
  198. with open(bin_path, "wb") as f:
  199. f.write(large_content)
  200. # Add files to repo
  201. self.repo.get_worktree().stage([".gitattributes", "large.bin"])
  202. # Get the blob for large.bin from the index
  203. index = self.repo.open_index()
  204. entry = index[b"large.bin"]
  205. blob = self.repo.object_store[entry.sha]
  206. # With LFS configured, the blob should contain an LFS pointer
  207. # (Note: This would require actual LFS filter integration in dulwich)
  208. # For now, we just verify the structure
  209. self.assertIsNotNone(blob)
  210. def test_lfs_checkout_missing_object(self) -> None:
  211. """Test checkout behavior when LFS object is missing."""
  212. from dulwich.objects import Blob, Commit, Tree
  213. # Create an LFS pointer blob
  214. pointer = LFSPointer(
  215. "0000000000000000000000000000000000000000000000000000000000000000", 1234
  216. )
  217. blob = Blob()
  218. blob.data = pointer.to_bytes()
  219. self.repo.object_store.add_object(blob)
  220. # Create tree with the blob
  221. tree = Tree()
  222. tree.add(b"missing.bin", 0o100644, blob.id)
  223. self.repo.object_store.add_object(tree)
  224. # Create commit
  225. commit = Commit()
  226. commit.tree = tree.id
  227. commit.message = b"Add missing LFS file"
  228. commit.author = commit.committer = b"Test User <test@example.com>"
  229. commit.commit_time = commit.author_time = 1234567890
  230. commit.commit_timezone = commit.author_timezone = 0
  231. self.repo.object_store.add_object(commit)
  232. # Update HEAD
  233. self.repo.refs[b"HEAD"] = commit.id
  234. # Checkout should leave pointer file when object is missing
  235. # (actual checkout would require more integration)
  236. def test_lfs_pointer_detection(self) -> None:
  237. """Test detection of LFS pointer files."""
  238. # Test various file contents
  239. test_cases = [
  240. # Valid LFS pointer
  241. (
  242. b"version https://git-lfs.github.com/spec/v1\n"
  243. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  244. b"size 1234\n",
  245. True,
  246. ),
  247. # Regular text file
  248. (b"This is a regular text file\n", False),
  249. # Binary file
  250. (b"\x00\x01\x02\x03\x04", False),
  251. # File that starts like pointer but isn't
  252. (b"version 1.0\nThis is not an LFS pointer\n", False),
  253. ]
  254. for content, expected_is_pointer in test_cases:
  255. pointer = LFSPointer.from_bytes(content)
  256. self.assertEqual(
  257. pointer is not None,
  258. expected_is_pointer,
  259. f"Failed for content: {content!r}",
  260. )
  261. def test_builtin_lfs_clone_no_config(self) -> None:
  262. """Test cloning with LFS when no git-lfs commands are configured."""
  263. # Create source repository
  264. source_dir = os.path.join(self.test_dir, "source")
  265. os.makedirs(source_dir)
  266. source_repo = Repo.init(source_dir)
  267. # Create empty config (no LFS commands)
  268. config = source_repo.get_config()
  269. config.write_to_path()
  270. # Create .gitattributes with LFS filter
  271. gitattributes_path = os.path.join(source_dir, ".gitattributes")
  272. with open(gitattributes_path, "wb") as f:
  273. f.write(b"*.bin filter=lfs\n")
  274. # Create test content and store in LFS
  275. test_content = b"Test binary content"
  276. test_oid = LFSStore.from_repo(source_repo, create=True).write_object(
  277. [test_content]
  278. )
  279. # Create LFS pointer file
  280. pointer = LFSPointer(test_oid, len(test_content))
  281. pointer_file = os.path.join(source_dir, "test.bin")
  282. with open(pointer_file, "wb") as f:
  283. f.write(pointer.to_bytes())
  284. # Commit files
  285. porcelain.add(source_repo, paths=[".gitattributes", "test.bin"])
  286. porcelain.commit(source_repo, message=b"Add LFS tracked file")
  287. source_repo.close()
  288. # Clone the repository
  289. target_dir = os.path.join(self.test_dir, "target")
  290. target_repo = porcelain.clone(source_dir, target_dir)
  291. # Verify no LFS commands in config
  292. target_config = target_repo.get_config_stack()
  293. with self.assertRaises(KeyError):
  294. target_config.get((b"filter", b"lfs"), b"smudge")
  295. # Check the cloned file
  296. cloned_file = os.path.join(target_dir, "test.bin")
  297. with open(cloned_file, "rb") as f:
  298. content = f.read()
  299. # Should still be a pointer (LFS object not in target's store)
  300. self.assertTrue(
  301. content.startswith(b"version https://git-lfs.github.com/spec/v1")
  302. )
  303. self.assertIn(test_oid.encode(), content)
  304. target_repo.close()
  305. def test_builtin_lfs_with_local_objects(self) -> None:
  306. """Test built-in LFS filter when objects are available locally."""
  307. # No LFS config
  308. config = self.repo.get_config()
  309. config.write_to_path()
  310. # Create .gitattributes
  311. gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
  312. with open(gitattributes_path, "wb") as f:
  313. f.write(b"*.dat filter=lfs\n")
  314. # Create LFS store and add object
  315. test_content = b"Hello from LFS!"
  316. lfs_store = LFSStore.from_repo(self.repo, create=True)
  317. test_oid = lfs_store.write_object([test_content])
  318. # Create pointer file
  319. pointer = LFSPointer(test_oid, len(test_content))
  320. pointer_file = os.path.join(self.test_dir, "data.dat")
  321. with open(pointer_file, "wb") as f:
  322. f.write(pointer.to_bytes())
  323. # Commit
  324. porcelain.add(self.repo, paths=[".gitattributes", "data.dat"])
  325. porcelain.commit(self.repo, message=b"Add LFS file")
  326. # Reset index to trigger checkout with filter
  327. self.repo.get_worktree().reset_index()
  328. # Check file content
  329. with open(pointer_file, "rb") as f:
  330. content = f.read()
  331. # Built-in filter should have converted pointer to actual content
  332. self.assertEqual(content, test_content)
  333. def test_builtin_lfs_filter_used(self) -> None:
  334. """Verify that built-in LFS filter is used when no config exists."""
  335. # Get filter registry
  336. normalizer = self.repo.get_blob_normalizer()
  337. filter_registry = normalizer.filter_registry
  338. lfs_driver = filter_registry.get_driver("lfs")
  339. # Should be built-in LFS filter
  340. self.assertIsInstance(lfs_driver, LFSFilterDriver)
  341. self.assertEqual(type(lfs_driver).__module__, "dulwich.lfs")
  342. class LFSFilterDriverTests(TestCase):
  343. def setUp(self) -> None:
  344. super().setUp()
  345. self.test_dir = tempfile.mkdtemp()
  346. self.addCleanup(shutil.rmtree, self.test_dir)
  347. self.lfs_store = LFSStore.create(self.test_dir)
  348. self.filter_driver = LFSFilterDriver(self.lfs_store)
  349. def test_clean_new_file(self) -> None:
  350. """Test clean filter on new file content."""
  351. content = b"This is a test file content"
  352. result = self.filter_driver.clean(content)
  353. # Result should be an LFS pointer
  354. pointer = LFSPointer.from_bytes(result)
  355. self.assertIsNotNone(pointer)
  356. self.assertEqual(pointer.size, len(content))
  357. # Content should be stored in LFS
  358. with self.lfs_store.open_object(pointer.oid) as f:
  359. self.assertEqual(f.read(), content)
  360. def test_clean_existing_pointer(self) -> None:
  361. """Test clean filter on already-pointer content."""
  362. # Create a pointer
  363. pointer = LFSPointer(
  364. "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 1234
  365. )
  366. pointer_data = pointer.to_bytes()
  367. # Clean should return the pointer unchanged
  368. result = self.filter_driver.clean(pointer_data)
  369. self.assertEqual(result, pointer_data)
  370. def test_smudge_valid_pointer(self) -> None:
  371. """Test smudge filter with valid pointer."""
  372. # Store some content
  373. content = b"This is the actual file content"
  374. sha = self.lfs_store.write_object([content])
  375. # Create pointer
  376. pointer = LFSPointer(sha, len(content))
  377. pointer_data = pointer.to_bytes()
  378. # Smudge should return the actual content
  379. result = self.filter_driver.smudge(pointer_data)
  380. self.assertEqual(result, content)
  381. def test_smudge_missing_object(self) -> None:
  382. """Test smudge filter with missing LFS object."""
  383. # Create pointer to non-existent object
  384. pointer = LFSPointer(
  385. "0000000000000000000000000000000000000000000000000000000000000000", 1234
  386. )
  387. pointer_data = pointer.to_bytes()
  388. # Smudge should return the pointer as-is when object is missing
  389. result = self.filter_driver.smudge(pointer_data)
  390. self.assertEqual(result, pointer_data)
  391. def test_smudge_non_pointer(self) -> None:
  392. """Test smudge filter on non-pointer content."""
  393. content = b"This is not an LFS pointer"
  394. # Smudge should return content unchanged
  395. result = self.filter_driver.smudge(content)
  396. self.assertEqual(result, content)
  397. def test_round_trip(self) -> None:
  398. """Test clean followed by smudge."""
  399. original_content = b"Round trip test content"
  400. # Clean (working tree -> repo)
  401. pointer_data = self.filter_driver.clean(original_content)
  402. # Verify it's a pointer
  403. pointer = LFSPointer.from_bytes(pointer_data)
  404. self.assertIsNotNone(pointer)
  405. # Smudge (repo -> working tree)
  406. restored_content = self.filter_driver.smudge(pointer_data)
  407. # Should get back the original content
  408. self.assertEqual(restored_content, original_content)
  409. def test_clean_empty_file(self) -> None:
  410. """Test clean filter on empty file."""
  411. content = b""
  412. result = self.filter_driver.clean(content)
  413. # Result should be an LFS pointer
  414. pointer = LFSPointer.from_bytes(result)
  415. self.assertIsNotNone(pointer)
  416. self.assertEqual(pointer.size, 0)
  417. # Empty content should be stored in LFS
  418. with self.lfs_store.open_object(pointer.oid) as f:
  419. self.assertEqual(f.read(), content)
  420. def test_clean_large_file(self) -> None:
  421. """Test clean filter on large file."""
  422. # Create a large file (1MB)
  423. content = b"x" * (1024 * 1024)
  424. result = self.filter_driver.clean(content)
  425. # Result should be an LFS pointer
  426. pointer = LFSPointer.from_bytes(result)
  427. self.assertIsNotNone(pointer)
  428. self.assertEqual(pointer.size, len(content))
  429. # Content should be stored in LFS
  430. with self.lfs_store.open_object(pointer.oid) as f:
  431. self.assertEqual(f.read(), content)
  432. def test_smudge_corrupt_pointer(self) -> None:
  433. """Test smudge filter with corrupt pointer data."""
  434. # Create corrupt pointer data
  435. corrupt_data = (
  436. b"version https://git-lfs.github.com/spec/v1\noid sha256:invalid\n"
  437. )
  438. # Smudge should return the data as-is
  439. result = self.filter_driver.smudge(corrupt_data)
  440. self.assertEqual(result, corrupt_data)
  441. def test_clean_unicode_content(self) -> None:
  442. """Test clean filter with unicode content."""
  443. # UTF-8 encoded unicode content
  444. content = "Hello 世界 🌍".encode()
  445. result = self.filter_driver.clean(content)
  446. # Result should be an LFS pointer
  447. pointer = LFSPointer.from_bytes(result)
  448. self.assertIsNotNone(pointer)
  449. # Content should be preserved exactly
  450. with self.lfs_store.open_object(pointer.oid) as f:
  451. self.assertEqual(f.read(), content)
  452. class LFSStoreEdgeCaseTests(TestCase):
  453. """Edge case tests for LFS store."""
  454. def setUp(self) -> None:
  455. super().setUp()
  456. self.test_dir = tempfile.mkdtemp()
  457. self.addCleanup(shutil.rmtree, self.test_dir)
  458. self.lfs = LFSStore.create(self.test_dir)
  459. def test_concurrent_writes(self) -> None:
  460. """Test that concurrent writes to same content work correctly."""
  461. content = b"duplicate content"
  462. # Write the same content multiple times
  463. sha1 = self.lfs.write_object([content])
  464. sha2 = self.lfs.write_object([content])
  465. # Should get the same SHA
  466. self.assertEqual(sha1, sha2)
  467. # Content should be stored only once
  468. with self.lfs.open_object(sha1) as f:
  469. self.assertEqual(f.read(), content)
  470. def test_write_with_generator(self) -> None:
  471. """Test writing object with generator chunks."""
  472. def chunk_generator():
  473. yield b"chunk1"
  474. yield b"chunk2"
  475. yield b"chunk3"
  476. sha = self.lfs.write_object(chunk_generator())
  477. # Verify content
  478. with self.lfs.open_object(sha) as f:
  479. self.assertEqual(f.read(), b"chunk1chunk2chunk3")
  480. def test_partial_write_rollback(self) -> None:
  481. """Test that partial writes don't leave artifacts."""
  482. import os
  483. # Count initial objects
  484. objects_dir = os.path.join(self.test_dir, "objects")
  485. initial_count = sum(len(files) for _, _, files in os.walk(objects_dir))
  486. # Try to write with a failing generator
  487. def failing_generator():
  488. yield b"chunk1"
  489. raise RuntimeError("Simulated error")
  490. # This should fail
  491. with self.assertRaises(RuntimeError):
  492. self.lfs.write_object(failing_generator())
  493. # No new objects should have been created
  494. final_count = sum(len(files) for _, _, files in os.walk(objects_dir))
  495. self.assertEqual(initial_count, final_count)
  496. class LFSPointerEdgeCaseTests(TestCase):
  497. """Edge case tests for LFS pointer parsing."""
  498. def test_pointer_with_windows_line_endings(self) -> None:
  499. """Test parsing pointer with Windows line endings."""
  500. pointer_data = (
  501. b"version https://git-lfs.github.com/spec/v1\r\n"
  502. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\r\n"
  503. b"size 1234\r\n"
  504. )
  505. pointer = LFSPointer.from_bytes(pointer_data)
  506. self.assertIsNotNone(pointer)
  507. self.assertEqual(pointer.size, 1234)
  508. def test_pointer_with_extra_whitespace(self) -> None:
  509. """Test parsing pointer with extra whitespace."""
  510. pointer_data = (
  511. b"version https://git-lfs.github.com/spec/v1 \n"
  512. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  513. b"size 1234 \n"
  514. )
  515. pointer = LFSPointer.from_bytes(pointer_data)
  516. self.assertIsNotNone(pointer)
  517. self.assertEqual(pointer.size, 1234)
  518. def test_pointer_case_sensitivity(self) -> None:
  519. """Test that pointer parsing is case sensitive."""
  520. # Version line must be exact
  521. pointer_data = (
  522. b"Version https://git-lfs.github.com/spec/v1\n" # Capital V
  523. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  524. b"size 1234\n"
  525. )
  526. pointer = LFSPointer.from_bytes(pointer_data)
  527. self.assertIsNone(pointer) # Should fail due to case
  528. def test_pointer_oid_formats(self) -> None:
  529. """Test different OID formats."""
  530. # SHA256 is currently the only supported format
  531. # Test SHA1 format (should fail)
  532. pointer_data = (
  533. b"version https://git-lfs.github.com/spec/v1\n"
  534. b"oid sha1:356a192b7913b04c54574d18c28d46e6395428ab\n" # SHA1
  535. b"size 1234\n"
  536. )
  537. pointer = LFSPointer.from_bytes(pointer_data)
  538. # This might be accepted but marked as invalid OID
  539. if pointer:
  540. self.assertFalse(pointer.is_valid_oid())
  541. def test_pointer_size_limits(self) -> None:
  542. """Test size value limits."""
  543. # Test with very large size
  544. pointer_data = (
  545. b"version https://git-lfs.github.com/spec/v1\n"
  546. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  547. b"size 999999999999999999\n" # Very large number
  548. )
  549. pointer = LFSPointer.from_bytes(pointer_data)
  550. self.assertIsNotNone(pointer)
  551. self.assertEqual(pointer.size, 999999999999999999)
  552. # Test with negative size (should fail)
  553. pointer_data = (
  554. b"version https://git-lfs.github.com/spec/v1\n"
  555. b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
  556. b"size -1\n"
  557. )
  558. pointer = LFSPointer.from_bytes(pointer_data)
  559. self.assertIsNone(pointer) # Should fail with negative size
  560. class LFSServerTests(TestCase):
  561. """Tests for the LFS server implementation."""
  562. def setUp(self) -> None:
  563. super().setUp()
  564. import threading
  565. from dulwich.lfs_server import run_lfs_server
  566. # Create temporary directory for LFS storage
  567. self.test_dir = tempfile.mkdtemp()
  568. self.addCleanup(shutil.rmtree, self.test_dir)
  569. # Start LFS server
  570. self.server, self.server_url = run_lfs_server(port=0, lfs_dir=self.test_dir)
  571. self.server_thread = threading.Thread(target=self.server.serve_forever)
  572. self.server_thread.daemon = True
  573. self.server_thread.start()
  574. self.addCleanup(self.server.shutdown)
  575. def test_server_batch_endpoint(self) -> None:
  576. """Test the batch endpoint directly."""
  577. from urllib.request import Request, urlopen
  578. # Create batch request
  579. batch_data = {
  580. "operation": "download",
  581. "transfers": ["basic"],
  582. "objects": [{"oid": "abc123", "size": 100}],
  583. }
  584. req = Request(
  585. f"{self.server_url}/objects/batch",
  586. data=json.dumps(batch_data).encode("utf-8"),
  587. headers={
  588. "Content-Type": "application/vnd.git-lfs+json",
  589. "Accept": "application/vnd.git-lfs+json",
  590. },
  591. method="POST",
  592. )
  593. with urlopen(req) as response:
  594. result = json.loads(response.read())
  595. self.assertIn("objects", result)
  596. self.assertEqual(len(result["objects"]), 1)
  597. self.assertEqual(result["objects"][0]["oid"], "abc123")
  598. self.assertIn("error", result["objects"][0]) # Object doesn't exist
  599. def test_server_upload_download(self) -> None:
  600. """Test uploading and downloading an object."""
  601. import hashlib
  602. from urllib.request import Request, urlopen
  603. test_content = b"test server content"
  604. test_oid = hashlib.sha256(test_content).hexdigest()
  605. # Get upload URL via batch
  606. batch_data = {
  607. "operation": "upload",
  608. "transfers": ["basic"],
  609. "objects": [{"oid": test_oid, "size": len(test_content)}],
  610. }
  611. req = Request(
  612. f"{self.server_url}/objects/batch",
  613. data=json.dumps(batch_data).encode("utf-8"),
  614. headers={
  615. "Content-Type": "application/vnd.git-lfs+json",
  616. "Accept": "application/vnd.git-lfs+json",
  617. },
  618. method="POST",
  619. )
  620. with urlopen(req) as response:
  621. batch_result = json.loads(response.read())
  622. upload_url = batch_result["objects"][0]["actions"]["upload"]["href"]
  623. # Upload the object
  624. upload_req = Request(
  625. upload_url,
  626. data=test_content,
  627. headers={"Content-Type": "application/octet-stream"},
  628. method="PUT",
  629. )
  630. with urlopen(upload_req) as response:
  631. self.assertEqual(response.status, 200)
  632. # Download the object
  633. download_batch_data = {
  634. "operation": "download",
  635. "transfers": ["basic"],
  636. "objects": [{"oid": test_oid, "size": len(test_content)}],
  637. }
  638. req = Request(
  639. f"{self.server_url}/objects/batch",
  640. data=json.dumps(download_batch_data).encode("utf-8"),
  641. headers={
  642. "Content-Type": "application/vnd.git-lfs+json",
  643. "Accept": "application/vnd.git-lfs+json",
  644. },
  645. method="POST",
  646. )
  647. with urlopen(req) as response:
  648. download_batch_result = json.loads(response.read())
  649. download_url = download_batch_result["objects"][0]["actions"]["download"][
  650. "href"
  651. ]
  652. # Download the object
  653. download_req = Request(download_url)
  654. with urlopen(download_req) as response:
  655. downloaded_content = response.read()
  656. self.assertEqual(downloaded_content, test_content)
  def test_server_verify_endpoint(self) -> None:
      """Test the verify endpoint.

      Posts to ``/objects/<oid>/verify`` twice: once for an object that was
      written into the server's store beforehand (expects HTTP 200) and once
      for an all-zero OID that was never written (expects HTTP 404).
      """
      import hashlib
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      test_content = b"verify test"
      test_oid = hashlib.sha256(test_content).hexdigest()

      # Put the object into the server's store directly rather than over HTTP.
      self.server.lfs_store.write_object([test_content])

      # Test verify for existing object
      verify_req = Request(
          f"{self.server_url}/objects/{test_oid}/verify",
          data=json.dumps({"oid": test_oid, "size": len(test_content)}).encode(
              "utf-8"
          ),
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )
      with urlopen(verify_req) as response:
          self.assertEqual(response.status, 200)

      # Test verify for non-existent object
      fake_oid = "0" * 64
      verify_req = Request(
          f"{self.server_url}/objects/{fake_oid}/verify",
          data=json.dumps({"oid": fake_oid, "size": 100}).encode("utf-8"),
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )
      with self.assertRaises(HTTPError) as cm:
          with urlopen(verify_req):
              pass
      # HTTPError doubles as the response object; close it at teardown so the
      # underlying connection is not left open.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 404)
  def test_server_invalid_endpoints(self) -> None:
      """Test invalid endpoints return 404.

      Issues a GET and then a POST against ``/invalid`` and expects each to
      raise ``HTTPError`` with status code 404.
      """
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      # Test invalid GET endpoint
      with self.assertRaises(HTTPError) as get_cm:
          with urlopen(f"{self.server_url}/invalid"):
              pass
      # HTTPError wraps the open response; make sure it gets closed.
      self.addCleanup(get_cm.exception.close)
      self.assertEqual(get_cm.exception.code, 404)

      # Test invalid POST endpoint
      req = Request(f"{self.server_url}/invalid", data=b"test", method="POST")
      with self.assertRaises(HTTPError) as post_cm:
          with urlopen(req):
              pass
      self.addCleanup(post_cm.exception.close)
      self.assertEqual(post_cm.exception.code, 404)
  def test_server_batch_invalid_operation(self) -> None:
      """Test batch endpoint with invalid operation.

      Posts a batch request whose ``operation`` is ``"invalid"`` and expects
      the request to fail with HTTP 400.
      """
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      batch_data = {"operation": "invalid", "transfers": ["basic"], "objects": []}
      req = Request(
          f"{self.server_url}/objects/batch",
          data=json.dumps(batch_data).encode("utf-8"),
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )
      with self.assertRaises(HTTPError) as cm:
          with urlopen(req):
              pass
      # HTTPError wraps the open response; close it at teardown so the
      # connection is not leaked.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 400)
  def test_server_batch_missing_fields(self) -> None:
      """Test batch endpoint with missing required fields.

      Sends a download batch whose only object has a ``size`` but no ``oid``
      and checks that the response carries a per-object error mentioning the
      missing OID.
      """
      from urllib.request import Request, urlopen

      # The single requested object deliberately omits "oid".
      payload = json.dumps(
          {
              "operation": "download",
              "transfers": ["basic"],
              "objects": [{"size": 100}],
          }
      ).encode("utf-8")
      batch_req = Request(
          f"{self.server_url}/objects/batch",
          data=payload,
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )
      with urlopen(batch_req) as response:
          reply = json.loads(response.read())

      first_object = reply["objects"][0]
      self.assertIn("error", first_object)
      self.assertIn("Missing oid", first_object["error"]["message"])
  def test_server_upload_oid_mismatch(self) -> None:
      """Test upload with OID mismatch.

      PUTs content to ``/objects/wrongoid123`` and expects HTTP 400 with a
      response body that mentions "OID mismatch".
      """
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      # Upload with wrong OID
      upload_req = Request(
          f"{self.server_url}/objects/wrongoid123",
          data=b"test content",
          headers={"Content-Type": "application/octet-stream"},
          method="PUT",
      )
      with self.assertRaises(HTTPError) as cm:
          with urlopen(upload_req):
              pass
      # HTTPError wraps the open response; register the close before reading
      # so it is released even if an assertion below fails.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 400)
      self.assertIn("OID mismatch", cm.exception.read().decode())
  def test_server_download_non_existent(self) -> None:
      """Test downloading non-existent object.

      GETs ``/objects/<oid>`` for an all-zero OID and expects HTTP 404.
      """
      from urllib.error import HTTPError
      from urllib.request import urlopen

      fake_oid = "0" * 64
      with self.assertRaises(HTTPError) as cm:
          with urlopen(f"{self.server_url}/objects/{fake_oid}"):
              pass
      # HTTPError wraps the open response; close it at teardown so the
      # connection is not leaked.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 404)
  def test_server_invalid_json(self) -> None:
      """Test batch endpoint with invalid JSON.

      Posts the raw bytes ``b"not json"`` to the batch endpoint and expects
      HTTP 400.
      """
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      req = Request(
          f"{self.server_url}/objects/batch",
          data=b"not json",
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )
      with self.assertRaises(HTTPError) as cm:
          with urlopen(req):
              pass
      # HTTPError wraps the open response; close it at teardown so the
      # connection is not leaked.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 400)
  777. class LFSClientTests(TestCase):
  778. """Tests for LFS client network operations."""
  def setUp(self) -> None:
      """Start an LFS server on a background thread and create a client for it.

      Sets ``self.test_dir`` (temporary LFS storage directory),
      ``self.server`` / ``self.server_url`` (as returned by
      ``run_lfs_server``), ``self.server_thread`` and ``self.client``.
      All of them are torn down through registered cleanups.
      """
      super().setUp()
      import threading

      from dulwich.lfs import LFSClient
      from dulwich.lfs_server import run_lfs_server

      # Create temporary directory for LFS storage
      self.test_dir = tempfile.mkdtemp()
      self.addCleanup(shutil.rmtree, self.test_dir)

      # Start LFS server in a thread
      self.server, self.server_url = run_lfs_server(port=0, lfs_dir=self.test_dir)
      # Cleanups run in reverse registration order, so teardown is:
      # shutdown() -> thread join -> server_close() -> rmtree.
      # NOTE(review): assumes the returned server follows the socketserver
      # interface (it already exposes serve_forever/shutdown) and therefore
      # provides server_close() to release the listening socket.
      self.addCleanup(self.server.server_close)
      self.server_thread = threading.Thread(target=self.server.serve_forever)
      self.server_thread.daemon = True
      self.server_thread.start()
      self.addCleanup(self.server_thread.join)
      self.addCleanup(self.server.shutdown)

      # Create LFS client pointing to our test server
      self.client = LFSClient(self.server_url)
  def test_client_url_normalization(self) -> None:
      """Test that client URL is normalized correctly.

      The client's ``url`` attribute must have no trailing slash, whether or
      not the URL passed to the constructor had one.
      """
      from dulwich.lfs import LFSClient

      with_slash = LFSClient("https://example.com/repo.git/info/lfs/")
      without_slash = LFSClient("https://example.com/repo.git/info/lfs")

      # Trailing slash is stripped
      self.assertEqual(with_slash.url, "https://example.com/repo.git/info/lfs")
      # URL without trailing slash is kept as-is
      self.assertEqual(without_slash.url, "https://example.com/repo.git/info/lfs")
  def test_batch_request_format(self) -> None:
      """Test batch request formatting.

      Writes one object into the server's store, asks the client for a
      download batch covering it, and checks that the response holds exactly
      that object with a ``download`` action.
      """
      # Create an object in the store
      payload = b"test content for batch"
      oid = self.server.lfs_store.write_object([payload])

      # Request download batch
      wanted = [{"oid": oid, "size": len(payload)}]
      response = self.client.batch("download", wanted)

      self.assertIsNotNone(response.objects)
      self.assertEqual(len(response.objects), 1)

      entry = response.objects[0]
      self.assertEqual(entry.oid, oid)
      self.assertIsNotNone(entry.actions)
      self.assertIn("download", entry.actions)
  def test_download_with_verification(self) -> None:
      """Test download with size and hash verification.

      Downloads a stored object with the correct size and expects the
      original bytes back; then repeats with a wrong size and expects an
      ``LFSError`` whose message mentions "size".
      """
      import hashlib

      from dulwich.lfs import LFSError

      payload = b"test content for download"
      expected_oid = hashlib.sha256(payload).hexdigest()

      # Store the object and check the store reports the SHA-256 computed here
      stored_oid = self.server.lfs_store.write_object([payload])
      self.assertEqual(stored_oid, expected_oid)

      # Download the object
      fetched = self.client.download(expected_oid, len(payload))
      self.assertEqual(fetched, payload)

      # Test size mismatch
      wrong_size = 999
      with self.assertRaises(LFSError) as cm:
          self.client.download(expected_oid, wrong_size)
      self.assertIn("size", str(cm.exception))
  def test_upload_with_verify(self) -> None:
      """Test upload with verification step.

      Uploads content through the client and then reads it back from the
      server's store to confirm the stored bytes match.
      """
      import hashlib

      payload = b"upload test content"
      oid = hashlib.sha256(payload).hexdigest()

      # Upload the object
      self.client.upload(oid, len(payload), payload)

      # Verify it was stored
      with self.server.lfs_store.open_object(oid) as stored:
          on_server = stored.read()
      self.assertEqual(on_server, payload)
  def test_upload_already_exists(self) -> None:
      """Test upload when object already exists on server.

      Writes the object into the server's store first, uploads the same
      content through the client, and checks the store still holds it.
      """
      import hashlib

      payload = b"existing content"
      oid = hashlib.sha256(payload).hexdigest()
      size = len(payload)

      # Pre-store the object
      self.server.lfs_store.write_object([payload])

      # Upload again - should not raise an error
      self.client.upload(oid, size, payload)

      # Verify it's still there
      with self.server.lfs_store.open_object(oid) as stored:
          self.assertEqual(stored.read(), payload)
  def test_error_handling(self) -> None:
      """Test error handling for various scenarios.

      Covers two failure paths of the client: downloading an OID that was
      never stored (expects ``LFSError`` mentioning "Object not found") and
      uploading content under an OID that does not match it (expects
      ``HTTPError`` mentioning "OID mismatch").
      """
      from urllib.error import HTTPError

      from dulwich.lfs import LFSError

      # Test downloading non-existent object
      fake_oid = "0" * 64
      with self.assertRaises(LFSError) as download_cm:
          self.client.download(fake_oid, 100)
      self.assertIn("Object not found", str(download_cm.exception))

      # Test uploading with wrong OID
      with self.assertRaises(HTTPError) as upload_cm:
          self.client.upload("wrong_oid", 5, b"hello")
      # HTTPError wraps the open response; close it at teardown so the
      # connection is not leaked.
      self.addCleanup(upload_cm.exception.close)
      # Server should reject due to OID mismatch
      self.assertIn("OID mismatch", str(upload_cm.exception))