|
|
@@ -831,42 +831,46 @@ class DiskRefsContainer(RefsContainer):
|
|
|
"""Return string representation of DiskRefsContainer."""
|
|
|
return f"{self.__class__.__name__}({self.path!r})"
|
|
|
|
|
|
+ def _iter_dir(
|
|
|
+ self,
|
|
|
+ path: bytes,
|
|
|
+ base: bytes,
|
|
|
+ dir_filter: Optional[Callable[[bytes], bool]] = None,
|
|
|
+ ) -> Iterator[bytes]:
|
|
|
+ refspath = os.path.join(path, base.rstrip(b"/"))
|
|
|
+ prefix_len = len(os.path.join(path, b""))
|
|
|
+
|
|
|
+ for root, dirs, files in os.walk(refspath):
|
|
|
+ directory = root[prefix_len:]
|
|
|
+ if os.path.sep != "/":
|
|
|
+ directory = directory.replace(os.fsencode(os.path.sep), b"/")
|
|
|
+ if dir_filter is not None:
|
|
|
+ dirs[:] = [
|
|
|
+ d for d in dirs if dir_filter(b"/".join([directory, d, b""]))
|
|
|
+ ]
|
|
|
+
|
|
|
+ for filename in files:
|
|
|
+ refname = b"/".join([directory, filename])
|
|
|
+ if check_ref_format(refname):
|
|
|
+ yield refname
|
|
|
+
|
|
|
def _iter_loose_refs(self, base: bytes = b"refs/") -> Iterator[bytes]:
|
|
|
base = base.rstrip(b"/") + b"/"
|
|
|
- is_main_worktree = self.worktree_path == self.path
|
|
|
-
|
|
|
- def _iter_path(path: bytes, base: bytes, worktree: bool) -> Iterator[bytes]:
|
|
|
- refspath = os.path.join(path, base.rstrip(b"/"))
|
|
|
- prefix_len = len(os.path.join(path, b""))
|
|
|
- for root, dirs, files in os.walk(refspath):
|
|
|
- directory = root[prefix_len:]
|
|
|
- if os.path.sep != "/":
|
|
|
- directory = directory.replace(os.fsencode(os.path.sep), b"/")
|
|
|
-
|
|
|
- if not is_main_worktree:
|
|
|
- dirs[:] = [
|
|
|
- d
|
|
|
- for d in dirs
|
|
|
- if is_per_worktree_ref(b"/".join([directory, d, b""]))
|
|
|
- == worktree
|
|
|
- ]
|
|
|
-
|
|
|
- for filename in files:
|
|
|
- refname = b"/".join([directory, filename])
|
|
|
- if check_ref_format(refname):
|
|
|
- yield refname
|
|
|
-
|
|
|
- if base == b"refs/":
|
|
|
- paths = [(self.path, False)]
|
|
|
- if not is_main_worktree:
|
|
|
- paths.append((self.worktree_path, True))
|
|
|
+ search_paths: list[tuple[bytes, Optional[Callable[[bytes], bool]]]] = []
|
|
|
+ if base != b"refs/":
|
|
|
+ path = self.worktree_path if is_per_worktree_ref(base) else self.path
|
|
|
+ search_paths.append((path, None))
|
|
|
+ elif self.worktree_path == self.path:
|
|
|
+ # Iterate through all the refs from the main worktree
|
|
|
+ search_paths.append((self.path, None))
|
|
|
else:
|
|
|
- is_worktree = is_per_worktree_ref(base)
|
|
|
- path = self.worktree_path if is_worktree else self.path
|
|
|
- paths = [(path, is_worktree)]
|
|
|
+ # Iterate through all the shared refs from the commondir, excluding per-worktree refs
|
|
|
+ search_paths.append((self.path, lambda r: not is_per_worktree_ref(r)))
|
|
|
+ # Iterate through all the per-worktree refs from the worktree's gitdir
|
|
|
+ search_paths.append((self.worktree_path, is_per_worktree_ref))
|
|
|
|
|
|
- for path, is_worktree in paths:
|
|
|
- yield from _iter_path(path, base, is_worktree)
|
|
|
+ for path, dir_filter in search_paths:
|
|
|
+ yield from self._iter_dir(path, base, dir_filter=dir_filter)
|
|
|
|
|
|
def subkeys(self, base: bytes) -> set[bytes]:
|
|
|
"""Return subkeys under a given base reference path."""
|