test_pack.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. # test_pack.py -- Tests for the handling of git packs.
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU General Public License
  7. # as published by the Free Software Foundation; version 2
  8. # of the License, or (at your option) any later version of the license.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
  19. """Tests for Dulwich packs."""
  20. from cStringIO import StringIO
  21. import os
  22. import shutil
  23. import tempfile
  24. import zlib
  25. from dulwich.errors import (
  26. ChecksumMismatch,
  27. )
  28. from dulwich.objects import (
  29. hex_to_sha,
  30. sha_to_hex,
  31. Tree,
  32. )
  33. from dulwich.pack import (
  34. Pack,
  35. PackData,
  36. apply_delta,
  37. create_delta,
  38. load_pack_index,
  39. read_zlib_chunks,
  40. write_pack_index_v1,
  41. write_pack_index_v2,
  42. write_pack,
  43. )
  44. from dulwich.tests import (
  45. TestCase,
  46. )
# Name of the fixture pack; the helpers in PackTests turn it into
# 'pack-<sha>.idx' / 'pack-<sha>.pack' file names under data/packs.
pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
# Hex SHAs of the three objects the tests expect to find in that pack:
# a blob, a tree and a commit respectively (see TestPack.test_get_object_at).
a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
  51. class PackTests(TestCase):
  52. """Base class for testing packs"""
  53. def setUp(self):
  54. super(PackTests, self).setUp()
  55. self.tempdir = tempfile.mkdtemp()
  56. def tearDown(self):
  57. shutil.rmtree(self.tempdir)
  58. super(PackTests, self).tearDown()
  59. datadir = os.path.join(os.path.dirname(__file__), 'data/packs')
  60. def get_pack_index(self, sha):
  61. """Returns a PackIndex from the datadir with the given sha"""
  62. return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))
  63. def get_pack_data(self, sha):
  64. """Returns a PackData object from the datadir with the given sha"""
  65. return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))
  66. def get_pack(self, sha):
  67. return Pack(os.path.join(self.datadir, 'pack-%s' % sha))
  68. def assertSucceeds(self, func, *args, **kwargs):
  69. try:
  70. func(*args, **kwargs)
  71. except ChecksumMismatch, e:
  72. self.fail(e)
  73. class PackIndexTests(PackTests):
  74. """Class that tests the index of packfiles"""
  75. def test_object_index(self):
  76. """Tests that the correct object offset is returned from the index."""
  77. p = self.get_pack_index(pack1_sha)
  78. self.assertRaises(KeyError, p.object_index, pack1_sha)
  79. self.assertEqual(p.object_index(a_sha), 178)
  80. self.assertEqual(p.object_index(tree_sha), 138)
  81. self.assertEqual(p.object_index(commit_sha), 12)
  82. def test_index_len(self):
  83. p = self.get_pack_index(pack1_sha)
  84. self.assertEquals(3, len(p))
  85. def test_get_stored_checksum(self):
  86. p = self.get_pack_index(pack1_sha)
  87. self.assertEquals('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
  88. sha_to_hex(p.get_stored_checksum()))
  89. self.assertEquals('721980e866af9a5f93ad674144e1459b8ba3e7b7',
  90. sha_to_hex(p.get_pack_checksum()))
  91. def test_index_check(self):
  92. p = self.get_pack_index(pack1_sha)
  93. self.assertSucceeds(p.check)
  94. def test_iterentries(self):
  95. p = self.get_pack_index(pack1_sha)
  96. entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
  97. self.assertEquals([
  98. ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
  99. ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
  100. ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
  101. ], entries)
  102. def test_iter(self):
  103. p = self.get_pack_index(pack1_sha)
  104. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  105. class TestPackDeltas(TestCase):
  106. test_string1 = 'The answer was flailing in the wind'
  107. test_string2 = 'The answer was falling down the pipe'
  108. test_string3 = 'zzzzz'
  109. test_string_empty = ''
  110. test_string_big = 'Z' * 8192
  111. def _test_roundtrip(self, base, target):
  112. self.assertEquals(target,
  113. ''.join(apply_delta(base, create_delta(base, target))))
  114. def test_nochange(self):
  115. self._test_roundtrip(self.test_string1, self.test_string1)
  116. def test_change(self):
  117. self._test_roundtrip(self.test_string1, self.test_string2)
  118. def test_rewrite(self):
  119. self._test_roundtrip(self.test_string1, self.test_string3)
  120. def test_overflow(self):
  121. self._test_roundtrip(self.test_string_empty, self.test_string_big)
  122. class TestPackData(PackTests):
  123. """Tests getting the data from the packfile."""
  124. def test_create_pack(self):
  125. p = self.get_pack_data(pack1_sha)
  126. def test_pack_len(self):
  127. p = self.get_pack_data(pack1_sha)
  128. self.assertEquals(3, len(p))
  129. def test_index_check(self):
  130. p = self.get_pack_data(pack1_sha)
  131. self.assertSucceeds(p.check)
  132. def test_iterobjects(self):
  133. p = self.get_pack_data(pack1_sha)
  134. commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
  135. 'author James Westby <jw+debian@jameswestby.net> '
  136. '1174945067 +0100\n'
  137. 'committer James Westby <jw+debian@jameswestby.net> '
  138. '1174945067 +0100\n'
  139. '\n'
  140. 'Test commit\n')
  141. blob_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
  142. tree_data = '100644 a\0%s' % hex_to_sha(blob_sha)
  143. actual = []
  144. for offset, type_num, chunks, crc32 in p.iterobjects():
  145. actual.append((offset, type_num, ''.join(chunks), crc32))
  146. self.assertEquals([
  147. (12, 1, commit_data, 3775879613L),
  148. (138, 2, tree_data, 912998690L),
  149. (178, 3, 'test 1\n', 1373561701L)
  150. ], actual)
  151. def test_iterentries(self):
  152. p = self.get_pack_data(pack1_sha)
  153. entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
  154. self.assertEquals(set([
  155. ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701L),
  156. ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690L),
  157. ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613L),
  158. ]), entries)
  159. def test_create_index_v1(self):
  160. p = self.get_pack_data(pack1_sha)
  161. filename = os.path.join(self.tempdir, 'v1test.idx')
  162. p.create_index_v1(filename)
  163. idx1 = load_pack_index(filename)
  164. idx2 = self.get_pack_index(pack1_sha)
  165. self.assertEquals(idx1, idx2)
  166. def test_create_index_v2(self):
  167. p = self.get_pack_data(pack1_sha)
  168. filename = os.path.join(self.tempdir, 'v2test.idx')
  169. p.create_index_v2(filename)
  170. idx1 = load_pack_index(filename)
  171. idx2 = self.get_pack_index(pack1_sha)
  172. self.assertEquals(idx1, idx2)
  173. class TestPack(PackTests):
  174. def test_len(self):
  175. p = self.get_pack(pack1_sha)
  176. self.assertEquals(3, len(p))
  177. def test_contains(self):
  178. p = self.get_pack(pack1_sha)
  179. self.assertTrue(tree_sha in p)
  180. def test_get(self):
  181. p = self.get_pack(pack1_sha)
  182. self.assertEquals(type(p[tree_sha]), Tree)
  183. def test_iter(self):
  184. p = self.get_pack(pack1_sha)
  185. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  186. def test_get_object_at(self):
  187. """Tests random access for non-delta objects"""
  188. p = self.get_pack(pack1_sha)
  189. obj = p[a_sha]
  190. self.assertEqual(obj.type_name, 'blob')
  191. self.assertEqual(obj.sha().hexdigest(), a_sha)
  192. obj = p[tree_sha]
  193. self.assertEqual(obj.type_name, 'tree')
  194. self.assertEqual(obj.sha().hexdigest(), tree_sha)
  195. obj = p[commit_sha]
  196. self.assertEqual(obj.type_name, 'commit')
  197. self.assertEqual(obj.sha().hexdigest(), commit_sha)
  198. def test_copy(self):
  199. origpack = self.get_pack(pack1_sha)
  200. self.assertSucceeds(origpack.index.check)
  201. basename = os.path.join(self.tempdir, 'Elch')
  202. write_pack(basename, [(x, '') for x in origpack.iterobjects()],
  203. len(origpack))
  204. newpack = Pack(basename)
  205. self.assertEquals(origpack, newpack)
  206. self.assertSucceeds(newpack.index.check)
  207. self.assertEquals(origpack.name(), newpack.name())
  208. self.assertEquals(origpack.index.get_pack_checksum(),
  209. newpack.index.get_pack_checksum())
  210. wrong_version = origpack.index.version != newpack.index.version
  211. orig_checksum = origpack.index.get_stored_checksum()
  212. new_checksum = newpack.index.get_stored_checksum()
  213. self.assertTrue(wrong_version or orig_checksum == new_checksum)
  214. def test_commit_obj(self):
  215. p = self.get_pack(pack1_sha)
  216. commit = p[commit_sha]
  217. self.assertEquals('James Westby <jw+debian@jameswestby.net>',
  218. commit.author)
  219. self.assertEquals([], commit.parents)
  220. def test_name(self):
  221. p = self.get_pack(pack1_sha)
  222. self.assertEquals(pack1_sha, p.name())
