2
0

test_file.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # test_file.py -- Test for git files
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. import io
  21. import os
  22. import shutil
  23. import sys
  24. import tempfile
  25. from dulwich.file import FileLocked, GitFile, _fancy_rename
  26. from dulwich.tests import (
  27. SkipTest,
  28. TestCase,
  29. )
  30. class FancyRenameTests(TestCase):
  31. def setUp(self):
  32. super(FancyRenameTests, self).setUp()
  33. self._tempdir = tempfile.mkdtemp()
  34. self.foo = self.path('foo')
  35. self.bar = self.path('bar')
  36. self.create(self.foo, b'foo contents')
  37. def tearDown(self):
  38. shutil.rmtree(self._tempdir)
  39. super(FancyRenameTests, self).tearDown()
  40. def path(self, filename):
  41. return os.path.join(self._tempdir, filename)
  42. def create(self, path, contents):
  43. f = open(path, 'wb')
  44. f.write(contents)
  45. f.close()
  46. def test_no_dest_exists(self):
  47. self.assertFalse(os.path.exists(self.bar))
  48. _fancy_rename(self.foo, self.bar)
  49. self.assertFalse(os.path.exists(self.foo))
  50. new_f = open(self.bar, 'rb')
  51. self.assertEqual(b'foo contents', new_f.read())
  52. new_f.close()
  53. def test_dest_exists(self):
  54. self.create(self.bar, b'bar contents')
  55. _fancy_rename(self.foo, self.bar)
  56. self.assertFalse(os.path.exists(self.foo))
  57. new_f = open(self.bar, 'rb')
  58. self.assertEqual(b'foo contents', new_f.read())
  59. new_f.close()
  60. def test_dest_opened(self):
  61. if sys.platform != "win32":
  62. raise SkipTest("platform allows overwriting open files")
  63. self.create(self.bar, b'bar contents')
  64. dest_f = open(self.bar, 'rb')
  65. self.assertRaises(OSError, _fancy_rename, self.foo, self.bar)
  66. dest_f.close()
  67. self.assertTrue(os.path.exists(self.path('foo')))
  68. new_f = open(self.foo, 'rb')
  69. self.assertEqual(b'foo contents', new_f.read())
  70. new_f.close()
  71. new_f = open(self.bar, 'rb')
  72. self.assertEqual(b'bar contents', new_f.read())
  73. new_f.close()
  74. class GitFileTests(TestCase):
  75. def setUp(self):
  76. super(GitFileTests, self).setUp()
  77. self._tempdir = tempfile.mkdtemp()
  78. f = open(self.path('foo'), 'wb')
  79. f.write(b'foo contents')
  80. f.close()
  81. def tearDown(self):
  82. shutil.rmtree(self._tempdir)
  83. super(GitFileTests, self).tearDown()
  84. def path(self, filename):
  85. return os.path.join(self._tempdir, filename)
  86. def test_invalid(self):
  87. foo = self.path('foo')
  88. self.assertRaises(IOError, GitFile, foo, mode='r')
  89. self.assertRaises(IOError, GitFile, foo, mode='ab')
  90. self.assertRaises(IOError, GitFile, foo, mode='r+b')
  91. self.assertRaises(IOError, GitFile, foo, mode='w+b')
  92. self.assertRaises(IOError, GitFile, foo, mode='a+bU')
  93. def test_readonly(self):
  94. f = GitFile(self.path('foo'), 'rb')
  95. self.assertTrue(isinstance(f, io.IOBase))
  96. self.assertEqual(b'foo contents', f.read())
  97. self.assertEqual(b'', f.read())
  98. f.seek(4)
  99. self.assertEqual(b'contents', f.read())
  100. f.close()
  101. def test_default_mode(self):
  102. f = GitFile(self.path('foo'))
  103. self.assertEqual(b'foo contents', f.read())
  104. f.close()
  105. def test_write(self):
  106. foo = self.path('foo')
  107. foo_lock = '%s.lock' % foo
  108. orig_f = open(foo, 'rb')
  109. self.assertEqual(orig_f.read(), b'foo contents')
  110. orig_f.close()
  111. self.assertFalse(os.path.exists(foo_lock))
  112. f = GitFile(foo, 'wb')
  113. self.assertFalse(f.closed)
  114. self.assertRaises(AttributeError, getattr, f, 'not_a_file_property')
  115. self.assertTrue(os.path.exists(foo_lock))
  116. f.write(b'new stuff')
  117. f.seek(4)
  118. f.write(b'contents')
  119. f.close()
  120. self.assertFalse(os.path.exists(foo_lock))
  121. new_f = open(foo, 'rb')
  122. self.assertEqual(b'new contents', new_f.read())
  123. new_f.close()
  124. def test_open_twice(self):
  125. foo = self.path('foo')
  126. f1 = GitFile(foo, 'wb')
  127. f1.write(b'new')
  128. try:
  129. f2 = GitFile(foo, 'wb')
  130. self.fail()
  131. except FileLocked:
  132. pass
  133. else:
  134. f2.close()
  135. f1.write(b' contents')
  136. f1.close()
  137. # Ensure trying to open twice doesn't affect original.
  138. f = open(foo, 'rb')
  139. self.assertEqual(b'new contents', f.read())
  140. f.close()
  141. def test_abort(self):
  142. foo = self.path('foo')
  143. foo_lock = '%s.lock' % foo
  144. orig_f = open(foo, 'rb')
  145. self.assertEqual(orig_f.read(), b'foo contents')
  146. orig_f.close()
  147. f = GitFile(foo, 'wb')
  148. f.write(b'new contents')
  149. f.abort()
  150. self.assertTrue(f.closed)
  151. self.assertFalse(os.path.exists(foo_lock))
  152. new_orig_f = open(foo, 'rb')
  153. self.assertEqual(new_orig_f.read(), b'foo contents')
  154. new_orig_f.close()
  155. def test_abort_close(self):
  156. foo = self.path('foo')
  157. f = GitFile(foo, 'wb')
  158. f.abort()
  159. try:
  160. f.close()
  161. except (IOError, OSError):
  162. self.fail()
  163. f = GitFile(foo, 'wb')
  164. f.close()
  165. try:
  166. f.abort()
  167. except (IOError, OSError):
  168. self.fail()
  169. def test_abort_close_removed(self):
  170. foo = self.path('foo')
  171. f = GitFile(foo, 'wb')
  172. f._file.close()
  173. os.remove(foo+".lock")
  174. f.abort()
  175. self.assertTrue(f._closed)