test_object_store.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747
  1. # test_object_store.py -- tests for object_store.py
  2. # Copyright (C) 2008 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the object store interface."""
  22. import os
  23. import shutil
  24. import stat
  25. import sys
  26. import tempfile
  27. from contextlib import closing
  28. from io import BytesIO
  29. from dulwich.errors import NotTreeError
  30. from dulwich.index import commit_tree
  31. from dulwich.object_store import (
  32. DiskObjectStore,
  33. MemoryObjectStore,
  34. ObjectStoreGraphWalker,
  35. OverlayObjectStore,
  36. commit_tree_changes,
  37. read_packs_file,
  38. tree_lookup_path,
  39. )
  40. from dulwich.objects import (
  41. S_IFGITLINK,
  42. Blob,
  43. EmptyFileException,
  44. SubmoduleEncountered,
  45. Tree,
  46. TreeEntry,
  47. sha_to_hex,
  48. )
  49. from dulwich.pack import REF_DELTA, write_pack_objects
  50. from dulwich.tests.test_object_store import ObjectStoreTests, PackBasedObjectStoreTests
  51. from dulwich.tests.utils import build_pack, make_object
  52. from . import TestCase
# Shared blob fixture reused by several DiskObjectStoreTests cases below.
testobject = make_object(Blob, data=b"yummy data")
  54. class OverlayObjectStoreTests(ObjectStoreTests, TestCase):
  55. def setUp(self) -> None:
  56. TestCase.setUp(self)
  57. self.bases = [MemoryObjectStore(), MemoryObjectStore()]
  58. self.store = OverlayObjectStore(self.bases, self.bases[0])
  59. class MemoryObjectStoreTests(ObjectStoreTests, TestCase):
  60. def setUp(self) -> None:
  61. TestCase.setUp(self)
  62. self.store = MemoryObjectStore()
  63. def test_add_pack(self) -> None:
  64. o = MemoryObjectStore()
  65. f, commit, abort = o.add_pack()
  66. try:
  67. b = make_object(Blob, data=b"more yummy data")
  68. write_pack_objects(f.write, [(b, None)])
  69. except BaseException:
  70. abort()
  71. raise
  72. else:
  73. commit()
  74. def test_add_pack_emtpy(self) -> None:
  75. o = MemoryObjectStore()
  76. f, commit, abort = o.add_pack()
  77. commit()
  78. def test_add_thin_pack(self) -> None:
  79. o = MemoryObjectStore()
  80. blob = make_object(Blob, data=b"yummy data")
  81. o.add_object(blob)
  82. f = BytesIO()
  83. entries = build_pack(
  84. f,
  85. [
  86. (REF_DELTA, (blob.id, b"more yummy data")),
  87. ],
  88. store=o,
  89. )
  90. o.add_thin_pack(f.read, None)
  91. packed_blob_sha = sha_to_hex(entries[0][3])
  92. self.assertEqual(
  93. (Blob.type_num, b"more yummy data"), o.get_raw(packed_blob_sha)
  94. )
  95. def test_add_thin_pack_empty(self) -> None:
  96. o = MemoryObjectStore()
  97. f = BytesIO()
  98. entries = build_pack(f, [], store=o)
  99. self.assertEqual([], entries)
  100. o.add_thin_pack(f.read, None)
  101. def test_add_pack_data_with_deltas(self) -> None:
  102. """Test that add_pack_data properly handles delta objects.
  103. This test verifies that MemoryObjectStore.add_pack_data can handle
  104. pack data containing delta objects. Before the fix for issue #1179,
  105. this would fail with AssertionError when trying to call sha_file()
  106. on unresolved delta objects.
  107. The fix routes through add_pack() which properly resolves deltas.
  108. """
  109. o1 = MemoryObjectStore()
  110. o2 = MemoryObjectStore()
  111. base_blob = make_object(Blob, data=b"base data")
  112. o1.add_object(base_blob)
  113. # Create a pack with a delta object
  114. f = BytesIO()
  115. entries = build_pack(
  116. f,
  117. [
  118. (REF_DELTA, (base_blob.id, b"more data")),
  119. ],
  120. store=o1,
  121. )
  122. # Use add_thin_pack which internally calls add_pack_data
  123. # This demonstrates the scenario where delta resolution is needed
  124. f.seek(0)
  125. o2.add_object(base_blob) # Need base object for thin pack
  126. o2.add_thin_pack(f.read, None)
  127. # Verify the delta object was properly resolved and added
  128. packed_blob_sha = sha_to_hex(entries[0][3])
  129. self.assertIn(packed_blob_sha, o2)
  130. self.assertEqual((Blob.type_num, b"more data"), o2.get_raw(packed_blob_sha))
  131. class DiskObjectStoreTests(PackBasedObjectStoreTests, TestCase):
  132. def setUp(self) -> None:
  133. TestCase.setUp(self)
  134. self.store_dir = tempfile.mkdtemp()
  135. self.addCleanup(shutil.rmtree, self.store_dir)
  136. self.store = DiskObjectStore.init(self.store_dir)
  137. def tearDown(self) -> None:
  138. TestCase.tearDown(self)
  139. PackBasedObjectStoreTests.tearDown(self)
  140. def test_loose_compression_level(self) -> None:
  141. alternate_dir = tempfile.mkdtemp()
  142. self.addCleanup(shutil.rmtree, alternate_dir)
  143. alternate_store = DiskObjectStore(alternate_dir, loose_compression_level=6)
  144. b2 = make_object(Blob, data=b"yummy data")
  145. alternate_store.add_object(b2)
  146. def test_alternates(self) -> None:
  147. alternate_dir = tempfile.mkdtemp()
  148. self.addCleanup(shutil.rmtree, alternate_dir)
  149. alternate_store = DiskObjectStore(alternate_dir)
  150. b2 = make_object(Blob, data=b"yummy data")
  151. alternate_store.add_object(b2)
  152. store = DiskObjectStore(self.store_dir)
  153. self.assertRaises(KeyError, store.__getitem__, b2.id)
  154. store.add_alternate_path(alternate_dir)
  155. self.assertIn(b2.id, store)
  156. self.assertEqual(b2, store[b2.id])
  157. def test_read_alternate_paths(self) -> None:
  158. store = DiskObjectStore(self.store_dir)
  159. abs_path = os.path.abspath(os.path.normpath("/abspath"))
  160. # ensures in particular existence of the alternates file
  161. store.add_alternate_path(abs_path)
  162. self.assertEqual(set(store._read_alternate_paths()), {abs_path})
  163. store.add_alternate_path("relative-path")
  164. self.assertIn(
  165. os.path.join(store.path, "relative-path"),
  166. set(store._read_alternate_paths()),
  167. )
  168. # arguably, add_alternate_path() could strip comments.
  169. # Meanwhile it's more convenient to use it than to import INFODIR
  170. store.add_alternate_path("# comment")
  171. for alt_path in store._read_alternate_paths():
  172. self.assertNotIn("#", alt_path)
  173. def test_file_modes(self) -> None:
  174. self.store.add_object(testobject)
  175. path = self.store._get_shafile_path(testobject.id)
  176. mode = os.stat(path).st_mode
  177. packmode = "0o100444" if sys.platform != "win32" else "0o100666"
  178. self.assertEqual(oct(mode), packmode)
  179. def test_corrupted_object_raise_exception(self) -> None:
  180. """Corrupted sha1 disk file should raise specific exception."""
  181. self.store.add_object(testobject)
  182. self.assertEqual(
  183. (Blob.type_num, b"yummy data"), self.store.get_raw(testobject.id)
  184. )
  185. self.assertTrue(self.store.contains_loose(testobject.id))
  186. self.assertIsNotNone(self.store._get_loose_object(testobject.id))
  187. path = self.store._get_shafile_path(testobject.id)
  188. old_mode = os.stat(path).st_mode
  189. os.chmod(path, 0o600)
  190. with open(path, "wb") as f: # corrupt the file
  191. f.write(b"")
  192. os.chmod(path, old_mode)
  193. expected_error_msg = "Corrupted empty file detected"
  194. try:
  195. self.store.contains_loose(testobject.id)
  196. except EmptyFileException as e:
  197. self.assertEqual(str(e), expected_error_msg)
  198. try:
  199. self.store._get_loose_object(testobject.id)
  200. except EmptyFileException as e:
  201. self.assertEqual(str(e), expected_error_msg)
  202. # this does not change iteration on loose objects though
  203. self.assertEqual([testobject.id], list(self.store._iter_loose_objects()))
  204. def test_tempfile_in_loose_store(self) -> None:
  205. self.store.add_object(testobject)
  206. self.assertEqual([testobject.id], list(self.store._iter_loose_objects()))
  207. # add temporary files to the loose store
  208. for i in range(256):
  209. dirname = os.path.join(self.store_dir, f"{i:02x}")
  210. if not os.path.isdir(dirname):
  211. os.makedirs(dirname)
  212. fd, n = tempfile.mkstemp(prefix="tmp_obj_", dir=dirname)
  213. os.close(fd)
  214. self.assertEqual([testobject.id], list(self.store._iter_loose_objects()))
  215. def test_add_alternate_path(self) -> None:
  216. store = DiskObjectStore(self.store_dir)
  217. self.assertEqual([], list(store._read_alternate_paths()))
  218. store.add_alternate_path(os.path.abspath("/foo/path"))
  219. self.assertEqual(
  220. [os.path.abspath("/foo/path")], list(store._read_alternate_paths())
  221. )
  222. if sys.platform == "win32":
  223. store.add_alternate_path("D:\\bar\\path")
  224. else:
  225. store.add_alternate_path("/bar/path")
  226. if sys.platform == "win32":
  227. self.assertEqual(
  228. [os.path.abspath("/foo/path"), "D:\\bar\\path"],
  229. list(store._read_alternate_paths()),
  230. )
  231. else:
  232. self.assertEqual(
  233. [os.path.abspath("/foo/path"), "/bar/path"],
  234. list(store._read_alternate_paths()),
  235. )
  236. def test_rel_alternative_path(self) -> None:
  237. alternate_dir = tempfile.mkdtemp()
  238. self.addCleanup(shutil.rmtree, alternate_dir)
  239. alternate_store = DiskObjectStore(alternate_dir)
  240. b2 = make_object(Blob, data=b"yummy data")
  241. alternate_store.add_object(b2)
  242. store = DiskObjectStore(self.store_dir)
  243. self.assertRaises(KeyError, store.__getitem__, b2.id)
  244. store.add_alternate_path(os.path.relpath(alternate_dir, self.store_dir))
  245. self.assertEqual(list(alternate_store), list(store.alternates[0]))
  246. self.assertIn(b2.id, store)
  247. self.assertEqual(b2, store[b2.id])
  248. def test_pack_dir(self) -> None:
  249. o = DiskObjectStore(self.store_dir)
  250. self.assertEqual(os.path.join(self.store_dir, "pack"), o.pack_dir)
  251. def test_add_pack(self) -> None:
  252. o = DiskObjectStore(self.store_dir)
  253. self.addCleanup(o.close)
  254. f, commit, abort = o.add_pack()
  255. try:
  256. b = make_object(Blob, data=b"more yummy data")
  257. write_pack_objects(f.write, [(b, None)])
  258. except BaseException:
  259. abort()
  260. raise
  261. else:
  262. commit()
  263. def test_add_thin_pack(self) -> None:
  264. o = DiskObjectStore(self.store_dir)
  265. self.addCleanup(o.close)
  266. blob = make_object(Blob, data=b"yummy data")
  267. o.add_object(blob)
  268. f = BytesIO()
  269. entries = build_pack(
  270. f,
  271. [
  272. (REF_DELTA, (blob.id, b"more yummy data")),
  273. ],
  274. store=o,
  275. )
  276. with o.add_thin_pack(f.read, None) as pack:
  277. packed_blob_sha = sha_to_hex(entries[0][3])
  278. pack.check_length_and_checksum()
  279. self.assertEqual(sorted([blob.id, packed_blob_sha]), list(pack))
  280. self.assertTrue(o.contains_packed(packed_blob_sha))
  281. self.assertTrue(o.contains_packed(blob.id))
  282. self.assertEqual(
  283. (Blob.type_num, b"more yummy data"),
  284. o.get_raw(packed_blob_sha),
  285. )
  286. def test_add_thin_pack_empty(self) -> None:
  287. with closing(DiskObjectStore(self.store_dir)) as o:
  288. f = BytesIO()
  289. entries = build_pack(f, [], store=o)
  290. self.assertEqual([], entries)
  291. o.add_thin_pack(f.read, None)
  292. def test_pack_index_version_config(self) -> None:
  293. # Test that pack.indexVersion configuration is respected
  294. from dulwich.config import ConfigDict
  295. from dulwich.pack import load_pack_index
  296. # Create config with pack.indexVersion = 1
  297. config = ConfigDict()
  298. config[(b"pack",)] = {b"indexVersion": b"1"}
  299. # Create object store with config
  300. store_dir = tempfile.mkdtemp()
  301. self.addCleanup(shutil.rmtree, store_dir)
  302. os.makedirs(os.path.join(store_dir, "pack"))
  303. store = DiskObjectStore.from_config(store_dir, config)
  304. self.addCleanup(store.close)
  305. # Create some objects to pack
  306. b1 = make_object(Blob, data=b"blob1")
  307. b2 = make_object(Blob, data=b"blob2")
  308. store.add_objects([(b1, None), (b2, None)])
  309. # Add a pack
  310. f, commit, abort = store.add_pack()
  311. try:
  312. # build_pack expects (type_num, data) tuples
  313. objects_spec = [
  314. (b1.type_num, b1.as_raw_string()),
  315. (b2.type_num, b2.as_raw_string()),
  316. ]
  317. build_pack(f, objects_spec, store=store)
  318. commit()
  319. except:
  320. abort()
  321. raise
  322. # Find the created pack index
  323. pack_dir = os.path.join(store_dir, "pack")
  324. idx_files = [f for f in os.listdir(pack_dir) if f.endswith(".idx")]
  325. self.assertEqual(1, len(idx_files))
  326. # Load and verify it's version 1
  327. idx_path = os.path.join(pack_dir, idx_files[0])
  328. idx = load_pack_index(idx_path)
  329. self.assertEqual(1, idx.version)
  330. # Test version 3
  331. config[(b"pack",)] = {b"indexVersion": b"3"}
  332. store_dir2 = tempfile.mkdtemp()
  333. self.addCleanup(shutil.rmtree, store_dir2)
  334. os.makedirs(os.path.join(store_dir2, "pack"))
  335. store2 = DiskObjectStore.from_config(store_dir2, config)
  336. self.addCleanup(store2.close)
  337. b3 = make_object(Blob, data=b"blob3")
  338. store2.add_objects([(b3, None)])
  339. f2, commit2, abort2 = store2.add_pack()
  340. try:
  341. objects_spec2 = [(b3.type_num, b3.as_raw_string())]
  342. build_pack(f2, objects_spec2, store=store2)
  343. commit2()
  344. except:
  345. abort2()
  346. raise
  347. # Find and verify version 3 index
  348. pack_dir2 = os.path.join(store_dir2, "pack")
  349. idx_files2 = [f for f in os.listdir(pack_dir2) if f.endswith(".idx")]
  350. self.assertEqual(1, len(idx_files2))
  351. idx_path2 = os.path.join(pack_dir2, idx_files2[0])
  352. idx2 = load_pack_index(idx_path2)
  353. self.assertEqual(3, idx2.version)
  354. def test_prune_orphaned_tempfiles(self) -> None:
  355. import time
  356. # Create an orphaned temporary pack file in the repository directory
  357. tmp_pack_path = os.path.join(self.store_dir, "tmp_pack_test123")
  358. with open(tmp_pack_path, "wb") as f:
  359. f.write(b"temporary pack data")
  360. # Create an orphaned .pack file without .idx in pack directory
  361. pack_dir = os.path.join(self.store_dir, "pack")
  362. orphaned_pack_path = os.path.join(pack_dir, "pack-orphaned.pack")
  363. with open(orphaned_pack_path, "wb") as f:
  364. f.write(b"orphaned pack data")
  365. # Make files appear old by modifying mtime (older than grace period)
  366. from dulwich.object_store import DEFAULT_TEMPFILE_GRACE_PERIOD
  367. old_time = time.time() - (
  368. DEFAULT_TEMPFILE_GRACE_PERIOD + 3600
  369. ) # grace period + 1 hour
  370. os.utime(tmp_pack_path, (old_time, old_time))
  371. os.utime(orphaned_pack_path, (old_time, old_time))
  372. # Create a recent temporary file that should NOT be cleaned
  373. recent_tmp_path = os.path.join(self.store_dir, "tmp_pack_recent")
  374. with open(recent_tmp_path, "wb") as f:
  375. f.write(b"recent temp data")
  376. # Run prune
  377. self.store.prune()
  378. # Check that old orphaned files were removed
  379. self.assertFalse(os.path.exists(tmp_pack_path))
  380. self.assertFalse(os.path.exists(orphaned_pack_path))
  381. # Check that recent file was NOT removed
  382. self.assertTrue(os.path.exists(recent_tmp_path))
  383. # Cleanup the recent file
  384. os.remove(recent_tmp_path)
  385. def test_prune_with_custom_grace_period(self) -> None:
  386. """Test that prune respects custom grace period."""
  387. import time
  388. # Create a temporary file that's 1 hour old
  389. tmp_pack_path = os.path.join(self.store_dir, "tmp_pack_1hour")
  390. with open(tmp_pack_path, "wb") as f:
  391. f.write(b"1 hour old data")
  392. # Make it 1 hour old
  393. old_time = time.time() - 3600 # 1 hour ago
  394. os.utime(tmp_pack_path, (old_time, old_time))
  395. # Prune with default grace period (2 weeks) - should NOT remove
  396. self.store.prune()
  397. self.assertTrue(os.path.exists(tmp_pack_path))
  398. # Prune with 30 minute grace period - should remove
  399. self.store.prune(grace_period=1800) # 30 minutes
  400. self.assertFalse(os.path.exists(tmp_pack_path))
  401. def test_gc_prunes_tempfiles(self) -> None:
  402. """Test that garbage collection prunes temporary files."""
  403. import time
  404. from dulwich.gc import garbage_collect
  405. from dulwich.repo import Repo
  406. # Create a repository with the store
  407. repo = Repo.init(self.store_dir)
  408. # Create an old orphaned temporary file in the objects directory
  409. tmp_pack_path = os.path.join(repo.object_store.path, "tmp_pack_old")
  410. with open(tmp_pack_path, "wb") as f:
  411. f.write(b"old temporary data")
  412. # Make it old (older than grace period)
  413. from dulwich.object_store import DEFAULT_TEMPFILE_GRACE_PERIOD
  414. old_time = time.time() - (
  415. DEFAULT_TEMPFILE_GRACE_PERIOD + 3600
  416. ) # grace period + 1 hour
  417. os.utime(tmp_pack_path, (old_time, old_time))
  418. # Run garbage collection
  419. garbage_collect(repo)
  420. # Verify the orphaned file was cleaned up
  421. self.assertFalse(os.path.exists(tmp_pack_path))
  422. class TreeLookupPathTests(TestCase):
  423. def setUp(self) -> None:
  424. TestCase.setUp(self)
  425. self.store = MemoryObjectStore()
  426. blob_a = make_object(Blob, data=b"a")
  427. blob_b = make_object(Blob, data=b"b")
  428. blob_c = make_object(Blob, data=b"c")
  429. for blob in [blob_a, blob_b, blob_c]:
  430. self.store.add_object(blob)
  431. blobs = [
  432. (b"a", blob_a.id, 0o100644),
  433. (b"ad/b", blob_b.id, 0o100644),
  434. (b"ad/bd/c", blob_c.id, 0o100755),
  435. (b"ad/c", blob_c.id, 0o100644),
  436. (b"c", blob_c.id, 0o100644),
  437. (b"d", blob_c.id, S_IFGITLINK),
  438. ]
  439. self.tree_id = commit_tree(self.store, blobs)
  440. def get_object(self, sha):
  441. return self.store[sha]
  442. def test_lookup_blob(self) -> None:
  443. o_id = tree_lookup_path(self.get_object, self.tree_id, b"a")[1]
  444. self.assertIsInstance(self.store[o_id], Blob)
  445. def test_lookup_tree(self) -> None:
  446. o_id = tree_lookup_path(self.get_object, self.tree_id, b"ad")[1]
  447. self.assertIsInstance(self.store[o_id], Tree)
  448. o_id = tree_lookup_path(self.get_object, self.tree_id, b"ad/bd")[1]
  449. self.assertIsInstance(self.store[o_id], Tree)
  450. o_id = tree_lookup_path(self.get_object, self.tree_id, b"ad/bd/")[1]
  451. self.assertIsInstance(self.store[o_id], Tree)
  452. def test_lookup_submodule(self) -> None:
  453. tree_lookup_path(self.get_object, self.tree_id, b"d")[1]
  454. self.assertRaises(
  455. SubmoduleEncountered,
  456. tree_lookup_path,
  457. self.get_object,
  458. self.tree_id,
  459. b"d/a",
  460. )
  461. def test_lookup_nonexistent(self) -> None:
  462. self.assertRaises(
  463. KeyError, tree_lookup_path, self.get_object, self.tree_id, b"j"
  464. )
  465. def test_lookup_not_tree(self) -> None:
  466. self.assertRaises(
  467. NotTreeError,
  468. tree_lookup_path,
  469. self.get_object,
  470. self.tree_id,
  471. b"ad/b/j",
  472. )
