2
0

refs.py 57 KB


# refs.py -- For dealing with git refs
# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
  21. """Ref handling."""
  22. import os
  23. import types
  24. import warnings
  25. from collections.abc import Iterable, Iterator
  26. from contextlib import suppress
  27. from typing import (
  28. IO,
  29. TYPE_CHECKING,
  30. Any,
  31. BinaryIO,
  32. Callable,
  33. Optional,
  34. TypeVar,
  35. Union,
  36. cast,
  37. )
  38. if TYPE_CHECKING:
  39. from .file import _GitFile
  40. from .errors import PackedRefsException, RefFormatError
  41. from .file import GitFile, ensure_dir_exists
  42. from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
  43. from .pack import ObjectContainer
# Ref names are handled as plain byte strings throughout this module.
Ref = bytes

# Name of the HEAD ref.
HEADREF = b"HEAD"
# Marker that starts the contents of a symbolic ref; the target ref name
# follows it (see parse_symref_value).
SYMREF = b"ref: "
# Ref name prefixes for the standard namespaces.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
# Byte values (as ints, since iterating bytes yields ints) that
# check_ref_format rejects anywhere in a ref name: DEL (octal 177), space,
# and the characters ~ ^ : ? * [
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# Suffix for peeled tag entries -- presumably appended to a ref name to mark
# the peeled value; confirm against the readers that consume it.
PEELED_TAG_SUFFIX = b"^{}"
# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
  55. class SymrefLoop(Exception):
  56. """There is a loop between one or more symrefs."""
  57. def __init__(self, ref: bytes, depth: int) -> None:
  58. """Initialize SymrefLoop exception."""
  59. self.ref = ref
  60. self.depth = depth
  61. def parse_symref_value(contents: bytes) -> bytes:
  62. """Parse a symref value.
  63. Args:
  64. contents: Contents to parse
  65. Returns: Destination
  66. """
  67. if contents.startswith(SYMREF):
  68. return contents[len(SYMREF) :].rstrip(b"\r\n")
  69. raise ValueError(contents)
  70. def check_ref_format(refname: Ref) -> bool:
  71. """Check if a refname is correctly formatted.
  72. Implements all the same rules as git-check-ref-format[1].
  73. [1]
  74. http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
  75. Args:
  76. refname: The refname to check
  77. Returns: True if refname is valid, False otherwise
  78. """
  79. # These could be combined into one big expression, but are listed
  80. # separately to parallel [1].
  81. if b"/." in refname or refname.startswith(b"."):
  82. return False
  83. if b"/" not in refname:
  84. return False
  85. if b".." in refname:
  86. return False
  87. for i, c in enumerate(refname):
  88. if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
  89. return False
  90. if refname[-1] in b"/.":
  91. return False
  92. if refname.endswith(b".lock"):
  93. return False
  94. if b"@{" in refname:
  95. return False
  96. if b"\\" in refname:
  97. return False
  98. return True
  99. def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
  100. """Parse a remote ref into remote name and branch name.
  101. Args:
  102. ref: Remote ref like b"refs/remotes/origin/main"
  103. Returns:
  104. Tuple of (remote_name, branch_name)
  105. Raises:
  106. ValueError: If ref is not a valid remote ref
  107. """
  108. if not ref.startswith(LOCAL_REMOTE_PREFIX):
  109. raise ValueError(f"Not a remote ref: {ref!r}")
  110. # Remove the prefix
  111. remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
  112. # Split into remote name and branch name
  113. parts = remainder.split(b"/", 1)
  114. if len(parts) != 2:
  115. raise ValueError(f"Invalid remote ref format: {ref!r}")
  116. remote_name, branch_name = parts
  117. return (remote_name, branch_name)
  118. class RefsContainer:
  119. """A container for refs."""
  120. def __init__(
  121. self,
  122. logger: Optional[
  123. Callable[
  124. [
  125. bytes,
  126. Optional[bytes],
  127. Optional[bytes],
  128. Optional[bytes],
  129. Optional[int],
  130. Optional[int],
  131. Optional[bytes],
  132. ],
  133. None,
  134. ]
  135. ] = None,
  136. ) -> None:
  137. """Initialize RefsContainer with optional logger function."""
  138. self._logger = logger
  139. def _log(
  140. self,
  141. ref: bytes,
  142. old_sha: Optional[bytes],
  143. new_sha: Optional[bytes],
  144. committer: Optional[bytes] = None,
  145. timestamp: Optional[int] = None,
  146. timezone: Optional[int] = None,
  147. message: Optional[bytes] = None,
  148. ) -> None:
  149. if self._logger is None:
  150. return
  151. if message is None:
  152. return
  153. self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
  154. def set_symbolic_ref(
  155. self,
  156. name: bytes,
  157. other: bytes,
  158. committer: Optional[bytes] = None,
  159. timestamp: Optional[int] = None,
  160. timezone: Optional[int] = None,
  161. message: Optional[bytes] = None,
  162. ) -> None:
  163. """Make a ref point at another ref.
  164. Args:
  165. name: Name of the ref to set
  166. other: Name of the ref to point at
  167. committer: Optional committer name/email
  168. timestamp: Optional timestamp
  169. timezone: Optional timezone
  170. message: Optional message
  171. """
  172. raise NotImplementedError(self.set_symbolic_ref)
  173. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  174. """Get contents of the packed-refs file.
  175. Returns: Dictionary mapping ref names to SHA1s
  176. Note: Will return an empty dictionary when no packed-refs file is
  177. present.
  178. """
  179. raise NotImplementedError(self.get_packed_refs)
  180. def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
  181. """Add the given refs as packed refs.
  182. Args:
  183. new_refs: A mapping of ref names to targets; if a target is None that
  184. means remove the ref
  185. """
  186. raise NotImplementedError(self.add_packed_refs)
  187. def get_peeled(self, name: bytes) -> Optional[ObjectID]:
  188. """Return the cached peeled value of a ref, if available.
  189. Args:
  190. name: Name of the ref to peel
  191. Returns: The peeled value of the ref. If the ref is known not point to
  192. a tag, this will be the SHA the ref refers to. If the ref may point
  193. to a tag, but no cached information is available, None is returned.
  194. """
  195. return None
  196. def import_refs(
  197. self,
  198. base: Ref,
  199. other: dict[Ref, ObjectID],
  200. committer: Optional[bytes] = None,
  201. timestamp: Optional[bytes] = None,
  202. timezone: Optional[bytes] = None,
  203. message: Optional[bytes] = None,
  204. prune: bool = False,
  205. ) -> None:
  206. """Import refs from another repository.
  207. Args:
  208. base: Base ref to import into (e.g., b'refs/remotes/origin')
  209. other: Dictionary of refs to import
  210. committer: Optional committer for reflog
  211. timestamp: Optional timestamp for reflog
  212. timezone: Optional timezone for reflog
  213. message: Optional message for reflog
  214. prune: If True, remove refs not in other
  215. """
  216. if prune:
  217. to_delete = set(self.subkeys(base))
  218. else:
  219. to_delete = set()
  220. for name, value in other.items():
  221. if value is None:
  222. to_delete.add(name)
  223. else:
  224. self.set_if_equals(
  225. b"/".join((base, name)), None, value, message=message
  226. )
  227. if to_delete:
  228. try:
  229. to_delete.remove(name)
  230. except KeyError:
  231. pass
  232. for ref in to_delete:
  233. self.remove_if_equals(b"/".join((base, ref)), None, message=message)
  234. def allkeys(self) -> set[Ref]:
  235. """All refs present in this container."""
  236. raise NotImplementedError(self.allkeys)
  237. def __iter__(self) -> Iterator[Ref]:
  238. """Iterate over all reference keys."""
  239. return iter(self.allkeys())
  240. def keys(self, base=None):
  241. """Refs present in this container.
  242. Args:
  243. base: An optional base to return refs under.
  244. Returns: An unsorted set of valid refs in this container, including
  245. packed refs.
  246. """
  247. if base is not None:
  248. return self.subkeys(base)
  249. else:
  250. return self.allkeys()
  251. def subkeys(self, base: bytes) -> set[bytes]:
  252. """Refs present in this container under a base.
  253. Args:
  254. base: The base to return refs under.
  255. Returns: A set of valid refs in this container under the base; the base
  256. prefix is stripped from the ref names returned.
  257. """
  258. keys = set()
  259. base_len = len(base) + 1
  260. for refname in self.allkeys():
  261. if refname.startswith(base):
  262. keys.add(refname[base_len:])
  263. return keys
  264. def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
  265. """Return the contents of this container as a dictionary."""
  266. ret = {}
  267. keys = self.keys(base)
  268. if base is None:
  269. base = b""
  270. else:
  271. base = base.rstrip(b"/")
  272. for key in keys:
  273. try:
  274. ret[key] = self[(base + b"/" + key).strip(b"/")]
  275. except (SymrefLoop, KeyError):
  276. continue # Unable to resolve
  277. return ret
  278. def _check_refname(self, name: bytes) -> None:
  279. """Ensure a refname is valid and lives in refs or is HEAD.
  280. HEAD is not a valid refname according to git-check-ref-format, but this
  281. class needs to be able to touch HEAD. Also, check_ref_format expects
  282. refnames without the leading 'refs/', but this class requires that
  283. so it cannot touch anything outside the refs dir (or HEAD).
  284. Args:
  285. name: The name of the reference.
  286. Raises:
  287. KeyError: if a refname is not HEAD or is otherwise not valid.
  288. """
  289. if name in (HEADREF, b"refs/stash"):
  290. return
  291. if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
  292. raise RefFormatError(name)
  293. def read_ref(self, refname: bytes) -> Optional[bytes]:
  294. """Read a reference without following any references.
  295. Args:
  296. refname: The name of the reference
  297. Returns: The contents of the ref file, or None if it does
  298. not exist.
  299. """
  300. contents = self.read_loose_ref(refname)
  301. if not contents:
  302. contents = self.get_packed_refs().get(refname, None)
  303. return contents
  304. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  305. """Read a loose reference and return its contents.
  306. Args:
  307. name: the refname to read
  308. Returns: The contents of the ref file, or None if it does
  309. not exist.
  310. """
  311. raise NotImplementedError(self.read_loose_ref)
  312. def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
  313. """Follow a reference name.
  314. Returns: a tuple of (refnames, sha), wheres refnames are the names of
  315. references in the chain
  316. """
  317. contents: Optional[bytes] = SYMREF + name
  318. depth = 0
  319. refnames = []
  320. while contents and contents.startswith(SYMREF):
  321. refname = contents[len(SYMREF) :]
  322. refnames.append(refname)
  323. contents = self.read_ref(refname)
  324. if not contents:
  325. break
  326. depth += 1
  327. if depth > 5:
  328. raise SymrefLoop(name, depth)
  329. return refnames, contents
  330. def __contains__(self, refname: bytes) -> bool:
  331. """Check if a reference exists."""
  332. if self.read_ref(refname):
  333. return True
  334. return False
  335. def __getitem__(self, name: bytes) -> ObjectID:
  336. """Get the SHA1 for a reference name.
  337. This method follows all symbolic references.
  338. """
  339. _, sha = self.follow(name)
  340. if sha is None:
  341. raise KeyError(name)
  342. return sha
  343. def set_if_equals(
  344. self,
  345. name: bytes,
  346. old_ref: Optional[bytes],
  347. new_ref: bytes,
  348. committer: Optional[bytes] = None,
  349. timestamp: Optional[int] = None,
  350. timezone: Optional[int] = None,
  351. message: Optional[bytes] = None,
  352. ) -> bool:
  353. """Set a refname to new_ref only if it currently equals old_ref.
  354. This method follows all symbolic references if applicable for the
  355. subclass, and can be used to perform an atomic compare-and-swap
  356. operation.
  357. Args:
  358. name: The refname to set.
  359. old_ref: The old sha the refname must refer to, or None to set
  360. unconditionally.
  361. new_ref: The new sha the refname will refer to.
  362. committer: Optional committer name/email
  363. timestamp: Optional timestamp
  364. timezone: Optional timezone
  365. message: Message for reflog
  366. Returns: True if the set was successful, False otherwise.
  367. """
  368. raise NotImplementedError(self.set_if_equals)
  369. def add_if_new(
  370. self,
  371. name: bytes,
  372. ref: bytes,
  373. committer: Optional[bytes] = None,
  374. timestamp: Optional[int] = None,
  375. timezone: Optional[int] = None,
  376. message: Optional[bytes] = None,
  377. ) -> bool:
  378. """Add a new reference only if it does not already exist.
  379. Args:
  380. name: Ref name
  381. ref: Ref value
  382. committer: Optional committer name/email
  383. timestamp: Optional timestamp
  384. timezone: Optional timezone
  385. message: Optional message for reflog
  386. """
  387. raise NotImplementedError(self.add_if_new)
  388. def __setitem__(self, name: bytes, ref: bytes) -> None:
  389. """Set a reference name to point to the given SHA1.
  390. This method follows all symbolic references if applicable for the
  391. subclass.
  392. Note: This method unconditionally overwrites the contents of a
  393. reference. To update atomically only if the reference has not
  394. changed, use set_if_equals().
  395. Args:
  396. name: The refname to set.
  397. ref: The new sha the refname will refer to.
  398. """
  399. if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
  400. raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
  401. self.set_if_equals(name, None, ref)
  402. def remove_if_equals(
  403. self,
  404. name: bytes,
  405. old_ref: Optional[bytes],
  406. committer: Optional[bytes] = None,
  407. timestamp: Optional[int] = None,
  408. timezone: Optional[int] = None,
  409. message: Optional[bytes] = None,
  410. ) -> bool:
  411. """Remove a refname only if it currently equals old_ref.
  412. This method does not follow symbolic references, even if applicable for
  413. the subclass. It can be used to perform an atomic compare-and-delete
  414. operation.
  415. Args:
  416. name: The refname to delete.
  417. old_ref: The old sha the refname must refer to, or None to
  418. delete unconditionally.
  419. committer: Optional committer name/email
  420. timestamp: Optional timestamp
  421. timezone: Optional timezone
  422. message: Message for reflog
  423. Returns: True if the delete was successful, False otherwise.
  424. """
  425. raise NotImplementedError(self.remove_if_equals)
  426. def __delitem__(self, name: bytes) -> None:
  427. """Remove a refname.
  428. This method does not follow symbolic references, even if applicable for
  429. the subclass.
  430. Note: This method unconditionally deletes the contents of a reference.
  431. To delete atomically only if the reference has not changed, use
  432. remove_if_equals().
  433. Args:
  434. name: The refname to delete.
  435. """
  436. self.remove_if_equals(name, None)
  437. def get_symrefs(self) -> dict[bytes, bytes]:
  438. """Get a dict with all symrefs in this container.
  439. Returns: Dictionary mapping source ref to target ref
  440. """
  441. ret = {}
  442. for src in self.allkeys():
  443. try:
  444. ref_value = self.read_ref(src)
  445. assert ref_value is not None
  446. dst = parse_symref_value(ref_value)
  447. except ValueError:
  448. pass
  449. else:
  450. ret[src] = dst
  451. return ret
  452. def pack_refs(self, all: bool = False) -> None:
  453. """Pack loose refs into packed-refs file.
  454. Args:
  455. all: If True, pack all refs. If False, only pack tags.
  456. """
  457. raise NotImplementedError(self.pack_refs)
  458. class DictRefsContainer(RefsContainer):
  459. """RefsContainer backed by a simple dict.
  460. This container does not support symbolic or packed references and is not
  461. threadsafe.
  462. """
  463. def __init__(
  464. self,
  465. refs: dict[bytes, bytes],
  466. logger: Optional[
  467. Callable[
  468. [
  469. bytes,
  470. Optional[bytes],
  471. Optional[bytes],
  472. Optional[bytes],
  473. Optional[int],
  474. Optional[int],
  475. Optional[bytes],
  476. ],
  477. None,
  478. ]
  479. ] = None,
  480. ) -> None:
  481. """Initialize DictRefsContainer with refs dictionary and optional logger."""
  482. super().__init__(logger=logger)
  483. self._refs = refs
  484. self._peeled: dict[bytes, ObjectID] = {}
  485. self._watchers: set[Any] = set()
  486. def allkeys(self) -> set[bytes]:
  487. """Return all reference keys."""
  488. return set(self._refs.keys())
  489. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  490. """Read a loose reference."""
  491. return self._refs.get(name, None)
  492. def get_packed_refs(self) -> dict[bytes, bytes]:
  493. """Get packed references."""
  494. return {}
  495. def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
  496. for watcher in self._watchers:
  497. watcher._notify((ref, newsha))
  498. def set_symbolic_ref(
  499. self,
  500. name: Ref,
  501. other: Ref,
  502. committer: Optional[bytes] = None,
  503. timestamp: Optional[int] = None,
  504. timezone: Optional[int] = None,
  505. message: Optional[bytes] = None,
  506. ) -> None:
  507. """Make a ref point at another ref.
  508. Args:
  509. name: Name of the ref to set
  510. other: Name of the ref to point at
  511. committer: Optional committer name for reflog
  512. timestamp: Optional timestamp for reflog
  513. timezone: Optional timezone for reflog
  514. message: Optional message for reflog
  515. """
  516. old = self.follow(name)[-1]
  517. new = SYMREF + other
  518. self._refs[name] = new
  519. self._notify(name, new)
  520. self._log(
  521. name,
  522. old,
  523. new,
  524. committer=committer,
  525. timestamp=timestamp,
  526. timezone=timezone,
  527. message=message,
  528. )
  529. def set_if_equals(
  530. self,
  531. name: bytes,
  532. old_ref: Optional[bytes],
  533. new_ref: bytes,
  534. committer: Optional[bytes] = None,
  535. timestamp: Optional[int] = None,
  536. timezone: Optional[int] = None,
  537. message: Optional[bytes] = None,
  538. ) -> bool:
  539. """Set a refname to new_ref only if it currently equals old_ref.
  540. This method follows all symbolic references, and can be used to perform
  541. an atomic compare-and-swap operation.
  542. Args:
  543. name: The refname to set.
  544. old_ref: The old sha the refname must refer to, or None to set
  545. unconditionally.
  546. new_ref: The new sha the refname will refer to.
  547. committer: Optional committer name for reflog
  548. timestamp: Optional timestamp for reflog
  549. timezone: Optional timezone for reflog
  550. message: Optional message for reflog
  551. Returns:
  552. True if the set was successful, False otherwise.
  553. """
  554. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  555. return False
  556. # Only update the specific ref requested, not the whole chain
  557. self._check_refname(name)
  558. old = self._refs.get(name)
  559. self._refs[name] = new_ref
  560. self._notify(name, new_ref)
  561. self._log(
  562. name,
  563. old,
  564. new_ref,
  565. committer=committer,
  566. timestamp=timestamp,
  567. timezone=timezone,
  568. message=message,
  569. )
  570. return True
  571. def add_if_new(
  572. self,
  573. name: Ref,
  574. ref: ObjectID,
  575. committer: Optional[bytes] = None,
  576. timestamp: Optional[int] = None,
  577. timezone: Optional[int] = None,
  578. message: Optional[bytes] = None,
  579. ) -> bool:
  580. """Add a new reference only if it does not already exist.
  581. Args:
  582. name: Ref name
  583. ref: Ref value
  584. committer: Optional committer name for reflog
  585. timestamp: Optional timestamp for reflog
  586. timezone: Optional timezone for reflog
  587. message: Optional message for reflog
  588. Returns:
  589. True if the add was successful, False otherwise.
  590. """
  591. if name in self._refs:
  592. return False
  593. self._refs[name] = ref
  594. self._notify(name, ref)
  595. self._log(
  596. name,
  597. None,
  598. ref,
  599. committer=committer,
  600. timestamp=timestamp,
  601. timezone=timezone,
  602. message=message,
  603. )
  604. return True
  605. def remove_if_equals(
  606. self,
  607. name: bytes,
  608. old_ref: Optional[bytes],
  609. committer: Optional[bytes] = None,
  610. timestamp: Optional[int] = None,
  611. timezone: Optional[int] = None,
  612. message: Optional[bytes] = None,
  613. ) -> bool:
  614. """Remove a refname only if it currently equals old_ref.
  615. This method does not follow symbolic references. It can be used to
  616. perform an atomic compare-and-delete operation.
  617. Args:
  618. name: The refname to delete.
  619. old_ref: The old sha the refname must refer to, or None to
  620. delete unconditionally.
  621. committer: Optional committer name for reflog
  622. timestamp: Optional timestamp for reflog
  623. timezone: Optional timezone for reflog
  624. message: Optional message for reflog
  625. Returns:
  626. True if the delete was successful, False otherwise.
  627. """
  628. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  629. return False
  630. try:
  631. old = self._refs.pop(name)
  632. except KeyError:
  633. pass
  634. else:
  635. self._notify(name, None)
  636. self._log(
  637. name,
  638. old,
  639. None,
  640. committer=committer,
  641. timestamp=timestamp,
  642. timezone=timezone,
  643. message=message,
  644. )
  645. return True
  646. def get_peeled(self, name: bytes) -> Optional[bytes]:
  647. """Get peeled version of a reference."""
  648. return self._peeled.get(name)
  649. def _update(self, refs: dict[bytes, bytes]) -> None:
  650. """Update multiple refs; intended only for testing."""
  651. # TODO(dborowitz): replace this with a public function that uses
  652. # set_if_equal.
  653. for ref, sha in refs.items():
  654. self.set_if_equals(ref, None, sha)
  655. def _update_peeled(self, peeled: dict[bytes, bytes]) -> None:
  656. """Update cached peeled refs; intended only for testing."""
  657. self._peeled.update(peeled)
  658. class InfoRefsContainer(RefsContainer):
  659. """Refs container that reads refs from a info/refs file."""
  660. def __init__(self, f: BinaryIO) -> None:
  661. """Initialize InfoRefsContainer from info/refs file."""
  662. self._refs: dict[bytes, bytes] = {}
  663. self._peeled: dict[bytes, bytes] = {}
  664. refs = read_info_refs(f)
  665. (self._refs, self._peeled) = split_peeled_refs(refs)
  666. def allkeys(self) -> set[bytes]:
  667. """Return all reference keys."""
  668. return set(self._refs.keys())
  669. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  670. """Read a loose reference."""
  671. return self._refs.get(name, None)
  672. def get_packed_refs(self) -> dict[bytes, bytes]:
  673. """Get packed references."""
  674. return {}
  675. def get_peeled(self, name: bytes) -> Optional[bytes]:
  676. """Get peeled version of a reference."""
  677. try:
  678. return self._peeled[name]
  679. except KeyError:
  680. return self._refs[name]
