Răsfoiți Sursa

Add LFS support implementation

Extends the basic LFS store with full Git LFS functionality:

- LFSPointer class for parsing and creating LFS pointer files
- LFSFilterDriver implementing clean/smudge filters for LFS
- Automatic LFS filter registration in FilterRegistry
- Support for the Git LFS pointer file format (version 1)
- Graceful handling of missing LFS objects
- Comprehensive tests for all LFS components
Jelmer Vernooij 1 lună în urmă
părinte
comite
8dccfdbd5f
4 a modificat fișierele cu 455 adăugiri și 2 ștergeri
  1. 19 0
      dulwich/filters.py
  2. 106 1
      dulwich/lfs.py
  3. 199 1
      tests/test_lfs.py
  4. 131 0
      tests/test_lfs_integration.py

+ 19 - 0
dulwich/filters.py

@@ -89,6 +89,9 @@ class FilterRegistry:
         self._drivers: dict[str, FilterDriver] = {}
         self._factories: dict[str, Callable[[FilterRegistry], FilterDriver]] = {}
 
+        # Register built-in filter factories
+        self.register_factory("lfs", self._create_lfs_filter)
+
     def register_factory(
         self, name: str, factory: Callable[["FilterRegistry"], FilterDriver]
     ) -> None:
@@ -153,6 +156,22 @@ class FilterRegistry:
 
         return None
 
