test_filters.py 9.1 KB


  1. # test_filters.py -- tests for filter drivers
  2. # Copyright (C) 2024 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for filter drivers support."""
  22. import sys
  23. from unittest import skipIf
  24. from dulwich.config import ConfigDict
  25. from dulwich.filters import (
  26. FilterBlobNormalizer,
  27. FilterRegistry,
  28. ProcessFilterDriver,
  29. get_filter_for_path,
  30. )
  31. from dulwich.objects import Blob
  32. from . import TestCase
  class ProcessFilterDriverTests(TestCase):
      """Exercise ProcessFilterDriver with external shell commands."""

      @skipIf(sys.platform == "win32", "Unix shell commands")
      def test_clean_filter(self) -> None:
          """The clean command's output is returned by ``clean``."""
          # `tr` upper-cases everything piped through it.
          to_upper = ProcessFilterDriver(clean_cmd="tr '[:lower:]' '[:upper:]'")
          self.assertEqual(to_upper.clean(b"hello world"), b"HELLO WORLD")

      @skipIf(sys.platform == "win32", "Unix shell commands")
      def test_smudge_filter(self) -> None:
          """The smudge command's output is returned by ``smudge``."""
          # `tr` lower-cases everything piped through it.
          to_lower = ProcessFilterDriver(smudge_cmd="tr '[:upper:]' '[:lower:]'")
          self.assertEqual(to_lower.smudge(b"HELLO WORLD"), b"hello world")

      def test_no_filters(self) -> None:
          """A driver built without commands hands data back unchanged."""
          payload = b"test data"
          passthrough = ProcessFilterDriver()
          self.assertEqual(passthrough.clean(payload), payload)
          self.assertEqual(passthrough.smudge(payload), payload)

      @skipIf(sys.platform == "win32", "Unix shell commands")
      def test_failing_filter(self) -> None:
          """A failing command is reported as CalledProcessError."""
          from subprocess import CalledProcessError

          payload = b"test data"

          # `false` is used as a command that is expected to fail.
          broken_clean = ProcessFilterDriver(clean_cmd="false")
          with self.assertRaises(CalledProcessError):
              broken_clean.clean(payload)

          # The smudge direction must propagate the failure as well.
          broken_smudge = ProcessFilterDriver(smudge_cmd="false")
          with self.assertRaises(CalledProcessError):
              broken_smudge.smudge(payload)
  class FilterRegistryTests(TestCase):
      """Tests for registering and looking up drivers in a FilterRegistry."""

      def setUp(self) -> None:
          """Create an empty config and a registry backed by it."""
          super().setUp()
          self.config = ConfigDict()
          self.registry = FilterRegistry(self.config)

      def test_register_and_get_driver(self) -> None:
          """Test registering and retrieving a driver."""
          driver = ProcessFilterDriver(clean_cmd="cat")
          self.registry.register_driver("test", driver)
          retrieved = self.registry.get_driver("test")
          self.assertIs(retrieved, driver)

      def test_get_nonexistent_driver(self) -> None:
          """Test getting a non-existent driver."""
          result = self.registry.get_driver("nonexistent")
          self.assertIsNone(result)

      def test_register_factory(self) -> None:
          """Test registering a driver factory and caching of its result."""
          created_driver = ProcessFilterDriver(clean_cmd="cat")
          # One entry is appended per factory invocation.  The factory always
          # returns the same object, so identity checks alone cannot tell a
          # cached lookup from a repeated factory call; the count can.
          invocations = []

          def factory(registry):
              invocations.append(registry)
              return created_driver

          self.registry.register_factory("test", factory)

          # Getting driver should invoke factory
          retrieved = self.registry.get_driver("test")
          self.assertIs(retrieved, created_driver)
          self.assertEqual(len(invocations), 1)

          # Second get should return cached instance without calling the
          # factory again
          retrieved2 = self.registry.get_driver("test")
          self.assertIs(retrieved2, created_driver)
          self.assertEqual(len(invocations), 1)

      def test_create_from_config(self) -> None:
          """Test creating driver from config."""
          # Set up config using the proper Config interface; the values are
          # written as bytes here and compared as str on the driver below.
          self.config.set(("filter", "test"), "clean", b"cat")
          self.config.set(("filter", "test"), "smudge", b"tac")

          # Get driver (should be created from config)
          driver = self.registry.get_driver("test")
          self.assertIsNotNone(driver)
          self.assertIsInstance(driver, ProcessFilterDriver)
          self.assertEqual(driver.clean_cmd, "cat")
          self.assertEqual(driver.smudge_cmd, "tac")

      def test_builtin_lfs_factory(self) -> None:
          """Test that LFS filter is available as a built-in."""
          from dulwich.lfs import LFSFilterDriver

          # Should be able to get LFS filter without explicit registration
          driver = self.registry.get_driver("lfs")
          self.assertIsNotNone(driver)
          self.assertIsInstance(driver, LFSFilterDriver)
  class GetFilterForPathTests(TestCase):
      """Tests for resolving a path to a filter driver via gitattributes."""

      def setUp(self) -> None:
          """Build a registry holding a single driver registered as "test"."""
          super().setUp()
          self.driver = ProcessFilterDriver(clean_cmd="cat")
          self.registry = FilterRegistry()
          self.registry.register_driver("test", self.driver)

      def _filter_for_txt(self, gitattributes):
          """Look up the filter for ``file.txt`` under *gitattributes*."""
          return get_filter_for_path(b"file.txt", gitattributes, self.registry)

      def test_get_filter_for_path(self) -> None:
          """A matching pattern naming a registered filter yields its driver."""
          found = self._filter_for_txt({b"*.txt": {b"filter": b"test"}})
          self.assertIs(found, self.driver)

      def test_no_filter_attribute(self) -> None:
          """A matching pattern without a filter attribute yields None."""
          found = self._filter_for_txt({b"*.txt": {b"text": b"auto"}})
          self.assertIsNone(found)

      def test_no_matching_pattern(self) -> None:
          """A path that matches no pattern yields None."""
          found = self._filter_for_txt({b"*.jpg": {b"filter": b"test"}})
          self.assertIsNone(found)

      def test_filter_not_registered(self) -> None:
          """A filter name the registry does not know yields None."""
          found = self._filter_for_txt({b"*.txt": {b"filter": b"nonexistent"}})
          self.assertIsNone(found)
  class FilterBlobNormalizerTests(TestCase):
      """Tests for FilterBlobNormalizer checkin/checkout normalization."""

      def setUp(self) -> None:
          """Wire a normalizer to an empty config, registry and attributes."""
          super().setUp()
          self.config = ConfigDict()
          self.registry = FilterRegistry(self.config)
          # The same dict object is handed to the normalizer; individual
          # tests add patterns to it afterwards.
          self.gitattributes = {}
          self.normalizer = FilterBlobNormalizer(
              self.config, self.gitattributes, self.registry
          )

      @staticmethod
      def _blob_with(data):
          """Return a fresh Blob whose data is *data*."""
          blob = Blob()
          blob.data = data
          return blob

      def _assert_returned_as_is(self, blob) -> None:
          """Assert checkin, then checkout, return *blob* itself."""
          for normalize in (
              self.normalizer.checkin_normalize,
              self.normalizer.checkout_normalize,
          ):
              self.assertIs(normalize(blob, b"file.txt"), blob)

      def test_no_filter(self) -> None:
          """Without any filter, both directions return the blob unchanged."""
          self._assert_returned_as_is(self._blob_with(b"test content"))

      def test_with_filter(self) -> None:
          """With a filter configured, each direction produces a new blob."""

          class CaseFilter:
              # Upper-case on clean, lower-case on smudge.
              def clean(self, data):
                  return data.upper()

              def smudge(self, data):
                  return data.lower()

          # Make the filter known and attach it to *.txt paths.
          self.registry.register_driver("test", CaseFilter())
          self.gitattributes[b"*.txt"] = {b"filter": b"test"}

          original = self._blob_with(b"Test Content")

          # Checkin goes through clean -> upper case, in a new blob.
          checked_in = self.normalizer.checkin_normalize(original, b"file.txt")
          self.assertEqual(checked_in.data, b"TEST CONTENT")
          self.assertIsNot(checked_in, original)

          # Checkout goes through smudge -> lower case, in a new blob.
          checked_out = self.normalizer.checkout_normalize(original, b"file.txt")
          self.assertEqual(checked_out.data, b"test content")
          self.assertIsNot(checked_out, original)

      def test_filter_returns_same_data(self) -> None:
          """A filter that leaves data untouched yields the same blob object."""

          class IdentityFilter:
              # Both directions hand the data straight back.
              def clean(self, data):
                  return data

              def smudge(self, data):
                  return data

          self.registry.register_driver("noop", IdentityFilter())
          self.gitattributes[b"*.txt"] = {b"filter": b"noop"}

          self._assert_returned_as_is(self._blob_with(b"unchanged content"))