diff options
| author | Sam Toyer <sam@qxcv.net> | 2023-07-29 01:43:15 -0700 |
|---|---|---|
| committer | Mariusz Felisiak <felisiak.mariusz@gmail.com> | 2023-09-11 19:53:21 +0200 |
| commit | 64cea1e48f285ea2162c669208d95188b32bbc82 (patch) | |
| tree | 490e610188dabdd2fe9b2c9e7d36d51f11002468 /tests/asgi | |
| parent | a7c73b944f51d6c92ec876fd7e0a171e7c01657d (diff) | |
Fixed #34752 -- Fixed handling ASGI http.disconnect for streaming responses.
Diffstat (limited to 'tests/asgi')
| -rw-r--r-- | tests/asgi/tests.py | 121 | ||||
| -rw-r--r-- | tests/asgi/urls.py | 15 |
2 files changed, 133 insertions, 3 deletions
diff --git a/tests/asgi/tests.py b/tests/asgi/tests.py index 0222b5356e..ced24c658e 100644 --- a/tests/asgi/tests.py +++ b/tests/asgi/tests.py @@ -10,7 +10,7 @@ from django.core.asgi import get_asgi_application from django.core.handlers.asgi import ASGIHandler, ASGIRequest from django.core.signals import request_finished, request_started from django.db import close_old_connections -from django.http import HttpResponse +from django.http import HttpResponse, StreamingHttpResponse from django.test import ( AsyncRequestFactory, SimpleTestCase, @@ -237,6 +237,31 @@ class ASGITest(SimpleTestCase): with self.assertRaises(asyncio.TimeoutError): await communicator.receive_output() + async def test_disconnect_both_return(self): + # Force both the disconnect listener and the task that sends the + # response to finish at the same time. + application = get_asgi_application() + scope = self.async_request_factory._base_scope(path="/") + communicator = ApplicationCommunicator(application, scope) + await communicator.send_input({"type": "http.request", "body": b"some body"}) + # Fetch response headers (this yields to asyncio and causes + # ASGHandler.send_response() to dump the body of the response in the + # queue). + await communicator.receive_output() + # Fetch response body (there's already some data queued up, so this + # doesn't actually yield to the event loop, it just succeeds + # instantly). + await communicator.receive_output() + # Send disconnect at the same time that response finishes (this just + # puts some info in a queue, it doesn't have to yield to the event + # loop). + await communicator.send_input({"type": "http.disconnect"}) + # Waiting for the communicator _does_ yield to the event loop, since + # ASGIHandler.send_response() is still waiting to do response.close(). + # It so happens that there are enough remaining yield points in both + # tasks that they both finish while the loop is running. + await communicator.wait() + async def test_disconnect_with_body(self): application = get_asgi_application() scope = self.async_request_factory._base_scope(path="/") @@ -254,7 +279,7 @@ class ASGITest(SimpleTestCase): await communicator.send_input({"type": "http.not_a_real_message"}) msg = "Invalid ASGI message after request body: http.not_a_real_message" with self.assertRaisesMessage(AssertionError, msg): - await communicator.receive_output() + await communicator.wait() async def test_delayed_disconnect_with_body(self): application = get_asgi_application() @@ -402,3 +427,95 @@ class ASGITest(SimpleTestCase): await communicator.receive_output() await communicator.wait() self.assertIs(view_did_cancel, True) + + async def test_asyncio_streaming_cancel_error(self): + # Similar to test_asyncio_cancel_error(), but during a streaming + # response. + view_did_cancel = False + + async def streaming_response(): + nonlocal view_did_cancel + try: + await asyncio.sleep(0.2) + yield b"Hello World!" + except asyncio.CancelledError: + # Set the flag. + view_did_cancel = True + raise + + async def view(request): + return StreamingHttpResponse(streaming_response()) + + class TestASGIRequest(ASGIRequest): + urlconf = (path("cancel/", view),) + + class TestASGIHandler(ASGIHandler): + request_class = TestASGIRequest + + # With no disconnect, the request cycle should complete in the same + # manner as the non-streaming response. + application = TestASGIHandler() + scope = self.async_request_factory._base_scope(path="/cancel/") + communicator = ApplicationCommunicator(application, scope) + await communicator.send_input({"type": "http.request"}) + response_start = await communicator.receive_output() + self.assertEqual(response_start["type"], "http.response.start") + self.assertEqual(response_start["status"], 200) + response_body = await communicator.receive_output() + self.assertEqual(response_body["type"], "http.response.body") + self.assertEqual(response_body["body"], b"Hello World!") + await communicator.wait() + self.assertIs(view_did_cancel, False) + + # Request cycle with a disconnect. + application = TestASGIHandler() + scope = self.async_request_factory._base_scope(path="/cancel/") + communicator = ApplicationCommunicator(application, scope) + await communicator.send_input({"type": "http.request"}) + response_start = await communicator.receive_output() + # Fetch the start of response so streaming can begin + self.assertEqual(response_start["type"], "http.response.start") + self.assertEqual(response_start["status"], 200) + await asyncio.sleep(0.1) + # Now disconnect the client. + await communicator.send_input({"type": "http.disconnect"}) + # This time the handler should not send a response. + with self.assertRaises(asyncio.TimeoutError): + await communicator.receive_output() + await communicator.wait() + self.assertIs(view_did_cancel, True) + + async def test_streaming(self): + scope = self.async_request_factory._base_scope( + path="/streaming/", query_string=b"sleep=0.001" + ) + application = get_asgi_application() + communicator = ApplicationCommunicator(application, scope) + await communicator.send_input({"type": "http.request"}) + # Fetch http.response.start. + await communicator.receive_output(timeout=1) + # Fetch the 'first' and 'last'. + first_response = await communicator.receive_output(timeout=1) + self.assertEqual(first_response["body"], b"first\n") + second_response = await communicator.receive_output(timeout=1) + self.assertEqual(second_response["body"], b"last\n") + # Fetch the rest of the response so that coroutines are cleaned up. + await communicator.receive_output(timeout=1) + with self.assertRaises(asyncio.TimeoutError): + await communicator.receive_output(timeout=1) + + async def test_streaming_disconnect(self): + scope = self.async_request_factory._base_scope( + path="/streaming/", query_string=b"sleep=0.1" + ) + application = get_asgi_application() + communicator = ApplicationCommunicator(application, scope) + await communicator.send_input({"type": "http.request"}) + await communicator.receive_output(timeout=1) + first_response = await communicator.receive_output(timeout=1) + self.assertEqual(first_response["body"], b"first\n") + # Disconnect the client. + await communicator.send_input({"type": "http.disconnect"}) + # 'last\n' isn't sent. + with self.assertRaises(asyncio.TimeoutError): + await communicator.receive_output(timeout=0.2) diff --git a/tests/asgi/urls.py b/tests/asgi/urls.py index 0f74fc9b97..931b7d5206 100644 --- a/tests/asgi/urls.py +++ b/tests/asgi/urls.py @@ -1,7 +1,8 @@ +import asyncio import threading import time -from django.http import FileResponse, HttpResponse +from django.http import FileResponse, HttpResponse, StreamingHttpResponse from django.urls import path from django.views.decorators.csrf import csrf_exempt @@ -44,6 +45,17 @@ sync_waiter.lock = threading.Lock() sync_waiter.barrier = threading.Barrier(2) +async def streaming_inner(sleep_time): + yield b"first\n" + await asyncio.sleep(sleep_time) + yield b"last\n" + + +async def streaming_view(request): + sleep_time = float(request.GET["sleep"]) + return StreamingHttpResponse(streaming_inner(sleep_time)) + + test_filename = __file__ @@ -54,4 +66,5 @@ urlpatterns = [ path("post/", post_echo), path("wait/", sync_waiter), path("delayed_hello/", hello_with_delay), + path("streaming/", streaming_view), ] |
