# Files
# Stend_mont/app/runtime.py
# T
# ruslan 58cb8b1035 feat: on-demand RDP - connect xfreerdp only when session opens
# Replaces always-on xfreerdp with an on-demand model (load drops from 12 to under 1 at idle).
# - rdp-proxy/manager.py: HTTP server on port 7001 managing the xfreerdp lifecycle
# - rdp-proxy/entrypoint.sh: starts Xvfb+x11vnc+websockify+manager, no auto-connect
# - rdp-proxy/Dockerfile: adds python3, copies manager.py, exposes 7001
# - runtime.py: connect_rdp_slot and disconnect_rdp_slot via manager HTTP API
# - terminate_session_record: disconnect instead of container restart
# - main.py: calls connect_rdp_slot in background thread on session create
# - maintenance.py: cleanup_loop disconnects on expire, run_maintenance_service
#   includes RDP slot init, maintenance_runner fixed to import maintenance
# 2026-05-01 10:12:52 +00:00
#
# 1163 lines
# 41 KiB
# Python

import datetime as dt
import logging
import threading
import time
from typing import Optional
import docker
import requests
from fastapi import HTTPException
from sqlalchemy import select, text
from sqlalchemy.orm import Session
from config import (
POOL_DISPATCH_RETRIES, POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS,
POOL_DISPATCH_SLEEP_SECONDS, PREWARM_POOL_SIZE, PUBLIC_HOST,
SESSION_IDLE_SECONDS, TRAEFIK_INTERNAL_URL, UNIVERSAL_POOL_SIZE,
WEB_POOL_BUFFER, WEB_POOL_SIZE, X11VNC_FLAGS,
WEB_RESOLUTION_MIN_WIDTH, WEB_RESOLUTION_MIN_HEIGHT,
WEB_RESOLUTION_MAX_WIDTH, WEB_RESOLUTION_MAX_HEIGHT,
)
from database import SessionLocal, engine
from models import (
RdpSlot, Service, ServiceType, SessionModel, SessionStatus,
)
from utils import log_event, normalize_web_target, now_utc, parse_rdp_target, session_closed_reason
# Module logger; every event in this module is emitted under the "portal" logger name.
logger = logging.getLogger("portal")
def docker_client():
    """Build a Docker SDK client configured from the process environment.

    No caching: a fresh client is created on every call.
    """
    client = docker.from_env()
    return client
def session_router_name(session_id: str) -> str:
    """Return the Traefik router name for a per-session container.

    Dashes are dropped from *session_id*, the result is cut to 16 characters
    and prefixed with ``sess-``.
    """
    compact = "".join(session_id.split("-"))
    return "sess-" + compact[:16]
def _is_pool_name_conflict(exc: Exception) -> bool:
msg = str(exc).lower()
return ("already in use" in msg) or ("marked for removal" in msg)
def _remove_container_by_name(d, name: str) -> None:
try:
old = d.containers.get(name)
old.remove(force=True)
except docker.errors.NotFound:
return
except Exception:
logger.exception("pool_container_remove_failed name=%s", name)
def ensure_universal_pool() -> None:
    """Reconcile the universal runtime pool with ``UNIVERSAL_POOL_SIZE``.

    Does nothing when the pool size is <= 0. Containers at slot indices >=
    the pool size are stopped; the scan covers indices up to 99 and stops at
    the first missing slot. Each slot in ``range(UNIVERSAL_POOL_SIZE)`` is then
    started if it exists but is not running. Otherwise it is created from
    ``portal-universal-runtime:latest``, with Traefik labels routing
    ``/u/<slot>/`` to port 6080.

    Docker name conflicts are resolved by force-removing the stale container
    and recreating it, the same way ``ensure_web_pool`` does. A create failure
    for one slot is logged so the remaining slots are still reconciled.
    """
    if UNIVERSAL_POOL_SIZE <= 0:
        return
    d = docker_client()
    image = "portal-universal-runtime:latest"
    # Scale down: stop surplus containers (auto_remove makes stop also remove them).
    for i in range(UNIVERSAL_POOL_SIZE, 100):
        name = universal_container_name(i)
        try:
            c = d.containers.get(name)
            c.stop(timeout=5)
        except docker.errors.NotFound:
            break
        except Exception:
            logger.exception("universal_pool_scale_down_failed slot=%s", i)
    for i in range(UNIVERSAL_POOL_SIZE):
        name = universal_container_name(i)
        path = f"/u/{i}/"
        router = f"upool-{i}"
        labels = {
            "traefik.enable": "true",
            "traefik.docker.network": "portal_net",
            f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
            f"traefik.http.routers.{router}.entrypoints": "websecure",
            f"traefik.http.routers.{router}.tls": "true",
            f"traefik.http.routers.{router}.priority": "9400",
            f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
            f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
            f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
            "portal.pool": "1",
            "portal.pool.slot": str(i),
        }
        env = {
            "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
            "ENABLE_HEARTBEAT": "0",
            "SESSION_ID": f"universal-{i}",
            "X11VNC_FLAGS": X11VNC_FLAGS,
        }
        should_create = False
        try:
            c = d.containers.get(name)
            if c.status != "running":
                try:
                    c.start()
                except docker.errors.APIError as exc:
                    if not _is_pool_name_conflict(exc):
                        raise
                    # A stale container with this name blocks start; replace it.
                    logger.warning("universal_pool_recreate_needed slot=%s reason=name-conflict", i)
                    _remove_container_by_name(d, name)
                    should_create = True
            if not should_create:
                continue
        except docker.errors.NotFound:
            should_create = True
        except Exception:
            logger.exception("universal_pool_check_failed slot=%s", i)
            continue
        # Create the container, retrying up to twice on name conflicts.
        for attempt in range(3):
            try:
                d.containers.run(
                    image=image,
                    name=name,
                    detach=True,
                    auto_remove=True,
                    network="portal_net",
                    labels=labels,
                    environment=env,
                )
                logger.info("universal_pool_container_started slot=%s", i)
                break
            except docker.errors.APIError as exc:
                if _is_pool_name_conflict(exc) and attempt < 2:
                    logger.warning("universal_pool_run_conflict_retry slot=%s attempt=%s", i, attempt + 1)
                    _remove_container_by_name(d, name)
                    time.sleep(0.25)
                    continue
                logger.exception("universal_pool_run_failed slot=%s", i)
                break
