FastAPI Tutorial Part 6: Database Integration with SQLAlchemy
Connect FastAPI to databases using SQLAlchemy. Learn ORM models, relationships, migrations with Alembic, and async database operations.
Moshiour Rahman
Advertisement
Database Options in FastAPI
FastAPI works with any database through various libraries:
| Database | Library | Async Support |
|---|---|---|
| PostgreSQL | SQLAlchemy, asyncpg | Yes |
| MySQL | SQLAlchemy, aiomysql | Yes |
| SQLite | SQLAlchemy, aiosqlite | Yes |
| MongoDB | Motor, Beanie | Yes |
We’ll focus on SQLAlchemy, the most popular Python ORM.
Project Setup
Install Dependencies
pip install sqlalchemy
pip install psycopg2-binary # PostgreSQL
# or
pip install aiosqlite # Async SQLite
Project Structure
app/
├── __init__.py
├── main.py
├── database.py # Database configuration
├── models.py # SQLAlchemy models
├── schemas.py # Pydantic schemas
├── crud.py # Database operations
└── routers/
└── users.py
Database Configuration
Synchronous Setup
# app/database.py
from sqlalchemy import create_engine
# declarative_base is imported from sqlalchemy.orm; the old
# sqlalchemy.ext.declarative location is deprecated.
from sqlalchemy.orm import declarative_base, sessionmaker

SQLALCHEMY_DATABASE_URL = "sqlite:///./app.db"
# PostgreSQL: "postgresql://user:password@localhost/dbname"

# "check_same_thread" is an argument of the sqlite3 driver only, so it is
# passed solely for SQLite URLs; switching the URL above to another database
# then works without touching the engine call.
_connect_args = (
    {"check_same_thread": False}
    if SQLALCHEMY_DATABASE_URL.startswith("sqlite")
    else {}
)

engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args=_connect_args)

# Session factory: sessions do not autoflush and are bound to the engine above.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that every ORM model inherits from.
Base = declarative_base()
# Dependency
def get_db():
    """Yield a session from ``SessionLocal`` and close it afterwards.

    Written as a generator so it can be used as a FastAPI dependency: the
    session is closed once the consumer is finished, even if it raised.
    """
    with SessionLocal() as session:
        yield session
Async Setup
# app/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "sqlite+aiosqlite:///./app.db"
# PostgreSQL: "postgresql+asyncpg://user:password@localhost/dbname"

# echo=True makes the engine log the SQL it emits.
engine = create_async_engine(DATABASE_URL, echo=True)

# Factory producing AsyncSession objects bound to the async engine.
# expire_on_commit=False keeps loaded attributes usable after a commit.
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Base class that every ORM model inherits from.
Base = declarative_base()
async def get_db():
    """Yield an ``AsyncSession`` created by ``async_session``.

    The session lives inside an ``async with`` block, so it is released when
    the consumer of this generator is finished.
    """
    async with async_session() as db:
        yield db
SQLAlchemy Models
Basic Model
# app/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Float
from sqlalchemy.sql import func
from .database import Base
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, index=True, primary_key=True)

    # Required, unique and indexed login identifiers.
    email = Column(String, nullable=False, index=True, unique=True)
    username = Column(String, nullable=False, index=True, unique=True)

    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)

    # created_at is filled in by the database on insert; updated_at is only
    # populated when a row is updated.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
Model with Relationships
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Table
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
# Many-to-many association table linking posts and tags.
# Both columns form a composite primary key, so the same tag cannot be
# attached to the same post more than once.
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
class User(Base):
    """ORM model for ``users``, including the one-to-many link to posts."""

    __tablename__ = "users"

    id = Column(Integer, index=True, primary_key=True)
    email = Column(String, index=True, unique=True)
    username = Column(String, index=True, unique=True)

    # One-to-many: paired with Post.author.
    posts = relationship("Post", back_populates="author")
