import datetime as dt import logging import re import secrets import threading import uuid import time import contextvars from typing import Optional from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, Request, UploadFile, status from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from markupsafe import Markup, escape from sqlalchemy import select from sqlalchemy import delete, select, text, update from sqlalchemy.orm import Session from starlette.responses import HTMLResponse as _HR import urllib.request as _urllib_request import urllib.parse as _urllib_parse import json as _json from config import ( COOKIE_NAME, CSRF_COOKIE, GO_POOL_LOCK_TIMEOUT_SECONDS, GO_USER_LOCK_TIMEOUT_SECONDS, LOG_LEVEL, LOG_SLOW_REQUEST_MS, MAX_ACTIVE_SERVICES_PER_USER, PUBLIC_HOST, SESSION_IDLE_SECONDS, WEB_POOL_BUFFER, WEB_POOL_SIZE, TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, TELEGRAM_API_URL, ) from database import get_db from models import ( AuditLog, Category, RdpSlot, Service, ServiceCategory, ServiceType, SessionModel, SessionStatus, User, UserServiceAccess, ) from utils import ( audit, ensure_icons_dir, format_service_comment, log_event, normalize_web_target, now_utc, parse_rdp_target, remove_icon_file, request_id_ctx, set_service_categories, session_closed_reason, store_service_icon, ) from auth import ( get_current_user, has_access, issue_auth_cookie, issue_csrf_cookie, require_admin, require_user, user_is_valid, validate_csrf, verify_password, hash_password, ) from runtime import ( acquire_universal_slot, acquire_web_pool_slot, allocator_lock, container_running, create_runtime_container, desired_pool_size, dispatch_universal_target, dispatch_web_pool_target, docker_client, ensure_universal_pool, ensure_warm_pool, ensure_web_pool, find_active_session_for_service, find_active_session_for_user_service, get_active_sessions_count, get_pool_detailed_status, get_pool_status_for_service, get_universal_pool_status, get_web_pool_status, LockTimeoutError, open_warm_web_url, _rdp_slot_container_name, route_ready, sanitize_client_resolution, service_uses_universal_pool, session_redirect_url, connect_rdp_slot, start_rdp_slot_container, stop_rdp_slot_container, stop_runtime_container, terminate_active_slot_sessions, terminate_session_record, wait_for_session_route, ) from maintenance import on_startup logging.basicConfig( level=LOG_LEVEL, format="%(asctime)s %(levelname)s %(name)s %(message)s", ) logger = logging.getLogger("portal") templates = Jinja2Templates(directory="templates") def _get_real_ip(request) -> str: """Real client IP from X-Forwarded-For (Traefik trusts NPM via trustedIPs).""" forwarded_for = request.headers.get("x-forwarded-for", "") if forwarded_for: return forwarded_for.split(",")[0].strip() return request.client.host if request.client else "unknown" def _get_geo(ip: str) -> str: """Lookup city/country for IP via ip-api.com. Returns formatted string or empty.""" try: if ip in ("unknown", "127.0.0.1", "::1") or ip.startswith("10.") or ip.startswith("192.168."): return "" url = f"http://ip-api.com/json/{ip}?lang=ru&fields=status,country,regionName,city,query" req = _urllib_request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with _urllib_request.urlopen(req, timeout=5) as resp: data = _json.loads(resp.read()) if data.get("status") == "success": parts = [data.get("country", ""), data.get("regionName", ""), data.get("city", "")] return ", ".join(p for p in parts if p) except Exception: pass return "" app = FastAPI(title="МОНТ - инфрастуктурный полигон") app.mount("/static", StaticFiles(directory="static"), name="static") @app.middleware("http") async def request_logging_middleware(request: Request, call_next): req_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8]) token = request_id_ctx.set(req_id) started = time.time() client_ip = request.client.host if request.client else "-" user_agent = request.headers.get("user-agent", "-") try: response = await call_next(request) except Exception: log_event( "request_failed", level=logging.ERROR, method=request.method, path=request.url.path, client_ip=client_ip, user_agent=user_agent, ) request_id_ctx.reset(token) raise duration_ms = int((time.time() - started) * 1000) level = logging.INFO if response.status_code >= 500: level = logging.ERROR elif response.status_code >= 400: level = logging.WARNING log_event( "request", level=level, method=request.method, path=request.url.path, query=str(request.url.query or ""), status=response.status_code, duration_ms=duration_ms, client_ip=client_ip, user_agent=user_agent, ) if duration_ms >= LOG_SLOW_REQUEST_MS: log_event( "slow_request", level=logging.WARNING, method=request.method, path=request.url.path, duration_ms=duration_ms, threshold_ms=LOG_SLOW_REQUEST_MS, ) response.headers["X-Request-ID"] = req_id request_id_ctx.reset(token) return response _MOBILE_UA_RE = re.compile( r"(Mobile|Android|iPhone|iPad|iPod|BlackBerry|IEMobile|Opera Mini|webOS)", re.IGNORECASE, ) _MOBILE_PAGE = ( "" '' "
" '' '' "
'
'Пожалуйста, откройте эту страницу на компьютере или ноутбуке.
" "" "" ) @app.middleware("http") async def mobile_block_middleware(request: Request, call_next): path = request.url.path if path.startswith("/static/"): return await call_next(request) ua = request.headers.get("user-agent", "") if _MOBILE_UA_RE.search(ua): return _HR(content=_MOBILE_PAGE, status_code=200) return await call_next(request) @app.on_event("startup") def startup_event(): on_startup() @app.get("/", response_class=HTMLResponse) def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)): session_closed = (request.query_params.get("session_closed") or "").strip().lower() launch_error = (request.query_params.get("launch_error") or "").strip().lower() session_notice = "" if session_closed == "idle": session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново." elif session_closed == "limit": session_notice = ( f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). " "Освободите один сервис и попробуйте снова." ) elif launch_error == "max_services": session_notice = ( f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). " "Освободите один сервис и попробуйте снова." ) if not user: csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "", "session_notice": session_notice, }, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/") return response services = db.scalars( select(Service) .join(UserServiceAccess, UserServiceAccess.service_id == Service.id) .where( UserServiceAccess.user_id == user.id, Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) .order_by(Service.name) ).all() service_categories = {svc.id: [] for svc in services} categories = [] if services: service_ids = [svc.id for svc in services] rows = db.execute( select(ServiceCategory.service_id, Category.id, Category.name, Category.slug) .join(Category, Category.id == ServiceCategory.category_id) .where(ServiceCategory.service_id.in_(service_ids)) .order_by(Category.name) ).all() category_map = {} for service_id, category_id, category_name, category_slug in rows: service_categories.setdefault(service_id, []).append( { "id": category_id, "name": category_name, "slug": category_slug, } ) if category_id not in category_map: category_map[category_id] = {"id": category_id, "name": category_name, "slug": category_slug} categories = sorted(category_map.values(), key=lambda x: x["name"].lower()) selected_category_slug = (request.query_params.get("category") or "").strip().lower() if selected_category_slug: services = [ svc for svc in services if any(cat["slug"] == selected_category_slug for cat in service_categories.get(svc.id, [])) ] service_comment_html = {svc.id: format_service_comment(svc.comment) for svc in services} return templates.TemplateResponse( "dashboard.html", { "request": request, "user": user, "services": services, "categories": categories, "selected_category_slug": selected_category_slug, "service_categories": service_categories, "service_comment_html": service_comment_html, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), "session_notice": session_notice, }, ) @app.get("/admin", response_class=HTMLResponse) def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)): users = db.scalars(select(User).order_by(User.id)).all() categories = db.scalars(select(Category).order_by(Category.name)).all() services = db.scalars(select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)).all() web_services = [s for s in services if s.type == ServiceType.WEB] rdp_services = [s for s in services if s.type == ServiceType.RDP] service_category_map = {s.id: [] for s in services} if services: service_rows = db.execute( select(ServiceCategory.service_id, ServiceCategory.category_id).where( ServiceCategory.service_id.in_([s.id for s in services]) ) ).all() for service_id, category_id in service_rows: service_category_map.setdefault(service_id, []).append(category_id) acl_rows = db.scalars(select(UserServiceAccess)).all() acl = {} for row in acl_rows: acl.setdefault(row.user_id, []).append(row.service_id) for user_id in acl: acl[user_id] = sorted(acl[user_id]) pool_status = {s.id: get_pool_status_for_service(s) for s in services} service_health = {} for sid, st in pool_status.items(): service_health[sid] = { "health": st["health"], "running": st["running"], "desired": st["desired"], "active_sessions": get_active_sessions_count(db, sid), } web_pool = get_web_pool_status() web_totals = { "services": len(web_services), "running": web_pool["running"], "desired": web_pool["desired"], "active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services), } recent_sessions = db.execute( text( """ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug, s.status, s.created_at, s.last_access_at FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') ORDER BY s.created_at DESC LIMIT 200 """ ) ).mappings().all() open_stats = db.execute( text( """ SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') GROUP BY u.username, sv.name, sv.slug ORDER BY opens DESC, u.username ASC LIMIT 200 """ ) ).mappings().all() cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) online_sessions = db.execute( text( """ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug, sv.type AS service_type, s.container_id, s.created_at, s.last_access_at FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE s.status = 'ACTIVE' AND s.last_access_at >= :cutoff AND sv.type IN ('WEB','RDP') ORDER BY s.last_access_at DESC, s.created_at DESC LIMIT 500 """ ), {"cutoff": cutoff}, ).mappings().all() rdp_slots: dict[int, list] = {} for svc in rdp_services: slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == svc.id).order_by(RdpSlot.id)).all() slot_list = [] for slot in slots: active_sess = db.scalar( select(SessionModel).where( SessionModel.container_id == f"RDPSLOT:{slot.id}", SessionModel.status == SessionStatus.ACTIVE, ) ) running = False if slot.container_name: try: c = docker_client().containers.get(slot.container_name) running = c.status == "running" except Exception: pass occupied_username = None if active_sess: u = db.get(User, active_sess.user_id) occupied_username = u.username if u else f"id={active_sess.user_id}" slot_list.append({ "id": slot.id, "rdp_username": slot.rdp_username, "container_name": slot.container_name or "", "running": running, "occupied_username": occupied_username, }) rdp_slots[svc.id] = slot_list return templates.TemplateResponse( "admin.html", { "request": request, "admin": admin, "users": users, "web_services": web_services, "rdp_services": rdp_services, "services": services, "categories": categories, "service_category_map": service_category_map, "acl": acl, "pool_status": pool_status, "service_health": service_health, "web_totals": web_totals, "web_pool_size": WEB_POOL_SIZE, "web_pool_buffer": WEB_POOL_BUFFER, "recent_sessions": recent_sessions, "open_stats": open_stats, "online_sessions": online_sessions, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), "max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER, "rdp_slots": rdp_slots, }, ) @app.get("/yandex_b847b9b35f967fcc.html", include_in_schema=False) def yandex_verify(): from fastapi.responses import HTMLResponse return HTMLResponse(''' Verification: b847b9b35f967fcc ''') @app.get("/robots.txt", include_in_schema=False) def robots_txt(): from fastapi.responses import FileResponse return FileResponse("static/robots.txt", media_type="text/plain") @app.get("/sitemap.xml", include_in_schema=False) def sitemap_xml(): from fastapi.responses import FileResponse return FileResponse("static/sitemap.xml", media_type="application/xml") @app.get("/api/public/services-by-category") def public_services_by_category(db: Session = Depends(get_db)): services = db.execute( select(Service).where(Service.active == True).order_by(Service.name) ).scalars().all() categories = db.execute(select(Category).order_by(Category.name)).scalars().all() cat_map = {c.id: c.name for c in categories} svc_cats: dict[int, list[str]] = {} links = db.execute(select(ServiceCategory)).scalars().all() for lnk in links: svc_cats.setdefault(lnk.service_id, []).append(cat_map.get(lnk.category_id, "")) result: dict[str, list[dict]] = {"Без категории": []} for svc in services: cats = svc_cats.get(svc.id, []) entry = {"id": svc.id, "name": svc.name} if cats: for cat in cats: result.setdefault(cat, []).append(entry) else: result["Без категории"].append(entry) if not result["Без категории"]: del result["Без категории"] return result @app.post("/api/request-access") async def request_access(request: Request, db: Session = Depends(get_db)): try: data = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") name = str(data.get("name", "")).strip() company = str(data.get("company", "")).strip() email = str(data.get("email", "")).strip() phone = str(data.get("phone", "")).strip() manager = str(data.get("manager", "")).strip() products = data.get("products", []) import re as _re if not name or not company or not email or not phone: raise HTTPException(status_code=422, detail="Заполните все обязательные поля") if not _re.match(r'^[^\s@]+@[^\s@]+\.[^\s@]+$', email): raise HTTPException(status_code=422, detail="Некорректный email") if not _re.match(r'^[\+\d][\d\s\-\(\)]{6,18}$', phone): raise HTTPException(status_code=422, detail="Некорректный номер телефона") products_text = "" if products: items = "\n".join(f" • {p}" for p in products) products_text = f"\n\n🖥 *Интересующие продукты:*\n{items}" divider = "━━━━━━━━━━━━━━━━━━━━━━" manager_text = f"\n🤝 *Менеджер МОНТ:* {manager}" if manager else "" text = ( f"🔔 *Новый запрос доступа к полигону МОНТ*\n" f"{divider}\n\n" f"👤 *Имя:* {name}\n" f"🏢 *Компания:* {company}\n" f"📧 *Email:* {email}\n" f"📱 *Телефон:* {phone}" f"{manager_text}" f"{products_text}" ) log_event("ip_headers", xff=request.headers.get("x-forwarded-for","–"), xri=request.headers.get("x-real-ip","–"), client=str(request.client.host if request.client else "–")) ip = _get_real_ip(request) geo = _get_geo(ip) geo_text = "" if geo: geo_text += "\n📍 *Местоположение:* " + geo geo_text += "\n🖥 *IP:* " + ip text += geo_text if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: log_event("telegram_not_configured") return {"ok": True} try: payload = _json.dumps({ "chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "Markdown", }).encode() url = f"{TELEGRAM_API_URL}{TELEGRAM_BOT_TOKEN}/sendMessage" req = _urllib_request.Request(url, data=payload, headers={"Content-Type": "application/json"}) with _urllib_request.urlopen(req, timeout=10) as resp: resp.read() except Exception as e: log_event("telegram_send_error", error=str(e)) raise HTTPException(status_code=502, detail="Ошибка отправки запроса") return {"ok": True} @app.post("/api/contact") async def contact_ruslan(request: Request): import re as _re try: data = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") name = str(data.get("name", "")).strip() email = str(data.get("email", "")).strip() phone = str(data.get("phone", "")).strip() text = str(data.get("text", "")).strip() if not name or not email or not phone or not text: raise HTTPException(status_code=422, detail="Заполните все обязательные поля") if not _re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", email): raise HTTPException(status_code=422, detail="Некорректный email") if not _re.match(r"^[\+\d][\d\s\-\(\)]{6,18}$", phone): raise HTTPException(status_code=422, detail="Некорректный номер телефона") divider = "━━━━━━━━━━━━━━━━━━━━━━" msg = ( f"🔔 *Сообщение через форму полигона*\n" f"{divider}\n\n" f"👤 *Имя:* {name}\n" f"📧 *Email:* {email}\n" f"📱 *Телефон:* {phone}\n\n" f"💬 *Сообщение:*\n{text}" ) ip = _get_real_ip(request) geo = _get_geo(ip) geo_text = "" if geo: geo_text += f"\n📍 *Местоположение:* {geo}" geo_text += f"\n🖥 *IP:* {ip}" msg += geo_text if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: log_event("telegram_not_configured") return {"ok": True} try: payload = _json.dumps({ "chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "Markdown", }).encode() url = f"{TELEGRAM_API_URL}{TELEGRAM_BOT_TOKEN}/sendMessage" req = _urllib_request.Request(url, data=payload, headers={"Content-Type": "application/json"}) with _urllib_request.urlopen(req, timeout=10) as resp: resp.read() except Exception as e: log_event("telegram_send_error", error=str(e)) raise HTTPException(status_code=502, detail="Ошибка отправки") return {"ok": True} @app.post("/login") def login( request: Request, username: str = Form(...), password: str = Form(...), csrf_token: str = Form(...), db: Session = Depends(get_db), ): cookie_csrf = request.cookies.get(CSRF_COOKIE) if not cookie_csrf or csrf_token != cookie_csrf: raise HTTPException(status_code=403, detail="CSRF failed") user = db.scalar(select(User).where(User.username == username)) if not user or not verify_password(password, user.password_hash): csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "Неверный логин или пароль", "session_notice": "", }, status_code=401, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/") return response if not user_is_valid(user): csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "Доступ к сервису приостоновлен, обратитесь к вашему менеджеру", }, status_code=403, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/") return response response = RedirectResponse(url="/", status_code=303) issue_auth_cookie(response, user) issue_csrf_cookie(response) audit(db, "LOGIN", f"login success: {username}", user_id=user.id) return response @app.post("/logout") def logout(request: Request): response = RedirectResponse(url="/", status_code=303) response.delete_cookie(COOKIE_NAME, path="/") response.delete_cookie(CSRF_COOKIE, path="/") return response @app.get("/go/{slug}") def go_service( slug: str, sw: Optional[int] = Query(default=None, ge=320, le=7680), sh: Optional[int] = Query(default=None, ge=240, le=4320), user: User = Depends(require_user), db: Session = Depends(get_db), ): total_started = time.perf_counter() phase_ms = {} def _mark(name: str, started: float) -> None: phase_ms[name] = int((time.perf_counter() - started) * 1000) def _emit(result: str, **extra) -> None: payload = { "user_id": user.id, "service_slug": slug, "result": result, "total_ms": int((time.perf_counter() - total_started) * 1000), } payload.update(phase_ms) payload.update(extra) log_event("go_service_timing", **payload) log_event("session_open_requested", user_id=user.id, service_slug=slug, sw=sw, sh=sh) service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.VNC: raise HTTPException(status_code=410, detail="VNC services are deprecated") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") client_width, client_height = sanitize_client_resolution(sw, sh) log_event( "session_open_resolution", user_id=user.id, service_slug=slug, sw=sw, sh=sh, client_width=client_width, client_height=client_height, ) user_lock_started = time.perf_counter() try: with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS): _mark("wait_user_lock_ms", user_lock_started) t_existing = time.perf_counter() existing_user_session = find_active_session_for_user_service(db, user.id, service.id) _mark("check_existing_ms", t_existing) if existing_user_session: _emit("reuse_session", session_id=existing_user_session.id) if existing_user_session.container_id and existing_user_session.container_id.startswith("RDPSLOT:"): try: _rdp_slot_id = int(existing_user_session.container_id.split(":", 1)[1]) threading.Thread(target=connect_rdp_slot, args=(_rdp_slot_id,), daemon=True).start() except Exception: pass return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303) t_limit = time.perf_counter() cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) active_rows = db.scalars( select(SessionModel).where( SessionModel.user_id == user.id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) ).all() active_rows = sorted(active_rows, key=lambda row: row.created_at) active_service_ids = {row.service_id for row in active_rows} _mark("check_limit_ms", t_limit) if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER: oldest = next((row for row in active_rows if row.service_id != service.id), None) if oldest: t_rotate = time.perf_counter() terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True) db.commit() _mark("rotate_oldest_ms", t_rotate) log_event( "session_rotated", user_id=user.id, closed_session_id=oldest.id, closed_service_id=oldest.service_id, new_service_id=service.id, ) else: _emit("max_services_redirect") return RedirectResponse(url="/?launch_error=max_services", status_code=303) if service.type == ServiceType.RDP: t_rdp_slots = time.perf_counter() slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == service.id)).all() _mark("check_rdp_slots_ms", t_rdp_slots) if slots: session_id = str(uuid.uuid4()) try: with allocator_lock(db, 91003, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): busy_slot_ids: set[int] = set() for row in db.scalars( select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.service_id == service.id, SessionModel.container_id.like("RDPSLOT:%"), ) ).all(): try: busy_slot_ids.add(int(row.container_id.split(":", 1)[1])) except Exception: pass free_slot = next((s for s in slots if s.id not in busy_slot_ids), None) if not free_slot: _emit("rdp_all_slots_busy") raise HTTPException( status_code=503, detail="Все слоты этого RDP сервиса заняты. Попробуйте позже.", ) session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"RDPSLOT:{free_slot.id}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() except LockTimeoutError: _emit("rdp_slot_lock_timeout") raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="rdp_slot", slot_id=free_slot.id) audit(db, "SESSION_CREATE_RDP_SLOT", f"service={service.slug} session={session_id} slot={free_slot.id}", user_id=user.id) _emit("session_created_rdp_slot", session_id=session_id, slot_id=free_slot.id) threading.Thread(target=connect_rdp_slot, args=(free_slot.id,), daemon=True).start() return RedirectResponse(url=f"/s/{session_id}/", status_code=303) else: # Legacy: no slots configured — exclusive single-session behaviour active_owner = find_active_session_for_service(db, service.id) if active_owner: if active_owner.user_id != user.id: _emit("rdp_busy_legacy") raise HTTPException(status_code=503, detail="RDP сервис занят. Попробуйте позже.") _emit("reuse_rdp_session", session_id=active_owner.id) return RedirectResponse(url=session_redirect_url(active_owner), status_code=303) session_id = str(uuid.uuid4()) if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0: try: t_pool_lock = time.perf_counter() with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): _mark("wait_web_pool_lock_ms", t_pool_lock) t_ensure = time.perf_counter() ensure_web_pool() _mark("ensure_web_pool_ms", t_ensure) t_acquire = time.perf_counter() slot = acquire_web_pool_slot(db) _mark("acquire_web_slot_ms", t_acquire) slot_cid = f"WEBPOOLIDX:{slot}" t_dispatch = time.perf_counter() terminate_active_slot_sessions(db, slot_cid) dispatch_web_pool_target(slot, service, width=client_width, height=client_height) _mark("dispatch_web_target_ms", t_dispatch) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=slot_cid, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) except LockTimeoutError: _emit("web_pool_lock_timeout") raise HTTPException(status_code=503, detail="Пул WEB занят. Повторите через несколько секунд.") except Exception as exc: logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("web_pool_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="WEB runtime failed to switch target") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot) audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) _emit("session_created_web_pool", session_id=session_id, slot=slot) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) if service_uses_universal_pool(service): try: t_pool_lock = time.perf_counter() with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): _mark("wait_universal_pool_lock_ms", t_pool_lock) t_ensure = time.perf_counter() ensure_universal_pool() _mark("ensure_universal_pool_ms", t_ensure) t_acquire = time.perf_counter() slot = acquire_universal_slot(db) _mark("acquire_universal_slot_ms", t_acquire) slot_cid = f"POOLIDX:{slot}" t_dispatch = time.perf_counter() terminate_active_slot_sessions(db, slot_cid) dispatch_universal_target(slot, service, width=client_width, height=client_height) _mark("dispatch_universal_target_ms", t_dispatch) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=slot_cid, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) except LockTimeoutError: _emit("universal_pool_lock_timeout") raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.") except Exception as exc: logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("universal_pool_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="Universal runtime failed to switch target") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot) audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) _emit("session_created_universal_pool", session_id=session_id, slot=slot) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) if service.type == ServiceType.WEB and desired_pool_size(service) > 0: t_warm = time.perf_counter() ensure_warm_pool(service) open_warm_web_url(service, service.target) _mark("warm_pool_prepare_ms", t_warm) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"POOL:{service.slug}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool") audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id) _emit("session_created_warm_pool", session_id=session_id) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) try: t_create = time.perf_counter() container_id = create_runtime_container(service, session_id) _mark("create_runtime_container_ms", t_create) except Exception as exc: logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("single_runtime_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="Session runtime failed to start") t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=container_id, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id) audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id) t_wait = time.perf_counter() ready = wait_for_session_route(session_id) _mark("wait_session_route_ms", t_wait) log_event("session_route_ready", session_id=session_id, ready=ready) _emit("session_created_single_runtime", session_id=session_id, ready=ready) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) except LockTimeoutError: _emit("user_lock_timeout") raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.") @app.get("/svc/{slug}/", response_class=HTMLResponse) def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") return HTMLResponse( content="""{hint_esc}
''' creds_html = f'''