config.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328
  1. # config.py - Reading and writing Git config files
  2. # Copyright (C) 2011-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as public by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Reading and writing Git configuration files.
  22. Todo:
  23. * preserve formatting when updating configuration files
  24. """
  25. import logging
  26. import os
  27. import re
  28. import sys
  29. from collections.abc import (
  30. ItemsView,
  31. Iterable,
  32. Iterator,
  33. KeysView,
  34. MutableMapping,
  35. ValuesView,
  36. )
  37. from contextlib import suppress
  38. from pathlib import Path
  39. from typing import (
  40. Any,
  41. BinaryIO,
  42. Callable,
  43. Generic,
  44. Optional,
  45. TypeVar,
  46. Union,
  47. overload,
  48. )
  49. from .file import GitFile
  50. ConfigKey = Union[str, bytes, tuple[Union[str, bytes], ...]]
  51. ConfigValue = Union[str, bytes, bool, int]
  52. logger = logging.getLogger(__name__)
  53. # Type for file opener callback
  54. FileOpener = Callable[[Union[str, os.PathLike]], BinaryIO]
  55. # Type for includeIf condition matcher
  56. # Takes the condition value (e.g., "main" for onbranch:main) and returns bool
  57. ConditionMatcher = Callable[[str], bool]
  58. # Security limits for include files
  59. MAX_INCLUDE_FILE_SIZE = 1024 * 1024 # 1MB max for included config files
  60. DEFAULT_MAX_INCLUDE_DEPTH = 10 # Maximum recursion depth for includes
  61. def _match_gitdir_pattern(
  62. path: bytes, pattern: bytes, ignorecase: bool = False
  63. ) -> bool:
  64. """Simple gitdir pattern matching for includeIf conditions.
  65. This handles the basic gitdir patterns used in includeIf directives.
  66. """
  67. # Convert to strings for easier manipulation
  68. path_str = path.decode("utf-8", errors="replace")
  69. pattern_str = pattern.decode("utf-8", errors="replace")
  70. # Normalize paths to use forward slashes for consistent matching
  71. path_str = path_str.replace("\\", "/")
  72. pattern_str = pattern_str.replace("\\", "/")
  73. if ignorecase:
  74. path_str = path_str.lower()
  75. pattern_str = pattern_str.lower()
  76. # Handle the common cases for gitdir patterns
  77. if pattern_str.startswith("**/") and pattern_str.endswith("/**"):
  78. # Pattern like **/dirname/** should match any path containing dirname
  79. dirname = pattern_str[3:-3] # Remove **/ and /**
  80. # Check if path contains the directory name as a path component
  81. return ("/" + dirname + "/") in path_str or path_str.endswith("/" + dirname)
  82. elif pattern_str.startswith("**/"):
  83. # Pattern like **/filename
  84. suffix = pattern_str[3:] # Remove **/
  85. return suffix in path_str or path_str.endswith("/" + suffix)
  86. elif pattern_str.endswith("/**"):
  87. # Pattern like /path/to/dir/** should match /path/to/dir and any subdirectory
  88. base_pattern = pattern_str[:-3] # Remove /**
  89. return path_str == base_pattern or path_str.startswith(base_pattern + "/")
  90. elif "**" in pattern_str:
  91. # Handle patterns with ** in the middle
  92. parts = pattern_str.split("**")
  93. if len(parts) == 2:
  94. prefix, suffix = parts
  95. # Path must start with prefix and end with suffix (if any)
  96. if prefix and not path_str.startswith(prefix):
  97. return False
  98. if suffix and not path_str.endswith(suffix):
  99. return False
  100. return True
  101. # Direct match or simple glob pattern
  102. if "*" in pattern_str or "?" in pattern_str or "[" in pattern_str:
  103. import fnmatch
  104. return fnmatch.fnmatch(path_str, pattern_str)
  105. else:
  106. return path_str == pattern_str
  107. def match_glob_pattern(value: str, pattern: str) -> bool:
  108. r"""Match a value against a glob pattern.
  109. Supports simple glob patterns like ``*`` and ``**``.
  110. Raises:
  111. ValueError: If the pattern is invalid
  112. """
  113. # Convert glob pattern to regex
  114. pattern_escaped = re.escape(pattern)
  115. # Replace escaped \*\* with .* (match anything)
  116. pattern_escaped = pattern_escaped.replace(r"\*\*", ".*")
  117. # Replace escaped \* with [^/]* (match anything except /)
  118. pattern_escaped = pattern_escaped.replace(r"\*", "[^/]*")
  119. # Anchor the pattern
  120. pattern_regex = f"^{pattern_escaped}$"
  121. try:
  122. return bool(re.match(pattern_regex, value))
  123. except re.error as e:
  124. raise ValueError(f"Invalid glob pattern {pattern!r}: {e}")
  125. def lower_key(key: ConfigKey) -> ConfigKey:
  126. if isinstance(key, (bytes, str)):
  127. return key.lower()
  128. if isinstance(key, tuple):
  129. # For config sections, only lowercase the section name (first element)
  130. # but preserve the case of subsection names (remaining elements)
  131. if len(key) > 0:
  132. first = key[0]
  133. assert isinstance(first, (bytes, str))
  134. return (first.lower(), *key[1:])
  135. return key
  136. raise TypeError(key)
  137. K = TypeVar("K", bound=ConfigKey) # Key type must be ConfigKey
  138. V = TypeVar("V") # Value type
  139. _T = TypeVar("_T") # For get() default parameter
  140. class CaseInsensitiveOrderedMultiDict(MutableMapping[K, V], Generic[K, V]):
  141. def __init__(self, default_factory: Optional[Callable[[], V]] = None) -> None:
  142. self._real: list[tuple[K, V]] = []
  143. self._keyed: dict[Any, V] = {}
  144. self._default_factory = default_factory
  145. @classmethod
  146. def make(
  147. cls, dict_in=None, default_factory=None
  148. ) -> "CaseInsensitiveOrderedMultiDict[K, V]":
  149. if isinstance(dict_in, cls):
  150. return dict_in
  151. out = cls(default_factory=default_factory)
  152. if dict_in is None:
  153. return out
  154. if not isinstance(dict_in, MutableMapping):
  155. raise TypeError
  156. for key, value in dict_in.items():
  157. out[key] = value
  158. return out
  159. def __len__(self) -> int:
  160. return len(self._keyed)
  161. def keys(self) -> KeysView[K]:
  162. return self._keyed.keys() # type: ignore[return-value]
  163. def items(self) -> ItemsView[K, V]:
  164. # Return a view that iterates over the real list to preserve order
  165. class OrderedItemsView(ItemsView[K, V]):
  166. def __init__(self, mapping: CaseInsensitiveOrderedMultiDict[K, V]):
  167. self._mapping = mapping
  168. def __iter__(self) -> Iterator[tuple[K, V]]:
  169. return iter(self._mapping._real)
  170. def __len__(self) -> int:
  171. return len(self._mapping._real)
  172. def __contains__(self, item: object) -> bool:
  173. if not isinstance(item, tuple) or len(item) != 2:
  174. return False
  175. key, value = item
  176. return any(k == key and v == value for k, v in self._mapping._real)
  177. return OrderedItemsView(self)
  178. def __iter__(self) -> Iterator[K]:
  179. return iter(self._keyed)
  180. def values(self) -> ValuesView[V]:
  181. return self._keyed.values()
  182. def __setitem__(self, key, value) -> None:
  183. self._real.append((key, value))
  184. self._keyed[lower_key(key)] = value
  185. def set(self, key, value) -> None:
  186. # This method replaces all existing values for the key
  187. lower = lower_key(key)
  188. self._real = [(k, v) for k, v in self._real if lower_key(k) != lower]
  189. self._real.append((key, value))
  190. self._keyed[lower] = value
  191. def __delitem__(self, key) -> None:
  192. key = lower_key(key)
  193. del self._keyed[key]
  194. for i, (actual, unused_value) in reversed(list(enumerate(self._real))):
  195. if lower_key(actual) == key:
  196. del self._real[i]
  197. def __getitem__(self, item: K) -> V:
  198. return self._keyed[lower_key(item)]
  199. def get(self, key: K, /, default: Union[V, _T, None] = None) -> Union[V, _T, None]: # type: ignore[override]
  200. try:
  201. return self[key]
  202. except KeyError:
  203. if default is not None:
  204. return default
  205. elif self._default_factory is not None:
  206. return self._default_factory()
  207. else:
  208. return None
  209. def get_all(self, key: K) -> Iterator[V]:
  210. lowered_key = lower_key(key)
  211. for actual, value in self._real:
  212. if lower_key(actual) == lowered_key:
  213. yield value
  214. def setdefault(self, key: K, default: Optional[V] = None) -> V:
  215. try:
  216. return self[key]
  217. except KeyError:
  218. if default is not None:
  219. self[key] = default
  220. return default
  221. elif self._default_factory is not None:
  222. value = self._default_factory()
  223. self[key] = value
  224. return value
  225. else:
  226. raise
  227. Name = bytes
  228. NameLike = Union[bytes, str]
  229. Section = tuple[bytes, ...]
  230. SectionLike = Union[bytes, str, tuple[Union[bytes, str], ...]]
  231. Value = bytes
  232. ValueLike = Union[bytes, str]
  233. class Config:
  234. """A Git configuration."""
  235. def get(self, section: SectionLike, name: NameLike) -> Value:
  236. """Retrieve the contents of a configuration setting.
  237. Args:
  238. section: Tuple with section name and optional subsection name
  239. name: Variable name
  240. Returns:
  241. Contents of the setting
  242. Raises:
  243. KeyError: if the value is not set
  244. """
  245. raise NotImplementedError(self.get)
  246. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  247. """Retrieve the contents of a multivar configuration setting.
  248. Args:
  249. section: Tuple with section name and optional subsection namee
  250. name: Variable name
  251. Returns:
  252. Contents of the setting as iterable
  253. Raises:
  254. KeyError: if the value is not set
  255. """
  256. raise NotImplementedError(self.get_multivar)
  257. @overload
  258. def get_boolean(
  259. self, section: SectionLike, name: NameLike, default: bool
  260. ) -> bool: ...
  261. @overload
  262. def get_boolean(self, section: SectionLike, name: NameLike) -> Optional[bool]: ...
  263. def get_boolean(
  264. self, section: SectionLike, name: NameLike, default: Optional[bool] = None
  265. ) -> Optional[bool]:
  266. """Retrieve a configuration setting as boolean.
  267. Args:
  268. section: Tuple with section name and optional subsection name
  269. name: Name of the setting, including section and possible
  270. subsection.
  271. Returns:
  272. Contents of the setting
  273. """
  274. try:
  275. value = self.get(section, name)
  276. except KeyError:
  277. return default
  278. if value.lower() == b"true":
  279. return True
  280. elif value.lower() == b"false":
  281. return False
  282. raise ValueError(f"not a valid boolean string: {value!r}")
  283. def set(
  284. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  285. ) -> None:
  286. """Set a configuration value.
  287. Args:
  288. section: Tuple with section name and optional subsection namee
  289. name: Name of the configuration value, including section
  290. and optional subsection
  291. value: value of the setting
  292. """
  293. raise NotImplementedError(self.set)
  294. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  295. """Iterate over the configuration pairs for a specific section.
  296. Args:
  297. section: Tuple with section name and optional subsection namee
  298. Returns:
  299. Iterator over (name, value) pairs
  300. """
  301. raise NotImplementedError(self.items)
  302. def sections(self) -> Iterator[Section]:
  303. """Iterate over the sections.
  304. Returns: Iterator over section tuples
  305. """
  306. raise NotImplementedError(self.sections)
  307. def has_section(self, name: Section) -> bool:
  308. """Check if a specified section exists.
  309. Args:
  310. name: Name of section to check for
  311. Returns:
  312. boolean indicating whether the section exists
  313. """
  314. return name in self.sections()
  315. class ConfigDict(Config):
  316. """Git configuration stored in a dictionary."""
  317. def __init__(
  318. self,
  319. values: Union[
  320. MutableMapping[Section, MutableMapping[Name, Value]], None
  321. ] = None,
  322. encoding: Union[str, None] = None,
  323. ) -> None:
  324. """Create a new ConfigDict."""
  325. if encoding is None:
  326. encoding = sys.getdefaultencoding()
  327. self.encoding = encoding
  328. self._values: CaseInsensitiveOrderedMultiDict[
  329. Section, CaseInsensitiveOrderedMultiDict[Name, Value]
  330. ] = CaseInsensitiveOrderedMultiDict.make(
  331. values, default_factory=CaseInsensitiveOrderedMultiDict
  332. )
  333. def __repr__(self) -> str:
  334. return f"{self.__class__.__name__}({self._values!r})"
  335. def __eq__(self, other: object) -> bool:
  336. return isinstance(other, self.__class__) and other._values == self._values
  337. def __getitem__(self, key: Section) -> CaseInsensitiveOrderedMultiDict[Name, Value]:
  338. return self._values.__getitem__(key)
  339. def __setitem__(self, key: Section, value: MutableMapping[Name, Value]) -> None:
  340. return self._values.__setitem__(key, value)
  341. def __delitem__(self, key: Section) -> None:
  342. return self._values.__delitem__(key)
  343. def __iter__(self) -> Iterator[Section]:
  344. return self._values.__iter__()
  345. def __len__(self) -> int:
  346. return self._values.__len__()
  347. def keys(self) -> KeysView[Section]:
  348. return self._values.keys()
  349. @classmethod
  350. def _parse_setting(cls, name: str) -> tuple[str, Optional[str], str]:
  351. parts = name.split(".")
  352. if len(parts) == 3:
  353. return (parts[0], parts[1], parts[2])
  354. else:
  355. return (parts[0], None, parts[1])
  356. def _check_section_and_name(
  357. self, section: SectionLike, name: NameLike
  358. ) -> tuple[Section, Name]:
  359. if not isinstance(section, tuple):
  360. section = (section,)
  361. checked_section = tuple(
  362. [
  363. subsection.encode(self.encoding)
  364. if not isinstance(subsection, bytes)
  365. else subsection
  366. for subsection in section
  367. ]
  368. )
  369. if not isinstance(name, bytes):
  370. name = name.encode(self.encoding)
  371. return checked_section, name
  372. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  373. section, name = self._check_section_and_name(section, name)
  374. if len(section) > 1:
  375. try:
  376. return self._values[section].get_all(name)
  377. except KeyError:
  378. pass
  379. return self._values[(section[0],)].get_all(name)
  380. def get(
  381. self,
  382. section: SectionLike,
  383. name: NameLike,
  384. ) -> Value:
  385. section, name = self._check_section_and_name(section, name)
  386. if len(section) > 1:
  387. try:
  388. return self._values[section][name]
  389. except KeyError:
  390. pass
  391. return self._values[(section[0],)][name]
  392. def set(
  393. self,
  394. section: SectionLike,
  395. name: NameLike,
  396. value: Union[ValueLike, bool],
  397. ) -> None:
  398. section, name = self._check_section_and_name(section, name)
  399. if isinstance(value, bool):
  400. value = b"true" if value else b"false"
  401. if not isinstance(value, bytes):
  402. value = value.encode(self.encoding)
  403. section_dict = self._values.setdefault(section)
  404. if hasattr(section_dict, "set"):
  405. section_dict.set(name, value)
  406. else:
  407. section_dict[name] = value
  408. def add(
  409. self,
  410. section: SectionLike,
  411. name: NameLike,
  412. value: Union[ValueLike, bool],
  413. ) -> None:
  414. """Add a value to a configuration setting, creating a multivar if needed."""
  415. section, name = self._check_section_and_name(section, name)
  416. if isinstance(value, bool):
  417. value = b"true" if value else b"false"
  418. if not isinstance(value, bytes):
  419. value = value.encode(self.encoding)
  420. self._values.setdefault(section)[name] = value
  421. def items(self, section: SectionLike) -> Iterator[tuple[Name, Value]]:
  422. section_bytes, _ = self._check_section_and_name(section, b"")
  423. section_dict = self._values.get(section_bytes)
  424. if section_dict is not None:
  425. return iter(section_dict.items())
  426. return iter([])
  427. def sections(self) -> Iterator[Section]:
  428. return iter(self._values.keys())
  429. def _format_string(value: bytes) -> bytes:
  430. if (
  431. value.startswith((b" ", b"\t"))
  432. or value.endswith((b" ", b"\t"))
  433. or b"#" in value
  434. ):
  435. return b'"' + _escape_value(value) + b'"'
  436. else:
  437. return _escape_value(value)
  438. _ESCAPE_TABLE = {
  439. ord(b"\\"): ord(b"\\"),
  440. ord(b'"'): ord(b'"'),
  441. ord(b"n"): ord(b"\n"),
  442. ord(b"t"): ord(b"\t"),
  443. ord(b"b"): ord(b"\b"),
  444. }
  445. _COMMENT_CHARS = [ord(b"#"), ord(b";")]
  446. _WHITESPACE_CHARS = [ord(b"\t"), ord(b" ")]
  447. def _parse_string(value: bytes) -> bytes:
  448. value = bytearray(value.strip())
  449. ret = bytearray()
  450. whitespace = bytearray()
  451. in_quotes = False
  452. i = 0
  453. while i < len(value):
  454. c = value[i]
  455. if c == ord(b"\\"):
  456. i += 1
  457. if i >= len(value):
  458. # Backslash at end of string - treat as literal backslash
  459. if whitespace:
  460. ret.extend(whitespace)
  461. whitespace = bytearray()
  462. ret.append(ord(b"\\"))
  463. else:
  464. try:
  465. v = _ESCAPE_TABLE[value[i]]
  466. if whitespace:
  467. ret.extend(whitespace)
  468. whitespace = bytearray()
  469. ret.append(v)
  470. except KeyError:
  471. # Unknown escape sequence - treat backslash as literal and process next char normally
  472. if whitespace:
  473. ret.extend(whitespace)
  474. whitespace = bytearray()
  475. ret.append(ord(b"\\"))
  476. i -= 1 # Reprocess the character after the backslash
  477. elif c == ord(b'"'):
  478. in_quotes = not in_quotes
  479. elif c in _COMMENT_CHARS and not in_quotes:
  480. # the rest of the line is a comment
  481. break
  482. elif c in _WHITESPACE_CHARS:
  483. whitespace.append(c)
  484. else:
  485. if whitespace:
  486. ret.extend(whitespace)
  487. whitespace = bytearray()
  488. ret.append(c)
  489. i += 1
  490. if in_quotes:
  491. raise ValueError("missing end quote")
  492. return bytes(ret)
  493. def _escape_value(value: bytes) -> bytes:
  494. """Escape a value."""
  495. value = value.replace(b"\\", b"\\\\")
  496. value = value.replace(b"\r", b"\\r")
  497. value = value.replace(b"\n", b"\\n")
  498. value = value.replace(b"\t", b"\\t")
  499. value = value.replace(b'"', b'\\"')
  500. return value
  501. def _check_variable_name(name: bytes) -> bool:
  502. for i in range(len(name)):
  503. c = name[i : i + 1]
  504. if not c.isalnum() and c != b"-":
  505. return False
  506. return True
  507. def _check_section_name(name: bytes) -> bool:
  508. for i in range(len(name)):
  509. c = name[i : i + 1]
  510. if not c.isalnum() and c not in (b"-", b"."):
  511. return False
  512. return True
  513. def _strip_comments(line: bytes) -> bytes:
  514. comment_bytes = {ord(b"#"), ord(b";")}
  515. quote = ord(b'"')
  516. string_open = False
  517. # Normalize line to bytearray for simple 2/3 compatibility
  518. for i, character in enumerate(bytearray(line)):
  519. # Comment characters outside balanced quotes denote comment start
  520. if character == quote:
  521. string_open = not string_open
  522. elif not string_open and character in comment_bytes:
  523. return line[:i]
  524. return line
  525. def _is_line_continuation(value: bytes) -> bool:
  526. """Check if a value ends with a line continuation backslash.
  527. A line continuation occurs when a line ends with a backslash that is:
  528. 1. Not escaped (not preceded by another backslash)
  529. 2. Not within quotes
  530. Args:
  531. value: The value to check
  532. Returns:
  533. True if the value ends with a line continuation backslash
  534. """
  535. if not value.endswith((b"\\\n", b"\\\r\n")):
  536. return False
  537. # Remove only the newline characters, keep the content including the backslash
  538. if value.endswith(b"\\\r\n"):
  539. content = value[:-2] # Remove \r\n, keep the \
  540. else:
  541. content = value[:-1] # Remove \n, keep the \
  542. if not content.endswith(b"\\"):
  543. return False
  544. # Count consecutive backslashes at the end
  545. backslash_count = 0
  546. for i in range(len(content) - 1, -1, -1):
  547. if content[i : i + 1] == b"\\":
  548. backslash_count += 1
  549. else:
  550. break
  551. # If we have an odd number of backslashes, the last one is a line continuation
  552. # If we have an even number, they are all escaped and there's no continuation
  553. return backslash_count % 2 == 1
  554. def _parse_section_header_line(line: bytes) -> tuple[Section, bytes]:
  555. # Parse section header ("[bla]")
  556. line = _strip_comments(line).rstrip()
  557. in_quotes = False
  558. escaped = False
  559. for i, c in enumerate(line):
  560. if escaped:
  561. escaped = False
  562. continue
  563. if c == ord(b'"'):
  564. in_quotes = not in_quotes
  565. if c == ord(b"\\"):
  566. escaped = True
  567. if c == ord(b"]") and not in_quotes:
  568. last = i
  569. break
  570. else:
  571. raise ValueError("expected trailing ]")
  572. pts = line[1:last].split(b" ", 1)
  573. line = line[last + 1 :]
  574. section: Section
  575. if len(pts) == 2:
  576. # Handle subsections - Git allows more complex syntax for certain sections like includeIf
  577. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  578. # Standard quoted subsection
  579. pts[1] = pts[1][1:-1]
  580. elif pts[0] == b"includeIf":
  581. # Special handling for includeIf sections which can have complex conditions
  582. # Git allows these without strict quote validation
  583. pts[1] = pts[1].strip()
  584. if pts[1][:1] == b'"' and pts[1][-1:] == b'"':
  585. pts[1] = pts[1][1:-1]
  586. else:
  587. # Other sections must have quoted subsections
  588. raise ValueError(f"Invalid subsection {pts[1]!r}")
  589. if not _check_section_name(pts[0]):
  590. raise ValueError(f"invalid section name {pts[0]!r}")
  591. section = (pts[0], pts[1])
  592. else:
  593. if not _check_section_name(pts[0]):
  594. raise ValueError(f"invalid section name {pts[0]!r}")
  595. pts = pts[0].split(b".", 1)
  596. if len(pts) == 2:
  597. section = (pts[0], pts[1])
  598. else:
  599. section = (pts[0],)
  600. return section, line
  601. class ConfigFile(ConfigDict):
  602. """A Git configuration file, like .git/config or ~/.gitconfig."""
  603. def __init__(
  604. self,
  605. values: Union[
  606. MutableMapping[Section, MutableMapping[Name, Value]], None
  607. ] = None,
  608. encoding: Union[str, None] = None,
  609. ) -> None:
  610. super().__init__(values=values, encoding=encoding)
  611. self.path: Optional[str] = None
  612. self._included_paths: set[str] = set() # Track included files to prevent cycles
  613. @classmethod
  614. def from_file(
  615. cls,
  616. f: BinaryIO,
  617. *,
  618. config_dir: Optional[str] = None,
  619. included_paths: Optional[set[str]] = None,
  620. include_depth: int = 0,
  621. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  622. file_opener: Optional[FileOpener] = None,
  623. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  624. ) -> "ConfigFile":
  625. """Read configuration from a file-like object.
  626. Args:
  627. f: File-like object to read from
  628. config_dir: Directory containing the config file (for relative includes)
  629. included_paths: Set of already included paths (to prevent cycles)
  630. include_depth: Current include depth (to prevent infinite recursion)
  631. max_include_depth: Maximum allowed include depth
  632. file_opener: Optional callback to open included files
  633. condition_matchers: Optional dict of condition matchers for includeIf
  634. """
  635. if include_depth > max_include_depth:
  636. # Prevent excessive recursion
  637. raise ValueError(f"Maximum include depth ({max_include_depth}) exceeded")
  638. ret = cls()
  639. if included_paths is not None:
  640. ret._included_paths = included_paths.copy()
  641. section: Optional[Section] = None
  642. setting = None
  643. continuation = None
  644. for lineno, line in enumerate(f.readlines()):
  645. if lineno == 0 and line.startswith(b"\xef\xbb\xbf"):
  646. line = line[3:]
  647. line = line.lstrip()
  648. if setting is None:
  649. if len(line) > 0 and line[:1] == b"[":
  650. section, line = _parse_section_header_line(line)
  651. ret._values.setdefault(section)
  652. if _strip_comments(line).strip() == b"":
  653. continue
  654. if section is None:
  655. raise ValueError(f"setting {line!r} without section")
  656. try:
  657. setting, value = line.split(b"=", 1)
  658. except ValueError:
  659. setting = line
  660. value = b"true"
  661. setting = setting.strip()
  662. if not _check_variable_name(setting):
  663. raise ValueError(f"invalid variable name {setting!r}")
  664. if _is_line_continuation(value):
  665. if value.endswith(b"\\\r\n"):
  666. continuation = value[:-3]
  667. else:
  668. continuation = value[:-2]
  669. else:
  670. continuation = None
  671. value = _parse_string(value)
  672. ret._values[section][setting] = value
  673. # Process include/includeIf directives
  674. ret._handle_include_directive(
  675. section,
  676. setting,
  677. value,
  678. config_dir=config_dir,
  679. include_depth=include_depth,
  680. max_include_depth=max_include_depth,
  681. file_opener=file_opener,
  682. condition_matchers=condition_matchers,
  683. )
  684. setting = None
  685. else: # continuation line
  686. assert continuation is not None
  687. if _is_line_continuation(line):
  688. if line.endswith(b"\\\r\n"):
  689. continuation += line[:-3]
  690. else:
  691. continuation += line[:-2]
  692. else:
  693. continuation += line
  694. value = _parse_string(continuation)
  695. assert section is not None # Already checked above
  696. ret._values[section][setting] = value
  697. # Process include/includeIf directives
  698. ret._handle_include_directive(
  699. section,
  700. setting,
  701. value,
  702. config_dir=config_dir,
  703. include_depth=include_depth,
  704. max_include_depth=max_include_depth,
  705. file_opener=file_opener,
  706. condition_matchers=condition_matchers,
  707. )
  708. continuation = None
  709. setting = None
  710. return ret
  711. def _handle_include_directive(
  712. self,
  713. section: Optional[Section],
  714. setting: bytes,
  715. value: bytes,
  716. *,
  717. config_dir: Optional[str],
  718. include_depth: int,
  719. max_include_depth: int,
  720. file_opener: Optional[FileOpener],
  721. condition_matchers: Optional[dict[str, ConditionMatcher]],
  722. ) -> None:
  723. """Handle include/includeIf directives during config parsing."""
  724. if (
  725. section is not None
  726. and setting == b"path"
  727. and (
  728. section[0].lower() == b"include"
  729. or (len(section) > 1 and section[0].lower() == b"includeif")
  730. )
  731. ):
  732. self._process_include(
  733. section,
  734. value,
  735. config_dir=config_dir,
  736. include_depth=include_depth,
  737. max_include_depth=max_include_depth,
  738. file_opener=file_opener,
  739. condition_matchers=condition_matchers,
  740. )
  741. def _process_include(
  742. self,
  743. section: Section,
  744. path_value: bytes,
  745. *,
  746. config_dir: Optional[str],
  747. include_depth: int,
  748. max_include_depth: int,
  749. file_opener: Optional[FileOpener],
  750. condition_matchers: Optional[dict[str, ConditionMatcher]],
  751. ) -> None:
  752. """Process an include or includeIf directive."""
  753. path_str = path_value.decode(self.encoding, errors="replace")
  754. # Handle includeIf conditions
  755. if len(section) > 1 and section[0].lower() == b"includeif":
  756. condition = section[1].decode(self.encoding, errors="replace")
  757. if not self._evaluate_includeif_condition(
  758. condition, config_dir, condition_matchers
  759. ):
  760. return
  761. # Resolve the include path
  762. include_path = self._resolve_include_path(path_str, config_dir)
  763. if not include_path:
  764. return
  765. # Check for circular includes
  766. try:
  767. abs_path = str(Path(include_path).resolve())
  768. except (OSError, ValueError) as e:
  769. # Invalid path - log and skip
  770. logger.debug("Invalid include path %r: %s", include_path, e)
  771. return
  772. if abs_path in self._included_paths:
  773. return
  774. # Load and merge the included file
  775. try:
  776. # Use provided file opener or default to GitFile
  777. if file_opener is None:
  778. def opener(path):
  779. return GitFile(path, "rb")
  780. else:
  781. opener = file_opener
  782. f = opener(include_path)
  783. except (OSError, ValueError) as e:
  784. # Git silently ignores missing or unreadable include files
  785. # Log for debugging purposes
  786. logger.debug("Invalid include path %r: %s", include_path, e)
  787. else:
  788. with f as included_file:
  789. # Track this path to prevent cycles
  790. self._included_paths.add(abs_path)
  791. # Parse the included file
  792. included_config = ConfigFile.from_file(
  793. included_file,
  794. config_dir=os.path.dirname(include_path),
  795. included_paths=self._included_paths,
  796. include_depth=include_depth + 1,
  797. max_include_depth=max_include_depth,
  798. file_opener=file_opener,
  799. condition_matchers=condition_matchers,
  800. )
  801. # Merge the included configuration
  802. self._merge_config(included_config)
  803. def _merge_config(self, other: "ConfigFile") -> None:
  804. """Merge another config file into this one."""
  805. for section, values in other._values.items():
  806. if section not in self._values:
  807. self._values[section] = CaseInsensitiveOrderedMultiDict()
  808. for key, value in values.items():
  809. self._values[section][key] = value
  810. def _resolve_include_path(
  811. self, path: str, config_dir: Optional[str]
  812. ) -> Optional[str]:
  813. """Resolve an include path to an absolute path."""
  814. # Expand ~ to home directory
  815. path = os.path.expanduser(path)
  816. # If path is relative and we have a config directory, make it relative to that
  817. if not os.path.isabs(path) and config_dir:
  818. path = os.path.join(config_dir, path)
  819. return path
  820. def _evaluate_includeif_condition(
  821. self,
  822. condition: str,
  823. config_dir: Optional[str] = None,
  824. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  825. ) -> bool:
  826. """Evaluate an includeIf condition."""
  827. # Try custom matchers first if provided
  828. if condition_matchers:
  829. for prefix, matcher in condition_matchers.items():
  830. if condition.startswith(prefix):
  831. return matcher(condition[len(prefix) :])
  832. # Fall back to built-in matchers
  833. if condition.startswith("hasconfig:"):
  834. return self._evaluate_hasconfig_condition(condition[10:])
  835. else:
  836. # Unknown condition type - log and ignore (Git behavior)
  837. logger.debug("Unknown includeIf condition: %r", condition)
  838. return False
  839. def _evaluate_hasconfig_condition(self, condition: str) -> bool:
  840. """Evaluate a hasconfig condition.
  841. Format: hasconfig:config.key:pattern
  842. Example: hasconfig:remote.*.url:ssh://org-*@github.com/**
  843. """
  844. # Split on the first colon to separate config key from pattern
  845. parts = condition.split(":", 1)
  846. if len(parts) != 2:
  847. logger.debug("Invalid hasconfig condition format: %r", condition)
  848. return False
  849. config_key, pattern = parts
  850. # Parse the config key to get section and name
  851. key_parts = config_key.split(".", 2)
  852. if len(key_parts) < 2:
  853. logger.debug("Invalid hasconfig config key: %r", config_key)
  854. return False
  855. # Handle wildcards in section names (e.g., remote.*)
  856. if len(key_parts) == 3 and key_parts[1] == "*":
  857. # Match any subsection
  858. section_prefix = key_parts[0].encode(self.encoding)
  859. name = key_parts[2].encode(self.encoding)
  860. # Check all sections that match the pattern
  861. for section in self.sections():
  862. if len(section) == 2 and section[0] == section_prefix:
  863. try:
  864. values = list(self.get_multivar(section, name))
  865. for value in values:
  866. if self._match_hasconfig_pattern(value, pattern):
  867. return True
  868. except KeyError:
  869. continue
  870. else:
  871. # Direct section lookup
  872. if len(key_parts) == 2:
  873. section = (key_parts[0].encode(self.encoding),)
  874. name = key_parts[1].encode(self.encoding)
  875. else:
  876. section = (
  877. key_parts[0].encode(self.encoding),
  878. key_parts[1].encode(self.encoding),
  879. )
  880. name = key_parts[2].encode(self.encoding)
  881. try:
  882. values = list(self.get_multivar(section, name))
  883. for value in values:
  884. if self._match_hasconfig_pattern(value, pattern):
  885. return True
  886. except KeyError:
  887. pass
  888. return False
  889. def _match_hasconfig_pattern(self, value: bytes, pattern: str) -> bool:
  890. """Match a config value against a hasconfig pattern.
  891. Supports simple glob patterns like ``*`` and ``**``.
  892. """
  893. value_str = value.decode(self.encoding, errors="replace")
  894. return match_glob_pattern(value_str, pattern)
  895. @classmethod
  896. def from_path(
  897. cls,
  898. path: Union[str, os.PathLike],
  899. *,
  900. max_include_depth: int = DEFAULT_MAX_INCLUDE_DEPTH,
  901. file_opener: Optional[FileOpener] = None,
  902. condition_matchers: Optional[dict[str, ConditionMatcher]] = None,
  903. ) -> "ConfigFile":
  904. """Read configuration from a file on disk.
  905. Args:
  906. path: Path to the configuration file
  907. max_include_depth: Maximum allowed include depth
  908. file_opener: Optional callback to open included files
  909. condition_matchers: Optional dict of condition matchers for includeIf
  910. """
  911. abs_path = os.fspath(path)
  912. config_dir = os.path.dirname(abs_path)
  913. # Use provided file opener or default to GitFile
  914. if file_opener is None:
  915. def opener(p):
  916. return GitFile(p, "rb")
  917. else:
  918. opener = file_opener
  919. with opener(abs_path) as f:
  920. ret = cls.from_file(
  921. f,
  922. config_dir=config_dir,
  923. max_include_depth=max_include_depth,
  924. file_opener=file_opener,
  925. condition_matchers=condition_matchers,
  926. )
  927. ret.path = abs_path
  928. return ret
  929. def write_to_path(self, path: Optional[Union[str, os.PathLike]] = None) -> None:
  930. """Write configuration to a file on disk."""
  931. if path is None:
  932. if self.path is None:
  933. raise ValueError("No path specified and no default path available")
  934. path_to_use: Union[str, os.PathLike] = self.path
  935. else:
  936. path_to_use = path
  937. with GitFile(path_to_use, "wb") as f:
  938. self.write_to_file(f)
  939. def write_to_file(self, f: BinaryIO) -> None:
  940. """Write configuration to a file-like object."""
  941. for section, values in self._values.items():
  942. try:
  943. section_name, subsection_name = section
  944. except ValueError:
  945. (section_name,) = section
  946. subsection_name = None
  947. if subsection_name is None:
  948. f.write(b"[" + section_name + b"]\n")
  949. else:
  950. f.write(b"[" + section_name + b' "' + subsection_name + b'"]\n')
  951. for key, value in values.items():
  952. value = _format_string(value)
  953. f.write(b"\t" + key + b" = " + value + b"\n")
  954. def get_xdg_config_home_path(*path_segments: str) -> str:
  955. xdg_config_home = os.environ.get(
  956. "XDG_CONFIG_HOME",
  957. os.path.expanduser("~/.config/"),
  958. )
  959. return os.path.join(xdg_config_home, *path_segments)
  960. def _find_git_in_win_path() -> Iterator[str]:
  961. for exe in ("git.exe", "git.cmd"):
  962. for path in os.environ.get("PATH", "").split(";"):
  963. if os.path.exists(os.path.join(path, exe)):
  964. # in windows native shells (powershell/cmd) exe path is
  965. # .../Git/bin/git.exe or .../Git/cmd/git.exe
  966. #
  967. # in git-bash exe path is .../Git/mingw64/bin/git.exe
  968. git_dir, _bin_dir = os.path.split(path)
  969. yield git_dir
  970. parent_dir, basename = os.path.split(git_dir)
  971. if basename == "mingw32" or basename == "mingw64":
  972. yield parent_dir
  973. break
  974. def _find_git_in_win_reg() -> Iterator[str]:
  975. import platform
  976. import winreg
  977. if platform.machine() == "AMD64":
  978. subkey = (
  979. "SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\"
  980. "CurrentVersion\\Uninstall\\Git_is1"
  981. )
  982. else:
  983. subkey = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\Git_is1"
  984. for key in (winreg.HKEY_CURRENT_USER, winreg.HKEY_LOCAL_MACHINE): # type: ignore
  985. with suppress(OSError):
  986. with winreg.OpenKey(key, subkey) as k: # type: ignore
  987. val, typ = winreg.QueryValueEx(k, "InstallLocation") # type: ignore
  988. if typ == winreg.REG_SZ: # type: ignore
  989. yield val
  990. # There is no set standard for system config dirs on windows. We try the
  991. # following:
  992. # - %PROGRAMDATA%/Git/config - (deprecated) Windows config dir per CGit docs
  993. # - %PROGRAMFILES%/Git/etc/gitconfig - Git for Windows (msysgit) config dir
  994. # Used if CGit installation (Git/bin/git.exe) is found in PATH in the
  995. # system registry
  996. def get_win_system_paths() -> Iterator[str]:
  997. if "PROGRAMDATA" in os.environ:
  998. yield os.path.join(os.environ["PROGRAMDATA"], "Git", "config")
  999. for git_dir in _find_git_in_win_path():
  1000. yield os.path.join(git_dir, "etc", "gitconfig")
  1001. for git_dir in _find_git_in_win_reg():
  1002. yield os.path.join(git_dir, "etc", "gitconfig")
  1003. class StackedConfig(Config):
  1004. """Configuration which reads from multiple config files.."""
  1005. def __init__(
  1006. self, backends: list[ConfigFile], writable: Optional[ConfigFile] = None
  1007. ) -> None:
  1008. self.backends = backends
  1009. self.writable = writable
  1010. def __repr__(self) -> str:
  1011. return f"<{self.__class__.__name__} for {self.backends!r}>"
  1012. @classmethod
  1013. def default(cls) -> "StackedConfig":
  1014. return cls(cls.default_backends())
  1015. @classmethod
  1016. def default_backends(cls) -> list[ConfigFile]:
  1017. """Retrieve the default configuration.
  1018. See git-config(1) for details on the files searched.
  1019. """
  1020. paths = []
  1021. paths.append(os.path.expanduser("~/.gitconfig"))
  1022. paths.append(get_xdg_config_home_path("git", "config"))
  1023. if "GIT_CONFIG_NOSYSTEM" not in os.environ:
  1024. paths.append("/etc/gitconfig")
  1025. if sys.platform == "win32":
  1026. paths.extend(get_win_system_paths())
  1027. backends = []
  1028. for path in paths:
  1029. try:
  1030. cf = ConfigFile.from_path(path)
  1031. except FileNotFoundError:
  1032. continue
  1033. backends.append(cf)
  1034. return backends
  1035. def get(self, section: SectionLike, name: NameLike) -> Value:
  1036. if not isinstance(section, tuple):
  1037. section = (section,)
  1038. for backend in self.backends:
  1039. try:
  1040. return backend.get(section, name)
  1041. except KeyError:
  1042. pass
  1043. raise KeyError(name)
  1044. def get_multivar(self, section: SectionLike, name: NameLike) -> Iterator[Value]:
  1045. if not isinstance(section, tuple):
  1046. section = (section,)
  1047. for backend in self.backends:
  1048. try:
  1049. yield from backend.get_multivar(section, name)
  1050. except KeyError:
  1051. pass
  1052. def set(
  1053. self, section: SectionLike, name: NameLike, value: Union[ValueLike, bool]
  1054. ) -> None:
  1055. if self.writable is None:
  1056. raise NotImplementedError(self.set)
  1057. return self.writable.set(section, name, value)
  1058. def sections(self) -> Iterator[Section]:
  1059. seen = set()
  1060. for backend in self.backends:
  1061. for section in backend.sections():
  1062. if section not in seen:
  1063. seen.add(section)
  1064. yield section
  1065. def read_submodules(
  1066. path: Union[str, os.PathLike],
  1067. ) -> Iterator[tuple[bytes, bytes, bytes]]:
  1068. """Read a .gitmodules file."""
  1069. cfg = ConfigFile.from_path(path)
  1070. return parse_submodules(cfg)
  1071. def parse_submodules(config: ConfigFile) -> Iterator[tuple[bytes, bytes, bytes]]:
  1072. """Parse a gitmodules GitConfig file, returning submodules.
  1073. Args:
  1074. config: A `ConfigFile`
  1075. Returns:
  1076. list of tuples (submodule path, url, name),
  1077. where name is quoted part of the section's name.
  1078. """
  1079. for section in config.sections():
  1080. section_kind, section_name = section
  1081. if section_kind == b"submodule":
  1082. try:
  1083. sm_path = config.get(section, b"path")
  1084. sm_url = config.get(section, b"url")
  1085. yield (sm_path, sm_url, section_name)
  1086. except KeyError:
  1087. # If either path or url is missing, just ignore this
  1088. # submodule entry and move on to the next one. This is
  1089. # how git itself handles malformed .gitmodule entries.
  1090. pass
  1091. def iter_instead_of(config: Config, push: bool = False) -> Iterable[tuple[str, str]]:
  1092. """Iterate over insteadOf / pushInsteadOf values."""
  1093. for section in config.sections():
  1094. if section[0] != b"url":
  1095. continue
  1096. replacement = section[1]
  1097. try:
  1098. needles = list(config.get_multivar(section, "insteadOf"))
  1099. except KeyError:
  1100. needles = []
  1101. if push:
  1102. try:
  1103. needles += list(config.get_multivar(section, "pushInsteadOf"))
  1104. except KeyError:
  1105. pass
  1106. for needle in needles:
  1107. assert isinstance(needle, bytes)
  1108. yield needle.decode("utf-8"), replacement.decode("utf-8")
  1109. def apply_instead_of(config: Config, orig_url: str, push: bool = False) -> str:
  1110. """Apply insteadOf / pushInsteadOf to a URL."""
  1111. longest_needle = ""
  1112. updated_url = orig_url
  1113. for needle, replacement in iter_instead_of(config, push):
  1114. if not orig_url.startswith(needle):
  1115. continue
  1116. if len(longest_needle) < len(needle):
  1117. longest_needle = needle
  1118. updated_url = replacement + orig_url[len(needle) :]
  1119. return updated_url