Python 7 min read

FastAPI Tutorial Part 9: JWT Authentication - Secure Your API

Implement JWT authentication in FastAPI. Learn token generation, password hashing, OAuth2 flows, refresh tokens, and protecting API endpoints.

MR

Moshiour Rahman

Advertisement

Understanding JWT Authentication

JSON Web Tokens (JWT) provide a stateless authentication mechanism. The server generates a signed token containing user information, and clients include this token in subsequent requests.

Client                              Server
  |                                    |
  |-------- Login (credentials) ------>|
  |<------- JWT Token -----------------|
  |                                    |
  |-------- Request + Token --------->|
  |<------- Protected Data ------------|

Setup and Dependencies

# Extras are quoted so shells that glob on [...] (e.g. zsh) pass them through.
pip install "python-jose[cryptography]"  # JWT handling
pip install "passlib[bcrypt]"            # Password hashing
pip install python-multipart             # Form data

Core Components

Password Hashing

# app/auth/password.py
from passlib.context import CryptContext

# Shared passlib context; bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    """Return the hash that ``pwd_context`` produces for ``password``."""
    hashed = pwd_context.hash(password)
    return hashed

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Report whether ``plain_password`` matches ``hashed_password``.

    The comparison is delegated to ``pwd_context.verify``.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches

JWT Token Creation

# app/auth/jwt.py
from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError

# Key used to sign and verify every token. The literal below is a
# placeholder only; load the real value from an environment variable.
SECRET_KEY = "your-secret-key-keep-it-secret"  # Use env variable
# Algorithm name handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Build a signed access token carrying ``data``.

    Args:
        data: Claims to embed. A shallow copy is taken, so the caller's
            dict is not mutated.
        expires_delta: Lifetime of the token. ``None`` selects the default
            of ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded JWT with ``exp`` and ``type="access"`` claims added.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # Compare against None rather than truthiness so that an explicit
    # timedelta(0) is honoured instead of silently becoming the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str) -> Optional[dict]:
    """Decode ``token`` with ``SECRET_KEY`` and ``ALGORITHM``.

    Returns:
        The decoded claims, or ``None`` when decoding raises ``JWTError``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    else:
        return claims

User Models

# app/models/user.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from ..database import Base

class User(Base):
    """ORM model for rows of the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key, indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Email and username are each unique and indexed.
    email = Column(String, unique=True, index=True)
    username = Column(String, unique=True, index=True)
    # Password hash; the plain-text password is not a column on this model.
    hashed_password = Column(String)
    # New rows default to active and not verified.
    is_active = Column(Boolean, default=True)
    is_verified = Column(Boolean, default=False)
    # Timestamp filled in by the database (server-side default of now()).
    created_at = Column(DateTime(timezone=True), server_default=func.now())

Pydantic Schemas

# app/schemas/auth.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional

# Request body for registration: validated email, username of 3-50
# characters, password of at least 8 characters.
class UserCreate(BaseModel):
    email: EmailStr
    username: str = Field(min_length=3, max_length=50)
    password: str = Field(min_length=8)

# Username/password pair; no length constraints are applied here.
class UserLogin(BaseModel):
    username: str
    password: str

# Public view of a user returned by the API; it has no password field.
class UserResponse(BaseModel):
    # Pydantic v2 configuration. The nested ``class Config`` form is
    # deprecated in v2; ``from_attributes`` lets the model be populated from
    # attribute-bearing objects such as ORM instances.
    model_config = {"from_attributes": True}

    id: int
    email: str
    username: str
    is_active: bool

# Response body carrying a single access token; token_type defaults to
# "bearer".
class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

# Optional identity fields; both default to None.
class TokenData(BaseModel):
    user_id: Optional[int] = None
    username: Optional[str] = None

OAuth2 Implementation

OAuth2 Password Flow

# app/auth/oauth2.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from ..database import get_db
from ..models.user import User
from .jwt import decode_token

