|
@@ -28,6 +28,7 @@ import tempfile
|
|
|
import zlib
|
|
|
from hashlib import sha1
|
|
|
from io import BytesIO
|
|
|
+from typing import NoReturn
|
|
|
|
|
|
from dulwich.errors import ApplyDeltaError, ChecksumMismatch
|
|
|
from dulwich.file import GitFile
|
|
@@ -73,7 +74,7 @@ indexmode = "0o100644" if sys.platform != "win32" else "0o100666"
|
|
|
class PackTests(TestCase):
|
|
|
"""Base class for testing packs."""
|
|
|
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
super().setUp()
|
|
|
self.tempdir = tempfile.mkdtemp()
|
|
|
self.addCleanup(shutil.rmtree, self.tempdir)
|
|
@@ -97,7 +98,7 @@ class PackTests(TestCase):
|
|
|
def get_pack(self, sha):
|
|
|
return Pack(os.path.join(self.datadir, "pack-{}".format(sha.decode("ascii"))))
|
|
|
|
|
|
- def assertSucceeds(self, func, *args, **kwargs):
|
|
|
+ def assertSucceeds(self, func, *args, **kwargs) -> None:
|
|
|
try:
|
|
|
func(*args, **kwargs)
|
|
|
except ChecksumMismatch as e:
|
|
@@ -107,7 +108,7 @@ class PackTests(TestCase):
|
|
|
class PackIndexTests(PackTests):
|
|
|
"""Class that tests the index of packfiles."""
|
|
|
|
|
|
- def test_object_offset(self):
|
|
|
+ def test_object_offset(self) -> None:
|
|
|
"""Tests that the correct object offset is returned from the index."""
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertRaises(KeyError, p.object_offset, pack1_sha)
|
|
@@ -115,7 +116,7 @@ class PackIndexTests(PackTests):
|
|
|
self.assertEqual(p.object_offset(tree_sha), 138)
|
|
|
self.assertEqual(p.object_offset(commit_sha), 12)
|
|
|
|
|
|
- def test_object_sha1(self):
|
|
|
+ def test_object_sha1(self) -> None:
|
|
|
"""Tests that the correct object offset is returned from the index."""
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertRaises(KeyError, p.object_sha1, 876)
|
|
@@ -123,7 +124,7 @@ class PackIndexTests(PackTests):
|
|
|
self.assertEqual(p.object_sha1(138), hex_to_sha(tree_sha))
|
|
|
self.assertEqual(p.object_sha1(12), hex_to_sha(commit_sha))
|
|
|
|
|
|
- def test_iter_prefix(self):
|
|
|
+ def test_iter_prefix(self) -> None:
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertEqual([p.object_sha1(178)], list(p.iter_prefix(hex_to_sha(a_sha))))
|
|
|
self.assertEqual(
|
|
@@ -133,11 +134,11 @@ class PackIndexTests(PackTests):
|
|
|
[p.object_sha1(178)], list(p.iter_prefix(hex_to_sha(a_sha)[:2]))
|
|
|
)
|
|
|
|
|
|
- def test_index_len(self):
|
|
|
+ def test_index_len(self) -> None:
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertEqual(3, len(p))
|
|
|
|
|
|
- def test_get_stored_checksum(self):
|
|
|
+ def test_get_stored_checksum(self) -> None:
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertEqual(
|
|
|
b"f2848e2ad16f329ae1c92e3b95e91888daa5bd01",
|
|
@@ -148,11 +149,11 @@ class PackIndexTests(PackTests):
|
|
|
sha_to_hex(p.get_pack_checksum()),
|
|
|
)
|
|
|
|
|
|
- def test_index_check(self):
|
|
|
+ def test_index_check(self) -> None:
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertSucceeds(p.check)
|
|
|
|
|
|
- def test_iterentries(self):
|
|
|
+ def test_iterentries(self) -> None:
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
entries = [(sha_to_hex(s), o, c) for s, o, c in p.iterentries()]
|
|
|
self.assertEqual(
|
|
@@ -164,7 +165,7 @@ class PackIndexTests(PackTests):
|
|
|
entries,
|
|
|
)
|
|
|
|
|
|
- def test_iter(self):
|
|
|
+ def test_iter(self) -> None:
|
|
|
p = self.get_pack_index(pack1_sha)
|
|
|
self.assertEqual({tree_sha, commit_sha, a_sha}, set(p))
|
|
|
|
|
@@ -178,36 +179,36 @@ class TestPackDeltas(TestCase):
|
|
|
test_string_big = b"Z" * 8192
|
|
|
test_string_huge = b"Z" * 100000
|
|
|
|
|
|
- def _test_roundtrip(self, base, target):
|
|
|
+ def _test_roundtrip(self, base, target) -> None:
|
|
|
self.assertEqual(
|
|
|
target, b"".join(apply_delta(base, list(create_delta(base, target))))
|
|
|
)
|
|
|
|
|
|
- def test_nochange(self):
|
|
|
+ def test_nochange(self) -> None:
|
|
|
self._test_roundtrip(self.test_string1, self.test_string1)
|
|
|
|
|
|
- def test_nochange_huge(self):
|
|
|
+ def test_nochange_huge(self) -> None:
|
|
|
self._test_roundtrip(self.test_string_huge, self.test_string_huge)
|
|
|
|
|
|
- def test_change(self):
|
|
|
+ def test_change(self) -> None:
|
|
|
self._test_roundtrip(self.test_string1, self.test_string2)
|
|
|
|
|
|
- def test_rewrite(self):
|
|
|
+ def test_rewrite(self) -> None:
|
|
|
self._test_roundtrip(self.test_string1, self.test_string3)
|
|
|
|
|
|
- def test_empty_to_big(self):
|
|
|
+ def test_empty_to_big(self) -> None:
|
|
|
self._test_roundtrip(self.test_string_empty, self.test_string_big)
|
|
|
|
|
|
- def test_empty_to_huge(self):
|
|
|
+ def test_empty_to_huge(self) -> None:
|
|
|
self._test_roundtrip(self.test_string_empty, self.test_string_huge)
|
|
|
|
|
|
- def test_huge_copy(self):
|
|
|
+ def test_huge_copy(self) -> None:
|
|
|
self._test_roundtrip(
|
|
|
self.test_string_huge + self.test_string1,
|
|
|
self.test_string_huge + self.test_string2,
|
|
|
)
|
|
|
|
|
|
- def test_dest_overflow(self):
|
|
|
+ def test_dest_overflow(self) -> None:
|
|
|
self.assertRaises(
|
|
|
ApplyDeltaError,
|
|
|
apply_delta,
|
|
@@ -218,7 +219,7 @@ class TestPackDeltas(TestCase):
|
|
|
ApplyDeltaError, apply_delta, b"", b"\x00\x80\x02\xb0\x11\x11"
|
|
|
)
|
|
|
|
|
|
- def test_pypy_issue(self):
|
|
|
+ def test_pypy_issue(self) -> None:
|
|
|
# Test for https://github.com/jelmer/dulwich/issues/509 /
|
|
|
# https://bitbucket.org/pypy/pypy/issues/2499/cpyext-pystring_asstring-doesnt-work
|
|
|
chunks = [
|
|
@@ -261,25 +262,25 @@ class TestPackDeltas(TestCase):
|
|
|
class TestPackData(PackTests):
|
|
|
"""Tests getting the data from the packfile."""
|
|
|
|
|
|
- def test_create_pack(self):
|
|
|
+ def test_create_pack(self) -> None:
|
|
|
self.get_pack_data(pack1_sha).close()
|
|
|
|
|
|
- def test_from_file(self):
|
|
|
+ def test_from_file(self) -> None:
|
|
|
path = os.path.join(
|
|
|
self.datadir, "pack-{}.pack".format(pack1_sha.decode("ascii"))
|
|
|
)
|
|
|
with open(path, "rb") as f:
|
|
|
PackData.from_file(f, os.path.getsize(path))
|
|
|
|
|
|
- def test_pack_len(self):
|
|
|
+ def test_pack_len(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
self.assertEqual(3, len(p))
|
|
|
|
|
|
- def test_index_check(self):
|
|
|
+ def test_index_check(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
self.assertSucceeds(p.check)
|
|
|
|
|
|
- def test_iter_unpacked(self):
|
|
|
+ def test_iter_unpacked(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
commit_data = (
|
|
|
b"tree b2a2766a2879c209ab1176e7e778b81ae422eeaa\n"
|
|
@@ -317,7 +318,7 @@ class TestPackData(PackTests):
|
|
|
actual,
|
|
|
)
|
|
|
|
|
|
- def test_iterentries(self):
|
|
|
+ def test_iterentries(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
entries = {(sha_to_hex(s), o, c) for s, o, c in p.iterentries()}
|
|
|
self.assertEqual(
|
|
@@ -341,7 +342,7 @@ class TestPackData(PackTests):
|
|
|
entries,
|
|
|
)
|
|
|
|
|
|
- def test_create_index_v1(self):
|
|
|
+ def test_create_index_v1(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
filename = os.path.join(self.tempdir, "v1test.idx")
|
|
|
p.create_index_v1(filename)
|
|
@@ -350,7 +351,7 @@ class TestPackData(PackTests):
|
|
|
self.assertEqual(oct(os.stat(filename).st_mode), indexmode)
|
|
|
self.assertEqual(idx1, idx2)
|
|
|
|
|
|
- def test_create_index_v2(self):
|
|
|
+ def test_create_index_v2(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as p:
|
|
|
filename = os.path.join(self.tempdir, "v2test.idx")
|
|
|
p.create_index_v2(filename)
|
|
@@ -359,7 +360,7 @@ class TestPackData(PackTests):
|
|
|
self.assertEqual(oct(os.stat(filename).st_mode), indexmode)
|
|
|
self.assertEqual(idx1, idx2)
|
|
|
|
|
|
- def test_compute_file_sha(self):
|
|
|
+ def test_compute_file_sha(self) -> None:
|
|
|
f = BytesIO(b"abcd1234wxyz")
|
|
|
self.assertEqual(
|
|
|
sha1(b"abcd1234wxyz").hexdigest(), compute_file_sha(f).hexdigest()
|
|
@@ -381,7 +382,7 @@ class TestPackData(PackTests):
|
|
|
compute_file_sha(f, start_ofs=4, end_ofs=-4).hexdigest(),
|
|
|
)
|
|
|
|
|
|
- def test_compute_file_sha_short_file(self):
|
|
|
+ def test_compute_file_sha_short_file(self) -> None:
|
|
|
f = BytesIO(b"abcd1234wxyz")
|
|
|
self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=-20)
|
|
|
self.assertRaises(AssertionError, compute_file_sha, f, end_ofs=20)
|
|
@@ -391,28 +392,28 @@ class TestPackData(PackTests):
|
|
|
|
|
|
|
|
|
class TestPack(PackTests):
|
|
|
- def test_len(self):
|
|
|
+ def test_len(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
self.assertEqual(3, len(p))
|
|
|
|
|
|
- def test_contains(self):
|
|
|
+ def test_contains(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
self.assertIn(tree_sha, p)
|
|
|
|
|
|
- def test_get(self):
|
|
|
+ def test_get(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
self.assertEqual(type(p[tree_sha]), Tree)
|
|
|
|
|
|
- def test_iter(self):
|
|
|
+ def test_iter(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
self.assertEqual({tree_sha, commit_sha, a_sha}, set(p))
|
|
|
|
|
|
- def test_iterobjects(self):
|
|
|
+ def test_iterobjects(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
expected = {p[s] for s in [commit_sha, tree_sha, a_sha]}
|
|
|
self.assertEqual(expected, set(list(p.iterobjects())))
|
|
|
|
|
|
- def test_pack_tuples(self):
|
|
|
+ def test_pack_tuples(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
tuples = p.pack_tuples()
|
|
|
expected = {(p[s], None) for s in [commit_sha, tree_sha, a_sha]}
|
|
@@ -420,7 +421,7 @@ class TestPack(PackTests):
|
|
|
self.assertEqual(expected, set(list(tuples)))
|
|
|
self.assertEqual(3, len(tuples))
|
|
|
|
|
|
- def test_get_object_at(self):
|
|
|
+ def test_get_object_at(self) -> None:
|
|
|
"""Tests random access for non-delta objects."""
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
obj = p[a_sha]
|
|
@@ -433,7 +434,7 @@ class TestPack(PackTests):
|
|
|
self.assertEqual(obj.type_name, b"commit")
|
|
|
self.assertEqual(obj.sha().hexdigest().encode("ascii"), commit_sha)
|
|
|
|
|
|
- def test_copy(self):
|
|
|
+ def test_copy(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as origpack:
|
|
|
self.assertSucceeds(origpack.index.check)
|
|
|
basename = os.path.join(self.tempdir, "Elch")
|
|
@@ -453,7 +454,7 @@ class TestPack(PackTests):
|
|
|
new_checksum = newpack.index.get_stored_checksum()
|
|
|
self.assertTrue(wrong_version or orig_checksum == new_checksum)
|
|
|
|
|
|
- def test_commit_obj(self):
|
|
|
+ def test_commit_obj(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
commit = p[commit_sha]
|
|
|
self.assertEqual(b"James Westby <jw+debian@jameswestby.net>", commit.author)
|
|
@@ -464,7 +465,7 @@ class TestPack(PackTests):
|
|
|
write_pack(basename, origpack.pack_tuples())
|
|
|
return Pack(basename)
|
|
|
|
|
|
- def test_keep_no_message(self):
|
|
|
+ def test_keep_no_message(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
p = self._copy_pack(p)
|
|
|
|
|
@@ -478,7 +479,7 @@ class TestPack(PackTests):
|
|
|
buf = f.read()
|
|
|
self.assertEqual("", buf)
|
|
|
|
|
|
- def test_keep_message(self):
|
|
|
+ def test_keep_message(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
p = self._copy_pack(p)
|
|
|
|
|
@@ -494,11 +495,11 @@ class TestPack(PackTests):
|
|
|
buf = f.read()
|
|
|
self.assertEqual(msg + b"\n", buf)
|
|
|
|
|
|
- def test_name(self):
|
|
|
+ def test_name(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
self.assertEqual(pack1_sha, p.name())
|
|
|
|
|
|
- def test_length_mismatch(self):
|
|
|
+ def test_length_mismatch(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as data:
|
|
|
index = self.get_pack_index(pack1_sha)
|
|
|
Pack.from_objects(data, index).check_length_and_checksum()
|
|
@@ -513,7 +514,7 @@ class TestPack(PackTests):
|
|
|
self.assertRaises(AssertionError, lambda: bad_pack.data)
|
|
|
self.assertRaises(AssertionError, bad_pack.check_length_and_checksum)
|
|
|
|
|
|
- def test_checksum_mismatch(self):
|
|
|
+ def test_checksum_mismatch(self) -> None:
|
|
|
with self.get_pack_data(pack1_sha) as data:
|
|
|
index = self.get_pack_index(pack1_sha)
|
|
|
Pack.from_objects(data, index).check_length_and_checksum()
|
|
@@ -525,7 +526,7 @@ class TestPack(PackTests):
|
|
|
self.assertRaises(ChecksumMismatch, lambda: bad_pack.data)
|
|
|
self.assertRaises(ChecksumMismatch, bad_pack.check_length_and_checksum)
|
|
|
|
|
|
- def test_iterobjects_2(self):
|
|
|
+ def test_iterobjects_2(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
objs = {o.id: o for o in p.iterobjects()}
|
|
|
self.assertEqual(3, len(objs))
|
|
@@ -534,7 +535,7 @@ class TestPack(PackTests):
|
|
|
self.assertIsInstance(objs[tree_sha], Tree)
|
|
|
self.assertIsInstance(objs[commit_sha], Commit)
|
|
|
|
|
|
- def test_iterobjects_subset(self):
|
|
|
+ def test_iterobjects_subset(self) -> None:
|
|
|
with self.get_pack(pack1_sha) as p:
|
|
|
objs = {o.id: o for o in p.iterobjects_subset([commit_sha])}
|
|
|
self.assertEqual(1, len(objs))
|
|
@@ -542,7 +543,7 @@ class TestPack(PackTests):
|
|
|
|
|
|
|
|
|
class TestThinPack(PackTests):
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
super().setUp()
|
|
|
self.store = MemoryObjectStore()
|
|
|
self.blobs = {}
|
|
@@ -583,13 +584,13 @@ class TestThinPack(PackTests):
|
|
|
resolve_ext_ref=self.store.get_raw if resolve_ext_ref else None,
|
|
|
)
|
|
|
|
|
|
- def test_get_raw(self):
|
|
|
+ def test_get_raw(self) -> None:
|
|
|
with self.make_pack(False) as p:
|
|
|
self.assertRaises(KeyError, p.get_raw, self.blobs[b"foo1234"].id)
|
|
|
with self.make_pack(True) as p:
|
|
|
self.assertEqual((3, b"foo1234"), p.get_raw(self.blobs[b"foo1234"].id))
|
|
|
|
|
|
- def test_get_unpacked_object(self):
|
|
|
+ def test_get_unpacked_object(self) -> None:
|
|
|
self.maxDiff = None
|
|
|
with self.make_pack(False) as p:
|
|
|
expected = UnpackedObject(
|
|
@@ -613,7 +614,7 @@ class TestThinPack(PackTests):
|
|
|
got,
|
|
|
)
|
|
|
|
|
|
- def test_iterobjects(self):
|
|
|
+ def test_iterobjects(self) -> None:
|
|
|
with self.make_pack(False) as p:
|
|
|
self.assertRaises(UnresolvedDeltas, list, p.iterobjects())
|
|
|
with self.make_pack(True) as p:
|
|
@@ -630,12 +631,12 @@ class TestThinPack(PackTests):
|
|
|
|
|
|
|
|
|
class WritePackTests(TestCase):
|
|
|
- def test_write_pack_header(self):
|
|
|
+ def test_write_pack_header(self) -> None:
|
|
|
f = BytesIO()
|
|
|
write_pack_header(f.write, 42)
|
|
|
self.assertEqual(b"PACK\x00\x00\x00\x02\x00\x00\x00*", f.getvalue())
|
|
|
|
|
|
- def test_write_pack_object(self):
|
|
|
+ def test_write_pack_object(self) -> None:
|
|
|
f = BytesIO()
|
|
|
f.write(b"header")
|
|
|
offset = f.tell()
|
|
@@ -651,7 +652,7 @@ class WritePackTests(TestCase):
|
|
|
self.assertEqual(crc32, unpacked.crc32)
|
|
|
self.assertEqual(b"x", unused)
|
|
|
|
|
|
- def test_write_pack_object_sha(self):
|
|
|
+ def test_write_pack_object_sha(self) -> None:
|
|
|
f = BytesIO()
|
|
|
f.write(b"header")
|
|
|
offset = f.tell()
|
|
@@ -662,7 +663,7 @@ class WritePackTests(TestCase):
|
|
|
sha_b.update(f.getvalue()[offset:])
|
|
|
self.assertEqual(sha_a.digest(), sha_b.digest())
|
|
|
|
|
|
- def test_write_pack_object_compression_level(self):
|
|
|
+ def test_write_pack_object_compression_level(self) -> None:
|
|
|
f = BytesIO()
|
|
|
f.write(b"header")
|
|
|
offset = f.tell()
|
|
@@ -680,21 +681,21 @@ pack_checksum = hex_to_sha("721980e866af9a5f93ad674144e1459b8ba3e7b7")
|
|
|
|
|
|
|
|
|
class BaseTestPackIndexWriting:
|
|
|
- def assertSucceeds(self, func, *args, **kwargs):
|
|
|
+ def assertSucceeds(self, func, *args, **kwargs) -> None:
|
|
|
try:
|
|
|
func(*args, **kwargs)
|
|
|
except ChecksumMismatch as e:
|
|
|
self.fail(e)
|
|
|
|
|
|
- def index(self, filename, entries, pack_checksum):
|
|
|
+ def index(self, filename, entries, pack_checksum) -> NoReturn:
|
|
|
raise NotImplementedError(self.index)
|
|
|
|
|
|
- def test_empty(self):
|
|
|
+ def test_empty(self) -> None:
|
|
|
idx = self.index("empty.idx", [], pack_checksum)
|
|
|
self.assertEqual(idx.get_pack_checksum(), pack_checksum)
|
|
|
self.assertEqual(0, len(idx))
|
|
|
|
|
|
- def test_large(self):
|
|
|
+ def test_large(self) -> None:
|
|
|
entry1_sha = hex_to_sha("4e6388232ec39792661e2e75db8fb117fc869ce6")
|
|
|
entry2_sha = hex_to_sha("e98f071751bd77f59967bfa671cd2caebdccc9a2")
|
|
|
entries = [
|
|
@@ -721,7 +722,7 @@ class BaseTestPackIndexWriting:
|
|
|
else:
|
|
|
self.assertIsNone(actual_crc)
|
|
|
|
|
|
- def test_single(self):
|
|
|
+ def test_single(self) -> None:
|
|
|
entry_sha = hex_to_sha("6f670c0fb53f9463760b7295fbb814e965fb20c8")
|
|
|
my_entries = [(entry_sha, 178, 42)]
|
|
|
idx = self.index("single.idx", my_entries, pack_checksum)
|
|
@@ -741,10 +742,10 @@ class BaseTestPackIndexWriting:
|
|
|
|
|
|
|
|
|
class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
self.tempdir = tempfile.mkdtemp()
|
|
|
|
|
|
- def tearDown(self):
|
|
|
+ def tearDown(self) -> None:
|
|
|
shutil.rmtree(self.tempdir)
|
|
|
|
|
|
def index(self, filename, entries, pack_checksum):
|
|
@@ -755,14 +756,14 @@ class BaseTestFilePackIndexWriting(BaseTestPackIndexWriting):
|
|
|
self.assertEqual(idx.version, self._expected_version)
|
|
|
return idx
|
|
|
|
|
|
- def writeIndex(self, filename, entries, pack_checksum):
|
|
|
+ def writeIndex(self, filename, entries, pack_checksum) -> None:
|
|
|
# FIXME: Write to BytesIO instead rather than hitting disk ?
|
|
|
with GitFile(filename, "wb") as f:
|
|
|
self._write_fn(f, entries, pack_checksum)
|
|
|
|
|
|
|
|
|
class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
TestCase.setUp(self)
|
|
|
self._has_crc32_checksum = True
|
|
|
self._supports_large = True
|
|
@@ -770,12 +771,12 @@ class TestMemoryIndexWriting(TestCase, BaseTestPackIndexWriting):
|
|
|
def index(self, filename, entries, pack_checksum):
|
|
|
return MemoryPackIndex(entries, pack_checksum)
|
|
|
|
|
|
- def tearDown(self):
|
|
|
+ def tearDown(self) -> None:
|
|
|
TestCase.tearDown(self)
|
|
|
|
|
|
|
|
|
class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
TestCase.setUp(self)
|
|
|
BaseTestFilePackIndexWriting.setUp(self)
|
|
|
self._has_crc32_checksum = False
|
|
@@ -783,13 +784,13 @@ class TestPackIndexWritingv1(TestCase, BaseTestFilePackIndexWriting):
|
|
|
self._supports_large = False
|
|
|
self._write_fn = write_pack_index_v1
|
|
|
|
|
|
- def tearDown(self):
|
|
|
+ def tearDown(self) -> None:
|
|
|
TestCase.tearDown(self)
|
|
|
BaseTestFilePackIndexWriting.tearDown(self)
|
|
|
|
|
|
|
|
|
class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
TestCase.setUp(self)
|
|
|
BaseTestFilePackIndexWriting.setUp(self)
|
|
|
self._has_crc32_checksum = True
|
|
@@ -797,7 +798,7 @@ class TestPackIndexWritingv2(TestCase, BaseTestFilePackIndexWriting):
|
|
|
self._expected_version = 2
|
|
|
self._write_fn = write_pack_index_v2
|
|
|
|
|
|
- def tearDown(self):
|
|
|
+ def tearDown(self) -> None:
|
|
|
TestCase.tearDown(self)
|
|
|
BaseTestFilePackIndexWriting.tearDown(self)
|
|
|
|
|
@@ -814,14 +815,14 @@ class ReadZlibTests(TestCase):
|
|
|
comp = zlib.compress(decomp)
|
|
|
extra = b"nextobject"
|
|
|
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
super().setUp()
|
|
|
self.read = BytesIO(self.comp + self.extra).read
|
|
|
self.unpacked = UnpackedObject(
|
|
|
Tree.type_num, decomp_len=len(self.decomp), crc32=0
|
|
|
)
|
|
|
|
|
|
- def test_decompress_size(self):
|
|
|
+ def test_decompress_size(self) -> None:
|
|
|
good_decomp_len = len(self.decomp)
|
|
|
self.unpacked.decomp_len = -1
|
|
|
self.assertRaises(ValueError, read_zlib_chunks, self.read, self.unpacked)
|
|
@@ -830,14 +831,14 @@ class ReadZlibTests(TestCase):
|
|
|
self.unpacked.decomp_len = good_decomp_len + 1
|
|
|
self.assertRaises(zlib.error, read_zlib_chunks, self.read, self.unpacked)
|
|
|
|
|
|
- def test_decompress_truncated(self):
|
|
|
+ def test_decompress_truncated(self) -> None:
|
|
|
read = BytesIO(self.comp[:10]).read
|
|
|
self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)
|
|
|
|
|
|
read = BytesIO(self.comp).read
|
|
|
self.assertRaises(zlib.error, read_zlib_chunks, read, self.unpacked)
|
|
|
|
|
|
- def test_decompress_empty(self):
|
|
|
+ def test_decompress_empty(self) -> None:
|
|
|
unpacked = UnpackedObject(Tree.type_num, decomp_len=0)
|
|
|
comp = zlib.compress(b"")
|
|
|
read = BytesIO(comp + self.extra).read
|
|
@@ -846,12 +847,12 @@ class ReadZlibTests(TestCase):
|
|
|
self.assertNotEqual(b"", unused)
|
|
|
self.assertEqual(self.extra, unused + read())
|
|
|
|
|
|
- def test_decompress_no_crc32(self):
|
|
|
+ def test_decompress_no_crc32(self) -> None:
|
|
|
self.unpacked.crc32 = None
|
|
|
read_zlib_chunks(self.read, self.unpacked)
|
|
|
self.assertEqual(None, self.unpacked.crc32)
|
|
|
|
|
|
- def _do_decompress_test(self, buffer_size, **kwargs):
|
|
|
+ def _do_decompress_test(self, buffer_size, **kwargs) -> None:
|
|
|
unused = read_zlib_chunks(
|
|
|
self.read, self.unpacked, buffer_size=buffer_size, **kwargs
|
|
|
)
|
|
@@ -860,34 +861,34 @@ class ReadZlibTests(TestCase):
|
|
|
self.assertNotEqual(b"", unused)
|
|
|
self.assertEqual(self.extra, unused + self.read())
|
|
|
|
|
|
- def test_simple_decompress(self):
|
|
|
+ def test_simple_decompress(self) -> None:
|
|
|
self._do_decompress_test(4096)
|
|
|
self.assertEqual(None, self.unpacked.comp_chunks)
|
|
|
|
|
|
# These buffer sizes are not intended to be realistic, but rather simulate
|
|
|
# larger buffer sizes that may end at various places.
|
|
|
- def test_decompress_buffer_size_1(self):
|
|
|
+ def test_decompress_buffer_size_1(self) -> None:
|
|
|
self._do_decompress_test(1)
|
|
|
|
|
|
- def test_decompress_buffer_size_2(self):
|
|
|
+ def test_decompress_buffer_size_2(self) -> None:
|
|
|
self._do_decompress_test(2)
|
|
|
|
|
|
- def test_decompress_buffer_size_3(self):
|
|
|
+ def test_decompress_buffer_size_3(self) -> None:
|
|
|
self._do_decompress_test(3)
|
|
|
|
|
|
- def test_decompress_buffer_size_4(self):
|
|
|
+ def test_decompress_buffer_size_4(self) -> None:
|
|
|
self._do_decompress_test(4)
|
|
|
|
|
|
- def test_decompress_include_comp(self):
|
|
|
+ def test_decompress_include_comp(self) -> None:
|
|
|
self._do_decompress_test(4096, include_comp=True)
|
|
|
self.assertEqual(self.comp, b"".join(self.unpacked.comp_chunks))
|
|
|
|
|
|
|
|
|
class DeltifyTests(TestCase):
|
|
|
- def test_empty(self):
|
|
|
+ def test_empty(self) -> None:
|
|
|
self.assertEqual([], list(deltify_pack_objects([])))
|
|
|
|
|
|
- def test_single(self):
|
|
|
+ def test_single(self) -> None:
|
|
|
b = Blob.from_string(b"foo")
|
|
|
self.assertEqual(
|
|
|
[
|
|
@@ -901,7 +902,7 @@ class DeltifyTests(TestCase):
|
|
|
list(deltify_pack_objects([(b, b"")])),
|
|
|
)
|
|
|
|
|
|
- def test_simple_delta(self):
|
|
|
+ def test_simple_delta(self) -> None:
|
|
|
b1 = Blob.from_string(b"a" * 101)
|
|
|
b2 = Blob.from_string(b"a" * 100)
|
|
|
delta = list(create_delta(b1.as_raw_chunks(), b2.as_raw_chunks()))
|
|
@@ -925,13 +926,13 @@ class DeltifyTests(TestCase):
|
|
|
|
|
|
|
|
|
class TestPackStreamReader(TestCase):
|
|
|
- def test_read_objects_emtpy(self):
|
|
|
+ def test_read_objects_emtpy(self) -> None:
|
|
|
f = BytesIO()
|
|
|
build_pack(f, [])
|
|
|
reader = PackStreamReader(f.read)
|
|
|
self.assertEqual(0, len(list(reader.read_objects())))
|
|
|
|
|
|
- def test_read_objects(self):
|
|
|
+ def test_read_objects(self) -> None:
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
|
f,
|
|
@@ -964,7 +965,7 @@ class TestPackStreamReader(TestCase):
|
|
|
self.assertEqual(b"".join(delta), b"".join(unpacked_delta.decomp_chunks))
|
|
|
self.assertEqual(entries[1][4], unpacked_delta.crc32)
|
|
|
|
|
|
- def test_read_objects_buffered(self):
|
|
|
+ def test_read_objects_buffered(self) -> None:
|
|
|
f = BytesIO()
|
|
|
build_pack(
|
|
|
f,
|
|
@@ -976,7 +977,7 @@ class TestPackStreamReader(TestCase):
|
|
|
reader = PackStreamReader(f.read, zlib_bufsize=4)
|
|
|
self.assertEqual(2, len(list(reader.read_objects())))
|
|
|
|
|
|
- def test_read_objects_empty(self):
|
|
|
+ def test_read_objects_empty(self) -> None:
|
|
|
reader = PackStreamReader(BytesIO().read)
|
|
|
self.assertRaises(AssertionError, list, reader.read_objects())
|
|
|
|
|
@@ -1007,7 +1008,7 @@ class TestPackIterator(DeltaChainIterator):
|
|
|
|
|
|
|
|
|
class DeltaChainIteratorTests(TestCase):
|
|
|
- def setUp(self):
|
|
|
+ def setUp(self) -> None:
|
|
|
super().setUp()
|
|
|
self.store = MemoryObjectStore()
|
|
|
self.fetched = set()
|
|
@@ -1048,11 +1049,11 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
pack, subset, resolve_ext_ref=resolve_ext_ref
|
|
|
)
|
|
|
|
|
|
- def assertEntriesMatch(self, expected_indexes, entries, pack_iter):
|
|
|
+ def assertEntriesMatch(self, expected_indexes, entries, pack_iter) -> None:
|
|
|
expected = [entries[i] for i in expected_indexes]
|
|
|
self.assertEqual(expected, list(pack_iter._walk_all_chains()))
|
|
|
|
|
|
- def test_no_deltas(self):
|
|
|
+ def test_no_deltas(self) -> None:
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
|
f,
|
|
@@ -1080,7 +1081,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
),
|
|
|
)
|
|
|
|
|
|
- def test_ofs_deltas(self):
|
|
|
+ def test_ofs_deltas(self) -> None:
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
|
f,
|
|
@@ -1099,7 +1100,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
self.make_pack_iter_subset(f, [entries[1][3], entries[2][3]]),
|
|
|
)
|
|
|
|
|
|
- def test_ofs_deltas_chain(self):
|
|
|
+ def test_ofs_deltas_chain(self) -> None:
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
|
f,
|
|
@@ -1111,7 +1112,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
)
|
|
|
self.assertEntriesMatch([0, 1, 2], entries, self.make_pack_iter(f))
|
|
|
|
|
|
- def test_ref_deltas(self):
|
|
|
+ def test_ref_deltas(self) -> None:
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
|
f,
|
|
@@ -1124,7 +1125,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
# Delta resolution changed to DFS
|
|
|
self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
|
|
|
|
|
|
- def test_ref_deltas_chain(self):
|
|
|
+ def test_ref_deltas_chain(self) -> None:
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
|
f,
|
|
@@ -1136,7 +1137,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
)
|
|
|
self.assertEntriesMatch([1, 2, 0], entries, self.make_pack_iter(f))
|
|
|
|
|
|
- def test_ofs_and_ref_deltas(self):
|
|
|
+ def test_ofs_and_ref_deltas(self) -> None:
|
|
|
# Deltas pending on this offset are popped before deltas depending on
|
|
|
# this ref.
|
|
|
f = BytesIO()
|
|
@@ -1152,7 +1153,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
# Delta resolution changed to DFS
|
|
|
self.assertEntriesMatch([1, 0, 2], entries, self.make_pack_iter(f))
|
|
|
|
|
|
- def test_mixed_chain(self):
|
|
|
+ def test_mixed_chain(self) -> None:
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
|
f,
|
|
@@ -1167,7 +1168,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
# Delta resolution changed to DFS
|
|
|
self.assertEntriesMatch([0, 4, 2, 1, 3], entries, self.make_pack_iter(f))
|
|
|
|
|
|
- def test_long_chain(self):
|
|
|
+ def test_long_chain(self) -> None:
|
|
|
n = 100
|
|
|
objects_spec = [(Blob.type_num, b"blob")]
|
|
|
for i in range(n):
|
|
@@ -1176,7 +1177,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
entries = build_pack(f, objects_spec)
|
|
|
self.assertEntriesMatch(range(n + 1), entries, self.make_pack_iter(f))
|
|
|
|
|
|
- def test_branchy_chain(self):
|
|
|
+ def test_branchy_chain(self) -> None:
|
|
|
n = 100
|
|
|
objects_spec = [(Blob.type_num, b"blob")]
|
|
|
for i in range(n):
|
|
@@ -1187,7 +1188,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
indices = [0, *list(range(100, 0, -1))]
|
|
|
self.assertEntriesMatch(indices, entries, self.make_pack_iter(f))
|
|
|
|
|
|
- def test_ext_ref(self):
|
|
|
+ def test_ext_ref(self) -> None:
|
|
|
(blob,) = self.store_blobs([b"blob"])
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(f, [(REF_DELTA, (blob.id, b"blob1"))], store=self.store)
|
|
@@ -1195,7 +1196,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
self.assertEntriesMatch([0], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
|
|
|
- def test_ext_ref_chain(self):
|
|
|
+ def test_ext_ref_chain(self) -> None:
|
|
|
(blob,) = self.store_blobs([b"blob"])
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
@@ -1210,7 +1211,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
self.assertEntriesMatch([1, 0], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
|
|
|
- def test_ext_ref_chain_degenerate(self):
|
|
|
+ def test_ext_ref_chain_degenerate(self) -> None:
|
|
|
# Test a degenerate case where the sender is sending a REF_DELTA
|
|
|
# object that expands to an object already in the repository.
|
|
|
(blob,) = self.store_blobs([b"blob"])
|
|
@@ -1230,7 +1231,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
|
|
|
- def test_ext_ref_multiple_times(self):
|
|
|
+ def test_ext_ref_multiple_times(self) -> None:
|
|
|
(blob,) = self.store_blobs([b"blob"])
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
@@ -1245,7 +1246,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(blob.id)], pack_iter.ext_refs())
|
|
|
|
|
|
- def test_multiple_ext_refs(self):
|
|
|
+ def test_multiple_ext_refs(self) -> None:
|
|
|
b1, b2 = self.store_blobs([b"foo", b"bar"])
|
|
|
f = BytesIO()
|
|
|
entries = build_pack(
|
|
@@ -1260,7 +1261,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
self.assertEntriesMatch([0, 1], entries, pack_iter)
|
|
|
self.assertEqual([hex_to_sha(b1.id), hex_to_sha(b2.id)], pack_iter.ext_refs())
|
|
|
|
|
|
- def test_bad_ext_ref_non_thin_pack(self):
|
|
|
+ def test_bad_ext_ref_non_thin_pack(self) -> None:
|
|
|
(blob,) = self.store_blobs([b"blob"])
|
|
|
f = BytesIO()
|
|
|
build_pack(f, [(REF_DELTA, (blob.id, b"blob1"))], store=self.store)
|
|
@@ -1271,7 +1272,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
except UnresolvedDeltas as e:
|
|
|
self.assertEqual([blob.id], e.shas)
|
|
|
|
|
|
- def test_bad_ext_ref_thin_pack(self):
|
|
|
+ def test_bad_ext_ref_thin_pack(self) -> None:
|
|
|
b1, b2, b3 = self.store_blobs([b"foo", b"bar", b"baz"])
|
|
|
f = BytesIO()
|
|
|
build_pack(
|
|
@@ -1293,7 +1294,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
except UnresolvedDeltas as e:
|
|
|
self.assertEqual((sorted([b2.id, b3.id]),), (sorted(e.shas),))
|
|
|
|
|
|
- def test_ext_ref_deltified_object_based_on_itself(self):
|
|
|
+ def test_ext_ref_deltified_object_based_on_itself(self) -> None:
|
|
|
b1_content = b"foo"
|
|
|
(b1,) = self.store_blobs([b1_content])
|
|
|
f = BytesIO()
|
|
@@ -1323,7 +1324,7 @@ class DeltaChainIteratorTests(TestCase):
|
|
|
|
|
|
|
|
|
class DeltaEncodeSizeTests(TestCase):
|
|
|
- def test_basic(self):
|
|
|
+ def test_basic(self) -> None:
|
|
|
self.assertEqual(b"\x00", _delta_encode_size(0))
|
|
|
self.assertEqual(b"\x01", _delta_encode_size(1))
|
|
|
self.assertEqual(b"\xfa\x01", _delta_encode_size(250))
|
|
@@ -1332,7 +1333,7 @@ class DeltaEncodeSizeTests(TestCase):
|
|
|
|
|
|
|
|
|
class EncodeCopyOperationTests(TestCase):
|
|
|
- def test_basic(self):
|
|
|
+ def test_basic(self) -> None:
|
|
|
self.assertEqual(b"\x80", _encode_copy_operation(0, 0))
|
|
|
self.assertEqual(b"\x91\x01\x0a", _encode_copy_operation(1, 10))
|
|
|
self.assertEqual(b"\xb1\x64\xe8\x03", _encode_copy_operation(100, 1000))
|