|
@@ -29,6 +29,7 @@ from dulwich.objects import (
|
|
|
Tag,
|
|
|
)
|
|
|
from dulwich.repo import (
|
|
|
+ BaseRepo,
|
|
|
DictRefsContainer,
|
|
|
MemoryRepo,
|
|
|
)
|
|
@@ -44,7 +45,12 @@ from dulwich.web import (
|
|
|
HTTP_FORBIDDEN,
|
|
|
HTTP_ERROR,
|
|
|
send_file,
|
|
|
+ get_text_file,
|
|
|
+ get_loose_object,
|
|
|
+ get_pack_file,
|
|
|
+ get_idx_file,
|
|
|
get_info_refs,
|
|
|
+ get_info_packs,
|
|
|
handle_service_request,
|
|
|
_LengthLimitedFile,
|
|
|
HTTPGitRequest,
|
|
@@ -78,6 +84,17 @@ class WebTestCase(TestCase):
|
|
|
self.assertTrue(('Content-Type', expected) in self._headers)
|
|
|
|
|
|
|
|
|
+def _test_backend(objects, refs=None, named_files=None):
|
|
|
+ if not refs:
|
|
|
+ refs = {}
|
|
|
+ if not named_files:
|
|
|
+ named_files = {}
|
|
|
+ repo = MemoryRepo.init_bare(objects, refs)
|
|
|
+ for path, contents in named_files.iteritems():
|
|
|
+ repo._put_named_file(path, contents)
|
|
|
+ return DictBackend({'/': repo})
|
|
|
+
|
|
|
+
|
|
|
class DumbHandlersTestCase(WebTestCase):
|
|
|
|
|
|
def test_send_file_not_found(self):
|
|
@@ -119,6 +136,64 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
self.assertEquals(HTTP_ERROR, self._status)
|
|
|
self.assertTrue(f.closed)
|
|
|
|
|
|
+ # non-IOErrors are reraised
|
|
|
+ f = TestFile(AttributeError)
|
|
|
+ self.assertRaises(AttributeError, list,
|
|
|
+ send_file(self._req, f, 'some/thing'))
|
|
|
+ self.assertTrue(f.closed)
|
|
|
+
|
|
|
+ def test_get_text_file(self):
|
|
|
+ backend = _test_backend([], named_files={'description': 'foo'})
|
|
|
+ mat = re.search('.*', 'description')
|
|
|
+ output = ''.join(get_text_file(self._req, backend, mat))
|
|
|
+ self.assertEquals('foo', output)
|
|
|
+ self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertContentTypeEquals('text/plain')
|
|
|
+
|
|
|
+ def test_get_loose_object(self):
|
|
|
+ blob = make_object(Blob, data='foo')
|
|
|
+ backend = _test_backend([blob])
|
|
|
+ mat = re.search('^(..)(.{38})$', blob.id)
|
|
|
+ output = ''.join(get_loose_object(self._req, backend, mat))
|
|
|
+ self.assertEquals(blob.as_legacy_object(), output)
|
|
|
+ self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertContentTypeEquals('application/x-git-loose-object')
|
|
|
+
|
|
|
+ def test_get_loose_object_missing(self):
|
|
|
+ mat = re.search('^(..)(.{38})$', '1' * 40)
|
|
|
+ list(get_loose_object(self._req, _test_backend([]), mat))
|
|
|
+ self.assertEquals(HTTP_NOT_FOUND, self._status)
|
|
|
+
|
|
|
+ def test_get_loose_object_error(self):
|
|
|
+ blob = make_object(Blob, data='foo')
|
|
|
+ backend = _test_backend([blob])
|
|
|
+ mat = re.search('^(..)(.{38})$', blob.id)
|
|
|
+
|
|
|
+ def as_legacy_object_error():
|
|
|
+ raise IOError
|
|
|
+
|
|
|
+ blob.as_legacy_object = as_legacy_object_error
|
|
|
+ list(get_loose_object(self._req, backend, mat))
|
|
|
+ self.assertEquals(HTTP_ERROR, self._status)
|
|
|
+
|
|
|
+ def test_get_pack_file(self):
|
|
|
+ pack_name = 'objects/pack/pack-%s.pack' % ('1' * 40)
|
|
|
+ backend = _test_backend([], named_files={pack_name: 'pack contents'})
|
|
|
+ mat = re.search('.*', pack_name)
|
|
|
+ output = ''.join(get_pack_file(self._req, backend, mat))
|
|
|
+ self.assertEquals('pack contents', output)
|
|
|
+ self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertContentTypeEquals('application/x-git-packed-objects')
|
|
|
+
|
|
|
+ def test_get_idx_file(self):
|
|
|
+ idx_name = 'objects/pack/pack-%s.idx' % ('1' * 40)
|
|
|
+ backend = _test_backend([], named_files={idx_name: 'idx contents'})
|
|
|
+ mat = re.search('.*', idx_name)
|
|
|
+ output = ''.join(get_idx_file(self._req, backend, mat))
|
|
|
+ self.assertEquals('idx contents', output)
|
|
|
+ self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertContentTypeEquals('application/x-git-packed-objects-toc')
|
|
|
+
|
|
|
def test_get_info_refs(self):
|
|
|
self._environ['QUERY_STRING'] = ''
|
|
|
|
|
@@ -140,7 +215,7 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
'refs/tags/tag-tag': tag1.id,
|
|
|
'refs/tags/blob-tag': blob3.id,
|
|
|
}
|
|
|
- backend = DictBackend({'/': MemoryRepo.init_bare(objects, refs)})
|
|
|
+ backend = _test_backend(objects, refs=refs)
|
|
|
|
|
|
mat = re.search('.*', '//info/refs')
|
|
|
self.assertEquals(['%s\trefs/heads/master\n' % blob1.id,
|
|
@@ -148,6 +223,35 @@ class DumbHandlersTestCase(WebTestCase):
|
|
|
'%s\trefs/tags/tag-tag\n' % tag1.id,
|
|
|
'%s\trefs/tags/tag-tag^{}\n' % blob2.id],
|
|
|
list(get_info_refs(self._req, backend, mat)))
|
|
|
+ self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertContentTypeEquals('text/plain')
|
|
|
+
|
|
|
+ def test_get_info_packs(self):
|
|
|
+ class TestPack(object):
|
|
|
+ def __init__(self, sha):
|
|
|
+ self._sha = sha
|
|
|
+
|
|
|
+ def name(self):
|
|
|
+ return self._sha
|
|
|
+
|
|
|
+ packs = [TestPack(str(i) * 40) for i in xrange(1, 4)]
|
|
|
+
|
|
|
+ class TestObjectStore(MemoryObjectStore):
|
|
|
+ # property must be overridden, can't be assigned
|
|
|
+ @property
|
|
|
+ def packs(self):
|
|
|
+ return packs
|
|
|
+
|
|
|
+ store = TestObjectStore()
|
|
|
+ repo = BaseRepo(store, None)
|
|
|
+ backend = DictBackend({'/': repo})
|
|
|
+ mat = re.search('.*', '//info/packs')
|
|
|
+ output = ''.join(get_info_packs(self._req, backend, mat))
|
|
|
+ expected = 'P pack-%s.pack\n' * 3
|
|
|
+ expected %= ('1' * 40, '2' * 40, '3' * 40)
|
|
|
+ self.assertEquals(expected, output)
|
|
|
+ self.assertEquals(HTTP_OK, self._status)
|
|
|
+ self.assertContentTypeEquals('text/plain')
|
|
|
|
|
|
|
|
|
class SmartHandlersTestCase(WebTestCase):
|