test_archive.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # test_archive.py -- tests for archive
  2. # Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for archive support."""
  22. import struct
  23. import tarfile
  24. from io import BytesIO
  25. from typing import Optional
  26. from unittest.mock import patch
  27. from dulwich.archive import tar_stream
  28. from dulwich.object_store import MemoryObjectStore
  29. from dulwich.objects import Blob, Tree
  30. from dulwich.tests.utils import build_commit_graph
  31. from . import TestCase
  32. class ArchiveTests(TestCase):
  33. def test_empty(self) -> None:
  34. store = MemoryObjectStore()
  35. _c1, _c2, c3 = build_commit_graph(store, [[1], [2, 1], [3, 1, 2]])
  36. tree = store[c3.tree]
  37. stream = b"".join(tar_stream(store, tree, 10))
  38. out = BytesIO(stream)
  39. tf = tarfile.TarFile(fileobj=out)
  40. self.addCleanup(tf.close)
  41. self.assertEqual([], tf.getnames())
  42. def _get_example_tar_stream(
  43. self, mtime: int, prefix: bytes = b"", format: str = ""
  44. ) -> BytesIO:
  45. store = MemoryObjectStore()
  46. b1 = Blob.from_string(b"somedata")
  47. store.add_object(b1)
  48. t1 = Tree()
  49. t1.add(b"somename", 0o100644, b1.id)
  50. store.add_object(t1)
  51. stream = b"".join(tar_stream(store, t1, mtime, prefix, format))
  52. return BytesIO(stream)
  53. def test_simple(self) -> None:
  54. stream = self._get_example_tar_stream(mtime=0)
  55. tf = tarfile.TarFile(fileobj=stream)
  56. self.addCleanup(tf.close)
  57. self.assertEqual(["somename"], tf.getnames())
  58. def test_unicode(self) -> None:
  59. store = MemoryObjectStore()
  60. b1 = Blob.from_string(b"somedata")
  61. store.add_object(b1)
  62. t1 = Tree()
  63. t1.add("ő".encode(), 0o100644, b1.id)
  64. store.add_object(t1)
  65. stream = b"".join(tar_stream(store, t1, mtime=0))
  66. tf = tarfile.TarFile(fileobj=BytesIO(stream))
  67. self.addCleanup(tf.close)
  68. self.assertEqual(["ő"], tf.getnames())
  69. def test_prefix(self) -> None:
  70. stream = self._get_example_tar_stream(mtime=0, prefix=b"blah")
  71. tf = tarfile.TarFile(fileobj=stream)
  72. self.addCleanup(tf.close)
  73. self.assertEqual(["blah/somename"], tf.getnames())
  74. def test_gzip_mtime(self) -> None:
  75. stream = self._get_example_tar_stream(mtime=1234, format="gz")
  76. expected_mtime = struct.pack("<L", 1234)
  77. self.assertEqual(stream.getvalue()[4:8], expected_mtime)
  78. def test_same_file(self) -> None:
  79. contents: list[Optional[bytes]] = [None, None]
  80. for format in ["", "gz", "bz2"]:
  81. for i in [0, 1]:
  82. with patch("time.time", return_value=i):
  83. stream = self._get_example_tar_stream(mtime=0, format=format)
  84. contents[i] = stream.getvalue()
  85. self.assertEqual(
  86. contents[0],
  87. contents[1],
  88. f"Different file contents for format {format!r}",
  89. )