import datetime as dt
import logging
import re
import secrets
import threading
import uuid
import time
import contextvars
from typing import Optional

from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, Request, UploadFile, status
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from markupsafe import Markup, escape
from sqlalchemy import select
from sqlalchemy import delete, select, text, update
from sqlalchemy.orm import Session
from starlette.responses import HTMLResponse as _HR
import urllib.request as _urllib_request
import urllib.parse as _urllib_parse
import json as _json

from config import (
    COOKIE_NAME,
    CSRF_COOKIE,
    GO_POOL_LOCK_TIMEOUT_SECONDS,
    GO_USER_LOCK_TIMEOUT_SECONDS,
    LOG_LEVEL,
    LOG_SLOW_REQUEST_MS,
    MAX_ACTIVE_SERVICES_PER_USER,
    PUBLIC_HOST,
    SESSION_IDLE_SECONDS,
    WEB_POOL_BUFFER,
    WEB_POOL_SIZE,
    TELEGRAM_BOT_TOKEN,
    TELEGRAM_CHAT_ID,
    TELEGRAM_API_URL,
)
from database import get_db
from models import (
    AuditLog,
    Category,
    RdpSlot,
    Service,
    ServiceCategory,
    ServiceType,
    SessionModel,
    SessionStatus,
    User,
    UserServiceAccess,
)
from utils import (
    audit,
    ensure_icons_dir,
    format_service_comment,
    log_event,
    normalize_web_target,
    now_utc,
    parse_rdp_target,
    remove_icon_file,
    request_id_ctx,
    set_service_categories,
    session_closed_reason,
    store_service_icon,
)
from auth import (
    get_current_user,
    has_access,
    issue_auth_cookie,
    issue_csrf_cookie,
    require_admin,
    require_user,
    user_is_valid,
    validate_csrf,
    verify_password,
    hash_password,
)
from runtime import (
    acquire_universal_slot,
    acquire_web_pool_slot,
    allocator_lock,
    container_running,
    create_runtime_container,
    desired_pool_size,
    dispatch_universal_target,
    dispatch_web_pool_target,
    docker_client,
    ensure_universal_pool,
    ensure_warm_pool,
    ensure_web_pool,
    find_active_session_for_service,
    find_active_session_for_user_service,
    get_active_sessions_count,
    get_pool_detailed_status,
    get_pool_status_for_service,
    get_universal_pool_status,
    get_web_pool_status,
    LockTimeoutError,
    open_warm_web_url,
    _rdp_slot_container_name,
    route_ready,
    sanitize_client_resolution,
    service_uses_universal_pool,
    session_redirect_url,
    connect_rdp_slot,
    start_rdp_slot_container,
    stop_rdp_slot_container,
    stop_runtime_container,
    terminate_active_slot_sessions,
    terminate_session_record,
    wait_for_session_route,
)
from maintenance import on_startup

logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger("portal")
templates = Jinja2Templates(directory="templates")

app = FastAPI(title="МОНТ - инфрастуктурный полигон")
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
    """Log every HTTP request and tag it with a request id.

    The id is taken from the incoming ``X-Request-ID`` header, or generated
    (first 8 characters of a UUID4) when the header is missing or empty. It is
    stored in ``request_id_ctx`` for the duration of the request and echoed
    back in the ``X-Request-ID`` response header.

    Emits a ``request`` event (INFO, WARNING for status >= 400, ERROR for
    status >= 500), an additional ``slow_request`` event when the request took
    at least ``LOG_SLOW_REQUEST_MS`` milliseconds, and ``request_failed`` when
    the downstream handler raises (the exception is re-raised unchanged).
    """
    # `or` (rather than a .get() default) also replaces an empty header value.
    req_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())[:8]
    token = request_id_ctx.set(req_id)
    try:
        # perf_counter() is monotonic, so a wall-clock adjustment during the
        # request cannot produce a negative or inflated duration.
        started = time.perf_counter()
        client_ip = request.client.host if request.client else "-"
        user_agent = request.headers.get("user-agent", "-")
        try:
            response = await call_next(request)
        except Exception:
            log_event(
                "request_failed",
                level=logging.ERROR,
                method=request.method,
                path=request.url.path,
                client_ip=client_ip,
                user_agent=user_agent,
            )
            raise

        duration_ms = int((time.perf_counter() - started) * 1000)
        if response.status_code >= 500:
            level = logging.ERROR
        elif response.status_code >= 400:
            level = logging.WARNING
        else:
            level = logging.INFO
        log_event(
            "request",
            level=level,
            method=request.method,
            path=request.url.path,
            query=str(request.url.query or ""),
            status=response.status_code,
            duration_ms=duration_ms,
            client_ip=client_ip,
            user_agent=user_agent,
        )
        if duration_ms >= LOG_SLOW_REQUEST_MS:
            log_event(
                "slow_request",
                level=logging.WARNING,
                method=request.method,
                path=request.url.path,
                duration_ms=duration_ms,
                threshold_ms=LOG_SLOW_REQUEST_MS,
            )
        response.headers["X-Request-ID"] = req_id
        return response
    finally:
        # Always restore the context variable, including when logging or the
        # header assignment above raises.
        request_id_ctx.reset(token)
