test_object_store.py 27 KB


  1. # test_object_store.py -- tests for object_store.py
  2. # Copyright (C) 2008 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Tests for the object store interface."""
  21. from contextlib import closing
  22. from io import BytesIO
  23. from unittest import skipUnless
  24. import os
  25. import shutil
  26. import stat
  27. import sys
  28. import tempfile
  29. from dulwich.index import (
  30. commit_tree,
  31. )
  32. from dulwich.errors import (
  33. NotTreeError,
  34. )
  35. from dulwich.objects import (
  36. sha_to_hex,
  37. Blob,
  38. Tree,
  39. TreeEntry,
  40. EmptyFileException,
  41. SubmoduleEncountered,
  42. S_IFGITLINK,
  43. )
  44. from dulwich.object_store import (
  45. DiskObjectStore,
  46. MemoryObjectStore,
  47. OverlayObjectStore,
  48. ObjectStoreGraphWalker,
  49. commit_tree_changes,
  50. iter_tree_contents,
  51. peel_sha,
  52. read_packs_file,
  53. tree_lookup_path,
  54. )
  55. from dulwich.pack import (
  56. REF_DELTA,
  57. write_pack_objects,
  58. )
  59. from dulwich.protocol import DEPTH_INFINITE
  60. from dulwich.tests import (
  61. TestCase,
  62. )
  63. from dulwich.tests.utils import (
  64. make_object,
  65. make_tag,
  66. build_pack,
  67. )
# ``patch`` comes from unittest.mock; if that import fails it is set to None
# so that tests decorated with ``skipUnless(patch, ...)`` are skipped.
try:
    from unittest.mock import patch
except ImportError:
    patch = None  # type: ignore

