| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413 |
- # worktree.py -- Working tree operations for Git repositories
- # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Working tree operations for Git repositories."""
- from __future__ import annotations
- import builtins
- import os
- import shutil
- import stat
- import sys
- import tempfile
- import time
- import warnings
- from collections.abc import Iterable, Iterator, Sequence
- from contextlib import contextmanager
- from pathlib import Path
- from typing import Any, Callable, Union
- from .errors import CommitError, HookError
- from .objects import Blob, Commit, ObjectID, Tag, Tree
- from .refs import SYMREF, Ref, local_branch_name
- from .repo import (
- GITDIR,
- WORKTREES,
- Repo,
- check_user_identity,
- get_user_identity,
- )
- from .trailers import add_trailer_to_message
class WorkTreeInfo:
    """Plain record describing one worktree of a repository.

    Attributes:
      path: Filesystem location of the worktree
      head: SHA of the commit HEAD currently points at
      branch: Checked-out branch, or None when HEAD is detached
      bare: True when this entry is a bare repository
      detached: True when HEAD is detached
      locked: True when the worktree is locked
      prunable: True when the worktree may be pruned
      lock_reason: Reason recorded with the lock (if locked)
    """

    def __init__(
        self,
        path: str,
        head: bytes | None = None,
        branch: bytes | None = None,
        bare: bool = False,
        detached: bool = False,
        locked: bool = False,
        prunable: bool = False,
        lock_reason: str | None = None,
    ):
        """Store the given worktree attributes on the instance.

        Args:
          path: Filesystem location of the worktree
          head: SHA of the commit HEAD currently points at
          branch: Checked-out branch, or None when HEAD is detached
          bare: True when this entry is a bare repository
          detached: True when HEAD is detached
          locked: True when the worktree is locked
          prunable: True when the worktree may be pruned
          lock_reason: Reason recorded with the lock (if locked)
        """
        # Location and what is checked out there.
        self.path = path
        self.head = head
        self.branch = branch
        # State flags.
        self.bare = bare
        self.detached = detached
        self.locked = locked
        self.prunable = prunable
        self.lock_reason = lock_reason

    def __repr__(self) -> str:
        """Return a short debug representation (path, branch, detached)."""
        return (
            f"WorkTreeInfo(path={self.path!r}, "
            f"branch={self.branch!r}, "
            f"detached={self.detached})"
        )

    def __eq__(self, other: object) -> bool:
        """Compare all attributes with another WorkTreeInfo.

        Returns NotImplemented for objects that are not WorkTreeInfo.
        """
        if not isinstance(other, WorkTreeInfo):
            return NotImplemented
        compared = (
            "path",
            "head",
            "branch",
            "bare",
            "detached",
            "locked",
            "prunable",
            "lock_reason",
        )
        # Same attribute order and short-circuiting as a chained ``and``.
        return all(
            getattr(self, attr) == getattr(other, attr) for attr in compared
        )

    def open(self) -> WorkTree:
        """Open the repository at ``self.path`` and wrap it in a WorkTree.

        Returns:
          WorkTree object for this worktree
        Raises:
          NotGitRepository: If the worktree path is invalid
        """
        from .repo import Repo

        return WorkTree(Repo(self.path), self.path)
