refs.py 64 KB


  1. # refs.py -- For dealing with git refs
  2. # Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Ref handling."""
  22. __all__ = [
  23. "HEADREF",
  24. "LOCAL_BRANCH_PREFIX",
  25. "LOCAL_NOTES_PREFIX",
  26. "LOCAL_REMOTE_PREFIX",
  27. "LOCAL_REPLACE_PREFIX",
  28. "LOCAL_TAG_PREFIX",
  29. "SYMREF",
  30. "DictRefsContainer",
  31. "DiskRefsContainer",
  32. "NamespacedRefsContainer",
  33. "Ref",
  34. "RefsContainer",
  35. "SymrefLoop",
  36. "check_ref_format",
  37. "extract_branch_name",
  38. "extract_tag_name",
  39. "filter_ref_prefix",
  40. "is_local_branch",
  41. "is_per_worktree_ref",
  42. "local_branch_name",
  43. "local_replace_name",
  44. "local_tag_name",
  45. "parse_remote_ref",
  46. "parse_symref_value",
  47. "read_info_refs",
  48. "read_packed_refs",
  49. "read_packed_refs_with_peeled",
  50. "set_ref_from_raw",
  51. "shorten_ref_name",
  52. "write_packed_refs",
  53. ]
  54. import os
  55. import types
  56. from collections.abc import Callable, Iterable, Iterator, Mapping
  57. from contextlib import suppress
  58. from typing import (
  59. IO,
  60. TYPE_CHECKING,
  61. Any,
  62. BinaryIO,
  63. NewType,
  64. TypeVar,
  65. )
  66. if TYPE_CHECKING:
  67. from .file import _GitFile
  68. from .errors import PackedRefsException, RefFormatError
  69. from .file import GitFile, ensure_dir_exists
  70. from .objects import ZERO_SHA, ObjectID, Tag, git_line, valid_hexsha
  71. Ref = NewType("Ref", bytes)
  72. T = TypeVar("T", dict[Ref, ObjectID], dict[Ref, ObjectID | None])
  73. HEADREF = Ref(b"HEAD")
  74. SYMREF = b"ref: "
  75. LOCAL_BRANCH_PREFIX = b"refs/heads/"
  76. LOCAL_TAG_PREFIX = b"refs/tags/"
  77. LOCAL_REMOTE_PREFIX = b"refs/remotes/"
  78. LOCAL_NOTES_PREFIX = b"refs/notes/"
  79. LOCAL_REPLACE_PREFIX = b"refs/replace/"
  80. BAD_REF_CHARS: set[int] = set(b"\177 ~^:?*[")
  81. class SymrefLoop(Exception):
  82. """There is a loop between one or more symrefs."""
  83. def __init__(self, ref: bytes, depth: int) -> None:
  84. """Initialize SymrefLoop exception."""
  85. self.ref = ref
  86. self.depth = depth
  87. def parse_symref_value(contents: bytes) -> bytes:
  88. """Parse a symref value.
  89. Args:
  90. contents: Contents to parse
  91. Returns: Destination
  92. """
  93. if contents.startswith(SYMREF):
  94. return contents[len(SYMREF) :].rstrip(b"\r\n")
  95. raise ValueError(contents)
  96. def check_ref_format(refname: Ref) -> bool:
  97. """Check if a refname is correctly formatted.
  98. Implements all the same rules as git-check-ref-format[1].
  99. [1]
  100. http://www.kernel.org/pub/software/scm/git/docs/git-check-ref-format.html
  101. Args:
  102. refname: The refname to check
  103. Returns: True if refname is valid, False otherwise
  104. """
  105. # These could be combined into one big expression, but are listed
  106. # separately to parallel [1].
  107. if b"/." in refname or refname.startswith(b"."): # type: ignore[comparison-overlap]
  108. return False
  109. if b"/" not in refname: # type: ignore[comparison-overlap]
  110. return False
  111. if b".." in refname: # type: ignore[comparison-overlap]
  112. return False
  113. for i, c in enumerate(refname):
  114. if ord(refname[i : i + 1]) < 0o40 or c in BAD_REF_CHARS:
  115. return False
  116. if refname[-1] in b"/.":
  117. return False
  118. if refname.endswith(b".lock"):
  119. return False
  120. if b"@{" in refname: # type: ignore[comparison-overlap]
  121. return False
  122. if b"\\" in refname: # type: ignore[comparison-overlap]
  123. return False
  124. return True
  125. def parse_remote_ref(ref: bytes) -> tuple[bytes, bytes]:
  126. """Parse a remote ref into remote name and branch name.
  127. Args:
  128. ref: Remote ref like b"refs/remotes/origin/main"
  129. Returns:
  130. Tuple of (remote_name, branch_name)
  131. Raises:
  132. ValueError: If ref is not a valid remote ref
  133. """
  134. if not ref.startswith(LOCAL_REMOTE_PREFIX):
  135. raise ValueError(f"Not a remote ref: {ref!r}")
  136. # Remove the prefix
  137. remainder = ref[len(LOCAL_REMOTE_PREFIX) :]
  138. # Split into remote name and branch name
  139. parts = remainder.split(b"/", 1)
  140. if len(parts) != 2:
  141. raise ValueError(f"Invalid remote ref format: {ref!r}")
  142. remote_name, branch_name = parts
  143. return (remote_name, branch_name)
  144. def set_ref_from_raw(refs: "RefsContainer", name: Ref, raw_ref: bytes) -> None:
  145. """Set a reference from a raw ref value.
  146. This handles both symbolic refs (starting with 'ref: ') and direct ObjectID refs.
  147. Args:
  148. refs: The RefsContainer to set the ref in
  149. name: The ref name to set
  150. raw_ref: The raw ref value (either a symbolic ref or an ObjectID)
  151. """
  152. if raw_ref.startswith(SYMREF):
  153. # It's a symbolic ref
  154. target = Ref(raw_ref[len(SYMREF) :])
  155. refs.set_symbolic_ref(name, target)
  156. else:
  157. # It's a direct ObjectID
  158. refs[name] = ObjectID(raw_ref)
  159. class RefsContainer:
  160. """A container for refs."""
  161. def __init__(
  162. self,
  163. logger: Callable[
  164. [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
  165. ]
  166. | None = None,
  167. ) -> None:
  168. """Initialize RefsContainer with optional logger function."""
  169. self._logger = logger
  170. def _log(
  171. self,
  172. ref: bytes,
  173. old_sha: bytes | None,
  174. new_sha: bytes | None,
  175. committer: bytes | None = None,
  176. timestamp: int | None = None,
  177. timezone: int | None = None,
  178. message: bytes | None = None,
  179. ) -> None:
  180. if self._logger is None:
  181. return
  182. if message is None:
  183. return
  184. # Use ZERO_SHA for None values, matching git behavior
  185. if old_sha is None:
  186. old_sha = ZERO_SHA
  187. if new_sha is None:
  188. new_sha = ZERO_SHA
  189. self._logger(ref, old_sha, new_sha, committer, timestamp, timezone, message)
  190. def set_symbolic_ref(
  191. self,
  192. name: Ref,
  193. other: Ref,
  194. committer: bytes | None = None,
  195. timestamp: int | None = None,
  196. timezone: int | None = None,
  197. message: bytes | None = None,
  198. ) -> None:
  199. """Make a ref point at another ref.
  200. Args:
  201. name: Name of the ref to set
  202. other: Name of the ref to point at
  203. committer: Optional committer name/email
  204. timestamp: Optional timestamp
  205. timezone: Optional timezone
  206. message: Optional message
  207. """
  208. raise NotImplementedError(self.set_symbolic_ref)
  209. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  210. """Get contents of the packed-refs file.
  211. Returns: Dictionary mapping ref names to SHA1s
  212. Note: Will return an empty dictionary when no packed-refs file is
  213. present.
  214. """
  215. raise NotImplementedError(self.get_packed_refs)
  216. def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
  217. """Add the given refs as packed refs.
  218. Args:
  219. new_refs: A mapping of ref names to targets; if a target is None that
  220. means remove the ref
  221. """
  222. raise NotImplementedError(self.add_packed_refs)
  223. def get_peeled(self, name: Ref) -> ObjectID | None:
  224. """Return the cached peeled value of a ref, if available.
  225. Args:
  226. name: Name of the ref to peel
  227. Returns: The peeled value of the ref. If the ref is known not point to
  228. a tag, this will be the SHA the ref refers to. If the ref may point
  229. to a tag, but no cached information is available, None is returned.
  230. """
  231. return None
  232. def import_refs(
  233. self,
  234. base: Ref,
  235. other: Mapping[Ref, ObjectID | None],
  236. committer: bytes | None = None,
  237. timestamp: bytes | None = None,
  238. timezone: bytes | None = None,
  239. message: bytes | None = None,
  240. prune: bool = False,
  241. ) -> None:
  242. """Import refs from another repository.
  243. Args:
  244. base: Base ref to import into (e.g., b'refs/remotes/origin')
  245. other: Dictionary of refs to import
  246. committer: Optional committer for reflog
  247. timestamp: Optional timestamp for reflog
  248. timezone: Optional timezone for reflog
  249. message: Optional message for reflog
  250. prune: If True, remove refs not in other
  251. """
  252. if prune:
  253. to_delete = set(self.subkeys(base))
  254. else:
  255. to_delete = set()
  256. for name, value in other.items():
  257. if value is None:
  258. to_delete.add(name)
  259. else:
  260. self.set_if_equals(
  261. Ref(b"/".join((base, name))), None, value, message=message
  262. )
  263. if to_delete:
  264. try:
  265. to_delete.remove(name)
  266. except KeyError:
  267. pass
  268. for ref in to_delete:
  269. self.remove_if_equals(Ref(b"/".join((base, ref))), None, message=message)
  270. def allkeys(self) -> set[Ref]:
  271. """All refs present in this container."""
  272. raise NotImplementedError(self.allkeys)
  273. def __iter__(self) -> Iterator[Ref]:
  274. """Iterate over all reference keys."""
  275. return iter(self.allkeys())
  276. def keys(self, base: Ref | None = None) -> set[Ref]:
  277. """Refs present in this container.
  278. Args:
  279. base: An optional base to return refs under.
  280. Returns: An unsorted set of valid refs in this container, including
  281. packed refs.
  282. """
  283. if base is not None:
  284. return self.subkeys(base)
  285. else:
  286. return self.allkeys()
  287. def subkeys(self, base: Ref) -> set[Ref]:
  288. """Refs present in this container under a base.
  289. Args:
  290. base: The base to return refs under.
  291. Returns: A set of valid refs in this container under the base; the base
  292. prefix is stripped from the ref names returned.
  293. """
  294. keys: set[Ref] = set()
  295. base_len = len(base) + 1
  296. for refname in self.allkeys():
  297. if refname.startswith(base):
  298. keys.add(Ref(refname[base_len:]))
  299. return keys
  300. def as_dict(self, base: Ref | None = None) -> dict[Ref, ObjectID]:
  301. """Return the contents of this container as a dictionary."""
  302. ret: dict[Ref, ObjectID] = {}
  303. keys = self.keys(base)
  304. base_bytes: bytes
  305. if base is None:
  306. base_bytes = b""
  307. else:
  308. base_bytes = base.rstrip(b"/")
  309. for key in keys:
  310. try:
  311. ret[key] = self[Ref((base_bytes + b"/" + key).strip(b"/"))]
  312. except (SymrefLoop, KeyError):
  313. continue # Unable to resolve
  314. return ret
  315. def _check_refname(self, name: Ref) -> None:
  316. """Ensure a refname is valid and lives in refs or is HEAD.
  317. HEAD is not a valid refname according to git-check-ref-format, but this
  318. class needs to be able to touch HEAD. Also, check_ref_format expects
  319. refnames without the leading 'refs/', but this class requires that
  320. so it cannot touch anything outside the refs dir (or HEAD).
  321. Args:
  322. name: The name of the reference.
  323. Raises:
  324. KeyError: if a refname is not HEAD or is otherwise not valid.
  325. """
  326. if name in (HEADREF, Ref(b"refs/stash")):
  327. return
  328. if not name.startswith(b"refs/") or not check_ref_format(Ref(name[5:])):
  329. raise RefFormatError(name)
  330. def read_ref(self, refname: Ref) -> bytes | None:
  331. """Read a reference without following any references.
  332. Args:
  333. refname: The name of the reference
  334. Returns: The contents of the ref file, or None if it does
  335. not exist.
  336. """
  337. contents = self.read_loose_ref(refname)
  338. if not contents:
  339. contents = self.get_packed_refs().get(refname, None)
  340. return contents
  341. def read_loose_ref(self, name: Ref) -> bytes | None:
  342. """Read a loose reference and return its contents.
  343. Args:
  344. name: the refname to read
  345. Returns: The contents of the ref file, or None if it does
  346. not exist.
  347. """
  348. raise NotImplementedError(self.read_loose_ref)
  349. def follow(self, name: Ref) -> tuple[list[Ref], ObjectID | None]:
  350. """Follow a reference name.
  351. Returns: a tuple of (refnames, sha), wheres refnames are the names of
  352. references in the chain
  353. """
  354. contents: bytes | None = SYMREF + name
  355. depth = 0
  356. refnames: list[Ref] = []
  357. while contents and contents.startswith(SYMREF):
  358. refname = Ref(contents[len(SYMREF) :])
  359. refnames.append(refname)
  360. contents = self.read_ref(refname)
  361. if not contents:
  362. break
  363. depth += 1
  364. if depth > 5:
  365. raise SymrefLoop(name, depth)
  366. return refnames, ObjectID(contents) if contents else None
  367. def __contains__(self, refname: Ref) -> bool:
  368. """Check if a reference exists."""
  369. if self.read_ref(refname):
  370. return True
  371. return False
  372. def __getitem__(self, name: Ref) -> ObjectID:
  373. """Get the SHA1 for a reference name.
  374. This method follows all symbolic references.
  375. """
  376. _, sha = self.follow(name)
  377. if sha is None:
  378. raise KeyError(name)
  379. return sha
  380. def set_if_equals(
  381. self,
  382. name: Ref,
  383. old_ref: ObjectID | None,
  384. new_ref: ObjectID,
  385. committer: bytes | None = None,
  386. timestamp: int | None = None,
  387. timezone: int | None = None,
  388. message: bytes | None = None,
  389. ) -> bool:
  390. """Set a refname to new_ref only if it currently equals old_ref.
  391. This method follows all symbolic references if applicable for the
  392. subclass, and can be used to perform an atomic compare-and-swap
  393. operation.
  394. Args:
  395. name: The refname to set.
  396. old_ref: The old sha the refname must refer to, or None to set
  397. unconditionally.
  398. new_ref: The new sha the refname will refer to.
  399. committer: Optional committer name/email
  400. timestamp: Optional timestamp
  401. timezone: Optional timezone
  402. message: Message for reflog
  403. Returns: True if the set was successful, False otherwise.
  404. """
  405. raise NotImplementedError(self.set_if_equals)
  406. def add_if_new(
  407. self,
  408. name: Ref,
  409. ref: ObjectID,
  410. committer: bytes | None = None,
  411. timestamp: int | None = None,
  412. timezone: int | None = None,
  413. message: bytes | None = None,
  414. ) -> bool:
  415. """Add a new reference only if it does not already exist.
  416. Args:
  417. name: Ref name
  418. ref: Ref value
  419. committer: Optional committer name/email
  420. timestamp: Optional timestamp
  421. timezone: Optional timezone
  422. message: Optional message for reflog
  423. """
  424. raise NotImplementedError(self.add_if_new)
  425. def __setitem__(self, name: Ref, ref: ObjectID) -> None:
  426. """Set a reference name to point to the given SHA1.
  427. This method follows all symbolic references if applicable for the
  428. subclass.
  429. Note: This method unconditionally overwrites the contents of a
  430. reference. To update atomically only if the reference has not
  431. changed, use set_if_equals().
  432. Args:
  433. name: The refname to set.
  434. ref: The new sha the refname will refer to.
  435. """
  436. if not (valid_hexsha(ref) or ref.startswith(SYMREF)):
  437. raise ValueError(f"{ref!r} must be a valid sha (40 chars) or a symref")
  438. self.set_if_equals(name, None, ref)
  439. def remove_if_equals(
  440. self,
  441. name: Ref,
  442. old_ref: ObjectID | None,
  443. committer: bytes | None = None,
  444. timestamp: int | None = None,
  445. timezone: int | None = None,
  446. message: bytes | None = None,
  447. ) -> bool:
  448. """Remove a refname only if it currently equals old_ref.
  449. This method does not follow symbolic references, even if applicable for
  450. the subclass. It can be used to perform an atomic compare-and-delete
  451. operation.
  452. Args:
  453. name: The refname to delete.
  454. old_ref: The old sha the refname must refer to, or None to
  455. delete unconditionally.
  456. committer: Optional committer name/email
  457. timestamp: Optional timestamp
  458. timezone: Optional timezone
  459. message: Message for reflog
  460. Returns: True if the delete was successful, False otherwise.
  461. """
  462. raise NotImplementedError(self.remove_if_equals)
  463. def __delitem__(self, name: Ref) -> None:
  464. """Remove a refname.
  465. This method does not follow symbolic references, even if applicable for
  466. the subclass.
  467. Note: This method unconditionally deletes the contents of a reference.
  468. To delete atomically only if the reference has not changed, use
  469. remove_if_equals().
  470. Args:
  471. name: The refname to delete.
  472. """
  473. self.remove_if_equals(name, None)
  474. def get_symrefs(self) -> dict[Ref, Ref]:
  475. """Get a dict with all symrefs in this container.
  476. Returns: Dictionary mapping source ref to target ref
  477. """
  478. ret: dict[Ref, Ref] = {}
  479. for src in self.allkeys():
  480. try:
  481. ref_value = self.read_ref(src)
  482. assert ref_value is not None
  483. dst = parse_symref_value(ref_value)
  484. except ValueError:
  485. pass
  486. else:
  487. ret[src] = Ref(dst)
  488. return ret
  489. def pack_refs(self, all: bool = False) -> None:
  490. """Pack loose refs into packed-refs file.
  491. Args:
  492. all: If True, pack all refs. If False, only pack tags.
  493. """
  494. raise NotImplementedError(self.pack_refs)
  495. class DictRefsContainer(RefsContainer):
  496. """RefsContainer backed by a simple dict.
  497. This container does not support symbolic or packed references and is not
  498. threadsafe.
  499. """
  500. def __init__(
  501. self,
  502. refs: dict[Ref, bytes],
  503. logger: Callable[
  504. [
  505. bytes,
  506. bytes | None,
  507. bytes | None,
  508. bytes | None,
  509. int | None,
  510. int | None,
  511. bytes | None,
  512. ],
  513. None,
  514. ]
  515. | None = None,
  516. ) -> None:
  517. """Initialize DictRefsContainer with refs dictionary and optional logger."""
  518. super().__init__(logger=logger)
  519. self._refs = refs
  520. self._peeled: dict[Ref, ObjectID] = {}
  521. self._watchers: set[Any] = set()
  522. def allkeys(self) -> set[Ref]:
  523. """Return all reference keys."""
  524. return set(self._refs.keys())
  525. def read_loose_ref(self, name: Ref) -> bytes | None:
  526. """Read a loose reference."""
  527. return self._refs.get(name, None)
  528. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  529. """Get packed references."""
  530. return {}
  531. def _notify(self, ref: bytes, newsha: bytes | None) -> None:
  532. for watcher in self._watchers:
  533. watcher._notify((ref, newsha))
  534. def set_symbolic_ref(
  535. self,
  536. name: Ref,
  537. other: Ref,
  538. committer: bytes | None = None,
  539. timestamp: int | None = None,
  540. timezone: int | None = None,
  541. message: bytes | None = None,
  542. ) -> None:
  543. """Make a ref point at another ref.
  544. Args:
  545. name: Name of the ref to set
  546. other: Name of the ref to point at
  547. committer: Optional committer name for reflog
  548. timestamp: Optional timestamp for reflog
  549. timezone: Optional timezone for reflog
  550. message: Optional message for reflog
  551. """
  552. old = self.follow(name)[-1]
  553. new = SYMREF + other
  554. self._refs[name] = new
  555. self._notify(name, new)
  556. self._log(
  557. name,
  558. old,
  559. new,
  560. committer=committer,
  561. timestamp=timestamp,
  562. timezone=timezone,
  563. message=message,
  564. )
  565. def set_if_equals(
  566. self,
  567. name: Ref,
  568. old_ref: ObjectID | None,
  569. new_ref: ObjectID,
  570. committer: bytes | None = None,
  571. timestamp: int | None = None,
  572. timezone: int | None = None,
  573. message: bytes | None = None,
  574. ) -> bool:
  575. """Set a refname to new_ref only if it currently equals old_ref.
  576. This method follows all symbolic references, and can be used to perform
  577. an atomic compare-and-swap operation.
  578. Args:
  579. name: The refname to set.
  580. old_ref: The old sha the refname must refer to, or None to set
  581. unconditionally.
  582. new_ref: The new sha the refname will refer to.
  583. committer: Optional committer name for reflog
  584. timestamp: Optional timestamp for reflog
  585. timezone: Optional timezone for reflog
  586. message: Optional message for reflog
  587. Returns:
  588. True if the set was successful, False otherwise.
  589. """
  590. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  591. return False
  592. # Only update the specific ref requested, not the whole chain
  593. self._check_refname(name)
  594. old = self._refs.get(name)
  595. self._refs[name] = new_ref
  596. self._notify(name, new_ref)
  597. self._log(
  598. name,
  599. old,
  600. new_ref,
  601. committer=committer,
  602. timestamp=timestamp,
  603. timezone=timezone,
  604. message=message,
  605. )
  606. return True
  607. def add_if_new(
  608. self,
  609. name: Ref,
  610. ref: ObjectID,
  611. committer: bytes | None = None,
  612. timestamp: int | None = None,
  613. timezone: int | None = None,
  614. message: bytes | None = None,
  615. ) -> bool:
  616. """Add a new reference only if it does not already exist.
  617. Args:
  618. name: Ref name
  619. ref: Ref value
  620. committer: Optional committer name for reflog
  621. timestamp: Optional timestamp for reflog
  622. timezone: Optional timezone for reflog
  623. message: Optional message for reflog
  624. Returns:
  625. True if the add was successful, False otherwise.
  626. """
  627. if name in self._refs:
  628. return False
  629. self._refs[name] = ref
  630. self._notify(name, ref)
  631. self._log(
  632. name,
  633. None,
  634. ref,
  635. committer=committer,
  636. timestamp=timestamp,
  637. timezone=timezone,
  638. message=message,
  639. )
  640. return True
  641. def remove_if_equals(
  642. self,
  643. name: Ref,
  644. old_ref: ObjectID | None,
  645. committer: bytes | None = None,
  646. timestamp: int | None = None,
  647. timezone: int | None = None,
  648. message: bytes | None = None,
  649. ) -> bool:
  650. """Remove a refname only if it currently equals old_ref.
  651. This method does not follow symbolic references. It can be used to
  652. perform an atomic compare-and-delete operation.
  653. Args:
  654. name: The refname to delete.
  655. old_ref: The old sha the refname must refer to, or None to
  656. delete unconditionally.
  657. committer: Optional committer name for reflog
  658. timestamp: Optional timestamp for reflog
  659. timezone: Optional timezone for reflog
  660. message: Optional message for reflog
  661. Returns:
  662. True if the delete was successful, False otherwise.
  663. """
  664. if old_ref is not None and self._refs.get(name, ZERO_SHA) != old_ref:
  665. return False
  666. try:
  667. old = self._refs.pop(name)
  668. except KeyError:
  669. pass
  670. else:
  671. self._notify(name, None)
  672. self._log(
  673. name,
  674. old,
  675. None,
  676. committer=committer,
  677. timestamp=timestamp,
  678. timezone=timezone,
  679. message=message,
  680. )
  681. return True
  682. def get_peeled(self, name: Ref) -> ObjectID | None:
  683. """Get peeled version of a reference."""
  684. return self._peeled.get(name)
  685. def _update(self, refs: Mapping[Ref, ObjectID]) -> None:
  686. """Update multiple refs; intended only for testing."""
  687. # TODO(dborowitz): replace this with a public function that uses
  688. # set_if_equal.
  689. for ref, sha in refs.items():
  690. self.set_if_equals(ref, None, sha)
  691. def _update_peeled(self, peeled: Mapping[Ref, ObjectID]) -> None:
  692. """Update cached peeled refs; intended only for testing."""
  693. self._peeled.update(peeled)
  694. class DiskRefsContainer(RefsContainer):
  695. """Refs container that reads refs from disk."""
  696. def __init__(
  697. self,
  698. path: str | bytes | os.PathLike[str],
  699. worktree_path: str | bytes | os.PathLike[str] | None = None,
  700. logger: Callable[
  701. [bytes, bytes, bytes, bytes | None, int | None, int | None, bytes], None
  702. ]
  703. | None = None,
  704. ) -> None:
  705. """Initialize DiskRefsContainer."""
  706. super().__init__(logger=logger)
  707. # Convert path-like objects to strings, then to bytes for Git compatibility
  708. self.path = os.fsencode(os.fspath(path))
  709. if worktree_path is None:
  710. self.worktree_path = self.path
  711. else:
  712. self.worktree_path = os.fsencode(os.fspath(worktree_path))
  713. self._packed_refs: dict[Ref, ObjectID] | None = None
  714. self._peeled_refs: dict[Ref, ObjectID] | None = None
  715. def __repr__(self) -> str:
  716. """Return string representation of DiskRefsContainer."""
  717. return f"{self.__class__.__name__}({self.path!r})"
  718. def _iter_dir(
  719. self,
  720. path: bytes,
  721. base: bytes,
  722. dir_filter: Callable[[bytes], bool] | None = None,
  723. ) -> Iterator[Ref]:
  724. refspath = os.path.join(path, base.rstrip(b"/"))
  725. prefix_len = len(os.path.join(path, b""))
  726. for root, dirs, files in os.walk(refspath):
  727. directory = root[prefix_len:]
  728. if os.path.sep != "/":
  729. directory = directory.replace(os.fsencode(os.path.sep), b"/")
  730. if dir_filter is not None:
  731. dirs[:] = [
  732. d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
  733. ]
  734. for filename in files:
  735. refname = b"/".join([directory, filename])
  736. if check_ref_format(Ref(refname)):
  737. yield Ref(refname)
  738. def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[Ref]:
  739. base = base.rstrip(b"/") + b"/"
  740. search_paths: list[tuple[bytes, Callable[[bytes], bool] | None]] = []
  741. if base != b"refs/":
  742. path = self.worktree_path if is_per_worktree_ref(base) else self.path
  743. search_paths.append((path, None))
  744. elif self.worktree_path == self.path:
  745. # Iterate through all the refs from the main worktree
  746. search_paths.append((self.path, None))
  747. else:
  748. # Iterate through all the shared refs from the commondir, excluding per-worktree refs
  749. search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
  750. # Iterate through all the per-worktree refs from the worktree's gitdir
  751. search_paths.append((self.worktree_path, is_per_worktree_ref))
  752. for path, dir_filter in search_paths:
  753. yield from self._iter_dir(path, base, dir_filter=dir_filter)
  754. def subkeys(self, base: Ref) -> set[Ref]:
  755. """Return subkeys under a given base reference path."""
  756. subkeys: set[Ref] = set()
  757. for key in self._iter_loose_refs(base):
  758. if key.startswith(base):
  759. subkeys.add(Ref(key[len(base) :].strip(b"/")))
  760. for key in self.get_packed_refs():
  761. if key.startswith(base):
  762. subkeys.add(Ref(key[len(base) :].strip(b"/")))
  763. return subkeys
  764. def allkeys(self) -> set[Ref]:
  765. """Return all reference keys."""
  766. allkeys: set[Ref] = set()
  767. if os.path.exists(self.refpath(HEADREF)):
  768. allkeys.add(Ref(HEADREF))
  769. allkeys.update(self._iter_loose_refs())
  770. allkeys.update(self.get_packed_refs())
  771. return allkeys
  772. def refpath(self, name: bytes) -> bytes:
  773. """Return the disk path of a ref."""
  774. path = name
  775. if os.path.sep != "/":
  776. path = path.replace(b"/", os.fsencode(os.path.sep))
  777. root_dir = self.worktree_path if is_per_worktree_ref(name) else self.path
  778. return os.path.join(root_dir, path)
  779. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  780. """Get contents of the packed-refs file.
  781. Returns: Dictionary mapping ref names to SHA1s
  782. Note: Will return an empty dictionary when no packed-refs file is
  783. present.
  784. """
  785. # TODO: invalidate the cache on repacking
  786. if self._packed_refs is None:
  787. # set both to empty because we want _peeled_refs to be
  788. # None if and only if _packed_refs is also None.
  789. self._packed_refs = {}
  790. self._peeled_refs = {}
  791. path = os.path.join(self.path, b"packed-refs")
  792. try:
  793. f = GitFile(path, "rb")
  794. except FileNotFoundError:
  795. return {}
  796. with f:
  797. first_line = next(iter(f)).rstrip()
  798. if first_line.startswith(b"# pack-refs") and b" peeled" in first_line:
  799. for sha, name, peeled in read_packed_refs_with_peeled(f):
  800. self._packed_refs[name] = sha
  801. if peeled:
  802. self._peeled_refs[name] = peeled
  803. else:
  804. f.seek(0)
  805. for sha, name in read_packed_refs(f):
  806. self._packed_refs[name] = sha
  807. return self._packed_refs
  808. def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
  809. """Add the given refs as packed refs.
  810. Args:
  811. new_refs: A mapping of ref names to targets; if a target is None that
  812. means remove the ref
  813. """
  814. if not new_refs:
  815. return
  816. path = os.path.join(self.path, b"packed-refs")
  817. with GitFile(path, "wb") as f:
  818. # reread cached refs from disk, while holding the lock
  819. packed_refs = self.get_packed_refs().copy()
  820. for ref, target in new_refs.items():
  821. # sanity check
  822. if ref == HEADREF:
  823. raise ValueError("cannot pack HEAD")
  824. # remove any loose refs pointing to this one -- please
  825. # note that this bypasses remove_if_equals as we don't
  826. # want to affect packed refs in here
  827. with suppress(OSError):
  828. os.remove(self.refpath(ref))
  829. if target is not None:
  830. packed_refs[ref] = target
  831. else:
  832. packed_refs.pop(ref, None)
  833. write_packed_refs(f, packed_refs, self._peeled_refs)
  834. self._packed_refs = packed_refs
  835. def get_peeled(self, name: Ref) -> ObjectID | None:
  836. """Return the cached peeled value of a ref, if available.
  837. Args:
  838. name: Name of the ref to peel
  839. Returns: The peeled value of the ref. If the ref is known not point to
  840. a tag, this will be the SHA the ref refers to. If the ref may point
  841. to a tag, but no cached information is available, None is returned.
  842. """
  843. self.get_packed_refs()
  844. if (
  845. self._peeled_refs is None
  846. or self._packed_refs is None
  847. or name not in self._packed_refs
  848. ):
  849. # No cache: no peeled refs were read, or this ref is loose
  850. return None
  851. if name in self._peeled_refs:
  852. return self._peeled_refs[name]
  853. else:
  854. # Known not peelable
  855. return self[name]
  856. def read_loose_ref(self, name: Ref) -> bytes | None:
  857. """Read a reference file and return its contents.
  858. If the reference file a symbolic reference, only read the first line of
  859. the file. Otherwise, only read the first 40 bytes.
  860. Args:
  861. name: the refname to read, relative to refpath
  862. Returns: The contents of the ref file, or None if the file does not
  863. exist.
  864. Raises:
  865. IOError: if any other error occurs
  866. """
  867. filename = self.refpath(name)
  868. try:
  869. with GitFile(filename, "rb") as f:
  870. header = f.read(len(SYMREF))
  871. if header == SYMREF:
  872. # Read only the first line
  873. return header + next(iter(f)).rstrip(b"\r\n")
  874. else:
  875. # Read only the first 40 bytes
  876. return header + f.read(40 - len(SYMREF))
  877. except (OSError, UnicodeError):
  878. # don't assume anything specific about the error; in
  879. # particular, invalid or forbidden paths can raise weird
  880. # errors depending on the specific operating system
  881. return None
  882. def _remove_packed_ref(self, name: Ref) -> None:
  883. if self._packed_refs is None:
  884. return
  885. filename = os.path.join(self.path, b"packed-refs")
  886. # reread cached refs from disk, while holding the lock
  887. f = GitFile(filename, "wb")
  888. try:
  889. self._packed_refs = None
  890. self.get_packed_refs()
  891. if self._packed_refs is None or name not in self._packed_refs:
  892. f.abort()
  893. return
  894. del self._packed_refs[name]
  895. if self._peeled_refs is not None:
  896. with suppress(KeyError):
  897. del self._peeled_refs[name]
  898. write_packed_refs(f, self._packed_refs, self._peeled_refs)
  899. f.close()
  900. except BaseException:
  901. f.abort()
  902. raise
  903. def set_symbolic_ref(
  904. self,
  905. name: Ref,
  906. other: Ref,
  907. committer: bytes | None = None,
  908. timestamp: int | None = None,
  909. timezone: int | None = None,
  910. message: bytes | None = None,
  911. ) -> None:
  912. """Make a ref point at another ref.
  913. Args:
  914. name: Name of the ref to set
  915. other: Name of the ref to point at
  916. committer: Optional committer name
  917. timestamp: Optional timestamp
  918. timezone: Optional timezone
  919. message: Optional message to describe the change
  920. """
  921. self._check_refname(name)
  922. self._check_refname(other)
  923. filename = self.refpath(name)
  924. f = GitFile(filename, "wb")
  925. try:
  926. f.write(SYMREF + other + b"\n")
  927. sha = self.follow(name)[-1]
  928. self._log(
  929. name,
  930. sha,
  931. sha,
  932. committer=committer,
  933. timestamp=timestamp,
  934. timezone=timezone,
  935. message=message,
  936. )
  937. except BaseException:
  938. f.abort()
  939. raise
  940. else:
  941. f.close()
  942. def set_if_equals(
  943. self,
  944. name: Ref,
  945. old_ref: ObjectID | None,
  946. new_ref: ObjectID,
  947. committer: bytes | None = None,
  948. timestamp: int | None = None,
  949. timezone: int | None = None,
  950. message: bytes | None = None,
  951. ) -> bool:
  952. """Set a refname to new_ref only if it currently equals old_ref.
  953. This method follows all symbolic references, and can be used to perform
  954. an atomic compare-and-swap operation.
  955. Args:
  956. name: The refname to set.
  957. old_ref: The old sha the refname must refer to, or None to set
  958. unconditionally.
  959. new_ref: The new sha the refname will refer to.
  960. committer: Optional committer name
  961. timestamp: Optional timestamp
  962. timezone: Optional timezone
  963. message: Set message for reflog
  964. Returns: True if the set was successful, False otherwise.
  965. """
  966. self._check_refname(name)
  967. try:
  968. realnames, _ = self.follow(name)
  969. realname = realnames[-1]
  970. except (KeyError, IndexError, SymrefLoop):
  971. realname = name
  972. filename = self.refpath(realname)
  973. # make sure none of the ancestor folders is in packed refs
  974. probe_ref = Ref(os.path.dirname(realname))
  975. packed_refs = self.get_packed_refs()
  976. while probe_ref:
  977. if packed_refs.get(probe_ref, None) is not None:
  978. raise NotADirectoryError(filename)
  979. probe_ref = Ref(os.path.dirname(probe_ref))
  980. ensure_dir_exists(os.path.dirname(filename))
  981. with GitFile(filename, "wb") as f:
  982. if old_ref is not None:
  983. try:
  984. # read again while holding the lock to handle race conditions
  985. orig_ref = self.read_loose_ref(realname)
  986. if orig_ref is None:
  987. orig_ref = self.get_packed_refs().get(realname, ZERO_SHA)
  988. if orig_ref != old_ref:
  989. f.abort()
  990. return False
  991. except OSError:
  992. f.abort()
  993. raise
  994. # Check if ref already has the desired value while holding the lock
  995. # This avoids fsync when ref is unchanged but still detects lock conflicts
  996. current_ref = self.read_loose_ref(realname)
  997. if current_ref is None:
  998. current_ref = packed_refs.get(realname, None)
  999. if current_ref is not None and current_ref == new_ref:
  1000. # Ref already has desired value, abort write to avoid fsync
  1001. f.abort()
  1002. return True
  1003. try:
  1004. f.write(new_ref + b"\n")
  1005. except OSError:
  1006. f.abort()
  1007. raise
  1008. self._log(
  1009. realname,
  1010. old_ref,
  1011. new_ref,
  1012. committer=committer,
  1013. timestamp=timestamp,
  1014. timezone=timezone,
  1015. message=message,
  1016. )
  1017. return True
  1018. def add_if_new(
  1019. self,
  1020. name: Ref,
  1021. ref: ObjectID,
  1022. committer: bytes | None = None,
  1023. timestamp: int | None = None,
  1024. timezone: int | None = None,
  1025. message: bytes | None = None,
  1026. ) -> bool:
  1027. """Add a new reference only if it does not already exist.
  1028. This method follows symrefs, and only ensures that the last ref in the
  1029. chain does not exist.
  1030. Args:
  1031. name: The refname to set.
  1032. ref: The new sha the refname will refer to.
  1033. committer: Optional committer name
  1034. timestamp: Optional timestamp
  1035. timezone: Optional timezone
  1036. message: Optional message for reflog
  1037. Returns: True if the add was successful, False otherwise.
  1038. """
  1039. try:
  1040. realnames, contents = self.follow(name)
  1041. if contents is not None:
  1042. return False
  1043. realname = realnames[-1]
  1044. except (KeyError, IndexError):
  1045. realname = name
  1046. self._check_refname(realname)
  1047. filename = self.refpath(realname)
  1048. ensure_dir_exists(os.path.dirname(filename))
  1049. with GitFile(filename, "wb") as f:
  1050. if os.path.exists(filename) or name in self.get_packed_refs():
  1051. f.abort()
  1052. return False
  1053. try:
  1054. f.write(ref + b"\n")
  1055. except OSError:
  1056. f.abort()
  1057. raise
  1058. else:
  1059. self._log(
  1060. name,
  1061. None,
  1062. ref,
  1063. committer=committer,
  1064. timestamp=timestamp,
  1065. timezone=timezone,
  1066. message=message,
  1067. )
  1068. return True
  1069. def remove_if_equals(
  1070. self,
  1071. name: Ref,
  1072. old_ref: ObjectID | None,
  1073. committer: bytes | None = None,
  1074. timestamp: int | None = None,
  1075. timezone: int | None = None,
  1076. message: bytes | None = None,
  1077. ) -> bool:
  1078. """Remove a refname only if it currently equals old_ref.
  1079. This method does not follow symbolic references. It can be used to
  1080. perform an atomic compare-and-delete operation.
  1081. Args:
  1082. name: The refname to delete.
  1083. old_ref: The old sha the refname must refer to, or None to
  1084. delete unconditionally.
  1085. committer: Optional committer name
  1086. timestamp: Optional timestamp
  1087. timezone: Optional timezone
  1088. message: Optional message
  1089. Returns: True if the delete was successful, False otherwise.
  1090. """
  1091. self._check_refname(name)
  1092. filename = self.refpath(name)
  1093. ensure_dir_exists(os.path.dirname(filename))
  1094. f = GitFile(filename, "wb")
  1095. try:
  1096. if old_ref is not None:
  1097. orig_ref = self.read_loose_ref(name)
  1098. if orig_ref is None:
  1099. orig_ref = self.get_packed_refs().get(name)
  1100. if orig_ref is None:
  1101. orig_ref = ZERO_SHA
  1102. if orig_ref != old_ref:
  1103. return False
  1104. # remove the reference file itself
  1105. try:
  1106. found = os.path.lexists(filename)
  1107. except OSError:
  1108. # may only be packed, or otherwise unstorable
  1109. found = False
  1110. if found:
  1111. os.remove(filename)
  1112. self._remove_packed_ref(name)
  1113. self._log(
  1114. name,
  1115. old_ref,
  1116. None,
  1117. committer=committer,
  1118. timestamp=timestamp,
  1119. timezone=timezone,
  1120. message=message,
  1121. )
  1122. finally:
  1123. # never write, we just wanted the lock
  1124. f.abort()
  1125. # outside of the lock, clean-up any parent directory that might now
  1126. # be empty. this ensures that re-creating a reference of the same
  1127. # name of what was previously a directory works as expected
  1128. parent = name
  1129. while True:
  1130. try:
  1131. parent_bytes, _ = parent.rsplit(b"/", 1)
  1132. parent = Ref(parent_bytes)
  1133. except ValueError:
  1134. break
  1135. if parent == b"refs":
  1136. break
  1137. parent_filename = self.refpath(parent)
  1138. try:
  1139. os.rmdir(parent_filename)
  1140. except OSError:
  1141. # this can be caused by the parent directory being
  1142. # removed by another process, being not empty, etc.
  1143. # in any case, this is non fatal because we already
  1144. # removed the reference, just ignore it
  1145. break
  1146. return True
  1147. def pack_refs(self, all: bool = False) -> None:
  1148. """Pack loose refs into packed-refs file.
  1149. Args:
  1150. all: If True, pack all refs. If False, only pack tags.
  1151. """
  1152. refs_to_pack: dict[Ref, ObjectID | None] = {}
  1153. for ref in self.allkeys():
  1154. if ref == HEADREF:
  1155. # Never pack HEAD
  1156. continue
  1157. if all or ref.startswith(LOCAL_TAG_PREFIX):
  1158. try:
  1159. sha = self[ref]
  1160. if sha:
  1161. refs_to_pack[ref] = sha
  1162. except KeyError:
  1163. # Broken ref, skip it
  1164. pass
  1165. if refs_to_pack:
  1166. self.add_packed_refs(refs_to_pack)
  1167. def _split_ref_line(line: bytes) -> tuple[ObjectID, Ref]:
  1168. """Split a single ref line into a tuple of SHA1 and name."""
  1169. fields = line.rstrip(b"\n\r").split(b" ")
  1170. if len(fields) != 2:
  1171. raise PackedRefsException(f"invalid ref line {line!r}")
  1172. sha, name = fields
  1173. if not valid_hexsha(sha):
  1174. raise PackedRefsException(f"Invalid hex sha {sha!r}")
  1175. if not check_ref_format(Ref(name)):
  1176. raise PackedRefsException(f"invalid ref name {name!r}")
  1177. return (ObjectID(sha), Ref(name))
  1178. def read_packed_refs(f: IO[bytes]) -> Iterator[tuple[ObjectID, Ref]]:
  1179. """Read a packed refs file.
  1180. Args:
  1181. f: file-like object to read from
  1182. Returns: Iterator over tuples with SHA1s and ref names.
  1183. """
  1184. for line in f:
  1185. if line.startswith(b"#"):
  1186. # Comment
  1187. continue
  1188. if line.startswith(b"^"):
  1189. raise PackedRefsException("found peeled ref in packed-refs without peeled")
  1190. yield _split_ref_line(line)
  1191. def read_packed_refs_with_peeled(
  1192. f: IO[bytes],
  1193. ) -> Iterator[tuple[ObjectID, Ref, ObjectID | None]]:
  1194. """Read a packed refs file including peeled refs.
  1195. Assumes the "# pack-refs with: peeled" line was already read. Yields tuples
  1196. with ref names, SHA1s, and peeled SHA1s (or None).
  1197. Args:
  1198. f: file-like object to read from, seek'ed to the second line
  1199. """
  1200. last = None
  1201. for line in f:
  1202. if line.startswith(b"#"):
  1203. continue
  1204. line = line.rstrip(b"\r\n")
  1205. if line.startswith(b"^"):
  1206. if not last:
  1207. raise PackedRefsException("unexpected peeled ref line")
  1208. if not valid_hexsha(line[1:]):
  1209. raise PackedRefsException(f"Invalid hex sha {line[1:]!r}")
  1210. sha, name = _split_ref_line(last)
  1211. last = None
  1212. yield (sha, name, ObjectID(line[1:]))
  1213. else:
  1214. if last:
  1215. sha, name = _split_ref_line(last)
  1216. yield (sha, name, None)
  1217. last = line
  1218. if last:
  1219. sha, name = _split_ref_line(last)
  1220. yield (sha, name, None)
  1221. def write_packed_refs(
  1222. f: IO[bytes],
  1223. packed_refs: Mapping[Ref, ObjectID],
  1224. peeled_refs: Mapping[Ref, ObjectID] | None = None,
  1225. ) -> None:
  1226. """Write a packed refs file.
  1227. Args:
  1228. f: empty file-like object to write to
  1229. packed_refs: dict of refname to sha of packed refs to write
  1230. peeled_refs: dict of refname to peeled value of sha
  1231. """
  1232. if peeled_refs is None:
  1233. peeled_refs = {}
  1234. else:
  1235. f.write(b"# pack-refs with: peeled\n")
  1236. for refname in sorted(packed_refs.keys()):
  1237. f.write(git_line(packed_refs[refname], refname))
  1238. if refname in peeled_refs:
  1239. f.write(b"^" + peeled_refs[refname] + b"\n")
  1240. def read_info_refs(f: BinaryIO) -> dict[Ref, ObjectID]:
  1241. """Read info/refs file.
  1242. Args:
  1243. f: File-like object to read from
  1244. Returns:
  1245. Dictionary mapping ref names to SHA1s
  1246. """
  1247. ret: dict[Ref, ObjectID] = {}
  1248. for line in f.readlines():
  1249. (sha, name) = line.rstrip(b"\r\n").split(b"\t", 1)
  1250. ret[Ref(name)] = ObjectID(sha)
  1251. return ret
  1252. def is_local_branch(x: bytes) -> bool:
  1253. """Check if a ref name is a local branch."""
  1254. return x.startswith(LOCAL_BRANCH_PREFIX)
  1255. def local_branch_name(name: bytes) -> Ref:
  1256. """Build a full branch ref from a short name.
  1257. Args:
  1258. name: Short branch name (e.g., b"master") or full ref
  1259. Returns:
  1260. Full branch ref name (e.g., b"refs/heads/master")
  1261. Examples:
  1262. >>> local_branch_name(b"master")
  1263. b'refs/heads/master'
  1264. >>> local_branch_name(b"refs/heads/master")
  1265. b'refs/heads/master'
  1266. """
  1267. if name.startswith(LOCAL_BRANCH_PREFIX):
  1268. return Ref(name)
  1269. return Ref(LOCAL_BRANCH_PREFIX + name)
  1270. def local_tag_name(name: bytes) -> Ref:
  1271. """Build a full tag ref from a short name.
  1272. Args:
  1273. name: Short tag name (e.g., b"v1.0") or full ref
  1274. Returns:
  1275. Full tag ref name (e.g., b"refs/tags/v1.0")
  1276. Examples:
  1277. >>> local_tag_name(b"v1.0")
  1278. b'refs/tags/v1.0'
  1279. >>> local_tag_name(b"refs/tags/v1.0")
  1280. b'refs/tags/v1.0'
  1281. """
  1282. if name.startswith(LOCAL_TAG_PREFIX):
  1283. return Ref(name)
  1284. return Ref(LOCAL_TAG_PREFIX + name)
  1285. def local_replace_name(name: bytes) -> Ref:
  1286. """Build a full replace ref from a short name.
  1287. Args:
  1288. name: Short replace name (object SHA) or full ref
  1289. Returns:
  1290. Full replace ref name (e.g., b"refs/replace/<sha>")
  1291. Examples:
  1292. >>> local_replace_name(b"abc123")
  1293. b'refs/replace/abc123'
  1294. >>> local_replace_name(b"refs/replace/abc123")
  1295. b'refs/replace/abc123'
  1296. """
  1297. if name.startswith(LOCAL_REPLACE_PREFIX):
  1298. return Ref(name)
  1299. return Ref(LOCAL_REPLACE_PREFIX + name)
  1300. def extract_branch_name(ref: bytes) -> bytes:
  1301. """Extract branch name from a full branch ref.
  1302. Args:
  1303. ref: Full branch ref (e.g., b"refs/heads/master")
  1304. Returns:
  1305. Short branch name (e.g., b"master")
  1306. Raises:
  1307. ValueError: If ref is not a local branch
  1308. Examples:
  1309. >>> extract_branch_name(b"refs/heads/master")
  1310. b'master'
  1311. >>> extract_branch_name(b"refs/heads/feature/foo")
  1312. b'feature/foo'
  1313. """
  1314. if not ref.startswith(LOCAL_BRANCH_PREFIX):
  1315. raise ValueError(f"Not a local branch ref: {ref!r}")
  1316. return ref[len(LOCAL_BRANCH_PREFIX) :]
  1317. def extract_tag_name(ref: bytes) -> bytes:
  1318. """Extract tag name from a full tag ref.
  1319. Args:
  1320. ref: Full tag ref (e.g., b"refs/tags/v1.0")
  1321. Returns:
  1322. Short tag name (e.g., b"v1.0")
  1323. Raises:
  1324. ValueError: If ref is not a local tag
  1325. Examples:
  1326. >>> extract_tag_name(b"refs/tags/v1.0")
  1327. b'v1.0'
  1328. """
  1329. if not ref.startswith(LOCAL_TAG_PREFIX):
  1330. raise ValueError(f"Not a local tag ref: {ref!r}")
  1331. return ref[len(LOCAL_TAG_PREFIX) :]
  1332. def shorten_ref_name(ref: bytes) -> bytes:
  1333. """Convert a full ref name to its short form.
  1334. Args:
  1335. ref: Full ref name (e.g., b"refs/heads/master")
  1336. Returns:
  1337. Short ref name (e.g., b"master")
  1338. Examples:
  1339. >>> shorten_ref_name(b"refs/heads/master")
  1340. b'master'
  1341. >>> shorten_ref_name(b"refs/remotes/origin/main")
  1342. b'origin/main'
  1343. >>> shorten_ref_name(b"refs/tags/v1.0")
  1344. b'v1.0'
  1345. >>> shorten_ref_name(b"HEAD")
  1346. b'HEAD'
  1347. """
  1348. if ref.startswith(LOCAL_BRANCH_PREFIX):
  1349. return ref[len(LOCAL_BRANCH_PREFIX) :]
  1350. elif ref.startswith(LOCAL_REMOTE_PREFIX):
  1351. return ref[len(LOCAL_REMOTE_PREFIX) :]
  1352. elif ref.startswith(LOCAL_TAG_PREFIX):
  1353. return ref[len(LOCAL_TAG_PREFIX) :]
  1354. return ref
  1355. def _set_origin_head(
  1356. refs: RefsContainer, origin: bytes, origin_head: bytes | None
  1357. ) -> None:
  1358. # set refs/remotes/origin/HEAD
  1359. origin_base = b"refs/remotes/" + origin + b"/"
  1360. if origin_head and origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1361. origin_ref = Ref(origin_base + HEADREF)
  1362. target_ref = Ref(origin_base + extract_branch_name(origin_head))
  1363. if target_ref in refs:
  1364. refs.set_symbolic_ref(origin_ref, target_ref)
  1365. def _set_default_branch(
  1366. refs: RefsContainer,
  1367. origin: bytes,
  1368. origin_head: bytes | None,
  1369. branch: bytes | None,
  1370. ref_message: bytes | None,
  1371. ) -> bytes:
  1372. """Set the default branch."""
  1373. origin_base = b"refs/remotes/" + origin + b"/"
  1374. if branch:
  1375. origin_ref = Ref(origin_base + branch)
  1376. if origin_ref in refs:
  1377. local_ref = Ref(local_branch_name(branch))
  1378. refs.add_if_new(local_ref, refs[origin_ref], ref_message)
  1379. head_ref = local_ref
  1380. elif Ref(local_tag_name(branch)) in refs:
  1381. head_ref = Ref(local_tag_name(branch))
  1382. else:
  1383. raise ValueError(f"{os.fsencode(branch)!r} is not a valid branch or tag")
  1384. elif origin_head:
  1385. head_ref = Ref(origin_head)
  1386. if origin_head.startswith(LOCAL_BRANCH_PREFIX):
  1387. origin_ref = Ref(origin_base + extract_branch_name(origin_head))
  1388. else:
  1389. origin_ref = Ref(origin_head)
  1390. try:
  1391. refs.add_if_new(head_ref, refs[origin_ref], ref_message)
  1392. except KeyError:
  1393. pass
  1394. else:
  1395. raise ValueError("neither origin_head nor branch are provided")
  1396. return head_ref
  1397. def _set_head(
  1398. refs: RefsContainer, head_ref: bytes, ref_message: bytes | None
  1399. ) -> ObjectID | None:
  1400. if head_ref.startswith(LOCAL_TAG_PREFIX):
  1401. # detach HEAD at specified tag
  1402. head = refs[Ref(head_ref)]
  1403. if isinstance(head, Tag):
  1404. _cls, obj = head.object
  1405. head = obj.get_object(obj).id
  1406. del refs[HEADREF]
  1407. refs.set_if_equals(HEADREF, None, head, message=ref_message)
  1408. else:
  1409. # set HEAD to specific branch
  1410. try:
  1411. head = refs[Ref(head_ref)]
  1412. refs.set_symbolic_ref(HEADREF, Ref(head_ref))
  1413. refs.set_if_equals(HEADREF, None, head, message=ref_message)
  1414. except KeyError:
  1415. head = None
  1416. return head
  1417. def _import_remote_refs(
  1418. refs_container: RefsContainer,
  1419. remote_name: str,
  1420. refs: Mapping[Ref, ObjectID | None],
  1421. message: bytes | None = None,
  1422. prune: bool = False,
  1423. prune_tags: bool = False,
  1424. ) -> None:
  1425. from .protocol import PEELED_TAG_SUFFIX, strip_peeled_refs
  1426. stripped_refs = strip_peeled_refs(refs)
  1427. branches: dict[Ref, ObjectID | None] = {
  1428. Ref(extract_branch_name(n)): v
  1429. for (n, v) in stripped_refs.items()
  1430. if n.startswith(LOCAL_BRANCH_PREFIX)
  1431. }
  1432. refs_container.import_refs(
  1433. Ref(b"refs/remotes/" + remote_name.encode()),
  1434. branches,
  1435. message=message,
  1436. prune=prune,
  1437. )
  1438. tags: dict[Ref, ObjectID | None] = {
  1439. Ref(extract_tag_name(n)): v
  1440. for (n, v) in stripped_refs.items()
  1441. if n.startswith(LOCAL_TAG_PREFIX) and not n.endswith(PEELED_TAG_SUFFIX)
  1442. }
  1443. refs_container.import_refs(
  1444. Ref(LOCAL_TAG_PREFIX), tags, message=message, prune=prune_tags
  1445. )
  1446. class locked_ref:
  1447. """Lock a ref while making modifications.
  1448. Works as a context manager.
  1449. """
  1450. def __init__(self, refs_container: DiskRefsContainer, refname: Ref) -> None:
  1451. """Initialize a locked ref.
  1452. Args:
  1453. refs_container: The DiskRefsContainer to lock the ref in
  1454. refname: The ref name to lock
  1455. """
  1456. self._refs_container = refs_container
  1457. self._refname = refname
  1458. self._file: _GitFile | None = None
  1459. self._realname: Ref | None = None
  1460. self._deleted = False
  1461. def __enter__(self) -> "locked_ref":
  1462. """Enter the context manager and acquire the lock.
  1463. Returns:
  1464. This locked_ref instance
  1465. Raises:
  1466. OSError: If the lock cannot be acquired
  1467. """
  1468. self._refs_container._check_refname(self._refname)
  1469. try:
  1470. realnames, _ = self._refs_container.follow(self._refname)
  1471. self._realname = realnames[-1]
  1472. except (KeyError, IndexError, SymrefLoop):
  1473. self._realname = self._refname
  1474. filename = self._refs_container.refpath(self._realname)
  1475. ensure_dir_exists(os.path.dirname(filename))
  1476. f = GitFile(filename, "wb")
  1477. self._file = f
  1478. return self
  1479. def __exit__(
  1480. self,
  1481. exc_type: type | None,
  1482. exc_value: BaseException | None,
  1483. traceback: types.TracebackType | None,
  1484. ) -> None:
  1485. """Exit the context manager and release the lock.
  1486. Args:
  1487. exc_type: Type of exception if one occurred
  1488. exc_value: Exception instance if one occurred
  1489. traceback: Traceback if an exception occurred
  1490. """
  1491. if self._file:
  1492. if exc_type is not None or self._deleted:
  1493. self._file.abort()
  1494. else:
  1495. self._file.close()
  1496. def get(self) -> bytes | None:
  1497. """Get the current value of the ref."""
  1498. if not self._file:
  1499. raise RuntimeError("locked_ref not in context")
  1500. assert self._realname is not None
  1501. current_ref = self._refs_container.read_loose_ref(self._realname)
  1502. if current_ref is None:
  1503. current_ref = self._refs_container.get_packed_refs().get(
  1504. self._realname, None
  1505. )
  1506. return current_ref
  1507. def ensure_equals(self, expected_value: bytes | None) -> bool:
  1508. """Ensure the ref currently equals the expected value.
  1509. Args:
  1510. expected_value: The expected current value of the ref
  1511. Returns:
  1512. True if the ref equals the expected value, False otherwise
  1513. """
  1514. current_value = self.get()
  1515. return current_value == expected_value
  1516. def set(self, new_ref: bytes) -> None:
  1517. """Set the ref to a new value.
  1518. Args:
  1519. new_ref: The new SHA1 or symbolic ref value
  1520. """
  1521. if not self._file:
  1522. raise RuntimeError("locked_ref not in context")
  1523. if not (valid_hexsha(new_ref) or new_ref.startswith(SYMREF)):
  1524. raise ValueError(f"{new_ref!r} must be a valid sha (40 chars) or a symref")
  1525. self._file.seek(0)
  1526. self._file.truncate()
  1527. self._file.write(new_ref + b"\n")
  1528. self._deleted = False
  1529. def set_symbolic_ref(self, target: Ref) -> None:
  1530. """Make this ref point at another ref.
  1531. Args:
  1532. target: Name of the ref to point at
  1533. """
  1534. if not self._file:
  1535. raise RuntimeError("locked_ref not in context")
  1536. self._refs_container._check_refname(target)
  1537. self._file.seek(0)
  1538. self._file.truncate()
  1539. self._file.write(SYMREF + target + b"\n")
  1540. self._deleted = False
  1541. def delete(self) -> None:
  1542. """Delete the ref file while holding the lock."""
  1543. if not self._file:
  1544. raise RuntimeError("locked_ref not in context")
  1545. # Delete the actual ref file while holding the lock
  1546. if self._realname:
  1547. filename = self._refs_container.refpath(self._realname)
  1548. try:
  1549. if os.path.lexists(filename):
  1550. os.remove(filename)
  1551. except FileNotFoundError:
  1552. pass
  1553. self._refs_container._remove_packed_ref(self._realname)
  1554. self._deleted = True
  1555. class NamespacedRefsContainer(RefsContainer):
  1556. """Wrapper that adds namespace prefix to all ref operations.
  1557. This implements Git's GIT_NAMESPACE feature, which stores refs under
  1558. refs/namespaces/<namespace>/ and filters operations to only show refs
  1559. within that namespace.
  1560. Example:
  1561. With namespace "foo", a ref "refs/heads/master" is stored as
  1562. "refs/namespaces/foo/refs/heads/master" in the underlying container.
  1563. """
  1564. def __init__(self, refs: RefsContainer, namespace: bytes) -> None:
  1565. """Initialize NamespacedRefsContainer.
  1566. Args:
  1567. refs: The underlying refs container to wrap
  1568. namespace: The namespace prefix (e.g., b"foo" or b"foo/bar")
  1569. """
  1570. super().__init__(logger=refs._logger)
  1571. self._refs = refs
  1572. # Build namespace prefix: refs/namespaces/<namespace>/
  1573. # Support nested namespaces: foo/bar -> refs/namespaces/foo/refs/namespaces/bar/
  1574. namespace_parts = namespace.split(b"/")
  1575. self._namespace_prefix = b""
  1576. for part in namespace_parts:
  1577. self._namespace_prefix += b"refs/namespaces/" + part + b"/"
  1578. def _apply_namespace(self, name: bytes) -> bytes:
  1579. """Apply namespace prefix to a ref name."""
  1580. # HEAD and other special refs are not namespaced
  1581. if name == HEADREF or not name.startswith(b"refs/"):
  1582. return name
  1583. return self._namespace_prefix + name
  1584. def _strip_namespace(self, name: bytes) -> bytes | None:
  1585. """Remove namespace prefix from a ref name.
  1586. Returns None if the ref is not in our namespace.
  1587. """
  1588. # HEAD and other special refs are not namespaced
  1589. if name == HEADREF or not name.startswith(b"refs/"):
  1590. return name
  1591. if name.startswith(self._namespace_prefix):
  1592. return name[len(self._namespace_prefix) :]
  1593. return None
  1594. def allkeys(self) -> set[Ref]:
  1595. """Return all reference keys in this namespace."""
  1596. keys: set[Ref] = set()
  1597. for key in self._refs.allkeys():
  1598. stripped = self._strip_namespace(key)
  1599. if stripped is not None:
  1600. keys.add(Ref(stripped))
  1601. return keys
  1602. def read_loose_ref(self, name: Ref) -> bytes | None:
  1603. """Read a loose reference."""
  1604. return self._refs.read_loose_ref(Ref(self._apply_namespace(name)))
  1605. def get_packed_refs(self) -> dict[Ref, ObjectID]:
  1606. """Get packed refs within this namespace."""
  1607. packed: dict[Ref, ObjectID] = {}
  1608. for name, value in self._refs.get_packed_refs().items():
  1609. stripped = self._strip_namespace(name)
  1610. if stripped is not None:
  1611. packed[Ref(stripped)] = value
  1612. return packed
  1613. def add_packed_refs(self, new_refs: Mapping[Ref, ObjectID | None]) -> None:
  1614. """Add packed refs with namespace prefix."""
  1615. namespaced_refs: dict[Ref, ObjectID | None] = {
  1616. Ref(self._apply_namespace(name)): value for name, value in new_refs.items()
  1617. }
  1618. self._refs.add_packed_refs(namespaced_refs)
  1619. def get_peeled(self, name: Ref) -> ObjectID | None:
  1620. """Return the cached peeled value of a ref."""
  1621. return self._refs.get_peeled(Ref(self._apply_namespace(name)))
  1622. def set_symbolic_ref(
  1623. self,
  1624. name: Ref,
  1625. other: Ref,
  1626. committer: bytes | None = None,
  1627. timestamp: int | None = None,
  1628. timezone: int | None = None,
  1629. message: bytes | None = None,
  1630. ) -> None:
  1631. """Make a ref point at another ref."""
  1632. self._refs.set_symbolic_ref(
  1633. Ref(self._apply_namespace(name)),
  1634. Ref(self._apply_namespace(other)),
  1635. committer=committer,
  1636. timestamp=timestamp,
  1637. timezone=timezone,
  1638. message=message,
  1639. )
  1640. def set_if_equals(
  1641. self,
  1642. name: Ref,
  1643. old_ref: ObjectID | None,
  1644. new_ref: ObjectID,
  1645. committer: bytes | None = None,
  1646. timestamp: int | None = None,
  1647. timezone: int | None = None,
  1648. message: bytes | None = None,
  1649. ) -> bool:
  1650. """Set a refname to new_ref only if it currently equals old_ref."""
  1651. return self._refs.set_if_equals(
  1652. Ref(self._apply_namespace(name)),
  1653. old_ref,
  1654. new_ref,
  1655. committer=committer,
  1656. timestamp=timestamp,
  1657. timezone=timezone,
  1658. message=message,
  1659. )
  1660. def add_if_new(
  1661. self,
  1662. name: Ref,
  1663. ref: ObjectID,
  1664. committer: bytes | None = None,
  1665. timestamp: int | None = None,
  1666. timezone: int | None = None,
  1667. message: bytes | None = None,
  1668. ) -> bool:
  1669. """Add a new reference only if it does not already exist."""
  1670. return self._refs.add_if_new(
  1671. Ref(self._apply_namespace(name)),
  1672. ref,
  1673. committer=committer,
  1674. timestamp=timestamp,
  1675. timezone=timezone,
  1676. message=message,
  1677. )
  1678. def remove_if_equals(
  1679. self,
  1680. name: Ref,
  1681. old_ref: ObjectID | None,
  1682. committer: bytes | None = None,
  1683. timestamp: int | None = None,
  1684. timezone: int | None = None,
  1685. message: bytes | None = None,
  1686. ) -> bool:
  1687. """Remove a refname only if it currently equals old_ref."""
  1688. return self._refs.remove_if_equals(
  1689. Ref(self._apply_namespace(name)),
  1690. old_ref,
  1691. committer=committer,
  1692. timestamp=timestamp,
  1693. timezone=timezone,
  1694. message=message,
  1695. )
  1696. def pack_refs(self, all: bool = False) -> None:
  1697. """Pack loose refs into packed-refs file.
  1698. Note: This packs all refs in the underlying container, not just
  1699. those in the namespace.
  1700. """
  1701. self._refs.pack_refs(all=all)
  1702. def filter_ref_prefix(refs: T, prefixes: Iterable[bytes]) -> T:
  1703. """Filter refs to only include those with a given prefix.
  1704. Args:
  1705. refs: A dictionary of refs.
  1706. prefixes: The prefixes to filter by.
  1707. """
  1708. filtered = {k: v for k, v in refs.items() if any(k.startswith(p) for p in prefixes)}
  1709. return filtered
  1710. def is_per_worktree_ref(ref: bytes) -> bool:
  1711. """Returns whether a reference is stored per worktree or not.
  1712. Per-worktree references are:
  1713. - all pseudorefs, e.g. HEAD
  1714. - all references stored inside "refs/bisect/", "refs/worktree/" and "refs/rewritten/"
  1715. All refs starting with "refs/" are shared, except for the ones listed above.
  1716. See https://git-scm.com/docs/git-worktree#_refs.
  1717. """
  1718. return not ref.startswith(b"refs/") or ref.startswith(
  1719. (b"refs/bisect/", b"refs/worktree/", b"refs/rewritten/")
  1720. )