123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- import asyncio
- import sys
- import threading
- from pathlib import Path
- from unittest import skipIf
- from asgiref.sync import SyncToAsync
- from asgiref.testing import ApplicationCommunicator
- from django.contrib.staticfiles.handlers import ASGIStaticFilesHandler
- from django.core.asgi import get_asgi_application
- from django.core.signals import request_finished, request_started
- from django.db import close_old_connections
- from django.test import (
- AsyncRequestFactory, SimpleTestCase, modify_settings, override_settings,
- )
- from django.utils.http import http_date
- from .urls import test_filename
- TEST_STATIC_ROOT = Path(__file__).parent / 'project' / 'static'
- @skipIf(sys.platform == 'win32' and (3, 8, 0) < sys.version_info < (3, 8, 1), 'https://bugs.python.org/issue38563')
- @override_settings(ROOT_URLCONF='asgi.urls')
- class ASGITest(SimpleTestCase):
- async_request_factory = AsyncRequestFactory()
- def setUp(self):
- request_started.disconnect(close_old_connections)
- def tearDown(self):
- request_started.connect(close_old_connections)
- async def test_get_asgi_application(self):
- """
- get_asgi_application() returns a functioning ASGI callable.
- """
- application = get_asgi_application()
- # Construct HTTP request.
- scope = self.async_request_factory._base_scope(path='/')
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.request'})
- # Read the response.
- response_start = await communicator.receive_output()
- self.assertEqual(response_start['type'], 'http.response.start')
- self.assertEqual(response_start['status'], 200)
- self.assertEqual(
- set(response_start['headers']),
- {
- (b'Content-Length', b'12'),
- (b'Content-Type', b'text/html; charset=utf-8'),
- },
- )
- response_body = await communicator.receive_output()
- self.assertEqual(response_body['type'], 'http.response.body')
- self.assertEqual(response_body['body'], b'Hello World!')
- async def test_file_response(self):
- """
- Makes sure that FileResponse works over ASGI.
- """
- application = get_asgi_application()
- # Construct HTTP request.
- scope = self.async_request_factory._base_scope(path='/file/')
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.request'})
- # Get the file content.
- with open(test_filename, 'rb') as test_file:
- test_file_contents = test_file.read()
- # Read the response.
- response_start = await communicator.receive_output()
- self.assertEqual(response_start['type'], 'http.response.start')
- self.assertEqual(response_start['status'], 200)
- headers = response_start['headers']
- self.assertEqual(len(headers), 3)
- expected_headers = {
- b'Content-Length': str(len(test_file_contents)).encode('ascii'),
- b'Content-Type': b'text/x-python',
- b'Content-Disposition': b'inline; filename="urls.py"',
- }
- for key, value in headers:
- try:
- self.assertEqual(value, expected_headers[key])
- except AssertionError:
- # Windows registry may not be configured with correct
- # mimetypes.
- if sys.platform == 'win32' and key == b'Content-Type':
- self.assertEqual(value, b'text/plain')
- else:
- raise
- response_body = await communicator.receive_output()
- self.assertEqual(response_body['type'], 'http.response.body')
- self.assertEqual(response_body['body'], test_file_contents)
- # Allow response.close() to finish.
- await communicator.wait()
- @modify_settings(INSTALLED_APPS={'append': 'django.contrib.staticfiles'})
- @override_settings(
- STATIC_URL='static/',
- STATIC_ROOT=TEST_STATIC_ROOT,
- STATICFILES_DIRS=[TEST_STATIC_ROOT],
- STATICFILES_FINDERS=[
- 'django.contrib.staticfiles.finders.FileSystemFinder',
- ],
- )
- async def test_static_file_response(self):
- application = ASGIStaticFilesHandler(get_asgi_application())
- # Construct HTTP request.
- scope = self.async_request_factory._base_scope(path='/static/file.txt')
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.request'})
- # Get the file content.
- file_path = TEST_STATIC_ROOT / 'file.txt'
- with open(file_path, 'rb') as test_file:
- test_file_contents = test_file.read()
- # Read the response.
- stat = file_path.stat()
- response_start = await communicator.receive_output()
- self.assertEqual(response_start['type'], 'http.response.start')
- self.assertEqual(response_start['status'], 200)
- self.assertEqual(
- set(response_start['headers']),
- {
- (b'Content-Length', str(len(test_file_contents)).encode('ascii')),
- (b'Content-Type', b'text/plain'),
- (b'Content-Disposition', b'inline; filename="file.txt"'),
- (b'Last-Modified', http_date(stat.st_mtime).encode('ascii')),
- },
- )
- response_body = await communicator.receive_output()
- self.assertEqual(response_body['type'], 'http.response.body')
- self.assertEqual(response_body['body'], test_file_contents)
- # Allow response.close() to finish.
- await communicator.wait()
- async def test_headers(self):
- application = get_asgi_application()
- communicator = ApplicationCommunicator(
- application,
- self.async_request_factory._base_scope(
- path='/meta/',
- headers=[
- [b'content-type', b'text/plain; charset=utf-8'],
- [b'content-length', b'77'],
- [b'referer', b'Scotland'],
- [b'referer', b'Wales'],
- ],
- ),
- )
- await communicator.send_input({'type': 'http.request'})
- response_start = await communicator.receive_output()
- self.assertEqual(response_start['type'], 'http.response.start')
- self.assertEqual(response_start['status'], 200)
- self.assertEqual(
- set(response_start['headers']),
- {
- (b'Content-Length', b'19'),
- (b'Content-Type', b'text/plain; charset=utf-8'),
- },
- )
- response_body = await communicator.receive_output()
- self.assertEqual(response_body['type'], 'http.response.body')
- self.assertEqual(response_body['body'], b'From Scotland,Wales')
- async def test_get_query_string(self):
- application = get_asgi_application()
- for query_string in (b'name=Andrew', 'name=Andrew'):
- with self.subTest(query_string=query_string):
- scope = self.async_request_factory._base_scope(
- path='/',
- query_string=query_string,
- )
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.request'})
- response_start = await communicator.receive_output()
- self.assertEqual(response_start['type'], 'http.response.start')
- self.assertEqual(response_start['status'], 200)
- response_body = await communicator.receive_output()
- self.assertEqual(response_body['type'], 'http.response.body')
- self.assertEqual(response_body['body'], b'Hello Andrew!')
- async def test_disconnect(self):
- application = get_asgi_application()
- scope = self.async_request_factory._base_scope(path='/')
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.disconnect'})
- with self.assertRaises(asyncio.TimeoutError):
- await communicator.receive_output()
- async def test_wrong_connection_type(self):
- application = get_asgi_application()
- scope = self.async_request_factory._base_scope(path='/', type='other')
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.request'})
- msg = 'Django can only handle ASGI/HTTP connections, not other.'
- with self.assertRaisesMessage(ValueError, msg):
- await communicator.receive_output()
- async def test_non_unicode_query_string(self):
- application = get_asgi_application()
- scope = self.async_request_factory._base_scope(path='/', query_string=b'\xff')
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.request'})
- response_start = await communicator.receive_output()
- self.assertEqual(response_start['type'], 'http.response.start')
- self.assertEqual(response_start['status'], 400)
- response_body = await communicator.receive_output()
- self.assertEqual(response_body['type'], 'http.response.body')
- self.assertEqual(response_body['body'], b'')
- async def test_request_lifecycle_signals_dispatched_with_thread_sensitive(self):
- class SignalHandler:
- """Track threads handler is dispatched on."""
- threads = []
- def __call__(self, **kwargs):
- self.threads.append(threading.current_thread())
- signal_handler = SignalHandler()
- request_started.connect(signal_handler)
- request_finished.connect(signal_handler)
- # Perform a basic request.
- application = get_asgi_application()
- scope = self.async_request_factory._base_scope(path='/')
- communicator = ApplicationCommunicator(application, scope)
- await communicator.send_input({'type': 'http.request'})
- response_start = await communicator.receive_output()
- self.assertEqual(response_start['type'], 'http.response.start')
- self.assertEqual(response_start['status'], 200)
- response_body = await communicator.receive_output()
- self.assertEqual(response_body['type'], 'http.response.body')
- self.assertEqual(response_body['body'], b'Hello World!')
- # Give response.close() time to finish.
- await communicator.wait()
- # At this point, AsyncToSync does not have a current executor. Thus
- # SyncToAsync falls-back to .single_thread_executor.
- target_thread = next(iter(SyncToAsync.single_thread_executor._threads))
- request_started_thread, request_finished_thread = signal_handler.threads
- self.assertEqual(request_started_thread, target_thread)
- self.assertEqual(request_finished_thread, target_thread)
- request_started.disconnect(signal_handler)
- request_finished.disconnect(signal_handler)
|