1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084 |
- import pickle
- from io import BytesIO
- from itertools import chain
- from urllib.parse import urlencode
- from django.core.exceptions import DisallowedHost
- from django.core.handlers.wsgi import LimitedStream, WSGIRequest
- from django.http import (
- HttpHeaders,
- HttpRequest,
- RawPostDataException,
- UnreadablePostError,
- )
- from django.http.multipartparser import MultiPartParserError
- from django.http.request import split_domain_port
- from django.test import RequestFactory, SimpleTestCase, override_settings
- from django.test.client import FakePayload
- class RequestsTests(SimpleTestCase):
- def test_httprequest(self):
- request = HttpRequest()
- self.assertEqual(list(request.GET), [])
- self.assertEqual(list(request.POST), [])
- self.assertEqual(list(request.COOKIES), [])
- self.assertEqual(list(request.META), [])
- # .GET and .POST should be QueryDicts
- self.assertEqual(request.GET.urlencode(), "")
- self.assertEqual(request.POST.urlencode(), "")
- # and FILES should be MultiValueDict
- self.assertEqual(request.FILES.getlist("foo"), [])
- self.assertIsNone(request.content_type)
- self.assertIsNone(request.content_params)
- def test_httprequest_full_path(self):
- request = HttpRequest()
- request.path = "/;some/?awful/=path/foo:bar/"
- request.path_info = "/prefix" + request.path
- request.META["QUERY_STRING"] = ";some=query&+query=string"
- expected = "/%3Bsome/%3Fawful/%3Dpath/foo:bar/?;some=query&+query=string"
- self.assertEqual(request.get_full_path(), expected)
- self.assertEqual(request.get_full_path_info(), "/prefix" + expected)
- def test_httprequest_full_path_with_query_string_and_fragment(self):
- request = HttpRequest()
- request.path = "/foo#bar"
- request.path_info = "/prefix" + request.path
- request.META["QUERY_STRING"] = "baz#quux"
- self.assertEqual(request.get_full_path(), "/foo%23bar?baz#quux")
- self.assertEqual(request.get_full_path_info(), "/prefix/foo%23bar?baz#quux")
- def test_httprequest_repr(self):
- request = HttpRequest()
- request.path = "/somepath/"
- request.method = "GET"
- request.GET = {"get-key": "get-value"}
- request.POST = {"post-key": "post-value"}
- request.COOKIES = {"post-key": "post-value"}
- request.META = {"post-key": "post-value"}
- self.assertEqual(repr(request), "<HttpRequest: GET '/somepath/'>")
- def test_httprequest_repr_invalid_method_and_path(self):
- request = HttpRequest()
- self.assertEqual(repr(request), "<HttpRequest>")
- request = HttpRequest()
- request.method = "GET"
- self.assertEqual(repr(request), "<HttpRequest>")
- request = HttpRequest()
- request.path = ""
- self.assertEqual(repr(request), "<HttpRequest>")
- def test_wsgirequest(self):
- request = WSGIRequest(
- {
- "PATH_INFO": "bogus",
- "REQUEST_METHOD": "bogus",
- "CONTENT_TYPE": "text/html; charset=utf8",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(list(request.GET), [])
- self.assertEqual(list(request.POST), [])
- self.assertEqual(list(request.COOKIES), [])
- self.assertEqual(
- set(request.META),
- {
- "PATH_INFO",
- "REQUEST_METHOD",
- "SCRIPT_NAME",
- "CONTENT_TYPE",
- "wsgi.input",
- },
- )
- self.assertEqual(request.META["PATH_INFO"], "bogus")
- self.assertEqual(request.META["REQUEST_METHOD"], "bogus")
- self.assertEqual(request.META["SCRIPT_NAME"], "")
- self.assertEqual(request.content_type, "text/html")
- self.assertEqual(request.content_params, {"charset": "utf8"})
- def test_wsgirequest_with_script_name(self):
- """
- The request's path is correctly assembled, regardless of whether or
- not the SCRIPT_NAME has a trailing slash (#20169).
- """
- # With trailing slash
- request = WSGIRequest(
- {
- "PATH_INFO": "/somepath/",
- "SCRIPT_NAME": "/PREFIX/",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(request.path, "/PREFIX/somepath/")
- # Without trailing slash
- request = WSGIRequest(
- {
- "PATH_INFO": "/somepath/",
- "SCRIPT_NAME": "/PREFIX",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(request.path, "/PREFIX/somepath/")
- def test_wsgirequest_script_url_double_slashes(self):
- """
- WSGI squashes multiple successive slashes in PATH_INFO, WSGIRequest
- should take that into account when populating request.path and
- request.META['SCRIPT_NAME'] (#17133).
- """
- request = WSGIRequest(
- {
- "SCRIPT_URL": "/mst/milestones//accounts/login//help",
- "PATH_INFO": "/milestones/accounts/login/help",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(request.path, "/mst/milestones/accounts/login/help")
- self.assertEqual(request.META["SCRIPT_NAME"], "/mst")
- def test_wsgirequest_with_force_script_name(self):
- """
- The FORCE_SCRIPT_NAME setting takes precedence over the request's
- SCRIPT_NAME environment parameter (#20169).
- """
- with override_settings(FORCE_SCRIPT_NAME="/FORCED_PREFIX/"):
- request = WSGIRequest(
- {
- "PATH_INFO": "/somepath/",
- "SCRIPT_NAME": "/PREFIX/",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(request.path, "/FORCED_PREFIX/somepath/")
- def test_wsgirequest_path_with_force_script_name_trailing_slash(self):
- """
- The request's path is correctly assembled, regardless of whether or not
- the FORCE_SCRIPT_NAME setting has a trailing slash (#20169).
- """
- # With trailing slash
- with override_settings(FORCE_SCRIPT_NAME="/FORCED_PREFIX/"):
- request = WSGIRequest(
- {
- "PATH_INFO": "/somepath/",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(request.path, "/FORCED_PREFIX/somepath/")
- # Without trailing slash
- with override_settings(FORCE_SCRIPT_NAME="/FORCED_PREFIX"):
- request = WSGIRequest(
- {
- "PATH_INFO": "/somepath/",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(request.path, "/FORCED_PREFIX/somepath/")
- def test_wsgirequest_repr(self):
- request = WSGIRequest({"REQUEST_METHOD": "get", "wsgi.input": BytesIO(b"")})
- self.assertEqual(repr(request), "<WSGIRequest: GET '/'>")
- request = WSGIRequest(
- {
- "PATH_INFO": "/somepath/",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- request.GET = {"get-key": "get-value"}
- request.POST = {"post-key": "post-value"}
- request.COOKIES = {"post-key": "post-value"}
- request.META = {"post-key": "post-value"}
- self.assertEqual(repr(request), "<WSGIRequest: GET '/somepath/'>")
- def test_wsgirequest_path_info(self):
- def wsgi_str(path_info, encoding="utf-8"):
- path_info = path_info.encode(
- encoding
- ) # Actual URL sent by the browser (bytestring)
- path_info = path_info.decode(
- "iso-8859-1"
- ) # Value in the WSGI environ dict (native string)
- return path_info
- # Regression for #19468
- request = WSGIRequest(
- {
- "PATH_INFO": wsgi_str("/سلام/"),
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- self.assertEqual(request.path, "/سلام/")
- # The URL may be incorrectly encoded in a non-UTF-8 encoding (#26971)
- request = WSGIRequest(
- {
- "PATH_INFO": wsgi_str("/café/", encoding="iso-8859-1"),
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- }
- )
- # Since it's impossible to decide the (wrong) encoding of the URL, it's
- # left percent-encoded in the path.
- self.assertEqual(request.path, "/caf%E9/")
- def test_limited_stream(self):
- # Read all of a limited stream
- stream = LimitedStream(BytesIO(b"test"), 2)
- self.assertEqual(stream.read(), b"te")
- # Reading again returns nothing.
- self.assertEqual(stream.read(), b"")
- # Read a number of characters greater than the stream has to offer
- stream = LimitedStream(BytesIO(b"test"), 2)
- self.assertEqual(stream.read(5), b"te")
- # Reading again returns nothing.
- self.assertEqual(stream.readline(5), b"")
- # Read sequentially from a stream
- stream = LimitedStream(BytesIO(b"12345678"), 8)
- self.assertEqual(stream.read(5), b"12345")
- self.assertEqual(stream.read(5), b"678")
- # Reading again returns nothing.
- self.assertEqual(stream.readline(5), b"")
- # Read lines from a stream
- stream = LimitedStream(BytesIO(b"1234\n5678\nabcd\nefgh\nijkl"), 24)
- # Read a full line, unconditionally
- self.assertEqual(stream.readline(), b"1234\n")
- # Read a number of characters less than a line
- self.assertEqual(stream.readline(2), b"56")
- # Read the rest of the partial line
- self.assertEqual(stream.readline(), b"78\n")
- # Read a full line, with a character limit greater than the line length
- self.assertEqual(stream.readline(6), b"abcd\n")
- # Read the next line, deliberately terminated at the line end
- self.assertEqual(stream.readline(4), b"efgh")
- # Read the next line... just the line end
- self.assertEqual(stream.readline(), b"\n")
- # Read everything else.
- self.assertEqual(stream.readline(), b"ijkl")
- # Regression for #15018
- # If a stream contains a newline, but the provided length
- # is less than the number of provided characters, the newline
- # doesn't reset the available character count
- stream = LimitedStream(BytesIO(b"1234\nabcdef"), 9)
- self.assertEqual(stream.readline(10), b"1234\n")
- self.assertEqual(stream.readline(3), b"abc")
- # Now expire the available characters
- self.assertEqual(stream.readline(3), b"d")
- # Reading again returns nothing.
- self.assertEqual(stream.readline(2), b"")
- # Same test, but with read, not readline.
- stream = LimitedStream(BytesIO(b"1234\nabcdef"), 9)
- self.assertEqual(stream.read(6), b"1234\na")
- self.assertEqual(stream.read(2), b"bc")
- self.assertEqual(stream.read(2), b"d")
- self.assertEqual(stream.read(2), b"")
- self.assertEqual(stream.read(), b"")
- def test_stream(self):
- payload = FakePayload("name=value")
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- },
- )
- self.assertEqual(request.read(), b"name=value")
- def test_read_after_value(self):
- """
- Reading from request is allowed after accessing request contents as
- POST or body.
- """
- payload = FakePayload("name=value")
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.POST, {"name": ["value"]})
- self.assertEqual(request.body, b"name=value")
- self.assertEqual(request.read(), b"name=value")
- def test_value_after_read(self):
- """
- Construction of POST or body is not allowed after reading
- from request.
- """
- payload = FakePayload("name=value")
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.read(2), b"na")
- with self.assertRaises(RawPostDataException):
- request.body
- self.assertEqual(request.POST, {})
- def test_non_ascii_POST(self):
- payload = FakePayload(urlencode({"key": "España"}))
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_LENGTH": len(payload),
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.POST, {"key": ["España"]})
- def test_alternate_charset_POST(self):
- """
- Test a POST with non-utf-8 payload encoding.
- """
- payload = FakePayload(urlencode({"key": "España".encode("latin-1")}))
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_LENGTH": len(payload),
- "CONTENT_TYPE": "application/x-www-form-urlencoded; charset=iso-8859-1",
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.POST, {"key": ["España"]})
- def test_body_after_POST_multipart_form_data(self):
- """
- Reading body after parsing multipart/form-data is not allowed
- """
- # Because multipart is used for large amounts of data i.e. file uploads,
- # we don't want the data held in memory twice, and we don't want to
- # silence the error by setting body = '' either.
- payload = FakePayload(
- "\r\n".join(
- [
- "--boundary",
- 'Content-Disposition: form-data; name="name"',
- "",
- "value",
- "--boundary--",
- ]
- )
- )
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/form-data; boundary=boundary",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.POST, {"name": ["value"]})
- with self.assertRaises(RawPostDataException):
- request.body
- def test_body_after_POST_multipart_related(self):
- """
- Reading body after parsing multipart that isn't form-data is allowed
- """
- # Ticket #9054
- # There are cases in which the multipart data is related instead of
- # being a binary upload, in which case it should still be accessible
- # via body.
- payload_data = b"\r\n".join(
- [
- b"--boundary",
- b'Content-ID: id; name="name"',
- b"",
- b"value",
- b"--boundary--",
- ]
- )
- payload = FakePayload(payload_data)
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/related; boundary=boundary",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.POST, {})
- self.assertEqual(request.body, payload_data)
- def test_POST_multipart_with_content_length_zero(self):
- """
- Multipart POST requests with Content-Length >= 0 are valid and need to
- be handled.
- """
- # According to RFC 9110 Section 8.6 every POST with Content-Length >= 0
- # is a valid request, so ensure that we handle Content-Length == 0.
- payload = FakePayload(
- "\r\n".join(
- [
- "--boundary",
- 'Content-Disposition: form-data; name="name"',
- "",
- "value",
- "--boundary--",
- ]
- )
- )
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/form-data; boundary=boundary",
- "CONTENT_LENGTH": 0,
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.POST, {})
- def test_POST_binary_only(self):
- payload = b"\r\n\x01\x00\x00\x00ab\x00\x00\xcd\xcc,@"
- environ = {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/octet-stream",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": BytesIO(payload),
- }
- request = WSGIRequest(environ)
- self.assertEqual(request.POST, {})
- self.assertEqual(request.FILES, {})
- self.assertEqual(request.body, payload)
- # Same test without specifying content-type
- environ.update({"CONTENT_TYPE": "", "wsgi.input": BytesIO(payload)})
- request = WSGIRequest(environ)
- self.assertEqual(request.POST, {})
- self.assertEqual(request.FILES, {})
- self.assertEqual(request.body, payload)
- def test_read_by_lines(self):
- payload = FakePayload("name=value")
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- self.assertEqual(list(request), [b"name=value"])
- def test_POST_after_body_read(self):
- """
- POST should be populated even if body is read first
- """
- payload = FakePayload("name=value")
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- request.body # evaluate
- self.assertEqual(request.POST, {"name": ["value"]})
- def test_POST_after_body_read_and_stream_read(self):
- """
- POST should be populated even if body is read first, and then
- the stream is read second.
- """
- payload = FakePayload("name=value")
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- request.body # evaluate
- self.assertEqual(request.read(1), b"n")
- self.assertEqual(request.POST, {"name": ["value"]})
- def test_POST_after_body_read_and_stream_read_multipart(self):
- """
- POST should be populated even if body is read first, and then
- the stream is read second. Using multipart/form-data instead of urlencoded.
- """
- payload = FakePayload(
- "\r\n".join(
- [
- "--boundary",
- 'Content-Disposition: form-data; name="name"',
- "",
- "value",
- "--boundary--" "",
- ]
- )
- )
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/form-data; boundary=boundary",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- request.body # evaluate
- # Consume enough data to mess up the parsing:
- self.assertEqual(request.read(13), b"--boundary\r\nC")
- self.assertEqual(request.POST, {"name": ["value"]})
- def test_POST_immutable_for_multipart(self):
- """
- MultiPartParser.parse() leaves request.POST immutable.
- """
- payload = FakePayload(
- "\r\n".join(
- [
- "--boundary",
- 'Content-Disposition: form-data; name="name"',
- "",
- "value",
- "--boundary--",
- ]
- )
- )
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/form-data; boundary=boundary",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- self.assertFalse(request.POST._mutable)
- def test_multipart_without_boundary(self):
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/form-data;",
- "CONTENT_LENGTH": 0,
- "wsgi.input": FakePayload(),
- }
- )
- with self.assertRaisesMessage(
- MultiPartParserError, "Invalid boundary in multipart: None"
- ):
- request.POST
- def test_multipart_non_ascii_content_type(self):
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/form-data; boundary = \xe0",
- "CONTENT_LENGTH": 0,
- "wsgi.input": FakePayload(),
- }
- )
- msg = (
- "Invalid non-ASCII Content-Type in multipart: multipart/form-data; "
- "boundary = à"
- )
- with self.assertRaisesMessage(MultiPartParserError, msg):
- request.POST
- def test_POST_connection_error(self):
- """
- If wsgi.input.read() raises an exception while trying to read() the
- POST, the exception is identifiable (not a generic OSError).
- """
- class ExplodingBytesIO(BytesIO):
- def read(self, len=0):
- raise OSError("kaboom!")
- payload = b"name=value"
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": ExplodingBytesIO(payload),
- }
- )
- with self.assertRaises(UnreadablePostError):
- request.body
- def test_set_encoding_clears_POST(self):
- payload = FakePayload("name=Hello Günter")
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "application/x-www-form-urlencoded",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": payload,
- }
- )
- self.assertEqual(request.POST, {"name": ["Hello Günter"]})
- request.encoding = "iso-8859-16"
- self.assertEqual(request.POST, {"name": ["Hello GĂŒnter"]})
- def test_set_encoding_clears_GET(self):
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "GET",
- "wsgi.input": "",
- "QUERY_STRING": "name=Hello%20G%C3%BCnter",
- }
- )
- self.assertEqual(request.GET, {"name": ["Hello Günter"]})
- request.encoding = "iso-8859-16"
- self.assertEqual(request.GET, {"name": ["Hello G\u0102\u0152nter"]})
- def test_FILES_connection_error(self):
- """
- If wsgi.input.read() raises an exception while trying to read() the
- FILES, the exception is identifiable (not a generic OSError).
- """
- class ExplodingBytesIO(BytesIO):
- def read(self, len=0):
- raise OSError("kaboom!")
- payload = b"x"
- request = WSGIRequest(
- {
- "REQUEST_METHOD": "POST",
- "CONTENT_TYPE": "multipart/form-data; boundary=foo_",
- "CONTENT_LENGTH": len(payload),
- "wsgi.input": ExplodingBytesIO(payload),
- }
- )
- with self.assertRaises(UnreadablePostError):
- request.FILES
- def test_pickling_request(self):
- request = HttpRequest()
- request.method = "GET"
- request.path = "/testpath/"
- request.META = {
- "QUERY_STRING": ";some=query&+query=string",
- "SERVER_NAME": "example.com",
- "SERVER_PORT": 80,
- }
- request.COOKIES = {"post-key": "post-value"}
- dump = pickle.dumps(request)
- request_from_pickle = pickle.loads(dump)
- self.assertEqual(repr(request), repr(request_from_pickle))
- class HostValidationTests(SimpleTestCase):
- poisoned_hosts = [
- "example.com@evil.tld",
- "example.com:dr.frankenstein@evil.tld",
- "example.com:dr.frankenstein@evil.tld:80",
- "example.com:80/badpath",
- "example.com: recovermypassword.com",
- ]
- @override_settings(
- USE_X_FORWARDED_HOST=False,
- ALLOWED_HOSTS=[
- "forward.com",
- "example.com",
- "internal.com",
- "12.34.56.78",
- "[2001:19f0:feee::dead:beef:cafe]",
- "xn--4ca9at.com",
- ".multitenant.com",
- "INSENSITIVE.com",
- "[::ffff:169.254.169.254]",
- ],
- )
- def test_http_get_host(self):
- # Check if X_FORWARDED_HOST is provided.
- request = HttpRequest()
- request.META = {
- "HTTP_X_FORWARDED_HOST": "forward.com",
- "HTTP_HOST": "example.com",
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 80,
- }
- # X_FORWARDED_HOST is ignored.
- self.assertEqual(request.get_host(), "example.com")
- # Check if X_FORWARDED_HOST isn't provided.
- request = HttpRequest()
- request.META = {
- "HTTP_HOST": "example.com",
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 80,
- }
- self.assertEqual(request.get_host(), "example.com")
- # Check if HTTP_HOST isn't provided.
- request = HttpRequest()
- request.META = {
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 80,
- }
- self.assertEqual(request.get_host(), "internal.com")
- # Check if HTTP_HOST isn't provided, and we're on a nonstandard port
- request = HttpRequest()
- request.META = {
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 8042,
- }
- self.assertEqual(request.get_host(), "internal.com:8042")
- legit_hosts = [
- "example.com",
- "example.com:80",
- "12.34.56.78",
- "12.34.56.78:443",
- "[2001:19f0:feee::dead:beef:cafe]",
- "[2001:19f0:feee::dead:beef:cafe]:8080",
- "xn--4ca9at.com", # Punycode for öäü.com
- "anything.multitenant.com",
- "multitenant.com",
- "insensitive.com",
- "example.com.",
- "example.com.:80",
- "[::ffff:169.254.169.254]",
- ]
- for host in legit_hosts:
- request = HttpRequest()
- request.META = {
- "HTTP_HOST": host,
- }
- request.get_host()
- # Poisoned host headers are rejected as suspicious
- for host in chain(self.poisoned_hosts, ["other.com", "example.com.."]):
- with self.assertRaises(DisallowedHost):
- request = HttpRequest()
- request.META = {
- "HTTP_HOST": host,
- }
- request.get_host()
- @override_settings(USE_X_FORWARDED_HOST=True, ALLOWED_HOSTS=["*"])
- def test_http_get_host_with_x_forwarded_host(self):
- # Check if X_FORWARDED_HOST is provided.
- request = HttpRequest()
- request.META = {
- "HTTP_X_FORWARDED_HOST": "forward.com",
- "HTTP_HOST": "example.com",
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 80,
- }
- # X_FORWARDED_HOST is obeyed.
- self.assertEqual(request.get_host(), "forward.com")
- # Check if X_FORWARDED_HOST isn't provided.
- request = HttpRequest()
- request.META = {
- "HTTP_HOST": "example.com",
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 80,
- }
- self.assertEqual(request.get_host(), "example.com")
- # Check if HTTP_HOST isn't provided.
- request = HttpRequest()
- request.META = {
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 80,
- }
- self.assertEqual(request.get_host(), "internal.com")
- # Check if HTTP_HOST isn't provided, and we're on a nonstandard port
- request = HttpRequest()
- request.META = {
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 8042,
- }
- self.assertEqual(request.get_host(), "internal.com:8042")
- # Poisoned host headers are rejected as suspicious
- legit_hosts = [
- "example.com",
- "example.com:80",
- "12.34.56.78",
- "12.34.56.78:443",
- "[2001:19f0:feee::dead:beef:cafe]",
- "[2001:19f0:feee::dead:beef:cafe]:8080",
- "xn--4ca9at.com", # Punycode for öäü.com
- ]
- for host in legit_hosts:
- request = HttpRequest()
- request.META = {
- "HTTP_HOST": host,
- }
- request.get_host()
- for host in self.poisoned_hosts:
- with self.assertRaises(DisallowedHost):
- request = HttpRequest()
- request.META = {
- "HTTP_HOST": host,
- }
- request.get_host()
- @override_settings(USE_X_FORWARDED_PORT=False)
- def test_get_port(self):
- request = HttpRequest()
- request.META = {
- "SERVER_PORT": "8080",
- "HTTP_X_FORWARDED_PORT": "80",
- }
- # Shouldn't use the X-Forwarded-Port header
- self.assertEqual(request.get_port(), "8080")
- request = HttpRequest()
- request.META = {
- "SERVER_PORT": "8080",
- }
- self.assertEqual(request.get_port(), "8080")
- @override_settings(USE_X_FORWARDED_PORT=True)
- def test_get_port_with_x_forwarded_port(self):
- request = HttpRequest()
- request.META = {
- "SERVER_PORT": "8080",
- "HTTP_X_FORWARDED_PORT": "80",
- }
- # Should use the X-Forwarded-Port header
- self.assertEqual(request.get_port(), "80")
- request = HttpRequest()
- request.META = {
- "SERVER_PORT": "8080",
- }
- self.assertEqual(request.get_port(), "8080")
- @override_settings(DEBUG=True, ALLOWED_HOSTS=[])
- def test_host_validation_in_debug_mode(self):
- """
- If ALLOWED_HOSTS is empty and DEBUG is True, variants of localhost are
- allowed.
- """
- valid_hosts = ["localhost", "subdomain.localhost", "127.0.0.1", "[::1]"]
- for host in valid_hosts:
- request = HttpRequest()
- request.META = {"HTTP_HOST": host}
- self.assertEqual(request.get_host(), host)
- # Other hostnames raise a DisallowedHost.
- with self.assertRaises(DisallowedHost):
- request = HttpRequest()
- request.META = {"HTTP_HOST": "example.com"}
- request.get_host()
- @override_settings(ALLOWED_HOSTS=[])
- def test_get_host_suggestion_of_allowed_host(self):
- """
- get_host() makes helpful suggestions if a valid-looking host is not in
- ALLOWED_HOSTS.
- """
- msg_invalid_host = "Invalid HTTP_HOST header: %r."
- msg_suggestion = msg_invalid_host + " You may need to add %r to ALLOWED_HOSTS."
- msg_suggestion2 = (
- msg_invalid_host
- + " The domain name provided is not valid according to RFC 1034/1035"
- )
- for host in [ # Valid-looking hosts
- "example.com",
- "12.34.56.78",
- "[2001:19f0:feee::dead:beef:cafe]",
- "xn--4ca9at.com", # Punycode for öäü.com
- ]:
- request = HttpRequest()
- request.META = {"HTTP_HOST": host}
- with self.assertRaisesMessage(
- DisallowedHost, msg_suggestion % (host, host)
- ):
- request.get_host()
- for domain, port in [ # Valid-looking hosts with a port number
- ("example.com", 80),
- ("12.34.56.78", 443),
- ("[2001:19f0:feee::dead:beef:cafe]", 8080),
- ]:
- host = "%s:%s" % (domain, port)
- request = HttpRequest()
- request.META = {"HTTP_HOST": host}
- with self.assertRaisesMessage(
- DisallowedHost, msg_suggestion % (host, domain)
- ):
- request.get_host()
- for host in self.poisoned_hosts:
- request = HttpRequest()
- request.META = {"HTTP_HOST": host}
- with self.assertRaisesMessage(DisallowedHost, msg_invalid_host % host):
- request.get_host()
- request = HttpRequest()
- request.META = {"HTTP_HOST": "invalid_hostname.com"}
- with self.assertRaisesMessage(
- DisallowedHost, msg_suggestion2 % "invalid_hostname.com"
- ):
- request.get_host()
- def test_split_domain_port_removes_trailing_dot(self):
- domain, port = split_domain_port("example.com.:8080")
- self.assertEqual(domain, "example.com")
- self.assertEqual(port, "8080")
- class BuildAbsoluteURITests(SimpleTestCase):
- factory = RequestFactory()
- def test_absolute_url(self):
- request = HttpRequest()
- url = "https://www.example.com/asdf"
- self.assertEqual(request.build_absolute_uri(location=url), url)
- def test_host_retrieval(self):
- request = HttpRequest()
- request.get_host = lambda: "www.example.com"
- request.path = ""
- self.assertEqual(
- request.build_absolute_uri(location="/path/with:colons"),
- "http://www.example.com/path/with:colons",
- )
- def test_request_path_begins_with_two_slashes(self):
- # //// creates a request with a path beginning with //
- request = self.factory.get("////absolute-uri")
- tests = (
- # location isn't provided
- (None, "http://testserver//absolute-uri"),
- # An absolute URL
- ("http://example.com/?foo=bar", "http://example.com/?foo=bar"),
- # A schema-relative URL
- ("//example.com/?foo=bar", "http://example.com/?foo=bar"),
- # Relative URLs
- ("/foo/bar/", "http://testserver/foo/bar/"),
- ("/foo/./bar/", "http://testserver/foo/bar/"),
- ("/foo/../bar/", "http://testserver/bar/"),
- ("///foo/bar/", "http://testserver/foo/bar/"),
- )
- for location, expected_url in tests:
- with self.subTest(location=location):
- self.assertEqual(
- request.build_absolute_uri(location=location), expected_url
- )
- class RequestHeadersTests(SimpleTestCase):
- ENVIRON = {
- # Non-headers are ignored.
- "PATH_INFO": "/somepath/",
- "REQUEST_METHOD": "get",
- "wsgi.input": BytesIO(b""),
- "SERVER_NAME": "internal.com",
- "SERVER_PORT": 80,
- # These non-HTTP prefixed headers are included.
- "CONTENT_TYPE": "text/html",
- "CONTENT_LENGTH": "100",
- # All HTTP-prefixed headers are included.
- "HTTP_ACCEPT": "*",
- "HTTP_HOST": "example.com",
- "HTTP_USER_AGENT": "python-requests/1.2.0",
- }
- def test_base_request_headers(self):
- request = HttpRequest()
- request.META = self.ENVIRON
- self.assertEqual(
- dict(request.headers),
- {
- "Content-Type": "text/html",
- "Content-Length": "100",
- "Accept": "*",
- "Host": "example.com",
- "User-Agent": "python-requests/1.2.0",
- },
- )
- def test_wsgi_request_headers(self):
- request = WSGIRequest(self.ENVIRON)
- self.assertEqual(
- dict(request.headers),
- {
- "Content-Type": "text/html",
- "Content-Length": "100",
- "Accept": "*",
- "Host": "example.com",
- "User-Agent": "python-requests/1.2.0",
- },
- )
- def test_wsgi_request_headers_getitem(self):
- request = WSGIRequest(self.ENVIRON)
- self.assertEqual(request.headers["User-Agent"], "python-requests/1.2.0")
- self.assertEqual(request.headers["user-agent"], "python-requests/1.2.0")
- self.assertEqual(request.headers["user_agent"], "python-requests/1.2.0")
- self.assertEqual(request.headers["Content-Type"], "text/html")
- self.assertEqual(request.headers["Content-Length"], "100")
- def test_wsgi_request_headers_get(self):
- request = WSGIRequest(self.ENVIRON)
- self.assertEqual(request.headers.get("User-Agent"), "python-requests/1.2.0")
- self.assertEqual(request.headers.get("user-agent"), "python-requests/1.2.0")
- self.assertEqual(request.headers.get("Content-Type"), "text/html")
- self.assertEqual(request.headers.get("Content-Length"), "100")
- class HttpHeadersTests(SimpleTestCase):
- def test_basic(self):
- environ = {
- "CONTENT_TYPE": "text/html",
- "CONTENT_LENGTH": "100",
- "HTTP_HOST": "example.com",
- }
- headers = HttpHeaders(environ)
- self.assertEqual(sorted(headers), ["Content-Length", "Content-Type", "Host"])
- self.assertEqual(
- headers,
- {
- "Content-Type": "text/html",
- "Content-Length": "100",
- "Host": "example.com",
- },
- )
- def test_parse_header_name(self):
- tests = (
- ("PATH_INFO", None),
- ("HTTP_ACCEPT", "Accept"),
- ("HTTP_USER_AGENT", "User-Agent"),
- ("HTTP_X_FORWARDED_PROTO", "X-Forwarded-Proto"),
- ("CONTENT_TYPE", "Content-Type"),
- ("CONTENT_LENGTH", "Content-Length"),
- )
- for header, expected in tests:
- with self.subTest(header=header):
- self.assertEqual(HttpHeaders.parse_header_name(header), expected)
|