Python 6 min read

FastAPI Tutorial Part 10: Role-Based Access Control (RBAC)

Implement RBAC in FastAPI. Learn user roles, permissions, scopes, and granular access control for secure multi-tenant applications.

MR

Moshiour Rahman

Advertisement

Understanding RBAC

Role-Based Access Control assigns permissions to roles, and roles to users. This provides a scalable way to manage access in applications.

User -> Role -> Permissions
  │       │          │
  └──────►│──────────►│
         admin    [create, read, update, delete]
         editor   [create, read, update]
         viewer   [read]

Database Models

# app/models/rbac.py
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, Table
from sqlalchemy.orm import relationship

from ..database import Base

# Association tables
# Many-to-many link between users and roles.
# Both foreign keys together form the primary key, so the same role cannot be
# stored twice for the same user.
user_roles = Table(
    "user_roles",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
)

# Many-to-many link between roles and permissions.
# Both foreign keys together form the primary key, so the same permission
# cannot be stored twice for the same role.
role_permissions = Table(
    "role_permissions",
    Base.metadata,
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
    Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)

class Permission(Base):
    """ORM model for a single named permission, stored in the ``permissions`` table."""
    __tablename__ = "permissions"

    id = Column(Integer, primary_key=True)
    # Unique permission identifier, limited to 50 characters.
    name = Column(String(50), unique=True)  # e.g., "users:read"
    # Free-text explanation of the permission, limited to 200 characters.
    description = Column(String(200))

class Role(Base):
    """ORM model for a named role, stored in the ``roles`` table."""
    __tablename__ = "roles"

    id = Column(Integer, primary_key=True)
    # Unique role identifier, limited to 50 characters.
    name = Column(String(50), unique=True)  # e.g., "admin"
    description = Column(String(200))

    # Permissions granted by this role, linked through ``role_permissions``.
    # No ``back_populates`` is given, so the link is declared on this side only.
    permissions = relationship("Permission", secondary=role_permissions)
    # Users holding this role, linked through ``user_roles`` and paired with
    # the ``roles`` attribute named in ``back_populates``.
    users = relationship("User", secondary=user_roles, back_populates="roles")

class User(Base):
    """ORM model for an account, stored in the ``users`` table.

    A user holds no permissions directly; they are derived from the
    permissions attached to each of the user's roles.
    """
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True)
    username = Column(String, unique=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)

    # Roles assigned to this user, linked through ``user_roles``.
    roles = relationship("Role", secondary=user_roles, back_populates="users")

    @property
    def permissions(self) -> set[str]:
        """Return the names of every permission granted by any assigned role."""
        return {perm.name for role in self.roles for perm in role.permissions}

    def has_permission(self, permission: str) -> bool:
        """Return True when ``permission`` is among the user's permission names."""
        return permission in self.permissions

    def has_role(self, role_name: str) -> bool:
        """Return True when one of the user's roles is named ``role_name``."""
        return any(r.name == role_name for r in self.roles)

Permission Checking

Permission Dependencies

# app/auth/permissions.py
from fastapi import Depends, HTTPException, status
from ..auth.oauth2 import get_current_active_user
from ..models.user import User

class PermissionChecker:
    """Callable dependency that enforces a fixed list of permission names.

    Build one instance per requirement and hand it to ``Depends``.
    """

    def __init__(self, required_permissions: list[str]):
        self.required_permissions = required_permissions

    def __call__(self, user: User = Depends(get_current_active_user)) -> User:
        """Return ``user`` when it holds every required permission.

        Raises:
            HTTPException: 403, naming the first required permission that
                ``user.has_permission`` rejects.
        """
        for needed in self.required_permissions:
            if user.has_permission(needed):
                continue
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {needed} required"
            )
        return user

# Usage
# Reusable checker instances, each demanding a single permission name.
require_read = PermissionChecker(["items:read"])
require_write = PermissionChecker(["items:write"])
require_admin = PermissionChecker(["admin"])