class ObjectStoreGraphWalkerTests(TestCase):
    """Tests for ObjectStoreGraphWalker traversal order and ack handling."""

    def get_walker(self, heads, parent_map):
        # Expand single-character shorthand ids into full 40-byte shas so the
        # walker sees valid-length identifiers.
        new_parent_map = {
            k * 40: [(p * 40) for p in ps] for (k, ps) in parent_map.items()
        }
        return ObjectStoreGraphWalker(
            [x * 40 for x in heads], new_parent_map.__getitem__
        )

    def test_ack_invalid_value(self) -> None:
        # Acking something shorter than a full sha is rejected.
        gw = self.get_walker([], {})
        self.assertRaises(ValueError, gw.ack, "tooshort")

    def test_empty(self) -> None:
        # An empty walker is immediately exhausted; acking afterwards is a no-op.
        gw = self.get_walker([], {})
        self.assertIs(None, next(gw))
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_descends(self) -> None:
        # The walker yields a head, then its parent.
        gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
        self.assertEqual(b"a" * 40, next(gw))
        self.assertEqual(b"b" * 40, next(gw))

    def test_present(self) -> None:
        # Acking the head before walking stops traversal entirely.
        gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_parent_present(self) -> None:
        # Acking after the head was yielded prunes its ancestry.
        gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
        self.assertEqual(b"a" * 40, next(gw))
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_child_ack_later(self) -> None:
        # A late ack of "a" prunes "c" even though "b" was already yielded.
        gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": [b"c"], b"c": []})
        self.assertEqual(b"a" * 40, next(gw))
        self.assertEqual(b"b" * 40, next(gw))
        gw.ack(b"a" * 40)
        self.assertIs(None, next(gw))

    def test_only_once(self) -> None:
        # Diamond-shaped history; "e" is shared by both branches and must
        # never be yielded once either branch is acked:
        # a b
        # | |
        # c d
        # \ /
        # e
        gw = self.get_walker(
            [b"a", b"b"],
            {
                b"a": [b"c"],
                b"b": [b"d"],
                b"c": [b"e"],
                b"d": [b"e"],
                b"e": [],
            },
        )
        walk = []
        acked = False
        walk.append(next(gw))
        walk.append(next(gw))
        # A branch (a, c) or (b, d) may be done after 2 steps or 3 depending on
        # the order walked: 3-step walks include (a, b, c) and (b, a, d), etc.
        if walk == [b"a" * 40, b"c" * 40] or walk == [b"b" * 40, b"d" * 40]:
            gw.ack(walk[0])
            acked = True
        walk.append(next(gw))
        # If we haven't acked yet, ack whichever branch just completed.
        if not acked and walk[2] == b"c" * 40:
            gw.ack(b"a" * 40)
        elif not acked and walk[2] == b"d" * 40:
            gw.ack(b"b" * 40)
        walk.append(next(gw))
        # Exactly a, b, c, d were yielded (no "e"), each child before its parent.
        self.assertIs(None, next(gw))
        self.assertEqual([b"a" * 40, b"b" * 40, b"c" * 40, b"d" * 40], sorted(walk))
        self.assertLess(walk.index(b"a" * 40), walk.index(b"c" * 40))
        self.assertLess(walk.index(b"b" * 40), walk.index(b"d" * 40))
  543. class CommitTreeChangesTests(TestCase):
  544. def setUp(self) -> None:
  545. super().setUp()
  546. self.store = MemoryObjectStore()
  547. self.blob_a = make_object(Blob, data=b"a")
  548. self.blob_b = make_object(Blob, data=b"b")
  549. self.blob_c = make_object(Blob, data=b"c")
  550. for blob in [self.blob_a, self.blob_b, self.blob_c]:
  551. self.store.add_object(blob)
  552. blobs = [
  553. (b"a", self.blob_a.id, 0o100644),
  554. (b"ad/b", self.blob_b.id, 0o100644),
  555. (b"ad/bd/c", self.blob_c.id, 0o100755),
  556. (b"ad/c", self.blob_c.id, 0o100644),
  557. (b"c", self.blob_c.id, 0o100644),
  558. ]
  559. self.tree_id = commit_tree(self.store, blobs)
  560. def test_no_changes(self) -> None:
  561. self.assertEqual(
  562. self.store[self.tree_id],
  563. commit_tree_changes(self.store, self.store[self.tree_id], []),
  564. )
  565. def test_add_blob(self) -> None:
  566. blob_d = make_object(Blob, data=b"d")
  567. new_tree = commit_tree_changes(
  568. self.store, self.store[self.tree_id], [(b"d", 0o100644, blob_d.id)]
  569. )
  570. self.assertEqual(
  571. new_tree[b"d"],
  572. (33188, b"c59d9b6344f1af00e504ba698129f07a34bbed8d"),
  573. )
  574. def test_add_blob_in_dir(self) -> None:
  575. blob_d = make_object(Blob, data=b"d")
  576. new_tree = commit_tree_changes(
  577. self.store,
  578. self.store[self.tree_id],
  579. [(b"e/f/d", 0o100644, blob_d.id)],
  580. )
  581. self.assertEqual(
  582. new_tree.items(),
  583. [
  584. TreeEntry(path=b"a", mode=stat.S_IFREG | 0o100644, sha=self.blob_a.id),
  585. TreeEntry(
  586. path=b"ad",
  587. mode=stat.S_IFDIR,
  588. sha=b"0e2ce2cd7725ff4817791be31ccd6e627e801f4a",
  589. ),
  590. TreeEntry(path=b"c", mode=stat.S_IFREG | 0o100644, sha=self.blob_c.id),
  591. TreeEntry(
  592. path=b"e",
  593. mode=stat.S_IFDIR,
  594. sha=b"6ab344e288724ac2fb38704728b8896e367ed108",
  595. ),
  596. ],
  597. )
  598. e_tree = self.store[new_tree[b"e"][1]]
  599. self.assertEqual(
  600. e_tree.items(),
  601. [
  602. TreeEntry(
  603. path=b"f",
  604. mode=stat.S_IFDIR,
  605. sha=b"24d2c94d8af232b15a0978c006bf61ef4479a0a5",
  606. )
  607. ],
  608. )
  609. f_tree = self.store[e_tree[b"f"][1]]
  610. self.assertEqual(
  611. f_tree.items(),
  612. [TreeEntry(path=b"d", mode=stat.S_IFREG | 0o100644, sha=blob_d.id)],
  613. )
  614. def test_delete_blob(self) -> None:
  615. new_tree = commit_tree_changes(
  616. self.store, self.store[self.tree_id], [(b"ad/bd/c", None, None)]
  617. )
  618. self.assertEqual(set(new_tree), {b"a", b"ad", b"c"})
  619. ad_tree = self.store[new_tree[b"ad"][1]]
  620. self.assertEqual(set(ad_tree), {b"b", b"c"})
  621. class TestReadPacksFile(TestCase):
  622. def test_read_packs(self) -> None:
  623. self.assertEqual(
  624. ["pack-1.pack"],
  625. list(
  626. read_packs_file(
  627. BytesIO(
  628. b"""P pack-1.pack
  629. """
  630. )
  631. )
  632. ),
  633. )