# Module-level blob fixture shared by the store tests in this file.
testobject = make_object(Blob, data=b"yummy data")
  73. class ObjectStoreTests:
  74. def test_determine_wants_all(self):
  75. self.assertEqual(
  76. [b"1" * 40],
  77. self.store.determine_wants_all({b"refs/heads/foo": b"1" * 40}),
  78. )
  79. def test_determine_wants_all_zero(self):
  80. self.assertEqual(
  81. [], self.store.determine_wants_all({b"refs/heads/foo": b"0" * 40})
  82. )
  83. @skipUnless(patch, "Required mock.patch")
  84. def test_determine_wants_all_depth(self):
  85. self.store.add_object(testobject)
  86. refs = {b"refs/heads/foo": testobject.id}
  87. with patch.object(self.store, "_get_depth", return_value=1) as m:
  88. self.assertEqual(
  89. [], self.store.determine_wants_all(refs, depth=0)
  90. )
  91. self.assertEqual(
  92. [testobject.id],
  93. self.store.determine_wants_all(refs, depth=DEPTH_INFINITE),
  94. )
  95. m.assert_not_called()
  96. self.assertEqual(
  97. [], self.store.determine_wants_all(refs, depth=1)
  98. )
  99. m.assert_called_with(testobject.id)
  100. self.assertEqual(
  101. [testobject.id], self.store.determine_wants_all(refs, depth=2)
  102. )
  103. def test_get_depth(self):
  104. self.assertEqual(
  105. 0, self.store._get_depth(testobject.id)
  106. )
  107. self.store.add_object(testobject)
  108. self.assertEqual(
  109. 1, self.store._get_depth(testobject.id, get_parents=lambda x: [])
  110. )
  111. parent = make_object(Blob, data=b"parent data")
  112. self.store.add_object(parent)
  113. self.assertEqual(
  114. 2,
  115. self.store._get_depth(
  116. testobject.id,
  117. get_parents=lambda x: [parent.id] if x == testobject else [],
  118. ),
  119. )
  120. def test_iter(self):
  121. self.assertEqual([], list(self.store))
  122. def test_get_nonexistant(self):
  123. self.assertRaises(KeyError, lambda: self.store[b"a" * 40])
  124. def test_contains_nonexistant(self):
  125. self.assertNotIn(b"a" * 40, self.store)
  126. def test_add_objects_empty(self):
  127. self.store.add_objects([])
  128. def test_add_commit(self):
  129. # TODO: Argh, no way to construct Git commit objects without
  130. # access to a serialized form.
  131. self.store.add_objects([])
  132. def test_store_resilience(self):
  133. """Test if updating an existing stored object doesn't erase the
  134. object from the store.
  135. """
  136. test_object = make_object(Blob, data=b"data")
  137. self.store.add_object(test_object)
  138. test_object_id = test_object.id
  139. test_object.data = test_object.data + b"update"
  140. stored_test_object = self.store[test_object_id]
  141. self.assertNotEqual(test_object.id, stored_test_object.id)
  142. self.assertEqual(stored_test_object.id, test_object_id)
  143. def test_add_object(self):
  144. self.store.add_object(testobject)
  145. self.assertEqual({testobject.id}, set(self.store))
  146. self.assertIn(testobject.id, self.store)
  147. r = self.store[testobject.id]
  148. self.assertEqual(r, testobject)
  149. def test_add_objects(self):
  150. data = [(testobject, "mypath")]
  151. self.store.add_objects(data)
  152. self.assertEqual({testobject.id}, set(self.store))
  153. self.assertIn(testobject.id, self.store)
  154. r = self.store[testobject.id]
  155. self.assertEqual(r, testobject)
  156. def test_tree_changes(self):
  157. blob_a1 = make_object(Blob, data=b"a1")
  158. blob_a2 = make_object(Blob, data=b"a2")
  159. blob_b = make_object(Blob, data=b"b")
  160. for blob in [blob_a1, blob_a2, blob_b]:
  161. self.store.add_object(blob)
  162. blobs_1 = [(b"a", blob_a1.id, 0o100644), (b"b", blob_b.id, 0o100644)]
  163. tree1_id = commit_tree(self.store, blobs_1)
  164. blobs_2 = [(b"a", blob_a2.id, 0o100644), (b"b", blob_b.id, 0o100644)]
  165. tree2_id = commit_tree(self.store, blobs_2)
  166. change_a = (
  167. (b"a", b"a"),
  168. (0o100644, 0o100644),
  169. (blob_a1.id, blob_a2.id),
  170. )
  171. self.assertEqual([change_a], list(self.store.tree_changes(tree1_id, tree2_id)))
  172. self.assertEqual(
  173. [
  174. change_a,
  175. ((b"b", b"b"), (0o100644, 0o100644), (blob_b.id, blob_b.id)),
  176. ],
  177. list(self.store.tree_changes(tree1_id, tree2_id, want_unchanged=True)),
  178. )
  179. def test_iter_tree_contents(self):
  180. blob_a = make_object(Blob, data=b"a")
  181. blob_b = make_object(Blob, data=b"b")
  182. blob_c = make_object(Blob, data=b"c")
  183. for blob in [blob_a, blob_b, blob_c]:
  184. self.store.add_object(blob)
  185. blobs = [
  186. (b"a", blob_a.id, 0o100644),
  187. (b"ad/b", blob_b.id, 0o100644),
  188. (b"ad/bd/c", blob_c.id, 0o100755),
  189. (b"ad/c", blob_c.id, 0o100644),
  190. (b"c", blob_c.id, 0o100644),
  191. ]
  192. tree_id = commit_tree(self.store, blobs)
  193. self.assertEqual(
  194. [TreeEntry(p, m, h) for (p, h, m) in blobs],
  195. list(iter_tree_contents(self.store, tree_id)),
  196. )
  197. def test_iter_tree_contents_include_trees(self):
  198. blob_a = make_object(Blob, data=b"a")
  199. blob_b = make_object(Blob, data=b"b")
  200. blob_c = make_object(Blob, data=b"c")
  201. for blob in [blob_a, blob_b, blob_c]:
  202. self.store.add_object(blob)
  203. blobs = [
  204. (b"a", blob_a.id, 0o100644),
  205. (b"ad/b", blob_b.id, 0o100644),
  206. (b"ad/bd/c", blob_c.id, 0o100755),
  207. ]
  208. tree_id = commit_tree(self.store, blobs)
  209. tree = self.store[tree_id]
  210. tree_ad = self.store[tree[b"ad"][1]]
  211. tree_bd = self.store[tree_ad[b"bd"][1]]
  212. expected = [
  213. TreeEntry(b"", 0o040000, tree_id),
  214. TreeEntry(b"a", 0o100644, blob_a.id),
  215. TreeEntry(b"ad", 0o040000, tree_ad.id),
  216. TreeEntry(b"ad/b", 0o100644, blob_b.id),
  217. TreeEntry(b"ad/bd", 0o040000, tree_bd.id),
  218. TreeEntry(b"ad/bd/c", 0o100755, blob_c.id),
  219. ]
  220. actual = iter_tree_contents(self.store, tree_id, include_trees=True)
  221. self.assertEqual(expected, list(actual))
  222. def make_tag(self, name, obj):
  223. tag = make_tag(obj, name=name)
  224. self.store.add_object(tag)
  225. return tag
  226. def test_peel_sha(self):
  227. self.store.add_object(testobject)
  228. tag1 = self.make_tag(b"1", testobject)
  229. tag2 = self.make_tag(b"2", testobject)
  230. tag3 = self.make_tag(b"3", testobject)
  231. for obj in [testobject, tag1, tag2, tag3]:
  232. self.assertEqual(testobject, peel_sha(self.store, obj.id))
  233. def test_get_raw(self):
  234. self.store.add_object(testobject)
  235. self.assertEqual(
  236. (Blob.type_num, b"yummy data"), self.store.get_raw(testobject.id)
  237. )
  238. def test_close(self):
  239. # For now, just check that close doesn't barf.
  240. self.store.add_object(testobject)
  241. self.store.close()
  242. class OverlayObjectStoreTests(ObjectStoreTests, TestCase):
  243. def setUp(self):
  244. TestCase.setUp(self)
  245. self.bases = [MemoryObjectStore(), MemoryObjectStore()]
  246. self.store = OverlayObjectStore(self.bases, self.bases[0])
  247. class MemoryObjectStoreTests(ObjectStoreTests, TestCase):
  248. def setUp(self):
  249. TestCase.setUp(self)
  250. self.store = MemoryObjectStore()
  251. def test_add_pack(self):
  252. o = MemoryObjectStore()
  253. f, commit, abort = o.add_pack()
  254. try:
  255. b = make_object(Blob, data=b"more yummy data")
  256. write_pack_objects(f.write, [(b, None)])
  257. except BaseException:
  258. abort()
  259. raise
  260. else:
  261. commit()
  262. def test_add_pack_emtpy(self):
  263. o = MemoryObjectStore()
  264. f, commit, abort = o.add_pack()
  265. commit()
  266. def test_add_thin_pack(self):
  267. o = MemoryObjectStore()
  268. blob = make_object(Blob, data=b"yummy data")
  269. o.add_object(blob)
  270. f = BytesIO()
  271. entries = build_pack(
  272. f,
  273. [
  274. (REF_DELTA, (blob.id, b"more yummy data")),
  275. ],
  276. store=o,
  277. )
  278. o.add_thin_pack(f.read, None)
  279. packed_blob_sha = sha_to_hex(entries[0][3])
  280. self.assertEqual(
  281. (Blob.type_num, b"more yummy data"), o.get_raw(packed_blob_sha)
  282. )
  283. def test_add_thin_pack_empty(self):
  284. o = MemoryObjectStore()
  285. f = BytesIO()
  286. entries = build_pack(f, [], store=o)
  287. self.assertEqual([], entries)
  288. o.add_thin_pack(f.read, None)
  289. class PackBasedObjectStoreTests(ObjectStoreTests):
  290. def tearDown(self):
  291. for pack in self.store.packs:
  292. pack.close()
  293. def test_empty_packs(self):
  294. self.assertEqual([], list(self.store.packs))
  295. def test_pack_loose_objects(self):
  296. b1 = make_object(Blob, data=b"yummy data")
  297. self.store.add_object(b1)
  298. b2 = make_object(Blob, data=b"more yummy data")
  299. self.store.add_object(b2)
  300. b3 = make_object(Blob, data=b"even more yummy data")
  301. b4 = make_object(Blob, data=b"and more yummy data")
  302. self.store.add_objects([(b3, None), (b4, None)])
  303. self.assertEqual({b1.id, b2.id, b3.id, b4.id}, set(self.store))
  304. self.assertEqual(1, len(self.store.packs))
  305. self.assertEqual(2, self.store.pack_loose_objects())
  306. self.assertNotEqual([], list(self.store.packs))
  307. self.assertEqual(0, self.store.pack_loose_objects())
  308. def test_repack(self):
  309. b1 = make_object(Blob, data=b"yummy data")
  310. self.store.add_object(b1)
  311. b2 = make_object(Blob, data=b"more yummy data")
  312. self.store.add_object(b2)
  313. b3 = make_object(Blob, data=b"even more yummy data")
  314. b4 = make_object(Blob, data=b"and more yummy data")
  315. self.store.add_objects([(b3, None), (b4, None)])
  316. b5 = make_object(Blob, data=b"and more data")
  317. b6 = make_object(Blob, data=b"and some more data")
  318. self.store.add_objects([(b5, None), (b6, None)])
  319. self.assertEqual({b1.id, b2.id, b3.id, b4.id, b5.id, b6.id}, set(self.store))
  320. self.assertEqual(2, len(self.store.packs))
  321. self.assertEqual(6, self.store.repack())
  322. self.assertEqual(1, len(self.store.packs))
  323. self.assertEqual(0, self.store.pack_loose_objects())
  324. def test_repack_existing(self):
  325. b1 = make_object(Blob, data=b"yummy data")
  326. self.store.add_object(b1)
  327. b2 = make_object(Blob, data=b"more yummy data")
  328. self.store.add_object(b2)
  329. self.store.add_objects([(b1, None), (b2, None)])
  330. self.store.add_objects([(b2, None)])
  331. self.assertEqual({b1.id, b2.id}, set(self.store))
  332. self.assertEqual(2, len(self.store.packs))
  333. self.assertEqual(2, self.store.repack())
  334. self.assertEqual(1, len(self.store.packs))
  335. self.assertEqual(0, self.store.pack_loose_objects())
  336. self.assertEqual({b1.id, b2.id}, set(self.store))
  337. self.assertEqual(1, len(self.store.packs))
  338. self.assertEqual(2, self.store.repack())
  339. self.assertEqual(1, len(self.store.packs))
  340. self.assertEqual(0, self.store.pack_loose_objects())
  341. class DiskObjectStoreTests(PackBasedObjectStoreTests, TestCase):
  342. def setUp(self):
  343. TestCase.setUp(self)
  344. self.store_dir = tempfile.mkdtemp()
  345. self.addCleanup(shutil.rmtree, self.store_dir)
  346. self.store = DiskObjectStore.init(self.store_dir)
  347. def tearDown(self):
  348. TestCase.tearDown(self)
  349. PackBasedObjectStoreTests.tearDown(self)
  350. def test_loose_compression_level(self):
  351. alternate_dir = tempfile.mkdtemp()
  352. self.addCleanup(shutil.rmtree, alternate_dir)
  353. alternate_store = DiskObjectStore(alternate_dir, loose_compression_level=6)
  354. b2 = make_object(Blob, data=b"yummy data")
  355. alternate_store.add_object(b2)
  356. def test_alternates(self):
  357. alternate_dir = tempfile.mkdtemp()
  358. self.addCleanup(shutil.rmtree, alternate_dir)
  359. alternate_store = DiskObjectStore(alternate_dir)
  360. b2 = make_object(Blob, data=b"yummy data")
  361. alternate_store.add_object(b2)
  362. store = DiskObjectStore(self.store_dir)
  363. self.assertRaises(KeyError, store.__getitem__, b2.id)
  364. store.add_alternate_path(alternate_dir)
  365. self.assertIn(b2.id, store)
  366. self.assertEqual(b2, store[b2.id])
  367. def test_read_alternate_paths(self):
  368. store = DiskObjectStore(self.store_dir)
  369. abs_path = os.path.abspath(os.path.normpath("/abspath"))
  370. # ensures in particular existence of the alternates file
  371. store.add_alternate_path(abs_path)
  372. self.assertEqual(set(store._read_alternate_paths()), {abs_path})
  373. store.add_alternate_path("relative-path")
  374. self.assertIn(
  375. os.path.join(store.path, "relative-path"),
  376. set(store._read_alternate_paths()),
  377. )
  378. # arguably, add_alternate_path() could strip comments.
  379. # Meanwhile it's more convenient to use it than to import INFODIR
  380. store.add_alternate_path("# comment")
  381. for alt_path in store._read_alternate_paths():
  382. self.assertNotIn("#", alt_path)
  383. def test_file_modes(self):
  384. self.store.add_object(testobject)
  385. path = self.store._get_shafile_path(testobject.id)
  386. mode = os.stat(path).st_mode
  387. packmode = "0o100444" if sys.platform != "win32" else "0o100666"
  388. self.assertEqual(oct(mode), packmode)
  389. def test_corrupted_object_raise_exception(self):
  390. """Corrupted sha1 disk file should raise specific exception"""
  391. self.store.add_object(testobject)
  392. self.assertEqual(
  393. (Blob.type_num, b"yummy data"), self.store.get_raw(testobject.id)
  394. )
  395. self.assertTrue(self.store.contains_loose(testobject.id))
  396. self.assertIsNotNone(self.store._get_loose_object(testobject.id))
  397. path = self.store._get_shafile_path(testobject.id)
  398. old_mode = os.stat(path).st_mode
  399. os.chmod(path, 0o600)
  400. with open(path, "wb") as f: # corrupt the file
  401. f.write(b"")
  402. os.chmod(path, old_mode)
  403. expected_error_msg = "Corrupted empty file detected"
  404. try:
  405. self.store.contains_loose(testobject.id)
  406. except EmptyFileException as e:
  407. self.assertEqual(str(e), expected_error_msg)
  408. try:
  409. self.store._get_loose_object(testobject.id)
  410. except EmptyFileException as e:
  411. self.assertEqual(str(e), expected_error_msg)
  412. # this does not change iteration on loose objects though
  413. self.assertEqual([testobject.id], list(self.store._iter_loose_objects()))
  414. def test_tempfile_in_loose_store(self):
  415. self.store.add_object(testobject)
  416. self.assertEqual([testobject.id], list(self.store._iter_loose_objects()))
  417. # add temporary files to the loose store
  418. for i in range(256):
  419. dirname = os.path.join(self.store_dir, "%02x" % i)
  420. if not os.path.isdir(dirname):
  421. os.makedirs(dirname)
  422. fd, n = tempfile.mkstemp(prefix="tmp_obj_", dir=dirname)
  423. os.close(fd)
  424. self.assertEqual([testobject.id], list(self.store._iter_loose_objects()))
  425. def test_add_alternate_path(self):
  426. store = DiskObjectStore(self.store_dir)
  427. self.assertEqual([], list(store._read_alternate_paths()))
  428. store.add_alternate_path("/foo/path")
  429. self.assertEqual(["/foo/path"], list(store._read_alternate_paths()))
  430. store.add_alternate_path("/bar/path")
  431. self.assertEqual(
  432. ["/foo/path", "/bar/path"], list(store._read_alternate_paths())
  433. )
  434. def test_rel_alternative_path(self):
  435. alternate_dir = tempfile.mkdtemp()
  436. self.addCleanup(shutil.rmtree, alternate_dir)
  437. alternate_store = DiskObjectStore(alternate_dir)
  438. b2 = make_object(Blob, data=b"yummy data")
  439. alternate_store.add_object(b2)
  440. store = DiskObjectStore(self.store_dir)
  441. self.assertRaises(KeyError, store.__getitem__, b2.id)
  442. store.add_alternate_path(os.path.relpath(alternate_dir, self.store_dir))
  443. self.assertEqual(list(alternate_store), list(store.alternates[0]))
  444. self.assertIn(b2.id, store)
  445. self.assertEqual(b2, store[b2.id])
  446. def test_pack_dir(self):
  447. o = DiskObjectStore(self.store_dir)
  448. self.assertEqual(os.path.join(self.store_dir, "pack"), o.pack_dir)
  449. def test_add_pack(self):
  450. o = DiskObjectStore(self.store_dir)
  451. f, commit, abort = o.add_pack()
  452. try:
  453. b = make_object(Blob, data=b"more yummy data")
  454. write_pack_objects(f.write, [(b, None)])
  455. except BaseException:
  456. abort()
  457. raise
  458. else:
  459. commit()
  460. def test_add_thin_pack(self):
  461. o = DiskObjectStore(self.store_dir)
  462. try:
  463. blob = make_object(Blob, data=b"yummy data")
  464. o.add_object(blob)
  465. f = BytesIO()
  466. entries = build_pack(
  467. f,
  468. [
  469. (REF_DELTA, (blob.id, b"more yummy data")),
  470. ],
  471. store=o,
  472. )
  473. with o.add_thin_pack(f.read, None) as pack:
  474. packed_blob_sha = sha_to_hex(entries[0][3])
  475. pack.check_length_and_checksum()
  476. self.assertEqual(sorted([blob.id, packed_blob_sha]), list(pack))
  477. self.assertTrue(o.contains_packed(packed_blob_sha))
  478. self.assertTrue(o.contains_packed(blob.id))
  479. self.assertEqual(
  480. (Blob.type_num, b"more yummy data"),
  481. o.get_raw(packed_blob_sha),
  482. )
  483. finally:
  484. o.close()
  485. def test_add_thin_pack_empty(self):
  486. with closing(DiskObjectStore(self.store_dir)) as o:
  487. f = BytesIO()
  488. entries = build_pack(f, [], store=o)
  489. self.assertEqual([], entries)
  490. o.add_thin_pack(f.read, None)
  491. class TreeLookupPathTests(TestCase):
  492. def setUp(self):
  493. TestCase.setUp(self)
  494. self.store = MemoryObjectStore()
  495. blob_a = make_object(Blob, data=b"a")
  496. blob_b = make_object(Blob, data=b"b")
  497. blob_c = make_object(Blob, data=b"c")
  498. for blob in [blob_a, blob_b, blob_c]:
  499. self.store.add_object(blob)
  500. blobs = [
  501. (b"a", blob_a.id, 0o100644),
  502. (b"ad/b", blob_b.id, 0o100644),
  503. (b"ad/bd/c", blob_c.id, 0o100755),
  504. (b"ad/c", blob_c.id, 0o100644),
  505. (b"c", blob_c.id, 0o100644),
  506. (b"d", blob_c.id, S_IFGITLINK),
  507. ]
  508. self.tree_id = commit_tree(self.store, blobs)
  509. def get_object(self, sha):
  510. return self.store[sha]
  511. def test_lookup_blob(self):
  512. o_id = tree_lookup_path(self.get_object, self.tree_id, b"a")[1]
  513. self.assertIsInstance(self.store[o_id], Blob)
  514. def test_lookup_tree(self):
  515. o_id = tree_lookup_path(self.get_object, self.tree_id, b"ad")[1]
  516. self.assertIsInstance(self.store[o_id], Tree)
  517. o_id = tree_lookup_path(self.get_object, self.tree_id, b"ad/bd")[1]
  518. self.assertIsInstance(self.store[o_id], Tree)
  519. o_id = tree_lookup_path(self.get_object, self.tree_id, b"ad/bd/")[1]
  520. self.assertIsInstance(self.store[o_id], Tree)
  521. def test_lookup_submodule(self):
  522. tree_lookup_path(self.get_object, self.tree_id, b"d")[1]
  523. self.assertRaises(
  524. SubmoduleEncountered, tree_lookup_path, self.get_object,
  525. self.tree_id, b"d/a")
  526. def test_lookup_nonexistent(self):
  527. self.assertRaises(
  528. KeyError, tree_lookup_path, self.get_object, self.tree_id, b"j"
  529. )
  530. def test_lookup_not_tree(self):
  531. self.assertRaises(
  532. NotTreeError,
  533. tree_lookup_path,
  534. self.get_object,
  535. self.tree_id,
  536. b"ad/b/j",
  537. )
  538. class ObjectStoreGraphWalkerTests(TestCase):
  539. def get_walker(self, heads, parent_map):
  540. new_parent_map = {
  541. k * 40: [(p * 40) for p in ps] for (k, ps) in parent_map.items()
  542. }
  543. return ObjectStoreGraphWalker(
  544. [x * 40 for x in heads], new_parent_map.__getitem__
  545. )
  546. def test_ack_invalid_value(self):
  547. gw = self.get_walker([], {})
  548. self.assertRaises(ValueError, gw.ack, "tooshort")
  549. def test_empty(self):
  550. gw = self.get_walker([], {})
  551. self.assertIs(None, next(gw))
  552. gw.ack(b"a" * 40)
  553. self.assertIs(None, next(gw))
  554. def test_descends(self):
  555. gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
  556. self.assertEqual(b"a" * 40, next(gw))
  557. self.assertEqual(b"b" * 40, next(gw))
  558. def test_present(self):
  559. gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
  560. gw.ack(b"a" * 40)
  561. self.assertIs(None, next(gw))
  562. def test_parent_present(self):
  563. gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": []})
  564. self.assertEqual(b"a" * 40, next(gw))
  565. gw.ack(b"a" * 40)
  566. self.assertIs(None, next(gw))
  567. def test_child_ack_later(self):
  568. gw = self.get_walker([b"a"], {b"a": [b"b"], b"b": [b"c"], b"c": []})
  569. self.assertEqual(b"a" * 40, next(gw))
  570. self.assertEqual(b"b" * 40, next(gw))
  571. gw.ack(b"a" * 40)
  572. self.assertIs(None, next(gw))
  573. def test_only_once(self):
  574. # a b
  575. # | |
  576. # c d
  577. # \ /
  578. # e
  579. gw = self.get_walker(
  580. [b"a", b"b"],
  581. {
  582. b"a": [b"c"],
  583. b"b": [b"d"],
  584. b"c": [b"e"],
  585. b"d": [b"e"],
  586. b"e": [],
  587. },
  588. )
  589. walk = []
  590. acked = False
  591. walk.append(next(gw))
  592. walk.append(next(gw))
  593. # A branch (a, c) or (b, d) may be done after 2 steps or 3 depending on
  594. # the order walked: 3-step walks include (a, b, c) and (b, a, d), etc.
  595. if walk == [b"a" * 40, b"c" * 40] or walk == [b"b" * 40, b"d" * 40]:
  596. gw.ack(walk[0])
  597. acked = True
  598. walk.append(next(gw))
  599. if not acked and walk[2] == b"c" * 40:
  600. gw.ack(b"a" * 40)
  601. elif not acked and walk[2] == b"d" * 40:
  602. gw.ack(b"b" * 40)
  603. walk.append(next(gw))
  604. self.assertIs(None, next(gw))
  605. self.assertEqual([b"a" * 40, b"b" * 40, b"c" * 40, b"d" * 40], sorted(walk))
  606. self.assertLess(walk.index(b"a" * 40), walk.index(b"c" * 40))
  607. self.assertLess(walk.index(b"b" * 40), walk.index(b"d" * 40))
  608. class CommitTreeChangesTests(TestCase):
  609. def setUp(self):
  610. super().setUp()
  611. self.store = MemoryObjectStore()
  612. self.blob_a = make_object(Blob, data=b"a")
  613. self.blob_b = make_object(Blob, data=b"b")
  614. self.blob_c = make_object(Blob, data=b"c")
  615. for blob in [self.blob_a, self.blob_b, self.blob_c]:
  616. self.store.add_object(blob)
  617. blobs = [
  618. (b"a", self.blob_a.id, 0o100644),
  619. (b"ad/b", self.blob_b.id, 0o100644),
  620. (b"ad/bd/c", self.blob_c.id, 0o100755),
  621. (b"ad/c", self.blob_c.id, 0o100644),
  622. (b"c", self.blob_c.id, 0o100644),
  623. ]
  624. self.tree_id = commit_tree(self.store, blobs)
  625. def test_no_changes(self):
  626. self.assertEqual(
  627. self.store[self.tree_id],
  628. commit_tree_changes(self.store, self.store[self.tree_id], []),
  629. )
  630. def test_add_blob(self):
  631. blob_d = make_object(Blob, data=b"d")
  632. new_tree = commit_tree_changes(
  633. self.store, self.store[self.tree_id], [(b"d", 0o100644, blob_d.id)]
  634. )
  635. self.assertEqual(
  636. new_tree[b"d"],
  637. (33188, b"c59d9b6344f1af00e504ba698129f07a34bbed8d"),
  638. )
  639. def test_add_blob_in_dir(self):
  640. blob_d = make_object(Blob, data=b"d")
  641. new_tree = commit_tree_changes(
  642. self.store,
  643. self.store[self.tree_id],
  644. [(b"e/f/d", 0o100644, blob_d.id)],
  645. )
  646. self.assertEqual(
  647. new_tree.items(),
  648. [
  649. TreeEntry(path=b"a", mode=stat.S_IFREG | 0o100644, sha=self.blob_a.id),
  650. TreeEntry(
  651. path=b"ad",
  652. mode=stat.S_IFDIR,
  653. sha=b"0e2ce2cd7725ff4817791be31ccd6e627e801f4a",
  654. ),
  655. TreeEntry(path=b"c", mode=stat.S_IFREG | 0o100644, sha=self.blob_c.id),
  656. TreeEntry(
  657. path=b"e",
  658. mode=stat.S_IFDIR,
  659. sha=b"6ab344e288724ac2fb38704728b8896e367ed108",
  660. ),
  661. ],
  662. )
  663. e_tree = self.store[new_tree[b"e"][1]]
  664. self.assertEqual(
  665. e_tree.items(),
  666. [
  667. TreeEntry(
  668. path=b"f",
  669. mode=stat.S_IFDIR,
  670. sha=b"24d2c94d8af232b15a0978c006bf61ef4479a0a5",
  671. )
  672. ],
  673. )
  674. f_tree = self.store[e_tree[b"f"][1]]
  675. self.assertEqual(
  676. f_tree.items(),
  677. [TreeEntry(path=b"d", mode=stat.S_IFREG | 0o100644, sha=blob_d.id)],
  678. )
  679. def test_delete_blob(self):
  680. new_tree = commit_tree_changes(
  681. self.store, self.store[self.tree_id], [(b"ad/bd/c", None, None)]
  682. )
  683. self.assertEqual(set(new_tree), {b"a", b"ad", b"c"})
  684. ad_tree = self.store[new_tree[b"ad"][1]]
  685. self.assertEqual(set(ad_tree), {b"b", b"c"})
  686. class TestReadPacksFile(TestCase):
  687. def test_read_packs(self):
  688. self.assertEqual(
  689. ["pack-1.pack"],
  690. list(
  691. read_packs_file(
  692. BytesIO(
  693. b"""P pack-1.pack
  694. """
  695. )
  696. )
  697. ),
  698. )