| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021 |
- # protocol.py -- Shared parts of the git protocols
- # Copyright (C) 2008 John Carr <john.carr@unrouted.co.uk>
- # Copyright (C) 2008-2012 Jelmer Vernooij <jelmer@jelmer.uk>
- #
- # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
- # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
- # General Public License as published by the Free Software Foundation; version 2.0
- # or (at your option) any later version. You can redistribute it and/or
- # modify it under the terms of either of these two licenses.
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- # You should have received a copy of the licenses; if not, see
- # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
- # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
- # License, Version 2.0.
- #
- """Generic functions for talking the git smart server protocol."""
# Names exported by a star-import of this module.
# NOTE(review): ZERO_SHA is defined in this module but is not listed here --
# confirm that leaving it out of the public API is intentional.
__all__ = [
    "CAPABILITIES_REF",
    "CAPABILITY_AGENT",
    "CAPABILITY_ALLOW_REACHABLE_SHA1_IN_WANT",
    "CAPABILITY_ALLOW_TIP_SHA1_IN_WANT",
    "CAPABILITY_ATOMIC",
    "CAPABILITY_DEEPEN_NOT",
    "CAPABILITY_DEEPEN_RELATIVE",
    "CAPABILITY_DEEPEN_SINCE",
    "CAPABILITY_DELETE_REFS",
    "CAPABILITY_FETCH",
    "CAPABILITY_FILTER",
    "CAPABILITY_INCLUDE_TAG",
    "CAPABILITY_MULTI_ACK",
    "CAPABILITY_MULTI_ACK_DETAILED",
    "CAPABILITY_NO_DONE",
    "CAPABILITY_NO_PROGRESS",
    "CAPABILITY_OBJECT_FORMAT",
    "CAPABILITY_OFS_DELTA",
    "CAPABILITY_QUIET",
    "CAPABILITY_REPORT_STATUS",
    "CAPABILITY_SHALLOW",
    "CAPABILITY_SIDE_BAND",
    "CAPABILITY_SIDE_BAND_64K",
    "CAPABILITY_SYMREF",
    "CAPABILITY_THIN_PACK",
    "COMMAND_DEEPEN",
    "COMMAND_DEEPEN_NOT",
    "COMMAND_DEEPEN_SINCE",
    "COMMAND_DONE",
    "COMMAND_FILTER",
    "COMMAND_HAVE",
    "COMMAND_SHALLOW",
    "COMMAND_UNSHALLOW",
    "COMMAND_WANT",
    "COMMON_CAPABILITIES",
    "DEFAULT_GIT_PROTOCOL_VERSION_FETCH",
    "DEFAULT_GIT_PROTOCOL_VERSION_SEND",
    "DEPTH_INFINITE",
    "GIT_PROTOCOL_VERSIONS",
    "KNOWN_RECEIVE_CAPABILITIES",
    "KNOWN_UPLOAD_CAPABILITIES",
    "MULTI_ACK",
    "MULTI_ACK_DETAILED",
    "NAK_LINE",
    "PEELED_TAG_SUFFIX",
    "SIDE_BAND_CHANNEL_DATA",
    "SIDE_BAND_CHANNEL_FATAL",
    "SIDE_BAND_CHANNEL_PROGRESS",
    "SINGLE_ACK",
    "TCP_GIT_PORT",
    "BufferedPktLineWriter",
    "PktLineParser",
    "Protocol",
    "ReceivableProtocol",
    "ack_type",
    "agent_string",
    "capability_agent",
    "capability_object_format",
    "capability_symref",
    "extract_capabilities",
    "extract_capability_names",
    "extract_want_line_capabilities",
    "find_capability",
    "format_ack_line",
    "format_capability_line",
    "format_cmd_pkt",
    "format_ref_line",
    "format_shallow_line",
    "format_unshallow_line",
    "parse_capability",
    "parse_cmd_pkt",
    "pkt_line",
    "pkt_seq",
    "serialize_refs",
    "split_peeled_refs",
    "strip_peeled_refs",
    "symref_capabilities",
    "write_info_refs",
]
- import types
- from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
- from io import BytesIO
- from os import SEEK_END
- from typing import TYPE_CHECKING
- import dulwich
- from .errors import GitProtocolError, HangupException
- from .objects import ObjectID
- if TYPE_CHECKING:
- from .pack import ObjectContainer
- from .refs import Ref
# Port number used by the TCP git protocol (git://)
TCP_GIT_PORT = 9418

