"""Docker/Traefik orchestration helpers for the portal.

Covers the runtime container pools (universal, web and per-service warm pools),
per-session and RDP-slot containers (all routed by Traefik through container
labels), session bookkeeping queries, and idempotent startup schema patches.
"""

import datetime as dt
import logging
import threading
import time
from typing import Callable, Optional

import docker
import requests
from fastapi import HTTPException
from sqlalchemy import select, text
from sqlalchemy.orm import Session

from config import (
    POOL_DISPATCH_RETRIES,
    POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS,
    POOL_DISPATCH_SLEEP_SECONDS,
    PREWARM_POOL_SIZE,
    PUBLIC_HOST,
    SESSION_IDLE_SECONDS,
    TRAEFIK_INTERNAL_URL,
    UNIVERSAL_POOL_SIZE,
    WEB_POOL_BUFFER,
    WEB_POOL_SIZE,
    X11VNC_FLAGS,
    WEB_RESOLUTION_MIN_WIDTH,
    WEB_RESOLUTION_MIN_HEIGHT,
    WEB_RESOLUTION_MAX_WIDTH,
    WEB_RESOLUTION_MAX_HEIGHT,
)
from database import SessionLocal, engine
from models import (
    RdpSlot,
    Service,
    ServiceType,
    SessionModel,
    SessionStatus,
)
from utils import log_event, normalize_web_target, now_utc, parse_rdp_target, session_closed_reason

logger = logging.getLogger("portal")

# ``container_id`` prefixes of sessions that occupy a pool slot or RDP slot
# instead of owning a dedicated runtime container.
_SLOT_ID_PREFIXES = ("POOL:", "POOLIDX:", "WEBPOOLIDX:", "RDPSLOT:")
# The pool-backed subset of the prefixes above (everything except RDPSLOT).
_POOL_ID_PREFIXES = ("POOL:", "POOLIDX:", "WEBPOOLIDX:")


# --------------------------------------------------------------------------- #
# Naming and small shared helpers
# --------------------------------------------------------------------------- #


def docker_client():
    """Return a Docker client configured from the environment (``docker.from_env()``)."""
    return docker.from_env()


def universal_container_name(slot: int) -> str:
    """Return the container name of universal-pool slot ``slot``."""
    return f"portal-universal-{slot}"


def web_pool_container_name(slot: int) -> str:
    """Return the container name of web-pool slot ``slot``."""
    return f"portal-webpool-{slot}"


def service_uses_universal_pool(service) -> bool:
    """Return True if ``service`` is served from the universal pool.

    Only RDP services use it, and only while ``UNIVERSAL_POOL_SIZE`` is positive.
    """
    return UNIVERSAL_POOL_SIZE > 0 and service.type == ServiceType.RDP


def session_router_name(session_id: str) -> str:
    """Return the Traefik router name for a per-session container.

    Dashes are removed from ``session_id`` and the first 16 remaining characters
    are used.
    """
    return f"sess-{session_id.replace('-', '')[:16]}"


def _traefik_path_labels(router: str, path: str, priority: str, service: Optional[str] = None) -> dict:
    """Build Traefik labels that route ``PathPrefix(path)`` to container port 6080.

    The router listens on the ``websecure`` entrypoint with TLS, and a strip-prefix
    middleware removes ``path`` (minus its trailing slash) before forwarding.

    Args:
        router: Router name; also used for the middleware name.
        path: URL prefix; must end with ``/``.
        priority: Router priority, as a string.
        service: Optional explicit Traefik service name. When given, the
            load balancer is declared under that name and the router is bound to it;
            otherwise the service shares the router's name.
    """
    svc_name = service or router
    labels = {
        "traefik.enable": "true",
        "traefik.docker.network": "portal_net",
        f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
        f"traefik.http.routers.{router}.entrypoints": "websecure",
        f"traefik.http.routers.{router}.tls": "true",
        f"traefik.http.routers.{router}.priority": priority,
        f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
        f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
        f"traefik.http.services.{svc_name}.loadbalancer.server.port": "6080",
    }
    if service:
        labels[f"traefik.http.routers.{router}.service"] = service
    return labels


def _pool_env(session_id: str) -> dict:
    """Return the environment shared by universal- and web-pool containers."""
    return {
        "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
        "ENABLE_HEARTBEAT": "0",
        "SESSION_ID": session_id,
        "X11VNC_FLAGS": X11VNC_FLAGS,
    }


def _rdp_env(cfg: dict) -> dict:
    """Translate a ``parse_rdp_target`` result into RDP proxy environment variables.

    ``RDP_HOST``/``RDP_PORT`` are always set. User, password, domain and security
    are only set when truthy.
    """
    env = {"RDP_HOST": cfg["host"], "RDP_PORT": cfg["port"]}
    optional = (
        ("user", "RDP_USER"),
        ("password", "RDP_PASSWORD"),
        ("domain", "RDP_DOMAIN"),
        ("security", "RDP_SECURITY"),
    )
    for key, var in optional:
        if cfg[key]:
            env[var] = cfg[key]
    return env


def _is_pool_name_conflict(exc: Exception) -> bool:
    """Return True if ``exc`` looks like a Docker container-name conflict.

    Detection is a substring match on the lower-cased error message
    ("already in use" / "marked for removal").
    """
    msg = str(exc).lower()
    return ("already in use" in msg) or ("marked for removal" in msg)


def _remove_container_by_name(d, name: str) -> None:
    """Force-remove container ``name`` if it exists.

    A missing container is ignored; any other failure is logged, not raised.
    """
    try:
        old = d.containers.get(name)
        old.remove(force=True)
    except docker.errors.NotFound:
        return
    except Exception:
        logger.exception("pool_container_remove_failed name=%s", name)


