lfs.py 19 KB


  1. # lfs.py -- Implementation of the LFS
  2. # Copyright (C) 2020 Jelmer Vernooij
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Git Large File Storage (LFS) support.
  22. This module provides support for Git LFS, which is a Git extension for
  23. versioning large files. It replaces large files with text pointers inside Git,
  24. while storing the file contents on a remote server.
  25. Key components:
  26. - LFS pointer file parsing and creation
  27. - LFS object storage and retrieval
  28. - HTTP client for LFS server communication
  29. - Integration with dulwich repositories
  30. """
  31. import hashlib
  32. import json
  33. import logging
  34. import os
  35. import tempfile
  36. from collections.abc import Iterable
  37. from dataclasses import dataclass
  38. from typing import TYPE_CHECKING, BinaryIO, Optional, Union
  39. from urllib.parse import urljoin, urlparse
  40. from urllib.request import Request, urlopen
  41. logger = logging.getLogger(__name__)
  42. if TYPE_CHECKING:
  43. import urllib3
  44. from .config import Config
  45. from .repo import Repo
  46. @dataclass
  47. class LFSAction:
  48. """LFS action structure."""
  49. href: str
  50. header: Optional[dict[str, str]] = None
  51. expires_at: Optional[str] = None
  52. @dataclass
  53. class LFSErrorInfo:
  54. """LFS error structure."""
  55. code: int
  56. message: str
  57. @dataclass
  58. class LFSBatchObject:
  59. """LFS batch object structure."""
  60. oid: str
  61. size: int
  62. authenticated: Optional[bool] = None
  63. actions: Optional[dict[str, LFSAction]] = None
  64. error: Optional[LFSErrorInfo] = None
  65. @dataclass
  66. class LFSBatchResponse:
  67. """LFS batch response structure."""
  68. transfer: str
  69. objects: list[LFSBatchObject]
  70. hash_algo: Optional[str] = None
  71. class LFSStore:
  72. """Stores objects on disk, indexed by SHA256."""
  73. def __init__(self, path: str) -> None:
  74. """Initialize LFSStore."""
  75. self.path = path
  76. @classmethod
  77. def create(cls, lfs_dir: str) -> "LFSStore":
  78. """Create a new LFS store."""
  79. if not os.path.isdir(lfs_dir):
  80. os.mkdir(lfs_dir)
  81. tmp_dir = os.path.join(lfs_dir, "tmp")
  82. if not os.path.isdir(tmp_dir):
  83. os.mkdir(tmp_dir)
  84. objects_dir = os.path.join(lfs_dir, "objects")
  85. if not os.path.isdir(objects_dir):
  86. os.mkdir(objects_dir)
  87. return cls(lfs_dir)
  88. @classmethod
  89. def from_repo(cls, repo: "Repo", create: bool = False) -> "LFSStore":
  90. """Create LFS store from repository."""
  91. lfs_dir = os.path.join(repo.controldir(), "lfs")
  92. if create:
  93. return cls.create(lfs_dir)
  94. return cls(lfs_dir)
  95. @classmethod
  96. def from_controldir(cls, controldir: str, create: bool = False) -> "LFSStore":
  97. """Create LFS store from control directory."""
  98. lfs_dir = os.path.join(controldir, "lfs")
  99. if create:
  100. return cls.create(lfs_dir)
  101. return cls(lfs_dir)
  102. def _sha_path(self, sha: str) -> str:
  103. return os.path.join(self.path, "objects", sha[0:2], sha[2:4], sha)
  104. def open_object(self, sha: str) -> BinaryIO:
  105. """Open an object by sha."""
  106. try:
  107. return open(self._sha_path(sha), "rb")
  108. except FileNotFoundError as exc:
  109. raise KeyError(sha) from exc
  110. def write_object(self, chunks: Iterable[bytes]) -> str:
  111. """Write an object.
  112. Returns: object SHA
  113. """
  114. sha = hashlib.sha256()
  115. tmpdir = os.path.join(self.path, "tmp")
  116. with tempfile.NamedTemporaryFile(dir=tmpdir, mode="wb", delete=False) as f:
  117. for chunk in chunks:
  118. sha.update(chunk)
  119. f.write(chunk)
  120. f.flush()
  121. tmppath = f.name
  122. path = self._sha_path(sha.hexdigest())
  123. if not os.path.exists(os.path.dirname(path)):
  124. os.makedirs(os.path.dirname(path))
  125. # Handle concurrent writes - if file already exists, just remove temp file
  126. if os.path.exists(path):
  127. os.remove(tmppath)
  128. else:
  129. os.rename(tmppath, path)
  130. return sha.hexdigest()
  131. class LFSPointer:
  132. """Represents an LFS pointer file."""
  133. def __init__(self, oid: str, size: int) -> None:
  134. """Initialize LFSPointer."""
  135. self.oid = oid
  136. self.size = size
  137. @classmethod
  138. def from_bytes(cls, data: bytes) -> Optional["LFSPointer"]:
  139. """Parse LFS pointer from bytes.
  140. Returns None if data is not a valid LFS pointer.
  141. """
  142. try:
  143. text = data.decode("utf-8")
  144. except UnicodeDecodeError:
  145. return None
  146. # LFS pointer files have a specific format
  147. lines = text.strip().split("\n")
  148. if len(lines) < 3:
  149. return None
  150. # Must start with version
  151. if not lines[0].startswith("version https://git-lfs.github.com/spec/v1"):
  152. return None
  153. oid = None
  154. size = None
  155. for line in lines[1:]:
  156. if line.startswith("oid sha256:"):
  157. oid = line[11:].strip()
  158. elif line.startswith("size "):
  159. try:
  160. size = int(line[5:].strip())
  161. # Size must be non-negative
  162. if size < 0:
  163. return None
  164. except ValueError:
  165. return None
  166. if oid is None or size is None:
  167. return None
  168. return cls(oid, size)
  169. def to_bytes(self) -> bytes:
  170. """Convert LFS pointer to bytes."""
  171. return (
  172. f"version https://git-lfs.github.com/spec/v1\n"
  173. f"oid sha256:{self.oid}\n"
  174. f"size {self.size}\n"
  175. ).encode()
  176. def is_valid_oid(self) -> bool:
  177. """Check if the OID is valid SHA256."""
  178. if len(self.oid) != 64:
  179. return False
  180. try:
  181. int(self.oid, 16)
  182. return True
  183. except ValueError:
  184. return False
  185. class LFSFilterDriver:
  186. """LFS filter driver implementation."""
  187. def __init__(
  188. self, lfs_store: "LFSStore", config: Optional["Config"] = None
  189. ) -> None:
  190. """Initialize LFSFilterDriver."""
  191. self.lfs_store = lfs_store
  192. self.config = config
  193. def clean(self, data: bytes) -> bytes:
  194. """Convert file content to LFS pointer (clean filter)."""
  195. # Check if data is already an LFS pointer
  196. pointer = LFSPointer.from_bytes(data)
  197. if pointer is not None:
  198. return data
  199. # Store the file content in LFS
  200. sha = self.lfs_store.write_object([data])
  201. # Create and return LFS pointer
  202. pointer = LFSPointer(sha, len(data))
  203. return pointer.to_bytes()
  204. def smudge(self, data: bytes, path: bytes = b"") -> bytes:
  205. """Convert LFS pointer to file content (smudge filter)."""
  206. # Try to parse as LFS pointer
  207. pointer = LFSPointer.from_bytes(data)
  208. if pointer is None:
  209. # Not an LFS pointer, return as-is
  210. return data
  211. # Validate the pointer
  212. if not pointer.is_valid_oid():
  213. return data
  214. try:
  215. # Read the actual content from LFS store
  216. with self.lfs_store.open_object(pointer.oid) as f:
  217. return f.read()
  218. except KeyError:
  219. # Object not found in LFS store, try to download it
  220. try:
  221. content = self._download_object(pointer)
  222. return content
  223. except LFSError as e:
  224. # Download failed, fall back to returning pointer
  225. logger.warning("LFS object download failed for %s: %s", pointer.oid, e)
  226. # Return pointer as-is when object is missing and download failed
  227. return data
  228. def _download_object(self, pointer: LFSPointer) -> bytes:
  229. """Download an LFS object from the server.
  230. Args:
  231. pointer: LFS pointer containing OID and size
  232. Returns:
  233. Downloaded content
  234. Raises:
  235. LFSError: If download fails for any reason
  236. """
  237. if self.config is None:
  238. raise LFSError("No configuration available for LFS download")
  239. # Create LFS client and download
  240. client = LFSClient.from_config(self.config)
  241. if client is None:
  242. raise LFSError("No LFS client available from configuration")
  243. content = client.download(pointer.oid, pointer.size)
  244. # Store the downloaded content in local LFS store
  245. stored_oid = self.lfs_store.write_object([content])
  246. # Verify the stored OID matches what we expected
  247. if stored_oid != pointer.oid:
  248. raise LFSError(
  249. f"Downloaded OID mismatch: expected {pointer.oid}, got {stored_oid}"
  250. )
  251. return content
  252. def _get_lfs_user_agent(config: Optional["Config"]) -> str:
  253. """Get User-Agent string for LFS requests, respecting git config."""
  254. try:
  255. if config:
  256. # Use configured user agent verbatim if set
  257. return config.get(b"http", b"useragent").decode()
  258. except KeyError:
  259. pass
  260. # Default LFS user agent (similar to git-lfs format)
  261. from . import __version__
  262. version_str = ".".join([str(x) for x in __version__])
  263. return f"git-lfs/dulwich/{version_str}"
  264. class LFSClient:
  265. """LFS client for network operations."""
  266. def __init__(self, url: str, config: Optional["Config"] = None) -> None:
  267. """Initialize LFS client.
  268. Args:
  269. url: LFS server URL
  270. config: Optional git config for authentication/proxy settings
  271. """
  272. self._base_url = url.rstrip("/") + "/" # Ensure trailing slash for urljoin
  273. self.config = config
  274. self._pool_manager: Optional[urllib3.PoolManager] = None
  275. @classmethod
  276. def from_config(cls, config: "Config") -> Optional["LFSClient"]:
  277. """Create LFS client from git config."""
  278. # Try to get LFS URL from config first
  279. try:
  280. url = config.get((b"lfs",), b"url").decode()
  281. except KeyError:
  282. pass
  283. else:
  284. return cls(url, config)
  285. # Fall back to deriving from remote URL (same as git-lfs)
  286. try:
  287. remote_url = config.get((b"remote", b"origin"), b"url").decode()
  288. except KeyError:
  289. pass
  290. else:
  291. # Convert SSH URLs to HTTPS if needed
  292. if remote_url.startswith("git@"):
  293. # Convert git@host:user/repo.git to https://host/user/repo.git
  294. if ":" in remote_url and "/" in remote_url:
  295. host_and_path = remote_url[4:] # Remove "git@"
  296. if ":" in host_and_path:
  297. host, path = host_and_path.split(":", 1)
  298. remote_url = f"https://{host}/{path}"
  299. # Ensure URL ends with .git for consistent LFS endpoint
  300. if not remote_url.endswith(".git"):
  301. remote_url = f"{remote_url}.git"
  302. # Standard LFS endpoint is remote_url + "/info/lfs"
  303. lfs_url = f"{remote_url}/info/lfs"
  304. parsed = urlparse(lfs_url)
  305. if not parsed.scheme or not parsed.netloc:
  306. return None
  307. return LFSClient(lfs_url, config)
  308. return None
  309. @property
  310. def url(self) -> str:
  311. """Get the LFS server URL without trailing slash."""
  312. return self._base_url.rstrip("/")
  313. def _get_pool_manager(self) -> "urllib3.PoolManager":
  314. """Get urllib3 pool manager with git config applied."""
  315. if self._pool_manager is None:
  316. from dulwich.client import default_urllib3_manager
  317. self._pool_manager = default_urllib3_manager(self.config) # type: ignore[assignment]
  318. return self._pool_manager
  319. def _make_request(
  320. self,
  321. method: str,
  322. path: str,
  323. data: Optional[bytes] = None,
  324. headers: Optional[dict[str, str]] = None,
  325. ) -> bytes:
  326. """Make an HTTP request to the LFS server."""
  327. url = urljoin(self._base_url, path)
  328. req_headers = {
  329. "Accept": "application/vnd.git-lfs+json",
  330. "Content-Type": "application/vnd.git-lfs+json",
  331. "User-Agent": _get_lfs_user_agent(self.config),
  332. }
  333. if headers:
  334. req_headers.update(headers)
  335. # Use urllib3 pool manager with git config applied
  336. pool_manager = self._get_pool_manager()
  337. response = pool_manager.request(method, url, headers=req_headers, body=data)
  338. if response.status >= 400:
  339. raise ValueError(
  340. f"HTTP {response.status}: {response.data.decode('utf-8', errors='ignore')}"
  341. )
  342. return response.data # type: ignore[return-value]
  343. def batch(
  344. self,
  345. operation: str,
  346. objects: list[dict[str, Union[str, int]]],
  347. ref: Optional[str] = None,
  348. ) -> LFSBatchResponse:
  349. """Perform batch operation to get transfer URLs.
  350. Args:
  351. operation: "download" or "upload"
  352. objects: List of {"oid": str, "size": int} dicts
  353. ref: Optional ref name
  354. Returns:
  355. Batch response from server
  356. """
  357. data: dict[
  358. str, Union[str, list[str], list[dict[str, Union[str, int]]], dict[str, str]]
  359. ] = {
  360. "operation": operation,
  361. "transfers": ["basic"],
  362. "objects": objects,
  363. }
  364. if ref:
  365. data["ref"] = {"name": ref}
  366. response = self._make_request(
  367. "POST", "objects/batch", json.dumps(data).encode("utf-8")
  368. )
  369. if not response:
  370. raise ValueError("Empty response from LFS server")
  371. response_data = json.loads(response)
  372. return self._parse_batch_response(response_data)
  373. def _parse_batch_response(self, data: dict) -> LFSBatchResponse:
  374. """Parse JSON response into LFSBatchResponse dataclass."""
  375. objects = []
  376. for obj_data in data.get("objects", []):
  377. actions = None
  378. if "actions" in obj_data:
  379. actions = {}
  380. for action_name, action_data in obj_data["actions"].items():
  381. actions[action_name] = LFSAction(
  382. href=action_data["href"],
  383. header=action_data.get("header"),
  384. expires_at=action_data.get("expires_at"),
  385. )
  386. error = None
  387. if "error" in obj_data:
  388. error = LFSErrorInfo(
  389. code=obj_data["error"]["code"], message=obj_data["error"]["message"]
  390. )
  391. batch_obj = LFSBatchObject(
  392. oid=obj_data["oid"],
  393. size=obj_data["size"],
  394. authenticated=obj_data.get("authenticated"),
  395. actions=actions,
  396. error=error,
  397. )
  398. objects.append(batch_obj)
  399. return LFSBatchResponse(
  400. transfer=data.get("transfer", "basic"),
  401. objects=objects,
  402. hash_algo=data.get("hash_algo"),
  403. )
  404. def download(self, oid: str, size: int, ref: Optional[str] = None) -> bytes:
  405. """Download an LFS object.
  406. Args:
  407. oid: Object ID (SHA256)
  408. size: Expected size
  409. ref: Optional ref name
  410. Returns:
  411. Object content
  412. """
  413. # Get download URL via batch API
  414. batch_resp = self.batch("download", [{"oid": oid, "size": size}], ref)
  415. if not batch_resp.objects:
  416. raise LFSError(f"No objects returned for {oid}")
  417. obj = batch_resp.objects[0]
  418. if obj.error:
  419. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  420. if not obj.actions or "download" not in obj.actions:
  421. raise LFSError(f"No download actions for {oid}")
  422. download_action = obj.actions["download"]
  423. download_url = download_action.href
  424. # Download the object using urllib3 with git config
  425. download_headers = {"User-Agent": _get_lfs_user_agent(self.config)}
  426. if download_action.header:
  427. download_headers.update(download_action.header)
  428. pool_manager = self._get_pool_manager()
  429. response = pool_manager.request("GET", download_url, headers=download_headers)
  430. content = response.data
  431. # Verify size
  432. if len(content) != size:
  433. raise LFSError(f"Downloaded size {len(content)} != expected {size}")
  434. # Verify SHA256
  435. actual_oid = hashlib.sha256(content).hexdigest()
  436. if actual_oid != oid:
  437. raise LFSError(f"Downloaded OID {actual_oid} != expected {oid}")
  438. return content # type: ignore[return-value]
  439. def upload(
  440. self, oid: str, size: int, content: bytes, ref: Optional[str] = None
  441. ) -> None:
  442. """Upload an LFS object.
  443. Args:
  444. oid: Object ID (SHA256)
  445. size: Object size
  446. content: Object content
  447. ref: Optional ref name
  448. """
  449. # Get upload URL via batch API
  450. batch_resp = self.batch("upload", [{"oid": oid, "size": size}], ref)
  451. if not batch_resp.objects:
  452. raise LFSError(f"No objects returned for {oid}")
  453. obj = batch_resp.objects[0]
  454. if obj.error:
  455. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  456. # If no actions, object already exists
  457. if not obj.actions:
  458. return
  459. if "upload" not in obj.actions:
  460. raise LFSError(f"No upload action for {oid}")
  461. upload_action = obj.actions["upload"]
  462. upload_url = upload_action.href
  463. # Upload the object
  464. req = Request(upload_url, data=content, method="PUT")
  465. if upload_action.header:
  466. for name, value in upload_action.header.items():
  467. req.add_header(name, value)
  468. with urlopen(req) as response:
  469. if response.status >= 400:
  470. raise LFSError(f"Upload failed with status {response.status}")
  471. # Verify if needed
  472. if obj.actions and "verify" in obj.actions:
  473. verify_action = obj.actions["verify"]
  474. verify_data = json.dumps({"oid": oid, "size": size}).encode("utf-8")
  475. req = Request(verify_action.href, data=verify_data, method="POST")
  476. req.add_header("Content-Type", "application/vnd.git-lfs+json")
  477. if verify_action.header:
  478. for name, value in verify_action.header.items():
  479. req.add_header(name, value)
  480. with urlopen(req) as response:
  481. if response.status >= 400:
  482. raise LFSError(f"Verification failed with status {response.status}")
  483. class LFSError(Exception):
  484. """LFS-specific error."""