test_web.py 11 KB


  1. # test_web.py -- Tests for the git HTTP server
  2. # Copyright (C) 2010 Google, Inc.
  3. #
  4. # This program is free software; you can redistribute it and/or
  5. # modify it under the terms of the GNU General Public License
  6. # as published by the Free Software Foundation; version 2
  7. # or (at your option) any later version of the License.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU General Public License
  15. # along with this program; if not, write to the Free Software
  16. # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  17. # MA 02110-1301, USA.
  18. """Tests for the Git HTTP server."""
  19. from cStringIO import StringIO
  20. import re
  21. from unittest import TestCase
  22. from dulwich.objects import (
  23. Blob,
  24. )
  25. from dulwich.web import (
  26. HTTP_OK,
  27. HTTP_NOT_FOUND,
  28. HTTP_FORBIDDEN,
  29. send_file,
  30. get_info_refs,
  31. handle_service_request,
  32. _LengthLimitedFile,
  33. HTTPGitRequest,
  34. HTTPGitApplication,
  35. )
  36. class WebTestCase(TestCase):
  37. """Base TestCase that sets up some useful instance vars."""
  38. def setUp(self):
  39. self._environ = {}
  40. self._req = HTTPGitRequest(self._environ, self._start_response)
  41. self._status = None
  42. self._headers = []
  43. def _start_response(self, status, headers):
  44. self._status = status
  45. self._headers = list(headers)
  46. class DumbHandlersTestCase(WebTestCase):
  47. def test_send_file_not_found(self):
  48. list(send_file(self._req, None, 'text/plain'))
  49. self.assertEquals(HTTP_NOT_FOUND, self._status)
  50. def test_send_file(self):
  51. f = StringIO('foobar')
  52. output = ''.join(send_file(self._req, f, 'text/plain'))
  53. self.assertEquals('foobar', output)
  54. self.assertEquals(HTTP_OK, self._status)
  55. self.assertTrue(('Content-Type', 'text/plain') in self._headers)
  56. self.assertTrue(f.closed)
  57. def test_send_file_buffered(self):
  58. bufsize = 10240
  59. xs = 'x' * bufsize
  60. f = StringIO(2 * xs)
  61. self.assertEquals([xs, xs],
  62. list(send_file(self._req, f, 'text/plain')))
  63. self.assertEquals(HTTP_OK, self._status)
  64. self.assertTrue(('Content-Type', 'text/plain') in self._headers)
  65. self.assertTrue(f.closed)
  66. def test_send_file_error(self):
  67. class TestFile(object):
  68. def __init__(self):
  69. self.closed = False
  70. def read(self, size=-1):
  71. raise IOError
  72. def close(self):
  73. self.closed = True
  74. f = TestFile()
  75. list(send_file(self._req, f, 'text/plain'))
  76. self.assertEquals(HTTP_NOT_FOUND, self._status)
  77. self.assertTrue(f.closed)
  78. def test_get_info_refs(self):
  79. self._environ['QUERY_STRING'] = ''
  80. class TestTag(object):
  81. def __init__(self, sha, obj_class, obj_sha):
  82. self.sha = lambda: sha
  83. self.object = (obj_class, obj_sha)
  84. class TestBlob(object):
  85. def __init__(self, sha):
  86. self.sha = lambda: sha
  87. blob1 = TestBlob('111')
  88. blob2 = TestBlob('222')
  89. blob3 = TestBlob('333')
  90. tag1 = TestTag('aaa', Blob, '222')
  91. class TestRepo(object):
  92. def __init__(self, objects, peeled):
  93. self._objects = dict((o.sha(), o) for o in objects)
  94. self._peeled = peeled
  95. def get_peeled(self, sha):
  96. return self._peeled[sha]
  97. def __getitem__(self, sha):
  98. return self._objects[sha]
  99. def get_refs(self):
  100. return {
  101. 'HEAD': '000',
  102. 'refs/heads/master': blob1.sha(),
  103. 'refs/tags/tag-tag': tag1.sha(),
  104. 'refs/tags/blob-tag': blob3.sha(),
  105. }
  106. class TestBackend(object):
  107. def __init__(self):
  108. objects = [blob1, blob2, blob3, tag1]
  109. self.repo = TestRepo(objects, {
  110. 'HEAD': '000',
  111. 'refs/heads/master': blob1.sha(),
  112. 'refs/tags/tag-tag': blob2.sha(),
  113. 'refs/tags/blob-tag': blob3.sha(),
  114. })
  115. def open_repository(self, path):
  116. assert path == '/'
  117. return self.repo
  118. def get_refs(self):
  119. return {
  120. 'HEAD': '000',
  121. 'refs/heads/master': blob1.sha(),
  122. 'refs/tags/tag-tag': tag1.sha(),
  123. 'refs/tags/blob-tag': blob3.sha(),
  124. }
  125. mat = re.search('.*', '//info/refs')
  126. self.assertEquals(['111\trefs/heads/master\n',
  127. '333\trefs/tags/blob-tag\n',
  128. 'aaa\trefs/tags/tag-tag\n',
  129. '222\trefs/tags/tag-tag^{}\n'],
  130. list(get_info_refs(self._req, TestBackend(), mat)))
  131. class SmartHandlersTestCase(WebTestCase):
  132. class _TestUploadPackHandler(object):
  133. def __init__(self, backend, args, proto, stateless_rpc=False,
  134. advertise_refs=False):
  135. self.args = args
  136. self.proto = proto
  137. self.stateless_rpc = stateless_rpc
  138. self.advertise_refs = advertise_refs
  139. def handle(self):
  140. self.proto.write('handled input: %s' % self.proto.recv(1024))
  141. def _make_handler(self, *args, **kwargs):
  142. self._handler = self._TestUploadPackHandler(*args, **kwargs)
  143. return self._handler
  144. def services(self):
  145. return {'git-upload-pack': self._make_handler}
  146. def test_handle_service_request_unknown(self):
  147. mat = re.search('.*', '/git-evil-handler')
  148. list(handle_service_request(self._req, 'backend', mat))
  149. self.assertEquals(HTTP_FORBIDDEN, self._status)
  150. def test_handle_service_request(self):
  151. self._environ['wsgi.input'] = StringIO('foo')
  152. mat = re.search('.*', '/git-upload-pack')
  153. output = ''.join(handle_service_request(self._req, 'backend', mat,
  154. services=self.services()))
  155. self.assertEqual('handled input: foo', output)
  156. response_type = 'application/x-git-upload-pack-response'
  157. self.assertTrue(('Content-Type', response_type) in self._headers)
  158. self.assertFalse(self._handler.advertise_refs)
  159. self.assertTrue(self._handler.stateless_rpc)
  160. def test_handle_service_request_with_length(self):
  161. self._environ['wsgi.input'] = StringIO('foobar')
  162. self._environ['CONTENT_LENGTH'] = 3
  163. mat = re.search('.*', '/git-upload-pack')
  164. output = ''.join(handle_service_request(self._req, 'backend', mat,
  165. services=self.services()))
  166. self.assertEqual('handled input: foo', output)
  167. response_type = 'application/x-git-upload-pack-response'
  168. self.assertTrue(('Content-Type', response_type) in self._headers)
  169. def test_get_info_refs_unknown(self):
  170. self._environ['QUERY_STRING'] = 'service=git-evil-handler'
  171. list(get_info_refs(self._req, 'backend', None,
  172. services=self.services()))
  173. self.assertEquals(HTTP_FORBIDDEN, self._status)
  174. def test_get_info_refs(self):
  175. self._environ['wsgi.input'] = StringIO('foo')
  176. self._environ['QUERY_STRING'] = 'service=git-upload-pack'
  177. mat = re.search('.*', '/git-upload-pack')
  178. output = ''.join(get_info_refs(self._req, 'backend', mat,
  179. services=self.services()))
  180. self.assertEquals(('001e# service=git-upload-pack\n'
  181. '0000'
  182. # input is ignored by the handler
  183. 'handled input: '), output)
  184. self.assertTrue(self._handler.advertise_refs)
  185. self.assertTrue(self._handler.stateless_rpc)
  186. class LengthLimitedFileTestCase(TestCase):
  187. def test_no_cutoff(self):
  188. f = _LengthLimitedFile(StringIO('foobar'), 1024)
  189. self.assertEquals('foobar', f.read())
  190. def test_cutoff(self):
  191. f = _LengthLimitedFile(StringIO('foobar'), 3)
  192. self.assertEquals('foo', f.read())
  193. self.assertEquals('', f.read())
  194. def test_multiple_reads(self):
  195. f = _LengthLimitedFile(StringIO('foobar'), 3)
  196. self.assertEquals('fo', f.read(2))
  197. self.assertEquals('o', f.read(2))
  198. self.assertEquals('', f.read())
  199. class HTTPGitRequestTestCase(WebTestCase):
  200. def test_not_found(self):
  201. self._req.cache_forever() # cache headers should be discarded
  202. message = 'Something not found'
  203. self.assertEquals(message, self._req.not_found(message))
  204. self.assertEquals(HTTP_NOT_FOUND, self._status)
  205. self.assertEquals(set([('Content-Type', 'text/plain')]),
  206. set(self._headers))
  207. def test_forbidden(self):
  208. self._req.cache_forever() # cache headers should be discarded
  209. message = 'Something not found'
  210. self.assertEquals(message, self._req.forbidden(message))
  211. self.assertEquals(HTTP_FORBIDDEN, self._status)
  212. self.assertEquals(set([('Content-Type', 'text/plain')]),
  213. set(self._headers))
  214. def test_respond_ok(self):
  215. self._req.respond()
  216. self.assertEquals([], self._headers)
  217. self.assertEquals(HTTP_OK, self._status)
  218. def test_respond(self):
  219. self._req.nocache()
  220. self._req.respond(status=402, content_type='some/type',
  221. headers=[('X-Foo', 'foo'), ('X-Bar', 'bar')])
  222. self.assertEquals(set([
  223. ('X-Foo', 'foo'),
  224. ('X-Bar', 'bar'),
  225. ('Content-Type', 'some/type'),
  226. ('Expires', 'Fri, 01 Jan 1980 00:00:00 GMT'),
  227. ('Pragma', 'no-cache'),
  228. ('Cache-Control', 'no-cache, max-age=0, must-revalidate'),
  229. ]), set(self._headers))
  230. self.assertEquals(402, self._status)
  231. class HTTPGitApplicationTestCase(TestCase):
  232. def setUp(self):
  233. self._app = HTTPGitApplication('backend')
  234. def test_call(self):
  235. def test_handler(req, backend, mat):
  236. # tests interface used by all handlers
  237. self.assertEquals(environ, req.environ)
  238. self.assertEquals('backend', backend)
  239. self.assertEquals('/foo', mat.group(0))
  240. return 'output'
  241. self._app.services = {
  242. ('GET', re.compile('/foo$')): test_handler,
  243. }
  244. environ = {
  245. 'PATH_INFO': '/foo',
  246. 'REQUEST_METHOD': 'GET',
  247. }
  248. self.assertEquals('output', self._app(environ, None))