class WorkTreeContainer:
    """Object-style facade over the module-level worktree functions.

    Every method forwards to the corresponding ``*_worktree`` function,
    passing along the repository given at construction time (comparable to
    how RefsContainer groups reference operations).
    """

    def __init__(self, repo: Repo) -> None:
        """Bind the container to a repository.

        Args:
          repo: The repository this container belongs to
        """
        self._repo = repo

    def list(self) -> list[WorkTreeInfo]:
        """Return information about every worktree of the repository.

        Returns:
          A list of WorkTreeInfo objects
        """
        infos = list_worktrees(self._repo)
        return infos

    def add(
        self,
        path: str | bytes | os.PathLike[str],
        branch: str | bytes | None = None,
        commit: ObjectID | None = None,
        force: bool = False,
        detach: bool = False,
        exist_ok: bool = False,
    ) -> Repo:
        """Create a new worktree via ``add_worktree``.

        Args:
          path: Path where the new worktree should be created
          branch: Branch to checkout in the new worktree
          commit: Specific commit to checkout (results in detached HEAD)
          force: Force creation even if branch is already checked out elsewhere
          detach: Detach HEAD in the new worktree
          exist_ok: If True, do not raise an error if the directory already exists
        Returns:
          The newly created worktree repository
        """
        options = {
            "branch": branch,
            "commit": commit,
            "force": force,
            "detach": detach,
            "exist_ok": exist_ok,
        }
        return add_worktree(self._repo, path, **options)

    def remove(self, path: str | bytes | os.PathLike[str], force: bool = False) -> None:
        """Remove a worktree via ``remove_worktree``.

        Args:
          path: Path to the worktree to remove
          force: Force removal even if there are local changes
        """
        remove_worktree(self._repo, path, force=force)

    def prune(
        self, expire: int | None = None, dry_run: bool = False
    ) -> builtins.list[str]:
        """Prune administrative files of missing worktrees via ``prune_worktrees``.

        Args:
          expire: Only prune worktrees older than this many seconds
          dry_run: Don't actually remove anything, just report what would be removed
        Returns:
          List of pruned worktree identifiers
        """
        pruned = prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
        return pruned

    def move(
        self,
        old_path: str | bytes | os.PathLike[str],
        new_path: str | bytes | os.PathLike[str],
    ) -> None:
        """Relocate a worktree via ``move_worktree``.

        Args:
          old_path: Current path of the worktree
          new_path: New path for the worktree
        """
        move_worktree(self._repo, old_path, new_path)

    def lock(
        self, path: str | bytes | os.PathLike[str], reason: str | None = None
    ) -> None:
        """Lock a worktree via ``lock_worktree`` so it is not pruned.

        Args:
          path: Path to the worktree to lock
          reason: Optional reason for locking
        """
        lock_worktree(self._repo, path, reason=reason)

    def unlock(self, path: str | bytes | os.PathLike[str]) -> None:
        """Unlock a worktree via ``unlock_worktree``.

        Args:
          path: Path to the worktree to unlock
        """
        unlock_worktree(self._repo, path)

    def repair(
        self, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
    ) -> builtins.list[str]:
        """Repair worktree administrative files via ``repair_worktree``.

        Args:
          paths: Optional list of worktree paths to repair. If None, repairs
            connections from the main repository to all linked worktrees.
        Returns:
          List of repaired worktree paths
        """
        repaired = repair_worktree(self._repo, paths=paths)
        return repaired

    def __iter__(self) -> Iterator[WorkTreeInfo]:
        """Yield each WorkTreeInfo returned by ``list()``."""
        yield from self.list()
