|
@@ -23,6 +23,7 @@
|
|
|
|
|
|
"""
|
|
|
import os
|
|
|
+from typing import Dict, Optional
|
|
|
|
|
|
from dulwich.errors import (
|
|
|
PackedRefsException,
|
|
@@ -40,6 +41,7 @@ from dulwich.file import (
|
|
|
)
|
|
|
|
|
|
|
|
|
+HEADREF = b"HEAD"
|
|
|
SYMREF = b"ref: "
|
|
|
LOCAL_BRANCH_PREFIX = b"refs/heads/"
|
|
|
LOCAL_TAG_PREFIX = b"refs/tags/"
|
|
@@ -247,7 +249,7 @@ class RefsContainer(object):
|
|
|
Raises:
|
|
|
KeyError: if a refname is not HEAD or is otherwise not valid.
|
|
|
"""
|
|
|
- if name in (b"HEAD", b"refs/stash"):
|
|
|
+ if name in (HEADREF, b"refs/stash"):
|
|
|
return
|
|
|
if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
|
|
|
raise RefFormatError(name)
|
|
@@ -724,8 +726,8 @@ class DiskRefsContainer(RefsContainer):
|
|
|
|
|
|
def allkeys(self):
|
|
|
allkeys = set()
|
|
|
- if os.path.exists(self.refpath(b"HEAD")):
|
|
|
- allkeys.add(b"HEAD")
|
|
|
+ if os.path.exists(self.refpath(HEADREF)):
|
|
|
+ allkeys.add(HEADREF)
|
|
|
path = self.refpath(b"")
|
|
|
refspath = self.refpath(b"refs")
|
|
|
for root, unused_dirs, files in os.walk(refspath):
|
|
@@ -745,7 +747,7 @@ class DiskRefsContainer(RefsContainer):
|
|
|
name = name.replace(b"/", os.fsencode(os.path.sep))
|
|
|
# TODO: as the 'HEAD' reference is working tree specific, it
|
|
|
# should actually not be a part of RefsContainer
|
|
|
- if name == b"HEAD":
|
|
|
+ if name == HEADREF:
|
|
|
return os.path.join(self.worktree_path, name)
|
|
|
else:
|
|
|
return os.path.join(self.path, name)
|
|
@@ -1181,7 +1183,7 @@ def write_info_refs(refs, store):
|
|
|
for name, sha in sorted(refs.items()):
|
|
|
# get_refs() includes HEAD as a special case, but we don't want to
|
|
|
# advertise it
|
|
|
- if name == b"HEAD":
|
|
|
+ if name == HEADREF:
|
|
|
continue
|
|
|
try:
|
|
|
o = store[sha]
|
|
@@ -1210,7 +1212,7 @@ def _set_origin_head(refs, origin, origin_head):
|
|
|
# set refs/remotes/origin/HEAD
|
|
|
origin_base = b"refs/remotes/" + origin + b"/"
|
|
|
if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
|
|
|
- origin_ref = origin_base + b"HEAD"
|
|
|
+ origin_ref = origin_base + HEADREF
|
|
|
target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
|
|
|
if target_ref in refs:
|
|
|
refs.set_symbolic_ref(origin_ref, target_ref)
|
|
@@ -1254,18 +1256,44 @@ def _set_head(refs, head_ref, ref_message):
|
|
|
if isinstance(head, Tag):
|
|
|
_cls, obj = head.object
|
|
|
head = obj.get_object(obj).id
|
|
|
- del refs[b"HEAD"]
|
|
|
+ del refs[HEADREF]
|
|
|
refs.set_if_equals(
|
|
|
- b"HEAD", None, head, message=ref_message
|
|
|
+ HEADREF, None, head, message=ref_message
|
|
|
)
|
|
|
else:
|
|
|
# set HEAD to specific branch
|
|
|
try:
|
|
|
head = refs[head_ref]
|
|
|
- refs.set_symbolic_ref(b"HEAD", head_ref)
|
|
|
- refs.set_if_equals(
|
|
|
- b"HEAD", None, head, message=ref_message
|
|
|
- )
|
|
|
+ refs.set_symbolic_ref(HEADREF, head_ref)
|
|
|
+ refs.set_if_equals(HEADREF, None, head, message=ref_message)
|
|
|
except KeyError:
|
|
|
head = None
|
|
|
return head
|
|
|
+
|
|
|
+
|
|
|
+def _import_remote_refs(
|
|
|
+ refs_container: RefsContainer,
|
|
|
+ remote_name: str,
|
|
|
+ refs: Dict[str, str],
|
|
|
+ message: Optional[bytes] = None,
|
|
|
+ prune: bool = False,
|
|
|
+ prune_tags: bool = False,
|
|
|
+):
|
|
|
+ stripped_refs = strip_peeled_refs(refs)
|
|
|
+ branches = {
|
|
|
+ n[len(LOCAL_BRANCH_PREFIX) :]: v
|
|
|
+ for (n, v) in stripped_refs.items()
|
|
|
+ if n.startswith(LOCAL_BRANCH_PREFIX)
|
|
|
+ }
|
|
|
+ refs_container.import_refs(
|
|
|
+ b"refs/remotes/" + remote_name.encode(),
|
|
|
+ branches,
|
|
|
+ message=message,
|
|
|
+ prune=prune,
|
|
|
+ )
|
|
|
+ tags = {
|
|
|
+ n[len(LOCAL_TAG_PREFIX) :]: v
|
|
|
+ for (n, v) in stripped_refs.items()
|
|
|
+ if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(ANNOTATED_TAG_SUFFIX)
|
|
|
+ }
|
|
|
+ refs_container.import_refs(LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags)
|