summaryrefslogtreecommitdiff
path: root/tests/tasks
diff options
context:
space:
mode:
authorJake Howard <git@theorangeone.net>2025-07-17 12:51:09 +0100
committernessita <124304+nessita@users.noreply.github.com>2025-09-16 17:28:32 -0300
commit4289966d1b8e848e5e460b7c782dac009d746b20 (patch)
treeef1d61a33562579d985c762036db5f7aa01406fc /tests/tasks
parent218f69f05eb51da1ea17d62a914a67ceff5bfd55 (diff)
Fixed #35859 -- Added background Tasks framework interface.
This work implements what was defined in DEP 14 (https://github.com/django/deps/blob/main/accepted/0014-background-workers.rst). Thanks to Raphael Gaschignard, Eric Holscher, Ran Benita, Sarah Boyce, Jacob Walls, and Natalia Bidart for the reviews.
Diffstat (limited to 'tests/tasks')
-rw-r--r--tests/tasks/__init__.py0
-rw-r--r--tests/tasks/tasks.py88
-rw-r--r--tests/tasks/test_custom_backend.py71
-rw-r--r--tests/tasks/test_dummy_backend.py337
-rw-r--r--tests/tasks/test_immediate_backend.py387
-rw-r--r--tests/tasks/test_tasks.py316
6 files changed, 1199 insertions, 0 deletions
diff --git a/tests/tasks/__init__.py b/tests/tasks/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/tasks/__init__.py
diff --git a/tests/tasks/tasks.py b/tests/tasks/tasks.py
new file mode 100644
index 0000000000..3959660ab9
--- /dev/null
+++ b/tests/tasks/tasks.py
@@ -0,0 +1,88 @@
+import time
+
+from django.tasks import TaskContext, task
+
+
+@task()
+def noop_task(*args, **kwargs):
+ return None
+
+
+@task
+def noop_task_from_bare_decorator(*args, **kwargs):
+ return None
+
+
+@task()
+async def noop_task_async(*args, **kwargs):
+ return None
+
+
+@task()
+def calculate_meaning_of_life():
+ return 42
+
+
+@task()
+def failing_task_value_error():
+ raise ValueError("This Task failed due to ValueError")
+
+
+@task()
+def failing_task_system_exit():
+ raise SystemExit("This Task failed due to SystemExit")
+
+
+@task()
+def failing_task_keyboard_interrupt():
+ raise KeyboardInterrupt("This Task failed due to KeyboardInterrupt")
+
+
+@task()
+def complex_exception():
+ raise ValueError(ValueError("This task failed"))
+
+
+@task()
+def complex_return_value():
+ # Return something which isn't JSON serializable nor picklable.
+ return lambda: True
+
+
+@task()
+def exit_task():
+ exit(1)
+
+
+@task(enqueue_on_commit=True)
+def enqueue_on_commit_task():
+ pass
+
+
+@task(enqueue_on_commit=False)
+def never_enqueue_on_commit_task():
+ pass
+
+
+@task()
+def hang():
+ """
+ Do nothing for 5 minutes
+ """
+ time.sleep(300)
+
+
+@task()
+def sleep_for(seconds):
+ time.sleep(seconds)
+
+
+@task(takes_context=True)
+def get_task_id(context):
+ return context.task_result.id
+
+
+@task(takes_context=True)
+def test_context(context, attempt):
+ assert isinstance(context, TaskContext)
+ assert context.attempt == attempt
diff --git a/tests/tasks/test_custom_backend.py b/tests/tasks/test_custom_backend.py
new file mode 100644
index 0000000000..83a302a183
--- /dev/null
+++ b/tests/tasks/test_custom_backend.py
@@ -0,0 +1,71 @@
+import logging
+from unittest import mock
+
+from django.tasks import default_task_backend, task_backends
+from django.tasks.backends.base import BaseTaskBackend
+from django.tasks.exceptions import InvalidTask
+from django.test import SimpleTestCase, override_settings
+
+from . import tasks as test_tasks
+
+
+class CustomBackend(BaseTaskBackend):
+ def __init__(self, alias, params):
+ super().__init__(alias, params)
+ self.prefix = self.options.get("prefix", "")
+
+ def enqueue(self, *args, **kwargs):
+ logger = logging.getLogger(__name__)
+ logger.info(f"{self.prefix}Task enqueued.")
+
+
+class CustomBackendNoEnqueue(BaseTaskBackend):
+ pass
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": f"{CustomBackend.__module__}.{CustomBackend.__qualname__}",
+ "ENQUEUE_ON_COMMIT": False,
+ "OPTIONS": {"prefix": "PREFIX: "},
+ },
+ "no_enqueue": {
+ "BACKEND": f"{CustomBackendNoEnqueue.__module__}."
+ f"{CustomBackendNoEnqueue.__qualname__}",
+ },
+ }
+)
+class CustomBackendTestCase(SimpleTestCase):
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], CustomBackend)
+ self.assertEqual(default_task_backend.alias, "default")
+ self.assertEqual(default_task_backend.options, {"prefix": "PREFIX: "})
+
+ @mock.patch.multiple(CustomBackend, supports_async_task=False)
+ def test_enqueue_async_task_on_non_async_backend(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "Backend does not support async Tasks."
+ ):
+ default_task_backend.validate_task(test_tasks.noop_task_async)
+
+ def test_backend_does_not_support_priority(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "Backend does not support setting priority of tasks."
+ ):
+ test_tasks.noop_task.using(priority=10)
+
+ def test_options(self):
+ with self.assertLogs(__name__, level="INFO") as captured_logs:
+ test_tasks.noop_task.enqueue()
+ self.assertEqual(len(captured_logs.output), 1)
+ self.assertIn("PREFIX: Task enqueued", captured_logs.output[0])
+
+ def test_no_enqueue(self):
+ with self.assertRaisesMessage(
+ TypeError,
+ "Can't instantiate abstract class CustomBackendNoEnqueue "
+ "without an implementation for abstract method 'enqueue'",
+ ):
+ test_tasks.noop_task.using(backend="no_enqueue")
diff --git a/tests/tasks/test_dummy_backend.py b/tests/tasks/test_dummy_backend.py
new file mode 100644
index 0000000000..27205f6ab3
--- /dev/null
+++ b/tests/tasks/test_dummy_backend.py
@@ -0,0 +1,337 @@
+from typing import cast
+from unittest import mock
+
+from django.db import transaction
+from django.db.utils import ConnectionHandler
+from django.tasks import TaskResultStatus, default_task_backend, task_backends
+from django.tasks.backends.dummy import DummyBackend
+from django.tasks.base import Task
+from django.tasks.exceptions import InvalidTask, TaskResultDoesNotExist
+from django.test import (
+ SimpleTestCase,
+ TransactionTestCase,
+ override_settings,
+ skipIfDBFeature,
+)
+
+from . import tasks as test_tasks
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": [],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+)
+class DummyBackendTestCase(SimpleTestCase):
+ def setUp(self):
+ default_task_backend.clear()
+
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], DummyBackend)
+ self.assertEqual(default_task_backend.alias, "default")
+ self.assertEqual(default_task_backend.options, {})
+
+ def test_enqueue_task(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = cast(Task, task).enqueue(1, two=3)
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertIs(result.is_finished, False)
+ self.assertIsNone(result.started_at)
+ self.assertIsNone(result.last_attempted_at)
+ self.assertIsNone(result.finished_at)
+ with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
+ result.return_value
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [1])
+ self.assertEqual(result.kwargs, {"two": 3})
+ self.assertEqual(result.attempts, 0)
+
+ self.assertIn(result, default_task_backend.results)
+
+ async def test_enqueue_task_async(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = await cast(Task, task).aenqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertIs(result.is_finished, False)
+ self.assertIsNone(result.started_at)
+ self.assertIsNone(result.last_attempted_at)
+ self.assertIsNone(result.finished_at)
+ with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
+ result.return_value
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+ self.assertEqual(result.attempts, 0)
+
+ self.assertIn(result, default_task_backend.results)
+
+ def test_get_result(self):
+ result = default_task_backend.enqueue(test_tasks.noop_task, (), {})
+
+ new_result = default_task_backend.get_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ async def test_get_result_async(self):
+ result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {})
+
+ new_result = await default_task_backend.aget_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ def test_refresh_result(self):
+ result = default_task_backend.enqueue(
+ test_tasks.calculate_meaning_of_life, (), {}
+ )
+
+ enqueued_result = default_task_backend.results[0]
+ object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL)
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ result.refresh()
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ async def test_refresh_result_async(self):
+ result = await default_task_backend.aenqueue(
+ test_tasks.calculate_meaning_of_life, (), {}
+ )
+
+ enqueued_result = default_task_backend.results[0]
+ object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL)
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ await result.arefresh()
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ async def test_get_missing_result(self):
+ with self.assertRaises(TaskResultDoesNotExist):
+ default_task_backend.get_result("123")
+
+ with self.assertRaises(TaskResultDoesNotExist):
+ await default_task_backend.aget_result("123")
+
+ def test_enqueue_on_commit(self):
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ def test_enqueue_logs(self):
+ with self.assertLogs("django.tasks", level="DEBUG") as captured_logs:
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 1)
+ self.assertIn("enqueued", captured_logs.output[0])
+ self.assertIn(result.id, captured_logs.output[0])
+
+ def test_errors(self):
+ result = test_tasks.noop_task.enqueue()
+ self.assertEqual(result.errors, [])
+
+ def test_validate_disallowed_async_task(self):
+ with mock.patch.multiple(default_task_backend, supports_async_task=False):
+ with self.assertRaisesMessage(
+ InvalidTask, "Backend does not support async Tasks."
+ ):
+ default_task_backend.validate_task(test_tasks.noop_task_async)
+
+ def test_check(self):
+ errors = list(default_task_backend.check())
+ self.assertEqual(len(errors), 0, errors)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ @mock.patch("django.tasks.backends.base.connections", ConnectionHandler({}))
+ def test_enqueue_on_commit_with_no_databases(self):
+ self.assertIn(
+ "tasks.E001", {error.id for error in default_task_backend.check()}
+ )
+
+ def test_takes_context(self):
+ result = test_tasks.get_task_id.enqueue()
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ def test_clear(self):
+ result = test_tasks.noop_task.enqueue()
+
+ default_task_backend.get_result(result.id)
+
+ default_task_backend.clear()
+
+ with self.assertRaisesMessage(TaskResultDoesNotExist, result.id):
+ default_task_backend.get_result(result.id)
+
+ def test_validate_on_enqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ task_with_custom_queue_name.enqueue()
+
+ async def test_validate_on_aenqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ await task_with_custom_queue_name.aenqueue()
+
+
+class DummyBackendTransactionTestCase(TransactionTestCase):
+ available_apps = []
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ def test_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ test_tasks.noop_task.enqueue()
+
+ self.assertEqual(len(default_task_backend.results), 0)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_doesnt_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ False,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNotNone(result.enqueued_at)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ }
+ }
+ )
+ def test_wait_until_transaction_by_default(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+
+ self.assertEqual(len(default_task_backend.results), 0)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+ self.assertIsNone(result.enqueued_at)
+ result.refresh()
+ self.assertIsNotNone(result.enqueued_at)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_task_specific_enqueue_on_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.enqueue_on_commit_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+
+ self.assertEqual(len(default_task_backend.results), 0)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+ self.assertIsNone(result.enqueued_at)
+ result.refresh()
+ self.assertIsNotNone(result.enqueued_at)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ @skipIfDBFeature("supports_transactions")
+ def test_enqueue_on_commit_with_no_transactions(self):
+ self.assertIn(
+ "tasks.E002", {error.id for error in default_task_backend.check()}
+ )
diff --git a/tests/tasks/test_immediate_backend.py b/tests/tasks/test_immediate_backend.py
new file mode 100644
index 0000000000..01e2841aa7
--- /dev/null
+++ b/tests/tasks/test_immediate_backend.py
@@ -0,0 +1,387 @@
+from django.db import transaction
+from django.tasks import TaskResultStatus, default_task_backend, task_backends
+from django.tasks.backends.immediate import ImmediateBackend
+from django.tasks.exceptions import InvalidTask
+from django.test import SimpleTestCase, TransactionTestCase, override_settings
+from django.utils import timezone
+
+from . import tasks as test_tasks
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "QUEUES": [],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+)
+class ImmediateBackendTestCase(SimpleTestCase):
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], ImmediateBackend)
+ self.assertEqual(default_task_backend.alias, "default")
+ self.assertEqual(default_task_backend.options, {})
+
+ def test_enqueue_task(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = task.enqueue(1, two=3)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertIs(result.is_finished, True)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertIsNone(result.return_value)
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [1])
+ self.assertEqual(result.kwargs, {"two": 3})
+ self.assertEqual(result.attempts, 1)
+
+ async def test_enqueue_task_async(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = await task.aenqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertIs(result.is_finished, True)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertIsNone(result.return_value)
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+ self.assertEqual(result.attempts, 1)
+
+ def test_catches_exception(self):
+ test_data = [
+ (
+ test_tasks.failing_task_value_error, # Task function.
+ ValueError, # Expected exception.
+ "This Task failed due to ValueError", # Expected message.
+ ),
+ (
+ test_tasks.failing_task_system_exit,
+ SystemExit,
+ "This Task failed due to SystemExit",
+ ),
+ ]
+ for task, exception, message in test_data:
+ with (
+ self.subTest(task),
+ self.assertLogs("django.tasks", level="ERROR") as captured_logs,
+ ):
+ result = task.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 1)
+ self.assertIn(message, captured_logs.output[0])
+
+ self.assertEqual(result.status, TaskResultStatus.FAILED)
+ with self.assertRaisesMessage(ValueError, "Task failed"):
+ result.return_value
+ self.assertIs(result.is_finished, True)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertEqual(result.errors[0].exception_class, exception)
+ traceback = result.errors[0].traceback
+ self.assertIs(
+ traceback
+ and traceback.endswith(f"{exception.__name__}: {message}\n"),
+ True,
+ traceback,
+ )
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ def test_throws_keyboard_interrupt(self):
+ with self.assertRaises(KeyboardInterrupt):
+ with self.assertNoLogs("django.tasks", level="ERROR"):
+ default_task_backend.enqueue(
+ test_tasks.failing_task_keyboard_interrupt, [], {}
+ )
+
+ def test_complex_exception(self):
+ with self.assertLogs("django.tasks", level="ERROR"):
+ result = test_tasks.complex_exception.enqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.FAILED)
+ with self.assertRaisesMessage(ValueError, "Task failed"):
+ result.return_value
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+
+ self.assertIsNone(result._return_value)
+ self.assertEqual(result.errors[0].exception_class, ValueError)
+ self.assertIn(
+ 'ValueError(ValueError("This task failed"))', result.errors[0].traceback
+ )
+
+ self.assertEqual(result.task, test_tasks.complex_exception)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ def test_complex_return_value(self):
+ with self.assertLogs("django.tasks", level="ERROR"):
+ result = test_tasks.complex_return_value.enqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.FAILED)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertIsNone(result._return_value)
+ self.assertEqual(result.errors[0].exception_class, TypeError)
+ self.assertIn("Unsupported type", result.errors[0].traceback)
+
+ def test_result(self):
+ result = default_task_backend.enqueue(
+ test_tasks.calculate_meaning_of_life, [], {}
+ )
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertEqual(result.return_value, 42)
+
+ async def test_result_async(self):
+ result = await default_task_backend.aenqueue(
+ test_tasks.calculate_meaning_of_life, [], {}
+ )
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertEqual(result.return_value, 42)
+
+ async def test_cannot_get_result(self):
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ default_task_backend.get_result("123")
+
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ await default_task_backend.aget_result(123)
+
+ async def test_cannot_refresh_result(self):
+ result = await default_task_backend.aenqueue(
+ test_tasks.calculate_meaning_of_life, (), {}
+ )
+
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ await result.arefresh()
+
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ result.refresh()
+
+ def test_cannot_pass_run_after(self):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ "Backend does not support run_after.",
+ ):
+ default_task_backend.validate_task(
+ test_tasks.failing_task_value_error.using(run_after=timezone.now())
+ )
+
+ def test_enqueue_on_commit(self):
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ def test_enqueue_logs(self):
+ with self.assertLogs("django.tasks", level="DEBUG") as captured_logs:
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 3)
+
+ self.assertIn("enqueued", captured_logs.output[0])
+ self.assertIn(result.id, captured_logs.output[0])
+
+ self.assertIn("state=RUNNING", captured_logs.output[1])
+ self.assertIn(result.id, captured_logs.output[1])
+
+ self.assertIn("state=SUCCESSFUL", captured_logs.output[2])
+ self.assertIn(result.id, captured_logs.output[2])
+
+ def test_failed_logs(self):
+ with self.assertLogs("django.tasks", level="DEBUG") as captured_logs:
+ result = test_tasks.failing_task_value_error.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 3)
+ self.assertIn("state=RUNNING", captured_logs.output[1])
+ self.assertIn(result.id, captured_logs.output[1])
+
+ self.assertIn("state=FAILED", captured_logs.output[2])
+ self.assertIn(result.id, captured_logs.output[2])
+
+ def test_takes_context(self):
+ result = test_tasks.get_task_id.enqueue()
+
+ self.assertEqual(result.return_value, result.id)
+
+ def test_context(self):
+ result = test_tasks.test_context.enqueue(1)
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ def test_validate_on_enqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ task_with_custom_queue_name.enqueue()
+
+ async def test_validate_on_aenqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ await task_with_custom_queue_name.aenqueue()
+
+
+class ImmediateBackendTransactionTestCase(TransactionTestCase):
+ available_apps = []
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ def test_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+ self.assertEqual(result.attempts, 0)
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertIsNotNone(result.enqueued_at)
+ self.assertEqual(result.attempts, 1)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_doesnt_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ False,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNotNone(result.enqueued_at)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ }
+ }
+ )
+ def test_wait_until_transaction_by_default(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_task_specific_enqueue_on_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.enqueue_on_commit_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
diff --git a/tests/tasks/test_tasks.py b/tests/tasks/test_tasks.py
new file mode 100644
index 0000000000..2c11d9657d
--- /dev/null
+++ b/tests/tasks/test_tasks.py
@@ -0,0 +1,316 @@
+import dataclasses
+from datetime import datetime
+
+from django.tasks import (
+ DEFAULT_TASK_QUEUE_NAME,
+ TaskResultStatus,
+ default_task_backend,
+ task,
+ task_backends,
+)
+from django.tasks.backends.dummy import DummyBackend
+from django.tasks.backends.immediate import ImmediateBackend
+from django.tasks.base import TASK_MAX_PRIORITY, TASK_MIN_PRIORITY, Task
+from django.tasks.exceptions import (
+ InvalidTask,
+ InvalidTaskBackend,
+ TaskResultDoesNotExist,
+ TaskResultMismatch,
+)
+from django.test import SimpleTestCase, override_settings
+from django.utils import timezone
+from django.utils.module_loading import import_string
+
+from . import tasks as test_tasks
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": ["default", "queue_1"],
+ "ENQUEUE_ON_COMMIT": False,
+ },
+ "immediate": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ "QUEUES": [],
+ },
+ "missing": {"BACKEND": "does.not.exist"},
+ },
+ USE_TZ=True,
+)
+class TaskTestCase(SimpleTestCase):
+ def setUp(self):
+ default_task_backend.clear()
+
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], DummyBackend)
+
+ def test_task_decorator(self):
+ self.assertIsInstance(test_tasks.noop_task, Task)
+ self.assertIsInstance(test_tasks.noop_task_async, Task)
+ self.assertIsInstance(test_tasks.noop_task_from_bare_decorator, Task)
+
+ def test_enqueue_task(self):
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertEqual(result.task, test_tasks.noop_task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ self.assertEqual(default_task_backend.results, [result])
+
+ async def test_enqueue_task_async(self):
+ result = await test_tasks.noop_task.aenqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertEqual(result.task, test_tasks.noop_task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ self.assertEqual(default_task_backend.results, [result])
+
+ def test_enqueue_with_invalid_argument(self):
+ with self.assertRaisesMessage(TypeError, "Unsupported type"):
+ test_tasks.noop_task.enqueue(datetime.now())
+
+ async def test_aenqueue_with_invalid_argument(self):
+ with self.assertRaisesMessage(TypeError, "Unsupported type"):
+ await test_tasks.noop_task.aenqueue(datetime.now())
+
+ def test_using_priority(self):
+ self.assertEqual(test_tasks.noop_task.priority, 0)
+ self.assertEqual(test_tasks.noop_task.using(priority=1).priority, 1)
+ self.assertEqual(test_tasks.noop_task.priority, 0)
+
+ def test_using_queue_name(self):
+ self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME)
+ self.assertEqual(
+ test_tasks.noop_task.using(queue_name="queue_1").queue_name, "queue_1"
+ )
+ self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME)
+
+ def test_using_run_after(self):
+ now = timezone.now()
+
+ self.assertIsNone(test_tasks.noop_task.run_after)
+ self.assertEqual(test_tasks.noop_task.using(run_after=now).run_after, now)
+ self.assertIsNone(test_tasks.noop_task.run_after)
+
+ def test_using_unknown_backend(self):
+ self.assertEqual(test_tasks.noop_task.backend, "default")
+
+ with self.assertRaisesMessage(
+ InvalidTaskBackend, "The connection 'unknown' doesn't exist."
+ ):
+ test_tasks.noop_task.using(backend="unknown")
+
+ def test_using_missing_backend(self):
+ self.assertEqual(test_tasks.noop_task.backend, "default")
+
+ with self.assertRaisesMessage(
+ InvalidTaskBackend,
+ "Could not find backend 'does.not.exist': No module named 'does'",
+ ):
+ test_tasks.noop_task.using(backend="missing")
+
+ def test_using_creates_new_instance(self):
+ new_task = test_tasks.noop_task.using()
+
+ self.assertEqual(new_task, test_tasks.noop_task)
+ self.assertIsNot(new_task, test_tasks.noop_task)
+
+ def test_chained_using(self):
+ now = timezone.now()
+
+ run_after_task = test_tasks.noop_task.using(run_after=now)
+ self.assertEqual(run_after_task.run_after, now)
+
+ priority_task = run_after_task.using(priority=10)
+ self.assertEqual(priority_task.priority, 10)
+ self.assertEqual(priority_task.run_after, now)
+
+ self.assertEqual(run_after_task.priority, 0)
+
+ async def test_refresh_result(self):
+ result = await test_tasks.noop_task.aenqueue()
+
+ original_result = dataclasses.asdict(result)
+
+ result.refresh()
+
+ self.assertEqual(dataclasses.asdict(result), original_result)
+
+ await result.arefresh()
+
+ self.assertEqual(dataclasses.asdict(result), original_result)
+
+ def test_naive_datetime(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "run_after must be an aware datetime."
+ ):
+ test_tasks.noop_task.using(run_after=datetime.now())
+
+ def test_invalid_priority(self):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
+ f"{TASK_MAX_PRIORITY}.",
+ ):
+ test_tasks.noop_task.using(priority=-101)
+
+ with self.assertRaisesMessage(
+ InvalidTask,
+ f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
+ f"{TASK_MAX_PRIORITY}.",
+ ):
+ test_tasks.noop_task.using(priority=101)
+
+ with self.assertRaisesMessage(
+ InvalidTask,
+ f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
+ f"{TASK_MAX_PRIORITY}.",
+ ):
+ test_tasks.noop_task.using(priority=3.1)
+
+ test_tasks.noop_task.using(priority=100)
+ test_tasks.noop_task.using(priority=-100)
+ test_tasks.noop_task.using(priority=0)
+
+ def test_unknown_queue_name(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'queue-2' is not valid for backend."
+ ):
+ test_tasks.noop_task.using(queue_name="queue-2")
+ # Validation is bypassed when the backend QUEUES is an empty list.
+ self.assertEqual(
+ test_tasks.noop_task.using(
+ queue_name="queue-2", backend="immediate"
+ ).queue_name,
+ "queue-2",
+ )
+
+ def test_call_task(self):
+ self.assertEqual(test_tasks.calculate_meaning_of_life.call(), 42)
+
+ async def test_call_task_async(self):
+ self.assertEqual(await test_tasks.calculate_meaning_of_life.acall(), 42)
+
+ async def test_call_async_task(self):
+ self.assertIsNone(await test_tasks.noop_task_async.acall())
+
+ def test_call_async_task_sync(self):
+ self.assertIsNone(test_tasks.noop_task_async.call())
+
+ def test_get_result(self):
+ result = default_task_backend.enqueue(test_tasks.noop_task, (), {})
+
+ new_result = test_tasks.noop_task.get_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ async def test_get_result_async(self):
+ result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {})
+
+ new_result = await test_tasks.noop_task.aget_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ async def test_get_missing_result(self):
+ with self.assertRaises(TaskResultDoesNotExist):
+ test_tasks.noop_task.get_result("123")
+
+ with self.assertRaises(TaskResultDoesNotExist):
+ await test_tasks.noop_task.aget_result("123")
+
+ def test_get_incorrect_result(self):
+ result = default_task_backend.enqueue(test_tasks.noop_task_async, (), {})
+ with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"):
+ test_tasks.noop_task.get_result(result.id)
+
+ async def test_get_incorrect_result_async(self):
+ result = await default_task_backend.aenqueue(test_tasks.noop_task_async, (), {})
+ with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"):
+ await test_tasks.noop_task.aget_result(result.id)
+
+ def test_invalid_function(self):
+ for invalid_function in [any, self.test_invalid_function]:
+ with self.subTest(invalid_function):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ "Task function must be defined at a module level.",
+ ):
+ task()(invalid_function)
+
+ def test_get_backend(self):
+ self.assertEqual(test_tasks.noop_task.backend, "default")
+ self.assertIsInstance(test_tasks.noop_task.get_backend(), DummyBackend)
+
+ immediate_task = test_tasks.noop_task.using(backend="immediate")
+ self.assertEqual(immediate_task.backend, "immediate")
+ self.assertIsInstance(immediate_task.get_backend(), ImmediateBackend)
+
+ def test_name(self):
+ self.assertEqual(test_tasks.noop_task.name, "noop_task")
+ self.assertEqual(test_tasks.noop_task_async.name, "noop_task_async")
+
+ def test_module_path(self):
+ self.assertEqual(test_tasks.noop_task.module_path, "tasks.tasks.noop_task")
+ self.assertEqual(
+ test_tasks.noop_task_async.module_path, "tasks.tasks.noop_task_async"
+ )
+
+ self.assertIs(
+ import_string(test_tasks.noop_task.module_path), test_tasks.noop_task
+ )
+ self.assertIs(
+ import_string(test_tasks.noop_task_async.module_path),
+ test_tasks.noop_task_async,
+ )
+
+ @override_settings(TASKS={})
+ def test_no_backends(self):
+ with self.assertRaises(InvalidTaskBackend):
+ test_tasks.noop_task.enqueue()
+
+ def test_task_error_invalid_exception(self):
+ with self.assertLogs("django.tasks"):
+ immediate_task = test_tasks.failing_task_value_error.using(
+ backend="immediate"
+ ).enqueue()
+
+ self.assertEqual(len(immediate_task.errors), 1)
+
+ object.__setattr__(
+ immediate_task.errors[0], "exception_class_path", "subprocess.run"
+ )
+
+ with self.assertRaisesMessage(
+ ValueError, "'subprocess.run' does not reference a valid exception."
+ ):
+ immediate_task.errors[0].exception_class
+
+ def test_task_error_unknown_module(self):
+ with self.assertLogs("django.tasks"):
+ immediate_task = test_tasks.failing_task_value_error.using(
+ backend="immediate"
+ ).enqueue()
+
+ self.assertEqual(len(immediate_task.errors), 1)
+
+ object.__setattr__(
+ immediate_task.errors[0], "exception_class_path", "does.not.exist"
+ )
+
+ with self.assertRaises(ImportError):
+ immediate_task.errors[0].exception_class
+
+ def test_takes_context_without_taking_context(self):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ "Task takes context but does not have a first argument of 'context'.",
+ ):
+ task(takes_context=True)(test_tasks.calculate_meaning_of_life.func)