stash.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. # stash.py
  2. # Copyright (C) 2018 Jelmer Vernooij <jelmer@samba.org>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Stash handling."""
  22. import os
  23. import sys
  24. from typing import TYPE_CHECKING, Optional, TypedDict, Union
  25. from .diff_tree import tree_changes
  26. from .file import GitFile
  27. from .index import (
  28. IndexEntry,
  29. _tree_to_fs_path,
  30. build_file_from_blob,
  31. commit_tree,
  32. index_entry_from_stat,
  33. iter_fresh_objects,
  34. symlink,
  35. update_working_tree,
  36. validate_path,
  37. validate_path_element_default,
  38. validate_path_element_hfs,
  39. validate_path_element_ntfs,
  40. )
  41. from .object_store import iter_tree_contents
  42. from .objects import S_IFGITLINK, Blob, Commit, ObjectID, TreeEntry
  43. from .reflog import drop_reflog_entry, read_reflog
  44. from .refs import Ref
  45. if TYPE_CHECKING:
  46. from .reflog import Entry
  47. from .repo import Repo
  48. class CommitKwargs(TypedDict, total=False):
  49. """Keyword arguments for do_commit."""
  50. committer: bytes
  51. author: bytes
  52. DEFAULT_STASH_REF = b"refs/stash"
  53. class Stash:
  54. """A Git stash.
  55. Note that this doesn't currently update the working tree.
  56. """
  57. def __init__(self, repo: "Repo", ref: Ref = DEFAULT_STASH_REF) -> None:
  58. """Initialize Stash.
  59. Args:
  60. repo: Repository object
  61. ref: Stash reference name
  62. """
  63. self._ref = ref
  64. self._repo = repo
  65. @property
  66. def _reflog_path(self) -> str:
  67. return os.path.join(self._repo.commondir(), "logs", os.fsdecode(self._ref))
  68. def stashes(self) -> list["Entry"]:
  69. """Get list of stash entries.
  70. Returns:
  71. List of stash entries in chronological order
  72. """
  73. try:
  74. with GitFile(self._reflog_path, "rb") as f:
  75. return list(reversed(list(read_reflog(f))))
  76. except FileNotFoundError:
  77. return []
  78. @classmethod
  79. def from_repo(cls, repo: "Repo") -> "Stash":
  80. """Create a new stash from a Repo object."""
  81. return cls(repo)
  82. def drop(self, index: int) -> None:
  83. """Drop entry with specified index."""
  84. with open(self._reflog_path, "rb+") as f:
  85. drop_reflog_entry(f, index, rewrite=True)
  86. if len(self) == 0:
  87. os.remove(self._reflog_path)
  88. del self._repo.refs[self._ref]
  89. return
  90. if index == 0:
  91. self._repo.refs[self._ref] = self[0].new_sha
  92. def pop(self, index: int) -> "Entry":
  93. """Pop a stash entry and apply its changes.
  94. Args:
  95. index: Index of the stash entry to pop (0 is the most recent)
  96. Returns:
  97. The stash entry that was popped
  98. """
  99. # Get the stash entry before removing it
  100. entry = self[index]
  101. # Get the stash commit
  102. stash_commit = self._repo.get_object(entry.new_sha)
  103. assert isinstance(stash_commit, Commit)
  104. # The stash commit has the working tree changes
  105. # Its first parent is the commit the stash was based on
  106. # Its second parent is the index commit
  107. if len(stash_commit.parents) < 1:
  108. raise ValueError("Invalid stash entry: no parent commits")
  109. base_commit_sha = stash_commit.parents[0]
  110. # Get current HEAD to determine if we can apply cleanly
  111. try:
  112. current_head = self._repo.refs[b"HEAD"]
  113. except KeyError:
  114. raise ValueError("Cannot pop stash: no HEAD")
  115. # Check if we're at the same commit where the stash was created
  116. # If not, we need to do a three-way merge
  117. if current_head != base_commit_sha:
  118. # For now, we'll apply changes directly but this could cause conflicts
  119. # A full implementation would do a three-way merge
  120. pass
  121. # Apply the stash changes to the working tree and index
  122. # Get config for working directory update
  123. config = self._repo.get_config()
  124. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  125. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  126. validate_path_element = validate_path_element_ntfs
  127. elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
  128. validate_path_element = validate_path_element_hfs
  129. else:
  130. validate_path_element = validate_path_element_default
  131. if config.get_boolean(b"core", b"symlinks", True):
  132. symlink_fn = symlink
  133. else:
  134. def symlink_fn( # type: ignore[misc,unused-ignore]
  135. src: Union[str, bytes],
  136. dst: Union[str, bytes],
  137. target_is_directory: bool = False,
  138. *,
  139. dir_fd: Optional[int] = None,
  140. ) -> None:
  141. mode = "w" + ("b" if isinstance(src, bytes) else "")
  142. with open(dst, mode) as f:
  143. f.write(src)
  144. # Get blob normalizer for line ending conversion
  145. blob_normalizer = self._repo.get_blob_normalizer()
  146. # Open the index
  147. repo_index = self._repo.open_index()
  148. # Apply working tree changes
  149. stash_tree_id = stash_commit.tree
  150. repo_path = os.fsencode(self._repo.path)
  151. # First, if we have index changes (second parent), restore the index state
  152. if len(stash_commit.parents) >= 2:
  153. index_commit_sha = stash_commit.parents[1]
  154. index_commit = self._repo.get_object(index_commit_sha)
  155. assert isinstance(index_commit, Commit)
  156. index_tree_id = index_commit.tree
  157. # Update index entries from the stashed index tree
  158. tree_entry: TreeEntry
  159. for tree_entry in iter_tree_contents(
  160. self._repo.object_store, index_tree_id
  161. ):
  162. assert (
  163. tree_entry.path is not None
  164. and tree_entry.mode is not None
  165. and tree_entry.sha is not None
  166. )
  167. if not validate_path(tree_entry.path, validate_path_element):
  168. continue
  169. # Add to index with stage 0 (normal)
  170. # Get file stats for the entry
  171. full_path = _tree_to_fs_path(repo_path, tree_entry.path)
  172. try:
  173. st = os.lstat(full_path)
  174. except FileNotFoundError:
  175. # File doesn't exist yet, use dummy stats
  176. st = os.stat_result((tree_entry.mode, 0, 0, 0, 0, 0, 0, 0, 0, 0))
  177. repo_index[tree_entry.path] = index_entry_from_stat(st, tree_entry.sha)
  178. # Apply working tree changes from the stash
  179. tree_entry2: TreeEntry
  180. for tree_entry2 in iter_tree_contents(self._repo.object_store, stash_tree_id):
  181. assert (
  182. tree_entry2.path is not None
  183. and tree_entry2.mode is not None
  184. and tree_entry2.sha is not None
  185. )
  186. if not validate_path(tree_entry2.path, validate_path_element):
  187. continue
  188. full_path = _tree_to_fs_path(repo_path, tree_entry2.path)
  189. # Create parent directories if needed
  190. parent_dir = os.path.dirname(full_path)
  191. if parent_dir and not os.path.exists(parent_dir):
  192. os.makedirs(parent_dir)
  193. # Write the file
  194. if tree_entry2.mode == S_IFGITLINK:
  195. # Submodule - just create directory
  196. if not os.path.isdir(full_path):
  197. os.mkdir(full_path)
  198. st = os.lstat(full_path)
  199. else:
  200. obj = self._repo.object_store[tree_entry2.sha]
  201. assert isinstance(obj, Blob)
  202. # Apply blob normalization for checkout if normalizer is provided
  203. if blob_normalizer is not None:
  204. obj = blob_normalizer.checkout_normalize(obj, tree_entry2.path)
  205. st = build_file_from_blob(
  206. obj,
  207. tree_entry2.mode,
  208. full_path,
  209. honor_filemode=honor_filemode,
  210. symlink_fn=symlink_fn, # type: ignore[arg-type,unused-ignore]
  211. )
  212. # Update index if the file wasn't already staged
  213. if tree_entry2.path not in repo_index:
  214. # Update with file stats from disk
  215. repo_index[tree_entry2.path] = index_entry_from_stat(
  216. st, tree_entry2.sha
  217. )
  218. else:
  219. existing_entry = repo_index[tree_entry2.path]
  220. if (
  221. isinstance(existing_entry, IndexEntry)
  222. and existing_entry.mode == tree_entry2.mode
  223. and existing_entry.sha == tree_entry2.sha
  224. ):
  225. # Update with file stats from disk
  226. repo_index[tree_entry2.path] = index_entry_from_stat(
  227. st, tree_entry2.sha
  228. )
  229. # Write the updated index
  230. repo_index.write()
  231. # Remove the stash entry
  232. self.drop(index)
  233. return entry
  234. def push(
  235. self,
  236. committer: Optional[bytes] = None,
  237. author: Optional[bytes] = None,
  238. message: Optional[bytes] = None,
  239. ) -> ObjectID:
  240. """Create a new stash.
  241. Args:
  242. committer: Optional committer name to use
  243. author: Optional author name to use
  244. message: Optional commit message
  245. """
  246. # First, create the index commit.
  247. commit_kwargs = CommitKwargs()
  248. if committer is not None:
  249. commit_kwargs["committer"] = committer
  250. if author is not None:
  251. commit_kwargs["author"] = author
  252. index = self._repo.open_index()
  253. index_tree_id = index.commit(self._repo.object_store)
  254. # Create a dangling commit for the index state
  255. # Note: We pass ref=None which is handled specially in do_commit
  256. # to create a commit without updating any reference
  257. index_commit_id = self._repo.get_worktree().commit(
  258. tree=index_tree_id,
  259. message=b"Index stash",
  260. merge_heads=[self._repo.head()],
  261. no_verify=True,
  262. ref=None, # Don't update any ref
  263. **commit_kwargs,
  264. )
  265. # Then, the working tree one.
  266. # Filter out entries with None values since commit_tree expects non-None values
  267. fresh_objects = [
  268. (path, sha, mode)
  269. for path, sha, mode in iter_fresh_objects(
  270. index,
  271. os.fsencode(self._repo.path),
  272. object_store=self._repo.object_store,
  273. )
  274. if sha is not None and mode is not None
  275. ]
  276. stash_tree_id = commit_tree(
  277. self._repo.object_store,
  278. fresh_objects,
  279. )
  280. if message is None:
  281. message = b"A stash on " + self._repo.head()
  282. # TODO(jelmer): Just pass parents into do_commit()?
  283. self._repo.refs[self._ref] = self._repo.head()
  284. cid: ObjectID = self._repo.get_worktree().commit(
  285. ref=self._ref,
  286. tree=stash_tree_id,
  287. message=message,
  288. merge_heads=[index_commit_id],
  289. no_verify=True,
  290. **commit_kwargs,
  291. )
  292. # Reset working tree and index to HEAD to match git's behavior
  293. # Use update_working_tree to reset from stash tree to HEAD tree
  294. # Get HEAD tree
  295. head_commit = self._repo.get_object(self._repo.head())
  296. assert isinstance(head_commit, Commit)
  297. head_tree_id = head_commit.tree
  298. # Update from stash tree to HEAD tree
  299. # This will remove files that were in stash but not in HEAD,
  300. # and restore files to their HEAD versions
  301. changes = tree_changes(self._repo.object_store, stash_tree_id, head_tree_id)
  302. update_working_tree(
  303. self._repo,
  304. old_tree_id=stash_tree_id,
  305. new_tree_id=head_tree_id,
  306. change_iterator=changes,
  307. allow_overwrite_modified=True, # We need to overwrite modified files
  308. )
  309. return cid
  310. def __getitem__(self, index: int) -> "Entry":
  311. """Get stash entry by index."""
  312. return list(self.stashes())[index]
  313. def __len__(self) -> int:
  314. """Return number of stash entries."""
  315. return len(list(self.stashes()))