test_pack.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. # test_pack.py -- Tests for the handling of git packs.
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU General Public License
  7. # as published by the Free Software Foundation; version 2
  8. # of the License, or (at your option) any later version of the license.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
  19. """Tests for Dulwich packs."""
  20. from cStringIO import StringIO
  21. import os
  22. import shutil
  23. import tempfile
  24. import zlib
  25. from dulwich.errors import (
  26. ChecksumMismatch,
  27. )
  28. from dulwich.file import (
  29. GitFile,
  30. )
  31. from dulwich.objects import (
  32. hex_to_sha,
  33. sha_to_hex,
  34. Tree,
  35. )
  36. from dulwich.pack import (
  37. Pack,
  38. PackData,
  39. ThinPackData,
  40. apply_delta,
  41. create_delta,
  42. load_pack_index,
  43. read_zlib_chunks,
  44. write_pack_header,
  45. write_pack_index_v1,
  46. write_pack_index_v2,
  47. write_pack,
  48. )
  49. from dulwich.tests import (
  50. TestCase,
  51. )
  52. pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
  53. a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
  54. tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
  55. commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
  56. class PackTests(TestCase):
  57. """Base class for testing packs"""
  58. def setUp(self):
  59. super(PackTests, self).setUp()
  60. self.tempdir = tempfile.mkdtemp()
  61. def tearDown(self):
  62. shutil.rmtree(self.tempdir)
  63. super(PackTests, self).tearDown()
  64. datadir = os.path.join(os.path.dirname(__file__), 'data/packs')
  65. def get_pack_index(self, sha):
  66. """Returns a PackIndex from the datadir with the given sha"""
  67. return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))
  68. def get_pack_data(self, sha):
  69. """Returns a PackData object from the datadir with the given sha"""
  70. return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))
  71. def get_pack(self, sha):
  72. return Pack(os.path.join(self.datadir, 'pack-%s' % sha))
  73. def assertSucceeds(self, func, *args, **kwargs):
  74. try:
  75. func(*args, **kwargs)
  76. except ChecksumMismatch, e:
  77. self.fail(e)
  78. class PackIndexTests(PackTests):
  79. """Class that tests the index of packfiles"""
  80. def test_object_index(self):
  81. """Tests that the correct object offset is returned from the index."""
  82. p = self.get_pack_index(pack1_sha)
  83. self.assertRaises(KeyError, p.object_index, pack1_sha)
  84. self.assertEqual(p.object_index(a_sha), 178)
  85. self.assertEqual(p.object_index(tree_sha), 138)
  86. self.assertEqual(p.object_index(commit_sha), 12)
  87. def test_index_len(self):
  88. p = self.get_pack_index(pack1_sha)
  89. self.assertEquals(3, len(p))
  90. def test_get_stored_checksum(self):
  91. p = self.get_pack_index(pack1_sha)
  92. self.assertEquals('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
  93. sha_to_hex(p.get_stored_checksum()))
  94. self.assertEquals('721980e866af9a5f93ad674144e1459b8ba3e7b7',
  95. sha_to_hex(p.get_pack_checksum()))
  96. def test_index_check(self):
  97. p = self.get_pack_index(pack1_sha)
  98. self.assertSucceeds(p.check)
  99. def test_iterentries(self):
  100. p = self.get_pack_index(pack1_sha)
  101. entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
  102. self.assertEquals([
  103. ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
  104. ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
  105. ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
  106. ], entries)
  107. def test_iter(self):
  108. p = self.get_pack_index(pack1_sha)
  109. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  110. class TestPackDeltas(TestCase):
  111. test_string1 = 'The answer was flailing in the wind'
  112. test_string2 = 'The answer was falling down the pipe'
  113. test_string3 = 'zzzzz'
  114. test_string_empty = ''
  115. test_string_big = 'Z' * 8192
  116. def _test_roundtrip(self, base, target):
  117. self.assertEquals(target,
  118. ''.join(apply_delta(base, create_delta(base, target))))
  119. def test_nochange(self):
  120. self._test_roundtrip(self.test_string1, self.test_string1)
  121. def test_change(self):
  122. self._test_roundtrip(self.test_string1, self.test_string2)
  123. def test_rewrite(self):
  124. self._test_roundtrip(self.test_string1, self.test_string3)
  125. def test_overflow(self):
  126. self._test_roundtrip(self.test_string_empty, self.test_string_big)
  127. class TestPackData(PackTests):
  128. """Tests getting the data from the packfile."""
  129. def test_create_pack(self):
  130. p = self.get_pack_data(pack1_sha)
  131. def test_from_file(self):
  132. path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha)
  133. PackData.from_file(open(path), os.path.getsize(path))
  134. # TODO: more ThinPackData tests.
  135. def test_thin_from_file(self):
  136. test_sha = '1' * 40
  137. def resolve(sha):
  138. self.assertEqual(test_sha, sha)
  139. return 3, 'data'
  140. path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha)
  141. data = ThinPackData.from_file(resolve, open(path),
  142. os.path.getsize(path))
  143. idx = self.get_pack_index(pack1_sha)
  144. Pack.from_objects(data, idx)
  145. self.assertEqual((None, 3, 'data'), data.get_ref(test_sha))
  146. def test_pack_len(self):
  147. p = self.get_pack_data(pack1_sha)
  148. self.assertEquals(3, len(p))
  149. def test_index_check(self):
  150. p = self.get_pack_data(pack1_sha)
  151. self.assertSucceeds(p.check)
  152. def test_iterobjects(self):
  153. p = self.get_pack_data(pack1_sha)
  154. commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
  155. 'author James Westby <jw+debian@jameswestby.net> '
  156. '1174945067 +0100\n'
  157. 'committer James Westby <jw+debian@jameswestby.net> '
  158. '1174945067 +0100\n'
  159. '\n'
  160. 'Test commit\n')
  161. blob_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
  162. tree_data = '100644 a\0%s' % hex_to_sha(blob_sha)
  163. actual = []
  164. for offset, type_num, chunks, crc32 in p.iterobjects():
  165. actual.append((offset, type_num, ''.join(chunks), crc32))
  166. self.assertEquals([
  167. (12, 1, commit_data, 3775879613L),
  168. (138, 2, tree_data, 912998690L),
  169. (178, 3, 'test 1\n', 1373561701L)
  170. ], actual)
  171. def test_iterentries(self):
  172. p = self.get_pack_data(pack1_sha)
  173. entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
  174. self.assertEquals(set([
  175. ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701L),
  176. ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690L),
  177. ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613L),
  178. ]), entries)
  179. def test_create_index_v1(self):
  180. p = self.get_pack_data(pack1_sha)
  181. filename = os.path.join(self.tempdir, 'v1test.idx')
  182. p.create_index_v1(filename)
  183. idx1 = load_pack_index(filename)
  184. idx2 = self.get_pack_index(pack1_sha)
  185. self.assertEquals(idx1, idx2)
  186. def test_create_index_v2(self):
  187. p = self.get_pack_data(pack1_sha)
  188. filename = os.path.join(self.tempdir, 'v2test.idx')
  189. p.create_index_v2(filename)
  190. idx1 = load_pack_index(filename)
  191. idx2 = self.get_pack_index(pack1_sha)
  192. self.assertEquals(idx1, idx2)
  193. class TestPack(PackTests):
  194. def test_len(self):
  195. p = self.get_pack(pack1_sha)
  196. self.assertEquals(3, len(p))
  197. def test_contains(self):
  198. p = self.get_pack(pack1_sha)
  199. self.assertTrue(tree_sha in p)
  200. def test_get(self):
  201. p = self.get_pack(pack1_sha)
  202. self.assertEquals(type(p[tree_sha]), Tree)
  203. def test_iter(self):
  204. p = self.get_pack(pack1_sha)
  205. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  206. def test_get_object_at(self):
  207. """Tests random access for non-delta objects"""
  208. p = self.get_pack(pack1_sha)
  209. obj = p[a_sha]
  210. self.assertEqual(obj.type_name, 'blob')
  211. self.assertEqual(obj.sha().hexdigest(), a_sha)
  212. obj = p[tree_sha]
  213. self.assertEqual(obj.type_name, 'tree')
  214. self.assertEqual(obj.sha().hexdigest(), tree_sha)
  215. obj = p[commit_sha]
  216. self.assertEqual(obj.type_name, 'commit')
  217. self.assertEqual(obj.sha().hexdigest(), commit_sha)
  218. def test_copy(self):
  219. origpack = self.get_pack(pack1_sha)
  220. self.assertSucceeds(origpack.index.check)
  221. basename = os.path.join(self.tempdir, 'Elch')
  222. write_pack(basename, [(x, '') for x in origpack.iterobjects()],
  223. len(origpack))
  224. newpack = Pack(basename)
  225. self.assertEquals(origpack, newpack)
  226. self.assertSucceeds(newpack.index.check)
  227. self.assertEquals(origpack.name(), newpack.name())
  228. self.assertEquals(origpack.index.get_pack_checksum(),
  229. newpack.index.get_pack_checksum())
  230. wrong_version = origpack.index.version != newpack.index.version
  231. orig_checksum = origpack.index.get_stored_checksum()
  232. new_checksum = newpack.index.get_stored_checksum()
  233. self.assertTrue(wrong_version or orig_checksum == new_checksum)
  234. def test_commit_obj(self):
  235. p = self.get_pack(pack1_sha)
  236. commit = p[commit_sha]
  237. self.assertEquals('James Westby <jw+debian@jameswestby.net>',
  238. commit.author)
  239. self.assertEquals([], commit.parents)
  240. def test_name(self):
  241. p = self.get_pack(pack1_sha)
  242. self.assertEquals(pack1_sha, p.name())
  243. class WritePackHeaderTests(TestCase):
  244. def test_simple(self):
  245. f = StringIO()
  246. write_pack_header(f, 42)
  247. self.assertEquals('PACK\x00\x00\x00\x02\x00\x00\x00*',
  248. f.getvalue())
  249. pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
  250. class BaseTestPackIndexWriting(object):
  251. def setUp(self):
  252. self.tempdir = tempfile.mkdtemp()
  253. def tearDown(self):
  254. shutil.rmtree(self.tempdir)
  255. def assertSucceeds(self, func, *args, **kwargs):
  256. try:
  257. func(*args, **kwargs)
  258. except ChecksumMismatch, e:
  259. self.fail(e)
  260. def writeIndex(self, filename, entries, pack_checksum):
  261. # FIXME: Write to StringIO instead rather than hitting disk ?
  262. f = GitFile(filename, "wb")
  263. try:
  264. self._write_fn(f, entries, pack_checksum)
  265. finally:
  266. f.close()
  267. def test_empty(self):
  268. filename = os.path.join(self.tempdir, 'empty.idx')
  269. self.writeIndex(filename, [], pack_checksum)
  270. idx = load_pack_index(filename)
  271. self.assertSucceeds(idx.check)
  272. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  273. self.assertEquals(0, len(idx))
  274. def test_single(self):
  275. entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
  276. my_entries = [(entry_sha, 178, 42)]
  277. filename = os.path.join(self.tempdir, 'single.idx')
  278. self.writeIndex(filename, my_entries, pack_checksum)
  279. idx = load_pack_index(filename)
  280. self.assertEquals(idx.version, self._expected_version)
  281. self.assertSucceeds(idx.check)
  282. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  283. self.assertEquals(1, len(idx))
  284. actual_entries = list(idx.iterentries())
  285. self.assertEquals(len(my_entries), len(actual_entries))
  286. for mine, actual in zip(my_entries, actual_entries):
  287. my_sha, my_offset, my_crc = mine
  288. actual_sha, actual_offset, actual_crc = actual
  289. self.assertEquals(my_sha, actual_sha)
  290. self.assertEquals(my_offset, actual_offset)
  291. if self._has_crc32_checksum:
  292. self.assertEquals(my_crc, actual_crc)
  293. else:
  294. self.assertTrue(actual_crc is None)
  295. class TestPackIndexWritingv1(TestCase, BaseTestPackIndexWriting):
  296. def setUp(self):
  297. TestCase.setUp(self)
  298. BaseTestPackIndexWriting.setUp(self)
  299. self._has_crc32_checksum = False
  300. self._expected_version = 1
  301. self._write_fn = write_pack_index_v1
  302. def tearDown(self):
  303. TestCase.tearDown(self)
  304. BaseTestPackIndexWriting.tearDown(self)
  305. class TestPackIndexWritingv2(TestCase, BaseTestPackIndexWriting):
  306. def setUp(self):
  307. TestCase.setUp(self)
  308. BaseTestPackIndexWriting.setUp(self)
  309. self._has_crc32_checksum = True
  310. self._expected_version = 2
  311. self._write_fn = write_pack_index_v2
  312. def tearDown(self):
  313. TestCase.tearDown(self)
  314. BaseTestPackIndexWriting.tearDown(self)
  315. class ReadZlibTests(TestCase):
  316. decomp = (
  317. 'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
  318. 'parent None\n'
  319. 'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
  320. 'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
  321. '\n'
  322. "Provide replacement for mmap()'s offset argument.")
  323. comp = zlib.compress(decomp)
  324. extra = 'nextobject'
  325. def setUp(self):
  326. super(ReadZlibTests, self).setUp()
  327. self.read = StringIO(self.comp + self.extra).read
  328. def test_decompress_size(self):
  329. good_decomp_len = len(self.decomp)
  330. self.assertRaises(ValueError, read_zlib_chunks, self.read, -1)
  331. self.assertRaises(zlib.error, read_zlib_chunks, self.read,
  332. good_decomp_len - 1)
  333. self.assertRaises(zlib.error, read_zlib_chunks, self.read,
  334. good_decomp_len + 1)
  335. def test_decompress_truncated(self):
  336. read = StringIO(self.comp[:10]).read
  337. self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
  338. read = StringIO(self.comp).read
  339. self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
  340. def test_decompress_empty(self):
  341. comp = zlib.compress('')
  342. read = StringIO(comp + self.extra).read
  343. decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
  344. self.assertEqual('', ''.join(decomp))
  345. self.assertEqual(len(comp), comp_len)
  346. self.assertNotEquals('', unused_data)
  347. self.assertEquals(self.extra, unused_data + read())
  348. def _do_decompress_test(self, buffer_size):
  349. decomp, comp_len, unused_data = read_zlib_chunks(
  350. self.read, len(self.decomp), buffer_size=buffer_size)
  351. self.assertEquals(self.decomp, ''.join(decomp))
  352. self.assertEquals(len(self.comp), comp_len)
  353. self.assertNotEquals('', unused_data)
  354. self.assertEquals(self.extra, unused_data + self.read())
  355. def test_simple_decompress(self):
  356. self._do_decompress_test(4096)
  357. # These buffer sizes are not intended to be realistic, but rather simulate
  358. # larger buffer sizes that may end at various places.
  359. def test_decompress_buffer_size_1(self):
  360. self._do_decompress_test(1)
  361. def test_decompress_buffer_size_2(self):
  362. self._do_decompress_test(2)
  363. def test_decompress_buffer_size_3(self):
  364. self._do_decompress_test(3)
  365. def test_decompress_buffer_size_4(self):
  366. self._do_decompress_test(4)