@app.get("/items")
def list_items(user: User = Depends(require_read)):
    """Return ``get_items()``; access is guarded by the ``require_read`` dependency."""
    # ``user`` is not used in the body; the parameter exists to attach the check.
    return get_items()

@app.post("/items")
def create_item(item: ItemCreate, user: User = Depends(require_write)):
    """Pass ``item`` to ``create_new_item``; guarded by the ``require_write`` dependency."""
    return create_new_item(item)

Role Dependencies

def require_role(role_name: str):
    """Build a dependency that only admits users holding ``role_name``.

    The returned callable receives the current active user, hands it back when
    ``user.has_role(role_name)`` is true, and raises a 403 ``HTTPException``
    otherwise.
    """
    def role_checker(user: User = Depends(get_current_active_user)) -> User:
        if user.has_role(role_name):
            return user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Role '{role_name}' required"
        )

    return role_checker

@app.get("/admin/dashboard")
def admin_dashboard(admin: User = Depends(require_role("admin"))):
    """Return a static message; only reachable when the "admin" role check passes."""
    return {"message": "Admin area"}

Multiple Permissions (AND/OR)

def require_all_permissions(permissions: list[str]):
    """Build a dependency that admits only users holding every listed permission.

    On failure the 403 response lists all permissions the user lacks.
    """
    def checker(user: User = Depends(get_current_active_user)) -> User:
        lacking = [name for name in permissions if not user.has_permission(name)]
        if not lacking:
            return user
        raise HTTPException(
            status_code=403,
            detail=f"Missing permissions: {lacking}"
        )

    return checker

def require_any_permission(permissions: list[str]):
    """Build a dependency that admits users holding at least one listed permission.

    An empty ``permissions`` list admits nobody.
    """
    def checker(user: User = Depends(get_current_active_user)) -> User:
        # Stop at the first permission the user holds.
        for candidate in permissions:
            if user.has_permission(candidate):
                return user
        raise HTTPException(
            status_code=403,
            detail=f"Requires one of: {permissions}"
        )

    return checker

# Usage
@app.delete("/items/{id}")
def delete_item(
    # NOTE(review): ``id`` shadows the builtin, but it mirrors the ``{id}``
    # placeholder in the route path, so renaming it means changing the path too.
    id: int,
    # Passes when the user holds either "items:delete" or "admin".
    user: User = Depends(require_any_permission(["items:delete", "admin"]))
):
    """Delete the item via ``delete_item_by_id`` and return its result."""
    return delete_item_by_id(id)

Resource-Based Permissions

Owner Check

def require_owner_or_admin(resource_getter):
    """Build a dependency that loads a resource and enforces owner-or-admin access.

    Args:
        resource_getter: Callable invoked as ``resource_getter(db, resource_id)``.
            A falsy result is treated as "not found".

    Returns:
        A dependency that returns the loaded resource. It raises a 404
        ``HTTPException`` when nothing is found, and a 403 when the user is
        neither the resource's ``owner_id`` nor holds the "admin" role.
    """
    # Plain ``def`` on purpose: ``resource_getter`` is called synchronously, so
    # the dependency should not be a coroutine that would hold up the event
    # loop while the lookup runs.
    def checker(
        resource_id: int,
        user: User = Depends(get_current_active_user),
        db: Session = Depends(get_db)
    ):
        resource = resource_getter(db, resource_id)
        if not resource:
            raise HTTPException(status_code=404, detail="Not found")

        if resource.owner_id != user.id and not user.has_role("admin"):
            raise HTTPException(status_code=403, detail="Not authorized")

        return resource
    return checker

# Owner-or-admin dependency that uses ``get_item`` to load the resource.
get_item_if_authorized = require_owner_or_admin(get_item)

