lfs.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. # lfs.py -- Implementation of the LFS
  2. # Copyright (C) 2020 Jelmer Vernooij
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Git Large File Storage (LFS) support.
  22. This module provides support for Git LFS, which is a Git extension for
  23. versioning large files. It replaces large files with text pointers inside Git,
  24. while storing the file contents on a remote server.
  25. Key components:
  26. - LFS pointer file parsing and creation
  27. - LFS object storage and retrieval
  28. - HTTP client for LFS server communication
  29. - Integration with dulwich repositories
  30. """
  31. import hashlib
  32. import json
  33. import logging
  34. import os
  35. import tempfile
  36. from collections.abc import Iterable
  37. from dataclasses import dataclass
  38. from typing import TYPE_CHECKING, BinaryIO, Optional, Union
  39. from urllib.parse import urljoin, urlparse
  40. from urllib.request import Request, urlopen
  41. logger = logging.getLogger(__name__)
  42. if TYPE_CHECKING:
  43. import urllib3
  44. from .config import Config
  45. from .repo import Repo
  46. @dataclass
  47. class LFSAction:
  48. """LFS action structure."""
  49. href: str
  50. header: Optional[dict[str, str]] = None
  51. expires_at: Optional[str] = None
  52. @dataclass
  53. class LFSErrorInfo:
  54. """LFS error structure."""
  55. code: int
  56. message: str
  57. @dataclass
  58. class LFSBatchObject:
  59. """LFS batch object structure."""
  60. oid: str
  61. size: int
  62. authenticated: Optional[bool] = None
  63. actions: Optional[dict[str, LFSAction]] = None
  64. error: Optional[LFSErrorInfo] = None
  65. @dataclass
  66. class LFSBatchResponse:
  67. """LFS batch response structure."""
  68. transfer: str
  69. objects: list[LFSBatchObject]
  70. hash_algo: Optional[str] = None
  71. class LFSStore:
  72. """Stores objects on disk, indexed by SHA256."""
  73. def __init__(self, path: str) -> None:
  74. """Initialize LFSStore."""
  75. self.path = path
  76. @classmethod
  77. def create(cls, lfs_dir: str) -> "LFSStore":
  78. """Create a new LFS store."""
  79. if not os.path.isdir(lfs_dir):
  80. os.mkdir(lfs_dir)
  81. tmp_dir = os.path.join(lfs_dir, "tmp")
  82. if not os.path.isdir(tmp_dir):
  83. os.mkdir(tmp_dir)
  84. objects_dir = os.path.join(lfs_dir, "objects")
  85. if not os.path.isdir(objects_dir):
  86. os.mkdir(objects_dir)
  87. return cls(lfs_dir)
  88. @classmethod
  89. def from_repo(cls, repo: "Repo", create: bool = False) -> "LFSStore":
  90. """Create LFS store from repository."""
  91. lfs_dir = os.path.join(repo.controldir(), "lfs")
  92. if create:
  93. return cls.create(lfs_dir)
  94. return cls(lfs_dir)
  95. @classmethod
  96. def from_controldir(cls, controldir: str, create: bool = False) -> "LFSStore":
  97. """Create LFS store from control directory."""
  98. lfs_dir = os.path.join(controldir, "lfs")
  99. if create:
  100. return cls.create(lfs_dir)
  101. return cls(lfs_dir)
  102. def _sha_path(self, sha: str) -> str:
  103. return os.path.join(self.path, "objects", sha[0:2], sha[2:4], sha)
  104. def open_object(self, sha: str) -> BinaryIO:
  105. """Open an object by sha."""
  106. try:
  107. return open(self._sha_path(sha), "rb")
  108. except FileNotFoundError as exc:
  109. raise KeyError(sha) from exc
  110. def write_object(self, chunks: Iterable[bytes]) -> str:
  111. """Write an object.
  112. Returns: object SHA
  113. """
  114. # First pass: compute SHA256 and collect data
  115. sha = hashlib.sha256()
  116. data_chunks = []
  117. for chunk in chunks:
  118. sha.update(chunk)
  119. data_chunks.append(chunk)
  120. sha_hex = sha.hexdigest()
  121. path = self._sha_path(sha_hex)
  122. # If object already exists, no need to write
  123. if os.path.exists(path):
  124. return sha_hex
  125. # Object doesn't exist, write it
  126. if not os.path.exists(os.path.dirname(path)):
  127. os.makedirs(os.path.dirname(path))
  128. tmpdir = os.path.join(self.path, "tmp")
  129. with tempfile.NamedTemporaryFile(dir=tmpdir, mode="wb", delete=False) as f:
  130. for chunk in data_chunks:
  131. f.write(chunk)
  132. f.flush()
  133. tmppath = f.name
  134. # Handle concurrent writes - if file already exists, just remove temp file
  135. if os.path.exists(path):
  136. os.remove(tmppath)
  137. else:
  138. os.rename(tmppath, path)
  139. return sha_hex
  140. class LFSPointer:
  141. """Represents an LFS pointer file."""
  142. def __init__(self, oid: str, size: int) -> None:
  143. """Initialize LFSPointer."""
  144. self.oid = oid
  145. self.size = size
  146. @classmethod
  147. def from_bytes(cls, data: bytes) -> Optional["LFSPointer"]:
  148. """Parse LFS pointer from bytes.
  149. Returns None if data is not a valid LFS pointer.
  150. """
  151. try:
  152. text = data.decode("utf-8")
  153. except UnicodeDecodeError:
  154. return None
  155. # LFS pointer files have a specific format
  156. lines = text.strip().split("\n")
  157. if len(lines) < 3:
  158. return None
  159. # Must start with version
  160. if not lines[0].startswith("version https://git-lfs.github.com/spec/v1"):
  161. return None
  162. oid = None
  163. size = None
  164. for line in lines[1:]:
  165. if line.startswith("oid sha256:"):
  166. oid = line[11:].strip()
  167. elif line.startswith("size "):
  168. try:
  169. size = int(line[5:].strip())
  170. # Size must be non-negative
  171. if size < 0:
  172. return None
  173. except ValueError:
  174. return None
  175. if oid is None or size is None:
  176. return None
  177. return cls(oid, size)
  178. def to_bytes(self) -> bytes:
  179. """Convert LFS pointer to bytes."""
  180. return (
  181. f"version https://git-lfs.github.com/spec/v1\n"
  182. f"oid sha256:{self.oid}\n"
  183. f"size {self.size}\n"
  184. ).encode()
  185. def is_valid_oid(self) -> bool:
  186. """Check if the OID is valid SHA256."""
  187. if len(self.oid) != 64:
  188. return False
  189. try:
  190. int(self.oid, 16)
  191. return True
  192. except ValueError:
  193. return False
  194. class LFSFilterDriver:
  195. """LFS filter driver implementation."""
  196. def __init__(
  197. self, lfs_store: "LFSStore", config: Optional["Config"] = None
  198. ) -> None:
  199. """Initialize LFSFilterDriver."""
  200. self.lfs_store = lfs_store
  201. self.config = config
  202. def clean(self, data: bytes) -> bytes:
  203. """Convert file content to LFS pointer (clean filter)."""
  204. # Check if data is already an LFS pointer
  205. pointer = LFSPointer.from_bytes(data)
  206. if pointer is not None:
  207. return data
  208. # Store the file content in LFS
  209. sha = self.lfs_store.write_object([data])
  210. # Create and return LFS pointer
  211. pointer = LFSPointer(sha, len(data))
  212. return pointer.to_bytes()
  213. def smudge(self, data: bytes, path: bytes = b"") -> bytes:
  214. """Convert LFS pointer to file content (smudge filter)."""
  215. # Try to parse as LFS pointer
  216. pointer = LFSPointer.from_bytes(data)
  217. if pointer is None:
  218. # Not an LFS pointer, return as-is
  219. return data
  220. # Validate the pointer
  221. if not pointer.is_valid_oid():
  222. return data
  223. try:
  224. # Read the actual content from LFS store
  225. with self.lfs_store.open_object(pointer.oid) as f:
  226. return f.read()
  227. except KeyError:
  228. # Object not found in LFS store, try to download it
  229. try:
  230. content = self._download_object(pointer)
  231. return content
  232. except LFSError as e:
  233. # Download failed, fall back to returning pointer
  234. logger.warning("LFS object download failed for %s: %s", pointer.oid, e)
  235. # Return pointer as-is when object is missing and download failed
  236. return data
  237. def _download_object(self, pointer: LFSPointer) -> bytes:
  238. """Download an LFS object from the server.
  239. Args:
  240. pointer: LFS pointer containing OID and size
  241. Returns:
  242. Downloaded content
  243. Raises:
  244. LFSError: If download fails for any reason
  245. """
  246. if self.config is None:
  247. raise LFSError("No configuration available for LFS download")
  248. # Create LFS client and download
  249. client = LFSClient.from_config(self.config)
  250. if client is None:
  251. raise LFSError("No LFS client available from configuration")
  252. content = client.download(pointer.oid, pointer.size)
  253. # Store the downloaded content in local LFS store
  254. stored_oid = self.lfs_store.write_object([content])
  255. # Verify the stored OID matches what we expected
  256. if stored_oid != pointer.oid:
  257. raise LFSError(
  258. f"Downloaded OID mismatch: expected {pointer.oid}, got {stored_oid}"
  259. )
  260. return content
  261. def cleanup(self) -> None:
  262. """Clean up any resources held by this filter driver."""
  263. # LFSFilterDriver doesn't hold any resources that need cleanup
  264. def reuse(self, config, filter_name: str) -> bool:
  265. """Check if this filter driver should be reused with the given configuration."""
  266. # LFSFilterDriver is stateless and lightweight, no need to cache
  267. return False
  268. def _get_lfs_user_agent(config: Optional["Config"]) -> str:
  269. """Get User-Agent string for LFS requests, respecting git config."""
  270. try:
  271. if config:
  272. # Use configured user agent verbatim if set
  273. return config.get(b"http", b"useragent").decode()
  274. except KeyError:
  275. pass
  276. # Default LFS user agent (similar to git-lfs format)
  277. from . import __version__
  278. version_str = ".".join([str(x) for x in __version__])
  279. return f"git-lfs/dulwich/{version_str}"
  280. class LFSClient:
  281. """LFS client for network operations."""
  282. def __init__(self, url: str, config: Optional["Config"] = None) -> None:
  283. """Initialize LFS client.
  284. Args:
  285. url: LFS server URL
  286. config: Optional git config for authentication/proxy settings
  287. """
  288. self._base_url = url.rstrip("/") + "/" # Ensure trailing slash for urljoin
  289. self.config = config
  290. self._pool_manager: Optional[urllib3.PoolManager] = None
  291. @classmethod
  292. def from_config(cls, config: "Config") -> Optional["LFSClient"]:
  293. """Create LFS client from git config."""
  294. # Try to get LFS URL from config first
  295. try:
  296. url = config.get((b"lfs",), b"url").decode()
  297. except KeyError:
  298. pass
  299. else:
  300. return cls(url, config)
  301. # Fall back to deriving from remote URL (same as git-lfs)
  302. try:
  303. remote_url = config.get((b"remote", b"origin"), b"url").decode()
  304. except KeyError:
  305. pass
  306. else:
  307. # Convert SSH URLs to HTTPS if needed
  308. if remote_url.startswith("git@"):
  309. # Convert git@host:user/repo.git to https://host/user/repo.git
  310. if ":" in remote_url and "/" in remote_url:
  311. host_and_path = remote_url[4:] # Remove "git@"
  312. if ":" in host_and_path:
  313. host, path = host_and_path.split(":", 1)
  314. remote_url = f"https://{host}/{path}"
  315. # Ensure URL ends with .git for consistent LFS endpoint
  316. if not remote_url.endswith(".git"):
  317. remote_url = f"{remote_url}.git"
  318. # Standard LFS endpoint is remote_url + "/info/lfs"
  319. lfs_url = f"{remote_url}/info/lfs"
  320. parsed = urlparse(lfs_url)
  321. if not parsed.scheme or not parsed.netloc:
  322. return None
  323. return LFSClient(lfs_url, config)
  324. return None
  325. @property
  326. def url(self) -> str:
  327. """Get the LFS server URL without trailing slash."""
  328. return self._base_url.rstrip("/")
  329. def _get_pool_manager(self) -> "urllib3.PoolManager":
  330. """Get urllib3 pool manager with git config applied."""
  331. if self._pool_manager is None:
  332. from dulwich.client import default_urllib3_manager
  333. self._pool_manager = default_urllib3_manager(self.config) # type: ignore[assignment]
  334. return self._pool_manager
  335. def _make_request(
  336. self,
  337. method: str,
  338. path: str,
  339. data: Optional[bytes] = None,
  340. headers: Optional[dict[str, str]] = None,
  341. ) -> bytes:
  342. """Make an HTTP request to the LFS server."""
  343. url = urljoin(self._base_url, path)
  344. req_headers = {
  345. "Accept": "application/vnd.git-lfs+json",
  346. "Content-Type": "application/vnd.git-lfs+json",
  347. "User-Agent": _get_lfs_user_agent(self.config),
  348. }
  349. if headers:
  350. req_headers.update(headers)
  351. # Use urllib3 pool manager with git config applied
  352. pool_manager = self._get_pool_manager()
  353. response = pool_manager.request(method, url, headers=req_headers, body=data)
  354. if response.status >= 400:
  355. raise ValueError(
  356. f"HTTP {response.status}: {response.data.decode('utf-8', errors='ignore')}"
  357. )
  358. return response.data # type: ignore[return-value]
  359. def batch(
  360. self,
  361. operation: str,
  362. objects: list[dict[str, Union[str, int]]],
  363. ref: Optional[str] = None,
  364. ) -> LFSBatchResponse:
  365. """Perform batch operation to get transfer URLs.
  366. Args:
  367. operation: "download" or "upload"
  368. objects: List of {"oid": str, "size": int} dicts
  369. ref: Optional ref name
  370. Returns:
  371. Batch response from server
  372. """
  373. data: dict[
  374. str, Union[str, list[str], list[dict[str, Union[str, int]]], dict[str, str]]
  375. ] = {
  376. "operation": operation,
  377. "transfers": ["basic"],
  378. "objects": objects,
  379. }
  380. if ref:
  381. data["ref"] = {"name": ref}
  382. response = self._make_request(
  383. "POST", "objects/batch", json.dumps(data).encode("utf-8")
  384. )
  385. if not response:
  386. raise ValueError("Empty response from LFS server")
  387. response_data = json.loads(response)
  388. return self._parse_batch_response(response_data)
  389. def _parse_batch_response(self, data: dict) -> LFSBatchResponse:
  390. """Parse JSON response into LFSBatchResponse dataclass."""
  391. objects = []
  392. for obj_data in data.get("objects", []):
  393. actions = None
  394. if "actions" in obj_data:
  395. actions = {}
  396. for action_name, action_data in obj_data["actions"].items():
  397. actions[action_name] = LFSAction(
  398. href=action_data["href"],
  399. header=action_data.get("header"),
  400. expires_at=action_data.get("expires_at"),
  401. )
  402. error = None
  403. if "error" in obj_data:
  404. error = LFSErrorInfo(
  405. code=obj_data["error"]["code"], message=obj_data["error"]["message"]
  406. )
  407. batch_obj = LFSBatchObject(
  408. oid=obj_data["oid"],
  409. size=obj_data["size"],
  410. authenticated=obj_data.get("authenticated"),
  411. actions=actions,
  412. error=error,
  413. )
  414. objects.append(batch_obj)
  415. return LFSBatchResponse(
  416. transfer=data.get("transfer", "basic"),
  417. objects=objects,
  418. hash_algo=data.get("hash_algo"),
  419. )
  420. def download(self, oid: str, size: int, ref: Optional[str] = None) -> bytes:
  421. """Download an LFS object.
  422. Args:
  423. oid: Object ID (SHA256)
  424. size: Expected size
  425. ref: Optional ref name
  426. Returns:
  427. Object content
  428. """
  429. # Get download URL via batch API
  430. batch_resp = self.batch("download", [{"oid": oid, "size": size}], ref)
  431. if not batch_resp.objects:
  432. raise LFSError(f"No objects returned for {oid}")
  433. obj = batch_resp.objects[0]
  434. if obj.error:
  435. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  436. if not obj.actions or "download" not in obj.actions:
  437. raise LFSError(f"No download actions for {oid}")
  438. download_action = obj.actions["download"]
  439. download_url = download_action.href
  440. # Download the object using urllib3 with git config
  441. download_headers = {"User-Agent": _get_lfs_user_agent(self.config)}
  442. if download_action.header:
  443. download_headers.update(download_action.header)
  444. pool_manager = self._get_pool_manager()
  445. response = pool_manager.request("GET", download_url, headers=download_headers)
  446. content = response.data
  447. # Verify size
  448. if len(content) != size:
  449. raise LFSError(f"Downloaded size {len(content)} != expected {size}")
  450. # Verify SHA256
  451. actual_oid = hashlib.sha256(content).hexdigest()
  452. if actual_oid != oid:
  453. raise LFSError(f"Downloaded OID {actual_oid} != expected {oid}")
  454. return content # type: ignore[return-value]
  455. def upload(
  456. self, oid: str, size: int, content: bytes, ref: Optional[str] = None
  457. ) -> None:
  458. """Upload an LFS object.
  459. Args:
  460. oid: Object ID (SHA256)
  461. size: Object size
  462. content: Object content
  463. ref: Optional ref name
  464. """
  465. # Get upload URL via batch API
  466. batch_resp = self.batch("upload", [{"oid": oid, "size": size}], ref)
  467. if not batch_resp.objects:
  468. raise LFSError(f"No objects returned for {oid}")
  469. obj = batch_resp.objects[0]
  470. if obj.error:
  471. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  472. # If no actions, object already exists
  473. if not obj.actions:
  474. return
  475. if "upload" not in obj.actions:
  476. raise LFSError(f"No upload action for {oid}")
  477. upload_action = obj.actions["upload"]
  478. upload_url = upload_action.href
  479. # Upload the object
  480. req = Request(upload_url, data=content, method="PUT")
  481. if upload_action.header:
  482. for name, value in upload_action.header.items():
  483. req.add_header(name, value)
  484. with urlopen(req) as response:
  485. if response.status >= 400:
  486. raise LFSError(f"Upload failed with status {response.status}")
  487. # Verify if needed
  488. if obj.actions and "verify" in obj.actions:
  489. verify_action = obj.actions["verify"]
  490. verify_data = json.dumps({"oid": oid, "size": size}).encode("utf-8")
  491. req = Request(verify_action.href, data=verify_data, method="POST")
  492. req.add_header("Content-Type", "application/vnd.git-lfs+json")
  493. if verify_action.header:
  494. for name, value in verify_action.header.items():
  495. req.add_header(name, value)
  496. with urlopen(req) as response:
  497. if response.status >= 400:
  498. raise LFSError(f"Verification failed with status {response.status}")
  499. class LFSError(Exception):
  500. """LFS-specific error."""