# Git protocol version 0 is the original Git protocol, which lacked a
# version number until Git protocol version 1 was introduced by Brandon
# Williams in 2017.
#
# Protocol version 1 is simply the original v0 protocol with the addition of
# a single packet line, which precedes the ref advertisement, indicating the
# protocol version being used. This was done in preparation for protocol v2.
#
# Git protocol version 2 was first introduced by Brandon Williams in 2018 and
# adds many features. See the gitprotocol-v2(5) manual page for details.
# As of 2024, Git only implements version 2 during 'git fetch' and still uses
# version 0 during 'git push'.

# Every protocol version described above
GIT_PROTOCOL_VERSIONS = [0, 1, 2]
# The defaults mirror what the comment above says Git itself does:
# version 2 when fetching, version 0 when pushing.
DEFAULT_GIT_PROTOCOL_VERSION_FETCH = 2
DEFAULT_GIT_PROTOCOL_VERSION_SEND = 0
# Suffix used in the Git protocol to indicate peeled tag references
PEELED_TAG_SUFFIX = b"^{}"

# Object ID made of forty ASCII "0" characters
ZERO_SHA: ObjectID = ObjectID(b"0" * 40)

# Acknowledgement modes; these are the values ack_type() returns
SINGLE_ACK = 0
MULTI_ACK = 1
MULTI_ACK_DETAILED = 2

# Side-band channel numbers
# pack data
SIDE_BAND_CHANNEL_DATA = 1
# progress messages
SIDE_BAND_CHANNEL_PROGRESS = 2
# fatal error message just before stream aborts
SIDE_BAND_CHANNEL_FATAL = 3
# Capability names, spelled exactly as they appear on the wire
CAPABILITY_ATOMIC = b"atomic"
CAPABILITY_DEEPEN_SINCE = b"deepen-since"
CAPABILITY_DEEPEN_NOT = b"deepen-not"
CAPABILITY_DEEPEN_RELATIVE = b"deepen-relative"
CAPABILITY_DELETE_REFS = b"delete-refs"
CAPABILITY_INCLUDE_TAG = b"include-tag"
CAPABILITY_MULTI_ACK = b"multi_ack"
CAPABILITY_MULTI_ACK_DETAILED = b"multi_ack_detailed"
CAPABILITY_NO_DONE = b"no-done"
CAPABILITY_NO_PROGRESS = b"no-progress"
CAPABILITY_OFS_DELTA = b"ofs-delta"
CAPABILITY_QUIET = b"quiet"
CAPABILITY_REPORT_STATUS = b"report-status"
CAPABILITY_SHALLOW = b"shallow"
CAPABILITY_SIDE_BAND = b"side-band"
CAPABILITY_SIDE_BAND_64K = b"side-band-64k"
CAPABILITY_THIN_PACK = b"thin-pack"
# The capabilities below may carry a value after "=" (see capability_agent,
# capability_symref and capability_object_format).
CAPABILITY_AGENT = b"agent"
CAPABILITY_SYMREF = b"symref"
CAPABILITY_ALLOW_TIP_SHA1_IN_WANT = b"allow-tip-sha1-in-want"
CAPABILITY_ALLOW_REACHABLE_SHA1_IN_WANT = b"allow-reachable-sha1-in-want"
CAPABILITY_FETCH = b"fetch"
CAPABILITY_FILTER = b"filter"
CAPABILITY_OBJECT_FORMAT = b"object-format"

# Magic ref that is used to attach capabilities to when
# there are no refs. Should always be set to ZERO_SHA.
CAPABILITIES_REF = b"capabilities^{}"
# Capabilities shared by the upload-pack and receive-pack capability sets below
COMMON_CAPABILITIES = [
    CAPABILITY_OFS_DELTA,
    CAPABILITY_SIDE_BAND,
    CAPABILITY_SIDE_BAND_64K,
    CAPABILITY_AGENT,
    CAPABILITY_NO_PROGRESS,
]

# Capability names recognised for upload-pack (fetch side)
KNOWN_UPLOAD_CAPABILITIES = {
    *COMMON_CAPABILITIES,
    CAPABILITY_THIN_PACK,
    CAPABILITY_MULTI_ACK,
    CAPABILITY_MULTI_ACK_DETAILED,
    CAPABILITY_INCLUDE_TAG,
    CAPABILITY_DEEPEN_SINCE,
    CAPABILITY_SYMREF,
    CAPABILITY_SHALLOW,
    CAPABILITY_DEEPEN_NOT,
    CAPABILITY_DEEPEN_RELATIVE,
    CAPABILITY_ALLOW_TIP_SHA1_IN_WANT,
    CAPABILITY_ALLOW_REACHABLE_SHA1_IN_WANT,
    CAPABILITY_FETCH,
    CAPABILITY_FILTER,
}