def ensure_web_pool(target_size: Optional[int] = None) -> None:
    """Reconcile the shared web browser pool to *target_size* slots.

    *target_size* defaults to ``WEB_POOL_SIZE``; negative values count as 0.
    Containers at slot indices >= the desired size are stopped. The scan covers
    indices up to 99 and stops at the first missing slot. Each slot below the
    desired size is started if it exists but is not running. Otherwise it is
    created from ``portal-universal-runtime:latest``, with Traefik labels
    routing ``/w/<slot>/`` to port 6080.

    NOTE(review): the scale-down pass stops containers without checking for
    ACTIVE sessions still assigned to those slots. Confirm that callers only
    shrink the pool when that is safe.
    """
    desired = max(0, WEB_POOL_SIZE if target_size is None else target_size)
    d = docker_client()
    image = "portal-universal-runtime:latest"
    # Scale down: containers are run with auto_remove, so stopping also removes them.
    for i in range(desired, 100):
        name = web_pool_container_name(i)
        try:
            c = d.containers.get(name)
            c.stop(timeout=5)
        except docker.errors.NotFound:
            break
        except Exception:
            logger.exception("web_pool_scale_down_failed slot=%s", i)
    for i in range(desired):
        name = web_pool_container_name(i)
        path = f"/w/{i}/"
        router = f"wpool-{i}"
        labels = {
            "traefik.enable": "true",
            "traefik.docker.network": "portal_net",
            f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
            f"traefik.http.routers.{router}.entrypoints": "websecure",
            f"traefik.http.routers.{router}.tls": "true",
            f"traefik.http.routers.{router}.priority": "9450",
            f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
            f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
            f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
            "portal.pool": "1",
            "portal.pool.kind": "web",
            "portal.pool.slot": str(i),
        }
        env = {
            "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
            "ENABLE_HEARTBEAT": "0",
            "SESSION_ID": f"webpool-{i}",
            "X11VNC_FLAGS": X11VNC_FLAGS,
        }
        should_create = False
        try:
            c = d.containers.get(name)
            if c.status != "running":
                try:
                    c.start()
                except docker.errors.APIError as exc:
                    # A stale container with the same name blocks start:
                    # remove it and fall through to recreate it below.
                    if _is_pool_name_conflict(exc):
                        logger.warning("web_pool_recreate_needed slot=%s reason=name-conflict", i)
                        _remove_container_by_name(d, name)
                        should_create = True
                    else:
                        raise
            # Existing container is running (or was just started): done with this slot.
            if not should_create:
                continue
        except docker.errors.NotFound:
            should_create = True
        except Exception:
            logger.exception("web_pool_check_failed slot=%s", i)
            continue
        # Create the container; up to two retries when the name is still taken.
        for attempt in range(3):
            try:
                d.containers.run(
                    image=image,
                    name=name,
                    detach=True,
                    auto_remove=True,
                    network="portal_net",
                    labels=labels,
                    environment=env,
                )
                logger.info("web_pool_container_started slot=%s", i)
                break
            except docker.errors.APIError as exc:
                if _is_pool_name_conflict(exc) and attempt < 2:
                    logger.warning("web_pool_run_conflict_retry slot=%s attempt=%s", i, attempt + 1)
                    _remove_container_by_name(d, name)
                    time.sleep(0.25)
                    continue
                logger.exception("web_pool_run_failed slot=%s", i)
                break
def get_universal_pool_status() -> dict:
    """Summarise the universal pool's containers.

    Returns a dict with these keys:
    - ``desired``: the configured pool size.
    - ``running`` and ``total``: counts over the containers found for the
      configured slots; lookup errors are skipped.
    - ``names``: the sorted names of the containers found.
    - ``health``: "ok" when at least one is running, otherwise "down". It is
      also "down" when the pool is disabled.
    """
    desired = max(0, UNIVERSAL_POOL_SIZE)
    if not desired > 0:
        return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
    client = docker_client()
    found = []
    for slot_name in [universal_container_name(idx) for idx in range(desired)]:
        try:
            found.append(client.containers.get(slot_name))
        except Exception:
            pass
    running = len([c for c in found if c.status == "running"])
    return {
        "desired": desired,
        "running": running,
        "total": len(found),
        "names": sorted(c.name for c in found),
        "health": "ok" if running >= min(desired, 1) else "down",
    }
def get_web_pool_status() -> dict:
    """Summarise the web pool's containers for its configured size.

    Returns a dict with these keys:
    - ``desired``: ``WEB_POOL_SIZE``.
    - ``running`` and ``total``: counts over the containers found for slots
      below that size; lookup errors are skipped.
    - ``names``: the sorted names of the containers found.
    - ``health``: "ok" when at least one is running, otherwise "down". It is
      also "down" when the pool is disabled.
    """
    desired = max(0, WEB_POOL_SIZE)
    if not desired > 0:
        return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
    client = docker_client()
    found = []
    for slot_name in [web_pool_container_name(idx) for idx in range(desired)]:
        try:
            found.append(client.containers.get(slot_name))
        except Exception:
            pass
    running = len([c for c in found if c.status == "running"])
    return {
        "desired": desired,
        "running": running,
        "total": len(found),
        "names": sorted(c.name for c in found),
        "health": "ok" if running >= min(desired, 1) else "down",
    }
def acquire_universal_slot(db: Session) -> int:
    """Pick a universal-pool slot index for a new session.

    Slots referenced by ACTIVE ``POOLIDX:<n>`` sessions touched within
    ``SESSION_IDLE_SECONDS`` are busy. The lowest free index in
    ``range(UNIVERSAL_POOL_SIZE)`` is returned.

    When every slot is busy, the least-recently-used session that occupies a
    valid in-range slot is marked TERMINATED, ``db`` is committed, and that
    slot is returned. Sessions with an unparseable or out-of-range slot are
    never evicted, because evicting them would not free a usable slot.
    Falls back to 0 when no slot can be determined.
    """
    pool_size = max(0, UNIVERSAL_POOL_SIZE)
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    q = select(SessionModel).where(
        SessionModel.status == SessionStatus.ACTIVE,
        SessionModel.container_id.like("POOLIDX:%"),
        SessionModel.last_access_at >= cutoff,
    )
    active = db.scalars(q).all()
    # (session, slot) pairs for sessions whose container_id carries a valid integer slot.
    occupied = []
    for sess in active:
        try:
            occupied.append((sess, int((sess.container_id or "").split(":", 1)[1])))
        except (IndexError, ValueError):
            continue
    busy = {slot for _, slot in occupied}
    for i in range(pool_size):
        if i not in busy:
            return i
    # Pool exhausted: evict the LRU session, but only one whose slot actually
    # exists, so the index we hand out is exactly the one we just freed.
    candidates = [(sess, slot) for sess, slot in occupied if 0 <= slot < pool_size]
    if candidates:
        victim, slot = min(candidates, key=lambda pair: pair[0].last_access_at)
        victim.status = SessionStatus.TERMINATED
        db.commit()
        return slot
    return 0
