refs.py 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726
  1. # refs.py -- For dealing with git refs
  2. # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Ref handling."""
  22. import os
  23. import types
  24. import warnings
  25. from collections.abc import Iterator
  26. from contextlib import suppress
  27. from typing import TYPE_CHECKING, Any, Optional, Union
  28. if TYPE_CHECKING:
  29. from .file import _GitFile
  30. from .errors import PackedRefsException, RefFormatError
  31. from .file import GitFile, ensure_dir_exists
  32. from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
  33. from .pack import ObjectContainer
# A ref name is handled as a raw byte string throughout this module.
Ref = bytes
# Name of the special HEAD ref.
HEADREF = b"HEAD"
# Prefix that marks the contents of a ref as symbolic, i.e. pointing at
# another ref rather than holding a SHA (see RefsContainer.follow).
SYMREF = b"ref: "
# Well-known ref namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values that check_ref_format() rejects anywhere in a ref name:
# DEL (octal 177), space and the characters ~ ^ : ? * [
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# NOTE(review): presumably the suffix used to name the peeled value of a tag;
# its use is not visible in this part of the file.
PEELED_TAG_SUFFIX = b"^{}"
# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
  45. class SymrefLoop(Exception):
  46. """There is a loop between one or more symrefs."""
  47. def __init__(self, ref, depth) -> None:
  48. """Initialize a SymrefLoop exception.
  49. Args:
  50. ref: The ref that caused the loop
  51. depth: Depth at which the loop was detected
  52. """
  53. self.ref = ref
  54. self.depth = depth
  55. def parse_symref_value(contents: bytes) -> bytes:
  56. """Parse a symref value.
  57. Args:
  58. contents: Contents to parse
  59. Returns: Destination
  60. """
  61. if contents.startswith(SYMREF):
  62. return contents[len(SYMREF) :].rstrip(b"\r\n")
  63. raise ValueError(contents)
  64. def check_ref_format(refname: Ref) -> bool:
  65. """Check if a refname is correctly formatted.
  66. Implements all the same rules as git-check-ref-format[1].
  67. [1]
  68. http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
  69. Args:
  70. refname: The refname to check
  71. Returns: True if refname is valid, False otherwise
  72. """
  73. # These could be combined into one big expression, but are listed
  74. # separately to parallel [1].
  75. if b"/." in refname or refname.startswith(b"."):
  76. return False
  77. if b"/" not in refname:
  78. return False
  79. if b".." in refname:
  80. return False
  81. for i, c in enumerate(refname):
  82. if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
  83. return False
  84. if refname[-1] in b"/.":
  85. return False
  86. if refname.endswith(b".lock"):
  87. return False
  88. if b"@{" in refname:
  89. return False
  90. if b"\\" in refname:
  91. return False
  92. return True
  93. def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
  94. """Parse a remote ref into remote name and branch name.
  95. Args:
  96. ref: Remote ref like b"refs/remotes/origin/main"
  97. Returns:
  98. Tuple of (remote_name, branch_name)
  99. Raises:
  100. ValueError: If ref is not a valid remote ref
  101. """
  102. if not ref.startswith(LOCAL_REMOTE_PREFIX):
  103. raise ValueError(f"Not a remote ref: {ref!r}")
  104. # Remove the prefix
  105. remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
  106. # Split into remote name and branch name
  107. parts = remainder.split(b"/", 1)
  108. if len(parts) != 2:
  109. raise ValueError(f"Invalid remote ref format: {ref!r}")
  110. remote_name, branch_name = parts
  111. return (remote_name, branch_name)
  112. class RefsContainer:
  113. """A container for refs."""
  114. def __init__(self, logger=None) -> None:
  115. """Initialize a RefsContainer.
  116. Args:
  117. logger: Optional logger for reflog updates
  118. """
  119. self._logger = logger
  120. def _log(
  121. self,
  122. ref,
  123. old_sha,
  124. new_sha,
  125. committer=None,
  126. timestamp=None,
  127. timezone=None,
  128. message=None,
  129. ) -> None:
  130. if self._logger is None:
  131. return
  132. if message is None:
  133. return
  134. self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
  135. def set_symbolic_ref(
  136. self,
  137. name,
  138. other,
  139. committer=None,
  140. timestamp=None,
  141. timezone=None,
  142. message=None,
  143. ) -> None:
  144. """Make a ref point at another ref.
  145. Args:
  146. name: Name of the ref to set
  147. other: Name of the ref to point at
  148. committer: Optional committer name/email
  149. timestamp: Optional timestamp
  150. timezone: Optional timezone
  151. message: Optional message
  152. """
  153. raise NotImplementedError(self.set_symbolic_ref)
  154. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  155. """Get contents of the packed-refs file.
  156. Returns: Dictionary mapping ref names to SHA1s
  157. Note: Will return an empty dictionary when no packed-refs file is
  158. present.
  159. """
  160. raise NotImplementedError(self.get_packed_refs)
  161. def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
  162. """Add the given refs as packed refs.
  163. Args:
  164. new_refs: A mapping of ref names to targets; if a target is None that
  165. means remove the ref
  166. """
  167. raise NotImplementedError(self.add_packed_refs)
  168. def get_peeled(self, name) -> Optional[ObjectID]:
  169. """Return the cached peeled value of a ref, if available.
  170. Args:
  171. name: Name of the ref to peel
  172. Returns: The peeled value of the ref. If the ref is known not point to
  173. a tag, this will be the SHA the ref refers to. If the ref may point
  174. to a tag, but no cached information is available, None is returned.
  175. """
  176. return None
  177. def import_refs(
  178. self,
  179. base: Ref,
  180. other: dict[Ref, ObjectID],
  181. committer: Optional[bytes] = None,
  182. timestamp: Optional[bytes] = None,
  183. timezone: Optional[bytes] = None,
  184. message: Optional[bytes] = None,
  185. prune: bool = False,
  186. ) -> None:
  187. """Import refs from another repository.
  188. Args:
  189. base: Base ref to import into (e.g., b'refs/remotes/origin')
  190. other: Dictionary of refs to import
  191. committer: Optional committer for reflog
  192. timestamp: Optional timestamp for reflog
  193. timezone: Optional timezone for reflog
  194. message: Optional message for reflog
  195. prune: If True, remove refs not in other
  196. """
  197. if prune:
  198. to_delete = set(self.subkeys(base))
  199. else:
  200. to_delete = set()
  201. for name, value in other.items():
  202. if value is None:
  203. to_delete.add(name)
  204. else:
  205. self.set_if_equals(
  206. b"/".join((base, name)), None, value, message=message
  207. )
  208. if to_delete:
  209. try:
  210. to_delete.remove(name)
  211. except KeyError:
  212. pass
  213. for ref in to_delete:
  214. self.remove_if_equals(b"/".join((base, ref)), None, message=message)
  215. def allkeys(self) -> Iterator[Ref]:
  216. """All refs present in this container."""
  217. raise NotImplementedError(self.allkeys)
  218. def __iter__(self):
  219. """Iterate over all ref names."""
  220. return iter(self.allkeys())
  221. def keys(self, base=None):
  222. """Refs present in this container.
  223. Args:
  224. base: An optional base to return refs under.
  225. Returns: An unsorted set of valid refs in this container, including
  226. packed refs.
  227. """
  228. if base is not None:
  229. return self.subkeys(base)
  230. else:
  231. return self.allkeys()
  232. def subkeys(self, base):
  233. """Refs present in this container under a base.
  234. Args:
  235. base: The base to return refs under.
  236. Returns: A set of valid refs in this container under the base; the base
  237. prefix is stripped from the ref names returned.
  238. """
  239. keys = set()
  240. base_len = len(base) + 1
  241. for refname in self.allkeys():
  242. if refname.startswith(base):
  243. keys.add(refname[base_len:])
  244. return keys
  245. def as_dict(self, base=None) -> dict[Ref, ObjectID]:
  246. """Return the contents of this container as a dictionary."""
  247. ret = {}
  248. keys = self.keys(base)
  249. if base is None:
  250. base = b""
  251. else:
  252. base = base.rstrip(b"/")
  253. for key in keys:
  254. try:
  255. ret[key] = self[(base + b"/" + key).strip(b"/")]
  256. except (SymrefLoop, KeyError):
  257. continue # Unable to resolve
  258. return ret
  259. def _check_refname(self, name) -> None:
  260. """Ensure a refname is valid and lives in refs or is HEAD.
  261. HEAD is not a valid refname according to git-check-ref-format, but this
  262. class needs to be able to touch HEAD. Also, check_ref_format expects
  263. refnames without the leading 'refs/', but this class requires that
  264. so it cannot touch anything outside the refs dir (or HEAD).
  265. Args:
  266. name: The name of the reference.
  267. Raises:
  268. KeyError: if a refname is not HEAD or is otherwise not valid.
  269. """
  270. if name in (HEADREF, b"refs/stash"):
  271. return
  272. if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
  273. raise RefFormatError(name)
  274. def read_ref(self, refname):
  275. """Read a reference without following any references.
  276. Args:
  277. refname: The name of the reference
  278. Returns: The contents of the ref file, or None if it does
  279. not exist.
  280. """
  281. contents = self.read_loose_ref(refname)
  282. if not contents:
  283. contents = self.get_packed_refs().get(refname, None)
  284. return contents
  285. def read_loose_ref(self, name) -> bytes:
  286. """Read a loose reference and return its contents.
  287. Args:
  288. name: the refname to read
  289. Returns: The contents of the ref file, or None if it does
  290. not exist.
  291. """
  292. raise NotImplementedError(self.read_loose_ref)
  293. def follow(self, name) -> tuple[list[bytes], bytes]:
  294. """Follow a reference name.
  295. Returns: a tuple of (refnames, sha), wheres refnames are the names of
  296. references in the chain
  297. """
  298. contents = SYMREF + name
  299. depth = 0
  300. refnames = []
  301. while contents.startswith(SYMREF):
  302. refname = contents[len(SYMREF) :]
  303. refnames.append(refname)
  304. contents = self.read_ref(refname)
  305. if not contents:
  306. break
  307. depth += 1
  308. if depth > 5:
  309. raise SymrefLoop(name, depth)
  310. return refnames, contents
  311. def __contains__(self, refname) -> bool:
  312. """Check if a ref exists.
  313. Args:
  314. refname: Name of the ref to check
  315. Returns:
  316. True if the ref exists
  317. """
  318. if self.read_ref(refname):
  319. return True
  320. return False
  321. def __getitem__(self, name) -> ObjectID:
  322. """Get the SHA1 for a reference name.
  323. This method follows all symbolic references.
  324. """
  325. _, sha = self.follow(name)
  326. if sha is None:
  327. raise KeyError(name)
  328. return sha
  329. def set_if_equals(
  330. self,
  331. name,
  332. old_ref,
  333. new_ref,
  334. committer=None,
  335. timestamp=None,
  336. timezone=None,
  337. message=None,
  338. ) -> bool:
  339. """Set a refname to new_ref only if it currently equals old_ref.
  340. This method follows all symbolic references if applicable for the
  341. subclass, and can be used to perform an atomic compare-and-swap
  342. operation.
  343. Args:
  344. name: The refname to set.
  345. old_ref: The old sha the refname must refer to, or None to set
  346. unconditionally.
  347. new_ref: The new sha the refname will refer to.
  348. committer: Optional committer name/email
  349. timestamp: Optional timestamp
  350. timezone: Optional timezone
  351. message: Message for reflog
  352. Returns: True if the set was successful, False otherwise.
  353. """
  354. raise NotImplementedError(self.set_if_equals)
  355. def add_if_new(
  356. self, name, ref, committer=None, timestamp=None, timezone=None, message=None
  357. ) -> bool:
  358. """Add a new reference only if it does not already exist.
  359. Args:
  360. name: Ref name
  361. ref: Ref value
  362. committer: Optional committer name/email
  363. timestamp: Optional timestamp
  364. timezone: Optional timezone
  365. message: Optional message for reflog
  366. """
  367. raise NotImplementedError(self.add_if_new)
  368. def __setitem__(self, name, ref) -> None:
  369. """Set a reference name to point to the given SHA1.
  370. This method follows all symbolic references if applicable for the
  371. subclass.
  372. Note: This method unconditionally overwrites the contents of a
  373. reference. To update atomically only if the reference has not
  374. changed, use set_if_equals().
  375. Args:
  376. name: The refname to set.
  377. ref: The new sha the refname will refer to.
  378. """
  379. if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
  380. raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
  381. self.set_if_equals(name, None, ref)
  382. def remove_if_equals(
  383. self,
  384. name,
  385. old_ref,
  386. committer=None,
  387. timestamp=None,
  388. timezone=None,
  389. message=None,
  390. ) -> bool:
  391. """Remove a refname only if it currently equals old_ref.
  392. This method does not follow symbolic references, even if applicable for
  393. the subclass. It can be used to perform an atomic compare-and-delete
  394. operation.
  395. Args:
  396. name: The refname to delete.
  397. old_ref: The old sha the refname must refer to, or None to
  398. delete unconditionally.
  399. committer: Optional committer name/email
  400. timestamp: Optional timestamp
  401. timezone: Optional timezone
  402. message: Message for reflog
  403. Returns: True if the delete was successful, False otherwise.
  404. """
  405. raise NotImplementedError(self.remove_if_equals)
  406. def __delitem__(self, name) -> None:
  407. """Remove a refname.
  408. This method does not follow symbolic references, even if applicable for
  409. the subclass.
  410. Note: This method unconditionally deletes the contents of a reference.
  411. To delete atomically only if the reference has not changed, use
  412. remove_if_equals().
  413. Args:
  414. name: The refname to delete.
  415. """
  416. self.remove_if_equals(name, None)
  417. def get_symrefs(self):
  418. """Get a dict with all symrefs in this container.
  419. Returns: Dictionary mapping source ref to target ref
  420. """
  421. ret = {}
  422. for src in self.allkeys():
  423. try:
  424. dst = parse_symref_value(self.read_ref(src))
  425. except ValueError:
  426. pass
  427. else:
  428. ret[src] = dst
  429. return ret
  430. def pack_refs(self, all: bool = False) -> None:
  431. """Pack loose refs into packed-refs file.
  432. Args:
  433. all: If True, pack all refs. If False, only pack tags.
  434. """
  435. raise NotImplementedError(self.pack_refs)
  436. class DictRefsContainer(RefsContainer):
  437. """RefsContainer backed by a simple dict.
  438. This container does not support symbolic or packed references and is not
  439. threadsafe.
  440. """
  441. def __init__(self, refs, logger=None) -> None:
  442. """Initialize DictRefsContainer."""
  443. super().__init__(logger=logger)
  444. self._refs = refs
  445. self._peeled: dict[bytes, ObjectID] = {}
  446. self._watchers: set[Any] = set()
  447. def allkeys(self):
  448. """Get all ref names.
  449. Returns:
  450. All ref names in the container
  451. """
  452. return self._refs.keys()
  453. def read_loose_ref(self, name):
  454. """Read a reference from the refs dictionary.
  455. Args:
  456. name: The ref name to read
  457. Returns:
  458. The ref value or None if not found
  459. """
  460. return self._refs.get(name, None)
  461. def get_packed_refs(self):
  462. """Get packed refs (always empty for DictRefsContainer).
  463. Returns:
  464. Empty dictionary
  465. """
  466. return {}
  467. def _notify(self, ref, newsha) -> None:
  468. for watcher in self._watchers:
  469. watcher._notify((ref, newsha))
  470. def set_symbolic_ref(
  471. self,
  472. name: Ref,
  473. other: Ref,
  474. committer=None,
  475. timestamp=None,
  476. timezone=None,
  477. message=None,
  478. ) -> None:
  479. """Make a ref point at another ref.
  480. Args:
  481. name: Name of the ref to set
  482. other: Name of the ref to point at
  483. committer: Optional committer name for reflog
  484. timestamp: Optional timestamp for reflog
  485. timezone: Optional timezone for reflog
  486. message: Optional message for reflog
  487. """
  488. old = self.follow(name)[-1]
  489. new = SYMREF + other
  490. self._refs[name] = new
  491. self._notify(name, new)
  492. self._log(
  493. name,
  494. old,
  495. new,
  496. committer=committer,
  497. timestamp=timestamp,
  498. timezone=timezone,
  499. message=message,
  500. )
  501. def set_if_equals(
  502. self,
  503. name,
  504. old_ref,
  505. new_ref,
  506. committer=None,
  507. timestamp=None,
  508. timezone=None,
  509. message=None,
  510. ) -> bool:
  511. """Set a refname to new_ref only if it currently equals old_ref.
  512. This method follows all symbolic references, and can be used to perform
  513. an atomic compare-and-swap operation.
  514. Args:
  515. name: The refname to set.
  516. old_ref: The old sha the refname must refer to, or None to set
  517. unconditionally.
  518. new_ref: The new sha the refname will refer to.
  519. committer: Optional committer name for reflog
  520. timestamp: Optional timestamp for reflog
  521. timezone: Optional timezone for reflog
  522. message: Optional message for reflog
  523. Returns:
  524. True if the set was successful, False otherwise.
  525. """
  526. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  527. return False
  528. # Only update the specific ref requested, not the whole chain
  529. self._check_refname(name)
  530. old = self._refs.get(name)
  531. self._refs[name] = new_ref
  532. self._notify(name, new_ref)
  533. self._log(
  534. name,
  535. old,
  536. new_ref,
  537. committer=committer,
  538. timestamp=timestamp,
  539. timezone=timezone,
  540. message=message,
  541. )
  542. return True
  543. def add_if_new(
  544. self,
  545. name: Ref,
  546. ref: ObjectID,
  547. committer=None,
  548. timestamp=None,
  549. timezone=None,
  550. message: Optional[bytes] = None,
  551. ) -> bool:
  552. """Add a new reference only if it does not already exist.
  553. Args:
  554. name: Ref name
  555. ref: Ref value
  556. committer: Optional committer name for reflog
  557. timestamp: Optional timestamp for reflog
  558. timezone: Optional timezone for reflog
  559. message: Optional message for reflog
  560. Returns:
  561. True if the add was successful, False otherwise.
  562. """
  563. if name in self._refs:
  564. return False
  565. self._refs[name] = ref
  566. self._notify(name, ref)
  567. self._log(
  568. name,
  569. None,
  570. ref,
  571. committer=committer,
  572. timestamp=timestamp,
  573. timezone=timezone,
  574. message=message,
  575. )
  576. return True
  577. def remove_if_equals(
  578. self,
  579. name,
  580. old_ref,
  581. committer=None,
  582. timestamp=None,
  583. timezone=None,
  584. message=None,
  585. ) -> bool:
  586. """Remove a refname only if it currently equals old_ref.
  587. This method does not follow symbolic references. It can be used to
  588. perform an atomic compare-and-delete operation.
  589. Args:
  590. name: The refname to delete.
  591. old_ref: The old sha the refname must refer to, or None to
  592. delete unconditionally.
  593. committer: Optional committer name for reflog
  594. timestamp: Optional timestamp for reflog
  595. timezone: Optional timezone for reflog
  596. message: Optional message for reflog
  597. Returns:
  598. True if the delete was successful, False otherwise.
  599. """
  600. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  601. return False
  602. try:
  603. old = self._refs.pop(name)
  604. except KeyError:
  605. pass
  606. else:
  607. self._notify(name, None)
  608. self._log(
  609. name,
  610. old,
  611. None,
  612. committer=committer,
  613. timestamp=timestamp,
  614. timezone=timezone,
  615. message=message,
  616. )
  617. return True
  618. def get_peeled(self, name):
  619. """Get the peeled value of a ref.
  620. Args:
  621. name: Ref name to get peeled value for
  622. Returns:
  623. The peeled SHA or None if not available
  624. """
  625. return self._peeled.get(name)
  626. def _update(self, refs) -> None:
  627. """Update multiple refs; intended only for testing."""
  628. # TODO(dborowitz): replace this with a public function that uses
  629. # set_if_equal.
  630. for ref, sha in refs.items():
  631. self.set_if_equals(ref, None, sha)
  632. def _update_peeled(self, peeled) -> None:
  633. """Update cached peeled refs; intended only for testing."""
  634. self._peeled.update(peeled)
  635. class InfoRefsContainer(RefsContainer):
  636. """Refs container that reads refs from a info/refs file."""
  637. def __init__(self, f) -> None:
  638. """Initialize an InfoRefsContainer.
  639. Args:
  640. f: File-like object containing info/refs data
  641. """
  642. self._refs = {}
  643. self._peeled = {}
  644. refs = read_info_refs(f)
  645. (self._refs, self._peeled) = split_peeled_refs(refs)
  646. def allkeys(self):
  647. """Get all ref names.
  648. Returns:
  649. All ref names in the info/refs file
  650. """
  651. return self._refs.keys()
  652. def read_loose_ref(self, name):
  653. """Read a reference from the parsed info/refs.
  654. Args:
  655. name: The ref name to read
  656. Returns:
  657. The ref value or None if not found
  658. """
  659. return self._refs.get(name, None)
  660. def get_packed_refs(self):
  661. """Get packed refs (always empty for InfoRefsContainer).
  662. Returns:
  663. Empty dictionary
  664. """
  665. return {}
  666. def get_peeled(self, name):
  667. """Get the peeled value of a ref.
  668. Args:
  669. name: Ref name to get peeled value for
  670. Returns:
  671. The peeled SHA if available, otherwise the ref value itself
  672. Raises:
  673. KeyError: If the ref doesn't exist
  674. """
  675. try:
  676. return self._peeled[name]
  677. except KeyError:
  678. return self._refs[name]
  679. class DiskRefsContainer(RefsContainer):
  680. """Refs container that reads refs from disk."""
  681. def __init__(
  682. self,
  683. path: Union[str, bytes, os.PathLike],
  684. worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
  685. logger=None,
  686. ) -> None:
  687. """Initialize DiskRefsContainer."""
  688. super().__init__(logger=logger)
  689. # Convert path-like objects to strings, then to bytes for Git compatibility
  690. self.path = os.fsencode(os.fspath(path))
  691. if worktree_path is None:
  692. self.worktree_path = self.path
  693. else:
  694. self.worktree_path = os.fsencode(os.fspath(worktree_path))
  695. self._packed_refs = None
  696. self._peeled_refs = None
  697. def __repr__(self) -> str:
  698. """Return string representation of DiskRefsContainer."""
  699. return f"{self.__class__.__name__}({self.path!r})"
  700. def subkeys(self, base):
  701. """Get all ref names under a base ref.
  702. Args:
  703. base: Base ref path to search under
  704. Returns:
  705. Set of ref names under the base (without base prefix)
  706. """
  707. subkeys = set()
  708. path = self.refpath(base)
  709. for root, unused_dirs, files in os.walk(path):
  710. directory = root[len(path) :]
  711. if os.path.sep != "/":
  712. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  713. directory = directory.strip(b"/")
  714. for filename in files:
  715. refname = b"/".join(([directory] if directory else []) + [filename])
  716. # check_ref_format requires at least one /, so we prepend the
  717. # base before calling it.
  718. if check_ref_format(base + b"/" + refname):
  719. subkeys.add(refname)
  720. for key in self.get_packed_refs():
  721. if key.startswith(base):
  722. subkeys.add(key[len(base) :].strip(b"/"))
  723. return subkeys
  724. def allkeys(self):
  725. """Get all ref names from disk.
  726. Returns:
  727. Set of all ref names (both loose and packed)
  728. """
  729. allkeys = set()
  730. if os.path.exists(self.refpath(HEADREF)):
  731. allkeys.add(HEADREF)
  732. path = self.refpath(b"")
  733. refspath = self.refpath(b"refs")
  734. for root, unused_dirs, files in os.walk(refspath):
  735. directory = root[len(path) :]
  736. if os.path.sep != "/":
  737. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  738. for filename in files:
  739. refname = b"/".join([directory, filename])
  740. if check_ref_format(refname):
  741. allkeys.add(refname)
  742. allkeys.update(self.get_packed_refs())
  743. return allkeys
  744. def refpath(self, name):
  745. """Return the disk path of a ref."""
  746. if os.path.sep != "/":
  747. name = name.replace(b"/", os.fsencode(os.path.sep))
  748. # TODO: as the 'HEAD' reference is working tree specific, it
  749. # should actually not be a part of RefsContainer
  750. if name == HEADREF:
  751. return os.path.join(self.worktree_path, name)
  752. else:
  753. return os.path.join(self.path, name)
  754. def get_packed_refs(self):
  755. """Get contents of the packed-refs file.
  756. Returns: Dictionary mapping ref names to SHA1s
  757. Note: Will return an empty dictionary when no packed-refs file is
  758. present.
  759. """
  760. # TODO: invalidate the cache on repacking
  761. if self._packed_refs is None:
  762. # set both to empty because we want _peeled_refs to be
  763. # None if and only if _packed_refs is also None.
  764. self._packed_refs = {}
  765. self._peeled_refs = {}
  766. path = os.path.join(self.path, b"packed-refs")
  767. try:
  768. f = GitFile(path, "rb")
  769. except FileNotFoundError:
  770. return {}
  771. with f:
  772. first_line = next(iter(f)).rstrip()
  773. if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
  774. for sha, name, peeled in read_packed_refs_with_peeled(f):
  775. self._packed_refs[name] = sha
  776. if peeled:
  777. self._peeled_refs[name] = peeled
  778. else:
  779. f.seek(0)
  780. for sha, name in read_packed_refs(f):
  781. self._packed_refs[name] = sha
  782. return self._packed_refs
  783. def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
  784. """Add the given refs as packed refs.
  785. Args:
  786. new_refs: A mapping of ref names to targets; if a target is None that
  787. means remove the ref
  788. """
  789. if not new_refs:
  790. return
  791. path = os.path.join(self.path, b"packed-refs")
  792. with GitFile(path, "wb") as f:
  793. # reread cached refs from disk, while holding the lock
  794. packed_refs = self.get_packed_refs().copy()
  795. for ref, target in new_refs.items():
  796. # sanity check
  797. if ref == HEADREF:
  798. raise ValueError("cannot pack HEAD")
  799. # remove any loose refs pointing to this one -- please
  800. # note that this bypasses remove_if_equals as we don't
  801. # want to affect packed refs in here
  802. with suppress(OSError):
  803. os.remove(self.refpath(ref))
  804. if target is not None:
  805. packed_refs[ref] = target
  806. else:
  807. packed_refs.pop(ref, None)
  808. write_packed_refs(f, packed_refs, self._peeled_refs)
  809. self._packed_refs = packed_refs
  810. def get_peeled(self, name):
  811. """Return the cached peeled value of a ref, if available.
  812. Args:
  813. name: Name of the ref to peel
  814. Returns: The peeled value of the ref. If the ref is known not point to
  815. a tag, this will be the SHA the ref refers to. If the ref may point
  816. to a tag, but no cached information is available, None is returned.
  817. """
  818. self.get_packed_refs()
  819. if self._peeled_refs is None or name not in self._packed_refs:
  820. # No cache: no peeled refs were read, or this ref is loose
  821. return None
  822. if name in self._peeled_refs:
  823. return self._peeled_refs[name]
  824. else:
  825. # Known not peelable
  826. return self[name]
  827. def read_loose_ref(self, name):
  828. """Read a reference file and return its contents.
  829. If the reference file a symbolic reference, only read the first line of
  830. the file. Otherwise, only read the first 40 bytes.
  831. Args:
  832. name: the refname to read, relative to refpath
  833. Returns: The contents of the ref file, or None if the file does not
  834. exist.
  835. Raises:
  836. IOError: if any other error occurs
  837. """
  838. filename = self.refpath(name)
  839. try:
  840. with GitFile(filename, "rb") as f:
  841. header = f.read(len(SYMREF))
  842. if header == SYMREF:
  843. # Read only the first line
  844. return header + next(iter(f)).rstrip(b"\r\n")
  845. else:
  846. # Read only the first 40 bytes
  847. return header + f.read(40 - len(SYMREF))
  848. except (OSError, UnicodeError):
  849. # don't assume anything specific about the error; in
  850. # particular, invalid or forbidden paths can raise weird
  851. # errors depending on the specific operating system
  852. return None
  853. def _remove_packed_ref(self, name) -> None:
  854. if self._packed_refs is None:
  855. return
  856. filename = os.path.join(self.path, b"packed-refs")
  857. # reread cached refs from disk, while holding the lock
  858. f = GitFile(filename, "wb")
  859. try:
  860. self._packed_refs = None
  861. self.get_packed_refs()
  862. if name not in self._packed_refs:
  863. f.abort()
  864. return
  865. del self._packed_refs[name]
  866. with suppress(KeyError):
  867. del self._peeled_refs[name]
  868. write_packed_refs(f, self._packed_refs, self._peeled_refs)
  869. f.close()
  870. except BaseException:
  871. f.abort()
  872. raise
  873. def set_symbolic_ref(
  874. self,
  875. name,
  876. other,
  877. committer=None,
  878. timestamp=None,
  879. timezone=None,
  880. message=None,
  881. ) -> None:
  882. """Make a ref point at another ref.
  883. Args:
  884. name: Name of the ref to set
  885. other: Name of the ref to point at
  886. committer: Optional committer name
  887. timestamp: Optional timestamp
  888. timezone: Optional timezone
  889. message: Optional message to describe the change
  890. """
  891. self._check_refname(name)
  892. self._check_refname(other)
  893. filename = self.refpath(name)
  894. f = GitFile(filename, "wb")
  895. try:
  896. f.write(SYMREF + other + b"\n")
  897. sha = self.follow(name)[-1]
  898. self._log(
  899. name,
  900. sha,
  901. sha,
  902. committer=committer,
  903. timestamp=timestamp,
  904. timezone=timezone,
  905. message=message,
  906. )
  907. except BaseException:
  908. f.abort()
  909. raise
  910. else:
  911. f.close()
