summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJake Howard <git@theorangeone.net>2025-07-17 12:51:09 +0100
committernessita <124304+nessita@users.noreply.github.com>2025-09-16 17:28:32 -0300
commit4289966d1b8e848e5e460b7c782dac009d746b20 (patch)
treeef1d61a33562579d985c762036db5f7aa01406fc
parent218f69f05eb51da1ea17d62a914a67ceff5bfd55 (diff)
Fixed #35859 -- Added background Tasks framework interface.
This work implements what was defined in DEP 14 (https://github.com/django/deps/blob/main/accepted/0014-background-workers.rst). Thanks to Raphael Gaschignard, Eric Holscher, Ran Benita, Sarah Boyce, Jacob Walls, and Natalia Bidart for the reviews.
-rw-r--r--django/conf/global_settings.py5
-rw-r--r--django/tasks/__init__.py45
-rw-r--r--django/tasks/backends/__init__.py0
-rw-r--r--django/tasks/backends/base.py138
-rw-r--r--django/tasks/backends/dummy.py69
-rw-r--r--django/tasks/backends/immediate.py100
-rw-r--r--django/tasks/base.py253
-rw-r--r--django/tasks/checks.py11
-rw-r--r--django/tasks/exceptions.py21
-rw-r--r--django/tasks/signals.py64
-rw-r--r--django/utils/inspect.py10
-rw-r--r--django/utils/json.py19
-rw-r--r--docs/ref/checks.txt8
-rw-r--r--docs/ref/index.txt1
-rw-r--r--docs/ref/settings.txt76
-rw-r--r--docs/ref/signals.txt57
-rw-r--r--docs/ref/tasks.txt444
-rw-r--r--docs/releases/6.0.txt39
-rw-r--r--docs/spelling_wordlist1
-rw-r--r--docs/topics/index.txt1
-rw-r--r--docs/topics/tasks.txt438
-rw-r--r--tests/tasks/__init__.py0
-rw-r--r--tests/tasks/tasks.py88
-rw-r--r--tests/tasks/test_custom_backend.py71
-rw-r--r--tests/tasks/test_dummy_backend.py337
-rw-r--r--tests/tasks/test_immediate_backend.py387
-rw-r--r--tests/tasks/test_tasks.py316
-rw-r--r--tests/utils_tests/test_inspect.py49
-rw-r--r--tests/utils_tests/test_json.py46
29 files changed, 3094 insertions, 0 deletions
diff --git a/django/conf/global_settings.py b/django/conf/global_settings.py
index 77f2b83e68..72f84dd6af 100644
--- a/django/conf/global_settings.py
+++ b/django/conf/global_settings.py
@@ -672,3 +672,8 @@ SECURE_CSP_REPORT_ONLY = {}
# HTTPS as the default protocol in urlize and urlizetrunc when no protocol is
# provided. Set to True to assume HTTPS during the Django 6.x release cycle.
URLIZE_ASSUME_HTTPS = False
+
+#########
+# TASKS #
+#########
+TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}
diff --git a/django/tasks/__init__.py b/django/tasks/__init__.py
new file mode 100644
index 0000000000..ae7ae8dd7c
--- /dev/null
+++ b/django/tasks/__init__.py
@@ -0,0 +1,45 @@
+from django.utils.connection import BaseConnectionHandler, ConnectionProxy
+from django.utils.module_loading import import_string
+
+from . import checks, signals # NOQA
+from .base import (
+ DEFAULT_TASK_BACKEND_ALIAS,
+ DEFAULT_TASK_QUEUE_NAME,
+ Task,
+ TaskContext,
+ TaskResult,
+ TaskResultStatus,
+ task,
+)
+from .exceptions import InvalidTaskBackend
+
+__all__ = [
+ "DEFAULT_TASK_BACKEND_ALIAS",
+ "DEFAULT_TASK_QUEUE_NAME",
+ "default_task_backend",
+ "task",
+ "task_backends",
+ "Task",
+ "TaskContext",
+ "TaskResult",
+ "TaskResultStatus",
+]
+
+
+class TaskBackendHandler(BaseConnectionHandler):
+ settings_name = "TASKS"
+ exception_class = InvalidTaskBackend
+
+ def create_connection(self, alias):
+ params = self.settings[alias]
+ backend = params["BACKEND"]
+ try:
+ backend_cls = import_string(backend)
+ except ImportError as e:
+ raise InvalidTaskBackend(f"Could not find backend '{backend}': {e}") from e
+ return backend_cls(alias=alias, params=params)
+
+
+task_backends = TaskBackendHandler()
+
+default_task_backend = ConnectionProxy(task_backends, DEFAULT_TASK_BACKEND_ALIAS)
diff --git a/django/tasks/backends/__init__.py b/django/tasks/backends/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/django/tasks/backends/__init__.py
diff --git a/django/tasks/backends/base.py b/django/tasks/backends/base.py
new file mode 100644
index 0000000000..32ae10018d
--- /dev/null
+++ b/django/tasks/backends/base.py
@@ -0,0 +1,138 @@
+from abc import ABCMeta, abstractmethod
+from inspect import iscoroutinefunction
+
+from asgiref.sync import sync_to_async
+
+from django.conf import settings
+from django.core import checks
+from django.db import connections
+from django.tasks import DEFAULT_TASK_QUEUE_NAME
+from django.tasks.base import (
+ DEFAULT_TASK_PRIORITY,
+ TASK_MAX_PRIORITY,
+ TASK_MIN_PRIORITY,
+ Task,
+)
+from django.tasks.exceptions import InvalidTask
+from django.utils import timezone
+from django.utils.inspect import get_func_args, is_module_level_function
+
+
+class BaseTaskBackend(metaclass=ABCMeta):
+ task_class = Task
+
+ # Does the backend support Tasks to be enqueued with the run_after
+ # attribute?
+ supports_defer = False
+
+ # Does the backend support coroutines to be enqueued?
+ supports_async_task = False
+
+ # Does the backend support results being retrieved (from any
+ # thread/process)?
+ supports_get_result = False
+
+ # Does the backend support executing Tasks in a given
+ # priority order?
+ supports_priority = False
+
+ def __init__(self, alias, params):
+ self.alias = alias
+ self.queues = set(params.get("QUEUES", [DEFAULT_TASK_QUEUE_NAME]))
+ self.enqueue_on_commit = bool(params.get("ENQUEUE_ON_COMMIT", True))
+ self.options = params.get("OPTIONS", {})
+
+ def _get_enqueue_on_commit_for_task(self, task):
+ return (
+ task.enqueue_on_commit
+ if task.enqueue_on_commit is not None
+ else self.enqueue_on_commit
+ )
+
+ def validate_task(self, task):
+ """
+ Determine whether the provided Task can be executed by the backend.
+ """
+ if not is_module_level_function(task.func):
+ raise InvalidTask("Task function must be defined at a module level.")
+
+ if not self.supports_async_task and iscoroutinefunction(task.func):
+ raise InvalidTask("Backend does not support async Tasks.")
+
+ task_func_args = get_func_args(task.func)
+ if task.takes_context and (
+ not task_func_args or task_func_args[0] != "context"
+ ):
+ raise InvalidTask(
+ "Task takes context but does not have a first argument of 'context'."
+ )
+
+ if not self.supports_priority and task.priority != DEFAULT_TASK_PRIORITY:
+ raise InvalidTask("Backend does not support setting priority of tasks.")
+ if (
+ task.priority < TASK_MIN_PRIORITY
+ or task.priority > TASK_MAX_PRIORITY
+ or int(task.priority) != task.priority
+ ):
+ raise InvalidTask(
+ f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
+ f"{TASK_MAX_PRIORITY}."
+ )
+
+ if not self.supports_defer and task.run_after is not None:
+ raise InvalidTask("Backend does not support run_after.")
+
+ if (
+ settings.USE_TZ
+ and task.run_after is not None
+ and not timezone.is_aware(task.run_after)
+ ):
+ raise InvalidTask("run_after must be an aware datetime.")
+
+ if self.queues and task.queue_name not in self.queues:
+ raise InvalidTask(f"Queue '{task.queue_name}' is not valid for backend.")
+
+ @abstractmethod
+ def enqueue(self, task, args, kwargs):
+ """Queue up a task to be executed."""
+
+ async def aenqueue(self, task, args, kwargs):
+ """Queue up a task function (or coroutine) to be executed."""
+ return await sync_to_async(self.enqueue, thread_sensitive=True)(
+ task=task, args=args, kwargs=kwargs
+ )
+
+ def get_result(self, result_id):
+ """
+ Retrieve a task result by id.
+
+ Raise TaskResultDoesNotExist if such result does not exist.
+ """
+ raise NotImplementedError(
+ "This backend does not support retrieving or refreshing results."
+ )
+
+ async def aget_result(self, result_id):
+ """See get_result()."""
+ return await sync_to_async(self.get_result, thread_sensitive=True)(
+ result_id=result_id
+ )
+
+ def check(self, **kwargs):
+ if self.enqueue_on_commit and not connections._settings:
+ yield checks.Error(
+ "ENQUEUE_ON_COMMIT cannot be used when no databases are configured.",
+ hint="Set ENQUEUE_ON_COMMIT to False",
+ id="tasks.E001",
+ )
+
+ elif (
+ self.enqueue_on_commit
+ and not connections["default"].features.supports_transactions
+ ):
+ yield checks.Error(
+ "ENQUEUE_ON_COMMIT cannot be used on a database which doesn't support "
+ "transactions.",
+ hint="Set ENQUEUE_ON_COMMIT to False",
+ id="tasks.E002",
+ )
diff --git a/django/tasks/backends/dummy.py b/django/tasks/backends/dummy.py
new file mode 100644
index 0000000000..93bb8f3ee4
--- /dev/null
+++ b/django/tasks/backends/dummy.py
@@ -0,0 +1,69 @@
+from copy import deepcopy
+from functools import partial
+
+from django.db import transaction
+from django.tasks.base import TaskResult, TaskResultStatus
+from django.tasks.exceptions import TaskResultDoesNotExist
+from django.tasks.signals import task_enqueued
+from django.utils import timezone
+from django.utils.crypto import get_random_string
+
+from .base import BaseTaskBackend
+
+
+class DummyBackend(BaseTaskBackend):
+ supports_defer = True
+ supports_async_task = True
+ supports_priority = True
+
+ def __init__(self, alias, params):
+ super().__init__(alias, params)
+ self.results = []
+
+ def _store_result(self, result):
+ object.__setattr__(result, "enqueued_at", timezone.now())
+ self.results.append(result)
+ task_enqueued.send(type(self), task_result=result)
+
+ def enqueue(self, task, args, kwargs):
+ self.validate_task(task)
+
+ result = TaskResult(
+ task=task,
+ id=get_random_string(32),
+ status=TaskResultStatus.READY,
+ enqueued_at=None,
+ started_at=None,
+ last_attempted_at=None,
+ finished_at=None,
+ args=args,
+ kwargs=kwargs,
+ backend=self.alias,
+ errors=[],
+ worker_ids=[],
+ )
+
+ if self._get_enqueue_on_commit_for_task(task) is not False:
+ transaction.on_commit(partial(self._store_result, result))
+ else:
+ self._store_result(result)
+
+ # Copy the task to prevent mutation issues.
+ return deepcopy(result)
+
+ def get_result(self, result_id):
+ # Results are only scoped to the current thread, hence
+ # supports_get_result is False.
+ try:
+ return next(result for result in self.results if result.id == result_id)
+ except StopIteration:
+ raise TaskResultDoesNotExist(result_id) from None
+
+ async def aget_result(self, result_id):
+ try:
+ return next(result for result in self.results if result.id == result_id)
+ except StopIteration:
+ raise TaskResultDoesNotExist(result_id) from None
+
+ def clear(self):
+ self.results.clear()
diff --git a/django/tasks/backends/immediate.py b/django/tasks/backends/immediate.py
new file mode 100644
index 0000000000..06b94d18ab
--- /dev/null
+++ b/django/tasks/backends/immediate.py
@@ -0,0 +1,100 @@
+import logging
+from functools import partial
+from traceback import format_exception
+
+from django.db import transaction
+from django.tasks.base import TaskContext, TaskError, TaskResult, TaskResultStatus
+from django.tasks.signals import task_enqueued, task_finished, task_started
+from django.utils import timezone
+from django.utils.crypto import get_random_string
+from django.utils.json import normalize_json
+
+from .base import BaseTaskBackend
+
+logger = logging.getLogger(__name__)
+
+
+class ImmediateBackend(BaseTaskBackend):
+ supports_async_task = True
+ supports_priority = True
+
+ def __init__(self, alias, params):
+ super().__init__(alias, params)
+ self.worker_id = get_random_string(32)
+
+ def _execute_task(self, task_result):
+ """
+ Execute the Task for the given TaskResult, mutating it with the
+ outcome.
+ """
+ object.__setattr__(task_result, "enqueued_at", timezone.now())
+ task_enqueued.send(type(self), task_result=task_result)
+
+ task = task_result.task
+ task_start_time = timezone.now()
+ object.__setattr__(task_result, "status", TaskResultStatus.RUNNING)
+ object.__setattr__(task_result, "started_at", task_start_time)
+ object.__setattr__(task_result, "last_attempted_at", task_start_time)
+ task_result.worker_ids.append(self.worker_id)
+ task_started.send(sender=type(self), task_result=task_result)
+
+ try:
+ if task.takes_context:
+ raw_return_value = task.call(
+ TaskContext(task_result=task_result),
+ *task_result.args,
+ **task_result.kwargs,
+ )
+ else:
+ raw_return_value = task.call(*task_result.args, **task_result.kwargs)
+
+ object.__setattr__(
+ task_result,
+ "_return_value",
+ normalize_json(raw_return_value),
+ )
+ except KeyboardInterrupt:
+ # If the user tried to terminate, let them
+ raise
+ except BaseException as e:
+ object.__setattr__(task_result, "finished_at", timezone.now())
+ exception_type = type(e)
+ task_result.errors.append(
+ TaskError(
+ exception_class_path=(
+ f"{exception_type.__module__}.{exception_type.__qualname__}"
+ ),
+ traceback="".join(format_exception(e)),
+ )
+ )
+ object.__setattr__(task_result, "status", TaskResultStatus.FAILED)
+ task_finished.send(type(self), task_result=task_result)
+ else:
+ object.__setattr__(task_result, "finished_at", timezone.now())
+ object.__setattr__(task_result, "status", TaskResultStatus.SUCCESSFUL)
+ task_finished.send(type(self), task_result=task_result)
+
+ def enqueue(self, task, args, kwargs):
+ self.validate_task(task)
+
+ task_result = TaskResult(
+ task=task,
+ id=get_random_string(32),
+ status=TaskResultStatus.READY,
+ enqueued_at=None,
+ started_at=None,
+ last_attempted_at=None,
+ finished_at=None,
+ args=args,
+ kwargs=kwargs,
+ backend=self.alias,
+ errors=[],
+ worker_ids=[],
+ )
+
+ if self._get_enqueue_on_commit_for_task(task) is not False:
+ transaction.on_commit(partial(self._execute_task, task_result))
+ else:
+ self._execute_task(task_result)
+
+ return task_result
diff --git a/django/tasks/base.py b/django/tasks/base.py
new file mode 100644
index 0000000000..905dbef597
--- /dev/null
+++ b/django/tasks/base.py
@@ -0,0 +1,253 @@
+from dataclasses import dataclass, field, replace
+from datetime import datetime
+from inspect import isclass, iscoroutinefunction
+from typing import Any, Callable, Dict, Optional
+
+from asgiref.sync import async_to_sync, sync_to_async
+
+from django.db.models.enums import TextChoices
+from django.utils.json import normalize_json
+from django.utils.module_loading import import_string
+from django.utils.translation import pgettext_lazy
+
+from .exceptions import TaskResultMismatch
+
+DEFAULT_TASK_BACKEND_ALIAS = "default"
+DEFAULT_TASK_PRIORITY = 0
+DEFAULT_TASK_QUEUE_NAME = "default"
+TASK_MAX_PRIORITY = 100
+TASK_MIN_PRIORITY = -100
+TASK_REFRESH_ATTRS = {
+ "errors",
+ "_return_value",
+ "finished_at",
+ "started_at",
+ "last_attempted_at",
+ "status",
+ "enqueued_at",
+ "worker_ids",
+}
+
+
+class TaskResultStatus(TextChoices):
+ # The Task has just been enqueued, or is ready to be executed again.
+ READY = ("READY", pgettext_lazy("Task", "Ready"))
+ # The Task is currently running.
+ RUNNING = ("RUNNING", pgettext_lazy("Task", "Running"))
+ # The Task raised an exception during execution, or was unable to start.
+ FAILED = ("FAILED", pgettext_lazy("Task", "Failed"))
+ # The Task has finished running successfully.
+ SUCCESSFUL = ("SUCCESSFUL", pgettext_lazy("Task", "Successful"))
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class Task:
+ priority: int
+ func: Callable # The Task function.
+ backend: str
+ queue_name: str
+ run_after: Optional[datetime] # The earliest this Task will run.
+
+ # Whether the Task will be enqueued when the current transaction commits,
+ # immediately, or whatever the backend decides.
+ enqueue_on_commit: Optional[bool]
+
+ # Whether the Task receives the Task context when executed.
+ takes_context: bool = False
+
+ def __post_init__(self):
+ self.get_backend().validate_task(self)
+
+ @property
+ def name(self):
+ return self.func.__name__
+
+ def using(
+ self,
+ *,
+ priority=None,
+ queue_name=None,
+ run_after=None,
+ backend=None,
+ ):
+ """Create a new Task with modified defaults."""
+
+ changes = {}
+ if priority is not None:
+ changes["priority"] = priority
+ if queue_name is not None:
+ changes["queue_name"] = queue_name
+ if run_after is not None:
+ changes["run_after"] = run_after
+ if backend is not None:
+ changes["backend"] = backend
+ return replace(self, **changes)
+
+ def enqueue(self, *args, **kwargs):
+ """Queue up the Task to be executed."""
+ return self.get_backend().enqueue(self, args, kwargs)
+
+ async def aenqueue(self, *args, **kwargs):
+ """Queue up the Task to be executed."""
+ return await self.get_backend().aenqueue(self, args, kwargs)
+
+ def get_result(self, result_id):
+ """
+ Retrieve a task result by id.
+
+ Raise TaskResultDoesNotExist if such result does not exist, or raise
+ TaskResultMismatch if the result exists but belongs to another Task.
+ """
+ result = self.get_backend().get_result(result_id)
+ if result.task.func != self.func:
+ raise TaskResultMismatch(
+ f"Task does not match (received {result.task.module_path!r})"
+ )
+ return result
+
+ async def aget_result(self, result_id):
+ """See get_result()."""
+ result = await self.get_backend().aget_result(result_id)
+ if result.task.func != self.func:
+ raise TaskResultMismatch(
+ f"Task does not match (received {result.task.module_path!r})"
+ )
+ return result
+
+ def call(self, *args, **kwargs):
+ if iscoroutinefunction(self.func):
+ return async_to_sync(self.func)(*args, **kwargs)
+ return self.func(*args, **kwargs)
+
+ async def acall(self, *args, **kwargs):
+ if iscoroutinefunction(self.func):
+ return await self.func(*args, **kwargs)
+ return await sync_to_async(self.func)(*args, **kwargs)
+
+ def get_backend(self):
+ from . import task_backends
+
+ return task_backends[self.backend]
+
+ @property
+ def module_path(self):
+ return f"{self.func.__module__}.{self.func.__qualname__}"
+
+
+def task(
+ function=None,
+ *,
+ priority=DEFAULT_TASK_PRIORITY,
+ queue_name=DEFAULT_TASK_QUEUE_NAME,
+ backend=DEFAULT_TASK_BACKEND_ALIAS,
+ enqueue_on_commit=None,
+ takes_context=False,
+):
+ from . import task_backends
+
+ def wrapper(f):
+ return task_backends[backend].task_class(
+ priority=priority,
+ func=f,
+ queue_name=queue_name,
+ backend=backend,
+ enqueue_on_commit=enqueue_on_commit,
+ takes_context=takes_context,
+ run_after=None,
+ )
+
+ if function:
+ return wrapper(function)
+ return wrapper
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class TaskError:
+ exception_class_path: str
+ traceback: str
+
+ @property
+ def exception_class(self):
+ # Lazy resolve the exception class.
+ exception_class = import_string(self.exception_class_path)
+
+ if not isclass(exception_class) or not issubclass(
+ exception_class, BaseException
+ ):
+ raise ValueError(
+ f"{self.exception_class_path!r} does not reference a valid exception."
+ )
+ return exception_class
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class TaskResult:
+ task: Task
+
+ id: str # Unique identifier for the task result.
+ status: TaskResultStatus
+ enqueued_at: Optional[datetime] # Time the task was enqueued.
+ started_at: Optional[datetime] # Time the task was started.
+ finished_at: Optional[datetime] # Time the task was finished.
+
+ # Time the task was last attempted to be run.
+ last_attempted_at: Optional[datetime]
+
+ args: list # Arguments to pass to the task function.
+ kwargs: Dict[str, Any] # Keyword arguments to pass to the task function.
+ backend: str
+ errors: list[TaskError] # Errors raised when running the task.
+ worker_ids: list[str] # Workers which have processed the task.
+
+ _return_value: Optional[Any] = field(init=False, default=None)
+
+ def __post_init__(self):
+ object.__setattr__(self, "args", normalize_json(self.args))
+ object.__setattr__(self, "kwargs", normalize_json(self.kwargs))
+
+ @property
+ def return_value(self):
+ """
+ The return value of the task.
+
+ If the task didn't succeed, an exception is raised.
+ This is to distinguish against the task returning None.
+ """
+ if self.status == TaskResultStatus.SUCCESSFUL:
+ return self._return_value
+ elif self.status == TaskResultStatus.FAILED:
+ raise ValueError("Task failed")
+ else:
+ raise ValueError("Task has not finished yet")
+
+ @property
+ def is_finished(self):
+ return self.status in {TaskResultStatus.FAILED, TaskResultStatus.SUCCESSFUL}
+
+ @property
+ def attempts(self):
+ return len(self.worker_ids)
+
+ def refresh(self):
+ """Reload the cached task data from the task store."""
+ refreshed_task = self.task.get_backend().get_result(self.id)
+
+ for attr in TASK_REFRESH_ATTRS:
+ object.__setattr__(self, attr, getattr(refreshed_task, attr))
+
+ async def arefresh(self):
+ """
+ Reload the cached task data from the task store
+ """
+ refreshed_task = await self.task.get_backend().aget_result(self.id)
+ for attr in TASK_REFRESH_ATTRS:
+ object.__setattr__(self, attr, getattr(refreshed_task, attr))
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class TaskContext:
+ task_result: TaskResult
+
+ @property
+ def attempt(self):
+ return self.task_result.attempts
diff --git a/django/tasks/checks.py b/django/tasks/checks.py
new file mode 100644
index 0000000000..f0b26c5b0e
--- /dev/null
+++ b/django/tasks/checks.py
@@ -0,0 +1,11 @@
+from django.core import checks
+
+
+@checks.register
+def check_tasks(app_configs=None, **kwargs):
+ """Checks all registered Task backends."""
+
+ from . import task_backends
+
+ for backend in task_backends.all():
+ yield from backend.check()
diff --git a/django/tasks/exceptions.py b/django/tasks/exceptions.py
new file mode 100644
index 0000000000..91bd8f4cd5
--- /dev/null
+++ b/django/tasks/exceptions.py
@@ -0,0 +1,21 @@
+from django.core.exceptions import ImproperlyConfigured
+
+
+class TaskException(Exception):
+ """Base class for task-related exceptions. Do not raise directly."""
+
+
+class InvalidTask(TaskException):
+ """The provided Task is invalid."""
+
+
+class InvalidTaskBackend(ImproperlyConfigured):
+ """The provided Task backend is invalid."""
+
+
+class TaskResultDoesNotExist(TaskException):
+ """The requested TaskResult does not exist."""
+
+
+class TaskResultMismatch(TaskException):
+ """The requested TaskResult is invalid."""
diff --git a/django/tasks/signals.py b/django/tasks/signals.py
new file mode 100644
index 0000000000..288fe08e32
--- /dev/null
+++ b/django/tasks/signals.py
@@ -0,0 +1,64 @@
+import logging
+import sys
+
+from asgiref.local import Local
+
+from django.core.signals import setting_changed
+from django.dispatch import Signal, receiver
+
+from .base import TaskResultStatus
+
+logger = logging.getLogger("django.tasks")
+
+task_enqueued = Signal()
+task_finished = Signal()
+task_started = Signal()
+
+
+@receiver(setting_changed)
+def clear_tasks_handlers(*, setting, **kwargs):
+ """Reset the connection handler whenever the settings change."""
+ if setting == "TASKS":
+ from . import task_backends
+
+ task_backends._settings = task_backends.settings = (
+ task_backends.configure_settings(None)
+ )
+ task_backends._connections = Local()
+
+
+@receiver(task_enqueued)
+def log_task_enqueued(sender, task_result, **kwargs):
+ logger.debug(
+ "Task id=%s path=%s enqueued backend=%s",
+ task_result.id,
+ task_result.task.module_path,
+ task_result.backend,
+ )
+
+
+@receiver(task_started)
+def log_task_started(sender, task_result, **kwargs):
+ logger.info(
+ "Task id=%s path=%s state=%s",
+ task_result.id,
+ task_result.task.module_path,
+ task_result.status,
+ )
+
+
+@receiver(task_finished)
+def log_task_finished(sender, task_result, **kwargs):
+ logger.log(
+ (
+ logging.ERROR
+ if task_result.status == TaskResultStatus.FAILED
+ else logging.INFO
+ ),
+ "Task id=%s path=%s state=%s",
+ task_result.id,
+ task_result.task.module_path,
+ task_result.status,
+ # Signal is sent inside exception handlers, so exc_info() is available.
+ exc_info=sys.exc_info(),
+ )
diff --git a/django/utils/inspect.py b/django/utils/inspect.py
index 4e065f0347..31f3cf994b 100644
--- a/django/utils/inspect.py
+++ b/django/utils/inspect.py
@@ -74,3 +74,13 @@ def method_has_no_args(meth):
def func_supports_parameter(func, name):
return any(param.name == name for param in _get_callable_parameters(func))
+
+
+def is_module_level_function(func):
+ if not inspect.isfunction(func) or inspect.isbuiltin(func):
+ return False
+
+ if "<locals>" in func.__qualname__:
+ return False
+
+ return True
diff --git a/django/utils/json.py b/django/utils/json.py
new file mode 100644
index 0000000000..e7389b70ed
--- /dev/null
+++ b/django/utils/json.py
@@ -0,0 +1,19 @@
+from collections.abc import Mapping, Sequence
+
+
+def normalize_json(obj):
+ """Recursively normalize an object into JSON-compatible types."""
+ match obj:
+ case Mapping():
+ return {normalize_json(k): normalize_json(v) for k, v in obj.items()}
+ case bytes():
+ try:
+ return obj.decode("utf-8")
+ except UnicodeDecodeError:
+ raise ValueError(f"Unsupported value: {type(obj)}")
+ case str() | int() | float() | bool() | None:
+ return obj
+ case Sequence(): # str and bytes were already handled.
+ return [normalize_json(v) for v in obj]
+ case _: # Other types can't be serialized to JSON
+ raise TypeError(f"Unsupported type: {type(obj)}")
diff --git a/docs/ref/checks.txt b/docs/ref/checks.txt
index e1ea5bc753..138db8708e 100644
--- a/docs/ref/checks.txt
+++ b/docs/ref/checks.txt
@@ -597,6 +597,14 @@ Signals
a lazy reference to the sender ``<app label>.<model>``, but app
``<app label>`` isn't installed or doesn't provide model ``<model>``.
+Tasks
+-----
+
+* **tasks.E001**: ``ENQUEUE_ON_COMMIT`` cannot be used when no databases are
+ configured.
+* **tasks.E002**: ``ENQUEUE_ON_COMMIT`` cannot be used on a database which
+ doesn't support transactions.
+
Templates
---------
diff --git a/docs/ref/index.txt b/docs/ref/index.txt
index 3741b82aad..af32b131cb 100644
--- a/docs/ref/index.txt
+++ b/docs/ref/index.txt
@@ -26,6 +26,7 @@ API Reference
schema-editor
settings
signals
+ tasks
templates/index
template-response
unicode
diff --git a/docs/ref/settings.txt b/docs/ref/settings.txt
index c16547e72a..54957a726a 100644
--- a/docs/ref/settings.txt
+++ b/docs/ref/settings.txt
@@ -2766,6 +2766,82 @@ backend definition in :setting:`STORAGES`.
Defining this setting overrides the default value and is *not* merged with
it.
+.. setting:: TASKS
+
+``TASKS``
+---------
+
+.. versionadded:: 6.0
+
+Default::
+
+ {
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ }
+ }
+
+A dictionary containing the settings for all Task backends to be used with
+Django. It is a nested dictionary whose contents maps backend aliases to a
+dictionary containing the options for each backend.
+
+The :setting:`TASKS` setting must configure a ``default`` backend; any number
+of additional backends may also be specified. Depending on which backend is
+used, other options may be required. The following options are available as
+standard.
+
+.. setting:: TASKS-BACKEND
+
+``BACKEND``
+~~~~~~~~~~~
+
+Default: ``''`` (Empty string)
+
+The Tasks backend to use. The built-in backends are:
+
+* ``'django.tasks.backends.dummy.DummyBackend'``
+* ``'django.tasks.backends.immediate.ImmediateBackend'``
+
+You can use a backend that doesn't ship with Django by setting
+:setting:`BACKEND <TASKS-BACKEND>` to a fully-qualified path of a backend
+class (i.e. ``mypackage.backends.whatever.WhateverBackend``).
+
+.. setting:: TASKS-ENQUEUE_ON_COMMIT
+
+``ENQUEUE_ON_COMMIT``
+~~~~~~~~~~~~~~~~~~~~~
+
+Default: ``True``
+
+Whether to enqueue a Task only after the current transaction, if any, commits
+successfully, instead of enqueueing immediately.
+
+This can also be configured on a per-Task basis.
+
+See :ref:`Task transactions <task-transactions>` for more information.
+
+.. setting:: TASKS-QUEUES
+
+``QUEUES``
+~~~~~~~~~~
+
+Default: ``["default"]``
+
+Specify the queue names supported by the backend. This can be used to ensure
+Tasks aren't enqueued to queues which do not exist.
+
+To disable queue name validation, set to an empty list (``[]``).
+
+.. setting:: TASKS-OPTIONS
+
+``OPTIONS``
+~~~~~~~~~~~
+
+Default: ``{}``
+
+Extra parameters to pass to the Task backend. Available parameters vary
+depending on the Task backend.
+
.. setting:: TEMPLATES
``TEMPLATES``
diff --git a/docs/ref/signals.txt b/docs/ref/signals.txt
index 6dc5122f96..82b92e12c2 100644
--- a/docs/ref/signals.txt
+++ b/docs/ref/signals.txt
@@ -703,3 +703,60 @@ Arguments sent with this signal:
The database connection that was opened. This can be used in a
multiple-database configuration to differentiate connection signals
from different databases.
+
+Tasks signals
+=============
+
+.. versionadded:: 6.0
+
+Signals sent by the :doc:`tasks </ref/tasks>` framework.
+
+``task_enqueued``
+-----------------
+
+.. data:: django.tasks.signals.task_enqueued
+ :module:
+
+Sent once a Task has been enqueued. If
+:attr:`django.tasks.Task.enqueue_on_commit` is set, the signal is only sent
+once the transaction commits successfully.
+
+Arguments sent with this signal:
+
+``sender``
+ The backend class which the Task was enqueued on to.
+
+``task_result``
+ The enqueued :class:`TaskResult <django.tasks.TaskResult>`.
+
+``task_started``
+----------------
+
+.. data:: django.tasks.signals.task_started
+ :module:
+
+Sent when a Task has started executing.
+
+Arguments sent with this signal:
+
+``sender``
+ The backend class which the Task was enqueued on to.
+
+``task_result``
+ The started :class:`TaskResult <django.tasks.TaskResult>`.
+
+``task_finished``
+-----------------
+
+.. data:: django.tasks.signals.task_finished
+ :module:
+
+Sent once a Task has finished executing, successfully or otherwise.
+
+Arguments sent with this signal:
+
+``sender``
+ The backend class which the Task was enqueued on to.
+
+``task_result``
+ The finished :class:`TaskResult <django.tasks.TaskResult>`.
diff --git a/docs/ref/tasks.txt b/docs/ref/tasks.txt
new file mode 100644
index 0000000000..3134243d40
--- /dev/null
+++ b/docs/ref/tasks.txt
@@ -0,0 +1,444 @@
+=====
+Tasks
+=====
+
+.. versionadded:: 6.0
+
+.. module:: django.tasks
+ :synopsis: Django's built-in background Task system.
+
+Task definition
+===============
+
+The ``task`` decorator
+----------------------
+
+.. function:: task(*, priority=0, queue_name="default", backend="default", enqueue_on_commit=None, takes_context=False)
+
+ The ``@task`` decorator defines a :class:`Task` instance. This has the
+ following optional arguments:
+
+ * ``priority``: Sets the :attr:`~Task.priority` of the ``Task``. Defaults
+ to 0.
+ * ``queue_name``: Sets the :attr:`~Task.queue_name` of the ``Task``.
+ Defaults to ``"default"``.
+ * ``backend``: Sets the :attr:`~Task.backend` of the ``Task``. Defaults to
+ ``"default"``.
+ * ``enqueue_on_commit``: Sets :attr:`~Task.enqueue_on_commit` for the
+ ``Task``. Defaults to ``None``.
+ * ``takes_context``: Controls whether the ``Task`` function accepts a
+ :class:`TaskContext`. Defaults to ``False``. See :ref:`Task context
+ <task-context>` for details.
+
+ If the defined ``Task`` is not valid according to the backend,
+ :exc:`~django.tasks.exceptions.InvalidTask` is raised.
+
+ See :ref:`defining tasks <defining-tasks>` for usage examples.
+
+``Task``
+--------
+
+.. class:: Task
+
+ Represents a Task to be run in the background. Tasks should be defined
+ using the :func:`task` decorator.
+
+ Attributes of ``Task`` cannot be modified. See :ref:`modifying Tasks
+ <modifying-tasks>` for details.
+
+ .. attribute:: Task.priority
+
+ The priority of the ``Task``. Priorities must be between -100 and 100,
+ where larger numbers are higher priority, and will be run sooner.
+
+ The backend must have :attr:`.supports_priority` set to ``True`` to use
+ this feature.
+
+ .. attribute:: Task.backend
+
+ The alias of the backend the ``Task`` should be enqueued to. This must
+ match a backend defined in :setting:`BACKEND <TASKS-BACKEND>`.
+
+ .. attribute:: Task.queue_name
+
+ The name of the queue the ``Task`` will be enqueued on to. Defaults to
+ ``"default"``. This must match a queue defined in
+ :setting:`QUEUES <TASKS-QUEUES>`, unless
+ :setting:`QUEUES <TASKS-QUEUES>` is set to ``[]``.
+
+ .. attribute:: Task.run_after
+
+ The earliest time the ``Task`` will be executed. This can be a
+ :class:`timedelta <datetime.timedelta>`, which is used relative to the
+ current time, a timezone-aware :class:`datetime <datetime.datetime>`,
+ or ``None`` if not constrained. Defaults to ``None``.
+
+ The backend must have :attr:`.supports_defer` set to ``True`` to use
+ this feature. Otherwise,
+ :exc:`~django.tasks.exceptions.InvalidTask` is raised.
+
+ .. attribute:: Task.enqueue_on_commit
+
+ Whether the ``Task`` should be enqueued when the transaction commits
+ successfully, or immediately. Defaults to :setting:`ENQUEUE_ON_COMMIT
+ <TASKS-ENQUEUE_ON_COMMIT>` for the backend.
+
+ See :ref:`Task transactions <task-transactions>` for more information.
+
+ .. attribute:: Task.name
+
+ The name of the function decorated with :func:`task`. This name is not
+ necessarily unique.
+
+ .. method:: Task.using(*, priority=None, backend=None, queue_name=None, run_after=None)
+
+ Creates a new ``Task`` with modified defaults. The existing ``Task`` is
+ left unchanged.
+
+ ``using`` allows modifying the following attributes:
+
+ * :attr:`priority <Task.priority>`
+ * :attr:`backend <Task.backend>`
+ * :attr:`queue_name <Task.queue_name>`
+ * :attr:`run_after <Task.run_after>`
+
+ See :ref:`modifying Tasks <modifying-tasks>` for usage examples.
+
+ .. method:: Task.enqueue(*args, **kwargs)
+
+ Enqueues the ``Task`` to the ``Task`` backend for later execution.
+
+ Arguments are passed to the ``Task``'s function after a round-trip
+ through a :func:`json.dumps`/:func:`json.loads` cycle. Hence, all
+ arguments must be JSON-serializable and preserve their type after the
+ round-trip.
+
+ If the ``Task`` is not valid according to the backend,
+ :exc:`~django.tasks.exceptions.InvalidTask` is raised.
+
+ See :ref:`enqueueing Tasks <enqueueing-tasks>` for usage examples.
+
+ .. method:: Task.aenqueue(*args, **kwargs)
+
+ The ``async`` variant of :meth:`enqueue <Task.enqueue>`.
+
+ .. method:: Task.get_result(result_id)
+
+ Retrieves a result by its id.
+
+ If the result does not exist, :exc:`TaskResultDoesNotExist
+ <django.tasks.exceptions.TaskResultDoesNotExist>` is raised. If the
+ result is not the same type as the current Task,
+ :exc:`TaskResultMismatch <django.tasks.exceptions.TaskResultMismatch>`
+ is raised. If the backend does not support ``get_result()``,
+ :exc:`NotImplementedError` is raised.
+
+ .. method:: Task.aget_result(*args, **kwargs)
+
+ The ``async`` variant of :meth:`get_result <Task.get_result>`.
+
+Task context
+============
+
+.. class:: TaskContext
+
+ Contains context for the running :class:`Task`. Context only passed to a
+ ``Task`` if it was defined with ``takes_context=True``.
+
+ Attributes of ``TaskContext`` cannot be modified.
+
+ .. attribute:: TaskContext.task_result
+
+ The :class:`TaskResult` currently being run.
+
+ .. attribute:: TaskContext.attempt
+
+ The number of the current execution attempts for this Task, starting at
+ 1.
+
+Task results
+============
+
+.. class:: TaskResultStatus
+
+ An Enum representing the status of a :class:`TaskResult`.
+
+ .. attribute:: TaskResultStatus.READY
+
+ The :class:`Task` has just been enqueued, or is ready to be executed
+ again.
+
+ .. attribute:: TaskResultStatus.RUNNING
+
+ The :class:`Task` is currently being executed.
+
+ .. attribute:: TaskResultStatus.FAILED
+
+ The :class:`Task` raised an exception during execution, or was unable
+ to start.
+
+ .. attribute:: TaskResultStatus.SUCCESSFUL
+
+ The :class:`Task` has finished executing successfully.
+
+.. class:: TaskResult
+
+ The ``TaskResult`` stores the information about a specific execution of a
+ :class:`Task`.
+
+ Attributes of ``TaskResult`` cannot be modified.
+
+ .. attribute:: TaskResult.task
+
+ The :class:`Task` the result was enqueued for.
+
+ .. attribute:: TaskResult.id
+
+ A unique identifier for the result, which can be passed to
+ :meth:`Task.get_result`.
+
+ The format of the id will depend on the backend being used. Task result
+ ids are always strings less than 64 characters.
+
+ See :ref:`Task results <task-results>` for more details.
+
+ .. attribute:: TaskResult.status
+
+ The :class:`status <TaskResultStatus>` of the result.
+
+ .. attribute:: TaskResult.enqueued_at
+
+ The time when the ``Task`` was enqueued.
+
+ If :attr:`Task.enqueue_on_commit` was set, this is the time the
+ transaction committed.
+
+ .. attribute:: TaskResult.started_at
+
+ The time when the ``Task`` began execution, on its first attempt.
+
+ .. attribute:: TaskResult.last_attempted_at
+
+ The time when the most recent ``Task`` run began execution.
+
+ .. attribute:: TaskResult.finished_at
+
+ The time when the ``Task`` finished execution, whether it failed or
+ succeeded.
+
+ .. attribute:: TaskResult.backend
+
+ The backend the result is from.
+
+ .. attribute:: TaskResult.errors
+
+ A list of :class:`TaskError` instances for the errors raised as part of
+ each execution of the Task.
+
+ .. attribute:: TaskResult.return_value
+
+ The return value from the ``Task`` function.
+
+ If the ``Task`` did not finish successfully, :exc:`ValueError` is
+ raised.
+
+ See :ref:`return values <task-return-values>` for usage examples.
+
+ .. method:: TaskResult.refresh
+
+ Refresh the result's attributes from the queue store.
+
+ .. method:: TaskResult.arefresh
+
+ The ``async`` variant of :meth:`TaskResult.refresh`.
+
+ .. attribute:: TaskResult.is_finished
+
+ Whether the ``Task`` has finished (successfully or not).
+
+ .. attribute:: TaskResult.attempts
+
+ The number of times the Task has been run.
+
+ If the task is currently running, it does not count as an attempt.
+
+ .. attribute:: TaskResult.worker_ids
+
+ The ids of the workers which have executed the Task.
+
+
+Task errors
+-----------
+
+.. class:: TaskError
+
+ Contains information about the error raised during the execution of a
+ ``Task``.
+
+ .. attribute:: TaskError.traceback
+
+ The traceback (as a string) from the raised exception when the ``Task``
+ failed.
+
+ .. attribute:: TaskError.exception_class
+
+ The exception class raised when executing the ``Task``.
+
+Backends
+========
+
+Base backend
+------------
+
+.. module:: django.tasks.backends.base
+
+.. class:: BaseTaskBackend
+
+ ``BaseTaskBackend`` is the parent class for all Task backends.
+
+ .. attribute:: BaseTaskBackend.options
+
+ A dictionary of extra parameters for the Task backend. These are
+ provided using the :setting:`OPTIONS <TASKS-OPTIONS>` setting.
+
+ .. method:: BaseTaskBackend.enqueue(task, args, kwargs)
+
+ Task backends which subclass ``BaseTaskBackend`` should implement this
+ method as a minimum.
+
+ When implemented, ``enqueue()`` enqueues the ``task``, a :class:`.Task`
+ instance, for later execution. ``args`` are the positional arguments
+ and ``kwargs`` are the keyword arguments to be passed to the ``task``.
+ Returns a :class:`~django.tasks.TaskResult`.
+
+ .. method:: BaseTaskBackend.aenqueue(task, args, kwargs)
+
+ The ``async`` variant of :meth:`BaseTaskBackend.enqueue`.
+
+ .. method:: BaseTaskBackend.get_result(result_id)
+
+ Retrieve a result by its id. If the result does not exist,
+ :exc:`TaskResultDoesNotExist
+ <django.tasks.exceptions.TaskResultDoesNotExist>` is raised.
+
+ If the backend does not support ``get_result()``,
+ :exc:`NotImplementedError` is raised.
+
+ .. method:: BaseTaskBackend.aget_result(result_id)
+
+ The ``async`` variant of :meth:`BaseTaskBackend.get_result`.
+
+ .. method:: BaseTaskBackend.validate_task(task)
+
+ Validates whether the provided ``Task`` is able to be enqueued using
+ the backend. If the Task is not valid,
+ :exc:`InvalidTask <django.tasks.exceptions.InvalidTask>`
+ is raised.
+
+Feature flags
+~~~~~~~~~~~~~
+
+Some backends may not support all features Django provides. It's possible to
+identify the supported functionality of a backend, and potentially change
+behavior accordingly.
+
+.. attribute:: BaseTaskBackend.supports_defer
+
+ Whether the backend supports enqueueing Tasks to be executed after a
+ specific time using the :attr:`~django.tasks.Task.run_after` attribute.
+
+.. attribute:: BaseTaskBackend.supports_async_task
+
+ Whether the backend supports enqueueing async functions (coroutines).
+
+.. attribute:: BaseTaskBackend.supports_get_result
+
+ Whether the backend supports retrieving ``Task`` results from another
+ thread after they have been enqueued.
+
+.. attribute:: BaseTaskBackend.supports_priority
+
+ Whether the backend supports executing Tasks as ordered by their
+ :attr:`~django.tasks.Task.priority`.
+
+The below table notes which of the :ref:`built-in backends
+<task-available-backends>` support which features:
+
+============================ ======================= ===========================
+Feature :class:`.DummyBackend` :class:`.ImmediateBackend`
+============================ ======================= ===========================
+:attr:`.supports_defer` Yes No
+:attr:`.supports_async_task` Yes Yes
+:attr:`.supports_get_result` No No [#fnimmediateresult]_
+:attr:`.supports_priority` Yes [#fndummypriority]_ Yes [#fnimmediatepriority]_
+============================ ======================= ===========================
+
+.. _task-available-backends:
+
+Available backends
+------------------
+
+Immediate backend
+~~~~~~~~~~~~~~~~~
+
+.. module:: django.tasks.backends.immediate
+
+.. class:: ImmediateBackend
+
+ The :ref:`immediate backend <immediate-task-backend>` executes Tasks
+ immediately, rather than in the background.
+
+Dummy backend
+~~~~~~~~~~~~~
+
+.. module:: django.tasks.backends.dummy
+
+.. class:: DummyBackend
+
+ The :ref:`dummy backend <dummy-task-backend>` does not execute enqueued
+ Tasks. Instead, it stores task results for later inspection.
+
+ .. attribute:: DummyBackend.results
+
+ A list of results for the enqueued Tasks, in the order they were
+ enqueued.
+
+ .. method:: DummyBackend.clear
+
+ Clears the list of stored results.
+
+Exceptions
+==========
+
+.. module:: django.tasks.exceptions
+
+.. exception:: InvalidTask
+
+ Raised when the :class:`.Task` attempting to be enqueued
+ is invalid.
+
+.. exception:: InvalidTaskBackend
+
+ Raised when the requested :class:`.BaseTaskBackend` is invalid.
+
+.. exception:: TaskResultDoesNotExist
+
+ Raised by :meth:`~django.tasks.backends.base.BaseTaskBackend.get_result`
+ when the provided ``result_id`` does not exist.
+
+.. exception:: TaskResultMismatch
+
+ Raised by :meth:`~django.tasks.Task.get_result` when the provided
+ ``result_id`` is for a different Task than the current Task.
+
+.. rubric:: Footnotes
+.. [#fnimmediateresult] The :class:`.ImmediateBackend` doesn't officially
+ support ``get_result()``, despite implementing the API, since the result
+ cannot be retrieved from a different thread.
+.. [#fndummypriority] The :class:`.DummyBackend` has ``supports_priority=True``
+ so that it can be used as a drop-in replacement in tests. Since this
+ backend never executes Tasks, the ``priority`` value has no effect.
+.. [#fnimmediatepriority] The :class:`.ImmediateBackend` has
+ ``supports_priority=True`` so that it can be used as a drop-in replacement
+ in tests. Because Tasks run as soon as they are scheduled, the ``priority``
+ value has no effect.
diff --git a/docs/releases/6.0.txt b/docs/releases/6.0.txt
index fba0935a2b..8f0bf321a5 100644
--- a/docs/releases/6.0.txt
+++ b/docs/releases/6.0.txt
@@ -112,6 +112,45 @@ A `migration guide`_ is available if you're updating from the
.. _migration guide: https://github.com/carltongibson/django-template-partials/blob/main/Migration.md
+Background Tasks
+----------------
+
+Django now includes a built-in Tasks framework for running code outside the
+HTTP request–response cycle. This enables offloading work, such as sending
+emails or processing data, to background workers.
+
+Tasks are defined using the :func:`~django.tasks.task` decorator::
+
+ from django.core.mail import send_mail
+ from django.tasks import task
+
+
+ @task
+ def email_users(emails, subject, message):
+ return send_mail(subject, message, None, emails)
+
+Once defined, tasks can be enqueued through a configured backend::
+
+ email_users.enqueue(
+ emails=["user@example.com"],
+ subject="You have a message",
+ message="Hello there!",
+ )
+
+Backends are configured via the :setting:`TASKS` setting. Django provides
+two built-in backends, primarily for development and testing:
+
+* :class:`~django.tasks.backends.immediate.ImmediateBackend`: executes tasks
+ immediately in the same process.
+* :class:`~django.tasks.backends.dummy.DummyBackend`: stores tasks without
+ running them, leaving results in the
+ :attr:`~django.tasks.TaskResultStatus.READY` state.
+
+Django only handles task creation and queuing; it does not provide a worker
+mechanism to run tasks. Execution must be managed by external infrastructure,
+such as a separate process or service. See :doc:`/topics/tasks` for an
+overview, and the :doc:`Tasks reference </ref/tasks>` for API details.
+
Minor features
--------------
diff --git a/docs/spelling_wordlist b/docs/spelling_wordlist
index 864b99f84a..2898f85d5b 100644
--- a/docs/spelling_wordlist
+++ b/docs/spelling_wordlist
@@ -152,6 +152,7 @@ editability
encodings
Endian
Enero
+enqueueing
enum
environ
esque
diff --git a/docs/topics/index.txt b/docs/topics/index.txt
index 4f837c81e2..59484d9799 100644
--- a/docs/topics/index.txt
+++ b/docs/topics/index.txt
@@ -33,3 +33,4 @@ Introductions to all the key parts of Django you'll need to know:
checks
external-packages
async
+ tasks
diff --git a/docs/topics/tasks.txt b/docs/topics/tasks.txt
new file mode 100644
index 0000000000..17c233d595
--- /dev/null
+++ b/docs/topics/tasks.txt
@@ -0,0 +1,438 @@
+========================
+Django's Tasks framework
+========================
+
+.. versionadded:: 6.0
+
+For a web application, there's often more than just turning HTTP requests into
+HTTP responses. For some functionality, it may be beneficial to run code
+outside the request-response cycle.
+
+That's where background Tasks come in.
+
+Background Tasks can offload work to be run outside the request-response cycle,
+to be run elsewhere, potentially at a later date. This keeps requests fast,
+reduces latency, and improves the user experience. For example, a user
+shouldn't have to wait for an email to send before their page finishes loading.
+
+Django's new Tasks framework makes it easy to define and enqueue such work. It
+does not provide a worker mechanism to run Tasks. The actual execution must be
+handled by infrastructure outside Django, such as a separate process or
+service.
+
+Background Task fundamentals
+============================
+
+When work needs to be done in the background, Django creates a ``Task``, which
+is stored in the Queue Store. This ``Task`` contains all the metadata needed to
+execute it, as well as a unique identifier for Django to retrieve the result
+later.
+
+A Worker will look at the Queue Store for new Tasks to run. When a new Task is
+added, a Worker claims the Task, executes it, and saves the status and result
+back to the Queue Store. These workers run outside the request-response
+lifecycle.
+
+.. _configuring-a-task-backend:
+
+Configuring a Task backend
+==========================
+
+The Task backend determines how and where Tasks are stored for execution and
+how they are executed. Different Task backends have different characteristics
+and configuration options, which may impact the performance and reliability of
+your application. Django comes with a number of :ref:`built-in backends
+<task-available-backends>`. Django does not provide a generic way to execute
+Tasks, only enqueue them.
+
+Task backends are configured using the :setting:`TASKS` setting in your
+settings file. Whilst most applications will only need a single backend,
+multiple are supported.
+
+.. _immediate-task-backend:
+
+Immediate execution
+-------------------
+
+This is the default backend if another is not specified in your settings file.
+The :class:`.ImmediateBackend` runs enqueued Tasks immediately, rather than in
+the background. This allows background Task functionality to be slowly added to
+an application, before the required infrastructure is available.
+
+To use it, set :setting:`BACKEND <TASKS-BACKEND>` to
+``"django.tasks.backends.immediate.ImmediateBackend"``::
+
+ TASKS = {"default": {"BACKEND": "django.tasks.backends.immediate.ImmediateBackend"}}
+
+The :class:`.ImmediateBackend` may also be useful in tests, to bypass the need
+to run a real background worker in your tests.
+
+.. admonition:: ``ImmediateBackend`` and ``ENQUEUE_ON_COMMIT``
+
+ When :setting:`ENQUEUE_ON_COMMIT <TASKS-ENQUEUE_ON_COMMIT>` is ``False``,
+ the Task will be executed within the same transaction it was enqueued in.
+
+ See :ref:`Task transactions <task-transactions>` for more information.
+
+.. _dummy-task-backend:
+
+Dummy backend
+-------------
+
+The :class:`.DummyBackend` doesn't execute enqueued Tasks at all, instead
+storing results for later use. Task results will forever remain in the
+:attr:`~django.tasks.TaskResultStatus.READY` state.
+
+This backend is not intended for use in production - it is provided as a
+convenience that can be used during development and testing.
+
+To use it, set :setting:`BACKEND <TASKS-BACKEND>` to
+``"django.tasks.backends.dummy.DummyBackend"``::
+
+ TASKS = {"default": {"BACKEND": "django.tasks.backends.dummy.DummyBackend"}}
+
+The results for enqueued Tasks can be retrieved from the backend's
+:attr:`~django.tasks.backends.dummy.DummyBackend.results` attribute:
+
+.. code-block:: pycon
+
+ >>> from django.tasks import default_task_backend
+ >>> my_task.enqueue()
+ >>> len(default_task_backend.results)
+ 1
+
+Stored results can be cleared using the
+:meth:`~django.tasks.backends.dummy.DummyBackend.clear` method:
+
+.. code-block:: pycon
+
+ >>> default_task_backend.clear()
+ >>> len(default_task_backend.results)
+ 0
+
+Using a custom backend
+----------------------
+
+While Django includes support for a number of Task backends out-of-the-box,
+sometimes you might want to customize the Task backend. To use an external Task
+backend with Django, use the Python import path as the :setting:`BACKEND
+<TASKS-BACKEND>` of the :setting:`TASKS` setting, like so::
+
+ TASKS = {
+ "default": {
+ "BACKEND": "path.to.backend",
+ }
+ }
+
+A Task backend is a class that inherits
+:class:`~django.tasks.backends.base.BaseTaskBackend`. At a minimum, it must
+implement :meth:`.BaseTaskBackend.enqueue`. If you're building your own
+backend, you can use the built-in Task backends as reference implementations.
+You'll find the code in the :source:`django/tasks/backends/` directory of the
+Django source.
+
+Asynchronous support
+--------------------
+
+Django has developing support for asynchronous Task backends.
+
+:class:`~django.tasks.backends.base.BaseTaskBackend` has async variants of all
+base methods. By convention, the asynchronous versions of all methods are
+prefixed with ``a``. The arguments for both variants are the same.
+
+Retrieving backends
+-------------------
+
+Backends can be retrieved using the ``task_backends`` connection handler::
+
+ from django.tasks import task_backends
+
+ task_backends["default"] # The default backend
+ task_backends["reserve"] # Another backend
+
+The "default" backend is available as ``default_task_backend``::
+
+ from django.tasks import default_task_backend
+
+.. _defining-tasks:
+
+Defining Tasks
+==============
+
+Tasks are defined using the :meth:`django.tasks.task` decorator on a
+module-level function::
+
+ from django.core.mail import send_mail
+ from django.tasks import task
+
+
+ @task
+ def email_users(emails, subject, message):
+ return send_mail(
+ subject=subject, message=message, from_email=None, recipient_list=emails
+ )
+
+
+The return value of the decorator is a :class:`~django.tasks.Task` instance.
+
+:class:`~django.tasks.Task` attributes can be customized via the ``@task``
+decorator arguments::
+
+ from django.core.mail import send_mail
+ from django.tasks import task
+
+
+ @task(priority=2, queue_name="emails", enqueue_on_commit=True)
+ def email_users(emails, subject, message):
+ return send_mail(
+ subject=subject, message=message, from_email=None, recipient_list=emails
+ )
+
+By convention, Tasks are defined in a ``tasks.py`` file, however this is not
+enforced.
+
+.. _task-context:
+
+Task context
+------------
+
+Sometimes, the running ``Task`` may need to know context about how it was
+enqueued, and how it is being executed. This can be accessed by taking a
+``context`` argument, which is an instance of
+:class:`~django.tasks.TaskContext`.
+
+To receive the Task context as an argument to your Task function, pass
+``takes_context`` when defining it::
+
+ import logging
+ from django.core.mail import send_mail
+ from django.tasks import task
+
+
+ logger = logging.getLogger(__name__)
+
+
+ @task(takes_context=True)
+ def email_users(context, emails, subject, message):
+ logger.debug(
+ f"Attempt {context.attempt} to send user email. Task result id: {context.task_result.id}."
+ )
+ return send_mail(
+ subject=subject, message=message, from_email=None, recipient_list=emails
+ )
+
+.. _modifying-tasks:
+
+Modifying Tasks
+---------------
+
+Before enqueueing Tasks, it may be necessary to modify certain parameters of
+the Task. For example, to give it a higher priority than it would normally.
+
+A ``Task`` instance cannot be modified directly. Instead, a modified instance
+can be created with the :meth:`~django.tasks.Task.using` method, leaving the
+original as-is. For example:
+
+.. code-block:: pycon
+
+ >>> email_users.priority
+ 0
+ >>> email_users.using(priority=10).priority
+ 10
+
+.. _enqueueing-tasks:
+
+Enqueueing Tasks
+================
+
+To add the Task to the queue store, so it will be executed, call the
+:meth:`~django.tasks.Task.enqueue` method on it. If the Task takes arguments,
+these can be passed as-is. For example::
+
+ result = email_users.enqueue(
+ emails=["user@example.com"],
+ subject="You have a message",
+ message="Hello there!",
+ )
+
+This returns a :class:`~django.tasks.TaskResult`, which can be used to retrieve
+the result of the Task once it has finished executing.
+
+To enqueue Tasks in an ``async`` context, :meth:`~django.tasks.Task.aenqueue`
+is available as an ``async`` variant of :meth:`~django.tasks.Task.enqueue`.
+
+Because both Task arguments and return values are serialized to JSON, they must
+be JSON-serializable:
+
+.. code-block:: pycon
+
+ >>> process_data.enqueue(datetime.now())
+ Traceback (most recent call last):
+ ...
+ TypeError: Object of type datetime is not JSON serializable
+
+Arguments must also be able to round-trip through a :func:`json.dumps`/
+:func:`json.loads` cycle without changing type. For example, consider this
+Task::
+
+ @task()
+ def double_dictionary(key):
+ return {key: key * 2}
+
+With the ``ImmediateBackend`` configured as the default backend:
+
+.. code-block:: pycon
+
+ >>> result = double_dictionary.enqueue((1, 2, 3))
+ >>> result.status
+ FAILED
+ >>> result.errors[0].traceback
+ Traceback (most recent call last):
+ ...
+ TypeError: unhashable type: 'list'
+
+The ``double_dictionary`` Task fails because after the JSON round-trip the
+tuple ``(1, 2, 3)`` becomes the list ``[1, 2, 3]``, which cannot be used as a
+dictionary key.
+
+In general, complex objects such as model instances, or built-in types like
+``datetime`` and ``tuple`` cannot be used in Tasks without additional
+conversion.
+
+.. _task-transactions:
+
+Transactions
+------------
+
+By default, Tasks are enqueued after the current database transaction (if there
+is one) commits successfully (using :meth:`transaction.on_commit()
+<django.db.transaction.on_commit>`), rather than enqueueing immediately. For
+most backends, Tasks are run in a separate process, using a different database
+connection. Without waiting for the transaction to commit, workers could start
+to process a Task which uses objects it can't access yet.
+
+This behavior can be changed by changing the :setting:`TASKS-ENQUEUE_ON_COMMIT`
+setting for the backend, or for a specific Task using the ``enqueue_on_commit``
+parameter.
+
+For example, consider this simplified example::
+
+ @task
+ def my_task():
+ Thing.objects.get()
+
+
+ with transaction.atomic():
+ Thing.objects.create()
+ my_task.enqueue()
+
+
+If :setting:`ENQUEUE_ON_COMMIT <TASKS-ENQUEUE_ON_COMMIT>` is ``False``, then it
+is possible for ``my_task`` to run before the ``Thing`` is committed to the
+database, and the Task won't be able to see the created object within your
+transaction.
+
+.. _task-results:
+
+Task results
+============
+
+When enqueueing a ``Task``, you receive a :class:`~django.tasks.TaskResult`,
+however it's likely useful to retrieve the result from somewhere else (for
+example another request or another Task).
+
+Each ``TaskResult`` has a unique :attr:`~django.tasks.TaskResult.id`, which can
+be used to identify and retrieve the result once the code which enqueued the
+Task has finished.
+
+The :meth:`~django.tasks.Task.get_result` method can retrieve a result based on
+its ``id``::
+
+ # Later, somewhere else...
+ result = email_users.get_result(result_id)
+
+To retrieve a ``TaskResult``, regardless of which kind of ``Task`` it was from,
+use the :meth:`~django.tasks.Task.get_result` method on the backend::
+
+ from django.tasks import default_task_backend
+
+ result = default_task_backend.get_result(result_id)
+
+To retrieve results in an ``async`` context,
+:meth:`~django.tasks.Task.aget_result` is available as an ``async`` variant of
+:meth:`~django.tasks.Task.get_result` on both the backend and ``Task``.
+
+Some backends, such as the built-in ``ImmediateBackend`` do not support
+``get_result()``. Calling ``get_result()`` on these backends will
+raise :exc:`NotImplementedError`.
+
+Updating results
+----------------
+
+A ``TaskResult`` contains the status of a Task's execution at the point it was
+retrieved. If the Task finishes after :meth:`~django.tasks.Task.get_result` is
+called, it will not update.
+
+To refresh the values, call the :meth:`django.tasks.TaskResult.refresh`
+method:
+
+.. code-block:: pycon
+
+ >>> result.status
+ RUNNING
+ >>> result.refresh() # or await result.arefresh()
+ >>> result.status
+ SUCCESSFUL
+
+.. _task-return-values:
+
+Return values
+-------------
+
+If your Task function returns something, it can be retrieved from the
+:attr:`django.tasks.TaskResult.return_value` attribute:
+
+.. code-block:: pycon
+
+ >>> result.status
+ SUCCESSFUL
+ >>> result.return_value
+ 42
+
+If the Task has not finished executing, or has failed, :exc:`ValueError` is
+raised.
+
+.. code-block:: pycon
+
+ >>> result.status
+ RUNNING
+ >>> result.return_value
+ Traceback (most recent call last):
+ ...
+ ValueError: Task has not finished yet
+
+Errors
+------
+
+If the Task doesn't succeed, and instead raises an exception, either as part of
+the Task or as part of running it, the exception and traceback are saved to the
+:attr:`django.tasks.TaskResult.errors` list.
+
+Each entry in ``errors`` is a :class:`~django.tasks.TaskError` containing
+information about error raised during the execution:
+
+.. code-block:: pycon
+
+ >>> result.errors[0].exception_class
+ <class 'ValueError'>
+
+Note that this is just the type of exception, and contains no other values. The
+traceback information is reduced to a string which you can use to help
+debugging:
+
+.. code-block:: pycon
+
+ >>> result.errors[0].traceback
+ Traceback (most recent call last):
+ ...
+ TypeError: Object of type datetime is not JSON serializable
diff --git a/tests/tasks/__init__.py b/tests/tasks/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/tasks/__init__.py
diff --git a/tests/tasks/tasks.py b/tests/tasks/tasks.py
new file mode 100644
index 0000000000..3959660ab9
--- /dev/null
+++ b/tests/tasks/tasks.py
@@ -0,0 +1,88 @@
+import time
+
+from django.tasks import TaskContext, task
+
+
+@task()
+def noop_task(*args, **kwargs):
+ return None
+
+
+@task
+def noop_task_from_bare_decorator(*args, **kwargs):
+ return None
+
+
+@task()
+async def noop_task_async(*args, **kwargs):
+ return None
+
+
+@task()
+def calculate_meaning_of_life():
+ return 42
+
+
+@task()
+def failing_task_value_error():
+ raise ValueError("This Task failed due to ValueError")
+
+
+@task()
+def failing_task_system_exit():
+ raise SystemExit("This Task failed due to SystemExit")
+
+
+@task()
+def failing_task_keyboard_interrupt():
+ raise KeyboardInterrupt("This Task failed due to KeyboardInterrupt")
+
+
+@task()
+def complex_exception():
+ raise ValueError(ValueError("This task failed"))
+
+
+@task()
+def complex_return_value():
+ # Return something which isn't JSON serializable nor picklable.
+ return lambda: True
+
+
+@task()
+def exit_task():
+ exit(1)
+
+
+@task(enqueue_on_commit=True)
+def enqueue_on_commit_task():
+ pass
+
+
+@task(enqueue_on_commit=False)
+def never_enqueue_on_commit_task():
+ pass
+
+
+@task()
+def hang():
+ """
+ Do nothing for 5 minutes
+ """
+ time.sleep(300)
+
+
+@task()
+def sleep_for(seconds):
+ time.sleep(seconds)
+
+
+@task(takes_context=True)
+def get_task_id(context):
+ return context.task_result.id
+
+
+@task(takes_context=True)
+def test_context(context, attempt):
+ assert isinstance(context, TaskContext)
+ assert context.attempt == attempt
diff --git a/tests/tasks/test_custom_backend.py b/tests/tasks/test_custom_backend.py
new file mode 100644
index 0000000000..83a302a183
--- /dev/null
+++ b/tests/tasks/test_custom_backend.py
@@ -0,0 +1,71 @@
+import logging
+from unittest import mock
+
+from django.tasks import default_task_backend, task_backends
+from django.tasks.backends.base import BaseTaskBackend
+from django.tasks.exceptions import InvalidTask
+from django.test import SimpleTestCase, override_settings
+
+from . import tasks as test_tasks
+
+
+class CustomBackend(BaseTaskBackend):
+ def __init__(self, alias, params):
+ super().__init__(alias, params)
+ self.prefix = self.options.get("prefix", "")
+
+ def enqueue(self, *args, **kwargs):
+ logger = logging.getLogger(__name__)
+ logger.info(f"{self.prefix}Task enqueued.")
+
+
+class CustomBackendNoEnqueue(BaseTaskBackend):
+ pass
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": f"{CustomBackend.__module__}.{CustomBackend.__qualname__}",
+ "ENQUEUE_ON_COMMIT": False,
+ "OPTIONS": {"prefix": "PREFIX: "},
+ },
+ "no_enqueue": {
+ "BACKEND": f"{CustomBackendNoEnqueue.__module__}."
+ f"{CustomBackendNoEnqueue.__qualname__}",
+ },
+ }
+)
+class CustomBackendTestCase(SimpleTestCase):
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], CustomBackend)
+ self.assertEqual(default_task_backend.alias, "default")
+ self.assertEqual(default_task_backend.options, {"prefix": "PREFIX: "})
+
+ @mock.patch.multiple(CustomBackend, supports_async_task=False)
+ def test_enqueue_async_task_on_non_async_backend(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "Backend does not support async Tasks."
+ ):
+ default_task_backend.validate_task(test_tasks.noop_task_async)
+
+ def test_backend_does_not_support_priority(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "Backend does not support setting priority of tasks."
+ ):
+ test_tasks.noop_task.using(priority=10)
+
+ def test_options(self):
+ with self.assertLogs(__name__, level="INFO") as captured_logs:
+ test_tasks.noop_task.enqueue()
+ self.assertEqual(len(captured_logs.output), 1)
+ self.assertIn("PREFIX: Task enqueued", captured_logs.output[0])
+
+ def test_no_enqueue(self):
+ with self.assertRaisesMessage(
+ TypeError,
+ "Can't instantiate abstract class CustomBackendNoEnqueue "
+ "without an implementation for abstract method 'enqueue'",
+ ):
+ test_tasks.noop_task.using(backend="no_enqueue")
diff --git a/tests/tasks/test_dummy_backend.py b/tests/tasks/test_dummy_backend.py
new file mode 100644
index 0000000000..27205f6ab3
--- /dev/null
+++ b/tests/tasks/test_dummy_backend.py
@@ -0,0 +1,337 @@
+from typing import cast
+from unittest import mock
+
+from django.db import transaction
+from django.db.utils import ConnectionHandler
+from django.tasks import TaskResultStatus, default_task_backend, task_backends
+from django.tasks.backends.dummy import DummyBackend
+from django.tasks.base import Task
+from django.tasks.exceptions import InvalidTask, TaskResultDoesNotExist
+from django.test import (
+ SimpleTestCase,
+ TransactionTestCase,
+ override_settings,
+ skipIfDBFeature,
+)
+
+from . import tasks as test_tasks
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": [],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+)
+class DummyBackendTestCase(SimpleTestCase):
+ def setUp(self):
+ default_task_backend.clear()
+
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], DummyBackend)
+ self.assertEqual(default_task_backend.alias, "default")
+ self.assertEqual(default_task_backend.options, {})
+
+ def test_enqueue_task(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = cast(Task, task).enqueue(1, two=3)
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertIs(result.is_finished, False)
+ self.assertIsNone(result.started_at)
+ self.assertIsNone(result.last_attempted_at)
+ self.assertIsNone(result.finished_at)
+ with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
+ result.return_value
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [1])
+ self.assertEqual(result.kwargs, {"two": 3})
+ self.assertEqual(result.attempts, 0)
+
+ self.assertIn(result, default_task_backend.results)
+
+ async def test_enqueue_task_async(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = await cast(Task, task).aenqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertIs(result.is_finished, False)
+ self.assertIsNone(result.started_at)
+ self.assertIsNone(result.last_attempted_at)
+ self.assertIsNone(result.finished_at)
+ with self.assertRaisesMessage(ValueError, "Task has not finished yet"):
+ result.return_value
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+ self.assertEqual(result.attempts, 0)
+
+ self.assertIn(result, default_task_backend.results)
+
+ def test_get_result(self):
+ result = default_task_backend.enqueue(test_tasks.noop_task, (), {})
+
+ new_result = default_task_backend.get_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ async def test_get_result_async(self):
+ result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {})
+
+ new_result = await default_task_backend.aget_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ def test_refresh_result(self):
+ result = default_task_backend.enqueue(
+ test_tasks.calculate_meaning_of_life, (), {}
+ )
+
+ enqueued_result = default_task_backend.results[0]
+ object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL)
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ result.refresh()
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ async def test_refresh_result_async(self):
+ result = await default_task_backend.aenqueue(
+ test_tasks.calculate_meaning_of_life, (), {}
+ )
+
+ enqueued_result = default_task_backend.results[0]
+ object.__setattr__(enqueued_result, "status", TaskResultStatus.SUCCESSFUL)
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ await result.arefresh()
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ async def test_get_missing_result(self):
+ with self.assertRaises(TaskResultDoesNotExist):
+ default_task_backend.get_result("123")
+
+ with self.assertRaises(TaskResultDoesNotExist):
+ await default_task_backend.aget_result("123")
+
+ def test_enqueue_on_commit(self):
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ def test_enqueue_logs(self):
+ with self.assertLogs("django.tasks", level="DEBUG") as captured_logs:
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 1)
+ self.assertIn("enqueued", captured_logs.output[0])
+ self.assertIn(result.id, captured_logs.output[0])
+
+ def test_errors(self):
+ result = test_tasks.noop_task.enqueue()
+ self.assertEqual(result.errors, [])
+
+ def test_validate_disallowed_async_task(self):
+ with mock.patch.multiple(default_task_backend, supports_async_task=False):
+ with self.assertRaisesMessage(
+ InvalidTask, "Backend does not support async Tasks."
+ ):
+ default_task_backend.validate_task(test_tasks.noop_task_async)
+
+ def test_check(self):
+ errors = list(default_task_backend.check())
+ self.assertEqual(len(errors), 0, errors)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ @mock.patch("django.tasks.backends.base.connections", ConnectionHandler({}))
+ def test_enqueue_on_commit_with_no_databases(self):
+ self.assertIn(
+ "tasks.E001", {error.id for error in default_task_backend.check()}
+ )
+
+ def test_takes_context(self):
+ result = test_tasks.get_task_id.enqueue()
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ def test_clear(self):
+ result = test_tasks.noop_task.enqueue()
+
+ default_task_backend.get_result(result.id)
+
+ default_task_backend.clear()
+
+ with self.assertRaisesMessage(TaskResultDoesNotExist, result.id):
+ default_task_backend.get_result(result.id)
+
+ def test_validate_on_enqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ task_with_custom_queue_name.enqueue()
+
+ async def test_validate_on_aenqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ await task_with_custom_queue_name.aenqueue()
+
+
+class DummyBackendTransactionTestCase(TransactionTestCase):
+ available_apps = []
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ def test_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ test_tasks.noop_task.enqueue()
+
+ self.assertEqual(len(default_task_backend.results), 0)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_doesnt_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ False,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNotNone(result.enqueued_at)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ }
+ }
+ )
+ def test_wait_until_transaction_by_default(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+
+ self.assertEqual(len(default_task_backend.results), 0)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+ self.assertIsNone(result.enqueued_at)
+ result.refresh()
+ self.assertIsNotNone(result.enqueued_at)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_task_specific_enqueue_on_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.enqueue_on_commit_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+
+ self.assertEqual(len(default_task_backend.results), 0)
+
+ self.assertEqual(len(default_task_backend.results), 1)
+ self.assertIsNone(result.enqueued_at)
+ result.refresh()
+ self.assertIsNotNone(result.enqueued_at)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ @skipIfDBFeature("supports_transactions")
+ def test_enqueue_on_commit_with_no_transactions(self):
+ self.assertIn(
+ "tasks.E002", {error.id for error in default_task_backend.check()}
+ )
diff --git a/tests/tasks/test_immediate_backend.py b/tests/tasks/test_immediate_backend.py
new file mode 100644
index 0000000000..01e2841aa7
--- /dev/null
+++ b/tests/tasks/test_immediate_backend.py
@@ -0,0 +1,387 @@
+from django.db import transaction
+from django.tasks import TaskResultStatus, default_task_backend, task_backends
+from django.tasks.backends.immediate import ImmediateBackend
+from django.tasks.exceptions import InvalidTask
+from django.test import SimpleTestCase, TransactionTestCase, override_settings
+from django.utils import timezone
+
+from . import tasks as test_tasks
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "QUEUES": [],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+)
+class ImmediateBackendTestCase(SimpleTestCase):
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], ImmediateBackend)
+ self.assertEqual(default_task_backend.alias, "default")
+ self.assertEqual(default_task_backend.options, {})
+
+ def test_enqueue_task(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = task.enqueue(1, two=3)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertIs(result.is_finished, True)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertIsNone(result.return_value)
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [1])
+ self.assertEqual(result.kwargs, {"two": 3})
+ self.assertEqual(result.attempts, 1)
+
+ async def test_enqueue_task_async(self):
+ for task in [test_tasks.noop_task, test_tasks.noop_task_async]:
+ with self.subTest(task):
+ result = await task.aenqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertIs(result.is_finished, True)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertIsNone(result.return_value)
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+ self.assertEqual(result.attempts, 1)
+
+ def test_catches_exception(self):
+ test_data = [
+ (
+ test_tasks.failing_task_value_error, # Task function.
+ ValueError, # Expected exception.
+ "This Task failed due to ValueError", # Expected message.
+ ),
+ (
+ test_tasks.failing_task_system_exit,
+ SystemExit,
+ "This Task failed due to SystemExit",
+ ),
+ ]
+ for task, exception, message in test_data:
+ with (
+ self.subTest(task),
+ self.assertLogs("django.tasks", level="ERROR") as captured_logs,
+ ):
+ result = task.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 1)
+ self.assertIn(message, captured_logs.output[0])
+
+ self.assertEqual(result.status, TaskResultStatus.FAILED)
+ with self.assertRaisesMessage(ValueError, "Task failed"):
+ result.return_value
+ self.assertIs(result.is_finished, True)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertEqual(result.errors[0].exception_class, exception)
+ traceback = result.errors[0].traceback
+ self.assertIs(
+ traceback
+ and traceback.endswith(f"{exception.__name__}: {message}\n"),
+ True,
+ traceback,
+ )
+ self.assertEqual(result.task, task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ def test_throws_keyboard_interrupt(self):
+ with self.assertRaises(KeyboardInterrupt):
+ with self.assertNoLogs("django.tasks", level="ERROR"):
+ default_task_backend.enqueue(
+ test_tasks.failing_task_keyboard_interrupt, [], {}
+ )
+
+ def test_complex_exception(self):
+ with self.assertLogs("django.tasks", level="ERROR"):
+ result = test_tasks.complex_exception.enqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.FAILED)
+ with self.assertRaisesMessage(ValueError, "Task failed"):
+ result.return_value
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+
+ self.assertIsNone(result._return_value)
+ self.assertEqual(result.errors[0].exception_class, ValueError)
+ self.assertIn(
+ 'ValueError(ValueError("This task failed"))', result.errors[0].traceback
+ )
+
+ self.assertEqual(result.task, test_tasks.complex_exception)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ def test_complex_return_value(self):
+ with self.assertLogs("django.tasks", level="ERROR"):
+ result = test_tasks.complex_return_value.enqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.FAILED)
+ self.assertIsNotNone(result.started_at)
+ self.assertIsNotNone(result.last_attempted_at)
+ self.assertIsNotNone(result.finished_at)
+ self.assertGreaterEqual(result.started_at, result.enqueued_at)
+ self.assertGreaterEqual(result.finished_at, result.started_at)
+ self.assertIsNone(result._return_value)
+ self.assertEqual(result.errors[0].exception_class, TypeError)
+ self.assertIn("Unsupported type", result.errors[0].traceback)
+
+ def test_result(self):
+ result = default_task_backend.enqueue(
+ test_tasks.calculate_meaning_of_life, [], {}
+ )
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertEqual(result.return_value, 42)
+
+ async def test_result_async(self):
+ result = await default_task_backend.aenqueue(
+ test_tasks.calculate_meaning_of_life, [], {}
+ )
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertEqual(result.return_value, 42)
+
+ async def test_cannot_get_result(self):
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ default_task_backend.get_result("123")
+
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ await default_task_backend.aget_result(123)
+
+ async def test_cannot_refresh_result(self):
+ result = await default_task_backend.aenqueue(
+ test_tasks.calculate_meaning_of_life, (), {}
+ )
+
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ await result.arefresh()
+
+ with self.assertRaisesMessage(
+ NotImplementedError,
+ "This backend does not support retrieving or refreshing results.",
+ ):
+ result.refresh()
+
+ def test_cannot_pass_run_after(self):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ "Backend does not support run_after.",
+ ):
+ default_task_backend.validate_task(
+ test_tasks.failing_task_value_error.using(run_after=timezone.now())
+ )
+
+ def test_enqueue_on_commit(self):
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ def test_enqueue_logs(self):
+ with self.assertLogs("django.tasks", level="DEBUG") as captured_logs:
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 3)
+
+ self.assertIn("enqueued", captured_logs.output[0])
+ self.assertIn(result.id, captured_logs.output[0])
+
+ self.assertIn("state=RUNNING", captured_logs.output[1])
+ self.assertIn(result.id, captured_logs.output[1])
+
+ self.assertIn("state=SUCCESSFUL", captured_logs.output[2])
+ self.assertIn(result.id, captured_logs.output[2])
+
+ def test_failed_logs(self):
+ with self.assertLogs("django.tasks", level="DEBUG") as captured_logs:
+ result = test_tasks.failing_task_value_error.enqueue()
+
+ self.assertEqual(len(captured_logs.output), 3)
+ self.assertIn("state=RUNNING", captured_logs.output[1])
+ self.assertIn(result.id, captured_logs.output[1])
+
+ self.assertIn("state=FAILED", captured_logs.output[2])
+ self.assertIn(result.id, captured_logs.output[2])
+
+ def test_takes_context(self):
+ result = test_tasks.get_task_id.enqueue()
+
+ self.assertEqual(result.return_value, result.id)
+
+ def test_context(self):
+ result = test_tasks.test_context.enqueue(1)
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ def test_validate_on_enqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ task_with_custom_queue_name.enqueue()
+
+ async def test_validate_on_aenqueue(self):
+ task_with_custom_queue_name = test_tasks.noop_task.using(
+ queue_name="unknown_queue"
+ )
+
+ with override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "QUEUES": ["queue-1"],
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ ):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'unknown_queue' is not valid for backend"
+ ):
+ await task_with_custom_queue_name.aenqueue()
+
+
+class ImmediateBackendTransactionTestCase(TransactionTestCase):
+ available_apps = []
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": True,
+ }
+ }
+ )
+ def test_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+ self.assertEqual(result.attempts, 0)
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+ self.assertIsNotNone(result.enqueued_at)
+ self.assertEqual(result.attempts, 1)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_doesnt_wait_until_transaction_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ False,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNotNone(result.enqueued_at)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ }
+ }
+ )
+ def test_wait_until_transaction_by_default(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(test_tasks.noop_task),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
+
+ @override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ }
+ }
+ )
+ def test_task_specific_enqueue_on_commit(self):
+ self.assertIs(default_task_backend.enqueue_on_commit, False)
+ self.assertIs(test_tasks.enqueue_on_commit_task.enqueue_on_commit, True)
+ self.assertIs(
+ default_task_backend._get_enqueue_on_commit_for_task(
+ test_tasks.enqueue_on_commit_task
+ ),
+ True,
+ )
+
+ with transaction.atomic():
+ result = test_tasks.enqueue_on_commit_task.enqueue()
+
+ self.assertIsNone(result.enqueued_at)
+ self.assertEqual(result.status, TaskResultStatus.READY)
+
+ self.assertEqual(result.status, TaskResultStatus.SUCCESSFUL)
diff --git a/tests/tasks/test_tasks.py b/tests/tasks/test_tasks.py
new file mode 100644
index 0000000000..2c11d9657d
--- /dev/null
+++ b/tests/tasks/test_tasks.py
@@ -0,0 +1,316 @@
+import dataclasses
+from datetime import datetime
+
+from django.tasks import (
+ DEFAULT_TASK_QUEUE_NAME,
+ TaskResultStatus,
+ default_task_backend,
+ task,
+ task_backends,
+)
+from django.tasks.backends.dummy import DummyBackend
+from django.tasks.backends.immediate import ImmediateBackend
+from django.tasks.base import TASK_MAX_PRIORITY, TASK_MIN_PRIORITY, Task
+from django.tasks.exceptions import (
+ InvalidTask,
+ InvalidTaskBackend,
+ TaskResultDoesNotExist,
+ TaskResultMismatch,
+)
+from django.test import SimpleTestCase, override_settings
+from django.utils import timezone
+from django.utils.module_loading import import_string
+
+from . import tasks as test_tasks
+
+
+@override_settings(
+ TASKS={
+ "default": {
+ "BACKEND": "django.tasks.backends.dummy.DummyBackend",
+ "QUEUES": ["default", "queue_1"],
+ "ENQUEUE_ON_COMMIT": False,
+ },
+ "immediate": {
+ "BACKEND": "django.tasks.backends.immediate.ImmediateBackend",
+ "ENQUEUE_ON_COMMIT": False,
+ "QUEUES": [],
+ },
+ "missing": {"BACKEND": "does.not.exist"},
+ },
+ USE_TZ=True,
+)
+class TaskTestCase(SimpleTestCase):
+ def setUp(self):
+ default_task_backend.clear()
+
+ def test_using_correct_backend(self):
+ self.assertEqual(default_task_backend, task_backends["default"])
+ self.assertIsInstance(task_backends["default"], DummyBackend)
+
+ def test_task_decorator(self):
+ self.assertIsInstance(test_tasks.noop_task, Task)
+ self.assertIsInstance(test_tasks.noop_task_async, Task)
+ self.assertIsInstance(test_tasks.noop_task_from_bare_decorator, Task)
+
+ def test_enqueue_task(self):
+ result = test_tasks.noop_task.enqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertEqual(result.task, test_tasks.noop_task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ self.assertEqual(default_task_backend.results, [result])
+
+ async def test_enqueue_task_async(self):
+ result = await test_tasks.noop_task.aenqueue()
+
+ self.assertEqual(result.status, TaskResultStatus.READY)
+ self.assertEqual(result.task, test_tasks.noop_task)
+ self.assertEqual(result.args, [])
+ self.assertEqual(result.kwargs, {})
+
+ self.assertEqual(default_task_backend.results, [result])
+
+ def test_enqueue_with_invalid_argument(self):
+ with self.assertRaisesMessage(TypeError, "Unsupported type"):
+ test_tasks.noop_task.enqueue(datetime.now())
+
+ async def test_aenqueue_with_invalid_argument(self):
+ with self.assertRaisesMessage(TypeError, "Unsupported type"):
+ await test_tasks.noop_task.aenqueue(datetime.now())
+
+ def test_using_priority(self):
+ self.assertEqual(test_tasks.noop_task.priority, 0)
+ self.assertEqual(test_tasks.noop_task.using(priority=1).priority, 1)
+ self.assertEqual(test_tasks.noop_task.priority, 0)
+
+ def test_using_queue_name(self):
+ self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME)
+ self.assertEqual(
+ test_tasks.noop_task.using(queue_name="queue_1").queue_name, "queue_1"
+ )
+ self.assertEqual(test_tasks.noop_task.queue_name, DEFAULT_TASK_QUEUE_NAME)
+
+ def test_using_run_after(self):
+ now = timezone.now()
+
+ self.assertIsNone(test_tasks.noop_task.run_after)
+ self.assertEqual(test_tasks.noop_task.using(run_after=now).run_after, now)
+ self.assertIsNone(test_tasks.noop_task.run_after)
+
+ def test_using_unknown_backend(self):
+ self.assertEqual(test_tasks.noop_task.backend, "default")
+
+ with self.assertRaisesMessage(
+ InvalidTaskBackend, "The connection 'unknown' doesn't exist."
+ ):
+ test_tasks.noop_task.using(backend="unknown")
+
+ def test_using_missing_backend(self):
+ self.assertEqual(test_tasks.noop_task.backend, "default")
+
+ with self.assertRaisesMessage(
+ InvalidTaskBackend,
+ "Could not find backend 'does.not.exist': No module named 'does'",
+ ):
+ test_tasks.noop_task.using(backend="missing")
+
+ def test_using_creates_new_instance(self):
+ new_task = test_tasks.noop_task.using()
+
+ self.assertEqual(new_task, test_tasks.noop_task)
+ self.assertIsNot(new_task, test_tasks.noop_task)
+
+ def test_chained_using(self):
+ now = timezone.now()
+
+ run_after_task = test_tasks.noop_task.using(run_after=now)
+ self.assertEqual(run_after_task.run_after, now)
+
+ priority_task = run_after_task.using(priority=10)
+ self.assertEqual(priority_task.priority, 10)
+ self.assertEqual(priority_task.run_after, now)
+
+ self.assertEqual(run_after_task.priority, 0)
+
+ async def test_refresh_result(self):
+ result = await test_tasks.noop_task.aenqueue()
+
+ original_result = dataclasses.asdict(result)
+
+ result.refresh()
+
+ self.assertEqual(dataclasses.asdict(result), original_result)
+
+ await result.arefresh()
+
+ self.assertEqual(dataclasses.asdict(result), original_result)
+
+ def test_naive_datetime(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "run_after must be an aware datetime."
+ ):
+ test_tasks.noop_task.using(run_after=datetime.now())
+
+ def test_invalid_priority(self):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
+ f"{TASK_MAX_PRIORITY}.",
+ ):
+ test_tasks.noop_task.using(priority=-101)
+
+ with self.assertRaisesMessage(
+ InvalidTask,
+ f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
+ f"{TASK_MAX_PRIORITY}.",
+ ):
+ test_tasks.noop_task.using(priority=101)
+
+ with self.assertRaisesMessage(
+ InvalidTask,
+ f"priority must be a whole number between {TASK_MIN_PRIORITY} and "
+ f"{TASK_MAX_PRIORITY}.",
+ ):
+ test_tasks.noop_task.using(priority=3.1)
+
+ test_tasks.noop_task.using(priority=100)
+ test_tasks.noop_task.using(priority=-100)
+ test_tasks.noop_task.using(priority=0)
+
+ def test_unknown_queue_name(self):
+ with self.assertRaisesMessage(
+ InvalidTask, "Queue 'queue-2' is not valid for backend."
+ ):
+ test_tasks.noop_task.using(queue_name="queue-2")
+ # Validation is bypassed when the backend QUEUES is an empty list.
+ self.assertEqual(
+ test_tasks.noop_task.using(
+ queue_name="queue-2", backend="immediate"
+ ).queue_name,
+ "queue-2",
+ )
+
+ def test_call_task(self):
+ self.assertEqual(test_tasks.calculate_meaning_of_life.call(), 42)
+
+ async def test_call_task_async(self):
+ self.assertEqual(await test_tasks.calculate_meaning_of_life.acall(), 42)
+
+ async def test_call_async_task(self):
+ self.assertIsNone(await test_tasks.noop_task_async.acall())
+
+ def test_call_async_task_sync(self):
+ self.assertIsNone(test_tasks.noop_task_async.call())
+
+ def test_get_result(self):
+ result = default_task_backend.enqueue(test_tasks.noop_task, (), {})
+
+ new_result = test_tasks.noop_task.get_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ async def test_get_result_async(self):
+ result = await default_task_backend.aenqueue(test_tasks.noop_task, (), {})
+
+ new_result = await test_tasks.noop_task.aget_result(result.id)
+
+ self.assertEqual(result, new_result)
+
+ async def test_get_missing_result(self):
+ with self.assertRaises(TaskResultDoesNotExist):
+ test_tasks.noop_task.get_result("123")
+
+ with self.assertRaises(TaskResultDoesNotExist):
+ await test_tasks.noop_task.aget_result("123")
+
+ def test_get_incorrect_result(self):
+ result = default_task_backend.enqueue(test_tasks.noop_task_async, (), {})
+ with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"):
+ test_tasks.noop_task.get_result(result.id)
+
+ async def test_get_incorrect_result_async(self):
+ result = await default_task_backend.aenqueue(test_tasks.noop_task_async, (), {})
+ with self.assertRaisesMessage(TaskResultMismatch, "Task does not match"):
+ await test_tasks.noop_task.aget_result(result.id)
+
+ def test_invalid_function(self):
+ for invalid_function in [any, self.test_invalid_function]:
+ with self.subTest(invalid_function):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ "Task function must be defined at a module level.",
+ ):
+ task()(invalid_function)
+
+ def test_get_backend(self):
+ self.assertEqual(test_tasks.noop_task.backend, "default")
+ self.assertIsInstance(test_tasks.noop_task.get_backend(), DummyBackend)
+
+ immediate_task = test_tasks.noop_task.using(backend="immediate")
+ self.assertEqual(immediate_task.backend, "immediate")
+ self.assertIsInstance(immediate_task.get_backend(), ImmediateBackend)
+
+ def test_name(self):
+ self.assertEqual(test_tasks.noop_task.name, "noop_task")
+ self.assertEqual(test_tasks.noop_task_async.name, "noop_task_async")
+
+ def test_module_path(self):
+ self.assertEqual(test_tasks.noop_task.module_path, "tasks.tasks.noop_task")
+ self.assertEqual(
+ test_tasks.noop_task_async.module_path, "tasks.tasks.noop_task_async"
+ )
+
+ self.assertIs(
+ import_string(test_tasks.noop_task.module_path), test_tasks.noop_task
+ )
+ self.assertIs(
+ import_string(test_tasks.noop_task_async.module_path),
+ test_tasks.noop_task_async,
+ )
+
+ @override_settings(TASKS={})
+ def test_no_backends(self):
+ with self.assertRaises(InvalidTaskBackend):
+ test_tasks.noop_task.enqueue()
+
+ def test_task_error_invalid_exception(self):
+ with self.assertLogs("django.tasks"):
+ immediate_task = test_tasks.failing_task_value_error.using(
+ backend="immediate"
+ ).enqueue()
+
+ self.assertEqual(len(immediate_task.errors), 1)
+
+ object.__setattr__(
+ immediate_task.errors[0], "exception_class_path", "subprocess.run"
+ )
+
+ with self.assertRaisesMessage(
+ ValueError, "'subprocess.run' does not reference a valid exception."
+ ):
+ immediate_task.errors[0].exception_class
+
+ def test_task_error_unknown_module(self):
+ with self.assertLogs("django.tasks"):
+ immediate_task = test_tasks.failing_task_value_error.using(
+ backend="immediate"
+ ).enqueue()
+
+ self.assertEqual(len(immediate_task.errors), 1)
+
+ object.__setattr__(
+ immediate_task.errors[0], "exception_class_path", "does.not.exist"
+ )
+
+ with self.assertRaises(ImportError):
+ immediate_task.errors[0].exception_class
+
+ def test_takes_context_without_taking_context(self):
+ with self.assertRaisesMessage(
+ InvalidTask,
+ "Task takes context but does not have a first argument of 'context'.",
+ ):
+ task(takes_context=True)(test_tasks.calculate_meaning_of_life.func)
diff --git a/tests/utils_tests/test_inspect.py b/tests/utils_tests/test_inspect.py
index b8359c2508..38ea35ecfb 100644
--- a/tests/utils_tests/test_inspect.py
+++ b/tests/utils_tests/test_inspect.py
@@ -1,5 +1,7 @@
+import subprocess
import unittest
+from django.shortcuts import aget_object_or_404
from django.utils import inspect
@@ -100,3 +102,50 @@ class TestInspectMethods(unittest.TestCase):
self.assertIs(inspect.func_accepts_kwargs(Person().just_args), False)
self.assertIs(inspect.func_accepts_kwargs(Person.all_kinds), True)
self.assertIs(inspect.func_accepts_kwargs(Person().just_args), False)
+
+
+class IsModuleLevelFunctionTestCase(unittest.TestCase):
+ @classmethod
+ def _class_method(cls) -> None:
+ return None
+
+ @staticmethod
+ def _static_method() -> None:
+ return None
+
+ def test_builtin(self):
+ self.assertIs(inspect.is_module_level_function(any), False)
+ self.assertIs(inspect.is_module_level_function(isinstance), False)
+
+ def test_from_module(self):
+ self.assertIs(inspect.is_module_level_function(subprocess.run), True)
+ self.assertIs(inspect.is_module_level_function(subprocess.check_output), True)
+ self.assertIs(
+ inspect.is_module_level_function(inspect.is_module_level_function), True
+ )
+
+ def test_private_function(self):
+ def private_function():
+ pass
+
+ self.assertIs(inspect.is_module_level_function(private_function), False)
+
+ def test_coroutine(self):
+ self.assertIs(inspect.is_module_level_function(aget_object_or_404), True)
+
+ def test_method(self):
+ self.assertIs(inspect.is_module_level_function(self.test_method), False)
+ self.assertIs(inspect.is_module_level_function(self.setUp), False)
+
+ def test_unbound_method(self):
+ self.assertIs(
+ inspect.is_module_level_function(self.__class__.test_unbound_method), True
+ )
+ self.assertIs(inspect.is_module_level_function(self.__class__.setUp), True)
+
+ def test_lambda(self):
+ self.assertIs(inspect.is_module_level_function(lambda: True), False)
+
+ def test_class_and_static_method(self):
+ self.assertIs(inspect.is_module_level_function(self._static_method), True)
+ self.assertIs(inspect.is_module_level_function(self._class_method), False)
diff --git a/tests/utils_tests/test_json.py b/tests/utils_tests/test_json.py
new file mode 100644
index 0000000000..9d137149ed
--- /dev/null
+++ b/tests/utils_tests/test_json.py
@@ -0,0 +1,46 @@
+import json
+from collections import UserList, defaultdict
+from datetime import datetime
+from decimal import Decimal
+
+from django.test import SimpleTestCase
+from django.utils.json import normalize_json
+
+
+class JSONNormalizeTestCase(SimpleTestCase):
+ def test_converts_json_types(self):
+ for test_case, expected in [
+ (None, "null"),
+ (True, "true"),
+ (False, "false"),
+ (2, "2"),
+ (3.0, "3.0"),
+ (1e23 + 1, "1e+23"),
+ ("1", '"1"'),
+ (b"hello", '"hello"'),
+ ([], "[]"),
+ (UserList([1, 2]), "[1, 2]"),
+ ({}, "{}"),
+ ({1: "a"}, '{"1": "a"}'),
+ ({"foo": (1, 2, 3)}, '{"foo": [1, 2, 3]}'),
+ (defaultdict(list), "{}"),
+ (float("nan"), "NaN"),
+ (float("inf"), "Infinity"),
+ (float("-inf"), "-Infinity"),
+ ]:
+ with self.subTest(test_case):
+ normalized = normalize_json(test_case)
+ # Ensure that the normalized result is serializable.
+ self.assertEqual(json.dumps(normalized), expected)
+
+ def test_bytes_decode_error(self):
+ with self.assertRaisesMessage(ValueError, "Unsupported value"):
+ normalize_json(b"\xff")
+
+ def test_encode_error(self):
+ for test_case in [self, any, object(), datetime.now(), set(), Decimal("3.42")]:
+ with (
+ self.subTest(test_case),
+ self.assertRaisesMessage(TypeError, "Unsupported type"),
+ ):
+ normalize_json(test_case)