tests.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. # -*- encoding: utf-8 -*-
  2. from __future__ import unicode_literals
  3. import time
  4. import warnings
  5. from datetime import datetime, timedelta
  6. from io import BytesIO
  7. from django.db import connection, connections, DEFAULT_DB_ALIAS
  8. from django.core import signals
  9. from django.core.exceptions import SuspiciousOperation
  10. from django.core.handlers.wsgi import WSGIRequest, LimitedStream
  11. from django.http import HttpRequest, HttpResponse, parse_cookie, build_request_repr, UnreadablePostError
  12. from django.test import TransactionTestCase
  13. from django.test.client import FakePayload
  14. from django.test.utils import override_settings, str_prefix
  15. from django.utils import six
  16. from django.utils import unittest
  17. from django.utils.http import cookie_date, urlencode
  18. from django.utils.timezone import utc
  19. class RequestsTests(unittest.TestCase):
  20. def test_httprequest(self):
  21. request = HttpRequest()
  22. self.assertEqual(list(request.GET.keys()), [])
  23. self.assertEqual(list(request.POST.keys()), [])
  24. self.assertEqual(list(request.COOKIES.keys()), [])
  25. self.assertEqual(list(request.META.keys()), [])
  26. def test_httprequest_repr(self):
  27. request = HttpRequest()
  28. request.path = '/somepath/'
  29. request.GET = {'get-key': 'get-value'}
  30. request.POST = {'post-key': 'post-value'}
  31. request.COOKIES = {'post-key': 'post-value'}
  32. request.META = {'post-key': 'post-value'}
  33. self.assertEqual(repr(request), str_prefix("<HttpRequest\npath:/somepath/,\nGET:{%(_)s'get-key': %(_)s'get-value'},\nPOST:{%(_)s'post-key': %(_)s'post-value'},\nCOOKIES:{%(_)s'post-key': %(_)s'post-value'},\nMETA:{%(_)s'post-key': %(_)s'post-value'}>"))
  34. self.assertEqual(build_request_repr(request), repr(request))
  35. self.assertEqual(build_request_repr(request, path_override='/otherpath/', GET_override={'a': 'b'}, POST_override={'c': 'd'}, COOKIES_override={'e': 'f'}, META_override={'g': 'h'}),
  36. str_prefix("<HttpRequest\npath:/otherpath/,\nGET:{%(_)s'a': %(_)s'b'},\nPOST:{%(_)s'c': %(_)s'd'},\nCOOKIES:{%(_)s'e': %(_)s'f'},\nMETA:{%(_)s'g': %(_)s'h'}>"))
  37. def test_wsgirequest(self):
  38. request = WSGIRequest({'PATH_INFO': 'bogus', 'REQUEST_METHOD': 'bogus', 'wsgi.input': BytesIO(b'')})
  39. self.assertEqual(list(request.GET.keys()), [])
  40. self.assertEqual(list(request.POST.keys()), [])
  41. self.assertEqual(list(request.COOKIES.keys()), [])
  42. self.assertEqual(set(request.META.keys()), set(['PATH_INFO', 'REQUEST_METHOD', 'SCRIPT_NAME', 'wsgi.input']))
  43. self.assertEqual(request.META['PATH_INFO'], 'bogus')
  44. self.assertEqual(request.META['REQUEST_METHOD'], 'bogus')
  45. self.assertEqual(request.META['SCRIPT_NAME'], '')
  46. def test_wsgirequest_repr(self):
  47. request = WSGIRequest({'PATH_INFO': '/somepath/', 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
  48. request.GET = {'get-key': 'get-value'}
  49. request.POST = {'post-key': 'post-value'}
  50. request.COOKIES = {'post-key': 'post-value'}
  51. request.META = {'post-key': 'post-value'}
  52. self.assertEqual(repr(request), str_prefix("<WSGIRequest\npath:/somepath/,\nGET:{%(_)s'get-key': %(_)s'get-value'},\nPOST:{%(_)s'post-key': %(_)s'post-value'},\nCOOKIES:{%(_)s'post-key': %(_)s'post-value'},\nMETA:{%(_)s'post-key': %(_)s'post-value'}>"))
  53. self.assertEqual(build_request_repr(request), repr(request))
  54. self.assertEqual(build_request_repr(request, path_override='/otherpath/', GET_override={'a': 'b'}, POST_override={'c': 'd'}, COOKIES_override={'e': 'f'}, META_override={'g': 'h'}),
  55. str_prefix("<WSGIRequest\npath:/otherpath/,\nGET:{%(_)s'a': %(_)s'b'},\nPOST:{%(_)s'c': %(_)s'd'},\nCOOKIES:{%(_)s'e': %(_)s'f'},\nMETA:{%(_)s'g': %(_)s'h'}>"))
  56. def test_wsgirequest_path_info(self):
  57. def wsgi_str(path_info):
  58. path_info = path_info.encode('utf-8') # Actual URL sent by the browser (bytestring)
  59. if six.PY3:
  60. path_info = path_info.decode('iso-8859-1') # Value in the WSGI environ dict (native string)
  61. return path_info
  62. # Regression for #19468
  63. request = WSGIRequest({'PATH_INFO': wsgi_str("/سلام/"), 'REQUEST_METHOD': 'get', 'wsgi.input': BytesIO(b'')})
  64. self.assertEqual(request.path, "/سلام/")
  65. def test_parse_cookie(self):
  66. self.assertEqual(parse_cookie('invalid@key=true'), {})
  67. def test_httprequest_location(self):
  68. request = HttpRequest()
  69. self.assertEqual(request.build_absolute_uri(location="https://www.example.com/asdf"),
  70. 'https://www.example.com/asdf')
  71. request.get_host = lambda: 'www.example.com'
  72. request.path = ''
  73. self.assertEqual(request.build_absolute_uri(location="/path/with:colons"),
  74. 'http://www.example.com/path/with:colons')
  75. @override_settings(
  76. USE_X_FORWARDED_HOST=False,
  77. ALLOWED_HOSTS=[
  78. 'forward.com', 'example.com', 'internal.com', '12.34.56.78',
  79. '[2001:19f0:feee::dead:beef:cafe]', 'xn--4ca9at.com',
  80. '.multitenant.com', 'INSENSITIVE.com',
  81. ])
  82. def test_http_get_host(self):
  83. # Check if X_FORWARDED_HOST is provided.
  84. request = HttpRequest()
  85. request.META = {
  86. 'HTTP_X_FORWARDED_HOST': 'forward.com',
  87. 'HTTP_HOST': 'example.com',
  88. 'SERVER_NAME': 'internal.com',
  89. 'SERVER_PORT': 80,
  90. }
  91. # X_FORWARDED_HOST is ignored.
  92. self.assertEqual(request.get_host(), 'example.com')
  93. # Check if X_FORWARDED_HOST isn't provided.
  94. request = HttpRequest()
  95. request.META = {
  96. 'HTTP_HOST': 'example.com',
  97. 'SERVER_NAME': 'internal.com',
  98. 'SERVER_PORT': 80,
  99. }
  100. self.assertEqual(request.get_host(), 'example.com')
  101. # Check if HTTP_HOST isn't provided.
  102. request = HttpRequest()
  103. request.META = {
  104. 'SERVER_NAME': 'internal.com',
  105. 'SERVER_PORT': 80,
  106. }
  107. self.assertEqual(request.get_host(), 'internal.com')
  108. # Check if HTTP_HOST isn't provided, and we're on a nonstandard port
  109. request = HttpRequest()
  110. request.META = {
  111. 'SERVER_NAME': 'internal.com',
  112. 'SERVER_PORT': 8042,
  113. }
  114. self.assertEqual(request.get_host(), 'internal.com:8042')
  115. # Poisoned host headers are rejected as suspicious
  116. legit_hosts = [
  117. 'example.com',
  118. 'example.com:80',
  119. '12.34.56.78',
  120. '12.34.56.78:443',
  121. '[2001:19f0:feee::dead:beef:cafe]',
  122. '[2001:19f0:feee::dead:beef:cafe]:8080',
  123. 'xn--4ca9at.com', # Punnycode for öäü.com
  124. 'anything.multitenant.com',
  125. 'multitenant.com',
  126. 'insensitive.com',
  127. ]
  128. poisoned_hosts = [
  129. 'example.com@evil.tld',
  130. 'example.com:dr.frankenstein@evil.tld',
  131. 'example.com:dr.frankenstein@evil.tld:80',
  132. 'example.com:80/badpath',
  133. 'example.com: recovermypassword.com',
  134. 'other.com', # not in ALLOWED_HOSTS
  135. ]
  136. for host in legit_hosts:
  137. request = HttpRequest()
  138. request.META = {
  139. 'HTTP_HOST': host,
  140. }
  141. request.get_host()
  142. for host in poisoned_hosts:
  143. with self.assertRaises(SuspiciousOperation):
  144. request = HttpRequest()
  145. request.META = {
  146. 'HTTP_HOST': host,
  147. }
  148. request.get_host()
  149. @override_settings(USE_X_FORWARDED_HOST=True, ALLOWED_HOSTS=['*'])
  150. def test_http_get_host_with_x_forwarded_host(self):
  151. # Check if X_FORWARDED_HOST is provided.
  152. request = HttpRequest()
  153. request.META = {
  154. 'HTTP_X_FORWARDED_HOST': 'forward.com',
  155. 'HTTP_HOST': 'example.com',
  156. 'SERVER_NAME': 'internal.com',
  157. 'SERVER_PORT': 80,
  158. }
  159. # X_FORWARDED_HOST is obeyed.
  160. self.assertEqual(request.get_host(), 'forward.com')
  161. # Check if X_FORWARDED_HOST isn't provided.
  162. request = HttpRequest()
  163. request.META = {
  164. 'HTTP_HOST': 'example.com',
  165. 'SERVER_NAME': 'internal.com',
  166. 'SERVER_PORT': 80,
  167. }
  168. self.assertEqual(request.get_host(), 'example.com')
  169. # Check if HTTP_HOST isn't provided.
  170. request = HttpRequest()
  171. request.META = {
  172. 'SERVER_NAME': 'internal.com',
  173. 'SERVER_PORT': 80,
  174. }
  175. self.assertEqual(request.get_host(), 'internal.com')
  176. # Check if HTTP_HOST isn't provided, and we're on a nonstandard port
  177. request = HttpRequest()
  178. request.META = {
  179. 'SERVER_NAME': 'internal.com',
  180. 'SERVER_PORT': 8042,
  181. }
  182. self.assertEqual(request.get_host(), 'internal.com:8042')
  183. # Poisoned host headers are rejected as suspicious
  184. legit_hosts = [
  185. 'example.com',
  186. 'example.com:80',
  187. '12.34.56.78',
  188. '12.34.56.78:443',
  189. '[2001:19f0:feee::dead:beef:cafe]',
  190. '[2001:19f0:feee::dead:beef:cafe]:8080',
  191. 'xn--4ca9at.com', # Punnycode for öäü.com
  192. ]
  193. poisoned_hosts = [
  194. 'example.com@evil.tld',
  195. 'example.com:dr.frankenstein@evil.tld',
  196. 'example.com:dr.frankenstein@evil.tld:80',
  197. 'example.com:80/badpath',
  198. 'example.com: recovermypassword.com',
  199. ]
  200. for host in legit_hosts:
  201. request = HttpRequest()
  202. request.META = {
  203. 'HTTP_HOST': host,
  204. }
  205. request.get_host()
  206. for host in poisoned_hosts:
  207. with self.assertRaises(SuspiciousOperation):
  208. request = HttpRequest()
  209. request.META = {
  210. 'HTTP_HOST': host,
  211. }
  212. request.get_host()
  213. @override_settings(DEBUG=True, ALLOWED_HOSTS=[])
  214. def test_host_validation_disabled_in_debug_mode(self):
  215. """If ALLOWED_HOSTS is empty and DEBUG is True, all hosts pass."""
  216. request = HttpRequest()
  217. request.META = {
  218. 'HTTP_HOST': 'example.com',
  219. }
  220. self.assertEqual(request.get_host(), 'example.com')
  221. def test_near_expiration(self):
  222. "Cookie will expire when an near expiration time is provided"
  223. response = HttpResponse()
  224. # There is a timing weakness in this test; The
  225. # expected result for max-age requires that there be
  226. # a very slight difference between the evaluated expiration
  227. # time, and the time evaluated in set_cookie(). If this
  228. # difference doesn't exist, the cookie time will be
  229. # 1 second larger. To avoid the problem, put in a quick sleep,
  230. # which guarantees that there will be a time difference.
  231. expires = datetime.utcnow() + timedelta(seconds=10)
  232. time.sleep(0.001)
  233. response.set_cookie('datetime', expires=expires)
  234. datetime_cookie = response.cookies['datetime']
  235. self.assertEqual(datetime_cookie['max-age'], 10)
  236. def test_aware_expiration(self):
  237. "Cookie accepts an aware datetime as expiration time"
  238. response = HttpResponse()
  239. expires = (datetime.utcnow() + timedelta(seconds=10)).replace(tzinfo=utc)
  240. time.sleep(0.001)
  241. response.set_cookie('datetime', expires=expires)
  242. datetime_cookie = response.cookies['datetime']
  243. self.assertEqual(datetime_cookie['max-age'], 10)
  244. def test_far_expiration(self):
  245. "Cookie will expire when an distant expiration time is provided"
  246. response = HttpResponse()
  247. response.set_cookie('datetime', expires=datetime(2028, 1, 1, 4, 5, 6))
  248. datetime_cookie = response.cookies['datetime']
  249. self.assertEqual(datetime_cookie['expires'], 'Sat, 01-Jan-2028 04:05:06 GMT')
  250. def test_max_age_expiration(self):
  251. "Cookie will expire if max_age is provided"
  252. response = HttpResponse()
  253. response.set_cookie('max_age', max_age=10)
  254. max_age_cookie = response.cookies['max_age']
  255. self.assertEqual(max_age_cookie['max-age'], 10)
  256. self.assertEqual(max_age_cookie['expires'], cookie_date(time.time()+10))
  257. def test_httponly_cookie(self):
  258. response = HttpResponse()
  259. response.set_cookie('example', httponly=True)
  260. example_cookie = response.cookies['example']
  261. # A compat cookie may be in use -- check that it has worked
  262. # both as an output string, and using the cookie attributes
  263. self.assertTrue('; httponly' in str(example_cookie))
  264. self.assertTrue(example_cookie['httponly'])
  265. def test_limited_stream(self):
  266. # Read all of a limited stream
  267. stream = LimitedStream(BytesIO(b'test'), 2)
  268. self.assertEqual(stream.read(), b'te')
  269. # Reading again returns nothing.
  270. self.assertEqual(stream.read(), b'')
  271. # Read a number of characters greater than the stream has to offer
  272. stream = LimitedStream(BytesIO(b'test'), 2)
  273. self.assertEqual(stream.read(5), b'te')
  274. # Reading again returns nothing.
  275. self.assertEqual(stream.readline(5), b'')
  276. # Read sequentially from a stream
  277. stream = LimitedStream(BytesIO(b'12345678'), 8)
  278. self.assertEqual(stream.read(5), b'12345')
  279. self.assertEqual(stream.read(5), b'678')
  280. # Reading again returns nothing.
  281. self.assertEqual(stream.readline(5), b'')
  282. # Read lines from a stream
  283. stream = LimitedStream(BytesIO(b'1234\n5678\nabcd\nefgh\nijkl'), 24)
  284. # Read a full line, unconditionally
  285. self.assertEqual(stream.readline(), b'1234\n')
  286. # Read a number of characters less than a line
  287. self.assertEqual(stream.readline(2), b'56')
  288. # Read the rest of the partial line
  289. self.assertEqual(stream.readline(), b'78\n')
  290. # Read a full line, with a character limit greater than the line length
  291. self.assertEqual(stream.readline(6), b'abcd\n')
  292. # Read the next line, deliberately terminated at the line end
  293. self.assertEqual(stream.readline(4), b'efgh')
  294. # Read the next line... just the line end
  295. self.assertEqual(stream.readline(), b'\n')
  296. # Read everything else.
  297. self.assertEqual(stream.readline(), b'ijkl')
  298. # Regression for #15018
  299. # If a stream contains a newline, but the provided length
  300. # is less than the number of provided characters, the newline
  301. # doesn't reset the available character count
  302. stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9)
  303. self.assertEqual(stream.readline(10), b'1234\n')
  304. self.assertEqual(stream.readline(3), b'abc')
  305. # Now expire the available characters
  306. self.assertEqual(stream.readline(3), b'd')
  307. # Reading again returns nothing.
  308. self.assertEqual(stream.readline(2), b'')
  309. # Same test, but with read, not readline.
  310. stream = LimitedStream(BytesIO(b'1234\nabcdef'), 9)
  311. self.assertEqual(stream.read(6), b'1234\na')
  312. self.assertEqual(stream.read(2), b'bc')
  313. self.assertEqual(stream.read(2), b'd')
  314. self.assertEqual(stream.read(2), b'')
  315. self.assertEqual(stream.read(), b'')
  316. def test_stream(self):
  317. payload = FakePayload('name=value')
  318. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  319. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  320. 'CONTENT_LENGTH': len(payload),
  321. 'wsgi.input': payload})
  322. self.assertEqual(request.read(), b'name=value')
  323. def test_read_after_value(self):
  324. """
  325. Reading from request is allowed after accessing request contents as
  326. POST or body.
  327. """
  328. payload = FakePayload('name=value')
  329. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  330. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  331. 'CONTENT_LENGTH': len(payload),
  332. 'wsgi.input': payload})
  333. self.assertEqual(request.POST, {'name': ['value']})
  334. self.assertEqual(request.body, b'name=value')
  335. self.assertEqual(request.read(), b'name=value')
  336. def test_value_after_read(self):
  337. """
  338. Construction of POST or body is not allowed after reading
  339. from request.
  340. """
  341. payload = FakePayload('name=value')
  342. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  343. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  344. 'CONTENT_LENGTH': len(payload),
  345. 'wsgi.input': payload})
  346. self.assertEqual(request.read(2), b'na')
  347. self.assertRaises(Exception, lambda: request.body)
  348. self.assertEqual(request.POST, {})
  349. def test_non_ascii_POST(self):
  350. payload = FakePayload(urlencode({'key': 'España'}))
  351. request = WSGIRequest({
  352. 'REQUEST_METHOD': 'POST',
  353. 'CONTENT_LENGTH': len(payload),
  354. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  355. 'wsgi.input': payload,
  356. })
  357. self.assertEqual(request.POST, {'key': ['España']})
  358. def test_alternate_charset_POST(self):
  359. """
  360. Test a POST with non-utf-8 payload encoding.
  361. """
  362. from django.utils.http import urllib_parse
  363. payload = FakePayload(urllib_parse.urlencode({'key': 'España'.encode('latin-1')}))
  364. request = WSGIRequest({
  365. 'REQUEST_METHOD': 'POST',
  366. 'CONTENT_LENGTH': len(payload),
  367. 'CONTENT_TYPE': 'application/x-www-form-urlencoded; charset=iso-8859-1',
  368. 'wsgi.input': payload,
  369. })
  370. self.assertEqual(request.POST, {'key': ['España']})
  371. def test_body_after_POST_multipart(self):
  372. """
  373. Reading body after parsing multipart is not allowed
  374. """
  375. # Because multipart is used for large amounts fo data i.e. file uploads,
  376. # we don't want the data held in memory twice, and we don't want to
  377. # silence the error by setting body = '' either.
  378. payload = FakePayload("\r\n".join([
  379. '--boundary',
  380. 'Content-Disposition: form-data; name="name"',
  381. '',
  382. 'value',
  383. '--boundary--'
  384. '']))
  385. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  386. 'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
  387. 'CONTENT_LENGTH': len(payload),
  388. 'wsgi.input': payload})
  389. self.assertEqual(request.POST, {'name': ['value']})
  390. self.assertRaises(Exception, lambda: request.body)
  391. def test_POST_multipart_with_content_length_zero(self):
  392. """
  393. Multipart POST requests with Content-Length >= 0 are valid and need to be handled.
  394. """
  395. # According to:
  396. # http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.13
  397. # Every request.POST with Content-Length >= 0 is a valid request,
  398. # this test ensures that we handle Content-Length == 0.
  399. payload = FakePayload("\r\n".join([
  400. '--boundary',
  401. 'Content-Disposition: form-data; name="name"',
  402. '',
  403. 'value',
  404. '--boundary--'
  405. '']))
  406. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  407. 'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
  408. 'CONTENT_LENGTH': 0,
  409. 'wsgi.input': payload})
  410. self.assertEqual(request.POST, {})
  411. def test_POST_binary_only(self):
  412. payload = b'\r\n\x01\x00\x00\x00ab\x00\x00\xcd\xcc,@'
  413. environ = {'REQUEST_METHOD': 'POST',
  414. 'CONTENT_TYPE': 'application/octet-stream',
  415. 'CONTENT_LENGTH': len(payload),
  416. 'wsgi.input': BytesIO(payload)}
  417. request = WSGIRequest(environ)
  418. self.assertEqual(request.POST, {})
  419. self.assertEqual(request.FILES, {})
  420. self.assertEqual(request.body, payload)
  421. # Same test without specifying content-type
  422. environ.update({'CONTENT_TYPE': '', 'wsgi.input': BytesIO(payload)})
  423. request = WSGIRequest(environ)
  424. self.assertEqual(request.POST, {})
  425. self.assertEqual(request.FILES, {})
  426. self.assertEqual(request.body, payload)
  427. def test_read_by_lines(self):
  428. payload = FakePayload('name=value')
  429. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  430. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  431. 'CONTENT_LENGTH': len(payload),
  432. 'wsgi.input': payload})
  433. self.assertEqual(list(request), [b'name=value'])
  434. def test_POST_after_body_read(self):
  435. """
  436. POST should be populated even if body is read first
  437. """
  438. payload = FakePayload('name=value')
  439. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  440. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  441. 'CONTENT_LENGTH': len(payload),
  442. 'wsgi.input': payload})
  443. raw_data = request.body
  444. self.assertEqual(request.POST, {'name': ['value']})
  445. def test_POST_after_body_read_and_stream_read(self):
  446. """
  447. POST should be populated even if body is read first, and then
  448. the stream is read second.
  449. """
  450. payload = FakePayload('name=value')
  451. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  452. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  453. 'CONTENT_LENGTH': len(payload),
  454. 'wsgi.input': payload})
  455. raw_data = request.body
  456. self.assertEqual(request.read(1), b'n')
  457. self.assertEqual(request.POST, {'name': ['value']})
  458. def test_POST_after_body_read_and_stream_read_multipart(self):
  459. """
  460. POST should be populated even if body is read first, and then
  461. the stream is read second. Using multipart/form-data instead of urlencoded.
  462. """
  463. payload = FakePayload("\r\n".join([
  464. '--boundary',
  465. 'Content-Disposition: form-data; name="name"',
  466. '',
  467. 'value',
  468. '--boundary--'
  469. '']))
  470. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  471. 'CONTENT_TYPE': 'multipart/form-data; boundary=boundary',
  472. 'CONTENT_LENGTH': len(payload),
  473. 'wsgi.input': payload})
  474. raw_data = request.body
  475. # Consume enough data to mess up the parsing:
  476. self.assertEqual(request.read(13), b'--boundary\r\nC')
  477. self.assertEqual(request.POST, {'name': ['value']})
  478. def test_POST_connection_error(self):
  479. """
  480. If wsgi.input.read() raises an exception while trying to read() the
  481. POST, the exception should be identifiable (not a generic IOError).
  482. """
  483. class ExplodingBytesIO(BytesIO):
  484. def read(self, len=0):
  485. raise IOError("kaboom!")
  486. payload = b'name=value'
  487. request = WSGIRequest({'REQUEST_METHOD': 'POST',
  488. 'CONTENT_TYPE': 'application/x-www-form-urlencoded',
  489. 'CONTENT_LENGTH': len(payload),
  490. 'wsgi.input': ExplodingBytesIO(payload)})
  491. with self.assertRaises(UnreadablePostError):
  492. request.body
  493. class TransactionRequestTests(TransactionTestCase):
  494. def test_request_finished_db_state(self):
  495. # The GET below will not succeed, but it will give a response with
  496. # defined ._handler_class. That is needed for sending the
  497. # request_finished signal.
  498. response = self.client.get('/')
  499. # Make sure there is an open connection
  500. connection.cursor()
  501. connection.enter_transaction_management()
  502. connection.managed(True)
  503. signals.request_finished.send(sender=response._handler_class)
  504. # In-memory sqlite doesn't actually close connections.
  505. if connection.vendor != 'sqlite':
  506. self.assertIs(connection.connection, None)
  507. self.assertEqual(len(connection.transaction_state), 0)
  508. @unittest.skipIf(connection.vendor == 'sqlite',
  509. 'This test will close the connection, in-memory '
  510. 'sqlite connections must not be closed.')
  511. def test_request_finished_failed_connection(self):
  512. conn = connections[DEFAULT_DB_ALIAS]
  513. conn.enter_transaction_management()
  514. conn.managed(True)
  515. conn.set_dirty()
  516. # Test that the rollback doesn't succeed (for example network failure
  517. # could cause this).
  518. def fail_horribly():
  519. raise Exception("Horrible failure!")
  520. conn._rollback = fail_horribly
  521. try:
  522. with self.assertRaises(Exception):
  523. signals.request_finished.send(sender=self.__class__)
  524. # The connection's state wasn't cleaned up
  525. self.assertTrue(len(connection.transaction_state), 1)
  526. finally:
  527. del conn._rollback
  528. # The connection will be cleaned on next request where the conn
  529. # works again.
  530. signals.request_finished.send(sender=self.__class__)
  531. self.assertEqual(len(connection.transaction_state), 0)