test_web.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. # test_web.py -- Tests for the git HTTP server
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # or (at your option) any later version of the License.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. """Tests for the Git HTTP server."""
  19. from cStringIO import StringIO
  20. import re
  21. from dulwich.object_store import (
  22. MemoryObjectStore,
  23. )
  24. from dulwich.objects import (
  25. Blob,
  26. Tag,
  27. )
  28. from dulwich.repo import (
  29. BaseRepo,
  30. DictRefsContainer,
  31. MemoryRepo,
  32. )
  33. from dulwich.server import (
  34. DictBackend,
  35. )
  36. from dulwich.tests import (
  37. TestCase,
  38. )
  39. from dulwich.web import (
  40. HTTP_OK,
  41. HTTP_NOT_FOUND,
  42. HTTP_FORBIDDEN,
  43. HTTP_ERROR,
  44. send_file,
  45. get_text_file,
  46. get_loose_object,
  47. get_pack_file,
  48. get_idx_file,
  49. get_info_refs,
  50. get_info_packs,
  51. handle_service_request,
  52. _LengthLimitedFile,
  53. HTTPGitRequest,
  54. HTTPGitApplication,
  55. )
  56. from utils import make_object
  57. class TestHTTPGitRequest(HTTPGitRequest):
  58. """HTTPGitRequest with overridden methods to help test caching."""
  59. def __init__(self, *args, **kwargs):
  60. HTTPGitRequest.__init__(self, *args, **kwargs)
  61. self.cached = None
  62. def nocache(self):
  63. self.cached = False
  64. def cache_forever(self):
  65. self.cached = True
  66. class WebTestCase(TestCase):
  67. """Base TestCase with useful instance vars and utility functions."""
  68. _req_class = TestHTTPGitRequest
  69. def setUp(self):
  70. super(WebTestCase, self).setUp()
  71. self._environ = {}
  72. self._req = self._req_class(self._environ, self._start_response,
  73. handlers=self._handlers())
  74. self._status = None
  75. self._headers = []
  76. self._output = StringIO()
  77. def _start_response(self, status, headers):
  78. self._status = status
  79. self._headers = list(headers)
  80. return self._output.write
  81. def _handlers(self):
  82. return None
  83. def assertContentTypeEquals(self, expected):
  84. self.assertTrue(('Content-Type', expected) in self._headers)
  85. def _test_backend(objects, refs=None, named_files=None):
  86. if not refs:
  87. refs = {}
  88. if not named_files:
  89. named_files = {}
  90. repo = MemoryRepo.init_bare(objects, refs)
  91. for path, contents in named_files.iteritems():
  92. repo._put_named_file(path, contents)
  93. return DictBackend({'/': repo})
  94. class DumbHandlersTestCase(WebTestCase):
  95. def test_send_file_not_found(self):
  96. list(send_file(self._req, None, 'text/plain'))
  97. self.assertEquals(HTTP_NOT_FOUND, self._status)
  98. def test_send_file(self):
  99. f = StringIO('foobar')
  100. output = ''.join(send_file(self._req, f, 'some/thing'))
  101. self.assertEquals('foobar', output)
  102. self.assertEquals(HTTP_OK, self._status)
  103. self.assertContentTypeEquals('some/thing')
  104. self.assertTrue(f.closed)
  105. def test_send_file_buffered(self):
  106. bufsize = 10240
  107. xs = 'x' * bufsize
  108. f = StringIO(2 * xs)
  109. self.assertEquals([xs, xs],
  110. list(send_file(self._req, f, 'some/thing')))
  111. self.assertEquals(HTTP_OK, self._status)
  112. self.assertContentTypeEquals('some/thing')
  113. self.assertTrue(f.closed)
  114. def test_send_file_error(self):
  115. class TestFile(object):
  116. def __init__(self, exc_class):
  117. self.closed = False
  118. self._exc_class = exc_class
  119. def read(self, size=-1):
  120. raise self._exc_class()
  121. def close(self):
  122. self.closed = True
  123. f = TestFile(IOError)
  124. list(send_file(self._req, f, 'some/thing'))
  125. self.assertEquals(HTTP_ERROR, self._status)
  126. self.assertTrue(f.closed)
  127. self.assertFalse(self._req.cached)
  128. # non-IOErrors are reraised
  129. f = TestFile(AttributeError)
  130. self.assertRaises(AttributeError, list,
  131. send_file(self._req, f, 'some/thing'))
  132. self.assertTrue(f.closed)
  133. self.assertFalse(self._req.cached)
  134. def test_get_text_file(self):
  135. backend = _test_backend([], named_files={'description': 'foo'})
  136. mat = re.search('.*', 'description')
  137. output = ''.join(get_text_file(self._req, backend, mat))
  138. self.assertEquals('foo', output)
  139. self.assertEquals(HTTP_OK, self._status)
  140. self.assertContentTypeEquals('text/plain')
  141. self.assertFalse(self._req.cached)
  142. def test_get_loose_object(self):
  143. blob = make_object(Blob, data='foo')
  144. backend = _test_backend([blob])
  145. mat = re.search('^(..)(.{38})$', blob.id)
  146. output = ''.join(get_loose_object(self._req, backend, mat))
  147. self.assertEquals(blob.as_legacy_object(), output)
  148. self.assertEquals(HTTP_OK, self._status)
  149. self.assertContentTypeEquals('application/x-git-loose-object')
  150. self.assertTrue(self._req.cached)
  151. def test_get_loose_object_missing(self):
  152. mat = re.search('^(..)(.{38})$', '1' * 40)
  153. list(get_loose_object(self._req, _test_backend([]), mat))
  154. self.assertEquals(HTTP_NOT_FOUND, self._status)
  155. def test_get_loose_object_error(self):
  156. blob = make_object(Blob, data='foo')
  157. backend = _test_backend([blob])
  158. mat = re.search('^(..)(.{38})$', blob.id)
  159. def as_legacy_object_error():
  160. raise IOError
  161. blob.as_legacy_object = as_legacy_object_error
  162. list(get_loose_object(self._req, backend, mat))
  163. self.assertEquals(HTTP_ERROR, self._status)
  164. def test_get_pack_file(self):
  165. pack_name = 'objects/pack/pack-%s.pack' % ('1' * 40)
  166. backend = _test_backend([], named_files={pack_name: 'pack contents'})
  167. mat = re.search('.*', pack_name)
  168. output = ''.join(get_pack_file(self._req, backend, mat))
  169. self.assertEquals('pack contents', output)
  170. self.assertEquals(HTTP_OK, self._status)
  171. self.assertContentTypeEquals('application/x-git-packed-objects')
  172. self.assertTrue(self._req.cached)
  173. def test_get_idx_file(self):
  174. idx_name = 'objects/pack/pack-%s.idx' % ('1' * 40)
  175. backend = _test_backend([], named_files={idx_name: 'idx contents'})
  176. mat = re.search('.*', idx_name)
  177. output = ''.join(get_idx_file(self._req, backend, mat))
  178. self.assertEquals('idx contents', output)
  179. self.assertEquals(HTTP_OK, self._status)
  180. self.assertContentTypeEquals('application/x-git-packed-objects-toc')
  181. self.assertTrue(self._req.cached)
  182. def test_get_info_refs(self):
  183. self._environ['QUERY_STRING'] = ''
  184. blob1 = make_object(Blob, data='1')
  185. blob2 = make_object(Blob, data='2')
  186. blob3 = make_object(Blob, data='3')
  187. tag1 = make_object(Tag, name='tag-tag',
  188. tagger='Test <test@example.com>',
  189. tag_time=12345,
  190. tag_timezone=0,
  191. message='message',
  192. object=(Blob, blob2.id))
  193. objects = [blob1, blob2, blob3, tag1]
  194. refs = {
  195. 'HEAD': '000',
  196. 'refs/heads/master': blob1.id,
  197. 'refs/tags/tag-tag': tag1.id,
  198. 'refs/tags/blob-tag': blob3.id,
  199. }
  200. backend = _test_backend(objects, refs=refs)
  201. mat = re.search('.*', '//info/refs')
  202. self.assertEquals(['%s\trefs/heads/master\n' % blob1.id,
  203. '%s\trefs/tags/blob-tag\n' % blob3.id,
  204. '%s\trefs/tags/tag-tag\n' % tag1.id,
  205. '%s\trefs/tags/tag-tag^{}\n' % blob2.id],
  206. list(get_info_refs(self._req, backend, mat)))
  207. self.assertEquals(HTTP_OK, self._status)
  208. self.assertContentTypeEquals('text/plain')
  209. self.assertFalse(self._req.cached)
  210. def test_get_info_packs(self):
  211. class TestPack(object):
  212. def __init__(self, sha):
  213. self._sha = sha
  214. def name(self):
  215. return self._sha
  216. packs = [TestPack(str(i) * 40) for i in xrange(1, 4)]
  217. class TestObjectStore(MemoryObjectStore):
  218. # property must be overridden, can't be assigned
  219. @property
  220. def packs(self):
  221. return packs
  222. store = TestObjectStore()
  223. repo = BaseRepo(store, None)
  224. backend = DictBackend({'/': repo})
  225. mat = re.search('.*', '//info/packs')
  226. output = ''.join(get_info_packs(self._req, backend, mat))
  227. expected = 'P pack-%s.pack\n' * 3
  228. expected %= ('1' * 40, '2' * 40, '3' * 40)
  229. self.assertEquals(expected, output)
  230. self.assertEquals(HTTP_OK, self._status)
  231. self.assertContentTypeEquals('text/plain')
  232. self.assertFalse(self._req.cached)
  233. class SmartHandlersTestCase(WebTestCase):
  234. class _TestUploadPackHandler(object):
  235. def __init__(self, backend, args, proto, stateless_rpc=False,
  236. advertise_refs=False):
  237. self.args = args
  238. self.proto = proto
  239. self.stateless_rpc = stateless_rpc
  240. self.advertise_refs = advertise_refs
  241. def handle(self):
  242. self.proto.write('handled input: %s' % self.proto.recv(1024))
  243. def _make_handler(self, *args, **kwargs):
  244. self._handler = self._TestUploadPackHandler(*args, **kwargs)
  245. return self._handler
  246. def _handlers(self):
  247. return {'git-upload-pack': self._make_handler}
  248. def test_handle_service_request_unknown(self):
  249. mat = re.search('.*', '/git-evil-handler')
  250. list(handle_service_request(self._req, 'backend', mat))
  251. self.assertEquals(HTTP_FORBIDDEN, self._status)
  252. self.assertFalse(self._req.cached)
  253. def _run_handle_service_request(self, content_length=None):
  254. self._environ['wsgi.input'] = StringIO('foo')
  255. if content_length is not None:
  256. self._environ['CONTENT_LENGTH'] = content_length
  257. mat = re.search('.*', '/git-upload-pack')
  258. handler_output = ''.join(
  259. handle_service_request(self._req, 'backend', mat))
  260. write_output = self._output.getvalue()
  261. # Ensure all output was written via the write callback.
  262. self.assertEqual('', handler_output)
  263. self.assertEqual('handled input: foo', write_output)
  264. self.assertContentTypeEquals('application/x-git-upload-pack-response')
  265. self.assertFalse(self._handler.advertise_refs)
  266. self.assertTrue(self._handler.stateless_rpc)
  267. self.assertFalse(self._req.cached)
  268. def test_handle_service_request(self):
  269. self._run_handle_service_request()
  270. def test_handle_service_request_with_length(self):
  271. self._run_handle_service_request(content_length='3')
  272. def test_handle_service_request_empty_length(self):
  273. self._run_handle_service_request(content_length='')
  274. def test_get_info_refs_unknown(self):
  275. self._environ['QUERY_STRING'] = 'service=git-evil-handler'
  276. list(get_info_refs(self._req, 'backend', None))
  277. self.assertEquals(HTTP_FORBIDDEN, self._status)
  278. self.assertFalse(self._req.cached)
  279. def test_get_info_refs(self):
  280. self._environ['wsgi.input'] = StringIO('foo')
  281. self._environ['QUERY_STRING'] = 'service=git-upload-pack'
  282. mat = re.search('.*', '/git-upload-pack')
  283. handler_output = ''.join(get_info_refs(self._req, 'backend', mat))
  284. write_output = self._output.getvalue()
  285. self.assertEquals(('001e# service=git-upload-pack\n'
  286. '0000'
  287. # input is ignored by the handler
  288. 'handled input: '), write_output)
  289. # Ensure all output was written via the write callback.
  290. self.assertEquals('', handler_output)
  291. self.assertTrue(self._handler.advertise_refs)
  292. self.assertTrue(self._handler.stateless_rpc)
  293. self.assertFalse(self._req.cached)
  294. class LengthLimitedFileTestCase(TestCase):
  295. def test_no_cutoff(self):
  296. f = _LengthLimitedFile(StringIO('foobar'), 1024)
  297. self.assertEquals('foobar', f.read())
  298. def test_cutoff(self):
  299. f = _LengthLimitedFile(StringIO('foobar'), 3)
  300. self.assertEquals('foo', f.read())
  301. self.assertEquals('', f.read())
  302. def test_multiple_reads(self):
  303. f = _LengthLimitedFile(StringIO('foobar'), 3)
  304. self.assertEquals('fo', f.read(2))
  305. self.assertEquals('o', f.read(2))
  306. self.assertEquals('', f.read())
  307. class HTTPGitRequestTestCase(WebTestCase):
  308. # This class tests the contents of the actual cache headers
  309. _req_class = HTTPGitRequest
  310. def test_not_found(self):
  311. self._req.cache_forever() # cache headers should be discarded
  312. message = 'Something not found'
  313. self.assertEquals(message, self._req.not_found(message))
  314. self.assertEquals(HTTP_NOT_FOUND, self._status)
  315. self.assertEquals(set([('Content-Type', 'text/plain')]),
  316. set(self._headers))
  317. def test_forbidden(self):
  318. self._req.cache_forever() # cache headers should be discarded
  319. message = 'Something not found'
  320. self.assertEquals(message, self._req.forbidden(message))
  321. self.assertEquals(HTTP_FORBIDDEN, self._status)
  322. self.assertEquals(set([('Content-Type', 'text/plain')]),
  323. set(self._headers))
  324. def test_respond_ok(self):
  325. self._req.respond()
  326. self.assertEquals([], self._headers)
  327. self.assertEquals(HTTP_OK, self._status)
  328. def test_respond(self):
  329. self._req.nocache()
  330. self._req.respond(status=402, content_type='some/type',
  331. headers=[('X-Foo', 'foo'), ('X-Bar', 'bar')])
  332. self.assertEquals(set([
  333. ('X-Foo', 'foo'),
  334. ('X-Bar', 'bar'),
  335. ('Content-Type', 'some/type'),
  336. ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT'),
  337. ('Pragma', 'no-cache'),
  338. ('Cache-Control', 'no-cache, max-age=0, must-revalidate'),
  339. ]), set(self._headers))
  340. self.assertEquals(402, self._status)
  341. class HTTPGitApplicationTestCase(TestCase):
  342. def setUp(self):
  343. super(HTTPGitApplicationTestCase, self).setUp()
  344. self._app = HTTPGitApplication('backend')
  345. def test_call(self):
  346. def test_handler(req, backend, mat):
  347. # tests interface used by all handlers
  348. self.assertEquals(environ, req.environ)
  349. self.assertEquals('backend', backend)
  350. self.assertEquals('/foo', mat.group(0))
  351. return 'output'
  352. self._app.services = {
  353. ('GET', re.compile('/foo$')): test_handler,
  354. }
  355. environ = {
  356. 'PATH_INFO': '/foo',
  357. 'REQUEST_METHOD': 'GET',
  358. }
  359. self.assertEquals('output', self._app(environ, None))