# Capability names recognised for receive-pack (push side)
KNOWN_RECEIVE_CAPABILITIES = {
    *COMMON_CAPABILITIES,
    CAPABILITY_REPORT_STATUS,
    CAPABILITY_DELETE_REFS,
    CAPABILITY_QUIET,
    CAPABILITY_ATOMIC,
}
# Largest signed 32-bit integer (2**31 - 1).
# NOTE(review): presumably the sentinel for "no depth limit" -- confirm
# against the callers that consume it.
DEPTH_INFINITE = 0x7FFFFFFF
# The text "NAK" followed by a newline, without pkt-line framing
NAK_LINE = b"NAK\n"
def agent_string() -> bytes:
    """Build the agent identifier advertised by dulwich.

    Returns:
      ``dulwich/`` followed by the dot-joined components of
      ``dulwich.__version__``, encoded as ASCII bytes
    """
    dotted_version = ".".join(str(component) for component in dulwich.__version__)
    return f"dulwich/{dotted_version}".encode("ascii")
def capability_agent() -> bytes:
    """Build the ``agent=<agent string>`` capability.

    Returns:
      The agent capability name, an equals sign, and the dulwich agent string
    """
    return b"=".join((CAPABILITY_AGENT, agent_string()))
def capability_object_format(fmt: str) -> bytes:
    """Build the ``object-format=<fmt>`` capability.

    Args:
      fmt: Object format name (e.g., "sha1" or "sha256"); must be ASCII
    Returns:
      The object-format capability name, an equals sign, and the encoded
      format name
    """
    encoded_format = fmt.encode("ascii")
    return b"=".join((CAPABILITY_OBJECT_FORMAT, encoded_format))
def capability_symref(from_ref: bytes, to_ref: bytes) -> bytes:
    """Build a ``symref=<from>:<to>`` capability.

    Args:
      from_ref: Name of the symbolic reference
      to_ref: Name of the reference it points at
    Returns:
      The symref capability string
    """
    return b"".join((CAPABILITY_SYMREF, b"=", from_ref, b":", to_ref))
def extract_capability_names(capabilities: Iterable[bytes]) -> set[bytes]:
    """Collect the names of the given capabilities, dropping any values.

    Args:
      capabilities: Capability strings, each either ``name`` or ``name=value``
    Returns:
      Set containing the name part of every capability
    """
    return {name for name, _value in map(parse_capability, capabilities)}
- def parse_capability(capability: bytes) -> tuple[bytes, bytes | None]:
- """Parse a capability string into name and value.
- Args:
- capability: Capability string
- Returns:
- Tuple of (capability_name, capability_value)
- """
- parts = capability.split(b"=", 1)
- if len(parts) == 1:
- return (parts[0], None)
- return (parts[0], parts[1])
def symref_capabilities(symrefs: Iterable[tuple[bytes, bytes]]) -> list[bytes]:
    """Turn symref pairs into symref capability strings.

    Args:
      symrefs: Iterable of (from_ref, to_ref) tuples
    Returns:
      One symref capability string per pair, in input order
    """
    capabilities = []
    for pair in symrefs:
        capabilities.append(capability_symref(*pair))
    return capabilities
# Command keywords used as the leading word of protocol lines
# (see format_shallow_line and format_unshallow_line for two of them)
COMMAND_DEEPEN = b"deepen"
COMMAND_DEEPEN_SINCE = b"deepen-since"
COMMAND_DEEPEN_NOT = b"deepen-not"
COMMAND_SHALLOW = b"shallow"
COMMAND_UNSHALLOW = b"unshallow"
COMMAND_DONE = b"done"
COMMAND_WANT = b"want"
COMMAND_HAVE = b"have"
COMMAND_FILTER = b"filter"
def format_cmd_pkt(cmd: bytes, *args: bytes) -> bytes:
    """Serialise a command and its arguments into a command packet payload.

    The command is followed by a single space, and every argument is
    terminated by a NUL byte.

    Args:
      cmd: Command name
      *args: Command arguments
    Returns:
      The payload, not yet wrapped in a pkt-line
    """
    pieces = [cmd, b" "]
    for arg in args:
        pieces.append(arg + b"\0")
    return b"".join(pieces)
def parse_cmd_pkt(line: bytes) -> tuple[bytes, list[bytes]]:
    """Split a command packet payload into the command and its arguments.

    Everything before the first space is the command; the remainder must end
    with a NUL byte and is split on NUL bytes into the arguments.

    Args:
      line: Command line to parse
    Returns:
      Tuple of (command, [arguments])
    """
    space_index = line.find(b" ")
    command = line[:space_index]
    remainder = line[space_index + 1 :]
    # The argument section is NUL-terminated; drop that terminator before
    # splitting so it does not produce a trailing empty argument.
    assert remainder.endswith(b"\x00")
    arguments = remainder[:-1].split(b"\0")
    return command, arguments
