Celery Task Queue: Complete Python Background Jobs Guide
Master Celery for Python background task processing. Learn task queues, scheduling, monitoring, error handling, and build scalable async workflows.
Moshiour Rahman
What is Celery?
Celery is a distributed task queue for Python that processes tasks asynchronously. It’s perfect for handling time-consuming operations like sending emails, processing images, or running ML models.
Use Cases
| Task Type | Examples |
|---|---|
| Email | Sending notifications |
| Processing | Image/video conversion |
| API Calls | Third-party integrations |
| Reports | PDF generation |
| ML | Model inference |
Getting Started
Installation
pip install "celery[redis]"   # quotes keep shells like zsh from expanding the brackets; the [redis] extra also installs the redis client
Basic Setup
# celery_app.py
import os

from celery import Celery

# Broker/backend fall back to localhost but can be overridden via environment
# variables (used by the Docker Compose setup later in this guide)
app = Celery(
    'tasks',
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
)

# Configuration
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes
    worker_prefetch_multiplier=1,
)
Define Tasks
# tasks.py
from celery_app import app
import time

@app.task
def add(x, y):
    return x + y

@app.task(bind=True)
def long_task(self, duration):
    """Task with progress updates."""
    for i in range(duration):
        time.sleep(1)
        self.update_state(
            state='PROGRESS',
            meta={'current': i + 1, 'total': duration}
        )
    return {'status': 'completed', 'duration': duration}

@app.task(name='send_email')
def send_email_task(to, subject, body):
    """Named task for sending emails."""
    # Email sending logic
    print(f"Sending email to {to}: {subject}")
    return {'sent': True, 'to': to}
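Because send_email_task is registered under the explicit name 'send_email', it can also be dispatched by name with app.send_task, which is handy when the calling code does not import tasks.py. A minimal sketch:

from celery_app import app

# Dispatch by registered name instead of importing the function
result = app.send_task('send_email', args=['user@example.com', 'Hello', 'Body'])
print(result.id)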
Run Worker
# Start worker
celery -A celery_app worker --loglevel=info
# With concurrency
celery -A celery_app worker --loglevel=info --concurrency=4
# Specific queues
celery -A celery_app worker -Q high,default,low --loglevel=info
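Once a worker is up, you can confirm it is reachable from a Python shell using the app defined earlier (the output shown in the comments is illustrative):

from celery_app import app

# Ping all running workers; a responding worker returns something like
# [{'celery@hostname': {'ok': 'pong'}}]
print(app.control.ping(timeout=2.0))

# Show tasks currently executing on each worker
print(app.control.inspect().active())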
Task Execution
Calling Tasks
from datetime import datetime

from tasks import add, long_task, send_email_task
# Synchronous call (blocking)
result = add(4, 4) # Returns 8
# Asynchronous call
result = add.delay(4, 4) # Returns AsyncResult
print(result.id) # Task ID
# With apply_async (more options)
result = add.apply_async(args=[4, 4], countdown=10) # Delay 10 seconds
result = add.apply_async(args=[4, 4], eta=datetime(2024, 12, 1, 10, 0))
# Queue routing
result = send_email_task.apply_async(
args=['user@example.com', 'Hello', 'Body'],
queue='high_priority'
)
# Get result
result = add.delay(4, 4)
result.ready() # True if completed
result.successful() # True if succeeded
result.get(timeout=10) # Wait for result (blocking)
result.get(propagate=False) # Don't raise exceptions
Task States
from celery.result import AsyncResult

def check_task_status(task_id):
    result = AsyncResult(task_id)
    status = result.state

    if status == 'PENDING':
        return {'state': 'pending', 'status': 'Task is waiting'}
    elif status == 'STARTED':
        return {'state': 'started', 'status': 'Task has started'}
    elif status == 'PROGRESS':
        return {
            'state': 'progress',
            'current': result.info.get('current', 0),
            'total': result.info.get('total', 1)
        }
    elif status == 'SUCCESS':
        return {'state': 'success', 'result': result.result}
    elif status == 'FAILURE':
        return {'state': 'failure', 'error': str(result.info)}
    elif status == 'REVOKED':
        return {'state': 'revoked', 'status': 'Task was cancelled'}
    return {'state': status}
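A quick way to exercise the helper above: dispatch the long_task defined earlier and poll it by id (the exact output depends on timing):

task = long_task.delay(5)
print(check_task_status(task.id))   # likely {'state': 'pending', ...} right after dispatch

# ...a few seconds later...
print(check_task_status(task.id))   # e.g. {'state': 'progress', 'current': 3, 'total': 5}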
Task Configuration
Retry Logic
from celery_app import app
import requests

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(requests.RequestException,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True
)
def fetch_url(self, url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as exc:
        raise self.retry(exc=exc)

# Custom retry logic
@app.task(bind=True, max_retries=5)
def process_payment(self, payment_id):
    """process(), TemporaryError, and PermanentError are application-defined placeholders."""
    try:
        # Process payment
        result = process(payment_id)
        return result
    except TemporaryError as exc:
        # Exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    except PermanentError:
        # Don't retry
        raise
Rate Limiting
# Note: rate limits are enforced per worker instance, not across the whole cluster
@app.task(rate_limit='10/m')  # 10 tasks per minute
def rate_limited_task():
    pass

@app.task(rate_limit='100/h')  # 100 tasks per hour
def hourly_limited_task():
    pass

# Global default rate limit
app.conf.task_default_rate_limit = '1000/h'
Task Routing
# celery_app.py
from kombu import Exchange, Queue

app.conf.task_routes = {
    'send_email': {'queue': 'emails'},  # matches the explicit name given in tasks.py
    'tasks.process_image': {'queue': 'images'},
    'tasks.ml_inference': {'queue': 'ml', 'routing_key': 'ml.predict'},
    'tasks.*': {'queue': 'default'},
}

# Priority queues
app.conf.task_queues = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('low', Exchange('low'), routing_key='low'),
)

# With these routes, send_email_task.delay(...) is published to the 'emails'
# queue automatically; an explicit queue= on apply_async overrides the route.

# Start workers for specific queues
# celery -A celery_app worker -Q high,default -c 4
# celery -A celery_app worker -Q low -c 2
Task Workflows
Chains
from celery import chain

# Execute tasks in sequence; each .s() signature receives the
# previous task's return value as its first argument
workflow = chain(
    fetch_data.s(url),
    process_data.s(),
    save_results.s()
)
result = workflow.apply_async()

# Get final result
final_result = result.get()
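For the chain to work, each downstream task has to accept the previous task's return value as its first argument. fetch_data, process_data, and save_results are not defined earlier in this guide, so here is a minimal sketch of what they might look like:

@app.task
def fetch_data(url):
    return {'url': url, 'payload': [1, 2, 3]}   # placeholder data

@app.task
def process_data(data):
    # Receives fetch_data's return value as its first argument
    return sum(data['payload'])

@app.task
def save_results(total):
    print(f"Saving total={total}")
    return total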
Groups
from celery import group

# Execute tasks in parallel
job = group([
    process_image.s(image_id)
    for image_id in image_ids
])
result = job.apply_async()

# Get all results (returned in the same order as the group's tasks)
results = result.get()  # List of results
Chords
from celery import chord

# Parallel tasks followed by a callback
workflow = chord(
    [fetch_price.s(symbol) for symbol in symbols],
    aggregate_prices.s()
)
result = workflow.apply_async()
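The chord callback receives the list of results from the parallel header tasks as its first argument. A minimal sketch of aggregate_prices under that assumption (fetch_price is assumed to return a number):

@app.task
def aggregate_prices(prices):
    # `prices` is the list of return values from every fetch_price task
    return {'count': len(prices), 'average': sum(prices) / len(prices) if prices else 0}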
Complex Workflows
from celery import chain, group, chord

# Complex pipeline: a group chained into another task is automatically
# upgraded to a chord, so aggregate_results receives the list of chunk results
workflow = chain(
    # Step 1: Fetch data
    fetch_data.s(source_id),
    # Step 2: Process in parallel
    group([
        process_chunk.s(i)
        for i in range(10)
    ]),
    # Step 3: Aggregate results
    aggregate_results.s(),
    # Step 4: Save and notify
    group([
        save_to_database.s(),
        send_notification.s()
    ])
)
result = workflow.apply_async()
Scheduling with Celery Beat
Periodic Tasks
# celery_app.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Every 30 seconds
    'check-every-30-seconds': {
        'task': 'tasks.heartbeat',
        'schedule': 30.0,
    },
    # Every minute
    'process-queue-every-minute': {
        'task': 'tasks.process_queue',
        'schedule': crontab(),
    },
    # Daily at midnight
    'daily-cleanup': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=0, minute=0),
    },
    # Every Monday at 7:30am
    'weekly-report': {
        'task': 'tasks.generate_weekly_report',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },
    # First day of the month
    'monthly-invoice': {
        'task': 'tasks.generate_invoices',
        'schedule': crontab(hour=0, minute=0, day_of_month=1),
    },
    # With arguments
    'send-daily-digest': {
        'task': 'tasks.send_digest',
        'schedule': crontab(hour=8, minute=0),
        'args': ('daily',),
        'kwargs': {'include_stats': True},
    },
}
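Beat only schedules tasks by name; the entries above assume matching tasks exist. A minimal sketch of two of them, placed in tasks.py so their default names ('tasks.heartbeat', 'tasks.cleanup') line up with the schedule (bodies are placeholders):

# tasks.py
@app.task
def heartbeat():
    return 'alive'

@app.task
def cleanup():
    # Placeholder: purge expired sessions, temp files, stale results, etc.
    return 'cleaned'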
Run Beat Scheduler
# Start beat scheduler
celery -A celery_app beat --loglevel=info
# Beat with worker (development)
celery -A celery_app worker --beat --loglevel=info
Error Handling
Custom Error Handling
from celery import Task

class BaseTaskWithErrorHandling(Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Called when the task fails."""
        print(f"Task {task_id} failed: {exc}")
        # Log to monitoring system
        # Send alert

    def on_success(self, retval, task_id, args, kwargs):
        """Called when the task succeeds."""
        print(f"Task {task_id} succeeded with result: {retval}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Called when the task is retried."""
        print(f"Task {task_id} retrying due to: {exc}")

@app.task(base=BaseTaskWithErrorHandling)
def important_task():
    # Task logic
    pass

# Error callbacks
@app.task
def error_handler(request, exc, traceback):
    print(f'Task {request.id} raised exception: {exc}')

# Attach the error callback when dispatching a task
# (`task` stands for any task defined with @app.task)
result = task.apply_async(
    link_error=error_handler.s()
)
Dead Letter Queue
from celery import Task

class TaskWithDLQ(Task):
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Send failure details to a dead letter queue for later inspection
        dead_letter_task.delay({
            'original_task': self.name,
            'task_id': task_id,
            'args': args,
            'kwargs': kwargs,
            'exception': str(exc),
            'traceback': str(einfo)
        })

@app.task
def dead_letter_task(failed_task_info):
    """Store failed tasks for later analysis."""
    # Save to database or send to monitoring
    pass
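To route a task's failures through this dead letter flow, point its base at the class above; critical_import is a hypothetical example:

@app.task(base=TaskWithDLQ, bind=True)
def critical_import(self, record_id):
    # Any unhandled exception here triggers TaskWithDLQ.on_failure,
    # which forwards the details to dead_letter_task
    raise ValueError(f"Could not import record {record_id}")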
FastAPI Integration
from fastapi import FastAPI
from celery.result import AsyncResult

# process_data is assumed to be defined in tasks.py like the earlier tasks
from tasks import process_data, send_email_task

app = FastAPI()

@app.post("/process")
async def start_processing(data: dict):
    task = process_data.delay(data)
    return {"task_id": task.id}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id)

    if result.state == 'PENDING':
        response = {'state': 'pending'}
    elif result.state == 'PROGRESS':
        response = {
            'state': 'progress',
            'current': result.info.get('current', 0),
            'total': result.info.get('total', 1)
        }
    elif result.state == 'SUCCESS':
        response = {'state': 'success', 'result': result.result}
    else:
        response = {'state': result.state, 'info': str(result.info)}
    return response

@app.post("/tasks/{task_id}/revoke")
async def revoke_task(task_id: str):
    AsyncResult(task_id).revoke(terminate=True)
    return {"status": "revoked"}
Monitoring with Flower
# Install Flower
pip install flower
# Start Flower
celery -A celery_app flower --port=5555
# With authentication
celery -A celery_app flower --basic_auth=user:password
Docker Compose Setup
version: '3.8'

services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  worker:
    build: .
    command: celery -A celery_app worker --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

  beat:
    build: .
    command: celery -A celery_app beat --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
      - worker

  flower:
    build: .
    command: celery -A celery_app flower --port=5555
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    ports:
      - "5555:5555"
    depends_on:
      - redis
      - worker
Summary
| Feature | Purpose |
|---|---|
| `delay()` | Quick async execution |
| `apply_async()` | Advanced options |
| `chain()` | Sequential tasks |
| `group()` | Parallel tasks |
| `chord()` | Parallel + callback |
| Beat | Scheduled tasks |
| Flower | Monitoring |
Celery provides robust background task processing for Python applications at any scale.