def _stop_surplus_slots(d, start: int, limit: int, name_for: Callable[[int], str], log_msg: str, *log_args) -> None:
    """Stop containers ``name_for(start)`` .. ``name_for(limit - 1)``.

    The scan stops at the first slot with no container, so slots are assumed to be
    contiguous. Other Docker errors are logged via ``log_msg % (*log_args, slot)``
    and the scan continues.
    """
    for i in range(start, limit):
        try:
            d.containers.get(name_for(i)).stop(timeout=5)
        except docker.errors.NotFound:
            break
        except Exception:
            logger.exception(log_msg, *log_args, i)


def _pool_slot_index(container_id: Optional[str]) -> Optional[int]:
    """Parse the slot number from a ``PREFIX:<n>`` container id, or return None if absent."""
    try:
        return int((container_id or "").split(":", 1)[1])
    except (IndexError, ValueError):
        return None


# --------------------------------------------------------------------------- #
# Universal and web pools
# --------------------------------------------------------------------------- #


def ensure_universal_pool() -> None:
    """Reconcile the universal runtime pool with ``UNIVERSAL_POOL_SIZE``.

    Surplus slots (index >= the configured size, up to 99, stopping at the first
    missing container) are stopped. Each wanted slot ``i`` is treated as follows:
    - an existing but stopped container is started;
    - a missing container is created and routed at ``/u/{i}/``.

    When the pool is disabled (size <= 0), leftover containers are stopped on a
    best-effort basis, as ``ensure_web_pool`` and ``ensure_warm_pool`` do.
    """
    desired = max(0, UNIVERSAL_POOL_SIZE)
    if desired <= 0:
        # Disabled: still stop containers left over from an earlier, larger config.
        try:
            _stop_surplus_slots(
                docker_client(), 0, 100, universal_container_name, "universal_pool_scale_down_failed slot=%s"
            )
        except Exception:
            logger.exception("universal_pool_disable_failed")
        return

    d = docker_client()
    image = "portal-universal-runtime:latest"
    _stop_surplus_slots(d, desired, 100, universal_container_name, "universal_pool_scale_down_failed slot=%s")

    for i in range(desired):
        name = universal_container_name(i)
        try:
            c = d.containers.get(name)
            if c.status != "running":
                c.start()
            continue  # Slot already has a container; nothing to create.
        except docker.errors.NotFound:
            pass
        except Exception:
            logger.exception("universal_pool_check_failed slot=%s", i)
            continue

        labels = _traefik_path_labels(f"upool-{i}", f"/u/{i}/", "9400")
        labels.update({"portal.pool": "1", "portal.pool.slot": str(i)})
        d.containers.run(
            image=image,
            name=name,
            detach=True,
            auto_remove=True,
            network="portal_net",
            labels=labels,
            environment=_pool_env(f"universal-{i}"),
        )
        logger.info("universal_pool_container_started slot=%s", i)


def _web_pool_slot_needs_create(d, name: str, slot: int) -> bool:
    """Return True if web-pool slot ``slot`` has no usable container and must be created.

    A stopped container is started in place. If starting it fails with a name
    conflict, the stale container is force-removed and recreation is requested.
    Other errors are logged and the slot is skipped for this pass.
    """
    try:
        c = d.containers.get(name)
        if c.status != "running":
            try:
                c.start()
            except docker.errors.APIError as exc:
                if not _is_pool_name_conflict(exc):
                    raise
                logger.warning("web_pool_recreate_needed slot=%s reason=name-conflict", slot)
                _remove_container_by_name(d, name)
                return True
        return False
    except docker.errors.NotFound:
        return True
    except Exception:
        logger.exception("web_pool_check_failed slot=%s", slot)
        return False


def _run_web_pool_container(d, slot: int, name: str, image: str, labels: dict, env: dict) -> None:
    """Create web-pool container ``name``, retrying up to 3 times on name conflicts.

    Before each retry the conflicting container is force-removed. A final
    Docker API failure is logged, not raised.
    """
    for attempt in range(3):
        try:
            d.containers.run(
                image=image,
                name=name,
                detach=True,
                auto_remove=True,
                network="portal_net",
                labels=labels,
                environment=env,
            )
            logger.info("web_pool_container_started slot=%s", slot)
            return
        except docker.errors.APIError as exc:
            if _is_pool_name_conflict(exc) and attempt < 2:
                logger.warning("web_pool_run_conflict_retry slot=%s attempt=%s", slot, attempt + 1)
                _remove_container_by_name(d, name)
                time.sleep(0.25)
                continue
            logger.exception("web_pool_run_failed slot=%s", slot)
            return


def ensure_web_pool(target_size: Optional[int] = None) -> None:
    """Reconcile the web pool to ``target_size`` slots (default ``WEB_POOL_SIZE``).

    Slots at or above the target (up to 99, stopping at the first missing
    container) are stopped. Each wanted slot ``i`` is started or (re)created and
    routed at ``/w/{i}/``.
    """
    desired = max(0, WEB_POOL_SIZE if target_size is None else target_size)
    d = docker_client()
    image = "portal-universal-runtime:latest"
    _stop_surplus_slots(d, desired, 100, web_pool_container_name, "web_pool_scale_down_failed slot=%s")

    for i in range(desired):
        name = web_pool_container_name(i)
        if not _web_pool_slot_needs_create(d, name, i):
            continue
        labels = _traefik_path_labels(f"wpool-{i}", f"/w/{i}/", "9450")
        labels.update({"portal.pool": "1", "portal.pool.kind": "web", "portal.pool.slot": str(i)})
        _run_web_pool_container(d, i, name, image, labels, _pool_env(f"webpool-{i}"))


