Microservices Design Patterns: Complete Architecture Guide
Master microservices architecture patterns. Learn service discovery, circuit breakers, the saga pattern, API gateways, event-driven messaging, and CQRS, and use them to build resilient distributed systems.
Moshiour Rahman
Advertisement
What are Microservices?
Microservices architecture breaks applications into small, independent services that communicate over networks. Each service handles a specific business capability and can be developed, deployed, and scaled independently.
Monolith vs Microservices
| Monolith | Microservices |
|---|---|
| Single deployment | Independent deployments |
| Shared database | Database per service |
| Tight coupling | Loose coupling |
| Scale the whole application as one unit (often vertically) | Scale each service independently (typically horizontally) |
| Single tech stack | Polyglot |
API Gateway Pattern
Implementation with FastAPI
from fastapi import FastAPI, HTTPException, Request
import httpx
from typing import Dict
# Gateway application: the single public entry point for all clients.
app = FastAPI()

# Static service registry mapping the first path segment of an incoming URL
# to the base URL of the backend service that handles it.
SERVICES: Dict[str, str] = dict(
    users="http://user-service:8001",
    orders="http://order-service:8002",
    products="http://product-service:8003",
)
class APIGateway:
    """Forward requests to the backend services listed in ``SERVICES``.

    One ``httpx.AsyncClient`` is shared by all requests so connections are
    pooled; call :meth:`aclose` on application shutdown to release them.
    """

    # Headers that belong to the client->gateway hop and must not be copied
    # onto the gateway->service request. ``host`` names the gateway rather
    # than the backend, and ``content-length`` no longer matches once httpx
    # re-serializes the JSON body.
    _HOP_BY_HOP_HEADERS = frozenset({
        "connection", "keep-alive", "proxy-authenticate",
        "proxy-authorization", "te", "trailer", "transfer-encoding",
        "upgrade", "host", "content-length",
    })

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    async def aclose(self):
        """Close the pooled HTTP client."""
        await self.client.aclose()

    async def forward_request(
        self,
        service: str,
        path: str,
        method: str,
        body: dict = None,
        headers: dict = None,
        params=None,
    ):
        """Send *method* *path* to *service* and return the decoded reply.

        Args:
            service: key into ``SERVICES``.
            path: path on the backend, starting with ``/`` (may carry a query).
            method: HTTP method to use.
            body: JSON body to send, or None for no body.
            headers: incoming request headers; hop-by-hop ones are dropped.
            params: optional extra query parameters.

        Returns:
            The JSON-decoded body, the raw text if the body is not JSON, or
            None if the backend returned no body.

        Raises:
            HTTPException: 404 for an unknown service, 503 if the backend is
                unreachable, or the backend's own status for 4xx/5xx replies.
        """
        if service not in SERVICES:
            raise HTTPException(404, f"Service {service} not found")
        url = f"{SERVICES[service]}{path}"
        forwarded = {
            key: value
            for key, value in (headers or {}).items()
            if key.lower() not in self._HOP_BY_HOP_HEADERS
        }
        try:
            response = await self.client.request(
                method=method,
                url=url,
                params=params,
                json=body,
                headers=forwarded,
            )
        except httpx.RequestError as e:
            raise HTTPException(503, f"Service unavailable: {str(e)}") from e
        payload = self._decode(response)
        if response.status_code >= 400:
            # Surface the backend's error status instead of a 200 wrapping it.
            raise HTTPException(response.status_code, detail=payload)
        return payload

    @staticmethod
    def _decode(response):
        """Return the body as JSON, else as text; None when the body is empty."""
        if not response.content:
            return None
        try:
            return response.json()
        except ValueError:
            return response.text
# Shared gateway instance used by the catch-all route below.
gateway = APIGateway()


