# You'll have to create and import
# your own user models, functions, db
# and other dependencies.
# Personal recommendation is to use
# FastAPI Users and SQLAlchemy if you
# don't need a fully customized user
# management system.
from cryptography.fernet import Fernet
from database import async_session_maker
from users import (
User,
UserRead,
UserCreate,
fastapi_users,
auth_backend,
current_active_user
)
# Import FastAPI and Two-Fast-Auth
from fastapi import (
FastAPI,
Body,
Depends,
Header,
HTTPException,
status
)
from fastapi.responses import StreamingResponse
from two_fast_auth import (
TwoFactorMiddleware,
TwoFactorAuth
)
# Generate or use a persistent encryption key
ENCRYPTION_KEY = Fernet.generate_key()
# Create FastAPI instance
app = FastAPI()
# Get encrypted secret from database
async def get_encrypted_secret(user_id: str) -> str:
async with async_session_maker() as session:
user = await session.get(User, user_id)
return user.encrypted_secret if user else None
# Middleware with encryption
app.add_middleware(
TwoFactorMiddleware,
get_user_secret_callback=lambda uid: get_encrypted_secret(uid),
encryption_key=ENCRYPTION_KEY,
excluded_paths=[
"/docs", # Swagger UI
"/openapi.json", # OpenAPI JSON
"/redoc", # Redoc UI
"/auth/jwt/login", # FastAPI Users login
"/auth/register", # FastAPI Users register
"/setup-2fa", # Example endpoint
"/verify-2fa", # Example endpoint
"/recovery-codes" # Example endpoint
]
)
# FastAPI Users Routers
## Auth Router
app.include_router(
fastapi_users.get_auth_router(
auth_backend
),
prefix="/auth/jwt",
tags=["Auth"],
)
## Register Router
app.include_router(
fastapi_users.get_register_router(
UserRead,
UserCreate
),
prefix="/auth",
tags=["Auth"],
)
# Endpoints examples
## Setup 2FA with Headers
@app.post(
"/setup-2fa",
tags=["Auth"]
)
async def setup_2fa(
user: User = Depends(current_active_user)
):
tfa = TwoFactorAuth()
encrypted_secret = TwoFactorAuth.encrypt_secret(
tfa.secret,
ENCRYPTION_KEY
)
recovery_codes = TwoFactorAuth.generate_recovery_codes()
async with async_session_maker() as session:
db_user = await session.get(User, user.id)
db_user.encrypted_secret = encrypted_secret
db_user.recovery_codes = recovery_codes
await session.commit()
return StreamingResponse(
tfa.generate_qr_code(user.email),
media_type="image/png",
headers={
"X-Encrypted-Secret": encrypted_secret,
"X-Recovery-Codes": ",".join(recovery_codes)
}
)
## Verification with Manual Decryption
@app.post(
"/verify-2fa",
tags=["Auth"]
)
async def verify_2fa(
code: str = Body(...),
user: User = Depends(current_active_user)
):
async with async_session_maker() as session:
db_user = await session.get(User, user.id)
if not db_user.encrypted_secret:
raise HTTPException(
status_code=400,
detail="2FA not set up"
)
secret = TwoFactorAuth.decrypt_secret(
db_user.encrypted_secret,
ENCRYPTION_KEY
)
if not TwoFactorAuth(secret).verify_code(code):
raise HTTPException(
status_code=401,
detail="Invalid code"
)
return {"status": "verified"}
## 2FA protected
@app.get(
"/protected-route",
tags=["Protected"]
)
async def protected_route(
user: User = Depends(current_active_user),
x_2fa_code: str = Header(
...,
alias="X-2FA-Code"
)
):
# Explicit 2FA verification for demonstration
if not user.two_fa_secret:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="2FA not configured"
)
if not TwoFactorAuth(user.two_fa_secret).verify_code(x_2fa_code):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid 2FA code"
)
return {
"message": "You've accessed a protected route with valid 2FA!",
"user_id": str(user.id)
}
# Run the API server
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=8000
)