stash.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. # stash.py
  2. # Copyright (C) 2018 Jelmer Vernooij <jelmer@samba.org>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Stash handling."""
  22. import os
  23. import sys
  24. from typing import TYPE_CHECKING, Optional, TypedDict
  25. from .diff_tree import tree_changes
  26. from .file import GitFile
  27. from .index import (
  28. IndexEntry,
  29. _tree_to_fs_path,
  30. build_file_from_blob,
  31. commit_tree,
  32. index_entry_from_stat,
  33. iter_fresh_objects,
  34. iter_tree_contents,
  35. symlink,
  36. update_working_tree,
  37. validate_path,
  38. validate_path_element_default,
  39. validate_path_element_hfs,
  40. validate_path_element_ntfs,
  41. )
  42. from .objects import S_IFGITLINK, Blob, Commit, ObjectID
  43. from .reflog import drop_reflog_entry, read_reflog
  44. from .refs import Ref
  45. if TYPE_CHECKING:
  46. from .reflog import Entry
  47. from .repo import Repo
  48. class CommitKwargs(TypedDict, total=False):
  49. """Keyword arguments for do_commit."""
  50. committer: bytes
  51. author: bytes
  52. DEFAULT_STASH_REF = b"refs/stash"
  53. class Stash:
  54. """A Git stash.
  55. Note that this doesn't currently update the working tree.
  56. """
  57. def __init__(self, repo: "Repo", ref: Ref = DEFAULT_STASH_REF) -> None:
  58. self._ref = ref
  59. self._repo = repo
  60. @property
  61. def _reflog_path(self) -> str:
  62. return os.path.join(self._repo.commondir(), "logs", os.fsdecode(self._ref))
  63. def stashes(self) -> list["Entry"]:
  64. try:
  65. with GitFile(self._reflog_path, "rb") as f:
  66. return list(reversed(list(read_reflog(f))))
  67. except FileNotFoundError:
  68. return []
  69. @classmethod
  70. def from_repo(cls, repo: "Repo") -> "Stash":
  71. """Create a new stash from a Repo object."""
  72. return cls(repo)
  73. def drop(self, index: int) -> None:
  74. """Drop entry with specified index."""
  75. with open(self._reflog_path, "rb+") as f:
  76. drop_reflog_entry(f, index, rewrite=True)
  77. if len(self) == 0:
  78. os.remove(self._reflog_path)
  79. del self._repo.refs[self._ref]
  80. return
  81. if index == 0:
  82. self._repo.refs[self._ref] = self[0].new_sha
  83. def pop(self, index: int) -> "Entry":
  84. """Pop a stash entry and apply its changes.
  85. Args:
  86. index: Index of the stash entry to pop (0 is the most recent)
  87. Returns:
  88. The stash entry that was popped
  89. """
  90. # Get the stash entry before removing it
  91. entry = self[index]
  92. # Get the stash commit
  93. stash_commit = self._repo.get_object(entry.new_sha)
  94. assert isinstance(stash_commit, Commit)
  95. # The stash commit has the working tree changes
  96. # Its first parent is the commit the stash was based on
  97. # Its second parent is the index commit
  98. if len(stash_commit.parents) < 1:
  99. raise ValueError("Invalid stash entry: no parent commits")
  100. base_commit_sha = stash_commit.parents[0]
  101. # Get current HEAD to determine if we can apply cleanly
  102. try:
  103. current_head = self._repo.refs[b"HEAD"]
  104. except KeyError:
  105. raise ValueError("Cannot pop stash: no HEAD")
  106. # Check if we're at the same commit where the stash was created
  107. # If not, we need to do a three-way merge
  108. if current_head != base_commit_sha:
  109. # For now, we'll apply changes directly but this could cause conflicts
  110. # A full implementation would do a three-way merge
  111. pass
  112. # Apply the stash changes to the working tree and index
  113. # Get config for working directory update
  114. config = self._repo.get_config()
  115. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  116. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  117. validate_path_element = validate_path_element_ntfs
  118. elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
  119. validate_path_element = validate_path_element_hfs
  120. else:
  121. validate_path_element = validate_path_element_default
  122. if config.get_boolean(b"core", b"symlinks", True):
  123. symlink_fn = symlink
  124. else:
  125. def symlink_fn(source, target) -> None: # type: ignore
  126. mode = "w" + ("b" if isinstance(source, bytes) else "")
  127. with open(target, mode) as f:
  128. f.write(source)
  129. # Get blob normalizer for line ending conversion
  130. blob_normalizer = self._repo.get_blob_normalizer()
  131. # Open the index
  132. repo_index = self._repo.open_index()
  133. # Apply working tree changes
  134. stash_tree_id = stash_commit.tree
  135. repo_path = os.fsencode(self._repo.path)
  136. # First, if we have index changes (second parent), restore the index state
  137. if len(stash_commit.parents) >= 2:
  138. index_commit_sha = stash_commit.parents[1]
  139. index_commit = self._repo.get_object(index_commit_sha)
  140. assert isinstance(index_commit, Commit)
  141. index_tree_id = index_commit.tree
  142. # Update index entries from the stashed index tree
  143. for entry in iter_tree_contents(self._repo.object_store, index_tree_id):
  144. if not validate_path(entry.path, validate_path_element):
  145. continue
  146. # Add to index with stage 0 (normal)
  147. # Get file stats for the entry
  148. full_path = _tree_to_fs_path(repo_path, entry.path)
  149. try:
  150. st = os.lstat(full_path)
  151. except FileNotFoundError:
  152. # File doesn't exist yet, use dummy stats
  153. st = os.stat_result((entry.mode, 0, 0, 0, 0, 0, 0, 0, 0, 0))
  154. repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
  155. # Apply working tree changes from the stash
  156. for entry in iter_tree_contents(self._repo.object_store, stash_tree_id):
  157. if not validate_path(entry.path, validate_path_element):
  158. continue
  159. full_path = _tree_to_fs_path(repo_path, entry.path)
  160. # Create parent directories if needed
  161. parent_dir = os.path.dirname(full_path)
  162. if parent_dir and not os.path.exists(parent_dir):
  163. os.makedirs(parent_dir)
  164. # Write the file
  165. if entry.mode == S_IFGITLINK:
  166. # Submodule - just create directory
  167. if not os.path.isdir(full_path):
  168. os.mkdir(full_path)
  169. st = os.lstat(full_path)
  170. else:
  171. obj = self._repo.object_store[entry.sha]
  172. assert isinstance(obj, Blob)
  173. # Apply blob normalization for checkout if normalizer is provided
  174. if blob_normalizer is not None:
  175. obj = blob_normalizer.checkout_normalize(obj, entry.path)
  176. st = build_file_from_blob(
  177. obj,
  178. entry.mode,
  179. full_path,
  180. honor_filemode=honor_filemode,
  181. symlink_fn=symlink_fn,
  182. )
  183. # Update index if the file wasn't already staged
  184. if entry.path not in repo_index:
  185. # Update with file stats from disk
  186. repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
  187. else:
  188. existing_entry = repo_index[entry.path]
  189. if (
  190. isinstance(existing_entry, IndexEntry)
  191. and existing_entry.mode == entry.mode
  192. and existing_entry.sha == entry.sha
  193. ):
  194. # Update with file stats from disk
  195. repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
  196. # Write the updated index
  197. repo_index.write()
  198. # Remove the stash entry
  199. self.drop(index)
  200. return entry
  201. def push(
  202. self,
  203. committer: Optional[bytes] = None,
  204. author: Optional[bytes] = None,
  205. message: Optional[bytes] = None,
  206. ) -> ObjectID:
  207. """Create a new stash.
  208. Args:
  209. committer: Optional committer name to use
  210. author: Optional author name to use
  211. message: Optional commit message
  212. """
  213. # First, create the index commit.
  214. commit_kwargs = CommitKwargs()
  215. if committer is not None:
  216. commit_kwargs["committer"] = committer
  217. if author is not None:
  218. commit_kwargs["author"] = author
  219. index = self._repo.open_index()
  220. index_tree_id = index.commit(self._repo.object_store)
  221. # Create a dangling commit for the index state
  222. # Note: We pass ref=None which is handled specially in do_commit
  223. # to create a commit without updating any reference
  224. index_commit_id = self._repo.get_worktree().commit(
  225. tree=index_tree_id,
  226. message=b"Index stash",
  227. merge_heads=[self._repo.head()],
  228. no_verify=True,
  229. ref=None, # Don't update any ref
  230. **commit_kwargs,
  231. )
  232. # Then, the working tree one.
  233. # Filter out entries with None values since commit_tree expects non-None values
  234. fresh_objects = [
  235. (path, sha, mode)
  236. for path, sha, mode in iter_fresh_objects(
  237. index,
  238. os.fsencode(self._repo.path),
  239. object_store=self._repo.object_store,
  240. )
  241. if sha is not None and mode is not None
  242. ]
  243. stash_tree_id = commit_tree(
  244. self._repo.object_store,
  245. fresh_objects,
  246. )
  247. if message is None:
  248. message = b"A stash on " + self._repo.head()
  249. # TODO(jelmer): Just pass parents into do_commit()?
  250. self._repo.refs[self._ref] = self._repo.head()
  251. cid = self._repo.get_worktree().commit(
  252. ref=self._ref,
  253. tree=stash_tree_id,
  254. message=message,
  255. merge_heads=[index_commit_id],
  256. no_verify=True,
  257. **commit_kwargs,
  258. )
  259. # Reset working tree and index to HEAD to match git's behavior
  260. # Use update_working_tree to reset from stash tree to HEAD tree
  261. # Get HEAD tree
  262. head_commit = self._repo.get_object(self._repo.head())
  263. assert isinstance(head_commit, Commit)
  264. head_tree_id = head_commit.tree
  265. # Update from stash tree to HEAD tree
  266. # This will remove files that were in stash but not in HEAD,
  267. # and restore files to their HEAD versions
  268. changes = tree_changes(self._repo.object_store, stash_tree_id, head_tree_id)
  269. update_working_tree(
  270. self._repo,
  271. old_tree_id=stash_tree_id,
  272. new_tree_id=head_tree_id,
  273. change_iterator=changes,
  274. allow_overwrite_modified=True, # We need to overwrite modified files
  275. )
  276. return cid
  277. def __getitem__(self, index: int) -> "Entry":
  278. return list(self.stashes())[index]
  279. def __len__(self) -> int:
  280. return len(list(self.stashes()))