def acquire_web_pool_slot(db: Session) -> int:
    """Pick a free web-pool slot index for a new session, growing the pool as needed.

    Slots referenced by ACTIVE ``WEBPOOLIDX:<n>`` sessions touched within
    ``SESSION_IDLE_SECONDS`` are busy. The pool target is
    ``max(WEB_POOL_SIZE, active + WEB_POOL_BUFFER)`` to keep warm headroom.
    The target is raised further if needed so the chosen slot always exists.
    When the target exceeds ``WEB_POOL_SIZE``, ``ensure_web_pool`` is called
    to start the extra containers. The returned slot is always the lowest
    index not in use; an occupied slot is never handed out.
    """
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    q = select(SessionModel).where(
        SessionModel.status == SessionStatus.ACTIVE,
        SessionModel.container_id.like("WEBPOOLIDX:%"),
        SessionModel.last_access_at >= cutoff,
    )
    active = db.scalars(q).all()
    busy = set()
    for sess in active:
        try:
            busy.add(int((sess.container_id or "").split(":", 1)[1]))
        except Exception:
            continue
    # Keep headroom: when active sessions are close to hot pool capacity,
    # proactively warm up extra slots.
    auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER))
    slot = 0
    while slot in busy:
        slot += 1
    # If every slot inside the target is taken (e.g. WEB_POOL_BUFFER == 0), grow
    # the pool by the missing slot rather than sharing an occupied browser.
    auto_target = max(auto_target, slot + 1)
    if auto_target > WEB_POOL_SIZE:
        ensure_web_pool(auto_target)
    return slot
def sanitize_client_resolution(width: Optional[int], height: Optional[int]) -> tuple[Optional[int], Optional[int]]:
    """Clamp a client-reported viewport size into the configured bounds.

    Returns ``(None, None)`` unless both dimensions are supplied. Otherwise
    each value is passed through ``int()``, capped at WEB_RESOLUTION_MAX_* and
    then raised to at least WEB_RESOLUTION_MIN_*.
    """
    if width is not None and height is not None:
        safe_width = max(WEB_RESOLUTION_MIN_WIDTH, min(int(width), WEB_RESOLUTION_MAX_WIDTH))
        safe_height = max(WEB_RESOLUTION_MIN_HEIGHT, min(int(height), WEB_RESOLUTION_MAX_HEIGHT))
        return safe_width, safe_height
    return None, None
def dispatch_universal_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None:
    """Send *service*'s target to universal-pool container *slot*.

    WEB services are POSTed to ``/open`` on port 7000. The payload carries the
    normalised URL, the service credentials and, when both are supplied, the
    client resolution clamped by ``sanitize_client_resolution``. RDP services
    are POSTed to ``/rdp`` with the parsed connection parameters.

    The request is tried ``max(1, POOL_DISPATCH_RETRIES)`` times, sleeping
    ``POOL_DISPATCH_SLEEP_SECONDS`` between attempts but not after the last one.

    Raises:
        HTTPException: 400 for service types other than WEB/RDP.
        Exception: the error from the final attempt, when every attempt
            fails. This is a connection error, or the error that
            ``raise_for_status`` raises for a 4xx/5xx status.
    """
    name = universal_container_name(slot)
    if service.type == ServiceType.WEB:
        url = f"http://{name}:7000/open"
        payload = {"url": normalize_web_target(service.target), "login": service.svc_login or "", "password": service.svc_password or ""}
        width, height = sanitize_client_resolution(width, height)
        if width and height:
            payload["width"] = width
            payload["height"] = height
    elif service.type == ServiceType.RDP:
        cfg = parse_rdp_target(service.target)
        url = f"http://{name}:7000/rdp"
        payload = {key: cfg[key] for key in ("host", "port", "user", "password", "domain", "security")}
    else:
        raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only")
    attempts = max(1, POOL_DISPATCH_RETRIES)
    for attempt in range(attempts):
        try:
            resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
            resp.raise_for_status()
            return
        except Exception:
            # Surface the failure immediately once no attempts remain.
            if attempt + 1 >= attempts:
                raise
            time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
def dispatch_web_pool_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None:
    """Ask web-pool container *slot* to open *service*'s URL.

    POSTs to ``/open`` on port 7000. The payload carries the normalised target
    URL, the service login/password and, when both are supplied, the client
    resolution clamped by ``sanitize_client_resolution``. The request is tried
    ``max(1, POOL_DISPATCH_RETRIES)`` times, sleeping
    ``POOL_DISPATCH_SLEEP_SECONDS`` between attempts but not after the last one.

    Raises:
        Exception: the error from the final attempt, when every attempt
            fails. This is a connection error, or the error that
            ``raise_for_status`` raises for a 4xx/5xx status.
    """
    name = web_pool_container_name(slot)
    target_url = normalize_web_target(service.target)
    url = f"http://{name}:7000/open"
    payload = {"url": target_url, "login": service.svc_login or "", "password": service.svc_password or ""}
    width, height = sanitize_client_resolution(width, height)
    if width and height:
        payload["width"] = width
        payload["height"] = height
    attempts = max(1, POOL_DISPATCH_RETRIES)
    for attempt in range(attempts):
        try:
            resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
            resp.raise_for_status()
            return
        except Exception:
            # Surface the failure immediately once no attempts remain.
            if attempt + 1 >= attempts:
                raise
            time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
def create_runtime_container(service: Service, session_id: str):
    """Launch a dedicated (non-pooled) runtime container for one session.

    WEB services use ``portal-kiosk:latest`` with TARGET_URL set to the raw
    ``service.target``. RDP services use ``portal-rdp-proxy:latest``,
    configured from ``parse_rdp_target``; optional fields are only set when
    non-empty. The container joins ``portal_net``, is auto-removed on exit,
    and is routed by Traefik at ``/s/<session_id>/`` (port 6080). Its
    environment enables the heartbeat with TOUCH_PATH
    ``/api/sessions/<session_id>/touch``.

    Returns:
        The Docker ID of the started container.

    Raises:
        HTTPException: 400 for service types other than WEB/RDP.
    """
    d = docker_client()
    router = session_router_name(session_id)
    path = f"/s/{session_id}/"
    labels = {
        "traefik.enable": "true",
        "traefik.docker.network": "portal_net",
        f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
        f"traefik.http.routers.{router}.entrypoints": "websecure",
        f"traefik.http.routers.{router}.tls": "true",
        f"traefik.http.routers.{router}.priority": "10000",
        f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
        f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
        f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
    }
    env = {
        "SESSION_ID": session_id,
        "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
        "ENABLE_HEARTBEAT": "1",
        "TOUCH_PATH": f"/api/sessions/{session_id}/touch",
        "X11VNC_FLAGS": X11VNC_FLAGS,
    }
    if service.type == ServiceType.RDP:
        image = "portal-rdp-proxy:latest"
        cfg = parse_rdp_target(service.target)
        env["RDP_HOST"] = cfg["host"]
        env["RDP_PORT"] = cfg["port"]
        optional_fields = (
            ("RDP_USER", "user"),
            ("RDP_PASSWORD", "password"),
            ("RDP_DOMAIN", "domain"),
            ("RDP_SECURITY", "security"),
        )
        for env_key, cfg_key in optional_fields:
            if cfg[cfg_key]:
                env[env_key] = cfg[cfg_key]
    elif service.type == ServiceType.WEB:
        image = "portal-kiosk:latest"
        env["TARGET_URL"] = service.target
        env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
    else:
        raise HTTPException(status_code=400, detail="Unsupported service type")
    started = d.containers.run(
        image=image,
        name=f"portal-sess-{session_id[:8]}",
        detach=True,
        auto_remove=True,
        network="portal_net",
        labels=labels,
        environment=env,
    )
    logger.info("session_container_started session_id=%s container_id=%s service_type=%s", session_id, started.id, service.type.value)
    return started.id
