Python 8 min read

API Rate Limiting: Complete Implementation Guide

Master API rate limiting for production systems. Learn token bucket, sliding window, Redis-based limiting, and protect your APIs from abuse.

MR

Moshiour Rahman

Advertisement

What is Rate Limiting?

Rate limiting controls how many requests a client can make to an API within a time window. It protects services from abuse, ensures fair usage, and maintains system stability.

Why Rate Limit?

| Purpose | Description |
| --- | --- |
| Prevent Abuse | Stop malicious requests |
| Fair Usage | Equal access for all users |
| Cost Control | Limit resource consumption |
| Stability | Prevent system overload |

Rate Limiting Algorithms

Fixed Window

import time
from typing import Tuple
import redis

class FixedWindowLimiter:
    """Fixed-window rate limiter backed by a Redis counter.

    Every client key gets one counter per window, where the window index is
    ``time.time() // window_seconds``. A request is allowed while the counter
    for the current window does not exceed ``max_requests``.
    """

    def __init__(self, redis_client, max_requests: int, window_seconds: int):
        """Store the Redis client and the limit configuration.

        Args:
            redis_client: Client exposing ``pipeline()`` with ``incr`` and
                ``expire`` (for example ``redis.Redis``).
            max_requests: Number of requests allowed per window.
            window_seconds: Length of each window in seconds.
        """
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Count one request for *key* and report whether it is allowed.

        Args:
            key: Client identifier, e.g. ``"user:123"``.

        Returns:
            ``(allowed, info)`` where ``info`` carries ``limit``,
            ``remaining`` and ``reset`` (Unix time at which the current
            window ends).
        """
        current_window = int(time.time() // self.window_seconds)
        redis_key = f"rate_limit:{key}:{current_window}"

        # Queue INCR and EXPIRE together: issuing them as two separate calls
        # can leave a counter without a TTL if the process dies in between.
        # Refreshing the TTL on every hit is harmless because the window
        # index is part of the key, so a stale counter is never read again.
        pipe = self.redis.pipeline()
        pipe.incr(redis_key)
        pipe.expire(redis_key, self.window_seconds)
        current = pipe.execute()[0]

        remaining = max(0, self.max_requests - current)
        reset_time = (current_window + 1) * self.window_seconds

        return current <= self.max_requests, {
            "limit": self.max_requests,
            "remaining": remaining,
            "reset": reset_time
        }

# Usage: connect with redis-py's default connection settings and allow at
# most 100 requests per 60-second window for each key.
r = redis.Redis()
limiter = FixedWindowLimiter(r, max_requests=100, window_seconds=60)

# Each is_allowed() call counts as one request for "user:123".
allowed, info = limiter.is_allowed("user:123")
print(f"Allowed: {allowed}, Remaining: {info['remaining']}")

Sliding Window Log

import time
import redis

class SlidingWindowLogLimiter:
    """Sliding-window-log rate limiter backed by a Redis sorted set.

    One sorted-set member is stored per accepted request, scored by its
    timestamp. A request is allowed while fewer than ``max_requests`` members
    fall inside the trailing ``window_seconds``.
    """

    def __init__(self, redis_client, max_requests: int, window_seconds: int):
        """Store the Redis client and the limit configuration.

        Args:
            redis_client: Client exposing ``pipeline()`` and ``zrem``.
            max_requests: Requests allowed inside any rolling window.
            window_seconds: Length of the rolling window in seconds.
        """
        self.redis = redis_client
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Record a request for *key* if it fits in the current window.

        Args:
            key: Client identifier.

        Returns:
            ``(allowed, info)`` where ``info`` carries ``limit``,
            ``remaining`` and ``reset`` (now + window, as Unix seconds).
        """
        import uuid  # local import keeps this snippet self-contained

        now = time.time()
        window_start = now - self.window_seconds
        redis_key = f"rate_limit:swl:{key}"
        # A bare timestamp can collide when two requests share the same
        # clock reading; the random suffix keeps every member distinct.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()

        # Remove old entries
        pipe.zremrangebyscore(redis_key, 0, window_start)

        # Count requests in window (before this one is added)
        pipe.zcard(redis_key)

        # Add current request timestamp
        pipe.zadd(redis_key, {member: now})

        # Set expiry
        pipe.expire(redis_key, self.window_seconds)

        request_count = pipe.execute()[1]

        allowed = request_count < self.max_requests
        if not allowed:
            # Roll back the optimistic insert. Keeping rejected requests in
            # the log lets the set grow without bound under abuse and keeps
            # a client blocked for as long as it keeps retrying.
            self.redis.zrem(redis_key, member)

        remaining = max(0, self.max_requests - request_count - 1)

        return allowed, {
            "limit": self.max_requests,
            "remaining": remaining,
            "reset": int(now + self.window_seconds)
        }