- def pkt_line(data: bytes | None) -> bytes:
- """Wrap data in a pkt-line.
- Args:
- data: The data to wrap, as a str or None.
- Returns: The data prefixed with its length in pkt-line format; if data was
- None, returns the flush-pkt ('0000').
- """
- if data is None:
- return b"0000"
- return f"{len(data) + 4:04x}".encode("ascii") + data
def pkt_seq(*seq: bytes | None) -> bytes:
    """Frame each item as a pkt-line and terminate with a flush-pkt.

    Args:
      seq: The payloads to wrap, in order; a None item becomes a flush-pkt.
    Returns: The concatenated pkt-lines followed by a final flush-pkt.
    """
    framed = [pkt_line(item) for item in seq]
    framed.append(pkt_line(None))
    return b"".join(framed)
- class Protocol:
- """Class for interacting with a remote git process over the wire.
- Parts of the git wire protocol use 'pkt-lines' to communicate. A pkt-line
- consists of the length of the line as a 4-byte hex string, followed by the
- payload data. The length includes the 4-byte header. The special line
- '0000' indicates the end of a section of input and is called a 'flush-pkt'.
- For details on the pkt-line format, see the cgit distribution:
- Documentation/technical/protocol-common.txt
- """
- def __init__(
- self,
- read: Callable[[int], bytes],
- write: Callable[[bytes], int | None],
- close: Callable[[], None] | None = None,
- report_activity: Callable[[int, str], None] | None = None,
- ) -> None:
- """Initialize Protocol.
- Args:
- read: Function to read bytes from the transport
- write: Function to write bytes to the transport
- close: Optional function to close the transport
- report_activity: Optional function to report activity
- """
- self.read = read
- self.write = write
- self._close = close
- self.report_activity = report_activity
- self._readahead: BytesIO | None = None
- def close(self) -> None:
- """Close the underlying transport if a close function was provided."""
- if self._close:
- self._close()
- def __enter__(self) -> "Protocol":
- """Enter context manager."""
- return self
- def __exit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: types.TracebackType | None,
- ) -> None:
- """Exit context manager and close transport."""
- self.close()
- def read_pkt_line(self) -> bytes | None:
- """Reads a pkt-line from the remote git process.
- This method may read from the readahead buffer; see unread_pkt_line.
- Returns: The next string from the stream, without the length prefix, or
- None for a flush-pkt ('0000') or delim-pkt ('0001').
- """
- if self._readahead is None:
- read = self.read
- else:
- read = self._readahead.read
- self._readahead = None
- try:
- sizestr = read(4)
- if not sizestr:
- raise HangupException
- size = int(sizestr, 16)
- if size == 0 or size == 1: # flush-pkt or delim-pkt
- if self.report_activity:
- self.report_activity(4, "read")
- return None
- if self.report_activity:
- self.report_activity(size, "read")
- pkt_contents = read(size - 4)
- except ConnectionResetError as exc:
- raise HangupException from exc
- except OSError as exc:
- raise GitProtocolError(str(exc)) from exc
- else:
- if len(pkt_contents) + 4 != size:
- raise GitProtocolError(
- f"Length of pkt read {len(pkt_contents) + 4:04x} does not match length prefix {size:04x}"
- )
- return pkt_contents
- def eof(self) -> bool:
- """Test whether the protocol stream has reached EOF.
- Note that this refers to the actual stream EOF and not just a
- flush-pkt.
- Returns: True if the stream is at EOF, False otherwise.
- """
- try:
- next_line = self.read_pkt_line()
- except HangupException:
- return True
- self.unread_pkt_line(next_line)
- return False
- def unread_pkt_line(self, data: bytes | None) -> None:
- """Unread a single line of data into the readahead buffer.
- This method can be used to unread a single pkt-line into a fixed
- readahead buffer.
- Args:
- data: The data to unread, without the length prefix.
- Raises:
- ValueError: If more than one pkt-line is unread.
- """
- if self._readahead is not None:
- raise ValueError("Attempted to unread multiple pkt-lines.")
- self._readahead = BytesIO(pkt_line(data))
- def read_pkt_seq(self) -> Iterable[bytes]:
- """Read a sequence of pkt-lines from the remote git process.
- Returns: Yields each line of data up to but not including the next
- flush-pkt.
- """
- pkt = self.read_pkt_line()
- while pkt:
- yield pkt
- pkt = self.read_pkt_line()
- def write_pkt_line(self, line: bytes | None) -> None:
- """Sends a pkt-line to the remote git process.
- Args:
- line: A string containing the data to send, without the length
- prefix.
- """
- try:
- line = pkt_line(line)
- self.write(line)
- if self.report_activity:
- self.report_activity(len(line), "write")
- except OSError as exc:
- raise GitProtocolError(str(exc)) from exc
- def write_sideband(self, channel: int, blob: bytes) -> None:
- """Write multiplexed data to the sideband.
- Args:
- channel: An int specifying the channel to write to.
- blob: A blob of data (as a string) to send on this channel.
- """
- # a pktline can be a max of 65520. a sideband line can therefore be
- # 65520-5 = 65515
- # WTF: Why have the len in ASCII, but the channel in binary.
- while blob:
- self.write_pkt_line(bytes(bytearray([channel])) + blob[:65515])
- blob = blob[65515:]
- def send_cmd(self, cmd: bytes, *args: bytes) -> None:
- """Send a command and some arguments to a git server.
- Only used for the TCP git protocol (git://).
- Args:
- cmd: The remote service to access.
- args: List of arguments to send to remove service.
- """
- self.write_pkt_line(format_cmd_pkt(cmd, *args))
- def read_cmd(self) -> tuple[bytes, list[bytes]]:
- """Read a command and some arguments from the git client.
- Only used for the TCP git protocol (git://).
- Returns: A tuple of (command, [list of arguments]).
- """
- line = self.read_pkt_line()
- if line is None:
- raise GitProtocolError("Expected command, got flush packet")
- return parse_cmd_pkt(line)
_RBUFSIZE = 65536  # 64KB buffer for better network I/O performance


