|
|
@@ -5082,6 +5082,63 @@ def check_ignore(
|
|
|
yield _quote_path(output_path) if quote_path else output_path
|
|
|
|
|
|
|
|
|
+def _check_uncommitted_changes(
|
|
|
+ repo: Repo, target_tree_id: bytes, force: bool = False
|
|
|
+) -> None:
|
|
|
+ """Check for uncommitted changes that would conflict with a checkout/switch.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Repository object
|
|
|
+ target_tree_id: Tree ID to check conflicts against
|
|
|
+ force: If True, skip the check
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ CheckoutError: If there are conflicting local changes
|
|
|
+ """
|
|
|
+ if force:
|
|
|
+ return
|
|
|
+
|
|
|
+ # Get current HEAD tree for comparison
|
|
|
+ try:
|
|
|
+ current_head = repo.refs[b"HEAD"]
|
|
|
+ current_commit = repo[current_head]
|
|
|
+ assert isinstance(current_commit, Commit), "Expected a Commit object"
|
|
|
+ except KeyError:
|
|
|
+ # No HEAD yet (empty repo)
|
|
|
+ return
|
|
|
+
|
|
|
+ status_report = status(repo)
|
|
|
+ changes = []
|
|
|
+ # staged is a dict with 'add', 'delete', 'modify' keys
|
|
|
+ if isinstance(status_report.staged, dict):
|
|
|
+ changes.extend(status_report.staged.get("add", []))
|
|
|
+ changes.extend(status_report.staged.get("delete", []))
|
|
|
+ changes.extend(status_report.staged.get("modify", []))
|
|
|
+ # unstaged is a list
|
|
|
+ changes.extend(status_report.unstaged)
|
|
|
+
|
|
|
+ if changes:
|
|
|
+ # Check if any changes would conflict with checkout
|
|
|
+ target_tree_obj = repo[target_tree_id]
|
|
|
+ assert isinstance(target_tree_obj, Tree), "Expected a Tree object"
|
|
|
+ target_tree = target_tree_obj
|
|
|
+ for change in changes:
|
|
|
+ if isinstance(change, str):
|
|
|
+ change = change.encode(DEFAULT_ENCODING)
|
|
|
+
|
|
|
+ try:
|
|
|
+ target_tree.lookup_path(repo.object_store.__getitem__, change)
|
|
|
+ except KeyError:
|
|
|
+ # File doesn't exist in target tree - change can be preserved
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ # File exists in target tree - would overwrite local changes
|
|
|
+ raise CheckoutError(
|
|
|
+ f"Your local changes to '{change.decode()}' would be "
|
|
|
+ "overwritten. Please commit or stash before switching."
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
def update_head(
|
|
|
repo: RepoPath,
|
|
|
target: str | bytes,
|
|
|
@@ -5246,36 +5303,8 @@ def checkout(
|
|
|
current_tree_id = None
|
|
|
|
|
|
# Check for uncommitted changes if not forcing
|
|
|
- if not force and current_tree_id is not None:
|
|
|
- status_report = status(r)
|
|
|
- changes = []
|
|
|
- # staged is a dict with 'add', 'delete', 'modify' keys
|
|
|
- if isinstance(status_report.staged, dict):
|
|
|
- changes.extend(status_report.staged.get("add", []))
|
|
|
- changes.extend(status_report.staged.get("delete", []))
|
|
|
- changes.extend(status_report.staged.get("modify", []))
|
|
|
- # unstaged is a list
|
|
|
- changes.extend(status_report.unstaged)
|
|
|
- if changes:
|
|
|
- # Check if any changes would conflict with checkout
|
|
|
- target_tree_obj = r[target_tree_id]
|
|
|
- assert isinstance(target_tree_obj, Tree), "Expected a Tree object"
|
|
|
- target_tree = target_tree_obj
|
|
|
- for change in changes:
|
|
|
- if isinstance(change, str):
|
|
|
- change = change.encode(DEFAULT_ENCODING)
|
|
|
-
|
|
|
- try:
|
|
|
- target_tree.lookup_path(r.object_store.__getitem__, change)
|
|
|
- except KeyError:
|
|
|
- # File doesn't exist in target tree - change can be preserved
|
|
|
- pass
|
|
|
- else:
|
|
|
- # File exists in target tree - would overwrite local changes
|
|
|
- raise CheckoutError(
|
|
|
- f"Your local changes to '{change.decode()}' would be "
|
|
|
- "overwritten by checkout. Please commit or stash before switching."
|
|
|
- )
|
|
|
+ if current_tree_id is not None:
|
|
|
+ _check_uncommitted_changes(r, target_tree_id, force)
|
|
|
|
|
|
# Get configuration for working directory update
|
|
|
config = r.get_config()
|
|
|
@@ -5373,6 +5402,308 @@ def checkout(
|
|
|
update_head(r, target_commit.id.decode("ascii"), detached=True)
|
|
|
|
|
|
|
|
|
+def restore(
|
|
|
+ repo: str | os.PathLike[str] | Repo,
|
|
|
+ paths: list[bytes | str],
|
|
|
+ source: str | bytes | Commit | Tag | None = None,
|
|
|
+ staged: bool = False,
|
|
|
+ worktree: bool = True,
|
|
|
+) -> None:
|
|
|
+ """Restore working tree files.
|
|
|
+
|
|
|
+ This is similar to 'git restore', allowing you to restore specific files
|
|
|
+ from a commit or the index without changing HEAD.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Path to repository or repository object
|
|
|
+ paths: List of specific paths to restore
|
|
|
+ source: Branch name, tag, or commit SHA to restore from. If None, restores
|
|
|
+ staged files from HEAD, or worktree files from index
|
|
|
+ staged: Restore files in the index (--staged)
|
|
|
+ worktree: Restore files in the working tree (default: True)
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ CheckoutError: If restore cannot be performed
|
|
|
+ ValueError: If neither staged nor worktree is specified
|
|
|
+ KeyError: If the source reference cannot be found
|
|
|
+ """
|
|
|
+ if not staged and not worktree:
|
|
|
+ raise ValueError("At least one of staged or worktree must be True")
|
|
|
+
|
|
|
+ with open_repo_closing(repo) as r:
|
|
|
+ from .index import _fs_to_tree_path, build_file_from_blob
|
|
|
+
|
|
|
+ wt = r.get_worktree()
|
|
|
+
|
|
|
+ # Determine the source tree
|
|
|
+ if source is None:
|
|
|
+ if staged:
|
|
|
+ # Restoring staged files from HEAD
|
|
|
+ try:
|
|
|
+ source = r.refs[b"HEAD"]
|
|
|
+ except KeyError:
|
|
|
+ raise CheckoutError("No HEAD reference found")
|
|
|
+ elif worktree:
|
|
|
+ # Restoring worktree files from index
|
|
|
+ from .index import ConflictedIndexEntry, IndexEntry
|
|
|
+
|
|
|
+ index = r.open_index()
|
|
|
+ for path in paths:
|
|
|
+ if isinstance(path, str):
|
|
|
+ tree_path = _fs_to_tree_path(path)
|
|
|
+ else:
|
|
|
+ tree_path = path
|
|
|
+
|
|
|
+ try:
|
|
|
+ index_entry = index[tree_path]
|
|
|
+ if isinstance(index_entry, ConflictedIndexEntry):
|
|
|
+ raise CheckoutError(
|
|
|
+ f"Path '{path if isinstance(path, str) else path.decode(DEFAULT_ENCODING)}' has conflicts"
|
|
|
+ )
|
|
|
+ blob = r[index_entry.sha]
|
|
|
+ assert isinstance(blob, Blob), "Expected a Blob object"
|
|
|
+
|
|
|
+ full_path = os.path.join(os.fsencode(r.path), tree_path)
|
|
|
+ mode = index_entry.mode
|
|
|
+
|
|
|
+ # Use build_file_from_blob to write the file
|
|
|
+ build_file_from_blob(blob, mode, full_path)
|
|
|
+ except KeyError:
|
|
|
+ # Path doesn't exist in index
|
|
|
+ raise CheckoutError(
|
|
|
+ f"Path '{path if isinstance(path, str) else path.decode(DEFAULT_ENCODING)}' not in index"
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ # source is not None at this point
|
|
|
+ assert source is not None
|
|
|
+ # Get the source tree
|
|
|
+ source_tree = parse_tree(r, treeish=source)
|
|
|
+
|
|
|
+ # Restore specified paths from source tree
|
|
|
+ for path in paths:
|
|
|
+ if isinstance(path, str):
|
|
|
+ tree_path = _fs_to_tree_path(path)
|
|
|
+ else:
|
|
|
+ tree_path = path
|
|
|
+
|
|
|
+ try:
|
|
|
+ # Look up the path in the source tree
|
|
|
+ mode, sha = source_tree.lookup_path(
|
|
|
+ r.object_store.__getitem__, tree_path
|
|
|
+ )
|
|
|
+ blob = r[sha]
|
|
|
+ assert isinstance(blob, Blob), "Expected a Blob object"
|
|
|
+ except KeyError:
|
|
|
+ # Path doesn't exist in source tree
|
|
|
+ raise CheckoutError(
|
|
|
+ f"Path '{path if isinstance(path, str) else path.decode(DEFAULT_ENCODING)}' not found in source"
|
|
|
+ )
|
|
|
+
|
|
|
+ full_path = os.path.join(os.fsencode(r.path), tree_path)
|
|
|
+
|
|
|
+ if worktree:
|
|
|
+ # Use build_file_from_blob to restore to working tree
|
|
|
+ build_file_from_blob(blob, mode, full_path)
|
|
|
+
|
|
|
+ if staged:
|
|
|
+ # Update the index with the blob from source
|
|
|
+ from .index import IndexEntry
|
|
|
+
|
|
|
+ index = r.open_index()
|
|
|
+
|
|
|
+ # When only updating staged (not worktree), we want to reset the index
|
|
|
+ # to the source, but invalidate the stat cache so Git knows to check
|
|
|
+ # the worktree file. Use zeros for stat fields.
|
|
|
+ if not worktree:
|
|
|
+ # Invalidate stat cache by using zeros
|
|
|
+ new_entry = IndexEntry(
|
|
|
+ ctime=(0, 0),
|
|
|
+ mtime=(0, 0),
|
|
|
+ dev=0,
|
|
|
+ ino=0,
|
|
|
+ mode=mode,
|
|
|
+ uid=0,
|
|
|
+ gid=0,
|
|
|
+ size=0,
|
|
|
+ sha=sha,
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # If we also updated worktree, use actual stat
|
|
|
+ from .index import index_entry_from_stat
|
|
|
+
|
|
|
+ st = os.lstat(full_path)
|
|
|
+ new_entry = index_entry_from_stat(st, sha, mode)
|
|
|
+
|
|
|
+ index[tree_path] = new_entry
|
|
|
+ index.write()
|
|
|
+
|
|
|
+
|
|
|
+def switch(
|
|
|
+ repo: str | os.PathLike[str] | Repo,
|
|
|
+ target: str | bytes | Commit | Tag,
|
|
|
+ create: str | bytes | None = None,
|
|
|
+ force: bool = False,
|
|
|
+ detach: bool = False,
|
|
|
+) -> None:
|
|
|
+ """Switch branches.
|
|
|
+
|
|
|
+ This is similar to 'git switch', allowing you to switch to a different
|
|
|
+ branch or commit, updating both HEAD and the working tree.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Path to repository or repository object
|
|
|
+ target: Branch name, tag, or commit SHA to switch to
|
|
|
+ create: Create a new branch at target before switching (like git switch -c)
|
|
|
+ force: Force switch even if there are local changes
|
|
|
+ detach: Switch to a commit in detached HEAD state (like git switch --detach)
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ CheckoutError: If switch cannot be performed due to conflicts
|
|
|
+ KeyError: If the target reference cannot be found
|
|
|
+ ValueError: If both create and detach are specified
|
|
|
+ """
|
|
|
+ if create and detach:
|
|
|
+ raise ValueError("Cannot use both create and detach options")
|
|
|
+
|
|
|
+ with open_repo_closing(repo) as r:
|
|
|
+ # Store the original target for later reference checks
|
|
|
+ original_target = target
|
|
|
+
|
|
|
+ if isinstance(target, str):
|
|
|
+ target_bytes = target.encode(DEFAULT_ENCODING)
|
|
|
+ elif isinstance(target, bytes):
|
|
|
+ target_bytes = target
|
|
|
+ else:
|
|
|
+ # For Commit/Tag objects, we'll use their SHA
|
|
|
+ target_bytes = target.id
|
|
|
+
|
|
|
+ if isinstance(create, str):
|
|
|
+ create = create.encode(DEFAULT_ENCODING)
|
|
|
+
|
|
|
+ # Parse the target to get the commit
|
|
|
+ target_commit = parse_commit(r, original_target)
|
|
|
+ target_tree_id = target_commit.tree
|
|
|
+
|
|
|
+ # Get current HEAD tree for comparison
|
|
|
+ try:
|
|
|
+ current_head = r.refs[b"HEAD"]
|
|
|
+ current_commit = r[current_head]
|
|
|
+ assert isinstance(current_commit, Commit), "Expected a Commit object"
|
|
|
+ current_tree_id = current_commit.tree
|
|
|
+ except KeyError:
|
|
|
+ # No HEAD yet (empty repo)
|
|
|
+ current_tree_id = None
|
|
|
+
|
|
|
+ # Check for uncommitted changes if not forcing
|
|
|
+ if current_tree_id is not None:
|
|
|
+ _check_uncommitted_changes(r, target_tree_id, force)
|
|
|
+
|
|
|
+ # Get configuration for working directory update
|
|
|
+ config = r.get_config()
|
|
|
+ honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
|
|
|
+
|
|
|
+ if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
|
|
|
+ validate_path_element = validate_path_element_ntfs
|
|
|
+ else:
|
|
|
+ validate_path_element = validate_path_element_default
|
|
|
+
|
|
|
+ if config.get_boolean(b"core", b"symlinks", True):
|
|
|
+
|
|
|
+ def symlink_wrapper(
|
|
|
+ source: str | bytes | os.PathLike[str],
|
|
|
+ target: str | bytes | os.PathLike[str],
|
|
|
+ ) -> None:
|
|
|
+ symlink(source, target) # type: ignore[arg-type,unused-ignore]
|
|
|
+
|
|
|
+ symlink_fn = symlink_wrapper
|
|
|
+ else:
|
|
|
+
|
|
|
+ def symlink_fallback(
|
|
|
+ source: str | bytes | os.PathLike[str],
|
|
|
+ target: str | bytes | os.PathLike[str],
|
|
|
+ ) -> None:
|
|
|
+ mode = "w" + ("b" if isinstance(source, bytes) else "")
|
|
|
+ with open(target, mode) as f:
|
|
|
+ f.write(source)
|
|
|
+
|
|
|
+ symlink_fn = symlink_fallback
|
|
|
+
|
|
|
+ # Get blob normalizer for line ending conversion
|
|
|
+ blob_normalizer = r.get_blob_normalizer()
|
|
|
+
|
|
|
+ # Update working tree
|
|
|
+ tree_change_iterator: Iterator[TreeChange] = tree_changes(
|
|
|
+ r.object_store, current_tree_id, target_tree_id
|
|
|
+ )
|
|
|
+ update_working_tree(
|
|
|
+ r,
|
|
|
+ current_tree_id,
|
|
|
+ target_tree_id,
|
|
|
+ change_iterator=tree_change_iterator,
|
|
|
+ honor_filemode=honor_filemode,
|
|
|
+ validate_path_element=validate_path_element,
|
|
|
+ symlink_fn=symlink_fn,
|
|
|
+ force_remove_untracked=force,
|
|
|
+ blob_normalizer=blob_normalizer,
|
|
|
+ allow_overwrite_modified=force,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Update HEAD
|
|
|
+ if create:
|
|
|
+ # Create new branch and switch to it
|
|
|
+ branch_create(r, create, objectish=target_commit.id.decode("ascii"))
|
|
|
+ update_head(r, create)
|
|
|
+
|
|
|
+ # Set up tracking if creating from a remote branch
|
|
|
+ from .refs import LOCAL_REMOTE_PREFIX, local_branch_name, parse_remote_ref
|
|
|
+
|
|
|
+ if isinstance(original_target, bytes) and target_bytes.startswith(
|
|
|
+ LOCAL_REMOTE_PREFIX
|
|
|
+ ):
|
|
|
+ try:
|
|
|
+ remote_name, branch_name = parse_remote_ref(target_bytes)
|
|
|
+ # Set tracking to refs/heads/<branch> on the remote
|
|
|
+ set_branch_tracking(
|
|
|
+ r, create, remote_name, local_branch_name(branch_name)
|
|
|
+ )
|
|
|
+ except ValueError:
|
|
|
+ # Invalid remote ref format, skip tracking setup
|
|
|
+ pass
|
|
|
+ elif detach:
|
|
|
+ # Detached HEAD mode
|
|
|
+ update_head(r, target_commit.id.decode("ascii"), detached=True)
|
|
|
+ else:
|
|
|
+ # Check if target is a branch name (with or without refs/heads/ prefix)
|
|
|
+ branch_ref = None
|
|
|
+ if (
|
|
|
+ isinstance(original_target, (str, bytes))
|
|
|
+ and target_bytes in r.refs.keys()
|
|
|
+ ):
|
|
|
+ if target_bytes.startswith(LOCAL_BRANCH_PREFIX):
|
|
|
+ branch_ref = target_bytes
|
|
|
+ else:
|
|
|
+ # Try adding refs/heads/ prefix
|
|
|
+ potential_branch = (
|
|
|
+ _make_branch_ref(target_bytes)
|
|
|
+ if isinstance(original_target, (str, bytes))
|
|
|
+ else None
|
|
|
+ )
|
|
|
+ if potential_branch in r.refs.keys():
|
|
|
+ branch_ref = potential_branch
|
|
|
+
|
|
|
+ if branch_ref:
|
|
|
+ # It's a branch - update HEAD symbolically
|
|
|
+ update_head(r, branch_ref)
|
|
|
+ else:
|
|
|
+ # It's a tag, other ref, or commit SHA
|
|
|
+ # In git switch, this would be an error unless --detach is used
|
|
|
+ raise CheckoutError(
|
|
|
+ f"'{target_bytes.decode(DEFAULT_ENCODING)}' is not a branch. "
|
|
|
+ "Use detach=True to switch to a commit in detached HEAD state."
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
def reset_file(
|
|
|
repo: Repo,
|
|
|
file_path: str,
|