Browse Source

Update directory detection for `get_unstaged_changes` for Python 3 - fix #684

The exception raised when trying to read a directory changed from an `IOError`
in Python 2 to an `IsADirectoryError` in Python 3 (`IsADirectoryError` is a
subclass of `OSError`).

`IsADirectoryError` doesn't exists in Python 2 so we have to makes extra hops
to have a check compatible with both Python 2 and Python 3.
Boris Feld 6 years ago
parent
commit
c9d03864c1
2 changed files with 85 additions and 15 deletions
  1. 39 15
      dulwich/index.py
  2. 46 0
      dulwich/tests/test_index.py

+ 39 - 15
dulwich/index.py

@@ -604,6 +604,32 @@ def read_submodule_head(path):
         return None
 
 
+def has_directory_changed(error, tree_path, entry):
+    """ When handling an error trying to create a blob from a path, call this
+    function. It will check if the path is a directory. If it's a directory
+    and a submodule, check the submodule head to see if it's has changed. If
+    not, consider the file as changed as Git tracked a file and not a
+    directory.
+
+    Return true if the given path should be considered as changed and False
+    otherwise or if the path is not a directory.
+    """
+    if error.errno != errno.EISDIR:
+        return False
+
+    # This is actually a directory
+    if os.path.exists(os.path.join(tree_path, b'.git')):
+        # Submodule
+        head = read_submodule_head(tree_path)
+        if entry.sha != head:
+            return True
+    else:
+        # The file was changed to a directory, so consider it removed.
+        return True
+
+    return False
+
+
 def get_unstaged_changes(index, root_path, filter_blob_callback=None):
     """Walk through an index and check for differences against working tree.
 
@@ -625,23 +651,21 @@ def get_unstaged_changes(index, root_path, filter_blob_callback=None):
             if filter_blob_callback is not None:
                 blob = filter_blob_callback(blob, tree_path)
         except OSError as e:
-            if e.errno != errno.ENOENT:
-                raise
-            # The file was removed, so we assume that counts as
-            # different from whatever file used to exist.
-            yield tree_path
-        except IOError as e:
-            if e.errno != errno.EISDIR:
-                raise
-            # This is actually a directory
-            if os.path.exists(os.path.join(tree_path, '.git')):
-                # Submodule
-                head = read_submodule_head(tree_path)
-                if entry.sha != head:
-                    yield tree_path
+            directory_changed = has_directory_changed(e, tree_path, entry)
+            if directory_changed:
+                yield tree_path
             else:
-                # The file was changed to a directory, so consider it removed.
+                if e.errno != errno.ENOENT:
+                    raise
+                # The file was removed, so we assume that counts as
+                # different from whatever file used to exist.
                 yield tree_path
+        except IOError as e:
+            directory_changed = has_directory_changed(e, tree_path, entry)
+            if directory_changed:
+                yield tree_path
+            else:
+                raise
         else:
             if blob.id != entry.sha:
                 yield tree_path

+ 46 - 0
dulwich/tests/test_index.py

@@ -671,6 +671,52 @@ class GetUnstagedChangesTests(TestCase):
 
             self.assertEqual(list(changes), [b'foo1'])
 
+    def test_get_unstaged_changes_removed_replaced_by_directory(self):
+        """Unit test for get_unstaged_changes."""
+
+        repo_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, repo_dir)
+        with Repo.init(repo_dir) as repo:
+
+            # Commit a dummy file then modify it
+            foo1_fullpath = os.path.join(repo_dir, 'foo1')
+            with open(foo1_fullpath, 'wb') as f:
+                f.write(b'origstuff')
+
+            repo.stage(['foo1'])
+            repo.do_commit(b'test status', author=b'author <email>',
+                           committer=b'committer <email>')
+
+            os.remove(foo1_fullpath)
+            os.mkdir(foo1_fullpath)
+
+            changes = get_unstaged_changes(repo.open_index(), repo_dir)
+
+            self.assertEqual(list(changes), [b'foo1'])
+
+    def test_get_unstaged_changes_removed_replaced_by_link(self):
+        """Unit test for get_unstaged_changes."""
+
+        repo_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, repo_dir)
+        with Repo.init(repo_dir) as repo:
+
+            # Commit a dummy file then modify it
+            foo1_fullpath = os.path.join(repo_dir, 'foo1')
+            with open(foo1_fullpath, 'wb') as f:
+                f.write(b'origstuff')
+
+            repo.stage(['foo1'])
+            repo.do_commit(b'test status', author=b'author <email>',
+                           committer=b'committer <email>')
+
+            os.remove(foo1_fullpath)
+            os.symlink(os.path.dirname(foo1_fullpath), foo1_fullpath)
+
+            changes = get_unstaged_changes(repo.open_index(), repo_dir)
+
+            self.assertEqual(list(changes), [b'foo1'])
+
 
 class TestValidatePathElement(TestCase):