# The path placeholder is named ``resource_id`` to match the parameter declared
# by the checker that ``require_owner_or_admin`` returns.
@app.put("/items/{resource_id}")
def update_item(
    item_update: ItemUpdate,
    # Receives the resource returned by the dependency, not a user object.
    item = Depends(get_item_if_authorized)
):
    """Apply ``item_update`` to the authorized item via ``update_item_in_db``."""
    return update_item_in_db(item, item_update)

Complete RBAC System

# app/auth/rbac.py
from fastapi import Depends, HTTPException, status
from typing import List, Optional, Callable
from enum import Enum
from ..models.user import User
from .oauth2 import get_current_active_user

class Permission(str, Enum):
    """Permission strings known to the application, as a ``str``-based enum."""
    # Users
    USERS_READ = "users:read"
    USERS_WRITE = "users:write"
    USERS_DELETE = "users:delete"

    # Items
    ITEMS_READ = "items:read"
    ITEMS_WRITE = "items:write"
    ITEMS_DELETE = "items:delete"

    # Admin
    ADMIN = "admin"

class Role(str, Enum):
    """Role names known to the application, as a ``str``-based enum."""
    ADMIN = "admin"
    EDITOR = "editor"
    VIEWER = "viewer"

# Static mapping from each role to the permission strings it grants.
ROLE_PERMISSIONS = {
    # Admin receives every value declared on the Permission enum.
    Role.ADMIN: [perm.value for perm in Permission],
    Role.EDITOR: [
        perm.value
        for perm in (
            Permission.ITEMS_READ,
            Permission.ITEMS_WRITE,
            Permission.USERS_READ,
        )
    ],
    Role.VIEWER: [
        perm.value
        for perm in (Permission.ITEMS_READ, Permission.USERS_READ)
    ],
}

class RBACManager:
    """Static helpers that resolve permissions from ``ROLE_PERMISSIONS`` and
    build route dependencies for permission and role checks."""

    @staticmethod
    def get_user_permissions(user: User) -> set:
        """Return the union of permission strings mapped to the user's roles.

        Roles whose name has no entry in ``ROLE_PERMISSIONS`` contribute nothing.
        """
        permissions = set()
        for role in user.roles:
            # ``Role`` is a ``str`` enum, so the plain role name matches the
            # enum key; one ``get`` replaces the membership test and re-lookup.
            permissions.update(ROLE_PERMISSIONS.get(role.name, ()))
        return permissions

    @staticmethod
    def has_permission(user: User, permission: str) -> bool:
        """Return True when ``permission`` is among the user's resolved permissions."""
        return permission in RBACManager.get_user_permissions(user)

    @staticmethod
    def require_permissions(*permissions: str):
        """Build a dependency that admits only users holding all ``permissions``.

        The dependency returns the user on success and raises a 403
        ``HTTPException`` whose detail lists the required and missing
        permissions otherwise.
        """
        def dependency(user: User = Depends(get_current_active_user)):
            user_perms = RBACManager.get_user_permissions(user)
            missing = set(permissions) - user_perms

            if missing:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail={
                        "message": "Insufficient permissions",
                        "required": list(permissions),
                        # Sorted so the response does not depend on set
                        # iteration order.
                        "missing": sorted(missing)
                    }
                )
            return user
        return dependency

    @staticmethod
    def require_roles(*roles: str):
        """Build a dependency that admits users holding at least one of ``roles``.

        The dependency returns the user on success and raises a 403
        ``HTTPException`` otherwise.
        """
        def dependency(user: User = Depends(get_current_active_user)):
            user_roles = {r.name for r in user.roles}
            if not user_roles.intersection(roles):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Requires role: {roles}"
                )
            return user
        return dependency

# Convenience dependencies
# Admin role only.
RequireAdmin = RBACManager.require_roles(Role.ADMIN.value)
# Passes for either the admin or the editor role.
RequireEditor = RBACManager.require_roles(Role.ADMIN.value, Role.EDITOR.value)
# Permission-based checks for reading and writing items.
RequireItemsRead = RBACManager.require_permissions(Permission.ITEMS_READ.value)
RequireItemsWrite = RBACManager.require_permissions(Permission.ITEMS_WRITE.value)

