test_pack.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # test_pack.py -- Tests for the handling of git packs.
  2. # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
  3. # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
  4. #
  5. # This program is free software; you can redistribute it and/or
  6. # modify it under the terms of the GNU General Public License
  7. # as published by the Free Software Foundation; version 2
  8. # of the License, or (at your option) any later version of the license.
  9. #
  10. # This program is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with this program; if not, write to the Free Software
  17. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  18. # MA 02110-1301, USA.
  19. """Tests for Dulwich packs."""
  20. from cStringIO import StringIO
  21. import os
  22. import unittest
  23. from dulwich.objects import (
  24. Tree,
  25. )
  26. from dulwich.pack import (
  27. Pack,
  28. PackData,
  29. apply_delta,
  30. create_delta,
  31. load_pack_index,
  32. hex_to_sha,
  33. read_zlib,
  34. sha_to_hex,
  35. write_pack_index_v1,
  36. write_pack_index_v2,
  37. write_pack,
  38. )
  39. pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
  40. a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
  41. tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
  42. commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
  43. class PackTests(unittest.TestCase):
  44. """Base class for testing packs"""
  45. datadir = os.path.join(os.path.dirname(__file__), 'data/packs')
  46. def get_pack_index(self, sha):
  47. """Returns a PackIndex from the datadir with the given sha"""
  48. return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))
  49. def get_pack_data(self, sha):
  50. """Returns a PackData object from the datadir with the given sha"""
  51. return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))
  52. def get_pack(self, sha):
  53. return Pack(os.path.join(self.datadir, 'pack-%s' % sha))
  54. class PackIndexTests(PackTests):
  55. """Class that tests the index of packfiles"""
  56. def test_object_index(self):
  57. """Tests that the correct object offset is returned from the index."""
  58. p = self.get_pack_index(pack1_sha)
  59. self.assertRaises(KeyError, p.object_index, pack1_sha)
  60. self.assertEqual(p.object_index(a_sha), 178)
  61. self.assertEqual(p.object_index(tree_sha), 138)
  62. self.assertEqual(p.object_index(commit_sha), 12)
  63. def test_index_len(self):
  64. p = self.get_pack_index(pack1_sha)
  65. self.assertEquals(3, len(p))
  66. def test_get_stored_checksum(self):
  67. p = self.get_pack_index(pack1_sha)
  68. self.assertEquals("\xf2\x84\x8e*\xd1o2\x9a\xe1\xc9.;\x95\xe9\x18\x88\xda\xa5\xbd\x01", str(p.get_stored_checksum()))
  69. self.assertEquals( 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7' , str(p.get_pack_checksum()))
  70. def test_index_check(self):
  71. p = self.get_pack_index(pack1_sha)
  72. self.assertEquals(True, p.check())
  73. def test_iterentries(self):
  74. p = self.get_pack_index(pack1_sha)
  75. self.assertEquals([('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, None), ('\xb2\xa2vj(y\xc2\t\xab\x11v\xe7\xe7x\xb8\x1a\xe4"\xee\xaa', 138, None), ('\xf1\x8f\xaa\x16S\x1a\xc5p\xa3\xfd\xc8\xc7\xca\x16h%H\xda\xfd\x12', 12, None)], list(p.iterentries()))
  76. def test_iter(self):
  77. p = self.get_pack_index(pack1_sha)
  78. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  79. class TestPackDeltas(unittest.TestCase):
  80. test_string1 = "The answer was flailing in the wind"
  81. test_string2 = "The answer was falling down the pipe"
  82. test_string3 = "zzzzz"
  83. test_string_empty = ""
  84. test_string_big = "Z" * 8192
  85. def _test_roundtrip(self, base, target):
  86. self.assertEquals(target,
  87. apply_delta(base, create_delta(base, target)))
  88. def test_nochange(self):
  89. self._test_roundtrip(self.test_string1, self.test_string1)
  90. def test_change(self):
  91. self._test_roundtrip(self.test_string1, self.test_string2)
  92. def test_rewrite(self):
  93. self._test_roundtrip(self.test_string1, self.test_string3)
  94. def test_overflow(self):
  95. self._test_roundtrip(self.test_string_empty, self.test_string_big)
  96. class TestPackData(PackTests):
  97. """Tests getting the data from the packfile."""
  98. def test_create_pack(self):
  99. p = self.get_pack_data(pack1_sha)
  100. def test_pack_len(self):
  101. p = self.get_pack_data(pack1_sha)
  102. self.assertEquals(3, len(p))
  103. def test_index_check(self):
  104. p = self.get_pack_data(pack1_sha)
  105. self.assertEquals(True, p.check())
  106. def test_iterobjects(self):
  107. p = self.get_pack_data(pack1_sha)
  108. self.assertEquals([(12, 1, 'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\nauthor James Westby <jw+debian@jameswestby.net> 1174945067 +0100\ncommitter James Westby <jw+debian@jameswestby.net> 1174945067 +0100\n\nTest commit\n', 3775879613L), (138, 2, '100644 a\x00og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 912998690L), (178, 3, 'test 1\n', 1373561701L)], list(p.iterobjects()))
  109. def test_iterentries(self):
  110. p = self.get_pack_data(pack1_sha)
  111. self.assertEquals(set([('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, 1373561701L), ('\xb2\xa2vj(y\xc2\t\xab\x11v\xe7\xe7x\xb8\x1a\xe4"\xee\xaa', 138, 912998690L), ('\xf1\x8f\xaa\x16S\x1a\xc5p\xa3\xfd\xc8\xc7\xca\x16h%H\xda\xfd\x12', 12, 3775879613L)]), set(p.iterentries()))
  112. def test_create_index_v1(self):
  113. p = self.get_pack_data(pack1_sha)
  114. p.create_index_v1("v1test.idx")
  115. idx1 = load_pack_index("v1test.idx")
  116. idx2 = self.get_pack_index(pack1_sha)
  117. self.assertEquals(idx1, idx2)
  118. def test_create_index_v2(self):
  119. p = self.get_pack_data(pack1_sha)
  120. p.create_index_v2("v2test.idx")
  121. idx1 = load_pack_index("v2test.idx")
  122. idx2 = self.get_pack_index(pack1_sha)
  123. self.assertEquals(idx1, idx2)
  124. class TestPack(PackTests):
  125. def test_len(self):
  126. p = self.get_pack(pack1_sha)
  127. self.assertEquals(3, len(p))
  128. def test_contains(self):
  129. p = self.get_pack(pack1_sha)
  130. self.assertTrue(tree_sha in p)
  131. def test_get(self):
  132. p = self.get_pack(pack1_sha)
  133. self.assertEquals(type(p[tree_sha]), Tree)
  134. def test_iter(self):
  135. p = self.get_pack(pack1_sha)
  136. self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
  137. def test_get_object_at(self):
  138. """Tests random access for non-delta objects"""
  139. p = self.get_pack(pack1_sha)
  140. obj = p[a_sha]
  141. self.assertEqual(obj._type, 'blob')
  142. self.assertEqual(obj.sha().hexdigest(), a_sha)
  143. obj = p[tree_sha]
  144. self.assertEqual(obj._type, 'tree')
  145. self.assertEqual(obj.sha().hexdigest(), tree_sha)
  146. obj = p[commit_sha]
  147. self.assertEqual(obj._type, 'commit')
  148. self.assertEqual(obj.sha().hexdigest(), commit_sha)
  149. def test_copy(self):
  150. origpack = self.get_pack(pack1_sha)
  151. self.assertEquals(True, origpack.index.check())
  152. write_pack("Elch", [(x, "") for x in origpack.iterobjects()],
  153. len(origpack))
  154. newpack = Pack("Elch")
  155. self.assertEquals(origpack, newpack)
  156. self.assertEquals(True, newpack.index.check())
  157. self.assertEquals(origpack.name(), newpack.name())
  158. self.assertEquals(origpack.index.get_pack_checksum(),
  159. newpack.index.get_pack_checksum())
  160. self.assertTrue(
  161. (origpack.index.version != newpack.index.version) or
  162. (origpack.index.get_stored_checksum() == newpack.index.get_stored_checksum()))
  163. def test_commit_obj(self):
  164. p = self.get_pack(pack1_sha)
  165. commit = p[commit_sha]
  166. self.assertEquals("James Westby <jw+debian@jameswestby.net>",
  167. commit.author)
  168. self.assertEquals([], commit.parents)
  169. def test_name(self):
  170. p = self.get_pack(pack1_sha)
  171. self.assertEquals(pack1_sha, p.name())
  172. class TestHexToSha(unittest.TestCase):
  173. def test_simple(self):
  174. self.assertEquals('\xab\xcd' * 10, hex_to_sha("abcd" * 10))
  175. def test_reverse(self):
  176. self.assertEquals("abcd" * 10, sha_to_hex('\xab\xcd' * 10))
  177. class BaseTestPackIndexWriting(object):
  178. def test_empty(self):
  179. pack_checksum = 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7'
  180. self._write_fn("empty.idx", [], pack_checksum)
  181. idx = load_pack_index("empty.idx")
  182. self.assertTrue(idx.check())
  183. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  184. self.assertEquals(0, len(idx))
  185. def test_single(self):
  186. pack_checksum = 'r\x19\x80\xe8f\xaf\x9a_\x93\xadgAD\xe1E\x9b\x8b\xa3\xe7\xb7'
  187. my_entries = [('og\x0c\x0f\xb5?\x94cv\x0br\x95\xfb\xb8\x14\xe9e\xfb \xc8', 178, 42)]
  188. my_entries.sort()
  189. self._write_fn("single.idx", my_entries, pack_checksum)
  190. idx = load_pack_index("single.idx")
  191. self.assertEquals(idx.version, self._expected_version)
  192. self.assertTrue(idx.check())
  193. self.assertEquals(idx.get_pack_checksum(), pack_checksum)
  194. self.assertEquals(1, len(idx))
  195. actual_entries = list(idx.iterentries())
  196. self.assertEquals(len(my_entries), len(actual_entries))
  197. for a, b in zip(my_entries, actual_entries):
  198. self.assertEquals(a[0], b[0])
  199. self.assertEquals(a[1], b[1])
  200. if self._has_crc32_checksum:
  201. self.assertEquals(a[2], b[2])
  202. else:
  203. self.assertTrue(b[2] is None)
  204. class TestPackIndexWritingv1(unittest.TestCase, BaseTestPackIndexWriting):
  205. def setUp(self):
  206. unittest.TestCase.setUp(self)
  207. self._has_crc32_checksum = False
  208. self._expected_version = 1
  209. self._write_fn = write_pack_index_v1
  210. class TestPackIndexWritingv2(unittest.TestCase, BaseTestPackIndexWriting):
  211. def setUp(self):
  212. unittest.TestCase.setUp(self)
  213. self._has_crc32_checksum = True
  214. self._expected_version = 2
  215. self._write_fn = write_pack_index_v2
  216. TEST_COMP1 = """\x78\x9c\x9d\x8e\xc1\x0a\xc2\x30\x10\x44\xef\xf9\x8a\xbd\xa9\x08\x92\x86\xb4\x26\x20\xe2\xd9\x83\x78\xf2\xbe\x49\x37\xb5\xa5\x69\xca\x36\xf5\xfb\x4d\xfd\x04\x67\x6e\x33\xcc\xf0\x32\x13\x81\xc6\x16\x8d\xa9\xbd\xad\x6c\xe3\x8a\x03\x4a\x73\xd6\xda\xd5\xa6\x51\x2e\x58\x65\x6c\x13\xbc\x94\x4a\xcc\xc8\x34\x65\x78\xa4\x89\x04\xae\xf9\x9d\x18\xee\x34\x46\x62\x78\x11\x4f\x29\xf5\x03\x5c\x86\x5f\x70\x5b\x30\x3a\x3c\x25\xee\xae\x50\xa9\xf2\x60\xa4\xaa\x34\x1c\x65\x91\xf0\x29\xc6\x3e\x67\xfa\x6f\x2d\x9e\x9c\x3e\x7d\x4b\xc0\x34\x8f\xe8\x29\x6e\x48\xa1\xa0\xc4\x88\xf3\xfe\xb0\x5b\x20\x85\xb0\x50\x06\xe4\x6e\xdd\xca\xd3\x17\x26\xfa\x49\x23"""
  217. class ZlibTests(unittest.TestCase):
  218. def test_simple_decompress(self):
  219. self.assertEquals(("tree 4ada885c9196b6b6fa08744b5862bf92896fc002\nparent None\nauthor Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\ncommitter Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n\nProvide replacement for mmap()'s offset argument.", 158),
  220. read_zlib(StringIO(TEST_COMP1).read, 229))