12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917 |
- import base64
- import hashlib
- import os
- import shutil
- import sys
- import tempfile as sys_tempfile
- import unittest
- from io import BytesIO, StringIO
- from unittest import mock
- from urllib.parse import quote
- from django.conf import DEFAULT_STORAGE_ALIAS
- from django.core.exceptions import SuspiciousFileOperation
- from django.core.files import temp as tempfile
- from django.core.files.storage import default_storage
- from django.core.files.uploadedfile import SimpleUploadedFile, UploadedFile
- from django.http.multipartparser import (
- FILE,
- MultiPartParser,
- MultiPartParserError,
- Parser,
- )
- from django.test import SimpleTestCase, TestCase, client, override_settings
- from . import uploadhandler
- from .models import FileModel
# File name mixing ASCII digits, CJK characters and an accented Latin letter.
UNICODE_FILENAME = "test-0123456789_中文_Orléans.jpg"

# Scratch media root, created once when this module is imported.
MEDIA_ROOT = sys_tempfile.mkdtemp()
UPLOAD_FOLDER = "test_upload"
UPLOAD_TO = os.path.join(MEDIA_ROOT, UPLOAD_FOLDER)

# Client-supplied names that try to escape the upload directory. The tests
# using this list expect every entry to be reduced to "hax0rd.txt".
CANDIDATE_TRAVERSAL_FILE_NAMES = [
    "/tmp/hax0rd.txt",  # Absolute path, *nix separators.
    "C:\\Windows\\hax0rd.txt",  # Absolute path, Windows separators.
    "C:/Windows/hax0rd.txt",  # Drive letter with forward slashes.
    "\\tmp\\hax0rd.txt",  # Rooted path with backslashes.
    "/tmp\\hax0rd.txt",  # Rooted path, mixed separators.
    "subdir/hax0rd.txt",  # Descendant path, *nix separators.
    "subdir\\hax0rd.txt",  # Descendant path, Windows separators.
    "sub/dir\\hax0rd.txt",  # Descendant path, mixed separators.
    "../../hax0rd.txt",  # Parent traversal, *nix separators.
    "..\\..\\hax0rd.txt",  # Parent traversal, Windows separators.
    "../..\\hax0rd.txt",  # Parent traversal, mixed separators.
    # Separators hidden behind HTML character references. These two entries
    # were byte-identical "../hax0rd.txt" duplicates; written with entities
    # they cover the numeric and the named reference for "/" respectively.
    # NOTE(review): assumes the sanitizer unescapes HTML entities before
    # splitting on separators -- confirm against the parser implementation.
    "..&#x2F;hax0rd.txt",
    "..&sol;hax0rd.txt",
]