def _named_pool_status(desired: int, name_for: Callable[[int], str]) -> dict:
    """Summarize slots ``0..desired-1`` of a named pool.

    Health is "ok" when at least one slot container is running; otherwise, or
    when the pool is disabled, it is "down".
    """
    if desired <= 0:
        return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
    d = docker_client()
    containers = []
    for i in range(desired):
        try:
            containers.append(d.containers.get(name_for(i)))
        except Exception:
            continue
    running = sum(1 for c in containers if c.status == "running")
    return {
        "desired": desired,
        "running": running,
        "total": len(containers),
        "names": sorted(c.name for c in containers),
        "health": "ok" if running > 0 else "down",
    }


def get_universal_pool_status() -> dict:
    """Return a status summary of the universal pool (see ``_named_pool_status``)."""
    return _named_pool_status(max(0, UNIVERSAL_POOL_SIZE), universal_container_name)


def get_web_pool_status() -> dict:
    """Return a status summary of the web pool at its configured size ``WEB_POOL_SIZE``."""
    return _named_pool_status(max(0, WEB_POOL_SIZE), web_pool_container_name)


# --------------------------------------------------------------------------- #
# Slot allocation and dispatch
# --------------------------------------------------------------------------- #


def _active_pool_sessions(db: Session, id_prefix: str) -> list:
    """Return ACTIVE sessions with a ``container_id`` starting with ``id_prefix``.

    Only sessions accessed within the last ``SESSION_IDLE_SECONDS`` are returned.
    """
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    q = select(SessionModel).where(
        SessionModel.status == SessionStatus.ACTIVE,
        SessionModel.container_id.like(f"{id_prefix}%"),
        SessionModel.last_access_at >= cutoff,
    )
    return db.scalars(q).all()


def acquire_universal_slot(db: Session) -> int:
    """Pick a universal-pool slot index for a new session.

    Returns the lowest slot not held by a recently active ``POOLIDX:`` session.
    If all slots are busy, the least recently accessed session is marked
    TERMINATED (and committed) and its slot is reused. Falls back to 0 otherwise.
    """
    active = _active_pool_sessions(db, "POOLIDX:")
    busy = {idx for idx in (_pool_slot_index(s.container_id) for s in active) if idx is not None}
    for i in range(max(0, UNIVERSAL_POOL_SIZE)):
        if i not in busy:
            return i
    if active:
        victim = min(active, key=lambda s: s.last_access_at)
        # Read the slot before commit so the expired instance need not be reloaded.
        victim_slot = _pool_slot_index(victim.container_id)
        # NOTE(review): this sets the status directly rather than via
        # terminate_session_record, so no "session_closed" event is logged.
        victim.status = SessionStatus.TERMINATED
        db.commit()
        if victim_slot is not None:
            return victim_slot
    return 0


def acquire_web_pool_slot(db: Session) -> int:
    """Pick a web-pool slot index for a new session, growing the pool if needed.

    The pool target is ``max(WEB_POOL_SIZE, active sessions + WEB_POOL_BUFFER)``.
    When that exceeds ``WEB_POOL_SIZE``, ``ensure_web_pool`` is called
    synchronously with the larger target. Returns the lowest slot not held by a
    recently active ``WEBPOOLIDX:`` session, or 0 if every slot is busy.
    """
    active = _active_pool_sessions(db, "WEBPOOLIDX:")
    busy = {idx for idx in (_pool_slot_index(s.container_id) for s in active) if idx is not None}
    # Keep headroom: when active sessions are close to hot pool capacity,
    # proactively warm up extra slots.
    auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER))
    if auto_target > WEB_POOL_SIZE:
        ensure_web_pool(auto_target)
    for i in range(max(0, auto_target)):
        if i not in busy:
            return i
    return 0


def sanitize_client_resolution(width: Optional[int], height: Optional[int]) -> tuple[Optional[int], Optional[int]]:
    """Clamp a client resolution into the configured min/max bounds.

    Returns ``(None, None)`` unless both ``width`` and ``height`` are given.
    """
    if width is None or height is None:
        return None, None
    clamped_width = max(WEB_RESOLUTION_MIN_WIDTH, min(int(width), WEB_RESOLUTION_MAX_WIDTH))
    clamped_height = max(WEB_RESOLUTION_MIN_HEIGHT, min(int(height), WEB_RESOLUTION_MAX_HEIGHT))
    return clamped_width, clamped_height


def _web_open_payload(service: Service, width: Optional[int], height: Optional[int]) -> dict:
    """Build the JSON body for a pool container's ``/open`` endpoint.

    The body contains the normalized target URL and the service credentials.
    The clamped resolution is added when both dimensions are present.
    """
    payload = {
        "url": normalize_web_target(service.target),
        "login": service.svc_login or "",
        "password": service.svc_password or "",
    }
    width, height = sanitize_client_resolution(width, height)
    if width and height:
        payload["width"] = width
        payload["height"] = height
    return payload


def _post_with_retries(url: str, payload: dict) -> None:
    """POST ``payload`` as JSON to ``url``, retrying on request/HTTP errors.

    Makes up to ``max(1, POOL_DISPATCH_RETRIES)`` attempts, sleeping
    ``POOL_DISPATCH_SLEEP_SECONDS`` between attempts (not after the last one).

    Raises:
        requests.RequestException: the error from the final attempt.
    """
    attempts = max(1, POOL_DISPATCH_RETRIES)
    delay = max(0.0, POOL_DISPATCH_SLEEP_SECONDS)
    for attempt in range(1, attempts + 1):
        try:
            resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
            resp.raise_for_status()
            return
        except requests.RequestException:
            if attempt == attempts:
                raise
            time.sleep(delay)