class ReceivableProtocol(Protocol):
    """Variant of Protocol that allows reading up to a size without blocking.

    This class has a recv() method that behaves like socket.recv() in addition
    to a read() method.

    If you want to read n bytes from the wire and block until exactly n bytes
    (or EOF) are read, use read(n). If you want to read at most n bytes from
    the wire but don't care if you get less, use recv(n). Note that recv(n)
    will still block until at least one byte is read.

    Both methods share one internal buffer (``_rbuf``), so data fetched by
    one and not yet returned is seen by the other.
    """

    def __init__(
        self,
        recv: Callable[[int], bytes],
        write: Callable[[bytes], int | None],
        close: Callable[[], None] | None = None,
        report_activity: Callable[[int, str], None] | None = None,
        rbufsize: int = _RBUFSIZE,
    ) -> None:
        """Initialize ReceivableProtocol.

        Args:
          recv: Function to receive bytes from the transport
          write: Function to write bytes to the transport
          close: Optional function to close the transport
          report_activity: Optional function to report activity
          rbufsize: Number of bytes recv() requests from the transport when
            its buffer is empty
        """
        # Hand our own buffered read() to the base class so that pkt-line
        # reads go through the same buffer that recv() uses.
        super().__init__(self.read, write, close=close, report_activity=report_activity)
        self._recv = recv
        # Unconsumed bytes live between the buffer's current position and
        # its end.
        self._rbuf = BytesIO()
        self._rbufsize = rbufsize

    def read(self, size: int) -> bytes:
        """Read exactly size bytes, or fewer if the transport reaches EOF.

        Args:
          size: Number of bytes to read; must be positive
        Returns:
          The bytes read. The result is shorter than size only when the
          recv callable returned no data before size bytes were collected.
        """
        # From _fileobj.read in socket.py in the Python 2.6.5 standard library,
        # with the following modifications:
        #  - omit the size <= 0 branch
        #  - seek back to start rather than 0 in case some buffer has been
        #    consumed.
        #  - use SEEK_END instead of the magic number.
        # Copyright (c) 2001-2010 Python Software Foundation; All Rights
        # Reserved
        # Licensed under the Python Software Foundation License.
        # TODO: see if buffer is more efficient than cBytesIO.
        assert size > 0

        # Our use of BytesIO rather than lists of string objects returned by
        # recv() minimizes memory usage and fragmentation that occurs when
        # rbufsize is large compared to the typical return value of recv().
        buf = self._rbuf
        # start marks the first unconsumed byte; everything before it has
        # already been handed out.
        start = buf.tell()
        buf.seek(0, SEEK_END)
        # buffer may have been partially consumed by recv()
        buf_len = buf.tell() - start
        if buf_len >= size:
            # Already have size bytes in our buffer?  Extract and return.
            buf.seek(start)
            rv = buf.read(size)
            # Carry the leftover bytes into a fresh buffer positioned at 0.
            self._rbuf = BytesIO()
            self._rbuf.write(buf.read())
            self._rbuf.seek(0)
            return rv

        self._rbuf = BytesIO()  # reset _rbuf.  we consume it via buf.
        while True:
            left = size - buf_len
            # recv() will malloc the amount of memory given as its
            # parameter even though it often returns much less data
            # than that.  The returned data string is short lived
            # as we copy it into a BytesIO and free it.  This avoids
            # fragmentation issues on many platforms.
            data = self._recv(left)
            if not data:
                # No more data: fall through and return what was collected.
                break
            n = len(data)
            if n == size and not buf_len:
                # Shortcut.  Avoid buffer data copies when:
                # - We have no data in our buffer.
                # AND
                # - Our call to recv returned exactly the
                #   number of bytes we were asked to read.
                return data
            if n == left:
                # This chunk completes the request.
                buf.write(data)
                del data  # explicit free
                break
            # The transport must never hand back more than was asked for.
            assert n <= left, f"_recv({left}) returned {n} bytes"
            buf.write(data)
            buf_len += n
            del data  # explicit free
            # assert buf_len == buf.tell()
        # Return everything from the first unconsumed byte to the end.
        buf.seek(start)
        return buf.read()

    def recv(self, size: int) -> bytes:
        """Return at most size bytes, touching the transport only if needed.

        Args:
          size: Maximum number of bytes to receive; must be positive
        Returns:
          Buffered bytes if any are pending, otherwise bytes from a single
          call to the recv callable. Empty when that call returned no data.
        """
        assert size > 0

        buf = self._rbuf
        # Measure the unconsumed bytes without moving the read position.
        start = buf.tell()
        buf.seek(0, SEEK_END)
        buf_len = buf.tell()
        buf.seek(start)

        left = buf_len - start
        if not left:
            # only read from the wire if our read buffer is exhausted
            data = self._recv(self._rbufsize)
            if len(data) == size:
                # shortcut: skip the buffer if we read exactly size bytes
                return data
            # Keep whatever exceeds size in a fresh buffer for later calls.
            buf = BytesIO()
            buf.write(data)
            buf.seek(0)
            del data  # explicit free
            self._rbuf = buf
        return buf.read(size)
