Переглянути джерело

move unified clone logic into clone.py

Peter Rowlands 3 роки тому
батько
коміт
1fc02c0f7d
3 змінених файлів з 187 додано та 129 видалено
  1. 179 0
      dulwich/clone.py
  2. 4 1
      dulwich/porcelain.py
  3. 4 128
      dulwich/repo.py

+ 179 - 0
dulwich/clone.py

@@ -0,0 +1,179 @@
+# clone.py
+# Copyright (C) 2021 Jelmer Vernooij <jelmer@samba.org>
+#
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as public by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Repository clone handling."""
+
+import os
+from typing import TYPE_CHECKING, Callable, Tuple
+
+from dulwich.objects import (
+    Tag,
+)
+from dulwich.refs import (
+    LOCAL_BRANCH_PREFIX,
+    LOCAL_TAG_PREFIX,
+)
+
+if TYPE_CHECKING:
+    from dulwich.repo import Repo
+
+
+def do_clone(
+    source_path,
+    target_path,
+    clone_refs: Callable[["Repo", bytes], Tuple[bytes, bytes]] = None,
+    mkdir=True,
+    bare=False,
+    origin=b"origin",
+    checkout=None,
+    errstream=None,
+    branch=None,
+):
+    """Clone a repository.
+
+    Args:
+      source_path: Source repository path
+      target_path: Target repository path
+      clone_refs: Callback to handle setting up cloned remote refs in
+        the target repo
+      mkdir: Create the target directory
+      bare: Whether to create a bare repository
+      checkout: Whether or not to check-out HEAD after cloning
+      origin: Base name for refs in target repository
+        cloned from this repository
+      branch: Optional branch or tag to be used as HEAD in the new repository
+        instead of the source repository's HEAD.
+    Returns: Created repository as `Repo`
+    """
+    from dulwich.repo import Repo
+
+    if not clone_refs:
+        raise ValueError("clone_refs callback is required")
+
+    if not bare:
+        target = Repo.init(target_path, mkdir=mkdir)
+        if checkout is None:
+            checkout = True
+    else:
+        if checkout:
+            raise ValueError("checkout and bare are incompatible")
+        target = Repo.init_bare(target_path, mkdir=mkdir)
+
+    try:
+        target_config = target.get_config()
+        target_config.set((b"remote", origin), b"url", source_path)
+        target_config.set(
+            (b"remote", origin),
+            b"fetch",
+            b"+refs/heads/*:refs/remotes/" + origin + b"/*",
+        )
+        target_config.write_to_path()
+
+        ref_message = b"clone: from " + source_path
+        origin_head, origin_sha = clone_refs(target, ref_message)
+        if origin_sha and not origin_head:
+            # set detached HEAD
+            target.refs[b"HEAD"] = origin_sha
+
+        _set_origin_head(target, origin, origin_head)
+        head_ref = _set_default_branch(
+            target, origin, origin_head, branch, ref_message
+        )
+
+        # Update target head
+        if head_ref:
+            head = _set_head(target, head_ref, ref_message)
+        else:
+            head = None
+
+        if checkout and head is not None:
+            if errstream:
+                errstream.write(b"Checking out " + head + b"\n")
+            target.reset_index()
+    except BaseException:
+        target.close()
+        raise
+
+    return target
+
+
+def _set_origin_head(r, origin, origin_head):
+    # set refs/remotes/origin/HEAD
+    origin_base = b"refs/remotes/" + origin + b"/"
+    if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
+        origin_ref = origin_base + b"HEAD"
+        target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
+        if target_ref in r.refs:
+            r.refs.set_symbolic_ref(origin_ref, target_ref)
+
+
+def _set_default_branch(r, origin, origin_head, branch, ref_message):
+    origin_base = b"refs/remotes/" + origin + b"/"
+    if branch:
+        origin_ref = origin_base + branch
+        if origin_ref in r.refs:
+            local_ref = LOCAL_BRANCH_PREFIX + branch
+            r.refs.add_if_new(
+                local_ref, r.refs[origin_ref], ref_message
+            )
+            head_ref = local_ref
+        elif LOCAL_TAG_PREFIX + branch in r.refs:
+            head_ref = LOCAL_TAG_PREFIX + branch
+        else:
+            raise ValueError(
+                "%s is not a valid branch or tag" % os.fsencode(branch)
+            )
+    elif origin_head:
+        head_ref = origin_head
+        if origin_head.startswith(LOCAL_BRANCH_PREFIX):
+            origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
+        else:
+            origin_ref = origin_head
+        try:
+            r.refs.add_if_new(
+                head_ref, r.refs[origin_ref], ref_message
+            )
+        except KeyError:
+            pass
+    return head_ref
+
+
+def _set_head(r, head_ref, ref_message):
+    if head_ref.startswith(LOCAL_TAG_PREFIX):
+        # detach HEAD at specified tag
+        head = r.refs[head_ref]
+        if isinstance(head, Tag):
+            _cls, obj = head.object
+            head = obj.get_object(obj).id
+        del r.refs[b"HEAD"]
+        r.refs.set_if_equals(
+            b"HEAD", None, head, message=ref_message
+        )
+    else:
+        # set HEAD to specific branch
+        try:
+            head = r.refs[head_ref]
+            r.refs.set_symbolic_ref(b"HEAD", head_ref)
+            r.refs.set_if_equals(
+                b"HEAD", None, head, message=ref_message
+            )
+        except KeyError:
+            head = None
+    return head

+ 4 - 1
dulwich/porcelain.py

@@ -87,6 +87,9 @@ from dulwich.archive import (
 from dulwich.client import (
     get_transport_and_path,
 )
+from dulwich.clone import (
+    do_clone,
+)
 from dulwich.config import (
     StackedConfig,
 )
@@ -463,7 +466,7 @@ def clone(
         return head_ref, head_sha
 
     try:
-        return Repo.do_clone(
+        return do_clone(
             source,
             target,
             clone_refs=clone_refs,

+ 4 - 128
dulwich/repo.py

@@ -42,6 +42,9 @@ if TYPE_CHECKING:
     from dulwich.config import StackedConfig, ConfigFile
     from dulwich.index import Index
 
+from dulwich.clone import (
+    do_clone,
+)
 from dulwich.errors import (
     NoIndexPresent,
     NotBlobError,
@@ -1407,7 +1410,7 @@ class Repo(BaseRepo):
         if not isinstance(encoded_path, bytes):
             encoded_path = os.fsencode(encoded_path)
 
-        return self.do_clone(
+        return do_clone(
             encoded_path,
             target_path,
             clone_refs=clone_refs,
@@ -1418,133 +1421,6 @@ class Repo(BaseRepo):
             branch=branch,
         )
 
-    @classmethod
-    def do_clone(
-        cls,
-        source_path,
-        target_path,
-        clone_refs=None,
-        mkdir=True,
-        bare=False,
-        origin=b"origin",
-        checkout=None,
-        errstream=None,
-        branch=None,
-    ):
-        if not clone_refs:
-            raise ValueError("clone_refs callback is required")
-
-        if not bare:
-            target = cls.init(target_path, mkdir=mkdir)
-            if checkout is None:
-                checkout = True
-        else:
-            if checkout:
-                raise ValueError("checkout and bare are incompatible")
-            target = cls.init_bare(target_path, mkdir=mkdir)
-
-        try:
-            target_config = target.get_config()
-            target_config.set((b"remote", origin), b"url", source_path)
-            target_config.set(
-                (b"remote", origin),
-                b"fetch",
-                b"+refs/heads/*:refs/remotes/" + origin + b"/*",
-            )
-            target_config.write_to_path()
-
-            ref_message = b"clone: from " + source_path
-            origin_head, origin_sha = clone_refs(target, ref_message)
-            if origin_sha and not origin_head:
-                # set detached HEAD
-                target.refs[b"HEAD"] = origin_sha
-
-            cls._clone_set_origin_head(target, origin, origin_head)
-            head_ref = cls._clone_set_default_branch(
-                target, origin, origin_head, branch, ref_message
-            )
-
-            # Update target head
-            if head_ref:
-                head = cls._clone_set_head(target, head_ref, ref_message)
-            else:
-                head = None
-
-            if checkout and head is not None:
-                if errstream:
-                    errstream.write(b"Checking out " + head + b"\n")
-                target.reset_index()
-        except BaseException:
-            target.close()
-            raise
-
-        return target
-
-    @staticmethod
-    def _clone_set_origin_head(r, origin, origin_head):
-        # set refs/remotes/origin/HEAD
-        origin_base = b"refs/remotes/" + origin + b"/"
-        if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
-            origin_ref = origin_base + b"HEAD"
-            target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
-            if target_ref in r.refs:
-                r.refs.set_symbolic_ref(origin_ref, target_ref)
-
-    @staticmethod
-    def _clone_set_default_branch(r, origin, origin_head, branch, ref_message):
-        origin_base = b"refs/remotes/" + origin + b"/"
-        if branch:
-            origin_ref = origin_base + branch
-            if origin_ref in r.refs:
-                local_ref = LOCAL_BRANCH_PREFIX + branch
-                r.refs.add_if_new(
-                    local_ref, r.refs[origin_ref], ref_message
-                )
-                head_ref = local_ref
-            elif LOCAL_TAG_PREFIX + branch in r.refs:
-                head_ref = LOCAL_TAG_PREFIX + branch
-            else:
-                raise ValueError(
-                    "%s is not a valid branch or tag" % os.fsencode(branch)
-                )
-        elif origin_head:
-            head_ref = origin_head
-            if origin_head.startswith(LOCAL_BRANCH_PREFIX):
-                origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
-            else:
-                origin_ref = origin_head
-            try:
-                r.refs.add_if_new(
-                    head_ref, r.refs[origin_ref], ref_message
-                )
-            except KeyError:
-                pass
-        return head_ref
-
-    @staticmethod
-    def _clone_set_head(r, head_ref, ref_message):
-        if head_ref.startswith(LOCAL_TAG_PREFIX):
-            # detach HEAD at specified tag
-            head = r.refs[head_ref]
-            if isinstance(head, Tag):
-                _cls, obj = head.object
-                head = obj.get_object(obj).id
-            del r.refs[b"HEAD"]
-            r.refs.set_if_equals(
-                b"HEAD", None, head, message=ref_message
-            )
-        else:
-            # set HEAD to specific branch
-            try:
-                head = r.refs[head_ref]
-                r.refs.set_symbolic_ref(b"HEAD", head_ref)
-                r.refs.set_if_equals(
-                    b"HEAD", None, head, message=ref_message
-                )
-            except KeyError:
-                head = None
-        return head
-
     def reset_index(self, tree=None):
         """Reset the index back to a specific tree.