def dispatch_universal_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None:
    """Tell universal-pool container ``slot`` to open ``service``.

    WEB services are posted to ``:7000/open`` and RDP services to ``:7000/rdp``.

    Raises:
        HTTPException: 400 for any other service type.
        requests.RequestException: if every dispatch attempt fails.
    """
    name = universal_container_name(slot)
    if service.type == ServiceType.WEB:
        url = f"http://{name}:7000/open"
        payload = _web_open_payload(service, width, height)
    elif service.type == ServiceType.RDP:
        cfg = parse_rdp_target(service.target)
        url = f"http://{name}:7000/rdp"
        payload = {key: cfg[key] for key in ("host", "port", "user", "password", "domain", "security")}
    else:
        raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only")
    _post_with_retries(url, payload)


def dispatch_web_pool_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None:
    """Tell web-pool container ``slot`` to open ``service``'s target URL via ``:7000/open``.

    Raises:
        requests.RequestException: if every dispatch attempt fails.
    """
    url = f"http://{web_pool_container_name(slot)}:7000/open"
    _post_with_retries(url, _web_open_payload(service, width, height))


# --------------------------------------------------------------------------- #
# Dedicated session containers and warm pools
# --------------------------------------------------------------------------- #


def create_runtime_container(service: Service, session_id: str):
    """Start a dedicated container for one session and return its Docker id.

    The container is routed at ``/s/{session_id}/`` and sends heartbeats to
    ``/api/sessions/{session_id}/touch``. WEB services use ``portal-kiosk``;
    RDP services use ``portal-rdp-proxy``.

    Raises:
        HTTPException: 400 for unsupported service types.
    """
    d = docker_client()
    router = session_router_name(session_id)
    labels = _traefik_path_labels(router, f"/s/{session_id}/", "10000")
    env = {
        "SESSION_ID": session_id,
        "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
        "ENABLE_HEARTBEAT": "1",
        "TOUCH_PATH": f"/api/sessions/{session_id}/touch",
        "X11VNC_FLAGS": X11VNC_FLAGS,
    }
    image = "portal-kiosk:latest"
    if service.type == ServiceType.WEB:
        # NOTE(review): unlike the pool paths, TARGET_URL is passed without
        # normalize_web_target — confirm this is intended.
        env["TARGET_URL"] = service.target
        env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
    elif service.type == ServiceType.RDP:
        image = "portal-rdp-proxy:latest"
        env.update(_rdp_env(parse_rdp_target(service.target)))
    else:
        raise HTTPException(status_code=400, detail="Unsupported service type")
    container = d.containers.run(
        image=image,
        name=f"portal-sess-{session_id[:8]}",
        detach=True,
        auto_remove=True,
        network="portal_net",
        labels=labels,
        environment=env,
    )
    logger.info(
        "session_container_started session_id=%s container_id=%s service_type=%s",
        session_id,
        container.id,
        service.type.value,
    )
    return container.id


def ensure_warm_pool(service: Service, pool_size: Optional[int] = None) -> None:
    """Reconcile the per-service warm pool ``portal-warm-{slug}-{i}``.

    The pool size defaults to ``desired_pool_size(service)``; services on the
    universal pool are skipped. With a size <= 0, all of the service's warm
    containers are stopped (best effort). Otherwise surplus indices (up to 49)
    are stopped and each wanted index is started or created. All members share
    one Traefik router at ``/svc/{slug}/``.

    Raises:
        HTTPException: 400 for unsupported service types.
    """
    if service_uses_universal_pool(service):
        return
    if pool_size is None:
        pool_size = desired_pool_size(service)
    prefix = f"portal-warm-{service.slug}-"
    if pool_size <= 0:
        # Stop stale warm containers for this service when pool is disabled.
        try:
            d = docker_client()
            for c in d.containers.list(all=True, filters={"name": prefix}):
                # The Docker name filter is a substring match, so confirm the prefix.
                if c.name.startswith(prefix):
                    c.stop(timeout=5)
        except Exception:
            logger.exception("warm_pool_disable_failed service=%s", service.slug)
        return

    d = docker_client()
    image = "portal-kiosk:latest"
    base_env = {
        "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
        "ENABLE_HEARTBEAT": "0",
        "TOUCH_PATH": "",
        "X11VNC_FLAGS": X11VNC_FLAGS,
    }
    if service.type == ServiceType.WEB:
        base_env["UNIVERSAL_WEB"] = "1"
        base_env["START_URL"] = normalize_web_target(service.target)
        base_env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
    elif service.type == ServiceType.RDP:
        image = "portal-rdp-proxy:latest"
        base_env.update(_rdp_env(parse_rdp_target(service.target)))
    else:
        raise HTTPException(status_code=400, detail="Unsupported service type")

    labels = _traefik_path_labels(
        f"warm-{service.slug}", f"/svc/{service.slug}/", "9500", service=f"warmsvc-{service.slug}"
    )
    labels.update(
        {
            "portal.warm": "1",
            "portal.service.slug": service.slug,
            "portal.service.type": service.type.value,
        }
    )

    # Ensure desired cardinality.
    _stop_surplus_slots(
        d,
        pool_size,
        50,
        lambda idx: f"{prefix}{idx}",
        "warm_pool_scale_down_failed service=%s idx=%s",
        service.slug,
    )
    for i in range(pool_size):
        name = f"{prefix}{i}"
        try:
            c = d.containers.get(name)
            if c.status != "running":
                c.start()
            continue  # Index already has a container; nothing to create.
        except docker.errors.NotFound:
            pass
        except Exception:
            logger.exception("warm_pool_check_failed service=%s idx=%s", service.slug, i)
            continue
        d.containers.run(
            image=image,
            name=name,
            detach=True,
            auto_remove=True,
            network="portal_net",
            labels=labels,
            environment={**base_env, "SESSION_ID": f"warm-{service.slug}-{i}"},
        )
        logger.info("warm_pool_container_started service=%s idx=%s", service.slug, i)


