import datetime as dt import enum import logging import os from pathlib import Path import secrets import threading import time import uuid from urllib.parse import parse_qs, unquote, urlparse from typing import Optional import docker import requests from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status from fastapi.responses import HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from itsdangerous import BadSignature, URLSafeTimedSerializer from passlib.context import CryptContext from sqlalchemy import ( Boolean, DateTime, Enum, ForeignKey, Integer, String, Text, UniqueConstraint, create_engine, select, text, ) from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, sessionmaker DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://portal:portal@db:5432/portal") COOKIE_NAME = "portal_auth" CSRF_COOKIE = "csrf_token" COOKIE_MAX_AGE = 8 * 60 * 60 SESSION_IDLE_SECONDS = int(os.getenv("SESSION_IDLE_SECONDS", "1800")) PUBLIC_HOST = os.getenv("PUBLIC_HOST", "stend.4mont.ru") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik") PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "0")) UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "5")) WEB_POOL_SIZE = int(os.getenv("WEB_POOL_SIZE", "5")) WEB_POOL_BUFFER = int(os.getenv("WEB_POOL_BUFFER", "2")) ICON_UPLOAD_MAX_BYTES = 2 * 1024 * 1024 ICON_UPLOAD_TYPES = { "image/png": "png", "image/jpeg": "jpg", "image/webp": "webp", } SERVICE_ICONS_DIR = Path("static/service-icons") logging.basicConfig( level=LOG_LEVEL, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) logger = logging.getLogger("portal") SIGNING_KEY = os.getenv("SIGNING_KEY", secrets.token_urlsafe(32)) serializer = URLSafeTimedSerializer(SIGNING_KEY, salt="portal-auth") pwd_context = CryptContext(schemes=["argon2"], 
deprecated="auto") engine = create_engine(DATABASE_URL, pool_pre_ping=True) SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False) templates = Jinja2Templates(directory="templates") app = FastAPI(title="МОНТ - инфра полигон") app.mount("/static", StaticFiles(directory="static"), name="static") @app.middleware("http") async def request_logging_middleware(request: Request, call_next): req_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8]) started = time.time() try: response = await call_next(request) except Exception: logger.exception("request_failed req_id=%s method=%s path=%s", req_id, request.method, request.url.path) raise duration_ms = int((time.time() - started) * 1000) logger.info( "request req_id=%s method=%s path=%s status=%s duration_ms=%s", req_id, request.method, request.url.path, response.status_code, duration_ms, ) response.headers["X-Request-ID"] = req_id return response class Base(DeclarativeBase): pass class ServiceType(str, enum.Enum): WEB = "WEB" VNC = "VNC" RDP = "RDP" class SessionStatus(str, enum.Enum): ACTIVE = "ACTIVE" EXPIRED = "EXPIRED" TERMINATED = "TERMINATED" class User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(Integer, primary_key=True) username: Mapped[str] = mapped_column(String(64), unique=True, index=True) password_hash: Mapped[str] = mapped_column(String(255)) expires_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), index=True) active: Mapped[bool] = mapped_column(Boolean, default=True, index=True) is_admin: Mapped[bool] = mapped_column(Boolean, default=False) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class Service(Base): __tablename__ = "services" id: Mapped[int] = mapped_column(Integer, primary_key=True) name: Mapped[str] = mapped_column(String(128)) slug: Mapped[str] = mapped_column(String(64), unique=True, index=True) type: Mapped[ServiceType] = mapped_column(Enum(ServiceType), 
index=True) target: Mapped[str] = mapped_column(Text) comment: Mapped[str] = mapped_column(Text, default="") icon_path: Mapped[str] = mapped_column(Text, default="") active: Mapped[bool] = mapped_column(Boolean, default=True) warm_pool_size: Mapped[int] = mapped_column(Integer, default=0) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class UserServiceAccess(Base): __tablename__ = "user_service_access" __table_args__ = (UniqueConstraint("user_id", "service_id", name="uq_user_service"),) id: Mapped[int] = mapped_column(Integer, primary_key=True) user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True) service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True) granted_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc)) class SessionModel(Base): __tablename__ = "sessions" id: Mapped[str] = mapped_column(String(36), primary_key=True) user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True) service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True) status: Mapped[SessionStatus] = mapped_column(Enum(SessionStatus), default=SessionStatus.ACTIVE, index=True) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True) last_access_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True) container_id: Mapped[Optional[str]] = mapped_column(String(128), nullable=True) class AuditLog(Base): __tablename__ = "audit_logs" id: Mapped[int] = mapped_column(Integer, primary_key=True) user_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True) action: Mapped[str] = mapped_column(String(128), index=True) details: Mapped[str] = 
mapped_column(Text) created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True) def now_utc() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def normalize_web_target(url: str) -> str: raw = (url or "").strip() if not raw: return raw if raw.startswith(("http://", "https://")): return raw return f"http://{raw}" def parse_rdp_target(target: str) -> dict: raw = (target or "").strip() if not raw: raise HTTPException(status_code=400, detail="Empty RDP target") parsed = urlparse(raw if "://" in raw else f"//{raw}") host = parsed.hostname if not host: raise HTTPException(status_code=400, detail="Invalid RDP target. Use host:port or rdp://user:pass@host:port") port = parsed.port or 3389 username = unquote(parsed.username) if parsed.username else "" password = unquote(parsed.password) if parsed.password else "" query = parse_qs(parsed.query or "") if not username: username = (query.get("u", [""])[0] or query.get("user", [""])[0] or "").strip() if not password: password = (query.get("p", [""])[0] or query.get("password", [""])[0] or "").strip() domain = (query.get("domain", [""])[0] or query.get("d", [""])[0] or "").strip() security = (query.get("sec", [""])[0] or query.get("security", [""])[0] or "").strip().lower() if security and security not in {"nla", "tls", "rdp"}: raise HTTPException(status_code=400, detail="Invalid RDP security. 
Use one of: nla, tls, rdp") return { "host": host, "port": str(port), "user": username, "password": password, "domain": domain, "security": security, } def service_uses_universal_pool(service: Service) -> bool: return UNIVERSAL_POOL_SIZE > 0 and service.type == ServiceType.RDP def universal_container_name(slot: int) -> str: return f"portal-universal-{slot}" def web_pool_container_name(slot: int) -> str: return f"portal-webpool-{slot}" def ensure_icons_dir() -> None: SERVICE_ICONS_DIR.mkdir(parents=True, exist_ok=True) def remove_icon_file(icon_path: str) -> None: if not icon_path or not icon_path.startswith("/static/service-icons/"): return filename = icon_path.rsplit("/", 1)[-1] candidate = SERVICE_ICONS_DIR / filename try: candidate.unlink(missing_ok=True) except Exception: logger.exception("icon_delete_failed path=%s", candidate) async def store_service_icon(service: Service, upload: UploadFile) -> str: ensure_icons_dir() content_type = (upload.content_type or "").lower().strip() ext = ICON_UPLOAD_TYPES.get(content_type) if not ext: raise HTTPException(status_code=400, detail="Unsupported file type. Use PNG/JPG/WEBP") payload = await upload.read(ICON_UPLOAD_MAX_BYTES + 1) if len(payload) > ICON_UPLOAD_MAX_BYTES: raise HTTPException(status_code=400, detail="File too large. 
Max 2MB") if not payload: raise HTTPException(status_code=400, detail="Empty file") stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d_%H%M%S") filename = f"svc_{service.id}_{stamp}.{ext}" target = SERVICE_ICONS_DIR / filename target.write_bytes(payload) return f"/static/service-icons/{filename}" def get_db(): db = SessionLocal() try: yield db finally: db.close() def audit(db: Session, action: str, details: str, user_id: Optional[int] = None) -> None: db.add(AuditLog(user_id=user_id, action=action, details=details)) db.commit() def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(password: str, password_hash: str) -> bool: return pwd_context.verify(password, password_hash) def user_is_valid(user: User) -> bool: return bool(user.active and user.expires_at > now_utc()) def issue_auth_cookie(response: RedirectResponse, user: User) -> None: token = serializer.dumps({"user_id": user.id}) response.set_cookie( key=COOKIE_NAME, value=token, httponly=True, secure=True, samesite="strict", max_age=COOKIE_MAX_AGE, path="/", ) def issue_csrf_cookie(response: RedirectResponse) -> str: token = secrets.token_urlsafe(24) response.set_cookie( key=CSRF_COOKIE, value=token, httponly=False, secure=True, samesite="strict", max_age=COOKIE_MAX_AGE, path="/", ) return token def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]: raw = request.cookies.get(COOKIE_NAME) if not raw: return None try: payload = serializer.loads(raw, max_age=COOKIE_MAX_AGE) except BadSignature: return None user = db.get(User, int(payload["user_id"])) if not user or not user_is_valid(user): return None return user def require_user(user: Optional[User] = Depends(get_current_user)) -> User: if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") return user def require_admin(user: User = Depends(require_user)) -> User: if not user.is_admin: raise 
HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only") return user def validate_csrf(request: Request) -> None: cookie = request.cookies.get(CSRF_COOKIE) form_val = request.headers.get("X-CSRF-Token") if request.headers.get("content-type", "").startswith("application/x-www-form-urlencoded"): return if not cookie or not form_val or cookie != form_val: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed") def has_access(db: Session, user_id: int, service_id: int) -> bool: q = select(UserServiceAccess).where( UserServiceAccess.user_id == user_id, UserServiceAccess.service_id == service_id, ) return db.scalar(q) is not None def docker_client(): return docker.from_env() def session_router_name(session_id: str) -> str: return f"sess-{session_id.replace('-', '')[:16]}" def ensure_universal_pool() -> None: if UNIVERSAL_POOL_SIZE <= 0: return d = docker_client() image = "portal-universal-runtime:latest" for i in range(UNIVERSAL_POOL_SIZE, 100): name = universal_container_name(i) try: c = d.containers.get(name) c.stop(timeout=5) except docker.errors.NotFound: break except Exception: logger.exception("universal_pool_scale_down_failed slot=%s", i) for i in range(UNIVERSAL_POOL_SIZE): name = universal_container_name(i) path = f"/u/{i}/" router = f"upool-{i}" labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "9400", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{router}.loadbalancer.server.port": "6080", "portal.pool": "1", "portal.pool.slot": str(i), } env = { "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "0", "SESSION_ID": f"universal-{i}", } try: c = 
d.containers.get(name) if c.status != "running": c.start() continue except docker.errors.NotFound: pass except Exception: logger.exception("universal_pool_check_failed slot=%s", i) continue d.containers.run( image=image, name=name, detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("universal_pool_container_started slot=%s", i) def ensure_web_pool(target_size: Optional[int] = None) -> None: desired = max(0, WEB_POOL_SIZE if target_size is None else target_size) d = docker_client() image = "portal-universal-runtime:latest" for i in range(desired, 100): name = web_pool_container_name(i) try: c = d.containers.get(name) c.stop(timeout=5) except docker.errors.NotFound: break except Exception: logger.exception("web_pool_scale_down_failed slot=%s", i) for i in range(desired): name = web_pool_container_name(i) path = f"/w/{i}/" router = f"wpool-{i}" labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "9450", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{router}.loadbalancer.server.port": "6080", "portal.pool": "1", "portal.pool.kind": "web", "portal.pool.slot": str(i), } env = { "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "0", "SESSION_ID": f"webpool-{i}", } try: c = d.containers.get(name) if c.status != "running": c.start() continue except docker.errors.NotFound: pass except Exception: logger.exception("web_pool_check_failed slot=%s", i) continue d.containers.run( image=image, name=name, detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("web_pool_container_started slot=%s", i) def get_universal_pool_status() 
-> dict: desired = max(0, UNIVERSAL_POOL_SIZE) if desired <= 0: return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []} d = docker_client() names = [universal_container_name(i) for i in range(desired)] containers = [] for name in names: try: containers.append(d.containers.get(name)) except Exception: continue running = sum(1 for c in containers if c.status == "running") health = "ok" if running >= min(desired, 1) else "down" return { "desired": desired, "running": running, "total": len(containers), "names": sorted(c.name for c in containers), "health": health, } def get_web_pool_status() -> dict: desired = max(0, WEB_POOL_SIZE) if desired <= 0: return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []} d = docker_client() names = [web_pool_container_name(i) for i in range(desired)] containers = [] for name in names: try: containers.append(d.containers.get(name)) except Exception: continue running = sum(1 for c in containers if c.status == "running") health = "ok" if running >= min(desired, 1) else "down" return { "desired": desired, "running": running, "total": len(containers), "names": sorted(c.name for c in containers), "health": health, } def acquire_universal_slot(db: Session) -> int: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.container_id.like("POOLIDX:%"), SessionModel.last_access_at >= cutoff, ) active = db.scalars(q).all() busy = set() for sess in active: try: busy.add(int((sess.container_id or "").split(":", 1)[1])) except Exception: continue for i in range(max(0, UNIVERSAL_POOL_SIZE)): if i not in busy: return i if active: victim = min(active, key=lambda s: s.last_access_at) victim.status = SessionStatus.TERMINATED db.commit() try: return int((victim.container_id or "").split(":", 1)[1]) except Exception: pass return 0 def acquire_web_pool_slot(db: Session) -> int: cutoff = now_utc() - 
dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.container_id.like("WEBPOOLIDX:%"), SessionModel.last_access_at >= cutoff, ) active = db.scalars(q).all() busy = set() for sess in active: try: busy.add(int((sess.container_id or "").split(":", 1)[1])) except Exception: continue # Keep headroom: when active sessions are close to hot pool capacity, # proactively warm up extra slots. auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER)) if auto_target > WEB_POOL_SIZE: ensure_web_pool(auto_target) for i in range(max(0, auto_target)): if i not in busy: return i return 0 def dispatch_universal_target(slot: int, service: Service) -> None: name = universal_container_name(slot) url = "" payload = {} if service.type == ServiceType.WEB: url = f"http://{name}:7000/open" payload = {"url": normalize_web_target(service.target)} elif service.type == ServiceType.RDP: cfg = parse_rdp_target(service.target) url = f"http://{name}:7000/rdp" payload = { "host": cfg["host"], "port": cfg["port"], "user": cfg["user"], "password": cfg["password"], "domain": cfg["domain"], "security": cfg["security"], } else: raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only") last_exc = None for _ in range(8): try: resp = requests.post(url, json=payload, timeout=3) resp.raise_for_status() return except Exception as exc: last_exc = exc time.sleep(0.4) if last_exc: raise last_exc def dispatch_web_pool_target(slot: int, service: Service) -> None: name = web_pool_container_name(slot) target_url = normalize_web_target(service.target) url = f"http://{name}:7000/open" last_exc = None for _ in range(8): try: resp = requests.post(url, json={"url": target_url}, timeout=3) resp.raise_for_status() return except Exception as exc: last_exc = exc time.sleep(0.4) if last_exc: raise last_exc def create_runtime_container(service: Service, session_id: str): d = docker_client() router = 
session_router_name(session_id) path = f"/s/{session_id}/" labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "10000", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{router}.loadbalancer.server.port": "6080", } env = { "SESSION_ID": session_id, "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "1", "TOUCH_PATH": f"/api/sessions/{session_id}/touch", } image = "portal-kiosk:latest" if service.type == ServiceType.WEB: env["TARGET_URL"] = service.target env["HOME_URL"] = f"https://{PUBLIC_HOST}/" elif service.type == ServiceType.RDP: image = "portal-rdp-proxy:latest" cfg = parse_rdp_target(service.target) env["RDP_HOST"] = cfg["host"] env["RDP_PORT"] = cfg["port"] if cfg["user"]: env["RDP_USER"] = cfg["user"] if cfg["password"]: env["RDP_PASSWORD"] = cfg["password"] if cfg["domain"]: env["RDP_DOMAIN"] = cfg["domain"] if cfg["security"]: env["RDP_SECURITY"] = cfg["security"] else: raise HTTPException(status_code=400, detail="Unsupported service type") container = d.containers.run( image=image, name=f"portal-sess-{session_id[:8]}", detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("session_container_started session_id=%s container_id=%s service_type=%s", session_id, container.id, service.type.value) return container.id def ensure_warm_pool(service: Service, pool_size: Optional[int] = None) -> None: if service_uses_universal_pool(service): return if pool_size is None: pool_size = desired_pool_size(service) if pool_size <= 0: # Stop stale warm containers for this service when pool is disabled. 
prefix = f"portal-warm-{service.slug}-" try: d = docker_client() for c in d.containers.list(all=True, filters={"name": prefix}): if c.name.startswith(prefix): c.stop(timeout=5) except Exception: logger.exception("warm_pool_disable_failed service=%s", service.slug) return d = docker_client() router = f"warm-{service.slug}" svc_name = f"warmsvc-{service.slug}" path = f"/svc/{service.slug}/" image = "portal-kiosk:latest" base_env = { "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), "ENABLE_HEARTBEAT": "0", "TOUCH_PATH": "", } if service.type == ServiceType.WEB: base_env["UNIVERSAL_WEB"] = "1" base_env["START_URL"] = normalize_web_target(service.target) base_env["HOME_URL"] = f"https://{PUBLIC_HOST}/" elif service.type == ServiceType.RDP: image = "portal-rdp-proxy:latest" cfg = parse_rdp_target(service.target) base_env["RDP_HOST"] = cfg["host"] base_env["RDP_PORT"] = cfg["port"] if cfg["user"]: base_env["RDP_USER"] = cfg["user"] if cfg["password"]: base_env["RDP_PASSWORD"] = cfg["password"] if cfg["domain"]: base_env["RDP_DOMAIN"] = cfg["domain"] if cfg["security"]: base_env["RDP_SECURITY"] = cfg["security"] else: raise HTTPException(status_code=400, detail="Unsupported service type") labels = { "traefik.enable": "true", "traefik.docker.network": "portal_net", f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", f"traefik.http.routers.{router}.entrypoints": "websecure", f"traefik.http.routers.{router}.tls": "true", f"traefik.http.routers.{router}.priority": "9500", f"traefik.http.routers.{router}.middlewares": f"{router}-strip", f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], f"traefik.http.services.{svc_name}.loadbalancer.server.port": "6080", f"traefik.http.routers.{router}.service": svc_name, "portal.warm": "1", "portal.service.slug": service.slug, "portal.service.type": service.type.value, } # Ensure desired cardinality. 
for i in range(pool_size, 50): name = f"portal-warm-{service.slug}-{i}" try: c = d.containers.get(name) c.stop(timeout=5) except docker.errors.NotFound: break except Exception: logger.exception("warm_pool_scale_down_failed service=%s idx=%s", service.slug, i) for i in range(pool_size): name = f"portal-warm-{service.slug}-{i}" try: c = d.containers.get(name) if c.status != "running": c.start() continue except docker.errors.NotFound: pass except Exception: logger.exception("warm_pool_check_failed service=%s idx=%s", service.slug, i) continue env = dict(base_env) env["SESSION_ID"] = f"warm-{service.slug}-{i}" d.containers.run( image=image, name=name, detach=True, auto_remove=True, network="portal_net", labels=labels, environment=env, ) logger.info("warm_pool_container_started service=%s idx=%s", service.slug, i) def wait_for_session_route(session_id: str, timeout_seconds: int = 6) -> bool: target = f"{TRAEFIK_INTERNAL_URL}/s/{session_id}/" deadline = time.time() + timeout_seconds while time.time() < deadline: try: resp = requests.get( target, headers={"Host": PUBLIC_HOST}, allow_redirects=False, timeout=1.5, ) if resp.status_code != 404: return True except Exception: pass time.sleep(0.3) return False def route_ready(path: str) -> bool: bases = [TRAEFIK_INTERNAL_URL] if TRAEFIK_INTERNAL_URL.startswith("http://"): bases.append("https://" + TRAEFIK_INTERNAL_URL[len("http://"):]) for base in bases: try: verify = not base.startswith("https://") resp = requests.get( f"{base}{path}", headers={"Host": PUBLIC_HOST}, allow_redirects=False, timeout=1.5, verify=verify, ) if resp.status_code != 404: return True except Exception: continue return False def container_running(container_id: Optional[str]) -> bool: if not container_id: return False if ( container_id.startswith("POOL:") or container_id.startswith("POOLIDX:") or container_id.startswith("WEBPOOLIDX:") ): return True try: c = docker_client().containers.get(container_id) return c.status == "running" except Exception: return 
False def stop_runtime_container(container_id: Optional[str]) -> None: if not container_id: return try: d = docker_client() c = d.containers.get(container_id) c.stop(timeout=5) except Exception: logger.exception("session_container_stop_failed container_id=%s", container_id) def ensure_schema_compatibility() -> None: # PostgreSQL requires enum value addition to be committed before usage in constraints. with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn: conn.execute( text( """ DO $$ BEGIN BEGIN ALTER TYPE servicetype ADD VALUE IF NOT EXISTS 'RDP'; EXCEPTION WHEN undefined_object THEN NULL; END; END $$; """ ) ) with engine.begin() as conn: conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0")) conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS comment TEXT NOT NULL DEFAULT ''")) conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS icon_path TEXT NOT NULL DEFAULT ''")) # Handle installs where service type is VARCHAR + CHECK. 
conn.execute( text( """ DO $$ DECLARE c record; BEGIN FOR c IN SELECT conname FROM pg_constraint WHERE conrelid = 'services'::regclass AND contype = 'c' AND pg_get_constraintdef(oid) ILIKE '%type%' LOOP EXECUTE format('ALTER TABLE services DROP CONSTRAINT %I', c.conname); END LOOP; ALTER TABLE services ADD CONSTRAINT services_type_check CHECK (type IN ('WEB','VNC','RDP')); EXCEPTION WHEN duplicate_object THEN NULL; END $$; """ ) ) def desired_pool_size(service: Service) -> int: if not service.active: return 0 if service_uses_universal_pool(service): return UNIVERSAL_POOL_SIZE return service.warm_pool_size if service.warm_pool_size and service.warm_pool_size > 0 else PREWARM_POOL_SIZE def get_warm_containers_for_service(service: Service) -> list: prefix = f"portal-warm-{service.slug}-" try: d = docker_client() containers = [] for c in d.containers.list(all=True, filters={"name": prefix}): if c.name.startswith(prefix): containers.append(c) return containers except Exception: logger.exception("pool_status_failed service=%s", service.slug) return [] def get_pool_status_for_service(service: Service) -> dict: if service.type == ServiceType.WEB: return get_web_pool_status() if service_uses_universal_pool(service): return get_universal_pool_status() desired = desired_pool_size(service) containers = get_warm_containers_for_service(service) running = sum(1 for c in containers if c.status == "running") states = [(c.attrs.get("State") or {}).get("Status", c.status) for c in containers] has_bad = any(s in {"exited", "dead"} for s in states) total = len(containers) if running == 0: health = "down" elif running >= min(desired, 1) and not has_bad: health = "ok" else: health = "degraded" return { "desired": desired, "running": running, "total": total, "names": sorted(c.name for c in containers), "health": health, } def get_pool_detailed_status(service: Service) -> dict: if service.type == ServiceType.WEB: d = docker_client() pool = get_web_pool_status() details = [] for i in 
range(max(0, pool["desired"])): name = web_pool_container_name(i) try: c = d.containers.get(name) except Exception: continue attrs = c.attrs or {} state = (attrs.get("State") or {}).get("Status", c.status) details.append( { "name": c.name, "status": c.status, "state": state, "created": attrs.get("Created", ""), "image": c.image.tags[0] if c.image.tags else "", "labels_ok": True, } ) return { "service_id": service.id, "slug": service.slug, "type": service.type.value, "desired": pool["desired"], "running": pool["running"], "total": pool["total"], "health": pool["health"], "containers": details, "updated_at": now_utc().isoformat(), } if service_uses_universal_pool(service): d = docker_client() pool = get_universal_pool_status() details = [] for i in range(max(0, UNIVERSAL_POOL_SIZE)): name = universal_container_name(i) try: c = d.containers.get(name) except Exception: continue attrs = c.attrs or {} state = (attrs.get("State") or {}).get("Status", c.status) details.append( { "name": c.name, "status": c.status, "state": state, "created": attrs.get("Created", ""), "image": c.image.tags[0] if c.image.tags else "", "labels_ok": True, } ) return { "service_id": service.id, "slug": service.slug, "type": service.type.value, "desired": pool["desired"], "running": pool["running"], "total": pool["total"], "health": pool["health"], "containers": details, "updated_at": now_utc().isoformat(), } containers = get_warm_containers_for_service(service) pool = get_pool_status_for_service(service) details = [] for c in sorted(containers, key=lambda x: x.name): attrs = c.attrs or {} state = (attrs.get("State") or {}).get("Status", c.status) created = attrs.get("Created", "") labels = attrs.get("Config", {}).get("Labels", {}) or {} labels_ok = ( labels.get("portal.warm") == "1" and labels.get("portal.service.slug") == service.slug and labels.get("portal.service.type") == service.type.value ) details.append( { "name": c.name, "status": c.status, "state": state, "created": created, "image": 
c.image.tags[0] if c.image.tags else "", "labels_ok": labels_ok, } ) return { "service_id": service.id, "slug": service.slug, "type": service.type.value, "desired": pool["desired"], "running": pool["running"], "total": pool["total"], "health": pool["health"], "containers": details, "updated_at": now_utc().isoformat(), } def get_active_sessions_count(db: Session, service_id: int) -> int: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.service_id == service_id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) return len(db.scalars(q).all()) def open_warm_web_url(service: Service, target_url: str) -> None: if service_uses_universal_pool(service): return if service.type != ServiceType.WEB: return target_url = normalize_web_target(target_url) try: d = docker_client() containers = d.containers.list( filters={ "label": [ "portal.warm=1", f"portal.service.slug={service.slug}", "portal.service.type=WEB", ] } ) for c in containers: try: resp = requests.post( f"http://{c.name}:7000/open", json={"url": target_url}, timeout=2, ) resp.raise_for_status() logger.info("warm_web_open_ok service=%s container=%s url=%s", service.slug, c.name, target_url) except Exception: logger.exception("warm_web_open_failed service=%s container=%s", service.slug, c.name) except Exception: logger.exception("warm_web_open_dispatch_failed service=%s", service.slug) def cleanup_loop(): while True: time.sleep(60) db = SessionLocal() try: ensure_universal_pool() ensure_web_pool() for svc in db.scalars( select(Service).where( Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) ).all(): if svc.type == ServiceType.WEB: continue if not service_uses_universal_pool(svc): ensure_warm_pool(svc) cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at < cutoff, ) stale = 
db.scalars(q).all() for sess in stale: if sess.container_id and not ( sess.container_id.startswith("POOL:") or sess.container_id.startswith("POOLIDX:") or sess.container_id.startswith("WEBPOOLIDX:") ): stop_runtime_container(sess.container_id) sess.status = SessionStatus.EXPIRED if stale: db.commit() except Exception: db.rollback() logger.exception("cleanup_loop_failed") finally: db.close() def bootstrap_admin(): admin_user = os.getenv("ADMIN_USERNAME", "admin") admin_password = os.getenv("ADMIN_PASSWORD", "admin123") ttl_days = int(os.getenv("ADMIN_TTL_DAYS", "3650")) db = SessionLocal() try: existing = db.scalar(select(User).where(User.username == admin_user)) if not existing: db.add( User( username=admin_user, password_hash=hash_password(admin_password), active=True, is_admin=True, expires_at=now_utc() + dt.timedelta(days=ttl_days), ) ) db.commit() finally: db.close() @app.on_event("startup") def startup_event(): Base.metadata.create_all(bind=engine) ensure_schema_compatibility() ensure_icons_dir() bootstrap_admin() db = SessionLocal() try: ensure_universal_pool() ensure_web_pool() for svc in db.scalars( select(Service).where( Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) ).all(): if svc.type == ServiceType.WEB: continue if not service_uses_universal_pool(svc): ensure_warm_pool(svc) finally: db.close() thread = threading.Thread(target=cleanup_loop, daemon=True) thread.start() @app.get("/", response_class=HTMLResponse) def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)): if not user: csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse("login.html", {"request": request, "csrf_token": csrf}) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/") return response services = db.scalars( select(Service) .join(UserServiceAccess, UserServiceAccess.service_id == Service.id) .where( 
UserServiceAccess.user_id == user.id, Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) .order_by(Service.name) ).all() return templates.TemplateResponse( "dashboard.html", {"request": request, "user": user, "services": services, "csrf_token": request.cookies.get(CSRF_COOKIE, "")}, ) @app.get("/admin", response_class=HTMLResponse) def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)): users = db.scalars(select(User).order_by(User.id)).all() services = db.scalars(select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)).all() web_services = [s for s in services if s.type == ServiceType.WEB] rdp_services = [s for s in services if s.type == ServiceType.RDP] acl_rows = db.scalars(select(UserServiceAccess)).all() acl = {} for row in acl_rows: acl.setdefault(row.user_id, []).append(row.service_id) for user_id in acl: acl[user_id] = sorted(acl[user_id]) pool_status = {s.id: get_pool_status_for_service(s) for s in services} service_health = {} for sid, st in pool_status.items(): service_health[sid] = { "health": st["health"], "running": st["running"], "desired": st["desired"], "active_sessions": get_active_sessions_count(db, sid), } web_pool = get_web_pool_status() web_totals = { "services": len(web_services), "running": web_pool["running"], "desired": web_pool["desired"], "active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services), } recent_sessions = db.execute( text( """ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug, s.status, s.created_at, s.last_access_at FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') ORDER BY s.created_at DESC LIMIT 200 """ ) ).mappings().all() open_stats = db.execute( text( """ SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens FROM sessions s JOIN users u ON u.id = 
s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') GROUP BY u.username, sv.name, sv.slug ORDER BY opens DESC, u.username ASC LIMIT 200 """ ) ).mappings().all() return templates.TemplateResponse( "admin.html", { "request": request, "admin": admin, "users": users, "web_services": web_services, "rdp_services": rdp_services, "services": services, "acl": acl, "pool_status": pool_status, "service_health": service_health, "web_totals": web_totals, "web_pool_size": WEB_POOL_SIZE, "web_pool_buffer": WEB_POOL_BUFFER, "recent_sessions": recent_sessions, "open_stats": open_stats, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), }, ) @app.post("/login") def login( request: Request, username: str = Form(...), password: str = Form(...), csrf_token: str = Form(...), db: Session = Depends(get_db), ): cookie_csrf = request.cookies.get(CSRF_COOKIE) if not cookie_csrf or csrf_token != cookie_csrf: raise HTTPException(status_code=403, detail="CSRF failed") user = db.scalar(select(User).where(User.username == username)) if not user or not verify_password(password, user.password_hash) or not user_is_valid(user): raise HTTPException(status_code=401, detail="Invalid credentials or expired user") response = RedirectResponse(url="/", status_code=303) issue_auth_cookie(response, user) issue_csrf_cookie(response) audit(db, "LOGIN", f"login success: {username}", user_id=user.id) return response @app.post("/logout") def logout(request: Request): response = RedirectResponse(url="/", status_code=303) response.delete_cookie(COOKIE_NAME, path="/") response.delete_cookie(CSRF_COOKIE, path="/") return response @app.get("/go/{slug}") def go_service(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)): service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.VNC: raise HTTPException(status_code=410, 
detail="VNC services are deprecated") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") session_id = str(uuid.uuid4()) if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0: try: ensure_web_pool() slot = acquire_web_pool_slot(db) dispatch_web_pool_target(slot, service) except Exception as exc: logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) raise HTTPException(status_code=502, detail="WEB runtime failed to switch target") session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"WEBPOOLIDX:{slot}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) return RedirectResponse(url=f"/w/{slot}/?sid={session_id}", status_code=303) if service_uses_universal_pool(service): try: ensure_universal_pool() slot = acquire_universal_slot(db) dispatch_universal_target(slot, service) except Exception as exc: logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) raise HTTPException(status_code=502, detail="Universal runtime failed to switch target") session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"POOLIDX:{slot}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) return RedirectResponse(url=f"/u/{slot}/?sid={session_id}", status_code=303) if desired_pool_size(service) > 0: ensure_warm_pool(service) open_warm_web_url(service, service.target) session_obj = SessionModel( 
id=session_id, user_id=user.id, service_id=service.id, container_id=f"POOL:{service.slug}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) try: container_id = create_runtime_container(service, session_id) except Exception as exc: logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) raise HTTPException(status_code=502, detail="Session runtime failed to start") session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=container_id, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id) ready = wait_for_session_route(session_id) logger.info("session_route_ready session_id=%s ready=%s", session_id, ready) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) @app.get("/svc/{slug}/", response_class=HTMLResponse) def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") return HTMLResponse( content=""" Service Starting
Сервис запускается
Проверка...
""".strip(), status_code=200, ) @app.get("/s/{session_id}/", response_class=HTMLResponse) def session_wait_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: raise HTTPException(status_code=410, detail="Session is not active") redirect_target = f"/s/{session_id}/" if sess.container_id and sess.container_id.startswith("POOL:"): redirect_target = f"/s/{session_id}/view" return HTMLResponse( content=f""" Session Starting
Сессия запускается
Проверка...
{session_id}
""".strip(), status_code=200, ) @app.get("/s/{session_id}/view", response_class=HTMLResponse) def session_view_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: raise HTTPException(status_code=410, detail="Session is not active") service = db.get(Service, sess.service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") if sess.container_id and sess.container_id.startswith("POOL:"): return HTMLResponse( content=f""" Session {session_id} """.strip() ) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) @app.post("/api/sessions/{session_id}/touch") def touch_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id or sess.status != SessionStatus.ACTIVE: raise HTTPException(status_code=404, detail="Session not found") sess.last_access_at = now_utc() db.commit() return {"ok": True} @app.get("/api/services/{slug}/status") def service_status(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)): service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.VNC: raise HTTPException(status_code=410, detail="VNC services are deprecated") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") pool = get_pool_status_for_service(service) route_ok = route_ready(f"/svc/{slug}/") ready = route_ok and (pool["running"] > 0 if desired_pool_size(service) > 0 else True) steps = [ f"ACL: OK ({user.username})", f"Пул: {pool['running']} / {pool['desired']}", f"Маршрут /svc/{slug}/: 
{'OK' if route_ok else 'ожидание'}", ] return { "ready": ready, "message": "Готово, открываем..." if ready else "Поднимаем контейнер и маршрут...", "steps": steps, } @app.get("/api/sessions/{session_id}/status") def session_status(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: raise HTTPException(status_code=410, detail="Session is not active") service = db.get(Service, sess.service_id) pooled_web = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.WEB) web_pool_idx = None if sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"): try: web_pool_idx = int(sess.container_id.split(":", 1)[1]) except Exception: web_pool_idx = None route_path = f"/svc/{service.slug}/" if pooled_web and service else f"/s/{session_id}/" if web_pool_idx is not None: route_path = f"/w/{web_pool_idx}/" route_ok = route_ready(route_path) running = container_running(sess.container_id) ready = running and route_ok steps = [ f"Контейнер: {'running' if running else 'starting'}", f"Маршрут {route_path}: {'OK' if route_ok else 'ожидание'}", ] payload = { "ready": ready, "message": "Готово, открываем..." 
if ready else "Запуск сессии...", "steps": steps, } if pooled_web: payload["redirect_url"] = f"/s/{session_id}/view" if web_pool_idx is not None: payload["redirect_url"] = f"/w/{web_pool_idx}/?sid={session_id}" return payload @app.post("/api/admin/services") def create_service(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service_type = ServiceType(payload["type"]) if service_type == ServiceType.VNC: raise HTTPException(status_code=400, detail="VNC services are no longer supported") target = payload["target"] if service_type == ServiceType.WEB: target = normalize_web_target(target) elif service_type == ServiceType.RDP: parse_rdp_target(target) service = Service( name=payload["name"], slug=payload["slug"], type=service_type, target=target, comment=payload.get("comment", ""), active=payload.get("active", True), warm_pool_size=max(0, int(payload.get("warm_pool_size", 0))), ) db.add(service) db.commit() ensure_warm_pool(service) return {"id": service.id} @app.get("/api/admin/services/{service_id}/containers/status") def service_containers_status(service_id: int, _: User = Depends(require_admin), db: Session = Depends(get_db)): service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") out = get_pool_detailed_status(service) out["active_sessions"] = get_active_sessions_count(db, service.id) return out @app.post("/api/admin/services/{service_id}/icon") async def upload_service_icon( service_id: int, request: Request, file: UploadFile = File(...), _: User = Depends(require_admin), db: Session = Depends(get_db), ): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") new_path = await store_service_icon(service, file) old_path = service.icon_path service.icon_path = new_path db.commit() if old_path and old_path != new_path: remove_icon_file(old_path) 
return {"ok": True, "icon_path": new_path} @app.delete("/api/admin/services/{service_id}/icon") def delete_service_icon(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") old_path = service.icon_path service.icon_path = "" db.commit() remove_icon_file(old_path) return {"ok": True} @app.put("/api/admin/services/{service_id}") def edit_service(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") for key in ["name", "slug", "target", "active", "comment"]: if key in payload: setattr(service, key, payload[key]) if "type" in payload: service.type = ServiceType(payload["type"]) if service.type == ServiceType.VNC: raise HTTPException(status_code=400, detail="VNC services are no longer supported") if service.type == ServiceType.WEB: service.target = normalize_web_target(service.target) elif service.type == ServiceType.RDP: parse_rdp_target(service.target) if "warm_pool_size" in payload: service.warm_pool_size = max(0, int(payload["warm_pool_size"])) db.commit() ensure_warm_pool(service) open_warm_web_url(service, service.target) return {"ok": True} @app.delete("/api/admin/services/{service_id}") def delete_service(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") ensure_warm_pool(service, 0) remove_icon_file(service.icon_path) db.delete(service) db.commit() return {"ok": True} @app.post("/api/admin/services/{service_id}/prewarm") def prewarm_now(service_id: int, request: Request, _: User = 
Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.WEB: ensure_web_pool() return {"ok": True, "pool": get_web_pool_status()} if service_uses_universal_pool(service): ensure_universal_pool() return {"ok": True, "pool": get_universal_pool_status()} ensure_warm_pool(service) return {"ok": True, "pool": get_pool_status_for_service(service)} @app.put("/api/admin/web-pool-size") def update_web_pool_size(payload: dict, request: Request, _: User = Depends(require_admin)): validate_csrf(request) global WEB_POOL_SIZE value = max(0, int(payload.get("size", WEB_POOL_SIZE))) WEB_POOL_SIZE = value ensure_web_pool() return {"ok": True, "size": WEB_POOL_SIZE, "pool": get_web_pool_status()} @app.post("/api/admin/users") def create_user(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) expires_at = dt.datetime.fromisoformat(payload["expires_at"]) user = User( username=payload["username"], password_hash=hash_password(payload["password"]), expires_at=expires_at, active=payload.get("active", True), is_admin=payload.get("is_admin", False), ) db.add(user) db.commit() return {"id": user.id} @app.put("/api/admin/users/{user_id}") def edit_user(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") for key in ["username", "active", "is_admin"]: if key in payload: setattr(user, key, payload[key]) if "password" in payload and payload["password"]: user.password_hash = hash_password(payload["password"]) if "expires_at" in payload: user.expires_at = dt.datetime.fromisoformat(payload["expires_at"]) db.commit() return {"ok": True} 
@app.delete("/api/admin/users/{user_id}")
def delete_user(user_id: int, request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Delete a user account; an admin cannot delete their own account.

    Raises:
        HTTPException: 404 unknown user; 400 when the target is the calling
            admin.
    """
    validate_csrf(request)
    user = db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    if user.id == admin.id:
        raise HTTPException(status_code=400, detail="Cannot delete current admin")
    db.delete(user)
    db.commit()
    return {"ok": True}


@app.put("/api/admin/users/{user_id}/acl")
def set_acl(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Replace a user's service ACL with exactly ``payload["service_ids"]``.

    Missing grants are inserted and grants not listed are deleted. A missing
    ``service_ids`` key is treated as an empty list, which revokes everything.

    Raises:
        HTTPException: 404 unknown user; 400 when ``service_ids`` is not a list
            of integers or names a service that does not exist.
    """
    validate_csrf(request)
    user = db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    raw_ids = payload.get("service_ids", [])
    if not isinstance(raw_ids, (list, tuple, set)):
        raise HTTPException(status_code=400, detail="service_ids must be a list of integers")
    service_ids = set()
    for raw in raw_ids:
        # bool is a subclass of int; true/false are not service ids.
        if isinstance(raw, bool) or not isinstance(raw, (int, str)):
            raise HTTPException(status_code=400, detail="service_ids must be a list of integers")
        try:
            service_ids.add(int(raw))
        except ValueError as exc:
            raise HTTPException(status_code=400, detail="service_ids must be a list of integers") from exc

    existing = db.scalars(select(UserServiceAccess).where(UserServiceAccess.user_id == user_id)).all()
    existing_map = {x.service_id: x for x in existing}

    to_add = {sid for sid in service_ids if sid not in existing_map}
    if to_add:
        # Reject unknown ids up front with a 400 rather than failing at commit.
        known = set(db.scalars(select(Service.id).where(Service.id.in_(list(to_add)))).all())
        unknown = to_add - known
        if unknown:
            raise HTTPException(status_code=400, detail=f"Unknown service id(s): {sorted(unknown)}")

    for sid in sorted(to_add):
        db.add(UserServiceAccess(user_id=user_id, service_id=sid))
    for sid, row in existing_map.items():
        if sid not in service_ids:
            db.delete(row)
    db.commit()
    return {"ok": True}