FastAPI Tutorial Part 11: Background Tasks and Celery
Handle long-running operations in FastAPI. Learn built-in BackgroundTasks, Celery integration, task queues, and async processing patterns.
Moshiour Rahman
Advertisement
Why Background Tasks?
Some operations shouldn’t block API responses:
- Sending emails
- Processing files
- Generating reports
- External API calls
- Data synchronization
FastAPI BackgroundTasks
Basic Usage
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def write_log(message: str):
with open("log.txt", "a") as f:
f.write(f"{message}\n")
@app.post("/send-notification")
async def send_notification(
email: str,
background_tasks: BackgroundTasks
):
background_tasks.add_task(write_log, f"Notification sent to {email}")
return {"message": "Notification scheduled"}
Multiple Tasks
def send_email(email: str, message: str):
# Simulate email sending
import time
time.sleep(2)
print(f"Email sent to {email}: {message}")
def update_analytics(event: str):
print(f"Analytics updated: {event}")
@app.post("/users")
async def create_user(
user: UserCreate,
background_tasks: BackgroundTasks
):
new_user = save_user(user)
# Queue multiple background tasks
background_tasks.add_task(send_email, user.email, "Welcome!")
background_tasks.add_task(update_analytics, "user_created")
return new_user
With Dependencies
def get_email_service():
return EmailService()
@app.post("/orders/{order_id}/confirm")
async def confirm_order(
order_id: int,
background_tasks: BackgroundTasks,
email_service: EmailService = Depends(get_email_service)
):
order = get_order(order_id)
background_tasks.add_task(
email_service.send_confirmation,
order.customer_email,
order_id
)
return {"status": "confirmed"}
Celery Integration
Setup
pip install celery redis
Celery Configuration
# app/celery_app.py
from celery import Celery
celery_app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0"
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=300, # 5 minutes
worker_prefetch_multiplier=1,
)
Define Tasks
# app/tasks.py
from .celery_app import celery_app
import time
@celery_app.task(bind=True, max_retries=3)
def send_email_task(self, to: str, subject: str, body: str):
try:
# Email sending logic
send_email(to, subject, body)
return {"status": "sent", "to": to}
except Exception as exc:
self.retry(exc=exc, countdown=60)
@celery_app.task
def process_file_task(file_path: str):
# Long-running file processing
time.sleep(10)
return {"processed": file_path}
@celery_app.task
def generate_report_task(report_type: str, params: dict):
# Generate report
report = create_report(report_type, params)
save_report(report)
return {"report_id": report.id}
Use in FastAPI
from fastapi import FastAPI
from .tasks import send_email_task, process_file_task
app = FastAPI()
@app.post("/send-email")
def queue_email(email: EmailSchema):
task = send_email_task.delay(
email.to,
email.subject,
email.body
)
return {"task_id": task.id, "status": "queued"}
@app.get("/tasks/{task_id}")
def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}
Running Celery
# Start Redis
redis-server
# Start Celery worker
celery -A app.celery_app worker --loglevel=info
# Start Celery beat (for scheduled tasks)
celery -A app.celery_app beat --loglevel=info
Scheduled Tasks
# app/celery_app.py
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
"cleanup-every-hour": {
"task": "app.tasks.cleanup_old_files",
"schedule": crontab(minute=0), # Every hour
},
"daily-report": {
"task": "app.tasks.generate_daily_report",
"schedule": crontab(hour=6, minute=0), # 6 AM daily
},
"sync-every-5-minutes": {
"task": "app.tasks.sync_external_data",
"schedule": 300.0, # Every 5 minutes
}
}
# app/tasks.py
@celery_app.task
def cleanup_old_files():
delete_files_older_than(days=7)
@celery_app.task
def generate_daily_report():
report = create_daily_summary()
send_report_email(report)
Task Chaining
from celery import chain, group, chord
# Sequential tasks
@app.post("/process-order")
def process_order(order_id: int):
workflow = chain(
validate_order.s(order_id),
process_payment.s(),
send_confirmation.s()
)
result = workflow.apply_async()
return {"workflow_id": result.id}
# Parallel tasks
@app.post("/generate-reports")
def generate_all_reports():
tasks = group(
generate_report.s("sales"),
generate_report.s("inventory"),
generate_report.s("customers")
)
result = tasks.apply_async()
return {"group_id": result.id}
Complete Example
# app/main.py
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional
from .tasks import send_email_task, process_file_task
from .celery_app import celery_app
app = FastAPI()
class EmailRequest(BaseModel):
to: str
subject: str
body: str
class TaskResponse(BaseModel):
task_id: str
status: str
class TaskStatus(BaseModel):
task_id: str
status: str
result: Optional[dict] = None
error: Optional[str] = None
# Simple background task
@app.post("/notify")
async def send_notification(
message: str,
background_tasks: BackgroundTasks
):
def log_notification():
with open("notifications.log", "a") as f:
f.write(f"{message}\n")
background_tasks.add_task(log_notification)
return {"status": "scheduled"}
# Celery task
@app.post("/email", response_model=TaskResponse)
def queue_email(email: EmailRequest):
task = send_email_task.delay(
email.to,
email.subject,
email.body
)
return TaskResponse(task_id=task.id, status="queued")
@app.post("/process-file", response_model=TaskResponse)
def queue_file_processing(file_path: str):
task = process_file_task.delay(file_path)
return TaskResponse(task_id=task.id, status="queued")
@app.get("/tasks/{task_id}", response_model=TaskStatus)
def get_task_status(task_id: str):
task = celery_app.AsyncResult(task_id)
response = TaskStatus(
task_id=task_id,
status=task.status
)
if task.ready():
if task.successful():
response.result = task.result
else:
response.error = str(task.result)
return response
@app.delete("/tasks/{task_id}")
def cancel_task(task_id: str):
celery_app.control.revoke(task_id, terminate=True)
return {"status": "cancelled"}
Summary
| Method | Use Case | Complexity |
|---|---|---|
| BackgroundTasks | Simple, short tasks | Low |
| Celery | Complex, distributed tasks | High |
| asyncio | I/O-bound async operations | Medium |
| Celery Feature | Description |
|---|---|
| Task queue | Distributed task execution |
| Retry logic | Automatic task retries |
| Scheduling | Periodic task execution |
| Chaining | Task workflows |
Next Steps
In Part 12, we’ll explore WebSockets - building real-time bidirectional communication.
Series Navigation:
- Part 1-10: Foundations & Security
- Part 11: Background Tasks (You are here)
- Part 12: WebSockets
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 14: File Uploads and Storage
Handle file uploads in FastAPI. Learn form data, file validation, cloud storage integration with S3, and serving static files.
PythonFastAPI Tutorial Part 7: CRUD Operations - Build a Complete REST API
Build production-ready CRUD APIs with FastAPI. Learn RESTful patterns, pagination, filtering, bulk operations, and best practices for real-world applications.
PythonFastAPI Tutorial Part 5: Dependency Injection - Share Logic Across Endpoints
Master FastAPI dependency injection for clean, reusable code. Learn database sessions, authentication, pagination, and complex dependency chains.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.