stash.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. # stash.py
  2. # Copyright (C) 2018 Jelmer Vernooij <jelmer@samba.org>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Stash handling."""
  22. __all__ = [
  23. "DEFAULT_STASH_REF",
  24. "CommitKwargs",
  25. "Stash",
  26. ]
  27. import os
  28. import sys
  29. from typing import TYPE_CHECKING, TypedDict
  30. from .diff_tree import tree_changes
  31. from .file import GitFile
  32. from .index import (
  33. IndexEntry,
  34. _tree_to_fs_path,
  35. build_file_from_blob,
  36. commit_tree,
  37. index_entry_from_stat,
  38. iter_fresh_objects,
  39. symlink,
  40. update_working_tree,
  41. validate_path,
  42. validate_path_element_default,
  43. validate_path_element_hfs,
  44. validate_path_element_ntfs,
  45. )
  46. from .object_store import iter_tree_contents
  47. from .objects import S_IFGITLINK, Blob, Commit, ObjectID, TreeEntry
  48. from .reflog import drop_reflog_entry, read_reflog
  49. from .refs import Ref
  50. if TYPE_CHECKING:
  51. from .reflog import Entry
  52. from .repo import Repo
  53. class CommitKwargs(TypedDict, total=False):
  54. """Keyword arguments for do_commit."""
  55. committer: bytes
  56. author: bytes
  57. DEFAULT_STASH_REF = Ref(b"refs/stash")
  58. class Stash:
  59. """A Git stash.
  60. Note that this doesn't currently update the working tree.
  61. """
  62. def __init__(self, repo: "Repo", ref: Ref = DEFAULT_STASH_REF) -> None:
  63. """Initialize Stash.
  64. Args:
  65. repo: Repository object
  66. ref: Stash reference name
  67. """
  68. self._ref = ref
  69. self._repo = repo
  70. @property
  71. def _reflog_path(self) -> str:
  72. return os.path.join(self._repo.commondir(), "logs", os.fsdecode(self._ref))
  73. def stashes(self) -> list["Entry"]:
  74. """Get list of stash entries.
  75. Returns:
  76. List of stash entries in chronological order
  77. """
  78. try:
  79. with GitFile(self._reflog_path, "rb") as f:
  80. return list(reversed(list(read_reflog(f))))
  81. except FileNotFoundError:
  82. return []
  83. @classmethod
  84. def from_repo(cls, repo: "Repo") -> "Stash":
  85. """Create a new stash from a Repo object."""
  86. return cls(repo)
  87. def drop(self, index: int) -> None:
  88. """Drop entry with specified index."""
  89. with open(self._reflog_path, "rb+") as f:
  90. drop_reflog_entry(f, index, rewrite=True)
  91. if len(self) == 0:
  92. os.remove(self._reflog_path)
  93. del self._repo.refs[self._ref]
  94. return
  95. if index == 0:
  96. self._repo.refs[self._ref] = self[0].new_sha
  97. def pop(self, index: int) -> "Entry":
  98. """Pop a stash entry and apply its changes.
  99. Args:
  100. index: Index of the stash entry to pop (0 is the most recent)
  101. Returns:
  102. The stash entry that was popped
  103. """
  104. # Get the stash entry before removing it
  105. entry = self[index]
  106. # Get the stash commit
  107. stash_commit = self._repo.get_object(entry.new_sha)
  108. assert isinstance(stash_commit, Commit)
  109. # The stash commit has the working tree changes
  110. # Its first parent is the commit the stash was based on
  111. # Its second parent is the index commit
  112. if len(stash_commit.parents) < 1:
  113. raise ValueError("Invalid stash entry: no parent commits")
  114. base_commit_sha = stash_commit.parents[0]
  115. # Get current HEAD to determine if we can apply cleanly
  116. try:
  117. from dulwich.refs import HEADREF
  118. current_head = self._repo.refs[HEADREF]
  119. except KeyError:
  120. raise ValueError("Cannot pop stash: no HEAD")
  121. # Check if we're at the same commit where the stash was created
  122. # If not, we need to do a three-way merge
  123. if current_head != base_commit_sha:
  124. # For now, we'll apply changes directly but this could cause conflicts
  125. # A full implementation would do a three-way merge
  126. pass
  127. # Apply the stash changes to the working tree and index
  128. # Get config for working directory update
  129. config = self._repo.get_config()
  130. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  131. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  132. validate_path_element = validate_path_element_ntfs
  133. elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
  134. validate_path_element = validate_path_element_hfs
  135. else:
  136. validate_path_element = validate_path_element_default
  137. if config.get_boolean(b"core", b"symlinks", True):
  138. symlink_fn = symlink
  139. else:
  140. def symlink_fn( # type: ignore[misc,unused-ignore]
  141. src: str | bytes,
  142. dst: str | bytes,
  143. target_is_directory: bool = False,
  144. *,
  145. dir_fd: int | None = None,
  146. ) -> None:
  147. mode = "w" + ("b" if isinstance(src, bytes) else "")
  148. with open(dst, mode) as f:
  149. f.write(src)
  150. # Get blob normalizer for line ending conversion
  151. blob_normalizer = self._repo.get_blob_normalizer()
  152. # Open the index
  153. repo_index = self._repo.open_index()
  154. # Apply working tree changes
  155. stash_tree_id = stash_commit.tree
  156. repo_path = os.fsencode(self._repo.path)
  157. # First, if we have index changes (second parent), restore the index state
  158. if len(stash_commit.parents) >= 2:
  159. index_commit_sha = stash_commit.parents[1]
  160. index_commit = self._repo.get_object(index_commit_sha)
  161. assert isinstance(index_commit, Commit)
  162. index_tree_id = index_commit.tree
  163. # Update index entries from the stashed index tree
  164. tree_entry: TreeEntry
  165. for tree_entry in iter_tree_contents(
  166. self._repo.object_store, index_tree_id
  167. ):
  168. assert (
  169. tree_entry.path is not None
  170. and tree_entry.mode is not None
  171. and tree_entry.sha is not None
  172. )
  173. if not validate_path(tree_entry.path, validate_path_element):
  174. continue
  175. # Add to index with stage 0 (normal)
  176. # Get file stats for the entry
  177. full_path = _tree_to_fs_path(repo_path, tree_entry.path)
  178. try:
  179. st = os.lstat(full_path)
  180. except FileNotFoundError:
  181. # File doesn't exist yet, use dummy stats
  182. st = os.stat_result((tree_entry.mode, 0, 0, 0, 0, 0, 0, 0, 0, 0))
  183. repo_index[tree_entry.path] = index_entry_from_stat(st, tree_entry.sha)
  184. # Apply working tree changes from the stash
  185. tree_entry2: TreeEntry
  186. for tree_entry2 in iter_tree_contents(self._repo.object_store, stash_tree_id):
  187. assert (
  188. tree_entry2.path is not None
  189. and tree_entry2.mode is not None
  190. and tree_entry2.sha is not None
  191. )
  192. if not validate_path(tree_entry2.path, validate_path_element):
  193. continue
  194. full_path = _tree_to_fs_path(repo_path, tree_entry2.path)
  195. # Create parent directories if needed
  196. parent_dir = os.path.dirname(full_path)
  197. if parent_dir and not os.path.exists(parent_dir):
  198. os.makedirs(parent_dir)
  199. # Write the file
  200. if tree_entry2.mode == S_IFGITLINK:
  201. # Submodule - just create directory
  202. if not os.path.isdir(full_path):
  203. os.mkdir(full_path)
  204. st = os.lstat(full_path)
  205. else:
  206. obj = self._repo.object_store[tree_entry2.sha]
  207. assert isinstance(obj, Blob)
  208. # Apply blob normalization for checkout if normalizer is provided
  209. if blob_normalizer is not None:
  210. obj = blob_normalizer.checkout_normalize(obj, tree_entry2.path)
  211. st = build_file_from_blob(
  212. obj,
  213. tree_entry2.mode,
  214. full_path,
  215. honor_filemode=honor_filemode,
  216. symlink_fn=symlink_fn, # type: ignore[arg-type,unused-ignore]
  217. )
  218. # Update index if the file wasn't already staged
  219. if tree_entry2.path not in repo_index:
  220. # Update with file stats from disk
  221. repo_index[tree_entry2.path] = index_entry_from_stat(
  222. st, tree_entry2.sha
  223. )
  224. else:
  225. existing_entry = repo_index[tree_entry2.path]
  226. if (
  227. isinstance(existing_entry, IndexEntry)
  228. and existing_entry.mode == tree_entry2.mode
  229. and existing_entry.sha == tree_entry2.sha
  230. ):
  231. # Update with file stats from disk
  232. repo_index[tree_entry2.path] = index_entry_from_stat(
  233. st, tree_entry2.sha
  234. )
  235. # Write the updated index
  236. repo_index.write()
  237. # Remove the stash entry
  238. self.drop(index)
  239. return entry
  240. def push(
  241. self,
  242. committer: bytes | None = None,
  243. author: bytes | None = None,
  244. message: bytes | None = None,
  245. ) -> ObjectID:
  246. """Create a new stash.
  247. Args:
  248. committer: Optional committer name to use
  249. author: Optional author name to use
  250. message: Optional commit message
  251. """
  252. # First, create the index commit.
  253. commit_kwargs = CommitKwargs()
  254. if committer is not None:
  255. commit_kwargs["committer"] = committer
  256. if author is not None:
  257. commit_kwargs["author"] = author
  258. index = self._repo.open_index()
  259. index_tree_id = index.commit(self._repo.object_store)
  260. # Create a dangling commit for the index state
  261. # Note: We pass ref=None which is handled specially in do_commit
  262. # to create a commit without updating any reference
  263. index_commit_id = self._repo.get_worktree().commit(
  264. tree=index_tree_id,
  265. message=b"Index stash",
  266. merge_heads=[self._repo.head()],
  267. no_verify=True,
  268. sign=False,
  269. ref=None, # Don't update any ref
  270. **commit_kwargs,
  271. )
  272. # Then, the working tree one.
  273. # Filter out entries with None values since commit_tree expects non-None values
  274. fresh_objects = [
  275. (path, sha, mode)
  276. for path, sha, mode in iter_fresh_objects(
  277. index,
  278. os.fsencode(self._repo.path),
  279. object_store=self._repo.object_store,
  280. )
  281. if sha is not None and mode is not None
  282. ]
  283. stash_tree_id = commit_tree(
  284. self._repo.object_store,
  285. fresh_objects,
  286. )
  287. if message is None:
  288. message = b"A stash on " + self._repo.head()
  289. # TODO(jelmer): Just pass parents into do_commit()?
  290. self._repo.refs[self._ref] = self._repo.head()
  291. cid: ObjectID = self._repo.get_worktree().commit(
  292. ref=self._ref,
  293. tree=stash_tree_id,
  294. message=message,
  295. merge_heads=[index_commit_id],
  296. no_verify=True,
  297. sign=False,
  298. **commit_kwargs,
  299. )
  300. # Reset working tree and index to HEAD to match git's behavior
  301. # Use update_working_tree to reset from stash tree to HEAD tree
  302. # Get HEAD tree
  303. head_commit = self._repo.get_object(self._repo.head())
  304. assert isinstance(head_commit, Commit)
  305. head_tree_id = head_commit.tree
  306. # Update from stash tree to HEAD tree
  307. # This will remove files that were in stash but not in HEAD,
  308. # and restore files to their HEAD versions
  309. changes = tree_changes(self._repo.object_store, stash_tree_id, head_tree_id)
  310. update_working_tree(
  311. self._repo,
  312. old_tree_id=stash_tree_id,
  313. new_tree_id=head_tree_id,
  314. change_iterator=changes,
  315. allow_overwrite_modified=True, # We need to overwrite modified files
  316. )
  317. return cid
  318. def __getitem__(self, index: int) -> "Entry":
  319. """Get stash entry by index."""
  320. return list(self.stashes())[index]
  321. def __len__(self) -> int:
  322. """Return number of stash entries."""
  323. return len(list(self.stashes()))