def wait_for_session_route(session_id: str, timeout_seconds: int = 6) -> bool:
    """Poll Traefik until ``/s/{session_id}/`` stops answering 404.

    Requests go to ``TRAEFIK_INTERNAL_URL`` with ``Host: PUBLIC_HOST`` about every
    0.3 s. Request errors count as "not ready yet".

    Returns:
        True on the first non-404 response; False once ``timeout_seconds`` elapse.
    """
    target = f"{TRAEFIK_INTERNAL_URL}/s/{session_id}/"
    # Monotonic clock: wall-clock adjustments must not shorten or extend the wait.
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            resp = requests.get(
                target,
                headers={"Host": PUBLIC_HOST},
                allow_redirects=False,
                timeout=1.5,
            )
            if resp.status_code != 404:
                return True
        except Exception:
            pass
        time.sleep(0.3)
    return False


def route_ready(path: str) -> bool:
    """Return True if Traefik answers ``path`` with anything other than 404.

    ``TRAEFIK_INTERNAL_URL`` is tried first. When it is ``http://``, the
    ``https://`` variant is tried next, without certificate verification.
    """
    bases = [TRAEFIK_INTERNAL_URL]
    if TRAEFIK_INTERNAL_URL.startswith("http://"):
        bases.append("https://" + TRAEFIK_INTERNAL_URL[len("http://"):])
    for base in bases:
        try:
            resp = requests.get(
                f"{base}{path}",
                headers={"Host": PUBLIC_HOST},
                allow_redirects=False,
                timeout=1.5,
                verify=not base.startswith("https://"),
            )
            if resp.status_code != 404:
                return True
        except Exception:
            continue
    return False


def container_running(container_id: Optional[str]) -> bool:
    """Return True if the runtime behind a session's ``container_id`` is running.

    Pool ids (``POOL:``/``POOLIDX:``/``WEBPOOLIDX:``) are reported as running
    without checking Docker. ``RDPSLOT:<id>`` is resolved through the
    ``RdpSlot`` row's container name. Any error yields False.
    """
    if not container_id:
        return False
    if container_id.startswith(_POOL_ID_PREFIXES):
        return True
    if container_id.startswith("RDPSLOT:"):
        try:
            slot_id = int(container_id.split(":", 1)[1])
            db = SessionLocal()
            try:
                slot = db.get(RdpSlot, slot_id)
                if not slot or not slot.container_name:
                    return False
                return docker_client().containers.get(slot.container_name).status == "running"
            finally:
                db.close()
        except Exception:
            return False
    try:
        return docker_client().containers.get(container_id).status == "running"
    except Exception:
        return False


# --------------------------------------------------------------------------- #
# RDP slot containers
# --------------------------------------------------------------------------- #


def _rdp_slot_container_name(service_slug: str, slot_id: int) -> str:
    """Return the container name for RDP slot ``slot_id`` of service ``service_slug``."""
    return f"portal-rdpslot-{service_slug}-{slot_id}"


def start_rdp_slot_container(slot: RdpSlot, service: Service) -> str:
    """(Re)create the long-lived RDP proxy container for ``slot`` and return its name.

    Any existing container with the same name is stopped and removed first. The
    new container uses the slot's own credentials, restart policy
    ``unless-stopped``, and is routed at ``/rdp/{slot.id}/``.
    """
    d = docker_client()
    name = _rdp_slot_container_name(service.slug, slot.id)
    slot_id = slot.id
    labels = _traefik_path_labels(f"rdpslot-{slot_id}", f"/rdp/{slot_id}/", "10000")
    labels.update(
        {
            "portal.rdpslot": "1",
            "portal.rdpslot.id": str(slot_id),
            "portal.service.slug": service.slug,
        }
    )
    cfg = parse_rdp_target(service.target)
    env = {
        "SESSION_ID": f"rdpslot-{slot_id}",
        "IDLE_TIMEOUT": "86400",
        "ENABLE_HEARTBEAT": "0",
        "RDP_HOST": cfg["host"],
        "RDP_PORT": cfg["port"],
        "X11VNC_FLAGS": X11VNC_FLAGS,
    }
    if slot.rdp_username:
        env["RDP_USER"] = slot.rdp_username
    if slot.rdp_password:
        env["RDP_PASSWORD"] = slot.rdp_password
    if cfg.get("domain"):
        env["RDP_DOMAIN"] = cfg["domain"]
    if cfg.get("security"):
        env["RDP_SECURITY"] = cfg["security"]

    try:
        existing = d.containers.get(name)
        existing.stop(timeout=5)
        existing.remove(force=True)
    except docker.errors.NotFound:
        pass
    except Exception:
        logger.exception("rdp_slot_container_cleanup_failed slot_id=%s", slot_id)

    container = d.containers.run(
        "portal-rdp-proxy:latest",
        name=name,
        detach=True,
        restart_policy={"Name": "unless-stopped"},
        network="portal_net",
        labels=labels,
        environment=env,
    )
    logger.info("rdp_slot_container_started slot_id=%s name=%s", slot_id, name)
    return container.name


def stop_rdp_slot_container(container_name: str) -> None:
    """Stop and remove an RDP slot container.

    A missing container is ignored; other failures are logged.
    """
    if not container_name:
        return
    try:
        d = docker_client()
        c = d.containers.get(container_name)
        c.stop(timeout=5)
        c.remove(force=True)
        logger.info("rdp_slot_container_stopped name=%s", container_name)
    except docker.errors.NotFound:
        pass
    except Exception:
        logger.exception("rdp_slot_container_stop_failed container=%s", container_name)


