Browse Source

Fixed #33735 -- Added async support to StreamingHttpResponse.

Thanks to Florian Vazelle for initial exploratory work, and to Nick
Pope and Mariusz Felisiak for review.
Carlton Gibson 2 years ago
parent
commit
0bd2c0c901

+ 17 - 13
django/core/handlers/asgi.py

@@ -19,6 +19,7 @@ from django.http import (
     parse_cookie,
 )
 from django.urls import set_script_prefix
+from django.utils.asyncio import aclosing
 from django.utils.functional import cached_property
 
 logger = logging.getLogger("django.request")
@@ -263,19 +264,22 @@ class ASGIHandler(base.BaseHandler):
         )
         # Streaming responses need to be pinned to their iterator.
         if response.streaming:
-            # Access `__iter__` and not `streaming_content` directly in case
-            # it has been overridden in a subclass.
-            for part in response:
-                for chunk, _ in self.chunk_bytes(part):
-                    await send(
-                        {
-                            "type": "http.response.body",
-                            "body": chunk,
-                            # Ignore "more" as there may be more parts; instead,
-                            # use an empty final closing message with False.
-                            "more_body": True,
-                        }
-                    )
+            # - Consume via `__aiter__` and not `streaming_content` directly, to
+            #   allow mapping of a sync iterator.
+            # - Use aclosing() when consuming aiter.
+            #   See https://github.com/python/cpython/commit/6e8dcda
+            async with aclosing(response.__aiter__()) as content:
+                async for part in content:
+                    for chunk, _ in self.chunk_bytes(part):
+                        await send(
+                            {
+                                "type": "http.response.body",
+                                "body": chunk,
+                                # Ignore "more" as there may be more parts; instead,
+                                # use an empty final closing message with False.
+                                "more_body": True,
+                            }
+                        )
             # Final closing message.
             await send({"type": "http.response.body"})
         # Other responses just need chunking.

+ 53 - 3
django/http/response.py

@@ -6,10 +6,13 @@ import os
 import re
 import sys
 import time
+import warnings
 from email.header import Header
 from http.client import responses
 from urllib.parse import urlparse
 
+from asgiref.sync import async_to_sync, sync_to_async
+
 from django.conf import settings
 from django.core import signals, signing
 from django.core.exceptions import DisallowedRedirect
@@ -476,7 +479,18 @@ class StreamingHttpResponse(HttpResponseBase):
 
     @property
     def streaming_content(self):
-        return map(self.make_bytes, self._iterator)
+        if self.is_async:
+            # pull to lexical scope to capture fixed reference in case
+            # streaming_content is set again later.
+            _iterator = self._iterator
+
+            async def awrapper():
+                async for part in _iterator:
+                    yield self.make_bytes(part)
+
+            return awrapper()
+        else:
+            return map(self.make_bytes, self._iterator)
 
     @streaming_content.setter
     def streaming_content(self, value):
@@ -484,12 +498,48 @@ class StreamingHttpResponse(HttpResponseBase):
 
     def _set_streaming_content(self, value):
         # Ensure we can never iterate on "value" more than once.
-        self._iterator = iter(value)
+        try:
+            self._iterator = iter(value)
+            self.is_async = False
+        except TypeError:
+            self._iterator = value.__aiter__()
+            self.is_async = True
         if hasattr(value, "close"):
             self._resource_closers.append(value.close)
 
     def __iter__(self):
-        return self.streaming_content
+        try:
+            return iter(self.streaming_content)
+        except TypeError:
+            warnings.warn(
+                "StreamingHttpResponse must consume asynchronous iterators in order to "
+                "serve them synchronously. Use a synchronous iterator instead.",
+                Warning,
+            )
+
+            # async iterator. Consume in async_to_sync and map back.
+            async def to_list(_iterator):
+                as_list = []
+                async for chunk in _iterator:
+                    as_list.append(chunk)
+                return as_list
+
+            return map(self.make_bytes, iter(async_to_sync(to_list)(self._iterator)))
+
+    async def __aiter__(self):
+        try:
+            async for part in self.streaming_content:
+                yield part
+        except TypeError:
+            warnings.warn(
+                "StreamingHttpResponse must consume synchronous iterators in order to "
+                "serve them asynchronously. Use an asynchronous iterator instead.",
+                Warning,
+            )
+            # sync iterator. Consume via sync_to_async and yield via async
+            # generator.
+            for part in await sync_to_async(list)(self.streaming_content):
+                yield part
 
     def getvalue(self):
         return b"".join(self.streaming_content)

