import secrets from typing import Optional from fastapi import Depends, HTTPException, Request, status from fastapi.responses import RedirectResponse from itsdangerous import BadSignature, URLSafeTimedSerializer from passlib.context import CryptContext from sqlalchemy.orm import Session from config import COOKIE_MAX_AGE, COOKIE_NAME, CSRF_COOKIE from database import get_db from models import User, UserServiceAccess from utils import now_utc from sqlalchemy import select import os _SIGNING_KEY = os.getenv("SIGNING_KEY", secrets.token_urlsafe(32)) serializer = URLSafeTimedSerializer(_SIGNING_KEY, salt="portal-auth") pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(password: str, password_hash: str) -> bool: return pwd_context.verify(password, password_hash) def user_is_valid(user: User) -> bool: return bool(user.active and user.expires_at > now_utc()) def issue_auth_cookie(response: RedirectResponse, user: User) -> None: token = serializer.dumps({"user_id": user.id}) response.set_cookie( key=COOKIE_NAME, value=token, httponly=True, secure=True, samesite="strict", max_age=COOKIE_MAX_AGE, path="/", ) def issue_csrf_cookie(response: RedirectResponse) -> str: token = secrets.token_urlsafe(24) response.set_cookie( key=CSRF_COOKIE, value=token, httponly=False, secure=True, samesite="lax", max_age=COOKIE_MAX_AGE, path="/", ) return token def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]: raw = request.cookies.get(COOKIE_NAME) if not raw: return None try: payload = serializer.loads(raw, max_age=COOKIE_MAX_AGE) except BadSignature: return None user = db.get(User, int(payload["user_id"])) if not user or not user_is_valid(user): return None return user def require_user(user: Optional[User] = Depends(get_current_user)) -> User: if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") return user def require_admin(user: User = Depends(require_user)) -> User: if not user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only") return user def validate_csrf(request: Request) -> None: cookie = request.cookies.get(CSRF_COOKIE) form_val = request.headers.get("X-CSRF-Token") if request.headers.get("content-type", "").startswith("application/x-www-form-urlencoded"): return if not cookie or not form_val or cookie != form_val: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed") def has_access(db: Session, user_id: int, service_id: int) -> bool: q = select(UserServiceAccess).where( UserServiceAccess.user_id == user_id, UserServiceAccess.service_id == service_id, ) return db.scalar(q) is not None