def _restart_rdp_slot_bg(slot_id: int) -> None:
    """Restart (or recreate, if missing) the container of RDP slot ``slot_id``.

    Intended as a background thread target: it opens its own DB session and
    logs every failure instead of raising.
    """
    db = SessionLocal()
    try:
        slot = db.get(RdpSlot, slot_id)
        if not slot or not slot.container_name:
            return
        service = db.get(Service, slot.service_id)
        if not service:
            return
        try:
            c = docker_client().containers.get(slot.container_name)
            c.restart(timeout=10)
            logger.info("rdp_slot_container_restarted slot_id=%s", slot_id)
        except docker.errors.NotFound:
            # The container no longer exists: recreate it from the slot record.
            start_rdp_slot_container(slot, service)
    except Exception:
        # Errors here would otherwise escape the background thread unlogged.
        logger.exception("rdp_slot_container_restart_failed slot_id=%s", slot_id)
    finally:
        db.close()


# --------------------------------------------------------------------------- #
# Session lifecycle
# --------------------------------------------------------------------------- #


def stop_runtime_container(container_id: Optional[str]) -> None:
    """Stop a dedicated runtime container by id.

    A container that no longer exists is ignored; other failures are logged.
    """
    if not container_id:
        return
    try:
        docker_client().containers.get(container_id).stop(timeout=5)
    except docker.errors.NotFound:
        # Already gone: nothing to stop.
        pass
    except Exception:
        logger.exception("session_container_stop_failed container_id=%s", container_id)


def terminate_session_record(
    db: Session,
    sess: SessionModel,
    new_status: SessionStatus = SessionStatus.TERMINATED,
    *,
    stop_container: bool = True,
) -> None:
    """Close an ACTIVE session record and release its runtime.

    Behavior by runtime kind:
    - Dedicated containers are stopped when ``stop_container`` is true.
    - ``RDPSLOT:`` sessions get a background restart of their slot container.
    - Pool sessions are left alone.

    The status and ``last_access_at`` are updated and a ``session_closed`` event is
    logged. The caller is responsible for committing. Non-ACTIVE sessions are
    ignored.
    """
    if not sess or sess.status != SessionStatus.ACTIVE:
        return
    old_status = sess.status
    cid = sess.container_id or ""
    if stop_container and cid and not cid.startswith(_SLOT_ID_PREFIXES):
        stop_runtime_container(cid)
    # NOTE(review): the RDP slot restart is scheduled even when stop_container=False;
    # confirm this is intended.
    if cid.startswith("RDPSLOT:"):
        try:
            slot_id = int(cid.split(":", 1)[1])
            threading.Thread(target=_restart_rdp_slot_bg, args=(slot_id,), daemon=True).start()
        except Exception:
            logger.exception("rdp_slot_restart_schedule_failed cid=%s", cid)
    sess.status = new_status
    sess.last_access_at = now_utc()
    log_event(
        "session_closed",
        level=logging.INFO,
        session_id=sess.id,
        user_id=sess.user_id,
        service_id=sess.service_id,
        container_id=cid,
        old_status=old_status.value if isinstance(old_status, SessionStatus) else str(old_status),
        new_status=new_status.value,
        reason=session_closed_reason(sess, db),
        stop_container=stop_container,
    )


# --------------------------------------------------------------------------- #
# Schema maintenance
# --------------------------------------------------------------------------- #


def ensure_schema_compatibility() -> None:
    """Apply idempotent, additive schema patches for older installs.

    The patches are:
    - add enum values (in AUTOCOMMIT mode);
    - add missing ``services`` columns;
    - create the category tables and their indexes;
    - replace every CHECK constraint on ``services`` whose definition mentions
      ``type`` with ``services_type_check``.
    """
    # PostgreSQL requires enum value addition to be committed before usage in constraints.
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        conn.execute(
            text(
                """
                DO $$
                BEGIN
                    BEGIN
                        ALTER TYPE servicetype ADD VALUE IF NOT EXISTS 'RDP';
                    EXCEPTION
                        WHEN undefined_object THEN NULL;
                    END;
                END $$;
                """
            )
        )
        conn.execute(
            text(
                """
                DO $$
                BEGIN
                    BEGIN
                        ALTER TYPE sessionstatus ADD VALUE IF NOT EXISTS 'ROTATED';
                    EXCEPTION
                        WHEN undefined_object THEN NULL;
                    END;
                END $$;
                """
            )
        )
    with engine.begin() as conn:
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS comment TEXT NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_login VARCHAR(256) NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_password VARCHAR(256) NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_cred_hint TEXT NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS icon_path TEXT NOT NULL DEFAULT ''"))
        conn.execute(
            text(
                """
                CREATE TABLE IF NOT EXISTS categories (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(128) NOT NULL UNIQUE,
                    slug VARCHAR(64) NOT NULL UNIQUE,
                    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
                )
                """
            )
        )
        conn.execute(
            text(
                """
                CREATE TABLE IF NOT EXISTS service_categories (
                    id SERIAL PRIMARY KEY,
                    service_id INT NOT NULL REFERENCES services(id) ON DELETE CASCADE,
                    category_id INT NOT NULL REFERENCES categories(id) ON DELETE CASCADE,
                    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
                    UNIQUE (service_id, category_id)
                )
                """
            )
        )
        conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_service_id ON service_categories(service_id)"))
        conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_category_id ON service_categories(category_id)"))
        # Handle installs where service type is VARCHAR + CHECK.
        conn.execute(
            text(
                """
                DO $$
                DECLARE
                    c record;
                BEGIN
                    FOR c IN
                        SELECT conname
                        FROM pg_constraint
                        WHERE conrelid = 'services'::regclass
                          AND contype = 'c'
                          AND pg_get_constraintdef(oid) ILIKE '%type%'
                    LOOP
                        EXECUTE format('ALTER TABLE services DROP CONSTRAINT %I', c.conname);
                    END LOOP;
                    ALTER TABLE services ADD CONSTRAINT services_type_check CHECK (type IN ('WEB','VNC','RDP'));
                EXCEPTION
                    WHEN duplicate_object THEN NULL;
                END $$;
                """
            )
        )