# Pack checksum handed to the index writers in the tests below, converted
# from hex with hex_to_sha(); the same hex value is what PackIndexTests
# expects get_pack_checksum() to report for the fixture index.
pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
  224. class BaseTestPackIndexWriting(object):
  225. def setUp(self):
  226. self.tempdir = tempfile.mkdtemp()
  227. def tearDown(self):
  228. shutil.rmtree(self.tempdir)
  229. def assertSucceeds(self, func, *args, **kwargs):
  230. try:
  231. func(*args, **kwargs)
  232. except ChecksumMismatch, e:
  233. self.fail(e)
  234. def test_empty(self):
  235. filename = os.path.join(self.tempdir, 'empty.idx')
  236. self._write_fn(filename, [], pack_checksum)
  237. idx = load_pack_index(filename)
  238. self.assertSucceeds(idx.check)
  239. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  240. self.assertEquals(0, len(idx))
  241. def test_single(self):
  242. entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
  243. my_entries = [(entry_sha, 178, 42)]
  244. filename = os.path.join(self.tempdir, 'single.idx')
  245. self._write_fn(filename, my_entries, pack_checksum)
  246. idx = load_pack_index(filename)
  247. self.assertEquals(idx.version, self._expected_version)
  248. self.assertSucceeds(idx.check)
  249. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  250. self.assertEquals(1, len(idx))
  251. actual_entries = list(idx.iterentries())
  252. self.assertEquals(len(my_entries), len(actual_entries))
  253. for mine, actual in zip(my_entries, actual_entries):
  254. my_sha, my_offset, my_crc = mine
  255. actual_sha, actual_offset, actual_crc = actual
  256. self.assertEquals(my_sha, actual_sha)
  257. self.assertEquals(my_offset, actual_offset)
  258. if self._has_crc32_checksum:
  259. self.assertEquals(my_crc, actual_crc)
  260. else:
  261. self.assertTrue(actual_crc is None)
  262. class TestPackIndexWritingv1(TestCase, BaseTestPackIndexWriting):
  263. def setUp(self):
  264. TestCase.setUp(self)
  265. BaseTestPackIndexWriting.setUp(self)
  266. self._has_crc32_checksum = False
  267. self._expected_version = 1
  268. self._write_fn = write_pack_index_v1
  269. def tearDown(self):
  270. TestCase.tearDown(self)
  271. BaseTestPackIndexWriting.tearDown(self)
  272. class TestPackIndexWritingv2(TestCase, BaseTestPackIndexWriting):
  273. def setUp(self):
  274. TestCase.setUp(self)
  275. BaseTestPackIndexWriting.setUp(self)
  276. self._has_crc32_checksum = True
  277. self._expected_version = 2
  278. self._write_fn = write_pack_index_v2
  279. def tearDown(self):
  280. TestCase.tearDown(self)
  281. BaseTestPackIndexWriting.tearDown(self)
  282. class ReadZlibTests(TestCase):
  283. decomp = (
  284. 'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
  285. 'parent None\n'
  286. 'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
  287. 'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
  288. '\n'
  289. "Provide replacement for mmap()'s offset argument.")
  290. comp = zlib.compress(decomp)
  291. extra = 'nextobject'
  292. def setUp(self):
  293. super(ReadZlibTests, self).setUp()
  294. self.read = StringIO(self.comp + self.extra).read
  295. def test_decompress_size(self):
  296. good_decomp_len = len(self.decomp)
  297. self.assertRaises(ValueError, read_zlib_chunks, self.read, -1)
  298. self.assertRaises(zlib.error, read_zlib_chunks, self.read,
  299. good_decomp_len - 1)
  300. self.assertRaises(zlib.error, read_zlib_chunks, self.read,
  301. good_decomp_len + 1)
  302. def test_decompress_truncated(self):
  303. read = StringIO(self.comp[:10]).read
  304. self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
  305. read = StringIO(self.comp).read
  306. self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
  307. def test_decompress_empty(self):
  308. comp = zlib.compress('')
  309. read = StringIO(comp + self.extra).read
  310. decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
  311. self.assertEqual('', ''.join(decomp))
  312. self.assertEqual(len(comp), comp_len)
  313. self.assertNotEquals('', unused_data)
  314. self.assertEquals(self.extra, unused_data + read())
  315. def _do_decompress_test(self, buffer_size):
  316. decomp, comp_len, unused_data = read_zlib_chunks(
  317. self.read, len(self.decomp), buffer_size=buffer_size)
  318. self.assertEquals(self.decomp, ''.join(decomp))
  319. self.assertEquals(len(self.comp), comp_len)
  320. self.assertNotEquals('', unused_data)
  321. self.assertEquals(self.extra, unused_data + self.read())
  322. def test_simple_decompress(self):
  323. self._do_decompress_test(4096)
  324. # These buffer sizes are not intended to be realistic, but rather simulate
  325. # larger buffer sizes that may end at various places.
  326. def test_decompress_buffer_size_1(self):
  327. self._do_decompress_test(1)
  328. def test_decompress_buffer_size_2(self):
  329. self._do_decompress_test(2)
  330. def test_decompress_buffer_size_3(self):
  331. self._do_decompress_test(3)
  332. def test_decompress_buffer_size_4(self):
  333. self._do_decompress_test(4)