class WorkTree:
    """Working tree operations for a Git repository.

    Groups the operations that involve the checked-out files and the index:
    staging and unstaging paths, creating commits, resetting the index to a
    tree, and reading/writing sparse-checkout configuration.
    """

    def __init__(self, repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
        """Initialize a WorkTree for the given repository.

        Args:
          repo: The repository this working tree belongs to
          path: Path to the working tree directory; bytes are decoded with
            the filesystem encoding and the result is made absolute
        """
        self._repo = repo
        # os.fsdecode accepts str, bytes and PathLike and always returns str.
        self.path: str = os.path.abspath(os.fsdecode(path))

    def stage(
        self,
        fs_paths: str
        | bytes
        | os.PathLike[str]
        | Iterable[str | bytes | os.PathLike[str]],
    ) -> None:
        """Stage a set of paths.

        For each path the index is updated to reflect what is on disk:
        regular files and symlinks are stored as blobs and (re)indexed,
        paths that no longer exist or are of another file type are removed
        from the index. The index is written once at the end.

        Args:
          fs_paths: A single path or an iterable of paths, relative to the
            working tree root
        Raises:
          ValueError: If any path is absolute
        """
        root_path_bytes = os.fsencode(self.path)

        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self._repo.open_index()
        blob_normalizer = self._repo.get_blob_normalizer()

        def forget(tree_path: bytes) -> None:
            # Drop the entry if present; an untracked path is not an error.
            try:
                del index[tree_path]
            except KeyError:
                pass

        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except (FileNotFoundError, NotADirectoryError):
                # File no longer exists
                forget(tree_path)
                continue

            if stat.S_ISDIR(st.st_mode):
                # index_entry_from_directory may return a falsy value, in
                # which case the directory itself is not tracked.
                # NOTE(review): presumably only nested repositories yield an
                # entry -- confirm against dulwich.index.
                entry = index_entry_from_directory(st, full_path)
                if entry:
                    index[tree_path] = entry
                else:
                    forget(tree_path)
            elif stat.S_ISREG(st.st_mode) or stat.S_ISLNK(st.st_mode):
                blob = blob_from_path_and_stat(full_path, st)
                blob = blob_normalizer.checkin_normalize(blob, fs_path)
                self._repo.object_store.add_object(blob)
                index[tree_path] = index_entry_from_stat(st, blob.id)
            else:
                # Neither directory, regular file nor symlink: not trackable.
                forget(tree_path)
        index.write()

    def unstage(self, fs_paths: Sequence[str]) -> None:
        """Unstage specific files in the index.

        With a HEAD commit, each path's index entry is restored to the entry
        recorded in HEAD's tree; paths absent from that tree are removed from
        the index. Without a HEAD commit, every path is removed from the
        index.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
        Raises:
          KeyError: If a path that has to be removed is not in the index
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self._repo.open_index()

        def drop(tree_path: bytes) -> None:
            # Remove an entry, reporting untracked paths with a clear message.
            try:
                del index[tree_path]
            except KeyError as exc:
                raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

        try:
            commit = self._repo[b"HEAD"]
        except KeyError:
            # no head mean no commit in the repo
            for fs_path in fs_paths:
                drop(_fs_to_tree_path(fs_path))
            index.write()
            return
        else:
            assert isinstance(commit, Commit), "HEAD must be a commit"
            tree_id = commit.tree

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self._repo.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(
                    self._repo.object_store.__getitem__, tree_path
                )
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                drop(tree_path)
                continue

            # The file may be missing on disk; stat fields then default to 0.
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                st = None

            blob_obj = self._repo[tree_entry[1]]
            assert isinstance(blob_obj, Blob)
            blob_size = len(blob_obj.data)

            index_entry = IndexEntry(
                ctime=(commit.commit_time, 0),
                mtime=(commit.commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=blob_size,
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()

    def commit(
        self,
        message: Union[str, bytes, Callable[[Any, Commit], bytes], None] = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: Sequence[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool | None = None,
        signoff: bool | None = None,
    ) -> ObjectID:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (str or bytes), or a callable that takes
            (repo, commit) and returns the message; the callable is invoked
            after tree, identities and parents have been set on the commit
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding (defaults to i18n.commitEncoding if configured)
          ref: Optional ref to commit to (defaults to b"HEAD").
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: Whether to GPG-sign the commit. If None, the commit.gpgSign
            configuration decides (default False). The key ID is read from
            user.signingkey; when unset, ``Commit.sign`` is called with None.
          signoff: Add Signed-off-by line (DCO) to commit message.
            If None, uses format.signoff config.
        Returns:
          New commit SHA1
        Raises:
          CommitError: If a pre-commit or commit-msg hook fails, or the ref
            changed while the commit was being created
          ValueError: If ``tree`` is not 40 characters long, or no message
            was given / the message callback returned None
        """
        try:
            if not no_verify:
                self._repo.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        if tree is None:
            index = self._repo.open_index()
            c.tree = index.commit(self._repo.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self._repo.get_config_stack()
        if merge_heads is None:
            merge_heads = self._repo._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        # Store original message (might be callable)
        original_message = message
        message = None  # Will be set later after parents are set

        # Check if we should sign the commit
        if sign is None:
            # Check commit.gpgSign configuration when sign is not explicitly set
            try:
                should_sign = config.get_boolean(
                    (b"commit",), b"gpgsign", default=False
                )
            except KeyError:
                should_sign = False  # Default to not signing if no config
        else:
            should_sign = sign

        # Get the signing key from config if signing is enabled
        keyid = None
        if should_sign:
            try:
                keyid_bytes = config.get((b"user",), b"signingkey")
                keyid = keyid_bytes.decode() if keyid_bytes else None
            except KeyError:
                keyid = None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
        else:
            try:
                old_head = self._repo.refs[ref]
                c.parents = [old_head, *merge_heads]
            except KeyError:
                # Ref does not exist yet: only the merge heads are parents.
                c.parents = merge_heads

        # Handle message after parents are set
        if callable(original_message):
            message = original_message(self._repo, c)
            if message is None:
                raise ValueError("Message callback returned None")
        else:
            message = original_message

        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # Handle signoff
        should_signoff = signoff
        if should_signoff is None:
            # Check format.signOff configuration
            try:
                should_signoff = config.get_boolean(
                    (b"format",), b"signoff", default=False
                )
            except KeyError:
                should_signoff = False

        if should_signoff:
            # Add Signed-off-by trailer
            # Get the committer identity for the signoff
            signoff_identity = committer
            if isinstance(message, bytes):
                message_bytes = message
            else:
                message_bytes = message.encode("utf-8")

            message_bytes = add_trailer_to_message(
                message_bytes,
                "Signed-off-by",
                signoff_identity.decode("utf-8")
                if isinstance(signoff_identity, bytes)
                else signoff_identity,
                separator=":",
                where="end",
                if_exists="addIfDifferentNeighbor",
                if_missing="add",
            )
            message = message_bytes

        try:
            if no_verify:
                c.message = message
            else:
                # The hook may return a rewritten message, or None to keep it.
                c.message = self._repo.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        if ref is None:
            # Create a dangling commit
            if should_sign:
                c.sign(keyid)
            self._repo.object_store.add_object(c)
        else:
            try:
                old_head = self._repo.refs[ref]
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                # Only move the ref if it still points at the head read above.
                ok = self._repo.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist: create it, failing if it appears meanwhile.
                c.parents = merge_heads
                if should_sign:
                    c.sign(keyid)
                self._repo.object_store.add_object(c)
                message_bytes = (
                    message.encode() if isinstance(message, str) else message
                )
                ok = self._repo.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message_bytes,
                    committer=committer,
                    timestamp=int(commit_timestamp)
                    if commit_timestamp is not None
                    else None,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._repo._del_named_file("MERGE_HEAD")

        try:
            self._repo.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self._repo)

        return c.id

    def reset_index(self, tree: bytes | None = None) -> None:
        """Reset the index back to a specific tree.

        Reads core.filemode, core.protectNTFS, core.protectHFS and
        core.symlinks from the repository configuration (each with a
        platform-dependent default) and passes the resulting settings to
        ``build_index_from_tree`` for this working tree.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        from .index import (
            build_index_from_tree,
            symlink,
            validate_path_element_default,
            validate_path_element_hfs,
            validate_path_element_ntfs,
        )

        if tree is None:
            head = self._repo[b"HEAD"]
            if isinstance(head, Tag):
                # HEAD resolved to a tag object: follow it to its target.
                _cls, obj = head.object
                head = self._repo.get_object(obj)
            from .objects import Commit

            assert isinstance(head, Commit)
            tree = head.tree
        config = self._repo.get_config()
        honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
        # The option name must not repeat the section: the settings are
        # core.protectNTFS / core.protectHFS, i.e. names "protectNTFS" and
        # "protectHFS" inside section "core".
        if config.get_boolean(b"core", b"protectNTFS", os.name == "nt"):
            validate_path_element = validate_path_element_ntfs
        elif config.get_boolean(b"core", b"protectHFS", sys.platform == "darwin"):
            validate_path_element = validate_path_element_hfs
        else:
            validate_path_element = validate_path_element_default
        if config.get_boolean(b"core", b"symlinks", True):
            symlink_fn = symlink
        else:
            # Symlinks disabled: write the link target into a plain file.
            def symlink_fn(  # type: ignore[misc,unused-ignore]
                src: Union[str, bytes],
                dst: Union[str, bytes],
                target_is_directory: bool = False,
                *,
                dir_fd: int | None = None,
            ) -> None:
                with open(dst, "w" + ("b" if isinstance(src, bytes) else "")) as f:
                    f.write(src)

        blob_normalizer = self._repo.get_blob_normalizer()
        return build_index_from_tree(
            self.path,
            self._repo.index_path(),
            self._repo.object_store,
            tree,
            honor_filemode=honor_filemode,
            validate_path_element=validate_path_element,
            symlink_fn=symlink_fn,  # type: ignore[arg-type,unused-ignore]
            blob_normalizer=blob_normalizer,
        )

    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self._repo.controldir(), "info", "sparse-checkout")

    def configure_for_cone_mode(self) -> None:
        """Set core.sparseCheckout and core.sparseCheckoutCone to true and save the config."""
        config = self._repo.get_config()
        config.set((b"core",), b"sparseCheckout", b"true")
        config.set((b"core",), b"sparseCheckoutCone", b"true")
        config.write_to_path()

    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

        Only the exact value ``b"true"`` counts; a missing setting yields False.
        """
        config = self._repo.get_config()
        try:
            sc_cone = config.get((b"core",), b"sparseCheckoutCone")
        except KeyError:
            # If core.sparseCheckoutCone is not set, default to False
            return False
        return sc_cone == b"true"

    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Lines are stripped of surrounding whitespace and blank lines skipped.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        path = self._sparse_checkout_file_path()
        try:
            with open(path, encoding="utf-8") as f:
                return [line.strip() for line in f if line.strip()]
        except FileNotFoundError:
            return []

    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist. Any existing file
        is overwritten; one pattern is written per line.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        info_dir = os.path.join(self._repo.controldir(), "info")
        os.makedirs(info_dir, exist_ok=True)

        path = self._sparse_checkout_file_path()
        with open(path, "w", encoding="utf-8") as f:
            f.writelines(pat + "\n" for pat in patterns)

    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        The file always starts with ``/*`` and ``!/*/``. For each directory to
        include, a ``/<dir>/`` line is appended, which re-includes that
        directory and everything under it after the ``!/*/`` exclusion.
        Leading/trailing slashes are stripped from each directory, empty
        names are ignored, and the same line is never added twice.

        Args:
          dirs: Directories to include, or None for only the two base patterns
        """
        patterns = ["/*", "!/*/"]
        for d in dirs or ():
            d = d.strip("/")
            line = f"/{d}/"
            if d and line not in patterns:
                patterns.append(line)
        self.set_sparse_checkout_patterns(patterns)
def read_worktree_lock_reason(worktree_path: str) -> str | None:
    """Return the text stored in a worktree's ``locked`` file.

    Args:
      worktree_path: Path to the worktree's administrative directory
    Returns:
      The stripped contents of ``<worktree_path>/locked`` when that file
      exists and can be read (possibly an empty string); None when the file
      does not exist or opening it fails with FileNotFoundError or
      PermissionError
    """
    lock_file = os.path.join(worktree_path, "locked")
    if os.path.exists(lock_file):
        try:
            with open(lock_file) as fh:
                reason = fh.read()
        except (FileNotFoundError, PermissionError):
            # Vanished or became unreadable between the check and the open.
            return None
        return reason.strip()
    return None
def _read_stripped_file(path: str) -> bytes | None:
    """Return the whitespace-stripped bytes of ``path``, or None if unreadable."""
    try:
        with open(path, "rb") as f:
            return f.read().strip()
    except (FileNotFoundError, PermissionError):
        return None


def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
    """List all worktrees for the given repository.

    The first entry always describes the main worktree. It is followed by
    one entry per directory under ``<controldir>/worktrees`` whose ``gitdir``
    file can be read; directories without a readable ``gitdir`` file are
    skipped.

    Args:
      repo: The repository to list worktrees for
    Returns:
      A list of WorkTreeInfo objects
    """
    # Main worktree. A HEAD that cannot be resolved is reported as
    # head=None, matching how linked worktrees are handled below.
    # NOTE(review): relies on repo.head() raising KeyError for an unresolved
    # HEAD (e.g. no commits yet) -- confirm against Repo.head().
    try:
        main_head = repo.head()
    except KeyError:
        main_head = None
    main_wt_info = WorkTreeInfo(path=repo.path, head=main_head, bare=repo.bare)

    # Get branch info for main worktree
    head_contents = _read_stripped_file(os.path.join(repo.controldir(), "HEAD"))
    if head_contents is not None and head_contents.startswith(SYMREF):
        main_wt_info.branch = head_contents[len(SYMREF) :].strip()
    else:
        # HEAD is unreadable or does not start with the symref prefix.
        main_wt_info.detached = True
        main_wt_info.branch = None

    worktrees = [main_wt_info]

    # List additional worktrees
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(worktrees_dir):
        return worktrees

    for entry in os.listdir(worktrees_dir):
        worktree_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_path):
            continue

        # Read gitdir to get actual worktree path
        gitdir_contents = _read_stripped_file(os.path.join(worktree_path, GITDIR))
        if gitdir_contents is None:
            # Worktree directory is missing, skip it
            # TODO: Consider adding these as prunable worktrees with a placeholder path
            continue

        # Convert relative path to absolute if needed
        wt_path = os.fsdecode(gitdir_contents)
        if not os.path.isabs(wt_path):
            wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
        # The gitdir file names the worktree's .git entry; its parent
        # directory is the worktree itself.
        wt_info = WorkTreeInfo(path=os.path.dirname(wt_path))

        # A worktree whose directory no longer exists can be pruned.
        if wt_info.path and not os.path.exists(wt_info.path):
            wt_info.prunable = True

        # Read HEAD; when it is unreadable head and branch stay None.
        head_contents = _read_stripped_file(os.path.join(worktree_path, "HEAD"))
        if head_contents is not None:
            if head_contents.startswith(SYMREF):
                ref_name = head_contents[len(SYMREF) :].strip()
                wt_info.branch = ref_name
                # Resolve ref to get commit sha
                try:
                    wt_info.head = repo.refs[ref_name]
                except KeyError:
                    wt_info.head = None
            else:
                wt_info.detached = True
                wt_info.branch = None
                wt_info.head = head_contents

        # Check if locked
        lock_reason = read_worktree_lock_reason(worktree_path)
        if lock_reason is not None:
            wt_info.locked = True
            wt_info.lock_reason = lock_reason

        worktrees.append(wt_info)

    return worktrees
