Python 4 min read

FastAPI Tutorial Part 12: WebSockets - Real-Time Communication

Build real-time features with FastAPI WebSockets. Learn bidirectional communication, chat systems, live updates, and broadcasting patterns.

MR

Moshiour Rahman

Advertisement

Understanding WebSockets

WebSockets enable persistent, bidirectional communication between client and server, unlike HTTP’s request-response model.

HTTP:     Client --request--> Server --response--> Client (connection closes)
WebSocket: Client <========== Full Duplex ===========> Server (stays open)

Basic WebSocket

from fastapi import FastAPI, WebSocket

# FastAPI application instance; the @app.websocket route below registers on it.
app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Echo every received text frame back to the client with a fixed prefix.

    The handshake is accepted first. The loop has no exit condition of its
    own, so it only ends when one of the awaited socket calls raises.
    """
    await websocket.accept()
    while True:
        incoming = await websocket.receive_text()
        reply = f"Message received: {incoming}"
        await websocket.send_text(reply)

Client (JavaScript)

// Open a WebSocket connection to the /ws endpoint on localhost:8000.
const ws = new WebSocket("ws://localhost:8000/ws");

// Greet the server as soon as the connection is established.
ws.addEventListener("open", () => {
    console.log("Connected");
    ws.send("Hello Server!");
});

// Log every message the server sends.
ws.addEventListener("message", (event) => {
    console.log("Received:", event.data);
});

// Report when the connection has been closed.
ws.addEventListener("close", () => {
    console.log("Disconnected");
});

Connection Management

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

# FastAPI application instance; the @app.websocket route below registers on it.
app = FastAPI()

class ConnectionManager:
    """Track accepted WebSocket connections and send text messages to them.

    Connections are kept in a plain list, in the order they were accepted.
    """

    def __init__(self):
        # Every socket that has been accepted and not yet disconnected.
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking ``websocket``."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``.

        Calling this for a socket that is not (or is no longer) tracked is a
        no-op, so cleanup paths may safely call it more than once.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def send_personal(self, message: str, websocket: WebSocket):
        """Send ``message`` to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every connection that is still tracked.

        Each ``await`` gives other handlers a chance to connect or disconnect,
        which mutates ``active_connections``. Iterating the live list would
        then skip clients, so the loop walks a snapshot and re-checks
        membership right before each send.
        """
        for connection in list(self.active_connections):
            # Skip sockets that were disconnected while earlier sends were
            # in flight.
            if connection in self.active_connections:
                await connection.send_text(message)

# Single shared manager instance used by the WebSocket endpoint below.
manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Relay each text frame from this client to all connected clients.

    Messages are prefixed with ``client_id``. When the client disconnects,
    it is removed from the manager and the remaining clients are told that
    it left.
    """
    await manager.connect(websocket)
    try:
        while True:
            incoming = await websocket.receive_text()
            outgoing = f"{client_id}: {incoming}"
            await manager.broadcast(outgoing)
    except WebSocketDisconnect:
        # Unregister first so the farewell is not sent to the closed socket.
        manager.disconnect(websocket)
        farewell = f"{client_id} left the chat"
        await manager.broadcast(farewell)

Chat Room Example

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
from pydantic import BaseModel
import json

# FastAPI application instance; the @app.websocket route below registers on it.
app = FastAPI()

class Room:
    """A chat room: a mapping of user ids to their WebSocket connections."""

    def __init__(self):
        # user_id -> the socket currently registered for that user.
        self.connections: Dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        """Accept ``websocket``, register it under ``user_id``, announce the join.

        The join announcement goes to everyone in the room, including the
        user who just joined.
        """
        await websocket.accept()
        self.connections[user_id] = websocket
        await self.broadcast_system(f"{user_id} joined")

    def disconnect(self, user_id: str):
        """Forget the connection registered under ``user_id``, if there is one."""
        self.connections.pop(user_id, None)

    async def broadcast(self, message: dict):
        """Send ``message`` as JSON to every user still in the room.

        Each ``await`` lets other handlers join or leave, which mutates
        ``connections``. Iterating the live dict would then raise
        ``RuntimeError``, so the loop walks a snapshot and skips entries
        that were removed or replaced in the meantime.
        """
        for user_id, websocket in list(self.connections.items()):
            if self.connections.get(user_id) is websocket:
                await websocket.send_json(message)

    async def broadcast_system(self, text: str):
        """Broadcast ``text`` wrapped in a ``"system"`` message envelope."""
        await self.broadcast({"type": "system", "message": text})

    async def send_to_user(self, user_id: str, message: dict):
        """Send ``message`` as JSON to ``user_id``; do nothing if they are absent."""
        websocket = self.connections.get(user_id)
        if websocket is not None:
            await websocket.send_json(message)