class DiskRefsContainer(RefsContainer):
    """Refs container that reads refs from disk.

    Loose refs are individual files located through refpath(): refs that
    is_per_worktree_ref() reports as per-worktree live under
    `worktree_path`, all others under `path`. Packed refs are read from the
    "packed-refs" file directly inside `path` and cached on the instance.
    """
  683. def __init__(
  684. self,
  685. path: Union[str, bytes, os.PathLike],
  686. worktree_path: Optional[Union[str, bytes, os.PathLike]] = None,
  687. logger: Optional[
  688. Callable[
  689. [
  690. bytes,
  691. Optional[bytes],
  692. Optional[bytes],
  693. Optional[bytes],
  694. Optional[int],
  695. Optional[int],
  696. Optional[bytes],
  697. ],
  698. None,
  699. ]
  700. ] = None,
  701. ) -> None:
  702. """Initialize DiskRefsContainer."""
  703. super().__init__(logger=logger)
  704. # Convert path-like objects to strings, then to bytes for Git compatibility
  705. self.path = os.fsencode(os.fspath(path))
  706. if worktree_path is None:
  707. self.worktree_path = self.path
  708. else:
  709. self.worktree_path = os.fsencode(os.fspath(worktree_path))
  710. self._packed_refs: Optional[dict[bytes, bytes]] = None
  711. self._peeled_refs: Optional[dict[bytes, bytes]] = None
  712. def __repr__(self) -> str:
  713. """Return string representation of DiskRefsContainer."""
  714. return f"{self.__class__.__name__}({self.path!r})"
  715. def _iter_dir(
  716. self,
  717. path: bytes,
  718. base: bytes,
  719. dir_filter: Optional[Callable[[bytes], bool]] = None,
  720. ) -> Iterator[bytes]:
  721. refspath = os.path.join(path, base.rstrip(b"/"))
  722. prefix_len = len(os.path.join(path, b""))
  723. for root, dirs, files in os.walk(refspath):
  724. directory = root[prefix_len:]
  725. if os.path.sep != "/":
  726. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  727. if dir_filter is not None:
  728. dirs[:] = [
  729. d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
  730. ]
  731. for filename in files:
  732. refname = b"/".join([directory, filename])
  733. if check_ref_format(refname):
  734. yield refname
  735. def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
  736. base = base.rstrip(b"/") + b"/"
  737. search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]] = []
  738. if base != b"refs/":
  739. path = self.worktree_path if is_per_worktree_ref(base) else self.path
  740. search_paths.append((path, None))
  741. elif self.worktree_path == self.path:
  742. # Iterate through all the refs from the main worktree
  743. search_paths.append((self.path, None))
  744. else:
  745. # Iterate through all the shared refs from the commondir, excluding per-worktree refs
  746. search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
  747. # Iterate through all the per-worktree refs from the worktree's gitdir
  748. search_paths.append((self.worktree_path, is_per_worktree_ref))
  749. for path, dir_filter in search_paths:
  750. yield from self._iter_dir(path, base, dir_filter=dir_filter)
  751. def subkeys(self, base: bytes) -> set[bytes]:
  752. """Return subkeys under a given base reference path."""
  753. subkeys = set()
  754. for key in self._iter_loose_refs(base):
  755. if key.startswith(base):
  756. subkeys.add(key[len(base) :].strip(b"/"))
  757. for key in self.get_packed_refs():
  758. if key.startswith(base):
  759. subkeys.add(key[len(base) :].strip(b"/"))
  760. return subkeys
  761. def allkeys(self) -> set[bytes]:
  762. """Return all reference keys."""
  763. allkeys = set()
  764. if os.path.exists(self.refpath(HEADREF)):
  765. allkeys.add(HEADREF)
  766. allkeys.update(self._iter_loose_refs())
  767. allkeys.update(self.get_packed_refs())
  768. return allkeys
  769. def refpath(self, name: bytes) -> bytes:
  770. """Return the disk path of a ref."""
  771. path = name
  772. if os.path.sep != "/":
  773. path = path.replace(b"/", os.fsencode(os.path.sep))
  774. root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
  775. return os.path.join(root_dir, path)
  776. def get_packed_refs(self) -> dict[bytes, bytes]:
  777. """Get contents of the packed-refs file.
  778. Returns: Dictionary mapping ref names to SHA1s
  779. Note: Will return an empty dictionary when no packed-refs file is
  780. present.
  781. """
  782. # TODO: invalidate the cache on repacking
  783. if self._packed_refs is None:
  784. # set both to empty because we want _peeled_refs to be
  785. # None if and only if _packed_refs is also None.
  786. self._packed_refs = {}
  787. self._peeled_refs = {}
  788. path = os.path.join(self.path, b"packed-refs")
  789. try:
  790. f = GitFile(path, "rb")
  791. except FileNotFoundError:
  792. return {}
  793. with f:
  794. first_line = next(iter(f)).rstrip()
  795. if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
  796. for sha, name, peeled in read_packed_refs_with_peeled(f):
  797. self._packed_refs[name] = sha
  798. if peeled:
  799. self._peeled_refs[name] = peeled
  800. else:
  801. f.seek(0)
  802. for sha, name in read_packed_refs(f):
  803. self._packed_refs[name] = sha
  804. return self._packed_refs
  805. def add_packed_refs(self, new_refs: dict[Ref, Optional[ObjectID]]) -> None:
  806. """Add the given refs as packed refs.
  807. Args:
  808. new_refs: A mapping of ref names to targets; if a target is None that
  809. means remove the ref
  810. """
  811. if not new_refs:
  812. return
  813. path = os.path.join(self.path, b"packed-refs")
  814. with GitFile(path, "wb") as f:
  815. # reread cached refs from disk, while holding the lock
  816. packed_refs = self.get_packed_refs().copy()
  817. for ref, target in new_refs.items():
  818. # sanity check
  819. if ref == HEADREF:
  820. raise ValueError("cannot pack HEAD")
  821. # remove any loose refs pointing to this one -- please
  822. # note that this bypasses remove_if_equals as we don't
  823. # want to affect packed refs in here
  824. with suppress(OSError):
  825. os.remove(self.refpath(ref))
  826. if target is not None:
  827. packed_refs[ref] = target
  828. else:
  829. packed_refs.pop(ref, None)
  830. write_packed_refs(f, packed_refs, self._peeled_refs)
  831. self._packed_refs = packed_refs
  832. def get_peeled(self, name: bytes) -> Optional[bytes]:
  833. """Return the cached peeled value of a ref, if available.
  834. Args:
  835. name: Name of the ref to peel
  836. Returns: The peeled value of the ref. If the ref is known not point to
  837. a tag, this will be the SHA the ref refers to. If the ref may point
  838. to a tag, but no cached information is available, None is returned.
  839. """
  840. self.get_packed_refs()
  841. if (
  842. self._peeled_refs is None
  843. or self._packed_refs is None
  844. or name not in self._packed_refs
  845. ):
  846. # No cache: no peeled refs were read, or this ref is loose
  847. return None
  848. if name in self._peeled_refs:
  849. return self._peeled_refs[name]
  850. else:
  851. # Known not peelable
  852. return self[name]
  853. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  854. """Read a reference file and return its contents.
  855. If the reference file a symbolic reference, only read the first line of
  856. the file. Otherwise, only read the first 40 bytes.
  857. Args:
  858. name: the refname to read, relative to refpath
  859. Returns: The contents of the ref file, or None if the file does not
  860. exist.
  861. Raises:
  862. IOError: if any other error occurs
  863. """
  864. filename = self.refpath(name)
  865. try:
  866. with GitFile(filename, "rb") as f:
  867. header = f.read(len(SYMREF))
  868. if header == SYMREF:
  869. # Read only the first line
  870. return header + next(iter(f)).rstrip(b"\r\n")
  871. else:
  872. # Read only the first 40 bytes
  873. return header + f.read(40 - len(SYMREF))
  874. except (OSError, UnicodeError):
  875. # don't assume anything specific about the error; in
  876. # particular, invalid or forbidden paths can raise weird
  877. # errors depending on the specific operating system
  878. return None
