Redis Caching: Complete Guide to High-Performance Data Caching
Master Redis caching for web applications. Learn cache strategies, data structures, pub/sub, sessions, and build scalable caching solutions.
Moshiour Rahman
Advertisement
What is Redis?
Redis is an in-memory data store used as a cache, message broker, and database. Its speed and versatility make it essential for high-performance applications.
Redis Use Cases
| Use Case | Description |
|---|---|
| Caching | Store frequently accessed data |
| Sessions | User session management |
| Rate Limiting | API request throttling |
| Pub/Sub | Real-time messaging |
| Queues | Job and task queues |
Getting Started
Installation
# Pick ONE of the three server install options below, then install the Python client.
# Docker (recommended)
# Starts a detached container named "redis" and maps port 6379 to the host.
docker run -d --name redis -p 6379:6379 redis:latest
# macOS
brew install redis
brew services start redis
# Ubuntu
sudo apt install redis-server
sudo systemctl start redis
# Python client
# This is the `redis` package imported by every Python snippet in this guide.
pip install redis
Basic Operations
# Walk-through of plain string-key operations against a local Redis server.
import redis
# Connect to Redis
# NOTE(review): decode_responses=True presumably makes replies come back as str
# rather than bytes - confirm against the redis-py documentation.
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# String operations
r.set('name', 'John')
r.set('age', 30)
r.set('session', 'abc123', ex=3600) # Expires in 1 hour
name = r.get('name')
print(f"Name: {name}")
# Check existence
exists = r.exists('name')
print(f"Exists: {exists}")
# Delete keys
r.delete('name')
# Set multiple values
r.mset({'key1': 'value1', 'key2': 'value2'})
# Get multiple values
# The keys are requested in the order 'key1', 'key2'.
values = r.mget(['key1', 'key2'])
print(values)
# Increment/Decrement
# The trailing comments show the counter's value after each call.
r.set('counter', 0)
r.incr('counter') # 1
r.incrby('counter', 5) # 6
r.decr('counter') # 5
Data Structures
Lists
# List operations: using a Redis list as a simple queue.
# The values in the comments assume 'queue' does not exist before this runs.
import redis
r = redis.Redis(decode_responses=True)
# Add to list
# 'task2' is pushed after 'task1', so it ends up at the head (see the output below).
r.lpush('queue', 'task1', 'task2') # Left push
r.rpush('queue', 'task3') # Right push
# Get list
# The range 0..-1 is used here to fetch the whole list.
items = r.lrange('queue', 0, -1)
print(items) # ['task2', 'task1', 'task3']
# Pop from list
# Each assignment overwrites `task`; only the most recent value is kept.
task = r.lpop('queue') # Remove from left
task = r.rpop('queue') # Remove from right
# Blocking pop (for queues)
# After the two pops above only 'task1' is left for this call to take.
# NOTE(review): the reply is presumably a (key, value) pair rather than a bare
# value, and None on timeout - confirm against the redis-py documentation.
task = r.blpop('queue', timeout=5)
# List length
# The queue has been fully drained by this point.
length = r.llen('queue')
# Get by index
item = r.lindex('queue', 0)
Hashes
# Hash operations: one Redis hash per user, one hash field per attribute.
# Uses the client `r` created in an earlier snippet.
# Store user object
r.hset('user:1', mapping={
'name': 'John',
'email': 'john@example.com',
'age': 30
})
# Get single field
name = r.hget('user:1', 'name')
# Get all fields
# Note in the output below that 'age' was written as the int 30 but reads back as '30'.
user = r.hgetall('user:1')
print(user) # {'name': 'John', 'email': 'john@example.com', 'age': '30'}
# Update field
r.hset('user:1', 'age', 31)
# Increment field
# 'age' was just set to 31, so this takes it to 32.
r.hincrby('user:1', 'age', 1)
# Delete field
r.hdel('user:1', 'email')
# Check field exists
# 'name' was never removed above, so the field is still present.
exists = r.hexists('user:1', 'name')
Sets
# Set operations: tagging two articles and comparing their tag sets.
# Uses the client `r` created in an earlier snippet.
# Add to set
r.sadd('tags:article1', 'python', 'redis', 'database')
r.sadd('tags:article2', 'python', 'django', 'web')
# Get all members
tags = r.smembers('tags:article1')
print(tags) # {'python', 'redis', 'database'}
# Check membership
is_member = r.sismember('tags:article1', 'python')
# Set operations
# With the tags added above: the only common tag is 'python'; the union holds
# all five distinct tags; the difference is 'redis' and 'database'.
intersection = r.sinter('tags:article1', 'tags:article2') # Common tags
union = r.sunion('tags:article1', 'tags:article2') # All tags
diff = r.sdiff('tags:article1', 'tags:article2') # In article1 but not article2
# Random member
random_tag = r.srandmember('tags:article1')
# Remove member
r.srem('tags:article1', 'database')
Sorted Sets
# Sorted-set operations: a leaderboard keyed by player name, ordered by score.
# Uses the client `r` created in an earlier snippet.
# Leaderboard example
r.zadd('leaderboard', {
'player1': 100,
'player2': 200,
'player3': 150
})
# Get top players
# Indexes 0..2 cover all three players; the output shows highest score first
# and scores returned as floats.
top = r.zrevrange('leaderboard', 0, 2, withscores=True)
print(top) # [('player2', 200.0), ('player3', 150.0), ('player1', 100.0)]
# Get player rank
# player1 is last in the descending order printed above.
rank = r.zrevrank('leaderboard', 'player1')
# Get player score
score = r.zscore('leaderboard', 'player1')
# Increment score
# player1 goes from 100 to 150.
r.zincrby('leaderboard', 50, 'player1')
# Get players by score range
# NOTE(review): presumably both bounds are inclusive, in which case all three
# players (150, 150, 200) match - confirm against the Redis ZRANGEBYSCORE docs.
players = r.zrangebyscore('leaderboard', 100, 200)
Caching Strategies
Cache-Aside Pattern
import redis
import json
from typing import Optional, Any
from functools import wraps
r = redis.Redis(decode_responses=True)
def get_user_from_db(user_id: int) -> dict:
    """Stand-in for a real database lookup: fabricate a record for ``user_id``."""
    record = dict(id=user_id, name=f'User {user_id}')
    return record
