# test_pack.py -- Tests for the handling of git packs.
# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
# Copyright (C) 2008 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for Dulwich packs."""
import os
import shutil
import sys
import tempfile
import zlib
from hashlib import sha1
from io import BytesIO
from typing import NoReturn

from dulwich.errors import ApplyDeltaError, ChecksumMismatch
from dulwich.file import GitFile
from dulwich.object_format import DEFAULT_OBJECT_FORMAT
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob, Commit, Tree, hex_to_sha, sha_to_hex
from dulwich.pack import (
    OFS_DELTA,
    REF_DELTA,
    DeltaChainIterator,
    MemoryPackIndex,
    Pack,
    PackData,
    PackIndex3,
    PackStreamReader,
    UnpackedObject,
    UnresolvedDeltas,
    _create_delta_py,
    _delta_encode_size,
    _encode_copy_operation,
    apply_delta,
    compute_file_sha,
    create_delta,
    deltify_pack_objects,
    load_pack_index,
    read_zlib_chunks,
    unpack_object,
    write_pack,
    write_pack_header,
    write_pack_index_v1,
    write_pack_index_v2,
    write_pack_index_v3,
    write_pack_object,
)
from dulwich.tests.utils import (
    build_pack,
    ext_functest_builder,
    functest_builder,
    make_object,
)

try:
    from dulwich._pack import create_delta as _create_delta_rs
except ImportError:
    _create_delta_rs = None

from . import TestCase
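# Object and pack SHAs below come from the sample pack in testdata/packs; the
# expected mode of freshly written index files differs on Windows, where POSIX
# permission bits are not preserved by os.stat().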
pack1_sha = b"bc63ddad95e7321ee734ea11a7a62d314e0d7481"
a_sha = b"6f670c0fb53f9463760b7295fbb814e965fb20c8"
tree_sha = b"b2a2766a2879c209ab1176e7e778b81ae422eeaa"
commit_sha = b"f18faa16531ac570a3fdc8c7ca16682548dafd12"
indexmode = "0o100644" if sys.platform != "win32" else "0o100666"

class PackTests(TestCase):
    """Base class for testing packs."""

    def setUp(self) -> None:
        super().setUp()
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir)
        self.datadir = os.path.abspath(
            os.path.join(os.path.dirname(__file__), "../testdata/packs")
        )

    def get_pack_index(self, sha):
        """Returns a PackIndex from the datadir with the given sha."""
        return load_pack_index(
            os.path.join(self.datadir, "pack-{}.idx".format(sha.decode("ascii"))),
            DEFAULT_OBJECT_FORMAT,
        )

    def get_pack_data(self, sha):
        """Returns a PackData object from the datadir with the given sha."""
        return PackData(
            os.path.join(self.datadir, "pack-{}.pack".format(sha.decode("ascii"))),
            object_format=DEFAULT_OBJECT_FORMAT,
        )

    def get_pack(self, sha):
        return Pack(
            os.path.join(self.datadir, "pack-{}".format(sha.decode("ascii"))),
            object_format=DEFAULT_OBJECT_FORMAT,
        )

    def assertSucceeds(self, func, *args, **kwargs) -> None:
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)

class PackIndexTests(PackTests):
    """Class that tests the index of packfiles."""

    def test_object_offset(self) -> None:
        """Tests that the correct object offset is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        self.assertRaises(KeyError, p.object_offset, pack1_sha)
        self.assertEqual(p.object_offset(a_sha), 178)
        self.assertEqual(p.object_offset(tree_sha), 138)
        self.assertEqual(p.object_offset(commit_sha), 12)

    def test_object_sha1(self) -> None:
        """Tests that the correct object sha1 is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        self.assertRaises(KeyError, p.object_sha1, 876)
        self.assertEqual(p.object_sha1(178), hex_to_sha(a_sha))
        self.assertEqual(p.object_sha1(138), hex_to_sha(tree_sha))
        self.assertEqual(p.object_sha1(12), hex_to_sha(commit_sha))

    def test_iter_prefix(self) -> None:
        p = self.get_pack_index(pack1_sha)
        self.assertEqual([p.object_sha1(178)], list(p.iter_prefix(hex_to_sha(a_sha))))
        self.assertEqual(
            [p.object_sha1(178)], list(p.iter_prefix(hex_to_sha(a_sha)[:5]))
        )
        self.assertEqual(
            [p.object_sha1(178)], list(p.iter_prefix(hex_to_sha(a_sha)[:2]))
        )

    def test_index_len(self) -> None:
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(p))

    def test_get_stored_checksum(self) -> None:
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(
            b"f2848e2ad16f329ae1c92e3b95e91888daa5bd01",
            sha_to_hex(p.get_stored_checksum()),
        )
        self.assertEqual(
            b"721980e866af9a5f93ad674144e1459b8ba3e7b7",
            sha_to_hex(p.get_pack_checksum()),
        )

    def test_index_check(self) -> None:
        p = self.get_pack_index(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterentries(self) -> None:
        p = self.get_pack_index(pack1_sha)
        entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
        self.assertEqual(
            [
                (b"6f670c0fb53f9463760b7295fbb814e965fb20c8", 178, None),
                (b"b2a2766a2879c209ab1176e7e778b81ae422eeaa", 138, None),
                (b"f18faa16531ac570a3fdc8c7ca16682548dafd12", 12, None),
            ],
            entries,
        )

    def test_iter(self) -> None:
        p = self.get_pack_index(pack1_sha)
        self.assertEqual({tree_sha, commit_sha, a_sha}, set(p))
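# Delta roundtrip tests: create_delta() emits a git-format delta (base size,
# target size, then copy/insert operations) which apply_delta() uses to
# reconstruct the target from the base.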
class TestPackDeltas(TestCase):
    test_string1 = b"The answer was flailing in the wind"
    test_string2 = b"The answer was falling down the pipe"
    test_string3 = b"zzzzz"
    test_string_empty = b""
    test_string_big = b"Z" * 8192
    test_string_huge = b"Z" * 100000

    def _test_roundtrip(self, base, target) -> None:
        self.assertEqual(
            target, b"".join(apply_delta(base, list(create_delta(base, target))))
        )

    def test_nochange(self) -> None:
        self._test_roundtrip(self.test_string1, self.test_string1)

    def test_nochange_huge(self) -> None:
        self._test_roundtrip(self.test_string_huge, self.test_string_huge)

    def test_change(self) -> None:
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self) -> None:
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_empty_to_big(self) -> None:
        self._test_roundtrip(self.test_string_empty, self.test_string_big)

    def test_empty_to_huge(self) -> None:
        self._test_roundtrip(self.test_string_empty, self.test_string_huge)

    def test_huge_copy(self) -> None:
        self._test_roundtrip(
            self.test_string_huge + self.test_string1,
            self.test_string_huge + self.test_string2,
        )
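    # Deltas whose copy/insert operations would write past the declared target
    # size must be rejected with ApplyDeltaError rather than overflowing.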
    def test_dest_overflow(self) -> None:
        self.assertRaises(
            ApplyDeltaError,
            apply_delta,
            b"a" * 0x10000,
            b"\x80\x80\x04\x80\x80\x04\x80" + b"a" * 0x10000,
        )
        self.assertRaises(
            ApplyDeltaError, apply_delta, b"", b"\x00\x80\x02\xb0\x11\x11"
        )

    def test_apply_delta_invalid_opcode(self) -> None:
        """Test apply_delta with an invalid opcode."""
        # Create a delta with an invalid opcode (0xff is not valid)
        invalid_delta = [b"\xff\x01\x02"]
        base = b"test base"
        # Should raise ApplyDeltaError
        self.assertRaises(ApplyDeltaError, apply_delta, base, invalid_delta)

    def test_create_delta_insert_only(self) -> None:
        """Test create_delta when only insertions are required."""
        base = b""
        target = b"brand new content"
        delta = list(create_delta(base, target))
        # Apply the delta to verify it works correctly
        result = apply_delta(base, delta)
        self.assertEqual(target, b"".join(result))

    def test_create_delta_copy_only(self) -> None:
        """Test create_delta when only copy operations are required."""
        base = b"content to be copied"
        target = b"content to be copied"  # Identical to base
        delta = list(create_delta(base, target))
        # Apply the delta to verify
        result = apply_delta(base, delta)
        self.assertEqual(target, b"".join(result))

    def test_pypy_issue(self) -> None:
        # Test for https://github.com/jelmer/dulwich/issues/509 /
        # https://bitbucket.org/pypy/pypy/issues/2499/cpyext-pystring_asstring-doesnt-work
        chunks = [
            b"tree 03207ccf58880a748188836155ceed72f03d65d6\n"
            b"parent 408fbab530fd4abe49249a636a10f10f44d07a21\n"
            b"author Victor Stinner <victor.stinner@gmail.com> "
            b"1421355207 +0100\n"
            b"committer Victor Stinner <victor.stinner@gmail.com> "
            b"1421355207 +0100\n"
            b"\n"
            b"Backout changeset 3a06020af8cf\n"
            b"\nStreamWriter: close() now clears the reference to the "
            b"transport\n"
            b"\nStreamWriter now raises an exception if it is closed: "
            b"write(), writelines(),\n"
            b"write_eof(), can_write_eof(), get_extra_info(), drain().\n"
        ]
        delta = [
            b"\xcd\x03\xad\x03]tree ff3c181a393d5a7270cddc01ea863818a8621ca8\n"
            b"parent 20a103cc90135494162e819f98d0edfc1f1fba6b\x91]7\x0510738"
            b"\x91\x99@\x0b10738 +0100\x93\x04\x01\xc9"
        ]
        res = apply_delta(chunks, delta)
        expected = [
            b"tree ff3c181a393d5a7270cddc01ea863818a8621ca8\n"
            b"parent 20a103cc90135494162e819f98d0edfc1f1fba6b",
            b"\nauthor Victor Stinner <victor.stinner@gmail.com> 14213",
            b"10738",
            b" +0100\ncommitter Victor Stinner <victor.stinner@gmail.com> 14213",
            b"10738 +0100",
            b"\n\nStreamWriter: close() now clears the reference to the "
            b"transport\n\n"
            b"StreamWriter now raises an exception if it is closed: "
            b"write(), writelines(),\n"
            b"write_eof(), can_write_eof(), get_extra_info(), drain().\n",
        ]
        self.assertEqual(b"".join(expected), b"".join(res))

    def _do_test_create_delta_various_cases(self, create_delta_func):
        """Test create_delta with various input cases for both Python and Rust versions."""
        import types

        # Helper to normalize delta output (Rust returns bytes, Python returns Iterator[bytes])
        def get_delta(base, target):
            result = create_delta_func(base, target)
            # Check if it's a Rust extension (returns bytes directly)
            if isinstance(create_delta_func, types.BuiltinFunctionType):
                return result
            # Python version returns iterator
            return b"".join(result)

        # Test case 1: Identical content
        base = b"hello world"
        target = b"hello world"
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 2: Complete rewrite
        base = b"aaaaaaaaaa"
        target = b"bbbbbbbbbb"
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 3: Partial replacement
        base = b"The quick brown fox jumps over the lazy dog"
        target = b"The quick brown cat jumps over the lazy dog"
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 4: Insertion at end
        base = b"hello"
        target = b"hello world"
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 5: Deletion from end
        base = b"hello world"
        target = b"hello"
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 6: Empty base
        base = b""
        target = b"new content"
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 7: Empty target
        base = b"old content"
        target = b""
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 8: Large content
        base = b"x" * 10000
        target = b"x" * 9000 + b"y" * 1000
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)
        # Test case 9: Multiple changes
        base = b"line1\nline2\nline3\nline4\n"
        target = b"line1\nmodified2\nline3\nmodified4\n"
        delta = get_delta(base, target)
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)

    # Test both Python and Rust versions
    test_create_delta_py = functest_builder(
        _do_test_create_delta_various_cases, _create_delta_py
    )
    test_create_delta_extension = ext_functest_builder(
        _do_test_create_delta_various_cases, _create_delta_rs
    )

    def _do_test_create_delta_output_consistency(self, create_delta_func):
        """Test that create_delta produces consistent and valid output."""
        import types

        # Helper to normalize delta output
        def get_delta(base, target):
            result = create_delta_func(base, target)
            if isinstance(create_delta_func, types.BuiltinFunctionType):
                return result
            return b"".join(result)

        test_cases = [
            (b"", b""),
            (b"a", b"a"),
            (b"abc", b"abc"),
            (b"abc", b"def"),
            (b"hello world", b"hello rust"),
            (b"x" * 100, b"y" * 100),
            (b"same prefix but different suffix", b"same prefix with new suffix"),
        ]
        for base, target in test_cases:
            delta = get_delta(base, target)
            # Verify delta can be applied
            result = b"".join(apply_delta(base, delta))
            self.assertEqual(
                target,
                result,
                f"Delta failed for base={base[:20]}... target={target[:20]}...",
            )
            # Verify delta is not empty (should have at least header)
            self.assertGreater(len(delta), 0)

    test_create_delta_output_consistency_py = functest_builder(
        _do_test_create_delta_output_consistency, _create_delta_py
    )
    test_create_delta_output_consistency_extension = ext_functest_builder(
        _do_test_create_delta_output_consistency, _create_delta_rs
    )

    def _do_test_create_delta_produces_valid_deltas(self, create_delta_func):
        """Test that deltas produced are valid Git delta format."""
        import types

        # Helper to normalize delta output
        def get_delta(base, target):
            result = create_delta_func(base, target)
            if isinstance(create_delta_func, types.BuiltinFunctionType):
                return result
            return b"".join(result)

        base = b"The quick brown fox"
        target = b"The slow brown fox"
        delta = get_delta(base, target)
        # A valid delta should have:
        # 1. Base size header
        # 2. Target size header
        # 3. Delta operations
        self.assertGreater(len(delta), 2)  # At minimum 2 header bytes
        # Apply delta to verify it's valid
        result = b"".join(apply_delta(base, delta))
        self.assertEqual(target, result)

    test_create_delta_valid_format_py = functest_builder(
        _do_test_create_delta_produces_valid_deltas, _create_delta_py
    )
    test_create_delta_valid_format_extension = ext_functest_builder(
        _do_test_create_delta_produces_valid_deltas, _create_delta_rs
    )

class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def test_create_pack(self) -> None:
        self.get_pack_data(pack1_sha).close()

    def test_from_file(self) -> None:
        path = os.path.join(
            self.datadir, "pack-{}.pack".format(pack1_sha.decode("ascii"))
        )
        with open(path, "rb") as f:
            pack_data = PackData.from_file(f, DEFAULT_OBJECT_FORMAT, os.path.getsize(path))
            pack_data.close()

    def test_pack_len(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            self.assertEqual(3, len(p))

    def test_index_check(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            self.assertSucceeds(p.check)

    def test_get_stored_checksum(self) -> None:
        """Test getting the stored checksum of the pack data."""
        with self.get_pack_data(pack1_sha) as p:
            checksum = p.get_stored_checksum()
            self.assertEqual(20, len(checksum))
            # Verify it's a valid SHA1 hash (20 bytes)
            self.assertIsInstance(checksum, bytes)

    # Removed test_check_pack_data_size as it was accessing private attributes

    def test_close_twice(self) -> None:
        """Test that calling close multiple times is safe."""
        p = self.get_pack_data(pack1_sha)
        p.close()
        # Second close should not raise an exception
        p.close()

    def test_iter_unpacked(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            commit_data = (
                b"tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n"
                b"author James Westby <jw+debian@jameswestby.net> "
                b"1174945067 +0100\n"
                b"committer James Westby <jw+debian@jameswestby.net> "
                b"1174945067 +0100\n"
                b"\n"
                b"Test commit\n"
            )
            blob_sha = b"6f670c0fb53f9463760b7295fbb814e965fb20c8"
            tree_data = b"100644 a\0" + hex_to_sha(blob_sha)
            actual = list(p.iter_unpacked())
            self.assertEqual(
                [
                    UnpackedObject(
                        offset=12,
                        pack_type_num=1,
                        decomp_chunks=[commit_data],
                        crc32=None,
                    ),
                    UnpackedObject(
                        offset=138,
                        pack_type_num=2,
                        decomp_chunks=[tree_data],
                        crc32=None,
                    ),
                    UnpackedObject(
                        offset=178,
                        pack_type_num=3,
                        decomp_chunks=[b"test 1\n"],
                        crc32=None,
                    ),
                ],
                actual,
            )

    def test_iterentries(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            entries = {(sha_to_hex(s), o, c) for s, o, c in p.iterentries()}
            self.assertEqual(
                {
                    (
                        b"6f670c0fb53f9463760b7295fbb814e965fb20c8",
                        178,
                        1373561701,
                    ),
                    (
                        b"b2a2766a2879c209ab1176e7e778b81ae422eeaa",
                        138,
                        912998690,
                    ),
                    (
                        b"f18faa16531ac570a3fdc8c7ca16682548dafd12",
                        12,
                        3775879613,
                    ),
                },
                entries,
            )

    def test_create_index_v1(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, "v1test.idx")
            p.create_index_v1(filename)
            idx1 = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
            idx2 = self.get_pack_index(pack1_sha)
            self.assertEqual(oct(os.stat(filename).st_mode), indexmode)
            self.assertEqual(idx1, idx2)

    def test_create_index_v2(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, "v2test.idx")
            p.create_index_v2(filename)
            idx1 = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
            idx2 = self.get_pack_index(pack1_sha)
            self.assertEqual(oct(os.stat(filename).st_mode), indexmode)
            self.assertEqual(idx1, idx2)

    def test_create_index_v3(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, "v3test.idx")
            p.create_index_v3(filename)
            idx1 = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
            idx2 = self.get_pack_index(pack1_sha)
            self.assertEqual(oct(os.stat(filename).st_mode), indexmode)
            self.assertEqual(idx1, idx2)
            self.assertIsInstance(idx1, PackIndex3)
            self.assertEqual(idx1.version, 3)

    def test_create_index_version3(self) -> None:
        with self.get_pack_data(pack1_sha) as p:
            filename = os.path.join(self.tempdir, "version3test.idx")
            p.create_index(filename, version=3)
            idx = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
            self.assertIsInstance(idx, PackIndex3)
            self.assertEqual(idx.version, 3)

    def test_compute_file_sha(self) -> None:
        f = BytesIO(b"abcd1234wxyz")
        try:
            self.assertEqual(
                sha1(b"abcd1234wxyz").hexdigest(),
                compute_file_sha(f, DEFAULT_OBJECT_FORMAT.hash_func).hexdigest(),
            )
            self.assertEqual(
                sha1(b"abcd1234wxyz").hexdigest(),
                compute_file_sha(
                    f, DEFAULT_OBJECT_FORMAT.hash_func, buffer_size=5
                ).hexdigest(),
            )
            self.assertEqual(
                sha1(b"abcd1234").hexdigest(),
                compute_file_sha(
                    f, DEFAULT_OBJECT_FORMAT.hash_func, end_ofs=-4
                ).hexdigest(),
            )
            self.assertEqual(
                sha1(b"1234wxyz").hexdigest(),
                compute_file_sha(
                    f, DEFAULT_OBJECT_FORMAT.hash_func, start_ofs=4
                ).hexdigest(),
            )
            self.assertEqual(
                sha1(b"1234").hexdigest(),
                compute_file_sha(
                    f, DEFAULT_OBJECT_FORMAT.hash_func, start_ofs=4, end_ofs=-4
                ).hexdigest(),
            )
        finally:
            f.close()

    def test_compute_file_sha_short_file(self) -> None:
        f = BytesIO(b"abcd1234wxyz")
        try:
            self.assertRaises(
                AssertionError,
                compute_file_sha,
                f,
                DEFAULT_OBJECT_FORMAT.hash_func,
                -20,
            )
            self.assertRaises(
                AssertionError,
                compute_file_sha,
                f,
                DEFAULT_OBJECT_FORMAT.hash_func,
                0,
                20,
            )
            self.assertRaises(
                AssertionError,
                compute_file_sha,
                f,
                DEFAULT_OBJECT_FORMAT.hash_func,
                10,
                -12,
            )
        finally:
            f.close()

class TestPack(PackTests):
    def test_len(self) -> None:
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(3, len(p))

    def test_contains(self) -> None:
        with self.get_pack(pack1_sha) as p:
            self.assertIn(tree_sha, p)

    def test_get(self) -> None:
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self) -> None:
        with self.get_pack(pack1_sha) as p:
            self.assertEqual({tree_sha, commit_sha, a_sha}, set(p))

    def test_iterobjects(self) -> None:
        with self.get_pack(pack1_sha) as p:
            expected = {p[s] for s in [commit_sha, tree_sha, a_sha]}
            self.assertEqual(expected, set(list(p.iterobjects())))

    def test_pack_tuples(self) -> None:
        with self.get_pack(pack1_sha) as p:
            tuples = p.pack_tuples()
            expected = {(p[s], None) for s in [commit_sha, tree_sha, a_sha]}
            self.assertEqual(expected, set(list(tuples)))
            self.assertEqual(expected, set(list(tuples)))
            self.assertEqual(3, len(tuples))

    # Removed test_pack_tuples_with_progress as it was using parameters not supported by the API

    def test_get_object_at(self) -> None:
        """Tests random access for non-delta objects."""
        with self.get_pack(pack1_sha) as p:
            obj = p[a_sha]
            self.assertEqual(obj.type_name, b"blob")
            self.assertEqual(obj.sha().hexdigest().encode("ascii"), a_sha)
            obj = p[tree_sha]
            self.assertEqual(obj.type_name, b"tree")
            self.assertEqual(obj.sha().hexdigest().encode("ascii"), tree_sha)
            obj = p[commit_sha]
            self.assertEqual(obj.type_name, b"commit")
            self.assertEqual(obj.sha().hexdigest().encode("ascii"), commit_sha)

    def test_copy(self) -> None:
        with self.get_pack(pack1_sha) as origpack:
            self.assertSucceeds(origpack.index.check)
            basename = os.path.join(self.tempdir, "Elch")
            write_pack(
                basename, origpack.pack_tuples(), object_format=DEFAULT_OBJECT_FORMAT
            )
            with Pack(basename, object_format=DEFAULT_OBJECT_FORMAT) as newpack:
                self.assertEqual(origpack, newpack)
                self.assertSucceeds(newpack.index.check)
                self.assertEqual(origpack.name(), newpack.name())
                # Note: We don't compare pack data checksums here because Git does
                # not require deterministic object ordering in pack files. The same
                # set of objects can be written in different orders (e.g., due to
                # dict iteration order differences across Python versions/platforms),
                # producing different but equally valid pack files with different
                # checksums. The assertEqual above already verifies both packs
                # contain the same objects by comparing their indices.
                wrong_version = origpack.index.version != newpack.index.version
                orig_checksum = origpack.index.get_stored_checksum()
                new_checksum = newpack.index.get_stored_checksum()
                self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self) -> None:
        with self.get_pack(pack1_sha) as p:
            commit = p[commit_sha]
            self.assertEqual(b"James Westby <jw+debian@jameswestby.net>", commit.author)
            self.assertEqual([], commit.parents)

    def _copy_pack(self, origpack):
        basename = os.path.join(self.tempdir, "somepack")
        write_pack(
            basename, origpack.pack_tuples(), object_format=DEFAULT_OBJECT_FORMAT
        )
        return Pack(basename, object_format=DEFAULT_OBJECT_FORMAT)

    def test_keep_no_message(self) -> None:
        with self.get_pack(pack1_sha) as p:
            p = self._copy_pack(p)
        with p:
            keepfile_name = p.keep()
        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))
        with open(keepfile_name) as f:
            buf = f.read()
            self.assertEqual("", buf)

    def test_keep_message(self) -> None:
        with self.get_pack(pack1_sha) as p:
            p = self._copy_pack(p)
        msg = b"some message"
        with p:
            keepfile_name = p.keep(msg)
        # file should exist
        self.assertTrue(os.path.exists(keepfile_name))
        # and contain the right message, with a linefeed
        with open(keepfile_name, "rb") as f:
            buf = f.read()
            self.assertEqual(msg + b"\n", buf)

    def test_name(self) -> None:
        with self.get_pack(pack1_sha) as p:
            self.assertEqual(pack1_sha, p.name())

    def test_length_mismatch(self) -> None:
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            pack = Pack.from_objects(data, index)
            self.addCleanup(pack.close)
            pack.check_length_and_checksum()
            data._file.seek(12)
            bad_file = BytesIO()
            write_pack_header(bad_file.write, 9999)
            bad_file.write(data._file.read())
            bad_file = BytesIO(bad_file.getvalue())
            bad_data = PackData("", file=bad_file, object_format=DEFAULT_OBJECT_FORMAT)
            self.addCleanup(bad_data.close)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.addCleanup(bad_pack.close)
            self.assertRaises(AssertionError, lambda: bad_pack.data)
            self.assertRaises(AssertionError, bad_pack.check_length_and_checksum)

    def test_checksum_mismatch(self) -> None:
        with self.get_pack_data(pack1_sha) as data:
            index = self.get_pack_index(pack1_sha)
            pack = Pack.from_objects(data, index)
            self.addCleanup(pack.close)
            pack.check_length_and_checksum()
            data._file.seek(0)
            bad_file = BytesIO(data._file.read()[:-20] + (b"\xff" * 20))
            bad_data = PackData("", file=bad_file, object_format=DEFAULT_OBJECT_FORMAT)
            self.addCleanup(bad_data.close)
            bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
            self.addCleanup(bad_pack.close)
            self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
            self.assertRaises(ChecksumMismatch, bad_pack.check_length_and_checksum)

    def test_iterobjects_2(self) -> None:
        with self.get_pack(pack1_sha) as p:
            objs = {o.id: o for o in p.iterobjects()}
            self.assertEqual(3, len(objs))
            self.assertEqual(sorted(objs), sorted(p.index))
            self.assertIsInstance(objs[a_sha], Blob)
            self.assertIsInstance(objs[tree_sha], Tree)
            self.assertIsInstance(objs[commit_sha], Commit)

    def test_iterobjects_subset(self) -> None:
        with self.get_pack(pack1_sha) as p:
            objs = {o.id: o for o in p.iterobjects_subset([commit_sha])}
            self.assertEqual(1, len(objs))
            self.assertIsInstance(objs[commit_sha], Commit)

    def test_iterobjects_subset_empty(self) -> None:
        """Test iterobjects_subset with an empty subset."""
        with self.get_pack(pack1_sha) as p:
            objs = list(p.iterobjects_subset([]))
            self.assertEqual(0, len(objs))

    def test_iterobjects_subset_nonexistent(self) -> None:
        """Test iterobjects_subset with non-existent object IDs."""
        with self.get_pack(pack1_sha) as p:
            # Create a fake SHA that doesn't exist in the pack
            fake_sha = b"1" * 40
            # KeyError is expected when trying to access a non-existent object
            # We'll use a try-except block to test the behavior
            try:
                list(p.iterobjects_subset([fake_sha]))
                self.fail("Expected KeyError when accessing non-existent object")
            except KeyError:
                pass  # This is the expected behavior

    def test_check_length_and_checksum(self) -> None:
        """Test that check_length_and_checksum works correctly."""
        with self.get_pack(pack1_sha) as p:
            # This should not raise an exception
            p.check_length_and_checksum()
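# Thin packs contain deltas whose bases live outside the pack; resolving them
# requires a resolve_ext_ref callback that can fetch those bases from an
# object store.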
class TestThinPack(PackTests):
    def setUp(self) -> None:
        super().setUp()
        self.store = MemoryObjectStore()
        self.blobs = {}
        for blob in (b"foo", b"bar", b"foo1234", b"bar2468"):
            self.blobs[blob] = make_object(Blob, data=blob)
        self.store.add_object(self.blobs[b"foo"])
        self.store.add_object(self.blobs[b"bar"])

        # Build a thin pack. 'foo' is an external reference, 'bar' an
        # internal reference.
        self.pack_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.pack_dir)
        self.pack_prefix = os.path.join(self.pack_dir, "pack")
        with open(self.pack_prefix + ".pack", "wb") as f:
            build_pack(
                f,
                [
                    (REF_DELTA, (self.blobs[b"foo"].id, b"foo1234")),
                    (Blob.type_num, b"bar"),
                    (REF_DELTA, (self.blobs[b"bar"].id, b"bar2468")),
                ],
                store=self.store,
            )

        # Index the new pack.
        with self.make_pack(True) as pack:
            with PackData(pack._data_path, object_format=DEFAULT_OBJECT_FORMAT) as data:
                data.create_index(
                    self.pack_prefix + ".idx", resolve_ext_ref=pack.resolve_ext_ref
                )

        del self.store[self.blobs[b"bar"].id]

    def make_pack(self, resolve_ext_ref):
        return Pack(
            self.pack_prefix,
            object_format=DEFAULT_OBJECT_FORMAT,
            resolve_ext_ref=self.store.get_raw if resolve_ext_ref else None,
        )

    def test_get_raw(self) -> None:
        with self.make_pack(False) as p:
            self.assertRaises(KeyError, p.get_raw, self.blobs[b"foo1234"].id)
        with self.make_pack(True) as p:
            self.assertEqual((3, b"foo1234"), p.get_raw(self.blobs[b"foo1234"].id))

    def test_get_unpacked_object(self) -> None:
        self.maxDiff = None
        with self.make_pack(False) as p:
            expected = UnpackedObject(
                7,
                delta_base=b"\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c",
                decomp_chunks=[b"\x03\x07\x90\x03\x041234"],
            )
            expected.offset = 12
            got = p.get_unpacked_object(self.blobs[b"foo1234"].id)
            self.assertEqual(expected, got)
        with self.make_pack(True) as p:
            expected = UnpackedObject(
                7,
                delta_base=b"\x19\x10(\x15f=#\xf8\xb7ZG\xe7\xa0\x19e\xdc\xdc\x96F\x8c",
                decomp_chunks=[b"\x03\x07\x90\x03\x041234"],
            )
            expected.offset = 12
            got = p.get_unpacked_object(self.blobs[b"foo1234"].id)
            self.assertEqual(
                expected,
                got,
            )

    def test_iterobjects(self) -> None:
        with self.make_pack(False) as p:
            self.assertRaises(UnresolvedDeltas, list, p.iterobjects())
        with self.make_pack(True) as p:
            self.assertEqual(
                sorted(
                    [
                        self.blobs[b"foo1234"].id,
                        self.blobs[b"bar"].id,
                        self.blobs[b"bar2468"].id,
                    ]
                ),
                sorted(o.id for o in p.iterobjects()),
            )

class WritePackTests(TestCase):
    def test_write_pack_header(self) -> None:
        f = BytesIO()
        try:
            write_pack_header(f.write, 42)
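            # Header is the b"PACK" magic, a 4-byte version (2), and a 4-byte
            # object count (42 == 0x2a == b"*").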
            self.assertEqual(b"PACK\x00\x00\x00\x02\x00\x00\x00*", f.getvalue())
        finally:
            f.close()

    def test_write_pack_object(self) -> None:
        f = BytesIO()
        try:
            f.write(b"header")
            offset = f.tell()
            crc32 = write_pack_object(
                f.write, Blob.type_num, b"blob", object_format=DEFAULT_OBJECT_FORMAT
            )
            self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xFFFFFFFF)
            f.write(b"x")  # unpack_object needs extra trailing data.
            f.seek(offset)
            unpacked, unused = unpack_object(
                f.read, DEFAULT_OBJECT_FORMAT.hash_func, compute_crc32=True
            )
            self.assertEqual(Blob.type_num, unpacked.pack_type_num)
            self.assertEqual(Blob.type_num, unpacked.obj_type_num)
            self.assertEqual([b"blob"], unpacked.decomp_chunks)
            self.assertEqual(crc32, unpacked.crc32)
            self.assertEqual(b"x", unused)
        finally:
            f.close()

    def test_write_pack_object_sha(self) -> None:
        f = BytesIO()
        f.write(b"header")
        offset = f.tell()
        sha_a = sha1(b"foo")
        sha_b = sha_a.copy()
        write_pack_object(
            f.write,
            Blob.type_num,
            b"blob",
            sha=sha_a,
            object_format=DEFAULT_OBJECT_FORMAT,
        )
        self.assertNotEqual(sha_a.digest(), sha_b.digest())
        sha_b.update(f.getvalue()[offset:])
        self.assertEqual(sha_a.digest(), sha_b.digest())

    def test_write_pack_object_compression_level(self) -> None:
        f = BytesIO()
        f.write(b"header")
        offset = f.tell()
        sha_a = sha1(b"foo")
        sha_b = sha_a.copy()
        write_pack_object(
            f.write,
            Blob.type_num,
            b"blob",
            sha=sha_a,
            compression_level=6,
            object_format=DEFAULT_OBJECT_FORMAT,
        )
        self.assertNotEqual(sha_a.digest(), sha_b.digest())
        sha_b.update(f.getvalue()[offset:])
        self.assertEqual(sha_a.digest(), sha_b.digest())
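# Checksum of the sample pack file itself (compare get_pack_checksum in
# PackIndexTests above); used as the trailer for the index-writing tests below.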
pack_checksum = hex_to_sha("721980e866af9a5f93ad674144e1459b8ba3e7b7")


class BaseTestPackIndexWriting:
    def assertSucceeds(self, func, *args, **kwargs) -> None:
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            self.fail(e)

    def index(self, filename, entries, pack_checksum) -> NoReturn:
        raise NotImplementedError(self.index)

    def test_empty(self) -> None:
        idx = self.index("empty.idx", [], pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_large(self) -> None:
        entry1_sha = hex_to_sha("4e6388232ec39792661e2e75db8fb117fc869ce6")
        entry2_sha = hex_to_sha("e98f071751bd77f59967bfa671cd2caebdccc9a2")
        entries = [
            (entry1_sha, 0xF2972D0830529B87, 24),
            (entry2_sha, (~0xF2972D0830529B87) & (2**64 - 1), 92),
        ]
        if not self._supports_large:
            self.assertRaises(
                TypeError, self.index, "single.idx", entries, pack_checksum
            )
            return
        idx = self.index("single.idx", entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(2, len(idx))
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(entries), len(actual_entries))
        for mine, actual in zip(entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertIsNone(actual_crc)

    def test_single(self) -> None:
        entry_sha = hex_to_sha("6f670c0fb53f9463760b7295fbb814e965fb20c8")
        my_entries = [(entry_sha, 178, 42)]
        idx = self.index("single.idx", my_entries, pack_checksum)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(my_entries), len(actual_entries))
        for mine, actual in zip(my_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertIsNone(actual_crc)

class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
    def setUp(self) -> None:
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self) -> None:
        shutil.rmtree(self.tempdir)

    def index(self, filename, entries, pack_checksum):
        path = os.path.join(self.tempdir, filename)
        self.writeIndex(path, entries, pack_checksum)
        idx = load_pack_index(path, DEFAULT_OBJECT_FORMAT)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.version, self._expected_version)
        return idx

    def writeIndex(self, filename, entries, pack_checksum) -> None:
        # FIXME: Write to a BytesIO rather than hitting disk?
        with GitFile(filename, "wb") as f:
            self._write_fn(f, entries, pack_checksum)


class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
    def setUp(self) -> None:
        TestCase.setUp(self)
        self._has_crc32_checksum = True
        self._supports_large = True

    def index(self, filename, entries, pack_checksum):
        from dulwich.object_format import DEFAULT_OBJECT_FORMAT

        return MemoryPackIndex(entries, DEFAULT_OBJECT_FORMAT, pack_checksum)

    def tearDown(self) -> None:
        TestCase.tearDown(self)


class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
    def setUp(self) -> None:
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        self._has_crc32_checksum = False
        self._expected_version = 1
        self._supports_large = False
        self._write_fn = write_pack_index_v1

    def tearDown(self) -> None:
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)


class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
    def setUp(self) -> None:
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        self._has_crc32_checksum = True
        self._supports_large = True
        self._expected_version = 2
        self._write_fn = write_pack_index_v2

    def tearDown(self) -> None:
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)


class TestPackIndexWritingv3(TestCase, BaseTestFilePackIndexWriting):
    def setUp(self) -> None:
        TestCase.setUp(self)
        BaseTestFilePackIndexWriting.setUp(self)
        self._has_crc32_checksum = True
        self._supports_large = True
        self._expected_version = 3
        self._write_fn = write_pack_index_v3

    def tearDown(self) -> None:
        TestCase.tearDown(self)
        BaseTestFilePackIndexWriting.tearDown(self)

    def test_load_v3_index_returns_packindex3(self) -> None:
        """Test that loading a v3 index file returns a PackIndex3 instance."""
        entries = [(b"abcd" * 5, 0, zlib.crc32(b""))]
        filename = os.path.join(self.tempdir, "test.idx")
        self.writeIndex(filename, entries, b"1234567890" * 2)
        idx = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
        self.assertIsInstance(idx, PackIndex3)
        self.assertEqual(idx.version, 3)
        self.assertEqual(idx.hash_format, 1)  # SHA-1
        self.assertEqual(idx.hash_size, 20)
        self.assertEqual(idx.shortened_oid_len, 20)

    def test_v3_hash_algorithm(self) -> None:
        """Test v3 index correctly handles hash algorithm field."""
        entries = [(b"a" * 20, 42, zlib.crc32(b"data"))]
        filename = os.path.join(self.tempdir, "test_hash.idx")
        # Write v3 index with SHA-1 (algorithm=1)
        with GitFile(filename, "wb") as f:
            write_pack_index_v3(f, entries, b"1" * 20, hash_format=1)
        idx = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
        self.assertEqual(idx.hash_format, 1)
        self.assertEqual(idx.hash_size, 20)

    def test_v3_sha256_length(self) -> None:
        """Test v3 index with SHA-256 hash length."""
        # For now, test that SHA-256 is not yet implemented
        entries = [(b"a" * 32, 42, zlib.crc32(b"data"))]
        filename = os.path.join(self.tempdir, "test_sha256.idx")
        # SHA-256 should raise NotImplementedError
        with self.assertRaises(NotImplementedError) as cm:
            with GitFile(filename, "wb") as f:
                write_pack_index_v3(f, entries, b"1" * 32, hash_format=2)
        self.assertIn("SHA-256", str(cm.exception))

    def test_v3_invalid_hash_algorithm(self) -> None:
        """Test v3 index with invalid hash algorithm."""
        entries = [(b"a" * 20, 42, zlib.crc32(b"data"))]
        filename = os.path.join(self.tempdir, "test_invalid.idx")
        # Invalid hash algorithm should raise ValueError
        with self.assertRaises(ValueError) as cm:
            with GitFile(filename, "wb") as f:
                write_pack_index_v3(f, entries, b"1" * 20, hash_format=99)
        self.assertIn("Unknown hash algorithm", str(cm.exception))

    def test_v3_wrong_hash_length(self) -> None:
        """Test v3 index with mismatched hash length."""
        # Entry with wrong hash length for SHA-1
        entries = [(b"a" * 15, 42, zlib.crc32(b"data"))]  # Too short
        filename = os.path.join(self.tempdir, "test_wrong_len.idx")
        with self.assertRaises(ValueError) as cm:
            with GitFile(filename, "wb") as f:
                write_pack_index_v3(f, entries, b"1" * 20, hash_format=1)
        self.assertIn("wrong length", str(cm.exception))

class WritePackIndexTests(TestCase):
    """Tests for the configurable write_pack_index function."""

    def test_default_pack_index_version_constant(self) -> None:
        from dulwich.pack import DEFAULT_PACK_INDEX_VERSION

        # Ensure the constant is set to version 2 (current Git default)
        self.assertEqual(2, DEFAULT_PACK_INDEX_VERSION)

    def test_write_pack_index_defaults_to_v2(self) -> None:
        import tempfile

        from dulwich.pack import (
            DEFAULT_PACK_INDEX_VERSION,
            load_pack_index,
            write_pack_index,
        )

        tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tempdir)
        entries = [(b"1" * 20, 42, zlib.crc32(b"data"))]
        filename = os.path.join(tempdir, "test_default.idx")
        with GitFile(filename, "wb") as f:
            write_pack_index(f, entries, b"P" * 20)
        idx = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
        self.assertEqual(DEFAULT_PACK_INDEX_VERSION, idx.version)

    def test_write_pack_index_version_1(self) -> None:
        import tempfile

        from dulwich.pack import load_pack_index, write_pack_index

        tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tempdir)
        entries = [(b"1" * 20, 42, zlib.crc32(b"data"))]
        filename = os.path.join(tempdir, "test_v1.idx")
        with GitFile(filename, "wb") as f:
            write_pack_index(f, entries, b"P" * 20, version=1)
        idx = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
        self.assertEqual(1, idx.version)

    def test_write_pack_index_version_3(self) -> None:
        import tempfile

        from dulwich.pack import load_pack_index, write_pack_index

        tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tempdir)
        entries = [(b"1" * 20, 42, zlib.crc32(b"data"))]
        filename = os.path.join(tempdir, "test_v3.idx")
        with GitFile(filename, "wb") as f:
            write_pack_index(f, entries, b"P" * 20, version=3)
        idx = load_pack_index(filename, DEFAULT_OBJECT_FORMAT)
        self.assertEqual(3, idx.version)

    def test_write_pack_index_invalid_version(self) -> None:
        import tempfile

        from dulwich.pack import write_pack_index

        tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, tempdir)
        entries = [(b"1" * 20, 42, zlib.crc32(b"data"))]
        filename = os.path.join(tempdir, "test_invalid.idx")
        with self.assertRaises(ValueError) as cm:
            with GitFile(filename, "wb") as f:
                write_pack_index(f, entries, b"P" * 20, version=99)
        self.assertIn("Unsupported pack index version: 99", str(cm.exception))

class MockFileWithoutFileno:
    """Mock file-like object without fileno method."""

    def __init__(self, content):
        self.content = content
        self.position = 0

    def read(self, size=None):
        if size is None:
            result = self.content[self.position :]
            self.position = len(self.content)
        else:
            result = self.content[self.position : self.position + size]
            self.position += size
        return result

    def seek(self, position):
        self.position = position

    def tell(self):
        return self.position


# Removed the PackWithoutMmapTests class since it was using private methods
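# read_zlib_chunks() must decompress exactly decomp_len bytes into
# unpacked.decomp_chunks, update crc32 when requested, and return any bytes it
# read past the end of the compressed stream.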
class ReadZlibTests(TestCase):
    decomp = (
        b"tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n"
        b"parent None\n"
        b"author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n"
        b"committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n"
        b"\n"
        b"Provide replacement for mmap()'s offset argument."
    )
    comp = zlib.compress(decomp)
    extra = b"nextobject"

    def setUp(self) -> None:
        super().setUp()
        self.read = BytesIO(self.comp + self.extra).read
        self.unpacked = UnpackedObject(
            Tree.type_num, decomp_len=len(self.decomp), crc32=0
        )

    def test_decompress_size(self) -> None:
        good_decomp_len = len(self.decomp)
        self.unpacked.decomp_len = -1
        self.assertRaises(ValueError, read_zlib_chunks, self.read, self.unpacked)
        self.unpacked.decomp_len = good_decomp_len - 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read, self.unpacked)
        self.unpacked.decomp_len = good_decomp_len + 1
        self.assertRaises(zlib.error, read_zlib_chunks, self.read, self.unpacked)

    def test_decompress_truncated(self) -> None:
        read = BytesIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)
        read = BytesIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)

    def test_decompress_empty(self) -> None:
        unpacked = UnpackedObject(Tree.type_num, decomp_len=0)
        comp = zlib.compress(b"")
        read = BytesIO(comp + self.extra).read
        unused = read_zlib_chunks(read, unpacked)
        self.assertEqual(b"", b"".join(unpacked.decomp_chunks))
        self.assertNotEqual(b"", unused)
        self.assertEqual(self.extra, unused + read())

    def test_decompress_no_crc32(self) -> None:
        self.unpacked.crc32 = None
        read_zlib_chunks(self.read, self.unpacked)
        self.assertEqual(None, self.unpacked.crc32)

    def _do_decompress_test(self, buffer_size, **kwargs) -> None:
        unused = read_zlib_chunks(
            self.read, self.unpacked, buffer_size=buffer_size, **kwargs
        )
        self.assertEqual(self.decomp, b"".join(self.unpacked.decomp_chunks))
        self.assertEqual(zlib.crc32(self.comp), self.unpacked.crc32)
        self.assertNotEqual(b"", unused)
        self.assertEqual(self.extra, unused + self.read())

    def test_simple_decompress(self) -> None:
        self._do_decompress_test(4096)
        self.assertEqual(None, self.unpacked.comp_chunks)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self) -> None:
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self) -> None:
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self) -> None:
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self) -> None:
        self._do_decompress_test(4)

    def test_decompress_include_comp(self) -> None:
        self._do_decompress_test(4096, include_comp=True)
        self.assertEqual(self.comp, b"".join(self.unpacked.comp_chunks))
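# deltify_pack_objects() yields UnpackedObject entries, storing later objects
# as deltas against earlier, similar ones (see test_simple_delta).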
class DeltifyTests(TestCase):
    def test_empty(self) -> None:
        self.assertEqual([], list(deltify_pack_objects([])))

    def test_single(self) -> None:
        b = Blob.from_string(b"foo")
        self.assertEqual(
            [
                UnpackedObject(
                    b.type_num,
                    sha=b.sha().digest(),
                    delta_base=None,
                    decomp_chunks=b.as_raw_chunks(),
                )
            ],
            list(deltify_pack_objects([(b, b"")])),
        )

    def test_simple_delta(self) -> None:
        b1 = Blob.from_string(b"a" * 101)
        b2 = Blob.from_string(b"a" * 100)
        delta = list(create_delta(b1.as_raw_chunks(), b2.as_raw_chunks()))
        self.assertEqual(
            [
                UnpackedObject(
                    b1.type_num,
                    sha=b1.sha().digest(),
                    delta_base=None,
                    decomp_chunks=b1.as_raw_chunks(),
                ),
                UnpackedObject(
                    b2.type_num,
                    sha=b2.sha().digest(),
                    delta_base=b1.sha().digest(),
                    decomp_chunks=delta,
                ),
            ],
            list(deltify_pack_objects([(b1, b""), (b2, b"")])),
        )


class TestPackStreamReader(TestCase):
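    """Tests for PackStreamReader, which reads objects from a pack stream."""
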
    def test_read_objects_empty_pack(self) -> None:
        f = BytesIO()
        build_pack(f, [])
        reader = PackStreamReader(DEFAULT_OBJECT_FORMAT.hash_func, f.read)
        self.assertEqual(0, len(list(reader.read_objects())))
    def test_read_objects(self) -> None:
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (Blob.type_num, b"blob"),
                (OFS_DELTA, (0, b"blob1")),
            ],
        )
        reader = PackStreamReader(DEFAULT_OBJECT_FORMAT.hash_func, f.read)
        objects = list(reader.read_objects(compute_crc32=True))
        self.assertEqual(2, len(objects))

        unpacked_blob, unpacked_delta = objects
        self.assertEqual(entries[0][0], unpacked_blob.offset)
        self.assertEqual(Blob.type_num, unpacked_blob.pack_type_num)
        self.assertEqual(Blob.type_num, unpacked_blob.obj_type_num)
        self.assertEqual(None, unpacked_blob.delta_base)
        self.assertEqual(b"blob", b"".join(unpacked_blob.decomp_chunks))
        self.assertEqual(entries[0][4], unpacked_blob.crc32)

        self.assertEqual(entries[1][0], unpacked_delta.offset)
        self.assertEqual(OFS_DELTA, unpacked_delta.pack_type_num)
        self.assertEqual(None, unpacked_delta.obj_type_num)
        self.assertEqual(
            unpacked_delta.offset - unpacked_blob.offset,
            unpacked_delta.delta_base,
        )
        delta = create_delta(b"blob", b"blob1")
        self.assertEqual(b"".join(delta), b"".join(unpacked_delta.decomp_chunks))
        self.assertEqual(entries[1][4], unpacked_delta.crc32)
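
    # zlib_bufsize=4 forces each object to be decompressed across many small
    # reads from the stream.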
    def test_read_objects_buffered(self) -> None:
        f = BytesIO()
        build_pack(
            f,
            [
                (Blob.type_num, b"blob"),
                (OFS_DELTA, (0, b"blob1")),
            ],
        )
        reader = PackStreamReader(
            DEFAULT_OBJECT_FORMAT.hash_func, f.read, zlib_bufsize=4
        )
        self.assertEqual(2, len(list(reader.read_objects())))

    def test_read_objects_empty(self) -> None:
        reader = PackStreamReader(DEFAULT_OBJECT_FORMAT.hash_func, BytesIO().read)
        self.assertRaises(AssertionError, list, reader.read_objects())


class TestPackIterator(DeltaChainIterator):
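    """DeltaChainIterator subclass used by the tests below.

    It returns entries in the same format as build_pack() and asserts that no
    offset is inflated more than once.
    """
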
    _compute_crc32 = True

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._unpacked_offsets: set[int] = set()

    def _result(self, unpacked):
        """Return entries in the same format as build_pack."""
        return (
            unpacked.offset,
            unpacked.obj_type_num,
            b"".join(unpacked.obj_chunks),
            unpacked.sha(),
            unpacked.crc32,
        )

    def _resolve_object(self, offset, pack_type_num, base_chunks):
        assert offset not in self._unpacked_offsets, (
            f"Attempted to re-inflate offset {offset}"
        )
        self._unpacked_offsets.add(offset)
        return super()._resolve_object(offset, pack_type_num, base_chunks)


class DeltaChainIteratorTests(TestCase):
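    """Tests for DeltaChainIterator over packs built with build_pack()."""
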
    def setUp(self) -> None:
        super().setUp()
        self.store = MemoryObjectStore()
        self.fetched = set()

    def store_blobs(self, blobs_data):
        blobs = []
        for data in blobs_data:
            blob = make_object(Blob, data=data)
            blobs.append(blob)
            self.store.add_object(blob)
        return blobs

    def get_raw_no_repeat(self, bin_sha):
        """Wrapper around store.get_raw that doesn't allow repeat lookups."""
        hex_sha = sha_to_hex(bin_sha)
        self.assertNotIn(
            hex_sha, self.fetched, f"Attempted to re-fetch object {hex_sha}"
        )
        self.fetched.add(hex_sha)
        return self.store.get_raw(hex_sha)
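
    # Build a TestPackIterator over the pack data in f. Unless "thin" is given
    # explicitly, the pack is treated as thin (external REF_DELTA bases are
    # resolved via the object store) whenever the store already has objects.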
    def make_pack_iter(self, f, thin=None):
        if thin is None:
            thin = bool(list(self.store))
        resolve_ext_ref = (thin and self.get_raw_no_repeat) or None
        data = PackData("test.pack", file=f, object_format=DEFAULT_OBJECT_FORMAT)
        self.addCleanup(data.close)
        return TestPackIterator.for_pack_data(data, resolve_ext_ref=resolve_ext_ref)

    def make_pack_iter_subset(self, f, subset, thin=None):
        if thin is None:
            thin = bool(list(self.store))
        resolve_ext_ref = (thin and self.get_raw_no_repeat) or None
        data = PackData("test.pack", file=f, object_format=DEFAULT_OBJECT_FORMAT)
        assert data
        index = MemoryPackIndex.for_pack(data)
        pack = Pack.from_objects(data, index)
        self.addCleanup(pack.close)
        return TestPackIterator.for_pack_subset(
            pack, subset, resolve_ext_ref=resolve_ext_ref
        )
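
    # expected_indexes gives the order in which entries from build_pack() are
    # expected to come out of the iterator's delta resolution.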
    def assertEntriesMatch(self, expected_indexes, entries, pack_iter) -> None:
        expected = [entries[i] for i in expected_indexes]
        self.assertEqual(expected, list(pack_iter._walk_all_chains()))

    def test_no_deltas(self) -> None:
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (Commit.type_num, b"commit"),
                (Blob.type_num, b"blob"),
                (Tree.type_num, b"tree"),
            ],
        )
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
        f.seek(0)
        self.assertEntriesMatch([], entries, self.make_pack_iter_subset(f, []))
        f.seek(0)
        self.assertEntriesMatch(
            [1, 0],
            entries,
            self.make_pack_iter_subset(f, [entries[0][3], entries[1][3]]),
        )
        f.seek(0)
        self.assertEntriesMatch(
            [1, 0],
            entries,
            self.make_pack_iter_subset(
                f, [sha_to_hex(entries[0][3]), sha_to_hex(entries[1][3])]
            ),
        )
    def test_ofs_deltas(self) -> None:
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (Blob.type_num, b"blob"),
                (OFS_DELTA, (0, b"blob1")),
                (OFS_DELTA, (0, b"blob2")),
            ],
        )
        # Delta resolution changed to DFS
        self.assertEntriesMatch([0, 2, 1], entries, self.make_pack_iter(f))
        f.seek(0)
        self.assertEntriesMatch(
            [0, 2, 1],
            entries,
            self.make_pack_iter_subset(f, [entries[1][3], entries[2][3]]),
        )

    def test_ofs_deltas_chain(self) -> None:
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (Blob.type_num, b"blob"),
                (OFS_DELTA, (0, b"blob1")),
                (OFS_DELTA, (1, b"blob2")),
            ],
        )
        self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))

    def test_ref_deltas(self) -> None:
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (REF_DELTA, (1, b"blob1")),
                (Blob.type_num, b"blob"),
                (REF_DELTA, (1, b"blob2")),
            ],
        )
        # Delta resolution changed to DFS
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))

    def test_ref_deltas_chain(self) -> None:
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (REF_DELTA, (2, b"blob1")),
                (Blob.type_num, b"blob"),
                (REF_DELTA, (1, b"blob2")),
            ],
        )
        self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
    def test_ofs_and_ref_deltas(self) -> None:
        # Deltas that depend on this object's offset are popped before deltas
        # that depend on its ref.
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (REF_DELTA, (1, b"blob1")),
                (Blob.type_num, b"blob"),
                (OFS_DELTA, (1, b"blob2")),
            ],
        )
        # Delta resolution changed to DFS
        self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))
    def test_mixed_chain(self) -> None:
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (Blob.type_num, b"blob"),
                (REF_DELTA, (2, b"blob2")),
                (OFS_DELTA, (0, b"blob1")),
                (OFS_DELTA, (1, b"blob3")),
                (OFS_DELTA, (0, b"bob")),
            ],
        )
        # Delta resolution changed to DFS
        self.assertEntriesMatch([0, 4, 2, 1, 3], entries, self.make_pack_iter(f))
    def test_long_chain(self) -> None:
        n = 100
        objects_spec = [(Blob.type_num, b"blob")]
        for i in range(n):
            objects_spec.append((OFS_DELTA, (i, b"blob" + str(i).encode("ascii"))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))

    def test_branchy_chain(self) -> None:
        n = 100
        objects_spec = [(Blob.type_num, b"blob")]
        for i in range(n):
            objects_spec.append((OFS_DELTA, (0, b"blob" + str(i).encode("ascii"))))
        f = BytesIO()
        entries = build_pack(f, objects_spec)
        # Delta resolution changed to DFS
        indices = [0, *range(n, 0, -1)]
        self.assertEntriesMatch(indices, entries, self.make_pack_iter(f))
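
    # The remaining tests cover external refs: REF_DELTA objects whose bases
    # live in the object store rather than in the pack (thin packs), plus the
    # corresponding failure modes.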
    def test_ext_ref(self) -> None:
        (blob,) = self.store_blobs([b"blob"])
        f = BytesIO()
        entries = build_pack(f, [(REF_DELTA, (blob.id, b"blob1"))], store=self.store)
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain(self) -> None:
        (blob,) = self.store_blobs([b"blob"])
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (REF_DELTA, (1, b"blob2")),
                (REF_DELTA, (blob.id, b"blob1")),
            ],
            store=self.store,
        )
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([1, 0], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_chain_degenerate(self) -> None:
        # Test a degenerate case where the sender is sending a REF_DELTA
        # object that expands to an object already in the repository.
        (blob,) = self.store_blobs([b"blob"])
        (blob2,) = self.store_blobs([b"blob2"])
        assert blob.id < blob2.id

        f = BytesIO()
        entries = build_pack(
            f,
            [
                (REF_DELTA, (blob.id, b"blob2")),
                (REF_DELTA, (0, b"blob3")),
            ],
            store=self.store,
        )
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_ext_ref_multiple_times(self) -> None:
        (blob,) = self.store_blobs([b"blob"])
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (REF_DELTA, (blob.id, b"blob1")),
                (REF_DELTA, (blob.id, b"blob2")),
            ],
            store=self.store,
        )
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())

    def test_multiple_ext_refs(self) -> None:
        b1, b2 = self.store_blobs([b"foo", b"bar"])
        f = BytesIO()
        entries = build_pack(
            f,
            [
                (REF_DELTA, (b1.id, b"foo1")),
                (REF_DELTA, (b2.id, b"bar2")),
            ],
            store=self.store,
        )
        pack_iter = self.make_pack_iter(f)
        self.assertEntriesMatch([0, 1], entries, pack_iter)
        self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)], pack_iter.ext_refs())
    def test_bad_ext_ref_non_thin_pack(self) -> None:
        (blob,) = self.store_blobs([b"blob"])
        f = BytesIO()
        build_pack(f, [(REF_DELTA, (blob.id, b"blob1"))], store=self.store)
        pack_iter = self.make_pack_iter(f, thin=False)
        try:
            list(pack_iter._walk_all_chains())
            self.fail()
        except UnresolvedDeltas as e:
            self.assertEqual([blob.id], e.shas)

    def test_bad_ext_ref_thin_pack(self) -> None:
        b1, b2, b3 = self.store_blobs([b"foo", b"bar", b"baz"])
        f = BytesIO()
        build_pack(
            f,
            [
                (REF_DELTA, (1, b"foo99")),
                (REF_DELTA, (b1.id, b"foo1")),
                (REF_DELTA, (b2.id, b"bar2")),
                (REF_DELTA, (b3.id, b"baz3")),
            ],
            store=self.store,
        )
        del self.store[b2.id]
        del self.store[b3.id]
        pack_iter = self.make_pack_iter(f)
        try:
            list(pack_iter._walk_all_chains())
            self.fail()
        except UnresolvedDeltas as e:
            self.assertEqual(sorted([b2.id, b3.id]), sorted(e.shas))
    def test_ext_ref_deltified_object_based_on_itself(self) -> None:
        b1_content = b"foo"
        (b1,) = self.store_blobs([b1_content])
        f = BytesIO()
        build_pack(
            f,
            [
                # b1's content refers to b1's own object ID as the delta base.
                (REF_DELTA, (b1.id, b1_content)),
            ],
            store=self.store,
        )
        fsize = f.tell()
        f.seek(0)
        packdata = PackData.from_file(f, DEFAULT_OBJECT_FORMAT, fsize)
        td = tempfile.mkdtemp()
        idx_path = os.path.join(td, "test.idx")
        self.addCleanup(shutil.rmtree, td)
        packdata.create_index(
            idx_path,
            version=2,
            resolve_ext_ref=self.get_raw_no_repeat,
        )
        packindex = load_pack_index(idx_path, DEFAULT_OBJECT_FORMAT)
        pack = Pack.from_objects(packdata, packindex)
        try:
            # Attempting to open this REF_DELTA object would loop forever
            pack[b1.id]
        except UnresolvedDeltas as e:
            self.assertEqual([b1.id], [sha_to_hex(sha) for sha in e.shas])
        finally:
            pack.close()
            packdata.close()


class DeltaEncodeSizeTests(TestCase):
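    """Tests for _delta_encode_size().

    The expected values follow the variable-length size encoding used in delta
    headers: 7 bits per byte, least-significant group first, with the high bit
    set on every byte except the last. For example, 1000 = 0b111_1101000
    encodes as 0xE8 (0x68 | 0x80) followed by 0x07.
    """
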
    def test_basic(self) -> None:
        self.assertEqual(b"\x00", _delta_encode_size(0))
        self.assertEqual(b"\x01", _delta_encode_size(1))
        self.assertEqual(b"\xfa\x01", _delta_encode_size(250))
        self.assertEqual(b"\xe8\x07", _delta_encode_size(1000))
        self.assertEqual(b"\xa0\x8d\x06", _delta_encode_size(100000))


class EncodeCopyOperationTests(TestCase):
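    """Tests for _encode_copy_operation().

    A copy operation starts with a command byte whose high bit is set; bits
    0-3 flag which offset bytes follow and bits 4-6 flag which size bytes
    follow, least-significant byte first, with zero bytes omitted. For
    example, offset=1, size=10 encodes as 0x91 0x01 0x0A.
    """
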
    def test_basic(self) -> None:
        self.assertEqual(b"\x80", _encode_copy_operation(0, 0))
        self.assertEqual(b"\x91\x01\x0a", _encode_copy_operation(1, 10))
        self.assertEqual(b"\xb1\x64\xe8\x03", _encode_copy_operation(100, 1000))
        self.assertEqual(b"\x93\xe8\x03\x01", _encode_copy_operation(1000, 1))