# stash.py
# Copyright (C) 2018 Jelmer Vernooij <jelmer@samba.org>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Stash handling."""

import os
import sys
from typing import TYPE_CHECKING, TypedDict

from .diff_tree import tree_changes
from .file import GitFile
from .index import (
    IndexEntry,
    _tree_to_fs_path,
    build_file_from_blob,
    commit_tree,
    index_entry_from_stat,
    iter_fresh_objects,
    symlink,
    update_working_tree,
    validate_path,
    validate_path_element_default,
    validate_path_element_hfs,
    validate_path_element_ntfs,
)
from .object_store import iter_tree_contents
from .objects import S_IFGITLINK, Blob, Commit, ObjectID, TreeEntry
from .reflog import drop_reflog_entry, read_reflog
from .refs import Ref

if TYPE_CHECKING:
    from .reflog import Entry
    from .repo import Repo


class CommitKwargs(TypedDict, total=False):
    """Keyword arguments for do_commit."""

    committer: bytes
    author: bytes
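

# The default stash ref; b"refs/stash" is the same ref that C Git uses.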
DEFAULT_STASH_REF = Ref(b"refs/stash")


class Stash:
    """A Git stash.

    ``push`` records the index and working tree state and then resets both
    back to HEAD; ``pop`` applies a recorded entry back onto the index and
    working tree and drops it.
    """

    def __init__(self, repo: "Repo", ref: Ref = DEFAULT_STASH_REF) -> None:
        """Initialize Stash.

        Args:
          repo: Repository object
          ref: Stash reference name
        """
        self._ref = ref
        self._repo = repo

    @property
    def _reflog_path(self) -> str:
        return os.path.join(self._repo.commondir(), "logs", os.fsdecode(self._ref))

    def stashes(self) -> list["Entry"]:
        """Get list of stash entries.

        Returns:
          List of stash entries, most recent first
        """
        try:
            with GitFile(self._reflog_path, "rb") as f:
                return list(reversed(list(read_reflog(f))))
        except FileNotFoundError:
            return []

    @classmethod
    def from_repo(cls, repo: "Repo") -> "Stash":
        """Create a new stash from a Repo object."""
        return cls(repo)

    def drop(self, index: int) -> None:
        """Drop entry with specified index."""
        with open(self._reflog_path, "rb+") as f:
            drop_reflog_entry(f, index, rewrite=True)
        if len(self) == 0:
            os.remove(self._reflog_path)
            del self._repo.refs[self._ref]
            return
        if index == 0:
            self._repo.refs[self._ref] = self[0].new_sha

    def pop(self, index: int) -> "Entry":
        """Pop a stash entry and apply its changes.

        Args:
          index: Index of the stash entry to pop (0 is the most recent)

        Returns:
          The stash entry that was popped
        """
        # Get the stash entry before removing it
        entry = self[index]

        # Get the stash commit
        stash_commit = self._repo.get_object(entry.new_sha)
        assert isinstance(stash_commit, Commit)

        # The stash commit holds the working tree changes; its first parent is
        # the commit the stash was based on and its second parent is the index
        # commit.
        if len(stash_commit.parents) < 1:
            raise ValueError("Invalid stash entry: no parent commits")

        base_commit_sha = stash_commit.parents[0]

        # Get current HEAD to determine if we can apply cleanly
        try:
            from dulwich.refs import HEADREF

            current_head = self._repo.refs[HEADREF]
        except KeyError:
            raise ValueError("Cannot pop stash: no HEAD")

        # Check whether we are at the same commit where the stash was created.
        # If not, a three-way merge would be needed.
        if current_head != base_commit_sha:
            # For now, apply the changes directly even though this may cause
            # conflicts; a full implementation would do a three-way merge.
            pass

        # Apply the stash changes to the working tree and index.
        # Get config for the working directory update.
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")

        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default

        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:
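            # core.symlinks is false: fall back to writing the link target into
            # a regular file, which is how Git checks out symlinks on
            # filesystems that lack symlink support.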
            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: str | bytes,
                dst: str | bytes,
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                mode = "w" + ("b" if isinstance(src, bytes) else "")
                with open(dst, mode) as f:
                    f.write(src)

        # Get blob normalizer for line ending conversion
        blob_normalizer = self._repo.get_blob_normalizer()

        # Open the index
        repo_index = self._repo.open_index()

        # Apply working tree changes
        stash_tree_id = stash_commit.tree
        repo_path = os.fsencode(self._repo.path)

        # First, if we have index changes (second parent), restore the index state
        if len(stash_commit.parents) >= 2:
            index_commit_sha = stash_commit.parents[1]
            index_commit = self._repo.get_object(index_commit_sha)
            assert isinstance(index_commit, Commit)
            index_tree_id = index_commit.tree

            # Update index entries from the stashed index tree
            tree_entry: TreeEntry
            for tree_entry in iter_tree_contents(
                self._repo.object_store, index_tree_id
            ):
                assert (
                    tree_entry.path is not None
                    and tree_entry.mode is not None
                    and tree_entry.sha is not None
                )
                if not validate_path(tree_entry.path, validate_path_element):
                    continue

                # Add to index with stage 0 (normal)
                # Get file stats for the entry
                full_path = _tree_to_fs_path(repo_path, tree_entry.path)
                try:
                    st = os.lstat(full_path)
                except FileNotFoundError:
                    # File doesn't exist yet, use dummy stats
                    st = os.stat_result(
                        (tree_entry.mode, 0, 0, 0, 0, 0, 0, 0, 0, 0)
                    )
                repo_index[tree_entry.path] = index_entry_from_stat(
                    st, tree_entry.sha
                )

        # Apply working tree changes from the stash
        tree_entry2: TreeEntry
        for tree_entry2 in iter_tree_contents(self._repo.object_store, stash_tree_id):
            assert (
                tree_entry2.path is not None
                and tree_entry2.mode is not None
                and tree_entry2.sha is not None
            )
            if not validate_path(tree_entry2.path, validate_path_element):
                continue

            full_path = _tree_to_fs_path(repo_path, tree_entry2.path)

            # Create parent directories if needed
            parent_dir = os.path.dirname(full_path)
            if parent_dir and not os.path.exists(parent_dir):
                os.makedirs(parent_dir)

            # Write the file
            if tree_entry2.mode == S_IFGITLINK:
                # Submodule - just create the directory
                if not os.path.isdir(full_path):
                    os.mkdir(full_path)
                st = os.lstat(full_path)
            else:
                obj = self._repo.object_store[tree_entry2.sha]
                assert isinstance(obj, Blob)
                # Apply blob normalization for checkout if a normalizer is provided
                if blob_normalizer is not None:
                    obj = blob_normalizer.checkout_normalize(obj, tree_entry2.path)
                st = build_file_from_blob(
                    obj,
                    tree_entry2.mode,
                    full_path,
                    honor_filemode=honor_filemode,
                    symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
                )

            # Update the index if the file wasn't already staged
            if tree_entry2.path not in repo_index:
                # Update with file stats from disk
                repo_index[tree_entry2.path] = index_entry_from_stat(
                    st, tree_entry2.sha
                )
            else:
                existing_entry = repo_index[tree_entry2.path]
                if (
                    isinstance(existing_entry, IndexEntry)
                    and existing_entry.mode == tree_entry2.mode
                    and existing_entry.sha == tree_entry2.sha
                ):
                    # Update with file stats from disk
                    repo_index[tree_entry2.path] = index_entry_from_stat(
                        st, tree_entry2.sha
                    )

        # Write the updated index
        repo_index.write()

        # Remove the stash entry
        self.drop(index)

        return entry

    def push(
        self,
        committer: bytes | None = None,
        author: bytes | None = None,
        message: bytes | None = None,
    ) -> ObjectID:
        """Create a new stash.

        Args:
          committer: Optional committer name to use
          author: Optional author name to use
          message: Optional commit message

        Returns:
          The object ID of the new stash commit
        """
        # First, create the index commit.
        commit_kwargs = CommitKwargs()
        if committer is not None:
            commit_kwargs["committer"] = committer
        if author is not None:
            commit_kwargs["author"] = author

        index = self._repo.open_index()
        index_tree_id = index.commit(self._repo.object_store)
        # Create a dangling commit for the index state.
        # Note: we pass ref=None, which is handled specially in do_commit
        # to create a commit without updating any reference.
        index_commit_id = self._repo.get_worktree().commit(
            tree=index_tree_id,
            message=b"Index stash",
            merge_heads=[self._repo.head()],
            no_verify=True,
            ref=None,  # Don't update any ref
            **commit_kwargs,
        )

        # Then, the working tree one.
        # Filter out entries with None values since commit_tree expects
        # non-None values.
        fresh_objects = [
            (path, sha, mode)
            for path, sha, mode in iter_fresh_objects(
                index,
                os.fsencode(self._repo.path),
                object_store=self._repo.object_store,
            )
            if sha is not None and mode is not None
        ]
        stash_tree_id = commit_tree(
            self._repo.object_store,
            fresh_objects,
        )

        if message is None:
            message = b"A stash on " + self._repo.head()

        # TODO(jelmer): Just pass parents into do_commit()?
        self._repo.refs[self._ref] = self._repo.head()

        cid: ObjectID = self._repo.get_worktree().commit(
            ref=self._ref,
            tree=stash_tree_id,
            message=message,
            merge_heads=[index_commit_id],
            no_verify=True,
            **commit_kwargs,
        )

        # Reset working tree and index to HEAD to match git's behavior.
        # Use update_working_tree to reset from the stash tree to the HEAD tree.

        # Get HEAD tree
        head_commit = self._repo.get_object(self._repo.head())
        assert isinstance(head_commit, Commit)
        head_tree_id = head_commit.tree

        # Update from the stash tree to the HEAD tree. This removes files that
        # were in the stash but not in HEAD, and restores files to their HEAD
        # versions.
        changes = tree_changes(self._repo.object_store, stash_tree_id, head_tree_id)
        update_working_tree(
            self._repo,
            old_tree_id=stash_tree_id,
            new_tree_id=head_tree_id,
            change_iterator=changes,
            allow_overwrite_modified=True,  # We need to overwrite modified files
        )

        return cid

    def __getitem__(self, index: int) -> "Entry":
        """Get stash entry by index."""
        return list(self.stashes())[index]

    def __len__(self) -> int:
        """Return number of stash entries."""
        return len(list(self.stashes()))