# --------------------------------------------------------------------------- #
# Pool sizing and status reporting
# --------------------------------------------------------------------------- #


def desired_pool_size(service: Service) -> int:
    """Return how many prewarmed containers ``service`` should have.

    Returns:
    - 0 for inactive services and for on-demand RDP;
    - ``UNIVERSAL_POOL_SIZE`` for universal-pool services;
    - otherwise the service's positive ``warm_pool_size``, falling back to
      ``PREWARM_POOL_SIZE``.
    """
    if not service.active:
        return 0
    uses_universal = service_uses_universal_pool(service)
    if service.type == ServiceType.RDP and not uses_universal:
        # RDP runs on-demand per user session; no prewarmed pool.
        return 0
    if uses_universal:
        return UNIVERSAL_POOL_SIZE
    return service.warm_pool_size if service.warm_pool_size and service.warm_pool_size > 0 else PREWARM_POOL_SIZE


def get_warm_containers_for_service(service: Service) -> list:
    """Return all (including stopped) warm containers of ``service``; [] on Docker errors."""
    prefix = f"portal-warm-{service.slug}-"
    try:
        d = docker_client()
        # The Docker name filter is a substring match, so confirm the prefix.
        return [c for c in d.containers.list(all=True, filters={"name": prefix}) if c.name.startswith(prefix)]
    except Exception:
        logger.exception("pool_status_failed service=%s", service.slug)
        return []


def get_pool_status_for_service(service: Service) -> dict:
    """Return a pool status summary for ``service``.

    Which pool is reported:
    - WEB services report the shared web pool.
    - On-demand RDP reports health "n/a".
    - Universal-pool services report the universal pool.
    - Other services report their warm pool, whose health is one of:
      - "down": nothing is running;
      - "ok": something is running and no container is exited/dead;
      - "degraded": otherwise.
    """
    if service.type == ServiceType.WEB:
        return get_web_pool_status()
    if service.type == ServiceType.RDP and not service_uses_universal_pool(service):
        return {"desired": 0, "running": 0, "total": 0, "names": [], "health": "n/a"}
    if service_uses_universal_pool(service):
        return get_universal_pool_status()
    desired = desired_pool_size(service)
    containers = get_warm_containers_for_service(service)
    running = sum(1 for c in containers if c.status == "running")
    states = [(c.attrs.get("State") or {}).get("Status", c.status) for c in containers]
    has_bad = any(s in {"exited", "dead"} for s in states)
    if running == 0:
        health = "down"
    elif running >= min(desired, 1) and not has_bad:
        health = "ok"
    else:
        health = "degraded"
    return {
        "desired": desired,
        "running": running,
        "total": len(containers),
        "names": sorted(c.name for c in containers),
        "health": health,
    }


def _container_detail(c, labels_ok: bool = True) -> dict:
    """Describe one container for the detailed pool status view."""
    attrs = c.attrs or {}
    return {
        "name": c.name,
        "status": c.status,
        "state": (attrs.get("State") or {}).get("Status", c.status),
        "created": attrs.get("Created", ""),
        "image": c.image.tags[0] if c.image.tags else "",
        "labels_ok": labels_ok,
    }


def _named_pool_details(desired: int, name_for: Callable[[int], str]) -> list:
    """Return ``_container_detail`` for each existing slot container ``0..desired-1``."""
    d = docker_client()
    details = []
    for i in range(max(0, desired)):
        try:
            c = d.containers.get(name_for(i))
        except Exception:
            continue
        details.append(_container_detail(c))
    return details


def _pool_detail_payload(service: Service, pool: dict, details: list) -> dict:
    """Assemble the response of ``get_pool_detailed_status``."""
    return {
        "service_id": service.id,
        "slug": service.slug,
        "type": service.type.value,
        "desired": pool["desired"],
        "running": pool["running"],
        "total": pool["total"],
        "health": pool["health"],
        "containers": details,
        "updated_at": now_utc().isoformat(),
    }


def get_pool_detailed_status(service: Service) -> dict:
    """Return the pool summary for ``service`` plus per-container details.

    For warm pools, ``labels_ok`` reports whether the container's ``portal.*``
    labels match the service. Shared pools always report ``labels_ok=True``.
    """
    if service.type == ServiceType.WEB:
        pool = get_web_pool_status()
        return _pool_detail_payload(service, pool, _named_pool_details(pool["desired"], web_pool_container_name))
    if service_uses_universal_pool(service):
        pool = get_universal_pool_status()
        return _pool_detail_payload(service, pool, _named_pool_details(UNIVERSAL_POOL_SIZE, universal_container_name))

    containers = get_warm_containers_for_service(service)
    pool = get_pool_status_for_service(service)
    details = []
    for c in sorted(containers, key=lambda x: x.name):
        attrs = c.attrs or {}
        labels = (attrs.get("Config") or {}).get("Labels") or {}
        labels_ok = (
            labels.get("portal.warm") == "1"
            and labels.get("portal.service.slug") == service.slug
            and labels.get("portal.service.type") == service.type.value
        )
        details.append(_container_detail(c, labels_ok))
    return _pool_detail_payload(service, pool, details)


# --------------------------------------------------------------------------- #
# Session queries and locking
# --------------------------------------------------------------------------- #


