|
@@ -26,34 +26,57 @@ import shutil
|
|
|
import tempfile
|
|
|
import zlib
|
|
|
|
|
|
+from dulwich._compat import (
|
|
|
+ make_sha,
|
|
|
+ )
|
|
|
from dulwich.errors import (
|
|
|
ChecksumMismatch,
|
|
|
)
|
|
|
from dulwich.file import (
|
|
|
GitFile,
|
|
|
)
|
|
|
+from dulwich.object_store import (
|
|
|
+ MemoryObjectStore,
|
|
|
+ )
|
|
|
from dulwich.objects import (
|
|
|
+ Blob,
|
|
|
hex_to_sha,
|
|
|
sha_to_hex,
|
|
|
+ Commit,
|
|
|
Tree,
|
|
|
+ Blob,
|
|
|
)
|
|
|
from dulwich.pack import (
|
|
|
+ OFS_DELTA,
|
|
|
+ REF_DELTA,
|
|
|
+ DELTA_TYPES,
|
|
|
MemoryPackIndex,
|
|
|
Pack,
|
|
|
PackData,
|
|
|
- ThinPackData,
|
|
|
apply_delta,
|
|
|
create_delta,
|
|
|
+ deltify_pack_objects,
|
|
|
load_pack_index,
|
|
|
+ UnpackedObject,
|
|
|
read_zlib_chunks,
|
|
|
write_pack_header,
|
|
|
write_pack_index_v1,
|
|
|
write_pack_index_v2,
|
|
|
+ SHA1Writer,
|
|
|
+ write_pack_object,
|
|
|
write_pack,
|
|
|
+ unpack_object,
|
|
|
+ compute_file_sha,
|
|
|
+ PackStreamReader,
|
|
|
+ DeltaChainIterator,
|
|
|
)
|
|
|
from dulwich.tests import (
|
|
|
TestCase,
|
|
|
)
|
|
|
+from utils import (
|
|
|
+ make_object,
|
|
|
+ build_pack,
|
|
|
+ )
|
|
|
|
|
|
pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
|
|
|
|
|
@@ -170,21 +193,6 @@ class TestPackData(PackTests):
|
|
|
path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha)
|
|
|
PackData.from_file(open(path), os.path.getsize(path))
|
|
|
|
|
|
- # TODO: more ThinPackData tests.
|
|
|
- def test_thin_from_file(self):
|
|
|
- test_sha = '1' * 40
|
|
|
-
|
|
|
- def resolve(sha):
|
|
|
- self.assertEqual(test_sha, sha)
|
|
|
- return 3, 'data'
|
|
|
-
|
|
|
- path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha)
|
|
|
- data = ThinPackData.from_file(resolve, open(path),
|
|
|
- os.path.getsize(path))
|
|
|
- idx = self.get_pack_index(pack1_sha)
|
|
|
- Pack.from_objects(data, idx)
|
|
|
- self.assertEqual((None, 3, 'data'), data.get_ref(test_sha))
|
|
|
-
|
|
|
def test_pack_len(self):
|
|
|
p = self.get_pack_data(pack1_sha)
|
|
|
self.assertEquals(3, len(p))
|
|
@@ -238,6 +246,20 @@ class TestPackData(PackTests):
|
|
|
idx2 = self.get_pack_index(pack1_sha)
|
|
|
self.assertEquals(idx1, idx2)
|
|
|
|
|
|
+ def test_compute_file_sha(self):
|
|
|
+ f = StringIO('abcd1234wxyz')
|
|
|
+ self.assertEqual(make_sha('abcd1234wxyz').hexdigest(),
|
|
|
+ compute_file_sha(f).hexdigest())
|
|
|
+ self.assertEqual(make_sha('abcd1234wxyz').hexdigest(),
|
|
|
+ compute_file_sha(f, buffer_size=5).hexdigest())
|
|
|
+ self.assertEqual(make_sha('abcd1234').hexdigest(),
|
|
|
+ compute_file_sha(f, end_ofs=-4).hexdigest())
|
|
|
+ self.assertEqual(make_sha('1234wxyz').hexdigest(),
|
|
|
+ compute_file_sha(f, start_ofs=4).hexdigest())
|
|
|
+ self.assertEqual(
|
|
|
+ make_sha('1234').hexdigest(),
|
|
|
+ compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest())
|
|
|
+
|
|
|
|
|
|
class TestPack(PackTests):
|
|
|
|
|
@@ -257,6 +279,19 @@ class TestPack(PackTests):
|
|
|
p = self.get_pack(pack1_sha)
|
|
|
self.assertEquals(set([tree_sha, commit_sha, a_sha]), set(p))
|
|
|
|
|
|
+ def test_iterobjects(self):
|
|
|
+ p = self.get_pack(pack1_sha)
|
|
|
+ expected = set([p[s] for s in [commit_sha, tree_sha, a_sha]])
|
|
|
+ self.assertEquals(expected, set(list(p.iterobjects())))
|
|
|
+
|
|
|
+ def test_pack_tuples(self):
|
|
|
+ p = self.get_pack(pack1_sha)
|
|
|
+ tuples = p.pack_tuples()
|
|
|
+ expected = set([(p[s], None) for s in [commit_sha, tree_sha, a_sha]])
|
|
|
+ self.assertEquals(expected, set(list(tuples)))
|
|
|
+ self.assertEquals(expected, set(list(tuples)))
|
|
|
+ self.assertEquals(3, len(tuples))
|
|
|
+
|
|
|
def test_get_object_at(self):
|
|
|
"""Tests random access for non-delta objects"""
|
|
|
p = self.get_pack(pack1_sha)
|
|
@@ -276,8 +311,7 @@ class TestPack(PackTests):
|
|
|
try:
|
|
|
self.assertSucceeds(origpack.index.check)
|
|
|
basename = os.path.join(self.tempdir, 'Elch')
|
|
|
- write_pack(basename, [(x, '') for x in origpack.iterobjects()],
|
|
|
- len(origpack))
|
|
|
+ write_pack(basename, origpack.pack_tuples())
|
|
|
newpack = Pack(basename)
|
|
|
|
|
|
try:
|
|
@@ -306,8 +340,7 @@ class TestPack(PackTests):
|
|
|
|
|
|
def _copy_pack(self, origpack):
|
|
|
basename = os.path.join(self.tempdir, 'somepack')
|
|
|
- write_pack(basename, [(x, '') for x in origpack.iterobjects()],
|
|
|
- len(origpack))
|
|
|
+ write_pack(basename, origpack.pack_tuples())
|
|
|
return Pack(basename)
|
|
|
|
|
|
def test_keep_no_message(self):
|
|
@@ -347,15 +380,81 @@ class TestPack(PackTests):
|
|
|
p = self.get_pack(pack1_sha)
|
|
|
self.assertEquals(pack1_sha, p.name())
|
|
|
|
|
|
+ def test_length_mismatch(self):
|
|
|
+ data = self.get_pack_data(pack1_sha)
|
|
|
+ index = self.get_pack_index(pack1_sha)
|
|
|
+ Pack.from_objects(data, index).check_length_and_checksum()
|
|
|
+
|
|
|
+ data._file.seek(12)
|
|
|
+ bad_file = StringIO()
|
|
|
+ write_pack_header(bad_file, 9999)
|
|
|
+ bad_file.write(data._file.read())
|
|
|
+ bad_file = StringIO(bad_file.getvalue())
|
|
|
+ bad_data = PackData('', file=bad_file)
|
|
|
+ bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
|
|
|
+ self.assertRaises(AssertionError, lambda: bad_pack.data)
|
|
|
+ self.assertRaises(AssertionError,
|
|
|
+ lambda: bad_pack.check_length_and_checksum())
|
|
|
+
|
|
|
+ def test_checksum_mismatch(self):
|
|
|
+ data = self.get_pack_data(pack1_sha)
|
|
|
+ index = self.get_pack_index(pack1_sha)
|
|
|
+ Pack.from_objects(data, index).check_length_and_checksum()
|
|
|
+
|
|
|
+ data._file.seek(0)
|
|
|
+ bad_file = StringIO(data._file.read()[:-20] + ('\xff' * 20))
|
|
|
+ bad_data = PackData('', file=bad_file)
|
|
|
+ bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
|
|
|
+ self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
|
|
|
+ self.assertRaises(ChecksumMismatch, lambda:
|
|
|
+ bad_pack.check_length_and_checksum())
|
|
|
+
|
|
|
+ def test_iterobjects(self):
|
|
|
+ p = self.get_pack(pack1_sha)
|
|
|
+ objs = dict((o.id, o) for o in p.iterobjects())
|
|
|
+ self.assertEquals(3, len(objs))
|
|
|
+ self.assertEquals(sorted(objs), sorted(p.index))
|
|
|
+ self.assertTrue(isinstance(objs[a_sha], Blob))
|
|
|
+ self.assertTrue(isinstance(objs[tree_sha], Tree))
|
|
|
+ self.assertTrue(isinstance(objs[commit_sha], Commit))
|
|
|
+
|
|
|
|
|
|
-class WritePackHeaderTests(TestCase):
|
|
|
+class WritePackTests(TestCase):
|
|
|
|
|
|
- def test_simple(self):
|
|
|
+ def test_write_pack_header(self):
|
|
|
f = StringIO()
|
|
|
write_pack_header(f, 42)
|
|
|
self.assertEquals('PACK\x00\x00\x00\x02\x00\x00\x00*',
|
|
|
f.getvalue())
|
|
|
|
|
|
+ def test_write_pack_object(self):
|
|
|
+ f = StringIO()
|
|
|
+ f.write('header')
|
|
|
+ offset = f.tell()
|
|
|
+ crc32 = write_pack_object(f, Blob.type_num, 'blob')
|
|
|
+ self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xffffffff)
|
|
|
+
|
|
|
+ f.write('x') # unpack_object needs extra trailing data.
|
|
|
+ f.seek(offset)
|
|
|
+ comp_len = len(f.getvalue()) - offset - 1
|
|
|
+ unpacked, unused = unpack_object(f.read, compute_crc32=True)
|
|
|
+ self.assertEqual(Blob.type_num, unpacked.pack_type_num)
|
|
|
+ self.assertEqual(Blob.type_num, unpacked.obj_type_num)
|
|
|
+ self.assertEqual(['blob'], unpacked.decomp_chunks)
|
|
|
+ self.assertEqual(crc32, unpacked.crc32)
|
|
|
+ self.assertEqual('x', unused)
|
|
|
+
|
|
|
+ def test_write_pack_object_sha(self):
|
|
|
+ f = StringIO()
|
|
|
+ f.write('header')
|
|
|
+ offset = f.tell()
|
|
|
+ sha_a = make_sha('foo')
|
|
|
+ sha_b = sha_a.copy()
|
|
|
+ write_pack_object(f, Blob.type_num, 'blob', sha=sha_a)
|
|
|
+ self.assertNotEqual(sha_a.digest(), sha_b.digest())
|
|
|
+ sha_b.update(f.getvalue()[offset:])
|
|
|
+ self.assertEqual(sha_a.digest(), sha_b.digest())
|
|
|
+
|
|
|
|
|
|
pack_checksum = hex_to_sha('721980e866af9a5f93ad674144e1459b8ba3e7b7')
|
|
|
|
|
@@ -476,41 +575,52 @@ class ReadZlibTests(TestCase):
|
|
|
def setUp(self):
|
|
|
super(ReadZlibTests, self).setUp()
|
|
|
self.read = StringIO(self.comp + self.extra).read
|
|
|
+ self.unpacked = UnpackedObject(Tree.type_num, None, len(self.decomp), 0)
|
|
|
|
|
|
def test_decompress_size(self):
|
|
|
good_decomp_len = len(self.decomp)
|
|
|
- self.assertRaises(ValueError, read_zlib_chunks, self.read, -1)
|
|
|
+ self.unpacked.decomp_len = -1
|
|
|
+ self.assertRaises(ValueError, read_zlib_chunks, self.read,
|
|
|
+ self.unpacked)
|
|
|
+ self.unpacked.decomp_len = good_decomp_len - 1
|
|
|
self.assertRaises(zlib.error, read_zlib_chunks, self.read,
|
|
|
- good_decomp_len - 1)
|
|
|
+ self.unpacked)
|
|
|
+ self.unpacked.decomp_len = good_decomp_len + 1
|
|
|
self.assertRaises(zlib.error, read_zlib_chunks, self.read,
|
|
|
- good_decomp_len + 1)
|
|
|
+ self.unpacked)
|
|
|
|
|
|
def test_decompress_truncated(self):
|
|
|
read = StringIO(self.comp[:10]).read
|
|
|
- self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
|
|
|
+ self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)
|
|
|
|
|
|
read = StringIO(self.comp).read
|
|
|
- self.assertRaises(zlib.error, read_zlib_chunks, read, len(self.decomp))
|
|
|
+ self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)
|
|
|
|
|
|
def test_decompress_empty(self):
|
|
|
+ unpacked = UnpackedObject(Tree.type_num, None, 0, None)
|
|
|
comp = zlib.compress('')
|
|
|
read = StringIO(comp + self.extra).read
|
|
|
- decomp, comp_len, unused_data = read_zlib_chunks(read, 0)
|
|
|
- self.assertEqual('', ''.join(decomp))
|
|
|
- self.assertEqual(len(comp), comp_len)
|
|
|
- self.assertNotEquals('', unused_data)
|
|
|
- self.assertEquals(self.extra, unused_data + read())
|
|
|
-
|
|
|
- def _do_decompress_test(self, buffer_size):
|
|
|
- decomp, comp_len, unused_data = read_zlib_chunks(
|
|
|
- self.read, len(self.decomp), buffer_size=buffer_size)
|
|
|
- self.assertEquals(self.decomp, ''.join(decomp))
|
|
|
- self.assertEquals(len(self.comp), comp_len)
|
|
|
- self.assertNotEquals('', unused_data)
|
|
|
- self.assertEquals(self.extra, unused_data + self.read())
|
|
|
+ unused = read_zlib_chunks(read, unpacked)
|
|
|
+ self.assertEqual('', ''.join(unpacked.decomp_chunks))
|
|
|
+ self.assertNotEquals('', unused)
|
|
|
+ self.assertEquals(self.extra, unused + read())
|
|
|
+
|
|
|
+ def test_decompress_no_crc32(self):
|
|
|
+ self.unpacked.crc32 = None
|
|
|
+ read_zlib_chunks(self.read, self.unpacked)
|
|
|
+ self.assertEquals(None, self.unpacked.crc32)
|
|
|
+
|
|
|
+ def _do_decompress_test(self, buffer_size, **kwargs):
|
|
|
+ unused = read_zlib_chunks(self.read, self.unpacked,
|
|
|
+ buffer_size=buffer_size, **kwargs)
|
|
|
+ self.assertEquals(self.decomp, ''.join(self.unpacked.decomp_chunks))
|
|
|
+ self.assertEquals(zlib.crc32(self.comp), self.unpacked.crc32)
|
|
|
+ self.assertNotEquals('', unused)
|
|
|
+ self.assertEquals(self.extra, unused + self.read())
|
|
|
|
|
|
def test_simple_decompress(self):
|
|
|
self._do_decompress_test(4096)
|
|
|
+ self.assertEqual(None, self.unpacked.comp_chunks)
|
|
|
|
|
|
# These buffer sizes are not intended to be realistic, but rather simulate
|
|
|
# larger buffer sizes that may end at various places.
|
|
@@ -525,3 +635,290 @@ class ReadZlibTests(TestCase):
|
|
|
|
|
|
def test_decompress_buffer_size_4(self):
|
|
|
self._do_decompress_test(4)
|
|
|
+
|
|
|
+ def test_decompress_include_comp(self):
|
|
|
+ self._do_decompress_test(4096, include_comp=True)
|
|
|
+ self.assertEqual(self.comp, ''.join(self.unpacked.comp_chunks))
|
|
|
+
|
|
|
+
|
|
|
+class DeltifyTests(TestCase):
|
|
|
+
|
|
|
+ def test_empty(self):
|
|
|
+ self.assertEquals([], list(deltify_pack_objects([])))
|
|
|
+
|
|
|
+ def test_single(self):
|
|
|
+ b = Blob.from_string("foo")
|
|
|
+ self.assertEquals(
|
|
|
+ [(b.type_num, b.sha().digest(), None, b.as_raw_string())],
|
|
|
+ list(deltify_pack_objects([(b, "")])))
|
|
|
+
|
|
|
+ def test_simple_delta(self):
|
|
|
+ b1 = Blob.from_string("a" * 101)
|
|
|
+ b2 = Blob.from_string("a" * 100)
|
|
|
+ delta = create_delta(b1.as_raw_string(), b2.as_raw_string())
|
|
|
+ self.assertEquals([
|
|
|
+ (b1.type_num, b1.sha().digest(), None, b1.as_raw_string()),
|
|
|
+ (b2.type_num, b2.sha().digest(), b1.sha().digest(), delta)
|
|
|
+ ],
|
|
|
+ list(deltify_pack_objects([(b1, ""), (b2, "")])))
|
|
|
+
|
|
|
+
|
|
|
+class TestPackStreamReader(TestCase):
|
|
|
+
|
|
|
+ def test_read_objects_empty(self):
|
|
|
+ f = StringIO()
|
|
|
+ build_pack(f, [])
|
|
|
+ reader = PackStreamReader(f.read)
|
|
|
+ self.assertEqual(0, len(list(reader.read_objects())))
|
|
|
+
|
|
|
+ def test_read_objects(self):
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (Blob.type_num, 'blob'),
|
|
|
+ (OFS_DELTA, (0, 'blob1')),
|
|
|
+ ])
|
|
|
+ reader = PackStreamReader(f.read)
|
|
|
+ objects = list(reader.read_objects(compute_crc32=True))
|
|
|
+ self.assertEqual(2, len(objects))
|
|
|
+
|
|
|
+ unpacked_blob, unpacked_delta = objects
|
|
|
+
|
|
|
+ self.assertEqual(entries[0][0], unpacked_blob.offset)
|
|
|
+ self.assertEqual(Blob.type_num, unpacked_blob.pack_type_num)
|
|
|
+ self.assertEqual(Blob.type_num, unpacked_blob.obj_type_num)
|
|
|
+ self.assertEqual(None, unpacked_blob.delta_base)
|
|
|
+ self.assertEqual('blob', ''.join(unpacked_blob.decomp_chunks))
|
|
|
+ self.assertEqual(entries[0][4], unpacked_blob.crc32)
|
|
|
+
|
|
|
+ self.assertEqual(entries[1][0], unpacked_delta.offset)
|
|
|
+ self.assertEqual(OFS_DELTA, unpacked_delta.pack_type_num)
|
|
|
+ self.assertEqual(None, unpacked_delta.obj_type_num)
|
|
|
+ self.assertEqual(unpacked_delta.offset - unpacked_blob.offset,
|
|
|
+ unpacked_delta.delta_base)
|
|
|
+ delta = create_delta('blob', 'blob1')
|
|
|
+ self.assertEqual(delta, ''.join(unpacked_delta.decomp_chunks))
|
|
|
+ self.assertEqual(entries[1][4], unpacked_delta.crc32)
|
|
|
+
|
|
|
+ def test_read_objects_buffered(self):
|
|
|
+ f = StringIO()
|
|
|
+ build_pack(f, [
|
|
|
+ (Blob.type_num, 'blob'),
|
|
|
+ (OFS_DELTA, (0, 'blob1')),
|
|
|
+ ])
|
|
|
+ reader = PackStreamReader(f.read, zlib_bufsize=4)
|
|
|
+ self.assertEqual(2, len(list(reader.read_objects())))
|
|
|
+
|
|
|
+
|
|
|
+class TestPackIterator(DeltaChainIterator):
|
|
|
+
|
|
|
+ _compute_crc32 = True
|
|
|
+
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ super(TestPackIterator, self).__init__(*args, **kwargs)
|
|
|
+ self._unpacked_offsets = set()
|
|
|
+
|
|
|
+ def _result(self, unpacked):
|
|
|
+ """Return entries in the same format as build_pack."""
|
|
|
+ return (unpacked.offset, unpacked.obj_type_num,
|
|
|
+ ''.join(unpacked.obj_chunks), unpacked.sha(), unpacked.crc32)
|
|
|
+
|
|
|
+ def _resolve_object(self, offset, pack_type_num, base_chunks):
|
|
|
+ assert offset not in self._unpacked_offsets, (
|
|
|
+ 'Attempted to re-inflate offset %i' % offset)
|
|
|
+ self._unpacked_offsets.add(offset)
|
|
|
+ return super(TestPackIterator, self)._resolve_object(
|
|
|
+ offset, pack_type_num, base_chunks)
|
|
|
+
|
|
|
+
|
|
|
+class DeltaChainIteratorTests(TestCase):
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ self.store = MemoryObjectStore()
|
|
|
+ self.fetched = set()
|
|
|
+
|
|
|
+ def store_blobs(self, blobs_data):
|
|
|
+ blobs = []
|
|
|
+ for data in blobs_data:
|
|
|
+ blob = make_object(Blob, data=data)
|
|
|
+ blobs.append(blob)
|
|
|
+ self.store.add_object(blob)
|
|
|
+ return blobs
|
|
|
+
|
|
|
+ def get_raw_no_repeat(self, bin_sha):
|
|
|
+ """Wrapper around store.get_raw that doesn't allow repeat lookups."""
|
|
|
+ hex_sha = sha_to_hex(bin_sha)
|
|
|
+ self.assertFalse(hex_sha in self.fetched,
|
|
|
+ 'Attempted to re-fetch object %s' % hex_sha)
|
|
|
+ self.fetched.add(hex_sha)
|
|
|
+ return self.store.get_raw(hex_sha)
|
|
|
+
|
|
|
+ def make_pack_iter(self, f, thin=None):
|
|
|
+ if thin is None:
|
|
|
+ thin = bool(list(self.store))
|
|
|
+ resolve_ext_ref = thin and self.get_raw_no_repeat or None
|
|
|
+ data = PackData('test.pack', file=f)
|
|
|
+ return TestPackIterator.for_pack_data(
|
|
|
+ data, resolve_ext_ref=resolve_ext_ref)
|
|
|
+
|
|
|
+ def assertEntriesMatch(self, expected_indexes, entries, pack_iter):
|
|
|
+ expected = [entries[i] for i in expected_indexes]
|
|
|
+ self.assertEqual(expected, list(pack_iter._walk_all_chains()))
|
|
|
+
|
|
|
+ def test_no_deltas(self):
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (Commit.type_num, 'commit'),
|
|
|
+ (Blob.type_num, 'blob'),
|
|
|
+ (Tree.type_num, 'tree'),
|
|
|
+ ])
|
|
|
+ self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_ofs_deltas(self):
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (Blob.type_num, 'blob'),
|
|
|
+ (OFS_DELTA, (0, 'blob1')),
|
|
|
+ (OFS_DELTA, (0, 'blob2')),
|
|
|
+ ])
|
|
|
+ self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_ofs_deltas_chain(self):
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (Blob.type_num, 'blob'),
|
|
|
+ (OFS_DELTA, (0, 'blob1')),
|
|
|
+ (OFS_DELTA, (1, 'blob2')),
|
|
|
+ ])
|
|
|
+ self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_ref_deltas(self):
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (REF_DELTA, (1, 'blob1')),
|
|
|
+ (Blob.type_num, ('blob')),
|
|
|
+ (REF_DELTA, (1, 'blob2')),
|
|
|
+ ])
|
|
|
+ self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_ref_deltas_chain(self):
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (REF_DELTA, (2, 'blob1')),
|
|
|
+ (Blob.type_num, ('blob')),
|
|
|
+ (REF_DELTA, (1, 'blob2')),
|
|
|
+ ])
|
|
|
+ self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_ofs_and_ref_deltas(self):
|
|
|
+ # Deltas pending on this offset are popped before deltas depending on
|
|
|
+ # this ref.
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (REF_DELTA, (1, 'blob1')),
|
|
|
+ (Blob.type_num, ('blob')),
|
|
|
+ (OFS_DELTA, (1, 'blob2')),
|
|
|
+ ])
|
|
|
+ self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_mixed_chain(self):
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (Blob.type_num, 'blob'),
|
|
|
+ (REF_DELTA, (2, 'blob2')),
|
|
|
+ (OFS_DELTA, (0, 'blob1')),
|
|
|
+ (OFS_DELTA, (1, 'blob3')),
|
|
|
+ (OFS_DELTA, (0, 'bob')),
|
|
|
+ ])
|
|
|
+ self.assertEntriesMatch([0, 2, 1, 3, 4], entries,
|
|
|
+ self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_long_chain(self):
|
|
|
+ n = 100
|
|
|
+ objects_spec = [(Blob.type_num, 'blob')]
|
|
|
+ for i in xrange(n):
|
|
|
+ objects_spec.append((OFS_DELTA, (i, 'blob%i' % i)))
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, objects_spec)
|
|
|
+ self.assertEntriesMatch(xrange(n + 1), entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_branchy_chain(self):
|
|
|
+ n = 100
|
|
|
+ objects_spec = [(Blob.type_num, 'blob')]
|
|
|
+ for i in xrange(n):
|
|
|
+ objects_spec.append((OFS_DELTA, (0, 'blob%i' % i)))
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, objects_spec)
|
|
|
+ self.assertEntriesMatch(xrange(n + 1), entries, self.make_pack_iter(f))
|
|
|
+
|
|
|
+ def test_ext_ref(self):
|
|
|
+ blob, = self.store_blobs(['blob'])
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [(REF_DELTA, (blob.id, 'blob1'))],
|
|
|
+ store=self.store)
|
|
|
+ pack_iter = self.make_pack_iter(f)
|
|
|
+ self.assertEntriesMatch([0], entries, pack_iter)
|
|
|
+ self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
+
|
|
|
+ def test_ext_ref_chain(self):
|
|
|
+ blob, = self.store_blobs(['blob'])
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (REF_DELTA, (1, 'blob2')),
|
|
|
+ (REF_DELTA, (blob.id, 'blob1')),
|
|
|
+ ], store=self.store)
|
|
|
+ pack_iter = self.make_pack_iter(f)
|
|
|
+ self.assertEntriesMatch([1, 0], entries, pack_iter)
|
|
|
+ self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
+
|
|
|
+ def test_ext_ref_multiple_times(self):
|
|
|
+ blob, = self.store_blobs(['blob'])
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (REF_DELTA, (blob.id, 'blob1')),
|
|
|
+ (REF_DELTA, (blob.id, 'blob2')),
|
|
|
+ ], store=self.store)
|
|
|
+ pack_iter = self.make_pack_iter(f)
|
|
|
+ self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
+ self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
+
|
|
|
+ def test_multiple_ext_refs(self):
|
|
|
+ b1, b2 = self.store_blobs(['foo', 'bar'])
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (REF_DELTA, (b1.id, 'foo1')),
|
|
|
+ (REF_DELTA, (b2.id, 'bar2')),
|
|
|
+ ], store=self.store)
|
|
|
+ pack_iter = self.make_pack_iter(f)
|
|
|
+ self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
+ self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)],
|
|
|
+ pack_iter.ext_refs())
|
|
|
+
|
|
|
+ def test_bad_ext_ref_non_thin_pack(self):
|
|
|
+ blob, = self.store_blobs(['blob'])
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [(REF_DELTA, (blob.id, 'blob1'))],
|
|
|
+ store=self.store)
|
|
|
+ pack_iter = self.make_pack_iter(f, thin=False)
|
|
|
+ try:
|
|
|
+ list(pack_iter._walk_all_chains())
|
|
|
+ self.fail()
|
|
|
+ except KeyError, e:
|
|
|
+ self.assertEqual(([blob.id],), e.args)
|
|
|
+
|
|
|
+ def test_bad_ext_ref_thin_pack(self):
|
|
|
+ b1, b2, b3 = self.store_blobs(['foo', 'bar', 'baz'])
|
|
|
+ f = StringIO()
|
|
|
+ entries = build_pack(f, [
|
|
|
+ (REF_DELTA, (1, 'foo99')),
|
|
|
+ (REF_DELTA, (b1.id, 'foo1')),
|
|
|
+ (REF_DELTA, (b2.id, 'bar2')),
|
|
|
+ (REF_DELTA, (b3.id, 'baz3')),
|
|
|
+ ], store=self.store)
|
|
|
+ del self.store[b2.id]
|
|
|
+ del self.store[b3.id]
|
|
|
+ pack_iter = self.make_pack_iter(f)
|
|
|
+ try:
|
|
|
+ list(pack_iter._walk_all_chains())
|
|
|
+ self.fail()
|
|
|
+ except KeyError, e:
|
|
|
+ self.assertEqual((sorted([b2.id, b3.id]),), e.args)
|