|
@@ -1,4 +1,4 @@
|
|
|
-# test_filters.py -- tests for filter drivers
|
|
|
+# test_filters.py -- Tests for filters
|
|
|
# Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
|
|
|
#
|
|
|
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
|
|
@@ -19,239 +19,181 @@
|
|
|
# License, Version 2.0.
|
|
|
#
|
|
|
|
|
|
-"""Tests for filter drivers support."""
|
|
|
+"""Tests for filters."""
|
|
|
|
|
|
-import sys
|
|
|
-from unittest import skipIf
|
|
|
+import os
|
|
|
+import tempfile
|
|
|
+import unittest
|
|
|
|
|
|
-from dulwich.config import ConfigDict
|
|
|
-from dulwich.filters import (
|
|
|
- FilterBlobNormalizer,
|
|
|
- FilterRegistry,
|
|
|
- ProcessFilterDriver,
|
|
|
- get_filter_for_path,
|
|
|
-)
|
|
|
-from dulwich.objects import Blob
|
|
|
+from dulwich import porcelain
|
|
|
+from dulwich.repo import Repo
|
|
|
|
|
|
from . import TestCase
|
|
|
|
|
|
|
|
|
-class ProcessFilterDriverTests(TestCase):
|
|
|
- @skipIf(sys.platform == "win32", "Unix shell commands")
|
|
|
- def test_clean_filter(self) -> None:
|
|
|
- """Test clean filter with external command."""
|
|
|
- # Use a simple command that converts to uppercase
|
|
|
- driver = ProcessFilterDriver(clean_cmd="tr '[:lower:]' '[:upper:]'")
|
|
|
- result = driver.clean(b"hello world")
|
|
|
- self.assertEqual(result, b"HELLO WORLD")
|
|
|
-
|
|
|
- @skipIf(sys.platform == "win32", "Unix shell commands")
|
|
|
- def test_smudge_filter(self) -> None:
|
|
|
- """Test smudge filter with external command."""
|
|
|
- # Use a simple command that converts to lowercase
|
|
|
- driver = ProcessFilterDriver(smudge_cmd="tr '[:upper:]' '[:lower:]'")
|
|
|
- result = driver.smudge(b"HELLO WORLD")
|
|
|
- self.assertEqual(result, b"hello world")
|
|
|
-
|
|
|
- def test_no_filters(self) -> None:
|
|
|
- """Test driver with no filters configured."""
|
|
|
- driver = ProcessFilterDriver()
|
|
|
- data = b"test data"
|
|
|
- self.assertEqual(driver.clean(data), data)
|
|
|
- self.assertEqual(driver.smudge(data), data)
|
|
|
-
|
|
|
- @skipIf(sys.platform == "win32", "Unix shell commands")
|
|
|
- def test_failing_filter(self) -> None:
|
|
|
- """Test that failing filter propagates the error."""
|
|
|
- import subprocess
|
|
|
-
|
|
|
- # Use a command that will fail
|
|
|
- driver = ProcessFilterDriver(clean_cmd="false")
|
|
|
- data = b"test data"
|
|
|
- # Should raise CalledProcessError
|
|
|
- with self.assertRaises(subprocess.CalledProcessError):
|
|
|
- driver.clean(data)
|
|
|
-
|
|
|
- # Test smudge filter too
|
|
|
- driver = ProcessFilterDriver(smudge_cmd="false")
|
|
|
- with self.assertRaises(subprocess.CalledProcessError):
|
|
|
- driver.smudge(data)
|
|
|
-
|
|
|
-
|
|
|
-class FilterRegistryTests(TestCase):
|
|
|
- def setUp(self) -> None:
|
|
|
- super().setUp()
|
|
|
- self.config = ConfigDict()
|
|
|
- self.registry = FilterRegistry(self.config)
|
|
|
-
|
|
|
- def test_register_and_get_driver(self) -> None:
|
|
|
- """Test registering and retrieving a driver."""
|
|
|
- driver = ProcessFilterDriver(clean_cmd="cat")
|
|
|
- self.registry.register_driver("test", driver)
|
|
|
-
|
|
|
- retrieved = self.registry.get_driver("test")
|
|
|
- self.assertIs(retrieved, driver)
|
|
|
-
|
|
|
- def test_get_nonexistent_driver(self) -> None:
|
|
|
- """Test getting a non-existent driver."""
|
|
|
- result = self.registry.get_driver("nonexistent")
|
|
|
- self.assertIsNone(result)
|
|
|
-
|
|
|
- def test_register_factory(self) -> None:
|
|
|
- """Test registering a driver factory."""
|
|
|
- created_driver = ProcessFilterDriver(clean_cmd="cat")
|
|
|
-
|
|
|
- def factory(registry):
|
|
|
- return created_driver
|
|
|
-
|
|
|
- self.registry.register_factory("test", factory)
|
|
|
-
|
|
|
- # Getting driver should invoke factory
|
|
|
- retrieved = self.registry.get_driver("test")
|
|
|
- self.assertIs(retrieved, created_driver)
|
|
|
-
|
|
|
- # Second get should return cached instance
|
|
|
- retrieved2 = self.registry.get_driver("test")
|
|
|
- self.assertIs(retrieved2, created_driver)
|
|
|
-
|
|
|
- def test_create_from_config(self) -> None:
|
|
|
- """Test creating driver from config."""
|
|
|
- # Set up config using the proper Config interface
|
|
|
- self.config.set(("filter", "test"), "clean", b"cat")
|
|
|
- self.config.set(("filter", "test"), "smudge", b"tac")
|
|
|
-
|
|
|
- # Get driver (should be created from config)
|
|
|
- driver = self.registry.get_driver("test")
|
|
|
- self.assertIsNotNone(driver)
|
|
|
- self.assertIsInstance(driver, ProcessFilterDriver)
|
|
|
- self.assertEqual(driver.clean_cmd, "cat")
|
|
|
- self.assertEqual(driver.smudge_cmd, "tac")
|
|
|
-
|
|
|
- def test_builtin_lfs_factory(self) -> None:
|
|
|
- """Test that LFS filter is available as a built-in."""
|
|
|
- from dulwich.lfs import LFSFilterDriver
|
|
|
-
|
|
|
- # Should be able to get LFS filter without explicit registration
|
|
|
- driver = self.registry.get_driver("lfs")
|
|
|
- self.assertIsNotNone(driver)
|
|
|
- self.assertIsInstance(driver, LFSFilterDriver)
|
|
|
-
|
|
|
-
|
|
|
-class GetFilterForPathTests(TestCase):
|
|
|
- def setUp(self) -> None:
|
|
|
- super().setUp()
|
|
|
- self.registry = FilterRegistry()
|
|
|
- self.driver = ProcessFilterDriver(clean_cmd="cat")
|
|
|
- self.registry.register_driver("test", self.driver)
|
|
|
-
|
|
|
- def test_get_filter_for_path(self) -> None:
|
|
|
- """Test getting filter for a path with filter attribute."""
|
|
|
- gitattributes = {
|
|
|
- b"*.txt": {b"filter": b"test"},
|
|
|
- }
|
|
|
-
|
|
|
- result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
|
|
|
- self.assertIs(result, self.driver)
|
|
|
-
|
|
|
- def test_no_filter_attribute(self) -> None:
|
|
|
- """Test path with no filter attribute."""
|
|
|
- gitattributes = {
|
|
|
- b"*.txt": {b"text": b"auto"},
|
|
|
- }
|
|
|
-
|
|
|
- result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
|
|
|
- self.assertIsNone(result)
|
|
|
+class GitAttributesFilterIntegrationTests(TestCase):
|
|
|
+ """Test gitattributes integration with filter drivers."""
|
|
|
|
|
|
- def test_no_matching_pattern(self) -> None:
|
|
|
- """Test path with no matching pattern."""
|
|
|
- gitattributes = {
|
|
|
- b"*.jpg": {b"filter": b"test"},
|
|
|
- }
|
|
|
-
|
|
|
- result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
|
|
|
- self.assertIsNone(result)
|
|
|
-
|
|
|
- def test_filter_not_registered(self) -> None:
|
|
|
- """Test path with filter that's not registered."""
|
|
|
- gitattributes = {
|
|
|
- b"*.txt": {b"filter": b"nonexistent"},
|
|
|
- }
|
|
|
-
|
|
|
- result = get_filter_for_path(b"file.txt", gitattributes, self.registry)
|
|
|
- self.assertIsNone(result)
|
|
|
-
|
|
|
-
|
|
|
-class FilterBlobNormalizerTests(TestCase):
|
|
|
def setUp(self) -> None:
|
|
|
super().setUp()
|
|
|
- self.config = ConfigDict()
|
|
|
- self.registry = FilterRegistry(self.config)
|
|
|
- self.gitattributes = {}
|
|
|
- self.normalizer = FilterBlobNormalizer(
|
|
|
- self.config, self.gitattributes, self.registry
|
|
|
- )
|
|
|
-
|
|
|
- def test_no_filter(self) -> None:
|
|
|
- """Test normalizer with no filter defined."""
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"test content"
|
|
|
-
|
|
|
- # Both checkin and checkout should return blob unchanged
|
|
|
- result = self.normalizer.checkin_normalize(blob, b"file.txt")
|
|
|
- self.assertIs(result, blob)
|
|
|
-
|
|
|
- result = self.normalizer.checkout_normalize(blob, b"file.txt")
|
|
|
- self.assertIs(result, blob)
|
|
|
-
|
|
|
- def test_with_filter(self) -> None:
|
|
|
- """Test normalizer with a filter defined."""
|
|
|
-
|
|
|
- # Create a simple filter that converts to uppercase on clean
|
|
|
- # and lowercase on smudge
|
|
|
- class TestFilter:
|
|
|
- def clean(self, data):
|
|
|
- return data.upper()
|
|
|
-
|
|
|
- def smudge(self, data):
|
|
|
- return data.lower()
|
|
|
-
|
|
|
- # Register the filter and set it in gitattributes
|
|
|
- self.registry.register_driver("test", TestFilter())
|
|
|
- self.gitattributes[b"*.txt"] = {b"filter": b"test"}
|
|
|
-
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"Test Content"
|
|
|
-
|
|
|
- # Checkin should uppercase
|
|
|
- result = self.normalizer.checkin_normalize(blob, b"file.txt")
|
|
|
- self.assertEqual(result.data, b"TEST CONTENT")
|
|
|
- self.assertIsNot(result, blob) # Should be a new blob
|
|
|
-
|
|
|
- # Checkout should lowercase
|
|
|
- result = self.normalizer.checkout_normalize(blob, b"file.txt")
|
|
|
- self.assertEqual(result.data, b"test content")
|
|
|
- self.assertIsNot(result, blob) # Should be a new blob
|
|
|
-
|
|
|
- def test_filter_returns_same_data(self) -> None:
|
|
|
- """Test that normalizer returns same blob if filter doesn't change data."""
|
|
|
-
|
|
|
- # Create a filter that returns data unchanged
|
|
|
- class NoOpFilter:
|
|
|
- def clean(self, data):
|
|
|
- return data
|
|
|
-
|
|
|
- def smudge(self, data):
|
|
|
- return data
|
|
|
-
|
|
|
- self.registry.register_driver("noop", NoOpFilter())
|
|
|
- self.gitattributes[b"*.txt"] = {b"filter": b"noop"}
|
|
|
-
|
|
|
- blob = Blob()
|
|
|
- blob.data = b"unchanged content"
|
|
|
-
|
|
|
- # Both operations should return the same blob instance
|
|
|
- result = self.normalizer.checkin_normalize(blob, b"file.txt")
|
|
|
- self.assertIs(result, blob)
|
|
|
-
|
|
|
- result = self.normalizer.checkout_normalize(blob, b"file.txt")
|
|
|
- self.assertIs(result, blob)
|
|
|
+ self.test_dir = tempfile.mkdtemp()
|
|
|
+ self.addCleanup(self._cleanup_test_dir)
|
|
|
+ self.repo = Repo.init(self.test_dir)
|
|
|
+
|
|
|
+ def _cleanup_test_dir(self) -> None:
|
|
|
+ """Clean up test directory."""
|
|
|
+ import shutil
|
|
|
+
|
|
|
+ shutil.rmtree(self.test_dir)
|
|
|
+
|
|
|
+ def test_gitattributes_text_filter(self) -> None:
|
|
|
+ """Test that text attribute triggers line ending conversion."""
|
|
|
+ # Configure autocrlf first
|
|
|
+ config = self.repo.get_config()
|
|
|
+ config.set((b"core",), b"autocrlf", b"true")
|
|
|
+ config.write_to_path()
|
|
|
+
|
|
|
+ # Create .gitattributes with text attribute
|
|
|
+ gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
|
|
|
+ with open(gitattributes_path, "wb") as f:
|
|
|
+ f.write(b"*.txt text\n")
|
|
|
+ f.write(b"*.bin -text\n")
|
|
|
+
|
|
|
+ # Add .gitattributes
|
|
|
+ porcelain.add(self.repo, paths=[".gitattributes"])
|
|
|
+ porcelain.commit(self.repo, message=b"Add gitattributes")
|
|
|
+
|
|
|
+ # Create text file with CRLF
|
|
|
+ text_file = os.path.join(self.test_dir, "test.txt")
|
|
|
+ with open(text_file, "wb") as f:
|
|
|
+ f.write(b"line1\r\nline2\r\n")
|
|
|
+
|
|
|
+ # Create binary file with CRLF
|
|
|
+ bin_file = os.path.join(self.test_dir, "test.bin")
|
|
|
+ with open(bin_file, "wb") as f:
|
|
|
+ f.write(b"binary\r\ndata\r\n")
|
|
|
+
|
|
|
+ # Add files
|
|
|
+ porcelain.add(self.repo, paths=["test.txt", "test.bin"])
|
|
|
+
|
|
|
+ # Check that text file was normalized
|
|
|
+ index = self.repo.open_index()
|
|
|
+ text_entry = index[b"test.txt"]
|
|
|
+ text_blob = self.repo.object_store[text_entry.sha]
|
|
|
+ self.assertEqual(text_blob.data, b"line1\nline2\n")
|
|
|
+
|
|
|
+ # Check that binary file was not normalized
|
|
|
+ bin_entry = index[b"test.bin"]
|
|
|
+ bin_blob = self.repo.object_store[bin_entry.sha]
|
|
|
+ self.assertEqual(bin_blob.data, b"binary\r\ndata\r\n")
|
|
|
+
|
|
|
+ @unittest.skip("Custom process filters require external commands")
|
|
|
+ def test_gitattributes_custom_filter(self) -> None:
|
|
|
+ """Test custom filter specified in gitattributes."""
|
|
|
+ # Create .gitattributes with custom filter
|
|
|
+ gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
|
|
|
+ with open(gitattributes_path, "wb") as f:
|
|
|
+ f.write(b"*.secret filter=redact\n")
|
|
|
+
|
|
|
+ # Configure custom filter (use tr command for testing)
|
|
|
+ config = self.repo.get_config()
|
|
|
+ # This filter replaces all digits with X
|
|
|
+ config.set((b"filter", b"redact"), b"clean", b"tr '0-9' 'X'")
|
|
|
+ config.write_to_path()
|
|
|
+
|
|
|
+ # Add .gitattributes
|
|
|
+ porcelain.add(self.repo, paths=[".gitattributes"])
|
|
|
+
|
|
|
+ # Create file with sensitive content
|
|
|
+ secret_file = os.path.join(self.test_dir, "password.secret")
|
|
|
+ with open(secret_file, "wb") as f:
|
|
|
+ f.write(b"password123\ntoken456\n")
|
|
|
+
|
|
|
+ # Add file
|
|
|
+ porcelain.add(self.repo, paths=["password.secret"])
|
|
|
+
|
|
|
+ # Check that content was filtered
|
|
|
+ index = self.repo.open_index()
|
|
|
+ entry = index[b"password.secret"]
|
|
|
+ blob = self.repo.object_store[entry.sha]
|
|
|
+ self.assertEqual(blob.data, b"passwordXXX\ntokenXXX\n")
|
|
|
+
|
|
|
+ def test_gitattributes_from_tree(self) -> None:
|
|
|
+ """Test that gitattributes from tree are used when no working tree exists."""
|
|
|
+ # Create .gitattributes with text attribute
|
|
|
+ gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
|
|
|
+ with open(gitattributes_path, "wb") as f:
|
|
|
+ f.write(b"*.txt text\n")
|
|
|
+
|
|
|
+ # Add and commit .gitattributes
|
|
|
+ porcelain.add(self.repo, paths=[".gitattributes"])
|
|
|
+ porcelain.commit(self.repo, message=b"Add gitattributes")
|
|
|
+
|
|
|
+ # Remove .gitattributes from working tree
|
|
|
+ os.remove(gitattributes_path)
|
|
|
+
|
|
|
+ # Get gitattributes - should still work from tree
|
|
|
+ gitattributes = self.repo.get_gitattributes()
|
|
|
+ attrs = gitattributes.match_path(b"test.txt")
|
|
|
+ self.assertEqual(attrs.get(b"text"), True)
|
|
|
+
|
|
|
+ def test_gitattributes_info_attributes(self) -> None:
|
|
|
+ """Test that .git/info/attributes is read."""
|
|
|
+ # Create info/attributes
|
|
|
+ info_dir = os.path.join(self.repo.controldir(), "info")
|
|
|
+ if not os.path.exists(info_dir):
|
|
|
+ os.makedirs(info_dir)
|
|
|
+ info_attrs_path = os.path.join(info_dir, "attributes")
|
|
|
+ with open(info_attrs_path, "wb") as f:
|
|
|
+ f.write(b"*.log text\n")
|
|
|
+
|
|
|
+ # Get gitattributes
|
|
|
+ gitattributes = self.repo.get_gitattributes()
|
|
|
+ attrs = gitattributes.match_path(b"debug.log")
|
|
|
+ self.assertEqual(attrs.get(b"text"), True)
|
|
|
+
|
|
|
+ @unittest.skip("Custom process filters require external commands")
|
|
|
+ def test_filter_precedence(self) -> None:
|
|
|
+ """Test that filter attribute takes precedence over text attribute."""
|
|
|
+ # Create .gitattributes with both text and filter
|
|
|
+ gitattributes_path = os.path.join(self.test_dir, ".gitattributes")
|
|
|
+ with open(gitattributes_path, "wb") as f:
|
|
|
+ f.write(b"*.txt text filter=custom\n")
|
|
|
+
|
|
|
+ # Configure autocrlf and custom filter
|
|
|
+ config = self.repo.get_config()
|
|
|
+ config.set((b"core",), b"autocrlf", b"true")
|
|
|
+ # This filter converts to uppercase
|
|
|
+ config.set((b"filter", b"custom"), b"clean", b"tr '[:lower:]' '[:upper:]'")
|
|
|
+ config.write_to_path()
|
|
|
+
|
|
|
+ # Add .gitattributes
|
|
|
+ porcelain.add(self.repo, paths=[".gitattributes"])
|
|
|
+
|
|
|
+ # Create text file with lowercase and CRLF
|
|
|
+ text_file = os.path.join(self.test_dir, "test.txt")
|
|
|
+ with open(text_file, "wb") as f:
|
|
|
+ f.write(b"hello\r\nworld\r\n")
|
|
|
+
|
|
|
+ # Add file
|
|
|
+ porcelain.add(self.repo, paths=["test.txt"])
|
|
|
+
|
|
|
+ # Check that custom filter was applied (not just line ending conversion)
|
|
|
+ index = self.repo.open_index()
|
|
|
+ entry = index[b"test.txt"]
|
|
|
+ blob = self.repo.object_store[entry.sha]
|
|
|
+ # Should be uppercase with LF endings
|
|
|
+ self.assertEqual(blob.data, b"HELLO\nWORLD\n")
|
|
|
+
|
|
|
+ def test_blob_normalizer_integration(self) -> None:
|
|
|
+ """Test that get_blob_normalizer returns a FilterBlobNormalizer."""
|
|
|
+ normalizer = self.repo.get_blob_normalizer()
|
|
|
+
|
|
|
+ # Check it's the right type
|
|
|
+ from dulwich.filters import FilterBlobNormalizer
|
|
|
+
|
|
|
+ self.assertIsInstance(normalizer, FilterBlobNormalizer)
|
|
|
+
|
|
|
+ # Check it has access to gitattributes
|
|
|
+ self.assertIsNotNone(normalizer.gitattributes)
|
|
|
+ self.assertIsNotNone(normalizer.filter_registry)
|