test_file.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # test_file.py -- Test for git files
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import io
  22. import os
  23. import shutil
  24. import sys
  25. import tempfile
  26. from dulwich.file import FileLocked, GitFile, _fancy_rename
  27. from . import SkipTest, TestCase
  28. class FancyRenameTests(TestCase):
  29. def setUp(self) -> None:
  30. super().setUp()
  31. self._tempdir = tempfile.mkdtemp()
  32. self.foo = self.path("foo")
  33. self.bar = self.path("bar")
  34. self.create(self.foo, b"foo contents")
  35. def tearDown(self) -> None:
  36. shutil.rmtree(self._tempdir)
  37. super().tearDown()
  38. def path(self, filename):
  39. return os.path.join(self._tempdir, filename)
  40. def create(self, path, contents) -> None:
  41. f = open(path, "wb")
  42. f.write(contents)
  43. f.close()
  44. def test_no_dest_exists(self) -> None:
  45. self.assertFalse(os.path.exists(self.bar))
  46. _fancy_rename(self.foo, self.bar)
  47. self.assertFalse(os.path.exists(self.foo))
  48. new_f = open(self.bar, "rb")
  49. self.assertEqual(b"foo contents", new_f.read())
  50. new_f.close()
  51. def test_dest_exists(self) -> None:
  52. self.create(self.bar, b"bar contents")
  53. _fancy_rename(self.foo, self.bar)
  54. self.assertFalse(os.path.exists(self.foo))
  55. new_f = open(self.bar, "rb")
  56. self.assertEqual(b"foo contents", new_f.read())
  57. new_f.close()
  58. def test_dest_opened(self) -> None:
  59. if sys.platform != "win32":
  60. raise SkipTest("platform allows overwriting open files")
  61. self.create(self.bar, b"bar contents")
  62. dest_f = open(self.bar, "rb")
  63. self.assertRaises(OSError, _fancy_rename, self.foo, self.bar)
  64. dest_f.close()
  65. self.assertTrue(os.path.exists(self.path("foo")))
  66. new_f = open(self.foo, "rb")
  67. self.assertEqual(b"foo contents", new_f.read())
  68. new_f.close()
  69. new_f = open(self.bar, "rb")
  70. self.assertEqual(b"bar contents", new_f.read())
  71. new_f.close()
  72. class GitFileTests(TestCase):
  73. def setUp(self) -> None:
  74. super().setUp()
  75. self._tempdir = tempfile.mkdtemp()
  76. f = open(self.path("foo"), "wb")
  77. f.write(b"foo contents")
  78. f.close()
  79. def tearDown(self) -> None:
  80. shutil.rmtree(self._tempdir)
  81. super().tearDown()
  82. def path(self, filename):
  83. return os.path.join(self._tempdir, filename)
  84. def test_invalid(self) -> None:
  85. foo = self.path("foo")
  86. self.assertRaises(IOError, GitFile, foo, mode="r")
  87. self.assertRaises(IOError, GitFile, foo, mode="ab")
  88. self.assertRaises(IOError, GitFile, foo, mode="r+b")
  89. self.assertRaises(IOError, GitFile, foo, mode="w+b")
  90. self.assertRaises(IOError, GitFile, foo, mode="a+bU")
  91. def test_readonly(self) -> None:
  92. f = GitFile(self.path("foo"), "rb")
  93. self.assertIsInstance(f, io.IOBase)
  94. self.assertEqual(b"foo contents", f.read())
  95. self.assertEqual(b"", f.read())
  96. f.seek(4)
  97. self.assertEqual(b"contents", f.read())
  98. f.close()
  99. def test_default_mode(self) -> None:
  100. f = GitFile(self.path("foo"))
  101. self.assertEqual(b"foo contents", f.read())
  102. f.close()
  103. def test_write(self) -> None:
  104. foo = self.path("foo")
  105. foo_lock = f"{foo}.lock"
  106. orig_f = open(foo, "rb")
  107. self.assertEqual(orig_f.read(), b"foo contents")
  108. orig_f.close()
  109. self.assertFalse(os.path.exists(foo_lock))
  110. f = GitFile(foo, "wb")
  111. self.assertFalse(f.closed)
  112. self.assertRaises(AttributeError, getattr, f, "not_a_file_property")
  113. self.assertTrue(os.path.exists(foo_lock))
  114. f.write(b"new stuff")
  115. f.seek(4)
  116. f.write(b"contents")
  117. f.close()
  118. self.assertFalse(os.path.exists(foo_lock))
  119. new_f = open(foo, "rb")
  120. self.assertEqual(b"new contents", new_f.read())
  121. new_f.close()
  122. def test_open_twice(self) -> None:
  123. foo = self.path("foo")
  124. f1 = GitFile(foo, "wb")
  125. f1.write(b"new")
  126. try:
  127. f2 = GitFile(foo, "wb")
  128. self.fail()
  129. except FileLocked:
  130. pass
  131. else:
  132. f2.close()
  133. f1.write(b" contents")
  134. f1.close()
  135. # Ensure trying to open twice doesn't affect original.
  136. f = open(foo, "rb")
  137. self.assertEqual(b"new contents", f.read())
  138. f.close()
  139. def test_abort(self) -> None:
  140. foo = self.path("foo")
  141. foo_lock = f"{foo}.lock"
  142. orig_f = open(foo, "rb")
  143. self.assertEqual(orig_f.read(), b"foo contents")
  144. orig_f.close()
  145. f = GitFile(foo, "wb")
  146. f.write(b"new contents")
  147. f.abort()
  148. self.assertTrue(f.closed)
  149. self.assertFalse(os.path.exists(foo_lock))
  150. new_orig_f = open(foo, "rb")
  151. self.assertEqual(new_orig_f.read(), b"foo contents")
  152. new_orig_f.close()
  153. def test_abort_close(self) -> None:
  154. foo = self.path("foo")
  155. f = GitFile(foo, "wb")
  156. f.abort()
  157. try:
  158. f.close()
  159. except OSError:
  160. self.fail()
  161. f = GitFile(foo, "wb")
  162. f.close()
  163. try:
  164. f.abort()
  165. except OSError:
  166. self.fail()
  167. def test_abort_close_removed(self) -> None:
  168. foo = self.path("foo")
  169. f = GitFile(foo, "wb")
  170. f._file.close()
  171. os.remove(foo + ".lock")
  172. f.abort()
  173. self.assertTrue(f._closed)