config.py 46 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347
  1. # config.py - Reading and writing Git config files
  2. # Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Reading and writing Git configuration files.
  22. Todo:
  23. * preserve formatting when updating configuration files
  24. """
  25. import logging
  26. import os
  27. import re
  28. import sys
  29. from collections.abc import (
  30. ItemsView,
  31. Iterable,
  32. Iterator,
  33. KeysView,
  34. MutableMapping,
  35. ValuesView,
  36. )
  37. from contextlib import suppress
  38. from pathlib import Path
  39. from typing import (
  40. IO,
  41. Any,
  42. BinaryIO,
  43. Callable,
  44. Generic,
  45. Optional,
  46. TypeVar,
  47. Union,
  48. cast,
  49. overload,
  50. )
  51. from .file import GitFile, _GitFile
  52. ConfigKey = Union[str, bytes, tuple[Union[str, bytes], ...]]
  53. ConfigValue = Union[str, bytes, bool, int]
  54. logger = logging.getLogger(__name__)
  55. # Type for file opener callback
  56. FileOpener = Callable[[Union[str, os.PathLike]], BinaryIO]
  57. # Type for includeIf condition matcher
  58. # Takes the condition value (e.g., "main" for onbranch:main) and returns bool
  59. ConditionMatcher = Callable[[str], bool]
  60. # Security limits for include files
  61. MAX_INCLUDE_FILE_SIZE = 1024 * 1024 # 1MB max for included config files
  62. DEFAULT_MAX_INCLUDE_DEPTH = 10 # Maximum recursion depth for includes
  63. def _match_gitdir_pattern(
  64. path: bytes, pattern: bytes, ignorecase: bool = False
  65. ) -> bool:
  66. """Simple gitdir pattern matching for includeIf conditions.
  67. This handles the basic gitdir patterns used in includeIf directives.
  68. """
  69. # Convert to strings for easier manipulation
  70. path_str = path.decode("utf-8", errors="replace")
  71. pattern_str = pattern.decode("utf-8", errors="replace")
  72. # Normalize paths to use forward slashes for consistent matching
  73. path_str = path_str.replace("\\", "/")
  74. pattern_str = pattern_str.replace("\\", "/")
  75. if ignorecase:
  76. path_str = path_str.lower()
  77. pattern_str = pattern_str.lower()
  78. # Handle the common cases for gitdir patterns
  79. if pattern_str.startswith("**/") and pattern_str.endswith("/**"):
  80. # Pattern like **/dirname/** should match any path containing dirname
  81. dirname = pattern_str[3:-3] # Remove **/ and /**
  82. # Check if path contains the directory name as a path component
  83. return ("/" + dirname + "/") in path_str or path_str.endswith("/" + dirname)
  84. elif pattern_str.startswith("**/"):
  85. # Pattern like **/filename
  86. suffix = pattern_str[3:] # Remove **/
  87. return suffix in path_str or path_str.endswith("/" + suffix)
  88. elif pattern_str.endswith("/**"):
  89. # Pattern like /path/to/dir/** should match /path/to/dir and any subdirectory
  90. base_pattern = pattern_str[:-3] # Remove /**
  91. return path_str == base_pattern or path_str.startswith(base_pattern + "/")
  92. elif "**" in pattern_str:
  93. # Handle patterns with ** in the middle
  94. parts = pattern_str.split("**")
  95. if len(parts) == 2:
  96. prefix, suffix = parts
  97. # Path must start with prefix and end with suffix (if any)
  98. if prefix and not path_str.startswith(prefix):
  99. return False
  100. if suffix and not path_str.endswith(suffix):
  101. return False
  102. return True
  103. # Direct match or simple glob pattern
  104. if "*" in pattern_str or "?" in pattern_str or "[" in pattern_str:
  105. import fnmatch
  106. return fnmatch.fnmatch(path_str, pattern_str)
  107. else:
  108. return path_str == pattern_str
  109. def match_glob_pattern(value: str, pattern: str) -> bool:
  110. r"""Match a value against a glob pattern.
  111. Supports simple glob patterns like ``*`` and ``**``.
  112. Raises:
  113. ValueError: If the pattern is invalid
  114. """
  115. # Convert glob pattern to regex
  116. pattern_escaped = re.escape(pattern)
  117. # Replace escaped \*\* with .* (match anything)
  118. pattern_escaped = pattern_escaped.replace(r"\*\*", ".*")
  119. # Replace escaped \* with [^/]* (match anything except /)
  120. pattern_escaped = pattern_escaped.replace(r"\*", "[^/]*")
  121. # Anchor the pattern
  122. pattern_regex = f"^{pattern_escaped}$"
  123. try:
  124. return bool(re.match(pattern_regex, value))
  125. except re.error as e:
  126. raise ValueError(f"Invalid glob pattern {pattern!r}: {e}")
  127. def lower_key(key: ConfigKey) -> ConfigKey:
  128. if isinstance(key, (bytes, str)):
  129. return key.lower()
  130. if isinstance(key, tuple):
  131. # For config sections, only lowercase the section name (first element)
  132. # but preserve the case of subsection names (remaining elements)
  133. if len(key) > 0:
  134. first = key[0]
  135. assert isinstance(first, (bytes, str))
  136. return (first.lower(), *key[1:])
  137. return key
  138. raise TypeError(key)
  139. K = TypeVar("K", bound=ConfigKey) # Key type must be ConfigKey
  140. V = TypeVar("V") # Value type
  141. _T = TypeVar("_T") # For get() default parameter
  142. class CaseInsensitiveOrderedMultiDict(MutableMapping[K, V], Generic[K, V]):
  143. def __init__(self, default_factory: Optional[Callable[[], V]] = None) -> None:
  144. self._real: list[tuple[K, V]] = []
  145. self._keyed: dict[Any, V] = {}
  146. self._default_factory = default_factory
  147. @classmethod
  148. def make(
  149. cls,
  150. dict_in: Optional[
  151. Union[MutableMapping[K, V], "CaseInsensitiveOrderedMultiDict[K, V]"]
  152. ] = None,
  153. default_factory: Optional[Callable[[], V]] = None,
  154. ) -> "CaseInsensitiveOrderedMultiDict[K, V]":
  155. if isinstance(dict_in, cls):
  156. return dict_in
  157. out = cls(default_factory=default_factory)
  158. if dict_in is None:
  159. return out
  160. if not isinstance(dict_in, MutableMapping):
  161. raise TypeError
  162. for key, value in dict_in.items():
  163. out[key] = value
  164. return out
  165. def __len__(self) -> int:
  166. return len(self._keyed)
  167. def keys(self) -> KeysView[K]:
  168. return self._keyed.keys() # type: ignore[return-value]
  169. def items(self) -> ItemsView[K, V]:
  170. # Return a view that iterates over the real list to preserve order
  171. class OrderedItemsView(ItemsView[K, V]):
  172. def __init__(self, mapping: CaseInsensitiveOrderedMultiDict[K, V]):
  173. self._mapping = mapping
  174. def __iter__(self) -> Iterator[tuple[K, V]]:
  175. return iter(self._mapping._real)
  176. def __len__(self) -> int:
  177. return len(self._mapping._real)
  178. def __contains__(self, item: object) -> bool:
  179. if not isinstance(item, tuple) or len(item) != 2:
  180. return False
  181. key, value = item
  182. return any(k == key and v == value for k, v in self._mapping._real)
  183. return OrderedItemsView(self)
  184. def __iter__(self) -> Iterator[K]:
  185. return iter(self._keyed)
  186. def values(self) -> ValuesView[V]:
  187. return self._keyed.values()
  188. def __setitem__(self, key: K, value: V) -> None:
  189. self._real.append((key, value))
  190. self._keyed[lower_key(key)] = value
  191. def set(self, key: K, value: V) -> None:
  192. # This method replaces all existing values for the key
  193. lower = lower_key(key)
  194. self._real = [(k, v) for k, v in self._real if lower_key(k) != lower]
  195. self._real.append((key, value))
  196. self._keyed[lower] = value
  197. def __delitem__(self, key: K) -> None:
  198. lower_k = lower_key(key)
  199. del self._keyed[lower_k]
  200. for i, (actual, unused_value) in reversed(list(enumerate(self._real))):
  201. if lower_key(actual) == lower_k:
  202. del self._real[i]
  203. def __getitem__(self, item: K) -> V:
  204. return self._keyed[lower_key(item)]
  205. def get(self, key: K, /, default: Union[V, _T, None] = None) -> Union[V, _T, None]: # type: ignore[override]
  206. try:
  207. return self[key]
  208. except KeyError:
  209. if default is not None:
  210. return default
  211. elif self._default_factory is not None:
  212. return self._default_factory()
  213. else:
  214. return None
  215. def get_all(self, key: K) -> Iterator[V]:
  216. lowered_key = lower_key(key)
  217. for actual, value in self._real:
  218. if lower_key(actual) == lowered_key:
  219. yield value
  220. def setdefault(self, key: K, default: Optional[V] = None) -> V:
  221. try:
  222. return self[key]
  223. except KeyError:
  224. if default is not None:
  225. self[key] = default
  226. return default
  227. elif self._default_factory is not None:
  228. value = self._default_factory()
  229. self[key] = value
  230. return value
  231. else:
  232. raise
  233. Name = bytes
  234. NameLike = Union[bytes, str]
  235. Section = tuple[bytes, ...]
  236. SectionLike = Union[bytes, str, tuple[Union[bytes, str], ...]]
  237. Value = bytes
  238. ValueLike = Union[bytes, str]
  239. class Config:
  240. """A Git configuration."""
  241. def get(self, section: SectionLike, name: NameLike) -> Value:
  242. """Retrieve the contents of a configuration setting.
  243. Args:
  244. section: Tuple with section name and optional subsection name
  245. name: Variable name
  246. Returns:
  247. Contents of the setting
  248. Raises:
  249. KeyError: if the value is not set
  250. """
  251. raise NotImplementedError(self.get)
  252. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  253. """Retrieve the contents of a multivar configuration setting.
  254. Args:
  255. section: Tuple with section name and optional subsection namee
  256. name: Variable name
  257. Returns:
  258. Contents of the setting as iterable
  259. Raises:
  260. KeyError: if the value is not set
  261. """
  262. raise NotImplementedError(self.get_multivar)
  263. @overload
  264. def get_boolean(
  265. self, section: SectionLike, name: NameLike, default: bool
  266. ) -> bool: ...
  267. @overload
  268. def get_boolean(self, section: SectionLike, name: NameLike) -> Optional[bool]: ...
  269. def get_boolean(
  270. self, section: SectionLike, name: NameLike, default: Optional[bool] = None
  271. ) -> Optional[bool]:
  272. """Retrieve a configuration setting as boolean.
  273. Args:
  274. section: Tuple with section name and optional subsection name
  275. name: Name of the setting, including section and possible
  276. subsection.
  277. Returns:
  278. Contents of the setting
  279. """
  280. try:
  281. value = self.get(section, name)
  282. except KeyError:
  283. return default
  284. if value.lower() == b"true":
  285. return True
  286. elif value.lower() == b"false":
  287. return False
  288. raise ValueError(f"not a valid boolean string: {value!r}")
  289. def set(
  290. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  291. ) -> None:
  292. """Set a configuration value.
  293. Args:
  294. section: Tuple with section name and optional subsection namee
  295. name: Name of the configuration value, including section
  296. and optional subsection
  297. value: value of the setting
  298. """
  299. raise NotImplementedError(self.set)
  300. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  301. """Iterate over the configuration pairs for a specific section.
  302. Args:
  303. section: Tuple with section name and optional subsection namee
  304. Returns:
  305. Iterator over (name, value) pairs
  306. """
  307. raise NotImplementedError(self.items)
  308. def sections(self) -> Iterator[Section]:
  309. """Iterate over the sections.
  310. Returns: Iterator over section tuples
  311. """
  312. raise NotImplementedError(self.sections)
  313. def has_section(self, name: Section) -> bool:
  314. """Check if a specified section exists.
  315. Args:
  316. name: Name of section to check for
  317. Returns:
  318. boolean indicating whether the section exists
  319. """
  320. return name in self.sections()
  321. class ConfigDict(Config):
  322. """Git configuration stored in a dictionary."""
  323. def __init__(
  324. self,
  325. values: Union[
  326. MutableMapping[Section, CaseInsensitiveOrderedMultiDict[Name, Value]], None
  327. ] = None,
  328. encoding: Union[str, None] = None,
  329. ) -> None:
  330. """Create a new ConfigDict."""
  331. if encoding is None:
  332. encoding = sys.getdefaultencoding()
  333. self.encoding = encoding
  334. self._values: CaseInsensitiveOrderedMultiDict[
  335. Section, CaseInsensitiveOrderedMultiDict[Name, Value]
  336. ] = CaseInsensitiveOrderedMultiDict.make(
  337. values, default_factory=CaseInsensitiveOrderedMultiDict
  338. )
  339. def __repr__(self) -> str:
  340. return f"{self.__class__.__name__}({self._values!r})"
  341. def __eq__(self, other: object) -> bool:
  342. return isinstance(other, self.__class__) and other._values == self._values
  343. def __getitem__(self, key: Section) -> CaseInsensitiveOrderedMultiDict[Name, Value]:
  344. return self._values.__getitem__(key)
  345. def __setitem__(
  346. self, key: Section, value: CaseInsensitiveOrderedMultiDict[Name, Value]
  347. ) -> None:
  348. return self._values.__setitem__(key, value)
  349. def __delitem__(self, key: Section) -> None:
  350. return self._values.__delitem__(key)
  351. def __iter__(self) -> Iterator[Section]:
  352. return self._values.__iter__()
  353. def __len__(self) -> int:
  354. return self._values.__len__()
  355. def keys(self) -> KeysView[Section]:
  356. return self._values.keys()
  357. @classmethod
  358. def _parse_setting(cls, name: str) -> tuple[str, Optional[str], str]:
  359. parts = name.split(".")
  360. if len(parts) == 3:
  361. return (parts[0], parts[1], parts[2])
  362. else:
  363. return (parts[0], None, parts[1])
  364. def _check_section_and_name(
  365. self, section: SectionLike, name: NameLike
  366. ) -> tuple[Section, Name]:
  367. if not isinstance(section, tuple):
  368. section = (section,)
  369. checked_section = tuple(
  370. [
  371. subsection.encode(self.encoding)
  372. if not isinstance(subsection, bytes)
  373. else subsection
  374. for subsection in section
  375. ]
  376. )
  377. if not isinstance(name, bytes):
  378. name = name.encode(self.encoding)
  379. return checked_section, name
  380. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  381. section, name = self._check_section_and_name(section, name)
  382. if len(section) > 1:
  383. try:
  384. return self._values[section].get_all(name)
  385. except KeyError:
  386. pass
  387. return self._values[(section[0],)].get_all(name)
  388. def get(
  389. self,
  390. section: SectionLike,
  391. name: NameLike,
  392. ) -> Value:
  393. section, name = self._check_section_and_name(section, name)
  394. if len(section) > 1:
  395. try:
  396. return self._values[section][name]
  397. except KeyError:
  398. pass
  399. return self._values[(section[0],)][name]
  400. def set(
  401. self,
  402. section: SectionLike,
  403. name: NameLike,
  404. value: Union[ValueLike, bool],
  405. ) -> None:
  406. section, name = self._check_section_and_name(section, name)
  407. if isinstance(value, bool):
  408. value = b"true" if value else b"false"
  409. if not isinstance(value, bytes):
  410. value = value.encode(self.encoding)
  411. section_dict = self._values.setdefault(section)
  412. if hasattr(section_dict, "set"):
  413. section_dict.set(name, value)
  414. else:
  415. section_dict[name] = value
  416. def add(
  417. self,
  418. section: SectionLike,
  419. name: NameLike,
  420. value: Union[ValueLike, bool],
  421. ) -> None:
  422. """Add a value to a configuration setting, creating a multivar if needed."""
  423. section, name = self._check_section_and_name(section, name)
  424. if isinstance(value, bool):
  425. value = b"true" if value else b"false"
  426. if not isinstance(value, bytes):
  427. value = value.encode(self.encoding)
  428. self._values.setdefault(section)[name] = value
  429. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  430. section_bytes, _ = self._check_section_and_name(section, b"")
  431. section_dict = self._values.get(section_bytes)
  432. if section_dict is not None:
  433. return iter(section_dict.items())
  434. return iter([])
  435. def sections(self) -> Iterator[Section]:
  436. return iter(self._values.keys())
  437. def _format_string(value: bytes) -> bytes:
  438. if (
  439. value.startswith((b" ", b"\t"))
  440. or value.endswith((b" ", b"\t"))
  441. or b"#" in value
  442. ):
  443. return b'"' + _escape_value(value) + b'"'
  444. else:
  445. return _escape_value(value)
  446. _ESCAPE_TABLE = {
  447. ord(b"\\"): ord(b"\\"),
  448. ord(b'"'): ord(b'"'),
  449. ord(b"n"): ord(b"\n"),
  450. ord(b"t"): ord(b"\t"),
  451. ord(b"b"): ord(b"\b"),
  452. }
  453. _COMMENT_CHARS = [ord(b"#"), ord(b";")]
  454. _WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
  455. def _parse_string(value: bytes) -> bytes:
  456. value = bytearray(value.strip())
  457. ret = bytearray()
  458. whitespace = bytearray()
  459. in_quotes = False
  460. i = 0
  461. while i < len(value):
  462. c = value[i]
  463. if c == ord(b"\\"):
  464. i += 1
  465. if i >= len(value):
  466. # Backslash at end of string - treat as literal backslash
  467. if whitespace:
  468. ret.extend(whitespace)
  469. whitespace = bytearray()
  470. ret.append(ord(b"\\"))
  471. else:
  472. try:
  473. v = _ESCAPE_TABLE[value[i]]
  474. if whitespace:
  475. ret.extend(whitespace)
  476. whitespace = bytearray()
  477. ret.append(v)
  478. except KeyError:
  479. # Unknown escape sequence - treat backslash as literal and process next char normally
  480. if whitespace:
  481. ret.extend(whitespace)
  482. whitespace = bytearray()
  483. ret.append(ord(b"\\"))
  484. i -= 1 # Reprocess the character after the backslash
  485. elif c == ord(b'"'):
  486. in_quotes = not in_quotes
  487. elif c in _COMMENT_CHARS and not in_quotes:
  488. # the rest of the line is a comment
  489. break
  490. elif c in _WHITESPACE_CHARS:
  491. whitespace.append(c)
  492. else:
  493. if whitespace:
  494. ret.extend(whitespace)
  495. whitespace = bytearray()
  496. ret.append(c)
  497. i += 1
  498. if in_quotes:
  499. raise ValueError("missing end quote")
  500. return bytes(ret)
  501. def _escape_value(value: bytes) -> bytes:
  502. """Escape a value."""
  503. value = value.replace(b"\\", b"\\\\")
  504. value = value.replace(b"\r", b"\\r")
  505. value = value.replace(b"\n", b"\\n")
  506. value = value.replace(b"\t", b"\\t")
  507. value = value.replace(b'"', b'\\"')
  508. return value
  509. def _check_variable_name(name: bytes) -> bool:
  510. for i in range(len(name)):
  511. c = name[i : i + 1]
  512. if not c.isalnum() and c != b"-":
  513. return False
  514. return True
  515. def _check_section_name(name: bytes) -> bool:
  516. for i in range(len(name)):
  517. c = name[i : i + 1]
  518. if not c.isalnum() and c not in (b"-", b"."):
  519. return False
  520. return True
  521. def _strip_comments(line: bytes) -> bytes:
  522. comment_bytes = {ord(b"#"), ord(b";")}
  523. quote = ord(b'"')
  524. string_open = False
  525. # Normalize line to bytearray for simple 2/3 compatibility
  526. for i, character in enumerate(bytearray(line)):
  527. # Comment characters outside balanced quotes denote comment start
  528. if character == quote:
  529. string_open = not string_open
  530. elif not string_open and character in comment_bytes:
  531. return line[:i]
  532. return line
  533. def _is_line_continuation(value: bytes) -> bool:
  534. """Check if a value ends with a line continuation backslash.
  535. A line continuation occurs when a line ends with a backslash that is:
  536. 1. Not escaped (not preceded by another backslash)
  537. 2. Not within quotes
  538. Args:
  539. value: The value to check
  540. Returns:
  541. True if the value ends with a line continuation backslash
  542. """
  543. if not value.endswith((b"\\\n", b"\\\r\n")):
  544. return False
  545. # Remove only the newline characters, keep the content including the backslash
  546. if value.endswith(b"\\\r\n"):
  547. content = value[:-2] # Remove \r\n, keep the \
  548. else:
  549. content = value[:-1] # Remove \n, keep the \
  550. if not content.endswith(b"\\"):
  551. return False
  552. # Count consecutive backslashes at the end
  553. backslash_count = 0
  554. for i in range(len(content) - 1, -1, -1):
  555. if content[i : i + 1] == b"\\":
  556. backslash_count += 1
  557. else:
  558. break
  559. # If we have an odd number of backslashes, the last one is a line continuation
  560. # If we have an even number, they are all escaped and there's no continuation
  561. return backslash_count % 2 == 1
  562. def _parse_section_header_line(line: bytes) -> tuple[Section, bytes]:
  563. # Parse section header ("[bla]")
  564. line = _strip_comments(line).rstrip()
  565. in_quotes = False
  566. escaped = False
  567. for i, c in enumerate(line):
  568. if escaped:
  569. escaped = False
  570. continue
  571. if c == ord(b'"'):
  572. in_quotes = not in_quotes
  573. if c == ord(b"\\"):
  574. escaped = True
  575. if c == ord(b"]") and not in_quotes:
  576. last = i
  577. break
  578. else:
  579. raise ValueError("expected trailing ]")
  580. pts = line[1:last].split(b" ", 1)
  581. line = line[last + 1 :]
  582. section: Section
  583. if len(pts) == 2:
  584. # Handle subsections - Git allows more complex syntax for certain sections like includeIf
  585. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  586. # Standard quoted subsection
  587. pts[1] = pts[1][1:-1]
  588. elif pts[0] == b"includeIf":
  589. # Special handling for includeIf sections which can have complex conditions
  590. # Git allows these without strict quote validation
  591. pts[1] = pts[1].strip()
  592. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  593. pts[1] = pts[1][1:-1]
  594. else:
  595. # Other sections must have quoted subsections
  596. raise ValueError(f"Invalid subsection {pts[1]!r}")
  597. if not _check_section_name(pts[0]):
  598. raise ValueError(f"invalid section name {pts[0]!r}")
  599. section = (pts[0], pts[1])
  600. else:
  601. if not _check_section_name(pts[0]):
  602. raise ValueError(f"invalid section name {pts[0]!r}")
  603. pts = pts[0].split(b".", 1)
  604. if len(pts) == 2:
  605. section = (pts[0], pts[1])
  606. else:
  607. section = (pts[0],)
  608. return section, line
  609. class ConfigFile(ConfigDict):
  610. """A Git configuration file, like .git/config or ~/.gitconfig."""
  611. def __init__(
  612. self,
  613. values: Union[
  614. MutableMapping[Section, CaseInsensitiveOrderedMultiDict[Name, Value]], None
  615. ] = None,
  616. encoding: Union[str, None] = None,
  617. ) -> None:
  618. super().__init__(values=values, encoding=encoding)
  619. self.path: Optional[str] = None
  620. self._included_paths: set[str] = set() # Track included files to prevent cycles
  621. @classmethod
  622. def from_file(
  623. cls,
  624. f: BinaryIO,
  625. *,
  626. config_dir: Optional[str] = None,
  627. included_paths: Optional[set[str]] = None,
  628. include_depth: int = 0,
  629. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  630. file_opener: Optional[FileOpener] = None,
  631. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  632. ) -> "ConfigFile":
  633. """Read configuration from a file-like object.
  634. Args:
  635. f: File-like object to read from
  636. config_dir: Directory containing the config file (for relative includes)
  637. included_paths: Set of already included paths (to prevent cycles)
  638. include_depth: Current include depth (to prevent infinite recursion)
  639. max_include_depth: Maximum allowed include depth
  640. file_opener: Optional callback to open included files
  641. condition_matchers: Optional dict of condition matchers for includeIf
  642. """
  643. if include_depth > max_include_depth:
  644. # Prevent excessive recursion
  645. raise ValueError(f"Maximum include depth ({max_include_depth}) exceeded")
  646. ret = cls()
  647. if included_paths is not None:
  648. ret._included_paths = included_paths.copy()
  649. section: Optional[Section] = None
  650. setting = None
  651. continuation = None
  652. for lineno, line in enumerate(f.readlines()):
  653. if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
  654. line = line[3:]
  655. line = line.lstrip()
  656. if setting is None:
  657. if len(line) > 0 and line[:1] == b"[":
  658. section, line = _parse_section_header_line(line)
  659. ret._values.setdefault(section)
  660. if _strip_comments(line).strip() == b"":
  661. continue
  662. if section is None:
  663. raise ValueError(f"setting {line!r} without section")
  664. try:
  665. setting, value = line.split(b"=", 1)
  666. except ValueError:
  667. setting = line
  668. value = b"true"
  669. setting = setting.strip()
  670. if not _check_variable_name(setting):
  671. raise ValueError(f"invalid variable name {setting!r}")
  672. if _is_line_continuation(value):
  673. if value.endswith(b"\\\r\n"):
  674. continuation = value[:-3]
  675. else:
  676. continuation = value[:-2]
  677. else:
  678. continuation = None
  679. value = _parse_string(value)
  680. ret._values[section][setting] = value
  681. # Process include/includeIf directives
  682. ret._handle_include_directive(
  683. section,
  684. setting,
  685. value,
  686. config_dir=config_dir,
  687. include_depth=include_depth,
  688. max_include_depth=max_include_depth,
  689. file_opener=file_opener,
  690. condition_matchers=condition_matchers,
  691. )
  692. setting = None
  693. else: # continuation line
  694. assert continuation is not None
  695. if _is_line_continuation(line):
  696. if line.endswith(b"\\\r\n"):
  697. continuation += line[:-3]
  698. else:
  699. continuation += line[:-2]
  700. else:
  701. continuation += line
  702. value = _parse_string(continuation)
  703. assert section is not None # Already checked above
  704. ret._values[section][setting] = value
  705. # Process include/includeIf directives
  706. ret._handle_include_directive(
  707. section,
  708. setting,
  709. value,
  710. config_dir=config_dir,
  711. include_depth=include_depth,
  712. max_include_depth=max_include_depth,
  713. file_opener=file_opener,
  714. condition_matchers=condition_matchers,
  715. )
  716. continuation = None
  717. setting = None
  718. return ret
  719. def _handle_include_directive(
  720. self,
  721. section: Optional[Section],
  722. setting: bytes,
  723. value: bytes,
  724. *,
  725. config_dir: Optional[str],
  726. include_depth: int,
  727. max_include_depth: int,
  728. file_opener: Optional[FileOpener],
  729. condition_matchers: Optional[dict[str, ConditionMatcher]],
  730. ) -> None:
  731. """Handle include/includeIf directives during config parsing."""
  732. if (
  733. section is not None
  734. and setting == b"path"
  735. and (
  736. section[0].lower() == b"include"
  737. or (len(section) > 1 and section[0].lower() == b"includeif")
  738. )
  739. ):
  740. self._process_include(
  741. section,
  742. value,
  743. config_dir=config_dir,
  744. include_depth=include_depth,
  745. max_include_depth=max_include_depth,
  746. file_opener=file_opener,
  747. condition_matchers=condition_matchers,
  748. )
  749. def _process_include(
  750. self,
  751. section: Section,
  752. path_value: bytes,
  753. *,
  754. config_dir: Optional[str],
  755. include_depth: int,
  756. max_include_depth: int,
  757. file_opener: Optional[FileOpener],
  758. condition_matchers: Optional[dict[str, ConditionMatcher]],
  759. ) -> None:
  760. """Process an include or includeIf directive."""
  761. path_str = path_value.decode(self.encoding, errors="replace")
  762. # Handle includeIf conditions
  763. if len(section) > 1 and section[0].lower() == b"includeif":
  764. condition = section[1].decode(self.encoding, errors="replace")
  765. if not self._evaluate_includeif_condition(
  766. condition, config_dir, condition_matchers
  767. ):
  768. return
  769. # Resolve the include path
  770. include_path = self._resolve_include_path(path_str, config_dir)
  771. if not include_path:
  772. return
  773. # Check for circular includes
  774. try:
  775. abs_path = str(Path(include_path).resolve())
  776. except (OSError, ValueError) as e:
  777. # Invalid path - log and skip
  778. logger.debug("Invalid include path %r: %s", include_path, e)
  779. return
  780. if abs_path in self._included_paths:
  781. return
  782. # Load and merge the included file
  783. try:
  784. # Use provided file opener or default to GitFile
  785. opener: FileOpener
  786. if file_opener is None:
  787. def opener(path: Union[str, os.PathLike]) -> BinaryIO:
  788. return cast(BinaryIO, GitFile(path, "rb"))
  789. else:
  790. opener = file_opener
  791. f = opener(include_path)
  792. except (OSError, ValueError) as e:
  793. # Git silently ignores missing or unreadable include files
  794. # Log for debugging purposes
  795. logger.debug("Invalid include path %r: %s", include_path, e)
  796. else:
  797. with f as included_file:
  798. # Track this path to prevent cycles
  799. self._included_paths.add(abs_path)
  800. # Parse the included file
  801. included_config = ConfigFile.from_file(
  802. included_file,
  803. config_dir=os.path.dirname(include_path),
  804. included_paths=self._included_paths,
  805. include_depth=include_depth + 1,
  806. max_include_depth=max_include_depth,
  807. file_opener=file_opener,
  808. condition_matchers=condition_matchers,
  809. )
  810. # Merge the included configuration
  811. self._merge_config(included_config)
  812. def _merge_config(self, other: "ConfigFile") -> None:
  813. """Merge another config file into this one."""
  814. for section, values in other._values.items():
  815. if section not in self._values:
  816. self._values[section] = CaseInsensitiveOrderedMultiDict()
  817. for key, value in values.items():
  818. self._values[section][key] = value
  819. def _resolve_include_path(
  820. self, path: str, config_dir: Optional[str]
  821. ) -> Optional[str]:
  822. """Resolve an include path to an absolute path."""
  823. # Expand ~ to home directory
  824. path = os.path.expanduser(path)
  825. # If path is relative and we have a config directory, make it relative to that
  826. if not os.path.isabs(path) and config_dir:
  827. path = os.path.join(config_dir, path)
  828. return path
  829. def _evaluate_includeif_condition(
  830. self,
  831. condition: str,
  832. config_dir: Optional[str] = None,
  833. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  834. ) -> bool:
  835. """Evaluate an includeIf condition."""
  836. # Try custom matchers first if provided
  837. if condition_matchers:
  838. for prefix, matcher in condition_matchers.items():
  839. if condition.startswith(prefix):
  840. return matcher(condition[len(prefix) :])
  841. # Fall back to built-in matchers
  842. if condition.startswith("hasconfig:"):
  843. return self._evaluate_hasconfig_condition(condition[10:])
  844. else:
  845. # Unknown condition type - log and ignore (Git behavior)
  846. logger.debug("Unknown includeIf condition: %r", condition)
  847. return False
  848. def _evaluate_hasconfig_condition(self, condition: str) -> bool:
  849. """Evaluate a hasconfig condition.
  850. Format: hasconfig:config.key:pattern
  851. Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
  852. """
  853. # Split on the first colon to separate config key from pattern
  854. parts = condition.split(":", 1)
  855. if len(parts) != 2:
  856. logger.debug("Invalid hasconfig condition format: %r", condition)
  857. return False
  858. config_key, pattern = parts
  859. # Parse the config key to get section and name
  860. key_parts = config_key.split(".", 2)
  861. if len(key_parts) < 2:
  862. logger.debug("Invalid hasconfig config key: %r", config_key)
  863. return False
  864. # Handle wildcards in section names (e.g., remote.*)
  865. if len(key_parts) == 3 and key_parts[1] == "*":
  866. # Match any subsection
  867. section_prefix = key_parts[0].encode(self.encoding)
  868. name = key_parts[2].encode(self.encoding)
  869. # Check all sections that match the pattern
  870. for section in self.sections():
  871. if len(section) == 2 and section[0] == section_prefix:
  872. try:
  873. values = list(self.get_multivar(section, name))
  874. for value in values:
  875. if self._match_hasconfig_pattern(value, pattern):
  876. return True
  877. except KeyError:
  878. continue
  879. else:
  880. # Direct section lookup
  881. if len(key_parts) == 2:
  882. section = (key_parts[0].encode(self.encoding),)
  883. name = key_parts[1].encode(self.encoding)
  884. else:
  885. section = (
  886. key_parts[0].encode(self.encoding),
  887. key_parts[1].encode(self.encoding),
  888. )
  889. name = key_parts[2].encode(self.encoding)
  890. try:
  891. values = list(self.get_multivar(section, name))
  892. for value in values:
  893. if self._match_hasconfig_pattern(value, pattern):
  894. return True
  895. except KeyError:
  896. pass
  897. return False
  898. def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
  899. """Match a config value against a hasconfig pattern.
  900. Supports simple glob patterns like ``*`` and ``**``.
  901. """
  902. value_str = value.decode(self.encoding, errors="replace")
  903. return match_glob_pattern(value_str, pattern)
  904. @classmethod
  905. def from_path(
  906. cls,
  907. path: Union[str, os.PathLike],
  908. *,
  909. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  910. file_opener: Optional[FileOpener] = None,
  911. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  912. ) -> "ConfigFile":
  913. """Read configuration from a file on disk.
  914. Args:
  915. path: Path to the configuration file
  916. max_include_depth: Maximum allowed include depth
  917. file_opener: Optional callback to open included files
  918. condition_matchers: Optional dict of condition matchers for includeIf
  919. """
  920. abs_path = os.fspath(path)
  921. config_dir = os.path.dirname(abs_path)
  922. # Use provided file opener or default to GitFile
  923. opener: FileOpener
  924. if file_opener is None:
  925. def opener(p: Union[str, os.PathLike]) -> BinaryIO:
  926. return cast(BinaryIO, GitFile(p, "rb"))
  927. else:
  928. opener = file_opener
  929. with opener(abs_path) as f:
  930. ret = cls.from_file(
  931. f,
  932. config_dir=config_dir,
  933. max_include_depth=max_include_depth,
  934. file_opener=file_opener,
  935. condition_matchers=condition_matchers,
  936. )
  937. ret.path = abs_path
  938. return ret
  939. def write_to_path(self, path: Optional[Union[str, os.PathLike]] = None) -> None:
  940. """Write configuration to a file on disk."""
  941. if path is None:
  942. if self.path is None:
  943. raise ValueError("No path specified and no default path available")
  944. path_to_use: Union[str, os.PathLike] = self.path
  945. else:
  946. path_to_use = path
  947. with GitFile(path_to_use, "wb") as f:
  948. self.write_to_file(f)
  949. def write_to_file(self, f: Union[IO[bytes], _GitFile]) -> None:
  950. """Write configuration to a file-like object."""
  951. for section, values in self._values.items():
  952. try:
  953. section_name, subsection_name = section
  954. except ValueError:
  955. (section_name,) = section
  956. subsection_name = None
  957. if subsection_name is None:
  958. f.write(b"[" + section_name + b"]\n")
  959. else:
  960. f.write(b"[" + section_name + b' "' + subsection_name + b'"]\n')
  961. for key, value in values.items():
  962. value = _format_string(value)
  963. f.write(b"\t" + key + b" = " + value + b"\n")
  964. def get_xdg_config_home_path(*path_segments: str) -> str:
  965. xdg_config_home = os.environ.get(
  966. "XDG_CONFIG_HOME",
  967. os.path.expanduser("~/.config/"),
  968. )
  969. return os.path.join(xdg_config_home, *path_segments)
  970. def _find_git_in_win_path() -> Iterator[str]:
  971. for exe in ("git.exe", "git.cmd"):
  972. for path in os.environ.get("PATH", "").split(";"):
  973. if os.path.exists(os.path.join(path, exe)):
  974. # in windows native shells (powershell/cmd) exe path is
  975. # .../Git/bin/git.exe or .../Git/cmd/git.exe
  976. #
  977. # in git-bash exe path is .../Git/mingw64/bin/git.exe
  978. git_dir, _bin_dir = os.path.split(path)
  979. yield git_dir
  980. parent_dir, basename = os.path.split(git_dir)
  981. if basename == "mingw32" or basename == "mingw64":
  982. yield parent_dir
  983. break
  984. def _find_git_in_win_reg() -> Iterator[str]:
  985. import platform
  986. import winreg
  987. if platform.machine() == "AMD64":
  988. subkey = (
  989. "SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\"
  990. "CurrentVersion\\Uninstall\\Git_is1"
  991. )
  992. else:
  993. subkey = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"
  994. for key in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE): # type: ignore
  995. with suppress(OSError):
  996. with winreg.OpenKey(key, subkey) as k: # type: ignore
  997. val, typ = winreg.QueryValueEx(k, "InstallLocation") # type: ignore
  998. if typ == winreg.REG_SZ: # type: ignore
  999. yield val
  1000. # There is no set standard for system config dirs on windows. We try the
  1001. # following:
  1002. # - %PROGRAMDATA%/Git/config - (deprecated) Windows config dir per CGit docs
  1003. # - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
  1004. # Used if CGit installation (Git/bin/git.exe) is found in PATH in the
  1005. # system registry
  1006. def get_win_system_paths() -> Iterator[str]:
  1007. if "PROGRAMDATA" in os.environ:
  1008. yield os.path.join(os.environ["PROGRAMDATA"], "Git", "config")
  1009. for git_dir in _find_git_in_win_path():
  1010. yield os.path.join(git_dir, "etc", "gitconfig")
  1011. for git_dir in _find_git_in_win_reg():
  1012. yield os.path.join(git_dir, "etc", "gitconfig")
  1013. class StackedConfig(Config):
  1014. """Configuration which reads from multiple config files.."""
  1015. def __init__(
  1016. self, backends: list[ConfigFile], writable: Optional[ConfigFile] = None
  1017. ) -> None:
  1018. self.backends = backends
  1019. self.writable = writable
  1020. def __repr__(self) -> str:
  1021. return f"<{self.__class__.__name__} for {self.backends!r}>"
  1022. @classmethod
  1023. def default(cls) -> "StackedConfig":
  1024. return cls(cls.default_backends())
  1025. @classmethod
  1026. def default_backends(cls) -> list[ConfigFile]:
  1027. """Retrieve the default configuration.
  1028. See git-config(1) for details on the files searched.
  1029. """
  1030. paths = []
  1031. # Handle GIT_CONFIG_GLOBAL - overrides user config paths
  1032. try:
  1033. paths.append(os.environ["GIT_CONFIG_GLOBAL"])
  1034. except KeyError:
  1035. paths.append(os.path.expanduser("~/.gitconfig"))
  1036. paths.append(get_xdg_config_home_path("git", "config"))
  1037. # Handle GIT_CONFIG_SYSTEM and GIT_CONFIG_NOSYSTEM
  1038. try:
  1039. paths.append(os.environ["GIT_CONFIG_SYSTEM"])
  1040. except KeyError:
  1041. if "GIT_CONFIG_NOSYSTEM" not in os.environ:
  1042. paths.append("/etc/gitconfig")
  1043. if sys.platform == "win32":
  1044. paths.extend(get_win_system_paths())
  1045. backends = []
  1046. for path in paths:
  1047. try:
  1048. cf = ConfigFile.from_path(path)
  1049. except FileNotFoundError:
  1050. continue
  1051. backends.append(cf)
  1052. return backends
  1053. def get(self, section: SectionLike, name: NameLike) -> Value:
  1054. if not isinstance(section, tuple):
  1055. section = (section,)
  1056. for backend in self.backends:
  1057. try:
  1058. return backend.get(section, name)
  1059. except KeyError:
  1060. pass
  1061. raise KeyError(name)
  1062. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  1063. if not isinstance(section, tuple):
  1064. section = (section,)
  1065. for backend in self.backends:
  1066. try:
  1067. yield from backend.get_multivar(section, name)
  1068. except KeyError:
  1069. pass
  1070. def set(
  1071. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  1072. ) -> None:
  1073. if self.writable is None:
  1074. raise NotImplementedError(self.set)
  1075. return self.writable.set(section, name, value)
  1076. def sections(self) -> Iterator[Section]:
  1077. seen = set()
  1078. for backend in self.backends:
  1079. for section in backend.sections():
  1080. if section not in seen:
  1081. seen.add(section)
  1082. yield section
  1083. def read_submodules(
  1084. path: Union[str, os.PathLike],
  1085. ) -> Iterator[tuple[bytes, bytes, bytes]]:
  1086. """Read a .gitmodules file."""
  1087. cfg = ConfigFile.from_path(path)
  1088. return parse_submodules(cfg)
  1089. def parse_submodules(config: ConfigFile) -> Iterator[tuple[bytes, bytes, bytes]]:
  1090. """Parse a gitmodules GitConfig file, returning submodules.
  1091. Args:
  1092. config: A `ConfigFile`
  1093. Returns:
  1094. list of tuples (submodule path, url, name),
  1095. where name is quoted part of the section's name.
  1096. """
  1097. for section in config.sections():
  1098. section_kind, section_name = section
  1099. if section_kind == b"submodule":
  1100. try:
  1101. sm_path = config.get(section, b"path")
  1102. sm_url = config.get(section, b"url")
  1103. yield (sm_path, sm_url, section_name)
  1104. except KeyError:
  1105. # If either path or url is missing, just ignore this
  1106. # submodule entry and move on to the next one. This is
  1107. # how git itself handles malformed .gitmodule entries.
  1108. pass
  1109. def iter_instead_of(config: Config, push: bool = False) -> Iterable[tuple[str, str]]:
  1110. """Iterate over insteadOf / pushInsteadOf values."""
  1111. for section in config.sections():
  1112. if section[0] != b"url":
  1113. continue
  1114. replacement = section[1]
  1115. try:
  1116. needles = list(config.get_multivar(section, "insteadOf"))
  1117. except KeyError:
  1118. needles = []
  1119. if push:
  1120. try:
  1121. needles += list(config.get_multivar(section, "pushInsteadOf"))
  1122. except KeyError:
  1123. pass
  1124. for needle in needles:
  1125. assert isinstance(needle, bytes)
  1126. yield needle.decode("utf-8"), replacement.decode("utf-8")
  1127. def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
  1128. """Apply insteadOf / pushInsteadOf to a URL."""
  1129. longest_needle = ""
  1130. updated_url = orig_url
  1131. for needle, replacement in iter_instead_of(config, push):
  1132. if not orig_url.startswith(needle):
  1133. continue
  1134. if len(longest_needle) < len(needle):
  1135. longest_needle = needle
  1136. updated_url = replacement + orig_url[len(needle) :]
  1137. return updated_url