DevOps 9 min read

Microservices Design Patterns: Complete Architecture Guide

Master microservices architecture patterns. Learn service discovery, circuit breakers, saga pattern, API gateway, and build resilient distributed systems.

MR

Moshiour Rahman

Advertisement

What are Microservices?

Microservices architecture breaks applications into small, independent services that communicate over networks. Each service handles a specific business capability and can be developed, deployed, and scaled independently.

Monolith vs Microservices

| Monolith | Microservices |
|---|---|
| Single deployment | Independent deployments |
| Shared database | Database per service |
| Tight coupling | Loose coupling |
| Vertical scaling | Horizontal scaling |
| Single tech stack | Polyglot |

API Gateway Pattern

Implementation with FastAPI

from fastapi import FastAPI, HTTPException, Request
import httpx
from typing import Dict

# Gateway application; the proxy route is registered on this instance.
app = FastAPI()

# Static service registry: public service name -> internal base URL.
SERVICES: Dict[str, str] = dict(
    users="http://user-service:8001",
    orders="http://order-service:8002",
    products="http://product-service:8003",
)

class APIGateway:
    """Forward requests to the backend registered under a name in ``SERVICES``.

    A single ``httpx.AsyncClient`` (30 second timeout) is created per gateway
    instance and reused for every forwarded request.
    """

    # Headers that describe the *inbound* hop and must not be replayed
    # upstream: ``Host`` names the gateway rather than the backend,
    # ``Content-Length`` no longer matches once the body is re-encoded as
    # JSON, and the rest are hop-by-hop headers.
    _UNFORWARDABLE_HEADERS = frozenset({
        "host",
        "content-length",
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    })

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    @classmethod
    def _forwardable_headers(cls, headers):
        """Return a copy of *headers* without hop-specific entries.

        ``None`` is passed through unchanged so the HTTP client applies its
        own defaults. Header names are matched case-insensitively.
        """
        if headers is None:
            return None
        return {
            name: value
            for name, value in headers.items()
            if name.lower() not in cls._UNFORWARDABLE_HEADERS
        }

    async def forward_request(
        self,
        service: str,
        path: str,
        method: str,
        body: dict = None,
        headers: dict = None
    ):
        """Send one request to *service* and return its decoded JSON body.

        Args:
            service: Key into ``SERVICES``.
            path: Path (starting with ``/``) appended to the service base URL.
            method: HTTP method name.
            body: Optional JSON-serialisable payload.
            headers: Optional inbound headers; hop-specific ones are dropped.

        Returns:
            The decoded JSON response, or ``None`` when the upstream response
            has no body (for example ``204 No Content``).

        Raises:
            HTTPException: 404 for an unknown service, 503 when the request
                cannot be completed, 502 when the upstream body is not JSON.
        """
        if service not in SERVICES:
            raise HTTPException(404, f"Service {service} not found")

        url = f"{SERVICES[service]}{path}"

        try:
            response = await self.client.request(
                method=method,
                url=url,
                json=body,
                headers=self._forwardable_headers(headers)
            )
        except httpx.RequestError as e:
            raise HTTPException(503, f"Service unavailable: {str(e)}") from e

        # An empty body is valid (e.g. DELETE -> 204) but is not valid JSON.
        if not response.content:
            return None

        try:
            return response.json()
        except ValueError as e:
            raise HTTPException(
                502, f"Service {service} returned a non-JSON response"
            ) from e

gateway = APIGateway()