# Usage: at most 100 requests per key within any rolling 60-second span
limiter = SlidingWindowLogLimiter(redis.Redis(), max_requests=100, window_seconds=60)

Token Bucket

import time
import redis

class TokenBucketLimiter:
    """Token bucket algorithm - allows bursts.

    Bucket state (``tokens`` and ``last_update``) is kept in a Redis hash per
    key. Tokens refill continuously at ``refill_rate`` per second up to
    ``bucket_size``; each request spends tokens if enough are available.

    NOTE: the read (HGETALL) and the write (HSET) are separate commands, so
    concurrent requests for the same key can race.
    """

    def __init__(
        self,
        redis_client,
        bucket_size: int,
        refill_rate: float,  # tokens per second
    ):
        """Store the Redis client and the bucket parameters.

        Args:
            redis_client: Client exposing ``hgetall``, ``hset`` and ``expire``.
            bucket_size: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second.
        """
        self.redis = redis_client
        self.bucket_size = bucket_size
        self.refill_rate = refill_rate

    @staticmethod
    def _field(bucket_data: dict, name: str) -> float:
        """Read a numeric hash field whether the keys are ``str`` or ``bytes``.

        redis-py returns ``bytes`` keys by default and ``str`` keys when the
        client was created with ``decode_responses=True``.
        """
        raw = bucket_data.get(name)
        if raw is None:
            raw = bucket_data[name.encode()]
        return float(raw)

    def _ttl_seconds(self) -> int:
        """Return how long idle bucket state must be kept.

        A missing key is treated as a full bucket, so the key may only expire
        once the bucket would have refilled completely; otherwise a drained
        bucket would become full again too early.
        """
        if self.refill_rate > 0:
            return max(3600, int(self.bucket_size / self.refill_rate) + 1)
        return 3600

    def is_allowed(self, key: str, tokens: int = 1) -> Tuple[bool, dict]:
        """Try to spend *tokens* from the bucket for *key*.

        Args:
            key: Client identifier.
            tokens: Number of tokens this request costs.

        Returns:
            ``(allowed, info)`` where ``info`` carries ``limit``,
            ``remaining`` (whole tokens left) and ``refill_rate``.
        """
        redis_key = f"rate_limit:tb:{key}"
        now = time.time()

        # Get current bucket state
        bucket_data = self.redis.hgetall(redis_key)

        if bucket_data:
            last_update = self._field(bucket_data, "last_update")
            current_tokens = self._field(bucket_data, "tokens")

            # Calculate tokens to add based on time passed
            time_passed = now - last_update
            tokens_to_add = time_passed * self.refill_rate
            current_tokens = min(self.bucket_size, current_tokens + tokens_to_add)
        else:
            # First request for this key (or state expired): bucket starts full
            current_tokens = self.bucket_size

        # Check if we have enough tokens
        if current_tokens >= tokens:
            current_tokens -= tokens
            allowed = True
        else:
            allowed = False

        # Update bucket state
        self.redis.hset(redis_key, mapping={
            'tokens': current_tokens,
            'last_update': now
        })
        self.redis.expire(redis_key, self._ttl_seconds())

        return allowed, {
            "limit": self.bucket_size,
            "remaining": int(current_tokens),
            "refill_rate": self.refill_rate
        }

# Usage - bucket holds at most 100 tokens and refills at 10 tokens/second;
# a key with no stored state starts with a full bucket.
limiter = TokenBucketLimiter(redis.Redis(), bucket_size=100, refill_rate=10)

Leaky Bucket

import time
import redis
from typing import Tuple

