test_web.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. # test_web.py -- Tests for the git HTTP server
  2. # Copryight (C) 2010 Google, Inc.
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # or (at your option) any later version of the License.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. """Tests for the Git HTTP server."""
  19. from cStringIO import StringIO
  20. import re
  21. from unittest import TestCase
  22. from dulwich.objects import (
  23. type_map,
  24. Tag,
  25. Blob,
  26. )
  27. from dulwich.web import (
  28. HTTP_OK,
  29. HTTP_NOT_FOUND,
  30. HTTP_FORBIDDEN,
  31. send_file,
  32. get_info_refs,
  33. handle_service_request,
  34. _LengthLimitedFile,
  35. HTTPGitRequest,
  36. HTTPGitApplication,
  37. )
  38. class WebTestCase(TestCase):
  39. """Base TestCase that sets up some useful instance vars."""
  40. def setUp(self):
  41. self._environ = {}
  42. self._req = HTTPGitRequest(self._environ, self._start_response)
  43. self._status = None
  44. self._headers = []
  45. def _start_response(self, status, headers):
  46. self._status = status
  47. self._headers = list(headers)
  48. class DumbHandlersTestCase(WebTestCase):
  49. def test_send_file_not_found(self):
  50. list(send_file(self._req, None, 'text/plain'))
  51. self.assertEquals(HTTP_NOT_FOUND, self._status)
  52. def test_send_file(self):
  53. f = StringIO('foobar')
  54. output = ''.join(send_file(self._req, f, 'text/plain'))
  55. self.assertEquals('foobar', output)
  56. self.assertEquals(HTTP_OK, self._status)
  57. self.assertTrue(('Content-Type', 'text/plain') in self._headers)
  58. self.assertTrue(f.closed)
  59. def test_send_file_buffered(self):
  60. bufsize = 10240
  61. xs = 'x' * bufsize
  62. f = StringIO(2 * xs)
  63. self.assertEquals([xs, xs],
  64. list(send_file(self._req, f, 'text/plain')))
  65. self.assertEquals(HTTP_OK, self._status)
  66. self.assertTrue(('Content-Type', 'text/plain') in self._headers)
  67. self.assertTrue(f.closed)
  68. def test_send_file_error(self):
  69. class TestFile(object):
  70. def __init__(self):
  71. self.closed = False
  72. def read(self, size=-1):
  73. raise IOError
  74. def close(self):
  75. self.closed = True
  76. f = TestFile()
  77. list(send_file(self._req, f, 'text/plain'))
  78. self.assertEquals(HTTP_NOT_FOUND, self._status)
  79. self.assertTrue(f.closed)
  80. def test_get_info_refs(self):
  81. self._environ['QUERY_STRING'] = ''
  82. class TestTag(object):
  83. type = Tag().type
  84. def __init__(self, sha, obj_type, obj_sha):
  85. self.sha = lambda: sha
  86. self.object = (obj_type, obj_sha)
  87. class TestBlob(object):
  88. type = Blob().type
  89. def __init__(self, sha):
  90. self.sha = lambda: sha
  91. blob1 = TestBlob('111')
  92. blob2 = TestBlob('222')
  93. blob3 = TestBlob('333')
  94. tag1 = TestTag('aaa', TestTag.type, 'bbb')
  95. tag2 = TestTag('bbb', TestBlob.type, '222')
  96. class TestBackend(object):
  97. def __init__(self):
  98. objects = [blob1, blob2, blob3, tag1, tag2]
  99. self.repo = dict((o.sha(), o) for o in objects)
  100. def get_refs(self):
  101. return {
  102. 'HEAD': '000',
  103. 'refs/heads/master': blob1.sha(),
  104. 'refs/tags/tag-tag': tag1.sha(),
  105. 'refs/tags/blob-tag': blob3.sha(),
  106. }
  107. self.assertEquals(['111\trefs/heads/master\n',
  108. '333\trefs/tags/blob-tag\n',
  109. 'aaa\trefs/tags/tag-tag\n',
  110. '222\trefs/tags/tag-tag^{}\n'],
  111. list(get_info_refs(self._req, TestBackend(), None)))
  112. class SmartHandlersTestCase(WebTestCase):
  113. class TestProtocol(object):
  114. def __init__(self, handler):
  115. self._handler = handler
  116. def write_pkt_line(self, line):
  117. if line is None:
  118. self._handler.write('flush-pkt\n')
  119. else:
  120. self._handler.write('pkt-line: %s' % line)
  121. class _TestUploadPackHandler(object):
  122. def __init__(self, backend, read, write, stateless_rpc=False,
  123. advertise_refs=False):
  124. self.read = read
  125. self.write = write
  126. self.proto = SmartHandlersTestCase.TestProtocol(self)
  127. self.stateless_rpc = stateless_rpc
  128. self.advertise_refs = advertise_refs
  129. def handle(self):
  130. self.write('handled input: %s' % self.read())
  131. def _MakeHandler(self, *args, **kwargs):
  132. self._handler = self._TestUploadPackHandler(*args, **kwargs)
  133. return self._handler
  134. def services(self):
  135. return {'git-upload-pack': self._MakeHandler}
  136. def test_handle_service_request_unknown(self):
  137. mat = re.search('.*', '/git-evil-handler')
  138. list(handle_service_request(self._req, 'backend', mat))
  139. self.assertEquals(HTTP_FORBIDDEN, self._status)
  140. def test_handle_service_request(self):
  141. self._environ['wsgi.input'] = StringIO('foo')
  142. mat = re.search('.*', '/git-upload-pack')
  143. output = ''.join(handle_service_request(self._req, 'backend', mat,
  144. services=self.services()))
  145. self.assertEqual('handled input: foo', output)
  146. response_type = 'application/x-git-upload-pack-response'
  147. self.assertTrue(('Content-Type', response_type) in self._headers)
  148. self.assertFalse(self._handler.advertise_refs)
  149. self.assertTrue(self._handler.stateless_rpc)
  150. def test_handle_service_request_with_length(self):
  151. self._environ['wsgi.input'] = StringIO('foobar')
  152. self._environ['CONTENT_LENGTH'] = 3
  153. mat = re.search('.*', '/git-upload-pack')
  154. output = ''.join(handle_service_request(self._req, 'backend', mat,
  155. services=self.services()))
  156. self.assertEqual('handled input: foo', output)
  157. response_type = 'application/x-git-upload-pack-response'
  158. self.assertTrue(('Content-Type', response_type) in self._headers)
  159. def test_get_info_refs_unknown(self):
  160. self._environ['QUERY_STRING'] = 'service=git-evil-handler'
  161. list(get_info_refs(self._req, 'backend', None,
  162. services=self.services()))
  163. self.assertEquals(HTTP_FORBIDDEN, self._status)
  164. def test_get_info_refs(self):
  165. self._environ['wsgi.input'] = StringIO('foo')
  166. self._environ['QUERY_STRING'] = 'service=git-upload-pack'
  167. output = ''.join(get_info_refs(self._req, 'backend', None,
  168. services=self.services()))
  169. self.assertEquals(('pkt-line: # service=git-upload-pack\n'
  170. 'flush-pkt\n'
  171. # input is ignored by the handler
  172. 'handled input: '), output)
  173. self.assertTrue(self._handler.advertise_refs)
  174. self.assertTrue(self._handler.stateless_rpc)
  175. class LengthLimitedFileTestCase(TestCase):
  176. def test_no_cutoff(self):
  177. f = _LengthLimitedFile(StringIO('foobar'), 1024)
  178. self.assertEquals('foobar', f.read())
  179. def test_cutoff(self):
  180. f = _LengthLimitedFile(StringIO('foobar'), 3)
  181. self.assertEquals('foo', f.read())
  182. self.assertEquals('', f.read())
  183. def test_multiple_reads(self):
  184. f = _LengthLimitedFile(StringIO('foobar'), 3)
  185. self.assertEquals('fo', f.read(2))
  186. self.assertEquals('o', f.read(2))
  187. self.assertEquals('', f.read())
  188. class HTTPGitRequestTestCase(WebTestCase):
  189. def test_not_found(self):
  190. self._req.cache_forever() # cache headers should be discarded
  191. message = 'Something not found'
  192. self.assertEquals(message, self._req.not_found(message))
  193. self.assertEquals(HTTP_NOT_FOUND, self._status)
  194. self.assertEquals(set([('Content-Type', 'text/plain')]),
  195. set(self._headers))
  196. def test_forbidden(self):
  197. self._req.cache_forever() # cache headers should be discarded
  198. message = 'Something not found'
  199. self.assertEquals(message, self._req.forbidden(message))
  200. self.assertEquals(HTTP_FORBIDDEN, self._status)
  201. self.assertEquals(set([('Content-Type', 'text/plain')]),
  202. set(self._headers))
  203. def test_respond_ok(self):
  204. self._req.respond()
  205. self.assertEquals([], self._headers)
  206. self.assertEquals(HTTP_OK, self._status)
  207. def test_respond(self):
  208. self._req.nocache()
  209. self._req.respond(status=402, content_type='some/type',
  210. headers=[('X-Foo', 'foo'), ('X-Bar', 'bar')])
  211. self.assertEquals(set([
  212. ('X-Foo', 'foo'),
  213. ('X-Bar', 'bar'),
  214. ('Content-Type', 'some/type'),
  215. ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT'),
  216. ('Pragma', 'no-cache'),
  217. ('Cache-Control', 'no-cache, max-age=0, must-revalidate'),
  218. ]), set(self._headers))
  219. self.assertEquals(402, self._status)
  220. class HTTPGitApplicationTestCase(TestCase):
  221. def setUp(self):
  222. self._app = HTTPGitApplication('backend')
  223. def test_call(self):
  224. def test_handler(req, backend, mat):
  225. # tests interface used by all handlers
  226. self.assertEquals(environ, req.environ)
  227. self.assertEquals('backend', backend)
  228. self.assertEquals('/foo', mat.group(0))
  229. return 'output'
  230. self._app.services = {
  231. ('GET', re.compile('/foo$')): test_handler,
  232. }
  233. environ = {
  234. 'PATH_INFO': '/foo',
  235. 'REQUEST_METHOD': 'GET',
  236. }
  237. self.assertEquals('output', self._app(environ, None))