API Rate Limiting: Complete Implementation Guide
Master API rate limiting for production systems. Learn token bucket, sliding window, Redis-based limiting, and protect your APIs from abuse.
Moshiour Rahman
Advertisement
What is Rate Limiting?
Rate limiting controls how many requests a client can make to an API within a time window. It protects services from abuse, ensures fair usage, and maintains system stability.
Why Rate Limit?
| Purpose | Description |
|---|---|
| Prevent Abuse | Stop malicious requests |
| Fair Usage | Equal access for all users |
| Cost Control | Limit resource consumption |
| Stability | Prevent system overload |
Rate Limiting Algorithms
Fixed Window
import math
import time
import uuid
from typing import Tuple

import redis
class FixedWindowLimiter:
    """Fixed-window rate limiter backed by one Redis counter per window.

    Time is split into consecutive windows of ``window_seconds``. Each key
    gets its own counter per window, and a request is allowed while that
    counter does not exceed ``max_requests``.
    """

    def __init__(self, redis_client, max_requests: int, window_seconds: int):
        """Store the Redis client and limit configuration.

        Args:
            redis_client: Client exposing ``pipeline()`` with ``incr`` and
                ``expire`` (for example ``redis.Redis``).
            max_requests: Requests allowed per window.
            window_seconds: Window length in seconds; must be positive.

        Raises:
            ValueError: If ``window_seconds`` is not positive.
        """
        if window_seconds <= 0:
            # Fail early instead of a ZeroDivisionError on the first request.
            raise ValueError("window_seconds must be positive")
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Count one request for ``key`` and report whether it is allowed.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``limit``, ``remaining``
            and ``reset`` (epoch second at which the current window ends).
        """
        current_window = int(time.time() // self.window_seconds)
        redis_key = f"rate_limit:{key}:{current_window}"

        # Queue INCR and EXPIRE together so a counter is never left without
        # a TTL if the process dies between the two commands. Refreshing the
        # TTL on each hit is harmless: the key name is unique per window.
        pipe = self.redis.pipeline()
        pipe.incr(redis_key)
        pipe.expire(redis_key, self.window_seconds)
        current = pipe.execute()[0]

        remaining = max(0, self.max_requests - current)
        reset_time = (current_window + 1) * self.window_seconds
        return current <= self.max_requests, {
            "limit": self.max_requests,
            "remaining": remaining,
            "reset": reset_time,
        }
# Usage
# Connect with redis-py's default connection settings.
r = redis.Redis()
# Allow at most 100 requests per 60-second window for each key.
limiter = FixedWindowLimiter(r, max_requests=100, window_seconds=60)
# Each call counts one request against the key and returns (allowed, info).
allowed, info = limiter.is_allowed("user:123")
print(f"Allowed: {allowed}, Remaining: {info['remaining']}")
Sliding Window Log
import time
import redis
class SlidingWindowLogLimiter:
    """Sliding-window-log rate limiter stored in a Redis sorted set.

    Every accepted request is logged as a sorted-set member scored by its
    timestamp; a request is allowed while fewer than ``max_requests``
    entries fall inside the trailing ``window_seconds``.
    """

    def __init__(self, redis_client, max_requests: int, window_seconds: int):
        """Store the Redis client and limit configuration.

        Args:
            redis_client: Client exposing ``pipeline()`` and ``zrem``.
            max_requests: Requests allowed inside any trailing window.
            window_seconds: Length of the trailing window in seconds.
        """
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Record a request for ``key`` and report whether it is allowed.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``limit``, ``remaining``
            and ``reset`` (``now + window_seconds`` as an epoch second).
        """
        now = time.time()
        window_start = now - self.window_seconds
        redis_key = f"rate_limit:swl:{key}"
        # A bare timestamp is not unique: two requests with the same clock
        # reading would overwrite each other and be counted once.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        # Remove old entries
        pipe.zremrangebyscore(redis_key, 0, window_start)
        # Count requests in window (before adding this one)
        pipe.zcard(redis_key)
        # Add current request timestamp
        pipe.zadd(redis_key, {member: now})
        # Set expiry
        pipe.expire(redis_key, self.window_seconds)
        results = pipe.execute()

        request_count = results[1]
        allowed = request_count < self.max_requests
        if not allowed:
            # Do not let rejected requests occupy the window, otherwise a
            # client that keeps retrying is never let back in.
            self.redis.zrem(redis_key, member)

        remaining = max(0, self.max_requests - request_count - 1)
        return allowed, {
            "limit": self.max_requests,
            "remaining": remaining,
            "reset": int(now + self.window_seconds),
        }