def set_if_equals(
    self,
    name,
    old_ref,
    new_ref,
    committer=None,
    timestamp=None,
    timezone=None,
    message=None,
) -> bool:
    """Set a refname to new_ref only if it currently equals old_ref.

    This method follows all symbolic references, and can be used to perform
    an atomic compare-and-swap operation.

    Args:
      name: The refname to set.
      old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
      new_ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Set message for reflog
    Returns: True if the set was successful, False otherwise. True is also
      returned, without writing or logging, when the ref already holds
      new_ref.
    Raises:
      NotADirectoryError: if an ancestor path component of the resolved
        ref name is itself present in the packed refs
    """
    self._check_refname(name)
    try:
        realnames, _ = self.follow(name)
        realname = realnames[-1]
    except (KeyError, IndexError, SymrefLoop):
        # could not resolve a symref chain; operate on the name as given
        realname = name
    filename = self.refpath(realname)

    # make sure none of the ancestor folders is in packed refs
    probe_ref = os.path.dirname(realname)
    packed_refs = self.get_packed_refs()
    while probe_ref:
        if packed_refs.get(probe_ref, None) is not None:
            raise NotADirectoryError(filename)
        probe_ref = os.path.dirname(probe_ref)

    ensure_dir_exists(os.path.dirname(filename))
    # opening the GitFile for writing is what takes the lock on the ref
    with GitFile(filename, "wb") as f:
        if old_ref is not None:
            try:
                # read again while holding the lock to handle race conditions
                orig_ref = self.read_loose_ref(realname)
                if orig_ref is None:
                    # no loose ref: fall back to packed refs, ZERO_SHA if absent
                    orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                if orig_ref != old_ref:
                    f.abort()
                    return False
            except OSError:
                f.abort()
                raise

        # Check if ref already has the desired value while holding the lock
        # This avoids fsync when ref is unchanged but still detects lock conflicts
        current_ref = self.read_loose_ref(realname)
        if current_ref is None:
            current_ref = packed_refs.get(realname, None)

        if current_ref is not None and current_ref == new_ref:
            # Ref already has desired value, abort write to avoid fsync
            f.abort()
            return True

        try:
            f.write(new_ref + b"\n")
        except OSError:
            f.abort()
            raise
        # the change is logged against the resolved ref, not the symref
        self._log(
            realname,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True
  988. def add_if_new(
  989. self,
  990. name: bytes,
  991. ref: bytes,
  992. committer=None,
  993. timestamp=None,
  994. timezone=None,
  995. message: Optional[bytes] = None,
  996. ) -> bool:
  997. """Add a new reference only if it does not already exist.
  998. This method follows symrefs, and only ensures that the last ref in the
  999. chain does not exist.
  1000. Args:
  1001. name: The refname to set.
  1002. ref: The new sha the refname will refer to.
  1003. committer: Optional committer name
  1004. timestamp: Optional timestamp
  1005. timezone: Optional timezone
  1006. message: Optional message for reflog
  1007. Returns: True if the add was successful, False otherwise.
  1008. """
  1009. try:
  1010. realnames, contents = self.follow(name)
  1011. if contents is not None:
  1012. return False
  1013. realname = realnames[-1]
  1014. except (KeyError, IndexError):
  1015. realname = name
  1016. self._check_refname(realname)
  1017. filename = self.refpath(realname)
  1018. ensure_dir_exists(os.path.dirname(filename))
  1019. with GitFile(filename, "wb") as f:
  1020. if os.path.exists(filename) or name in self.get_packed_refs():
  1021. f.abort()
  1022. return False
  1023. try:
  1024. f.write(ref + b"\n")
  1025. except OSError:
  1026. f.abort()
  1027. raise
  1028. else:
  1029. self._log(
  1030. name,
  1031. None,
  1032. ref,
  1033. committer=committer,
  1034. timestamp=timestamp,
  1035. timezone=timezone,
  1036. message=message,
  1037. )
  1038. return True
def remove_if_equals(
    self,
    name,
    old_ref,
    committer=None,
    timestamp=None,
    timezone=None,
    message=None,
) -> bool:
    """Remove a refname only if it currently equals old_ref.

    This method does not follow symbolic references. It can be used to
    perform an atomic compare-and-delete operation.

    Both the loose ref file and any packed entry for the ref are removed;
    afterwards parent directories are removed as long as os.rmdir succeeds,
    stopping at ``refs``.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    filename = self.refpath(name)
    ensure_dir_exists(os.path.dirname(filename))
    # opened only to hold the lock on the ref; nothing is ever written to it
    f = GitFile(filename, "wb")
    try:
        if old_ref is not None:
            orig_ref = self.read_loose_ref(name)
            if orig_ref is None:
                # no loose ref: fall back to packed refs, ZERO_SHA if absent
                orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
            if orig_ref != old_ref:
                # the finally clause below still releases the lock
                return False

        # remove the reference file itself
        try:
            found = os.path.lexists(filename)
        except OSError:
            # may only be packed, or otherwise unstorable
            found = False

        if found:
            os.remove(filename)

        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        f.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    parent = name
    while True:
        try:
            # strip the last path component; ValueError means no "/" is left
            parent, _ = parent.rsplit(b"/", 1)
        except ValueError:
            break

        if parent == b"refs":
            break
        parent_filename = self.refpath(parent)
        try:
            os.rmdir(parent_filename)
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True
  1114. def pack_refs(self, all: bool = False) -> None:
  1115. """Pack loose refs into packed-refs file.
  1116. Args:
  1117. all: If True, pack all refs. If False, only pack tags.
  1118. """
  1119. refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
  1120. for ref in self.allkeys():
  1121. if ref == HEADREF:
  1122. # Never pack HEAD
  1123. continue
  1124. if all or ref.startswith(LOCAL_TAG_PREFIX):
  1125. try:
  1126. sha = self[ref]
  1127. if sha:
  1128. refs_to_pack[ref] = sha
  1129. except KeyError:
  1130. # Broken ref, skip it
  1131. pass
  1132. if refs_to_pack:
  1133. self.add_packed_refs(refs_to_pack)
  1134. def _split_ref_line(line):
  1135. """Split a single ref line into a tuple of SHA1 and name."""
  1136. fields = line.rstrip(b"\n\r").split(b" ")
  1137. if len(fields) != 2:
  1138. raise PackedRefsException(f"invalid ref line {line!r}")
  1139. sha, name = fields
  1140. if not valid_hexsha(sha):
  1141. raise PackedRefsException(f"Invalid hex sha {sha!r}")
  1142. if not check_ref_format(name):
  1143. raise PackedRefsException(f"invalid ref name {name!r}")
  1144. return (sha, name)
  1145. def read_packed_refs(f):
  1146. """Read a packed refs file.
  1147. Args:
  1148. f: file-like object to read from
  1149. Returns: Iterator over tuples with SHA1s and ref names.
  1150. """
  1151. for line in f:
  1152. if line.startswith(b"#"):
  1153. # Comment
  1154. continue
  1155. if line.startswith(b"^"):
  1156. raise PackedRefsException("found peeled ref in packed-refs without peeled")
  1157. yield _split_ref_line(line)
  1158. def read_packed_refs_with_peeled(f):
  1159. """Read a packed refs file including peeled refs.
  1160. Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
  1161. with ref names, SHA1s, and peeled SHA1s (or None).
  1162. Args:
  1163. f: file-like object to read from, seek'ed to the second line
  1164. """
  1165. last = None
  1166. for line in f:
  1167. if line[0] == b"#":
  1168. continue
  1169. line = line.rstrip(b"\r\n")
  1170. if line.startswith(b"^"):
  1171. if not last:
  1172. raise PackedRefsException("unexpected peeled ref line")
  1173. if not valid_hexsha(line[1:]):
  1174. raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
  1175. sha, name = _split_ref_line(last)
  1176. last = None
  1177. yield (sha, name, line[1:])
  1178. else:
  1179. if last:
  1180. sha, name = _split_ref_line(last)
  1181. yield (sha, name, None)
  1182. last = line
  1183. if last:
  1184. sha, name = _split_ref_line(last)
  1185. yield (sha, name, None)
  1186. def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
  1187. """Write a packed refs file.
  1188. Args:
  1189. f: empty file-like object to write to
  1190. packed_refs: dict of refname to sha of packed refs to write
  1191. peeled_refs: dict of refname to peeled value of sha
  1192. """
  1193. if peeled_refs is None:
  1194. peeled_refs = {}
  1195. else:
  1196. f.write(b"# pack-refs with: peeled\n")
  1197. for refname in sorted(packed_refs.keys()):
  1198. f.write(git_line(packed_refs[refname], refname))
  1199. if refname in peeled_refs:
  1200. f.write(b"^" + peeled_refs[refname] + b"\n")
  1201. def read_info_refs(f):
  1202. """Read info/refs file.
  1203. Args:
  1204. f: File-like object to read from
  1205. Returns:
  1206. Dictionary mapping ref names to SHA1s
  1207. """
  1208. ret = {}
  1209. for line in f.readlines():
  1210. (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
  1211. ret[name] = sha
  1212. return ret
  1213. def write_info_refs(refs, store: ObjectContainer):
  1214. """Generate info refs."""
  1215. # TODO: Avoid recursive import :(
  1216. from .object_store import peel_sha
  1217. for name, sha in sorted(refs.items()):
  1218. # get_refs() includes HEAD as a special case, but we don't want to
  1219. # advertise it
  1220. if name == HEADREF:
  1221. continue
  1222. try:
  1223. o = store[sha]
  1224. except KeyError:
  1225. continue
  1226. unpeeled, peeled = peel_sha(store, sha)
  1227. yield o.id + b"\t" + name + b"\n"
  1228. if o.id != peeled.id:
  1229. yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
  1230. def is_local_branch(x):
  1231. """Check if a ref name refers to a local branch.
  1232. Args:
  1233. x: Ref name to check
  1234. Returns:
  1235. True if ref is a local branch (refs/heads/...)
  1236. """
  1237. return x.startswith(LOCAL_BRANCH_PREFIX)
  1238. def strip_peeled_refs(refs):
  1239. """Remove all peeled refs."""
  1240. return {
  1241. ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
  1242. }
  1243. def split_peeled_refs(refs):
  1244. """Split peeled refs from regular refs."""
  1245. peeled = {}
  1246. regular = {}
  1247. for ref, sha in refs.items():
  1248. if ref.endswith(PEELED_TAG_SUFFIX):
  1249. peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
  1250. else:
  1251. regular[ref] = sha
  1252. return regular, peeled
  1253. def _set_origin_head(refs, origin, origin_head) -> None:
  1254. # set refs/remotes/origin/HEAD
  1255. origin_base = b"refs/remotes/" + origin + b"/"
  1256. if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1257. origin_ref = origin_base + HEADREF
  1258. target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
  1259. if target_ref in refs:
  1260. refs.set_symbolic_ref(origin_ref, target_ref)
  1261. def _set_default_branch(
  1262. refs: RefsContainer,
  1263. origin: bytes,
  1264. origin_head: Optional[bytes],
  1265. branch: bytes,
  1266. ref_message: Optional[bytes],
  1267. ) -> bytes:
  1268. """Set the default branch."""
  1269. origin_base = b"refs/remotes/" + origin + b"/"
  1270. if branch:
  1271. origin_ref = origin_base + branch
  1272. if origin_ref in refs:
  1273. local_ref = LOCAL_BRANCH_PREFIX + branch
  1274. refs.add_if_new(local_ref, refs[origin_ref], ref_message)
  1275. head_ref = local_ref
  1276. elif LOCAL_TAG_PREFIX + branch in refs:
  1277. head_ref = LOCAL_TAG_PREFIX + branch
  1278. else:
  1279. raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
  1280. elif origin_head:
  1281. head_ref = origin_head
  1282. if origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1283. origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
  1284. else:
  1285. origin_ref = origin_head
  1286. try:
  1287. refs.add_if_new(head_ref, refs[origin_ref], ref_message)
  1288. except KeyError:
  1289. pass
  1290. else:
  1291. raise ValueError("neither origin_head nor branch are provided")
  1292. return head_ref
  1293. def _set_head(refs, head_ref, ref_message):
  1294. if head_ref.startswith(LOCAL_TAG_PREFIX):
  1295. # detach HEAD at specified tag
  1296. head = refs[head_ref]
  1297. if isinstance(head, Tag):
  1298. _cls, obj = head.object
  1299. head = obj.get_object(obj).id
  1300. del refs[HEADREF]
  1301. refs.set_if_equals(HEADREF, None, head, message=ref_message)
  1302. else:
  1303. # set HEAD to specific branch
  1304. try:
  1305. head = refs[head_ref]
  1306. refs.set_symbolic_ref(HEADREF, head_ref)
  1307. refs.set_if_equals(HEADREF, None, head, message=ref_message)
  1308. except KeyError:
  1309. head = None
  1310. return head
  1311. def _import_remote_refs(
  1312. refs_container: RefsContainer,
  1313. remote_name: str,
  1314. refs: dict[str, str],
  1315. message: Optional[bytes] = None,
  1316. prune: bool = False,
  1317. prune_tags: bool = False,
  1318. ) -> None:
  1319. stripped_refs = strip_peeled_refs(refs)
  1320. branches = {
  1321. n[len(LOCAL_BRANCH_PREFIX) :]: v
  1322. for (n, v) in stripped_refs.items()
  1323. if n.startswith(LOCAL_BRANCH_PREFIX)
  1324. }
  1325. refs_container.import_refs(
  1326. b"refs/remotes/" + remote_name.encode(),
  1327. branches,
  1328. message=message,
  1329. prune=prune,
  1330. )
  1331. tags = {
  1332. n[len(LOCAL_TAG_PREFIX) :]: v
  1333. for (n, v) in stripped_refs.items()
  1334. if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX)
  1335. }
  1336. refs_container.import_refs(
  1337. LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
  1338. )
  1339. def serialize_refs(store, refs):
  1340. """Serialize refs with peeled refs.
  1341. Args:
  1342. store: Object store to peel refs from
  1343. refs: Dictionary of ref names to SHAs
  1344. Returns:
  1345. Dictionary with refs and peeled refs (marked with ^{})
  1346. """
  1347. # TODO: Avoid recursive import :(
  1348. from .object_store import peel_sha
  1349. ret = {}
  1350. for ref, sha in refs.items():
  1351. try:
  1352. unpeeled, peeled = peel_sha(store, sha)
  1353. except KeyError:
  1354. warnings.warn(
  1355. "ref {} points at non-present sha {}".format(
  1356. ref.decode("utf-8", "replace"), sha.decode("ascii")
  1357. ),
  1358. UserWarning,
  1359. )
  1360. continue
  1361. else:
  1362. if isinstance(unpeeled, Tag):
  1363. ret[ref + PEELED_TAG_SUFFIX] = peeled.id
  1364. ret[ref] = unpeeled.id
  1365. return ret
  1366. class locked_ref:
  1367. """Lock a ref while making modifications.
  1368. Works as a context manager.
  1369. """
  1370. def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
  1371. """Initialize a locked ref.
  1372. Args:
  1373. refs_container: The DiskRefsContainer to lock the ref in
  1374. refname: The ref name to lock
  1375. """
  1376. self._refs_container = refs_container
  1377. self._refname = refname
  1378. self._file: Optional[_GitFile] = None
  1379. self._realname: Optional[Ref] = None
  1380. self._deleted = False
  1381. def __enter__(self) -> "locked_ref":
  1382. """Enter the context manager and acquire the lock.
  1383. Returns:
  1384. This locked_ref instance
  1385. Raises:
  1386. OSError: If the lock cannot be acquired
  1387. """
  1388. self._refs_container._check_refname(self._refname)
  1389. try:
  1390. realnames, _ = self._refs_container.follow(self._refname)
  1391. self._realname = realnames[-1]
  1392. except (KeyError, IndexError, SymrefLoop):
  1393. self._realname = self._refname
  1394. filename = self._refs_container.refpath(self._realname)
  1395. ensure_dir_exists(os.path.dirname(filename))
  1396. f = GitFile(filename, "wb")
  1397. self._file = f
  1398. return self
  1399. def __exit__(
  1400. self,
  1401. exc_type: Optional[type],
  1402. exc_value: Optional[BaseException],
  1403. traceback: Optional[types.TracebackType],
  1404. ) -> None:
  1405. """Exit the context manager and release the lock.
  1406. Args:
  1407. exc_type: Type of exception if one occurred
  1408. exc_value: Exception instance if one occurred
  1409. traceback: Traceback if an exception occurred
  1410. """
  1411. if self._file:
  1412. if exc_type is not None or self._deleted:
  1413. self._file.abort()
  1414. else:
  1415. self._file.close()
  1416. def get(self) -> Optional[bytes]:
  1417. """Get the current value of the ref."""
  1418. if not self._file:
  1419. raise RuntimeError("locked_ref not in context")
  1420. current_ref = self._refs_container.read_loose_ref(self._realname)
  1421. if current_ref is None:
  1422. current_ref = self._refs_container.get_packed_refs().get(
  1423. self._realname, None
  1424. )
  1425. return current_ref
  1426. def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
  1427. """Ensure the ref currently equals the expected value.
  1428. Args:
  1429. expected_value: The expected current value of the ref
  1430. Returns:
  1431. True if the ref equals the expected value, False otherwise
  1432. """
  1433. current_value = self.get()
  1434. return current_value == expected_value
  1435. def set(self, new_ref: bytes) -> None:
  1436. """Set the ref to a new value.
  1437. Args:
  1438. new_ref: The new SHA1 or symbolic ref value
  1439. """
  1440. if not self._file:
  1441. raise RuntimeError("locked_ref not in context")
  1442. if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
  1443. raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
  1444. self._file.seek(0)
  1445. self._file.truncate()
  1446. self._file.write(new_ref + b"\n")
  1447. self._deleted = False
  1448. def set_symbolic_ref(self, target: Ref) -> None:
  1449. """Make this ref point at another ref.
  1450. Args:
  1451. target: Name of the ref to point at
  1452. """
  1453. if not self._file:
  1454. raise RuntimeError("locked_ref not in context")
  1455. self._refs_container._check_refname(target)
  1456. self._file.seek(0)
  1457. self._file.truncate()
  1458. self._file.write(SYMREF + target + b"\n")
  1459. self._deleted = False
  1460. def delete(self) -> None:
  1461. """Delete the ref file while holding the lock."""
  1462. if not self._file:
  1463. raise RuntimeError("locked_ref not in context")
  1464. # Delete the actual ref file while holding the lock
  1465. if self._realname:
  1466. filename = self._refs_container.refpath(self._realname)
  1467. try:
  1468. if os.path.lexists(filename):
  1469. os.remove(filename)
  1470. except FileNotFoundError:
  1471. pass
  1472. self._refs_container._remove_packed_ref(self._realname)
  1473. self._deleted = True