refs.py 66 KB


  1. # refs.py -- For dealing with git refs
  2. # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Ref handling."""
  22. import os
  23. import types
  24. import warnings
  25. from collections.abc import Iterable, Iterator, Mapping
  26. from contextlib import suppress
  27. from typing import (
  28. IO,
  29. TYPE_CHECKING,
  30. Any,
  31. BinaryIO,
  32. Callable,
  33. Optional,
  34. TypeVar,
  35. Union,
  36. )
  37. if TYPE_CHECKING:
  38. from .file import _GitFile
  39. from .errors import PackedRefsException, RefFormatError
  40. from .file import GitFile, ensure_dir_exists
  41. from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
  42. from .pack import ObjectContainer
  43. Ref = bytes
  44. HEADREF = b"HEAD"
  45. SYMREF = b"ref: "
  46. LOCAL_BRANCH_PREFIX = b"refs/heads/"
  47. LOCAL_TAG_PREFIX = b"refs/tags/"
  48. LOCAL_REMOTE_PREFIX = b"refs/remotes/"
  49. LOCAL_NOTES_PREFIX = b"refs/notes/"
  50. LOCAL_REPLACE_PREFIX = b"refs/replace/"
  51. BAD_REF_CHARS = set(b"\177 ~^:?*[")
  52. PEELED_TAG_SUFFIX = b"^{}"
  53. # For backwards compatibility
  54. ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
  55. class SymrefLoop(Exception):
  56. """There is a loop between one or more symrefs."""
  57. def __init__(self, ref: bytes, depth: int) -> None:
  58. """Initialize SymrefLoop exception."""
  59. self.ref = ref
  60. self.depth = depth
  61. def parse_symref_value(contents: bytes) -> bytes:
  62. """Parse a symref value.
  63. Args:
  64. contents: Contents to parse
  65. Returns: Destination
  66. """
  67. if contents.startswith(SYMREF):
  68. return contents[len(SYMREF) :].rstrip(b"\r\n")
  69. raise ValueError(contents)
  70. def check_ref_format(refname: Ref) -> bool:
  71. """Check if a refname is correctly formatted.
  72. Implements all the same rules as git-check-ref-format[1].
  73. [1]
  74. http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
  75. Args:
  76. refname: The refname to check
  77. Returns: True if refname is valid, False otherwise
  78. """
  79. # These could be combined into one big expression, but are listed
  80. # separately to parallel [1].
  81. if b"/." in refname or refname.startswith(b"."):
  82. return False
  83. if b"/" not in refname:
  84. return False
  85. if b".." in refname:
  86. return False
  87. for i, c in enumerate(refname):
  88. if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
  89. return False
  90. if refname[-1] in b"/.":
  91. return False
  92. if refname.endswith(b".lock"):
  93. return False
  94. if b"@{" in refname:
  95. return False
  96. if b"\\" in refname:
  97. return False
  98. return True
  99. def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
  100. """Parse a remote ref into remote name and branch name.
  101. Args:
  102. ref: Remote ref like b"refs/remotes/origin/main"
  103. Returns:
  104. Tuple of (remote_name, branch_name)
  105. Raises:
  106. ValueError: If ref is not a valid remote ref
  107. """
  108. if not ref.startswith(LOCAL_REMOTE_PREFIX):
  109. raise ValueError(f"Not a remote ref: {ref!r}")
  110. # Remove the prefix
  111. remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
  112. # Split into remote name and branch name
  113. parts = remainder.split(b"/", 1)
  114. if len(parts) != 2:
  115. raise ValueError(f"Invalid remote ref format: {ref!r}")
  116. remote_name, branch_name = parts
  117. return (remote_name, branch_name)
  118. class RefsContainer:
  119. """A container for refs."""
  120. def __init__(
  121. self,
  122. logger: Optional[
  123. Callable[
  124. [
  125. bytes,
  126. bytes,
  127. bytes,
  128. Optional[bytes],
  129. Optional[int],
  130. Optional[int],
  131. bytes,
  132. ],
  133. None,
  134. ]
  135. ] = None,
  136. ) -> None:
  137. """Initialize RefsContainer with optional logger function."""
  138. self._logger = logger
  139. def _log(
  140. self,
  141. ref: bytes,
  142. old_sha: Optional[bytes],
  143. new_sha: Optional[bytes],
  144. committer: Optional[bytes] = None,
  145. timestamp: Optional[int] = None,
  146. timezone: Optional[int] = None,
  147. message: Optional[bytes] = None,
  148. ) -> None:
  149. if self._logger is None:
  150. return
  151. if message is None:
  152. return
  153. # Use ZERO_SHA for None values, matching git behavior
  154. if old_sha is None:
  155. old_sha = ZERO_SHA
  156. if new_sha is None:
  157. new_sha = ZERO_SHA
  158. self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
  159. def set_symbolic_ref(
  160. self,
  161. name: bytes,
  162. other: bytes,
  163. committer: Optional[bytes] = None,
  164. timestamp: Optional[int] = None,
  165. timezone: Optional[int] = None,
  166. message: Optional[bytes] = None,
  167. ) -> None:
  168. """Make a ref point at another ref.
  169. Args:
  170. name: Name of the ref to set
  171. other: Name of the ref to point at
  172. committer: Optional committer name/email
  173. timestamp: Optional timestamp
  174. timezone: Optional timezone
  175. message: Optional message
  176. """
  177. raise NotImplementedError(self.set_symbolic_ref)
  178. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  179. """Get contents of the packed-refs file.
  180. Returns: Dictionary mapping ref names to SHA1s
  181. Note: Will return an empty dictionary when no packed-refs file is
  182. present.
  183. """
  184. raise NotImplementedError(self.get_packed_refs)
  185. def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
  186. """Add the given refs as packed refs.
  187. Args:
  188. new_refs: A mapping of ref names to targets; if a target is None that
  189. means remove the ref
  190. """
  191. raise NotImplementedError(self.add_packed_refs)
  192. def get_peeled(self, name: bytes) -> Optional[ObjectID]:
  193. """Return the cached peeled value of a ref, if available.
  194. Args:
  195. name: Name of the ref to peel
  196. Returns: The peeled value of the ref. If the ref is known not point to
  197. a tag, this will be the SHA the ref refers to. If the ref may point
  198. to a tag, but no cached information is available, None is returned.
  199. """
  200. return None
  201. def import_refs(
  202. self,
  203. base: Ref,
  204. other: Mapping[Ref, ObjectID],
  205. committer: Optional[bytes] = None,
  206. timestamp: Optional[bytes] = None,
  207. timezone: Optional[bytes] = None,
  208. message: Optional[bytes] = None,
  209. prune: bool = False,
  210. ) -> None:
  211. """Import refs from another repository.
  212. Args:
  213. base: Base ref to import into (e.g., b'refs/remotes/origin')
  214. other: Dictionary of refs to import
  215. committer: Optional committer for reflog
  216. timestamp: Optional timestamp for reflog
  217. timezone: Optional timezone for reflog
  218. message: Optional message for reflog
  219. prune: If True, remove refs not in other
  220. """
  221. if prune:
  222. to_delete = set(self.subkeys(base))
  223. else:
  224. to_delete = set()
  225. for name, value in other.items():
  226. if value is None:
  227. to_delete.add(name)
  228. else:
  229. self.set_if_equals(
  230. b"/".join((base, name)), None, value, message=message
  231. )
  232. if to_delete:
  233. try:
  234. to_delete.remove(name)
  235. except KeyError:
  236. pass
  237. for ref in to_delete:
  238. self.remove_if_equals(b"/".join((base, ref)), None, message=message)
  239. def allkeys(self) -> set[Ref]:
  240. """All refs present in this container."""
  241. raise NotImplementedError(self.allkeys)
  242. def __iter__(self) -> Iterator[Ref]:
  243. """Iterate over all reference keys."""
  244. return iter(self.allkeys())
  245. def keys(self, base: Optional[bytes] = None) -> set[bytes]:
  246. """Refs present in this container.
  247. Args:
  248. base: An optional base to return refs under.
  249. Returns: An unsorted set of valid refs in this container, including
  250. packed refs.
  251. """
  252. if base is not None:
  253. return self.subkeys(base)
  254. else:
  255. return self.allkeys()
  256. def subkeys(self, base: bytes) -> set[bytes]:
  257. """Refs present in this container under a base.
  258. Args:
  259. base: The base to return refs under.
  260. Returns: A set of valid refs in this container under the base; the base
  261. prefix is stripped from the ref names returned.
  262. """
  263. keys = set()
  264. base_len = len(base) + 1
  265. for refname in self.allkeys():
  266. if refname.startswith(base):
  267. keys.add(refname[base_len:])
  268. return keys
  269. def as_dict(self, base: Optional[bytes] = None) -> dict[Ref, ObjectID]:
  270. """Return the contents of this container as a dictionary."""
  271. ret = {}
  272. keys = self.keys(base)
  273. if base is None:
  274. base = b""
  275. else:
  276. base = base.rstrip(b"/")
  277. for key in keys:
  278. try:
  279. ret[key] = self[(base + b"/" + key).strip(b"/")]
  280. except (SymrefLoop, KeyError):
  281. continue # Unable to resolve
  282. return ret
  283. def _check_refname(self, name: bytes) -> None:
  284. """Ensure a refname is valid and lives in refs or is HEAD.
  285. HEAD is not a valid refname according to git-check-ref-format, but this
  286. class needs to be able to touch HEAD. Also, check_ref_format expects
  287. refnames without the leading 'refs/', but this class requires that
  288. so it cannot touch anything outside the refs dir (or HEAD).
  289. Args:
  290. name: The name of the reference.
  291. Raises:
  292. KeyError: if a refname is not HEAD or is otherwise not valid.
  293. """
  294. if name in (HEADREF, b"refs/stash"):
  295. return
  296. if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
  297. raise RefFormatError(name)
  298. def read_ref(self, refname: bytes) -> Optional[bytes]:
  299. """Read a reference without following any references.
  300. Args:
  301. refname: The name of the reference
  302. Returns: The contents of the ref file, or None if it does
  303. not exist.
  304. """
  305. contents = self.read_loose_ref(refname)
  306. if not contents:
  307. contents = self.get_packed_refs().get(refname, None)
  308. return contents
  309. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  310. """Read a loose reference and return its contents.
  311. Args:
  312. name: the refname to read
  313. Returns: The contents of the ref file, or None if it does
  314. not exist.
  315. """
  316. raise NotImplementedError(self.read_loose_ref)
  317. def follow(self, name: bytes) -> tuple[list[bytes], Optional[bytes]]:
  318. """Follow a reference name.
  319. Returns: a tuple of (refnames, sha), wheres refnames are the names of
  320. references in the chain
  321. """
  322. contents: Optional[bytes] = SYMREF + name
  323. depth = 0
  324. refnames = []
  325. while contents and contents.startswith(SYMREF):
  326. refname = contents[len(SYMREF) :]
  327. refnames.append(refname)
  328. contents = self.read_ref(refname)
  329. if not contents:
  330. break
  331. depth += 1
  332. if depth > 5:
  333. raise SymrefLoop(name, depth)
  334. return refnames, contents
  335. def __contains__(self, refname: bytes) -> bool:
  336. """Check if a reference exists."""
  337. if self.read_ref(refname):
  338. return True
  339. return False
  340. def __getitem__(self, name: bytes) -> ObjectID:
  341. """Get the SHA1 for a reference name.
  342. This method follows all symbolic references.
  343. """
  344. _, sha = self.follow(name)
  345. if sha is None:
  346. raise KeyError(name)
  347. return sha
  348. def set_if_equals(
  349. self,
  350. name: bytes,
  351. old_ref: Optional[bytes],
  352. new_ref: bytes,
  353. committer: Optional[bytes] = None,
  354. timestamp: Optional[int] = None,
  355. timezone: Optional[int] = None,
  356. message: Optional[bytes] = None,
  357. ) -> bool:
  358. """Set a refname to new_ref only if it currently equals old_ref.
  359. This method follows all symbolic references if applicable for the
  360. subclass, and can be used to perform an atomic compare-and-swap
  361. operation.
  362. Args:
  363. name: The refname to set.
  364. old_ref: The old sha the refname must refer to, or None to set
  365. unconditionally.
  366. new_ref: The new sha the refname will refer to.
  367. committer: Optional committer name/email
  368. timestamp: Optional timestamp
  369. timezone: Optional timezone
  370. message: Message for reflog
  371. Returns: True if the set was successful, False otherwise.
  372. """
  373. raise NotImplementedError(self.set_if_equals)
  374. def add_if_new(
  375. self,
  376. name: bytes,
  377. ref: bytes,
  378. committer: Optional[bytes] = None,
  379. timestamp: Optional[int] = None,
  380. timezone: Optional[int] = None,
  381. message: Optional[bytes] = None,
  382. ) -> bool:
  383. """Add a new reference only if it does not already exist.
  384. Args:
  385. name: Ref name
  386. ref: Ref value
  387. committer: Optional committer name/email
  388. timestamp: Optional timestamp
  389. timezone: Optional timezone
  390. message: Optional message for reflog
  391. """
  392. raise NotImplementedError(self.add_if_new)
  393. def __setitem__(self, name: bytes, ref: bytes) -> None:
  394. """Set a reference name to point to the given SHA1.
  395. This method follows all symbolic references if applicable for the
  396. subclass.
  397. Note: This method unconditionally overwrites the contents of a
  398. reference. To update atomically only if the reference has not
  399. changed, use set_if_equals().
  400. Args:
  401. name: The refname to set.
  402. ref: The new sha the refname will refer to.
  403. """
  404. if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
  405. raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
  406. self.set_if_equals(name, None, ref)
  407. def remove_if_equals(
  408. self,
  409. name: bytes,
  410. old_ref: Optional[bytes],
  411. committer: Optional[bytes] = None,
  412. timestamp: Optional[int] = None,
  413. timezone: Optional[int] = None,
  414. message: Optional[bytes] = None,
  415. ) -> bool:
  416. """Remove a refname only if it currently equals old_ref.
  417. This method does not follow symbolic references, even if applicable for
  418. the subclass. It can be used to perform an atomic compare-and-delete
  419. operation.
  420. Args:
  421. name: The refname to delete.
  422. old_ref: The old sha the refname must refer to, or None to
  423. delete unconditionally.
  424. committer: Optional committer name/email
  425. timestamp: Optional timestamp
  426. timezone: Optional timezone
  427. message: Message for reflog
  428. Returns: True if the delete was successful, False otherwise.
  429. """
  430. raise NotImplementedError(self.remove_if_equals)
  431. def __delitem__(self, name: bytes) -> None:
  432. """Remove a refname.
  433. This method does not follow symbolic references, even if applicable for
  434. the subclass.
  435. Note: This method unconditionally deletes the contents of a reference.
  436. To delete atomically only if the reference has not changed, use
  437. remove_if_equals().
  438. Args:
  439. name: The refname to delete.
  440. """
  441. self.remove_if_equals(name, None)
  442. def get_symrefs(self) -> dict[bytes, bytes]:
  443. """Get a dict with all symrefs in this container.
  444. Returns: Dictionary mapping source ref to target ref
  445. """
  446. ret = {}
  447. for src in self.allkeys():
  448. try:
  449. ref_value = self.read_ref(src)
  450. assert ref_value is not None
  451. dst = parse_symref_value(ref_value)
  452. except ValueError:
  453. pass
  454. else:
  455. ret[src] = dst
  456. return ret
  457. def pack_refs(self, all: bool = False) -> None:
  458. """Pack loose refs into packed-refs file.
  459. Args:
  460. all: If True, pack all refs. If False, only pack tags.
  461. """
  462. raise NotImplementedError(self.pack_refs)
  463. class DictRefsContainer(RefsContainer):
  464. """RefsContainer backed by a simple dict.
  465. This container does not support symbolic or packed references and is not
  466. threadsafe.
  467. """
  468. def __init__(
  469. self,
  470. refs: dict[bytes, bytes],
  471. logger: Optional[
  472. Callable[
  473. [
  474. bytes,
  475. Optional[bytes],
  476. Optional[bytes],
  477. Optional[bytes],
  478. Optional[int],
  479. Optional[int],
  480. Optional[bytes],
  481. ],
  482. None,
  483. ]
  484. ] = None,
  485. ) -> None:
  486. """Initialize DictRefsContainer with refs dictionary and optional logger."""
  487. super().__init__(logger=logger)
  488. self._refs = refs
  489. self._peeled: dict[bytes, ObjectID] = {}
  490. self._watchers: set[Any] = set()
  491. def allkeys(self) -> set[bytes]:
  492. """Return all reference keys."""
  493. return set(self._refs.keys())
  494. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  495. """Read a loose reference."""
  496. return self._refs.get(name, None)
  497. def get_packed_refs(self) -> dict[bytes, bytes]:
  498. """Get packed references."""
  499. return {}
  500. def _notify(self, ref: bytes, newsha: Optional[bytes]) -> None:
  501. for watcher in self._watchers:
  502. watcher._notify((ref, newsha))
  503. def set_symbolic_ref(
  504. self,
  505. name: Ref,
  506. other: Ref,
  507. committer: Optional[bytes] = None,
  508. timestamp: Optional[int] = None,
  509. timezone: Optional[int] = None,
  510. message: Optional[bytes] = None,
  511. ) -> None:
  512. """Make a ref point at another ref.
  513. Args:
  514. name: Name of the ref to set
  515. other: Name of the ref to point at
  516. committer: Optional committer name for reflog
  517. timestamp: Optional timestamp for reflog
  518. timezone: Optional timezone for reflog
  519. message: Optional message for reflog
  520. """
  521. old = self.follow(name)[-1]
  522. new = SYMREF + other
  523. self._refs[name] = new
  524. self._notify(name, new)
  525. self._log(
  526. name,
  527. old,
  528. new,
  529. committer=committer,
  530. timestamp=timestamp,
  531. timezone=timezone,
  532. message=message,
  533. )
  534. def set_if_equals(
  535. self,
  536. name: bytes,
  537. old_ref: Optional[bytes],
  538. new_ref: bytes,
  539. committer: Optional[bytes] = None,
  540. timestamp: Optional[int] = None,
  541. timezone: Optional[int] = None,
  542. message: Optional[bytes] = None,
  543. ) -> bool:
  544. """Set a refname to new_ref only if it currently equals old_ref.
  545. This method follows all symbolic references, and can be used to perform
  546. an atomic compare-and-swap operation.
  547. Args:
  548. name: The refname to set.
  549. old_ref: The old sha the refname must refer to, or None to set
  550. unconditionally.
  551. new_ref: The new sha the refname will refer to.
  552. committer: Optional committer name for reflog
  553. timestamp: Optional timestamp for reflog
  554. timezone: Optional timezone for reflog
  555. message: Optional message for reflog
  556. Returns:
  557. True if the set was successful, False otherwise.
  558. """
  559. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  560. return False
  561. # Only update the specific ref requested, not the whole chain
  562. self._check_refname(name)
  563. old = self._refs.get(name)
  564. self._refs[name] = new_ref
  565. self._notify(name, new_ref)
  566. self._log(
  567. name,
  568. old,
  569. new_ref,
  570. committer=committer,
  571. timestamp=timestamp,
  572. timezone=timezone,
  573. message=message,
  574. )
  575. return True
  576. def add_if_new(
  577. self,
  578. name: Ref,
  579. ref: ObjectID,
  580. committer: Optional[bytes] = None,
  581. timestamp: Optional[int] = None,
  582. timezone: Optional[int] = None,
  583. message: Optional[bytes] = None,
  584. ) -> bool:
  585. """Add a new reference only if it does not already exist.
  586. Args:
  587. name: Ref name
  588. ref: Ref value
  589. committer: Optional committer name for reflog
  590. timestamp: Optional timestamp for reflog
  591. timezone: Optional timezone for reflog
  592. message: Optional message for reflog
  593. Returns:
  594. True if the add was successful, False otherwise.
  595. """
  596. if name in self._refs:
  597. return False
  598. self._refs[name] = ref
  599. self._notify(name, ref)
  600. self._log(
  601. name,
  602. None,
  603. ref,
  604. committer=committer,
  605. timestamp=timestamp,
  606. timezone=timezone,
  607. message=message,
  608. )
  609. return True
  610. def remove_if_equals(
  611. self,
  612. name: bytes,
  613. old_ref: Optional[bytes],
  614. committer: Optional[bytes] = None,
  615. timestamp: Optional[int] = None,
  616. timezone: Optional[int] = None,
  617. message: Optional[bytes] = None,
  618. ) -> bool:
  619. """Remove a refname only if it currently equals old_ref.
  620. This method does not follow symbolic references. It can be used to
  621. perform an atomic compare-and-delete operation.
  622. Args:
  623. name: The refname to delete.
  624. old_ref: The old sha the refname must refer to, or None to
  625. delete unconditionally.
  626. committer: Optional committer name for reflog
  627. timestamp: Optional timestamp for reflog
  628. timezone: Optional timezone for reflog
  629. message: Optional message for reflog
  630. Returns:
  631. True if the delete was successful, False otherwise.
  632. """
  633. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  634. return False
  635. try:
  636. old = self._refs.pop(name)
  637. except KeyError:
  638. pass
  639. else:
  640. self._notify(name, None)
  641. self._log(
  642. name,
  643. old,
  644. None,
  645. committer=committer,
  646. timestamp=timestamp,
  647. timezone=timezone,
  648. message=message,
  649. )
  650. return True
  651. def get_peeled(self, name: bytes) -> Optional[bytes]:
  652. """Get peeled version of a reference."""
  653. return self._peeled.get(name)
  654. def _update(self, refs: Mapping[bytes, bytes]) -> None:
  655. """Update multiple refs; intended only for testing."""
  656. # TODO(dborowitz): replace this with a public function that uses
  657. # set_if_equal.
  658. for ref, sha in refs.items():
  659. self.set_if_equals(ref, None, sha)
  660. def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None:
  661. """Update cached peeled refs; intended only for testing."""
  662. self._peeled.update(peeled)
  663. class InfoRefsContainer(RefsContainer):
  664. """Refs container that reads refs from a info/refs file."""
  665. def __init__(self, f: BinaryIO) -> None:
  666. """Initialize InfoRefsContainer from info/refs file."""
  667. self._refs: dict[bytes, bytes] = {}
  668. self._peeled: dict[bytes, bytes] = {}
  669. refs = read_info_refs(f)
  670. (self._refs, self._peeled) = split_peeled_refs(refs)
  671. def allkeys(self) -> set[bytes]:
  672. """Return all reference keys."""
  673. return set(self._refs.keys())
  674. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  675. """Read a loose reference."""
  676. return self._refs.get(name, None)
  677. def get_packed_refs(self) -> dict[bytes, bytes]:
  678. """Get packed references."""
  679. return {}
  680. def get_peeled(self, name: bytes) -> Optional[bytes]:
  681. """Get peeled version of a reference."""
  682. try:
  683. return self._peeled[name]
  684. except KeyError:
  685. return self._refs[name]
  686. class DiskRefsContainer(RefsContainer):
  687. """Refs container that reads refs from disk."""
  688. def __init__(
  689. self,
  690. path: Union[str, bytes, os.PathLike[str]],
  691. worktree_path: Optional[Union[str, bytes, os.PathLike[str]]] = None,
  692. logger: Optional[
  693. Callable[
  694. [
  695. bytes,
  696. bytes,
  697. bytes,
  698. Optional[bytes],
  699. Optional[int],
  700. Optional[int],
  701. bytes,
  702. ],
  703. None,
  704. ]
  705. ] = None,
  706. ) -> None:
  707. """Initialize DiskRefsContainer."""
  708. super().__init__(logger=logger)
  709. # Convert path-like objects to strings, then to bytes for Git compatibility
  710. self.path = os.fsencode(os.fspath(path))
  711. if worktree_path is None:
  712. self.worktree_path = self.path
  713. else:
  714. self.worktree_path = os.fsencode(os.fspath(worktree_path))
  715. self._packed_refs: Optional[dict[bytes, bytes]] = None
  716. self._peeled_refs: Optional[dict[bytes, bytes]] = None
  717. def __repr__(self) -> str:
  718. """Return string representation of DiskRefsContainer."""
  719. return f"{self.__class__.__name__}({self.path!r})"
  720. def _iter_dir(
  721. self,
  722. path: bytes,
  723. base: bytes,
  724. dir_filter: Optional[Callable[[bytes], bool]] = None,
  725. ) -> Iterator[bytes]:
  726. refspath = os.path.join(path, base.rstrip(b"/"))
  727. prefix_len = len(os.path.join(path, b""))
  728. for root, dirs, files in os.walk(refspath):
  729. directory = root[prefix_len:]
  730. if os.path.sep != "/":
  731. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  732. if dir_filter is not None:
  733. dirs[:] = [
  734. d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
  735. ]
  736. for filename in files:
  737. refname = b"/".join([directory, filename])
  738. if check_ref_format(refname):
  739. yield refname
  740. def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
  741. base = base.rstrip(b"/") + b"/"
  742. search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]] = []
  743. if base != b"refs/":
  744. path = self.worktree_path if is_per_worktree_ref(base) else self.path
  745. search_paths.append((path, None))
  746. elif self.worktree_path == self.path:
  747. # Iterate through all the refs from the main worktree
  748. search_paths.append((self.path, None))
  749. else:
  750. # Iterate through all the shared refs from the commondir, excluding per-worktree refs
  751. search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
  752. # Iterate through all the per-worktree refs from the worktree's gitdir
  753. search_paths.append((self.worktree_path, is_per_worktree_ref))
  754. for path, dir_filter in search_paths:
  755. yield from self._iter_dir(path, base, dir_filter=dir_filter)
  756. def subkeys(self, base: bytes) -> set[bytes]:
  757. """Return subkeys under a given base reference path."""
  758. subkeys = set()
  759. for key in self._iter_loose_refs(base):
  760. if key.startswith(base):
  761. subkeys.add(key[len(base) :].strip(b"/"))
  762. for key in self.get_packed_refs():
  763. if key.startswith(base):
  764. subkeys.add(key[len(base) :].strip(b"/"))
  765. return subkeys
  766. def allkeys(self) -> set[bytes]:
  767. """Return all reference keys."""
  768. allkeys = set()
  769. if os.path.exists(self.refpath(HEADREF)):
  770. allkeys.add(HEADREF)
  771. allkeys.update(self._iter_loose_refs())
  772. allkeys.update(self.get_packed_refs())
  773. return allkeys
  774. def refpath(self, name: bytes) -> bytes:
  775. """Return the disk path of a ref."""
  776. path = name
  777. if os.path.sep != "/":
  778. path = path.replace(b"/", os.fsencode(os.path.sep))
  779. root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
  780. return os.path.join(root_dir, path)
  781. def get_packed_refs(self) -> dict[bytes, bytes]:
  782. """Get contents of the packed-refs file.
  783. Returns: Dictionary mapping ref names to SHA1s
  784. Note: Will return an empty dictionary when no packed-refs file is
  785. present.
  786. """
  787. # TODO: invalidate the cache on repacking
  788. if self._packed_refs is None:
  789. # set both to empty because we want _peeled_refs to be
  790. # None if and only if _packed_refs is also None.
  791. self._packed_refs = {}
  792. self._peeled_refs = {}
  793. path = os.path.join(self.path, b"packed-refs")
  794. try:
  795. f = GitFile(path, "rb")
  796. except FileNotFoundError:
  797. return {}
  798. with f:
  799. first_line = next(iter(f)).rstrip()
  800. if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
  801. for sha, name, peeled in read_packed_refs_with_peeled(f):
  802. self._packed_refs[name] = sha
  803. if peeled:
  804. self._peeled_refs[name] = peeled
  805. else:
  806. f.seek(0)
  807. for sha, name in read_packed_refs(f):
  808. self._packed_refs[name] = sha
  809. return self._packed_refs
  810. def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
  811. """Add the given refs as packed refs.
  812. Args:
  813. new_refs: A mapping of ref names to targets; if a target is None that
  814. means remove the ref
  815. """
  816. if not new_refs:
  817. return
  818. path = os.path.join(self.path, b"packed-refs")
  819. with GitFile(path, "wb") as f:
  820. # reread cached refs from disk, while holding the lock
  821. packed_refs = self.get_packed_refs().copy()
  822. for ref, target in new_refs.items():
  823. # sanity check
  824. if ref == HEADREF:
  825. raise ValueError("cannot pack HEAD")
  826. # remove any loose refs pointing to this one -- please
  827. # note that this bypasses remove_if_equals as we don't
  828. # want to affect packed refs in here
  829. with suppress(OSError):
  830. os.remove(self.refpath(ref))
  831. if target is not None:
  832. packed_refs[ref] = target
  833. else:
  834. packed_refs.pop(ref, None)
  835. write_packed_refs(f, packed_refs, self._peeled_refs)
  836. self._packed_refs = packed_refs
  837. def get_peeled(self, name: bytes) -> Optional[bytes]:
  838. """Return the cached peeled value of a ref, if available.
  839. Args:
  840. name: Name of the ref to peel
  841. Returns: The peeled value of the ref. If the ref is known not point to
  842. a tag, this will be the SHA the ref refers to. If the ref may point
  843. to a tag, but no cached information is available, None is returned.
  844. """
  845. self.get_packed_refs()
  846. if (
  847. self._peeled_refs is None
  848. or self._packed_refs is None
  849. or name not in self._packed_refs
  850. ):
  851. # No cache: no peeled refs were read, or this ref is loose
  852. return None
  853. if name in self._peeled_refs:
  854. return self._peeled_refs[name]
  855. else:
  856. # Known not peelable
  857. return self[name]
  858. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  859. """Read a reference file and return its contents.
  860. If the reference file a symbolic reference, only read the first line of
  861. the file. Otherwise, only read the first 40 bytes.
  862. Args:
  863. name: the refname to read, relative to refpath
  864. Returns: The contents of the ref file, or None if the file does not
  865. exist.
  866. Raises:
  867. IOError: if any other error occurs
  868. """
  869. filename = self.refpath(name)
  870. try:
  871. with GitFile(filename, "rb") as f:
  872. header = f.read(len(SYMREF))
  873. if header == SYMREF:
  874. # Read only the first line
  875. return header + next(iter(f)).rstrip(b"\r\n")
  876. else:
  877. # Read only the first 40 bytes
  878. return header + f.read(40 - len(SYMREF))
  879. except (OSError, UnicodeError):
  880. # don't assume anything specific about the error; in
  881. # particular, invalid or forbidden paths can raise weird
  882. # errors depending on the specific operating system
  883. return None
  884. def _remove_packed_ref(self, name: bytes) -> None:
  885. if self._packed_refs is None:
  886. return
  887. filename = os.path.join(self.path, b"packed-refs")
  888. # reread cached refs from disk, while holding the lock
  889. f = GitFile(filename, "wb")
  890. try:
  891. self._packed_refs = None
  892. self.get_packed_refs()
  893. if self._packed_refs is None or name not in self._packed_refs:
  894. f.abort()
  895. return
  896. del self._packed_refs[name]
  897. if self._peeled_refs is not None:
  898. with suppress(KeyError):
  899. del self._peeled_refs[name]
  900. write_packed_refs(f, self._packed_refs, self._peeled_refs)
  901. f.close()
  902. except BaseException:
  903. f.abort()
  904. raise
  905. def set_symbolic_ref(
  906. self,
  907. name: bytes,
  908. other: bytes,
  909. committer: Optional[bytes] = None,
  910. timestamp: Optional[int] = None,
  911. timezone: Optional[int] = None,
  912. message: Optional[bytes] = None,
  913. ) -> None:
  914. """Make a ref point at another ref.
  915. Args:
  916. name: Name of the ref to set
  917. other: Name of the ref to point at
  918. committer: Optional committer name
  919. timestamp: Optional timestamp
  920. timezone: Optional timezone
  921. message: Optional message to describe the change
  922. """
  923. self._check_refname(name)
  924. self._check_refname(other)
  925. filename = self.refpath(name)
  926. f = GitFile(filename, "wb")
  927. try:
  928. f.write(SYMREF + other + b"\n")
  929. sha = self.follow(name)[-1]
  930. self._log(
  931. name,
  932. sha,
  933. sha,
  934. committer=committer,
  935. timestamp=timestamp,
  936. timezone=timezone,
  937. message=message,
  938. )
  939. except BaseException:
  940. f.abort()
  941. raise
  942. else:
  943. f.close()
    def set_if_equals(
        self,
        name: bytes,
        old_ref: Optional[bytes],
        new_ref: bytes,
        committer: Optional[bytes] = None,
        timestamp: Optional[int] = None,
        timezone: Optional[int] = None,
        message: Optional[bytes] = None,
    ) -> bool:
        """Set a refname to new_ref only if it currently equals old_ref.

        This method follows all symbolic references, and can be used to perform
        an atomic compare-and-swap operation.

        Args:
          name: The refname to set.
          old_ref: The old sha the refname must refer to, or None to set
            unconditionally.
          new_ref: The new sha the refname will refer to.
          committer: Optional committer name
          timestamp: Optional timestamp
          timezone: Optional timezone
          message: Set message for reflog
        Returns: True if the ref was written or already held new_ref, False
          if old_ref was given and did not match the current value.
        Raises:
          NotADirectoryError: if an ancestor path of the resolved ref name is
            itself present in the packed refs
        """
        self._check_refname(name)
        # Resolve the symref chain; if it cannot be followed, operate on the
        # given name itself.
        try:
            realnames, _ = self.follow(name)
            realname = realnames[-1]
        except (KeyError, IndexError, SymrefLoop):
            realname = name
        filename = self.refpath(realname)

        # make sure none of the ancestor folders is in packed refs
        probe_ref = os.path.dirname(realname)
        packed_refs = self.get_packed_refs()
        while probe_ref:
            if packed_refs.get(probe_ref, None) is not None:
                raise NotADirectoryError(filename)
            probe_ref = os.path.dirname(probe_ref)

        ensure_dir_exists(os.path.dirname(filename))
        with GitFile(filename, "wb") as f:
            if old_ref is not None:
                try:
                    # read again while holding the lock to handle race conditions
                    orig_ref = self.read_loose_ref(realname)
                    if orig_ref is None:
                        # Not loose: fall back to packed refs; a missing ref
                        # compares as ZERO_SHA
                        orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
                    if orig_ref != old_ref:
                        f.abort()
                        return False
                except OSError:
                    f.abort()
                    raise

            # Check if ref already has the desired value while holding the lock
            # This avoids fsync when ref is unchanged but still detects lock conflicts
            current_ref = self.read_loose_ref(realname)
            if current_ref is None:
                # Uses the packed-refs mapping fetched before the lock was taken
                current_ref = packed_refs.get(realname, None)

            if current_ref is not None and current_ref == new_ref:
                # Ref already has desired value, abort write to avoid fsync
                f.abort()
                return True

            try:
                f.write(new_ref + b"\n")
            except OSError:
                f.abort()
                raise
            # Logged against the resolved name, not the name passed in
            self._log(
                realname,
                old_ref,
                new_ref,
                committer=committer,
                timestamp=timestamp,
                timezone=timezone,
                message=message,
            )
        return True
  1020. def add_if_new(
  1021. self,
  1022. name: bytes,
  1023. ref: bytes,
  1024. committer: Optional[bytes] = None,
  1025. timestamp: Optional[int] = None,
  1026. timezone: Optional[int] = None,
  1027. message: Optional[bytes] = None,
  1028. ) -> bool:
  1029. """Add a new reference only if it does not already exist.
  1030. This method follows symrefs, and only ensures that the last ref in the
  1031. chain does not exist.
  1032. Args:
  1033. name: The refname to set.
  1034. ref: The new sha the refname will refer to.
  1035. committer: Optional committer name
  1036. timestamp: Optional timestamp
  1037. timezone: Optional timezone
  1038. message: Optional message for reflog
  1039. Returns: True if the add was successful, False otherwise.
  1040. """
  1041. try:
  1042. realnames, contents = self.follow(name)
  1043. if contents is not None:
  1044. return False
  1045. realname = realnames[-1]
  1046. except (KeyError, IndexError):
  1047. realname = name
  1048. self._check_refname(realname)
  1049. filename = self.refpath(realname)
  1050. ensure_dir_exists(os.path.dirname(filename))
  1051. with GitFile(filename, "wb") as f:
  1052. if os.path.exists(filename) or name in self.get_packed_refs():
  1053. f.abort()
  1054. return False
  1055. try:
  1056. f.write(ref + b"\n")
  1057. except OSError:
  1058. f.abort()
  1059. raise
  1060. else:
  1061. self._log(
  1062. name,
  1063. None,
  1064. ref,
  1065. committer=committer,
  1066. timestamp=timestamp,
  1067. timezone=timezone,
  1068. message=message,
  1069. )
  1070. return True
  1071. def remove_if_equals(
  1072. self,
  1073. name: bytes,
  1074. old_ref: Optional[bytes],
  1075. committer: Optional[bytes] = None,
  1076. timestamp: Optional[int] = None,
  1077. timezone: Optional[int] = None,
  1078. message: Optional[bytes] = None,
  1079. ) -> bool:
  1080. """Remove a refname only if it currently equals old_ref.
  1081. This method does not follow symbolic references. It can be used to
  1082. perform an atomic compare-and-delete operation.
  1083. Args:
  1084. name: The refname to delete.
  1085. old_ref: The old sha the refname must refer to, or None to
  1086. delete unconditionally.
  1087. committer: Optional committer name
  1088. timestamp: Optional timestamp
  1089. timezone: Optional timezone
  1090. message: Optional message
  1091. Returns: True if the delete was successful, False otherwise.
  1092. """
  1093. self._check_refname(name)
  1094. filename = self.refpath(name)
  1095. ensure_dir_exists(os.path.dirname(filename))
  1096. f = GitFile(filename, "wb")
  1097. try:
  1098. if old_ref is not None:
  1099. orig_ref = self.read_loose_ref(name)
  1100. if orig_ref is None:
  1101. orig_ref = self.get_packed_refs().get(name, ZERO_SHA)
  1102. if orig_ref != old_ref:
  1103. return False
  1104. # remove the reference file itself
  1105. try:
  1106. found = os.path.lexists(filename)
  1107. except OSError:
  1108. # may only be packed, or otherwise unstorable
  1109. found = False
  1110. if found:
  1111. os.remove(filename)
  1112. self._remove_packed_ref(name)
  1113. self._log(
  1114. name,
  1115. old_ref,
  1116. None,
  1117. committer=committer,
  1118. timestamp=timestamp,
  1119. timezone=timezone,
  1120. message=message,
  1121. )
  1122. finally:
  1123. # never write, we just wanted the lock
  1124. f.abort()
  1125. # outside of the lock, clean-up any parent directory that might now
  1126. # be empty. this ensures that re-creating a reference of the same
  1127. # name of what was previously a directory works as expected
  1128. parent = name
  1129. while True:
  1130. try:
  1131. parent, _ = parent.rsplit(b"/", 1)
  1132. except ValueError:
  1133. break
  1134. if parent == b"refs":
  1135. break
  1136. parent_filename = self.refpath(parent)
  1137. try:
  1138. os.rmdir(parent_filename)
  1139. except OSError:
  1140. # this can be caused by the parent directory being
  1141. # removed by another process, being not empty, etc.
  1142. # in any case, this is non fatal because we already
  1143. # removed the reference, just ignore it
  1144. break
  1145. return True
  1146. def pack_refs(self, all: bool = False) -> None:
  1147. """Pack loose refs into packed-refs file.
  1148. Args:
  1149. all: If True, pack all refs. If False, only pack tags.
  1150. """
  1151. refs_to_pack: dict[Ref, Optional[ObjectID]] = {}
  1152. for ref in self.allkeys():
  1153. if ref == HEADREF:
  1154. # Never pack HEAD
  1155. continue
  1156. if all or ref.startswith(LOCAL_TAG_PREFIX):
  1157. try:
  1158. sha = self[ref]
  1159. if sha:
  1160. refs_to_pack[ref] = sha
  1161. except KeyError:
  1162. # Broken ref, skip it
  1163. pass
  1164. if refs_to_pack:
  1165. self.add_packed_refs(refs_to_pack)
  1166. def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
  1167. """Split a single ref line into a tuple of SHA1 and name."""
  1168. fields = line.rstrip(b"\n\r").split(b" ")
  1169. if len(fields) != 2:
  1170. raise PackedRefsException(f"invalid ref line {line!r}")
  1171. sha, name = fields
  1172. if not valid_hexsha(sha):
  1173. raise PackedRefsException(f"Invalid hex sha {sha!r}")
  1174. if not check_ref_format(name):
  1175. raise PackedRefsException(f"invalid ref name {name!r}")
  1176. return (sha, name)
  1177. def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
  1178. """Read a packed refs file.
  1179. Args:
  1180. f: file-like object to read from
  1181. Returns: Iterator over tuples with SHA1s and ref names.
  1182. """
  1183. for line in f:
  1184. if line.startswith(b"#"):
  1185. # Comment
  1186. continue
  1187. if line.startswith(b"^"):
  1188. raise PackedRefsException("found peeled ref in packed-refs without peeled")
  1189. yield _split_ref_line(line)
  1190. def read_packed_refs_with_peeled(
  1191. f: IO[bytes],
  1192. ) -> Iterator[tuple[bytes, bytes, Optional[bytes]]]:
  1193. """Read a packed refs file including peeled refs.
  1194. Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
  1195. with ref names, SHA1s, and peeled SHA1s (or None).
  1196. Args:
  1197. f: file-like object to read from, seek'ed to the second line
  1198. """
  1199. last = None
  1200. for line in f:
  1201. if line.startswith(b"#"):
  1202. continue
  1203. line = line.rstrip(b"\r\n")
  1204. if line.startswith(b"^"):
  1205. if not last:
  1206. raise PackedRefsException("unexpected peeled ref line")
  1207. if not valid_hexsha(line[1:]):
  1208. raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
  1209. sha, name = _split_ref_line(last)
  1210. last = None
  1211. yield (sha, name, line[1:])
  1212. else:
  1213. if last:
  1214. sha, name = _split_ref_line(last)
  1215. yield (sha, name, None)
  1216. last = line
  1217. if last:
  1218. sha, name = _split_ref_line(last)
  1219. yield (sha, name, None)
  1220. def write_packed_refs(
  1221. f: IO[bytes],
  1222. packed_refs: Mapping[bytes, bytes],
  1223. peeled_refs: Optional[Mapping[bytes, bytes]] = None,
  1224. ) -> None:
  1225. """Write a packed refs file.
  1226. Args:
  1227. f: empty file-like object to write to
  1228. packed_refs: dict of refname to sha of packed refs to write
  1229. peeled_refs: dict of refname to peeled value of sha
  1230. """
  1231. if peeled_refs is None:
  1232. peeled_refs = {}
  1233. else:
  1234. f.write(b"# pack-refs with: peeled\n")
  1235. for refname in sorted(packed_refs.keys()):
  1236. f.write(git_line(packed_refs[refname], refname))
  1237. if refname in peeled_refs:
  1238. f.write(b"^" + peeled_refs[refname] + b"\n")
  1239. def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
  1240. """Read info/refs file.
  1241. Args:
  1242. f: File-like object to read from
  1243. Returns:
  1244. Dictionary mapping ref names to SHA1s
  1245. """
  1246. ret = {}
  1247. for line in f.readlines():
  1248. (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
  1249. ret[name] = sha
  1250. return ret
  1251. def write_info_refs(
  1252. refs: Mapping[bytes, bytes], store: ObjectContainer
  1253. ) -> Iterator[bytes]:
  1254. """Generate info refs."""
  1255. # TODO: Avoid recursive import :(
  1256. from .object_store import peel_sha
  1257. for name, sha in sorted(refs.items()):
  1258. # get_refs() includes HEAD as a special case, but we don't want to
  1259. # advertise it
  1260. if name == HEADREF:
  1261. continue
  1262. try:
  1263. o = store[sha]
  1264. except KeyError:
  1265. continue
  1266. _unpeeled, peeled = peel_sha(store, sha)
  1267. yield o.id + b"\t" + name + b"\n"
  1268. if o.id != peeled.id:
  1269. yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
  1270. def is_local_branch(x: bytes) -> bool:
  1271. """Check if a ref name is a local branch."""
  1272. return x.startswith(LOCAL_BRANCH_PREFIX)
  1273. def local_branch_name(name: bytes) -> bytes:
  1274. """Build a full branch ref from a short name.
  1275. Args:
  1276. name: Short branch name (e.g., b"master") or full ref
  1277. Returns:
  1278. Full branch ref name (e.g., b"refs/heads/master")
  1279. Examples:
  1280. >>> local_branch_name(b"master")
  1281. b'refs/heads/master'
  1282. >>> local_branch_name(b"refs/heads/master")
  1283. b'refs/heads/master'
  1284. """
  1285. if name.startswith(LOCAL_BRANCH_PREFIX):
  1286. return name
  1287. return LOCAL_BRANCH_PREFIX + name
  1288. def local_tag_name(name: bytes) -> bytes:
  1289. """Build a full tag ref from a short name.
  1290. Args:
  1291. name: Short tag name (e.g., b"v1.0") or full ref
  1292. Returns:
  1293. Full tag ref name (e.g., b"refs/tags/v1.0")
  1294. Examples:
  1295. >>> local_tag_name(b"v1.0")
  1296. b'refs/tags/v1.0'
  1297. >>> local_tag_name(b"refs/tags/v1.0")
  1298. b'refs/tags/v1.0'
  1299. """
  1300. if name.startswith(LOCAL_TAG_PREFIX):
  1301. return name
  1302. return LOCAL_TAG_PREFIX + name
  1303. def local_replace_name(name: bytes) -> bytes:
  1304. """Build a full replace ref from a short name.
  1305. Args:
  1306. name: Short replace name (object SHA) or full ref
  1307. Returns:
  1308. Full replace ref name (e.g., b"refs/replace/<sha>")
  1309. Examples:
  1310. >>> local_replace_name(b"abc123")
  1311. b'refs/replace/abc123'
  1312. >>> local_replace_name(b"refs/replace/abc123")
  1313. b'refs/replace/abc123'
  1314. """
  1315. if name.startswith(LOCAL_REPLACE_PREFIX):
  1316. return name
  1317. return LOCAL_REPLACE_PREFIX + name
  1318. def extract_branch_name(ref: bytes) -> bytes:
  1319. """Extract branch name from a full branch ref.
  1320. Args:
  1321. ref: Full branch ref (e.g., b"refs/heads/master")
  1322. Returns:
  1323. Short branch name (e.g., b"master")
  1324. Raises:
  1325. ValueError: If ref is not a local branch
  1326. Examples:
  1327. >>> extract_branch_name(b"refs/heads/master")
  1328. b'master'
  1329. >>> extract_branch_name(b"refs/heads/feature/foo")
  1330. b'feature/foo'
  1331. """
  1332. if not ref.startswith(LOCAL_BRANCH_PREFIX):
  1333. raise ValueError(f"Not a local branch ref: {ref!r}")
  1334. return ref[len(LOCAL_BRANCH_PREFIX) :]
  1335. def extract_tag_name(ref: bytes) -> bytes:
  1336. """Extract tag name from a full tag ref.
  1337. Args:
  1338. ref: Full tag ref (e.g., b"refs/tags/v1.0")
  1339. Returns:
  1340. Short tag name (e.g., b"v1.0")
  1341. Raises:
  1342. ValueError: If ref is not a local tag
  1343. Examples:
  1344. >>> extract_tag_name(b"refs/tags/v1.0")
  1345. b'v1.0'
  1346. """
  1347. if not ref.startswith(LOCAL_TAG_PREFIX):
  1348. raise ValueError(f"Not a local tag ref: {ref!r}")
  1349. return ref[len(LOCAL_TAG_PREFIX) :]
  1350. def shorten_ref_name(ref: bytes) -> bytes:
  1351. """Convert a full ref name to its short form.
  1352. Args:
  1353. ref: Full ref name (e.g., b"refs/heads/master")
  1354. Returns:
  1355. Short ref name (e.g., b"master")
  1356. Examples:
  1357. >>> shorten_ref_name(b"refs/heads/master")
  1358. b'master'
  1359. >>> shorten_ref_name(b"refs/remotes/origin/main")
  1360. b'origin/main'
  1361. >>> shorten_ref_name(b"refs/tags/v1.0")
  1362. b'v1.0'
  1363. >>> shorten_ref_name(b"HEAD")
  1364. b'HEAD'
  1365. """
  1366. if ref.startswith(LOCAL_BRANCH_PREFIX):
  1367. return ref[len(LOCAL_BRANCH_PREFIX) :]
  1368. elif ref.startswith(LOCAL_REMOTE_PREFIX):
  1369. return ref[len(LOCAL_REMOTE_PREFIX) :]
  1370. elif ref.startswith(LOCAL_TAG_PREFIX):
  1371. return ref[len(LOCAL_TAG_PREFIX) :]
  1372. return ref
# Type variable constrained to the two ref-dictionary shapes: bytes keys with
# bytes values, or bytes keys with Optional[bytes] values.
T = TypeVar("T", dict[bytes, bytes], dict[bytes, Optional[bytes]])
  1374. def strip_peeled_refs(refs: T) -> T:
  1375. """Remove all peeled refs."""
  1376. return {
  1377. ref: sha for (ref, sha) in refs.items() if not ref.endswith(PEELED_TAG_SUFFIX)
  1378. }
  1379. def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
  1380. """Split peeled refs from regular refs."""
  1381. peeled: dict[bytes, bytes] = {}
  1382. regular = {k: v for k, v in refs.items() if not k.endswith(PEELED_TAG_SUFFIX)}
  1383. for ref, sha in refs.items():
  1384. if ref.endswith(PEELED_TAG_SUFFIX):
  1385. # Only add to peeled dict if sha is not None
  1386. if sha is not None:
  1387. peeled[ref[: -len(PEELED_TAG_SUFFIX)]] = sha
  1388. return regular, peeled
  1389. def _set_origin_head(
  1390. refs: RefsContainer, origin: bytes, origin_head: Optional[bytes]
  1391. ) -> None:
  1392. # set refs/remotes/origin/HEAD
  1393. origin_base = b"refs/remotes/" + origin + b"/"
  1394. if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1395. origin_ref = origin_base + HEADREF
  1396. target_ref = origin_base + extract_branch_name(origin_head)
  1397. if target_ref in refs:
  1398. refs.set_symbolic_ref(origin_ref, target_ref)
  1399. def _set_default_branch(
  1400. refs: RefsContainer,
  1401. origin: bytes,
  1402. origin_head: Optional[bytes],
  1403. branch: Optional[bytes],
  1404. ref_message: Optional[bytes],
  1405. ) -> bytes:
  1406. """Set the default branch."""
  1407. origin_base = b"refs/remotes/" + origin + b"/"
  1408. if branch:
  1409. origin_ref = origin_base + branch
  1410. if origin_ref in refs:
  1411. local_ref = local_branch_name(branch)
  1412. refs.add_if_new(local_ref, refs[origin_ref], ref_message)
  1413. head_ref = local_ref
  1414. elif local_tag_name(branch) in refs:
  1415. head_ref = local_tag_name(branch)
  1416. else:
  1417. raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
  1418. elif origin_head:
  1419. head_ref = origin_head
  1420. if origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1421. origin_ref = origin_base + extract_branch_name(origin_head)
  1422. else:
  1423. origin_ref = origin_head
  1424. try:
  1425. refs.add_if_new(head_ref, refs[origin_ref], ref_message)
  1426. except KeyError:
  1427. pass
  1428. else:
  1429. raise ValueError("neither origin_head nor branch are provided")
  1430. return head_ref
def _set_head(
    refs: RefsContainer, head_ref: bytes, ref_message: Optional[bytes]
) -> Optional[bytes]:
    """Update HEAD to ``head_ref`` and return the value it now resolves to.

    A tag ref detaches HEAD at the tag's value; any other ref makes HEAD a
    symbolic ref to it.

    Args:
      refs: Refs container to update
      head_ref: Ref that HEAD should point at
      ref_message: Reflog message for the HEAD update
    Returns:
      The value looked up for ``head_ref``, or None if a non-tag ``head_ref``
      raised KeyError while being looked up or set.
    """
    if head_ref.startswith(LOCAL_TAG_PREFIX):
        # detach HEAD at specified tag
        head = refs[head_ref]
        # NOTE(review): elsewhere in this file refs[...] is used as a bytes
        # sha, so this Tag branch looks unreachable - confirm before relying
        # on it.
        if isinstance(head, Tag):
            _cls, obj = head.object
            head = obj.get_object(obj).id
        # Drop the existing HEAD first, then write the sha unconditionally
        del refs[HEADREF]
        refs.set_if_equals(HEADREF, None, head, message=ref_message)
    else:
        # set HEAD to specific branch
        try:
            head = refs[head_ref]
            refs.set_symbolic_ref(HEADREF, head_ref)
            refs.set_if_equals(HEADREF, None, head, message=ref_message)
        except KeyError:
            head = None
    return head
  1451. def _import_remote_refs(
  1452. refs_container: RefsContainer,
  1453. remote_name: str,
  1454. refs: dict[bytes, Optional[bytes]],
  1455. message: Optional[bytes] = None,
  1456. prune: bool = False,
  1457. prune_tags: bool = False,
  1458. ) -> None:
  1459. stripped_refs = strip_peeled_refs(refs)
  1460. branches = {
  1461. extract_branch_name(n): v
  1462. for (n, v) in stripped_refs.items()
  1463. if n.startswith(LOCAL_BRANCH_PREFIX) and v is not None
  1464. }
  1465. refs_container.import_refs(
  1466. b"refs/remotes/" + remote_name.encode(),
  1467. branches,
  1468. message=message,
  1469. prune=prune,
  1470. )
  1471. tags = {
  1472. extract_tag_name(n): v
  1473. for (n, v) in stripped_refs.items()
  1474. if n.startswith(LOCAL_TAG_PREFIX)
  1475. and not n.endswith(PEELED_TAG_SUFFIX)
  1476. and v is not None
  1477. }
  1478. refs_container.import_refs(
  1479. LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
  1480. )
  1481. def serialize_refs(
  1482. store: ObjectContainer, refs: Mapping[bytes, bytes]
  1483. ) -> dict[bytes, bytes]:
  1484. """Serialize refs with peeled refs.
  1485. Args:
  1486. store: Object store to peel refs from
  1487. refs: Dictionary of ref names to SHAs
  1488. Returns:
  1489. Dictionary with refs and peeled refs (marked with ^{})
  1490. """
  1491. # TODO: Avoid recursive import :(
  1492. from .object_store import peel_sha
  1493. ret = {}
  1494. for ref, sha in refs.items():
  1495. try:
  1496. unpeeled, peeled = peel_sha(store, sha)
  1497. except KeyError:
  1498. warnings.warn(
  1499. "ref {} points at non-present sha {}".format(
  1500. ref.decode("utf-8", "replace"), sha.decode("ascii")
  1501. ),
  1502. UserWarning,
  1503. )
  1504. continue
  1505. else:
  1506. if isinstance(unpeeled, Tag):
  1507. ret[ref + PEELED_TAG_SUFFIX] = peeled.id
  1508. ret[ref] = unpeeled.id
  1509. return ret
  1510. class locked_ref:
  1511. """Lock a ref while making modifications.
  1512. Works as a context manager.
  1513. """
  1514. def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
  1515. """Initialize a locked ref.
  1516. Args:
  1517. refs_container: The DiskRefsContainer to lock the ref in
  1518. refname: The ref name to lock
  1519. """
  1520. self._refs_container = refs_container
  1521. self._refname = refname
  1522. self._file: Optional[_GitFile] = None
  1523. self._realname: Optional[Ref] = None
  1524. self._deleted = False
  1525. def __enter__(self) -> "locked_ref":
  1526. """Enter the context manager and acquire the lock.
  1527. Returns:
  1528. This locked_ref instance
  1529. Raises:
  1530. OSError: If the lock cannot be acquired
  1531. """
  1532. self._refs_container._check_refname(self._refname)
  1533. try:
  1534. realnames, _ = self._refs_container.follow(self._refname)
  1535. self._realname = realnames[-1]
  1536. except (KeyError, IndexError, SymrefLoop):
  1537. self._realname = self._refname
  1538. filename = self._refs_container.refpath(self._realname)
  1539. ensure_dir_exists(os.path.dirname(filename))
  1540. f = GitFile(filename, "wb")
  1541. self._file = f
  1542. return self
  1543. def __exit__(
  1544. self,
  1545. exc_type: Optional[type],
  1546. exc_value: Optional[BaseException],
  1547. traceback: Optional[types.TracebackType],
  1548. ) -> None:
  1549. """Exit the context manager and release the lock.
  1550. Args:
  1551. exc_type: Type of exception if one occurred
  1552. exc_value: Exception instance if one occurred
  1553. traceback: Traceback if an exception occurred
  1554. """
  1555. if self._file:
  1556. if exc_type is not None or self._deleted:
  1557. self._file.abort()
  1558. else:
  1559. self._file.close()
  1560. def get(self) -> Optional[bytes]:
  1561. """Get the current value of the ref."""
  1562. if not self._file:
  1563. raise RuntimeError("locked_ref not in context")
  1564. assert self._realname is not None
  1565. current_ref = self._refs_container.read_loose_ref(self._realname)
  1566. if current_ref is None:
  1567. current_ref = self._refs_container.get_packed_refs().get(
  1568. self._realname, None
  1569. )
  1570. return current_ref
  1571. def ensure_equals(self, expected_value: Optional[bytes]) -> bool:
  1572. """Ensure the ref currently equals the expected value.
  1573. Args:
  1574. expected_value: The expected current value of the ref
  1575. Returns:
  1576. True if the ref equals the expected value, False otherwise
  1577. """
  1578. current_value = self.get()
  1579. return current_value == expected_value
  1580. def set(self, new_ref: bytes) -> None:
  1581. """Set the ref to a new value.
  1582. Args:
  1583. new_ref: The new SHA1 or symbolic ref value
  1584. """
  1585. if not self._file:
  1586. raise RuntimeError("locked_ref not in context")
  1587. if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
  1588. raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
  1589. self._file.seek(0)
  1590. self._file.truncate()
  1591. self._file.write(new_ref + b"\n")
  1592. self._deleted = False
  1593. def set_symbolic_ref(self, target: Ref) -> None:
  1594. """Make this ref point at another ref.
  1595. Args:
  1596. target: Name of the ref to point at
  1597. """
  1598. if not self._file:
  1599. raise RuntimeError("locked_ref not in context")
  1600. self._refs_container._check_refname(target)
  1601. self._file.seek(0)
  1602. self._file.truncate()
  1603. self._file.write(SYMREF + target + b"\n")
  1604. self._deleted = False
  1605. def delete(self) -> None:
  1606. """Delete the ref file while holding the lock."""
  1607. if not self._file:
  1608. raise RuntimeError("locked_ref not in context")
  1609. # Delete the actual ref file while holding the lock
  1610. if self._realname:
  1611. filename = self._refs_container.refpath(self._realname)
  1612. try:
  1613. if os.path.lexists(filename):
  1614. os.remove(filename)
  1615. except FileNotFoundError:
  1616. pass
  1617. self._refs_container._remove_packed_ref(self._realname)
  1618. self._deleted = True
  1619. class NamespacedRefsContainer(RefsContainer):
  1620. """Wrapper that adds namespace prefix to all ref operations.
  1621. This implements Git's GIT_NAMESPACE feature, which stores refs under
  1622. refs/namespaces/<namespace>/ and filters operations to only show refs
  1623. within that namespace.
  1624. Example:
  1625. With namespace "foo", a ref "refs/heads/master" is stored as
  1626. "refs/namespaces/foo/refs/heads/master" in the underlying container.
  1627. """
  1628. def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
  1629. """Initialize NamespacedRefsContainer.
  1630. Args:
  1631. refs: The underlying refs container to wrap
  1632. namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
  1633. """
  1634. super().__init__(logger=refs._logger)
  1635. self._refs = refs
  1636. # Build namespace prefix: refs/namespaces/<namespace>/
  1637. # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
  1638. namespace_parts = namespace.split(b"/")
  1639. self._namespace_prefix = b""
  1640. for part in namespace_parts:
  1641. self._namespace_prefix += b"refs/namespaces/" + part + b"/"
  1642. def _apply_namespace(self, name: bytes) -> bytes:
  1643. """Apply namespace prefix to a ref name."""
  1644. # HEAD and other special refs are not namespaced
  1645. if name == HEADREF or not name.startswith(b"refs/"):
  1646. return name
  1647. return self._namespace_prefix + name
  1648. def _strip_namespace(self, name: bytes) -> Optional[bytes]:
  1649. """Remove namespace prefix from a ref name.
  1650. Returns None if the ref is not in our namespace.
  1651. """
  1652. # HEAD and other special refs are not namespaced
  1653. if name == HEADREF or not name.startswith(b"refs/"):
  1654. return name
  1655. if name.startswith(self._namespace_prefix):
  1656. return name[len(self._namespace_prefix) :]
  1657. return None
  1658. def allkeys(self) -> set[bytes]:
  1659. """Return all reference keys in this namespace."""
  1660. keys = set()
  1661. for key in self._refs.allkeys():
  1662. stripped = self._strip_namespace(key)
  1663. if stripped is not None:
  1664. keys.add(stripped)
  1665. return keys
  1666. def read_loose_ref(self, name: bytes) -> Optional[bytes]:
  1667. """Read a loose reference."""
  1668. return self._refs.read_loose_ref(self._apply_namespace(name))
  1669. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  1670. """Get packed refs within this namespace."""
  1671. packed = {}
  1672. for name, value in self._refs.get_packed_refs().items():
  1673. stripped = self._strip_namespace(name)
  1674. if stripped is not None:
  1675. packed[stripped] = value
  1676. return packed
  1677. def add_packed_refs(self, new_refs: Mapping[Ref, Optional[ObjectID]]) -> None:
  1678. """Add packed refs with namespace prefix."""
  1679. namespaced_refs = {
  1680. self._apply_namespace(name): value for name, value in new_refs.items()
  1681. }
  1682. self._refs.add_packed_refs(namespaced_refs)
  1683. def get_peeled(self, name: bytes) -> Optional[ObjectID]:
  1684. """Return the cached peeled value of a ref."""
  1685. return self._refs.get_peeled(self._apply_namespace(name))
  1686. def set_symbolic_ref(
  1687. self,
  1688. name: bytes,
  1689. other: bytes,
  1690. committer: Optional[bytes] = None,
  1691. timestamp: Optional[int] = None,
  1692. timezone: Optional[int] = None,
  1693. message: Optional[bytes] = None,
  1694. ) -> None:
  1695. """Make a ref point at another ref."""
  1696. self._refs.set_symbolic_ref(
  1697. self._apply_namespace(name),
  1698. self._apply_namespace(other),
  1699. committer=committer,
  1700. timestamp=timestamp,
  1701. timezone=timezone,
  1702. message=message,
  1703. )
  1704. def set_if_equals(
  1705. self,
  1706. name: bytes,
  1707. old_ref: Optional[bytes],
  1708. new_ref: bytes,
  1709. committer: Optional[bytes] = None,
  1710. timestamp: Optional[int] = None,
  1711. timezone: Optional[int] = None,
  1712. message: Optional[bytes] = None,
  1713. ) -> bool:
  1714. """Set a refname to new_ref only if it currently equals old_ref."""
  1715. return self._refs.set_if_equals(
  1716. self._apply_namespace(name),
  1717. old_ref,
  1718. new_ref,
  1719. committer=committer,
  1720. timestamp=timestamp,
  1721. timezone=timezone,
  1722. message=message,
  1723. )
  1724. def add_if_new(
  1725. self,
  1726. name: bytes,
  1727. ref: bytes,
  1728. committer: Optional[bytes] = None,
  1729. timestamp: Optional[int] = None,
  1730. timezone: Optional[int] = None,
  1731. message: Optional[bytes] = None,
  1732. ) -> bool:
  1733. """Add a new reference only if it does not already exist."""
  1734. return self._refs.add_if_new(
  1735. self._apply_namespace(name),
  1736. ref,
  1737. committer=committer,
  1738. timestamp=timestamp,
  1739. timezone=timezone,
  1740. message=message,
  1741. )
  1742. def remove_if_equals(
  1743. self,
  1744. name: bytes,
  1745. old_ref: Optional[bytes],
  1746. committer: Optional[bytes] = None,
  1747. timestamp: Optional[int] = None,
  1748. timezone: Optional[int] = None,
  1749. message: Optional[bytes] = None,
  1750. ) -> bool:
  1751. """Remove a refname only if it currently equals old_ref."""
  1752. return self._refs.remove_if_equals(
  1753. self._apply_namespace(name),
  1754. old_ref,
  1755. committer=committer,
  1756. timestamp=timestamp,
  1757. timezone=timezone,
  1758. message=message,
  1759. )
  1760. def pack_refs(self, all: bool = False) -> None:
  1761. """Pack loose refs into packed-refs file.
  1762. Note: This packs all refs in the underlying container, not just
  1763. those in the namespace.
  1764. """
  1765. self._refs.pack_refs(all=all)
  1766. def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
  1767. """Filter refs to only include those with a given prefix.
  1768. Args:
  1769. refs: A dictionary of refs.
  1770. prefixes: The prefixes to filter by.
  1771. """
  1772. filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
  1773. return filtered
  1774. def is_per_worktree_ref(ref: bytes) -> bool:
  1775. """Returns whether a reference is stored per worktree or not.
  1776. Per-worktree references are:
  1777. - all pseudorefs, e.g. HEAD
  1778. - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
  1779. All refs starting with "refs/" are shared, except for the ones listed above.
  1780. See https://git-scm.com/docs/git-worktree#_refs.
  1781. """
  1782. return not ref.startswith(b"refs/") or ref.startswith(
  1783. (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
  1784. )