# Bearer-token dependency; tokenUrl names the login route used to obtain tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login")

def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the request's bearer token to the ``User`` it identifies.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session.

    Returns:
        The ``User`` row whose id equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, is not an
            access token, has a missing or non-integer ``sub`` claim, or
            names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"}
    )

    payload = decode_token(token)
    if payload is None:
        raise credentials_exception

    # Tokens carry a "type" claim; only "access" tokens may authenticate a
    # request, otherwise a long-lived refresh token would work here too.
    if payload.get("type") != "access":
        raise credentials_exception

    # A missing or malformed subject is a credentials problem, not a
    # server error, so convert it into the same 401.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise credentials_exception from None

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise credentials_exception

    return user

def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """Pass the authenticated user through only if the account is active.

    Raises:
        HTTPException: 400 with detail ``"Inactive user"`` when
            ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Inactive user"
    )

Authentication Router

# app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import timedelta

from ..database import get_db
from ..models.user import User
from ..schemas.auth import UserCreate, UserResponse, Token
from ..auth.password import hash_password, verify_password
from ..auth.jwt import create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES
from ..auth.oauth2 import get_current_active_user

# Every route registered on this router is served under /auth.
router = APIRouter(prefix="/auth", tags=["Authentication"])

@router.post("/register", response_model=UserResponse, status_code=201)
def register(user: UserCreate, db: Session = Depends(get_db)):
    """Register a new user."""
    # Local import: the module header does not import anything from
    # sqlalchemy.exc.
    from sqlalchemy.exc import IntegrityError

    # Friendly, specific errors for the common (non-concurrent) case.
    if db.query(User).filter(User.email == user.email).first():
        raise HTTPException(
            status_code=400,
            detail="Email already registered"
        )

    if db.query(User).filter(User.username == user.username).first():
        raise HTTPException(
            status_code=400,
            detail="Username already taken"
        )

    # Only the hash of the password is stored.
    db_user = User(
        email=user.email,
        username=user.username,
        hashed_password=hash_password(user.password)
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # Another request can insert the same email/username between the
        # checks above and this commit; the unique constraints then reject
        # the insert. Report it as a client error instead of a 500.
        db.rollback()
        raise HTTPException(
            status_code=400,
            detail="Email or username already registered"
        ) from None
    db.refresh(db_user)

    return db_user

@router.post("/login", response_model=Token)
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    """Login and get access token."""
    # Look the account up by the submitted username.
    account = (
        db.query(User)
        .filter(User.username == form_data.username)
        .first()
    )

    # Unknown user and wrong password produce the same 401.
    if not (account and verify_password(form_data.password, account.hashed_password)):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"}
        )

    if not account.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Inactive user"
        )

    # The subject claim is the user id as a string.
    claims = {"sub": str(account.id), "username": account.username}
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(data=claims, expires_delta=lifetime)

    return Token(access_token=access_token)

@router.get("/me", response_model=UserResponse)
def get_me(current_user: User = Depends(get_current_active_user)):
    """Get current user profile."""
    # The dependency supplies the authenticated, active user; the handler
    # only hands it back for serialisation through response_model.
    profile = current_user
    return profile

Refresh Tokens

Enhanced Token System

# app/auth/jwt.py
# Refresh-token lifetime, in days.
REFRESH_TOKEN_EXPIRE_DAYS = 7

def create_refresh_token(data: dict) -> str:
    """Build a signed refresh token carrying ``data``.

    Args:
        data: Claims to embed. A shallow copy is taken, so the caller's
            dict is not mutated.

    Returns:
        The encoded JWT with ``type="refresh"`` and an ``exp`` claim set
        ``REFRESH_TOKEN_EXPIRE_DAYS`` days from now.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    to_encode = data.copy()
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def verify_refresh_token(token: str) -> Optional[dict]:
    """Decode ``token`` and accept it only if it is a refresh token.

    Returns:
        The decoded claims when ``decode_token`` yields a non-empty payload
        whose ``type`` claim equals ``"refresh"``; otherwise ``None``.
    """
    claims = decode_token(token)
    if not claims:
        return None
    if claims.get("type") != "refresh":
        return None
    return claims

