|
@@ -21,21 +21,196 @@
|
|
|
|
|
|
"""Working tree operations for Git repositories."""
|
|
"""Working tree operations for Git repositories."""
|
|
|
|
|
|
|
|
+from __future__ import annotations
|
|
|
|
+
|
|
|
|
+import builtins
|
|
import os
|
|
import os
|
|
|
|
+import shutil
|
|
import stat
|
|
import stat
|
|
import sys
|
|
import sys
|
|
import time
|
|
import time
|
|
import warnings
|
|
import warnings
|
|
from collections.abc import Iterable
|
|
from collections.abc import Iterable
|
|
-from typing import TYPE_CHECKING, Optional, Union
|
|
|
|
-
|
|
|
|
-if TYPE_CHECKING:
|
|
|
|
- from .repo import Repo
|
|
|
|
|
|
|
|
from .errors import CommitError, HookError
|
|
from .errors import CommitError, HookError
|
|
from .objects import Commit, ObjectID, Tag, Tree
|
|
from .objects import Commit, ObjectID, Tag, Tree
|
|
-from .refs import Ref
|
|
|
|
-from .repo import check_user_identity, get_user_identity
|
|
|
|
|
|
+from .refs import SYMREF, Ref
|
|
|
|
+from .repo import (
|
|
|
|
+ GITDIR,
|
|
|
|
+ WORKTREES,
|
|
|
|
+ Repo,
|
|
|
|
+ check_user_identity,
|
|
|
|
+ get_user_identity,
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class WorkTreeInfo:
|
|
|
|
+ """Information about a single worktree.
|
|
|
|
+
|
|
|
|
+ Attributes:
|
|
|
|
+ path: Path to the worktree
|
|
|
|
+ head: Current HEAD commit SHA
|
|
|
|
+ branch: Current branch (if not detached)
|
|
|
|
+ bare: Whether this is a bare repository
|
|
|
|
+ detached: Whether HEAD is detached
|
|
|
|
+ locked: Whether the worktree is locked
|
|
|
|
+ prunable: Whether the worktree can be pruned
|
|
|
|
+ lock_reason: Reason for locking (if locked)
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ def __init__(
|
|
|
|
+ self,
|
|
|
|
+ path: str,
|
|
|
|
+ head: bytes | None = None,
|
|
|
|
+ branch: bytes | None = None,
|
|
|
|
+ bare: bool = False,
|
|
|
|
+ detached: bool = False,
|
|
|
|
+ locked: bool = False,
|
|
|
|
+ prunable: bool = False,
|
|
|
|
+ lock_reason: str | None = None,
|
|
|
|
+ ):
|
|
|
|
+ self.path = path
|
|
|
|
+ self.head = head
|
|
|
|
+ self.branch = branch
|
|
|
|
+ self.bare = bare
|
|
|
|
+ self.detached = detached
|
|
|
|
+ self.locked = locked
|
|
|
|
+ self.prunable = prunable
|
|
|
|
+ self.lock_reason = lock_reason
|
|
|
|
+
|
|
|
|
+ def __repr__(self) -> str:
|
|
|
|
+ return f"WorkTreeInfo(path={self.path!r}, branch={self.branch!r}, detached={self.detached})"
|
|
|
|
+
|
|
|
|
+ def __eq__(self, other: object) -> bool:
|
|
|
|
+ if not isinstance(other, WorkTreeInfo):
|
|
|
|
+ return NotImplemented
|
|
|
|
+ return (
|
|
|
|
+ self.path == other.path
|
|
|
|
+ and self.head == other.head
|
|
|
|
+ and self.branch == other.branch
|
|
|
|
+ and self.bare == other.bare
|
|
|
|
+ and self.detached == other.detached
|
|
|
|
+ and self.locked == other.locked
|
|
|
|
+ and self.prunable == other.prunable
|
|
|
|
+ and self.lock_reason == other.lock_reason
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ def open(self) -> WorkTree:
|
|
|
|
+ """Open this worktree as a WorkTree.
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ WorkTree object for this worktree
|
|
|
|
+
|
|
|
|
+ Raises:
|
|
|
|
+ NotGitRepository: If the worktree path is invalid
|
|
|
|
+ """
|
|
|
|
+ from .repo import Repo
|
|
|
|
+
|
|
|
|
+ repo = Repo(self.path)
|
|
|
|
+ return WorkTree(repo, self.path)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class WorkTreeContainer:
|
|
|
|
+ """Container for managing multiple working trees.
|
|
|
|
+
|
|
|
|
+ This class manages worktrees for a repository, similar to how
|
|
|
|
+ RefsContainer manages references.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ def __init__(self, repo: Repo) -> None:
|
|
|
|
+ """Initialize a WorkTreeContainer for the given repository.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The repository this container belongs to
|
|
|
|
+ """
|
|
|
|
+ self._repo = repo
|
|
|
|
+
|
|
|
|
+ def list(self) -> list[WorkTreeInfo]:
|
|
|
|
+ """List all worktrees for this repository.
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ A list of WorkTreeInfo objects
|
|
|
|
+ """
|
|
|
|
+ return list_worktrees(self._repo)
|
|
|
|
+
|
|
|
|
+ def add(
|
|
|
|
+ self,
|
|
|
|
+ path: str | bytes | os.PathLike,
|
|
|
|
+ branch: str | bytes | None = None,
|
|
|
|
+ commit: ObjectID | None = None,
|
|
|
|
+ force: bool = False,
|
|
|
|
+ detach: bool = False,
|
|
|
|
+ ) -> Repo:
|
|
|
|
+ """Add a new worktree.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ path: Path where the new worktree should be created
|
|
|
|
+ branch: Branch to checkout in the new worktree
|
|
|
|
+ commit: Specific commit to checkout (results in detached HEAD)
|
|
|
|
+ force: Force creation even if branch is already checked out elsewhere
|
|
|
|
+ detach: Detach HEAD in the new worktree
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ The newly created worktree repository
|
|
|
|
+ """
|
|
|
|
+ return add_worktree(
|
|
|
|
+ self._repo, path, branch=branch, commit=commit, force=force, detach=detach
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ def remove(self, path: str | bytes | os.PathLike, force: bool = False) -> None:
|
|
|
|
+ """Remove a worktree.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ path: Path to the worktree to remove
|
|
|
|
+ force: Force removal even if there are local changes
|
|
|
|
+ """
|
|
|
|
+ remove_worktree(self._repo, path, force=force)
|
|
|
|
+
|
|
|
|
+ def prune(
|
|
|
|
+ self, expire: int | None = None, dry_run: bool = False
|
|
|
|
+ ) -> builtins.list[str]:
|
|
|
|
+ """Prune worktree administrative files for missing worktrees.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ expire: Only prune worktrees older than this many seconds
|
|
|
|
+ dry_run: Don't actually remove anything, just report what would be removed
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ List of pruned worktree identifiers
|
|
|
|
+ """
|
|
|
|
+ return prune_worktrees(self._repo, expire=expire, dry_run=dry_run)
|
|
|
|
+
|
|
|
|
+ def move(
|
|
|
|
+ self, old_path: str | bytes | os.PathLike, new_path: str | bytes | os.PathLike
|
|
|
|
+ ) -> None:
|
|
|
|
+ """Move a worktree to a new location.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ old_path: Current path of the worktree
|
|
|
|
+ new_path: New path for the worktree
|
|
|
|
+ """
|
|
|
|
+ move_worktree(self._repo, old_path, new_path)
|
|
|
|
+
|
|
|
|
+ def lock(self, path: str | bytes | os.PathLike, reason: str | None = None) -> None:
|
|
|
|
+ """Lock a worktree to prevent it from being pruned.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ path: Path to the worktree to lock
|
|
|
|
+ reason: Optional reason for locking
|
|
|
|
+ """
|
|
|
|
+ lock_worktree(self._repo, path, reason=reason)
|
|
|
|
+
|
|
|
|
+ def unlock(self, path: str | bytes | os.PathLike) -> None:
|
|
|
|
+ """Unlock a worktree.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ path: Path to the worktree to unlock
|
|
|
|
+ """
|
|
|
|
+ unlock_worktree(self._repo, path)
|
|
|
|
+
|
|
|
|
+ def __iter__(self):
|
|
|
|
+ """Iterate over all worktrees."""
|
|
|
|
+ yield from self.list()
|
|
|
|
|
|
|
|
|
|
class WorkTree:
|
|
class WorkTree:
|
|
@@ -45,7 +220,7 @@ class WorkTree:
|
|
such as staging files, committing changes, and resetting the index.
|
|
such as staging files, committing changes, and resetting the index.
|
|
"""
|
|
"""
|
|
|
|
|
|
- def __init__(self, repo: "Repo", path: Union[str, bytes, os.PathLike]) -> None:
|
|
|
|
|
|
+ def __init__(self, repo: Repo, path: str | bytes | os.PathLike) -> None:
|
|
"""Initialize a WorkTree for the given repository.
|
|
"""Initialize a WorkTree for the given repository.
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -62,9 +237,7 @@ class WorkTree:
|
|
|
|
|
|
def stage(
|
|
def stage(
|
|
self,
|
|
self,
|
|
- fs_paths: Union[
|
|
|
|
- str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
|
|
|
|
- ],
|
|
|
|
|
|
+ fs_paths: str | bytes | os.PathLike | Iterable[str | bytes | os.PathLike],
|
|
) -> None:
|
|
) -> None:
|
|
"""Stage a set of paths.
|
|
"""Stage a set of paths.
|
|
|
|
|
|
@@ -98,7 +271,7 @@ class WorkTree:
|
|
full_path = os.path.join(root_path_bytes, fs_path)
|
|
full_path = os.path.join(root_path_bytes, fs_path)
|
|
try:
|
|
try:
|
|
st = os.lstat(full_path)
|
|
st = os.lstat(full_path)
|
|
- except OSError:
|
|
|
|
|
|
+ except (FileNotFoundError, NotADirectoryError):
|
|
# File no longer exists
|
|
# File no longer exists
|
|
try:
|
|
try:
|
|
del index[tree_path]
|
|
del index[tree_path]
|
|
@@ -187,17 +360,17 @@ class WorkTree:
|
|
|
|
|
|
def commit(
|
|
def commit(
|
|
self,
|
|
self,
|
|
- message: Optional[bytes] = None,
|
|
|
|
- committer: Optional[bytes] = None,
|
|
|
|
- author: Optional[bytes] = None,
|
|
|
|
|
|
+ message: bytes | None = None,
|
|
|
|
+ committer: bytes | None = None,
|
|
|
|
+ author: bytes | None = None,
|
|
commit_timestamp=None,
|
|
commit_timestamp=None,
|
|
commit_timezone=None,
|
|
commit_timezone=None,
|
|
author_timestamp=None,
|
|
author_timestamp=None,
|
|
author_timezone=None,
|
|
author_timezone=None,
|
|
- tree: Optional[ObjectID] = None,
|
|
|
|
- encoding: Optional[bytes] = None,
|
|
|
|
- ref: Optional[Ref] = b"HEAD",
|
|
|
|
- merge_heads: Optional[list[ObjectID]] = None,
|
|
|
|
|
|
+ tree: ObjectID | None = None,
|
|
|
|
+ encoding: bytes | None = None,
|
|
|
|
+ ref: Ref | None = b"HEAD",
|
|
|
|
+ merge_heads: list[ObjectID] | None = None,
|
|
no_verify: bool = False,
|
|
no_verify: bool = False,
|
|
sign: bool = False,
|
|
sign: bool = False,
|
|
):
|
|
):
|
|
@@ -385,7 +558,7 @@ class WorkTree:
|
|
|
|
|
|
return c.id
|
|
return c.id
|
|
|
|
|
|
- def reset_index(self, tree: Optional[bytes] = None):
|
|
|
|
|
|
+ def reset_index(self, tree: bytes | None = None):
|
|
"""Reset the index back to a specific tree.
|
|
"""Reset the index back to a specific tree.
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -485,7 +658,7 @@ class WorkTree:
|
|
for pat in patterns:
|
|
for pat in patterns:
|
|
f.write(pat + "\n")
|
|
f.write(pat + "\n")
|
|
|
|
|
|
- def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
|
|
|
|
|
|
+ def set_cone_mode_patterns(self, dirs: list[str] | None = None) -> None:
|
|
"""Write the given cone-mode directory patterns into info/sparse-checkout.
|
|
"""Write the given cone-mode directory patterns into info/sparse-checkout.
|
|
|
|
|
|
For each directory to include, add an inclusion line that "undoes" the prior
|
|
For each directory to include, add an inclusion line that "undoes" the prior
|
|
@@ -500,3 +673,471 @@ class WorkTree:
|
|
if d and line not in patterns:
|
|
if d and line not in patterns:
|
|
patterns.append(line)
|
|
patterns.append(line)
|
|
self.set_sparse_checkout_patterns(patterns)
|
|
self.set_sparse_checkout_patterns(patterns)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def read_worktree_lock_reason(worktree_path: str) -> str | None:
|
|
|
|
+ """Read the lock reason for a worktree.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ worktree_path: Path to the worktree's administrative directory
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ The lock reason if the worktree is locked, None otherwise
|
|
|
|
+ """
|
|
|
|
+ locked_path = os.path.join(worktree_path, "locked")
|
|
|
|
+ if not os.path.exists(locked_path):
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ with open(locked_path) as f:
|
|
|
|
+ return f.read().strip()
|
|
|
|
+ except (FileNotFoundError, PermissionError):
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def list_worktrees(repo: Repo) -> list[WorkTreeInfo]:
|
|
|
|
+ """List all worktrees for the given repository.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The repository to list worktrees for
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ A list of WorkTreeInfo objects
|
|
|
|
+ """
|
|
|
|
+ worktrees = []
|
|
|
|
+
|
|
|
|
+ # Add main worktree
|
|
|
|
+ main_wt_info = WorkTreeInfo(
|
|
|
|
+ path=repo.path,
|
|
|
|
+ head=repo.head(),
|
|
|
|
+ bare=repo.bare,
|
|
|
|
+ detached=False,
|
|
|
|
+ locked=False,
|
|
|
|
+ prunable=False,
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # Get branch info for main worktree
|
|
|
|
+ try:
|
|
|
|
+ with open(os.path.join(repo.controldir(), "HEAD"), "rb") as f:
|
|
|
|
+ head_contents = f.read().strip()
|
|
|
|
+ if head_contents.startswith(SYMREF):
|
|
|
|
+ ref_name = head_contents[len(SYMREF) :].strip()
|
|
|
|
+ main_wt_info.branch = ref_name
|
|
|
|
+ else:
|
|
|
|
+ main_wt_info.detached = True
|
|
|
|
+ main_wt_info.branch = None
|
|
|
|
+ except (FileNotFoundError, PermissionError):
|
|
|
|
+ main_wt_info.branch = None
|
|
|
|
+ main_wt_info.detached = True
|
|
|
|
+
|
|
|
|
+ worktrees.append(main_wt_info)
|
|
|
|
+
|
|
|
|
+ # List additional worktrees
|
|
|
|
+ worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
|
|
|
|
+ if os.path.isdir(worktrees_dir):
|
|
|
|
+ for entry in os.listdir(worktrees_dir):
|
|
|
|
+ worktree_path = os.path.join(worktrees_dir, entry)
|
|
|
|
+ if not os.path.isdir(worktree_path):
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ wt_info = WorkTreeInfo(
|
|
|
|
+ path="", # Will be set below
|
|
|
|
+ bare=False,
|
|
|
|
+ detached=False,
|
|
|
|
+ locked=False,
|
|
|
|
+ prunable=False,
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # Read gitdir to get actual worktree path
|
|
|
|
+ gitdir_path = os.path.join(worktree_path, GITDIR)
|
|
|
|
+ try:
|
|
|
|
+ with open(gitdir_path, "rb") as f:
|
|
|
|
+ gitdir_contents = f.read().strip()
|
|
|
|
+ # Convert relative path to absolute if needed
|
|
|
|
+ wt_path = os.fsdecode(gitdir_contents)
|
|
|
|
+ if not os.path.isabs(wt_path):
|
|
|
|
+ wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
|
|
|
|
+ wt_info.path = os.path.dirname(wt_path) # Remove .git suffix
|
|
|
|
+ except (FileNotFoundError, PermissionError):
|
|
|
|
+ # Worktree directory is missing, skip it
|
|
|
|
+ # TODO: Consider adding these as prunable worktrees with a placeholder path
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ # Check if worktree path exists
|
|
|
|
+ if wt_info.path and not os.path.exists(wt_info.path):
|
|
|
|
+ wt_info.prunable = True
|
|
|
|
+
|
|
|
|
+ # Read HEAD
|
|
|
|
+ head_path = os.path.join(worktree_path, "HEAD")
|
|
|
|
+ try:
|
|
|
|
+ with open(head_path, "rb") as f:
|
|
|
|
+ head_contents = f.read().strip()
|
|
|
|
+ if head_contents.startswith(SYMREF):
|
|
|
|
+ ref_name = head_contents[len(SYMREF) :].strip()
|
|
|
|
+ wt_info.branch = ref_name
|
|
|
|
+ # Resolve ref to get commit sha
|
|
|
|
+ try:
|
|
|
|
+ wt_info.head = repo.refs[ref_name]
|
|
|
|
+ except KeyError:
|
|
|
|
+ wt_info.head = None
|
|
|
|
+ else:
|
|
|
|
+ wt_info.detached = True
|
|
|
|
+ wt_info.branch = None
|
|
|
|
+ wt_info.head = head_contents
|
|
|
|
+ except (FileNotFoundError, PermissionError):
|
|
|
|
+ wt_info.head = None
|
|
|
|
+ wt_info.branch = None
|
|
|
|
+
|
|
|
|
+ # Check if locked
|
|
|
|
+ lock_reason = read_worktree_lock_reason(worktree_path)
|
|
|
|
+ if lock_reason is not None:
|
|
|
|
+ wt_info.locked = True
|
|
|
|
+ wt_info.lock_reason = lock_reason
|
|
|
|
+
|
|
|
|
+ worktrees.append(wt_info)
|
|
|
|
+
|
|
|
|
+ return worktrees
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def add_worktree(
|
|
|
|
+ repo: Repo,
|
|
|
|
+ path: str | bytes | os.PathLike,
|
|
|
|
+ branch: str | bytes | None = None,
|
|
|
|
+ commit: ObjectID | None = None,
|
|
|
|
+ force: bool = False,
|
|
|
|
+ detach: bool = False,
|
|
|
|
+) -> Repo:
|
|
|
|
+ """Add a new worktree to the repository.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The main repository
|
|
|
|
+ path: Path where the new worktree should be created
|
|
|
|
+ branch: Branch to checkout in the new worktree (creates if doesn't exist)
|
|
|
|
+ commit: Specific commit to checkout (results in detached HEAD)
|
|
|
|
+ force: Force creation even if branch is already checked out elsewhere
|
|
|
|
+ detach: Detach HEAD in the new worktree
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ The newly created worktree repository
|
|
|
|
+
|
|
|
|
+ Raises:
|
|
|
|
+ ValueError: If the path already exists or branch is already checked out
|
|
|
|
+ """
|
|
|
|
+ from .repo import Repo as RepoClass
|
|
|
|
+
|
|
|
|
+ path = os.fspath(path)
|
|
|
|
+ if isinstance(path, bytes):
|
|
|
|
+ path = os.fsdecode(path)
|
|
|
|
+
|
|
|
|
+ # Check if path already exists
|
|
|
|
+ if os.path.exists(path):
|
|
|
|
+ raise ValueError(f"Path already exists: {path}")
|
|
|
|
+
|
|
|
|
+ # Normalize branch name
|
|
|
|
+ if branch is not None:
|
|
|
|
+ if isinstance(branch, str):
|
|
|
|
+ branch = branch.encode()
|
|
|
|
+ if not branch.startswith(b"refs/heads/"):
|
|
|
|
+ branch = b"refs/heads/" + branch
|
|
|
|
+
|
|
|
|
+ # Check if branch is already checked out in another worktree
|
|
|
|
+ if branch and not force:
|
|
|
|
+ for wt in list_worktrees(repo):
|
|
|
|
+ if wt.branch == branch:
|
|
|
|
+ raise ValueError(
|
|
|
|
+ f"Branch {branch.decode()} is already checked out at {wt.path}"
|
|
|
|
+ )
|
|
|
|
+
|
|
|
|
+ # Determine what to checkout
|
|
|
|
+ if commit is not None:
|
|
|
|
+ checkout_ref = commit
|
|
|
|
+ detach = True
|
|
|
|
+ elif branch is not None:
|
|
|
|
+ # Check if branch exists
|
|
|
|
+ try:
|
|
|
|
+ checkout_ref = repo.refs[branch]
|
|
|
|
+ except KeyError:
|
|
|
|
+ if commit is None:
|
|
|
|
+ # Create new branch from HEAD
|
|
|
|
+ checkout_ref = repo.head()
|
|
|
|
+ repo.refs[branch] = checkout_ref
|
|
|
|
+ else:
|
|
|
|
+ # Create new branch from specified commit
|
|
|
|
+ checkout_ref = commit
|
|
|
|
+ repo.refs[branch] = checkout_ref
|
|
|
|
+ else:
|
|
|
|
+ # Default to current HEAD
|
|
|
|
+ checkout_ref = repo.head()
|
|
|
|
+ detach = True
|
|
|
|
+
|
|
|
|
+ # Create the worktree directory
|
|
|
|
+ os.makedirs(path)
|
|
|
|
+
|
|
|
|
+ # Initialize the worktree
|
|
|
|
+ identifier = os.path.basename(path)
|
|
|
|
+ wt_repo = RepoClass._init_new_working_directory(path, repo, identifier=identifier)
|
|
|
|
+
|
|
|
|
+ # Set HEAD appropriately
|
|
|
|
+ if detach:
|
|
|
|
+ # Detached HEAD - write SHA directly to HEAD
|
|
|
|
+ with open(os.path.join(wt_repo.controldir(), "HEAD"), "wb") as f:
|
|
|
|
+ f.write(checkout_ref + b"\n")
|
|
|
|
+ else:
|
|
|
|
+ # Point to branch
|
|
|
|
+ wt_repo.refs.set_symbolic_ref(b"HEAD", branch)
|
|
|
|
+
|
|
|
|
+ # Reset index to match HEAD
|
|
|
|
+ wt_repo.reset_index()
|
|
|
|
+
|
|
|
|
+ return wt_repo
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def remove_worktree(
|
|
|
|
+ repo: Repo, path: str | bytes | os.PathLike, force: bool = False
|
|
|
|
+) -> None:
|
|
|
|
+ """Remove a worktree.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The main repository
|
|
|
|
+ path: Path to the worktree to remove
|
|
|
|
+ force: Force removal even if there are local changes
|
|
|
|
+
|
|
|
|
+ Raises:
|
|
|
|
+ ValueError: If the worktree doesn't exist, has local changes, or is locked
|
|
|
|
+ """
|
|
|
|
+ path = os.fspath(path)
|
|
|
|
+ if isinstance(path, bytes):
|
|
|
|
+ path = os.fsdecode(path)
|
|
|
|
+
|
|
|
|
+ # Don't allow removing the main worktree
|
|
|
|
+ if os.path.abspath(path) == os.path.abspath(repo.path):
|
|
|
|
+ raise ValueError("Cannot remove the main working tree")
|
|
|
|
+
|
|
|
|
+ # Find the worktree
|
|
|
|
+ worktree_found = False
|
|
|
|
+ worktree_id = None
|
|
|
|
+ worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
|
|
|
|
+
|
|
|
|
+ if os.path.isdir(worktrees_dir):
|
|
|
|
+ for entry in os.listdir(worktrees_dir):
|
|
|
|
+ worktree_path = os.path.join(worktrees_dir, entry)
|
|
|
|
+ gitdir_path = os.path.join(worktree_path, GITDIR)
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ with open(gitdir_path, "rb") as f:
|
|
|
|
+ gitdir_contents = f.read().strip()
|
|
|
|
+ wt_path = os.fsdecode(gitdir_contents)
|
|
|
|
+ if not os.path.isabs(wt_path):
|
|
|
|
+ wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
|
|
|
|
+ wt_dir = os.path.dirname(wt_path) # Remove .git suffix
|
|
|
|
+
|
|
|
|
+ if os.path.abspath(wt_dir) == os.path.abspath(path):
|
|
|
|
+ worktree_found = True
|
|
|
|
+ worktree_id = entry
|
|
|
|
+ break
|
|
|
|
+ except (FileNotFoundError, PermissionError):
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ if not worktree_found:
|
|
|
|
+ raise ValueError(f"Worktree not found: {path}")
|
|
|
|
+
|
|
|
|
+ assert worktree_id is not None # Should be set if worktree_found is True
|
|
|
|
+ worktree_control_dir = os.path.join(worktrees_dir, worktree_id)
|
|
|
|
+
|
|
|
|
+ # Check if locked
|
|
|
|
+ if os.path.exists(os.path.join(worktree_control_dir, "locked")):
|
|
|
|
+ if not force:
|
|
|
|
+ raise ValueError(f"Worktree is locked: {path}")
|
|
|
|
+
|
|
|
|
+ # Check for local changes if not forcing
|
|
|
|
+ if not force and os.path.exists(path):
|
|
|
|
+ # TODO: Check for uncommitted changes in the worktree
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+ # Remove the working directory
|
|
|
|
+ if os.path.exists(path):
|
|
|
|
+ shutil.rmtree(path)
|
|
|
|
+
|
|
|
|
+ # Remove the administrative files
|
|
|
|
+ shutil.rmtree(worktree_control_dir)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def prune_worktrees(
|
|
|
|
+ repo: Repo, expire: int | None = None, dry_run: bool = False
|
|
|
|
+) -> list[str]:
|
|
|
|
+ """Prune worktree administrative files for missing worktrees.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The main repository
|
|
|
|
+ expire: Only prune worktrees older than this many seconds
|
|
|
|
+ dry_run: Don't actually remove anything, just report what would be removed
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ List of pruned worktree identifiers
|
|
|
|
+ """
|
|
|
|
+ pruned: list[str] = []
|
|
|
|
+ worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
|
|
|
|
+
|
|
|
|
+ if not os.path.isdir(worktrees_dir):
|
|
|
|
+ return pruned
|
|
|
|
+
|
|
|
|
+ current_time = time.time()
|
|
|
|
+
|
|
|
|
+ for entry in os.listdir(worktrees_dir):
|
|
|
|
+ worktree_path = os.path.join(worktrees_dir, entry)
|
|
|
|
+ if not os.path.isdir(worktree_path):
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ # Skip locked worktrees
|
|
|
|
+ if os.path.exists(os.path.join(worktree_path, "locked")):
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ should_prune = False
|
|
|
|
+
|
|
|
|
+ # Check if gitdir exists and points to valid location
|
|
|
|
+ gitdir_path = os.path.join(worktree_path, GITDIR)
|
|
|
|
+ try:
|
|
|
|
+ with open(gitdir_path, "rb") as f:
|
|
|
|
+ gitdir_contents = f.read().strip()
|
|
|
|
+ wt_path = os.fsdecode(gitdir_contents)
|
|
|
|
+ if not os.path.isabs(wt_path):
|
|
|
|
+ wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
|
|
|
|
+ wt_dir = os.path.dirname(wt_path) # Remove .git suffix
|
|
|
|
+
|
|
|
|
+ if not os.path.exists(wt_dir):
|
|
|
|
+ should_prune = True
|
|
|
|
+ except (FileNotFoundError, PermissionError):
|
|
|
|
+ should_prune = True
|
|
|
|
+
|
|
|
|
+ # Check expiry time if specified
|
|
|
|
+ if should_prune and expire is not None:
|
|
|
|
+ stat_info = os.stat(worktree_path)
|
|
|
|
+ age = current_time - stat_info.st_mtime
|
|
|
|
+ if age < expire:
|
|
|
|
+ should_prune = False
|
|
|
|
+
|
|
|
|
+ if should_prune:
|
|
|
|
+ pruned.append(entry)
|
|
|
|
+ if not dry_run:
|
|
|
|
+ shutil.rmtree(worktree_path)
|
|
|
|
+
|
|
|
|
+ return pruned
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def lock_worktree(
|
|
|
|
+ repo: Repo, path: str | bytes | os.PathLike, reason: str | None = None
|
|
|
|
+) -> None:
|
|
|
|
+ """Lock a worktree to prevent it from being pruned.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The main repository
|
|
|
|
+ path: Path to the worktree to lock
|
|
|
|
+ reason: Optional reason for locking
|
|
|
|
+ """
|
|
|
|
+ worktree_id = _find_worktree_id(repo, path)
|
|
|
|
+ worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
|
|
|
|
+
|
|
|
|
+ lock_path = os.path.join(worktree_control_dir, "locked")
|
|
|
|
+ with open(lock_path, "w") as f:
|
|
|
|
+ if reason:
|
|
|
|
+ f.write(reason)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def unlock_worktree(repo: Repo, path: str | bytes | os.PathLike) -> None:
|
|
|
|
+ """Unlock a worktree.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The main repository
|
|
|
|
+ path: Path to the worktree to unlock
|
|
|
|
+ """
|
|
|
|
+ worktree_id = _find_worktree_id(repo, path)
|
|
|
|
+ worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
|
|
|
|
+
|
|
|
|
+ lock_path = os.path.join(worktree_control_dir, "locked")
|
|
|
|
+ if os.path.exists(lock_path):
|
|
|
|
+ os.remove(lock_path)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def _find_worktree_id(repo: Repo, path: str | bytes | os.PathLike) -> str:
|
|
|
|
+ """Find the worktree identifier for the given path.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The main repository
|
|
|
|
+ path: Path to the worktree
|
|
|
|
+
|
|
|
|
+ Returns:
|
|
|
|
+ The worktree identifier
|
|
|
|
+
|
|
|
|
+ Raises:
|
|
|
|
+ ValueError: If the worktree is not found
|
|
|
|
+ """
|
|
|
|
+ path = os.fspath(path)
|
|
|
|
+ if isinstance(path, bytes):
|
|
|
|
+ path = os.fsdecode(path)
|
|
|
|
+
|
|
|
|
+ worktrees_dir = os.path.join(repo.controldir(), WORKTREES)
|
|
|
|
+
|
|
|
|
+ if os.path.isdir(worktrees_dir):
|
|
|
|
+ for entry in os.listdir(worktrees_dir):
|
|
|
|
+ worktree_path = os.path.join(worktrees_dir, entry)
|
|
|
|
+ gitdir_path = os.path.join(worktree_path, GITDIR)
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ with open(gitdir_path, "rb") as f:
|
|
|
|
+ gitdir_contents = f.read().strip()
|
|
|
|
+ wt_path = os.fsdecode(gitdir_contents)
|
|
|
|
+ if not os.path.isabs(wt_path):
|
|
|
|
+ wt_path = os.path.abspath(os.path.join(worktree_path, wt_path))
|
|
|
|
+ wt_dir = os.path.dirname(wt_path) # Remove .git suffix
|
|
|
|
+
|
|
|
|
+ if os.path.abspath(wt_dir) == os.path.abspath(path):
|
|
|
|
+ return entry
|
|
|
|
+ except (FileNotFoundError, PermissionError):
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ raise ValueError(f"Worktree not found: {path}")
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def move_worktree(
|
|
|
|
+ repo: Repo,
|
|
|
|
+ old_path: str | bytes | os.PathLike,
|
|
|
|
+ new_path: str | bytes | os.PathLike,
|
|
|
|
+) -> None:
|
|
|
|
+ """Move a worktree to a new location.
|
|
|
|
+
|
|
|
|
+ Args:
|
|
|
|
+ repo: The main repository
|
|
|
|
+ old_path: Current path of the worktree
|
|
|
|
+ new_path: New path for the worktree
|
|
|
|
+
|
|
|
|
+ Raises:
|
|
|
|
+ ValueError: If the worktree doesn't exist or new path already exists
|
|
|
|
+ """
|
|
|
|
+ old_path = os.fspath(old_path)
|
|
|
|
+ new_path = os.fspath(new_path)
|
|
|
|
+ if isinstance(old_path, bytes):
|
|
|
|
+ old_path = os.fsdecode(old_path)
|
|
|
|
+ if isinstance(new_path, bytes):
|
|
|
|
+ new_path = os.fsdecode(new_path)
|
|
|
|
+
|
|
|
|
+ # Don't allow moving the main worktree
|
|
|
|
+ if os.path.abspath(old_path) == os.path.abspath(repo.path):
|
|
|
|
+ raise ValueError("Cannot move the main working tree")
|
|
|
|
+
|
|
|
|
+ # Check if new path already exists
|
|
|
|
+ if os.path.exists(new_path):
|
|
|
|
+ raise ValueError(f"Path already exists: {new_path}")
|
|
|
|
+
|
|
|
|
+ # Find the worktree
|
|
|
|
+ worktree_id = _find_worktree_id(repo, old_path)
|
|
|
|
+ worktree_control_dir = os.path.join(repo.controldir(), WORKTREES, worktree_id)
|
|
|
|
+
|
|
|
|
+ # Move the actual worktree directory
|
|
|
|
+ shutil.move(old_path, new_path)
|
|
|
|
+
|
|
|
|
+ # Update the gitdir file in the worktree
|
|
|
|
+ gitdir_file = os.path.join(new_path, ".git")
|
|
|
|
+
|
|
|
|
+ # Update the gitdir pointer in the control directory
|
|
|
|
+ with open(os.path.join(worktree_control_dir, GITDIR), "wb") as f:
|
|
|
|
+ f.write(os.fsencode(gitdir_file) + b"\n")
|