123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- # test_pack.py -- Tests for the handling of git packs.
- # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
- # Copyright (C) 2008 Jelmer Vernooij <jelmer@samba.org>
- #
- # This program is free software; you can redistribute it and/or
- # modify it under the terms of the GNU General Public License
- # as published by the Free Software Foundation; version 2
- # of the License, or (at your option) any later version of the license.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- # MA 02110-1301, USA.
- """Tests for Dulwich packs."""
- from cStringIO import StringIO
- import os
- import shutil
- import tempfile
- import unittest
- import zlib
- from dulwich.errors import (
- ChecksumMismatch,
- )
- from dulwich.objects import (
- hex_to_sha,
- sha_to_hex,
- Tree,
- )
- from dulwich.pack import (
- Pack,
- PackData,
- apply_delta,
- create_delta,
- load_pack_index,
- hex_to_sha,
- read_zlib_chunks,
- sha_to_hex,
- write_pack_index_v1,
- write_pack_index_v2,
- write_pack,
- )
# Hex ids shared by the tests below.  pack1_sha selects the fixture files
# (pack-<sha>.idx / pack-<sha>.pack); a_sha, tree_sha and commit_sha are the
# ids of the blob, tree and commit looked up inside that pack.
pack1_sha = "bc63ddad95e7321ee734ea11a7a62d314e0d7481"
a_sha = "6f670c0fb53f9463760b7295fbb814e965fb20c8"
tree_sha = "b2a2766a2879c209ab1176e7e778b81ae422eeaa"
commit_sha = "f18faa16531ac570a3fdc8c7ca16682548dafd12"
class PackTests(unittest.TestCase):
    """Base class for testing packs.

    Provides a per-test scratch directory plus helpers that open the pack
    fixtures stored in ``data/packs`` next to this module.
    """

    # Location of the pack fixtures, resolved relative to this file.
    datadir = os.path.join(os.path.dirname(__file__), 'data/packs')

    def setUp(self):
        # Scratch directory for tests that write files; removed in tearDown.
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def get_pack_index(self, sha):
        """Returns a PackIndex from the datadir with the given sha"""
        path = os.path.join(self.datadir, 'pack-%s.idx' % sha)
        return load_pack_index(path)

    def get_pack_data(self, sha):
        """Returns a PackData object from the datadir with the given sha"""
        path = os.path.join(self.datadir, 'pack-%s.pack' % sha)
        return PackData(path)

    def get_pack(self, sha):
        """Returns a Pack opened from the datadir basename for the given sha"""
        return Pack(os.path.join(self.datadir, 'pack-%s' % sha))

    def assertSucceeds(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` and fail on ChecksumMismatch.

        The return value of ``func`` is discarded.  Only ChecksumMismatch is
        converted into a test failure; any other exception propagates.
        """
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            # "as" form parses on Python 2.6+ and Python 3 alike.
            self.fail(e)
class PackIndexTests(PackTests):
    """Class that tests the index of packfiles"""

    def test_object_index(self):
        """Tests that the correct object offset is returned from the index."""
        p = self.get_pack_index(pack1_sha)
        # The pack's own name is not one of the objects listed in its index.
        self.assertRaises(KeyError, p.object_index, pack1_sha)
        self.assertEqual(p.object_index(a_sha), 178)
        self.assertEqual(p.object_index(tree_sha), 138)
        self.assertEqual(p.object_index(commit_sha), 12)

    def test_index_len(self):
        """The fixture index reports three entries."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(3, len(p))

    def test_get_stored_checksum(self):
        """Both the index's stored checksum and its pack checksum match."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
                         sha_to_hex(p.get_stored_checksum()))
        self.assertEqual('721980e866af9a5f93ad674144e1459b8ba3e7b7',
                         sha_to_hex(p.get_pack_checksum()))

    def test_index_check(self):
        """check() on the fixture index does not raise ChecksumMismatch."""
        p = self.get_pack_index(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterentries(self):
        """iterentries() yields (sha, offset, crc32) with crc32 None here."""
        p = self.get_pack_index(pack1_sha)
        entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
        self.assertEqual([
            ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
            ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
            ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
            ], entries)

    def test_iter(self):
        """Iterating the index yields the hex ids of its three objects."""
        p = self.get_pack_index(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))
class TestPackDeltas(unittest.TestCase):
    """Round-trip tests for create_delta() / apply_delta()."""

    test_string1 = 'The answer was flailing in the wind'
    test_string2 = 'The answer was falling down the pipe'
    test_string3 = 'zzzzz'

    # Empty base and an 8 KiB target, used together by test_overflow.
    test_string_empty = ''
    test_string_big = 'Z' * 8192

    def _test_roundtrip(self, base, target):
        """Assert that the delta base->target, applied to base, gives target.

        apply_delta() is compared against ``[target]``, i.e. a one-element
        list holding the whole target string.
        """
        self.assertEqual([target],
                         apply_delta(base, create_delta(base, target)))

    def test_nochange(self):
        self._test_roundtrip(self.test_string1, self.test_string1)

    def test_change(self):
        self._test_roundtrip(self.test_string1, self.test_string2)

    def test_rewrite(self):
        self._test_roundtrip(self.test_string1, self.test_string3)

    def test_overflow(self):
        self._test_roundtrip(self.test_string_empty, self.test_string_big)
class TestPackData(PackTests):
    """Tests getting the data from the packfile."""

    def test_create_pack(self):
        """Opening the fixture pack data must not raise."""
        self.get_pack_data(pack1_sha)

    def test_pack_len(self):
        p = self.get_pack_data(pack1_sha)
        self.assertEqual(3, len(p))

    def test_index_check(self):
        """check() on the pack data does not raise ChecksumMismatch."""
        p = self.get_pack_data(pack1_sha)
        self.assertSucceeds(p.check)

    def test_iterobjects(self):
        """iterobjects() yields (offset, type, chunks, crc32) per object."""
        p = self.get_pack_data(pack1_sha)
        commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
                       'author James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       'committer James Westby <jw+debian@jameswestby.net> '
                       '1174945067 +0100\n'
                       '\n'
                       'Test commit\n')
        blob_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
        tree_data = '100644 a\0%s' % hex_to_sha(blob_sha)
        # Chunks are joined so each object's content compares as one string.
        # The loop variable is not called "type" to avoid shadowing the
        # builtin.
        actual = [(offset, obj_type, ''.join(chunks), crc32)
                  for offset, obj_type, chunks, crc32 in p.iterobjects()]
        # Plain integer literals: the Python-2-only "L" suffix is not needed
        # for the values to compare equal.
        self.assertEqual([
            (12, 1, commit_data, 3775879613),
            (138, 2, tree_data, 912998690),
            (178, 3, 'test 1\n', 1373561701)
            ], actual)

    def test_iterentries(self):
        """iterentries() yields (sha, offset, crc32); order is not checked."""
        p = self.get_pack_data(pack1_sha)
        entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
        self.assertEqual(set([
            ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
            ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
            ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
            ]), entries)

    def test_create_index_v1(self):
        """A freshly written v1 index compares equal to the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v1test.idx')
        p.create_index_v1(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)

    def test_create_index_v2(self):
        """A freshly written v2 index compares equal to the fixture index."""
        p = self.get_pack_data(pack1_sha)
        filename = os.path.join(self.tempdir, 'v2test.idx')
        p.create_index_v2(filename)
        idx1 = load_pack_index(filename)
        idx2 = self.get_pack_index(pack1_sha)
        self.assertEqual(idx1, idx2)
class TestPack(PackTests):
    """Tests for Pack objects opened from the fixture directory."""

    def test_len(self):
        p = self.get_pack(pack1_sha)
        self.assertEqual(3, len(p))

    def test_contains(self):
        p = self.get_pack(pack1_sha)
        self.assertTrue(tree_sha in p)

    def test_get(self):
        p = self.get_pack(pack1_sha)
        self.assertEqual(type(p[tree_sha]), Tree)

    def test_iter(self):
        p = self.get_pack(pack1_sha)
        self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))

    def test_get_object_at(self):
        """Tests random access for non-delta objects"""
        p = self.get_pack(pack1_sha)
        obj = p[a_sha]
        self.assertEqual(obj.type_name, 'blob')
        self.assertEqual(obj.sha().hexdigest(), a_sha)
        obj = p[tree_sha]
        self.assertEqual(obj.type_name, 'tree')
        self.assertEqual(obj.sha().hexdigest(), tree_sha)
        obj = p[commit_sha]
        self.assertEqual(obj.type_name, 'commit')
        self.assertEqual(obj.sha().hexdigest(), commit_sha)

    def test_copy(self):
        """Rewriting the fixture's objects with write_pack() gives an equal pack."""
        origpack = self.get_pack(pack1_sha)
        self.assertSucceeds(origpack.index.check)
        basename = os.path.join(self.tempdir, 'Elch')
        write_pack(basename, [(x, '') for x in origpack.iterobjects()],
                   len(origpack))
        newpack = Pack(basename)
        self.assertEqual(origpack, newpack)
        self.assertSucceeds(newpack.index.check)
        self.assertEqual(origpack.name(), newpack.name())
        self.assertEqual(origpack.index.get_pack_checksum(),
                         newpack.index.get_pack_checksum())

        # The stored index checksums are only required to match when both
        # indexes report the same version.
        wrong_version = origpack.index.version != newpack.index.version
        orig_checksum = origpack.index.get_stored_checksum()
        new_checksum = newpack.index.get_stored_checksum()
        self.assertTrue(wrong_version or orig_checksum == new_checksum)

    def test_commit_obj(self):
        p = self.get_pack(pack1_sha)
        commit = p[commit_sha]
        self.assertEqual('James Westby <jw+debian@jameswestby.net>',
                         commit.author)
        self.assertEqual([], commit.parents)

    def test_name(self):
        p = self.get_pack(pack1_sha)
        self.assertEqual(pack1_sha, p.name())
# Pack checksum handed to the index writers in the tests below and compared
# against get_pack_checksum() of the index read back.
pack_checksum = hex_to_sha(
    "721980e866af9a5f93ad674144e1459b8ba3e7b7")
class BaseTestPackIndexWriting(object):
    """Mixin with the tests shared by the pack index writer test cases.

    Concrete subclasses must also derive from unittest.TestCase and set, on
    the instance, ``_write_fn`` (the index writer under test),
    ``_expected_version`` and ``_has_crc32_checksum``.
    """

    def setUp(self):
        # Scratch directory for the index files written by each test.
        self.tempdir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.tempdir)

    def assertSucceeds(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` and fail on ChecksumMismatch.

        The return value of ``func`` is discarded; exceptions other than
        ChecksumMismatch propagate unchanged.
        """
        try:
            func(*args, **kwargs)
        except ChecksumMismatch as e:
            # "as" form parses on Python 2.6+ and Python 3 alike.
            self.fail(e)

    def test_empty(self):
        """An index written with no entries loads, checks and has length 0."""
        filename = os.path.join(self.tempdir, 'empty.idx')
        self._write_fn(filename, [], pack_checksum)
        idx = load_pack_index(filename)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(0, len(idx))

    def test_single(self):
        """An index written with one entry reads back that same entry."""
        entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
        my_entries = [(entry_sha, 178, 42)]
        filename = os.path.join(self.tempdir, 'single.idx')
        self._write_fn(filename, my_entries, pack_checksum)
        idx = load_pack_index(filename)
        self.assertEqual(idx.version, self._expected_version)
        self.assertSucceeds(idx.check)
        self.assertEqual(idx.get_pack_checksum(), pack_checksum)
        self.assertEqual(1, len(idx))
        actual_entries = list(idx.iterentries())
        self.assertEqual(len(my_entries), len(actual_entries))
        for mine, actual in zip(my_entries, actual_entries):
            my_sha, my_offset, my_crc = mine
            actual_sha, actual_offset, actual_crc = actual
            self.assertEqual(my_sha, actual_sha)
            self.assertEqual(my_offset, actual_offset)
            # Subclasses with _has_crc32_checksum False expect the CRC to
            # read back as None.
            if self._has_crc32_checksum:
                self.assertEqual(my_crc, actual_crc)
            else:
                self.assertTrue(actual_crc is None)
class TestPackIndexWritingv1(unittest.TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against write_pack_index_v1."""

    def setUp(self):
        # Both bases define setUp; each is invoked explicitly.
        unittest.TestCase.setUp(self)
        BaseTestPackIndexWriting.setUp(self)
        # Stored on the instance so the writer is called as a plain function
        # rather than being bound as a method.
        self._write_fn = write_pack_index_v1
        self._expected_version = 1
        self._has_crc32_checksum = False

    def tearDown(self):
        unittest.TestCase.tearDown(self)
        BaseTestPackIndexWriting.tearDown(self)
class TestPackIndexWritingv2(unittest.TestCase, BaseTestPackIndexWriting):
    """Runs the shared index-writing tests against write_pack_index_v2."""

    def setUp(self):
        # Both bases define setUp; each is invoked explicitly.
        unittest.TestCase.setUp(self)
        BaseTestPackIndexWriting.setUp(self)
        # Stored on the instance so the writer is called as a plain function
        # rather than being bound as a method.
        self._write_fn = write_pack_index_v2
        self._expected_version = 2
        self._has_crc32_checksum = True

    def tearDown(self):
        unittest.TestCase.tearDown(self)
        BaseTestPackIndexWriting.tearDown(self)
class ReadZlibTests(unittest.TestCase):
    """Tests for read_zlib_chunks().

    Each test feeds a zlib stream, usually followed by the trailing bytes in
    ``extra``, through a file-like ``read`` callable.
    """

    decomp = (
        'tree 4ada885c9196b6b6fa08744b5862bf92896fc002\n'
        'parent None\n'
        'author Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
        'committer Jelmer Vernooij <jelmer@samba.org> 1228980214 +0000\n'
        '\n'
        "Provide replacement for mmap()'s offset argument.")
    comp = zlib.compress(decomp)
    # Bytes placed directly after the compressed stream.
    extra = 'nextobject'

    def setUp(self):
        self.read = StringIO(self.comp + self.extra).read

    def test_decompress_size(self):
        """A negative or wrong expected size is rejected."""
        good_decomp_len = len(self.decomp)
        # NOTE(review): all three calls share self.read, so the later calls
        # may start from an already-consumed stream -- confirm this is
        # intended.
        self.assertRaises(ValueError, read_zlib_chunks, self.read, -1)
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          good_decomp_len - 1)
        self.assertRaises(zlib.error, read_zlib_chunks, self.read,
                          good_decomp_len + 1)

    def test_decompress_truncated(self):
        """Streams with no data after the compressed object raise zlib.error."""
        # Only the first ten bytes of the compressed stream.
        read = StringIO(self.comp[:10]).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
        # The complete compressed stream, but nothing following it.
        read = StringIO(self.comp).read
        self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))

    def test_decompress_empty(self):
        """An empty compressed object decompresses to the empty string."""
        comp = zlib.compress('')
        read = StringIO(comp + self.extra).read
        decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
        self.assertEqual('', ''.join(decomp))
        self.assertEqual(len(comp), comp_len)
        self.assertNotEqual('', unused_data)
        # Whatever was over-read plus the rest of the stream must be exactly
        # the trailing bytes.
        self.assertEqual(self.extra, unused_data + read())

    def _do_decompress_test(self, buffer_size):
        """Decompress the fixture with the given buffer_size and verify it."""
        decomp, comp_len, unused_data = read_zlib_chunks(
            self.read, len(self.decomp), buffer_size=buffer_size)
        self.assertEqual(self.decomp, ''.join(decomp))
        self.assertEqual(len(self.comp), comp_len)
        self.assertNotEqual('', unused_data)
        self.assertEqual(self.extra, unused_data + self.read())

    def test_simple_decompress(self):
        self._do_decompress_test(4096)

    # These buffer sizes are not intended to be realistic, but rather simulate
    # larger buffer sizes that may end at various places.
    def test_decompress_buffer_size_1(self):
        self._do_decompress_test(1)

    def test_decompress_buffer_size_2(self):
        self._do_decompress_test(2)

    def test_decompress_buffer_size_3(self):
        self._do_decompress_test(3)

    def test_decompress_buffer_size_4(self):
        self._do_decompress_test(4)
|