tests.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. from django.core.exceptions import ImproperlyConfigured
  2. from django.core.handlers.wsgi import WSGIHandler, WSGIRequest, get_script_name
  3. from django.core.signals import request_finished, request_started
  4. from django.db import close_old_connections, connection
  5. from django.test import (
  6. RequestFactory, SimpleTestCase, TransactionTestCase, override_settings,
  7. )
  8. from django.utils.version import PY37
  9. class HandlerTests(SimpleTestCase):
  10. request_factory = RequestFactory()
  11. def setUp(self):
  12. request_started.disconnect(close_old_connections)
  13. def tearDown(self):
  14. request_started.connect(close_old_connections)
  15. def test_middleware_initialized(self):
  16. handler = WSGIHandler()
  17. self.assertIsNotNone(handler._middleware_chain)
  18. def test_bad_path_info(self):
  19. """
  20. A non-UTF-8 path populates PATH_INFO with an URL-encoded path and
  21. produces a 404.
  22. """
  23. environ = self.request_factory.get('/').environ
  24. environ['PATH_INFO'] = '\xed'
  25. handler = WSGIHandler()
  26. response = handler(environ, lambda *a, **k: None)
  27. # The path of the request will be encoded to '/%ED'.
  28. self.assertEqual(response.status_code, 404)
  29. def test_non_ascii_query_string(self):
  30. """
  31. Non-ASCII query strings are properly decoded (#20530, #22996).
  32. """
  33. environ = self.request_factory.get('/').environ
  34. raw_query_strings = [
  35. b'want=caf%C3%A9', # This is the proper way to encode 'café'
  36. b'want=caf\xc3\xa9', # UA forgot to quote bytes
  37. b'want=caf%E9', # UA quoted, but not in UTF-8
  38. b'want=caf\xe9', # UA forgot to convert Latin-1 to UTF-8 and to quote (typical of MSIE)
  39. ]
  40. got = []
  41. for raw_query_string in raw_query_strings:
  42. # Simulate http.server.BaseHTTPRequestHandler.parse_request handling of raw request
  43. environ['QUERY_STRING'] = str(raw_query_string, 'iso-8859-1')
  44. request = WSGIRequest(environ)
  45. got.append(request.GET['want'])
  46. # %E9 is converted to the unicode replacement character by parse_qsl
  47. self.assertEqual(got, ['café', 'café', 'caf\ufffd', 'café'])
  48. def test_non_ascii_cookie(self):
  49. """Non-ASCII cookies set in JavaScript are properly decoded (#20557)."""
  50. environ = self.request_factory.get('/').environ
  51. raw_cookie = 'want="café"'.encode('utf-8').decode('iso-8859-1')
  52. environ['HTTP_COOKIE'] = raw_cookie
  53. request = WSGIRequest(environ)
  54. self.assertEqual(request.COOKIES['want'], "café")
  55. def test_invalid_unicode_cookie(self):
  56. """
  57. Invalid cookie content should result in an absent cookie, but not in a
  58. crash while trying to decode it (#23638).
  59. """
  60. environ = self.request_factory.get('/').environ
  61. environ['HTTP_COOKIE'] = 'x=W\x03c(h]\x8e'
  62. request = WSGIRequest(environ)
  63. # We don't test COOKIES content, as the result might differ between
  64. # Python version because parsing invalid content became stricter in
  65. # latest versions.
  66. self.assertIsInstance(request.COOKIES, dict)
  67. @override_settings(ROOT_URLCONF='handlers.urls')
  68. def test_invalid_multipart_boundary(self):
  69. """
  70. Invalid boundary string should produce a "Bad Request" response, not a
  71. server error (#23887).
  72. """
  73. environ = self.request_factory.post('/malformed_post/').environ
  74. environ['CONTENT_TYPE'] = 'multipart/form-data; boundary=WRONG\x07'
  75. handler = WSGIHandler()
  76. response = handler(environ, lambda *a, **k: None)
  77. # Expect "bad request" response
  78. self.assertEqual(response.status_code, 400)
  79. @override_settings(ROOT_URLCONF='handlers.urls', MIDDLEWARE=[])
  80. class TransactionsPerRequestTests(TransactionTestCase):
  81. available_apps = []
  82. def test_no_transaction(self):
  83. response = self.client.get('/in_transaction/')
  84. self.assertContains(response, 'False')
  85. def test_auto_transaction(self):
  86. old_atomic_requests = connection.settings_dict['ATOMIC_REQUESTS']
  87. try:
  88. connection.settings_dict['ATOMIC_REQUESTS'] = True
  89. response = self.client.get('/in_transaction/')
  90. finally:
  91. connection.settings_dict['ATOMIC_REQUESTS'] = old_atomic_requests
  92. self.assertContains(response, 'True')
  93. async def test_auto_transaction_async_view(self):
  94. old_atomic_requests = connection.settings_dict['ATOMIC_REQUESTS']
  95. try:
  96. connection.settings_dict['ATOMIC_REQUESTS'] = True
  97. msg = 'You cannot use ATOMIC_REQUESTS with async views.'
  98. with self.assertRaisesMessage(RuntimeError, msg):
  99. await self.async_client.get('/async_regular/')
  100. finally:
  101. connection.settings_dict['ATOMIC_REQUESTS'] = old_atomic_requests
  102. def test_no_auto_transaction(self):
  103. old_atomic_requests = connection.settings_dict['ATOMIC_REQUESTS']
  104. try:
  105. connection.settings_dict['ATOMIC_REQUESTS'] = True
  106. response = self.client.get('/not_in_transaction/')
  107. finally:
  108. connection.settings_dict['ATOMIC_REQUESTS'] = old_atomic_requests
  109. self.assertContains(response, 'False')
  110. @override_settings(ROOT_URLCONF='handlers.urls')
  111. class SignalsTests(SimpleTestCase):
  112. def setUp(self):
  113. self.signals = []
  114. self.signaled_environ = None
  115. request_started.connect(self.register_started)
  116. request_finished.connect(self.register_finished)
  117. def tearDown(self):
  118. request_started.disconnect(self.register_started)
  119. request_finished.disconnect(self.register_finished)
  120. def register_started(self, **kwargs):
  121. self.signals.append('started')
  122. self.signaled_environ = kwargs.get('environ')
  123. def register_finished(self, **kwargs):
  124. self.signals.append('finished')
  125. def test_request_signals(self):
  126. response = self.client.get('/regular/')
  127. self.assertEqual(self.signals, ['started', 'finished'])
  128. self.assertEqual(response.content, b"regular content")
  129. self.assertEqual(self.signaled_environ, response.wsgi_request.environ)
  130. def test_request_signals_streaming_response(self):
  131. response = self.client.get('/streaming/')
  132. self.assertEqual(self.signals, ['started'])
  133. self.assertEqual(b''.join(response.streaming_content), b"streaming content")
  134. self.assertEqual(self.signals, ['started', 'finished'])
  135. def empty_middleware(get_response):
  136. pass
  137. @override_settings(ROOT_URLCONF='handlers.urls')
  138. class HandlerRequestTests(SimpleTestCase):
  139. request_factory = RequestFactory()
  140. def test_async_view(self):
  141. """Calling an async view down the normal synchronous path."""
  142. response = self.client.get('/async_regular/')
  143. self.assertEqual(response.status_code, 200)
  144. def test_suspiciousop_in_view_returns_400(self):
  145. response = self.client.get('/suspicious/')
  146. self.assertEqual(response.status_code, 400)
  147. def test_invalid_urls(self):
  148. response = self.client.get('~%A9helloworld')
  149. self.assertEqual(response.status_code, 404)
  150. self.assertEqual(response.context['request_path'], '/~%25A9helloworld' if PY37 else '/%7E%25A9helloworld')
  151. response = self.client.get('d%aao%aaw%aan%aal%aao%aaa%aad%aa/')
  152. self.assertEqual(response.context['request_path'], '/d%25AAo%25AAw%25AAn%25AAl%25AAo%25AAa%25AAd%25AA')
  153. response = self.client.get('/%E2%99%E2%99%A5/')
  154. self.assertEqual(response.context['request_path'], '/%25E2%2599%E2%99%A5/')
  155. response = self.client.get('/%E2%98%8E%E2%A9%E2%99%A5/')
  156. self.assertEqual(response.context['request_path'], '/%E2%98%8E%25E2%25A9%E2%99%A5/')
  157. def test_environ_path_info_type(self):
  158. environ = self.request_factory.get('/%E2%A8%87%87%A5%E2%A8%A0').environ
  159. self.assertIsInstance(environ['PATH_INFO'], str)
  160. def test_handle_accepts_httpstatus_enum_value(self):
  161. def start_response(status, headers):
  162. start_response.status = status
  163. environ = self.request_factory.get('/httpstatus_enum/').environ
  164. WSGIHandler()(environ, start_response)
  165. self.assertEqual(start_response.status, '200 OK')
  166. @override_settings(MIDDLEWARE=['handlers.tests.empty_middleware'])
  167. def test_middleware_returns_none(self):
  168. msg = 'Middleware factory handlers.tests.empty_middleware returned None.'
  169. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  170. self.client.get('/')
  171. def test_no_response(self):
  172. msg = "The view %s didn't return an HttpResponse object. It returned None instead."
  173. tests = (
  174. ('/no_response_fbv/', 'handlers.views.no_response'),
  175. ('/no_response_cbv/', 'handlers.views.NoResponse.__call__'),
  176. )
  177. for url, view in tests:
  178. with self.subTest(url=url), self.assertRaisesMessage(ValueError, msg % view):
  179. self.client.get(url)
  180. class ScriptNameTests(SimpleTestCase):
  181. def test_get_script_name(self):
  182. # Regression test for #23173
  183. # Test first without PATH_INFO
  184. script_name = get_script_name({'SCRIPT_URL': '/foobar/'})
  185. self.assertEqual(script_name, '/foobar/')
  186. script_name = get_script_name({'SCRIPT_URL': '/foobar/', 'PATH_INFO': '/'})
  187. self.assertEqual(script_name, '/foobar')
  188. def test_get_script_name_double_slashes(self):
  189. """
  190. WSGI squashes multiple successive slashes in PATH_INFO, get_script_name
  191. should take that into account when forming SCRIPT_NAME (#17133).
  192. """
  193. script_name = get_script_name({
  194. 'SCRIPT_URL': '/mst/milestones//accounts/login//help',
  195. 'PATH_INFO': '/milestones/accounts/login/help',
  196. })
  197. self.assertEqual(script_name, '/mst')
  198. @override_settings(ROOT_URLCONF='handlers.urls')
  199. class AsyncHandlerRequestTests(SimpleTestCase):
  200. """Async variants of the normal handler request tests."""
  201. async def test_sync_view(self):
  202. """Calling a sync view down the asynchronous path."""
  203. response = await self.async_client.get('/regular/')
  204. self.assertEqual(response.status_code, 200)
  205. async def test_async_view(self):
  206. """Calling an async view down the asynchronous path."""
  207. response = await self.async_client.get('/async_regular/')
  208. self.assertEqual(response.status_code, 200)
  209. async def test_suspiciousop_in_view_returns_400(self):
  210. response = await self.async_client.get('/suspicious/')
  211. self.assertEqual(response.status_code, 400)
  212. async def test_no_response(self):
  213. msg = (
  214. "The view handlers.views.no_response didn't return an "
  215. "HttpResponse object. It returned None instead."
  216. )
  217. with self.assertRaisesMessage(ValueError, msg):
  218. await self.async_client.get('/no_response_fbv/')
  219. async def test_unawaited_response(self):
  220. msg = (
  221. "The view handlers.views.async_unawaited didn't return an "
  222. "HttpResponse object. It returned an unawaited coroutine instead. "
  223. "You may need to add an 'await' into your view."
  224. )
  225. with self.assertRaisesMessage(ValueError, msg):
  226. await self.async_client.get('/unawaited/')