test_lfs.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118
  1. # test_lfs.py -- tests for LFS
  2. # Copyright (C) 2020 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for LFS support."""
  22. import json
  23. import os
  24. import shutil
  25. import tempfile
  26. from dulwich import porcelain
  27. from dulwich.lfs import LFSFilterDriver, LFSPointer, LFSStore
  28. from dulwich.repo import Repo
  29. from . import TestCase
  class LFSTests(TestCase):
      """Tests for writing objects to and reading them back from an ``LFSStore``."""

      def setUp(self) -> None:
          """Quiet the LFS logger and create a fresh store in a temp directory."""
          super().setUp()
          # Suppress LFS warnings during these tests
          import logging

          lfs_logger = logging.getLogger("dulwich.lfs")
          self._old_level = lfs_logger.level
          # Restore the level through a cleanup instead of tearDown(): unittest
          # skips tearDown() when setUp() raises, but registered cleanups still
          # run, so the changed level cannot leak into other tests.
          self.addCleanup(lfs_logger.setLevel, self._old_level)
          lfs_logger.setLevel(logging.ERROR)

          self.test_dir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.test_dir)
          self.lfs = LFSStore.create(self.test_dir)

      def test_create(self) -> None:
          """Chunks written as one object are read back concatenated."""
          sha = self.lfs.write_object([b"a", b"b"])
          with self.lfs.open_object(sha) as f:
              self.assertEqual(b"ab", f.read())

      def test_missing(self) -> None:
          """Opening an object that was never written raises ``KeyError``."""
          with self.assertRaises(KeyError):
              self.lfs.open_object("abcdeabcdeabcdeabcde")

      def test_write_object_empty(self) -> None:
          """Test writing an empty object."""
          sha = self.lfs.write_object([])
          with self.lfs.open_object(sha) as f:
              self.assertEqual(b"", f.read())

      def test_write_object_multiple_chunks(self) -> None:
          """Test writing an object with multiple chunks."""
          chunks = [b"chunk1", b"chunk2", b"chunk3"]
          sha = self.lfs.write_object(chunks)
          with self.lfs.open_object(sha) as f:
              self.assertEqual(b"".join(chunks), f.read())

      def test_sha_path_calculation(self) -> None:
          """Test the internal sha path calculation."""
          # The implementation splits the sha into parts for directory structure.
          # Write an object and verify we can read it back.
          sha = self.lfs.write_object([b"test data"])
          self.assertEqual(len(sha), 64)  # SHA-256 is 64 hex chars
          # Open should succeed, which verifies the path calculation works
          with self.lfs.open_object(sha) as f:
              self.assertEqual(b"test data", f.read())

      def test_create_lfs_dir(self) -> None:
          """Test creating an LFS directory when it doesn't exist."""
          # Parent directory exists; the "lfs" child inside it does not yet.
          lfs_parent_dir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, lfs_parent_dir)
          lfs_dir = os.path.join(lfs_parent_dir, "lfs")

          # Create the LFS store
          LFSStore.create(lfs_dir)

          # Verify the store directory and its sub-directories were created
          self.assertTrue(os.path.isdir(lfs_dir))
          self.assertTrue(os.path.isdir(os.path.join(lfs_dir, "tmp")))
          self.assertTrue(os.path.isdir(os.path.join(lfs_dir, "objects")))
  class LFSPointerTests(TestCase):
      """Parsing and serialisation tests for ``LFSPointer``."""

      def test_from_bytes_valid(self) -> None:
          """A well-formed pointer yields the expected oid and size."""
          raw = (
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size 0\n"
          )
          parsed = LFSPointer.from_bytes(raw)
          self.assertIsNotNone(parsed)
          expected_oid = (
              "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
          )
          self.assertEqual(parsed.oid, expected_oid)
          self.assertEqual(parsed.size, 0)

      def test_from_bytes_with_extra_fields(self) -> None:
          """Unknown trailing fields do not prevent parsing."""
          raw = (
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size 1234\n"
              b"x-custom-field value\n"
          )
          parsed = LFSPointer.from_bytes(raw)
          self.assertIsNotNone(parsed)
          self.assertEqual(parsed.size, 1234)

      def test_from_bytes_invalid_version(self) -> None:
          """A pointer whose version URL is not the LFS spec URL is rejected."""
          raw = (
              b"version https://invalid.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size 0\n"
          )
          self.assertIsNone(LFSPointer.from_bytes(raw))

      def test_from_bytes_missing_oid(self) -> None:
          """A pointer without an ``oid`` line is rejected."""
          raw = b"version https://git-lfs.github.com/spec/v1\nsize 0\n"
          self.assertIsNone(LFSPointer.from_bytes(raw))

      def test_from_bytes_missing_size(self) -> None:
          """A pointer without a ``size`` line is rejected."""
          raw = (
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
          )
          self.assertIsNone(LFSPointer.from_bytes(raw))

      def test_from_bytes_invalid_size(self) -> None:
          """A non-numeric ``size`` value is rejected."""
          raw = (
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size not_a_number\n"
          )
          self.assertIsNone(LFSPointer.from_bytes(raw))

      def test_from_bytes_binary_data(self) -> None:
          """Arbitrary binary data is not mistaken for a pointer."""
          self.assertIsNone(LFSPointer.from_bytes(b"\x00\x01\x02\x03\x04"))

      def test_to_bytes(self) -> None:
          """Serialising a pointer produces the three canonical lines."""
          oid = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
          serialised = LFSPointer(oid, 1234).to_bytes()
          self.assertEqual(
              serialised,
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size 1234\n",
          )

      def test_round_trip(self) -> None:
          """Serialising and re-parsing a pointer preserves oid and size."""
          oid = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
          before = LFSPointer(oid, 9876)
          after = LFSPointer.from_bytes(before.to_bytes())
          self.assertIsNotNone(after)
          self.assertEqual(after.oid, before.oid)
          self.assertEqual(after.size, before.size)

      def test_is_valid_oid(self) -> None:
          """``is_valid_oid`` accepts a full hex SHA-256 and rejects malformed oids."""
          # Full-length hexadecimal SHA-256
          well_formed = LFSPointer(
              "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0
          )
          self.assertTrue(well_formed.is_valid_oid())

          # Truncated oid
          truncated = LFSPointer("e3b0c44298fc1c14", 0)
          self.assertFalse(truncated.is_valid_oid())

          # Right length, but begins with a non-hex character ("g")
          non_hex = LFSPointer(
              "g3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 0
          )
          self.assertFalse(non_hex.is_valid_oid())
  class LFSIntegrationTests(TestCase):
      """Integration tests for LFS with Git operations."""

      def setUp(self) -> None:
          """Create a scratch repository with an LFS store under ``.git/lfs``."""
          super().setUp()
          # Suppress LFS warnings during these integration tests
          import logging

          lfs_logger = logging.getLogger("dulwich.lfs")
          self._old_level = lfs_logger.level
          # Restore through a cleanup: unlike tearDown(), cleanups still run
          # when a later step of setUp() raises.
          self.addCleanup(lfs_logger.setLevel, self._old_level)
          lfs_logger.setLevel(logging.ERROR)

          # Create temporary directory for test repo
          self.test_dir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.test_dir)

          # Initialize repo. Cleanups run in reverse registration order, so the
          # repository is closed before its directory is removed.
          self.repo = Repo.init(self.test_dir)
          self.addCleanup(self.repo.close)
          self.lfs_dir = os.path.join(self.test_dir, ".git", "lfs")
          self.lfs_store = LFSStore.create(self.lfs_dir)

      def test_lfs_with_gitattributes(self) -> None:
          """Test LFS integration with .gitattributes."""
          # Create .gitattributes file
          gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
          with open(gitattributes_path, "wb") as f:
              f.write(b"*.bin filter=lfs diff=lfs merge=lfs -text\n")

          # Create a binary file
          bin_path = os.path.join(self.test_dir, "large.bin")
          large_content = b"Large binary content" * 1000
          with open(bin_path, "wb") as f:
              f.write(large_content)

          # Add files to repo
          self.repo.get_worktree().stage([".gitattributes", "large.bin"])

          # Get the blob for large.bin from the index
          index = self.repo.open_index()
          entry = index[b"large.bin"]
          blob = self.repo.object_store[entry.sha]

          # With LFS configured, the blob should contain an LFS pointer
          # (Note: This would require actual LFS filter integration in dulwich)
          # For now, we just verify the structure
          self.assertIsNotNone(blob)

      def test_lfs_checkout_missing_object(self) -> None:
          """Test checkout behavior when LFS object is missing."""
          from dulwich.objects import Blob, Commit, Tree

          # Create an LFS pointer blob whose object is not in any LFS store
          pointer = LFSPointer(
              "0000000000000000000000000000000000000000000000000000000000000000", 1234
          )
          blob = Blob()
          blob.data = pointer.to_bytes()
          self.repo.object_store.add_object(blob)

          # Create tree with the blob
          tree = Tree()
          tree.add(b"missing.bin", 0o100644, blob.id)
          self.repo.object_store.add_object(tree)

          # Create commit
          commit = Commit()
          commit.tree = tree.id
          commit.message = b"Add missing LFS file"
          commit.author = commit.committer = b"Test User <test@example.com>"
          commit.commit_time = commit.author_time = 1234567890
          commit.commit_timezone = commit.author_timezone = 0
          self.repo.object_store.add_object(commit)

          # Update HEAD
          self.repo.refs[b"HEAD"] = commit.id

          # Checkout should leave pointer file when object is missing
          # (actual checkout would require more integration). Until then, at
          # least verify that the pointer blob and commit were recorded, so the
          # test cannot pass vacuously.
          self.assertEqual(self.repo.object_store[blob.id].data, pointer.to_bytes())
          self.assertEqual(self.repo.refs[b"HEAD"], commit.id)

      def test_lfs_pointer_detection(self) -> None:
          """Test detection of LFS pointer files."""
          # (content, whether it should parse as an LFS pointer)
          test_cases = [
              # Valid LFS pointer
              (
                  b"version https://git-lfs.github.com/spec/v1\n"
                  b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
                  b"size 1234\n",
                  True,
              ),
              # Regular text file
              (b"This is a regular text file\n", False),
              # Binary file
              (b"\x00\x01\x02\x03\x04", False),
              # File that starts like pointer but isn't
              (b"version 1.0\nThis is not an LFS pointer\n", False),
          ]
          for content, expected_is_pointer in test_cases:
              pointer = LFSPointer.from_bytes(content)
              self.assertEqual(
                  pointer is not None,
                  expected_is_pointer,
                  f"Failed for content: {content!r}",
              )

      def test_builtin_lfs_clone_no_config(self) -> None:
          """Test cloning with LFS when no git-lfs commands are configured."""
          # Create source repository
          source_dir = os.path.join(self.test_dir, "source")
          os.makedirs(source_dir)
          source_repo = Repo.init(source_dir)
          # try/finally so the source repository is closed even if a step
          # below fails.
          try:
              # Create empty config (no LFS commands)
              config = source_repo.get_config()
              config.write_to_path()

              # Create .gitattributes with LFS filter
              gitattributes_path = os.path.join(source_dir, ".gitattributes")
              with open(gitattributes_path, "wb") as f:
                  f.write(b"*.bin filter=lfs\n")

              # Create test content and store in LFS
              test_content = b"Test binary content"
              test_oid = LFSStore.from_repo(source_repo, create=True).write_object(
                  [test_content]
              )

              # Create LFS pointer file
              pointer = LFSPointer(test_oid, len(test_content))
              pointer_file = os.path.join(source_dir, "test.bin")
              with open(pointer_file, "wb") as f:
                  f.write(pointer.to_bytes())

              # Commit files
              porcelain.add(source_repo, paths=[".gitattributes", "test.bin"])
              porcelain.commit(source_repo, message=b"Add LFS tracked file")
          finally:
              source_repo.close()

          # Clone the repository; close the clone even when an assertion fails
          target_dir = os.path.join(self.test_dir, "target")
          target_repo = porcelain.clone(source_dir, target_dir)
          self.addCleanup(target_repo.close)

          # Verify no LFS commands in config
          target_config = target_repo.get_config_stack()
          with self.assertRaises(KeyError):
              target_config.get((b"filter", b"lfs"), b"smudge")

          # Check the cloned file
          cloned_file = os.path.join(target_dir, "test.bin")
          with open(cloned_file, "rb") as f:
              content = f.read()

          # Should still be a pointer (LFS object not in target's store)
          self.assertTrue(
              content.startswith(b"version https://git-lfs.github.com/spec/v1")
          )
          self.assertIn(test_oid.encode(), content)

      def test_builtin_lfs_with_local_objects(self) -> None:
          """Test built-in LFS filter when objects are available locally."""
          # No LFS config
          config = self.repo.get_config()
          config.write_to_path()

          # Create .gitattributes
          gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
          with open(gitattributes_path, "wb") as f:
              f.write(b"*.dat filter=lfs\n")

          # Create LFS store and add object
          test_content = b"Hello from LFS!"
          lfs_store = LFSStore.from_repo(self.repo, create=True)
          test_oid = lfs_store.write_object([test_content])

          # Create pointer file
          pointer = LFSPointer(test_oid, len(test_content))
          pointer_file = os.path.join(self.test_dir, "data.dat")
          with open(pointer_file, "wb") as f:
              f.write(pointer.to_bytes())

          # Commit
          porcelain.add(self.repo, paths=[".gitattributes", "data.dat"])
          porcelain.commit(self.repo, message=b"Add LFS file")

          # Reset index to trigger checkout with filter
          self.repo.get_worktree().reset_index()

          # Check file content
          with open(pointer_file, "rb") as f:
              content = f.read()

          # Built-in filter should have converted pointer to actual content
          self.assertEqual(content, test_content)

      def test_builtin_lfs_filter_used(self) -> None:
          """Verify that built-in LFS filter is used when no config exists."""
          # Get filter registry
          normalizer = self.repo.get_blob_normalizer()
          filter_registry = normalizer.filter_registry
          lfs_driver = filter_registry.get_driver("lfs")

          # Should be built-in LFS filter
          self.assertIsInstance(lfs_driver, LFSFilterDriver)
          self.assertEqual(type(lfs_driver).__module__, "dulwich.lfs")
  class LFSFilterDriverTests(TestCase):
      """Tests for the clean/smudge behaviour of ``LFSFilterDriver``."""

      def setUp(self) -> None:
          """Create a store in a temp directory and wrap it in a filter driver."""
          super().setUp()
          self.test_dir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.test_dir)
          self.lfs_store = LFSStore.create(self.test_dir)
          self.filter_driver = LFSFilterDriver(self.lfs_store)

      def test_clean_new_file(self) -> None:
          """Cleaning fresh content yields a pointer and stores the content."""
          payload = b"This is a test file content"
          cleaned = self.filter_driver.clean(payload)

          # The cleaned output must parse as an LFS pointer of matching size
          parsed = LFSPointer.from_bytes(cleaned)
          self.assertIsNotNone(parsed)
          self.assertEqual(parsed.size, len(payload))

          # The payload itself must now be retrievable from the store
          with self.lfs_store.open_object(parsed.oid) as stored:
              self.assertEqual(stored.read(), payload)

      def test_clean_existing_pointer(self) -> None:
          """Cleaning data that is already a pointer returns it unchanged."""
          existing = LFSPointer(
              "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 1234
          ).to_bytes()

          self.assertEqual(self.filter_driver.clean(existing), existing)

      def test_smudge_valid_pointer(self) -> None:
          """Smudging a pointer to a stored object returns the object's content."""
          payload = b"This is the actual file content"
          oid = self.lfs_store.write_object([payload])

          # Build the pointer that refers to the stored payload
          pointer_bytes = LFSPointer(oid, len(payload)).to_bytes()

          smudged = self.filter_driver.smudge(pointer_bytes)
          self.assertEqual(smudged, payload)

      def test_smudge_missing_object(self) -> None:
          """Smudging a pointer to an absent object returns the pointer as-is."""
          dangling = LFSPointer(
              "0000000000000000000000000000000000000000000000000000000000000000", 1234
          ).to_bytes()

          smudged = self.filter_driver.smudge(dangling)
          self.assertEqual(smudged, dangling)

      def test_smudge_non_pointer(self) -> None:
          """Smudging ordinary content returns it unchanged."""
          plain = b"This is not an LFS pointer"
          self.assertEqual(self.filter_driver.smudge(plain), plain)

      def test_round_trip(self) -> None:
          """Clean followed by smudge gives back the original content."""
          source = b"Round trip test content"

          # Working tree -> repository
          as_pointer = self.filter_driver.clean(source)
          self.assertIsNotNone(LFSPointer.from_bytes(as_pointer))

          # Repository -> working tree
          recovered = self.filter_driver.smudge(as_pointer)
          self.assertEqual(recovered, source)

      def test_clean_empty_file(self) -> None:
          """Cleaning empty content yields a zero-size pointer and stores it."""
          payload = b""
          cleaned = self.filter_driver.clean(payload)

          parsed = LFSPointer.from_bytes(cleaned)
          self.assertIsNotNone(parsed)
          self.assertEqual(parsed.size, 0)

          # Even the empty payload must be retrievable from the store
          with self.lfs_store.open_object(parsed.oid) as stored:
              self.assertEqual(stored.read(), payload)

      def test_clean_large_file(self) -> None:
          """Cleaning a 1 MiB payload yields a pointer and stores the payload."""
          payload = b"x" * (1024 * 1024)
          cleaned = self.filter_driver.clean(payload)

          parsed = LFSPointer.from_bytes(cleaned)
          self.assertIsNotNone(parsed)
          self.assertEqual(parsed.size, len(payload))

          with self.lfs_store.open_object(parsed.oid) as stored:
              self.assertEqual(stored.read(), payload)

      def test_smudge_corrupt_pointer(self) -> None:
          """Smudging a malformed pointer returns the data untouched."""
          # Has a version line and an oid line with a bogus hash, but no size
          malformed = (
              b"version https://git-lfs.github.com/spec/v1\noid sha256:invalid\n"
          )

          self.assertEqual(self.filter_driver.smudge(malformed), malformed)

      def test_clean_unicode_content(self) -> None:
          """Cleaning UTF-8 encoded text stores the exact bytes."""
          payload = "Hello 世界 🌍".encode()
          cleaned = self.filter_driver.clean(payload)

          parsed = LFSPointer.from_bytes(cleaned)
          self.assertIsNotNone(parsed)

          # The stored bytes must match the input exactly
          with self.lfs_store.open_object(parsed.oid) as stored:
              self.assertEqual(stored.read(), payload)
  class LFSStoreEdgeCaseTests(TestCase):
      """Edge case tests for LFS store."""

      def setUp(self) -> None:
          """Create a fresh store in a temp directory."""
          super().setUp()
          self.test_dir = tempfile.mkdtemp()
          self.addCleanup(shutil.rmtree, self.test_dir)
          self.lfs = LFSStore.create(self.test_dir)

      def _count_stored_objects(self) -> int:
          """Return the number of files below the store's ``objects`` directory."""
          objects_dir = os.path.join(self.test_dir, "objects")
          return sum(len(files) for _, _, files in os.walk(objects_dir))

      def test_concurrent_writes(self) -> None:
          """Test that repeated writes of the same content work correctly."""
          content = b"duplicate content"

          # Write the same content multiple times (sequentially)
          sha1 = self.lfs.write_object([content])
          sha2 = self.lfs.write_object([content])

          # Should get the same SHA
          self.assertEqual(sha1, sha2)

          # Content should be stored only once: a single object file on disk
          # NOTE(review): assumes one file per object under objects/ - confirm
          self.assertEqual(1, self._count_stored_objects())
          with self.lfs.open_object(sha1) as f:
              self.assertEqual(f.read(), content)

      def test_write_with_generator(self) -> None:
          """Test writing object with generator chunks."""

          def chunk_generator():
              yield b"chunk1"
              yield b"chunk2"
              yield b"chunk3"

          sha = self.lfs.write_object(chunk_generator())

          # Verify content
          with self.lfs.open_object(sha) as f:
              self.assertEqual(f.read(), b"chunk1chunk2chunk3")

      def test_partial_write_rollback(self) -> None:
          """Test that partial writes don't leave artifacts."""
          # Count initial objects
          initial_count = self._count_stored_objects()

          # A generator that fails after yielding its first chunk
          def failing_generator():
              yield b"chunk1"
              raise RuntimeError("Simulated error")

          # The error must propagate out of write_object()
          with self.assertRaises(RuntimeError):
              self.lfs.write_object(failing_generator())

          # No new objects should have been created
          final_count = self._count_stored_objects()
          self.assertEqual(initial_count, final_count)
  class LFSPointerEdgeCaseTests(TestCase):
      """Unusual-input tests for ``LFSPointer.from_bytes``."""

      def test_pointer_with_windows_line_endings(self) -> None:
          """CRLF-terminated pointer lines are still parsed."""
          raw = (
              b"version https://git-lfs.github.com/spec/v1\r\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\r\n"
              b"size 1234\r\n"
          )
          parsed = LFSPointer.from_bytes(raw)
          self.assertIsNotNone(parsed)
          self.assertEqual(parsed.size, 1234)

      def test_pointer_with_extra_whitespace(self) -> None:
          """Trailing spaces on the version and size lines are tolerated."""
          raw = (
              b"version https://git-lfs.github.com/spec/v1 \n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size 1234 \n"
          )
          parsed = LFSPointer.from_bytes(raw)
          self.assertIsNotNone(parsed)
          self.assertEqual(parsed.size, 1234)

      def test_pointer_case_sensitivity(self) -> None:
          """A capitalised ``Version`` key is not accepted."""
          # The key on the first line starts with a capital V
          raw = (
              b"Version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size 1234\n"
          )
          # Parsing must fail because of the key's case
          self.assertIsNone(LFSPointer.from_bytes(raw))

      def test_pointer_oid_formats(self) -> None:
          """A ``sha1:`` oid is either rejected or flagged as an invalid oid."""
          # SHA256 is currently the only supported format; try a SHA1 oid
          raw = (
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha1:356a192b7913b04c54574d18c28d46e6395428ab\n"
              b"size 1234\n"
          )
          parsed = LFSPointer.from_bytes(raw)
          # The parser may accept it, but then the oid must not validate
          if parsed:
              self.assertFalse(parsed.is_valid_oid())

      def test_pointer_size_limits(self) -> None:
          """Very large sizes parse; negative sizes are rejected."""
          # An 18-digit size value
          huge = (
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size 999999999999999999\n"
          )
          parsed = LFSPointer.from_bytes(huge)
          self.assertIsNotNone(parsed)
          self.assertEqual(parsed.size, 999999999999999999)

          # A negative size must make parsing fail
          negative = (
              b"version https://git-lfs.github.com/spec/v1\n"
              b"oid sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855\n"
              b"size -1\n"
          )
          parsed = LFSPointer.from_bytes(negative)
          self.assertIsNone(parsed)
  577. class LFSServerTests(TestCase):
  578. """Tests for the LFS server implementation."""
  def setUp(self) -> None:
      """Start an LFS server on a background thread, backed by a temp directory."""
      super().setUp()
      import threading

      from dulwich.lfs_server import run_lfs_server

      # Temporary directory that holds the server's LFS storage
      self.test_dir = tempfile.mkdtemp()
      self.addCleanup(shutil.rmtree, self.test_dir)

      # Launch the server; serve_forever() runs on a daemon thread
      self.server, self.server_url = run_lfs_server(port=0, lfs_dir=self.test_dir)
      self.server_thread = threading.Thread(
          target=self.server.serve_forever, daemon=True
      )
      self.server_thread.start()

      def stop_server():
          # Stop the serve loop, release the socket, then wait for the thread
          self.server.shutdown()
          self.server.server_close()
          self.server_thread.join(timeout=1.0)

      self.addCleanup(stop_server)
  def test_server_batch_endpoint(self) -> None:
      """POST a download batch for an unknown oid and expect a per-object error."""
      from urllib.request import Request, urlopen

      # Batch request body asking to download a single object
      payload = json.dumps(
          {
              "operation": "download",
              "transfers": ["basic"],
              "objects": [{"oid": "abc123", "size": 100}],
          }
      ).encode("utf-8")
      lfs_headers = {
          "Content-Type": "application/vnd.git-lfs+json",
          "Accept": "application/vnd.git-lfs+json",
      }
      batch_request = Request(
          f"{self.server_url}/objects/batch",
          data=payload,
          headers=lfs_headers,
          method="POST",
      )

      with urlopen(batch_request) as response:
          reply = json.loads(response.read())

      self.assertIn("objects", reply)
      self.assertEqual(len(reply["objects"]), 1)
      only_object = reply["objects"][0]
      self.assertEqual(only_object["oid"], "abc123")
      # The object was never uploaded, so the entry carries an error
      self.assertIn("error", only_object)
  def test_server_upload_download(self) -> None:
      """Upload an object via the batch API, then download it and compare."""
      import hashlib
      from urllib.request import Request, urlopen

      test_content = b"test server content"
      test_oid = hashlib.sha256(test_content).hexdigest()

      def post_batch(operation):
          # POST a one-object batch request and return the first object entry
          body = {
              "operation": operation,
              "transfers": ["basic"],
              "objects": [{"oid": test_oid, "size": len(test_content)}],
          }
          batch_request = Request(
              f"{self.server_url}/objects/batch",
              data=json.dumps(body).encode("utf-8"),
              headers={
                  "Content-Type": "application/vnd.git-lfs+json",
                  "Accept": "application/vnd.git-lfs+json",
              },
              method="POST",
          )
          with urlopen(batch_request) as response:
              reply = json.loads(response.read())
          return reply["objects"][0]

      # Ask the batch endpoint where to upload
      upload_url = post_batch("upload")["actions"]["upload"]["href"]

      # PUT the object's bytes to that URL
      upload_req = Request(
          upload_url,
          data=test_content,
          headers={"Content-Type": "application/octet-stream"},
          method="PUT",
      )
      with urlopen(upload_req) as response:
          self.assertEqual(response.status, 200)

      # Ask the batch endpoint where to download from
      download_url = post_batch("download")["actions"]["download"]["href"]

      # GET the object and compare it with what was uploaded
      with urlopen(Request(download_url)) as response:
          downloaded_content = response.read()
      self.assertEqual(downloaded_content, test_content)
  def test_server_verify_endpoint(self) -> None:
      """Test the verify endpoint for a stored and for a missing object.

      The object is written straight into the server's store; a POST to
      ``/objects/<oid>/verify`` must then answer 200 for it and fail with
      HTTP 404 for an OID that was never stored.
      """
      import hashlib
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      test_content = b"verify test"
      test_oid = hashlib.sha256(test_content).hexdigest()

      # Put the object into the store directly (no HTTP upload involved)
      self.server.lfs_store.write_object([test_content])

      # Test verify for existing object
      verify_req = Request(
          f"{self.server_url}/objects/{test_oid}/verify",
          data=json.dumps({"oid": test_oid, "size": len(test_content)}).encode(
              "utf-8"
          ),
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )
      with urlopen(verify_req) as response:
          self.assertEqual(response.status, 200)

      # Test verify for non-existent object
      fake_oid = "0" * 64
      verify_req = Request(
          f"{self.server_url}/objects/{fake_oid}/verify",
          data=json.dumps({"oid": fake_oid, "size": 100}).encode("utf-8"),
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )
      with self.assertRaises(HTTPError) as cm:
          with urlopen(verify_req):
              pass
      # HTTPError doubles as the HTTP response object; close it during
      # cleanup so the underlying connection is not left open.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 404)
  def test_server_invalid_endpoints(self) -> None:
      """Test that unknown paths return 404 for both GET and POST."""
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      # Test invalid GET endpoint
      with self.assertRaises(HTTPError) as cm:
          with urlopen(f"{self.server_url}/invalid"):
              pass
      # HTTPError doubles as the HTTP response object; close it during
      # cleanup so the underlying connection is not left open.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 404)

      # Test invalid POST endpoint
      req = Request(f"{self.server_url}/invalid", data=b"test", method="POST")
      with self.assertRaises(HTTPError) as cm:
          with urlopen(req):
              pass
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 404)
  def test_server_batch_invalid_operation(self) -> None:
      """Test that the batch endpoint rejects an unknown operation with 400."""
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      batch_data = {"operation": "invalid", "transfers": ["basic"], "objects": []}

      req = Request(
          f"{self.server_url}/objects/batch",
          data=json.dumps(batch_data).encode("utf-8"),
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )

      with self.assertRaises(HTTPError) as cm:
          with urlopen(req):
              pass
      # HTTPError doubles as the HTTP response object; close it during
      # cleanup so the underlying connection is not left open.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 400)
  def test_server_batch_missing_fields(self) -> None:
      """Check the batch reply when a requested object carries no ``oid``.

      The test expects the request itself to succeed and the problem to be
      reported as an ``error`` entry on the object, with a message that
      mentions the missing oid.
      """
      from urllib.request import Request, urlopen

      incomplete_object = {"size": 100}  # deliberately has no "oid" key
      body = json.dumps(
          {
              "operation": "download",
              "transfers": ["basic"],
              "objects": [incomplete_object],
          }
      ).encode("utf-8")
      request = Request(
          f"{self.server_url}/objects/batch",
          data=body,
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )

      with urlopen(request) as response:
          reply = json.loads(response.read())

      first = reply["objects"][0]
      self.assertIn("error", first)
      self.assertIn("Missing oid", first["error"]["message"])
  def test_server_upload_oid_mismatch(self) -> None:
      """Test that a PUT whose URL OID does not match the content gets 400.

      The error response body is also expected to mention "OID mismatch".
      """
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      # Upload with wrong OID
      upload_req = Request(
          f"{self.server_url}/objects/wrongoid123",
          data=b"test content",
          headers={"Content-Type": "application/octet-stream"},
          method="PUT",
      )

      with self.assertRaises(HTTPError) as cm:
          with urlopen(upload_req):
              pass
      # HTTPError doubles as the HTTP response object. Register the close
      # now so it still happens if an assertion below fails; the body is
      # read before cleanup runs.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 400)
      self.assertIn("OID mismatch", cm.exception.read().decode())
  def test_server_download_non_existent(self) -> None:
      """Test that a GET for an object that was never stored returns 404."""
      from urllib.error import HTTPError
      from urllib.request import urlopen

      fake_oid = "0" * 64

      with self.assertRaises(HTTPError) as cm:
          with urlopen(f"{self.server_url}/objects/{fake_oid}"):
              pass
      # HTTPError doubles as the HTTP response object; close it during
      # cleanup so the underlying connection is not left open.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 404)
  def test_server_invalid_json(self) -> None:
      """Test that the batch endpoint answers 400 to a non-JSON body."""
      from urllib.error import HTTPError
      from urllib.request import Request, urlopen

      req = Request(
          f"{self.server_url}/objects/batch",
          data=b"not json",
          headers={"Content-Type": "application/vnd.git-lfs+json"},
          method="POST",
      )

      with self.assertRaises(HTTPError) as cm:
          with urlopen(req):
              pass
      # HTTPError doubles as the HTTP response object; close it during
      # cleanup so the underlying connection is not left open.
      self.addCleanup(cm.exception.close)
      self.assertEqual(cm.exception.code, 400)
  798. class LFSClientTests(TestCase):
  799. """Tests for LFS client network operations."""
  def setUp(self) -> None:
      """Start a throwaway LFS server and create a client bound to it.

      The server stores objects in a fresh temporary directory and runs
      ``serve_forever`` on a daemon thread. Cleanups are registered so the
      server is stopped and the directory removed after each test.
      """
      super().setUp()
      import threading

      from dulwich.lfs import LFSClient
      from dulwich.lfs_server import run_lfs_server

      # Scratch directory backing the server's LFS store
      self.test_dir = tempfile.mkdtemp()
      self.addCleanup(shutil.rmtree, self.test_dir)

      # Serve requests from a background daemon thread
      self.server, self.server_url = run_lfs_server(port=0, lfs_dir=self.test_dir)
      self.server_thread = threading.Thread(
          target=self.server.serve_forever, daemon=True
      )
      self.server_thread.start()

      def _stop_server():
          # Stop the serve loop, release the socket, then wait briefly
          # for the thread to exit.
          self.server.shutdown()
          self.server.server_close()
          self.server_thread.join(timeout=1.0)

      # Cleanups run last-in-first-out, so the server is stopped before
      # its storage directory is removed.
      self.addCleanup(_stop_server)

      # Client under test, pointed at the server started above
      self.client = LFSClient(self.server_url)
  def test_client_url_normalization(self) -> None:
      """Check that ``LFSClient.url`` has no trailing slash either way."""
      from dulwich.lfs import LFSClient

      expected = "https://example.com/repo.git/info/lfs"
      # First with a trailing slash, then without one
      for given in (
          "https://example.com/repo.git/info/lfs/",
          "https://example.com/repo.git/info/lfs",
      ):
          client = LFSClient(given)
          self.assertEqual(client.url, expected)
  def test_batch_request_format(self) -> None:
      """Check the shape of a download batch reply for a stored object."""
      # Seed the server's store so the batch request refers to a real object
      payload = b"test content for batch"
      oid = self.server.lfs_store.write_object([payload])

      # Ask the server how to download it
      wanted = [{"oid": oid, "size": len(payload)}]
      result = self.client.batch("download", wanted)

      self.assertIsNotNone(result.objects)
      self.assertEqual(len(result.objects), 1)
      entry = result.objects[0]
      self.assertEqual(entry.oid, oid)
      self.assertIsNotNone(entry.actions)
      self.assertIn("download", entry.actions)
  def test_download_with_verification(self) -> None:
      """Check that download returns the content and rejects a wrong size."""
      import hashlib

      from dulwich.lfs import LFSError

      payload = b"test content for download"
      expected_oid = hashlib.sha256(payload).hexdigest()
      payload_size = len(payload)

      # Store the object; the store must report the same SHA-256 as hashlib
      stored_oid = self.server.lfs_store.write_object([payload])
      self.assertEqual(stored_oid, expected_oid)

      # A download with the correct size returns the stored bytes
      fetched = self.client.download(expected_oid, payload_size)
      self.assertEqual(fetched, payload)

      # A download with a wrong expected size must fail, mentioning "size"
      with self.assertRaises(LFSError) as cm:
          self.client.download(expected_oid, 999)
      self.assertIn("size", str(cm.exception))
  def test_upload_with_verify(self) -> None:
      """Upload through the client and read the object back from the store."""
      import hashlib

      payload = b"upload test content"
      oid = hashlib.sha256(payload).hexdigest()
      size = len(payload)

      # Push the object through the client
      self.client.upload(oid, size, payload)

      # The server-side store must now hold exactly the uploaded bytes
      with self.server.lfs_store.open_object(oid) as stored:
          on_server = stored.read()
      self.assertEqual(on_server, payload)
  def test_upload_already_exists(self) -> None:
      """Check that uploading an object the server already has is harmless."""
      import hashlib

      payload = b"existing content"
      oid = hashlib.sha256(payload).hexdigest()

      # Seed the store before the client tries to upload
      self.server.lfs_store.write_object([payload])

      # The repeated upload must complete without raising
      self.client.upload(oid, len(payload), payload)

      # The stored object is still there and unchanged
      with self.server.lfs_store.open_object(oid) as stored:
          self.assertEqual(stored.read(), payload)
  def test_error_handling(self) -> None:
      """Check the errors raised for a missing object and a bad upload OID."""
      from urllib.error import HTTPError

      from dulwich.lfs import LFSError

      # Downloading an OID that was never stored raises LFSError
      absent_oid = "0000000000000000000000000000000000000000000000000000000000000000"
      with self.assertRaises(LFSError) as missing:
          self.client.download(absent_oid, 100)
      self.assertIn("Object not found", str(missing.exception))

      # Uploading content under an OID that is not its hash raises HTTPError
      with self.assertRaises(HTTPError) as rejected:
          self.client.upload("wrong_oid", 5, b"hello")
      # The rejection is expected to name the OID mismatch
      self.assertIn("OID mismatch", str(rejected.exception))