@app.api_route("/{service}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(service: str, path: str, request: Request):
    """Proxy the incoming request to the named backend service.

    The first path segment selects the service; the remainder, together with
    the original query string, is forwarded. A JSON body is forwarded for
    POST and PUT requests.

    Raises:
        HTTPException: 400 when a POST/PUT body is present but is not valid
            JSON; otherwise whatever ``gateway.forward_request`` raises.
    """
    body = None
    # Only decode a payload that is actually there: a POST/PUT without a body
    # is legal, and decoding an empty body would fail.
    if request.method in ("POST", "PUT") and await request.body():
        try:
            body = await request.json()
        except ValueError as exc:
            raise HTTPException(400, "Request body must be valid JSON") from exc

    # The ``path`` parameter excludes the query string, so re-attach it.
    target = f"/{path}"
    if request.url.query:
        target = f"{target}?{request.url.query}"

    return await gateway.forward_request(
        service=service,
        path=target,
        method=request.method,
        body=body,
        headers=dict(request.headers)
    )

Request Aggregation

from fastapi import FastAPI
import httpx
import asyncio

app = FastAPI()


async def _fetch_json(url: str):
    """GET *url* and return the decoded JSON body.

    Raises:
        httpx.HTTPStatusError: if the service answers with a 4xx/5xx status,
            so an error payload is never mistaken for real data.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()


async def fetch_user(user_id: int):
    """Return the user record for *user_id* from the user service."""
    return await _fetch_json(f"http://user-service:8001/users/{user_id}")


async def fetch_orders(user_id: int):
    """Return the orders of *user_id* from the order service."""
    return await _fetch_json(f"http://order-service:8002/users/{user_id}/orders")


async def fetch_recommendations(user_id: int):
    """Return recommendations for *user_id* from the recommendation service."""
    return await _fetch_json(f"http://recommendation-service:8004/users/{user_id}")

@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
    """Build the dashboard for one user from three backend calls.

    The user, order and recommendation lookups run concurrently; the response
    carries the user plus the first 5 orders and first 10 recommendations.
    """
    lookups = (
        fetch_user(user_id),
        fetch_orders(user_id),
        fetch_recommendations(user_id),
    )
    profile, order_history, suggestions = await asyncio.gather(*lookups)

    dashboard = {"user": profile}
    dashboard["recent_orders"] = order_history[:5]
    dashboard["recommendations"] = suggestions[:10]
    return dashboard

Service Discovery

Consul Integration

import consul
import random
from typing import Optional

class ServiceDiscovery:
    """Register services with Consul and look up their healthy instances."""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul = consul.Consul(host=consul_host, port=consul_port)

    def register_service(
        self,
        name: str,
        service_id: str,
        address: str,
        port: int,
        tags: Optional[list] = None
    ):
        """Register service with Consul.

        An HTTP health check against ``http://{address}:{port}/health`` is
        attached, polled every 10 seconds with a 5 second timeout.
        """
        self.consul.agent.service.register(
            name=name,
            service_id=service_id,
            address=address,
            port=port,
            tags=tags or [],
            check=consul.Check.http(
                f"http://{address}:{port}/health",
                interval="10s",
                timeout="5s"
            )
        )

    def deregister_service(self, service_id: str):
        """Deregister service from Consul."""
        self.consul.agent.service.deregister(service_id)

    def _healthy_entries(self, name: str) -> list:
        """Return the health entries of *name* whose checks are passing."""
        _, entries = self.consul.health.service(name, passing=True)
        return entries or []

    @staticmethod
    def _address_of(entry: dict) -> str:
        """Return the address an instance is reachable at.

        A service registered without an explicit address has an empty
        ``Service.Address``; in that case the address of the node hosting it
        is used instead.
        """
        return entry["Service"]["Address"] or entry.get("Node", {}).get("Address", "")

    def get_service(self, name: str) -> Optional[dict]:
        """Get healthy service instance (load balanced).

        Returns:
            ``{"address": ..., "port": ...}`` for a randomly chosen healthy
            instance, or ``None`` when no instance is healthy.
        """
        entries = self._healthy_entries(name)
        if not entries:
            return None

        # Random load balancing
        chosen = random.choice(entries)
        return {
            "address": self._address_of(chosen),
            "port": chosen["Service"]["Port"]
        }

    def get_all_services(self, name: str) -> list:
        """Get all healthy instances of a service as id/address/port dicts."""
        return [
            {
                "id": entry["Service"]["ID"],
                "address": self._address_of(entry),
                "port": entry["Service"]["Port"]
            }
            for entry in self._healthy_entries(name)
        ]

# Usage: one discovery client talking to the local Consul agent.
discovery = ServiceDiscovery()

# Register on startup (name, service id, address, port, tags).
discovery.register_service(
    "user-service",
    "user-service-1",
    "192.168.1.10",
    8001,
    ["api", "users"],
)

# Discover service: `url` is only set when a healthy instance was found.
service = discovery.get_service("order-service")
if service:
    url = f"http://{service['address']}:{service['port']}/orders"

Circuit Breaker Pattern

Implementation

import time
from enum import Enum
from typing import Callable, Any
from functools import wraps
import threading

class CircuitState(Enum):
    """States of a circuit breaker."""
    CLOSED = "closed"        # calls pass through, failures are counted
    OPEN = "open"            # calls are rejected until the timeout elapses
    HALF_OPEN = "half_open"  # timeout elapsed, calls are let through on trial


class CircuitBreakerOpen(Exception):
    """Raised instead of calling the wrapped function while the circuit is open."""


class CircuitBreaker:
    """Decorator that stops calling a function after repeated failures.

    Args:
        failure_threshold: Consecutive failures after which the circuit opens.
        recovery_timeout: Seconds to stay open before a call is let through
            again.
        expected_exception: Exception type (or tuple of types) counted as a
            failure. Other exceptions propagate without being counted.

    State changes are guarded by a lock; the wrapped function itself runs
    outside the lock.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        expected_exception: type = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        # Reading of time.monotonic() at the most recent failure, or None.
        self.last_failure_time = None
        self.lock = threading.Lock()

    def __call__(self, func: Callable) -> Callable:
        """Wrap *func* so that every call goes through the breaker."""
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            return self._execute(func, *args, **kwargs)
        return wrapper

    def _execute(self, func: Callable, *args, **kwargs) -> Any:
        """Call *func* unless the circuit is open, and record the outcome.

        Raises:
            CircuitBreakerOpen: if the circuit is open and the recovery
                timeout has not elapsed yet.
        """
        with self.lock:
            if self.state == CircuitState.OPEN:
                if not self._should_attempt_reset():
                    raise CircuitBreakerOpen("Circuit breaker is open")
                self.state = CircuitState.HALF_OPEN

        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_attempt_reset(self) -> bool:
        """Return True once ``recovery_timeout`` has passed since the last failure."""
        if self.last_failure_time is None:
            return False
        # Monotonic clock: elapsed time is unaffected by wall-clock changes.
        return time.monotonic() - self.last_failure_time >= self.recovery_timeout

    def _on_success(self):
        """Reset the failure count and close the circuit."""
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Count one failure and open the circuit at the threshold."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

# Usage
import httpx

circuit_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=30,
    # httpx.HTTPError is the common base of RequestError (transport failures)
    # and HTTPStatusError (raised by raise_for_status()), so both kinds of
    # failure count towards opening the circuit.
    expected_exception=httpx.HTTPError
)

