FastAPI Tutorial Part 12: WebSockets - Real-Time Communication
Build real-time features with FastAPI WebSockets. Learn bidirectional communication, chat systems, live updates, and broadcasting patterns.
Moshiour Rahman
Advertisement
Understanding WebSockets
WebSockets enable persistent, bidirectional communication between client and server, unlike HTTP’s request-response model.
HTTP: Client --request--> Server --response--> Client (connection closes)
WebSocket: Client <========== Full Duplex ===========> Server (stays open)
Basic WebSocket
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message received: {data}")
Client (JavaScript)
const ws = new WebSocket("ws://localhost:8000/ws");
ws.onopen = () => {
console.log("Connected");
ws.send("Hello Server!");
};
ws.onmessage = (event) => {
console.log("Received:", event.data);
};
ws.onclose = () => {
console.log("Disconnected");
};
Connection Management
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f"{client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"{client_id} left the chat")
Chat Room Example
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Set
from pydantic import BaseModel
import json
app = FastAPI()
class Room:
def __init__(self):
self.connections: Dict[str, WebSocket] = {}
async def connect(self, user_id: str, websocket: WebSocket):
await websocket.accept()
self.connections[user_id] = websocket
await self.broadcast_system(f"{user_id} joined")
def disconnect(self, user_id: str):
if user_id in self.connections:
del self.connections[user_id]
async def broadcast(self, message: dict):
for websocket in self.connections.values():
await websocket.send_json(message)
async def broadcast_system(self, text: str):
await self.broadcast({"type": "system", "message": text})
async def send_to_user(self, user_id: str, message: dict):
if user_id in self.connections:
await self.connections[user_id].send_json(message)
rooms: Dict[str, Room] = {}
def get_room(room_id: str) -> Room:
if room_id not in rooms:
rooms[room_id] = Room()
return rooms[room_id]
@app.websocket("/chat/{room_id}/{user_id}")
async def chat_endpoint(
websocket: WebSocket,
room_id: str,
user_id: str
):
room = get_room(room_id)
await room.connect(user_id, websocket)
try:
while True:
data = await websocket.receive_json()
if data["type"] == "message":
await room.broadcast({
"type": "message",
"user": user_id,
"content": data["content"]
})
elif data["type"] == "private":
await room.send_to_user(data["to"], {
"type": "private",
"from": user_id,
"content": data["content"]
})
except WebSocketDisconnect:
room.disconnect(user_id)
await room.broadcast_system(f"{user_id} left")
Authentication
from fastapi import WebSocket, status, Query
from .auth.jwt import decode_token
async def get_ws_user(
websocket: WebSocket,
token: str = Query(...)
) -> User:
payload = decode_token(token)
if not payload:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return None
user = get_user_by_id(payload["sub"])
return user
@app.websocket("/ws")
async def authenticated_ws(
websocket: WebSocket,
user: User = Depends(get_ws_user)
):
if not user:
return
await websocket.accept()
# Handle authenticated connection
Live Data Streaming
import asyncio
from datetime import datetime
@app.websocket("/ws/prices")
async def price_stream(websocket: WebSocket):
await websocket.accept()
try:
while True:
prices = get_current_prices()
await websocket.send_json({
"timestamp": datetime.now().isoformat(),
"prices": prices
})
await asyncio.sleep(1) # Send every second
except WebSocketDisconnect:
pass
@app.websocket("/ws/notifications/{user_id}")
async def notifications_stream(websocket: WebSocket, user_id: str):
await websocket.accept()
async def notification_listener():
while True:
notification = await get_user_notifications(user_id)
if notification:
await websocket.send_json(notification)
await asyncio.sleep(0.5)
try:
asyncio.create_task(notification_listener())
while True:
# Keep connection alive, handle client messages
data = await websocket.receive_text()
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
pass
Complete Example
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from typing import Dict, List
import json
app = FastAPI()
class ChatManager:
def __init__(self):
self.rooms: Dict[str, Dict[str, WebSocket]] = {}
async def join_room(self, room_id: str, user_id: str, ws: WebSocket):
await ws.accept()
if room_id not in self.rooms:
self.rooms[room_id] = {}
self.rooms[room_id][user_id] = ws
await self.broadcast_to_room(room_id, {
"type": "join",
"user": user_id,
"users": list(self.rooms[room_id].keys())
})
def leave_room(self, room_id: str, user_id: str):
if room_id in self.rooms and user_id in self.rooms[room_id]:
del self.rooms[room_id][user_id]
async def broadcast_to_room(self, room_id: str, message: dict):
if room_id in self.rooms:
for ws in self.rooms[room_id].values():
await ws.send_json(message)
manager = ChatManager()
@app.websocket("/ws/chat/{room_id}/{user_id}")
async def chat(websocket: WebSocket, room_id: str, user_id: str):
await manager.join_room(room_id, user_id, websocket)
try:
while True:
data = await websocket.receive_json()
await manager.broadcast_to_room(room_id, {
"type": "message",
"user": user_id,
"content": data.get("content", "")
})
except WebSocketDisconnect:
manager.leave_room(room_id, user_id)
await manager.broadcast_to_room(room_id, {
"type": "leave",
"user": user_id
})
Summary
| Feature | Description |
|---|---|
websocket.accept() | Accept connection |
websocket.receive_*() | Receive data |
websocket.send_*() | Send data |
WebSocketDisconnect | Handle disconnection |
| Pattern | Use Case |
|---|---|
| Connection Manager | Multi-client broadcast |
| Rooms | Group communication |
| Streaming | Live data updates |
Next Steps
In Part 13, we’ll explore Middleware and Events - intercepting requests and handling application lifecycle.
Series Navigation:
- Part 1-11: Foundations & Tasks
- Part 12: WebSockets (You are here)
- Part 13: Middleware & Events
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 14: File Uploads and Storage
Handle file uploads in FastAPI. Learn form data, file validation, cloud storage integration with S3, and serving static files.
PythonFastAPI Tutorial Part 11: Background Tasks and Celery
Handle long-running operations in FastAPI. Learn built-in BackgroundTasks, Celery integration, task queues, and async processing patterns.
PythonFastAPI Tutorial Part 7: CRUD Operations - Build a Complete REST API
Build production-ready CRUD APIs with FastAPI. Learn RESTful patterns, pagination, filtering, bulk operations, and best practices for real-world applications.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.