_MOBILE_UA_RE = re.compile( r"(Mobile|Android|iPhone|iPad|iPod|BlackBerry|IEMobile|Opera Mini|webOS)", re.IGNORECASE, ) _MOBILE_PAGE = ( "" '' "" '' '' "МОНТ - инфрастуктурный полигон" "" "" "" '' '
🖥
' "

Ресурс доступен
только с ПК

" "

Пожалуйста, откройте эту страницу на компьютере или ноутбуке.

" "" "" ) @app.middleware("http") async def mobile_block_middleware(request: Request, call_next): path = request.url.path if path.startswith("/static/"): return await call_next(request) ua = request.headers.get("user-agent", "") if _MOBILE_UA_RE.search(ua): return _HR(content=_MOBILE_PAGE, status_code=200) return await call_next(request) @app.on_event("startup") def startup_event(): on_startup() @app.get("/", response_class=HTMLResponse) def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)): session_closed = (request.query_params.get("session_closed") or "").strip().lower() launch_error = (request.query_params.get("launch_error") or "").strip().lower() session_notice = "" if session_closed == "idle": session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново." elif session_closed == "limit": session_notice = ( f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). " "Освободите один сервис и попробуйте снова." ) elif launch_error == "max_services": session_notice = ( f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). " "Освободите один сервис и попробуйте снова." 
) if not user: csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "", "session_notice": session_notice, }, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/") return response services = db.scalars( select(Service) .join(UserServiceAccess, UserServiceAccess.service_id == Service.id) .where( UserServiceAccess.user_id == user.id, Service.active == True, Service.type.in_([ServiceType.WEB, ServiceType.RDP]), ) .order_by(Service.name) ).all() service_categories = {svc.id: [] for svc in services} categories = [] if services: service_ids = [svc.id for svc in services] rows = db.execute( select(ServiceCategory.service_id, Category.id, Category.name, Category.slug) .join(Category, Category.id == ServiceCategory.category_id) .where(ServiceCategory.service_id.in_(service_ids)) .order_by(Category.name) ).all() category_map = {} for service_id, category_id, category_name, category_slug in rows: service_categories.setdefault(service_id, []).append( { "id": category_id, "name": category_name, "slug": category_slug, } ) if category_id not in category_map: category_map[category_id] = {"id": category_id, "name": category_name, "slug": category_slug} categories = sorted(category_map.values(), key=lambda x: x["name"].lower()) selected_category_slug = (request.query_params.get("category") or "").strip().lower() if selected_category_slug: services = [ svc for svc in services if any(cat["slug"] == selected_category_slug for cat in service_categories.get(svc.id, [])) ] service_comment_html = {svc.id: format_service_comment(svc.comment) for svc in services} return templates.TemplateResponse( "dashboard.html", { "request": request, "user": user, "services": services, "categories": categories, "selected_category_slug": selected_category_slug, "service_categories": service_categories, "service_comment_html": 
service_comment_html, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), "session_notice": session_notice, }, ) @app.get("/admin", response_class=HTMLResponse) def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)): users = db.scalars(select(User).order_by(User.id)).all() categories = db.scalars(select(Category).order_by(Category.name)).all() services = db.scalars(select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)).all() web_services = [s for s in services if s.type == ServiceType.WEB] rdp_services = [s for s in services if s.type == ServiceType.RDP] service_category_map = {s.id: [] for s in services} if services: service_rows = db.execute( select(ServiceCategory.service_id, ServiceCategory.category_id).where( ServiceCategory.service_id.in_([s.id for s in services]) ) ).all() for service_id, category_id in service_rows: service_category_map.setdefault(service_id, []).append(category_id) acl_rows = db.scalars(select(UserServiceAccess)).all() acl = {} for row in acl_rows: acl.setdefault(row.user_id, []).append(row.service_id) for user_id in acl: acl[user_id] = sorted(acl[user_id]) pool_status = {s.id: get_pool_status_for_service(s) for s in services} service_health = {} for sid, st in pool_status.items(): service_health[sid] = { "health": st["health"], "running": st["running"], "desired": st["desired"], "active_sessions": get_active_sessions_count(db, sid), } web_pool = get_web_pool_status() web_totals = { "services": len(web_services), "running": web_pool["running"], "desired": web_pool["desired"], "active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services), } recent_sessions = db.execute( text( """ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug, s.status, s.created_at, s.last_access_at FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') ORDER BY 
s.created_at DESC LIMIT 200 """ ) ).mappings().all() open_stats = db.execute( text( """ SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE sv.type IN ('WEB','RDP') GROUP BY u.username, sv.name, sv.slug ORDER BY opens DESC, u.username ASC LIMIT 200 """ ) ).mappings().all() cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) online_sessions = db.execute( text( """ SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug, sv.type AS service_type, s.container_id, s.created_at, s.last_access_at FROM sessions s JOIN users u ON u.id = s.user_id JOIN services sv ON sv.id = s.service_id WHERE s.status = 'ACTIVE' AND s.last_access_at >= :cutoff AND sv.type IN ('WEB','RDP') ORDER BY s.last_access_at DESC, s.created_at DESC LIMIT 500 """ ), {"cutoff": cutoff}, ).mappings().all() rdp_slots: dict[int, list] = {} for svc in rdp_services: slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == svc.id).order_by(RdpSlot.id)).all() slot_list = [] for slot in slots: active_sess = db.scalar( select(SessionModel).where( SessionModel.container_id == f"RDPSLOT:{slot.id}", SessionModel.status == SessionStatus.ACTIVE, ) ) running = False if slot.container_name: try: c = docker_client().containers.get(slot.container_name) running = c.status == "running" except Exception: pass occupied_username = None if active_sess: u = db.get(User, active_sess.user_id) occupied_username = u.username if u else f"id={active_sess.user_id}" slot_list.append({ "id": slot.id, "rdp_username": slot.rdp_username, "container_name": slot.container_name or "", "running": running, "occupied_username": occupied_username, }) rdp_slots[svc.id] = slot_list return templates.TemplateResponse( "admin.html", { "request": request, "admin": admin, "users": users, "web_services": web_services, "rdp_services": rdp_services, "services": services, "categories": 
categories, "service_category_map": service_category_map, "acl": acl, "pool_status": pool_status, "service_health": service_health, "web_totals": web_totals, "web_pool_size": WEB_POOL_SIZE, "web_pool_buffer": WEB_POOL_BUFFER, "recent_sessions": recent_sessions, "open_stats": open_stats, "online_sessions": online_sessions, "csrf_token": request.cookies.get(CSRF_COOKIE, ""), "max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER, "rdp_slots": rdp_slots, }, ) @app.get("/api/public/services-by-category") def public_services_by_category(db: Session = Depends(get_db)): services = db.execute( select(Service).where(Service.active == True).order_by(Service.name) ).scalars().all() categories = db.execute(select(Category).order_by(Category.name)).scalars().all() cat_map = {c.id: c.name for c in categories} svc_cats: dict[int, list[str]] = {} links = db.execute(select(ServiceCategory)).scalars().all() for lnk in links: svc_cats.setdefault(lnk.service_id, []).append(cat_map.get(lnk.category_id, "")) result: dict[str, list[dict]] = {"Без категории": []} for svc in services: cats = svc_cats.get(svc.id, []) entry = {"id": svc.id, "name": svc.name} if cats: for cat in cats: result.setdefault(cat, []).append(entry) else: result["Без категории"].append(entry) if not result["Без категории"]: del result["Без категории"] return result @app.post("/api/request-access") async def request_access(request: Request, db: Session = Depends(get_db)): try: data = await request.json() except Exception: raise HTTPException(status_code=400, detail="Invalid JSON") name = str(data.get("name", "")).strip() company = str(data.get("company", "")).strip() email = str(data.get("email", "")).strip() phone = str(data.get("phone", "")).strip() manager = str(data.get("manager", "")).strip() products = data.get("products", []) if not name or not company or not email or not phone: raise HTTPException(status_code=422, detail="Заполните все обязательные поля") products_text = "" if products: items = 
"\n".join(f" • {p}" for p in products) products_text = f"\n\n🖥 *Интересующие продукты:*\n{items}" divider = "━━━━━━━━━━━━━━━━━━━━━━" manager_text = f"\n🤝 *Менеджер МОНТ:* {manager}" if manager else "" text = ( f"🔔 *Новый запрос доступа к полигону МОНТ*\n" f"{divider}\n\n" f"👤 *Имя:* {name}\n" f"🏢 *Компания:* {company}\n" f"📧 *Email:* {email}\n" f"📱 *Телефон:* {phone}" f"{manager_text}" f"{products_text}" ) if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: log_event("telegram_not_configured", {}) return {"ok": True} try: payload = _json.dumps({ "chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "Markdown", }).encode() url = f"{TELEGRAM_API_URL}{TELEGRAM_BOT_TOKEN}/sendMessage" req = _urllib_request.Request(url, data=payload, headers={"Content-Type": "application/json"}) with _urllib_request.urlopen(req, timeout=10) as resp: resp.read() except Exception as e: log_event("telegram_send_error", {"error": str(e)}) raise HTTPException(status_code=502, detail="Ошибка отправки запроса") return {"ok": True} @app.post("/login") def login( request: Request, username: str = Form(...), password: str = Form(...), csrf_token: str = Form(...), db: Session = Depends(get_db), ): cookie_csrf = request.cookies.get(CSRF_COOKIE) if not cookie_csrf or csrf_token != cookie_csrf: raise HTTPException(status_code=403, detail="CSRF failed") user = db.scalar(select(User).where(User.username == username)) if not user or not verify_password(password, user.password_hash): csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": request, "csrf_token": csrf, "login_error": "Неверный логин или пароль", "session_notice": "", }, status_code=401, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/") return response if not user_is_valid(user): csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24) response = templates.TemplateResponse( "login.html", { "request": 
request, "csrf_token": csrf, "login_error": "Доступ к сервису приостоновлен, обратитесь к вашему менеджеру", }, status_code=403, ) response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/") return response response = RedirectResponse(url="/", status_code=303) issue_auth_cookie(response, user) issue_csrf_cookie(response) audit(db, "LOGIN", f"login success: {username}", user_id=user.id) return response @app.post("/logout") def logout(request: Request): response = RedirectResponse(url="/", status_code=303) response.delete_cookie(COOKIE_NAME, path="/") response.delete_cookie(CSRF_COOKIE, path="/") return response @app.get("/go/{slug}") def go_service( slug: str, sw: Optional[int] = Query(default=None, ge=320, le=7680), sh: Optional[int] = Query(default=None, ge=240, le=4320), user: User = Depends(require_user), db: Session = Depends(get_db), ): total_started = time.perf_counter() phase_ms = {} def _mark(name: str, started: float) -> None: phase_ms[name] = int((time.perf_counter() - started) * 1000) def _emit(result: str, **extra) -> None: payload = { "user_id": user.id, "service_slug": slug, "result": result, "total_ms": int((time.perf_counter() - total_started) * 1000), } payload.update(phase_ms) payload.update(extra) log_event("go_service_timing", **payload) log_event("session_open_requested", user_id=user.id, service_slug=slug, sw=sw, sh=sh) service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.VNC: raise HTTPException(status_code=410, detail="VNC services are deprecated") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") client_width, client_height = sanitize_client_resolution(sw, sh) log_event( "session_open_resolution", user_id=user.id, service_slug=slug, sw=sw, sh=sh, client_width=client_width, client_height=client_height, ) 
user_lock_started = time.perf_counter() try: with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS): _mark("wait_user_lock_ms", user_lock_started) t_existing = time.perf_counter() existing_user_session = find_active_session_for_user_service(db, user.id, service.id) _mark("check_existing_ms", t_existing) if existing_user_session: _emit("reuse_session", session_id=existing_user_session.id) if existing_user_session.container_id and existing_user_session.container_id.startswith("RDPSLOT:"): try: _rdp_slot_id = int(existing_user_session.container_id.split(":", 1)[1]) threading.Thread(target=connect_rdp_slot, args=(_rdp_slot_id,), daemon=True).start() except Exception: pass return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303) t_limit = time.perf_counter() cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS) active_rows = db.scalars( select(SessionModel).where( SessionModel.user_id == user.id, SessionModel.status == SessionStatus.ACTIVE, SessionModel.last_access_at >= cutoff, ) ).all() active_rows = sorted(active_rows, key=lambda row: row.created_at) active_service_ids = {row.service_id for row in active_rows} _mark("check_limit_ms", t_limit) if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER: oldest = next((row for row in active_rows if row.service_id != service.id), None) if oldest: t_rotate = time.perf_counter() terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True) db.commit() _mark("rotate_oldest_ms", t_rotate) log_event( "session_rotated", user_id=user.id, closed_session_id=oldest.id, closed_service_id=oldest.service_id, new_service_id=service.id, ) else: _emit("max_services_redirect") return RedirectResponse(url="/?launch_error=max_services", status_code=303) if service.type == ServiceType.RDP: t_rdp_slots = time.perf_counter() slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == service.id)).all() 
_mark("check_rdp_slots_ms", t_rdp_slots) if slots: session_id = str(uuid.uuid4()) try: with allocator_lock(db, 91003, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): busy_slot_ids: set[int] = set() for row in db.scalars( select(SessionModel).where( SessionModel.status == SessionStatus.ACTIVE, SessionModel.service_id == service.id, SessionModel.container_id.like("RDPSLOT:%"), ) ).all(): try: busy_slot_ids.add(int(row.container_id.split(":", 1)[1])) except Exception: pass free_slot = next((s for s in slots if s.id not in busy_slot_ids), None) if not free_slot: _emit("rdp_all_slots_busy") raise HTTPException( status_code=503, detail="Все слоты этого RDP сервиса заняты. Попробуйте позже.", ) session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"RDPSLOT:{free_slot.id}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() except LockTimeoutError: _emit("rdp_slot_lock_timeout") raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="rdp_slot", slot_id=free_slot.id) audit(db, "SESSION_CREATE_RDP_SLOT", f"service={service.slug} session={session_id} slot={free_slot.id}", user_id=user.id) _emit("session_created_rdp_slot", session_id=session_id, slot_id=free_slot.id) threading.Thread(target=connect_rdp_slot, args=(free_slot.id,), daemon=True).start() return RedirectResponse(url=f"/s/{session_id}/", status_code=303) else: # Legacy: no slots configured — exclusive single-session behaviour active_owner = find_active_session_for_service(db, service.id) if active_owner: if active_owner.user_id != user.id: _emit("rdp_busy_legacy") raise HTTPException(status_code=503, detail="RDP сервис занят. 
Попробуйте позже.") _emit("reuse_rdp_session", session_id=active_owner.id) return RedirectResponse(url=session_redirect_url(active_owner), status_code=303) session_id = str(uuid.uuid4()) if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0: try: t_pool_lock = time.perf_counter() with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): _mark("wait_web_pool_lock_ms", t_pool_lock) t_ensure = time.perf_counter() ensure_web_pool() _mark("ensure_web_pool_ms", t_ensure) t_acquire = time.perf_counter() slot = acquire_web_pool_slot(db) _mark("acquire_web_slot_ms", t_acquire) slot_cid = f"WEBPOOLIDX:{slot}" t_dispatch = time.perf_counter() terminate_active_slot_sessions(db, slot_cid) dispatch_web_pool_target(slot, service, width=client_width, height=client_height) _mark("dispatch_web_target_ms", t_dispatch) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=slot_cid, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) except LockTimeoutError: _emit("web_pool_lock_timeout") raise HTTPException(status_code=503, detail="Пул WEB занят. 
Повторите через несколько секунд.") except Exception as exc: logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("web_pool_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="WEB runtime failed to switch target") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot) audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) _emit("session_created_web_pool", session_id=session_id, slot=slot) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) if service_uses_universal_pool(service): try: t_pool_lock = time.perf_counter() with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS): _mark("wait_universal_pool_lock_ms", t_pool_lock) t_ensure = time.perf_counter() ensure_universal_pool() _mark("ensure_universal_pool_ms", t_ensure) t_acquire = time.perf_counter() slot = acquire_universal_slot(db) _mark("acquire_universal_slot_ms", t_acquire) slot_cid = f"POOLIDX:{slot}" t_dispatch = time.perf_counter() terminate_active_slot_sessions(db, slot_cid) dispatch_universal_target(slot, service, width=client_width, height=client_height) _mark("dispatch_universal_target_ms", t_dispatch) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=slot_cid, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) except LockTimeoutError: _emit("universal_pool_lock_timeout") raise HTTPException(status_code=503, detail="Пул RDP занят. 
Повторите через несколько секунд.") except Exception as exc: logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id) log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("universal_pool_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="Universal runtime failed to switch target") log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot) audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id) _emit("session_created_universal_pool", session_id=session_id, slot=slot) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) if service.type == ServiceType.WEB and desired_pool_size(service) > 0: t_warm = time.perf_counter() ensure_warm_pool(service) open_warm_web_url(service, service.target) _mark("warm_pool_prepare_ms", t_warm) t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=f"POOL:{service.slug}", status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool") audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id) _emit("session_created_warm_pool", session_id=session_id) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) try: t_create = time.perf_counter() container_id = create_runtime_container(service, session_id) _mark("create_runtime_container_ms", t_create) except Exception as exc: logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id) 
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc)) audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id) _emit("single_runtime_create_failed", error=str(exc)) raise HTTPException(status_code=502, detail="Session runtime failed to start") t_commit = time.perf_counter() session_obj = SessionModel( id=session_id, user_id=user.id, service_id=service.id, container_id=container_id, status=SessionStatus.ACTIVE, created_at=now_utc(), last_access_at=now_utc(), ) db.add(session_obj) db.commit() _mark("db_commit_ms", t_commit) log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id) audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id) t_wait = time.perf_counter() ready = wait_for_session_route(session_id) _mark("wait_session_route_ms", t_wait) log_event("session_route_ready", session_id=session_id, ready=ready) _emit("session_created_single_runtime", session_id=session_id, ready=ready) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) except LockTimeoutError: _emit("user_lock_timeout") raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.") @app.get("/svc/{slug}/", response_class=HTMLResponse) def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") return HTMLResponse( content=""" Service Starting
Сервис запускается
Проверка...
""".strip(), status_code=200, ) @app.get("/s/{session_id}/", response_class=HTMLResponse) def session_wait_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: raise HTTPException(status_code=410, detail="Session is not active") service = db.get(Service, sess.service_id) service_title = service.name if service else "Сервис" is_rdp = service and service.type == ServiceType.RDP label = "Ожидайте..." if is_rdp else "Сессия запускается..." redirect_target = session_redirect_url(sess) return HTMLResponse( content=f""" {service_title}
{label}
Проверка...
{session_id}
""".strip(), status_code=200, ) @app.get("/s/{session_id}/view", response_class=HTMLResponse) def session_view_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: raise HTTPException(status_code=410, detail="Session is not active") service = db.get(Service, sess.service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") iframe_src = None if sess.container_id and sess.container_id.startswith("POOL:"): iframe_src = f"/svc/{service.slug}/?sid={session_id}" elif sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"): try: slot = int(sess.container_id.split(":", 1)[1]) iframe_src = f"/w/{slot}/?sid={session_id}" except Exception: iframe_src = None elif sess.container_id and sess.container_id.startswith("POOLIDX:"): try: slot = int(sess.container_id.split(":", 1)[1]) iframe_src = f"/u/{slot}/?sid={session_id}" except Exception: iframe_src = None elif sess.container_id and sess.container_id.startswith("RDPSLOT:"): try: slot = int(sess.container_id.split(":", 1)[1]) iframe_src = f"/rdp/{slot}/?sid={session_id}" except Exception: iframe_src = None if iframe_src: creds_html = "" if service.type != ServiceType.RDP and (service.svc_login or service.svc_password): rows = "" if service.svc_login: login_esc = service.svc_login.replace('"', '"').replace('<', '<') rows += f'''
Логин{login_esc}
''' if service.svc_password: pass_esc = service.svc_password.replace('"', '"').replace('<', '<') rows += f'''
Пароль{pass_esc}
''' if service.svc_cred_hint: hint_esc = service.svc_cred_hint.replace('<', '<') rows += f'''

{hint_esc}

''' creds_html = f'''
{rows}
''' return HTMLResponse( content=f""" {service.name} {creds_html} """.strip() ) return RedirectResponse(url=f"/s/{session_id}/", status_code=303) @app.post("/api/sessions/{session_id}/touch") def touch_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: reason = session_closed_reason(sess, db) log_event( "session_touch_rejected", level=logging.WARNING, session_id=session_id, user_id=user.id, status=sess.status.value, reason=reason, ) return JSONResponse( status_code=410, content={ "ok": False, "reason": reason, "status": sess.status.value, }, ) sess.last_access_at = now_utc() db.commit() return {"ok": True} @app.post("/api/sessions/{session_id}/close") def close_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: log_event( "session_close_already_closed", session_id=session_id, user_id=user.id, status=sess.status.value, reason=session_closed_reason(sess, db), ) return {"ok": True, "status": sess.status.value} terminate_session_record(db, sess, SessionStatus.TERMINATED, stop_container=True) db.commit() log_event("session_closed_by_user", session_id=session_id, user_id=user.id) return {"ok": True, "status": "TERMINATED"} @app.get("/api/services/{slug}/status") def service_status(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)): service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True)) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.VNC: raise HTTPException(status_code=410, detail="VNC services are 
deprecated") if not has_access(db, user.id, service.id): raise HTTPException(status_code=403, detail="ACL denied") pool = get_pool_status_for_service(service) route_ok = route_ready(f"/svc/{slug}/") ready = route_ok and (pool["running"] > 0 if desired_pool_size(service) > 0 else True) steps = [ f"ACL: OK ({user.username})", f"Пул: {pool['running']} / {pool['desired']}", f"Маршрут /svc/{slug}/: {'OK' if route_ok else 'ожидание'}", ] return { "ready": ready, "message": "Готово, открываем..." if ready else "Поднимаем контейнер и маршрут...", "steps": steps, } @app.get("/api/sessions/{session_id}/status") def session_status(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)): sess = db.get(SessionModel, session_id) if not sess or sess.user_id != user.id: raise HTTPException(status_code=404, detail="Session not found") if sess.status != SessionStatus.ACTIVE: raise HTTPException(status_code=410, detail="Session is not active") service = db.get(Service, sess.service_id) pooled_web = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.WEB) web_pool_idx = None universal_pool_idx = None if sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"): try: web_pool_idx = int(sess.container_id.split(":", 1)[1]) except Exception: web_pool_idx = None if sess.container_id and sess.container_id.startswith("POOLIDX:"): try: universal_pool_idx = int(sess.container_id.split(":", 1)[1]) except Exception: universal_pool_idx = None pooled_rdp = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.RDP) rdp_slot_idx = None if sess.container_id and sess.container_id.startswith("RDPSLOT:"): try: rdp_slot_idx = int(sess.container_id.split(":", 1)[1]) except Exception: rdp_slot_idx = None if pooled_web and service: route_path = f"/svc/{service.slug}/" elif pooled_rdp and service: route_path = f"/svc/{service.slug}/" elif rdp_slot_idx is not 
None: route_path = f"/rdp/{rdp_slot_idx}/" else: route_path = f"/s/{session_id}/" if web_pool_idx is not None: route_path = f"/w/{web_pool_idx}/" if universal_pool_idx is not None: route_path = f"/u/{universal_pool_idx}/" route_ok = route_ready(route_path) running = container_running(sess.container_id) ready = running and route_ok steps = [ f"Контейнер: {'running' if running else 'starting'}", f"Маршрут {route_path}: {'OK' if route_ok else 'ожидание'}", ] payload = { "ready": ready, "message": "Готово, открываем..." if ready else "Запуск сессии...", "steps": steps, } if pooled_web or pooled_rdp: payload["redirect_url"] = f"/s/{session_id}/view" if web_pool_idx is not None: payload["redirect_url"] = f"/s/{session_id}/view" if universal_pool_idx is not None: payload["redirect_url"] = f"/s/{session_id}/view" if rdp_slot_idx is not None: payload["redirect_url"] = f"/s/{session_id}/view" return payload @app.post("/api/admin/services") def create_service(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service_type = ServiceType(payload["type"]) if service_type == ServiceType.VNC: raise HTTPException(status_code=400, detail="VNC services are no longer supported") target = payload["target"] if service_type == ServiceType.WEB: target = normalize_web_target(target) elif service_type == ServiceType.RDP: parse_rdp_target(target) service = Service( name=payload["name"], slug=payload["slug"], type=service_type, target=target, comment=payload.get("comment", ""), svc_login=payload.get("svc_login", ""), svc_password=payload.get("svc_password", ""), svc_cred_hint=payload.get("svc_cred_hint", ""), active=payload.get("active", True), warm_pool_size=max(0, int(payload.get("warm_pool_size", 0))), ) db.add(service) db.flush() set_service_categories(db, service.id, payload.get("category_ids", [])) db.commit() if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0: ensure_warm_pool(service) elif 
service_uses_universal_pool(service): ensure_universal_pool() return {"id": service.id} @app.get("/api/admin/services/{service_id}/containers/status") def service_containers_status(service_id: int, _: User = Depends(require_admin), db: Session = Depends(get_db)): service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") out = get_pool_detailed_status(service) out["active_sessions"] = get_active_sessions_count(db, service.id) return out @app.post("/api/admin/services/{service_id}/icon") async def upload_service_icon( service_id: int, request: Request, file: UploadFile = File(...), _: User = Depends(require_admin), db: Session = Depends(get_db), ): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") new_path = await store_service_icon(service, file) old_path = service.icon_path service.icon_path = new_path db.commit() if old_path and old_path != new_path: remove_icon_file(old_path) return {"ok": True, "icon_path": new_path} @app.delete("/api/admin/services/{service_id}/icon") def delete_service_icon(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") old_path = service.icon_path service.icon_path = "" db.commit() remove_icon_file(old_path) return {"ok": True} @app.put("/api/admin/services/{service_id}") def edit_service(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") for key in ["name", "slug", "target", "active", "comment", "svc_login", "svc_password", "svc_cred_hint"]: if key in payload: setattr(service, key, payload[key]) 
if "type" in payload: service.type = ServiceType(payload["type"]) if service.type == ServiceType.VNC: raise HTTPException(status_code=400, detail="VNC services are no longer supported") if service.type == ServiceType.WEB: service.target = normalize_web_target(service.target) elif service.type == ServiceType.RDP: parse_rdp_target(service.target) if "warm_pool_size" in payload: service.warm_pool_size = max(0, int(payload["warm_pool_size"])) if "category_ids" in payload: set_service_categories(db, service.id, payload.get("category_ids", [])) db.commit() if service.type == ServiceType.WEB: if WEB_POOL_SIZE <= 0: ensure_warm_pool(service) open_warm_web_url(service, service.target) elif service_uses_universal_pool(service): ensure_universal_pool() return {"ok": True} @app.delete("/api/admin/services/{service_id}") def delete_service(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0: ensure_warm_pool(service, 0) remove_icon_file(service.icon_path) db.delete(service) db.commit() return {"ok": True} @app.post("/api/admin/services/{service_id}/prewarm") def prewarm_now(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service: raise HTTPException(status_code=404, detail="Service not found") if service.type == ServiceType.WEB: ensure_web_pool() return {"ok": True, "pool": get_web_pool_status()} if service_uses_universal_pool(service): ensure_universal_pool() return {"ok": True, "pool": get_universal_pool_status()} if service.type == ServiceType.RDP: return {"ok": True, "pool": get_pool_status_for_service(service), "message": "RDP запускается on-demand"} ensure_warm_pool(service) return {"ok": True, "pool": 
get_pool_status_for_service(service)} @app.post("/api/admin/services/{service_id}/rdp-slots") def create_rdp_slot(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) service = db.get(Service, service_id) if not service or service.type != ServiceType.RDP: raise HTTPException(status_code=404, detail="RDP service not found") rdp_username = (payload.get("rdp_username") or "").strip() rdp_password = (payload.get("rdp_password") or "").strip() if not rdp_username: raise HTTPException(status_code=400, detail="rdp_username is required") slot = RdpSlot(service_id=service_id, rdp_username=rdp_username, rdp_password=rdp_password) db.add(slot) db.flush() try: container_name = start_rdp_slot_container(slot, service) slot.container_name = container_name except Exception as exc: logger.exception("rdp_slot_container_start_failed service_id=%s", service_id) raise HTTPException(status_code=502, detail=f"Контейнер не запустился: {exc}") db.commit() audit(db, "RDP_SLOT_CREATE", f"service={service.slug} slot={slot.id} user={rdp_username}", user_id=None) return {"ok": True, "slot_id": slot.id, "container_name": slot.container_name} @app.delete("/api/admin/rdp-slots/{slot_id}") def delete_rdp_slot(slot_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) slot = db.get(RdpSlot, slot_id) if not slot: raise HTTPException(status_code=404, detail="Slot not found") container_name = slot.container_name db.delete(slot) db.commit() if container_name: threading.Thread(target=stop_rdp_slot_container, args=(container_name,), daemon=True).start() return {"ok": True} @app.post("/api/admin/categories") def create_category(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) name = (payload.get("name") or "").strip() slug = (payload.get("slug") or "").strip().lower().replace(" ", "-") if 
not name: raise HTTPException(status_code=400, detail="Category name is required") if not slug: raise HTTPException(status_code=400, detail="Category slug is required") exists = db.scalar(select(Category).where((Category.name == name) | (Category.slug == slug))) if exists: raise HTTPException(status_code=409, detail="Category already exists") category = Category(name=name, slug=slug) db.add(category) db.commit() return {"id": category.id} @app.delete("/api/admin/categories/{category_id}") def delete_category(category_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) category = db.get(Category, category_id) if not category: raise HTTPException(status_code=404, detail="Category not found") db.delete(category) db.commit() return {"ok": True} @app.put("/api/admin/web-pool-size") def update_web_pool_size(payload: dict, request: Request, _: User = Depends(require_admin)): validate_csrf(request) global WEB_POOL_SIZE value = max(0, int(payload.get("size", WEB_POOL_SIZE))) WEB_POOL_SIZE = value ensure_web_pool() return {"ok": True, "size": WEB_POOL_SIZE, "pool": get_web_pool_status()} @app.post("/api/admin/users") def create_user(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) expires_at = dt.datetime.fromisoformat(payload["expires_at"]) user = User( username=payload["username"], password_hash=hash_password(payload["password"]), expires_at=expires_at, active=payload.get("active", True), is_admin=payload.get("is_admin", False), first_name=payload.get("first_name", ""), last_name=payload.get("last_name", ""), ) db.add(user) db.commit() return {"id": user.id} @app.put("/api/admin/users/{user_id}") def edit_user(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not 
found") for key in ["username", "active", "is_admin", "first_name", "last_name"]: if key in payload: setattr(user, key, payload[key]) if "password" in payload and payload["password"]: user.password_hash = hash_password(payload["password"]) if "expires_at" in payload: user.expires_at = dt.datetime.fromisoformat(payload["expires_at"]) db.commit() return {"ok": True} @app.delete("/api/admin/users/{user_id}") def delete_user(user_id: int, request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") if user.id == admin.id: raise HTTPException(status_code=400, detail="Cannot delete current admin") db.delete(user) db.commit() return {"ok": True} @app.put("/api/admin/users/{user_id}/acl") def set_acl(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)): validate_csrf(request) user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") service_ids = set(payload.get("service_ids", [])) existing = db.scalars(select(UserServiceAccess).where(UserServiceAccess.user_id == user_id)).all() existing_map = {x.service_id: x for x in existing} for sid in service_ids: if sid not in existing_map: db.add(UserServiceAccess(user_id=user_id, service_id=sid)) for sid, row in existing_map.items(): if sid not in service_ids: db.delete(row) db.commit() return {"ok": True}