---
title: "Asynchronous Tasks"
url: https://develop.sentry.dev/backend/application-domains/tasks/
---

# Asynchronous Tasks

Sentry includes a framework for scheduling and executing tasks asynchronously in a background worker process. The task framework is inspired by [Celery](https://docs.celeryproject.org), as it was originally built with Celery.

## [Defining Tasks](https://develop.sentry.dev/backend/application-domains/tasks.md#defining-tasks)

Sentry tasks are configured with the `instrumented_task` decorator, which includes features like automatic tracing, metric collection, and multi-region silo enforcement.

```python
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import issues_tasks
from sentry.taskworker.retry import Retry

@instrumented_task(
    name="sentry.widgets.tasks.do_work",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(ConnectionError,)),
    processing_deadline_duration=60
)
def do_work(organization_id: int, issue_id: int, **kwargs) -> None:
    ...
```

When defining tasks there are some constraints:

* All tasks *must* have names. The name of a task is serialized into an 'activation' message that is persisted in Kafka. Task names must be stable between deployments to avoid lost tasks.

* Task parameters *must* be JSON serializable. You cannot pass arbitrary python objects through task parameters.

* Tasks *must* define a 'processing deadline'. After a task's processing deadline has elapsed, it will be killed by the worker runtime. Tasks that are killed for execution duration are not automatically retried.

* All tasks must be assigned to a 'namespace'. A namespace is a group of related tasks that are operated together and share a backlog.

* The return value of a task is not stored and is ignored by workers.

* The module containing a task *must* be added to `TASKWORKER_IMPORTS` in `src/sentry/conf/server.py`.
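
The JSON constraint is easy to trip over with values such as `datetime` objects or model instances. The sketch below uses only the standard library. `check_task_kwargs` is a hypothetical helper, not part of Sentry; it round-trips keyword arguments through `json` to show which values would survive serialization.

```python
import json
from datetime import datetime, timezone


def check_task_kwargs(**kwargs):
    """Round-trip kwargs through JSON and return the decoded copy.

    Raises TypeError if any value is not JSON serializable.
    """
    return json.loads(json.dumps(kwargs))


# Plain ids survive the round trip unchanged.
ok = check_task_kwargs(organization_id=1, issue_id=42)

# A datetime does not; pass an ISO 8601 string (or an id) instead.
started = datetime(2024, 1, 1, tzinfo=timezone.utc)
try:
    check_task_kwargs(started_at=started)
    rejected = False
except TypeError:
    rejected = True

as_string = check_task_kwargs(started_at=started.isoformat())
```

Passing ids and re-fetching objects inside the task is usually the simplest way to stay within this constraint.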

## [Scheduling Tasks](https://develop.sentry.dev/backend/application-domains/tasks.md#scheduling-tasks)

With our task defined we can schedule a task (also called an "activation"):

```python
from sentry.widgets.tasks import do_work

# Can call the task synchronously like a normal function
do_work(organization_id=org.id, issue_id=issue.id)

# Use .delay() to schedule a task to run in the future as soon as possible
do_work.delay(organization_id=org.id, issue_id=issue.id)

# Use .apply_async() when you need to define headers, countdown, or expires
# for your task. Here we schedule a task to run in 5 minutes (300 seconds)
do_work.apply_async(
    kwargs={"organization_id": org.id, "issue_id": issue.id},
    countdown=300,
)
```

When tasks are executed, the parameter payload is deserialized, and the task function is called. Tasks are successfully completed if they don't raise an error. If an error is raised from a task, or the task's deadline expires, the task is considered a failure and needs to be retried, put into a dead-letter queue or dropped depending on the task and failure.

## [Retries](https://develop.sentry.dev/backend/application-domains/tasks.md#retries)

When defining tasks, you can define a retry policy with the `retry` parameter. When a worker executes an activation with a retry policy, any non-successful outcome will result in the retry policy being evaluated. If the task has retries remaining, and the captured error is a retriable error, the worker sends a status of retry to the worker's broker. The taskbroker will take care of marking the current activation as complete and producing a new activation to be processed later.

If a task does not define a retry policy, it inherits the retry policy of its namespace.

```python
from sentry.taskworker.retry import LastAction, Retry

@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    retry=Retry(times=3, times_exceeded=LastAction.Deadletter),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

### [Conditional Retries](https://develop.sentry.dev/backend/application-domains/tasks.md#conditional-retries)

Retries can be conditional based on the exception type:

```python
retry=Retry(on=(IntegrationError,), times=3, times_exceeded=LastAction.Discard)
```
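
The evaluation described above (retries remaining, and a retriable error type) can be pictured with a small standalone sketch. `SketchRetry` is a hypothetical stand-in, not Sentry's `Retry` class, and treating `times` as the total number of attempts allowed is an assumption made only for this illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchRetry:
    """Hypothetical stand-in for a retry policy; not Sentry's Retry class."""

    times: int
    on: tuple[type[BaseException], ...]

    def should_retry(self, attempts_made: int, error: BaseException) -> bool:
        # Retry only while attempts remain and the error type is listed.
        return attempts_made < self.times and isinstance(error, self.on)


policy = SketchRetry(times=3, on=(ConnectionError,))

first_failure = policy.should_retry(1, ConnectionError("reset"))
wrong_type = policy.should_retry(1, ValueError("bad input"))
exhausted = policy.should_retry(3, ConnectionError("reset"))
```

Here only the first case would be retried; the other two fall through to whatever `times_exceeded` action the policy defines.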

### [Retry delays](https://develop.sentry.dev/backend/application-domains/tasks.md#retry-delays)

By default retries will be executed as soon as they are consumed. If a task needs to stagger retries, it can use a delayed retry.

```python
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(IntegrationError, ), delay=30)
)
def fetch_commits(repository_id: int) -> None:
    ...
```

With the above configuration, each retry will be processed at least 30 seconds after the previous attempt. The delay between retries could be longer than 30 seconds, but won’t be shorter.

## [Processing Deadlines](https://develop.sentry.dev/backend/application-domains/tasks.md#processing-deadlines)

Every task has a 'processing deadline', which is the maximum expected runtime for a task. If a task does not define a processing deadline, it will inherit the deadline defined on the task's namespace or use the default of **10 seconds**. Task deadlines are intended to be generous; their purpose is to prevent workers from being saturated by tasks running for unbounded amounts of time.

```python
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    # Extended from the default of 10 seconds
    processing_deadline_duration=60
)
def fetch_commits(repository_id: int) -> None:
    ...
```

After a task has run for the length of its processing deadline, it will be interrupted by `SIGALRM`, which raises a `ProcessingDeadlineExceeded` error that interrupts your task’s logic.
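
To make the interruption mechanism concrete, here is a minimal standalone sketch of an alarm-based deadline. It is not the worker's implementation: `DeadlineExceeded` and `processing_deadline` are hypothetical names, and the approach only works on Unix in the main thread. It shows that the exception surfaces wherever the task happens to be, so `finally` blocks still run.

```python
import signal
import time
from contextlib import contextmanager


class DeadlineExceeded(Exception):
    """Stand-in for ProcessingDeadlineExceeded in this sketch."""


@contextmanager
def processing_deadline(seconds: float):
    def on_alarm(signum, frame):
        raise DeadlineExceeded(f"deadline of {seconds}s exceeded")

    previous = signal.signal(signal.SIGALRM, on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Cancel the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def run_with_cleanup() -> list[str]:
    events = []
    try:
        with processing_deadline(0.1):
            time.sleep(5)  # simulates work that overruns the deadline
            events.append("finished")
    except DeadlineExceeded:
        events.append("interrupted")
    finally:
        events.append("cleanup")
    return events
```

Because the error can be raised at any point in the task body, cleanup that must always happen belongs in `finally` blocks or context managers.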

### [Resolving deadline issues](https://develop.sentry.dev/backend/application-domains/tasks.md#resolving-deadline-issues)

In most scenarios, the simplest solution is to extend the deadline for a task. This is the recommended solution until the deadline exceeds 20 minutes. Beyond this duration, the chances of your task being terminated by a deploy increase quickly. Instead of extending the deadline further, you should rethink your logic and partition the workload into smaller batches or individual jobs that can be processed independently. For example, instead of mapping all projects in a single task, spawn multiple tasks.
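
One way to partition a workload is to fan out one task per batch of ids. The sketch below is standalone: `chunked`, `fan_out`, and `schedule_batch` are hypothetical helpers, and `schedule_batch` stands in for a call such as `some_task.delay(project_ids=batch)`.

```python
from itertools import islice
from typing import Iterable, Iterator


def chunked(ids: Iterable[int], size: int) -> Iterator[list[int]]:
    """Yield successive lists of at most `size` ids."""
    iterator = iter(ids)
    while batch := list(islice(iterator, size)):
        yield batch


scheduled: list[dict] = []


def schedule_batch(project_ids: list[int]) -> None:
    # Stand-in for scheduling a real task with these ids as its parameters.
    scheduled.append({"project_ids": project_ids})


def fan_out(project_ids: Iterable[int], batch_size: int = 100) -> int:
    """Schedule one small task per batch instead of one long-running task."""
    count = 0
    for batch in chunked(project_ids, batch_size):
        schedule_batch(batch)
        count += 1
    return count


batches = fan_out(range(1, 8), batch_size=3)
```

Each batch then only needs a deadline sized for `batch_size` items, and a failure affects one batch rather than the whole run.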

## [Expiration deadlines](https://develop.sentry.dev/backend/application-domains/tasks.md#expiration-deadlines)

A task's expiration time defines a point in time after which a task is considered expired and should not be executed. This mechanism allows tasks to be skipped if they are stale and their results are no longer relevant.

```python
from datetime import timedelta

@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    expires=timedelta(minutes=5),
)
def deliver_issue_webhook(organization_id: int, group_id: int):
    ...
```

Expiration times can be expressed as `timedelta` objects or a number of seconds. Tasks that are past their expiration will not be sent to workers. Instead they will be discarded or dead-lettered depending on task configuration.
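
The expiry check itself amounts to a timestamp comparison. The sketch below is an illustration only: `to_seconds` and `is_expired` are hypothetical helpers, and measuring the expiry from the moment the activation was created is an assumption of this example.

```python
from datetime import datetime, timedelta, timezone
from typing import Union


def to_seconds(expires: Union[int, timedelta]) -> int:
    """Normalize an expiry given as seconds or as a timedelta."""
    if isinstance(expires, timedelta):
        return int(expires.total_seconds())
    return expires


def is_expired(
    created_at: datetime, expires: Union[int, timedelta], now: datetime
) -> bool:
    """Return True once `now` is past created_at + expires."""
    return now > created_at + timedelta(seconds=to_seconds(expires))


created = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
still_fresh = is_expired(created, 300, created + timedelta(minutes=4))
stale = is_expired(created, timedelta(minutes=5), created + timedelta(minutes=6))
```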

## [Future schedules](https://develop.sentry.dev/backend/application-domains/tasks.md#future-schedules)

Tasks can be scheduled to be run up to an hour in the future with the `countdown` parameter.

```python
deliver_issue_webhook.apply_async(countdown=timedelta(minutes=10))
```

Countdown tasks will be processed and retained by taskbroker until their countdown has elapsed. Once the countdown delay has elapsed the task will be made available for workers.

## [Idempotency (at\_most\_once)](https://develop.sentry.dev/backend/application-domains/tasks.md#idempotency-at_most_once)

Tasks are processed with at-least-once guarantees. A task may be attempted multiple times if processing deadlines are exceeded. To prevent multiple executions, tasks can set `at_most_once=True`, which enables at-most-once execution.

```python
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    at_most_once=True,
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
```

If an idempotent task exceeds a processing deadline, it will *not* be retried.
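
The trade-off behind at-most-once execution can be sketched with a claim-before-run check. This is a hypothetical in-memory illustration, not how the worker implements `at_most_once`; a real system would need a store shared between workers.

```python
from typing import Callable


class AtMostOnceRunner:
    """Hypothetical in-memory sketch of claim-before-run semantics."""

    def __init__(self) -> None:
        self._claimed: set[str] = set()

    def run(self, activation_id: str, func: Callable[[], None]) -> bool:
        # Claim the activation before running it, so a second delivery
        # (or a failure midway) never leads to a second execution.
        if activation_id in self._claimed:
            return False
        self._claimed.add(activation_id)
        func()
        return True


calls: list[str] = []
runner = AtMostOnceRunner()

first = runner.run("activation-1", lambda: calls.append("ran"))
duplicate = runner.run("activation-1", lambda: calls.append("ran again"))
```

Because the claim is recorded before the function runs, a task that fails partway through is not attempted again, which is why this mode suits work that must never run twice.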

## [Task Aliasing](https://develop.sentry.dev/backend/application-domains/tasks.md#task-aliasing)

Task aliasing allows you to rename tasks or move them to different namespaces while maintaining backwards compatibility with existing task activations in Kafka. This is useful when refactoring task organization or renaming tasks without losing queued tasks. Task aliases inherit the same retry policy, processing deadline, and other configuration from the primary task definition.

### [Renaming a Task](https://develop.sentry.dev/backend/application-domains/tasks.md#renaming-a-task)

When renaming a task, use the `alias` parameter to specify the old task name:

```python
@instrumented_task(
    name="sentry.widgets.tasks.do_work_v2",  # New task name
    namespace=issues_tasks,
    alias="sentry.widgets.tasks.do_work",    # Old task name
)
def do_work() -> None:
    ...
```

New activations will be created using `name`. However, both task names will be registered and can process activations. Workers will handle tasks sent to either `do_work_v2` or `do_work` with the same function.

### [Moving to a Different Namespace](https://develop.sentry.dev/backend/application-domains/tasks.md#moving-to-a-different-namespace)

To move a task from one namespace to another, use the `alias_namespace` parameter:

```python
from sentry.taskworker.namespaces import issues_tasks, integrations_tasks

@instrumented_task(
    name="sentry.widgets.tasks.do_work",
    namespace=integrations_tasks,  # New namespace
    alias_namespace=issues_tasks,  # Old namespace
)
def do_work() -> None:
    ...
```

The task will be registered in both `integrations_tasks` and `issues_tasks` under the same task name, `sentry.widgets.tasks.do_work`, allowing in-flight activations in the old namespace to be processed.

### [Renaming and Moving Together](https://develop.sentry.dev/backend/application-domains/tasks.md#renaming-and-moving-together)

You can combine both `alias` and `alias_namespace` to rename a task and move it to a different namespace simultaneously:

```python
@instrumented_task(
    name="sentry.widgets.tasks.do_work_v2",   # New name
    namespace=integrations_tasks,             # New namespace
    alias="sentry.widgets.tasks.do_work",     # Old name
    alias_namespace=issues_tasks,             # Old namespace
)
def do_work() -> None:
    ...
```

This registers the task as both:

* `integrations_tasks:sentry.widgets.tasks.do_work_v2` (new)
* `issues_tasks:sentry.widgets.tasks.do_work` (old)
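
Conceptually, aliasing registers one function under two lookup keys. The following standalone sketch shows that idea with a plain dictionary; `REGISTRY` and `register` are hypothetical and the namespace strings are illustrative, so this is not how `instrumented_task` is implemented.

```python
from typing import Callable, Optional

# Hypothetical registry keyed by (namespace, task name).
REGISTRY: dict[tuple[str, str], Callable[..., None]] = {}


def register(
    name: str,
    namespace: str,
    alias: Optional[str] = None,
    alias_namespace: Optional[str] = None,
):
    def decorator(func):
        REGISTRY[(namespace, name)] = func
        if alias or alias_namespace:
            # Fall back to the new value for whichever half was not changed.
            REGISTRY[(alias_namespace or namespace, alias or name)] = func
        return func

    return decorator


@register(
    name="sentry.widgets.tasks.do_work_v2",
    namespace="integrations",
    alias="sentry.widgets.tasks.do_work",
    alias_namespace="issues",
)
def do_work() -> None:
    ...
```

Both keys resolve to the same function, so activations queued under the old name and namespace can still be executed.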

## [External Tasks](https://develop.sentry.dev/backend/application-domains/tasks.md#external-tasks)

An application can create tasks for another application to execute through the usage of **external namespaces**:

```python
from sentry.taskworker.app import app

# Create an external namespace
launchpad_tasks = app.create_external_namespace(
    application="launchpad",
    name="default",
)
```

With an external namespace you can register and spawn **external tasks**.

```python
@launchpad_tasks.register(name="launchpad.task.name")
def run_process(org_id: int, project_id: int, payload: bytes) -> None:
    pass


# Schedule the task
run_process.delay(org_id=1, project_id=123, payload=blob)
```

Like local tasks, external tasks can typecheck their parameters and support all of the retry, deadline, and idempotency features that tasks provide. When an external task is produced, the producing application's task router is used to resolve which topic the external task activations are sent to. The task router will receive an application-prefixed namespace name, e.g. `launchpad:default`.

External tasks have a few restrictions:

1. They cannot be called synchronously. For example, `external_task_func(org_id)` will fail with an exception, as external tasks do not have an implementation in the producing application.
2. The `name` assigned to the external task **must** be the same as the task name registered in the application that will execute the task.
3. External tasks must be mocked within a testing context manager. Within a testing context manager, tasks become synchronous, and raise exceptions.

## [Testing Tasks](https://develop.sentry.dev/backend/application-domains/tasks.md#testing-tasks)

Tasks can be tested with a few different approaches. The first is with the `self.tasks()` or `TaskRunner` context manager. When these context managers are entered, tasks will be executed *synchronously* which allows you to validate the side-effects of your tasks and validate that parameters to your task are JSON compatible:

```python
def test_action_with_tasks(self):
    with self.tasks():
        self.client.get("/organizations/slug/do-thing/")
        # can make assertions on side-effects of tasks spawned by the endpoint.
```

Tasks can also be tested with `mock.patch`:

```python
@patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox")
def test_schedule_task(self, mock_deliver: MagicMock) -> None:
    # Do work to trigger the task
    # Assert that the task was scheduled
    mock_deliver.delay.assert_called_with(webhook_one.id)
```

Mocking tasks will not validate that parameters are JSON compatible, nor will it catch TypeErrors from signature mismatches.
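
The signature caveat comes from how `unittest.mock` works in general, and can be seen with the standard library alone. In this sketch `do_work` is a plain function standing in for a task, and the misspelled keyword is deliberate; whether autospeccing fits a particular Sentry task object is not something this example establishes.

```python
from unittest.mock import MagicMock, create_autospec


def do_work(organization_id: int, issue_id: int) -> None:
    """Plain function standing in for a task in this sketch."""


# A bare MagicMock accepts any arguments, so the typo goes unnoticed.
loose = MagicMock()
loose(organisation_id=1, issue_id=2)

# An autospecced mock enforces the real signature.
strict = create_autospec(do_work)
try:
    strict(organisation_id=1, issue_id=2)
    caught = False
except TypeError:
    caught = True
```

Running tasks under `self.tasks()` remains the more thorough check, since it exercises both the signature and JSON serialization.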

## [Task namespaces](https://develop.sentry.dev/backend/application-domains/tasks.md#task-namespaces)

Task namespaces are created in code, and configuration is linked to the namespace when it is declared.

```python
# in sentry.taskworker.namespaces
from sentry.taskworker.config import taskregistry
from sentry.taskworker.retry import LastAction, Retry

issues_tasks = taskregistry.create_namespace(
    "issues",
    retry=Retry(times=3, times_exceeded=LastAction.Discard)
)

# register tasks within a namespace
@instrumented_task(name="tasks.do_work", namespace=issues_tasks)
def do_work(**kwargs):
    ...
```

Namespaces can define default behaviour for `retry`, `processing_deadline` and `expires` for the tasks they contain. Without explicit routing, any namespace will be run in our `default` worker pools. If your task namespace will be high-throughput (more than 1500 tasks per second), consider provisioning a dedicated pool for your tasks.

## [Periodically Scheduled Tasks](https://develop.sentry.dev/backend/application-domains/tasks.md#periodically-scheduled-tasks)

Tasks can also be set to spawn on a periodic schedule. To accomplish this, update the `TASKWORKER_SCHEDULE` configuration found in `src/sentry/conf/server.py` with the appropriate namespace, task, and schedule. Taskworker supports `timedelta` and `crontab` schedule types:

```python
TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
    "send-beacon": {
        "task": "selfhosted:sentry.tasks.send_beacon",
        "schedule": task_crontab("0", "*/1", "*", "*", "*"),
    },
}
```
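
For comparison, a `timedelta`-based entry might look like the sketch below. The dictionary name, the entry key, and the task name are all illustrative assumptions; only the `namespace:task_name` shape and the use of `timedelta` as a schedule type come from the text above.

```python
from datetime import timedelta

# Hypothetical schedule entry; the key and task name are illustrative only.
EXAMPLE_SCHEDULES = {
    "refresh-widgets": {
        "task": "issues:sentry.widgets.tasks.do_work",
        # Spawn the task every five minutes.
        "schedule": timedelta(minutes=5),
    },
}
```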

## Pages in this section

- [Terminology and Concepts](https://develop.sentry.dev/backend/application-domains/tasks/terminology-and-concepts.md)
- [Task Lifecycle](https://develop.sentry.dev/backend/application-domains/tasks/task-lifecycle.md)
- [Running workers](https://develop.sentry.dev/backend/application-domains/tasks/running-workers.md)
