|
@@ -27,6 +27,7 @@ Todo:
|
|
|
|
|
|
import logging
|
|
import logging
|
|
import os
|
|
import os
|
|
|
|
+import re
|
|
import sys
|
|
import sys
|
|
from collections.abc import Iterable, Iterator, KeysView, MutableMapping
|
|
from collections.abc import Iterable, Iterator, KeysView, MutableMapping
|
|
from contextlib import suppress
|
|
from contextlib import suppress
|
|
@@ -819,7 +820,7 @@ class ConfigFile(ConfigDict):
|
|
# Handle includeIf conditions
|
|
# Handle includeIf conditions
|
|
if len(section) > 1 and section[0].lower() == b"includeif":
|
|
if len(section) > 1 and section[0].lower() == b"includeif":
|
|
condition = section[1].decode(self.encoding, errors="replace")
|
|
condition = section[1].decode(self.encoding, errors="replace")
|
|
- if not self._evaluate_includeif_condition(condition, repo_dir):
|
|
|
|
|
|
+ if not self._evaluate_includeif_condition(condition, repo_dir, config_dir):
|
|
return
|
|
return
|
|
|
|
|
|
# Resolve the include path
|
|
# Resolve the include path
|
|
@@ -893,34 +894,188 @@ class ConfigFile(ConfigDict):
|
|
return path
|
|
return path
|
|
|
|
|
|
def _evaluate_includeif_condition(
|
|
def _evaluate_includeif_condition(
|
|
- self, condition: str, repo_dir: Optional[str]
|
|
|
|
|
|
+ self, condition: str, repo_dir: Optional[str], config_dir: Optional[str] = None
|
|
) -> bool:
|
|
) -> bool:
|
|
"""Evaluate an includeIf condition."""
|
|
"""Evaluate an includeIf condition."""
|
|
if condition.startswith("gitdir:"):
|
|
if condition.startswith("gitdir:"):
|
|
- return self._evaluate_gitdir_condition(condition[7:], repo_dir)
|
|
|
|
|
|
+ return self._evaluate_gitdir_condition(condition[7:], repo_dir, config_dir)
|
|
elif condition.startswith("gitdir/i:"):
|
|
elif condition.startswith("gitdir/i:"):
|
|
return self._evaluate_gitdir_condition(
|
|
return self._evaluate_gitdir_condition(
|
|
- condition[9:], repo_dir, case_sensitive=False
|
|
|
|
|
|
+ condition[9:], repo_dir, config_dir, case_sensitive=False
|
|
)
|
|
)
|
|
elif condition.startswith("hasconfig:"):
|
|
elif condition.startswith("hasconfig:"):
|
|
- # hasconfig conditions require special handling and access to the full config
|
|
|
|
- # For now, we'll skip these as they require more complex implementation
|
|
|
|
- return False
|
|
|
|
|
|
+ return self._evaluate_hasconfig_condition(condition[10:])
|
|
|
|
+ elif condition.startswith("onbranch:"):
|
|
|
|
+ return self._evaluate_onbranch_condition(condition[9:], repo_dir)
|
|
else:
|
|
else:
|
|
# Unknown condition type - log and ignore (Git behavior)
|
|
# Unknown condition type - log and ignore (Git behavior)
|
|
logger.debug("Unknown includeIf condition: %r", condition)
|
|
logger.debug("Unknown includeIf condition: %r", condition)
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
+ def _evaluate_hasconfig_condition(self, condition: str) -> bool:
|
|
|
|
+ """Evaluate a hasconfig condition.
|
|
|
|
+
|
|
|
|
+ Format: hasconfig:config.key:pattern
|
|
|
|
+ Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
|
|
|
|
+ """
|
|
|
|
+ try:
|
|
|
|
+ # Split on the first colon to separate config key from pattern
|
|
|
|
+ parts = condition.split(":", 1)
|
|
|
|
+ if len(parts) != 2:
|
|
|
|
+ logger.debug("Invalid hasconfig condition format: %r", condition)
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ config_key, pattern = parts
|
|
|
|
+
|
|
|
|
+ # Parse the config key to get section and name
|
|
|
|
+ key_parts = config_key.split(".", 2)
|
|
|
|
+ if len(key_parts) < 2:
|
|
|
|
+ logger.debug("Invalid hasconfig config key: %r", config_key)
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ # Handle wildcards in section names (e.g., remote.*)
|
|
|
|
+ if len(key_parts) == 3 and key_parts[1] == "*":
|
|
|
|
+ # Match any subsection
|
|
|
|
+ section_prefix = key_parts[0].encode(self.encoding)
|
|
|
|
+ name = key_parts[2].encode(self.encoding)
|
|
|
|
+
|
|
|
|
+ # Check all sections that match the pattern
|
|
|
|
+ for section in self.sections():
|
|
|
|
+ if len(section) == 2 and section[0] == section_prefix:
|
|
|
|
+ try:
|
|
|
|
+ values = list(self.get_multivar(section, name))
|
|
|
|
+ for value in values:
|
|
|
|
+ if self._match_hasconfig_pattern(value, pattern):
|
|
|
|
+ return True
|
|
|
|
+ except KeyError:
|
|
|
|
+ continue
|
|
|
|
+ else:
|
|
|
|
+ # Direct section lookup
|
|
|
|
+ if len(key_parts) == 2:
|
|
|
|
+ section = (key_parts[0].encode(self.encoding),)
|
|
|
|
+ name = key_parts[1].encode(self.encoding)
|
|
|
|
+ else:
|
|
|
|
+ section = (
|
|
|
|
+ key_parts[0].encode(self.encoding),
|
|
|
|
+ key_parts[1].encode(self.encoding),
|
|
|
|
+ )
|
|
|
|
+ name = key_parts[2].encode(self.encoding)
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ values = list(self.get_multivar(section, name))
|
|
|
|
+ for value in values:
|
|
|
|
+ if self._match_hasconfig_pattern(value, pattern):
|
|
|
|
+ return True
|
|
|
|
+ except KeyError:
|
|
|
|
+ pass
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.debug("Error evaluating hasconfig condition %r: %s", condition, e)
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
|
|
|
|
+ """Match a config value against a hasconfig pattern.
|
|
|
|
+
|
|
|
|
+ Supports simple glob patterns like * and **.
|
|
|
|
+ """
|
|
|
|
+ value_str = value.decode(self.encoding, errors="replace")
|
|
|
|
+
|
|
|
|
+ # Convert glob pattern to regex
|
|
|
|
+ # Escape special regex chars except * and **
|
|
|
|
+ pattern_escaped = re.escape(pattern)
|
|
|
|
+ # Replace escaped \*\* with .* (match anything)
|
|
|
|
+ pattern_escaped = pattern_escaped.replace(r"\*\*", ".*")
|
|
|
|
+ # Replace escaped \* with [^/]* (match anything except /)
|
|
|
|
+ pattern_escaped = pattern_escaped.replace(r"\*", "[^/]*")
|
|
|
|
+ # Anchor the pattern
|
|
|
|
+ pattern_regex = f"^{pattern_escaped}$"
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ return bool(re.match(pattern_regex, value_str))
|
|
|
|
+ except re.error:
|
|
|
|
+ logger.debug("Invalid hasconfig pattern: %r", pattern)
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def _evaluate_onbranch_condition(
|
|
|
|
+ self, pattern: str, repo_dir: Optional[str]
|
|
|
|
+ ) -> bool:
|
|
|
|
+ """Evaluate an onbranch condition.
|
|
|
|
+
|
|
|
|
+ Format: onbranch:pattern
|
|
|
|
+ Example: onbranch:main, onbranch:feature/*, onbranch:**/hotfix-*
|
|
|
|
+ """
|
|
|
|
+ if not repo_dir:
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ # Try to read HEAD to get current branch
|
|
|
|
+ head_path = os.path.join(repo_dir, ".git", "HEAD")
|
|
|
|
+ if os.path.isfile(head_path):
|
|
|
|
+ with open(head_path, "rb") as f:
|
|
|
|
+ head_content = f.read().strip()
|
|
|
|
+
|
|
|
|
+ # Check if HEAD points to a branch
|
|
|
|
+ if head_content.startswith(b"ref: refs/heads/"):
|
|
|
|
+ branch_name = head_content[16:].decode("utf-8", errors="replace")
|
|
|
|
+ return self._match_branch_pattern(branch_name, pattern)
|
|
|
|
+
|
|
|
|
+ # If repo_dir itself is .git directory
|
|
|
|
+ elif repo_dir.endswith(".git"):
|
|
|
|
+ head_path = os.path.join(repo_dir, "HEAD")
|
|
|
|
+ if os.path.isfile(head_path):
|
|
|
|
+ with open(head_path, "rb") as f:
|
|
|
|
+ head_content = f.read().strip()
|
|
|
|
+
|
|
|
|
+ if head_content.startswith(b"ref: refs/heads/"):
|
|
|
|
+ branch_name = head_content[16:].decode(
|
|
|
|
+ "utf-8", errors="replace"
|
|
|
|
+ )
|
|
|
|
+ return self._match_branch_pattern(branch_name, pattern)
|
|
|
|
+
|
|
|
|
+ return False
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.debug("Error evaluating onbranch condition %r: %s", pattern, e)
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+ def _match_branch_pattern(self, branch_name: str, pattern: str) -> bool:
|
|
|
|
+ """Match a branch name against an onbranch pattern.
|
|
|
|
+
|
|
|
|
+ Supports glob patterns like *, **, and exact matches.
|
|
|
|
+ """
|
|
|
|
+ # Convert glob pattern to regex
|
|
|
|
+ pattern_escaped = re.escape(pattern)
|
|
|
|
+ # Replace escaped \*\* with .* (match anything including /)
|
|
|
|
+ pattern_escaped = pattern_escaped.replace(r"\*\*", ".*")
|
|
|
|
+ # Replace escaped \* with [^/]* (match anything except /)
|
|
|
|
+ pattern_escaped = pattern_escaped.replace(r"\*", "[^/]*")
|
|
|
|
+ # Anchor the pattern
|
|
|
|
+ pattern_regex = f"^{pattern_escaped}$"
|
|
|
|
+
|
|
|
|
+ try:
|
|
|
|
+ return bool(re.match(pattern_regex, branch_name))
|
|
|
|
+ except re.error:
|
|
|
|
+ logger.debug("Invalid onbranch pattern: %r", pattern)
|
|
|
|
+ return False
|
|
|
|
+
|
|
def _evaluate_gitdir_condition(
|
|
def _evaluate_gitdir_condition(
|
|
- self, pattern: str, repo_dir: Optional[str], case_sensitive: bool = True
|
|
|
|
|
|
+ self,
|
|
|
|
+ pattern: str,
|
|
|
|
+ repo_dir: Optional[str],
|
|
|
|
+ config_dir: Optional[str] = None,
|
|
|
|
+ case_sensitive: bool = True,
|
|
) -> bool:
|
|
) -> bool:
|
|
"""Evaluate a gitdir condition using simplified pattern matching."""
|
|
"""Evaluate a gitdir condition using simplified pattern matching."""
|
|
if not repo_dir:
|
|
if not repo_dir:
|
|
return False
|
|
return False
|
|
|
|
|
|
- # Skip relative patterns for now (would need config file path)
|
|
|
|
|
|
+ # Handle relative patterns (starting with ./)
|
|
if pattern.startswith("./"):
|
|
if pattern.startswith("./"):
|
|
- return False
|
|
|
|
|
|
+ if not config_dir:
|
|
|
|
+ # Can't resolve relative pattern without config directory
|
|
|
|
+ return False
|
|
|
|
+ # Make pattern relative to config directory and normalize
|
|
|
|
+ pattern = os.path.normpath(os.path.join(config_dir, pattern[2:]))
|
|
|
|
|
|
# Normalize repository path
|
|
# Normalize repository path
|
|
try:
|
|
try:
|