Bladeren bron

Support popping stashes

Jelmer Vernooij 1 maand geleden
bovenliggende
commit
6e27ba53ba
5 gewijzigde bestanden met toevoegingen van 414 en 5 verwijderingen
  1. 2 0
      NEWS
  2. 3 2
      dulwich/repo.py
  3. 176 3
      dulwich/stash.py
  4. 144 0
      tests/test_porcelain.py
  5. 89 0
      tests/test_stash.py

+ 2 - 0
NEWS

@@ -14,6 +14,8 @@
    CLI command, and ``submodule_update`` CLI command. Add ``--recurse-submodules``
    option to ``clone`` command. (#506, Jelmer Vernooij)
 
+ * Support popping stashes. (Jelmer Vernooij)
+
 0.23.1	2025-06-30
 
  * Support ``untracked_files="normal"`` argument to ``porcelain.status``,

+ 3 - 2
dulwich/repo.py

@@ -980,7 +980,7 @@ class BaseRepo:
         author_timezone=None,
         tree: Optional[ObjectID] = None,
         encoding: Optional[bytes] = None,
-        ref: Ref = b"HEAD",
+        ref: Optional[Ref] = b"HEAD",
         merge_heads: Optional[list[ObjectID]] = None,
         no_verify: bool = False,
         sign: bool = False,
@@ -1004,7 +1004,8 @@ class BaseRepo:
           tree: SHA1 of the tree root to use (if not specified the
             current index will be committed).
           encoding: Encoding
-          ref: Optional ref to commit to (defaults to current branch)
+          ref: Optional ref to commit to (defaults to current branch).
+            If None, creates a dangling commit without updating any ref.
           merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
           no_verify: Skip pre-commit and commit-msg hooks
           sign: GPG Sign the commit (bool, defaults to False,

+ 176 - 3
dulwich/stash.py

@@ -25,8 +25,21 @@ import os
 from typing import TYPE_CHECKING, Optional, TypedDict
 
 from .file import GitFile
-from .index import commit_tree, iter_fresh_objects
-from .objects import ObjectID
+from .index import (
+    IndexEntry,
+    _tree_to_fs_path,
+    build_file_from_blob,
+    commit_tree,
+    index_entry_from_stat,
+    iter_fresh_objects,
+    iter_tree_contents,
+    symlink,
+    update_working_tree,
+    validate_path,
+    validate_path_element_default,
+    validate_path_element_ntfs,
+)
+from .objects import S_IFGITLINK, Blob, Commit, ObjectID
 from .reflog import drop_reflog_entry, read_reflog
 from .refs import Ref
 
@@ -83,7 +96,147 @@ class Stash:
             self._repo.refs[self._ref] = self[0].new_sha
 
     def pop(self, index: int) -> "Entry":
-        raise NotImplementedError(self.pop)
+        """Pop a stash entry and apply its changes.
+
+        Args:
+          index: Index of the stash entry to pop (0 is the most recent)
+
+        Returns:
+          The stash entry that was popped
+        """
+        # Get the stash entry before removing it
+        entry = self[index]
+
+        # Get the stash commit
+        stash_commit = self._repo.get_object(entry.new_sha)
+        assert isinstance(stash_commit, Commit)
+
+        # The stash commit has the working tree changes
+        # Its first parent is the commit the stash was based on
+        # Its second parent is the index commit
+        if len(stash_commit.parents) < 1:
+            raise ValueError("Invalid stash entry: no parent commits")
+
+        base_commit_sha = stash_commit.parents[0]
+
+        # Get current HEAD to determine if we can apply cleanly
+        try:
+            current_head = self._repo.refs[b"HEAD"]
+        except KeyError:
+            raise ValueError("Cannot pop stash: no HEAD")
+
+        # Check if we're at the same commit where the stash was created
+        # If not, we need to do a three-way merge
+        if current_head != base_commit_sha:
+            # For now, we'll apply changes directly but this could cause conflicts
+            # A full implementation would do a three-way merge
+            pass
+
+        # Apply the stash changes to the working tree and index
+        # Get config for working directory update
+        config = self._repo.get_config()
+        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
+
+        if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
+            validate_path_element = validate_path_element_ntfs
+        else:
+            validate_path_element = validate_path_element_default
+
+        if config.get_boolean(b"core", b"symlinks", True):
+            symlink_fn = symlink
+        else:
+
+            def symlink_fn(source, target) -> None:  # type: ignore
+                mode = "w" + ("b" if isinstance(source, bytes) else "")
+                with open(target, mode) as f:
+                    f.write(source)
+
+        # Get blob normalizer for line ending conversion
+        blob_normalizer = self._repo.get_blob_normalizer()
+
+        # Open the index
+        repo_index = self._repo.open_index()
+
+        # Apply working tree changes
+        stash_tree_id = stash_commit.tree
+        repo_path = os.fsencode(self._repo.path)
+
+        # First, if we have index changes (second parent), restore the index state
+        if len(stash_commit.parents) >= 2:
+            index_commit_sha = stash_commit.parents[1]
+            index_commit = self._repo.get_object(index_commit_sha)
+            assert isinstance(index_commit, Commit)
+            index_tree_id = index_commit.tree
+
+            # Update index entries from the stashed index tree
+            for entry in iter_tree_contents(self._repo.object_store, index_tree_id):
+                if not validate_path(entry.path, validate_path_element):
+                    continue
+
+                # Add to index with stage 0 (normal)
+                # Get file stats for the entry
+                full_path = _tree_to_fs_path(repo_path, entry.path)
+                try:
+                    st = os.lstat(full_path)
+                except FileNotFoundError:
+                    # File doesn't exist yet, use dummy stats
+                    st = os.stat_result((entry.mode, 0, 0, 0, 0, 0, 0, 0, 0, 0))
+                repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
+
+        # Apply working tree changes from the stash
+        for entry in iter_tree_contents(self._repo.object_store, stash_tree_id):
+            if not validate_path(entry.path, validate_path_element):
+                continue
+
+            full_path = _tree_to_fs_path(repo_path, entry.path)
+
+            # Create parent directories if needed
+            parent_dir = os.path.dirname(full_path)
+            if parent_dir and not os.path.exists(parent_dir):
+                os.makedirs(parent_dir)
+
+            # Write the file
+            if entry.mode == S_IFGITLINK:
+                # Submodule - just create directory
+                if not os.path.isdir(full_path):
+                    os.mkdir(full_path)
+                st = os.lstat(full_path)
+            else:
+                obj = self._repo.object_store[entry.sha]
+                assert isinstance(obj, Blob)
+                # Apply blob normalization for checkout if normalizer is provided
+                if blob_normalizer is not None:
+                    obj = blob_normalizer.checkout_normalize(obj, entry.path)
+                st = build_file_from_blob(
+                    obj,
+                    entry.mode,
+                    full_path,
+                    honor_filemode=honor_filemode,
+                    symlink_fn=symlink_fn,
+                )
+
+            # Update index if the file wasn't already staged
+            if entry.path not in repo_index:
+                # Update with file stats from disk
+                repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
+            else:
+                existing_entry = repo_index[entry.path]
+
+                if (
+                    isinstance(existing_entry, IndexEntry)
+                    and existing_entry.mode == entry.mode
+                    and existing_entry.sha == entry.sha
+                ):
+                    # Update with file stats from disk
+                    repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
+
+        # Write the updated index
+        repo_index.write()
+
+        # Remove the stash entry
+        self.drop(index)
+
+        return entry
 
     def push(
         self,
@@ -107,11 +260,15 @@ class Stash:
 
         index = self._repo.open_index()
         index_tree_id = index.commit(self._repo.object_store)
+        # Create a dangling commit for the index state
+        # Note: We pass ref=None which is handled specially in do_commit
+        # to create a commit without updating any reference
         index_commit_id = self._repo.do_commit(
             tree=index_tree_id,
             message=b"Index stash",
             merge_heads=[self._repo.head()],
             no_verify=True,
+            ref=None,  # Don't update any ref
             **commit_kwargs,
         )
 
@@ -146,6 +303,22 @@ class Stash:
             **commit_kwargs,
         )
 
+        # Reset working tree and index to HEAD to match git's behavior
+        # Use update_working_tree to reset from stash tree to HEAD tree
+        # Get HEAD tree
+        head_commit = self._repo.get_object(self._repo.head())
+        assert isinstance(head_commit, Commit)
+        head_tree_id = head_commit.tree
+
+        # Update from stash tree to HEAD tree
+        # This will remove files that were in stash but not in HEAD,
+        # and restore files to their HEAD versions
+        update_working_tree(
+            self._repo,
+            old_tree_id=stash_tree_id,
+            new_tree_id=head_tree_id,
+        )
+
         return cid
 
     def __getitem__(self, index: int) -> "Entry":

+ 144 - 0
tests/test_porcelain.py

@@ -7063,3 +7063,147 @@ class FilterBranchTests(PorcelainTestCase):
         new_head = self.repo.refs[b"refs/heads/master"]
         new_commit = self.repo[new_head]
         self.assertTrue(new_commit.message.startswith(b"Second: First: "))
+
+
+class StashTests(PorcelainTestCase):
+    def setUp(self) -> None:
+        super().setUp()
+        # Create initial commit
+        with open(os.path.join(self.repo.path, "initial.txt"), "wb") as f:
+            f.write(b"initial content")
+        porcelain.add(repo=self.repo.path, paths=["initial.txt"])
+        porcelain.commit(
+            repo=self.repo.path,
+            message=b"Initial commit",
+            author=b"Test Author <test@example.com>",
+            committer=b"Test Committer <test@example.com>",
+        )
+
+    def test_stash_push_and_pop(self) -> None:
+        # Create a new file and stage it
+        new_file = os.path.join(self.repo.path, "new.txt")
+        with open(new_file, "wb") as f:
+            f.write(b"new file content")
+        porcelain.add(repo=self.repo.path, paths=["new.txt"])
+
+        # Modify existing file
+        with open(os.path.join(self.repo.path, "initial.txt"), "wb") as f:
+            f.write(b"modified content")
+
+        # Push to stash
+        porcelain.stash_push(self.repo.path)
+
+        # Verify files are reset
+        self.assertFalse(os.path.exists(new_file))
+        with open(os.path.join(self.repo.path, "initial.txt"), "rb") as f:
+            self.assertEqual(b"initial content", f.read())
+
+        # Pop the stash
+        porcelain.stash_pop(self.repo.path)
+
+        # Verify files are restored
+        self.assertTrue(os.path.exists(new_file))
+        with open(new_file, "rb") as f:
+            self.assertEqual(b"new file content", f.read())
+        with open(os.path.join(self.repo.path, "initial.txt"), "rb") as f:
+            self.assertEqual(b"modified content", f.read())
+
+        # Verify new file is in the index
+        from dulwich.index import Index
+
+        index = Index(os.path.join(self.repo.path, ".git", "index"))
+        self.assertIn(b"new.txt", index)
+
+    def test_stash_list(self) -> None:
+        # Initially no stashes
+        stashes = list(porcelain.stash_list(self.repo.path))
+        self.assertEqual(0, len(stashes))
+
+        # Create a file and stash it
+        test_file = os.path.join(self.repo.path, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"test content")
+        porcelain.add(repo=self.repo.path, paths=["test.txt"])
+
+        # Push first stash
+        porcelain.stash_push(self.repo.path)
+
+        # Create another file and stash it
+        test_file2 = os.path.join(self.repo.path, "test2.txt")
+        with open(test_file2, "wb") as f:
+            f.write(b"test content 2")
+        porcelain.add(repo=self.repo.path, paths=["test2.txt"])
+
+        # Push second stash
+        porcelain.stash_push(self.repo.path)
+
+        # Check stash list
+        stashes = list(porcelain.stash_list(self.repo.path))
+        self.assertEqual(2, len(stashes))
+
+        # Stashes are returned in order (most recent first)
+        self.assertEqual(0, stashes[0][0])
+        self.assertEqual(1, stashes[1][0])
+
+    def test_stash_drop(self) -> None:
+        # Create and stash some changes
+        test_file = os.path.join(self.repo.path, "test.txt")
+        with open(test_file, "wb") as f:
+            f.write(b"test content")
+        porcelain.add(repo=self.repo.path, paths=["test.txt"])
+        porcelain.stash_push(self.repo.path)
+
+        # Create another stash
+        test_file2 = os.path.join(self.repo.path, "test2.txt")
+        with open(test_file2, "wb") as f:
+            f.write(b"test content 2")
+        porcelain.add(repo=self.repo.path, paths=["test2.txt"])
+        porcelain.stash_push(self.repo.path)
+
+        # Verify we have 2 stashes
+        stashes = list(porcelain.stash_list(self.repo.path))
+        self.assertEqual(2, len(stashes))
+
+        # Drop the first stash (index 0)
+        porcelain.stash_drop(self.repo.path, 0)
+
+        # Verify we have 1 stash left
+        stashes = list(porcelain.stash_list(self.repo.path))
+        self.assertEqual(1, len(stashes))
+
+        # The remaining stash should be the one we created first
+        # Pop it and verify it's the first file
+        porcelain.stash_pop(self.repo.path)
+        self.assertTrue(os.path.exists(test_file))
+        self.assertFalse(os.path.exists(test_file2))
+
+    def test_stash_pop_empty(self) -> None:
+        # Attempting to pop from empty stash should raise an error
+        with self.assertRaises(IndexError):
+            porcelain.stash_pop(self.repo.path)
+
+    def test_stash_with_untracked_files(self) -> None:
+        # Create an untracked file
+        untracked_file = os.path.join(self.repo.path, "untracked.txt")
+        with open(untracked_file, "wb") as f:
+            f.write(b"untracked content")
+
+        # Create a tracked change
+        tracked_file = os.path.join(self.repo.path, "tracked.txt")
+        with open(tracked_file, "wb") as f:
+            f.write(b"tracked content")
+        porcelain.add(repo=self.repo.path, paths=["tracked.txt"])
+
+        # Stash (by default, untracked files are not included)
+        porcelain.stash_push(self.repo.path)
+
+        # Untracked file should still exist
+        self.assertTrue(os.path.exists(untracked_file))
+        # Tracked file should be gone
+        self.assertFalse(os.path.exists(tracked_file))
+
+        # Pop the stash
+        porcelain.stash_pop(self.repo.path)
+
+        # Tracked file should be restored
+        self.assertTrue(os.path.exists(tracked_file))

+ 89 - 0
tests/test_stash.py

@@ -127,3 +127,92 @@ class StashTests(TestCase):
         custom_ref = b"refs/custom_stash"
         stash = Stash(self.repo, ref=custom_ref)
         self.assertEqual(custom_ref, stash._ref)
+
+    def test_pop_stash(self) -> None:
+        stash = Stash.from_repo(self.repo)
+
+        # Make sure logs directory exists for reflog
+        os.makedirs(os.path.join(self.repo.commondir(), "logs"), exist_ok=True)
+
+        # Create a file and add it to the index
+        file_path = os.path.join(self.repo_dir, "testfile.txt")
+        with open(file_path, "wb") as f:
+            f.write(b"test data")
+        self.repo.stage(["testfile.txt"])
+
+        # Push to stash
+        stash.push(message=b"Test stash message")
+        self.assertEqual(1, len(stash))
+
+        # After stash push, the file should be removed from working tree
+        # (matching git's behavior)
+        self.assertFalse(os.path.exists(file_path))
+
+        # Pop the stash
+        stash.pop(0)
+
+        # Verify file is restored
+        self.assertTrue(os.path.exists(file_path))
+        with open(file_path, "rb") as f:
+            self.assertEqual(b"test data", f.read())
+
+        # Verify stash is empty
+        self.assertEqual(0, len(stash))
+
+        # Verify the file is in the index
+        index = self.repo.open_index()
+        self.assertIn(b"testfile.txt", index)
+
+    def test_pop_stash_with_index_changes(self) -> None:
+        stash = Stash.from_repo(self.repo)
+
+        # Make sure logs directory exists for reflog
+        os.makedirs(os.path.join(self.repo.commondir(), "logs"), exist_ok=True)
+
+        # First commit a file so we have tracked files
+        tracked_path = os.path.join(self.repo_dir, "tracked.txt")
+        with open(tracked_path, "wb") as f:
+            f.write(b"original content")
+        self.repo.stage(["tracked.txt"])
+        self.repo.do_commit(b"Add tracked file")
+
+        # Modify the tracked file and stage it
+        with open(tracked_path, "wb") as f:
+            f.write(b"staged changes")
+        self.repo.stage(["tracked.txt"])
+
+        # Modify it again but don't stage
+        with open(tracked_path, "wb") as f:
+            f.write(b"working tree changes")
+
+        # Create a new file and stage it
+        new_file_path = os.path.join(self.repo_dir, "new.txt")
+        with open(new_file_path, "wb") as f:
+            f.write(b"new file content")
+        self.repo.stage(["new.txt"])
+
+        # Push to stash
+        stash.push(message=b"Test stash with index")
+        self.assertEqual(1, len(stash))
+
+        # After stash push, new file should be removed and tracked file reset
+        self.assertFalse(os.path.exists(new_file_path))
+        with open(tracked_path, "rb") as f:
+            self.assertEqual(b"original content", f.read())
+
+        # Pop the stash
+        stash.pop(0)
+
+        # Verify tracked file has working tree changes
+        self.assertTrue(os.path.exists(tracked_path))
+        with open(tracked_path, "rb") as f:
+            self.assertEqual(b"working tree changes", f.read())
+
+        # Verify new file is restored
+        self.assertTrue(os.path.exists(new_file_path))
+        with open(new_file_path, "rb") as f:
+            self.assertEqual(b"new file content", f.read())
+
+        # Verify index has the staged changes
+        index = self.repo.open_index()
+        self.assertIn(b"new.txt", index)