Files
ruslan c8c77048c7 refactor: split main.py into modules (config, database, models, utils, auth, runtime, maintenance)
main.py was ~3000 lines with models, routes, Docker ops, maintenance all mixed.
Split into 7 focused modules:
- config.py: env vars and constants
- database.py: SQLAlchemy engine, SessionLocal, Base, get_db
- models.py: ORM models and enums
- utils.py: logging, formatting, icon handling, misc helpers
- auth.py: password hashing, cookies, CSRF, user dependency
- runtime.py: all Docker operations, pool management, session lifecycle
- maintenance.py: cleanup loop, schema bootstrap, startup logic
- main.py: FastAPI app, middleware, all route handlers only
2026-05-01 09:40:06 +00:00

103 lines
3.0 KiB
Python

import secrets
from typing import Optional
from fastapi import Depends, HTTPException, Request, status
from fastapi.responses import RedirectResponse
from itsdangerous import BadSignature, URLSafeTimedSerializer
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from config import COOKIE_MAX_AGE, COOKIE_NAME, CSRF_COOKIE
from database import get_db
from models import User, UserServiceAccess
from utils import now_utc
from sqlalchemy import select
import os
# Secret used to sign auth cookies. An unset *or empty* SIGNING_KEY falls back
# to a random per-process key: an empty secret would make every signature
# trivially forgeable. With the fallback, previously issued cookies stop
# validating after a restart, so set SIGNING_KEY for persistent sessions.
_SIGNING_KEY = os.getenv("SIGNING_KEY") or secrets.token_urlsafe(32)
# Timed serializer used to sign and verify the auth cookie payload.
serializer = URLSafeTimedSerializer(_SIGNING_KEY, salt="portal-auth")
# Password hashing context; argon2 is the only configured scheme.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(password: str, password_hash: str) -> bool:
    """Check a plaintext *password* against a stored *password_hash*.

    Delegates to ``pwd_context.verify`` and returns its result.
    """
    matches = pwd_context.verify(password, password_hash)
    return matches
def user_is_valid(user: User) -> bool:
    """Return True when *user* is active and ``expires_at`` is later than now."""
    if not user.active:
        return False
    # Only active accounts reach the expiry comparison.
    return bool(user.expires_at > now_utc())
def issue_auth_cookie(response: RedirectResponse, user: User) -> None:
    """Attach a signed authentication cookie for *user* to *response*.

    The cookie value is the serializer-signed payload ``{"user_id": user.id}``.
    """
    signed_value = serializer.dumps({"user_id": user.id})
    cookie_options = {
        "key": COOKIE_NAME,
        "value": signed_value,
        "httponly": True,
        "secure": True,
        "samesite": "strict",
        "max_age": COOKIE_MAX_AGE,
        "path": "/",
    }
    response.set_cookie(**cookie_options)
def issue_csrf_cookie(response: RedirectResponse) -> str:
    """Set a fresh random CSRF cookie on *response* and return its value.

    The cookie is set with ``httponly=False``; ``validate_csrf`` compares it
    against the ``X-CSRF-Token`` request header.
    """
    csrf_value = secrets.token_urlsafe(24)
    cookie_options = {
        "key": CSRF_COOKIE,
        "value": csrf_value,
        "httponly": False,
        "secure": True,
        "samesite": "lax",
        "max_age": COOKIE_MAX_AGE,
        "path": "/",
    }
    response.set_cookie(**cookie_options)
    return csrf_value
def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]:
    """Resolve the user identified by the signed auth cookie, if any.

    Args:
        request: Incoming request whose cookies are inspected.
        db: Database session used to load the user row.

    Returns:
        The matching ``User`` when the cookie is present, its signature
        verifies within ``COOKIE_MAX_AGE``, the payload carries a usable
        ``user_id``, and the user passes ``user_is_valid``; otherwise ``None``.
    """
    raw = request.cookies.get(COOKIE_NAME)
    if not raw:
        return None
    try:
        payload = serializer.loads(raw, max_age=COOKIE_MAX_AGE)
    except BadSignature:
        # Tampered, malformed or expired token: treat as not logged in.
        return None
    try:
        user_id = int(payload["user_id"])
    except (KeyError, TypeError, ValueError):
        # Correctly signed but structurally unexpected payload (e.g. a cookie
        # with a different payload shape): unauthenticated, not a 500.
        return None
    user = db.get(User, user_id)
    if not user or not user_is_valid(user):
        return None
    return user
def require_user(user: Optional[User] = Depends(get_current_user)) -> User:
    """Dependency that returns the authenticated user.

    Raises:
        HTTPException: 401 when no user was resolved from the request.
    """
    if user:
        return user
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized")
def require_admin(user: User = Depends(require_user)) -> User:
    """Dependency that returns the authenticated user only if they are an admin.

    Raises:
        HTTPException: 403 when ``user.is_admin`` is falsy.
    """
    if user.is_admin:
        return user
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only")
def validate_csrf(request: Request) -> None:
    """Reject *request* unless its CSRF cookie matches the ``X-CSRF-Token`` header.

    Requests whose content type starts with
    ``application/x-www-form-urlencoded`` are accepted without any check.

    Raises:
        HTTPException: 403 when the cookie or header is missing or they differ.
    """
    content_type = request.headers.get("content-type", "")
    # NOTE(review): form-urlencoded requests skip the check entirely, yet that
    # is a content type a cross-site HTML form can submit. Presumably this
    # relies on the auth cookie's SameSite setting -- confirm, or validate a
    # token carried in the form body instead.
    if content_type.startswith("application/x-www-form-urlencoded"):
        return
    cookie = request.cookies.get(CSRF_COOKIE)
    header_token = request.headers.get("X-CSRF-Token")
    if not cookie or not header_token:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed")
    # Constant-time comparison so response timing does not reveal how much of
    # the token matched; encode to bytes because compare_digest rejects
    # non-ASCII str arguments.
    if not secrets.compare_digest(cookie.encode("utf-8"), header_token.encode("utf-8")):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed")
def has_access(db: Session, user_id: int, service_id: int) -> bool:
    """Return True if a ``UserServiceAccess`` row links *user_id* to *service_id*."""
    criteria = (
        UserServiceAccess.user_id == user_id,
        UserServiceAccess.service_id == service_id,
    )
    grant_query = select(UserServiceAccess).where(*criteria)
    grant = db.scalar(grant_query)
    return grant is not None