test_pack.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. # test_pack.py -- Tests for the handling of git packs.
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU General Public License
  7. # as published by the Free Software Foundation; version 2
  8. # of the License, or (at your option) any later version of the license.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
  19. """Tests for Dulwich packs."""
  20. from cStringIO import StringIO
  21. import os
  22. import shutil
  23. import tempfile
  24. import unittest
  25. import zlib
  26. from dulwich.errors import (
  27. ChecksumMismatch,
  28. )
  29. from dulwich.objects import (
  30. hex_to_sha,
  31. sha_to_hex,
  32. Tree,
  33. )
  34. from dulwich.pack import (
  35. Pack,
  36. PackData,
  37. apply_delta,
  38. create_delta,
  39. load_pack_index,
  40. hex_to_sha,
  41. read_zlib_chunks,
  42. sha_to_hex,
  43. write_pack_index_v1,
  44. write_pack_index_v2,
  45. write_pack,
  46. )
# Hex SHA-1 naming the sample pack fixture; the tests build file names of
# the form 'pack-<sha>', 'pack-<sha>.idx' and 'pack-<sha>.pack' from it.
pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'

# Hex SHA-1 ids of the three objects the tests look up in that pack:
# a blob, a tree and a commit respectively (see TestPack.test_get_object_at).
a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
  51. class PackTests(unittest.TestCase):
  52. """Base class for testing packs"""
  53. def setUp(self):
  54. self.tempdir = tempfile.mkdtemp()
  55. def tearDown(self):
  56. shutil.rmtree(self.tempdir)
  57. datadir = os.path.join(os.path.dirname(__file__), 'data/packs')
  58. def get_pack_index(self, sha):
  59. """Returns a PackIndex from the datadir with the given sha"""
  60. return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))
  61. def get_pack_data(self, sha):
  62. """Returns a PackData object from the datadir with the given sha"""
  63. return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))
  64. def get_pack(self, sha):
  65. return Pack(os.path.join(self.datadir, 'pack-%s' % sha))
  66. def assertSucceeds(self, func, *args, **kwargs):
  67. try:
  68. func(*args, **kwargs)
  69. except ChecksumMismatch, e:
  70. self.fail(e)
  71. class PackIndexTests(PackTests):
  72. """Class that tests the index of packfiles"""
  73. def test_object_index(self):
  74. """Tests that the correct object offset is returned from the index."""
  75. p = self.get_pack_index(pack1_sha)
  76. self.assertRaises(KeyError, p.object_index, pack1_sha)
  77. self.assertEqual(p.object_index(a_sha), 178)
  78. self.assertEqual(p.object_index(tree_sha), 138)
  79. self.assertEqual(p.object_index(commit_sha), 12)
  80. def test_index_len(self):
  81. p = self.get_pack_index(pack1_sha)
  82. self.assertEquals(3, len(p))
  83. def test_get_stored_checksum(self):
  84. p = self.get_pack_index(pack1_sha)
  85. self.assertEquals('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
  86. sha_to_hex(p.get_stored_checksum()))
  87. self.assertEquals('721980e866af9a5f93ad674144e1459b8ba3e7b7',
  88. sha_to_hex(p.get_pack_checksum()))
  89. def test_index_check(self):
  90. p = self.get_pack_index(pack1_sha)
  91. self.assertSucceeds(p.check)
  92. def test_iterentries(self):
  93. p = self.get_pack_index(pack1_sha)
  94. entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
  95. self.assertEquals([
  96. ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
  97. ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
  98. ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
  99. ], entries)
  100. def test_iter(self):
  101. p = self.get_pack_index(pack1_sha)
  102. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  103. class TestPackDeltas(unittest.TestCase):
  104. test_string1 = 'The answer was flailing in the wind'
  105. test_string2 = 'The answer was falling down the pipe'
  106. test_string3 = 'zzzzz'
  107. test_string_empty = ''
  108. test_string_big = 'Z' * 8192
  109. def _test_roundtrip(self, base, target):
  110. self.assertEquals([target],
  111. apply_delta(base, create_delta(base, target)))
  112. def test_nochange(self):
  113. self._test_roundtrip(self.test_string1, self.test_string1)
  114. def test_change(self):
  115. self._test_roundtrip(self.test_string1, self.test_string2)
  116. def test_rewrite(self):
  117. self._test_roundtrip(self.test_string1, self.test_string3)
  118. def test_overflow(self):
  119. self._test_roundtrip(self.test_string_empty, self.test_string_big)
  120. class TestPackData(PackTests):
  121. """Tests getting the data from the packfile."""
  122. def test_create_pack(self):
  123. p = self.get_pack_data(pack1_sha)
  124. def test_pack_len(self):
  125. p = self.get_pack_data(pack1_sha)
  126. self.assertEquals(3, len(p))
  127. def test_index_check(self):
  128. p = self.get_pack_data(pack1_sha)
  129. self.assertSucceeds(p.check)
  130. def test_iterobjects(self):
  131. p = self.get_pack_data(pack1_sha)
  132. commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
  133. 'author James Westby <jw+debian@jameswestby.net> '
  134. '1174945067 +0100\n'
  135. 'committer James Westby <jw+debian@jameswestby.net> '
  136. '1174945067 +0100\n'
  137. '\n'
  138. 'Test commit\n')
  139. blob_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
  140. tree_data = '100644 a\0%s' % hex_to_sha(blob_sha)
  141. actual = []
  142. for offset, type, chunks, crc32 in p.iterobjects():
  143. actual.append((offset, type, ''.join(chunks), crc32))
  144. self.assertEquals([
  145. (12, 1, commit_data, 3775879613L),
  146. (138, 2, tree_data, 912998690L),
  147. (178, 3, 'test 1\n', 1373561701L)
  148. ], actual)
  149. def test_iterentries(self):
  150. p = self.get_pack_data(pack1_sha)
  151. entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
  152. self.assertEquals(set([
  153. ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701L),
  154. ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690L),
  155. ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613L),
  156. ]), entries)
  157. def test_create_index_v1(self):
  158. p = self.get_pack_data(pack1_sha)
  159. filename = os.path.join(self.tempdir, 'v1test.idx')
  160. p.create_index_v1(filename)
  161. idx1 = load_pack_index(filename)
  162. idx2 = self.get_pack_index(pack1_sha)
  163. self.assertEquals(idx1, idx2)
  164. def test_create_index_v2(self):
  165. p = self.get_pack_data(pack1_sha)
  166. filename = os.path.join(self.tempdir, 'v2test.idx')
  167. p.create_index_v2(filename)
  168. idx1 = load_pack_index(filename)
  169. idx2 = self.get_pack_index(pack1_sha)
  170. self.assertEquals(idx1, idx2)
  171. class TestPack(PackTests):
  172. def test_len(self):
  173. p = self.get_pack(pack1_sha)
  174. self.assertEquals(3, len(p))
  175. def test_contains(self):
  176. p = self.get_pack(pack1_sha)
  177. self.assertTrue(tree_sha in p)
  178. def test_get(self):
  179. p = self.get_pack(pack1_sha)
  180. self.assertEquals(type(p[tree_sha]), Tree)
  181. def test_iter(self):
  182. p = self.get_pack(pack1_sha)
  183. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  184. def test_get_object_at(self):
  185. """Tests random access for non-delta objects"""
  186. p = self.get_pack(pack1_sha)
  187. obj = p[a_sha]
  188. self.assertEqual(obj.type_name, 'blob')
  189. self.assertEqual(obj.sha().hexdigest(), a_sha)
  190. obj = p[tree_sha]
  191. self.assertEqual(obj.type_name, 'tree')
  192. self.assertEqual(obj.sha().hexdigest(), tree_sha)
  193. obj = p[commit_sha]
  194. self.assertEqual(obj.type_name, 'commit')
  195. self.assertEqual(obj.sha().hexdigest(), commit_sha)
  196. def test_copy(self):
  197. origpack = self.get_pack(pack1_sha)
  198. self.assertSucceeds(origpack.index.check)
  199. basename = os.path.join(self.tempdir, 'Elch')
  200. write_pack(basename, [(x, '') for x in origpack.iterobjects()],
  201. len(origpack))
  202. newpack = Pack(basename)
  203. self.assertEquals(origpack, newpack)
  204. self.assertSucceeds(newpack.index.check)
  205. self.assertEquals(origpack.name(), newpack.name())
  206. self.assertEquals(origpack.index.get_pack_checksum(),
  207. newpack.index.get_pack_checksum())
  208. wrong_version = origpack.index.version != newpack.index.version
  209. orig_checksum = origpack.index.get_stored_checksum()
  210. new_checksum = newpack.index.get_stored_checksum()
  211. self.assertTrue(wrong_version or orig_checksum == new_checksum)
  212. def test_commit_obj(self):
  213. p = self.get_pack(pack1_sha)
  214. commit = p[commit_sha]
  215. self.assertEquals('James Westby <jw+debian@jameswestby.net>',
  216. commit.author)
  217. self.assertEquals([], commit.parents)
  218. def test_name(self):
  219. p = self.get_pack(pack1_sha)
  220. self.assertEquals(pack1_sha, p.name())