+ 18 - 4
django/middleware/gzip.py

@@ -31,12 +31,26 @@ class GZipMiddleware(MiddlewareMixin):
             return response
 
         if response.streaming:
+            if response.is_async:
+                # pull to lexical scope to capture fixed reference in case
+                # streaming_content is set again later.
+                orignal_iterator = response.streaming_content
+
+                async def gzip_wrapper():
+                    async for chunk in orignal_iterator:
+                        yield compress_string(
+                            chunk,
+                            max_random_bytes=self.max_random_bytes,
+                        )
+
+                response.streaming_content = gzip_wrapper()
+            else:
+                response.streaming_content = compress_sequence(
+                    response.streaming_content,
+                    max_random_bytes=self.max_random_bytes,
+                )
             # Delete the `Content-Length` header for streaming content, because
             # we won't know the compressed size until we stream it.
-            response.streaming_content = compress_sequence(
-                response.streaming_content,
-                max_random_bytes=self.max_random_bytes,
-            )
             del response.headers["Content-Length"]
         else:
             # Return the compressed content only if it's actually shorter.

+ 25 - 0
django/utils/asyncio.py

@@ -37,3 +37,28 @@ def async_unsafe(message):
         return decorator(func)
     else:
         return decorator
+
+
+try:
+    from contextlib import aclosing
+except ImportError:
+    # TODO: Remove when dropping support for PY39.
+    from contextlib import AbstractAsyncContextManager
+
+    # Backport of contextlib.aclosing() from Python 3.10. Copyright (C) Python
+    # Software Foundation (see LICENSE.python).
+    class aclosing(AbstractAsyncContextManager):
+        """
+        Async context manager for safely finalizing an asynchronously
+        cleaned-up resource such as an async generator, calling its
+        ``aclose()`` method.
+        """
+
+        def __init__(self, thing):
+            self.thing = thing
+
+        async def __aenter__(self):
+            return self.thing
+
+        async def __aexit__(self, *exc_info):
+            await self.thing.aclose()

+ 70 - 18
docs/ref/request-response.txt

@@ -1116,43 +1116,76 @@ parameter to the constructor method::
 .. class:: StreamingHttpResponse
 
 The :class:`StreamingHttpResponse` class is used to stream a response from
-Django to the browser. You might want to do this if generating the response
-takes too long or uses too much memory. For instance, it's useful for
-:ref:`generating large CSV files <streaming-csv-files>`.
+Django to the browser.
 
-.. admonition:: Performance considerations
+.. admonition:: Advanced usage
 
-    Django is designed for short-lived requests. Streaming responses will tie
-    a worker process for the entire duration of the response. This may result
-    in poor performance.
+    :class:`StreamingHttpResponse` is somewhat advanced, in that it is
+    important to know whether you'll be serving your application synchronously
+    under WSGI or asynchronously under ASGI, and adjust your usage
+    appropriately.
 
