| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499 |
- # objects.py -- Access to base git objects
- # Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
- # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Access to base git objects."""
# Names exported by ``from <this module> import *``. Entries are grouped as
# constants, then classes, then functions, and sorted within each group.
__all__ = [
    "BEGIN_PGP_SIGNATURE",
    "BEGIN_SSH_SIGNATURE",
    "MAX_TIME",
    "OBJECT_CLASSES",
    "SIGNATURE_PGP",
    "SIGNATURE_SSH",
    "S_IFGITLINK",
    "S_ISGITLINK",
    "ZERO_SHA",
    "Blob",
    "Commit",
    "EmptyFileException",
    "FixedSha",
    "ObjectID",
    "RawObjectID",
    "ShaFile",
    "SubmoduleEncountered",
    "Tag",
    "Tree",
    "TreeEntry",
    "check_hexsha",
    "check_identity",
    "check_time",
    "filename_to_hex",
    "format_time_entry",
    "format_timezone",
    "git_line",
    "hex_to_filename",
    "hex_to_sha",
    "is_blob",
    "is_commit",
    "is_tag",
    "is_tree",
    "key_entry",
    "key_entry_name_order",
    "object_class",
    "object_header",
    "parse_commit",
    "parse_commit_broken",
    "parse_tree",
    "pretty_format_tree_entry",
    "serializable_property",
    "serialize_tree",
    "sha_to_hex",
    "sorted_tree_items",
    "valid_hexsha",
]
- import binascii
- import os
- import posixpath
- import re
- import stat
- import sys
- import zlib
- from collections.abc import Callable, Iterable, Iterator, Sequence
- from hashlib import sha1
- from io import BufferedIOBase, BytesIO
- from typing import (
- IO,
- TYPE_CHECKING,
- NamedTuple,
- TypeVar,
- )
- if sys.version_info >= (3, 11):
- from typing import Self
- else:
- from typing_extensions import Self
- from typing import NewType, TypeGuard
- from . import replace_me
- from .errors import (
- ChecksumMismatch,
- FileFormatException,
- NotBlobError,
- NotCommitError,
- NotTagError,
- NotTreeError,
- ObjectFormatException,
- )
- from .file import GitFile
- from .object_format import DEFAULT_OBJECT_FORMAT, ObjectFormat
- if TYPE_CHECKING:
- from _hashlib import HASH
- from .file import _GitFile
# Zero SHA constants for backward compatibility - now defined below as ObjectID

# Header fields for commits
_TREE_HEADER = b"tree"
_PARENT_HEADER = b"parent"
_AUTHOR_HEADER = b"author"
_COMMITTER_HEADER = b"committer"
_ENCODING_HEADER = b"encoding"
_MERGETAG_HEADER = b"mergetag"
_GPGSIG_HEADER = b"gpgsig"

# Header fields for objects
_OBJECT_HEADER = b"object"
_TYPE_HEADER = b"type"
_TAG_HEADER = b"tag"
_TAGGER_HEADER = b"tagger"

# File-type bits that S_ISGITLINK() compares a mode against to detect a
# submodule entry.
S_IFGITLINK = 0o160000

# Intentionally flexible regex to support various types of brokenness
# in commit/tag author/committer/tagger lines.
# Groups: ``person`` (everything before the last two fields), ``time``
# (optionally negative integer) and ``timezone`` (digits preceded by up to
# two sign characters).
_TIME_ENTRY_RE = re.compile(
    b"^(?P<person>.*) (?P<time>-?[0-9]+) (?P<timezone>[+-]{0,2}[0-9]+)$"
)

MAX_TIME = 9223372036854775807  # (2**63) - 1 - signed long int max

# First line of an armored PGP / SSH signature block.
BEGIN_PGP_SIGNATURE = b"-----BEGIN PGP SIGNATURE-----"
BEGIN_SSH_SIGNATURE = b"-----BEGIN SSH SIGNATURE-----"

# Signature type constants
SIGNATURE_PGP = b"pgp"
SIGNATURE_SSH = b"ssh"

# Hex SHA type (hex-encoded object id, as bytes)
ObjectID = NewType("ObjectID", bytes)

# Raw SHA type (binary object id, as bytes)
RawObjectID = NewType("RawObjectID", bytes)

# Zero SHA constant: forty ASCII "0" characters
ZERO_SHA: ObjectID = ObjectID(b"0" * 40)
class EmptyFileException(FileFormatException):
    """An unexpectedly empty file was encountered.

    Raised by ``ShaFile._parse_file`` when reading the file yields no data.
    """
def S_ISGITLINK(m: int) -> bool:
    """Tell whether a file mode denotes a submodule (gitlink) entry.

    Args:
      m: Mode to check

    Returns: ``True`` when the file-type bits of ``m`` equal ``S_IFGITLINK``
    """
    file_type_bits = stat.S_IFMT(m)
    return file_type_bits == S_IFGITLINK
def _decompress(string: bytes) -> bytes:
    """Inflate a zlib-compressed byte string and return the result."""
    inflater = zlib.decompressobj()
    # flush() hands back whatever the decompressor is still buffering.
    return inflater.decompress(string) + inflater.flush()
def sha_to_hex(sha: RawObjectID) -> ObjectID:
    """Convert a binary sha into its hexadecimal form.

    Args:
      sha: Binary object id

    Returns: The hex-encoded object id

    Raises:
      ValueError: if the hex form is neither 40 (SHA1) nor 64 (SHA256)
        characters long
    """
    hexsha = binascii.hexlify(sha)
    # Only SHA1 (40 hex chars) and SHA256 (64 hex chars) are accepted.
    if len(hexsha) == 40 or len(hexsha) == 64:
        return ObjectID(hexsha)
    raise ValueError(f"Incorrect length of sha string: {hexsha!r}")
def hex_to_sha(hex: ObjectID | str) -> RawObjectID:
    """Convert a hexadecimal sha into its binary form.

    Args:
      hex: Hex object id, 40 (SHA1) or 64 (SHA256) characters long

    Returns: The binary object id

    Raises:
      ValueError: if the length is wrong, or if unhexlifying a ``bytes``
        value fails with a ``TypeError``
    """
    if len(hex) != 40 and len(hex) != 64:
        raise ValueError(f"Incorrect length of hexsha: {hex!r}")
    try:
        raw = binascii.unhexlify(hex)
    except TypeError as exc:
        # A TypeError for bytes input is reported as ValueError; for any
        # other input type the original TypeError is propagated.
        if isinstance(hex, bytes):
            raise ValueError(exc.args[0]) from exc
        raise
    return RawObjectID(raw)
- def valid_hexsha(hex: bytes | str) -> bool:
- """Check if a hex string is a valid SHA1 or SHA256.
- Args:
- hex: Hex string to validate
- Returns:
- True if valid SHA1 (40 chars) or SHA256 (64 chars), False otherwise
- """
- if len(hex) not in (40, 64):
- return False
- try:
- binascii.unhexlify(hex)
- except (TypeError, binascii.Error):
- return False
- else:
- return True
- PathT = TypeVar("PathT", str, bytes)
- def hex_to_filename(path: PathT, hex: str | bytes) -> PathT:
- """Takes a hex sha and returns its filename relative to the given path."""
- # os.path.join accepts bytes or unicode, but all args must be of the same
- # type. Make sure that hex which is expected to be bytes, is the same type
- # as path.
- if isinstance(path, str):
- if isinstance(hex, bytes):
- hex_str = hex.decode("ascii")
- else:
- hex_str = hex
- dir_name = hex_str[:2]
- file_name = hex_str[2:]
- result = os.path.join(path, dir_name, file_name)
- assert isinstance(result, str)
- return result
- else:
- # path is bytes
- if isinstance(hex, str):
- hex_bytes = hex.encode("ascii")
- else:
- hex_bytes = hex
- dir_name_b = hex_bytes[:2]
- file_name_b = hex_bytes[2:]
- result_b = os.path.join(path, dir_name_b, file_name_b)
- assert isinstance(result_b, bytes)
- return result_b
def filename_to_hex(filename: str | bytes) -> str:
    """Recover the hex sha from an object filename.

    The last two path components are expected to be a 2-character directory
    name and a 38-character file name; concatenated they form the hex sha.

    Args:
      filename: Object path, as ``str`` or ``bytes``

    Returns: The hex sha as ``str``
    """
    errmsg = f"Invalid object filename: {filename!r}"
    if not isinstance(filename, str):
        # filename is bytes: split on the byte form of the path separator
        sep = (
            os.path.sep.encode("ascii") if isinstance(os.path.sep, str) else os.path.sep
        )
        tail_b = filename.rsplit(sep, 2)[-2:]
        assert len(tail_b) == 2, errmsg
        base_b, rest_b = tail_b
        assert len(base_b) == 2 and len(rest_b) == 38, errmsg
        hex_bytes = base_b + rest_b
    else:
        # grab the last (up to) two path components
        tail = filename.rsplit(os.path.sep, 2)[-2:]
        assert len(tail) == 2, errmsg
        base, rest = tail
        assert len(base) == 2 and len(rest) == 38, errmsg
        hex_bytes = (base + rest).encode("ascii")
    # Called for validation only: raises if the value is not a usable hex sha.
    hex_to_sha(ObjectID(hex_bytes))
    return hex_bytes.decode("ascii")
def object_header(num_type: int, length: int) -> bytes:
    """Build the object header for a numeric type and a content length.

    Args:
      num_type: Numeric object type
      length: Length of the object's text

    Returns: ``<type name> <length>`` followed by a NUL byte

    Raises:
      AssertionError: if ``num_type`` does not map to a known class
    """
    cls = object_class(num_type)
    if cls is not None:
        size_field = str(length).encode("ascii")
        return cls.type_name + b" " + size_field + b"\0"
    raise AssertionError(f"unsupported class type num: {num_type}")