+    def _create_lfs_filter(self, registry: "FilterRegistry") -> FilterDriver:
+        """Build the driver registered for the built-in ``lfs`` filter.
+
+        Args:
+          registry: Registry whose ``repo`` attribute decides which LFS
+            store backs the driver.
+
+        Returns:
+          An ``LFSFilterDriver`` wrapping the selected store.
+        """
+        from .lfs import LFSFilterDriver, LFSStore
+
+        repo = registry.repo
+        if repo is None:
+            # No repository to attach to: back the driver with a store in a
+            # freshly made temporary directory. Nothing here removes that
+            # directory afterwards.
+            import tempfile
+
+            store = LFSStore.create(tempfile.mkdtemp(prefix="dulwich-lfs-"))
+        else:
+            # Use the repository's own store, passing create=True.
+            store = LFSStore.from_repo(repo, create=True)
+
+        return LFSFilterDriver(store)
+
 
 def get_filter_for_path(
     path: bytes,

+ 106 - 1
dulwich/lfs.py

@@ -23,7 +23,7 @@ import hashlib
 import os
 import tempfile
 from collections.abc import Iterable
-from typing import TYPE_CHECKING, BinaryIO
+from typing import TYPE_CHECKING, BinaryIO, Optional
 
 if TYPE_CHECKING:
     from .repo import Repo
@@ -78,3 +78,108 @@ class LFSStore:
             os.makedirs(os.path.dirname(path))
         os.rename(tmppath, path)
         return sha.hexdigest()
+
+
+class LFSPointer:
+    """An LFS pointer file: the object id and size of the real content.
+
+    ``oid`` is the hexadecimal SHA-256 object id as a string and ``size``
+    is the content length in bytes.
+    """
+
+    # Characters allowed in an object id. Accepting both cases keeps
+    # compatibility with ids that previously validated.
+    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")
+
+    def __init__(self, oid: str, size: int) -> None:
+        self.oid = oid
+        self.size = size
+
+    @classmethod
+    def from_bytes(cls, data: bytes) -> Optional["LFSPointer"]:
+        """Parse an LFS pointer from bytes.
+
+        Args:
+          data: Candidate pointer file contents.
+
+        Returns:
+          The parsed pointer, or None if ``data`` is not a valid version 1
+          pointer: not UTF-8, fewer than three lines, a version line other
+          than the v1 one, or a missing or malformed ``oid``/``size`` entry.
+          Unknown extra lines are ignored.
+        """
+        try:
+            text = data.decode("utf-8")
+        except UnicodeDecodeError:
+            return None
+
+        lines = text.strip().split("\n")
+        if len(lines) < 3:
+            return None
+
+        # Compare the whole version line: a prefix test would also accept
+        # unknown versions such as ".../spec/v10". strip() keeps pointers
+        # with CRLF line endings working.
+        if lines[0].strip() != "version https://git-lfs.github.com/spec/v1":
+            return None
+
+        oid = None
+        size = None
+
+        for line in lines[1:]:
+            if line.startswith("oid sha256:"):
+                oid = line[11:].strip()
+            elif line.startswith("size "):
+                digits = line[5:].strip()
+                # Require plain ASCII decimal digits; int() on its own would
+                # also accept signs, underscores and non-ASCII digits.
+                if not (digits.isascii() and digits.isdigit()):
+                    return None
+                try:
+                    size = int(digits)
+                except ValueError:
+                    # Raised for absurdly long digit strings on interpreters
+                    # that limit int/str conversion length.
+                    return None
+
+        if oid is None or size is None:
+            return None
+
+        return cls(oid, size)
+
+    def to_bytes(self) -> bytes:
+        """Serialise the pointer in the version 1 pointer file format.
+
+        Returns:
+          The ``version``, ``oid`` and ``size`` lines, each newline
+          terminated, encoded as bytes.
+        """
+        return (
+            "version https://git-lfs.github.com/spec/v1\n"
+            f"oid sha256:{self.oid}\n"
+            f"size {self.size}\n"
+        ).encode()
+
+    def is_valid_oid(self) -> bool:
+        """Check whether ``oid`` is exactly 64 hexadecimal digits.
+
+        Each character is checked individually because ``int(oid, 16)``
+        also accepts a sign, a ``0x`` prefix, underscores and surrounding
+        whitespace, none of which belong in a SHA-256 object id.
+
+        Returns:
+          True if the object id is well formed, False otherwise.
+        """
+        hex_digits = self._HEX_DIGITS
+        return len(self.oid) == 64 and all(ch in hex_digits for ch in self.oid)
+
+
+class LFSFilterDriver:
+    """Filter driver translating between file content and LFS pointers."""
+
+    def __init__(self, lfs_store: "LFSStore") -> None:
+        # Store that clean() writes objects to and smudge() reads them from.
+        self.lfs_store = lfs_store
+
+    def clean(self, data: bytes) -> bytes:
+        """Replace file content with an LFS pointer (clean filter).
+
+        Args:
+          data: Content being checked in.
+
+        Returns:
+          ``data`` unchanged when it already parses as an LFS pointer;
+          otherwise the pointer bytes for the object just written to the
+          store.
+        """
+        if LFSPointer.from_bytes(data) is not None:
+            # Already a pointer: nothing to store.
+            return data
+
+        oid = self.lfs_store.write_object([data])
+        return LFSPointer(oid, len(data)).to_bytes()
+
+    def smudge(self, data: bytes) -> bytes:
+        """Replace an LFS pointer with the content it names (smudge filter).
+
+        Args:
+          data: Content being checked out.
+
+        Returns:
+          The stored object's bytes when ``data`` is a pointer with a valid
+          object id that the store can open. In every other case (not a
+          pointer, invalid object id, or ``KeyError`` from the store)
+          ``data`` is returned unchanged.
+        """
+        parsed = LFSPointer.from_bytes(data)
+        if parsed is None or not parsed.is_valid_oid():
+            return data
+
+        try:
+            with self.lfs_store.open_object(parsed.oid) as stream:
+                content = stream.read()
+        except KeyError:
+            # The object is not in the store: hand the pointer back as-is.
+            return data
+        return content

+ 199 - 1
tests/test_lfs.py

@@ -24,7 +24,7 @@
 import shutil
 import tempfile
 
-from dulwich.lfs import LFSStore
+from dulwich.lfs import LFSFilterDriver, LFSPointer, LFSStore
 
 from . import TestCase
 
@@ -86,3 +86,201 @@ class LFSTests(TestCase):
         self.assertTrue(os.path.isdir(lfs_dir))
         self.assertTrue(os.path.isdir(os.path.join(lfs_dir, "tmp")))
         self.assertTrue(os.path.isdir(os.path.join(lfs_dir, "objects")))
+
+
+class LFSPointerTests(TestCase):
+    """Unit tests for parsing, serialising and validating ``LFSPointer``."""
+
+    # A well-formed 64-character hexadecimal object id shared by the tests.
+    OID = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
+    # Pre-built pointer lines so each test only spells out what it varies.
+    VERSION_LINE = b"version https://git-lfs.github.com/spec/v1\n"
+    OID_LINE = b"oid sha256:" + OID.encode("ascii") + b"\n"
+
+    def test_from_bytes_valid(self) -> None:
+        """A minimal well-formed pointer yields its oid and size."""
+        parsed = LFSPointer.from_bytes(
+            self.VERSION_LINE + self.OID_LINE + b"size 0\n"
+        )
+        self.assertIsNotNone(parsed)
+        self.assertEqual(parsed.oid, self.OID)
+        self.assertEqual(parsed.size, 0)
+
+    def test_from_bytes_with_extra_fields(self) -> None:
+        """Unknown trailing fields do not prevent parsing."""
+        parsed = LFSPointer.from_bytes(
+            self.VERSION_LINE
+            + self.OID_LINE
+            + b"size 1234\n"
+            + b"x-custom-field value\n"
+        )
+        self.assertIsNotNone(parsed)
+        self.assertEqual(parsed.size, 1234)
+
+    def test_from_bytes_invalid_version(self) -> None:
+        """A version line for another spec URL is rejected."""
+        raw = b"version https://invalid.com/spec/v1\n" + self.OID_LINE + b"size 0\n"
+        self.assertIsNone(LFSPointer.from_bytes(raw))
+
+    def test_from_bytes_missing_oid(self) -> None:
+        """A pointer without an oid line is rejected."""
+        raw = self.VERSION_LINE + b"size 0\n"
+        self.assertIsNone(LFSPointer.from_bytes(raw))
+
+    def test_from_bytes_missing_size(self) -> None:
+        """A pointer without a size line is rejected."""
+        raw = self.VERSION_LINE + self.OID_LINE
+        self.assertIsNone(LFSPointer.from_bytes(raw))
+
+    def test_from_bytes_invalid_size(self) -> None:
+        """A non-numeric size is rejected."""
+        raw = self.VERSION_LINE + self.OID_LINE + b"size not_a_number\n"
+        self.assertIsNone(LFSPointer.from_bytes(raw))
+
+    def test_from_bytes_binary_data(self) -> None:
+        """Arbitrary binary content is not mistaken for a pointer."""
+        self.assertIsNone(LFSPointer.from_bytes(b"\x00\x01\x02\x03\x04"))
+
+    def test_to_bytes(self) -> None:
+        """Serialisation emits the version, oid and size lines in order."""
+        serialised = LFSPointer(self.OID, 1234).to_bytes()
+        self.assertEqual(
+            serialised, self.VERSION_LINE + self.OID_LINE + b"size 1234\n"
+        )
+
+    def test_round_trip(self) -> None:
+        """Serialising and re-parsing preserves oid and size."""
+        before = LFSPointer(self.OID, 9876)
+        after = LFSPointer.from_bytes(before.to_bytes())
+        self.assertIsNotNone(after)
+        self.assertEqual(after.oid, before.oid)
+        self.assertEqual(after.size, before.size)
+
+    def test_is_valid_oid(self) -> None:
+        """Only a 64-character hexadecimal oid validates."""
+        # Full-length hexadecimal id.
+        self.assertTrue(LFSPointer(self.OID, 0).is_valid_oid())
+
+        # Truncated to 16 characters.
+        self.assertFalse(LFSPointer(self.OID[:16], 0).is_valid_oid())
+
+        # Right length, but the leading "g" is not a hexadecimal digit.
+        self.assertFalse(LFSPointer("g" + self.OID[1:], 0).is_valid_oid())
+
+
+class LFSFilterDriverTests(TestCase):
+    """Unit tests for ``LFSFilterDriver`` against a store in a temp dir."""
+
+    def setUp(self) -> None:
+        super().setUp()
+        # Each test gets its own store directory, removed on cleanup.
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.test_dir)
+        self.lfs_store = LFSStore.create(self.test_dir)
+        self.filter_driver = LFSFilterDriver(self.lfs_store)
+
+    def test_clean_new_file(self) -> None:
+        """Cleaning plain content stores it and returns a pointer to it."""
+        payload = b"This is a test file content"
+        cleaned = self.filter_driver.clean(payload)
+
+        # The output parses as a pointer describing the payload.
+        parsed = LFSPointer.from_bytes(cleaned)
+        self.assertIsNotNone(parsed)
+        self.assertEqual(parsed.size, len(payload))
+
+        # The payload itself can be read back from the store.
+        with self.lfs_store.open_object(parsed.oid) as stream:
+            self.assertEqual(stream.read(), payload)
+
+    def test_clean_existing_pointer(self) -> None:
+        """Cleaning content that is already a pointer leaves it untouched."""
+        raw_pointer = LFSPointer(
+            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", 1234
+        ).to_bytes()
+
+        self.assertEqual(self.filter_driver.clean(raw_pointer), raw_pointer)
+
+    def test_smudge_valid_pointer(self) -> None:
+        """Smudging a pointer to a stored object returns that object."""
+        payload = b"This is the actual file content"
+        oid = self.lfs_store.write_object([payload])
+        raw_pointer = LFSPointer(oid, len(payload)).to_bytes()
+
+        self.assertEqual(self.filter_driver.smudge(raw_pointer), payload)
+
+    def test_smudge_missing_object(self) -> None:
+        """Smudging a pointer to an absent object returns the pointer."""
+        raw_pointer = LFSPointer(
+            "0000000000000000000000000000000000000000000000000000000000000000", 1234
+        ).to_bytes()
+
+        self.assertEqual(self.filter_driver.smudge(raw_pointer), raw_pointer)
+
+    def test_smudge_non_pointer(self) -> None:
+        """Smudging ordinary content returns it unchanged."""
+        payload = b"This is not an LFS pointer"
+
+        self.assertEqual(self.filter_driver.smudge(payload), payload)
+
+    def test_round_trip(self) -> None:
+        """Clean followed by smudge reproduces the original content."""
+        payload = b"Round trip test content"
+
+        # Working tree -> repository.
+        cleaned = self.filter_driver.clean(payload)
+        self.assertIsNotNone(LFSPointer.from_bytes(cleaned))
+
+        # Repository -> working tree.
+        self.assertEqual(self.filter_driver.smudge(cleaned), payload)

