worktree.py 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229
  1. # worktree.py -- Working tree operations for Git repositories
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Working tree operations for Git repositories."""
  22. from __future__ import annotations
  23. import builtins
  24. import os
  25. import shutil
  26. import stat
  27. import sys
  28. import tempfile
  29. import time
  30. import warnings
  31. from collections.abc import Iterable, Iterator
  32. from contextlib import contextmanager
  33. from pathlib import Path
  34. from typing import Any, Callable, Union
  35. from .errors import CommitError, HookError
  36. from .objects import Blob, Commit, ObjectID, Tag, Tree
  37. from .refs import SYMREF, Ref
  38. from .repo import (
  39. GITDIR,
  40. WORKTREES,
  41. Repo,
  42. check_user_identity,
  43. get_user_identity,
  44. )
  45. class WorkTreeInfo:
  46. """Information about a single worktree.
  47. Attributes:
  48. path: Path to the worktree
  49. head: Current HEAD commit SHA
  50. branch: Current branch (if not detached)
  51. bare: Whether this is a bare repository
  52. detached: Whether HEAD is detached
  53. locked: Whether the worktree is locked
  54. prunable: Whether the worktree can be pruned
  55. lock_reason: Reason for locking (if locked)
  56. """
  57. def __init__(
  58. self,
  59. path: str,
  60. head: bytes | None = None,
  61. branch: bytes | None = None,
  62. bare: bool = False,
  63. detached: bool = False,
  64. locked: bool = False,
  65. prunable: bool = False,
  66. lock_reason: str | None = None,
  67. ):
  68. """Initialize WorkTreeInfo.
  69. Args:
  70. path: Path to the worktree
  71. head: Current HEAD commit SHA
  72. branch: Current branch (if not detached)
  73. bare: Whether this is a bare repository
  74. detached: Whether HEAD is detached
  75. locked: Whether the worktree is locked
  76. prunable: Whether the worktree can be pruned
  77. lock_reason: Reason for locking (if locked)
  78. """
  79. self.path = path
  80. self.head = head
  81. self.branch = branch
  82. self.bare = bare
  83. self.detached = detached
  84. self.locked = locked
  85. self.prunable = prunable
  86. self.lock_reason = lock_reason
  87. def __repr__(self) -> str:
  88. """Return string representation of WorkTreeInfo."""
  89. return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})"
  90. def __eq__(self, other: object) -> bool:
  91. """Check equality with another WorkTreeInfo."""
  92. if not isinstance(other, WorkTreeInfo):
  93. return NotImplemented
  94. return (
  95. self.path == other.path
  96. and self.head == other.head
  97. and self.branch == other.branch
  98. and self.bare == other.bare
  99. and self.detached == other.detached
  100. and self.locked == other.locked
  101. and self.prunable == other.prunable
  102. and self.lock_reason == other.lock_reason
  103. )
  104. def open(self) -> WorkTree:
  105. """Open this worktree as a WorkTree.
  106. Returns:
  107. WorkTree object for this worktree
  108. Raises:
  109. NotGitRepository: If the worktree path is invalid
  110. """
  111. from .repo import Repo
  112. repo = Repo(self.path)
  113. return WorkTree(repo, self.path)
  114. class WorkTreeContainer:
  115. """Container for managing multiple working trees.
  116. This class manages worktrees for a repository, similar to how
  117. RefsContainer manages references.
  118. """
  119. def __init__(self, repo: Repo) -> None:
  120. """Initialize a WorkTreeContainer for the given repository.
  121. Args:
  122. repo: The repository this container belongs to
  123. """
  124. self._repo = repo
  125. def list(self) -> list[WorkTreeInfo]:
  126. """List all worktrees for this repository.
  127. Returns:
  128. A list of WorkTreeInfo objects
  129. """
  130. return list_worktrees(self._repo)
  131. def add(
  132. self,
  133. path: str | bytes | os.PathLike,
  134. branch: str | bytes | None = None,
  135. commit: ObjectID | None = None,
  136. force: bool = False,
  137. detach: bool = False,
  138. exist_ok: bool = False,
  139. ) -> Repo:
  140. """Add a new worktree.
  141. Args:
  142. path: Path where the new worktree should be created
  143. branch: Branch to checkout in the new worktree
  144. commit: Specific commit to checkout (results in detached HEAD)
  145. force: Force creation even if branch is already checked out elsewhere
  146. detach: Detach HEAD in the new worktree
  147. exist_ok: If True, do not raise an error if the directory already exists
  148. Returns:
  149. The newly created worktree repository
  150. """
  151. return add_worktree(
  152. self._repo,
  153. path,
  154. branch=branch,
  155. commit=commit,
  156. force=force,
  157. detach=detach,
  158. exist_ok=exist_ok,
  159. )
  160. def remove(self, path: str | bytes | os.PathLike, force: bool = False) -> None:
  161. """Remove a worktree.
  162. Args:
  163. path: Path to the worktree to remove
  164. force: Force removal even if there are local changes
  165. """
  166. remove_worktree(self._repo, path, force=force)
  167. def prune(
  168. self, expire: int | None = None, dry_run: bool = False
  169. ) -> builtins.list[str]:
  170. """Prune worktree administrative files for missing worktrees.
  171. Args:
  172. expire: Only prune worktrees older than this many seconds
  173. dry_run: Don't actually remove anything, just report what would be removed
  174. Returns:
  175. List of pruned worktree identifiers
  176. """
  177. return prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
  178. def move(
  179. self, old_path: str | bytes | os.PathLike, new_path: str | bytes | os.PathLike
  180. ) -> None:
  181. """Move a worktree to a new location.
  182. Args:
  183. old_path: Current path of the worktree
  184. new_path: New path for the worktree
  185. """
  186. move_worktree(self._repo, old_path, new_path)
  187. def lock(self, path: str | bytes | os.PathLike, reason: str | None = None) -> None:
  188. """Lock a worktree to prevent it from being pruned.
  189. Args:
  190. path: Path to the worktree to lock
  191. reason: Optional reason for locking
  192. """
  193. lock_worktree(self._repo, path, reason=reason)
  194. def unlock(self, path: str | bytes | os.PathLike) -> None:
  195. """Unlock a worktree.
  196. Args:
  197. path: Path to the worktree to unlock
  198. """
  199. unlock_worktree(self._repo, path)
  200. def __iter__(self) -> Iterator[WorkTreeInfo]:
  201. """Iterate over all worktrees."""
  202. yield from self.list()
  203. class WorkTree:
  204. """Working tree operations for a Git repository.
  205. This class provides methods for working with the working tree,
  206. such as staging files, committing changes, and resetting the index.
  207. """
  208. def __init__(self, repo: Repo, path: str | bytes | os.PathLike) -> None:
  209. """Initialize a WorkTree for the given repository.
  210. Args:
  211. repo: The repository this working tree belongs to
  212. path: Path to the working tree directory
  213. """
  214. self._repo = repo
  215. raw_path = os.fspath(path)
  216. if isinstance(raw_path, bytes):
  217. self.path: str = os.fsdecode(raw_path)
  218. else:
  219. self.path = raw_path
  220. self.path = os.path.abspath(self.path)
  221. def stage(
  222. self,
  223. fs_paths: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
  224. ) -> None:
  225. """Stage a set of paths.
  226. Args:
  227. fs_paths: List of paths, relative to the repository path
  228. """
  229. root_path_bytes = os.fsencode(self.path)
  230. if isinstance(fs_paths, (str, bytes, os.PathLike)):
  231. fs_paths = [fs_paths]
  232. fs_paths = list(fs_paths)
  233. from .index import (
  234. _fs_to_tree_path,
  235. blob_from_path_and_stat,
  236. index_entry_from_directory,
  237. index_entry_from_stat,
  238. )
  239. index = self._repo.open_index()
  240. blob_normalizer = self._repo.get_blob_normalizer()
  241. for fs_path in fs_paths:
  242. if not isinstance(fs_path, bytes):
  243. fs_path = os.fsencode(fs_path)
  244. if os.path.isabs(fs_path):
  245. raise ValueError(
  246. f"path {fs_path!r} should be relative to "
  247. "repository root, not absolute"
  248. )
  249. tree_path = _fs_to_tree_path(fs_path)
  250. full_path = os.path.join(root_path_bytes, fs_path)
  251. try:
  252. st = os.lstat(full_path)
  253. except (FileNotFoundError, NotADirectoryError):
  254. # File no longer exists
  255. try:
  256. del index[tree_path]
  257. except KeyError:
  258. pass # already removed
  259. else:
  260. if stat.S_ISDIR(st.st_mode):
  261. entry = index_entry_from_directory(st, full_path)
  262. if entry:
  263. index[tree_path] = entry
  264. else:
  265. try:
  266. del index[tree_path]
  267. except KeyError:
  268. pass
  269. elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
  270. try:
  271. del index[tree_path]
  272. except KeyError:
  273. pass
  274. else:
  275. blob = blob_from_path_and_stat(full_path, st)
  276. blob = blob_normalizer.checkin_normalize(blob, fs_path)
  277. self._repo.object_store.add_object(blob)
  278. index[tree_path] = index_entry_from_stat(st, blob.id)
  279. index.write()
  280. def unstage(self, fs_paths: list[str]) -> None:
  281. """Unstage specific file in the index.
  282. Args:
  283. fs_paths: a list of files to unstage,
  284. relative to the repository path.
  285. """
  286. from .index import IndexEntry, _fs_to_tree_path
  287. index = self._repo.open_index()
  288. try:
  289. commit = self._repo[b"HEAD"]
  290. except KeyError:
  291. # no head mean no commit in the repo
  292. for fs_path in fs_paths:
  293. tree_path = _fs_to_tree_path(fs_path)
  294. del index[tree_path]
  295. index.write()
  296. return
  297. else:
  298. assert isinstance(commit, Commit), "HEAD must be a commit"
  299. tree_id = commit.tree
  300. for fs_path in fs_paths:
  301. tree_path = _fs_to_tree_path(fs_path)
  302. try:
  303. tree = self._repo.object_store[tree_id]
  304. assert isinstance(tree, Tree)
  305. tree_entry = tree.lookup_path(
  306. self._repo.object_store.__getitem__, tree_path
  307. )
  308. except KeyError:
  309. # if tree_entry didn't exist, this file was being added, so
  310. # remove index entry
  311. try:
  312. del index[tree_path]
  313. continue
  314. except KeyError as exc:
  315. raise KeyError(f"file '{tree_path.decode()}' not in index") from exc
  316. st = None
  317. try:
  318. st = os.lstat(os.path.join(self.path, fs_path))
  319. except FileNotFoundError:
  320. pass
  321. blob_obj = self._repo[tree_entry[1]]
  322. assert isinstance(blob_obj, Blob)
  323. blob_size = len(blob_obj.data)
  324. index_entry = IndexEntry(
  325. ctime=(commit.commit_time, 0),
  326. mtime=(commit.commit_time, 0),
  327. dev=st.st_dev if st else 0,
  328. ino=st.st_ino if st else 0,
  329. mode=tree_entry[0],
  330. uid=st.st_uid if st else 0,
  331. gid=st.st_gid if st else 0,
  332. size=blob_size,
  333. sha=tree_entry[1],
  334. flags=0,
  335. extended_flags=0,
  336. )
  337. index[tree_path] = index_entry
  338. index.write()
  339. def commit(
  340. self,
  341. message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None,
  342. committer: bytes | None = None,
  343. author: bytes | None = None,
  344. commit_timestamp: float | None = None,
  345. commit_timezone: int | None = None,
  346. author_timestamp: float | None = None,
  347. author_timezone: int | None = None,
  348. tree: ObjectID | None = None,
  349. encoding: bytes | None = None,
  350. ref: Ref | None = b"HEAD",
  351. merge_heads: list[ObjectID] | None = None,
  352. no_verify: bool = False,
  353. sign: bool = False,
  354. ) -> ObjectID:
  355. """Create a new commit.
  356. If not specified, committer and author default to
  357. get_user_identity(..., 'COMMITTER')
  358. and get_user_identity(..., 'AUTHOR') respectively.
  359. Args:
  360. message: Commit message (bytes or callable that takes (repo, commit)
  361. and returns bytes)
  362. committer: Committer fullname
  363. author: Author fullname
  364. commit_timestamp: Commit timestamp (defaults to now)
  365. commit_timezone: Commit timestamp timezone (defaults to GMT)
  366. author_timestamp: Author timestamp (defaults to commit
  367. timestamp)
  368. author_timezone: Author timestamp timezone
  369. (defaults to commit timestamp timezone)
  370. tree: SHA1 of the tree root to use (if not specified the
  371. current index will be committed).
  372. encoding: Encoding
  373. ref: Optional ref to commit to (defaults to current branch).
  374. If None, creates a dangling commit without updating any ref.
  375. merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
  376. no_verify: Skip pre-commit and commit-msg hooks
  377. sign: GPG Sign the commit (bool, defaults to False,
  378. pass True to use default GPG key,
  379. pass a str containing Key ID to use a specific GPG key)
  380. Returns:
  381. New commit SHA1
  382. """
  383. try:
  384. if not no_verify:
  385. self._repo.hooks["pre-commit"].execute()
  386. except HookError as exc:
  387. raise CommitError(exc) from exc
  388. except KeyError: # no hook defined, silent fallthrough
  389. pass
  390. c = Commit()
  391. if tree is None:
  392. index = self._repo.open_index()
  393. c.tree = index.commit(self._repo.object_store)
  394. else:
  395. if len(tree) != 40:
  396. raise ValueError("tree must be a 40-byte hex sha string")
  397. c.tree = tree
  398. config = self._repo.get_config_stack()
  399. if merge_heads is None:
  400. merge_heads = self._repo._read_heads("MERGE_HEAD")
  401. if committer is None:
  402. committer = get_user_identity(config, kind="COMMITTER")
  403. check_user_identity(committer)
  404. c.committer = committer
  405. if commit_timestamp is None:
  406. # FIXME: Support GIT_COMMITTER_DATE environment variable
  407. commit_timestamp = time.time()
  408. c.commit_time = int(commit_timestamp)
  409. if commit_timezone is None:
  410. # FIXME: Use current user timezone rather than UTC
  411. commit_timezone = 0
  412. c.commit_timezone = commit_timezone
  413. if author is None:
  414. author = get_user_identity(config, kind="AUTHOR")
  415. c.author = author
  416. check_user_identity(author)
  417. if author_timestamp is None:
  418. # FIXME: Support GIT_AUTHOR_DATE environment variable
  419. author_timestamp = commit_timestamp
  420. c.author_time = int(author_timestamp)
  421. if author_timezone is None:
  422. author_timezone = commit_timezone
  423. c.author_timezone = author_timezone
  424. if encoding is None:
  425. try:
  426. encoding = config.get(("i18n",), "commitEncoding")
  427. except KeyError:
  428. pass # No dice
  429. if encoding is not None:
  430. c.encoding = encoding
  431. # Store original message (might be callable)
  432. original_message = message
  433. message = None # Will be set later after parents are set
  434. # Check if we should sign the commit
  435. should_sign = sign
  436. if sign is None:
  437. # Check commit.gpgSign configuration when sign is not explicitly set
  438. config = self._repo.get_config_stack()
  439. try:
  440. should_sign = config.get_boolean((b"commit",), b"gpgSign")
  441. except KeyError:
  442. should_sign = False # Default to not signing if no config
  443. keyid = sign if isinstance(sign, str) else None
  444. if ref is None:
  445. # Create a dangling commit
  446. c.parents = merge_heads
  447. else:
  448. try:
  449. old_head = self._repo.refs[ref]
  450. c.parents = [old_head, *merge_heads]
  451. except KeyError:
  452. c.parents = merge_heads
  453. # Handle message after parents are set
  454. if callable(original_message):
  455. message = original_message(self._repo, c)
  456. if message is None:
  457. raise ValueError("Message callback returned None")
  458. else:
  459. message = original_message
  460. if message is None:
  461. # FIXME: Try to read commit message from .git/MERGE_MSG
  462. raise ValueError("No commit message specified")
  463. try:
  464. if no_verify:
  465. c.message = message
  466. else:
  467. c.message = self._repo.hooks["commit-msg"].execute(message)
  468. if c.message is None:
  469. c.message = message
  470. except HookError as exc:
  471. raise CommitError(exc) from exc
  472. except KeyError: # no hook defined, message not modified
  473. c.message = message
  474. if ref is None:
  475. # Create a dangling commit
  476. if should_sign:
  477. c.sign(keyid)
  478. self._repo.object_store.add_object(c)
  479. else:
  480. try:
  481. old_head = self._repo.refs[ref]
  482. if should_sign:
  483. c.sign(keyid)
  484. self._repo.object_store.add_object(c)
  485. message_bytes = (
  486. message.encode() if isinstance(message, str) else message
  487. )
  488. ok = self._repo.refs.set_if_equals(
  489. ref,
  490. old_head,
  491. c.id,
  492. message=b"commit: " + message_bytes,
  493. committer=committer,
  494. timestamp=int(commit_timestamp)
  495. if commit_timestamp is not None
  496. else None,
  497. timezone=commit_timezone,
  498. )
  499. except KeyError:
  500. c.parents = merge_heads
  501. if should_sign:
  502. c.sign(keyid)
  503. self._repo.object_store.add_object(c)
  504. message_bytes = (
  505. message.encode() if isinstance(message, str) else message
  506. )
  507. ok = self._repo.refs.add_if_new(
  508. ref,
  509. c.id,
  510. message=b"commit: " + message_bytes,
  511. committer=committer,
  512. timestamp=int(commit_timestamp)
  513. if commit_timestamp is not None
  514. else None,
  515. timezone=commit_timezone,
  516. )
  517. if not ok:
  518. # Fail if the atomic compare-and-swap failed, leaving the
  519. # commit and all its objects as garbage.
  520. raise CommitError(f"{ref!r} changed during commit")
  521. self._repo._del_named_file("MERGE_HEAD")
  522. try:
  523. self._repo.hooks["post-commit"].execute()
  524. except HookError as e: # silent failure
  525. warnings.warn(f"post-commit hook failed: {e}", UserWarning)
  526. except KeyError: # no hook defined, silent fallthrough
  527. pass
  528. # Trigger auto GC if needed
  529. from .gc import maybe_auto_gc
  530. maybe_auto_gc(self._repo)
  531. return c.id
  532. def reset_index(self, tree: bytes | None = None) -> None:
  533. """Reset the index back to a specific tree.
  534. Args:
  535. tree: Tree SHA to reset to, None for current HEAD tree.
  536. """
  537. from .index import (
  538. build_index_from_tree,
  539. symlink,
  540. validate_path_element_default,
  541. validate_path_element_hfs,
  542. validate_path_element_ntfs,
  543. )
  544. if tree is None:
  545. head = self._repo[b"HEAD"]
  546. if isinstance(head, Tag):
  547. _cls, obj = head.object
  548. head = self._repo.get_object(obj)
  549. from .objects import Commit
  550. assert isinstance(head, Commit)
  551. tree = head.tree
  552. config = self._repo.get_config()
  553. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  554. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  555. validate_path_element = validate_path_element_ntfs
  556. elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
  557. validate_path_element = validate_path_element_hfs
  558. else:
  559. validate_path_element = validate_path_element_default
  560. if config.get_boolean(b"core", b"symlinks", True):
  561. symlink_fn = symlink
  562. else:
  563. def symlink_fn(
  564. src: Union[str, bytes, os.PathLike],
  565. dst: Union[str, bytes, os.PathLike],
  566. target_is_directory: bool = False,
  567. *,
  568. dir_fd: int | None = None,
  569. ) -> None:
  570. with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
  571. f.write(src)
  572. blob_normalizer = self._repo.get_blob_normalizer()
  573. return build_index_from_tree(
  574. self.path,
  575. self._repo.index_path(),
  576. self._repo.object_store,
  577. tree,
  578. honor_filemode=honor_filemode,
  579. validate_path_element=validate_path_element,
  580. symlink_fn=symlink_fn, # type: ignore[arg-type]
  581. blob_normalizer=blob_normalizer,
  582. )
  583. def _sparse_checkout_file_path(self) -> str:
  584. """Return the path of the sparse-checkout file in this repo's control dir."""
  585. return os.path.join(self._repo.controldir(), "info", "sparse-checkout")
  586. def configure_for_cone_mode(self) -> None:
  587. """Ensure the repository is configured for cone-mode sparse-checkout."""
  588. config = self._repo.get_config()
  589. config.set((b"core",), b"sparseCheckout", b"true")
  590. config.set((b"core",), b"sparseCheckoutCone", b"true")
  591. config.write_to_path()
  592. def infer_cone_mode(self) -> bool:
  593. """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
  594. config = self._repo.get_config()
  595. try:
  596. sc_cone = config.get((b"core",), b"sparseCheckoutCone")
  597. return sc_cone == b"true"
  598. except KeyError:
  599. # If core.sparseCheckoutCone is not set, default to False
  600. return False
  601. def get_sparse_checkout_patterns(self) -> list[str]:
  602. """Return a list of sparse-checkout patterns from info/sparse-checkout.
  603. Returns:
  604. A list of patterns. Returns an empty list if the file is missing.
  605. """
  606. path = self._sparse_checkout_file_path()
  607. try:
  608. with open(path, encoding="utf-8") as f:
  609. return [line.strip() for line in f if line.strip()]
  610. except FileNotFoundError:
  611. return []
  612. def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
  613. """Write the given sparse-checkout patterns into info/sparse-checkout.
  614. Creates the info/ directory if it does not exist.
  615. Args:
  616. patterns: A list of gitignore-style patterns to store.
  617. """
  618. info_dir = os.path.join(self._repo.controldir(), "info")
  619. os.makedirs(info_dir, exist_ok=True)
  620. path = self._sparse_checkout_file_path()
  621. with open(path, "w", encoding="utf-8") as f:
  622. for pat in patterns:
  623. f.write(pat + "\n")
  624. def set_cone_mode_patterns(self, dirs: list[str] | None = None) -> None:
  625. """Write the given cone-mode directory patterns into info/sparse-checkout.
  626. For each directory to include, add an inclusion line that "undoes" the prior
  627. ``!/*/`` 'exclude' that re-includes that directory and everything under it.
  628. Never add the same line twice.
  629. """
  630. patterns = ["/*", "!/*/"]
  631. if dirs:
  632. for d in dirs:
  633. d = d.strip("/")
  634. line = f"/{d}/"
  635. if d and line not in patterns:
  636. patterns.append(line)
  637. self.set_sparse_checkout_patterns(patterns)
  638. def read_worktree_lock_reason(worktree_path: str) -> str | None:
  639. """Read the lock reason for a worktree.
  640. Args:
  641. worktree_path: Path to the worktree's administrative directory
  642. Returns:
  643. The lock reason if the worktree is locked, None otherwise
  644. """
  645. locked_path = os.path.join(worktree_path, "locked")
  646. if not os.path.exists(locked_path):
  647. return None
  648. try:
  649. with open(locked_path) as f:
  650. return f.read().strip()
  651. except (FileNotFoundError, PermissionError):
  652. return None
  653. def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
  654. """List all worktrees for the given repository.
  655. Args:
  656. repo: The repository to list worktrees for
  657. Returns:
  658. A list of WorkTreeInfo objects
  659. """
  660. worktrees = []
  661. # Add main worktree
  662. main_wt_info = WorkTreeInfo(
  663. path=repo.path,
  664. head=repo.head(),
  665. bare=repo.bare,
  666. detached=False,
  667. locked=False,
  668. prunable=False,
  669. )
  670. # Get branch info for main worktree
  671. try:
  672. with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
  673. head_contents = f.read().strip()
  674. if head_contents.startswith(SYMREF):
  675. ref_name = head_contents[len(SYMREF) :].strip()
  676. main_wt_info.branch = ref_name
  677. else:
  678. main_wt_info.detached = True
  679. main_wt_info.branch = None
  680. except (FileNotFoundError, PermissionError):
  681. main_wt_info.branch = None
  682. main_wt_info.detached = True
  683. worktrees.append(main_wt_info)
  684. # List additional worktrees
  685. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  686. if os.path.isdir(worktrees_dir):
  687. for entry in os.listdir(worktrees_dir):
  688. worktree_path = os.path.join(worktrees_dir, entry)
  689. if not os.path.isdir(worktree_path):
  690. continue
  691. wt_info = WorkTreeInfo(
  692. path="", # Will be set below
  693. bare=False,
  694. detached=False,
  695. locked=False,
  696. prunable=False,
  697. )
  698. # Read gitdir to get actual worktree path
  699. gitdir_path = os.path.join(worktree_path, GITDIR)
  700. try:
  701. with open(gitdir_path, "rb") as f:
  702. gitdir_contents = f.read().strip()
  703. # Convert relative path to absolute if needed
  704. wt_path = os.fsdecode(gitdir_contents)
  705. if not os.path.isabs(wt_path):
  706. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  707. wt_info.path = os.path.dirname(wt_path) # Remove .git suffix
  708. except (FileNotFoundError, PermissionError):
  709. # Worktree directory is missing, skip it
  710. # TODO: Consider adding these as prunable worktrees with a placeholder path
  711. continue
  712. # Check if worktree path exists
  713. if wt_info.path and not os.path.exists(wt_info.path):
  714. wt_info.prunable = True
  715. # Read HEAD
  716. head_path = os.path.join(worktree_path, "HEAD")
  717. try:
  718. with open(head_path, "rb") as f:
  719. head_contents = f.read().strip()
  720. if head_contents.startswith(SYMREF):
  721. ref_name = head_contents[len(SYMREF) :].strip()
  722. wt_info.branch = ref_name
  723. # Resolve ref to get commit sha
  724. try:
  725. wt_info.head = repo.refs[ref_name]
  726. except KeyError:
  727. wt_info.head = None
  728. else:
  729. wt_info.detached = True
  730. wt_info.branch = None
  731. wt_info.head = head_contents
  732. except (FileNotFoundError, PermissionError):
  733. wt_info.head = None
  734. wt_info.branch = None
  735. # Check if locked
  736. lock_reason = read_worktree_lock_reason(worktree_path)
  737. if lock_reason is not None:
  738. wt_info.locked = True
  739. wt_info.lock_reason = lock_reason
  740. worktrees.append(wt_info)
  741. return worktrees
  742. def add_worktree(
  743. repo: Repo,
  744. path: str | bytes | os.PathLike,
  745. branch: str | bytes | None = None,
  746. commit: ObjectID | None = None,
  747. force: bool = False,
  748. detach: bool = False,
  749. exist_ok: bool = False,
  750. ) -> Repo:
  751. """Add a new worktree to the repository.
  752. Args:
  753. repo: The main repository
  754. path: Path where the new worktree should be created
  755. branch: Branch to checkout in the new worktree (creates if doesn't exist)
  756. commit: Specific commit to checkout (results in detached HEAD)
  757. force: Force creation even if branch is already checked out elsewhere
  758. detach: Detach HEAD in the new worktree
  759. exist_ok: If True, do not raise an error if the directory already exists
  760. Returns:
  761. The newly created worktree repository
  762. Raises:
  763. ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
  764. """
  765. from .repo import Repo as RepoClass
  766. path = os.fspath(path)
  767. if isinstance(path, bytes):
  768. path = os.fsdecode(path)
  769. # Check if path already exists
  770. if os.path.exists(path) and not exist_ok:
  771. raise ValueError(f"Path already exists: {path}")
  772. # Normalize branch name
  773. if branch is not None:
  774. if isinstance(branch, str):
  775. branch = branch.encode()
  776. if not branch.startswith(b"refs/heads/"):
  777. branch = b"refs/heads/" + branch
  778. # Check if branch is already checked out in another worktree
  779. if branch and not force:
  780. for wt in list_worktrees(repo):
  781. if wt.branch == branch:
  782. raise ValueError(
  783. f"Branch {branch.decode()} is already checked out at {wt.path}"
  784. )
  785. # Determine what to checkout
  786. if commit is not None:
  787. checkout_ref = commit
  788. detach = True
  789. elif branch is not None:
  790. # Check if branch exists
  791. try:
  792. checkout_ref = repo.refs[branch]
  793. except KeyError:
  794. if commit is None:
  795. # Create new branch from HEAD
  796. checkout_ref = repo.head()
  797. repo.refs[branch] = checkout_ref
  798. else:
  799. # Create new branch from specified commit
  800. checkout_ref = commit
  801. repo.refs[branch] = checkout_ref
  802. else:
  803. # Default to current HEAD
  804. checkout_ref = repo.head()
  805. detach = True
  806. # Create the worktree directory
  807. os.makedirs(path, exist_ok=exist_ok)
  808. # Initialize the worktree
  809. identifier = os.path.basename(path)
  810. wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)
  811. # Set HEAD appropriately
  812. if detach:
  813. # Detached HEAD - write SHA directly to HEAD
  814. with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
  815. f.write(checkout_ref + b"\n")
  816. else:
  817. # Point to branch
  818. wt_repo.refs.set_symbolic_ref(b"HEAD", branch)
  819. # Reset index to match HEAD
  820. wt_repo.get_worktree().reset_index()
  821. return wt_repo
  822. def remove_worktree(
  823. repo: Repo, path: str | bytes | os.PathLike, force: bool = False
  824. ) -> None:
  825. """Remove a worktree.
  826. Args:
  827. repo: The main repository
  828. path: Path to the worktree to remove
  829. force: Force removal even if there are local changes
  830. Raises:
  831. ValueError: If the worktree doesn't exist, has local changes, or is locked
  832. """
  833. path = os.fspath(path)
  834. if isinstance(path, bytes):
  835. path = os.fsdecode(path)
  836. # Don't allow removing the main worktree
  837. if os.path.abspath(path) == os.path.abspath(repo.path):
  838. raise ValueError("Cannot remove the main working tree")
  839. # Find the worktree
  840. worktree_found = False
  841. worktree_id = None
  842. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  843. if os.path.isdir(worktrees_dir):
  844. for entry in os.listdir(worktrees_dir):
  845. worktree_path = os.path.join(worktrees_dir, entry)
  846. gitdir_path = os.path.join(worktree_path, GITDIR)
  847. try:
  848. with open(gitdir_path, "rb") as f:
  849. gitdir_contents = f.read().strip()
  850. wt_path = os.fsdecode(gitdir_contents)
  851. if not os.path.isabs(wt_path):
  852. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  853. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  854. if os.path.abspath(wt_dir) == os.path.abspath(path):
  855. worktree_found = True
  856. worktree_id = entry
  857. break
  858. except (FileNotFoundError, PermissionError):
  859. continue
  860. if not worktree_found:
  861. raise ValueError(f"Worktree not found: {path}")
  862. assert worktree_id is not None # Should be set if worktree_found is True
  863. worktree_control_dir = os.path.join(worktrees_dir, worktree_id)
  864. # Check if locked
  865. if os.path.exists(os.path.join(worktree_control_dir, "locked")):
  866. if not force:
  867. raise ValueError(f"Worktree is locked: {path}")
  868. # Check for local changes if not forcing
  869. if not force and os.path.exists(path):
  870. # TODO: Check for uncommitted changes in the worktree
  871. pass
  872. # Remove the working directory
  873. if os.path.exists(path):
  874. shutil.rmtree(path)
  875. # Remove the administrative files
  876. shutil.rmtree(worktree_control_dir)
  877. def prune_worktrees(
  878. repo: Repo, expire: int | None = None, dry_run: bool = False
  879. ) -> list[str]:
  880. """Prune worktree administrative files for missing worktrees.
  881. Args:
  882. repo: The main repository
  883. expire: Only prune worktrees older than this many seconds
  884. dry_run: Don't actually remove anything, just report what would be removed
  885. Returns:
  886. List of pruned worktree identifiers
  887. """
  888. pruned: list[str] = []
  889. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  890. if not os.path.isdir(worktrees_dir):
  891. return pruned
  892. current_time = time.time()
  893. for entry in os.listdir(worktrees_dir):
  894. worktree_path = os.path.join(worktrees_dir, entry)
  895. if not os.path.isdir(worktree_path):
  896. continue
  897. # Skip locked worktrees
  898. if os.path.exists(os.path.join(worktree_path, "locked")):
  899. continue
  900. should_prune = False
  901. # Check if gitdir exists and points to valid location
  902. gitdir_path = os.path.join(worktree_path, GITDIR)
  903. try:
  904. with open(gitdir_path, "rb") as f:
  905. gitdir_contents = f.read().strip()
  906. wt_path = os.fsdecode(gitdir_contents)
  907. if not os.path.isabs(wt_path):
  908. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  909. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  910. if not os.path.exists(wt_dir):
  911. should_prune = True
  912. except (FileNotFoundError, PermissionError):
  913. should_prune = True
  914. # Check expiry time if specified
  915. if should_prune and expire is not None:
  916. stat_info = os.stat(worktree_path)
  917. age = current_time - stat_info.st_mtime
  918. if age < expire:
  919. should_prune = False
  920. if should_prune:
  921. pruned.append(entry)
  922. if not dry_run:
  923. shutil.rmtree(worktree_path)
  924. return pruned
  925. def lock_worktree(
  926. repo: Repo, path: str | bytes | os.PathLike, reason: str | None = None
  927. ) -> None:
  928. """Lock a worktree to prevent it from being pruned.
  929. Args:
  930. repo: The main repository
  931. path: Path to the worktree to lock
  932. reason: Optional reason for locking
  933. """
  934. worktree_id = _find_worktree_id(repo, path)
  935. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  936. lock_path = os.path.join(worktree_control_dir, "locked")
  937. with open(lock_path, "w") as f:
  938. if reason:
  939. f.write(reason)
  940. def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike) -> None:
  941. """Unlock a worktree.
  942. Args:
  943. repo: The main repository
  944. path: Path to the worktree to unlock
  945. """
  946. worktree_id = _find_worktree_id(repo, path)
  947. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  948. lock_path = os.path.join(worktree_control_dir, "locked")
  949. if os.path.exists(lock_path):
  950. os.remove(lock_path)
  951. def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike) -> str:
  952. """Find the worktree identifier for the given path.
  953. Args:
  954. repo: The main repository
  955. path: Path to the worktree
  956. Returns:
  957. The worktree identifier
  958. Raises:
  959. ValueError: If the worktree is not found
  960. """
  961. path = os.fspath(path)
  962. if isinstance(path, bytes):
  963. path = os.fsdecode(path)
  964. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  965. if os.path.isdir(worktrees_dir):
  966. for entry in os.listdir(worktrees_dir):
  967. worktree_path = os.path.join(worktrees_dir, entry)
  968. gitdir_path = os.path.join(worktree_path, GITDIR)
  969. try:
  970. with open(gitdir_path, "rb") as f:
  971. gitdir_contents = f.read().strip()
  972. wt_path = os.fsdecode(gitdir_contents)
  973. if not os.path.isabs(wt_path):
  974. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  975. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  976. if os.path.abspath(wt_dir) == os.path.abspath(path):
  977. return entry
  978. except (FileNotFoundError, PermissionError):
  979. continue
  980. raise ValueError(f"Worktree not found: {path}")
  981. def move_worktree(
  982. repo: Repo,
  983. old_path: str | bytes | os.PathLike,
  984. new_path: str | bytes | os.PathLike,
  985. ) -> None:
  986. """Move a worktree to a new location.
  987. Args:
  988. repo: The main repository
  989. old_path: Current path of the worktree
  990. new_path: New path for the worktree
  991. Raises:
  992. ValueError: If the worktree doesn't exist or new path already exists
  993. """
  994. old_path = os.fspath(old_path)
  995. new_path = os.fspath(new_path)
  996. if isinstance(old_path, bytes):
  997. old_path = os.fsdecode(old_path)
  998. if isinstance(new_path, bytes):
  999. new_path = os.fsdecode(new_path)
  1000. # Don't allow moving the main worktree
  1001. if os.path.abspath(old_path) == os.path.abspath(repo.path):
  1002. raise ValueError("Cannot move the main working tree")
  1003. # Check if new path already exists
  1004. if os.path.exists(new_path):
  1005. raise ValueError(f"Path already exists: {new_path}")
  1006. # Find the worktree
  1007. worktree_id = _find_worktree_id(repo, old_path)
  1008. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  1009. # Move the actual worktree directory
  1010. shutil.move(old_path, new_path)
  1011. # Update the gitdir file in the worktree
  1012. gitdir_file = os.path.join(new_path, ".git")
  1013. # Update the gitdir pointer in the control directory
  1014. with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
  1015. f.write(os.fsencode(gitdir_file) + b"\n")
  1016. @contextmanager
  1017. def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
  1018. """Create a temporary worktree that is automatically cleaned up.
  1019. Args:
  1020. repo: Dulwich repository object
  1021. prefix: Prefix for the temporary directory name
  1022. Yields:
  1023. Worktree object
  1024. """
  1025. temp_dir = None
  1026. worktree = None
  1027. try:
  1028. # Create temporary directory
  1029. temp_dir = tempfile.mkdtemp(prefix=prefix)
  1030. # Add worktree
  1031. worktree = repo.worktrees.add(temp_dir, exist_ok=True)
  1032. yield worktree
  1033. finally:
  1034. # Clean up worktree registration
  1035. if worktree:
  1036. repo.worktrees.remove(worktree.path)
  1037. # Clean up temporary directory
  1038. if temp_dir and Path(temp_dir).exists():
  1039. shutil.rmtree(temp_dir)