Asynchronous Tasks
Sentry includes a framework for scheduling and executing tasks asynchronously in background worker processes. The task framework is inspired by Celery, as it was originally built on Celery.
Sentry tasks are configured with the instrumented_task decorator, which adds features like automatic tracing, metric collection, and multi-region silo enforcement.
from sentry.tasks.base import instrumented_task
from sentry.taskworker.namespaces import issues_tasks
from sentry.taskworker.retry import Retry

@instrumented_task(
    name="sentry.widgets.tasks.do_work",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(ConnectionError,)),
    processing_deadline_duration=60,
)
def do_work(organization_id: int, issue_id: int, **kwargs) -> None:
    ...
When defining tasks, there are some constraints:
All tasks must have names. The name of a task is serialized into an 'activation' message that is persisted in Kafka. Task names must be stable between deployments to avoid losing tasks.
Task parameters must be JSON serializable. You cannot pass arbitrary python objects through task parameters.
Tasks must define a 'processing deadline'. After a task's processing deadline has elapsed, it will be killed by the worker runtime. Tasks that are killed for execution duration are not automatically retried.
All tasks must be assigned to a 'namespace'. A namespace is a group of related tasks that are operated together and share a backlog.
The return value of a task is not stored and is ignored by workers.
The module containing a task must be added to TASKWORKER_IMPORTS in src/sentry/conf/server.py, as sketched below.
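For example, registering the module from the decorator example above would look roughly like this (the surrounding entries are elided, and the exact shape of the setting is assumed to be a tuple of module paths):
# src/sentry/conf/server.py
TASKWORKER_IMPORTS: tuple[str, ...] = (
    # ... existing task modules ...
    "sentry.widgets.tasks",
)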
With our task defined, we can schedule a task (also called an "activation"):
from sentry.widgets.tasks import do_work

# Can call the task synchronously like a normal function
do_work(organization_id=org.id, issue_id=issue.id)

# Use .delay() to schedule a task to run in the future as soon as possible
do_work.delay(organization_id=org.id, issue_id=issue.id)

# Use .apply_async() when you need to define headers, countdown, or expires
# for your task. Here we schedule a task to run in 5 minutes (300 seconds)
do_work.apply_async(
    kwargs={"organization_id": org.id, "issue_id": issue.id},
    countdown=300,
)
When tasks are executed, the parameter payload is deserialized and the task function is called. A task completes successfully if it doesn't raise an error. If an error is raised, or the task's deadline expires, the task is considered a failure and will be retried, put into a dead-letter queue, or dropped, depending on the task's configuration and the failure.
When defining tasks, you can define a retry policy with the retry parameter. When a worker executes an activation with a retry policy, any non-successful outcome results in the retry policy being evaluated. If the task has retries remaining, and the captured error is a retriable error, the worker sends a status of 'retry' to the worker's broker. The taskbroker takes care of marking the current activation as complete and producing a new activation to be processed later.
If a task does not define a retry policy, it inherits the retry policy of its namespace.
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    retry=Retry(times=3, times_exceeded=LastAction.Deadletter),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
Retries can be conditional based on the exception type:
retry=Retry(on=(IntegrationError,), times=3, times_exceeded=LastAction.Discard)
By default, retries are executed as soon as they are consumed. If a task needs to stagger retries, it can use a delayed retry.
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    retry=Retry(times=3, on=(IntegrationError,), delay=30),
)
def fetch_commits(repository_id: int) -> None:
    ...
With the above configuration, each retry will be processed at least 30 seconds after the previous attempt. The delay between retries could be longer than 30 seconds, but won’t be shorter.
Every task has a 'processing deadline', which is the maximum expected runtime for a task. If a task does not define a processing deadline, it inherits the deadline defined on the task's namespace, or uses the default of 10 seconds. Deadlines are intended to be generous; they exist to prevent workers from being saturated by tasks that run for unbounded amounts of time.
@instrumented_task(
    name="sentry.integrations.fetch_commits",
    namespace=issues_tasks,
    # Extended from the default of 10 seconds
    processing_deadline_duration=60,
)
def fetch_commits(repository_id: int) -> None:
    ...
Once a task has run for the length of its processing deadline, it is interrupted by SIGALRM, which raises a ProcessingDeadlineExceeded error that interrupts your task's logic.
In most scenarios, the simplest solution is to extend the task's deadline. This is the recommended approach until you get above 20 minutes of duration. Beyond that, the chances of your task being terminated by a deploy increase quickly. Instead of extending the deadline further, rethink your logic and partition the workload into smaller batches or individual jobs that can be processed independently. For example, instead of mapping over all projects in a single task, spawn multiple tasks.
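As a rough sketch of that fan-out pattern (the task names and the fetch_project_ids helper below are hypothetical):
@instrumented_task(
    name="sentry.widgets.tasks.schedule_project_sync",
    namespace=issues_tasks,
)
def schedule_project_sync(organization_id: int, **kwargs) -> None:
    # Spawn one small task per project instead of doing all the work
    # inside a single long-running task.
    for project_id in fetch_project_ids(organization_id):  # hypothetical helper
        sync_project.delay(project_id=project_id)

@instrumented_task(
    name="sentry.widgets.tasks.sync_project",
    namespace=issues_tasks,
)
def sync_project(project_id: int, **kwargs) -> None:
    ...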
A task's expiration time defines a point in time after which a task is considered expired and should not be executed. This mechanism allows tasks to be skipped if they are stale and their results are no longer relevant.
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    expires=timedelta(minutes=5),
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
Expiration times can be expressed as timedelta objects or a number of seconds. Tasks that are past their expiration will not be sent to workers. Instead, they are discarded or dead-lettered, depending on task configuration.
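Expiration can also be set when scheduling. A minimal sketch, assuming apply_async accepts expires alongside countdown as noted in the scheduling example above:
from datetime import timedelta

# Equivalent forms: a timedelta or a number of seconds
deliver_issue_webhook.apply_async(
    kwargs={"organization_id": org.id, "group_id": group.id},
    expires=timedelta(minutes=5),
)
deliver_issue_webhook.apply_async(
    kwargs={"organization_id": org.id, "group_id": group.id},
    expires=300,
)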
Tasks can be scheduled to run up to an hour in the future with the countdown parameter.
deliver_issue_webhook.apply_async(countdown=timedelta(minutes=10))
Countdown tasks are retained by the taskbroker until their countdown has elapsed. Once the countdown delay has elapsed, the task is made available to workers.
Tasks are processed with at-least-once guarantees. A task may be attempted multiple times if processing deadlines are exceeded. To prevent multiple executions, a task can set at_most_once to opt into at-most-once execution.
@instrumented_task(
    name="sentry.issues.tasks.deliver_issue_webhook",
    namespace=issues_tasks,
    at_most_once=True,
)
def deliver_issue_webhook(organization_id: int, group_id: int) -> None:
    ...
If an idempotent task exceeds a processing deadline, it will not be retried.
Tasks can be tested with a few different approaches. The first is the self.tasks() or TaskRunner context manager. While these context managers are active, tasks are executed synchronously, which lets you validate the side effects of your tasks and verify that task parameters are JSON compatible:
def test_action_with_tasks(self):
    with self.tasks():
        self.client.get("/organizations/slug/do-thing/")
    # can make assertions on side-effects of tasks spawned by the endpoint.
Tasks can also be tested with mock.patch:
@patch("sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox")
def test_schedule_task(self, mock_deliver: MagicMock) -> None:
# Do work to trigger the task
# Assert that the task was scheduled
mock_deliver.delay.assert_called_with(webhook_one.id)
Mocking tasks will not validate that parameters are JSON compatible, nor will it catch TypeErrors from signature mismatches.
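If you want signature mismatches caught as well, one option is to also call the task synchronously in a test, as in the scheduling example above; the argument values here are placeholders:
def test_do_work_signature(self):
    # Calling the task function directly exercises the real signature,
    # which a mocked .delay() would not.
    do_work(organization_id=123, issue_id=456)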
Task namespaces are created in code, and configuration is linked to the namespace when it is declared.
# in sentry.taskworker.namespaces
from sentry.taskworker.config import taskregistry
from sentry.taskworker.retry import LastAction, Retry

issues_tasks = taskregistry.create_namespace(
    "issues",
    retry=Retry(times=3, times_exceeded=LastAction.Discard),
)

# register tasks within a namespace
@instrumented_task(name="tasks.do_work", namespace=issues_tasks)
def do_work(**kwargs):
    ...
Namespaces can define default behaviour for retry, processing_deadline, and expires for the tasks they contain. Without explicit routing, any namespace runs in our default worker pools. If your task namespace will be high-throughput (more than 1500 tasks per second), consider provisioning a dedicated pool for your tasks.
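A hedged sketch of a namespace that sets those defaults; the namespace name is hypothetical, and the processing_deadline_duration and expires keyword names mirror the task decorator and the prose above rather than a confirmed create_namespace signature:
from sentry.taskworker.config import taskregistry
from sentry.taskworker.retry import LastAction, Retry

widgets_tasks = taskregistry.create_namespace(
    "widgets",
    retry=Retry(times=2, times_exceeded=LastAction.Discard),
    processing_deadline_duration=30,  # seconds (assumed keyword name)
    expires=5 * 60,  # seconds (assumed keyword name)
)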
Tasks can also be set to spawn on a periodic schedule. To accomplish this, update the TASKWORKER_SCHEDULE configuration found in src/sentry/conf/server.py with the appropriate namespace, task, and schedule. Taskworker supports timedelta and crontab schedule types:
TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
    "send-beacon": {
        "task": "selfhosted:sentry.tasks.send_beacon",
        "schedule": task_crontab("0", "*/1", "*", "*", "*"),
    },
}
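For an interval-based schedule, a timedelta entry would look roughly like this; the entry name and task are placeholders, and passing a timedelta directly in the "schedule" key is an assumption based on the supported schedule types noted above:
from datetime import timedelta

TASKWORKER_REGION_SCHEDULES: ScheduleConfigMap = {
    "poll-widgets": {
        "task": "issues:sentry.widgets.tasks.do_work",
        "schedule": timedelta(minutes=5),  # assumed to be accepted directly
    },
}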