diff options
Diffstat (limited to 'tests/asgi/tests.py')
| -rw-r--r-- | tests/asgi/tests.py | 165 |
1 files changed, 165 insertions, 0 deletions
diff --git a/tests/asgi/tests.py b/tests/asgi/tests.py index 58429477e2..2bb820cae2 100644 --- a/tests/asgi/tests.py +++ b/tests/asgi/tests.py @@ -3,6 +3,7 @@ import sys import tempfile import threading import time +from io import BytesIO from pathlib import Path from unittest.mock import patch @@ -31,6 +32,7 @@ from django.views.decorators.csrf import csrf_exempt from .urls import sync_waiter, test_filename TEST_STATIC_ROOT = Path(__file__).parent / "project" / "static" +TOO_MUCH_DATA_MSG = "Request body exceeded settings.DATA_UPLOAD_MAX_MEMORY_SIZE." class SignalHandler: @@ -800,3 +802,166 @@ class ASGITest(SimpleTestCase): request = ASGIRequest(scope, None) self.assertEqual(request.META["HTTP_COOKIE"], "a=abc; b=def; c=ghi") self.assertEqual(request.COOKIES, {"a": "abc", "b": "def", "c": "ghi"}) + + +class DataUploadMaxMemorySizeASGITests(SimpleTestCase): + + def make_request( + self, + body, + content_type=b"application/octet-stream", + content_length=None, + stream=None, + ): + scope = AsyncRequestFactory()._base_scope(method="POST", path="/") + scope["headers"] = [(b"content-type", content_type)] + if content_length is not None: + scope["headers"].append((b"content-length", str(content_length).encode())) + return ASGIRequest(scope, stream if stream is not None else BytesIO(body)) + + def test_body_size_not_exceeded_without_content_length(self): + body = b"x" * 5 + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=5): + self.assertEqual(self.make_request(body).body, body) + + def test_body_size_exceeded_without_content_length(self): + request = self.make_request(b"x" * 10) + with ( + self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=5), + self.assertRaisesMessage(RequestDataTooBig, TOO_MUCH_DATA_MSG), + ): + request.body + + def test_body_size_check_fires_before_read(self): + # The seekable size check rejects oversized bodies before reading + # them into memory (i.e. before calling self.read()). + class TrackingBytesIO(BytesIO): + calls = [] + + def read(self, *args, **kwargs): + self.calls.append((args, kwargs)) + return super().read(*args, **kwargs) + + stream = TrackingBytesIO(b"x" * 10) + request = self.make_request(b"x" * 10, stream=stream) + with ( + self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=5), + self.assertRaisesMessage(RequestDataTooBig, TOO_MUCH_DATA_MSG), + ): + request.body + + self.assertEqual(stream.calls, []) + + def test_post_size_exceeded_without_content_length(self): + request = self.make_request( + b"a=" + b"x" * 10, + content_type=b"application/x-www-form-urlencoded", + ) + with ( + self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=5), + self.assertRaisesMessage(RequestDataTooBig, TOO_MUCH_DATA_MSG), + ): + request.POST + + def test_no_limit(self): + body = b"x" * 100 + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=None): + self.assertEqual(self.make_request(body).body, body) + + async def test_read_body_no_limit(self): + chunks = [ + {"type": "http.request", "body": b"x" * 100, "more_body": True}, + {"type": "http.request", "body": b"x" * 100, "more_body": False}, + ] + + async def receive(): + return chunks.pop(0) + + handler = ASGIHandler() + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=None): + body_file = await handler.read_body(receive) + self.addCleanup(body_file.close) + + body_file.seek(0) + self.assertEqual(body_file.read(), b"x" * 200) + + def test_non_multipart_body_size_enforced(self): + # DATA_UPLOAD_MAX_MEMORY_SIZE is enforced on non-multipart bodies. + request = self.make_request(b"x" * 100) + with ( + self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=10), + self.assertRaisesMessage(RequestDataTooBig, TOO_MUCH_DATA_MSG), + ): + request.body + + def test_multipart_file_upload_not_limited_by_data_upload_max(self): + # DATA_UPLOAD_MAX_MEMORY_SIZE applies to non-file fields only; a file + # upload whose total body exceeds the limit must still succeed. + boundary = "testboundary" + file_content = b"x" * 100 + body = ( + ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n' + f"Content-Type: application/octet-stream\r\n" + f"\r\n" + ).encode() + + file_content + + f"\r\n--{boundary}--\r\n".encode() + ) + request = self.make_request( + body, + content_type=f"multipart/form-data; boundary={boundary}".encode(), + content_length=len(body), + ) + with self.settings( + DATA_UPLOAD_MAX_MEMORY_SIZE=10, FILE_UPLOAD_MAX_MEMORY_SIZE=10 + ): + files = request.FILES + self.assertEqual(len(files), 1) + uploaded = files["file"] + self.addCleanup(uploaded.close) + self.assertEqual(uploaded.read(), file_content) + + async def test_read_body_buffers_all_chunks(self): + # read_body() consumes all chunks regardless of + # DATA_UPLOAD_MAX_MEMORY_SIZE; the limit is enforced later when + # HttpRequest.body is accessed. + chunks = [ + {"type": "http.request", "body": b"x" * 10, "more_body": True}, + {"type": "http.request", "body": b"y" * 10, "more_body": True}, + {"type": "http.request", "body": b"z" * 10, "more_body": False}, + ] + + async def receive(): + return chunks.pop(0) + + handler = ASGIHandler() + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=15): + body_file = await handler.read_body(receive) + self.addCleanup(body_file.close) + + self.assertEqual(len(chunks), 0) # All chunks were consumed. + body_file.seek(0) + self.assertEqual(body_file.read(), b"x" * 10 + b"y" * 10 + b"z" * 10) + + async def test_read_body_multipart_not_limited(self): + # All chunks are consumed regardless of DATA_UPLOAD_MAX_MEMORY_SIZE; + # multipart size enforcement happens inside MultiPartParser, not here. + chunks = [ + {"type": "http.request", "body": b"x" * 10, "more_body": True}, + {"type": "http.request", "body": b"y" * 10, "more_body": True}, + {"type": "http.request", "body": b"z" * 10, "more_body": False}, + ] + + async def receive(): + return chunks.pop(0) + + handler = ASGIHandler() + with self.settings(DATA_UPLOAD_MAX_MEMORY_SIZE=15): + body_file = await handler.read_body(receive) + self.addCleanup(body_file.close) + + self.assertEqual(len(chunks), 0) # All chunks were consumed. + body_file.seek(0) + self.assertEqual(body_file.read(), b"x" * 10 + b"y" * 10 + b"z" * 10) |