def add_worktree(
    repo: Repo,
    path: str | bytes | os.PathLike[str],
    branch: str | bytes | None = None,
    commit: ObjectID | None = None,
    force: bool = False,
    detach: bool = False,
    exist_ok: bool = False,
) -> Repo:
    """Add a new worktree to the repository.

    What gets checked out is decided in this order:

    * ``commit`` given: HEAD is detached at that commit. ``branch`` is then
      only used for the "already checked out" check.
    * ``branch`` given: the branch is checked out; if it does not exist it is
      created pointing at the main repository's current HEAD.
    * neither given: HEAD is detached at the main repository's current HEAD.

    Args:
      repo: The main repository
      path: Path where the new worktree should be created
      branch: Branch to checkout in the new worktree (creates if doesn't exist)
      commit: Specific commit to checkout (results in detached HEAD)
      force: Force creation even if branch is already checked out elsewhere
      detach: Detach HEAD in the new worktree
      exist_ok: If True, do not raise an error if the directory already exists
    Returns:
      The newly created worktree repository
    Raises:
      ValueError: If the path already exists (and exist_ok is False) or branch is already checked out
    """
    from .repo import Repo as RepoClass

    # os.fsdecode accepts str, bytes and PathLike and always returns str.
    path = os.fsdecode(path)

    # Check if path already exists
    if os.path.exists(path) and not exist_ok:
        raise ValueError(f"Path already exists: {path}")

    # Normalize branch name
    if branch is not None:
        if isinstance(branch, str):
            branch = branch.encode()
        branch = local_branch_name(branch)

    # Check if branch is already checked out in another worktree
    if branch and not force:
        for wt in list_worktrees(repo):
            if wt.branch == branch:
                raise ValueError(
                    f"Branch {branch.decode()} is already checked out at {wt.path}"
                )

    # Determine what to checkout
    if commit is not None:
        checkout_ref = commit
        detach = True
    elif branch is not None:
        try:
            checkout_ref = repo.refs[branch]
        except KeyError:
            # Branch does not exist yet: create it from HEAD. (commit is
            # always None on this path, so there is no other start point.)
            checkout_ref = repo.head()
            repo.refs[branch] = checkout_ref
    else:
        # Default to current HEAD
        checkout_ref = repo.head()
        detach = True

    # Create the worktree directory
    os.makedirs(path, exist_ok=exist_ok)

    # Initialize the worktree. The identifier is the last path component;
    # abspath normalizes the path first so that a trailing separator (or a
    # path such as ".") does not produce an empty or meaningless name.
    identifier = os.path.basename(os.path.abspath(path))
    wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)

    # Set HEAD appropriately
    if detach:
        # Detached HEAD - write SHA directly to HEAD
        with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
            f.write(checkout_ref + b"\n")
    else:
        # Point to branch
        assert branch is not None  # Should be guaranteed by logic above
        wt_repo.refs.set_symbolic_ref(b"HEAD", branch)

    # Reset index to match HEAD
    wt_repo.get_worktree().reset_index()

    return wt_repo
