test_archive.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  # test_archive.py -- tests for archive
  # Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
  #
  # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  # General Public License as published by the Free Software Foundation; version 2.0
  # or (at your option) any later version. You can redistribute it and/or
  # modify it under the terms of either of these two licenses.
  #
  # Unless required by applicable law or agreed to in writing, software
  # distributed under the License is distributed on an "AS IS" BASIS,
  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  # See the License for the specific language governing permissions and
  # limitations under the License.
  #
  # You should have received a copy of the licenses; if not, see
  # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  # License, Version 2.0.
  #
  21. """Tests for archive support."""
  22. import struct
  23. import tarfile
  24. from io import BytesIO
  25. from unittest.mock import patch
  26. from dulwich.archive import ChunkedBytesIO, tar_stream
  27. from dulwich.object_store import MemoryObjectStore
  28. from dulwich.objects import Blob, Tree
  29. from dulwich.tests.utils import build_commit_graph
  30. from . import TestCase
  class ArchiveTests(TestCase):
      """Checks of ``tar_stream`` output built from in-memory object stores."""

      def test_empty(self) -> None:
          """The tree of the last commit in the graph archives to zero members."""
          object_store = MemoryObjectStore()
          _first, _second, tip = build_commit_graph(
              object_store, [[1], [2, 1], [3, 1, 2]]
          )
          tip_tree = object_store[tip.tree]
          archive_bytes = b"".join(tar_stream(object_store, tip_tree, 10))
          tar = tarfile.TarFile(fileobj=BytesIO(archive_bytes))
          self.addCleanup(tar.close)
          self.assertEqual([], tar.getnames())

      def _get_example_tar_stream(
          self, mtime: int, prefix: bytes = b"", format: str = ""
      ) -> BytesIO:
          """Archive a one-file tree and return the bytes wrapped in a BytesIO.

          The tree holds the blob ``somedata`` under the name ``somename``.
          ``mtime``, ``prefix`` and ``format`` are passed straight through to
          ``tar_stream``.
          """
          object_store = MemoryObjectStore()
          blob = Blob.from_string(b"somedata")
          tree = Tree()
          tree.add(b"somename", 0o100644, blob.id)
          # The tree is only stored once its single entry has been added.
          for obj in (blob, tree):
              object_store.add_object(obj)
          chunks = tar_stream(object_store, tree, mtime, prefix, format)
          return BytesIO(b"".join(chunks))

      def test_simple(self) -> None:
          """The example archive lists exactly its single file."""
          tar = tarfile.TarFile(fileobj=self._get_example_tar_stream(mtime=0))
          self.addCleanup(tar.close)
          self.assertEqual(["somename"], tar.getnames())

      def test_unicode(self) -> None:
          """A non-ASCII entry name comes back unchanged from the archive."""
          object_store = MemoryObjectStore()
          blob = Blob.from_string(b"somedata")
          object_store.add_object(blob)
          tree = Tree()
          tree.add("ő".encode(), 0o100644, blob.id)
          object_store.add_object(tree)
          chunks = tar_stream(object_store, tree, mtime=0)
          tar = tarfile.TarFile(fileobj=BytesIO(b"".join(chunks)))
          self.addCleanup(tar.close)
          self.assertEqual(["ő"], tar.getnames())

      def test_prefix(self) -> None:
          """Member names are placed under the requested prefix."""
          archive = self._get_example_tar_stream(mtime=0, prefix=b"blah")
          tar = tarfile.TarFile(fileobj=archive)
          self.addCleanup(tar.close)
          self.assertEqual(["blah/somename"], tar.getnames())

      def test_gzip_mtime(self) -> None:
          """Bytes 4..8 of the gz output equal the mtime as little-endian u32."""
          archive = self._get_example_tar_stream(mtime=1234, format="gz")
          header_mtime = archive.getvalue()[4:8]
          self.assertEqual(header_mtime, struct.pack("<L", 1234))

      def test_same_file(self) -> None:
          """Output is byte-identical whatever ``time.time`` returns."""
          for fmt in ("", "gz", "bz2"):
              outputs: list[bytes] = []
              # Build the same archive twice under two different fake clocks.
              for fake_now in (0, 1):
                  with patch("time.time", return_value=fake_now):
                      archive = self._get_example_tar_stream(mtime=0, format=fmt)
                      outputs.append(archive.getvalue())
              first, second = outputs
              self.assertEqual(
                  first,
                  second,
                  f"Different file contents for format {fmt!r}",
              )

      def test_tar_stream_with_directory(self) -> None:
          """A file inside a subtree is listed under its directory path."""
          object_store = MemoryObjectStore()

          # Blob that lives inside the subdirectory.
          nested_blob = Blob.from_string(b"file in subdir")
          object_store.add_object(nested_blob)

          # Subtree holding that blob.
          child_tree = Tree()
          child_tree.add(b"file.txt", 0o100644, nested_blob.id)
          object_store.add_object(child_tree)

          # Root tree whose only entry is the subtree.
          top_tree = Tree()
          top_tree.add(b"subdir", 0o040000, child_tree.id)
          object_store.add_object(top_tree)

          archive_bytes = b"".join(tar_stream(object_store, top_tree, 0))
          tar = tarfile.TarFile(fileobj=BytesIO(archive_bytes))
          self.addCleanup(tar.close)
          self.assertEqual(["subdir/file.txt"], tar.getnames())

      def test_tar_stream_with_submodule(self) -> None:
          """An entry whose object is absent from the store is left out."""
          object_store = MemoryObjectStore()

          # Reference an object that was never stored, the way a submodule
          # entry would; 40 hex characters make a well-formed SHA.
          missing_sha = b"a" * 40
          top_tree = Tree()
          top_tree.add(b"submodule", 0o160000, missing_sha)
          object_store.add_object(top_tree)

          # Streaming must complete without raising.
          archive_bytes = b"".join(tar_stream(object_store, top_tree, 0))
          tar = tarfile.TarFile(fileobj=BytesIO(archive_bytes))
          self.addCleanup(tar.close)
          self.assertEqual([], tar.getnames())
  class ChunkedBytesIOTests(TestCase):
      """Behaviour of ``ChunkedBytesIO.read`` over lists of byte chunks."""

      def test_read_all(self) -> None:
          """``read()`` with no argument returns all chunks joined together."""
          reader = ChunkedBytesIO([b"hello", b" ", b"world"])
          self.assertEqual(b"hello world", reader.read())

      def test_read_with_limit(self) -> None:
          """Successive bounded reads continue where the previous one ended."""
          reader = ChunkedBytesIO([b"hello", b" ", b"world"])
          # First five bytes, then the next three, then whatever is left.
          self.assertEqual(b"hello", reader.read(5))
          self.assertEqual(b" wo", reader.read(3))
          self.assertEqual(b"rld", reader.read())

      def test_read_negative_maxbytes(self) -> None:
          """``read(-1)`` returns the complete content."""
          reader = ChunkedBytesIO([b"hello", b" ", b"world"])
          self.assertEqual(b"hello world", reader.read(-1))

      def test_read_across_chunks(self) -> None:
          """One bounded read may take bytes from several chunks."""
          reader = ChunkedBytesIO([b"abc", b"def", b"ghi"])
          # Seven bytes touch all three chunks.
          self.assertEqual(b"abcdefg", reader.read(7))
          # The remainder is the tail of the last chunk.
          self.assertEqual(b"hi", reader.read())

      def test_read_empty_chunks(self) -> None:
          """An empty chunk list reads as empty bytes."""
          reader = ChunkedBytesIO([])
          self.assertEqual(b"", reader.read())

      def test_read_with_empty_chunks_mixed(self) -> None:
          """Empty chunks between data chunks add nothing to the result."""
          reader = ChunkedBytesIO([b"hello", b"", b"world", b""])
          self.assertEqual(b"helloworld", reader.read())

      def test_read_exact_chunk_boundary(self) -> None:
          """Reads that end exactly on chunk boundaries return whole chunks."""
          pieces = [b"abc", b"def", b"ghi"]
          reader = ChunkedBytesIO(pieces)
          # Each three-byte read should hand back exactly one chunk.
          for expected in (b"abc", b"def", b"ghi"):
              self.assertEqual(expected, reader.read(3))
          # Nothing is left once every chunk has been consumed.
          self.assertEqual(b"", reader.read())