def extract_capabilities(text: bytes) -> tuple[bytes, list[bytes]]:
    """Separate a NUL-delimited capabilities list from a line, if present.

    Args:
      text: Line to extract from
    Returns: Tuple of the text before the NUL byte and the list of
      space-separated capabilities after it. Without a NUL byte the text is
      returned untouched with an empty list.
    """
    if b"\0" in text:
        # Trailing whitespace (such as the line terminator) is not part of
        # the last capability.
        line_part, capability_part = text.rstrip().split(b"\0")
        return (line_part, capability_part.strip().split(b" "))
    return (text, [])
def extract_want_line_capabilities(text: bytes) -> tuple[bytes, list[bytes]]:
    """Separate the capabilities from a want line, if present.

    Unlike other lines, want lines separate their capabilities from the rest
    of the line with a space rather than a NUL byte:

        want obj-id cap1 cap2 ...

    Args:
      text: Want line to extract from
    Returns: Tuple of the first two space-separated fields joined by a space
      and the list of remaining fields. A line with fewer than three fields
      is returned untouched with an empty list.
    """
    fields = text.rstrip().split(b" ")
    if len(fields) >= 3:
        return (b" ".join(fields[:2]), fields[2:])
    return (text, [])
def ack_type(capabilities: Iterable[bytes]) -> int:
    """Extract the ack type from a capabilities list.

    The capabilities are walked exactly once, so one-shot iterators (for
    example generators) are handled as well as lists and sets.

    Args:
      capabilities: Capability names to inspect
    Returns:
      MULTI_ACK_DETAILED if ``multi_ack_detailed`` is present, otherwise
      MULTI_ACK if ``multi_ack`` is present, otherwise SINGLE_ACK
    """
    saw_multi_ack = False
    for capability in capabilities:
        if capability == CAPABILITY_MULTI_ACK_DETAILED:
            # The detailed variant wins regardless of what else is listed.
            return MULTI_ACK_DETAILED
        if capability == CAPABILITY_MULTI_ACK:
            # Keep scanning: multi_ack_detailed may still follow.
            saw_multi_ack = True
    return MULTI_ACK if saw_multi_ack else SINGLE_ACK