# app/routers/items.py
from ..auth.rbac import RequireItemsRead, RequireItemsWrite, RequireAdmin

# Router for the item endpoints; every route below is registered under "/items".
router = APIRouter(prefix="/items", tags=["Items"])

@router.get("/")
def list_items(user: User = Depends(RequireItemsRead)):
    """Return all items under the "items" key; guarded by ``RequireItemsRead``."""
    return {"items": get_all_items()}

@router.post("/")
def create_item(item: ItemCreate, user: User = Depends(RequireItemsWrite)):
    """Create an item owned by the calling user; guarded by ``RequireItemsWrite``."""
    return create_new_item(item, owner_id=user.id)

@router.delete("/{item_id}")
def delete_item(item_id: int, user: User = Depends(RequireAdmin)):
    """Delete the item with ``item_id``; guarded by ``RequireAdmin``."""
    return delete_item_by_id(item_id)

Scopes (OAuth2 Style)

from fastapi.security import OAuth2PasswordBearer, SecurityScopes

# Bearer-token scheme whose tokens are issued at "auth/login".
# ``scopes`` maps each scope name to its human-readable description.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="auth/login",
    scopes={
        "items:read": "Read items",
        "items:write": "Create and modify items",
        "users:read": "Read user profiles",
        "admin": "Admin access"
    }
)

def get_current_user(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the user behind a bearer token and enforce the requested scopes.

    Raises:
        HTTPException: 401 when the token yields no payload or no user is
            found for its ``sub`` claim; 403 when a scope in
            ``security_scopes`` is absent from the token's ``scopes`` claim.
    """
    required = security_scopes.scopes
    # Advertise the required scopes in the challenge header when there are any.
    authenticate_value = (
        f'Bearer scope="{security_scopes.scope_str}"' if required else "Bearer"
    )
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value}
    )

    # A missing payload and an unknown user are reported identically.
    payload = decode_token(token)
    user = get_user_by_id(db, payload.get("sub")) if payload else None
    if not user:
        raise credentials_exception

    granted = payload.get("scopes", [])
    if any(scope not in granted for scope in required):
        raise HTTPException(
            status_code=403,
            detail="Not enough permissions",
            headers={"WWW-Authenticate": authenticate_value}
        )

    return user

# Usage with Security
from fastapi import Security

@app.get("/items")
def list_items(user: User = Security(get_current_user, scopes=["items:read"])):
    """Return ``get_items()``; the route declares the "items:read" scope."""
    return get_items()

@app.post("/items")
def create_item(
    item: ItemCreate,
    user: User = Security(get_current_user, scopes=["items:write"])
):
    """Pass ``item`` to ``create_new_item``; the route declares the "items:write" scope."""
    return create_new_item(item)

Summary

| Pattern | Use Case |
| --- | --- |
| Role check | Simple role verification |
| Permission check | Granular access control |
| Resource ownership | User owns the resource |
| Scopes | OAuth2-style permissions |

| Component | Description |
| --- | --- |
| Role | Named group of permissions |
| Permission | Single access right |
| Scope | OAuth2 permission string |

Next Steps

In Part 11, we’ll explore Background Tasks and Celery - handling long-running tasks asynchronously.

Series Navigation:

  • Part 1-9: Foundations & Authentication
  • Part 10: RBAC (You are here)
  • Part 11: Background Tasks

Advertisement

MR

Moshiour Rahman

Software Architect & AI Engineer

Share:
MR

Moshiour Rahman

Software Architect & AI Engineer

Enterprise software architect with deep expertise in financial systems, distributed architecture, and AI-powered applications. Building large-scale systems at Fortune 500 companies. Specializing in LLM orchestration, multi-agent systems, and cloud-native solutions. I share battle-tested patterns from real enterprise projects.

Related Articles

Comments

Comments are powered by GitHub Discussions.

Configure Giscus at giscus.app to enable comments.