|
@@ -28,7 +28,6 @@ from dulwich.object_store import (
|
|
|
)
|
|
|
from dulwich.objects import (
|
|
|
Blob,
|
|
|
- Tag,
|
|
|
)
|
|
|
from dulwich.repo import (
|
|
|
BaseRepo,
|
|
@@ -62,7 +61,6 @@ from dulwich.web import (
|
|
|
from dulwich.tests.utils import (
|
|
|
make_object,
|
|
|
make_tag,
|
|
|
- skipIfPY3,
|
|
|
)
|
|
|
|
|
|
|
|
@@ -112,12 +110,11 @@ def _test_backend(objects, refs=None, named_files=None):
|
|
|
if not named_files:
|
|
|
named_files = {}
|
|
|
repo = MemoryRepo.init_bare(objects, refs)
|
|
|
- for path, contents in named_files.iteritems():
|
|
|
+ for path, contents in named_files.items():
|
|
|
repo._put_named_file(path, contents)
|
|
|
return DictBackend({'/': repo})
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class DumbHandlersTestCase(WebTestCase):
|
|
|
|
|
|
def test_send_file_not_found(self):
|
|
@@ -125,16 +122,16 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
self.assertEqual(HTTP_NOT_FOUND, self._status)
|
|
|
|
|
|
def test_send_file(self):
|
|
|
- f = BytesIO('foobar')
|
|
|
- output = ''.join(send_file(self._req, f, 'some/thing'))
|
|
|
- self.assertEqual('foobar', output)
|
|
|
+ f = BytesIO(b'foobar')
|
|
|
+ output = b''.join(send_file(self._req, f, 'some/thing'))
|
|
|
+ self.assertEqual(b'foobar', output)
|
|
|
self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('some/thing')
|
|
|
self.assertTrue(f.closed)
|
|
|
|
|
|
def test_send_file_buffered(self):
|
|
|
bufsize = 10240
|
|
|
- xs = 'x' * bufsize
|
|
|
+ xs = b'x' * bufsize
|
|
|
f = BytesIO(2 * xs)
|
|
|
self.assertEqual([xs, xs],
|
|
|
list(send_file(self._req, f, 'some/thing')))
|
|
@@ -168,19 +165,19 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
def test_get_text_file(self):
|
|
|
- backend = _test_backend([], named_files={'description': 'foo'})
|
|
|
+ backend = _test_backend([], named_files={'description': b'foo'})
|
|
|
mat = re.search('.*', 'description')
|
|
|
- output = ''.join(get_text_file(self._req, backend, mat))
|
|
|
- self.assertEqual('foo', output)
|
|
|
+ output = b''.join(get_text_file(self._req, backend, mat))
|
|
|
+ self.assertEqual(b'foo', output)
|
|
|
self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('text/plain')
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
def test_get_loose_object(self):
|
|
|
- blob = make_object(Blob, data='foo')
|
|
|
+ blob = make_object(Blob, data=b'foo')
|
|
|
backend = _test_backend([blob])
|
|
|
- mat = re.search('^(..)(.{38})$', blob.id)
|
|
|
- output = ''.join(get_loose_object(self._req, backend, mat))
|
|
|
+ mat = re.search('^(..)(.{38})$', blob.id.decode('ascii'))
|
|
|
+ output = b''.join(get_loose_object(self._req, backend, mat))
|
|
|
self.assertEqual(blob.as_legacy_object(), output)
|
|
|
self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('application/x-git-loose-object')
|
|
@@ -192,9 +189,9 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
self.assertEqual(HTTP_NOT_FOUND, self._status)
|
|
|
|
|
|
def test_get_loose_object_error(self):
|
|
|
- blob = make_object(Blob, data='foo')
|
|
|
+ blob = make_object(Blob, data=b'foo')
|
|
|
backend = _test_backend([blob])
|
|
|
- mat = re.search('^(..)(.{38})$', blob.id)
|
|
|
+ mat = re.search('^(..)(.{38})$', blob.id.decode('ascii'))
|
|
|
|
|
|
def as_legacy_object_error():
|
|
|
raise IOError
|
|
@@ -205,20 +202,20 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
|
|
|
def test_get_pack_file(self):
|
|
|
pack_name = os.path.join('objects', 'pack', 'pack-%s.pack' % ('1' * 40))
|
|
|
- backend = _test_backend([], named_files={pack_name: 'pack contents'})
|
|
|
+ backend = _test_backend([], named_files={pack_name: b'pack contents'})
|
|
|
mat = re.search('.*', pack_name)
|
|
|
- output = ''.join(get_pack_file(self._req, backend, mat))
|
|
|
- self.assertEqual('pack contents', output)
|
|
|
+ output = b''.join(get_pack_file(self._req, backend, mat))
|
|
|
+ self.assertEqual(b'pack contents', output)
|
|
|
self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('application/x-git-packed-objects')
|
|
|
self.assertTrue(self._req.cached)
|
|
|
|
|
|
def test_get_idx_file(self):
|
|
|
idx_name = os.path.join('objects', 'pack', 'pack-%s.idx' % ('1' * 40))
|
|
|
- backend = _test_backend([], named_files={idx_name: 'idx contents'})
|
|
|
+ backend = _test_backend([], named_files={idx_name: b'idx contents'})
|
|
|
mat = re.search('.*', idx_name)
|
|
|
- output = ''.join(get_idx_file(self._req, backend, mat))
|
|
|
- self.assertEqual('idx contents', output)
|
|
|
+ output = b''.join(get_idx_file(self._req, backend, mat))
|
|
|
+ self.assertEqual(b'idx contents', output)
|
|
|
self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('application/x-git-packed-objects-toc')
|
|
|
self.assertTrue(self._req.cached)
|
|
@@ -226,26 +223,26 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
def test_get_info_refs(self):
|
|
|
self._environ['QUERY_STRING'] = ''
|
|
|
|
|
|
- blob1 = make_object(Blob, data='1')
|
|
|
- blob2 = make_object(Blob, data='2')
|
|
|
- blob3 = make_object(Blob, data='3')
|
|
|
+ blob1 = make_object(Blob, data=b'1')
|
|
|
+ blob2 = make_object(Blob, data=b'2')
|
|
|
+ blob3 = make_object(Blob, data=b'3')
|
|
|
|
|
|
- tag1 = make_tag(blob2, name='tag-tag')
|
|
|
+ tag1 = make_tag(blob2, name=b'tag-tag')
|
|
|
|
|
|
objects = [blob1, blob2, blob3, tag1]
|
|
|
refs = {
|
|
|
- 'HEAD': '000',
|
|
|
- 'refs/heads/master': blob1.id,
|
|
|
- 'refs/tags/tag-tag': tag1.id,
|
|
|
- 'refs/tags/blob-tag': blob3.id,
|
|
|
+ b'HEAD': b'000',
|
|
|
+ b'refs/heads/master': blob1.id,
|
|
|
+ b'refs/tags/tag-tag': tag1.id,
|
|
|
+ b'refs/tags/blob-tag': blob3.id,
|
|
|
}
|
|
|
backend = _test_backend(objects, refs=refs)
|
|
|
|
|
|
mat = re.search('.*', '//info/refs')
|
|
|
- self.assertEqual(['%s\trefs/heads/master\n' % blob1.id,
|
|
|
- '%s\trefs/tags/blob-tag\n' % blob3.id,
|
|
|
- '%s\trefs/tags/tag-tag\n' % tag1.id,
|
|
|
- '%s\trefs/tags/tag-tag^{}\n' % blob2.id],
|
|
|
+ self.assertEqual([blob1.id + b'\trefs/heads/master\n',
|
|
|
+ blob3.id + b'\trefs/tags/blob-tag\n',
|
|
|
+ tag1.id + b'\trefs/tags/tag-tag\n',
|
|
|
+ blob2.id + b'\trefs/tags/tag-tag^{}\n'],
|
|
|
list(get_info_refs(self._req, backend, mat)))
|
|
|
self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('text/plain')
|
|
@@ -273,16 +270,15 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
repo = BaseRepo(store, None)
|
|
|
backend = DictBackend({'/': repo})
|
|
|
mat = re.search('.*', '//info/packs')
|
|
|
- output = ''.join(get_info_packs(self._req, backend, mat))
|
|
|
- expected = 'P pack-%s.pack\n' * 3
|
|
|
- expected %= ('1' * 40, '2' * 40, '3' * 40)
|
|
|
+ output = b''.join(get_info_packs(self._req, backend, mat))
|
|
|
+ expected = b''.join(
|
|
|
+ [(b'P pack-' + s + b'.pack\n') for s in [b'1' * 40, b'2' * 40, b'3' * 40]])
|
|
|
self.assertEqual(expected, output)
|
|
|
self.assertEqual(HTTP_OK, self._status)
|
|
|
self.assertContentTypeEquals('text/plain')
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class SmartHandlersTestCase(WebTestCase):
|
|
|
|
|
|
class _TestUploadPackHandler(object):
|
|
@@ -294,7 +290,7 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
self.advertise_refs = advertise_refs
|
|
|
|
|
|
def handle(self):
|
|
|
- self.proto.write('handled input: %s' % self.proto.recv(1024))
|
|
|
+ self.proto.write(b'handled input: ' + self.proto.recv(1024))
|
|
|
|
|
|
def _make_handler(self, *args, **kwargs):
|
|
|
self._handler = self._TestUploadPackHandler(*args, **kwargs)
|
|
@@ -311,7 +307,7 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
def _run_handle_service_request(self, content_length=None):
|
|
|
- self._environ['wsgi.input'] = BytesIO('foo')
|
|
|
+ self._environ['wsgi.input'] = BytesIO(b'foo')
|
|
|
if content_length is not None:
|
|
|
self._environ['CONTENT_LENGTH'] = content_length
|
|
|
mat = re.search('.*', '/git-upload-pack')
|
|
@@ -320,7 +316,7 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
write_output = self._output.getvalue()
|
|
|
# Ensure all output was written via the write callback.
|
|
|
self.assertEqual('', handler_output)
|
|
|
- self.assertEqual('handled input: foo', write_output)
|
|
|
+ self.assertEqual(b'handled input: foo', write_output)
|
|
|
self.assertContentTypeEquals('application/x-git-upload-pack-result')
|
|
|
self.assertFalse(self._handler.advertise_refs)
|
|
|
self.assertTrue(self._handler.http_req)
|
|
@@ -337,45 +333,44 @@ class SmartHandlersTestCase(WebTestCase):
|
|
|
|
|
|
def test_get_info_refs_unknown(self):
|
|
|
self._environ['QUERY_STRING'] = 'service=git-evil-handler'
|
|
|
- content = list(get_info_refs(self._req, 'backend', None))
|
|
|
+ content = list(get_info_refs(self._req, b'backend', None))
|
|
|
self.assertFalse('git-evil-handler' in "".join(content))
|
|
|
self.assertEqual(HTTP_FORBIDDEN, self._status)
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
def test_get_info_refs(self):
|
|
|
- self._environ['wsgi.input'] = BytesIO('foo')
|
|
|
+ self._environ['wsgi.input'] = BytesIO(b'foo')
|
|
|
self._environ['QUERY_STRING'] = 'service=git-upload-pack'
|
|
|
|
|
|
mat = re.search('.*', '/git-upload-pack')
|
|
|
- handler_output = ''.join(get_info_refs(self._req, 'backend', mat))
|
|
|
+ handler_output = b''.join(get_info_refs(self._req, b'backend', mat))
|
|
|
write_output = self._output.getvalue()
|
|
|
- self.assertEqual(('001e# service=git-upload-pack\n'
|
|
|
- '0000'
|
|
|
+ self.assertEqual((b'001e# service=git-upload-pack\n'
|
|
|
+ b'0000'
|
|
|
# input is ignored by the handler
|
|
|
- 'handled input: '), write_output)
|
|
|
+ b'handled input: '), write_output)
|
|
|
# Ensure all output was written via the write callback.
|
|
|
- self.assertEqual('', handler_output)
|
|
|
+ self.assertEqual(b'', handler_output)
|
|
|
self.assertTrue(self._handler.advertise_refs)
|
|
|
self.assertTrue(self._handler.http_req)
|
|
|
self.assertFalse(self._req.cached)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class LengthLimitedFileTestCase(TestCase):
|
|
|
def test_no_cutoff(self):
|
|
|
- f = _LengthLimitedFile(BytesIO('foobar'), 1024)
|
|
|
- self.assertEqual('foobar', f.read())
|
|
|
+ f = _LengthLimitedFile(BytesIO(b'foobar'), 1024)
|
|
|
+ self.assertEqual(b'foobar', f.read())
|
|
|
|
|
|
def test_cutoff(self):
|
|
|
- f = _LengthLimitedFile(BytesIO('foobar'), 3)
|
|
|
- self.assertEqual('foo', f.read())
|
|
|
- self.assertEqual('', f.read())
|
|
|
+ f = _LengthLimitedFile(BytesIO(b'foobar'), 3)
|
|
|
+ self.assertEqual(b'foo', f.read())
|
|
|
+ self.assertEqual(b'', f.read())
|
|
|
|
|
|
def test_multiple_reads(self):
|
|
|
- f = _LengthLimitedFile(BytesIO('foobar'), 3)
|
|
|
- self.assertEqual('fo', f.read(2))
|
|
|
- self.assertEqual('o', f.read(2))
|
|
|
- self.assertEqual('', f.read())
|
|
|
+ f = _LengthLimitedFile(BytesIO(b'foobar'), 3)
|
|
|
+ self.assertEqual(b'fo', f.read(2))
|
|
|
+ self.assertEqual(b'o', f.read(2))
|
|
|
+ self.assertEqual(b'', f.read())
|
|
|
|
|
|
|
|
|
class HTTPGitRequestTestCase(WebTestCase):
|
|
@@ -419,7 +414,6 @@ class HTTPGitRequestTestCase(WebTestCase):
|
|
|
self.assertEqual(402, self._status)
|
|
|
|
|
|
|
|
|
-@skipIfPY3
|
|
|
class HTTPGitApplicationTestCase(TestCase):
|
|
|
|
|
|
def setUp(self):
|
|
@@ -460,7 +454,7 @@ class GunzipTestCase(HTTPGitApplicationTestCase):
|
|
|
__doc__ = """TestCase for testing the GunzipFilter, ensuring the wsgi.input
|
|
|
is correctly decompressed and headers are corrected.
|
|
|
"""
|
|
|
- example_text = __doc__
|
|
|
+ example_text = __doc__.encode('ascii')
|
|
|
|
|
|
def setUp(self):
|
|
|
super(GunzipTestCase, self).setUp()
|