
Redis Caching: Complete Guide to High-Performance Data Caching

Master Redis caching for web applications. Learn cache strategies, data structures, pub/sub, sessions, and build scalable caching solutions.

Moshiour Rahman

What is Redis?

Redis is an in-memory data store used as a cache, message broker, and database. Its speed and versatility make it essential for high-performance applications.

Redis Use Cases

| Use Case      | Description                    |
|---------------|--------------------------------|
| Caching       | Store frequently accessed data |
| Sessions      | User session management        |
| Rate Limiting | API request throttling        |
| Pub/Sub       | Real-time messaging            |
| Queues        | Job and task queues            |

Getting Started

Installation

# Docker (recommended)
docker run -d --name redis -p 6379:6379 redis:latest

# macOS
brew install redis
brew services start redis

# Ubuntu
sudo apt install redis-server
sudo systemctl start redis

# Python client
pip install redis
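
After installation, a quick connectivity check from Python confirms the server is reachable:

import redis

# PING returns True when the server is up on localhost:6379
print(redis.Redis(host='localhost', port=6379).ping())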

Basic Operations

import redis

# Connect to Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# String operations
r.set('name', 'John')
r.set('age', 30)
r.set('session', 'abc123', ex=3600)  # Expires in 1 hour

name = r.get('name')
print(f"Name: {name}")

# Check existence
exists = r.exists('name')
print(f"Exists: {exists}")

# Delete keys
r.delete('name')

# Set multiple values
r.mset({'key1': 'value1', 'key2': 'value2'})

# Get multiple values
values = r.mget(['key1', 'key2'])
print(values)

# Increment/Decrement
r.set('counter', 0)
r.incr('counter')  # 1
r.incrby('counter', 5)  # 6
r.decr('counter')  # 5
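
Two related commands are worth knowing alongside the basics: TTL inspection, and set-if-not-exists, which doubles as a simple lock primitive. A quick sketch (the 'lock:report' key is illustrative):

# Remaining lifetime in seconds (-1 = no expiry, -2 = key missing)
ttl = r.ttl('session')

# Set only if the key does not already exist (atomic)
acquired = r.set('lock:report', '1', nx=True, ex=10)
if acquired:
    print("Lock acquired for 10 seconds")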

Data Structures

Lists

import redis

r = redis.Redis(decode_responses=True)

# Add to list
r.lpush('queue', 'task1', 'task2')  # Left push
r.rpush('queue', 'task3')  # Right push

# Get list
items = r.lrange('queue', 0, -1)
print(items)  # ['task2', 'task1', 'task3']

# Pop from list
task = r.lpop('queue')  # Remove from left
task = r.rpop('queue')  # Remove from right

# Blocking pop (for queues) - returns a (key, value) tuple, or None on timeout
task = r.blpop('queue', timeout=5)

# List length
length = r.llen('queue')

# Get by index
item = r.lindex('queue', 0)

Hashes

# Store user object
r.hset('user:1', mapping={
    'name': 'John',
    'email': 'john@example.com',
    'age': 30
})

# Get single field
name = r.hget('user:1', 'name')

# Get all fields
user = r.hgetall('user:1')
print(user)  # {'name': 'John', 'email': 'john@example.com', 'age': '30'}

# Update field
r.hset('user:1', 'age', 31)

# Increment field
r.hincrby('user:1', 'age', 1)

# Delete field
r.hdel('user:1', 'email')

# Check field exists
exists = r.hexists('user:1', 'name')

Sets

# Add to set
r.sadd('tags:article1', 'python', 'redis', 'database')
r.sadd('tags:article2', 'python', 'django', 'web')

# Get all members
tags = r.smembers('tags:article1')
print(tags)  # {'python', 'redis', 'database'}

# Check membership
is_member = r.sismember('tags:article1', 'python')

# Set operations
intersection = r.sinter('tags:article1', 'tags:article2')  # Common tags
union = r.sunion('tags:article1', 'tags:article2')  # All tags
diff = r.sdiff('tags:article1', 'tags:article2')  # In article1 but not article2

# Random member
random_tag = r.srandmember('tags:article1')

# Remove member
r.srem('tags:article1', 'database')

Sorted Sets

# Leaderboard example
r.zadd('leaderboard', {
    'player1': 100,
    'player2': 200,
    'player3': 150
})

# Get top players
top = r.zrevrange('leaderboard', 0, 2, withscores=True)
print(top)  # [('player2', 200.0), ('player3', 150.0), ('player1', 100.0)]

# Get player rank
rank = r.zrevrank('leaderboard', 'player1')

# Get player score
score = r.zscore('leaderboard', 'player1')

# Increment score
r.zincrby('leaderboard', 50, 'player1')

# Get players by score range
players = r.zrangebyscore('leaderboard', 100, 200)

Caching Strategies

Cache-Aside Pattern

import redis
import json
from typing import Optional, Any
from functools import wraps

r = redis.Redis(decode_responses=True)

def get_user_from_db(user_id: int) -> dict:
    """Simulate database query."""
    return {'id': user_id, 'name': f'User {user_id}'}

def get_user(user_id: int) -> Optional[dict]:
    """Cache-aside pattern."""
    cache_key = f'user:{user_id}'

    # Try cache first
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss - get from database
    user = get_user_from_db(user_id)

    # Store in cache
    r.set(cache_key, json.dumps(user), ex=3600)

    return user

# Decorator version
def cached(prefix: str, ttl: int = 3600):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Include kwargs in the key so calls with different kwargs don't collide
            key_parts = [str(a) for a in args] + [f"{k}={v}" for k, v in sorted(kwargs.items())]
            cache_key = f"{prefix}:{':'.join(key_parts)}"

            cached = r.get(cache_key)
            if cached:
                return json.loads(cached)

            result = func(*args, **kwargs)
            r.set(cache_key, json.dumps(result), ex=ttl)
            return result
        return wrapper
    return decorator

@cached('product', ttl=1800)
def get_product(product_id: int) -> dict:
    return {'id': product_id, 'name': f'Product {product_id}'}

Write-Through Cache

class WriteThroughCache:
    def __init__(self, redis_client, db):
        self.redis = redis_client
        self.db = db

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Write to both cache and database."""
        # Write to database first
        self.db.save(key, value)

        # Then update cache
        self.redis.set(key, json.dumps(value), ex=ttl)

    def get(self, key: str) -> Optional[Any]:
        """Read from cache, fallback to database."""
        cached = self.redis.get(key)
        if cached:
            return json.loads(cached)

        value = self.db.get(key)
        if value:
            self.redis.set(key, json.dumps(value), ex=3600)
        return value

Cache Invalidation

class CacheManager:
    def __init__(self, redis_client):
        self.redis = redis_client

    def invalidate_key(self, key: str):
        """Delete single key."""
        self.redis.delete(key)

    def invalidate_pattern(self, pattern: str):
        """Delete all keys matching pattern."""
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match=pattern, count=100)
            if keys:
                self.redis.delete(*keys)
            if cursor == 0:
                break

    def invalidate_user_cache(self, user_id: int):
        """Invalidate all user-related cache."""
        patterns = [
            f'user:{user_id}',
            f'user:{user_id}:*',
            f'orders:user:{user_id}:*'
        ]
        for pattern in patterns:
            self.invalidate_pattern(pattern)
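
Note that invalidate_pattern uses SCAN rather than KEYS: KEYS blocks the server while it walks the entire keyspace, whereas SCAN iterates in small batches and is safe to run in production.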

Session Management

import redis
import uuid
import json
from datetime import datetime
from typing import Optional

class SessionManager:
    def __init__(self, redis_client, ttl: int = 86400):
        self.redis = redis_client
        self.ttl = ttl
        self.prefix = 'session:'

    def create_session(self, user_id: int, data: dict = None) -> str:
        """Create new session."""
        session_id = str(uuid.uuid4())
        session_data = {
            'user_id': user_id,
            'created_at': str(datetime.now()),
            **(data or {})
        }

        key = f'{self.prefix}{session_id}'
        self.redis.set(key, json.dumps(session_data), ex=self.ttl)

        return session_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Get session data."""
        key = f'{self.prefix}{session_id}'
        data = self.redis.get(key)

        if data:
            # Refresh TTL on access
            self.redis.expire(key, self.ttl)
            return json.loads(data)
        return None

    def update_session(self, session_id: str, data: dict):
        """Update session data."""
        key = f'{self.prefix}{session_id}'
        current = self.get_session(session_id)

        if current:
            current.update(data)
            self.redis.set(key, json.dumps(current), ex=self.ttl)

    def destroy_session(self, session_id: str):
        """Delete session."""
        key = f'{self.prefix}{session_id}'
        self.redis.delete(key)

    def get_user_sessions(self, user_id: int) -> list:
        """Get all sessions for a user."""
        sessions = []
        cursor = 0

        while True:
            cursor, keys = self.redis.scan(cursor, match=f'{self.prefix}*')
            for key in keys:
                data = self.redis.get(key)
                if data:
                    session = json.loads(data)
                    if session.get('user_id') == user_id:
                        sessions.append(key.replace(self.prefix, ''))
            if cursor == 0:
                break

        return sessions
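
Scanning the whole keyspace in get_user_sessions gets slow as session counts grow. A common alternative is to maintain a per-user index of session IDs; here is a minimal sketch, with an assumed user_sessions:{user_id} index key:

class IndexedSessionManager(SessionManager):
    def create_session(self, user_id: int, data: dict = None) -> str:
        session_id = super().create_session(user_id, data)
        # Maintain a per-user index of session IDs
        self.redis.sadd(f'user_sessions:{user_id}', session_id)
        return session_id

    def get_user_sessions(self, user_id: int) -> list:
        # O(sessions per user) instead of scanning every session key
        return list(self.redis.smembers(f'user_sessions:{user_id}'))

Expired sessions leave stale IDs behind in the index, so prune entries whose session key no longer exists, either on read or in a background job.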

Rate Limiting

Sliding Window Log

import redis
import time

class RateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        """Check if request is allowed."""
        now = time.time()
        window_start = now - window_seconds

        pipe = self.redis.pipeline()

        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)

        # Count requests in window
        pipe.zcard(key)

        # Add current request
        pipe.zadd(key, {str(now): now})

        # Set expiry
        pipe.expire(key, window_seconds)

        results = pipe.execute()
        request_count = results[1]

        if request_count < max_requests:
            return True, max_requests - request_count - 1

        return False, 0

    def fixed_window(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> bool:
        """Fixed window rate limiter using a single counter."""
        current = self.redis.incr(key)

        # First request starts the window
        if current == 1:
            self.redis.expire(key, window_seconds)

        return current <= max_requests

# FastAPI middleware example
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()
limiter = RateLimiter(redis.Redis())

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_ip = request.client.host
    key = f"rate_limit:{client_ip}"

    allowed, remaining = limiter.is_allowed(key, max_requests=100, window_seconds=60)

    if not allowed:
        # Return a response directly: HTTPException raised inside middleware
        # bypasses FastAPI's exception handlers
        return JSONResponse(status_code=429, content={"detail": "Too many requests"})

    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    return response
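
A classic token bucket, which permits short bursts while capping the average rate, also maps naturally onto Redis. Below is a minimal sketch, assuming a client created with decode_responses=True; the tb: key prefix and refill math are illustrative, and since the read-modify-write here is not atomic, production versions usually move this logic into a Lua script:

import time

class TokenBucket:
    def __init__(self, redis_client, capacity: int, refill_rate: float):
        self.redis = redis_client
        self.capacity = capacity        # Maximum tokens the bucket holds
        self.refill_rate = refill_rate  # Tokens added per second

    def is_allowed(self, key: str) -> bool:
        bucket_key = f'tb:{key}'
        now = time.time()

        bucket = self.redis.hgetall(bucket_key)
        tokens = float(bucket.get('tokens', self.capacity))
        last_refill = float(bucket.get('last_refill', now))

        # Refill based on elapsed time, capped at capacity
        tokens = min(self.capacity, tokens + (now - last_refill) * self.refill_rate)

        allowed = tokens >= 1
        if allowed:
            tokens -= 1

        self.redis.hset(bucket_key, mapping={'tokens': tokens, 'last_refill': now})
        self.redis.expire(bucket_key, 3600)
        return allowed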

Pub/Sub Messaging

import redis
import threading
import json

r = redis.Redis(decode_responses=True)

# Publisher
def publish_message(channel: str, message: dict):
    r.publish(channel, json.dumps(message))

# Subscriber
def subscribe_handler(channel: str, callback):
    pubsub = r.pubsub()
    pubsub.subscribe(channel)

    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            callback(data)

# Example usage
def on_message(data):
    print(f"Received: {data}")

# Start subscriber in background
thread = threading.Thread(
    target=subscribe_handler,
    args=('notifications', on_message)
)
thread.daemon = True
thread.start()

# Publish messages
publish_message('notifications', {'type': 'alert', 'message': 'Hello!'})
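
redis-py can also run the listen loop for you: register a handler per channel and call run_in_thread, which returns a worker thread you can stop later. A minimal sketch:

pubsub = r.pubsub()
pubsub.subscribe(**{'notifications': lambda msg: on_message(json.loads(msg['data']))})

# Runs the listen loop in a background thread and dispatches to the handler
worker = pubsub.run_in_thread(sleep_time=0.001)

# ... later, shut it down cleanly
worker.stop()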

FastAPI Integration

from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
import redis.asyncio as redis

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    app.state.redis = redis.Redis(decode_responses=True)
    yield
    # Shutdown
    await app.state.redis.close()

app = FastAPI(lifespan=lifespan)

async def get_redis():
    return app.state.redis

@app.get("/cached/{key}")
async def get_cached(key: str, r = Depends(get_redis)):
    value = await r.get(key)
    return {"key": key, "value": value}

@app.post("/cache/{key}")
async def set_cached(key: str, value: str, ttl: int = 3600, r = Depends(get_redis)):
    await r.set(key, value, ex=ttl)
    return {"status": "cached"}
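
A single Redis client instance manages its own connection pool, so the one created at startup can safely be shared across concurrent requests.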

Summary

| Feature    | Command                             |
|------------|-------------------------------------|
| Set value  | r.set(key, value)                   |
| Get value  | r.get(key)                          |
| Hash       | r.hset(), r.hget()                  |
| List       | r.lpush(), r.lpop()                 |
| Set        | r.sadd(), r.smembers()              |
| Sorted Set | r.zadd(), r.zrange()                |
| Pub/Sub    | r.publish(), r.pubsub().subscribe() |

Redis provides lightning-fast caching and data structures for building scalable applications.

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.