def remove_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], force: bool = False
) -> None:
    """Remove a worktree.

    Deletes both the worktree's working directory (if it still exists) and
    its administrative directory inside the main repository.

    Args:
        repo: The main repository
        path: Path to the worktree to remove
        force: Remove the worktree even if it is locked

    Raises:
        ValueError: If ``path`` is the main working tree, the worktree
            doesn't exist, or it is locked and ``force`` is False
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)

    # Don't allow removing the main worktree
    if os.path.abspath(path) == os.path.abspath(repo.path):
        raise ValueError("Cannot remove the main working tree")

    # Find the worktree (raises ValueError if it is not registered)
    worktree_id = _find_worktree_id(repo, path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Check if locked
    if not force and os.path.exists(os.path.join(worktree_control_dir, "locked")):
        raise ValueError(f"Worktree is locked: {path}")

    # TODO: Check for uncommitted changes in the worktree when not forcing.
    # Until that is implemented, local changes are discarded without warning.

    # Remove the working directory
    if os.path.exists(path):
        shutil.rmtree(path)

    # Remove the administrative files
    shutil.rmtree(worktree_control_dir)
def prune_worktrees(
    repo: Repo, expire: int | None = None, dry_run: bool = False
) -> list[str]:
    """Drop administrative entries whose working directory has disappeared.

    An entry is stale when its ``gitdir`` file cannot be read or when the
    directory containing the ``.git`` file it names no longer exists.
    Locked entries are never pruned.

    Args:
        repo: The main repository
        expire: If given, only prune entries whose administrative directory
            was last modified at least this many seconds ago
        dry_run: Report what would be pruned without deleting anything

    Returns:
        Identifiers of the pruned (or, with ``dry_run``, prunable) worktrees
    """
    removed: list[str] = []
    admin_root = os.path.join(repo.controldir(), WORKTREES)
    if not os.path.isdir(admin_root):
        return removed

    now = time.time()
    for name in os.listdir(admin_root):
        admin_dir = os.path.join(admin_root, name)
        if not os.path.isdir(admin_dir):
            continue
        # Locked worktrees are exempt from pruning
        if os.path.exists(os.path.join(admin_dir, "locked")):
            continue

        try:
            with open(os.path.join(admin_dir, GITDIR), "rb") as fh:
                target = os.fsdecode(fh.read().strip())
            # Relative pointers are interpreted relative to the admin dir
            if not os.path.isabs(target):
                target = os.path.abspath(os.path.join(admin_dir, target))
            # The pointer names the worktree's .git file; its parent is the worktree
            stale = not os.path.exists(os.path.dirname(target))
        except (FileNotFoundError, PermissionError):
            stale = True

        # Entries younger than the expiry window are kept for now
        if stale and expire is not None:
            if now - os.stat(admin_dir).st_mtime < expire:
                stale = False

        if not stale:
            continue
        removed.append(name)
        if not dry_run:
            shutil.rmtree(admin_dir)

    return removed
def lock_worktree(
    repo: Repo, path: str | bytes | os.PathLike[str], reason: str | None = None
) -> None:
    """Mark a worktree as locked so that it is not pruned.

    Creates (or truncates) the ``locked`` file in the worktree's
    administrative directory, storing ``reason`` in it when one is given.

    Args:
        repo: The main repository
        path: Path to the worktree to lock
        reason: Optional reason for locking

    Raises:
        ValueError: If no worktree is registered at ``path``
    """
    identifier = _find_worktree_id(repo, path)
    marker = os.path.join(repo.controldir(), WORKTREES, identifier, "locked")
    with open(marker, "w") as marker_file:
        if reason:
            marker_file.write(reason)
def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike[str]) -> None:
    """Unlock a worktree.

    Removes the ``locked`` file from the worktree's administrative
    directory. Unlocking a worktree that is not locked is a no-op.

    Args:
        repo: The main repository
        path: Path to the worktree to unlock

    Raises:
        ValueError: If no worktree is registered at ``path``
    """
    worktree_id = _find_worktree_id(repo, path)
    lock_path = os.path.join(repo.controldir(), WORKTREES, worktree_id, "locked")
    # Remove directly instead of exists()+remove(): avoids a race if the lock
    # file disappears between the check and the removal.
    try:
        os.remove(lock_path)
    except FileNotFoundError:
        # Not locked (any more) - nothing to do.
        pass
def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike[str]) -> str:
    """Find the worktree identifier for the given path.

    Scans every entry under the repository's worktrees directory, reads its
    ``gitdir`` file (the path of the worktree's ``.git`` file) and returns
    the first entry whose worktree directory equals ``path``.

    Args:
        repo: The main repository
        path: Path to the worktree

    Returns:
        The worktree identifier

    Raises:
        ValueError: If the worktree is not found
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    target = os.path.abspath(path)

    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
    if os.path.isdir(worktrees_dir):
        for entry in os.listdir(worktrees_dir):
            worktree_path = os.path.join(worktrees_dir, entry)
            try:
                with open(os.path.join(worktree_path, GITDIR), "rb") as f:
                    gitdir_contents = f.read().strip()
            except (FileNotFoundError, NotADirectoryError, PermissionError):
                # Missing/unreadable gitdir file, or a stray non-directory
                # entry in the worktrees directory: not a usable worktree.
                continue

            wt_path = os.fsdecode(gitdir_contents)
            # Relative pointers are relative to the administrative directory
            if not os.path.isabs(wt_path):
                wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
            wt_dir = os.path.dirname(wt_path)  # Remove .git suffix
            if os.path.abspath(wt_dir) == target:
                return entry

    raise ValueError(f"Worktree not found: {path}")
def move_worktree(
    repo: Repo,
    old_path: str | bytes | os.PathLike[str],
    new_path: str | bytes | os.PathLike[str],
) -> None:
    """Move a worktree to a new location.

    Moves the working directory and rewrites the ``gitdir`` pointer in the
    worktree's administrative directory so that it names the ``.git`` file
    at the new location.

    Args:
        repo: The main repository
        old_path: Current path of the worktree
        new_path: New path for the worktree

    Raises:
        ValueError: If ``old_path`` is the main working tree, the worktree
            doesn't exist, or new path already exists
    """
    old_path = os.fspath(old_path)
    new_path = os.fspath(new_path)
    if isinstance(old_path, bytes):
        old_path = os.fsdecode(old_path)
    if isinstance(new_path, bytes):
        new_path = os.fsdecode(new_path)

    # Don't allow moving the main worktree
    if os.path.abspath(old_path) == os.path.abspath(repo.path):
        raise ValueError("Cannot move the main working tree")

    # Check if new path already exists
    if os.path.exists(new_path):
        raise ValueError(f"Path already exists: {new_path}")

    # Find the worktree
    worktree_id = _find_worktree_id(repo, old_path)
    worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)

    # Move the actual worktree directory
    shutil.move(old_path, new_path)

    # Update the gitdir pointer in the control directory. Store an absolute
    # path: readers resolve relative entries against the control directory,
    # not the current working directory, so a relative new_path written
    # verbatim would point at the wrong place.
    gitdir_file = os.path.join(os.path.abspath(new_path), ".git")
    with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
