123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- #! -*- coding: utf-8 -*-
- from __future__ import absolute_import
- import base64
- import errno
- import hashlib
- import json
- import os
- import shutil
- from StringIO import StringIO
- from django.core.files import temp as tempfile
- from django.core.files.uploadedfile import SimpleUploadedFile
- from django.http.multipartparser import MultiPartParser
- from django.test import TestCase, client
- from django.utils import unittest
- from . import uploadhandler
- from .models import FileModel, temp_storage, UPLOAD_TO
# File name mixing ASCII, Chinese characters and an accented Latin letter;
# test_unicode_file_name creates and uploads a temporary file with this name.
- UNICODE_FILENAME = u'test-0123456789_中文_Orléans.jpg'
class FileUploadTests(TestCase):
    """Tests that POST files to the /file_uploads/ test views via the test client."""

    def _make_temp_file(self, size, **kwargs):
        """
        Return a rewound NamedTemporaryFile containing ``size`` 'a' characters.

        Extra keyword arguments are forwarded to NamedTemporaryFile. The file
        is closed during test cleanup so the handle isn't leaked.
        """
        temp_file = tempfile.NamedTemporaryFile(**kwargs)
        # Register the cleanup before writing so a failed write still closes it.
        self.addCleanup(temp_file.close)
        temp_file.write('a' * size)
        temp_file.seek(0)
        return temp_file

    def _post_raw_multipart(self, path, payload):
        """
        POST the hand-built multipart body ``payload`` to ``path``.

        The body is passed as the raw ``wsgi.input`` of the request, which
        lets tests submit payloads built line by line. Returns the response.
        """
        environ = {
            'CONTENT_LENGTH': len(payload),
            'CONTENT_TYPE': client.MULTIPART_CONTENT,
            'PATH_INFO': path,
            'REQUEST_METHOD': 'POST',
            'wsgi.input': client.FakePayload(payload),
        }
        return self.client.request(**environ)

    def test_simple_upload(self):
        """Uploading this source file alongside a plain field returns a 200."""
        # Binary mode so the bytes are uploaded exactly as stored on disk.
        with open(__file__, 'rb') as fp:
            post_data = {
                'name': 'Ringo',
                'file_field': fp,
            }
            response = self.client.post('/file_uploads/upload/', post_data)
        self.assertEqual(response.status_code, 200)

    def test_large_upload(self):
        """A 2 MB and a 10 MB file are posted together with their SHA-1 hashes."""
        tdir = tempfile.gettempdir()
        file1 = self._make_temp_file(2 ** 21, suffix=".file1", dir=tdir)
        file2 = self._make_temp_file(10 * 2 ** 20, suffix=".file2", dir=tdir)

        post_data = {
            'name': 'Ringo',
            'file_field1': file1,
            'file_field2': file2,
        }

        # Iterate over a snapshot: '<key>_hash' entries are added to post_data
        # inside the loop, and mutating a dict while iterating it is unsafe.
        for key, value in list(post_data.items()):
            try:
                digest = hashlib.sha1(value.read()).hexdigest()
                value.seek(0)
            except AttributeError:
                # Not a file object: hash the plain string value directly.
                digest = hashlib.sha1(value).hexdigest()
            post_data[key + '_hash'] = digest

        response = self.client.post('/file_uploads/verify/', post_data)

        self.assertEqual(response.status_code, 200)

    def test_base64_upload(self):
        """A part sent with Content-Transfer-Encoding: base64 is echoed back decoded."""
        test_string = "This data will be transmitted base64-encoded."
        payload = "\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="test.txt"',
            'Content-Type: application/octet-stream',
            'Content-Transfer-Encoding: base64',
            '',
            base64.b64encode(test_string),
            '--' + client.BOUNDARY + '--',
            '',
        ])
        response = self._post_raw_multipart("/file_uploads/echo_content/", payload)
        received = json.loads(response.content)

        self.assertEqual(received['file'], test_string)

    def test_unicode_file_name(self):
        """A file whose name contains non-ASCII characters can be uploaded."""
        tdir = tempfile.gettempdir()

        # This file contains Chinese symbols and an accented char in the name.
        path = os.path.join(tdir, UNICODE_FILENAME.encode('utf-8'))
        try:
            with open(path, 'w+b') as file1:
                file1.write('b' * (2 ** 10))
                file1.seek(0)

                post_data = {
                    'file_unicode': file1,
                }

                response = self.client.post('/file_uploads/unicode_name/', post_data)
        finally:
            # Best-effort removal, attempted even if the request raised; only
            # filesystem errors are ignored so unrelated failures still surface.
            try:
                os.unlink(path)
            except OSError:
                pass

        self.assertEqual(response.status_code, 200)

    def test_dangerous_file_names(self):
        """Uploaded file names should be sanitized before ever reaching the view."""
        # This test simulates possible directory traversal attacks by a
        # malicious uploader. We have to do some monkeybusiness here to
        # construct a malicious payload with an invalid file name (containing
        # os.sep or os.pardir). This is similar to what an attacker would need
        # to do when trying such an attack.
        scary_file_names = [
            "/tmp/hax0rd.txt",          # Absolute path, *nix-style.
            "C:\\Windows\\hax0rd.txt",  # Absolute path, win-style.
            "C:/Windows/hax0rd.txt",    # Absolute path, broken-style.
            "\\tmp\\hax0rd.txt",        # Absolute path, broken in a different way.
            "/tmp\\hax0rd.txt",         # Absolute path, broken by mixing.
            "subdir/hax0rd.txt",        # Descendant path, *nix-style.
            "subdir\\hax0rd.txt",       # Descendant path, win-style.
            "sub/dir\\hax0rd.txt",      # Descendant path, mixed.
            "../../hax0rd.txt",         # Relative path, *nix-style.
            "..\\..\\hax0rd.txt",       # Relative path, win-style.
            "../..\\hax0rd.txt"         # Relative path, mixed.
        ]

        payload = []
        for i, name in enumerate(scary_file_names):
            payload.extend([
                '--' + client.BOUNDARY,
                'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
                'Content-Type: application/octet-stream',
                '',
                'You got pwnd.'
            ])
        payload.extend([
            '--' + client.BOUNDARY + '--',
            '',
        ])
        payload = "\r\n".join(payload)

        response = self._post_raw_multipart("/file_uploads/echo/", payload)

        # The filenames should have been sanitized by the time it got to the view.
        received = json.loads(response.content)
        for i, name in enumerate(scary_file_names):
            got = received["file%s" % i]
            self.assertEqual(got, "hax0rd.txt")

    def test_filename_overflow(self):
        """File names over 256 characters (dangerous on some platforms) get fixed up."""
        name = "%s.txt" % ("f" * 500)
        payload = "\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="%s"' % name,
            'Content-Type: application/octet-stream',
            '',
            # The trailing comma matters: without it the content and the
            # closing boundary are fused into a single line by implicit
            # string concatenation.
            'Oops.',
            '--' + client.BOUNDARY + '--',
            '',
        ])
        got = json.loads(self._post_raw_multipart("/file_uploads/echo/", payload).content)
        self.assertTrue(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))

    def test_truncated_multipart_handled_gracefully(self):
        """
        If passed an incomplete multipart message, MultiPartParser does not
        attempt to read beyond the end of the stream, and simply will handle
        the part that can be parsed gracefully.
        """
        payload = "\r\n".join([
            '--' + client.BOUNDARY,
            'Content-Disposition: form-data; name="file"; filename="foo.txt"',
            'Content-Type: application/octet-stream',
            '',
            # NOTE(review): the content and the closing boundary share one
            # line here (the original relied on adjacent string literals with
            # no comma between them). The payload is kept byte-for-byte because
            # the expected {} result was established against it -- confirm
            # before inserting a CRLF separator.
            'file contents--' + client.BOUNDARY + '--',
            '',
        ])
        # Chop off the tail so the closing boundary is incomplete.
        payload = payload[:-10]
        got = json.loads(self._post_raw_multipart('/file_uploads/echo/', payload).content)
        self.assertEqual(got, {})

    def test_empty_multipart_handled_gracefully(self):
        """
        If passed an empty multipart message, MultiPartParser will return
        an empty QueryDict.
        """
        got = json.loads(self._post_raw_multipart('/file_uploads/echo/', '').content)
        self.assertEqual(got, {})

    def test_custom_upload_handler(self):
        """The quota view accepts the small file and drops the big one."""
        # A small file (under the 5M quota)
        smallfile = self._make_temp_file(2 ** 21)

        # A big file (over the quota)
        bigfile = self._make_temp_file(10 * 2 ** 20)

        # Small file posting should work.
        response = self.client.post('/file_uploads/quota/', {'f': smallfile})
        got = json.loads(response.content)
        self.assertIn('f', got)

        # Large files don't go through.
        response = self.client.post("/file_uploads/quota/", {'f': bigfile})
        got = json.loads(response.content)
        self.assertNotIn('f', got)

    def test_broken_custom_upload_handler(self):
        """Posting to the broken quota view raises AttributeError."""
        f = self._make_temp_file(2 ** 21)

        # AttributeError: You cannot alter upload handlers after the upload has been processed.
        self.assertRaises(
            AttributeError,
            self.client.post,
            '/file_uploads/quota/broken/',
            {'f': f}
        )

    def test_fileupload_getlist(self):
        """Several files posted under one field name are all counted by the view."""
        file1 = self._make_temp_file(2 ** 23)
        file2 = self._make_temp_file(2 * 2 ** 18)
        file2a = self._make_temp_file(5 * 2 ** 20)

        response = self.client.post('/file_uploads/getlist_count/', {
            'file1': file1,
            'field1': u'test',
            'field2': u'test3',
            'field3': u'test5',
            'field4': u'test6',
            'field5': u'test7',
            'file2': (file2, file2a)
        })
        got = json.loads(response.content)

        self.assertEqual(got.get('file1'), 1)
        self.assertEqual(got.get('file2'), 2)

    def test_file_error_blocking(self):
        """
        The server should not block when there are upload errors (bug #8622).
        This can happen if something -- i.e. an exception handler -- tries to
        access POST while handling an error in parsing POST. This shouldn't
        cause an infinite loop!
        """
        class POSTAccessingHandler(client.ClientHandler):
            """A handler that'll access POST during an exception."""
            def handle_uncaught_exception(self, request, resolver, exc_info):
                ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info)
                # Touch POST purely for its side effect (it triggers parsing).
                request.POST
                return ret

        # Maybe this is a little more complicated than it needs to be; but if
        # the django.test.client.FakePayload.read() implementation changes then
        # this test would fail. So we need to know exactly what kind of error
        # it raises when there is an attempt to read more than the available bytes:
        reference_error = None
        try:
            client.FakePayload('a').read(2)
        except Exception as err:
            # Bind explicitly so the name is always defined below, even if
            # the over-read ever stops raising.
            reference_error = err

        # install the custom handler that tries to access request.POST
        self.client.handler = POSTAccessingHandler()

        with open(__file__, 'rb') as fp:
            post_data = {
                'name': 'Ringo',
                'file_field': fp,
            }
            try:
                self.client.post('/file_uploads/upload_errors/', post_data)
            except Exception as err:
                if reference_error is not None and isinstance(err, reference_error.__class__):
                    # Same exception type as an over-read: the message must differ.
                    self.assertFalse(
                        str(err) == str(reference_error),
                        "Caught a repeated exception that'll cause an infinite loop in file uploads."
                    )
                else:
                    # CustomUploadError is the error that should have been raised
                    self.assertEqual(err.__class__, uploadhandler.CustomUploadError)

    def test_filename_case_preservation(self):
        """
        The storage backend shouldn't mess with the case of the filenames
        uploaded.
        """
        # Synthesize the contents of a file upload with a mixed case filename
        # so we don't have to carry such a file in the Django tests source code
        # tree.
        boundary_vars = {'boundary': 'oUrBoUnDaRyStRiNg'}
        post_data = [
            '--%(boundary)s',
            'Content-Disposition: form-data; name="file_field"; '
                'filename="MiXeD_cAsE.txt"',
            'Content-Type: application/octet-stream',
            '',
            'file contents\n',
            '--%(boundary)s--\r\n',
        ]
        response = self.client.post(
            '/file_uploads/filename_case/',
            '\r\n'.join(post_data) % boundary_vars,
            'multipart/form-data; boundary=%(boundary)s' % boundary_vars
        )
        self.assertEqual(response.status_code, 200)
        # The view responds with the primary key of the stored object.
        pk = int(response.content)
        obj = FileModel.objects.get(pk=pk)
        # The name of the file uploaded and the file stored in the server-side
        # shouldn't differ.
        self.assertEqual(os.path.basename(obj.testfile.path), 'MiXeD_cAsE.txt')