# Usage
# Allow at most 100 requests in any trailing 60-second window per key.
limiter = SlidingWindowLogLimiter(redis.Redis(), max_requests=100, window_seconds=60)
Token Bucket
import time
import redis
class TokenBucketLimiter:
    """Token-bucket rate limiter that keeps its state in a Redis hash.

    The bucket starts full with ``bucket_size`` tokens and refills at
    ``refill_rate`` tokens per second, so short bursts up to the bucket
    size are allowed.

    NOTE: state is read with ``hgetall`` and written back afterwards, so
    concurrent requests for the same key can race; move the update into a
    Lua script if strict accuracy under contention is required.
    """

    def __init__(
        self,
        redis_client,
        bucket_size: int,
        refill_rate: float,  # tokens per second
    ):
        """Store the Redis client and bucket configuration.

        Args:
            redis_client: Client exposing ``hgetall`` and ``pipeline()``.
            bucket_size: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second.
        """
        self.redis = redis_client
        self.bucket_size = bucket_size
        self.refill_rate = refill_rate

    @staticmethod
    def _read_float(bucket_data: dict, field: str) -> float:
        """Read a hash field whether the client returns ``str`` or ``bytes`` keys."""
        raw = bucket_data.get(field)
        if raw is None:
            raw = bucket_data[field.encode()]
        return float(raw)

    def _state_ttl(self) -> int:
        """Return a TTL long enough that expiry cannot refill the bucket early."""
        if self.refill_rate > 0:
            # An expired key is treated as a full bucket, so the state must
            # outlive the time needed to refill from empty.
            return max(3600, math.ceil(self.bucket_size / self.refill_rate))
        return 3600

    def is_allowed(self, key: str, tokens: int = 1) -> Tuple[bool, dict]:
        """Try to consume ``tokens`` from the bucket for ``key``.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``limit``, ``remaining``
            (whole tokens left) and ``refill_rate``.
        """
        redis_key = f"rate_limit:tb:{key}"
        now = time.time()

        # Get current bucket state
        bucket_data = self.redis.hgetall(redis_key)
        if bucket_data:
            last_update = self._read_float(bucket_data, "last_update")
            current_tokens = self._read_float(bucket_data, "tokens")
            # Clamp so a backwards clock step never removes tokens.
            time_passed = max(0.0, now - last_update)
            tokens_to_add = time_passed * self.refill_rate
            current_tokens = min(self.bucket_size, current_tokens + tokens_to_add)
        else:
            current_tokens = self.bucket_size

        # Check if we have enough tokens
        allowed = current_tokens >= tokens
        if allowed:
            current_tokens -= tokens

        # Write the new state and its TTL in one round trip.
        pipe = self.redis.pipeline()
        pipe.hset(redis_key, mapping={
            'tokens': current_tokens,
            'last_update': now,
        })
        pipe.expire(redis_key, self._state_ttl())
        pipe.execute()

        return allowed, {
            "limit": self.bucket_size,
            "remaining": int(current_tokens),
            "refill_rate": self.refill_rate,
        }
