|
@@ -30,14 +30,19 @@ import zlib
|
|
|
from collections import namedtuple
|
|
|
from collections.abc import Callable, Iterable, Iterator
|
|
|
from hashlib import sha1
|
|
|
-from io import BytesIO
|
|
|
+from io import BufferedIOBase, BytesIO
|
|
|
from typing import (
|
|
|
+ IO,
|
|
|
TYPE_CHECKING,
|
|
|
- BinaryIO,
|
|
|
Optional,
|
|
|
Union,
|
|
|
)
|
|
|
|
|
|
+try:
|
|
|
+ from typing import TypeGuard # type: ignore
|
|
|
+except ImportError:
|
|
|
+ from typing_extensions import TypeGuard
|
|
|
+
|
|
|
from . import replace_me
|
|
|
from .errors import (
|
|
|
ChecksumMismatch,
|
|
@@ -53,6 +58,8 @@ from .file import GitFile
|
|
|
if TYPE_CHECKING:
|
|
|
from _hashlib import HASH
|
|
|
|
|
|
+ from .file import _GitFile
|
|
|
+
|
|
|
ZERO_SHA = b"0" * 40
|
|
|
|
|
|
# Header fields for commits
|
|
@@ -86,7 +93,7 @@ class EmptyFileException(FileFormatException):
|
|
|
"""An unexpectedly empty file was encountered."""
|
|
|
|
|
|
|
|
|
-def S_ISGITLINK(m):
|
|
|
+def S_ISGITLINK(m: int) -> bool:
|
|
|
"""Check if a mode indicates a submodule.
|
|
|
|
|
|
Args:
|
|
@@ -96,23 +103,23 @@ def S_ISGITLINK(m):
|
|
|
return stat.S_IFMT(m) == S_IFGITLINK
|
|
|
|
|
|
|
|
|
-def _decompress(string):
|
|
|
+def _decompress(string: bytes) -> bytes:
|
|
|
dcomp = zlib.decompressobj()
|
|
|
dcomped = dcomp.decompress(string)
|
|
|
dcomped += dcomp.flush()
|
|
|
return dcomped
|
|
|
|
|
|
|
|
|
-def sha_to_hex(sha):
|
|
|
+def sha_to_hex(sha: ObjectID) -> bytes:
|
|
|
"""Takes a string and returns the hex of the sha within."""
|
|
|
hexsha = binascii.hexlify(sha)
|
|
|
assert len(hexsha) == 40, f"Incorrect length of sha1 string: {hexsha!r}"
|
|
|
return hexsha
|
|
|
|
|
|
|
|
|
-def hex_to_sha(hex):
|
|
|
+def hex_to_sha(hex: Union[bytes, str]) -> bytes:
|
|
|
"""Takes a hex sha and returns a binary sha."""
|
|
|
- assert len(hex) == 40, f"Incorrect length of hexsha: {hex}"
|
|
|
+ assert len(hex) == 40, f"Incorrect length of hexsha: {hex!r}"
|
|
|
try:
|
|
|
return binascii.unhexlify(hex)
|
|
|
except TypeError as exc:
|
|
@@ -121,7 +128,7 @@ def hex_to_sha(hex):
|
|
|
raise ValueError(exc.args[0]) from exc
|
|
|
|
|
|
|
|
|
-def valid_hexsha(hex) -> bool:
|
|
|
+def valid_hexsha(hex: Union[bytes, str]) -> bool:
|
|
|
if len(hex) != 40:
|
|
|
return False
|
|
|
try:
|
|
@@ -132,30 +139,32 @@ def valid_hexsha(hex) -> bool:
|
|
|
return True
|
|
|
|
|
|
|
|
|
-def hex_to_filename(path, hex):
|
|
|
+def hex_to_filename(
|
|
|
+ path: Union[str, bytes], hex: Union[str, bytes]
|
|
|
+) -> Union[str, bytes]:
|
|
|
"""Takes a hex sha and returns its filename relative to the given path."""
|
|
|
# os.path.join accepts bytes or unicode, but all args must be of the same
|
|
|
# type. Make sure that hex which is expected to be bytes, is the same type
|
|
|
# as path.
|
|
|
if type(path) is not type(hex) and getattr(path, "encode", None) is not None:
|
|
|
- hex = hex.decode("ascii")
|
|
|
+ hex = hex.decode("ascii") # type: ignore
|
|
|
dir = hex[:2]
|
|
|
file = hex[2:]
|
|
|
# Check from object dir
|
|
|
- return os.path.join(path, dir, file)
|
|
|
+ return os.path.join(path, dir, file) # type: ignore
|
|
|
|
|
|
|
|
|
-def filename_to_hex(filename):
|
|
|
+def filename_to_hex(filename: Union[str, bytes]) -> str:
|
|
|
"""Takes an object filename and returns its corresponding hex sha."""
|
|
|
# grab the last (up to) two path components
|
|
|
- names = filename.rsplit(os.path.sep, 2)[-2:]
|
|
|
- errmsg = f"Invalid object filename: {filename}"
|
|
|
+ names = filename.rsplit(os.path.sep, 2)[-2:] # type: ignore
|
|
|
+ errmsg = f"Invalid object filename: {filename!r}"
|
|
|
assert len(names) == 2, errmsg
|
|
|
base, rest = names
|
|
|
assert len(base) == 2 and len(rest) == 38, errmsg
|
|
|
- hex = (base + rest).encode("ascii")
|
|
|
- hex_to_sha(hex)
|
|
|
- return hex
|
|
|
+ hex_bytes = (base + rest).encode("ascii") # type: ignore
|
|
|
+ hex_to_sha(hex_bytes)
|
|
|
+ return hex_bytes.decode("ascii")
|
|
|
|
|
|
|
|
|
def object_header(num_type: int, length: int) -> bytes:
|
|
@@ -166,14 +175,14 @@ def object_header(num_type: int, length: int) -> bytes:
|
|
|
return cls.type_name + b" " + str(length).encode("ascii") + b"\0"
|
|
|
|
|
|
|
|
|
-def serializable_property(name: str, docstring: Optional[str] = None):
|
|
|
+def serializable_property(name: str, docstring: Optional[str] = None) -> property:
|
|
|
"""A property that helps tracking whether serialization is necessary."""
|
|
|
|
|
|
- def set(obj, value) -> None:
|
|
|
+ def set(obj: "ShaFile", value: object) -> None:
|
|
|
setattr(obj, "_" + name, value)
|
|
|
obj._needs_serialization = True
|
|
|
|
|
|
- def get(obj):
|
|
|
+ def get(obj: "ShaFile") -> object:
|
|
|
return getattr(obj, "_" + name)
|
|
|
|
|
|
return property(get, set, doc=docstring)
|
|
@@ -190,7 +199,7 @@ def object_class(type: Union[bytes, int]) -> Optional[type["ShaFile"]]:
|
|
|
return _TYPE_MAP.get(type, None)
|
|
|
|
|
|
|
|
|
-def check_hexsha(hex, error_msg) -> None:
|
|
|
+def check_hexsha(hex: Union[str, bytes], error_msg: str) -> None:
|
|
|
"""Check if a string is a valid hex sha string.
|
|
|
|
|
|
Args:
|
|
@@ -200,7 +209,7 @@ def check_hexsha(hex, error_msg) -> None:
|
|
|
ObjectFormatException: Raised when the string is not valid
|
|
|
"""
|
|
|
if not valid_hexsha(hex):
|
|
|
- raise ObjectFormatException(f"{error_msg} {hex}")
|
|
|
+ raise ObjectFormatException(f"{error_msg} {hex!r}")
|
|
|
|
|
|
|
|
|
def check_identity(identity: Optional[bytes], error_msg: str) -> None:
|
|
@@ -229,7 +238,7 @@ def check_identity(identity: Optional[bytes], error_msg: str) -> None:
|
|
|
raise ObjectFormatException(error_msg)
|
|
|
|
|
|
|
|
|
-def check_time(time_seconds) -> None:
|
|
|
+def check_time(time_seconds: int) -> None:
|
|
|
"""Check if the specified time is not prone to overflow error.
|
|
|
|
|
|
This will raise an exception if the time is not valid.
|
|
@@ -243,7 +252,7 @@ def check_time(time_seconds) -> None:
|
|
|
raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
|
|
|
|
|
|
|
|
|
-def git_line(*items):
|
|
|
+def git_line(*items: bytes) -> bytes:
|
|
|
"""Formats items into a space separated line."""
|
|
|
return b" ".join(items) + b"\n"
|
|
|
|
|
@@ -253,9 +262,9 @@ class FixedSha:
|
|
|
|
|
|
__slots__ = ("_hexsha", "_sha")
|
|
|
|
|
|
- def __init__(self, hexsha) -> None:
|
|
|
+ def __init__(self, hexsha: Union[str, bytes]) -> None:
|
|
|
if getattr(hexsha, "encode", None) is not None:
|
|
|
- hexsha = hexsha.encode("ascii")
|
|
|
+ hexsha = hexsha.encode("ascii") # type: ignore
|
|
|
if not isinstance(hexsha, bytes):
|
|
|
raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
|
|
|
self._hexsha = hexsha
|
|
@@ -270,6 +279,43 @@ class FixedSha:
|
|
|
return self._hexsha.decode("ascii")
|
|
|
|
|
|
|
|
|
+# Type guard functions for runtime type narrowing
|
|
|
+if TYPE_CHECKING:
|
|
|
+
|
|
|
+ def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
|
|
|
+ """Check if a ShaFile is a Commit."""
|
|
|
+ return obj.type_name == b"commit"
|
|
|
+
|
|
|
+ def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
|
|
|
+ """Check if a ShaFile is a Tree."""
|
|
|
+ return obj.type_name == b"tree"
|
|
|
+
|
|
|
+ def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
|
|
|
+ """Check if a ShaFile is a Blob."""
|
|
|
+ return obj.type_name == b"blob"
|
|
|
+
|
|
|
+ def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
|
|
|
+ """Check if a ShaFile is a Tag."""
|
|
|
+ return obj.type_name == b"tag"
|
|
|
+else:
|
|
|
+ # Runtime versions without type narrowing
|
|
|
+ def is_commit(obj: "ShaFile") -> bool:
|
|
|
+ """Check if a ShaFile is a Commit."""
|
|
|
+ return obj.type_name == b"commit"
|
|
|
+
|
|
|
+ def is_tree(obj: "ShaFile") -> bool:
|
|
|
+ """Check if a ShaFile is a Tree."""
|
|
|
+ return obj.type_name == b"tree"
|
|
|
+
|
|
|
+ def is_blob(obj: "ShaFile") -> bool:
|
|
|
+ """Check if a ShaFile is a Blob."""
|
|
|
+ return obj.type_name == b"blob"
|
|
|
+
|
|
|
+ def is_tag(obj: "ShaFile") -> bool:
|
|
|
+ """Check if a ShaFile is a Tag."""
|
|
|
+ return obj.type_name == b"tag"
|
|
|
+
|
|
|
+
|
|
|
class ShaFile:
|
|
|
"""A git SHA file."""
|
|
|
|
|
@@ -282,7 +328,9 @@ class ShaFile:
|
|
|
_sha: Union[FixedSha, None, "HASH"]
|
|
|
|
|
|
@staticmethod
|
|
|
- def _parse_legacy_object_header(magic, f: BinaryIO) -> "ShaFile":
|
|
|
+ def _parse_legacy_object_header(
|
|
|
+ magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
|
|
|
+ ) -> "ShaFile":
|
|
|
"""Parse a legacy object, creating it but not reading the file."""
|
|
|
bufsize = 1024
|
|
|
decomp = zlib.decompressobj()
|
|
@@ -308,7 +356,7 @@ class ShaFile:
|
|
|
)
|
|
|
return obj_class()
|
|
|
|
|
|
- def _parse_legacy_object(self, map) -> None:
|
|
|
+ def _parse_legacy_object(self, map: bytes) -> None:
|
|
|
"""Parse a legacy object, setting the raw string."""
|
|
|
text = _decompress(map)
|
|
|
header_end = text.find(b"\0")
|
|
@@ -382,7 +430,9 @@ class ShaFile:
|
|
|
self._needs_serialization = False
|
|
|
|
|
|
@staticmethod
|
|
|
- def _parse_object_header(magic, f):
|
|
|
+ def _parse_object_header(
|
|
|
+ magic: bytes, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]
|
|
|
+ ) -> "ShaFile":
|
|
|
"""Parse a new style object, creating it but not reading the file."""
|
|
|
num_type = (ord(magic[0:1]) >> 4) & 7
|
|
|
obj_class = object_class(num_type)
|
|
@@ -390,7 +440,7 @@ class ShaFile:
|
|
|
raise ObjectFormatException(f"Not a known type {num_type}")
|
|
|
return obj_class()
|
|
|
|
|
|
- def _parse_object(self, map) -> None:
|
|
|
+ def _parse_object(self, map: bytes) -> None:
|
|
|
"""Parse a new style object, setting self._text."""
|
|
|
# skip type and size; type must have already been determined, and
|
|
|
# we trust zlib to fail if it's otherwise corrupted
|
|
@@ -410,7 +460,7 @@ class ShaFile:
|
|
|
return (b0 & 0x8F) == 0x08 and (word % 31) == 0
|
|
|
|
|
|
@classmethod
|
|
|
- def _parse_file(cls, f):
|
|
|
+ def _parse_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
|
|
|
map = f.read()
|
|
|
if not map:
|
|
|
raise EmptyFileException("Corrupted empty file detected")
|
|
@@ -436,13 +486,13 @@ class ShaFile:
|
|
|
raise NotImplementedError(self._serialize)
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, path):
|
|
|
+ def from_path(cls, path: Union[str, bytes]) -> "ShaFile":
|
|
|
"""Open a SHA file from disk."""
|
|
|
with GitFile(path, "rb") as f:
|
|
|
return cls.from_file(f)
|
|
|
|
|
|
@classmethod
|
|
|
- def from_file(cls, f):
|
|
|
+ def from_file(cls, f: Union[BufferedIOBase, IO[bytes], "_GitFile"]) -> "ShaFile":
|
|
|
"""Get the contents of a SHA file on disk."""
|
|
|
try:
|
|
|
obj = cls._parse_file(f)
|
|
@@ -453,7 +503,7 @@ class ShaFile:
|
|
|
|
|
|
@staticmethod
|
|
|
def from_raw_string(
|
|
|
- type_num, string: bytes, sha: Optional[ObjectID] = None
|
|
|
+ type_num: int, string: bytes, sha: Optional[ObjectID] = None
|
|
|
) -> "ShaFile":
|
|
|
"""Creates an object of the indicated type from the raw string given.
|
|
|
|
|
@@ -472,7 +522,7 @@ class ShaFile:
|
|
|
@staticmethod
|
|
|
def from_raw_chunks(
|
|
|
type_num: int, chunks: list[bytes], sha: Optional[ObjectID] = None
|
|
|
- ):
|
|
|
+ ) -> "ShaFile":
|
|
|
"""Creates an object of the indicated type from the raw chunks given.
|
|
|
|
|
|
Args:
|
|
@@ -488,13 +538,13 @@ class ShaFile:
|
|
|
return obj
|
|
|
|
|
|
@classmethod
|
|
|
- def from_string(cls, string):
|
|
|
+ def from_string(cls, string: bytes) -> "ShaFile":
|
|
|
"""Create a ShaFile from a string."""
|
|
|
obj = cls()
|
|
|
obj.set_raw_string(string)
|
|
|
return obj
|
|
|
|
|
|
- def _check_has_member(self, member, error_msg) -> None:
|
|
|
+ def _check_has_member(self, member: str, error_msg: str) -> None:
|
|
|
"""Check that the object has a given member variable.
|
|
|
|
|
|
Args:
|
|
@@ -529,7 +579,7 @@ class ShaFile:
|
|
|
if old_sha != new_sha:
|
|
|
raise ChecksumMismatch(new_sha, old_sha)
|
|
|
|
|
|
- def _header(self):
|
|
|
+ def _header(self) -> bytes:
|
|
|
return object_header(self.type_num, self.raw_length())
|
|
|
|
|
|
def raw_length(self) -> int:
|
|
@@ -555,28 +605,28 @@ class ShaFile:
|
|
|
return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)
|
|
|
|
|
|
@property
|
|
|
- def id(self):
|
|
|
+ def id(self) -> bytes:
|
|
|
"""The hex SHA of this object."""
|
|
|
return self.sha().hexdigest().encode("ascii")
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
- return f"<{self.__class__.__name__} {self.id}>"
|
|
|
+ return f"<{self.__class__.__name__} {self.id!r}>"
|
|
|
|
|
|
- def __ne__(self, other) -> bool:
|
|
|
+ def __ne__(self, other: object) -> bool:
|
|
|
"""Check whether this object does not match the other."""
|
|
|
return not isinstance(other, ShaFile) or self.id != other.id
|
|
|
|
|
|
- def __eq__(self, other) -> bool:
|
|
|
+ def __eq__(self, other: object) -> bool:
|
|
|
"""Return True if the SHAs of the two objects match."""
|
|
|
return isinstance(other, ShaFile) and self.id == other.id
|
|
|
|
|
|
- def __lt__(self, other) -> bool:
|
|
|
+ def __lt__(self, other: object) -> bool:
|
|
|
"""Return whether SHA of this object is less than the other."""
|
|
|
if not isinstance(other, ShaFile):
|
|
|
raise TypeError
|
|
|
return self.id < other.id
|
|
|
|
|
|
- def __le__(self, other) -> bool:
|
|
|
+ def __le__(self, other: object) -> bool:
|
|
|
"""Check whether SHA of this object is less than or equal to the other."""
|
|
|
if not isinstance(other, ShaFile):
|
|
|
raise TypeError
|
|
@@ -598,26 +648,26 @@ class Blob(ShaFile):
|
|
|
self._chunked_text = []
|
|
|
self._needs_serialization = False
|
|
|
|
|
|
- def _get_data(self):
|
|
|
+ def _get_data(self) -> bytes:
|
|
|
return self.as_raw_string()
|
|
|
|
|
|
- def _set_data(self, data) -> None:
|
|
|
+ def _set_data(self, data: bytes) -> None:
|
|
|
self.set_raw_string(data)
|
|
|
|
|
|
data = property(
|
|
|
_get_data, _set_data, doc="The text contained within the blob object."
|
|
|
)
|
|
|
|
|
|
- def _get_chunked(self):
|
|
|
+ def _get_chunked(self) -> list[bytes]:
|
|
|
return self._chunked_text
|
|
|
|
|
|
def _set_chunked(self, chunks: list[bytes]) -> None:
|
|
|
self._chunked_text = chunks
|
|
|
|
|
|
- def _serialize(self):
|
|
|
+ def _serialize(self) -> list[bytes]:
|
|
|
return self._chunked_text
|
|
|
|
|
|
- def _deserialize(self, chunks) -> None:
|
|
|
+ def _deserialize(self, chunks: list[bytes]) -> None:
|
|
|
self._chunked_text = chunks
|
|
|
|
|
|
chunked = property(
|
|
@@ -627,7 +677,7 @@ class Blob(ShaFile):
|
|
|
)
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, path):
|
|
|
+ def from_path(cls, path: Union[str, bytes]) -> "Blob":
|
|
|
blob = ShaFile.from_path(path)
|
|
|
if not isinstance(blob, cls):
|
|
|
raise NotBlobError(path)
|
|
@@ -685,7 +735,7 @@ def _parse_message(
|
|
|
v = b""
|
|
|
eof = False
|
|
|
|
|
|
- def _strip_last_newline(value):
|
|
|
+ def _strip_last_newline(value: bytes) -> bytes:
|
|
|
"""Strip the last newline from value."""
|
|
|
if value and value.endswith(b"\n"):
|
|
|
return value[:-1]
|
|
@@ -725,7 +775,9 @@ def _parse_message(
|
|
|
f.close()
|
|
|
|
|
|
|
|
|
-def _format_message(headers, body):
|
|
|
+def _format_message(
|
|
|
+ headers: list[tuple[bytes, bytes]], body: Optional[bytes]
|
|
|
+) -> Iterator[bytes]:
|
|
|
for field, value in headers:
|
|
|
lines = value.split(b"\n")
|
|
|
yield git_line(field, lines[0])
|
|
@@ -754,6 +806,14 @@ class Tag(ShaFile):
|
|
|
"_tagger",
|
|
|
)
|
|
|
|
|
|
+ _message: Optional[bytes]
|
|
|
+ _name: Optional[bytes]
|
|
|
+ _object_class: Optional[type["ShaFile"]]
|
|
|
+ _object_sha: Optional[bytes]
|
|
|
+ _signature: Optional[bytes]
|
|
|
+ _tag_time: Optional[int]
|
|
|
+ _tag_timezone: Optional[int]
|
|
|
+ _tag_timezone_neg_utc: Optional[bool]
|
|
|
_tagger: Optional[bytes]
|
|
|
|
|
|
def __init__(self) -> None:
|
|
@@ -765,7 +825,7 @@ class Tag(ShaFile):
|
|
|
self._signature: Optional[bytes] = None
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, filename):
|
|
|
+ def from_path(cls, filename: Union[str, bytes]) -> "Tag":
|
|
|
tag = ShaFile.from_path(filename)
|
|
|
if not isinstance(tag, cls):
|
|
|
raise NotTagError(filename)
|
|
@@ -786,12 +846,16 @@ class Tag(ShaFile):
|
|
|
if not self._name:
|
|
|
raise ObjectFormatException("empty tag name")
|
|
|
|
|
|
+ if self._object_sha is None:
|
|
|
+ raise ObjectFormatException("missing object sha")
|
|
|
check_hexsha(self._object_sha, "invalid object sha")
|
|
|
|
|
|
if self._tagger is not None:
|
|
|
check_identity(self._tagger, "invalid tagger")
|
|
|
|
|
|
self._check_has_member("_tag_time", "missing tag time")
|
|
|
+ if self._tag_time is None:
|
|
|
+ raise ObjectFormatException("missing tag time")
|
|
|
check_time(self._tag_time)
|
|
|
|
|
|
last = None
|
|
@@ -806,15 +870,23 @@ class Tag(ShaFile):
|
|
|
raise ObjectFormatException("unexpected tagger")
|
|
|
last = field
|
|
|
|
|
|
- def _serialize(self):
|
|
|
+ def _serialize(self) -> list[bytes]:
|
|
|
headers = []
|
|
|
+ if self._object_sha is None:
|
|
|
+ raise ObjectFormatException("missing object sha")
|
|
|
headers.append((_OBJECT_HEADER, self._object_sha))
|
|
|
+ if self._object_class is None:
|
|
|
+ raise ObjectFormatException("missing object class")
|
|
|
headers.append((_TYPE_HEADER, self._object_class.type_name))
|
|
|
+ if self._name is None:
|
|
|
+ raise ObjectFormatException("missing tag name")
|
|
|
headers.append((_TAG_HEADER, self._name))
|
|
|
if self._tagger:
|
|
|
if self._tag_time is None:
|
|
|
headers.append((_TAGGER_HEADER, self._tagger))
|
|
|
else:
|
|
|
+ if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
|
|
|
+ raise ObjectFormatException("missing timezone info")
|
|
|
headers.append(
|
|
|
(
|
|
|
_TAGGER_HEADER,
|
|
@@ -832,7 +904,7 @@ class Tag(ShaFile):
|
|
|
body = (self.message or b"") + (self._signature or b"")
|
|
|
return list(_format_message(headers, body))
|
|
|
|
|
|
- def _deserialize(self, chunks) -> None:
|
|
|
+ def _deserialize(self, chunks: list[bytes]) -> None:
|
|
|
"""Grab the metadata attached to the tag."""
|
|
|
self._tagger = None
|
|
|
self._tag_time = None
|
|
@@ -850,6 +922,8 @@ class Tag(ShaFile):
|
|
|
elif field == _TAG_HEADER:
|
|
|
self._name = value
|
|
|
elif field == _TAGGER_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing tagger value")
|
|
|
(
|
|
|
self._tagger,
|
|
|
self._tag_time,
|
|
@@ -873,14 +947,16 @@ class Tag(ShaFile):
|
|
|
f"Unknown field {field.decode('ascii', 'replace')}"
|
|
|
)
|
|
|
|
|
|
- def _get_object(self):
|
|
|
+ def _get_object(self) -> tuple[type[ShaFile], bytes]:
|
|
|
"""Get the object pointed to by this tag.
|
|
|
|
|
|
Returns: tuple of (object class, sha).
|
|
|
"""
|
|
|
+ if self._object_class is None or self._object_sha is None:
|
|
|
+ raise ValueError("Tag object is not properly initialized")
|
|
|
return (self._object_class, self._object_sha)
|
|
|
|
|
|
- def _set_object(self, value) -> None:
|
|
|
+ def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
|
|
|
(self._object_class, self._object_sha) = value
|
|
|
self._needs_serialization = True
|
|
|
|
|
@@ -964,14 +1040,14 @@ class Tag(ShaFile):
|
|
|
class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
|
|
|
"""Named tuple encapsulating a single tree entry."""
|
|
|
|
|
|
- def in_path(self, path: bytes):
|
|
|
+ def in_path(self, path: bytes) -> "TreeEntry":
|
|
|
"""Return a copy of this entry with the given path prepended."""
|
|
|
if not isinstance(self.path, bytes):
|
|
|
raise TypeError(f"Expected bytes for path, got {path!r}")
|
|
|
return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
|
|
|
|
|
|
|
|
|
-def parse_tree(text, strict=False):
|
|
|
+def parse_tree(text: bytes, strict: bool = False) -> Iterator[tuple[bytes, int, bytes]]:
|
|
|
"""Parse a tree text.
|
|
|
|
|
|
Args:
|
|
@@ -987,11 +1063,11 @@ def parse_tree(text, strict=False):
|
|
|
mode_end = text.index(b" ", count)
|
|
|
mode_text = text[count:mode_end]
|
|
|
if strict and mode_text.startswith(b"0"):
|
|
|
- raise ObjectFormatException(f"Invalid mode '{mode_text}'")
|
|
|
+ raise ObjectFormatException(f"Invalid mode {mode_text!r}")
|
|
|
try:
|
|
|
mode = int(mode_text, 8)
|
|
|
except ValueError as exc:
|
|
|
- raise ObjectFormatException(f"Invalid mode '{mode_text}'") from exc
|
|
|
+ raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc
|
|
|
name_end = text.index(b"\0", mode_end)
|
|
|
name = text[mode_end + 1 : name_end]
|
|
|
count = name_end + 21
|
|
@@ -1002,7 +1078,7 @@ def parse_tree(text, strict=False):
|
|
|
yield (name, mode, hexsha)
|
|
|
|
|
|
|
|
|
-def serialize_tree(items):
|
|
|
+def serialize_tree(items: Iterable[tuple[bytes, int, bytes]]) -> Iterator[bytes]:
|
|
|
"""Serialize the items in a tree to a text.
|
|
|
|
|
|
Args:
|
|
@@ -1015,7 +1091,9 @@ def serialize_tree(items):
|
|
|
)
|
|
|
|
|
|
|
|
|
-def sorted_tree_items(entries, name_order: bool):
|
|
|
+def sorted_tree_items(
|
|
|
+ entries: dict[bytes, tuple[int, bytes]], name_order: bool
|
|
|
+) -> Iterator[TreeEntry]:
|
|
|
"""Iterate over a tree entries dictionary.
|
|
|
|
|
|
Args:
|
|
@@ -1055,7 +1133,9 @@ def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
|
|
|
return entry[0]
|
|
|
|
|
|
|
|
|
-def pretty_format_tree_entry(name, mode, hexsha, encoding="utf-8") -> str:
|
|
|
+def pretty_format_tree_entry(
|
|
|
+ name: bytes, mode: int, hexsha: bytes, encoding: str = "utf-8"
|
|
|
+) -> str:
|
|
|
"""Pretty format tree entry.
|
|
|
|
|
|
Args:
|
|
@@ -1079,7 +1159,7 @@ def pretty_format_tree_entry(name, mode, hexsha, encoding="utf-8") -> str:
|
|
|
class SubmoduleEncountered(Exception):
|
|
|
"""A submodule was encountered while resolving a path."""
|
|
|
|
|
|
- def __init__(self, path, sha) -> None:
|
|
|
+ def __init__(self, path: bytes, sha: ObjectID) -> None:
|
|
|
self.path = path
|
|
|
self.sha = sha
|
|
|
|
|
@@ -1097,19 +1177,19 @@ class Tree(ShaFile):
|
|
|
self._entries: dict[bytes, tuple[int, bytes]] = {}
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, filename):
|
|
|
+ def from_path(cls, filename: Union[str, bytes]) -> "Tree":
|
|
|
tree = ShaFile.from_path(filename)
|
|
|
if not isinstance(tree, cls):
|
|
|
raise NotTreeError(filename)
|
|
|
return tree
|
|
|
|
|
|
- def __contains__(self, name) -> bool:
|
|
|
+ def __contains__(self, name: bytes) -> bool:
|
|
|
return name in self._entries
|
|
|
|
|
|
- def __getitem__(self, name):
|
|
|
+ def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
|
|
|
return self._entries[name]
|
|
|
|
|
|
- def __setitem__(self, name, value) -> None:
|
|
|
+ def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
|
|
|
"""Set a tree entry by name.
|
|
|
|
|
|
Args:
|
|
@@ -1122,17 +1202,17 @@ class Tree(ShaFile):
|
|
|
self._entries[name] = (mode, hexsha)
|
|
|
self._needs_serialization = True
|
|
|
|
|
|
- def __delitem__(self, name) -> None:
|
|
|
+ def __delitem__(self, name: bytes) -> None:
|
|
|
del self._entries[name]
|
|
|
self._needs_serialization = True
|
|
|
|
|
|
def __len__(self) -> int:
|
|
|
return len(self._entries)
|
|
|
|
|
|
- def __iter__(self):
|
|
|
+ def __iter__(self) -> Iterator[bytes]:
|
|
|
return iter(self._entries)
|
|
|
|
|
|
- def add(self, name, mode, hexsha) -> None:
|
|
|
+ def add(self, name: bytes, mode: int, hexsha: bytes) -> None:
|
|
|
"""Add an entry to the tree.
|
|
|
|
|
|
Args:
|
|
@@ -1144,7 +1224,7 @@ class Tree(ShaFile):
|
|
|
self._entries[name] = mode, hexsha
|
|
|
self._needs_serialization = True
|
|
|
|
|
|
- def iteritems(self, name_order=False) -> Iterator[TreeEntry]:
|
|
|
+ def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
|
|
|
"""Iterate over entries.
|
|
|
|
|
|
Args:
|
|
@@ -1161,7 +1241,7 @@ class Tree(ShaFile):
|
|
|
"""
|
|
|
return list(self.iteritems())
|
|
|
|
|
|
- def _deserialize(self, chunks) -> None:
|
|
|
+ def _deserialize(self, chunks: list[bytes]) -> None:
|
|
|
"""Grab the entries in the tree."""
|
|
|
try:
|
|
|
parsed_entries = parse_tree(b"".join(chunks))
|
|
@@ -1191,7 +1271,7 @@ class Tree(ShaFile):
|
|
|
stat.S_IFREG | 0o664,
|
|
|
)
|
|
|
for name, mode, sha in parse_tree(b"".join(self._chunked_text), True):
|
|
|
- check_hexsha(sha, f"invalid sha {sha}")
|
|
|
+ check_hexsha(sha, f"invalid sha {sha!r}")
|
|
|
if b"/" in name or name in (b"", b".", b"..", b".git"):
|
|
|
raise ObjectFormatException(
|
|
|
"invalid name {}".format(name.decode("utf-8", "replace"))
|
|
@@ -1205,10 +1285,10 @@ class Tree(ShaFile):
|
|
|
if key_entry(last) > key_entry(entry):
|
|
|
raise ObjectFormatException("entries not sorted")
|
|
|
if name == last[0]:
|
|
|
- raise ObjectFormatException(f"duplicate entry {name}")
|
|
|
+ raise ObjectFormatException(f"duplicate entry {name!r}")
|
|
|
last = entry
|
|
|
|
|
|
- def _serialize(self):
|
|
|
+ def _serialize(self) -> list[bytes]:
|
|
|
return list(serialize_tree(self.iteritems()))
|
|
|
|
|
|
def as_pretty_string(self) -> str:
|
|
@@ -1217,7 +1297,9 @@ class Tree(ShaFile):
|
|
|
text.append(pretty_format_tree_entry(name, mode, hexsha))
|
|
|
return "".join(text)
|
|
|
|
|
|
- def lookup_path(self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes):
|
|
|
+ def lookup_path(
|
|
|
+ self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
|
|
|
+ ) -> tuple[int, ObjectID]:
|
|
|
"""Look up an object in a Git tree.
|
|
|
|
|
|
Args:
|
|
@@ -1227,7 +1309,7 @@ class Tree(ShaFile):
|
|
|
"""
|
|
|
parts = path.split(b"/")
|
|
|
sha = self.id
|
|
|
- mode = None
|
|
|
+ mode: Optional[int] = None
|
|
|
for i, p in enumerate(parts):
|
|
|
if not p:
|
|
|
continue
|
|
@@ -1237,10 +1319,12 @@ class Tree(ShaFile):
|
|
|
if not isinstance(obj, Tree):
|
|
|
raise NotTreeError(sha)
|
|
|
mode, sha = obj[p]
|
|
|
+ if mode is None:
|
|
|
+ raise ValueError("No valid path found")
|
|
|
return mode, sha
|
|
|
|
|
|
|
|
|
-def parse_timezone(text):
|
|
|
+def parse_timezone(text: bytes) -> tuple[int, bool]:
|
|
|
"""Parse a timezone text fragment (e.g. '+0100').
|
|
|
|
|
|
Args:
|
|
@@ -1269,7 +1353,7 @@ def parse_timezone(text):
|
|
|
)
|
|
|
|
|
|
|
|
|
-def format_timezone(offset, unnecessary_negative_timezone=False):
|
|
|
+def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
|
|
|
"""Format a timezone for Git serialization.
|
|
|
|
|
|
Args:
|
|
@@ -1287,7 +1371,9 @@ def format_timezone(offset, unnecessary_negative_timezone=False):
|
|
|
return ("%c%02d%02d" % (sign, offset / 3600, (offset / 60) % 60)).encode("ascii") # noqa: UP031
|
|
|
|
|
|
|
|
|
-def parse_time_entry(value):
|
|
|
+def parse_time_entry(
|
|
|
+ value: bytes,
|
|
|
+) -> tuple[bytes, Optional[int], tuple[Optional[int], bool]]:
|
|
|
"""Parse event.
|
|
|
|
|
|
Args:
|
|
@@ -1312,7 +1398,9 @@ def parse_time_entry(value):
|
|
|
return person, time, (timezone, timezone_neg_utc)
|
|
|
|
|
|
|
|
|
-def format_time_entry(person, time, timezone_info):
|
|
|
+def format_time_entry(
|
|
|
+ person: bytes, time: int, timezone_info: tuple[int, bool]
|
|
|
+) -> bytes:
|
|
|
"""Format an event."""
|
|
|
(timezone, timezone_neg_utc) = timezone_info
|
|
|
return b" ".join(
|
|
@@ -1321,7 +1409,19 @@ def format_time_entry(person, time, timezone_info):
|
|
|
|
|
|
|
|
|
@replace_me(since="0.21.0", remove_in="0.24.0")
|
|
|
-def parse_commit(chunks):
|
|
|
+def parse_commit(
|
|
|
+ chunks: Iterable[bytes],
|
|
|
+) -> tuple[
|
|
|
+ Optional[bytes],
|
|
|
+ list[bytes],
|
|
|
+ tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
|
|
|
+ tuple[Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]],
|
|
|
+ Optional[bytes],
|
|
|
+ list[Tag],
|
|
|
+ Optional[bytes],
|
|
|
+ Optional[bytes],
|
|
|
+ list[tuple[bytes, bytes]],
|
|
|
+]:
|
|
|
"""Parse a commit object from chunks.
|
|
|
|
|
|
Args:
|
|
@@ -1332,8 +1432,12 @@ def parse_commit(chunks):
|
|
|
parents = []
|
|
|
extra = []
|
|
|
tree = None
|
|
|
- author_info = (None, None, (None, None))
|
|
|
- commit_info = (None, None, (None, None))
|
|
|
+ author_info: tuple[
|
|
|
+ Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
|
|
|
+ ] = (None, None, (None, None))
|
|
|
+ commit_info: tuple[
|
|
|
+ Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
|
|
|
+ ] = (None, None, (None, None))
|
|
|
encoding = None
|
|
|
mergetag = []
|
|
|
message = None
|
|
@@ -1344,20 +1448,32 @@ def parse_commit(chunks):
|
|
|
if field == _TREE_HEADER:
|
|
|
tree = value
|
|
|
elif field == _PARENT_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing parent value")
|
|
|
parents.append(value)
|
|
|
elif field == _AUTHOR_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing author value")
|
|
|
author_info = parse_time_entry(value)
|
|
|
elif field == _COMMITTER_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing committer value")
|
|
|
commit_info = parse_time_entry(value)
|
|
|
elif field == _ENCODING_HEADER:
|
|
|
encoding = value
|
|
|
elif field == _MERGETAG_HEADER:
|
|
|
- mergetag.append(Tag.from_string(value + b"\n"))
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing mergetag value")
|
|
|
+ tag = Tag.from_string(value + b"\n")
|
|
|
+ assert isinstance(tag, Tag)
|
|
|
+ mergetag.append(tag)
|
|
|
elif field == _GPGSIG_HEADER:
|
|
|
gpgsig = value
|
|
|
elif field is None:
|
|
|
message = value
|
|
|
else:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException(f"missing value for field {field!r}")
|
|
|
extra.append((field, value))
|
|
|
return (
|
|
|
tree,
|
|
@@ -1407,18 +1523,22 @@ class Commit(ShaFile):
|
|
|
self._commit_timezone_neg_utc: Optional[bool] = False
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, path):
|
|
|
+ def from_path(cls, path: Union[str, bytes]) -> "Commit":
|
|
|
commit = ShaFile.from_path(path)
|
|
|
if not isinstance(commit, cls):
|
|
|
raise NotCommitError(path)
|
|
|
return commit
|
|
|
|
|
|
- def _deserialize(self, chunks) -> None:
|
|
|
+ def _deserialize(self, chunks: list[bytes]) -> None:
|
|
|
self._parents = []
|
|
|
self._extra = []
|
|
|
self._tree = None
|
|
|
- author_info = (None, None, (None, None))
|
|
|
- commit_info = (None, None, (None, None))
|
|
|
+ author_info: tuple[
|
|
|
+ Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
|
|
|
+ ] = (None, None, (None, None))
|
|
|
+ commit_info: tuple[
|
|
|
+ Optional[bytes], Optional[int], tuple[Optional[int], Optional[bool]]
|
|
|
+ ] = (None, None, (None, None))
|
|
|
self._encoding = None
|
|
|
self._mergetag = []
|
|
|
self._message = None
|
|
@@ -1432,14 +1552,20 @@ class Commit(ShaFile):
|
|
|
assert value is not None
|
|
|
self._parents.append(value)
|
|
|
elif field == _AUTHOR_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing author value")
|
|
|
author_info = parse_time_entry(value)
|
|
|
elif field == _COMMITTER_HEADER:
|
|
|
+ if value is None:
|
|
|
+ raise ObjectFormatException("missing committer value")
|
|
|
commit_info = parse_time_entry(value)
|
|
|
elif field == _ENCODING_HEADER:
|
|
|
self._encoding = value
|
|
|
elif field == _MERGETAG_HEADER:
|
|
|
assert value is not None
|
|
|
- self._mergetag.append(Tag.from_string(value + b"\n"))
|
|
|
+ tag = Tag.from_string(value + b"\n")
|
|
|
+ assert isinstance(tag, Tag)
|
|
|
+ self._mergetag.append(tag)
|
|
|
elif field == _GPGSIG_HEADER:
|
|
|
self._gpgsig = value
|
|
|
elif field is None:
|
|
@@ -1474,11 +1600,16 @@ class Commit(ShaFile):
|
|
|
|
|
|
for parent in self._parents:
|
|
|
check_hexsha(parent, "invalid parent sha")
|
|
|
+ assert self._tree is not None # checked by _check_has_member above
|
|
|
check_hexsha(self._tree, "invalid tree sha")
|
|
|
|
|
|
+ assert self._author is not None # checked by _check_has_member above
|
|
|
+ assert self._committer is not None # checked by _check_has_member above
|
|
|
check_identity(self._author, "invalid author")
|
|
|
check_identity(self._committer, "invalid committer")
|
|
|
|
|
|
+ assert self._author_time is not None # checked by _check_has_member above
|
|
|
+ assert self._commit_time is not None # checked by _check_has_member above
|
|
|
check_time(self._author_time)
|
|
|
check_time(self._commit_time)
|
|
|
|
|
@@ -1564,12 +1695,17 @@ class Commit(ShaFile):
|
|
|
return
|
|
|
raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
|
|
|
|
|
|
- def _serialize(self):
|
|
|
+ def _serialize(self) -> list[bytes]:
|
|
|
headers = []
|
|
|
+ assert self._tree is not None
|
|
|
tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
|
|
|
headers.append((_TREE_HEADER, tree_bytes))
|
|
|
for p in self._parents:
|
|
|
headers.append((_PARENT_HEADER, p))
|
|
|
+ assert self._author is not None
|
|
|
+ assert self._author_time is not None
|
|
|
+ assert self._author_timezone is not None
|
|
|
+ assert self._author_timezone_neg_utc is not None
|
|
|
headers.append(
|
|
|
(
|
|
|
_AUTHOR_HEADER,
|
|
@@ -1580,6 +1716,10 @@ class Commit(ShaFile):
|
|
|
),
|
|
|
)
|
|
|
)
|
|
|
+ assert self._committer is not None
|
|
|
+ assert self._commit_time is not None
|
|
|
+ assert self._commit_timezone is not None
|
|
|
+ assert self._commit_timezone_neg_utc is not None
|
|
|
headers.append(
|
|
|
(
|
|
|
_COMMITTER_HEADER,
|
|
@@ -1594,18 +1734,20 @@ class Commit(ShaFile):
|
|
|
headers.append((_ENCODING_HEADER, self.encoding))
|
|
|
for mergetag in self.mergetag:
|
|
|
headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
|
|
|
- headers.extend(self._extra)
|
|
|
+ headers.extend(
|
|
|
+ (field, value) for field, value in self._extra if value is not None
|
|
|
+ )
|
|
|
if self.gpgsig:
|
|
|
headers.append((_GPGSIG_HEADER, self.gpgsig))
|
|
|
return list(_format_message(headers, self._message))
|
|
|
|
|
|
tree = serializable_property("tree", "Tree that is the state of this commit")
|
|
|
|
|
|
- def _get_parents(self):
|
|
|
+ def _get_parents(self) -> list[bytes]:
|
|
|
"""Return a list of parents of this commit."""
|
|
|
return self._parents
|
|
|
|
|
|
- def _set_parents(self, value) -> None:
|
|
|
+ def _set_parents(self, value: list[bytes]) -> None:
|
|
|
"""Set a list of parents of this commit."""
|
|
|
self._needs_serialization = True
|
|
|
self._parents = value
|
|
@@ -1617,7 +1759,7 @@ class Commit(ShaFile):
|
|
|
)
|
|
|
|
|
|
@replace_me(since="0.21.0", remove_in="0.24.0")
|
|
|
- def _get_extra(self):
|
|
|
+ def _get_extra(self) -> list[tuple[bytes, Optional[bytes]]]:
|
|
|
"""Return extra settings of this commit."""
|
|
|
return self._extra
|
|
|
|