def repair_worktree(
    repo: Repo, paths: Sequence[str | bytes | os.PathLike[str]] | None = None
) -> list[str]:
    """Repair worktree administrative files.

    This repairs the connection between worktrees and the main repository
    when they have been moved or become corrupted.

    Args:
        repo: The main repository
        paths: Optional list of worktree paths to repair. For each one the
            ``gitdir`` pointer in its administrative directory is rewritten
            to name the worktree's current ``.git`` file. If None (or
            empty), the ``.git`` file of every linked worktree known to the
            main repository is checked and, if needed, re-pointed at its
            administrative directory.

    Returns:
        List of repaired worktree paths

    Raises:
        ValueError: If a specified path is not a valid worktree
    """
    worktrees_dir = os.path.join(repo.controldir(), WORKTREES)

    if paths:
        repaired: list[str] = []
        for path in paths:
            repaired_path = _repair_gitdir_pointer_from_worktree(path)
            if repaired_path is not None:
                repaired.append(repaired_path)
        return repaired

    return _repair_git_files_from_control_dirs(worktrees_dir)


def _repair_gitdir_pointer_from_worktree(
    path: str | bytes | os.PathLike[str],
) -> str | None:
    """Rewrite the control-dir ``gitdir`` pointer for the worktree at ``path``.

    Returns the absolute worktree path if the pointer was rewritten, or None
    if the administrative directory named by the worktree's ``.git`` file has
    no ``gitdir`` file. Raises ValueError if ``path`` is not a linked worktree.
    """
    path_str = os.fspath(path)
    if isinstance(path_str, bytes):
        path_str = os.fsdecode(path_str)
    path_str = os.path.abspath(path_str)

    # Check if this is a linked worktree
    gitdir_file = os.path.join(path_str, ".git")
    if not os.path.exists(gitdir_file):
        raise ValueError(f"Not a valid worktree: {path_str}")

    # Read the .git file to get the worktree control directory. OSError also
    # covers ".git" being a directory (e.g. the main working tree), which is
    # reported as the documented ValueError rather than leaking through.
    try:
        with open(gitdir_file, "rb") as f:
            gitdir_content = f.read().strip()
    except OSError as e:
        raise ValueError(f"Cannot read .git file in worktree: {path_str}") from e

    if not gitdir_content.startswith(b"gitdir: "):
        raise ValueError(f"Invalid .git file in worktree: {path_str}")

    try:
        worktree_control_path = gitdir_content[8:].decode()
    except UnicodeDecodeError as e:
        raise ValueError(f"Cannot read .git file in worktree: {path_str}") from e

    # Make the path absolute if it's relative (relative to the worktree)
    if not os.path.isabs(worktree_control_path):
        worktree_control_path = os.path.abspath(
            os.path.join(path_str, worktree_control_path)
        )

    # Update the gitdir file in the worktree control directory
    gitdir_pointer = os.path.join(worktree_control_path, GITDIR)
    if not os.path.exists(gitdir_pointer):
        return None
    with open(gitdir_pointer, "wb") as f:
        f.write(os.fsencode(gitdir_file) + b"\n")
    return path_str


