|
|
@@ -374,7 +374,6 @@ from ..protocol import ZERO_SHA, Protocol
|
|
|
from ..refs import (
|
|
|
HEADREF,
|
|
|
LOCAL_BRANCH_PREFIX,
|
|
|
- LOCAL_NOTES_PREFIX,
|
|
|
LOCAL_REMOTE_PREFIX,
|
|
|
LOCAL_REPLACE_PREFIX,
|
|
|
LOCAL_TAG_PREFIX,
|
|
|
@@ -385,7 +384,6 @@ from ..refs import (
|
|
|
filter_ref_prefix,
|
|
|
local_branch_name,
|
|
|
local_replace_name,
|
|
|
- local_tag_name,
|
|
|
parse_remote_ref,
|
|
|
shorten_ref_name,
|
|
|
)
|
|
|
@@ -417,12 +415,19 @@ from .lfs import (
|
|
|
lfs_track,
|
|
|
lfs_untrack,
|
|
|
)
|
|
|
+from .notes import (
|
|
|
+ notes_add,
|
|
|
+ notes_list,
|
|
|
+ notes_remove,
|
|
|
+ notes_show,
|
|
|
+)
|
|
|
from .submodule import (
|
|
|
submodule_add,
|
|
|
submodule_init,
|
|
|
submodule_list,
|
|
|
submodule_update,
|
|
|
)
|
|
|
+from .tag import tag_create, tag_delete, tag_list, verify_tag
|
|
|
|
|
|
# Module level tuple definition for status output
|
|
|
GitStatus = namedtuple("GitStatus", "staged unstaged untracked")
|
|
|
@@ -2341,100 +2346,6 @@ def _canonical_part(url: str) -> str:
|
|
|
return name
|
|
|
|
|
|
|
|
|
-def tag_create(
|
|
|
- repo: RepoPath,
|
|
|
- tag: str | bytes,
|
|
|
- author: str | bytes | None = None,
|
|
|
- message: str | bytes | None = None,
|
|
|
- annotated: bool = False,
|
|
|
- objectish: str | bytes = "HEAD",
|
|
|
- tag_time: int | None = None,
|
|
|
- tag_timezone: int | None = None,
|
|
|
- sign: bool | None = None,
|
|
|
- encoding: str = DEFAULT_ENCODING,
|
|
|
-) -> None:
|
|
|
- """Creates a tag in git via dulwich calls.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- tag: tag string
|
|
|
- author: tag author (optional, if annotated is set)
|
|
|
- message: tag message (optional)
|
|
|
- annotated: whether to create an annotated tag
|
|
|
- objectish: object the tag should point at, defaults to HEAD
|
|
|
- tag_time: Optional time for annotated tag
|
|
|
- tag_timezone: Optional timezone for annotated tag
|
|
|
- sign: GPG Sign the tag (bool, defaults to False,
|
|
|
- pass True to use default GPG key,
|
|
|
- pass a str containing Key ID to use a specific GPG key)
|
|
|
- encoding: Encoding to use for tag messages
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- object = parse_object(r, objectish)
|
|
|
-
|
|
|
- if isinstance(tag, str):
|
|
|
- tag = tag.encode(encoding)
|
|
|
-
|
|
|
- if annotated:
|
|
|
- # Create the tag object
|
|
|
- tag_obj = Tag()
|
|
|
- if author is None:
|
|
|
- author = get_user_identity(r.get_config_stack())
|
|
|
- elif isinstance(author, str):
|
|
|
- author = author.encode(encoding)
|
|
|
- else:
|
|
|
- assert isinstance(author, bytes)
|
|
|
- tag_obj.tagger = author
|
|
|
- if isinstance(message, str):
|
|
|
- message = message.encode(encoding)
|
|
|
- elif isinstance(message, bytes):
|
|
|
- pass
|
|
|
- else:
|
|
|
- message = b""
|
|
|
- tag_obj.message = message + "\n".encode(encoding)
|
|
|
- tag_obj.name = tag
|
|
|
- tag_obj.object = (type(object), object.id)
|
|
|
- if tag_time is None:
|
|
|
- tag_time = int(time.time())
|
|
|
- tag_obj.tag_time = tag_time
|
|
|
- if tag_timezone is None:
|
|
|
- tag_timezone = get_user_timezones()[1]
|
|
|
- elif isinstance(tag_timezone, str):
|
|
|
- tag_timezone = parse_timezone(tag_timezone.encode())
|
|
|
- tag_obj.tag_timezone = tag_timezone
|
|
|
-
|
|
|
- # Check if we should sign the tag
|
|
|
- config = r.get_config_stack()
|
|
|
-
|
|
|
- if sign is None:
|
|
|
- # Check tag.gpgSign configuration when sign is not explicitly set
|
|
|
- try:
|
|
|
- should_sign = config.get_boolean(
|
|
|
- (b"tag",), b"gpgsign", default=False
|
|
|
- )
|
|
|
- except KeyError:
|
|
|
- should_sign = False # Default to not signing if no config
|
|
|
- else:
|
|
|
- should_sign = sign
|
|
|
-
|
|
|
- # Get the signing key from config if signing is enabled
|
|
|
- keyid = None
|
|
|
- if should_sign:
|
|
|
- try:
|
|
|
- keyid_bytes = config.get((b"user",), b"signingkey")
|
|
|
- keyid = keyid_bytes.decode() if keyid_bytes else None
|
|
|
- except KeyError:
|
|
|
- keyid = None
|
|
|
- tag_obj.sign(keyid)
|
|
|
-
|
|
|
- r.object_store.add_object(tag_obj)
|
|
|
- tag_id = tag_obj.id
|
|
|
- else:
|
|
|
- tag_id = object.id
|
|
|
-
|
|
|
- r.refs[_make_tag_ref(tag)] = tag_id
|
|
|
-
|
|
|
-
|
|
|
def verify_commit(
|
|
|
repo: RepoPath,
|
|
|
committish: str | bytes = "HEAD",
|
|
|
@@ -2459,209 +2370,6 @@ def verify_commit(
|
|
|
commit.verify(keyids)
|
|
|
|
|
|
|
|
|
-def verify_tag(
|
|
|
- repo: RepoPath,
|
|
|
- tagname: str | bytes,
|
|
|
- keyids: list[str] | None = None,
|
|
|
-) -> None:
|
|
|
- """Verify GPG signature on a tag.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- tagname: Name of tag to verify
|
|
|
- keyids: Optional list of trusted key IDs. If provided, the tag
|
|
|
- must be signed by one of these keys. If not provided, just verifies
|
|
|
- that the tag has a valid signature.
|
|
|
-
|
|
|
- Raises:
|
|
|
- gpg.errors.BadSignatures: if GPG signature verification fails
|
|
|
- gpg.errors.MissingSignatures: if tag was not signed by a key
|
|
|
- specified in keyids
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- if isinstance(tagname, str):
|
|
|
- tagname = tagname.encode()
|
|
|
- tag_ref = _make_tag_ref(tagname)
|
|
|
- tag_id = r.refs[tag_ref]
|
|
|
- tag_obj = r[tag_id]
|
|
|
- if not isinstance(tag_obj, Tag):
|
|
|
- raise Error(f"{tagname!r} does not point to a tag object")
|
|
|
- tag_obj.verify(keyids)
|
|
|
-
|
|
|
-
|
|
|
-def tag_list(repo: RepoPath, outstream: TextIO = sys.stdout) -> list[Ref]:
|
|
|
- """List all tags.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- outstream: Stream to write tags to
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- tags: list[Ref] = sorted(r.refs.as_dict(Ref(b"refs/tags")))
|
|
|
- return tags
|
|
|
-
|
|
|
-
|
|
|
-def tag_delete(repo: RepoPath, name: str | bytes) -> None:
|
|
|
- """Remove a tag.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- name: Name of tag to remove
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- if isinstance(name, bytes):
|
|
|
- names = [name]
|
|
|
- elif isinstance(name, list):
|
|
|
- names = name
|
|
|
- else:
|
|
|
- raise Error(f"Unexpected tag name type {name!r}")
|
|
|
- for name in names:
|
|
|
- del r.refs[_make_tag_ref(name)]
|
|
|
-
|
|
|
-
|
|
|
-def _make_notes_ref(name: bytes) -> bytes:
|
|
|
- """Make a notes ref name."""
|
|
|
- if name.startswith(b"refs/notes/"):
|
|
|
- return name
|
|
|
- return LOCAL_NOTES_PREFIX + name
|
|
|
-
|
|
|
-
|
|
|
-def notes_add(
|
|
|
- repo: RepoPath,
|
|
|
- object_sha: bytes,
|
|
|
- note: bytes,
|
|
|
- ref: bytes = b"commits",
|
|
|
- author: bytes | None = None,
|
|
|
- committer: bytes | None = None,
|
|
|
- message: bytes | None = None,
|
|
|
-) -> bytes:
|
|
|
- """Add or update a note for an object.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- object_sha: SHA of the object to annotate
|
|
|
- note: Note content
|
|
|
- ref: Notes ref to use (defaults to "commits" for refs/notes/commits)
|
|
|
- author: Author identity (defaults to committer)
|
|
|
- committer: Committer identity (defaults to config)
|
|
|
- message: Commit message for the notes update
|
|
|
-
|
|
|
- Returns:
|
|
|
- SHA of the new notes commit
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- # Parse the object to get its SHA
|
|
|
- obj = parse_object(r, object_sha)
|
|
|
- object_sha = obj.id
|
|
|
-
|
|
|
- if isinstance(note, str):
|
|
|
- note = note.encode(DEFAULT_ENCODING)
|
|
|
- if isinstance(ref, str):
|
|
|
- ref = ref.encode(DEFAULT_ENCODING)
|
|
|
-
|
|
|
- notes_ref = _make_notes_ref(ref)
|
|
|
- config = r.get_config_stack()
|
|
|
-
|
|
|
- return r.notes.set_note(
|
|
|
- object_sha,
|
|
|
- note,
|
|
|
- notes_ref,
|
|
|
- author=author,
|
|
|
- committer=committer,
|
|
|
- message=message,
|
|
|
- config=config,
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
-def notes_remove(
|
|
|
- repo: RepoPath,
|
|
|
- object_sha: bytes,
|
|
|
- ref: bytes = b"commits",
|
|
|
- author: bytes | None = None,
|
|
|
- committer: bytes | None = None,
|
|
|
- message: bytes | None = None,
|
|
|
-) -> bytes | None:
|
|
|
- """Remove a note for an object.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- object_sha: SHA of the object to remove notes from
|
|
|
- ref: Notes ref to use (defaults to "commits" for refs/notes/commits)
|
|
|
- author: Author identity (defaults to committer)
|
|
|
- committer: Committer identity (defaults to config)
|
|
|
- message: Commit message for the notes removal
|
|
|
-
|
|
|
- Returns:
|
|
|
- SHA of the new notes commit, or None if no note existed
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- # Parse the object to get its SHA
|
|
|
- obj = parse_object(r, object_sha)
|
|
|
- object_sha = obj.id
|
|
|
-
|
|
|
- if isinstance(ref, str):
|
|
|
- ref = ref.encode(DEFAULT_ENCODING)
|
|
|
-
|
|
|
- notes_ref = _make_notes_ref(ref)
|
|
|
- config = r.get_config_stack()
|
|
|
-
|
|
|
- return r.notes.remove_note(
|
|
|
- object_sha,
|
|
|
- notes_ref,
|
|
|
- author=author,
|
|
|
- committer=committer,
|
|
|
- message=message,
|
|
|
- config=config,
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
-def notes_show(
|
|
|
- repo: str | os.PathLike[str] | Repo, object_sha: bytes, ref: bytes = b"commits"
|
|
|
-) -> bytes | None:
|
|
|
- """Show the note for an object.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- object_sha: SHA of the object
|
|
|
- ref: Notes ref to use (defaults to "commits" for refs/notes/commits)
|
|
|
-
|
|
|
- Returns:
|
|
|
- Note content as bytes, or None if no note exists
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- # Parse the object to get its SHA
|
|
|
- obj = parse_object(r, object_sha)
|
|
|
- object_sha = obj.id
|
|
|
-
|
|
|
- if isinstance(ref, str):
|
|
|
- ref = ref.encode(DEFAULT_ENCODING)
|
|
|
-
|
|
|
- notes_ref = _make_notes_ref(ref)
|
|
|
- config = r.get_config_stack()
|
|
|
-
|
|
|
- return r.notes.get_note(object_sha, notes_ref, config=config)
|
|
|
-
|
|
|
-
|
|
|
-def notes_list(repo: RepoPath, ref: bytes = b"commits") -> list[tuple[ObjectID, bytes]]:
|
|
|
- """List all notes in a notes ref.
|
|
|
-
|
|
|
- Args:
|
|
|
- repo: Path to repository
|
|
|
- ref: Notes ref to use (defaults to "commits" for refs/notes/commits)
|
|
|
-
|
|
|
- Returns:
|
|
|
- List of tuples of (object_sha, note_content)
|
|
|
- """
|
|
|
- with open_repo_closing(repo) as r:
|
|
|
- if isinstance(ref, str):
|
|
|
- ref = ref.encode(DEFAULT_ENCODING)
|
|
|
-
|
|
|
- notes_ref = _make_notes_ref(ref)
|
|
|
- config = r.get_config_stack()
|
|
|
-
|
|
|
- return r.notes.list_notes(notes_ref, config=config)
|
|
|
-
|
|
|
-
|
|
|
def replace_list(repo: RepoPath) -> list[tuple[ObjectID, ObjectID]]:
|
|
|
"""List all replacement refs.
|
|
|
|
|
|
@@ -3797,12 +3505,6 @@ def _make_branch_ref(name: str | bytes) -> Ref:
|
|
|
return local_branch_name(name)
|
|
|
|
|
|
|
|
|
-def _make_tag_ref(name: str | bytes) -> Ref:
|
|
|
- if isinstance(name, str):
|
|
|
- name = name.encode(DEFAULT_ENCODING)
|
|
|
- return local_tag_name(name)
|
|
|
-
|
|
|
-
|
|
|
def _make_replace_ref(name: str | bytes | ObjectID) -> Ref:
|
|
|
if isinstance(name, str):
|
|
|
name = name.encode(DEFAULT_ENCODING)
|
|
|
@@ -4588,7 +4290,7 @@ def show_branch(
|
|
|
for ref_name, sha in sorted_branches:
|
|
|
try:
|
|
|
commit = r[sha]
|
|
|
- if hasattr(commit, "message"):
|
|
|
+ if isinstance(commit, Commit):
|
|
|
message = commit.message.decode("utf-8", errors="replace").split(
|
|
|
"\n"
|
|
|
)[0]
|
|
|
@@ -4625,14 +4327,18 @@ def show_branch(
|
|
|
|
|
|
try:
|
|
|
commit = r[sha]
|
|
|
- if not hasattr(commit, "commit_time"):
|
|
|
+ except KeyError:
|
|
|
+ # Commit not found, stop traversal
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ if not isinstance(commit, Commit):
|
|
|
return
|
|
|
|
|
|
timestamp = commit.commit_time
|
|
|
- parents = commit.parents if hasattr(commit, "parents") else []
|
|
|
+ parents = commit.parents if isinstance(commit, Commit) else []
|
|
|
message = (
|
|
|
commit.message.decode("utf-8", errors="replace").split("\n")[0]
|
|
|
- if hasattr(commit, "message")
|
|
|
+ if isinstance(commit, Commit)
|
|
|
else ""
|
|
|
)
|
|
|
|
|
|
@@ -4642,9 +4348,6 @@ def show_branch(
|
|
|
# Recurse to parents
|
|
|
for parent in parents:
|
|
|
collect_commits(parent, branch_idx, visited)
|
|
|
- except KeyError:
|
|
|
- # Commit not found, stop traversal
|
|
|
- pass
|
|
|
|
|
|
# Collect commits from all branches
|
|
|
for i, (_, sha) in enumerate(sorted_branches):
|