
Celery Task Queue: Complete Python Background Jobs Guide

Master Celery for Python background task processing. Learn task queues, scheduling, monitoring, error handling, and build scalable async workflows.


Moshiour Rahman


What is Celery?

Celery is a distributed task queue for Python that processes tasks asynchronously. It’s perfect for handling time-consuming operations like sending emails, processing images, or running ML models.

Use Cases

Task Type   | Examples
------------|---------------------------
Email       | Sending notifications
Processing  | Image/video conversion
API Calls   | Third-party integrations
Reports     | PDF generation
ML          | Model inference

Getting Started

Installation

pip install "celery[redis]"  # installs Celery together with the Redis client

Basic Setup

# celery_app.py
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Configuration
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30 minutes
    worker_prefetch_multiplier=1,
)
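
If you deploy with Docker (see the Compose file later in this guide), it is usually better to read the broker and backend URLs from the environment than to hardcode them. A minimal sketch, assuming the CELERY_BROKER_URL and CELERY_RESULT_BACKEND variables used in the Compose example:

# celery_app.py (environment-driven variant)
import os
from celery import Celery

app = Celery(
    'tasks',
    # Fall back to localhost when the variables are not set
    broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    backend=os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
)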

Define Tasks

# tasks.py
from celery_app import app
import time

@app.task
def add(x, y):
    return x + y

@app.task(bind=True)
def long_task(self, duration):
    """Task with progress updates."""
    for i in range(duration):
        time.sleep(1)
        self.update_state(
            state='PROGRESS',
            meta={'current': i + 1, 'total': duration}
        )
    return {'status': 'completed', 'duration': duration}

@app.task(name='send_email')
def send_email_task(to, subject, body):
    """Named task for sending emails."""
    # Email sending logic
    print(f"Sending email to {to}: {subject}")
    return {'sent': True, 'to': to}

Run Worker

# Start worker
celery -A celery_app worker --loglevel=info

# With concurrency
celery -A celery_app worker --loglevel=info --concurrency=4

# Specific queues
celery -A celery_app worker -Q high,default,low --loglevel=info

Task Execution

Calling Tasks

from datetime import datetime

from tasks import add, long_task, send_email_task

# Direct call (runs in the current process, bypasses Celery entirely)
result = add(4, 4)  # Returns 8

# Asynchronous call
result = add.delay(4, 4)  # Returns AsyncResult
print(result.id)  # Task ID

# With apply_async (more options)
result = add.apply_async(args=[4, 4], countdown=10)  # Delay 10 seconds
result = add.apply_async(args=[4, 4], eta=datetime(2024, 12, 1, 10, 0))

# Queue routing
result = send_email_task.apply_async(
    args=['user@example.com', 'Hello', 'Body'],
    queue='high_priority'
)

# Get result
result = add.delay(4, 4)
result.ready()  # True if completed
result.successful()  # True if succeeded
result.get(timeout=10)  # Wait for result (blocking)
result.get(propagate=False)  # Don't raise exceptions

Task States

from celery.result import AsyncResult

def check_task_status(task_id):
    result = AsyncResult(task_id)

    status = result.state

    if status == 'PENDING':
        # Note: unknown or expired task ids also report PENDING
        return {'state': 'pending', 'status': 'Task is waiting'}
    elif status == 'STARTED':
        return {'state': 'started', 'status': 'Task has started'}
    elif status == 'PROGRESS':
        return {
            'state': 'progress',
            'current': result.info.get('current', 0),
            'total': result.info.get('total', 1)
        }
    elif status == 'SUCCESS':
        return {'state': 'success', 'result': result.result}
    elif status == 'FAILURE':
        return {'state': 'failure', 'error': str(result.info)}
    elif status == 'REVOKED':
        return {'state': 'revoked', 'status': 'Task was cancelled'}

    return {'state': status}
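
As a usage sketch, you can dispatch long_task and poll it with the helper above (the one-second interval and the ten-second duration are arbitrary):

import time
from tasks import long_task

result = long_task.delay(10)

# Poll until the task reaches a terminal state (simplified sketch)
while True:
    info = check_task_status(result.id)
    print(info)
    if info['state'] in ('success', 'failure', 'revoked'):
        break
    time.sleep(1)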

Task Configuration

Retry Logic

from celery_app import app
import requests

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(requests.RequestException,),
    retry_backoff=True,
    retry_backoff_max=600,
    retry_jitter=True
)
def fetch_url(self, url):
    # autoretry_for already retries RequestException with exponential
    # backoff, so no manual try/except is needed here
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()

# Custom retry logic
@app.task(bind=True, max_retries=5)
def process_payment(self, payment_id):
    try:
        # process(), TemporaryError and PermanentError are placeholders
        # for your own payment logic and exception types
        result = process(payment_id)
        return result
    except TemporaryError as exc:
        # Exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    except PermanentError:
        # Don't retry
        raise

Rate Limiting

@app.task(rate_limit='10/m')  # 10 tasks per minute (enforced per worker, not globally)
def rate_limited_task():
    pass

@app.task(rate_limit='100/h')  # 100 tasks per hour
def hourly_limited_task():
    pass

# Default rate limit for tasks that don't set their own (still per worker)
app.conf.task_default_rate_limit = '1000/h'

Task Routing

# celery_app.py
from kombu import Exchange, Queue

app.conf.task_routes = {
    'tasks.send_email_task': {'queue': 'emails'},
    'tasks.process_image': {'queue': 'images'},
    'tasks.ml_inference': {'queue': 'ml', 'routing_key': 'ml.predict'},
    'tasks.*': {'queue': 'default'},
}

# Priority queues
app.conf.task_queues = (
    Queue('high', Exchange('high'), routing_key='high'),
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('low', Exchange('low'), routing_key='low'),
)

# Start workers for specific queues
# celery -A celery_app worker -Q high,default -c 4
# celery -A celery_app worker -Q low -c 2

Task Workflows

Chains

from celery import chain

# Execute tasks in sequence
workflow = chain(
    fetch_data.s(url),
    process_data.s(),
    save_results.s()
)
result = workflow.apply_async()

# Get final result
final_result = result.get()
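
In a chain, each task's return value is passed as the first argument to the next signature, which is why process_data.s() and save_results.s() take no explicit arguments. A sketch of what the chained tasks might look like (these definitions are illustrative, not from the original code):

from celery_app import app

@app.task
def fetch_data(url):
    return {'url': url, 'payload': '...'}

@app.task
def process_data(data):
    # Receives fetch_data's return value
    data['processed'] = True
    return data

@app.task
def save_results(data):
    # Receives process_data's return value
    print(f"Saving results for {data['url']}")
    return data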

Groups

from celery import group

# Execute tasks in parallel
job = group([
    process_image.s(image_id)
    for image_id in image_ids
])
result = job.apply_async()

# Get all results
results = result.get()  # List of results

Chords

from celery import chord

# Parallel tasks followed by callback
workflow = chord(
    [fetch_price.s(symbol) for symbol in symbols],
    aggregate_prices.s()
)
result = workflow.apply_async()
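
The header tasks run in parallel, and the callback receives a list containing the result of every header task once they have all finished. An illustrative sketch of the callback (aggregate_prices is assumed, not from the original code):

@app.task
def aggregate_prices(prices):
    # `prices` is the list of results from every fetch_price task
    return sum(prices) / len(prices)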

Complex Workflows

from celery import chain, group, chord

# Complex pipeline
workflow = chain(
    # Step 1: Fetch data
    fetch_data.s(source_id),

    # Step 2: Process in parallel
    group([
        process_chunk.s(i)
        for i in range(10)
    ]),

    # Step 3: Aggregate results
    aggregate_results.s(),

    # Step 4: Save and notify
    group([
        save_to_database.s(),
        send_notification.s()
    ])
)

result = workflow.apply_async()

Scheduling with Celery Beat

Periodic Tasks

# celery_app.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    # Every 30 seconds
    'check-every-30-seconds': {
        'task': 'tasks.heartbeat',
        'schedule': 30.0,
    },

    # Every minute
    'process-queue-every-minute': {
        'task': 'tasks.process_queue',
        'schedule': crontab(),
    },

    # Daily at midnight
    'daily-cleanup': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=0, minute=0),
    },

    # Every Monday at 7:30am
    'weekly-report': {
        'task': 'tasks.generate_weekly_report',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },

    # First day of month
    'monthly-invoice': {
        'task': 'tasks.generate_invoices',
        'schedule': crontab(hour=0, minute=0, day_of_month=1),
    },

    # With arguments
    'send-daily-digest': {
        'task': 'tasks.send_digest',
        'schedule': crontab(hour=8, minute=0),
        'args': ('daily',),
        'kwargs': {'include_stats': True},
    },
}
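
Beat only schedules tasks; workers still execute them, so each entry must reference a task the workers can import. A minimal, illustrative sketch of the heartbeat task referenced above:

# tasks.py
@app.task(name='tasks.heartbeat')
def heartbeat():
    """Lightweight periodic task used to confirm that workers are alive."""
    return 'ok'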

Run Beat Scheduler

# Start beat scheduler
celery -A celery_app beat --loglevel=info

# Beat with worker (development)
celery -A celery_app worker --beat --loglevel=info

Error Handling

Custom Error Handling

from celery import Task

class BaseTaskWithErrorHandling(Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Called when task fails."""
        print(f"Task {task_id} failed: {exc}")
        # Log to monitoring system
        # Send alert

    def on_success(self, retval, task_id, args, kwargs):
        """Called when task succeeds."""
        print(f"Task {task_id} succeeded with result: {retval}")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Called when task is retried."""
        print(f"Task {task_id} retrying due to: {exc}")

@app.task(base=BaseTaskWithErrorHandling)
def important_task():
    # Task logic
    pass

# Error callbacks
@app.task
def error_handler(request, exc, traceback):
    print(f'Task {request.id} raised exception: {exc}')

result = important_task.apply_async(
    link_error=error_handler.s()
)
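
Celery also accepts a success callback via link=; it receives the parent task's return value. A hedged sketch (on_done is illustrative, not part of the original code):

@app.task
def on_done(result):
    # Receives the return value of the parent task
    print(f'Task finished with: {result}')

result = important_task.apply_async(
    link=on_done.s(),
    link_error=error_handler.s()
)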

Dead Letter Queue

from celery import Task

class TaskWithDLQ(Task):
    max_retries = 3

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Send to dead letter queue
        dead_letter_task.delay({
            'original_task': self.name,
            'task_id': task_id,
            'args': args,
            'kwargs': kwargs,
            'exception': str(exc),
            'traceback': str(einfo)
        })

@app.task
def dead_letter_task(failed_task_info):
    """Store failed tasks for later analysis."""
    # Save to database or send to monitoring
    pass
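
Tasks opt into this behaviour by declaring TaskWithDLQ as their base class; a minimal sketch (risky_task is illustrative):

@app.task(base=TaskWithDLQ)
def risky_task(payload):
    # Any unhandled exception here triggers on_failure, which forwards
    # the details to dead_letter_task
    ...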

FastAPI Integration

from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import process_data, send_email_task

app = FastAPI()

@app.post("/process")
async def start_processing(data: dict):
    task = process_data.delay(data)
    return {"task_id": task.id}

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id)

    if result.state == 'PENDING':
        response = {'state': 'pending'}
    elif result.state == 'PROGRESS':
        response = {
            'state': 'progress',
            'current': result.info.get('current', 0),
            'total': result.info.get('total', 1)
        }
    elif result.state == 'SUCCESS':
        response = {'state': 'success', 'result': result.result}
    else:
        response = {'state': result.state, 'info': str(result.info)}

    return response

@app.post("/tasks/{task_id}/revoke")
async def revoke_task(task_id: str):
    AsyncResult(task_id).revoke(terminate=True)
    return {"status": "revoked"}

Monitoring with Flower

# Install Flower
pip install flower

# Start Flower
celery -A celery_app flower --port=5555

# With authentication
celery -A celery_app flower --basic_auth=user:password

Docker Compose Setup

version: '3.8'

services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  worker:
    build: .
    command: celery -A celery_app worker --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

  beat:
    build: .
    command: celery -A celery_app beat --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
      - worker

  flower:
    build: .
    command: celery -A celery_app flower --port=5555
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    ports:
      - "5555:5555"
    depends_on:
      - redis
      - worker
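
The worker, beat, and flower services all use build: ., so the project needs a Dockerfile next to the Compose file. A minimal sketch, assuming a requirements.txt that lists celery[redis] and flower:

FROM python:3.12-slim

WORKDIR /app

# Install dependencies first so this layer is cached between builds
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code (celery_app.py, tasks.py, ...)
COPY . .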

Summary

Feature       | Purpose
--------------|---------------------------
delay()       | Quick async execution
apply_async() | Advanced options
chain()       | Sequential tasks
group()       | Parallel tasks
chord()       | Parallel + callback
Beat          | Scheduled tasks
Flower        | Monitoring

Celery provides robust background task processing for Python applications at any scale.

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.