2
0

test_web.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. # test_web.py -- Tests for the git HTTP server
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
  5. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  6. # General Public License as published by the Free Software Foundation; version 2.0
  7. # or (at your option) any later version. You can redistribute it and/or
  8. # modify it under the terms of either of these two licenses.
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. # You should have received a copy of the licenses; if not, see
  17. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  18. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  19. # License, Version 2.0.
  20. #
  21. """Tests for the Git HTTP server."""
  22. import gzip
  23. import os
  24. import re
  25. from io import BytesIO
  26. from typing import NoReturn
  27. from dulwich.object_store import MemoryObjectStore
  28. from dulwich.objects import Blob
  29. from dulwich.repo import BaseRepo, MemoryRepo
  30. from dulwich.server import DictBackend
  31. from dulwich.tests.utils import make_object, make_tag
  32. from dulwich.web import (
  33. HTTP_ERROR,
  34. HTTP_FORBIDDEN,
  35. HTTP_NOT_FOUND,
  36. HTTP_OK,
  37. GunzipFilter,
  38. HTTPGitApplication,
  39. HTTPGitRequest,
  40. _LengthLimitedFile,
  41. get_idx_file,
  42. get_info_packs,
  43. get_info_refs,
  44. get_loose_object,
  45. get_pack_file,
  46. get_text_file,
  47. handle_service_request,
  48. send_file,
  49. )
  50. from . import TestCase
  51. class MinimalistWSGIInputStream:
  52. """WSGI input stream with no 'seek()' and 'tell()' methods."""
  53. def __init__(self, data) -> None:
  54. self.data = data
  55. self.pos = 0
  56. def read(self, howmuch):
  57. start = self.pos
  58. end = self.pos + howmuch
  59. if start >= len(self.data):
  60. return b""
  61. self.pos = end
  62. return self.data[start:end]
  63. class MinimalistWSGIInputStream2(MinimalistWSGIInputStream):
  64. """WSGI input stream with no *working* 'seek()' and 'tell()' methods."""
  65. def seek(self, pos) -> NoReturn:
  66. raise NotImplementedError
  67. def tell(self) -> NoReturn:
  68. raise NotImplementedError
  69. class TestHTTPGitRequest(HTTPGitRequest):
  70. """HTTPGitRequest with overridden methods to help test caching."""
  71. def __init__(self, *args, **kwargs) -> None:
  72. HTTPGitRequest.__init__(self, *args, **kwargs)
  73. self.cached = None
  74. def nocache(self) -> None:
  75. self.cached = False
  76. def cache_forever(self) -> None:
  77. self.cached = True
  78. class WebTestCase(TestCase):
  79. """Base TestCase with useful instance vars and utility functions."""
  80. _req_class: type[HTTPGitRequest] = TestHTTPGitRequest
  81. def setUp(self) -> None:
  82. super().setUp()
  83. self._environ = {}
  84. self._req = self._req_class(
  85. self._environ, self._start_response, handlers=self._handlers()
  86. )
  87. self._status = None
  88. self._headers = []
  89. self._output = BytesIO()
  90. def _start_response(self, status, headers):
  91. self._status = status
  92. self._headers = list(headers)
  93. return self._output.write
  94. def _handlers(self) -> None:
  95. return None
  96. def assertContentTypeEquals(self, expected) -> None:
  97. self.assertIn(("Content-Type", expected), self._headers)
  98. def _test_backend(objects, refs=None, named_files=None):
  99. if not refs:
  100. refs = {}
  101. if not named_files:
  102. named_files = {}
  103. repo = MemoryRepo.init_bare(objects, refs)
  104. for path, contents in named_files.items():
  105. repo._put_named_file(path, contents)
  106. return DictBackend({"/": repo})
  107. class DumbHandlersTestCase(WebTestCase):
  108. def test_send_file_not_found(self) -> None:
  109. list(send_file(self._req, None, "text/plain"))
  110. self.assertEqual(HTTP_NOT_FOUND, self._status)
  111. def test_send_file(self) -> None:
  112. f = BytesIO(b"foobar")
  113. output = b"".join(send_file(self._req, f, "some/thing"))
  114. self.assertEqual(b"foobar", output)
  115. self.assertEqual(HTTP_OK, self._status)
  116. self.assertContentTypeEquals("some/thing")
  117. self.assertTrue(f.closed)
  118. def test_send_file_buffered(self) -> None:
  119. bufsize = 10240
  120. xs = b"x" * bufsize
  121. f = BytesIO(2 * xs)
  122. self.assertEqual([xs, xs], list(send_file(self._req, f, "some/thing")))
  123. self.assertEqual(HTTP_OK, self._status)
  124. self.assertContentTypeEquals("some/thing")
  125. self.assertTrue(f.closed)
  126. def test_send_file_error(self) -> None:
  127. class TestFile:
  128. def __init__(self, exc_class) -> None:
  129. self.closed = False
  130. self._exc_class = exc_class
  131. def read(self, size=-1) -> NoReturn:
  132. raise self._exc_class
  133. def close(self) -> None:
  134. self.closed = True
  135. f = TestFile(IOError)
  136. list(send_file(self._req, f, "some/thing"))
  137. self.assertEqual(HTTP_ERROR, self._status)
  138. self.assertTrue(f.closed)
  139. self.assertFalse(self._req.cached)
  140. # non-IOErrors are reraised
  141. f = TestFile(AttributeError)
  142. self.assertRaises(AttributeError, list, send_file(self._req, f, "some/thing"))
  143. self.assertTrue(f.closed)
  144. self.assertFalse(self._req.cached)
  145. def test_get_text_file(self) -> None:
  146. backend = _test_backend([], named_files={"description": b"foo"})
  147. mat = re.search(".*", "description")
  148. output = b"".join(get_text_file(self._req, backend, mat))
  149. self.assertEqual(b"foo", output)
  150. self.assertEqual(HTTP_OK, self._status)
  151. self.assertContentTypeEquals("text/plain")
  152. self.assertFalse(self._req.cached)
  153. def test_get_loose_object(self) -> None:
  154. blob = make_object(Blob, data=b"foo")
  155. backend = _test_backend([blob])
  156. mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))
  157. output = b"".join(get_loose_object(self._req, backend, mat))
  158. self.assertEqual(blob.as_legacy_object(), output)
  159. self.assertEqual(HTTP_OK, self._status)
  160. self.assertContentTypeEquals("application/x-git-loose-object")
  161. self.assertTrue(self._req.cached)
  162. def test_get_loose_object_missing(self) -> None:
  163. mat = re.search("^(..)(.{38})$", "1" * 40)
  164. list(get_loose_object(self._req, _test_backend([]), mat))
  165. self.assertEqual(HTTP_NOT_FOUND, self._status)
  166. def test_get_loose_object_error(self) -> None:
  167. blob = make_object(Blob, data=b"foo")
  168. backend = _test_backend([blob])
  169. mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))
  170. def as_legacy_object_error(self) -> NoReturn:
  171. raise OSError
  172. self.addCleanup(setattr, Blob, "as_legacy_object", Blob.as_legacy_object)
  173. Blob.as_legacy_object = as_legacy_object_error
  174. list(get_loose_object(self._req, backend, mat))
  175. self.assertEqual(HTTP_ERROR, self._status)
  176. def test_get_pack_file(self) -> None:
  177. pack_name = os.path.join("objects", "pack", "pack-%s.pack" % ("1" * 40))
  178. backend = _test_backend([], named_files={pack_name: b"pack contents"})
  179. mat = re.search(".*", pack_name)
  180. output = b"".join(get_pack_file(self._req, backend, mat))
  181. self.assertEqual(b"pack contents", output)
  182. self.assertEqual(HTTP_OK, self._status)
  183. self.assertContentTypeEquals("application/x-git-packed-objects")
  184. self.assertTrue(self._req.cached)
  185. def test_get_idx_file(self) -> None:
  186. idx_name = os.path.join("objects", "pack", "pack-%s.idx" % ("1" * 40))
  187. backend = _test_backend([], named_files={idx_name: b"idx contents"})
  188. mat = re.search(".*", idx_name)
  189. output = b"".join(get_idx_file(self._req, backend, mat))
  190. self.assertEqual(b"idx contents", output)
  191. self.assertEqual(HTTP_OK, self._status)
  192. self.assertContentTypeEquals("application/x-git-packed-objects-toc")
  193. self.assertTrue(self._req.cached)
  194. def test_get_info_refs(self) -> None:
  195. self._environ["QUERY_STRING"] = ""
  196. blob1 = make_object(Blob, data=b"1")
  197. blob2 = make_object(Blob, data=b"2")
  198. blob3 = make_object(Blob, data=b"3")
  199. tag1 = make_tag(blob2, name=b"tag-tag")
  200. objects = [blob1, blob2, blob3, tag1]
  201. refs = {
  202. b"HEAD": b"000",
  203. b"refs/heads/master": blob1.id,
  204. b"refs/tags/tag-tag": tag1.id,
  205. b"refs/tags/blob-tag": blob3.id,
  206. }
  207. backend = _test_backend(objects, refs=refs)
  208. mat = re.search(".*", "//info/refs")
  209. self.assertEqual(
  210. [
  211. blob1.id + b"\trefs/heads/master\n",
  212. blob3.id + b"\trefs/tags/blob-tag\n",
  213. tag1.id + b"\trefs/tags/tag-tag\n",
  214. blob2.id + b"\trefs/tags/tag-tag^{}\n",
  215. ],
  216. list(get_info_refs(self._req, backend, mat)),
  217. )
  218. self.assertEqual(HTTP_OK, self._status)
  219. self.assertContentTypeEquals("text/plain")
  220. self.assertFalse(self._req.cached)
  221. def test_get_info_refs_not_found(self) -> None:
  222. self._environ["QUERY_STRING"] = ""
  223. objects = []
  224. refs = {}
  225. backend = _test_backend(objects, refs=refs)
  226. mat = re.search("info/refs", "/foo/info/refs")
  227. self.assertEqual(
  228. [b"No git repository was found at /foo"],
  229. list(get_info_refs(self._req, backend, mat)),
  230. )
  231. self.assertEqual(HTTP_NOT_FOUND, self._status)
  232. self.assertContentTypeEquals("text/plain")
  233. def test_get_info_packs(self) -> None:
  234. class TestPackData:
  235. def __init__(self, sha) -> None:
  236. self.filename = f"pack-{sha}.pack"
  237. class TestPack:
  238. def __init__(self, sha) -> None:
  239. self.data = TestPackData(sha)
  240. packs = [TestPack(str(i) * 40) for i in range(1, 4)]
  241. class TestObjectStore(MemoryObjectStore):
  242. # property must be overridden, can't be assigned
  243. @property
  244. def packs(self):
  245. return packs
  246. store = TestObjectStore()
  247. repo = BaseRepo(store, None)
  248. backend = DictBackend({"/": repo})
  249. mat = re.search(".*", "//info/packs")
  250. output = b"".join(get_info_packs(self._req, backend, mat))
  251. expected = b"".join(
  252. [(b"P pack-" + s + b".pack\n") for s in [b"1" * 40, b"2" * 40, b"3" * 40]]
  253. )
  254. self.assertEqual(expected, output)
  255. self.assertEqual(HTTP_OK, self._status)
  256. self.assertContentTypeEquals("text/plain")
  257. self.assertFalse(self._req.cached)
  258. class SmartHandlersTestCase(WebTestCase):
  259. class _TestUploadPackHandler:
  260. def __init__(
  261. self,
  262. backend,
  263. args,
  264. proto,
  265. stateless_rpc=None,
  266. advertise_refs=False,
  267. ) -> None:
  268. self.args = args
  269. self.proto = proto
  270. self.stateless_rpc = stateless_rpc
  271. self.advertise_refs = advertise_refs
  272. def handle(self) -> None:
  273. self.proto.write(b"handled input: " + self.proto.recv(1024))
  274. def _make_handler(self, *args, **kwargs):
  275. self._handler = self._TestUploadPackHandler(*args, **kwargs)
  276. return self._handler
  277. def _handlers(self):
  278. return {b"git-upload-pack": self._make_handler}
  279. def test_handle_service_request_unknown(self) -> None:
  280. mat = re.search(".*", "/git-evil-handler")
  281. content = list(handle_service_request(self._req, "backend", mat))
  282. self.assertEqual(HTTP_FORBIDDEN, self._status)
  283. self.assertNotIn(b"git-evil-handler", b"".join(content))
  284. self.assertFalse(self._req.cached)
  285. def _run_handle_service_request(self, content_length=None) -> None:
  286. self._environ["wsgi.input"] = BytesIO(b"foo")
  287. if content_length is not None:
  288. self._environ["CONTENT_LENGTH"] = content_length
  289. mat = re.search(".*", "/git-upload-pack")
  290. class Backend:
  291. def open_repository(self, path) -> None:
  292. return None
  293. handler_output = b"".join(handle_service_request(self._req, Backend(), mat))
  294. write_output = self._output.getvalue()
  295. # Ensure all output was written via the write callback.
  296. self.assertEqual(b"", handler_output)
  297. self.assertEqual(b"handled input: foo", write_output)
  298. self.assertContentTypeEquals("application/x-git-upload-pack-result")
  299. self.assertFalse(self._handler.advertise_refs)
  300. self.assertTrue(self._handler.stateless_rpc)
  301. self.assertFalse(self._req.cached)
  302. def test_handle_service_request(self) -> None:
  303. self._run_handle_service_request()
  304. def test_handle_service_request_with_length(self) -> None:
  305. self._run_handle_service_request(content_length="3")
  306. def test_handle_service_request_empty_length(self) -> None:
  307. self._run_handle_service_request(content_length="")
  308. def test_get_info_refs_unknown(self) -> None:
  309. self._environ["QUERY_STRING"] = "service=git-evil-handler"
  310. class Backend:
  311. def open_repository(self, url) -> None:
  312. return None
  313. mat = re.search(".*", "/git-evil-pack")
  314. content = list(get_info_refs(self._req, Backend(), mat))
  315. self.assertNotIn(b"git-evil-handler", b"".join(content))
  316. self.assertEqual(HTTP_FORBIDDEN, self._status)
  317. self.assertFalse(self._req.cached)
  318. def test_get_info_refs(self) -> None:
  319. self._environ["wsgi.input"] = BytesIO(b"foo")
  320. self._environ["QUERY_STRING"] = "service=git-upload-pack"
  321. class Backend:
  322. def open_repository(self, url) -> None:
  323. return None
  324. mat = re.search(".*", "/git-upload-pack")
  325. handler_output = b"".join(get_info_refs(self._req, Backend(), mat))
  326. write_output = self._output.getvalue()
  327. self.assertEqual(
  328. (
  329. b"001e# service=git-upload-pack\n"
  330. b"0000"
  331. # input is ignored by the handler
  332. b"handled input: "
  333. ),
  334. write_output,
  335. )
  336. # Ensure all output was written via the write callback.
  337. self.assertEqual(b"", handler_output)
  338. self.assertTrue(self._handler.advertise_refs)
  339. self.assertTrue(self._handler.stateless_rpc)
  340. self.assertFalse(self._req.cached)
  341. class LengthLimitedFileTestCase(TestCase):
  342. def test_no_cutoff(self) -> None:
  343. f = _LengthLimitedFile(BytesIO(b"foobar"), 1024)
  344. self.assertEqual(b"foobar", f.read())
  345. def test_cutoff(self) -> None:
  346. f = _LengthLimitedFile(BytesIO(b"foobar"), 3)
  347. self.assertEqual(b"foo", f.read())
  348. self.assertEqual(b"", f.read())
  349. def test_multiple_reads(self) -> None:
  350. f = _LengthLimitedFile(BytesIO(b"foobar"), 3)
  351. self.assertEqual(b"fo", f.read(2))
  352. self.assertEqual(b"o", f.read(2))
  353. self.assertEqual(b"", f.read())
  354. class HTTPGitRequestTestCase(WebTestCase):
  355. # This class tests the contents of the actual cache headers
  356. _req_class = HTTPGitRequest
  357. def test_not_found(self) -> None:
  358. self._req.cache_forever() # cache headers should be discarded
  359. message = "Something not found"
  360. self.assertEqual(message.encode("ascii"), self._req.not_found(message))
  361. self.assertEqual(HTTP_NOT_FOUND, self._status)
  362. self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))
  363. def test_forbidden(self) -> None:
  364. self._req.cache_forever() # cache headers should be discarded
  365. message = "Something not found"
  366. self.assertEqual(message.encode("ascii"), self._req.forbidden(message))
  367. self.assertEqual(HTTP_FORBIDDEN, self._status)
  368. self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))
  369. def test_respond_ok(self) -> None:
  370. self._req.respond()
  371. self.assertEqual([], self._headers)
  372. self.assertEqual(HTTP_OK, self._status)
  373. def test_respond(self) -> None:
  374. self._req.nocache()
  375. self._req.respond(
  376. status=402,
  377. content_type="some/type",
  378. headers=[("X-Foo", "foo"), ("X-Bar", "bar")],
  379. )
  380. self.assertEqual(
  381. {
  382. ("X-Foo", "foo"),
  383. ("X-Bar", "bar"),
  384. ("Content-Type", "some/type"),
  385. ("Expires", "Fri, 01 Jan 1980 00:00:00 GMT"),
  386. ("Pragma", "no-cache"),
  387. ("Cache-Control", "no-cache, max-age=0, must-revalidate"),
  388. },
  389. set(self._headers),
  390. )
  391. self.assertEqual(402, self._status)
  392. class HTTPGitApplicationTestCase(TestCase):
  393. def setUp(self) -> None:
  394. super().setUp()
  395. self._app = HTTPGitApplication("backend")
  396. self._environ = {
  397. "PATH_INFO": "/foo",
  398. "REQUEST_METHOD": "GET",
  399. }
  400. def _test_handler(self, req, backend, mat) -> str:
  401. # tests interface used by all handlers
  402. self.assertEqual(self._environ, req.environ)
  403. self.assertEqual("backend", backend)
  404. self.assertEqual("/foo", mat.group(0))
  405. return "output"
  406. def _add_handler(self, app) -> None:
  407. req = self._environ["REQUEST_METHOD"]
  408. app.services = {
  409. (req, re.compile("/foo$")): self._test_handler,
  410. }
  411. def test_call(self) -> None:
  412. self._add_handler(self._app)
  413. self.assertEqual("output", self._app(self._environ, None))
  414. def test_fallback_app(self) -> None:
  415. def test_app(environ, start_response) -> str:
  416. return "output"
  417. app = HTTPGitApplication("backend", fallback_app=test_app)
  418. self.assertEqual("output", app(self._environ, None))
  419. class GunzipTestCase(HTTPGitApplicationTestCase):
  420. __doc__ = """TestCase for testing the GunzipFilter, ensuring the wsgi.input
  421. is correctly decompressed and headers are corrected.
  422. """
  423. example_text = __doc__.encode("ascii")
  424. def setUp(self) -> None:
  425. super().setUp()
  426. self._app = GunzipFilter(self._app)
  427. self._environ["HTTP_CONTENT_ENCODING"] = "gzip"
  428. self._environ["REQUEST_METHOD"] = "POST"
  429. def _get_zstream(self, text):
  430. zstream = BytesIO()
  431. zfile = gzip.GzipFile(fileobj=zstream, mode="wb")
  432. zfile.write(text)
  433. zfile.close()
  434. zlength = zstream.tell()
  435. zstream.seek(0)
  436. return zstream, zlength
  437. def _test_call(self, orig, zstream, zlength) -> None:
  438. self._add_handler(self._app.app)
  439. self.assertLess(zlength, len(orig))
  440. self.assertEqual(self._environ["HTTP_CONTENT_ENCODING"], "gzip")
  441. self._environ["CONTENT_LENGTH"] = zlength
  442. self._environ["wsgi.input"] = zstream
  443. self._app(self._environ, None)
  444. buf = self._environ["wsgi.input"]
  445. self.assertIsNot(buf, zstream)
  446. buf.seek(0)
  447. self.assertEqual(orig, buf.read())
  448. self.assertIs(None, self._environ.get("CONTENT_LENGTH"))
  449. self.assertNotIn("HTTP_CONTENT_ENCODING", self._environ)
  450. def test_call(self) -> None:
  451. self._test_call(self.example_text, *self._get_zstream(self.example_text))
  452. def test_call_no_seek(self) -> None:
  453. """This ensures that the gunzipping code doesn't require any methods on
  454. 'wsgi.input' except for '.read()'. (In particular, it shouldn't
  455. require '.seek()'. See https://github.com/jelmer/dulwich/issues/140.).
  456. """
  457. zstream, zlength = self._get_zstream(self.example_text)
  458. self._test_call(
  459. self.example_text,
  460. MinimalistWSGIInputStream(zstream.read()),
  461. zlength,
  462. )
  463. def test_call_no_working_seek(self) -> None:
  464. """Similar to 'test_call_no_seek', but this time the methods are available
  465. (but defunct). See https://github.com/jonashaag/klaus/issues/154.
  466. """
  467. zstream, zlength = self._get_zstream(self.example_text)
  468. self._test_call(
  469. self.example_text,
  470. MinimalistWSGIInputStream2(zstream.read()),
  471. zlength,
  472. )