lfs.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # lfs.py -- Implementation of the LFS
  2. # Copyright (C) 2020 Jelmer Vernooij
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. import hashlib
  22. import os
  23. import tempfile
  24. from collections.abc import Iterable
  25. from typing import TYPE_CHECKING, BinaryIO, Optional
  26. if TYPE_CHECKING:
  27. from .repo import Repo
  28. class LFSStore:
  29. """Stores objects on disk, indexed by SHA256."""
  30. def __init__(self, path: str) -> None:
  31. self.path = path
  32. @classmethod
  33. def create(cls, lfs_dir: str) -> "LFSStore":
  34. if not os.path.isdir(lfs_dir):
  35. os.mkdir(lfs_dir)
  36. os.mkdir(os.path.join(lfs_dir, "tmp"))
  37. os.mkdir(os.path.join(lfs_dir, "objects"))
  38. return cls(lfs_dir)
  39. @classmethod
  40. def from_repo(cls, repo: "Repo", create: bool = False) -> "LFSStore":
  41. lfs_dir = os.path.join(repo.controldir(), "lfs")
  42. if create:
  43. return cls.create(lfs_dir)
  44. return cls(lfs_dir)
  45. def _sha_path(self, sha: str) -> str:
  46. return os.path.join(self.path, "objects", sha[0:2], sha[2:4], sha)
  47. def open_object(self, sha: str) -> BinaryIO:
  48. """Open an object by sha."""
  49. try:
  50. return open(self._sha_path(sha), "rb")
  51. except FileNotFoundError as exc:
  52. raise KeyError(sha) from exc
  53. def write_object(self, chunks: Iterable[bytes]) -> str:
  54. """Write an object.
  55. Returns: object SHA
  56. """
  57. sha = hashlib.sha256()
  58. tmpdir = os.path.join(self.path, "tmp")
  59. with tempfile.NamedTemporaryFile(dir=tmpdir, mode="wb", delete=False) as f:
  60. for chunk in chunks:
  61. sha.update(chunk)
  62. f.write(chunk)
  63. f.flush()
  64. tmppath = f.name
  65. path = self._sha_path(sha.hexdigest())
  66. if not os.path.exists(os.path.dirname(path)):
  67. os.makedirs(os.path.dirname(path))
  68. os.rename(tmppath, path)
  69. return sha.hexdigest()
  70. class LFSPointer:
  71. """Represents an LFS pointer file."""
  72. def __init__(self, oid: str, size: int) -> None:
  73. self.oid = oid
  74. self.size = size
  75. @classmethod
  76. def from_bytes(cls, data: bytes) -> Optional["LFSPointer"]:
  77. """Parse LFS pointer from bytes.
  78. Returns None if data is not a valid LFS pointer.
  79. """
  80. try:
  81. text = data.decode("utf-8")
  82. except UnicodeDecodeError:
  83. return None
  84. # LFS pointer files have a specific format
  85. lines = text.strip().split("\n")
  86. if len(lines) < 3:
  87. return None
  88. # Must start with version
  89. if not lines[0].startswith("version https://git-lfs.github.com/spec/v1"):
  90. return None
  91. oid = None
  92. size = None
  93. for line in lines[1:]:
  94. if line.startswith("oid sha256:"):
  95. oid = line[11:].strip()
  96. elif line.startswith("size "):
  97. try:
  98. size = int(line[5:].strip())
  99. except ValueError:
  100. return None
  101. if oid is None or size is None:
  102. return None
  103. return cls(oid, size)
  104. def to_bytes(self) -> bytes:
  105. """Convert LFS pointer to bytes."""
  106. return (
  107. f"version https://git-lfs.github.com/spec/v1\n"
  108. f"oid sha256:{self.oid}\n"
  109. f"size {self.size}\n"
  110. ).encode()
  111. def is_valid_oid(self) -> bool:
  112. """Check if the OID is valid SHA256."""
  113. if len(self.oid) != 64:
  114. return False
  115. try:
  116. int(self.oid, 16)
  117. return True
  118. except ValueError:
  119. return False
  120. class LFSFilterDriver:
  121. """LFS filter driver implementation."""
  122. def __init__(self, lfs_store: "LFSStore") -> None:
  123. self.lfs_store = lfs_store
  124. def clean(self, data: bytes) -> bytes:
  125. """Convert file content to LFS pointer (clean filter)."""
  126. # Check if data is already an LFS pointer
  127. pointer = LFSPointer.from_bytes(data)
  128. if pointer is not None:
  129. return data
  130. # Store the file content in LFS
  131. sha = self.lfs_store.write_object([data])
  132. # Create and return LFS pointer
  133. pointer = LFSPointer(sha, len(data))
  134. return pointer.to_bytes()
  135. def smudge(self, data: bytes) -> bytes:
  136. """Convert LFS pointer to file content (smudge filter)."""
  137. # Try to parse as LFS pointer
  138. pointer = LFSPointer.from_bytes(data)
  139. if pointer is None:
  140. # Not an LFS pointer, return as-is
  141. return data
  142. # Validate the pointer
  143. if not pointer.is_valid_oid():
  144. return data
  145. try:
  146. # Read the actual content from LFS store
  147. with self.lfs_store.open_object(pointer.oid) as f:
  148. return f.read()
  149. except KeyError:
  150. # Object not found in LFS store, return pointer as-is
  151. # This matches Git LFS behavior when object is missing
  152. return data