|
@@ -137,15 +137,21 @@ Sources:
|
|
|
- https://adaptivepatchwork.com/2012/03/01/mind-the-end-of-your-line/
|
|
|
"""
|
|
|
|
|
|
+from typing import TYPE_CHECKING, Any, Callable, Optional, Union
|
|
|
+
|
|
|
+if TYPE_CHECKING:
|
|
|
+ from .config import StackedConfig
|
|
|
+ from .object_store import BaseObjectStore
|
|
|
+
|
|
|
from .object_store import iter_tree_contents
|
|
|
-from .objects import Blob
|
|
|
+from .objects import Blob, ObjectID
|
|
|
from .patch import is_binary
|
|
|
|
|
|
CRLF = b"\r\n"
|
|
|
LF = b"\n"
|
|
|
|
|
|
|
|
|
-def convert_crlf_to_lf(text_hunk):
|
|
|
+def convert_crlf_to_lf(text_hunk: bytes) -> bytes:
|
|
|
"""Convert CRLF in text hunk into LF.
|
|
|
|
|
|
Args:
|
|
@@ -155,7 +161,7 @@ def convert_crlf_to_lf(text_hunk):
|
|
|
return text_hunk.replace(CRLF, LF)
|
|
|
|
|
|
|
|
|
-def convert_lf_to_crlf(text_hunk):
|
|
|
+def convert_lf_to_crlf(text_hunk: bytes) -> bytes:
|
|
|
"""Convert LF in text hunk into CRLF.
|
|
|
|
|
|
Args:
|
|
@@ -167,23 +173,45 @@ def convert_lf_to_crlf(text_hunk):
|
|
|
return intermediary.replace(LF, CRLF)
|
|
|
|
|
|
|
|
|
-def get_checkout_filter(core_eol, core_autocrlf, git_attributes):
|
|
|
+def get_checkout_filter(
|
|
|
+ core_eol: str, core_autocrlf: Union[bool, str], git_attributes: dict[str, Any]
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
"""Returns the correct checkout filter based on the passed arguments."""
|
|
|
# TODO this function should process the git_attributes for the path and if
|
|
|
# the text attribute is not defined, fallback on the
|
|
|
# get_checkout_filter_autocrlf function with the autocrlf value
|
|
|
- return get_checkout_filter_autocrlf(core_autocrlf)
|
|
|
+ if isinstance(core_autocrlf, bool):
|
|
|
+ autocrlf_bytes = b"true" if core_autocrlf else b"false"
|
|
|
+ else:
|
|
|
+ autocrlf_bytes = (
|
|
|
+ core_autocrlf.encode("ascii")
|
|
|
+ if isinstance(core_autocrlf, str)
|
|
|
+ else core_autocrlf
|
|
|
+ )
|
|
|
+ return get_checkout_filter_autocrlf(autocrlf_bytes)
|
|
|
|
|
|
|
|
|
-def get_checkin_filter(core_eol, core_autocrlf, git_attributes):
|
|
|
+def get_checkin_filter(
|
|
|
+ core_eol: str, core_autocrlf: Union[bool, str], git_attributes: dict[str, Any]
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
"""Returns the correct checkin filter based on the passed arguments."""
|
|
|
# TODO this function should process the git_attributes for the path and if
|
|
|
# the text attribute is not defined, fallback on the
|
|
|
# get_checkin_filter_autocrlf function with the autocrlf value
|
|
|
- return get_checkin_filter_autocrlf(core_autocrlf)
|
|
|
+ if isinstance(core_autocrlf, bool):
|
|
|
+ autocrlf_bytes = b"true" if core_autocrlf else b"false"
|
|
|
+ else:
|
|
|
+ autocrlf_bytes = (
|
|
|
+ core_autocrlf.encode("ascii")
|
|
|
+ if isinstance(core_autocrlf, str)
|
|
|
+ else core_autocrlf
|
|
|
+ )
|
|
|
+ return get_checkin_filter_autocrlf(autocrlf_bytes)
|
|
|
|
|
|
|
|
|
-def get_checkout_filter_autocrlf(core_autocrlf):
|
|
|
+def get_checkout_filter_autocrlf(
|
|
|
+ core_autocrlf: bytes,
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
"""Returns the correct checkout filter base on autocrlf value.
|
|
|
|
|
|
Args:
|
|
@@ -198,7 +226,9 @@ def get_checkout_filter_autocrlf(core_autocrlf):
|
|
|
return None
|
|
|
|
|
|
|
|
|
-def get_checkin_filter_autocrlf(core_autocrlf):
|
|
|
+def get_checkin_filter_autocrlf(
|
|
|
+ core_autocrlf: bytes,
|
|
|
+) -> Optional[Callable[[bytes], bytes]]:
|
|
|
"""Returns the correct checkin filter base on autocrlf value.
|
|
|
|
|
|
Args:
|
|
@@ -219,18 +249,31 @@ class BlobNormalizer:
|
|
|
on configuration, gitattributes, path and operation (checkin or checkout).
|
|
|
"""
|
|
|
|
|
|
- def __init__(self, config_stack, gitattributes) -> None:
|
|
|
+ def __init__(
|
|
|
+ self, config_stack: "StackedConfig", gitattributes: dict[str, Any]
|
|
|
+ ) -> None:
|
|
|
self.config_stack = config_stack
|
|
|
self.gitattributes = gitattributes
|
|
|
|
|
|
# Compute which filters we needs based on parameters
|
|
|
try:
|
|
|
- core_eol = config_stack.get("core", "eol")
|
|
|
+ core_eol_raw = config_stack.get("core", "eol")
|
|
|
+ core_eol: str = (
|
|
|
+ core_eol_raw.decode("ascii")
|
|
|
+ if isinstance(core_eol_raw, bytes)
|
|
|
+ else core_eol_raw
|
|
|
+ )
|
|
|
except KeyError:
|
|
|
core_eol = "native"
|
|
|
|
|
|
try:
|
|
|
- core_autocrlf = config_stack.get("core", "autocrlf").lower()
|
|
|
+ core_autocrlf_raw = config_stack.get("core", "autocrlf")
|
|
|
+ if isinstance(core_autocrlf_raw, bytes):
|
|
|
+ core_autocrlf: Union[bool, str] = core_autocrlf_raw.decode(
|
|
|
+ "ascii"
|
|
|
+ ).lower()
|
|
|
+ else:
|
|
|
+ core_autocrlf = core_autocrlf_raw.lower()
|
|
|
except KeyError:
|
|
|
core_autocrlf = False
|
|
|
|
|
@@ -241,7 +284,7 @@ class BlobNormalizer:
|
|
|
core_eol, core_autocrlf, self.gitattributes
|
|
|
)
|
|
|
|
|
|
- def checkin_normalize(self, blob, tree_path):
|
|
|
+ def checkin_normalize(self, blob: Blob, tree_path: bytes) -> Blob:
|
|
|
"""Normalize a blob during a checkin operation."""
|
|
|
if self.fallback_write_filter is not None:
|
|
|
return normalize_blob(
|
|
@@ -250,7 +293,7 @@ class BlobNormalizer:
|
|
|
|
|
|
return blob
|
|
|
|
|
|
- def checkout_normalize(self, blob, tree_path):
|
|
|
+ def checkout_normalize(self, blob: Blob, tree_path: bytes) -> Blob:
|
|
|
"""Normalize a blob during a checkout operation."""
|
|
|
if self.fallback_read_filter is not None:
|
|
|
return normalize_blob(
|
|
@@ -260,7 +303,9 @@ class BlobNormalizer:
|
|
|
return blob
|
|
|
|
|
|
|
|
|
-def normalize_blob(blob, conversion, binary_detection):
|
|
|
+def normalize_blob(
|
|
|
+ blob: Blob, conversion: Callable[[bytes], bytes], binary_detection: bool
|
|
|
+) -> Blob:
|
|
|
"""Takes a blob as input returns either the original blob if
|
|
|
binary_detection is True and the blob content looks like binary, else
|
|
|
return a new blob with converted data.
|
|
@@ -285,7 +330,13 @@ def normalize_blob(blob, conversion, binary_detection):
|
|
|
|
|
|
|
|
|
class TreeBlobNormalizer(BlobNormalizer):
|
|
|
- def __init__(self, config_stack, git_attributes, object_store, tree=None) -> None:
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ config_stack: "StackedConfig",
|
|
|
+ git_attributes: dict[str, Any],
|
|
|
+ object_store: "BaseObjectStore",
|
|
|
+ tree: Optional[ObjectID] = None,
|
|
|
+ ) -> None:
|
|
|
super().__init__(config_stack, git_attributes)
|
|
|
if tree:
|
|
|
self.existing_paths = {
|
|
@@ -294,7 +345,7 @@ class TreeBlobNormalizer(BlobNormalizer):
|
|
|
else:
|
|
|
self.existing_paths = set()
|
|
|
|
|
|
- def checkin_normalize(self, blob, tree_path):
|
|
|
+ def checkin_normalize(self, blob: Blob, tree_path: bytes) -> Blob:
|
|
|
# Existing files should only be normalized on checkin if it was
|
|
|
# previously normalized on checkout
|
|
|
if (
|