# Registry of rooms keyed by room id; get_room() below adds entries on demand.
rooms: Dict[str, Room] = {}

def get_room(room_id: str) -> Room:
    """Return the room registered under ``room_id``, creating it on first use."""
    try:
        return rooms[room_id]
    except KeyError:
        # First request for this id: create the room and remember it.
        room = rooms[room_id] = Room()
        return room

@app.websocket("/chat/{room_id}/{user_id}")
async def chat_endpoint(
    websocket: WebSocket,
    room_id: str,
    user_id: str
):
    """Serve one user's connection to a chat room.

    The client sends JSON objects. ``{"type": "message", "content": ...}``
    is broadcast to the whole room; ``{"type": "private", "to": ...,
    "content": ...}`` is delivered only to the named user. A missing
    ``content`` is treated as an empty string. Frames that are not valid
    JSON, are not JSON objects, or carry an unknown ``type`` are ignored
    rather than allowed to crash the handler, because a crash would leave
    the user registered in the room. On disconnect the user is removed and
    the room is notified.
    """
    room = get_room(room_id)
    await room.connect(user_id, websocket)

    try:
        while True:
            try:
                data = await websocket.receive_json()
            except json.JSONDecodeError:
                # The frame was not valid JSON; drop it, keep the connection.
                continue

            # Client input is untrusted: only JSON objects are meaningful.
            if not isinstance(data, dict):
                continue

            kind = data.get("type")
            content = data.get("content", "")

            if kind == "message":
                await room.broadcast({
                    "type": "message",
                    "user": user_id,
                    "content": content
                })

            elif kind == "private":
                recipient = data.get("to")
                # Only a string can be a user id; anything else is ignored.
                if isinstance(recipient, str):
                    await room.send_to_user(recipient, {
                        "type": "private",
                        "from": user_id,
                        "content": content
                    })

    except WebSocketDisconnect:
        room.disconnect(user_id)
        await room.broadcast_system(f"{user_id} left")

Authentication

from fastapi import Depends, Query, WebSocket, status

from .auth.jwt import decode_token

async def get_ws_user(
    websocket: WebSocket,
    token: str = Query(...)
) -> User:
    """Resolve the user for a WebSocket handshake from its ``token`` query parameter.

    Returns the user on success. When the token does not decode, or no user
    is found for the ``"sub"`` entry of the decoded payload, the socket is
    closed with the policy-violation code and ``None`` is returned. Callers
    must therefore check the result before accepting the connection.
    """
    payload = decode_token(token)
    user = get_user_by_id(payload["sub"]) if payload else None

    if not user:
        # Reject both failure modes the same way, so the handshake is never
        # left neither accepted nor closed.
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return None

    return user

@app.websocket("/ws")
async def authenticated_ws(
    websocket: WebSocket,
    user: User = Depends(get_ws_user)
):
    """Accept the WebSocket only when the ``get_ws_user`` dependency yields a user."""
    # A falsy user means authentication did not succeed, so the connection
    # is never accepted.
    if not user:
        return

    await websocket.accept()
    # Handle authenticated connection

Live Data Streaming

import asyncio
from datetime import datetime

@app.websocket("/ws/prices")
async def price_stream(websocket: WebSocket):
    """Push the current prices to the client once per second.

    Each JSON frame carries an ISO-formatted timestamp and whatever
    ``get_current_prices()`` returned. The stream ends quietly when the
    client disconnects.
    """
    await websocket.accept()

    try:
        while True:
            latest = get_current_prices()
            stamp = datetime.now().isoformat()
            await websocket.send_json({"timestamp": stamp, "prices": latest})
            # Pause one second between updates.
            await asyncio.sleep(1)
    except WebSocketDisconnect:
        # The client went away; nothing to clean up.
        return