+ 131 - 0
tests/test_lfs_integration.py

@@ -0,0 +1,131 @@
+# test_lfs_integration.py -- Integration tests for LFS with filters
+# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
+#
+# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
+# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
+# General Public License as published by the Free Software Foundation; version 2.0
+# or (at your option) any later version. You can redistribute it and/or
+# modify it under the terms of either of these two licenses.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# You should have received a copy of the licenses; if not, see
+# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
+# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
+# License, Version 2.0.
+#
+
+"""Integration tests for LFS with the filter system."""
+
+import shutil
+import tempfile
+
+from dulwich.config import ConfigDict
+from dulwich.filters import FilterBlobNormalizer, FilterRegistry
+from dulwich.lfs import LFSFilterDriver, LFSStore
+from dulwich.objects import Blob
+
+from . import TestCase
+
+
+class LFSFilterIntegrationTests(TestCase):
+    """Exercise the LFS driver through ``FilterBlobNormalizer``."""
+
+    def setUp(self) -> None:
+        super().setUp()
+        # Store directory private to each test, removed on cleanup.
+        self.test_dir = tempfile.mkdtemp()
+        self.addCleanup(shutil.rmtree, self.test_dir)
+
+        # LFS store plus the driver under test.
+        self.lfs_store = LFSStore.create(self.test_dir)
+        self.lfs_filter = LFSFilterDriver(self.lfs_store)
+
+        # Registry with the driver registered explicitly under "lfs".
+        self.config = ConfigDict()
+        self.registry = FilterRegistry(self.config)
+        self.registry.register_driver("lfs", self.lfs_filter)
+
+        # Attributes routing *.bin paths through the "lfs" filter.
+        self.gitattributes = {b"*.bin": {b"filter": b"lfs"}}
+
+        self.normalizer = FilterBlobNormalizer(
+            self.config, self.gitattributes, self.registry
+        )
+
+    def test_lfs_round_trip(self) -> None:
+        """Checkin produces a pointer and checkout restores the content."""
+        payload = b"This is a large binary file content" * 100
+        source_blob = Blob()
+        source_blob.data = payload
+
+        # Checkin replaces the content with a much shorter pointer.
+        stored = self.normalizer.checkin_normalize(source_blob, b"large.bin")
+        self.assertLess(len(stored.data), len(payload))
+        self.assertTrue(
+            stored.data.startswith(b"version https://git-lfs.github.com/spec/v1")
+        )
+
+        # Checkout expands the pointer back to the original bytes.
+        restored = self.normalizer.checkout_normalize(stored, b"large.bin")
+        self.assertEqual(restored.data, payload)
+
+    def test_non_lfs_file(self) -> None:
+        """Paths not matching ``*.bin`` come back as the very same blob."""
+        plain_blob = Blob()
+        plain_blob.data = b"This is a regular text file"
+
+        self.assertIs(
+            self.normalizer.checkin_normalize(plain_blob, b"file.txt"), plain_blob
+        )
+        self.assertIs(
+            self.normalizer.checkout_normalize(plain_blob, b"file.txt"), plain_blob
+        )
+
+    def test_lfs_pointer_file(self) -> None:
+        """A blob that is already a pointer is kept on checkin, expanded on checkout."""
+        from dulwich.lfs import LFSPointer
+
+        # Put real content in the store and build a pointer blob for it.
+        payload = b"Content to be stored in LFS"
+        oid = self.lfs_store.write_object([payload])
+        pointer_blob = Blob()
+        pointer_blob.data = LFSPointer(oid, len(payload)).to_bytes()
+
+        # Checkin hands back the identical blob object.
+        self.assertIs(
+            self.normalizer.checkin_normalize(pointer_blob, b"data.bin"),
+            pointer_blob,
+        )
+
+        # Checkout resolves the pointer to the stored content.
+        expanded = self.normalizer.checkout_normalize(pointer_blob, b"data.bin")
+        self.assertEqual(expanded.data, payload)
+
+    def test_missing_lfs_object(self) -> None:
+        """Checkout of a pointer to an absent object leaves the pointer data."""
+        from dulwich.lfs import LFSPointer
+
+        pointer_blob = Blob()
+        pointer_blob.data = LFSPointer(
+            "0000000000000000000000000000000000000000000000000000000000000000", 1234
+        ).to_bytes()
+
+        result = self.normalizer.checkout_normalize(pointer_blob, b"missing.bin")
+        self.assertEqual(result.data, pointer_blob.data)