diff --git a/.env.bak_20260423_084240 b/.env.bak_20260423_084240
new file mode 100644
index 0000000..28e09b7
--- /dev/null
+++ b/.env.bak_20260423_084240
@@ -0,0 +1,15 @@
+PUBLIC_HOST=stend.4mont.ru
+LETSENCRYPT_EMAIL=admin@4mont.ru
+
+POSTGRES_DB=portal
+POSTGRES_USER=portal
+POSTGRES_PASSWORD=change_me
+
+SIGNING_KEY=replace_with_long_random_key
+ADMIN_USERNAME=admin
+ADMIN_PASSWORD=StrongAdminPassword!
+SESSION_IDLE_SECONDS=300
+PREWARM_POOL_SIZE=2
+UNIVERSAL_POOL_SIZE=0
+MAX_ACTIVE_SERVICES_PER_USER=4
+LOG_LEVEL=INFO
diff --git a/.env.bak_before_load_idle_20260424_150032 b/.env.bak_before_load_idle_20260424_150032
new file mode 100644
index 0000000..9ffac2b
--- /dev/null
+++ b/.env.bak_before_load_idle_20260424_150032
@@ -0,0 +1,17 @@
+PUBLIC_HOST=stend.4mont.ru
+LETSENCRYPT_EMAIL=admin@4mont.ru
+
+POSTGRES_DB=portal
+POSTGRES_USER=portal
+POSTGRES_PASSWORD=change_me
+
+SIGNING_KEY=9a6d4b053a47ae24078e07587e69f344111652f153ba50eff31603e43c91f89b
+ADMIN_USERNAME=admin
+ADMIN_PASSWORD=StrongAdminPassword!
+SESSION_IDLE_SECONDS=300
+PREWARM_POOL_SIZE=2
+UNIVERSAL_POOL_SIZE=0
+MAX_ACTIVE_SERVICES_PER_USER=4
+LOG_LEVEL=INFO
+WEB_POOL_SIZE=20
+WEB_POOL_BUFFER=2
diff --git a/.gitignore b/.gitignore
index 67ac2fe..db7d43a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,5 @@ __pycache__/
venv/
.DS_Store
traefik/letsencrypt/acme.json
+docs/PROJECT_CONTEXT.md
+PROJECT_CONTEXT.md
diff --git a/app/main.py b/app/main.py
index 872d7e8..cb7ad33 100644
--- a/app/main.py
+++ b/app/main.py
@@ -47,12 +47,18 @@ SESSION_IDLE_SECONDS = int(os.getenv("SESSION_IDLE_SECONDS", "300"))
PUBLIC_HOST = os.getenv("PUBLIC_HOST", "stend.4mont.ru")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_SLOW_REQUEST_MS = int(os.getenv("LOG_SLOW_REQUEST_MS", "2000"))
+GO_USER_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_USER_LOCK_TIMEOUT_SECONDS", "8.0"))
+GO_POOL_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_POOL_LOCK_TIMEOUT_SECONDS", "5.0"))
+POOL_DISPATCH_RETRIES = int(os.getenv("POOL_DISPATCH_RETRIES", "4"))
+POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS = float(os.getenv("POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS", "2.0"))
+POOL_DISPATCH_SLEEP_SECONDS = float(os.getenv("POOL_DISPATCH_SLEEP_SECONDS", "0.3"))
TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik")
PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "0"))
UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "0"))
WEB_POOL_SIZE = int(os.getenv("WEB_POOL_SIZE", "5"))
WEB_POOL_BUFFER = int(os.getenv("WEB_POOL_BUFFER", "2"))
MAX_ACTIVE_SERVICES_PER_USER = int(os.getenv("MAX_ACTIVE_SERVICES_PER_USER", "4"))
+ENABLE_STARTUP_MAINTENANCE = os.getenv("ENABLE_STARTUP_MAINTENANCE", "1") == "1"
ICON_UPLOAD_MAX_BYTES = 2 * 1024 * 1024
ICON_UPLOAD_TYPES = {
"image/png": "png",
@@ -67,6 +73,7 @@ logging.basicConfig(
)
logger = logging.getLogger("portal")
request_id_ctx = contextvars.ContextVar("request_id", default="-")
+maintenance_lock_file = None
def _normalize_log_value(value):
@@ -791,14 +798,14 @@ def dispatch_universal_target(slot: int, service: Service) -> None:
raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only")
last_exc = None
- for _ in range(8):
+ for _ in range(max(1, POOL_DISPATCH_RETRIES)):
try:
- resp = requests.post(url, json=payload, timeout=3)
+ resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
resp.raise_for_status()
return
except Exception as exc:
last_exc = exc
- time.sleep(0.4)
+ time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
if last_exc:
raise last_exc
@@ -808,14 +815,14 @@ def dispatch_web_pool_target(slot: int, service: Service) -> None:
target_url = normalize_web_target(service.target)
url = f"http://{name}:7000/open"
last_exc = None
- for _ in range(8):
+ for _ in range(max(1, POOL_DISPATCH_RETRIES)):
try:
- resp = requests.post(url, json={"url": target_url}, timeout=3)
+ resp = requests.post(url, json={"url": target_url}, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
resp.raise_for_status()
return
except Exception as exc:
last_exc = exc
- time.sleep(0.4)
+ time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
if last_exc:
raise last_exc
@@ -1363,14 +1370,31 @@ def find_active_session_for_user_service(db: Session, user_id: int, service_id:
return db.scalars(q).first()
-def allocator_lock(db: Session, lock_id: int):
+class LockTimeoutError(Exception):
+ pass
+
+
+def allocator_lock(db: Session, lock_id: int, timeout_seconds: Optional[float] = None, poll_seconds: float = 0.05):
class _LockCtx:
def __enter__(self_nonlocal):
- db.execute(text("SELECT pg_advisory_lock(:lid)"), {"lid": lock_id})
+ self_nonlocal._acquired = False
+ if timeout_seconds is None:
+ db.execute(text("SELECT pg_advisory_xact_lock(:lid)"), {"lid": lock_id})
+ self_nonlocal._acquired = True
+ return self_nonlocal
+
+ deadline = time.monotonic() + max(0.0, timeout_seconds)
+ while time.monotonic() <= deadline:
+ got = db.execute(text("SELECT pg_try_advisory_xact_lock(:lid)"), {"lid": lock_id}).scalar()
+ if got:
+ self_nonlocal._acquired = True
+ return self_nonlocal
+ time.sleep(max(0.01, poll_seconds))
+ raise LockTimeoutError(f"advisory lock timeout lock_id={lock_id} timeout={timeout_seconds}")
+
return self_nonlocal
def __exit__(self_nonlocal, exc_type, exc, tb):
- db.execute(text("SELECT pg_advisory_unlock(:lid)"), {"lid": lock_id})
return False
return _LockCtx()
@@ -1492,17 +1516,36 @@ def bootstrap_admin():
db.close()
-@app.on_event("startup")
-def startup_event():
- # Multiple uvicorn workers run startup in parallel. Serialize schema bootstrap
- # to avoid DDL races on first run and during schema extension.
+def try_acquire_maintenance_leader() -> bool:
+ global maintenance_lock_file
+ if maintenance_lock_file is not None:
+ return True
+ lock_file = open("/tmp/portal-maintenance.lock", "w")
+ try:
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except BlockingIOError:
+ lock_file.close()
+ return False
+ maintenance_lock_file = lock_file
+ return True
+
+
+
+def run_maintenance_service() -> None:
+ logger.info("maintenance_service_bootstrap_started")
with open("/tmp/portal-schema.lock", "w") as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
Base.metadata.create_all(bind=engine)
ensure_schema_compatibility()
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+
ensure_icons_dir()
bootstrap_admin()
+
+ maintenance_lock = open("/tmp/portal-maintenance.lock", "w")
+ fcntl.flock(maintenance_lock.fileno(), fcntl.LOCK_EX)
+ logger.info("maintenance_service_leader_acquired")
+
db = SessionLocal()
try:
ensure_universal_pool()
@@ -1517,8 +1560,46 @@ def startup_event():
ensure_warm_pool(svc)
finally:
db.close()
+
+ logger.info("maintenance_service_loop_started")
+ cleanup_loop()
+
+@app.on_event("startup")
+def startup_event():
+ # Multiple uvicorn workers run startup in parallel. Serialize schema bootstrap
+ # to avoid DDL races on first run and during schema extension.
+ with open("/tmp/portal-schema.lock", "w") as lock_file:
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
+ Base.metadata.create_all(bind=engine)
+ ensure_schema_compatibility()
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+ ensure_icons_dir()
+ bootstrap_admin()
+ if not ENABLE_STARTUP_MAINTENANCE:
+ logger.info("startup_maintenance_disabled")
+ return
+ if not try_acquire_maintenance_leader():
+ logger.info("maintenance_leader_skipped")
+ return
+
+ db = SessionLocal()
+ try:
+ ensure_universal_pool()
+ ensure_web_pool()
+ for svc in db.scalars(
+ select(Service).where(
+ Service.active == True,
+ Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
+ )
+ ).all():
+ if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
+ ensure_warm_pool(svc)
+ finally:
+ db.close()
+
thread = threading.Thread(target=cleanup_loop, daemon=True)
thread.start()
+ logger.info("maintenance_leader_started")
@app.get("/", response_class=HTMLResponse)
@@ -1778,6 +1859,23 @@ def logout(request: Request):
@app.get("/go/{slug}")
def go_service(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
+ total_started = time.perf_counter()
+ phase_ms = {}
+
+ def _mark(name: str, started: float) -> None:
+ phase_ms[name] = int((time.perf_counter() - started) * 1000)
+
+ def _emit(result: str, **extra) -> None:
+ payload = {
+ "user_id": user.id,
+ "service_slug": slug,
+ "result": result,
+ "total_ms": int((time.perf_counter() - total_started) * 1000),
+ }
+ payload.update(phase_ms)
+ payload.update(extra)
+ log_event("go_service_timing", **payload)
+
log_event("session_open_requested", user_id=user.id, service_slug=slug)
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
if not service:
@@ -1786,148 +1884,218 @@ def go_service(slug: str, user: User = Depends(require_user), db: Session = Depe
raise HTTPException(status_code=410, detail="VNC services are deprecated")
if not has_access(db, user.id, service.id):
raise HTTPException(status_code=403, detail="ACL denied")
- with allocator_lock(db, 92000 + int(user.id)):
- existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
- if existing_user_session:
- return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
- cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
- active_rows = db.scalars(
- select(SessionModel).where(
- SessionModel.user_id == user.id,
- SessionModel.status == SessionStatus.ACTIVE,
- SessionModel.last_access_at >= cutoff,
- )
- ).all()
- active_rows = sorted(active_rows, key=lambda row: row.created_at)
- active_service_ids = {row.service_id for row in active_rows}
- if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
- oldest = next((row for row in active_rows if row.service_id != service.id), None)
- if oldest:
- terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True)
- db.commit()
- log_event(
- "session_rotated",
- user_id=user.id,
- closed_session_id=oldest.id,
- closed_service_id=oldest.service_id,
- new_service_id=service.id,
+ user_lock_started = time.perf_counter()
+ try:
+ with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS):
+ _mark("wait_user_lock_ms", user_lock_started)
+
+ t_existing = time.perf_counter()
+ existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
+ _mark("check_existing_ms", t_existing)
+ if existing_user_session:
+ _emit("reuse_session", session_id=existing_user_session.id)
+ return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
+
+ t_limit = time.perf_counter()
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ active_rows = db.scalars(
+ select(SessionModel).where(
+ SessionModel.user_id == user.id,
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.last_access_at >= cutoff,
)
- else:
- return RedirectResponse(url="/?launch_error=max_services", status_code=303)
-
- if service.type == ServiceType.RDP:
- active_owner = find_active_session_for_service(db, service.id)
- if active_owner:
- if active_owner.user_id != user.id:
- owner = db.get(User, active_owner.user_id)
- owner_name = owner.username if owner else f"id={active_owner.user_id}"
- raise HTTPException(
- status_code=409,
- detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
- )
- return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
-
- session_id = str(uuid.uuid4())
- if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
- try:
- with allocator_lock(db, 91001):
- ensure_web_pool()
- slot = acquire_web_pool_slot(db)
- slot_cid = f"WEBPOOLIDX:{slot}"
- terminate_active_slot_sessions(db, slot_cid)
- dispatch_web_pool_target(slot, service)
- session_obj = SessionModel(
- id=session_id,
- user_id=user.id,
- service_id=service.id,
- container_id=slot_cid,
- status=SessionStatus.ACTIVE,
- created_at=now_utc(),
- last_access_at=now_utc(),
- )
- db.add(session_obj)
+ ).all()
+ active_rows = sorted(active_rows, key=lambda row: row.created_at)
+ active_service_ids = {row.service_id for row in active_rows}
+ _mark("check_limit_ms", t_limit)
+ if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
+ oldest = next((row for row in active_rows if row.service_id != service.id), None)
+ if oldest:
+ t_rotate = time.perf_counter()
+ terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True)
db.commit()
- except Exception as exc:
- logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
- log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc))
- audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
- raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
- log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot)
- audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
- return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
-
- if service_uses_universal_pool(service):
- try:
- with allocator_lock(db, 91002):
- ensure_universal_pool()
- slot = acquire_universal_slot(db)
- slot_cid = f"POOLIDX:{slot}"
- terminate_active_slot_sessions(db, slot_cid)
- dispatch_universal_target(slot, service)
- session_obj = SessionModel(
- id=session_id,
+ _mark("rotate_oldest_ms", t_rotate)
+ log_event(
+ "session_rotated",
user_id=user.id,
- service_id=service.id,
- container_id=slot_cid,
- status=SessionStatus.ACTIVE,
- created_at=now_utc(),
- last_access_at=now_utc(),
+ closed_session_id=oldest.id,
+ closed_service_id=oldest.service_id,
+ new_service_id=service.id,
)
- db.add(session_obj)
- db.commit()
- except Exception as exc:
- logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
- log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc))
- audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
- raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
- log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot)
- audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
- return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+ else:
+ _emit("max_services_redirect")
+ return RedirectResponse(url="/?launch_error=max_services", status_code=303)
- if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
- ensure_warm_pool(service)
- open_warm_web_url(service, service.target)
+ if service.type == ServiceType.RDP:
+ t_rdp_owner = time.perf_counter()
+ active_owner = find_active_session_for_service(db, service.id)
+ _mark("check_rdp_owner_ms", t_rdp_owner)
+ if active_owner:
+ if active_owner.user_id != user.id:
+ owner = db.get(User, active_owner.user_id)
+ owner_name = owner.username if owner else f"id={active_owner.user_id}"
+ _emit("rdp_busy", owner=owner_name)
+ raise HTTPException(
+ status_code=409,
+ detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
+ )
+ _emit("reuse_rdp_session", session_id=active_owner.id)
+ return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
+
+ session_id = str(uuid.uuid4())
+ if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
+ try:
+ t_pool_lock = time.perf_counter()
+ with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
+ _mark("wait_web_pool_lock_ms", t_pool_lock)
+ t_ensure = time.perf_counter()
+ ensure_web_pool()
+ _mark("ensure_web_pool_ms", t_ensure)
+
+ t_acquire = time.perf_counter()
+ slot = acquire_web_pool_slot(db)
+ _mark("acquire_web_slot_ms", t_acquire)
+ slot_cid = f"WEBPOOLIDX:{slot}"
+
+ t_dispatch = time.perf_counter()
+ terminate_active_slot_sessions(db, slot_cid)
+ dispatch_web_pool_target(slot, service)
+ _mark("dispatch_web_target_ms", t_dispatch)
+
+ t_commit = time.perf_counter()
+ session_obj = SessionModel(
+ id=session_id,
+ user_id=user.id,
+ service_id=service.id,
+ container_id=slot_cid,
+ status=SessionStatus.ACTIVE,
+ created_at=now_utc(),
+ last_access_at=now_utc(),
+ )
+ db.add(session_obj)
+ db.commit()
+ _mark("db_commit_ms", t_commit)
+ except LockTimeoutError:
+ _emit("web_pool_lock_timeout")
+ raise HTTPException(status_code=503, detail="Пул WEB занят. Повторите через несколько секунд.")
+ except Exception as exc:
+ logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
+ log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc))
+ audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
+ _emit("web_pool_create_failed", error=str(exc))
+ raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot)
+ audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
+ _emit("session_created_web_pool", session_id=session_id, slot=slot)
+ return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+
+ if service_uses_universal_pool(service):
+ try:
+ t_pool_lock = time.perf_counter()
+ with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
+ _mark("wait_universal_pool_lock_ms", t_pool_lock)
+ t_ensure = time.perf_counter()
+ ensure_universal_pool()
+ _mark("ensure_universal_pool_ms", t_ensure)
+
+ t_acquire = time.perf_counter()
+ slot = acquire_universal_slot(db)
+ _mark("acquire_universal_slot_ms", t_acquire)
+ slot_cid = f"POOLIDX:{slot}"
+
+ t_dispatch = time.perf_counter()
+ terminate_active_slot_sessions(db, slot_cid)
+ dispatch_universal_target(slot, service)
+ _mark("dispatch_universal_target_ms", t_dispatch)
+
+ t_commit = time.perf_counter()
+ session_obj = SessionModel(
+ id=session_id,
+ user_id=user.id,
+ service_id=service.id,
+ container_id=slot_cid,
+ status=SessionStatus.ACTIVE,
+ created_at=now_utc(),
+ last_access_at=now_utc(),
+ )
+ db.add(session_obj)
+ db.commit()
+ _mark("db_commit_ms", t_commit)
+ except LockTimeoutError:
+ _emit("universal_pool_lock_timeout")
+ raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.")
+ except Exception as exc:
+ logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
+ log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc))
+ audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
+ _emit("universal_pool_create_failed", error=str(exc))
+ raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot)
+ audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
+ _emit("session_created_universal_pool", session_id=session_id, slot=slot)
+ return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+
+ if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
+ t_warm = time.perf_counter()
+ ensure_warm_pool(service)
+ open_warm_web_url(service, service.target)
+ _mark("warm_pool_prepare_ms", t_warm)
+
+ t_commit = time.perf_counter()
+ session_obj = SessionModel(
+ id=session_id,
+ user_id=user.id,
+ service_id=service.id,
+ container_id=f"POOL:{service.slug}",
+ status=SessionStatus.ACTIVE,
+ created_at=now_utc(),
+ last_access_at=now_utc(),
+ )
+ db.add(session_obj)
+ db.commit()
+ _mark("db_commit_ms", t_commit)
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool")
+ audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
+ _emit("session_created_warm_pool", session_id=session_id)
+ return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+
+ try:
+ t_create = time.perf_counter()
+ container_id = create_runtime_container(service, session_id)
+ _mark("create_runtime_container_ms", t_create)
+ except Exception as exc:
+ logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
+ log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc))
+ audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
+ _emit("single_runtime_create_failed", error=str(exc))
+ raise HTTPException(status_code=502, detail="Session runtime failed to start")
+
+ t_commit = time.perf_counter()
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
- container_id=f"POOL:{service.slug}",
+ container_id=container_id,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
- log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool")
- audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
+ _mark("db_commit_ms", t_commit)
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id)
+
+ audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
+ t_wait = time.perf_counter()
+ ready = wait_for_session_route(session_id)
+ _mark("wait_session_route_ms", t_wait)
+ log_event("session_route_ready", session_id=session_id, ready=ready)
+ _emit("session_created_single_runtime", session_id=session_id, ready=ready)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
-
- try:
- container_id = create_runtime_container(service, session_id)
- except Exception as exc:
- logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
- log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc))
- audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
- raise HTTPException(status_code=502, detail="Session runtime failed to start")
-
- session_obj = SessionModel(
- id=session_id,
- user_id=user.id,
- service_id=service.id,
- container_id=container_id,
- status=SessionStatus.ACTIVE,
- created_at=now_utc(),
- last_access_at=now_utc(),
- )
- db.add(session_obj)
- db.commit()
- log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id)
-
- audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
- ready = wait_for_session_route(session_id)
- log_event("session_route_ready", session_id=session_id, ready=ready)
- return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+ except LockTimeoutError:
+ _emit("user_lock_timeout")
+ raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.")
@app.get("/svc/{slug}/", response_class=HTMLResponse)
diff --git a/app/main.py.bak_batchfix_20260424_122119 b/app/main.py.bak_batchfix_20260424_122119
new file mode 100644
index 0000000..92264eb
--- /dev/null
+++ b/app/main.py.bak_batchfix_20260424_122119
@@ -0,0 +1,2563 @@
+import datetime as dt
+import enum
+import fcntl
+import json
+import re
+import logging
+import os
+from pathlib import Path
+import secrets
+import threading
+import time
+import uuid
+import contextvars
+from urllib.parse import parse_qs, unquote, urlparse
+from typing import Optional
+
+import docker
+import requests
+from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status
+from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
+from fastapi.staticfiles import StaticFiles
+from fastapi.templating import Jinja2Templates
+from itsdangerous import BadSignature, URLSafeTimedSerializer
+from markupsafe import Markup, escape
+from passlib.context import CryptContext
+from sqlalchemy import (
+ Boolean,
+ DateTime,
+ Enum,
+ ForeignKey,
+ Integer,
+ String,
+ Text,
+ UniqueConstraint,
+ create_engine,
+ select,
+ text,
+)
+from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, sessionmaker
+
+
+DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://portal:portal@db:5432/portal")
+COOKIE_NAME = "portal_auth"
+CSRF_COOKIE = "csrf_token"
+COOKIE_MAX_AGE = 8 * 60 * 60
+SESSION_IDLE_SECONDS = int(os.getenv("SESSION_IDLE_SECONDS", "300"))
+PUBLIC_HOST = os.getenv("PUBLIC_HOST", "stend.4mont.ru")
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+LOG_SLOW_REQUEST_MS = int(os.getenv("LOG_SLOW_REQUEST_MS", "2000"))
+GO_USER_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_USER_LOCK_TIMEOUT_SECONDS", "2.0"))
+GO_POOL_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_POOL_LOCK_TIMEOUT_SECONDS", "5.0"))
+POOL_DISPATCH_RETRIES = int(os.getenv("POOL_DISPATCH_RETRIES", "4"))
+POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS = float(os.getenv("POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS", "2.0"))
+POOL_DISPATCH_SLEEP_SECONDS = float(os.getenv("POOL_DISPATCH_SLEEP_SECONDS", "0.3"))
+TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik")
+PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "0"))
+UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "0"))
+WEB_POOL_SIZE = int(os.getenv("WEB_POOL_SIZE", "5"))
+WEB_POOL_BUFFER = int(os.getenv("WEB_POOL_BUFFER", "2"))
+MAX_ACTIVE_SERVICES_PER_USER = int(os.getenv("MAX_ACTIVE_SERVICES_PER_USER", "4"))
+ICON_UPLOAD_MAX_BYTES = 2 * 1024 * 1024
+ICON_UPLOAD_TYPES = {
+ "image/png": "png",
+ "image/jpeg": "jpg",
+ "image/webp": "webp",
+}
+SERVICE_ICONS_DIR = Path("static/service-icons")
+
+logging.basicConfig(
+ level=LOG_LEVEL,
+ format="%(asctime)s %(levelname)s %(name)s %(message)s",
+)
+logger = logging.getLogger("portal")
+request_id_ctx = contextvars.ContextVar("request_id", default="-")
+
+
+def _normalize_log_value(value):
+ if isinstance(value, (str, int, float, bool)) or value is None:
+ return value
+ if isinstance(value, dt.datetime):
+ return value.isoformat()
+ return str(value)
+
+
+def log_event(event: str, level: int = logging.INFO, **fields) -> None:
+ payload = {"event": event, "req_id": request_id_ctx.get()}
+ for key, value in fields.items():
+ payload[key] = _normalize_log_value(value)
+ logger.log(level, json.dumps(payload, ensure_ascii=False, separators=(",", ":")))
+
+SIGNING_KEY = os.getenv("SIGNING_KEY", secrets.token_urlsafe(32))
+serializer = URLSafeTimedSerializer(SIGNING_KEY, salt="portal-auth")
+pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
+
+engine = create_engine(DATABASE_URL, pool_pre_ping=True)
+SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
+
+templates = Jinja2Templates(directory="templates")
+app = FastAPI(title="МОНТ - инфрастуктурный полигон")
+app.mount("/static", StaticFiles(directory="static"), name="static")
+
+
+@app.middleware("http")
+async def request_logging_middleware(request: Request, call_next):
+ req_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
+ token = request_id_ctx.set(req_id)
+ started = time.time()
+ client_ip = request.client.host if request.client else "-"
+ user_agent = request.headers.get("user-agent", "-")
+ try:
+ response = await call_next(request)
+ except Exception:
+ log_event(
+ "request_failed",
+ level=logging.ERROR,
+ method=request.method,
+ path=request.url.path,
+ client_ip=client_ip,
+ user_agent=user_agent,
+ )
+ request_id_ctx.reset(token)
+ raise
+ duration_ms = int((time.time() - started) * 1000)
+ level = logging.INFO
+ if response.status_code >= 500:
+ level = logging.ERROR
+ elif response.status_code >= 400:
+ level = logging.WARNING
+ log_event(
+ "request",
+ level=level,
+ method=request.method,
+ path=request.url.path,
+ query=str(request.url.query or ""),
+ status=response.status_code,
+ duration_ms=duration_ms,
+ client_ip=client_ip,
+ user_agent=user_agent,
+ )
+ if duration_ms >= LOG_SLOW_REQUEST_MS:
+ log_event(
+ "slow_request",
+ level=logging.WARNING,
+ method=request.method,
+ path=request.url.path,
+ duration_ms=duration_ms,
+ threshold_ms=LOG_SLOW_REQUEST_MS,
+ )
+ response.headers["X-Request-ID"] = req_id
+ request_id_ctx.reset(token)
+ return response
+
+
+class Base(DeclarativeBase):
+ pass
+
+
+class ServiceType(str, enum.Enum):
+ WEB = "WEB"
+ VNC = "VNC"
+ RDP = "RDP"
+
+
+class SessionStatus(str, enum.Enum):
+ ACTIVE = "ACTIVE"
+ EXPIRED = "EXPIRED"
+ TERMINATED = "TERMINATED"
+ ROTATED = "ROTATED"
+
+
+class User(Base):
+ __tablename__ = "users"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ username: Mapped[str] = mapped_column(String(64), unique=True, index=True)
+ password_hash: Mapped[str] = mapped_column(String(255))
+ expires_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), index=True)
+ active: Mapped[bool] = mapped_column(Boolean, default=True, index=True)
+ is_admin: Mapped[bool] = mapped_column(Boolean, default=False)
+ created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
+
+
+class Service(Base):
+ __tablename__ = "services"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ name: Mapped[str] = mapped_column(String(128))
+ slug: Mapped[str] = mapped_column(String(64), unique=True, index=True)
+ type: Mapped[ServiceType] = mapped_column(Enum(ServiceType), index=True)
+ target: Mapped[str] = mapped_column(Text)
+ comment: Mapped[str] = mapped_column(Text, default="")
+ icon_path: Mapped[str] = mapped_column(Text, default="")
+ active: Mapped[bool] = mapped_column(Boolean, default=True)
+ warm_pool_size: Mapped[int] = mapped_column(Integer, default=0)
+ created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
+
+
+class Category(Base):
+ __tablename__ = "categories"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ name: Mapped[str] = mapped_column(String(128), unique=True, index=True)
+ slug: Mapped[str] = mapped_column(String(64), unique=True, index=True)
+ created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
+
+
+class ServiceCategory(Base):
+ __tablename__ = "service_categories"
+ __table_args__ = (UniqueConstraint("service_id", "category_id", name="uq_service_category"),)
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
+ category_id: Mapped[int] = mapped_column(ForeignKey("categories.id", ondelete="CASCADE"), index=True)
+ created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
+
+
+class UserServiceAccess(Base):
+ __tablename__ = "user_service_access"
+ __table_args__ = (UniqueConstraint("user_id", "service_id", name="uq_user_service"),)
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True)
+ service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
+ granted_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
+
+
+class SessionModel(Base):
+ __tablename__ = "sessions"
+
+ id: Mapped[str] = mapped_column(String(36), primary_key=True)
+ user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True)
+ service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
+ status: Mapped[SessionStatus] = mapped_column(Enum(SessionStatus), default=SessionStatus.ACTIVE, index=True)
+ created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
+ last_access_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
+ container_id: Mapped[Optional[str]] = mapped_column(String(128), nullable=True)
+
+
+class AuditLog(Base):
+ __tablename__ = "audit_logs"
+
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ user_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True)
+ action: Mapped[str] = mapped_column(String(128), index=True)
+ details: Mapped[str] = mapped_column(Text)
+ created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
+
+
+def now_utc() -> dt.datetime:
+ return dt.datetime.now(dt.timezone.utc)
+
+
+def session_closed_reason(sess: SessionModel, db: Session) -> str:
+ if not sess:
+ return "idle"
+ if sess.status == SessionStatus.EXPIRED:
+ return "idle"
+ if sess.status == SessionStatus.ROTATED:
+ return "limit"
+ if sess.status == SessionStatus.TERMINATED:
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ active_rows = db.scalars(
+ select(SessionModel).where(
+ SessionModel.user_id == sess.user_id,
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.last_access_at >= cutoff,
+ )
+ ).all()
+ active_service_ids = {row.service_id for row in active_rows}
+ if len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER and sess.service_id not in active_service_ids:
+ return "limit"
+ return "idle"
+
+
+def normalize_web_target(url: str) -> str:
+ raw = (url or "").strip()
+ if not raw:
+ return raw
+ if raw.startswith(("http://", "https://")):
+ return raw
+ return f"http://{raw}"
+
+
+def format_service_comment(raw_text: str) -> Markup:
+ raw = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n").strip()
+ if not raw:
+ return Markup("")
+ escaped = str(escape(raw))
+ # Support pasted/plain markdown-like bold fragments.
+ escaped = re.sub(r"\*\*(.+?)\*\*", r"\1", escaped, flags=re.DOTALL)
+ # Allow a small safe subset of pasted HTML tags.
+ replacements = {
+ "<b>": "",
+ "</b>": "",
+ "<strong>": "",
+ "</strong>": "",
+ "<i>": "",
+ "</i>": "",
+ "<em>": "",
+ "</em>": "",
+ "<u>": "",
+ "</u>": "",
+ "<br>": "
",
+ "<br/>": "
",
+ "<br />": "
",
+ }
+ for src, dst in replacements.items():
+ escaped = escaped.replace(src, dst)
+ escaped = escaped.replace("\n", "
")
+ return Markup(escaped)
+
+
+def parse_rdp_target(target: str) -> dict:
+ raw = (target or "").strip()
+ if not raw:
+ raise HTTPException(status_code=400, detail="Empty RDP target")
+
+ parsed = urlparse(raw if "://" in raw else f"//{raw}")
+ host = parsed.hostname
+ if not host:
+ raise HTTPException(status_code=400, detail="Invalid RDP target. Use host:port or rdp://user:pass@host:port")
+ port = parsed.port or 3389
+
+ username = unquote(parsed.username) if parsed.username else ""
+ password = unquote(parsed.password) if parsed.password else ""
+
+ query = parse_qs(parsed.query or "")
+ if not username:
+ username = (query.get("u", [""])[0] or query.get("user", [""])[0] or "").strip()
+ if not password:
+ password = (query.get("p", [""])[0] or query.get("password", [""])[0] or "").strip()
+
+ domain = (query.get("domain", [""])[0] or query.get("d", [""])[0] or "").strip()
+ security = (query.get("sec", [""])[0] or query.get("security", [""])[0] or "").strip().lower()
+ if security and security not in {"nla", "tls", "rdp"}:
+ raise HTTPException(status_code=400, detail="Invalid RDP security. Use one of: nla, tls, rdp")
+
+ return {
+ "host": host,
+ "port": str(port),
+ "user": username,
+ "password": password,
+ "domain": domain,
+ "security": security,
+ }
+
+
+def set_service_categories(db: Session, service_id: int, category_ids: list[int]) -> None:
+ normalized = sorted({int(x) for x in (category_ids or [])})
+ if normalized:
+ existing_ids = set(db.scalars(select(Category.id).where(Category.id.in_(normalized))).all())
+ missing = sorted(set(normalized) - existing_ids)
+ if missing:
+ raise HTTPException(status_code=400, detail=f"Unknown category ids: {missing}")
+
+ existing_links = db.scalars(select(ServiceCategory).where(ServiceCategory.service_id == service_id)).all()
+ current = {row.category_id: row for row in existing_links}
+ wanted = set(normalized)
+
+ for cat_id in wanted:
+ if cat_id not in current:
+ db.add(ServiceCategory(service_id=service_id, category_id=cat_id))
+ for cat_id, row in current.items():
+ if cat_id not in wanted:
+ db.delete(row)
+
+
+def service_uses_universal_pool(service: Service) -> bool:
+ return UNIVERSAL_POOL_SIZE > 0 and service.type == ServiceType.RDP
+
+
+def universal_container_name(slot: int) -> str:
+ return f"portal-universal-{slot}"
+
+
+def web_pool_container_name(slot: int) -> str:
+ return f"portal-webpool-{slot}"
+
+
+def ensure_icons_dir() -> None:
+ SERVICE_ICONS_DIR.mkdir(parents=True, exist_ok=True)
+
+
+def remove_icon_file(icon_path: str) -> None:
+ if not icon_path or not icon_path.startswith("/static/service-icons/"):
+ return
+ filename = icon_path.rsplit("/", 1)[-1]
+ candidate = SERVICE_ICONS_DIR / filename
+ try:
+ candidate.unlink(missing_ok=True)
+ except Exception:
+ logger.exception("icon_delete_failed path=%s", candidate)
+
+
+async def store_service_icon(service: Service, upload: UploadFile) -> str:
+ ensure_icons_dir()
+ content_type = (upload.content_type or "").lower().strip()
+ ext = ICON_UPLOAD_TYPES.get(content_type)
+ if not ext:
+ raise HTTPException(status_code=400, detail="Unsupported file type. Use PNG/JPG/WEBP")
+
+ payload = await upload.read(ICON_UPLOAD_MAX_BYTES + 1)
+ if len(payload) > ICON_UPLOAD_MAX_BYTES:
+ raise HTTPException(status_code=400, detail="File too large. Max 2MB")
+ if not payload:
+ raise HTTPException(status_code=400, detail="Empty file")
+
+ stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d_%H%M%S")
+ filename = f"svc_{service.id}_{stamp}.{ext}"
+ target = SERVICE_ICONS_DIR / filename
+ target.write_bytes(payload)
+ return f"/static/service-icons/{filename}"
+
+
+def get_db():
+ db = SessionLocal()
+ try:
+ yield db
+ finally:
+ db.close()
+
+
+def audit(db: Session, action: str, details: str, user_id: Optional[int] = None) -> None:
+ db.add(AuditLog(user_id=user_id, action=action, details=details))
+ db.commit()
+
+
+def hash_password(password: str) -> str:
+ return pwd_context.hash(password)
+
+
+def verify_password(password: str, password_hash: str) -> bool:
+ return pwd_context.verify(password, password_hash)
+
+
+def user_is_valid(user: User) -> bool:
+ return bool(user.active and user.expires_at > now_utc())
+
+
+def issue_auth_cookie(response: RedirectResponse, user: User) -> None:
+ token = serializer.dumps({"user_id": user.id})
+ response.set_cookie(
+ key=COOKIE_NAME,
+ value=token,
+ httponly=True,
+ secure=True,
+ samesite="strict",
+ max_age=COOKIE_MAX_AGE,
+ path="/",
+ )
+
+
+def issue_csrf_cookie(response: RedirectResponse) -> str:
+ token = secrets.token_urlsafe(24)
+ response.set_cookie(
+ key=CSRF_COOKIE,
+ value=token,
+ httponly=False,
+ secure=True,
+ samesite="strict",
+ max_age=COOKIE_MAX_AGE,
+ path="/",
+ )
+ return token
+
+
+def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]:
+ raw = request.cookies.get(COOKIE_NAME)
+ if not raw:
+ return None
+ try:
+ payload = serializer.loads(raw, max_age=COOKIE_MAX_AGE)
+ except BadSignature:
+ return None
+ user = db.get(User, int(payload["user_id"]))
+ if not user or not user_is_valid(user):
+ return None
+ return user
+
+
+def require_user(user: Optional[User] = Depends(get_current_user)) -> User:
+ if not user:
+ raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized")
+ return user
+
+
+def require_admin(user: User = Depends(require_user)) -> User:
+ if not user.is_admin:
+ raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only")
+ return user
+
+
+def validate_csrf(request: Request) -> None:
+ cookie = request.cookies.get(CSRF_COOKIE)
+ form_val = request.headers.get("X-CSRF-Token")
+ if request.headers.get("content-type", "").startswith("application/x-www-form-urlencoded"):
+ return
+ if not cookie or not form_val or cookie != form_val:
+ raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed")
+
+
+def has_access(db: Session, user_id: int, service_id: int) -> bool:
+ q = select(UserServiceAccess).where(
+ UserServiceAccess.user_id == user_id,
+ UserServiceAccess.service_id == service_id,
+ )
+ return db.scalar(q) is not None
+
+
+def docker_client():
+ return docker.from_env()
+
+
+def session_router_name(session_id: str) -> str:
+ return f"sess-{session_id.replace('-', '')[:16]}"
+
+
+def _is_pool_name_conflict(exc: Exception) -> bool:
+ msg = str(exc).lower()
+ return ("already in use" in msg) or ("marked for removal" in msg)
+
+
+def _remove_container_by_name(d, name: str) -> None:
+ try:
+ old = d.containers.get(name)
+ old.remove(force=True)
+ except docker.errors.NotFound:
+ return
+ except Exception:
+ logger.exception("pool_container_remove_failed name=%s", name)
+
+
+def ensure_universal_pool() -> None:
+ if UNIVERSAL_POOL_SIZE <= 0:
+ return
+ d = docker_client()
+ image = "portal-universal-runtime:latest"
+
+ for i in range(UNIVERSAL_POOL_SIZE, 100):
+ name = universal_container_name(i)
+ try:
+ c = d.containers.get(name)
+ c.stop(timeout=5)
+ except docker.errors.NotFound:
+ break
+ except Exception:
+ logger.exception("universal_pool_scale_down_failed slot=%s", i)
+
+ for i in range(UNIVERSAL_POOL_SIZE):
+ name = universal_container_name(i)
+ path = f"/u/{i}/"
+ router = f"upool-{i}"
+ labels = {
+ "traefik.enable": "true",
+ "traefik.docker.network": "portal_net",
+ f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
+ f"traefik.http.routers.{router}.entrypoints": "websecure",
+ f"traefik.http.routers.{router}.tls": "true",
+ f"traefik.http.routers.{router}.priority": "9400",
+ f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
+ f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
+ f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
+ "portal.pool": "1",
+ "portal.pool.slot": str(i),
+ }
+ env = {
+ "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
+ "ENABLE_HEARTBEAT": "0",
+ "SESSION_ID": f"universal-{i}",
+ }
+ try:
+ c = d.containers.get(name)
+ if c.status != "running":
+ c.start()
+ continue
+ except docker.errors.NotFound:
+ pass
+ except Exception:
+ logger.exception("universal_pool_check_failed slot=%s", i)
+ continue
+
+ d.containers.run(
+ image=image,
+ name=name,
+ detach=True,
+ auto_remove=True,
+ network="portal_net",
+ labels=labels,
+ environment=env,
+ )
+ logger.info("universal_pool_container_started slot=%s", i)
+
+
+def ensure_web_pool(target_size: Optional[int] = None) -> None:
+ desired = max(0, WEB_POOL_SIZE if target_size is None else target_size)
+ d = docker_client()
+ image = "portal-universal-runtime:latest"
+
+ for i in range(desired, 100):
+ name = web_pool_container_name(i)
+ try:
+ c = d.containers.get(name)
+ c.stop(timeout=5)
+ except docker.errors.NotFound:
+ break
+ except Exception:
+ logger.exception("web_pool_scale_down_failed slot=%s", i)
+
+ for i in range(desired):
+ name = web_pool_container_name(i)
+ path = f"/w/{i}/"
+ router = f"wpool-{i}"
+ labels = {
+ "traefik.enable": "true",
+ "traefik.docker.network": "portal_net",
+ f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
+ f"traefik.http.routers.{router}.entrypoints": "websecure",
+ f"traefik.http.routers.{router}.tls": "true",
+ f"traefik.http.routers.{router}.priority": "9450",
+ f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
+ f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
+ f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
+ "portal.pool": "1",
+ "portal.pool.kind": "web",
+ "portal.pool.slot": str(i),
+ }
+ env = {
+ "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
+ "ENABLE_HEARTBEAT": "0",
+ "SESSION_ID": f"webpool-{i}",
+ }
+ should_create = False
+ try:
+ c = d.containers.get(name)
+ if c.status != "running":
+ try:
+ c.start()
+ except docker.errors.APIError as exc:
+ if _is_pool_name_conflict(exc):
+ logger.warning("web_pool_recreate_needed slot=%s reason=name-conflict", i)
+ _remove_container_by_name(d, name)
+ should_create = True
+ else:
+ raise
+ if not should_create:
+ continue
+ except docker.errors.NotFound:
+ should_create = True
+ except Exception:
+ logger.exception("web_pool_check_failed slot=%s", i)
+ continue
+
+ for attempt in range(3):
+ try:
+ d.containers.run(
+ image=image,
+ name=name,
+ detach=True,
+ auto_remove=True,
+ network="portal_net",
+ labels=labels,
+ environment=env,
+ )
+ logger.info("web_pool_container_started slot=%s", i)
+ break
+ except docker.errors.APIError as exc:
+ if _is_pool_name_conflict(exc) and attempt < 2:
+ logger.warning("web_pool_run_conflict_retry slot=%s attempt=%s", i, attempt + 1)
+ _remove_container_by_name(d, name)
+ time.sleep(0.25)
+ continue
+ logger.exception("web_pool_run_failed slot=%s", i)
+ break
+
+
+def get_universal_pool_status() -> dict:
+ desired = max(0, UNIVERSAL_POOL_SIZE)
+ if desired <= 0:
+ return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
+ d = docker_client()
+ names = [universal_container_name(i) for i in range(desired)]
+ containers = []
+ for name in names:
+ try:
+ containers.append(d.containers.get(name))
+ except Exception:
+ continue
+ running = sum(1 for c in containers if c.status == "running")
+ health = "ok" if running >= min(desired, 1) else "down"
+ return {
+ "desired": desired,
+ "running": running,
+ "total": len(containers),
+ "names": sorted(c.name for c in containers),
+ "health": health,
+ }
+
+
+def get_web_pool_status() -> dict:
+ desired = max(0, WEB_POOL_SIZE)
+ if desired <= 0:
+ return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
+ d = docker_client()
+ names = [web_pool_container_name(i) for i in range(desired)]
+ containers = []
+ for name in names:
+ try:
+ containers.append(d.containers.get(name))
+ except Exception:
+ continue
+ running = sum(1 for c in containers if c.status == "running")
+ health = "ok" if running >= min(desired, 1) else "down"
+ return {
+ "desired": desired,
+ "running": running,
+ "total": len(containers),
+ "names": sorted(c.name for c in containers),
+ "health": health,
+ }
+
+
+def acquire_universal_slot(db: Session) -> int:
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ q = select(SessionModel).where(
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.container_id.like("POOLIDX:%"),
+ SessionModel.last_access_at >= cutoff,
+ )
+ active = db.scalars(q).all()
+ busy = set()
+ for sess in active:
+ try:
+ busy.add(int((sess.container_id or "").split(":", 1)[1]))
+ except Exception:
+ continue
+ for i in range(max(0, UNIVERSAL_POOL_SIZE)):
+ if i not in busy:
+ return i
+ if active:
+ victim = min(active, key=lambda s: s.last_access_at)
+ victim.status = SessionStatus.TERMINATED
+ db.commit()
+ try:
+ return int((victim.container_id or "").split(":", 1)[1])
+ except Exception:
+ pass
+ return 0
+
+
+def acquire_web_pool_slot(db: Session) -> int:
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ q = select(SessionModel).where(
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.container_id.like("WEBPOOLIDX:%"),
+ SessionModel.last_access_at >= cutoff,
+ )
+ active = db.scalars(q).all()
+ busy = set()
+ for sess in active:
+ try:
+ busy.add(int((sess.container_id or "").split(":", 1)[1]))
+ except Exception:
+ continue
+
+ # Keep headroom: when active sessions are close to hot pool capacity,
+ # proactively warm up extra slots.
+ auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER))
+ if auto_target > WEB_POOL_SIZE:
+ ensure_web_pool(auto_target)
+
+ for i in range(max(0, auto_target)):
+ if i not in busy:
+ return i
+ return 0
+
+
+def dispatch_universal_target(slot: int, service: Service) -> None:
+ name = universal_container_name(slot)
+ url = ""
+ payload = {}
+ if service.type == ServiceType.WEB:
+ url = f"http://{name}:7000/open"
+ payload = {"url": normalize_web_target(service.target)}
+ elif service.type == ServiceType.RDP:
+ cfg = parse_rdp_target(service.target)
+ url = f"http://{name}:7000/rdp"
+ payload = {
+ "host": cfg["host"],
+ "port": cfg["port"],
+ "user": cfg["user"],
+ "password": cfg["password"],
+ "domain": cfg["domain"],
+ "security": cfg["security"],
+ }
+ else:
+ raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only")
+
+ last_exc = None
+ for _ in range(max(1, POOL_DISPATCH_RETRIES)):
+ try:
+ resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
+ resp.raise_for_status()
+ return
+ except Exception as exc:
+ last_exc = exc
+ time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
+ if last_exc:
+ raise last_exc
+
+
+def dispatch_web_pool_target(slot: int, service: Service) -> None:
+ name = web_pool_container_name(slot)
+ target_url = normalize_web_target(service.target)
+ url = f"http://{name}:7000/open"
+ last_exc = None
+ for _ in range(max(1, POOL_DISPATCH_RETRIES)):
+ try:
+ resp = requests.post(url, json={"url": target_url}, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
+ resp.raise_for_status()
+ return
+ except Exception as exc:
+ last_exc = exc
+ time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
+ if last_exc:
+ raise last_exc
+
+
+def create_runtime_container(service: Service, session_id: str):
+ d = docker_client()
+ router = session_router_name(session_id)
+ path = f"/s/{session_id}/"
+ labels = {
+ "traefik.enable": "true",
+ "traefik.docker.network": "portal_net",
+ f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
+ f"traefik.http.routers.{router}.entrypoints": "websecure",
+ f"traefik.http.routers.{router}.tls": "true",
+ f"traefik.http.routers.{router}.priority": "10000",
+ f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
+ f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
+ f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
+ }
+
+ env = {
+ "SESSION_ID": session_id,
+ "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
+ "ENABLE_HEARTBEAT": "1",
+ "TOUCH_PATH": f"/api/sessions/{session_id}/touch",
+ }
+ image = "portal-kiosk:latest"
+
+ if service.type == ServiceType.WEB:
+ env["TARGET_URL"] = service.target
+ env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
+ elif service.type == ServiceType.RDP:
+ image = "portal-rdp-proxy:latest"
+ cfg = parse_rdp_target(service.target)
+ env["RDP_HOST"] = cfg["host"]
+ env["RDP_PORT"] = cfg["port"]
+ if cfg["user"]:
+ env["RDP_USER"] = cfg["user"]
+ if cfg["password"]:
+ env["RDP_PASSWORD"] = cfg["password"]
+ if cfg["domain"]:
+ env["RDP_DOMAIN"] = cfg["domain"]
+ if cfg["security"]:
+ env["RDP_SECURITY"] = cfg["security"]
+ else:
+ raise HTTPException(status_code=400, detail="Unsupported service type")
+
+ container = d.containers.run(
+ image=image,
+ name=f"portal-sess-{session_id[:8]}",
+ detach=True,
+ auto_remove=True,
+ network="portal_net",
+ labels=labels,
+ environment=env,
+ )
+ logger.info("session_container_started session_id=%s container_id=%s service_type=%s", session_id, container.id, service.type.value)
+ return container.id
+
+
+def ensure_warm_pool(service: Service, pool_size: Optional[int] = None) -> None:
+ if service_uses_universal_pool(service):
+ return
+ if pool_size is None:
+ pool_size = desired_pool_size(service)
+ if pool_size <= 0:
+ # Stop stale warm containers for this service when pool is disabled.
+ prefix = f"portal-warm-{service.slug}-"
+ try:
+ d = docker_client()
+ for c in d.containers.list(all=True, filters={"name": prefix}):
+ if c.name.startswith(prefix):
+ c.stop(timeout=5)
+ except Exception:
+ logger.exception("warm_pool_disable_failed service=%s", service.slug)
+ return
+ d = docker_client()
+ router = f"warm-{service.slug}"
+ svc_name = f"warmsvc-{service.slug}"
+ path = f"/svc/{service.slug}/"
+ image = "portal-kiosk:latest"
+ base_env = {
+ "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
+ "ENABLE_HEARTBEAT": "0",
+ "TOUCH_PATH": "",
+ }
+ if service.type == ServiceType.WEB:
+ base_env["UNIVERSAL_WEB"] = "1"
+ base_env["START_URL"] = normalize_web_target(service.target)
+ base_env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
+ elif service.type == ServiceType.RDP:
+ image = "portal-rdp-proxy:latest"
+ cfg = parse_rdp_target(service.target)
+ base_env["RDP_HOST"] = cfg["host"]
+ base_env["RDP_PORT"] = cfg["port"]
+ if cfg["user"]:
+ base_env["RDP_USER"] = cfg["user"]
+ if cfg["password"]:
+ base_env["RDP_PASSWORD"] = cfg["password"]
+ if cfg["domain"]:
+ base_env["RDP_DOMAIN"] = cfg["domain"]
+ if cfg["security"]:
+ base_env["RDP_SECURITY"] = cfg["security"]
+ else:
+ raise HTTPException(status_code=400, detail="Unsupported service type")
+
+ labels = {
+ "traefik.enable": "true",
+ "traefik.docker.network": "portal_net",
+ f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
+ f"traefik.http.routers.{router}.entrypoints": "websecure",
+ f"traefik.http.routers.{router}.tls": "true",
+ f"traefik.http.routers.{router}.priority": "9500",
+ f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
+ f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
+ f"traefik.http.services.{svc_name}.loadbalancer.server.port": "6080",
+ f"traefik.http.routers.{router}.service": svc_name,
+ "portal.warm": "1",
+ "portal.service.slug": service.slug,
+ "portal.service.type": service.type.value,
+ }
+
+ # Ensure desired cardinality.
+ for i in range(pool_size, 50):
+ name = f"portal-warm-{service.slug}-{i}"
+ try:
+ c = d.containers.get(name)
+ c.stop(timeout=5)
+ except docker.errors.NotFound:
+ break
+ except Exception:
+ logger.exception("warm_pool_scale_down_failed service=%s idx=%s", service.slug, i)
+
+ for i in range(pool_size):
+ name = f"portal-warm-{service.slug}-{i}"
+ try:
+ c = d.containers.get(name)
+ if c.status != "running":
+ c.start()
+ continue
+ except docker.errors.NotFound:
+ pass
+ except Exception:
+ logger.exception("warm_pool_check_failed service=%s idx=%s", service.slug, i)
+ continue
+
+ env = dict(base_env)
+ env["SESSION_ID"] = f"warm-{service.slug}-{i}"
+ d.containers.run(
+ image=image,
+ name=name,
+ detach=True,
+ auto_remove=True,
+ network="portal_net",
+ labels=labels,
+ environment=env,
+ )
+ logger.info("warm_pool_container_started service=%s idx=%s", service.slug, i)
+
+
+def wait_for_session_route(session_id: str, timeout_seconds: int = 6) -> bool:
+ target = f"{TRAEFIK_INTERNAL_URL}/s/{session_id}/"
+ deadline = time.time() + timeout_seconds
+ while time.time() < deadline:
+ try:
+ resp = requests.get(
+ target,
+ headers={"Host": PUBLIC_HOST},
+ allow_redirects=False,
+ timeout=1.5,
+ )
+ if resp.status_code != 404:
+ return True
+ except Exception:
+ pass
+ time.sleep(0.3)
+ return False
+
+
+def route_ready(path: str) -> bool:
+ bases = [TRAEFIK_INTERNAL_URL]
+ if TRAEFIK_INTERNAL_URL.startswith("http://"):
+ bases.append("https://" + TRAEFIK_INTERNAL_URL[len("http://"):])
+ for base in bases:
+ try:
+ verify = not base.startswith("https://")
+ resp = requests.get(
+ f"{base}{path}",
+ headers={"Host": PUBLIC_HOST},
+ allow_redirects=False,
+ timeout=1.5,
+ verify=verify,
+ )
+ if resp.status_code != 404:
+ return True
+ except Exception:
+ continue
+ return False
+
+
+def container_running(container_id: Optional[str]) -> bool:
+ if not container_id:
+ return False
+ if (
+ container_id.startswith("POOL:")
+ or container_id.startswith("POOLIDX:")
+ or container_id.startswith("WEBPOOLIDX:")
+ ):
+ return True
+ try:
+ c = docker_client().containers.get(container_id)
+ return c.status == "running"
+ except Exception:
+ return False
+
+
+def stop_runtime_container(container_id: Optional[str]) -> None:
+ if not container_id:
+ return
+ try:
+ d = docker_client()
+ c = d.containers.get(container_id)
+ c.stop(timeout=5)
+ except Exception:
+ logger.exception("session_container_stop_failed container_id=%s", container_id)
+
+
+def terminate_session_record(
+ db: Session,
+ sess: SessionModel,
+ new_status: SessionStatus = SessionStatus.TERMINATED,
+ *,
+ stop_container: bool = True,
+) -> None:
+ if not sess or sess.status != SessionStatus.ACTIVE:
+ return
+ old_status = sess.status
+ cid = sess.container_id or ""
+ if stop_container and cid and not cid.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:")):
+ stop_runtime_container(cid)
+ sess.status = new_status
+ sess.last_access_at = now_utc()
+ log_event(
+ "session_closed",
+ level=logging.INFO,
+ session_id=sess.id,
+ user_id=sess.user_id,
+ service_id=sess.service_id,
+ container_id=cid,
+ old_status=old_status.value if isinstance(old_status, SessionStatus) else str(old_status),
+ new_status=new_status.value,
+ reason=session_closed_reason(sess, db),
+ stop_container=stop_container,
+ )
+
+
+def ensure_schema_compatibility() -> None:
+ # PostgreSQL requires enum value addition to be committed before usage in constraints.
+ with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
+ conn.execute(
+ text(
+ """
+ DO $$
+ BEGIN
+ BEGIN
+ ALTER TYPE servicetype ADD VALUE IF NOT EXISTS 'RDP';
+ EXCEPTION WHEN undefined_object THEN
+ NULL;
+ END;
+ END $$;
+ """
+ )
+ )
+ conn.execute(
+ text(
+ """
+ DO $$
+ BEGIN
+ BEGIN
+ ALTER TYPE sessionstatus ADD VALUE IF NOT EXISTS 'ROTATED';
+ EXCEPTION WHEN undefined_object THEN
+ NULL;
+ END;
+ END $$;
+ """
+ )
+ )
+
+ with engine.begin() as conn:
+ conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0"))
+ conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS comment TEXT NOT NULL DEFAULT ''"))
+ conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS icon_path TEXT NOT NULL DEFAULT ''"))
+ conn.execute(
+ text(
+ """
+ CREATE TABLE IF NOT EXISTS categories (
+ id SERIAL PRIMARY KEY,
+ name VARCHAR(128) NOT NULL UNIQUE,
+ slug VARCHAR(64) NOT NULL UNIQUE,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT now()
+ )
+ """
+ )
+ )
+ conn.execute(
+ text(
+ """
+ CREATE TABLE IF NOT EXISTS service_categories (
+ id SERIAL PRIMARY KEY,
+ service_id INT NOT NULL REFERENCES services(id) ON DELETE CASCADE,
+ category_id INT NOT NULL REFERENCES categories(id) ON DELETE CASCADE,
+ created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+ UNIQUE (service_id, category_id)
+ )
+ """
+ )
+ )
+ conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_service_id ON service_categories(service_id)"))
+ conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_category_id ON service_categories(category_id)"))
+ # Handle installs where service type is VARCHAR + CHECK.
+ conn.execute(
+ text(
+ """
+ DO $$
+ DECLARE c record;
+ BEGIN
+ FOR c IN
+ SELECT conname
+ FROM pg_constraint
+ WHERE conrelid = 'services'::regclass
+ AND contype = 'c'
+ AND pg_get_constraintdef(oid) ILIKE '%type%'
+ LOOP
+ EXECUTE format('ALTER TABLE services DROP CONSTRAINT %I', c.conname);
+ END LOOP;
+ ALTER TABLE services
+ ADD CONSTRAINT services_type_check
+ CHECK (type IN ('WEB','VNC','RDP'));
+ EXCEPTION WHEN duplicate_object THEN
+ NULL;
+ END $$;
+ """
+ )
+ )
+
+
+def desired_pool_size(service: Service) -> int:
+ if not service.active:
+ return 0
+ if service.type == ServiceType.RDP and not service_uses_universal_pool(service):
+ # RDP runs on-demand per user session; no prewarmed pool.
+ return 0
+ if service_uses_universal_pool(service):
+ return UNIVERSAL_POOL_SIZE
+ return service.warm_pool_size if service.warm_pool_size and service.warm_pool_size > 0 else PREWARM_POOL_SIZE
+
+
+def get_warm_containers_for_service(service: Service) -> list:
+ prefix = f"portal-warm-{service.slug}-"
+ try:
+ d = docker_client()
+ containers = []
+ for c in d.containers.list(all=True, filters={"name": prefix}):
+ if c.name.startswith(prefix):
+ containers.append(c)
+ return containers
+ except Exception:
+ logger.exception("pool_status_failed service=%s", service.slug)
+ return []
+
+
+def get_pool_status_for_service(service: Service) -> dict:
+ if service.type == ServiceType.WEB:
+ return get_web_pool_status()
+ if service.type == ServiceType.RDP and not service_uses_universal_pool(service):
+ return {"desired": 0, "running": 0, "total": 0, "names": [], "health": "n/a"}
+ if service_uses_universal_pool(service):
+ return get_universal_pool_status()
+ desired = desired_pool_size(service)
+ containers = get_warm_containers_for_service(service)
+ running = sum(1 for c in containers if c.status == "running")
+ states = [(c.attrs.get("State") or {}).get("Status", c.status) for c in containers]
+ has_bad = any(s in {"exited", "dead"} for s in states)
+ total = len(containers)
+ if running == 0:
+ health = "down"
+ elif running >= min(desired, 1) and not has_bad:
+ health = "ok"
+ else:
+ health = "degraded"
+ return {
+ "desired": desired,
+ "running": running,
+ "total": total,
+ "names": sorted(c.name for c in containers),
+ "health": health,
+ }
+
+
+def get_pool_detailed_status(service: Service) -> dict:
+ if service.type == ServiceType.WEB:
+ d = docker_client()
+ pool = get_web_pool_status()
+ details = []
+ for i in range(max(0, pool["desired"])):
+ name = web_pool_container_name(i)
+ try:
+ c = d.containers.get(name)
+ except Exception:
+ continue
+ attrs = c.attrs or {}
+ state = (attrs.get("State") or {}).get("Status", c.status)
+ details.append(
+ {
+ "name": c.name,
+ "status": c.status,
+ "state": state,
+ "created": attrs.get("Created", ""),
+ "image": c.image.tags[0] if c.image.tags else "",
+ "labels_ok": True,
+ }
+ )
+ return {
+ "service_id": service.id,
+ "slug": service.slug,
+ "type": service.type.value,
+ "desired": pool["desired"],
+ "running": pool["running"],
+ "total": pool["total"],
+ "health": pool["health"],
+ "containers": details,
+ "updated_at": now_utc().isoformat(),
+ }
+ if service_uses_universal_pool(service):
+ d = docker_client()
+ pool = get_universal_pool_status()
+ details = []
+ for i in range(max(0, UNIVERSAL_POOL_SIZE)):
+ name = universal_container_name(i)
+ try:
+ c = d.containers.get(name)
+ except Exception:
+ continue
+ attrs = c.attrs or {}
+ state = (attrs.get("State") or {}).get("Status", c.status)
+ details.append(
+ {
+ "name": c.name,
+ "status": c.status,
+ "state": state,
+ "created": attrs.get("Created", ""),
+ "image": c.image.tags[0] if c.image.tags else "",
+ "labels_ok": True,
+ }
+ )
+ return {
+ "service_id": service.id,
+ "slug": service.slug,
+ "type": service.type.value,
+ "desired": pool["desired"],
+ "running": pool["running"],
+ "total": pool["total"],
+ "health": pool["health"],
+ "containers": details,
+ "updated_at": now_utc().isoformat(),
+ }
+ containers = get_warm_containers_for_service(service)
+ pool = get_pool_status_for_service(service)
+ details = []
+ for c in sorted(containers, key=lambda x: x.name):
+ attrs = c.attrs or {}
+ state = (attrs.get("State") or {}).get("Status", c.status)
+ created = attrs.get("Created", "")
+ labels = attrs.get("Config", {}).get("Labels", {}) or {}
+ labels_ok = (
+ labels.get("portal.warm") == "1"
+ and labels.get("portal.service.slug") == service.slug
+ and labels.get("portal.service.type") == service.type.value
+ )
+ details.append(
+ {
+ "name": c.name,
+ "status": c.status,
+ "state": state,
+ "created": created,
+ "image": c.image.tags[0] if c.image.tags else "",
+ "labels_ok": labels_ok,
+ }
+ )
+ return {
+ "service_id": service.id,
+ "slug": service.slug,
+ "type": service.type.value,
+ "desired": pool["desired"],
+ "running": pool["running"],
+ "total": pool["total"],
+ "health": pool["health"],
+ "containers": details,
+ "updated_at": now_utc().isoformat(),
+ }
+
+
+def get_active_sessions_count(db: Session, service_id: int) -> int:
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ q = select(SessionModel).where(
+ SessionModel.service_id == service_id,
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.last_access_at >= cutoff,
+ )
+ sessions = db.scalars(q).all()
+ # Avoid inflated stats when pooled slot sessions were duplicated by race:
+ # for pooled sessions, occupancy is unique container_id.
+ pooled = [s for s in sessions if (s.container_id or "").startswith(("WEBPOOLIDX:", "POOLIDX:", "POOL:"))]
+ direct = [s for s in sessions if s not in pooled]
+ unique_pooled = len({s.container_id for s in pooled if s.container_id})
+ return unique_pooled + len(direct)
+
+
+def find_active_session_for_service(db: Session, service_id: int) -> Optional[SessionModel]:
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ q = (
+ select(SessionModel)
+ .where(
+ SessionModel.service_id == service_id,
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.last_access_at >= cutoff,
+ )
+ .order_by(SessionModel.created_at.desc())
+ )
+ return db.scalars(q).first()
+
+
+def find_active_session_for_user_service(db: Session, user_id: int, service_id: int) -> Optional[SessionModel]:
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ q = (
+ select(SessionModel)
+ .where(
+ SessionModel.user_id == user_id,
+ SessionModel.service_id == service_id,
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.last_access_at >= cutoff,
+ )
+ .order_by(SessionModel.created_at.desc())
+ )
+ return db.scalars(q).first()
+
+
+class LockTimeoutError(Exception):
+ pass
+
+
+def allocator_lock(db: Session, lock_id: int, timeout_seconds: Optional[float] = None, poll_seconds: float = 0.05):
+ class _LockCtx:
+ def __enter__(self_nonlocal):
+ self_nonlocal._acquired = False
+ if timeout_seconds is None:
+ db.execute(text("SELECT pg_advisory_xact_lock(:lid)"), {"lid": lock_id})
+ self_nonlocal._acquired = True
+ return self_nonlocal
+
+ deadline = time.monotonic() + max(0.0, timeout_seconds)
+ while time.monotonic() <= deadline:
+ got = db.execute(text("SELECT pg_try_advisory_xact_lock(:lid)"), {"lid": lock_id}).scalar()
+ if got:
+ self_nonlocal._acquired = True
+ return self_nonlocal
+ time.sleep(max(0.01, poll_seconds))
+ raise LockTimeoutError(f"advisory lock timeout lock_id={lock_id} timeout={timeout_seconds}")
+
+ return self_nonlocal
+
+ def __exit__(self_nonlocal, exc_type, exc, tb):
+ return False
+
+ return _LockCtx()
+
+
+def terminate_active_slot_sessions(db: Session, container_id: str) -> None:
+ if not container_id:
+ return
+ db.execute(
+ text(
+ """
+ UPDATE sessions
+ SET status = 'TERMINATED'
+ WHERE container_id = :cid
+ AND status = 'ACTIVE'
+ """
+ ),
+ {"cid": container_id},
+ )
+
+
+def session_redirect_url(sess: SessionModel) -> str:
+ cid = sess.container_id or ""
+ if cid.startswith("POOL:") or cid.startswith("POOLIDX:") or cid.startswith("WEBPOOLIDX:"):
+ return f"/s/{sess.id}/view"
+ return f"/s/{sess.id}/"
+
+
+def open_warm_web_url(service: Service, target_url: str) -> None:
+ if service_uses_universal_pool(service):
+ return
+ if service.type != ServiceType.WEB:
+ return
+ target_url = normalize_web_target(target_url)
+ try:
+ d = docker_client()
+ containers = d.containers.list(
+ filters={
+ "label": [
+ "portal.warm=1",
+ f"portal.service.slug={service.slug}",
+ "portal.service.type=WEB",
+ ]
+ }
+ )
+ for c in containers:
+ try:
+ resp = requests.post(
+ f"http://{c.name}:7000/open",
+ json={"url": target_url},
+ timeout=2,
+ )
+ resp.raise_for_status()
+ logger.info("warm_web_open_ok service=%s container=%s url=%s", service.slug, c.name, target_url)
+ except Exception:
+ logger.exception("warm_web_open_failed service=%s container=%s", service.slug, c.name)
+ except Exception:
+ logger.exception("warm_web_open_dispatch_failed service=%s", service.slug)
+
+
+def cleanup_loop():
+ while True:
+ time.sleep(60)
+ db = SessionLocal()
+ try:
+ ensure_universal_pool()
+ ensure_web_pool()
+ for svc in db.scalars(
+ select(Service).where(
+ Service.active == True,
+ Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
+ )
+ ).all():
+ if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
+ ensure_warm_pool(svc)
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ q = select(SessionModel).where(
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.last_access_at < cutoff,
+ )
+ stale = db.scalars(q).all()
+ for sess in stale:
+ if sess.container_id and not (
+ sess.container_id.startswith("POOL:")
+ or sess.container_id.startswith("POOLIDX:")
+ or sess.container_id.startswith("WEBPOOLIDX:")
+ ):
+ stop_runtime_container(sess.container_id)
+ sess.status = SessionStatus.EXPIRED
+ if stale:
+ db.commit()
+ except Exception:
+ db.rollback()
+ logger.exception("cleanup_loop_failed")
+ finally:
+ db.close()
+
+
+def bootstrap_admin():
+ admin_user = os.getenv("ADMIN_USERNAME", "admin")
+ admin_password = os.getenv("ADMIN_PASSWORD", "admin123")
+ ttl_days = int(os.getenv("ADMIN_TTL_DAYS", "3650"))
+
+ db = SessionLocal()
+ try:
+ existing = db.scalar(select(User).where(User.username == admin_user))
+ if not existing:
+ db.add(
+ User(
+ username=admin_user,
+ password_hash=hash_password(admin_password),
+ active=True,
+ is_admin=True,
+ expires_at=now_utc() + dt.timedelta(days=ttl_days),
+ )
+ )
+ db.commit()
+ finally:
+ db.close()
+
+
+@app.on_event("startup")
+def startup_event():
+ # Multiple uvicorn workers run startup in parallel. Serialize schema bootstrap
+ # to avoid DDL races on first run and during schema extension.
+ with open("/tmp/portal-schema.lock", "w") as lock_file:
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
+ Base.metadata.create_all(bind=engine)
+ ensure_schema_compatibility()
+ fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
+ ensure_icons_dir()
+ bootstrap_admin()
+ db = SessionLocal()
+ try:
+ ensure_universal_pool()
+ ensure_web_pool()
+ for svc in db.scalars(
+ select(Service).where(
+ Service.active == True,
+ Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
+ )
+ ).all():
+ if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
+ ensure_warm_pool(svc)
+ finally:
+ db.close()
+ thread = threading.Thread(target=cleanup_loop, daemon=True)
+ thread.start()
+
+
+@app.get("/", response_class=HTMLResponse)
+def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)):
+ session_closed = (request.query_params.get("session_closed") or "").strip().lower()
+ launch_error = (request.query_params.get("launch_error") or "").strip().lower()
+ session_notice = ""
+ if session_closed == "idle":
+ session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново."
+ elif session_closed == "limit":
+ session_notice = (
+ f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
+ "Освободите один сервис и попробуйте снова."
+ )
+ elif launch_error == "max_services":
+ session_notice = (
+ f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
+ "Освободите один сервис и попробуйте снова."
+ )
+ if not user:
+ csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
+ response = templates.TemplateResponse(
+ "login.html",
+ {
+ "request": request,
+ "csrf_token": csrf,
+ "login_error": "",
+ "session_notice": session_notice,
+ },
+ )
+ response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
+ return response
+
+ services = db.scalars(
+ select(Service)
+ .join(UserServiceAccess, UserServiceAccess.service_id == Service.id)
+ .where(
+ UserServiceAccess.user_id == user.id,
+ Service.active == True,
+ Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
+ )
+ .order_by(Service.name)
+ ).all()
+
+ service_categories = {svc.id: [] for svc in services}
+ categories = []
+ if services:
+ service_ids = [svc.id for svc in services]
+ rows = db.execute(
+ select(ServiceCategory.service_id, Category.id, Category.name, Category.slug)
+ .join(Category, Category.id == ServiceCategory.category_id)
+ .where(ServiceCategory.service_id.in_(service_ids))
+ .order_by(Category.name)
+ ).all()
+ category_map = {}
+ for service_id, category_id, category_name, category_slug in rows:
+ service_categories.setdefault(service_id, []).append(
+ {
+ "id": category_id,
+ "name": category_name,
+ "slug": category_slug,
+ }
+ )
+ if category_id not in category_map:
+ category_map[category_id] = {"id": category_id, "name": category_name, "slug": category_slug}
+ categories = sorted(category_map.values(), key=lambda x: x["name"].lower())
+
+ selected_category_slug = (request.query_params.get("category") or "").strip().lower()
+ if selected_category_slug:
+ services = [
+ svc for svc in services
+ if any(cat["slug"] == selected_category_slug for cat in service_categories.get(svc.id, []))
+ ]
+ service_comment_html = {svc.id: format_service_comment(svc.comment) for svc in services}
+
+ return templates.TemplateResponse(
+ "dashboard.html",
+ {
+ "request": request,
+ "user": user,
+ "services": services,
+ "categories": categories,
+ "selected_category_slug": selected_category_slug,
+ "service_categories": service_categories,
+ "service_comment_html": service_comment_html,
+ "csrf_token": request.cookies.get(CSRF_COOKIE, ""),
+ "session_notice": session_notice,
+ },
+ )
+
+
+@app.get("/admin", response_class=HTMLResponse)
+def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
+ users = db.scalars(select(User).order_by(User.id)).all()
+ categories = db.scalars(select(Category).order_by(Category.name)).all()
+ services = db.scalars(select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)).all()
+ web_services = [s for s in services if s.type == ServiceType.WEB]
+ rdp_services = [s for s in services if s.type == ServiceType.RDP]
+ service_category_map = {s.id: [] for s in services}
+ if services:
+ service_rows = db.execute(
+ select(ServiceCategory.service_id, ServiceCategory.category_id).where(
+ ServiceCategory.service_id.in_([s.id for s in services])
+ )
+ ).all()
+ for service_id, category_id in service_rows:
+ service_category_map.setdefault(service_id, []).append(category_id)
+ acl_rows = db.scalars(select(UserServiceAccess)).all()
+ acl = {}
+ for row in acl_rows:
+ acl.setdefault(row.user_id, []).append(row.service_id)
+ for user_id in acl:
+ acl[user_id] = sorted(acl[user_id])
+ pool_status = {s.id: get_pool_status_for_service(s) for s in services}
+ service_health = {}
+ for sid, st in pool_status.items():
+ service_health[sid] = {
+ "health": st["health"],
+ "running": st["running"],
+ "desired": st["desired"],
+ "active_sessions": get_active_sessions_count(db, sid),
+ }
+ web_pool = get_web_pool_status()
+ web_totals = {
+ "services": len(web_services),
+ "running": web_pool["running"],
+ "desired": web_pool["desired"],
+ "active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services),
+ }
+ recent_sessions = db.execute(
+ text(
+ """
+ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
+ s.status, s.created_at, s.last_access_at
+ FROM sessions s
+ JOIN users u ON u.id = s.user_id
+ JOIN services sv ON sv.id = s.service_id
+ WHERE sv.type IN ('WEB','RDP')
+ ORDER BY s.created_at DESC
+ LIMIT 200
+ """
+ )
+ ).mappings().all()
+ open_stats = db.execute(
+ text(
+ """
+ SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens
+ FROM sessions s
+ JOIN users u ON u.id = s.user_id
+ JOIN services sv ON sv.id = s.service_id
+ WHERE sv.type IN ('WEB','RDP')
+ GROUP BY u.username, sv.name, sv.slug
+ ORDER BY opens DESC, u.username ASC
+ LIMIT 200
+ """
+ )
+ ).mappings().all()
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ online_sessions = db.execute(
+ text(
+ """
+ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
+ sv.type AS service_type, s.container_id, s.created_at, s.last_access_at
+ FROM sessions s
+ JOIN users u ON u.id = s.user_id
+ JOIN services sv ON sv.id = s.service_id
+ WHERE s.status = 'ACTIVE'
+ AND s.last_access_at >= :cutoff
+ AND sv.type IN ('WEB','RDP')
+ ORDER BY s.last_access_at DESC, s.created_at DESC
+ LIMIT 500
+ """
+ ),
+ {"cutoff": cutoff},
+ ).mappings().all()
+ return templates.TemplateResponse(
+ "admin.html",
+ {
+ "request": request,
+ "admin": admin,
+ "users": users,
+ "web_services": web_services,
+ "rdp_services": rdp_services,
+ "services": services,
+ "categories": categories,
+ "service_category_map": service_category_map,
+ "acl": acl,
+ "pool_status": pool_status,
+ "service_health": service_health,
+ "web_totals": web_totals,
+ "web_pool_size": WEB_POOL_SIZE,
+ "web_pool_buffer": WEB_POOL_BUFFER,
+ "recent_sessions": recent_sessions,
+ "open_stats": open_stats,
+ "online_sessions": online_sessions,
+ "csrf_token": request.cookies.get(CSRF_COOKIE, ""),
+ "max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER,
+ },
+ )
+
+
+@app.post("/login")
+def login(
+ request: Request,
+ username: str = Form(...),
+ password: str = Form(...),
+ csrf_token: str = Form(...),
+ db: Session = Depends(get_db),
+):
+ cookie_csrf = request.cookies.get(CSRF_COOKIE)
+ if not cookie_csrf or csrf_token != cookie_csrf:
+ raise HTTPException(status_code=403, detail="CSRF failed")
+
+ user = db.scalar(select(User).where(User.username == username))
+ if not user or not verify_password(password, user.password_hash):
+ csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
+ response = templates.TemplateResponse(
+ "login.html",
+ {
+ "request": request,
+ "csrf_token": csrf,
+ "login_error": "Неверный логин или пароль",
+ "session_notice": "",
+ },
+ status_code=401,
+ )
+ response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
+ return response
+ if not user_is_valid(user):
+ csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
+ response = templates.TemplateResponse(
+ "login.html",
+ {
+ "request": request,
+ "csrf_token": csrf,
+ "login_error": "Доступ к сервису приостоновлен, обратитесь к вашему менеджеру",
+ },
+ status_code=403,
+ )
+ response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
+ return response
+
+ response = RedirectResponse(url="/", status_code=303)
+ issue_auth_cookie(response, user)
+ issue_csrf_cookie(response)
+ audit(db, "LOGIN", f"login success: {username}", user_id=user.id)
+ return response
+
+
+@app.post("/logout")
+def logout(request: Request):
+ response = RedirectResponse(url="/", status_code=303)
+ response.delete_cookie(COOKIE_NAME, path="/")
+ response.delete_cookie(CSRF_COOKIE, path="/")
+ return response
+
+
+@app.get("/go/{slug}")
+def go_service(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
+ total_started = time.perf_counter()
+ phase_ms = {}
+
+ def _mark(name: str, started: float) -> None:
+ phase_ms[name] = int((time.perf_counter() - started) * 1000)
+
+ def _emit(result: str, **extra) -> None:
+ payload = {
+ "user_id": user.id,
+ "service_slug": slug,
+ "result": result,
+ "total_ms": int((time.perf_counter() - total_started) * 1000),
+ }
+ payload.update(phase_ms)
+ payload.update(extra)
+ log_event("go_service_timing", **payload)
+
+ log_event("session_open_requested", user_id=user.id, service_slug=slug)
+ service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
+ if not service:
+ raise HTTPException(status_code=404, detail="Service not found")
+ if service.type == ServiceType.VNC:
+ raise HTTPException(status_code=410, detail="VNC services are deprecated")
+ if not has_access(db, user.id, service.id):
+ raise HTTPException(status_code=403, detail="ACL denied")
+
+ user_lock_started = time.perf_counter()
+ try:
+ with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS):
+ _mark("wait_user_lock_ms", user_lock_started)
+
+ t_existing = time.perf_counter()
+ existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
+ _mark("check_existing_ms", t_existing)
+ if existing_user_session:
+ _emit("reuse_session", session_id=existing_user_session.id)
+ return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
+
+ t_limit = time.perf_counter()
+ cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
+ active_rows = db.scalars(
+ select(SessionModel).where(
+ SessionModel.user_id == user.id,
+ SessionModel.status == SessionStatus.ACTIVE,
+ SessionModel.last_access_at >= cutoff,
+ )
+ ).all()
+ active_rows = sorted(active_rows, key=lambda row: row.created_at)
+ active_service_ids = {row.service_id for row in active_rows}
+ _mark("check_limit_ms", t_limit)
+ if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
+ oldest = next((row for row in active_rows if row.service_id != service.id), None)
+ if oldest:
+ t_rotate = time.perf_counter()
+ terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True)
+ db.commit()
+ _mark("rotate_oldest_ms", t_rotate)
+ log_event(
+ "session_rotated",
+ user_id=user.id,
+ closed_session_id=oldest.id,
+ closed_service_id=oldest.service_id,
+ new_service_id=service.id,
+ )
+ else:
+ _emit("max_services_redirect")
+ return RedirectResponse(url="/?launch_error=max_services", status_code=303)
+
+ if service.type == ServiceType.RDP:
+ t_rdp_owner = time.perf_counter()
+ active_owner = find_active_session_for_service(db, service.id)
+ _mark("check_rdp_owner_ms", t_rdp_owner)
+ if active_owner:
+ if active_owner.user_id != user.id:
+ owner = db.get(User, active_owner.user_id)
+ owner_name = owner.username if owner else f"id={active_owner.user_id}"
+ _emit("rdp_busy", owner=owner_name)
+ raise HTTPException(
+ status_code=409,
+ detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
+ )
+ _emit("reuse_rdp_session", session_id=active_owner.id)
+ return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
+
+ session_id = str(uuid.uuid4())
+ if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
+ try:
+ t_pool_lock = time.perf_counter()
+ with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
+ _mark("wait_web_pool_lock_ms", t_pool_lock)
+ t_ensure = time.perf_counter()
+ ensure_web_pool()
+ _mark("ensure_web_pool_ms", t_ensure)
+
+ t_acquire = time.perf_counter()
+ slot = acquire_web_pool_slot(db)
+ _mark("acquire_web_slot_ms", t_acquire)
+ slot_cid = f"WEBPOOLIDX:{slot}"
+
+ t_dispatch = time.perf_counter()
+ terminate_active_slot_sessions(db, slot_cid)
+ dispatch_web_pool_target(slot, service)
+ _mark("dispatch_web_target_ms", t_dispatch)
+
+ t_commit = time.perf_counter()
+ session_obj = SessionModel(
+ id=session_id,
+ user_id=user.id,
+ service_id=service.id,
+ container_id=slot_cid,
+ status=SessionStatus.ACTIVE,
+ created_at=now_utc(),
+ last_access_at=now_utc(),
+ )
+ db.add(session_obj)
+ db.commit()
+ _mark("db_commit_ms", t_commit)
+ except LockTimeoutError:
+ _emit("web_pool_lock_timeout")
+ raise HTTPException(status_code=503, detail="Пул WEB занят. Повторите через несколько секунд.")
+ except Exception as exc:
+ logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
+ log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc))
+ audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
+ _emit("web_pool_create_failed", error=str(exc))
+ raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot)
+ audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
+ _emit("session_created_web_pool", session_id=session_id, slot=slot)
+ return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+
+ if service_uses_universal_pool(service):
+ try:
+ t_pool_lock = time.perf_counter()
+ with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
+ _mark("wait_universal_pool_lock_ms", t_pool_lock)
+ t_ensure = time.perf_counter()
+ ensure_universal_pool()
+ _mark("ensure_universal_pool_ms", t_ensure)
+
+ t_acquire = time.perf_counter()
+ slot = acquire_universal_slot(db)
+ _mark("acquire_universal_slot_ms", t_acquire)
+ slot_cid = f"POOLIDX:{slot}"
+
+ t_dispatch = time.perf_counter()
+ terminate_active_slot_sessions(db, slot_cid)
+ dispatch_universal_target(slot, service)
+ _mark("dispatch_universal_target_ms", t_dispatch)
+
+ t_commit = time.perf_counter()
+ session_obj = SessionModel(
+ id=session_id,
+ user_id=user.id,
+ service_id=service.id,
+ container_id=slot_cid,
+ status=SessionStatus.ACTIVE,
+ created_at=now_utc(),
+ last_access_at=now_utc(),
+ )
+ db.add(session_obj)
+ db.commit()
+ _mark("db_commit_ms", t_commit)
+ except LockTimeoutError:
+ _emit("universal_pool_lock_timeout")
+ raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.")
+ except Exception as exc:
+ logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
+ log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc))
+ audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
+ _emit("universal_pool_create_failed", error=str(exc))
+ raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot)
+ audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
+ _emit("session_created_universal_pool", session_id=session_id, slot=slot)
+ return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+
+ if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
+ t_warm = time.perf_counter()
+ ensure_warm_pool(service)
+ open_warm_web_url(service, service.target)
+ _mark("warm_pool_prepare_ms", t_warm)
+
+ t_commit = time.perf_counter()
+ session_obj = SessionModel(
+ id=session_id,
+ user_id=user.id,
+ service_id=service.id,
+ container_id=f"POOL:{service.slug}",
+ status=SessionStatus.ACTIVE,
+ created_at=now_utc(),
+ last_access_at=now_utc(),
+ )
+ db.add(session_obj)
+ db.commit()
+ _mark("db_commit_ms", t_commit)
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool")
+ audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
+ _emit("session_created_warm_pool", session_id=session_id)
+ return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+
+ try:
+ t_create = time.perf_counter()
+ container_id = create_runtime_container(service, session_id)
+ _mark("create_runtime_container_ms", t_create)
+ except Exception as exc:
+ logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
+ log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc))
+ audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
+ _emit("single_runtime_create_failed", error=str(exc))
+ raise HTTPException(status_code=502, detail="Session runtime failed to start")
+
+ t_commit = time.perf_counter()
+ session_obj = SessionModel(
+ id=session_id,
+ user_id=user.id,
+ service_id=service.id,
+ container_id=container_id,
+ status=SessionStatus.ACTIVE,
+ created_at=now_utc(),
+ last_access_at=now_utc(),
+ )
+ db.add(session_obj)
+ db.commit()
+ _mark("db_commit_ms", t_commit)
+ log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id)
+
+ audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
+ t_wait = time.perf_counter()
+ ready = wait_for_session_route(session_id)
+ _mark("wait_session_route_ms", t_wait)
+ log_event("session_route_ready", session_id=session_id, ready=ready)
+ _emit("session_created_single_runtime", session_id=session_id, ready=ready)
+ return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
+ except LockTimeoutError:
+ _emit("user_lock_timeout")
+ raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.")
+
+
+@app.get("/svc/{slug}/", response_class=HTMLResponse)
+def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
+ service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
+ if not service:
+ raise HTTPException(status_code=404, detail="Service not found")
+ if not has_access(db, user.id, service.id):
+ raise HTTPException(status_code=403, detail="ACL denied")
+ return HTMLResponse(
+ content="""
+
+
+