# Usage - 100 tokens max, refills at 10 tokens/second
# A drained bucket is therefore full again after 10 seconds.
limiter = TokenBucketLimiter(redis.Redis(), bucket_size=100, refill_rate=10)
Leaky Bucket
import time
import redis
from typing import Tuple
class LeakyBucketLimiter:
    """Leaky-bucket rate limiter that keeps its state in a Redis hash.

    Each accepted request adds one unit of "water"; water drains at
    ``leak_rate`` units per second and requests are rejected when adding
    one more unit would exceed ``bucket_size``.

    NOTE: state is read with ``hgetall`` and written back afterwards, so
    concurrent requests for the same key can race.
    """

    def __init__(
        self,
        redis_client,
        bucket_size: int,
        leak_rate: float  # requests per second
    ):
        """Store the Redis client and bucket configuration.

        Args:
            redis_client: Client exposing ``hgetall`` and ``pipeline()``.
            bucket_size: Capacity of the bucket in requests.
            leak_rate: Requests drained per second.
        """
        self.redis = redis_client
        self.bucket_size = bucket_size
        self.leak_rate = leak_rate

    @staticmethod
    def _read_float(bucket_data: dict, field: str) -> float:
        """Read a hash field whether the client returns ``str`` or ``bytes`` keys."""
        raw = bucket_data.get(field)
        if raw is None:
            raw = bucket_data[field.encode()]
        return float(raw)

    def _state_ttl(self) -> int:
        """Return a TTL long enough that expiry cannot empty the bucket early."""
        if self.leak_rate > 0:
            # An expired key is treated as an empty bucket, so the state
            # must outlive the time needed to drain from full.
            return max(3600, math.ceil(self.bucket_size / self.leak_rate))
        return 3600

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Try to add one request to the bucket for ``key``.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``limit``,
            ``current_level`` (truncated to an int) and ``leak_rate``.
        """
        redis_key = f"rate_limit:lb:{key}"
        now = time.time()

        # Get current bucket state
        bucket_data = self.redis.hgetall(redis_key)
        if bucket_data:
            last_update = self._read_float(bucket_data, "last_update")
            water_level = self._read_float(bucket_data, "water_level")
            # Clamp so a backwards clock step never adds water.
            time_passed = max(0.0, now - last_update)
            leaked = time_passed * self.leak_rate
            water_level = max(0, water_level - leaked)
        else:
            water_level = 0

        # Accept only if the whole request fits; comparing the current
        # level alone would let a fractional level overflow the capacity.
        allowed = water_level + 1 <= self.bucket_size
        if allowed:
            water_level += 1

        # Write the new state and its TTL in one round trip.
        pipe = self.redis.pipeline()
        pipe.hset(redis_key, mapping={
            'water_level': water_level,
            'last_update': now,
        })
        pipe.expire(redis_key, self._state_ttl())
        pipe.execute()

        return allowed, {
            "limit": self.bucket_size,
            "current_level": int(water_level),
            "leak_rate": self.leak_rate,
        }
FastAPI Integration
Middleware Implementation
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import redis
import time
# Application instance the middleware below is attached to.
app = FastAPI()
# Shared Redis client; decode_responses=True makes replies come back as str.
redis_client = redis.Redis(decode_responses=True)
class RateLimitMiddleware:
    """ASGI middleware applying a sliding-window-log limit per client.

    Clients are identified by the ``X-API-Key`` header when present and by
    remote address otherwise. Rejected requests get a 429 JSON response;
    accepted ones have ``X-RateLimit-*`` headers appended to the response.

    NOTE(review): the API key is used as-is, so a client can dodge the
    limit by rotating arbitrary key values; validate keys before trusting
    them as identifiers.
    NOTE: the Redis calls are synchronous and run on the event loop.
    """

    def __init__(
        self,
        app,
        max_requests: int = 100,
        window_seconds: int = 60
    ):
        """Wrap ``app`` and store the limit configuration.

        Args:
            app: The downstream ASGI application.
            max_requests: Requests allowed inside any trailing window.
            window_seconds: Length of the trailing window in seconds.
        """
        self.app = app
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.redis = redis_client

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection, rate limiting HTTP requests only."""
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)

        # Get client identifier. ``request.client`` is None when the server
        # supplies no peer address (e.g. some test or socket transports).
        client_ip = request.client.host if request.client else "unknown"
        api_key = request.headers.get("X-API-Key", "")
        identifier = api_key if api_key else client_ip

        # Check rate limit
        allowed, info = self._check_limit(identifier)
        if not allowed:
            retry_after = max(0, info["reset"] - int(time.time()))
            response = JSONResponse(
                status_code=429,
                content={"error": "Rate limit exceeded"},
                headers={
                    "X-RateLimit-Limit": str(info["limit"]),
                    "X-RateLimit-Remaining": str(info["remaining"]),
                    "X-RateLimit-Reset": str(info["reset"]),
                    "Retry-After": str(retry_after),
                },
            )
            await response(scope, receive, send)
            return

        # Add rate limit headers to response
        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                # Raw ASGI header names must be lowercase byte strings.
                headers.extend([
                    (b"x-ratelimit-limit", str(info["limit"]).encode()),
                    (b"x-ratelimit-remaining", str(info["remaining"]).encode()),
                    (b"x-ratelimit-reset", str(info["reset"]).encode()),
                ])
                message["headers"] = headers
            await send(message)

        await self.app(scope, receive, send_wrapper)

    def _check_limit(self, key: str) -> Tuple[bool, dict]:
        """Record a request for ``key`` in the sliding window log.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``limit``, ``remaining``
            and ``reset`` (``now + window_seconds`` as an epoch second).
        """
        now = time.time()
        window_start = now - self.window_seconds
        redis_key = f"rate_limit:{key}"
        # A bare timestamp is not unique: two requests with the same clock
        # reading would overwrite each other and be counted once.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(redis_key, 0, window_start)
        pipe.zcard(redis_key)
        pipe.zadd(redis_key, {member: now})
        pipe.expire(redis_key, self.window_seconds)
        results = pipe.execute()

        count = results[1]
        allowed = count < self.max_requests
        if not allowed:
            # Rejected requests must not occupy the window, otherwise a
            # client that keeps retrying is never let back in.
            self.redis.zrem(redis_key, member)

        return allowed, {
            "limit": self.max_requests,
            "remaining": max(0, self.max_requests - count - 1),
            "reset": int(now + self.window_seconds),
        }
