|
@@ -24,6 +24,7 @@ from io import BytesIO
|
|
|
from hashlib import sha1
|
|
|
import os
|
|
|
import shutil
|
|
|
+import sys
|
|
|
import tempfile
|
|
|
import zlib
|
|
|
|
|
@@ -74,38 +75,39 @@ from dulwich.tests import (
|
|
|
from dulwich.tests.utils import (
|
|
|
make_object,
|
|
|
build_pack,
|
|
|
- skipIfPY3,
|
|
|
)
|
|
|
|
|
|
-pack1_sha = 'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
|
|
|
+pack1_sha = b'bc63ddad95e7321ee734ea11a7a62d314e0d7481'
|
|
|
|
|
|
-a_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
|
|
|
-tree_sha = 'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
|
|
|
-commit_sha = 'f18faa16531ac570a3fdc8c7ca16682548dafd12'
|
|
|
+a_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
|
|
|
+tree_sha = b'b2a2766a2879c209ab1176e7e778b81ae422eeaa'
|
|
|
+commit_sha = b'f18faa16531ac570a3fdc8c7ca16682548dafd12'
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class PackTests(TestCase):
|
|
|
"""Base class for testing packs"""
|
|
|
|
|
|
def setUp(self):
|
|
|
super(PackTests, self).setUp()
|
|
|
self.tempdir = tempfile.mkdtemp()
|
|
|
+ if not isinstance(self.tempdir, bytes):
|
|
|
+ self.tempdir = self.tempdir.encode(sys.getfilesystemencoding())
|
|
|
self.addCleanup(shutil.rmtree, self.tempdir)
|
|
|
|
|
|
- datadir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
|
|
- 'data/packs'))
|
|
|
+ datadir = os.path.abspath(
|
|
|
+ os.path.join(os.path.dirname(__file__.encode(sys.getfilesystemencoding())),
|
|
|
+ b'data/packs'))
|
|
|
|
|
|
def get_pack_index(self, sha):
|
|
|
"""Returns a PackIndex from the datadir with the given sha"""
|
|
|
- return load_pack_index(os.path.join(self.datadir, 'pack-%s.idx' % sha))
|
|
|
+ return load_pack_index(os.path.join(self.datadir, b'pack-' + sha + b'.idx'))
|
|
|
|
|
|
def get_pack_data(self, sha):
|
|
|
"""Returns a PackData object from the datadir with the given sha"""
|
|
|
- return PackData(os.path.join(self.datadir, 'pack-%s.pack' % sha))
|
|
|
+ return PackData(os.path.join(self.datadir, b'pack-' + sha + b'.pack'))
|
|
|
|
|
|
def get_pack(self, sha):
|
|
|
- return Pack(os.path.join(self.datadir, 'pack-%s' % sha))
|
|
|
+ return Pack(os.path.join(self.datadir, b'pack-' + sha))
|
|
|
|
|
|
def assertSucceeds(self, func, *args, **kwargs):
|
|
|
try:
|
|
@@ -114,7 +116,6 @@ class PackTests(TestCase):
|
|
|
self.fail(e)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class PackIndexTests(PackTests):
|
|
|
"""Class that tests the index of packfiles"""
|
|
|
|
|
@@ -132,10 +133,10 @@ class PackIndexTests(PackTests):
|
|
|
|
|
|
def test_get_stored_checksum(self):
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
- self.assertEqual('f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
|
|
|
- sha_to_hex(p.get_stored_checksum()))
|
|
|
- self.assertEqual('721980e866af9a5f93ad674144e1459b8ba3e7b7',
|
|
|
- sha_to_hex(p.get_pack_checksum()))
|
|
|
+ self.assertEqual(b'f2848e2ad16f329ae1c92e3b95e91888daa5bd01',
|
|
|
+ sha_to_hex(p.get_stored_checksum()))
|
|
|
+ self.assertEqual(b'721980e866af9a5f93ad674144e1459b8ba3e7b7',
|
|
|
+ sha_to_hex(p.get_pack_checksum()))
|
|
|
|
|
|
def test_index_check(self):
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
@@ -145,30 +146,29 @@ class PackIndexTests(PackTests):
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
|
|
|
self.assertEqual([
|
|
|
- ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
|
|
|
- ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
|
|
|
- ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
|
|
|
- ], entries)
|
|
|
+ (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, None),
|
|
|
+ (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, None),
|
|
|
+ (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, None)
|
|
|
+ ], entries)
|
|
|
|
|
|
def test_iter(self):
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertEqual(set([tree_sha, commit_sha, a_sha]), set(p))
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestPackDeltas(TestCase):
|
|
|
|
|
|
- test_string1 = 'The answer was flailing in the wind'
|
|
|
- test_string2 = 'The answer was falling down the pipe'
|
|
|
- test_string3 = 'zzzzz'
|
|
|
+ test_string1 = b'The answer was flailing in the wind'
|
|
|
+ test_string2 = b'The answer was falling down the pipe'
|
|
|
+ test_string3 = b'zzzzz'
|
|
|
|
|
|
- test_string_empty = ''
|
|
|
- test_string_big = 'Z' * 8192
|
|
|
- test_string_huge = 'Z' * 100000
|
|
|
+ test_string_empty = b''
|
|
|
+ test_string_big = b'Z' * 8192
|
|
|
+ test_string_huge = b'Z' * 100000
|
|
|
|
|
|
def _test_roundtrip(self, base, target):
|
|
|
self.assertEqual(target,
|
|
|
- ''.join(apply_delta(base, create_delta(base, target))))
|
|
|
+ b''.join(apply_delta(base, create_delta(base, target))))
|
|
|
|
|
|
def test_nochange(self):
|
|
|
self._test_roundtrip(self.test_string1, self.test_string1)
|
|
@@ -195,13 +195,12 @@ class TestPackDeltas(TestCase):
|
|
|
def test_dest_overflow(self):
|
|
|
self.assertRaises(
|
|
|
ApplyDeltaError,
|
|
|
- apply_delta, 'a'*0x10000, '\x80\x80\x04\x80\x80\x04\x80' + 'a'*0x10000)
|
|
|
+ apply_delta, b'a'*0x10000, b'\x80\x80\x04\x80\x80\x04\x80' + b'a'*0x10000)
|
|
|
self.assertRaises(
|
|
|
ApplyDeltaError,
|
|
|
- apply_delta, '', '\x00\x80\x02\xb0\x11\x11')
|
|
|
+ apply_delta, b'', b'\x00\x80\x02\xb0\x11\x11')
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestPackData(PackTests):
|
|
|
"""Tests getting the data from the packfile."""
|
|
|
|
|
@@ -209,8 +208,9 @@ class TestPackData(PackTests):
|
|
|
self.get_pack_data(pack1_sha).close()
|
|
|
|
|
|
def test_from_file(self):
|
|
|
- path = os.path.join(self.datadir, 'pack-%s.pack' % pack1_sha)
|
|
|
- PackData.from_file(open(path), os.path.getsize(path))
|
|
|
+ path = os.path.join(self.datadir, b'pack-' + pack1_sha + b'.pack')
|
|
|
+ with open(path, 'rb') as f:
|
|
|
+ PackData.from_file(f, os.path.getsize(path))
|
|
|
|
|
|
def test_pack_len(self):
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
@@ -222,36 +222,36 @@ class TestPackData(PackTests):
|
|
|
|
|
|
def test_iterobjects(self):
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
- commit_data = ('tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
|
|
|
- 'author James Westby <jw+debian@jameswestby.net> '
|
|
|
- '1174945067 +0100\n'
|
|
|
- 'committer James Westby <jw+debian@jameswestby.net> '
|
|
|
- '1174945067 +0100\n'
|
|
|
- '\n'
|
|
|
- 'Test commit\n')
|
|
|
- blob_sha = '6f670c0fb53f9463760b7295fbb814e965fb20c8'
|
|
|
- tree_data = '100644 a\0%s' % hex_to_sha(blob_sha)
|
|
|
+ commit_data = (b'tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n'
|
|
|
+ b'author James Westby <jw+debian@jameswestby.net> '
|
|
|
+ b'1174945067 +0100\n'
|
|
|
+ b'committer James Westby <jw+debian@jameswestby.net> '
|
|
|
+ b'1174945067 +0100\n'
|
|
|
+ b'\n'
|
|
|
+ b'Test commit\n')
|
|
|
+ blob_sha = b'6f670c0fb53f9463760b7295fbb814e965fb20c8'
|
|
|
+ tree_data = b'100644 a\0' + hex_to_sha(blob_sha)
|
|
|
actual = []
|
|
|
for offset, type_num, chunks, crc32 in p.iterobjects():
|
|
|
- actual.append((offset, type_num, ''.join(chunks), crc32))
|
|
|
+ actual.append((offset, type_num, b''.join(chunks), crc32))
|
|
|
self.assertEqual([
|
|
|
- (12, 1, commit_data, 3775879613),
|
|
|
- (138, 2, tree_data, 912998690),
|
|
|
- (178, 3, 'test 1\n', 1373561701)
|
|
|
- ], actual)
|
|
|
+ (12, 1, commit_data, 3775879613),
|
|
|
+ (138, 2, tree_data, 912998690),
|
|
|
+ (178, 3, b'test 1\n', 1373561701)
|
|
|
+ ], actual)
|
|
|
|
|
|
def test_iterentries(self):
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
entries = set((sha_to_hex(s), o, c) for s, o, c in p.iterentries())
|
|
|
self.assertEqual(set([
|
|
|
- ('6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
|
|
|
- ('b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
|
|
|
- ('f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
|
|
|
+ (b'6f670c0fb53f9463760b7295fbb814e965fb20c8', 178, 1373561701),
|
|
|
+ (b'b2a2766a2879c209ab1176e7e778b81ae422eeaa', 138, 912998690),
|
|
|
+ (b'f18faa16531ac570a3fdc8c7ca16682548dafd12', 12, 3775879613),
|
|
|
]), entries)
|
|
|
|
|
|
def test_create_index_v1(self):
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
- filename = os.path.join(self.tempdir, 'v1test.idx')
|
|
|
+ filename = os.path.join(self.tempdir, b'v1test.idx')
|
|
|
p.create_index_v1(filename)
|
|
|
idx1 = load_pack_index(filename)
|
|
|
idx2 = self.get_pack_index(pack1_sha)
|
|
@@ -259,35 +259,34 @@ class TestPackData(PackTests):
|
|
|
|
|
|
def test_create_index_v2(self):
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
- filename = os.path.join(self.tempdir, 'v2test.idx')
|
|
|
+ filename = os.path.join(self.tempdir, b'v2test.idx')
|
|
|
p.create_index_v2(filename)
|
|
|
idx1 = load_pack_index(filename)
|
|
|
idx2 = self.get_pack_index(pack1_sha)
|
|
|
self.assertEqual(idx1, idx2)
|
|
|
|
|
|
def test_compute_file_sha(self):
|
|
|
- f = BytesIO('abcd1234wxyz')
|
|
|
- self.assertEqual(sha1('abcd1234wxyz').hexdigest(),
|
|
|
+ f = BytesIO(b'abcd1234wxyz')
|
|
|
+ self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(),
|
|
|
compute_file_sha(f).hexdigest())
|
|
|
- self.assertEqual(sha1('abcd1234wxyz').hexdigest(),
|
|
|
+ self.assertEqual(sha1(b'abcd1234wxyz').hexdigest(),
|
|
|
compute_file_sha(f, buffer_size=5).hexdigest())
|
|
|
- self.assertEqual(sha1('abcd1234').hexdigest(),
|
|
|
+ self.assertEqual(sha1(b'abcd1234').hexdigest(),
|
|
|
compute_file_sha(f, end_ofs=-4).hexdigest())
|
|
|
- self.assertEqual(sha1('1234wxyz').hexdigest(),
|
|
|
+ self.assertEqual(sha1(b'1234wxyz').hexdigest(),
|
|
|
compute_file_sha(f, start_ofs=4).hexdigest())
|
|
|
self.assertEqual(
|
|
|
- sha1('1234').hexdigest(),
|
|
|
- compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest())
|
|
|
+ sha1(b'1234').hexdigest(),
|
|
|
+ compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest())
|
|
|
|
|
|
def test_compute_file_sha_short_file(self):
|
|
|
- f = BytesIO('abcd1234wxyz')
|
|
|
+ f = BytesIO(b'abcd1234wxyz')
|
|
|
self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
|
|
|
self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
|
|
|
self.assertRaises(AssertionError, compute_file_sha, f, start_ofs=10,
|
|
|
end_ofs=-12)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestPack(PackTests):
|
|
|
|
|
|
def test_len(self):
|
|
@@ -323,19 +322,19 @@ class TestPack(PackTests):
|
|
|
"""Tests random access for non-delta objects"""
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
obj = p[a_sha]
|
|
|
- self.assertEqual(obj.type_name, 'blob')
|
|
|
- self.assertEqual(obj.sha().hexdigest(), a_sha)
|
|
|
+ self.assertEqual(obj.type_name, b'blob')
|
|
|
+ self.assertEqual(obj.sha().hexdigest().encode('ascii'), a_sha)
|
|
|
obj = p[tree_sha]
|
|
|
- self.assertEqual(obj.type_name, 'tree')
|
|
|
- self.assertEqual(obj.sha().hexdigest(), tree_sha)
|
|
|
+ self.assertEqual(obj.type_name, b'tree')
|
|
|
+ self.assertEqual(obj.sha().hexdigest().encode('ascii'), tree_sha)
|
|
|
obj = p[commit_sha]
|
|
|
- self.assertEqual(obj.type_name, 'commit')
|
|
|
- self.assertEqual(obj.sha().hexdigest(), commit_sha)
|
|
|
+ self.assertEqual(obj.type_name, b'commit')
|
|
|
+ self.assertEqual(obj.sha().hexdigest().encode('ascii'), commit_sha)
|
|
|
|
|
|
def test_copy(self):
|
|
|
with self.get_pack(pack1_sha) as origpack:
|
|
|
self.assertSucceeds(origpack.index.check)
|
|
|
- basename = os.path.join(self.tempdir, 'Elch')
|
|
|
+ basename = os.path.join(self.tempdir, b'Elch')
|
|
|
write_pack(basename, origpack.pack_tuples())
|
|
|
|
|
|
with Pack(basename) as newpack:
|
|
@@ -353,12 +352,12 @@ class TestPack(PackTests):
|
|
|
def test_commit_obj(self):
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
commit = p[commit_sha]
|
|
|
- self.assertEqual('James Westby <jw+debian@jameswestby.net>',
|
|
|
+ self.assertEqual(b'James Westby <jw+debian@jameswestby.net>',
|
|
|
commit.author)
|
|
|
self.assertEqual([], commit.parents)
|
|
|
|
|
|
def _copy_pack(self, origpack):
|
|
|
- basename = os.path.join(self.tempdir, 'somepack')
|
|
|
+ basename = os.path.join(self.tempdir, b'somepack')
|
|
|
write_pack(basename, origpack.pack_tuples())
|
|
|
return Pack(basename)
|
|
|
|
|
@@ -380,7 +379,7 @@ class TestPack(PackTests):
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
p = self._copy_pack(p)
|
|
|
|
|
|
- msg = 'some message'
|
|
|
+ msg = b'some message'
|
|
|
with p:
|
|
|
keepfile_name = p.keep(msg)
|
|
|
|
|
@@ -388,9 +387,9 @@ class TestPack(PackTests):
|
|
|
self.assertTrue(os.path.exists(keepfile_name))
|
|
|
|
|
|
# and contain the right message, with a linefeed
|
|
|
- with open(keepfile_name, 'r') as f:
|
|
|
+ with open(keepfile_name, 'rb') as f:
|
|
|
buf = f.read()
|
|
|
- self.assertEqual(msg + '\n', buf)
|
|
|
+ self.assertEqual(msg + b'\n', buf)
|
|
|
|
|
|
def test_name(self):
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
@@ -406,7 +405,7 @@ class TestPack(PackTests):
|
|
|
write_pack_header(bad_file, 9999)
|
|
|
bad_file.write(data._file.read())
|
|
|
bad_file = BytesIO(bad_file.getvalue())
|
|
|
- bad_data = PackData('', file=bad_file)
|
|
|
+ bad_data = PackData(b'', file=bad_file)
|
|
|
bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
|
|
|
self.assertRaises(AssertionError, lambda: bad_pack.data)
|
|
|
self.assertRaises(AssertionError,
|
|
@@ -418,8 +417,8 @@ class TestPack(PackTests):
|
|
|
Pack.from_objects(data, index).check_length_and_checksum()
|
|
|
|
|
|
data._file.seek(0)
|
|
|
- bad_file = BytesIO(data._file.read()[:-20] + ('\xff' * 20))
|
|
|
- bad_data = PackData('', file=bad_file)
|
|
|
+ bad_file = BytesIO(data._file.read()[:-20] + (b'\xff' * 20))
|
|
|
+ bad_data = PackData(b'', file=bad_file)
|
|
|
bad_pack = Pack.from_lazy_objects(lambda: bad_data, lambda: index)
|
|
|
self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
|
|
|
self.assertRaises(ChecksumMismatch, lambda:
|
|
@@ -435,38 +434,39 @@ class TestPack(PackTests):
|
|
|
self.assertTrue(isinstance(objs[commit_sha], Commit))
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestThinPack(PackTests):
|
|
|
|
|
|
def setUp(self):
|
|
|
super(TestThinPack, self).setUp()
|
|
|
self.store = MemoryObjectStore()
|
|
|
self.blobs = {}
|
|
|
- for blob in ('foo', 'bar', 'foo1234', 'bar2468'):
|
|
|
+ for blob in (b'foo', b'bar', b'foo1234', b'bar2468'):
|
|
|
self.blobs[blob] = make_object(Blob, data=blob)
|
|
|
- self.store.add_object(self.blobs['foo'])
|
|
|
- self.store.add_object(self.blobs['bar'])
|
|
|
+ self.store.add_object(self.blobs[b'foo'])
|
|
|
+ self.store.add_object(self.blobs[b'bar'])
|
|
|
|
|
|
# Build a thin pack. 'foo' is as an external reference, 'bar' an
|
|
|
# internal reference.
|
|
|
self.pack_dir = tempfile.mkdtemp()
|
|
|
+ if not isinstance(self.pack_dir, bytes):
|
|
|
+ self.pack_dir = self.pack_dir.encode(sys.getfilesystemencoding())
|
|
|
self.addCleanup(shutil.rmtree, self.pack_dir)
|
|
|
- self.pack_prefix = os.path.join(self.pack_dir, 'pack')
|
|
|
+ self.pack_prefix = os.path.join(self.pack_dir, b'pack')
|
|
|
|
|
|
- with open(self.pack_prefix + '.pack', 'wb') as f:
|
|
|
+ with open(self.pack_prefix + b'.pack', 'wb') as f:
|
|
|
build_pack(f, [
|
|
|
- (REF_DELTA, (self.blobs['foo'].id, 'foo1234')),
|
|
|
- (Blob.type_num, 'bar'),
|
|
|
- (REF_DELTA, (self.blobs['bar'].id, 'bar2468'))],
|
|
|
+ (REF_DELTA, (self.blobs[b'foo'].id, b'foo1234')),
|
|
|
+ (Blob.type_num, b'bar'),
|
|
|
+ (REF_DELTA, (self.blobs[b'bar'].id, b'bar2468'))],
|
|
|
store=self.store)
|
|
|
|
|
|
# Index the new pack.
|
|
|
with self.make_pack(True) as pack:
|
|
|
with PackData(pack._data_path) as data:
|
|
|
data.pack = pack
|
|
|
- data.create_index(self.pack_prefix + '.idx')
|
|
|
+ data.create_index(self.pack_prefix + b'.idx')
|
|
|
|
|
|
- del self.store[self.blobs['bar'].id]
|
|
|
+ del self.store[self.blobs[b'bar'].id]
|
|
|
|
|
|
def make_pack(self, resolve_ext_ref):
|
|
|
return Pack(
|
|
@@ -476,54 +476,53 @@ class TestThinPack(PackTests):
|
|
|
def test_get_raw(self):
|
|
|
with self.make_pack(False) as p:
|
|
|
self.assertRaises(
|
|
|
- KeyError, p.get_raw, self.blobs['foo1234'].id)
|
|
|
+ KeyError, p.get_raw, self.blobs[b'foo1234'].id)
|
|
|
with self.make_pack(True) as p:
|
|
|
self.assertEqual(
|
|
|
- (3, 'foo1234'),
|
|
|
- p.get_raw(self.blobs['foo1234'].id))
|
|
|
+ (3, b'foo1234'),
|
|
|
+ p.get_raw(self.blobs[b'foo1234'].id))
|
|
|
|
|
|
def test_iterobjects(self):
|
|
|
with self.make_pack(False) as p:
|
|
|
self.assertRaises(KeyError, list, p.iterobjects())
|
|
|
with self.make_pack(True) as p:
|
|
|
self.assertEqual(
|
|
|
- sorted([self.blobs['foo1234'].id, self.blobs[b'bar'].id,
|
|
|
- self.blobs['bar2468'].id]),
|
|
|
+ sorted([self.blobs[b'foo1234'].id, self.blobs[b'bar'].id,
|
|
|
+ self.blobs[b'bar2468'].id]),
|
|
|
sorted(o.id for o in p.iterobjects()))
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class WritePackTests(TestCase):
|
|
|
|
|
|
def test_write_pack_header(self):
|
|
|
f = BytesIO()
|
|
|
write_pack_header(f, 42)
|
|
|
- self.assertEqual('PACK\x00\x00\x00\x02\x00\x00\x00*',
|
|
|
- f.getvalue())
|
|
|
+ self.assertEqual(b'PACK\x00\x00\x00\x02\x00\x00\x00*',
|
|
|
+ f.getvalue())
|
|
|
|
|
|
def test_write_pack_object(self):
|
|
|
f = BytesIO()
|
|
|
- f.write('header')
|
|
|
+ f.write(b'header')
|
|
|
offset = f.tell()
|
|
|
- crc32 = write_pack_object(f, Blob.type_num, 'blob')
|
|
|
+ crc32 = write_pack_object(f, Blob.type_num, b'blob')
|
|
|
self.assertEqual(crc32, zlib.crc32(f.getvalue()[6:]) & 0xffffffff)
|
|
|
|
|
|
- f.write('x') # unpack_object needs extra trailing data.
|
|
|
+ f.write(b'x') # unpack_object needs extra trailing data.
|
|
|
f.seek(offset)
|
|
|
unpacked, unused = unpack_object(f.read, compute_crc32=True)
|
|
|
self.assertEqual(Blob.type_num, unpacked.pack_type_num)
|
|
|
self.assertEqual(Blob.type_num, unpacked.obj_type_num)
|
|
|
- self.assertEqual(['blob'], unpacked.decomp_chunks)
|
|
|
+ self.assertEqual([b'blob'], unpacked.decomp_chunks)
|
|
|
self.assertEqual(crc32, unpacked.crc32)
|
|
|
- self.assertEqual('x', unused)
|
|
|
+ self.assertEqual(b'x', unused)
|
|
|
|
|
|
def test_write_pack_object_sha(self):
|
|
|
f = BytesIO()
|
|
|
- f.write('header')
|
|
|
+ f.write(b'header')
|
|
|
offset = f.tell()
|
|
|
- sha_a = sha1('foo')
|
|
|
+ sha_a = sha1(b'foo')
|
|
|
sha_b = sha_a.copy()
|
|
|
- write_pack_object(f, Blob.type_num, 'blob', sha=sha_a)
|
|
|
+ write_pack_object(f, Blob.type_num, b'blob', sha=sha_a)
|
|
|
self.assertNotEqual(sha_a.digest(), sha_b.digest())
|
|
|
sha_b.update(f.getvalue()[offset:])
|
|
|
self.assertEqual(sha_a.digest(), sha_b.digest())
|
|
@@ -544,7 +543,7 @@ class BaseTestPackIndexWriting(object):
|
|
|
raise NotImplementedError(self.index)
|
|
|
|
|
|
def test_empty(self):
|
|
|
- idx = self.index('empty.idx', [], pack_checksum)
|
|
|
+ idx = self.index(b'empty.idx', [], pack_checksum)
|
|
|
self.assertEqual(idx.get_pack_checksum(), pack_checksum)
|
|
|
self.assertEqual(0, len(idx))
|
|
|
|
|
@@ -557,7 +556,7 @@ class BaseTestPackIndexWriting(object):
|
|
|
self.assertRaises(TypeError, self.index, 'single.idx',
|
|
|
entries, pack_checksum)
|
|
|
return
|
|
|
- idx = self.index('single.idx', entries, pack_checksum)
|
|
|
+ idx = self.index(b'single.idx', entries, pack_checksum)
|
|
|
self.assertEqual(idx.get_pack_checksum(), pack_checksum)
|
|
|
self.assertEqual(2, len(idx))
|
|
|
actual_entries = list(idx.iterentries())
|
|
@@ -575,7 +574,7 @@ class BaseTestPackIndexWriting(object):
|
|
|
def test_single(self):
|
|
|
entry_sha = hex_to_sha('6f670c0fb53f9463760b7295fbb814e965fb20c8')
|
|
|
my_entries = [(entry_sha, 178, 42)]
|
|
|
- idx = self.index('single.idx', my_entries, pack_checksum)
|
|
|
+ idx = self.index(b'single.idx', my_entries, pack_checksum)
|
|
|
self.assertEqual(idx.get_pack_checksum(), pack_checksum)
|
|
|
self.assertEqual(1, len(idx))
|
|
|
actual_entries = list(idx.iterentries())
|
|
@@ -595,6 +594,8 @@ class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
|
|
|
|
|
|
def setUp(self):
|
|
|
self.tempdir = tempfile.mkdtemp()
|
|
|
+ if not isinstance(self.tempdir, bytes):
|
|
|
+ self.tempdir = self.tempdir.encode(sys.getfilesystemencoding())
|
|
|
|
|
|
def tearDown(self):
|
|
|
shutil.rmtree(self.tempdir)
|
|
@@ -613,7 +614,6 @@ class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
|
|
|
self._write_fn(f, entries, pack_checksum)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
|
|
|
|
|
|
def setUp(self):
|
|
@@ -628,7 +628,6 @@ class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
|
|
|
TestCase.tearDown(self)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
|
|
|
|
|
|
def setUp(self):
|
|
@@ -644,7 +643,6 @@ class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
|
|
|
BaseTestFilePackIndexWriting.tearDown(self)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
|
|
|
|
|
|
def setUp(self):
|
|
@@ -660,7 +658,6 @@ class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
|
|
|
BaseTestFilePackIndexWriting.tearDown(self)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class ReadZlibTests(TestCase):
|
|
|
|
|
|
decomp = (
|
|
@@ -671,7 +668,7 @@ class ReadZlibTests(TestCase):
|
|
|
b'\n'
|
|
|
b"Provide replacement for mmap()'s offset argument.")
|
|
|
comp = zlib.compress(decomp)
|
|
|
- extra = 'nextobject'
|
|
|
+ extra = b'nextobject'
|
|
|
|
|
|
def setUp(self):
|
|
|
super(ReadZlibTests, self).setUp()
|
|
@@ -699,11 +696,11 @@ class ReadZlibTests(TestCase):
|
|
|
|
|
|
def test_decompress_empty(self):
|
|
|
unpacked = UnpackedObject(Tree.type_num, None, 0, None)
|
|
|
- comp = zlib.compress('')
|
|
|
+ comp = zlib.compress(b'')
|
|
|
read = BytesIO(comp + self.extra).read
|
|
|
unused = read_zlib_chunks(read, unpacked)
|
|
|
- self.assertEqual('', ''.join(unpacked.decomp_chunks))
|
|
|
- self.assertNotEqual('', unused)
|
|
|
+ self.assertEqual(b'', b''.join(unpacked.decomp_chunks))
|
|
|
+ self.assertNotEqual(b'', unused)
|
|
|
self.assertEqual(self.extra, unused + read())
|
|
|
|
|
|
def test_decompress_no_crc32(self):
|
|
@@ -714,9 +711,9 @@ class ReadZlibTests(TestCase):
|
|
|
def _do_decompress_test(self, buffer_size, **kwargs):
|
|
|
unused = read_zlib_chunks(self.read, self.unpacked,
|
|
|
buffer_size=buffer_size, **kwargs)
|
|
|
- self.assertEqual(self.decomp, ''.join(self.unpacked.decomp_chunks))
|
|
|
+ self.assertEqual(self.decomp, b''.join(self.unpacked.decomp_chunks))
|
|
|
self.assertEqual(zlib.crc32(self.comp), self.unpacked.crc32)
|
|
|
- self.assertNotEqual('', unused)
|
|
|
+ self.assertNotEqual(b'', unused)
|
|
|
self.assertEqual(self.extra, unused + self.read())
|
|
|
|
|
|
def test_simple_decompress(self):
|
|
@@ -739,33 +736,31 @@ class ReadZlibTests(TestCase):
|
|
|
|
|
|
def test_decompress_include_comp(self):
|
|
|
self._do_decompress_test(4096, include_comp=True)
|
|
|
- self.assertEqual(self.comp, ''.join(self.unpacked.comp_chunks))
|
|
|
+ self.assertEqual(self.comp, b''.join(self.unpacked.comp_chunks))
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class DeltifyTests(TestCase):
|
|
|
|
|
|
def test_empty(self):
|
|
|
self.assertEqual([], list(deltify_pack_objects([])))
|
|
|
|
|
|
def test_single(self):
|
|
|
- b = Blob.from_string("foo")
|
|
|
+ b = Blob.from_string(b"foo")
|
|
|
self.assertEqual(
|
|
|
[(b.type_num, b.sha().digest(), None, b.as_raw_string())],
|
|
|
- list(deltify_pack_objects([(b, "")])))
|
|
|
+ list(deltify_pack_objects([(b, b"")])))
|
|
|
|
|
|
def test_simple_delta(self):
|
|
|
- b1 = Blob.from_string("a" * 101)
|
|
|
- b2 = Blob.from_string("a" * 100)
|
|
|
+ b1 = Blob.from_string(b"a" * 101)
|
|
|
+ b2 = Blob.from_string(b"a" * 100)
|
|
|
delta = create_delta(b1.as_raw_string(), b2.as_raw_string())
|
|
|
self.assertEqual([
|
|
|
(b1.type_num, b1.sha().digest(), None, b1.as_raw_string()),
|
|
|
(b2.type_num, b2.sha().digest(), b1.sha().digest(), delta)
|
|
|
],
|
|
|
- list(deltify_pack_objects([(b1, ""), (b2, "")])))
|
|
|
+ list(deltify_pack_objects([(b1, b""), (b2, b"")])))
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestPackStreamReader(TestCase):
|
|
|
|
|
|
def test_read_objects_emtpy(self):
|
|
@@ -777,9 +772,9 @@ class TestPackStreamReader(TestCase):
|
|
|
def test_read_objects(self):
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (Blob.type_num, 'blob'),
|
|
|
- (OFS_DELTA, (0, 'blob1')),
|
|
|
- ])
|
|
|
+ (Blob.type_num, b'blob'),
|
|
|
+ (OFS_DELTA, (0, b'blob1')),
|
|
|
+ ])
|
|
|
reader = PackStreamReader(f.read)
|
|
|
objects = list(reader.read_objects(compute_crc32=True))
|
|
|
self.assertEqual(2, len(objects))
|
|
@@ -790,7 +785,7 @@ class TestPackStreamReader(TestCase):
|
|
|
self.assertEqual(Blob.type_num, unpacked_blob.pack_type_num)
|
|
|
self.assertEqual(Blob.type_num, unpacked_blob.obj_type_num)
|
|
|
self.assertEqual(None, unpacked_blob.delta_base)
|
|
|
- self.assertEqual('blob', ''.join(unpacked_blob.decomp_chunks))
|
|
|
+ self.assertEqual(b'blob', b''.join(unpacked_blob.decomp_chunks))
|
|
|
self.assertEqual(entries[0][4], unpacked_blob.crc32)
|
|
|
|
|
|
self.assertEqual(entries[1][0], unpacked_delta.offset)
|
|
@@ -798,16 +793,16 @@ class TestPackStreamReader(TestCase):
|
|
|
self.assertEqual(None, unpacked_delta.obj_type_num)
|
|
|
self.assertEqual(unpacked_delta.offset - unpacked_blob.offset,
|
|
|
unpacked_delta.delta_base)
|
|
|
- delta = create_delta('blob', 'blob1')
|
|
|
- self.assertEqual(delta, ''.join(unpacked_delta.decomp_chunks))
|
|
|
+ delta = create_delta(b'blob', b'blob1')
|
|
|
+ self.assertEqual(delta, b''.join(unpacked_delta.decomp_chunks))
|
|
|
self.assertEqual(entries[1][4], unpacked_delta.crc32)
|
|
|
|
|
|
def test_read_objects_buffered(self):
|
|
|
f = BytesIO()
|
|
|
build_pack(f, [
|
|
|
- (Blob.type_num, 'blob'),
|
|
|
- (OFS_DELTA, (0, 'blob1')),
|
|
|
- ])
|
|
|
+ (Blob.type_num, b'blob'),
|
|
|
+ (OFS_DELTA, (0, b'blob1')),
|
|
|
+ ])
|
|
|
reader = PackStreamReader(f.read, zlib_bufsize=4)
|
|
|
self.assertEqual(2, len(list(reader.read_objects())))
|
|
|
|
|
@@ -816,7 +811,6 @@ class TestPackStreamReader(TestCase):
|
|
|
self.assertEqual([], list(reader.read_objects()))
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class TestPackIterator(DeltaChainIterator):
|
|
|
|
|
|
_compute_crc32 = True
|
|
@@ -828,7 +822,7 @@ class TestPackIterator(DeltaChainIterator):
|
|
|
def _result(self, unpacked):
|
|
|
"""Return entries in the same format as build_pack."""
|
|
|
return (unpacked.offset, unpacked.obj_type_num,
|
|
|
- ''.join(unpacked.obj_chunks), unpacked.sha(), unpacked.crc32)
|
|
|
+ b''.join(unpacked.obj_chunks), unpacked.sha(), unpacked.crc32)
|
|
|
|
|
|
def _resolve_object(self, offset, pack_type_num, base_chunks):
|
|
|
assert offset not in self._unpacked_offsets, (
|
|
@@ -838,7 +832,6 @@ class TestPackIterator(DeltaChainIterator):
|
|
|
offset, pack_type_num, base_chunks)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class DeltaChainIteratorTests(TestCase):
|
|
|
|
|
|
def setUp(self):
|
|
@@ -877,46 +870,46 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
def test_no_deltas(self):
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (Commit.type_num, 'commit'),
|
|
|
- (Blob.type_num, 'blob'),
|
|
|
- (Tree.type_num, 'tree'),
|
|
|
- ])
|
|
|
+ (Commit.type_num, b'commit'),
|
|
|
+ (Blob.type_num, b'blob'),
|
|
|
+ (Tree.type_num, b'tree'),
|
|
|
+ ])
|
|
|
self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_ofs_deltas(self):
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (Blob.type_num, 'blob'),
|
|
|
- (OFS_DELTA, (0, 'blob1')),
|
|
|
- (OFS_DELTA, (0, 'blob2')),
|
|
|
- ])
|
|
|
+ (Blob.type_num, b'blob'),
|
|
|
+ (OFS_DELTA, (0, b'blob1')),
|
|
|
+ (OFS_DELTA, (0, b'blob2')),
|
|
|
+ ])
|
|
|
self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_ofs_deltas_chain(self):
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (Blob.type_num, 'blob'),
|
|
|
- (OFS_DELTA, (0, 'blob1')),
|
|
|
- (OFS_DELTA, (1, 'blob2')),
|
|
|
- ])
|
|
|
+ (Blob.type_num, b'blob'),
|
|
|
+ (OFS_DELTA, (0, b'blob1')),
|
|
|
+ (OFS_DELTA, (1, b'blob2')),
|
|
|
+ ])
|
|
|
self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_ref_deltas(self):
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (REF_DELTA, (1, 'blob1')),
|
|
|
- (Blob.type_num, ('blob')),
|
|
|
- (REF_DELTA, (1, 'blob2')),
|
|
|
- ])
|
|
|
+ (REF_DELTA, (1, b'blob1')),
|
|
|
+ (Blob.type_num, (b'blob')),
|
|
|
+ (REF_DELTA, (1, b'blob2')),
|
|
|
+ ])
|
|
|
self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_ref_deltas_chain(self):
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (REF_DELTA, (2, 'blob1')),
|
|
|
- (Blob.type_num, ('blob')),
|
|
|
- (REF_DELTA, (1, 'blob2')),
|
|
|
- ])
|
|
|
+ (REF_DELTA, (2, b'blob1')),
|
|
|
+ (Blob.type_num, (b'blob')),
|
|
|
+ (REF_DELTA, (1, b'blob2')),
|
|
|
+ ])
|
|
|
self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_ofs_and_ref_deltas(self):
|
|
@@ -924,58 +917,58 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
# this ref.
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (REF_DELTA, (1, 'blob1')),
|
|
|
- (Blob.type_num, ('blob')),
|
|
|
- (OFS_DELTA, (1, 'blob2')),
|
|
|
- ])
|
|
|
+ (REF_DELTA, (1, b'blob1')),
|
|
|
+ (Blob.type_num, (b'blob')),
|
|
|
+ (OFS_DELTA, (1, b'blob2')),
|
|
|
+ ])
|
|
|
self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_mixed_chain(self):
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (Blob.type_num, 'blob'),
|
|
|
- (REF_DELTA, (2, 'blob2')),
|
|
|
- (OFS_DELTA, (0, 'blob1')),
|
|
|
- (OFS_DELTA, (1, 'blob3')),
|
|
|
- (OFS_DELTA, (0, 'bob')),
|
|
|
- ])
|
|
|
+ (Blob.type_num, b'blob'),
|
|
|
+ (REF_DELTA, (2, b'blob2')),
|
|
|
+ (OFS_DELTA, (0, b'blob1')),
|
|
|
+ (OFS_DELTA, (1, b'blob3')),
|
|
|
+ (OFS_DELTA, (0, b'bob')),
|
|
|
+ ])
|
|
|
self.assertEntriesMatch([0, 2, 1, 3, 4], entries,
|
|
|
self.make_pack_iter(f))
|
|
|
|
|
|
def test_long_chain(self):
|
|
|
n = 100
|
|
|
- objects_spec = [(Blob.type_num, 'blob')]
|
|
|
+ objects_spec = [(Blob.type_num, b'blob')]
|
|
|
for i in range(n):
|
|
|
- objects_spec.append((OFS_DELTA, (i, 'blob%i' % i)))
|
|
|
+ objects_spec.append((OFS_DELTA, (i, b'blob' + str(i).encode('ascii'))))
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, objects_spec)
|
|
|
self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_branchy_chain(self):
|
|
|
n = 100
|
|
|
- objects_spec = [(Blob.type_num, 'blob')]
|
|
|
+ objects_spec = [(Blob.type_num, b'blob')]
|
|
|
for i in range(n):
|
|
|
- objects_spec.append((OFS_DELTA, (0, 'blob%i' % i)))
|
|
|
+ objects_spec.append((OFS_DELTA, (0, b'blob' + str(i).encode('ascii'))))
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, objects_spec)
|
|
|
self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))
|
|
|
|
|
|
def test_ext_ref(self):
|
|
|
- blob, = self.store_blobs(['blob'])
|
|
|
+ blob, = self.store_blobs([b'blob'])
|
|
|
f = BytesIO()
|
|
|
- entries = build_pack(f, [(REF_DELTA, (blob.id, 'blob1'))],
|
|
|
+ entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
|
|
|
store=self.store)
|
|
|
pack_iter = self.make_pack_iter(f)
|
|
|
self.assertEntriesMatch([0], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
|
|
|
def test_ext_ref_chain(self):
|
|
|
- blob, = self.store_blobs(['blob'])
|
|
|
+ blob, = self.store_blobs([b'blob'])
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (REF_DELTA, (1, 'blob2')),
|
|
|
- (REF_DELTA, (blob.id, 'blob1')),
|
|
|
- ], store=self.store)
|
|
|
+ (REF_DELTA, (1, b'blob2')),
|
|
|
+ (REF_DELTA, (blob.id, b'blob1')),
|
|
|
+ ], store=self.store)
|
|
|
pack_iter = self.make_pack_iter(f)
|
|
|
self.assertEntriesMatch([1, 0], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
@@ -983,46 +976,46 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
def test_ext_ref_chain_degenerate(self):
|
|
|
# Test a degenerate case where the sender is sending a REF_DELTA
|
|
|
# object that expands to an object already in the repository.
|
|
|
- blob, = self.store_blobs(['blob'])
|
|
|
- blob2, = self.store_blobs(['blob2'])
|
|
|
+ blob, = self.store_blobs([b'blob'])
|
|
|
+ blob2, = self.store_blobs([b'blob2'])
|
|
|
assert blob.id < blob2.id
|
|
|
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (REF_DELTA, (blob.id, 'blob2')),
|
|
|
- (REF_DELTA, (0, 'blob3')),
|
|
|
+ (REF_DELTA, (blob.id, b'blob2')),
|
|
|
+ (REF_DELTA, (0, b'blob3')),
|
|
|
], store=self.store)
|
|
|
pack_iter = self.make_pack_iter(f)
|
|
|
self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
|
|
|
def test_ext_ref_multiple_times(self):
|
|
|
- blob, = self.store_blobs(['blob'])
|
|
|
+ blob, = self.store_blobs([b'blob'])
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (REF_DELTA, (blob.id, 'blob1')),
|
|
|
- (REF_DELTA, (blob.id, 'blob2')),
|
|
|
- ], store=self.store)
|
|
|
+ (REF_DELTA, (blob.id, b'blob1')),
|
|
|
+ (REF_DELTA, (blob.id, b'blob2')),
|
|
|
+ ], store=self.store)
|
|
|
pack_iter = self.make_pack_iter(f)
|
|
|
self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
|
|
|
def test_multiple_ext_refs(self):
|
|
|
- b1, b2 = self.store_blobs(['foo', 'bar'])
|
|
|
+ b1, b2 = self.store_blobs([b'foo', b'bar'])
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [
|
|
|
- (REF_DELTA, (b1.id, 'foo1')),
|
|
|
- (REF_DELTA, (b2.id, 'bar2')),
|
|
|
- ], store=self.store)
|
|
|
+ (REF_DELTA, (b1.id, b'foo1')),
|
|
|
+ (REF_DELTA, (b2.id, b'bar2')),
|
|
|
+ ], store=self.store)
|
|
|
pack_iter = self.make_pack_iter(f)
|
|
|
self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)],
|
|
|
pack_iter.ext_refs())
|
|
|
|
|
|
def test_bad_ext_ref_non_thin_pack(self):
|
|
|
- blob, = self.store_blobs(['blob'])
|
|
|
+ blob, = self.store_blobs([b'blob'])
|
|
|
f = BytesIO()
|
|
|
- build_pack(f, [(REF_DELTA, (blob.id, 'blob1'))],
|
|
|
+ entries = build_pack(f, [(REF_DELTA, (blob.id, b'blob1'))],
|
|
|
store=self.store)
|
|
|
pack_iter = self.make_pack_iter(f, thin=False)
|
|
|
try:
|
|
@@ -1032,13 +1025,13 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
self.assertEqual(([blob.id],), e.args)
|
|
|
|
|
|
def test_bad_ext_ref_thin_pack(self):
|
|
|
- b1, b2, b3 = self.store_blobs(['foo', 'bar', 'baz'])
|
|
|
+ b1, b2, b3 = self.store_blobs([b'foo', b'bar', b'baz'])
|
|
|
f = BytesIO()
|
|
|
build_pack(f, [
|
|
|
- (REF_DELTA, (1, 'foo99')),
|
|
|
- (REF_DELTA, (b1.id, 'foo1')),
|
|
|
- (REF_DELTA, (b2.id, 'bar2')),
|
|
|
- (REF_DELTA, (b3.id, 'baz3')),
|
|
|
+ (REF_DELTA, (1, b'foo99')),
|
|
|
+ (REF_DELTA, (b1.id, b'foo1')),
|
|
|
+ (REF_DELTA, (b2.id, b'bar2')),
|
|
|
+ (REF_DELTA, (b3.id, b'baz3')),
|
|
|
], store=self.store)
|
|
|
del self.store[b2.id]
|
|
|
del self.store[b3.id]
|
|
@@ -1053,17 +1046,17 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
class DeltaEncodeSizeTests(TestCase):
|
|
|
|
|
|
def test_basic(self):
|
|
|
- self.assertEqual('\x00', _delta_encode_size(0))
|
|
|
- self.assertEqual('\x01', _delta_encode_size(1))
|
|
|
- self.assertEqual('\xfa\x01', _delta_encode_size(250))
|
|
|
- self.assertEqual('\xe8\x07', _delta_encode_size(1000))
|
|
|
- self.assertEqual('\xa0\x8d\x06', _delta_encode_size(100000))
|
|
|
+ self.assertEqual(b'\x00', _delta_encode_size(0))
|
|
|
+ self.assertEqual(b'\x01', _delta_encode_size(1))
|
|
|
+ self.assertEqual(b'\xfa\x01', _delta_encode_size(250))
|
|
|
+ self.assertEqual(b'\xe8\x07', _delta_encode_size(1000))
|
|
|
+ self.assertEqual(b'\xa0\x8d\x06', _delta_encode_size(100000))
|
|
|
|
|
|
|
|
|
class EncodeCopyOperationTests(TestCase):
|
|
|
|
|
|
def test_basic(self):
|
|
|
- self.assertEqual('\x80', _encode_copy_operation(0, 0))
|
|
|
- self.assertEqual('\x91\x01\x0a', _encode_copy_operation(1, 10))
|
|
|
- self.assertEqual('\xb1\x64\xe8\x03', _encode_copy_operation(100, 1000))
|
|
|
- self.assertEqual('\x93\xe8\x03\x01', _encode_copy_operation(1000, 1))
|
|
|
+ self.assertEqual(b'\x80', _encode_copy_operation(0, 0))
|
|
|
+ self.assertEqual(b'\x91\x01\x0a', _encode_copy_operation(1, 10))
|
|
|
+ self.assertEqual(b'\xb1\x64\xe8\x03', _encode_copy_operation(100, 1000))
|
|
|
+ self.assertEqual(b'\x93\xe8\x03\x01', _encode_copy_operation(1000, 1))
|