-    Generally speaking, you should perform expensive tasks outside of the
-    request-response cycle, rather than resorting to a streamed response.
+    Please read these notes with care.
+
+An example usage of :class:`StreamingHttpResponse` under WSGI is streaming
+content when generating the response would take too long or uses too much
+memory. For instance, it's useful for :ref:`generating large CSV files
+<streaming-csv-files>`.
+
+There are performance considerations when doing this, though. Django, under
+WSGI, is designed for short-lived requests. Streaming responses will tie a
+worker process for the entire duration of the response. This may result in poor
+performance.
+
+Generally speaking, you would perform expensive tasks outside of the
+request-response cycle, rather than resorting to a streamed response.
+
+When serving under ASGI, however, a :class:`StreamingHttpResponse` need not
+stop other requests from being served whilst waiting for I/O. This opens up
+the possibility of long-lived requests for streaming content and implementing
+patterns such as long-polling, and server-sent events.
+
+Even under ASGI note, :class:`StreamingHttpResponse` should only be used in
+situations where it is absolutely required that the whole content isn't
+iterated before transferring the data to the client. Because the content can't
+be accessed, many middleware can't function normally. For example the ``ETag``
+and ``Content-Length`` headers can't be generated for streaming responses.
 
 The :class:`StreamingHttpResponse` is not a subclass of :class:`HttpResponse`,
 because it features a slightly different API. However, it is almost identical,
 with the following notable differences:
 
-* It should be given an iterator that yields bytestrings as content.
+* It should be given an iterator that yields bytestrings as content. When
+  serving under WSGI, this should be a sync iterator. When serving under ASGI,
+  this is should an async iterator.
 
 * You cannot access its content, except by iterating the response object
-  itself. This should only occur when the response is returned to the client.
+  itself. This should only occur when the response is returned to the client:
+  you should not iterate the response yourself.
+
+  Under WSGI the response will be iterated synchronously. Under ASGI the
+  response will be iterated asynchronously. (This is why the iterator type must
+  match the protocol you're using.)
+
+  To avoid a crash, an incorrect iterator type will be mapped to the correct
+  type during iteration, and a warning will be raised, but in order to do this
+  the iterator must be fully-consumed, which defeats the purpose of using a
+  :class:`StreamingHttpResponse` at all.
 
 * It has no ``content`` attribute. Instead, it has a
-  :attr:`~StreamingHttpResponse.streaming_content` attribute.
+  :attr:`~StreamingHttpResponse.streaming_content` attribute. This can be used
+  in middleware to wrap the response iterable, but should not be consumed.
 
 * You cannot use the file-like object ``tell()`` or ``write()`` methods.
   Doing so will raise an exception.
 
-:class:`StreamingHttpResponse` should only be used in situations where it is
-absolutely required that the whole content isn't iterated before transferring
-the data to the client. Because the content can't be accessed, many
-middleware can't function normally. For example the ``ETag`` and
-``Content-Length`` headers can't be generated for streaming responses.
-
 The :class:`HttpResponseBase` base class is common between
 :class:`HttpResponse` and :class:`StreamingHttpResponse`.
 
+.. versionchanged:: 4.2
+
+    Support for asynchronous iteration was added.
+
 Attributes
 ----------
 
@@ -1181,6 +1214,16 @@ Attributes
 
     This is always ``True``.
 
+.. attribute:: StreamingHttpResponse.is_async
+
+    .. versionadded:: 4.2
+
+    Boolean indicating whether :attr:`StreamingHttpResponse.streaming_content`
+    is an asynchronous iterator or not.
+
+    This is useful for middleware needing to wrap
+    :attr:`StreamingHttpResponse.streaming_content`.
+
 ``FileResponse`` objects
 ========================
 
@@ -1213,6 +1256,15 @@ a file open in binary mode like so::
 
 The file will be closed automatically, so don't open it with a context manager.
 
+.. admonition:: Use under ASGI
+
+    Python's file API is synchronous. This means that the file must be fully
+    consumed in order to be served under ASGI.
+
+    In order to stream a file asynchronously you need to use a third-party
+    package that provides an asynchronous file API, such as `aiofiles
+    <https://github.com/Tinche/aiofiles>`_.
+
 Methods
 -------
 

+ 2 - 1
docs/releases/4.2.txt

@@ -286,7 +286,8 @@ Models
 Requests and Responses
 ~~~~~~~~~~~~~~~~~~~~~~
 
-* ...
+* :class:`~django.http.StreamingHttpResponse` now supports async iterators
+  when Django is served via ASGI.
 
 Security
 ~~~~~~~~

+ 10 - 0
docs/topics/http/middleware.txt

@@ -267,6 +267,16 @@ must test for streaming responses and adjust their behavior accordingly::
             for chunk in content:
                 yield alter_content(chunk)
 
+:class:`~django.http.StreamingHttpResponse` allows both synchronous and
+asynchronous iterators. The wrapping function must match. Check
+:attr:`StreamingHttpResponse.is_async
+<django.http.StreamingHttpResponse.is_async>` if your middleware needs to
+support both types of iterator.
+
+..  versionchanged:: 4.2
+
+    Support for streaming responses with asynchronous iterators was added.
+
 Exception handling
 ==================
 

+ 11 - 0
tests/asgi/tests.py

