stash.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. # stash.py
  2. # Copyright (C) 2018 Jelmer Vernooij <jelmer@samba.org>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Stash handling."""
  22. import os
  23. from typing import TYPE_CHECKING, Optional, TypedDict
  24. from .file import GitFile
  25. from .index import (
  26. IndexEntry,
  27. _tree_to_fs_path,
  28. build_file_from_blob,
  29. commit_tree,
  30. index_entry_from_stat,
  31. iter_fresh_objects,
  32. iter_tree_contents,
  33. symlink,
  34. update_working_tree,
  35. validate_path,
  36. validate_path_element_default,
  37. validate_path_element_ntfs,
  38. )
  39. from .objects import S_IFGITLINK, Blob, Commit, ObjectID
  40. from .reflog import drop_reflog_entry, read_reflog
  41. from .refs import Ref
  42. if TYPE_CHECKING:
  43. from .reflog import Entry
  44. from .repo import Repo
class CommitKwargs(TypedDict, total=False):
    """Keyword arguments for do_commit."""

    # total=False: both keys are optional, so callers only set the
    # identities they actually want to override.
    committer: bytes
    author: bytes
# Ref name used for the stash when the caller does not supply one.
DEFAULT_STASH_REF = b"refs/stash"
  50. class Stash:
  51. """A Git stash.
  52. Note that this doesn't currently update the working tree.
  53. """
  54. def __init__(self, repo: "Repo", ref: Ref = DEFAULT_STASH_REF) -> None:
  55. self._ref = ref
  56. self._repo = repo
  57. @property
  58. def _reflog_path(self) -> str:
  59. return os.path.join(self._repo.commondir(), "logs", os.fsdecode(self._ref))
  60. def stashes(self) -> list["Entry"]:
  61. try:
  62. with GitFile(self._reflog_path, "rb") as f:
  63. return list(reversed(list(read_reflog(f))))
  64. except FileNotFoundError:
  65. return []
  66. @classmethod
  67. def from_repo(cls, repo: "Repo") -> "Stash":
  68. """Create a new stash from a Repo object."""
  69. return cls(repo)
  70. def drop(self, index: int) -> None:
  71. """Drop entry with specified index."""
  72. with open(self._reflog_path, "rb+") as f:
  73. drop_reflog_entry(f, index, rewrite=True)
  74. if len(self) == 0:
  75. os.remove(self._reflog_path)
  76. del self._repo.refs[self._ref]
  77. return
  78. if index == 0:
  79. self._repo.refs[self._ref] = self[0].new_sha
  80. def pop(self, index: int) -> "Entry":
  81. """Pop a stash entry and apply its changes.
  82. Args:
  83. index: Index of the stash entry to pop (0 is the most recent)
  84. Returns:
  85. The stash entry that was popped
  86. """
  87. # Get the stash entry before removing it
  88. entry = self[index]
  89. # Get the stash commit
  90. stash_commit = self._repo.get_object(entry.new_sha)
  91. assert isinstance(stash_commit, Commit)
  92. # The stash commit has the working tree changes
  93. # Its first parent is the commit the stash was based on
  94. # Its second parent is the index commit
  95. if len(stash_commit.parents) < 1:
  96. raise ValueError("Invalid stash entry: no parent commits")
  97. base_commit_sha = stash_commit.parents[0]
  98. # Get current HEAD to determine if we can apply cleanly
  99. try:
  100. current_head = self._repo.refs[b"HEAD"]
  101. except KeyError:
  102. raise ValueError("Cannot pop stash: no HEAD")
  103. # Check if we're at the same commit where the stash was created
  104. # If not, we need to do a three-way merge
  105. if current_head != base_commit_sha:
  106. # For now, we'll apply changes directly but this could cause conflicts
  107. # A full implementation would do a three-way merge
  108. pass
  109. # Apply the stash changes to the working tree and index
  110. # Get config for working directory update
  111. config = self._repo.get_config()
  112. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  113. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  114. validate_path_element = validate_path_element_ntfs
  115. else:
  116. validate_path_element = validate_path_element_default
  117. if config.get_boolean(b"core", b"symlinks", True):
  118. symlink_fn = symlink
  119. else:
  120. def symlink_fn(source, target) -> None: # type: ignore
  121. mode = "w" + ("b" if isinstance(source, bytes) else "")
  122. with open(target, mode) as f:
  123. f.write(source)
  124. # Get blob normalizer for line ending conversion
  125. blob_normalizer = self._repo.get_blob_normalizer()
  126. # Open the index
  127. repo_index = self._repo.open_index()
  128. # Apply working tree changes
  129. stash_tree_id = stash_commit.tree
  130. repo_path = os.fsencode(self._repo.path)
  131. # First, if we have index changes (second parent), restore the index state
  132. if len(stash_commit.parents) >= 2:
  133. index_commit_sha = stash_commit.parents[1]
  134. index_commit = self._repo.get_object(index_commit_sha)
  135. assert isinstance(index_commit, Commit)
  136. index_tree_id = index_commit.tree
  137. # Update index entries from the stashed index tree
  138. for entry in iter_tree_contents(self._repo.object_store, index_tree_id):
  139. if not validate_path(entry.path, validate_path_element):
  140. continue
  141. # Add to index with stage 0 (normal)
  142. # Get file stats for the entry
  143. full_path = _tree_to_fs_path(repo_path, entry.path)
  144. try:
  145. st = os.lstat(full_path)
  146. except FileNotFoundError:
  147. # File doesn't exist yet, use dummy stats
  148. st = os.stat_result((entry.mode, 0, 0, 0, 0, 0, 0, 0, 0, 0))
  149. repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
  150. # Apply working tree changes from the stash
  151. for entry in iter_tree_contents(self._repo.object_store, stash_tree_id):
  152. if not validate_path(entry.path, validate_path_element):
  153. continue
  154. full_path = _tree_to_fs_path(repo_path, entry.path)
  155. # Create parent directories if needed
  156. parent_dir = os.path.dirname(full_path)
  157. if parent_dir and not os.path.exists(parent_dir):
  158. os.makedirs(parent_dir)
  159. # Write the file
  160. if entry.mode == S_IFGITLINK:
  161. # Submodule - just create directory
  162. if not os.path.isdir(full_path):
  163. os.mkdir(full_path)
  164. st = os.lstat(full_path)
  165. else:
  166. obj = self._repo.object_store[entry.sha]
  167. assert isinstance(obj, Blob)
  168. # Apply blob normalization for checkout if normalizer is provided
  169. if blob_normalizer is not None:
  170. obj = blob_normalizer.checkout_normalize(obj, entry.path)
  171. st = build_file_from_blob(
  172. obj,
  173. entry.mode,
  174. full_path,
  175. honor_filemode=honor_filemode,
  176. symlink_fn=symlink_fn,
  177. )
  178. # Update index if the file wasn't already staged
  179. if entry.path not in repo_index:
  180. # Update with file stats from disk
  181. repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
  182. else:
  183. existing_entry = repo_index[entry.path]
  184. if (
  185. isinstance(existing_entry, IndexEntry)
  186. and existing_entry.mode == entry.mode
  187. and existing_entry.sha == entry.sha
  188. ):
  189. # Update with file stats from disk
  190. repo_index[entry.path] = index_entry_from_stat(st, entry.sha)
  191. # Write the updated index
  192. repo_index.write()
  193. # Remove the stash entry
  194. self.drop(index)
  195. return entry
  196. def push(
  197. self,
  198. committer: Optional[bytes] = None,
  199. author: Optional[bytes] = None,
  200. message: Optional[bytes] = None,
  201. ) -> ObjectID:
  202. """Create a new stash.
  203. Args:
  204. committer: Optional committer name to use
  205. author: Optional author name to use
  206. message: Optional commit message
  207. """
  208. # First, create the index commit.
  209. commit_kwargs = CommitKwargs()
  210. if committer is not None:
  211. commit_kwargs["committer"] = committer
  212. if author is not None:
  213. commit_kwargs["author"] = author
  214. index = self._repo.open_index()
  215. index_tree_id = index.commit(self._repo.object_store)
  216. # Create a dangling commit for the index state
  217. # Note: We pass ref=None which is handled specially in do_commit
  218. # to create a commit without updating any reference
  219. index_commit_id = self._repo.do_commit(
  220. tree=index_tree_id,
  221. message=b"Index stash",
  222. merge_heads=[self._repo.head()],
  223. no_verify=True,
  224. ref=None, # Don't update any ref
  225. **commit_kwargs,
  226. )
  227. # Then, the working tree one.
  228. # Filter out entries with None values since commit_tree expects non-None values
  229. fresh_objects = [
  230. (path, sha, mode)
  231. for path, sha, mode in iter_fresh_objects(
  232. index,
  233. os.fsencode(self._repo.path),
  234. object_store=self._repo.object_store,
  235. )
  236. if sha is not None and mode is not None
  237. ]
  238. stash_tree_id = commit_tree(
  239. self._repo.object_store,
  240. fresh_objects,
  241. )
  242. if message is None:
  243. message = b"A stash on " + self._repo.head()
  244. # TODO(jelmer): Just pass parents into do_commit()?
  245. self._repo.refs[self._ref] = self._repo.head()
  246. cid = self._repo.do_commit(
  247. ref=self._ref,
  248. tree=stash_tree_id,
  249. message=message,
  250. merge_heads=[index_commit_id],
  251. no_verify=True,
  252. **commit_kwargs,
  253. )
  254. # Reset working tree and index to HEAD to match git's behavior
  255. # Use update_working_tree to reset from stash tree to HEAD tree
  256. # Get HEAD tree
  257. head_commit = self._repo.get_object(self._repo.head())
  258. assert isinstance(head_commit, Commit)
  259. head_tree_id = head_commit.tree
  260. # Update from stash tree to HEAD tree
  261. # This will remove files that were in stash but not in HEAD,
  262. # and restore files to their HEAD versions
  263. update_working_tree(
  264. self._repo,
  265. old_tree_id=stash_tree_id,
  266. new_tree_id=head_tree_id,
  267. )
  268. return cid
  269. def __getitem__(self, index: int) -> "Entry":
  270. return list(self.stashes())[index]
  271. def __len__(self) -> int:
  272. return len(list(self.stashes()))