FastAPI Tutorial Part 9: JWT Authentication - Secure Your API
Implement JWT authentication in FastAPI. Learn token generation, password hashing, OAuth2 flows, refresh tokens, and protecting API endpoints.
Moshiour Rahman
Advertisement
Understanding JWT Authentication
JSON Web Tokens (JWT) provide a stateless authentication mechanism. The server generates a signed token containing user information, and clients include this token in subsequent requests.
Client Server
| |
|-------- Login (credentials) ------>|
|<------- JWT Token -----------------|
| |
|-------- Request + Token --------->|
|<------- Protected Data ------------|
Setup and Dependencies
pip install python-jose[cryptography] # JWT handling
pip install passlib[bcrypt] # Password hashing
pip install python-multipart # Form data
Core Components
Password Hashing
# app/auth/password.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
JWT Token Creation
# app/auth/jwt.py
from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
SECRET_KEY = "your-secret-key-keep-it-secret" # Use env variable
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> Optional[dict]:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
User Models
# app/models/user.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from ..database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
is_verified = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
Pydantic Schemas
# app/schemas/auth.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
class UserCreate(BaseModel):
email: EmailStr
username: str = Field(min_length=3, max_length=50)
password: str = Field(min_length=8)
class UserLogin(BaseModel):
username: str
password: str
class UserResponse(BaseModel):
id: int
email: str
username: str
is_active: bool
class Config:
from_attributes = True
class Token(BaseModel):
access_token: str
token_type: str = "bearer"
class TokenData(BaseModel):
user_id: Optional[int] = None
username: Optional[str] = None
OAuth2 Implementation
OAuth2 Password Flow
# app/auth/oauth2.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.user import User
from .jwt import decode_token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
payload = decode_token(token)
if payload is None:
raise credentials_exception
user_id = payload.get("sub")
if user_id is None:
raise credentials_exception
user = db.query(User).filter(User.id == int(user_id)).first()
if user is None:
raise credentials_exception
return user
def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return current_user
Authentication Router
# app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta
from ..database import get_db
from ..models.user import User
from ..schemas.auth import UserCreate, UserResponse, Token
from ..auth.password import hash_password, verify_password
from ..auth.jwt import create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES
from ..auth.oauth2 import get_current_active_user
router = APIRouter(prefix="/auth", tags=["Authentication"])
@router.post("/register", response_model=UserResponse, status_code=201)
def register(user: UserCreate, db: Session = Depends(get_db)):
"""Register a new user."""
# Check existing email
if db.query(User).filter(User.email == user.email).first():
raise HTTPException(
status_code=400,
detail="Email already registered"
)
# Check existing username
if db.query(User).filter(User.username == user.username).first():
raise HTTPException(
status_code=400,
detail="Username already taken"
)
# Create user
db_user = User(
email=user.email,
username=user.username,
hashed_password=hash_password(user.password)
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.post("/login", response_model=Token)
def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""Login and get access token."""
# Find user by username
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
# Create access token
access_token = create_access_token(
data={"sub": str(user.id), "username": user.username},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return Token(access_token=access_token)
@router.get("/me", response_model=UserResponse)
def get_me(current_user: User = Depends(get_current_active_user)):
"""Get current user profile."""
return current_user
Refresh Tokens
Enhanced Token System
# app/auth/jwt.py
REFRESH_TOKEN_EXPIRE_DAYS = 7
def create_refresh_token(data: dict) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_refresh_token(token: str) -> Optional[dict]:
payload = decode_token(token)
if payload and payload.get("type") == "refresh":
return payload
return None
Token Pair Response
# app/schemas/auth.py
class TokenPair(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class RefreshTokenRequest(BaseModel):
refresh_token: str
Refresh Endpoint
@router.post("/login", response_model=TokenPair)
def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=401,
detail="Incorrect credentials"
)
token_data = {"sub": str(user.id), "username": user.username}
return TokenPair(
access_token=create_access_token(token_data),
refresh_token=create_refresh_token(token_data)
)
@router.post("/refresh", response_model=Token)
def refresh_token(request: RefreshTokenRequest, db: Session = Depends(get_db)):
"""Get new access token using refresh token."""
payload = verify_refresh_token(request.refresh_token)
if not payload:
raise HTTPException(
status_code=401,
detail="Invalid refresh token"
)
user_id = payload.get("sub")
user = db.query(User).filter(User.id == int(user_id)).first()
if not user or not user.is_active:
raise HTTPException(
status_code=401,
detail="User not found or inactive"
)
return Token(
access_token=create_access_token({
"sub": str(user.id),
"username": user.username
})
)
Protected Endpoints
Basic Protection
from ..auth.oauth2 import get_current_active_user
@app.get("/protected")
def protected_route(current_user: User = Depends(get_current_active_user)):
return {"message": f"Hello {current_user.username}"}
Optional Authentication
from typing import Optional
def get_current_user_optional(
token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="auth/login", auto_error=False)),
db: Session = Depends(get_db)
) -> Optional[User]:
if not token:
return None
payload = decode_token(token)
if not payload:
return None
user_id = payload.get("sub")
return db.query(User).filter(User.id == int(user_id)).first()
@app.get("/items")
def list_items(
current_user: Optional[User] = Depends(get_current_user_optional)
):
items = get_all_items()
# Show extra info for authenticated users
if current_user:
return {"items": items, "user_favorites": get_favorites(current_user.id)}
return {"items": items}
Complete Example
# app/main.py
from fastapi import FastAPI
from .database import engine, Base
from .routers import auth, users, items
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Auth Demo API")
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(items.router)
# app/routers/users.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from ..database import get_db
from ..auth.oauth2 import get_current_active_user
from ..models.user import User
from ..schemas.auth import UserResponse
router = APIRouter(prefix="/users", tags=["Users"])
@router.get("/me", response_model=UserResponse)
def get_current_user_profile(user: User = Depends(get_current_active_user)):
return user
@router.put("/me", response_model=UserResponse)
def update_profile(
update: UserUpdate,
user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
for field, value in update.model_dump(exclude_unset=True).items():
setattr(user, field, value)
db.commit()
db.refresh(user)
return user
# app/routers/items.py
from fastapi import APIRouter, Depends
from ..auth.oauth2 import get_current_active_user
from ..models.user import User
router = APIRouter(prefix="/items", tags=["Items"])
@router.get("/")
def list_items():
"""Public endpoint - no auth required."""
return {"items": get_items()}
@router.post("/")
def create_item(
item: ItemCreate,
user: User = Depends(get_current_active_user)
):
"""Protected endpoint - auth required."""
return create_new_item(item, owner_id=user.id)
@router.delete("/{item_id}")
def delete_item(
item_id: int,
user: User = Depends(get_current_active_user)
):
"""Protected - can only delete own items."""
item = get_item(item_id)
if item.owner_id != user.id:
raise HTTPException(status_code=403, detail="Not your item")
return delete_item_by_id(item_id)
Testing Authentication
# Register
curl -X POST "http://localhost:8000/auth/register" \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "username": "testuser", "password": "password123"}'
# Login (form data for OAuth2)
curl -X POST "http://localhost:8000/auth/login" \
-d "username=testuser&password=password123"
# Access protected endpoint
curl "http://localhost:8000/users/me" \
-H "Authorization: Bearer <your-token>"
# Refresh token
curl -X POST "http://localhost:8000/auth/refresh" \
-H "Content-Type: application/json" \
-d '{"refresh_token": "<your-refresh-token>"}'
Summary
| Component | Purpose |
|---|---|
passlib | Password hashing |
python-jose | JWT encoding/decoding |
OAuth2PasswordBearer | Token extraction |
Depends() | Inject auth into endpoints |
| Token Type | Lifetime | Usage |
|---|---|---|
| Access Token | Short (15-60 min) | API requests |
| Refresh Token | Long (days-weeks) | Get new access tokens |
Next Steps
In Part 10, we’ll explore Role-Based Access Control (RBAC) - implementing user roles, permissions, and fine-grained access control.
Series Navigation:
- Part 1-8: Foundations & Error Handling
- Part 9: JWT Authentication (You are here)
- Part 10: Role-Based Access Control
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 10: Role-Based Access Control (RBAC)
Implement RBAC in FastAPI. Learn user roles, permissions, scopes, and granular access control for secure multi-tenant applications.
PythonFastAPI Tutorial Part 18: API Security Best Practices
Secure your FastAPI application against common vulnerabilities. Learn input validation, rate limiting, CORS, and OWASP security patterns.
PythonFastAPI Tutorial Part 14: File Uploads and Storage
Handle file uploads in FastAPI. Learn form data, file validation, cloud storage integration with S3, and serving static files.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.