# test_web.py -- Tests for the git HTTP server
# Copyright (C) 2010 Google, Inc.
#
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as published by the Free Software Foundation; version
# 2.0 or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
#

"""Tests for the Git HTTP server."""

import gzip
import os
import re
from io import BytesIO
from typing import NoReturn

from dulwich.object_store import MemoryObjectStore
from dulwich.objects import Blob
from dulwich.repo import BaseRepo, MemoryRepo
from dulwich.server import DictBackend
from dulwich.tests.utils import make_object, make_tag
from dulwich.web import (
    HTTP_ERROR,
    HTTP_FORBIDDEN,
    HTTP_NOT_FOUND,
    HTTP_OK,
    GunzipFilter,
    HTTPGitApplication,
    HTTPGitRequest,
    _LengthLimitedFile,
    get_idx_file,
    get_info_packs,
    get_info_refs,
    get_loose_object,
    get_pack_file,
    get_text_file,
    handle_service_request,
    send_file,
)

from . import TestCase


class MinimalistWSGIInputStream:
    """WSGI input stream with no 'seek()' and 'tell()' methods."""

    def __init__(self, data) -> None:
        self.data = data
        self.pos = 0

    def read(self, howmuch):
        """Return up to ``howmuch`` bytes, or b"" once the data is exhausted."""
        start = self.pos
        end = self.pos + howmuch
        if start >= len(self.data):
            return b""
        self.pos = end
        return self.data[start:end]


class MinimalistWSGIInputStream2(MinimalistWSGIInputStream):
    """WSGI input stream with no *working* 'seek()' and 'tell()' methods."""

    def seek(self, pos) -> NoReturn:
        raise NotImplementedError

    def tell(self) -> NoReturn:
        raise NotImplementedError


