diff options
| author | Jake Howard <git@theorangeone.net> | 2025-07-17 12:51:09 +0100 |
|---|---|---|
| committer | nessita <124304+nessita@users.noreply.github.com> | 2025-09-16 17:28:32 -0300 |
| commit | 4289966d1b8e848e5e460b7c782dac009d746b20 (patch) | |
| tree | ef1d61a33562579d985c762036db5f7aa01406fc /tests/tasks | |
| parent | 218f69f05eb51da1ea17d62a914a67ceff5bfd55 (diff) | |
Fixed #35859 -- Added background Tasks framework interface.
This work implements what was defined in DEP 14
(https://github.com/django/deps/blob/main/accepted/0014-background-workers.rst).
Thanks to Raphael Gaschignard, Eric Holscher, Ran Benita, Sarah Boyce,
Jacob Walls, and Natalia Bidart for the reviews.
Diffstat (limited to 'tests/tasks')
| -rw-r--r-- | tests/tasks/__init__.py | 0 | ||||
| -rw-r--r-- | tests/tasks/tasks.py | 88 | ||||
| -rw-r--r-- | tests/tasks/test_custom_backend.py | 71 | ||||
| -rw-r--r-- | tests/tasks/test_dummy_backend.py | 337 | ||||
| -rw-r--r-- | tests/tasks/test_immediate_backend.py | 387 | ||||
| -rw-r--r-- | tests/tasks/test_tasks.py | 316 |
6 files changed, 1199 insertions, 0 deletions
diff --git a/tests/tasks/__init__.py b/tests/tasks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tests/tasks/__init__.py diff --git a/tests/tasks/tasks.py b/tests/tasks/tasks.py new file mode 100644 index 0000000000..3959660ab9 --- /dev/null +++ b/tests/tasks/tasks.py @@ -0,0 +1,88 @@ +import time + +from django.tasks import TaskContext, task + + +@task() +def noop_task(*args, **kwargs): + return None + + +@task +def noop_task_from_bare_decorator(*args, **kwargs): + return None + + +@task() +async def noop_task_async(*args, **kwargs): + return None + + +@task() +def calculate_meaning_of_life(): + return 42 + + +@task() +def failing_task_value_error(): + raise ValueError("This Task failed due to ValueError") + + +@task() +def failing_task_system_exit(): + raise SystemExit("This Task failed due to SystemExit") + + +@task() +def failing_task_keyboard_interrupt(): + raise KeyboardInterrupt("This Task failed due to KeyboardInterrupt") + + +@task() +def complex_exception(): + raise ValueError(ValueError("This task failed")) + + +@task() +def complex_return_value(): + # Return something which isn't JSON serializable nor picklable. + return lambda: True + + +@task() +def exit_task(): + exit(1) + + +@task(enqueue_on_commit=True) +def enqueue_on_commit_task(): + pass + + +@task(enqueue_on_commit=False) +def never_enqueue_on_commit_task(): + pass + + +@task() +def hang(): + """ + Do nothing for 5 minutes + """ + time.sleep(300) + + +@task() +def sleep_for(seconds): + time.sleep(seconds) + + +@task(takes_context=True) +def get_task_id(context): + return context.task_result.id + + +@task(takes_context=True) +def test_context(context, attempt): + assert isinstance(context, TaskContext) + assert context.attempt == attempt diff --git a/tests/tasks/test_custom_backend.py b/tests/tasks/test_custom_backend.py new file mode 100644 index 0000000000..83a302a183 --- /dev/null +++ b/tests/tasks/test_custom_backend.py @@ -0,0 +1,71 @@ +import logging +from unittest import mock + +from django.tasks import default_task_backend, task_backends +from django.tasks.backends.base import BaseTaskBackend +from django.tasks.exceptions import InvalidTask +from django.test import SimpleTestCase, override_settings + +from . import tasks as test_tasks + + +class CustomBackend(BaseTaskBackend): + def __init__(self, alias, params): + super().__init__(alias, params) + self.prefix = self.options.get("prefix", "") + + def enqueue(self, *args, **kwargs): + logger = logging.getLogger(__name__) + logger.info(f"{self.prefix}Task enqueued.") + + +class CustomBackendNoEnqueue(BaseTaskBackend): + pass + + +@override_settings( + TASKS={ + "default": { + "BACKEND": f"{CustomBackend.__module__}.{CustomBackend.__qualname__}", + "ENQUEUE_ON_COMMIT": False, + "OPTIONS": {"prefix": "PREFIX: "}, + }, + "no_enqueue": { + "BACKEND": f"{CustomBackendNoEnqueue.__module__}." + f"{CustomBackendNoEnqueue.__qualname__}", + }, + } +) +class CustomBackendTestCase(SimpleTestCase): + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], CustomBackend) + self.assertEqual(default_task_backend.alias, "default") + self.assertEqual(default_task_backend.options, {"prefix": "PREFIX: "}) + + @mock.patch.multiple(CustomBackend, supports_async_task=False) + def test_enqueue_async_task_on_non_async_backend(self): + with self.assertRaisesMessage( + InvalidTask, "Backend does not support async Tasks." + ): + default_task_backend.validate_task(test_tasks.noop_task_async) + + def test_backend_does_not_support_priority(self): + with self.assertRaisesMessage( + InvalidTask, "Backend does not support setting priority of tasks." + ): + test_tasks.noop_task.using(priority=10) + + def test_options(self): + with self.assertLogs(__name__, level="INFO") as captured_logs: + test_tasks.noop_task.enqueue() + self.assertEqual(len(captured_logs.output), 1) + self.assertIn("PREFIX: Task enqueued", captured_logs.output[0]) + + def test_no_enqueue(self): + with self.assertRaisesMessage( + TypeError, + "Can't instantiate abstract class CustomBackendNoEnqueue " + "without an implementation for abstract method 'enqueue'", + ): + test_tasks.noop_task.using(backend="no_enqueue") diff --git a/tests/tasks/test_dummy_backend.py b/tests/tasks/test_dummy_backend.py new file mode 100644 index 0000000000..27205f6ab3 --- /dev/null +++ b/tests/tasks/test_dummy_backend.py @@ -0,0 +1,337 @@ +from typing import cast +from unittest import mock + +from django.db import transaction +from django.db.utils import ConnectionHandler +from django.tasks import TaskResultStatus, default_task_backend, task_backends +from django.tasks.backends.dummy import DummyBackend +from django.tasks.base import Task +from django.tasks.exceptions import InvalidTask, TaskResultDoesNotExist +from django.test import ( + SimpleTestCase, + TransactionTestCase, + override_settings, + skipIfDBFeature, +) + +from . import tasks as test_tasks + + +@override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": [], + "ENQUEUE_ON_COMMIT": False, + } + } +) +class DummyBackendTestCase(SimpleTestCase): + def setUp(self): + default_task_backend.clear() + + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], DummyBackend) + self.assertEqual(default_task_backend.alias, "default") + self.assertEqual(default_task_backend.options, {}) + + def test_enqueue_task(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = cast(Task, task).enqueue(1, two=3) + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertIs(result.is_finished, False) + self.assertIsNone(result.started_at) + self.assertIsNone(result.last_attempted_at) + self.assertIsNone(result.finished_at) + with self.assertRaisesMessage(ValueError, "Task has not finished yet"): + result.return_value + self.assertEqual(result.task, task) + self.assertEqual(result.args, [1]) + self.assertEqual(result.kwargs, {"two": 3}) + self.assertEqual(result.attempts, 0) + + self.assertIn(result, default_task_backend.results) + + async def test_enqueue_task_async(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = await cast(Task, task).aenqueue() + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertIs(result.is_finished, False) + self.assertIsNone(result.started_at) + self.assertIsNone(result.last_attempted_at) + self.assertIsNone(result.finished_at) + with self.assertRaisesMessage(ValueError, "Task has not finished yet"): + result.return_value + self.assertEqual(result.task, task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + self.assertEqual(result.attempts, 0) + + self.assertIn(result, default_task_backend.results) + + def test_get_result(self): + result = default_task_backend.enqueue(test_tasks.noop_task, (), {}) + + new_result = default_task_backend.get_result(result.id) + + self.assertEqual(result, new_result) + + async def test_get_result_async(self): + result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {}) + + new_result = await default_task_backend.aget_result(result.id) + + self.assertEqual(result, new_result) + + def test_refresh_result(self): + result = default_task_backend.enqueue( + test_tasks.calculate_meaning_of_life, (), {} + ) + + enqueued_result = default_task_backend.results[0] + object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL) + + self.assertEqual(result.status, TaskResultStatus.READY) + result.refresh() + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + async def test_refresh_result_async(self): + result = await default_task_backend.aenqueue( + test_tasks.calculate_meaning_of_life, (), {} + ) + + enqueued_result = default_task_backend.results[0] + object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL) + + self.assertEqual(result.status, TaskResultStatus.READY) + await result.arefresh() + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + async def test_get_missing_result(self): + with self.assertRaises(TaskResultDoesNotExist): + default_task_backend.get_result("123") + + with self.assertRaises(TaskResultDoesNotExist): + await default_task_backend.aget_result("123") + + def test_enqueue_on_commit(self): + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + def test_enqueue_logs(self): + with self.assertLogs("django.tasks", level="DEBUG") as captured_logs: + result = test_tasks.noop_task.enqueue() + + self.assertEqual(len(captured_logs.output), 1) + self.assertIn("enqueued", captured_logs.output[0]) + self.assertIn(result.id, captured_logs.output[0]) + + def test_errors(self): + result = test_tasks.noop_task.enqueue() + self.assertEqual(result.errors, []) + + def test_validate_disallowed_async_task(self): + with mock.patch.multiple(default_task_backend, supports_async_task=False): + with self.assertRaisesMessage( + InvalidTask, "Backend does not support async Tasks." + ): + default_task_backend.validate_task(test_tasks.noop_task_async) + + def test_check(self): + errors = list(default_task_backend.check()) + self.assertEqual(len(errors), 0, errors) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + @mock.patch("django.tasks.backends.base.connections", ConnectionHandler({})) + def test_enqueue_on_commit_with_no_databases(self): + self.assertIn( + "tasks.E001", {error.id for error in default_task_backend.check()} + ) + + def test_takes_context(self): + result = test_tasks.get_task_id.enqueue() + self.assertEqual(result.status, TaskResultStatus.READY) + + def test_clear(self): + result = test_tasks.noop_task.enqueue() + + default_task_backend.get_result(result.id) + + default_task_backend.clear() + + with self.assertRaisesMessage(TaskResultDoesNotExist, result.id): + default_task_backend.get_result(result.id) + + def test_validate_on_enqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + task_with_custom_queue_name.enqueue() + + async def test_validate_on_aenqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + await task_with_custom_queue_name.aenqueue() + + +class DummyBackendTransactionTestCase(TransactionTestCase): + available_apps = [] + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + def test_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + test_tasks.noop_task.enqueue() + + self.assertEqual(len(default_task_backend.results), 0) + + self.assertEqual(len(default_task_backend.results), 1) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_doesnt_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + False, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNotNone(result.enqueued_at) + + self.assertEqual(len(default_task_backend.results), 1) + + self.assertEqual(len(default_task_backend.results), 1) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + } + } + ) + def test_wait_until_transaction_by_default(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNone(result.enqueued_at) + + self.assertEqual(len(default_task_backend.results), 0) + + self.assertEqual(len(default_task_backend.results), 1) + self.assertIsNone(result.enqueued_at) + result.refresh() + self.assertIsNotNone(result.enqueued_at) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_task_specific_enqueue_on_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + with transaction.atomic(): + result = test_tasks.enqueue_on_commit_task.enqueue() + + self.assertIsNone(result.enqueued_at) + + self.assertEqual(len(default_task_backend.results), 0) + + self.assertEqual(len(default_task_backend.results), 1) + self.assertIsNone(result.enqueued_at) + result.refresh() + self.assertIsNotNone(result.enqueued_at) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + @skipIfDBFeature("supports_transactions") + def test_enqueue_on_commit_with_no_transactions(self): + self.assertIn( + "tasks.E002", {error.id for error in default_task_backend.check()} + ) diff --git a/tests/tasks/test_immediate_backend.py b/tests/tasks/test_immediate_backend.py new file mode 100644 index 0000000000..01e2841aa7 --- /dev/null +++ b/tests/tasks/test_immediate_backend.py @@ -0,0 +1,387 @@ +from django.db import transaction +from django.tasks import TaskResultStatus, default_task_backend, task_backends +from django.tasks.backends.immediate import ImmediateBackend +from django.tasks.exceptions import InvalidTask +from django.test import SimpleTestCase, TransactionTestCase, override_settings +from django.utils import timezone + +from . import tasks as test_tasks + + +@override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "QUEUES": [], + "ENQUEUE_ON_COMMIT": False, + } + } +) +class ImmediateBackendTestCase(SimpleTestCase): + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], ImmediateBackend) + self.assertEqual(default_task_backend.alias, "default") + self.assertEqual(default_task_backend.options, {}) + + def test_enqueue_task(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = task.enqueue(1, two=3) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertIs(result.is_finished, True) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertIsNone(result.return_value) + self.assertEqual(result.task, task) + self.assertEqual(result.args, [1]) + self.assertEqual(result.kwargs, {"two": 3}) + self.assertEqual(result.attempts, 1) + + async def test_enqueue_task_async(self): + for task in [test_tasks.noop_task, test_tasks.noop_task_async]: + with self.subTest(task): + result = await task.aenqueue() + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertIs(result.is_finished, True) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertIsNone(result.return_value) + self.assertEqual(result.task, task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + self.assertEqual(result.attempts, 1) + + def test_catches_exception(self): + test_data = [ + ( + test_tasks.failing_task_value_error, # Task function. + ValueError, # Expected exception. + "This Task failed due to ValueError", # Expected message. + ), + ( + test_tasks.failing_task_system_exit, + SystemExit, + "This Task failed due to SystemExit", + ), + ] + for task, exception, message in test_data: + with ( + self.subTest(task), + self.assertLogs("django.tasks", level="ERROR") as captured_logs, + ): + result = task.enqueue() + + self.assertEqual(len(captured_logs.output), 1) + self.assertIn(message, captured_logs.output[0]) + + self.assertEqual(result.status, TaskResultStatus.FAILED) + with self.assertRaisesMessage(ValueError, "Task failed"): + result.return_value + self.assertIs(result.is_finished, True) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertEqual(result.errors[0].exception_class, exception) + traceback = result.errors[0].traceback + self.assertIs( + traceback + and traceback.endswith(f"{exception.__name__}: {message}\n"), + True, + traceback, + ) + self.assertEqual(result.task, task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + def test_throws_keyboard_interrupt(self): + with self.assertRaises(KeyboardInterrupt): + with self.assertNoLogs("django.tasks", level="ERROR"): + default_task_backend.enqueue( + test_tasks.failing_task_keyboard_interrupt, [], {} + ) + + def test_complex_exception(self): + with self.assertLogs("django.tasks", level="ERROR"): + result = test_tasks.complex_exception.enqueue() + + self.assertEqual(result.status, TaskResultStatus.FAILED) + with self.assertRaisesMessage(ValueError, "Task failed"): + result.return_value + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + + self.assertIsNone(result._return_value) + self.assertEqual(result.errors[0].exception_class, ValueError) + self.assertIn( + 'ValueError(ValueError("This task failed"))', result.errors[0].traceback + ) + + self.assertEqual(result.task, test_tasks.complex_exception) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + def test_complex_return_value(self): + with self.assertLogs("django.tasks", level="ERROR"): + result = test_tasks.complex_return_value.enqueue() + + self.assertEqual(result.status, TaskResultStatus.FAILED) + self.assertIsNotNone(result.started_at) + self.assertIsNotNone(result.last_attempted_at) + self.assertIsNotNone(result.finished_at) + self.assertGreaterEqual(result.started_at, result.enqueued_at) + self.assertGreaterEqual(result.finished_at, result.started_at) + self.assertIsNone(result._return_value) + self.assertEqual(result.errors[0].exception_class, TypeError) + self.assertIn("Unsupported type", result.errors[0].traceback) + + def test_result(self): + result = default_task_backend.enqueue( + test_tasks.calculate_meaning_of_life, [], {} + ) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertEqual(result.return_value, 42) + + async def test_result_async(self): + result = await default_task_backend.aenqueue( + test_tasks.calculate_meaning_of_life, [], {} + ) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertEqual(result.return_value, 42) + + async def test_cannot_get_result(self): + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + default_task_backend.get_result("123") + + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + await default_task_backend.aget_result(123) + + async def test_cannot_refresh_result(self): + result = await default_task_backend.aenqueue( + test_tasks.calculate_meaning_of_life, (), {} + ) + + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + await result.arefresh() + + with self.assertRaisesMessage( + NotImplementedError, + "This backend does not support retrieving or refreshing results.", + ): + result.refresh() + + def test_cannot_pass_run_after(self): + with self.assertRaisesMessage( + InvalidTask, + "Backend does not support run_after.", + ): + default_task_backend.validate_task( + test_tasks.failing_task_value_error.using(run_after=timezone.now()) + ) + + def test_enqueue_on_commit(self): + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + def test_enqueue_logs(self): + with self.assertLogs("django.tasks", level="DEBUG") as captured_logs: + result = test_tasks.noop_task.enqueue() + + self.assertEqual(len(captured_logs.output), 3) + + self.assertIn("enqueued", captured_logs.output[0]) + self.assertIn(result.id, captured_logs.output[0]) + + self.assertIn("state=RUNNING", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + + self.assertIn("state=SUCCESSFUL", captured_logs.output[2]) + self.assertIn(result.id, captured_logs.output[2]) + + def test_failed_logs(self): + with self.assertLogs("django.tasks", level="DEBUG") as captured_logs: + result = test_tasks.failing_task_value_error.enqueue() + + self.assertEqual(len(captured_logs.output), 3) + self.assertIn("state=RUNNING", captured_logs.output[1]) + self.assertIn(result.id, captured_logs.output[1]) + + self.assertIn("state=FAILED", captured_logs.output[2]) + self.assertIn(result.id, captured_logs.output[2]) + + def test_takes_context(self): + result = test_tasks.get_task_id.enqueue() + + self.assertEqual(result.return_value, result.id) + + def test_context(self): + result = test_tasks.test_context.enqueue(1) + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + def test_validate_on_enqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + task_with_custom_queue_name.enqueue() + + async def test_validate_on_aenqueue(self): + task_with_custom_queue_name = test_tasks.noop_task.using( + queue_name="unknown_queue" + ) + + with override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "QUEUES": ["queue-1"], + "ENQUEUE_ON_COMMIT": False, + } + } + ): + with self.assertRaisesMessage( + InvalidTask, "Queue 'unknown_queue' is not valid for backend" + ): + await task_with_custom_queue_name.aenqueue() + + +class ImmediateBackendTransactionTestCase(TransactionTestCase): + available_apps = [] + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": True, + } + } + ) + def test_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNone(result.enqueued_at) + self.assertEqual(result.attempts, 0) + self.assertEqual(result.status, TaskResultStatus.READY) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + self.assertIsNotNone(result.enqueued_at) + self.assertEqual(result.attempts, 1) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_doesnt_wait_until_transaction_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + False, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNotNone(result.enqueued_at) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + } + } + ) + def test_wait_until_transaction_by_default(self): + self.assertIs(default_task_backend.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task), + True, + ) + + with transaction.atomic(): + result = test_tasks.noop_task.enqueue() + + self.assertIsNone(result.enqueued_at) + self.assertEqual(result.status, TaskResultStatus.READY) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) + + @override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": False, + } + } + ) + def test_task_specific_enqueue_on_commit(self): + self.assertIs(default_task_backend.enqueue_on_commit, False) + self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True) + self.assertIs( + default_task_backend._get_enqueue_on_commit_for_task( + test_tasks.enqueue_on_commit_task + ), + True, + ) + + with transaction.atomic(): + result = test_tasks.enqueue_on_commit_task.enqueue() + + self.assertIsNone(result.enqueued_at) + self.assertEqual(result.status, TaskResultStatus.READY) + + self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL) diff --git a/tests/tasks/test_tasks.py b/tests/tasks/test_tasks.py new file mode 100644 index 0000000000..2c11d9657d --- /dev/null +++ b/tests/tasks/test_tasks.py @@ -0,0 +1,316 @@ +import dataclasses +from datetime import datetime + +from django.tasks import ( + DEFAULT_TASK_QUEUE_NAME, + TaskResultStatus, + default_task_backend, + task, + task_backends, +) +from django.tasks.backends.dummy import DummyBackend +from django.tasks.backends.immediate import ImmediateBackend +from django.tasks.base import TASK_MAX_PRIORITY, TASK_MIN_PRIORITY, Task +from django.tasks.exceptions import ( + InvalidTask, + InvalidTaskBackend, + TaskResultDoesNotExist, + TaskResultMismatch, +) +from django.test import SimpleTestCase, override_settings +from django.utils import timezone +from django.utils.module_loading import import_string + +from . import tasks as test_tasks + + +@override_settings( + TASKS={ + "default": { + "BACKEND": "django.tasks.backends.dummy.DummyBackend", + "QUEUES": ["default", "queue_1"], + "ENQUEUE_ON_COMMIT": False, + }, + "immediate": { + "BACKEND": "django.tasks.backends.immediate.ImmediateBackend", + "ENQUEUE_ON_COMMIT": False, + "QUEUES": [], + }, + "missing": {"BACKEND": "does.not.exist"}, + }, + USE_TZ=True, +) +class TaskTestCase(SimpleTestCase): + def setUp(self): + default_task_backend.clear() + + def test_using_correct_backend(self): + self.assertEqual(default_task_backend, task_backends["default"]) + self.assertIsInstance(task_backends["default"], DummyBackend) + + def test_task_decorator(self): + self.assertIsInstance(test_tasks.noop_task, Task) + self.assertIsInstance(test_tasks.noop_task_async, Task) + self.assertIsInstance(test_tasks.noop_task_from_bare_decorator, Task) + + def test_enqueue_task(self): + result = test_tasks.noop_task.enqueue() + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertEqual(result.task, test_tasks.noop_task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + self.assertEqual(default_task_backend.results, [result]) + + async def test_enqueue_task_async(self): + result = await test_tasks.noop_task.aenqueue() + + self.assertEqual(result.status, TaskResultStatus.READY) + self.assertEqual(result.task, test_tasks.noop_task) + self.assertEqual(result.args, []) + self.assertEqual(result.kwargs, {}) + + self.assertEqual(default_task_backend.results, [result]) + + def test_enqueue_with_invalid_argument(self): + with self.assertRaisesMessage(TypeError, "Unsupported type"): + test_tasks.noop_task.enqueue(datetime.now()) + + async def test_aenqueue_with_invalid_argument(self): + with self.assertRaisesMessage(TypeError, "Unsupported type"): + await test_tasks.noop_task.aenqueue(datetime.now()) + + def test_using_priority(self): + self.assertEqual(test_tasks.noop_task.priority, 0) + self.assertEqual(test_tasks.noop_task.using(priority=1).priority, 1) + self.assertEqual(test_tasks.noop_task.priority, 0) + + def test_using_queue_name(self): + self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME) + self.assertEqual( + test_tasks.noop_task.using(queue_name="queue_1").queue_name, "queue_1" + ) + self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME) + + def test_using_run_after(self): + now = timezone.now() + + self.assertIsNone(test_tasks.noop_task.run_after) + self.assertEqual(test_tasks.noop_task.using(run_after=now).run_after, now) + self.assertIsNone(test_tasks.noop_task.run_after) + + def test_using_unknown_backend(self): + self.assertEqual(test_tasks.noop_task.backend, "default") + + with self.assertRaisesMessage( + InvalidTaskBackend, "The connection 'unknown' doesn't exist." + ): + test_tasks.noop_task.using(backend="unknown") + + def test_using_missing_backend(self): + self.assertEqual(test_tasks.noop_task.backend, "default") + + with self.assertRaisesMessage( + InvalidTaskBackend, + "Could not find backend 'does.not.exist': No module named 'does'", + ): + test_tasks.noop_task.using(backend="missing") + + def test_using_creates_new_instance(self): + new_task = test_tasks.noop_task.using() + + self.assertEqual(new_task, test_tasks.noop_task) + self.assertIsNot(new_task, test_tasks.noop_task) + + def test_chained_using(self): + now = timezone.now() + + run_after_task = test_tasks.noop_task.using(run_after=now) + self.assertEqual(run_after_task.run_after, now) + + priority_task = run_after_task.using(priority=10) + self.assertEqual(priority_task.priority, 10) + self.assertEqual(priority_task.run_after, now) + + self.assertEqual(run_after_task.priority, 0) + + async def test_refresh_result(self): + result = await test_tasks.noop_task.aenqueue() + + original_result = dataclasses.asdict(result) + + result.refresh() + + self.assertEqual(dataclasses.asdict(result), original_result) + + await result.arefresh() + + self.assertEqual(dataclasses.asdict(result), original_result) + + def test_naive_datetime(self): + with self.assertRaisesMessage( + InvalidTask, "run_after must be an aware datetime." + ): + test_tasks.noop_task.using(run_after=datetime.now()) + + def test_invalid_priority(self): + with self.assertRaisesMessage( + InvalidTask, + f"priority must be a whole number between {TASK_MIN_PRIORITY} and " + f"{TASK_MAX_PRIORITY}.", + ): + test_tasks.noop_task.using(priority=-101) + + with self.assertRaisesMessage( + InvalidTask, + f"priority must be a whole number between {TASK_MIN_PRIORITY} and " + f"{TASK_MAX_PRIORITY}.", + ): + test_tasks.noop_task.using(priority=101) + + with self.assertRaisesMessage( + InvalidTask, + f"priority must be a whole number between {TASK_MIN_PRIORITY} and " + f"{TASK_MAX_PRIORITY}.", + ): + test_tasks.noop_task.using(priority=3.1) + + test_tasks.noop_task.using(priority=100) + test_tasks.noop_task.using(priority=-100) + test_tasks.noop_task.using(priority=0) + + def test_unknown_queue_name(self): + with self.assertRaisesMessage( + InvalidTask, "Queue 'queue-2' is not valid for backend." + ): + test_tasks.noop_task.using(queue_name="queue-2") + # Validation is bypassed when the backend QUEUES is an empty list. + self.assertEqual( + test_tasks.noop_task.using( + queue_name="queue-2", backend="immediate" + ).queue_name, + "queue-2", + ) + + def test_call_task(self): + self.assertEqual(test_tasks.calculate_meaning_of_life.call(), 42) + + async def test_call_task_async(self): + self.assertEqual(await test_tasks.calculate_meaning_of_life.acall(), 42) + + async def test_call_async_task(self): + self.assertIsNone(await test_tasks.noop_task_async.acall()) + + def test_call_async_task_sync(self): + self.assertIsNone(test_tasks.noop_task_async.call()) + + def test_get_result(self): + result = default_task_backend.enqueue(test_tasks.noop_task, (), {}) + + new_result = test_tasks.noop_task.get_result(result.id) + + self.assertEqual(result, new_result) + + async def test_get_result_async(self): + result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {}) + + new_result = await test_tasks.noop_task.aget_result(result.id) + + self.assertEqual(result, new_result) + + async def test_get_missing_result(self): + with self.assertRaises(TaskResultDoesNotExist): + test_tasks.noop_task.get_result("123") + + with self.assertRaises(TaskResultDoesNotExist): + await test_tasks.noop_task.aget_result("123") + + def test_get_incorrect_result(self): + result = default_task_backend.enqueue(test_tasks.noop_task_async, (), {}) + with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"): + test_tasks.noop_task.get_result(result.id) + + async def test_get_incorrect_result_async(self): + result = await default_task_backend.aenqueue(test_tasks.noop_task_async, (), {}) + with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"): + await test_tasks.noop_task.aget_result(result.id) + + def test_invalid_function(self): + for invalid_function in [any, self.test_invalid_function]: + with self.subTest(invalid_function): + with self.assertRaisesMessage( + InvalidTask, + "Task function must be defined at a module level.", + ): + task()(invalid_function) + + def test_get_backend(self): + self.assertEqual(test_tasks.noop_task.backend, "default") + self.assertIsInstance(test_tasks.noop_task.get_backend(), DummyBackend) + + immediate_task = test_tasks.noop_task.using(backend="immediate") + self.assertEqual(immediate_task.backend, "immediate") + self.assertIsInstance(immediate_task.get_backend(), ImmediateBackend) + + def test_name(self): + self.assertEqual(test_tasks.noop_task.name, "noop_task") + self.assertEqual(test_tasks.noop_task_async.name, "noop_task_async") + + def test_module_path(self): + self.assertEqual(test_tasks.noop_task.module_path, "tasks.tasks.noop_task") + self.assertEqual( + test_tasks.noop_task_async.module_path, "tasks.tasks.noop_task_async" + ) + + self.assertIs( + import_string(test_tasks.noop_task.module_path), test_tasks.noop_task + ) + self.assertIs( + import_string(test_tasks.noop_task_async.module_path), + test_tasks.noop_task_async, + ) + + @override_settings(TASKS={}) + def test_no_backends(self): + with self.assertRaises(InvalidTaskBackend): + test_tasks.noop_task.enqueue() + + def test_task_error_invalid_exception(self): + with self.assertLogs("django.tasks"): + immediate_task = test_tasks.failing_task_value_error.using( + backend="immediate" + ).enqueue() + + self.assertEqual(len(immediate_task.errors), 1) + + object.__setattr__( + immediate_task.errors[0], "exception_class_path", "subprocess.run" + ) + + with self.assertRaisesMessage( + ValueError, "'subprocess.run' does not reference a valid exception." + ): + immediate_task.errors[0].exception_class + + def test_task_error_unknown_module(self): + with self.assertLogs("django.tasks"): + immediate_task = test_tasks.failing_task_value_error.using( + backend="immediate" + ).enqueue() + + self.assertEqual(len(immediate_task.errors), 1) + + object.__setattr__( + immediate_task.errors[0], "exception_class_path", "does.not.exist" + ) + + with self.assertRaises(ImportError): + immediate_task.errors[0].exception_class + + def test_takes_context_without_taking_context(self): + with self.assertRaisesMessage( + InvalidTask, + "Task takes context but does not have a first argument of 'context'.", + ): + task(takes_context=True)(test_tasks.calculate_meaning_of_life.func) |
