|
@@ -19,6 +19,7 @@
|
|
|
"""Tests for the Git HTTP server."""
|
|
|
|
|
|
from cStringIO import StringIO
|
|
|
+import gzip
|
|
|
import re
|
|
|
import os
|
|
|
|
|
@@ -44,6 +45,7 @@ from dulwich.web import (
|
|
|
HTTP_NOT_FOUND,
|
|
|
HTTP_FORBIDDEN,
|
|
|
HTTP_ERROR,
|
|
|
+ GunzipFilter,
|
|
|
send_file,
|
|
|
get_text_file,
|
|
|
get_loose_object,
|
|
@@ -117,13 +119,13 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
|
|
|
def test_send_file_not_found(self):
|
|
|
list(send_file(self._req, None, 'text/plain'))
|
|
|
- self.assertEquals(HTTP_NOT_FOUND, self._status)
|
|
|
+ self.assertEqual(HTTP_NOT_FOUND, self._status)
|
|
|
|
|
|
def test_send_file(self):
|
|
|
f = StringIO('foobar')
|
|
|
output = ''.join(send_file(self._req, f, 'some/thing'))
|
|
|
- self.assertEquals('foobar', output)
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual('foobar', output)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('some/thing')
|
|
|
self.assertTrue(f.closed)
|
|
|
|
|
@@ -131,9 +133,9 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
bufsize = 10240
|
|
|
xs = 'x' * bufsize
|
|
|
f = StringIO(2 * xs)
|
|
|
- self.assertEquals([xs, xs],
|
|
|
+ self.assertEqual([xs, xs],
|
|
|
list(send_file(self._req, f, 'some/thing')))
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('some/thing')
|
|
|
self.assertTrue(f.closed)
|
|
|
|
|
@@ -151,7 +153,7 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
|
|
|
f = TestFile(IOError)
|
|
|
list(send_file(self._req, f, 'some/thing'))
|
|
|
- self.assertEquals(HTTP_ERROR, self._status)
|
|
|
+ self.assertEqual(HTTP_ERROR, self._status)
|
|
|
self.assertTrue(f.closed)
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
@@ -166,8 +168,8 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
backend = _test_backend([], named_files={'description': 'foo'})
|
|
|
mat = re.search('.*', 'description')
|
|
|
output = ''.join(get_text_file(self._req, backend, mat))
|
|
|
- self.assertEquals('foo', output)
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual('foo', output)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('text/plain')
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
@@ -176,15 +178,15 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
backend = _test_backend([blob])
|
|
|
mat = re.search('^(..)(.{38})$', blob.id)
|
|
|
output = ''.join(get_loose_object(self._req, backend, mat))
|
|
|
- self.assertEquals(blob.as_legacy_object(), output)
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual(blob.as_legacy_object(), output)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('application/x-git-loose-object')
|
|
|
self.assertTrue(self._req.cached)
|
|
|
|
|
|
def test_get_loose_object_missing(self):
|
|
|
mat = re.search('^(..)(.{38})$', '1' * 40)
|
|
|
list(get_loose_object(self._req, _test_backend([]), mat))
|
|
|
- self.assertEquals(HTTP_NOT_FOUND, self._status)
|
|
|
+ self.assertEqual(HTTP_NOT_FOUND, self._status)
|
|
|
|
|
|
def test_get_loose_object_error(self):
|
|
|
blob = make_object(Blob, data='foo')
|
|
@@ -196,15 +198,15 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
|
|
|
blob.as_legacy_object = as_legacy_object_error
|
|
|
list(get_loose_object(self._req, backend, mat))
|
|
|
- self.assertEquals(HTTP_ERROR, self._status)
|
|
|
+ self.assertEqual(HTTP_ERROR, self._status)
|
|
|
|
|
|
def test_get_pack_file(self):
|
|
|
pack_name = os.path.join('objects', 'pack', 'pack-%s.pack' % ('1' * 40))
|
|
|
backend = _test_backend([], named_files={pack_name: 'pack contents'})
|
|
|
mat = re.search('.*', pack_name)
|
|
|
output = ''.join(get_pack_file(self._req, backend, mat))
|
|
|
- self.assertEquals('pack contents', output)
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual('pack contents', output)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('application/x-git-packed-objects')
|
|
|
self.assertTrue(self._req.cached)
|
|
|
|
|
@@ -213,8 +215,8 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
backend = _test_backend([], named_files={idx_name: 'idx contents'})
|
|
|
mat = re.search('.*', idx_name)
|
|
|
output = ''.join(get_idx_file(self._req, backend, mat))
|
|
|
- self.assertEquals('idx contents', output)
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual('idx contents', output)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('application/x-git-packed-objects-toc')
|
|
|
self.assertTrue(self._req.cached)
|
|
|
|
|
@@ -242,12 +244,12 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
backend = _test_backend(objects, refs=refs)
|
|
|
|
|
|
mat = re.search('.*', '//info/refs')
|
|
|
- self.assertEquals(['%s\trefs/heads/master\n' % blob1.id,
|
|
|
+ self.assertEqual(['%s\trefs/heads/master\n' % blob1.id,
|
|
|
'%s\trefs/tags/blob-tag\n' % blob3.id,
|
|
|
'%s\trefs/tags/tag-tag\n' % tag1.id,
|
|
|
'%s\trefs/tags/tag-tag^{}\n' % blob2.id],
|
|
|
list(get_info_refs(self._req, backend, mat)))
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('text/plain')
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
@@ -274,8 +276,8 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
output = ''.join(get_info_packs(self._req, backend, mat))
|
|
|
expected = 'P pack-%s.pack\n' * 3
|
|
|
expected %= ('1' * 40, '2' * 40, '3' * 40)
|
|
|
- self.assertEquals(expected, output)
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual(expected, output)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('text/plain')
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
@@ -303,7 +305,7 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
def test_handle_service_request_unknown(self):
|
|
|
mat = re.search('.*', '/git-evil-handler')
|
|
|
list(handle_service_request(self._req, 'backend', mat))
|
|
|
- self.assertEquals(HTTP_FORBIDDEN, self._status)
|
|
|
+ self.assertEqual(HTTP_FORBIDDEN, self._status)
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
def _run_handle_service_request(self, content_length=None):
|
|
@@ -334,7 +336,7 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
def test_get_info_refs_unknown(self):
|
|
|
self._environ['QUERY_STRING'] = 'service=git-evil-handler'
|
|
|
list(get_info_refs(self._req, 'backend', None))
|
|
|
- self.assertEquals(HTTP_FORBIDDEN, self._status)
|
|
|
+ self.assertEqual(HTTP_FORBIDDEN, self._status)
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
def test_get_info_refs(self):
|
|
@@ -344,12 +346,12 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
mat = re.search('.*', '/git-upload-pack')
|
|
|
handler_output = ''.join(get_info_refs(self._req, 'backend', mat))
|
|
|
write_output = self._output.getvalue()
|
|
|
- self.assertEquals(('001e# service=git-upload-pack\n'
|
|
|
+ self.assertEqual(('001e# service=git-upload-pack\n'
|
|
|
'0000'
|
|
|
# input is ignored by the handler
|
|
|
'handled input: '), write_output)
|
|
|
# Ensure all output was written via the write callback.
|
|
|
- self.assertEquals('', handler_output)
|
|
|
+ self.assertEqual('', handler_output)
|
|
|
self.assertTrue(self._handler.advertise_refs)
|
|
|
self.assertTrue(self._handler.http_req)
|
|
|
self.assertFalse(self._req.cached)
|
|
@@ -358,18 +360,18 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
class LengthLimitedFileTestCase(TestCase):
|
|
|
def test_no_cutoff(self):
|
|
|
f = _LengthLimitedFile(StringIO('foobar'), 1024)
|
|
|
- self.assertEquals('foobar', f.read())
|
|
|
+ self.assertEqual('foobar', f.read())
|
|
|
|
|
|
def test_cutoff(self):
|
|
|
f = _LengthLimitedFile(StringIO('foobar'), 3)
|
|
|
- self.assertEquals('foo', f.read())
|
|
|
- self.assertEquals('', f.read())
|
|
|
+ self.assertEqual('foo', f.read())
|
|
|
+ self.assertEqual('', f.read())
|
|
|
|
|
|
def test_multiple_reads(self):
|
|
|
f = _LengthLimitedFile(StringIO('foobar'), 3)
|
|
|
- self.assertEquals('fo', f.read(2))
|
|
|
- self.assertEquals('o', f.read(2))
|
|
|
- self.assertEquals('', f.read())
|
|
|
+ self.assertEqual('fo', f.read(2))
|
|
|
+ self.assertEqual('o', f.read(2))
|
|
|
+ self.assertEqual('', f.read())
|
|
|
|
|
|
|
|
|
class HTTPGitRequestTestCase(WebTestCase):
|
|
@@ -380,29 +382,29 @@ class HTTPGitRequestTestCase(WebTestCase):
|
|
|
def test_not_found(self):
|
|
|
self._req.cache_forever() # cache headers should be discarded
|
|
|
message = 'Something not found'
|
|
|
- self.assertEquals(message, self._req.not_found(message))
|
|
|
- self.assertEquals(HTTP_NOT_FOUND, self._status)
|
|
|
- self.assertEquals(set([('Content-Type', 'text/plain')]),
|
|
|
+ self.assertEqual(message, self._req.not_found(message))
|
|
|
+ self.assertEqual(HTTP_NOT_FOUND, self._status)
|
|
|
+ self.assertEqual(set([('Content-Type', 'text/plain')]),
|
|
|
set(self._headers))
|
|
|
|
|
|
def test_forbidden(self):
|
|
|
self._req.cache_forever() # cache headers should be discarded
|
|
|
message = 'Something not found'
|
|
|
- self.assertEquals(message, self._req.forbidden(message))
|
|
|
- self.assertEquals(HTTP_FORBIDDEN, self._status)
|
|
|
- self.assertEquals(set([('Content-Type', 'text/plain')]),
|
|
|
+ self.assertEqual(message, self._req.forbidden(message))
|
|
|
+ self.assertEqual(HTTP_FORBIDDEN, self._status)
|
|
|
+ self.assertEqual(set([('Content-Type', 'text/plain')]),
|
|
|
set(self._headers))
|
|
|
|
|
|
def test_respond_ok(self):
|
|
|
self._req.respond()
|
|
|
- self.assertEquals([], self._headers)
|
|
|
- self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertEqual([], self._headers)
|
|
|
+ self.assertEqual(HTTP_OK, self._status)
|
|
|
|
|
|
def test_respond(self):
|
|
|
self._req.nocache()
|
|
|
self._req.respond(status=402, content_type='some/type',
|
|
|
headers=[('X-Foo', 'foo'), ('X-Bar', 'bar')])
|
|
|
- self.assertEquals(set([
|
|
|
+ self.assertEqual(set([
|
|
|
('X-Foo', 'foo'),
|
|
|
('X-Bar', 'bar'),
|
|
|
('Content-Type', 'some/type'),
|
|
@@ -410,7 +412,7 @@ class HTTPGitRequestTestCase(WebTestCase):
|
|
|
('Pragma', 'no-cache'),
|
|
|
('Cache-Control', 'no-cache, max-age=0, must-revalidate'),
|
|
|
]), set(self._headers))
|
|
|
- self.assertEquals(402, self._status)
|
|
|
+ self.assertEqual(402, self._status)
|
|
|
|
|
|
|
|
|
class HTTPGitApplicationTestCase(TestCase):
|
|
@@ -419,19 +421,61 @@ class HTTPGitApplicationTestCase(TestCase):
|
|
|
super(HTTPGitApplicationTestCase, self).setUp()
|
|
|
self._app = HTTPGitApplication('backend')
|
|
|
|
|
|
- def test_call(self):
|
|
|
- def test_handler(req, backend, mat):
|
|
|
- # tests interface used by all handlers
|
|
|
- self.assertEquals(environ, req.environ)
|
|
|
- self.assertEquals('backend', backend)
|
|
|
- self.assertEquals('/foo', mat.group(0))
|
|
|
- return 'output'
|
|
|
-
|
|
|
- self._app.services = {
|
|
|
- ('GET', re.compile('/foo$')): test_handler,
|
|
|
+ self._environ = {
|
|
|
+ 'PATH_INFO': '/foo',
|
|
|
+ 'REQUEST_METHOD': 'GET',
|
|
|
}
|
|
|
- environ = {
|
|
|
- 'PATH_INFO': '/foo',
|
|
|
- 'REQUEST_METHOD': 'GET',
|
|
|
- }
|
|
|
- self.assertEquals('output', self._app(environ, None))
|
|
|
+
|
|
|
+ def _test_handler(self, req, backend, mat):
|
|
|
+ # tests interface used by all handlers
|
|
|
+ self.assertEqual(self._environ, req.environ)
|
|
|
+ self.assertEqual('backend', backend)
|
|
|
+ self.assertEqual('/foo', mat.group(0))
|
|
|
+ return 'output'
|
|
|
+
|
|
|
+ def _add_handler(self, app):
|
|
|
+ req = self._environ['REQUEST_METHOD']
|
|
|
+ app.services = {
|
|
|
+ (req, re.compile('/foo$')): self._test_handler,
|
|
|
+ }
|
|
|
+
|
|
|
+ def test_call(self):
|
|
|
+ self._add_handler(self._app)
|
|
|
+ self.assertEqual('output', self._app(self._environ, None))
|
|
|
+
|
|
|
+
|
|
|
+class GunzipTestCase(HTTPGitApplicationTestCase):
|
|
|
+ """TestCase for testing the GunzipFilter, ensuring the wsgi.input
|
|
|
+ is correctly decompressed and headers are corrected.
|
|
|
+ """
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ super(GunzipTestCase, self).setUp()
|
|
|
+ self._app = GunzipFilter(self._app)
|
|
|
+ self._environ['HTTP_CONTENT_ENCODING'] = 'gzip'
|
|
|
+ self._environ['REQUEST_METHOD'] = 'POST'
|
|
|
+
|
|
|
+ def _get_zstream(self, text):
|
|
|
+ zstream = StringIO()
|
|
|
+ zfile = gzip.GzipFile(fileobj=zstream, mode='w')
|
|
|
+ zfile.write(text)
|
|
|
+ zfile.close()
|
|
|
+ return zstream
|
|
|
+
|
|
|
+ def test_call(self):
|
|
|
+ self._add_handler(self._app.app)
|
|
|
+ orig = self.__class__.__doc__
|
|
|
+ zstream = self._get_zstream(orig)
|
|
|
+ zlength = zstream.tell()
|
|
|
+ zstream.seek(0)
|
|
|
+ self.assertLess(zlength, len(orig))
|
|
|
+ self.assertEqual(self._environ['HTTP_CONTENT_ENCODING'], 'gzip')
|
|
|
+ self._environ['CONTENT_LENGTH'] = zlength
|
|
|
+ self._environ['wsgi.input'] = zstream
|
|
|
+ app_output = self._app(self._environ, None)
|
|
|
+ buf = self._environ['wsgi.input']
|
|
|
+ self.assertIsNot(buf, zstream)
|
|
|
+ buf.seek(0)
|
|
|
+ self.assertEqual(orig, buf.read())
|
|
|
+ self.assertIs(None, self._environ.get('CONTENT_LENGTH'))
|
|
|
+ self.assertNotIn('HTTP_CONTENT_ENCODING', self._environ)
|