@circuit_breaker
def call_external_service(url: str) -> dict:
    """GET *url* through the circuit breaker and return the JSON body.

    Raises:
        CircuitBreakerOpen: while the circuit is open.
        httpx.HTTPError: on a transport failure or a 4xx/5xx response.
    """
    response = httpx.get(url, timeout=5.0)
    response.raise_for_status()
    return response.json()

# With retry logic
from tenacity import (
    retry,
    retry_if_not_exception_type,
    stop_after_attempt,
    wait_exponential,
)

@retry(
    # An open circuit means "fail fast": retrying would only sleep and then
    # be rejected again, so CircuitBreakerOpen is raised to the caller at once.
    retry=retry_if_not_exception_type(CircuitBreakerOpen),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
@circuit_breaker
def resilient_call(url: str) -> dict:
    """GET *url* with up to 3 attempts and exponential back-off between them.

    Every attempt goes through ``circuit_breaker``.
    """
    return httpx.get(url).json()

Saga Pattern

Orchestration-Based Saga

import asyncio
from enum import Enum
from typing import Callable, List, Optional
from dataclasses import dataclass

class SagaStepStatus(Enum):
    """Progress of a single saga step."""
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"

@dataclass
class SagaStep:
    """One unit of work together with the action that undoes it."""
    name: str
    execute: Callable      # awaited with the saga context
    compensate: Callable   # awaited with the saga context to undo `execute`
    status: SagaStepStatus = SagaStepStatus.PENDING

class Saga:
    """Run steps in order and undo the completed ones if a later step fails."""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.completed_steps: List[SagaStep] = []

    def add_step(
        self,
        name: str,
        execute: Callable,
        compensate: Callable
    ):
        """Append a step; steps run in the order they were added."""
        self.steps.append(SagaStep(
            name=name,
            execute=execute,
            compensate=compensate
        ))

    async def execute(self, context: dict) -> dict:
        """Run every step, merging each step's returned dict into *context*.

        Returns:
            The (mutated) context once all steps have completed.

        Raises:
            Exception: whatever the failing step raised, re-raised after the
                steps completed in this run have been compensated.
        """
        # Start every run from a clean slate so that running the same saga
        # again never compensates steps left over from an earlier run.
        self.completed_steps = []
        for step in self.steps:
            step.status = SagaStepStatus.PENDING

        for step in self.steps:
            print(f"Executing step: {step.name}")
            try:
                result = await step.execute(context)
                context.update(result or {})
            except Exception as e:
                step.status = SagaStepStatus.FAILED
                print(f"Saga failed at step {step.name}: {e}")
                await self._compensate(context)
                raise
            step.status = SagaStepStatus.COMPLETED
            self.completed_steps.append(step)

        return context

    async def _compensate(self, context: dict):
        """Undo completed steps in reverse order, on a best-effort basis.

        A failing compensation is reported and does not stop the remaining
        compensations from running.
        """
        for step in reversed(self.completed_steps):
            try:
                print(f"Compensating step: {step.name}")
                await step.compensate(context)
                step.status = SagaStepStatus.COMPENSATED
            except Exception as e:
                print(f"Compensation failed for {step.name}: {e}")

# Order Processing Saga Example
# Each step receives the shared saga context dict; the compensating function
# follows the step it undoes.

async def create_order(ctx):
    """Record a new order id in the context and return it as an update."""
    print(f"Creating order for user {ctx['user_id']}")
    new_order_id = 12345
    ctx["order_id"] = new_order_id
    return {"order_id": new_order_id}

async def cancel_order(ctx):
    """Compensation for ``create_order``."""
    print(f"Cancelling order {ctx['order_id']}")

async def reserve_inventory(ctx):
    """Flag the inventory as reserved in the context."""
    print(f"Reserving inventory for order {ctx['order_id']}")
    ctx["inventory_reserved"] = True

async def release_inventory(ctx):
    """Compensation for ``reserve_inventory``."""
    print(f"Releasing inventory for order {ctx['order_id']}")

async def process_payment(ctx):
    """Store a payment id in the context."""
    print(f"Processing payment for order {ctx['order_id']}")
    # To simulate a failure, raise here instead:
    # raise Exception("Payment failed")
    ctx["payment_id"] = "pay_123"

async def refund_payment(ctx):
    """Compensation for ``process_payment``; tolerates a missing payment id."""
    print(f"Refunding payment {ctx.get('payment_id')}")

async def ship_order(ctx):
    """Ship the order."""
    print(f"Shipping order {ctx['order_id']}")

async def cancel_shipment(ctx):
    """Compensation for ``ship_order``."""
    print(f"Cancelling shipment for order {ctx['order_id']}")

# Execute saga
async def process_order(user_id: int, items: list):
    """Run the order saga for one user and report the outcome on stdout.

    A failure is printed rather than raised to the caller.
    """
    saga = Saga()

    # (step name, action, compensation), in execution order.
    pipeline = (
        ("create_order", create_order, cancel_order),
        ("reserve_inventory", reserve_inventory, release_inventory),
        ("process_payment", process_payment, refund_payment),
        ("ship_order", ship_order, cancel_shipment),
    )
    for step_name, action, undo in pipeline:
        saga.add_step(step_name, action, undo)

    context = {"user_id": user_id, "items": items}

    try:
        result = await saga.execute(context)
        print(f"Order processed successfully: {result}")
    except Exception as e:
        print(f"Order processing failed: {e}")

# Run
asyncio.run(process_order(user_id=1, items=[{"product_id": 1, "quantity": 2}]))

Event-Driven Architecture

Event Bus with Redis

import redis
import json
from typing import Callable, Dict
from dataclasses import dataclass, asdict
from datetime import datetime
import threading

@dataclass
class Event:
    """A typed message carried by the event bus.

    ``timestamp`` defaults to the current UTC time in ISO-8601 form when it
    is not supplied.
    """
    type: str
    data: dict
    timestamp: str = None

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow().isoformat()

class EventBus:
    """Publish/subscribe event bus on top of Redis pub/sub.

    The event type doubles as the Redis channel name.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()
        self.handlers: Dict[str, list] = {}

    def publish(self, event: Event):
        """Publish event to all subscribers."""
        self.redis.publish(event.type, json.dumps(asdict(event)))

    def subscribe(self, event_type: str, handler: Callable):
        """Subscribe to event type.

        The Redis channel is subscribed to once, when the first handler for
        *event_type* is added.
        """
        if event_type not in self.handlers:
            self.handlers[event_type] = []
            self.pubsub.subscribe(event_type)

        self.handlers[event_type].append(handler)

    def _dispatch(self, message: dict) -> None:
        """Deliver one pub/sub message to the handlers of its channel.

        Messages of any type other than ``'message'`` are ignored. A payload
        that cannot be decoded is reported and dropped so that it cannot
        terminate the listener.
        """
        if message['type'] != 'message':
            return

        channel = message['channel']
        # Channel names are bytes unless the client decodes responses.
        event_type = channel.decode() if isinstance(channel, bytes) else channel

        try:
            event_data = json.loads(message['data'])
        except (ValueError, TypeError) as e:
            print(f"Discarding malformed event on {event_type}: {e}")
            return

        for handler in self.handlers.get(event_type, []):
            try:
                handler(Event(**event_data))
            except Exception as e:
                print(f"Handler error: {e}")

    def start_listening(self):
        """Start listening for events in background.

        Returns:
            The started daemon ``threading.Thread``.
        """
        def listen():
            for message in self.pubsub.listen():
                self._dispatch(message)

        thread = threading.Thread(target=listen, daemon=True)
        thread.start()
        return thread

# Usage
event_bus = EventBus()

# Define handlers
def on_order_created(event: Event):
    """React to an ``order.created`` event."""
    # Send confirmation email, update inventory, etc.
    print(f"Order created: {event.data}")

def on_payment_processed(event: Event):
    """React to a ``payment.processed`` event."""
    print(f"Payment processed: {event.data}")

# Subscribe both handlers, then start the background listener.
event_bus.subscribe(event_type="order.created", handler=on_order_created)
event_bus.subscribe(event_type="payment.processed", handler=on_payment_processed)
event_bus.start_listening()

# Publish events
event_bus.publish(
    Event("order.created", {"order_id": 123, "user_id": 456, "total": 99.99})
)

CQRS Pattern

Command Query Responsibility Segregation

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional
import asyncio

# Commands: requests that change state (write side).
@dataclass
class CreateOrderCommand:
    """Request to create an order for a user."""
    user_id: int
    items: List[dict]  # presumably one dict per ordered item — TODO confirm schema

@dataclass
class UpdateOrderStatusCommand:
    """Request to set the status of an existing order."""
    order_id: int
    status: str

# Queries: requests that only read state (read side).
@dataclass
class GetOrderQuery:
    """Look up a single order by its id."""
    order_id: int

@dataclass
class GetUserOrdersQuery:
    """Look up orders by user id."""
    user_id: int

# Command Handler
class CommandHandler(ABC):
    """Interface every command handler implements."""

    @abstractmethod
    async def handle(self, command) -> dict:
        """Execute *command* and return a result dict."""

class CreateOrderHandler(CommandHandler):
    """Create an order in the write store and announce it on the event bus."""

    def __init__(self, write_db, event_bus):
        self.write_db, self.event_bus = write_db, event_bus

    async def handle(self, command: CreateOrderCommand) -> dict:
        """Persist the order, publish ``order.created`` and return its id."""
        # Write side: the order is created in the write database first.
        new_order_id = await self.write_db.create_order(
            user_id=command.user_id,
            items=command.items,
        )

        # The read model is updated from this event.
        payload = {"order_id": new_order_id, "user_id": command.user_id}
        self.event_bus.publish(Event(type="order.created", data=payload))

        return {"order_id": new_order_id}

# Query Handler
class QueryHandler(ABC):
    """Interface every query handler implements."""

    @abstractmethod
    async def handle(self, query) -> dict:
        """Answer *query* and return the result."""

class GetOrderQueryHandler(QueryHandler):
    """Answer ``GetOrderQuery`` from the read database."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetOrderQuery) -> Optional[dict]:
        """Return whatever the read database holds for ``query.order_id``."""
        # Reads go to the read-optimized store only.
        order = await self.read_db.get_order(query.order_id)
        return order

# Dispatcher
class CommandQueryDispatcher:
    """Route commands and queries to the handler registered for their type."""

    def __init__(self):
        self.command_handlers = {}
        self.query_handlers = {}

    def register_command_handler(self, command_type, handler):
        """Use *handler* for commands whose exact type is *command_type*."""
        self.command_handlers[command_type] = handler

    def register_query_handler(self, query_type, handler):
        """Use *handler* for queries whose exact type is *query_type*."""
        self.query_handlers[query_type] = handler

    @staticmethod
    def _handler_for(registry, message):
        """Return the handler registered for the type of *message*.

        Raises:
            ValueError: if the registry has no (truthy) handler for that type.
        """
        found = registry.get(type(message))
        if found:
            return found
        raise ValueError(f"No handler for {type(message)}")

    async def dispatch_command(self, command) -> dict:
        """Run *command* through its handler and return the handler's result."""
        target = self._handler_for(self.command_handlers, command)
        return await target.handle(command)

    async def dispatch_query(self, query) -> dict:
        """Run *query* through its handler and return the handler's result."""
        target = self._handler_for(self.query_handlers, query)
        return await target.handle(query)

Health Checks

from fastapi import FastAPI
from enum import Enum
import httpx
import redis
from typing import Dict

class HealthStatus(str, Enum):
    """Overall status reported by the ``/health`` endpoint."""
    HEALTHY = "healthy"      # every check passed
    UNHEALTHY = "unhealthy"  # no check passed
    DEGRADED = "degraded"    # some, but not all, checks passed

# Application the health endpoints below are registered on.
app = FastAPI()

async def check_database() -> bool:
    """Report whether the database is reachable.

    Placeholder: no real check is performed yet, so this returns ``True``.
    """
    try:
        # Check database connection
        return True
    except Exception:
        return False

async def check_redis() -> bool:
    """Report whether Redis answers a PING.

    Any failure (connection refused, timeout, ...) yields ``False``.
    """
    try:
        # Bounded timeouts keep a hung Redis from stalling the health
        # endpoint; the per-probe client is closed so connections do not
        # accumulate across probes.
        r = redis.Redis(socket_connect_timeout=2.0, socket_timeout=2.0)
        try:
            return bool(r.ping())
        finally:
            r.close()
    except Exception:
        return False

async def check_external_service(url: str) -> bool:
    """Report whether a GET on *url* returns HTTP 200 within 5 seconds."""
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=5.0)
            return response.status_code == 200
    except Exception:
        return False

