FastAPI Tutorial Part 10: Role-Based Access Control (RBAC)
Implement RBAC in FastAPI. Learn user roles, permissions, scopes, and granular access control for secure multi-tenant applications.
Moshiour Rahman
Advertisement
Understanding RBAC
Role-Based Access Control assigns permissions to roles, and roles to users. This provides a scalable way to manage access in applications.
User -> Role -> Permissions
│ │ │
└──────►│──────────►│
admin [create, read, update, delete]
editor [create, read, update]
viewer [read]
Database Models
# app/models/rbac.py
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import relationship
from ..database import Base
# Association tables
user_roles = Table(
"user_roles",
Base.metadata,
Column("user_id", Integer, ForeignKey("users.id")),
Column("role_id", Integer, ForeignKey("roles.id"))
)
role_permissions = Table(
"role_permissions",
Base.metadata,
Column("role_id", Integer, ForeignKey("roles.id")),
Column("permission_id", Integer, ForeignKey("permissions.id"))
)
class Permission(Base):
__tablename__ = "permissions"
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True) # e.g., "users:read"
description = Column(String(200))
class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True)
name = Column(String(50), unique=True) # e.g., "admin"
description = Column(String(200))
permissions = relationship("Permission", secondary=role_permissions)
users = relationship("User", secondary=user_roles, back_populates="roles")
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String, unique=True)
username = Column(String, unique=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
roles = relationship("Role", secondary=user_roles, back_populates="users")
@property
def permissions(self) -> set:
perms = set()
for role in self.roles:
for perm in role.permissions:
perms.add(perm.name)
return perms
def has_permission(self, permission: str) -> bool:
return permission in self.permissions
def has_role(self, role_name: str) -> bool:
return any(r.name == role_name for r in self.roles)
Permission Checking
Permission Dependencies
# app/auth/permissions.py
from fastapi import Depends, HTTPException, status
from ..auth.oauth2 import get_current_active_user
from ..models.user import User
class PermissionChecker:
def __init__(self, required_permissions: list[str]):
self.required_permissions = required_permissions
def __call__(self, user: User = Depends(get_current_active_user)) -> User:
for permission in self.required_permissions:
if not user.has_permission(permission):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied: {permission} required"
)
return user
# Usage
require_read = PermissionChecker(["items:read"])
require_write = PermissionChecker(["items:write"])
require_admin = PermissionChecker(["admin"])
@app.get("/items")
def list_items(user: User = Depends(require_read)):
return get_items()
@app.post("/items")
def create_item(item: ItemCreate, user: User = Depends(require_write)):
return create_new_item(item)
Role Dependencies
def require_role(role_name: str):
def role_checker(user: User = Depends(get_current_active_user)) -> User:
if not user.has_role(role_name):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Role '{role_name}' required"
)
return user
return role_checker
@app.get("/admin/dashboard")
def admin_dashboard(admin: User = Depends(require_role("admin"))):
return {"message": "Admin area"}
Multiple Permissions (AND/OR)
def require_all_permissions(permissions: list[str]):
"""User must have ALL permissions."""
def checker(user: User = Depends(get_current_active_user)) -> User:
missing = [p for p in permissions if not user.has_permission(p)]
if missing:
raise HTTPException(
status_code=403,
detail=f"Missing permissions: {missing}"
)
return user
return checker
def require_any_permission(permissions: list[str]):
"""User must have AT LEAST ONE permission."""
def checker(user: User = Depends(get_current_active_user)) -> User:
if not any(user.has_permission(p) for p in permissions):
raise HTTPException(
status_code=403,
detail=f"Requires one of: {permissions}"
)
return user
return checker
# Usage
@app.delete("/items/{id}")
def delete_item(
id: int,
user: User = Depends(require_any_permission(["items:delete", "admin"]))
):
return delete_item_by_id(id)
Resource-Based Permissions
Owner Check
def require_owner_or_admin(resource_getter):
"""Check if user owns the resource or is admin."""
async def checker(
resource_id: int,
user: User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
resource = resource_getter(db, resource_id)
if not resource:
raise HTTPException(status_code=404, detail="Not found")
if resource.owner_id != user.id and not user.has_role("admin"):
raise HTTPException(status_code=403, detail="Not authorized")
return resource
return checker
get_item_if_authorized = require_owner_or_admin(get_item)
@app.put("/items/{resource_id}")
def update_item(
item_update: ItemUpdate,
item = Depends(get_item_if_authorized)
):
return update_item_in_db(item, item_update)
Complete RBAC System
# app/auth/rbac.py
from fastapi import Depends, HTTPException, status
from typing import List, Optional, Callable
from enum import Enum
from ..models.user import User
from .oauth2 import get_current_active_user
class Permission(str, Enum):
# Users
USERS_READ = "users:read"
USERS_WRITE = "users:write"
USERS_DELETE = "users:delete"
# Items
ITEMS_READ = "items:read"
ITEMS_WRITE = "items:write"
ITEMS_DELETE = "items:delete"
# Admin
ADMIN = "admin"
class Role(str, Enum):
ADMIN = "admin"
EDITOR = "editor"
VIEWER = "viewer"
ROLE_PERMISSIONS = {
Role.ADMIN: [p.value for p in Permission],
Role.EDITOR: [
Permission.ITEMS_READ.value,
Permission.ITEMS_WRITE.value,
Permission.USERS_READ.value,
],
Role.VIEWER: [
Permission.ITEMS_READ.value,
Permission.USERS_READ.value,
],
}
class RBACManager:
@staticmethod
def get_user_permissions(user: User) -> set:
permissions = set()
for role in user.roles:
if role.name in ROLE_PERMISSIONS:
permissions.update(ROLE_PERMISSIONS[Role(role.name)])
return permissions
@staticmethod
def has_permission(user: User, permission: str) -> bool:
return permission in RBACManager.get_user_permissions(user)
@staticmethod
def require_permissions(*permissions: str):
def dependency(user: User = Depends(get_current_active_user)):
user_perms = RBACManager.get_user_permissions(user)
missing = set(permissions) - user_perms
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"message": "Insufficient permissions",
"required": list(permissions),
"missing": list(missing)
}
)
return user
return dependency
@staticmethod
def require_roles(*roles: str):
def dependency(user: User = Depends(get_current_active_user)):
user_roles = {r.name for r in user.roles}
if not user_roles.intersection(roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires role: {roles}"
)
return user
return dependency
# Convenience dependencies
RequireAdmin = RBACManager.require_roles(Role.ADMIN.value)
RequireEditor = RBACManager.require_roles(Role.ADMIN.value, Role.EDITOR.value)
RequireItemsRead = RBACManager.require_permissions(Permission.ITEMS_READ.value)
RequireItemsWrite = RBACManager.require_permissions(Permission.ITEMS_WRITE.value)
# app/routers/items.py
from ..auth.rbac import RequireItemsRead, RequireItemsWrite, RequireAdmin
router = APIRouter(prefix="/items", tags=["Items"])
@router.get("/")
def list_items(user: User = Depends(RequireItemsRead)):
return {"items": get_all_items()}
@router.post("/")
def create_item(item: ItemCreate, user: User = Depends(RequireItemsWrite)):
return create_new_item(item, owner_id=user.id)
@router.delete("/{item_id}")
def delete_item(item_id: int, user: User = Depends(RequireAdmin)):
return delete_item_by_id(item_id)
Scopes (OAuth2 Style)
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="auth/login",
scopes={
"items:read": "Read items",
"items:write": "Create and modify items",
"users:read": "Read user profiles",
"admin": "Admin access"
}
)
def get_current_user(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value}
)
payload = decode_token(token)
if not payload:
raise credentials_exception
user = get_user_by_id(db, payload.get("sub"))
if not user:
raise credentials_exception
# Check scopes
token_scopes = payload.get("scopes", [])
for scope in security_scopes.scopes:
if scope not in token_scopes:
raise HTTPException(
status_code=403,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value}
)
return user
# Usage with Security
from fastapi import Security
@app.get("/items")
def list_items(user: User = Security(get_current_user, scopes=["items:read"])):
return get_items()
@app.post("/items")
def create_item(
item: ItemCreate,
user: User = Security(get_current_user, scopes=["items:write"])
):
return create_new_item(item)
Summary
| Pattern | Use Case |
|---|---|
| Role check | Simple role verification |
| Permission check | Granular access control |
| Resource ownership | User owns the resource |
| Scopes | OAuth2-style permissions |
| Component | Description |
|---|---|
| Role | Named group of permissions |
| Permission | Single access right |
| Scope | OAuth2 permission string |
Next Steps
In Part 11, we’ll explore Background Tasks and Celery - handling long-running tasks asynchronously.
Series Navigation:
- Part 1-9: Foundations & Authentication
- Part 10: RBAC (You are here)
- Part 11: Background Tasks
Advertisement
Moshiour Rahman
Software Architect & AI Engineer
Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.
Related Articles
FastAPI Tutorial Part 9: JWT Authentication - Secure Your API
Implement JWT authentication in FastAPI. Learn token generation, password hashing, OAuth2 flows, refresh tokens, and protecting API endpoints.
PythonFastAPI Tutorial Part 18: API Security Best Practices
Secure your FastAPI application against common vulnerabilities. Learn input validation, rate limiting, CORS, and OWASP security patterns.
PythonFastAPI Tutorial Part 14: File Uploads and Storage
Handle file uploads in FastAPI. Learn form data, file validation, cloud storage integration with S3, and serving static files.
Comments
Comments are powered by GitHub Discussions.
Configure Giscus at giscus.app to enable comments.