Token Pair Response

# app/schemas/auth.py
# Response body carrying both an access and a refresh token; token_type
# defaults to "bearer".
class TokenPair(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"

# Request body holding the refresh token as a single string field.
class RefreshTokenRequest(BaseModel):
    refresh_token: str

Refresh Endpoint

@router.post("/login", response_model=TokenPair)
def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    # Issues an access/refresh token pair for valid credentials.
    # The lookup and password check are done inline with verify_password,
    # which the router module already imports.
    user = db.query(User).filter(User.username == form_data.username).first()

    # Unknown user and wrong password produce the same 401.
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=401,
            detail="Incorrect credentials",
            headers={"WWW-Authenticate": "Bearer"}
        )

    # Inactive accounts must not receive tokens.
    if not user.is_active:
        raise HTTPException(
            status_code=400,
            detail="Inactive user"
        )

    token_data = {"sub": str(user.id), "username": user.username}

    return TokenPair(
        access_token=create_access_token(token_data),
        refresh_token=create_refresh_token(token_data)
    )

@router.post("/refresh", response_model=Token)
def refresh_token(request: RefreshTokenRequest, db: Session = Depends(get_db)):
    """Get new access token using refresh token."""
    payload = verify_refresh_token(request.refresh_token)

    if not payload:
        raise HTTPException(
            status_code=401,
            detail="Invalid refresh token"
        )

    # A token without a usable integer subject is invalid; answer 401
    # instead of letting int() raise and surface as a 500.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise HTTPException(
            status_code=401,
            detail="Invalid refresh token"
        ) from None

    user = db.query(User).filter(User.id == user_id).first()

    # Re-check the account on every refresh so deleted or deactivated
    # users stop receiving new access tokens.
    if not user or not user.is_active:
        raise HTTPException(
            status_code=401,
            detail="User not found or inactive"
        )

    return Token(
        access_token=create_access_token({
            "sub": str(user.id),
            "username": user.username
        })
    )

Protected Endpoints

Basic Protection

from ..auth.oauth2 import get_current_active_user

@app.get("/protected")
def protected_route(current_user: User = Depends(get_current_active_user)):
    # Only reachable when the dependency resolves to a user.
    greeting = f"Hello {current_user.username}"
    return {"message": greeting}

Optional Authentication

from typing import Optional

def get_current_user_optional(
    token: Optional[str] = Depends(OAuth2PasswordBearer(tokenUrl="auth/login", auto_error=False)),
    db: Session = Depends(get_db)
) -> Optional[User]:
    """Return the user identified by the bearer token, if there is one.

    Unlike a mandatory auth dependency this never raises for a missing or
    unusable token; the request simply proceeds anonymously.

    Args:
        token: Bearer token, or a falsy value when the request has none
            (the scheme is built with ``auto_error=False``).
        db: Database session.

    Returns:
        The matching ``User``, or ``None`` when the token is absent, cannot
        be decoded, is not an access token, has a missing or non-integer
        ``sub`` claim, or names no existing user.
    """
    if not token:
        return None

    payload = decode_token(token)
    if not payload:
        return None

    # Only access tokens identify a caller; other token types are ignored.
    if payload.get("type") != "access":
        return None

    # Treat a missing or malformed subject as "anonymous" rather than
    # letting int() raise and fail the whole request.
    try:
        user_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        return None

    return db.query(User).filter(User.id == user_id).first()

@app.get("/items")
def list_items(
    current_user: Optional[User] = Depends(get_current_user_optional)
):
    # Everyone gets the item list.
    response = {"items": get_all_items()}

    # Authenticated callers additionally get their favorites.
    if current_user:
        response["user_favorites"] = get_favorites(current_user.id)

    return response

