Browse Source

Add tests for archive module

Jelmer Vernooij 1 week ago
parent
commit
16790c1782
1 changed files with 133 additions and 1 deletions
  1. 133 1
      tests/test_archive.py

+ 133 - 1
tests/test_archive.py

@@ -26,7 +26,7 @@ import tarfile
 from io import BytesIO
 from unittest.mock import patch
 
-from dulwich.archive import tar_stream
+from dulwich.archive import ChunkedBytesIO, tar_stream
 from dulwich.object_store import MemoryObjectStore
 from dulwich.objects import Blob, Tree
 from dulwich.tests.utils import build_commit_graph
@@ -98,3 +98,135 @@ class ArchiveTests(TestCase):
                 contents[1],
                 f"Different file contents for format {format!r}",
             )
+
+    def test_tar_stream_with_directory(self) -> None:
+        """Test tar_stream with a tree containing directories."""
+        store = MemoryObjectStore()
+
+        # Create a blob for a file
+        b1 = Blob.from_string(b"file in subdir")
+        store.add_object(b1)
+
+        # Create a subtree
+        subtree = Tree()
+        subtree.add(b"file.txt", 0o100644, b1.id)
+        store.add_object(subtree)
+
+        # Create root tree with a directory
+        root_tree = Tree()
+        root_tree.add(b"subdir", 0o040000, subtree.id)
+        store.add_object(root_tree)
+
+        # Generate tar stream
+        stream = b"".join(tar_stream(store, root_tree, 0))
+        tf = tarfile.TarFile(fileobj=BytesIO(stream))
+        self.addCleanup(tf.close)
+
+        # Should contain the file in the subdirectory
+        self.assertEqual(["subdir/file.txt"], tf.getnames())
+
+    def test_tar_stream_with_submodule(self) -> None:
+        """Test tar_stream handles missing objects (submodules) gracefully."""
+        store = MemoryObjectStore()
+
+        # Create a tree with an entry that doesn't exist in the store
+        # (simulating a submodule reference)
+        root_tree = Tree()
+        # Use a valid hex SHA (40 hex chars = 20 bytes)
+        nonexistent_sha = b"a" * 40
+        root_tree.add(b"submodule", 0o160000, nonexistent_sha)
+        store.add_object(root_tree)
+
+        # Should not raise, just skip the missing entry
+        stream = b"".join(tar_stream(store, root_tree, 0))
+        tf = tarfile.TarFile(fileobj=BytesIO(stream))
+        self.addCleanup(tf.close)
+
+        # Submodule should be skipped
+        self.assertEqual([], tf.getnames())
+
+
+class ChunkedBytesIOTests(TestCase):
+    """Tests for ChunkedBytesIO class."""
+
+    def test_read_all(self) -> None:
+        """Test reading all bytes from ChunkedBytesIO."""
+        chunks = [b"hello", b" ", b"world"]
+        chunked = ChunkedBytesIO(chunks)
+
+        result = chunked.read()
+        self.assertEqual(b"hello world", result)
+
+    def test_read_with_limit(self) -> None:
+        """Test reading limited bytes from ChunkedBytesIO."""
+        chunks = [b"hello", b" ", b"world"]
+        chunked = ChunkedBytesIO(chunks)
+
+        # Read first 5 bytes
+        result = chunked.read(5)
+        self.assertEqual(b"hello", result)
+
+        # Read next 3 bytes
+        result = chunked.read(3)
+        self.assertEqual(b" wo", result)
+
+        # Read remaining
+        result = chunked.read()
+        self.assertEqual(b"rld", result)
+
+    def test_read_negative_maxbytes(self) -> None:
+        """Test reading with negative maxbytes reads all."""
+        chunks = [b"hello", b" ", b"world"]
+        chunked = ChunkedBytesIO(chunks)
+
+        result = chunked.read(-1)
+        self.assertEqual(b"hello world", result)
+
+    def test_read_across_chunks(self) -> None:
+        """Test reading across multiple chunks."""
+        chunks = [b"abc", b"def", b"ghi"]
+        chunked = ChunkedBytesIO(chunks)
+
+        # Read 7 bytes (spans three chunks)
+        result = chunked.read(7)
+        self.assertEqual(b"abcdefg", result)
+
+        # Read remaining
+        result = chunked.read()
+        self.assertEqual(b"hi", result)
+
+    def test_read_empty_chunks(self) -> None:
+        """Test reading from empty chunks list."""
+        chunked = ChunkedBytesIO([])
+
+        result = chunked.read()
+        self.assertEqual(b"", result)
+
+    def test_read_with_empty_chunks_mixed(self) -> None:
+        """Test reading with some empty chunks in the list."""
+        chunks = [b"hello", b"", b"world", b""]
+        chunked = ChunkedBytesIO(chunks)
+
+        result = chunked.read()
+        self.assertEqual(b"helloworld", result)
+
+    def test_read_exact_chunk_boundary(self) -> None:
+        """Test reading exactly to a chunk boundary."""
+        chunks = [b"abc", b"def", b"ghi"]
+        chunked = ChunkedBytesIO(chunks)
+
+        # Read exactly first chunk
+        result = chunked.read(3)
+        self.assertEqual(b"abc", result)
+
+        # Read exactly second chunk
+        result = chunked.read(3)
+        self.assertEqual(b"def", result)
+
+        # Read exactly third chunk
+        result = chunked.read(3)
+        self.assertEqual(b"ghi", result)
+
+        # Should be at end
+        result = chunked.read()
+        self.assertEqual(b"", result)