|
@@ -27,7 +27,7 @@ MEDIA_ROOT = sys_tempfile.mkdtemp()
|
|
|
UPLOAD_TO = os.path.join(MEDIA_ROOT, 'test_upload')
|
|
|
|
|
|
|
|
|
-@override_settings(MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF='file_uploads.urls')
|
|
|
+@override_settings(MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF='file_uploads.urls', MIDDLEWARE_CLASSES=())
|
|
|
class FileUploadTests(TestCase):
|
|
|
|
|
|
@classmethod
|
|
@@ -51,30 +51,30 @@ class FileUploadTests(TestCase):
|
|
|
def test_large_upload(self):
|
|
|
tdir = tempfile.gettempdir()
|
|
|
|
|
|
- file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir)
|
|
|
- file1.write(b'a' * (2 ** 21))
|
|
|
- file1.seek(0)
|
|
|
+ file = tempfile.NamedTemporaryFile
|
|
|
+ with file(suffix=".file1", dir=tdir) as file1, file(suffix=".file2", dir=tdir) as file2:
|
|
|
+ file1.write(b'a' * (2 ** 21))
|
|
|
+ file1.seek(0)
|
|
|
|
|
|
- file2 = tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir)
|
|
|
- file2.write(b'a' * (10 * 2 ** 20))
|
|
|
- file2.seek(0)
|
|
|
+ file2.write(b'a' * (10 * 2 ** 20))
|
|
|
+ file2.seek(0)
|
|
|
|
|
|
- post_data = {
|
|
|
- 'name': 'Ringo',
|
|
|
- 'file_field1': file1,
|
|
|
- 'file_field2': file2,
|
|
|
- }
|
|
|
+ post_data = {
|
|
|
+ 'name': 'Ringo',
|
|
|
+ 'file_field1': file1,
|
|
|
+ 'file_field2': file2,
|
|
|
+ }
|
|
|
|
|
|
- for key in list(post_data):
|
|
|
- try:
|
|
|
- post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
|
|
|
- post_data[key].seek(0)
|
|
|
- except AttributeError:
|
|
|
- post_data[key + '_hash'] = hashlib.sha1(force_bytes(post_data[key])).hexdigest()
|
|
|
+ for key in list(post_data):
|
|
|
+ try:
|
|
|
+ post_data[key + '_hash'] = hashlib.sha1(post_data[key].read()).hexdigest()
|
|
|
+ post_data[key].seek(0)
|
|
|
+ except AttributeError:
|
|
|
+ post_data[key + '_hash'] = hashlib.sha1(force_bytes(post_data[key])).hexdigest()
|
|
|
|
|
|
- response = self.client.post('/verify/', post_data)
|
|
|
+ response = self.client.post('/verify/', post_data)
|
|
|
|
|
|
- self.assertEqual(response.status_code, 200)
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
|
|
|
def _test_base64_upload(self, content):
|
|
|
payload = client.FakePayload("\r\n".join([
|
|
@@ -196,7 +196,9 @@ class FileUploadTests(TestCase):
|
|
|
'REQUEST_METHOD': 'POST',
|
|
|
'wsgi.input': payload,
|
|
|
}
|
|
|
- result = json.loads(self.client.request(**r).content.decode('utf-8'))
|
|
|
+ response = self.client.request(**r)
|
|
|
+
|
|
|
+ result = json.loads(response.content.decode('utf-8'))
|
|
|
for name, _, expected in cases:
|
|
|
got = result[name]
|
|
|
self.assertEqual(expected, got, 'Mismatch for {0}'.format(name))
|
|
@@ -207,22 +209,22 @@ class FileUploadTests(TestCase):
|
|
|
"""Uploaded files may have content type parameters available."""
|
|
|
tdir = tempfile.gettempdir()
|
|
|
|
|
|
- no_content_type = tempfile.NamedTemporaryFile(suffix=".ctype_extra", dir=tdir)
|
|
|
- no_content_type.write(b'something')
|
|
|
- no_content_type.seek(0)
|
|
|
+ file = tempfile.NamedTemporaryFile
|
|
|
+ with file(suffix=".ctype_extra", dir=tdir) as no_content_type, file(suffix=".ctype_extra", dir=tdir) as simple_file:
|
|
|
+ no_content_type.write(b'something')
|
|
|
+ no_content_type.seek(0)
|
|
|
|
|
|
- simple_file = tempfile.NamedTemporaryFile(suffix=".ctype_extra", dir=tdir)
|
|
|
- simple_file.write(b'something')
|
|
|
- simple_file.seek(0)
|
|
|
- simple_file.content_type = 'text/plain; test-key=test_value'
|
|
|
+ simple_file.write(b'something')
|
|
|
+ simple_file.seek(0)
|
|
|
+ simple_file.content_type = 'text/plain; test-key=test_value'
|
|
|
|
|
|
- response = self.client.post('/echo_content_type_extra/', {
|
|
|
- 'no_content_type': no_content_type,
|
|
|
- 'simple_file': simple_file,
|
|
|
- })
|
|
|
- received = json.loads(response.content.decode('utf-8'))
|
|
|
- self.assertEqual(received['no_content_type'], {})
|
|
|
- self.assertEqual(received['simple_file'], {'test-key': 'test_value'})
|
|
|
+ response = self.client.post('/echo_content_type_extra/', {
|
|
|
+ 'no_content_type': no_content_type,
|
|
|
+ 'simple_file': simple_file,
|
|
|
+ })
|
|
|
+ received = json.loads(response.content.decode('utf-8'))
|
|
|
+ self.assertEqual(received['no_content_type'], {})
|
|
|
+ self.assertEqual(received['simple_file'], {'test-key': 'test_value'})
|
|
|
|
|
|
def test_truncated_multipart_handled_gracefully(self):
|
|
|
"""
|
|
@@ -266,65 +268,66 @@ class FileUploadTests(TestCase):
|
|
|
self.assertEqual(got, {})
|
|
|
|
|
|
def test_custom_upload_handler(self):
|
|
|
- # A small file (under the 5M quota)
|
|
|
- smallfile = tempfile.NamedTemporaryFile()
|
|
|
- smallfile.write(b'a' * (2 ** 21))
|
|
|
- smallfile.seek(0)
|
|
|
-
|
|
|
- # A big file (over the quota)
|
|
|
- bigfile = tempfile.NamedTemporaryFile()
|
|
|
- bigfile.write(b'a' * (10 * 2 ** 20))
|
|
|
- bigfile.seek(0)
|
|
|
-
|
|
|
- # Small file posting should work.
|
|
|
- response = self.client.post('/quota/', {'f': smallfile})
|
|
|
- got = json.loads(response.content.decode('utf-8'))
|
|
|
- self.assertTrue('f' in got)
|
|
|
-
|
|
|
- # Large files don't go through.
|
|
|
- response = self.client.post("/quota/", {'f': bigfile})
|
|
|
- got = json.loads(response.content.decode('utf-8'))
|
|
|
- self.assertTrue('f' not in got)
|
|
|
+ file = tempfile.NamedTemporaryFile
|
|
|
+ with file() as smallfile, file() as bigfile:
|
|
|
+ # A small file (under the 5M quota)
|
|
|
+ smallfile = tempfile.NamedTemporaryFile()
|
|
|
+ smallfile.write(b'a' * (2 ** 21))
|
|
|
+ smallfile.seek(0)
|
|
|
+
|
|
|
+ # A big file (over the quota)
|
|
|
+ bigfile = tempfile.NamedTemporaryFile()
|
|
|
+ bigfile.write(b'a' * (10 * 2 ** 20))
|
|
|
+ bigfile.seek(0)
|
|
|
+
|
|
|
+ # Small file posting should work.
|
|
|
+ response = self.client.post('/quota/', {'f': smallfile})
|
|
|
+ got = json.loads(response.content.decode('utf-8'))
|
|
|
+ self.assertTrue('f' in got)
|
|
|
+
|
|
|
+ # Large files don't go through.
|
|
|
+ response = self.client.post("/quota/", {'f': bigfile})
|
|
|
+ got = json.loads(response.content.decode('utf-8'))
|
|
|
+ self.assertTrue('f' not in got)
|
|
|
|
|
|
def test_broken_custom_upload_handler(self):
|
|
|
- f = tempfile.NamedTemporaryFile()
|
|
|
- f.write(b'a' * (2 ** 21))
|
|
|
- f.seek(0)
|
|
|
-
|
|
|
- # AttributeError: You cannot alter upload handlers after the upload has been processed.
|
|
|
- self.assertRaises(
|
|
|
- AttributeError,
|
|
|
- self.client.post,
|
|
|
- '/quota/broken/',
|
|
|
- {'f': f}
|
|
|
- )
|
|
|
+ with tempfile.NamedTemporaryFile() as file:
|
|
|
+ file.write(b'a' * (2 ** 21))
|
|
|
+ file.seek(0)
|
|
|
+
|
|
|
+ # AttributeError: You cannot alter upload handlers after the upload has been processed.
|
|
|
+ self.assertRaises(
|
|
|
+ AttributeError,
|
|
|
+ self.client.post,
|
|
|
+ '/quota/broken/',
|
|
|
+ {'f': file}
|
|
|
+ )
|
|
|
|
|
|
def test_fileupload_getlist(self):
|
|
|
- file1 = tempfile.NamedTemporaryFile()
|
|
|
- file1.write(b'a' * (2 ** 23))
|
|
|
- file1.seek(0)
|
|
|
-
|
|
|
- file2 = tempfile.NamedTemporaryFile()
|
|
|
- file2.write(b'a' * (2 * 2 ** 18))
|
|
|
- file2.seek(0)
|
|
|
-
|
|
|
- file2a = tempfile.NamedTemporaryFile()
|
|
|
- file2a.write(b'a' * (5 * 2 ** 20))
|
|
|
- file2a.seek(0)
|
|
|
-
|
|
|
- response = self.client.post('/getlist_count/', {
|
|
|
- 'file1': file1,
|
|
|
- 'field1': 'test',
|
|
|
- 'field2': 'test3',
|
|
|
- 'field3': 'test5',
|
|
|
- 'field4': 'test6',
|
|
|
- 'field5': 'test7',
|
|
|
- 'file2': (file2, file2a)
|
|
|
- })
|
|
|
- got = json.loads(response.content.decode('utf-8'))
|
|
|
-
|
|
|
- self.assertEqual(got.get('file1'), 1)
|
|
|
- self.assertEqual(got.get('file2'), 2)
|
|
|
+ file = tempfile.NamedTemporaryFile
|
|
|
+ with file() as file1, file() as file2, file() as file2a:
|
|
|
+ file1.write(b'a' * (2 ** 23))
|
|
|
+ file1.seek(0)
|
|
|
+
|
|
|
+ file2.write(b'a' * (2 * 2 ** 18))
|
|
|
+ file2.seek(0)
|
|
|
+
|
|
|
+ file2a.write(b'a' * (5 * 2 ** 20))
|
|
|
+ file2a.seek(0)
|
|
|
+
|
|
|
+ response = self.client.post('/getlist_count/', {
|
|
|
+ 'file1': file1,
|
|
|
+ 'field1': 'test',
|
|
|
+ 'field2': 'test3',
|
|
|
+ 'field3': 'test5',
|
|
|
+ 'field4': 'test6',
|
|
|
+ 'field5': 'test7',
|
|
|
+ 'file2': (file2, file2a)
|
|
|
+ })
|
|
|
+ got = json.loads(response.content.decode('utf-8'))
|
|
|
+
|
|
|
+ self.assertEqual(got.get('file1'), 1)
|
|
|
+ self.assertEqual(got.get('file2'), 2)
|
|
|
|
|
|
def test_file_error_blocking(self):
|
|
|
"""
|
|
@@ -434,7 +437,8 @@ class DirectoryCreationTests(TestCase):
|
|
|
open(UPLOAD_TO, 'wb').close()
|
|
|
self.addCleanup(os.remove, UPLOAD_TO)
|
|
|
with self.assertRaises(IOError) as exc_info:
|
|
|
- self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', b'x'))
|
|
|
+ with SimpleUploadedFile('foo.txt', b'x') as file:
|
|
|
+ self.obj.testfile.save('foo.txt', file)
|
|
|
# The test needs to be done on a specific string as IOError
|
|
|
# is raised even without the patch (just not early enough)
|
|
|
self.assertEqual(exc_info.exception.args[0],
|