Python 10 min read

FastAPI Tutorial Part 6: Database Integration with SQLAlchemy

Connect FastAPI to databases using SQLAlchemy. Learn ORM models, relationships, migrations with Alembic, and async database operations.

MR

Moshiour Rahman

Advertisement

Database Options in FastAPI

FastAPI works with any database through various libraries:

| Database | Library | Async Support |
| --- | --- | --- |
| PostgreSQL | SQLAlchemy, asyncpg | Yes |
| MySQL | SQLAlchemy, aiomysql | Yes |
| SQLite | SQLAlchemy, aiosqlite | Yes |
| MongoDB | Motor, Beanie | Yes |

We’ll focus on SQLAlchemy, the most popular Python ORM.

Project Setup

Install Dependencies

pip install sqlalchemy
pip install psycopg2-binary  # PostgreSQL
# or
pip install aiosqlite  # Async SQLite
# Also imported by the examples below:
pip install "passlib[bcrypt]"  # password hashing (passlib.context.CryptContext)
pip install "pydantic[email]"  # email-validator backend required by EmailStr

Project Structure

app/
├── __init__.py
├── main.py
├── database.py      # Database configuration
├── models.py        # SQLAlchemy models
├── schemas.py       # Pydantic schemas
├── crud.py          # Database operations
└── routers/
    └── users.py

Database Configuration

Synchronous Setup

# app/database.py
from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4; the old
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./app.db"
# PostgreSQL: "postgresql://user:password@localhost/dbname"

# check_same_thread=False is a sqlite3 driver option that lets a connection be
# used from a thread other than the one that created it. Remove connect_args
# when switching to another database.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False}  # SQLite only
)

# Session factory bound to the engine; each call returns a new Session that
# neither autocommits nor autoflushes.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that every ORM model inherits from.
Base = declarative_base()

# Dependency
def get_db():
    """Yield one ``SessionLocal`` session and always close it afterwards.

    Written as a generator so FastAPI can use it with ``Depends``: the code
    after ``yield`` runs once the caller has finished with the session.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the caller finished normally or raised.
        session.close()

Async Setup

# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base

DATABASE_URL = "sqlite+aiosqlite:///./app.db"
# PostgreSQL: "postgresql+asyncpg://user:password@localhost/dbname"

# echo=True makes the engine log the SQL it emits.
engine = create_async_engine(DATABASE_URL, echo=True)

# Session factory that builds AsyncSession objects bound to the async engine.
# expire_on_commit=False keeps already-loaded attributes readable after
# commit() instead of expiring them.
async_session = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

# Base class that every ORM model inherits from.
Base = declarative_base()

async def get_db():
    """Yield one ``AsyncSession`` per use.

    The ``async with`` block owns the session, so it is cleaned up when the
    generator is resumed after the ``yield`` or closed.
    """
    async with async_session() as session:
        yield session

SQLAlchemy Models

Basic Model

# app/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Float
from sqlalchemy.sql import func
from .database import Base

class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Email and username are each unique, indexed and required.
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    # Holds the password hash produced in the CRUD layer, not the raw password.
    hashed_password = Column(String, nullable=False)
    # Python-side default applied on INSERT when no value is supplied.
    is_active = Column(Boolean, default=True)
    # Filled in by the database on INSERT via server_default.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Only set on UPDATE (there is no insert default), so it stays NULL until
    # the row is first updated.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

Model with Relationships

from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import relationship

# Many-to-many association table
# Association table linking posts and tags (many-to-many).
# Both foreign keys together form the composite primary key, so the database
# rejects a duplicate (post_id, tag_id) pair.
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True)
)

class User(Base):
    """User model that owns a collection of posts."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    username = Column(String, unique=True, index=True)

    # One-to-many relationship; back_populates pairs it with Post.author so
    # both sides stay in sync.
    posts = relationship("Post", back_populates="author")

