diff --git a/app/main.py b/app/main.py index 450f417..7bf301c 100644 --- a/app/main.py +++ b/app/main.py @@ -44,6 +44,8 @@ LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik") PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "0")) UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "5")) +WEB_POOL_SIZE = int(os.getenv("WEB_POOL_SIZE", "5")) +WEB_POOL_BUFFER = int(os.getenv("WEB_POOL_BUFFER", "2")) ICON_UPLOAD_MAX_BYTES = 2 * 1024 * 1024 ICON_UPLOAD_TYPES = { "image/png": "png", @@ -223,6 +225,10 @@ def universal_container_name(slot: int) -> str: return f"portal-universal-{slot}" +def web_pool_container_name(slot: int) -> str: + return f"portal-webpool-{slot}" + + def ensure_icons_dir() -> None: SERVICE_ICONS_DIR.mkdir(parents=True, exist_ok=True) @@ -422,6 +428,67 @@ def ensure_universal_pool() -> None: logger.info("universal_pool_container_started slot=%s", i) +def ensure_web_pool(target_size: Optional[int] = None) -> None: + desired = max(0, WEB_POOL_SIZE if target_size is None else target_size) + d = docker_client() + image = "portal-universal-runtime:latest" + + for i in range(desired, 100): + name = web_pool_container_name(i) + try: + c = d.containers.get(name) + c.stop(timeout=5) + except docker.errors.NotFound: + break + except Exception: + logger.exception("web_pool_scale_down_failed slot=%s", i) + + for i in range(desired): + name = web_pool_container_name(i) + path = f"/w/{i}/" + router = f"wpool-{i}" + labels = { + "traefik.enable": "true", + "traefik.docker.network": "portal_net", + f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)", + f"traefik.http.routers.{router}.entrypoints": "websecure", + f"traefik.http.routers.{router}.tls": "true", + f"traefik.http.routers.{router}.priority": "9450", + f"traefik.http.routers.{router}.middlewares": f"{router}-strip", + f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1], + f"traefik.http.services.{router}.loadbalancer.server.port": "6080", + "portal.pool": "1", + "portal.pool.kind": "web", + "portal.pool.slot": str(i), + } + env = { + "IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS), + "ENABLE_HEARTBEAT": "0", + "SESSION_ID": f"webpool-{i}", + } + try: + c = d.containers.get(name) + if c.status != "running": + c.start() + continue + except docker.errors.NotFound: + pass + except Exception: + logger.exception("web_pool_check_failed slot=%s", i) + continue + + d.containers.run( + image=image, + name=name, + detach=True, + auto_remove=True, + network="portal_net", + labels=labels, + environment=env, + ) + logger.info("web_pool_container_started slot=%s", i) + + def get_universal_pool_status() -> dict: desired = max(0, UNIVERSAL_POOL_SIZE) if desired <= 0: @@ -445,6 +512,29 @@ def get_universal_pool_status() -> dict: } +def get_web_pool_status() -> dict: + desired = max(0, WEB_POOL_SIZE) + if desired <= 0: + return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []} + d = docker_client() + names = [web_pool_container_name(i) for i in range(desired)] + containers = [] + for name in names: + try: + containers.append(d.containers.get(name)) + except Exception: + continue + running = sum(1 for c in containers if c.status == "running") + health = "ok" if running >= min(desired, 1) else "down" + return { + "desired": desired, + "running": running, + "total": len(containers), + "names": sorted(c.name for c in containers), + "health": health, + } + + def acquire_universal_slot(db: Session) -> int: cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) q = select(SessionModel).where( @@ -473,6 +563,33 @@ def acquire_universal_slot(db: Session) -> int: return 0 +def acquire_web_pool_slot(db: Session) -> int: + cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) + q = select(SessionModel).where( + SessionModel.status == SessionStatus.ACTIVE, + SessionModel.container_id.like("WEBPOOLIDX:%"), + SessionModel.last_access_at >= cutoff, + ) + active = db.scalars(q).all() + busy = set() + for sess in active: + try: + busy.add(int((sess.container_id or "").split(":", 1)[1])) + except Exception: + continue + + # Keep headroom: when active sessions are close to hot pool capacity, + # proactively warm up extra slots. + auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER)) + if auto_target > WEB_POOL_SIZE: + ensure_web_pool(auto_target) + + for i in range(max(0, auto_target)): + if i not in busy: + return i + return 0 + + def dispatch_universal_target(slot: int, service: Service) -> None: name = universal_container_name(slot) url = "" @@ -507,6 +624,23 @@ def dispatch_universal_target(slot: int, service: Service) -> None: raise last_exc +def dispatch_web_pool_target(slot: int, service: Service) -> None: + name = web_pool_container_name(slot) + target_url = normalize_web_target(service.target) + url = f"http://{name}:7000/open" + last_exc = None + for _ in range(8): + try: + resp = requests.post(url, json={"url": target_url}, timeout=3) + resp.raise_for_status() + return + except Exception as exc: + last_exc = exc + time.sleep(0.4) + if last_exc: + raise last_exc + + def create_runtime_container(service: Service, session_id: str): d = docker_client() router = session_router_name(session_id) @@ -706,7 +840,11 @@ def route_ready(path: str) -> bool: def container_running(container_id: Optional[str]) -> bool: if not container_id: return False - if container_id.startswith("POOL:") or container_id.startswith("POOLIDX:"): + if ( + container_id.startswith("POOL:") + or container_id.startswith("POOLIDX:") + or container_id.startswith("WEBPOOLIDX:") + ): return True try: c = docker_client().containers.get(container_id) @@ -798,6 +936,8 @@ def get_warm_containers_for_service(service: Service) -> list: def get_pool_status_for_service(service: Service) -> dict: + if service.type == ServiceType.WEB: + return get_web_pool_status() if service_uses_universal_pool(service): return get_universal_pool_status() desired = desired_pool_size(service) @@ -822,6 +962,39 @@ def get_pool_status_for_service(service: Service) -> dict: def get_pool_detailed_status(service: Service) -> dict: + if service.type == ServiceType.WEB: + d = docker_client() + pool = get_web_pool_status() + details = [] + for i in range(max(0, pool["desired"])): + name = web_pool_container_name(i) + try: + c = d.containers.get(name) + except Exception: + continue + attrs = c.attrs or {} + state = (attrs.get("State") or {}).get("Status", c.status) + details.append( + { + "name": c.name, + "status": c.status, + "state": state, + "created": attrs.get("Created", ""), + "image": c.image.tags[0] if c.image.tags else "", + "labels_ok": True, + } + ) + return { + "service_id": service.id, + "slug": service.slug, + "type": service.type.value, + "desired": pool["desired"], + "running": pool["running"], + "total": pool["total"], + "health": pool["health"], + "containers": details, + "updated_at": now_utc().isoformat(), + } if service_uses_universal_pool(service): d = docker_client() pool = get_universal_pool_status() @@ -939,12 +1112,15 @@ def cleanup_loop(): db = SessionLocal() try: ensure_universal_pool() + ensure_web_pool() for svc in db.scalars( select(Service).where( Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) ).all(): + if svc.type == ServiceType.WEB: + continue if not service_uses_universal_pool(svc): ensure_warm_pool(svc) cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) @@ -954,7 +1130,11 @@ def cleanup_loop(): ) stale = db.scalars(q).all() for sess in stale: - if sess.container_id and not (sess.container_id.startswith("POOL:") or sess.container_id.startswith("POOLIDX:")): + if sess.container_id and not ( + sess.container_id.startswith("POOL:") + or sess.container_id.startswith("POOLIDX:") + or sess.container_id.startswith("WEBPOOLIDX:") + ): stop_runtime_container(sess.container_id) sess.status = SessionStatus.EXPIRED if stale: @@ -998,12 +1178,15 @@ def startup_event(): db = SessionLocal() try: ensure_universal_pool() + ensure_web_pool() for svc in db.scalars( select(Service).where( Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) ).all(): + if svc.type == ServiceType.WEB: + continue if not service_uses_universal_pool(svc): ensure_warm_pool(svc) finally: @@ -1057,10 +1240,11 @@ def admin_page(request: Request, admin: User = Depends(require_admin), db: Sessi "desired": st["desired"], "active_sessions": get_active_sessions_count(db, sid), } + web_pool = get_web_pool_status() web_totals = { "services": len(web_services), - "running": sum(service_health[s.id]["running"] for s in web_services), - "desired": sum(service_health[s.id]["desired"] for s in web_services), + "running": web_pool["running"], + "desired": web_pool["desired"], "active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services), } recent_sessions = db.execute( @@ -1104,6 +1288,8 @@ def admin_page(request: Request, admin: User = Depends(require_admin), db: Sessi "pool_status": pool_status, "service_health": service_health, "web_totals": web_totals, + "web_pool_size": WEB_POOL_SIZE, + "web_pool_buffer": WEB_POOL_BUFFER, "recent_sessions": recent_sessions, "open_stats": open_stats, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), @@ -1153,6 +1339,29 @@ def go_service(slug: str, user: User = Depends(require_user), db: Session = Depe raise HTTPException(status_code=403, detail="ACL denied") session_id = str(uuid.uuid4()) + if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0: + try: + ensure_web_pool() + slot = acquire_web_pool_slot(db) + dispatch_web_pool_target(slot, service) + except Exception as exc: + logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) + audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) + raise HTTPException(status_code=502, detail="WEB runtime failed to switch target") + session_obj = SessionModel( + id=session_id, + user_id=user.id, + service_id=service.id, + container_id=f"WEBPOOLIDX:{slot}", + status=SessionStatus.ACTIVE, + created_at=now_utc(), + last_access_at=now_utc(), + ) + db.add(session_obj) + db.commit() + audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) + return RedirectResponse(url=f"/w/{slot}/?sid={session_id}", status_code=303) + if service_uses_universal_pool(service): try: ensure_universal_pool() @@ -1406,7 +1615,15 @@ def session_status(session_id: str, user: User = Depends(require_user), db: Sess raise HTTPException(status_code=410, detail="Session is not active") service = db.get(Service, sess.service_id) pooled_web = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.WEB) + web_pool_idx = None + if sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"): + try: + web_pool_idx = int(sess.container_id.split(":", 1)[1]) + except Exception: + web_pool_idx = None route_path = f"/svc/{service.slug}/" if pooled_web and service else f"/s/{session_id}/" + if web_pool_idx is not None: + route_path = f"/w/{web_pool_idx}/" route_ok = route_ready(route_path) running = container_running(sess.container_id) ready = running and route_ok @@ -1421,6 +1638,8 @@ def session_status(session_id: str, user: User = Depends(require_user), db: Sess } if pooled_web: payload["redirect_url"] = f"/s/{session_id}/view" + if web_pool_idx is not None: + payload["redirect_url"] = f"/w/{web_pool_idx}/?sid={session_id}" return payload @@ -1538,6 +1757,9 @@ def prewarm_now(service_id: int, request: Request, _: User = Depends(require_adm service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") + if service.type == ServiceType.WEB: + ensure_web_pool() + return {"ok": True, "pool": get_web_pool_status()} if service_uses_universal_pool(service): ensure_universal_pool() return {"ok": True, "pool": get_universal_pool_status()} @@ -1545,6 +1767,16 @@ def prewarm_now(service_id: int, request: Request, _: User = Depends(require_adm return {"ok": True, "pool": get_pool_status_for_service(service)} +@app.put("/api/admin/web-pool-size") +def update_web_pool_size(payload: dict, request: Request, _: User = Depends(require_admin)): + validate_csrf(request) + global WEB_POOL_SIZE + value = max(0, int(payload.get("size", WEB_POOL_SIZE))) + WEB_POOL_SIZE = value + ensure_web_pool() + return {"ok": True, "size": WEB_POOL_SIZE, "pool": get_web_pool_status()} + + @app.post("/api/admin/users") def create_user(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) diff --git a/app/templates/admin.html b/app/templates/admin.html index c803654..f795dc8 100644 --- a/app/templates/admin.html +++ b/app/templates/admin.html @@ -19,7 +19,7 @@
Основной режим: WEB. Пользователь выбирает сервис, а портал открывает нужный URL в заранее прогретом браузере. - Поле pool size задаёт, сколько таких прогретых контейнеров держать для конкретного сервиса. + Для WEB используется общий пул горячих контейнеров с автодоращиванием по занятости.
@@ -90,7 +90,15 @@