def ensure_warm_pool(service: Service, pool_size: Optional[int] = None) -> None:
    """Reconcile the per-service warm container pool for *service*.

    Services served by the universal pool are skipped. *pool_size* defaults to
    ``desired_pool_size(service)``.

    - When the pool size is <= 0, every container whose name starts with
      ``portal-warm-<slug>-`` is stopped.
    - Otherwise, containers at indices >= *pool_size* are stopped. The scan
      covers indices up to 49 and stops at the first missing one.
    - Each index below *pool_size* is then started or created.

    All warm containers of a service share one Traefik service behind
    ``/svc/<slug>/`` (port 6080).

    Creation is retried on Docker name conflicts: the previous container
    with the same name may still be in the middle of being removed. A create
    failure for one index is logged so the remaining indices are still
    reconciled.

    Raises:
        HTTPException: 400 for service types other than WEB/RDP.
    """
    if service_uses_universal_pool(service):
        return
    if pool_size is None:
        pool_size = desired_pool_size(service)
    if pool_size <= 0:
        # Stop stale warm containers for this service when pool is disabled.
        prefix = f"portal-warm-{service.slug}-"
        try:
            d = docker_client()
            for c in d.containers.list(all=True, filters={"name": prefix}):
                # The name filter is a substring match; require a true prefix.
                if c.name.startswith(prefix):
                    c.stop(timeout=5)
        except Exception:
            logger.exception("warm_pool_disable_failed service=%s", service.slug)
        return
    d = docker_client()
    router = f"warm-{service.slug}"
    svc_name = f"warmsvc-{service.slug}"
    path = f"/svc/{service.slug}/"
    image = "portal-kiosk:latest"
    base_env = {
        "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
        "ENABLE_HEARTBEAT": "0",
        "TOUCH_PATH": "",
        "X11VNC_FLAGS": X11VNC_FLAGS,
    }
    if service.type == ServiceType.WEB:
        base_env["UNIVERSAL_WEB"] = "1"
        base_env["START_URL"] = normalize_web_target(service.target)
        base_env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
    elif service.type == ServiceType.RDP:
        image = "portal-rdp-proxy:latest"
        cfg = parse_rdp_target(service.target)
        base_env["RDP_HOST"] = cfg["host"]
        base_env["RDP_PORT"] = cfg["port"]
        if cfg["user"]:
            base_env["RDP_USER"] = cfg["user"]
        if cfg["password"]:
            base_env["RDP_PASSWORD"] = cfg["password"]
        if cfg["domain"]:
            base_env["RDP_DOMAIN"] = cfg["domain"]
        if cfg["security"]:
            base_env["RDP_SECURITY"] = cfg["security"]
    else:
        raise HTTPException(status_code=400, detail="Unsupported service type")
    labels = {
        "traefik.enable": "true",
        "traefik.docker.network": "portal_net",
        f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
        f"traefik.http.routers.{router}.entrypoints": "websecure",
        f"traefik.http.routers.{router}.tls": "true",
        f"traefik.http.routers.{router}.priority": "9500",
        f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
        f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
        f"traefik.http.services.{svc_name}.loadbalancer.server.port": "6080",
        f"traefik.http.routers.{router}.service": svc_name,
        "portal.warm": "1",
        "portal.service.slug": service.slug,
        "portal.service.type": service.type.value,
    }
    # Ensure desired cardinality: stop surplus containers first.
    for i in range(pool_size, 50):
        name = f"portal-warm-{service.slug}-{i}"
        try:
            c = d.containers.get(name)
            c.stop(timeout=5)
        except docker.errors.NotFound:
            break
        except Exception:
            logger.exception("warm_pool_scale_down_failed service=%s idx=%s", service.slug, i)
    for i in range(pool_size):
        name = f"portal-warm-{service.slug}-{i}"
        try:
            c = d.containers.get(name)
            if c.status != "running":
                c.start()
            continue
        except docker.errors.NotFound:
            pass
        except Exception:
            logger.exception("warm_pool_check_failed service=%s idx=%s", service.slug, i)
            continue
        env = dict(base_env)
        env["SESSION_ID"] = f"warm-{service.slug}-{i}"
        # Up to two retries when the name is still held by a container being removed.
        for attempt in range(3):
            try:
                d.containers.run(
                    image=image,
                    name=name,
                    detach=True,
                    auto_remove=True,
                    network="portal_net",
                    labels=labels,
                    environment=env,
                )
                logger.info("warm_pool_container_started service=%s idx=%s", service.slug, i)
                break
            except docker.errors.APIError as exc:
                if _is_pool_name_conflict(exc) and attempt < 2:
                    logger.warning("warm_pool_run_conflict_retry service=%s idx=%s attempt=%s", service.slug, i, attempt + 1)
                    _remove_container_by_name(d, name)
                    time.sleep(0.25)
                    continue
                logger.exception("warm_pool_run_failed service=%s idx=%s", service.slug, i)
                break
def wait_for_session_route(session_id: str, timeout_seconds: int = 6) -> bool:
    """Poll Traefik until the route for a session container is live.

    Sends ``GET /s/<session_id>/`` to ``TRAEFIK_INTERNAL_URL`` with the public
    Host header, roughly every 0.3 s. Request errors are ignored and polling
    continues.

    Returns:
        True as soon as a response other than 404 arrives; False once
        *timeout_seconds* have elapsed without one.
    """
    target = f"{TRAEFIK_INTERNAL_URL}/s/{session_id}/"
    # Monotonic clock: the wait is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            resp = requests.get(
                target,
                headers={"Host": PUBLIC_HOST},
                allow_redirects=False,
                timeout=1.5,
            )
            if resp.status_code != 404:
                return True
        except Exception:
            pass
        time.sleep(0.3)
    return False
