refs.py 65 KB


  1. # refs.py -- For dealing with git refs
  2. # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Ref handling."""
  22. import os
  23. import types
  24. import warnings
  25. from collections.abc import Callable, Iterable, Iterator, Mapping
  26. from contextlib import suppress
  27. from typing import (
  28. IO,
  29. TYPE_CHECKING,
  30. Any,
  31. BinaryIO,
  32. TypeVar,
  33. )
  34. if TYPE_CHECKING:
  35. from .file import _GitFile
  36. from .errors import PackedRefsException, RefFormatError
  37. from .file import GitFile, ensure_dir_exists
  38. from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
  39. from .pack import ObjectContainer
# Ref names are handled as plain byte strings.
Ref = bytes

# Name of the HEAD ref; _check_refname() accepts it although it lives outside refs/.
HEADREF = b"HEAD"
# Marker that prefixes the contents of a symbolic ref (see parse_symref_value()).
SYMREF = b"ref: "
# Prefixes of the ref namespaces named below.
LOCAL_BRANCH_PREFIX = b"refs/heads/"
LOCAL_TAG_PREFIX = b"refs/tags/"
LOCAL_REMOTE_PREFIX = b"refs/remotes/"
LOCAL_NOTES_PREFIX = b"refs/notes/"
LOCAL_REPLACE_PREFIX = b"refs/replace/"
# Byte values that check_ref_format() rejects anywhere in a ref name.
BAD_REF_CHARS = set(b"\177 ~^:?*[")
# NOTE(review): presumably the suffix marking a peeled tag entry; its use is
# not visible in this part of the file -- confirm against the callers.
PEELED_TAG_SUFFIX = b"^{}"