def get_user(user_id: int) -> Optional[dict]:
    """Fetch a user with the cache-aside pattern.

    Looks the user up in Redis under ``user:<id>``. On a miss the record is
    loaded from the database, written to the cache as JSON with a one-hour
    expiry, and returned.
    """
    cache_key = 'user:{}'.format(user_id)

    hit = r.get(cache_key)
    if not hit:
        # Cache miss: the database is the source of truth, so load from it
        # and populate the cache for the next caller.
        fresh = get_user_from_db(user_id)
        r.set(cache_key, json.dumps(fresh), ex=3600)
        return fresh

    # Cache hit: the stored value is the JSON-encoded record.
    return json.loads(hit)
# Decorator version
def cached(prefix: str, ttl: int = 3600):
    """Decorator factory that caches a function's JSON-serializable result in Redis.

    The cache key is ``prefix`` followed by the stringified positional
    arguments joined with ``:``. Keyword arguments are appended as
    ``name=value`` segments in sorted name order, so two calls that differ
    only in their keyword arguments get distinct keys instead of sharing one.
    Calls that use positional arguments only produce the same key as before.

    Note that ``f(1)`` and ``f(x=1)`` are cached under different keys.

    Args:
        prefix: Namespace placed at the front of every cache key.
        ttl: Lifetime of each cache entry, in seconds.

    Returns:
        A decorator. The wrapped function returns the cached value when one is
        present; otherwise it calls the original function, stores the result
        as JSON with the given expiry, and returns it.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            key_parts = [str(arg) for arg in args]
            # Sorted so the key does not depend on keyword-argument order.
            key_parts.extend(f"{name}={kwargs[name]}" for name in sorted(kwargs))
            cache_key = f"{prefix}:{':'.join(key_parts)}"

            hit = r.get(cache_key)
            # json.dumps never yields an empty string, so a falsy reply is a miss.
            if hit:
                return json.loads(hit)

            result = func(*args, **kwargs)
            r.set(cache_key, json.dumps(result), ex=ttl)
            return result
        return wrapper
    return decorator
@cached('product', ttl=1800)
def get_product(product_id: int) -> dict:
    """Build a placeholder product record; results are cached for 30 minutes."""
    product = dict(id=product_id, name=f'Product {product_id}')
    return product
Write-Through Cache
class WriteThroughCache:
    """Write-through cache: every write goes to the database and to Redis.

    Args:
        redis_client: Object exposing ``get(key)`` and ``set(key, value, ex=...)``.
        db: Object exposing ``save(key, value)`` and ``get(key)``.
        default_ttl: Expiry, in seconds, applied when a read repopulates the
            cache from the database.
    """

    def __init__(self, redis_client, db, default_ttl: int = 3600):
        self.redis = redis_client
        self.db = db
        self.default_ttl = default_ttl

    def set(self, key: str, value: Any, ttl: int = 3600):
        """Write to both cache and database."""
        # Database first: if this raises, the cache is left untouched and
        # never holds a value the database does not have.
        self.db.save(key, value)
        # Then update cache
        self.redis.set(key, json.dumps(value), ex=ttl)

    def get(self, key: str) -> Optional[Any]:
        """Read from cache, fallback to database.

        Returns the stored value, or whatever ``db.get`` returns when the key
        is in neither store (``None`` is treated as "not found").
        """
        cached = self.redis.get(key)
        # json.dumps never yields an empty string, so a falsy reply is a miss.
        if cached:
            return json.loads(cached)

        value = self.db.get(key)
        # Compare against None rather than truthiness so legitimate falsy
        # values (0, "", [], False) are cached too instead of hitting the
        # database on every read.
        if value is not None:
            self.redis.set(key, json.dumps(value), ex=self.default_ttl)
        return value


# Backward-compatible alias for the original (misspelled) class name.
WriteThoughCache = WriteThroughCache
Cache Invalidation
class CacheManager:
    """Helpers for removing cache entries by exact key or by glob pattern.

    Args:
        redis_client: Object exposing ``delete(*keys)`` and
            ``scan(cursor, match=..., count=...)``.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def invalidate_key(self, key: str):
        """Delete single key and return the client's delete result."""
        return self.redis.delete(key)

    def invalidate_pattern(self, pattern: str, batch_size: int = 100):
        """Delete all keys matching pattern.

        Walks the keyspace incrementally with SCAN and deletes each page of
        matches as it arrives.

        Args:
            pattern: Glob-style pattern passed to SCAN as ``match``.
            batch_size: ``count`` hint passed to each SCAN call.

        Returns:
            The sum of the values returned by ``delete`` for each page.
        """
        deleted = 0
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match=pattern, count=batch_size)
            if keys:
                deleted += self.redis.delete(*keys)
            # The scan is complete when the returned cursor is 0.
            if cursor == 0:
                break
        return deleted

    def invalidate_user_cache(self, user_id: int):
        """Invalidate all user-related cache.

        Removes ``user:<id>``, every ``user:<id>:*`` key and every
        ``orders:user:<id>:*`` key, and returns the total removed.
        """
        # The bare user key is an exact name, so delete it directly instead
        # of scanning the whole keyspace for a single match.
        deleted = self.redis.delete(f'user:{user_id}')
        patterns = (
            f'user:{user_id}:*',
            f'orders:user:{user_id}:*',
        )
        for pattern in patterns:
            deleted += self.invalidate_pattern(pattern)
        return deleted
