2
0

lfs.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. # lfs.py -- Implementation of the LFS
  2. # Copyright (C) 2020 Jelmer Vernooij
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import hashlib
  22. import json
  23. import logging
  24. import os
  25. import tempfile
  26. from collections.abc import Iterable
  27. from dataclasses import dataclass
  28. from typing import TYPE_CHECKING, BinaryIO, Optional, Union
  29. from urllib.parse import urljoin, urlparse
  30. from urllib.request import Request, urlopen
  31. if TYPE_CHECKING:
  32. from .config import Config
  33. from .repo import Repo
  34. @dataclass
  35. class LFSAction:
  36. """LFS action structure."""
  37. href: str
  38. header: Optional[dict[str, str]] = None
  39. expires_at: Optional[str] = None
  40. @dataclass
  41. class LFSErrorInfo:
  42. """LFS error structure."""
  43. code: int
  44. message: str
  45. @dataclass
  46. class LFSBatchObject:
  47. """LFS batch object structure."""
  48. oid: str
  49. size: int
  50. authenticated: Optional[bool] = None
  51. actions: Optional[dict[str, LFSAction]] = None
  52. error: Optional[LFSErrorInfo] = None
  53. @dataclass
  54. class LFSBatchResponse:
  55. """LFS batch response structure."""
  56. transfer: str
  57. objects: list[LFSBatchObject]
  58. hash_algo: Optional[str] = None
  59. class LFSStore:
  60. """Stores objects on disk, indexed by SHA256."""
  61. def __init__(self, path: str) -> None:
  62. self.path = path
  63. @classmethod
  64. def create(cls, lfs_dir: str) -> "LFSStore":
  65. if not os.path.isdir(lfs_dir):
  66. os.mkdir(lfs_dir)
  67. tmp_dir = os.path.join(lfs_dir, "tmp")
  68. if not os.path.isdir(tmp_dir):
  69. os.mkdir(tmp_dir)
  70. objects_dir = os.path.join(lfs_dir, "objects")
  71. if not os.path.isdir(objects_dir):
  72. os.mkdir(objects_dir)
  73. return cls(lfs_dir)
  74. @classmethod
  75. def from_repo(cls, repo: "Repo", create: bool = False) -> "LFSStore":
  76. lfs_dir = os.path.join(repo.controldir(), "lfs")
  77. if create:
  78. return cls.create(lfs_dir)
  79. return cls(lfs_dir)
  80. @classmethod
  81. def from_controldir(cls, controldir: str, create: bool = False) -> "LFSStore":
  82. lfs_dir = os.path.join(controldir, "lfs")
  83. if create:
  84. return cls.create(lfs_dir)
  85. return cls(lfs_dir)
  86. def _sha_path(self, sha: str) -> str:
  87. return os.path.join(self.path, "objects", sha[0:2], sha[2:4], sha)
  88. def open_object(self, sha: str) -> BinaryIO:
  89. """Open an object by sha."""
  90. try:
  91. return open(self._sha_path(sha), "rb")
  92. except FileNotFoundError as exc:
  93. raise KeyError(sha) from exc
  94. def write_object(self, chunks: Iterable[bytes]) -> str:
  95. """Write an object.
  96. Returns: object SHA
  97. """
  98. sha = hashlib.sha256()
  99. tmpdir = os.path.join(self.path, "tmp")
  100. with tempfile.NamedTemporaryFile(dir=tmpdir, mode="wb", delete=False) as f:
  101. for chunk in chunks:
  102. sha.update(chunk)
  103. f.write(chunk)
  104. f.flush()
  105. tmppath = f.name
  106. path = self._sha_path(sha.hexdigest())
  107. if not os.path.exists(os.path.dirname(path)):
  108. os.makedirs(os.path.dirname(path))
  109. # Handle concurrent writes - if file already exists, just remove temp file
  110. if os.path.exists(path):
  111. os.remove(tmppath)
  112. else:
  113. os.rename(tmppath, path)
  114. return sha.hexdigest()
  115. class LFSPointer:
  116. """Represents an LFS pointer file."""
  117. def __init__(self, oid: str, size: int) -> None:
  118. self.oid = oid
  119. self.size = size
  120. @classmethod
  121. def from_bytes(cls, data: bytes) -> Optional["LFSPointer"]:
  122. """Parse LFS pointer from bytes.
  123. Returns None if data is not a valid LFS pointer.
  124. """
  125. try:
  126. text = data.decode("utf-8")
  127. except UnicodeDecodeError:
  128. return None
  129. # LFS pointer files have a specific format
  130. lines = text.strip().split("\n")
  131. if len(lines) < 3:
  132. return None
  133. # Must start with version
  134. if not lines[0].startswith("version https://git-lfs.github.com/spec/v1"):
  135. return None
  136. oid = None
  137. size = None
  138. for line in lines[1:]:
  139. if line.startswith("oid sha256:"):
  140. oid = line[11:].strip()
  141. elif line.startswith("size "):
  142. try:
  143. size = int(line[5:].strip())
  144. # Size must be non-negative
  145. if size < 0:
  146. return None
  147. except ValueError:
  148. return None
  149. if oid is None or size is None:
  150. return None
  151. return cls(oid, size)
  152. def to_bytes(self) -> bytes:
  153. """Convert LFS pointer to bytes."""
  154. return (
  155. f"version https://git-lfs.github.com/spec/v1\n"
  156. f"oid sha256:{self.oid}\n"
  157. f"size {self.size}\n"
  158. ).encode()
  159. def is_valid_oid(self) -> bool:
  160. """Check if the OID is valid SHA256."""
  161. if len(self.oid) != 64:
  162. return False
  163. try:
  164. int(self.oid, 16)
  165. return True
  166. except ValueError:
  167. return False
  168. class LFSFilterDriver:
  169. """LFS filter driver implementation."""
  170. def __init__(
  171. self, lfs_store: "LFSStore", config: Optional["Config"] = None
  172. ) -> None:
  173. self.lfs_store = lfs_store
  174. self.config = config
  175. def clean(self, data: bytes) -> bytes:
  176. """Convert file content to LFS pointer (clean filter)."""
  177. # Check if data is already an LFS pointer
  178. pointer = LFSPointer.from_bytes(data)
  179. if pointer is not None:
  180. return data
  181. # Store the file content in LFS
  182. sha = self.lfs_store.write_object([data])
  183. # Create and return LFS pointer
  184. pointer = LFSPointer(sha, len(data))
  185. return pointer.to_bytes()
  186. def smudge(self, data: bytes, path: bytes = b"") -> bytes:
  187. """Convert LFS pointer to file content (smudge filter)."""
  188. # Try to parse as LFS pointer
  189. pointer = LFSPointer.from_bytes(data)
  190. if pointer is None:
  191. # Not an LFS pointer, return as-is
  192. return data
  193. # Validate the pointer
  194. if not pointer.is_valid_oid():
  195. return data
  196. try:
  197. # Read the actual content from LFS store
  198. with self.lfs_store.open_object(pointer.oid) as f:
  199. return f.read()
  200. except KeyError:
  201. # Object not found in LFS store, try to download it
  202. try:
  203. content = self._download_object(pointer)
  204. return content
  205. except LFSError as e:
  206. # Download failed, fall back to returning pointer
  207. logging.warning("LFS object download failed for %s: %s", pointer.oid, e)
  208. # Return pointer as-is when object is missing and download failed
  209. return data
  210. def _download_object(self, pointer: LFSPointer) -> bytes:
  211. """Download an LFS object from the server.
  212. Args:
  213. pointer: LFS pointer containing OID and size
  214. Returns:
  215. Downloaded content
  216. Raises:
  217. LFSError: If download fails for any reason
  218. """
  219. if self.config is None:
  220. raise LFSError("No configuration available for LFS download")
  221. # Create LFS client and download
  222. client = LFSClient.from_config(self.config)
  223. if client is None:
  224. raise LFSError("No LFS client available from configuration")
  225. content = client.download(pointer.oid, pointer.size)
  226. # Store the downloaded content in local LFS store
  227. stored_oid = self.lfs_store.write_object([content])
  228. # Verify the stored OID matches what we expected
  229. if stored_oid != pointer.oid:
  230. raise LFSError(
  231. f"Downloaded OID mismatch: expected {pointer.oid}, got {stored_oid}"
  232. )
  233. return content
  234. def _get_lfs_user_agent(config):
  235. """Get User-Agent string for LFS requests, respecting git config."""
  236. try:
  237. if config:
  238. # Use configured user agent verbatim if set
  239. return config.get(b"http", b"useragent").decode()
  240. except KeyError:
  241. pass
  242. # Default LFS user agent (similar to git-lfs format)
  243. from . import __version__
  244. version_str = ".".join([str(x) for x in __version__])
  245. return f"git-lfs/dulwich/{version_str}"
  246. class LFSClient:
  247. """LFS client for network operations."""
  248. def __init__(self, url: str, config: Optional["Config"] = None) -> None:
  249. """Initialize LFS client.
  250. Args:
  251. url: LFS server URL
  252. config: Optional git config for authentication/proxy settings
  253. """
  254. self._base_url = url.rstrip("/") + "/" # Ensure trailing slash for urljoin
  255. self.config = config
  256. self._pool_manager = None
  257. @classmethod
  258. def from_config(cls, config: "Config") -> Optional["LFSClient"]:
  259. """Create LFS client from git config."""
  260. # Try to get LFS URL from config first
  261. try:
  262. url = config.get((b"lfs",), b"url").decode()
  263. except KeyError:
  264. pass
  265. else:
  266. return cls(url, config)
  267. # Fall back to deriving from remote URL (same as git-lfs)
  268. try:
  269. remote_url = config.get((b"remote", b"origin"), b"url").decode()
  270. except KeyError:
  271. pass
  272. else:
  273. # Convert SSH URLs to HTTPS if needed
  274. if remote_url.startswith("git@"):
  275. # Convert git@host:user/repo.git to https://host/user/repo.git
  276. if ":" in remote_url and "/" in remote_url:
  277. host_and_path = remote_url[4:] # Remove "git@"
  278. if ":" in host_and_path:
  279. host, path = host_and_path.split(":", 1)
  280. remote_url = f"https://{host}/{path}"
  281. # Ensure URL ends with .git for consistent LFS endpoint
  282. if not remote_url.endswith(".git"):
  283. remote_url = f"{remote_url}.git"
  284. # Standard LFS endpoint is remote_url + "/info/lfs"
  285. lfs_url = f"{remote_url}/info/lfs"
  286. parsed = urlparse(lfs_url)
  287. if not parsed.scheme or not parsed.netloc:
  288. return None
  289. return LFSClient(lfs_url, config)
  290. return None
  291. @property
  292. def url(self) -> str:
  293. """Get the LFS server URL without trailing slash."""
  294. return self._base_url.rstrip("/")
  295. def _get_pool_manager(self):
  296. """Get urllib3 pool manager with git config applied."""
  297. if self._pool_manager is None:
  298. from dulwich.client import default_urllib3_manager
  299. self._pool_manager = default_urllib3_manager(self.config)
  300. return self._pool_manager
  301. def _make_request(
  302. self,
  303. method: str,
  304. path: str,
  305. data: Optional[bytes] = None,
  306. headers: Optional[dict[str, str]] = None,
  307. ) -> bytes:
  308. """Make an HTTP request to the LFS server."""
  309. url = urljoin(self._base_url, path)
  310. req_headers = {
  311. "Accept": "application/vnd.git-lfs+json",
  312. "Content-Type": "application/vnd.git-lfs+json",
  313. "User-Agent": _get_lfs_user_agent(self.config),
  314. }
  315. if headers:
  316. req_headers.update(headers)
  317. # Use urllib3 pool manager with git config applied
  318. pool_manager = self._get_pool_manager()
  319. response = pool_manager.request(method, url, headers=req_headers, body=data)
  320. if response.status >= 400:
  321. raise ValueError(
  322. f"HTTP {response.status}: {response.data.decode('utf-8', errors='ignore')}"
  323. )
  324. return response.data
  325. def batch(
  326. self,
  327. operation: str,
  328. objects: list[dict[str, Union[str, int]]],
  329. ref: Optional[str] = None,
  330. ) -> LFSBatchResponse:
  331. """Perform batch operation to get transfer URLs.
  332. Args:
  333. operation: "download" or "upload"
  334. objects: List of {"oid": str, "size": int} dicts
  335. ref: Optional ref name
  336. Returns:
  337. Batch response from server
  338. """
  339. data: dict[
  340. str, Union[str, list[str], list[dict[str, Union[str, int]]], dict[str, str]]
  341. ] = {
  342. "operation": operation,
  343. "transfers": ["basic"],
  344. "objects": objects,
  345. }
  346. if ref:
  347. data["ref"] = {"name": ref}
  348. response = self._make_request(
  349. "POST", "objects/batch", json.dumps(data).encode("utf-8")
  350. )
  351. if not response:
  352. raise ValueError("Empty response from LFS server")
  353. response_data = json.loads(response)
  354. return self._parse_batch_response(response_data)
  355. def _parse_batch_response(self, data: dict) -> LFSBatchResponse:
  356. """Parse JSON response into LFSBatchResponse dataclass."""
  357. objects = []
  358. for obj_data in data.get("objects", []):
  359. actions = None
  360. if "actions" in obj_data:
  361. actions = {}
  362. for action_name, action_data in obj_data["actions"].items():
  363. actions[action_name] = LFSAction(
  364. href=action_data["href"],
  365. header=action_data.get("header"),
  366. expires_at=action_data.get("expires_at"),
  367. )
  368. error = None
  369. if "error" in obj_data:
  370. error = LFSErrorInfo(
  371. code=obj_data["error"]["code"], message=obj_data["error"]["message"]
  372. )
  373. batch_obj = LFSBatchObject(
  374. oid=obj_data["oid"],
  375. size=obj_data["size"],
  376. authenticated=obj_data.get("authenticated"),
  377. actions=actions,
  378. error=error,
  379. )
  380. objects.append(batch_obj)
  381. return LFSBatchResponse(
  382. transfer=data.get("transfer", "basic"),
  383. objects=objects,
  384. hash_algo=data.get("hash_algo"),
  385. )
  386. def download(self, oid: str, size: int, ref: Optional[str] = None) -> bytes:
  387. """Download an LFS object.
  388. Args:
  389. oid: Object ID (SHA256)
  390. size: Expected size
  391. ref: Optional ref name
  392. Returns:
  393. Object content
  394. """
  395. # Get download URL via batch API
  396. batch_resp = self.batch("download", [{"oid": oid, "size": size}], ref)
  397. if not batch_resp.objects:
  398. raise LFSError(f"No objects returned for {oid}")
  399. obj = batch_resp.objects[0]
  400. if obj.error:
  401. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  402. if not obj.actions or "download" not in obj.actions:
  403. raise LFSError(f"No download actions for {oid}")
  404. download_action = obj.actions["download"]
  405. download_url = download_action.href
  406. # Download the object using urllib3 with git config
  407. download_headers = {"User-Agent": _get_lfs_user_agent(self.config)}
  408. if download_action.header:
  409. download_headers.update(download_action.header)
  410. pool_manager = self._get_pool_manager()
  411. response = pool_manager.request("GET", download_url, headers=download_headers)
  412. content = response.data
  413. # Verify size
  414. if len(content) != size:
  415. raise LFSError(f"Downloaded size {len(content)} != expected {size}")
  416. # Verify SHA256
  417. actual_oid = hashlib.sha256(content).hexdigest()
  418. if actual_oid != oid:
  419. raise LFSError(f"Downloaded OID {actual_oid} != expected {oid}")
  420. return content
  421. def upload(
  422. self, oid: str, size: int, content: bytes, ref: Optional[str] = None
  423. ) -> None:
  424. """Upload an LFS object.
  425. Args:
  426. oid: Object ID (SHA256)
  427. size: Object size
  428. content: Object content
  429. ref: Optional ref name
  430. """
  431. # Get upload URL via batch API
  432. batch_resp = self.batch("upload", [{"oid": oid, "size": size}], ref)
  433. if not batch_resp.objects:
  434. raise LFSError(f"No objects returned for {oid}")
  435. obj = batch_resp.objects[0]
  436. if obj.error:
  437. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  438. # If no actions, object already exists
  439. if not obj.actions:
  440. return
  441. if "upload" not in obj.actions:
  442. raise LFSError(f"No upload action for {oid}")
  443. upload_action = obj.actions["upload"]
  444. upload_url = upload_action.href
  445. # Upload the object
  446. req = Request(upload_url, data=content, method="PUT")
  447. if upload_action.header:
  448. for name, value in upload_action.header.items():
  449. req.add_header(name, value)
  450. with urlopen(req) as response:
  451. if response.status >= 400:
  452. raise LFSError(f"Upload failed with status {response.status}")
  453. # Verify if needed
  454. if obj.actions and "verify" in obj.actions:
  455. verify_action = obj.actions["verify"]
  456. verify_data = json.dumps({"oid": oid, "size": size}).encode("utf-8")
  457. req = Request(verify_action.href, data=verify_data, method="POST")
  458. req.add_header("Content-Type", "application/vnd.git-lfs+json")
  459. if verify_action.header:
  460. for name, value in verify_action.header.items():
  461. req.add_header(name, value)
  462. with urlopen(req) as response:
  463. if response.status >= 400:
  464. raise LFSError(f"Verification failed with status {response.status}")
  465. class LFSError(Exception):
  466. """LFS-specific error."""