Examples
Support Ticket Classification
This example demonstrates how to build a small support-ticket application using FastAPI together with background workers, illustrating how server-side rendering, database persistence, training an NLP model, and asynchronous background processing work together inside a single nobs project.
The application exposes a simple HTML interface where users can submit support messages. Each message is stored in a Postgres database. A background worker immediately receives a job to classify the ticket using a text-classification model. If the model does not yet exist, the worker trains it on a handful of sample messages, saves the model to disk, and then predicts the ticket category. The page refreshes to show the updated category as soon as the worker finishes.
Everything is orchestrated by nobs, including the app server, Postgres, SQS-style queues, workers, and all environment setup.
Project Configuration
The project is defined in project.py. It describes a FastAPI application, the background worker that handles ML tasks, and the secrets required to connect to Postgres and queueing infrastructure.
from nobs import Project, FastAPIApp
from nobs.secrets import SqsConfig
from src.settings import Settings
from src.workers import background
from src import app
project = Project(
name="support",
shared_secrets=[Settings, SqsConfig],
workers=[background],
app=FastAPIApp(app),
)
A single worker named background is included under workers, which means that all tasks queued to that worker will automatically be executed in a separate process, both locally and in the cloud.
Worker Definition
The background worker lives in src/workers.py:
from nobs import Worker
background = Worker("background")
This worker receives queued tasks such as training the classification model and predicting category labels.
The FastAPI Application
The server component resides in src/app.py. It renders a basic HTML form where the user can submit a message, and it shows a table of existing tickets and their predicted categories.
import logging
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi.responses import RedirectResponse
from sqlmodel import SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession
from fastapi import Depends, FastAPI, Form, Response
from starlette.responses import HTMLResponse
from src.models import Ticket, engine, session
from src.workers import background
from src.predict import PredictArgs, predict
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
logging.basicConfig(level=logging.INFO)
logger.info("Creating db models")
async with engine().begin() as conn:
await conn.run_sync(SQLModel.metadata.create_all)
logger.info("Created all models")
yield
app = FastAPI(lifespan=lifespan)
The index route fetches all tickets and displays them along with a form for submitting new ones.
@app.get("/")
async def index(
session: Annotated[AsyncSession, Depends(session)]
) -> HTMLResponse:
tickets = await session.exec(select(Ticket))
table = ticket_table(list(tickets.all()))
return HTMLResponse(
content=f"""
<!doctype html>
<title>Submit Ticket</title>
<h1>Submit a Suport Ticket</h1>
<form method="post">
Email: <input type="email" name="email" required><br>
Message: <textarea name="message" required></textarea><br>
<input type="submit" value="Submit">
</form>
{table}
"""
)
When a new ticket is submitted, it is inserted into the database and immediately handed off to the worker for prediction.
@app.post("/")
async def create(
email: Annotated[str, Form()],
message: Annotated[str, Form()],
session: Annotated[AsyncSession, Depends(session)]
) -> Response:
model = Ticket(email=email, message=message)
session.add(model)
await session.commit()
await session.refresh(model)
await background.queue(predict, PredictArgs(ticket_id=model.id))
return RedirectResponse(url="/", status_code=301)
A manual trigger endpoint is also provided for retraining the model:
@app.post("/train")
async def train_model() -> None:
from src.train import train, TrainArgs
await background.queue(train, TrainArgs())
The table of tickets is rendered with a simple HTML helper:
def ticket_table(tickets: list[Ticket]) -> str:
def ticket_row(ticket: Ticket) -> str:
return f"""
<tr>
<td>{ticket.id}</td>
<td>{ticket.email}</td>
<td>{ticket.message}</td>
<td>{ticket.category}</td>
</tr>
"""
rows = "<tr>" + "</tr><tr>".join([ticket_row(t) for t in tickets]) + "</tr>"
return f"""
<table border="1">
<tr>
<th>ID</th>
<th>Email</th>
<th<Message</th>
<th>Category</th>
</tr>
{rows}
</table>
"""
Database Configuration
The database layer is defined in src/models.py, powered by SQLModel and using Postgres via async engines.
from uuid import uuid4, UUID
from sqlmodel import SQLModel, Field
from src.settings import Settings
from functools import lru_cache
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession
@lru_cache
def engine() -> AsyncEngine:
settings = Settings() # type: ignore
return create_async_engine(settings.psql_url.encoded_string())
async def session():
async with AsyncSession(engine(), expire_on_commit=False) as session:
yield session
class Ticket(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
email: str
message: str
category: str | None = Field(default=None)
The Postgres connection URL is provided via environment variables, defined inside src/settings.py.
from pydantic import PostgresDsn
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
psql_url: PostgresDsn
Model Training
The training function in src/train.py builds a simple text classifier using scikit-learn and saves the pipeline to disk.
import pickle
import logging
from pathlib import Path
from pydantic import BaseModel, Field
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
logger = logging.getLogger(__name__)
class CategoryTypes:
technical = "technical"
billing = "billing"
general = "general"
class TrainArgs(BaseModel):
model_file: str = Field(default="classifier.pkl")
async def train(args: TrainArgs) -> None:
texts = [
("My payment failed", CategoryTypes.billing),
("The website is down", CategoryTypes.technical),
("I can't log in to my account", CategoryTypes.technical),
("How do I update my billing info?", CategoryTypes.billing),
("What are your business hours?", CategoryTypes.general),
("How do I reset my password?", CategoryTypes.technical),
]
pipeline = Pipeline([
("tfidf", TfidfVectorizer()),
("model", MultinomialNB())
])
logger.info("Running train")
pipeline.fit(
[text for text, _ in texts],
[label for _, label in texts],
)
Path(args.model_file).write_bytes(pickle.dumps(pipeline))
Prediction Worker
The prediction step loads the trained model, runs a classification on the ticket message, and stores the resulting category back into Postgres.
import pickle
from pathlib import Path
from contextlib import asynccontextmanager
import logging
from uuid import UUID
from pydantic import BaseModel, Field
from sqlmodel import select
from src.models import Ticket, session
from src.train import train, TrainArgs
logger = logging.getLogger(__name__)
class PredictArgs(BaseModel):
ticket_id: UUID
model_file: str = Field(default="classifier.pkl")
async def predict(args: PredictArgs) -> None:
model_path = Path(args.model_file)
if not model_path.is_file():
await train(TrainArgs(model_file=args.model_file))
model = pickle.loads(model_path.read_bytes())
async with asynccontextmanager(session)() as sess:
res = await sess.exec(
select(Ticket).where(Ticket.id == args.ticket_id)
)
ticket = res.first()
assert ticket
logger.info(f"Running predict for {ticket}")
label = model.predict([ticket.message])[0]
ticket.category = label
logger.info(f"Predicted '{label}' as the category")
sess.add(ticket)
await sess.commit()
Running Locally
To develop the support-ticket system locally, start by creating a new project folder using uv:
uv init my-support-project
Add nobs to the project environment:
uv add nobs
Place the project file (project.py), the FastAPI application, the workers, training logic, prediction logic, and database models into a src/ directory as shown above.
Once the code is in place, start your local environment with:
nobs up
The command automatically provisions Postgres, a local queueing system via LocalStack, and any workers and application servers. All compute limits configured in the project file are enforced locally as they would be in the cloud.
A typical startup looks like this, including service creation, live logs, and worker initialization:
INFO:nobs.cli:Updating source code from /Users/.../support-app/src
INFO:nobs.docker:Creating resource 'psql'
INFO:nobs.docker:Creating resource 'infra'
INFO:nobs.docker:Creating app 'app'
INFO:nobs.docker:Creating worker 'background'
INFO:nobs.docker:Available Containers:
INFO:nobs.docker:Container psql (...) is accessible at http://localhost:5432
INFO:nobs.docker:Container infra (...) is accessible at many AWS-compatible endpoints
INFO:nobs.docker:Container app (...) is accessible at http://localhost:8000
INFO:nobs.docker:Container background (...) is running in the background
[psql] PostgreSQL Database directory appears to contain a database; Skipping initialization
[app] INFO: Uvicorn running on http://0.0.0.0:8000
[background] INFO:nobs.cli:Ready to receive work at queue 'background'
Opening http://localhost:8000 in a browser displays the ticket submission form together with the table of tickets. Submitting a new ticket triggers a prediction job on the worker, and the table updates after the background task completes.