# Add middleware
# The keyword arguments are forwarded to RateLimitMiddleware.__init__.
app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)
Decorator-Based Limiting
from fastapi import FastAPI, Request, HTTPException, Depends
from functools import wraps
import redis
# Application instance for the decorator-based example.
app = FastAPI()
# Redis client used by rate_limit; decode_responses=True returns str replies.
redis_client = redis.Redis(decode_responses=True)
def rate_limit(max_requests: int, window_seconds: int):
    """Rate limit decorator for specific endpoints.

    Applies a fixed-window limit per endpoint function name and client
    address. The decorated coroutine must take the ``Request`` as its
    first positional argument.

    Args:
        max_requests: Requests allowed per window and client.
        window_seconds: Window length in seconds.

    Returns:
        A decorator whose wrapper raises ``HTTPException`` (429, with a
        ``Retry-After`` header) once the limit is exceeded.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            # ``request.client`` is None when no peer address is available.
            client_ip = request.client.host if request.client else "unknown"
            key = f"rate_limit:{func.__name__}:{client_ip}"

            now = time.time()
            current_window = int(now // window_seconds)
            redis_key = f"{key}:{current_window}"

            # Queue INCR and EXPIRE together so a counter is never left
            # without a TTL if the process dies between the two commands.
            pipe = redis_client.pipeline()
            pipe.incr(redis_key)
            pipe.expire(redis_key, window_seconds)
            current = pipe.execute()[0]

            if current > max_requests:
                # Tell the client how long until the current window ends.
                reset_at = (current_window + 1) * window_seconds
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded",
                    headers={"Retry-After": str(max(1, reset_at - int(now)))},
                )

            return await func(request, *args, **kwargs)
        return wrapper
    return decorator
@app.get("/api/data")
@rate_limit(max_requests=10, window_seconds=60)
async def get_data(request: Request):
    """Return the demo payload; limited to 10 requests per 60-second window."""
    payload = {"data": "limited endpoint"}
    return payload
@app.get("/api/public")
@rate_limit(max_requests=100, window_seconds=60)
async def get_public(request: Request):
    """Return the demo payload; limited to 100 requests per 60-second window."""
    payload = {"data": "less limited endpoint"}
    return payload
Tiered Rate Limits
from fastapi import FastAPI, Request, Depends, HTTPException
from enum import Enum
import redis
# Application instance for the tiered-limit example.
app = FastAPI()
# Redis client handed to TieredRateLimiter; replies are decoded to str.
redis_client = redis.Redis(decode_responses=True)
class UserTier(str, Enum):
    """Subscription tiers that determine a user's request quota."""

    FREE = "free"
    BASIC = "basic"
    PRO = "pro"
    ENTERPRISE = "enterprise"