Session Management
import json
import uuid
from datetime import datetime, timedelta
from typing import Optional

import redis
class SessionManager:
    """Store user sessions in Redis as JSON blobs under ``session:<id>`` keys.

    Args:
        redis_client: Object exposing ``get``, ``set(key, value, ex=...)``,
            ``expire``, ``delete`` and ``scan``.
        ttl: Session lifetime in seconds (default 86400, i.e. 24 hours).
    """

    def __init__(self, redis_client, ttl: int = 86400):
        self.redis = redis_client
        self.ttl = ttl
        self.prefix = 'session:'

    def create_session(self, user_id: int, data: Optional[dict] = None) -> str:
        """Create new session and return its generated id.

        ``data`` entries are merged over the base fields, so they can
        override ``user_id`` / ``created_at`` if they use the same names.
        """
        session_id = str(uuid.uuid4())
        session_data = {
            'user_id': user_id,
            'created_at': str(datetime.now()),
            **(data or {})
        }
        key = f'{self.prefix}{session_id}'
        self.redis.set(key, json.dumps(session_data), ex=self.ttl)
        return session_id

    def get_session(self, session_id: str) -> Optional[dict]:
        """Get session data, or ``None`` when the session does not exist."""
        key = f'{self.prefix}{session_id}'
        data = self.redis.get(key)
        if data:
            # Refresh TTL on access
            self.redis.expire(key, self.ttl)
            return json.loads(data)
        return None

    def update_session(self, session_id: str, data: dict) -> bool:
        """Merge ``data`` into an existing session.

        Returns ``True`` when the session existed and was rewritten, ``False``
        when there was nothing to update.
        """
        key = f'{self.prefix}{session_id}'
        current = self.get_session(session_id)
        if not current:
            return False
        current.update(data)
        self.redis.set(key, json.dumps(current), ex=self.ttl)
        return True

    def destroy_session(self, session_id: str):
        """Delete session."""
        key = f'{self.prefix}{session_id}'
        self.redis.delete(key)

    def get_user_sessions(self, user_id: int) -> list:
        """Get all session ids belonging to ``user_id``.

        This scans every key under the session prefix and reads each one, so
        the cost grows with the total number of sessions, not with the number
        owned by this user.
        """
        sessions = []
        cursor = 0
        while True:
            cursor, keys = self.redis.scan(cursor, match=f'{self.prefix}*')
            for key in keys:
                data = self.redis.get(key)
                if not data:
                    continue
                session = json.loads(data)
                if session.get('user_id') != user_id:
                    continue
                # Keys arrive as bytes when the client does not decode replies.
                name = key.decode() if isinstance(key, bytes) else key
                # Strip only the leading prefix; str.replace would also remove
                # any later occurrence of the prefix text inside the id.
                sessions.append(name[len(self.prefix):])
            if cursor == 0:
                break
        return sessions