- def serializable_property(name: str, docstring: str | None = None) -> property:
- """A property that helps tracking whether serialization is necessary."""
- def set(obj: "ShaFile", value: object) -> None:
- """Set the property value and mark the object as needing serialization.
- Args:
- obj: The ShaFile object
- value: The value to set
- """
- setattr(obj, "_" + name, value)
- obj._needs_serialization = True
- def get(obj: "ShaFile") -> object:
- """Get the property value.
- Args:
- obj: The ShaFile object
- Returns:
- The property value
- """
- return getattr(obj, "_" + name)
- return property(get, set, doc=docstring)
def object_class(type: bytes | int) -> type["ShaFile"] | None:
    """Look up the object class registered for a type name or type number.

    Args:
      type: Either a type name string or a numeric type.

    Returns: The ShaFile subclass corresponding to the given type, or None if
      type is not a valid type name/number.
    """
    # dict.get already yields None for keys that are absent from the map.
    return _TYPE_MAP.get(type)
def check_hexsha(hex: str | bytes, error_msg: str) -> None:
    """Ensure that a string is a valid hex sha string.

    Args:
      hex: Hex string to check
      error_msg: Error message to use in exception

    Raises:
      ObjectFormatException: Raised when the string is not valid
    """
    if valid_hexsha(hex):
        return
    raise ObjectFormatException(f"{error_msg} {hex!r}")
- def check_identity(identity: bytes | None, error_msg: str) -> None:
- """Check if the specified identity is valid.
- This will raise an exception if the identity is not valid.
- Args:
- identity: Identity string
- error_msg: Error message to use in exception
- """
- if identity is None:
- raise ObjectFormatException(error_msg)
- email_start = identity.find(b"<")
- email_end = identity.find(b">")
- if not all(
- [
- email_start >= 1,
- identity[email_start - 1] == b" "[0],
- identity.find(b"<", email_start + 1) == -1,
- email_end == len(identity) - 1,
- b"\0" not in identity,
- b"\n" not in identity,
- ]
- ):
- raise ObjectFormatException(error_msg)
- def _path_to_bytes(path: str | bytes) -> bytes:
- """Convert a path to bytes for use in error messages."""
- if isinstance(path, str):
- return path.encode("utf-8", "surrogateescape")
- return path
def check_time(time_seconds: int) -> None:
    """Reject timestamps that are prone to overflow errors.

    Args:
      time_seconds: time in seconds

    Raises:
      ObjectFormatException: if ``time_seconds`` is greater than ``MAX_TIME``
    """
    # Prevent overflow error
    exceeds_limit = time_seconds > MAX_TIME
    if exceeds_limit:
        raise ObjectFormatException(f"Date field should not exceed {MAX_TIME}")
def git_line(*items: bytes) -> bytes:
    """Join the items with single spaces and terminate with a newline."""
    joined = b" ".join(items)
    return b"%b\n" % (joined,)
class FixedSha:
    """Stand-in for a hashlib hash object whose value is fixed up front."""

    __slots__ = ("_hexsha", "_sha")

    def __init__(self, hexsha: str | bytes) -> None:
        """Store the given hex SHA along with its binary form.

        Args:
          hexsha: Hex SHA value as string or bytes

        Raises:
          TypeError: if ``hexsha`` is neither ``str`` nor ``bytes``
        """
        hex_bytes = hexsha.encode("ascii") if isinstance(hexsha, str) else hexsha
        if not isinstance(hex_bytes, bytes):
            raise TypeError(f"Expected bytes for hexsha, got {hexsha!r}")
        self._hexsha = hex_bytes
        self._sha = hex_to_sha(ObjectID(hex_bytes))

    def digest(self) -> bytes:
        """Return the binary SHA digest."""
        return self._sha

    def hexdigest(self) -> str:
        """Return the hex SHA digest as text."""
        return self._hexsha.decode("ascii")