def _remove_packed_ref(self, name: bytes) -> None:
    """Remove a ref from the packed-refs file, if it is listed there.

    Does nothing when no packed refs are cached. Otherwise the packed refs
    are re-read and, if ``name`` is present, the packed-refs file is
    rewritten without it (and without its peeled entry, if any).

    Args:
      name: Name of the ref to remove
    """
    if self._packed_refs is None:
        return
    filename = os.path.join(self.path, b"packed-refs")
    # reread cached refs from disk, while holding the lock
    f = GitFile(filename, "wb")
    try:
        # Clear the cached value before asking for the packed refs again
        self._packed_refs = None
        self.get_packed_refs()

        if self._packed_refs is None or name not in self._packed_refs:
            # Nothing to remove: discard the pending write
            f.abort()
            return

        del self._packed_refs[name]
        if self._peeled_refs is not None:
            # The ref may have no peeled entry; that is not an error
            with suppress(KeyError):
                del self._peeled_refs[name]
        write_packed_refs(f, self._packed_refs, self._peeled_refs)
        f.close()
    except BaseException:
        # Any failure (including one raised by close()) discards the write
        f.abort()
        raise
  900. def set_symbolic_ref(
  901. self,
  902. name: bytes,
  903. other: bytes,
  904. committer: Optional[bytes] = None,
  905. timestamp: Optional[int] = None,
  906. timezone: Optional[int] = None,
  907. message: Optional[bytes] = None,
  908. ) -> None:
  909. """Make a ref point at another ref.
  910. Args:
  911. name: Name of the ref to set
  912. other: Name of the ref to point at
  913. committer: Optional committer name
  914. timestamp: Optional timestamp
  915. timezone: Optional timezone
  916. message: Optional message to describe the change
  917. """
  918. self._check_refname(name)
  919. self._check_refname(other)
  920. filename = self.refpath(name)
  921. f = GitFile(filename, "wb")
  922. try:
  923. f.write(SYMREF + other + b"\n")
  924. sha = self.follow(name)[-1]
  925. self._log(
  926. name,
  927. sha,
  928. sha,
  929. committer=committer,
  930. timestamp=timestamp,
  931. timezone=timezone,
  932. message=message,
  933. )
  934. except BaseException:
  935. f.abort()
  936. raise
  937. else:
  938. f.close()
