
FastAPI Tutorial Part 11: Background Tasks and Celery

Handle long-running operations in FastAPI. Learn built-in BackgroundTasks, Celery integration, task queues, and async processing patterns.


Moshiour Rahman


Why Background Tasks?

Some operations shouldn’t block API responses:

  • Sending emails
  • Processing files
  • Generating reports
  • External API calls
  • Data synchronization

FastAPI BackgroundTasks

Basic Usage

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("log.txt", "a") as f:
        f.write(f"{message}\n")

@app.post("/send-notification")
async def send_notification(
    email: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(write_log, f"Notification sent to {email}")
    return {"message": "Notification scheduled"}

Multiple Tasks

import time

def send_email(email: str, message: str):
    # Simulate a slow email send
    time.sleep(2)
    print(f"Email sent to {email}: {message}")

def update_analytics(event: str):
    print(f"Analytics updated: {event}")

@app.post("/users")
async def create_user(
    user: UserCreate,
    background_tasks: BackgroundTasks
):
    new_user = save_user(user)  # UserCreate and save_user are assumed defined elsewhere

    # Queue multiple background tasks; they run in order after the response is sent
    background_tasks.add_task(send_email, user.email, "Welcome!")
    background_tasks.add_task(update_analytics, "user_created")

    return new_user

With Dependencies

from fastapi import Depends

def get_email_service():
    return EmailService()

@app.post("/orders/{order_id}/confirm")
async def confirm_order(
    order_id: int,
    background_tasks: BackgroundTasks,
    email_service: EmailService = Depends(get_email_service)
):
    order = get_order(order_id)

    background_tasks.add_task(
        email_service.send_confirmation,
        order.customer_email,
        order_id
    )

    return {"status": "confirmed"}

Celery Integration

Setup

pip install celery redis
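
Celery needs a message broker; if Redis isn't installed locally, running it in a container works too (assuming Docker is available):

docker run -d --name redis -p 6379:6379 redis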

Celery Configuration

# app/celery_app.py
from celery import Celery

celery_app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",   # queue where tasks are sent
    backend="redis://localhost:6379/0"   # store for task results
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # 5 minutes
    worker_prefetch_multiplier=1,  # fetch one task at a time (better for long tasks)
)

Define Tasks

# app/tasks.py
from .celery_app import celery_app
import time

@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str):
    try:
        # Email sending logic (send_email is assumed to be defined elsewhere)
        send_email(to, subject, body)
        return {"status": "sent", "to": to}
    except Exception as exc:
        # Re-raise via retry so Celery re-queues the task, waiting 60s between attempts
        raise self.retry(exc=exc, countdown=60)

@celery_app.task
def process_file_task(file_path: str):
    # Long-running file processing
    time.sleep(10)
    return {"processed": file_path}

@celery_app.task
def generate_report_task(report_type: str, params: dict):
    # Generate and persist a report (create_report and save_report are assumed helpers)
    report = create_report(report_type, params)
    save_report(report)
    return {"report_id": report.id}

Use in FastAPI

from fastapi import FastAPI
from .celery_app import celery_app
from .tasks import send_email_task, process_file_task

app = FastAPI()

@app.post("/send-email")
def queue_email(email: EmailSchema):
    task = send_email_task.delay(
        email.to,
        email.subject,
        email.body
    )
    return {"task_id": task.id, "status": "queued"}

@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.ready() else None
    }

Running Celery

# Start Redis
redis-server

# Start Celery worker
celery -A app.celery_app worker --loglevel=info

# Start Celery beat (only needed for the scheduled tasks below)
celery -A app.celery_app beat --loglevel=info
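
The API server runs as a separate process from the worker; assuming the module layout used in the complete example below, start it with uvicorn:

# Start the FastAPI app (in another terminal)
uvicorn app.main:app --reload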

Scheduled Tasks

# app/celery_app.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "cleanup-every-hour": {
        "task": "app.tasks.cleanup_old_files",
        "schedule": crontab(minute=0),  # Every hour
    },
    "daily-report": {
        "task": "app.tasks.generate_daily_report",
        "schedule": crontab(hour=6, minute=0),  # 6 AM daily
    },
    "sync-every-5-minutes": {
        "task": "app.tasks.sync_external_data",
        "schedule": 300.0,  # Every 5 minutes
    }
}

# app/tasks.py
@celery_app.task
def cleanup_old_files():
    delete_files_older_than(days=7)  # assumed cleanup helper

@celery_app.task
def generate_daily_report():
    report = create_daily_summary()
    send_report_email(report)

Task Chaining

from celery import chain, group, chord

# Sequential tasks: each result is passed to the next via .s() signatures
# (validate_order, process_payment, send_confirmation are assumed Celery tasks)
@app.post("/process-order")
def process_order(order_id: int):
    workflow = chain(
        validate_order.s(order_id),
        process_payment.s(),
        send_confirmation.s()
    )
    result = workflow.apply_async()
    return {"workflow_id": result.id}

# Parallel tasks
@app.post("/generate-reports")
def generate_all_reports():
    tasks = group(
        generate_report.s("sales"),
        generate_report.s("inventory"),
        generate_report.s("customers")
    )
    result = tasks.apply_async()
    return {"group_id": result.id}

Complete Example

# app/main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
from .tasks import send_email_task, process_file_task
from .celery_app import celery_app

app = FastAPI()

class EmailRequest(BaseModel):
    to: str
    subject: str
    body: str

class TaskResponse(BaseModel):
    task_id: str
    status: str

class TaskStatus(BaseModel):
    task_id: str
    status: str
    result: Optional[dict] = None
    error: Optional[str] = None

# Simple background task
@app.post("/notify")
async def send_notification(
    message: str,
    background_tasks: BackgroundTasks
):
    def log_notification():
        with open("notifications.log", "a") as f:
            f.write(f"{message}\n")

    background_tasks.add_task(log_notification)
    return {"status": "scheduled"}

# Celery task
@app.post("/email", response_model=TaskResponse)
def queue_email(email: EmailRequest):
    task = send_email_task.delay(
        email.to,
        email.subject,
        email.body
    )
    return TaskResponse(task_id=task.id, status="queued")

@app.post("/process-file", response_model=TaskResponse)
def queue_file_processing(file_path: str):
    task = process_file_task.delay(file_path)
    return TaskResponse(task_id=task.id, status="queued")

@app.get("/tasks/{task_id}", response_model=TaskStatus)
def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)

    response = TaskStatus(
        task_id=task_id,
        status=task.status
    )

    if task.ready():
        if task.successful():
            response.result = task.result
        else:
            response.error = str(task.result)

    return response

@app.delete("/tasks/{task_id}")
def cancel_task(task_id: str):
    celery_app.control.revoke(task_id, terminate=True)
    return {"status": "cancelled"}

Summary

Method            Use Case                      Complexity
BackgroundTasks   Simple, short tasks           Low
Celery            Complex, distributed tasks    High
asyncio           I/O-bound async operations    Medium

Celery Feature    Description
Task queue        Distributed task execution
Retry logic       Automatic task retries
Scheduling        Periodic task execution
Chaining          Task workflows
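
The asyncio row refers to scheduling coroutines on FastAPI's own event loop, which suits I/O-bound work but offers no retries or persistence; a minimal sketch:

import asyncio
from fastapi import FastAPI

app = FastAPI()

async def sync_external_data():
    await asyncio.sleep(5)  # placeholder for real async I/O
    print("Sync complete")

@app.post("/sync")
async def trigger_sync():
    # Fire-and-forget on the running event loop; keep a reference so the
    # task isn't garbage-collected before it finishes
    app.state.sync_task = asyncio.create_task(sync_external_data())
    return {"status": "started"}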

Next Steps

In Part 12, we'll explore WebSockets and build real-time bidirectional communication.

Series Navigation:

  • Part 1-10: Foundations & Security
  • Part 11: Background Tasks (You are here)
  • Part 12: WebSockets

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.