- # Type guard functions for runtime type narrowing
- if TYPE_CHECKING:
- def is_commit(obj: "ShaFile") -> TypeGuard["Commit"]:
- """Check if a ShaFile is a Commit."""
- return obj.type_name == b"commit"
- def is_tree(obj: "ShaFile") -> TypeGuard["Tree"]:
- """Check if a ShaFile is a Tree."""
- return obj.type_name == b"tree"
- def is_blob(obj: "ShaFile") -> TypeGuard["Blob"]:
- """Check if a ShaFile is a Blob."""
- return obj.type_name == b"blob"
- def is_tag(obj: "ShaFile") -> TypeGuard["Tag"]:
- """Check if a ShaFile is a Tag."""
- return obj.type_name == b"tag"
- else:
- # Runtime versions without type narrowing
- def is_commit(obj: "ShaFile") -> bool:
- """Check if a ShaFile is a Commit."""
- return obj.type_name == b"commit"
- def is_tree(obj: "ShaFile") -> bool:
- """Check if a ShaFile is a Tree."""
- return obj.type_name == b"tree"
- def is_blob(obj: "ShaFile") -> bool:
- """Check if a ShaFile is a Blob."""
- return obj.type_name == b"blob"
- def is_tag(obj: "ShaFile") -> bool:
- """Check if a ShaFile is a Tag."""
- return obj.type_name == b"tag"
- class ShaFile:
- """A git SHA file."""
- __slots__ = ("_chunked_text", "_needs_serialization", "_sha", "object_format")
- _needs_serialization: bool
- type_name: bytes
- type_num: int
- _chunked_text: list[bytes] | None
- _sha: "FixedSha | None | HASH"
- object_format: ObjectFormat
- def __init__(self) -> None:
- """Initialize a ShaFile."""
- self._sha = None
- self._chunked_text = None
- self._needs_serialization = True
- self.object_format = DEFAULT_OBJECT_FORMAT
- @staticmethod
- def _parse_legacy_object_header(
- magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
- ) -> "ShaFile":
- """Parse a legacy object, creating it but not reading the file."""
- bufsize = 1024
- decomp = zlib.decompressobj()
- header = decomp.decompress(magic)
- start = 0
- end = -1
- while end < 0:
- extra = f.read(bufsize)
- header += decomp.decompress(extra)
- magic += extra
- end = header.find(b"\0", start)
- start = len(header)
- header = header[:end]
- type_name, size = header.split(b" ", 1)
- try:
- int(size) # sanity check
- except ValueError as exc:
- raise ObjectFormatException(f"Object size not an integer: {exc}") from exc
- obj_class = object_class(type_name)
- if not obj_class:
- raise ObjectFormatException(
- "Not a known type: {}".format(type_name.decode("ascii"))
- )
- return obj_class()
- def _parse_legacy_object(self, map: bytes) -> None:
- """Parse a legacy object, setting the raw string."""
- text = _decompress(map)
- header_end = text.find(b"\0")
- if header_end < 0:
- raise ObjectFormatException("Invalid object header, no \\0")
- self.set_raw_string(text[header_end + 1 :])
- def as_legacy_object_chunks(self, compression_level: int = -1) -> Iterator[bytes]:
- """Return chunks representing the object in the experimental format.
- Returns: List of strings
- """
- compobj = zlib.compressobj(compression_level)
- yield compobj.compress(self._header())
- for chunk in self.as_raw_chunks():
- yield compobj.compress(chunk)
- yield compobj.flush()
- def as_legacy_object(self, compression_level: int = -1) -> bytes:
- """Return string representing the object in the experimental format."""
- return b"".join(
- self.as_legacy_object_chunks(compression_level=compression_level)
- )
- def as_raw_chunks(self) -> list[bytes]:
- """Return chunks with serialization of the object.
- Returns: List of strings, not necessarily one per line
- """
- if self._needs_serialization:
- self._sha = None
- self._chunked_text = self._serialize()
- self._needs_serialization = False
- assert self._chunked_text is not None
- return self._chunked_text
- def as_raw_string(self) -> bytes:
- """Return raw string with serialization of the object.
- Returns: String object
- """
- return b"".join(self.as_raw_chunks())
- def __bytes__(self) -> bytes:
- """Return raw string serialization of this object."""
- return self.as_raw_string()
- def __hash__(self) -> int:
- """Return unique hash for this object."""
- return hash(self.id)
- def as_pretty_string(self) -> str:
- """Return a string representing this object, fit for display."""
- return self.as_raw_string().decode("utf-8", "replace")
- def set_raw_string(
- self, text: bytes, sha: ObjectID | RawObjectID | None = None
- ) -> None:
- """Set the contents of this object from a serialized string."""
- if not isinstance(text, bytes):
- raise TypeError(f"Expected bytes for text, got {text!r}")
- self.set_raw_chunks([text], sha)
- def set_raw_chunks(
- self,
- chunks: list[bytes],
- sha: ObjectID | RawObjectID | None = None,
- *,
- object_format: ObjectFormat | None = None,
- ) -> None:
- """Set the contents of this object from a list of chunks."""
- self._chunked_text = chunks
- # Set hash algorithm if provided
- if object_format is not None:
- self.object_format = object_format
- # Set SHA before deserialization so Tree can use hash algorithm
- if sha is None:
- self._sha = None
- else:
- self._sha = FixedSha(sha)
- self._deserialize(chunks)
- self._needs_serialization = False
- @staticmethod
- def _parse_object_header(
- magic: bytes, f: BufferedIOBase | IO[bytes] | "_GitFile"
- ) -> "ShaFile":
- """Parse a new style object, creating it but not reading the file."""
- num_type = (ord(magic[0:1]) >> 4) & 7
- obj_class = object_class(num_type)
- if not obj_class:
- raise ObjectFormatException(f"Not a known type {num_type}")
- return obj_class()
- def _parse_object(self, map: bytes) -> None:
- """Parse a new style object, setting self._text."""
- # skip type and size; type must have already been determined, and
- # we trust zlib to fail if it's otherwise corrupted
- byte = ord(map[0:1])
- used = 1
- while (byte & 0x80) != 0:
- byte = ord(map[used : used + 1])
- used += 1
- raw = map[used:]
- self.set_raw_string(_decompress(raw))
- @classmethod
- def _is_legacy_object(cls, magic: bytes) -> bool:
- b0 = ord(magic[0:1])
- b1 = ord(magic[1:2])
- word = (b0 << 8) + b1
- return (b0 & 0x8F) == 0x08 and (word % 31) == 0
- @classmethod
- def _parse_file(
- cls,
- f: BufferedIOBase | IO[bytes] | "_GitFile",
- *,
- object_format: ObjectFormat | None = None,
- ) -> "ShaFile":
- map = f.read()
- if not map:
- raise EmptyFileException("Corrupted empty file detected")
- if cls._is_legacy_object(map):
- obj = cls._parse_legacy_object_header(map, f)
- if object_format is not None:
- obj.object_format = object_format
- obj._parse_legacy_object(map)
- else:
- obj = cls._parse_object_header(map, f)
- if object_format is not None:
- obj.object_format = object_format
- obj._parse_object(map)
- return obj
- def _deserialize(self, chunks: list[bytes]) -> None:
- raise NotImplementedError(self._deserialize)
- def _serialize(self) -> list[bytes]:
- raise NotImplementedError(self._serialize)
- @classmethod
- def from_path(
- cls,
- path: str | bytes,
- sha: ObjectID | None = None,
- *,
- object_format: ObjectFormat | None = None,
- ) -> "ShaFile":
- """Open a SHA file from disk."""
- with GitFile(path, "rb") as f:
- return cls.from_file(f, sha, object_format=object_format)
- @classmethod
- def from_file(
- cls,
- f: BufferedIOBase | IO[bytes] | "_GitFile",
- sha: ObjectID | None = None,
- *,
- object_format: ObjectFormat | None = None,
- ) -> "ShaFile":
- """Get the contents of a SHA file on disk."""
- try:
- # Validate SHA length matches hash algorithm if both provided
- if sha is not None and object_format is not None:
- expected_len = object_format.hex_length
- if len(sha) != expected_len:
- raise ValueError(
- f"SHA length {len(sha)} doesn't match hash algorithm "
- f"{object_format.name} (expected {expected_len})"
- )
- obj = cls._parse_file(f, object_format=object_format)
- if sha is not None:
- obj._sha = FixedSha(sha)
- else:
- obj._sha = None
- return obj
- except (IndexError, ValueError) as exc:
- raise ObjectFormatException("invalid object header") from exc
- @staticmethod
- def from_raw_string(
- type_num: int,
- string: bytes,
- sha: ObjectID | RawObjectID | None = None,
- *,
- object_format: ObjectFormat | None = None,
- ) -> "ShaFile":
- """Creates an object of the indicated type from the raw string given.
- Args:
- type_num: The numeric type of the object.
- string: The raw uncompressed contents.
- sha: Optional known sha for the object
- object_format: Optional hash algorithm for the object
- """
- cls = object_class(type_num)
- if cls is None:
- raise AssertionError(f"unsupported class type num: {type_num}")
- obj = cls()
- if object_format is not None:
- obj.object_format = object_format
- obj.set_raw_string(string, sha)
- return obj
- @staticmethod
- def from_raw_chunks(
- type_num: int, chunks: list[bytes], sha: ObjectID | RawObjectID | None = None
- ) -> "ShaFile":
- """Creates an object of the indicated type from the raw chunks given.
- Args:
- type_num: The numeric type of the object.
- chunks: An iterable of the raw uncompressed contents.
- sha: Optional known sha for the object
- """
- cls = object_class(type_num)
- if cls is None:
- raise AssertionError(f"unsupported class type num: {type_num}")
- obj = cls()
- obj.set_raw_chunks(chunks, sha)
- return obj
- @classmethod
- def from_string(cls, string: bytes) -> Self:
- """Create a ShaFile from a string."""
- obj = cls()
- obj.set_raw_string(string)
- return obj
- def _check_has_member(self, member: str, error_msg: str) -> None:
- """Check that the object has a given member variable.
- Args:
- member: the member variable to check for
- error_msg: the message for an error if the member is missing
- Raises:
- ObjectFormatException: with the given error_msg if member is
- missing or is None
- """
- if getattr(self, member, None) is None:
- raise ObjectFormatException(error_msg)
- def check(self) -> None:
- """Check this object for internal consistency.
- Raises:
- ObjectFormatException: if the object is malformed in some way
- ChecksumMismatch: if the object was created with a SHA that does
- not match its contents
- """
- # TODO: if we find that error-checking during object parsing is a
- # performance bottleneck, those checks should be moved to the class's
- # check() method during optimization so we can still check the object
- # when necessary.
- old_sha = self.id
- try:
- self._deserialize(self.as_raw_chunks())
- self._sha = None
- new_sha = self.id
- except Exception as exc:
- raise ObjectFormatException(exc) from exc
- if old_sha != new_sha:
- raise ChecksumMismatch(new_sha, old_sha)
- def _header(self) -> bytes:
- return object_header(self.type_num, self.raw_length())
- def raw_length(self) -> int:
- """Returns the length of the raw string of this object."""
- return sum(map(len, self.as_raw_chunks()))
- def sha(self, object_format: ObjectFormat | None = None) -> "FixedSha | HASH":
- """The SHA object that is the name of this object.
- Args:
- object_format: Optional HashAlgorithm to use. Defaults to SHA1.
- """
- # If using a different hash algorithm, always recalculate
- if object_format is not None:
- new_sha = object_format.new_hash()
- new_sha.update(self._header())
- for chunk in self.as_raw_chunks():
- new_sha.update(chunk)
- return new_sha
- # Otherwise use cached SHA1 value
- if self._sha is None or self._needs_serialization:
- # this is a local because as_raw_chunks() overwrites self._sha
- new_sha = sha1()
- new_sha.update(self._header())
- for chunk in self.as_raw_chunks():
- new_sha.update(chunk)
- self._sha = new_sha
- return self._sha
- def copy(self) -> "ShaFile":
- """Create a new copy of this SHA1 object from its raw string."""
- obj_class = object_class(self.type_num)
- if obj_class is None:
- raise AssertionError(f"invalid type num {self.type_num}")
- return obj_class.from_raw_string(self.type_num, self.as_raw_string(), self.id)
- @property
- def id(self) -> ObjectID:
- """The hex SHA1 of this object.
- For SHA256 repositories, use get_id(object_format) instead.
- This property always returns SHA1 for backward compatibility.
- """
- return ObjectID(self.sha().hexdigest().encode("ascii"))
def get_id(self, object_format: ObjectFormat | None = None) -> bytes:
    """Return the hex digest of this object for the given hash algorithm.

    Args:
      object_format: Optional object format to hash with. Defaults to SHA1.

    Returns: the ASCII-encoded hexadecimal digest

    Example:
      >>> blob = Blob()
      >>> blob.data = b"Hello, World!"
      >>> blob.id  # Always returns SHA1 for backward compatibility
      b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
      >>> blob.get_id()  # Same as .id
      b'4ab299c8ad6ed14f31923dd94f8b5f5cb89dfb54'
      >>> from dulwich.object_format import SHA256
      >>> blob.get_id(SHA256)  # Get SHA256 hash
      b'03ba204e2f2e707...'  # 64-character SHA256
    """
    digest = self.sha(object_format)
    return digest.hexdigest().encode("ascii")
def __repr__(self) -> str:
    """Return ``<ClassName id>`` for this object, with the id shown via repr()."""
    return "<{} {!r}>".format(self.__class__.__name__, self.id)
def __ne__(self, other: object) -> bool:
    """Return True unless other is a ShaFile with the same id."""
    if isinstance(other, ShaFile):
        return self.id != other.id
    return True
def __eq__(self, other: object) -> bool:
    """Return True only if other is a ShaFile with the same id."""
    if not isinstance(other, ShaFile):
        return False
    return self.id == other.id
def __lt__(self, other: object) -> bool:
    """Return whether the id of this object sorts before the other's.

    Raises:
      TypeError: if other is not a ShaFile
    """
    if isinstance(other, ShaFile):
        return self.id < other.id
    raise TypeError
def __le__(self, other: object) -> bool:
    """Return whether the id of this object sorts before or equal to the other's.

    Raises:
      TypeError: if other is not a ShaFile
    """
    if isinstance(other, ShaFile):
        return self.id <= other.id
    raise TypeError