def set_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    new_ref: bytes,
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Set a refname to new_ref only if it currently equals old_ref.

    This method follows all symbolic references, and can be used to perform
    an atomic compare-and-swap operation.

    Args:
      name: The refname to set.
      old_ref: The old sha the refname must refer to, or None to set
        unconditionally.
      new_ref: The new sha the refname will refer to.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Set message for reflog
    Returns: True if the set was successful, False otherwise.
    Raises:
      NotADirectoryError: if an ancestor path of the resolved ref is itself
        a packed ref
    """
    self._check_refname(name)
    try:
        realnames, _ = self.follow(name)
        realname = realnames[-1]
    except (KeyError, IndexError, SymrefLoop):
        # Could not resolve a chain: operate on the given name itself
        realname = name
    filename = self.refpath(realname)

    # make sure none of the ancestor folders is in packed refs
    probe_ref = os.path.dirname(realname)
    packed_refs = self.get_packed_refs()
    while probe_ref:
        if packed_refs.get(probe_ref, None) is not None:
            raise NotADirectoryError(filename)
        probe_ref = os.path.dirname(probe_ref)

    ensure_dir_exists(os.path.dirname(filename))
    with GitFile(filename, "wb") as f:
        if old_ref is not None:
            try:
                # read again while holding the lock to handle race conditions
                orig_ref = self.read_loose_ref(realname)
                if orig_ref is None:
                    # No loose value: fall back to the packed value, with
                    # ZERO_SHA standing in for "ref does not exist"
                    orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                if orig_ref != old_ref:
                    f.abort()
                    return False
            except OSError:
                f.abort()
                raise

        # Check if ref already has the desired value while holding the lock
        # This avoids fsync when ref is unchanged but still detects lock conflicts
        current_ref = self.read_loose_ref(realname)
        if current_ref is None:
            current_ref = packed_refs.get(realname, None)

        if current_ref is not None and current_ref == new_ref:
            # Ref already has desired value, abort write to avoid fsync
            f.abort()
            return True

        try:
            f.write(new_ref + b"\n")
        except OSError:
            f.abort()
            raise
        # The log entry is made against the resolved name, not the name
        # the caller passed in
        self._log(
            realname,
            old_ref,
            new_ref,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    return True
  1015. def add_if_new(
  1016. self,
  1017. name: bytes,
  1018. ref: bytes,
  1019. committer: Optional[bytes] = None,
  1020. timestamp: Optional[int] = None,
  1021. timezone: Optional[int] = None,
  1022. message: Optional[bytes] = None,
  1023. ) -> bool:
  1024. """Add a new reference only if it does not already exist.
  1025. This method follows symrefs, and only ensures that the last ref in the
  1026. chain does not exist.
  1027. Args:
  1028. name: The refname to set.
  1029. ref: The new sha the refname will refer to.
  1030. committer: Optional committer name
  1031. timestamp: Optional timestamp
  1032. timezone: Optional timezone
  1033. message: Optional message for reflog
  1034. Returns: True if the add was successful, False otherwise.
  1035. """
  1036. try:
  1037. realnames, contents = self.follow(name)
  1038. if contents is not None:
  1039. return False
  1040. realname = realnames[-1]
  1041. except (KeyError, IndexError):
  1042. realname = name
  1043. self._check_refname(realname)
  1044. filename = self.refpath(realname)
  1045. ensure_dir_exists(os.path.dirname(filename))
  1046. with GitFile(filename, "wb") as f:
  1047. if os.path.exists(filename) or name in self.get_packed_refs():
  1048. f.abort()
  1049. return False
  1050. try:
  1051. f.write(ref + b"\n")
  1052. except OSError:
  1053. f.abort()
  1054. raise
  1055. else:
  1056. self._log(
  1057. name,
  1058. None,
  1059. ref,
  1060. committer=committer,
  1061. timestamp=timestamp,
  1062. timezone=timezone,
  1063. message=message,
  1064. )
  1065. return True
def remove_if_equals(
    self,
    name: bytes,
    old_ref: Optional[bytes],
    committer: Optional[bytes] = None,
    timestamp: Optional[int] = None,
    timezone: Optional[int] = None,
    message: Optional[bytes] = None,
) -> bool:
    """Remove a refname only if it currently equals old_ref.

    This method does not follow symbolic references. It can be used to
    perform an atomic compare-and-delete operation.

    Args:
      name: The refname to delete.
      old_ref: The old sha the refname must refer to, or None to
        delete unconditionally.
      committer: Optional committer name
      timestamp: Optional timestamp
      timezone: Optional timezone
      message: Optional message
    Returns: True if the delete was successful, False otherwise.
    """
    self._check_refname(name)
    filename = self.refpath(name)
    ensure_dir_exists(os.path.dirname(filename))
    f = GitFile(filename, "wb")
    try:
        if old_ref is not None:
            orig_ref = self.read_loose_ref(name)
            if orig_ref is None:
                # No loose value: fall back to the packed value, with
                # ZERO_SHA standing in for "ref does not exist"
                orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
            if orig_ref != old_ref:
                return False

        # remove the reference file itself
        try:
            found = os.path.lexists(filename)
        except OSError:
            # may only be packed, or otherwise unstorable
            found = False

        if found:
            os.remove(filename)

        # Also drop any packed copy, then record the deletion in the log
        self._remove_packed_ref(name)
        self._log(
            name,
            old_ref,
            None,
            committer=committer,
            timestamp=timestamp,
            timezone=timezone,
            message=message,
        )
    finally:
        # never write, we just wanted the lock
        f.abort()

    # outside of the lock, clean-up any parent directory that might now
    # be empty. this ensures that re-creating a reference of the same
    # name of what was previously a directory works as expected
    parent = name
    while True:
        try:
            parent, _ = parent.rsplit(b"/", 1)
        except ValueError:
            # No "/" left in the name: nothing further up to clean
            break

        if parent == b"refs":
            # Never attempt to remove the top-level "refs" directory
            break
        parent_filename = self.refpath(parent)
        try:
            os.rmdir(parent_filename)
        except OSError:
            # this can be caused by the parent directory being
            # removed by another process, being not empty, etc.
            # in any case, this is non fatal because we already
            # removed the reference, just ignore it
            break

    return True
  1141. def pack_refs(self, all: bool = False) -> None:
  1142. """Pack loose refs into packed-refs file.
  1143. Args:
  1144. all: If True, pack all refs. If False, only pack tags.
  1145. """
  1146. refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
  1147. for ref in self.allkeys():
  1148. if ref == HEADREF:
  1149. # Never pack HEAD
  1150. continue
  1151. if all or ref.startswith(LOCAL_TAG_PREFIX):
  1152. try:
  1153. sha = self[ref]
  1154. if sha:
  1155. refs_to_pack[ref] = sha
  1156. except KeyError:
  1157. # Broken ref, skip it
  1158. pass
  1159. if refs_to_pack:
  1160. self.add_packed_refs(refs_to_pack)
  1161. def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
  1162. """Split a single ref line into a tuple of SHA1 and name."""
  1163. fields = line.rstrip(b"\n\r").split(b" ")
  1164. if len(fields) != 2:
  1165. raise PackedRefsException(f"invalid ref line {line!r}")
  1166. sha, name = fields
  1167. if not valid_hexsha(sha):
  1168. raise PackedRefsException(f"Invalid hex sha {sha!r}")
  1169. if not check_ref_format(name):
  1170. raise PackedRefsException(f"invalid ref name {name!r}")
  1171. return (sha, name)
  1172. def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
  1173. """Read a packed refs file.
  1174. Args:
  1175. f: file-like object to read from
  1176. Returns: Iterator over tuples with SHA1s and ref names.
  1177. """
  1178. for line in f:
  1179. if line.startswith(b"#"):
  1180. # Comment
  1181. continue
  1182. if line.startswith(b"^"):
  1183. raise PackedRefsException("found peeled ref in packed-refs without peeled")
  1184. yield _split_ref_line(line)
  1185. def read_packed_refs_with_peeled(
  1186. f: IO[bytes],
  1187. ) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
  1188. """Read a packed refs file including peeled refs.
  1189. Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
  1190. with ref names, SHA1s, and peeled SHA1s (or None).
  1191. Args:
  1192. f: file-like object to read from, seek'ed to the second line
  1193. """
  1194. last = None
  1195. for line in f:
  1196. if line[0] == b"#":
  1197. continue
  1198. line = line.rstrip(b"\r\n")
  1199. if line.startswith(b"^"):
  1200. if not last:
  1201. raise PackedRefsException("unexpected peeled ref line")
  1202. if not valid_hexsha(line[1:]):
  1203. raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
  1204. sha, name = _split_ref_line(last)
  1205. last = None
  1206. yield (sha, name, line[1:])
  1207. else:
  1208. if last:
  1209. sha, name = _split_ref_line(last)
  1210. yield (sha, name, None)
  1211. last = line
  1212. if last:
  1213. sha, name = _split_ref_line(last)
  1214. yield (sha, name, None)
  1215. def write_packed_refs(
  1216. f: IO[bytes],
  1217. packed_refs: dict[bytes, bytes],
  1218. peeled_refs: Optional[dict[bytes, bytes]] = None,
  1219. ) -> None:
  1220. """Write a packed refs file.
  1221. Args:
  1222. f: empty file-like object to write to
  1223. packed_refs: dict of refname to sha of packed refs to write
  1224. peeled_refs: dict of refname to peeled value of sha
  1225. """
  1226. if peeled_refs is None:
  1227. peeled_refs = {}
  1228. else:
  1229. f.write(b"# pack-refs with: peeled\n")
  1230. for refname in sorted(packed_refs.keys()):
  1231. f.write(git_line(packed_refs[refname], refname))
  1232. if refname in peeled_refs:
  1233. f.write(b"^" + peeled_refs[refname] + b"\n")
  1234. def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
  1235. """Read info/refs file.
  1236. Args:
  1237. f: File-like object to read from
  1238. Returns:
  1239. Dictionary mapping ref names to SHA1s
  1240. """
  1241. ret = {}
  1242. for line in f.readlines():
  1243. (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
  1244. ret[name] = sha
  1245. return ret
  1246. def write_info_refs(
  1247. refs: dict[bytes, bytes], store: ObjectContainer
  1248. ) -> Iterator[bytes]:
  1249. """Generate info refs."""
  1250. # TODO: Avoid recursive import :(
  1251. from .object_store import peel_sha
  1252. for name, sha in sorted(refs.items()):
  1253. # get_refs() includes HEAD as a special case, but we don't want to
  1254. # advertise it
  1255. if name == HEADREF:
  1256. continue
  1257. try:
  1258. o = store[sha]
  1259. except KeyError:
  1260. continue
  1261. unpeeled, peeled = peel_sha(store, sha)
  1262. yield o.id + b"\t" + name + b"\n"
  1263. if o.id != peeled.id:
  1264. yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
  1265. def is_local_branch(x: bytes) -> bool:
  1266. """Check if a ref name is a local branch."""
  1267. return x.startswith(LOCAL_BRANCH_PREFIX)
# Type variable constrained to the two ref-dictionary shapes used in this
# module: every ref maps to a SHA, or values may also be None.
T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])
  1269. def strip_peeled_refs(refs: T) -> T:
  1270. """Remove all peeled refs."""
  1271. return {
  1272. ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
  1273. }
  1274. def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
  1275. """Split peeled refs from regular refs."""
  1276. peeled: dict[bytes, bytes] = {}
  1277. regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)}
  1278. for ref, sha in refs.items():
  1279. if ref.endswith(PEELED_TAG_SUFFIX):
  1280. # Only add to peeled dict if sha is not None
  1281. if sha is not None:
  1282. peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
  1283. return regular, peeled
  1284. def _set_origin_head(
  1285. refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
  1286. ) -> None:
  1287. # set refs/remotes/origin/HEAD
  1288. origin_base = b"refs/remotes/" + origin + b"/"
  1289. if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1290. origin_ref = origin_base + HEADREF
  1291. target_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
  1292. if target_ref in refs:
  1293. refs.set_symbolic_ref(origin_ref, target_ref)
  1294. def _set_default_branch(
  1295. refs: RefsContainer,
  1296. origin: bytes,
  1297. origin_head: Optional[bytes],
  1298. branch: bytes,
  1299. ref_message: Optional[bytes],
  1300. ) -> bytes:
  1301. """Set the default branch."""
  1302. origin_base = b"refs/remotes/" + origin + b"/"
  1303. if branch:
  1304. origin_ref = origin_base + branch
  1305. if origin_ref in refs:
  1306. local_ref = LOCAL_BRANCH_PREFIX + branch
  1307. refs.add_if_new(local_ref, refs[origin_ref], ref_message)
  1308. head_ref = local_ref
  1309. elif LOCAL_TAG_PREFIX + branch in refs:
  1310. head_ref = LOCAL_TAG_PREFIX + branch
  1311. else:
  1312. raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
  1313. elif origin_head:
  1314. head_ref = origin_head
  1315. if origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1316. origin_ref = origin_base + origin_head[len(LOCAL_BRANCH_PREFIX) :]
  1317. else:
  1318. origin_ref = origin_head
  1319. try:
  1320. refs.add_if_new(head_ref, refs[origin_ref], ref_message)
  1321. except KeyError:
  1322. pass
  1323. else:
  1324. raise ValueError("neither origin_head nor branch are provided")
  1325. return head_ref
def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
) -> Optional[bytes]:
    """Point HEAD at the given ref.

    For a ref under the local tag prefix, HEAD is deleted and then set
    directly to the tag's value. For any other ref, HEAD is made a symbolic
    ref to it.

    Args:
      refs: Refs container to update
      head_ref: Ref that HEAD should be set from
      ref_message: Optional reflog message
    Returns: The value HEAD was set to, or None if ``head_ref`` could not be
      looked up in the branch case.
    """
    if head_ref.startswith(LOCAL_TAG_PREFIX):
        # detach HEAD at specified tag
        head = refs[head_ref]
        # NOTE(review): other code in this file treats refs[...] values as
        # bytes SHAs, in which case this isinstance branch would never
        # run -- confirm whether it is still needed.
        if isinstance(head, Tag):
            _cls, obj = head.object
            head = obj.get_object(obj).id
        del refs[HEADREF]
        # old_ref=None: no comparison against the current value is made
        refs.set_if_equals(HEADREF, None, head, message=ref_message)
    else:
        # set HEAD to specific branch
        try:
            head = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            head = None
    return head
  1346. def _import_remote_refs(
  1347. refs_container: RefsContainer,
  1348. remote_name: str,
  1349. refs: dict[bytes, Optional[bytes]],
  1350. message: Optional[bytes] = None,
  1351. prune: bool = False,
  1352. prune_tags: bool = False,
  1353. ) -> None:
  1354. stripped_refs = strip_peeled_refs(refs)
  1355. branches = {
  1356. n[len(LOCAL_BRANCH_PREFIX) :]: v
  1357. for (n, v) in stripped_refs.items()
  1358. if n.startswith(LOCAL_BRANCH_PREFIX) and v is not None
  1359. }
  1360. refs_container.import_refs(
  1361. b"refs/remotes/" + remote_name.encode(),
  1362. branches,
  1363. message=message,
  1364. prune=prune,
  1365. )
  1366. tags = {
  1367. n[len(LOCAL_TAG_PREFIX) :]: v
  1368. for (n, v) in stripped_refs.items()
  1369. if n.startswith(LOCAL_TAG_PREFIX)
  1370. and not n.endswith(PEELED_TAG_SUFFIX)
  1371. and v is not None
  1372. }
  1373. refs_container.import_refs(
  1374. LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
  1375. )
  1376. def serialize_refs(
  1377. store: ObjectContainer, refs: dict[bytes, bytes]
  1378. ) -> dict[bytes, bytes]:
  1379. """Serialize refs with peeled refs.
  1380. Args:
  1381. store: Object store to peel refs from
  1382. refs: Dictionary of ref names to SHAs
  1383. Returns:
  1384. Dictionary with refs and peeled refs (marked with ^{})
  1385. """
  1386. # TODO: Avoid recursive import :(
  1387. from .object_store import peel_sha
  1388. ret = {}
  1389. for ref, sha in refs.items():
  1390. try:
  1391. unpeeled, peeled = peel_sha(store, sha)
  1392. except KeyError:
  1393. warnings.warn(
  1394. "ref {} points at non-present sha {}".format(
  1395. ref.decode("utf-8", "replace"), sha.decode("ascii")
  1396. ),
  1397. UserWarning,
  1398. )
  1399. continue
  1400. else:
  1401. if isinstance(unpeeled, Tag):
  1402. ret[ref + PEELED_TAG_SUFFIX] = peeled.id
  1403. ret[ref] = unpeeled.id
  1404. return ret
  1405. class locked_ref:
  1406. """Lock a ref while making modifications.
  1407. Works as a context manager.
  1408. """
  1409. def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
  1410. """Initialize a locked ref.
  1411. Args:
  1412. refs_container: The DiskRefsContainer to lock the ref in
  1413. refname: The ref name to lock
  1414. """
  1415. self._refs_container = refs_container
  1416. self._refname = refname
  1417. self._file: Optional[_GitFile] = None
  1418. self._realname: Optional[Ref] = None
  1419. self._deleted = False
  1420. def __enter__(self) -> "locked_ref":
  1421. """Enter the context manager and acquire the lock.
  1422. Returns:
  1423. This locked_ref instance
  1424. Raises:
  1425. OSError: If the lock cannot be acquired
  1426. """
  1427. self._refs_container._check_refname(self._refname)
  1428. try:
  1429. realnames, _ = self._refs_container.follow(self._refname)
  1430. self._realname = realnames[-1]
  1431. except (KeyError, IndexError, SymrefLoop):
  1432. self._realname = self._refname
  1433. filename = self._refs_container.refpath(self._realname)
  1434. ensure_dir_exists(os.path.dirname(filename))
  1435. f = GitFile(filename, "wb")
  1436. self._file = f
  1437. return self
  1438. def __exit__(
  1439. self,
  1440. exc_type: Optional[type],
  1441. exc_value: Optional[BaseException],
  1442. traceback: Optional[types.TracebackType],
  1443. ) -> None:
  1444. """Exit the context manager and release the lock.
  1445. Args:
  1446. exc_type: Type of exception if one occurred
  1447. exc_value: Exception instance if one occurred
  1448. traceback: Traceback if an exception occurred
  1449. """
  1450. if self._file:
  1451. if exc_type is not None or self._deleted:
  1452. self._file.abort()
  1453. else:
  1454. self._file.close()
  1455. def get(self) -> Optional[bytes]:
  1456. """Get the current value of the ref."""
  1457. if not self._file:
  1458. raise RuntimeError("locked_ref not in context")
  1459. assert self._realname is not None
  1460. current_ref = self._refs_container.read_loose_ref(self._realname)
  1461. if current_ref is None:
  1462. current_ref = self._refs_container.get_packed_refs().get(
  1463. self._realname, None
  1464. )
  1465. return current_ref
  1466. def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
  1467. """Ensure the ref currently equals the expected value.
  1468. Args:
  1469. expected_value: The expected current value of the ref
  1470. Returns:
  1471. True if the ref equals the expected value, False otherwise
  1472. """
  1473. current_value = self.get()
  1474. return current_value == expected_value
  1475. def set(self, new_ref: bytes) -> None:
  1476. """Set the ref to a new value.
  1477. Args:
  1478. new_ref: The new SHA1 or symbolic ref value
  1479. """
  1480. if not self._file:
  1481. raise RuntimeError("locked_ref not in context")
  1482. if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
  1483. raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
  1484. self._file.seek(0)
  1485. self._file.truncate()
  1486. self._file.write(new_ref + b"\n")
  1487. self._deleted = False
  1488. def set_symbolic_ref(self, target: Ref) -> None:
  1489. """Make this ref point at another ref.
  1490. Args:
  1491. target: Name of the ref to point at
  1492. """
  1493. if not self._file:
  1494. raise RuntimeError("locked_ref not in context")
  1495. self._refs_container._check_refname(target)
  1496. self._file.seek(0)
  1497. self._file.truncate()
  1498. self._file.write(SYMREF + target + b"\n")
  1499. self._deleted = False
  1500. def delete(self) -> None:
  1501. """Delete the ref file while holding the lock."""
  1502. if not self._file:
  1503. raise RuntimeError("locked_ref not in context")
  1504. # Delete the actual ref file while holding the lock
  1505. if self._realname:
  1506. filename = self._refs_container.refpath(self._realname)
  1507. try:
  1508. if os.path.lexists(filename):
  1509. os.remove(filename)
  1510. except FileNotFoundError:
  1511. pass
  1512. self._refs_container._remove_packed_ref(self._realname)
  1513. self._deleted = True
  1514. def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
  1515. """Filter refs to only include those with a given prefix.
  1516. Args:
  1517. refs: A dictionary of refs.
  1518. prefixes: The prefixes to filter by.
  1519. """
  1520. filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
  1521. return cast(T, filtered)
  1522. def is_per_worktree_ref(ref: bytes) -> bool:
  1523. """Returns whether a reference is stored per worktree or not.
  1524. Per-worktree references are:
  1525. - all pseudorefs, e.g. HEAD
  1526. - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
  1527. All refs starting with "refs/" are shared, except for the ones listed above.
  1528. See https://git-scm.com/docs/git-worktree#_refs.
  1529. """
  1530. return not ref.startswith(b"refs/") or ref.startswith(
  1531. (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
  1532. )