# Pack checksum (converted from hex with hex_to_sha) that the index-writing
# tests pass to the writer and expect back from get_pack_checksum().
pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
  222. class BaseTestPackIndexWriting(object):
  223. def setUp(self):
  224. self.tempdir = tempfile.mkdtemp()
  225. def tearDown(self):
  226. shutil.rmtree(self.tempdir)
  227. def assertSucceeds(self, func, *args, **kwargs):
  228. try:
  229. func(*args, **kwargs)
  230. except ChecksumMismatch, e:
  231. self.fail(e)
  232. def test_empty(self):
  233. filename = os.path.join(self.tempdir, 'empty.idx')
  234. self._write_fn(filename, [], pack_checksum)
  235. idx = load_pack_index(filename)
  236. self.assertSucceeds(idx.check)
  237. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  238. self.assertEquals(0, len(idx))
  239. def test_single(self):
  240. entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
  241. my_entries = [(entry_sha, 178, 42)]
  242. filename = os.path.join(self.tempdir, 'single.idx')
  243. self._write_fn(filename, my_entries, pack_checksum)
  244. idx = load_pack_index(filename)
  245. self.assertEquals(idx.version, self._expected_version)
  246. self.assertSucceeds(idx.check)
  247. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  248. self.assertEquals(1, len(idx))
  249. actual_entries = list(idx.iterentries())
  250. self.assertEquals(len(my_entries), len(actual_entries))
  251. for mine, actual in zip(my_entries, actual_entries):
  252. my_sha, my_offset, my_crc = mine
  253. actual_sha, actual_offset, actual_crc = actual
  254. self.assertEquals(my_sha, actual_sha)
  255. self.assertEquals(my_offset, actual_offset)
  256. if self._has_crc32_checksum:
  257. self.assertEquals(my_crc, actual_crc)
  258. else:
  259. self.assertTrue(actual_crc is None)
  260. class TestPackIndexWritingv1(unittest.TestCase, BaseTestPackIndexWriting):
  261. def setUp(self):
  262. unittest.TestCase.setUp(self)
  263. BaseTestPackIndexWriting.setUp(self)
  264. self._has_crc32_checksum = False
  265. self._expected_version = 1
  266. self._write_fn = write_pack_index_v1
  267. def tearDown(self):
  268. unittest.TestCase.tearDown(self)
  269. BaseTestPackIndexWriting.tearDown(self)
  270. class TestPackIndexWritingv2(unittest.TestCase, BaseTestPackIndexWriting):
  271. def setUp(self):
  272. unittest.TestCase.setUp(self)
  273. BaseTestPackIndexWriting.setUp(self)
  274. self._has_crc32_checksum = True
  275. self._expected_version = 2
  276. self._write_fn = write_pack_index_v2
  277. def tearDown(self):
  278. unittest.TestCase.tearDown(self)
  279. BaseTestPackIndexWriting.tearDown(self)
  280. class ReadZlibTests(unittest.TestCase):
  281. decomp = (
  282. 'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
  283. 'parent None\n'
  284. 'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
  285. 'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
  286. '\n'
  287. "Provide replacement for mmap()'s offset argument.")
  288. comp = zlib.compress(decomp)
  289. extra = 'nextobject'
  290. def setUp(self):
  291. self.read = StringIO(self.comp + self.extra).read
  292. def test_decompress_size(self):
  293. good_decomp_len = len(self.decomp)
  294. self.assertRaises(ValueError, read_zlib_chunks, self.read, -1)
  295. self.assertRaises(zlib.error, read_zlib_chunks, self.read,
  296. good_decomp_len - 1)
  297. self.assertRaises(zlib.error, read_zlib_chunks, self.read,
  298. good_decomp_len + 1)
  299. def test_decompress_truncated(self):
  300. read = StringIO(self.comp[:10]).read
  301. self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
  302. read = StringIO(self.comp).read
  303. self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
  304. def test_decompress_empty(self):
  305. comp = zlib.compress('')
  306. read = StringIO(comp + self.extra).read
  307. decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
  308. self.assertEqual('', ''.join(decomp))
  309. self.assertEqual(len(comp), comp_len)
  310. self.assertNotEquals('', unused_data)
  311. self.assertEquals(self.extra, unused_data + read())
  312. def _do_decompress_test(self, buffer_size):
  313. decomp, comp_len, unused_data = read_zlib_chunks(
  314. self.read, len(self.decomp), buffer_size=buffer_size)
  315. self.assertEquals(self.decomp, ''.join(decomp))
  316. self.assertEquals(len(self.comp), comp_len)
  317. self.assertNotEquals('', unused_data)
  318. self.assertEquals(self.extra, unused_data + self.read())
  319. def test_simple_decompress(self):
  320. self._do_decompress_test(4096)
  321. # These buffer sizes are not intended to be realistic, but rather simulate
  322. # larger buffer sizes that may end at various places.
  323. def test_decompress_buffer_size_1(self):
  324. self._do_decompress_test(1)
  325. def test_decompress_buffer_size_2(self):
  326. self._do_decompress_test(2)
  327. def test_decompress_buffer_size_3(self):
  328. self._do_decompress_test(3)
  329. def test_decompress_buffer_size_4(self):
  330. self._do_decompress_test(4)