stash.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. # stash.py
  2. # Copyright (C) 2018 Jelmer Vernooij <jelmer@samba.org>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Stash handling."""
  22. import os
  23. import sys
  24. from typing import TYPE_CHECKING, TypedDict
  25. from .diff_tree import tree_changes
  26. from .file import GitFile
  27. from .index import (
  28. IndexEntry,
  29. _tree_to_fs_path,
  30. build_file_from_blob,
  31. commit_tree,
  32. index_entry_from_stat,
  33. iter_fresh_objects,
  34. symlink,
  35. update_working_tree,
  36. validate_path,
  37. validate_path_element_default,
  38. validate_path_element_hfs,
  39. validate_path_element_ntfs,
  40. )
  41. from .object_store import iter_tree_contents
  42. from .objects import S_IFGITLINK, Blob, Commit, ObjectID, TreeEntry
  43. from .reflog import drop_reflog_entry, read_reflog
  44. from .refs import Ref
  45. if TYPE_CHECKING:
  46. from .reflog import Entry
  47. from .repo import Repo
  48. class CommitKwargs(TypedDict, total=False):
  49. """Keyword arguments for do_commit."""
  50. committer: bytes
  51. author: bytes
  52. DEFAULT_STASH_REF = Ref(b"refs/stash")
  53. class Stash:
  54. """A Git stash.
  55. Note that this doesn't currently update the working tree.
  56. """
  57. def __init__(self, repo: "Repo", ref: Ref = DEFAULT_STASH_REF) -> None:
  58. """Initialize Stash.
  59. Args:
  60. repo: Repository object
  61. ref: Stash reference name
  62. """
  63. self._ref = ref
  64. self._repo = repo
  65. @property
  66. def _reflog_path(self) -> str:
  67. return os.path.join(self._repo.commondir(), "logs", os.fsdecode(self._ref))
  68. def stashes(self) -> list["Entry"]:
  69. """Get list of stash entries.
  70. Returns:
  71. List of stash entries in chronological order
  72. """
  73. try:
  74. with GitFile(self._reflog_path, "rb") as f:
  75. return list(reversed(list(read_reflog(f))))
  76. except FileNotFoundError:
  77. return []
  78. @classmethod
  79. def from_repo(cls, repo: "Repo") -> "Stash":
  80. """Create a new stash from a Repo object."""
  81. return cls(repo)
  82. def drop(self, index: int) -> None:
  83. """Drop entry with specified index."""
  84. with open(self._reflog_path, "rb+") as f:
  85. drop_reflog_entry(f, index, rewrite=True)
  86. if len(self) == 0:
  87. os.remove(self._reflog_path)
  88. del self._repo.refs[self._ref]
  89. return
  90. if index == 0:
  91. self._repo.refs[self._ref] = self[0].new_sha
  92. def pop(self, index: int) -> "Entry":
  93. """Pop a stash entry and apply its changes.
  94. Args:
  95. index: Index of the stash entry to pop (0 is the most recent)
  96. Returns:
  97. The stash entry that was popped
  98. """
  99. # Get the stash entry before removing it
  100. entry = self[index]
  101. # Get the stash commit
  102. stash_commit = self._repo.get_object(entry.new_sha)
  103. assert isinstance(stash_commit, Commit)
  104. # The stash commit has the working tree changes
  105. # Its first parent is the commit the stash was based on
  106. # Its second parent is the index commit
  107. if len(stash_commit.parents) < 1:
  108. raise ValueError("Invalid stash entry: no parent commits")
  109. base_commit_sha = stash_commit.parents[0]
  110. # Get current HEAD to determine if we can apply cleanly
  111. try:
  112. from dulwich.refs import HEADREF
  113. current_head = self._repo.refs[HEADREF]
  114. except KeyError:
  115. raise ValueError("Cannot pop stash: no HEAD")
  116. # Check if we're at the same commit where the stash was created
  117. # If not, we need to do a three-way merge
  118. if current_head != base_commit_sha:
  119. # For now, we'll apply changes directly but this could cause conflicts
  120. # A full implementation would do a three-way merge
  121. pass
  122. # Apply the stash changes to the working tree and index
  123. # Get config for working directory update
  124. config = self._repo.get_config()
  125. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  126. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  127. validate_path_element = validate_path_element_ntfs
  128. elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
  129. validate_path_element = validate_path_element_hfs
  130. else:
  131. validate_path_element = validate_path_element_default
  132. if config.get_boolean(b"core", b"symlinks", True):
  133. symlink_fn = symlink
  134. else:
  135. def symlink_fn( # type: ignore[misc,unused-ignore]
  136. src: str | bytes,
  137. dst: str | bytes,
  138. target_is_directory: bool = False,
  139. *,
  140. dir_fd: int | None = None,
  141. ) -> None:
  142. mode = "w" + ("b" if isinstance(src, bytes) else "")
  143. with open(dst, mode) as f:
  144. f.write(src)
  145. # Get blob normalizer for line ending conversion
  146. blob_normalizer = self._repo.get_blob_normalizer()
  147. # Open the index
  148. repo_index = self._repo.open_index()
  149. # Apply working tree changes
  150. stash_tree_id = stash_commit.tree
  151. repo_path = os.fsencode(self._repo.path)
  152. # First, if we have index changes (second parent), restore the index state
  153. if len(stash_commit.parents) >= 2:
  154. index_commit_sha = stash_commit.parents[1]
  155. index_commit = self._repo.get_object(index_commit_sha)
  156. assert isinstance(index_commit, Commit)
  157. index_tree_id = index_commit.tree
  158. # Update index entries from the stashed index tree
  159. tree_entry: TreeEntry
  160. for tree_entry in iter_tree_contents(
  161. self._repo.object_store, index_tree_id
  162. ):
  163. assert (
  164. tree_entry.path is not None
  165. and tree_entry.mode is not None
  166. and tree_entry.sha is not None
  167. )
  168. if not validate_path(tree_entry.path, validate_path_element):
  169. continue
  170. # Add to index with stage 0 (normal)
  171. # Get file stats for the entry
  172. full_path = _tree_to_fs_path(repo_path, tree_entry.path)
  173. try:
  174. st = os.lstat(full_path)
  175. except FileNotFoundError:
  176. # File doesn't exist yet, use dummy stats
  177. st = os.stat_result((tree_entry.mode, 0, 0, 0, 0, 0, 0, 0, 0, 0))
  178. repo_index[tree_entry.path] = index_entry_from_stat(st, tree_entry.sha)
  179. # Apply working tree changes from the stash
  180. tree_entry2: TreeEntry
  181. for tree_entry2 in iter_tree_contents(self._repo.object_store, stash_tree_id):
  182. assert (
  183. tree_entry2.path is not None
  184. and tree_entry2.mode is not None
  185. and tree_entry2.sha is not None
  186. )
  187. if not validate_path(tree_entry2.path, validate_path_element):
  188. continue
  189. full_path = _tree_to_fs_path(repo_path, tree_entry2.path)
  190. # Create parent directories if needed
  191. parent_dir = os.path.dirname(full_path)
  192. if parent_dir and not os.path.exists(parent_dir):
  193. os.makedirs(parent_dir)
  194. # Write the file
  195. if tree_entry2.mode == S_IFGITLINK:
  196. # Submodule - just create directory
  197. if not os.path.isdir(full_path):
  198. os.mkdir(full_path)
  199. st = os.lstat(full_path)
  200. else:
  201. obj = self._repo.object_store[tree_entry2.sha]
  202. assert isinstance(obj, Blob)
  203. # Apply blob normalization for checkout if normalizer is provided
  204. if blob_normalizer is not None:
  205. obj = blob_normalizer.checkout_normalize(obj, tree_entry2.path)
  206. st = build_file_from_blob(
  207. obj,
  208. tree_entry2.mode,
  209. full_path,
  210. honor_filemode=honor_filemode,
  211. symlink_fn=symlink_fn, # type: ignore[arg-type,unused-ignore]
  212. )
  213. # Update index if the file wasn't already staged
  214. if tree_entry2.path not in repo_index:
  215. # Update with file stats from disk
  216. repo_index[tree_entry2.path] = index_entry_from_stat(
  217. st, tree_entry2.sha
  218. )
  219. else:
  220. existing_entry = repo_index[tree_entry2.path]
  221. if (
  222. isinstance(existing_entry, IndexEntry)
  223. and existing_entry.mode == tree_entry2.mode
  224. and existing_entry.sha == tree_entry2.sha
  225. ):
  226. # Update with file stats from disk
  227. repo_index[tree_entry2.path] = index_entry_from_stat(
  228. st, tree_entry2.sha
  229. )
  230. # Write the updated index
  231. repo_index.write()
  232. # Remove the stash entry
  233. self.drop(index)
  234. return entry
  235. def push(
  236. self,
  237. committer: bytes | None = None,
  238. author: bytes | None = None,
  239. message: bytes | None = None,
  240. ) -> ObjectID:
  241. """Create a new stash.
  242. Args:
  243. committer: Optional committer name to use
  244. author: Optional author name to use
  245. message: Optional commit message
  246. """
  247. # First, create the index commit.
  248. commit_kwargs = CommitKwargs()
  249. if committer is not None:
  250. commit_kwargs["committer"] = committer
  251. if author is not None:
  252. commit_kwargs["author"] = author
  253. index = self._repo.open_index()
  254. index_tree_id = index.commit(self._repo.object_store)
  255. # Create a dangling commit for the index state
  256. # Note: We pass ref=None which is handled specially in do_commit
  257. # to create a commit without updating any reference
  258. index_commit_id = self._repo.get_worktree().commit(
  259. tree=index_tree_id,
  260. message=b"Index stash",
  261. merge_heads=[self._repo.head()],
  262. no_verify=True,
  263. ref=None, # Don't update any ref
  264. **commit_kwargs,
  265. )
  266. # Then, the working tree one.
  267. # Filter out entries with None values since commit_tree expects non-None values
  268. fresh_objects = [
  269. (path, sha, mode)
  270. for path, sha, mode in iter_fresh_objects(
  271. index,
  272. os.fsencode(self._repo.path),
  273. object_store=self._repo.object_store,
  274. )
  275. if sha is not None and mode is not None
  276. ]
  277. stash_tree_id = commit_tree(
  278. self._repo.object_store,
  279. fresh_objects,
  280. )
  281. if message is None:
  282. message = b"A stash on " + self._repo.head()
  283. # TODO(jelmer): Just pass parents into do_commit()?
  284. self._repo.refs[self._ref] = self._repo.head()
  285. cid: ObjectID = self._repo.get_worktree().commit(
  286. ref=self._ref,
  287. tree=stash_tree_id,
  288. message=message,
  289. merge_heads=[index_commit_id],
  290. no_verify=True,
  291. **commit_kwargs,
  292. )
  293. # Reset working tree and index to HEAD to match git's behavior
  294. # Use update_working_tree to reset from stash tree to HEAD tree
  295. # Get HEAD tree
  296. head_commit = self._repo.get_object(self._repo.head())
  297. assert isinstance(head_commit, Commit)
  298. head_tree_id = head_commit.tree
  299. # Update from stash tree to HEAD tree
  300. # This will remove files that were in stash but not in HEAD,
  301. # and restore files to their HEAD versions
  302. changes = tree_changes(self._repo.object_store, stash_tree_id, head_tree_id)
  303. update_working_tree(
  304. self._repo,
  305. old_tree_id=stash_tree_id,
  306. new_tree_id=head_tree_id,
  307. change_iterator=changes,
  308. allow_overwrite_modified=True, # We need to overwrite modified files
  309. )
  310. return cid
  311. def __getitem__(self, index: int) -> "Entry":
  312. """Get stash entry by index."""
  313. return list(self.stashes())[index]
  314. def __len__(self) -> int:
  315. """Return number of stash entries."""
  316. return len(list(self.stashes()))