# For backwards compatibility
ANNOTATED_TAG_SUFFIX = PEELED_TAG_SUFFIX
  52. class SymrefLoop(Exception):
  53. """There is a loop between one or more symrefs."""
  54. def __init__(self, ref: bytes, depth: int) -> None:
  55. """Initialize SymrefLoop exception."""
  56. self.ref = ref
  57. self.depth = depth
  58. def parse_symref_value(contents: bytes) -> bytes:
  59. """Parse a symref value.
  60. Args:
  61. contents: Contents to parse
  62. Returns: Destination
  63. """
  64. if contents.startswith(SYMREF):
  65. return contents[len(SYMREF) :].rstrip(b"\r\n")
  66. raise ValueError(contents)
  67. def check_ref_format(refname: Ref) -> bool:
  68. """Check if a refname is correctly formatted.
  69. Implements all the same rules as git-check-ref-format[1].
  70. [1]
  71. http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
  72. Args:
  73. refname: The refname to check
  74. Returns: True if refname is valid, False otherwise
  75. """
  76. # These could be combined into one big expression, but are listed
  77. # separately to parallel [1].
  78. if b"/." in refname or refname.startswith(b"."):
  79. return False
  80. if b"/" not in refname:
  81. return False
  82. if b".." in refname:
  83. return False
  84. for i, c in enumerate(refname):
  85. if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
  86. return False
  87. if refname[-1] in b"/.":
  88. return False
  89. if refname.endswith(b".lock"):
  90. return False
  91. if b"@{" in refname:
  92. return False
  93. if b"\\" in refname:
  94. return False
  95. return True
  96. def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
  97. """Parse a remote ref into remote name and branch name.
  98. Args:
  99. ref: Remote ref like b"refs/remotes/origin/main"
  100. Returns:
  101. Tuple of (remote_name, branch_name)
  102. Raises:
  103. ValueError: If ref is not a valid remote ref
  104. """
  105. if not ref.startswith(LOCAL_REMOTE_PREFIX):
  106. raise ValueError(f"Not a remote ref: {ref!r}")
  107. # Remove the prefix
  108. remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
  109. # Split into remote name and branch name
  110. parts = remainder.split(b"/", 1)
  111. if len(parts) != 2:
  112. raise ValueError(f"Invalid remote ref format: {ref!r}")
  113. remote_name, branch_name = parts
  114. return (remote_name, branch_name)
  115. class RefsContainer:
  116. """A container for refs."""
  117. def __init__(
  118. self,
  119. logger: Callable[
  120. [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
  121. ]
  122. | None = None,
  123. ) -> None:
  124. """Initialize RefsContainer with optional logger function."""
  125. self._logger = logger
  126. def _log(
  127. self,
  128. ref: bytes,
  129. old_sha: bytes | None,
  130. new_sha: bytes | None,
  131. committer: bytes | None = None,
  132. timestamp: int | None = None,
  133. timezone: int | None = None,
  134. message: bytes | None = None,
  135. ) -> None:
  136. if self._logger is None:
  137. return
  138. if message is None:
  139. return
  140. # Use ZERO_SHA for None values, matching git behavior
  141. if old_sha is None:
  142. old_sha = ZERO_SHA
  143. if new_sha is None:
  144. new_sha = ZERO_SHA
  145. self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
  146. def set_symbolic_ref(
  147. self,
  148. name: bytes,
  149. other: bytes,
  150. committer: bytes | None = None,
  151. timestamp: int | None = None,
  152. timezone: int | None = None,
  153. message: bytes | None = None,
  154. ) -> None:
  155. """Make a ref point at another ref.
  156. Args:
  157. name: Name of the ref to set
  158. other: Name of the ref to point at
  159. committer: Optional committer name/email
  160. timestamp: Optional timestamp
  161. timezone: Optional timezone
  162. message: Optional message
  163. """
  164. raise NotImplementedError(self.set_symbolic_ref)
  165. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  166. """Get contents of the packed-refs file.
  167. Returns: Dictionary mapping ref names to SHA1s
  168. Note: Will return an empty dictionary when no packed-refs file is
  169. present.
  170. """
  171. raise NotImplementedError(self.get_packed_refs)
  172. def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
  173. """Add the given refs as packed refs.
  174. Args:
  175. new_refs: A mapping of ref names to targets; if a target is None that
  176. means remove the ref
  177. """
  178. raise NotImplementedError(self.add_packed_refs)
  179. def get_peeled(self, name: bytes) -> ObjectID | None:
  180. """Return the cached peeled value of a ref, if available.
  181. Args:
  182. name: Name of the ref to peel
  183. Returns: The peeled value of the ref. If the ref is known not point to
  184. a tag, this will be the SHA the ref refers to. If the ref may point
  185. to a tag, but no cached information is available, None is returned.
  186. """
  187. return None
  188. def import_refs(
  189. self,
  190. base: Ref,
  191. other: Mapping[Ref, ObjectID],
  192. committer: bytes | None = None,
  193. timestamp: bytes | None = None,
  194. timezone: bytes | None = None,
  195. message: bytes | None = None,
  196. prune: bool = False,
  197. ) -> None:
  198. """Import refs from another repository.
  199. Args:
  200. base: Base ref to import into (e.g., b'refs/remotes/origin')
  201. other: Dictionary of refs to import
  202. committer: Optional committer for reflog
  203. timestamp: Optional timestamp for reflog
  204. timezone: Optional timezone for reflog
  205. message: Optional message for reflog
  206. prune: If True, remove refs not in other
  207. """
  208. if prune:
  209. to_delete = set(self.subkeys(base))
  210. else:
  211. to_delete = set()
  212. for name, value in other.items():
  213. if value is None:
  214. to_delete.add(name)
  215. else:
  216. self.set_if_equals(
  217. b"/".join((base, name)), None, value, message=message
  218. )
  219. if to_delete:
  220. try:
  221. to_delete.remove(name)
  222. except KeyError:
  223. pass
  224. for ref in to_delete:
  225. self.remove_if_equals(b"/".join((base, ref)), None, message=message)
  226. def allkeys(self) -> set[Ref]:
  227. """All refs present in this container."""
  228. raise NotImplementedError(self.allkeys)
  229. def __iter__(self) -> Iterator[Ref]:
  230. """Iterate over all reference keys."""
  231. return iter(self.allkeys())
  232. def keys(self, base: bytes | None = None) -> set[bytes]:
  233. """Refs present in this container.
  234. Args:
  235. base: An optional base to return refs under.
  236. Returns: An unsorted set of valid refs in this container, including
  237. packed refs.
  238. """
  239. if base is not None:
  240. return self.subkeys(base)
  241. else:
  242. return self.allkeys()
  243. def subkeys(self, base: bytes) -> set[bytes]:
  244. """Refs present in this container under a base.
  245. Args:
  246. base: The base to return refs under.
  247. Returns: A set of valid refs in this container under the base; the base
  248. prefix is stripped from the ref names returned.
  249. """
  250. keys = set()
  251. base_len = len(base) + 1
  252. for refname in self.allkeys():
  253. if refname.startswith(base):
  254. keys.add(refname[base_len:])
  255. return keys
  256. def as_dict(self, base: bytes | None = None) -> dict[Ref, ObjectID]:
  257. """Return the contents of this container as a dictionary."""
  258. ret = {}
  259. keys = self.keys(base)
  260. if base is None:
  261. base = b""
  262. else:
  263. base = base.rstrip(b"/")
  264. for key in keys:
  265. try:
  266. ret[key] = self[(base + b"/" + key).strip(b"/")]
  267. except (SymrefLoop, KeyError):
  268. continue # Unable to resolve
  269. return ret
  270. def _check_refname(self, name: bytes) -> None:
  271. """Ensure a refname is valid and lives in refs or is HEAD.
  272. HEAD is not a valid refname according to git-check-ref-format, but this
  273. class needs to be able to touch HEAD. Also, check_ref_format expects
  274. refnames without the leading 'refs/', but this class requires that
  275. so it cannot touch anything outside the refs dir (or HEAD).
  276. Args:
  277. name: The name of the reference.
  278. Raises:
  279. KeyError: if a refname is not HEAD or is otherwise not valid.
  280. """
  281. if name in (HEADREF, b"refs/stash"):
  282. return
  283. if not name.startswith(b"refs/") or not check_ref_format(name[5:]):
  284. raise RefFormatError(name)
  285. def read_ref(self, refname: bytes) -> bytes | None:
  286. """Read a reference without following any references.
  287. Args:
  288. refname: The name of the reference
  289. Returns: The contents of the ref file, or None if it does
  290. not exist.
  291. """
  292. contents = self.read_loose_ref(refname)
  293. if not contents:
  294. contents = self.get_packed_refs().get(refname, None)
  295. return contents
  296. def read_loose_ref(self, name: bytes) -> bytes | None:
  297. """Read a loose reference and return its contents.
  298. Args:
  299. name: the refname to read
  300. Returns: The contents of the ref file, or None if it does
  301. not exist.
  302. """
  303. raise NotImplementedError(self.read_loose_ref)
  304. def follow(self, name: bytes) -> tuple[list[bytes], bytes | None]:
  305. """Follow a reference name.
  306. Returns: a tuple of (refnames, sha), wheres refnames are the names of
  307. references in the chain
  308. """
  309. contents: bytes | None = SYMREF + name
  310. depth = 0
  311. refnames = []
  312. while contents and contents.startswith(SYMREF):
  313. refname = contents[len(SYMREF) :]
  314. refnames.append(refname)
  315. contents = self.read_ref(refname)
  316. if not contents:
  317. break
  318. depth += 1
  319. if depth > 5:
  320. raise SymrefLoop(name, depth)
  321. return refnames, contents
  322. def __contains__(self, refname: bytes) -> bool:
  323. """Check if a reference exists."""
  324. if self.read_ref(refname):
  325. return True
  326. return False
  327. def __getitem__(self, name: bytes) -> ObjectID:
  328. """Get the SHA1 for a reference name.
  329. This method follows all symbolic references.
  330. """
  331. _, sha = self.follow(name)
  332. if sha is None:
  333. raise KeyError(name)
  334. return sha
  335. def set_if_equals(
  336. self,
  337. name: bytes,
  338. old_ref: bytes | None,
  339. new_ref: bytes,
  340. committer: bytes | None = None,
  341. timestamp: int | None = None,
  342. timezone: int | None = None,
  343. message: bytes | None = None,
  344. ) -> bool:
  345. """Set a refname to new_ref only if it currently equals old_ref.
  346. This method follows all symbolic references if applicable for the
  347. subclass, and can be used to perform an atomic compare-and-swap
  348. operation.
  349. Args:
  350. name: The refname to set.
  351. old_ref: The old sha the refname must refer to, or None to set
  352. unconditionally.
  353. new_ref: The new sha the refname will refer to.
  354. committer: Optional committer name/email
  355. timestamp: Optional timestamp
  356. timezone: Optional timezone
  357. message: Message for reflog
  358. Returns: True if the set was successful, False otherwise.
  359. """
  360. raise NotImplementedError(self.set_if_equals)
  361. def add_if_new(
  362. self,
  363. name: bytes,
  364. ref: bytes,
  365. committer: bytes | None = None,
  366. timestamp: int | None = None,
  367. timezone: int | None = None,
  368. message: bytes | None = None,
  369. ) -> bool:
  370. """Add a new reference only if it does not already exist.
  371. Args:
  372. name: Ref name
  373. ref: Ref value
  374. committer: Optional committer name/email
  375. timestamp: Optional timestamp
  376. timezone: Optional timezone
  377. message: Optional message for reflog
  378. """
  379. raise NotImplementedError(self.add_if_new)
  380. def __setitem__(self, name: bytes, ref: bytes) -> None:
  381. """Set a reference name to point to the given SHA1.
  382. This method follows all symbolic references if applicable for the
  383. subclass.
  384. Note: This method unconditionally overwrites the contents of a
  385. reference. To update atomically only if the reference has not
  386. changed, use set_if_equals().
  387. Args:
  388. name: The refname to set.
  389. ref: The new sha the refname will refer to.
  390. """
  391. if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
  392. raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
  393. self.set_if_equals(name, None, ref)
  394. def remove_if_equals(
  395. self,
  396. name: bytes,
  397. old_ref: bytes | None,
  398. committer: bytes | None = None,
  399. timestamp: int | None = None,
  400. timezone: int | None = None,
  401. message: bytes | None = None,
  402. ) -> bool:
  403. """Remove a refname only if it currently equals old_ref.
  404. This method does not follow symbolic references, even if applicable for
  405. the subclass. It can be used to perform an atomic compare-and-delete
  406. operation.
  407. Args:
  408. name: The refname to delete.
  409. old_ref: The old sha the refname must refer to, or None to
  410. delete unconditionally.
  411. committer: Optional committer name/email
  412. timestamp: Optional timestamp
  413. timezone: Optional timezone
  414. message: Message for reflog
  415. Returns: True if the delete was successful, False otherwise.
  416. """
  417. raise NotImplementedError(self.remove_if_equals)
  418. def __delitem__(self, name: bytes) -> None:
  419. """Remove a refname.
  420. This method does not follow symbolic references, even if applicable for
  421. the subclass.
  422. Note: This method unconditionally deletes the contents of a reference.
  423. To delete atomically only if the reference has not changed, use
  424. remove_if_equals().
  425. Args:
  426. name: The refname to delete.
  427. """
  428. self.remove_if_equals(name, None)
  429. def get_symrefs(self) -> dict[bytes, bytes]:
  430. """Get a dict with all symrefs in this container.
  431. Returns: Dictionary mapping source ref to target ref
  432. """
  433. ret = {}
  434. for src in self.allkeys():
  435. try:
  436. ref_value = self.read_ref(src)
  437. assert ref_value is not None
  438. dst = parse_symref_value(ref_value)
  439. except ValueError:
  440. pass
  441. else:
  442. ret[src] = dst
  443. return ret
  444. def pack_refs(self, all: bool = False) -> None:
  445. """Pack loose refs into packed-refs file.
  446. Args:
  447. all: If True, pack all refs. If False, only pack tags.
  448. """
  449. raise NotImplementedError(self.pack_refs)
  450. class DictRefsContainer(RefsContainer):
  451. """RefsContainer backed by a simple dict.
  452. This container does not support symbolic or packed references and is not
  453. threadsafe.
  454. """
  455. def __init__(
  456. self,
  457. refs: dict[bytes, bytes],
  458. logger: Callable[
  459. [
  460. bytes,
  461. bytes | None,
  462. bytes | None,
  463. bytes | None,
  464. int | None,
  465. int | None,
  466. bytes | None,
  467. ],
  468. None,
  469. ]
  470. | None = None,
  471. ) -> None:
  472. """Initialize DictRefsContainer with refs dictionary and optional logger."""
  473. super().__init__(logger=logger)
  474. self._refs = refs
  475. self._peeled: dict[bytes, ObjectID] = {}
  476. self._watchers: set[Any] = set()
  477. def allkeys(self) -> set[bytes]:
  478. """Return all reference keys."""
  479. return set(self._refs.keys())
  480. def read_loose_ref(self, name: bytes) -> bytes | None:
  481. """Read a loose reference."""
  482. return self._refs.get(name, None)
  483. def get_packed_refs(self) -> dict[bytes, bytes]:
  484. """Get packed references."""
  485. return {}
  486. def _notify(self, ref: bytes, newsha: bytes | None) -> None:
  487. for watcher in self._watchers:
  488. watcher._notify((ref, newsha))
  489. def set_symbolic_ref(
  490. self,
  491. name: Ref,
  492. other: Ref,
  493. committer: bytes | None = None,
  494. timestamp: int | None = None,
  495. timezone: int | None = None,
  496. message: bytes | None = None,
  497. ) -> None:
  498. """Make a ref point at another ref.
  499. Args:
  500. name: Name of the ref to set
  501. other: Name of the ref to point at
  502. committer: Optional committer name for reflog
  503. timestamp: Optional timestamp for reflog
  504. timezone: Optional timezone for reflog
  505. message: Optional message for reflog
  506. """
  507. old = self.follow(name)[-1]
  508. new = SYMREF + other
  509. self._refs[name] = new
  510. self._notify(name, new)
  511. self._log(
  512. name,
  513. old,
  514. new,
  515. committer=committer,
  516. timestamp=timestamp,
  517. timezone=timezone,
  518. message=message,
  519. )
  520. def set_if_equals(
  521. self,
  522. name: bytes,
  523. old_ref: bytes | None,
  524. new_ref: bytes,
  525. committer: bytes | None = None,
  526. timestamp: int | None = None,
  527. timezone: int | None = None,
  528. message: bytes | None = None,
  529. ) -> bool:
  530. """Set a refname to new_ref only if it currently equals old_ref.
  531. This method follows all symbolic references, and can be used to perform
  532. an atomic compare-and-swap operation.
  533. Args:
  534. name: The refname to set.
  535. old_ref: The old sha the refname must refer to, or None to set
  536. unconditionally.
  537. new_ref: The new sha the refname will refer to.
  538. committer: Optional committer name for reflog
  539. timestamp: Optional timestamp for reflog
  540. timezone: Optional timezone for reflog
  541. message: Optional message for reflog
  542. Returns:
  543. True if the set was successful, False otherwise.
  544. """
  545. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  546. return False
  547. # Only update the specific ref requested, not the whole chain
  548. self._check_refname(name)
  549. old = self._refs.get(name)
  550. self._refs[name] = new_ref
  551. self._notify(name, new_ref)
  552. self._log(
  553. name,
  554. old,
  555. new_ref,
  556. committer=committer,
  557. timestamp=timestamp,
  558. timezone=timezone,
  559. message=message,
  560. )
  561. return True
  562. def add_if_new(
  563. self,
  564. name: Ref,
  565. ref: ObjectID,
  566. committer: bytes | None = None,
  567. timestamp: int | None = None,
  568. timezone: int | None = None,
  569. message: bytes | None = None,
  570. ) -> bool:
  571. """Add a new reference only if it does not already exist.
  572. Args:
  573. name: Ref name
  574. ref: Ref value
  575. committer: Optional committer name for reflog
  576. timestamp: Optional timestamp for reflog
  577. timezone: Optional timezone for reflog
  578. message: Optional message for reflog
  579. Returns:
  580. True if the add was successful, False otherwise.
  581. """
  582. if name in self._refs:
  583. return False
  584. self._refs[name] = ref
  585. self._notify(name, ref)
  586. self._log(
  587. name,
  588. None,
  589. ref,
  590. committer=committer,
  591. timestamp=timestamp,
  592. timezone=timezone,
  593. message=message,
  594. )
  595. return True
  596. def remove_if_equals(
  597. self,
  598. name: bytes,
  599. old_ref: bytes | None,
  600. committer: bytes | None = None,
  601. timestamp: int | None = None,
  602. timezone: int | None = None,
  603. message: bytes | None = None,
  604. ) -> bool:
  605. """Remove a refname only if it currently equals old_ref.
  606. This method does not follow symbolic references. It can be used to
  607. perform an atomic compare-and-delete operation.
  608. Args:
  609. name: The refname to delete.
  610. old_ref: The old sha the refname must refer to, or None to
  611. delete unconditionally.
  612. committer: Optional committer name for reflog
  613. timestamp: Optional timestamp for reflog
  614. timezone: Optional timezone for reflog
  615. message: Optional message for reflog
  616. Returns:
  617. True if the delete was successful, False otherwise.
  618. """
  619. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  620. return False
  621. try:
  622. old = self._refs.pop(name)
  623. except KeyError:
  624. pass
  625. else:
  626. self._notify(name, None)
  627. self._log(
  628. name,
  629. old,
  630. None,
  631. committer=committer,
  632. timestamp=timestamp,
  633. timezone=timezone,
  634. message=message,
  635. )
  636. return True
  637. def get_peeled(self, name: bytes) -> bytes | None:
  638. """Get peeled version of a reference."""
  639. return self._peeled.get(name)
  640. def _update(self, refs: Mapping[bytes, bytes]) -> None:
  641. """Update multiple refs; intended only for testing."""
  642. # TODO(dborowitz): replace this with a public function that uses
  643. # set_if_equal.
  644. for ref, sha in refs.items():
  645. self.set_if_equals(ref, None, sha)
  646. def _update_peeled(self, peeled: Mapping[bytes, bytes]) -> None:
  647. """Update cached peeled refs; intended only for testing."""
  648. self._peeled.update(peeled)
  649. class InfoRefsContainer(RefsContainer):
  650. """Refs container that reads refs from a info/refs file."""
  651. def __init__(self, f: BinaryIO) -> None:
  652. """Initialize InfoRefsContainer from info/refs file."""
  653. self._refs: dict[bytes, bytes] = {}
  654. self._peeled: dict[bytes, bytes] = {}
  655. refs = read_info_refs(f)
  656. (self._refs, self._peeled) = split_peeled_refs(refs)
  657. def allkeys(self) -> set[bytes]:
  658. """Return all reference keys."""
  659. return set(self._refs.keys())
  660. def read_loose_ref(self, name: bytes) -> bytes | None:
  661. """Read a loose reference."""
  662. return self._refs.get(name, None)
  663. def get_packed_refs(self) -> dict[bytes, bytes]:
  664. """Get packed references."""
  665. return {}
  666. def get_peeled(self, name: bytes) -> bytes | None:
  667. """Get peeled version of a reference."""
  668. try:
  669. return self._peeled[name]
  670. except KeyError:
  671. return self._refs[name]
  672. class DiskRefsContainer(RefsContainer):
  673. """Refs container that reads refs from disk."""
  674. def __init__(
  675. self,
  676. path: str | bytes | os.PathLike[str],
  677. worktree_path: str | bytes | os.PathLike[str] | None = None,
  678. logger: Callable[
  679. [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
  680. ]
  681. | None = None,
  682. ) -> None:
  683. """Initialize DiskRefsContainer."""
  684. super().__init__(logger=logger)
  685. # Convert path-like objects to strings, then to bytes for Git compatibility
  686. self.path = os.fsencode(os.fspath(path))
  687. if worktree_path is None:
  688. self.worktree_path = self.path
  689. else:
  690. self.worktree_path = os.fsencode(os.fspath(worktree_path))
  691. self._packed_refs: dict[bytes, bytes] | None = None
  692. self._peeled_refs: dict[bytes, bytes] | None = None
  693. def __repr__(self) -> str:
  694. """Return string representation of DiskRefsContainer."""
  695. return f"{self.__class__.__name__}({self.path!r})"
  696. def _iter_dir(
  697. self,
  698. path: bytes,
  699. base: bytes,
  700. dir_filter: Callable[[bytes], bool] | None = None,
  701. ) -> Iterator[bytes]:
  702. refspath = os.path.join(path, base.rstrip(b"/"))
  703. prefix_len = len(os.path.join(path, b""))
  704. for root, dirs, files in os.walk(refspath):
  705. directory = root[prefix_len:]
  706. if os.path.sep != "/":
  707. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  708. if dir_filter is not None:
  709. dirs[:] = [
  710. d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
  711. ]
  712. for filename in files:
  713. refname = b"/".join([directory, filename])
  714. if check_ref_format(refname):
  715. yield refname
  716. def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
  717. base = base.rstrip(b"/") + b"/"
  718. search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = []
  719. if base != b"refs/":
  720. path = self.worktree_path if is_per_worktree_ref(base) else self.path
  721. search_paths.append((path, None))
  722. elif self.worktree_path == self.path:
  723. # Iterate through all the refs from the main worktree
  724. search_paths.append((self.path, None))
  725. else:
  726. # Iterate through all the shared refs from the commondir, excluding per-worktree refs
  727. search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
  728. # Iterate through all the per-worktree refs from the worktree's gitdir
  729. search_paths.append((self.worktree_path, is_per_worktree_ref))
  730. for path, dir_filter in search_paths:
  731. yield from self._iter_dir(path, base, dir_filter=dir_filter)
  732. def subkeys(self, base: bytes) -> set[bytes]:
  733. """Return subkeys under a given base reference path."""
  734. subkeys = set()
  735. for key in self._iter_loose_refs(base):
  736. if key.startswith(base):
  737. subkeys.add(key[len(base) :].strip(b"/"))
  738. for key in self.get_packed_refs():
  739. if key.startswith(base):
  740. subkeys.add(key[len(base) :].strip(b"/"))
  741. return subkeys
  742. def allkeys(self) -> set[bytes]:
  743. """Return all reference keys."""
  744. allkeys = set()
  745. if os.path.exists(self.refpath(HEADREF)):
  746. allkeys.add(HEADREF)
  747. allkeys.update(self._iter_loose_refs())
  748. allkeys.update(self.get_packed_refs())
  749. return allkeys
  750. def refpath(self, name: bytes) -> bytes:
  751. """Return the disk path of a ref."""
  752. path = name
  753. if os.path.sep != "/":
  754. path = path.replace(b"/", os.fsencode(os.path.sep))
  755. root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
  756. return os.path.join(root_dir, path)
  757. def get_packed_refs(self) -> dict[bytes, bytes]:
  758. """Get contents of the packed-refs file.
  759. Returns: Dictionary mapping ref names to SHA1s
  760. Note: Will return an empty dictionary when no packed-refs file is
  761. present.
  762. """
  763. # TODO: invalidate the cache on repacking
  764. if self._packed_refs is None:
  765. # set both to empty because we want _peeled_refs to be
  766. # None if and only if _packed_refs is also None.
  767. self._packed_refs = {}
  768. self._peeled_refs = {}
  769. path = os.path.join(self.path, b"packed-refs")
  770. try:
  771. f = GitFile(path, "rb")
  772. except FileNotFoundError:
  773. return {}
  774. with f:
  775. first_line = next(iter(f)).rstrip()
  776. if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
  777. for sha, name, peeled in read_packed_refs_with_peeled(f):
  778. self._packed_refs[name] = sha
  779. if peeled:
  780. self._peeled_refs[name] = peeled
  781. else:
  782. f.seek(0)
  783. for sha, name in read_packed_refs(f):
  784. self._packed_refs[name] = sha
  785. return self._packed_refs
  786. def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
  787. """Add the given refs as packed refs.
  788. Args:
  789. new_refs: A mapping of ref names to targets; if a target is None that
  790. means remove the ref
  791. """
  792. if not new_refs:
  793. return
  794. path = os.path.join(self.path, b"packed-refs")
  795. with GitFile(path, "wb") as f:
  796. # reread cached refs from disk, while holding the lock
  797. packed_refs = self.get_packed_refs().copy()
  798. for ref, target in new_refs.items():
  799. # sanity check
  800. if ref == HEADREF:
  801. raise ValueError("cannot pack HEAD")
  802. # remove any loose refs pointing to this one -- please
  803. # note that this bypasses remove_if_equals as we don't
  804. # want to affect packed refs in here
  805. with suppress(OSError):
  806. os.remove(self.refpath(ref))
  807. if target is not None:
  808. packed_refs[ref] = target
  809. else:
  810. packed_refs.pop(ref, None)
  811. write_packed_refs(f, packed_refs, self._peeled_refs)
  812. self._packed_refs = packed_refs
  813. def get_peeled(self, name: bytes) -> bytes | None:
  814. """Return the cached peeled value of a ref, if available.
  815. Args:
  816. name: Name of the ref to peel
  817. Returns: The peeled value of the ref. If the ref is known not point to
  818. a tag, this will be the SHA the ref refers to. If the ref may point
  819. to a tag, but no cached information is available, None is returned.
  820. """
  821. self.get_packed_refs()
  822. if (
  823. self._peeled_refs is None
  824. or self._packed_refs is None
  825. or name not in self._packed_refs
  826. ):
  827. # No cache: no peeled refs were read, or this ref is loose
  828. return None
  829. if name in self._peeled_refs:
  830. return self._peeled_refs[name]
  831. else:
  832. # Known not peelable
  833. return self[name]
  834. def read_loose_ref(self, name: bytes) -> bytes | None:
  835. """Read a reference file and return its contents.
  836. If the reference file a symbolic reference, only read the first line of
  837. the file. Otherwise, only read the first 40 bytes.
  838. Args:
  839. name: the refname to read, relative to refpath
  840. Returns: The contents of the ref file, or None if the file does not
  841. exist.
  842. Raises:
  843. IOError: if any other error occurs
  844. """
  845. filename = self.refpath(name)
  846. try:
  847. with GitFile(filename, "rb") as f:
  848. header = f.read(len(SYMREF))
  849. if header == SYMREF:
  850. # Read only the first line
  851. return header + next(iter(f)).rstrip(b"\r\n")
  852. else:
  853. # Read only the first 40 bytes
  854. return header + f.read(40 - len(SYMREF))
  855. except (OSError, UnicodeError):
  856. # don't assume anything specific about the error; in
  857. # particular, invalid or forbidden paths can raise weird
  858. # errors depending on the specific operating system
  859. return None
  def _remove_packed_ref(self, name: bytes) -> None:
      """Drop a ref from the packed-refs file, rewriting the file under a lock.

      Does nothing when no packed refs are cached on this instance
      (``self._packed_refs`` is None).
      NOTE(review): that also skips the removal when the packed refs were
      simply never loaded; confirm this is intended.

      Args:
        name: Name of the ref to remove from packed-refs
      """
      if self._packed_refs is None:
          return
      packed_path = os.path.join(self.path, b"packed-refs")
      lock = GitFile(packed_path, "wb")
      try:
          # Discard the cache and reload it from disk while the lock is held.
          self._packed_refs = None
          self.get_packed_refs()

          if self._packed_refs is not None and name in self._packed_refs:
              del self._packed_refs[name]
              if self._peeled_refs is not None:
                  # A missing peeled entry is fine.
                  self._peeled_refs.pop(name, None)
              write_packed_refs(lock, self._packed_refs, self._peeled_refs)
              lock.close()
          else:
              # Nothing to remove: release the lock without rewriting.
              lock.abort()
      except BaseException:
          lock.abort()
          raise
  def set_symbolic_ref(
      self,
      name: bytes,
      other: bytes,
      committer: bytes | None = None,
      timestamp: int | None = None,
      timezone: int | None = None,
      message: bytes | None = None,
  ) -> None:
      """Turn a ref into a symbolic ref that points at another ref.

      Args:
        name: Name of the ref to set
        other: Name of the ref to point at
        committer: Optional committer name
        timestamp: Optional timestamp
        timezone: Optional timezone
        message: Optional message to describe the change
      """
      for refname in (name, other):
          self._check_refname(refname)
      lock = GitFile(self.refpath(name), "wb")
      try:
          lock.write(SYMREF + other + b"\n")
          # The same value is logged as both the old and the new SHA.
          # NOTE(review): the lock file has not been closed yet at this
          # point, so follow() may still see the previous on-disk value of
          # ``name``; confirm this is intended.
          resolved = self.follow(name)[-1]
          self._log(
              name,
              resolved,
              resolved,
              committer=committer,
              timestamp=timestamp,
              timezone=timezone,
              message=message,
          )
      except BaseException:
          lock.abort()
          raise
      lock.close()
  def set_if_equals(
      self,
      name: bytes,
      old_ref: bytes | None,
      new_ref: bytes,
      committer: bytes | None = None,
      timestamp: int | None = None,
      timezone: int | None = None,
      message: bytes | None = None,
  ) -> bool:
      """Point a ref at new_ref, but only if it currently equals old_ref.

      Symbolic references are followed, so the write lands on the last ref of
      the chain. Can be used to perform an atomic compare-and-swap operation.

      Args:
        name: The refname to set.
        old_ref: The old sha the refname must refer to, or None to set
          unconditionally.
        new_ref: The new sha the refname will refer to.
        committer: Optional committer name
        timestamp: Optional timestamp
        timezone: Optional timezone
        message: Set message for reflog
      Returns: True if the set was successful, False otherwise.
      Raises:
        NotADirectoryError: if one of the target's parent paths is itself a
          packed ref
      """
      self._check_refname(name)
      try:
          chain, _ = self.follow(name)
          target = chain[-1]
      except (KeyError, IndexError, SymrefLoop):
          # Nothing usable to follow: write to the name as given.
          target = name
      target_path = self.refpath(target)

      # A packed ref named like one of the target's parent "directories"
      # would collide with the loose file that is about to be created.
      ancestor = os.path.dirname(target)
      packed_refs = self.get_packed_refs()
      while ancestor:
          if packed_refs.get(ancestor, None) is not None:
              raise NotADirectoryError(target_path)
          ancestor = os.path.dirname(ancestor)

      ensure_dir_exists(os.path.dirname(target_path))
      with GitFile(target_path, "wb") as lock:
          if old_ref is not None:
              try:
                  # Read again while holding the lock to handle races.
                  observed = self.read_loose_ref(target)
                  if observed is None:
                      observed = self.get_packed_refs().get(target, ZERO_SHA)
                  if observed != old_ref:
                      lock.abort()
                      return False
              except OSError:
                  lock.abort()
                  raise

          # Check whether the ref already has the desired value while the
          # lock is held: this avoids an fsync for an unchanged ref but still
          # detects lock conflicts.
          existing = self.read_loose_ref(target)
          if existing is None:
              existing = packed_refs.get(target, None)
          if existing is not None and existing == new_ref:
              # Already at the desired value, abort the write.
              lock.abort()
              return True

          try:
              lock.write(new_ref + b"\n")
          except OSError:
              lock.abort()
              raise
          self._log(
              target,
              old_ref,
              new_ref,
              committer=committer,
              timestamp=timestamp,
              timezone=timezone,
              message=message,
          )
      return True
  def add_if_new(
      self,
      name: bytes,
      ref: bytes,
      committer: bytes | None = None,
      timestamp: int | None = None,
      timezone: int | None = None,
      message: bytes | None = None,
  ) -> bool:
      """Add a new reference only if it does not already exist.

      This method follows symrefs, and only ensures that the last ref in the
      chain does not exist.

      Args:
        name: The refname to set.
        ref: The new sha the refname will refer to.
        committer: Optional committer name
        timestamp: Optional timestamp
        timezone: Optional timezone
        message: Optional message for reflog
      Returns: True if the add was successful, False otherwise.
      """
      try:
          realnames, contents = self.follow(name)
          if contents is not None:
              return False
          realname = realnames[-1]
      except (KeyError, IndexError):
          # Nothing to follow: create the ref under the name as given.
          realname = name
      self._check_refname(realname)
      filename = self.refpath(realname)
      ensure_dir_exists(os.path.dirname(filename))
      with GitFile(filename, "wb") as f:
          # Re-check while holding the lock, against the ref that is actually
          # about to be created (the end of the symref chain) rather than the
          # name the caller passed in.
          if os.path.exists(filename) or realname in self.get_packed_refs():
              f.abort()
              return False
          try:
              f.write(ref + b"\n")
          except OSError:
              f.abort()
              raise
          else:
              self._log(
                  name,
                  None,
                  ref,
                  committer=committer,
                  timestamp=timestamp,
                  timezone=timezone,
                  message=message,
              )
      return True
  def remove_if_equals(
      self,
      name: bytes,
      old_ref: bytes | None,
      committer: bytes | None = None,
      timestamp: int | None = None,
      timezone: int | None = None,
      message: bytes | None = None,
  ) -> bool:
      """Delete a ref, but only if it currently equals old_ref.

      Symbolic references are not followed. Can be used to perform an atomic
      compare-and-delete operation.

      Args:
        name: The refname to delete.
        old_ref: The old sha the refname must refer to, or None to
          delete unconditionally.
        committer: Optional committer name
        timestamp: Optional timestamp
        timezone: Optional timezone
        message: Optional message
      Returns: True if the delete was successful, False otherwise.
      """
      self._check_refname(name)
      ref_path = self.refpath(name)
      ensure_dir_exists(os.path.dirname(ref_path))
      lock = GitFile(ref_path, "wb")
      try:
          if old_ref is not None:
              current = self.read_loose_ref(name)
              if current is None:
                  current = self.get_packed_refs().get(name, ZERO_SHA)
              if current != old_ref:
                  return False

          # Remove the reference file itself.
          try:
              loose_exists = os.path.lexists(ref_path)
          except OSError:
              # may only be packed, or otherwise unstorable
              loose_exists = False
          if loose_exists:
              os.remove(ref_path)

          self._remove_packed_ref(name)
          self._log(
              name,
              old_ref,
              None,
              committer=committer,
              timestamp=timestamp,
              timezone=timezone,
              message=message,
          )
      finally:
          # Nothing is ever written; the file was only opened for the lock.
          lock.abort()

      # Outside of the lock, clean up any parent directory that might now be
      # empty, so that re-creating a reference named like what used to be a
      # directory works as expected.
      parent = name
      while b"/" in parent:
          parent = parent.rpartition(b"/")[0]
          if parent == b"refs":
              break
          parent_path = self.refpath(parent)
          try:
              os.rmdir(parent_path)
          except OSError:
              # The directory may be non-empty, already removed by another
              # process, etc. Not fatal: the reference itself is gone.
              break
      return True
  def pack_refs(self, all: bool = False) -> None:
      """Move loose refs into the packed-refs file.

      Args:
        all: If True, pack all refs. If False, only pack tags.
      """
      candidates: dict[Ref, ObjectID | None] = {}
      for refname in self.allkeys():
          if refname == HEADREF:
              # Never pack HEAD
              continue
          if not (all or refname.startswith(LOCAL_TAG_PREFIX)):
              continue
          try:
              value = self[refname]
          except KeyError:
              # Broken ref, skip it
              continue
          if value:
              candidates[refname] = value
      if candidates:
          self.add_packed_refs(candidates)
  def _split_ref_line(line: bytes) -> tuple[bytes, bytes]:
      """Parse one "SHA1 refname" line into a (sha, name) tuple.

      Args:
        line: A single line, with or without its line terminator
      Returns: Tuple of (SHA1, ref name)
      Raises:
        PackedRefsException: if the line does not consist of exactly two
          space-separated fields, or if the sha or the ref name is invalid
      """
      stripped = line.rstrip(b"\n\r")
      # Exactly one space means exactly two fields.
      if stripped.count(b" ") != 1:
          raise PackedRefsException(f"invalid ref line {line!r}")
      sha, _, name = stripped.partition(b" ")
      if not valid_hexsha(sha):
          raise PackedRefsException(f"Invalid hex sha {sha!r}")
      if not check_ref_format(name):
          raise PackedRefsException(f"invalid ref name {name!r}")
      return (sha, name)
  def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[bytes, bytes]]:
      """Iterate over the entries of a packed refs file without peeled lines.

      Args:
        f: file-like object to read from
      Returns: Iterator over tuples with SHA1s and ref names.
      Raises:
        PackedRefsException: if a peeled ("^") line is encountered, or a
          ref line cannot be parsed
      """
      for raw in f:
          marker = raw[:1]
          if marker == b"#":
              # Comment
              continue
          if marker == b"^":
              raise PackedRefsException(
                  "found peeled ref in packed-refs without peeled"
              )
          yield _split_ref_line(raw)
  1166. def read_packed_refs_with_peeled(
  1167. f: IO[bytes],
  1168. ) -> Iterator[tuple[bytes, bytes, bytes | None]]:
  1169. """Read a packed refs file including peeled refs.
  1170. Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
  1171. with ref names, SHA1s, and peeled SHA1s (or None).
  1172. Args:
  1173. f: file-like object to read from, seek'ed to the second line
  1174. """
  1175. last = None
  1176. for line in f:
  1177. if line.startswith(b"#"):
  1178. continue
  1179. line = line.rstrip(b"\r\n")
  1180. if line.startswith(b"^"):
  1181. if not last:
  1182. raise PackedRefsException("unexpected peeled ref line")
  1183. if not valid_hexsha(line[1:]):
  1184. raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
  1185. sha, name = _split_ref_line(last)
  1186. last = None
  1187. yield (sha, name, line[1:])
  1188. else:
  1189. if last:
  1190. sha, name = _split_ref_line(last)
  1191. yield (sha, name, None)
  1192. last = line
  1193. if last:
  1194. sha, name = _split_ref_line(last)
  1195. yield (sha, name, None)
  1196. def write_packed_refs(
  1197. f: IO[bytes],
  1198. packed_refs: Mapping[bytes, bytes],
  1199. peeled_refs: Mapping[bytes, bytes] | None = None,
  1200. ) -> None:
  1201. """Write a packed refs file.
  1202. Args:
  1203. f: empty file-like object to write to
  1204. packed_refs: dict of refname to sha of packed refs to write
  1205. peeled_refs: dict of refname to peeled value of sha
  1206. """
  1207. if peeled_refs is None:
  1208. peeled_refs = {}
  1209. else:
  1210. f.write(b"# pack-refs with: peeled\n")
  1211. for refname in sorted(packed_refs.keys()):
  1212. f.write(git_line(packed_refs[refname], refname))
  1213. if refname in peeled_refs:
  1214. f.write(b"^" + peeled_refs[refname] + b"\n")
  def read_info_refs(f: BinaryIO) -> dict[bytes, bytes]:
      """Parse an info/refs file.

      Each line is expected to hold a SHA1 and a ref name separated by a tab.

      Args:
        f: File-like object to read from
      Returns:
        Dictionary mapping ref names to SHA1s
      """
      entries = (raw.rstrip(b"\r\n").split(b"\t", 1) for raw in f.readlines())
      return {name: sha for sha, name in entries}
  def write_info_refs(
      refs: Mapping[bytes, bytes], store: ObjectContainer
  ) -> Iterator[bytes]:
      """Generate the lines of an info/refs file.

      Refs are emitted sorted by name. HEAD and refs whose object is missing
      from the store are skipped. A ref whose peeled object differs from the
      object it points at gets an extra line carrying the peeled-tag suffix.

      Args:
        refs: Dictionary of ref names to SHAs
        store: Object store used to look up and peel the objects
      Returns: Iterator over the lines to write
      """
      # TODO: Avoid recursive import :(
      from .object_store import peel_sha

      for name, sha in sorted(refs.items()):
          # get_refs() includes HEAD as a special case, but it should not be
          # advertised.
          if name == HEADREF:
              continue
          try:
              obj = store[sha]
          except KeyError:
              continue
          _unpeeled, peeled = peel_sha(store, sha)
          obj_id = obj.id
          yield obj_id + b"\t" + name + b"\n"
          if obj_id != peeled.id:
              yield peeled.id + b"\t" + name + PEELED_TAG_SUFFIX + b"\n"
  def is_local_branch(x: bytes) -> bool:
      """Tell whether a ref name lives under the local branch prefix.

      Args:
        x: Ref name to check
      Returns: True if the name starts with the local branch prefix
      """
      branch_prefix = LOCAL_BRANCH_PREFIX
      return x.startswith(branch_prefix)
  def local_branch_name(name: bytes) -> bytes:
      """Turn a short branch name into a full branch ref.

      A name that already carries the local branch prefix is returned as is.

      Args:
        name: Short branch name (e.g., b"master") or full ref
      Returns:
        Full branch ref name (e.g., b"refs/heads/master")
      Examples:
        >>> local_branch_name(b"master")
        b'refs/heads/master'
        >>> local_branch_name(b"refs/heads/master")
        b'refs/heads/master'
      """
      already_full = name.startswith(LOCAL_BRANCH_PREFIX)
      return name if already_full else LOCAL_BRANCH_PREFIX + name
  def local_tag_name(name: bytes) -> bytes:
      """Turn a short tag name into a full tag ref.

      A name that already carries the local tag prefix is returned as is.

      Args:
        name: Short tag name (e.g., b"v1.0") or full ref
      Returns:
        Full tag ref name (e.g., b"refs/tags/v1.0")
      Examples:
        >>> local_tag_name(b"v1.0")
        b'refs/tags/v1.0'
        >>> local_tag_name(b"refs/tags/v1.0")
        b'refs/tags/v1.0'
      """
      already_full = name.startswith(LOCAL_TAG_PREFIX)
      return name if already_full else LOCAL_TAG_PREFIX + name
  def local_replace_name(name: bytes) -> bytes:
      """Turn a short replace name into a full replace ref.

      A name that already carries the replace prefix is returned as is.

      Args:
        name: Short replace name (object SHA) or full ref
      Returns:
        Full replace ref name (the replace prefix followed by the SHA)
      Examples:
        >>> local_replace_name(b"abc123")
        b'refs/replace/abc123'
        >>> local_replace_name(b"refs/replace/abc123")
        b'refs/replace/abc123'
      """
      already_full = name.startswith(LOCAL_REPLACE_PREFIX)
      return name if already_full else LOCAL_REPLACE_PREFIX + name
  def extract_branch_name(ref: bytes) -> bytes:
      """Strip the local branch prefix from a full branch ref.

      Args:
        ref: Full branch ref (e.g., b"refs/heads/master")
      Returns:
        Short branch name (e.g., b"master")
      Raises:
        ValueError: If ref is not a local branch
      Examples:
        >>> extract_branch_name(b"refs/heads/master")
        b'master'
        >>> extract_branch_name(b"refs/heads/feature/foo")
        b'feature/foo'
      """
      if ref.startswith(LOCAL_BRANCH_PREFIX):
          prefix_length = len(LOCAL_BRANCH_PREFIX)
          return ref[prefix_length:]
      raise ValueError(f"Not a local branch ref: {ref!r}")
  def extract_tag_name(ref: bytes) -> bytes:
      """Strip the local tag prefix from a full tag ref.

      Args:
        ref: Full tag ref (e.g., b"refs/tags/v1.0")
      Returns:
        Short tag name (e.g., b"v1.0")
      Raises:
        ValueError: If ref is not a local tag
      Examples:
        >>> extract_tag_name(b"refs/tags/v1.0")
        b'v1.0'
      """
      if ref.startswith(LOCAL_TAG_PREFIX):
          prefix_length = len(LOCAL_TAG_PREFIX)
          return ref[prefix_length:]
      raise ValueError(f"Not a local tag ref: {ref!r}")
  def shorten_ref_name(ref: bytes) -> bytes:
      """Reduce a full ref name to its short form.

      The branch, remote and tag prefixes are recognized; any other ref is
      returned unchanged.

      Args:
        ref: Full ref name (e.g., b"refs/heads/master")
      Returns:
        Short ref name (e.g., b"master")
      Examples:
        >>> shorten_ref_name(b"refs/heads/master")
        b'master'
        >>> shorten_ref_name(b"refs/remotes/origin/main")
        b'origin/main'
        >>> shorten_ref_name(b"refs/tags/v1.0")
        b'v1.0'
        >>> shorten_ref_name(b"HEAD")
        b'HEAD'
      """
      known_prefixes = (LOCAL_BRANCH_PREFIX, LOCAL_REMOTE_PREFIX, LOCAL_TAG_PREFIX)
      for prefix in known_prefixes:
          if ref.startswith(prefix):
              return ref[len(prefix) :]
      return ref
  # Constrained type variable: a function annotated with it returns the same
  # flavour of refs dict (values always set, or values possibly None) that it
  # was given.
  T = TypeVar("T", dict[bytes, bytes], dict[bytes, bytes | None])


  def strip_peeled_refs(refs: T) -> T:
      """Return a copy of refs without the peeled entries.

      Args:
        refs: Dictionary of ref names to SHAs
      Returns: A new dictionary holding every entry whose name does not end
        with the peeled-tag suffix
      """
      kept = {}
      for refname, sha in refs.items():
          if refname.endswith(PEELED_TAG_SUFFIX):
              continue
          kept[refname] = sha
      return kept
  def split_peeled_refs(refs: T) -> tuple[T, dict[bytes, bytes]]:
      """Separate peeled entries from regular refs.

      Args:
        refs: Dictionary of ref names to SHAs, possibly containing entries
          whose name ends with the peeled-tag suffix
      Returns: Tuple of (regular refs, peeled refs). The peeled dictionary is
        keyed by the ref name with the suffix removed; peeled entries whose
        value is None are dropped.
      """
      regular = {}
      peeled: dict[bytes, bytes] = {}
      suffix_length = len(PEELED_TAG_SUFFIX)
      for refname, sha in refs.items():
          if not refname.endswith(PEELED_TAG_SUFFIX):
              regular[refname] = sha
          elif sha is not None:
              # Only keep peeled values that are actually known
              peeled[refname[:-suffix_length]] = sha
      return regular, peeled
  def _set_origin_head(
      refs: RefsContainer, origin: bytes, origin_head: bytes | None
  ) -> None:
      """Set refs/remotes/ORIGIN/HEAD as a symref to the remote's head branch.

      Nothing happens unless origin_head is a local branch ref and the
      matching remote-tracking ref exists in refs.

      Args:
        refs: Refs container to update
        origin: Name of the remote
        origin_head: Ref the remote HEAD points at, if known
      """
      remote_prefix = b"refs/remotes/" + origin + b"/"
      if not origin_head or not origin_head.startswith(LOCAL_BRANCH_PREFIX):
          return
      symref_name = remote_prefix + HEADREF
      tracking_ref = remote_prefix + extract_branch_name(origin_head)
      if tracking_ref in refs:
          refs.set_symbolic_ref(symref_name, tracking_ref)
  def _set_default_branch(
      refs: RefsContainer,
      origin: bytes,
      origin_head: bytes | None,
      branch: bytes | None,
      ref_message: bytes | None,
  ) -> bytes:
      """Set the default branch.

      When ``branch`` is given it is looked up first as a remote-tracking
      branch of ``origin`` (a local branch is then created from it if it does
      not exist yet) and then as a local tag. Otherwise ``origin_head`` is
      used, and a local ref of that name is created from the corresponding
      remote-tracking ref when that ref exists.

      Args:
        refs: Refs container to update
        origin: Name of the remote
        origin_head: Ref the remote HEAD points at, if known
        branch: Short name of the branch or tag to use, if any
        ref_message: Reflog message for refs created here
      Returns: The ref that HEAD should be set to
      Raises:
        ValueError: if ``branch`` names neither a remote-tracking branch nor a
          tag, or if neither ``branch`` nor ``origin_head`` is provided
      """
      origin_base = b"refs/remotes/" + origin + b"/"
      if branch:
          origin_ref = origin_base + branch
          if origin_ref in refs:
              local_ref = local_branch_name(branch)
              # The message must go by keyword: the third positional
              # parameter of add_if_new() is the committer.
              refs.add_if_new(local_ref, refs[origin_ref], message=ref_message)
              head_ref = local_ref
          elif local_tag_name(branch) in refs:
              head_ref = local_tag_name(branch)
          else:
              raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
      elif origin_head:
          head_ref = origin_head
          if origin_head.startswith(LOCAL_BRANCH_PREFIX):
              origin_ref = origin_base + extract_branch_name(origin_head)
          else:
              origin_ref = origin_head
          try:
              refs.add_if_new(head_ref, refs[origin_ref], message=ref_message)
          except KeyError:
              # The source ref does not exist; there is nothing to create.
              pass
      else:
          raise ValueError("neither origin_head nor branch are provided")
      return head_ref
  def _set_head(
      refs: RefsContainer, head_ref: bytes, ref_message: bytes | None
  ) -> bytes | None:
      """Point HEAD at the given ref.

      For a tag ref, HEAD is detached at the value of the tag. For any other
      ref, HEAD becomes a symbolic ref to it.

      Args:
        refs: Refs container to update
        head_ref: Ref that HEAD should be set to
        ref_message: Reflog message for the update
      Returns: The value HEAD was set to, or None if a non-tag head_ref could
        not be resolved (KeyError)
      """
      if not head_ref.startswith(LOCAL_TAG_PREFIX):
          # set HEAD to specific branch
          try:
              branch_head = refs[head_ref]
              refs.set_symbolic_ref(HEADREF, head_ref)
              refs.set_if_equals(HEADREF, None, branch_head, message=ref_message)
          except KeyError:
              return None
          return branch_head

      # detach HEAD at specified tag
      detached = refs[head_ref]
      # NOTE(review): a refs lookup presumably yields a SHA rather than a Tag
      # object, in which case this branch never runs; confirm.
      if isinstance(detached, Tag):
          _cls, obj = detached.object
          detached = obj.get_object(obj).id
      del refs[HEADREF]
      refs.set_if_equals(HEADREF, None, detached, message=ref_message)
      return detached
  def _import_remote_refs(
      refs_container: RefsContainer,
      remote_name: str,
      refs: dict[bytes, bytes | None],
      message: bytes | None = None,
      prune: bool = False,
      prune_tags: bool = False,
  ) -> None:
      """Import the branches and tags advertised by a remote.

      Branches are imported under refs/remotes/REMOTE_NAME, tags under the
      local tag prefix. Peeled entries and refs without a value are ignored.

      Args:
        refs_container: Refs container to import into
        remote_name: Name of the remote
        refs: Refs advertised by the remote
        message: Optional reflog message
        prune: Passed as ``prune`` when importing the branches
        prune_tags: Passed as ``prune`` when importing the tags
      """
      branches = {}
      tags = {}
      for refname, sha in strip_peeled_refs(refs).items():
          if sha is None:
              continue
          if refname.startswith(LOCAL_BRANCH_PREFIX):
              branches[extract_branch_name(refname)] = sha
          elif refname.startswith(LOCAL_TAG_PREFIX) and not refname.endswith(
              PEELED_TAG_SUFFIX
          ):
              tags[extract_tag_name(refname)] = sha

      remote_base = b"refs/remotes/" + remote_name.encode()
      refs_container.import_refs(
          remote_base,
          branches,
          message=message,
          prune=prune,
      )
      refs_container.import_refs(
          LOCAL_TAG_PREFIX, tags, message=message, prune=prune_tags
      )
  def serialize_refs(
      store: ObjectContainer, refs: Mapping[bytes, bytes]
  ) -> dict[bytes, bytes]:
      """Build a refs dictionary that also carries peeled entries.

      A ref whose SHA is missing from the store is left out and a UserWarning
      is issued for it.

      Args:
        store: Object store to peel refs from
        refs: Dictionary of ref names to SHAs
      Returns:
        Dictionary with refs and peeled refs (marked with ^{})
      """
      # TODO: Avoid recursive import :(
      from .object_store import peel_sha

      serialized = {}
      for ref, sha in refs.items():
          try:
              unpeeled, peeled = peel_sha(store, sha)
          except KeyError:
              warnings.warn(
                  "ref {} points at non-present sha {}".format(
                      ref.decode("utf-8", "replace"), sha.decode("ascii")
                  ),
                  UserWarning,
              )
              continue
          if isinstance(unpeeled, Tag):
              # Tags get an additional entry for the object they peel to.
              serialized[ref + PEELED_TAG_SUFFIX] = peeled.id
          serialized[ref] = unpeeled.id
      return serialized
  class locked_ref:
      """Lock a ref while making modifications.

      Works as a context manager. On exit the lock file is committed only when
      a new value was written through set() or set_symbolic_ref(). If nothing
      was written, the ref was deleted, or the block raised, the lock is
      released without committing, so a read-only use (get() or
      ensure_equals() alone) leaves the ref untouched.
      """

      def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
          """Initialize a locked ref.

          Args:
            refs_container: The DiskRefsContainer to lock the ref in
            refname: The ref name to lock
          """
          self._refs_container = refs_container
          self._refname = refname
          self._file: _GitFile | None = None
          self._realname: Ref | None = None
          # True after delete(), until a later set()/set_symbolic_ref().
          self._deleted = False
          # True once set()/set_symbolic_ref() has written to the lock file.
          self._modified = False

      def __enter__(self) -> "locked_ref":
          """Enter the context manager and acquire the lock.

          Returns:
            This locked_ref instance
          Raises:
            OSError: If the lock cannot be acquired
          """
          self._refs_container._check_refname(self._refname)
          try:
              realnames, _ = self._refs_container.follow(self._refname)
              self._realname = realnames[-1]
          except (KeyError, IndexError, SymrefLoop):
              # Nothing usable to follow: lock the name as given.
              self._realname = self._refname

          filename = self._refs_container.refpath(self._realname)
          ensure_dir_exists(os.path.dirname(filename))
          self._file = GitFile(filename, "wb")
          # Start every use of the lock from a clean state.
          self._deleted = False
          self._modified = False
          return self

      def __exit__(
          self,
          exc_type: type | None,
          exc_value: BaseException | None,
          traceback: types.TracebackType | None,
      ) -> None:
          """Exit the context manager and release the lock.

          Args:
            exc_type: Type of exception if one occurred
            exc_value: Exception instance if one occurred
            traceback: Traceback if an exception occurred
          """
          if self._file:
              # Closing the lock file commits its content, so only do that
              # when something was written; committing an untouched lock file
              # would replace the ref with an empty file.
              if exc_type is None and self._modified and not self._deleted:
                  self._file.close()
              else:
                  self._file.abort()

      def get(self) -> bytes | None:
          """Get the current value of the ref.

          Returns: The loose value if there is one, else the packed value,
            else None
          Raises:
            RuntimeError: if called outside of the context
          """
          if not self._file:
              raise RuntimeError("locked_ref not in context")

          assert self._realname is not None
          current_ref = self._refs_container.read_loose_ref(self._realname)
          if current_ref is None:
              current_ref = self._refs_container.get_packed_refs().get(
                  self._realname, None
              )
          return current_ref

      def ensure_equals(self, expected_value: bytes | None) -> bool:
          """Ensure the ref currently equals the expected value.

          Args:
            expected_value: The expected current value of the ref
          Returns:
            True if the ref equals the expected value, False otherwise
          """
          current_value = self.get()
          return current_value == expected_value

      def set(self, new_ref: bytes) -> None:
          """Set the ref to a new value.

          Args:
            new_ref: The new SHA1 or symbolic ref value
          Raises:
            RuntimeError: if called outside of the context
            ValueError: if new_ref is neither a valid sha nor a symref
          """
          if not self._file:
              raise RuntimeError("locked_ref not in context")

          if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
              raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")

          # Replace anything written earlier within this context.
          self._file.seek(0)
          self._file.truncate()
          self._file.write(new_ref + b"\n")
          self._deleted = False
          self._modified = True

      def set_symbolic_ref(self, target: Ref) -> None:
          """Make this ref point at another ref.

          Args:
            target: Name of the ref to point at
          Raises:
            RuntimeError: if called outside of the context
          """
          if not self._file:
              raise RuntimeError("locked_ref not in context")

          self._refs_container._check_refname(target)
          # Replace anything written earlier within this context.
          self._file.seek(0)
          self._file.truncate()
          self._file.write(SYMREF + target + b"\n")
          self._deleted = False
          self._modified = True

      def delete(self) -> None:
          """Delete the ref file while holding the lock.

          Raises:
            RuntimeError: if called outside of the context
          """
          if not self._file:
              raise RuntimeError("locked_ref not in context")

          # Delete the actual ref file while holding the lock
          if self._realname:
              filename = self._refs_container.refpath(self._realname)
              try:
                  if os.path.lexists(filename):
                      os.remove(filename)
              except FileNotFoundError:
                  # Removed by someone else in the meantime; nothing to do.
                  pass
              self._refs_container._remove_packed_ref(self._realname)

          self._deleted = True
  class NamespacedRefsContainer(RefsContainer):
      """Refs container view that confines ref operations to one namespace.

      This implements Git's GIT_NAMESPACE feature: refs are stored under
      refs/namespaces/NAMESPACE/ in the wrapped container, and only refs
      within that namespace are shown.

      Example:
          With namespace "foo", a ref "refs/heads/master" is stored as
          "refs/namespaces/foo/refs/heads/master" in the underlying container.
      """

      def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
          """Initialize NamespacedRefsContainer.

          Args:
            refs: The underlying refs container to wrap
            namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
          """
          super().__init__(logger=refs._logger)
          self._refs = refs
          # Each "/"-separated component of the namespace adds its own
          # refs/namespaces/COMPONENT/ level, so b"foo/bar" becomes
          # b"refs/namespaces/foo/refs/namespaces/bar/".
          self._namespace_prefix = b"".join(
              b"refs/namespaces/" + part + b"/" for part in namespace.split(b"/")
          )

      def _apply_namespace(self, name: bytes) -> bytes:
          """Translate a ref name into its name in the underlying container."""
          # HEAD and other names outside refs/ are not namespaced
          is_special = name == HEADREF or not name.startswith(b"refs/")
          return name if is_special else self._namespace_prefix + name

      def _strip_namespace(self, name: bytes) -> bytes | None:
          """Translate an underlying ref name back into this namespace.

          Returns None if the ref is not in our namespace.
          """
          # HEAD and other names outside refs/ are not namespaced
          if name == HEADREF or not name.startswith(b"refs/"):
              return name
          prefix = self._namespace_prefix
          if not name.startswith(prefix):
              return None
          return name[len(prefix) :]

      def allkeys(self) -> set[bytes]:
          """Return all reference keys in this namespace."""
          candidates = (self._strip_namespace(key) for key in self._refs.allkeys())
          return {key for key in candidates if key is not None}

      def read_loose_ref(self, name: bytes) -> bytes | None:
          """Read a loose reference."""
          target = self._apply_namespace(name)
          return self._refs.read_loose_ref(target)

      def get_packed_refs(self) -> dict[Ref, ObjectID]:
          """Get packed refs within this namespace."""
          return {
              stripped: value
              for name, value in self._refs.get_packed_refs().items()
              if (stripped := self._strip_namespace(name)) is not None
          }

      def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
          """Add packed refs with namespace prefix."""
          namespaced = {}
          for name, value in new_refs.items():
              namespaced[self._apply_namespace(name)] = value
          self._refs.add_packed_refs(namespaced)

      def get_peeled(self, name: bytes) -> ObjectID | None:
          """Return the cached peeled value of a ref."""
          target = self._apply_namespace(name)
          return self._refs.get_peeled(target)

      def set_symbolic_ref(
          self,
          name: bytes,
          other: bytes,
          committer: bytes | None = None,
          timestamp: int | None = None,
          timezone: int | None = None,
          message: bytes | None = None,
      ) -> None:
          """Make a ref point at another ref; both names are namespaced."""
          source = self._apply_namespace(name)
          target = self._apply_namespace(other)
          self._refs.set_symbolic_ref(
              source,
              target,
              committer=committer,
              timestamp=timestamp,
              timezone=timezone,
              message=message,
          )

      def set_if_equals(
          self,
          name: bytes,
          old_ref: bytes | None,
          new_ref: bytes,
          committer: bytes | None = None,
          timestamp: int | None = None,
          timezone: int | None = None,
          message: bytes | None = None,
      ) -> bool:
          """Set a refname to new_ref only if it currently equals old_ref."""
          target = self._apply_namespace(name)
          return self._refs.set_if_equals(
              target,
              old_ref,
              new_ref,
              committer=committer,
              timestamp=timestamp,
              timezone=timezone,
              message=message,
          )

      def add_if_new(
          self,
          name: bytes,
          ref: bytes,
          committer: bytes | None = None,
          timestamp: int | None = None,
          timezone: int | None = None,
          message: bytes | None = None,
      ) -> bool:
          """Add a new reference only if it does not already exist."""
          target = self._apply_namespace(name)
          return self._refs.add_if_new(
              target,
              ref,
              committer=committer,
              timestamp=timestamp,
              timezone=timezone,
              message=message,
          )

      def remove_if_equals(
          self,
          name: bytes,
          old_ref: bytes | None,
          committer: bytes | None = None,
          timestamp: int | None = None,
          timezone: int | None = None,
          message: bytes | None = None,
      ) -> bool:
          """Remove a refname only if it currently equals old_ref."""
          target = self._apply_namespace(name)
          return self._refs.remove_if_equals(
              target,
              old_ref,
              committer=committer,
              timestamp=timestamp,
              timezone=timezone,
              message=message,
          )

      def pack_refs(self, all: bool = False) -> None:
          """Pack loose refs into packed-refs file.

          Note: This packs all refs in the underlying container, not just
          those in the namespace.
          """
          self._refs.pack_refs(all=all)
  def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
      """Filter refs to only include those with a given prefix.

      Args:
        refs: A dictionary of refs.
        prefixes: The prefixes to filter by. Any iterable is accepted,
          including one-shot iterators and generators.
      Returns: A new dictionary with the entries of ``refs`` whose name starts
        with at least one of the prefixes.
      """
      # Materialize once: the prefixes are needed again for every key, and a
      # one-shot iterator would be exhausted after the first key was checked.
      wanted = tuple(prefixes)
      return {name: sha for name, sha in refs.items() if name.startswith(wanted)}
  def is_per_worktree_ref(ref: bytes) -> bool:
      """Tell whether a reference is stored per worktree.

      Per-worktree references are:
      - all pseudorefs, e.g. HEAD
      - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"

      All refs starting with "refs/" are shared, except for the ones listed above.

      See https://git-scm.com/docs/git-worktree#_refs.
      """
      if not ref.startswith(b"refs/"):
          # Anything outside refs/ is treated as a pseudoref.
          return True
      per_worktree_prefixes = (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
      return ref.startswith(per_worktree_prefixes)