lfs.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. # lfs.py -- Implementation of the LFS
  2. # Copyright (C) 2020 Jelmer Vernooij
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import hashlib
  22. import json
  23. import os
  24. import tempfile
  25. from collections.abc import Iterable
  26. from dataclasses import dataclass
  27. from typing import TYPE_CHECKING, BinaryIO, Optional, Union
  28. from urllib.error import HTTPError
  29. from urllib.parse import urljoin
  30. from urllib.request import Request, urlopen
  31. if TYPE_CHECKING:
  32. from .repo import Repo
  33. @dataclass
  34. class LFSAction:
  35. """LFS action structure."""
  36. href: str
  37. header: Optional[dict[str, str]] = None
  38. expires_at: Optional[str] = None
  39. @dataclass
  40. class LFSErrorInfo:
  41. """LFS error structure."""
  42. code: int
  43. message: str
  44. @dataclass
  45. class LFSBatchObject:
  46. """LFS batch object structure."""
  47. oid: str
  48. size: int
  49. authenticated: Optional[bool] = None
  50. actions: Optional[dict[str, LFSAction]] = None
  51. error: Optional[LFSErrorInfo] = None
  52. @dataclass
  53. class LFSBatchResponse:
  54. """LFS batch response structure."""
  55. transfer: str
  56. objects: list[LFSBatchObject]
  57. hash_algo: Optional[str] = None
  58. class LFSStore:
  59. """Stores objects on disk, indexed by SHA256."""
  60. def __init__(self, path: str) -> None:
  61. self.path = path
  62. @classmethod
  63. def create(cls, lfs_dir: str) -> "LFSStore":
  64. if not os.path.isdir(lfs_dir):
  65. os.mkdir(lfs_dir)
  66. tmp_dir = os.path.join(lfs_dir, "tmp")
  67. if not os.path.isdir(tmp_dir):
  68. os.mkdir(tmp_dir)
  69. objects_dir = os.path.join(lfs_dir, "objects")
  70. if not os.path.isdir(objects_dir):
  71. os.mkdir(objects_dir)
  72. return cls(lfs_dir)
  73. @classmethod
  74. def from_repo(cls, repo: "Repo", create: bool = False) -> "LFSStore":
  75. lfs_dir = os.path.join(repo.controldir(), "lfs")
  76. if create:
  77. return cls.create(lfs_dir)
  78. return cls(lfs_dir)
  79. def _sha_path(self, sha: str) -> str:
  80. return os.path.join(self.path, "objects", sha[0:2], sha[2:4], sha)
  81. def open_object(self, sha: str) -> BinaryIO:
  82. """Open an object by sha."""
  83. try:
  84. return open(self._sha_path(sha), "rb")
  85. except FileNotFoundError as exc:
  86. raise KeyError(sha) from exc
  87. def write_object(self, chunks: Iterable[bytes]) -> str:
  88. """Write an object.
  89. Returns: object SHA
  90. """
  91. sha = hashlib.sha256()
  92. tmpdir = os.path.join(self.path, "tmp")
  93. with tempfile.NamedTemporaryFile(dir=tmpdir, mode="wb", delete=False) as f:
  94. for chunk in chunks:
  95. sha.update(chunk)
  96. f.write(chunk)
  97. f.flush()
  98. tmppath = f.name
  99. path = self._sha_path(sha.hexdigest())
  100. if not os.path.exists(os.path.dirname(path)):
  101. os.makedirs(os.path.dirname(path))
  102. # Handle concurrent writes - if file already exists, just remove temp file
  103. if os.path.exists(path):
  104. os.remove(tmppath)
  105. else:
  106. os.rename(tmppath, path)
  107. return sha.hexdigest()
  108. class LFSPointer:
  109. """Represents an LFS pointer file."""
  110. def __init__(self, oid: str, size: int) -> None:
  111. self.oid = oid
  112. self.size = size
  113. @classmethod
  114. def from_bytes(cls, data: bytes) -> Optional["LFSPointer"]:
  115. """Parse LFS pointer from bytes.
  116. Returns None if data is not a valid LFS pointer.
  117. """
  118. try:
  119. text = data.decode("utf-8")
  120. except UnicodeDecodeError:
  121. return None
  122. # LFS pointer files have a specific format
  123. lines = text.strip().split("\n")
  124. if len(lines) < 3:
  125. return None
  126. # Must start with version
  127. if not lines[0].startswith("version https://git-lfs.github.com/spec/v1"):
  128. return None
  129. oid = None
  130. size = None
  131. for line in lines[1:]:
  132. if line.startswith("oid sha256:"):
  133. oid = line[11:].strip()
  134. elif line.startswith("size "):
  135. try:
  136. size = int(line[5:].strip())
  137. # Size must be non-negative
  138. if size < 0:
  139. return None
  140. except ValueError:
  141. return None
  142. if oid is None or size is None:
  143. return None
  144. return cls(oid, size)
  145. def to_bytes(self) -> bytes:
  146. """Convert LFS pointer to bytes."""
  147. return (
  148. f"version https://git-lfs.github.com/spec/v1\n"
  149. f"oid sha256:{self.oid}\n"
  150. f"size {self.size}\n"
  151. ).encode()
  152. def is_valid_oid(self) -> bool:
  153. """Check if the OID is valid SHA256."""
  154. if len(self.oid) != 64:
  155. return False
  156. try:
  157. int(self.oid, 16)
  158. return True
  159. except ValueError:
  160. return False
  161. class LFSFilterDriver:
  162. """LFS filter driver implementation."""
  163. def __init__(self, lfs_store: "LFSStore") -> None:
  164. self.lfs_store = lfs_store
  165. def clean(self, data: bytes) -> bytes:
  166. """Convert file content to LFS pointer (clean filter)."""
  167. # Check if data is already an LFS pointer
  168. pointer = LFSPointer.from_bytes(data)
  169. if pointer is not None:
  170. return data
  171. # Store the file content in LFS
  172. sha = self.lfs_store.write_object([data])
  173. # Create and return LFS pointer
  174. pointer = LFSPointer(sha, len(data))
  175. return pointer.to_bytes()
  176. def smudge(self, data: bytes) -> bytes:
  177. """Convert LFS pointer to file content (smudge filter)."""
  178. # Try to parse as LFS pointer
  179. pointer = LFSPointer.from_bytes(data)
  180. if pointer is None:
  181. # Not an LFS pointer, return as-is
  182. return data
  183. # Validate the pointer
  184. if not pointer.is_valid_oid():
  185. return data
  186. try:
  187. # Read the actual content from LFS store
  188. with self.lfs_store.open_object(pointer.oid) as f:
  189. return f.read()
  190. except KeyError:
  191. # Object not found in LFS store, return pointer as-is
  192. # This matches Git LFS behavior when object is missing
  193. return data
  194. class LFSClient:
  195. """LFS client for network operations."""
  196. def __init__(self, url: str, auth: Optional[tuple[str, str]] = None) -> None:
  197. """Initialize LFS client.
  198. Args:
  199. url: LFS server URL
  200. auth: Optional (username, password) tuple for authentication
  201. """
  202. self.url = url.rstrip("/")
  203. self.auth = auth
  204. def _make_request(
  205. self,
  206. method: str,
  207. path: str,
  208. data: Optional[bytes] = None,
  209. headers: Optional[dict[str, str]] = None,
  210. ) -> bytes:
  211. """Make an HTTP request to the LFS server."""
  212. url = urljoin(self.url, path)
  213. req_headers = {
  214. "Accept": "application/vnd.git-lfs+json",
  215. "Content-Type": "application/vnd.git-lfs+json",
  216. }
  217. if headers:
  218. req_headers.update(headers)
  219. req = Request(url, data=data, headers=req_headers, method=method)
  220. if self.auth:
  221. import base64
  222. auth_str = f"{self.auth[0]}:{self.auth[1]}"
  223. b64_auth = base64.b64encode(auth_str.encode()).decode("ascii")
  224. req.add_header("Authorization", f"Basic {b64_auth}")
  225. try:
  226. with urlopen(req) as response:
  227. return response.read()
  228. except HTTPError as e:
  229. error_body = e.read().decode("utf-8", errors="ignore")
  230. raise LFSError(f"LFS server error {e.code}: {error_body}")
  231. def batch(
  232. self,
  233. operation: str,
  234. objects: list[dict[str, Union[str, int]]],
  235. ref: Optional[str] = None,
  236. ) -> LFSBatchResponse:
  237. """Perform batch operation to get transfer URLs.
  238. Args:
  239. operation: "download" or "upload"
  240. objects: List of {"oid": str, "size": int} dicts
  241. ref: Optional ref name
  242. Returns:
  243. Batch response from server
  244. """
  245. data: dict[
  246. str, Union[str, list[str], list[dict[str, Union[str, int]]], dict[str, str]]
  247. ] = {
  248. "operation": operation,
  249. "transfers": ["basic"],
  250. "objects": objects,
  251. }
  252. if ref:
  253. data["ref"] = {"name": ref}
  254. response = self._make_request(
  255. "POST", "/objects/batch", json.dumps(data).encode("utf-8")
  256. )
  257. response_data = json.loads(response)
  258. return self._parse_batch_response(response_data)
  259. def _parse_batch_response(self, data: dict) -> LFSBatchResponse:
  260. """Parse JSON response into LFSBatchResponse dataclass."""
  261. objects = []
  262. for obj_data in data.get("objects", []):
  263. actions = None
  264. if "actions" in obj_data:
  265. actions = {}
  266. for action_name, action_data in obj_data["actions"].items():
  267. actions[action_name] = LFSAction(
  268. href=action_data["href"],
  269. header=action_data.get("header"),
  270. expires_at=action_data.get("expires_at"),
  271. )
  272. error = None
  273. if "error" in obj_data:
  274. error = LFSErrorInfo(
  275. code=obj_data["error"]["code"], message=obj_data["error"]["message"]
  276. )
  277. batch_obj = LFSBatchObject(
  278. oid=obj_data["oid"],
  279. size=obj_data["size"],
  280. authenticated=obj_data.get("authenticated"),
  281. actions=actions,
  282. error=error,
  283. )
  284. objects.append(batch_obj)
  285. return LFSBatchResponse(
  286. transfer=data.get("transfer", "basic"),
  287. objects=objects,
  288. hash_algo=data.get("hash_algo"),
  289. )
  290. def download(self, oid: str, size: int, ref: Optional[str] = None) -> bytes:
  291. """Download an LFS object.
  292. Args:
  293. oid: Object ID (SHA256)
  294. size: Expected size
  295. ref: Optional ref name
  296. Returns:
  297. Object content
  298. """
  299. # Get download URL via batch API
  300. batch_resp = self.batch("download", [{"oid": oid, "size": size}], ref)
  301. if not batch_resp.objects:
  302. raise LFSError(f"No objects returned for {oid}")
  303. obj = batch_resp.objects[0]
  304. if obj.error:
  305. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  306. if not obj.actions or "download" not in obj.actions:
  307. raise LFSError(f"No download actions for {oid}")
  308. download_action = obj.actions["download"]
  309. download_url = download_action.href
  310. # Download the object
  311. req = Request(download_url)
  312. if download_action.header:
  313. for name, value in download_action.header.items():
  314. req.add_header(name, value)
  315. with urlopen(req) as response:
  316. content = response.read()
  317. # Verify size
  318. if len(content) != size:
  319. raise LFSError(f"Downloaded size {len(content)} != expected {size}")
  320. # Verify SHA256
  321. actual_oid = hashlib.sha256(content).hexdigest()
  322. if actual_oid != oid:
  323. raise LFSError(f"Downloaded OID {actual_oid} != expected {oid}")
  324. return content
  325. def upload(
  326. self, oid: str, size: int, content: bytes, ref: Optional[str] = None
  327. ) -> None:
  328. """Upload an LFS object.
  329. Args:
  330. oid: Object ID (SHA256)
  331. size: Object size
  332. content: Object content
  333. ref: Optional ref name
  334. """
  335. # Get upload URL via batch API
  336. batch_resp = self.batch("upload", [{"oid": oid, "size": size}], ref)
  337. if not batch_resp.objects:
  338. raise LFSError(f"No objects returned for {oid}")
  339. obj = batch_resp.objects[0]
  340. if obj.error:
  341. raise LFSError(f"Server error for {oid}: {obj.error.message}")
  342. # If no actions, object already exists
  343. if not obj.actions:
  344. return
  345. if "upload" not in obj.actions:
  346. raise LFSError(f"No upload action for {oid}")
  347. upload_action = obj.actions["upload"]
  348. upload_url = upload_action.href
  349. # Upload the object
  350. req = Request(upload_url, data=content, method="PUT")
  351. if upload_action.header:
  352. for name, value in upload_action.header.items():
  353. req.add_header(name, value)
  354. with urlopen(req) as response:
  355. if response.status >= 400:
  356. raise LFSError(f"Upload failed with status {response.status}")
  357. # Verify if needed
  358. if obj.actions and "verify" in obj.actions:
  359. verify_action = obj.actions["verify"]
  360. verify_data = json.dumps({"oid": oid, "size": size}).encode("utf-8")
  361. req = Request(verify_action.href, data=verify_data, method="POST")
  362. req.add_header("Content-Type", "application/vnd.git-lfs+json")
  363. if verify_action.header:
  364. for name, value in verify_action.header.items():
  365. req.add_header(name, value)
  366. with urlopen(req) as response:
  367. if response.status >= 400:
  368. raise LFSError(f"Verification failed with status {response.status}")
  369. class LFSError(Exception):
  370. """LFS-specific error."""