|
@@ -19,6 +19,7 @@
|
|
|
"""Tests for the Git HTTP server."""
|
|
|
|
|
|
from cStringIO import StringIO
|
|
|
+import gzip
|
|
|
import re
|
|
|
import os
|
|
|
|
|
@@ -44,6 +45,7 @@ from dulwich.web import (
|
|
|
HTTP_NOT_FOUND,
|
|
|
HTTP_FORBIDDEN,
|
|
|
HTTP_ERROR,
|
|
|
+ GunzipFilter,
|
|
|
send_file,
|
|
|
get_text_file,
|
|
|
get_loose_object,
|
|
@@ -419,19 +421,62 @@ class HTTPGitApplicationTestCase(TestCase):
|
|
|
super(HTTPGitApplicationTestCase, self).setUp()
|
|
|
self._app = HTTPGitApplication('backend')
|
|
|
|
|
|
- def test_call(self):
|
|
|
- def test_handler(req, backend, mat):
|
|
|
-
|
|
|
- self.assertEqual(environ, req.environ)
|
|
|
- self.assertEqual('backend', backend)
|
|
|
- self.assertEqual('/foo', mat.group(0))
|
|
|
- return 'output'
|
|
|
-
|
|
|
- self._app.services = {
|
|
|
- ('GET', re.compile('/foo$')): test_handler,
|
|
|
+ self._environ = {
|
|
|
+ 'PATH_INFO': '/foo',
|
|
|
+ 'REQUEST_METHOD': 'GET',
|
|
|
}
|
|
|
- environ = {
|
|
|
- 'PATH_INFO': '/foo',
|
|
|
- 'REQUEST_METHOD': 'GET',
|
|
|
- }
|
|
|
- self.assertEqual('output', self._app(environ, None))
|
|
|
+
|
|
|
+ def _test_handler(self, req, backend, mat):
|
|
|
+
|
|
|
+ self.assertEquals(self._environ, req.environ)
|
|
|
+ self.assertEquals('backend', backend)
|
|
|
+ self.assertEquals('/foo', mat.group(0))
|
|
|
+ return 'output'
|
|
|
+
|
|
|
+ def _add_handler(self, app):
|
|
|
+ req = self._environ['REQUEST_METHOD']
|
|
|
+ app.services = {
|
|
|
+ (req, re.compile('/foo$')): self._test_handler,
|
|
|
+ }
|
|
|
+
|
|
|
+ def test_call(self):
|
|
|
+ self._add_handler(self._app)
|
|
|
+ self.assertEquals('output', self._app(self._environ, None))
|
|
|
+
|
|
|
+
|
|
|
+class GunzipTestCase(HTTPGitApplicationTestCase):
|
|
|
+ """TestCase for testing the GunzipFilter, ensuring the wsgi.input
|
|
|
+ is correctly decompressed and headers are corrected.
|
|
|
+ """
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ super(GunzipTestCase, self).setUp()
|
|
|
+ self._app = GunzipFilter(self._app)
|
|
|
+ self._environ['HTTP_CONTENT_ENCODING'] = 'gzip'
|
|
|
+ self._environ['REQUEST_METHOD'] = 'POST'
|
|
|
+
|
|
|
+ def _get_zstream(self, text):
|
|
|
+ zstream = StringIO()
|
|
|
+ zfile = gzip.GzipFile(fileobj=zstream, mode='w')
|
|
|
+ zfile.write(text)
|
|
|
+ zfile.close()
|
|
|
+ return zstream
|
|
|
+
|
|
|
+ def test_call(self):
|
|
|
+ self._add_handler(self._app.app)
|
|
|
+ orig = self.__class__.__doc__
|
|
|
+ zstream = self._get_zstream(orig)
|
|
|
+ zlength = zstream.tell()
|
|
|
+ zstream.seek(0)
|
|
|
+ self.assertLess(zlength, len(orig))
|
|
|
+ self.assertEquals(self._environ['HTTP_CONTENT_ENCODING'],
|
|
|
+ 'gzip')
|
|
|
+ self._environ['CONTENT_LENGTH'] = zlength
|
|
|
+ self._environ['wsgi.input'] = zstream
|
|
|
+ app_output = self._app(self._environ, None)
|
|
|
+ buf = self._environ['wsgi.input']
|
|
|
+ self.assertIsNot(buf, zstream)
|
|
|
+ buf.seek(0)
|
|
|
+ self.assertEquals(orig, buf.read())
|
|
|
+ self.assertLess(zlength, int(self._environ['CONTENT_LENGTH']))
|
|
|
+ self.assertNotIn('HTTP_CONTENT_ENCODING', self._environ)
|