config.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338
  1. # config.py - Reading and writing Git config files
  2. # Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Reading and writing Git configuration files.
  22. Todo:
  23. * preserve formatting when updating configuration files
  24. """
  25. import logging
  26. import os
  27. import re
  28. import sys
  29. from collections.abc import (
  30. ItemsView,
  31. Iterable,
  32. Iterator,
  33. KeysView,
  34. MutableMapping,
  35. ValuesView,
  36. )
  37. from contextlib import suppress
  38. from pathlib import Path
  39. from typing import (
  40. IO,
  41. Any,
  42. BinaryIO,
  43. Callable,
  44. Generic,
  45. Optional,
  46. TypeVar,
  47. Union,
  48. overload,
  49. )
  50. from .file import GitFile, _GitFile
# A config key is a single name (text or bytes) or a tuple of such names.
ConfigKey = Union[str, bytes, tuple[Union[str, bytes], ...]]
# Python-side types accepted as configuration values.
ConfigValue = Union[str, bytes, bool, int]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Type for file opener callback: takes a path, returns a binary file object
FileOpener = Callable[[Union[str, os.PathLike]], BinaryIO]

# Type for includeIf condition matcher
# Takes the condition value (e.g., "main" for onbranch:main) and returns bool
ConditionMatcher = Callable[[str], bool]

# Security limits for include files
MAX_INCLUDE_FILE_SIZE = 1024 * 1024  # 1MB max for included config files
DEFAULT_MAX_INCLUDE_DEPTH = 10  # Maximum recursion depth for includes
  62. def _match_gitdir_pattern(
  63. path: bytes, pattern: bytes, ignorecase: bool = False
  64. ) -> bool:
  65. """Simple gitdir pattern matching for includeIf conditions.
  66. This handles the basic gitdir patterns used in includeIf directives.
  67. """
  68. # Convert to strings for easier manipulation
  69. path_str = path.decode("utf-8", errors="replace")
  70. pattern_str = pattern.decode("utf-8", errors="replace")
  71. # Normalize paths to use forward slashes for consistent matching
  72. path_str = path_str.replace("\\", "/")
  73. pattern_str = pattern_str.replace("\\", "/")
  74. if ignorecase:
  75. path_str = path_str.lower()
  76. pattern_str = pattern_str.lower()
  77. # Handle the common cases for gitdir patterns
  78. if pattern_str.startswith("**/") and pattern_str.endswith("/**"):
  79. # Pattern like **/dirname/** should match any path containing dirname
  80. dirname = pattern_str[3:-3] # Remove **/ and /**
  81. # Check if path contains the directory name as a path component
  82. return ("/" + dirname + "/") in path_str or path_str.endswith("/" + dirname)
  83. elif pattern_str.startswith("**/"):
  84. # Pattern like **/filename
  85. suffix = pattern_str[3:] # Remove **/
  86. return suffix in path_str or path_str.endswith("/" + suffix)
  87. elif pattern_str.endswith("/**"):
  88. # Pattern like /path/to/dir/** should match /path/to/dir and any subdirectory
  89. base_pattern = pattern_str[:-3] # Remove /**
  90. return path_str == base_pattern or path_str.startswith(base_pattern + "/")
  91. elif "**" in pattern_str:
  92. # Handle patterns with ** in the middle
  93. parts = pattern_str.split("**")
  94. if len(parts) == 2:
  95. prefix, suffix = parts
  96. # Path must start with prefix and end with suffix (if any)
  97. if prefix and not path_str.startswith(prefix):
  98. return False
  99. if suffix and not path_str.endswith(suffix):
  100. return False
  101. return True
  102. # Direct match or simple glob pattern
  103. if "*" in pattern_str or "?" in pattern_str or "[" in pattern_str:
  104. import fnmatch
  105. return fnmatch.fnmatch(path_str, pattern_str)
  106. else:
  107. return path_str == pattern_str
  108. def match_glob_pattern(value: str, pattern: str) -> bool:
  109. r"""Match a value against a glob pattern.
  110. Supports simple glob patterns like ``*`` and ``**``.
  111. Raises:
  112. ValueError: If the pattern is invalid
  113. """
  114. # Convert glob pattern to regex
  115. pattern_escaped = re.escape(pattern)
  116. # Replace escaped \*\* with .* (match anything)
  117. pattern_escaped = pattern_escaped.replace(r"\*\*", ".*")
  118. # Replace escaped \* with [^/]* (match anything except /)
  119. pattern_escaped = pattern_escaped.replace(r"\*", "[^/]*")
  120. # Anchor the pattern
  121. pattern_regex = f"^{pattern_escaped}$"
  122. try:
  123. return bool(re.match(pattern_regex, value))
  124. except re.error as e:
  125. raise ValueError(f"Invalid glob pattern {pattern!r}: {e}")
  126. def lower_key(key: ConfigKey) -> ConfigKey:
  127. if isinstance(key, (bytes, str)):
  128. return key.lower()
  129. if isinstance(key, tuple):
  130. # For config sections, only lowercase the section name (first element)
  131. # but preserve the case of subsection names (remaining elements)
  132. if len(key) > 0:
  133. first = key[0]
  134. assert isinstance(first, (bytes, str))
  135. return (first.lower(), *key[1:])
  136. return key
  137. raise TypeError(key)
# Type variables used by the generic CaseInsensitiveOrderedMultiDict below
K = TypeVar("K", bound=ConfigKey)  # Key type must be ConfigKey
V = TypeVar("V")  # Value type
_T = TypeVar("_T")  # For get() default parameter
  141. class CaseInsensitiveOrderedMultiDict(MutableMapping[K, V], Generic[K, V]):
  142. def __init__(self, default_factory: Optional[Callable[[], V]] = None) -> None:
  143. self._real: list[tuple[K, V]] = []
  144. self._keyed: dict[Any, V] = {}
  145. self._default_factory = default_factory
  146. @classmethod
  147. def make(
  148. cls, dict_in=None, default_factory=None
  149. ) -> "CaseInsensitiveOrderedMultiDict[K, V]":
  150. if isinstance(dict_in, cls):
  151. return dict_in
  152. out = cls(default_factory=default_factory)
  153. if dict_in is None:
  154. return out
  155. if not isinstance(dict_in, MutableMapping):
  156. raise TypeError
  157. for key, value in dict_in.items():
  158. out[key] = value
  159. return out
  160. def __len__(self) -> int:
  161. return len(self._keyed)
  162. def keys(self) -> KeysView[K]:
  163. return self._keyed.keys() # type: ignore[return-value]
  164. def items(self) -> ItemsView[K, V]:
  165. # Return a view that iterates over the real list to preserve order
  166. class OrderedItemsView(ItemsView[K, V]):
  167. def __init__(self, mapping: CaseInsensitiveOrderedMultiDict[K, V]):
  168. self._mapping = mapping
  169. def __iter__(self) -> Iterator[tuple[K, V]]:
  170. return iter(self._mapping._real)
  171. def __len__(self) -> int:
  172. return len(self._mapping._real)
  173. def __contains__(self, item: object) -> bool:
  174. if not isinstance(item, tuple) or len(item) != 2:
  175. return False
  176. key, value = item
  177. return any(k == key and v == value for k, v in self._mapping._real)
  178. return OrderedItemsView(self)
  179. def __iter__(self) -> Iterator[K]:
  180. return iter(self._keyed)
  181. def values(self) -> ValuesView[V]:
  182. return self._keyed.values()
  183. def __setitem__(self, key, value) -> None:
  184. self._real.append((key, value))
  185. self._keyed[lower_key(key)] = value
  186. def set(self, key, value) -> None:
  187. # This method replaces all existing values for the key
  188. lower = lower_key(key)
  189. self._real = [(k, v) for k, v in self._real if lower_key(k) != lower]
  190. self._real.append((key, value))
  191. self._keyed[lower] = value
  192. def __delitem__(self, key) -> None:
  193. key = lower_key(key)
  194. del self._keyed[key]
  195. for i, (actual, unused_value) in reversed(list(enumerate(self._real))):
  196. if lower_key(actual) == key:
  197. del self._real[i]
  198. def __getitem__(self, item: K) -> V:
  199. return self._keyed[lower_key(item)]
  200. def get(self, key: K, /, default: Union[V, _T, None] = None) -> Union[V, _T, None]: # type: ignore[override]
  201. try:
  202. return self[key]
  203. except KeyError:
  204. if default is not None:
  205. return default
  206. elif self._default_factory is not None:
  207. return self._default_factory()
  208. else:
  209. return None
  210. def get_all(self, key: K) -> Iterator[V]:
  211. lowered_key = lower_key(key)
  212. for actual, value in self._real:
  213. if lower_key(actual) == lowered_key:
  214. yield value
  215. def setdefault(self, key: K, default: Optional[V] = None) -> V:
  216. try:
  217. return self[key]
  218. except KeyError:
  219. if default is not None:
  220. self[key] = default
  221. return default
  222. elif self._default_factory is not None:
  223. value = self._default_factory()
  224. self[key] = value
  225. return value
  226. else:
  227. raise
# Stored (bytes) forms of names, sections and values, each paired with a
# "Like" alias that additionally admits str.
Name = bytes
NameLike = Union[bytes, str]
Section = tuple[bytes, ...]
SectionLike = Union[bytes, str, tuple[Union[bytes, str], ...]]
Value = bytes
ValueLike = Union[bytes, str]
  234. class Config:
  235. """A Git configuration."""
  236. def get(self, section: SectionLike, name: NameLike) -> Value:
  237. """Retrieve the contents of a configuration setting.
  238. Args:
  239. section: Tuple with section name and optional subsection name
  240. name: Variable name
  241. Returns:
  242. Contents of the setting
  243. Raises:
  244. KeyError: if the value is not set
  245. """
  246. raise NotImplementedError(self.get)
  247. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  248. """Retrieve the contents of a multivar configuration setting.
  249. Args:
  250. section: Tuple with section name and optional subsection namee
  251. name: Variable name
  252. Returns:
  253. Contents of the setting as iterable
  254. Raises:
  255. KeyError: if the value is not set
  256. """
  257. raise NotImplementedError(self.get_multivar)
  258. @overload
  259. def get_boolean(
  260. self, section: SectionLike, name: NameLike, default: bool
  261. ) -> bool: ...
  262. @overload
  263. def get_boolean(self, section: SectionLike, name: NameLike) -> Optional[bool]: ...
  264. def get_boolean(
  265. self, section: SectionLike, name: NameLike, default: Optional[bool] = None
  266. ) -> Optional[bool]:
  267. """Retrieve a configuration setting as boolean.
  268. Args:
  269. section: Tuple with section name and optional subsection name
  270. name: Name of the setting, including section and possible
  271. subsection.
  272. Returns:
  273. Contents of the setting
  274. """
  275. try:
  276. value = self.get(section, name)
  277. except KeyError:
  278. return default
  279. if value.lower() == b"true":
  280. return True
  281. elif value.lower() == b"false":
  282. return False
  283. raise ValueError(f"not a valid boolean string: {value!r}")
  284. def set(
  285. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  286. ) -> None:
  287. """Set a configuration value.
  288. Args:
  289. section: Tuple with section name and optional subsection namee
  290. name: Name of the configuration value, including section
  291. and optional subsection
  292. value: value of the setting
  293. """
  294. raise NotImplementedError(self.set)
  295. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  296. """Iterate over the configuration pairs for a specific section.
  297. Args:
  298. section: Tuple with section name and optional subsection namee
  299. Returns:
  300. Iterator over (name, value) pairs
  301. """
  302. raise NotImplementedError(self.items)
  303. def sections(self) -> Iterator[Section]:
  304. """Iterate over the sections.
  305. Returns: Iterator over section tuples
  306. """
  307. raise NotImplementedError(self.sections)
  308. def has_section(self, name: Section) -> bool:
  309. """Check if a specified section exists.
  310. Args:
  311. name: Name of section to check for
  312. Returns:
  313. boolean indicating whether the section exists
  314. """
  315. return name in self.sections()
  316. class ConfigDict(Config):
  317. """Git configuration stored in a dictionary."""
  318. def __init__(
  319. self,
  320. values: Union[
  321. MutableMapping[Section, MutableMapping[Name, Value]], None
  322. ] = None,
  323. encoding: Union[str, None] = None,
  324. ) -> None:
  325. """Create a new ConfigDict."""
  326. if encoding is None:
  327. encoding = sys.getdefaultencoding()
  328. self.encoding = encoding
  329. self._values: CaseInsensitiveOrderedMultiDict[
  330. Section, CaseInsensitiveOrderedMultiDict[Name, Value]
  331. ] = CaseInsensitiveOrderedMultiDict.make(
  332. values, default_factory=CaseInsensitiveOrderedMultiDict
  333. )
  334. def __repr__(self) -> str:
  335. return f"{self.__class__.__name__}({self._values!r})"
  336. def __eq__(self, other: object) -> bool:
  337. return isinstance(other, self.__class__) and other._values == self._values
  338. def __getitem__(self, key: Section) -> CaseInsensitiveOrderedMultiDict[Name, Value]:
  339. return self._values.__getitem__(key)
  340. def __setitem__(self, key: Section, value: MutableMapping[Name, Value]) -> None:
  341. return self._values.__setitem__(key, value)
  342. def __delitem__(self, key: Section) -> None:
  343. return self._values.__delitem__(key)
  344. def __iter__(self) -> Iterator[Section]:
  345. return self._values.__iter__()
  346. def __len__(self) -> int:
  347. return self._values.__len__()
  348. def keys(self) -> KeysView[Section]:
  349. return self._values.keys()
  350. @classmethod
  351. def _parse_setting(cls, name: str) -> tuple[str, Optional[str], str]:
  352. parts = name.split(".")
  353. if len(parts) == 3:
  354. return (parts[0], parts[1], parts[2])
  355. else:
  356. return (parts[0], None, parts[1])
  357. def _check_section_and_name(
  358. self, section: SectionLike, name: NameLike
  359. ) -> tuple[Section, Name]:
  360. if not isinstance(section, tuple):
  361. section = (section,)
  362. checked_section = tuple(
  363. [
  364. subsection.encode(self.encoding)
  365. if not isinstance(subsection, bytes)
  366. else subsection
  367. for subsection in section
  368. ]
  369. )
  370. if not isinstance(name, bytes):
  371. name = name.encode(self.encoding)
  372. return checked_section, name
  373. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  374. section, name = self._check_section_and_name(section, name)
  375. if len(section) > 1:
  376. try:
  377. return self._values[section].get_all(name)
  378. except KeyError:
  379. pass
  380. return self._values[(section[0],)].get_all(name)
  381. def get(
  382. self,
  383. section: SectionLike,
  384. name: NameLike,
  385. ) -> Value:
  386. section, name = self._check_section_and_name(section, name)
  387. if len(section) > 1:
  388. try:
  389. return self._values[section][name]
  390. except KeyError:
  391. pass
  392. return self._values[(section[0],)][name]
  393. def set(
  394. self,
  395. section: SectionLike,
  396. name: NameLike,
  397. value: Union[ValueLike, bool],
  398. ) -> None:
  399. section, name = self._check_section_and_name(section, name)
  400. if isinstance(value, bool):
  401. value = b"true" if value else b"false"
  402. if not isinstance(value, bytes):
  403. value = value.encode(self.encoding)
  404. section_dict = self._values.setdefault(section)
  405. if hasattr(section_dict, "set"):
  406. section_dict.set(name, value)
  407. else:
  408. section_dict[name] = value
  409. def add(
  410. self,
  411. section: SectionLike,
  412. name: NameLike,
  413. value: Union[ValueLike, bool],
  414. ) -> None:
  415. """Add a value to a configuration setting, creating a multivar if needed."""
  416. section, name = self._check_section_and_name(section, name)
  417. if isinstance(value, bool):
  418. value = b"true" if value else b"false"
  419. if not isinstance(value, bytes):
  420. value = value.encode(self.encoding)
  421. self._values.setdefault(section)[name] = value
  422. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  423. section_bytes, _ = self._check_section_and_name(section, b"")
  424. section_dict = self._values.get(section_bytes)
  425. if section_dict is not None:
  426. return iter(section_dict.items())
  427. return iter([])
  428. def sections(self) -> Iterator[Section]:
  429. return iter(self._values.keys())
  430. def _format_string(value: bytes) -> bytes:
  431. if (
  432. value.startswith((b" ", b"\t"))
  433. or value.endswith((b" ", b"\t"))
  434. or b"#" in value
  435. ):
  436. return b'"' + _escape_value(value) + b'"'
  437. else:
  438. return _escape_value(value)
  439. _ESCAPE_TABLE = {
  440. ord(b"\\"): ord(b"\\"),
  441. ord(b'"'): ord(b'"'),
  442. ord(b"n"): ord(b"\n"),
  443. ord(b"t"): ord(b"\t"),
  444. ord(b"b"): ord(b"\b"),
  445. }
  446. _COMMENT_CHARS = [ord(b"#"), ord(b";")]
  447. _WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
  448. def _parse_string(value: bytes) -> bytes:
  449. value = bytearray(value.strip())
  450. ret = bytearray()
  451. whitespace = bytearray()
  452. in_quotes = False
  453. i = 0
  454. while i < len(value):
  455. c = value[i]
  456. if c == ord(b"\\"):
  457. i += 1
  458. if i >= len(value):
  459. # Backslash at end of string - treat as literal backslash
  460. if whitespace:
  461. ret.extend(whitespace)
  462. whitespace = bytearray()
  463. ret.append(ord(b"\\"))
  464. else:
  465. try:
  466. v = _ESCAPE_TABLE[value[i]]
  467. if whitespace:
  468. ret.extend(whitespace)
  469. whitespace = bytearray()
  470. ret.append(v)
  471. except KeyError:
  472. # Unknown escape sequence - treat backslash as literal and process next char normally
  473. if whitespace:
  474. ret.extend(whitespace)
  475. whitespace = bytearray()
  476. ret.append(ord(b"\\"))
  477. i -= 1 # Reprocess the character after the backslash
  478. elif c == ord(b'"'):
  479. in_quotes = not in_quotes
  480. elif c in _COMMENT_CHARS and not in_quotes:
  481. # the rest of the line is a comment
  482. break
  483. elif c in _WHITESPACE_CHARS:
  484. whitespace.append(c)
  485. else:
  486. if whitespace:
  487. ret.extend(whitespace)
  488. whitespace = bytearray()
  489. ret.append(c)
  490. i += 1
  491. if in_quotes:
  492. raise ValueError("missing end quote")
  493. return bytes(ret)
  494. def _escape_value(value: bytes) -> bytes:
  495. """Escape a value."""
  496. value = value.replace(b"\\", b"\\\\")
  497. value = value.replace(b"\r", b"\\r")
  498. value = value.replace(b"\n", b"\\n")
  499. value = value.replace(b"\t", b"\\t")
  500. value = value.replace(b'"', b'\\"')
  501. return value
  502. def _check_variable_name(name: bytes) -> bool:
  503. for i in range(len(name)):
  504. c = name[i : i + 1]
  505. if not c.isalnum() and c != b"-":
  506. return False
  507. return True
  508. def _check_section_name(name: bytes) -> bool:
  509. for i in range(len(name)):
  510. c = name[i : i + 1]
  511. if not c.isalnum() and c not in (b"-", b"."):
  512. return False
  513. return True
  514. def _strip_comments(line: bytes) -> bytes:
  515. comment_bytes = {ord(b"#"), ord(b";")}
  516. quote = ord(b'"')
  517. string_open = False
  518. # Normalize line to bytearray for simple 2/3 compatibility
  519. for i, character in enumerate(bytearray(line)):
  520. # Comment characters outside balanced quotes denote comment start
  521. if character == quote:
  522. string_open = not string_open
  523. elif not string_open and character in comment_bytes:
  524. return line[:i]
  525. return line
  526. def _is_line_continuation(value: bytes) -> bool:
  527. """Check if a value ends with a line continuation backslash.
  528. A line continuation occurs when a line ends with a backslash that is:
  529. 1. Not escaped (not preceded by another backslash)
  530. 2. Not within quotes
  531. Args:
  532. value: The value to check
  533. Returns:
  534. True if the value ends with a line continuation backslash
  535. """
  536. if not value.endswith((b"\\\n", b"\\\r\n")):
  537. return False
  538. # Remove only the newline characters, keep the content including the backslash
  539. if value.endswith(b"\\\r\n"):
  540. content = value[:-2] # Remove \r\n, keep the \
  541. else:
  542. content = value[:-1] # Remove \n, keep the \
  543. if not content.endswith(b"\\"):
  544. return False
  545. # Count consecutive backslashes at the end
  546. backslash_count = 0
  547. for i in range(len(content) - 1, -1, -1):
  548. if content[i : i + 1] == b"\\":
  549. backslash_count += 1
  550. else:
  551. break
  552. # If we have an odd number of backslashes, the last one is a line continuation
  553. # If we have an even number, they are all escaped and there's no continuation
  554. return backslash_count % 2 == 1
  555. def _parse_section_header_line(line: bytes) -> tuple[Section, bytes]:
  556. # Parse section header ("[bla]")
  557. line = _strip_comments(line).rstrip()
  558. in_quotes = False
  559. escaped = False
  560. for i, c in enumerate(line):
  561. if escaped:
  562. escaped = False
  563. continue
  564. if c == ord(b'"'):
  565. in_quotes = not in_quotes
  566. if c == ord(b"\\"):
  567. escaped = True
  568. if c == ord(b"]") and not in_quotes:
  569. last = i
  570. break
  571. else:
  572. raise ValueError("expected trailing ]")
  573. pts = line[1:last].split(b" ", 1)
  574. line = line[last + 1 :]
  575. section: Section
  576. if len(pts) == 2:
  577. # Handle subsections - Git allows more complex syntax for certain sections like includeIf
  578. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  579. # Standard quoted subsection
  580. pts[1] = pts[1][1:-1]
  581. elif pts[0] == b"includeIf":
  582. # Special handling for includeIf sections which can have complex conditions
  583. # Git allows these without strict quote validation
  584. pts[1] = pts[1].strip()
  585. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  586. pts[1] = pts[1][1:-1]
  587. else:
  588. # Other sections must have quoted subsections
  589. raise ValueError(f"Invalid subsection {pts[1]!r}")
  590. if not _check_section_name(pts[0]):
  591. raise ValueError(f"invalid section name {pts[0]!r}")
  592. section = (pts[0], pts[1])
  593. else:
  594. if not _check_section_name(pts[0]):
  595. raise ValueError(f"invalid section name {pts[0]!r}")
  596. pts = pts[0].split(b".", 1)
  597. if len(pts) == 2:
  598. section = (pts[0], pts[1])
  599. else:
  600. section = (pts[0],)
  601. return section, line
  602. class ConfigFile(ConfigDict):
  603. """A Git configuration file, like .git/config or ~/.gitconfig."""
  604. def __init__(
  605. self,
  606. values: Union[
  607. MutableMapping[Section, MutableMapping[Name, Value]], None
  608. ] = None,
  609. encoding: Union[str, None] = None,
  610. ) -> None:
  611. super().__init__(values=values, encoding=encoding)
  612. self.path: Optional[str] = None
  613. self._included_paths: set[str] = set() # Track included files to prevent cycles
  614. @classmethod
  615. def from_file(
  616. cls,
  617. f: BinaryIO,
  618. *,
  619. config_dir: Optional[str] = None,
  620. included_paths: Optional[set[str]] = None,
  621. include_depth: int = 0,
  622. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  623. file_opener: Optional[FileOpener] = None,
  624. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  625. ) -> "ConfigFile":
  626. """Read configuration from a file-like object.
  627. Args:
  628. f: File-like object to read from
  629. config_dir: Directory containing the config file (for relative includes)
  630. included_paths: Set of already included paths (to prevent cycles)
  631. include_depth: Current include depth (to prevent infinite recursion)
  632. max_include_depth: Maximum allowed include depth
  633. file_opener: Optional callback to open included files
  634. condition_matchers: Optional dict of condition matchers for includeIf
  635. """
  636. if include_depth > max_include_depth:
  637. # Prevent excessive recursion
  638. raise ValueError(f"Maximum include depth ({max_include_depth}) exceeded")
  639. ret = cls()
  640. if included_paths is not None:
  641. ret._included_paths = included_paths.copy()
  642. section: Optional[Section] = None
  643. setting = None
  644. continuation = None
  645. for lineno, line in enumerate(f.readlines()):
  646. if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
  647. line = line[3:]
  648. line = line.lstrip()
  649. if setting is None:
  650. if len(line) > 0 and line[:1] == b"[":
  651. section, line = _parse_section_header_line(line)
  652. ret._values.setdefault(section)
  653. if _strip_comments(line).strip() == b"":
  654. continue
  655. if section is None:
  656. raise ValueError(f"setting {line!r} without section")
  657. try:
  658. setting, value = line.split(b"=", 1)
  659. except ValueError:
  660. setting = line
  661. value = b"true"
  662. setting = setting.strip()
  663. if not _check_variable_name(setting):
  664. raise ValueError(f"invalid variable name {setting!r}")
  665. if _is_line_continuation(value):
  666. if value.endswith(b"\\\r\n"):
  667. continuation = value[:-3]
  668. else:
  669. continuation = value[:-2]
  670. else:
  671. continuation = None
  672. value = _parse_string(value)
  673. ret._values[section][setting] = value
  674. # Process include/includeIf directives
  675. ret._handle_include_directive(
  676. section,
  677. setting,
  678. value,
  679. config_dir=config_dir,
  680. include_depth=include_depth,
  681. max_include_depth=max_include_depth,
  682. file_opener=file_opener,
  683. condition_matchers=condition_matchers,
  684. )
  685. setting = None
  686. else: # continuation line
  687. assert continuation is not None
  688. if _is_line_continuation(line):
  689. if line.endswith(b"\\\r\n"):
  690. continuation += line[:-3]
  691. else:
  692. continuation += line[:-2]
  693. else:
  694. continuation += line
  695. value = _parse_string(continuation)
  696. assert section is not None # Already checked above
  697. ret._values[section][setting] = value
  698. # Process include/includeIf directives
  699. ret._handle_include_directive(
  700. section,
  701. setting,
  702. value,
  703. config_dir=config_dir,
  704. include_depth=include_depth,
  705. max_include_depth=max_include_depth,
  706. file_opener=file_opener,
  707. condition_matchers=condition_matchers,
  708. )
  709. continuation = None
  710. setting = None
  711. return ret
  712. def _handle_include_directive(
  713. self,
  714. section: Optional[Section],
  715. setting: bytes,
  716. value: bytes,
  717. *,
  718. config_dir: Optional[str],
  719. include_depth: int,
  720. max_include_depth: int,
  721. file_opener: Optional[FileOpener],
  722. condition_matchers: Optional[dict[str, ConditionMatcher]],
  723. ) -> None:
  724. """Handle include/includeIf directives during config parsing."""
  725. if (
  726. section is not None
  727. and setting == b"path"
  728. and (
  729. section[0].lower() == b"include"
  730. or (len(section) > 1 and section[0].lower() == b"includeif")
  731. )
  732. ):
  733. self._process_include(
  734. section,
  735. value,
  736. config_dir=config_dir,
  737. include_depth=include_depth,
  738. max_include_depth=max_include_depth,
  739. file_opener=file_opener,
  740. condition_matchers=condition_matchers,
  741. )
  742. def _process_include(
  743. self,
  744. section: Section,
  745. path_value: bytes,
  746. *,
  747. config_dir: Optional[str],
  748. include_depth: int,
  749. max_include_depth: int,
  750. file_opener: Optional[FileOpener],
  751. condition_matchers: Optional[dict[str, ConditionMatcher]],
  752. ) -> None:
  753. """Process an include or includeIf directive."""
  754. path_str = path_value.decode(self.encoding, errors="replace")
  755. # Handle includeIf conditions
  756. if len(section) > 1 and section[0].lower() == b"includeif":
  757. condition = section[1].decode(self.encoding, errors="replace")
  758. if not self._evaluate_includeif_condition(
  759. condition, config_dir, condition_matchers
  760. ):
  761. return
  762. # Resolve the include path
  763. include_path = self._resolve_include_path(path_str, config_dir)
  764. if not include_path:
  765. return
  766. # Check for circular includes
  767. try:
  768. abs_path = str(Path(include_path).resolve())
  769. except (OSError, ValueError) as e:
  770. # Invalid path - log and skip
  771. logger.debug("Invalid include path %r: %s", include_path, e)
  772. return
  773. if abs_path in self._included_paths:
  774. return
  775. # Load and merge the included file
  776. try:
  777. # Use provided file opener or default to GitFile
  778. if file_opener is None:
  779. def opener(path):
  780. return GitFile(path, "rb")
  781. else:
  782. opener = file_opener
  783. f = opener(include_path)
  784. except (OSError, ValueError) as e:
  785. # Git silently ignores missing or unreadable include files
  786. # Log for debugging purposes
  787. logger.debug("Invalid include path %r: %s", include_path, e)
  788. else:
  789. with f as included_file:
  790. # Track this path to prevent cycles
  791. self._included_paths.add(abs_path)
  792. # Parse the included file
  793. included_config = ConfigFile.from_file(
  794. included_file,
  795. config_dir=os.path.dirname(include_path),
  796. included_paths=self._included_paths,
  797. include_depth=include_depth + 1,
  798. max_include_depth=max_include_depth,
  799. file_opener=file_opener,
  800. condition_matchers=condition_matchers,
  801. )
  802. # Merge the included configuration
  803. self._merge_config(included_config)
  804. def _merge_config(self, other: "ConfigFile") -> None:
  805. """Merge another config file into this one."""
  806. for section, values in other._values.items():
  807. if section not in self._values:
  808. self._values[section] = CaseInsensitiveOrderedMultiDict()
  809. for key, value in values.items():
  810. self._values[section][key] = value
  811. def _resolve_include_path(
  812. self, path: str, config_dir: Optional[str]
  813. ) -> Optional[str]:
  814. """Resolve an include path to an absolute path."""
  815. # Expand ~ to home directory
  816. path = os.path.expanduser(path)
  817. # If path is relative and we have a config directory, make it relative to that
  818. if not os.path.isabs(path) and config_dir:
  819. path = os.path.join(config_dir, path)
  820. return path
  821. def _evaluate_includeif_condition(
  822. self,
  823. condition: str,
  824. config_dir: Optional[str] = None,
  825. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  826. ) -> bool:
  827. """Evaluate an includeIf condition."""
  828. # Try custom matchers first if provided
  829. if condition_matchers:
  830. for prefix, matcher in condition_matchers.items():
  831. if condition.startswith(prefix):
  832. return matcher(condition[len(prefix) :])
  833. # Fall back to built-in matchers
  834. if condition.startswith("hasconfig:"):
  835. return self._evaluate_hasconfig_condition(condition[10:])
  836. else:
  837. # Unknown condition type - log and ignore (Git behavior)
  838. logger.debug("Unknown includeIf condition: %r", condition)
  839. return False
  840. def _evaluate_hasconfig_condition(self, condition: str) -> bool:
  841. """Evaluate a hasconfig condition.
  842. Format: hasconfig:config.key:pattern
  843. Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
  844. """
  845. # Split on the first colon to separate config key from pattern
  846. parts = condition.split(":", 1)
  847. if len(parts) != 2:
  848. logger.debug("Invalid hasconfig condition format: %r", condition)
  849. return False
  850. config_key, pattern = parts
  851. # Parse the config key to get section and name
  852. key_parts = config_key.split(".", 2)
  853. if len(key_parts) < 2:
  854. logger.debug("Invalid hasconfig config key: %r", config_key)
  855. return False
  856. # Handle wildcards in section names (e.g., remote.*)
  857. if len(key_parts) == 3 and key_parts[1] == "*":
  858. # Match any subsection
  859. section_prefix = key_parts[0].encode(self.encoding)
  860. name = key_parts[2].encode(self.encoding)
  861. # Check all sections that match the pattern
  862. for section in self.sections():
  863. if len(section) == 2 and section[0] == section_prefix:
  864. try:
  865. values = list(self.get_multivar(section, name))
  866. for value in values:
  867. if self._match_hasconfig_pattern(value, pattern):
  868. return True
  869. except KeyError:
  870. continue
  871. else:
  872. # Direct section lookup
  873. if len(key_parts) == 2:
  874. section = (key_parts[0].encode(self.encoding),)
  875. name = key_parts[1].encode(self.encoding)
  876. else:
  877. section = (
  878. key_parts[0].encode(self.encoding),
  879. key_parts[1].encode(self.encoding),
  880. )
  881. name = key_parts[2].encode(self.encoding)
  882. try:
  883. values = list(self.get_multivar(section, name))
  884. for value in values:
  885. if self._match_hasconfig_pattern(value, pattern):
  886. return True
  887. except KeyError:
  888. pass
  889. return False
  890. def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
  891. """Match a config value against a hasconfig pattern.
  892. Supports simple glob patterns like ``*`` and ``**``.
  893. """
  894. value_str = value.decode(self.encoding, errors="replace")
  895. return match_glob_pattern(value_str, pattern)
  896. @classmethod
  897. def from_path(
  898. cls,
  899. path: Union[str, os.PathLike],
  900. *,
  901. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  902. file_opener: Optional[FileOpener] = None,
  903. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  904. ) -> "ConfigFile":
  905. """Read configuration from a file on disk.
  906. Args:
  907. path: Path to the configuration file
  908. max_include_depth: Maximum allowed include depth
  909. file_opener: Optional callback to open included files
  910. condition_matchers: Optional dict of condition matchers for includeIf
  911. """
  912. abs_path = os.fspath(path)
  913. config_dir = os.path.dirname(abs_path)
  914. # Use provided file opener or default to GitFile
  915. if file_opener is None:
  916. def opener(p):
  917. return GitFile(p, "rb")
  918. else:
  919. opener = file_opener
  920. with opener(abs_path) as f:
  921. ret = cls.from_file(
  922. f,
  923. config_dir=config_dir,
  924. max_include_depth=max_include_depth,
  925. file_opener=file_opener,
  926. condition_matchers=condition_matchers,
  927. )
  928. ret.path = abs_path
  929. return ret
  930. def write_to_path(self, path: Optional[Union[str, os.PathLike]] = None) -> None:
  931. """Write configuration to a file on disk."""
  932. if path is None:
  933. if self.path is None:
  934. raise ValueError("No path specified and no default path available")
  935. path_to_use: Union[str, os.PathLike] = self.path
  936. else:
  937. path_to_use = path
  938. with GitFile(path_to_use, "wb") as f:
  939. self.write_to_file(f)
  940. def write_to_file(self, f: Union[IO[bytes], _GitFile]) -> None:
  941. """Write configuration to a file-like object."""
  942. for section, values in self._values.items():
  943. try:
  944. section_name, subsection_name = section
  945. except ValueError:
  946. (section_name,) = section
  947. subsection_name = None
  948. if subsection_name is None:
  949. f.write(b"[" + section_name + b"]\n")
  950. else:
  951. f.write(b"[" + section_name + b' "' + subsection_name + b'"]\n')
  952. for key, value in values.items():
  953. value = _format_string(value)
  954. f.write(b"\t" + key + b" = " + value + b"\n")
  955. def get_xdg_config_home_path(*path_segments: str) -> str:
  956. xdg_config_home = os.environ.get(
  957. "XDG_CONFIG_HOME",
  958. os.path.expanduser("~/.config/"),
  959. )
  960. return os.path.join(xdg_config_home, *path_segments)
  961. def _find_git_in_win_path() -> Iterator[str]:
  962. for exe in ("git.exe", "git.cmd"):
  963. for path in os.environ.get("PATH", "").split(";"):
  964. if os.path.exists(os.path.join(path, exe)):
  965. # in windows native shells (powershell/cmd) exe path is
  966. # .../Git/bin/git.exe or .../Git/cmd/git.exe
  967. #
  968. # in git-bash exe path is .../Git/mingw64/bin/git.exe
  969. git_dir, _bin_dir = os.path.split(path)
  970. yield git_dir
  971. parent_dir, basename = os.path.split(git_dir)
  972. if basename == "mingw32" or basename == "mingw64":
  973. yield parent_dir
  974. break
  975. def _find_git_in_win_reg() -> Iterator[str]:
  976. import platform
  977. import winreg
  978. if platform.machine() == "AMD64":
  979. subkey = (
  980. "SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\"
  981. "CurrentVersion\\Uninstall\\Git_is1"
  982. )
  983. else:
  984. subkey = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"
  985. for key in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE): # type: ignore
  986. with suppress(OSError):
  987. with winreg.OpenKey(key, subkey) as k: # type: ignore
  988. val, typ = winreg.QueryValueEx(k, "InstallLocation") # type: ignore
  989. if typ == winreg.REG_SZ: # type: ignore
  990. yield val
# There is no set standard for system config dirs on Windows. We try the
# following:
#   - %PROGRAMDATA%/Git/config - (deprecated) Windows config dir per CGit docs
#   - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
#     Used if a CGit installation (Git/bin/git.exe) is found in PATH or in
#     the system registry
  997. def get_win_system_paths() -> Iterator[str]:
  998. if "PROGRAMDATA" in os.environ:
  999. yield os.path.join(os.environ["PROGRAMDATA"], "Git", "config")
  1000. for git_dir in _find_git_in_win_path():
  1001. yield os.path.join(git_dir, "etc", "gitconfig")
  1002. for git_dir in _find_git_in_win_reg():
  1003. yield os.path.join(git_dir, "etc", "gitconfig")
  1004. class StackedConfig(Config):
  1005. """Configuration which reads from multiple config files.."""
  1006. def __init__(
  1007. self, backends: list[ConfigFile], writable: Optional[ConfigFile] = None
  1008. ) -> None:
  1009. self.backends = backends
  1010. self.writable = writable
  1011. def __repr__(self) -> str:
  1012. return f"<{self.__class__.__name__} for {self.backends!r}>"
  1013. @classmethod
  1014. def default(cls) -> "StackedConfig":
  1015. return cls(cls.default_backends())
  1016. @classmethod
  1017. def default_backends(cls) -> list[ConfigFile]:
  1018. """Retrieve the default configuration.
  1019. See git-config(1) for details on the files searched.
  1020. """
  1021. paths = []
  1022. # Handle GIT_CONFIG_GLOBAL - overrides user config paths
  1023. try:
  1024. paths.append(os.environ["GIT_CONFIG_GLOBAL"])
  1025. except KeyError:
  1026. paths.append(os.path.expanduser("~/.gitconfig"))
  1027. paths.append(get_xdg_config_home_path("git", "config"))
  1028. # Handle GIT_CONFIG_SYSTEM and GIT_CONFIG_NOSYSTEM
  1029. try:
  1030. paths.append(os.environ["GIT_CONFIG_SYSTEM"])
  1031. except KeyError:
  1032. if "GIT_CONFIG_NOSYSTEM" not in os.environ:
  1033. paths.append("/etc/gitconfig")
  1034. if sys.platform == "win32":
  1035. paths.extend(get_win_system_paths())
  1036. backends = []
  1037. for path in paths:
  1038. try:
  1039. cf = ConfigFile.from_path(path)
  1040. except FileNotFoundError:
  1041. continue
  1042. backends.append(cf)
  1043. return backends
  1044. def get(self, section: SectionLike, name: NameLike) -> Value:
  1045. if not isinstance(section, tuple):
  1046. section = (section,)
  1047. for backend in self.backends:
  1048. try:
  1049. return backend.get(section, name)
  1050. except KeyError:
  1051. pass
  1052. raise KeyError(name)
  1053. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  1054. if not isinstance(section, tuple):
  1055. section = (section,)
  1056. for backend in self.backends:
  1057. try:
  1058. yield from backend.get_multivar(section, name)
  1059. except KeyError:
  1060. pass
  1061. def set(
  1062. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  1063. ) -> None:
  1064. if self.writable is None:
  1065. raise NotImplementedError(self.set)
  1066. return self.writable.set(section, name, value)
  1067. def sections(self) -> Iterator[Section]:
  1068. seen = set()
  1069. for backend in self.backends:
  1070. for section in backend.sections():
  1071. if section not in seen:
  1072. seen.add(section)
  1073. yield section
  1074. def read_submodules(
  1075. path: Union[str, os.PathLike],
  1076. ) -> Iterator[tuple[bytes, bytes, bytes]]:
  1077. """Read a .gitmodules file."""
  1078. cfg = ConfigFile.from_path(path)
  1079. return parse_submodules(cfg)
  1080. def parse_submodules(config: ConfigFile) -> Iterator[tuple[bytes, bytes, bytes]]:
  1081. """Parse a gitmodules GitConfig file, returning submodules.
  1082. Args:
  1083. config: A `ConfigFile`
  1084. Returns:
  1085. list of tuples (submodule path, url, name),
  1086. where name is quoted part of the section's name.
  1087. """
  1088. for section in config.sections():
  1089. section_kind, section_name = section
  1090. if section_kind == b"submodule":
  1091. try:
  1092. sm_path = config.get(section, b"path")
  1093. sm_url = config.get(section, b"url")
  1094. yield (sm_path, sm_url, section_name)
  1095. except KeyError:
  1096. # If either path or url is missing, just ignore this
  1097. # submodule entry and move on to the next one. This is
  1098. # how git itself handles malformed .gitmodule entries.
  1099. pass
  1100. def iter_instead_of(config: Config, push: bool = False) -> Iterable[tuple[str, str]]:
  1101. """Iterate over insteadOf / pushInsteadOf values."""
  1102. for section in config.sections():
  1103. if section[0] != b"url":
  1104. continue
  1105. replacement = section[1]
  1106. try:
  1107. needles = list(config.get_multivar(section, "insteadOf"))
  1108. except KeyError:
  1109. needles = []
  1110. if push:
  1111. try:
  1112. needles += list(config.get_multivar(section, "pushInsteadOf"))
  1113. except KeyError:
  1114. pass
  1115. for needle in needles:
  1116. assert isinstance(needle, bytes)
  1117. yield needle.decode("utf-8"), replacement.decode("utf-8")
  1118. def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
  1119. """Apply insteadOf / pushInsteadOf to a URL."""
  1120. longest_needle = ""
  1121. updated_url = orig_url
  1122. for needle, replacement in iter_instead_of(config, push):
  1123. if not orig_url.startswith(needle):
  1124. continue
  1125. if len(longest_needle) < len(needle):
  1126. longest_needle = needle
  1127. updated_url = replacement + orig_url[len(needle) :]
  1128. return updated_url