test_file.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. # test_file.py -- Test for git files
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. import io
  21. import os
  22. import shutil
  23. import sys
  24. import tempfile
  25. from dulwich.file import FileLocked, GitFile, _fancy_rename
  26. from . import SkipTest, TestCase
  27. class FancyRenameTests(TestCase):
  28. def setUp(self):
  29. super().setUp()
  30. self._tempdir = tempfile.mkdtemp()
  31. self.foo = self.path("foo")
  32. self.bar = self.path("bar")
  33. self.create(self.foo, b"foo contents")
  34. def tearDown(self):
  35. shutil.rmtree(self._tempdir)
  36. super().tearDown()
  37. def path(self, filename):
  38. return os.path.join(self._tempdir, filename)
  39. def create(self, path, contents):
  40. f = open(path, "wb")
  41. f.write(contents)
  42. f.close()
  43. def test_no_dest_exists(self):
  44. self.assertFalse(os.path.exists(self.bar))
  45. _fancy_rename(self.foo, self.bar)
  46. self.assertFalse(os.path.exists(self.foo))
  47. new_f = open(self.bar, "rb")
  48. self.assertEqual(b"foo contents", new_f.read())
  49. new_f.close()
  50. def test_dest_exists(self):
  51. self.create(self.bar, b"bar contents")
  52. _fancy_rename(self.foo, self.bar)
  53. self.assertFalse(os.path.exists(self.foo))
  54. new_f = open(self.bar, "rb")
  55. self.assertEqual(b"foo contents", new_f.read())
  56. new_f.close()
  57. def test_dest_opened(self):
  58. if sys.platform != "win32":
  59. raise SkipTest("platform allows overwriting open files")
  60. self.create(self.bar, b"bar contents")
  61. dest_f = open(self.bar, "rb")
  62. self.assertRaises(OSError, _fancy_rename, self.foo, self.bar)
  63. dest_f.close()
  64. self.assertTrue(os.path.exists(self.path("foo")))
  65. new_f = open(self.foo, "rb")
  66. self.assertEqual(b"foo contents", new_f.read())
  67. new_f.close()
  68. new_f = open(self.bar, "rb")
  69. self.assertEqual(b"bar contents", new_f.read())
  70. new_f.close()
  71. class GitFileTests(TestCase):
  72. def setUp(self):
  73. super().setUp()
  74. self._tempdir = tempfile.mkdtemp()
  75. f = open(self.path("foo"), "wb")
  76. f.write(b"foo contents")
  77. f.close()
  78. def tearDown(self):
  79. shutil.rmtree(self._tempdir)
  80. super().tearDown()
  81. def path(self, filename):
  82. return os.path.join(self._tempdir, filename)
  83. def test_invalid(self):
  84. foo = self.path("foo")
  85. self.assertRaises(IOError, GitFile, foo, mode="r")
  86. self.assertRaises(IOError, GitFile, foo, mode="ab")
  87. self.assertRaises(IOError, GitFile, foo, mode="r+b")
  88. self.assertRaises(IOError, GitFile, foo, mode="w+b")
  89. self.assertRaises(IOError, GitFile, foo, mode="a+bU")
  90. def test_readonly(self):
  91. f = GitFile(self.path("foo"), "rb")
  92. self.assertIsInstance(f, io.IOBase)
  93. self.assertEqual(b"foo contents", f.read())
  94. self.assertEqual(b"", f.read())
  95. f.seek(4)
  96. self.assertEqual(b"contents", f.read())
  97. f.close()
  98. def test_default_mode(self):
  99. f = GitFile(self.path("foo"))
  100. self.assertEqual(b"foo contents", f.read())
  101. f.close()
  102. def test_write(self):
  103. foo = self.path("foo")
  104. foo_lock = f"{foo}.lock"
  105. orig_f = open(foo, "rb")
  106. self.assertEqual(orig_f.read(), b"foo contents")
  107. orig_f.close()
  108. self.assertFalse(os.path.exists(foo_lock))
  109. f = GitFile(foo, "wb")
  110. self.assertFalse(f.closed)
  111. self.assertRaises(AttributeError, getattr, f, "not_a_file_property")
  112. self.assertTrue(os.path.exists(foo_lock))
  113. f.write(b"new stuff")
  114. f.seek(4)
  115. f.write(b"contents")
  116. f.close()
  117. self.assertFalse(os.path.exists(foo_lock))
  118. new_f = open(foo, "rb")
  119. self.assertEqual(b"new contents", new_f.read())
  120. new_f.close()
  121. def test_open_twice(self):
  122. foo = self.path("foo")
  123. f1 = GitFile(foo, "wb")
  124. f1.write(b"new")
  125. try:
  126. f2 = GitFile(foo, "wb")
  127. self.fail()
  128. except FileLocked:
  129. pass
  130. else:
  131. f2.close()
  132. f1.write(b" contents")
  133. f1.close()
  134. # Ensure trying to open twice doesn't affect original.
  135. f = open(foo, "rb")
  136. self.assertEqual(b"new contents", f.read())
  137. f.close()
  138. def test_abort(self):
  139. foo = self.path("foo")
  140. foo_lock = f"{foo}.lock"
  141. orig_f = open(foo, "rb")
  142. self.assertEqual(orig_f.read(), b"foo contents")
  143. orig_f.close()
  144. f = GitFile(foo, "wb")
  145. f.write(b"new contents")
  146. f.abort()
  147. self.assertTrue(f.closed)
  148. self.assertFalse(os.path.exists(foo_lock))
  149. new_orig_f = open(foo, "rb")
  150. self.assertEqual(new_orig_f.read(), b"foo contents")
  151. new_orig_f.close()
  152. def test_abort_close(self):
  153. foo = self.path("foo")
  154. f = GitFile(foo, "wb")
  155. f.abort()
  156. try:
  157. f.close()
  158. except OSError:
  159. self.fail()
  160. f = GitFile(foo, "wb")
  161. f.close()
  162. try:
  163. f.abort()
  164. except OSError:
  165. self.fail()
  166. def test_abort_close_removed(self):
  167. foo = self.path("foo")
  168. f = GitFile(foo, "wb")
  169. f._file.close()
  170. os.remove(foo + ".lock")
  171. f.abort()
  172. self.assertTrue(f._closed)