@app.websocket("/ws/notifications/{user_id}")
async def notifications_stream(websocket: WebSocket, user_id: str):
    """Push a user's notifications over the socket while answering pings.

    A background task polls ``get_user_notifications(user_id)`` every half
    second and forwards any truthy result as JSON. The main loop replies
    ``"pong"`` to ``"ping"`` text frames. The background task is cancelled
    whenever the handler exits, so it cannot outlive the connection and keep
    sending to a closed socket.
    """
    await websocket.accept()

    async def notification_listener():
        # Poll for notifications and forward them until cancelled.
        while True:
            notification = await get_user_notifications(user_id)
            if notification:
                await websocket.send_json(notification)
            await asyncio.sleep(0.5)

    # Keep a reference to the task: the event loop only holds tasks weakly,
    # and the handle is needed to cancel it on the way out.
    listener = asyncio.create_task(notification_listener())

    try:
        while True:
            # Keep connection alive, handle client messages
            data = await websocket.receive_text()
            if data == "ping":
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        pass
    finally:
        listener.cancel()
        # Wait for the task to finish unwinding. return_exceptions=True keeps
        # its CancelledError (or a send failure on the already-closed socket)
        # from propagating out of the cleanup path.
        await asyncio.gather(listener, return_exceptions=True)

Complete Example

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from typing import Dict, List
import json

# FastAPI application instance; the @app.websocket route below registers on it.
app = FastAPI()

class ChatManager:
    """Track chat rooms and the WebSocket of each user inside them."""

    def __init__(self):
        # room_id -> {user_id -> socket}. Rooms are created on first join and
        # removed again when their last user leaves.
        self.rooms: Dict[str, Dict[str, WebSocket]] = {}

    async def join_room(self, room_id: str, user_id: str, ws: WebSocket):
        """Accept ``ws``, add the user to the room, and announce the join.

        The announcement is sent to everyone in the room, including the new
        user, and lists the ids of all users now present.
        """
        await ws.accept()

        members = self.rooms.setdefault(room_id, {})
        members[user_id] = ws

        await self.broadcast_to_room(room_id, {
            "type": "join",
            "user": user_id,
            "users": list(members)
        })

    def leave_room(self, room_id: str, user_id: str):
        """Remove the user from the room; unknown rooms or users are ignored.

        A room that becomes empty is dropped so ``rooms`` does not grow
        without bound.
        """
        members = self.rooms.get(room_id)
        if members is None:
            return

        members.pop(user_id, None)
        if not members:
            del self.rooms[room_id]

    async def broadcast_to_room(self, room_id: str, message: dict):
        """Send ``message`` as JSON to every user still in the room.

        Each ``await`` lets other handlers join or leave, which mutates the
        room's dict. Iterating it live would then raise ``RuntimeError``, so
        the loop walks a snapshot and skips entries that were removed or
        replaced in the meantime. Unknown rooms are ignored.
        """
        members = self.rooms.get(room_id)
        if not members:
            return

        for user_id, ws in list(members.items()):
            if members.get(user_id) is ws:
                await ws.send_json(message)

# Single shared chat manager used by the WebSocket endpoint below.
manager = ChatManager()

@app.websocket("/ws/chat/{room_id}/{user_id}")
async def chat(websocket: WebSocket, room_id: str, user_id: str):
    """Join the user to a room and relay their messages to everyone in it.

    Each JSON frame's ``content`` (an empty string when absent) is broadcast
    as a ``"message"`` event. When the client disconnects, the user is
    removed and a ``"leave"`` event is broadcast to the room.
    """
    await manager.join_room(room_id, user_id, websocket)

    try:
        while True:
            payload = await websocket.receive_json()
            outgoing = {
                "type": "message",
                "user": user_id,
                "content": payload.get("content", ""),
            }
            await manager.broadcast_to_room(room_id, outgoing)
    except WebSocketDisconnect:
        # Unregister first so the leave event is not sent to the closed socket.
        manager.leave_room(room_id, user_id)
        farewell = {"type": "leave", "user": user_id}
        await manager.broadcast_to_room(room_id, farewell)

Summary

| Feature | Description |
| --- | --- |
| `websocket.accept()` | Accept connection |
| `websocket.receive_*()` | Receive data |
| `websocket.send_*()` | Send data |
| `WebSocketDisconnect` | Handle disconnection |

| Pattern | Use Case |
| --- | --- |
| Connection Manager | Multi-client broadcast |
| Rooms | Group communication |
| Streaming | Live data updates |

Next Steps

In Part 13, we’ll explore Middleware and Events - intercepting requests and handling application lifecycle.

Series Navigation:

  • Part 1-11: Foundations & Tasks
  • Part 12: WebSockets (You are here)
  • Part 13: Middleware & Events

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.