|
@@ -27,7 +27,7 @@ import tempfile
|
|
|
from collections.abc import Iterable
|
|
|
from dataclasses import dataclass
|
|
|
from typing import TYPE_CHECKING, BinaryIO, Optional, Union
|
|
|
-from urllib.parse import urljoin
|
|
|
+from urllib.parse import urljoin, urlparse
|
|
|
from urllib.request import Request, urlopen
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
@@ -97,6 +97,13 @@ class LFSStore:
|
|
|
return cls.create(lfs_dir)
|
|
|
return cls(lfs_dir)
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_controldir(cls, controldir: str, create: bool = False) -> "LFSStore":
|
|
|
+ lfs_dir = os.path.join(controldir, "lfs")
|
|
|
+ if create:
|
|
|
+ return cls.create(lfs_dir)
|
|
|
+ return cls(lfs_dir)
|
|
|
+
|
|
|
def _sha_path(self, sha: str) -> str:
|
|
|
return os.path.join(self.path, "objects", sha[0:2], sha[2:4], sha)
|
|
|
|
|
@@ -201,9 +208,11 @@ class LFSPointer:
|
|
|
class LFSFilterDriver:
|
|
|
"""LFS filter driver implementation."""
|
|
|
|
|
|
- def __init__(self, lfs_store: "LFSStore", repo: Optional["Repo"] = None) -> None:
|
|
|
+ def __init__(
|
|
|
+ self, lfs_store: "LFSStore", config: Optional["Config"] = None
|
|
|
+ ) -> None:
|
|
|
self.lfs_store = lfs_store
|
|
|
- self.repo = repo
|
|
|
+ self.config = config
|
|
|
|
|
|
def clean(self, data: bytes) -> bytes:
|
|
|
"""Convert file content to LFS pointer (clean filter)."""
|
|
@@ -243,10 +252,9 @@ class LFSFilterDriver:
|
|
|
except LFSError as e:
|
|
|
# Download failed, fall back to returning pointer
|
|
|
logging.warning("LFS object download failed for %s: %s", pointer.oid, e)
|
|
|
- pass
|
|
|
|
|
|
- # Return pointer as-is when object is missing and download failed
|
|
|
- return data
|
|
|
+ # Return pointer as-is when object is missing and download failed
|
|
|
+ return data
|
|
|
|
|
|
def _download_object(self, pointer: LFSPointer) -> bytes:
|
|
|
"""Download an LFS object from the server.
|
|
@@ -260,17 +268,13 @@ class LFSFilterDriver:
|
|
|
Raises:
|
|
|
LFSError: If download fails for any reason
|
|
|
"""
|
|
|
- if self.repo is None:
|
|
|
- raise LFSError("No repository available for LFS download")
|
|
|
-
|
|
|
- # Get LFS server URL from remote
|
|
|
- lfs_url = self._get_lfs_url()
|
|
|
- if not lfs_url:
|
|
|
- raise LFSError("No LFS server URL configured")
|
|
|
+ if self.config is None:
|
|
|
+ raise LFSError("No configuration available for LFS download")
|
|
|
|
|
|
# Create LFS client and download
|
|
|
- config = self.repo.get_config_stack() if self.repo else None
|
|
|
- client = LFSClient(lfs_url, config=config)
|
|
|
+ client = LFSClient.from_config(self.config)
|
|
|
+ if client is None:
|
|
|
+ raise LFSError("No LFS client available from configuration")
|
|
|
content = client.download(pointer.oid, pointer.size)
|
|
|
|
|
|
# Store the downloaded content in local LFS store
|
|
@@ -278,25 +282,53 @@ class LFSFilterDriver:
|
|
|
|
|
|
# Verify the stored OID matches what we expected
|
|
|
if stored_oid != pointer.oid:
|
|
|
- raise LFSError(f"Downloaded OID mismatch: expected {pointer.oid}, got {stored_oid}")
|
|
|
+ raise LFSError(
|
|
|
+ f"Downloaded OID mismatch: expected {pointer.oid}, got {stored_oid}"
|
|
|
+ )
|
|
|
|
|
|
return content
|
|
|
|
|
|
- def _get_lfs_url(self) -> Optional[str]:
|
|
|
- """Get LFS server URL from repository configuration.
|
|
|
|
|
|
- Returns:
|
|
|
- LFS server URL or None if not configured
|
|
|
+def _get_lfs_user_agent(config):
|
|
|
+ """Get User-Agent string for LFS requests, respecting git config."""
|
|
|
+ try:
|
|
|
+ if config:
|
|
|
+ # Use configured user agent verbatim if set
|
|
|
+ return config.get(b"http", b"useragent").decode()
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ # Default LFS user agent (similar to git-lfs format)
|
|
|
+ from . import __version__
|
|
|
+
|
|
|
+ version_str = ".".join([str(x) for x in __version__])
|
|
|
+ return f"git-lfs/dulwich/{version_str}"
|
|
|
+
|
|
|
+
|
|
|
+class LFSClient:
|
|
|
+ """LFS client for network operations."""
|
|
|
+
|
|
|
+ def __init__(self, url: str, config: Optional["Config"] = None) -> None:
|
|
|
+ """Initialize LFS client.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ url: LFS server URL
|
|
|
+ config: Optional git config for authentication/proxy settings
|
|
|
"""
|
|
|
- if self.repo is None:
|
|
|
- return None
|
|
|
+ self._base_url = url.rstrip("/") + "/" # Ensure trailing slash for urljoin
|
|
|
+ self.config = config
|
|
|
+ self._pool_manager = None
|
|
|
|
|
|
+ @classmethod
|
|
|
+ def from_config(cls, config: "Config") -> Optional["LFSClient"]:
|
|
|
+ """Create LFS client from git config."""
|
|
|
# Try to get LFS URL from config first
|
|
|
- config = self.repo.get_config_stack()
|
|
|
try:
|
|
|
- return config.get((b"lfs",), b"url").decode()
|
|
|
+ url = config.get((b"lfs",), b"url").decode()
|
|
|
except KeyError:
|
|
|
pass
|
|
|
+ else:
|
|
|
+ return cls(url, config)
|
|
|
|
|
|
# Fall back to deriving from remote URL (same as git-lfs)
|
|
|
try:
|
|
@@ -319,47 +351,14 @@ class LFSFilterDriver:
|
|
|
|
|
|
# Standard LFS endpoint is remote_url + "/info/lfs"
|
|
|
lfs_url = f"{remote_url}/info/lfs"
|
|
|
-
|
|
|
- # Validate URL by parsing it
|
|
|
- from urllib.parse import urlparse
|
|
|
+
|
|
|
parsed = urlparse(lfs_url)
|
|
|
if not parsed.scheme or not parsed.netloc:
|
|
|
return None
|
|
|
-
|
|
|
- return lfs_url
|
|
|
-
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
-def _get_lfs_user_agent(config):
|
|
|
- """Get User-Agent string for LFS requests, respecting git config."""
|
|
|
- try:
|
|
|
- if config:
|
|
|
- # Use configured user agent verbatim if set
|
|
|
- return config.get(b"http", b"useragent").decode()
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
-
|
|
|
- # Default LFS user agent (similar to git-lfs format)
|
|
|
- from . import __version__
|
|
|
-
|
|
|
- version_str = ".".join([str(x) for x in __version__])
|
|
|
- return f"git-lfs/dulwich/{version_str}"
|
|
|
-
|
|
|
|
|
|
-class LFSClient:
|
|
|
- """LFS client for network operations."""
|
|
|
-
|
|
|
- def __init__(self, url: str, config: Optional["Config"] = None) -> None:
|
|
|
- """Initialize LFS client.
|
|
|
+ return LFSClient(lfs_url, config)
|
|
|
|
|
|
- Args:
|
|
|
- url: LFS server URL
|
|
|
- config: Optional git config for authentication/proxy settings
|
|
|
- """
|
|
|
- self._base_url = url.rstrip("/") + "/" # Ensure trailing slash for urljoin
|
|
|
- self.config = config
|
|
|
- self._pool_manager = None
|
|
|
+ return None
|
|
|
|
|
|
@property
|
|
|
def url(self) -> str:
|
|
@@ -369,11 +368,9 @@ class LFSClient:
|
|
|
def _get_pool_manager(self):
|
|
|
"""Get urllib3 pool manager with git config applied."""
|
|
|
if self._pool_manager is None:
|
|
|
- # For now, use plain urllib3 since dulwich's version has issues with LFS
|
|
|
- # TODO: Investigate why default_urllib3_manager breaks LFS requests
|
|
|
- import urllib3
|
|
|
+ from dulwich.client import default_urllib3_manager
|
|
|
|
|
|
- self._pool_manager = urllib3.PoolManager()
|
|
|
+ self._pool_manager = default_urllib3_manager(self.config)
|
|
|
return self._pool_manager
|
|
|
|
|
|
def _make_request(
|
|
@@ -397,7 +394,9 @@ class LFSClient:
|
|
|
pool_manager = self._get_pool_manager()
|
|
|
response = pool_manager.request(method, url, headers=req_headers, body=data)
|
|
|
if response.status >= 400:
|
|
|
- raise ValueError(f"HTTP {response.status}: {response.data.decode('utf-8', errors='ignore')}")
|
|
|
+ raise ValueError(
|
|
|
+ f"HTTP {response.status}: {response.data.decode('utf-8', errors='ignore')}"
|
|
|
+ )
|
|
|
return response.data
|
|
|
|
|
|
def batch(
|