filters.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. # filters.py -- Git filter drivers (clean/smudge) implementation
  2. # Copyright (C) 2024 Jelmer Vernooij
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Implementation of Git filter drivers (clean/smudge filters)."""
  22. import logging
  23. import subprocess
  24. from typing import TYPE_CHECKING, Callable, Optional, Protocol
  25. from .attrs import GitAttributes
  26. from .objects import Blob
  27. if TYPE_CHECKING:
  28. from .config import StackedConfig
  29. class FilterError(Exception):
  30. """Exception raised when filter operations fail."""
  31. class FilterDriver(Protocol):
  32. """Protocol for filter drivers."""
  33. def clean(self, data: bytes) -> bytes:
  34. """Apply clean filter (working tree → repository)."""
  35. ...
  36. def smudge(self, data: bytes) -> bytes:
  37. """Apply smudge filter (repository → working tree)."""
  38. ...
  39. class ProcessFilterDriver:
  40. """Filter driver that executes external processes."""
  41. def __init__(
  42. self,
  43. clean_cmd: Optional[str] = None,
  44. smudge_cmd: Optional[str] = None,
  45. required: bool = False,
  46. ) -> None:
  47. self.clean_cmd = clean_cmd
  48. self.smudge_cmd = smudge_cmd
  49. self.required = required
  50. def clean(self, data: bytes) -> bytes:
  51. """Apply clean filter using external process."""
  52. if not self.clean_cmd:
  53. if self.required:
  54. raise FilterError("Clean command is required but not configured")
  55. return data
  56. try:
  57. result = subprocess.run(
  58. self.clean_cmd,
  59. shell=True,
  60. input=data,
  61. capture_output=True,
  62. check=True,
  63. )
  64. return result.stdout
  65. except subprocess.CalledProcessError as e:
  66. if self.required:
  67. raise FilterError(f"Required clean filter failed: {e}")
  68. # If not required, log warning and return original data on failure
  69. logging.warning(f"Optional clean filter failed: {e}")
  70. return data
  71. def smudge(self, data: bytes) -> bytes:
  72. """Apply smudge filter using external process."""
  73. if not self.smudge_cmd:
  74. if self.required:
  75. raise FilterError("Smudge command is required but not configured")
  76. return data
  77. try:
  78. result = subprocess.run(
  79. self.smudge_cmd,
  80. shell=True,
  81. input=data,
  82. capture_output=True,
  83. check=True,
  84. )
  85. return result.stdout
  86. except subprocess.CalledProcessError as e:
  87. if self.required:
  88. raise FilterError(f"Required smudge filter failed: {e}")
  89. # If not required, log warning and return original data on failure
  90. logging.warning(f"Optional smudge filter failed: {e}")
  91. return data
  92. class FilterRegistry:
  93. """Registry for filter drivers."""
  94. def __init__(self, config: Optional["StackedConfig"] = None, repo=None) -> None:
  95. self.config = config
  96. self.repo = repo
  97. self._drivers: dict[str, FilterDriver] = {}
  98. self._factories: dict[str, Callable[[FilterRegistry], FilterDriver]] = {}
  99. # Register built-in filter factories
  100. self.register_factory("lfs", self._create_lfs_filter)
  101. self.register_factory("text", self._create_text_filter)
  102. # Auto-register line ending filter if autocrlf is enabled
  103. self._setup_line_ending_filter()
  104. def register_factory(
  105. self, name: str, factory: Callable[["FilterRegistry"], FilterDriver]
  106. ) -> None:
  107. """Register a filter driver factory."""
  108. self._factories[name] = factory
  109. def register_driver(self, name: str, driver: FilterDriver) -> None:
  110. """Register a filter driver instance."""
  111. self._drivers[name] = driver
  112. def get_driver(self, name: str) -> Optional[FilterDriver]:
  113. """Get a filter driver by name."""
  114. # Check if we already have an instance
  115. if name in self._drivers:
  116. return self._drivers[name]
  117. # Try to create from factory
  118. if name in self._factories:
  119. factory_driver = self._factories[name](self)
  120. self._drivers[name] = factory_driver
  121. return factory_driver
  122. # Try to create from config
  123. if self.config is not None:
  124. config_driver = self._create_from_config(name)
  125. if config_driver is not None:
  126. self._drivers[name] = config_driver
  127. return config_driver
  128. return None
  129. def _create_from_config(self, name: str) -> Optional[FilterDriver]:
  130. """Create a filter driver from config."""
  131. if self.config is None:
  132. return None
  133. clean_cmd: Optional[str] = None
  134. smudge_cmd: Optional[str] = None
  135. # Get clean command
  136. try:
  137. clean_cmd_raw = self.config.get(("filter", name), "clean")
  138. if isinstance(clean_cmd_raw, bytes):
  139. clean_cmd = clean_cmd_raw.decode("utf-8")
  140. else:
  141. clean_cmd = clean_cmd_raw
  142. except KeyError:
  143. pass
  144. # Get smudge command
  145. try:
  146. smudge_cmd_raw = self.config.get(("filter", name), "smudge")
  147. if isinstance(smudge_cmd_raw, bytes):
  148. smudge_cmd = smudge_cmd_raw.decode("utf-8")
  149. else:
  150. smudge_cmd = smudge_cmd_raw
  151. except KeyError:
  152. pass
  153. # Get required flag (defaults to False)
  154. required = self.config.get_boolean(("filter", name), "required", False)
  155. if clean_cmd or smudge_cmd:
  156. return ProcessFilterDriver(clean_cmd, smudge_cmd, required)
  157. return None
  158. def _create_lfs_filter(self, registry: "FilterRegistry") -> FilterDriver:
  159. """Create LFS filter driver."""
  160. from .lfs import LFSFilterDriver, LFSStore
  161. # If we have a repo, use its LFS store
  162. if registry.repo is not None:
  163. lfs_store = LFSStore.from_repo(registry.repo, create=True)
  164. else:
  165. # Fall back to creating a temporary LFS store
  166. import tempfile
  167. lfs_dir = tempfile.mkdtemp(prefix="dulwich-lfs-")
  168. lfs_store = LFSStore.create(lfs_dir)
  169. return LFSFilterDriver(lfs_store)
  170. def _create_text_filter(self, registry: "FilterRegistry") -> FilterDriver:
  171. """Create text filter driver for line ending conversion.
  172. This filter is used when files have the 'text' attribute set explicitly.
  173. It always normalizes line endings on checkin (CRLF -> LF).
  174. """
  175. from .line_ending import (
  176. LineEndingFilter,
  177. convert_crlf_to_lf,
  178. get_smudge_filter,
  179. )
  180. if self.config is None:
  181. # Default text filter: always normalize on checkin
  182. return LineEndingFilter(
  183. clean_conversion=convert_crlf_to_lf,
  184. smudge_conversion=None,
  185. binary_detection=True,
  186. )
  187. # Get core.eol and core.autocrlf settings for smudge behavior
  188. try:
  189. core_eol_raw = self.config.get("core", "eol")
  190. core_eol: str = (
  191. core_eol_raw.decode("ascii")
  192. if isinstance(core_eol_raw, bytes)
  193. else core_eol_raw
  194. )
  195. except KeyError:
  196. core_eol = "native"
  197. # Parse autocrlf as bytes (can be b"true", b"input", or b"false")
  198. try:
  199. autocrlf_raw = self.config.get("core", "autocrlf")
  200. autocrlf: bytes = (
  201. autocrlf_raw.lower()
  202. if isinstance(autocrlf_raw, bytes)
  203. else str(autocrlf_raw).lower().encode("ascii")
  204. )
  205. except KeyError:
  206. autocrlf = b"false"
  207. # For explicit text attribute:
  208. # - Always normalize to LF on checkin (clean)
  209. # - Smudge behavior depends on core.eol and core.autocrlf
  210. smudge_filter = get_smudge_filter(core_eol, autocrlf)
  211. clean_filter = convert_crlf_to_lf
  212. return LineEndingFilter(
  213. clean_conversion=clean_filter,
  214. smudge_conversion=smudge_filter,
  215. binary_detection=True,
  216. )
  217. def _setup_line_ending_filter(self) -> None:
  218. """Automatically register line ending filter if configured."""
  219. if self.config is None:
  220. return
  221. # Parse autocrlf as bytes
  222. try:
  223. autocrlf_raw = self.config.get("core", "autocrlf")
  224. autocrlf: bytes = (
  225. autocrlf_raw.lower()
  226. if isinstance(autocrlf_raw, bytes)
  227. else str(autocrlf_raw).lower().encode("ascii")
  228. )
  229. except KeyError:
  230. return
  231. # If autocrlf is enabled, register the text filter
  232. if autocrlf in (b"true", b"input"):
  233. # Pre-create the text filter so it's available
  234. self.get_driver("text")
  235. def get_filter_for_path(
  236. path: bytes,
  237. gitattributes: "GitAttributes",
  238. filter_registry: FilterRegistry,
  239. ) -> Optional[FilterDriver]:
  240. """Get the appropriate filter driver for a given path.
  241. Args:
  242. path: Path to check
  243. gitattributes: GitAttributes object with parsed patterns
  244. filter_registry: Registry of filter drivers
  245. Returns:
  246. FilterDriver instance or None
  247. """
  248. # Get all attributes for this path
  249. attributes = gitattributes.match_path(path)
  250. # Check if there's a filter attribute
  251. filter_name = attributes.get(b"filter")
  252. if filter_name is not None:
  253. if isinstance(filter_name, bool):
  254. return None
  255. if isinstance(filter_name, bytes):
  256. filter_name_str = filter_name.decode("utf-8")
  257. driver = filter_registry.get_driver(filter_name_str)
  258. # Check if filter is required but missing
  259. if driver is None and filter_registry.config is not None:
  260. required = filter_registry.config.get_boolean(
  261. ("filter", filter_name_str), "required", False
  262. )
  263. if required:
  264. raise FilterError(
  265. f"Required filter '{filter_name_str}' is not available"
  266. )
  267. return driver
  268. return None
  269. # Check for text attribute
  270. text_attr = attributes.get(b"text")
  271. if text_attr is True:
  272. # Use the text filter for line ending conversion
  273. return filter_registry.get_driver("text")
  274. elif text_attr is False:
  275. # -text means binary, no conversion
  276. return None
  277. # If no explicit text attribute, check if autocrlf is enabled
  278. # When autocrlf is true/input, files are treated as text by default
  279. if filter_registry.config is not None:
  280. try:
  281. autocrlf_raw = filter_registry.config.get("core", "autocrlf")
  282. autocrlf: bytes = (
  283. autocrlf_raw.lower()
  284. if isinstance(autocrlf_raw, bytes)
  285. else str(autocrlf_raw).lower().encode("ascii")
  286. )
  287. if autocrlf in (b"true", b"input"):
  288. # Use text filter for files without explicit attributes
  289. return filter_registry.get_driver("text")
  290. except KeyError:
  291. pass
  292. return None
  293. class FilterBlobNormalizer:
  294. """Blob normalizer that applies clean/smudge filters based on gitattributes.
  295. This can be used in addition to or instead of line ending normalization.
  296. """
  297. def __init__(
  298. self,
  299. config_stack: Optional["StackedConfig"],
  300. gitattributes: GitAttributes,
  301. filter_registry: Optional[FilterRegistry] = None,
  302. repo=None,
  303. ) -> None:
  304. self.config_stack = config_stack
  305. self.gitattributes = gitattributes
  306. self.filter_registry = filter_registry or FilterRegistry(config_stack, repo)
  307. def checkin_normalize(self, blob: Blob, path: bytes) -> Blob:
  308. """Apply clean filter during checkin (working tree -> repository)."""
  309. # Get filter for this path
  310. filter_driver = get_filter_for_path(
  311. path, self.gitattributes, self.filter_registry
  312. )
  313. if filter_driver is None:
  314. return blob
  315. # Apply clean filter
  316. filtered_data = filter_driver.clean(blob.data)
  317. if filtered_data == blob.data:
  318. return blob
  319. # Create new blob with filtered data
  320. new_blob = Blob()
  321. new_blob.data = filtered_data
  322. return new_blob
  323. def checkout_normalize(self, blob: Blob, path: bytes) -> Blob:
  324. """Apply smudge filter during checkout (repository -> working tree)."""
  325. # Get filter for this path
  326. filter_driver = get_filter_for_path(
  327. path, self.gitattributes, self.filter_registry
  328. )
  329. if filter_driver is None:
  330. return blob
  331. # Apply smudge filter
  332. filtered_data = filter_driver.smudge(blob.data)
  333. if filtered_data == blob.data:
  334. return blob
  335. # Create new blob with filtered data
  336. new_blob = Blob()
  337. new_blob.data = filtered_data
  338. return new_blob