فهرست منبع

Add callback-based authentication support

Implement callback-based authentication for HTTP and proxy authentication
in Urllib3HttpGitClient. This provides a cleaner alternative to exception
interception for handling authentication challenges.

The implementation wraps the urllib3 pool manager with AuthCallbackPoolManager
which intercepts 401/407 responses and invokes the appropriate callbacks.

Fixes #822
Jelmer Vernooij 6 ماه پیش
والد
کامیت
2c4ce89b2e
5فایلهای تغییر یافته به همراه465 افزوده شده و 32 حذف شده
  1. 7 0
      NEWS
  2. 171 23
      dulwich/client.py
  3. 7 2
      dulwich/lfs.py
  4. 81 0
      examples/auth_callback.py
  5. 199 7
      tests/test_client.py

+ 7 - 0
NEWS

@@ -9,6 +9,13 @@
  * Add basic ``dulwich.aiohttp`` module that provides
    server support. (Jelmer Vernooij)
 
+ * Add callback-based authentication support for HTTP and proxy authentication
+   in ``Urllib3HttpGitClient``. This allows applications to handle
+   authentication dynamically without intercepting exceptions. Callbacks
+   receive the authentication scheme information (via WWW-Authenticate or
+   Proxy-Authenticate headers) and can provide credentials or cancel.
+   (Jelmer Vernooij, #822)
+
 0.25.0	2025-12-17
 
 **PLEASE NOTE**: This release makes quite a lot of changes to public APIs. This

+ 171 - 23
dulwich/client.py

@@ -108,6 +108,7 @@ if TYPE_CHECKING:
 import dulwich
 
 if TYPE_CHECKING:
+    from collections.abc import Mapping
     from typing import Protocol as TypingProtocol
 
     from .objects import ObjectID
@@ -115,14 +116,18 @@ if TYPE_CHECKING:
     from .refs import Ref
 
     class HTTPResponse(TypingProtocol):
-        """Protocol for HTTP response objects."""
+        """Protocol for HTTP response objects (matches urllib3.response.HTTPResponse)."""
 
-        redirect_location: str | None
+        status: int
+        headers: Mapping[str, str]
         content_type: str | None
+        redirect_location: str
 
-        def close(self) -> None:
-            """Close the response."""
-            ...
+        def close(self) -> None: ...
+
+        def read(self, amt: int | None = None) -> bytes: ...
+
+        def geturl(self) -> str | None: ...
 
     class GeneratePackDataFunc(TypingProtocol):
         """Protocol for generate_pack_data functions."""
@@ -3466,6 +3471,88 @@ def _urlmatch_http_sections(
         yield section
 
 
+class AuthCallbackPoolManager:
+    """Pool manager wrapper that handles authentication callbacks."""
+
+    def __init__(
+        self,
+        pool_manager: "urllib3.PoolManager | urllib3.ProxyManager",
+        auth_callback: Callable[[str, str, int], dict[str, str] | None] | None = None,
+        proxy_auth_callback: Callable[[str, str, int], dict[str, str] | None]
+        | None = None,
+    ) -> None:
+        self._pool_manager = pool_manager
+        self._auth_callback = auth_callback
+        self._proxy_auth_callback = proxy_auth_callback
+        self._auth_attempts: dict[str, int] = {}
+
+    def __getattr__(self, name: str):  # type: ignore[no-untyped-def]
+        # Delegate all other attributes to the wrapped pool manager
+        return getattr(self._pool_manager, name)
+
+    def request(self, method: str, url: str, *args, **kwargs):  # type: ignore[no-untyped-def]
+        """Make HTTP request with authentication callback support."""
+        max_attempts = 3
+        attempts = self._auth_attempts.get(url, 0)
+
+        while attempts < max_attempts:
+            response = self._pool_manager.request(method, url, *args, **kwargs)
+
+            if response.status == 401 and self._auth_callback:
+                # HTTP authentication required
+                www_authenticate = response.headers.get("WWW-Authenticate", "")
+                attempts += 1
+                self._auth_attempts[url] = attempts
+
+                # Call the authentication callback
+                credentials = self._auth_callback(url, www_authenticate, attempts)
+                if credentials:
+                    # Update request with new credentials
+                    import urllib3.util
+
+                    auth_header = urllib3.util.make_headers(
+                        basic_auth=f"{credentials['username']}:{credentials.get('password', '')}"
+                    )
+                    if "headers" in kwargs:
+                        kwargs["headers"].update(auth_header)
+                    else:
+                        kwargs["headers"] = auth_header
+                    # Retry the request
+                    continue
+
+            elif response.status == 407 and self._proxy_auth_callback:
+                # Proxy authentication required
+                proxy_authenticate = response.headers.get("Proxy-Authenticate", "")
+                attempts += 1
+                self._auth_attempts[url] = attempts
+
+                # Call the proxy authentication callback
+                credentials = self._proxy_auth_callback(
+                    url, proxy_authenticate, attempts
+                )
+                if credentials:
+                    # Update request with new proxy credentials
+                    import urllib3.util
+
+                    proxy_auth_header = urllib3.util.make_headers(
+                        proxy_basic_auth=f"{credentials['username']}:{credentials.get('password', '')}"
+                    )
+                    if "headers" in kwargs:
+                        kwargs["headers"].update(proxy_auth_header)
+                    else:
+                        kwargs["headers"] = proxy_auth_header
+                    # Retry the request
+                    continue
+
+            # Clear attempts on success or non-auth failure
+            if url in self._auth_attempts:
+                del self._auth_attempts[url]
+            return response
+
+        # Max attempts reached
+        return response
+
+
 def default_urllib3_manager(
     config: Config | None,
     pool_manager_cls: type | None = None,
@@ -3473,7 +3560,9 @@ def default_urllib3_manager(
     base_url: str | None = None,
     timeout: float | None = None,
     cert_reqs: str | None = None,
-) -> "urllib3.ProxyManager | urllib3.PoolManager":
+    auth_callback: Callable[[str, str, int], dict[str, str] | None] | None = None,
+    proxy_auth_callback: Callable[[str, str, int], dict[str, str] | None] | None = None,
+) -> "urllib3.ProxyManager | urllib3.PoolManager | AuthCallbackPoolManager":
     """Return urllib3 connection pool manager.
 
     Honour detected proxy configurations.
@@ -3485,11 +3574,14 @@ def default_urllib3_manager(
       base_url: Base URL for proxy bypass checks
       timeout: Timeout for HTTP requests in seconds
       cert_reqs: SSL certificate requirements (e.g. "CERT_REQUIRED", "CERT_NONE")
+      auth_callback: Optional callback for HTTP authentication
+      proxy_auth_callback: Optional callback for proxy authentication
 
     Returns:
       Either pool_manager_cls (defaults to ``urllib3.ProxyManager``) instance for
       proxy configurations, proxy_manager_cls
-      (defaults to ``urllib3.PoolManager``) instance otherwise
+      (defaults to ``urllib3.PoolManager``) instance otherwise. If auth callbacks
+      are provided, returns an AuthCallbackPoolManager wrapper.
 
     """
     proxy_server: str | None = None
@@ -3611,28 +3703,61 @@ def default_urllib3_manager(
 
     import urllib3
 
-    manager: urllib3.ProxyManager | urllib3.PoolManager
+    # Check for proxy authentication method configuration
+    proxy_auth_method: str | None = None
+    if config is not None:
+        try:
+            proxy_auth_method_bytes = config.get(b"http", b"proxyAuthMethod")
+            if proxy_auth_method_bytes and isinstance(proxy_auth_method_bytes, bytes):
+                proxy_auth_method = proxy_auth_method_bytes.decode().lower()
+        except KeyError:
+            pass
+
+    # Check environment variable override
+    env_proxy_auth = os.environ.get("GIT_HTTP_PROXY_AUTHMETHOD")
+    if env_proxy_auth:
+        proxy_auth_method = env_proxy_auth.lower()
+
+    base_manager: urllib3.ProxyManager | urllib3.PoolManager
     if proxy_server is not None:
         if proxy_manager_cls is None:
             proxy_manager_cls = urllib3.ProxyManager
         if not isinstance(proxy_server, str):
             proxy_server = proxy_server.decode()
         proxy_server_url = urlparse(proxy_server)
+
+        # Validate proxy auth method if specified
+        if proxy_auth_method and proxy_auth_method not in ("anyauth", "basic"):
+            # Only basic and anyauth are currently supported
+            # Other methods like digest, negotiate, ntlm would require additional libraries
+            raise NotImplementedError(
+                f"Proxy authentication method '{proxy_auth_method}' is not supported. "
+                "Only 'basic' and 'anyauth' are currently supported."
+            )
+
         if proxy_server_url.username is not None:
             proxy_headers = urllib3.make_headers(
                 proxy_basic_auth=f"{proxy_server_url.username}:{proxy_server_url.password or ''}"
             )
         else:
             proxy_headers = {}
-        manager = proxy_manager_cls(
+        base_manager = proxy_manager_cls(
             proxy_server, proxy_headers=proxy_headers, headers=headers, **kwargs
         )
     else:
         if pool_manager_cls is None:
             pool_manager_cls = urllib3.PoolManager
-        manager = pool_manager_cls(headers=headers, **kwargs)
+        base_manager = pool_manager_cls(headers=headers, **kwargs)
+
+    # Wrap with AuthCallbackPoolManager if callbacks are provided
+    if auth_callback is not None or proxy_auth_callback is not None:
+        return AuthCallbackPoolManager(
+            base_manager,
+            auth_callback=auth_callback,
+            proxy_auth_callback=proxy_auth_callback,
+        )
 
-    return manager
+    return base_manager
 
 
 def check_for_proxy_bypass(base_url: str | None) -> bool:
@@ -4345,13 +4470,30 @@ def _wrap_urllib3_exceptions(
 
 
 class Urllib3HttpGitClient(AbstractHttpGitClient):
-    """Git client that uses urllib3 for HTTP(S) connections."""
+    """HTTP Git client using urllib3.
+
+    Supports callback-based authentication for both HTTP and proxy authentication,
+    allowing dynamic credential handling without intercepting exceptions.
+
+    Example:
+        >>> def auth_callback(url, www_authenticate, attempt):
+        ...     # Parse www_authenticate header to determine auth scheme
+        ...     # Return credentials or None to cancel
+        ...     return {"username": "user", "password": "pass"}
+        >>>
+        >>> client = Urllib3HttpGitClient(
+        ...     "https://github.com/private/repo.git",
+        ...     auth_callback=auth_callback
+        ... )
+    """
+
+    pool_manager: "urllib3.PoolManager | urllib3.ProxyManager | AuthCallbackPoolManager"
 
     def __init__(
         self,
         base_url: str,
         dumb: bool | None = None,
-        pool_manager: "urllib3.PoolManager | None" = None,
+        pool_manager: "urllib3.PoolManager | urllib3.ProxyManager | AuthCallbackPoolManager | None" = None,
         config: Config | None = None,
         username: str | None = None,
         password: str | None = None,
@@ -4361,16 +4503,27 @@ class Urllib3HttpGitClient(AbstractHttpGitClient):
         report_activity: Callable[[int, str], None] | None = None,
         quiet: bool = False,
         include_tags: bool = False,
+        auth_callback: Callable[[str, str, int], dict[str, str] | None] | None = None,
+        proxy_auth_callback: Callable[[str, str, int], dict[str, str] | None]
+        | None = None,
     ) -> None:
         """Initialize Urllib3HttpGitClient."""
         self._timeout = timeout
         self._extra_headers = extra_headers or {}
+        self._auth_callback = auth_callback
+        self._proxy_auth_callback = proxy_auth_callback
 
         if pool_manager is None:
             self.pool_manager = default_urllib3_manager(
-                config, base_url=base_url, timeout=timeout
+                config,
+                base_url=base_url,
+                timeout=timeout,
+                auth_callback=auth_callback,
+                proxy_auth_callback=proxy_auth_callback,
             )
         else:
+            # Use provided pool manager as-is
+            # If you want callbacks with a custom pool manager, wrap it yourself
             self.pool_manager = pool_manager
 
         if username is not None:
@@ -4442,15 +4595,10 @@ class Urllib3HttpGitClient(AbstractHttpGitClient):
             if resp.status != 200:
                 raise GitProtocolError(f"unexpected http resp {resp.status} for {url}")
 
-        resp.content_type = resp.headers.get("Content-Type")  # type: ignore[attr-defined]
-        # Check if geturl() is available (urllib3 version >= 1.23)
-        try:
-            resp_url = resp.geturl()
-        except AttributeError:
-            # get_redirect_location() is available for urllib3 >= 1.1
-            resp.redirect_location = resp.get_redirect_location()  # type: ignore[attr-defined]
-        else:
-            resp.redirect_location = resp_url if resp_url != url else ""  # type: ignore[attr-defined]
+        # With urllib3 >= 2.2, geturl() is always available
+        resp.content_type = resp.headers.get("Content-Type")  # type: ignore[union-attr]
+        resp_url = resp.geturl()
+        resp.redirect_location = resp_url if resp_url != url else ""  # type: ignore[union-attr]
         return resp, _wrap_urllib3_exceptions(resp.read)  # type: ignore[return-value]
 
 

+ 7 - 2
dulwich/lfs.py

@@ -62,6 +62,7 @@ logger = logging.getLogger(__name__)
 if TYPE_CHECKING:
     import urllib3
 
+    from .client import AuthCallbackPoolManager
     from .config import Config
     from .repo import Repo
 
@@ -511,9 +512,13 @@ class HTTPLFSClient(LFSClient):
             config: Optional git config for authentication/proxy settings
         """
         super().__init__(url, config)
-        self._pool_manager: urllib3.PoolManager | None = None
+        self._pool_manager: (
+            urllib3.PoolManager | urllib3.ProxyManager | AuthCallbackPoolManager | None
+        ) = None
 
-    def _get_pool_manager(self) -> "urllib3.PoolManager":
+    def _get_pool_manager(
+        self,
+    ) -> "urllib3.PoolManager | urllib3.ProxyManager | AuthCallbackPoolManager":
         """Get urllib3 pool manager with git config applied."""
         if self._pool_manager is None:
             from dulwich.client import default_urllib3_manager

+ 81 - 0
examples/auth_callback.py

@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+"""Example of using callback-based authentication with dulwich HTTP client.
+
+This example demonstrates how to use the new callback-based authentication
+feature to handle HTTP and proxy authentication dynamically.
+
+Note: Dulwich currently supports 'basic' and 'anyauth' proxy authentication
+methods via the http.proxyAuthMethod git config option or the
+GIT_HTTP_PROXY_AUTHMETHOD environment variable. Other methods like 'digest',
+'negotiate', and 'ntlm' will raise NotImplementedError.
+"""
+
+from dulwich.client import HttpGitClient
+
+
+def my_auth_callback(url, www_authenticate, attempt):
+    """Callback function for HTTP authentication.
+
+    Args:
+        url: The URL that requires authentication
+        www_authenticate: The WWW-Authenticate header value from the server
+        attempt: The attempt number (starts at 1)
+
+    Returns:
+        dict: Credentials with 'username' and 'password' keys, or None to cancel
+    """
+    print(f"Authentication required for {url}")
+    print(f"Server says: {www_authenticate}")
+    print(f"Attempt {attempt} of 3")
+
+    # In a real application, you might:
+    # - Prompt the user for credentials
+    # - Look up credentials in a password manager
+    # - Parse the www_authenticate header to determine the auth scheme
+
+    if attempt <= 2:
+        # Example: return hardcoded credentials for demo
+        return {"username": "myuser", "password": "mypassword"}
+    else:
+        # Give up after 2 attempts
+        return None
+
+
+def my_proxy_auth_callback(url, proxy_authenticate, attempt):
+    """Callback function for proxy authentication.
+
+    Args:
+        url: The URL being accessed through the proxy
+        proxy_authenticate: The Proxy-Authenticate header value from the proxy
+        attempt: The attempt number (starts at 1)
+
+    Returns:
+        dict: Credentials with 'username' and 'password' keys, or None to cancel
+    """
+    print(f"Proxy authentication required for accessing {url}")
+    print(f"Proxy says: {proxy_authenticate}")
+
+    # Return proxy credentials
+    return {"username": "proxyuser", "password": "proxypass"}
+
+
+def main():
+    # Create an HTTP Git client with authentication callbacks
+    client = HttpGitClient(
+        "https://github.com/private/repo.git",
+        auth_callback=my_auth_callback,
+        proxy_auth_callback=my_proxy_auth_callback,
+    )
+
+    # Now when you use the client, it will call your callbacks
+    # if authentication is required
+    try:
+        # Example: fetch refs from the repository
+        refs = client.fetch_refs("https://github.com/private/repo.git")
+        print(f"Successfully fetched refs: {refs}")
+    except Exception as e:
+        print(f"Error: {e}")
+
+
+if __name__ == "__main__":
+    main()

+ 199 - 7
tests/test_client.py

@@ -27,7 +27,7 @@ import tempfile
 import warnings
 from io import BytesIO
 from typing import NoReturn
-from unittest.mock import patch
+from unittest.mock import MagicMock, Mock, patch
 from urllib.parse import quote as urlquote
 from urllib.parse import urlparse
 
@@ -35,6 +35,7 @@ import dulwich
 from dulwich import client
 from dulwich.bundle import create_bundle_from_repo, write_bundle
 from dulwich.client import (
+    AuthCallbackPoolManager,
     BundleClient,
     FetchPackResult,
     GitProtocolError,
@@ -690,7 +691,6 @@ class TestGetTransportAndPath(TestCase):
 
     def test_ssh_with_config(self) -> None:
         # Test that core.sshCommand from config is passed to SSHGitClient
-        from dulwich.config import ConfigDict
 
         config = ConfigDict()
         c, _path = get_transport_and_path(
@@ -1018,7 +1018,6 @@ class SSHGitClientTests(TestCase):
 
     def test_ssh_command_config(self) -> None:
         # Test core.sshCommand config setting
-        from dulwich.config import ConfigDict
 
         # No config, no environment - should default to "ssh"
         self.overrideEnv("GIT_SSH", None)
@@ -1884,7 +1883,6 @@ class HttpGitClientTests(TestCase):
 
     def test_timeout_from_config(self) -> None:
         """Test that timeout can be configured via git config."""
-        from dulwich.config import ConfigDict
 
         url = "https://github.com/jelmer/dulwich"
         config = ConfigDict()
@@ -1898,7 +1896,6 @@ class HttpGitClientTests(TestCase):
 
     def test_timeout_parameter_precedence(self) -> None:
         """Test that explicit timeout parameter takes precedence over config."""
-        from dulwich.config import ConfigDict
 
         url = "https://github.com/jelmer/dulwich"
         config = ConfigDict()
@@ -2192,6 +2189,24 @@ class HttpGitClientTests(TestCase):
         # Verify the config was passed through (this was the bug - it wasn't passed to subclasses before)
         self.assertIsNotNone(client.config)
 
+    def test_auth_callbacks(self) -> None:
+        url = "https://github.com/jelmer/dulwich"
+
+        def auth_callback(url, www_authenticate, attempt):
+            return {"username": "user", "password": "pass"}
+
+        def proxy_auth_callback(url, proxy_authenticate, attempt):
+            return {"username": "proxy_user", "password": "proxy_pass"}
+
+        c = HttpGitClient(
+            url, auth_callback=auth_callback, proxy_auth_callback=proxy_auth_callback
+        )
+
+        # Check that the pool manager is wrapped with AuthCallbackPoolManager
+        self.assertIsInstance(c.pool_manager, AuthCallbackPoolManager)
+        self.assertEqual(c._auth_callback, auth_callback)
+        self.assertEqual(c._proxy_auth_callback, proxy_auth_callback)
+
 
 class TCPGitClientTests(TestCase):
     def test_get_url(self) -> None:
@@ -2238,11 +2253,190 @@ class TCPGitClientTests(TestCase):
         self.assertEqual("git://[2001:db8::1]/jelmer/dulwich", url)
 
 
+class AuthCallbackPoolManagerTest(TestCase):
+    def test_http_auth_callback(self) -> None:
+        # Create a mock pool manager
+        mock_pool_manager = Mock()
+        mock_response = Mock()
+
+        # First request returns 401
+        mock_response.status = 401
+        mock_response.headers = {"WWW-Authenticate": 'Basic realm="test"'}
+
+        # Second request (after auth) returns 200
+        mock_response_success = Mock()
+        mock_response_success.status = 200
+        mock_response_success.headers = {}
+
+        mock_pool_manager.request = MagicMock(
+            side_effect=[mock_response, mock_response_success]
+        )
+
+        # Auth callback that returns credentials
+        def auth_callback(url, www_authenticate, attempt):
+            if attempt == 1:
+                return {"username": "testuser", "password": "testpass"}
+            return None
+
+        # Create the wrapper
+        auth_manager = AuthCallbackPoolManager(
+            mock_pool_manager, auth_callback=auth_callback
+        )
+
+        # Make request
+        result = auth_manager.request("GET", "https://example.com/test")
+
+        # Verify two requests were made
+        self.assertEqual(mock_pool_manager.request.call_count, 2)
+
+        # Verify auth headers were added on second request
+        second_call_kwargs = mock_pool_manager.request.call_args_list[1][1]
+        self.assertIn("headers", second_call_kwargs)
+        # urllib3 returns lowercase header names
+        self.assertIn("authorization", second_call_kwargs["headers"])
+
+        # Result should be the successful response
+        self.assertEqual(result, mock_response_success)
+
+    def test_proxy_auth_callback(self) -> None:
+        # Create a mock pool manager
+        mock_pool_manager = Mock()
+        mock_response = Mock()
+
+        # First request returns 407
+        mock_response.status = 407
+        mock_response.headers = {"Proxy-Authenticate": 'Basic realm="proxy"'}
+
+        # Second request (after auth) returns 200
+        mock_response_success = Mock()
+        mock_response_success.status = 200
+        mock_response_success.headers = {}
+
+        mock_pool_manager.request = MagicMock(
+            side_effect=[mock_response, mock_response_success]
+        )
+
+        # Proxy auth callback that returns credentials
+        def proxy_auth_callback(url, proxy_authenticate, attempt):
+            if attempt == 1:
+                return {"username": "proxyuser", "password": "proxypass"}
+            return None
+
+        # Create the wrapper
+        auth_manager = AuthCallbackPoolManager(
+            mock_pool_manager, proxy_auth_callback=proxy_auth_callback
+        )
+
+        # Make request
+        result = auth_manager.request("GET", "https://example.com/test")
+
+        # Verify two requests were made
+        self.assertEqual(mock_pool_manager.request.call_count, 2)
+
+        # Verify proxy auth headers were added on second request
+        second_call_kwargs = mock_pool_manager.request.call_args_list[1][1]
+        self.assertIn("headers", second_call_kwargs)
+        # urllib3 returns lowercase header names
+        self.assertIn("proxy-authorization", second_call_kwargs["headers"])
+
+        # Result should be the successful response
+        self.assertEqual(result, mock_response_success)
+
+    def test_max_attempts(self) -> None:
+        # Create a mock pool manager that always returns 401
+        mock_pool_manager = Mock()
+        mock_response = Mock()
+        mock_response.status = 401
+        mock_response.headers = {"WWW-Authenticate": 'Basic realm="test"'}
+        mock_pool_manager.request.return_value = mock_response
+
+        # Auth callback that always returns credentials
+        def auth_callback(url, www_authenticate, attempt):
+            return {"username": "user", "password": "pass"}
+
+        # Create the wrapper
+        auth_manager = AuthCallbackPoolManager(
+            mock_pool_manager, auth_callback=auth_callback
+        )
+
+        # Make request
+        result = auth_manager.request("GET", "https://example.com/test")
+
+        # Should have made 3 attempts (initial + 2 retries)
+        self.assertEqual(mock_pool_manager.request.call_count, 3)
+
+        # Result should be the last 401 response
+        self.assertEqual(result.status, 401)
+
+
 class DefaultUrllib3ManagerTest(TestCase):
     def test_no_config(self) -> None:
         manager = default_urllib3_manager(config=None)
         self.assertEqual(manager.connection_pool_kw["cert_reqs"], "CERT_REQUIRED")
 
+    def test_auth_callbacks(self) -> None:
+        def auth_callback(url, www_authenticate, attempt):
+            return {"username": "user", "password": "pass"}
+
+        def proxy_auth_callback(url, proxy_authenticate, attempt):
+            return {"username": "proxy_user", "password": "proxy_pass"}
+
+        manager = default_urllib3_manager(
+            config=None,
+            auth_callback=auth_callback,
+            proxy_auth_callback=proxy_auth_callback,
+        )
+        self.assertIsInstance(manager, AuthCallbackPoolManager)
+        self.assertEqual(manager._auth_callback, auth_callback)
+        self.assertEqual(manager._proxy_auth_callback, proxy_auth_callback)
+
+    def test_proxy_auth_method_unsupported(self) -> None:
+        import os
+
+        # Test with config
+        config = ConfigDict()
+        config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
+        config.set((b"http",), b"proxyAuthMethod", b"digest")
+
+        with self.assertRaises(NotImplementedError) as cm:
+            default_urllib3_manager(config=config)
+
+        self.assertIn("digest", str(cm.exception))
+        self.assertIn("not supported", str(cm.exception))
+
+        # Test with environment variable
+        config = ConfigDict()
+        config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
+
+        old_env = os.environ.get("GIT_HTTP_PROXY_AUTHMETHOD")
+        try:
+            os.environ["GIT_HTTP_PROXY_AUTHMETHOD"] = "ntlm"
+            with self.assertRaises(NotImplementedError) as cm:
+                default_urllib3_manager(config=config)
+
+            self.assertIn("ntlm", str(cm.exception))
+            self.assertIn("not supported", str(cm.exception))
+        finally:
+            if old_env is None:
+                os.environ.pop("GIT_HTTP_PROXY_AUTHMETHOD", None)
+            else:
+                os.environ["GIT_HTTP_PROXY_AUTHMETHOD"] = old_env
+
+    def test_proxy_auth_method_supported(self) -> None:
+        # Test basic auth method
+        config = ConfigDict()
+        config.set((b"http",), b"proxy", b"http://user@proxy.example.com:8080")
+        config.set((b"http",), b"proxyAuthMethod", b"basic")
+
+        # Should not raise
+        manager = default_urllib3_manager(config=config)
+        self.assertIsNotNone(manager)
+
+        # Test anyauth (default)
+        config.set((b"http",), b"proxyAuthMethod", b"anyauth")
+        manager = default_urllib3_manager(config=config)
+        self.assertIsNotNone(manager)
+
     def test_config_no_proxy(self) -> None:
         import urllib3
 
@@ -2485,7 +2679,6 @@ class DefaultUrllib3ManagerTest(TestCase):
 
     def test_timeout_from_config(self) -> None:
         """Test that timeout can be configured via git config."""
-        from dulwich.config import ConfigDict
 
         config = ConfigDict()
         config.set((b"http",), b"timeout", b"25")
@@ -2495,7 +2688,6 @@ class DefaultUrllib3ManagerTest(TestCase):
 
     def test_timeout_parameter_precedence(self) -> None:
         """Test that explicit timeout parameter takes precedence over config."""
-        from dulwich.config import ConfigDict
 
         config = ConfigDict()
         config.set((b"http",), b"timeout", b"25")