|
@@ -66,7 +66,7 @@ NO_CACHE_HEADERS = [
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
-def cache_forever_headers(now=None):
|
|
|
|
|
|
|
+def cache_forever_headers(now: Optional[float] = None) -> list[tuple[str, str]]:
|
|
|
if now is None:
|
|
if now is None:
|
|
|
now = time.time()
|
|
now = time.time()
|
|
|
return [
|
|
return [
|
|
@@ -113,7 +113,7 @@ def date_time_string(timestamp: Optional[float] = None) -> str:
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def url_prefix(mat) -> str:
|
|
|
|
|
|
|
+def url_prefix(mat: re.Match[str]) -> str:
|
|
|
"""Extract the URL prefix from a regex match.
|
|
"""Extract the URL prefix from a regex match.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -125,12 +125,12 @@ def url_prefix(mat) -> str:
|
|
|
return "/" + mat.string[: mat.start()].strip("/")
|
|
return "/" + mat.string[: mat.start()].strip("/")
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_repo(backend, mat) -> BaseRepo:
|
|
|
|
|
|
|
+def get_repo(backend: 'Backend', mat: re.Match[str]) -> BaseRepo:
|
|
|
"""Get a Repo instance for the given backend and URL regex match."""
|
|
"""Get a Repo instance for the given backend and URL regex match."""
|
|
|
return backend.open_repository(url_prefix(mat))
|
|
return backend.open_repository(url_prefix(mat))
|
|
|
|
|
|
|
|
|
|
|
|
|
-def send_file(req, f, content_type):
|
|
|
|
|
|
|
+def send_file(req: 'HTTPGitRequest', f: BytesIO, content_type: str) -> Iterator[bytes]:
|
|
|
"""Send a file-like object to the request output.
|
|
"""Send a file-like object to the request output.
|
|
|
|
|
|
|
|
Args:
|
|
Args:
|
|
@@ -155,18 +155,18 @@ def send_file(req, f, content_type):
|
|
|
f.close()
|
|
f.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _url_to_path(url):
|
|
|
|
|
|
|
+def _url_to_path(url: str) -> str:
|
|
|
return url.replace("/", os.path.sep)
|
|
return url.replace("/", os.path.sep)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_text_file(req, backend, mat):
|
|
|
|
|
|
|
+def get_text_file(req: 'HTTPGitRequest', backend: 'Backend', mat: re.Match[str]) -> Iterator[bytes]:
|
|
|
req.nocache()
|
|
req.nocache()
|
|
|
path = _url_to_path(mat.group())
|
|
path = _url_to_path(mat.group())
|
|
|
logger.info("Sending plain text file %s", path)
|
|
logger.info("Sending plain text file %s", path)
|
|
|
return send_file(req, get_repo(backend, mat).get_named_file(path), "text/plain")
|
|
return send_file(req, get_repo(backend, mat).get_named_file(path), "text/plain")
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_loose_object(req, backend, mat):
|
|
|
|
|
|
|
+def get_loose_object(req: 'HTTPGitRequest', backend: 'Backend', mat: re.Match[str]) -> Iterator[bytes]:
|
|
|
sha = (mat.group(1) + mat.group(2)).encode("ascii")
|
|
sha = (mat.group(1) + mat.group(2)).encode("ascii")
|
|
|
logger.info("Sending loose object %s", sha)
|
|
logger.info("Sending loose object %s", sha)
|
|
|
object_store = get_repo(backend, mat).object_store
|
|
object_store = get_repo(backend, mat).object_store
|
|
@@ -183,7 +183,7 @@ def get_loose_object(req, backend, mat):
|
|
|
yield data
|
|
yield data
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_pack_file(req, backend, mat):
|
|
|
|
|
|
|
+def get_pack_file(req: 'HTTPGitRequest', backend: 'Backend', mat: re.Match[str]) -> Iterator[bytes]:
|
|
|
req.cache_forever()
|
|
req.cache_forever()
|
|
|
path = _url_to_path(mat.group())
|
|
path = _url_to_path(mat.group())
|
|
|
logger.info("Sending pack file %s", path)
|
|
logger.info("Sending pack file %s", path)
|
|
@@ -194,7 +194,7 @@ def get_pack_file(req, backend, mat):
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_idx_file(req, backend, mat):
|
|
|
|
|
|
|
+def get_idx_file(req: 'HTTPGitRequest', backend: 'Backend', mat: re.Match[str]) -> Iterator[bytes]:
|
|
|
req.cache_forever()
|
|
req.cache_forever()
|
|
|
path = _url_to_path(mat.group())
|
|
path = _url_to_path(mat.group())
|
|
|
logger.info("Sending pack file %s", path)
|
|
logger.info("Sending pack file %s", path)
|
|
@@ -205,7 +205,7 @@ def get_idx_file(req, backend, mat):
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_info_refs(req, backend, mat):
|
|
|
|
|
|
|
+def get_info_refs(req: 'HTTPGitRequest', backend: 'Backend', mat: re.Match[str]) -> Iterator[bytes]:
|
|
|
params = parse_qs(req.environ["QUERY_STRING"])
|
|
params = parse_qs(req.environ["QUERY_STRING"])
|
|
|
service = params.get("service", [None])[0]
|
|
service = params.get("service", [None])[0]
|
|
|
try:
|
|
try:
|
|
@@ -240,14 +240,14 @@ def get_info_refs(req, backend, mat):
|
|
|
yield from generate_info_refs(repo)
|
|
yield from generate_info_refs(repo)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_info_packs(req, backend, mat):
|
|
|
|
|
|
|
+def get_info_packs(req: 'HTTPGitRequest', backend: 'Backend', mat: re.Match[str]) -> Iterator[bytes]:
|
|
|
req.nocache()
|
|
req.nocache()
|
|
|
req.respond(HTTP_OK, "text/plain")
|
|
req.respond(HTTP_OK, "text/plain")
|
|
|
logger.info("Emulating dumb info/packs")
|
|
logger.info("Emulating dumb info/packs")
|
|
|
return generate_objects_info_packs(get_repo(backend, mat))
|
|
return generate_objects_info_packs(get_repo(backend, mat))
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _chunk_iter(f):
|
|
|
|
|
|
|
+def _chunk_iter(f: BytesIO) -> Iterator[bytes]:
|
|
|
while True:
|
|
while True:
|
|
|
line = f.readline()
|
|
line = f.readline()
|
|
|
length = int(line.rstrip(), 16)
|
|
length = int(line.rstrip(), 16)
|
|
@@ -260,11 +260,11 @@ def _chunk_iter(f):
|
|
|
class ChunkReader:
|
|
class ChunkReader:
|
|
|
"""Reader for chunked transfer encoding streams."""
|
|
"""Reader for chunked transfer encoding streams."""
|
|
|
|
|
|
|
|
- def __init__(self, f) -> None:
|
|
|
|
|
|
|
+ def __init__(self, f: BytesIO) -> None:
|
|
|
self._iter = _chunk_iter(f)
|
|
self._iter = _chunk_iter(f)
|
|
|
self._buffer: list[bytes] = []
|
|
self._buffer: list[bytes] = []
|
|
|
|
|
|
|
|
- def read(self, n):
|
|
|
|
|
|
|
+ def read(self, n: int) -> bytes:
|
|
|
while sum(map(len, self._buffer)) < n:
|
|
while sum(map(len, self._buffer)) < n:
|
|
|
try:
|
|
try:
|
|
|
self._buffer.append(next(self._iter))
|
|
self._buffer.append(next(self._iter))
|
|
@@ -284,11 +284,11 @@ class _LengthLimitedFile:
|
|
|
but not implemented in wsgiref as of 2.5.
|
|
but not implemented in wsgiref as of 2.5.
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
- def __init__(self, input, max_bytes) -> None:
|
|
|
|
|
|
|
+ def __init__(self, input: BytesIO, max_bytes: int) -> None:
|
|
|
self._input = input
|
|
self._input = input
|
|
|
self._bytes_avail = max_bytes
|
|
self._bytes_avail = max_bytes
|
|
|
|
|
|
|
|
- def read(self, size=-1):
|
|
|
|
|
|
|
+ def read(self, size: int = -1) -> bytes:
|
|
|
if self._bytes_avail <= 0:
|
|
if self._bytes_avail <= 0:
|
|
|
return b""
|
|
return b""
|
|
|
if size == -1 or size > self._bytes_avail:
|
|
if size == -1 or size > self._bytes_avail:
|
|
@@ -299,7 +299,7 @@ class _LengthLimitedFile:
|
|
|
# TODO: support more methods as necessary
|
|
# TODO: support more methods as necessary
|
|
|
|
|
|
|
|
|
|
|
|
|
-def handle_service_request(req, backend, mat):
|
|
|
|
|
|
|
+def handle_service_request(req: 'HTTPGitRequest', backend: 'Backend', mat: re.Match[str]) -> Iterator[bytes]:
|
|
|
service = mat.group().lstrip("/")
|
|
service = mat.group().lstrip("/")
|
|
|
logger.info("Handling service request for %s", service)
|
|
logger.info("Handling service request for %s", service)
|
|
|
handler_cls = req.handlers.get(service.encode("ascii"), None)
|
|
handler_cls = req.handlers.get(service.encode("ascii"), None)
|