def _repair_git_files_from_control_dirs(worktrees_dir: str) -> list[str]:
    """Re-point each linked worktree's ``.git`` file at its control directory.

    Best effort: entries whose files cannot be read or written are skipped.
    Returns the worktree paths whose ``.git`` file was rewritten.
    """
    repaired: list[str] = []
    if not os.path.isdir(worktrees_dir):
        return repaired

    for entry in os.listdir(worktrees_dir):
        worktree_control_path = os.path.join(worktrees_dir, entry)
        if not os.path.isdir(worktree_control_path):
            continue

        # Read the gitdir file to find where the worktree's .git file is
        try:
            with open(os.path.join(worktree_control_path, GITDIR), "rb") as f:
                git_file = os.fsdecode(f.read().strip())
        except OSError:
            # Can't repair if we can't read the gitdir file
            continue

        # Relative pointers are relative to the control directory, matching
        # how the other readers in this module interpret the gitdir file.
        if not os.path.isabs(git_file):
            git_file = os.path.abspath(os.path.join(worktree_control_path, git_file))

        # Get the worktree directory (remove .git suffix)
        worktree_path = os.path.dirname(git_file)

        if not os.path.exists(git_file):
            continue

        try:
            with open(git_file, "rb") as f:
                content = f.read().strip()
            if not content.startswith(b"gitdir: "):
                continue

            current_pointer = content[8:].decode()
            if not os.path.isabs(current_pointer):
                current_pointer = os.path.abspath(
                    os.path.join(worktree_path, current_pointer)
                )

            # If it doesn't point to the right place, fix it
            if os.path.abspath(current_pointer) != os.path.abspath(
                worktree_control_path
            ):
                with open(git_file, "wb") as wf:
                    wf.write(
                        b"gitdir: " + os.fsencode(worktree_control_path) + b"\n"
                    )
                repaired.append(worktree_path)
        except (OSError, UnicodeDecodeError):
            # Includes the file vanishing between the exists() check and open()
            continue

    return repaired
@contextmanager
def temporary_worktree(repo: Repo, prefix: str = "tmp-worktree-") -> Iterator[Repo]:
    """Create a temporary worktree that is automatically cleaned up.

    A fresh temporary directory is created and registered through
    ``repo.worktrees.add``. On exit the worktree is deregistered through
    ``repo.worktrees.remove`` and the directory is deleted. The directory is
    deleted even if adding or deregistering the worktree raises.

    Args:
        repo: Dulwich repository object
        prefix: Prefix for the temporary directory name

    Yields:
        The object returned by ``repo.worktrees.add`` for the temporary
        worktree
    """
    temp_dir = tempfile.mkdtemp(prefix=prefix)
    try:
        worktree = repo.worktrees.add(temp_dir, exist_ok=True)
        try:
            yield worktree
        finally:
            # Clean up worktree registration
            repo.worktrees.remove(worktree.path)
    finally:
        # Always clean up the temporary directory, even when deregistration
        # failed above; otherwise the directory would be leaked.
        if Path(temp_dir).exists():
            shutil.rmtree(temp_dir)
|