test_archive.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # test_archive.py -- tests for archive
  2. # Copyright (C) 2015 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Tests for archive support."""
  21. from io import BytesIO
  22. import tarfile
  23. import struct
  24. from unittest import skipUnless
  25. from dulwich.archive import tar_stream
  26. from dulwich.object_store import (
  27. MemoryObjectStore,
  28. )
  29. from dulwich.objects import (
  30. Blob,
  31. Tree,
  32. )
  33. from dulwich.tests import (
  34. TestCase,
  35. )
  36. from dulwich.tests.utils import (
  37. build_commit_graph,
  38. )
  39. try:
  40. from mock import patch
  41. except ImportError:
  42. patch = None
  class ArchiveTests(TestCase):
      """Tests that drive ``tar_stream`` with small in-memory object stores."""

      def test_empty(self):
          """The tree of the last generated commit archives to zero members."""
          object_store = MemoryObjectStore()
          # Exactly three commits are expected back; only the last is used.
          _, _, last_commit = build_commit_graph(
              object_store, [[1], [2, 1], [3, 1, 2]])
          chunks = tar_stream(object_store, object_store[last_commit.tree], 10)
          archive = tarfile.TarFile(fileobj=BytesIO(b''.join(chunks)))
          self.addCleanup(archive.close)
          self.assertEqual([], archive.getnames())

      def _get_example_tar_stream(self, *tar_stream_args, **tar_stream_kwargs):
          """Return a ``BytesIO`` with the ``tar_stream`` output for one file.

          A fresh store is populated with a blob ``b"somedata"`` and a tree
          that lists it as ``b"somename"`` with mode ``0o100644``.  Any extra
          positional and keyword arguments are forwarded to ``tar_stream``
          after the store and the tree.
          """
          object_store = MemoryObjectStore()
          blob = Blob.from_string(b"somedata")
          object_store.add_object(blob)
          tree = Tree()
          tree.add(b"somename", 0o100644, blob.id)
          object_store.add_object(tree)
          chunks = tar_stream(
              object_store, tree, *tar_stream_args, **tar_stream_kwargs)
          return BytesIO(b''.join(chunks))

      def test_simple(self):
          """The example archive lists exactly the one file name."""
          archive = tarfile.TarFile(
              fileobj=self._get_example_tar_stream(mtime=0))
          self.addCleanup(archive.close)
          self.assertEqual(["somename"], archive.getnames())

      def test_prefix(self):
          """With ``prefix=b'blah'`` the member is listed as ``blah/somename``."""
          archive = tarfile.TarFile(
              fileobj=self._get_example_tar_stream(mtime=0, prefix=b'blah'))
          self.addCleanup(archive.close)
          self.assertEqual(["blah/somename"], archive.getnames())

      def test_gzip_mtime(self):
          """The requested mtime appears in bytes 4-7 of the gzip output."""
          raw = self._get_example_tar_stream(
              mtime=1234, format='gz').getvalue()
          # The gzip header (RFC 1952) keeps MTIME at offset 4 as a
          # little-endian unsigned 32-bit integer.
          self.assertEqual(raw[4:8], struct.pack('<L', 1234))

      @skipUnless(patch, "Required mock.patch")
      def test_same_file(self):
          """Output bytes do not change when ``time.time`` returns 0 or 1."""
          for fmt in ('', 'gz', 'bz2'):
              outputs = []
              for fake_now in (0, 1):
                  with patch('time.time', return_value=fake_now):
                      stream = self._get_example_tar_stream(
                          mtime=0, format=fmt)
                      outputs.append(stream.getvalue())
              first, second = outputs
              self.assertEqual(
                  first,
                  second,
                  "Different file contents for format %r" % fmt
                  )