@@ -137,6 +137,7 @@ Sources:
 - https://adaptivepatchwork.com/2012/03/01/mind-the-end-of-your-line/
 """
 
+import logging
 from typing import TYPE_CHECKING, Any, Callable, Optional, Union
 
 if TYPE_CHECKING:
@@ -153,6 +154,8 @@ from .patch import is_binary
 CRLF = b"\r\n"
 LF = b"\n"
 
+logger = logging.getLogger(__name__)
+
 
 class LineEndingFilter(FilterDriver):
     """Filter driver for line ending conversion."""
@@ -162,13 +165,91 @@ class LineEndingFilter(FilterDriver):
         clean_conversion: Optional[Callable[[bytes], bytes]] = None,
         smudge_conversion: Optional[Callable[[bytes], bytes]] = None,
         binary_detection: bool = True,
+        safecrlf: bytes = b"false",
     ):
         """Initialize LineEndingFilter."""
         self.clean_conversion = clean_conversion
         self.smudge_conversion = smudge_conversion
         self.binary_detection = binary_detection
+        self.safecrlf = safecrlf
+
+    @classmethod
+    def from_config(
+        cls, config: Optional["StackedConfig"], for_text_attr: bool = False
+    ) -> "LineEndingFilter":
+        """Create a LineEndingFilter from git configuration.
+
+        Args:
+          config: Git configuration stack
+          for_text_attr: If True, always normalize on checkin (for text attribute)
+
+        Returns:
+          Configured LineEndingFilter instance
+        """
+        if config is None:
+            # Default filter
+            if for_text_attr:
+                # For text attribute: always normalize on checkin
+                return cls(
+                    clean_conversion=convert_crlf_to_lf,
+                    smudge_conversion=None,
+                    binary_detection=True,
+                )
+            else:
+                # No config: no conversion
+                return cls()
+
+        # Get core.eol setting
+        try:
+            core_eol_raw = config.get("core", "eol")
+            core_eol: str = (
+                core_eol_raw.decode("ascii")
+                if isinstance(core_eol_raw, bytes)
+                else str(core_eol_raw)
+            )
+        except KeyError:
+            core_eol = "native"
+
+        # Get core.autocrlf setting
+        try:
+            autocrlf_raw = config.get("core", "autocrlf")
+            autocrlf: bytes = (
+                autocrlf_raw.lower()
+                if isinstance(autocrlf_raw, bytes)
+                else str(autocrlf_raw).lower().encode("ascii")
+            )
+        except KeyError:
+            autocrlf = b"false"
+
+        # Get core.safecrlf setting
+        try:
+            safecrlf_raw = config.get("core", "safecrlf")
+            safecrlf = (
+                safecrlf_raw
+                if isinstance(safecrlf_raw, bytes)
+                else safecrlf_raw.encode("utf-8")
+            )
+        except KeyError:
+            safecrlf = b"false"
+
+        if for_text_attr:
+            # For text attribute: always normalize to LF on checkin
+            # Smudge behavior depends on core.eol and core.autocrlf
+            smudge_filter = get_smudge_filter(core_eol, autocrlf)
+            clean_filter: Optional[Callable[[bytes], bytes]] = convert_crlf_to_lf
+        else:
+            # Normal autocrlf behavior
+            smudge_filter = get_smudge_filter(core_eol, autocrlf)
+            clean_filter = get_clean_filter(core_eol, autocrlf)
 
-    def clean(self, data: bytes) -> bytes:
+        return cls(
+            clean_conversion=clean_filter,
+            smudge_conversion=smudge_filter,
+            binary_detection=True,
+            safecrlf=safecrlf,
+        )
+
+    def clean(self, data: bytes, path: bytes = b"") -> bytes:
         """Apply line ending conversion for checkin (working tree -> repository)."""
         if self.clean_conversion is None:
             return data
@@ -177,7 +258,13 @@ class LineEndingFilter(FilterDriver):
         if self.binary_detection and is_binary(data):
             return data
 
-        return self.clean_conversion(data)
+        converted = self.clean_conversion(data)
+
+        # Check if conversion is safe
+        if self.safecrlf != b"false":
+            check_safecrlf(data, converted, self.safecrlf, path)
+
+        return converted
 
     def smudge(self, data: bytes, path: bytes = b"") -> bytes:
         """Apply line ending conversion for checkout (repository -> working tree)."""
@@ -188,7 +275,13 @@ class LineEndingFilter(FilterDriver):
         if self.binary_detection and is_binary(data):
             return data
 
-        return self.smudge_conversion(data)
+        converted = self.smudge_conversion(data)
+
+        # Check if conversion is safe
+        if self.safecrlf != b"false":
+            check_safecrlf(data, converted, self.safecrlf, path)
+
+        return converted
 
     def cleanup(self) -> None:
         """Clean up any resources held by this filter driver."""
@@ -231,6 +324,52 @@ def convert_lf_to_crlf(text_hunk: bytes) -> bytes:
     return CRLF.join(cleaned_parts)
 
 
+def check_safecrlf(
+    original: bytes, converted: bytes, safecrlf: bytes, path: bytes = b""
+) -> None:
+    """Check if CRLF conversion is safe according to core.safecrlf setting.
+
+    Args:
+      original: Original content before conversion
+      converted: Content after conversion
+      safecrlf: Value of core.safecrlf config (b"true", b"warn", or b"false")
+      path: Path to the file being checked (for error messages)
+
+    Raises:
+      ValueError: If safecrlf is "true" and conversion would lose data
+    """
+    if safecrlf == b"false":
+        return
+
+    # Check if conversion is reversible
+    if safecrlf in (b"true", b"warn"):
+        # For CRLF->LF conversion, check if converting back would recover original
+        if CRLF in original and CRLF not in converted:
+            # This was a CRLF->LF conversion
+            recovered = convert_lf_to_crlf(converted)
+            if recovered != original:
+                msg = (
+                    f"CRLF would be replaced by LF in {path.decode('utf-8', 'replace')}"
+                )
+                if safecrlf == b"true":
+                    raise ValueError(msg)
+                else:  # warn
+                    logger.warning(msg)
+
+        # For LF->CRLF conversion, check if converting back would recover original
+        elif LF in original and CRLF in converted and CRLF not in original:
+            # This was a LF->CRLF conversion
+            recovered = convert_crlf_to_lf(converted)
+            if recovered != original:
+                msg = (
+                    f"LF would be replaced by CRLF in {path.decode('utf-8', 'replace')}"
+                )
+                if safecrlf == b"true":
+                    raise ValueError(msg)
+                else:  # warn
+                    logger.warning(msg)
+
+
 def get_smudge_filter(
     core_eol: str, core_autocrlf: bytes
 ) -> Optional[Callable[[bytes], bytes]]:
@@ -345,6 +484,7 @@ class BlobNormalizer(FilterBlobNormalizer):
         gitattributes: dict[str, Any],
         core_eol: str = "native",
         autocrlf: bytes = b"false",
+        safecrlf: bytes = b"false",
     ) -> None:
         """Initialize FilteringBlobNormalizer."""
         # Set up a filter registry with line ending filters
@@ -360,6 +500,7 @@ class BlobNormalizer(FilterBlobNormalizer):
             clean_conversion=clean_filter or convert_crlf_to_lf,
             smudge_conversion=smudge_filter or convert_lf_to_crlf,
             binary_detection=True,
+            safecrlf=safecrlf,
         )
         filter_registry.register_driver("text", line_ending_filter)
 
@@ -398,12 +539,24 @@ class BlobNormalizer(FilterBlobNormalizer):
         # (autocrlf is enabled), apply it to all files
         if result is blob and self.fallback_write_filter is not None:
             # Apply the clean filter with binary detection
+            # Get safecrlf from config
+            safecrlf = b"false"
+            if hasattr(self, "filter_registry") and hasattr(
+                self.filter_registry, "config_stack"
+            ):
+                safecrlf = self.filter_registry.config_stack.get(
+                    b"core", b"safecrlf", b"false"
+                )
+                if hasattr(safecrlf, "encode"):
+                    safecrlf = safecrlf.encode("utf-8")
+
             line_ending_filter = LineEndingFilter(
                 clean_conversion=self.fallback_write_filter,
                 smudge_conversion=None,
                 binary_detection=True,
+                safecrlf=safecrlf,
             )
-            filtered_data = line_ending_filter.clean(blob.data)
+            filtered_data = line_ending_filter.clean(blob.data, tree_path)
             if filtered_data != blob.data:
                 new_blob = Blob()
                 new_blob.data = filtered_data
@@ -426,12 +579,24 @@ class BlobNormalizer(FilterBlobNormalizer):
         # (autocrlf is enabled), apply it to all files
         if result is blob and self.fallback_read_filter is not None:
             # Apply the smudge filter with binary detection
+            # Get safecrlf from config
+            safecrlf = b"false"
+            if hasattr(self, "filter_registry") and hasattr(
+                self.filter_registry, "config_stack"
+            ):
+                safecrlf = self.filter_registry.config_stack.get(
+                    b"core", b"safecrlf", b"false"
+                )
+                if hasattr(safecrlf, "encode"):
+                    safecrlf = safecrlf.encode("utf-8")
+
             line_ending_filter = LineEndingFilter(
                 clean_conversion=None,
                 smudge_conversion=self.fallback_read_filter,
                 binary_detection=True,
+                safecrlf=safecrlf,
             )
-            filtered_data = line_ending_filter.smudge(blob.data)
+            filtered_data = line_ending_filter.smudge(blob.data, tree_path)
             if filtered_data != blob.data:
                 new_blob = Blob()
                 new_blob.data = filtered_data
@@ -474,9 +639,10 @@ class TreeBlobNormalizer(BlobNormalizer):
         tree: Optional[ObjectID] = None,
         core_eol: str = "native",
         autocrlf: bytes = b"false",
+        safecrlf: bytes = b"false",
     ) -> None:
         """Initialize TreeBlobNormalizer."""
-        super().__init__(config_stack, git_attributes, core_eol, autocrlf)
+        super().__init__(config_stack, git_attributes, core_eol, autocrlf, safecrlf)
         if tree:
             self.existing_paths = {
                 name for name, _, _ in iter_tree_contents(object_store, tree)