- def find_capability(
- capabilities: Iterable[bytes], *capability_names: bytes
- ) -> bytes | None:
- """Find a capability value in a list of capabilities.
- This function looks for capabilities that may include arguments after an equals sign
- and returns only the value part (after the '='). For capabilities without values,
- returns the capability name itself.
- Args:
- capabilities: List of capability strings
- capability_names: Capability name(s) to search for
- Returns:
- The value after '=' if found, or the capability name if no '=', or None if not found
- Example:
- >>> caps = [b'filter=blob:none', b'agent=git/2.0', b'thin-pack']
- >>> find_capability(caps, b'filter')
- b'blob:none'
- >>> find_capability(caps, b'thin-pack')
- b'thin-pack'
- >>> find_capability(caps, b'missing')
- None
- """
- for cap in capabilities:
- for name in capability_names:
- if cap == name:
- return cap
- elif cap.startswith(name + b"="):
- return cap[len(name) + 1 :]
- return None
class BufferedPktLineWriter:
    """Writer that wraps its data in pkt-lines and has an independent buffer.

    Consecutive calls to write() wrap the data in a pkt-line and then buffers
    it until enough lines have been written such that their total length
    (including length prefix) reach the buffer size.
    """

    def __init__(
        self, write: Callable[[bytes], int | None], bufsize: int = 65515
    ) -> None:
        """Initialize the BufferedPktLineWriter.

        Args:
          write: A write callback for the underlying writer.
          bufsize: The internal buffer size, including length prefixes.
        """
        self._write = write
        self._bufsize = bufsize
        self._wbuf = BytesIO()
        # Number of bytes currently held in _wbuf.
        self._buflen = 0

    def write(self, data: bytes | None) -> None:
        """Write data, wrapping it in a pkt-line.

        When the new line fills the buffer, the buffer is topped up to
        exactly the buffer size and flushed; the rest of the line starts the
        next buffer.

        Args:
          data: Payload to wrap, or None to buffer a flush-pkt.
        """
        line = pkt_line(data)
        line_len = len(line)
        over = self._buflen + line_len - self._bufsize
        if over >= 0:
            # Bytes of this line that still fit before the flush. Clamped at
            # zero in case the buffer already holds more than bufsize (a
            # single line longer than the buffer leaves such a remainder).
            start = max(line_len - over, 0)
            self._wbuf.write(line[:start])
            self.flush()
        else:
            start = 0
        saved = line[start:]
        self._wbuf.write(saved)
        self._buflen += len(saved)

    def flush(self) -> None:
        """Flush all data from the buffer."""
        data = self._wbuf.getvalue()
        if data:
            self._write(data)
        # Reset the same counter write() consults; leaving it stale would
        # make every write after the first overflow flush immediately.
        self._buflen = 0
        self._wbuf = BytesIO()
- class PktLineParser:
- """Packet line parser that hands completed packets off to a callback."""
- def __init__(self, handle_pkt: Callable[[bytes | None], None]) -> None:
- """Initialize PktLineParser.
- Args:
- handle_pkt: Callback function to handle completed packets
- """
- self.handle_pkt = handle_pkt
- self._readahead = BytesIO()
- def parse(self, data: bytes) -> None:
- """Parse a fragment of data and call back for any completed packets."""
- self._readahead.write(data)
- buf = self._readahead.getvalue()
- if len(buf) < 4:
- return
- while len(buf) >= 4:
- size = int(buf[:4], 16)
- if size == 0:
- self.handle_pkt(None)
- buf = buf[4:]
- elif size <= len(buf):
- self.handle_pkt(buf[4:size])
- buf = buf[size:]
- else:
- break
- self._readahead = BytesIO()
- self._readahead.write(buf)
- def get_tail(self) -> bytes:
- """Read back any unused data."""
- return self._readahead.getvalue()
def format_capability_line(capabilities: Iterable[bytes]) -> bytes:
    """Render capabilities for the wire protocol.

    Args:
      capabilities: List of capability strings
    Returns:
      The capabilities concatenated, each preceded by a single space (so a
      non-empty result starts with a space)
    """
    return b"".join(b" " + capability for capability in capabilities)
- def format_ref_line(
- ref: bytes, sha: bytes, capabilities: Sequence[bytes] | None = None
- ) -> bytes:
- """Format a ref advertisement line.
- Args:
- ref: Reference name
- sha: SHA hash
- capabilities: Optional list of capabilities
- Returns:
- Formatted ref line
- """
- if capabilities is None:
- return sha + b" " + ref + b"\n"
- else:
- return sha + b" " + ref + b"\0" + format_capability_line(capabilities) + b"\n"
def format_shallow_line(sha: bytes) -> bytes:
    """Render a ``shallow <sha>`` line.

    Args:
      sha: SHA to mark as shallow
    Returns:
      The shallow command and the SHA separated by a space, with no
      trailing newline
    """
    return b" ".join((COMMAND_SHALLOW, sha))