class Post(Base):
    """Post model with an author, tags and comments."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    content = Column(String)
    # Foreign key to the owning user's primary key.
    author_id = Column(Integer, ForeignKey("users.id"))

    # Relationships
    # Many-to-one: the other side is User.posts.
    author = relationship("User", back_populates="posts")
    # Many-to-many through the post_tags association table.
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")
    # One-to-many: the other side is Comment.post.
    comments = relationship("Comment", back_populates="post")

class Tag(Base):
    """Tag model; tag names are unique."""

    __tablename__ = "tags"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, index=True)

    # Many-to-many through the post_tags association table; mirrors Post.tags.
    posts = relationship("Post", secondary=post_tags, back_populates="tags")

class Comment(Base):
    """Comment model attached to a single post."""

    __tablename__ = "comments"

    id = Column(Integer, primary_key=True, index=True)
    content = Column(String)
    # Foreign key to the post this comment belongs to.
    post_id = Column(Integer, ForeignKey("posts.id"))

    # Many-to-one: the other side is Post.comments.
    post = relationship("Post", back_populates="comments")

Pydantic Schemas

Separate Input/Output Schemas

# app/schemas.py
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime

# User schemas
# Fields shared by the user input and output schemas.
class UserBase(BaseModel):
    email: EmailStr  # validated as an email address
    username: str

# Input schema for creating a user; carries the plain-text password, which is
# absent from the response schema.
class UserCreate(UserBase):
    password: str

# Input schema for partial updates: every field is optional and defaults to
# None, so callers send only the fields they want to change.
class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    username: Optional[str] = None
    is_active: Optional[bool] = None

# Output schema for users; it deliberately has no password field.
class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime

    # Pydantic v2 configuration: read field values from object attributes
    # (e.g. SQLAlchemy model instances). Replaces the inner ``class Config``,
    # which is deprecated in Pydantic v2.
    model_config = {"from_attributes": True}

# Post schemas
# Fields shared by the post input and output schemas.
class PostBase(BaseModel):
    title: str
    content: str

# Input schema for creating a post.
class PostCreate(PostBase):
    # IDs of tags to attach; defaults to no tags.
    tag_ids: List[int] = []

# Output schema for posts.
class PostResponse(PostBase):
    id: int
    author_id: int
    # NOTE(review): the ORM model must expose a created_at attribute for this
    # required field to validate; the relationship-example Post model defines
    # no such column, while the complete-example Post model does.
    created_at: datetime

    # Pydantic v2 configuration: read field values from object attributes
    # (e.g. SQLAlchemy model instances). Replaces the inner ``class Config``,
    # which is deprecated in Pydantic v2.
    model_config = {"from_attributes": True}

# Post output schema with the author nested as a UserResponse.
class PostWithAuthor(PostResponse):
    author: UserResponse

CRUD Operations

Basic CRUD Functions

# app/crud.py
from sqlalchemy.orm import Session
from sqlalchemy import select
from . import models, schemas
from passlib.context import CryptContext

# Shared passlib context configured with the bcrypt scheme; used below to hash
# passwords before they are stored.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# User CRUD
def get_user(db: Session, user_id: int):
    """Return the user with primary key ``user_id``, or ``None`` if absent."""
    by_id = models.User.id == user_id
    return db.query(models.User).filter(by_id).first()

def get_user_by_email(db: Session, email: str):
    """Return the user whose email equals ``email``, or ``None`` if absent."""
    by_email = models.User.email == email
    return db.query(models.User).filter(by_email).first()

def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` users after skipping the first ``skip`` rows."""
    window = db.query(models.User).offset(skip).limit(limit)
    return window.all()

def create_user(db: Session, user: schemas.UserCreate):
    """Hash the password, insert a new user row and return the stored user."""
    password_hash = pwd_context.hash(user.password)
    new_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=password_hash,
    )

    db.add(new_user)
    db.commit()
    # Reload so database-generated values are present on the instance.
    db.refresh(new_user)
    return new_user

def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
    """Apply the explicitly-set fields of ``user`` to the stored user.

    Returns the refreshed user, or ``None`` when ``user_id`` does not exist.
    """
    target = get_user(db, user_id)
    if not target:
        return None

    # exclude_unset keeps only fields the client actually sent, so omitted
    # fields are left untouched.
    changes = user.model_dump(exclude_unset=True)
    for name in changes:
        setattr(target, name, changes[name])

    db.commit()
    db.refresh(target)
    return target

def delete_user(db: Session, user_id: int):
    """Delete the user if present; return the deleted user or ``None``."""
    doomed = get_user(db, user_id)
    if not doomed:
        # Nothing to delete: hand back the falsy lookup result unchanged.
        return doomed

    db.delete(doomed)
    db.commit()
    return doomed

