worktree.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413
  1. # worktree.py -- Working tree operations for Git repositories
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Working tree operations for Git repositories."""
  22. from __future__ import annotations
  23. import builtins
  24. import os
  25. import shutil
  26. import stat
  27. import sys
  28. import tempfile
  29. import time
  30. import warnings
  31. from collections.abc import Iterable, Iterator, Sequence
  32. from contextlib import contextmanager
  33. from pathlib import Path
  34. from typing import Any, Callable, Union
  35. from .errors import CommitError, HookError
  36. from .objects import Blob, Commit, ObjectID, Tag, Tree
  37. from .refs import SYMREF, Ref, local_branch_name
  38. from .repo import (
  39. GITDIR,
  40. WORKTREES,
  41. Repo,
  42. check_user_identity,
  43. get_user_identity,
  44. )
  45. from .trailers import add_trailer_to_message
  46. class WorkTreeInfo:
  47. """Information about a single worktree.
  48. Attributes:
  49. path: Path to the worktree
  50. head: Current HEAD commit SHA
  51. branch: Current branch (if not detached)
  52. bare: Whether this is a bare repository
  53. detached: Whether HEAD is detached
  54. locked: Whether the worktree is locked
  55. prunable: Whether the worktree can be pruned
  56. lock_reason: Reason for locking (if locked)
  57. """
  58. def __init__(
  59. self,
  60. path: str,
  61. head: bytes | None = None,
  62. branch: bytes | None = None,
  63. bare: bool = False,
  64. detached: bool = False,
  65. locked: bool = False,
  66. prunable: bool = False,
  67. lock_reason: str | None = None,
  68. ):
  69. """Initialize WorkTreeInfo.
  70. Args:
  71. path: Path to the worktree
  72. head: Current HEAD commit SHA
  73. branch: Current branch (if not detached)
  74. bare: Whether this is a bare repository
  75. detached: Whether HEAD is detached
  76. locked: Whether the worktree is locked
  77. prunable: Whether the worktree can be pruned
  78. lock_reason: Reason for locking (if locked)
  79. """
  80. self.path = path
  81. self.head = head
  82. self.branch = branch
  83. self.bare = bare
  84. self.detached = detached
  85. self.locked = locked
  86. self.prunable = prunable
  87. self.lock_reason = lock_reason
  88. def __repr__(self) -> str:
  89. """Return string representation of WorkTreeInfo."""
  90. return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})"
  91. def __eq__(self, other: object) -> bool:
  92. """Check equality with another WorkTreeInfo."""
  93. if not isinstance(other, WorkTreeInfo):
  94. return NotImplemented
  95. return (
  96. self.path == other.path
  97. and self.head == other.head
  98. and self.branch == other.branch
  99. and self.bare == other.bare
  100. and self.detached == other.detached
  101. and self.locked == other.locked
  102. and self.prunable == other.prunable
  103. and self.lock_reason == other.lock_reason
  104. )
  105. def open(self) -> WorkTree:
  106. """Open this worktree as a WorkTree.
  107. Returns:
  108. WorkTree object for this worktree
  109. Raises:
  110. NotGitRepository: If the worktree path is invalid
  111. """
  112. from .repo import Repo
  113. repo = Repo(self.path)
  114. return WorkTree(repo, self.path)
  115. class WorkTreeContainer:
  116. """Container for managing multiple working trees.
  117. This class manages worktrees for a repository, similar to how
  118. RefsContainer manages references.
  119. """
  120. def __init__(self, repo: Repo) -> None:
  121. """Initialize a WorkTreeContainer for the given repository.
  122. Args:
  123. repo: The repository this container belongs to
  124. """
  125. self._repo = repo
  126. def list(self) -> list[WorkTreeInfo]:
  127. """List all worktrees for this repository.
  128. Returns:
  129. A list of WorkTreeInfo objects
  130. """
  131. return list_worktrees(self._repo)
  132. def add(
  133. self,
  134. path: str | bytes | os.PathLike[str],
  135. branch: str | bytes | None = None,
  136. commit: ObjectID | None = None,
  137. force: bool = False,
  138. detach: bool = False,
  139. exist_ok: bool = False,
  140. ) -> Repo:
  141. """Add a new worktree.
  142. Args:
  143. path: Path where the new worktree should be created
  144. branch: Branch to checkout in the new worktree
  145. commit: Specific commit to checkout (results in detached HEAD)
  146. force: Force creation even if branch is already checked out elsewhere
  147. detach: Detach HEAD in the new worktree
  148. exist_ok: If True, do not raise an error if the directory already exists
  149. Returns:
  150. The newly created worktree repository
  151. """
  152. return add_worktree(
  153. self._repo,
  154. path,
  155. branch=branch,
  156. commit=commit,
  157. force=force,
  158. detach=detach,
  159. exist_ok=exist_ok,
  160. )
  161. def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
  162. """Remove a worktree.
  163. Args:
  164. path: Path to the worktree to remove
  165. force: Force removal even if there are local changes
  166. """
  167. remove_worktree(self._repo, path, force=force)
  168. def prune(
  169. self, expire: int | None = None, dry_run: bool = False
  170. ) -> builtins.list[str]:
  171. """Prune worktree administrative files for missing worktrees.
  172. Args:
  173. expire: Only prune worktrees older than this many seconds
  174. dry_run: Don't actually remove anything, just report what would be removed
  175. Returns:
  176. List of pruned worktree identifiers
  177. """
  178. return prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
  179. def move(
  180. self,
  181. old_path: str | bytes | os.PathLike[str],
  182. new_path: str | bytes | os.PathLike[str],
  183. ) -> None:
  184. """Move a worktree to a new location.
  185. Args:
  186. old_path: Current path of the worktree
  187. new_path: New path for the worktree
  188. """
  189. move_worktree(self._repo, old_path, new_path)
  190. def lock(
  191. self, path: str | bytes | os.PathLike[str], reason: str | None = None
  192. ) -> None:
  193. """Lock a worktree to prevent it from being pruned.
  194. Args:
  195. path: Path to the worktree to lock
  196. reason: Optional reason for locking
  197. """
  198. lock_worktree(self._repo, path, reason=reason)
  199. def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
  200. """Unlock a worktree.
  201. Args:
  202. path: Path to the worktree to unlock
  203. """
  204. unlock_worktree(self._repo, path)
  205. def repair(
  206. self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
  207. ) -> builtins.list[str]:
  208. """Repair worktree administrative files.
  209. Args:
  210. paths: Optional list of worktree paths to repair. If None, repairs
  211. connections from the main repository to all linked worktrees.
  212. Returns:
  213. List of repaired worktree paths
  214. """
  215. return repair_worktree(self._repo, paths=paths)
  216. def __iter__(self) -> Iterator[WorkTreeInfo]:
  217. """Iterate over all worktrees."""
  218. yield from self.list()
  219. class WorkTree:
  220. """Working tree operations for a Git repository.
  221. This class provides methods for working with the working tree,
  222. such as staging files, committing changes, and resetting the index.
  223. """
  224. def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
  225. """Initialize a WorkTree for the given repository.
  226. Args:
  227. repo: The repository this working tree belongs to
  228. path: Path to the working tree directory
  229. """
  230. self._repo = repo
  231. raw_path = os.fspath(path)
  232. if isinstance(raw_path, bytes):
  233. self.path: str = os.fsdecode(raw_path)
  234. else:
  235. self.path = raw_path
  236. self.path = os.path.abspath(self.path)
  237. def stage(
  238. self,
  239. fs_paths: str
  240. | bytes
  241. | os.PathLike[str]
  242. | Iterable[str | bytes | os.PathLike[str]],
  243. ) -> None:
  244. """Stage a set of paths.
  245. Args:
  246. fs_paths: List of paths, relative to the repository path
  247. """
  248. root_path_bytes = os.fsencode(self.path)
  249. if isinstance(fs_paths, (str, bytes, os.PathLike)):
  250. fs_paths = [fs_paths]
  251. fs_paths = list(fs_paths)
  252. from .index import (
  253. _fs_to_tree_path,
  254. blob_from_path_and_stat,
  255. index_entry_from_directory,
  256. index_entry_from_stat,
  257. )
  258. index = self._repo.open_index()
  259. blob_normalizer = self._repo.get_blob_normalizer()
  260. for fs_path in fs_paths:
  261. if not isinstance(fs_path, bytes):
  262. fs_path = os.fsencode(fs_path)
  263. if os.path.isabs(fs_path):
  264. raise ValueError(
  265. f"path {fs_path!r} should be relative to "
  266. "repository root, not absolute"
  267. )
  268. tree_path = _fs_to_tree_path(fs_path)
  269. full_path = os.path.join(root_path_bytes, fs_path)
  270. try:
  271. st = os.lstat(full_path)
  272. except (FileNotFoundError, NotADirectoryError):
  273. # File no longer exists
  274. try:
  275. del index[tree_path]
  276. except KeyError:
  277. pass # already removed
  278. else:
  279. if stat.S_ISDIR(st.st_mode):
  280. entry = index_entry_from_directory(st, full_path)
  281. if entry:
  282. index[tree_path] = entry
  283. else:
  284. try:
  285. del index[tree_path]
  286. except KeyError:
  287. pass
  288. elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
  289. try:
  290. del index[tree_path]
  291. except KeyError:
  292. pass
  293. else:
  294. blob = blob_from_path_and_stat(full_path, st)
  295. blob = blob_normalizer.checkin_normalize(blob, fs_path)
  296. self._repo.object_store.add_object(blob)
  297. index[tree_path] = index_entry_from_stat(st, blob.id)
  298. index.write()
  299. def unstage(self, fs_paths: Sequence[str]) -> None:
  300. """Unstage specific file in the index.
  301. Args:
  302. fs_paths: a list of files to unstage,
  303. relative to the repository path.
  304. """
  305. from .index import IndexEntry, _fs_to_tree_path
  306. index = self._repo.open_index()
  307. try:
  308. commit = self._repo[b"HEAD"]
  309. except KeyError:
  310. # no head mean no commit in the repo
  311. for fs_path in fs_paths:
  312. tree_path = _fs_to_tree_path(fs_path)
  313. del index[tree_path]
  314. index.write()
  315. return
  316. else:
  317. assert isinstance(commit, Commit), "HEAD must be a commit"
  318. tree_id = commit.tree
  319. for fs_path in fs_paths:
  320. tree_path = _fs_to_tree_path(fs_path)
  321. try:
  322. tree = self._repo.object_store[tree_id]
  323. assert isinstance(tree, Tree)
  324. tree_entry = tree.lookup_path(
  325. self._repo.object_store.__getitem__, tree_path
  326. )
  327. except KeyError:
  328. # if tree_entry didn't exist, this file was being added, so
  329. # remove index entry
  330. try:
  331. del index[tree_path]
  332. continue
  333. except KeyError as exc:
  334. raise KeyError(f"file '{tree_path.decode()}' not in index") from exc
  335. st = None
  336. try:
  337. st = os.lstat(os.path.join(self.path, fs_path))
  338. except FileNotFoundError:
  339. pass
  340. blob_obj = self._repo[tree_entry[1]]
  341. assert isinstance(blob_obj, Blob)
  342. blob_size = len(blob_obj.data)
  343. index_entry = IndexEntry(
  344. ctime=(commit.commit_time, 0),
  345. mtime=(commit.commit_time, 0),
  346. dev=st.st_dev if st else 0,
  347. ino=st.st_ino if st else 0,
  348. mode=tree_entry[0],
  349. uid=st.st_uid if st else 0,
  350. gid=st.st_gid if st else 0,
  351. size=blob_size,
  352. sha=tree_entry[1],
  353. flags=0,
  354. extended_flags=0,
  355. )
  356. index[tree_path] = index_entry
  357. index.write()
    def commit(
        self,
        message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        The steps are, in order: run the pre-commit hook, build the commit
        object (tree, identities, timestamps, encoding, parents), resolve the
        message, optionally append a Signed-off-by trailer, run the
        commit-msg hook, store the (optionally signed) commit, update ``ref``,
        remove MERGE_HEAD, run the post-commit hook and trigger auto GC.

        Args:
          message: Commit message (str or bytes), or a callable that takes
            (repo, commit) and returns the message. The callable is invoked
            after the commit's parents have been set.
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding (defaults to the i18n.commitEncoding config
            value, if set)
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: Whether to sign the commit. If None, the commit.gpgSign
            config value decides (defaulting to False). When signing, the
            key ID is taken from the user.signingkey config value, or None
            is passed to the signer if that is unset.
          signoff: Add Signed-off-by line (DCO) to commit message.
            If None, uses format.signoff config.
        Returns:
          New commit SHA1
        Raises:
          ValueError: If ``tree`` is not 40 characters long, if no message
            was given, or if the message callback returned None
          CommitError: If the pre-commit or commit-msg hook fails, or if
            ``ref`` changed while the commit was being created
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            # No explicit tree: write the current index out as tree objects.
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                # The ref does not exist yet, so the merge heads are the
                # only parents.
                c.parents = merge_heads

        # Handle message after parents are set
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add Signed-off-by trailer
            # Get the committer identity for the signoff
            signoff_identity = committer
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        try:
            if no_verify:
                c.message = message
            else:
                c.message = self._repo.hooks["commit-msg"].execute(message)
                # A hook that returns None leaves the message unchanged.
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                # Look the ref up again: set_if_equals below only moves the
                # ref if it still holds this value.
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            except KeyError:
                # The ref does not exist: create it, but only if nobody else
                # has created it in the meantime.
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            # The commit already exists, so a failing hook only warns.
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id
  593. def reset_index(self, tree: bytes | None = None) -> None:
  594. """Reset the index back to a specific tree.
  595. Args:
  596. tree: Tree SHA to reset to, None for current HEAD tree.
  597. """
  598. from .index import (
  599. build_index_from_tree,
  600. symlink,
  601. validate_path_element_default,
  602. validate_path_element_hfs,
  603. validate_path_element_ntfs,
  604. )
  605. if tree is None:
  606. head = self._repo[b"HEAD"]
  607. if isinstance(head, Tag):
  608. _cls, obj = head.object
  609. head = self._repo.get_object(obj)
  610. from .objects import Commit
  611. assert isinstance(head, Commit)
  612. tree = head.tree
  613. config = self._repo.get_config()
  614. honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
  615. if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
  616. validate_path_element = validate_path_element_ntfs
  617. elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
  618. validate_path_element = validate_path_element_hfs
  619. else:
  620. validate_path_element = validate_path_element_default
  621. if config.get_boolean(b"core", b"symlinks", True):
  622. symlink_fn = symlink
  623. else:
  624. def symlink_fn( # type: ignore[misc,unused-ignore]
  625. src: Union[str, bytes],
  626. dst: Union[str, bytes],
  627. target_is_directory: bool = False,
  628. *,
  629. dir_fd: int | None = None,
  630. ) -> None:
  631. with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
  632. f.write(src)
  633. blob_normalizer = self._repo.get_blob_normalizer()
  634. return build_index_from_tree(
  635. self.path,
  636. self._repo.index_path(),
  637. self._repo.object_store,
  638. tree,
  639. honor_filemode=honor_filemode,
  640. validate_path_element=validate_path_element,
  641. symlink_fn=symlink_fn, # type: ignore[arg-type,unused-ignore]
  642. blob_normalizer=blob_normalizer,
  643. )
  644. def _sparse_checkout_file_path(self) -> str:
  645. """Return the path of the sparse-checkout file in this repo's control dir."""
  646. return os.path.join(self._repo.controldir(), "info", "sparse-checkout")
  647. def configure_for_cone_mode(self) -> None:
  648. """Ensure the repository is configured for cone-mode sparse-checkout."""
  649. config = self._repo.get_config()
  650. config.set((b"core",), b"sparseCheckout", b"true")
  651. config.set((b"core",), b"sparseCheckoutCone", b"true")
  652. config.write_to_path()
  653. def infer_cone_mode(self) -> bool:
  654. """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
  655. config = self._repo.get_config()
  656. try:
  657. sc_cone = config.get((b"core",), b"sparseCheckoutCone")
  658. return sc_cone == b"true"
  659. except KeyError:
  660. # If core.sparseCheckoutCone is not set, default to False
  661. return False
  662. def get_sparse_checkout_patterns(self) -> list[str]:
  663. """Return a list of sparse-checkout patterns from info/sparse-checkout.
  664. Returns:
  665. A list of patterns. Returns an empty list if the file is missing.
  666. """
  667. path = self._sparse_checkout_file_path()
  668. try:
  669. with open(path, encoding="utf-8") as f:
  670. return [line.strip() for line in f if line.strip()]
  671. except FileNotFoundError:
  672. return []
  673. def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
  674. """Write the given sparse-checkout patterns into info/sparse-checkout.
  675. Creates the info/ directory if it does not exist.
  676. Args:
  677. patterns: A list of gitignore-style patterns to store.
  678. """
  679. info_dir = os.path.join(self._repo.controldir(), "info")
  680. os.makedirs(info_dir, exist_ok=True)
  681. path = self._sparse_checkout_file_path()
  682. with open(path, "w", encoding="utf-8") as f:
  683. for pat in patterns:
  684. f.write(pat + "\n")
  685. def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
  686. """Write the given cone-mode directory patterns into info/sparse-checkout.
  687. For each directory to include, add an inclusion line that "undoes" the prior
  688. ``!/*/`` 'exclude' that re-includes that directory and everything under it.
  689. Never add the same line twice.
  690. """
  691. patterns = ["/*", "!/*/"]
  692. if dirs:
  693. for d in dirs:
  694. d = d.strip("/")
  695. line = f"/{d}/"
  696. if d and line not in patterns:
  697. patterns.append(line)
  698. self.set_sparse_checkout_patterns(patterns)
  699. def read_worktree_lock_reason(worktree_path: str) -> str | None:
  700. """Read the lock reason for a worktree.
  701. Args:
  702. worktree_path: Path to the worktree's administrative directory
  703. Returns:
  704. The lock reason if the worktree is locked, None otherwise
  705. """
  706. locked_path = os.path.join(worktree_path, "locked")
  707. if not os.path.exists(locked_path):
  708. return None
  709. try:
  710. with open(locked_path) as f:
  711. return f.read().strip()
  712. except (FileNotFoundError, PermissionError):
  713. return None
  714. def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
  715. """List all worktrees for the given repository.
  716. Args:
  717. repo: The repository to list worktrees for
  718. Returns:
  719. A list of WorkTreeInfo objects
  720. """
  721. worktrees = []
  722. # Add main worktree
  723. main_wt_info = WorkTreeInfo(
  724. path=repo.path,
  725. head=repo.head(),
  726. bare=repo.bare,
  727. detached=False,
  728. locked=False,
  729. prunable=False,
  730. )
  731. # Get branch info for main worktree
  732. try:
  733. with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
  734. head_contents = f.read().strip()
  735. if head_contents.startswith(SYMREF):
  736. ref_name = head_contents[len(SYMREF) :].strip()
  737. main_wt_info.branch = ref_name
  738. else:
  739. main_wt_info.detached = True
  740. main_wt_info.branch = None
  741. except (FileNotFoundError, PermissionError):
  742. main_wt_info.branch = None
  743. main_wt_info.detached = True
  744. worktrees.append(main_wt_info)
  745. # List additional worktrees
  746. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  747. if os.path.isdir(worktrees_dir):
  748. for entry in os.listdir(worktrees_dir):
  749. worktree_path = os.path.join(worktrees_dir, entry)
  750. if not os.path.isdir(worktree_path):
  751. continue
  752. wt_info = WorkTreeInfo(
  753. path="", # Will be set below
  754. bare=False,
  755. detached=False,
  756. locked=False,
  757. prunable=False,
  758. )
  759. # Read gitdir to get actual worktree path
  760. gitdir_path = os.path.join(worktree_path, GITDIR)
  761. try:
  762. with open(gitdir_path, "rb") as f:
  763. gitdir_contents = f.read().strip()
  764. # Convert relative path to absolute if needed
  765. wt_path = os.fsdecode(gitdir_contents)
  766. if not os.path.isabs(wt_path):
  767. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  768. wt_info.path = os.path.dirname(wt_path) # Remove .git suffix
  769. except (FileNotFoundError, PermissionError):
  770. # Worktree directory is missing, skip it
  771. # TODO: Consider adding these as prunable worktrees with a placeholder path
  772. continue
  773. # Check if worktree path exists
  774. if wt_info.path and not os.path.exists(wt_info.path):
  775. wt_info.prunable = True
  776. # Read HEAD
  777. head_path = os.path.join(worktree_path, "HEAD")
  778. try:
  779. with open(head_path, "rb") as f:
  780. head_contents = f.read().strip()
  781. if head_contents.startswith(SYMREF):
  782. ref_name = head_contents[len(SYMREF) :].strip()
  783. wt_info.branch = ref_name
  784. # Resolve ref to get commit sha
  785. try:
  786. wt_info.head = repo.refs[ref_name]
  787. except KeyError:
  788. wt_info.head = None
  789. else:
  790. wt_info.detached = True
  791. wt_info.branch = None
  792. wt_info.head = head_contents
  793. except (FileNotFoundError, PermissionError):
  794. wt_info.head = None
  795. wt_info.branch = None
  796. # Check if locked
  797. lock_reason = read_worktree_lock_reason(worktree_path)
  798. if lock_reason is not None:
  799. wt_info.locked = True
  800. wt_info.lock_reason = lock_reason
  801. worktrees.append(wt_info)
  802. return worktrees
  803. def add_worktree(
  804. repo: Repo,
  805. path: str | bytes | os.PathLike[str],
  806. branch: str | bytes | None = None,
  807. commit: ObjectID | None = None,
  808. force: bool = False,
  809. detach: bool = False,
  810. exist_ok: bool = False,
  811. ) -> Repo:
  812. """Add a new worktree to the repository.
  813. Args:
  814. repo: The main repository
  815. path: Path where the new worktree should be created
  816. branch: Branch to checkout in the new worktree (creates if doesn't exist)
  817. commit: Specific commit to checkout (results in detached HEAD)
  818. force: Force creation even if branch is already checked out elsewhere
  819. detach: Detach HEAD in the new worktree
  820. exist_ok: If True, do not raise an error if the directory already exists
  821. Returns:
  822. The newly created worktree repository
  823. Raises:
  824. ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
  825. """
  826. from .repo import Repo as RepoClass
  827. path = os.fspath(path)
  828. if isinstance(path, bytes):
  829. path = os.fsdecode(path)
  830. # Check if path already exists
  831. if os.path.exists(path) and not exist_ok:
  832. raise ValueError(f"Path already exists: {path}")
  833. # Normalize branch name
  834. if branch is not None:
  835. if isinstance(branch, str):
  836. branch = branch.encode()
  837. branch = local_branch_name(branch)
  838. # Check if branch is already checked out in another worktree
  839. if branch and not force:
  840. for wt in list_worktrees(repo):
  841. if wt.branch == branch:
  842. raise ValueError(
  843. f"Branch {branch.decode()} is already checked out at {wt.path}"
  844. )
  845. # Determine what to checkout
  846. if commit is not None:
  847. checkout_ref = commit
  848. detach = True
  849. elif branch is not None:
  850. # Check if branch exists
  851. try:
  852. checkout_ref = repo.refs[branch]
  853. except KeyError:
  854. if commit is None:
  855. # Create new branch from HEAD
  856. checkout_ref = repo.head()
  857. repo.refs[branch] = checkout_ref
  858. else:
  859. # Create new branch from specified commit
  860. checkout_ref = commit
  861. repo.refs[branch] = checkout_ref
  862. else:
  863. # Default to current HEAD
  864. checkout_ref = repo.head()
  865. detach = True
  866. # Create the worktree directory
  867. os.makedirs(path, exist_ok=exist_ok)
  868. # Initialize the worktree
  869. identifier = os.path.basename(path)
  870. wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)
  871. # Set HEAD appropriately
  872. if detach:
  873. # Detached HEAD - write SHA directly to HEAD
  874. with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
  875. f.write(checkout_ref + b"\n")
  876. else:
  877. # Point to branch
  878. assert branch is not None # Should be guaranteed by logic above
  879. wt_repo.refs.set_symbolic_ref(b"HEAD", branch)
  880. # Reset index to match HEAD
  881. wt_repo.get_worktree().reset_index()
  882. return wt_repo
  883. def remove_worktree(
  884. repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
  885. ) -> None:
  886. """Remove a worktree.
  887. Args:
  888. repo: The main repository
  889. path: Path to the worktree to remove
  890. force: Force removal even if there are local changes
  891. Raises:
  892. ValueError: If the worktree doesn't exist, has local changes, or is locked
  893. """
  894. path = os.fspath(path)
  895. if isinstance(path, bytes):
  896. path = os.fsdecode(path)
  897. # Don't allow removing the main worktree
  898. if os.path.abspath(path) == os.path.abspath(repo.path):
  899. raise ValueError("Cannot remove the main working tree")
  900. # Find the worktree
  901. worktree_found = False
  902. worktree_id = None
  903. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  904. if os.path.isdir(worktrees_dir):
  905. for entry in os.listdir(worktrees_dir):
  906. worktree_path = os.path.join(worktrees_dir, entry)
  907. gitdir_path = os.path.join(worktree_path, GITDIR)
  908. try:
  909. with open(gitdir_path, "rb") as f:
  910. gitdir_contents = f.read().strip()
  911. wt_path = os.fsdecode(gitdir_contents)
  912. if not os.path.isabs(wt_path):
  913. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  914. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  915. if os.path.abspath(wt_dir) == os.path.abspath(path):
  916. worktree_found = True
  917. worktree_id = entry
  918. break
  919. except (FileNotFoundError, PermissionError):
  920. continue
  921. if not worktree_found:
  922. raise ValueError(f"Worktree not found: {path}")
  923. assert worktree_id is not None # Should be set if worktree_found is True
  924. worktree_control_dir = os.path.join(worktrees_dir, worktree_id)
  925. # Check if locked
  926. if os.path.exists(os.path.join(worktree_control_dir, "locked")):
  927. if not force:
  928. raise ValueError(f"Worktree is locked: {path}")
  929. # Check for local changes if not forcing
  930. if not force and os.path.exists(path):
  931. # TODO: Check for uncommitted changes in the worktree
  932. pass
  933. # Remove the working directory
  934. if os.path.exists(path):
  935. shutil.rmtree(path)
  936. # Remove the administrative files
  937. shutil.rmtree(worktree_control_dir)
  938. def prune_worktrees(
  939. repo: Repo, expire: int | None = None, dry_run: bool = False
  940. ) -> list[str]:
  941. """Prune worktree administrative files for missing worktrees.
  942. Args:
  943. repo: The main repository
  944. expire: Only prune worktrees older than this many seconds
  945. dry_run: Don't actually remove anything, just report what would be removed
  946. Returns:
  947. List of pruned worktree identifiers
  948. """
  949. pruned: list[str] = []
  950. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  951. if not os.path.isdir(worktrees_dir):
  952. return pruned
  953. current_time = time.time()
  954. for entry in os.listdir(worktrees_dir):
  955. worktree_path = os.path.join(worktrees_dir, entry)
  956. if not os.path.isdir(worktree_path):
  957. continue
  958. # Skip locked worktrees
  959. if os.path.exists(os.path.join(worktree_path, "locked")):
  960. continue
  961. should_prune = False
  962. # Check if gitdir exists and points to valid location
  963. gitdir_path = os.path.join(worktree_path, GITDIR)
  964. try:
  965. with open(gitdir_path, "rb") as f:
  966. gitdir_contents = f.read().strip()
  967. wt_path = os.fsdecode(gitdir_contents)
  968. if not os.path.isabs(wt_path):
  969. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  970. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  971. if not os.path.exists(wt_dir):
  972. should_prune = True
  973. except (FileNotFoundError, PermissionError):
  974. should_prune = True
  975. # Check expiry time if specified
  976. if should_prune and expire is not None:
  977. stat_info = os.stat(worktree_path)
  978. age = current_time - stat_info.st_mtime
  979. if age < expire:
  980. should_prune = False
  981. if should_prune:
  982. pruned.append(entry)
  983. if not dry_run:
  984. shutil.rmtree(worktree_path)
  985. return pruned
  986. def lock_worktree(
  987. repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
  988. ) -> None:
  989. """Lock a worktree to prevent it from being pruned.
  990. Args:
  991. repo: The main repository
  992. path: Path to the worktree to lock
  993. reason: Optional reason for locking
  994. """
  995. worktree_id = _find_worktree_id(repo, path)
  996. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  997. lock_path = os.path.join(worktree_control_dir, "locked")
  998. with open(lock_path, "w") as f:
  999. if reason:
  1000. f.write(reason)
  1001. def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
  1002. """Unlock a worktree.
  1003. Args:
  1004. repo: The main repository
  1005. path: Path to the worktree to unlock
  1006. """
  1007. worktree_id = _find_worktree_id(repo, path)
  1008. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  1009. lock_path = os.path.join(worktree_control_dir, "locked")
  1010. if os.path.exists(lock_path):
  1011. os.remove(lock_path)
  1012. def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
  1013. """Find the worktree identifier for the given path.
  1014. Args:
  1015. repo: The main repository
  1016. path: Path to the worktree
  1017. Returns:
  1018. The worktree identifier
  1019. Raises:
  1020. ValueError: If the worktree is not found
  1021. """
  1022. path = os.fspath(path)
  1023. if isinstance(path, bytes):
  1024. path = os.fsdecode(path)
  1025. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  1026. if os.path.isdir(worktrees_dir):
  1027. for entry in os.listdir(worktrees_dir):
  1028. worktree_path = os.path.join(worktrees_dir, entry)
  1029. gitdir_path = os.path.join(worktree_path, GITDIR)
  1030. try:
  1031. with open(gitdir_path, "rb") as f:
  1032. gitdir_contents = f.read().strip()
  1033. wt_path = os.fsdecode(gitdir_contents)
  1034. if not os.path.isabs(wt_path):
  1035. wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
  1036. wt_dir = os.path.dirname(wt_path) # Remove .git suffix
  1037. if os.path.abspath(wt_dir) == os.path.abspath(path):
  1038. return entry
  1039. except (FileNotFoundError, PermissionError):
  1040. continue
  1041. raise ValueError(f"Worktree not found: {path}")
  1042. def move_worktree(
  1043. repo: Repo,
  1044. old_path: str | bytes | os.PathLike[str],
  1045. new_path: str | bytes | os.PathLike[str],
  1046. ) -> None:
  1047. """Move a worktree to a new location.
  1048. Args:
  1049. repo: The main repository
  1050. old_path: Current path of the worktree
  1051. new_path: New path for the worktree
  1052. Raises:
  1053. ValueError: If the worktree doesn't exist or new path already exists
  1054. """
  1055. old_path = os.fspath(old_path)
  1056. new_path = os.fspath(new_path)
  1057. if isinstance(old_path, bytes):
  1058. old_path = os.fsdecode(old_path)
  1059. if isinstance(new_path, bytes):
  1060. new_path = os.fsdecode(new_path)
  1061. # Don't allow moving the main worktree
  1062. if os.path.abspath(old_path) == os.path.abspath(repo.path):
  1063. raise ValueError("Cannot move the main working tree")
  1064. # Check if new path already exists
  1065. if os.path.exists(new_path):
  1066. raise ValueError(f"Path already exists: {new_path}")
  1067. # Find the worktree
  1068. worktree_id = _find_worktree_id(repo, old_path)
  1069. worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
  1070. # Move the actual worktree directory
  1071. shutil.move(old_path, new_path)
  1072. # Update the gitdir file in the worktree
  1073. gitdir_file = os.path.join(new_path, ".git")
  1074. # Update the gitdir pointer in the control directory
  1075. with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
  1076. f.write(os.fsencode(gitdir_file) + b"\n")
  1077. def repair_worktree(
  1078. repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
  1079. ) -> list[str]:
  1080. """Repair worktree administrative files.
  1081. This repairs the connection between worktrees and the main repository
  1082. when they have been moved or become corrupted.
  1083. Args:
  1084. repo: The main repository
  1085. paths: Optional list of worktree paths to repair. If None, repairs
  1086. connections from the main repository to all linked worktrees.
  1087. Returns:
  1088. List of repaired worktree paths
  1089. Raises:
  1090. ValueError: If a specified path is not a valid worktree
  1091. """
  1092. repaired: list[str] = []
  1093. worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
  1094. if paths:
  1095. # Repair specific worktrees
  1096. for path in paths:
  1097. path_str = os.fspath(path)
  1098. if isinstance(path_str, bytes):
  1099. path_str = os.fsdecode(path_str)
  1100. path_str = os.path.abspath(path_str)
  1101. # Check if this is a linked worktree
  1102. gitdir_file = os.path.join(path_str, ".git")
  1103. if not os.path.exists(gitdir_file):
  1104. raise ValueError(f"Not a valid worktree: {path_str}")
  1105. # Read the .git file to get the worktree control directory
  1106. try:
  1107. with open(gitdir_file, "rb") as f:
  1108. gitdir_content = f.read().strip()
  1109. if gitdir_content.startswith(b"gitdir: "):
  1110. worktree_control_path = gitdir_content[8:].decode()
  1111. else:
  1112. raise ValueError(f"Invalid .git file in worktree: {path_str}")
  1113. except (FileNotFoundError, PermissionError, UnicodeDecodeError) as e:
  1114. raise ValueError(
  1115. f"Cannot read .git file in worktree: {path_str}"
  1116. ) from e
  1117. # Make the path absolute if it's relative
  1118. if not os.path.isabs(worktree_control_path):
  1119. worktree_control_path = os.path.abspath(
  1120. os.path.join(path_str, worktree_control_path)
  1121. )
  1122. # Update the gitdir file in the worktree control directory
  1123. gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
  1124. if os.path.exists(gitdir_pointer):
  1125. # Update to point to the current location
  1126. with open(gitdir_pointer, "wb") as f:
  1127. f.write(os.fsencode(gitdir_file) + b"\n")
  1128. repaired.append(path_str)
  1129. else:
  1130. # Repair from main repository to all linked worktrees
  1131. if not os.path.isdir(worktrees_dir):
  1132. return repaired
  1133. for entry in os.listdir(worktrees_dir):
  1134. worktree_control_path = os.path.join(worktrees_dir, entry)
  1135. if not os.path.isdir(worktree_control_path):
  1136. continue
  1137. # Read the gitdir file to find where the worktree thinks it is
  1138. gitdir_path = os.path.join(worktree_control_path, GITDIR)
  1139. try:
  1140. with open(gitdir_path, "rb") as f:
  1141. gitdir_contents = f.read().strip()
  1142. old_gitdir_location = os.fsdecode(gitdir_contents)
  1143. except (FileNotFoundError, PermissionError):
  1144. # Can't repair if we can't read the gitdir file
  1145. continue
  1146. # Get the worktree directory (remove .git suffix)
  1147. old_worktree_path = os.path.dirname(old_gitdir_location)
  1148. # Check if the .git file exists at the old location
  1149. if os.path.exists(old_gitdir_location):
  1150. # Try to read and update the .git file to ensure it points back correctly
  1151. try:
  1152. with open(old_gitdir_location, "rb") as f:
  1153. content = f.read().strip()
  1154. if content.startswith(b"gitdir: "):
  1155. current_pointer = content[8:].decode()
  1156. if not os.path.isabs(current_pointer):
  1157. current_pointer = os.path.abspath(
  1158. os.path.join(old_worktree_path, current_pointer)
  1159. )
  1160. # If it doesn't point to the right place, fix it
  1161. expected_pointer = worktree_control_path
  1162. if os.path.abspath(current_pointer) != os.path.abspath(
  1163. expected_pointer
  1164. ):
  1165. # Update the .git file to point to the correct location
  1166. with open(old_gitdir_location, "wb") as wf:
  1167. wf.write(
  1168. b"gitdir: "
  1169. + os.fsencode(worktree_control_path)
  1170. + b"\n"
  1171. )
  1172. repaired.append(old_worktree_path)
  1173. except (PermissionError, UnicodeDecodeError):
  1174. continue
  1175. return repaired
  1176. @contextmanager
  1177. def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
  1178. """Create a temporary worktree that is automatically cleaned up.
  1179. Args:
  1180. repo: Dulwich repository object
  1181. prefix: Prefix for the temporary directory name
  1182. Yields:
  1183. Worktree object
  1184. """
  1185. temp_dir = None
  1186. worktree = None
  1187. try:
  1188. # Create temporary directory
  1189. temp_dir = tempfile.mkdtemp(prefix=prefix)
  1190. # Add worktree
  1191. worktree = repo.worktrees.add(temp_dir, exist_ok=True)
  1192. yield worktree
  1193. finally:
  1194. # Clean up worktree registration
  1195. if worktree:
  1196. repo.worktrees.remove(worktree.path)
  1197. # Clean up temporary directory
  1198. if temp_dir and Path(temp_dir).exists():
  1199. shutil.rmtree(temp_dir)