@app.get("/health")
async def health_check():
    """Run every dependency check and summarise the result.

    The status is HEALTHY when all checks pass, DEGRADED when only some pass
    and UNHEALTHY when none pass.
    """
    checks: Dict[str, bool] = {}
    checks["database"] = await check_database()
    checks["redis"] = await check_redis()
    checks["external_api"] = await check_external_service(
        "https://api.example.com/health"
    )

    outcomes = list(checks.values())
    if all(outcomes):
        status = HealthStatus.HEALTHY
    else:
        status = HealthStatus.DEGRADED if any(outcomes) else HealthStatus.UNHEALTHY

    return {"status": status, "checks": checks}

@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe: answers as long as the process is serving."""
    return {"status": "alive"}

@app.get("/health/ready")
async def readiness():
    """Kubernetes readiness probe.

    Responds 200 ``{"status": "ready"}`` when the database check passes and
    503 ``{"status": "not_ready"}`` otherwise. An HTTP probe is judged by the
    status code alone, so "not ready" has to be a non-2xx/3xx response.
    """
    # Local import: the file's import block only pulls in FastAPI itself.
    from fastapi.responses import JSONResponse

    if await check_database():
        return {"status": "ready"}
    return JSONResponse(status_code=503, content={"status": "not_ready"})

Summary

| Pattern | Use Case |
|---|---|
| API Gateway | Single entry point, routing |
| Service Discovery | Dynamic service location |
| Circuit Breaker | Fault tolerance |
| Saga | Distributed transactions |
| Event-Driven | Async communication |
| CQRS | Separate read/write |

Microservices patterns enable building resilient, scalable distributed systems.

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.