test_file.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # test_file.py -- Test for git files
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # of the License or (at your option) a later version of the License.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. import errno
  19. import io
  20. import os
  21. import shutil
  22. import sys
  23. import tempfile
  24. from dulwich.file import GitFile, fancy_rename
  25. from dulwich.tests import (
  26. SkipTest,
  27. TestCase,
  28. )
  29. class FancyRenameTests(TestCase):
  30. def setUp(self):
  31. super(FancyRenameTests, self).setUp()
  32. self._tempdir = tempfile.mkdtemp()
  33. self.foo = self.path('foo')
  34. self.bar = self.path('bar')
  35. self.create(self.foo, b'foo contents')
  36. def tearDown(self):
  37. shutil.rmtree(self._tempdir)
  38. super(FancyRenameTests, self).tearDown()
  39. def path(self, filename):
  40. return os.path.join(self._tempdir, filename)
  41. def create(self, path, contents):
  42. f = open(path, 'wb')
  43. f.write(contents)
  44. f.close()
  45. def test_no_dest_exists(self):
  46. self.assertFalse(os.path.exists(self.bar))
  47. fancy_rename(self.foo, self.bar)
  48. self.assertFalse(os.path.exists(self.foo))
  49. new_f = open(self.bar, 'rb')
  50. self.assertEqual(b'foo contents', new_f.read())
  51. new_f.close()
  52. def test_dest_exists(self):
  53. self.create(self.bar, b'bar contents')
  54. fancy_rename(self.foo, self.bar)
  55. self.assertFalse(os.path.exists(self.foo))
  56. new_f = open(self.bar, 'rb')
  57. self.assertEqual(b'foo contents', new_f.read())
  58. new_f.close()
  59. def test_dest_opened(self):
  60. if sys.platform != "win32":
  61. raise SkipTest("platform allows overwriting open files")
  62. self.create(self.bar, b'bar contents')
  63. dest_f = open(self.bar, 'rb')
  64. self.assertRaises(OSError, fancy_rename, self.foo, self.bar)
  65. dest_f.close()
  66. self.assertTrue(os.path.exists(self.path('foo')))
  67. new_f = open(self.foo, 'rb')
  68. self.assertEqual(b'foo contents', new_f.read())
  69. new_f.close()
  70. new_f = open(self.bar, 'rb')
  71. self.assertEqual(b'bar contents', new_f.read())
  72. new_f.close()
  73. class GitFileTests(TestCase):
  74. def setUp(self):
  75. super(GitFileTests, self).setUp()
  76. self._tempdir = tempfile.mkdtemp()
  77. f = open(self.path('foo'), 'wb')
  78. f.write(b'foo contents')
  79. f.close()
  80. def tearDown(self):
  81. shutil.rmtree(self._tempdir)
  82. super(GitFileTests, self).tearDown()
  83. def path(self, filename):
  84. return os.path.join(self._tempdir, filename)
  85. def test_invalid(self):
  86. foo = self.path('foo')
  87. self.assertRaises(IOError, GitFile, foo, mode='r')
  88. self.assertRaises(IOError, GitFile, foo, mode='ab')
  89. self.assertRaises(IOError, GitFile, foo, mode='r+b')
  90. self.assertRaises(IOError, GitFile, foo, mode='w+b')
  91. self.assertRaises(IOError, GitFile, foo, mode='a+bU')
  92. def test_readonly(self):
  93. f = GitFile(self.path('foo'), 'rb')
  94. self.assertTrue(isinstance(f, io.IOBase))
  95. self.assertEqual(b'foo contents', f.read())
  96. self.assertEqual(b'', f.read())
  97. f.seek(4)
  98. self.assertEqual(b'contents', f.read())
  99. f.close()
  100. def test_default_mode(self):
  101. f = GitFile(self.path('foo'))
  102. self.assertEqual(b'foo contents', f.read())
  103. f.close()
  104. def test_write(self):
  105. foo = self.path('foo')
  106. foo_lock = '%s.lock' % foo
  107. orig_f = open(foo, 'rb')
  108. self.assertEqual(orig_f.read(), b'foo contents')
  109. orig_f.close()
  110. self.assertFalse(os.path.exists(foo_lock))
  111. f = GitFile(foo, 'wb')
  112. self.assertFalse(f.closed)
  113. self.assertRaises(AttributeError, getattr, f, 'not_a_file_property')
  114. self.assertTrue(os.path.exists(foo_lock))
  115. f.write(b'new stuff')
  116. f.seek(4)
  117. f.write(b'contents')
  118. f.close()
  119. self.assertFalse(os.path.exists(foo_lock))
  120. new_f = open(foo, 'rb')
  121. self.assertEqual(b'new contents', new_f.read())
  122. new_f.close()
  123. def test_open_twice(self):
  124. foo = self.path('foo')
  125. f1 = GitFile(foo, 'wb')
  126. f1.write(b'new')
  127. try:
  128. f2 = GitFile(foo, 'wb')
  129. self.fail()
  130. except OSError as e:
  131. self.assertEqual(errno.EEXIST, e.errno)
  132. else:
  133. f2.close()
  134. f1.write(b' contents')
  135. f1.close()
  136. # Ensure trying to open twice doesn't affect original.
  137. f = open(foo, 'rb')
  138. self.assertEqual(b'new contents', f.read())
  139. f.close()
  140. def test_abort(self):
  141. foo = self.path('foo')
  142. foo_lock = '%s.lock' % foo
  143. orig_f = open(foo, 'rb')
  144. self.assertEqual(orig_f.read(), b'foo contents')
  145. orig_f.close()
  146. f = GitFile(foo, 'wb')
  147. f.write(b'new contents')
  148. f.abort()
  149. self.assertTrue(f.closed)
  150. self.assertFalse(os.path.exists(foo_lock))
  151. new_orig_f = open(foo, 'rb')
  152. self.assertEqual(new_orig_f.read(), b'foo contents')
  153. new_orig_f.close()
  154. def test_abort_close(self):
  155. foo = self.path('foo')
  156. f = GitFile(foo, 'wb')
  157. f.abort()
  158. try:
  159. f.close()
  160. except (IOError, OSError):
  161. self.fail()
  162. f = GitFile(foo, 'wb')
  163. f.close()
  164. try:
  165. f.abort()
  166. except (IOError, OSError):
  167. self.fail()
  168. def test_abort_close_removed(self):
  169. foo = self.path('foo')
  170. f = GitFile(foo, 'wb')
  171. f._file.close()
  172. os.remove(foo+".lock")
  173. f.abort()
  174. self.assertTrue(f._closed)