def format_unshallow_line(sha: bytes) -> bytes:
    """Render an ``unshallow <sha>`` line.

    Args:
      sha: SHA to unshallow
    Returns:
      The unshallow command and the SHA separated by a space, with no
      trailing newline
    """
    return b" ".join((COMMAND_UNSHALLOW, sha))
def format_ack_line(sha: bytes, ack_type: bytes = b"") -> bytes:
    """Render an ACK line.

    Args:
      sha: SHA to acknowledge
      ack_type: Optional ACK type (e.g. b"continue"); omitted from the line
        when empty
    Returns:
      ``ACK <sha>``, optionally followed by a space and the ACK type, and a
      newline
    """
    suffix = b" " + ack_type if ack_type else ack_type
    return b"".join((b"ACK ", sha, suffix, b"\n"))
def strip_peeled_refs(
    refs: "Mapping[Ref, ObjectID | None]",
) -> "dict[Ref, ObjectID | None]":
    """Return a copy of a refs mapping without its peeled entries.

    Args:
      refs: Dictionary of refs (may include peeled refs with ^{} suffix)
    Returns:
      New dictionary holding every entry whose name does not end with the
      peeled-tag suffix
    """
    kept = {}
    for name, sha in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            continue
        kept[name] = sha
    return kept
def split_peeled_refs(
    refs: "Mapping[Ref, ObjectID]",
) -> "tuple[dict[Ref, ObjectID], dict[Ref, ObjectID]]":
    """Partition a refs mapping into regular and peeled entries.

    Args:
      refs: Dictionary of refs (may include peeled refs with ^{} suffix)

    Returns:
      Tuple of (regular_refs, peeled_refs) where peeled_refs keys have
      the ^{} suffix removed
    """
    from .refs import Ref

    regular = {}
    peeled: dict[Ref, ObjectID] = {}
    suffix_length = len(PEELED_TAG_SUFFIX)
    for name, sha in refs.items():
        if name.endswith(PEELED_TAG_SUFFIX):
            # Key the peeled value by the name of the ref it belongs to.
            peeled[Ref(name[:-suffix_length])] = sha
        else:
            regular[name] = sha
    return regular, peeled
def write_info_refs(
    refs: "Mapping[Ref, ObjectID]", store: "ObjectContainer"
) -> "Iterator[bytes]":
    """Produce the info/refs lines used by the dumb HTTP protocol.

    Refs are emitted in sorted order. HEAD is left out, and so is any ref
    whose object is missing from the store.

    Args:
      refs: Dictionary of refs
      store: Object store to peel tags from

    Yields:
      Lines in info/refs format (sha + tab + refname); a ref that peels to a
      different object is followed by a second line for the peeled object,
      with the peeled-tag suffix appended to the name
    """
    from .object_store import peel_sha
    from .refs import HEADREF

    for name, sha in sorted(refs.items()):
        if name == HEADREF:
            # get_refs() includes HEAD as a special case, but we don't want
            # to advertise it
            continue
        try:
            obj = store[sha]
        except KeyError:
            continue
        _unpeeled, peeled = peel_sha(store, sha)
        yield b"".join((obj.id, b"\t", name, b"\n"))
        if peeled.id != obj.id:
            yield b"".join((peeled.id, b"\t", name, PEELED_TAG_SUFFIX, b"\n"))
def serialize_refs(
    store: "ObjectContainer", refs: "Mapping[Ref, ObjectID]"
) -> "dict[bytes, ObjectID]":
    """Prepare refs, including peeled tag entries, for Git protocol v0/v1.

    Every ref is peeled through the store. A ref that points at a tag object
    gets an additional entry, named with the peeled-tag suffix, for the
    object the tag dereferences to. Refs whose object cannot be found are
    skipped with a UserWarning.

    Args:
      store: Object store to peel refs from
      refs: Dictionary of ref names to SHAs

    Returns:
      Dictionary with refs and peeled refs (marked with ^{})
    """
    import warnings

    from .object_store import peel_sha
    from .objects import Tag

    serialized: dict[bytes, ObjectID] = {}
    for name, sha in refs.items():
        try:
            unpeeled, peeled = peel_sha(store, ObjectID(sha))
        except KeyError:
            warnings.warn(
                "ref {} points at non-present sha {}".format(
                    name.decode("utf-8", "replace"), sha.decode("ascii")
                ),
                UserWarning,
            )
            continue
        # The peeled entry is inserted before the ref itself.
        if isinstance(unpeeled, Tag):
            serialized[name + PEELED_TAG_SUFFIX] = peeled.id
        serialized[name] = unpeeled.id
    return serialized
|