@@ -12,6 +12,7 @@ from django.db import close_old_connections
 from django.test import (
     AsyncRequestFactory,
     SimpleTestCase,
+    ignore_warnings,
     modify_settings,
     override_settings,
 )
@@ -58,6 +59,13 @@ class ASGITest(SimpleTestCase):
         # Allow response.close() to finish.
         await communicator.wait()
 
+    # Python's file API is not async compatible. A third-party library such
+    # as https://github.com/Tinche/aiofiles allows passing the file to
+    # FileResponse as an async interator. With a sync iterator
+    # StreamingHTTPResponse triggers a warning when iterating the file.
+    # assertWarnsMessage is not async compatible, so ignore_warnings for the
+    # test.
+    @ignore_warnings(module="django.http.response")
     async def test_file_response(self):
         """
         Makes sure that FileResponse works over ASGI.
@@ -91,6 +99,8 @@ class ASGITest(SimpleTestCase):
                     self.assertEqual(value, b"text/plain")
                 else:
                     raise
+
+        # Warning ignored here.
         response_body = await communicator.receive_output()
         self.assertEqual(response_body["type"], "http.response.body")
         self.assertEqual(response_body["body"], test_file_contents)
@@ -106,6 +116,7 @@ class ASGITest(SimpleTestCase):
             "django.contrib.staticfiles.finders.FileSystemFinder",
         ],
     )
+    @ignore_warnings(module="django.http.response")
     async def test_static_file_response(self):
         application = ASGIStaticFilesHandler(get_asgi_application())
         # Construct HTTP request.

+ 36 - 0
tests/httpwrappers/tests.py

@@ -720,6 +720,42 @@ class StreamingHttpResponseTests(SimpleTestCase):
             '<StreamingHttpResponse status_code=200, "text/html; charset=utf-8">',
         )
 
+    async def test_async_streaming_response(self):
+        async def async_iter():
+            yield b"hello"
+            yield b"world"
+
+        r = StreamingHttpResponse(async_iter())
+
+        chunks = []
+        async for chunk in r:
+            chunks.append(chunk)
+        self.assertEqual(chunks, [b"hello", b"world"])
+
+    def test_async_streaming_response_warning(self):
+        async def async_iter():
+            yield b"hello"
+            yield b"world"
+
+        r = StreamingHttpResponse(async_iter())
+
+        msg = (
+            "StreamingHttpResponse must consume asynchronous iterators in order to "
+            "serve them synchronously. Use a synchronous iterator instead."
+        )
+        with self.assertWarnsMessage(Warning, msg):
+            self.assertEqual(list(r), [b"hello", b"world"])
+
+    async def test_sync_streaming_response_warning(self):
+        r = StreamingHttpResponse(iter(["hello", "world"]))
+
+        msg = (
+            "StreamingHttpResponse must consume synchronous iterators in order to "
+            "serve them asynchronously. Use an asynchronous iterator instead."
+        )
+        with self.assertWarnsMessage(Warning, msg):
+            self.assertEqual(b"hello", await r.__aiter__().__anext__())
+
 
 class FileCloseTests(SimpleTestCase):
     def setUp(self):

+ 22 - 0
tests/middleware/tests.py

@@ -899,6 +899,28 @@ class GZipMiddlewareTest(SimpleTestCase):
         self.assertEqual(r.get("Content-Encoding"), "gzip")
         self.assertFalse(r.has_header("Content-Length"))
 
+    async def test_compress_async_streaming_response(self):
+        """
+        Compression is performed on responses with async streaming content.
+        """
+
+        async def get_stream_response(request):
+            async def iterator():
+                for chunk in self.sequence:
+                    yield chunk
+
+            resp = StreamingHttpResponse(iterator())
+            resp["Content-Type"] = "text/html; charset=UTF-8"
+            return resp
+
+        r = await GZipMiddleware(get_stream_response)(self.req)
+        self.assertEqual(
+            self.decompress(b"".join([chunk async for chunk in r])),
+            b"".join(self.sequence),
+        )
+        self.assertEqual(r.get("Content-Encoding"), "gzip")
+        self.assertFalse(r.has_header("Content-Length"))
+
     def test_compress_streaming_response_unicode(self):
         """
         Compression is performed on responses with streaming Unicode content.