|
@@ -25,9 +25,19 @@
|
|
|
import os
|
|
import os
|
|
|
import types
|
|
import types
|
|
|
import warnings
|
|
import warnings
|
|
|
-from collections.abc import Iterator
|
|
|
|
|
|
|
+from collections.abc import Iterable, Iterator
|
|
|
from contextlib import suppress
|
|
from contextlib import suppress
|
|
|
-from typing import TYPE_CHECKING, Any, Optional, Union
|
|
|
|
|
|
|
+from typing import (
|
|
|
|
|
+ IO,
|
|
|
|
|
+ TYPE_CHECKING,
|
|
|
|
|
+ Any,
|
|
|
|
|
+ BinaryIO,
|
|
|
|
|
+ Callable,
|
|
|
|
|
+ Optional,
|
|
|
|
|
+ TypeVar,
|
|
|
|
|
+ Union,
|
|
|
|
|
+ cast,
|
|
|
|
|
+)
|
|
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
if TYPE_CHECKING:
|
|
|
from .file import _GitFile
|
|
from .file import _GitFile
|
|
@@ -55,13 +65,8 @@ ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
|
|
|
class SymrefLoop(Exception):
|
|
class SymrefLoop(Exception):
|
|
|
"""There is a loop between one or more symrefs."""
|
|
"""There is a loop between one or more symrefs."""
|
|
|
|
|
|
|
|
- def __init__(self, ref, depth) -> None:
|
|
|
|
|
- """Initialize a SymrefLoop exception.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- ref: The ref that caused the loop
|
|
|
|
|
- depth: Depth at which the loop was detected
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def __init__(self, ref: bytes, depth: int) -> None:
|
|
|
|
|
+ """Initialize SymrefLoop exception."""
|
|
|
self.ref = ref
|
|
self.ref = ref
|
|
|
self.depth = depth
|
|
self.depth = depth
|
|
|
|
|
|
|
@@ -142,23 +147,35 @@ def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
|
|
|
class RefsContainer:
|
|
class RefsContainer:
|
|
|
"""A container for refs."""
|
|
"""A container for refs."""
|
|
|
|
|
|
|
|
- def __init__(self, logger=None) -> None:
|
|
|
|
|
- """Initialize a RefsContainer.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- logger: Optional logger for reflog updates
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def __init__(
|
|
|
|
|
+ self,
|
|
|
|
|
+ logger: Optional[
|
|
|
|
|
+ Callable[
|
|
|
|
|
+ [
|
|
|
|
|
+ bytes,
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[int],
|
|
|
|
|
+ Optional[int],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ ],
|
|
|
|
|
+ None,
|
|
|
|
|
+ ]
|
|
|
|
|
+ ] = None,
|
|
|
|
|
+ ) -> None:
|
|
|
|
|
+ """Initialize RefsContainer with optional logger function."""
|
|
|
self._logger = logger
|
|
self._logger = logger
|
|
|
|
|
|
|
|
def _log(
|
|
def _log(
|
|
|
self,
|
|
self,
|
|
|
- ref,
|
|
|
|
|
- old_sha,
|
|
|
|
|
- new_sha,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ ref: bytes,
|
|
|
|
|
+ old_sha: Optional[bytes],
|
|
|
|
|
+ new_sha: Optional[bytes],
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
if self._logger is None:
|
|
if self._logger is None:
|
|
|
return
|
|
return
|
|
@@ -168,12 +185,12 @@ class RefsContainer:
|
|
|
|
|
|
|
|
def set_symbolic_ref(
|
|
def set_symbolic_ref(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- other,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ other: bytes,
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
"""Make a ref point at another ref.
|
|
"""Make a ref point at another ref.
|
|
|
|
|
|
|
@@ -206,7 +223,7 @@ class RefsContainer:
|
|
|
"""
|
|
"""
|
|
|
raise NotImplementedError(self.add_packed_refs)
|
|
raise NotImplementedError(self.add_packed_refs)
|
|
|
|
|
|
|
|
- def get_peeled(self, name) -> Optional[ObjectID]:
|
|
|
|
|
|
|
+ def get_peeled(self, name: bytes) -> Optional[ObjectID]:
|
|
|
"""Return the cached peeled value of a ref, if available.
|
|
"""Return the cached peeled value of a ref, if available.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -257,12 +274,12 @@ class RefsContainer:
|
|
|
for ref in to_delete:
|
|
for ref in to_delete:
|
|
|
self.remove_if_equals(b"/".join((base, ref)), None, message=message)
|
|
self.remove_if_equals(b"/".join((base, ref)), None, message=message)
|
|
|
|
|
|
|
|
- def allkeys(self) -> Iterator[Ref]:
|
|
|
|
|
|
|
+ def allkeys(self) -> set[Ref]:
|
|
|
"""All refs present in this container."""
|
|
"""All refs present in this container."""
|
|
|
raise NotImplementedError(self.allkeys)
|
|
raise NotImplementedError(self.allkeys)
|
|
|
|
|
|
|
|
- def __iter__(self):
|
|
|
|
|
- """Iterate over all ref names."""
|
|
|
|
|
|
|
+ def __iter__(self) -> Iterator[Ref]:
|
|
|
|
|
+ """Iterate over all reference keys."""
|
|
|
return iter(self.allkeys())
|
|
return iter(self.allkeys())
|
|
|
|
|
|
|
|
def keys(self, base=None):
|
|
def keys(self, base=None):
|
|
@@ -278,7 +295,7 @@ class RefsContainer:
|
|
|
else:
|
|
else:
|
|
|
return self.allkeys()
|
|
return self.allkeys()
|
|
|
|
|
|
|
|
- def subkeys(self, base):
|
|
|
|
|
|
|
+ def subkeys(self, base: bytes) -> set[bytes]:
|
|
|
"""Refs present in this container under a base.
|
|
"""Refs present in this container under a base.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -293,7 +310,7 @@ class RefsContainer:
|
|
|
keys.add(refname[base_len:])
|
|
keys.add(refname[base_len:])
|
|
|
return keys
|
|
return keys
|
|
|
|
|
|
|
|
- def as_dict(self, base=None) -> dict[Ref, ObjectID]:
|
|
|
|
|
|
|
+ def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
|
|
|
"""Return the contents of this container as a dictionary."""
|
|
"""Return the contents of this container as a dictionary."""
|
|
|
ret = {}
|
|
ret = {}
|
|
|
keys = self.keys(base)
|
|
keys = self.keys(base)
|
|
@@ -309,7 +326,7 @@ class RefsContainer:
|
|
|
|
|
|
|
|
return ret
|
|
return ret
|
|
|
|
|
|
|
|
- def _check_refname(self, name) -> None:
|
|
|
|
|
|
|
+ def _check_refname(self, name: bytes) -> None:
|
|
|
"""Ensure a refname is valid and lives in refs or is HEAD.
|
|
"""Ensure a refname is valid and lives in refs or is HEAD.
|
|
|
|
|
|
|
|
HEAD is not a valid refname according to git-check-ref-format, but this
|
|
HEAD is not a valid refname according to git-check-ref-format, but this
|
|
@@ -328,7 +345,7 @@ class RefsContainer:
|
|
|
if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
|
|
if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
|
|
|
raise RefFormatError(name)
|
|
raise RefFormatError(name)
|
|
|
|
|
|
|
|
- def read_ref(self, refname):
|
|
|
|
|
|
|
+ def read_ref(self, refname: bytes) -> Optional[bytes]:
|
|
|
"""Read a reference without following any references.
|
|
"""Read a reference without following any references.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -341,7 +358,7 @@ class RefsContainer:
|
|
|
contents = self.get_packed_refs().get(refname, None)
|
|
contents = self.get_packed_refs().get(refname, None)
|
|
|
return contents
|
|
return contents
|
|
|
|
|
|
|
|
- def read_loose_ref(self, name) -> bytes:
|
|
|
|
|
|
|
+ def read_loose_ref(self, name: bytes) -> Optional[bytes]:
|
|
|
"""Read a loose reference and return its contents.
|
|
"""Read a loose reference and return its contents.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -351,16 +368,16 @@ class RefsContainer:
|
|
|
"""
|
|
"""
|
|
|
raise NotImplementedError(self.read_loose_ref)
|
|
raise NotImplementedError(self.read_loose_ref)
|
|
|
|
|
|
|
|
- def follow(self, name) -> tuple[list[bytes], bytes]:
|
|
|
|
|
|
|
+ def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
|
|
|
"""Follow a reference name.
|
|
"""Follow a reference name.
|
|
|
|
|
|
|
|
Returns: a tuple of (refnames, sha), wheres refnames are the names of
|
|
Returns: a tuple of (refnames, sha), wheres refnames are the names of
|
|
|
references in the chain
|
|
references in the chain
|
|
|
"""
|
|
"""
|
|
|
- contents = SYMREF + name
|
|
|
|
|
|
|
+ contents: Optional[bytes] = SYMREF + name
|
|
|
depth = 0
|
|
depth = 0
|
|
|
refnames = []
|
|
refnames = []
|
|
|
- while contents.startswith(SYMREF):
|
|
|
|
|
|
|
+ while contents and contents.startswith(SYMREF):
|
|
|
refname = contents[len(SYMREF) :]
|
|
refname = contents[len(SYMREF) :]
|
|
|
refnames.append(refname)
|
|
refnames.append(refname)
|
|
|
contents = self.read_ref(refname)
|
|
contents = self.read_ref(refname)
|
|
@@ -371,20 +388,13 @@ class RefsContainer:
|
|
|
raise SymrefLoop(name, depth)
|
|
raise SymrefLoop(name, depth)
|
|
|
return refnames, contents
|
|
return refnames, contents
|
|
|
|
|
|
|
|
- def __contains__(self, refname) -> bool:
|
|
|
|
|
- """Check if a ref exists.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- refname: Name of the ref to check
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- True if the ref exists
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def __contains__(self, refname: bytes) -> bool:
|
|
|
|
|
+ """Check if a reference exists."""
|
|
|
if self.read_ref(refname):
|
|
if self.read_ref(refname):
|
|
|
return True
|
|
return True
|
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
- def __getitem__(self, name) -> ObjectID:
|
|
|
|
|
|
|
+ def __getitem__(self, name: bytes) -> ObjectID:
|
|
|
"""Get the SHA1 for a reference name.
|
|
"""Get the SHA1 for a reference name.
|
|
|
|
|
|
|
|
This method follows all symbolic references.
|
|
This method follows all symbolic references.
|
|
@@ -396,13 +406,13 @@ class RefsContainer:
|
|
|
|
|
|
|
|
def set_if_equals(
|
|
def set_if_equals(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- old_ref,
|
|
|
|
|
- new_ref,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ old_ref: Optional[bytes],
|
|
|
|
|
+ new_ref: bytes,
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Set a refname to new_ref only if it currently equals old_ref.
|
|
"""Set a refname to new_ref only if it currently equals old_ref.
|
|
|
|
|
|
|
@@ -424,7 +434,13 @@ class RefsContainer:
|
|
|
raise NotImplementedError(self.set_if_equals)
|
|
raise NotImplementedError(self.set_if_equals)
|
|
|
|
|
|
|
|
def add_if_new(
|
|
def add_if_new(
|
|
|
- self, name, ref, committer=None, timestamp=None, timezone=None, message=None
|
|
|
|
|
|
|
+ self,
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ ref: bytes,
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Add a new reference only if it does not already exist.
|
|
"""Add a new reference only if it does not already exist.
|
|
|
|
|
|
|
@@ -438,7 +454,7 @@ class RefsContainer:
|
|
|
"""
|
|
"""
|
|
|
raise NotImplementedError(self.add_if_new)
|
|
raise NotImplementedError(self.add_if_new)
|
|
|
|
|
|
|
|
- def __setitem__(self, name, ref) -> None:
|
|
|
|
|
|
|
+ def __setitem__(self, name: bytes, ref: bytes) -> None:
|
|
|
"""Set a reference name to point to the given SHA1.
|
|
"""Set a reference name to point to the given SHA1.
|
|
|
|
|
|
|
|
This method follows all symbolic references if applicable for the
|
|
This method follows all symbolic references if applicable for the
|
|
@@ -458,12 +474,12 @@ class RefsContainer:
|
|
|
|
|
|
|
|
def remove_if_equals(
|
|
def remove_if_equals(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- old_ref,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ old_ref: Optional[bytes],
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Remove a refname only if it currently equals old_ref.
|
|
"""Remove a refname only if it currently equals old_ref.
|
|
|
|
|
|
|
@@ -483,7 +499,7 @@ class RefsContainer:
|
|
|
"""
|
|
"""
|
|
|
raise NotImplementedError(self.remove_if_equals)
|
|
raise NotImplementedError(self.remove_if_equals)
|
|
|
|
|
|
|
|
- def __delitem__(self, name) -> None:
|
|
|
|
|
|
|
+ def __delitem__(self, name: bytes) -> None:
|
|
|
"""Remove a refname.
|
|
"""Remove a refname.
|
|
|
|
|
|
|
|
This method does not follow symbolic references, even if applicable for
|
|
This method does not follow symbolic references, even if applicable for
|
|
@@ -498,7 +514,7 @@ class RefsContainer:
|
|
|
"""
|
|
"""
|
|
|
self.remove_if_equals(name, None)
|
|
self.remove_if_equals(name, None)
|
|
|
|
|
|
|
|
- def get_symrefs(self):
|
|
|
|
|
|
|
+ def get_symrefs(self) -> dict[bytes, bytes]:
|
|
|
"""Get a dict with all symrefs in this container.
|
|
"""Get a dict with all symrefs in this container.
|
|
|
|
|
|
|
|
Returns: Dictionary mapping source ref to target ref
|
|
Returns: Dictionary mapping source ref to target ref
|
|
@@ -506,7 +522,9 @@ class RefsContainer:
|
|
|
ret = {}
|
|
ret = {}
|
|
|
for src in self.allkeys():
|
|
for src in self.allkeys():
|
|
|
try:
|
|
try:
|
|
|
- dst = parse_symref_value(self.read_ref(src))
|
|
|
|
|
|
|
+ ref_value = self.read_ref(src)
|
|
|
|
|
+ assert ref_value is not None
|
|
|
|
|
+ dst = parse_symref_value(ref_value)
|
|
|
except ValueError:
|
|
except ValueError:
|
|
|
pass
|
|
pass
|
|
|
else:
|
|
else:
|
|
@@ -529,41 +547,43 @@ class DictRefsContainer(RefsContainer):
|
|
|
threadsafe.
|
|
threadsafe.
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
- def __init__(self, refs, logger=None) -> None:
|
|
|
|
|
- """Initialize DictRefsContainer."""
|
|
|
|
|
|
|
+ def __init__(
|
|
|
|
|
+ self,
|
|
|
|
|
+ refs: dict[bytes, bytes],
|
|
|
|
|
+ logger: Optional[
|
|
|
|
|
+ Callable[
|
|
|
|
|
+ [
|
|
|
|
|
+ bytes,
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[int],
|
|
|
|
|
+ Optional[int],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ ],
|
|
|
|
|
+ None,
|
|
|
|
|
+ ]
|
|
|
|
|
+ ] = None,
|
|
|
|
|
+ ) -> None:
|
|
|
|
|
+ """Initialize DictRefsContainer with refs dictionary and optional logger."""
|
|
|
super().__init__(logger=logger)
|
|
super().__init__(logger=logger)
|
|
|
self._refs = refs
|
|
self._refs = refs
|
|
|
self._peeled: dict[bytes, ObjectID] = {}
|
|
self._peeled: dict[bytes, ObjectID] = {}
|
|
|
self._watchers: set[Any] = set()
|
|
self._watchers: set[Any] = set()
|
|
|
|
|
|
|
|
- def allkeys(self):
|
|
|
|
|
- """Get all ref names.
|
|
|
|
|
|
|
+ def allkeys(self) -> set[bytes]:
|
|
|
|
|
+ """Return all reference keys."""
|
|
|
|
|
+ return set(self._refs.keys())
|
|
|
|
|
|
|
|
- Returns:
|
|
|
|
|
- All ref names in the container
|
|
|
|
|
- """
|
|
|
|
|
- return self._refs.keys()
|
|
|
|
|
-
|
|
|
|
|
- def read_loose_ref(self, name):
|
|
|
|
|
- """Read a reference from the refs dictionary.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- name: The ref name to read
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- The ref value or None if not found
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def read_loose_ref(self, name: bytes) -> Optional[bytes]:
|
|
|
|
|
+ """Read a loose reference."""
|
|
|
return self._refs.get(name, None)
|
|
return self._refs.get(name, None)
|
|
|
|
|
|
|
|
- def get_packed_refs(self):
|
|
|
|
|
- """Get packed refs (always empty for DictRefsContainer).
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- Empty dictionary
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def get_packed_refs(self) -> dict[bytes, bytes]:
|
|
|
|
|
+ """Get packed references."""
|
|
|
return {}
|
|
return {}
|
|
|
|
|
|
|
|
- def _notify(self, ref, newsha) -> None:
|
|
|
|
|
|
|
+ def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
|
|
|
for watcher in self._watchers:
|
|
for watcher in self._watchers:
|
|
|
watcher._notify((ref, newsha))
|
|
watcher._notify((ref, newsha))
|
|
|
|
|
|
|
@@ -571,10 +591,10 @@ class DictRefsContainer(RefsContainer):
|
|
|
self,
|
|
self,
|
|
|
name: Ref,
|
|
name: Ref,
|
|
|
other: Ref,
|
|
other: Ref,
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
"""Make a ref point at another ref.
|
|
"""Make a ref point at another ref.
|
|
|
|
|
|
|
@@ -602,13 +622,13 @@ class DictRefsContainer(RefsContainer):
|
|
|
|
|
|
|
|
def set_if_equals(
|
|
def set_if_equals(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- old_ref,
|
|
|
|
|
- new_ref,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ old_ref: Optional[bytes],
|
|
|
|
|
+ new_ref: bytes,
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Set a refname to new_ref only if it currently equals old_ref.
|
|
"""Set a refname to new_ref only if it currently equals old_ref.
|
|
|
|
|
|
|
@@ -650,9 +670,9 @@ class DictRefsContainer(RefsContainer):
|
|
|
self,
|
|
self,
|
|
|
name: Ref,
|
|
name: Ref,
|
|
|
ref: ObjectID,
|
|
ref: ObjectID,
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
message: Optional[bytes] = None,
|
|
message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Add a new reference only if it does not already exist.
|
|
"""Add a new reference only if it does not already exist.
|
|
@@ -685,12 +705,12 @@ class DictRefsContainer(RefsContainer):
|
|
|
|
|
|
|
|
def remove_if_equals(
|
|
def remove_if_equals(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- old_ref,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ old_ref: Optional[bytes],
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Remove a refname only if it currently equals old_ref.
|
|
"""Remove a refname only if it currently equals old_ref.
|
|
|
|
|
|
|
@@ -728,25 +748,18 @@ class DictRefsContainer(RefsContainer):
|
|
|
)
|
|
)
|
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
- def get_peeled(self, name):
|
|
|
|
|
- """Get the peeled value of a ref.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- name: Ref name to get peeled value for
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- The peeled SHA or None if not available
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def get_peeled(self, name: bytes) -> Optional[bytes]:
|
|
|
|
|
+ """Get peeled version of a reference."""
|
|
|
return self._peeled.get(name)
|
|
return self._peeled.get(name)
|
|
|
|
|
|
|
|
- def _update(self, refs) -> None:
|
|
|
|
|
|
|
+ def _update(self, refs: dict[bytes, bytes]) -> None:
|
|
|
"""Update multiple refs; intended only for testing."""
|
|
"""Update multiple refs; intended only for testing."""
|
|
|
# TODO(dborowitz): replace this with a public function that uses
|
|
# TODO(dborowitz): replace this with a public function that uses
|
|
|
# set_if_equal.
|
|
# set_if_equal.
|
|
|
for ref, sha in refs.items():
|
|
for ref, sha in refs.items():
|
|
|
self.set_if_equals(ref, None, sha)
|
|
self.set_if_equals(ref, None, sha)
|
|
|
|
|
|
|
|
- def _update_peeled(self, peeled) -> None:
|
|
|
|
|
|
|
+ def _update_peeled(self, peeled: dict[bytes, bytes]) -> None:
|
|
|
"""Update cached peeled refs; intended only for testing."""
|
|
"""Update cached peeled refs; intended only for testing."""
|
|
|
self._peeled.update(peeled)
|
|
self._peeled.update(peeled)
|
|
|
|
|
|
|
@@ -754,56 +767,27 @@ class DictRefsContainer(RefsContainer):
|
|
|
class InfoRefsContainer(RefsContainer):
|
|
class InfoRefsContainer(RefsContainer):
|
|
|
"""Refs container that reads refs from a info/refs file."""
|
|
"""Refs container that reads refs from a info/refs file."""
|
|
|
|
|
|
|
|
- def __init__(self, f) -> None:
|
|
|
|
|
- """Initialize an InfoRefsContainer.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- f: File-like object containing info/refs data
|
|
|
|
|
- """
|
|
|
|
|
- self._refs = {}
|
|
|
|
|
- self._peeled = {}
|
|
|
|
|
|
|
+ def __init__(self, f: BinaryIO) -> None:
|
|
|
|
|
+ """Initialize InfoRefsContainer from info/refs file."""
|
|
|
|
|
+ self._refs: dict[bytes, bytes] = {}
|
|
|
|
|
+ self._peeled: dict[bytes, bytes] = {}
|
|
|
refs = read_info_refs(f)
|
|
refs = read_info_refs(f)
|
|
|
(self._refs, self._peeled) = split_peeled_refs(refs)
|
|
(self._refs, self._peeled) = split_peeled_refs(refs)
|
|
|
|
|
|
|
|
- def allkeys(self):
|
|
|
|
|
- """Get all ref names.
|
|
|
|
|
|
|
+ def allkeys(self) -> set[bytes]:
|
|
|
|
|
+ """Return all reference keys."""
|
|
|
|
|
+ return set(self._refs.keys())
|
|
|
|
|
|
|
|
- Returns:
|
|
|
|
|
- All ref names in the info/refs file
|
|
|
|
|
- """
|
|
|
|
|
- return self._refs.keys()
|
|
|
|
|
-
|
|
|
|
|
- def read_loose_ref(self, name):
|
|
|
|
|
- """Read a reference from the parsed info/refs.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- name: The ref name to read
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- The ref value or None if not found
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def read_loose_ref(self, name: bytes) -> Optional[bytes]:
|
|
|
|
|
+ """Read a loose reference."""
|
|
|
return self._refs.get(name, None)
|
|
return self._refs.get(name, None)
|
|
|
|
|
|
|
|
- def get_packed_refs(self):
|
|
|
|
|
- """Get packed refs (always empty for InfoRefsContainer).
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- Empty dictionary
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def get_packed_refs(self) -> dict[bytes, bytes]:
|
|
|
|
|
+ """Get packed references."""
|
|
|
return {}
|
|
return {}
|
|
|
|
|
|
|
|
- def get_peeled(self, name):
|
|
|
|
|
- """Get the peeled value of a ref.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- name: Ref name to get peeled value for
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- The peeled SHA if available, otherwise the ref value itself
|
|
|
|
|
-
|
|
|
|
|
- Raises:
|
|
|
|
|
- KeyError: If the ref doesn't exist
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def get_peeled(self, name: bytes) -> Optional[bytes]:
|
|
|
|
|
+ """Get peeled version of a reference."""
|
|
|
try:
|
|
try:
|
|
|
return self._peeled[name]
|
|
return self._peeled[name]
|
|
|
except KeyError:
|
|
except KeyError:
|
|
@@ -817,7 +801,20 @@ class DiskRefsContainer(RefsContainer):
|
|
|
self,
|
|
self,
|
|
|
path: Union[str, bytes, os.PathLike],
|
|
path: Union[str, bytes, os.PathLike],
|
|
|
worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
|
|
worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
|
|
|
- logger=None,
|
|
|
|
|
|
|
+ logger: Optional[
|
|
|
|
|
+ Callable[
|
|
|
|
|
+ [
|
|
|
|
|
+ bytes,
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ Optional[int],
|
|
|
|
|
+ Optional[int],
|
|
|
|
|
+ Optional[bytes],
|
|
|
|
|
+ ],
|
|
|
|
|
+ None,
|
|
|
|
|
+ ]
|
|
|
|
|
+ ] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
"""Initialize DiskRefsContainer."""
|
|
"""Initialize DiskRefsContainer."""
|
|
|
super().__init__(logger=logger)
|
|
super().__init__(logger=logger)
|
|
@@ -827,22 +824,15 @@ class DiskRefsContainer(RefsContainer):
|
|
|
self.worktree_path = self.path
|
|
self.worktree_path = self.path
|
|
|
else:
|
|
else:
|
|
|
self.worktree_path = os.fsencode(os.fspath(worktree_path))
|
|
self.worktree_path = os.fsencode(os.fspath(worktree_path))
|
|
|
- self._packed_refs = None
|
|
|
|
|
- self._peeled_refs = None
|
|
|
|
|
|
|
+ self._packed_refs: Optional[dict[bytes, bytes]] = None
|
|
|
|
|
+ self._peeled_refs: Optional[dict[bytes, bytes]] = None
|
|
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
def __repr__(self) -> str:
|
|
|
"""Return string representation of DiskRefsContainer."""
|
|
"""Return string representation of DiskRefsContainer."""
|
|
|
return f"{self.__class__.__name__}({self.path!r})"
|
|
return f"{self.__class__.__name__}({self.path!r})"
|
|
|
|
|
|
|
|
- def subkeys(self, base):
|
|
|
|
|
- """Get all ref names under a base ref.
|
|
|
|
|
-
|
|
|
|
|
- Args:
|
|
|
|
|
- base: Base ref path to search under
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- Set of ref names under the base (without base prefix)
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def subkeys(self, base: bytes) -> set[bytes]:
|
|
|
|
|
+ """Return subkeys under a given base reference path."""
|
|
|
subkeys = set()
|
|
subkeys = set()
|
|
|
path = self.refpath(base)
|
|
path = self.refpath(base)
|
|
|
for root, unused_dirs, files in os.walk(path):
|
|
for root, unused_dirs, files in os.walk(path):
|
|
@@ -861,12 +851,8 @@ class DiskRefsContainer(RefsContainer):
|
|
|
subkeys.add(key[len(base) :].strip(b"/"))
|
|
subkeys.add(key[len(base) :].strip(b"/"))
|
|
|
return subkeys
|
|
return subkeys
|
|
|
|
|
|
|
|
- def allkeys(self):
|
|
|
|
|
- """Get all ref names from disk.
|
|
|
|
|
-
|
|
|
|
|
- Returns:
|
|
|
|
|
- Set of all ref names (both loose and packed)
|
|
|
|
|
- """
|
|
|
|
|
|
|
+ def allkeys(self) -> set[bytes]:
|
|
|
|
|
+ """Return all reference keys."""
|
|
|
allkeys = set()
|
|
allkeys = set()
|
|
|
if os.path.exists(self.refpath(HEADREF)):
|
|
if os.path.exists(self.refpath(HEADREF)):
|
|
|
allkeys.add(HEADREF)
|
|
allkeys.add(HEADREF)
|
|
@@ -883,7 +869,7 @@ class DiskRefsContainer(RefsContainer):
|
|
|
allkeys.update(self.get_packed_refs())
|
|
allkeys.update(self.get_packed_refs())
|
|
|
return allkeys
|
|
return allkeys
|
|
|
|
|
|
|
|
- def refpath(self, name):
|
|
|
|
|
|
|
+ def refpath(self, name: bytes) -> bytes:
|
|
|
"""Return the disk path of a ref."""
|
|
"""Return the disk path of a ref."""
|
|
|
if os.path.sep != "/":
|
|
if os.path.sep != "/":
|
|
|
name = name.replace(b"/", os.fsencode(os.path.sep))
|
|
name = name.replace(b"/", os.fsencode(os.path.sep))
|
|
@@ -894,7 +880,7 @@ class DiskRefsContainer(RefsContainer):
|
|
|
else:
|
|
else:
|
|
|
return os.path.join(self.path, name)
|
|
return os.path.join(self.path, name)
|
|
|
|
|
|
|
|
- def get_packed_refs(self):
|
|
|
|
|
|
|
+ def get_packed_refs(self) -> dict[bytes, bytes]:
|
|
|
"""Get contents of the packed-refs file.
|
|
"""Get contents of the packed-refs file.
|
|
|
|
|
|
|
|
Returns: Dictionary mapping ref names to SHA1s
|
|
Returns: Dictionary mapping ref names to SHA1s
|
|
@@ -962,7 +948,7 @@ class DiskRefsContainer(RefsContainer):
|
|
|
|
|
|
|
|
self._packed_refs = packed_refs
|
|
self._packed_refs = packed_refs
|
|
|
|
|
|
|
|
- def get_peeled(self, name):
|
|
|
|
|
|
|
+ def get_peeled(self, name: bytes) -> Optional[bytes]:
|
|
|
"""Return the cached peeled value of a ref, if available.
|
|
"""Return the cached peeled value of a ref, if available.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -972,7 +958,11 @@ class DiskRefsContainer(RefsContainer):
|
|
|
to a tag, but no cached information is available, None is returned.
|
|
to a tag, but no cached information is available, None is returned.
|
|
|
"""
|
|
"""
|
|
|
self.get_packed_refs()
|
|
self.get_packed_refs()
|
|
|
- if self._peeled_refs is None or name not in self._packed_refs:
|
|
|
|
|
|
|
+ if (
|
|
|
|
|
+ self._peeled_refs is None
|
|
|
|
|
+ or self._packed_refs is None
|
|
|
|
|
+ or name not in self._packed_refs
|
|
|
|
|
+ ):
|
|
|
# No cache: no peeled refs were read, or this ref is loose
|
|
# No cache: no peeled refs were read, or this ref is loose
|
|
|
return None
|
|
return None
|
|
|
if name in self._peeled_refs:
|
|
if name in self._peeled_refs:
|
|
@@ -981,7 +971,7 @@ class DiskRefsContainer(RefsContainer):
|
|
|
# Known not peelable
|
|
# Known not peelable
|
|
|
return self[name]
|
|
return self[name]
|
|
|
|
|
|
|
|
- def read_loose_ref(self, name):
|
|
|
|
|
|
|
+ def read_loose_ref(self, name: bytes) -> Optional[bytes]:
|
|
|
"""Read a reference file and return its contents.
|
|
"""Read a reference file and return its contents.
|
|
|
|
|
|
|
|
If the reference file a symbolic reference, only read the first line of
|
|
If the reference file a symbolic reference, only read the first line of
|
|
@@ -1011,7 +1001,7 @@ class DiskRefsContainer(RefsContainer):
|
|
|
# errors depending on the specific operating system
|
|
# errors depending on the specific operating system
|
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
- def _remove_packed_ref(self, name) -> None:
|
|
|
|
|
|
|
+ def _remove_packed_ref(self, name: bytes) -> None:
|
|
|
if self._packed_refs is None:
|
|
if self._packed_refs is None:
|
|
|
return
|
|
return
|
|
|
filename = os.path.join(self.path, b"packed-refs")
|
|
filename = os.path.join(self.path, b"packed-refs")
|
|
@@ -1021,13 +1011,14 @@ class DiskRefsContainer(RefsContainer):
|
|
|
self._packed_refs = None
|
|
self._packed_refs = None
|
|
|
self.get_packed_refs()
|
|
self.get_packed_refs()
|
|
|
|
|
|
|
|
- if name not in self._packed_refs:
|
|
|
|
|
|
|
+ if self._packed_refs is None or name not in self._packed_refs:
|
|
|
f.abort()
|
|
f.abort()
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
del self._packed_refs[name]
|
|
del self._packed_refs[name]
|
|
|
- with suppress(KeyError):
|
|
|
|
|
- del self._peeled_refs[name]
|
|
|
|
|
|
|
+ if self._peeled_refs is not None:
|
|
|
|
|
+ with suppress(KeyError):
|
|
|
|
|
+ del self._peeled_refs[name]
|
|
|
write_packed_refs(f, self._packed_refs, self._peeled_refs)
|
|
write_packed_refs(f, self._packed_refs, self._peeled_refs)
|
|
|
f.close()
|
|
f.close()
|
|
|
except BaseException:
|
|
except BaseException:
|
|
@@ -1036,12 +1027,12 @@ class DiskRefsContainer(RefsContainer):
|
|
|
|
|
|
|
|
def set_symbolic_ref(
|
|
def set_symbolic_ref(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- other,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ other: bytes,
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> None:
|
|
) -> None:
|
|
|
"""Make a ref point at another ref.
|
|
"""Make a ref point at another ref.
|
|
|
|
|
|
|
@@ -1077,13 +1068,13 @@ class DiskRefsContainer(RefsContainer):
|
|
|
|
|
|
|
|
def set_if_equals(
|
|
def set_if_equals(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- old_ref,
|
|
|
|
|
- new_ref,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ old_ref: Optional[bytes],
|
|
|
|
|
+ new_ref: bytes,
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Set a refname to new_ref only if it currently equals old_ref.
|
|
"""Set a refname to new_ref only if it currently equals old_ref.
|
|
|
|
|
|
|
@@ -1163,9 +1154,9 @@ class DiskRefsContainer(RefsContainer):
|
|
|
self,
|
|
self,
|
|
|
name: bytes,
|
|
name: bytes,
|
|
|
ref: bytes,
|
|
ref: bytes,
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
message: Optional[bytes] = None,
|
|
message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Add a new reference only if it does not already exist.
|
|
"""Add a new reference only if it does not already exist.
|
|
@@ -1215,12 +1206,12 @@ class DiskRefsContainer(RefsContainer):
|
|
|
|
|
|
|
|
def remove_if_equals(
|
|
def remove_if_equals(
|
|
|
self,
|
|
self,
|
|
|
- name,
|
|
|
|
|
- old_ref,
|
|
|
|
|
- committer=None,
|
|
|
|
|
- timestamp=None,
|
|
|
|
|
- timezone=None,
|
|
|
|
|
- message=None,
|
|
|
|
|
|
|
+ name: bytes,
|
|
|
|
|
+ old_ref: Optional[bytes],
|
|
|
|
|
+ committer: Optional[bytes] = None,
|
|
|
|
|
+ timestamp: Optional[int] = None,
|
|
|
|
|
+ timezone: Optional[int] = None,
|
|
|
|
|
+ message: Optional[bytes] = None,
|
|
|
) -> bool:
|
|
) -> bool:
|
|
|
"""Remove a refname only if it currently equals old_ref.
|
|
"""Remove a refname only if it currently equals old_ref.
|
|
|
|
|
|
|
@@ -1321,7 +1312,7 @@ class DiskRefsContainer(RefsContainer):
|
|
|
self.add_packed_refs(refs_to_pack)
|
|
self.add_packed_refs(refs_to_pack)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _split_ref_line(line):
|
|
|
|
|
|
|
+def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
|
|
|
"""Split a single ref line into a tuple of SHA1 and name."""
|
|
"""Split a single ref line into a tuple of SHA1 and name."""
|
|
|
fields = line.rstrip(b"\n\r").split(b" ")
|
|
fields = line.rstrip(b"\n\r").split(b" ")
|
|
|
if len(fields) != 2:
|
|
if len(fields) != 2:
|
|
@@ -1334,7 +1325,7 @@ def _split_ref_line(line):
|
|
|
return (sha, name)
|
|
return (sha, name)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def read_packed_refs(f):
|
|
|
|
|
|
|
+def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
|
|
|
"""Read a packed refs file.
|
|
"""Read a packed refs file.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -1350,7 +1341,9 @@ def read_packed_refs(f):
|
|
|
yield _split_ref_line(line)
|
|
yield _split_ref_line(line)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def read_packed_refs_with_peeled(f):
|
|
|
|
|
|
|
+def read_packed_refs_with_peeled(
|
|
|
|
|
+ f: IO[bytes],
|
|
|
|
|
+) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
|
|
|
"""Read a packed refs file including peeled refs.
|
|
"""Read a packed refs file including peeled refs.
|
|
|
|
|
|
|
|
Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
|
|
Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
|
|
@@ -1382,7 +1375,11 @@ def read_packed_refs_with_peeled(f):
|
|
|
yield (sha, name, None)
|
|
yield (sha, name, None)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
|
|
|
|
|
|
|
+def write_packed_refs(
|
|
|
|
|
+ f: IO[bytes],
|
|
|
|
|
+ packed_refs: dict[bytes, bytes],
|
|
|
|
|
+ peeled_refs: Optional[dict[bytes, bytes]] = None,
|
|
|
|
|
+) -> None:
|
|
|
"""Write a packed refs file.
|
|
"""Write a packed refs file.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -1400,7 +1397,7 @@ def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
|
|
|
f.write(b"^" + peeled_refs[refname] + b"\n")
|
|
f.write(b"^" + peeled_refs[refname] + b"\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
-def read_info_refs(f):
|
|
|
|
|
|
|
+def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
|
|
|
"""Read info/refs file.
|
|
"""Read info/refs file.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -1416,7 +1413,9 @@ def read_info_refs(f):
|
|
|
return ret
|
|
return ret
|
|
|
|
|
|
|
|
|
|
|
|
|
-def write_info_refs(refs, store: ObjectContainer):
|
|
|
|
|
|
|
+def write_info_refs(
|
|
|
|
|
+ refs: dict[bytes, bytes], store: ObjectContainer
|
|
|
|
|
+) -> Iterator[bytes]:
|
|
|
"""Generate info refs."""
|
|
"""Generate info refs."""
|
|
|
# TODO: Avoid recursive import :(
|
|
# TODO: Avoid recursive import :(
|
|
|
from .object_store import peel_sha
|
|
from .object_store import peel_sha
|
|
@@ -1436,38 +1435,38 @@ def write_info_refs(refs, store: ObjectContainer):
|
|
|
yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
|
|
yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
|
|
|
|
|
|
|
|
|
|
|
|
|
-def is_local_branch(x):
|
|
|
|
|
- """Check if a ref name refers to a local branch.
|
|
|
|
|
|
|
+def is_local_branch(x: bytes) -> bool:
|
|
|
|
|
+ """Check if a ref name is a local branch."""
|
|
|
|
|
+ return x.startswith(LOCAL_BRANCH_PREFIX)
|
|
|
|
|
|
|
|
- Args:
|
|
|
|
|
- x: Ref name to check
|
|
|
|
|
|
|
|
|
|
- Returns:
|
|
|
|
|
- True if ref is a local branch (refs/heads/...)
|
|
|
|
|
- """
|
|
|
|
|
- return x.startswith(LOCAL_BRANCH_PREFIX)
|
|
|
|
|
|
|
+T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])
|
|
|
|
|
|
|
|
|
|
|
|
|
-def strip_peeled_refs(refs):
|
|
|
|
|
|
|
+def strip_peeled_refs(refs: T) -> T:
|
|
|
"""Remove all peeled refs."""
|
|
"""Remove all peeled refs."""
|
|
|
return {
|
|
return {
|
|
|
ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
|
|
ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
-def split_peeled_refs(refs):
|
|
|
|
|
|
|
+def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
|
|
|
"""Split peeled refs from regular refs."""
|
|
"""Split peeled refs from regular refs."""
|
|
|
- peeled = {}
|
|
|
|
|
- regular = {}
|
|
|
|
|
|
|
+ peeled: dict[bytes, bytes] = {}
|
|
|
|
|
+ regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)}
|
|
|
|
|
+
|
|
|
for ref, sha in refs.items():
|
|
for ref, sha in refs.items():
|
|
|
if ref.endswith(PEELED_TAG_SUFFIX):
|
|
if ref.endswith(PEELED_TAG_SUFFIX):
|
|
|
- peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
|
|
|
|
|
- else:
|
|
|
|
|
- regular[ref] = sha
|
|
|
|
|
|
|
+ # Only add to peeled dict if sha is not None
|
|
|
|
|
+ if sha is not None:
|
|
|
|
|
+ peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
|
|
|
|
|
+
|
|
|
return regular, peeled
|
|
return regular, peeled
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _set_origin_head(refs, origin, origin_head) -> None:
|
|
|
|
|
|
|
+def _set_origin_head(
|
|
|
|
|
+ refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
|
|
|
|
|
+) -> None:
|
|
|
# set refs/remotes/origin/HEAD
|
|
# set refs/remotes/origin/HEAD
|
|
|
origin_base = b"refs/remotes/" + origin + b"/"
|
|
origin_base = b"refs/remotes/" + origin + b"/"
|
|
|
if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
|
|
if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
|
|
@@ -1511,7 +1510,9 @@ def _set_default_branch(
|
|
|
return head_ref
|
|
return head_ref
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _set_head(refs, head_ref, ref_message):
|
|
|
|
|
|
|
+def _set_head(
|
|
|
|
|
+ refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
|
|
|
|
|
+) -> Optional[bytes]:
|
|
|
if head_ref.startswith(LOCAL_TAG_PREFIX):
|
|
if head_ref.startswith(LOCAL_TAG_PREFIX):
|
|
|
# detach HEAD at specified tag
|
|
# detach HEAD at specified tag
|
|
|
head = refs[head_ref]
|
|
head = refs[head_ref]
|
|
@@ -1534,7 +1535,7 @@ def _set_head(refs, head_ref, ref_message):
|
|
|
def _import_remote_refs(
|
|
def _import_remote_refs(
|
|
|
refs_container: RefsContainer,
|
|
refs_container: RefsContainer,
|
|
|
remote_name: str,
|
|
remote_name: str,
|
|
|
- refs: dict[str, str],
|
|
|
|
|
|
|
+ refs: dict[bytes, Optional[bytes]],
|
|
|
message: Optional[bytes] = None,
|
|
message: Optional[bytes] = None,
|
|
|
prune: bool = False,
|
|
prune: bool = False,
|
|
|
prune_tags: bool = False,
|
|
prune_tags: bool = False,
|
|
@@ -1543,7 +1544,7 @@ def _import_remote_refs(
|
|
|
branches = {
|
|
branches = {
|
|
|
n[len(LOCAL_BRANCH_PREFIX) :]: v
|
|
n[len(LOCAL_BRANCH_PREFIX) :]: v
|
|
|
for (n, v) in stripped_refs.items()
|
|
for (n, v) in stripped_refs.items()
|
|
|
- if n.startswith(LOCAL_BRANCH_PREFIX)
|
|
|
|
|
|
|
+ if n.startswith(LOCAL_BRANCH_PREFIX) and v is not None
|
|
|
}
|
|
}
|
|
|
refs_container.import_refs(
|
|
refs_container.import_refs(
|
|
|
b"refs/remotes/" + remote_name.encode(),
|
|
b"refs/remotes/" + remote_name.encode(),
|
|
@@ -1554,14 +1555,18 @@ def _import_remote_refs(
|
|
|
tags = {
|
|
tags = {
|
|
|
n[len(LOCAL_TAG_PREFIX) :]: v
|
|
n[len(LOCAL_TAG_PREFIX) :]: v
|
|
|
for (n, v) in stripped_refs.items()
|
|
for (n, v) in stripped_refs.items()
|
|
|
- if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX)
|
|
|
|
|
|
|
+ if n.startswith(LOCAL_TAG_PREFIX)
|
|
|
|
|
+ and not n.endswith(PEELED_TAG_SUFFIX)
|
|
|
|
|
+ and v is not None
|
|
|
}
|
|
}
|
|
|
refs_container.import_refs(
|
|
refs_container.import_refs(
|
|
|
LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
|
|
LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def serialize_refs(store, refs):
|
|
|
|
|
|
|
+def serialize_refs(
|
|
|
|
|
+ store: ObjectContainer, refs: dict[bytes, bytes]
|
|
|
|
|
+) -> dict[bytes, bytes]:
|
|
|
"""Serialize refs with peeled refs.
|
|
"""Serialize refs with peeled refs.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -1658,6 +1663,7 @@ class locked_ref:
|
|
|
if not self._file:
|
|
if not self._file:
|
|
|
raise RuntimeError("locked_ref not in context")
|
|
raise RuntimeError("locked_ref not in context")
|
|
|
|
|
|
|
|
|
|
+ assert self._realname is not None
|
|
|
current_ref = self._refs_container.read_loose_ref(self._realname)
|
|
current_ref = self._refs_container.read_loose_ref(self._realname)
|
|
|
if current_ref is None:
|
|
if current_ref is None:
|
|
|
current_ref = self._refs_container.get_packed_refs().get(
|
|
current_ref = self._refs_container.get_packed_refs().get(
|
|
@@ -1724,3 +1730,14 @@ class locked_ref:
|
|
|
self._refs_container._remove_packed_ref(self._realname)
|
|
self._refs_container._remove_packed_ref(self._realname)
|
|
|
|
|
|
|
|
self._deleted = True
|
|
self._deleted = True
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
|
|
|
|
|
+ """Filter refs to only include those with a given prefix.
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ refs: A dictionary of refs.
|
|
|
|
|
+ prefixes: The prefixes to filter by.
|
|
|
|
|
+ """
|
|
|
|
|
+ filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
|
|
|
|
|
+ return cast(T, filtered)
|