test_web.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. # test_web.py -- Tests for the git HTTP server
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
  5. # General Public License as published by the Free Software Foundation; version 2.0
  6. # or (at your option) any later version. You can redistribute it and/or
  7. # modify it under the terms of either of these two licenses.
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. #
  15. # You should have received a copy of the licenses; if not, see
  16. # <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
  17. # and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
  18. # License, Version 2.0.
  19. #
  20. """Tests for the Git HTTP server."""
  21. import gzip
  22. import os
  23. import re
  24. from io import BytesIO
  25. from typing import Type
  26. from dulwich.tests import TestCase
  27. from ..object_store import MemoryObjectStore
  28. from ..objects import Blob
  29. from ..repo import BaseRepo, MemoryRepo
  30. from ..server import DictBackend
  31. from ..web import (
  32. HTTP_ERROR,
  33. HTTP_FORBIDDEN,
  34. HTTP_NOT_FOUND,
  35. HTTP_OK,
  36. GunzipFilter,
  37. HTTPGitApplication,
  38. HTTPGitRequest,
  39. _LengthLimitedFile,
  40. get_idx_file,
  41. get_info_packs,
  42. get_info_refs,
  43. get_loose_object,
  44. get_pack_file,
  45. get_text_file,
  46. handle_service_request,
  47. send_file,
  48. )
  49. from .utils import make_object, make_tag
  50. class MinimalistWSGIInputStream:
  51. """WSGI input stream with no 'seek()' and 'tell()' methods."""
  52. def __init__(self, data) -> None:
  53. self.data = data
  54. self.pos = 0
  55. def read(self, howmuch):
  56. start = self.pos
  57. end = self.pos + howmuch
  58. if start >= len(self.data):
  59. return b""
  60. self.pos = end
  61. return self.data[start:end]
  62. class MinimalistWSGIInputStream2(MinimalistWSGIInputStream):
  63. """WSGI input stream with no *working* 'seek()' and 'tell()' methods."""
  64. def seek(self, pos):
  65. raise NotImplementedError
  66. def tell(self):
  67. raise NotImplementedError
  68. class TestHTTPGitRequest(HTTPGitRequest):
  69. """HTTPGitRequest with overridden methods to help test caching."""
  70. def __init__(self, *args, **kwargs) -> None:
  71. HTTPGitRequest.__init__(self, *args, **kwargs)
  72. self.cached = None
  73. def nocache(self):
  74. self.cached = False
  75. def cache_forever(self):
  76. self.cached = True
  77. class WebTestCase(TestCase):
  78. """Base TestCase with useful instance vars and utility functions."""
  79. _req_class: Type[HTTPGitRequest] = TestHTTPGitRequest
  80. def setUp(self):
  81. super().setUp()
  82. self._environ = {}
  83. self._req = self._req_class(
  84. self._environ, self._start_response, handlers=self._handlers()
  85. )
  86. self._status = None
  87. self._headers = []
  88. self._output = BytesIO()
  89. def _start_response(self, status, headers):
  90. self._status = status
  91. self._headers = list(headers)
  92. return self._output.write
  93. def _handlers(self):
  94. return None
  95. def assertContentTypeEquals(self, expected):
  96. self.assertIn(("Content-Type", expected), self._headers)
  97. def _test_backend(objects, refs=None, named_files=None):
  98. if not refs:
  99. refs = {}
  100. if not named_files:
  101. named_files = {}
  102. repo = MemoryRepo.init_bare(objects, refs)
  103. for path, contents in named_files.items():
  104. repo._put_named_file(path, contents)
  105. return DictBackend({"/": repo})
  106. class DumbHandlersTestCase(WebTestCase):
  107. def test_send_file_not_found(self):
  108. list(send_file(self._req, None, "text/plain"))
  109. self.assertEqual(HTTP_NOT_FOUND, self._status)
  110. def test_send_file(self):
  111. f = BytesIO(b"foobar")
  112. output = b"".join(send_file(self._req, f, "some/thing"))
  113. self.assertEqual(b"foobar", output)
  114. self.assertEqual(HTTP_OK, self._status)
  115. self.assertContentTypeEquals("some/thing")
  116. self.assertTrue(f.closed)
  117. def test_send_file_buffered(self):
  118. bufsize = 10240
  119. xs = b"x" * bufsize
  120. f = BytesIO(2 * xs)
  121. self.assertEqual([xs, xs], list(send_file(self._req, f, "some/thing")))
  122. self.assertEqual(HTTP_OK, self._status)
  123. self.assertContentTypeEquals("some/thing")
  124. self.assertTrue(f.closed)
  125. def test_send_file_error(self):
  126. class TestFile:
  127. def __init__(self, exc_class) -> None:
  128. self.closed = False
  129. self._exc_class = exc_class
  130. def read(self, size=-1):
  131. raise self._exc_class
  132. def close(self):
  133. self.closed = True
  134. f = TestFile(IOError)
  135. list(send_file(self._req, f, "some/thing"))
  136. self.assertEqual(HTTP_ERROR, self._status)
  137. self.assertTrue(f.closed)
  138. self.assertFalse(self._req.cached)
  139. # non-IOErrors are reraised
  140. f = TestFile(AttributeError)
  141. self.assertRaises(AttributeError, list, send_file(self._req, f, "some/thing"))
  142. self.assertTrue(f.closed)
  143. self.assertFalse(self._req.cached)
  144. def test_get_text_file(self):
  145. backend = _test_backend([], named_files={"description": b"foo"})
  146. mat = re.search(".*", "description")
  147. output = b"".join(get_text_file(self._req, backend, mat))
  148. self.assertEqual(b"foo", output)
  149. self.assertEqual(HTTP_OK, self._status)
  150. self.assertContentTypeEquals("text/plain")
  151. self.assertFalse(self._req.cached)
  152. def test_get_loose_object(self):
  153. blob = make_object(Blob, data=b"foo")
  154. backend = _test_backend([blob])
  155. mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))
  156. output = b"".join(get_loose_object(self._req, backend, mat))
  157. self.assertEqual(blob.as_legacy_object(), output)
  158. self.assertEqual(HTTP_OK, self._status)
  159. self.assertContentTypeEquals("application/x-git-loose-object")
  160. self.assertTrue(self._req.cached)
  161. def test_get_loose_object_missing(self):
  162. mat = re.search("^(..)(.{38})$", "1" * 40)
  163. list(get_loose_object(self._req, _test_backend([]), mat))
  164. self.assertEqual(HTTP_NOT_FOUND, self._status)
  165. def test_get_loose_object_error(self):
  166. blob = make_object(Blob, data=b"foo")
  167. backend = _test_backend([blob])
  168. mat = re.search("^(..)(.{38})$", blob.id.decode("ascii"))
  169. def as_legacy_object_error(self):
  170. raise OSError
  171. self.addCleanup(setattr, Blob, "as_legacy_object", Blob.as_legacy_object)
  172. Blob.as_legacy_object = as_legacy_object_error
  173. list(get_loose_object(self._req, backend, mat))
  174. self.assertEqual(HTTP_ERROR, self._status)
  175. def test_get_pack_file(self):
  176. pack_name = os.path.join("objects", "pack", "pack-%s.pack" % ("1" * 40))
  177. backend = _test_backend([], named_files={pack_name: b"pack contents"})
  178. mat = re.search(".*", pack_name)
  179. output = b"".join(get_pack_file(self._req, backend, mat))
  180. self.assertEqual(b"pack contents", output)
  181. self.assertEqual(HTTP_OK, self._status)
  182. self.assertContentTypeEquals("application/x-git-packed-objects")
  183. self.assertTrue(self._req.cached)
  184. def test_get_idx_file(self):
  185. idx_name = os.path.join("objects", "pack", "pack-%s.idx" % ("1" * 40))
  186. backend = _test_backend([], named_files={idx_name: b"idx contents"})
  187. mat = re.search(".*", idx_name)
  188. output = b"".join(get_idx_file(self._req, backend, mat))
  189. self.assertEqual(b"idx contents", output)
  190. self.assertEqual(HTTP_OK, self._status)
  191. self.assertContentTypeEquals("application/x-git-packed-objects-toc")
  192. self.assertTrue(self._req.cached)
  193. def test_get_info_refs(self):
  194. self._environ["QUERY_STRING"] = ""
  195. blob1 = make_object(Blob, data=b"1")
  196. blob2 = make_object(Blob, data=b"2")
  197. blob3 = make_object(Blob, data=b"3")
  198. tag1 = make_tag(blob2, name=b"tag-tag")
  199. objects = [blob1, blob2, blob3, tag1]
  200. refs = {
  201. b"HEAD": b"000",
  202. b"refs/heads/master": blob1.id,
  203. b"refs/tags/tag-tag": tag1.id,
  204. b"refs/tags/blob-tag": blob3.id,
  205. }
  206. backend = _test_backend(objects, refs=refs)
  207. mat = re.search(".*", "//info/refs")
  208. self.assertEqual(
  209. [
  210. blob1.id + b"\trefs/heads/master\n",
  211. blob3.id + b"\trefs/tags/blob-tag\n",
  212. tag1.id + b"\trefs/tags/tag-tag\n",
  213. blob2.id + b"\trefs/tags/tag-tag^{}\n",
  214. ],
  215. list(get_info_refs(self._req, backend, mat)),
  216. )
  217. self.assertEqual(HTTP_OK, self._status)
  218. self.assertContentTypeEquals("text/plain")
  219. self.assertFalse(self._req.cached)
  220. def test_get_info_refs_not_found(self):
  221. self._environ["QUERY_STRING"] = ""
  222. objects = []
  223. refs = {}
  224. backend = _test_backend(objects, refs=refs)
  225. mat = re.search("info/refs", "/foo/info/refs")
  226. self.assertEqual(
  227. [b"No git repository was found at /foo"],
  228. list(get_info_refs(self._req, backend, mat)),
  229. )
  230. self.assertEqual(HTTP_NOT_FOUND, self._status)
  231. self.assertContentTypeEquals("text/plain")
  232. def test_get_info_packs(self):
  233. class TestPackData:
  234. def __init__(self, sha) -> None:
  235. self.filename = "pack-%s.pack" % sha
  236. class TestPack:
  237. def __init__(self, sha) -> None:
  238. self.data = TestPackData(sha)
  239. packs = [TestPack(str(i) * 40) for i in range(1, 4)]
  240. class TestObjectStore(MemoryObjectStore):
  241. # property must be overridden, can't be assigned
  242. @property
  243. def packs(self):
  244. return packs
  245. store = TestObjectStore()
  246. repo = BaseRepo(store, None)
  247. backend = DictBackend({"/": repo})
  248. mat = re.search(".*", "//info/packs")
  249. output = b"".join(get_info_packs(self._req, backend, mat))
  250. expected = b"".join(
  251. [(b"P pack-" + s + b".pack\n") for s in [b"1" * 40, b"2" * 40, b"3" * 40]]
  252. )
  253. self.assertEqual(expected, output)
  254. self.assertEqual(HTTP_OK, self._status)
  255. self.assertContentTypeEquals("text/plain")
  256. self.assertFalse(self._req.cached)
  257. class SmartHandlersTestCase(WebTestCase):
  258. class _TestUploadPackHandler:
  259. def __init__(
  260. self,
  261. backend,
  262. args,
  263. proto,
  264. stateless_rpc=None,
  265. advertise_refs=False,
  266. ) -> None:
  267. self.args = args
  268. self.proto = proto
  269. self.stateless_rpc = stateless_rpc
  270. self.advertise_refs = advertise_refs
  271. def handle(self):
  272. self.proto.write(b"handled input: " + self.proto.recv(1024))
  273. def _make_handler(self, *args, **kwargs):
  274. self._handler = self._TestUploadPackHandler(*args, **kwargs)
  275. return self._handler
  276. def _handlers(self):
  277. return {b"git-upload-pack": self._make_handler}
  278. def test_handle_service_request_unknown(self):
  279. mat = re.search(".*", "/git-evil-handler")
  280. content = list(handle_service_request(self._req, "backend", mat))
  281. self.assertEqual(HTTP_FORBIDDEN, self._status)
  282. self.assertNotIn(b"git-evil-handler", b"".join(content))
  283. self.assertFalse(self._req.cached)
  284. def _run_handle_service_request(self, content_length=None):
  285. self._environ["wsgi.input"] = BytesIO(b"foo")
  286. if content_length is not None:
  287. self._environ["CONTENT_LENGTH"] = content_length
  288. mat = re.search(".*", "/git-upload-pack")
  289. class Backend:
  290. def open_repository(self, path):
  291. return None
  292. handler_output = b"".join(handle_service_request(self._req, Backend(), mat))
  293. write_output = self._output.getvalue()
  294. # Ensure all output was written via the write callback.
  295. self.assertEqual(b"", handler_output)
  296. self.assertEqual(b"handled input: foo", write_output)
  297. self.assertContentTypeEquals("application/x-git-upload-pack-result")
  298. self.assertFalse(self._handler.advertise_refs)
  299. self.assertTrue(self._handler.stateless_rpc)
  300. self.assertFalse(self._req.cached)
  301. def test_handle_service_request(self):
  302. self._run_handle_service_request()
  303. def test_handle_service_request_with_length(self):
  304. self._run_handle_service_request(content_length="3")
  305. def test_handle_service_request_empty_length(self):
  306. self._run_handle_service_request(content_length="")
  307. def test_get_info_refs_unknown(self):
  308. self._environ["QUERY_STRING"] = "service=git-evil-handler"
  309. class Backend:
  310. def open_repository(self, url):
  311. return None
  312. mat = re.search(".*", "/git-evil-pack")
  313. content = list(get_info_refs(self._req, Backend(), mat))
  314. self.assertNotIn(b"git-evil-handler", b"".join(content))
  315. self.assertEqual(HTTP_FORBIDDEN, self._status)
  316. self.assertFalse(self._req.cached)
  317. def test_get_info_refs(self):
  318. self._environ["wsgi.input"] = BytesIO(b"foo")
  319. self._environ["QUERY_STRING"] = "service=git-upload-pack"
  320. class Backend:
  321. def open_repository(self, url):
  322. return None
  323. mat = re.search(".*", "/git-upload-pack")
  324. handler_output = b"".join(get_info_refs(self._req, Backend(), mat))
  325. write_output = self._output.getvalue()
  326. self.assertEqual(
  327. (
  328. b"001e# service=git-upload-pack\n"
  329. b"0000"
  330. # input is ignored by the handler
  331. b"handled input: "
  332. ),
  333. write_output,
  334. )
  335. # Ensure all output was written via the write callback.
  336. self.assertEqual(b"", handler_output)
  337. self.assertTrue(self._handler.advertise_refs)
  338. self.assertTrue(self._handler.stateless_rpc)
  339. self.assertFalse(self._req.cached)
  340. class LengthLimitedFileTestCase(TestCase):
  341. def test_no_cutoff(self):
  342. f = _LengthLimitedFile(BytesIO(b"foobar"), 1024)
  343. self.assertEqual(b"foobar", f.read())
  344. def test_cutoff(self):
  345. f = _LengthLimitedFile(BytesIO(b"foobar"), 3)
  346. self.assertEqual(b"foo", f.read())
  347. self.assertEqual(b"", f.read())
  348. def test_multiple_reads(self):
  349. f = _LengthLimitedFile(BytesIO(b"foobar"), 3)
  350. self.assertEqual(b"fo", f.read(2))
  351. self.assertEqual(b"o", f.read(2))
  352. self.assertEqual(b"", f.read())
  353. class HTTPGitRequestTestCase(WebTestCase):
  354. # This class tests the contents of the actual cache headers
  355. _req_class = HTTPGitRequest
  356. def test_not_found(self):
  357. self._req.cache_forever() # cache headers should be discarded
  358. message = "Something not found"
  359. self.assertEqual(message.encode("ascii"), self._req.not_found(message))
  360. self.assertEqual(HTTP_NOT_FOUND, self._status)
  361. self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))
  362. def test_forbidden(self):
  363. self._req.cache_forever() # cache headers should be discarded
  364. message = "Something not found"
  365. self.assertEqual(message.encode("ascii"), self._req.forbidden(message))
  366. self.assertEqual(HTTP_FORBIDDEN, self._status)
  367. self.assertEqual({("Content-Type", "text/plain")}, set(self._headers))
  368. def test_respond_ok(self):
  369. self._req.respond()
  370. self.assertEqual([], self._headers)
  371. self.assertEqual(HTTP_OK, self._status)
  372. def test_respond(self):
  373. self._req.nocache()
  374. self._req.respond(
  375. status=402,
  376. content_type="some/type",
  377. headers=[("X-Foo", "foo"), ("X-Bar", "bar")],
  378. )
  379. self.assertEqual(
  380. {
  381. ("X-Foo", "foo"),
  382. ("X-Bar", "bar"),
  383. ("Content-Type", "some/type"),
  384. ("Expires", "Fri, 01 Jan 1980 00:00:00 GMT"),
  385. ("Pragma", "no-cache"),
  386. ("Cache-Control", "no-cache, max-age=0, must-revalidate"),
  387. },
  388. set(self._headers),
  389. )
  390. self.assertEqual(402, self._status)
  391. class HTTPGitApplicationTestCase(TestCase):
  392. def setUp(self):
  393. super().setUp()
  394. self._app = HTTPGitApplication("backend")
  395. self._environ = {
  396. "PATH_INFO": "/foo",
  397. "REQUEST_METHOD": "GET",
  398. }
  399. def _test_handler(self, req, backend, mat):
  400. # tests interface used by all handlers
  401. self.assertEqual(self._environ, req.environ)
  402. self.assertEqual("backend", backend)
  403. self.assertEqual("/foo", mat.group(0))
  404. return "output"
  405. def _add_handler(self, app):
  406. req = self._environ["REQUEST_METHOD"]
  407. app.services = {
  408. (req, re.compile("/foo$")): self._test_handler,
  409. }
  410. def test_call(self):
  411. self._add_handler(self._app)
  412. self.assertEqual("output", self._app(self._environ, None))
  413. def test_fallback_app(self):
  414. def test_app(environ, start_response):
  415. return "output"
  416. app = HTTPGitApplication("backend", fallback_app=test_app)
  417. self.assertEqual("output", app(self._environ, None))
  418. class GunzipTestCase(HTTPGitApplicationTestCase):
  419. __doc__ = """TestCase for testing the GunzipFilter, ensuring the wsgi.input
  420. is correctly decompressed and headers are corrected.
  421. """
  422. example_text = __doc__.encode("ascii")
  423. def setUp(self):
  424. super().setUp()
  425. self._app = GunzipFilter(self._app)
  426. self._environ["HTTP_CONTENT_ENCODING"] = "gzip"
  427. self._environ["REQUEST_METHOD"] = "POST"
  428. def _get_zstream(self, text):
  429. zstream = BytesIO()
  430. zfile = gzip.GzipFile(fileobj=zstream, mode="wb")
  431. zfile.write(text)
  432. zfile.close()
  433. zlength = zstream.tell()
  434. zstream.seek(0)
  435. return zstream, zlength
  436. def _test_call(self, orig, zstream, zlength):
  437. self._add_handler(self._app.app)
  438. self.assertLess(zlength, len(orig))
  439. self.assertEqual(self._environ["HTTP_CONTENT_ENCODING"], "gzip")
  440. self._environ["CONTENT_LENGTH"] = zlength
  441. self._environ["wsgi.input"] = zstream
  442. self._app(self._environ, None)
  443. buf = self._environ["wsgi.input"]
  444. self.assertIsNot(buf, zstream)
  445. buf.seek(0)
  446. self.assertEqual(orig, buf.read())
  447. self.assertIs(None, self._environ.get("CONTENT_LENGTH"))
  448. self.assertNotIn("HTTP_CONTENT_ENCODING", self._environ)
  449. def test_call(self):
  450. self._test_call(self.example_text, *self._get_zstream(self.example_text))
  451. def test_call_no_seek(self):
  452. """This ensures that the gunzipping code doesn't require any methods on
  453. 'wsgi.input' except for '.read()'. (In particular, it shouldn't
  454. require '.seek()'. See https://github.com/jelmer/dulwich/issues/140.).
  455. """
  456. zstream, zlength = self._get_zstream(self.example_text)
  457. self._test_call(
  458. self.example_text,
  459. MinimalistWSGIInputStream(zstream.read()),
  460. zlength,
  461. )
  462. def test_call_no_working_seek(self):
  463. """Similar to 'test_call_no_seek', but this time the methods are available
  464. (but defunct). See https://github.com/jonashaag/klaus/issues/154.
  465. """
  466. zstream, zlength = self._get_zstream(self.example_text)
  467. self._test_call(
  468. self.example_text,
  469. MinimalistWSGIInputStream2(zstream.read()),
  470. zlength,
  471. )