import datetime as dt import enum import fcntl import json import re import logging import os from pathlib import Path import secrets import threading import time import uuid import contextvars from urllib.parse import parse_qs, unquote, urlparse from typing import Optional import docker import requests import mistune from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, Request, UploadFile, status from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from itsdangerous import BadSignature, URLSafeTimedSerializer from markupsafe import Markup, escape from passlib.context import CryptContext from sqlalchemy import ( Boolean, DateTime, Enum, ForeignKey, Integer, String, Text, UniqueConstraint, create_engine, select, text, ) from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, sessionmaker DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://portal:portal@db:5432/portal") COOKIE_NAME = "portal_auth" CSRF_COOKIE = "csrf_token" COOKIE_MAX_AGE = 8 * 60 * 60 SESSION_IDLE_SECONDS = int(os.getenv("SESSION_IDLE_SECONDS", "7200")) PUBLIC_HOST = os.getenv("PUBLIC_HOST", "stend.4mont.ru") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() LOG_SLOW_REQUEST_MS = int(os.getenv("LOG_SLOW_REQUEST_MS", "2000")) GO_USER_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_USER_LOCK_TIMEOUT_SECONDS", "8.0")) GO_POOL_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_POOL_LOCK_TIMEOUT_SECONDS", "20.0")) POOL_DISPATCH_RETRIES = int(os.getenv("POOL_DISPATCH_RETRIES", "6")) POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS = float(os.getenv("POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS", "2.0")) POOL_DISPATCH_SLEEP_SECONDS = float(os.getenv("POOL_DISPATCH_SLEEP_SECONDS", "0.3")) TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik") PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "2")) UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "0")) WEB_POOL_SIZE = int(os.getenv("WEB_POOL_SIZE", "20")) WEB_POOL_BUFFER = int(os.getenv("WEB_POOL_BUFFER", "2")) X11VNC_FLAGS = os.getenv("X11VNC_FLAGS", "-wait 5 -defer 5 -threads") MAX_ACTIVE_SERVICES_PER_USER = int(os.getenv("MAX_ACTIVE_SERVICES_PER_USER", "4")) WEB_RESOLUTION_MIN_WIDTH = int(os.getenv("WEB_RESOLUTION_MIN_WIDTH", "1024")) WEB_RESOLUTION_MIN_HEIGHT = int(os.getenv("WEB_RESOLUTION_MIN_HEIGHT", "720")) WEB_RESOLUTION_MAX_WIDTH = int(os.getenv("WEB_RESOLUTION_MAX_WIDTH", "3840")) WEB_RESOLUTION_MAX_HEIGHT = int(os.getenv("WEB_RESOLUTION_MAX_HEIGHT", "2160")) ENABLE_STARTUP_MAINTENANCE = os.getenv("ENABLE_STARTUP_MAINTENANCE", "1") == "1" ICON_UPLOAD_MAX_BYTES = 2 * 1024 * 1024 ICON_UPLOAD_TYPES = { "image/png": "png", "image/jpeg": "jpg", "image/webp": "webp", } SERVICE_ICONS_DIR = Path("static/service-icons") logging.basicConfig( level=LOG_LEVEL, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) logger = logging.getLogger("portal") request_id_ctx = contextvars.ContextVar("request_id", default="-") maintenance_lock_file = None def _normalize_log_value(value): if isinstance(value, (str, int, float, bool)) or value is None: return value if isinstance(value, dt.datetime): return value.isoformat() return str(value) def log_event(event: str, level: int = logging.INFO, **fields) -> None: payload = {"event": event, "req_id": request_id_ctx.get()} for key, value in fields.items(): payload[key] = _normalize_log_value(value) logger.log(level, json.dumps(payload, ensure_ascii=False, separators=(",", ":"))) SIGNING_KEY = os.getenv("SIGNING_KEY", secrets.token_urlsafe(32)) serializer = URLSafeTimedSerializer(SIGNING_KEY, salt="portal-auth") pwd_context = CryptContext(schemes=["argon2"], deprecated="auto") engine = create_engine(DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) templates = Jinja2Templates(directory="templates") app = FastAPI(title="МОНТ - инфрастуктурный полигон") app.mount("/static", StaticFiles(directory="static"), name="static") @app.middleware("http") async def request_logging_middleware(request: Request, call_next): req_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8]) token = request_id_ctx.set(req_id) started = time.time() client_ip = request.client.host if request.client else "-" user_agent = request.headers.get("user-agent", "-") try: response = await call_next(request) except Exception: log_event( "request_failed", level=logging.ERROR, method=request.method, path=request.url.path, client_ip=client_ip, user_agent=user_agent, ) request_id_ctx.reset(token) raise duration_ms = int((time.time() - started) * 1000) level = logging.INFO if response.status_code >= 500: level = logging.ERROR elif response.status_code >= 400: level = logging.WARNING log_event( "request", level=level, method=request.method, path=request.url.path, query=str(request.url.query or ""), status=response.status_code, duration_ms=duration_ms, client_ip=client_ip, user_agent=user_agent, ) if duration_ms >= LOG_SLOW_REQUEST_MS: log_event( "slow_request", level=logging.WARNING, method=request.method, path=request.url.path, duration_ms=duration_ms, threshold_ms=LOG_SLOW_REQUEST_MS, ) response.headers["X-Request-ID"] = req_id request_id_ctx.reset(token) return response class Base(DeclarativeBase): pass class ServiceType(str, enum.Enum): WEB = "WEB" VNC = "VNC" RDP = "RDP" class SessionStatus(str, enum.Enum): ACTIVE = "ACTIVE" EXPIRED = "EXPIRED" TERMINATED = "TERMINATED" ROTATED = "ROTATED" class User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(Integer, primary_key=True) username: Mapped[str] = mapped_column(String(64), unique=True, index=True) password_hash: Mapped[str] = mapped_column(String(255)) expires_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), index=True) active: Mapped[bool] = mapped_column(Boolean, default=True, index=True) is_admin: Mapped[bool] = mapped_column(Boolean, default=False) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class Service(Base): __tablename__ = "services" id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String(128)) slug: Mapped[str] = mapped_column(String(64), unique=True, index=True) type: Mapped[ServiceType] = mapped_column(Enum(ServiceType), index=True) target: Mapped[str] = mapped_column(Text) comment: Mapped[str] = mapped_column(Text, default="") svc_login: Mapped[str] = mapped_column(String(256), default="") svc_password: Mapped[str] = mapped_column(String(256), default="") icon_path: Mapped[str] = mapped_column(Text, default="") active: Mapped[bool] = mapped_column(Boolean, default=True) warm_pool_size: Mapped[int] = mapped_column(Integer, default=0) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class Category(Base): __tablename__ = "categories" id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String(128), unique=True, index=True) slug: Mapped[str] = mapped_column(String(64), unique=True, index=True) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class ServiceCategory(Base): __tablename__ = "service_categories" __table_args__ = (UniqueConstraint("service_id", "category_id", name="uq_service_category"),) id: Mapped[int] = mapped_column(Integer, primary_key=True) service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True) category_id: Mapped[int] = mapped_column(ForeignKey("categories.id", ondelete="CASCADE"), index=True) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class UserServiceAccess(Base): __tablename__ = "user_service_access" __table_args__ = (UniqueConstraint("user_id", "service_id", name="uq_user_service"),) id: Mapped[int] = mapped_column(Integer, primary_key=True) user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True) service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True) granted_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class RdpSlot(Base): __tablename__ = "rdp_slots" id: Mapped[int] = mapped_column(Integer, primary_key=True) service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True) rdp_username: Mapped[str] = mapped_column(String(128)) rdp_password: Mapped[str] = mapped_column(String(256), default="") container_name: Mapped[Optional[str]] = mapped_column(String(128), nullable=True) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class SessionModel(Base): __tablename__ = "sessions" id: Mapped[str] = mapped_column(String(36), primary_key=True) user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True) service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True) status: Mapped[SessionStatus] = mapped_column(Enum(SessionStatus), default=SessionStatus.ACTIVE, index=True) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True) last_access_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True) container_id: Mapped[Optional[str]] = mapped_column(String(128), nullable=True) class AuditLog(Base): __tablename__ = "audit_logs" id: Mapped[int] = mapped_column(Integer, primary_key=True) user_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True) action: Mapped[str] = mapped_column(String(128), index=True) details: Mapped[str] = mapped_column(Text) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True) def now_utc() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def session_closed_reason(sess: SessionModel, db: Session) -> str: if not sess: return "idle" if sess.status == SessionStatus.EXPIRED: return "idle" if sess.status == SessionStatus.ROTATED: return "limit" if sess.status == SessionStatus.TERMINATED: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) active_rows = db.scalars( select(SessionModel).where( SessionModel.user_id == sess.user_id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) ).all() active_service_ids = {row.service_id for row in active_rows} if len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER and sess.service_id not in active_service_ids: return "limit" return "idle" def normalize_web_target(url: str) -> str: raw = (url or "").strip() if not raw: return raw if raw.startswith(("http://", "https://")): return raw return f"http://{raw}" _md = mistune.create_markdown( escape=True, plugins=["strikethrough", "table", "task_lists"], ) def format_service_comment(raw_text: str) -> Markup: raw = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n").strip() if not raw: return Markup("") return Markup(_md(raw)) def parse_rdp_target(target: str) -> dict: raw = (target or "").strip() if not raw: raise HTTPException(status_code=400, detail="Empty RDP target") parsed = urlparse(raw if "://" in raw else f"//{raw}") host = parsed.hostname if not host: raise HTTPException(status_code=400, detail="Invalid RDP target. Use host:port or rdp://user:pass@host:port") port = parsed.port or 3389 username = unquote(parsed.username) if parsed.username else "" password = unquote(parsed.password) if parsed.password else "" query = parse_qs(parsed.query or "") if not username: username = (query.get("u", [""])[0] or query.get("user", [""])[0] or "").strip() if not password: password = (query.get("p", [""])[0] or query.get("password", [""])[0] or "").strip() domain = (query.get("domain", [""])[0] or query.get("d", [""])[0] or "").strip() security = (query.get("sec", [""])[0] or query.get("security", [""])[0] or "").strip().lower() if security and security not in {"nla", "tls", "rdp"}: raise HTTPException(status_code=400, detail="Invalid RDP security. Use one of: nla, tls, rdp") return { "host": host, "port": str(port), "user": username, "password": password, "domain": domain, "security": security, } def set_service_categories(db: Session, service_id: int, category_ids: list[int]) -> None: normalized = sorted({int(x) for x in (category_ids or [])}) if normalized: existing_ids = set(db.scalars(select(Category.id).where(Category.id.in_(normalized))).all()) missing = sorted(set(normalized) - existing_ids) if missing: raise HTTPException(status_code=400, detail=f"Unknown category ids: {missing}") existing_links = db.scalars(select(ServiceCategory).where(ServiceCategory.service_id == service_id)).all() current = {row.category_id: row for row in existing_links} wanted = set(normalized) for cat_id in wanted: if cat_id not in current: db.add(ServiceCategory(service_id=service_id, category_id=cat_id)) for cat_id, row in current.items(): if cat_id not in wanted: db.delete(row) def service_uses_universal_pool(service: Service) -> bool: return UNIVERSAL_POOL_SIZE > 0 and service.type == ServiceType.RDP def universal_container_name(slot: int) -> str: return f"portal-universal-{slot}" def web_pool_container_name(slot: int) -> str: return f"portal-webpool-{slot}" def ensure_icons_dir() -> None: SERVICE_ICONS_DIR.mkdir(parents=True, exist_ok=True) def remove_icon_file(icon_path: str) -> None: if not icon_path or not icon_path.startswith("/static/service-icons/"): return filename = icon_path.rsplit("/", 1)[-1] candidate = SERVICE_ICONS_DIR / filename try: candidate.unlink(missing_ok=True) except Exception: logger.exception("icon_delete_failed path=%s", candidate) async def store_service_icon(service: Service, upload: UploadFile) -> str: ensure_icons_dir() content_type = (upload.content_type or "").lower().strip() ext = ICON_UPLOAD_TYPES.get(content_type) if not ext: raise HTTPException(status_code=400, detail="Unsupported file type. Use PNG/JPG/WEBP") payload = await upload.read(ICON_UPLOAD_MAX_BYTES + 1) if len(payload) > ICON_UPLOAD_MAX_BYTES: raise HTTPException(status_code=400, detail="File too large. Max 2MB") if not payload: raise HTTPException(status_code=400, detail="Empty file") stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d_%H%M%S") filename = f"svc_{service.id}_{stamp}.{ext}" target = SERVICE_ICONS_DIR / filename target.write_bytes(payload) return f"/static/service-icons/{filename}" def get_db(): db = SessionLocal() try: yield db finally: db.close() def audit(db: Session, action: str, details: str, user_id: Optional[int] = None) -> None: db.add(AuditLog(user_id=user_id, action=action, details=details)) db.commit() def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(password: str, password_hash: str) -> bool: return pwd_context.verify(password, password_hash) def user_is_valid(user: User) -> bool: return bool(user.active and user.expires_at > now_utc()) def issue_auth_cookie(response: RedirectResponse, user: User) -> None: token = serializer.dumps({"user_id": user.id}) response.set_cookie( key=COOKIE_NAME, value=token, httponly=True, secure=True, samesite="strict", max_age=COOKIE_MAX_AGE, path="/", ) def issue_csrf_cookie(response: RedirectResponse) -> str: token = secrets.token_urlsafe(24) response.set_cookie( key=CSRF_COOKIE, value=token, httponly=False, secure=True, samesite="strict", max_age=COOKIE_MAX_AGE, path="/", ) return token def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]: raw = request.cookies.get(COOKIE_NAME) if not raw: return None try: payload = serializer.loads(raw, max_age=COOKIE_MAX_AGE) except BadSignature: return None user = db.get(User, int(payload["user_id"])) if not user or not user_is_valid(user): return None return user def require_user(user: Optional[User] = Depends(get_current_user)) -> User: if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") return user def require_admin(user: User = Depends(require_user)) -> User: if not user.is_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only") return user def validate_csrf(request: Request) -> None: cookie = request.cookies.get(CSRF_COOKIE) form_val = request.headers.get("X-CSRF-Token") if request.headers.get("content-type", "").startswith("application/x-www-form-urlencoded"): return if not cookie or not form_val or cookie != form_val: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed") def has_access(db: Session, user_id: int, service_id: int) -> bool: q = select(UserServiceAccess).where( UserServiceAccess.user_id == user_id, UserServiceAccess.service_id == service_id, ) return db.scalar(q) is not None def docker_client(): return docker.from_env() def session_router_name(session_id: str) -> str: return f"sess-{session_id.replace('-', '')[:16]}" def _is_pool_name_conflict(exc: Exception) -> bool: msg = str(exc).lower() return ("already in use" in msg) or ("marked for removal" in msg) def _remove_container_by_name(d, name: str) -> None: try: old = d.containers.get(name) old.remove(force=True) except docker.errors.NotFound: return except Exception: logger.exception("pool_container_remove_failed name=%s", name) def ensure_universal_pool() -> None: if UNIVERSAL_POOL_SIZE <= 0: return d = docker_client() image = "portal-universal-runtime:latest" for i in range(UNIVERSAL_POOL_SIZE, 100): name = universal_container_name(i) try: c = d.containers.get(name) c.stop(timeout=5) except docker.errors.NotFound: break except Exception: logger.exception("universal_pool_scale_down_failed slot=%s", i) for i in range(UNIVERSAL_POOL_SIZE): name = universal_container_name(i) path = f"/u/{i}/" router = f"upool-{i}" labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "9400", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{router}.loadbalancer.server.port": "6080", "portal.pool": "1", "portal.pool.slot": str(i), } env = { "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "0", "SESSION_ID": f"universal-{i}", "X11VNC_FLAGS": X11VNC_FLAGS, } try: c = d.containers.get(name) if c.status != "running": c.start() continue except docker.errors.NotFound: pass except Exception: logger.exception("universal_pool_check_failed slot=%s", i) continue d.containers.run( image=image, name=name, detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("universal_pool_container_started slot=%s", i) def ensure_web_pool(target_size: Optional[int] = None) -> None: desired = max(0, WEB_POOL_SIZE if target_size is None else target_size) d = docker_client() image = "portal-universal-runtime:latest" for i in range(desired, 100): name = web_pool_container_name(i) try: c = d.containers.get(name) c.stop(timeout=5) except docker.errors.NotFound: break except Exception: logger.exception("web_pool_scale_down_failed slot=%s", i) for i in range(desired): name = web_pool_container_name(i) path = f"/w/{i}/" router = f"wpool-{i}" labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "9450", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{router}.loadbalancer.server.port": "6080", "portal.pool": "1", "portal.pool.kind": "web", "portal.pool.slot": str(i), } env = { "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "0", "SESSION_ID": f"webpool-{i}", "X11VNC_FLAGS": X11VNC_FLAGS, } should_create = False try: c = d.containers.get(name) if c.status != "running": try: c.start() except docker.errors.APIError as exc: if _is_pool_name_conflict(exc): logger.warning("web_pool_recreate_needed slot=%s reason=name-conflict", i) _remove_container_by_name(d, name) should_create = True else: raise if not should_create: continue except docker.errors.NotFound: should_create = True except Exception: logger.exception("web_pool_check_failed slot=%s", i) continue for attempt in range(3): try: d.containers.run( image=image, name=name, detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("web_pool_container_started slot=%s", i) break except docker.errors.APIError as exc: if _is_pool_name_conflict(exc) and attempt < 2: logger.warning("web_pool_run_conflict_retry slot=%s attempt=%s", i, attempt + 1) _remove_container_by_name(d, name) time.sleep(0.25) continue logger.exception("web_pool_run_failed slot=%s", i) break def get_universal_pool_status() -> dict: desired = max(0, UNIVERSAL_POOL_SIZE) if desired <= 0: return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []} d = docker_client() names = [universal_container_name(i) for i in range(desired)] containers = [] for name in names: try: containers.append(d.containers.get(name)) except Exception: continue running = sum(1 for c in containers if c.status == "running") health = "ok" if running >= min(desired, 1) else "down" return { "desired": desired, "running": running, "total": len(containers), "names": sorted(c.name for c in containers), "health": health, } def get_web_pool_status() -> dict: desired = max(0, WEB_POOL_SIZE) if desired <= 0: return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []} d = docker_client() names = [web_pool_container_name(i) for i in range(desired)] containers = [] for name in names: try: containers.append(d.containers.get(name)) except Exception: continue running = sum(1 for c in containers if c.status == "running") health = "ok" if running >= min(desired, 1) else "down" return { "desired": desired, "running": running, "total": len(containers), "names": sorted(c.name for c in containers), "health": health, } def acquire_universal_slot(db: Session) -> int: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.container_id.like("POOLIDX:%"), SessionModel.last_access_at >= cutoff, ) active = db.scalars(q).all() busy = set() for sess in active: try: busy.add(int((sess.container_id or "").split(":", 1)[1])) except Exception: continue for i in range(max(0, UNIVERSAL_POOL_SIZE)): if i not in busy: return i if active: victim = min(active, key=lambda s: s.last_access_at) victim.status = SessionStatus.TERMINATED db.commit() try: return int((victim.container_id or "").split(":", 1)[1]) except Exception: pass return 0 def acquire_web_pool_slot(db: Session) -> int: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.container_id.like("WEBPOOLIDX:%"), SessionModel.last_access_at >= cutoff, ) active = db.scalars(q).all() busy = set() for sess in active: try: busy.add(int((sess.container_id or "").split(":", 1)[1])) except Exception: continue # Keep headroom: when active sessions are close to hot pool capacity, # proactively warm up extra slots. auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER)) if auto_target > WEB_POOL_SIZE: ensure_web_pool(auto_target) for i in range(max(0, auto_target)): if i not in busy: return i return 0 def sanitize_client_resolution(width: Optional[int], height: Optional[int]) -> tuple[Optional[int], Optional[int]]: if width is None or height is None: return None, None clamped_width = max(WEB_RESOLUTION_MIN_WIDTH, min(int(width), WEB_RESOLUTION_MAX_WIDTH)) clamped_height = max(WEB_RESOLUTION_MIN_HEIGHT, min(int(height), WEB_RESOLUTION_MAX_HEIGHT)) return clamped_width, clamped_height def dispatch_universal_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None: name = universal_container_name(slot) url = "" payload = {} if service.type == ServiceType.WEB: url = f"http://{name}:7000/open" payload = {"url": normalize_web_target(service.target)} width, height = sanitize_client_resolution(width, height) if width and height: payload["width"] = width payload["height"] = height elif service.type == ServiceType.RDP: cfg = parse_rdp_target(service.target) url = f"http://{name}:7000/rdp" payload = { "host": cfg["host"], "port": cfg["port"], "user": cfg["user"], "password": cfg["password"], "domain": cfg["domain"], "security": cfg["security"], } else: raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only") last_exc = None for _ in range(max(1, POOL_DISPATCH_RETRIES)): try: resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS) resp.raise_for_status() return except Exception as exc: last_exc = exc time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS)) if last_exc: raise last_exc def dispatch_web_pool_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None: name = web_pool_container_name(slot) target_url = normalize_web_target(service.target) url = f"http://{name}:7000/open" payload = {"url": target_url} width, height = sanitize_client_resolution(width, height) if width and height: payload["width"] = width payload["height"] = height last_exc = None for _ in range(max(1, POOL_DISPATCH_RETRIES)): try: resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS) resp.raise_for_status() return except Exception as exc: last_exc = exc time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS)) if last_exc: raise last_exc def create_runtime_container(service: Service, session_id: str): d = docker_client() router = session_router_name(session_id) path = f"/s/{session_id}/" labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "10000", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{router}.loadbalancer.server.port": "6080", } env = { "SESSION_ID": session_id, "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "1", "TOUCH_PATH": f"/api/sessions/{session_id}/touch", "X11VNC_FLAGS": X11VNC_FLAGS, } image = "portal-kiosk:latest" if service.type == ServiceType.WEB: env["TARGET_URL"] = service.target env["HOME_URL"] = f"https://{PUBLIC_HOST}/" elif service.type == ServiceType.RDP: image = "portal-rdp-proxy:latest" cfg = parse_rdp_target(service.target) env["RDP_HOST"] = cfg["host"] env["RDP_PORT"] = cfg["port"] if cfg["user"]: env["RDP_USER"] = cfg["user"] if cfg["password"]: env["RDP_PASSWORD"] = cfg["password"] if cfg["domain"]: env["RDP_DOMAIN"] = cfg["domain"] if cfg["security"]: env["RDP_SECURITY"] = cfg["security"] else: raise HTTPException(status_code=400, detail="Unsupported service type") container = d.containers.run( image=image, name=f"portal-sess-{session_id[:8]}", detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("session_container_started session_id=%s container_id=%s service_type=%s", session_id, container.id, service.type.value) return container.id def ensure_warm_pool(service: Service, pool_size: Optional[int] = None) -> None: if service_uses_universal_pool(service): return if pool_size is None: pool_size = desired_pool_size(service) if pool_size <= 0: # Stop stale warm containers for this service when pool is disabled. prefix = f"portal-warm-{service.slug}-" try: d = docker_client() for c in d.containers.list(all=True, filters={"name": prefix}): if c.name.startswith(prefix): c.stop(timeout=5) except Exception: logger.exception("warm_pool_disable_failed service=%s", service.slug) return d = docker_client() router = f"warm-{service.slug}" svc_name = f"warmsvc-{service.slug}" path = f"/svc/{service.slug}/" image = "portal-kiosk:latest" base_env = { "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "0", "TOUCH_PATH": "", "X11VNC_FLAGS": X11VNC_FLAGS, } if service.type == ServiceType.WEB: base_env["UNIVERSAL_WEB"] = "1" base_env["START_URL"] = normalize_web_target(service.target) base_env["HOME_URL"] = f"https://{PUBLIC_HOST}/" elif service.type == ServiceType.RDP: image = "portal-rdp-proxy:latest" cfg = parse_rdp_target(service.target) base_env["RDP_HOST"] = cfg["host"] base_env["RDP_PORT"] = cfg["port"] if cfg["user"]: base_env["RDP_USER"] = cfg["user"] if cfg["password"]: base_env["RDP_PASSWORD"] = cfg["password"] if cfg["domain"]: base_env["RDP_DOMAIN"] = cfg["domain"] if cfg["security"]: base_env["RDP_SECURITY"] = cfg["security"] else: raise HTTPException(status_code=400, detail="Unsupported service type") labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "9500", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{svc_name}.loadbalancer.server.port": "6080", f"traefik.http.routers.{router}.service": svc_name, "portal.warm": "1", "portal.service.slug": service.slug, "portal.service.type": service.type.value, } # Ensure desired cardinality. for i in range(pool_size, 50): name = f"portal-warm-{service.slug}-{i}" try: c = d.containers.get(name) c.stop(timeout=5) except docker.errors.NotFound: break except Exception: logger.exception("warm_pool_scale_down_failed service=%s idx=%s", service.slug, i) for i in range(pool_size): name = f"portal-warm-{service.slug}-{i}" try: c = d.containers.get(name) if c.status != "running": c.start() continue except docker.errors.NotFound: pass except Exception: logger.exception("warm_pool_check_failed service=%s idx=%s", service.slug, i) continue env = dict(base_env) env["SESSION_ID"] = f"warm-{service.slug}-{i}" d.containers.run( image=image, name=name, detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("warm_pool_container_started service=%s idx=%s", service.slug, i) def wait_for_session_route(session_id: str, timeout_seconds: int = 6) -> bool: target = f"{TRAEFIK_INTERNAL_URL}/s/{session_id}/" deadline = time.time() + timeout_seconds while time.time() < deadline: try: resp = requests.get( target, headers={"Host": PUBLIC_HOST}, allow_redirects=False, timeout=1.5, ) if resp.status_code != 404: return True except Exception: pass time.sleep(0.3) return False def route_ready(path: str) -> bool: bases = [TRAEFIK_INTERNAL_URL] if TRAEFIK_INTERNAL_URL.startswith("http://"): bases.append("https://" + TRAEFIK_INTERNAL_URL[len("http://"):]) for base in bases: try: verify = not base.startswith("https://") resp = requests.get( f"{base}{path}", headers={"Host": PUBLIC_HOST}, allow_redirects=False, timeout=1.5, verify=verify, ) if resp.status_code != 404: return True except Exception: continue return False def container_running(container_id: Optional[str]) -> bool: if not container_id: return False if ( container_id.startswith("POOL:") or container_id.startswith("POOLIDX:") or container_id.startswith("WEBPOOLIDX:") ): return True if container_id.startswith("RDPSLOT:"): try: slot_id = int(container_id.split(":", 1)[1]) db = SessionLocal() try: slot = db.get(RdpSlot, slot_id) if not slot or not slot.container_name: return False c = docker_client().containers.get(slot.container_name) return c.status == "running" finally: db.close() except Exception: return False try: c = docker_client().containers.get(container_id) return c.status == "running" except Exception: return False def _rdp_slot_container_name(service_slug: str, slot_id: int) -> str: return f"portal-rdpslot-{service_slug}-{slot_id}" def start_rdp_slot_container(slot: RdpSlot, service: Service) -> str: d = docker_client() name = _rdp_slot_container_name(service.slug, slot.id) slot_id = slot.id path = f"/rdp/{slot_id}/" router = f"rdpslot-{slot_id}" labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "10000", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{router}.loadbalancer.server.port": "6080", "portal.rdpslot": "1", "portal.rdpslot.id": str(slot_id), "portal.service.slug": service.slug, } cfg = parse_rdp_target(service.target) env = { "SESSION_ID": f"rdpslot-{slot_id}", "IDLE_TIMEOUT": "86400", "ENABLE_HEARTBEAT": "0", "RDP_HOST": cfg["host"], "RDP_PORT": cfg["port"], "X11VNC_FLAGS": X11VNC_FLAGS, } if slot.rdp_username: env["RDP_USER"] = slot.rdp_username if slot.rdp_password: env["RDP_PASSWORD"] = slot.rdp_password if cfg.get("domain"): env["RDP_DOMAIN"] = cfg["domain"] if cfg.get("security"): env["RDP_SECURITY"] = cfg["security"] try: existing = d.containers.get(name) existing.stop(timeout=5) existing.remove(force=True) except docker.errors.NotFound: pass except Exception: logger.exception("rdp_slot_container_cleanup_failed slot_id=%s", slot_id) container = d.containers.run( "portal-rdp-proxy:latest", name=name, detach=True, restart_policy={"Name": "unless-stopped"}, network="portal_net", labels=labels, environment=env, ) logger.info("rdp_slot_container_started slot_id=%s name=%s", slot_id, name) return container.name def stop_rdp_slot_container(container_name: str) -> None: if not container_name: return try: d = docker_client() c = d.containers.get(container_name) c.stop(timeout=5) c.remove(force=True) logger.info("rdp_slot_container_stopped name=%s", container_name) except docker.errors.NotFound: pass except Exception: logger.exception("rdp_slot_container_stop_failed container=%s", container_name) def _restart_rdp_slot_bg(slot_id: int) -> None: db = SessionLocal() try: slot = db.get(RdpSlot, slot_id) if not slot or not slot.container_name: return service = db.get(Service, slot.service_id) if not service: return try: d = docker_client() c = d.containers.get(slot.container_name) c.restart(timeout=10) logger.info("rdp_slot_container_restarted slot_id=%s", slot_id) except docker.errors.NotFound: start_rdp_slot_container(slot, service) except Exception: logger.exception("rdp_slot_container_restart_failed slot_id=%s", slot_id) finally: db.close() def stop_runtime_container(container_id: Optional[str]) -> None: if not container_id: return try: d = docker_client() c = d.containers.get(container_id) c.stop(timeout=5) except Exception: logger.exception("session_container_stop_failed container_id=%s", container_id) def terminate_session_record( db: Session, sess: SessionModel, new_status: SessionStatus = SessionStatus.TERMINATED, *, stop_container: bool = True, ) -> None: if not sess or sess.status != SessionStatus.ACTIVE: return old_status = sess.status cid = sess.container_id or "" if stop_container and cid and not cid.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:", "RDPSLOT:")): stop_runtime_container(cid) if cid.startswith("RDPSLOT:"): try: slot_id = int(cid.split(":", 1)[1]) threading.Thread(target=_restart_rdp_slot_bg, args=(slot_id,), daemon=True).start() except Exception: logger.exception("rdp_slot_restart_schedule_failed cid=%s", cid) sess.status = new_status sess.last_access_at = now_utc() log_event( "session_closed", level=logging.INFO, session_id=sess.id, user_id=sess.user_id, service_id=sess.service_id, container_id=cid, old_status=old_status.value if isinstance(old_status, SessionStatus) else str(old_status), new_status=new_status.value, reason=session_closed_reason(sess, db), stop_container=stop_container, ) def ensure_schema_compatibility() -> None: # PostgreSQL requires enum value addition to be committed before usage in constraints. with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: conn.execute( text( """ DO $$ BEGIN BEGIN ALTER TYPE servicetype ADD VALUE IF NOT EXISTS 'RDP'; EXCEPTION WHEN undefined_object THEN NULL; END; END $$; """ ) ) conn.execute( text( """ DO $$ BEGIN BEGIN ALTER TYPE sessionstatus ADD VALUE IF NOT EXISTS 'ROTATED'; EXCEPTION WHEN undefined_object THEN NULL; END; END $$; """ ) ) with engine.begin() as conn: conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0")) conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS comment TEXT NOT NULL DEFAULT ''")) conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_login VARCHAR(256) NOT NULL DEFAULT ''")) conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_password VARCHAR(256) NOT NULL DEFAULT ''")) conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS icon_path TEXT NOT NULL DEFAULT ''")) conn.execute( text( """ CREATE TABLE IF NOT EXISTS categories ( id SERIAL PRIMARY KEY, name VARCHAR(128) NOT NULL UNIQUE, slug VARCHAR(64) NOT NULL UNIQUE, created_at TIMESTAMPTZ NOT NULL DEFAULT now() ) """ ) ) conn.execute( text( """ CREATE TABLE IF NOT EXISTS service_categories ( id SERIAL PRIMARY KEY, service_id INT NOT NULL REFERENCES services(id) ON DELETE CASCADE, category_id INT NOT NULL REFERENCES categories(id) ON DELETE CASCADE, created_at TIMESTAMPTZ NOT NULL DEFAULT now(), UNIQUE (service_id, category_id) ) """ ) ) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_service_id ON service_categories(service_id)")) conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_category_id ON service_categories(category_id)")) # Handle installs where service type is VARCHAR + CHECK. conn.execute( text( """ DO $$ DECLARE c record; BEGIN FOR c IN SELECT conname FROM pg_constraint WHERE conrelid = 'services'::regclass AND contype = 'c' AND pg_get_constraintdef(oid) ILIKE '%type%' LOOP EXECUTE format('ALTER TABLE services DROP CONSTRAINT %I', c.conname); END LOOP; ALTER TABLE services ADD CONSTRAINT services_type_check CHECK (type IN ('WEB','VNC','RDP')); EXCEPTION WHEN duplicate_object THEN NULL; END $$; """ ) ) def desired_pool_size(service: Service) -> int: if not service.active: return 0 if service.type == ServiceType.RDP and not service_uses_universal_pool(service): # RDP runs on-demand per user session; no prewarmed pool. return 0 if service_uses_universal_pool(service): return UNIVERSAL_POOL_SIZE return service.warm_pool_size if service.warm_pool_size and service.warm_pool_size > 0 else PREWARM_POOL_SIZE def get_warm_containers_for_service(service: Service) -> list: prefix = f"portal-warm-{service.slug}-" try: d = docker_client() containers = [] for c in d.containers.list(all=True, filters={"name": prefix}): if c.name.startswith(prefix): containers.append(c) return containers except Exception: logger.exception("pool_status_failed service=%s", service.slug) return [] def get_pool_status_for_service(service: Service) -> dict: if service.type == ServiceType.WEB: return get_web_pool_status() if service.type == ServiceType.RDP and not service_uses_universal_pool(service): return {"desired": 0, "running": 0, "total": 0, "names": [], "health": "n/a"} if service_uses_universal_pool(service): return get_universal_pool_status() desired = desired_pool_size(service) containers = get_warm_containers_for_service(service) running = sum(1 for c in containers if c.status == "running") states = [(c.attrs.get("State") or {}).get("Status", c.status) for c in containers] has_bad = any(s in {"exited", "dead"} for s in states) total = len(containers) if running == 0: health = "down" elif running >= min(desired, 1) and not has_bad: health = "ok" else: health = "degraded" return { "desired": desired, "running": running, "total": total, "names": sorted(c.name for c in containers), "health": health, } def get_pool_detailed_status(service: Service) -> dict: if service.type == ServiceType.WEB: d = docker_client() pool = get_web_pool_status() details = [] for i in range(max(0, pool["desired"])): name = web_pool_container_name(i) try: c = d.containers.get(name) except Exception: continue attrs = c.attrs or {} state = (attrs.get("State") or {}).get("Status", c.status) details.append( { "name": c.name, "status": c.status, "state": state, "created": attrs.get("Created", ""), "image": c.image.tags[0] if c.image.tags else "", "labels_ok": True, } ) return { "service_id": service.id, "slug": service.slug, "type": service.type.value, "desired": pool["desired"], "running": pool["running"], "total": pool["total"], "health": pool["health"], "containers": details, "updated_at": now_utc().isoformat(), } if service_uses_universal_pool(service): d = docker_client() pool = get_universal_pool_status() details = [] for i in range(max(0, UNIVERSAL_POOL_SIZE)): name = universal_container_name(i) try: c = d.containers.get(name) except Exception: continue attrs = c.attrs or {} state = (attrs.get("State") or {}).get("Status", c.status) details.append( { "name": c.name, "status": c.status, "state": state, "created": attrs.get("Created", ""), "image": c.image.tags[0] if c.image.tags else "", "labels_ok": True, } ) return { "service_id": service.id, "slug": service.slug, "type": service.type.value, "desired": pool["desired"], "running": pool["running"], "total": pool["total"], "health": pool["health"], "containers": details, "updated_at": now_utc().isoformat(), } containers = get_warm_containers_for_service(service) pool = get_pool_status_for_service(service) details = [] for c in sorted(containers, key=lambda x: x.name): attrs = c.attrs or {} state = (attrs.get("State") or {}).get("Status", c.status) created = attrs.get("Created", "") labels = attrs.get("Config", {}).get("Labels", {}) or {} labels_ok = ( labels.get("portal.warm") == "1" and labels.get("portal.service.slug") == service.slug and labels.get("portal.service.type") == service.type.value ) details.append( { "name": c.name, "status": c.status, "state": state, "created": created, "image": c.image.tags[0] if c.image.tags else "", "labels_ok": labels_ok, } ) return { "service_id": service.id, "slug": service.slug, "type": service.type.value, "desired": pool["desired"], "running": pool["running"], "total": pool["total"], "health": pool["health"], "containers": details, "updated_at": now_utc().isoformat(), } def get_active_sessions_count(db: Session, service_id: int) -> int: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.service_id == service_id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) sessions = db.scalars(q).all() # Avoid inflated stats when pooled slot sessions were duplicated by race: # for pooled sessions, occupancy is unique container_id. pooled = [s for s in sessions if (s.container_id or "").startswith(("WEBPOOLIDX:", "POOLIDX:", "POOL:"))] direct = [s for s in sessions if s not in pooled] unique_pooled = len({s.container_id for s in pooled if s.container_id}) return unique_pooled + len(direct) def find_active_session_for_service(db: Session, service_id: int) -> Optional[SessionModel]: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = ( select(SessionModel) .where( SessionModel.service_id == service_id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) .order_by(SessionModel.created_at.desc()) ) return db.scalars(q).first() def find_active_session_for_user_service(db: Session, user_id: int, service_id: int) -> Optional[SessionModel]: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = ( select(SessionModel) .where( SessionModel.user_id == user_id, SessionModel.service_id == service_id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) .order_by(SessionModel.created_at.desc()) ) return db.scalars(q).first() class LockTimeoutError(Exception): pass def allocator_lock(db: Session, lock_id: int, timeout_seconds: Optional[float] = None, poll_seconds: float = 0.05): class _LockCtx: def __enter__(self_nonlocal): self_nonlocal._acquired = False if timeout_seconds is None: db.execute(text("SELECT pg_advisory_xact_lock(:lid)"), {"lid": lock_id}) self_nonlocal._acquired = True return self_nonlocal deadline = time.monotonic() + max(0.0, timeout_seconds) while time.monotonic() <= deadline: got = db.execute(text("SELECT pg_try_advisory_xact_lock(:lid)"), {"lid": lock_id}).scalar() if got: self_nonlocal._acquired = True return self_nonlocal time.sleep(max(0.01, poll_seconds)) raise LockTimeoutError(f"advisory lock timeout lock_id={lock_id} timeout={timeout_seconds}") return self_nonlocal def __exit__(self_nonlocal, exc_type, exc, tb): return False return _LockCtx() def terminate_active_slot_sessions(db: Session, container_id: str) -> None: if not container_id: return db.execute( text( """ UPDATE sessions SET status = 'TERMINATED' WHERE container_id = :cid AND status = 'ACTIVE' """ ), {"cid": container_id}, ) def session_redirect_url(sess: SessionModel) -> str: cid = sess.container_id or "" if cid.startswith("POOL:") or cid.startswith("POOLIDX:") or cid.startswith("WEBPOOLIDX:") or cid.startswith("RDPSLOT:"): return f"/s/{sess.id}/view" return f"/s/{sess.id}/" def open_warm_web_url(service: Service, target_url: str) -> None: if service_uses_universal_pool(service): return if service.type != ServiceType.WEB: return target_url = normalize_web_target(target_url) try: d = docker_client() containers = d.containers.list( filters={ "label": [ "portal.warm=1", f"portal.service.slug={service.slug}", "portal.service.type=WEB", ] } ) for c in containers: try: resp = requests.post( f"http://{c.name}:7000/open", json={"url": target_url}, timeout=2, ) resp.raise_for_status() logger.info("warm_web_open_ok service=%s container=%s url=%s", service.slug, c.name, target_url) except Exception: logger.exception("warm_web_open_failed service=%s container=%s", service.slug, c.name) except Exception: logger.exception("warm_web_open_dispatch_failed service=%s", service.slug) def cleanup_loop(): while True: time.sleep(60) db = SessionLocal() try: ensure_universal_pool() ensure_web_pool() for svc in db.scalars( select(Service).where( Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) ).all(): if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0: ensure_warm_pool(svc) cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at < cutoff, ) stale = db.scalars(q).all() rdp_slots_to_restart: list[int] = [] for sess in stale: cid = sess.container_id or "" if cid.startswith("RDPSLOT:"): try: rdp_slots_to_restart.append(int(cid.split(":", 1)[1])) except Exception: pass elif cid and not ( cid.startswith("POOL:") or cid.startswith("POOLIDX:") or cid.startswith("WEBPOOLIDX:") ): stop_runtime_container(cid) sess.status = SessionStatus.EXPIRED if stale: db.commit() for slot_id in rdp_slots_to_restart: threading.Thread(target=_restart_rdp_slot_bg, args=(slot_id,), daemon=True).start() except Exception: db.rollback() logger.exception("cleanup_loop_failed") finally: db.close() def bootstrap_admin(): admin_user = os.getenv("ADMIN_USERNAME", "admin") admin_password = os.getenv("ADMIN_PASSWORD", "change_me") ttl_days = int(os.getenv("ADMIN_TTL_DAYS", "3650")) db = SessionLocal() try: existing = db.scalar(select(User).where(User.username == admin_user)) if not existing: db.add( User( username=admin_user, password_hash=hash_password(admin_password), active=True, is_admin=True, expires_at=now_utc() + dt.timedelta(days=ttl_days), ) ) db.commit() finally: db.close() def try_acquire_maintenance_leader() -> bool: global maintenance_lock_file if maintenance_lock_file is not None: return True lock_file = open("/tmp/portal-maintenance.lock", "w") try: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) except BlockingIOError: lock_file.close() return False maintenance_lock_file = lock_file return True def run_maintenance_service() -> None: logger.info("maintenance_service_bootstrap_started") with open("/tmp/portal-schema.lock", "w") as lock_file: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) Base.metadata.create_all(bind=engine) ensure_schema_compatibility() fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) ensure_icons_dir() bootstrap_admin() maintenance_lock = open("/tmp/portal-maintenance.lock", "w") fcntl.flock(maintenance_lock.fileno(), fcntl.LOCK_EX) logger.info("maintenance_service_leader_acquired") db = SessionLocal() try: ensure_universal_pool() ensure_web_pool() for svc in db.scalars( select(Service).where( Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) ).all(): if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0: ensure_warm_pool(svc) finally: db.close() logger.info("maintenance_service_loop_started") cleanup_loop() @app.on_event("startup") def startup_event(): # Multiple uvicorn workers run startup in parallel. Serialize schema bootstrap # to avoid DDL races on first run and during schema extension. with open("/tmp/portal-schema.lock", "w") as lock_file: fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) Base.metadata.create_all(bind=engine) ensure_schema_compatibility() fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) ensure_icons_dir() bootstrap_admin() if not try_acquire_maintenance_leader(): logger.info("maintenance_leader_skipped") return if ENABLE_STARTUP_MAINTENANCE: db = SessionLocal() try: ensure_universal_pool() ensure_web_pool() for svc in db.scalars( select(Service).where( Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) ).all(): if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0: ensure_warm_pool(svc) elif svc.type == ServiceType.RDP: slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == svc.id)).all() for slot in slots: try: cname = _rdp_slot_container_name(svc.slug, slot.id) try: c = docker_client().containers.get(cname) if c.status != "running": c.start() except docker.errors.NotFound: start_rdp_slot_container(slot, svc) slot.container_name = cname except Exception: logger.exception("startup_rdp_slot_start_failed slot_id=%s", slot.id) if slots: db.commit() finally: db.close() thread = threading.Thread(target=cleanup_loop, daemon=True) thread.start() logger.info("maintenance_leader_started") @app.get("/", response_class=HTMLResponse) def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)): session_closed = (request.query_params.get("session_closed") or "").strip().lower() launch_error = (request.query_params.get("launch_error") or "").strip().lower() session_notice = "" if session_closed == "idle": session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново." elif session_closed == "limit": session_notice = ( f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). " "Освободите один сервис и попробуйте снова." ) elif launch_error == "max_services": session_notice = ( f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). " "Освободите один сервис и попробуйте снова." ) if not user: csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "", "session_notice": session_notice, }, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/") return response services = db.scalars( select(Service) .join(UserServiceAccess, UserServiceAccess.service_id == Service.id) .where( UserServiceAccess.user_id == user.id, Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) .order_by(Service.name) ).all() service_categories = {svc.id: [] for svc in services} categories = [] if services: service_ids = [svc.id for svc in services] rows = db.execute( select(ServiceCategory.service_id, Category.id, Category.name, Category.slug) .join(Category, Category.id == ServiceCategory.category_id) .where(ServiceCategory.service_id.in_(service_ids)) .order_by(Category.name) ).all() category_map = {} for service_id, category_id, category_name, category_slug in rows: service_categories.setdefault(service_id, []).append( { "id": category_id, "name": category_name, "slug": category_slug, } ) if category_id not in category_map: category_map[category_id] = {"id": category_id, "name": category_name, "slug": category_slug} categories = sorted(category_map.values(), key=lambda x: x["name"].lower()) selected_category_slug = (request.query_params.get("category") or "").strip().lower() if selected_category_slug: services = [ svc for svc in services if any(cat["slug"] == selected_category_slug for cat in service_categories.get(svc.id, [])) ] service_comment_html = {svc.id: format_service_comment(svc.comment) for svc in services} return templates.TemplateResponse( "dashboard.html", { "request": request, "user": user, "services": services, "categories": categories, "selected_category_slug": selected_category_slug, "service_categories": service_categories, "service_comment_html": service_comment_html, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), "session_notice": session_notice, }, ) @app.get("/admin", response_class=HTMLResponse) def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)): users = db.scalars(select(User).order_by(User.id)).all() categories = db.scalars(select(Category).order_by(Category.name)).all() services = db.scalars(select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)).all() web_services = [s for s in services if s.type == ServiceType.WEB] rdp_services = [s for s in services if s.type == ServiceType.RDP] service_category_map = {s.id: [] for s in services} if services: service_rows = db.execute( select(ServiceCategory.service_id, ServiceCategory.category_id).where( ServiceCategory.service_id.in_([s.id for s in services]) ) ).all() for service_id, category_id in service_rows: service_category_map.setdefault(service_id, []).append(category_id) acl_rows = db.scalars(select(UserServiceAccess)).all() acl = {} for row in acl_rows: acl.setdefault(row.user_id, []).append(row.service_id) for user_id in acl: acl[user_id] = sorted(acl[user_id]) pool_status = {s.id: get_pool_status_for_service(s) for s in services} service_health = {} for sid, st in pool_status.items(): service_health[sid] = { "health": st["health"], "running": st["running"], "desired": st["desired"], "active_sessions": get_active_sessions_count(db, sid), } web_pool = get_web_pool_status() web_totals = { "services": len(web_services), "running": web_pool["running"], "desired": web_pool["desired"], "active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services), } recent_sessions = db.execute( text( """ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug, s.status, s.created_at, s.last_access_at FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') ORDER BY s.created_at DESC LIMIT 200 """ ) ).mappings().all() open_stats = db.execute( text( """ SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') GROUP BY u.username, sv.name, sv.slug ORDER BY opens DESC, u.username ASC LIMIT 200 """ ) ).mappings().all() cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) online_sessions = db.execute( text( """ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug, sv.type AS service_type, s.container_id, s.created_at, s.last_access_at FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE s.status = 'ACTIVE' AND s.last_access_at >= :cutoff AND sv.type IN ('WEB','RDP') ORDER BY s.last_access_at DESC, s.created_at DESC LIMIT 500 """ ), {"cutoff": cutoff}, ).mappings().all() rdp_slots: dict[int, list] = {} for svc in rdp_services: slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == svc.id).order_by(RdpSlot.id)).all() slot_list = [] for slot in slots: active_sess = db.scalar( select(SessionModel).where( SessionModel.container_id == f"RDPSLOT:{slot.id}", SessionModel.status == SessionStatus.ACTIVE, ) ) running = False if slot.container_name: try: c = docker_client().containers.get(slot.container_name) running = c.status == "running" except Exception: pass occupied_username = None if active_sess: u = db.get(User, active_sess.user_id) occupied_username = u.username if u else f"id={active_sess.user_id}" slot_list.append({ "id": slot.id, "rdp_username": slot.rdp_username, "container_name": slot.container_name or "", "running": running, "occupied_username": occupied_username, }) rdp_slots[svc.id] = slot_list return templates.TemplateResponse( "admin.html", { "request": request, "admin": admin, "users": users, "web_services": web_services, "rdp_services": rdp_services, "services": services, "categories": categories, "service_category_map": service_category_map, "acl": acl, "pool_status": pool_status, "service_health": service_health, "web_totals": web_totals, "web_pool_size": WEB_POOL_SIZE, "web_pool_buffer": WEB_POOL_BUFFER, "recent_sessions": recent_sessions, "open_stats": open_stats, "online_sessions": online_sessions, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), "max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER, "rdp_slots": rdp_slots, }, ) @app.post("/login") def login( request: Request, username: str = Form(...), password: str = Form(...), csrf_token: str = Form(...), db: Session = Depends(get_db), ): cookie_csrf = request.cookies.get(CSRF_COOKIE) if not cookie_csrf or csrf_token != cookie_csrf: raise HTTPException(status_code=403, detail="CSRF failed") user = db.scalar(select(User).where(User.username == username)) if not user or not verify_password(password, user.password_hash): csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "Неверный логин или пароль", "session_notice": "", }, status_code=401, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/") return response if not user_is_valid(user): csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "Доступ к сервису приостоновлен, обратитесь к вашему менеджеру", }, status_code=403, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/") return response response = RedirectResponse(url="/", status_code=303) issue_auth_cookie(response, user) issue_csrf_cookie(response) audit(db, "LOGIN", f"login success: {username}", user_id=user.id) return response @app.post("/logout") def logout(request: Request): response = RedirectResponse(url="/", status_code=303) response.delete_cookie(COOKIE_NAME, path="/") response.delete_cookie(CSRF_COOKIE, path="/") return response @app.get("/go/{slug}") def go_service( slug: str, sw: Optional[int] = Query(default=None, ge=320, le=7680), sh: Optional[int] = Query(default=None, ge=240, le=4320), user: User = Depends(require_user), db: Session = Depends(get_db), ): total_started = time.perf_counter() phase_ms = {} def _mark(name: str, started: float) -> None: phase_ms[name] = int((time.perf_counter() - started) * 1000) def _emit(result: str, **extra) -> None: payload = { "user_id": user.id, "service_slug": slug, "result": result, "total_ms": int((time.perf_counter() - total_started) * 1000), } payload.update(phase_ms) payload.update(extra) log_event("go_service_timing", **payload) log_event("session_open_requested", user_id=user.id, service_slug=slug, sw=sw, sh=sh) service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.VNC: raise HTTPException(status_code=410, detail="VNC services are deprecated") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") client_width, client_height = sanitize_client_resolution(sw, sh) log_event( "session_open_resolution", user_id=user.id, service_slug=slug, sw=sw, sh=sh, client_width=client_width, client_height=client_height, ) user_lock_started = time.perf_counter() try: with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS): _mark("wait_user_lock_ms", user_lock_started) t_existing = time.perf_counter() existing_user_session = find_active_session_for_user_service(db, user.id, service.id) _mark("check_existing_ms", t_existing) if existing_user_session: _emit("reuse_session", session_id=existing_user_session.id) return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303) t_limit = time.perf_counter() cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) active_rows = db.scalars( select(SessionModel).where( SessionModel.user_id == user.id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) ).all() active_rows = sorted(active_rows, key=lambda row: row.created_at) active_service_ids = {row.service_id for row in active_rows} _mark("check_limit_ms", t_limit) if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER: oldest = next((row for row in active_rows if row.service_id != service.id), None) if oldest: t_rotate = time.perf_counter() terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True) db.commit() _mark("rotate_oldest_ms", t_rotate) log_event( "session_rotated", user_id=user.id, closed_session_id=oldest.id, closed_service_id=oldest.service_id, new_service_id=service.id, ) else: _emit("max_services_redirect") return RedirectResponse(url="/?launch_error=max_services", status_code=303) if service.type == ServiceType.RDP: t_rdp_slots = time.perf_counter() slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == service.id)).all() _mark("check_rdp_slots_ms", t_rdp_slots) if slots: session_id = str(uuid.uuid4()) try: with allocator_lock(db, 91003, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): busy_slot_ids: set[int] = set() for row in db.scalars( select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.service_id == service.id, SessionModel.container_id.like("RDPSLOT:%"), ) ).all(): try: busy_slot_ids.add(int(row.container_id.split(":", 1)[1])) except Exception: pass free_slot = next((s for s in slots if s.id not in busy_slot_ids), None) if not free_slot: _emit("rdp_all_slots_busy") raise HTTPException( status_code=503, detail="Все слоты этого RDP сервиса заняты. Попробуйте позже.", ) session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"RDPSLOT:{free_slot.id}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() except LockTimeoutError: _emit("rdp_slot_lock_timeout") raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="rdp_slot", slot_id=free_slot.id) audit(db, "SESSION_CREATE_RDP_SLOT", f"service={service.slug} session={session_id} slot={free_slot.id}", user_id=user.id) _emit("session_created_rdp_slot", session_id=session_id, slot_id=free_slot.id) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) else: # Legacy: no slots configured — exclusive single-session behaviour active_owner = find_active_session_for_service(db, service.id) if active_owner: if active_owner.user_id != user.id: _emit("rdp_busy_legacy") raise HTTPException(status_code=503, detail="RDP сервис занят. Попробуйте позже.") _emit("reuse_rdp_session", session_id=active_owner.id) return RedirectResponse(url=session_redirect_url(active_owner), status_code=303) session_id = str(uuid.uuid4()) if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0: try: t_pool_lock = time.perf_counter() with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): _mark("wait_web_pool_lock_ms", t_pool_lock) t_ensure = time.perf_counter() ensure_web_pool() _mark("ensure_web_pool_ms", t_ensure) t_acquire = time.perf_counter() slot = acquire_web_pool_slot(db) _mark("acquire_web_slot_ms", t_acquire) slot_cid = f"WEBPOOLIDX:{slot}" t_dispatch = time.perf_counter() terminate_active_slot_sessions(db, slot_cid) dispatch_web_pool_target(slot, service, width=client_width, height=client_height) _mark("dispatch_web_target_ms", t_dispatch) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=slot_cid, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) except LockTimeoutError: _emit("web_pool_lock_timeout") raise HTTPException(status_code=503, detail="Пул WEB занят. Повторите через несколько секунд.") except Exception as exc: logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("web_pool_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="WEB runtime failed to switch target") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot) audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) _emit("session_created_web_pool", session_id=session_id, slot=slot) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) if service_uses_universal_pool(service): try: t_pool_lock = time.perf_counter() with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): _mark("wait_universal_pool_lock_ms", t_pool_lock) t_ensure = time.perf_counter() ensure_universal_pool() _mark("ensure_universal_pool_ms", t_ensure) t_acquire = time.perf_counter() slot = acquire_universal_slot(db) _mark("acquire_universal_slot_ms", t_acquire) slot_cid = f"POOLIDX:{slot}" t_dispatch = time.perf_counter() terminate_active_slot_sessions(db, slot_cid) dispatch_universal_target(slot, service, width=client_width, height=client_height) _mark("dispatch_universal_target_ms", t_dispatch) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=slot_cid, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) except LockTimeoutError: _emit("universal_pool_lock_timeout") raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.") except Exception as exc: logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("universal_pool_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="Universal runtime failed to switch target") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot) audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) _emit("session_created_universal_pool", session_id=session_id, slot=slot) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) if service.type == ServiceType.WEB and desired_pool_size(service) > 0: t_warm = time.perf_counter() ensure_warm_pool(service) open_warm_web_url(service, service.target) _mark("warm_pool_prepare_ms", t_warm) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"POOL:{service.slug}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool") audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id) _emit("session_created_warm_pool", session_id=session_id) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) try: t_create = time.perf_counter() container_id = create_runtime_container(service, session_id) _mark("create_runtime_container_ms", t_create) except Exception as exc: logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("single_runtime_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="Session runtime failed to start") t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=container_id, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id) audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id) t_wait = time.perf_counter() ready = wait_for_session_route(session_id) _mark("wait_session_route_ms", t_wait) log_event("session_route_ready", session_id=session_id, ready=ready) _emit("session_created_single_runtime", session_id=session_id, ready=ready) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) except LockTimeoutError: _emit("user_lock_timeout") raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.") @app.get("/svc/{slug}/", response_class=HTMLResponse) def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") return HTMLResponse( content="""