# Requests allowed per trailing window (seconds) for each tier.
TIER_LIMITS = {
    UserTier.FREE: {"requests": 100, "window": 3600},
    UserTier.BASIC: {"requests": 1000, "window": 3600},
    UserTier.PRO: {"requests": 10000, "window": 3600},
    UserTier.ENTERPRISE: {"requests": 100000, "window": 3600},
}


class TieredRateLimiter:
    """Sliding-window-log limiter whose quota depends on the user's tier."""

    def __init__(self, redis_client, tier_limits=None):
        """Store the Redis client and the per-tier quota table.

        Args:
            redis_client: Client exposing ``pipeline()`` and ``zrem``.
            tier_limits: Optional mapping of tier to
                ``{"requests": int, "window": int}``; defaults to
                ``TIER_LIMITS``.
        """
        self.redis = redis_client
        self.tier_limits = TIER_LIMITS if tier_limits is None else tier_limits

    def check_limit(self, user_id: str, tier: UserTier) -> Tuple[bool, dict]:
        """Record a request for ``user_id`` and check it against ``tier``.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``tier``, ``limit``,
            ``remaining`` and ``reset`` (``now + window`` as an epoch second).

        Raises:
            KeyError: If ``tier`` has no entry in the quota table.
        """
        limits = self.tier_limits[tier]
        # Formatting a (str, Enum) member in an f-string yields "basic" on
        # some Python versions and "UserTier.BASIC" on others; use the
        # value explicitly so the Redis key is stable.
        tier_name = tier.value if isinstance(tier, Enum) else str(tier)
        key = f"rate_limit:{tier_name}:{user_id}"
        now = time.time()
        window_start = now - limits["window"]
        # A bare timestamp is not unique: two requests with the same clock
        # reading would overwrite each other and be counted once.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        pipe.zadd(key, {member: now})
        pipe.expire(key, limits["window"])
        results = pipe.execute()

        count = results[1]
        allowed = count < limits["requests"]
        if not allowed:
            # Rejected requests must not occupy the window, otherwise a
            # client that keeps retrying is never let back in.
            self.redis.zrem(key, member)

        remaining = max(0, limits["requests"] - count - 1)
        return allowed, {
            "tier": tier,
            "limit": limits["requests"],
            "remaining": remaining,
            "reset": int(now + limits["window"]),
        }
# Module-level limiter instance used by the endpoint below.
limiter = TieredRateLimiter(redis_client)
async def get_user_tier(request: Request) -> Tuple[str, UserTier]:
    """Resolve the caller's user id and tier (placeholder implementation).

    Reads the ``X-API-Key`` header but does not use it yet; every caller
    is reported as ``("user_123", UserTier.BASIC)``.
    """
    # In production, look the key up in the auth token or database.
    api_key = request.headers.get("X-API-Key", "")
    user_id = "user_123"
    tier = UserTier.BASIC
    return user_id, tier
@app.get("/api/resource")
async def get_resource(
    request: Request,
    user_info: Tuple[str, UserTier] = Depends(get_user_tier)
):
    """Serve the resource when the caller's tier quota still has room.

    Raises:
        HTTPException: 429 with tier, limit and reset details when the
            limiter rejects the request.
    """
    user_id, tier = user_info
    allowed, info = limiter.check_limit(user_id, tier)

    if allowed:
        return {"data": "success", "rate_limit": info}

    rejection = {
        "error": "Rate limit exceeded",
        "tier": tier,
        "limit": info["limit"],
        "reset": info["reset"],
    }
    raise HTTPException(status_code=429, detail=rejection)
