Browse Source

Add Repo.unstage.

Ded_Secer 3 years ago
parent
commit
f76bc1eeed
2 changed files with 135 additions and 0 deletions
  1. 53 0
      dulwich/repo.py
  2. 82 0
      dulwich/tests/test_repository.py

+ 53 - 0
dulwich/repo.py

@@ -1305,6 +1305,59 @@ class Repo(BaseRepo):
                     index[tree_path] = index_entry_from_stat(st, blob.id, 0)
         index.write()
 
+    def unstage(self, relpaths: List[bytes] = []):
+        """unstage specific file in the index
+        Args:
+          repo: dulwich Repo object
+          paths: a list of file to unstage, relative to the repository path
+        """
+
+        from dulwich.index import IndexEntry
+
+        index = self.open_index()
+        try:
+            tree_id = self[b'HEAD'].tree
+        # no head mean no commit in the repo
+        except KeyError:
+            for path in relpaths:
+                del index[path]
+            index.write()
+            return
+
+        for path in relpaths:
+            try:
+                tree_entry = self[tree_id].lookup_path(lambda x: self[x], path)
+            except KeyError:
+                # if tree_entry didnt exist, this file was being added, so remove index entry
+                try:
+                    del index[path]
+                    continue
+                except KeyError:
+                    raise KeyError("file '%s' not in index" % (path.decode()))
+
+            st = None
+            try:
+                st = os.lstat(os.path.join(self.path, path.decode()))
+            except FileNotFoundError:
+                pass
+
+            index_entry = IndexEntry(
+                ctime=(self[b'HEAD'].commit_time, 0),
+                mtime=(self[b'HEAD'].commit_time, 0),
+                dev=st.st_dev if st else 0,
+                ino=st.st_ino if st else 0,
+                mode=tree_entry[0],
+                uid=st.st_uid if st else 0,
+                gid=st.st_gid if st else 0,
+                size=len(self[tree_entry[1]].data),
+                sha=tree_entry[1],
+                flags=0,
+                extended_flags=0
+            )
+
+            index[path] = index_entry
+        index.write()
+
     def clone(
         self,
         target_path,

+ 82 - 0
dulwich/tests/test_repository.py

@@ -31,6 +31,7 @@ import tempfile
 import warnings
 
 from dulwich import errors
+from dulwich import porcelain
 from dulwich.object_store import (
     tree_lookup_path,
 )
@@ -1226,6 +1227,87 @@ class BuildRepoRootTests(TestCase):
         r.stage(["c"])
         self.assertEqual([b"a"], list(r.open_index()))
 
+    def test_unstage_midify_file_with_dir(self):
+        os.mkdir(os.path.join(self._repo.path, 'new_dir'))
+        full_path = os.path.join(self._repo.path, 'new_dir', 'foo')
+
+        with open(full_path, 'w') as f:
+            f.write('hello')
+        porcelain.add(self._repo, paths=[full_path])
+        porcelain.commit(
+            self._repo,
+            message=b"unitest",
+            committer=b"Jane <jane@example.com>",
+            author=b"John <john@example.com>",
+        )
+        with open(full_path, 'a') as f:
+            f.write('something new')
+        self._repo.unstage(['new_dir/foo'.encode()])
+        status = list(porcelain.status(self._repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'new_dir/foo'], []], status)
+
+    def test_unstage_while_no_commit(self):
+        file = 'foo'
+        full_path = os.path.join(self._repo.path, file)
+        with open(full_path, 'w') as f:
+            f.write('hello')
+        porcelain.add(self._repo, paths=[full_path])
+        self._repo.unstage([file.encode()])
+        status = list(porcelain.status(self._repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['foo']], status)
+
+    def test_unstage_add_file(self):
+        file = 'foo'
+        full_path = os.path.join(self._repo.path, file)
+        porcelain.commit(
+            self._repo,
+            message=b"unitest",
+            committer=b"Jane <jane@example.com>",
+            author=b"John <john@example.com>",
+        )
+        with open(full_path, 'w') as f:
+            f.write('hello')
+        porcelain.add(self._repo, paths=[full_path])
+        self._repo.unstage([file.encode()])
+        status = list(porcelain.status(self._repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [], ['foo']], status)
+
+    def test_unstage_modify_file(self):
+        file = 'foo'
+        full_path = os.path.join(self._repo.path, file)
+        with open(full_path, 'w') as f:
+            f.write('hello')
+        porcelain.add(self._repo, paths=[full_path])
+        porcelain.commit(
+            self._repo,
+            message=b"unitest",
+            committer=b"Jane <jane@example.com>",
+            author=b"John <john@example.com>",
+        )
+        with open(full_path, 'a') as f:
+            f.write('broken')
+        porcelain.add(self._repo, paths=[full_path])
+        self._repo.unstage([file.encode()])
+        status = list(porcelain.status(self._repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
+
+    def test_unstage_remove_file(self):
+        file = 'foo'
+        full_path = os.path.join(self._repo.path, file)
+        with open(full_path, 'w') as f:
+            f.write('hello')
+        porcelain.add(self._repo, paths=[full_path])
+        porcelain.commit(
+            self._repo,
+            message=b"unitest",
+            committer=b"Jane <jane@example.com>",
+            author=b"John <john@example.com>",
+        )
+        os.remove(full_path)
+        self._repo.unstage([file.encode()])
+        status = list(porcelain.status(self._repo))
+        self.assertEqual([{'add': [], 'delete': [], 'modify': []}, [b'foo'], []], status)
+
     @skipIf(
         sys.platform in ("win32", "darwin"),
         "tries to implicitly decode as utf8",