def route_ready(path: str) -> bool:
    """Return True if Traefik answers *path* with anything other than 404.

    Probes ``TRAEFIK_INTERNAL_URL`` as configured. When that URL is
    ``http://``, the same host is also probed over ``https://``, with
    certificate verification disabled for that probe. Request errors just
    move on to the next candidate.
    """
    plain = "http://"
    candidates = [TRAEFIK_INTERNAL_URL]
    if TRAEFIK_INTERNAL_URL.startswith(plain):
        candidates.append("https://" + TRAEFIK_INTERNAL_URL[len(plain):])
    for base in candidates:
        try:
            reply = requests.get(
                f"{base}{path}",
                headers={"Host": PUBLIC_HOST},
                allow_redirects=False,
                timeout=1.5,
                verify=not base.startswith("https://"),
            )
            if reply.status_code != 404:
                return True
        except Exception:
            continue
    return False
def container_running(container_id: Optional[str]) -> bool:
    """Report whether the runtime behind a session's ``container_id`` is up.

    - None or empty: False.
    - ``POOL:``, ``POOLIDX:``, ``WEBPOOLIDX:`` references: always True; pooled
      containers are not inspected here.
    - ``RDPSLOT:<id>``: loads the RdpSlot row and checks that its named
      container is running. Returns False on any error, or when the slot or
      its container name is missing.
    - Anything else is looked up directly as a Docker container ID or name.
    """
    if not container_id:
        return False
    if container_id.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:")):
        return True
    if container_id.startswith("RDPSLOT:"):
        try:
            slot_id = int(container_id.partition(":")[2])
            db = SessionLocal()
            try:
                slot = db.get(RdpSlot, slot_id)
                slot_container = slot.container_name if slot else None
                if not slot_container:
                    return False
                return docker_client().containers.get(slot_container).status == "running"
            finally:
                db.close()
        except Exception:
            return False
    try:
        return docker_client().containers.get(container_id).status == "running"
    except Exception:
        return False
def _rdp_slot_container_name(service_slug: str, slot_id: int) -> str:
return f"portal-rdpslot-{service_slug}-{slot_id}"
def start_rdp_slot_container(slot: RdpSlot, service: Service) -> str:
    """(Re)create the long-lived rdp-proxy container backing *slot*.

    Any existing container with the slot's name is stopped and removed first;
    failures other than NotFound are only logged. The new container:
    - uses the slot's own RDP credentials, plus host/port/domain/security
      from ``service.target``;
    - is routed by Traefik at ``/rdp/<slot.id>/`` (port 6080);
    - runs with restart policy ``unless-stopped`` and ``IDLE_TIMEOUT=86400``.

    Returns:
        The name of the started container.
    """
    d = docker_client()
    name = _rdp_slot_container_name(service.slug, slot.id)
    slot_id = slot.id
    path = f"/rdp/{slot_id}/"
    router = f"rdpslot-{slot_id}"
    labels = {
        "traefik.enable": "true",
        "traefik.docker.network": "portal_net",
        f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
        f"traefik.http.routers.{router}.entrypoints": "websecure",
        f"traefik.http.routers.{router}.tls": "true",
        f"traefik.http.routers.{router}.priority": "10000",
        f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
        f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
        f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
        "portal.rdpslot": "1",
        "portal.rdpslot.id": str(slot_id),
        "portal.service.slug": service.slug,
    }
    cfg = parse_rdp_target(service.target)
    env = {
        "SESSION_ID": f"rdpslot-{slot_id}",
        "IDLE_TIMEOUT": "86400",
        "ENABLE_HEARTBEAT": "0",
        "RDP_HOST": cfg["host"],
        "RDP_PORT": cfg["port"],
        "X11VNC_FLAGS": X11VNC_FLAGS,
    }
    # Optional settings are only exported when they carry a value.
    optional_env = (
        ("RDP_USER", slot.rdp_username),
        ("RDP_PASSWORD", slot.rdp_password),
        ("RDP_DOMAIN", cfg.get("domain")),
        ("RDP_SECURITY", cfg.get("security")),
    )
    env.update((key, value) for key, value in optional_env if value)
    try:
        stale = d.containers.get(name)
        stale.stop(timeout=5)
        stale.remove(force=True)
    except docker.errors.NotFound:
        pass
    except Exception:
        logger.exception("rdp_slot_container_cleanup_failed slot_id=%s", slot_id)
    started = d.containers.run(
        "portal-rdp-proxy:latest",
        name=name,
        detach=True,
        restart_policy={"Name": "unless-stopped"},
        network="portal_net",
        labels=labels,
        environment=env,
    )
    logger.info("rdp_slot_container_started slot_id=%s name=%s", slot_id, name)
    return started.name
def stop_rdp_slot_container(container_name: str) -> None:
    """Stop and remove an RDP slot container by name (best effort).

    Empty names are ignored and a missing container is not an error; any other
    Docker failure is logged rather than raised.
    """
    if container_name:
        try:
            target = docker_client().containers.get(container_name)
            target.stop(timeout=5)
            target.remove(force=True)
            logger.info("rdp_slot_container_stopped name=%s", container_name)
        except docker.errors.NotFound:
            pass
        except Exception:
            logger.exception("rdp_slot_container_stop_failed container=%s", container_name)
def _call_rdp_manager(container_name: str, endpoint: str) -> bool:
    """POST (no body) to the rdp-proxy manager on port 7001 of *container_name*.

    Returns ``Response.ok`` (status below 400). Any error, such as a
    connection failure or the 10 s timeout, is logged and reported as False.
    """
    manager_url = "http://{}:7001{}".format(container_name, endpoint)
    try:
        reply = requests.post(manager_url, timeout=10)
        logger.info("rdp_manager_%s container=%s status=%s", endpoint.strip('/'), container_name, reply.status_code)
        return reply.ok
    except Exception:
        logger.exception("rdp_manager_call_failed container=%s endpoint=%s", container_name, endpoint)
        return False
def connect_rdp_slot(slot_id: int) -> None:
    """Call the ``/connect`` endpoint of slot *slot_id*'s rdp-proxy manager.

    The slot is read in a short-lived DB session, which is always closed.
    Nothing happens when the slot is missing or has no container name. HTTP
    failures are logged by ``_call_rdp_manager``, not raised.
    """
    db = SessionLocal()
    try:
        slot = db.get(RdpSlot, slot_id)
        target_container = slot.container_name if slot else None
        if target_container:
            _call_rdp_manager(target_container, "/connect")
    finally:
        db.close()
