# test_archive.py -- tests for archive
# Copyright (C) 2015 Jelmer Vernooij
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for archive support."""

import struct
import tarfile
from io import BytesIO
from unittest.mock import patch

from dulwich.archive import ChunkedBytesIO, tar_stream
from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob, Tree
from dulwich.tests.utils import build_commit_graph

from . import TestCase


class ArchiveTests(TestCase):
    def test_empty(self) -> None:
        """Test that archiving an empty tree yields an empty tar archive."""
        store = MemoryObjectStore()
        _c1, _c2, c3 = build_commit_graph(store, [[1], [2, 1], [3, 1, 2]])
        tree = store[c3.tree]
        stream = b"".join(tar_stream(store, tree, 10))
        out = BytesIO(stream)
        tf = tarfile.TarFile(fileobj=out)
        self.addCleanup(tf.close)
        self.assertEqual([], tf.getnames())

    def _get_example_tar_stream(
        self, mtime: int, prefix: bytes = b"", format: str = ""
    ) -> BytesIO:
        """Build a tar stream for a single-file tree and return it as BytesIO."""
        store = MemoryObjectStore()
        b1 = Blob.from_string(b"somedata")
        store.add_object(b1)
        t1 = Tree()
        t1.add(b"somename", 0o100644, b1.id)
        store.add_object(t1)
        stream = b"".join(tar_stream(store, t1, mtime, prefix, format))
        return BytesIO(stream)

    def test_simple(self) -> None:
        """Test archiving a tree with a single file."""
        stream = self._get_example_tar_stream(mtime=0)
        tf = tarfile.TarFile(fileobj=stream)
        self.addCleanup(tf.close)
        self.assertEqual(["somename"], tf.getnames())

    def test_unicode(self) -> None:
        """Test archiving a tree with a non-ASCII filename."""
        store = MemoryObjectStore()
        b1 = Blob.from_string(b"somedata")
        store.add_object(b1)
        t1 = Tree()
        t1.add("ő".encode(), 0o100644, b1.id)
        store.add_object(t1)
        stream = b"".join(tar_stream(store, t1, mtime=0))
        tf = tarfile.TarFile(fileobj=BytesIO(stream))
        self.addCleanup(tf.close)
        self.assertEqual(["ő"], tf.getnames())

    def test_prefix(self) -> None:
        """Test that a prefix is prepended to archived paths."""
        stream = self._get_example_tar_stream(mtime=0, prefix=b"blah")
        tf = tarfile.TarFile(fileobj=stream)
        self.addCleanup(tf.close)
        self.assertEqual(["blah/somename"], tf.getnames())

    def test_gzip_mtime(self) -> None:
        """Test that the requested mtime ends up in the gzip header."""
        stream = self._get_example_tar_stream(mtime=1234, format="gz")
        # The gzip header stores MTIME as a little-endian 32-bit integer
        # at byte offsets 4-7.
        expected_mtime = struct.pack("<L", 1234)
        self.assertEqual(stream.getvalue()[4:8], expected_mtime)

    def test_same_file(self) -> None:
        """Test that archive contents do not depend on the wall-clock time."""
        contents: list[bytes | None] = [None, None]
        for format in ["", "gz", "bz2"]:
            for i in [0, 1]:
                with patch("time.time", return_value=i):
                    stream = self._get_example_tar_stream(mtime=0, format=format)
                    contents[i] = stream.getvalue()
            self.assertEqual(
                contents[0],
                contents[1],
                f"Different file contents for format {format!r}",
            )

    def test_tar_stream_with_directory(self) -> None:
        """Test tar_stream with a tree containing directories."""
        store = MemoryObjectStore()

        # Create a blob for a file
        b1 = Blob.from_string(b"file in subdir")
        store.add_object(b1)

        # Create a subtree
        subtree = Tree()
        subtree.add(b"file.txt", 0o100644, b1.id)
        store.add_object(subtree)

        # Create root tree with a directory
        root_tree = Tree()
        root_tree.add(b"subdir", 0o040000, subtree.id)
        store.add_object(root_tree)

        # Generate tar stream
        stream = b"".join(tar_stream(store, root_tree, 0))
        tf = tarfile.TarFile(fileobj=BytesIO(stream))
        self.addCleanup(tf.close)

        # Should contain the file in the subdirectory
        self.assertEqual(["subdir/file.txt"], tf.getnames())

    def test_tar_stream_with_submodule(self) -> None:
        """Test tar_stream handles missing objects (submodules) gracefully."""
        store = MemoryObjectStore()

        # Create a tree with an entry that doesn't exist in the store
        # (simulating a submodule reference)
        root_tree = Tree()
        # Use a valid hex SHA (40 hex chars = 20 bytes)
        nonexistent_sha = b"a" * 40
        root_tree.add(b"submodule", 0o160000, nonexistent_sha)
        store.add_object(root_tree)

        # Should not raise, just skip the missing entry
        stream = b"".join(tar_stream(store, root_tree, 0))
        tf = tarfile.TarFile(fileobj=BytesIO(stream))
        self.addCleanup(tf.close)

        # Submodule should be skipped
        self.assertEqual([], tf.getnames())


class ChunkedBytesIOTests(TestCase):
    """Tests for ChunkedBytesIO class."""

    def test_read_all(self) -> None:
        """Test reading all bytes from ChunkedBytesIO."""
        chunks = [b"hello", b" ", b"world"]
        chunked = ChunkedBytesIO(chunks)
        result = chunked.read()
        self.assertEqual(b"hello world", result)

    def test_read_with_limit(self) -> None:
        """Test reading limited bytes from ChunkedBytesIO."""
        chunks = [b"hello", b" ", b"world"]
        chunked = ChunkedBytesIO(chunks)
        # Read first 5 bytes
        result = chunked.read(5)
        self.assertEqual(b"hello", result)
        # Read next 3 bytes
        result = chunked.read(3)
        self.assertEqual(b" wo", result)
        # Read remaining
        result = chunked.read()
        self.assertEqual(b"rld", result)

    def test_read_negative_maxbytes(self) -> None:
        """Test reading with negative maxbytes reads all."""
        chunks = [b"hello", b" ", b"world"]
        chunked = ChunkedBytesIO(chunks)
        result = chunked.read(-1)
        self.assertEqual(b"hello world", result)

    def test_read_across_chunks(self) -> None:
        """Test reading across multiple chunks."""
        chunks = [b"abc", b"def", b"ghi"]
        chunked = ChunkedBytesIO(chunks)
        # Read 7 bytes (spans three chunks)
        result = chunked.read(7)
        self.assertEqual(b"abcdefg", result)
        # Read remaining
        result = chunked.read()
        self.assertEqual(b"hi", result)

    def test_read_empty_chunks(self) -> None:
        """Test reading from empty chunks list."""
        chunked = ChunkedBytesIO([])
        result = chunked.read()
        self.assertEqual(b"", result)

    def test_read_with_empty_chunks_mixed(self) -> None:
        """Test reading with some empty chunks in the list."""
        chunks = [b"hello", b"", b"world", b""]
        chunked = ChunkedBytesIO(chunks)
        result = chunked.read()
        self.assertEqual(b"helloworld", result)

    def test_read_exact_chunk_boundary(self) -> None:
        """Test reading exactly to a chunk boundary."""
        chunks = [b"abc", b"def", b"ghi"]
        chunked = ChunkedBytesIO(chunks)
        # Read exactly first chunk
        result = chunked.read(3)
        self.assertEqual(b"abc", result)
        # Read exactly second chunk
        result = chunked.read(3)
        self.assertEqual(b"def", result)
        # Read exactly third chunk
        result = chunked.read(3)
        self.assertEqual(b"ghi", result)
        # Should be at end
        result = chunked.read()
        self.assertEqual(b"", result)