|
|
@@ -61,8 +61,7 @@ __all__ = [
|
|
|
"object_class",
|
|
|
"object_header",
|
|
|
"parse_commit",
|
|
|
- "parse_time_entry",
|
|
|
- "parse_timezone",
|
|
|
+ "parse_commit_broken",
|
|
|
"parse_tree",
|
|
|
"pretty_format_tree_entry",
|
|
|
"serializable_property",
|
|
|
@@ -75,6 +74,7 @@ __all__ = [
|
|
|
import binascii
|
|
|
import os
|
|
|
import posixpath
|
|
|
+import re
|
|
|
import stat
|
|
|
import sys
|
|
|
import zlib
|
|
|
@@ -134,6 +134,12 @@ _TAGGER_HEADER = b"tagger"
|
|
|
|
|
|
S_IFGITLINK = 0o160000
|
|
|
|
|
|
+# Intentionally flexible regex to support various types of brokenness
|
|
|
+# in commit/tag author/committer/tagger lines
|
|
|
+_TIME_ENTRY_RE = re.compile(
|
|
|
+ b"^(?P<person>.*) (?P<time>-?[0-9]+) (?P<timezone>[+-]{0,2}[0-9]+)$"
|
|
|
+)
|
|
|
+
|
|
|
|
|
|
MAX_TIME = 9223372036854775807 # (2**63) - 1 - signed long int max
|
|
|
|
|
|
@@ -1730,6 +1736,41 @@ def parse_timezone(text: bytes) -> tuple[int, bool]:
|
|
|
)
|
|
|
|
|
|
|
|
|
+def parse_timezone_broken(text: bytes) -> tuple[int, bool]:
|
|
|
+ """Parse a timezone text fragment, accepting broken formats.
|
|
|
+
|
|
|
+ This function handles various broken timezone formats found in the wild:
|
|
|
+ - Missing sign prefix (e.g., '0000' instead of '+0000')
|
|
|
+ - Double negative (e.g., '--700')
|
|
|
+
|
|
|
+ Args:
|
|
|
+ text: Text to parse.
|
|
|
+ Returns: Tuple with timezone as seconds difference to UTC
|
|
|
+ and a boolean indicating whether this was a UTC timezone
|
|
|
+ prefixed with a negative sign (-0000).
|
|
|
+ """
|
|
|
+ if text[0] not in b"+-":
|
|
|
+ # Some (broken) commits do not have a sign
|
|
|
+ text = b"+" + text
|
|
|
+
|
|
|
+ # cgit parses the first character as the sign, and the rest
|
|
|
+ # as an integer (using strtol), which could also be negative.
|
|
|
+ # We do the same for compatibility. See #697828.
|
|
|
+ sign = text[:1]
|
|
|
+ offset = int(text[1:])
|
|
|
+ if sign == b"-":
|
|
|
+ offset = -offset
|
|
|
+ unnecessary_negative_timezone = offset >= 0 and sign == b"-"
|
|
|
+ signum = ((offset < 0) and -1) or 1
|
|
|
+ offset = abs(offset)
|
|
|
+ hours = int(offset / 100)
|
|
|
+ minutes = offset % 100
|
|
|
+ return (
|
|
|
+ signum * (hours * 3600 + minutes * 60),
|
|
|
+ unnecessary_negative_timezone,
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
|
|
|
"""Format a timezone for Git serialization.
|
|
|
|
|
|
@@ -1775,6 +1816,35 @@ def parse_time_entry(
|
|
|
return person, time, (timezone, timezone_neg_utc)
|
|
|
|
|
|
|
|
|
+def parse_time_entry_broken(
|
|
|
+ value: bytes,
|
|
|
+) -> tuple[bytes, int | None, tuple[int | None, bool]]:
|
|
|
+ """Parse event, accepting broken formats.
|
|
|
+
|
|
|
+ This function handles various broken author/committer/tagger line formats:
|
|
|
+ - Missing angle brackets around email
|
|
|
+ - Unsigned timezones
|
|
|
+ - Double-negative timezones
|
|
|
+
|
|
|
+ Args:
|
|
|
+ value: Bytes representing a git commit/tag line
|
|
|
+ Raises:
|
|
|
+ ObjectFormatException in case of parsing error
|
|
|
+ Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
|
|
|
+ """
|
|
|
+ m = _TIME_ENTRY_RE.match(value)
|
|
|
+ if not m:
|
|
|
+ raise ObjectFormatException(f"Unable to parse time entry: {value!r}")
|
|
|
+
|
|
|
+ person = m.group("person")
|
|
|
+ timetext = m.group("time")
|
|
|
+ timezonetext = m.group("timezone")
|
|
|
+ time = int(timetext)
|
|
|
+ timezone, timezone_neg_utc = parse_timezone_broken(timezonetext)
|
|
|
+
|
|
|
+ return person, time, (timezone, timezone_neg_utc)
|
|
|
+
|
|
|
+
|
|
|
def format_time_entry(
|
|
|
person: bytes, time: int, timezone_info: tuple[int, bool]
|
|
|
) -> bytes:
|
|
|
@@ -1785,8 +1855,7 @@ def format_time_entry(
|
|
|
)
|
|
|
|
|
|
|
|
|
-@replace_me(since="0.21.0", remove_in="0.24.0")
|
|
|
-def parse_commit(
|
|
|
+def _parse_commit(
|
|
|
chunks: Iterable[bytes],
|
|
|
) -> tuple[
|
|
|
bytes | None,
|
|
|
@@ -1807,7 +1876,7 @@ def parse_commit(
|
|
|
encoding, mergetag, gpgsig, message, extra)
|
|
|
"""
|
|
|
parents = []
|
|
|
- extra = []
|
|
|
+ extra: list[tuple[bytes, bytes]] = []
|
|
|
tree = None
|
|
|
author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
|
|
|
None,
|
|
|
@@ -1869,6 +1938,94 @@ def parse_commit(
|
|
|
)
|
|
|
|
|
|
|
|
|
+def _parse_commit_broken(
|
|
|
+ chunks: Iterable[bytes],
|
|
|
+) -> tuple[
|
|
|
+ bytes | None,
|
|
|
+ list[bytes],
|
|
|
+ tuple[bytes | None, int | None, tuple[int | None, bool | None]],
|
|
|
+ tuple[bytes | None, int | None, tuple[int | None, bool | None]],
|
|
|
+ bytes | None,
|
|
|
+ list[Tag],
|
|
|
+ bytes | None,
|
|
|
+ bytes | None,
|
|
|
+ list[tuple[bytes, bytes]],
|
|
|
+]:
|
|
|
+ """Parse a commit object from chunks, accepting broken formats.
|
|
|
+
|
|
|
+ This function handles various broken author/committer line formats:
|
|
|
+ - Missing angle brackets around email
|
|
|
+ - Unsigned timezones
|
|
|
+ - Double-negative timezones
|
|
|
+
|
|
|
+ Args:
|
|
|
+ chunks: Chunks to parse
|
|
|
+ Returns: Tuple of (tree, parents, author_info, commit_info,
|
|
|
+ encoding, mergetag, gpgsig, message, extra)
|
|
|
+ """
|
|
|
+ parents = []
|
|
|
+ extra: list[tuple[bytes, bytes]] = []
|
|
|
+ tree = None
|
|
|
+ author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
|
|
|
+ None,
|
|
|
+ None,
|
|
|
+ (None, None),
|
|
|
+ )
|
|
|
+ commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
|
|
|
+ None,
|
|
|
+ None,
|
|
|
+ (None, None),
|
|
|
+ )
|
|
|
+ encoding = None
|
|
|
+ mergetag = []
|
|
|
+ message = None
|
|
|
+ gpgsig = None
|
|
|
+
|
|
|
+ for field, value in _parse_message(chunks):
|
|
|
+ # TODO(jelmer): Enforce ordering
|
|
|
+ if field == _TREE_HEADER:
|
|
|
+ tree = value
|
|
|
+ elif field == _PARENT_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing parent value")
|
|
|
+ parents.append(value)
|
|
|
+ elif field == _AUTHOR_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing author value")
|
|
|
+ author_info = parse_time_entry_broken(value)
|
|
|
+ elif field == _COMMITTER_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing committer value")
|
|
|
+ commit_info = parse_time_entry_broken(value)
|
|
|
+ elif field == _ENCODING_HEADER:
|
|
|
+ encoding = value
|
|
|
+ elif field == _MERGETAG_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing mergetag value")
|
|
|
+ tag = Tag.from_string(value + b"\n")
|
|
|
+ assert isinstance(tag, Tag)
|
|
|
+ mergetag.append(tag)
|
|
|
+ elif field == _GPGSIG_HEADER:
|
|
|
+ gpgsig = value
|
|
|
+ elif field is None:
|
|
|
+ message = value
|
|
|
+ else:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException(f"missing value for field {field!r}")
|
|
|
+ extra.append((field, value))
|
|
|
+ return (
|
|
|
+ tree,
|
|
|
+ parents,
|
|
|
+ author_info,
|
|
|
+ commit_info,
|
|
|
+ encoding,
|
|
|
+ mergetag,
|
|
|
+ gpgsig,
|
|
|
+ message,
|
|
|
+ extra,
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
class Commit(ShaFile):
|
|
|
"""A git commit object."""
|
|
|
|
|
|
@@ -1900,7 +2057,7 @@ class Commit(ShaFile):
|
|
|
self._encoding: bytes | None = None
|
|
|
self._mergetag: list[Tag] = []
|
|
|
self._gpgsig: bytes | None = None
|
|
|
- self._extra: list[tuple[bytes, bytes | None]] = []
|
|
|
+ self._extra: list[tuple[bytes, bytes]] = []
|
|
|
self._author_timezone_neg_utc: bool | None = False
|
|
|
self._commit_timezone_neg_utc: bool | None = False
|
|
|
|
|
|
@@ -1931,52 +2088,25 @@ class Commit(ShaFile):
|
|
|
return commit
|
|
|
|
|
|
def _deserialize(self, chunks: list[bytes]) -> None:
|
|
|
- self._parents = []
|
|
|
- self._extra = []
|
|
|
- self._tree = None
|
|
|
- author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
|
|
|
- None,
|
|
|
- None,
|
|
|
- (None, None),
|
|
|
- )
|
|
|
- commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
|
|
|
- None,
|
|
|
- None,
|
|
|
- (None, None),
|
|
|
- )
|
|
|
- self._encoding = None
|
|
|
- self._mergetag = []
|
|
|
- self._message = None
|
|
|
- self._gpgsig = None
|
|
|
-
|
|
|
- for field, value in _parse_message(chunks):
|
|
|
- # TODO(jelmer): Enforce ordering
|
|
|
- if field == _TREE_HEADER:
|
|
|
- self._tree = value
|
|
|
- elif field == _PARENT_HEADER:
|
|
|
- assert value is not None
|
|
|
- self._parents.append(ObjectID(value))
|
|
|
- elif field == _AUTHOR_HEADER:
|
|
|
- if value is None:
|
|
|
- raise ObjectFormatException("missing author value")
|
|
|
- author_info = parse_time_entry(value)
|
|
|
- elif field == _COMMITTER_HEADER:
|
|
|
- if value is None:
|
|
|
- raise ObjectFormatException("missing committer value")
|
|
|
- commit_info = parse_time_entry(value)
|
|
|
- elif field == _ENCODING_HEADER:
|
|
|
- self._encoding = value
|
|
|
- elif field == _MERGETAG_HEADER:
|
|
|
- assert value is not None
|
|
|
- tag = Tag.from_string(value + b"\n")
|
|
|
- assert isinstance(tag, Tag)
|
|
|
- self._mergetag.append(tag)
|
|
|
- elif field == _GPGSIG_HEADER:
|
|
|
- self._gpgsig = value
|
|
|
- elif field is None:
|
|
|
- self._message = value
|
|
|
- else:
|
|
|
- self._extra.append((field, value))
|
|
|
+ (
|
|
|
+ tree,
|
|
|
+ parents,
|
|
|
+ author_info,
|
|
|
+ commit_info,
|
|
|
+ encoding,
|
|
|
+ mergetag,
|
|
|
+ gpgsig,
|
|
|
+ message,
|
|
|
+ extra,
|
|
|
+ ) = parse_commit(chunks)
|
|
|
+
|
|
|
+ self._tree = tree
|
|
|
+ self._parents = [ObjectID(p) for p in parents]
|
|
|
+ self._encoding = encoding
|
|
|
+ self._mergetag = mergetag
|
|
|
+ self._gpgsig = gpgsig
|
|
|
+ self._message = message
|
|
|
+ self._extra = extra
|
|
|
|
|
|
(
|
|
|
self._author,
|
|
|
@@ -2198,7 +2328,7 @@ class Commit(ShaFile):
|
|
|
)
|
|
|
|
|
|
@replace_me(since="0.21.0", remove_in="0.24.0")
|
|
|
- def _get_extra(self) -> list[tuple[bytes, bytes | None]]:
|
|
|
+ def _get_extra(self) -> list[tuple[bytes, bytes]]:
|
|
|
"""Return extra settings of this commit."""
|
|
|
return self._extra
|
|
|
|
|
|
@@ -2258,6 +2388,99 @@ for cls in OBJECT_CLASSES:
|
|
|
_TYPE_MAP[cls.type_num] = cls
|
|
|
|
|
|
|
|
|
+# Public API functions
|
|
|
+
|
|
|
+
|
|
|
+@replace_me(since="0.21.0", remove_in="0.24.0")
|
|
|
+def parse_commit(
|
|
|
+ chunks: Iterable[bytes],
|
|
|
+) -> tuple[
|
|
|
+ bytes | None,
|
|
|
+ list[bytes],
|
|
|
+ tuple[bytes | None, int | None, tuple[int | None, bool | None]],
|
|
|
+ tuple[bytes | None, int | None, tuple[int | None, bool | None]],
|
|
|
+ bytes | None,
|
|
|
+ list[Tag],
|
|
|
+ bytes | None,
|
|
|
+ bytes | None,
|
|
|
+ list[tuple[bytes, bytes]],
|
|
|
+]:
|
|
|
+ """Parse a commit object from chunks.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ chunks: Chunks to parse
|
|
|
+ Returns: Tuple of (tree, parents, author_info, commit_info,
|
|
|
+ encoding, mergetag, gpgsig, message, extra)
|
|
|
+ """
|
|
|
+ return _parse_commit(chunks)
|
|
|
+
|
|
|
+
|
|
|
+def parse_commit_broken(data: bytes) -> Commit:
|
|
|
+ """Parse a commit with broken author/committer lines.
|
|
|
+
|
|
|
+ This function handles various broken formats found in the wild:
|
|
|
+ - Missing angle brackets around email addresses
|
|
|
+ - Unsigned timezones (e.g., "0000" instead of "+0000")
|
|
|
+ - Double-negative timezones (e.g., "--700")
|
|
|
+ - Negative timestamps
|
|
|
+ - Long/short/nonsensical timezone values
|
|
|
+
|
|
|
+ Warning: Commits parsed with this function may not round-trip correctly
|
|
|
+ through serialization, as the broken formatting is normalized during parsing.
|
|
|
+ The .check() method will likely fail for commits with malformed identity fields.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ data: Raw commit data as bytes
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ A Commit object with normalized fields
|
|
|
+
|
|
|
+ Example:
|
|
|
+ >>> data = b'''tree d80c186a03f423a81b39df39dc87fd269736ca86
|
|
|
+ ... author user@example.com 1234567890 -0500
|
|
|
+ ... committer user@example.com 1234567890 -0500
|
|
|
+ ...
|
|
|
+ ... Commit message
|
|
|
+ ... '''
|
|
|
+ >>> commit = parse_commit_broken(data)
|
|
|
+ >>> commit.author
|
|
|
+ b'user@example.com'
|
|
|
+ """
|
|
|
+ commit = Commit()
|
|
|
+ (
|
|
|
+ tree,
|
|
|
+ parents,
|
|
|
+ author_info,
|
|
|
+ commit_info,
|
|
|
+ encoding,
|
|
|
+ mergetag,
|
|
|
+ gpgsig,
|
|
|
+ message,
|
|
|
+ extra,
|
|
|
+ ) = _parse_commit_broken([data])
|
|
|
+
|
|
|
+ commit._tree = tree
|
|
|
+ commit._parents = [ObjectID(p) for p in parents]
|
|
|
+ commit._encoding = encoding
|
|
|
+ commit._mergetag = mergetag
|
|
|
+ commit._gpgsig = gpgsig
|
|
|
+ commit._message = message
|
|
|
+ commit._extra = extra
|
|
|
+
|
|
|
+ (
|
|
|
+ commit._author,
|
|
|
+ commit._author_time,
|
|
|
+ (commit._author_timezone, commit._author_timezone_neg_utc),
|
|
|
+ ) = author_info
|
|
|
+ (
|
|
|
+ commit._committer,
|
|
|
+ commit._commit_time,
|
|
|
+ (commit._commit_timezone, commit._commit_timezone_neg_utc),
|
|
|
+ ) = commit_info
|
|
|
+
|
|
|
+ return commit
|
|
|
+
|
|
|
+
|
|
|
# Hold on to the pure-python implementations for testing
|
|
|
_parse_tree_py = parse_tree
|
|
|
_sorted_tree_items_py = sorted_tree_items
|