worktree.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143
  1. # worktree.py -- Working tree operations for Git repositories
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Working tree operations for Git repositories."""
  22. from __future__ import annotations
  23. import builtins
  24. import os
  25. import shutil
  26. import stat
  27. import sys
  28. import time
  29. import warnings
  30. from collections.abc import Iterable
  31. from .errors import CommitError, HookError
  32. from .objects import Commit, ObjectID, Tag, Tree
  33. from .refs import SYMREF, Ref
  34. from .repo import (
  35. GITDIR,
  36. WORKTREES,
  37. Repo,
  38. check_user_identity,
  39. get_user_identity,
  40. )
  41. class WorkTreeInfo:
  42. """Information about a single worktree.
  43. Attributes:
  44. path: Path to the worktree
  45. head: Current HEAD commit SHA
  46. branch: Current branch (if not detached)
  47. bare: Whether this is a bare repository
  48. detached: Whether HEAD is detached
  49. locked: Whether the worktree is locked
  50. prunable: Whether the worktree can be pruned
  51. lock_reason: Reason for locking (if locked)
  52. """
  53. def __init__(
  54. self,
  55. path: str,
  56. head: bytes | None = None,
  57. branch: bytes | None = None,
  58. bare: bool = False,
  59. detached: bool = False,
  60. locked: bool = False,
  61. prunable: bool = False,
  62. lock_reason: str | None = None,
  63. ):
  64. self.path = path
  65. self.head = head
  66. self.branch = branch
  67. self.bare = bare
  68. self.detached = detached
  69. self.locked = locked
  70. self.prunable = prunable
  71. self.lock_reason = lock_reason
  72. def __repr__(self) -> str:
  73. return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})"
  74. def __eq__(self, other: object) -> bool:
  75. if not isinstance(other, WorkTreeInfo):
  76. return NotImplemented
  77. return (
  78. self.path == other.path
  79. and self.head == other.head
  80. and self.branch == other.branch
  81. and self.bare == other.bare
  82. and self.detached == other.detached
  83. and self.locked == other.locked
  84. and self.prunable == other.prunable
  85. and self.lock_reason == other.lock_reason
  86. )
  87. def open(self) -> WorkTree:
  88. """Open this worktree as a WorkTree.
  89. Returns:
  90. WorkTree object for this worktree
  91. Raises:
  92. NotGitRepository: If the worktree path is invalid
  93. """
  94. from .repo import Repo
  95. repo = Repo(self.path)
  96. return WorkTree(repo, self.path)
  97. class WorkTreeContainer:
  98. """Container for managing multiple working trees.
  99. This class manages worktrees for a repository, similar to how
  100. RefsContainer manages references.
  101. """
  102. def __init__(self, repo: Repo) -> None:
  103. """Initialize a WorkTreeContainer for the given repository.
  104. Args:
  105. repo: The repository this container belongs to
  106. """
  107. self._repo = repo
  108. def list(self) -> list[WorkTreeInfo]:
  109. """List all worktrees for this repository.
  110. Returns:
  111. A list of WorkTreeInfo objects
  112. """
  113. return list_worktrees(self._repo)
  114. def add(
  115. self,
  116. path: str | bytes | os.PathLike,
  117. branch: str | bytes | None = None,
  118. commit: ObjectID | None = None,
  119. force: bool = False,
  120. detach: bool = False,
  121. ) -> Repo:
  122. """Add a new worktree.
  123. Args:
  124. path: Path where the new worktree should be created
  125. branch: Branch to checkout in the new worktree
  126. commit: Specific commit to checkout (results in detached HEAD)
  127. force: Force creation even if branch is already checked out elsewhere
  128. detach: Detach HEAD in the new worktree
  129. Returns:
  130. The newly created worktree repository
  131. """
  132. return add_worktree(
  133. self._repo, path, branch=branch, commit=commit, force=force, detach=detach
  134. )
  135. def remove(self, path: str | bytes | os.PathLike, force: bool = False) -> None:
  136. """Remove a worktree.
  137. Args:
  138. path: Path to the worktree to remove
  139. force: Force removal even if there are local changes
  140. """
  141. remove_worktree(self._repo, path, force=force)
  142. def prune(
  143. self, expire: int | None = None, dry_run: bool = False
  144. ) -> builtins.list[str]:
  145. """Prune worktree administrative files for missing worktrees.
  146. Args:
  147. expire: Only prune worktrees older than this many seconds
  148. dry_run: Don't actually remove anything, just report what would be removed
  149. Returns:
  150. List of pruned worktree identifiers
  151. """
  152. return prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
  153. def move(
  154. self, old_path: str | bytes | os.PathLike, new_path: str | bytes | os.PathLike
  155. ) -> None:
  156. """Move a worktree to a new location.
  157. Args:
  158. old_path: Current path of the worktree
  159. new_path: New path for the worktree
  160. """
  161. move_worktree(self._repo, old_path, new_path)
  162. def lock(self, path: str | bytes | os.PathLike, reason: str | None = None) -> None:
  163. """Lock a worktree to prevent it from being pruned.
  164. Args:
  165. path: Path to the worktree to lock
  166. reason: Optional reason for locking
  167. """
  168. lock_worktree(self._repo, path, reason=reason)
  169. def unlock(self, path: str | bytes | os.PathLike) -> None:
  170. """Unlock a worktree.
  171. Args:
  172. path: Path to the worktree to unlock
  173. """
  174. unlock_worktree(self._repo, path)
  175. def __iter__(self):
  176. """Iterate over all worktrees."""
  177. yield from self.list()
  178. class WorkTree:
  179. """Working tree operations for a Git repository.
  180. This class provides methods for working with the working tree,
  181. such as staging files, committing changes, and resetting the index.
  182. """
  183. def __init__(self, repo: Repo, path: str | bytes | os.PathLike) -> None:
  184. """Initialize a WorkTree for the given repository.
  185. Args:
  186. repo: The repository this working tree belongs to
  187. path: Path to the working tree directory
  188. """
  189. self._repo = repo
  190. raw_path = os.fspath(path)
  191. if isinstance(raw_path, bytes):
  192. self.path: str = os.fsdecode(raw_path)
  193. else:
  194. self.path = raw_path
  195. self.path = os.path.abspath(self.path)
  196. def stage(
  197. self,
  198. fs_paths: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
  199. ) -> None:
  200. """Stage a set of paths.
  201. Args:
  202. fs_paths: List of paths, relative to the repository path
  203. """
  204. root_path_bytes = os.fsencode(self.path)
  205. if isinstance(fs_paths, (str, bytes, os.PathLike)):
  206. fs_paths = [fs_paths]
  207. fs_paths = list(fs_paths)
  208. from .index import (
  209. _fs_to_tree_path,
  210. blob_from_path_and_stat,
  211. index_entry_from_directory,
  212. index_entry_from_stat,
  213. )
  214. index = self._repo.open_index()
  215. blob_normalizer = self._repo.get_blob_normalizer()
  216. for fs_path in fs_paths:
  217. if not isinstance(fs_path, bytes):
  218. fs_path = os.fsencode(fs_path)
  219. if os.path.isabs(fs_path):
  220. raise ValueError(
  221. f"path {fs_path!r} should be relative to "
  222. "repository root, not absolute"
  223. )
  224. tree_path = _fs_to_tree_path(fs_path)
  225. full_path = os.path.join(root_path_bytes, fs_path)
  226. try:
  227. st = os.lstat(full_path)
  228. except (FileNotFoundError, NotADirectoryError):
  229. # File no longer exists
  230. try:
  231. del index[tree_path]
  232. except KeyError:
  233. pass # already removed
  234. else:
  235. if stat.S_ISDIR(st.st_mode):
  236. entry = index_entry_from_directory(st, full_path)
  237. if entry:
  238. index[tree_path] = entry
  239. else:
  240. try:
  241. del index[tree_path]
  242. except KeyError:
  243. pass
  244. elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
  245. try:
  246. del index[tree_path]
  247. except KeyError:
  248. pass
  249. else:
  250. blob = blob_from_path_and_stat(full_path, st)
  251. blob = blob_normalizer.checkin_normalize(blob, fs_path)
  252. self._repo.object_store.add_object(blob)
  253. index[tree_path] = index_entry_from_stat(st, blob.id)
  254. index.write()
  255. def unstage(self, fs_paths: list[str]) -> None:
  256. """Unstage specific file in the index
  257. Args:
  258. fs_paths: a list of files to unstage,
  259. relative to the repository path.
  260. """
  261. from .index import IndexEntry, _fs_to_tree_path
  262. index = self._repo.open_index()
  263. try:
  264. tree_id = self._repo[b"HEAD"].tree
  265. except KeyError:
  266. # no head mean no commit in the repo
  267. for fs_path in fs_paths:
  268. tree_path = _fs_to_tree_path(fs_path)
  269. del index[tree_path]
  270. index.write()
  271. return
  272. for fs_path in fs_paths:
  273. tree_path = _fs_to_tree_path(fs_path)
  274. try:
  275. tree = self._repo.object_store[tree_id]
  276. assert isinstance(tree, Tree)
  277. tree_entry = tree.lookup_path(
  278. self._repo.object_store.__getitem__, tree_path
  279. )
  280. except KeyError:
  281. # if tree_entry didn't exist, this file was being added, so
  282. # remove index entry
  283. try:
  284. del index[tree_path]
  285. continue
  286. except KeyError as exc:
  287. raise KeyError(f"file '{tree_path.decode()}' not in index") from exc
  288. st = None
  289. try:
  290. st = os.lstat(os.path.join(self.path, fs_path))
  291. except FileNotFoundError:
  292. pass
  293. index_entry = IndexEntry(
  294. ctime=(self._repo[b"HEAD"].commit_time, 0),
  295. mtime=(self._repo[b"HEAD"].commit_time, 0),
  296. dev=st.st_dev if st else 0,
  297. ino=st.st_ino if st else 0,
  298. mode=tree_entry[0],
  299. uid=st.st_uid if st else 0,
  300. gid=st.st_gid if st else 0,
  301. size=len(self._repo[tree_entry[1]].data),
  302. sha=tree_entry[1],
  303. flags=0,
  304. extended_flags=0,
  305. )
  306. index[tree_path] = index_entry
  307. index.write()
  308. def commit(
  309. self,
  310. message: bytes | None = None,
  311. committer: bytes | None = None,
  312. author: bytes | None = None,
  313. commit_timestamp=None,
  314. commit_timezone=None,
  315. author_timestamp=None,
  316. author_timezone=None,
  317. tree: ObjectID | None = None,
  318. encoding: bytes | None = None,
  319. ref: Ref | None = b"HEAD",
  320. merge_heads: list[ObjectID] | None = None,
  321. no_verify: bool = False,
  322. sign: bool = False,
  323. ):
  324. """Create a new commit.
  325. If not specified, committer and author default to
  326. get_user_identity(..., 'COMMITTER')
  327. and get_user_identity(..., 'AUTHOR') respectively.
  328. Args:
  329. message: Commit message (bytes or callable that takes (repo, commit)
  330. and returns bytes)
  331. committer: Committer fullname
  332. author: Author fullname
  333. commit_timestamp: Commit timestamp (defaults to now)
  334. commit_timezone: Commit timestamp timezone (defaults to GMT)
  335. author_timestamp: Author timestamp (defaults to commit
  336. timestamp)
  337. author_timezone: Author timestamp timezone
  338. (defaults to commit timestamp timezone)
  339. tree: SHA1 of the tree root to use (if not specified the
  340. current index will be committed).
  341. encoding: Encoding
  342. ref: Optional ref to commit to (defaults to current branch).
  343. If None, creates a dangling commit without updating any ref.
  344. merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
  345. no_verify: Skip pre-commit and commit-msg hooks
  346. sign: GPG Sign the commit (bool, defaults to False,
  347. pass True to use default GPG key,
  348. pass a str containing Key ID to use a specific GPG key)
  349. Returns:
  350. New commit SHA1
  351. """
  352. try:
  353. if not no_verify:
  354. self._repo.hooks["pre-commit"].execute()
  355. except HookError as exc:
  356. raise CommitError(exc) from exc
  357. except KeyError: # no hook defined, silent fallthrough
  358. pass
  359. c = Commit()
  360. if tree is None:
  361. index = self._repo.open_index()
  362. c.tree = index.commit(self._repo.object_store)
  363. else:
  364. if len(tree) != 40:
  365. raise ValueError("tree must be a 40-byte hex sha string")
  366. c.tree = tree
  367. config = self._repo.get_config_stack()
  368. if merge_heads is None:
  369. merge_heads = self._repo._read_heads("MERGE_HEAD")
  370. if committer is None:
  371. committer = get_user_identity(config, kind="COMMITTER")
  372. check_user_identity(committer)
  373. c.committer = committer
  374. if commit_timestamp is None:
  375. # FIXME: Support GIT_COMMITTER_DATE environment variable
  376. commit_timestamp = time.time()
  377. c.commit_time = int(commit_timestamp)
  378. if commit_timezone is None:
  379. # FIXME: Use current user timezone rather than UTC
  380. commit_timezone = 0
  381. c.commit_timezone = commit_timezone
  382. if author is None:
  383. author = get_user_identity(config, kind="AUTHOR")
  384. c.author = author
  385. check_user_identity(author)
  386. if author_timestamp is None:
  387. # FIXME: Support GIT_AUTHOR_DATE environment variable
  388. author_timestamp = commit_timestamp
  389. c.author_time = int(author_timestamp)
  390. if author_timezone is None:
  391. author_timezone = commit_timezone
  392. c.author_timezone = author_timezone
  393. if encoding is None:
  394. try:
  395. encoding = config.get(("i18n",), "commitEncoding")
  396. except KeyError:
  397. pass # No dice
  398. if encoding is not None:
  399. c.encoding = encoding
  400. # Store original message (might be callable)
  401. original_message = message
  402. message = None # Will be set later after parents are set
  403. # Check if we should sign the commit
  404. should_sign = sign
  405. if sign is None:
  406. # Check commit.gpgSign configuration when sign is not explicitly set
  407. config = self._repo.get_config_stack()
  408. try:
  409. should_sign = config.get_boolean((b"commit",), b"gpgSign")
  410. except KeyError:
  411. should_sign = False # Default to not signing if no config
  412. keyid = sign if isinstance(sign, str) else None
  413. if ref is None:
  414. # Create a dangling commit
  415. c.parents = merge_heads
  416. else:
  417. try:
  418. old_head = self._repo.refs[ref]
  419. c.parents = [old_head, *merge_heads]
  420. except KeyError:
  421. c.parents = merge_heads
  422. # Handle message after parents are set
  423. if callable(original_message):
  424. message = original_message(self._repo, c)
  425. if message is None:
  426. raise ValueError("Message callback returned None")
  427. else:
  428. message = original_message
  429. if message is None:
  430. # FIXME: Try to read commit message from .git/MERGE_MSG
  431. raise ValueError("No commit message specified")
  432. try:
  433. if no_verify:
  434. c.message = message
  435. else:
  436. c.message = self._repo.hooks["commit-msg"].execute(message)
  437. if c.message is None:
  438. c.message = message
  439. except HookError as exc:
  440. raise CommitError(exc) from exc
  441. except KeyError: # no hook defined, message not modified
  442. c.message = message
  443. if ref is None:
  444. # Create a dangling commit
  445. if should_sign:
  446. c.sign(keyid)
  447. self._repo.object_store.add_object(c)
  448. else:
  449. try:
  450. old_head = self._repo.refs[ref]
  451. if should_sign:
  452. c.sign(keyid)
  453. self._repo.object_store.add_object(c)
  454. ok = self._repo.refs.set_if_equals(
  455. ref,
  456. old_head,
  457. c.id,
  458. message=b"commit: " + message,
  459. committer=committer,
  460. timestamp=commit_timestamp,
  461. timezone=commit_timezone,
  462. )
  463. except KeyError:
  464. c.parents = merge_heads
  465. if should_sign:
  466. c.sign(keyid)
  467. self._repo.object_store.add_object(c)
  468. ok = self._repo.refs.add_if_new(
  469. ref,
  470. c.id,
  471. message=b"commit: " + message,
  472. committer=committer,
  473. timestamp=commit_timestamp,
  474. timezone=commit_timezone,
  475. )
  476. if not ok:
  477. # Fail if the atomic compare-and-swap failed, leaving the
  478. # commit and all its objects as garbage.
  479. raise CommitError(f"{ref!r} changed during commit")
  480. self._repo._del_named_file("MERGE_HEAD")
  481. try:
  482. self._repo.hooks["post-commit"].execute()
  483. except HookError as e: # silent failure
  484. warnings.warn(f"post-commit hook failed: {e}", UserWarning)
  485. except KeyError: # no hook defined, silent fallthrough
  486. pass
  487. # Trigger auto GC if needed
  488. from .gc import maybe_auto_gc
  489. maybe_auto_gc(self._repo)
  490. return c.id
  491. def reset_index(self, tree: bytes | None = None):
  492. """Reset the index back to a specific tree.
  493. Args:
  494. tree: Tree SHA to reset to, None for current HEAD tree.
  495. """
  496. from .index import (
  497. build_index_from_tree,
  498. symlink,
  499. validate_path_element_default,
  500. validate_path_element_hfs,
  501. validate_path_element_ntfs,
  502. )
  503. if tree is None:
  504. head = self._repo[b"HEAD"]
  505. if isinstance(head, Tag):
  506. _cls, obj = head.object
  507. head = self._repo.get_object(obj)
  508. tree = head.tree
  509. config = self._repo.get_config()
  510. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  511. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  512. validate_path_element = validate_path_element_ntfs
  513. elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
  514. validate_path_element = validate_path_element_hfs
  515. else:
  516. validate_path_element = validate_path_element_default
  517. if config.get_boolean(b"core", b"symlinks", True):
  518. symlink_fn = symlink
  519. else:
  520. def symlink_fn(source, target) -> None: # type: ignore
  521. with open(
  522. target, "w" + ("b" if isinstance(source, bytes) else "")
  523. ) as f:
  524. f.write(source)
  525. blob_normalizer = self._repo.get_blob_normalizer()
  526. return build_index_from_tree(
  527. self.path,
  528. self._repo.index_path(),
  529. self._repo.object_store,
  530. tree,
  531. honor_filemode=honor_filemode,
  532. validate_path_element=validate_path_element,
  533. symlink_fn=symlink_fn,
  534. blob_normalizer=blob_normalizer,
  535. )
  536. def _sparse_checkout_file_path(self) -> str:
  537. """Return the path of the sparse-checkout file in this repo's control dir."""
  538. return os.path.join(self._repo.controldir(), "info", "sparse-checkout")
  539. def configure_for_cone_mode(self) -> None:
  540. """Ensure the repository is configured for cone-mode sparse-checkout."""
  541. config = self._repo.get_config()
  542. config.set((b"core",), b"sparseCheckout", b"true")
  543. config.set((b"core",), b"sparseCheckoutCone", b"true")
  544. config.write_to_path()
  545. def infer_cone_mode(self) -> bool:
  546. """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
  547. config = self._repo.get_config()
  548. try:
  549. sc_cone = config.get((b"core",), b"sparseCheckoutCone")
  550. return sc_cone == b"true"
  551. except KeyError:
  552. # If core.sparseCheckoutCone is not set, default to False
  553. return False
  554. def get_sparse_checkout_patterns(self) -> list[str]:
  555. """Return a list of sparse-checkout patterns from info/sparse-checkout.
  556. Returns:
  557. A list of patterns. Returns an empty list if the file is missing.
  558. """
  559. path = self._sparse_checkout_file_path()
  560. try:
  561. with open(path, encoding="utf-8") as f:
  562. return [line.strip() for line in f if line.strip()]
  563. except FileNotFoundError:
  564. return []
  565. def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
  566. """Write the given sparse-checkout patterns into info/sparse-checkout.
  567. Creates the info/ directory if it does not exist.
  568. Args:
  569. patterns: A list of gitignore-style patterns to store.
  570. """
  571. info_dir = os.path.join(self._repo.controldir(), "info")
  572. os.makedirs(info_dir, exist_ok=True)
  573. path = self._sparse_checkout_file_path()
  574. with open(path, "w", encoding="utf-8") as f:
  575. for pat in patterns:
  576. f.write(pat + "\n")
  577. def set_cone_mode_patterns(self, dirs: list[str] | None = None) -> None:
  578. """Write the given cone-mode directory patterns into info/sparse-checkout.
  579. For each directory to include, add an inclusion line that "undoes" the prior
  580. ``!/*/`` 'exclude' that re-includes that directory and everything under it.
  581. Never add the same line twice.
  582. """
  583. patterns = ["/*", "!/*/"]
  584. if dirs:
  585. for d in dirs:
  586. d = d.strip("/")
  587. line = f"/{d}/"
  588. if d and line not in patterns:
  589. patterns.append(line)
  590. self.set_sparse_checkout_patterns(patterns)
  591. def read_worktree_lock_reason(worktree_path: str) -> str | None:
  592. """Read the lock reason for a worktree.
  593. Args:
  594. worktree_path: Path to the worktree's administrative directory
  595. Returns:
  596. The lock reason if the worktree is locked, None otherwise
  597. """
  598. locked_path = os.path.join(worktree_path, "locked")
  599. if not os.path.exists(locked_path):
  600. return None
  601. try:
  602. with open(locked_path) as f:
  603. return f.read().strip()
  604. except (FileNotFoundError, PermissionError):
  605. return None
  606. def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
  607. """List all worktrees for the given repository.
  608. Args:
  609. repo: The repository to list worktrees for
  610. Returns:
  611. A list of WorkTreeInfo objects
  612. """
  613. worktrees = []
  614. # Add main worktree
  615. main_wt_info = WorkTreeInfo(
  616. path=repo.path,
  617. head=repo.head(),
  618. bare=repo.bare,
  619. detached=False,
  620. locked=False,
  621. prunable=False,
  622. )
  623. # Get branch info for main worktree
  624. try:
  625. with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
  626. head_contents = f.read().strip()
  627. if head_contents.startswith(SYMREF):
  628. ref_name = head_contents[len(SYMREF) :].strip()
  629. main_wt_info.branch = ref_name
  630. else:
  631. main_wt_info.detached = True
  632. main_wt_info.branch = None
  633. except (FileNotFoundError, PermissionError):
  634. main_wt_info.branch = None
  635. main_wt_info.detached = True
  636. worktrees.append(main_wt_info)
  637. # List additional worktrees
  638. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  639. if os.path.isdir(worktrees_dir):
  640. for entry in os.listdir(worktrees_dir):
  641. worktree_path = os.path.join(worktrees_dir, entry)
  642. if not os.path.isdir(worktree_path):
  643. continue
  644. wt_info = WorkTreeInfo(
  645. path="", # Will be set below
  646. bare=False,
  647. detached=False,
  648. locked=False,
  649. prunable=False,
  650. )
  651. # Read gitdir to get actual worktree path
  652. gitdir_path = os.path.join(worktree_path, GITDIR)
  653. try:
  654. with open(gitdir_path, "rb") as f:
  655. gitdir_contents = f.read().strip()
  656. # Convert relative path to absolute if needed
  657. wt_path = os.fsdecode(gitdir_contents)
  658. if not os.path.isabs(wt_path):
  659. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  660. wt_info.path = os.path.dirname(wt_path) # Remove .git suffix
  661. except (FileNotFoundError, PermissionError):
  662. # Worktree directory is missing, skip it
  663. # TODO: Consider adding these as prunable worktrees with a placeholder path
  664. continue
  665. # Check if worktree path exists
  666. if wt_info.path and not os.path.exists(wt_info.path):
  667. wt_info.prunable = True
  668. # Read HEAD
  669. head_path = os.path.join(worktree_path, "HEAD")
  670. try:
  671. with open(head_path, "rb") as f:
  672. head_contents = f.read().strip()
  673. if head_contents.startswith(SYMREF):
  674. ref_name = head_contents[len(SYMREF) :].strip()
  675. wt_info.branch = ref_name
  676. # Resolve ref to get commit sha
  677. try:
  678. wt_info.head = repo.refs[ref_name]
  679. except KeyError:
  680. wt_info.head = None
  681. else:
  682. wt_info.detached = True
  683. wt_info.branch = None
  684. wt_info.head = head_contents
  685. except (FileNotFoundError, PermissionError):
  686. wt_info.head = None
  687. wt_info.branch = None
  688. # Check if locked
  689. lock_reason = read_worktree_lock_reason(worktree_path)
  690. if lock_reason is not None:
  691. wt_info.locked = True
  692. wt_info.lock_reason = lock_reason
  693. worktrees.append(wt_info)
  694. return worktrees
  695. def add_worktree(
  696. repo: Repo,
  697. path: str | bytes | os.PathLike,
  698. branch: str | bytes | None = None,
  699. commit: ObjectID | None = None,
  700. force: bool = False,
  701. detach: bool = False,
  702. ) -> Repo:
  703. """Add a new worktree to the repository.
  704. Args:
  705. repo: The main repository
  706. path: Path where the new worktree should be created
  707. branch: Branch to checkout in the new worktree (creates if doesn't exist)
  708. commit: Specific commit to checkout (results in detached HEAD)
  709. force: Force creation even if branch is already checked out elsewhere
  710. detach: Detach HEAD in the new worktree
  711. Returns:
  712. The newly created worktree repository
  713. Raises:
  714. ValueError: If the path already exists or branch is already checked out
  715. """
  716. from .repo import Repo as RepoClass
  717. path = os.fspath(path)
  718. if isinstance(path, bytes):
  719. path = os.fsdecode(path)
  720. # Check if path already exists
  721. if os.path.exists(path):
  722. raise ValueError(f"Path already exists: {path}")
  723. # Normalize branch name
  724. if branch is not None:
  725. if isinstance(branch, str):
  726. branch = branch.encode()
  727. if not branch.startswith(b"refs/heads/"):
  728. branch = b"refs/heads/" + branch
  729. # Check if branch is already checked out in another worktree
  730. if branch and not force:
  731. for wt in list_worktrees(repo):
  732. if wt.branch == branch:
  733. raise ValueError(
  734. f"Branch {branch.decode()} is already checked out at {wt.path}"
  735. )
  736. # Determine what to checkout
  737. if commit is not None:
  738. checkout_ref = commit
  739. detach = True
  740. elif branch is not None:
  741. # Check if branch exists
  742. try:
  743. checkout_ref = repo.refs[branch]
  744. except KeyError:
  745. if commit is None:
  746. # Create new branch from HEAD
  747. checkout_ref = repo.head()
  748. repo.refs[branch] = checkout_ref
  749. else:
  750. # Create new branch from specified commit
  751. checkout_ref = commit
  752. repo.refs[branch] = checkout_ref
  753. else:
  754. # Default to current HEAD
  755. checkout_ref = repo.head()
  756. detach = True
  757. # Create the worktree directory
  758. os.makedirs(path)
  759. # Initialize the worktree
  760. identifier = os.path.basename(path)
  761. wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)
  762. # Set HEAD appropriately
  763. if detach:
  764. # Detached HEAD - write SHA directly to HEAD
  765. with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
  766. f.write(checkout_ref + b"\n")
  767. else:
  768. # Point to branch
  769. wt_repo.refs.set_symbolic_ref(b"HEAD", branch)
  770. # Reset index to match HEAD
  771. wt_repo.get_worktree().reset_index()
  772. return wt_repo
  773. def remove_worktree(
  774. repo: Repo, path: str | bytes | os.PathLike, force: bool = False
  775. ) -> None:
  776. """Remove a worktree.
  777. Args:
  778. repo: The main repository
  779. path: Path to the worktree to remove
  780. force: Force removal even if there are local changes
  781. Raises:
  782. ValueError: If the worktree doesn't exist, has local changes, or is locked
  783. """
  784. path = os.fspath(path)
  785. if isinstance(path, bytes):
  786. path = os.fsdecode(path)
  787. # Don't allow removing the main worktree
  788. if os.path.abspath(path) == os.path.abspath(repo.path):
  789. raise ValueError("Cannot remove the main working tree")
  790. # Find the worktree
  791. worktree_found = False
  792. worktree_id = None
  793. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  794. if os.path.isdir(worktrees_dir):
  795. for entry in os.listdir(worktrees_dir):
  796. worktree_path = os.path.join(worktrees_dir, entry)
  797. gitdir_path = os.path.join(worktree_path, GITDIR)
  798. try:
  799. with open(gitdir_path, "rb") as f:
  800. gitdir_contents = f.read().strip()
  801. wt_path = os.fsdecode(gitdir_contents)
  802. if not os.path.isabs(wt_path):
  803. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  804. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  805. if os.path.abspath(wt_dir) == os.path.abspath(path):
  806. worktree_found = True
  807. worktree_id = entry
  808. break
  809. except (FileNotFoundError, PermissionError):
  810. continue
  811. if not worktree_found:
  812. raise ValueError(f"Worktree not found: {path}")
  813. assert worktree_id is not None # Should be set if worktree_found is True
  814. worktree_control_dir = os.path.join(worktrees_dir, worktree_id)
  815. # Check if locked
  816. if os.path.exists(os.path.join(worktree_control_dir, "locked")):
  817. if not force:
  818. raise ValueError(f"Worktree is locked: {path}")
  819. # Check for local changes if not forcing
  820. if not force and os.path.exists(path):
  821. # TODO: Check for uncommitted changes in the worktree
  822. pass
  823. # Remove the working directory
  824. if os.path.exists(path):
  825. shutil.rmtree(path)
  826. # Remove the administrative files
  827. shutil.rmtree(worktree_control_dir)
  828. def prune_worktrees(
  829. repo: Repo, expire: int | None = None, dry_run: bool = False
  830. ) -> list[str]:
  831. """Prune worktree administrative files for missing worktrees.
  832. Args:
  833. repo: The main repository
  834. expire: Only prune worktrees older than this many seconds
  835. dry_run: Don't actually remove anything, just report what would be removed
  836. Returns:
  837. List of pruned worktree identifiers
  838. """
  839. pruned: list[str] = []
  840. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  841. if not os.path.isdir(worktrees_dir):
  842. return pruned
  843. current_time = time.time()
  844. for entry in os.listdir(worktrees_dir):
  845. worktree_path = os.path.join(worktrees_dir, entry)
  846. if not os.path.isdir(worktree_path):
  847. continue
  848. # Skip locked worktrees
  849. if os.path.exists(os.path.join(worktree_path, "locked")):
  850. continue
  851. should_prune = False
  852. # Check if gitdir exists and points to valid location
  853. gitdir_path = os.path.join(worktree_path, GITDIR)
  854. try:
  855. with open(gitdir_path, "rb") as f:
  856. gitdir_contents = f.read().strip()
  857. wt_path = os.fsdecode(gitdir_contents)
  858. if not os.path.isabs(wt_path):
  859. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  860. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  861. if not os.path.exists(wt_dir):
  862. should_prune = True
  863. except (FileNotFoundError, PermissionError):
  864. should_prune = True
  865. # Check expiry time if specified
  866. if should_prune and expire is not None:
  867. stat_info = os.stat(worktree_path)
  868. age = current_time - stat_info.st_mtime
  869. if age < expire:
  870. should_prune = False
  871. if should_prune:
  872. pruned.append(entry)
  873. if not dry_run:
  874. shutil.rmtree(worktree_path)
  875. return pruned
  876. def lock_worktree(
  877. repo: Repo, path: str | bytes | os.PathLike, reason: str | None = None
  878. ) -> None:
  879. """Lock a worktree to prevent it from being pruned.
  880. Args:
  881. repo: The main repository
  882. path: Path to the worktree to lock
  883. reason: Optional reason for locking
  884. """
  885. worktree_id = _find_worktree_id(repo, path)
  886. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  887. lock_path = os.path.join(worktree_control_dir, "locked")
  888. with open(lock_path, "w") as f:
  889. if reason:
  890. f.write(reason)
  891. def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike) -> None:
  892. """Unlock a worktree.
  893. Args:
  894. repo: The main repository
  895. path: Path to the worktree to unlock
  896. """
  897. worktree_id = _find_worktree_id(repo, path)
  898. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  899. lock_path = os.path.join(worktree_control_dir, "locked")
  900. if os.path.exists(lock_path):
  901. os.remove(lock_path)
  902. def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike) -> str:
  903. """Find the worktree identifier for the given path.
  904. Args:
  905. repo: The main repository
  906. path: Path to the worktree
  907. Returns:
  908. The worktree identifier
  909. Raises:
  910. ValueError: If the worktree is not found
  911. """
  912. path = os.fspath(path)
  913. if isinstance(path, bytes):
  914. path = os.fsdecode(path)
  915. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  916. if os.path.isdir(worktrees_dir):
  917. for entry in os.listdir(worktrees_dir):
  918. worktree_path = os.path.join(worktrees_dir, entry)
  919. gitdir_path = os.path.join(worktree_path, GITDIR)
  920. try:
  921. with open(gitdir_path, "rb") as f:
  922. gitdir_contents = f.read().strip()
  923. wt_path = os.fsdecode(gitdir_contents)
  924. if not os.path.isabs(wt_path):
  925. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  926. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  927. if os.path.abspath(wt_dir) == os.path.abspath(path):
  928. return entry
  929. except (FileNotFoundError, PermissionError):
  930. continue
  931. raise ValueError(f"Worktree not found: {path}")
  932. def move_worktree(
  933. repo: Repo,
  934. old_path: str | bytes | os.PathLike,
  935. new_path: str | bytes | os.PathLike,
  936. ) -> None:
  937. """Move a worktree to a new location.
  938. Args:
  939. repo: The main repository
  940. old_path: Current path of the worktree
  941. new_path: New path for the worktree
  942. Raises:
  943. ValueError: If the worktree doesn't exist or new path already exists
  944. """
  945. old_path = os.fspath(old_path)
  946. new_path = os.fspath(new_path)
  947. if isinstance(old_path, bytes):
  948. old_path = os.fsdecode(old_path)
  949. if isinstance(new_path, bytes):
  950. new_path = os.fsdecode(new_path)
  951. # Don't allow moving the main worktree
  952. if os.path.abspath(old_path) == os.path.abspath(repo.path):
  953. raise ValueError("Cannot move the main working tree")
  954. # Check if new path already exists
  955. if os.path.exists(new_path):
  956. raise ValueError(f"Path already exists: {new_path}")
  957. # Find the worktree
  958. worktree_id = _find_worktree_id(repo, old_path)
  959. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  960. # Move the actual worktree directory
  961. shutil.move(old_path, new_path)
  962. # Update the gitdir file in the worktree
  963. gitdir_file = os.path.join(new_path, ".git")
  964. # Update the gitdir pointer in the control directory
  965. with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
  966. f.write(os.fsencode(gitdir_file) + b"\n")