Guides
Queues and Workers
Workers run asynchronous background tasks outside the HTTP request-response cycle. They are useful for workloads such as sending emails, processing uploaded files, or performing inference, and in general for any work that takes longer than a client should wait for.
A Worker processes jobs placed onto a queue. You define one or more Worker instances in your project, and then enqueue tasks to be executed by those workers.
Defining a Worker
A Worker is created by instantiating the Worker class with a name. This name identifies the queue the worker consumes from.
from nobs.models import Worker
priority_queue = Worker("priority_queue")
Once defined, the worker must be included in the workers list on the project:
project = Project(
    name="mlops-example",
    workers=[priority_queue],
)
This ensures that the worker will be deployed and will begin processing queued tasks.
Defining the Work to Be Enqueued
Workers operate on typed request models. The request body should be defined as a Pydantic BaseModel subclass (or a dataclass, if preferred), and the function should accept an instance of that model. The function may be synchronous or asynchronous.
from pydantic import BaseModel
class SendEmail(BaseModel):
    email: str
    content: str

async def send_email(request: SendEmail) -> None:
    # Implementation
    ...
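As noted above, the request model can also be a dataclass, and the handler can be a plain synchronous function. A minimal sketch under those options (the ResizeImage model and resize_image function are illustrative, not part of nobs):

```python
from dataclasses import dataclass

# Illustrative request model defined as a dataclass instead of a BaseModel.
@dataclass
class ResizeImage:
    path: str
    width: int
    height: int

# A synchronous handler: workers accept plain functions as well as coroutines.
def resize_image(request: ResizeImage) -> None:
    # A real implementation would load request.path and resize the image
    # to request.width x request.height.
    ...
```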
Queueing Work for a Worker
Once the worker is deployed and a function is defined, you can enqueue work by calling queue on the worker instance.
await priority_queue.queue(
    send_email,
    SendEmail(email="ola@nordmann.no", content="..."),
)
This call places the task on the worker's queue. A worker then picks up the task and executes the function with the supplied payload.
Key points when queueing tasks:
- The first argument is the function to execute.
- The second argument is the request payload instance.
- The function will run asynchronously in the worker environment.
- The caller does not wait for the task to complete.
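Conceptually, the worker side behaves like a consumer loop: it pulls (function, payload) pairs off a queue and executes each one, while the caller returns immediately. A simplified, self-contained illustration of that pattern using asyncio (this sketches the idea only; it is not the nobs implementation):

```python
import asyncio

async def worker_loop(queue: asyncio.Queue) -> None:
    # Pull (function, payload) pairs until a None sentinel arrives.
    while True:
        item = await queue.get()
        if item is None:
            break
        func, payload = item
        await func(payload)  # execute the enqueued task

async def main() -> list:
    results = []

    async def send_email(request: dict) -> None:
        results.append(request["email"])  # stand-in for real work

    queue: asyncio.Queue = asyncio.Queue()
    consumer = asyncio.create_task(worker_loop(queue))

    # The producer enqueues and moves on; it does not wait for the task itself.
    await queue.put((send_email, {"email": "ola@nordmann.no"}))
    await queue.put(None)  # signal the worker to shut down
    await consumer
    return results
```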
When to Use Workers
Workers are appropriate when:
- A request should not block the caller.
- Work needs to be retried automatically on failure.
- Workloads involve external systems that may have latency (email, webhooks, cloud services).
- You need parallel processing independent of API traffic.
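The automatic-retry point above commonly boils down to re-invoking the handler with a capped number of attempts. A self-contained sketch of that pattern (nobs may implement retries differently internally; retry_async, max_attempts, and delay here are illustrative names, not nobs API):

```python
import asyncio

async def retry_async(func, payload, max_attempts: int = 3, delay: float = 0.0):
    # Re-run the handler until it succeeds or attempts are exhausted.
    last_exc = None
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(payload)
        except Exception as exc:
            last_exc = exc
            if attempt < max_attempts:
                await asyncio.sleep(delay)  # back off before retrying
    raise last_exc
```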
Examples include:
- Sending notifications
- Performing ML inference asynchronously
- Running post-processing stages
- Processing uploaded files
- Integrating with third-party services