|
@@ -27,6 +27,7 @@ Todo:
|
|
|
|
|
|
import logging
|
|
|
import os
|
|
|
+import re
|
|
|
import sys
|
|
|
from collections.abc import Iterable, Iterator, KeysView, MutableMapping
|
|
|
from contextlib import suppress
|
|
@@ -47,6 +48,10 @@ logger = logging.getLogger(__name__)
|
|
|
# Type for file opener callback
|
|
|
FileOpener = Callable[[Union[str, os.PathLike]], BinaryIO]
|
|
|
|
|
|
+# Type for includeIf condition matcher
|
|
|
+# Takes the condition value (e.g., "main" for onbranch:main) and returns bool
|
|
|
+ConditionMatcher = Callable[[str], bool]
|
|
|
+
|
|
|
# Security limits for include files
|
|
|
MAX_INCLUDE_FILE_SIZE = 1024 * 1024 # 1MB max for included config files
|
|
|
DEFAULT_MAX_INCLUDE_DEPTH = 10 # Maximum recursion depth for includes
|
|
@@ -108,6 +113,29 @@ def _match_gitdir_pattern(
|
|
|
return path_str == pattern_str
|
|
|
|
|
|
|
|
|
+def match_glob_pattern(value: str, pattern: str) -> bool:
|
|
|
+ """Match a value against a glob pattern.
|
|
|
+
|
|
|
+ Supports simple glob patterns like * and **.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ ValueError: If the pattern is invalid
|
|
|
+ """
|
|
|
+ # Convert glob pattern to regex
|
|
|
+ pattern_escaped = re.escape(pattern)
|
|
|
+ # Replace escaped \*\* with .* (match anything)
|
|
|
+ pattern_escaped = pattern_escaped.replace(r"\*\*", ".*")
|
|
|
+ # Replace escaped \* with [^/]* (match anything except /)
|
|
|
+ pattern_escaped = pattern_escaped.replace(r"\*", "[^/]*")
|
|
|
+ # Anchor the pattern
|
|
|
+ pattern_regex = f"^{pattern_escaped}$"
|
|
|
+
|
|
|
+ try:
|
|
|
+ return bool(re.match(pattern_regex, value))
|
|
|
+ except re.error as e:
|
|
|
+ raise ValueError(f"Invalid glob pattern {pattern!r}: {e}")
|
|
|
+
|
|
|
+
|
|
|
def lower_key(key):
|
|
|
if isinstance(key, (bytes, str)):
|
|
|
return key.lower()
|
|
@@ -672,22 +700,22 @@ class ConfigFile(ConfigDict):
|
|
|
f: BinaryIO,
|
|
|
*,
|
|
|
config_dir: Optional[str] = None,
|
|
|
- repo_dir: Optional[str] = None,
|
|
|
included_paths: Optional[set[str]] = None,
|
|
|
include_depth: int = 0,
|
|
|
max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
|
|
|
file_opener: Optional[FileOpener] = None,
|
|
|
+ condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
|
|
|
) -> "ConfigFile":
|
|
|
"""Read configuration from a file-like object.
|
|
|
|
|
|
Args:
|
|
|
f: File-like object to read from
|
|
|
config_dir: Directory containing the config file (for relative includes)
|
|
|
- repo_dir: Repository directory (for gitdir conditions)
|
|
|
included_paths: Set of already included paths (to prevent cycles)
|
|
|
include_depth: Current include depth (to prevent infinite recursion)
|
|
|
max_include_depth: Maximum allowed include depth
|
|
|
file_opener: Optional callback to open included files
|
|
|
+ condition_matchers: Optional dict of condition matchers for includeIf
|
|
|
"""
|
|
|
if include_depth > max_include_depth:
|
|
|
# Prevent excessive recursion
|
|
@@ -736,10 +764,10 @@ class ConfigFile(ConfigDict):
|
|
|
setting,
|
|
|
value,
|
|
|
config_dir=config_dir,
|
|
|
- repo_dir=repo_dir,
|
|
|
include_depth=include_depth,
|
|
|
max_include_depth=max_include_depth,
|
|
|
file_opener=file_opener,
|
|
|
+ condition_matchers=condition_matchers,
|
|
|
)
|
|
|
|
|
|
setting = None
|
|
@@ -761,10 +789,10 @@ class ConfigFile(ConfigDict):
|
|
|
setting,
|
|
|
value,
|
|
|
config_dir=config_dir,
|
|
|
- repo_dir=repo_dir,
|
|
|
include_depth=include_depth,
|
|
|
max_include_depth=max_include_depth,
|
|
|
file_opener=file_opener,
|
|
|
+ condition_matchers=condition_matchers,
|
|
|
)
|
|
|
|
|
|
continuation = None
|
|
@@ -778,10 +806,10 @@ class ConfigFile(ConfigDict):
|
|
|
value: bytes,
|
|
|
*,
|
|
|
config_dir: Optional[str],
|
|
|
- repo_dir: Optional[str],
|
|
|
include_depth: int,
|
|
|
max_include_depth: int,
|
|
|
file_opener: Optional[FileOpener],
|
|
|
+ condition_matchers: Optional[dict[str, ConditionMatcher]],
|
|
|
) -> None:
|
|
|
"""Handle include/includeIf directives during config parsing."""
|
|
|
if (
|
|
@@ -796,10 +824,10 @@ class ConfigFile(ConfigDict):
|
|
|
section,
|
|
|
value,
|
|
|
config_dir=config_dir,
|
|
|
- repo_dir=repo_dir,
|
|
|
include_depth=include_depth,
|
|
|
max_include_depth=max_include_depth,
|
|
|
file_opener=file_opener,
|
|
|
+ condition_matchers=condition_matchers,
|
|
|
)
|
|
|
|
|
|
def _process_include(
|
|
@@ -808,10 +836,10 @@ class ConfigFile(ConfigDict):
|
|
|
path_value: bytes,
|
|
|
*,
|
|
|
config_dir: Optional[str],
|
|
|
- repo_dir: Optional[str],
|
|
|
include_depth: int,
|
|
|
max_include_depth: int,
|
|
|
file_opener: Optional[FileOpener],
|
|
|
+ condition_matchers: Optional[dict[str, ConditionMatcher]],
|
|
|
) -> None:
|
|
|
"""Process an include or includeIf directive."""
|
|
|
path_str = path_value.decode(self.encoding, errors="replace")
|
|
@@ -819,7 +847,9 @@ class ConfigFile(ConfigDict):
|
|
|
# Handle includeIf conditions
|
|
|
if len(section) > 1 and section[0].lower() == b"includeif":
|
|
|
condition = section[1].decode(self.encoding, errors="replace")
|
|
|
- if not self._evaluate_includeif_condition(condition, repo_dir):
|
|
|
+ if not self._evaluate_includeif_condition(
|
|
|
+ condition, config_dir, condition_matchers
|
|
|
+ ):
|
|
|
return
|
|
|
|
|
|
# Resolve the include path
|
|
@@ -861,11 +891,11 @@ class ConfigFile(ConfigDict):
|
|
|
included_config = ConfigFile.from_file(
|
|
|
included_file,
|
|
|
config_dir=os.path.dirname(include_path),
|
|
|
- repo_dir=repo_dir,
|
|
|
included_paths=self._included_paths,
|
|
|
include_depth=include_depth + 1,
|
|
|
max_include_depth=max_include_depth,
|
|
|
file_opener=file_opener,
|
|
|
+ condition_matchers=condition_matchers,
|
|
|
)
|
|
|
|
|
|
# Merge the included configuration
|
|
@@ -893,89 +923,108 @@ class ConfigFile(ConfigDict):
|
|
|
return path
|
|
|
|
|
|
def _evaluate_includeif_condition(
|
|
|
- self, condition: str, repo_dir: Optional[str]
|
|
|
+ self,
|
|
|
+ condition: str,
|
|
|
+ config_dir: Optional[str] = None,
|
|
|
+ condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
|
|
|
) -> bool:
|
|
|
"""Evaluate an includeIf condition."""
|
|
|
- if condition.startswith("gitdir:"):
|
|
|
- return self._evaluate_gitdir_condition(condition[7:], repo_dir)
|
|
|
- elif condition.startswith("gitdir/i:"):
|
|
|
- return self._evaluate_gitdir_condition(
|
|
|
- condition[9:], repo_dir, case_sensitive=False
|
|
|
- )
|
|
|
- elif condition.startswith("hasconfig:"):
|
|
|
- # hasconfig conditions require special handling and access to the full config
|
|
|
- # For now, we'll skip these as they require more complex implementation
|
|
|
- return False
|
|
|
+ # Try custom matchers first if provided
|
|
|
+ if condition_matchers:
|
|
|
+ for prefix, matcher in condition_matchers.items():
|
|
|
+ if condition.startswith(prefix):
|
|
|
+ return matcher(condition[len(prefix) :])
|
|
|
+
|
|
|
+ # Fall back to built-in matchers
|
|
|
+ if condition.startswith("hasconfig:"):
|
|
|
+ return self._evaluate_hasconfig_condition(condition[10:])
|
|
|
else:
|
|
|
# Unknown condition type - log and ignore (Git behavior)
|
|
|
logger.debug("Unknown includeIf condition: %r", condition)
|
|
|
return False
|
|
|
|
|
|
- def _evaluate_gitdir_condition(
|
|
|
- self, pattern: str, repo_dir: Optional[str], case_sensitive: bool = True
|
|
|
- ) -> bool:
|
|
|
- """Evaluate a gitdir condition using simplified pattern matching."""
|
|
|
- if not repo_dir:
|
|
|
- return False
|
|
|
-
|
|
|
- # Skip relative patterns for now (would need config file path)
|
|
|
- if pattern.startswith("./"):
|
|
|
- return False
|
|
|
+ def _evaluate_hasconfig_condition(self, condition: str) -> bool:
|
|
|
+ """Evaluate a hasconfig condition.
|
|
|
|
|
|
- # Normalize repository path
|
|
|
- try:
|
|
|
- repo_path = str(Path(repo_dir).resolve())
|
|
|
- except (OSError, ValueError) as e:
|
|
|
- logger.debug("Invalid repository path %r: %s", repo_dir, e)
|
|
|
+ Format: hasconfig:config.key:pattern
|
|
|
+ Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
|
|
|
+ """
|
|
|
+ # Split on the first colon to separate config key from pattern
|
|
|
+ parts = condition.split(":", 1)
|
|
|
+ if len(parts) != 2:
|
|
|
+ logger.debug("Invalid hasconfig condition format: %r", condition)
|
|
|
return False
|
|
|
|
|
|
- # Expand ~ in pattern and normalize
|
|
|
- pattern = os.path.expanduser(pattern)
|
|
|
- pattern = self._normalize_gitdir_pattern(pattern)
|
|
|
+ config_key, pattern = parts
|
|
|
|
|
|
- # Use simple pattern matching for gitdir conditions
|
|
|
- pattern_bytes = pattern.encode("utf-8", errors="replace")
|
|
|
- repo_path_bytes = repo_path.encode("utf-8", errors="replace")
|
|
|
+ # Parse the config key to get section and name
|
|
|
+ key_parts = config_key.split(".", 2)
|
|
|
+ if len(key_parts) < 2:
|
|
|
+ logger.debug("Invalid hasconfig config key: %r", config_key)
|
|
|
+ return False
|
|
|
|
|
|
- return _match_gitdir_pattern(
|
|
|
- repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
|
|
|
- )
|
|
|
+ # Handle wildcards in section names (e.g., remote.*)
|
|
|
+ if len(key_parts) == 3 and key_parts[1] == "*":
|
|
|
+ # Match any subsection
|
|
|
+ section_prefix = key_parts[0].encode(self.encoding)
|
|
|
+ name = key_parts[2].encode(self.encoding)
|
|
|
+
|
|
|
+ # Check all sections that match the pattern
|
|
|
+ for section in self.sections():
|
|
|
+ if len(section) == 2 and section[0] == section_prefix:
|
|
|
+ try:
|
|
|
+ values = list(self.get_multivar(section, name))
|
|
|
+ for value in values:
|
|
|
+ if self._match_hasconfig_pattern(value, pattern):
|
|
|
+ return True
|
|
|
+ except KeyError:
|
|
|
+ continue
|
|
|
+ else:
|
|
|
+ # Direct section lookup
|
|
|
+ if len(key_parts) == 2:
|
|
|
+ section = (key_parts[0].encode(self.encoding),)
|
|
|
+ name = key_parts[1].encode(self.encoding)
|
|
|
+ else:
|
|
|
+ section = (
|
|
|
+ key_parts[0].encode(self.encoding),
|
|
|
+ key_parts[1].encode(self.encoding),
|
|
|
+ )
|
|
|
+ name = key_parts[2].encode(self.encoding)
|
|
|
|
|
|
- def _normalize_gitdir_pattern(self, pattern: str) -> str:
|
|
|
- """Normalize a gitdir pattern following Git's rules."""
|
|
|
- # Normalize path separators to forward slashes
|
|
|
- pattern = pattern.replace("\\", "/")
|
|
|
+ try:
|
|
|
+ values = list(self.get_multivar(section, name))
|
|
|
+ for value in values:
|
|
|
+ if self._match_hasconfig_pattern(value, pattern):
|
|
|
+ return True
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
|
|
|
- # If pattern doesn't start with ~/, ./, /, drive letter (Windows), or **, prepend **/
|
|
|
- if not pattern.startswith(("~/", "./", "/", "**")):
|
|
|
- # Check for Windows absolute path (e.g., C:/, D:/)
|
|
|
- if len(pattern) >= 2 and pattern[1] == ":":
|
|
|
- pass # Don't prepend **/ for Windows absolute paths
|
|
|
- else:
|
|
|
- pattern = "**/" + pattern
|
|
|
+ return False
|
|
|
|
|
|
- # If pattern ends with /, append **
|
|
|
- if pattern.endswith("/"):
|
|
|
- pattern = pattern + "**"
|
|
|
+ def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
|
|
|
+ """Match a config value against a hasconfig pattern.
|
|
|
|
|
|
- return pattern
|
|
|
+ Supports simple glob patterns like * and **.
|
|
|
+ """
|
|
|
+ value_str = value.decode(self.encoding, errors="replace")
|
|
|
+ return match_glob_pattern(value_str, pattern)
|
|
|
|
|
|
@classmethod
|
|
|
def from_path(
|
|
|
cls,
|
|
|
path: Union[str, os.PathLike],
|
|
|
*,
|
|
|
- repo_dir: Optional[str] = None,
|
|
|
max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
|
|
|
file_opener: Optional[FileOpener] = None,
|
|
|
+ condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
|
|
|
) -> "ConfigFile":
|
|
|
"""Read configuration from a file on disk.
|
|
|
|
|
|
Args:
|
|
|
path: Path to the configuration file
|
|
|
- repo_dir: Repository directory (for gitdir conditions in includeIf)
|
|
|
max_include_depth: Maximum allowed include depth
|
|
|
file_opener: Optional callback to open included files
|
|
|
+ condition_matchers: Optional dict of condition matchers for includeIf
|
|
|
"""
|
|
|
abs_path = os.fspath(path)
|
|
|
config_dir = os.path.dirname(abs_path)
|
|
@@ -992,9 +1041,9 @@ class ConfigFile(ConfigDict):
|
|
|
ret = cls.from_file(
|
|
|
f,
|
|
|
config_dir=config_dir,
|
|
|
- repo_dir=repo_dir,
|
|
|
max_include_depth=max_include_depth,
|
|
|
file_opener=file_opener,
|
|
|
+ condition_matchers=condition_matchers,
|
|
|
)
|
|
|
ret.path = abs_path
|
|
|
return ret
|