config.py 45 KB


  1. # config.py - Reading and writing Git config files
  2. # Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Reading and writing Git configuration files.
  22. Todo:
  23. * preserve formatting when updating configuration files
  24. """
  25. import logging
  26. import os
  27. import re
  28. import sys
  29. from collections.abc import (
  30. ItemsView,
  31. Iterable,
  32. Iterator,
  33. KeysView,
  34. MutableMapping,
  35. ValuesView,
  36. )
  37. from contextlib import suppress
  38. from pathlib import Path
  39. from typing import (
  40. IO,
  41. Any,
  42. BinaryIO,
  43. Callable,
  44. Generic,
  45. Optional,
  46. TypeVar,
  47. Union,
  48. cast,
  49. overload,
  50. )
  51. from .file import GitFile, _GitFile
# Key accepted by the case-insensitive mapping defined below: either a single
# str/bytes, or a tuple of str/bytes (section name first, then any subsection
# parts).
ConfigKey = Union[str, bytes, tuple[Union[str, bytes], ...]]
# Python types in which a configuration value may be expressed.
ConfigValue = Union[str, bytes, bool, int]
# Module-level logger; used for debug output while resolving includes.
logger = logging.getLogger(__name__)
# Type for file opener callback
FileOpener = Callable[[Union[str, os.PathLike]], BinaryIO]
# Type for includeIf condition matcher
# Takes the condition value (e.g., "main" for onbranch:main) and returns bool
ConditionMatcher = Callable[[str], bool]
# Security limits for include files
# NOTE(review): MAX_INCLUDE_FILE_SIZE is not referenced in the part of the file
# reviewed here -- confirm it is enforced where included files are read.
MAX_INCLUDE_FILE_SIZE = 1024 * 1024  # 1MB max for included config files
DEFAULT_MAX_INCLUDE_DEPTH = 10  # Maximum recursion depth for includes
  63. def _match_gitdir_pattern(
  64. path: bytes, pattern: bytes, ignorecase: bool = False
  65. ) -> bool:
  66. """Simple gitdir pattern matching for includeIf conditions.
  67. This handles the basic gitdir patterns used in includeIf directives.
  68. """
  69. # Convert to strings for easier manipulation
  70. path_str = path.decode("utf-8", errors="replace")
  71. pattern_str = pattern.decode("utf-8", errors="replace")
  72. # Normalize paths to use forward slashes for consistent matching
  73. path_str = path_str.replace("\\", "/")
  74. pattern_str = pattern_str.replace("\\", "/")
  75. if ignorecase:
  76. path_str = path_str.lower()
  77. pattern_str = pattern_str.lower()
  78. # Handle the common cases for gitdir patterns
  79. if pattern_str.startswith("**/") and pattern_str.endswith("/**"):
  80. # Pattern like **/dirname/** should match any path containing dirname
  81. dirname = pattern_str[3:-3] # Remove **/ and /**
  82. # Check if path contains the directory name as a path component
  83. return ("/" + dirname + "/") in path_str or path_str.endswith("/" + dirname)
  84. elif pattern_str.startswith("**/"):
  85. # Pattern like **/filename
  86. suffix = pattern_str[3:] # Remove **/
  87. return suffix in path_str or path_str.endswith("/" + suffix)
  88. elif pattern_str.endswith("/**"):
  89. # Pattern like /path/to/dir/** should match /path/to/dir and any subdirectory
  90. base_pattern = pattern_str[:-3] # Remove /**
  91. return path_str == base_pattern or path_str.startswith(base_pattern + "/")
  92. elif "**" in pattern_str:
  93. # Handle patterns with ** in the middle
  94. parts = pattern_str.split("**")
  95. if len(parts) == 2:
  96. prefix, suffix = parts
  97. # Path must start with prefix and end with suffix (if any)
  98. if prefix and not path_str.startswith(prefix):
  99. return False
  100. if suffix and not path_str.endswith(suffix):
  101. return False
  102. return True
  103. # Direct match or simple glob pattern
  104. if "*" in pattern_str or "?" in pattern_str or "[" in pattern_str:
  105. import fnmatch
  106. return fnmatch.fnmatch(path_str, pattern_str)
  107. else:
  108. return path_str == pattern_str
  109. def match_glob_pattern(value: str, pattern: str) -> bool:
  110. r"""Match a value against a glob pattern.
  111. Supports simple glob patterns like ``*`` and ``**``.
  112. Raises:
  113. ValueError: If the pattern is invalid
  114. """
  115. # Convert glob pattern to regex
  116. pattern_escaped = re.escape(pattern)
  117. # Replace escaped \*\* with .* (match anything)
  118. pattern_escaped = pattern_escaped.replace(r"\*\*", ".*")
  119. # Replace escaped \* with [^/]* (match anything except /)
  120. pattern_escaped = pattern_escaped.replace(r"\*", "[^/]*")
  121. # Anchor the pattern
  122. pattern_regex = f"^{pattern_escaped}$"
  123. try:
  124. return bool(re.match(pattern_regex, value))
  125. except re.error as e:
  126. raise ValueError(f"Invalid glob pattern {pattern!r}: {e}")
  127. def lower_key(key: ConfigKey) -> ConfigKey:
  128. if isinstance(key, (bytes, str)):
  129. return key.lower()
  130. if isinstance(key, tuple):
  131. # For config sections, only lowercase the section name (first element)
  132. # but preserve the case of subsection names (remaining elements)
  133. if len(key) > 0:
  134. first = key[0]
  135. assert isinstance(first, (bytes, str))
  136. return (first.lower(), *key[1:])
  137. return key
  138. raise TypeError(key)
# Type variables used to parameterise CaseInsensitiveOrderedMultiDict
K = TypeVar("K", bound=ConfigKey)  # Key type must be ConfigKey
V = TypeVar("V")  # Value type
_T = TypeVar("_T")  # For get() default parameter
  142. class CaseInsensitiveOrderedMultiDict(MutableMapping[K, V], Generic[K, V]):
  143. def __init__(self, default_factory: Optional[Callable[[], V]] = None) -> None:
  144. self._real: list[tuple[K, V]] = []
  145. self._keyed: dict[Any, V] = {}
  146. self._default_factory = default_factory
  147. @classmethod
  148. def make(
  149. cls, dict_in: Optional[Union[MutableMapping[K, V], "CaseInsensitiveOrderedMultiDict[K, V]"]] = None,
  150. default_factory: Optional[Callable[[], V]] = None
  151. ) -> "CaseInsensitiveOrderedMultiDict[K, V]":
  152. if isinstance(dict_in, cls):
  153. return dict_in
  154. out = cls(default_factory=default_factory)
  155. if dict_in is None:
  156. return out
  157. if not isinstance(dict_in, MutableMapping):
  158. raise TypeError
  159. for key, value in dict_in.items():
  160. out[key] = value
  161. return out
  162. def __len__(self) -> int:
  163. return len(self._keyed)
  164. def keys(self) -> KeysView[K]:
  165. return self._keyed.keys() # type: ignore[return-value]
  166. def items(self) -> ItemsView[K, V]:
  167. # Return a view that iterates over the real list to preserve order
  168. class OrderedItemsView(ItemsView[K, V]):
  169. def __init__(self, mapping: CaseInsensitiveOrderedMultiDict[K, V]):
  170. self._mapping = mapping
  171. def __iter__(self) -> Iterator[tuple[K, V]]:
  172. return iter(self._mapping._real)
  173. def __len__(self) -> int:
  174. return len(self._mapping._real)
  175. def __contains__(self, item: object) -> bool:
  176. if not isinstance(item, tuple) or len(item) != 2:
  177. return False
  178. key, value = item
  179. return any(k == key and v == value for k, v in self._mapping._real)
  180. return OrderedItemsView(self)
  181. def __iter__(self) -> Iterator[K]:
  182. return iter(self._keyed)
  183. def values(self) -> ValuesView[V]:
  184. return self._keyed.values()
  185. def __setitem__(self, key: K, value: V) -> None:
  186. self._real.append((key, value))
  187. self._keyed[lower_key(key)] = value
  188. def set(self, key: K, value: V) -> None:
  189. # This method replaces all existing values for the key
  190. lower = lower_key(key)
  191. self._real = [(k, v) for k, v in self._real if lower_key(k) != lower]
  192. self._real.append((key, value))
  193. self._keyed[lower] = value
  194. def __delitem__(self, key: K) -> None:
  195. lower_k = lower_key(key)
  196. del self._keyed[lower_k]
  197. for i, (actual, unused_value) in reversed(list(enumerate(self._real))):
  198. if lower_key(actual) == lower_k:
  199. del self._real[i]
  200. def __getitem__(self, item: K) -> V:
  201. return self._keyed[lower_key(item)]
  202. def get(self, key: K, /, default: Union[V, _T, None] = None) -> Union[V, _T, None]: # type: ignore[override]
  203. try:
  204. return self[key]
  205. except KeyError:
  206. if default is not None:
  207. return default
  208. elif self._default_factory is not None:
  209. return self._default_factory()
  210. else:
  211. return None
  212. def get_all(self, key: K) -> Iterator[V]:
  213. lowered_key = lower_key(key)
  214. for actual, value in self._real:
  215. if lower_key(actual) == lowered_key:
  216. yield value
  217. def setdefault(self, key: K, default: Optional[V] = None) -> V:
  218. try:
  219. return self[key]
  220. except KeyError:
  221. if default is not None:
  222. self[key] = default
  223. return default
  224. elif self._default_factory is not None:
  225. value = self._default_factory()
  226. self[key] = value
  227. return value
  228. else:
  229. raise
# Aliases for the Config API. The plain names are the bytes forms that
# ConfigDict stores; the "...Like" names are what callers may pass in and are
# encoded to bytes by ConfigDict before use.
Name = bytes
NameLike = Union[bytes, str]
Section = tuple[bytes, ...]
SectionLike = Union[bytes, str, tuple[Union[bytes, str], ...]]
Value = bytes
ValueLike = Union[bytes, str]
  236. class Config:
  237. """A Git configuration."""
  238. def get(self, section: SectionLike, name: NameLike) -> Value:
  239. """Retrieve the contents of a configuration setting.
  240. Args:
  241. section: Tuple with section name and optional subsection name
  242. name: Variable name
  243. Returns:
  244. Contents of the setting
  245. Raises:
  246. KeyError: if the value is not set
  247. """
  248. raise NotImplementedError(self.get)
  249. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  250. """Retrieve the contents of a multivar configuration setting.
  251. Args:
  252. section: Tuple with section name and optional subsection namee
  253. name: Variable name
  254. Returns:
  255. Contents of the setting as iterable
  256. Raises:
  257. KeyError: if the value is not set
  258. """
  259. raise NotImplementedError(self.get_multivar)
  260. @overload
  261. def get_boolean(
  262. self, section: SectionLike, name: NameLike, default: bool
  263. ) -> bool: ...
  264. @overload
  265. def get_boolean(self, section: SectionLike, name: NameLike) -> Optional[bool]: ...
  266. def get_boolean(
  267. self, section: SectionLike, name: NameLike, default: Optional[bool] = None
  268. ) -> Optional[bool]:
  269. """Retrieve a configuration setting as boolean.
  270. Args:
  271. section: Tuple with section name and optional subsection name
  272. name: Name of the setting, including section and possible
  273. subsection.
  274. Returns:
  275. Contents of the setting
  276. """
  277. try:
  278. value = self.get(section, name)
  279. except KeyError:
  280. return default
  281. if value.lower() == b"true":
  282. return True
  283. elif value.lower() == b"false":
  284. return False
  285. raise ValueError(f"not a valid boolean string: {value!r}")
  286. def set(
  287. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  288. ) -> None:
  289. """Set a configuration value.
  290. Args:
  291. section: Tuple with section name and optional subsection namee
  292. name: Name of the configuration value, including section
  293. and optional subsection
  294. value: value of the setting
  295. """
  296. raise NotImplementedError(self.set)
  297. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  298. """Iterate over the configuration pairs for a specific section.
  299. Args:
  300. section: Tuple with section name and optional subsection namee
  301. Returns:
  302. Iterator over (name, value) pairs
  303. """
  304. raise NotImplementedError(self.items)
  305. def sections(self) -> Iterator[Section]:
  306. """Iterate over the sections.
  307. Returns: Iterator over section tuples
  308. """
  309. raise NotImplementedError(self.sections)
  310. def has_section(self, name: Section) -> bool:
  311. """Check if a specified section exists.
  312. Args:
  313. name: Name of section to check for
  314. Returns:
  315. boolean indicating whether the section exists
  316. """
  317. return name in self.sections()
  318. class ConfigDict(Config):
  319. """Git configuration stored in a dictionary."""
  320. def __init__(
  321. self,
  322. values: Union[
  323. MutableMapping[Section, MutableMapping[Name, Value]], None
  324. ] = None,
  325. encoding: Union[str, None] = None,
  326. ) -> None:
  327. """Create a new ConfigDict."""
  328. if encoding is None:
  329. encoding = sys.getdefaultencoding()
  330. self.encoding = encoding
  331. self._values: CaseInsensitiveOrderedMultiDict[
  332. Section, CaseInsensitiveOrderedMultiDict[Name, Value]
  333. ] = CaseInsensitiveOrderedMultiDict.make(
  334. values, default_factory=CaseInsensitiveOrderedMultiDict
  335. )
  336. def __repr__(self) -> str:
  337. return f"{self.__class__.__name__}({self._values!r})"
  338. def __eq__(self, other: object) -> bool:
  339. return isinstance(other, self.__class__) and other._values == self._values
  340. def __getitem__(self, key: Section) -> CaseInsensitiveOrderedMultiDict[Name, Value]:
  341. return self._values.__getitem__(key)
  342. def __setitem__(self, key: Section, value: CaseInsensitiveOrderedMultiDict[Name, Value]) -> None:
  343. return self._values.__setitem__(key, value)
  344. def __delitem__(self, key: Section) -> None:
  345. return self._values.__delitem__(key)
  346. def __iter__(self) -> Iterator[Section]:
  347. return self._values.__iter__()
  348. def __len__(self) -> int:
  349. return self._values.__len__()
  350. def keys(self) -> KeysView[Section]:
  351. return self._values.keys()
  352. @classmethod
  353. def _parse_setting(cls, name: str) -> tuple[str, Optional[str], str]:
  354. parts = name.split(".")
  355. if len(parts) == 3:
  356. return (parts[0], parts[1], parts[2])
  357. else:
  358. return (parts[0], None, parts[1])
  359. def _check_section_and_name(
  360. self, section: SectionLike, name: NameLike
  361. ) -> tuple[Section, Name]:
  362. if not isinstance(section, tuple):
  363. section = (section,)
  364. checked_section = tuple(
  365. [
  366. subsection.encode(self.encoding)
  367. if not isinstance(subsection, bytes)
  368. else subsection
  369. for subsection in section
  370. ]
  371. )
  372. if not isinstance(name, bytes):
  373. name = name.encode(self.encoding)
  374. return checked_section, name
  375. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  376. section, name = self._check_section_and_name(section, name)
  377. if len(section) > 1:
  378. try:
  379. return self._values[section].get_all(name)
  380. except KeyError:
  381. pass
  382. return self._values[(section[0],)].get_all(name)
  383. def get(
  384. self,
  385. section: SectionLike,
  386. name: NameLike,
  387. ) -> Value:
  388. section, name = self._check_section_and_name(section, name)
  389. if len(section) > 1:
  390. try:
  391. return self._values[section][name]
  392. except KeyError:
  393. pass
  394. return self._values[(section[0],)][name]
  395. def set(
  396. self,
  397. section: SectionLike,
  398. name: NameLike,
  399. value: Union[ValueLike, bool],
  400. ) -> None:
  401. section, name = self._check_section_and_name(section, name)
  402. if isinstance(value, bool):
  403. value = b"true" if value else b"false"
  404. if not isinstance(value, bytes):
  405. value = value.encode(self.encoding)
  406. section_dict = self._values.setdefault(section)
  407. if hasattr(section_dict, "set"):
  408. section_dict.set(name, value)
  409. else:
  410. section_dict[name] = value
  411. def add(
  412. self,
  413. section: SectionLike,
  414. name: NameLike,
  415. value: Union[ValueLike, bool],
  416. ) -> None:
  417. """Add a value to a configuration setting, creating a multivar if needed."""
  418. section, name = self._check_section_and_name(section, name)
  419. if isinstance(value, bool):
  420. value = b"true" if value else b"false"
  421. if not isinstance(value, bytes):
  422. value = value.encode(self.encoding)
  423. self._values.setdefault(section)[name] = value
  424. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  425. section_bytes, _ = self._check_section_and_name(section, b"")
  426. section_dict = self._values.get(section_bytes)
  427. if section_dict is not None:
  428. return iter(section_dict.items())
  429. return iter([])
  430. def sections(self) -> Iterator[Section]:
  431. return iter(self._values.keys())
  432. def _format_string(value: bytes) -> bytes:
  433. if (
  434. value.startswith((b" ", b"\t"))
  435. or value.endswith((b" ", b"\t"))
  436. or b"#" in value
  437. ):
  438. return b'"' + _escape_value(value) + b'"'
  439. else:
  440. return _escape_value(value)
# Maps the byte following a backslash in a config value to the byte it stands
# for (all as integer byte values); used by _parse_string.
_ESCAPE_TABLE = {
    ord(b"\\"): ord(b"\\"),
    ord(b'"'): ord(b'"'),
    ord(b"n"): ord(b"\n"),
    ord(b"t"): ord(b"\t"),
    ord(b"b"): ord(b"\b"),
}
# Byte values that start a comment when they appear outside double quotes
_COMMENT_CHARS = [ord(b"#"), ord(b";")]
# Byte values treated as whitespace inside a value
_WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
  450. def _parse_string(value: bytes) -> bytes:
  451. value = bytearray(value.strip())
  452. ret = bytearray()
  453. whitespace = bytearray()
  454. in_quotes = False
  455. i = 0
  456. while i < len(value):
  457. c = value[i]
  458. if c == ord(b"\\"):
  459. i += 1
  460. if i >= len(value):
  461. # Backslash at end of string - treat as literal backslash
  462. if whitespace:
  463. ret.extend(whitespace)
  464. whitespace = bytearray()
  465. ret.append(ord(b"\\"))
  466. else:
  467. try:
  468. v = _ESCAPE_TABLE[value[i]]
  469. if whitespace:
  470. ret.extend(whitespace)
  471. whitespace = bytearray()
  472. ret.append(v)
  473. except KeyError:
  474. # Unknown escape sequence - treat backslash as literal and process next char normally
  475. if whitespace:
  476. ret.extend(whitespace)
  477. whitespace = bytearray()
  478. ret.append(ord(b"\\"))
  479. i -= 1 # Reprocess the character after the backslash
  480. elif c == ord(b'"'):
  481. in_quotes = not in_quotes
  482. elif c in _COMMENT_CHARS and not in_quotes:
  483. # the rest of the line is a comment
  484. break
  485. elif c in _WHITESPACE_CHARS:
  486. whitespace.append(c)
  487. else:
  488. if whitespace:
  489. ret.extend(whitespace)
  490. whitespace = bytearray()
  491. ret.append(c)
  492. i += 1
  493. if in_quotes:
  494. raise ValueError("missing end quote")
  495. return bytes(ret)
  496. def _escape_value(value: bytes) -> bytes:
  497. """Escape a value."""
  498. value = value.replace(b"\\", b"\\\\")
  499. value = value.replace(b"\r", b"\\r")
  500. value = value.replace(b"\n", b"\\n")
  501. value = value.replace(b"\t", b"\\t")
  502. value = value.replace(b'"', b'\\"')
  503. return value
  504. def _check_variable_name(name: bytes) -> bool:
  505. for i in range(len(name)):
  506. c = name[i : i + 1]
  507. if not c.isalnum() and c != b"-":
  508. return False
  509. return True
  510. def _check_section_name(name: bytes) -> bool:
  511. for i in range(len(name)):
  512. c = name[i : i + 1]
  513. if not c.isalnum() and c not in (b"-", b"."):
  514. return False
  515. return True
  516. def _strip_comments(line: bytes) -> bytes:
  517. comment_bytes = {ord(b"#"), ord(b";")}
  518. quote = ord(b'"')
  519. string_open = False
  520. # Normalize line to bytearray for simple 2/3 compatibility
  521. for i, character in enumerate(bytearray(line)):
  522. # Comment characters outside balanced quotes denote comment start
  523. if character == quote:
  524. string_open = not string_open
  525. elif not string_open and character in comment_bytes:
  526. return line[:i]
  527. return line
  528. def _is_line_continuation(value: bytes) -> bool:
  529. """Check if a value ends with a line continuation backslash.
  530. A line continuation occurs when a line ends with a backslash that is:
  531. 1. Not escaped (not preceded by another backslash)
  532. 2. Not within quotes
  533. Args:
  534. value: The value to check
  535. Returns:
  536. True if the value ends with a line continuation backslash
  537. """
  538. if not value.endswith((b"\\\n", b"\\\r\n")):
  539. return False
  540. # Remove only the newline characters, keep the content including the backslash
  541. if value.endswith(b"\\\r\n"):
  542. content = value[:-2] # Remove \r\n, keep the \
  543. else:
  544. content = value[:-1] # Remove \n, keep the \
  545. if not content.endswith(b"\\"):
  546. return False
  547. # Count consecutive backslashes at the end
  548. backslash_count = 0
  549. for i in range(len(content) - 1, -1, -1):
  550. if content[i : i + 1] == b"\\":
  551. backslash_count += 1
  552. else:
  553. break
  554. # If we have an odd number of backslashes, the last one is a line continuation
  555. # If we have an even number, they are all escaped and there's no continuation
  556. return backslash_count % 2 == 1
  557. def _parse_section_header_line(line: bytes) -> tuple[Section, bytes]:
  558. # Parse section header ("[bla]")
  559. line = _strip_comments(line).rstrip()
  560. in_quotes = False
  561. escaped = False
  562. for i, c in enumerate(line):
  563. if escaped:
  564. escaped = False
  565. continue
  566. if c == ord(b'"'):
  567. in_quotes = not in_quotes
  568. if c == ord(b"\\"):
  569. escaped = True
  570. if c == ord(b"]") and not in_quotes:
  571. last = i
  572. break
  573. else:
  574. raise ValueError("expected trailing ]")
  575. pts = line[1:last].split(b" ", 1)
  576. line = line[last + 1 :]
  577. section: Section
  578. if len(pts) == 2:
  579. # Handle subsections - Git allows more complex syntax for certain sections like includeIf
  580. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  581. # Standard quoted subsection
  582. pts[1] = pts[1][1:-1]
  583. elif pts[0] == b"includeIf":
  584. # Special handling for includeIf sections which can have complex conditions
  585. # Git allows these without strict quote validation
  586. pts[1] = pts[1].strip()
  587. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  588. pts[1] = pts[1][1:-1]
  589. else:
  590. # Other sections must have quoted subsections
  591. raise ValueError(f"Invalid subsection {pts[1]!r}")
  592. if not _check_section_name(pts[0]):
  593. raise ValueError(f"invalid section name {pts[0]!r}")
  594. section = (pts[0], pts[1])
  595. else:
  596. if not _check_section_name(pts[0]):
  597. raise ValueError(f"invalid section name {pts[0]!r}")
  598. pts = pts[0].split(b".", 1)
  599. if len(pts) == 2:
  600. section = (pts[0], pts[1])
  601. else:
  602. section = (pts[0],)
  603. return section, line
  604. class ConfigFile(ConfigDict):
  605. """A Git configuration file, like .git/config or ~/.gitconfig."""
  606. def __init__(
  607. self,
  608. values: Union[
  609. MutableMapping[Section, MutableMapping[Name, Value]], None
  610. ] = None,
  611. encoding: Union[str, None] = None,
  612. ) -> None:
  613. super().__init__(values=values, encoding=encoding)
  614. self.path: Optional[str] = None
  615. self._included_paths: set[str] = set() # Track included files to prevent cycles
  616. @classmethod
  617. def from_file(
  618. cls,
  619. f: BinaryIO,
  620. *,
  621. config_dir: Optional[str] = None,
  622. included_paths: Optional[set[str]] = None,
  623. include_depth: int = 0,
  624. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  625. file_opener: Optional[FileOpener] = None,
  626. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  627. ) -> "ConfigFile":
  628. """Read configuration from a file-like object.
  629. Args:
  630. f: File-like object to read from
  631. config_dir: Directory containing the config file (for relative includes)
  632. included_paths: Set of already included paths (to prevent cycles)
  633. include_depth: Current include depth (to prevent infinite recursion)
  634. max_include_depth: Maximum allowed include depth
  635. file_opener: Optional callback to open included files
  636. condition_matchers: Optional dict of condition matchers for includeIf
  637. """
  638. if include_depth > max_include_depth:
  639. # Prevent excessive recursion
  640. raise ValueError(f"Maximum include depth ({max_include_depth}) exceeded")
  641. ret = cls()
  642. if included_paths is not None:
  643. ret._included_paths = included_paths.copy()
  644. section: Optional[Section] = None
  645. setting = None
  646. continuation = None
  647. for lineno, line in enumerate(f.readlines()):
  648. if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
  649. line = line[3:]
  650. line = line.lstrip()
  651. if setting is None:
  652. if len(line) > 0 and line[:1] == b"[":
  653. section, line = _parse_section_header_line(line)
  654. ret._values.setdefault(section)
  655. if _strip_comments(line).strip() == b"":
  656. continue
  657. if section is None:
  658. raise ValueError(f"setting {line!r} without section")
  659. try:
  660. setting, value = line.split(b"=", 1)
  661. except ValueError:
  662. setting = line
  663. value = b"true"
  664. setting = setting.strip()
  665. if not _check_variable_name(setting):
  666. raise ValueError(f"invalid variable name {setting!r}")
  667. if _is_line_continuation(value):
  668. if value.endswith(b"\\\r\n"):
  669. continuation = value[:-3]
  670. else:
  671. continuation = value[:-2]
  672. else:
  673. continuation = None
  674. value = _parse_string(value)
  675. ret._values[section][setting] = value
  676. # Process include/includeIf directives
  677. ret._handle_include_directive(
  678. section,
  679. setting,
  680. value,
  681. config_dir=config_dir,
  682. include_depth=include_depth,
  683. max_include_depth=max_include_depth,
  684. file_opener=file_opener,
  685. condition_matchers=condition_matchers,
  686. )
  687. setting = None
  688. else: # continuation line
  689. assert continuation is not None
  690. if _is_line_continuation(line):
  691. if line.endswith(b"\\\r\n"):
  692. continuation += line[:-3]
  693. else:
  694. continuation += line[:-2]
  695. else:
  696. continuation += line
  697. value = _parse_string(continuation)
  698. assert section is not None # Already checked above
  699. ret._values[section][setting] = value
  700. # Process include/includeIf directives
  701. ret._handle_include_directive(
  702. section,
  703. setting,
  704. value,
  705. config_dir=config_dir,
  706. include_depth=include_depth,
  707. max_include_depth=max_include_depth,
  708. file_opener=file_opener,
  709. condition_matchers=condition_matchers,
  710. )
  711. continuation = None
  712. setting = None
  713. return ret
  714. def _handle_include_directive(
  715. self,
  716. section: Optional[Section],
  717. setting: bytes,
  718. value: bytes,
  719. *,
  720. config_dir: Optional[str],
  721. include_depth: int,
  722. max_include_depth: int,
  723. file_opener: Optional[FileOpener],
  724. condition_matchers: Optional[dict[str, ConditionMatcher]],
  725. ) -> None:
  726. """Handle include/includeIf directives during config parsing."""
  727. if (
  728. section is not None
  729. and setting == b"path"
  730. and (
  731. section[0].lower() == b"include"
  732. or (len(section) > 1 and section[0].lower() == b"includeif")
  733. )
  734. ):
  735. self._process_include(
  736. section,
  737. value,
  738. config_dir=config_dir,
  739. include_depth=include_depth,
  740. max_include_depth=max_include_depth,
  741. file_opener=file_opener,
  742. condition_matchers=condition_matchers,
  743. )
  744. def _process_include(
  745. self,
  746. section: Section,
  747. path_value: bytes,
  748. *,
  749. config_dir: Optional[str],
  750. include_depth: int,
  751. max_include_depth: int,
  752. file_opener: Optional[FileOpener],
  753. condition_matchers: Optional[dict[str, ConditionMatcher]],
  754. ) -> None:
  755. """Process an include or includeIf directive."""
  756. path_str = path_value.decode(self.encoding, errors="replace")
  757. # Handle includeIf conditions
  758. if len(section) > 1 and section[0].lower() == b"includeif":
  759. condition = section[1].decode(self.encoding, errors="replace")
  760. if not self._evaluate_includeif_condition(
  761. condition, config_dir, condition_matchers
  762. ):
  763. return
  764. # Resolve the include path
  765. include_path = self._resolve_include_path(path_str, config_dir)
  766. if not include_path:
  767. return
  768. # Check for circular includes
  769. try:
  770. abs_path = str(Path(include_path).resolve())
  771. except (OSError, ValueError) as e:
  772. # Invalid path - log and skip
  773. logger.debug("Invalid include path %r: %s", include_path, e)
  774. return
  775. if abs_path in self._included_paths:
  776. return
  777. # Load and merge the included file
  778. try:
  779. # Use provided file opener or default to GitFile
  780. opener: FileOpener
  781. if file_opener is None:
  782. def opener(path: Union[str, os.PathLike]) -> BinaryIO:
  783. return cast(BinaryIO, GitFile(path, "rb"))
  784. else:
  785. opener = file_opener
  786. f = opener(include_path)
  787. except (OSError, ValueError) as e:
  788. # Git silently ignores missing or unreadable include files
  789. # Log for debugging purposes
  790. logger.debug("Invalid include path %r: %s", include_path, e)
  791. else:
  792. with f as included_file:
  793. # Track this path to prevent cycles
  794. self._included_paths.add(abs_path)
  795. # Parse the included file
  796. included_config = ConfigFile.from_file(
  797. included_file,
  798. config_dir=os.path.dirname(include_path),
  799. included_paths=self._included_paths,
  800. include_depth=include_depth + 1,
  801. max_include_depth=max_include_depth,
  802. file_opener=file_opener,
  803. condition_matchers=condition_matchers,
  804. )
  805. # Merge the included configuration
  806. self._merge_config(included_config)
  807. def _merge_config(self, other: "ConfigFile") -> None:
  808. """Merge another config file into this one."""
  809. for section, values in other._values.items():
  810. if section not in self._values:
  811. self._values[section] = CaseInsensitiveOrderedMultiDict()
  812. for key, value in values.items():
  813. self._values[section][key] = value
  814. def _resolve_include_path(
  815. self, path: str, config_dir: Optional[str]
  816. ) -> Optional[str]:
  817. """Resolve an include path to an absolute path."""
  818. # Expand ~ to home directory
  819. path = os.path.expanduser(path)
  820. # If path is relative and we have a config directory, make it relative to that
  821. if not os.path.isabs(path) and config_dir:
  822. path = os.path.join(config_dir, path)
  823. return path
  824. def _evaluate_includeif_condition(
  825. self,
  826. condition: str,
  827. config_dir: Optional[str] = None,
  828. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  829. ) -> bool:
  830. """Evaluate an includeIf condition."""
  831. # Try custom matchers first if provided
  832. if condition_matchers:
  833. for prefix, matcher in condition_matchers.items():
  834. if condition.startswith(prefix):
  835. return matcher(condition[len(prefix) :])
  836. # Fall back to built-in matchers
  837. if condition.startswith("hasconfig:"):
  838. return self._evaluate_hasconfig_condition(condition[10:])
  839. else:
  840. # Unknown condition type - log and ignore (Git behavior)
  841. logger.debug("Unknown includeIf condition: %r", condition)
  842. return False
  843. def _evaluate_hasconfig_condition(self, condition: str) -> bool:
  844. """Evaluate a hasconfig condition.
  845. Format: hasconfig:config.key:pattern
  846. Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
  847. """
  848. # Split on the first colon to separate config key from pattern
  849. parts = condition.split(":", 1)
  850. if len(parts) != 2:
  851. logger.debug("Invalid hasconfig condition format: %r", condition)
  852. return False
  853. config_key, pattern = parts
  854. # Parse the config key to get section and name
  855. key_parts = config_key.split(".", 2)
  856. if len(key_parts) < 2:
  857. logger.debug("Invalid hasconfig config key: %r", config_key)
  858. return False
  859. # Handle wildcards in section names (e.g., remote.*)
  860. if len(key_parts) == 3 and key_parts[1] == "*":
  861. # Match any subsection
  862. section_prefix = key_parts[0].encode(self.encoding)
  863. name = key_parts[2].encode(self.encoding)
  864. # Check all sections that match the pattern
  865. for section in self.sections():
  866. if len(section) == 2 and section[0] == section_prefix:
  867. try:
  868. values = list(self.get_multivar(section, name))
  869. for value in values:
  870. if self._match_hasconfig_pattern(value, pattern):
  871. return True
  872. except KeyError:
  873. continue
  874. else:
  875. # Direct section lookup
  876. if len(key_parts) == 2:
  877. section = (key_parts[0].encode(self.encoding),)
  878. name = key_parts[1].encode(self.encoding)
  879. else:
  880. section = (
  881. key_parts[0].encode(self.encoding),
  882. key_parts[1].encode(self.encoding),
  883. )
  884. name = key_parts[2].encode(self.encoding)
  885. try:
  886. values = list(self.get_multivar(section, name))
  887. for value in values:
  888. if self._match_hasconfig_pattern(value, pattern):
  889. return True
  890. except KeyError:
  891. pass
  892. return False
  893. def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
  894. """Match a config value against a hasconfig pattern.
  895. Supports simple glob patterns like ``*`` and ``**``.
  896. """
  897. value_str = value.decode(self.encoding, errors="replace")
  898. return match_glob_pattern(value_str, pattern)
  899. @classmethod
  900. def from_path(
  901. cls,
  902. path: Union[str, os.PathLike],
  903. *,
  904. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  905. file_opener: Optional[FileOpener] = None,
  906. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  907. ) -> "ConfigFile":
  908. """Read configuration from a file on disk.
  909. Args:
  910. path: Path to the configuration file
  911. max_include_depth: Maximum allowed include depth
  912. file_opener: Optional callback to open included files
  913. condition_matchers: Optional dict of condition matchers for includeIf
  914. """
  915. abs_path = os.fspath(path)
  916. config_dir = os.path.dirname(abs_path)
  917. # Use provided file opener or default to GitFile
  918. opener: FileOpener
  919. if file_opener is None:
  920. def opener(p: Union[str, os.PathLike]) -> BinaryIO:
  921. return cast(BinaryIO, GitFile(p, "rb"))
  922. else:
  923. opener = file_opener
  924. with opener(abs_path) as f:
  925. ret = cls.from_file(
  926. f,
  927. config_dir=config_dir,
  928. max_include_depth=max_include_depth,
  929. file_opener=file_opener,
  930. condition_matchers=condition_matchers,
  931. )
  932. ret.path = abs_path
  933. return ret
  934. def write_to_path(self, path: Optional[Union[str, os.PathLike]] = None) -> None:
  935. """Write configuration to a file on disk."""
  936. if path is None:
  937. if self.path is None:
  938. raise ValueError("No path specified and no default path available")
  939. path_to_use: Union[str, os.PathLike] = self.path
  940. else:
  941. path_to_use = path
  942. with GitFile(path_to_use, "wb") as f:
  943. self.write_to_file(f)
  944. def write_to_file(self, f: Union[IO[bytes], _GitFile]) -> None:
  945. """Write configuration to a file-like object."""
  946. for section, values in self._values.items():
  947. try:
  948. section_name, subsection_name = section
  949. except ValueError:
  950. (section_name,) = section
  951. subsection_name = None
  952. if subsection_name is None:
  953. f.write(b"[" + section_name + b"]\n")
  954. else:
  955. f.write(b"[" + section_name + b' "' + subsection_name + b'"]\n')
  956. for key, value in values.items():
  957. value = _format_string(value)
  958. f.write(b"\t" + key + b" = " + value + b"\n")
  959. def get_xdg_config_home_path(*path_segments: str) -> str:
  960. xdg_config_home = os.environ.get(
  961. "XDG_CONFIG_HOME",
  962. os.path.expanduser("~/.config/"),
  963. )
  964. return os.path.join(xdg_config_home, *path_segments)
  965. def _find_git_in_win_path() -> Iterator[str]:
  966. for exe in ("git.exe", "git.cmd"):
  967. for path in os.environ.get("PATH", "").split(";"):
  968. if os.path.exists(os.path.join(path, exe)):
  969. # in windows native shells (powershell/cmd) exe path is
  970. # .../Git/bin/git.exe or .../Git/cmd/git.exe
  971. #
  972. # in git-bash exe path is .../Git/mingw64/bin/git.exe
  973. git_dir, _bin_dir = os.path.split(path)
  974. yield git_dir
  975. parent_dir, basename = os.path.split(git_dir)
  976. if basename == "mingw32" or basename == "mingw64":
  977. yield parent_dir
  978. break
  979. def _find_git_in_win_reg() -> Iterator[str]:
  980. import platform
  981. import winreg
  982. if platform.machine() == "AMD64":
  983. subkey = (
  984. "SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\"
  985. "CurrentVersion\\Uninstall\\Git_is1"
  986. )
  987. else:
  988. subkey = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"
  989. for key in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE): # type: ignore
  990. with suppress(OSError):
  991. with winreg.OpenKey(key, subkey) as k: # type: ignore
  992. val, typ = winreg.QueryValueEx(k, "InstallLocation") # type: ignore
  993. if typ == winreg.REG_SZ: # type: ignore
  994. yield val
  995. # There is no set standard for system config dirs on windows. We try the
  996. # following:
  997. # - %PROGRAMDATA%/Git/config - (deprecated) Windows config dir per CGit docs
  998. # - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
#     Used if a CGit installation (Git/bin/git.exe) is found in PATH or in the
#     system registry
  1001. def get_win_system_paths() -> Iterator[str]:
  1002. if "PROGRAMDATA" in os.environ:
  1003. yield os.path.join(os.environ["PROGRAMDATA"], "Git", "config")
  1004. for git_dir in _find_git_in_win_path():
  1005. yield os.path.join(git_dir, "etc", "gitconfig")
  1006. for git_dir in _find_git_in_win_reg():
  1007. yield os.path.join(git_dir, "etc", "gitconfig")
  1008. class StackedConfig(Config):
  1009. """Configuration which reads from multiple config files.."""
  1010. def __init__(
  1011. self, backends: list[ConfigFile], writable: Optional[ConfigFile] = None
  1012. ) -> None:
  1013. self.backends = backends
  1014. self.writable = writable
  1015. def __repr__(self) -> str:
  1016. return f"<{self.__class__.__name__} for {self.backends!r}>"
  1017. @classmethod
  1018. def default(cls) -> "StackedConfig":
  1019. return cls(cls.default_backends())
  1020. @classmethod
  1021. def default_backends(cls) -> list[ConfigFile]:
  1022. """Retrieve the default configuration.
  1023. See git-config(1) for details on the files searched.
  1024. """
  1025. paths = []
  1026. # Handle GIT_CONFIG_GLOBAL - overrides user config paths
  1027. try:
  1028. paths.append(os.environ["GIT_CONFIG_GLOBAL"])
  1029. except KeyError:
  1030. paths.append(os.path.expanduser("~/.gitconfig"))
  1031. paths.append(get_xdg_config_home_path("git", "config"))
  1032. # Handle GIT_CONFIG_SYSTEM and GIT_CONFIG_NOSYSTEM
  1033. try:
  1034. paths.append(os.environ["GIT_CONFIG_SYSTEM"])
  1035. except KeyError:
  1036. if "GIT_CONFIG_NOSYSTEM" not in os.environ:
  1037. paths.append("/etc/gitconfig")
  1038. if sys.platform == "win32":
  1039. paths.extend(get_win_system_paths())
  1040. backends = []
  1041. for path in paths:
  1042. try:
  1043. cf = ConfigFile.from_path(path)
  1044. except FileNotFoundError:
  1045. continue
  1046. backends.append(cf)
  1047. return backends
  1048. def get(self, section: SectionLike, name: NameLike) -> Value:
  1049. if not isinstance(section, tuple):
  1050. section = (section,)
  1051. for backend in self.backends:
  1052. try:
  1053. return backend.get(section, name)
  1054. except KeyError:
  1055. pass
  1056. raise KeyError(name)
  1057. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  1058. if not isinstance(section, tuple):
  1059. section = (section,)
  1060. for backend in self.backends:
  1061. try:
  1062. yield from backend.get_multivar(section, name)
  1063. except KeyError:
  1064. pass
  1065. def set(
  1066. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  1067. ) -> None:
  1068. if self.writable is None:
  1069. raise NotImplementedError(self.set)
  1070. return self.writable.set(section, name, value)
  1071. def sections(self) -> Iterator[Section]:
  1072. seen = set()
  1073. for backend in self.backends:
  1074. for section in backend.sections():
  1075. if section not in seen:
  1076. seen.add(section)
  1077. yield section
  1078. def read_submodules(
  1079. path: Union[str, os.PathLike],
  1080. ) -> Iterator[tuple[bytes, bytes, bytes]]:
  1081. """Read a .gitmodules file."""
  1082. cfg = ConfigFile.from_path(path)
  1083. return parse_submodules(cfg)
  1084. def parse_submodules(config: ConfigFile) -> Iterator[tuple[bytes, bytes, bytes]]:
  1085. """Parse a gitmodules GitConfig file, returning submodules.
  1086. Args:
  1087. config: A `ConfigFile`
  1088. Returns:
  1089. list of tuples (submodule path, url, name),
  1090. where name is quoted part of the section's name.
  1091. """
  1092. for section in config.sections():
  1093. section_kind, section_name = section
  1094. if section_kind == b"submodule":
  1095. try:
  1096. sm_path = config.get(section, b"path")
  1097. sm_url = config.get(section, b"url")
  1098. yield (sm_path, sm_url, section_name)
  1099. except KeyError:
  1100. # If either path or url is missing, just ignore this
  1101. # submodule entry and move on to the next one. This is
  1102. # how git itself handles malformed .gitmodule entries.
  1103. pass
  1104. def iter_instead_of(config: Config, push: bool = False) -> Iterable[tuple[str, str]]:
  1105. """Iterate over insteadOf / pushInsteadOf values."""
  1106. for section in config.sections():
  1107. if section[0] != b"url":
  1108. continue
  1109. replacement = section[1]
  1110. try:
  1111. needles = list(config.get_multivar(section, "insteadOf"))
  1112. except KeyError:
  1113. needles = []
  1114. if push:
  1115. try:
  1116. needles += list(config.get_multivar(section, "pushInsteadOf"))
  1117. except KeyError:
  1118. pass
  1119. for needle in needles:
  1120. assert isinstance(needle, bytes)
  1121. yield needle.decode("utf-8"), replacement.decode("utf-8")
  1122. def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
  1123. """Apply insteadOf / pushInsteadOf to a URL."""
  1124. longest_needle = ""
  1125. updated_url = orig_url
  1126. for needle, replacement in iter_instead_of(config, push):
  1127. if not orig_url.startswith(needle):
  1128. continue
  1129. if len(longest_needle) < len(needle):
  1130. longest_needle = needle
  1131. updated_url = replacement + orig_url[len(needle) :]
  1132. return updated_url