refs.py 52 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699
  1. # refs.py -- For dealing with git refs
  2. # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Ref handling."""
  22. import os
  23. import types
  24. import warnings
  25. from collections.abc import Iterator
  26. from contextlib import suppress
  27. from typing import TYPE_CHECKING, Any, Optional, Union
  28. if TYPE_CHECKING:
  29. from .file import _GitFile
  30. from .errors import PackedRefsException, RefFormatError
  31. from .file import GitFile, ensure_dir_exists
  32. from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
  33. from .pack import ObjectContainer
  34. Ref = bytes
  35. HEADREF = b"HEAD"
  36. SYMREF = b"ref: "
  37. LOCAL_BRANCH_PREFIX = b"refs/heads/"
  38. LOCAL_TAG_PREFIX = b"refs/tags/"
  39. LOCAL_REMOTE_PREFIX = b"refs/remotes/"
  40. LOCAL_NOTES_PREFIX = b"refs/notes/"
  41. BAD_REF_CHARS = set(b"\177 ~^:?*[")
  42. PEELED_TAG_SUFFIX = b"^{}"
  43. # For backwards compatibility
  44. ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
  45. class SymrefLoop(Exception):
  46. """There is a loop between one or more symrefs."""
  47. def __init__(self, ref, depth) -> None:
  48. """Initialize a SymrefLoop exception.
  49. Args:
  50. ref: The ref that caused the loop
  51. depth: Depth at which the loop was detected
  52. """
  53. self.ref = ref
  54. self.depth = depth
  55. def parse_symref_value(contents: bytes) -> bytes:
  56. """Parse a symref value.
  57. Args:
  58. contents: Contents to parse
  59. Returns: Destination
  60. """
  61. if contents.startswith(SYMREF):
  62. return contents[len(SYMREF) :].rstrip(b"\r\n")
  63. raise ValueError(contents)
  64. def check_ref_format(refname: Ref) -> bool:
  65. """Check if a refname is correctly formatted.
  66. Implements all the same rules as git-check-ref-format[1].
  67. [1]
  68. http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
  69. Args:
  70. refname: The refname to check
  71. Returns: True if refname is valid, False otherwise
  72. """
  73. # These could be combined into one big expression, but are listed
  74. # separately to parallel [1].
  75. if b"/." in refname or refname.startswith(b"."):
  76. return False
  77. if b"/" not in refname:
  78. return False
  79. if b".." in refname:
  80. return False
  81. for i, c in enumerate(refname):
  82. if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
  83. return False
  84. if refname[-1] in b"/.":
  85. return False
  86. if refname.endswith(b".lock"):
  87. return False
  88. if b"@{" in refname:
  89. return False
  90. if b"\\" in refname:
  91. return False
  92. return True
  93. def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
  94. """Parse a remote ref into remote name and branch name.
  95. Args:
  96. ref: Remote ref like b"refs/remotes/origin/main"
  97. Returns:
  98. Tuple of (remote_name, branch_name)
  99. Raises:
  100. ValueError: If ref is not a valid remote ref
  101. """
  102. if not ref.startswith(LOCAL_REMOTE_PREFIX):
  103. raise ValueError(f"Not a remote ref: {ref!r}")
  104. # Remove the prefix
  105. remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
  106. # Split into remote name and branch name
  107. parts = remainder.split(b"/", 1)
  108. if len(parts) != 2:
  109. raise ValueError(f"Invalid remote ref format: {ref!r}")
  110. remote_name, branch_name = parts
  111. return (remote_name, branch_name)
  112. class RefsContainer:
  113. """A container for refs."""
  114. def __init__(self, logger=None) -> None:
  115. """Initialize a RefsContainer.
  116. Args:
  117. logger: Optional logger for reflog updates
  118. """
  119. self._logger = logger
  120. def _log(
  121. self,
  122. ref,
  123. old_sha,
  124. new_sha,
  125. committer=None,
  126. timestamp=None,
  127. timezone=None,
  128. message=None,
  129. ) -> None:
  130. if self._logger is None:
  131. return
  132. if message is None:
  133. return
  134. self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
  135. def set_symbolic_ref(
  136. self,
  137. name,
  138. other,
  139. committer=None,
  140. timestamp=None,
  141. timezone=None,
  142. message=None,
  143. ) -> None:
  144. """Make a ref point at another ref.
  145. Args:
  146. name: Name of the ref to set
  147. other: Name of the ref to point at
  148. message: Optional message
  149. """
  150. raise NotImplementedError(self.set_symbolic_ref)
  151. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  152. """Get contents of the packed-refs file.
  153. Returns: Dictionary mapping ref names to SHA1s
  154. Note: Will return an empty dictionary when no packed-refs file is
  155. present.
  156. """
  157. raise NotImplementedError(self.get_packed_refs)
  158. def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
  159. """Add the given refs as packed refs.
  160. Args:
  161. new_refs: A mapping of ref names to targets; if a target is None that
  162. means remove the ref
  163. """
  164. raise NotImplementedError(self.add_packed_refs)
  165. def get_peeled(self, name) -> Optional[ObjectID]:
  166. """Return the cached peeled value of a ref, if available.
  167. Args:
  168. name: Name of the ref to peel
  169. Returns: The peeled value of the ref. If the ref is known not point to
  170. a tag, this will be the SHA the ref refers to. If the ref may point
  171. to a tag, but no cached information is available, None is returned.
  172. """
  173. return None
  174. def import_refs(
  175. self,
  176. base: Ref,
  177. other: dict[Ref, ObjectID],
  178. committer: Optional[bytes] = None,
  179. timestamp: Optional[bytes] = None,
  180. timezone: Optional[bytes] = None,
  181. message: Optional[bytes] = None,
  182. prune: bool = False,
  183. ) -> None:
  184. """Import refs from another repository.
  185. Args:
  186. base: Base ref to import into (e.g., b'refs/remotes/origin')
  187. other: Dictionary of refs to import
  188. committer: Optional committer for reflog
  189. timestamp: Optional timestamp for reflog
  190. timezone: Optional timezone for reflog
  191. message: Optional message for reflog
  192. prune: If True, remove refs not in other
  193. """
  194. if prune:
  195. to_delete = set(self.subkeys(base))
  196. else:
  197. to_delete = set()
  198. for name, value in other.items():
  199. if value is None:
  200. to_delete.add(name)
  201. else:
  202. self.set_if_equals(
  203. b"/".join((base, name)), None, value, message=message
  204. )
  205. if to_delete:
  206. try:
  207. to_delete.remove(name)
  208. except KeyError:
  209. pass
  210. for ref in to_delete:
  211. self.remove_if_equals(b"/".join((base, ref)), None, message=message)
  212. def allkeys(self) -> Iterator[Ref]:
  213. """All refs present in this container."""
  214. raise NotImplementedError(self.allkeys)
  215. def __iter__(self):
  216. """Iterate over all ref names."""
  217. return iter(self.allkeys())
  218. def keys(self, base=None):
  219. """Refs present in this container.
  220. Args:
  221. base: An optional base to return refs under.
  222. Returns: An unsorted set of valid refs in this container, including
  223. packed refs.
  224. """
  225. if base is not None:
  226. return self.subkeys(base)
  227. else:
  228. return self.allkeys()
  229. def subkeys(self, base):
  230. """Refs present in this container under a base.
  231. Args:
  232. base: The base to return refs under.
  233. Returns: A set of valid refs in this container under the base; the base
  234. prefix is stripped from the ref names returned.
  235. """
  236. keys = set()
  237. base_len = len(base) + 1
  238. for refname in self.allkeys():
  239. if refname.startswith(base):
  240. keys.add(refname[base_len:])
  241. return keys
  242. def as_dict(self, base=None) -> dict[Ref, ObjectID]:
  243. """Return the contents of this container as a dictionary."""
  244. ret = {}
  245. keys = self.keys(base)
  246. if base is None:
  247. base = b""
  248. else:
  249. base = base.rstrip(b"/")
  250. for key in keys:
  251. try:
  252. ret[key] = self[(base + b"/" + key).strip(b"/")]
  253. except (SymrefLoop, KeyError):
  254. continue # Unable to resolve
  255. return ret
  256. def _check_refname(self, name) -> None:
  257. """Ensure a refname is valid and lives in refs or is HEAD.
  258. HEAD is not a valid refname according to git-check-ref-format, but this
  259. class needs to be able to touch HEAD. Also, check_ref_format expects
  260. refnames without the leading 'refs/', but this class requires that
  261. so it cannot touch anything outside the refs dir (or HEAD).
  262. Args:
  263. name: The name of the reference.
  264. Raises:
  265. KeyError: if a refname is not HEAD or is otherwise not valid.
  266. """
  267. if name in (HEADREF, b"refs/stash"):
  268. return
  269. if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
  270. raise RefFormatError(name)
  271. def read_ref(self, refname):
  272. """Read a reference without following any references.
  273. Args:
  274. refname: The name of the reference
  275. Returns: The contents of the ref file, or None if it does
  276. not exist.
  277. """
  278. contents = self.read_loose_ref(refname)
  279. if not contents:
  280. contents = self.get_packed_refs().get(refname, None)
  281. return contents
  282. def read_loose_ref(self, name) -> bytes:
  283. """Read a loose reference and return its contents.
  284. Args:
  285. name: the refname to read
  286. Returns: The contents of the ref file, or None if it does
  287. not exist.
  288. """
  289. raise NotImplementedError(self.read_loose_ref)
  290. def follow(self, name) -> tuple[list[bytes], bytes]:
  291. """Follow a reference name.
  292. Returns: a tuple of (refnames, sha), wheres refnames are the names of
  293. references in the chain
  294. """
  295. contents = SYMREF + name
  296. depth = 0
  297. refnames = []
  298. while contents.startswith(SYMREF):
  299. refname = contents[len(SYMREF) :]
  300. refnames.append(refname)
  301. contents = self.read_ref(refname)
  302. if not contents:
  303. break
  304. depth += 1
  305. if depth > 5:
  306. raise SymrefLoop(name, depth)
  307. return refnames, contents
  308. def __contains__(self, refname) -> bool:
  309. """Check if a ref exists.
  310. Args:
  311. refname: Name of the ref to check
  312. Returns:
  313. True if the ref exists
  314. """
  315. if self.read_ref(refname):
  316. return True
  317. return False
  318. def __getitem__(self, name) -> ObjectID:
  319. """Get the SHA1 for a reference name.
  320. This method follows all symbolic references.
  321. """
  322. _, sha = self.follow(name)
  323. if sha is None:
  324. raise KeyError(name)
  325. return sha
  326. def set_if_equals(
  327. self,
  328. name,
  329. old_ref,
  330. new_ref,
  331. committer=None,
  332. timestamp=None,
  333. timezone=None,
  334. message=None,
  335. ) -> bool:
  336. """Set a refname to new_ref only if it currently equals old_ref.
  337. This method follows all symbolic references if applicable for the
  338. subclass, and can be used to perform an atomic compare-and-swap
  339. operation.
  340. Args:
  341. name: The refname to set.
  342. old_ref: The old sha the refname must refer to, or None to set
  343. unconditionally.
  344. new_ref: The new sha the refname will refer to.
  345. message: Message for reflog
  346. Returns: True if the set was successful, False otherwise.
  347. """
  348. raise NotImplementedError(self.set_if_equals)
  349. def add_if_new(
  350. self, name, ref, committer=None, timestamp=None, timezone=None, message=None
  351. ) -> bool:
  352. """Add a new reference only if it does not already exist.
  353. Args:
  354. name: Ref name
  355. ref: Ref value
  356. """
  357. raise NotImplementedError(self.add_if_new)
  358. def __setitem__(self, name, ref) -> None:
  359. """Set a reference name to point to the given SHA1.
  360. This method follows all symbolic references if applicable for the
  361. subclass.
  362. Note: This method unconditionally overwrites the contents of a
  363. reference. To update atomically only if the reference has not
  364. changed, use set_if_equals().
  365. Args:
  366. name: The refname to set.
  367. ref: The new sha the refname will refer to.
  368. """
  369. if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
  370. raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
  371. self.set_if_equals(name, None, ref)
  372. def remove_if_equals(
  373. self,
  374. name,
  375. old_ref,
  376. committer=None,
  377. timestamp=None,
  378. timezone=None,
  379. message=None,
  380. ) -> bool:
  381. """Remove a refname only if it currently equals old_ref.
  382. This method does not follow symbolic references, even if applicable for
  383. the subclass. It can be used to perform an atomic compare-and-delete
  384. operation.
  385. Args:
  386. name: The refname to delete.
  387. old_ref: The old sha the refname must refer to, or None to
  388. delete unconditionally.
  389. message: Message for reflog
  390. Returns: True if the delete was successful, False otherwise.
  391. """
  392. raise NotImplementedError(self.remove_if_equals)
  393. def __delitem__(self, name) -> None:
  394. """Remove a refname.
  395. This method does not follow symbolic references, even if applicable for
  396. the subclass.
  397. Note: This method unconditionally deletes the contents of a reference.
  398. To delete atomically only if the reference has not changed, use
  399. remove_if_equals().
  400. Args:
  401. name: The refname to delete.
  402. """
  403. self.remove_if_equals(name, None)
  404. def get_symrefs(self):
  405. """Get a dict with all symrefs in this container.
  406. Returns: Dictionary mapping source ref to target ref
  407. """
  408. ret = {}
  409. for src in self.allkeys():
  410. try:
  411. dst = parse_symref_value(self.read_ref(src))
  412. except ValueError:
  413. pass
  414. else:
  415. ret[src] = dst
  416. return ret
  417. def pack_refs(self, all: bool = False) -> None:
  418. """Pack loose refs into packed-refs file.
  419. Args:
  420. all: If True, pack all refs. If False, only pack tags.
  421. """
  422. raise NotImplementedError(self.pack_refs)
  423. class DictRefsContainer(RefsContainer):
  424. """RefsContainer backed by a simple dict.
  425. This container does not support symbolic or packed references and is not
  426. threadsafe.
  427. """
  428. def __init__(self, refs, logger=None) -> None:
  429. super().__init__(logger=logger)
  430. self._refs = refs
  431. self._peeled: dict[bytes, ObjectID] = {}
  432. self._watchers: set[Any] = set()
  433. def allkeys(self):
  434. """Get all ref names.
  435. Returns:
  436. All ref names in the container
  437. """
  438. return self._refs.keys()
  439. def read_loose_ref(self, name):
  440. """Read a reference from the refs dictionary.
  441. Args:
  442. name: The ref name to read
  443. Returns:
  444. The ref value or None if not found
  445. """
  446. return self._refs.get(name, None)
  447. def get_packed_refs(self):
  448. """Get packed refs (always empty for DictRefsContainer).
  449. Returns:
  450. Empty dictionary
  451. """
  452. return {}
  453. def _notify(self, ref, newsha) -> None:
  454. for watcher in self._watchers:
  455. watcher._notify((ref, newsha))
  456. def set_symbolic_ref(
  457. self,
  458. name: Ref,
  459. other: Ref,
  460. committer=None,
  461. timestamp=None,
  462. timezone=None,
  463. message=None,
  464. ) -> None:
  465. """Make a ref point at another ref.
  466. Args:
  467. name: Name of the ref to set
  468. other: Name of the ref to point at
  469. committer: Optional committer name for reflog
  470. timestamp: Optional timestamp for reflog
  471. timezone: Optional timezone for reflog
  472. message: Optional message for reflog
  473. """
  474. old = self.follow(name)[-1]
  475. new = SYMREF + other
  476. self._refs[name] = new
  477. self._notify(name, new)
  478. self._log(
  479. name,
  480. old,
  481. new,
  482. committer=committer,
  483. timestamp=timestamp,
  484. timezone=timezone,
  485. message=message,
  486. )
  487. def set_if_equals(
  488. self,
  489. name,
  490. old_ref,
  491. new_ref,
  492. committer=None,
  493. timestamp=None,
  494. timezone=None,
  495. message=None,
  496. ) -> bool:
  497. """Set a refname to new_ref only if it currently equals old_ref.
  498. This method follows all symbolic references, and can be used to perform
  499. an atomic compare-and-swap operation.
  500. Args:
  501. name: The refname to set.
  502. old_ref: The old sha the refname must refer to, or None to set
  503. unconditionally.
  504. new_ref: The new sha the refname will refer to.
  505. committer: Optional committer name for reflog
  506. timestamp: Optional timestamp for reflog
  507. timezone: Optional timezone for reflog
  508. message: Optional message for reflog
  509. Returns:
  510. True if the set was successful, False otherwise.
  511. """
  512. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  513. return False
  514. # Only update the specific ref requested, not the whole chain
  515. self._check_refname(name)
  516. old = self._refs.get(name)
  517. self._refs[name] = new_ref
  518. self._notify(name, new_ref)
  519. self._log(
  520. name,
  521. old,
  522. new_ref,
  523. committer=committer,
  524. timestamp=timestamp,
  525. timezone=timezone,
  526. message=message,
  527. )
  528. return True
  529. def add_if_new(
  530. self,
  531. name: Ref,
  532. ref: ObjectID,
  533. committer=None,
  534. timestamp=None,
  535. timezone=None,
  536. message: Optional[bytes] = None,
  537. ) -> bool:
  538. """Add a new reference only if it does not already exist.
  539. Args:
  540. name: Ref name
  541. ref: Ref value
  542. committer: Optional committer name for reflog
  543. timestamp: Optional timestamp for reflog
  544. timezone: Optional timezone for reflog
  545. message: Optional message for reflog
  546. Returns:
  547. True if the add was successful, False otherwise.
  548. """
  549. if name in self._refs:
  550. return False
  551. self._refs[name] = ref
  552. self._notify(name, ref)
  553. self._log(
  554. name,
  555. None,
  556. ref,
  557. committer=committer,
  558. timestamp=timestamp,
  559. timezone=timezone,
  560. message=message,
  561. )
  562. return True
  563. def remove_if_equals(
  564. self,
  565. name,
  566. old_ref,
  567. committer=None,
  568. timestamp=None,
  569. timezone=None,
  570. message=None,
  571. ) -> bool:
  572. """Remove a refname only if it currently equals old_ref.
  573. This method does not follow symbolic references. It can be used to
  574. perform an atomic compare-and-delete operation.
  575. Args:
  576. name: The refname to delete.
  577. old_ref: The old sha the refname must refer to, or None to
  578. delete unconditionally.
  579. committer: Optional committer name for reflog
  580. timestamp: Optional timestamp for reflog
  581. timezone: Optional timezone for reflog
  582. message: Optional message for reflog
  583. Returns:
  584. True if the delete was successful, False otherwise.
  585. """
  586. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  587. return False
  588. try:
  589. old = self._refs.pop(name)
  590. except KeyError:
  591. pass
  592. else:
  593. self._notify(name, None)
  594. self._log(
  595. name,
  596. old,
  597. None,
  598. committer=committer,
  599. timestamp=timestamp,
  600. timezone=timezone,
  601. message=message,
  602. )
  603. return True
  604. def get_peeled(self, name):
  605. """Get the peeled value of a ref.
  606. Args:
  607. name: Ref name to get peeled value for
  608. Returns:
  609. The peeled SHA or None if not available
  610. """
  611. return self._peeled.get(name)
  612. def _update(self, refs) -> None:
  613. """Update multiple refs; intended only for testing."""
  614. # TODO(dborowitz): replace this with a public function that uses
  615. # set_if_equal.
  616. for ref, sha in refs.items():
  617. self.set_if_equals(ref, None, sha)
  618. def _update_peeled(self, peeled) -> None:
  619. """Update cached peeled refs; intended only for testing."""
  620. self._peeled.update(peeled)
  621. class InfoRefsContainer(RefsContainer):
  622. """Refs container that reads refs from a info/refs file."""
  623. def __init__(self, f) -> None:
  624. """Initialize an InfoRefsContainer.
  625. Args:
  626. f: File-like object containing info/refs data
  627. """
  628. self._refs = {}
  629. self._peeled = {}
  630. refs = read_info_refs(f)
  631. (self._refs, self._peeled) = split_peeled_refs(refs)
  632. def allkeys(self):
  633. """Get all ref names.
  634. Returns:
  635. All ref names in the info/refs file
  636. """
  637. return self._refs.keys()
  638. def read_loose_ref(self, name):
  639. """Read a reference from the parsed info/refs.
  640. Args:
  641. name: The ref name to read
  642. Returns:
  643. The ref value or None if not found
  644. """
  645. return self._refs.get(name, None)
  646. def get_packed_refs(self):
  647. """Get packed refs (always empty for InfoRefsContainer).
  648. Returns:
  649. Empty dictionary
  650. """
  651. return {}
  652. def get_peeled(self, name):
  653. """Get the peeled value of a ref.
  654. Args:
  655. name: Ref name to get peeled value for
  656. Returns:
  657. The peeled SHA if available, otherwise the ref value itself
  658. Raises:
  659. KeyError: If the ref doesn't exist
  660. """
  661. try:
  662. return self._peeled[name]
  663. except KeyError:
  664. return self._refs[name]
  665. class DiskRefsContainer(RefsContainer):
  666. """Refs container that reads refs from disk."""
  667. def __init__(
  668. self,
  669. path: Union[str, bytes, os.PathLike],
  670. worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
  671. logger=None,
  672. ) -> None:
  673. super().__init__(logger=logger)
  674. # Convert path-like objects to strings, then to bytes for Git compatibility
  675. self.path = os.fsencode(os.fspath(path))
  676. if worktree_path is None:
  677. self.worktree_path = self.path
  678. else:
  679. self.worktree_path = os.fsencode(os.fspath(worktree_path))
  680. self._packed_refs = None
  681. self._peeled_refs = None
  682. def __repr__(self) -> str:
  683. """Return string representation of DiskRefsContainer."""
  684. return f"{self.__class__.__name__}({self.path!r})"
  685. def subkeys(self, base):
  686. """Get all ref names under a base ref.
  687. Args:
  688. base: Base ref path to search under
  689. Returns:
  690. Set of ref names under the base (without base prefix)
  691. """
  692. subkeys = set()
  693. path = self.refpath(base)
  694. for root, unused_dirs, files in os.walk(path):
  695. directory = root[len(path) :]
  696. if os.path.sep != "/":
  697. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  698. directory = directory.strip(b"/")
  699. for filename in files:
  700. refname = b"/".join(([directory] if directory else []) + [filename])
  701. # check_ref_format requires at least one /, so we prepend the
  702. # base before calling it.
  703. if check_ref_format(base + b"/" + refname):
  704. subkeys.add(refname)
  705. for key in self.get_packed_refs():
  706. if key.startswith(base):
  707. subkeys.add(key[len(base) :].strip(b"/"))
  708. return subkeys
  709. def allkeys(self):
  710. """Get all ref names from disk.
  711. Returns:
  712. Set of all ref names (both loose and packed)
  713. """
  714. allkeys = set()
  715. if os.path.exists(self.refpath(HEADREF)):
  716. allkeys.add(HEADREF)
  717. path = self.refpath(b"")
  718. refspath = self.refpath(b"refs")
  719. for root, unused_dirs, files in os.walk(refspath):
  720. directory = root[len(path) :]
  721. if os.path.sep != "/":
  722. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  723. for filename in files:
  724. refname = b"/".join([directory, filename])
  725. if check_ref_format(refname):
  726. allkeys.add(refname)
  727. allkeys.update(self.get_packed_refs())
  728. return allkeys
  729. def refpath(self, name):
  730. """Return the disk path of a ref."""
  731. if os.path.sep != "/":
  732. name = name.replace(b"/", os.fsencode(os.path.sep))
  733. # TODO: as the 'HEAD' reference is working tree specific, it
  734. # should actually not be a part of RefsContainer
  735. if name == HEADREF:
  736. return os.path.join(self.worktree_path, name)
  737. else:
  738. return os.path.join(self.path, name)
  739. def get_packed_refs(self):
  740. """Get contents of the packed-refs file.
  741. Returns: Dictionary mapping ref names to SHA1s
  742. Note: Will return an empty dictionary when no packed-refs file is
  743. present.
  744. """
  745. # TODO: invalidate the cache on repacking
  746. if self._packed_refs is None:
  747. # set both to empty because we want _peeled_refs to be
  748. # None if and only if _packed_refs is also None.
  749. self._packed_refs = {}
  750. self._peeled_refs = {}
  751. path = os.path.join(self.path, b"packed-refs")
  752. try:
  753. f = GitFile(path, "rb")
  754. except FileNotFoundError:
  755. return {}
  756. with f:
  757. first_line = next(iter(f)).rstrip()
  758. if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
  759. for sha, name, peeled in read_packed_refs_with_peeled(f):
  760. self._packed_refs[name] = sha
  761. if peeled:
  762. self._peeled_refs[name] = peeled
  763. else:
  764. f.seek(0)
  765. for sha, name in read_packed_refs(f):
  766. self._packed_refs[name] = sha
  767. return self._packed_refs
  768. def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
  769. """Add the given refs as packed refs.
  770. Args:
  771. new_refs: A mapping of ref names to targets; if a target is None that
  772. means remove the ref
  773. """
  774. if not new_refs:
  775. return
  776. path = os.path.join(self.path, b"packed-refs")
  777. with GitFile(path, "wb") as f:
  778. # reread cached refs from disk, while holding the lock
  779. packed_refs = self.get_packed_refs().copy()
  780. for ref, target in new_refs.items():
  781. # sanity check
  782. if ref == HEADREF:
  783. raise ValueError("cannot pack HEAD")
  784. # remove any loose refs pointing to this one -- please
  785. # note that this bypasses remove_if_equals as we don't
  786. # want to affect packed refs in here
  787. with suppress(OSError):
  788. os.remove(self.refpath(ref))
  789. if target is not None:
  790. packed_refs[ref] = target
  791. else:
  792. packed_refs.pop(ref, None)
  793. write_packed_refs(f, packed_refs, self._peeled_refs)
  794. self._packed_refs = packed_refs
  795. def get_peeled(self, name):
  796. """Return the cached peeled value of a ref, if available.
  797. Args:
  798. name: Name of the ref to peel
  799. Returns: The peeled value of the ref. If the ref is known not point to
  800. a tag, this will be the SHA the ref refers to. If the ref may point
  801. to a tag, but no cached information is available, None is returned.
  802. """
  803. self.get_packed_refs()
  804. if self._peeled_refs is None or name not in self._packed_refs:
  805. # No cache: no peeled refs were read, or this ref is loose
  806. return None
  807. if name in self._peeled_refs:
  808. return self._peeled_refs[name]
  809. else:
  810. # Known not peelable
  811. return self[name]
  812. def read_loose_ref(self, name):
  813. """Read a reference file and return its contents.
  814. If the reference file a symbolic reference, only read the first line of
  815. the file. Otherwise, only read the first 40 bytes.
  816. Args:
  817. name: the refname to read, relative to refpath
  818. Returns: The contents of the ref file, or None if the file does not
  819. exist.
  820. Raises:
  821. IOError: if any other error occurs
  822. """
  823. filename = self.refpath(name)
  824. try:
  825. with GitFile(filename, "rb") as f:
  826. header = f.read(len(SYMREF))
  827. if header == SYMREF:
  828. # Read only the first line
  829. return header + next(iter(f)).rstrip(b"\r\n")
  830. else:
  831. # Read only the first 40 bytes
  832. return header + f.read(40 - len(SYMREF))
  833. except (OSError, UnicodeError):
  834. # don't assume anything specific about the error; in
  835. # particular, invalid or forbidden paths can raise weird
  836. # errors depending on the specific operating system
  837. return None
  838. def _remove_packed_ref(self, name) -> None:
  839. if self._packed_refs is None:
  840. return
  841. filename = os.path.join(self.path, b"packed-refs")
  842. # reread cached refs from disk, while holding the lock
  843. f = GitFile(filename, "wb")
  844. try:
  845. self._packed_refs = None
  846. self.get_packed_refs()
  847. if name not in self._packed_refs:
  848. f.abort()
  849. return
  850. del self._packed_refs[name]
  851. with suppress(KeyError):
  852. del self._peeled_refs[name]
  853. write_packed_refs(f, self._packed_refs, self._peeled_refs)
  854. f.close()
  855. except BaseException:
  856. f.abort()
  857. raise
  858. def set_symbolic_ref(
  859. self,
  860. name,
  861. other,
  862. committer=None,
  863. timestamp=None,
  864. timezone=None,
  865. message=None,
  866. ) -> None:
  867. """Make a ref point at another ref.
  868. Args:
  869. name: Name of the ref to set
  870. other: Name of the ref to point at
  871. message: Optional message to describe the change
  872. """
  873. self._check_refname(name)
  874. self._check_refname(other)
  875. filename = self.refpath(name)
  876. f = GitFile(filename, "wb")
  877. try:
  878. f.write(SYMREF + other + b"\n")
  879. sha = self.follow(name)[-1]
  880. self._log(
  881. name,
  882. sha,
  883. sha,
  884. committer=committer,
  885. timestamp=timestamp,
  886. timezone=timezone,
  887. message=message,
  888. )
  889. except BaseException:
  890. f.abort()
  891. raise
  892. else:
  893. f.close()
  894. def set_if_equals(
  895. self,
  896. name,
  897. old_ref,
  898. new_ref,
  899. committer=None,
  900. timestamp=None,
  901. timezone=None,
  902. message=None,
  903. ) -> bool:
  904. """Set a refname to new_ref only if it currently equals old_ref.
  905. This method follows all symbolic references, and can be used to perform
  906. an atomic compare-and-swap operation.
  907. Args:
  908. name: The refname to set.
  909. old_ref: The old sha the refname must refer to, or None to set
  910. unconditionally.
  911. new_ref: The new sha the refname will refer to.
  912. message: Set message for reflog
  913. Returns: True if the set was successful, False otherwise.
  914. """
  915. self._check_refname(name)
  916. try:
  917. realnames, _ = self.follow(name)
  918. realname = realnames[-1]
  919. except (KeyError, IndexError, SymrefLoop):
  920. realname = name
  921. filename = self.refpath(realname)
  922. # make sure none of the ancestor folders is in packed refs
  923. probe_ref = os.path.dirname(realname)
  924. packed_refs = self.get_packed_refs()
  925. while probe_ref:
  926. if packed_refs.get(probe_ref, None) is not None:
  927. raise NotADirectoryError(filename)
  928. probe_ref = os.path.dirname(probe_ref)
  929. ensure_dir_exists(os.path.dirname(filename))
  930. with GitFile(filename, "wb") as f:
  931. if old_ref is not None:
  932. try:
  933. # read again while holding the lock to handle race conditions
  934. orig_ref = self.read_loose_ref(realname)
  935. if orig_ref is None:
  936. orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
  937. if orig_ref != old_ref:
  938. f.abort()
  939. return False
  940. except OSError:
  941. f.abort()
  942. raise
  943. # Check if ref already has the desired value while holding the lock
  944. # This avoids fsync when ref is unchanged but still detects lock conflicts
  945. current_ref = self.read_loose_ref(realname)
  946. if current_ref is None:
  947. current_ref = packed_refs.get(realname, None)
  948. if current_ref is not None and current_ref == new_ref:
  949. # Ref already has desired value, abort write to avoid fsync
  950. f.abort()
  951. return True
  952. try:
  953. f.write(new_ref + b"\n")
  954. except OSError:
  955. f.abort()
  956. raise
  957. self._log(
  958. realname,
  959. old_ref,
  960. new_ref,
  961. committer=committer,
  962. timestamp=timestamp,
  963. timezone=timezone,
  964. message=message,
  965. )
  966. return True
  967. def add_if_new(
  968. self,
  969. name: bytes,
  970. ref: bytes,
  971. committer=None,
  972. timestamp=None,
  973. timezone=None,
  974. message: Optional[bytes] = None,
  975. ) -> bool:
  976. """Add a new reference only if it does not already exist.
  977. This method follows symrefs, and only ensures that the last ref in the
  978. chain does not exist.
  979. Args:
  980. name: The refname to set.
  981. ref: The new sha the refname will refer to.
  982. message: Optional message for reflog
  983. Returns: True if the add was successful, False otherwise.
  984. """
  985. try:
  986. realnames, contents = self.follow(name)
  987. if contents is not None:
  988. return False
  989. realname = realnames[-1]
  990. except (KeyError, IndexError):
  991. realname = name
  992. self._check_refname(realname)
  993. filename = self.refpath(realname)
  994. ensure_dir_exists(os.path.dirname(filename))
  995. with GitFile(filename, "wb") as f:
  996. if os.path.exists(filename) or name in self.get_packed_refs():
  997. f.abort()
  998. return False
  999. try:
  1000. f.write(ref + b"\n")
  1001. except OSError:
  1002. f.abort()
  1003. raise
  1004. else:
  1005. self._log(
  1006. name,
  1007. None,
  1008. ref,
  1009. committer=committer,
  1010. timestamp=timestamp,
  1011. timezone=timezone,
  1012. message=message,
  1013. )
  1014. return True
  1015. def remove_if_equals(
  1016. self,
  1017. name,
  1018. old_ref,
  1019. committer=None,
  1020. timestamp=None,
  1021. timezone=None,
  1022. message=None,
  1023. ) -> bool:
  1024. """Remove a refname only if it currently equals old_ref.
  1025. This method does not follow symbolic references. It can be used to
  1026. perform an atomic compare-and-delete operation.
  1027. Args:
  1028. name: The refname to delete.
  1029. old_ref: The old sha the refname must refer to, or None to
  1030. delete unconditionally.
  1031. message: Optional message
  1032. Returns: True if the delete was successful, False otherwise.
  1033. """
  1034. self._check_refname(name)
  1035. filename = self.refpath(name)
  1036. ensure_dir_exists(os.path.dirname(filename))
  1037. f = GitFile(filename, "wb")
  1038. try:
  1039. if old_ref is not None:
  1040. orig_ref = self.read_loose_ref(name)
  1041. if orig_ref is None:
  1042. orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
  1043. if orig_ref != old_ref:
  1044. return False
  1045. # remove the reference file itself
  1046. try:
  1047. found = os.path.lexists(filename)
  1048. except OSError:
  1049. # may only be packed, or otherwise unstorable
  1050. found = False
  1051. if found:
  1052. os.remove(filename)
  1053. self._remove_packed_ref(name)
  1054. self._log(
  1055. name,
  1056. old_ref,
  1057. None,
  1058. committer=committer,
  1059. timestamp=timestamp,
  1060. timezone=timezone,
  1061. message=message,
  1062. )
  1063. finally:
  1064. # never write, we just wanted the lock
  1065. f.abort()
  1066. # outside of the lock, clean-up any parent directory that might now
  1067. # be empty. this ensures that re-creating a reference of the same
  1068. # name of what was previously a directory works as expected
  1069. parent = name
  1070. while True:
  1071. try:
  1072. parent, _ = parent.rsplit(b"/", 1)
  1073. except ValueError:
  1074. break
  1075. if parent == b"refs":
  1076. break
  1077. parent_filename = self.refpath(parent)
  1078. try:
  1079. os.rmdir(parent_filename)
  1080. except OSError:
  1081. # this can be caused by the parent directory being
  1082. # removed by another process, being not empty, etc.
  1083. # in any case, this is non fatal because we already
  1084. # removed the reference, just ignore it
  1085. break
  1086. return True
  1087. def pack_refs(self, all: bool = False) -> None:
  1088. """Pack loose refs into packed-refs file.
  1089. Args:
  1090. all: If True, pack all refs. If False, only pack tags.
  1091. """
  1092. refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
  1093. for ref in self.allkeys():
  1094. if ref == HEADREF:
  1095. # Never pack HEAD
  1096. continue
  1097. if all or ref.startswith(LOCAL_TAG_PREFIX):
  1098. try:
  1099. sha = self[ref]
  1100. if sha:
  1101. refs_to_pack[ref] = sha
  1102. except KeyError:
  1103. # Broken ref, skip it
  1104. pass
  1105. if refs_to_pack:
  1106. self.add_packed_refs(refs_to_pack)
  1107. def _split_ref_line(line):
  1108. """Split a single ref line into a tuple of SHA1 and name."""
  1109. fields = line.rstrip(b"\n\r").split(b" ")
  1110. if len(fields) != 2:
  1111. raise PackedRefsException(f"invalid ref line {line!r}")
  1112. sha, name = fields
  1113. if not valid_hexsha(sha):
  1114. raise PackedRefsException(f"Invalid hex sha {sha!r}")
  1115. if not check_ref_format(name):
  1116. raise PackedRefsException(f"invalid ref name {name!r}")
  1117. return (sha, name)
  1118. def read_packed_refs(f):
  1119. """Read a packed refs file.
  1120. Args:
  1121. f: file-like object to read from
  1122. Returns: Iterator over tuples with SHA1s and ref names.
  1123. """
  1124. for line in f:
  1125. if line.startswith(b"#"):
  1126. # Comment
  1127. continue
  1128. if line.startswith(b"^"):
  1129. raise PackedRefsException("found peeled ref in packed-refs without peeled")
  1130. yield _split_ref_line(line)
  1131. def read_packed_refs_with_peeled(f):
  1132. """Read a packed refs file including peeled refs.
  1133. Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
  1134. with ref names, SHA1s, and peeled SHA1s (or None).
  1135. Args:
  1136. f: file-like object to read from, seek'ed to the second line
  1137. """
  1138. last = None
  1139. for line in f:
  1140. if line[0] == b"#":
  1141. continue
  1142. line = line.rstrip(b"\r\n")
  1143. if line.startswith(b"^"):
  1144. if not last:
  1145. raise PackedRefsException("unexpected peeled ref line")
  1146. if not valid_hexsha(line[1:]):
  1147. raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
  1148. sha, name = _split_ref_line(last)
  1149. last = None
  1150. yield (sha, name, line[1:])
  1151. else:
  1152. if last:
  1153. sha, name = _split_ref_line(last)
  1154. yield (sha, name, None)
  1155. last = line
  1156. if last:
  1157. sha, name = _split_ref_line(last)
  1158. yield (sha, name, None)
  1159. def write_packed_refs(f, packed_refs, peeled_refs=None) -> None:
  1160. """Write a packed refs file.
  1161. Args:
  1162. f: empty file-like object to write to
  1163. packed_refs: dict of refname to sha of packed refs to write
  1164. peeled_refs: dict of refname to peeled value of sha
  1165. """
  1166. if peeled_refs is None:
  1167. peeled_refs = {}
  1168. else:
  1169. f.write(b"# pack-refs with: peeled\n")
  1170. for refname in sorted(packed_refs.keys()):
  1171. f.write(git_line(packed_refs[refname], refname))
  1172. if refname in peeled_refs:
  1173. f.write(b"^" + peeled_refs[refname] + b"\n")
  1174. def read_info_refs(f):
  1175. """Read info/refs file.
  1176. Args:
  1177. f: File-like object to read from
  1178. Returns:
  1179. Dictionary mapping ref names to SHA1s
  1180. """
  1181. ret = {}
  1182. for line in f.readlines():
  1183. (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
  1184. ret[name] = sha
  1185. return ret
  1186. def write_info_refs(refs, store: ObjectContainer):
  1187. """Generate info refs."""
  1188. # TODO: Avoid recursive import :(
  1189. from .object_store import peel_sha
  1190. for name, sha in sorted(refs.items()):
  1191. # get_refs() includes HEAD as a special case, but we don't want to
  1192. # advertise it
  1193. if name == HEADREF:
  1194. continue
  1195. try:
  1196. o = store[sha]
  1197. except KeyError:
  1198. continue
  1199. unpeeled, peeled = peel_sha(store, sha)
  1200. yield o.id + b"\t" + name + b"\n"
  1201. if o.id != peeled.id:
  1202. yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
  1203. def is_local_branch(x):
  1204. """Check if a ref name refers to a local branch.
  1205. Args:
  1206. x: Ref name to check
  1207. Returns:
  1208. True if ref is a local branch (refs/heads/...)
  1209. """
  1210. return x.startswith(LOCAL_BRANCH_PREFIX)
  1211. def strip_peeled_refs(refs):
  1212. """Remove all peeled refs."""
  1213. return {
  1214. ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
  1215. }
  1216. def split_peeled_refs(refs):
  1217. """Split peeled refs from regular refs."""
  1218. peeled = {}
  1219. regular = {}
  1220. for ref, sha in refs.items():
  1221. if ref.endswith(PEELED_TAG_SUFFIX):
  1222. peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
  1223. else:
  1224. regular[ref] = sha
  1225. return regular, peeled
  1226. def _set_origin_head(refs, origin, origin_head) -> None:
  1227. # set refs/remotes/origin/HEAD
  1228. origin_base = b"refs/remotes/" + origin + b"/"
  1229. if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1230. origin_ref = origin_base + HEADREF
  1231. target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
  1232. if target_ref in refs:
  1233. refs.set_symbolic_ref(origin_ref, target_ref)
  1234. def _set_default_branch(
  1235. refs: RefsContainer,
  1236. origin: bytes,
  1237. origin_head: Optional[bytes],
  1238. branch: bytes,
  1239. ref_message: Optional[bytes],
  1240. ) -> bytes:
  1241. """Set the default branch."""
  1242. origin_base = b"refs/remotes/" + origin + b"/"
  1243. if branch:
  1244. origin_ref = origin_base + branch
  1245. if origin_ref in refs:
  1246. local_ref = LOCAL_BRANCH_PREFIX + branch
  1247. refs.add_if_new(local_ref, refs[origin_ref], ref_message)
  1248. head_ref = local_ref
  1249. elif LOCAL_TAG_PREFIX + branch in refs:
  1250. head_ref = LOCAL_TAG_PREFIX + branch
  1251. else:
  1252. raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
  1253. elif origin_head:
  1254. head_ref = origin_head
  1255. if origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1256. origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
  1257. else:
  1258. origin_ref = origin_head
  1259. try:
  1260. refs.add_if_new(head_ref, refs[origin_ref], ref_message)
  1261. except KeyError:
  1262. pass
  1263. else:
  1264. raise ValueError("neither origin_head nor branch are provided")
  1265. return head_ref
  1266. def _set_head(refs, head_ref, ref_message):
  1267. if head_ref.startswith(LOCAL_TAG_PREFIX):
  1268. # detach HEAD at specified tag
  1269. head = refs[head_ref]
  1270. if isinstance(head, Tag):
  1271. _cls, obj = head.object
  1272. head = obj.get_object(obj).id
  1273. del refs[HEADREF]
  1274. refs.set_if_equals(HEADREF, None, head, message=ref_message)
  1275. else:
  1276. # set HEAD to specific branch
  1277. try:
  1278. head = refs[head_ref]
  1279. refs.set_symbolic_ref(HEADREF, head_ref)
  1280. refs.set_if_equals(HEADREF, None, head, message=ref_message)
  1281. except KeyError:
  1282. head = None
  1283. return head
  1284. def _import_remote_refs(
  1285. refs_container: RefsContainer,
  1286. remote_name: str,
  1287. refs: dict[str, str],
  1288. message: Optional[bytes] = None,
  1289. prune: bool = False,
  1290. prune_tags: bool = False,
  1291. ) -> None:
  1292. stripped_refs = strip_peeled_refs(refs)
  1293. branches = {
  1294. n[len(LOCAL_BRANCH_PREFIX) :]: v
  1295. for (n, v) in stripped_refs.items()
  1296. if n.startswith(LOCAL_BRANCH_PREFIX)
  1297. }
  1298. refs_container.import_refs(
  1299. b"refs/remotes/" + remote_name.encode(),
  1300. branches,
  1301. message=message,
  1302. prune=prune,
  1303. )
  1304. tags = {
  1305. n[len(LOCAL_TAG_PREFIX) :]: v
  1306. for (n, v) in stripped_refs.items()
  1307. if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX)
  1308. }
  1309. refs_container.import_refs(
  1310. LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
  1311. )
  1312. def serialize_refs(store, refs):
  1313. """Serialize refs with peeled refs.
  1314. Args:
  1315. store: Object store to peel refs from
  1316. refs: Dictionary of ref names to SHAs
  1317. Returns:
  1318. Dictionary with refs and peeled refs (marked with ^{})
  1319. """
  1320. # TODO: Avoid recursive import :(
  1321. from .object_store import peel_sha
  1322. ret = {}
  1323. for ref, sha in refs.items():
  1324. try:
  1325. unpeeled, peeled = peel_sha(store, sha)
  1326. except KeyError:
  1327. warnings.warn(
  1328. "ref {} points at non-present sha {}".format(
  1329. ref.decode("utf-8", "replace"), sha.decode("ascii")
  1330. ),
  1331. UserWarning,
  1332. )
  1333. continue
  1334. else:
  1335. if isinstance(unpeeled, Tag):
  1336. ret[ref + PEELED_TAG_SUFFIX] = peeled.id
  1337. ret[ref] = unpeeled.id
  1338. return ret
  1339. class locked_ref:
  1340. """Lock a ref while making modifications.
  1341. Works as a context manager.
  1342. """
  1343. def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
  1344. """Initialize a locked ref.
  1345. Args:
  1346. refs_container: The DiskRefsContainer to lock the ref in
  1347. refname: The ref name to lock
  1348. """
  1349. self._refs_container = refs_container
  1350. self._refname = refname
  1351. self._file: Optional[_GitFile] = None
  1352. self._realname: Optional[Ref] = None
  1353. self._deleted = False
  1354. def __enter__(self) -> "locked_ref":
  1355. """Enter the context manager and acquire the lock.
  1356. Returns:
  1357. This locked_ref instance
  1358. Raises:
  1359. OSError: If the lock cannot be acquired
  1360. """
  1361. self._refs_container._check_refname(self._refname)
  1362. try:
  1363. realnames, _ = self._refs_container.follow(self._refname)
  1364. self._realname = realnames[-1]
  1365. except (KeyError, IndexError, SymrefLoop):
  1366. self._realname = self._refname
  1367. filename = self._refs_container.refpath(self._realname)
  1368. ensure_dir_exists(os.path.dirname(filename))
  1369. f = GitFile(filename, "wb")
  1370. self._file = f
  1371. return self
  1372. def __exit__(
  1373. self,
  1374. exc_type: Optional[type],
  1375. exc_value: Optional[BaseException],
  1376. traceback: Optional[types.TracebackType],
  1377. ) -> None:
  1378. """Exit the context manager and release the lock.
  1379. Args:
  1380. exc_type: Type of exception if one occurred
  1381. exc_value: Exception instance if one occurred
  1382. traceback: Traceback if an exception occurred
  1383. """
  1384. if self._file:
  1385. if exc_type is not None or self._deleted:
  1386. self._file.abort()
  1387. else:
  1388. self._file.close()
  1389. def get(self) -> Optional[bytes]:
  1390. """Get the current value of the ref."""
  1391. if not self._file:
  1392. raise RuntimeError("locked_ref not in context")
  1393. current_ref = self._refs_container.read_loose_ref(self._realname)
  1394. if current_ref is None:
  1395. current_ref = self._refs_container.get_packed_refs().get(
  1396. self._realname, None
  1397. )
  1398. return current_ref
  1399. def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
  1400. """Ensure the ref currently equals the expected value.
  1401. Args:
  1402. expected_value: The expected current value of the ref
  1403. Returns:
  1404. True if the ref equals the expected value, False otherwise
  1405. """
  1406. current_value = self.get()
  1407. return current_value == expected_value
  1408. def set(self, new_ref: bytes) -> None:
  1409. """Set the ref to a new value.
  1410. Args:
  1411. new_ref: The new SHA1 or symbolic ref value
  1412. """
  1413. if not self._file:
  1414. raise RuntimeError("locked_ref not in context")
  1415. if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
  1416. raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
  1417. self._file.seek(0)
  1418. self._file.truncate()
  1419. self._file.write(new_ref + b"\n")
  1420. self._deleted = False
  1421. def set_symbolic_ref(self, target: Ref) -> None:
  1422. """Make this ref point at another ref.
  1423. Args:
  1424. target: Name of the ref to point at
  1425. """
  1426. if not self._file:
  1427. raise RuntimeError("locked_ref not in context")
  1428. self._refs_container._check_refname(target)
  1429. self._file.seek(0)
  1430. self._file.truncate()
  1431. self._file.write(SYMREF + target + b"\n")
  1432. self._deleted = False
  1433. def delete(self) -> None:
  1434. """Delete the ref file while holding the lock."""
  1435. if not self._file:
  1436. raise RuntimeError("locked_ref not in context")
  1437. # Delete the actual ref file while holding the lock
  1438. if self._realname:
  1439. filename = self._refs_container.refpath(self._realname)
  1440. try:
  1441. if os.path.lexists(filename):
  1442. os.remove(filename)
  1443. except FileNotFoundError:
  1444. pass
  1445. self._refs_container._remove_packed_ref(self._realname)
  1446. self._deleted = True