def get_active_sessions_count(db: Session, service_id: int) -> int:
    """Count recently active sessions of a service.

    Pooled sessions (``POOL:``/``POOLIDX:``/``WEBPOOLIDX:``) are counted once per
    distinct ``container_id``.
    """
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    q = select(SessionModel).where(
        SessionModel.service_id == service_id,
        SessionModel.status == SessionStatus.ACTIVE,
        SessionModel.last_access_at >= cutoff,
    )
    # Avoid inflated stats when pooled slot sessions were duplicated by race:
    # for pooled sessions, occupancy is unique container_id.
    pooled_ids = set()
    direct = 0
    for s in db.scalars(q).all():
        cid = s.container_id or ""
        if cid.startswith(_POOL_ID_PREFIXES):
            pooled_ids.add(cid)
        else:
            direct += 1
    return len(pooled_ids) + direct


def _latest_active_session(db: Session, *conditions) -> Optional[SessionModel]:
    """Return the newest recently active ACTIVE session matching ``conditions``, or None."""
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    q = (
        select(SessionModel)
        .where(
            *conditions,
            SessionModel.status == SessionStatus.ACTIVE,
            SessionModel.last_access_at >= cutoff,
        )
        .order_by(SessionModel.created_at.desc())
    )
    return db.scalars(q).first()


def find_active_session_for_service(db: Session, service_id: int) -> Optional[SessionModel]:
    """Return the newest recently active session of a service, or None."""
    return _latest_active_session(db, SessionModel.service_id == service_id)


def find_active_session_for_user_service(db: Session, user_id: int, service_id: int) -> Optional[SessionModel]:
    """Return the newest recently active session of a user for a service, or None."""
    return _latest_active_session(
        db,
        SessionModel.user_id == user_id,
        SessionModel.service_id == service_id,
    )


class LockTimeoutError(Exception):
    """Raised when an advisory lock cannot be acquired within the timeout."""


class _AdvisoryXactLock:
    """Context manager that takes a PostgreSQL transaction-level advisory lock.

    The lock is released by PostgreSQL when the surrounding transaction ends,
    so ``__exit__`` does nothing. ``_acquired`` is set once the lock is held.
    """

    def __init__(self, db: Session, lock_id: int, timeout_seconds: Optional[float], poll_seconds: float):
        self._db = db
        self._lock_id = lock_id
        self._timeout_seconds = timeout_seconds
        self._poll_seconds = poll_seconds
        self._acquired = False

    def __enter__(self):
        self._acquired = False
        params = {"lid": self._lock_id}
        if self._timeout_seconds is None:
            # Blocking acquire.
            self._db.execute(text("SELECT pg_advisory_xact_lock(:lid)"), params)
            self._acquired = True
            return self
        deadline = time.monotonic() + max(0.0, self._timeout_seconds)
        # Always try at least once, so a zero/tiny timeout still gets one attempt.
        while True:
            if self._db.execute(text("SELECT pg_try_advisory_xact_lock(:lid)"), params).scalar():
                self._acquired = True
                return self
            if time.monotonic() >= deadline:
                raise LockTimeoutError(
                    f"advisory lock timeout lock_id={self._lock_id} timeout={self._timeout_seconds}"
                )
            time.sleep(max(0.01, self._poll_seconds))

    def __exit__(self, exc_type, exc, tb):
        return False


def allocator_lock(db: Session, lock_id: int, timeout_seconds: Optional[float] = None, poll_seconds: float = 0.05):
    """Return a context manager holding advisory lock ``lock_id`` for the current transaction.

    With ``timeout_seconds=None`` it blocks until the lock is granted. Otherwise it
    polls ``pg_try_advisory_xact_lock`` every ``poll_seconds`` (minimum 0.01 s).

    Raises (on ``__enter__``):
        LockTimeoutError: if the lock is not obtained before the timeout.
    """
    return _AdvisoryXactLock(db, lock_id, timeout_seconds, poll_seconds)


def terminate_active_slot_sessions(db: Session, container_id: str) -> None:
    """Mark every ACTIVE session bound to ``container_id`` as TERMINATED (no commit)."""
    if not container_id:
        return
    db.execute(
        text(
            """
            UPDATE sessions
            SET status = 'TERMINATED'
            WHERE container_id = :cid AND status = 'ACTIVE'
            """
        ),
        {"cid": container_id},
    )


def session_redirect_url(sess: SessionModel) -> str:
    """Return the URL a browser should be sent to for ``sess``.

    Pool and RDP-slot sessions use ``/s/{id}/view``; dedicated containers use
    ``/s/{id}/``.
    """
    cid = sess.container_id or ""
    if cid.startswith(_SLOT_ID_PREFIXES):
        return f"/s/{sess.id}/view"
    return f"/s/{sess.id}/"


def open_warm_web_url(service: Service, target_url: str) -> None:
    """Ask every running warm container of a WEB service to open ``target_url``.

    This is a no-op for universal-pool and non-WEB services. Failures are logged
    per container, never raised.
    """
    if service_uses_universal_pool(service):
        return
    if service.type != ServiceType.WEB:
        return
    target_url = normalize_web_target(target_url)
    try:
        d = docker_client()
        containers = d.containers.list(
            filters={
                "label": [
                    "portal.warm=1",
                    f"portal.service.slug={service.slug}",
                    "portal.service.type=WEB",
                ]
            }
        )
        for c in containers:
            try:
                resp = requests.post(
                    f"http://{c.name}:7000/open",
                    json={"url": target_url},
                    timeout=2,
                )
                resp.raise_for_status()
                logger.info("warm_web_open_ok service=%s container=%s url=%s", service.slug, c.name, target_url)
            except Exception:
                logger.exception("warm_web_open_failed service=%s container=%s", service.slug, c.name)
    except Exception:
        logger.exception("warm_web_open_dispatch_failed service=%s", service.slug)