|
|
@@ -21,6 +21,11 @@
|
|
|
|
|
|
"""Git rebase implementation."""
|
|
|
|
|
|
+import os
|
|
|
+import shutil
|
|
|
+import subprocess
|
|
|
+from dataclasses import dataclass
|
|
|
+from enum import Enum
|
|
|
from typing import Optional, Protocol
|
|
|
|
|
|
from dulwich.graph import find_merge_base
|
|
|
@@ -48,6 +53,314 @@ class RebaseAbort(RebaseError):
|
|
|
"""Raised when rebase is aborted."""
|
|
|
|
|
|
|
|
|
+class RebaseTodoCommand(Enum):
|
|
|
+ """Enum for rebase todo commands."""
|
|
|
+
|
|
|
+ PICK = "pick"
|
|
|
+ REWORD = "reword"
|
|
|
+ EDIT = "edit"
|
|
|
+ SQUASH = "squash"
|
|
|
+ FIXUP = "fixup"
|
|
|
+ EXEC = "exec"
|
|
|
+ BREAK = "break"
|
|
|
+ DROP = "drop"
|
|
|
+ LABEL = "label"
|
|
|
+ RESET = "reset"
|
|
|
+ MERGE = "merge"
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_string(cls, s: str) -> "RebaseTodoCommand":
|
|
|
+ """Parse a command from its string representation.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ s: Command string (can be abbreviated)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ RebaseTodoCommand enum value
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ ValueError: If command is not recognized
|
|
|
+ """
|
|
|
+ s = s.lower()
|
|
|
+ # Support abbreviations
|
|
|
+ abbreviations = {
|
|
|
+ "p": cls.PICK,
|
|
|
+ "r": cls.REWORD,
|
|
|
+ "e": cls.EDIT,
|
|
|
+ "s": cls.SQUASH,
|
|
|
+ "f": cls.FIXUP,
|
|
|
+ "x": cls.EXEC,
|
|
|
+ "b": cls.BREAK,
|
|
|
+ "d": cls.DROP,
|
|
|
+ "l": cls.LABEL,
|
|
|
+ "t": cls.RESET,
|
|
|
+ "m": cls.MERGE,
|
|
|
+ }
|
|
|
+
|
|
|
+ if s in abbreviations:
|
|
|
+ return abbreviations[s]
|
|
|
+
|
|
|
+ # Try full command name
|
|
|
+ try:
|
|
|
+ return cls(s)
|
|
|
+ except ValueError:
|
|
|
+ raise ValueError(f"Unknown rebase command: {s}")
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class RebaseTodoEntry:
|
|
|
+ """Represents a single entry in a rebase todo list."""
|
|
|
+
|
|
|
+ command: RebaseTodoCommand
|
|
|
+ commit_sha: Optional[bytes] = None # Store as hex string encoded as bytes
|
|
|
+ short_message: Optional[str] = None
|
|
|
+ arguments: Optional[str] = None
|
|
|
+
|
|
|
+ def to_string(self, abbreviate: bool = False) -> str:
|
|
|
+ """Convert to git-rebase-todo format string.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ abbreviate: Use abbreviated command names
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ String representation for todo file
|
|
|
+ """
|
|
|
+ if abbreviate:
|
|
|
+ cmd_map = {
|
|
|
+ RebaseTodoCommand.PICK: "p",
|
|
|
+ RebaseTodoCommand.REWORD: "r",
|
|
|
+ RebaseTodoCommand.EDIT: "e",
|
|
|
+ RebaseTodoCommand.SQUASH: "s",
|
|
|
+ RebaseTodoCommand.FIXUP: "f",
|
|
|
+ RebaseTodoCommand.EXEC: "x",
|
|
|
+ RebaseTodoCommand.BREAK: "b",
|
|
|
+ RebaseTodoCommand.DROP: "d",
|
|
|
+ RebaseTodoCommand.LABEL: "l",
|
|
|
+ RebaseTodoCommand.RESET: "t",
|
|
|
+ RebaseTodoCommand.MERGE: "m",
|
|
|
+ }
|
|
|
+ cmd = cmd_map.get(self.command, self.command.value)
|
|
|
+ else:
|
|
|
+ cmd = self.command.value
|
|
|
+
|
|
|
+ parts = [cmd]
|
|
|
+
|
|
|
+ if self.commit_sha:
|
|
|
+ # Use short SHA (first 7 chars) like Git does
|
|
|
+ parts.append(self.commit_sha.decode()[:7])
|
|
|
+
|
|
|
+ if self.arguments:
|
|
|
+ parts.append(self.arguments)
|
|
|
+ elif self.short_message:
|
|
|
+ parts.append(self.short_message)
|
|
|
+
|
|
|
+ return " ".join(parts)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_string(cls, line: str) -> Optional["RebaseTodoEntry"]:
|
|
|
+ """Parse a todo entry from a line.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ line: Line from git-rebase-todo file
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ RebaseTodoEntry or None if line is empty/comment
|
|
|
+ """
|
|
|
+ line = line.strip()
|
|
|
+
|
|
|
+ # Skip empty lines and comments
|
|
|
+ if not line or line.startswith("#"):
|
|
|
+ return None
|
|
|
+
|
|
|
+ parts = line.split(None, 2)
|
|
|
+ if not parts:
|
|
|
+ return None
|
|
|
+
|
|
|
+ command_str = parts[0]
|
|
|
+ try:
|
|
|
+ command = RebaseTodoCommand.from_string(command_str)
|
|
|
+ except ValueError:
|
|
|
+ # Unknown command, skip
|
|
|
+ return None
|
|
|
+
|
|
|
+ commit_sha = None
|
|
|
+ short_message = None
|
|
|
+ arguments = None
|
|
|
+
|
|
|
+ if command in (
|
|
|
+ RebaseTodoCommand.EXEC,
|
|
|
+ RebaseTodoCommand.LABEL,
|
|
|
+ RebaseTodoCommand.RESET,
|
|
|
+ ):
|
|
|
+ # These commands take arguments instead of commit SHA
|
|
|
+ if len(parts) > 1:
|
|
|
+ arguments = " ".join(parts[1:])
|
|
|
+ elif command == RebaseTodoCommand.BREAK:
|
|
|
+ # Break has no arguments
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ # Commands that operate on commits
|
|
|
+ if len(parts) > 1:
|
|
|
+ # Store SHA as hex string encoded as bytes
|
|
|
+ commit_sha = parts[1].encode()
|
|
|
+
|
|
|
+ # Parse commit message if present
|
|
|
+ if len(parts) > 2:
|
|
|
+ short_message = parts[2]
|
|
|
+
|
|
|
+ return cls(
|
|
|
+ command=command,
|
|
|
+ commit_sha=commit_sha,
|
|
|
+ short_message=short_message,
|
|
|
+ arguments=arguments,
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+class RebaseTodo:
|
|
|
+ """Manages the git-rebase-todo file for interactive rebase."""
|
|
|
+
|
|
|
+ def __init__(self, entries: Optional[list[RebaseTodoEntry]] = None):
|
|
|
+ """Initialize RebaseTodo.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ entries: List of todo entries
|
|
|
+ """
|
|
|
+ self.entries = entries or []
|
|
|
+ self.current_index = 0
|
|
|
+
|
|
|
+ def add_entry(self, entry: RebaseTodoEntry) -> None:
|
|
|
+ """Add an entry to the todo list."""
|
|
|
+ self.entries.append(entry)
|
|
|
+
|
|
|
+ def get_current(self) -> Optional[RebaseTodoEntry]:
|
|
|
+ """Get the current todo entry."""
|
|
|
+ if self.current_index < len(self.entries):
|
|
|
+ return self.entries[self.current_index]
|
|
|
+ return None
|
|
|
+
|
|
|
+ def advance(self) -> None:
|
|
|
+ """Move to the next todo entry."""
|
|
|
+ self.current_index += 1
|
|
|
+
|
|
|
+ def is_complete(self) -> bool:
|
|
|
+ """Check if all entries have been processed."""
|
|
|
+ return self.current_index >= len(self.entries)
|
|
|
+
|
|
|
+ def to_string(self, include_comments: bool = True) -> str:
|
|
|
+ """Convert to git-rebase-todo file format.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ include_comments: Include helpful comments
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ String content for todo file
|
|
|
+ """
|
|
|
+ lines = []
|
|
|
+
|
|
|
+ # Add entries from current position onward
|
|
|
+ for entry in self.entries[self.current_index :]:
|
|
|
+ lines.append(entry.to_string())
|
|
|
+
|
|
|
+ if include_comments:
|
|
|
+ lines.append("")
|
|
|
+ lines.append("# Rebase in progress")
|
|
|
+ lines.append("#")
|
|
|
+ lines.append("# Commands:")
|
|
|
+ lines.append("# p, pick <commit> = use commit")
|
|
|
+ lines.append(
|
|
|
+ "# r, reword <commit> = use commit, but edit the commit message"
|
|
|
+ )
|
|
|
+ lines.append("# e, edit <commit> = use commit, but stop for amending")
|
|
|
+ lines.append(
|
|
|
+ "# s, squash <commit> = use commit, but meld into previous commit"
|
|
|
+ )
|
|
|
+ lines.append(
|
|
|
+ "# f, fixup [-C | -c] <commit> = like 'squash' but keep only the previous"
|
|
|
+ )
|
|
|
+ lines.append(
|
|
|
+ "# commit's log message, unless -C is used, in which case"
|
|
|
+ )
|
|
|
+ lines.append(
|
|
|
+ "# keep only this commit's message; -c is same as -C but"
|
|
|
+ )
|
|
|
+ lines.append("# opens the editor")
|
|
|
+ lines.append(
|
|
|
+ "# x, exec <command> = run command (the rest of the line) using shell"
|
|
|
+ )
|
|
|
+ lines.append(
|
|
|
+ "# b, break = stop here (continue rebase later with 'git rebase --continue')"
|
|
|
+ )
|
|
|
+ lines.append("# d, drop <commit> = remove commit")
|
|
|
+ lines.append("# l, label <label> = label current HEAD with a name")
|
|
|
+ lines.append("# t, reset <label> = reset HEAD to a label")
|
|
|
+ lines.append("# m, merge [-C <commit> | -c <commit>] <label> [# <oneline>]")
|
|
|
+ lines.append(
|
|
|
+ "# . create a merge commit using the original merge commit's"
|
|
|
+ )
|
|
|
+ lines.append(
|
|
|
+ "# . message (or the oneline, if no original merge commit was"
|
|
|
+ )
|
|
|
+ lines.append(
|
|
|
+ "# . specified); use -c <commit> to reword the commit message"
|
|
|
+ )
|
|
|
+ lines.append("#")
|
|
|
+ lines.append(
|
|
|
+ "# These lines can be re-ordered; they are executed from top to bottom."
|
|
|
+ )
|
|
|
+ lines.append("#")
|
|
|
+ lines.append("# If you remove a line here THAT COMMIT WILL BE LOST.")
|
|
|
+ lines.append("#")
|
|
|
+ lines.append(
|
|
|
+ "# However, if you remove everything, the rebase will be aborted."
|
|
|
+ )
|
|
|
+ lines.append("#")
|
|
|
+
|
|
|
+ return "\n".join(lines)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_string(cls, content: str) -> "RebaseTodo":
|
|
|
+ """Parse a git-rebase-todo file.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ content: Content of todo file
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ RebaseTodo instance
|
|
|
+ """
|
|
|
+ entries = []
|
|
|
+ for line in content.splitlines():
|
|
|
+ entry = RebaseTodoEntry.from_string(line)
|
|
|
+ if entry:
|
|
|
+ entries.append(entry)
|
|
|
+
|
|
|
+ return cls(entries)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def from_commits(cls, commits: list[Commit]) -> "RebaseTodo":
|
|
|
+ """Create a todo list from a list of commits.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ commits: List of commits to rebase (in chronological order)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ RebaseTodo instance with pick commands for each commit
|
|
|
+ """
|
|
|
+ entries = []
|
|
|
+ for commit in commits:
|
|
|
+ # Extract first line of commit message
|
|
|
+ message = commit.message.decode("utf-8", errors="replace")
|
|
|
+ short_message = message.split("\n")[0][:50]
|
|
|
+
|
|
|
+ entry = RebaseTodoEntry(
|
|
|
+ command=RebaseTodoCommand.PICK,
|
|
|
+ commit_sha=commit.id, # Already bytes
|
|
|
+ short_message=short_message,
|
|
|
+ )
|
|
|
+ entries.append(entry)
|
|
|
+
|
|
|
+ return cls(entries)
|
|
|
+
|
|
|
+
|
|
|
class RebaseStateManager(Protocol):
|
|
|
"""Protocol for managing rebase state."""
|
|
|
|
|
|
@@ -82,6 +395,14 @@ class RebaseStateManager(Protocol):
|
|
|
"""Check if rebase state exists."""
|
|
|
...
|
|
|
|
|
|
+ def save_todo(self, todo: RebaseTodo) -> None:
|
|
|
+ """Save interactive rebase todo list."""
|
|
|
+ ...
|
|
|
+
|
|
|
+ def load_todo(self) -> Optional[RebaseTodo]:
|
|
|
+ """Load interactive rebase todo list."""
|
|
|
+ ...
|
|
|
+
|
|
|
|
|
|
class DiskRebaseStateManager:
|
|
|
"""Manages rebase state on disk using same files as C Git."""
|
|
|
@@ -103,8 +424,6 @@ class DiskRebaseStateManager:
|
|
|
done: list[Commit],
|
|
|
) -> None:
|
|
|
"""Save rebase state to disk."""
|
|
|
- import os
|
|
|
-
|
|
|
# Ensure the directory exists
|
|
|
os.makedirs(self.path, exist_ok=True)
|
|
|
|
|
|
@@ -132,12 +451,8 @@ class DiskRebaseStateManager:
|
|
|
self._write_file("msgnum", str(msgnum).encode())
|
|
|
self._write_file("end", str(end).encode())
|
|
|
|
|
|
- # TODO: Add support for writing git-rebase-todo for interactive rebase
|
|
|
-
|
|
|
def _write_file(self, name: str, content: bytes) -> None:
|
|
|
"""Write content to a file in the rebase directory."""
|
|
|
- import os
|
|
|
-
|
|
|
with open(os.path.join(self.path, name), "wb") as f:
|
|
|
f.write(content)
|
|
|
|
|
|
@@ -162,14 +477,10 @@ class DiskRebaseStateManager:
|
|
|
rebasing_branch = self._read_file("head-name")
|
|
|
onto = self._read_file("onto")
|
|
|
|
|
|
- # TODO: Load todo list and done list for resuming rebase
|
|
|
-
|
|
|
return original_head, rebasing_branch, onto, todo, done
|
|
|
|
|
|
def _read_file(self, name: str) -> Optional[bytes]:
|
|
|
"""Read content from a file in the rebase directory."""
|
|
|
- import os
|
|
|
-
|
|
|
try:
|
|
|
with open(os.path.join(self.path, name), "rb") as f:
|
|
|
return f.read().strip()
|
|
|
@@ -178,8 +489,6 @@ class DiskRebaseStateManager:
|
|
|
|
|
|
def clean(self) -> None:
|
|
|
"""Clean up rebase state files."""
|
|
|
- import shutil
|
|
|
-
|
|
|
try:
|
|
|
shutil.rmtree(self.path)
|
|
|
except FileNotFoundError:
|
|
|
@@ -188,10 +497,29 @@ class DiskRebaseStateManager:
|
|
|
|
|
|
def exists(self) -> bool:
|
|
|
"""Check if rebase state exists."""
|
|
|
- import os
|
|
|
-
|
|
|
return os.path.exists(os.path.join(self.path, "orig-head"))
|
|
|
|
|
|
+ def save_todo(self, todo: RebaseTodo) -> None:
|
|
|
+ """Save the interactive rebase todo list.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ todo: The RebaseTodo object to save
|
|
|
+ """
|
|
|
+ todo_content = todo.to_string()
|
|
|
+ self._write_file("git-rebase-todo", todo_content.encode("utf-8"))
|
|
|
+
|
|
|
+ def load_todo(self) -> Optional[RebaseTodo]:
|
|
|
+ """Load the interactive rebase todo list.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ RebaseTodo object or None if no todo file exists
|
|
|
+ """
|
|
|
+ todo_content = self._read_file("git-rebase-todo")
|
|
|
+ if todo_content:
|
|
|
+ todo_str = todo_content.decode("utf-8", errors="replace")
|
|
|
+ return RebaseTodo.from_string(todo_str)
|
|
|
+ return None
|
|
|
+
|
|
|
|
|
|
class MemoryRebaseStateManager:
|
|
|
"""Manages rebase state in memory for MemoryRepo."""
|
|
|
@@ -199,6 +527,7 @@ class MemoryRebaseStateManager:
|
|
|
def __init__(self, repo: Repo) -> None:
|
|
|
self.repo = repo
|
|
|
self._state: Optional[dict] = None
|
|
|
+ self._todo: Optional[RebaseTodo] = None
|
|
|
|
|
|
def save(
|
|
|
self,
|
|
|
@@ -241,11 +570,28 @@ class MemoryRebaseStateManager:
|
|
|
def clean(self) -> None:
|
|
|
"""Clean up rebase state."""
|
|
|
self._state = None
|
|
|
+ self._todo = None
|
|
|
|
|
|
def exists(self) -> bool:
|
|
|
"""Check if rebase state exists."""
|
|
|
return self._state is not None
|
|
|
|
|
|
+ def save_todo(self, todo: RebaseTodo) -> None:
|
|
|
+ """Save the interactive rebase todo list.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ todo: The RebaseTodo object to save
|
|
|
+ """
|
|
|
+ self._todo = todo
|
|
|
+
|
|
|
+ def load_todo(self) -> Optional[RebaseTodo]:
|
|
|
+ """Load the interactive rebase todo list.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ RebaseTodo object or None if no todo exists
|
|
|
+ """
|
|
|
+ return self._todo
|
|
|
+
|
|
|
|
|
|
class Rebaser:
|
|
|
"""Handles git rebase operations."""
|
|
|
@@ -487,7 +833,7 @@ class Rebaser:
|
|
|
last_commit = self._done[-1]
|
|
|
|
|
|
# Update the branch we're rebasing
|
|
|
- if hasattr(self, "_rebasing_branch") and self._rebasing_branch:
|
|
|
+ if self._rebasing_branch:
|
|
|
self.repo.refs[self._rebasing_branch] = last_commit.id
|
|
|
# If HEAD was pointing to this branch, it will follow automatically
|
|
|
else:
|
|
|
@@ -567,3 +913,326 @@ def rebase(
|
|
|
|
|
|
# Return the SHAs of the rebased commits
|
|
|
return [c.id for c in rebaser._done]
|
|
|
+
|
|
|
+
|
|
|
+def start_interactive(
|
|
|
+ repo: Repo,
|
|
|
+ upstream: bytes,
|
|
|
+ onto: Optional[bytes] = None,
|
|
|
+ branch: Optional[bytes] = None,
|
|
|
+ editor_callback=None,
|
|
|
+) -> RebaseTodo:
|
|
|
+ """Start an interactive rebase.
|
|
|
+
|
|
|
+ This function generates a todo list and optionally opens an editor for the user
|
|
|
+ to modify it before starting the rebase.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Repository to rebase in
|
|
|
+ upstream: Upstream branch/commit to rebase onto
|
|
|
+ onto: Specific commit to rebase onto (defaults to upstream)
|
|
|
+ branch: Branch to rebase (defaults to current branch)
|
|
|
+ editor_callback: Optional callback to edit todo content. If None, no editing.
|
|
|
+ Should take bytes and return bytes.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ RebaseTodo object with the (possibly edited) todo list
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ RebaseError: If rebase cannot be started
|
|
|
+ """
|
|
|
+ rebaser = Rebaser(repo)
|
|
|
+
|
|
|
+ # Get commits to rebase
|
|
|
+ commits = rebaser.start(upstream, onto, branch)
|
|
|
+
|
|
|
+ if not commits:
|
|
|
+ raise RebaseError("No commits to rebase")
|
|
|
+
|
|
|
+ # Generate todo list
|
|
|
+ todo = RebaseTodo.from_commits(commits)
|
|
|
+
|
|
|
+ # Save initial todo to disk
|
|
|
+ state_manager = repo.get_rebase_state_manager()
|
|
|
+ state_manager.save_todo(todo)
|
|
|
+
|
|
|
+ # Let user edit todo if callback provided
|
|
|
+ if editor_callback:
|
|
|
+ todo_content = todo.to_string().encode("utf-8")
|
|
|
+ edited_content = editor_callback(todo_content)
|
|
|
+
|
|
|
+ # Parse edited todo
|
|
|
+ edited_todo = RebaseTodo.from_string(
|
|
|
+ edited_content.decode("utf-8", errors="replace")
|
|
|
+ )
|
|
|
+
|
|
|
+ # Check if user removed all entries (abort)
|
|
|
+ if not edited_todo.entries:
|
|
|
+ # User removed everything, abort
|
|
|
+ rebaser.abort()
|
|
|
+ raise RebaseAbort("Rebase aborted - empty todo list")
|
|
|
+
|
|
|
+ todo = edited_todo
|
|
|
+
|
|
|
+ # Save edited todo
|
|
|
+ state_manager.save_todo(todo)
|
|
|
+
|
|
|
+ return todo
|
|
|
+
|
|
|
+
|
|
|
+def edit_todo(repo: Repo, editor_callback) -> RebaseTodo:
|
|
|
+ """Edit the todo list of an in-progress interactive rebase.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Repository with in-progress rebase
|
|
|
+ editor_callback: Callback to edit todo content. Takes bytes, returns bytes.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Updated RebaseTodo object
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ RebaseError: If no rebase is in progress or todo cannot be loaded
|
|
|
+ """
|
|
|
+ state_manager = repo.get_rebase_state_manager()
|
|
|
+
|
|
|
+ if not state_manager.exists():
|
|
|
+ raise RebaseError("No rebase in progress")
|
|
|
+
|
|
|
+ # Load current todo
|
|
|
+ todo = state_manager.load_todo()
|
|
|
+ if not todo:
|
|
|
+ raise RebaseError("No interactive rebase in progress")
|
|
|
+
|
|
|
+ # Edit todo
|
|
|
+ todo_content = todo.to_string().encode("utf-8")
|
|
|
+ edited_content = editor_callback(todo_content)
|
|
|
+
|
|
|
+ # Parse edited todo
|
|
|
+ edited_todo = RebaseTodo.from_string(
|
|
|
+ edited_content.decode("utf-8", errors="replace")
|
|
|
+ )
|
|
|
+
|
|
|
+ # Save edited todo
|
|
|
+ state_manager.save_todo(edited_todo)
|
|
|
+
|
|
|
+ return edited_todo
|
|
|
+
|
|
|
+
|
|
|
+def process_interactive_rebase(
|
|
|
+ repo: Repo,
|
|
|
+ todo: Optional[RebaseTodo] = None,
|
|
|
+ editor_callback=None,
|
|
|
+) -> tuple[bool, Optional[str]]:
|
|
|
+ """Process an interactive rebase.
|
|
|
+
|
|
|
+ This function executes the commands in the todo list sequentially.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Repository to rebase in
|
|
|
+ todo: RebaseTodo object (if None, loads from state)
|
|
|
+ editor_callback: Optional callback for reword operations
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Tuple of (is_complete, pause_reason)
|
|
|
+ - is_complete: True if rebase is complete, False if paused
|
|
|
+ - pause_reason: Reason for pause (e.g., "edit", "conflict", "break") or None
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ RebaseError: If rebase fails
|
|
|
+ """
|
|
|
+ state_manager = repo.get_rebase_state_manager()
|
|
|
+ rebaser = Rebaser(repo)
|
|
|
+
|
|
|
+ # Load todo if not provided
|
|
|
+ if todo is None:
|
|
|
+ todo = state_manager.load_todo()
|
|
|
+ if not todo:
|
|
|
+ raise RebaseError("No interactive rebase in progress")
|
|
|
+
|
|
|
+ # Process each todo entry
|
|
|
+ while not todo.is_complete():
|
|
|
+ entry = todo.get_current()
|
|
|
+ if not entry:
|
|
|
+ break
|
|
|
+
|
|
|
+ # Handle each command type
|
|
|
+ if entry.command == RebaseTodoCommand.PICK:
|
|
|
+ # Regular cherry-pick
|
|
|
+ result = rebaser.continue_()
|
|
|
+ if result is not None:
|
|
|
+ # Conflicts
|
|
|
+ return False, "conflict"
|
|
|
+
|
|
|
+ elif entry.command == RebaseTodoCommand.REWORD:
|
|
|
+ # Cherry-pick then edit message
|
|
|
+ result = rebaser.continue_()
|
|
|
+ if result is not None:
|
|
|
+ # Conflicts
|
|
|
+ return False, "conflict"
|
|
|
+
|
|
|
+ # Get the last commit and allow editing its message
|
|
|
+ if rebaser._done and editor_callback:
|
|
|
+ last_commit = rebaser._done[-1]
|
|
|
+ new_message = editor_callback(last_commit.message)
|
|
|
+
|
|
|
+ # Create new commit with edited message
|
|
|
+ new_commit = Commit()
|
|
|
+ new_commit.tree = last_commit.tree
|
|
|
+ new_commit.parents = last_commit.parents
|
|
|
+ new_commit.author = last_commit.author
|
|
|
+ new_commit.author_time = last_commit.author_time
|
|
|
+ new_commit.author_timezone = last_commit.author_timezone
|
|
|
+ new_commit.committer = last_commit.committer
|
|
|
+ new_commit.commit_time = last_commit.commit_time
|
|
|
+ new_commit.commit_timezone = last_commit.commit_timezone
|
|
|
+ new_commit.message = new_message
|
|
|
+ new_commit.encoding = last_commit.encoding
|
|
|
+
|
|
|
+ repo.object_store.add_object(new_commit)
|
|
|
+
|
|
|
+ # Replace last commit in done list
|
|
|
+ rebaser._done[-1] = new_commit
|
|
|
+
|
|
|
+ elif entry.command == RebaseTodoCommand.EDIT:
|
|
|
+ # Cherry-pick then pause
|
|
|
+ result = rebaser.continue_()
|
|
|
+ if result is not None:
|
|
|
+ # Conflicts
|
|
|
+ return False, "conflict"
|
|
|
+
|
|
|
+ # Pause for user to amend
|
|
|
+ todo.advance()
|
|
|
+ state_manager.save_todo(todo)
|
|
|
+ return False, "edit"
|
|
|
+
|
|
|
+ elif entry.command == RebaseTodoCommand.SQUASH:
|
|
|
+ # Combine with previous commit, keeping both messages
|
|
|
+ if not rebaser._done:
|
|
|
+ raise RebaseError("Cannot squash without a previous commit")
|
|
|
+
|
|
|
+ conflict_result = _squash_commits(
|
|
|
+ repo, rebaser, entry, keep_message=True, editor_callback=editor_callback
|
|
|
+ )
|
|
|
+ if conflict_result == "conflict":
|
|
|
+ return False, "conflict"
|
|
|
+
|
|
|
+ elif entry.command == RebaseTodoCommand.FIXUP:
|
|
|
+ # Combine with previous commit, discarding this message
|
|
|
+ if not rebaser._done:
|
|
|
+ raise RebaseError("Cannot fixup without a previous commit")
|
|
|
+
|
|
|
+ conflict_result = _squash_commits(
|
|
|
+ repo, rebaser, entry, keep_message=False, editor_callback=None
|
|
|
+ )
|
|
|
+ if conflict_result == "conflict":
|
|
|
+ return False, "conflict"
|
|
|
+
|
|
|
+ elif entry.command == RebaseTodoCommand.DROP:
|
|
|
+ # Skip this commit
|
|
|
+ if rebaser._todo:
|
|
|
+ rebaser._todo.pop(0)
|
|
|
+
|
|
|
+ elif entry.command == RebaseTodoCommand.EXEC:
|
|
|
+ # Execute shell command
|
|
|
+ if entry.arguments:
|
|
|
+ try:
|
|
|
+ subprocess.run(entry.arguments, shell=True, check=True)
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
+ # Command failed, pause rebase
|
|
|
+ return False, f"exec failed: {e}"
|
|
|
+
|
|
|
+ elif entry.command == RebaseTodoCommand.BREAK:
|
|
|
+ # Pause rebase
|
|
|
+ todo.advance()
|
|
|
+ state_manager.save_todo(todo)
|
|
|
+ return False, "break"
|
|
|
+
|
|
|
+ else:
|
|
|
+ # Unsupported command
|
|
|
+ raise RebaseError(f"Unsupported rebase command: {entry.command.value}")
|
|
|
+
|
|
|
+ # Move to next entry
|
|
|
+ todo.advance()
|
|
|
+
|
|
|
+ # Save progress
|
|
|
+ state_manager.save_todo(todo)
|
|
|
+ rebaser._save_rebase_state()
|
|
|
+
|
|
|
+ # Rebase complete
|
|
|
+ rebaser._finish_rebase()
|
|
|
+ return True, None
|
|
|
+
|
|
|
+
|
|
|
+def _squash_commits(
|
|
|
+ repo: Repo,
|
|
|
+ rebaser: Rebaser,
|
|
|
+ entry: RebaseTodoEntry,
|
|
|
+ keep_message: bool,
|
|
|
+ editor_callback=None,
|
|
|
+) -> Optional[str]:
|
|
|
+ """Helper to squash/fixup commits.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ repo: Repository
|
|
|
+ rebaser: Rebaser instance
|
|
|
+ entry: Todo entry for the commit to squash
|
|
|
+ keep_message: Whether to keep this commit's message (squash) or discard (fixup)
|
|
|
+ editor_callback: Optional callback to edit combined message (for squash)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ None on success, "conflict" on conflict
|
|
|
+ """
|
|
|
+ if not rebaser._done:
|
|
|
+ raise RebaseError("Cannot squash without a previous commit")
|
|
|
+
|
|
|
+ # Get the commit to squash
|
|
|
+ if not entry.commit_sha:
|
|
|
+ raise RebaseError("No commit SHA for squash/fixup operation")
|
|
|
+ commit_to_squash = repo[entry.commit_sha]
|
|
|
+
|
|
|
+ # Get the previous commit (target of squash)
|
|
|
+ previous_commit = rebaser._done[-1]
|
|
|
+
|
|
|
+ # Cherry-pick the changes onto the previous commit
|
|
|
+ parent = repo[commit_to_squash.parents[0]]
|
|
|
+
|
|
|
+ # Perform three-way merge for the tree
|
|
|
+ merged_tree, conflicts = three_way_merge(
|
|
|
+ repo.object_store, parent, previous_commit, commit_to_squash
|
|
|
+ )
|
|
|
+
|
|
|
+ if conflicts:
|
|
|
+ return "conflict"
|
|
|
+
|
|
|
+ # Combine messages if squashing (not fixup)
|
|
|
+ if keep_message:
|
|
|
+ combined_message = previous_commit.message + b"\n\n" + commit_to_squash.message
|
|
|
+ if editor_callback:
|
|
|
+ combined_message = editor_callback(combined_message)
|
|
|
+ else:
|
|
|
+ combined_message = previous_commit.message
|
|
|
+
|
|
|
+ # Create new combined commit
|
|
|
+ new_commit = Commit()
|
|
|
+ new_commit.tree = merged_tree.id
|
|
|
+ new_commit.parents = previous_commit.parents
|
|
|
+ new_commit.author = previous_commit.author
|
|
|
+ new_commit.author_time = previous_commit.author_time
|
|
|
+ new_commit.author_timezone = previous_commit.author_timezone
|
|
|
+ new_commit.committer = commit_to_squash.committer
|
|
|
+ new_commit.commit_time = commit_to_squash.commit_time
|
|
|
+ new_commit.commit_timezone = commit_to_squash.commit_timezone
|
|
|
+ new_commit.message = combined_message
|
|
|
+ new_commit.encoding = previous_commit.encoding
|
|
|
+
|
|
|
+ repo.object_store.add_object(merged_tree)
|
|
|
+ repo.object_store.add_object(new_commit)
|
|
|
+
|
|
|
+ # Replace the previous commit with the combined one
|
|
|
+ rebaser._done[-1] = new_commit
|
|
|
+
|
|
|
+ # Remove the squashed commit from todo
|
|
|
+ if rebaser._todo and rebaser._todo[0].id == commit_to_squash.id:
|
|
|
+ rebaser._todo.pop(0)
|
|
|
+
|
|
|
+ return None
|