class LeakyBucketLimiter:
    """Leaky bucket - smooth output rate.

    Bucket state (``water_level`` and ``last_update``) is kept in a Redis
    hash per key. The level drains at ``leak_rate`` per second; each accepted
    request raises it by one, and requests are rejected when adding one more
    would exceed ``bucket_size``.

    NOTE: the read (HGETALL) and the write (HSET) are separate commands, so
    concurrent requests for the same key can race.
    """

    def __init__(
        self,
        redis_client,
        bucket_size: int,
        leak_rate: float  # requests per second
    ):
        """Store the Redis client and the bucket parameters.

        Args:
            redis_client: Client exposing ``hgetall``, ``hset`` and ``expire``.
            bucket_size: Capacity of the bucket in requests.
            leak_rate: Requests drained per second.
        """
        self.redis = redis_client
        self.bucket_size = bucket_size
        self.leak_rate = leak_rate

    @staticmethod
    def _field(bucket_data: dict, name: str) -> float:
        """Read a numeric hash field whether the keys are ``str`` or ``bytes``.

        redis-py returns ``bytes`` keys by default and ``str`` keys when the
        client was created with ``decode_responses=True``.
        """
        raw = bucket_data.get(name)
        if raw is None:
            raw = bucket_data[name.encode()]
        return float(raw)

    def _ttl_seconds(self) -> int:
        """Return how long idle bucket state must be kept.

        A missing key is treated as an empty bucket, so the key may only
        expire once a full bucket would have drained completely.
        """
        if self.leak_rate > 0:
            return max(3600, int(self.bucket_size / self.leak_rate) + 1)
        return 3600

    def is_allowed(self, key: str) -> Tuple[bool, dict]:
        """Try to add one request to the bucket for *key*.

        Args:
            key: Client identifier.

        Returns:
            ``(allowed, info)`` where ``info`` carries ``limit``,
            ``current_level`` (truncated to an int) and ``leak_rate``.
        """
        redis_key = f"rate_limit:lb:{key}"
        now = time.time()

        # Get current bucket state
        bucket_data = self.redis.hgetall(redis_key)

        if bucket_data:
            last_update = self._field(bucket_data, "last_update")
            water_level = self._field(bucket_data, "water_level")

            # Calculate water leaked since last update
            time_passed = now - last_update
            leaked = time_passed * self.leak_rate
            water_level = max(0, water_level - leaked)
        else:
            water_level = 0

        # Try to add water (request). Compare the level *after* adding so a
        # fractional level such as 9.5 cannot push a size-10 bucket to 10.5.
        if water_level + 1 <= self.bucket_size:
            water_level += 1
            allowed = True
        else:
            allowed = False

        # Update state
        self.redis.hset(redis_key, mapping={
            'water_level': water_level,
            'last_update': now
        })
        self.redis.expire(redis_key, self._ttl_seconds())

        return allowed, {
            "limit": self.bucket_size,
            "current_level": int(water_level),
            "leak_rate": self.leak_rate
        }

FastAPI Integration

Middleware Implementation

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import redis
import time

# Application instance and the module-level Redis client that
# RateLimitMiddleware stores in its constructor.
app = FastAPI()
# decode_responses=True asks redis-py to return str values instead of bytes.
redis_client = redis.Redis(decode_responses=True)

