feat: improve session limit handling and add k6 load testing

This commit is contained in:
2026-04-23 05:17:53 +00:00
parent 47f46d5c5b
commit 1438dee21a
6 changed files with 687 additions and 156 deletions
+263 -128
View File
@@ -1,6 +1,7 @@
import datetime as dt
import enum
import fcntl
import json
import re
import logging
import os
@@ -9,13 +10,14 @@ import secrets
import threading
import time
import uuid
import contextvars
from urllib.parse import parse_qs, unquote, urlparse
from typing import Optional
import docker
import requests
from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from itsdangerous import BadSignature, URLSafeTimedSerializer
@@ -44,6 +46,7 @@ COOKIE_MAX_AGE = 8 * 60 * 60
SESSION_IDLE_SECONDS = int(os.getenv("SESSION_IDLE_SECONDS", "300"))
PUBLIC_HOST = os.getenv("PUBLIC_HOST", "stend.4mont.ru")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_SLOW_REQUEST_MS = int(os.getenv("LOG_SLOW_REQUEST_MS", "2000"))
TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik")
PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "0"))
UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "0"))
@@ -63,6 +66,22 @@ logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger("portal")
request_id_ctx = contextvars.ContextVar("request_id", default="-")
def _normalize_log_value(value):
if isinstance(value, (str, int, float, bool)) or value is None:
return value
if isinstance(value, dt.datetime):
return value.isoformat()
return str(value)
def log_event(event: str, level: int = logging.INFO, **fields) -> None:
payload = {"event": event, "req_id": request_id_ctx.get()}
for key, value in fields.items():
payload[key] = _normalize_log_value(value)
logger.log(level, json.dumps(payload, ensure_ascii=False, separators=(",", ":")))
SIGNING_KEY = os.getenv("SIGNING_KEY", secrets.token_urlsafe(32))
serializer = URLSafeTimedSerializer(SIGNING_KEY, salt="portal-auth")
@@ -79,22 +98,51 @@ app.mount("/static", StaticFiles(directory="static"), name="static")
@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
req_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
token = request_id_ctx.set(req_id)
started = time.time()
client_ip = request.client.host if request.client else "-"
user_agent = request.headers.get("user-agent", "-")
try:
response = await call_next(request)
except Exception:
logger.exception("request_failed req_id=%s method=%s path=%s", req_id, request.method, request.url.path)
log_event(
"request_failed",
level=logging.ERROR,
method=request.method,
path=request.url.path,
client_ip=client_ip,
user_agent=user_agent,
)
request_id_ctx.reset(token)
raise
duration_ms = int((time.time() - started) * 1000)
logger.info(
"request req_id=%s method=%s path=%s status=%s duration_ms=%s",
req_id,
request.method,
request.url.path,
response.status_code,
duration_ms,
level = logging.INFO
if response.status_code >= 500:
level = logging.ERROR
elif response.status_code >= 400:
level = logging.WARNING
log_event(
"request",
level=level,
method=request.method,
path=request.url.path,
query=str(request.url.query or ""),
status=response.status_code,
duration_ms=duration_ms,
client_ip=client_ip,
user_agent=user_agent,
)
if duration_ms >= LOG_SLOW_REQUEST_MS:
log_event(
"slow_request",
level=logging.WARNING,
method=request.method,
path=request.url.path,
duration_ms=duration_ms,
threshold_ms=LOG_SLOW_REQUEST_MS,
)
response.headers["X-Request-ID"] = req_id
request_id_ctx.reset(token)
return response
@@ -112,6 +160,7 @@ class SessionStatus(str, enum.Enum):
ACTIVE = "ACTIVE"
EXPIRED = "EXPIRED"
TERMINATED = "TERMINATED"
ROTATED = "ROTATED"
class User(Base):
@@ -196,6 +245,28 @@ def now_utc() -> dt.datetime:
return dt.datetime.now(dt.timezone.utc)
def session_closed_reason(sess: SessionModel, db: Session) -> str:
if not sess:
return "idle"
if sess.status == SessionStatus.EXPIRED:
return "idle"
if sess.status == SessionStatus.ROTATED:
return "limit"
if sess.status == SessionStatus.TERMINATED:
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
active_rows = db.scalars(
select(SessionModel).where(
SessionModel.user_id == sess.user_id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
).all()
active_service_ids = {row.service_id for row in active_rows}
if len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER and sess.service_id not in active_service_ids:
return "limit"
return "idle"
def normalize_web_target(url: str) -> str:
raw = (url or "").strip()
if not raw:
@@ -981,11 +1052,24 @@ def terminate_session_record(
) -> None:
if not sess or sess.status != SessionStatus.ACTIVE:
return
old_status = sess.status
cid = sess.container_id or ""
if stop_container and cid and not cid.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:")):
stop_runtime_container(cid)
sess.status = new_status
sess.last_access_at = now_utc()
log_event(
"session_closed",
level=logging.INFO,
session_id=sess.id,
user_id=sess.user_id,
service_id=sess.service_id,
container_id=cid,
old_status=old_status.value if isinstance(old_status, SessionStatus) else str(old_status),
new_status=new_status.value,
reason=session_closed_reason(sess, db),
stop_container=stop_container,
)
def ensure_schema_compatibility() -> None:
@@ -1005,6 +1089,20 @@ def ensure_schema_compatibility() -> None:
"""
)
)
conn.execute(
text(
"""
DO $$
BEGIN
BEGIN
ALTER TYPE sessionstatus ADD VALUE IF NOT EXISTS 'ROTATED';
EXCEPTION WHEN undefined_object THEN
NULL;
END;
END $$;
"""
)
)
with engine.begin() as conn:
conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0"))
@@ -1430,6 +1528,11 @@ def index(request: Request, user: Optional[User] = Depends(get_current_user), db
session_notice = ""
if session_closed == "idle":
session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново."
elif session_closed == "limit":
session_notice = (
f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
"Освободите один сервис и попробуйте снова."
)
elif launch_error == "max_services":
session_notice = (
f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
@@ -1675,6 +1778,7 @@ def logout(request: Request):
@app.get("/go/{slug}")
def go_service(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
log_event("session_open_requested", user_id=user.id, service_slug=slug)
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
if not service:
raise HTTPException(status_code=404, detail="Service not found")
@@ -1682,142 +1786,149 @@ def go_service(slug: str, user: User = Depends(require_user), db: Session = Depe
raise HTTPException(status_code=410, detail="VNC services are deprecated")
if not has_access(db, user.id, service.id):
raise HTTPException(status_code=403, detail="ACL denied")
with allocator_lock(db, 92000 + int(user.id)):
existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
if existing_user_session:
return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
if existing_user_session:
return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
active_rows = db.scalars(
select(SessionModel).where(
SessionModel.user_id == user.id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
).all()
active_rows = sorted(active_rows, key=lambda row: row.created_at)
active_service_ids = {row.service_id for row in active_rows}
if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
oldest = next((row for row in active_rows if row.service_id != service.id), None)
if oldest:
terminate_session_record(db, oldest, SessionStatus.TERMINATED, stop_container=True)
db.commit()
logger.info(
"session_rotated user_id=%s closed_session=%s old_service_id=%s new_service_id=%s",
user.id,
oldest.id,
oldest.service_id,
service.id,
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
active_rows = db.scalars(
select(SessionModel).where(
SessionModel.user_id == user.id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
else:
return RedirectResponse(url="/?launch_error=max_services", status_code=303)
if service.type == ServiceType.RDP:
active_owner = find_active_session_for_service(db, service.id)
if active_owner:
if active_owner.user_id != user.id:
owner = db.get(User, active_owner.user_id)
owner_name = owner.username if owner else f"id={active_owner.user_id}"
raise HTTPException(
status_code=409,
detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
)
return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
session_id = str(uuid.uuid4())
if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
try:
with allocator_lock(db, 91001):
ensure_web_pool()
slot = acquire_web_pool_slot(db)
slot_cid = f"WEBPOOLIDX:{slot}"
terminate_active_slot_sessions(db, slot_cid)
dispatch_web_pool_target(slot, service)
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=slot_cid,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
).all()
active_rows = sorted(active_rows, key=lambda row: row.created_at)
active_service_ids = {row.service_id for row in active_rows}
if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
oldest = next((row for row in active_rows if row.service_id != service.id), None)
if oldest:
terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True)
db.commit()
except Exception as exc:
logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
if service_uses_universal_pool(service):
try:
with allocator_lock(db, 91002):
ensure_universal_pool()
slot = acquire_universal_slot(db)
slot_cid = f"POOLIDX:{slot}"
terminate_active_slot_sessions(db, slot_cid)
dispatch_universal_target(slot, service)
session_obj = SessionModel(
id=session_id,
log_event(
"session_rotated",
user_id=user.id,
service_id=service.id,
container_id=slot_cid,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
closed_session_id=oldest.id,
closed_service_id=oldest.service_id,
new_service_id=service.id,
)
db.add(session_obj)
db.commit()
except Exception as exc:
logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
else:
return RedirectResponse(url="/?launch_error=max_services", status_code=303)
if service.type == ServiceType.RDP:
active_owner = find_active_session_for_service(db, service.id)
if active_owner:
if active_owner.user_id != user.id:
owner = db.get(User, active_owner.user_id)
owner_name = owner.username if owner else f"id={active_owner.user_id}"
raise HTTPException(
status_code=409,
detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
)
return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
session_id = str(uuid.uuid4())
if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
try:
with allocator_lock(db, 91001):
ensure_web_pool()
slot = acquire_web_pool_slot(db)
slot_cid = f"WEBPOOLIDX:{slot}"
terminate_active_slot_sessions(db, slot_cid)
dispatch_web_pool_target(slot, service)
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=slot_cid,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
except Exception as exc:
logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc))
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot)
audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
if service_uses_universal_pool(service):
try:
with allocator_lock(db, 91002):
ensure_universal_pool()
slot = acquire_universal_slot(db)
slot_cid = f"POOLIDX:{slot}"
terminate_active_slot_sessions(db, slot_cid)
dispatch_universal_target(slot, service)
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=slot_cid,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
except Exception as exc:
logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc))
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot)
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
ensure_warm_pool(service)
open_warm_web_url(service, service.target)
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=f"POOL:{service.slug}",
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool")
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
try:
container_id = create_runtime_container(service, session_id)
except Exception as exc:
logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc))
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
raise HTTPException(status_code=502, detail="Session runtime failed to start")
if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
ensure_warm_pool(service)
open_warm_web_url(service, service.target)
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=f"POOL:{service.slug}",
container_id=container_id,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id)
audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
ready = wait_for_session_route(session_id)
log_event("session_route_ready", session_id=session_id, ready=ready)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
try:
container_id = create_runtime_container(service, session_id)
except Exception as exc:
logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
raise HTTPException(status_code=502, detail="Session runtime failed to start")
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=container_id,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
ready = wait_for_session_route(session_id)
logger.info("session_route_ready session_id=%s ready=%s", session_id, ready)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
@app.get("/svc/{slug}/", response_class=HTMLResponse)
def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
@@ -1985,7 +2096,23 @@ def touch_session(session_id: str, user: User = Depends(require_user), db: Sessi
if not sess or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="Session not found")
if sess.status != SessionStatus.ACTIVE:
raise HTTPException(status_code=410, detail="Session expired")
reason = session_closed_reason(sess, db)
log_event(
"session_touch_rejected",
level=logging.WARNING,
session_id=session_id,
user_id=user.id,
status=sess.status.value,
reason=reason,
)
return JSONResponse(
status_code=410,
content={
"ok": False,
"reason": reason,
"status": sess.status.value,
},
)
sess.last_access_at = now_utc()
db.commit()
return {"ok": True}
@@ -1997,9 +2124,17 @@ def close_session(session_id: str, user: User = Depends(require_user), db: Sessi
if not sess or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="Session not found")
if sess.status != SessionStatus.ACTIVE:
log_event(
"session_close_already_closed",
session_id=session_id,
user_id=user.id,
status=sess.status.value,
reason=session_closed_reason(sess, db),
)
return {"ok": True, "status": sess.status.value}
terminate_session_record(db, sess, SessionStatus.TERMINATED, stop_container=True)
db.commit()
log_event("session_closed_by_user", session_id=session_id, user_id=user.id)
return {"ok": True, "status": "TERMINATED"}