test_web.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. # test_web.py -- Tests for the git HTTP server
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the Git HTTP server."""
  22. import gzip
  23. import logging
  24. import os
  25. import re
  26. from io import BytesIO
  27. from typing import NoReturn
  28. from dulwich.object_store import MemoryObjectStore
  29. from dulwich.objects import Blob
  30. from dulwich.repo import BaseRepo, MemoryRepo
  31. from dulwich.server import DictBackend
  32. from dulwich.tests.utils import make_object, make_tag
  33. from dulwich.web import (
  34. HTTP_ERROR,
  35. HTTP_FORBIDDEN,
  36. HTTP_NOT_FOUND,
  37. HTTP_OK,
  38. GunzipFilter,
  39. HTTPGitApplication,
  40. HTTPGitRequest,
  41. _LengthLimitedFile,
  42. get_idx_file,
  43. get_info_packs,
  44. get_info_refs,
  45. get_loose_object,
  46. get_pack_file,
  47. get_text_file,
  48. handle_service_request,
  49. send_file,
  50. )
  51. from . import TestCase
  class MinimalistWSGIInputStream:
      """WSGI input stream with no 'seek()' and 'tell()' methods.

      Only 'read()' is offered, so consumers of this stream cannot depend on
      being able to reposition it.
      """

      def __init__(self, data) -> None:
          # 'pos' is the offset at which the next read() will start.
          self.pos = 0
          self.data = data

      def read(self, howmuch):
          """Return up to 'howmuch' bytes, or b"" once the data is used up."""
          begin = self.pos
          stop = begin + howmuch
          if begin < len(self.data):
              # The cursor moves by the requested amount even when fewer bytes
              # remain, so it may end up beyond the end of the data.
              self.pos = stop
              return self.data[begin:stop]
          # Exhausted: the cursor is left where it was.
          return b""
  class MinimalistWSGIInputStream2(MinimalistWSGIInputStream):
      """WSGI input stream with no *working* 'seek()' and 'tell()' methods.

      Both methods exist, but calling either one always fails.
      """

      def tell(self) -> NoReturn:
          """Always raise NotImplementedError."""
          raise NotImplementedError

      def seek(self, pos) -> NoReturn:
          """Always raise NotImplementedError; 'pos' is ignored."""
          raise NotImplementedError
  class TestHTTPGitRequest(HTTPGitRequest):
      """HTTPGitRequest with overridden methods to help test caching.

      Instead of emitting cache headers, the overrides record the requested
      policy in 'self.cached': None until a policy is chosen, then False
      (nocache) or True (cache_forever).
      """

      def __init__(self, *args, **kwargs) -> None:
          super().__init__(*args, **kwargs)
          self.cached = None

      def cache_forever(self) -> None:
          """Record that permanent caching was requested."""
          self.cached = True

      def nocache(self) -> None:
          """Record that caching was disabled."""
          self.cached = False
  class WebTestCase(TestCase):
      """Base TestCase with useful instance vars and utility functions.

      setUp() builds a request object around a WSGI-style 'start_response'
      callback that records the status, the headers and any written output
      on the test instance.
      """

      # Request class instantiated in setUp(); subclasses may replace it.
      _req_class: type[HTTPGitRequest] = TestHTTPGitRequest

      def setUp(self) -> None:
          super().setUp()
          # Suppress expected error logging during web tests; the level that
          # was in effect beforehand is restored on cleanup.
          log = logging.getLogger("dulwich.web")
          self.addCleanup(log.setLevel, log.level)
          log.setLevel(logging.CRITICAL)

          self._environ = {}
          handlers = self._handlers()
          self._req = self._req_class(
              self._environ,
              self._start_response,
              handlers=handlers,
          )
          self._status = None
          self._headers = []
          self._output = BytesIO()

      def _start_response(self, status, headers):
          """Record status and headers; return the output buffer's write method."""
          self._status = status
          self._headers = list(headers)
          return self._output.write

      def _handlers(self) -> None:
          """Return the 'handlers' argument for the request (None here)."""
          return None

      def assertContentTypeEquals(self, expected) -> None:
          """Assert that a Content-Type header with value 'expected' was recorded."""
          header = ("Content-Type", expected)
          self.assertIn(header, self._headers)
  def _test_backend(objects, refs=None, named_files=None):
      """Build a DictBackend serving one bare in-memory repository at "/".

      Args:
        objects: objects passed to MemoryRepo.init_bare.
        refs: optional refs mapping; anything falsy is replaced by {}.
        named_files: optional mapping of path -> contents stored in the
          repository via _put_named_file; anything falsy is replaced by {}.
      Returns: a DictBackend mapping "/" to the new repository.
      """
      refs = refs or {}
      named_files = named_files or {}
      repo = MemoryRepo.init_bare(objects, refs)
      for name in named_files:
          repo._put_named_file(name, named_files[name])
      return DictBackend({"/": repo})
  class DumbHandlersTestCase(WebTestCase):
      """Tests for the file-serving ("dumb") handlers imported from dulwich.web."""

      def test_send_file_not_found(self) -> None:
          # No file object to serve: the request must end up as not-found.
          chunks = send_file(self._req, None, "text/plain")
          list(chunks)
          self.assertEqual(HTTP_NOT_FOUND, self._status)

      def test_send_file(self) -> None:
          stream = BytesIO(b"foobar")
          body = b"".join(send_file(self._req, stream, "some/thing"))
          self.assertEqual(b"foobar", body)
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("some/thing")
          self.assertTrue(stream.closed)

      def test_send_file_buffered(self) -> None:
          # Two chunks of 10240 bytes are expected to come back one by one.
          chunk = b"x" * 10240
          stream = BytesIO(chunk + chunk)
          served = list(send_file(self._req, stream, "some/thing"))
          self.assertEqual([chunk, chunk], served)
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("some/thing")
          self.assertTrue(stream.closed)

      def test_send_file_error(self) -> None:
          class _FailingFile:
              """File stand-in whose read() always raises the given exception."""

              def __init__(self, exc_class) -> None:
                  self._exc_class = exc_class
                  self.closed = False

              def read(self, size=-1) -> NoReturn:
                  raise self._exc_class

              def close(self) -> None:
                  self.closed = True

          broken = _FailingFile(IOError)
          list(send_file(self._req, broken, "some/thing"))
          self.assertEqual(HTTP_ERROR, self._status)
          self.assertTrue(broken.closed)
          self.assertFalse(self._req.cached)

          # non-IOErrors are reraised
          broken = _FailingFile(AttributeError)
          pending = send_file(self._req, broken, "some/thing")
          self.assertRaises(AttributeError, list, pending)
          self.assertTrue(broken.closed)
          self.assertFalse(self._req.cached)

      def test_get_text_file(self) -> None:
          backend = _test_backend([], named_files={"description": b"foo"})
          mat = re.search(".*", "description")
          body = b"".join(get_text_file(self._req, backend, mat))
          self.assertEqual(b"foo", body)
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("text/plain")
          self.assertFalse(self._req.cached)

      def test_get_loose_object(self) -> None:
          blob = make_object(Blob, data=b"foo")
          hex_sha = blob.id.decode("ascii")
          backend = _test_backend([blob])
          # Split the hex id into its 2-character and 38-character parts.
          mat = re.search("^(..)(.{38})$", hex_sha)
          body = b"".join(get_loose_object(self._req, backend, mat))
          self.assertEqual(blob.as_legacy_object(), body)
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("application/x-git-loose-object")
          self.assertTrue(self._req.cached)

      def test_get_loose_object_missing(self) -> None:
          empty_backend = _test_backend([])
          mat = re.search("^(..)(.{38})$", "1" * 40)
          list(get_loose_object(self._req, empty_backend, mat))
          self.assertEqual(HTTP_NOT_FOUND, self._status)

      def test_get_loose_object_error(self) -> None:
          blob = make_object(Blob, data=b"foo")
          backend = _test_backend([blob])
          mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))

          def _raise_os_error(self) -> NoReturn:
              raise OSError

          # Swap in the failing serializer for this test only; the real
          # method is put back on cleanup.
          self.addCleanup(setattr, Blob, "as_legacy_object", Blob.as_legacy_object)
          Blob.as_legacy_object = _raise_os_error
          list(get_loose_object(self._req, backend, mat))
          self.assertEqual(HTTP_ERROR, self._status)

      def test_get_pack_file(self) -> None:
          sha = "1" * 40
          pack_name = os.path.join("objects", "pack", "pack-%s.pack" % sha)
          backend = _test_backend([], named_files={pack_name: b"pack contents"})
          mat = re.search(".*", pack_name)
          body = b"".join(get_pack_file(self._req, backend, mat))
          self.assertEqual(b"pack contents", body)
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("application/x-git-packed-objects")
          self.assertTrue(self._req.cached)

      def test_get_idx_file(self) -> None:
          sha = "1" * 40
          idx_name = os.path.join("objects", "pack", "pack-%s.idx" % sha)
          backend = _test_backend([], named_files={idx_name: b"idx contents"})
          mat = re.search(".*", idx_name)
          body = b"".join(get_idx_file(self._req, backend, mat))
          self.assertEqual(b"idx contents", body)
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("application/x-git-packed-objects-toc")
          self.assertTrue(self._req.cached)

      def test_get_info_refs(self) -> None:
          self._environ["QUERY_STRING"] = ""

          blob1 = make_object(Blob, data=b"1")
          blob2 = make_object(Blob, data=b"2")
          blob3 = make_object(Blob, data=b"3")
          tag1 = make_tag(blob2, name=b"tag-tag")

          refs = {
              b"HEAD": b"000",
              b"refs/heads/master": blob1.id,
              b"refs/tags/tag-tag": tag1.id,
              b"refs/tags/blob-tag": blob3.id,
          }
          backend = _test_backend([blob1, blob2, blob3, tag1], refs=refs)

          mat = re.search(".*", "//info/refs")
          # HEAD is absent from the expected output, and the tag object is
          # followed by a "^{}" line naming the blob it was made from.
          expected = [
              blob1.id + b"\trefs/heads/master\n",
              blob3.id + b"\trefs/tags/blob-tag\n",
              tag1.id + b"\trefs/tags/tag-tag\n",
              blob2.id + b"\trefs/tags/tag-tag^{}\n",
          ]
          self.assertEqual(expected, list(get_info_refs(self._req, backend, mat)))
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("text/plain")
          self.assertFalse(self._req.cached)

      def test_get_info_refs_not_found(self) -> None:
          self._environ["QUERY_STRING"] = ""

          backend = _test_backend([], refs={})

          mat = re.search("info/refs", "/foo/info/refs")
          response = list(get_info_refs(self._req, backend, mat))
          self.assertEqual([b"No git repository was found at /foo"], response)
          self.assertEqual(HTTP_NOT_FOUND, self._status)
          self.assertContentTypeEquals("text/plain")

      def test_get_info_packs(self) -> None:
          class TestPackData:
              def __init__(self, sha) -> None:
                  self.filename = f"pack-{sha}.pack"

          class TestPack:
              def __init__(self, sha) -> None:
                  self.data = TestPackData(sha)

          fake_packs = [TestPack(digit * 40) for digit in "123"]

          class TestObjectStore(MemoryObjectStore):
              # property must be overridden, can't be assigned
              @property
              def packs(self):
                  return fake_packs

          repo = BaseRepo(TestObjectStore(), None)
          backend = DictBackend({"/": repo})
          mat = re.search(".*", "//info/packs")
          body = b"".join(get_info_packs(self._req, backend, mat))

          expected = b"".join(
              b"P pack-" + digit * 40 + b".pack\n" for digit in (b"1", b"2", b"3")
          )
          self.assertEqual(expected, body)
          self.assertEqual(HTTP_OK, self._status)
          self.assertContentTypeEquals("text/plain")
          self.assertFalse(self._req.cached)
  class SmartHandlersTestCase(WebTestCase):
      """Tests for handle_service_request and get_info_refs with a stub handler."""

      class _TestUploadPackHandler:
          """Stub handler that records its arguments and echoes what it receives."""

          def __init__(
              self,
              backend,
              args,
              proto,
              stateless_rpc=None,
              advertise_refs=False,
          ) -> None:
              # 'backend' is accepted but not stored.
              self.args = args
              self.proto = proto
              self.stateless_rpc = stateless_rpc
              self.advertise_refs = advertise_refs

          def handle(self) -> None:
              received = self.proto.recv(1024)
              self.proto.write(b"handled input: " + received)

      def _make_handler(self, *args, **kwargs):
          """Create the stub handler, keeping it on 'self._handler' for assertions."""
          handler = self._TestUploadPackHandler(*args, **kwargs)
          self._handler = handler
          return handler

      def _handlers(self):
          """Map the upload-pack service name to the stub handler factory."""
          return {b"git-upload-pack": self._make_handler}

      def test_handle_service_request_unknown(self) -> None:
          mat = re.search(".*", "/git-evil-handler")
          chunks = list(handle_service_request(self._req, "backend", mat))
          self.assertEqual(HTTP_FORBIDDEN, self._status)
          # The unknown service name must not be echoed back in the body.
          self.assertNotIn(b"git-evil-handler", b"".join(chunks))
          self.assertFalse(self._req.cached)

      def _run_handle_service_request(self, content_length=None) -> None:
          """Run a git-upload-pack request with body b"foo" and check the result.

          When 'content_length' is not None it is stored as CONTENT_LENGTH in
          the environ before the request runs.
          """

          class Backend:
              def open_repository(self, path) -> None:
                  return None

          self._environ["wsgi.input"] = BytesIO(b"foo")
          if content_length is not None:
              self._environ["CONTENT_LENGTH"] = content_length
          mat = re.search(".*", "/git-upload-pack")

          returned = b"".join(handle_service_request(self._req, Backend(), mat))
          written = self._output.getvalue()
          # Ensure all output was written via the write callback.
          self.assertEqual(b"", returned)
          self.assertEqual(b"handled input: foo", written)
          self.assertContentTypeEquals("application/x-git-upload-pack-result")
          self.assertFalse(self._handler.advertise_refs)
          self.assertTrue(self._handler.stateless_rpc)
          self.assertFalse(self._req.cached)

      def test_handle_service_request(self) -> None:
          self._run_handle_service_request()

      def test_handle_service_request_with_length(self) -> None:
          self._run_handle_service_request(content_length="3")

      def test_handle_service_request_empty_length(self) -> None:
          self._run_handle_service_request(content_length="")

      def test_get_info_refs_unknown(self) -> None:
          self._environ["QUERY_STRING"] = "service=git-evil-handler"

          class Backend:
              def open_repository(self, url) -> None:
                  return None

          mat = re.search(".*", "/git-evil-pack")
          chunks = list(get_info_refs(self._req, Backend(), mat))
          # The unknown service name must not be echoed back in the body.
          self.assertNotIn(b"git-evil-handler", b"".join(chunks))
          self.assertEqual(HTTP_FORBIDDEN, self._status)
          self.assertFalse(self._req.cached)

      def test_get_info_refs(self) -> None:
          self._environ["wsgi.input"] = BytesIO(b"foo")
          self._environ["QUERY_STRING"] = "service=git-upload-pack"

          class Backend:
              def open_repository(self, url) -> None:
                  return None

          mat = re.search(".*", "/git-upload-pack")
          returned = b"".join(get_info_refs(self._req, Backend(), mat))
          written = self._output.getvalue()
          expected = (
              b"001e# service=git-upload-pack\n"
              b"0000"
              # input is ignored by the handler
              b"handled input: "
          )
          self.assertEqual(expected, written)
          # Ensure all output was written via the write callback.
          self.assertEqual(b"", returned)
          self.assertTrue(self._handler.advertise_refs)
          self.assertTrue(self._handler.stateless_rpc)
          self.assertFalse(self._req.cached)
  class LengthLimitedFileTestCase(TestCase):
      """Tests for _LengthLimitedFile reading from a b"foobar" stream."""

      def test_no_cutoff(self) -> None:
          # Limit larger than the data: everything is returned.
          limited = _LengthLimitedFile(BytesIO(b"foobar"), 1024)
          self.assertEqual(b"foobar", limited.read())

      def test_cutoff(self) -> None:
          # Limit of 3: only the first three bytes come back, then nothing.
          limited = _LengthLimitedFile(BytesIO(b"foobar"), 3)
          self.assertEqual(b"foo", limited.read())
          self.assertEqual(b"", limited.read())

      def test_multiple_reads(self) -> None:
          # The limit of 3 spans successive reads: 2 bytes, then 1, then none.
          limited = _LengthLimitedFile(BytesIO(b"foobar"), 3)
          first = limited.read(2)
          self.assertEqual(b"fo", first)
          second = limited.read(2)
          self.assertEqual(b"o", second)
          self.assertEqual(b"", limited.read())
  class HTTPGitRequestTestCase(WebTestCase):
      """Tests of the contents of the actual cache headers.

      Uses the real HTTPGitRequest rather than the recording test subclass.
      """

      _req_class = HTTPGitRequest

      def test_not_found(self) -> None:
          self._req.cache_forever()  # cache headers should be discarded
          message = "Something not found"
          body = self._req.not_found(message)
          self.assertEqual(message.encode("ascii"), body)
          self.assertEqual(HTTP_NOT_FOUND, self._status)
          self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))

      def test_forbidden(self) -> None:
          self._req.cache_forever()  # cache headers should be discarded
          message = "Something not found"
          body = self._req.forbidden(message)
          self.assertEqual(message.encode("ascii"), body)
          self.assertEqual(HTTP_FORBIDDEN, self._status)
          self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))

      def test_respond_ok(self) -> None:
          # respond() without arguments: OK status and no headers at all.
          self._req.respond()
          self.assertEqual([], self._headers)
          self.assertEqual(HTTP_OK, self._status)

      def test_respond(self) -> None:
          self._req.nocache()
          self._req.respond(
              status=402,
              content_type="some/type",
              headers=[("X-Foo", "foo"), ("X-Bar", "bar")],
          )
          # Header order is not checked, only the set of headers.
          expected_headers = {
              ("X-Foo", "foo"),
              ("X-Bar", "bar"),
              ("Content-Type", "some/type"),
              ("Expires", "Fri, 01 Jan 1980 00:00:00 GMT"),
              ("Pragma", "no-cache"),
              ("Cache-Control", "no-cache, max-age=0, must-revalidate"),
          }
          self.assertEqual(expected_headers, set(self._headers))
          self.assertEqual(402, self._status)
  class HTTPGitApplicationTestCase(TestCase):
      """Tests for HTTPGitApplication dispatching to handlers and the fallback app."""

      def setUp(self) -> None:
          super().setUp()
          self._app = HTTPGitApplication("backend")
          self._environ = {"PATH_INFO": "/foo", "REQUEST_METHOD": "GET"}

      def _test_handler(self, req, backend, mat) -> str:
          """Stand-in handler that checks its arguments and returns "output"."""
          # tests interface used by all handlers
          self.assertEqual(self._environ, req.environ)
          self.assertEqual("backend", backend)
          self.assertEqual("/foo", mat.group(0))
          return "output"

      def _add_handler(self, app) -> None:
          """Make '/foo' under the current request method the only service of 'app'."""
          method = self._environ["REQUEST_METHOD"]
          route = (method, re.compile("/foo$"))
          app.services = {route: self._test_handler}

      def test_call(self) -> None:
          self._add_handler(self._app)
          result = self._app(self._environ, None)
          self.assertEqual("output", result)

      def test_fallback_app(self) -> None:
          def test_app(environ, start_response) -> str:
              return "output"

          # No handler is registered for /foo here; the result must be the
          # fallback app's return value.
          app = HTTPGitApplication("backend", fallback_app=test_app)
          result = app(self._environ, None)
          self.assertEqual("output", result)
  class GunzipTestCase(HTTPGitApplicationTestCase):
      # The class documentation doubles as the payload that gets gzipped
      # (see 'example_text'), so its exact bytes matter to _test_call.
      # NOTE(review): the continuation lines are assumed to carry the usual
      # 4-space class-body indentation -- confirm against the pristine file.
      __doc__ = (
          "TestCase for testing the GunzipFilter, ensuring the wsgi.input\n"
          "    is correctly decompressed and headers are corrected.\n"
          "    "
      )
      example_text = __doc__.encode("ascii")

      def setUp(self) -> None:
          super().setUp()
          # Wrap the application built by the parent setUp in the filter and
          # turn the request into a gzip-encoded POST.
          self._app = GunzipFilter(self._app)
          self._environ["HTTP_CONTENT_ENCODING"] = "gzip"
          self._environ["REQUEST_METHOD"] = "POST"

      def _get_zstream(self, text):
          """Gzip 'text'; return (rewound BytesIO of the result, compressed length)."""
          compressed = BytesIO()
          with gzip.GzipFile(fileobj=compressed, mode="wb") as gz:
              gz.write(text)
          size = compressed.tell()
          compressed.seek(0)
          return compressed, size

      def _test_call(self, orig, zstream, zlength) -> None:
          """Send 'zstream' through the filter and check the rewritten environ.

          Args:
            orig: the uncompressed bytes expected back from wsgi.input.
            zstream: stream holding the gzip-compressed form of 'orig'.
            zlength: length of the compressed data.
          """
          self._add_handler(self._app.app)
          self.assertLess(zlength, len(orig))
          self.assertEqual(self._environ["HTTP_CONTENT_ENCODING"], "gzip")
          self._environ["CONTENT_LENGTH"] = zlength
          self._environ["wsgi.input"] = zstream
          self._app(self._environ, None)
          # After the call, wsgi.input must be a different stream yielding the
          # original text, with the length and encoding entries gone.
          replaced = self._environ["wsgi.input"]
          self.assertIsNot(replaced, zstream)
          replaced.seek(0)
          self.assertEqual(orig, replaced.read())
          self.assertIs(None, self._environ.get("CONTENT_LENGTH"))
          self.assertNotIn("HTTP_CONTENT_ENCODING", self._environ)

      def test_call(self) -> None:
          zstream, zlength = self._get_zstream(self.example_text)
          self._test_call(self.example_text, zstream, zlength)

      def test_call_no_seek(self) -> None:
          """This ensures that the gunzipping code doesn't require any methods on
          'wsgi.input' except for '.read()'. (In particular, it shouldn't
          require '.seek()'. See https://github.com/jelmer/dulwich/issues/140.).
          """
          zstream, zlength = self._get_zstream(self.example_text)
          read_only_stream = MinimalistWSGIInputStream(zstream.read())
          self._test_call(self.example_text, read_only_stream, zlength)

      def test_call_no_working_seek(self) -> None:
          """Similar to 'test_call_no_seek', but this time the methods are available
          (but defunct). See https://github.com/jonashaag/klaus/issues/154.
          """
          zstream, zlength = self._get_zstream(self.example_text)
          defunct_seek_stream = MinimalistWSGIInputStream2(zstream.read())
          self._test_call(self.example_text, defunct_seek_stream, zlength)
  478. )