# Names that contain no usable file component once directories are removed.
CANDIDATE_INVALID_FILE_NAMES = [
    "/tmp/",  # Directory, *nix separators.
    "c:\\tmp\\",  # Directory, Windows separators.
    "/tmp/.",  # Current-directory reference, *nix separators.
    "c:\\tmp\\.",  # Current-directory reference, Windows separators.
    "/tmp/..",  # Parent-directory reference, *nix separators.
    "c:\\tmp\\..",  # Parent-directory reference, Windows separators.
    "",  # Empty name.
]
@override_settings(
    MEDIA_ROOT=MEDIA_ROOT, ROOT_URLCONF="file_uploads.urls", MIDDLEWARE=[]
)
class FileUploadTests(TestCase):
    """
    Upload tests driven through the Django test client, using the
    ``file_uploads.urls`` URLconf and a temporary MEDIA_ROOT.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Make sure the media root exists and is removed after the class ran.
        os.makedirs(MEDIA_ROOT, exist_ok=True)
        cls.addClassCleanup(shutil.rmtree, MEDIA_ROOT)

    def _post_multipart(self, path, payload):
        """
        POST an already assembled multipart body to ``path``.

        ``payload`` is a ``client.FakePayload`` holding the complete request
        body; its length is sent as CONTENT_LENGTH. Return the response.
        """
        environ = {
            "CONTENT_LENGTH": len(payload),
            "CONTENT_TYPE": client.MULTIPART_CONTENT,
            "PATH_INFO": path,
            "REQUEST_METHOD": "POST",
            "wsgi.input": payload,
        }
        return self.client.request(**environ)

    def _assert_unicode_name_upload(self, content_disposition):
        """
        Send a single file part carrying ``content_disposition`` to
        /unicode_name/ and assert that the response status is 200.
        """
        payload = client.FakePayload()
        payload.write(
            "\r\n".join(
                [
                    "--" + client.BOUNDARY,
                    content_disposition,
                    "Content-Type: application/octet-stream",
                    "",
                    "You got pwnd.\r\n",
                    "\r\n--" + client.BOUNDARY + "--\r\n",
                ]
            )
        )
        response = self._post_multipart("/unicode_name/", payload)
        self.assertEqual(response.status_code, 200)

    def test_upload_name_is_validated(self):
        # Directory-like names must be rejected when the UploadedFile is built.
        candidates = [
            "/tmp/",
            "/tmp/..",
            "/tmp/.",
        ]
        if sys.platform == "win32":
            candidates.extend(
                [
                    "c:\\tmp\\",
                    "c:\\tmp\\..",
                    "c:\\tmp\\.",
                ]
            )
        for file_name in candidates:
            with self.subTest(file_name=file_name):
                with self.assertRaises(SuspiciousFileOperation):
                    UploadedFile(name=file_name)

    def test_simple_upload(self):
        # Upload this very source file together with a plain form field.
        with open(__file__, "rb") as fp:
            post_data = {
                "name": "Ringo",
                "file_field": fp,
            }
            response = self.client.post("/upload/", post_data)
        self.assertEqual(response.status_code, 200)

    def test_large_upload(self):
        make_temp = tempfile.NamedTemporaryFile
        with make_temp(suffix=".file1") as file1, make_temp(suffix=".file2") as file2:
            # 2 MiB and 10 MiB of data respectively.
            file1.write(b"a" * (2**21))
            file1.seek(0)

            file2.write(b"a" * (10 * 2**20))
            file2.seek(0)

            post_data = {
                "name": "Ringo",
                "file_field1": file1,
                "file_field2": file2,
            }

            # Add a "<key>_hash" entry with the SHA-1 of every submitted
            # value. Iterate over a snapshot of the keys because the dict
            # grows inside the loop. Plain strings have no read(), hence the
            # AttributeError fallback.
            for key in list(post_data):
                try:
                    post_data[key + "_hash"] = hashlib.sha1(
                        post_data[key].read()
                    ).hexdigest()
                    post_data[key].seek(0)
                except AttributeError:
                    post_data[key + "_hash"] = hashlib.sha1(
                        post_data[key].encode()
                    ).hexdigest()

            response = self.client.post("/verify/", post_data)

            self.assertEqual(response.status_code, 200)

    def _test_base64_upload(self, content, encode=base64.b64encode):
        """
        Upload ``content`` as a base64 transfer-encoded file part and assert
        that /echo_content/ returns the original text.

        ``encode`` turns the UTF-8 bytes of ``content`` into the transmitted
        base64 bytes.
        """
        payload = client.FakePayload(
            "\r\n".join(
                [
                    "--" + client.BOUNDARY,
                    'Content-Disposition: form-data; name="file"; filename="test.txt"',
                    "Content-Type: application/octet-stream",
                    "Content-Transfer-Encoding: base64",
                    "",
                ]
            )
        )
        payload.write(b"\r\n" + encode(content.encode()) + b"\r\n")
        payload.write("--" + client.BOUNDARY + "--\r\n")
        response = self._post_multipart("/echo_content/", payload)
        self.assertEqual(response.json()["file"], content)

    def test_base64_upload(self):
        self._test_base64_upload("This data will be transmitted base64-encoded.")

    def test_big_base64_upload(self):
        self._test_base64_upload("Big data" * 68000)

    def test_big_base64_newlines_upload(self):
        # base64.encodebytes() inserts newlines into its output.
        self._test_base64_upload("Big data" * 68000, encode=base64.encodebytes)

    def test_base64_invalid_upload(self):
        # "!" is not valid base64; the echoed file content must be empty.
        payload = client.FakePayload(
            "\r\n".join(
                [
                    "--" + client.BOUNDARY,
                    'Content-Disposition: form-data; name="file"; filename="test.txt"',
                    "Content-Type: application/octet-stream",
                    "Content-Transfer-Encoding: base64",
                    "",
                ]
            )
        )
        payload.write(b"\r\n!\r\n")
        payload.write("--" + client.BOUNDARY + "--\r\n")
        response = self._post_multipart("/echo_content/", payload)
        self.assertEqual(response.json()["file"], "")

    def test_unicode_file_name(self):
        with sys_tempfile.TemporaryDirectory() as temp_dir:
            # The file on disk really carries the non-ASCII name.
            with open(os.path.join(temp_dir, UNICODE_FILENAME), "w+b") as file1:
                file1.write(b"b" * (2**10))
                file1.seek(0)
                response = self.client.post("/unicode_name/", {"file_unicode": file1})
            self.assertEqual(response.status_code, 200)

    def test_unicode_file_name_rfc2231(self):
        """
        Receiving file upload when filename is encoded with RFC 2231.
        """
        self._assert_unicode_name_upload(
            'Content-Disposition: form-data; name="file_unicode"; '
            "filename*=UTF-8''%s" % quote(UNICODE_FILENAME)
        )

    def test_unicode_name_rfc2231(self):
        """
        Receiving file upload when filename is encoded with RFC 2231.
        """
        # Here the field name uses the extended "name*=" notation as well.
        self._assert_unicode_name_upload(
            "Content-Disposition: form-data; name*=UTF-8''file_unicode; "
            "filename*=UTF-8''%s" % quote(UNICODE_FILENAME)
        )

    def test_unicode_file_name_rfc2231_with_double_quotes(self):
        # Same as the unquoted variant, with the extended value in quotes.
        self._assert_unicode_name_upload(
            'Content-Disposition: form-data; name="file_unicode"; '
            "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME)
        )

    def test_unicode_name_rfc2231_with_double_quotes(self):
        # Both name* and filename* carry quoted extended values.
        self._assert_unicode_name_upload(
            "Content-Disposition: form-data; name*=\"UTF-8''file_unicode\"; "
            "filename*=\"UTF-8''%s\"" % quote(UNICODE_FILENAME)
        )

    def test_blank_filenames(self):
        """
        Receiving file upload when filename is blank (before and after
        sanitization) should be okay.
        """
        filenames = [
            "",
            # Windows-style directory path with no file component.
            "C:\\Windows\\",
            # *nix-style names ending in a separator.
            "/",
            "ends-with-slash/",
        ]

        payload = client.FakePayload()
        for index, name in enumerate(filenames):
            payload.write(
                "\r\n".join(
                    [
                        "--" + client.BOUNDARY,
                        'Content-Disposition: form-data; name="file%s"; filename="%s"'
                        % (index, name),
                        "Content-Type: application/octet-stream",
                        "",
                        "You got pwnd.\r\n",
                    ]
                )
            )
        payload.write("\r\n--" + client.BOUNDARY + "--\r\n")

        response = self._post_multipart("/echo/", payload)
        self.assertEqual(response.status_code, 200)

        # None of the parts may show up in the echoed result.
        received = response.json()
        for index in range(len(filenames)):
            self.assertIsNone(received.get("file%s" % index))

    def test_non_printable_chars_in_file_names(self):
        # NUL bytes and the newline must be dropped from the received name.
        file_name = "non-\x00printable\x00\n_chars.txt\x00"
        payload = client.FakePayload()
        payload.write(
            "\r\n".join(
                [
                    "--" + client.BOUNDARY,
                    f'Content-Disposition: form-data; name="file"; '
                    f'filename="{file_name}"',
                    "Content-Type: application/octet-stream",
                    "",
                    "You got pwnd.\r\n",
                ]
            )
        )
        payload.write("\r\n--" + client.BOUNDARY + "--\r\n")

        received = self._post_multipart("/echo/", payload).json()
        self.assertEqual(received["file"], "non-printable_chars.txt")

    def test_dangerous_file_names(self):
        """Uploaded file names should be sanitized before ever reaching the view."""
        # One file part per traversal candidate, all in a single request.
        payload = client.FakePayload()
        for index, name in enumerate(CANDIDATE_TRAVERSAL_FILE_NAMES):
            payload.write(
                "\r\n".join(
                    [
                        "--" + client.BOUNDARY,
                        'Content-Disposition: form-data; name="file%s"; filename="%s"'
                        % (index, name),
                        "Content-Type: application/octet-stream",
                        "",
                        "You got pwnd.\r\n",
                    ]
                )
            )
        payload.write("\r\n--" + client.BOUNDARY + "--\r\n")

        # Every candidate must have been reduced to its bare file name.
        received = self._post_multipart("/echo/", payload).json()
        for index in range(len(CANDIDATE_TRAVERSAL_FILE_NAMES)):
            self.assertEqual(received["file%s" % index], "hax0rd.txt")

    def test_filename_overflow(self):
        """File names over 256 characters (dangerous on some platforms) get fixed up."""
        long_str = "f" * 300
        cases = [
            # (field name, submitted file name, expected echoed name)
            ("long_filename", "%s.txt" % long_str, "%s.txt" % long_str[:251]),
            ("long_extension", "foo.%s" % long_str, ".%s" % long_str[:254]),
            ("no_extension", long_str, long_str[:255]),
            ("no_filename", ".%s" % long_str, ".%s" % long_str[:254]),
            ("long_everything", "%s.%s" % (long_str, long_str), ".%s" % long_str[:254]),
        ]
        payload = client.FakePayload()
        for name, filename, _ in cases:
            payload.write(
                "\r\n".join(
                    [
                        "--" + client.BOUNDARY,
                        'Content-Disposition: form-data; name="{}"; filename="{}"',
                        "Content-Type: application/octet-stream",
                        "",
                        "Oops.",
                        "",
                    ]
                ).format(name, filename)
            )
        payload.write("\r\n--" + client.BOUNDARY + "--\r\n")

        result = self._post_multipart("/echo/", payload).json()
        for name, _, expected in cases:
            got = result[name]
            self.assertEqual(expected, got, "Mismatch for {}".format(name))
            self.assertLess(
                len(got), 256, "Got a long file name (%s characters)." % len(got)
            )

    def test_file_content(self):
        make_temp = tempfile.NamedTemporaryFile
        with make_temp(suffix=".ctype_extra") as no_content_type, make_temp(
            suffix=".ctype_extra"
        ) as simple_file:
            no_content_type.write(b"no content")
            no_content_type.seek(0)

            simple_file.write(b"text content")
            simple_file.seek(0)
            simple_file.content_type = "text/plain"

            # In-memory text and binary streams are posted next to the
            # temporary files.
            string_io = StringIO("string content")
            bytes_io = BytesIO(b"binary content")

            response = self.client.post(
                "/echo_content/",
                {
                    "no_content_type": no_content_type,
                    "simple_file": simple_file,
                    "string": string_io,
                    "binary": bytes_io,
                },
            )
            received = response.json()
            self.assertEqual(received["no_content_type"], "no content")
            self.assertEqual(received["simple_file"], "text content")
            self.assertEqual(received["string"], "string content")
            self.assertEqual(received["binary"], "binary content")

    def test_content_type_extra(self):
        """Uploaded files may have content type parameters available."""
        make_temp = tempfile.NamedTemporaryFile
        with make_temp(suffix=".ctype_extra") as no_content_type, make_temp(
            suffix=".ctype_extra"
        ) as simple_file:
            no_content_type.write(b"something")
            no_content_type.seek(0)

            simple_file.write(b"something")
            simple_file.seek(0)
            # The parameter after ";" is what the view is expected to echo.
            simple_file.content_type = "text/plain; test-key=test_value"

            response = self.client.post(
                "/echo_content_type_extra/",
                {
                    "no_content_type": no_content_type,
                    "simple_file": simple_file,
                },
            )
            received = response.json()
            self.assertEqual(received["no_content_type"], {})
            self.assertEqual(received["simple_file"], {"test-key": "test_value"})

    def test_truncated_multipart_handled_gracefully(self):
        """
        If passed an incomplete multipart message, MultiPartParser does not
        attempt to read beyond the end of the stream, and simply will handle
        the part that can be parsed gracefully.
        """
        payload_str = "\r\n".join(
            [
                "--" + client.BOUNDARY,
                'Content-Disposition: form-data; name="file"; filename="foo.txt"',
                "Content-Type: application/octet-stream",
                "",
                "file contents" "--" + client.BOUNDARY + "--",
                "",
            ]
        )
        # Drop the last ten characters to truncate the closing boundary.
        payload = client.FakePayload(payload_str[:-10])
        self.assertEqual(self._post_multipart("/echo/", payload).json(), {})

    def test_empty_multipart_handled_gracefully(self):
        """
        If passed an empty multipart message, MultiPartParser will return
        an empty QueryDict.
        """
        # Built by hand: the content length is stated explicitly as zero.
        environ = {
            "CONTENT_LENGTH": 0,
            "CONTENT_TYPE": client.MULTIPART_CONTENT,
            "PATH_INFO": "/echo/",
            "REQUEST_METHOD": "POST",
            "wsgi.input": client.FakePayload(b""),
        }
        self.assertEqual(self.client.request(**environ).json(), {})

    def test_custom_upload_handler(self):
        make_temp = tempfile.NamedTemporaryFile
        with make_temp() as smallfile, make_temp() as bigfile:
            # 2 MiB file.
            smallfile.write(b"a" * (2**21))
            smallfile.seek(0)

            # 10 MiB file.
            bigfile.write(b"a" * (10 * 2**20))
            bigfile.seek(0)

            # The small upload is reported back by the /quota/ view...
            self.assertIn("f", self.client.post("/quota/", {"f": smallfile}).json())

            # ...while the big one is not.
            self.assertNotIn("f", self.client.post("/quota/", {"f": bigfile}).json())

    def test_broken_custom_upload_handler(self):
        with tempfile.NamedTemporaryFile() as upload:
            upload.write(b"a" * (2**21))
            upload.seek(0)

            msg = (
                "You cannot alter upload handlers after the upload has been processed."
            )
            with self.assertRaisesMessage(AttributeError, msg):
                self.client.post("/quota/broken/", {"f": upload})

    def test_stop_upload_temporary_file_handler(self):
        with tempfile.NamedTemporaryFile() as temp_file:
            temp_file.write(b"a")
            temp_file.seek(0)
            response = self.client.post("/temp_file/stop_upload/", {"file": temp_file})
            # The path reported by the view must no longer exist.
            temp_path = response.json()["temp_path"]
            self.assertIs(os.path.exists(temp_path), False)

    def test_upload_interrupted_temporary_file_handler(self):
        # Parser that stops iterating right after the first FILE item, which
        # cuts the upload short.
        class MockedParser(Parser):
            def __iter__(self):
                for item in super().__iter__():
                    item_type, meta_data, field_stream = item
                    yield item_type, meta_data, field_stream
                    if item_type == FILE:
                        return

        with tempfile.NamedTemporaryFile() as temp_file:
            temp_file.write(b"a")
            temp_file.seek(0)
            with mock.patch(
                "django.http.multipartparser.Parser",
                MockedParser,
            ):
                response = self.client.post(
                    "/temp_file/upload_interrupted/",
                    {"file": temp_file},
                )
            temp_path = response.json()["temp_path"]
            self.assertIs(os.path.exists(temp_path), False)

    def test_fileupload_getlist(self):
        make_temp = tempfile.NamedTemporaryFile
        with make_temp() as file1, make_temp() as file2, make_temp() as file2a:
            file1.write(b"a" * (2**23))
            file1.seek(0)

            file2.write(b"a" * (2 * 2**18))
            file2.seek(0)

            file2a.write(b"a" * (5 * 2**20))
            file2a.seek(0)

            # "file2" is submitted twice, with two different files.
            response = self.client.post(
                "/getlist_count/",
                {
                    "file1": file1,
                    "field1": "test",
                    "field2": "test3",
                    "field3": "test5",
                    "field4": "test6",
                    "field5": "test7",
                    "file2": (file2, file2a),
                },
            )
            got = response.json()
            self.assertEqual(got.get("file1"), 1)
            self.assertEqual(got.get("file2"), 2)

    def test_fileuploads_closed_at_request_end(self):
        make_temp = tempfile.NamedTemporaryFile
        with make_temp() as f1, make_temp() as f2a, make_temp() as f2b:
            response = self.client.post(
                "/fd_closing/t/",
                {
                    "file": f1,
                    "file2": (f2a, f2b),
                },
            )

        request = response.wsgi_request
        # The request has parsed files attached...
        self.assertTrue(hasattr(request, "_files"))

        # ...and each of them is closed.
        uploaded = request._files["file"]
        self.assertTrue(uploaded.closed)

        uploaded_list = request._files.getlist("file2")
        self.assertTrue(uploaded_list[0].closed)
        self.assertTrue(uploaded_list[1].closed)

    def test_no_parsing_triggered_by_fd_closing(self):
        make_temp = tempfile.NamedTemporaryFile
        with make_temp() as f1, make_temp() as f2a, make_temp() as f2b:
            response = self.client.post(
                "/fd_closing/f/",
                {
                    "file": f1,
                    "file2": (f2a, f2b),
                },
            )

        request = response.wsgi_request
        # No files were attached to the request.
        self.assertFalse(hasattr(request, "_files"))

    def test_file_error_blocking(self):
        """
        The server should not block when there are upload errors (bug #8622).
        This can happen if something -- i.e. an exception handler -- tries to
        access POST while handling an error in parsing POST. This shouldn't
        cause an infinite loop!
        """

        class POSTAccessingHandler(client.ClientHandler):
            """A handler that'll access POST during an exception."""

            def handle_uncaught_exception(self, request, resolver, exc_info):
                ret = super().handle_uncaught_exception(request, resolver, exc_info)
                request.POST  # Attribute access only; the value is unused.
                return ret

        # Capture whatever error FakePayload raises when asked for more bytes
        # than it holds, for comparison with the error seen during the upload.
        try:
            client.FakePayload(b"a").read(2)
        except Exception as err:
            reference_error = err

        # Install the handler that touches request.POST while handling errors.
        self.client.handler = POSTAccessingHandler()

        with open(__file__, "rb") as fp:
            post_data = {
                "name": "Ringo",
                "file_field": fp,
            }
            try:
                self.client.post("/upload_errors/", post_data)
            except reference_error.__class__ as err:
                self.assertNotEqual(
                    str(err),
                    str(reference_error),
                    "Caught a repeated exception that'll cause an infinite loop in "
                    "file uploads.",
                )
            except Exception as err:
                # Any other error must be the upload handler's own.
                self.assertEqual(err.__class__, uploadhandler.CustomUploadError)

    def test_filename_case_preservation(self):
        """
        The storage backend shouldn't mess with the case of the filenames
        uploaded.
        """
        # The body is assembled by hand with a custom boundary so the mixed
        # case file name is sent exactly as written here.
        params = {"boundary": "oUrBoUnDaRyStRiNg"}
        post_data = [
            "--%(boundary)s",
            'Content-Disposition: form-data; name="file_field"; '
            'filename="MiXeD_cAsE.txt"',
            "Content-Type: application/octet-stream",
            "",
            "file contents\n",
            "--%(boundary)s--\r\n",
        ]
        response = self.client.post(
            "/filename_case/",
            "\r\n".join(post_data) % params,
            "multipart/form-data; boundary=%(boundary)s" % params,
        )
        self.assertEqual(response.status_code, 200)
        # The response body is the primary key of the saved FileModel.
        pk = int(response.content)
        obj = FileModel.objects.get(pk=pk)
        # Only the base name is compared.
        self.assertEqual(os.path.basename(obj.testfile.path), "MiXeD_cAsE.txt")

    def test_filename_traversal_upload(self):
        os.makedirs(UPLOAD_TO, exist_ok=True)
        # "/" written as a numeric and as a named HTML character reference.
        # Both entries used to be the same literal "../test.txt", which made
        # the second subTest a repeat of the first.
        # NOTE(review): assumes the upload parser unescapes HTML entities in
        # file names -- confirm against the parser implementation.
        tests = [
            "..&#x2F;test.txt",
            "..&sol;test.txt",
        ]
        for file_name in tests:
            with self.subTest(file_name=file_name):
                payload = client.FakePayload()
                payload.write(
                    "\r\n".join(
                        [
                            "--" + client.BOUNDARY,
                            'Content-Disposition: form-data; name="my_file"; '
                            'filename="%s";' % file_name,
                            "Content-Type: text/plain",
                            "",
                            "file contents.\r\n",
                            "\r\n--" + client.BOUNDARY + "--\r\n",
                        ]
                    ),
                )
                response = self._post_multipart("/upload_traversal/", payload)
                result = response.json()
                self.assertEqual(response.status_code, 200)
                self.assertEqual(result["file_name"], "test.txt")
                # The file must be inside the upload folder, not one level up.
                self.assertIs(
                    os.path.exists(os.path.join(MEDIA_ROOT, "test.txt")),
                    False,
                )
                self.assertIs(
                    os.path.exists(os.path.join(UPLOAD_TO, "test.txt")),
                    True,
                )
@override_settings(MEDIA_ROOT=MEDIA_ROOT)
class DirectoryCreationTests(SimpleTestCase):
    """
    Tests for error handling during directory creation
    via _save_FIELD_file (ticket #6450)
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Ensure the media root exists; remove it after the class has run.
        os.makedirs(MEDIA_ROOT, exist_ok=True)
        cls.addClassCleanup(shutil.rmtree, MEDIA_ROOT)

    def setUp(self):
        # Fresh, unsaved model instance for every test.
        self.obj = FileModel()

    @unittest.skipIf(
        sys.platform == "win32", "Python on Windows doesn't have working os.chmod()."
    )
    @override_settings(
        STORAGES={
            DEFAULT_STORAGE_ALIAS: {
                "BACKEND": "django.core.files.storage.FileSystemStorage",
            }
        }
    )
    def test_readonly_root(self):
        """Permission errors are not swallowed"""
        # Read and execute only for the owner; write access is restored
        # during cleanup.
        os.chmod(MEDIA_ROOT, 0o500)
        self.addCleanup(os.chmod, MEDIA_ROOT, 0o700)
        with self.assertRaises(PermissionError):
            upload = SimpleUploadedFile("foo.txt", b"x")
            self.obj.testfile.save("foo.txt", upload, save=False)

    def test_not_a_directory(self):
        default_storage.delete(UPLOAD_TO)
        # Put a regular file where the upload directory is expected.
        with SimpleUploadedFile(UPLOAD_TO, b"x") as blocker:
            default_storage.save(UPLOAD_TO, blocker)
        self.addCleanup(default_storage.delete, UPLOAD_TO)
        expected_message = f"{UPLOAD_TO} exists and is not a directory."
        with self.assertRaisesMessage(FileExistsError, expected_message):
            with SimpleUploadedFile("foo.txt", b"x") as upload:
                self.obj.testfile.save("foo.txt", upload, save=False)
class MultiParserTests(SimpleTestCase):
    """Direct tests of MultiPartParser construction and file name sanitizing."""

    @staticmethod
    def _build_parser(
        content_type="multipart/form-data; boundary=_foo", content_length="1"
    ):
        # Every test uses the same one-character body, no upload handlers
        # and the utf-8 encoding; only the two META values vary.
        meta = {
            "CONTENT_TYPE": content_type,
            "CONTENT_LENGTH": content_length,
        }
        return MultiPartParser(meta, StringIO("x"), [], "utf-8")

    def test_empty_upload_handlers(self):
        # Constructing the parser with an empty handler list must not raise.
        self._build_parser()

    def test_invalid_content_type(self):
        with self.assertRaisesMessage(
            MultiPartParserError, "Invalid Content-Type: text/plain"
        ):
            self._build_parser(content_type="text/plain")

    def test_negative_content_length(self):
        with self.assertRaisesMessage(
            MultiPartParserError, "Invalid content length: -1"
        ):
            self._build_parser(content_length=-1)

    def test_bad_type_content_length(self):
        # A non-numeric length leaves the stored content length at zero.
        multipart_parser = self._build_parser(content_length="a")
        self.assertEqual(multipart_parser._content_length, 0)

    def test_sanitize_file_name(self):
        parser = self._build_parser()
        for candidate in CANDIDATE_TRAVERSAL_FILE_NAMES:
            with self.subTest(file_name=candidate):
                self.assertEqual(parser.sanitize_file_name(candidate), "hax0rd.txt")

    def test_sanitize_invalid_file_name(self):
        parser = self._build_parser()
        for candidate in CANDIDATE_INVALID_FILE_NAMES:
            with self.subTest(file_name=candidate):
                self.assertIsNone(parser.sanitize_file_name(candidate))
|