|
|
@@ -24,7 +24,6 @@
|
|
|
|
|
|
import os
|
|
|
import types
|
|
|
-import warnings
|
|
|
from collections.abc import Callable, Iterable, Iterator, Mapping
|
|
|
from contextlib import suppress
|
|
|
from typing import (
|
|
|
@@ -42,7 +41,6 @@ if TYPE_CHECKING:
|
|
|
from .errors import PackedRefsException, RefFormatError
|
|
|
from .file import GitFile, ensure_dir_exists
|
|
|
from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
|
|
|
-from .pack import ObjectContainer
|
|
|
|
|
|
Ref = NewType("Ref", bytes)
|
|
|
|
|
|
@@ -774,40 +772,6 @@ class DictRefsContainer(RefsContainer):
|
|
|
self._peeled.update(peeled)
|
|
|
|
|
|
|
|
|
-class InfoRefsContainer(RefsContainer):
|
|
|
- """Refs container that reads refs from a info/refs file."""
|
|
|
-
|
|
|
- def __init__(self, f: BinaryIO) -> None:
|
|
|
- """Initialize InfoRefsContainer from info/refs file."""
|
|
|
- self._refs: dict[Ref, ObjectID] = {}
|
|
|
- self._peeled: dict[Ref, ObjectID] = {}
|
|
|
- refs = read_info_refs(f)
|
|
|
- (self._refs, self._peeled) = split_peeled_refs(refs)
|
|
|
-
|
|
|
- def allkeys(self) -> set[Ref]:
|
|
|
- """Return all reference keys."""
|
|
|
- return set(self._refs.keys())
|
|
|
-
|
|
|
- def read_loose_ref(self, name: Ref) -> bytes | None:
|
|
|
- """Read a loose reference."""
|
|
|
- return self._refs.get(name, None)
|
|
|
-
|
|
|
- def get_packed_refs(self) -> dict[Ref, ObjectID]:
|
|
|
- """Get packed references."""
|
|
|
- return {}
|
|
|
-
|
|
|
- def get_peeled(self, name: Ref) -> ObjectID | None:
|
|
|
- """Get peeled version of a reference."""
|
|
|
- try:
|
|
|
- return self._peeled[name]
|
|
|
- except KeyError:
|
|
|
- ref_value = self._refs.get(name)
|
|
|
- # Only return if it's an ObjectID (not a symref)
|
|
|
- if isinstance(ref_value, bytes) and len(ref_value) == 40:
|
|
|
- return ObjectID(ref_value)
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
class DiskRefsContainer(RefsContainer):
|
|
|
"""Refs container that reads refs from disk."""
|
|
|
|
|
|
@@ -1444,31 +1408,6 @@ def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]:
|
|
|
return ret
|
|
|
|
|
|
|
|
|
-def write_info_refs(
|
|
|
- refs: Mapping[Ref, ObjectID], store: ObjectContainer
|
|
|
-) -> Iterator[bytes]:
|
|
|
- """Generate info refs."""
|
|
|
- # TODO: Avoid recursive import :(
|
|
|
- from .object_store import peel_sha
|
|
|
-
|
|
|
- # TODO: Move this function to dulwich.protocol
|
|
|
- from .protocol import PEELED_TAG_SUFFIX
|
|
|
-
|
|
|
- for name, sha in sorted(refs.items()):
|
|
|
- # get_refs() includes HEAD as a special case, but we don't want to
|
|
|
- # advertise it
|
|
|
- if name == HEADREF:
|
|
|
- continue
|
|
|
- try:
|
|
|
- o = store[sha]
|
|
|
- except KeyError:
|
|
|
- continue
|
|
|
- _unpeeled, peeled = peel_sha(store, sha)
|
|
|
- yield o.id + b"\t" + name + b"\n"
|
|
|
- if o.id != peeled.id:
|
|
|
- yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
|
|
|
-
|
|
|
-
|
|
|
def is_local_branch(x: bytes) -> bool:
|
|
|
"""Check if a ref name is a local branch."""
|
|
|
return x.startswith(LOCAL_BRANCH_PREFIX)
|
|
|
@@ -1606,36 +1545,6 @@ def shorten_ref_name(ref: bytes) -> bytes:
|
|
|
return ref
|
|
|
|
|
|
|
|
|
-def strip_peeled_refs(
|
|
|
- refs: Mapping[Ref, ObjectID | None],
|
|
|
-) -> dict[Ref, ObjectID | None]:
|
|
|
- """Remove all peeled refs."""
|
|
|
- # TODO: Move this function to dulwich.protocol
|
|
|
- from .protocol import PEELED_TAG_SUFFIX
|
|
|
-
|
|
|
- return {
|
|
|
- ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-def split_peeled_refs(
|
|
|
- refs: Mapping[Ref, ObjectID],
|
|
|
-) -> tuple[dict[Ref, ObjectID], dict[Ref, ObjectID]]:
|
|
|
- """Split peeled refs from regular refs."""
|
|
|
- # TODO: Move this function to dulwich.protocol
|
|
|
- from .protocol import PEELED_TAG_SUFFIX
|
|
|
-
|
|
|
- peeled: dict[Ref, ObjectID] = {}
|
|
|
- regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)}
|
|
|
-
|
|
|
- for ref, sha in refs.items():
|
|
|
- if ref.endswith(PEELED_TAG_SUFFIX):
|
|
|
- # Peeled refs are always ObjectID values
|
|
|
- peeled[Ref(ref[: -len(PEELED_TAG_SUFFIX)])] = sha
|
|
|
-
|
|
|
- return regular, peeled
|
|
|
-
|
|
|
-
|
|
|
def _set_origin_head(
|
|
|
refs: RefsContainer, origin: bytes, origin_head: bytes | None
|
|
|
) -> None:
|
|
|
@@ -1712,8 +1621,7 @@ def _import_remote_refs(
|
|
|
prune: bool = False,
|
|
|
prune_tags: bool = False,
|
|
|
) -> None:
|
|
|
- # TODO: Move this function to dulwich.protocol
|
|
|
- from .protocol import PEELED_TAG_SUFFIX
|
|
|
+ from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs
|
|
|
|
|
|
stripped_refs = strip_peeled_refs(refs)
|
|
|
branches: dict[Ref, ObjectID | None] = {
|
|
|
@@ -1737,43 +1645,6 @@ def _import_remote_refs(
|
|
|
)
|
|
|
|
|
|
|
|
|
-def serialize_refs(
|
|
|
- store: ObjectContainer, refs: Mapping[Ref, ObjectID]
|
|
|
-) -> dict[bytes, ObjectID]:
|
|
|
- """Serialize refs with peeled refs.
|
|
|
-
|
|
|
- Args:
|
|
|
- store: Object store to peel refs from
|
|
|
- refs: Dictionary of ref names to SHAs
|
|
|
-
|
|
|
- Returns:
|
|
|
- Dictionary with refs and peeled refs (marked with ^{})
|
|
|
- """
|
|
|
- # TODO: Avoid recursive import :(
|
|
|
- from .object_store import peel_sha
|
|
|
-
|
|
|
- # TODO: Move this function to dulwich.protocol
|
|
|
- from .protocol import PEELED_TAG_SUFFIX
|
|
|
-
|
|
|
- ret: dict[bytes, ObjectID] = {}
|
|
|
- for ref, sha in refs.items():
|
|
|
- try:
|
|
|
- unpeeled, peeled = peel_sha(store, ObjectID(sha))
|
|
|
- except KeyError:
|
|
|
- warnings.warn(
|
|
|
- "ref {} points at non-present sha {}".format(
|
|
|
- ref.decode("utf-8", "replace"), sha.decode("ascii")
|
|
|
- ),
|
|
|
- UserWarning,
|
|
|
- )
|
|
|
- continue
|
|
|
- else:
|
|
|
- if isinstance(unpeeled, Tag):
|
|
|
- ret[ref + PEELED_TAG_SUFFIX] = peeled.id
|
|
|
- ret[ref] = unpeeled.id
|
|
|
- return ret
|
|
|
-
|
|
|
-
|
|
|
class locked_ref:
|
|
|
"""Lock a ref while making modifications.
|
|
|
|