|
|
@@ -192,6 +192,7 @@ from .refs import (
|
|
|
SymrefLoop,
|
|
|
_import_remote_refs,
|
|
|
filter_ref_prefix,
|
|
|
+ shorten_ref_name,
|
|
|
)
|
|
|
from .repo import BaseRepo, Repo, get_user_identity
|
|
|
from .server import (
|
|
|
@@ -3963,7 +3964,10 @@ def show_ref(
|
|
|
# Check if the end of ref matches the pattern
|
|
|
matches = True
|
|
|
for i in range(len(pattern_parts)):
|
|
|
- if ref_parts[-(len(pattern_parts) - i)] != pattern_parts[i]:
|
|
|
+ if (
|
|
|
+ ref_parts[-(len(pattern_parts) - i)]
|
|
|
+ != pattern_parts[i]
|
|
|
+ ):
|
|
|
matches = False
|
|
|
break
|
|
|
if matches:
|
|
|
@@ -3985,6 +3989,7 @@ def show_ref(
|
|
|
obj = r.get_object(sha)
|
|
|
# Peel tag objects to get the underlying commit/object
|
|
|
from .objects import Tag
|
|
|
+
|
|
|
while obj.type_name == b"tag":
|
|
|
assert isinstance(obj, Tag)
|
|
|
_obj_class, sha = obj.object
|
|
|
@@ -3997,6 +4002,283 @@ def show_ref(
|
|
|
return result
|
|
|
|
|
|
|
|
|
+def show_branch(
|
|
|
+ repo: Union[Repo, str] = ".",
|
|
|
+ branches: Optional[list[Union[str, bytes]]] = None,
|
|
|
+ all_branches: bool = False,
|
|
|
+ remotes: bool = False,
|
|
|
+ current: bool = False,
|
|
|
+ topo_order: bool = False,
|
|
|
+ more: Optional[int] = None,
|
|
|
+ list_branches: bool = False,
|
|
|
+ independent_branches: bool = False,
|
|
|
+ merge_base: bool = False,
|
|
|
+) -> list[str]:
|
|
|
+ """Display branches and their commits.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Path to the repository
|
|
|
+ branches: List of specific branches to show (default: all local branches)
|
|
|
+ all_branches: Show both local and remote branches
|
|
|
+ remotes: Show only remote branches
|
|
|
+ current: Include current branch if not specified
|
|
|
+ topo_order: Show in topological order instead of chronological
|
|
|
+ more: Show N more commits beyond common ancestor (negative to show only headers)
|
|
|
+ list_branches: Synonym for more=-1 (show only branch headers)
|
|
|
+ independent_branches: Show only branches not reachable from others
|
|
|
+ merge_base: Show merge bases instead of commit list
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ List of output lines
|
|
|
+ """
|
|
|
+ from .graph import find_octopus_base, independent
|
|
|
+
|
|
|
+ output_lines: list[str] = []
|
|
|
+
|
|
|
+ with open_repo_closing(repo) as r:
|
|
|
+ refs = r.get_refs()
|
|
|
+
|
|
|
+ # Determine which branches to show
|
|
|
+ branch_refs: dict[bytes, bytes] = {}
|
|
|
+
|
|
|
+ if branches:
|
|
|
+ # Specific branches requested
|
|
|
+ for branch in branches:
|
|
|
+ branch_bytes = (
|
|
|
+ os.fsencode(branch) if isinstance(branch, str) else branch
|
|
|
+ )
|
|
|
+ # Try as full ref name first
|
|
|
+ if branch_bytes in refs:
|
|
|
+ branch_refs[branch_bytes] = refs[branch_bytes]
|
|
|
+ # Try as branch name
|
|
|
+ elif LOCAL_BRANCH_PREFIX + branch_bytes in refs:
|
|
|
+ branch_refs[LOCAL_BRANCH_PREFIX + branch_bytes] = refs[
|
|
|
+ LOCAL_BRANCH_PREFIX + branch_bytes
|
|
|
+ ]
|
|
|
+ # Try as remote branch
|
|
|
+ elif LOCAL_REMOTE_PREFIX + branch_bytes in refs:
|
|
|
+ branch_refs[LOCAL_REMOTE_PREFIX + branch_bytes] = refs[
|
|
|
+ LOCAL_REMOTE_PREFIX + branch_bytes
|
|
|
+ ]
|
|
|
+ else:
|
|
|
+ # Default behavior: show local branches
|
|
|
+ if all_branches:
|
|
|
+ # Show both local and remote branches
|
|
|
+ branch_refs = filter_ref_prefix(
|
|
|
+ refs, [LOCAL_BRANCH_PREFIX, LOCAL_REMOTE_PREFIX]
|
|
|
+ )
|
|
|
+ elif remotes:
|
|
|
+ # Show only remote branches
|
|
|
+ branch_refs = filter_ref_prefix(refs, [LOCAL_REMOTE_PREFIX])
|
|
|
+ else:
|
|
|
+ # Show only local branches
|
|
|
+ branch_refs = filter_ref_prefix(refs, [LOCAL_BRANCH_PREFIX])
|
|
|
+
|
|
|
+ # Add current branch if requested and not already included
|
|
|
+ if current:
|
|
|
+ try:
|
|
|
+ head_refs, _ = r.refs.follow(b"HEAD")
|
|
|
+ if head_refs:
|
|
|
+ head_ref = head_refs[0]
|
|
|
+ if head_ref not in branch_refs and head_ref in refs:
|
|
|
+ branch_refs[head_ref] = refs[head_ref]
|
|
|
+ except (KeyError, TypeError):
|
|
|
+ # HEAD doesn't point to a branch or doesn't exist
|
|
|
+ pass
|
|
|
+
|
|
|
+ if not branch_refs:
|
|
|
+ return output_lines
|
|
|
+
|
|
|
+ # Sort branches for consistent output
|
|
|
+ sorted_branches = sorted(branch_refs.items(), key=lambda x: x[0])
|
|
|
+ branch_sha_list = [sha for _, sha in sorted_branches]
|
|
|
+
|
|
|
+ # Handle --independent flag
|
|
|
+ if independent_branches:
|
|
|
+ independent_shas = independent(r, branch_sha_list)
|
|
|
+ for ref_name, sha in sorted_branches:
|
|
|
+ if sha in independent_shas:
|
|
|
+ ref_str = os.fsdecode(shorten_ref_name(ref_name))
|
|
|
+ output_lines.append(ref_str)
|
|
|
+ return output_lines
|
|
|
+
|
|
|
+ # Handle --merge-base flag
|
|
|
+ if merge_base:
|
|
|
+ if len(branch_sha_list) < 2:
|
|
|
+ # Need at least 2 branches for merge base
|
|
|
+ return output_lines
|
|
|
+
|
|
|
+ merge_bases = find_octopus_base(r, branch_sha_list)
|
|
|
+ for sha in merge_bases:
|
|
|
+ output_lines.append(sha.decode("ascii"))
|
|
|
+ return output_lines
|
|
|
+
|
|
|
+ # Get current branch for marking
|
|
|
+ current_branch: Optional[bytes] = None
|
|
|
+ try:
|
|
|
+ head_refs, _ = r.refs.follow(b"HEAD")
|
|
|
+ if head_refs:
|
|
|
+ current_branch = head_refs[0]
|
|
|
+ except (KeyError, TypeError):
|
|
|
+ pass
|
|
|
+
|
|
|
+ # Collect commit information for each branch
|
|
|
+ branch_commits: list[tuple[bytes, str]] = [] # (sha, message)
|
|
|
+ for ref_name, sha in sorted_branches:
|
|
|
+ try:
|
|
|
+ commit = r[sha]
|
|
|
+ if hasattr(commit, "message"):
|
|
|
+ message = commit.message.decode("utf-8", errors="replace").split(
|
|
|
+ "\n"
|
|
|
+ )[0]
|
|
|
+ else:
|
|
|
+ message = ""
|
|
|
+ branch_commits.append((sha, message))
|
|
|
+ except KeyError:
|
|
|
+ branch_commits.append((sha, ""))
|
|
|
+
|
|
|
+ # Handle --list flag (show only branch headers)
|
|
|
+ if list_branches or (more is not None and more < 0):
|
|
|
+ # Just show the branch headers
|
|
|
+ for i, (ref_name, sha) in enumerate(sorted_branches):
|
|
|
+ is_current = ref_name == current_branch
|
|
|
+ marker = "*" if is_current else "!"
|
|
|
+ # Create spacing for alignment
|
|
|
+ prefix = " " * i + marker + " " * (len(sorted_branches) - i - 1)
|
|
|
+ ref_str = os.fsdecode(shorten_ref_name(ref_name))
|
|
|
+ _, message = branch_commits[i]
|
|
|
+ output_lines.append(f"{prefix}[{ref_str}] {message}")
|
|
|
+ return output_lines
|
|
|
+
|
|
|
+ # Build commit history for visualization
|
|
|
+ # Collect all commits reachable from any branch
|
|
|
+ all_commits: dict[
|
|
|
+ bytes, tuple[int, list[bytes], str]
|
|
|
+ ] = {} # sha -> (timestamp, parents, message)
|
|
|
+
|
|
|
+ def collect_commits(sha: bytes, branch_idx: int, visited: set[bytes]) -> None:
|
|
|
+ """Recursively collect commits."""
|
|
|
+ if sha in visited:
|
|
|
+ return
|
|
|
+ visited.add(sha)
|
|
|
+
|
|
|
+ try:
|
|
|
+ commit = r[sha]
|
|
|
+ if not hasattr(commit, "commit_time"):
|
|
|
+ return
|
|
|
+
|
|
|
+ timestamp = commit.commit_time
|
|
|
+ parents = commit.parents if hasattr(commit, "parents") else []
|
|
|
+ message = (
|
|
|
+ commit.message.decode("utf-8", errors="replace").split("\n")[0]
|
|
|
+ if hasattr(commit, "message")
|
|
|
+ else ""
|
|
|
+ )
|
|
|
+
|
|
|
+ if sha not in all_commits:
|
|
|
+ all_commits[sha] = (timestamp, parents, message)
|
|
|
+
|
|
|
+ # Recurse to parents
|
|
|
+ for parent in parents:
|
|
|
+ collect_commits(parent, branch_idx, visited)
|
|
|
+ except KeyError:
|
|
|
+ # Commit not found, stop traversal
|
|
|
+ pass
|
|
|
+
|
|
|
+ # Collect commits from all branches
|
|
|
+ for i, (_, sha) in enumerate(sorted_branches):
|
|
|
+ collect_commits(sha, i, set())
|
|
|
+
|
|
|
+ # Find common ancestor
|
|
|
+ common_ancestor_sha = None
|
|
|
+ if len(branch_sha_list) >= 2:
|
|
|
+ try:
|
|
|
+ merge_bases = find_octopus_base(r, branch_sha_list)
|
|
|
+ if merge_bases:
|
|
|
+ common_ancestor_sha = merge_bases[0]
|
|
|
+ except (KeyError, IndexError):
|
|
|
+ pass
|
|
|
+
|
|
|
+ # Sort commits (chronological by default, or topological if requested)
|
|
|
+ if topo_order:
|
|
|
+ # Topological sort is more complex, for now use chronological
|
|
|
+ # TODO: Implement proper topological ordering
|
|
|
+ sorted_commits = sorted(all_commits.items(), key=lambda x: -x[1][0])
|
|
|
+ else:
|
|
|
+ # Reverse chronological order (newest first)
|
|
|
+ sorted_commits = sorted(all_commits.items(), key=lambda x: -x[1][0])
|
|
|
+
|
|
|
+ # Determine how many commits to show
|
|
|
+ if more is not None:
|
|
|
+ # Find index of common ancestor
|
|
|
+ if common_ancestor_sha and common_ancestor_sha in all_commits:
|
|
|
+ ancestor_idx = next(
|
|
|
+ (
|
|
|
+ i
|
|
|
+ for i, (sha, _) in enumerate(sorted_commits)
|
|
|
+ if sha == common_ancestor_sha
|
|
|
+ ),
|
|
|
+ None,
|
|
|
+ )
|
|
|
+ if ancestor_idx is not None:
|
|
|
+ # Show commits up to ancestor + more
|
|
|
+ sorted_commits = sorted_commits[: ancestor_idx + 1 + more]
|
|
|
+
|
|
|
+ # Determine which branches contain which commits
|
|
|
+ branch_contains: list[set[bytes]] = []
|
|
|
+ for ref_name, sha in sorted_branches:
|
|
|
+ reachable = set()
|
|
|
+
|
|
|
+ def mark_reachable(commit_sha: bytes) -> None:
|
|
|
+ if commit_sha in reachable:
|
|
|
+ return
|
|
|
+ reachable.add(commit_sha)
|
|
|
+ if commit_sha in all_commits:
|
|
|
+ _, parents, _ = all_commits[commit_sha]
|
|
|
+ for parent in parents:
|
|
|
+ mark_reachable(parent)
|
|
|
+
|
|
|
+ mark_reachable(sha)
|
|
|
+ branch_contains.append(reachable)
|
|
|
+
|
|
|
+ # Output branch headers
|
|
|
+ for i, (ref_name, sha) in enumerate(sorted_branches):
|
|
|
+ is_current = ref_name == current_branch
|
|
|
+ marker = "*" if is_current else "!"
|
|
|
+ # Create spacing for alignment
|
|
|
+ prefix = " " * i + marker + " " * (len(sorted_branches) - i - 1)
|
|
|
+ ref_str = os.fsdecode(shorten_ref_name(ref_name))
|
|
|
+ _, message = branch_commits[i]
|
|
|
+ output_lines.append(f"{prefix}[{ref_str}] {message}")
|
|
|
+
|
|
|
+ # Output separator
|
|
|
+ output_lines.append("-" * (len(sorted_branches) + 2))
|
|
|
+
|
|
|
+ # Output commits
|
|
|
+ for commit_sha, (_, _, message) in sorted_commits:
|
|
|
+ # Build marker string
|
|
|
+ markers = []
|
|
|
+ for i, (ref_name, branch_sha) in enumerate(sorted_branches):
|
|
|
+ if commit_sha == branch_sha:
|
|
|
+ # This is the tip of the branch
|
|
|
+ markers.append("*")
|
|
|
+ elif commit_sha in branch_contains[i]:
|
|
|
+ # This commit is in the branch
|
|
|
+ markers.append("+")
|
|
|
+ else:
|
|
|
+ # This commit is not in the branch
|
|
|
+ markers.append(" ")
|
|
|
+
|
|
|
+ marker_str = "".join(markers)
|
|
|
+ output_lines.append(f"{marker_str} [{message}]")
|
|
|
+
|
|
|
+ # Limit output to 26 branches (git show-branch limitation)
|
|
|
+ if len(sorted_branches) > 26:
|
|
|
+ break
|
|
|
+
|
|
|
+ return output_lines
|
|
|
+
|
|
|
+
|
|
|
def ls_remote(
|
|
|
remote: Union[str, bytes],
|
|
|
config: Optional[Config] = None,
|