Async CRUD Functions

# app/crud_async.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload

async def get_user(db: AsyncSession, user_id: int):
    """Return the user with primary key ``user_id``, or ``None`` if absent."""
    stmt = select(models.User).where(models.User.id == user_id)
    outcome = await db.execute(stmt)
    return outcome.scalar_one_or_none()

async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` users after skipping the first ``skip`` rows."""
    stmt = select(models.User).offset(skip).limit(limit)
    outcome = await db.execute(stmt)
    # scalars() unwraps each single-column row into the User object itself.
    return outcome.scalars().all()

async def create_user(db: AsyncSession, user: schemas.UserCreate):
    """Hash the password, insert a new user row and return the stored user."""
    fields = {
        "email": user.email,
        "username": user.username,
        "hashed_password": pwd_context.hash(user.password),
    }
    new_user = models.User(**fields)

    db.add(new_user)
    await db.commit()
    # Reload so database-generated values are present on the instance.
    await db.refresh(new_user)
    return new_user

# With eager loading for relationships
async def get_post_with_author(db: AsyncSession, post_id: int):
    """Return the post with its author eagerly loaded, or ``None`` if absent."""
    # selectinload fetches the author up front, so reading post.author later
    # does not need another (lazy) database round trip.
    with_author = selectinload(models.Post.author)
    stmt = select(models.Post).options(with_author).where(models.Post.id == post_id)
    outcome = await db.execute(stmt)
    return outcome.scalar_one_or_none()

FastAPI Integration

Main Application

# app/main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List

from . import crud, models, schemas
from .database import engine, get_db

# Create tables
# Runs at import time against the tables registered on Base.metadata.
models.Base.metadata.create_all(bind=engine)

# Application instance that the route decorators below attach to.
app = FastAPI()

@app.post("/users/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Reject the request up front when the email is already taken.
    existing = crud.get_user_by_email(db, email=user.email)
    if existing:
        raise HTTPException(status_code=400, detail="Email already registered")

    created = crud.create_user(db=db, user=user)
    return created

@app.get("/users/", response_model=List[schemas.UserResponse])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    # skip/limit arrive as query parameters and are passed straight to the CRUD layer.
    users = crud.get_users(db, skip=skip, limit=limit)
    return users

@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    # Look the user up; a missing row becomes a 404 response.
    found = crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")

@app.put("/users/{user_id}", response_model=schemas.UserResponse)
def update_user(
    user_id: int,
    user: schemas.UserUpdate,
    db: Session = Depends(get_db)
):
    # The CRUD helper returns None when the user does not exist.
    updated = crud.update_user(db, user_id, user)
    if updated is not None:
        return updated
    raise HTTPException(status_code=404, detail="User not found")

@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
    # The CRUD helper returns None when there was no such user to delete.
    removed = crud.delete_user(db, user_id)
    if removed is not None:
        return {"message": "User deleted"}
    raise HTTPException(status_code=404, detail="User not found")

Async Endpoints

from sqlalchemy.ext.asyncio import AsyncSession
from .database import get_db  # async version

@app.get("/users/{user_id}", response_model=schemas.UserResponse)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    # Async variant: the CRUD call is awaited; a missing row becomes a 404.
    found = await crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")

Query Patterns

Filtering

def get_users_by_status(db: Session, is_active: bool):
    """Return all users whose ``is_active`` flag equals ``is_active``."""
    status_matches = models.User.is_active == is_active
    return db.query(models.User).filter(status_matches).all()

def search_users(db: Session, query: str):
    """Return users whose username contains ``query``, ignoring case.

    ``%`` and ``_`` inside ``query`` are escaped so they match literally
    instead of acting as LIKE wildcards (otherwise a search for ``%`` would
    match every user).
    """
    # Escape the escape character first so it is not doubled again afterwards.
    literal = (
        query.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    return db.query(models.User).filter(
        models.User.username.ilike(f"%{literal}%", escape="\\")
    ).all()

def get_users_by_ids(db: Session, user_ids: List[int]):
    """Return all users whose primary key is one of ``user_ids``."""
    id_in_list = models.User.id.in_(user_ids)
    return db.query(models.User).filter(id_in_list).all()

Ordering and Pagination

from sqlalchemy import desc, asc

def get_users_sorted(db: Session, sort_by: str = "created_at", order: str = "desc"):
    """Return all users ordered by the column named ``sort_by``.

    Args:
        db: Session used to run the query.
        sort_by: Name of a ``users`` table column. ``hashed_password`` is
            rejected so result ordering cannot leak information about hashes.
        order: ``"desc"`` for descending; any other value sorts ascending.

    Raises:
        ValueError: If ``sort_by`` is not a sortable column.
    """
    # Accept only real table columns; a bare getattr() on a caller-supplied
    # name would also reach relationships, methods and other attributes.
    sortable = set(models.User.__table__.columns.keys()) - {"hashed_password"}
    if sort_by not in sortable:
        raise ValueError(f"Cannot sort users by {sort_by!r}")

    column = getattr(models.User, sort_by)
    direction = desc if order == "desc" else asc
    return db.query(models.User).order_by(direction(column)).all()

def get_users_paginated(db: Session, page: int = 1, per_page: int = 10):
    """Return one page of users plus pagination metadata.

    Args:
        db: Session used to run the queries.
        page: 1-based page number.
        per_page: Number of users per page.

    Returns:
        A dict with ``items``, ``total``, ``page``, ``per_page`` and ``pages``.

    Raises:
        ValueError: If ``page`` or ``per_page`` is smaller than 1.
    """
    if page < 1:
        raise ValueError("page must be >= 1")
    if per_page < 1:
        # Also guards the ceiling division below against dividing by zero.
        raise ValueError("per_page must be >= 1")

    offset = (page - 1) * per_page
    # Without an ORDER BY the database may return rows in any order, so pages
    # could overlap or skip rows; order by primary key to keep them stable.
    users = (
        db.query(models.User)
        .order_by(models.User.id)
        .offset(offset)
        .limit(per_page)
        .all()
    )
    total = db.query(models.User).count()

    return {
        "items": users,
        "total": total,
        "page": page,
        "per_page": per_page,
        # Ceiling division: a partially filled last page still counts.
        "pages": (total + per_page - 1) // per_page
    }

Aggregations

from sqlalchemy import func

def get_user_stats(db: Session):
    """Return the total number of users and the number of active users."""
    total = db.query(func.count(models.User.id)).scalar()
    active = (
        db.query(func.count(models.User.id))
        .filter(models.User.is_active == True)
        .scalar()
    )
    return {"total_users": total, "active_users": active}

def get_posts_per_user(db: Session):
    """Return ``(username, post_count)`` rows, one per user joined to posts."""
    post_count = func.count(models.Post.id).label("post_count")
    per_user = db.query(models.User.username, post_count)
    per_user = per_user.join(models.Post).group_by(models.User.id)
    return per_user.all()

Joins and Eager Loading

from sqlalchemy.orm import joinedload, selectinload

# Eager load single relationship
def get_user_with_posts(db: Session, user_id: int):
    """Return the user with ``posts`` eagerly loaded, or ``None`` if absent."""
    # joinedload pulls the posts in the same SELECT via a JOIN.
    with_posts = joinedload(models.User.posts)
    lookup = db.query(models.User).options(with_posts)
    return lookup.filter(models.User.id == user_id).first()

# Eager load multiple relationships
def get_post_with_details(db: Session, post_id: int):
    """Return the post with author, comments and tags eagerly loaded."""
    # The single author is joined in; the two collections are loaded with
    # separate SELECT ... IN queries.
    eager = (
        joinedload(models.Post.author),
        selectinload(models.Post.comments),
        selectinload(models.Post.tags),
    )
    lookup = db.query(models.Post).options(*eager)
    return lookup.filter(models.Post.id == post_id).first()

Migrations with Alembic

Setup Alembic

# Install Alembic
pip install alembic
# Scaffold a migration environment in a directory named "alembic"
alembic init alembic

Configure Alembic

# alembic/env.py
from app.database import Base
from app.models import User, Post, Tag, Comment  # Import all models

# Metadata that holds the table definitions of the models imported above;
# --autogenerate compares it against the database.
target_metadata = Base.metadata
# alembic.ini (a separate config file, not part of env.py)
sqlalchemy.url = sqlite:///./app.db

Migration Commands

# Create a migration script, auto-generated from the models
alembic revision --autogenerate -m "Initial migration"

# Apply all pending migrations up to the latest revision
alembic upgrade head

# Roll back the most recent migration
alembic downgrade -1

# View migration history
alembic history

Example Migration

# alembic/versions/001_initial.py
from alembic import op
import sqlalchemy as sa

def upgrade():
    """Create the ``users`` table with the same columns as the ``User`` model."""
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), primary_key=True),
        # The model declares email, username and hashed_password as
        # nullable=False; the migration has to say so too, or the database
        # would accept NULLs the model forbids.
        sa.Column('email', sa.String(), nullable=False, unique=True, index=True),
        sa.Column('username', sa.String(), nullable=False, unique=True, index=True),
        sa.Column('hashed_password', sa.String(), nullable=False),
        # default= is a Python-side default and is not part of the emitted
        # DDL; use server_default if the database itself should fill it in.
        sa.Column('is_active', sa.Boolean(), default=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        # Present on the model (set on UPDATE), so the table needs the column.
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True)
    )

def downgrade():
    """Reverse ``upgrade`` by dropping the ``users`` table."""
    op.drop_table('users')

Complete Example

# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

DATABASE_URL = "sqlite:///./blog.db"

# check_same_thread=False is a sqlite3 driver option; drop connect_args when
# switching to another database.
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread": False})
# Session factory bound to the engine.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Base class that every ORM model inherits from.
Base = declarative_base()

def get_db():
    """Yield one ``SessionLocal`` session and always close it afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the caller finished normally or raised.
        session.close()

# app/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from .database import Base

class User(Base):
    """Blog user mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Explicit String lengths bound the column size.
    email = Column(String(255), unique=True, index=True)
    username = Column(String(50), unique=True, index=True)
    hashed_password = Column(String(255))
    bio = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)
    # Filled in by the database on INSERT.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # "all, delete-orphan": deleting a user through the ORM also deletes their
    # posts, and a post removed from this collection is deleted as well.
    posts = relationship("Post", back_populates="author", cascade="all, delete-orphan")

class Post(Base):
    """Blog post mapped to the ``posts`` table."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), index=True)
    # Unique slug; the API looks posts up by it.
    slug = Column(String(200), unique=True, index=True)
    content = Column(Text)
    # Posts start unpublished unless a value is supplied.
    published = Column(Boolean, default=False)
    author_id = Column(Integer, ForeignKey("users.id"))
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    # Only set on UPDATE, so it stays NULL until the row is first updated.
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Many-to-one: the other side is User.posts.
    author = relationship("User", back_populates="posts")

# app/schemas.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime

# Fields shared by the user input and output schemas.
class UserBase(BaseModel):
    email: EmailStr
    # 3-50 characters; the upper bound matches the String(50) column.
    username: str = Field(min_length=3, max_length=50)

# Input schema for creating a user.
class UserCreate(UserBase):
    # Plain-text password, at least 8 characters; hashed before storage.
    password: str = Field(min_length=8)
    bio: Optional[str] = None

# Output schema for users; it deliberately has no password field.
class UserResponse(UserBase):
    id: int
    # No default: the field is required, but its value may be None.
    bio: Optional[str]
    is_active: bool
    created_at: datetime

    # Pydantic v2 configuration: read field values from object attributes
    # (e.g. SQLAlchemy model instances). Replaces the inner ``class Config``,
    # which is deprecated in Pydantic v2.
    model_config = {"from_attributes": True}

# Fields shared by the post input and output schemas.
class PostBase(BaseModel):
    # 5-200 characters; the upper bound matches the String(200) column.
    title: str = Field(min_length=5, max_length=200)
    content: str = Field(min_length=10)

# Input schema for creating a post.
class PostCreate(PostBase):
    # Only lowercase letters, digits and hyphens are accepted.
    slug: str = Field(pattern=r"^[a-z0-9-]+$")
    published: bool = False

# Output schema for posts.
class PostResponse(PostBase):
    id: int
    slug: str
    published: bool
    author_id: int
    created_at: datetime

    # Pydantic v2 configuration: read field values from object attributes
    # (e.g. SQLAlchemy model instances). Replaces the inner ``class Config``,
    # which is deprecated in Pydantic v2.
    model_config = {"from_attributes": True}

# Post output schema with the author nested as a UserResponse.
class PostWithAuthor(PostResponse):
    author: UserResponse

# app/main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session, joinedload
from typing import List
from . import models, schemas
from .database import engine, get_db

# Runs at import time against the tables registered on Base.metadata.
models.Base.metadata.create_all(bind=engine)

# Application instance that the route decorators below attach to.
app = FastAPI(title="Blog API")

# Users
@app.post("/users", response_model=schemas.UserResponse, status_code=201)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # Both email and username carry unique constraints on the model. Check
    # each one up front so a duplicate yields a clear 400 instead of an
    # unhandled integrity error from the database.
    if db.query(models.User).filter(models.User.email == user.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")
    if db.query(models.User).filter(models.User.username == user.username).first():
        raise HTTPException(status_code=400, detail="Username already registered")

    from passlib.context import CryptContext
    pwd_context = CryptContext(schemes=["bcrypt"])

    # Only the hash is stored, never the raw password.
    db_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=pwd_context.hash(user.password),
        bio=user.bio
    )
    db.add(db_user)
    db.commit()
    # Reload so database-generated values are present in the response.
    db.refresh(db_user)
    return db_user

@app.get("/users", response_model=List[schemas.UserResponse])
def list_users(skip: int = 0, limit: int = 20, db: Session = Depends(get_db)):
    # skip/limit arrive as query parameters and map to OFFSET/LIMIT.
    window = db.query(models.User).offset(skip).limit(limit)
    return window.all()

@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    # Look the user up by primary key; a missing row becomes a 404.
    found = db.query(models.User).filter(models.User.id == user_id).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")

# Posts
@app.post("/users/{user_id}/posts", response_model=schemas.PostResponse, status_code=201)
def create_post(
    user_id: int,
    post: schemas.PostCreate,
    db: Session = Depends(get_db)
):
    # The author must exist before a post can be attached to them.
    author = db.query(models.User).filter(models.User.id == user_id).first()
    if not author:
        raise HTTPException(status_code=404, detail="User not found")

    # Slugs are unique, so reject one that is already in use.
    slug_owner = db.query(models.Post).filter(models.Post.slug == post.slug).first()
    if slug_owner:
        raise HTTPException(status_code=400, detail="Slug already exists")

    fields = post.model_dump()
    new_post = models.Post(author_id=user_id, **fields)
    db.add(new_post)
    db.commit()
    db.refresh(new_post)
    return new_post

@app.get("/posts", response_model=List[schemas.PostWithAuthor])
def list_posts(
    published_only: bool = True,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db)
):
    # Authors are joined in up front because the response nests them.
    posts = db.query(models.Post).options(joinedload(models.Post.author))

    if published_only:
        posts = posts.filter(models.Post.published == True)

    newest_first = models.Post.created_at.desc()
    return posts.order_by(newest_first).offset(skip).limit(limit).all()

@app.get("/posts/{slug}", response_model=schemas.PostWithAuthor)
def get_post(slug: str, db: Session = Depends(get_db)):
    # Fetch the post by slug with its author joined in; a missing row becomes a 404.
    with_author = joinedload(models.Post.author)
    lookup = db.query(models.Post).options(with_author)
    found = lookup.filter(models.Post.slug == slug).first()

    if found:
        return found
    raise HTTPException(status_code=404, detail="Post not found")

Summary

| Component | Purpose |
| --- | --- |
| engine | Database connection |
| SessionLocal | Session factory |
| Base | Declarative base for models |
| get_db | Dependency for session management |
| relationship() | Define model relationships |
| Alembic | Database migrations |

| Query Method | Use Case |
| --- | --- |
| filter() | WHERE clause |
| order_by() | Sorting |
| offset/limit | Pagination |
| joinedload() | Eager load relationships |
| func.count() | Aggregations |

Next Steps

In Part 7, we’ll build a Complete CRUD API - combining everything we’ve learned into a production-ready API with proper error handling.

Series Navigation:

  • Part 1: Introduction and Setup
  • Part 2: Path and Query Parameters
  • Part 3: Request Bodies and Pydantic
  • Part 4: Response Models and Status Codes
  • Part 5: Dependency Injection
  • Part 6: Database Integration (You are here)
  • Part 7: CRUD Operations

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.