# test_file.py -- Test for git files # Copyright (C) 2010 Google, Inc. # # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU # General Public License as public by the Free Software Foundation; version 2.0 # or (at your option) any later version. You can redistribute it and/or # modify it under the terms of either of these two licenses. # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # You should have received a copy of the licenses; if not, see # for a copy of the GNU General Public License # and for a copy of the Apache # License, Version 2.0. # import io import os import shutil import sys import tempfile from dulwich.file import FileLocked, GitFile, _fancy_rename from . import SkipTest, TestCase class FancyRenameTests(TestCase): def setUp(self) -> None: super().setUp() self._tempdir = tempfile.mkdtemp() self.foo = self.path("foo") self.bar = self.path("bar") self.create(self.foo, b"foo contents") def tearDown(self) -> None: shutil.rmtree(self._tempdir) super().tearDown() def path(self, filename): return os.path.join(self._tempdir, filename) def create(self, path, contents) -> None: f = open(path, "wb") f.write(contents) f.close() def test_no_dest_exists(self) -> None: self.assertFalse(os.path.exists(self.bar)) _fancy_rename(self.foo, self.bar) self.assertFalse(os.path.exists(self.foo)) new_f = open(self.bar, "rb") self.assertEqual(b"foo contents", new_f.read()) new_f.close() def test_dest_exists(self) -> None: self.create(self.bar, b"bar contents") _fancy_rename(self.foo, self.bar) self.assertFalse(os.path.exists(self.foo)) new_f = open(self.bar, "rb") self.assertEqual(b"foo contents", new_f.read()) new_f.close() def test_dest_opened(self) -> None: if sys.platform != "win32": raise SkipTest("platform allows overwriting open files") self.create(self.bar, b"bar contents") dest_f = open(self.bar, "rb") self.assertRaises(OSError, _fancy_rename, self.foo, self.bar) dest_f.close() self.assertTrue(os.path.exists(self.path("foo"))) new_f = open(self.foo, "rb") self.assertEqual(b"foo contents", new_f.read()) new_f.close() new_f = open(self.bar, "rb") self.assertEqual(b"bar contents", new_f.read()) new_f.close() class GitFileTests(TestCase): def setUp(self) -> None: super().setUp() self._tempdir = tempfile.mkdtemp() f = open(self.path("foo"), "wb") f.write(b"foo contents") f.close() def tearDown(self) -> None: shutil.rmtree(self._tempdir) super().tearDown() def path(self, filename): return os.path.join(self._tempdir, filename) def test_invalid(self) -> None: foo = self.path("foo") self.assertRaises(IOError, GitFile, foo, mode="r") self.assertRaises(IOError, GitFile, foo, mode="ab") self.assertRaises(IOError, GitFile, foo, mode="r+b") self.assertRaises(IOError, GitFile, foo, mode="w+b") self.assertRaises(IOError, GitFile, foo, mode="a+bU") def test_readonly(self) -> None: f = GitFile(self.path("foo"), "rb") self.assertIsInstance(f, io.IOBase) self.assertEqual(b"foo contents", f.read()) self.assertEqual(b"", f.read()) f.seek(4) self.assertEqual(b"contents", f.read()) f.close() def test_default_mode(self) -> None: f = GitFile(self.path("foo")) self.assertEqual(b"foo contents", f.read()) f.close() def test_write(self) -> None: foo = self.path("foo") foo_lock = f"{foo}.lock" orig_f = open(foo, "rb") self.assertEqual(orig_f.read(), b"foo contents") orig_f.close() self.assertFalse(os.path.exists(foo_lock)) f = GitFile(foo, "wb") self.assertFalse(f.closed) self.assertRaises(AttributeError, getattr, f, "not_a_file_property") self.assertTrue(os.path.exists(foo_lock)) f.write(b"new stuff") f.seek(4) f.write(b"contents") f.close() self.assertFalse(os.path.exists(foo_lock)) new_f = open(foo, "rb") self.assertEqual(b"new contents", new_f.read()) new_f.close() def test_open_twice(self) -> None: foo = self.path("foo") f1 = GitFile(foo, "wb") f1.write(b"new") try: f2 = GitFile(foo, "wb") self.fail() except FileLocked: pass else: f2.close() f1.write(b" contents") f1.close() # Ensure trying to open twice doesn't affect original. f = open(foo, "rb") self.assertEqual(b"new contents", f.read()) f.close() def test_abort(self) -> None: foo = self.path("foo") foo_lock = f"{foo}.lock" orig_f = open(foo, "rb") self.assertEqual(orig_f.read(), b"foo contents") orig_f.close() f = GitFile(foo, "wb") f.write(b"new contents") f.abort() self.assertTrue(f.closed) self.assertFalse(os.path.exists(foo_lock)) new_orig_f = open(foo, "rb") self.assertEqual(new_orig_f.read(), b"foo contents") new_orig_f.close() def test_abort_close(self) -> None: foo = self.path("foo") f = GitFile(foo, "wb") f.abort() try: f.close() except OSError: self.fail() f = GitFile(foo, "wb") f.close() try: f.abort() except OSError: self.fail() def test_abort_close_removed(self) -> None: foo = self.path("foo") f = GitFile(foo, "wb") f._file.close() os.remove(foo + ".lock") f.abort() self.assertTrue(f._closed)