Complete Example

# app/main.py
from fastapi import FastAPI
from .database import engine, Base
from .routers import auth, users, items

# Runs when this module is imported: asks SQLAlchemy to create the tables
# for the models registered on Base, using the configured engine.
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Auth Demo API")

# Attach each feature router to the application.
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(items.router)

# app/routers/users.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from ..database import get_db
from ..auth.oauth2 import get_current_active_user
from ..models.user import User
from ..schemas.auth import UserResponse

# Every route registered on this router is served under /users.
router = APIRouter(prefix="/users", tags=["Users"])

@router.get("/me", response_model=UserResponse)
def get_current_user_profile(user: User = Depends(get_current_active_user)):
    # The dependency supplies the caller's User; hand it straight back and
    # let response_model shape the output.
    profile = user
    return profile

@router.put("/me", response_model=UserResponse)
def update_profile(
    update: UserUpdate,
    user: User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    # Only fields the client actually sent are applied to the user.
    changes = update.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(user, attr_name, new_value)

    # Persist, then reload so the response reflects the stored row.
    db.commit()
    db.refresh(user)
    return user

# app/routers/items.py
from fastapi import APIRouter, Depends
from ..auth.oauth2 import get_current_active_user
from ..models.user import User

# Every route registered on this router is served under /items.
router = APIRouter(prefix="/items", tags=["Items"])

@router.get("/")
def list_items():
    """Public endpoint - no auth required."""
    # No auth dependency is declared on this handler.
    items = get_items()
    return {"items": items}

@router.post("/")
def create_item(
    item: ItemCreate,
    user: User = Depends(get_current_active_user)
):
    """Protected endpoint - auth required."""
    # The new item is owned by the authenticated caller.
    owner = user.id
    created = create_new_item(item, owner_id=owner)
    return created

@router.delete("/{item_id}")
def delete_item(
    item_id: int,
    user: User = Depends(get_current_active_user)
):
    """Protected - can only delete own items."""
    # Local import: this module's header only pulls APIRouter and Depends
    # from fastapi, so HTTPException would otherwise be an undefined name.
    from fastapi import HTTPException

    item = get_item(item_id)
    # NOTE(review): assumes get_item returns None for an unknown id —
    # confirm against its implementation.
    if item is None:
        raise HTTPException(status_code=404, detail="Item not found")

    # Ownership check: callers may only delete items they own.
    if item.owner_id != user.id:
        raise HTTPException(status_code=403, detail="Not your item")
    return delete_item_by_id(item_id)

Testing Authentication

# Register
curl -X POST "http://localhost:8000/auth/register" \
  -H "Content-Type: application/json" \
  -d '{"email": "user@example.com", "username": "testuser", "password": "password123"}'

# Login (form data for OAuth2)
curl -X POST "http://localhost:8000/auth/login" \
  -d "username=testuser&password=password123"

# Access protected endpoint
curl "http://localhost:8000/users/me" \
  -H "Authorization: Bearer <your-token>"

# Refresh token
curl -X POST "http://localhost:8000/auth/refresh" \
  -H "Content-Type: application/json" \
  -d '{"refresh_token": "<your-refresh-token>"}'

Summary

| Component | Purpose |
|---|---|
| passlib | Password hashing |
| python-jose | JWT encoding/decoding |
| OAuth2PasswordBearer | Token extraction |
| Depends() | Inject auth into endpoints |

| Token Type | Lifetime | Usage |
|---|---|---|
| Access Token | Short (15-60 min) | API requests |
| Refresh Token | Long (days-weeks) | Get new access tokens |

Next Steps

In Part 10, we’ll explore Role-Based Access Control (RBAC) - implementing user roles, permissions, and fine-grained access control.

Series Navigation:

  • Part 1-8: Foundations & Error Handling
  • Part 9: JWT Authentication (You are here)
  • Part 10: Role-Based Access Control

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.