@app.api_route(
    "/{service}/{path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def proxy(service: str, path: str, request: Request):
    """Proxy ``/{service}/{path}`` to the matching backend service.

    The original query string is preserved. For methods that carry a body,
    an empty body is forwarded as no body, and a body that is not valid JSON
    is rejected with 400 instead of failing the handler with a 500.
    """
    body = None
    if request.method in ("POST", "PUT", "PATCH"):
        if await request.body():
            try:
                body = await request.json()
            except ValueError:
                raise HTTPException(400, "Request body must be valid JSON")
    target = f"/{path}"
    if request.url.query:
        target = f"{target}?{request.url.query}"
    return await gateway.forward_request(
        service=service,
        path=target,
        method=request.method,
        body=body,
        headers=dict(request.headers),
    )
Request Aggregation
from fastapi import FastAPI
import httpx
import asyncio
app = FastAPI()


async def _get_json(url: str):
    """GET *url* and return the decoded JSON body.

    Raises ``httpx.HTTPStatusError`` for 4xx/5xx replies, so callers never
    receive an error payload as if it were real data.
    """
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()


async def fetch_user(user_id: int):
    """Fetch a user's profile from the user service."""
    return await _get_json(f"http://user-service:8001/users/{user_id}")


async def fetch_orders(user_id: int):
    """Fetch a user's orders from the order service."""
    return await _get_json(f"http://order-service:8002/users/{user_id}/orders")


async def fetch_recommendations(user_id: int):
    """Fetch product recommendations for a user."""
    return await _get_json(f"http://recommendation-service:8004/users/{user_id}")
def _or_empty(result):
    """Return *result*, or ``[]`` if it is an ``Exception`` captured by gather.

    Other ``BaseException`` values (e.g. cancellation) are re-raised.
    """
    if isinstance(result, Exception):
        return []
    if isinstance(result, BaseException):
        raise result
    return result


@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
    """Aggregate data from multiple services.

    The three backend calls run concurrently. The user profile is required:
    if it cannot be fetched, the error propagates. Orders and recommendations
    are optional and fall back to empty lists when their call fails, so one
    broken non-critical service does not take the whole dashboard down.
    """
    user, orders, recommendations = await asyncio.gather(
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_recommendations(user_id),
        return_exceptions=True,
    )
    if isinstance(user, BaseException):
        raise user
    return {
        "user": user,
        "recent_orders": _or_empty(orders)[:5],
        "recommendations": _or_empty(recommendations)[:10],
    }
Service Discovery
Consul Integration
import consul
import random
from typing import Optional
class ServiceDiscovery:
    """Register service instances with Consul and look up healthy ones."""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(
        self,
        name: str,
        service_id: str,
        address: str,
        port: int,
        tags: Optional[list] = None,
    ):
        """Register service with Consul.

        The instance gets an HTTP check against
        ``http://{address}:{port}/health`` (every 10s, 5s timeout). Lookups in
        this class only return instances whose checks are passing.
        """
        self.consul.agent.service.register(
            name=name,
            service_id=service_id,
            address=address,
            port=port,
            tags=tags or [],
            check=consul.Check.http(
                f"http://{address}:{port}/health",
                interval="10s",
                timeout="5s",
            ),
        )

    def deregister_service(self, service_id: str):
        """Deregister service from Consul."""
        self.consul.agent.service.deregister(service_id)

    @staticmethod
    def _to_instance(entry: dict) -> dict:
        """Convert one health-API entry into an ``{id, address, port}`` dict.

        Consul reports an empty ``Service.Address`` for services registered
        without an address; the node's address must be used in that case.
        """
        svc = entry["Service"]
        address = svc.get("Address") or entry.get("Node", {}).get("Address")
        return {"id": svc["ID"], "address": address, "port": svc["Port"]}

    def get_service(self, name: str) -> Optional[dict]:
        """Get healthy service instance (load balanced).

        Picks uniformly at random among passing instances and returns
        ``{"address", "port"}``, or ``None`` when no instance is healthy.
        """
        instances = self.get_all_services(name)
        if not instances:
            return None
        chosen = random.choice(instances)
        return {"address": chosen["address"], "port": chosen["port"]}

    def get_all_services(self, name: str) -> list:
        """Get all healthy instances of a service."""
        _, services = self.consul.health.service(name, passing=True)
        return [self._to_instance(entry) for entry in services]
# --- Usage -----------------------------------------------------------------
discovery = ServiceDiscovery()

# On startup, announce this instance (and its /health check) to Consul.
discovery.register_service(
    "user-service",
    "user-service-1",
    "192.168.1.10",
    8001,
    tags=["api", "users"],
)

# Resolve one healthy order-service instance and build a URL for it.
service = discovery.get_service("order-service")
if service is not None:
    url = f"http://{service['address']}:{service['port']}/orders"
Circuit Breaker Pattern
Implementation
import time
from enum import Enum
from typing import Callable, Any
from functools import wraps
import threading
class CircuitState(Enum):
    """States of a :class:`CircuitBreaker`."""

    CLOSED = "closed"        # calls pass through; failures are counted
    OPEN = "open"            # calls are rejected with CircuitBreakerOpen
    HALF_OPEN = "half_open"  # one trial call decides whether to close again


class CircuitBreakerOpen(Exception):
    """Raised instead of calling the protected function while the circuit is open."""


class CircuitBreaker:
    """Decorator that stops calling a dependency after repeated failures.

    After ``failure_threshold`` consecutive ``expected_exception`` failures the
    circuit opens, and every call raises :class:`CircuitBreakerOpen` without
    invoking the function. Once ``recovery_timeout`` seconds have passed since
    the last failure, exactly one trial call is let through (half-open). If it
    succeeds the circuit closes; if it fails the circuit opens again.
    Exceptions other than ``expected_exception`` propagate unchanged and are
    not counted. ``expected_exception`` may also be a tuple of types. State
    changes are guarded by a lock so one instance can be shared by threads.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        expected_exception: type = Exception,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        # time.monotonic() reading of the latest failure; unlike time.time()
        # it cannot jump when the wall clock is adjusted.
        self.last_failure_time = None
        self.lock = threading.Lock()
        # True while the single half-open trial call is running.
        self._probe_in_flight = False

    def __call__(self, func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            return self._execute(func, *args, **kwargs)
        return wrapper

    def _execute(self, func: Callable, *args, **kwargs) -> Any:
        is_probe = self._admit()
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        except BaseException:
            # Unexpected errors are not counted as failures, but a probe that
            # ended this way must free its slot or half-open would stick.
            if is_probe:
                with self.lock:
                    self._probe_in_flight = False
            raise
        self._on_success()
        return result

    def _admit(self) -> bool:
        """Decide whether a call may proceed; return True if it is the probe.

        Raises:
            CircuitBreakerOpen: if the circuit is open, or half-open with a
                trial call already running.
        """
        with self.lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise CircuitBreakerOpen("Circuit breaker is open")
                self.state = CircuitState.HALF_OPEN
            if self.state != CircuitState.HALF_OPEN:
                return False
            if self._probe_in_flight:
                # Only one trial call at a time; everyone else keeps failing fast.
                raise CircuitBreakerOpen("Circuit breaker is open")
            self._probe_in_flight = True
            return True

    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return False
        return time.monotonic() - self.last_failure_time >= self.recovery_timeout

    def _on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            self._probe_in_flight = False

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            self._probe_in_flight = False
            # A failed half-open probe re-opens immediately.
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN
# Usage
import httpx

# httpx.HTTPError covers both transport failures (httpx.RequestError) and the
# httpx.HTTPStatusError raised by raise_for_status(). Counting only
# RequestError would let a stream of 5xx replies never trip the breaker.
# NOTE: 4xx replies count as failures too; narrow this if client errors
# should not open the circuit.
circuit_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=30,
    expected_exception=httpx.HTTPError,
)


@circuit_breaker
def call_external_service(url: str) -> dict:
    """GET *url* and return its JSON body, guarded by ``circuit_breaker``."""
    response = httpx.get(url, timeout=5.0)
    response.raise_for_status()
    return response.json()
# With retry logic
from tenacity import retry, retry_if_not_exception_type, stop_after_attempt, wait_exponential


# @retry wraps the breaker, so every attempt passes through it. Retrying while
# the breaker is open only wastes time (it fails fast by design), so
# CircuitBreakerOpen is raised straight to the caller instead of retried.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    retry=retry_if_not_exception_type(CircuitBreakerOpen),
)
@circuit_breaker
def resilient_call(url: str) -> dict:
    """GET *url* with up to 3 attempts and exponential backoff.

    Shares ``circuit_breaker`` (and therefore its failure count) with
    ``call_external_service``.
    """
    response = httpx.get(url, timeout=5.0)
    response.raise_for_status()
    return response.json()
Saga Pattern
Orchestration-Based Saga (a central coordinator runs each step and its compensation)
import asyncio
from enum import Enum
from typing import Callable, List, Optional
from dataclasses import dataclass
class SagaStepStatus(Enum):
    """Lifecycle of a single saga step."""

    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


@dataclass
class SagaStep:
    """One local transaction plus the action that undoes it.

    Both callables are awaited with the shared saga context dict.
    """

    name: str
    execute: Callable
    compensate: Callable
    status: SagaStepStatus = SagaStepStatus.PENDING


class Saga:
    """Orchestrates steps in order and compensates completed ones on failure."""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(
        self,
        name: str,
        execute: Callable,
        compensate: Callable,
    ):
        """Append a step; steps run in the order they were added."""
        self.steps.append(SagaStep(
            name=name,
            execute=execute,
            compensate=compensate,
        ))

    async def execute(self, context: dict) -> dict:
        """Run every step, merging each step's returned dict into *context*.

        On the first exception, the failing step is marked FAILED, the steps
        completed so far are compensated in reverse order, and the exception
        is re-raised. Returns the updated *context* on success.
        """
        # Reset per-run state so the same Saga can be executed again without
        # compensating steps left over from an earlier run.
        self.completed_steps = []
        for step in self.steps:
            step.status = SagaStepStatus.PENDING

        for step in self.steps:
            print(f"Executing step: {step.name}")
            try:
                result = await step.execute(context)
                context.update(result or {})
            except Exception as e:
                step.status = SagaStepStatus.FAILED
                print(f"Saga failed at step {step.name}: {e}")
                await self._compensate(context)
                raise
            step.status = SagaStepStatus.COMPLETED
            self.completed_steps.append(step)
        return context

    async def _compensate(self, context: dict):
        """Undo completed steps, newest first.

        Best effort: a failing compensation is reported and the remaining
        ones still run; that step keeps its COMPLETED status.
        """
        for step in reversed(self.completed_steps):
            try:
                print(f"Compensating step: {step.name}")
                await step.compensate(context)
                step.status = SagaStepStatus.COMPENSATED
            except Exception as e:
                print(f"Compensation failed for {step.name}: {e}")
# Order Processing Saga Example
#
# Every step and compensation is awaited with the shared saga context dict.
# A step can record data by writing into ``ctx`` directly, or by returning a
# dict that the saga merges into the context; both styles appear below.


async def create_order(ctx):
    """Create the order and record its id."""
    print(f"Creating order for user {ctx['user_id']}")
    new_id = 12345
    ctx["order_id"] = new_id
    return {"order_id": new_id}


async def cancel_order(ctx):
    """Compensation for create_order."""
    print(f"Cancelling order {ctx['order_id']}")


async def reserve_inventory(ctx):
    """Reserve stock for the order."""
    print(f"Reserving inventory for order {ctx['order_id']}")
    ctx["inventory_reserved"] = True


async def release_inventory(ctx):
    """Compensation for reserve_inventory."""
    print(f"Releasing inventory for order {ctx['order_id']}")


async def process_payment(ctx):
    """Charge the customer and record the payment id.

    To watch the saga roll back, make this step fail, e.g. by raising
    ``Exception("Payment failed")`` before ``payment_id`` is set.
    """
    print(f"Processing payment for order {ctx['order_id']}")
    ctx["payment_id"] = "pay_123"


async def refund_payment(ctx):
    """Compensation for process_payment (reads payment_id defensively)."""
    print(f"Refunding payment {ctx.get('payment_id')}")


async def ship_order(ctx):
    """Hand the order over for shipping."""
    print(f"Shipping order {ctx['order_id']}")


async def cancel_shipment(ctx):
    """Compensation for ship_order."""
    print(f"Cancelling shipment for order {ctx['order_id']}")
# Execute saga
async def process_order(user_id: int, items: list):
    """Run the order-processing saga for one user.

    Returns the final saga context on success. On failure the saga has
    already compensated its completed steps; the error is printed and
    ``None`` is returned.
    """
    saga = Saga()
    saga.add_step("create_order", create_order, cancel_order)
    saga.add_step("reserve_inventory", reserve_inventory, release_inventory)
    saga.add_step("process_payment", process_payment, refund_payment)
    saga.add_step("ship_order", ship_order, cancel_shipment)
    context = {"user_id": user_id, "items": items}
    try:
        result = await saga.execute(context)
    except Exception as e:
        print(f"Order processing failed: {e}")
        return None
    print(f"Order processed successfully: {result}")
    return result


# Run the demo only when this file is executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(process_order(1, [{"product_id": 1, "quantity": 2}]))
Event-Driven Architecture
Event Bus with Redis
import redis
import json
from typing import Callable, Dict
from dataclasses import dataclass, asdict
from datetime import datetime
import threading
@dataclass
class Event:
    """A message published on the event bus.

    ``timestamp`` defaults to the creation time as a timezone-aware UTC
    ISO-8601 string (e.g. ``2024-01-01T12:00:00.000000+00:00``).
    """

    type: str
    data: dict
    timestamp: str = None

    def __post_init__(self):
        if not self.timestamp:
            # datetime.utcnow() returns a naive value (and is deprecated since
            # Python 3.12); an aware UTC time parses unambiguously anywhere.
            from datetime import timezone
            self.timestamp = datetime.now(timezone.utc).isoformat()
class EventBus:
    """Publish/subscribe bus on Redis Pub/Sub; each event type is a channel.

    NOTE: Redis Pub/Sub is fire-and-forget (at-most-once delivery); an event
    published while no subscriber is connected is dropped.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
        self.handlers: Dict[str, list] = {}
        self._listener = None  # background thread created by start_listening()

    def publish(self, event: Event):
        """Publish event to all subscribers."""
        self.redis.publish(event.type, json.dumps(asdict(event)))

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe *handler* to *event_type*.

        Prefer subscribing before :meth:`start_listening`; afterwards the
        listener thread is reading from the same PubSub object.
        """
        if event_type not in self.handlers:
            self.handlers[event_type] = []
            self.pubsub.subscribe(event_type)
        self.handlers[event_type].append(handler)

    def start_listening(self):
        """Dispatch incoming events on a daemon thread and return that thread.

        Calling this again while the listener is alive returns the running
        thread instead of starting a second reader on the same connection.
        """
        if self._listener is not None and self._listener.is_alive():
            return self._listener

        def listen():
            for message in self.pubsub.listen():
                # Skip subscribe/unsubscribe confirmations; dispatch data only.
                if message['type'] == 'message':
                    self._dispatch(message)

        thread = threading.Thread(target=listen, daemon=True)
        thread.start()
        self._listener = thread
        return thread

    def _dispatch(self, message: dict):
        """Decode one Pub/Sub message and pass it to every matching handler.

        A malformed payload or a failing handler is reported and skipped, so
        neither can terminate the listener thread.
        """
        channel = message['channel']
        # Bytes by default; also accept str in case the client decodes responses.
        event_type = channel.decode() if isinstance(channel, bytes) else channel
        try:
            event_data = json.loads(message['data'])
        except ValueError as e:
            print(f"Invalid event payload on {event_type}: {e}")
            return
        for handler in self.handlers.get(event_type, []):
            try:
                # A fresh Event per handler keeps handlers isolated from each other.
                handler(Event(**event_data))
            except Exception as e:
                print(f"Handler error: {e}")
# --- Usage -----------------------------------------------------------------
event_bus = EventBus()


def on_order_created(event: Event):
    """React to a newly created order."""
    print(f"Order created: {event.data}")
    # A real handler would send a confirmation email, update inventory, etc.


def on_payment_processed(event: Event):
    """React to a processed payment."""
    print(f"Payment processed: {event.data}")


# Register handlers first, then start the background listener.
for event_type, handler in (
    ("order.created", on_order_created),
    ("payment.processed", on_payment_processed),
):
    event_bus.subscribe(event_type, handler)
event_bus.start_listening()

# Publish an event. NOTE: the listener runs on a daemon thread, so if the
# script exits right after this call the handler may never get to run.
event_bus.publish(
    Event(
        type="order.created",
        data={"order_id": 123, "user_id": 456, "total": 99.99},
    )
)
CQRS Pattern
Command Query Responsibility Segregation
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
import asyncio
# --- Commands: requests to change state ------------------------------------
@dataclass
class CreateOrderCommand:
    """Place a new order for a user."""

    user_id: int
    items: List[dict]


@dataclass
class UpdateOrderStatusCommand:
    """Move an existing order to a new status."""

    order_id: int
    status: str


# --- Queries: requests to read state ---------------------------------------
@dataclass
class GetOrderQuery:
    """Fetch a single order by id."""

    order_id: int


@dataclass
class GetUserOrdersQuery:
    """Fetch all orders belonging to a user."""

    user_id: int
# --- Command side ----------------------------------------------------------
class CommandHandler(ABC):
    """Interface for an object that executes one command type."""

    @abstractmethod
    async def handle(self, command) -> dict:
        """Apply *command* and return a result payload."""
class CreateOrderHandler(CommandHandler):
    """Handle CreateOrderCommand: write the order, then publish ``order.created``.

    NOTE(review): the database write and the event publish are two separate
    operations, not one transaction. If the process fails between them, the
    read side never learns about the order; a transactional outbox is the
    usual remedy.
    """

    def __init__(self, write_db, event_bus):
        self.write_db = write_db    # assumed: async store exposing create_order() -- confirm
        self.event_bus = event_bus  # assumed: object with a synchronous publish(event) -- confirm

    async def handle(self, command: CreateOrderCommand) -> dict:
        # Create order in write database
        order_id = await self.write_db.create_order(
            user_id=command.user_id,
            items=command.items,
        )
        # Publish event for read model update
        event = Event(
            type="order.created",
            data={"order_id": order_id, "user_id": command.user_id},
        )
        # publish() is a blocking (non-awaited) call; run it in the default
        # thread pool so the event loop is not stalled while it talks to the
        # broker.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.event_bus.publish, event)
        return {"order_id": order_id}
# --- Query side ------------------------------------------------------------
class QueryHandler(ABC):
    """Interface for an object that answers one query type."""

    @abstractmethod
    async def handle(self, query) -> dict:
        """Answer *query* from the read model."""
class GetOrderQueryHandler(QueryHandler):
    """Answer GetOrderQuery from the read-side store."""

    def __init__(self, read_db):
        # presumably an async read-model repository exposing get_order() -- confirm
        self.read_db = read_db

    async def handle(self, query: GetOrderQuery) -> Optional[dict]:
        order_id = query.order_id
        return await self.read_db.get_order(order_id)
# --- Dispatcher ------------------------------------------------------------
class CommandQueryDispatcher:
    """Route each command or query to the handler registered for its exact class."""

    def __init__(self):
        self.command_handlers = {}  # command class -> handler
        self.query_handlers = {}    # query class -> handler

    def register_command_handler(self, command_type, handler):
        """Use *handler* for commands of *command_type* (replaces any previous one)."""
        self.command_handlers[command_type] = handler

    def register_query_handler(self, query_type, handler):
        """Use *handler* for queries of *query_type* (replaces any previous one)."""
        self.query_handlers[query_type] = handler

    @staticmethod
    def _lookup(registry: dict, message):
        """Return the handler registered for ``type(message)`` or raise ValueError."""
        message_type = type(message)
        handler = registry.get(message_type)
        if not handler:
            raise ValueError(f"No handler for {message_type}")
        return handler

    async def dispatch_command(self, command) -> dict:
        handler = self._lookup(self.command_handlers, command)
        return await handler.handle(command)

    async def dispatch_query(self, query) -> dict:
        handler = self._lookup(self.query_handlers, query)
        return await handler.handle(query)
Health Checks
from fastapi import FastAPI
from enum import Enum
import httpx
import redis
from typing import Dict
class HealthStatus(str, Enum):
    """Overall health reported by the ``/health`` endpoint.

    The ``str`` mix-in makes each member compare equal to its string value.
    """

    HEALTHY = "healthy"      # every dependency check passed
    UNHEALTHY = "unhealthy"  # every dependency check failed
    DEGRADED = "degraded"    # some, but not all, checks passed


# Application exposing the health endpoints.
app = FastAPI()
async def check_database() -> bool:
    """Report whether the database is reachable.

    Placeholder: no connection is attempted yet, so this always returns True.
    TODO: replace with a cheap round-trip query and return False on error.
    """
    return True
async def check_redis(timeout: float = 2.0) -> bool:
    """Return True if Redis answers PING, False on any error.

    Connect and socket timeouts keep an unreachable Redis from hanging the
    health endpoint (redis-py's socket timeouts default to None). The client
    is closed after every probe rather than left for the garbage collector.

    NOTE(review): this redis-py client is synchronous, so the PING still
    blocks the event loop for up to *timeout* seconds.
    """
    client = None
    try:
        client = redis.Redis(socket_connect_timeout=timeout, socket_timeout=timeout)
        return bool(client.ping())
    except Exception:
        return False
    finally:
        if client is not None:
            try:
                client.close()
            except Exception:
                # Cleanup is best effort; it must not change the check result.
                pass
async def check_external_service(url: str) -> bool:
    """Return True only if a GET to *url* answers HTTP 200 within 5 seconds.

    Any error (connection failure, timeout, ...) counts as unhealthy.
    """
    try:
        async with httpx.AsyncClient() as client:
            reply = await client.get(url, timeout=5.0)
            healthy = reply.status_code == 200
    except Exception:
        return False
    return healthy
@app.get("/health")
async def health_check():
    """Aggregate dependency checks into one health report.

    Checks run concurrently. The JSON body is ``{"status", "checks"}``. The
    HTTP status is 200 when healthy or degraded and 503 when every check
    failed, because HTTP-based checkers (such as a Consul HTTP check, which
    treats any 2xx as passing) look only at the status code.
    """
    import asyncio

    from fastapi.responses import JSONResponse

    database_ok, redis_ok, external_ok = await asyncio.gather(
        check_database(),
        check_redis(),
        check_external_service("https://api.example.com/health"),
    )
    checks: Dict[str, bool] = {
        "database": database_ok,
        "redis": redis_ok,
        "external_api": external_ok,
    }
    if all(checks.values()):
        status = HealthStatus.HEALTHY
    elif any(checks.values()):
        status = HealthStatus.DEGRADED
    else:
        status = HealthStatus.UNHEALTHY
    if status is HealthStatus.UNHEALTHY:
        # JSONResponse bypasses FastAPI's encoder, so pass the plain value.
        return JSONResponse(
            status_code=503,
            content={"status": status.value, "checks": checks},
        )
    return {
        "status": status,
        "checks": checks
    }
@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe.

    Checks no dependencies: it answers whenever the process can still serve
    HTTP, so it only fails if the process itself cannot respond.
    """
    alive = {"status": "alive"}
    return alive
@app.get("/health/ready")
async def readiness():
    """Kubernetes readiness probe.

    Returns 200 when the database check passes and 503 otherwise. Kubernetes
    judges HTTP probes by status code alone (200-399 means success), so a 200
    carrying ``"not_ready"`` would never take the pod out of rotation.
    """
    from fastapi.responses import JSONResponse

    if await check_database():
        return {"status": "ready"}
    return JSONResponse(status_code=503, content={"status": "not_ready"})
Summary
| Pattern | Use Case |
|---|---|
| API Gateway | Single entry point, routing |
| Service Discovery | Dynamic service location |
| Circuit Breaker | Fault tolerance |
| Saga | Distributed transactions |
| Event-Driven | Async communication |
| CQRS | Separate read and write models |
Microservices patterns enable building resilient, scalable distributed systems.
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
Stop Wrestling YAML: How to Deploy 50 AI Models with Python Loops
Infrastructure as Code shouldn't be a copy-paste nightmare. Learn how to use Pulumi and Python to programmatically deploy scalable AI infrastructure without the YAML fatigue.
[DevOps] Docker Compose for Microservices: Complete Development Guide
Master Docker Compose for local microservices development. Learn multi-container orchestration, networking, volumes, and production-ready configurations.
[DevOps] Service Mesh with Istio: Complete Kubernetes Guide
Master Istio service mesh for Kubernetes. Learn traffic management, security, observability, and build resilient microservices architectures.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.