# test_pack.py -- Compatibility tests for git packs.
# Copyright (C) 2010 Google, Inc.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Compatibility tests for git packs."""
import binascii
import os
import re
import shutil
import tempfile
from typing import NoReturn
from dulwich.objects import Blob
from dulwich.pack import write_pack
from .. import SkipTest
from ..test_pack import PackTests, a_sha, pack1_sha
from .utils import require_git_version, run_git_or_fail
_NON_DELTA_RE = re.compile(b"non delta: (?P\\d+) objects")
def _git_verify_pack_object_list(output):
pack_shas = set()
for line in output.splitlines():
sha = line[:40]
try:
binascii.unhexlify(sha)
except (TypeError, binascii.Error):
continue # non-sha line
pack_shas.add(sha)
return pack_shas
class TestPack(PackTests):
    """Compatibility tests for reading and writing pack files."""

    def setUp(self) -> None:
        """Check the git version and create a scratch directory for packs."""
        require_git_version((1, 5, 0))
        super().setUp()
        self._tempdir = tempfile.mkdtemp()
        # The directory (and every pack written into it) is removed again
        # once the test finishes.
        self.addCleanup(shutil.rmtree, self._tempdir)

    def _assert_non_delta_count(self, output, expected) -> None:
        """Assert the number of non-delta objects reported in *output*.

        Args:
          output: Output of ``git verify-pack -v`` for the pack under test.
          expected: Number of non-delta objects the pack should contain.
        """
        match = _NON_DELTA_RE.search(output)
        # Fail with a readable message instead of an AttributeError when the
        # summary line is missing from the output.
        self.assertIsNotNone(
            match, "no 'non delta' summary line in verify-pack output"
        )
        got_non_delta = int(match.group("non_delta"))
        self.assertEqual(
            expected,
            got_non_delta,
            "Expected %d non-delta objects, got %d" % (expected, got_non_delta),
        )

    def test_copy(self) -> None:
        """A copy of pack1 written by write_pack lists the same objects."""
        with self.get_pack(pack1_sha) as origpack:
            self.assertSucceeds(origpack.index.check)
            pack_path = os.path.join(self._tempdir, "Elch")
            write_pack(pack_path, origpack.pack_tuples())
            output = run_git_or_fail(["verify-pack", "-v", pack_path])
            orig_shas = {o.id for o in origpack.iterobjects()}
            self.assertEqual(orig_shas, _git_verify_pack_object_list(output))

    def test_deltas_work(self) -> None:
        """A near-duplicate blob is written as a delta that git accepts."""
        with self.get_pack(pack1_sha) as orig_pack:
            orig_blob = orig_pack[a_sha]
            new_blob = Blob()
            new_blob.data = orig_blob.data + b"x"
            all_to_pack = [(o, None) for o in orig_pack.iterobjects()] + [
                (new_blob, None)
            ]
            pack_path = os.path.join(self._tempdir, "pack_with_deltas")
            write_pack(pack_path, all_to_pack, deltify=True)
        output = run_git_or_fail(["verify-pack", "-v", pack_path])
        self.assertEqual(
            {x[0].id for x in all_to_pack},
            _git_verify_pack_object_list(output),
        )
        # We specifically made a new blob that should be a delta
        # against the blob a_sha, so make sure we really got only 3
        # non-delta objects:
        self._assert_non_delta_count(output, 3)

    def test_delta_medium_object(self) -> None:
        """Deltas still verify when the copy operation is 2**20 bytes."""
        # This tests an object set that will have a copy operation
        # 2**20 in size.
        with self.get_pack(pack1_sha) as orig_pack:
            orig_blob = orig_pack[a_sha]
            new_blob = Blob()
            new_blob.data = orig_blob.data + (b"x" * 2**20)
            new_blob_2 = Blob()
            new_blob_2.data = new_blob.data + b"y"
            all_to_pack = [
                *orig_pack.pack_tuples(),
                (new_blob, None),
                (new_blob_2, None),
            ]
            pack_path = os.path.join(self._tempdir, "pack_with_deltas")
            write_pack(pack_path, all_to_pack, deltify=True)
        output = run_git_or_fail(["verify-pack", "-v", pack_path])
        self.assertEqual(
            {x[0].id for x in all_to_pack},
            _git_verify_pack_object_list(output),
        )
        # We specifically made a new blob that should be a delta
        # against the blob a_sha, so make sure we really got only 3
        # non-delta objects:
        self._assert_non_delta_count(output, 3)
        # We expect one object to have a delta chain length of two
        # (new_blob_2), so let's verify that actually happens:
        self.assertIn(b"chain length = 2", output)

    # This test is SUPER slow: over 80 seconds on a 2012-era
    # laptop. This is because SequenceMatcher is worst-case quadratic
    # on the input size. It's impractical to produce deltas for
    # objects this large, but it's still worth doing the right thing
    # when it happens.
    def test_delta_large_object(self) -> NoReturn:
        """Deltas with a 2**25-byte copy operation (currently skipped)."""
        # This tests an object set that will have a copy operation
        # 2**25 in size. This is a copy large enough that it requires
        # two copy operations in git's binary delta format.
        raise SkipTest("skipping slow, large test")
        # Everything below is intentionally unreachable while the test is
        # skipped; it is kept in working order (bytes, not str, for blob
        # data) so the test can be re-enabled by removing the raise above.
        with self.get_pack(pack1_sha) as orig_pack:
            new_blob = Blob()
            new_blob.data = b"big blob" + (b"x" * 2**25)
            new_blob_2 = Blob()
            new_blob_2.data = new_blob.data + b"y"
            all_to_pack = [
                *orig_pack.pack_tuples(),
                (new_blob, None),
                (new_blob_2, None),
            ]
            pack_path = os.path.join(self._tempdir, "pack_with_deltas")
            write_pack(pack_path, all_to_pack, deltify=True)
        output = run_git_or_fail(["verify-pack", "-v", pack_path])
        self.assertEqual(
            {x[0].id for x in all_to_pack},
            _git_verify_pack_object_list(output),
        )
        # new_blob_2 should be stored as a delta against new_blob, so we
        # expect 4 non-delta objects: the 3 already asserted for the
        # original pack in the tests above, plus new_blob itself.
        self._assert_non_delta_count(output, 4)