|
|
@@ -116,18 +116,19 @@ if TYPE_CHECKING:
|
|
|
ZERO_SHA = b"0" * 40 # SHA1 - kept for backward compatibility
|
|
|
|
|
|
|
|
|
-def zero_sha_for(hash_algorithm=None):
|
|
|
+def zero_sha_for(object_format=None) -> bytes:
|
|
|
"""Get the zero SHA for a given hash algorithm.
|
|
|
|
|
|
Args:
|
|
|
- hash_algorithm: HashAlgorithm instance. If None, returns SHA1 zero.
|
|
|
+ object_format: HashAlgorithm instance. If None, returns SHA1 zero.
|
|
|
|
|
|
Returns:
|
|
|
Zero SHA as hex bytes (40 chars for SHA1, 64 for SHA256)
|
|
|
"""
|
|
|
- if hash_algorithm is None:
|
|
|
+ if object_format is None:
|
|
|
return ZERO_SHA
|
|
|
- return hash_algorithm.zero_oid
|
|
|
+ return object_format.zero_oid
|
|
|
+
|
|
|
|
|
|
|
|
|
# Header fields for commits
|
|
|
@@ -203,7 +204,7 @@ def hex_to_sha(hex: ObjectID | str) -> RawObjectID:
|
|
|
"""Takes a hex sha and returns a binary sha."""
|
|
|
# Support both SHA1 (40 chars) and SHA256 (64 chars)
|
|
|
if len(hex) not in (40, 64):
|
|
|
- raise ValueError(f"Incorrect length of hexsha: {hex}")
|
|
|
+ raise ValueError(f"Incorrect length of hexsha: {hex!r}")
|
|
|
try:
|
|
|
return RawObjectID(binascii.unhexlify(hex))
|
|
|
except TypeError as exc:
|
|
|
@@ -465,7 +466,7 @@ else:
|
|
|
class ShaFile:
|
|
|
"""A git SHA file."""
|
|
|
|
|
|
- __slots__ = ("_chunked_text", "_needs_serialization", "_sha")
|
|
|
+ __slots__ = ("_chunked_text", "_needs_serialization", "_sha", "object_format")
|
|
|
|
|
|
_needs_serialization: bool
|
|
|
type_name: bytes
|
|
|
@@ -473,6 +474,13 @@ class ShaFile:
|
|
|
_chunked_text: list[bytes] | None
|
|
|
_sha: "FixedSha | None | HASH"
|
|
|
|
|
|
+ def __init__(self) -> None:
|
|
|
+ """Initialize a ShaFile."""
|
|
|
+ self._sha = None
|
|
|
+ self._chunked_text = None
|
|
|
+ self._needs_serialization = True
|
|
|
+ self.object_format = None
|
|
|
+
|
|
|
@staticmethod
|
|
|
def _parse_legacy_object_header(
|
|
|
magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
|
|
|
@@ -567,15 +575,18 @@ class ShaFile:
|
|
|
self.set_raw_chunks([text], sha)
|
|
|
|
|
|
def set_raw_chunks(
|
|
|
- self, chunks: list[bytes], sha: ObjectID | RawObjectID | None = None
|
|
|
+ self, chunks: list[bytes], sha: ObjectID | RawObjectID | None = None, *, object_format=None
|
|
|
) -> None:
|
|
|
"""Set the contents of this object from a list of chunks."""
|
|
|
self._chunked_text = chunks
|
|
|
- # Set SHA before deserialization so Tree can detect hash algorithm
|
|
|
+ # Set hash algorithm if provided
|
|
|
+ if object_format is not None:
|
|
|
+ self.object_format = object_format
|
|
|
+ # Set SHA before deserialization so Tree can use hash algorithm
|
|
|
if sha is None:
|
|
|
self._sha = None
|
|
|
else:
|
|
|
- self._sha = FixedSha(sha) # type: ignore
|
|
|
+ self._sha = FixedSha(sha)
|
|
|
self._deserialize(chunks)
|
|
|
self._needs_serialization = False
|
|
|
|
|
|
@@ -610,25 +621,25 @@ class ShaFile:
|
|
|
return (b0 & 0x8F) == 0x08 and (word % 31) == 0
|
|
|
|
|
|
@classmethod
|
|
|
- def _parse_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile") -> "ShaFile":
|
|
|
+ def _parse_file(
|
|
|
+ cls, f: BufferedIOBase | IO[bytes] | "_GitFile", *, object_format=None
|
|
|
+ ) -> "ShaFile":
|
|
|
map = f.read()
|
|
|
if not map:
|
|
|
raise EmptyFileException("Corrupted empty file detected")
|
|
|
|
|
|
if cls._is_legacy_object(map):
|
|
|
obj = cls._parse_legacy_object_header(map, f)
|
|
|
+ if object_format is not None:
|
|
|
+ obj.object_format = object_format
|
|
|
obj._parse_legacy_object(map)
|
|
|
else:
|
|
|
obj = cls._parse_object_header(map, f)
|
|
|
+ if object_format is not None:
|
|
|
+ obj.object_format = object_format
|
|
|
obj._parse_object(map)
|
|
|
return obj
|
|
|
|
|
|
- def __init__(self) -> None:
|
|
|
- """Don't call this directly."""
|
|
|
- self._sha = None
|
|
|
- self._chunked_text = []
|
|
|
- self._needs_serialization = True
|
|
|
-
|
|
|
def _deserialize(self, chunks: list[bytes]) -> None:
|
|
|
raise NotImplementedError(self._deserialize)
|
|
|
|
|
|
@@ -636,17 +647,33 @@ class ShaFile:
|
|
|
raise NotImplementedError(self._serialize)
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, path: str | bytes, sha: ObjectID | None = None) -> "ShaFile":
|
|
|
+ def from_path(
|
|
|
+ cls, path: str | bytes, sha: ObjectID | None = None, *, object_format=None
|
|
|
+ ) -> "ShaFile":
|
|
|
"""Open a SHA file from disk."""
|
|
|
with GitFile(path, "rb") as f:
|
|
|
- return cls.from_file(f, sha)
|
|
|
+ return cls.from_file(f, sha, object_format=object_format)
|
|
|
|
|
|
@classmethod
|
|
|
- def from_file(cls, f: BufferedIOBase | IO[bytes] | "_GitFile", sha: ObjectID | None = None) -> "ShaFile":
|
|
|
+ def from_file(
|
|
|
+ cls,
|
|
|
+ f: BufferedIOBase | IO[bytes] | "_GitFile",
|
|
|
+ sha: ObjectID | None = None,
|
|
|
+ *,
|
|
|
+ object_format=None,
|
|
|
+ ) -> "ShaFile":
|
|
|
"""Get the contents of a SHA file on disk."""
|
|
|
try:
|
|
|
- obj = cls._parse_file(f)
|
|
|
- # Set SHA after parsing but before any further processing
|
|
|
+ # Validate SHA length matches hash algorithm if both provided
|
|
|
+ if sha is not None and object_format is not None:
|
|
|
+ expected_len = object_format.hex_length
|
|
|
+ if len(sha) != expected_len:
|
|
|
+ raise ValueError(
|
|
|
+ f"SHA length {len(sha)} doesn't match hash algorithm "
|
|
|
+ f"{object_format.name} (expected {expected_len})"
|
|
|
+ )
|
|
|
+
|
|
|
+ obj = cls._parse_file(f, object_format=object_format)
|
|
|
if sha is not None:
|
|
|
obj._sha = FixedSha(sha)
|
|
|
else:
|
|
|
@@ -657,7 +684,11 @@ class ShaFile:
|
|
|
|
|
|
@staticmethod
|
|
|
def from_raw_string(
|
|
|
- type_num: int, string: bytes, sha: ObjectID | RawObjectID | None = None
|
|
|
+ type_num: int,
|
|
|
+ string: bytes,
|
|
|
+ sha: ObjectID | RawObjectID | None = None,
|
|
|
+ *,
|
|
|
+ object_format=None,
|
|
|
) -> "ShaFile":
|
|
|
"""Creates an object of the indicated type from the raw string given.
|
|
|
|
|
|
@@ -665,11 +696,14 @@ class ShaFile:
|
|
|
type_num: The numeric type of the object.
|
|
|
string: The raw uncompressed contents.
|
|
|
sha: Optional known sha for the object
|
|
|
+ object_format: Optional hash algorithm for the object
|
|
|
"""
|
|
|
cls = object_class(type_num)
|
|
|
if cls is None:
|
|
|
raise AssertionError(f"unsupported class type num: {type_num}")
|
|
|
obj = cls()
|
|
|
+ if object_format is not None:
|
|
|
+ obj.object_format = object_format
|
|
|
obj.set_raw_string(string, sha)
|
|
|
return obj
|
|
|
|
|
|
@@ -740,15 +774,15 @@ class ShaFile:
|
|
|
"""Returns the length of the raw string of this object."""
|
|
|
return sum(map(len, self.as_raw_chunks()))
|
|
|
|
|
|
- def sha(self, hash_algorithm=None) -> "FixedSha | HASH":
|
|
|
+ def sha(self, object_format=None) -> "FixedSha | HASH":
|
|
|
"""The SHA object that is the name of this object.
|
|
|
|
|
|
Args:
|
|
|
- hash_algorithm: Optional HashAlgorithm to use. Defaults to SHA1.
|
|
|
+ object_format: Optional HashAlgorithm to use. Defaults to SHA1.
|
|
|
"""
|
|
|
# If using a different hash algorithm, always recalculate
|
|
|
- if hash_algorithm is not None:
|
|
|
- new_sha = hash_algorithm.new_hash()
|
|
|
+ if object_format is not None:
|
|
|
+ new_sha = object_format.new_hash()
|
|
|
new_sha.update(self._header())
|
|
|
for chunk in self.as_raw_chunks():
|
|
|
new_sha.update(chunk)
|
|
|
@@ -775,16 +809,16 @@ class ShaFile:
|
|
|
def id(self) -> ObjectID:
|
|
|
"""The hex SHA1 of this object.
|
|
|
|
|
|
- For SHA256 repositories, use get_id(hash_algorithm) instead.
|
|
|
+ For SHA256 repositories, use get_id(object_format) instead.
|
|
|
This property always returns SHA1 for backward compatibility.
|
|
|
"""
|
|
|
return ObjectID(self.sha().hexdigest().encode("ascii"))
|
|
|
|
|
|
- def get_id(self, hash_algorithm=None):
|
|
|
+ def get_id(self, object_format=None) -> bytes:
|
|
|
"""Get the hex SHA of this object using the specified hash algorithm.
|
|
|
|
|
|
Args:
|
|
|
- hash_algorithm: Optional HashAlgorithm to use. Defaults to SHA1.
|
|
|
+ object_format: Optional HashAlgorithm to use. Defaults to SHA1.
|
|
|
|
|
|
Example:
|
|
|
>>> blob = Blob()
|
|
|
@@ -793,11 +827,11 @@ class ShaFile:
|
|
|
b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
|
|
|
>>> blob.get_id() # Same as .id
|
|
|
b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
|
|
|
- >>> from dulwich.hash import SHA256
|
|
|
+ >>> from dulwich.object_format import SHA256
|
|
|
>>> blob.get_id(SHA256) # Get SHA256 hash
|
|
|
b'03ba204e2f2e707...' # 64-character SHA256
|
|
|
"""
|
|
|
- return self.sha(hash_algorithm).hexdigest().encode("ascii")
|
|
|
+ return self.sha(object_format).hexdigest().encode("ascii")
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
"""Return string representation of this object."""
|
|
|
@@ -869,11 +903,12 @@ class Blob(ShaFile):
|
|
|
)
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, path: str | bytes) -> "Blob":
|
|
|
+ def from_path(cls, path: str | bytes, sha: ObjectID | None = None) -> "Blob":
|
|
|
"""Read a blob from a file on disk.
|
|
|
|
|
|
Args:
|
|
|
path: Path to the blob file
|
|
|
+ sha: Optional known SHA for the object
|
|
|
|
|
|
Returns:
|
|
|
A Blob object
|
|
|
@@ -881,7 +916,7 @@ class Blob(ShaFile):
|
|
|
Raises:
|
|
|
NotBlobError: If the file is not a blob
|
|
|
"""
|
|
|
- blob = ShaFile.from_path(path)
|
|
|
+ blob = ShaFile.from_path(path, sha)
|
|
|
if not isinstance(blob, cls):
|
|
|
raise NotBlobError(_path_to_bytes(path))
|
|
|
return blob
|
|
|
@@ -1030,11 +1065,12 @@ class Tag(ShaFile):
|
|
|
self._signature: bytes | None = None
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, filename: str | bytes) -> "Tag":
|
|
|
+ def from_path(cls, filename: str | bytes, sha: ObjectID | None = None) -> "Tag":
|
|
|
"""Read a tag from a file on disk.
|
|
|
|
|
|
Args:
|
|
|
filename: Path to the tag file
|
|
|
+ sha: Optional known SHA for the object
|
|
|
|
|
|
Returns:
|
|
|
A Tag object
|
|
|
@@ -1042,7 +1078,7 @@ class Tag(ShaFile):
|
|
|
Raises:
|
|
|
NotTagError: If the file is not a tag
|
|
|
"""
|
|
|
- tag = ShaFile.from_path(filename)
|
|
|
+ tag = ShaFile.from_path(filename, sha)
|
|
|
if not isinstance(tag, cls):
|
|
|
raise NotTagError(_path_to_bytes(filename))
|
|
|
return tag
|
|
|
@@ -1310,21 +1346,21 @@ class TreeEntry(NamedTuple):
|
|
|
|
|
|
|
|
|
def parse_tree(
|
|
|
- text: bytes, strict: bool = False, hash_algorithm=None
|
|
|
+ text: bytes, strict: bool = False, object_format=None
|
|
|
) -> Iterator[tuple[bytes, int, ObjectID]]:
|
|
|
"""Parse a tree text.
|
|
|
|
|
|
Args:
|
|
|
text: Serialized text to parse
|
|
|
strict: Whether to be strict about format
|
|
|
- hash_algorithm: Hash algorithm object (SHA1 or SHA256) - if None, auto-detect
|
|
|
+ object_format: Hash algorithm object (SHA1 or SHA256) - if None, auto-detect
|
|
|
Returns: iterator of tuples of (name, mode, sha)
|
|
|
|
|
|
Raises:
|
|
|
ObjectFormatException: if the object was malformed in some way
|
|
|
"""
|
|
|
- if hash_algorithm is not None:
|
|
|
- sha_len = hash_algorithm.oid_length
|
|
|
+ if object_format is not None:
|
|
|
+ sha_len = object_format.oid_length
|
|
|
return _parse_tree_with_sha_len(text, strict, sha_len)
|
|
|
|
|
|
# Try both hash lengths and use the one that works
|
|
|
@@ -1336,7 +1372,9 @@ def parse_tree(
|
|
|
return _parse_tree_with_sha_len(text, strict, 32)
|
|
|
|
|
|
|
|
|
-def _parse_tree_with_sha_len(text, strict, sha_len):
|
|
|
+def _parse_tree_with_sha_len(
|
|
|
+ text: bytes, strict: bool, sha_len: int
|
|
|
+) -> Iterator[tuple[bytes, int, bytes]]:
|
|
|
"""Helper function to parse tree with a specific hash length."""
|
|
|
count = 0
|
|
|
length = len(text)
|
|
|
@@ -1474,27 +1512,6 @@ class Tree(ShaFile):
|
|
|
super().__init__()
|
|
|
self._entries: dict[bytes, tuple[int, ObjectID]] = {}
|
|
|
|
|
|
- def _get_hash_algorithm(self):
|
|
|
- """Get the hash algorithm based on the object's SHA."""
|
|
|
- if not hasattr(self, "_sha") or self._sha is None:
|
|
|
- return None
|
|
|
-
|
|
|
- # Get the raw SHA bytes
|
|
|
- sha = self._sha.digest() if hasattr(self._sha, "digest") else self._sha
|
|
|
- if not isinstance(sha, bytes):
|
|
|
- return None
|
|
|
-
|
|
|
- # Import hash modules lazily to avoid circular imports
|
|
|
- if len(sha) == 32:
|
|
|
- from .hash import SHA256
|
|
|
-
|
|
|
- return SHA256
|
|
|
- elif len(sha) == 20:
|
|
|
- from .hash import SHA1
|
|
|
-
|
|
|
- return SHA1
|
|
|
- return None
|
|
|
-
|
|
|
@classmethod
|
|
|
def from_path(cls, filename: str | bytes, sha: ObjectID | None = None) -> "Tree":
|
|
|
"""Read a tree from a file on disk.
|
|
|
@@ -1581,7 +1598,7 @@ class Tree(ShaFile):
|
|
|
"""Grab the entries in the tree."""
|
|
|
try:
|
|
|
parsed_entries = parse_tree(
|
|
|
- b"".join(chunks), hash_algorithm=self._get_hash_algorithm()
|
|
|
+ b"".join(chunks), object_format=self.object_format
|
|
|
)
|
|
|
except ValueError as exc:
|
|
|
raise ObjectFormatException(exc) from exc
|
|
|
@@ -1611,9 +1628,9 @@ class Tree(ShaFile):
|
|
|
for name, mode, sha in parse_tree(
|
|
|
b"".join(self._chunked_text),
|
|
|
strict=True,
|
|
|
- hash_algorithm=self._get_hash_algorithm(),
|
|
|
+ object_format=self.object_format,
|
|
|
):
|
|
|
- check_hexsha(sha, f"invalid sha {sha}")
|
|
|
+ check_hexsha(sha, f"invalid sha {sha!r}")
|
|
|
if b"/" in name or name in (b"", b".", b"..", b".git"):
|
|
|
raise ObjectFormatException(
|
|
|
"invalid name {}".format(name.decode("utf-8", "replace"))
|
|
|
@@ -1884,11 +1901,12 @@ class Commit(ShaFile):
|
|
|
self._commit_timezone_neg_utc: bool | None = False
|
|
|
|
|
|
@classmethod
|
|
|
- def from_path(cls, path: str | bytes) -> "Commit":
|
|
|
+ def from_path(cls, path: str | bytes, sha: ObjectID | None = None) -> "Commit":
|
|
|
"""Read a commit from a file on disk.
|
|
|
|
|
|
Args:
|
|
|
path: Path to the commit file
|
|
|
+ sha: Optional known SHA for the object
|
|
|
|
|
|
Returns:
|
|
|
A Commit object
|
|
|
@@ -1896,7 +1914,7 @@ class Commit(ShaFile):
|
|
|
Raises:
|
|
|
NotCommitError: If the file is not a commit
|
|
|
"""
|
|
|
- commit = ShaFile.from_path(path)
|
|
|
+ commit = ShaFile.from_path(path, sha)
|
|
|
if not isinstance(commit, cls):
|
|
|
raise NotCommitError(_path_to_bytes(path))
|
|
|
return commit
|