class DirectoryCreationTests(unittest.TestCase):
    """
    Tests for error handling during directory creation
    via _save_FIELD_file (ticket #6450)
    """
    def setUp(self):
        """Ensure the storage root exists and no upload directory is left over."""
        self.obj = FileModel()
        if not os.path.isdir(temp_storage.location):
            os.makedirs(temp_storage.location)
        if os.path.isdir(UPLOAD_TO):
            # Restore owner permissions first so the tree can be removed.
            os.chmod(UPLOAD_TO, 0o700)
            shutil.rmtree(UPLOAD_TO)

    def tearDown(self):
        """Make the storage root writable again and delete it."""
        # test_readonly_root leaves the directory read-only.
        os.chmod(temp_storage.location, 0o700)
        shutil.rmtree(temp_storage.location)

    def test_readonly_root(self):
        """Permission errors are not swallowed"""
        os.chmod(temp_storage.location, 0o500)
        try:
            self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
        except OSError as err:
            self.assertEqual(err.errno, errno.EACCES)
        except Exception:
            self.fail("OSError [Errno %s] not raised." % errno.EACCES)
        # NOTE(review): the test passes if save() raises nothing at all.
        # Presumably that is tolerated for environments where the chmod is
        # not enforced (e.g. running as root) -- confirm before tightening.

    def test_not_a_directory(self):
        """The correct IOError is raised when the upload directory name exists but isn't a directory"""
        # Create a file with the upload directory name
        open(UPLOAD_TO, 'w').close()
        try:
            self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
        except IOError as err:
            # The test needs to be done on a specific string as IOError
            # is raised even without the patch (just not early enough)
            self.assertEqual(err.args[0],
                             "%s exists and is not a directory." % UPLOAD_TO)
        except Exception:
            self.fail("IOError not raised")
        else:
            # A save that succeeds must not let the test pass silently.
            self.fail("IOError not raised")
class MultiParserTests(unittest.TestCase):
    """Checks on constructing MultiPartParser directly."""

    def test_empty_upload_handlers(self):
        # Nothing is parsed here; the point is only that the parser can be
        # instantiated when the list of upload handlers is empty.
        meta = {
            'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
            'CONTENT_LENGTH': '1',
        }
        body = StringIO('x')
        no_handlers = []
        MultiPartParser(meta, body, no_handlers, 'utf-8')
|