summaryrefslogtreecommitdiff
path: root/django/tasks/base.py
diff options
context:
space:
mode:
authorJake Howard <git@theorangeone.net>2025-07-17 12:51:09 +0100
committernessita <124304+nessita@users.noreply.github.com>2025-09-16 17:28:32 -0300
commit4289966d1b8e848e5e460b7c782dac009d746b20 (patch)
treeef1d61a33562579d985c762036db5f7aa01406fc /django/tasks/base.py
parent218f69f05eb51da1ea17d62a914a67ceff5bfd55 (diff)
Fixed #35859 -- Added background Tasks framework interface.
This work implements what was defined in DEP 14 (https://github.com/django/deps/blob/main/accepted/0014-background-workers.rst). Thanks to Raphael Gaschignard, Eric Holscher, Ran Benita, Sarah Boyce, Jacob Walls, and Natalia Bidart for the reviews.
Diffstat (limited to 'django/tasks/base.py')
-rw-r--r--django/tasks/base.py253
1 files changed, 253 insertions, 0 deletions
diff --git a/django/tasks/base.py b/django/tasks/base.py
new file mode 100644
index 0000000000..905dbef597
--- /dev/null
+++ b/django/tasks/base.py
@@ -0,0 +1,253 @@
+from dataclasses import dataclass, field, replace
+from datetime import datetime
+from inspect import isclass, iscoroutinefunction
+from typing import Any, Callable, Dict, Optional
+
+from asgiref.sync import async_to_sync, sync_to_async
+
+from django.db.models.enums import TextChoices
+from django.utils.json import normalize_json
+from django.utils.module_loading import import_string
+from django.utils.translation import pgettext_lazy
+
+from .exceptions import TaskResultMismatch
+
+DEFAULT_TASK_BACKEND_ALIAS = "default"
+DEFAULT_TASK_PRIORITY = 0
+DEFAULT_TASK_QUEUE_NAME = "default"
+TASK_MAX_PRIORITY = 100
+TASK_MIN_PRIORITY = -100
+TASK_REFRESH_ATTRS = {
+ "errors",
+ "_return_value",
+ "finished_at",
+ "started_at",
+ "last_attempted_at",
+ "status",
+ "enqueued_at",
+ "worker_ids",
+}
+
+
+class TaskResultStatus(TextChoices):
+ # The Task has just been enqueued, or is ready to be executed again.
+ READY = ("READY", pgettext_lazy("Task", "Ready"))
+ # The Task is currently running.
+ RUNNING = ("RUNNING", pgettext_lazy("Task", "Running"))
+ # The Task raised an exception during execution, or was unable to start.
+ FAILED = ("FAILED", pgettext_lazy("Task", "Failed"))
+ # The Task has finished running successfully.
+ SUCCESSFUL = ("SUCCESSFUL", pgettext_lazy("Task", "Successful"))
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class Task:
+ priority: int
+ func: Callable # The Task function.
+ backend: str
+ queue_name: str
+ run_after: Optional[datetime] # The earliest this Task will run.
+
+ # Whether the Task will be enqueued when the current transaction commits,
+ # immediately, or whatever the backend decides.
+ enqueue_on_commit: Optional[bool]
+
+ # Whether the Task receives the Task context when executed.
+ takes_context: bool = False
+
+ def __post_init__(self):
+ self.get_backend().validate_task(self)
+
+ @property
+ def name(self):
+ return self.func.__name__
+
+ def using(
+ self,
+ *,
+ priority=None,
+ queue_name=None,
+ run_after=None,
+ backend=None,
+ ):
+ """Create a new Task with modified defaults."""
+
+ changes = {}
+ if priority is not None:
+ changes["priority"] = priority
+ if queue_name is not None:
+ changes["queue_name"] = queue_name
+ if run_after is not None:
+ changes["run_after"] = run_after
+ if backend is not None:
+ changes["backend"] = backend
+ return replace(self, **changes)
+
+ def enqueue(self, *args, **kwargs):
+ """Queue up the Task to be executed."""
+ return self.get_backend().enqueue(self, args, kwargs)
+
+ async def aenqueue(self, *args, **kwargs):
+ """Queue up the Task to be executed."""
+ return await self.get_backend().aenqueue(self, args, kwargs)
+
+ def get_result(self, result_id):
+ """
+ Retrieve a task result by id.
+
+ Raise TaskResultDoesNotExist if such result does not exist, or raise
+ TaskResultMismatch if the result exists but belongs to another Task.
+ """
+ result = self.get_backend().get_result(result_id)
+ if result.task.func != self.func:
+ raise TaskResultMismatch(
+ f"Task does not match (received {result.task.module_path!r})"
+ )
+ return result
+
+ async def aget_result(self, result_id):
+ """See get_result()."""
+ result = await self.get_backend().aget_result(result_id)
+ if result.task.func != self.func:
+ raise TaskResultMismatch(
+ f"Task does not match (received {result.task.module_path!r})"
+ )
+ return result
+
+ def call(self, *args, **kwargs):
+ if iscoroutinefunction(self.func):
+ return async_to_sync(self.func)(*args, **kwargs)
+ return self.func(*args, **kwargs)
+
+ async def acall(self, *args, **kwargs):
+ if iscoroutinefunction(self.func):
+ return await self.func(*args, **kwargs)
+ return await sync_to_async(self.func)(*args, **kwargs)
+
+ def get_backend(self):
+ from . import task_backends
+
+ return task_backends[self.backend]
+
+ @property
+ def module_path(self):
+ return f"{self.func.__module__}.{self.func.__qualname__}"
+
+
+def task(
+ function=None,
+ *,
+ priority=DEFAULT_TASK_PRIORITY,
+ queue_name=DEFAULT_TASK_QUEUE_NAME,
+ backend=DEFAULT_TASK_BACKEND_ALIAS,
+ enqueue_on_commit=None,
+ takes_context=False,
+):
+ from . import task_backends
+
+ def wrapper(f):
+ return task_backends[backend].task_class(
+ priority=priority,
+ func=f,
+ queue_name=queue_name,
+ backend=backend,
+ enqueue_on_commit=enqueue_on_commit,
+ takes_context=takes_context,
+ run_after=None,
+ )
+
+ if function:
+ return wrapper(function)
+ return wrapper
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class TaskError:
+ exception_class_path: str
+ traceback: str
+
+ @property
+ def exception_class(self):
+ # Lazy resolve the exception class.
+ exception_class = import_string(self.exception_class_path)
+
+ if not isclass(exception_class) or not issubclass(
+ exception_class, BaseException
+ ):
+ raise ValueError(
+ f"{self.exception_class_path!r} does not reference a valid exception."
+ )
+ return exception_class
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class TaskResult:
+ task: Task
+
+ id: str # Unique identifier for the task result.
+ status: TaskResultStatus
+ enqueued_at: Optional[datetime] # Time the task was enqueued.
+ started_at: Optional[datetime] # Time the task was started.
+ finished_at: Optional[datetime] # Time the task was finished.
+
+ # Time the task was last attempted to be run.
+ last_attempted_at: Optional[datetime]
+
+ args: list # Arguments to pass to the task function.
+ kwargs: Dict[str, Any] # Keyword arguments to pass to the task function.
+ backend: str
+ errors: list[TaskError] # Errors raised when running the task.
+ worker_ids: list[str] # Workers which have processed the task.
+
+ _return_value: Optional[Any] = field(init=False, default=None)
+
+ def __post_init__(self):
+ object.__setattr__(self, "args", normalize_json(self.args))
+ object.__setattr__(self, "kwargs", normalize_json(self.kwargs))
+
+ @property
+ def return_value(self):
+ """
+ The return value of the task.
+
+ If the task didn't succeed, an exception is raised.
+ This is to distinguish against the task returning None.
+ """
+ if self.status == TaskResultStatus.SUCCESSFUL:
+ return self._return_value
+ elif self.status == TaskResultStatus.FAILED:
+ raise ValueError("Task failed")
+ else:
+ raise ValueError("Task has not finished yet")
+
+ @property
+ def is_finished(self):
+ return self.status in {TaskResultStatus.FAILED, TaskResultStatus.SUCCESSFUL}
+
+ @property
+ def attempts(self):
+ return len(self.worker_ids)
+
+ def refresh(self):
+ """Reload the cached task data from the task store."""
+ refreshed_task = self.task.get_backend().get_result(self.id)
+
+ for attr in TASK_REFRESH_ATTRS:
+ object.__setattr__(self, attr, getattr(refreshed_task, attr))
+
+ async def arefresh(self):
+ """
+ Reload the cached task data from the task store
+ """
+ refreshed_task = await self.task.get_backend().aget_result(self.id)
+ for attr in TASK_REFRESH_ATTRS:
+ object.__setattr__(self, attr, getattr(refreshed_task, attr))
+
+
+@dataclass(frozen=True, slots=True, kw_only=True)
+class TaskContext:
+ task_result: TaskResult
+
+ @property
+ def attempt(self):
+ return self.task_result.attempts