Distributed Rate Limiting
Redis Cluster
import redis
from redis.cluster import RedisCluster
import time
class DistributedRateLimiter:
    """Rate limiter for distributed systems using Redis Cluster.

    The sliding-window check runs as a single Lua script, so pruning,
    counting and recording a request happen atomically on the server.
    """

    # KEYS[1] = sorted-set key; ARGV = max_requests, window, now, member.
    # The member is generated by the caller because math.random() inside a
    # Redis script is not a reliable source of uniqueness.
    _SLIDING_WINDOW_SCRIPT = """
    local key = KEYS[1]
    local max_requests = tonumber(ARGV[1])
    local window = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local member = ARGV[4]

    -- Remove old entries
    redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

    -- Get current count
    local count = redis.call('ZCARD', key)

    if count < max_requests then
        -- Add request
        redis.call('ZADD', key, now, member)
        redis.call('EXPIRE', key, window)
        return {1, max_requests - count - 1}
    else
        return {0, 0}
    end
    """

    def __init__(self, startup_nodes: list):
        """Connect to the cluster described by ``startup_nodes``."""
        self.redis = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True
        )

    def is_allowed(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> Tuple[bool, dict]:
        """Atomically record a request for ``key`` if the window has room.

        Args:
            key: Client identifier; stored under ``rate_limit:<key>``.
            max_requests: Requests allowed inside any trailing window.
            window_seconds: Length of the trailing window in seconds.

        Returns:
            ``(allowed, info)`` where ``info`` holds ``limit``, ``remaining``
            and ``reset`` (``now + window_seconds`` as an epoch second).
        """
        now = time.time()
        # Unique per request so equal timestamps never share a member.
        member = f"{now}-{uuid.uuid4().hex}"

        result = self.redis.eval(
            self._SLIDING_WINDOW_SCRIPT,
            1,  # number of keys
            f"rate_limit:{key}",
            max_requests,
            window_seconds,
            now,
            member,
        )

        allowed = result[0] == 1
        remaining = result[1]
        return allowed, {
            "limit": max_requests,
            "remaining": remaining,
            "reset": int(now + window_seconds),
        }
Response Headers
from fastapi import FastAPI, Request, Response
# BaseHTTPMiddleware is provided by Starlette; FastAPI has no
# ``fastapi.middleware.base`` module.
from starlette.middleware.base import BaseHTTPMiddleware


class RateLimitHeadersMiddleware(BaseHTTPMiddleware):
    """Copy rate-limit details from ``request.state`` onto the response.

    Expects something earlier in the request handling to have stored a
    dict under ``request.state.rate_limit_info`` with ``limit``,
    ``remaining`` and ``reset`` keys, plus an optional ``policy``.
    Responses are left untouched when that attribute is missing or empty.
    """

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler, then attach ``X-RateLimit-*`` headers."""
        response = await call_next(request)

        # Get rate limit info from request state
        rate_limit_info = getattr(request.state, "rate_limit_info", None)
        if rate_limit_info:
            response.headers["X-RateLimit-Limit"] = str(rate_limit_info["limit"])
            response.headers["X-RateLimit-Remaining"] = str(rate_limit_info["remaining"])
            response.headers["X-RateLimit-Reset"] = str(rate_limit_info["reset"])
            response.headers["X-RateLimit-Policy"] = rate_limit_info.get("policy", "sliding_window")

        return response
# Application instance with the header-writing middleware installed.
app = FastAPI()
app.add_middleware(RateLimitHeadersMiddleware)
Summary
| Algorithm | Best For |
|---|---|
| Fixed Window | Simple, high performance |
| Sliding Window | Accurate, smooth |
| Token Bucket | Allowing bursts |
| Leaky Bucket | Smooth output |
Rate limiting protects APIs and ensures fair resource usage across all clients.
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 18: API Security Best Practices
Secure your FastAPI application against common vulnerabilities. Learn input validation, rate limiting, CORS, and OWASP security patterns.
Python · FastAPI Tutorial: Build Modern Python APIs
Master FastAPI for building high-performance Python APIs. Learn async endpoints, validation, authentication, database integration, and deployment.
Python · FastAPI Tutorial Part 5: Dependency Injection - Share Logic Across Endpoints
Master FastAPI dependency injection for clean, reusable code. Learn database sessions, authentication, pagination, and complex dependency chains.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.