|
@@ -26,11 +26,11 @@ about how it seems to work.
|
|
|
The normalization is a two-fold process that happens at two moments:
|
|
|
|
|
|
- When reading a file from the index and to the working directory. For example
|
|
|
- when doing a ``git clone`` or ``git checkout`` call. We call this process the
|
|
|
- read filter in this module.
|
|
|
+ when doing a ``git clone`` or ``git checkout`` call. This is called the
|
|
|
+ smudge filter (repository -> working tree).
|
|
|
- When writing a file to the index from the working directory. For example
|
|
|
- when doing a ``git add`` call. We call this process the write filter in this
|
|
|
- module.
|
|
|
+ when doing a ``git add`` call. This is called the clean filter (working tree
|
|
|
+ -> repository).
|
|
|
|
|
|
Note that when checking status (getting unstaged changes), whether or not
|
|
|
normalization is done on write depends on whether or not the file in the
|
|
@@ -108,13 +108,13 @@ attribute defined in ``.gitattributes``; it takes three possible values:
|
|
|
line-endings in the working directory and convert line-endings to LF
|
|
|
when writing to the index. When autocrlf is set to true, eol value is
|
|
|
ignored.
|
|
|
- - ``input``: Quite similar to the ``true`` value but only force the write
|
|
|
+ - ``input``: Quite similar to the ``true`` value but only applies the clean
|
|
|
filter, ie line-ending of new files added to the index will get their
|
|
|
line-endings converted to LF.
|
|
|
- ``false`` (default): No normalization is done.
|
|
|
|
|
|
``core.eol`` is the top-level configuration to define the line-ending to use
|
|
|
-when applying the read_filer. It takes three possible values:
|
|
|
+when applying the smudge filter. It takes three possible values:
|
|
|
|
|
|
- ``lf``: When normalization is done, force line-endings to be ``LF`` in the
|
|
|
working directory.
|
|
@@ -143,6 +143,9 @@ if TYPE_CHECKING:
|
|
|
from .config import StackedConfig
|
|
|
from .object_store import BaseObjectStore
|
|
|
|
|
|
+from . import replace_me
|
|
|
+from .attrs import GitAttributes, Pattern
|
|
|
+from .filters import FilterBlobNormalizer, FilterDriver, FilterRegistry
|
|
|
from .object_store import iter_tree_contents
|
|
|
from .objects import Blob, ObjectID
|
|
|
from .patch import is_binary
|
|
@@ -151,6 +154,42 @@ CRLF = b"\r\n"
|
|
|
LF = b"\n"
|
|
|
|
|
|
|
|
|
+class LineEndingFilter(FilterDriver):
|
|
|
+ """Filter driver for line ending conversion."""
|
|
|
+
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ clean_conversion: Optional[Callable[[bytes], bytes]] = None,
|
|
|
+ smudge_conversion: Optional[Callable[[bytes], bytes]] = None,
|
|
|
+ binary_detection: bool = True,
|
|
|
+ ):
|
|
|
+ self.clean_conversion = clean_conversion
|
|
|
+ self.smudge_conversion = smudge_conversion
|
|
|
+ self.binary_detection = binary_detection
|
|
|
+
|
|
|
+ def clean(self, data: bytes) -> bytes:
|
|
|
+ """Apply line ending conversion for checkin (working tree -> repository)."""
|
|
|
+ if self.clean_conversion is None:
|
|
|
+ return data
|
|
|
+
|
|
|
+ # Skip binary files if detection is enabled
|
|
|
+ if self.binary_detection and is_binary(data):
|
|
|
+ return data
|
|
|
+
|
|
|
+ return self.clean_conversion(data)
|
|
|
+
|
|
|
+ def smudge(self, data: bytes) -> bytes:
|
|
|
+ """Apply line ending conversion for checkout (repository -> working tree)."""
|
|
|
+ if self.smudge_conversion is None:
|
|
|
+ return data
|
|
|
+
|
|
|
+ # Skip binary files if detection is enabled
|
|
|
+ if self.binary_detection and is_binary(data):
|
|
|
+ return data
|
|
|
+
|
|
|
+ return self.smudge_conversion(data)
|
|
|
+
|
|
|
+
|
|
|
def convert_crlf_to_lf(text_hunk: bytes) -> bytes:
|
|
|
"""Convert CRLF in text hunk into LF.
|
|
|
|
|
@@ -181,46 +220,26 @@ def convert_lf_to_crlf(text_hunk: bytes) -> bytes:
|
|
|
return CRLF.join(cleaned_parts)
|
|
|
|
|
|
|
|
|
-def get_checkout_filter(
|
|
|
- core_eol: str, core_autocrlf: Union[bool, str], git_attributes: dict[str, Any]
|
|
|
+def get_smudge_filter(
|
|
|
+ core_eol: str, core_autocrlf: bytes
|
|
|
) -> Optional[Callable[[bytes], bytes]]:
|
|
|
- """Returns the correct checkout filter based on the passed arguments."""
|
|
|
- # TODO this function should process the git_attributes for the path and if
|
|
|
- # the text attribute is not defined, fallback on the
|
|
|
- # get_checkout_filter_autocrlf function with the autocrlf value
|
|
|
- if isinstance(core_autocrlf, bool):
|
|
|
- autocrlf_bytes = b"true" if core_autocrlf else b"false"
|
|
|
- else:
|
|
|
- autocrlf_bytes = (
|
|
|
- core_autocrlf.encode("ascii")
|
|
|
- if isinstance(core_autocrlf, str)
|
|
|
- else core_autocrlf
|
|
|
- )
|
|
|
- return get_checkout_filter_autocrlf(autocrlf_bytes)
|
|
|
+ """Returns the correct smudge filter based on the passed arguments."""
|
|
|
+ # Git attributes handling is done by the filter infrastructure
|
|
|
+ return get_smudge_filter_autocrlf(core_autocrlf)
|
|
|
|
|
|
|
|
|
-def get_checkin_filter(
|
|
|
- core_eol: str, core_autocrlf: Union[bool, str], git_attributes: dict[str, Any]
|
|
|
+def get_clean_filter(
|
|
|
+ core_eol: str, core_autocrlf: bytes
|
|
|
) -> Optional[Callable[[bytes], bytes]]:
|
|
|
- """Returns the correct checkin filter based on the passed arguments."""
|
|
|
- # TODO this function should process the git_attributes for the path and if
|
|
|
- # the text attribute is not defined, fallback on the
|
|
|
- # get_checkin_filter_autocrlf function with the autocrlf value
|
|
|
- if isinstance(core_autocrlf, bool):
|
|
|
- autocrlf_bytes = b"true" if core_autocrlf else b"false"
|
|
|
- else:
|
|
|
- autocrlf_bytes = (
|
|
|
- core_autocrlf.encode("ascii")
|
|
|
- if isinstance(core_autocrlf, str)
|
|
|
- else core_autocrlf
|
|
|
- )
|
|
|
- return get_checkin_filter_autocrlf(autocrlf_bytes)
|
|
|
+ """Returns the correct clean filter based on the passed arguments."""
|
|
|
+ # Git attributes handling is done by the filter infrastructure
|
|
|
+ return get_clean_filter_autocrlf(core_autocrlf)
|
|
|
|
|
|
|
|
|
-def get_checkout_filter_autocrlf(
|
|
|
+def get_smudge_filter_autocrlf(
|
|
|
core_autocrlf: bytes,
|
|
|
) -> Optional[Callable[[bytes], bytes]]:
|
|
|
- """Returns the correct checkout filter base on autocrlf value.
|
|
|
+ """Returns the correct smudge filter based on autocrlf value.
|
|
|
|
|
|
Args:
|
|
|
core_autocrlf: The bytes configuration value of core.autocrlf.
|
|
@@ -234,10 +253,10 @@ def get_checkout_filter_autocrlf(
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def get_checkin_filter_autocrlf(
|
|
|
+def get_clean_filter_autocrlf(
|
|
|
core_autocrlf: bytes,
|
|
|
) -> Optional[Callable[[bytes], bytes]]:
|
|
|
- """Returns the correct checkin filter base on autocrlf value.
|
|
|
+ """Returns the correct clean filter based on autocrlf value.
|
|
|
|
|
|
Args:
|
|
|
core_autocrlf: The bytes configuration value of core.autocrlf.
|
|
@@ -252,63 +271,162 @@ def get_checkin_filter_autocrlf(
|
|
|
return None
|
|
|
|
|
|
|
|
|
-class BlobNormalizer:
|
|
|
+# Backwards compatibility wrappers
|
|
|
+@replace_me(since="0.23.1", remove_in="0.25.0")
|
|
|
+def get_checkout_filter(
|
|
|
+ core_eol: str, core_autocrlf: Union[bool, str], git_attributes: dict[str, Any]
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
+ """Deprecated: Use get_smudge_filter instead."""
|
|
|
+ # Convert core_autocrlf to bytes for compatibility
|
|
|
+ if isinstance(core_autocrlf, bool):
|
|
|
+ autocrlf_bytes = b"true" if core_autocrlf else b"false"
|
|
|
+ else:
|
|
|
+ autocrlf_bytes = (
|
|
|
+ core_autocrlf.encode("utf-8")
|
|
|
+ if isinstance(core_autocrlf, str)
|
|
|
+ else core_autocrlf
|
|
|
+ )
|
|
|
+ return get_smudge_filter(core_eol, autocrlf_bytes)
|
|
|
+
|
|
|
+
|
|
|
+@replace_me(since="0.23.1", remove_in="0.25.0")
|
|
|
+def get_checkin_filter(
|
|
|
+ core_eol: str, core_autocrlf: Union[bool, str], git_attributes: dict[str, Any]
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
+ """Deprecated: Use get_clean_filter instead."""
|
|
|
+ # Convert core_autocrlf to bytes for compatibility
|
|
|
+ if isinstance(core_autocrlf, bool):
|
|
|
+ autocrlf_bytes = b"true" if core_autocrlf else b"false"
|
|
|
+ else:
|
|
|
+ autocrlf_bytes = (
|
|
|
+ core_autocrlf.encode("utf-8")
|
|
|
+ if isinstance(core_autocrlf, str)
|
|
|
+ else core_autocrlf
|
|
|
+ )
|
|
|
+ return get_clean_filter(core_eol, autocrlf_bytes)
|
|
|
+
|
|
|
+
|
|
|
+@replace_me(since="0.23.1", remove_in="0.25.0")
|
|
|
+def get_checkout_filter_autocrlf(
|
|
|
+ core_autocrlf: bytes,
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
+ """Deprecated: Use get_smudge_filter_autocrlf instead."""
|
|
|
+ return get_smudge_filter_autocrlf(core_autocrlf)
|
|
|
+
|
|
|
+
|
|
|
+@replace_me(since="0.23.1", remove_in="0.25.0")
|
|
|
+def get_checkin_filter_autocrlf(
|
|
|
+ core_autocrlf: bytes,
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
+ """Deprecated: Use get_clean_filter_autocrlf instead."""
|
|
|
+ return get_clean_filter_autocrlf(core_autocrlf)
|
|
|
+
|
|
|
+
|
|
|
+class BlobNormalizer(FilterBlobNormalizer):
|
|
|
"""An object to store computation result of which filter to apply based
|
|
|
on configuration, gitattributes, path and operation (checkin or checkout).
|
|
|
+
|
|
|
+ This class maintains backward compatibility while using the filter infrastructure.
|
|
|
"""
|
|
|
|
|
|
def __init__(
|
|
|
- self, config_stack: "StackedConfig", gitattributes: dict[str, Any]
|
|
|
+ self,
|
|
|
+ config_stack: "StackedConfig",
|
|
|
+ gitattributes: dict[str, Any],
|
|
|
+ core_eol: str = "native",
|
|
|
+ autocrlf: bytes = b"false",
|
|
|
) -> None:
|
|
|
- self.config_stack = config_stack
|
|
|
- self.gitattributes = gitattributes
|
|
|
-
|
|
|
- # Compute which filters we needs based on parameters
|
|
|
- try:
|
|
|
- core_eol_raw = config_stack.get("core", "eol")
|
|
|
- core_eol: str = (
|
|
|
- core_eol_raw.decode("ascii")
|
|
|
- if isinstance(core_eol_raw, bytes)
|
|
|
- else core_eol_raw
|
|
|
- )
|
|
|
- except KeyError:
|
|
|
- core_eol = "native"
|
|
|
-
|
|
|
- try:
|
|
|
- core_autocrlf_raw = config_stack.get("core", "autocrlf")
|
|
|
- if isinstance(core_autocrlf_raw, bytes):
|
|
|
- core_autocrlf: Union[bool, str] = core_autocrlf_raw.decode(
|
|
|
- "ascii"
|
|
|
- ).lower()
|
|
|
+ # Set up a filter registry with line ending filters
|
|
|
+ filter_registry = FilterRegistry(config_stack)
|
|
|
+
|
|
|
+ # Create line ending filter if needed
|
|
|
+ smudge_filter = get_smudge_filter(core_eol, autocrlf)
|
|
|
+ clean_filter = get_clean_filter(core_eol, autocrlf)
|
|
|
+
|
|
|
+ # Always register a text filter that can be used by gitattributes
|
|
|
+ # Even if autocrlf is false, gitattributes text=true should work
|
|
|
+ line_ending_filter = LineEndingFilter(
|
|
|
+ clean_conversion=clean_filter or convert_crlf_to_lf,
|
|
|
+ smudge_conversion=smudge_filter or convert_lf_to_crlf,
|
|
|
+ binary_detection=True,
|
|
|
+ )
|
|
|
+ filter_registry.register_driver("text", line_ending_filter)
|
|
|
+
|
|
|
+ # Convert dict gitattributes to GitAttributes object for parent class
|
|
|
+ git_attrs_patterns = []
|
|
|
+ for pattern_str, attrs in gitattributes.items():
|
|
|
+ if isinstance(pattern_str, str):
|
|
|
+ pattern_bytes = pattern_str.encode("utf-8")
|
|
|
else:
|
|
|
- core_autocrlf = core_autocrlf_raw.lower()
|
|
|
- except KeyError:
|
|
|
- core_autocrlf = False
|
|
|
+ pattern_bytes = pattern_str
|
|
|
+ pattern = Pattern(pattern_bytes)
|
|
|
+ git_attrs_patterns.append((pattern, attrs))
|
|
|
|
|
|
- self.fallback_read_filter = get_checkout_filter(
|
|
|
- core_eol, core_autocrlf, self.gitattributes
|
|
|
- )
|
|
|
- self.fallback_write_filter = get_checkin_filter(
|
|
|
- core_eol, core_autocrlf, self.gitattributes
|
|
|
- )
|
|
|
+ git_attributes = GitAttributes(git_attrs_patterns)
|
|
|
+
|
|
|
+ # Initialize parent class with gitattributes
|
|
|
+ # The filter infrastructure will handle gitattributes processing
|
|
|
+ super().__init__(config_stack, git_attributes, filter_registry)
|
|
|
+
|
|
|
+ # Store original filters for backward compatibility
|
|
|
+ self.fallback_read_filter = smudge_filter
|
|
|
+ self.fallback_write_filter = clean_filter
|
|
|
|
|
|
def checkin_normalize(self, blob: Blob, tree_path: bytes) -> Blob:
|
|
|
"""Normalize a blob during a checkin operation."""
|
|
|
- if self.fallback_write_filter is not None:
|
|
|
- return normalize_blob(
|
|
|
- blob, self.fallback_write_filter, binary_detection=True
|
|
|
+ # First try to get filter from gitattributes (handled by parent)
|
|
|
+ result = super().checkin_normalize(blob, tree_path)
|
|
|
+
|
|
|
+ # Check if gitattributes explicitly disabled text conversion
|
|
|
+ attrs = self.gitattributes.match_path(tree_path)
|
|
|
+ if b"text" in attrs and attrs[b"text"] is False:
|
|
|
+ # Explicitly marked as binary, no conversion
|
|
|
+ return blob
|
|
|
+
|
|
|
+ # If no filter was applied via gitattributes and we have a fallback filter
|
|
|
+ # (autocrlf is enabled), apply it to all files
|
|
|
+ if result is blob and self.fallback_write_filter is not None:
|
|
|
+ # Apply the clean filter with binary detection
|
|
|
+ line_ending_filter = LineEndingFilter(
|
|
|
+ clean_conversion=self.fallback_write_filter,
|
|
|
+ smudge_conversion=None,
|
|
|
+ binary_detection=True,
|
|
|
)
|
|
|
+ filtered_data = line_ending_filter.clean(blob.data)
|
|
|
+ if filtered_data != blob.data:
|
|
|
+ new_blob = Blob()
|
|
|
+ new_blob.data = filtered_data
|
|
|
+ return new_blob
|
|
|
|
|
|
- return blob
|
|
|
+ return result
|
|
|
|
|
|
def checkout_normalize(self, blob: Blob, tree_path: bytes) -> Blob:
|
|
|
"""Normalize a blob during a checkout operation."""
|
|
|
- if self.fallback_read_filter is not None:
|
|
|
- return normalize_blob(
|
|
|
- blob, self.fallback_read_filter, binary_detection=True
|
|
|
+ # First try to get filter from gitattributes (handled by parent)
|
|
|
+ result = super().checkout_normalize(blob, tree_path)
|
|
|
+
|
|
|
+ # Check if gitattributes explicitly disabled text conversion
|
|
|
+ attrs = self.gitattributes.match_path(tree_path)
|
|
|
+ if b"text" in attrs and attrs[b"text"] is False:
|
|
|
+ # Explicitly marked as binary, no conversion
|
|
|
+ return blob
|
|
|
+
|
|
|
+ # If no filter was applied via gitattributes and we have a fallback filter
|
|
|
+ # (autocrlf is enabled), apply it to all files
|
|
|
+ if result is blob and self.fallback_read_filter is not None:
|
|
|
+ # Apply the smudge filter with binary detection
|
|
|
+ line_ending_filter = LineEndingFilter(
|
|
|
+ clean_conversion=None,
|
|
|
+ smudge_conversion=self.fallback_read_filter,
|
|
|
+ binary_detection=True,
|
|
|
)
|
|
|
+ filtered_data = line_ending_filter.smudge(blob.data)
|
|
|
+ if filtered_data != blob.data:
|
|
|
+ new_blob = Blob()
|
|
|
+ new_blob.data = filtered_data
|
|
|
+ return new_blob
|
|
|
|
|
|
- return blob
|
|
|
+ return result
|
|
|
|
|
|
|
|
|
def normalize_blob(
|
|
@@ -344,8 +462,10 @@ class TreeBlobNormalizer(BlobNormalizer):
|
|
|
git_attributes: dict[str, Any],
|
|
|
object_store: "BaseObjectStore",
|
|
|
tree: Optional[ObjectID] = None,
|
|
|
+ core_eol: str = "native",
|
|
|
+ autocrlf: bytes = b"false",
|
|
|
) -> None:
|
|
|
- super().__init__(config_stack, git_attributes)
|
|
|
+ super().__init__(config_stack, git_attributes, core_eol, autocrlf)
|
|
|
if tree:
|
|
|
self.existing_paths = {
|
|
|
name for name, _, _ in iter_tree_contents(object_store, tree)
|