Defining tasks, brokers, workers, and backends in Celery

Defining tasks, brokers, workers, and backends in Celery


Celery can be daunting to learn. While its documentation is comprehensive, it has a tendency to skip the basics.

This post will define four of the main concepts in Celery, discuss the relationship between Celery and Kombu, and use a few code examples to illustrate how Celery might be useful in real applications. The examples will use the Django web framework and its @shared_task decorator, but the concepts are also applicable to Flask, FastAPI, and others.



Tasks, Brokers, Workers, and Backends

You’ll be hard pressed to find a place on the current Celery documentation that clearly spells out what it considers a broker or backend, but with enough digging you can find and infer definitions.

Below are concepts you should know before getting started with Celery.



Task

A task is some piece of work that Celery will perform asynchronously (in this context, that’s a fancy word for “not immediately”). In a web application, one task might be sending an email after a user submits a form. Sending an email can be a multi-second operation, and forcing a user to wait for an email to send before redirecting can make an application feel slow.

Tasks are defined using decorators in Celery. Below we use the @shared_task decorator to turn send_thank_you_email() into a Celery task that can be used in the submit_feedback() form submission handler.

from config.celery import shared_task
from django.core.mail import send_mail
from django.shortcuts import render, redirect
from feedback.forms import FeedbackForm

@shared_task
def send_thank_you_email(email_address):
    send_mail(
        "Thank you for your feedback!",
        "We appreciate your input.",
        "noreply@example.com",
        [email_address],
    )

def submit_feedback(request):
    if request.method == "POST":
        form = FeedbackForm(request.POST)
        if form.is_valid():
            form.save()

            # Push the task to the broker using the delay() method.
            send_thank_you_email.delay(form.cleaned_data["email"])

            return redirect("/thank-you/")
    else:
        form = FeedbackForm()

    return render(request, "feedback.html", {"form": form})
Enter fullscreen mode

Exit fullscreen mode

When a task is defined using a decorator in Celery, it adds a delay() method to the task. You can see the send_thank_you_email task calling the delay() method in the example above after the form is successfully saved. When delay() is called, it will send the send_thank_you_email task and its data to the broker where it is stored and will later be executed by a worker, at which point the user will be emailed.

The benefit of pushing work to Celery becomes more obvious if you need to send additional emails after saving the form. For example, you may want to email the customer support team that they have received new feedback. With Celery, this adds almost no additional time to the response.

Celery tasks also allow additional advanced configuration. In the event an email failed to send, you can code your task to automatically retry and configure settings like max_retries, retry_backoff, retry_jitter, etc.



Broker

The Celery Enhancement Proposals’ Glossary has the following to say about Message Brokers:

Enterprise Integration Patterns defines a Message Broker as an architectural building block that can receive messages from multiple destinations, determine the correct destination and route the message to the correct channel.

For our purposes with Celery, we’ll consider a broker a “message transport” where created tasks are stored. Brokers don’t actually execute the task: that is the job of a worker. Brokers are instead the place where scheduled tasks are stored to when a task is scheduled, and pulled from when a worker eventually executes a task. A broker is a required component for Celery to work, and Celery will connect to exactly one broker.

Celery’s Backends and Brokers page lists some if its supported brokers, and there are other experimental brokers it supports that are not listed (such as SQLAlchemy). These brokers (or “message transports”) are managed by a Celery-maintained Python library for message transports called Kombu. When looking for information about configuring brokers, it is sometimes helpful to consult with Kombu’s documentation rather that Celery’s.

Some brokers have advanced features like task fanout and priority, while others operate as simple queues.



Worker

A worker is an instance of Celery that pulls tasks from the broker and executes the task functions defined in your Python app. Celery is able to run Python code in its workers because Celery itself is written in Python.

Many workers can run simultaneously to execute tasks. When you run the celery worker command, it will spin up a worker for every core of your computer by default. If your computer has 16 cores, running celery worker will start 16 workers.

If no workers are running, messages (tasks) will accumulate in the broker until workers are available to execute them.



Backend

The tasks page in the Celery User Guide has the following to say about backends:

If you want to keep track of tasks or need the return values, then Celery must store or send the states somewhere so that they can be retrieved later. There are several built-in result backends to choose from: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc), and Redis – or you can define your own.

TLDR: a backend tracks the outcomes and returned results of async tasks. What does that actually mean, and when could it be useful?

Imagine you are building an accounting app in Django that can generate an annual report. The report could take minutes to generate.

To give your users a more responsive experience, you use an AJAX request to kick off a report generation task. That request returns an ID of the task, which it can use to poll the server every few seconds to see if the report is generated. Once the task is complete, it will return the ID of the report, which the client can use to display a link to the report via JavaScript.

We can implement this with Celery and Django using the following code:

from celery import shared_task
from django.http import JsonResponse
from django.views.decorators.http import require_http_methods
from accounting.models import Asset
from accounting.reports import AnnualReportGenerator

@shared_task
def generate_report_task(year):
    # This could take minutes...
    report = AnnualReportGenerator().generate(year)
    asset = Asset.objects.create(
        name=f"{year} annual report",
        url=report.url,
    )
    return asset.id

@require_http_methods(["POST"])
def generate_annual_report_view(request):
    year = request.POST.get("year")
    task = generate_report_task.delay(year)
    return JsonResponse({"taskId": task.id})

def get_annual_report_generation_status_view(request, task_id):
    task = generate_report_task.AsyncResult(task_id)

    # The status is typically "PENDING", "SUCCESS", or "FAILURE"
    status = task.status
    return JsonResponse({"status": status, "assetId": task.result})
Enter fullscreen mode

Exit fullscreen mode

In this example, the asset ID returned by generate_report_task() is stored in a backend. The backend stores the outcome and returned result. The backend does not store the status of yet-to-be-processed tasks: these will only be added once there has been an outcome. A task that returns "PENDING" has a completely unknown status: an associated task might not even exist. Tasks will typically return "SUCCESS" or "FAILURE", but you can see all statuses in the Celery status docs.

Having a backend is not required for Celery to run tasks. However, it is required if you ever need to check the outcome of a task or return a task’s result. If you try to check a task’s status when Celery does not have a backend configured, an exception will be raised.


I hope this post helps you understand the individual pieces of Celery and why you might consider using it. While the official documentation is challenging to grok, learning Celery deeply can unlock new possibilities within your Python applications.



Source link
lol

By stp2y

Leave a Reply

Your email address will not be published. Required fields are marked *

No widgets found. Go to Widget page and add the widget in Offcanvas Sidebar Widget Area.