class TestHTTPGitRequest(HTTPGitRequest):
    """HTTPGitRequest with overridden methods to help test caching."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # None: no cache method called yet; False: nocache(); True:
        # cache_forever().
        self.cached = None

    def nocache(self) -> None:
        self.cached = False

    def cache_forever(self) -> None:
        self.cached = True


class WebTestCase(TestCase):
    """Base TestCase with useful instance vars and utility functions."""

    _req_class: type[HTTPGitRequest] = TestHTTPGitRequest

    def setUp(self) -> None:
        super().setUp()
        self._environ = {}
        self._req = self._req_class(
            self._environ, self._start_response, handlers=self._handlers()
        )
        self._status = None
        self._headers = []
        self._output = BytesIO()

    def _start_response(self, status, headers):
        """Record the status and headers and return a write callable.

        The returned callable writes into ``self._output`` so tests can
        inspect data sent through it.
        """
        self._status = status
        self._headers = list(headers)
        return self._output.write

    def _handlers(self) -> None:
        """Return the ``handlers`` argument passed to the request class.

        The base implementation passes None; subclasses may override it.
        """
        return None

    def assertContentTypeEquals(self, expected) -> None:
        """Assert that a Content-Type header with value ``expected`` was sent."""
        self.assertIn(("Content-Type", expected), self._headers)


def _test_backend(objects, refs=None, named_files=None):
    """Build a DictBackend serving one bare in-memory repository at "/".

    Args:
      objects: objects to initialise the repository with.
      refs: optional mapping of ref names to values; defaults to empty.
      named_files: optional mapping of path to contents, stored in the
        repository as named files; defaults to empty.
    """
    if not refs:
        refs = {}
    if not named_files:
        named_files = {}
    repo = MemoryRepo.init_bare(objects, refs)
    for path, contents in named_files.items():
        repo._put_named_file(path, contents)
    return DictBackend({"/": repo})


class DumbHandlersTestCase(WebTestCase):
    """Tests for the file-serving handlers (send_file and the get_* functions)."""

    def test_send_file_not_found(self) -> None:
        list(send_file(self._req, None, "text/plain"))
        self.assertEqual(HTTP_NOT_FOUND, self._status)

    def test_send_file(self) -> None:
        f = BytesIO(b"foobar")
        output = b"".join(send_file(self._req, f, "some/thing"))
        self.assertEqual(b"foobar", output)
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("some/thing")
        self.assertTrue(f.closed)

    def test_send_file_buffered(self) -> None:
        bufsize = 10240
        xs = b"x" * bufsize
        f = BytesIO(2 * xs)
        self.assertEqual([xs, xs], list(send_file(self._req, f, "some/thing")))
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("some/thing")
        self.assertTrue(f.closed)

    def test_send_file_error(self) -> None:
        class TestFile:
            """File-like object whose read() always raises ``exc_class``."""

            def __init__(self, exc_class) -> None:
                self.closed = False
                self._exc_class = exc_class

            def read(self, size=-1) -> NoReturn:
                raise self._exc_class

            def close(self) -> None:
                self.closed = True

        f = TestFile(IOError)
        list(send_file(self._req, f, "some/thing"))
        self.assertEqual(HTTP_ERROR, self._status)
        self.assertTrue(f.closed)
        self.assertFalse(self._req.cached)

        # non-IOErrors are reraised
        f = TestFile(AttributeError)
        self.assertRaises(AttributeError, list, send_file(self._req, f, "some/thing"))
        self.assertTrue(f.closed)
        self.assertFalse(self._req.cached)

    def test_get_text_file(self) -> None:
        backend = _test_backend([], named_files={"description": b"foo"})
        mat = re.search(".*", "description")
        output = b"".join(get_text_file(self._req, backend, mat))
        self.assertEqual(b"foo", output)
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("text/plain")
        self.assertFalse(self._req.cached)

    def test_get_loose_object(self) -> None:
        blob = make_object(Blob, data=b"foo")
        backend = _test_backend([blob])
        mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))
        output = b"".join(get_loose_object(self._req, backend, mat))
        self.assertEqual(blob.as_legacy_object(), output)
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("application/x-git-loose-object")
        self.assertTrue(self._req.cached)

    def test_get_loose_object_missing(self) -> None:
        mat = re.search("^(..)(.{38})$", "1" * 40)
        list(get_loose_object(self._req, _test_backend([]), mat))
        self.assertEqual(HTTP_NOT_FOUND, self._status)

    def test_get_loose_object_error(self) -> None:
        blob = make_object(Blob, data=b"foo")
        backend = _test_backend([blob])
        mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))

        def as_legacy_object_error(self) -> NoReturn:
            raise OSError

        # Restore the real method after the test, since the patch is applied
        # to the Blob class itself.
        self.addCleanup(setattr, Blob, "as_legacy_object", Blob.as_legacy_object)
        Blob.as_legacy_object = as_legacy_object_error
        list(get_loose_object(self._req, backend, mat))
        self.assertEqual(HTTP_ERROR, self._status)

    def test_get_pack_file(self) -> None:
        sha = "1" * 40
        pack_name = os.path.join("objects", "pack", f"pack-{sha}.pack")
        backend = _test_backend([], named_files={pack_name: b"pack contents"})
        mat = re.search(".*", pack_name)
        output = b"".join(get_pack_file(self._req, backend, mat))
        self.assertEqual(b"pack contents", output)
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("application/x-git-packed-objects")
        self.assertTrue(self._req.cached)

    def test_get_idx_file(self) -> None:
        sha = "1" * 40
        idx_name = os.path.join("objects", "pack", f"pack-{sha}.idx")
        backend = _test_backend([], named_files={idx_name: b"idx contents"})
        mat = re.search(".*", idx_name)
        output = b"".join(get_idx_file(self._req, backend, mat))
        self.assertEqual(b"idx contents", output)
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("application/x-git-packed-objects-toc")
        self.assertTrue(self._req.cached)

    def test_get_info_refs(self) -> None:
        self._environ["QUERY_STRING"] = ""

        blob1 = make_object(Blob, data=b"1")
        blob2 = make_object(Blob, data=b"2")
        blob3 = make_object(Blob, data=b"3")

        tag1 = make_tag(blob2, name=b"tag-tag")

        objects = [blob1, blob2, blob3, tag1]
        refs = {
            b"HEAD": b"000",
            b"refs/heads/master": blob1.id,
            b"refs/tags/tag-tag": tag1.id,
            b"refs/tags/blob-tag": blob3.id,
        }
        backend = _test_backend(objects, refs=refs)

        mat = re.search(".*", "//info/refs")
        self.assertEqual(
            [
                blob1.id + b"\trefs/heads/master\n",
                blob3.id + b"\trefs/tags/blob-tag\n",
                tag1.id + b"\trefs/tags/tag-tag\n",
                blob2.id + b"\trefs/tags/tag-tag^{}\n",
            ],
            list(get_info_refs(self._req, backend, mat)),
        )
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("text/plain")
        self.assertFalse(self._req.cached)

    def test_get_info_refs_not_found(self) -> None:
        self._environ["QUERY_STRING"] = ""

        objects = []
        refs = {}
        backend = _test_backend(objects, refs=refs)

        mat = re.search("info/refs", "/foo/info/refs")
        self.assertEqual(
            [b"No git repository was found at /foo"],
            list(get_info_refs(self._req, backend, mat)),
        )
        self.assertEqual(HTTP_NOT_FOUND, self._status)
        self.assertContentTypeEquals("text/plain")

    def test_get_info_packs(self) -> None:
        class TestPackData:
            def __init__(self, sha) -> None:
                self.filename = f"pack-{sha}.pack"

        class TestPack:
            def __init__(self, sha) -> None:
                self.data = TestPackData(sha)

        packs = [TestPack(str(i) * 40) for i in range(1, 4)]

        class TestObjectStore(MemoryObjectStore):
            # property must be overridden, can't be assigned
            @property
            def packs(self):
                return packs

        store = TestObjectStore()
        repo = BaseRepo(store, None)
        backend = DictBackend({"/": repo})
        mat = re.search(".*", "//info/packs")
        output = b"".join(get_info_packs(self._req, backend, mat))
        expected = b"".join(
            b"P pack-" + s + b".pack\n" for s in (b"1" * 40, b"2" * 40, b"3" * 40)
        )
        self.assertEqual(expected, output)
        self.assertEqual(HTTP_OK, self._status)
        self.assertContentTypeEquals("text/plain")
        self.assertFalse(self._req.cached)


class SmartHandlersTestCase(WebTestCase):
    """Tests for handle_service_request and service-aware get_info_refs."""

    class _TestUploadPackHandler:
        """Stub handler that echoes what it reads from the protocol object."""

        def __init__(
            self,
            backend,
            args,
            proto,
            stateless_rpc=None,
            advertise_refs=False,
        ) -> None:
            self.args = args
            self.proto = proto
            self.stateless_rpc = stateless_rpc
            self.advertise_refs = advertise_refs

        def handle(self) -> None:
            self.proto.write(b"handled input: " + self.proto.recv(1024))

    def _make_handler(self, *args, **kwargs):
        """Create the stub handler and keep it on ``self._handler`` for asserts."""
        self._handler = self._TestUploadPackHandler(*args, **kwargs)
        return self._handler

    def _handlers(self):
        return {b"git-upload-pack": self._make_handler}

    def test_handle_service_request_unknown(self) -> None:
        mat = re.search(".*", "/git-evil-handler")
        content = list(handle_service_request(self._req, "backend", mat))
        self.assertEqual(HTTP_FORBIDDEN, self._status)
        self.assertNotIn(b"git-evil-handler", b"".join(content))
        self.assertFalse(self._req.cached)

    def _run_handle_service_request(self, content_length=None) -> None:
        """Run handle_service_request for git-upload-pack and check the result.

        Args:
          content_length: value for the CONTENT_LENGTH environ key; the key
            is left unset when this is None.
        """
        self._environ["wsgi.input"] = BytesIO(b"foo")
        if content_length is not None:
            self._environ["CONTENT_LENGTH"] = content_length
        mat = re.search(".*", "/git-upload-pack")

        class Backend:
            def open_repository(self, path) -> None:
                return None

        handler_output = b"".join(handle_service_request(self._req, Backend(), mat))
        write_output = self._output.getvalue()
        # Ensure all output was written via the write callback.
        self.assertEqual(b"", handler_output)
        self.assertEqual(b"handled input: foo", write_output)
        self.assertContentTypeEquals("application/x-git-upload-pack-result")
        self.assertFalse(self._handler.advertise_refs)
        self.assertTrue(self._handler.stateless_rpc)
        self.assertFalse(self._req.cached)

    def test_handle_service_request(self) -> None:
        self._run_handle_service_request()

    def test_handle_service_request_with_length(self) -> None:
        self._run_handle_service_request(content_length="3")

    def test_handle_service_request_empty_length(self) -> None:
        self._run_handle_service_request(content_length="")

    def test_get_info_refs_unknown(self) -> None:
        self._environ["QUERY_STRING"] = "service=git-evil-handler"

        class Backend:
            def open_repository(self, url) -> None:
                return None

        mat = re.search(".*", "/git-evil-pack")
        content = list(get_info_refs(self._req, Backend(), mat))
        self.assertNotIn(b"git-evil-handler", b"".join(content))
        self.assertEqual(HTTP_FORBIDDEN, self._status)
        self.assertFalse(self._req.cached)

    def test_get_info_refs(self) -> None:
        self._environ["wsgi.input"] = BytesIO(b"foo")
        self._environ["QUERY_STRING"] = "service=git-upload-pack"

        class Backend:
            def open_repository(self, url) -> None:
                return None

        mat = re.search(".*", "/git-upload-pack")

        handler_output = b"".join(get_info_refs(self._req, Backend(), mat))
        write_output = self._output.getvalue()
        self.assertEqual(
            (
                b"001e# service=git-upload-pack\n"
                b"0000"
                # input is ignored by the handler
                b"handled input: "
            ),
            write_output,
        )
        # Ensure all output was written via the write callback.
        self.assertEqual(b"", handler_output)
        self.assertTrue(self._handler.advertise_refs)
        self.assertTrue(self._handler.stateless_rpc)
        self.assertFalse(self._req.cached)


class LengthLimitedFileTestCase(TestCase):
    """Tests for _LengthLimitedFile read limits."""

    def test_no_cutoff(self) -> None:
        f = _LengthLimitedFile(BytesIO(b"foobar"), 1024)
        self.assertEqual(b"foobar", f.read())

    def test_cutoff(self) -> None:
        f = _LengthLimitedFile(BytesIO(b"foobar"), 3)
        self.assertEqual(b"foo", f.read())
        self.assertEqual(b"", f.read())

    def test_multiple_reads(self) -> None:
        f = _LengthLimitedFile(BytesIO(b"foobar"), 3)
        self.assertEqual(b"fo", f.read(2))
        self.assertEqual(b"o", f.read(2))
        self.assertEqual(b"", f.read())


class HTTPGitRequestTestCase(WebTestCase):
    """Tests for the status and headers emitted by HTTPGitRequest."""

    # This class tests the contents of the actual cache headers
    _req_class = HTTPGitRequest

    def test_not_found(self) -> None:
        self._req.cache_forever()  # cache headers should be discarded
        message = "Something not found"
        self.assertEqual(message.encode("ascii"), self._req.not_found(message))
        self.assertEqual(HTTP_NOT_FOUND, self._status)
        self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))

    def test_forbidden(self) -> None:
        self._req.cache_forever()  # cache headers should be discarded
        message = "Something not found"
        self.assertEqual(message.encode("ascii"), self._req.forbidden(message))
        self.assertEqual(HTTP_FORBIDDEN, self._status)
        self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))

    def test_respond_ok(self) -> None:
        self._req.respond()
        self.assertEqual([], self._headers)
        self.assertEqual(HTTP_OK, self._status)

    def test_respond(self) -> None:
        self._req.nocache()
        self._req.respond(
            status=402,
            content_type="some/type",
            headers=[("X-Foo", "foo"), ("X-Bar", "bar")],
        )
        self.assertEqual(
            {
                ("X-Foo", "foo"),
                ("X-Bar", "bar"),
                ("Content-Type", "some/type"),
                ("Expires", "Fri, 01 Jan 1980 00:00:00 GMT"),
                ("Pragma", "no-cache"),
                ("Cache-Control", "no-cache, max-age=0, must-revalidate"),
            },
            set(self._headers),
        )
        self.assertEqual(402, self._status)


class HTTPGitApplicationTestCase(TestCase):
    """Tests for HTTPGitApplication dispatch and its fallback app."""

    def setUp(self) -> None:
        super().setUp()
        self._app = HTTPGitApplication("backend")

        self._environ = {
            "PATH_INFO": "/foo",
            "REQUEST_METHOD": "GET",
        }

    def _test_handler(self, req, backend, mat) -> str:
        # tests interface used by all handlers
        self.assertEqual(self._environ, req.environ)
        self.assertEqual("backend", backend)
        self.assertEqual("/foo", mat.group(0))
        return "output"

    def _add_handler(self, app) -> None:
        """Replace ``app.services`` with one route for "/foo" to _test_handler.

        The route is keyed on the current REQUEST_METHOD in the environ.
        """
        method = self._environ["REQUEST_METHOD"]
        app.services = {
            (method, re.compile("/foo$")): self._test_handler,
        }

    def test_call(self) -> None:
        self._add_handler(self._app)
        self.assertEqual("output", self._app(self._environ, None))

    def test_fallback_app(self) -> None:
        def test_app(environ, start_response) -> str:
            return "output"

        app = HTTPGitApplication("backend", fallback_app=test_app)
        self.assertEqual("output", app(self._environ, None))


class GunzipTestCase(HTTPGitApplicationTestCase):
    __doc__ = """TestCase for testing the GunzipFilter, ensuring the wsgi.input
    is correctly decompressed and headers are corrected.
    """
    # The class docstring doubles as the payload that gets gzip-compressed.
    example_text = __doc__.encode("ascii")

    def setUp(self) -> None:
        super().setUp()
        self._app = GunzipFilter(self._app)
        self._environ["HTTP_CONTENT_ENCODING"] = "gzip"
        self._environ["REQUEST_METHOD"] = "POST"

    def _get_zstream(self, text):
        """Gzip ``text`` and return (stream rewound to 0, compressed length)."""
        zstream = BytesIO()
        zfile = gzip.GzipFile(fileobj=zstream, mode="wb")
        zfile.write(text)
        zfile.close()
        zlength = zstream.tell()
        zstream.seek(0)
        return zstream, zlength

    def _test_call(self, orig, zstream, zlength) -> None:
        """Send ``zstream`` through the filter and check the rewritten environ.

        Args:
          orig: the uncompressed bytes expected in the new 'wsgi.input'.
          zstream: input stream holding the gzip-compressed payload.
          zlength: compressed length, used as CONTENT_LENGTH.
        """
        self._add_handler(self._app.app)
        self.assertLess(zlength, len(orig))
        self.assertEqual(self._environ["HTTP_CONTENT_ENCODING"], "gzip")
        self._environ["CONTENT_LENGTH"] = zlength
        self._environ["wsgi.input"] = zstream
        self._app(self._environ, None)
        buf = self._environ["wsgi.input"]
        self.assertIsNot(buf, zstream)
        buf.seek(0)
        self.assertEqual(orig, buf.read())
        self.assertIsNone(self._environ.get("CONTENT_LENGTH"))
        self.assertNotIn("HTTP_CONTENT_ENCODING", self._environ)

    def test_call(self) -> None:
        self._test_call(self.example_text, *self._get_zstream(self.example_text))

    def test_call_no_seek(self) -> None:
        """This ensures that the gunzipping code doesn't require any methods on
        'wsgi.input' except for '.read()'.  (In particular, it shouldn't
        require '.seek()'. See https://github.com/jelmer/dulwich/issues/140.).
        """
        zstream, zlength = self._get_zstream(self.example_text)
        self._test_call(
            self.example_text,
            MinimalistWSGIInputStream(zstream.read()),
            zlength,
        )

    def test_call_no_working_seek(self) -> None:
        """Similar to 'test_call_no_seek', but this time the methods are available
        (but defunct).  See https://github.com/jonashaag/klaus/issues/154.
        """
        zstream, zlength = self._get_zstream(self.example_text)
        self._test_call(
            self.example_text,
            MinimalistWSGIInputStream2(zstream.read()),
            zlength,
        )