class Post(Base):
    """ORM model mapped to the ``posts`` table."""

    __tablename__ = "posts"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)
    content = Column(String)
    author_id = Column(Integer, ForeignKey("users.id"))
    # Filled in by the database on insert. The PostResponse schema declares
    # created_at as a required field, so the model has to provide it.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # Relationships
    author = relationship("User", back_populates="posts")
    tags = relationship("Tag", secondary=post_tags, back_populates="posts")
    comments = relationship("Comment", back_populates="post")
class Tag(Base):
    """ORM model mapped to the ``tags`` table."""

    __tablename__ = "tags"

    id = Column(Integer, index=True, primary_key=True)
    name = Column(String, index=True, unique=True)

    # Many-to-many through post_tags; paired with Post.tags.
    posts = relationship("Post", back_populates="tags", secondary=post_tags)
class Comment(Base):
    """ORM model mapped to the ``comments`` table."""

    __tablename__ = "comments"

    id = Column(Integer, index=True, primary_key=True)
    content = Column(String)
    post_id = Column(Integer, ForeignKey("posts.id"))

    # Many-to-one: paired with Post.comments.
    post = relationship("Post", back_populates="comments")
Pydantic Schemas
Separate Input/Output Schemas
# app/schemas.py
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime
# User schemas
# Fields shared by the user schemas that derive from this class.
class UserBase(BaseModel):
    email: EmailStr
    username: str


# Request body for creating a user: base fields plus the plain-text password.
class UserCreate(UserBase):
    password: str


# Request body for a partial update: every field is optional and defaults to
# None, so callers send only what they want to change.
class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    username: Optional[str] = None
    is_active: Optional[bool] = None
# Response schema for a user. The password is not part of it.
class UserResponse(UserBase):
    # Pydantic v2 configuration: allow building this schema from ORM objects
    # by reading their attributes. This replaces the deprecated nested
    # `class Config`.
    model_config = {"from_attributes": True}

    id: int
    is_active: bool
    created_at: datetime
# Post schemas
# Fields shared by the post schemas that derive from this class.
class PostBase(BaseModel):
    title: str
    content: str


# Request body for creating a post; tag_ids defaults to an empty list.
class PostCreate(PostBase):
    tag_ids: List[int] = []
# Response schema for a post.
class PostResponse(PostBase):
    # Pydantic v2 configuration: allow building this schema from ORM objects
    # by reading their attributes. This replaces the deprecated nested
    # `class Config`.
    model_config = {"from_attributes": True}

    id: int
    author_id: int
    created_at: datetime


# Post response that also embeds the author as a nested UserResponse.
class PostWithAuthor(PostResponse):
    author: UserResponse
CRUD Operations
Basic CRUD Functions
# app/crud.py
from sqlalchemy.orm import Session
from sqlalchemy import select
from . import models, schemas
from passlib.context import CryptContext
# Shared passlib context used to hash passwords with bcrypt.
pwd_context = CryptContext(deprecated="auto", schemes=["bcrypt"])
# User CRUD
def get_user(db: Session, user_id: int):
    """Return the user whose id equals ``user_id``, or ``None`` if none matches."""
    matches_id = models.User.id == user_id
    return db.query(models.User).filter(matches_id).first()