class RateLimitMiddleware:
    """ASGI middleware applying a sliding-window rate limit per client.

    A client is identified by its ``X-API-Key`` header when one is sent and
    by its remote address otherwise. Rejected requests get a 429 JSON
    response; accepted ones have ``x-ratelimit-*`` headers appended.
    """

    def __init__(
        self,
        app,
        max_requests: int = 100,
        window_seconds: int = 60
    ):
        """Wrap *app* and remember the limit configuration.

        Args:
            app: The downstream ASGI application.
            max_requests: Requests allowed inside any rolling window.
            window_seconds: Length of the rolling window in seconds.
        """
        self.app = app
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.redis = redis_client

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection, enforcing the limit on HTTP requests."""
        if scope["type"] != "http":
            # WebSocket / lifespan traffic is passed through untouched.
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)

        # Get client identifier. request.client is None when the server
        # supplies no peer address; such requests share one "unknown" bucket.
        client_ip = request.client.host if request.client else "unknown"
        api_key = request.headers.get("X-API-Key", "")
        # NOTE(review): the API key is used as sent, without validation, so a
        # caller can obtain a fresh quota by inventing a new header value.
        identifier = api_key if api_key else client_ip

        # Check rate limit
        # NOTE: this is a blocking Redis round trip inside a coroutine.
        allowed, info = self._check_limit(identifier)

        if not allowed:
            response = JSONResponse(
                status_code=429,
                content={"error": "Rate limit exceeded"},
                headers={
                    "X-RateLimit-Limit": str(info["limit"]),
                    "X-RateLimit-Remaining": str(info["remaining"]),
                    "X-RateLimit-Reset": str(info["reset"]),
                    "Retry-After": str(max(0, info["reset"] - int(time.time())))
                }
            )
            await response(scope, receive, send)
            return

        # Add rate limit headers to response
        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = list(message.get("headers", []))
                # Raw ASGI header names must be lower-cased byte strings.
                headers.extend([
                    (b"x-ratelimit-limit", str(info["limit"]).encode()),
                    (b"x-ratelimit-remaining", str(info["remaining"]).encode()),
                    (b"x-ratelimit-reset", str(info["reset"]).encode()),
                ])
                message["headers"] = headers
            await send(message)

        await self.app(scope, receive, send_wrapper)

    def _check_limit(self, key: str) -> Tuple[bool, dict]:
        """Record a request for *key* if it fits in the rolling window.

        Returns:
            ``(allowed, info)`` where ``info`` carries ``limit``,
            ``remaining`` and ``reset`` (now + window, as Unix seconds).
        """
        import uuid  # local import keeps this snippet self-contained

        now = time.time()
        window_start = now - self.window_seconds
        redis_key = f"rate_limit:{key}"
        # Random suffix so two requests with the same timestamp are both
        # counted instead of overwriting one sorted-set member.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(redis_key, 0, window_start)
        pipe.zcard(redis_key)
        pipe.zadd(redis_key, {member: now})
        pipe.expire(redis_key, self.window_seconds)

        count = pipe.execute()[1]

        allowed = count < self.max_requests
        if not allowed:
            # Undo the optimistic insert so rejected requests neither bloat
            # the set nor extend the time the client stays blocked.
            self.redis.zrem(redis_key, member)

        return allowed, {
            "limit": self.max_requests,
            "remaining": max(0, self.max_requests - count - 1),
            "reset": int(now + self.window_seconds)
        }

# Add middleware: HTTP requests are limited to 100 per rolling 60-second
# window for each client identifier (API key or remote address).
app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)

Decorator-Based Limiting

from fastapi import FastAPI, Request, HTTPException, Depends
from functools import wraps
import redis

# Application instance and the module-level Redis client used by the
# rate_limit decorator.
app = FastAPI()
# decode_responses=True asks redis-py to return str values instead of bytes.
redis_client = redis.Redis(decode_responses=True)

def rate_limit(max_requests: int, window_seconds: int):
    """Rate limit decorator for specific endpoints.

    Applies a fixed-window limit keyed by endpoint function name and client
    address. The decorated coroutine must accept the ``Request`` as its
    ``request`` parameter.

    Args:
        max_requests: Requests allowed per client per window.
        window_seconds: Length of each window in seconds.

    Returns:
        A decorator whose wrapper raises ``HTTPException(429)`` with a
        ``Retry-After`` header once the limit is exceeded.
    """

    def decorator(func):
        @wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            # request.client is None when the server supplies no peer
            # address; such requests share one "unknown" bucket.
            client_ip = request.client.host if request.client else "unknown"
            key = f"rate_limit:{func.__name__}:{client_ip}"

            now = time.time()
            current_window = int(now // window_seconds)
            redis_key = f"{key}:{current_window}"

            # Queue INCR and EXPIRE together so a failure between the two
            # cannot leave a counter without a TTL. The window index is part
            # of the key, so refreshing the TTL on each hit is harmless.
            # NOTE: this is a blocking Redis round trip inside a coroutine.
            pipe = redis_client.pipeline()
            pipe.incr(redis_key)
            pipe.expire(redis_key, window_seconds)
            current = pipe.execute()[0]

            if current > max_requests:
                # Seconds until the current window ends, never below 1.
                window_end = (current_window + 1) * window_seconds
                retry_after = max(1, int(window_end - now))
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded",
                    headers={"Retry-After": str(retry_after)},
                )

            return await func(request, *args, **kwargs)
        return wrapper
    return decorator

@app.get("/api/data")
@rate_limit(max_requests=10, window_seconds=60)
async def get_data(request: Request):
    # Tighter limit: rate_limit allows 10 requests per 60-second window.
    payload = {"data": "limited endpoint"}
    return payload

@app.get("/api/public")
@rate_limit(max_requests=100, window_seconds=60)
async def get_public(request: Request):
    # Looser limit: rate_limit allows 100 requests per 60-second window.
    payload = {"data": "less limited endpoint"}
    return payload

Tiered Rate Limits

from fastapi import FastAPI, Request, Depends, HTTPException
from enum import Enum
import redis

# Application instance and the module-level Redis client handed to
# TieredRateLimiter.
app = FastAPI()
# decode_responses=True asks redis-py to return str values instead of bytes.
redis_client = redis.Redis(decode_responses=True)

class UserTier(str, Enum):
    """Subscription tiers; each member is a key of ``TIER_LIMITS``."""
    # Members also subclass str, so each one compares equal to its value.
    FREE = "free"
    BASIC = "basic"
    PRO = "pro"
    ENTERPRISE = "enterprise"

# Per-tier limits: "requests" is the number of requests allowed within a
# rolling window of "window" seconds. Every tier uses a one-hour window and
# each step up multiplies the allowance by ten.
TIER_LIMITS = {
    UserTier.FREE: {"requests": 100, "window": 3600},
    UserTier.BASIC: {"requests": 1000, "window": 3600},
    UserTier.PRO: {"requests": 10000, "window": 3600},
    UserTier.ENTERPRISE: {"requests": 100000, "window": 3600},
}

class TieredRateLimiter:
    """Sliding-window rate limiter whose limits depend on the user's tier."""

    def __init__(self, redis_client):
        """Store the Redis client (needs ``pipeline()`` and ``zrem``)."""
        self.redis = redis_client

    def check_limit(self, user_id: str, tier: UserTier) -> Tuple[bool, dict]:
        """Record a request for *user_id* if it fits the limit of *tier*.

        Args:
            user_id: Identifier of the calling user.
            tier: Tier used to look up ``TIER_LIMITS``.

        Returns:
            ``(allowed, info)`` where ``info`` carries ``tier``, ``limit``,
            ``remaining`` and ``reset`` (now + window, as Unix seconds).
        """
        import uuid  # local import keeps this snippet self-contained

        limits = TIER_LIMITS[tier]
        # Use the member's value explicitly: formatting a str-mixin Enum in
        # an f-string gives "basic" on some Python versions and
        # "UserTier.BASIC" on others (changed in 3.11 per the enum docs),
        # which would silently change the Redis key.
        tier_name = getattr(tier, "value", tier)
        key = f"rate_limit:{tier_name}:{user_id}"

        now = time.time()
        window_start = now - limits["window"]
        # Random suffix so two requests with the same timestamp are both
        # counted instead of overwriting one sorted-set member.
        member = f"{now}:{uuid.uuid4().hex}"

        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)
        pipe.zcard(key)
        pipe.zadd(key, {member: now})
        pipe.expire(key, limits["window"])

        count = pipe.execute()[1]

        allowed = count < limits["requests"]
        if not allowed:
            # Undo the optimistic insert so rejected requests neither bloat
            # the set nor extend the time the user stays blocked.
            self.redis.zrem(key, member)

        remaining = max(0, limits["requests"] - count - 1)

        return allowed, {
            "tier": tier,
            "limit": limits["requests"],
            "remaining": remaining,
            "reset": int(now + limits["window"])
        }