Rate Limiting
Sliding Window Log and Fixed Window Counter
import redis
import time
class RateLimiter:
    """Redis-backed request rate limiting.

    Args:
        redis_client: Object exposing ``pipeline()``; the pipeline must support
            ``zremrangebyscore``, ``zcard``, ``zadd``, ``expire``, ``set``,
            ``incr`` and ``execute``.
    """

    def __init__(self, redis_client):
        self.redis = redis_client

    def is_allowed(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> tuple[bool, int]:
        """Check if request is allowed (sliding-window log).

        Every request is recorded in a sorted set scored by its timestamp.
        Entries older than the window are dropped, and the request is allowed
        when fewer than ``max_requests`` entries remained before this one.

        The current request is recorded before the limit is checked, so
        rejected requests also count towards the window.

        Returns:
            ``(allowed, remaining)`` where ``remaining`` is the number of
            further requests that fit in the window (0 when rejected).
        """
        import uuid  # local import: only this method needs it

        now = time.time()
        window_start = now - window_seconds
        # A unique suffix keeps two requests with the same timestamp from
        # collapsing into one sorted-set member and being counted once.
        member = f'{now}:{uuid.uuid4().hex}'

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(key, 0, window_start)
        # Count requests in window
        pipe.zcard(key)
        # Add current request
        pipe.zadd(key, {member: now})
        # Set expiry
        pipe.expire(key, window_seconds)
        results = pipe.execute()

        # results[1] is the ZCARD reply: the count before this request.
        request_count = results[1]
        if request_count < max_requests:
            return True, max_requests - request_count - 1
        return False, 0

    def sliding_window(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> bool:
        """Fixed-window counter rate limiter.

        Despite the method name, this keeps one counter per key that starts
        with the first request and expires ``window_seconds`` later; it does
        not slide.

        Returns:
            ``True`` while the counter is at or below ``max_requests``.
        """
        pipe = self.redis.pipeline()
        # Create the counter together with its expiry in one command, so a
        # failure between "increment" and "expire" can never leave behind a
        # counter without a TTL. nx=True leaves an existing counter and its
        # remaining lifetime untouched.
        pipe.set(key, 0, ex=window_seconds, nx=True)
        pipe.incr(key)
        results = pipe.execute()

        current = results[1]
        return current <= max_requests
# FastAPI middleware example
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
limiter = RateLimiter(redis.Redis())


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Allow at most 100 requests per 60 seconds per client IP.

    Requests over the limit get a 429 JSON response. Allowed requests are
    passed on, and the response carries an ``X-RateLimit-Remaining`` header.

    NOTE(review): ``limiter`` uses the synchronous Redis client, so each
    request presumably blocks the event loop for the duration of the Redis
    round trip - consider the asyncio client for production use.
    """
    # request.client can be None (for example with some test transports),
    # so fall back to a shared bucket instead of raising AttributeError.
    client_ip = request.client.host if request.client else "unknown"
    key = f"rate_limit:{client_ip}"
    allowed, remaining = limiter.is_allowed(key, max_requests=100, window_seconds=60)
    if not allowed:
        # Return the response directly: an HTTPException raised inside an
        # HTTP middleware is not routed through FastAPI's exception handlers
        # and would reach the client as a 500 instead of a 429.
        return JSONResponse(
            status_code=429,
            content={"detail": "Too many requests"},
            headers={"X-RateLimit-Remaining": "0"},
        )
    response = await call_next(request)
    response.headers["X-RateLimit-Remaining"] = str(remaining)
    return response
Pub/Sub Messaging
import redis
import threading
import json
r = redis.Redis(decode_responses=True)
# Publisher
def publish_message(channel: str, message: dict):
    """Serialize ``message`` to JSON and publish it on ``channel``."""
    payload = json.dumps(message)
    r.publish(channel, payload)
# Subscriber
def subscribe_handler(channel: str, callback):
    """Subscribe to ``channel`` and pass each decoded JSON message to ``callback``.

    Runs until the listen loop ends or an exception propagates (from JSON
    decoding or from ``callback``). Either way the pub/sub connection is
    released before returning.

    Args:
        channel: Name of the channel to subscribe to.
        callback: Callable invoked with the JSON-decoded payload of every
            entry whose ``type`` is ``'message'``.
    """
    pubsub = r.pubsub()
    try:
        pubsub.subscribe(channel)
        for message in pubsub.listen():
            # Other entry types (for example the subscribe confirmation) are skipped.
            if message['type'] == 'message':
                data = json.loads(message['data'])
                callback(data)
    finally:
        # Release the connection even when the loop exits through an exception.
        pubsub.close()
# Example usage
def on_message(data):
    """Example callback: print the received payload."""
    line = f"Received: {data}"
    print(line)
# Run the subscriber on a daemon thread so it does not keep the process alive
thread = threading.Thread(
    target=subscribe_handler,
    args=('notifications', on_message),
    daemon=True,
)
thread.start()
# Publish a sample alert on the same channel
# NOTE(review): nothing here waits for the subscriber thread to finish
# subscribing; presumably a message published before that point is never
# delivered - confirm against the Redis pub/sub documentation.
publish_message('notifications', {'type': 'alert', 'message': 'Hello!'})
FastAPI Integration
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
import redis.asyncio as redis
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared async Redis client on startup and close it on shutdown.

    The client is stored on ``app.state.redis`` for the lifetime of the
    application.
    """
    # Startup
    app.state.redis = redis.Redis(decode_responses=True)
    try:
        yield
    finally:
        # Shutdown: runs even if the application exits with an error.
        client = app.state.redis
        # Prefer aclose() where the client provides it and fall back to
        # close() on clients that do not.
        close = getattr(client, "aclose", None) or client.close
        await close()


# Create the app once, with the lifespan handler attached.
app = FastAPI(lifespan=lifespan)
async def get_redis():
    """Dependency that returns the Redis client stored on the application state."""
    client = app.state.redis
    return client
@app.get("/cached/{key}")
async def get_cached(key: str, r = Depends(get_redis)):
    """Look up ``key`` in Redis and return it together with its value."""
    payload = {"key": key}
    payload["value"] = await r.get(key)
    return payload
@app.post("/cache/{key}")
async def set_cached(key: str, value: str, ttl: int = 3600, r = Depends(get_redis)):
    """Store ``value`` under ``key`` with a ``ttl``-second expiry and acknowledge."""
    await r.set(key, value, ex=ttl)
    acknowledgement = {"status": "cached"}
    return acknowledgement
Summary
| Feature | Command |
|---|---|
| Set value | r.set(key, value) |
| Get value | r.get(key) |
| Hash | r.hset(), r.hget() |
| List | r.lpush(), r.lpop() |
| Set | r.sadd(), r.smembers() |
| Sorted Set | r.zadd(), r.zrange() |
| Pub/Sub | r.publish(), r.pubsub().subscribe() |
Redis provides lightning-fast caching and data structures for building scalable applications.
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
PostgreSQL Advanced Guide: From Queries to Performance Tuning
Master PostgreSQL with advanced SQL queries, indexing strategies, performance optimization, JSON support, and production database management.
[DevOps] Elasticsearch: Complete Full-Text Search Guide
Master Elasticsearch for full-text search. Learn indexing, queries, aggregations, and build powerful search applications with Python.
[DevOps] Stop Wrestling YAML: How to Deploy 50 AI Models with Python Loops
Infrastructure as Code shouldn't be a copy-paste nightmare. Learn how to use Pulumi and Python to programmatically deploy scalable AI infrastructure without the YAML fatigue.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.