def disconnect_rdp_slot(slot_id: int) -> None:
    """Call the ``/disconnect`` endpoint of slot *slot_id*'s rdp-proxy manager.

    The slot is read in a short-lived DB session, which is always closed.
    Nothing happens when the slot is missing or has no container name. HTTP
    failures are logged by ``_call_rdp_manager``, not raised.
    """
    db = SessionLocal()
    try:
        slot = db.get(RdpSlot, slot_id)
        target_container = slot.container_name if slot else None
        if target_container:
            _call_rdp_manager(target_container, "/disconnect")
    finally:
        db.close()
def _restart_rdp_slot_bg(slot_id: int) -> None:
    """Restart the container backing RDP slot *slot_id*, recreating it if gone.

    The ``_bg`` suffix suggests it is meant to run in a background thread;
    TODO confirm against callers. Nothing happens when the slot, its container
    name, or its service is missing. Every Docker failure is logged and not
    raised, so nothing escapes the thread. This includes a failed recreate
    through ``start_rdp_slot_container``. The DB session is always closed.
    """
    db = SessionLocal()
    try:
        slot = db.get(RdpSlot, slot_id)
        if not slot or not slot.container_name:
            return
        service = db.get(Service, slot.service_id)
        if not service:
            return
        try:
            d = docker_client()
            c = d.containers.get(slot.container_name)
            c.restart(timeout=10)
            logger.info("rdp_slot_container_restarted slot_id=%s", slot_id)
        except docker.errors.NotFound:
            # Recreating happens inside an except handler; guard it separately,
            # otherwise its failure would propagate out of the thread unlogged.
            try:
                start_rdp_slot_container(slot, service)
            except Exception:
                logger.exception("rdp_slot_container_recreate_failed slot_id=%s", slot_id)
        except Exception:
            logger.exception("rdp_slot_container_restart_failed slot_id=%s", slot_id)
    finally:
        db.close()
def stop_runtime_container(container_id: Optional[str]) -> None:
    """Stop a per-session runtime container (best effort).

    None/empty IDs are ignored. A container that no longer exists is not
    treated as an error and is logged at INFO without a traceback. Session
    containers are started with ``auto_remove=True``, so they may already be
    gone once they exit on their own. Other Docker failures are logged with a
    traceback and never raised.
    """
    if not container_id:
        return
    try:
        d = docker_client()
        c = d.containers.get(container_id)
        c.stop(timeout=5)
    except docker.errors.NotFound:
        logger.info("session_container_already_gone container_id=%s", container_id)
    except Exception:
        logger.exception("session_container_stop_failed container_id=%s", container_id)
def terminate_session_record(
    db: Session,
    sess: SessionModel,
    new_status: SessionStatus = SessionStatus.TERMINATED,
    *,
    stop_container: bool = True,
) -> None:
    """Close an ACTIVE session row and release what backs it.

    Does nothing unless *sess* is present and ACTIVE. Otherwise, in order:
    - If *stop_container* is true and the session owns a dedicated container
      (its ``container_id`` has none of the pool/slot prefixes), that
      container is stopped.
    - ``RDPSLOT:`` sessions have ``disconnect_rdp_slot`` run in a daemon
      thread, whatever *stop_container* says.
    - The row is set to *new_status* and ``last_access_at`` is refreshed.
    - A ``session_closed`` event is logged.

    Nothing is committed here.
    """
    if not (sess and sess.status == SessionStatus.ACTIVE):
        return
    shared_prefixes = ("POOL:", "POOLIDX:", "WEBPOOLIDX:", "RDPSLOT:")
    previous_status = sess.status
    cid = sess.container_id or ""
    if stop_container and cid and not cid.startswith(shared_prefixes):
        stop_runtime_container(cid)
    if cid.startswith("RDPSLOT:"):
        # Run the manager HTTP call off the caller's thread so it cannot block it.
        try:
            slot_id = int(cid.split(":", 1)[1])
            worker = threading.Thread(target=disconnect_rdp_slot, args=(slot_id,), daemon=True)
            worker.start()
        except Exception:
            logger.exception("rdp_slot_disconnect_failed cid=%s", cid)
    sess.status = new_status
    sess.last_access_at = now_utc()
    if isinstance(previous_status, SessionStatus):
        old_status_label = previous_status.value
    else:
        old_status_label = str(previous_status)
    log_event(
        "session_closed",
        level=logging.INFO,
        session_id=sess.id,
        user_id=sess.user_id,
        service_id=sess.service_id,
        container_id=cid,
        old_status=old_status_label,
        new_status=new_status.value,
        reason=session_closed_reason(sess, db),
        stop_container=stop_container,
    )
def ensure_schema_compatibility() -> None:
    """Apply additive, re-runnable schema upgrades to the database.

    - Adds the 'RDP' servicetype and 'ROTATED' sessionstatus enum values,
      skipping silently if those enum types do not exist.
    - Adds missing ``services`` columns.
    - Creates the ``categories`` / ``service_categories`` tables and their
      indexes.
    - Re-creates the ``services_type_check`` CHECK constraint.
    """
    # PostgreSQL requires enum value addition to be committed before usage in constraints.
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
        # undefined_object is swallowed: installs without the servicetype enum are skipped.
        conn.execute(
            text(
                """
DO $$
BEGIN
BEGIN
ALTER TYPE servicetype ADD VALUE IF NOT EXISTS 'RDP';
EXCEPTION WHEN undefined_object THEN
NULL;
END;
END $$;
"""
            )
        )
        conn.execute(
            text(
                """
DO $$
BEGIN
BEGIN
ALTER TYPE sessionstatus ADD VALUE IF NOT EXISTS 'ROTATED';
EXCEPTION WHEN undefined_object THEN
NULL;
END;
END $$;
"""
            )
        )
    # Remaining DDL runs in a single transaction (engine.begin commits on success).
    with engine.begin() as conn:
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS comment TEXT NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_login VARCHAR(256) NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_password VARCHAR(256) NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS svc_cred_hint TEXT NOT NULL DEFAULT ''"))
        conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS icon_path TEXT NOT NULL DEFAULT ''"))
        conn.execute(
            text(
                """
CREATE TABLE IF NOT EXISTS categories (
id SERIAL PRIMARY KEY,
name VARCHAR(128) NOT NULL UNIQUE,
slug VARCHAR(64) NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
"""
            )
        )
        conn.execute(
            text(
                """
CREATE TABLE IF NOT EXISTS service_categories (
id SERIAL PRIMARY KEY,
service_id INT NOT NULL REFERENCES services(id) ON DELETE CASCADE,
category_id INT NOT NULL REFERENCES categories(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (service_id, category_id)
)
"""
            )
        )
        conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_service_id ON service_categories(service_id)"))
        conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_category_id ON service_categories(category_id)"))
        # Handle installs where service type is VARCHAR + CHECK.
        # The block drops every CHECK on services whose definition mentions
        # "type", then adds services_type_check allowing WEB/VNC/RDP.
        # NOTE(review): the ILIKE '%type%' match would also drop any unrelated
        # CHECK constraint whose text contains "type". Confirm none exist.
        conn.execute(
            text(
                """
DO $$
DECLARE c record;
BEGIN
FOR c IN
SELECT conname
FROM pg_constraint
WHERE conrelid = 'services'::regclass
AND contype = 'c'
AND pg_get_constraintdef(oid) ILIKE '%type%'
LOOP
EXECUTE format('ALTER TABLE services DROP CONSTRAINT %I', c.conname);
END LOOP;
ALTER TABLE services
ADD CONSTRAINT services_type_check
CHECK (type IN ('WEB','VNC','RDP'));
EXCEPTION WHEN duplicate_object THEN
NULL;
END $$;
"""
            )
        )