# Module-level limiter instance built on the shared Redis client.
limiter = TieredRateLimiter(redis_client)

async def get_user_tier(request: Request) -> Tuple[str, UserTier]:
    """Resolve the caller's user id and tier (placeholder implementation).

    Currently returns the fixed pair ``("user_123", UserTier.BASIC)`` for
    every request; the ``X-API-Key`` header is read but not yet used.
    """
    # In production, get from auth token or database
    api_key = request.headers.get("X-API-Key", "")
    # Lookup user tier from API key
    return "user_123", UserTier.BASIC

@app.get("/api/resource")
async def get_resource(
    request: Request,
    user_info: Tuple[str, UserTier] = Depends(get_user_tier)
):
    """Return the resource when the caller is within their tier's limit."""
    user_id, tier = user_info

    allowed, info = limiter.check_limit(user_id, tier)

    if not allowed:
        # Send the same headers as the middleware example so clients can
        # back off without parsing the JSON body.
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Rate limit exceeded",
                "tier": tier,
                "limit": info["limit"],
                "reset": info["reset"]
            },
            headers={
                "X-RateLimit-Limit": str(info["limit"]),
                "X-RateLimit-Remaining": str(info["remaining"]),
                "X-RateLimit-Reset": str(info["reset"]),
                "Retry-After": str(max(0, info["reset"] - int(time.time()))),
            },
        )

    return {"data": "success", "rate_limit": info}

