# dumb.py -- Support for dumb HTTP(S) git repositories
# Copyright (C) 2025 Dulwich contributors
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#
"""Support for dumb HTTP(S) git repositories."""
__all__ = [
"DumbHTTPObjectStore",
"DumbRemoteHTTPRepo",
]
import os
import tempfile
import zlib
from collections.abc import Callable, Iterator, Mapping, Sequence
from io import BytesIO
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin
if TYPE_CHECKING:
from .object_format import ObjectFormat
from .errors import NotGitRepository, ObjectFormatException
from .object_store import BaseObjectStore
from .objects import (
ZERO_SHA,
Blob,
Commit,
ObjectID,
RawObjectID,
ShaFile,
Tag,
Tree,
hex_to_sha,
sha_to_hex,
)
from .pack import Pack, PackData, PackIndex, UnpackedObject, load_pack_index_file
from .protocol import split_peeled_refs
from .refs import Ref, read_info_refs
class DumbHTTPObjectStore(BaseObjectStore):
    """Object store implementation that fetches objects over dumb HTTP.

    A "dumb" HTTP server exposes the repository as plain static files
    (loose objects under ``objects/xx/...`` and packs listed in
    ``objects/info/packs``), so everything is retrieved with plain GETs.
    """

    def __init__(
        self,
        base_url: str,
        http_request_func: Callable[
            [str, dict[str, str]], tuple[Any, Callable[..., bytes]]
        ],
        object_format: "ObjectFormat | None" = None,
    ) -> None:
        """Initialize a DumbHTTPObjectStore.

        Args:
            base_url: Base URL of the remote repository (e.g. "https://example.com/repo.git/")
            http_request_func: Function to make HTTP requests, should accept (url, headers)
                and return (response, read_func).
            object_format: Object format to use (defaults to DEFAULT_OBJECT_FORMAT)
        """
        super().__init__(object_format=object_format)
        self.base_url = base_url.rstrip("/") + "/"
        self._http_request = http_request_func
        # List of (pack_name, PackIndex-or-None); None means index not fetched yet.
        self._packs: list[tuple[str, PackIndex | None]] | None = None
        # Cache of already-fetched objects: sha (as passed by caller) -> (type_num, content).
        self._cached_objects: dict[bytes, tuple[int, bytes]] = {}
        # Lazily-created directory for downloaded pack files; cleaned up in __del__.
        self._temp_pack_dir: str | None = None

    def _ensure_temp_pack_dir(self) -> None:
        """Ensure we have a temporary directory for storing pack files."""
        if self._temp_pack_dir is None:
            self._temp_pack_dir = tempfile.mkdtemp(prefix="dulwich-dumb-")

    def _fetch_url(self, path: str) -> bytes:
        """Fetch content from a URL path relative to base_url.

        Args:
            path: Path relative to base URL
        Returns:
            Content as bytes
        Raises:
            IOError: If the URL cannot be fetched
        """
        url = urljoin(self.base_url, path)
        resp, read = self._http_request(url, {})
        try:
            if resp.status == 404:
                raise OSError(f"Not found: {url}")
            elif resp.status != 200:
                raise OSError(f"HTTP error {resp.status}: {url}")
            # Read all content in chunks to avoid holding a partial read open.
            chunks = []
            while True:
                chunk = read(4096)
                if not chunk:
                    break
                chunks.append(chunk)
            return b"".join(chunks)
        finally:
            resp.close()

    def _fetch_loose_object(self, sha: RawObjectID | ObjectID) -> tuple[int, bytes]:
        """Fetch a loose object by SHA.

        Args:
            sha: SHA of the object, either hex (bytes) or raw binary bytes
        Returns:
            Tuple of (type_num, content)
        Raises:
            KeyError: If object not found
            ObjectFormatException: If the fetched object is malformed
        """
        # Normalize to hex: callers (get_raw, __contains__, contains_loose) may
        # pass a 20-byte binary SHA, which must not be decoded/sliced directly.
        if len(sha) == 20:
            hex_sha = sha_to_hex(RawObjectID(sha)).decode("ascii")
        else:
            hex_sha = sha.decode("ascii")
        path = f"objects/{hex_sha[:2]}/{hex_sha[2:]}"
        try:
            compressed = self._fetch_url(path)
        except OSError:
            raise KeyError(sha)
        # Loose objects are zlib-deflated "<type> <size>\x00<content>".
        decompressed = zlib.decompress(compressed)
        header_end = decompressed.find(b"\x00")
        if header_end == -1:
            raise ObjectFormatException("Invalid object header")
        header = decompressed[:header_end]
        content = decompressed[header_end + 1 :]
        parts = header.split(b" ", 1)
        if len(parts) != 2:
            raise ObjectFormatException("Invalid object header")
        obj_type = parts[0]
        obj_size = int(parts[1])
        if len(content) != obj_size:
            raise ObjectFormatException("Object size mismatch")
        # Convert type name to type number
        type_map = {
            b"blob": Blob.type_num,
            b"tree": Tree.type_num,
            b"commit": Commit.type_num,
            b"tag": Tag.type_num,
        }
        if obj_type not in type_map:
            raise ObjectFormatException(f"Unknown object type: {obj_type!r}")
        return type_map[obj_type], content

    def _load_packs(self) -> None:
        """Load the list of available packs from the remote (once)."""
        if self._packs is not None:
            return
        self._packs = []
        try:
            packs_data = self._fetch_url("objects/info/packs")
        except OSError:
            # No packs file, repository might only have loose objects
            return
        # Each pack line looks like "P pack-<sha>.pack" (possibly with a path).
        for line in packs_data.strip().split(b"\n"):
            if line.startswith(b"P "):
                pack_name = line[2:].decode("utf-8")
                # Extract just the pack name without path
                if "/" in pack_name:
                    pack_name = pack_name.split("/")[-1]
                if pack_name.endswith(".pack"):
                    pack_name = pack_name[:-5]  # Remove .pack extension
                self._packs.append((pack_name, None))

    def _get_pack_index(self, pack_name: str) -> PackIndex:
        """Get or fetch a pack index.

        Args:
            pack_name: Name of the pack (without .idx extension)
        Returns:
            PackIndex object
        Raises:
            KeyError: If the pack is not listed by the remote
        """
        # Find the pack in our list
        for i, (name, idx) in enumerate(self._packs or []):
            if name == pack_name:
                if idx is None:
                    # Fetch and cache the index so it is only downloaded once.
                    idx_data = self._fetch_url(f"objects/pack/{pack_name}.idx")
                    idx = load_pack_index_file("", BytesIO(idx_data), self.object_format)
                    if self._packs is not None:
                        self._packs[i] = (name, idx)
                return idx
        raise KeyError(f"Pack not found: {pack_name}")

    def _fetch_from_pack(self, sha: RawObjectID | ObjectID) -> tuple[int, bytes]:
        """Try to fetch an object from pack files.

        Args:
            sha: SHA of the object, either hex (bytes) or raw binary bytes
        Returns:
            Tuple of (type_num, content)
        Raises:
            KeyError: If object not found in any pack
        """
        self._load_packs()
        # Convert hex to binary for pack operations
        if len(sha) == 20:
            binsha = RawObjectID(sha)  # Already binary
        else:
            binsha = hex_to_sha(ObjectID(sha))  # Convert hex to binary
        for pack_name, pack_idx in self._packs or []:
            if pack_idx is None:
                pack_idx = self._get_pack_index(pack_name)
            try:
                # Check if object is in this pack
                pack_idx.object_offset(binsha)
            except KeyError:
                continue
            # We found the object, now we need to fetch the pack data
            # For efficiency, we could fetch just the needed portion, but for
            # simplicity we'll fetch the whole pack and cache it
            self._ensure_temp_pack_dir()
            if self._temp_pack_dir is None:
                raise RuntimeError("Temp pack directory not initialized")
            pack_path = os.path.join(self._temp_pack_dir, f"{pack_name}.pack")
            if not os.path.exists(pack_path):
                # Download the pack file
                data = self._fetch_url(f"objects/pack/{pack_name}.pack")
                with open(pack_path, "wb") as f:
                    f.write(data)
            # Open the pack and get the object
            pack_data = PackData(pack_path, object_format=self.object_format)
            pack = Pack.from_objects(pack_data, pack_idx)
            try:
                return pack.get_raw(binsha)
            finally:
                pack.close()
        raise KeyError(sha)

    def get_raw(self, sha: RawObjectID | ObjectID) -> tuple[int, bytes]:
        """Obtain the raw text for an object.

        Args:
            sha: SHA of the object (hex or binary bytes)
        Returns:
            Tuple with numeric type and object contents
        Raises:
            KeyError: If the object is not found in packs or as a loose object
        """
        # Check cache first
        if sha in self._cached_objects:
            return self._cached_objects[sha]
        # Try packs first
        try:
            result = self._fetch_from_pack(sha)
            self._cached_objects[sha] = result
            return result
        except KeyError:
            pass
        # Try loose object
        result = self._fetch_loose_object(sha)
        self._cached_objects[sha] = result
        return result

    def contains_loose(self, sha: RawObjectID | ObjectID) -> bool:
        """Check if a particular object is present by SHA and is loose."""
        try:
            self._fetch_loose_object(sha)
            return True
        except KeyError:
            return False

    def __contains__(self, sha: RawObjectID | ObjectID) -> bool:
        """Check if a particular object is present by SHA.

        Note: a miss requires network round-trips (pack indices, then a
        loose-object GET), so negative membership tests are expensive.
        """
        if sha in self._cached_objects:
            return True
        # Try packs
        try:
            self._fetch_from_pack(sha)
            return True
        except KeyError:
            pass
        # Try loose object
        try:
            self._fetch_loose_object(sha)
            return True
        except KeyError:
            return False

    def __iter__(self) -> Iterator[ObjectID]:
        """Iterate over all SHAs in the store.

        Note: This is inefficient for dumb HTTP as it requires
        downloading all pack indices. Loose objects are not listed at all,
        since dumb HTTP provides no way to enumerate them.
        """
        seen = set()
        # We can't efficiently list loose objects over dumb HTTP
        # So we only iterate pack objects
        self._load_packs()
        for pack_name, idx in self._packs or []:
            if idx is None:
                idx = self._get_pack_index(pack_name)
            for sha in idx:
                if sha not in seen:
                    seen.add(sha)
                    yield sha_to_hex(RawObjectID(sha))

    @property
    def packs(self) -> list[Any]:
        """Iterable of pack objects.

        Note: Returns empty list as we don't have actual Pack objects.
        """
        return []

    def add_object(self, obj: ShaFile) -> None:
        """Add a single object to this object store (unsupported)."""
        raise NotImplementedError("Cannot add objects to dumb HTTP repository")

    def add_objects(
        self,
        objects: Sequence[tuple[ShaFile, str | None]],
        progress: Callable[[str], None] | None = None,
    ) -> "Pack | None":
        """Add a set of objects to this object store (unsupported)."""
        raise NotImplementedError("Cannot add objects to dumb HTTP repository")

    def __del__(self) -> None:
        """Clean up temporary directory on deletion."""
        if self._temp_pack_dir and os.path.exists(self._temp_pack_dir):
            # Imported here since module globals may already be torn down
            # during interpreter shutdown.
            import shutil

            shutil.rmtree(self._temp_pack_dir, ignore_errors=True)