def desired_pool_size(service: Service) -> int:
    """Return how many warm containers *service* should have.

    The result depends on the service:
    - inactive services: 0;
    - services on the universal pool: ``UNIVERSAL_POOL_SIZE``;
    - other RDP services: 0;
    - everything else: its positive ``warm_pool_size``, otherwise
      ``PREWARM_POOL_SIZE``.
    """
    if not service.active:
        return 0
    if service_uses_universal_pool(service):
        return UNIVERSAL_POOL_SIZE
    if service.type == ServiceType.RDP:
        # RDP runs on-demand per user session; no prewarmed pool.
        return 0
    configured = service.warm_pool_size
    if configured and configured > 0:
        return configured
    return PREWARM_POOL_SIZE
def get_warm_containers_for_service(service: Service) -> list:
    """List all warm-pool containers of *service*, including stopped ones.

    Docker's name filter is a substring match, so results are narrowed to
    names that truly start with ``portal-warm-<slug>-``. On any error the
    failure is logged and an empty list is returned.
    """
    prefix = f"portal-warm-{service.slug}-"
    try:
        listed = docker_client().containers.list(all=True, filters={"name": prefix})
        return [c for c in listed if c.name.startswith(prefix)]
    except Exception:
        logger.exception("pool_status_failed service=%s", service.slug)
        return []
def get_pool_status_for_service(service: Service) -> dict:
    """Return the pool summary relevant to *service*.

    - WEB services: the shared web pool status.
    - On-demand RDP services: a fixed "n/a" summary.
    - Universal-pool services: the universal pool status.
    - Anything else: computed from its per-service warm containers.

    For warm containers, health is:
    - "down" when nothing is running;
    - "degraded" when any container is exited/dead or fewer than
      ``min(desired, 1)`` are running;
    - "ok" otherwise.
    """
    if service.type == ServiceType.WEB:
        return get_web_pool_status()
    uses_universal = service_uses_universal_pool(service)
    if service.type == ServiceType.RDP and not uses_universal:
        return {"desired": 0, "running": 0, "total": 0, "names": [], "health": "n/a"}
    if uses_universal:
        return get_universal_pool_status()
    desired = desired_pool_size(service)
    containers = get_warm_containers_for_service(service)
    running = len([c for c in containers if c.status == "running"])
    has_bad = False
    for c in containers:
        if (c.attrs.get("State") or {}).get("Status", c.status) in {"exited", "dead"}:
            has_bad = True
    if running == 0:
        health = "down"
    elif has_bad or running < min(desired, 1):
        health = "degraded"
    else:
        health = "ok"
    return {
        "desired": desired,
        "running": running,
        "total": len(containers),
        "names": sorted(c.name for c in containers),
        "health": health,
    }
def _pool_image_tag(c) -> str:
    """Return the first image tag of container *c*, or "" if unavailable."""
    try:
        tags = c.image.tags
    except Exception:
        # docker-py resolves Container.image with an extra API call that fails
        # when the image has since been removed; don't fail the whole report.
        return ""
    return tags[0] if tags else ""


def _pool_container_detail(c, labels_ok: bool) -> dict:
    """Build one entry of the ``containers`` list of a pool status report."""
    attrs = c.attrs or {}
    return {
        "name": c.name,
        "status": c.status,
        "state": (attrs.get("State") or {}).get("Status", c.status),
        "created": attrs.get("Created", ""),
        "image": _pool_image_tag(c),
        "labels_ok": labels_ok,
    }


def _slot_pool_details(d, names) -> list:
    """Collect detail entries for the named slot containers that can be fetched."""
    details = []
    for name in names:
        try:
            c = d.containers.get(name)
        except Exception:
            continue
        details.append(_pool_container_detail(c, True))
    return details


def _pool_detail_report(service: Service, pool: dict, details: list) -> dict:
    """Assemble the detailed pool status payload for *service*."""
    return {
        "service_id": service.id,
        "slug": service.slug,
        "type": service.type.value,
        "desired": pool["desired"],
        "running": pool["running"],
        "total": pool["total"],
        "health": pool["health"],
        "containers": details,
        "updated_at": now_utc().isoformat(),
    }


def get_pool_detailed_status(service: Service) -> dict:
    """Return the pool summary for *service* plus per-container details.

    - WEB services report the shared web pool slots.
    - Universal-pool services report the universal slots.
    - Any other service reports its warm containers, sorted by name. For
      these, ``labels_ok`` checks the portal.warm / slug / type labels.

    Each container entry carries name, status, state, creation time, first
    image tag and ``labels_ok``.
    """
    if service.type == ServiceType.WEB:
        d = docker_client()
        pool = get_web_pool_status()
        names = [web_pool_container_name(i) for i in range(max(0, pool["desired"]))]
        return _pool_detail_report(service, pool, _slot_pool_details(d, names))
    if service_uses_universal_pool(service):
        d = docker_client()
        pool = get_universal_pool_status()
        names = [universal_container_name(i) for i in range(max(0, UNIVERSAL_POOL_SIZE))]
        return _pool_detail_report(service, pool, _slot_pool_details(d, names))
    containers = get_warm_containers_for_service(service)
    pool = get_pool_status_for_service(service)
    details = []
    for c in sorted(containers, key=lambda x: x.name):
        attrs = c.attrs or {}
        # Config/Labels may be present but null in the inspect payload.
        labels = (attrs.get("Config") or {}).get("Labels") or {}
        labels_ok = (
            labels.get("portal.warm") == "1"
            and labels.get("portal.service.slug") == service.slug
            and labels.get("portal.service.type") == service.type.value
        )
        details.append(_pool_container_detail(c, labels_ok))
    return _pool_detail_report(service, pool, details)