Distributed Rate Limiting

Redis Cluster

import redis
from redis.cluster import RedisCluster
import time

class DistributedRateLimiter:
    """Rate limiter for distributed systems using Redis Cluster.

    The check-and-record step runs as one Lua script, so Redis executes it
    atomically for the key.
    """

    # KEYS[1] = sorted-set key
    # ARGV    = max_requests, window seconds, current time, unique member
    # Returns {allowed (1/0), remaining}.
    _LUA_SCRIPT = """
        local key = KEYS[1]
        local max_requests = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        local member = ARGV[4]

        -- Remove old entries
        redis.call('ZREMRANGEBYSCORE', key, 0, now - window)

        -- Get current count
        local count = redis.call('ZCARD', key)

        if count < max_requests then
            -- Add request (score passed as the original string so Lua's
            -- number formatting cannot shorten the timestamp)
            redis.call('ZADD', key, ARGV[3], member)
            redis.call('EXPIRE', key, window)
            return {1, max_requests - count - 1}
        else
            return {0, 0}
        end
        """

    def __init__(self, startup_nodes: list):
        """Connect to the cluster described by *startup_nodes*."""
        self.redis = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True
        )

    def is_allowed(
        self,
        key: str,
        max_requests: int,
        window_seconds: int
    ) -> Tuple[bool, dict]:
        """Record a request for *key* if it fits in the rolling window.

        Args:
            key: Client identifier.
            max_requests: Requests allowed inside any rolling window.
            window_seconds: Length of the rolling window in seconds.

        Returns:
            ``(allowed, info)`` where ``info`` carries ``limit``,
            ``remaining`` and ``reset`` (now + window, as Unix seconds).
        """
        import uuid  # local import keeps this snippet self-contained

        now = time.time()
        # Build the sorted-set member client-side. Relying on math.random()
        # inside the script is fragile: Redis may seed it identically on
        # every run (version dependent - confirm for your server), so two
        # requests with the same timestamp would overwrite each other.
        member = f"{now}-{uuid.uuid4().hex}"

        result = self.redis.eval(
            self._LUA_SCRIPT,
            1,  # number of keys
            f"rate_limit:{key}",
            max_requests,
            window_seconds,
            now,
            member
        )

        allowed = result[0] == 1
        remaining = result[1]

        return allowed, {
            "limit": max_requests,
            "remaining": remaining,
            "reset": int(now + window_seconds)
        }

Response Headers

from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware

class RateLimitHeadersMiddleware(BaseHTTPMiddleware):
    """Copy rate-limit details from ``request.state`` onto response headers."""

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler, then attach ``X-RateLimit-*`` headers.

        Headers are only added when ``request.state.rate_limit_info`` exists
        and is truthy; otherwise the response is returned unchanged.
        """
        response = await call_next(request)

        # Populated elsewhere on request.state; absent means nothing to add.
        info = getattr(request.state, "rate_limit_info", None)
        if not info:
            return response

        header_values = {
            "X-RateLimit-Limit": str(info["limit"]),
            "X-RateLimit-Remaining": str(info["remaining"]),
            "X-RateLimit-Reset": str(info["reset"]),
            # Policy falls back to "sliding_window" when not provided.
            "X-RateLimit-Policy": info.get("policy", "sliding_window"),
        }
        for header_name, header_value in header_values.items():
            response.headers[header_name] = header_value

        return response

# Create the application and register the header-writing middleware.
app = FastAPI()
app.add_middleware(RateLimitHeadersMiddleware)

Summary

| Algorithm | Best For |
| --- | --- |
| Fixed Window | Simple, high performance |
| Sliding Window | Accurate, smooth |
| Token Bucket | Allowing bursts |
| Leaky Bucket | Smooth output |

Rate limiting protects APIs and ensures fair resource usage across all clients.

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.