tests.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. import asyncio
  2. import sys
  3. import threading
  4. from pathlib import Path
  5. from asgiref.testing import ApplicationCommunicator
  6. from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler
  7. from django.core.asgi import get_asgi_application
  8. from django.core.handlers.asgi import ASGIHandler, ASGIRequest
  9. from django.core.signals import request_finished, request_started
  10. from django.db import close_old_connections
  11. from django.http import HttpResponse, StreamingHttpResponse
  12. from django.test import (
  13. AsyncRequestFactory,
  14. SimpleTestCase,
  15. ignore_warnings,
  16. modify_settings,
  17. override_settings,
  18. )
  19. from django.urls import path
  20. from django.utils.http import http_date
  21. from .urls import sync_waiter, test_filename
  22. TEST_STATIC_ROOT = Path(__file__).parent / "project" / "static"
  23. @override_settings(ROOT_URLCONF="asgi.urls")
  24. class ASGITest(SimpleTestCase):
  25. async_request_factory = AsyncRequestFactory()
  26. def setUp(self):
  27. request_started.disconnect(close_old_connections)
  28. def tearDown(self):
  29. request_started.connect(close_old_connections)
  30. async def test_get_asgi_application(self):
  31. """
  32. get_asgi_application() returns a functioning ASGI callable.
  33. """
  34. application = get_asgi_application()
  35. # Construct HTTP request.
  36. scope = self.async_request_factory._base_scope(path="/")
  37. communicator = ApplicationCommunicator(application, scope)
  38. await communicator.send_input({"type": "http.request"})
  39. # Read the response.
  40. response_start = await communicator.receive_output()
  41. self.assertEqual(response_start["type"], "http.response.start")
  42. self.assertEqual(response_start["status"], 200)
  43. self.assertEqual(
  44. set(response_start["headers"]),
  45. {
  46. (b"Content-Length", b"12"),
  47. (b"Content-Type", b"text/html; charset=utf-8"),
  48. },
  49. )
  50. response_body = await communicator.receive_output()
  51. self.assertEqual(response_body["type"], "http.response.body")
  52. self.assertEqual(response_body["body"], b"Hello World!")
  53. # Allow response.close() to finish.
  54. await communicator.wait()
  55. # Python's file API is not async compatible. A third-party library such
  56. # as https://github.com/Tinche/aiofiles allows passing the file to
  57. # FileResponse as an async iterator. With a sync iterator
  58. # StreamingHTTPResponse triggers a warning when iterating the file.
  59. # assertWarnsMessage is not async compatible, so ignore_warnings for the
  60. # test.
  61. @ignore_warnings(module="django.http.response")
  62. async def test_file_response(self):
  63. """
  64. Makes sure that FileResponse works over ASGI.
  65. """
  66. application = get_asgi_application()
  67. # Construct HTTP request.
  68. scope = self.async_request_factory._base_scope(path="/file/")
  69. communicator = ApplicationCommunicator(application, scope)
  70. await communicator.send_input({"type": "http.request"})
  71. # Get the file content.
  72. with open(test_filename, "rb") as test_file:
  73. test_file_contents = test_file.read()
  74. # Read the response.
  75. response_start = await communicator.receive_output()
  76. self.assertEqual(response_start["type"], "http.response.start")
  77. self.assertEqual(response_start["status"], 200)
  78. headers = response_start["headers"]
  79. self.assertEqual(len(headers), 3)
  80. expected_headers = {
  81. b"Content-Length": str(len(test_file_contents)).encode("ascii"),
  82. b"Content-Type": b"text/x-python",
  83. b"Content-Disposition": b'inline; filename="urls.py"',
  84. }
  85. for key, value in headers:
  86. try:
  87. self.assertEqual(value, expected_headers[key])
  88. except AssertionError:
  89. # Windows registry may not be configured with correct
  90. # mimetypes.
  91. if sys.platform == "win32" and key == b"Content-Type":
  92. self.assertEqual(value, b"text/plain")
  93. else:
  94. raise
  95. # Warning ignored here.
  96. response_body = await communicator.receive_output()
  97. self.assertEqual(response_body["type"], "http.response.body")
  98. self.assertEqual(response_body["body"], test_file_contents)
  99. # Allow response.close() to finish.
  100. await communicator.wait()
  101. @modify_settings(INSTALLED_APPS={"append": "django.contrib.staticfiles"})
  102. @override_settings(
  103. STATIC_URL="static/",
  104. STATIC_ROOT=TEST_STATIC_ROOT,
  105. STATICFILES_DIRS=[TEST_STATIC_ROOT],
  106. STATICFILES_FINDERS=[
  107. "django.contrib.staticfiles.finders.FileSystemFinder",
  108. ],
  109. )
  110. async def test_static_file_response(self):
  111. application = ASGIStaticFilesHandler(get_asgi_application())
  112. # Construct HTTP request.
  113. scope = self.async_request_factory._base_scope(path="/static/file.txt")
  114. communicator = ApplicationCommunicator(application, scope)
  115. await communicator.send_input({"type": "http.request"})
  116. # Get the file content.
  117. file_path = TEST_STATIC_ROOT / "file.txt"
  118. with open(file_path, "rb") as test_file:
  119. test_file_contents = test_file.read()
  120. # Read the response.
  121. stat = file_path.stat()
  122. response_start = await communicator.receive_output()
  123. self.assertEqual(response_start["type"], "http.response.start")
  124. self.assertEqual(response_start["status"], 200)
  125. self.assertEqual(
  126. set(response_start["headers"]),
  127. {
  128. (b"Content-Length", str(len(test_file_contents)).encode("ascii")),
  129. (b"Content-Type", b"text/plain"),
  130. (b"Content-Disposition", b'inline; filename="file.txt"'),
  131. (b"Last-Modified", http_date(stat.st_mtime).encode("ascii")),
  132. },
  133. )
  134. response_body = await communicator.receive_output()
  135. self.assertEqual(response_body["type"], "http.response.body")
  136. self.assertEqual(response_body["body"], test_file_contents)
  137. # Allow response.close() to finish.
  138. await communicator.wait()
  139. async def test_headers(self):
  140. application = get_asgi_application()
  141. communicator = ApplicationCommunicator(
  142. application,
  143. self.async_request_factory._base_scope(
  144. path="/meta/",
  145. headers=[
  146. [b"content-type", b"text/plain; charset=utf-8"],
  147. [b"content-length", b"77"],
  148. [b"referer", b"Scotland"],
  149. [b"referer", b"Wales"],
  150. ],
  151. ),
  152. )
  153. await communicator.send_input({"type": "http.request"})
  154. response_start = await communicator.receive_output()
  155. self.assertEqual(response_start["type"], "http.response.start")
  156. self.assertEqual(response_start["status"], 200)
  157. self.assertEqual(
  158. set(response_start["headers"]),
  159. {
  160. (b"Content-Length", b"19"),
  161. (b"Content-Type", b"text/plain; charset=utf-8"),
  162. },
  163. )
  164. response_body = await communicator.receive_output()
  165. self.assertEqual(response_body["type"], "http.response.body")
  166. self.assertEqual(response_body["body"], b"From Scotland,Wales")
  167. # Allow response.close() to finish
  168. await communicator.wait()
  169. async def test_post_body(self):
  170. application = get_asgi_application()
  171. scope = self.async_request_factory._base_scope(
  172. method="POST",
  173. path="/post/",
  174. query_string="echo=1",
  175. )
  176. communicator = ApplicationCommunicator(application, scope)
  177. await communicator.send_input({"type": "http.request", "body": b"Echo!"})
  178. response_start = await communicator.receive_output()
  179. self.assertEqual(response_start["type"], "http.response.start")
  180. self.assertEqual(response_start["status"], 200)
  181. response_body = await communicator.receive_output()
  182. self.assertEqual(response_body["type"], "http.response.body")
  183. self.assertEqual(response_body["body"], b"Echo!")
  184. async def test_untouched_request_body_gets_closed(self):
  185. application = get_asgi_application()
  186. scope = self.async_request_factory._base_scope(method="POST", path="/post/")
  187. communicator = ApplicationCommunicator(application, scope)
  188. await communicator.send_input({"type": "http.request"})
  189. response_start = await communicator.receive_output()
  190. self.assertEqual(response_start["type"], "http.response.start")
  191. self.assertEqual(response_start["status"], 204)
  192. response_body = await communicator.receive_output()
  193. self.assertEqual(response_body["type"], "http.response.body")
  194. self.assertEqual(response_body["body"], b"")
  195. # Allow response.close() to finish
  196. await communicator.wait()
  197. async def test_get_query_string(self):
  198. application = get_asgi_application()
  199. for query_string in (b"name=Andrew", "name=Andrew"):
  200. with self.subTest(query_string=query_string):
  201. scope = self.async_request_factory._base_scope(
  202. path="/",
  203. query_string=query_string,
  204. )
  205. communicator = ApplicationCommunicator(application, scope)
  206. await communicator.send_input({"type": "http.request"})
  207. response_start = await communicator.receive_output()
  208. self.assertEqual(response_start["type"], "http.response.start")
  209. self.assertEqual(response_start["status"], 200)
  210. response_body = await communicator.receive_output()
  211. self.assertEqual(response_body["type"], "http.response.body")
  212. self.assertEqual(response_body["body"], b"Hello Andrew!")
  213. # Allow response.close() to finish
  214. await communicator.wait()
  215. async def test_disconnect(self):
  216. application = get_asgi_application()
  217. scope = self.async_request_factory._base_scope(path="/")
  218. communicator = ApplicationCommunicator(application, scope)
  219. await communicator.send_input({"type": "http.disconnect"})
  220. with self.assertRaises(asyncio.TimeoutError):
  221. await communicator.receive_output()
  222. async def test_disconnect_both_return(self):
  223. # Force both the disconnect listener and the task that sends the
  224. # response to finish at the same time.
  225. application = get_asgi_application()
  226. scope = self.async_request_factory._base_scope(path="/")
  227. communicator = ApplicationCommunicator(application, scope)
  228. await communicator.send_input({"type": "http.request", "body": b"some body"})
  229. # Fetch response headers (this yields to asyncio and causes
  230. # ASGHandler.send_response() to dump the body of the response in the
  231. # queue).
  232. await communicator.receive_output()
  233. # Fetch response body (there's already some data queued up, so this
  234. # doesn't actually yield to the event loop, it just succeeds
  235. # instantly).
  236. await communicator.receive_output()
  237. # Send disconnect at the same time that response finishes (this just
  238. # puts some info in a queue, it doesn't have to yield to the event
  239. # loop).
  240. await communicator.send_input({"type": "http.disconnect"})
  241. # Waiting for the communicator _does_ yield to the event loop, since
  242. # ASGIHandler.send_response() is still waiting to do response.close().
  243. # It so happens that there are enough remaining yield points in both
  244. # tasks that they both finish while the loop is running.
  245. await communicator.wait()
  246. async def test_disconnect_with_body(self):
  247. application = get_asgi_application()
  248. scope = self.async_request_factory._base_scope(path="/")
  249. communicator = ApplicationCommunicator(application, scope)
  250. await communicator.send_input({"type": "http.request", "body": b"some body"})
  251. await communicator.send_input({"type": "http.disconnect"})
  252. with self.assertRaises(asyncio.TimeoutError):
  253. await communicator.receive_output()
  254. async def test_assert_in_listen_for_disconnect(self):
  255. application = get_asgi_application()
  256. scope = self.async_request_factory._base_scope(path="/")
  257. communicator = ApplicationCommunicator(application, scope)
  258. await communicator.send_input({"type": "http.request"})
  259. await communicator.send_input({"type": "http.not_a_real_message"})
  260. msg = "Invalid ASGI message after request body: http.not_a_real_message"
  261. with self.assertRaisesMessage(AssertionError, msg):
  262. await communicator.wait()
  263. async def test_delayed_disconnect_with_body(self):
  264. application = get_asgi_application()
  265. scope = self.async_request_factory._base_scope(path="/delayed_hello/")
  266. communicator = ApplicationCommunicator(application, scope)
  267. await communicator.send_input({"type": "http.request", "body": b"some body"})
  268. await communicator.send_input({"type": "http.disconnect"})
  269. with self.assertRaises(asyncio.TimeoutError):
  270. await communicator.receive_output()
  271. async def test_wrong_connection_type(self):
  272. application = get_asgi_application()
  273. scope = self.async_request_factory._base_scope(path="/", type="other")
  274. communicator = ApplicationCommunicator(application, scope)
  275. await communicator.send_input({"type": "http.request"})
  276. msg = "Django can only handle ASGI/HTTP connections, not other."
  277. with self.assertRaisesMessage(ValueError, msg):
  278. await communicator.receive_output()
  279. async def test_non_unicode_query_string(self):
  280. application = get_asgi_application()
  281. scope = self.async_request_factory._base_scope(path="/", query_string=b"\xff")
  282. communicator = ApplicationCommunicator(application, scope)
  283. await communicator.send_input({"type": "http.request"})
  284. response_start = await communicator.receive_output()
  285. self.assertEqual(response_start["type"], "http.response.start")
  286. self.assertEqual(response_start["status"], 400)
  287. response_body = await communicator.receive_output()
  288. self.assertEqual(response_body["type"], "http.response.body")
  289. self.assertEqual(response_body["body"], b"")
  290. async def test_request_lifecycle_signals_dispatched_with_thread_sensitive(self):
  291. class SignalHandler:
  292. """Track threads handler is dispatched on."""
  293. threads = []
  294. def __call__(self, **kwargs):
  295. self.threads.append(threading.current_thread())
  296. signal_handler = SignalHandler()
  297. request_started.connect(signal_handler)
  298. request_finished.connect(signal_handler)
  299. # Perform a basic request.
  300. application = get_asgi_application()
  301. scope = self.async_request_factory._base_scope(path="/")
  302. communicator = ApplicationCommunicator(application, scope)
  303. await communicator.send_input({"type": "http.request"})
  304. response_start = await communicator.receive_output()
  305. self.assertEqual(response_start["type"], "http.response.start")
  306. self.assertEqual(response_start["status"], 200)
  307. response_body = await communicator.receive_output()
  308. self.assertEqual(response_body["type"], "http.response.body")
  309. self.assertEqual(response_body["body"], b"Hello World!")
  310. # Give response.close() time to finish.
  311. await communicator.wait()
  312. # AsyncToSync should have executed the signals in the same thread.
  313. request_started_thread, request_finished_thread = signal_handler.threads
  314. self.assertEqual(request_started_thread, request_finished_thread)
  315. request_started.disconnect(signal_handler)
  316. request_finished.disconnect(signal_handler)
  317. async def test_concurrent_async_uses_multiple_thread_pools(self):
  318. sync_waiter.active_threads.clear()
  319. # Send 2 requests concurrently
  320. application = get_asgi_application()
  321. scope = self.async_request_factory._base_scope(path="/wait/")
  322. communicators = []
  323. for _ in range(2):
  324. communicators.append(ApplicationCommunicator(application, scope))
  325. await communicators[-1].send_input({"type": "http.request"})
  326. # Each request must complete with a status code of 200
  327. # If requests aren't scheduled concurrently, the barrier in the
  328. # sync_wait view will time out, resulting in a 500 status code.
  329. for communicator in communicators:
  330. response_start = await communicator.receive_output()
  331. self.assertEqual(response_start["type"], "http.response.start")
  332. self.assertEqual(response_start["status"], 200)
  333. response_body = await communicator.receive_output()
  334. self.assertEqual(response_body["type"], "http.response.body")
  335. self.assertEqual(response_body["body"], b"Hello World!")
  336. # Give response.close() time to finish.
  337. await communicator.wait()
  338. # The requests should have scheduled on different threads. Note
  339. # active_threads is a set (a thread can only appear once), therefore
  340. # length is a sufficient check.
  341. self.assertEqual(len(sync_waiter.active_threads), 2)
  342. sync_waiter.active_threads.clear()
  343. async def test_asyncio_cancel_error(self):
  344. # Flag to check if the view was cancelled.
  345. view_did_cancel = False
  346. # A view that will listen for the cancelled error.
  347. async def view(request):
  348. nonlocal view_did_cancel
  349. try:
  350. await asyncio.sleep(0.2)
  351. return HttpResponse("Hello World!")
  352. except asyncio.CancelledError:
  353. # Set the flag.
  354. view_did_cancel = True
  355. raise
  356. # Request class to use the view.
  357. class TestASGIRequest(ASGIRequest):
  358. urlconf = (path("cancel/", view),)
  359. # Handler to use request class.
  360. class TestASGIHandler(ASGIHandler):
  361. request_class = TestASGIRequest
  362. # Request cycle should complete since no disconnect was sent.
  363. application = TestASGIHandler()
  364. scope = self.async_request_factory._base_scope(path="/cancel/")
  365. communicator = ApplicationCommunicator(application, scope)
  366. await communicator.send_input({"type": "http.request"})
  367. response_start = await communicator.receive_output()
  368. self.assertEqual(response_start["type"], "http.response.start")
  369. self.assertEqual(response_start["status"], 200)
  370. response_body = await communicator.receive_output()
  371. self.assertEqual(response_body["type"], "http.response.body")
  372. self.assertEqual(response_body["body"], b"Hello World!")
  373. # Give response.close() time to finish.
  374. await communicator.wait()
  375. self.assertIs(view_did_cancel, False)
  376. # Request cycle with a disconnect before the view can respond.
  377. application = TestASGIHandler()
  378. scope = self.async_request_factory._base_scope(path="/cancel/")
  379. communicator = ApplicationCommunicator(application, scope)
  380. await communicator.send_input({"type": "http.request"})
  381. # Let the view actually start.
  382. await asyncio.sleep(0.1)
  383. # Disconnect the client.
  384. await communicator.send_input({"type": "http.disconnect"})
  385. # The handler should not send a response.
  386. with self.assertRaises(asyncio.TimeoutError):
  387. await communicator.receive_output()
  388. await communicator.wait()
  389. self.assertIs(view_did_cancel, True)
  390. async def test_asyncio_streaming_cancel_error(self):
  391. # Similar to test_asyncio_cancel_error(), but during a streaming
  392. # response.
  393. view_did_cancel = False
  394. async def streaming_response():
  395. nonlocal view_did_cancel
  396. try:
  397. await asyncio.sleep(0.2)
  398. yield b"Hello World!"
  399. except asyncio.CancelledError:
  400. # Set the flag.
  401. view_did_cancel = True
  402. raise
  403. async def view(request):
  404. return StreamingHttpResponse(streaming_response())
  405. class TestASGIRequest(ASGIRequest):
  406. urlconf = (path("cancel/", view),)
  407. class TestASGIHandler(ASGIHandler):
  408. request_class = TestASGIRequest
  409. # With no disconnect, the request cycle should complete in the same
  410. # manner as the non-streaming response.
  411. application = TestASGIHandler()
  412. scope = self.async_request_factory._base_scope(path="/cancel/")
  413. communicator = ApplicationCommunicator(application, scope)
  414. await communicator.send_input({"type": "http.request"})
  415. response_start = await communicator.receive_output()
  416. self.assertEqual(response_start["type"], "http.response.start")
  417. self.assertEqual(response_start["status"], 200)
  418. response_body = await communicator.receive_output()
  419. self.assertEqual(response_body["type"], "http.response.body")
  420. self.assertEqual(response_body["body"], b"Hello World!")
  421. await communicator.wait()
  422. self.assertIs(view_did_cancel, False)
  423. # Request cycle with a disconnect.
  424. application = TestASGIHandler()
  425. scope = self.async_request_factory._base_scope(path="/cancel/")
  426. communicator = ApplicationCommunicator(application, scope)
  427. await communicator.send_input({"type": "http.request"})
  428. response_start = await communicator.receive_output()
  429. # Fetch the start of response so streaming can begin
  430. self.assertEqual(response_start["type"], "http.response.start")
  431. self.assertEqual(response_start["status"], 200)
  432. await asyncio.sleep(0.1)
  433. # Now disconnect the client.
  434. await communicator.send_input({"type": "http.disconnect"})
  435. # This time the handler should not send a response.
  436. with self.assertRaises(asyncio.TimeoutError):
  437. await communicator.receive_output()
  438. await communicator.wait()
  439. self.assertIs(view_did_cancel, True)
  440. async def test_streaming(self):
  441. scope = self.async_request_factory._base_scope(
  442. path="/streaming/", query_string=b"sleep=0.001"
  443. )
  444. application = get_asgi_application()
  445. communicator = ApplicationCommunicator(application, scope)
  446. await communicator.send_input({"type": "http.request"})
  447. # Fetch http.response.start.
  448. await communicator.receive_output(timeout=1)
  449. # Fetch the 'first' and 'last'.
  450. first_response = await communicator.receive_output(timeout=1)
  451. self.assertEqual(first_response["body"], b"first\n")
  452. second_response = await communicator.receive_output(timeout=1)
  453. self.assertEqual(second_response["body"], b"last\n")
  454. # Fetch the rest of the response so that coroutines are cleaned up.
  455. await communicator.receive_output(timeout=1)
  456. with self.assertRaises(asyncio.TimeoutError):
  457. await communicator.receive_output(timeout=1)
  458. async def test_streaming_disconnect(self):
  459. scope = self.async_request_factory._base_scope(
  460. path="/streaming/", query_string=b"sleep=0.1"
  461. )
  462. application = get_asgi_application()
  463. communicator = ApplicationCommunicator(application, scope)
  464. await communicator.send_input({"type": "http.request"})
  465. await communicator.receive_output(timeout=1)
  466. first_response = await communicator.receive_output(timeout=1)
  467. self.assertEqual(first_response["body"], b"first\n")
  468. # Disconnect the client.
  469. await communicator.send_input({"type": "http.disconnect"})
  470. # 'last\n' isn't sent.
  471. with self.assertRaises(asyncio.TimeoutError):
  472. await communicator.receive_output(timeout=0.2)