def get_active_sessions_count(db: Session, service_id: int) -> int:
    """Count live sessions of *service_id*, de-duplicating pooled slots.

    Only ACTIVE sessions touched within ``SESSION_IDLE_SECONDS`` count.
    Sessions on pooled slots (``WEBPOOLIDX:``, ``POOLIDX:``, ``POOL:``) count
    once per distinct container_id, so duplicate rows created by an
    allocation race do not inflate the figure. Every other session counts
    individually.
    """
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    q = select(SessionModel).where(
        SessionModel.service_id == service_id,
        SessionModel.status == SessionStatus.ACTIVE,
        SessionModel.last_access_at >= cutoff,
    )
    sessions = db.scalars(q).all()
    pooled_slots = set()
    direct = 0
    # One pass partition; avoids an O(n^2) list-membership test per session.
    for s in sessions:
        cid = s.container_id or ""
        if cid.startswith(("WEBPOOLIDX:", "POOLIDX:", "POOL:")):
            pooled_slots.add(cid)
        else:
            direct += 1
    return len(pooled_slots) + direct
def find_active_session_for_service(db: Session, service_id: int) -> Optional[SessionModel]:
    """Return the newest live session of *service_id*, or None.

    "Live" means ACTIVE and touched within ``SESSION_IDLE_SECONDS``.
    """
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    conditions = (
        SessionModel.service_id == service_id,
        SessionModel.status == SessionStatus.ACTIVE,
        SessionModel.last_access_at >= cutoff,
    )
    newest_first = select(SessionModel).where(*conditions).order_by(SessionModel.created_at.desc())
    return db.scalars(newest_first).first()
def find_active_session_for_user_service(db: Session, user_id: int, service_id: int) -> Optional[SessionModel]:
    """Return *user_id*'s newest live session on *service_id*, or None.

    "Live" means ACTIVE and touched within ``SESSION_IDLE_SECONDS``.
    """
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    conditions = (
        SessionModel.user_id == user_id,
        SessionModel.service_id == service_id,
        SessionModel.status == SessionStatus.ACTIVE,
        SessionModel.last_access_at >= cutoff,
    )
    newest_first = select(SessionModel).where(*conditions).order_by(SessionModel.created_at.desc())
    return db.scalars(newest_first).first()
class LockTimeoutError(Exception):
    """Raised when an advisory lock cannot be obtained before its timeout."""
def allocator_lock(db: Session, lock_id: int, timeout_seconds: Optional[float] = None, poll_seconds: float = 0.05):
    """Return a context manager that takes a transaction-scoped advisory lock.

    On entry:
    - If *timeout_seconds* is None, it blocks on ``pg_advisory_xact_lock``.
    - Otherwise it polls ``pg_try_advisory_xact_lock`` every *poll_seconds*
      (at least 0.01 s) until the lock is obtained or the timeout expires.
      At least one attempt is always made, so ``timeout_seconds=0`` means
      "try once without waiting".

    Exiting the context releases nothing. Transaction-level advisory locks
    are held until *db*'s current transaction commits or rolls back.

    Raises:
        LockTimeoutError: if the lock was not obtained within the timeout.
    """
    class _LockCtx:
        def __enter__(self):
            self._acquired = False
            if timeout_seconds is None:
                db.execute(text("SELECT pg_advisory_xact_lock(:lid)"), {"lid": lock_id})
                self._acquired = True
                return self
            deadline = time.monotonic() + max(0.0, timeout_seconds)
            while True:
                got = db.execute(text("SELECT pg_try_advisory_xact_lock(:lid)"), {"lid": lock_id}).scalar()
                if got:
                    self._acquired = True
                    return self
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise LockTimeoutError(f"advisory lock timeout lock_id={lock_id} timeout={timeout_seconds}")
                # Never sleep past the deadline; one last attempt happens at it.
                time.sleep(min(max(0.01, poll_seconds), remaining))

        def __exit__(self, exc_type, exc, tb):
            # Nothing to release: the xact lock ends with the transaction.
            return False

    return _LockCtx()
def terminate_active_slot_sessions(db: Session, container_id: str) -> None:
    """Mark every ACTIVE session bound to *container_id* as TERMINATED.

    Issues a raw SQL UPDATE inside *db*'s current transaction. It does not
    commit, and it bypasses ``terminate_session_record``: no container
    handling and no ``session_closed`` event. Empty IDs are ignored.
    """
    if container_id:
        stmt = text(
            """
UPDATE sessions
SET status = 'TERMINATED'
WHERE container_id = :cid
AND status = 'ACTIVE'
"""
        )
        db.execute(stmt, {"cid": container_id})
def session_redirect_url(sess: SessionModel) -> str:
    """Return the browser path to send a session's user to.

    Pooled and RDP-slot sessions (``POOL:``, ``POOLIDX:``, ``WEBPOOLIDX:``,
    ``RDPSLOT:``) go to ``/s/<id>/view``. Sessions with a dedicated container
    go to ``/s/<id>/``, the prefix Traefik routes to that container.
    """
    shared = (sess.container_id or "").startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:", "RDPSLOT:"))
    return f"/s/{sess.id}/view" if shared else f"/s/{sess.id}/"
def open_warm_web_url(service: Service, target_url: str) -> None:
    """Point every running warm WEB container of *service* at *target_url*.

    Skipped for universal-pool services and non-WEB services. The URL is
    normalised, then POSTed to ``/open`` on port 7000 of each container
    labelled as a warm WEB container of this service. Failures are logged per
    container, or once for the whole dispatch; nothing is raised.
    """
    if service_uses_universal_pool(service) or service.type != ServiceType.WEB:
        return
    url = normalize_web_target(target_url)
    try:
        label_filter = [
            "portal.warm=1",
            f"portal.service.slug={service.slug}",
            "portal.service.type=WEB",
        ]
        warm = docker_client().containers.list(filters={"label": label_filter})
        for c in warm:
            try:
                reply = requests.post(f"http://{c.name}:7000/open", json={"url": url}, timeout=2)
                reply.raise_for_status()
                logger.info("warm_web_open_ok service=%s container=%s url=%s", service.slug, c.name, url)
            except Exception:
                logger.exception("warm_web_open_failed service=%s container=%s", service.slug, c.name)
    except Exception:
        logger.exception("warm_web_open_dispatch_failed service=%s", service.slug)
def service_uses_universal_pool(service) -> bool:
    """Return True when *service* is served from the universal pool.

    Only RDP services qualify, and only while ``UNIVERSAL_POOL_SIZE`` is positive.
    """
    if not UNIVERSAL_POOL_SIZE > 0:
        return False
    return service.type == ServiceType.RDP
def universal_container_name(slot: int) -> str:
    """Return the Docker container name of universal-pool *slot*."""
    return "portal-universal-%s" % (slot,)
def web_pool_container_name(slot: int) -> str:
    """Return the Docker container name of web-pool *slot*."""
    return "portal-webpool-" + str(slot)