class Blob(ShaFile):
    """A Git Blob object.

    The content is held as a list of byte chunks in ``_chunked_text``. The
    ``chunked`` property exposes that list directly, and ``data`` goes through
    ``as_raw_string()`` / ``set_raw_string()``.
    """

    __slots__ = ()

    type_name = b"blob"
    type_num = 3

    _chunked_text: list[bytes]

    def __init__(self) -> None:
        """Initialize a new, empty Blob object."""
        super().__init__()
        self._chunked_text = []
        self._needs_serialization = False

    def _get_data(self) -> bytes:
        return self.as_raw_string()

    def _set_data(self, data: bytes) -> None:
        self.set_raw_string(data)

    data = property(
        _get_data, _set_data, doc="The text contained within the blob object."
    )

    def _get_chunked(self) -> list[bytes]:
        return self._chunked_text

    def _set_chunked(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    def _serialize(self) -> list[bytes]:
        # A blob's serialized form is its content chunks, unchanged.
        return self._chunked_text

    def _deserialize(self, chunks: list[bytes]) -> None:
        self._chunked_text = chunks

    chunked = property(
        _get_chunked,
        _set_chunked,
        doc="The text in the blob object, as chunks (not necessarily lines)",
    )

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Blob":
        """Read a blob from a file on disk.

        Args:
          path: Path to the blob file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Blob object

        Raises:
          NotBlobError: If the file is not a blob
        """
        blob = ShaFile.from_path(path, sha, object_format=object_format)
        if not isinstance(blob, cls):
            raise NotBlobError(_path_to_bytes(path))
        return blob

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()

    def splitlines(self) -> list[bytes]:
        """Return list of lines in this blob.

        This preserves the original line endings. Chunk boundaries carry no
        meaning: the result is the same as splitting the concatenated content.
        """
        chunks = self.chunked
        if not chunks:
            return []
        # Split the joined content rather than chunk by chunk. A chunk that
        # ends with a newline must not be glued to the start of the next
        # chunk, and a line ending may itself straddle a chunk boundary
        # (b"\r" at the end of one chunk, b"\n" at the start of the next).
        return b"".join(chunks).splitlines(True)
def _parse_message(
    chunks: Iterable[bytes],
) -> Iterator[tuple[None, None] | tuple[bytes | None, bytes]]:
    """Parse a message with a list of fields and a body.

    Args:
      chunks: the raw chunks of the tag or commit object.
    Returns: iterator of tuples of (field, value), one per header line, in the
      order read from the text, possibly including duplicates. Includes a
      field named None for the freeform tag/commit text; its value is None
      when the text ends before the blank line that terminates the headers.
    """

    def _without_trailing_newline(raw: bytes) -> bytes:
        """Drop a single trailing newline, if there is one."""
        return raw[:-1] if raw.endswith(b"\n") else raw

    stream = BytesIO(b"".join(chunks))
    name = None
    collected = b""
    headers_terminated = False

    # A header value may span several lines; every continuation line starts
    # with one space. 'name' is the header being collected and 'collected'
    # is its value so far.
    for line in stream:
        if line.startswith(b" "):
            collected += line[1:]
            continue
        if name is not None:
            # A new line starts here, so the previous header is complete.
            yield (name, _without_trailing_newline(collected))
        if line == b"\n":
            # The blank line separates the headers from the body.
            headers_terminated = True
            break
        name, collected = line.split(b" ", 1)

    if headers_terminated:
        # Everything after the blank line is the freeform message.
        yield (None, stream.read())
    else:
        # The text ended inside the headers: flush the pending header and
        # report that there is no message.
        if name is not None:
            yield (name, _without_trailing_newline(collected))
        yield (None, None)

    stream.close()
def _format_message(
    headers: Sequence[tuple[bytes, bytes]], body: bytes | None
) -> Iterator[bytes]:
    """Yield the serialized chunks for a list of headers and an optional body.

    Args:
      headers: (field, value) pairs; a value containing newlines is emitted
        as continuation lines, each prefixed with a single space
      body: freeform text to emit after the headers; skipped when empty or None
    """
    for field, value in headers:
        first, *continuations = value.split(b"\n")
        yield git_line(field, first)
        for continuation in continuations:
            yield b" " + continuation + b"\n"
    # The headers are always followed by a blank line.
    yield b"\n"
    if body:
        yield body
class Tag(ShaFile):
    """A Git Tag object."""

    type_name = b"tag"
    type_num = 4

    __slots__ = (
        "_message",
        "_name",
        "_object_class",
        "_object_sha",
        "_signature",
        "_tag_time",
        "_tag_timezone",
        "_tag_timezone_neg_utc",
        "_tagger",
    )

    _message: bytes | None
    _name: bytes | None
    _object_class: "type[ShaFile] | None"
    _object_sha: bytes | None
    _signature: bytes | None
    _tag_time: int | None
    _tag_timezone: int | None
    _tag_timezone_neg_utc: bool | None
    _tagger: bytes | None

    def __init__(self) -> None:
        """Initialize a new Tag object with no tagger, time or signature."""
        super().__init__()
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        self._signature = None

    @classmethod
    def from_path(
        cls,
        filename: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Tag":
        """Read a tag from a file on disk.

        Args:
          filename: Path to the tag file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Tag object

        Raises:
          NotTagError: If the file is not a tag
        """
        loaded = ShaFile.from_path(filename, sha, object_format=object_format)
        if isinstance(loaded, cls):
            return loaded
        raise NotTagError(_path_to_bytes(filename))

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None

        # Mandatory members.
        self._check_has_member("_object_sha", "missing object sha")
        self._check_has_member("_object_class", "missing object type")
        self._check_has_member("_name", "missing tag name")

        if not self._name:
            raise ObjectFormatException("empty tag name")

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        check_hexsha(self._object_sha, "invalid object sha")

        if self._tagger is not None:
            check_identity(self._tagger, "invalid tagger")

        self._check_has_member("_tag_time", "missing tag time")
        if self._tag_time is None:
            raise ObjectFormatException("missing tag time")
        check_time(self._tag_time)

        # Headers must appear in the order object, type, tag, tagger.
        previous = None
        for header, _value in _parse_message(self._chunked_text):
            if header == _OBJECT_HEADER and previous is not None:
                raise ObjectFormatException("unexpected object")
            elif header == _TYPE_HEADER and previous != _OBJECT_HEADER:
                raise ObjectFormatException("unexpected type")
            elif header == _TAG_HEADER and previous != _TYPE_HEADER:
                raise ObjectFormatException("unexpected tag name")
            elif header == _TAGGER_HEADER and previous != _TAG_HEADER:
                raise ObjectFormatException("unexpected tagger")
            previous = header

    def _serialize(self) -> list[bytes]:
        headers = []

        if self._object_sha is None:
            raise ObjectFormatException("missing object sha")
        headers.append((_OBJECT_HEADER, self._object_sha))

        if self._object_class is None:
            raise ObjectFormatException("missing object class")
        headers.append((_TYPE_HEADER, self._object_class.type_name))

        if self._name is None:
            raise ObjectFormatException("missing tag name")
        headers.append((_TAG_HEADER, self._name))

        if self._tagger:
            # Without a tag time the tagger is written as-is; otherwise the
            # time and timezone are appended to it.
            tagger_line = self._tagger
            if self._tag_time is not None:
                if self._tag_timezone is None or self._tag_timezone_neg_utc is None:
                    raise ObjectFormatException("missing timezone info")
                tagger_line = format_time_entry(
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                )
            headers.append((_TAGGER_HEADER, tagger_line))

        message = self.message
        signature = self._signature
        if message is None and signature is None:
            body = None
        else:
            body = (message or b"") + (signature or b"")
        return list(_format_message(headers, body))

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the metadata attached to the tag."""
        self._tagger = None
        self._tag_time = None
        self._tag_timezone = None
        self._tag_timezone_neg_utc = False
        for field, value in _parse_message(chunks):
            if field == _OBJECT_HEADER:
                self._object_sha = value
            elif field == _TYPE_HEADER:
                assert isinstance(value, bytes)
                target_class = object_class(value)
                if not target_class:
                    raise ObjectFormatException(f"Not a known type: {value!r}")
                self._object_class = target_class
            elif field == _TAG_HEADER:
                self._name = value
            elif field == _TAGGER_HEADER:
                if value is None:
                    raise ObjectFormatException("missing tagger value")
                (
                    self._tagger,
                    self._tag_time,
                    (self._tag_timezone, self._tag_timezone_neg_utc),
                ) = parse_time_entry(value)
            elif field is None:
                if value is None:
                    self._message = None
                    self._signature = None
                    continue
                # Look for a PGP signature first, then for an SSH one.
                split_at = value.find(BEGIN_PGP_SIGNATURE)
                if split_at == -1:
                    split_at = value.find(BEGIN_SSH_SIGNATURE)
                if split_at == -1:
                    self._message = value
                    self._signature = None
                else:
                    self._message = value[:split_at]
                    self._signature = value[split_at:]
            else:
                raise ObjectFormatException(
                    f"Unknown field {field.decode('ascii', 'replace')}"
                )

    def _get_object(self) -> tuple[type[ShaFile], bytes]:
        """Get the object pointed to by this tag.

        Returns: tuple of (object class, sha).
        """
        if self._object_class is None or self._object_sha is None:
            raise ValueError("Tag object is not properly initialized")
        return (self._object_class, self._object_sha)

    def _set_object(self, value: tuple[type[ShaFile], bytes]) -> None:
        (self._object_class, self._object_sha) = value
        self._needs_serialization = True

    object = property(_get_object, _set_object)

    name = serializable_property("name", "The name of this tag")
    tagger = serializable_property(
        "tagger", "Returns the name of the person who created this tag"
    )
    tag_time = serializable_property(
        "tag_time",
        "The creation timestamp of the tag. As the number of seconds since the epoch",
    )
    tag_timezone = serializable_property(
        "tag_timezone", "The timezone that tag_time is in."
    )
    message = serializable_property("message", "the message attached to this tag")

    signature = serializable_property("signature", "Optional detached GPG signature")

    def sign(self, keyid: str | None = None) -> None:
        """Sign this tag with a GPG key.

        Args:
          keyid: Optional GPG key ID to use for signing. If not specified,
                 the default GPG key will be used.
        """
        import gpg

        detach = gpg.constants.sig.mode.DETACH
        with gpg.Context(armor=True) as default_ctx:
            if keyid is None:
                self.signature, _unused_result = default_ctx.sign(
                    self.as_raw_string(), mode=detach
                )
            else:
                # Resolve the key with the default context, then sign with a
                # context restricted to that signer.
                key = default_ctx.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as signing_ctx:
                    self.signature, _unused_result = signing_ctx.sign(
                        self.as_raw_string(),
                        mode=detach,
                    )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.signature is a signature for the returned raw byte string serialization.
        """
        raw = self.as_raw_string()
        if not self._signature:
            return raw
        # Strip the signature's length from the end of the raw string.
        return raw[: -len(self._signature)]

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this tag.

        Returns:
          tuple of (``payload``, ``signature``, ``signature_type``) where:

          - ``payload``: The raw tag data without the signature
          - ``signature``: The signature bytes if present, None otherwise
          - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
          ObjectFormatException: If signature has unknown format
        """
        signature = self._signature
        if signature is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Classify the signature by its leading marker.
        if signature.startswith(BEGIN_PGP_SIGNATURE):
            return payload, signature, SIGNATURE_PGP
        if signature.startswith(BEGIN_SSH_SIGNATURE):
            return payload, signature, SIGNATURE_SSH
        raise ObjectFormatException("Unknown signature format")

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this tag (if it is signed).

        Args:
          keyids: Optional iterable of trusted keyids for this tag.
            If this tag is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the tag
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if tag was not signed by a key
            specified in keyids
        """
        if self._signature is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._signature,
            )
            if not keyids:
                return
            keys = [ctx.get_key(key) for key in keyids]
            # Accept as soon as any signing-capable subkey of a trusted key
            # matches the fingerprint of one of the signatures.
            trusted = any(
                subkey.can_sign and subkey.fpr == sig.fpr
                for key in keys
                for subkey in key.subkeys
                for sig in result.signatures
            )
            if trusted:
                return
            raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
class TreeEntry(NamedTuple):
    """Named tuple encapsulating a single tree entry."""

    path: bytes
    mode: int
    sha: ObjectID

    def in_path(self, path: bytes) -> "TreeEntry":
        """Return a copy of this entry with the given path prepended.

        Args:
          path: Directory path to join in front of this entry's path

        Returns: a new TreeEntry with the joined path and the same mode and sha

        Raises:
          TypeError: if either the given path or this entry's path is not bytes
        """
        # Validate both operands and report the one that is actually wrong.
        if not isinstance(path, bytes):
            raise TypeError(f"Expected bytes for path, got {path!r}")
        if not isinstance(self.path, bytes):
            raise TypeError(f"Expected bytes for path, got {self.path!r}")
        return TreeEntry(posixpath.join(path, self.path), self.mode, self.sha)
def parse_tree(
    text: bytes, sha_len: int | None = None, *, strict: bool = False
) -> Iterator[tuple[bytes, int, bytes]]:
    """Parse a tree text.

    Each entry is laid out as ``<octal mode> <name>\\0<raw sha>``.

    Args:
      text: Serialized text to parse
      sha_len: Length of the object IDs in bytes
      strict: Whether to be strict about format; when set, modes with a
        leading zero are rejected
    Returns: iterator of tuples of (name, mode, sha)

    Raises:
      ObjectFormatException: if the object was malformed in some way
    """
    pos = 0
    end = len(text)
    while pos < end:
        # Mode: octal digits up to the first space.
        space_at = text.index(b" ", pos)
        mode_text = text[pos:space_at]
        if strict and mode_text.startswith(b"0"):
            raise ObjectFormatException(f"Invalid mode {mode_text!r}")
        try:
            mode = int(mode_text, 8)
        except ValueError as exc:
            raise ObjectFormatException(f"Invalid mode {mode_text!r}") from exc

        # Name: everything between the space and the NUL terminator.
        nul_at = text.index(b"\0", space_at)
        name = text[space_at + 1 : nul_at]

        if sha_len is None:
            raise ObjectFormatException("sha_len must be specified")

        # Raw sha: the next sha_len bytes, which must fit inside the text.
        sha_start = nul_at + 1
        pos = sha_start + sha_len
        if pos > end:
            raise ObjectFormatException(
                f"Tree entry extends beyond tree length: {pos} > {end}"
            )
        raw_sha = text[sha_start:pos]
        if len(raw_sha) != sha_len:
            raise ObjectFormatException(
                f"Sha has invalid length: {len(raw_sha)} != {sha_len}"
            )
        yield (name, mode, sha_to_hex(RawObjectID(raw_sha)))
def serialize_tree(items: Iterable[tuple[bytes, int, ObjectID]]) -> Iterator[bytes]:
    """Serialize the items in a tree to a text.

    Each item becomes one chunk of the form ``<octal mode> <name>\\0<raw sha>``,
    with the mode zero-padded to at least four octal digits.

    Args:
      items: Sorted iterable over (name, mode, sha) tuples
    Returns: Serialized tree text as chunks
    """
    for name, mode, hexsha in items:
        mode_text = f"{mode:04o}".encode("ascii")
        yield b"".join((mode_text, b" ", name, b"\0", hex_to_sha(hexsha)))
def sorted_tree_items(
    entries: dict[bytes, tuple[int, ObjectID]], name_order: bool
) -> Iterator[TreeEntry]:
    """Iterate over a tree entries dictionary.

    Args:
      name_order: If True, iterate entries in order of their name. If
        False, iterate entries in tree order, that is, treat subtree entries as
        having '/' appended.
      entries: Dictionary mapping names to (mode, sha) tuples
    Returns: Iterator over (name, mode, hexsha)

    Raises:
      TypeError: if an entry's sha is not bytes
    """
    sort_key = key_entry_name_order if name_order else key_entry
    for name, (raw_mode, hexsha) in sorted(entries.items(), key=sort_key):
        # Stricter type checks than normal to mirror checks in the Rust version.
        mode = int(raw_mode)
        if not isinstance(hexsha, bytes):
            raise TypeError(f"Expected bytes for SHA, got {hexsha!r}")
        yield TreeEntry(name, mode, hexsha)
def key_entry(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry.

    Directory entries sort as if their name had a trailing '/'.

    Args:
      entry: (name, value) tuple
    """
    name, (mode, _sha) = entry
    return name + b"/" if stat.S_ISDIR(mode) else name
def key_entry_name_order(entry: tuple[bytes, tuple[int, ObjectID]]) -> bytes:
    """Sort key for tree entry in name order.

    Args:
      entry: (name, value) tuple; only the name takes part in the ordering
    """
    name = entry[0]
    return name
def pretty_format_tree_entry(
    name: bytes, mode: int, hexsha: ObjectID, encoding: str = "utf-8"
) -> str:
    """Pretty format tree entry.

    Args:
      name: Name of the directory entry
      mode: Mode of entry
      hexsha: Hexsha of the referenced object
      encoding: Character encoding for the name; undecodable bytes are replaced
    Returns: string describing the tree entry, terminated by a newline
    """
    # Any mode with the directory bit set is reported as a tree.
    kind = "tree" if mode & stat.S_IFDIR else "blob"
    sha_text = hexsha.decode("ascii")
    name_text = name.decode(encoding, "replace")
    return f"{mode:04o} {kind} {sha_text}\t{name_text}\n"
class SubmoduleEncountered(Exception):
    """A submodule was encountered while resolving a path."""

    def __init__(self, path: bytes, sha: ObjectID) -> None:
        """Record where the submodule was found.

        Args:
          path: Path where the submodule was encountered
          sha: SHA of the submodule
        """
        self.path, self.sha = path, sha
class Tree(ShaFile):
    """A Git tree object.

    Entries are kept in ``_entries``, a dictionary mapping each entry name to
    a ``(mode, hexsha)`` tuple.
    """

    type_name = b"tree"
    type_num = 2

    __slots__ = ("_entries",)

    def __init__(self) -> None:
        """Initialize an empty Tree."""
        super().__init__()
        self._entries: dict[bytes, tuple[int, ObjectID]] = {}

    @classmethod
    def from_path(
        cls,
        filename: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Tree":
        """Read a tree from a file on disk.

        Args:
          filename: Path to the tree file
          sha: Optional known SHA for the object
          object_format: Optional object format to use

        Returns:
          A Tree object

        Raises:
          NotTreeError: If the file is not a tree
        """
        tree = ShaFile.from_path(filename, sha, object_format=object_format)
        if not isinstance(tree, cls):
            raise NotTreeError(_path_to_bytes(filename))
        return tree

    def __contains__(self, name: bytes) -> bool:
        """Check if name exists in tree."""
        return name in self._entries

    def __getitem__(self, name: bytes) -> tuple[int, ObjectID]:
        """Get tree entry by name.

        Returns: the (mode, hexsha) tuple stored for name

        Raises:
          KeyError: if there is no entry with that name
        """
        return self._entries[name]

    def __setitem__(self, name: bytes, value: tuple[int, ObjectID]) -> None:
        """Set a tree entry by name.

        Args:
          name: The name of the entry, as bytes.
          value: A tuple of (mode, hexsha), where mode is the mode of the
            entry as an integral type and hexsha is the hex SHA of the entry as
            bytes.
        """
        mode, hexsha = value
        self._entries[name] = (mode, hexsha)
        self._needs_serialization = True

    def __delitem__(self, name: bytes) -> None:
        """Delete tree entry by name.

        Raises:
          KeyError: if there is no entry with that name
        """
        del self._entries[name]
        self._needs_serialization = True

    def __len__(self) -> int:
        """Return number of entries in tree."""
        return len(self._entries)

    def __iter__(self) -> Iterator[bytes]:
        """Iterate over tree entry names."""
        return iter(self._entries)

    def add(self, name: bytes, mode: int, hexsha: ObjectID) -> None:
        """Add an entry to the tree.

        Args:
          mode: The mode of the entry as an integral type. Not all
            possible modes are supported by git; see check() for details.
          name: The name of the entry, as bytes.
          hexsha: The hex SHA of the entry as bytes.
        """
        self._entries[name] = mode, hexsha
        self._needs_serialization = True

    def iteritems(self, name_order: bool = False) -> Iterator[TreeEntry]:
        """Iterate over entries.

        Args:
          name_order: If True, iterate in name order instead of tree
            order.
        Returns: Iterator over (name, mode, sha) tuples
        """
        return sorted_tree_items(self._entries, name_order)

    def items(self) -> list[TreeEntry]:
        """Return the sorted entries in this tree.

        Returns: List with (name, mode, sha) tuples
        """
        return list(self.iteritems())

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Grab the entries in the tree.

        Raises:
          ObjectFormatException: if the tree text cannot be parsed
        """
        try:
            # parse_tree may be a lazy generator, in which case parse errors
            # only surface while it is being consumed. The entries therefore
            # have to be collected inside the try block for a ValueError to be
            # converted here.
            entries = {
                n: (m, ObjectID(s))
                for n, m, s in parse_tree(
                    b"".join(chunks),
                    sha_len=self.object_format.oid_length,
                )
            }
        except ValueError as exc:
            raise ObjectFormatException(exc) from exc
        # Only replace the current entries once parsing fully succeeded.
        self._entries = entries

    def check(self) -> None:
        """Check this object for internal consistency.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        last = None
        allowed_modes = (
            stat.S_IFREG | 0o755,
            stat.S_IFREG | 0o644,
            stat.S_IFLNK,
            stat.S_IFDIR,
            S_IFGITLINK,
            # TODO: optionally exclude as in git fsck --strict
            stat.S_IFREG | 0o664,
        )
        for name, mode, sha in parse_tree(
            b"".join(self._chunked_text),
            strict=True,
            sha_len=self.object_format.oid_length,
        ):
            check_hexsha(sha, f"invalid sha {sha!r}")
            if b"/" in name or name in (b"", b".", b"..", b".git"):
                raise ObjectFormatException(
                    "invalid name {}".format(name.decode("utf-8", "replace"))
                )

            if mode not in allowed_modes:
                raise ObjectFormatException(f"invalid mode {mode:06o}")

            entry = (name, (mode, ObjectID(sha)))
            if last:
                # Entries must be in tree order and must not repeat a name.
                if key_entry(last) > key_entry(entry):
                    raise ObjectFormatException("entries not sorted")
                if name == last[0]:
                    raise ObjectFormatException(f"duplicate entry {name!r}")
            last = entry

    def _serialize(self) -> list[bytes]:
        return list(serialize_tree(self.iteritems()))

    def as_pretty_string(self) -> str:
        """Return a human-readable string representation of this tree.

        Returns:
          Pretty-printed tree entries
        """
        text: list[str] = []
        for entry in self.iteritems():
            if (
                entry.path is not None
                and entry.mode is not None
                and entry.sha is not None
            ):
                text.append(pretty_format_tree_entry(entry.path, entry.mode, entry.sha))
        return "".join(text)

    def lookup_path(
        self, lookup_obj: Callable[[ObjectID], ShaFile], path: bytes
    ) -> tuple[int, ObjectID]:
        """Look up an object in a Git tree.

        Args:
          lookup_obj: Callback for retrieving object by SHA1
          path: Path to lookup
        Returns: A tuple of (mode, SHA) of the resulting path.

        Raises:
          SubmoduleEncountered: if the path continues below a gitlink entry
          NotTreeError: if an intermediate path component is not a tree
          ValueError: if the path contains no non-empty component
        """
        # Handle empty path - return the tree itself
        if not path:
            return stat.S_IFDIR, self.id

        parts = path.split(b"/")
        sha = self.id
        mode: int | None = None
        for i, p in enumerate(parts):
            # Empty components (from doubled or trailing slashes) are skipped.
            if not p:
                continue
            if mode is not None and S_ISGITLINK(mode):
                raise SubmoduleEncountered(b"/".join(parts[:i]), sha)
            obj = lookup_obj(sha)
            if not isinstance(obj, Tree):
                raise NotTreeError(sha)
            mode, sha = obj[p]
        if mode is None:
            raise ValueError("No valid path found")
        return mode, sha
def parse_timezone(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment (e.g. '+0100').

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
      and a boolean indicating whether this was a UTC timezone
      prefixed with a negative sign (-0000).

    Raises:
      ValueError: if the text is empty, does not start with a sign, or the
        part after the sign is not an integer
    """
    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    sign = text[:1]
    # Slicing instead of indexing makes empty input fail the sign check with
    # a ValueError rather than raising IndexError.
    if sign not in (b"+", b"-"):
        raise ValueError(f"Timezone must start with + or - ({text!r})")
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    # True for inputs such as '-0000' or '--700': a minus sign in front of a
    # value that ends up non-negative.
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    # The digits read as HHMM; integer arithmetic keeps the split exact.
    hours, minutes = divmod(abs(offset), 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )
def parse_timezone_broken(text: bytes) -> tuple[int, bool]:
    """Parse a timezone text fragment, accepting broken formats.

    This function handles various broken timezone formats found in the wild:
    - Missing sign prefix (e.g., '0000' instead of '+0000')
    - Double negative (e.g., '--700')

    Args:
      text: Text to parse.
    Returns: Tuple with timezone as seconds difference to UTC
      and a boolean indicating whether this was a UTC timezone
      prefixed with a negative sign (-0000).

    Raises:
      ValueError: if the part after the (possibly implied) sign is not an
        integer; this includes empty input
    """
    # Slicing instead of indexing keeps empty input from raising IndexError;
    # it falls through to int() below and fails with ValueError instead.
    if text[:1] not in (b"+", b"-"):
        # Some (broken) commits do not have a sign
        text = b"+" + text

    # cgit parses the first character as the sign, and the rest
    # as an integer (using strtol), which could also be negative.
    # We do the same for compatibility. See #697828.
    sign = text[:1]
    offset = int(text[1:])
    if sign == b"-":
        offset = -offset
    unnecessary_negative_timezone = offset >= 0 and sign == b"-"
    signum = -1 if offset < 0 else 1
    # The digits read as HHMM; integer arithmetic keeps the split exact.
    hours, minutes = divmod(abs(offset), 100)
    return (
        signum * (hours * 3600 + minutes * 60),
        unnecessary_negative_timezone,
    )
def format_timezone(offset: int, unnecessary_negative_timezone: bool = False) -> bytes:
    """Format a timezone for Git serialization.

    Args:
      offset: Timezone offset as seconds difference to UTC
      unnecessary_negative_timezone: Whether to use a minus sign for
        UTC or positive timezones (-0000 and --700 rather than +0000 / +0700).

    Raises:
      ValueError: if the offset is not a whole number of minutes
    """
    if offset % 60:
        raise ValueError("Unable to handle non-minute offset.")
    negative = offset < 0 or unnecessary_negative_timezone
    if negative:
        # For a positive offset with a forced minus sign this makes the value
        # negative, which is what produces the doubled sign ('--700').
        offset = -offset
    sign = "-" if negative else "+"
    # Truncate toward zero (not floor) so negative values keep their hour part.
    hours = int(offset / 3600)
    minutes = int((offset / 60) % 60)
    return f"{sign}{hours:02d}{minutes:02d}".encode("ascii")
def parse_time_entry(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse event.

    The person is everything up to and including the last '>' that is
    followed by a space; the rest holds the timestamp and the timezone.

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error (malformed
      field date)
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    sep = value.rfind(b"> ")
    if sep == -1:
        # No time information at all: the whole value is the person.
        return (value, None, (None, False))
    try:
        person = value[: sep + 1]
        timetext, timezonetext = value[sep + 2 :].rsplit(b" ", 1)
        time = int(timetext)
        timezone, timezone_neg_utc = parse_timezone(timezonetext)
    except ValueError as exc:
        raise ObjectFormatException(exc) from exc
    return person, time, (timezone, timezone_neg_utc)
def parse_time_entry_broken(
    value: bytes,
) -> tuple[bytes, int | None, tuple[int | None, bool]]:
    """Parse event, accepting broken formats.

    This function handles various broken author/committer/tagger line formats:
    - Missing angle brackets around email
    - Unsigned timezones
    - Double-negative timezones

    Args:
      value: Bytes representing a git commit/tag line
    Raises:
      ObjectFormatException in case of parsing error
    Returns: Tuple of (author, time, (timezone, timezone_neg_utc))
    """
    match = _TIME_ENTRY_RE.match(value)
    if match is None:
        raise ObjectFormatException(f"Unable to parse time entry: {value!r}")
    person, timetext, timezonetext = match.group("person", "time", "timezone")
    time = int(timetext)
    timezone, timezone_neg_utc = parse_timezone_broken(timezonetext)
    return person, time, (timezone, timezone_neg_utc)
def format_time_entry(
    person: bytes, time: int, timezone_info: tuple[int, bool]
) -> bytes:
    """Format an event as ``<person> <time> <timezone>``.

    Args:
      person: Identity to put first on the line
      time: Timestamp, written as decimal digits
      timezone_info: Tuple of (timezone, timezone_neg_utc) passed on to
        format_timezone
    """
    timezone, timezone_neg_utc = timezone_info
    time_text = str(time).encode("ascii")
    zone_text = format_timezone(timezone, timezone_neg_utc)
    return b" ".join((person, time_text, zone_text))
def _parse_commit_fields(
    chunks: Iterable[bytes],
    parse_entry: Callable[[bytes], tuple[bytes, int | None, tuple[int | None, bool]]],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object, using parse_entry for author/committer lines.

    Shared implementation behind _parse_commit and _parse_commit_broken, which
    differ only in how they parse the author and committer headers.

    Args:
      chunks: Chunks to parse
      parse_entry: Function turning an author/committer header value into
        (person, time, (timezone, timezone_neg_utc))
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    Raises:
      ObjectFormatException: if a header that requires a value has none
    """
    parents = []
    extra: list[tuple[bytes, bytes]] = []
    tree = None
    author_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    commit_info: tuple[bytes | None, int | None, tuple[int | None, bool | None]] = (
        None,
        None,
        (None, None),
    )
    encoding = None
    mergetag = []
    message = None
    gpgsig = None

    for field, value in _parse_message(chunks):
        # TODO(jelmer): Enforce ordering
        if field == _TREE_HEADER:
            tree = value
        elif field == _PARENT_HEADER:
            if value is None:
                raise ObjectFormatException("missing parent value")
            parents.append(value)
        elif field == _AUTHOR_HEADER:
            if value is None:
                raise ObjectFormatException("missing author value")
            author_info = parse_entry(value)
        elif field == _COMMITTER_HEADER:
            if value is None:
                raise ObjectFormatException("missing committer value")
            commit_info = parse_entry(value)
        elif field == _ENCODING_HEADER:
            encoding = value
        elif field == _MERGETAG_HEADER:
            if value is None:
                raise ObjectFormatException("missing mergetag value")
            # The header value is an embedded tag object; the newline that the
            # message parser stripped from the value is put back before parsing.
            tag = Tag.from_string(value + b"\n")
            assert isinstance(tag, Tag)
            mergetag.append(tag)
        elif field == _GPGSIG_HEADER:
            gpgsig = value
        elif field is None:
            message = value
        else:
            # Unknown headers are preserved, in order, as extra fields.
            if value is None:
                raise ObjectFormatException(f"missing value for field {field!r}")
            extra.append((field, value))
    return (
        tree,
        parents,
        author_info,
        commit_info,
        encoding,
        mergetag,
        gpgsig,
        message,
        extra,
    )


def _parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Author and committer lines are parsed with parse_time_entry.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    return _parse_commit_fields(chunks, parse_time_entry)


def _parse_commit_broken(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks, accepting broken formats.

    Author and committer lines are parsed with parse_time_entry_broken, which
    handles various broken author/committer line formats:
    - Missing angle brackets around email
    - Unsigned timezones
    - Double-negative timezones

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra)
    """
    return _parse_commit_fields(chunks, parse_time_entry_broken)
class Commit(ShaFile):
    """A git commit object.

    Stores the tree, the parent ids, the author and committer identities
    together with their timestamps and timezones, the optional encoding,
    merge tags and signature, any unrecognised extra headers, and the
    commit message.
    """

    type_name = b"commit"
    type_num = 1

    __slots__ = (
        "_author",
        "_author_time",
        "_author_timezone",
        "_author_timezone_neg_utc",
        "_commit_time",
        "_commit_timezone",
        "_commit_timezone_neg_utc",
        "_committer",
        "_encoding",
        "_extra",
        "_gpgsig",
        "_mergetag",
        "_message",
        "_parents",
        "_tree",
    )

    def __init__(self) -> None:
        """Initialize an empty Commit.

        Only the list-valued and optional fields receive defaults here; the
        tree, author, committer, time, timezone and message slots are not
        assigned by this constructor.
        """
        super().__init__()
        self._parents: list[ObjectID] = []
        self._encoding: bytes | None = None
        self._mergetag: list[Tag] = []
        self._gpgsig: bytes | None = None
        self._extra: list[tuple[bytes, bytes]] = []
        self._author_timezone_neg_utc: bool | None = False
        self._commit_timezone_neg_utc: bool | None = False

    @classmethod
    def from_path(
        cls,
        path: str | bytes,
        sha: ObjectID | None = None,
        *,
        object_format: ObjectFormat | None = None,
    ) -> "Commit":
        """Read a commit from a file on disk.

        Args:
          path: Path to the commit file
          sha: Optional known SHA for the object
          object_format: Optional object format to use
        Returns:
          A Commit object
        Raises:
          NotCommitError: If the file is not a commit
        """
        commit = ShaFile.from_path(path, sha, object_format=object_format)
        if not isinstance(commit, cls):
            raise NotCommitError(_path_to_bytes(path))
        return commit

    def _deserialize(self, chunks: list[bytes]) -> None:
        """Populate this commit's fields from its serialized chunks.

        Args:
          chunks: Raw chunks making up the serialized commit
        """
        # Call the private parser directly: the public ``parse_commit`` is
        # marked deprecated via ``replace_me`` and merely forwards to
        # ``_parse_commit``, so going through it would route every commit
        # load through a deprecated entry point.
        (
            tree,
            parents,
            author_info,
            commit_info,
            encoding,
            mergetag,
            gpgsig,
            message,
            extra,
        ) = _parse_commit(chunks)

        self._tree = tree
        self._parents = [ObjectID(p) for p in parents]
        self._encoding = encoding
        self._mergetag = mergetag
        self._gpgsig = gpgsig
        self._message = message
        self._extra = extra

        # Each identity entry is (identity, time, (timezone, neg_utc flag)).
        (
            self._author,
            self._author_time,
            (self._author_timezone, self._author_timezone_neg_utc),
        ) = author_info
        (
            self._committer,
            self._commit_time,
            (self._commit_timezone, self._commit_timezone_neg_utc),
        ) = commit_info

    def check(self) -> None:
        """Check this object for internal consistency.

        Verifies that the mandatory fields are present, that the tree and
        parent ids, the identities and the timestamps are well formed, and
        that the headers in the serialized text appear in the expected
        order.

        Raises:
          ObjectFormatException: if the object is malformed in some way
        """
        super().check()
        assert self._chunked_text is not None
        self._check_has_member("_tree", "missing tree")
        self._check_has_member("_author", "missing author")
        self._check_has_member("_committer", "missing committer")
        self._check_has_member("_author_time", "missing author time")
        self._check_has_member("_commit_time", "missing commit time")

        for parent in self._parents:
            check_hexsha(parent, "invalid parent sha")
        assert self._tree is not None  # checked by _check_has_member above
        check_hexsha(self._tree, "invalid tree sha")

        assert self._author is not None  # checked by _check_has_member above
        assert self._committer is not None  # checked by _check_has_member above
        check_identity(self._author, "invalid author")
        check_identity(self._committer, "invalid committer")

        assert self._author_time is not None  # checked by _check_has_member above
        assert self._commit_time is not None  # checked by _check_has_member above
        check_time(self._author_time)
        check_time(self._commit_time)

        # Walk the headers again, remembering the previous field so that the
        # relative order tree -> parent* -> author -> committer -> encoding
        # can be enforced.
        last = None
        for field, _ in _parse_message(self._chunked_text):
            if field == _TREE_HEADER and last is not None:
                raise ObjectFormatException("unexpected tree")
            elif field == _PARENT_HEADER and last not in (
                _PARENT_HEADER,
                _TREE_HEADER,
            ):
                raise ObjectFormatException("unexpected parent")
            elif field == _AUTHOR_HEADER and last not in (
                _TREE_HEADER,
                _PARENT_HEADER,
            ):
                raise ObjectFormatException("unexpected author")
            elif field == _COMMITTER_HEADER and last != _AUTHOR_HEADER:
                raise ObjectFormatException("unexpected committer")
            elif field == _ENCODING_HEADER and last != _COMMITTER_HEADER:
                raise ObjectFormatException("unexpected encoding")
            last = field

        # TODO: optionally check for duplicate parents

    def sign(self, keyid: str | None = None) -> None:
        """Sign this commit with a GPG key.

        The detached, armored signature is stored in ``self.gpgsig``.

        Args:
          keyid: Optional GPG key ID to use for signing. If not specified,
                 the default GPG key will be used.
        """
        import gpg

        with gpg.Context(armor=True) as c:
            if keyid is not None:
                # Resolve the key with the outer context, then sign with a
                # second context restricted to that signer.
                key = c.get_key(keyid)
                with gpg.Context(armor=True, signers=[key]) as ctx:
                    self.gpgsig, _unused_result = ctx.sign(
                        self.as_raw_string(),
                        mode=gpg.constants.sig.mode.DETACH,
                    )
            else:
                self.gpgsig, _unused_result = c.sign(
                    self.as_raw_string(), mode=gpg.constants.sig.mode.DETACH
                )

    def raw_without_sig(self) -> bytes:
        """Return raw string serialization without the GPG/SSH signature.

        self.gpgsig is a signature for the returned raw byte string serialization.
        """
        # Work on a copy so this commit keeps its signature.
        tmp = self.copy()
        assert isinstance(tmp, Commit)
        tmp._gpgsig = None
        # NOTE(review): the property assignment presumably also flags the
        # copy for re-serialization - confirm against serializable_property.
        tmp.gpgsig = None
        return tmp.as_raw_string()

    def extract_signature(self) -> tuple[bytes, bytes | None, bytes | None]:
        """Extract the payload, signature, and signature type from this commit.

        Returns:
          tuple of (``payload``, ``signature``, ``signature_type``) where:

          - ``payload``: The raw commit data without the signature
          - ``signature``: The signature bytes if present, None otherwise
          - ``signature_type``: SIGNATURE_PGP for PGP, SIGNATURE_SSH for SSH, None if no signature

        Raises:
          ObjectFormatException: If signature has unknown format
        """
        if self._gpgsig is None:
            return self.as_raw_string(), None, None

        payload = self.raw_without_sig()

        # Determine signature type
        if self._gpgsig.startswith(BEGIN_PGP_SIGNATURE):
            sig_type = SIGNATURE_PGP
        elif self._gpgsig.startswith(BEGIN_SSH_SIGNATURE):
            sig_type = SIGNATURE_SSH
        else:
            raise ObjectFormatException("Unknown signature format")

        return payload, self._gpgsig, sig_type

    def verify(self, keyids: Iterable[str] | None = None) -> None:
        """Verify GPG signature for this commit (if it is signed).

        An unsigned commit returns immediately without any verification.

        Args:
          keyids: Optional iterable of trusted keyids for this commit.
            If this commit is not signed by any key in keyids verification will
            fail. If not specified, this function only verifies that the commit
            has a valid signature.

        Raises:
          gpg.errors.BadSignatures: if GPG signature verification fails
          gpg.errors.MissingSignatures: if commit was not signed by a key
            specified in keyids
        """
        if self._gpgsig is None:
            return

        import gpg

        with gpg.Context() as ctx:
            data, result = ctx.verify(
                self.raw_without_sig(),
                signature=self._gpgsig,
            )
            if keyids:
                keys = [ctx.get_key(key) for key in keyids]
                # Accept as soon as any signing-capable subkey of a trusted
                # key matches the fingerprint of one of the signatures.
                for key in keys:
                    for subkey in key.subkeys:
                        for sig in result.signatures:
                            if subkey.can_sign and subkey.fpr == sig.fpr:
                                return
                raise gpg.errors.MissingSignatures(result, keys, results=(data, result))

    def _serialize(self) -> list[bytes]:
        """Build the serialized chunks for this commit.

        Headers are emitted in the order tree, parents, author, committer,
        encoding, mergetags, extra headers, gpgsig, followed by the message.

        Returns:
          List of byte chunks produced by ``_format_message``
        """
        headers = []
        assert self._tree is not None
        # The tree may have been assigned either as a Tree object or as an id.
        tree_bytes = self._tree.id if isinstance(self._tree, Tree) else self._tree
        headers.append((_TREE_HEADER, tree_bytes))
        for p in self._parents:
            headers.append((_PARENT_HEADER, p))
        assert self._author is not None
        assert self._author_time is not None
        assert self._author_timezone is not None
        assert self._author_timezone_neg_utc is not None
        headers.append(
            (
                _AUTHOR_HEADER,
                format_time_entry(
                    self._author,
                    self._author_time,
                    (self._author_timezone, self._author_timezone_neg_utc),
                ),
            )
        )
        assert self._committer is not None
        assert self._commit_time is not None
        assert self._commit_timezone is not None
        assert self._commit_timezone_neg_utc is not None
        headers.append(
            (
                _COMMITTER_HEADER,
                format_time_entry(
                    self._committer,
                    self._commit_time,
                    (self._commit_timezone, self._commit_timezone_neg_utc),
                ),
            )
        )
        if self.encoding:
            headers.append((_ENCODING_HEADER, self.encoding))
        for mergetag in self.mergetag:
            # Drop the final byte of the tag's raw form before embedding it
            # as a header value (parsing re-appends b"\n").
            headers.append((_MERGETAG_HEADER, mergetag.as_raw_string()[:-1]))
        headers.extend(
            (field, value) for field, value in self._extra if value is not None
        )
        if self.gpgsig:
            headers.append((_GPGSIG_HEADER, self.gpgsig))
        return list(_format_message(headers, self._message))

    tree = serializable_property("tree", "Tree that is the state of this commit")

    def _get_parents(self) -> list[ObjectID]:
        """Return a list of parents of this commit."""
        return self._parents

    def _set_parents(self, value: list[ObjectID]) -> None:
        """Set a list of parents of this commit."""
        # Mark the cached serialization as stale before replacing the list.
        self._needs_serialization = True
        self._parents = value

    parents = property(
        _get_parents,
        _set_parents,
        doc="Parents of this commit, by their SHA1.",
    )

    # NOTE(review): kept as a single return expression because it is wrapped
    # by replace_me, which may inspect the body - confirm before changing.
    @replace_me(since="0.21.0", remove_in="0.24.0")
    def _get_extra(self) -> list[tuple[bytes, bytes]]:
        """Return extra settings of this commit."""
        return self._extra

    extra = property(
        _get_extra,
        doc="Extra header fields not understood (presumably added in a "
        "newer version of git). Kept verbatim so the object can "
        "be correctly reserialized. For private commit metadata, use "
        "pseudo-headers in Commit.message, rather than this field.",
    )

    author = serializable_property("author", "The name of the author of the commit")

    committer = serializable_property(
        "committer", "The name of the committer of the commit"
    )

    message = serializable_property("message", "The commit message")

    commit_time = serializable_property(
        "commit_time",
        "The timestamp of the commit. As the number of seconds since the epoch.",
    )

    commit_timezone = serializable_property(
        "commit_timezone", "The zone the commit time is in"
    )

    author_time = serializable_property(
        "author_time",
        "The timestamp the commit was written. As the number of "
        "seconds since the epoch.",
    )

    author_timezone = serializable_property(
        "author_timezone", "Returns the zone the author time is in."
    )

    encoding = serializable_property("encoding", "Encoding of the commit message.")

    mergetag = serializable_property("mergetag", "Associated signed tag.")

    gpgsig = serializable_property("gpgsig", "GPG Signature.")
# Every concrete object class defined in this module.
OBJECT_CLASSES = (Commit, Tree, Blob, Tag)

# Lookup table resolving either a type name (bytes) or a type number (int)
# to the class that implements it.
_TYPE_MAP: dict[bytes | int, type[ShaFile]] = {}

for cls in OBJECT_CLASSES:
    _TYPE_MAP.update({cls.type_name: cls, cls.type_num: cls})
- # Public API functions
# NOTE(review): the body is deliberately a single return expression; the
# replace_me decorator presumably derives the suggested replacement from it -
# confirm before restructuring.
@replace_me(since="0.21.0", remove_in="0.24.0")
def parse_commit(
    chunks: Iterable[bytes],
) -> tuple[
    bytes | None,
    list[bytes],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    tuple[bytes | None, int | None, tuple[int | None, bool | None]],
    bytes | None,
    list[Tag],
    bytes | None,
    bytes | None,
    list[tuple[bytes, bytes]],
]:
    """Parse a commit object from chunks.

    Public wrapper, marked for replacement since 0.21.0 and removal in
    0.24.0, that forwards unchanged to ``_parse_commit``.

    Args:
      chunks: Chunks to parse
    Returns: Tuple of (tree, parents, author_info, commit_info,
        encoding, mergetag, gpgsig, message, extra), where author_info and
        commit_info are each (identity, time, (timezone, neg_utc flag))
    """
    return _parse_commit(chunks)
def parse_commit_broken(data: bytes) -> Commit:
    """Build a Commit from raw data whose author/committer lines may be malformed.

    The data is run through the lenient parser and the resulting fields are
    stored directly on a fresh ``Commit``. Broken formats seen in the wild
    that this is meant to accept:

    - Missing angle brackets around email addresses
    - Unsigned timezones (e.g., "0000" instead of "+0000")
    - Double-negative timezones (e.g., "--700")
    - Negative timestamps
    - Long/short/nonsensical timezone values

    Warning: Commits parsed with this function may not round-trip correctly
    through serialization, as the broken formatting is normalized during parsing.
    The .check() method will likely fail for commits with malformed identity fields.

    Args:
      data: Raw commit data as bytes

    Returns:
      A Commit object with normalized fields

    Example:
      >>> data = b'''tree d80c186a03f423a81b39df39dc87fd269736ca86
      ... author user@example.com 1234567890 -0500
      ... committer user@example.com 1234567890 -0500
      ...
      ... Commit message
      ... '''
      >>> commit = parse_commit_broken(data)
      >>> commit.author
      b'user@example.com'
    """
    commit = Commit()

    # The whole payload is handed over as a single chunk.
    parsed = _parse_commit_broken([data])
    (
        tree,
        parent_ids,
        author_entry,
        committer_entry,
        encoding,
        mergetags,
        signature,
        message,
        extra_headers,
    ) = parsed

    commit._tree = tree
    commit._parents = list(map(ObjectID, parent_ids))
    commit._encoding = encoding
    commit._mergetag = mergetags
    commit._gpgsig = signature
    commit._message = message
    commit._extra = extra_headers

    # Identity entries are (identity, time, (timezone, neg_utc flag)).
    author, author_time, (author_tz, author_tz_neg_utc) = author_entry
    commit._author = author
    commit._author_time = author_time
    commit._author_timezone = author_tz
    commit._author_timezone_neg_utc = author_tz_neg_utc

    committer, commit_time, (commit_tz, commit_tz_neg_utc) = committer_entry
    commit._committer = committer
    commit._commit_time = commit_time
    commit._commit_timezone = commit_tz
    commit._commit_timezone_neg_utc = commit_tz_neg_utc

    return commit
# Keep references to the pure-Python implementations so tests can still
# reach them after the names below are rebound.
_parse_tree_py, _sorted_tree_items_py = parse_tree, sorted_tree_items

try:
    # Prefer the Rust implementations when the extension module is available.
    from dulwich._objects import parse_tree as _parse_tree_rs
    from dulwich._objects import sorted_tree_items as _sorted_tree_items_rs
except ImportError:
    # Extension not importable: stay on the pure-Python versions.
    pass
else:
    parse_tree, sorted_tree_items = _parse_tree_rs, _sorted_tree_items_rs
|