|
|
@@ -108,6 +108,7 @@ if TYPE_CHECKING:
|
|
|
|
|
|
from .bundle import Bundle
|
|
|
from .config import Config, apply_instead_of, get_xdg_config_home_path
|
|
|
+from .credentials import match_partial_url, match_urls
|
|
|
from .errors import GitProtocolError, HangupException, NotGitRepository, SendPackError
|
|
|
from .object_store import GraphWalker
|
|
|
from .pack import (
|
|
|
@@ -3149,6 +3150,58 @@ def default_user_agent_string() -> str:
|
|
|
return "git/dulwich/{}".format(".".join([str(x) for x in dulwich.__version__]))
|
|
|
|
|
|
|
|
|
+def _urlmatch_http_sections(
|
|
|
+ config: Config, url: Optional[str]
|
|
|
+) -> Iterator[tuple[bytes, ...]]:
|
|
|
+ """Yield http config sections matching the given URL, ordered by specificity.
|
|
|
+
|
|
|
+ Yields sections from least specific to most specific, so callers can
|
|
|
+ apply settings in order with more specific settings overriding less specific ones.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ config: Git configuration object
|
|
|
+ url: URL to match against config sections (if None, only yields global http section)
|
|
|
+
|
|
|
+ Yields:
|
|
|
+ Config section tuples that match the URL, ordered by specificity
|
|
|
+ """
|
|
|
+ encoding = getattr(config, "encoding", None) or sys.getdefaultencoding()
|
|
|
+ parsed_url = urlparse(url) if url else None
|
|
|
+
|
|
|
+ # Collect all matching sections with their specificity
|
|
|
+ # (specificity is based on URL path length - longer = more specific)
|
|
|
+ matching_sections: list[tuple[int, tuple[bytes, ...]]] = []
|
|
|
+
|
|
|
+ for config_section in config.sections():
|
|
|
+ if config_section[0] != b"http":
|
|
|
+ continue
|
|
|
+
|
|
|
+ if len(config_section) < 2:
|
|
|
+ # Global http section (least specific)
|
|
|
+ matching_sections.append((0, config_section))
|
|
|
+ elif parsed_url is not None:
|
|
|
+ # URL-specific http section - only match if we have a URL
|
|
|
+ config_url = config_section[1].decode(encoding)
|
|
|
+ parsed_config_url = urlparse(config_url)
|
|
|
+
|
|
|
+ is_match = False
|
|
|
+ if parsed_config_url.scheme and parsed_config_url.netloc:
|
|
|
+ is_match = match_urls(parsed_url, parsed_config_url)
|
|
|
+ else:
|
|
|
+ is_match = match_partial_url(parsed_url, config_url)
|
|
|
+
|
|
|
+ if is_match:
|
|
|
+ # Calculate specificity based on URL path length
|
|
|
+ specificity = len(parsed_config_url.path.rstrip("/"))
|
|
|
+ matching_sections.append((specificity, config_section))
|
|
|
+
|
|
|
+ # Sort by specificity (least specific first)
|
|
|
+ matching_sections.sort(key=lambda x: x[0])
|
|
|
+
|
|
|
+ for _, section in matching_sections:
|
|
|
+ yield section
|
|
|
+
|
|
|
+
|
|
|
def default_urllib3_manager(
|
|
|
config: Optional[Config],
|
|
|
pool_manager_cls: Optional[type] = None,
|
|
|
@@ -3191,59 +3244,87 @@ def default_urllib3_manager(
|
|
|
proxy_server = None
|
|
|
|
|
|
if config is not None:
|
|
|
- if proxy_server is None:
|
|
|
+ # Iterate through all matching http sections from least to most specific
|
|
|
+ # More specific settings will override less specific ones
|
|
|
+ for section in _urlmatch_http_sections(config, base_url):
|
|
|
+ if proxy_server is None:
|
|
|
+ try:
|
|
|
+ proxy_server_bytes = config.get(section, b"proxy")
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ if proxy_server_bytes is not None:
|
|
|
+ proxy_server = proxy_server_bytes.decode("utf-8")
|
|
|
+
|
|
|
try:
|
|
|
- proxy_server_bytes = config.get(b"http", b"proxy")
|
|
|
- if proxy_server_bytes is not None:
|
|
|
- proxy_server = proxy_server_bytes.decode("utf-8")
|
|
|
+ user_agent_bytes = config.get(section, b"useragent")
|
|
|
except KeyError:
|
|
|
pass
|
|
|
- try:
|
|
|
- user_agent_bytes = config.get(b"http", b"useragent")
|
|
|
- if user_agent_bytes is not None:
|
|
|
- user_agent = user_agent_bytes.decode("utf-8")
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
+ else:
|
|
|
+ if user_agent_bytes is not None:
|
|
|
+ user_agent = user_agent_bytes.decode("utf-8")
|
|
|
|
|
|
- # TODO(jelmer): Support per-host settings
|
|
|
- try:
|
|
|
- ssl_verify = config.get_boolean(b"http", b"sslVerify")
|
|
|
- except KeyError:
|
|
|
- ssl_verify = True
|
|
|
+ try:
|
|
|
+ ssl_verify_value = config.get_boolean(section, b"sslVerify")
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ if ssl_verify_value is not None:
|
|
|
+ ssl_verify = ssl_verify_value
|
|
|
|
|
|
- try:
|
|
|
- ca_certs_bytes = config.get(b"http", b"sslCAInfo")
|
|
|
- if ca_certs_bytes is not None:
|
|
|
- ca_certs = ca_certs_bytes.decode("utf-8")
|
|
|
- except KeyError:
|
|
|
- ca_certs = None
|
|
|
-
|
|
|
- # Check for timeout configuration
|
|
|
- if timeout is None:
|
|
|
try:
|
|
|
- timeout_bytes = config.get(b"http", b"timeout")
|
|
|
- if timeout_bytes is not None:
|
|
|
- timeout = float(timeout_bytes.decode("utf-8"))
|
|
|
+ ca_certs_bytes = config.get(section, b"sslCAInfo")
|
|
|
except KeyError:
|
|
|
pass
|
|
|
+ else:
|
|
|
+ if ca_certs_bytes is not None:
|
|
|
+ ca_certs = ca_certs_bytes.decode("utf-8")
|
|
|
+
|
|
|
+ if timeout is None:
|
|
|
+ try:
|
|
|
+ timeout_bytes = config.get(section, b"timeout")
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ if timeout_bytes is not None:
|
|
|
+ timeout = float(timeout_bytes.decode("utf-8"))
|
|
|
+
|
|
|
+ # Default ssl_verify to True if not set
|
|
|
+ if ssl_verify is None:
|
|
|
+ ssl_verify = True
|
|
|
|
|
|
if user_agent is None:
|
|
|
user_agent = default_user_agent_string()
|
|
|
|
|
|
headers = {"User-agent": user_agent}
|
|
|
|
|
|
- # Check for extra headers in config
|
|
|
+ # Check for extra headers in config with URL matching
|
|
|
if config is not None:
|
|
|
- try:
|
|
|
- # Git allows multiple http.extraHeader entries
|
|
|
- extra_headers = config.get_multivar(b"http", b"extraHeader")
|
|
|
+ # Apply extra headers from least specific to most specific
|
|
|
+ for section in _urlmatch_http_sections(config, base_url):
|
|
|
+ try:
|
|
|
+ extra_headers = config.get_multivar(section, b"extraHeader")
|
|
|
+ except KeyError:
|
|
|
+ continue
|
|
|
+
|
|
|
for extra_header in extra_headers:
|
|
|
- if extra_header and b": " in extra_header:
|
|
|
- # Parse the header (format: "Header-Name: value")
|
|
|
- header_name, header_value = extra_header.split(b": ", 1)
|
|
|
+ if not extra_header:
|
|
|
+ logger.warning("Ignoring empty http.extraHeader value")
|
|
|
+ continue
|
|
|
+ if b": " not in extra_header:
|
|
|
+ logger.warning(
|
|
|
+ "Ignoring invalid http.extraHeader value %r (missing ': ' separator)",
|
|
|
+ extra_header,
|
|
|
+ )
|
|
|
+ continue
|
|
|
+ # Parse the header (format: "Header-Name: value")
|
|
|
+ header_name, header_value = extra_header.split(b": ", 1)
|
|
|
+ try:
|
|
|
headers[header_name.decode("utf-8")] = header_value.decode("utf-8")
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
+ except UnicodeDecodeError as e:
|
|
|
+ logger.warning(
|
|
|
+ "Ignoring http.extraHeader with invalid UTF-8: %s", e
|
|
|
+ )
|
|
|
|
|
|
kwargs: dict[str, Union[str, float, None]] = {
|
|
|
"ca_certs": ca_certs,
|