class DumbRemoteHTTPRepo:
    """Repository implementation for dumb HTTP remotes."""

    def __init__(
        self,
        base_url: str,
        http_request_func: Callable[
            [str, dict[str, str]], tuple[Any, Callable[..., bytes]]
        ],
    ) -> None:
        """Initialize a DumbRemoteHTTPRepo.

        Args:
            base_url: Base URL of the remote repository
            http_request_func: Function to make HTTP requests.
        """
        self.base_url = base_url.rstrip("/") + "/"
        self._http_request = http_request_func
        # Lazily populated from info/refs by get_refs().
        self._refs: dict[Ref, ObjectID] | None = None
        # Peeled values parsed from "ref^{}" lines in info/refs.
        self._peeled: dict[Ref, ObjectID] | None = None
        self.object_store = DumbHTTPObjectStore(base_url, http_request_func)

    def _fetch_url(self, path: str) -> bytes:
        """Fetch content from a URL path relative to base_url.

        Args:
            path: Path relative to base URL
        Returns:
            Content as bytes
        Raises:
            IOError: If the URL cannot be fetched
        """
        url = urljoin(self.base_url, path)
        resp, read = self._http_request(url, {})
        try:
            if resp.status == 404:
                raise OSError(f"Not found: {url}")
            elif resp.status != 200:
                raise OSError(f"HTTP error {resp.status}: {url}")
            chunks = []
            while True:
                chunk = read(4096)
                if not chunk:
                    break
                chunks.append(chunk)
            return b"".join(chunks)
        finally:
            resp.close()

    def get_refs(self) -> dict[Ref, ObjectID]:
        """Get dictionary with all refs.

        Raises:
            NotGitRepository: If info/refs cannot be fetched.
        """
        if self._refs is None:
            # Fetch info/refs
            try:
                refs_data = self._fetch_url("info/refs")
            except OSError:
                raise NotGitRepository(f"Cannot read refs from {self.base_url}")
            refs_hex = read_info_refs(BytesIO(refs_data))
            # Separate plain refs from "ref^{}" peeled entries.
            refs_raw, peeled_raw = split_peeled_refs(refs_hex)
            # Convert to typed dicts
            self._refs = {Ref(k): ObjectID(v) for k, v in refs_raw.items()}
            self._peeled = peeled_raw
        return dict(self._refs)

    def get_head(self) -> Ref:
        """Get the current HEAD reference.

        Returns:
            HEAD reference name or commit ID
        """
        head_resp_bytes = self._fetch_url("HEAD")
        head_split = head_resp_bytes.replace(b"\n", b"").split(b" ")
        # "ref: refs/heads/x" -> second token; legacy bare commit id -> first.
        head_target_bytes = head_split[1] if len(head_split) > 1 else head_split[0]
        # handle HEAD legacy format containing a commit id instead of a ref name
        for ref_name, ret_target in self.get_refs().items():
            if ret_target == head_target_bytes:
                return ref_name
        return Ref(head_target_bytes)

    def get_peeled(self, ref: Ref) -> ObjectID:
        """Get the peeled value of a ref.

        Uses the peeled ("ref^{}") entries that the server advertised in
        info/refs when available; otherwise falls back to the ref's own
        value, or ZERO_SHA if the ref does not exist.
        """
        refs = self.get_refs()  # also populates self._peeled
        if self._peeled is not None and ref in self._peeled:
            return self._peeled[ref]
        sha: ObjectID | None = refs.get(ref, None)
        return sha if sha is not None else ZERO_SHA

    def fetch_pack_data(
        self,
        determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
        graph_walker: object,
        progress: Callable[[bytes], None] | None = None,
        *,
        get_tagged: bool | None = None,
        depth: int | None = None,
    ) -> Iterator[UnpackedObject]:
        """Fetch pack data from the remote.

        This is the main method for fetching objects from a dumb HTTP remote.
        Since dumb HTTP doesn't support negotiation, we need to download
        all objects reachable from the wanted refs.

        Args:
            determine_wants: Function that returns list of wanted SHAs
            graph_walker: GraphWalker instance (not used for dumb HTTP)
            progress: Optional progress callback
            get_tagged: Whether to get tagged objects
            depth: Depth for shallow clones (not supported for dumb HTTP)
        Returns:
            Iterator of UnpackedObject instances
        """
        refs = self.get_refs()
        wants = determine_wants(refs, depth)
        if not wants:
            return
        # For dumb HTTP, we traverse the object graph starting from wants
        to_fetch = set(wants)
        seen = set()
        while to_fetch:
            sha = to_fetch.pop()
            if sha in seen:
                continue
            seen.add(sha)
            # Fetch the object
            try:
                type_num, content = self.object_store.get_raw(sha)
            except KeyError:
                # Object not found, skip it
                continue
            unpacked = UnpackedObject(type_num, sha=sha, decomp_chunks=[content])
            yield unpacked
            # Parse the object to find references to other objects
            obj = ShaFile.from_raw_string(type_num, content)
            if isinstance(obj, Commit):  # Commit
                to_fetch.add(obj.tree)
                for parent in obj.parents:
                    to_fetch.add(parent)
            elif isinstance(obj, Tag):  # Tag
                to_fetch.add(obj.object[1])
            elif isinstance(obj, Tree):  # Tree
                for _, _, item_sha in obj.items():
                    assert item_sha is not None
                    to_fetch.add(item_sha)
            if progress:
                progress(f"Fetching objects: {len(seen)} done\n".encode())