|
@@ -46,7 +46,7 @@ import select
|
|
|
import socket
|
|
|
import subprocess
|
|
|
import sys
|
|
|
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple, IO
|
|
|
+from typing import Any, Callable, Dict, List, Optional, Set, Tuple, IO, Iterable
|
|
|
|
|
|
from urllib.parse import (
|
|
|
quote as urlquote,
|
|
@@ -59,7 +59,7 @@ from urllib.parse import (
|
|
|
|
|
|
|
|
|
import dulwich
|
|
|
-from dulwich.config import get_xdg_config_home_path
|
|
|
+from dulwich.config import get_xdg_config_home_path, Config
|
|
|
from dulwich.errors import (
|
|
|
GitProtocolError,
|
|
|
NotGitRepository,
|
|
@@ -2268,12 +2268,49 @@ def _win32_url_to_path(parsed) -> str:
|
|
|
return url2pathname(netloc + path) # type: ignore
|
|
|
|
|
|
|
|
|
-def get_transport_and_path_from_url(url, config=None, **kwargs):
|
|
|
+def iter_instead_of(config: Config, push: bool = False) -> Iterable[Tuple[str, str]]:
|
|
|
+ """Iterate over insteadOf / pushInsteadOf values.
|
|
|
+ """
|
|
|
+ for section in config.sections():
|
|
|
+ if section[0] != b'url':
|
|
|
+ continue
|
|
|
+ replacement = section[1]
|
|
|
+ try:
|
|
|
+ needles = list(config.get_multivar(section, "insteadOf"))
|
|
|
+ except KeyError:
|
|
|
+ needles = []
|
|
|
+ if push:
|
|
|
+ try:
|
|
|
+ needles += list(config.get_multivar(section, "pushInsteadOf"))
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ for needle in needles:
|
|
|
+ yield needle.decode('utf-8'), replacement.decode('utf-8')
|
|
|
+
|
|
|
+
|
|
|
+def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
|
|
|
+ """Apply insteadOf / pushInsteadOf to a URL.
|
|
|
+ """
|
|
|
+ longest_needle = ""
|
|
|
+ updated_url = orig_url
|
|
|
+ for needle, replacement in iter_instead_of(config, push):
|
|
|
+ if not orig_url.startswith(needle):
|
|
|
+ continue
|
|
|
+ if len(longest_needle) < len(needle):
|
|
|
+ longest_needle = needle
|
|
|
+ updated_url = replacement + orig_url[len(needle):]
|
|
|
+ return updated_url
|
|
|
+
|
|
|
+
|
|
|
+def get_transport_and_path_from_url(
|
|
|
+ url: str, config: Optional[Config] = None,
|
|
|
+ operation: Optional[str] = None, **kwargs) -> Tuple[GitClient, str]:
|
|
|
"""Obtain a git client from a URL.
|
|
|
|
|
|
Args:
|
|
|
url: URL to open (a unicode string)
|
|
|
config: Optional config object
|
|
|
+ operation: Kind of operation that'll be performed; "pull" or "push"
|
|
|
thin_packs: Whether or not thin packs should be retrieved
|
|
|
report_activity: Optional callback for reporting transport
|
|
|
activity.
|
|
@@ -2282,6 +2319,8 @@ def get_transport_and_path_from_url(url, config=None, **kwargs):
|
|
|
Tuple with client instance and relative path.
|
|
|
|
|
|
"""
|
|
|
+ if config is not None:
|
|
|
+ url = apply_instead_of(config, url, push=(operation == "push"))
|
|
|
parsed = urlparse(url)
|
|
|
if parsed.scheme == "git":
|
|
|
return (TCPGitClient.from_parsedurl(parsed, **kwargs), parsed.path)
|
|
@@ -2303,7 +2342,7 @@ def get_transport_and_path_from_url(url, config=None, **kwargs):
|
|
|
raise ValueError("unknown scheme '%s'" % parsed.scheme)
|
|
|
|
|
|
|
|
|
-def parse_rsync_url(location):
|
|
|
+def parse_rsync_url(location: str) -> Tuple[Optional[str], str, str]:
|
|
|
"""Parse a rsync-style URL."""
|
|
|
if ":" in location and "@" not in location:
|
|
|
# SSH with no user@, zero or one leading slash.
|
|
@@ -2324,6 +2363,7 @@ def parse_rsync_url(location):
|
|
|
|
|
|
def get_transport_and_path(
|
|
|
location: str,
|
|
|
+ operation: Optional[str] = None,
|
|
|
**kwargs: Any
|
|
|
) -> Tuple[GitClient, str]:
|
|
|
"""Obtain a git client from a URL.
|
|
@@ -2331,6 +2371,7 @@ def get_transport_and_path(
|
|
|
Args:
|
|
|
location: URL or path (a string)
|
|
|
config: Optional config object
|
|
|
+ operation: Kind of operation that'll be performed; "pull" or "push"
|
|
|
thin_packs: Whether or not thin packs should be retrieved
|
|
|
report_activity: Optional callback for reporting transport
|
|
|
activity.
|
|
@@ -2341,7 +2382,7 @@ def get_transport_and_path(
|
|
|
"""
|
|
|
# First, try to parse it as a URL
|
|
|
try:
|
|
|
- return get_transport_and_path_from_url(location, **kwargs)
|
|
|
+ return get_transport_and_path_from_url(location, operation=operation, **kwargs)
|
|
|
except ValueError:
|
|
|
pass
|
|
|
|