def get_user_by_email(db: Session, email: str):
    """Return the first user whose email equals ``email``, or ``None``."""
    matches_email = models.User.email == email
    return db.query(models.User).filter(matches_email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of users.

    Args:
        db: Session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``models.User`` objects ordered by id.
    """
    # OFFSET/LIMIT is only stable with an explicit ordering; without one the
    # database may return rows in any order, so pages could overlap or skip.
    return (
        db.query(models.User)
        .order_by(models.User.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
def create_user(db: Session, user: schemas.UserCreate):
    """Insert a new user and return the refreshed ORM object.

    Only the hash produced by ``pwd_context`` is stored, never the plain-text
    password.
    """
    new_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=pwd_context.hash(user.password),
    )
    db.add(new_user)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(new_user)
    return new_user
def update_user(db: Session, user_id: int, user: schemas.UserUpdate):
    """Apply the fields set on ``user`` to the stored user.

    Returns the refreshed ORM object, or ``None`` when ``get_user`` finds no
    user with ``user_id``.
    """
    target = get_user(db, user_id)
    if not target:
        return None

    # exclude_unset=True keeps only the fields the caller actually supplied.
    changes = user.model_dump(exclude_unset=True)
    for name in changes:
        setattr(target, name, changes[name])

    db.commit()
    db.refresh(target)
    return target
def delete_user(db: Session, user_id: int):
    """Delete the user with ``user_id`` and return what ``get_user`` found.

    When no user is found nothing is deleted and the lookup result is
    returned unchanged.
    """
    doomed = get_user(db, user_id)
    if not doomed:
        return doomed
    db.delete(doomed)
    db.commit()
    return doomed
Async CRUD Functions
# app/crud_async.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import selectinload
async def get_user(db: AsyncSession, user_id: int):
    """Return the user whose id equals ``user_id``, or ``None`` if none matches."""
    stmt = select(models.User).where(models.User.id == user_id)
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100):
    """Return one page of users.

    Args:
        db: Async session used to run the query.
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows to return.

    Returns:
        The ``models.User`` objects of the page, ordered by id.
    """
    # OFFSET/LIMIT is only stable with an explicit ordering; without one the
    # database may return rows in any order, so pages could overlap or skip.
    result = await db.execute(
        select(models.User)
        .order_by(models.User.id)
        .offset(skip)
        .limit(limit)
    )
    return result.scalars().all()
async def create_user(db: AsyncSession, user: schemas.UserCreate):
    """Insert a new user and return the refreshed ORM object.

    The password is passed through ``pwd_context.hash`` before it is stored.
    NOTE(review): ``pwd_context``, ``models`` and ``schemas`` are not imported
    in this snippet -- confirm they are in scope in the real module.
    """
    password_hash = pwd_context.hash(user.password)
    new_user = models.User(
        email=user.email,
        username=user.username,
        hashed_password=password_hash,
    )
    db.add(new_user)
    await db.commit()
    # Reload the row so database-generated values are present on the object.
    await db.refresh(new_user)
    return new_user
# With eager loading for relationships
async def get_post_with_author(db: AsyncSession, post_id: int):
    """Return the post with ``post_id`` and its author loaded, or ``None``."""
    # selectinload fetches the author as part of this call instead of on
    # first attribute access.
    stmt = (
        select(models.Post)
        .options(selectinload(models.Post.author))
        .where(models.Post.id == post_id)
    )
    rows = await db.execute(stmt)
    return rows.scalar_one_or_none()
FastAPI Integration
Main Application
# app/main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from . import crud, models, schemas
from .database import engine, get_db
# Create tables
# Create every table registered on Base.metadata when the module is imported.
models.Base.metadata.create_all(engine)

app = FastAPI()
# Create a user.
# Responds 400 when the email is already registered, and also when the insert
# violates a database constraint (for example a duplicate username, or an
# email registered by a concurrent request after the pre-check ran).
@app.post("/users/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    from sqlalchemy.exc import IntegrityError

    if crud.get_user_by_email(db, email=user.email):
        raise HTTPException(status_code=400, detail="Email already registered")

    try:
        return crud.create_user(db=db, user=user)
    except IntegrityError:
        # Roll back the failed transaction so the session stays usable, and
        # report a client error instead of an unhandled 500.
        db.rollback()
        raise HTTPException(
            status_code=400,
            detail="Email or username already registered",
        ) from None
# List users; skip and limit are forwarded unchanged to crud.get_users.
@app.get("/users/", response_model=List[schemas.UserResponse])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users
# Fetch a single user by id; responds 404 when crud.get_user returns None.
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def read_user(user_id: int, db: Session = Depends(get_db)):
    found = crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
# Update a user; responds 404 when crud.update_user returns None.
@app.put("/users/{user_id}", response_model=schemas.UserResponse)
def update_user(
    user_id: int,
    user: schemas.UserUpdate,
    db: Session = Depends(get_db),
):
    updated = crud.update_user(db, user_id, user)
    if updated is not None:
        return updated
    raise HTTPException(status_code=404, detail="User not found")
# Delete a user; responds 404 when crud.delete_user returns None.
@app.delete("/users/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
    removed = crud.delete_user(db, user_id)
    if removed is not None:
        return {"message": "User deleted"}
    raise HTTPException(status_code=404, detail="User not found")
Async Endpoints
from sqlalchemy.ext.asyncio import AsyncSession
from .database import get_db # async version
# Async variant: awaits crud.get_user and responds 404 when it returns None.
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
async def read_user(user_id: int, db: AsyncSession = Depends(get_db)):
    found = await crud.get_user(db, user_id=user_id)
    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="User not found")
Query Patterns
Filtering
def get_users_by_status(db: Session, is_active: bool):
    """Return all users whose ``is_active`` column equals ``is_active``."""
    status_matches = models.User.is_active == is_active
    return db.query(models.User).filter(status_matches).all()
def search_users(db: Session, query: str):
    """Return users whose username contains ``query``, ignoring case.

    ``query`` is matched literally: the LIKE wildcards ``%`` and ``_`` (and
    the escape character itself) are escaped first, so a search for ``"_"``
    or ``"%"`` no longer matches every user.

    Args:
        db: Session used to run the query.
        query: Text to look for inside the username.

    Returns:
        A list of matching ``models.User`` objects.
    """
    # Escape the backslash first so the escapes added below are not doubled.
    literal = (
        query.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    return db.query(models.User).filter(
        models.User.username.ilike(f"%{literal}%", escape="\\")
    ).all()
def get_users_by_ids(db: Session, user_ids: List[int]):
    """Return all users whose id is contained in ``user_ids``."""
    id_in_list = models.User.id.in_(user_ids)
    return db.query(models.User).filter(id_in_list).all()
Ordering and Pagination
from sqlalchemy import desc, asc
def get_users_sorted(db: Session, sort_by: str = "created_at", order: str = "desc"):
    """Return all users sorted by one of the ``User`` table columns.

    Args:
        db: Session used to run the query.
        sort_by: Name of the column to sort by. It must be a column of the
            ``users`` table; ``hashed_password`` is rejected.
        order: ``"desc"`` for descending; any other value sorts ascending.

    Returns:
        A list of ``models.User`` objects in the requested order.

    Raises:
        ValueError: If ``sort_by`` is not an allowed column name.
    """
    # sort_by typically comes from the caller, so it is checked against the
    # mapped table columns rather than handed to getattr() blindly, which
    # would reach relationships, methods and other non-column attributes.
    sortable = set(models.User.__table__.columns.keys()) - {"hashed_password"}
    if sort_by not in sortable:
        raise ValueError(f"Cannot sort users by {sort_by!r}")

    direction = desc if order == "desc" else asc
    column = getattr(models.User, sort_by)
    return db.query(models.User).order_by(direction(column)).all()
def get_users_paginated(db: Session, page: int = 1, per_page: int = 10):
    """Return one page of users together with pagination metadata.

    Args:
        db: Session used to run the queries.
        page: 1-based page number.
        per_page: Number of users per page.

    Returns:
        A dict with the keys ``items``, ``total``, ``page``, ``per_page`` and
        ``pages`` (the total number of pages).

    Raises:
        ValueError: If ``page`` or ``per_page`` is smaller than 1.
    """
    # page < 1 would produce a negative OFFSET and per_page == 0 would divide
    # by zero in the page count below, so both are rejected up front.
    if page < 1:
        raise ValueError("page must be >= 1")
    if per_page < 1:
        raise ValueError("per_page must be >= 1")

    offset = (page - 1) * per_page
    # An explicit ordering keeps pages stable between requests.
    users = (
        db.query(models.User)
        .order_by(models.User.id)
        .offset(offset)
        .limit(per_page)
        .all()
    )
    total = db.query(models.User).count()
    return {
        "items": users,
        "total": total,
        "page": page,
        "per_page": per_page,
        # Ceiling division: a partially filled last page still counts.
        "pages": (total + per_page - 1) // per_page,
    }
Aggregations
from sqlalchemy import func
def get_user_stats(db: Session):
    """Return the number of users and the number of active users."""
    id_count = func.count(models.User.id)
    total = db.query(id_count).scalar()
    # `== True` builds a SQL comparison here, not a Python boolean.
    active = db.query(id_count).filter(
        models.User.is_active == True  # noqa: E712
    ).scalar()
    return {"total_users": total, "active_users": active}
def get_posts_per_user(db: Session):
    """Return ``(username, post_count)`` rows, one per user joined to posts.

    Users are joined to ``Post`` with a plain ``join`` and grouped by user id.
    """
    post_count = func.count(models.Post.id).label("post_count")
    per_user = db.query(models.User.username, post_count)
    per_user = per_user.join(models.Post).group_by(models.User.id)
    return per_user.all()
Joins and Eager Loading
from sqlalchemy.orm import joinedload, selectinload
# Eager load single relationship
def get_user_with_posts(db: Session, user_id: int):
    """Return the user with ``user_id`` and their posts loaded, or ``None``."""
    # joinedload fetches the posts in the same query as the user.
    with_posts = joinedload(models.User.posts)
    users = db.query(models.User).options(with_posts)
    return users.filter(models.User.id == user_id).first()
# Eager load multiple relationships
def get_post_with_details(db: Session, post_id: int):
    """Return the post with ``post_id`` plus author, comments and tags, or ``None``."""
    # The author is loaded with joinedload; the two collections are loaded
    # with selectinload.
    loaders = (
        joinedload(models.Post.author),
        selectinload(models.Post.comments),
        selectinload(models.Post.tags),
    )
    posts = db.query(models.Post).options(*loaders)
    return posts.filter(models.Post.id == post_id).first()
Migrations with Alembic
Setup Alembic
pip install alembic
alembic init alembic
Configure Alembic
# alembic/env.py
from app.database import Base
from app.models import User, Post, Tag, Comment # Import all models
# Metadata object handed to Alembic; the model imports above make sure every
# model class has been defined on Base before it is read.
target_metadata = Base.metadata
# alembic.ini
sqlalchemy.url = sqlite:///./app.db
Migration Commands
# Create migration
alembic revision --autogenerate -m "Initial migration"
# Apply migrations
alembic upgrade head
# Rollback
alembic downgrade -1
# View migration history
alembic history
Example Migration
# alembic/versions/001_initial.py
from alembic import op
import sqlalchemy as sa
def upgrade():
    """Create the ``users`` table.

    The columns mirror the ``User`` model from ``app/models.py``: email,
    username and hashed_password are NOT NULL there, and the model also maps
    an ``updated_at`` column, so both are reflected here. Otherwise querying
    the model against a table created by this migration would fail on the
    missing column.
    """
    op.create_table(
        'users',
        sa.Column('id', sa.Integer(), primary_key=True),
        sa.Column('email', sa.String(), nullable=False, unique=True, index=True),
        sa.Column('username', sa.String(), nullable=False, unique=True, index=True),
        sa.Column('hashed_password', sa.String(), nullable=False),
        sa.Column('is_active', sa.Boolean(), default=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        # Stays NULL until a row is first updated.
        sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
    )
def downgrade():
    """Undo ``upgrade`` by dropping the ``users`` table."""
    op.drop_table(table_name='users')
Complete Example
# app/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
DATABASE_URL = "sqlite:///./blog.db"

# check_same_thread=False is handed to the SQLite driver via connect_args.
engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory bound to the engine above.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Base class that every ORM model inherits from.
Base = declarative_base()
def get_db():
    """Yield a session from ``SessionLocal`` and close it afterwards.

    Written as a generator so it can be used as a FastAPI dependency: the
    session is closed once the consumer is finished, even if it raised.
    """
    with SessionLocal() as session:
        yield session
# app/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from .database import Base
class User(Base):
    """ORM model mapped to the ``users`` table of the blog example."""

    __tablename__ = "users"

    id = Column(Integer, index=True, primary_key=True)
    email = Column(String(255), index=True, unique=True)
    username = Column(String(50), index=True, unique=True)
    hashed_password = Column(String(255))
    bio = Column(Text, nullable=True)
    is_active = Column(Boolean, default=True)
    # Filled in by the database on insert.
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    # One-to-many, paired with Post.author; the cascade setting removes a
    # user's posts together with the user.
    posts = relationship(
        "Post",
        back_populates="author",
        cascade="all, delete-orphan",
    )
class Post(Base):
    """ORM model mapped to the ``posts`` table of the blog example."""

    __tablename__ = "posts"

    id = Column(Integer, index=True, primary_key=True)
    title = Column(String(200), index=True)
    # Unique identifier used to look a post up by URL.
    slug = Column(String(200), index=True, unique=True)
    content = Column(Text)
    published = Column(Boolean, default=False)
    author_id = Column(Integer, ForeignKey("users.id"))

    # created_at is filled in by the database on insert; updated_at is only
    # populated when a row is updated.
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

    # Many-to-one, paired with User.posts.
    author = relationship("User", back_populates="posts")
# app/schemas.py
from pydantic import BaseModel, EmailStr, Field
from typing import Optional, List
from datetime import datetime
# Fields shared by the user schemas; username must be 3-50 characters long.
class UserBase(BaseModel):
    email: EmailStr
    username: str = Field(min_length=3, max_length=50)


# Request body for creating a user: password of at least 8 characters and an
# optional bio that defaults to None.
class UserCreate(UserBase):
    password: str = Field(min_length=8)
    bio: Optional[str] = None
# Response schema for a user. The password is not part of it.
class UserResponse(UserBase):
    # Pydantic v2 configuration: allow building this schema from ORM objects
    # by reading their attributes. This replaces the deprecated nested
    # `class Config`.
    model_config = {"from_attributes": True}

    id: int
    bio: Optional[str]
    is_active: bool
    created_at: datetime
# Fields shared by the post schemas: title of 5-200 characters and content of
# at least 10 characters.
class PostBase(BaseModel):
    title: str = Field(min_length=5, max_length=200)
    content: str = Field(min_length=10)


# Request body for creating a post. The slug may contain only lowercase
# letters, digits and hyphens; posts are unpublished by default.
class PostCreate(PostBase):
    slug: str = Field(pattern=r"^[a-z0-9-]+$")
    published: bool = False
# Response schema for a post.
class PostResponse(PostBase):
    # Pydantic v2 configuration: allow building this schema from ORM objects
    # by reading their attributes. This replaces the deprecated nested
    # `class Config`.
    model_config = {"from_attributes": True}

    id: int
    slug: str
    published: bool
    author_id: int
    created_at: datetime


# Post response that also embeds the author as a nested UserResponse.
class PostWithAuthor(PostResponse):
    author: UserResponse
# app/main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session, joinedload
from typing import List
from . import models, schemas
from .database import engine, get_db
# Create every table registered on Base.metadata when the module is imported.
models.Base.metadata.create_all(engine)

app = FastAPI(title="Blog API")
# Users
# Create a user and respond 201 with the stored record.
# Responds 400 when the email or the username is already in use. Both columns
# are unique in the model, so both are checked, and a constraint violation
# raised by a concurrent insert is reported as 400 as well instead of
# surfacing as an unhandled 500.
@app.post("/users", response_model=schemas.UserResponse, status_code=201)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    from passlib.context import CryptContext
    from sqlalchemy.exc import IntegrityError

    if db.query(models.User).filter(models.User.email == user.email).first():
        raise HTTPException(status_code=400, detail="Email already registered")
    if db.query(models.User).filter(models.User.username == user.username).first():
        raise HTTPException(status_code=400, detail="Username already registered")

    pwd_context = CryptContext(schemes=["bcrypt"])
    db_user = models.User(
        email=user.email,
        username=user.username,
        # Only the bcrypt hash is stored, never the plain-text password.
        hashed_password=pwd_context.hash(user.password),
        bio=user.bio,
    )
    db.add(db_user)
    try:
        db.commit()
    except IntegrityError:
        # Roll back the failed transaction so the session stays usable.
        db.rollback()
        raise HTTPException(
            status_code=400,
            detail="Email or username already registered",
        ) from None
    db.refresh(db_user)
    return db_user
# List users one page at a time.
# Rows are ordered by id: OFFSET/LIMIT without an explicit ordering may return
# rows in any order, so consecutive pages could overlap or skip users.
@app.get("/users", response_model=List[schemas.UserResponse])
def list_users(skip: int = 0, limit: int = 20, db: Session = Depends(get_db)):
    return (
        db.query(models.User)
        .order_by(models.User.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
# Fetch a single user by id; responds 404 when no row matches.
@app.get("/users/{user_id}", response_model=schemas.UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    found = db.query(models.User).filter(models.User.id == user_id).first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="User not found")
# Posts
# Create a post for the given user and respond 201 with the stored record.
# Responds 404 when the user does not exist and 400 when the slug is taken.
@app.post("/users/{user_id}/posts", response_model=schemas.PostResponse, status_code=201)
def create_post(
    user_id: int,
    post: schemas.PostCreate,
    db: Session = Depends(get_db),
):
    author = db.query(models.User).filter(models.User.id == user_id).first()
    if not author:
        raise HTTPException(status_code=404, detail="User not found")

    slug_taken = db.query(models.Post).filter(models.Post.slug == post.slug).first()
    if slug_taken:
        raise HTTPException(status_code=400, detail="Slug already exists")

    # The request fields map one-to-one onto Post columns.
    new_post = models.Post(**post.model_dump(), author_id=user_id)
    db.add(new_post)
    db.commit()
    db.refresh(new_post)
    return new_post
# List posts, newest first, with their authors loaded in the same query.
# With published_only (the default) unpublished posts are filtered out.
@app.get("/posts", response_model=List[schemas.PostWithAuthor])
def list_posts(
    published_only: bool = True,
    skip: int = 0,
    limit: int = 20,
    db: Session = Depends(get_db),
):
    posts = db.query(models.Post).options(joinedload(models.Post.author))
    if published_only:
        # `== True` builds a SQL comparison here, not a Python boolean.
        posts = posts.filter(models.Post.published == True)  # noqa: E712
    newest_first = posts.order_by(models.Post.created_at.desc())
    return newest_first.offset(skip).limit(limit).all()
# Fetch a single post by slug with its author; responds 404 when none matches.
@app.get("/posts/{slug}", response_model=schemas.PostWithAuthor)
def get_post(slug: str, db: Session = Depends(get_db)):
    with_author = joinedload(models.Post.author)
    found = (
        db.query(models.Post)
        .options(with_author)
        .filter(models.Post.slug == slug)
        .first()
    )
    if found:
        return found
    raise HTTPException(status_code=404, detail="Post not found")
Summary
| Component | Purpose |
|---|---|
| `engine` | Database connection |
| `SessionLocal` | Session factory |
| `Base` | Declarative base for models |
| `get_db` | Dependency for session management |
| `relationship()` | Define model relationships |
| Alembic | Database migrations |
| Query Method | Use Case |
|---|---|
| `filter()` | WHERE clause |
| `order_by()` | Sorting |
| `offset/limit` | Pagination |
| `joinedload()` | Eager load relationships |
| `func.count()` | Aggregations |
Next Steps
In Part 7, we’ll build a Complete CRUD API - combining everything we’ve learned into a production-ready API with proper error handling.
Series Navigation:
- Part 1: Introduction and Setup
- Part 2: Path and Query Parameters
- Part 3: Request Bodies and Pydantic
- Part 4: Response Models and Status Codes
- Part 5: Dependency Injection
- Part 6: Database Integration (You are here)
- Part 7: CRUD Operations
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
SQLAlchemy 2.0: Complete Python ORM Guide
Master SQLAlchemy 2.0 for Python database operations. Learn ORM, async support, relationships, migrations, and build robust data layers.
PythonFastAPI Tutorial Part 14: File Uploads and Storage
Handle file uploads in FastAPI. Learn form data, file validation, cloud storage integration with S3, and serving static files.
PythonFastAPI Tutorial Part 11: Background Tasks and Celery
Handle long-running operations in FastAPI. Learn built-in BackgroundTasks, Celery integration, task queues, and async processing patterns.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.