Files
Stend_mont/app/main.py
T

1615 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime as dt
import logging
import re
import secrets
import threading
import uuid
import time
import contextvars
from typing import Optional
from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, Request, UploadFile, status
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from markupsafe import Markup, escape
from sqlalchemy import select
from sqlalchemy import delete, select, text, update
from sqlalchemy.orm import Session
from starlette.responses import HTMLResponse as _HR
import urllib.request as _urllib_request
import urllib.parse as _urllib_parse
import json as _json
from config import (
COOKIE_NAME, CSRF_COOKIE, GO_POOL_LOCK_TIMEOUT_SECONDS,
GO_USER_LOCK_TIMEOUT_SECONDS, LOG_LEVEL, LOG_SLOW_REQUEST_MS,
MAX_ACTIVE_SERVICES_PER_USER, PUBLIC_HOST, SESSION_IDLE_SECONDS,
WEB_POOL_BUFFER, WEB_POOL_SIZE,
TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, TELEGRAM_API_URL,
)
from database import get_db
from models import (
AuditLog, Category, RdpSlot, Service, ServiceCategory, ServiceType,
SessionModel, SessionStatus, User, UserServiceAccess,
)
from utils import (
audit, ensure_icons_dir, format_service_comment, log_event, normalize_web_target,
now_utc, parse_rdp_target, remove_icon_file, request_id_ctx, set_service_categories,
session_closed_reason, store_service_icon,
)
from auth import (
get_current_user, has_access, issue_auth_cookie, issue_csrf_cookie,
require_admin, require_user, user_is_valid, validate_csrf, verify_password, hash_password,
)
from runtime import (
acquire_universal_slot, acquire_web_pool_slot, allocator_lock,
container_running, create_runtime_container, desired_pool_size,
dispatch_universal_target, dispatch_web_pool_target,
docker_client, ensure_universal_pool, ensure_warm_pool, ensure_web_pool,
find_active_session_for_service, find_active_session_for_user_service,
get_active_sessions_count, get_pool_detailed_status, get_pool_status_for_service,
get_universal_pool_status, get_web_pool_status, LockTimeoutError, open_warm_web_url,
_rdp_slot_container_name, route_ready, sanitize_client_resolution,
service_uses_universal_pool, session_redirect_url,
connect_rdp_slot, start_rdp_slot_container, stop_rdp_slot_container,
stop_runtime_container, terminate_active_slot_sessions,
terminate_session_record, wait_for_session_route,
)
from maintenance import on_startup
# Configure the root logger once at import time; the level comes from config.LOG_LEVEL.
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# Module-wide logger named "portal"; used for exception traces in the handlers below.
logger = logging.getLogger("portal")
# Jinja2 template loader rooted at the relative "templates" directory.
templates = Jinja2Templates(directory="templates")
def _get_real_ip(request) -> str:
    """Return the client IP, preferring the first ``X-Forwarded-For`` entry.

    The header value is only trustworthy when every hop in front of the
    application is a trusted proxy (the deployment note for this function
    says Traefik trusts NPM via trustedIPs); it is otherwise client-supplied.

    Args:
        request: Object exposing ``headers`` (mapping) and ``client``
            (object with ``host`` or ``None``), e.g. a Starlette request.

    Returns:
        The first forwarded hop if it is non-blank, else the socket peer
        address, else the literal ``"unknown"``.
    """
    forwarded_for = request.headers.get("x-forwarded-for", "")
    first_hop = forwarded_for.split(",")[0].strip()
    # A blank first element (e.g. "  " or ", 1.2.3.4") is malformed; fall back
    # to the peer address instead of returning an empty string.
    if first_hop:
        return first_hop
    return request.client.host if request.client else "unknown"
def _get_geo(ip: str) -> str:
    """Look up "country, region, city" for *ip* via ip-api.com.

    Best-effort helper: any problem yields an empty string rather than an
    exception.

    Args:
        ip: Textual IPv4/IPv6 address. It may originate from a
            client-controlled header, so it is validated before being
            placed into the request URL.

    Returns:
        A comma-separated location string, or ``""`` when *ip* is not a
        valid globally routable address or the lookup fails.
    """
    import ipaddress  # function-scope import, as done elsewhere in this module

    try:
        addr = ipaddress.ip_address(ip.strip())
    except (AttributeError, ValueError):
        # "unknown", non-string input or arbitrary header garbage.
        return ""
    # Covers loopback, RFC 1918 (including 172.16/12), link-local and other
    # non-public ranges for both IPv4 and IPv6.
    if not addr.is_global:
        return ""
    url = f"http://ip-api.com/json/{addr}?lang=ru&fields=status,country,regionName,city,query"
    try:
        req = _urllib_request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with _urllib_request.urlopen(req, timeout=5) as resp:
            data = _json.loads(resp.read())
    except Exception:
        # Geo data is optional decoration; record the failure and carry on.
        logger.debug("geo lookup failed for %s", addr, exc_info=True)
        return ""
    if isinstance(data, dict) and data.get("status") == "success":
        parts = [data.get("country", ""), data.get("regionName", ""), data.get("city", "")]
        return ", ".join(p for p in parts if p)
    return ""
# ASGI application object; the routes and middlewares below register on it.
app = FastAPI(title="MONT - инфрастуктурный полигон")
# Serve files from the relative "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
    """Log every HTTP request with a correlation id, status and duration.

    The request id is taken from the incoming ``X-Request-ID`` header or
    generated (first 8 characters of a UUID4), stored in ``request_id_ctx``
    for the duration of the request and echoed back in the response header.

    Emits a ``request`` event (INFO, WARNING for 4xx, ERROR for 5xx), a
    ``slow_request`` event when the duration reaches ``LOG_SLOW_REQUEST_MS``
    and a ``request_failed`` event when the downstream handler raises (the
    exception is re-raised).
    """
    # `or` also replaces a present-but-empty header with a generated id.
    req_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())[:8]
    token = request_id_ctx.set(req_id)
    # Monotonic clock: durations must not jump with wall-clock adjustments.
    started = time.perf_counter()
    client_ip = request.client.host if request.client else "-"
    user_agent = request.headers.get("user-agent", "-")
    try:
        try:
            response = await call_next(request)
        except Exception:
            log_event(
                "request_failed",
                level=logging.ERROR,
                method=request.method,
                path=request.url.path,
                client_ip=client_ip,
                user_agent=user_agent,
            )
            raise

        duration_ms = int((time.perf_counter() - started) * 1000)
        if response.status_code >= 500:
            level = logging.ERROR
        elif response.status_code >= 400:
            level = logging.WARNING
        else:
            level = logging.INFO
        log_event(
            "request",
            level=level,
            method=request.method,
            path=request.url.path,
            query=str(request.url.query or ""),
            status=response.status_code,
            duration_ms=duration_ms,
            client_ip=client_ip,
            user_agent=user_agent,
        )
        if duration_ms >= LOG_SLOW_REQUEST_MS:
            log_event(
                "slow_request",
                level=logging.WARNING,
                method=request.method,
                path=request.url.path,
                duration_ms=duration_ms,
                threshold_ms=LOG_SLOW_REQUEST_MS,
            )
        response.headers["X-Request-ID"] = req_id
        return response
    finally:
        # Single reset point: runs on success, on handler failure and also
        # if logging itself raises, so the context never leaks.
        request_id_ctx.reset(token)
# Case-insensitive pattern matching common mobile/tablet User-Agent markers.
_MOBILE_UA_RE = re.compile(
r"(Mobile|Android|iPhone|iPad|iPod|BlackBerry|IEMobile|Opera Mini|webOS)",
re.IGNORECASE,
)
# Self-contained HTML page (inline CSS, Russian text telling the visitor the
# resource is available from a PC only). Built from adjacent string literals,
# so the final value is one string without line breaks between fragments.
_MOBILE_PAGE = (
"<!doctype html>"
'<html lang="ru">'
"<head>"
'<meta charset="utf-8"/>'
'<meta name="viewport" content="width=device-width,initial-scale=1"/>'
"<title>MONT - инфрастуктурный полигон</title>"
"<style>"
"*{box-sizing:border-box;margin:0;padding:0}"
"body{min-height:100dvh;display:flex;flex-direction:column;align-items:center;"
"justify-content:center;background:linear-gradient(160deg,#0a2a4a 0%,#1565a0 60%,#1e88c8 100%);"
"font-family:sans-serif;color:#fff;padding:2rem 1.5rem;text-align:center}"
".logo{width:120px;margin-bottom:2rem}"
"h1{font-size:1.3rem;font-weight:700;margin-bottom:1rem;line-height:1.35}"
"p{font-size:0.95rem;color:rgba(255,255,255,.75);line-height:1.5;max-width:280px}"
".icon{font-size:3.5rem;margin-bottom:1.2rem}"
"</style>"
"</head>"
"<body>"
# The logo is loaded from /static/.
'<img class="logo" src="/static/logo.png" alt="MONT"/>'
'<div class="icon">&#128421;</div>'
"<h1>Ресурс доступен<br>только с ПК</h1>"
"<p>Пожалуйста, откройте эту страницу на компьютере или ноутбуке.</p>"
"</body>"
"</html>"
)
@app.middleware("http")
async def mobile_block_middleware(request: Request, call_next):
    """Answer mobile User-Agents with the static "PC only" page.

    Requests under ``/static/`` are always passed through. Every other
    request whose User-Agent matches ``_MOBILE_UA_RE`` receives
    ``_MOBILE_PAGE`` with status 200 and never reaches the route handlers.
    """
    is_static_asset = request.url.path.startswith("/static/")
    user_agent = request.headers.get("user-agent", "")
    if not is_static_asset and _MOBILE_UA_RE.search(user_agent):
        return _HR(content=_MOBILE_PAGE, status_code=200)
    return await call_next(request)
@app.on_event("startup")
def startup_event():
    """Invoke the ``on_startup`` hook from the maintenance module at application start."""
    on_startup()
@app.get("/", response_class=HTMLResponse)
def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)):
    """Render the login page for anonymous visitors, otherwise the dashboard.

    Query parameters:
        session_closed: ``idle`` or ``limit`` selects a notice about a closed session.
        launch_error: ``max_services`` selects a notice about the service limit
            (only consulted when ``session_closed`` did not match).
        category: optional category slug used to filter the dashboard services.
    """
    params = request.query_params
    session_closed = (params.get("session_closed") or "").strip().lower()
    launch_error = (params.get("launch_error") or "").strip().lower()

    if session_closed == "idle":
        session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново."
    elif session_closed == "limit":
        session_notice = (
            f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
            "Освободите один сервис и попробуйте снова."
        )
    elif launch_error == "max_services":
        session_notice = (
            f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
            "Освободите один сервис и попробуйте снова."
        )
    else:
        session_notice = ""

    if not user:
        # Anonymous visitor: reuse the existing CSRF cookie or mint a new token.
        csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
        login_context = {
            "request": request,
            "csrf_token": csrf,
            "login_error": "",
            "session_notice": session_notice,
        }
        response = templates.TemplateResponse("login.html", login_context)
        response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/")
        return response

    # Active WEB/RDP services the user has an ACL entry for, ordered by name.
    accessible_query = (
        select(Service)
        .join(UserServiceAccess, UserServiceAccess.service_id == Service.id)
        .where(
            UserServiceAccess.user_id == user.id,
            Service.active == True,
            Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
        )
        .order_by(Service.name)
    )
    services = db.scalars(accessible_query).all()

    service_categories = {svc.id: [] for svc in services}
    category_map = {}
    if services:
        category_query = (
            select(ServiceCategory.service_id, Category.id, Category.name, Category.slug)
            .join(Category, Category.id == ServiceCategory.category_id)
            .where(ServiceCategory.service_id.in_([svc.id for svc in services]))
            .order_by(Category.name)
        )
        for service_id, category_id, category_name, category_slug in db.execute(category_query).all():
            info = {"id": category_id, "name": category_name, "slug": category_slug}
            service_categories.setdefault(service_id, []).append(info)
            if category_id not in category_map:
                # Separate dict so the sidebar entry is not shared with the per-service one.
                category_map[category_id] = dict(info)
    categories = sorted(category_map.values(), key=lambda entry: entry["name"].lower())

    selected_category_slug = (params.get("category") or "").strip().lower()
    if selected_category_slug:
        services = [
            svc
            for svc in services
            if any(cat["slug"] == selected_category_slug for cat in service_categories.get(svc.id, []))
        ]

    context = {
        "request": request,
        "user": user,
        "services": services,
        "categories": categories,
        "selected_category_slug": selected_category_slug,
        "service_categories": service_categories,
        "service_comment_html": {svc.id: format_service_comment(svc.comment) for svc in services},
        "csrf_token": request.cookies.get(CSRF_COOKIE, ""),
        "session_notice": session_notice,
    }
    return templates.TemplateResponse("dashboard.html", context)
def _admin_rdp_slot_overview(db: Session, rdp_services, users_by_id: dict) -> dict[int, list]:
    """Collect, per RDP service id, the slot rows shown on the admin page.

    Each row carries the slot id, its RDP username, container name, whether
    that container is currently running and the portal username occupying it.
    """
    overview: dict[int, list] = {}
    for svc in rdp_services:
        slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == svc.id).order_by(RdpSlot.id)).all()
        slot_list = []
        for slot in slots:
            active_sess = db.scalar(
                select(SessionModel).where(
                    SessionModel.container_id == f"RDPSLOT:{slot.id}",
                    SessionModel.status == SessionStatus.ACTIVE,
                )
            )
            running = False
            if slot.container_name:
                try:
                    container = docker_client().containers.get(slot.container_name)
                    running = container.status == "running"
                except Exception:
                    # Best effort: the page must render even when the Docker
                    # lookup fails; the slot is then reported as not running.
                    logger.debug(
                        "rdp slot container lookup failed name=%s",
                        slot.container_name,
                        exc_info=True,
                    )
            occupied_username = None
            if active_sess:
                # All users were already loaded by the caller; no per-slot query.
                owner = users_by_id.get(active_sess.user_id)
                occupied_username = owner.username if owner else f"id={active_sess.user_id}"
            slot_list.append({
                "id": slot.id,
                "rdp_username": slot.rdp_username,
                "container_name": slot.container_name or "",
                "running": running,
                "occupied_username": occupied_username,
            })
        overview[svc.id] = slot_list
    return overview


@app.get("/admin", response_class=HTMLResponse)
def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Render the admin dashboard (requires the ``require_admin`` dependency).

    Gathers users, categories, WEB/RDP services, ACLs, pool and health
    status, recent/online sessions, open statistics and RDP slot state and
    passes them to the ``admin.html`` template.
    """
    users = db.scalars(select(User).order_by(User.id)).all()
    categories = db.scalars(select(Category).order_by(Category.name)).all()
    services = db.scalars(
        select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)
    ).all()
    web_services = [s for s in services if s.type == ServiceType.WEB]
    rdp_services = [s for s in services if s.type == ServiceType.RDP]

    # service id -> list of category ids
    service_category_map = {s.id: [] for s in services}
    if services:
        service_rows = db.execute(
            select(ServiceCategory.service_id, ServiceCategory.category_id).where(
                ServiceCategory.service_id.in_([s.id for s in services])
            )
        ).all()
        for service_id, category_id in service_rows:
            service_category_map.setdefault(service_id, []).append(category_id)

    # user id -> sorted list of service ids the user may open
    acl = {}
    for row in db.scalars(select(UserServiceAccess)).all():
        acl.setdefault(row.user_id, []).append(row.service_id)
    for user_id in acl:
        acl[user_id] = sorted(acl[user_id])

    pool_status = {s.id: get_pool_status_for_service(s) for s in services}
    service_health = {}
    for sid, st in pool_status.items():
        service_health[sid] = {
            "health": st["health"],
            "running": st["running"],
            "desired": st["desired"],
            "active_sessions": get_active_sessions_count(db, sid),
        }
    web_pool = get_web_pool_status()
    web_totals = {
        "services": len(web_services),
        "running": web_pool["running"],
        "desired": web_pool["desired"],
        "active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services),
    }

    # NOTE: the SQL literals below are kept flush-left so their text is unchanged.
    recent_sessions = db.execute(
        text(
            """
SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
s.status, s.created_at, s.last_access_at
FROM sessions s
JOIN users u ON u.id = s.user_id
JOIN services sv ON sv.id = s.service_id
WHERE sv.type IN ('WEB','RDP')
ORDER BY s.created_at DESC
LIMIT 200
"""
        )
    ).mappings().all()
    open_stats = db.execute(
        text(
            """
SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens
FROM sessions s
JOIN users u ON u.id = s.user_id
JOIN services sv ON sv.id = s.service_id
WHERE sv.type IN ('WEB','RDP')
GROUP BY u.username, sv.name, sv.slug
ORDER BY opens DESC, u.username ASC
LIMIT 200
"""
        )
    ).mappings().all()
    # Sessions untouched for longer than SESSION_IDLE_SECONDS are not "online".
    cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
    online_sessions = db.execute(
        text(
            """
SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
sv.type AS service_type, s.container_id, s.created_at, s.last_access_at
FROM sessions s
JOIN users u ON u.id = s.user_id
JOIN services sv ON sv.id = s.service_id
WHERE s.status = 'ACTIVE'
AND s.last_access_at >= :cutoff
AND sv.type IN ('WEB','RDP')
ORDER BY s.last_access_at DESC, s.created_at DESC
LIMIT 500
"""
        ),
        {"cutoff": cutoff},
    ).mappings().all()

    rdp_slots = _admin_rdp_slot_overview(db, rdp_services, {u.id: u for u in users})

    return templates.TemplateResponse(
        "admin.html",
        {
            "request": request,
            "admin": admin,
            "users": users,
            "web_services": web_services,
            "rdp_services": rdp_services,
            "services": services,
            "categories": categories,
            "service_category_map": service_category_map,
            "acl": acl,
            "pool_status": pool_status,
            "service_health": service_health,
            "web_totals": web_totals,
            "web_pool_size": WEB_POOL_SIZE,
            "web_pool_buffer": WEB_POOL_BUFFER,
            "recent_sessions": recent_sessions,
            "open_stats": open_stats,
            "online_sessions": online_sessions,
            "csrf_token": request.cookies.get(CSRF_COOKIE, ""),
            "max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER,
            "rdp_slots": rdp_slots,
        },
    )
@app.get("/yandex_b847b9b35f967fcc.html", include_in_schema=False)
def yandex_verify():
    """Serve the fixed verification page (hidden from the OpenAPI schema)."""
    # Literal kept flush-left so the served markup is unchanged.
    verification_html = '''<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
</head>
<body>Verification: b847b9b35f967fcc</body>
</html>'''
    return HTMLResponse(verification_html)
@app.get("/privacy", include_in_schema=False)
def privacy_page():
    """Permanently (301) redirect to the agreement page on mont.ru."""
    target = "https://www.mont.ru/ru-ru/agreement"
    return RedirectResponse(url=target, status_code=301)
@app.get("/robots.txt", include_in_schema=False)
def robots_txt():
    """Serve ``static/robots.txt`` as ``text/plain`` from the site root."""
    # FileResponse is not imported at module level in the visible imports.
    from fastapi.responses import FileResponse

    robots_path = "static/robots.txt"
    return FileResponse(robots_path, media_type="text/plain")
@app.get("/sitemap.xml", include_in_schema=False)
def sitemap_xml():
    """Serve ``static/sitemap.xml`` as ``application/xml`` from the site root."""
    # FileResponse is not imported at module level in the visible imports.
    from fastapi.responses import FileResponse

    sitemap_path = "static/sitemap.xml"
    return FileResponse(sitemap_path, media_type="application/xml")
@app.get("/api/public/services-by-category")
def public_services_by_category(db: Session = Depends(get_db)):
    """Return active services grouped by category name.

    The result maps a category name to a list of ``{"id", "name"}`` entries.
    Services without any category are listed under "Без категории"; that key
    is omitted when it would be empty. The route declares no auth dependency.
    """
    uncategorized = "Без категории"

    active_services = db.scalars(
        select(Service).where(Service.active == True).order_by(Service.name)
    ).all()
    category_names = {
        category.id: category.name
        for category in db.scalars(select(Category).order_by(Category.name)).all()
    }

    # service id -> names of the categories it is linked to
    names_by_service: dict[int, list[str]] = {}
    for link in db.scalars(select(ServiceCategory)).all():
        names_by_service.setdefault(link.service_id, []).append(
            category_names.get(link.category_id, "")
        )

    grouped: dict[str, list[dict]] = {uncategorized: []}
    for svc in active_services:
        entry = {"id": svc.id, "name": svc.name}
        # A service in several categories appears once under each of them.
        for bucket in names_by_service.get(svc.id) or [uncategorized]:
            grouped.setdefault(bucket, []).append(entry)

    if not grouped[uncategorized]:
        del grouped[uncategorized]
    return grouped
# Shared validation patterns and helpers for the two public Telegram-backed forms.
_FORM_EMAIL_RE = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")
_FORM_PHONE_RE = re.compile(r"^[\+\d][\d\s\-\(\)]{6,18}$")
_TG_DIVIDER = "━━━━━━━━━━━━━━━━━━━━━━"


def _tg_escape(value) -> str:
    """Backslash-escape the characters legacy Telegram Markdown treats as markup.

    Without this, user input such as ``john_doe@example.com`` opens an
    unterminated italic entity and the Bot API rejects the whole message.
    """
    return re.sub(r"([_*`\[])", r"\\\1", str(value))


async def _read_json_object(request: Request) -> dict:
    """Parse the request body and require a JSON object; raise 400 otherwise."""
    try:
        data = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON")
    if not isinstance(data, dict):
        # A JSON array/scalar would otherwise crash on ``data.get`` with a 500.
        raise HTTPException(status_code=400, detail="Invalid JSON")
    return data


def _validate_email_and_phone(email: str, phone: str) -> None:
    """Raise 422 when the email or phone does not match the form patterns."""
    if not _FORM_EMAIL_RE.match(email):
        raise HTTPException(status_code=422, detail="Некорректный email")
    if not _FORM_PHONE_RE.match(phone):
        raise HTTPException(status_code=422, detail="Некорректный номер телефона")


def _tg_location_suffix(ip: str) -> str:
    """Build the location/IP trailer appended to a Telegram message (blocking)."""
    suffix = ""
    geo = _get_geo(ip)
    if geo:
        suffix += "\n📍 *Местоположение:* " + _tg_escape(geo)
    suffix += "\n🖥 *IP:* " + _tg_escape(ip)
    return suffix


def _tg_send(message: str) -> None:
    """POST *message* to the configured Telegram chat (blocking; raises on failure)."""
    payload = _json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message,
        "parse_mode": "Markdown",
    }).encode()
    url = f"{TELEGRAM_API_URL}{TELEGRAM_BOT_TOKEN}/sendMessage"
    req = _urllib_request.Request(url, data=payload, headers={"Content-Type": "application/json"})
    with _urllib_request.urlopen(req, timeout=10) as resp:
        resp.read()


async def _tg_notify(request: Request, message: str, error_detail: str) -> dict:
    """Append client location to *message* and deliver it to Telegram.

    Returns ``{"ok": True}`` on success or when Telegram is not configured;
    raises ``HTTPException(502, error_detail)`` when delivery fails.
    """
    import asyncio  # function-scope import, as done elsewhere in this module

    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        # Nothing to send, so skip the (slow) geo lookup entirely.
        log_event("telegram_not_configured")
        return {"ok": True}
    ip = _get_real_ip(request)
    try:
        # Both calls use blocking urllib I/O; run them in a worker thread so
        # the event loop keeps serving other requests.
        message += await asyncio.to_thread(_tg_location_suffix, ip)
        await asyncio.to_thread(_tg_send, message)
    except Exception as e:
        log_event("telegram_send_error", error=str(e))
        raise HTTPException(status_code=502, detail=error_detail)
    return {"ok": True}


@app.post("/api/request-access")
async def request_access(request: Request, db: Session = Depends(get_db)):
    """Accept a public "request access" form and forward it to Telegram.

    Expects a JSON object with ``name``, ``company``, ``email``, ``phone``
    (required), ``manager`` and ``products`` (optional list of strings).

    Raises:
        HTTPException: 400 for a non-object/invalid JSON body, 422 for
            missing or malformed fields, 502 when Telegram delivery fails.
    """
    data = await _read_json_object(request)
    name = str(data.get("name", "")).strip()
    company = str(data.get("company", "")).strip()
    email = str(data.get("email", "")).strip()
    phone = str(data.get("phone", "")).strip()
    manager = str(data.get("manager", "")).strip()

    products = data.get("products") or []
    if isinstance(products, str):
        # A bare string would otherwise be iterated character by character.
        products = [products]
    elif not isinstance(products, (list, tuple)):
        products = []

    if not name or not company or not email or not phone:
        raise HTTPException(status_code=422, detail="Заполните все обязательные поля")
    _validate_email_and_phone(email, phone)

    products_text = ""
    if products:
        items = "\n".join(_tg_escape(p) for p in products)
        products_text = f"\n\n🖥 *Интересующие продукты:*\n{items}"
    manager_text = f"\n🤝 *Менеджер MONT:* {_tg_escape(manager)}" if manager else ""
    message = (
        f"🔔 *Новый запрос доступа к полигону MONT*\n"
        f"{_TG_DIVIDER}\n\n"
        f"👤 *Имя:* {_tg_escape(name)}\n"
        f"🏢 *Компания:* {_tg_escape(company)}\n"
        f"📧 *Email:* {_tg_escape(email)}\n"
        f"📱 *Телефон:* {_tg_escape(phone)}"
        f"{manager_text}"
        f"{products_text}"
    )
    log_event(
        "ip_headers",
        xff=request.headers.get("x-forwarded-for", ""),
        xri=request.headers.get("x-real-ip", ""),
        client=str(request.client.host if request.client else ""),
    )
    return await _tg_notify(request, message, "Ошибка отправки запроса")


@app.post("/api/contact")
async def contact_ruslan(request: Request):
    """Accept a public contact form and forward it to Telegram.

    Expects a JSON object with ``name``, ``email``, ``phone`` and ``text``
    (all required).

    Raises:
        HTTPException: 400 for a non-object/invalid JSON body, 422 for
            missing or malformed fields, 502 when Telegram delivery fails.
    """
    data = await _read_json_object(request)
    name = str(data.get("name", "")).strip()
    email = str(data.get("email", "")).strip()
    phone = str(data.get("phone", "")).strip()
    body = str(data.get("text", "")).strip()
    if not name or not email or not phone or not body:
        raise HTTPException(status_code=422, detail="Заполните все обязательные поля")
    _validate_email_and_phone(email, phone)

    message = (
        f"🔔 *Сообщение через форму полигона*\n"
        f"{_TG_DIVIDER}\n\n"
        f"👤 *Имя:* {_tg_escape(name)}\n"
        f"📧 *Email:* {_tg_escape(email)}\n"
        f"📱 *Телефон:* {_tg_escape(phone)}\n\n"
        f"💬 *Сообщение:*\n{_tg_escape(body)}"
    )
    return await _tg_notify(request, message, "Ошибка отправки")
def _render_login_error(request: Request, message: str, status_code: int):
    """Re-render ``login.html`` with *message* and refresh the CSRF cookie."""
    csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
    response = templates.TemplateResponse(
        "login.html",
        {
            "request": request,
            "csrf_token": csrf,
            "login_error": message,
            # Always supplied so every login.html render gets the same context keys.
            "session_notice": "",
        },
        status_code=status_code,
    )
    response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="lax", path="/")
    return response


@app.post("/login")
def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    csrf_token: str = Form(...),
    db: Session = Depends(get_db),
):
    """Authenticate a user from the login form.

    Returns a 303 redirect to ``/`` with fresh auth and CSRF cookies on
    success, or the login page with status 401 (bad credentials) or 403
    (account not valid per ``user_is_valid``).

    Raises:
        HTTPException: 403 when the form CSRF token does not match the cookie.
    """
    cookie_csrf = request.cookies.get(CSRF_COOKIE)
    # Constant-time comparison; operands are encoded because compare_digest
    # rejects non-ASCII str arguments.
    if not cookie_csrf or not secrets.compare_digest(
        csrf_token.encode("utf-8"), cookie_csrf.encode("utf-8")
    ):
        raise HTTPException(status_code=403, detail="CSRF failed")

    user = db.scalar(select(User).where(User.username == username))
    if not user or not verify_password(password, user.password_hash):
        return _render_login_error(request, "Неверный логин или пароль", 401)
    if not user_is_valid(user):
        return _render_login_error(
            request,
            "Доступ к сервису приостановлен, обратитесь к вашему менеджеру",
            403,
        )

    response = RedirectResponse(url="/", status_code=303)
    issue_auth_cookie(response, user)
    issue_csrf_cookie(response)
    audit(db, "LOGIN", f"login success: {username}", user_id=user.id)
    return response
@app.post("/logout")
def logout(request: Request):
    """Delete the auth and CSRF cookies and redirect (303) to the landing page."""
    response = RedirectResponse(url="/", status_code=303)
    for cookie_name in (COOKIE_NAME, CSRF_COOKIE):
        response.delete_cookie(cookie_name, path="/")
    return response
@app.get("/go/{slug}")
def go_service(
    slug: str,
    sw: Optional[int] = Query(default=None, ge=320, le=7680),
    sh: Optional[int] = Query(default=None, ge=240, le=4320),
    user: User = Depends(require_user),
    db: Session = Depends(get_db),
):
    """Open (or reuse) a session for service *slug* and redirect the user to it.

    Flow, all under a per-user allocator lock:
      1. Reuse the user's existing active session for this service, if any.
      2. Enforce MAX_ACTIVE_SERVICES_PER_USER by rotating out the oldest session.
      3. Allocate a runtime depending on the service: RDP slot, legacy exclusive
         RDP, shared WEB pool, universal pool, per-service warm pool, or a
         dedicated runtime container.

    Args:
        slug: Service slug from the URL.
        sw, sh: Optional client screen width/height hints (validated ranges),
            passed through ``sanitize_client_resolution``.

    Raises:
        HTTPException: 404 unknown/inactive service, 410 VNC service, 403 no
            ACL entry, 429 user lock timeout, 503 pool busy / no free slot,
            502 runtime failed to start or switch target.
    """
    total_started = time.perf_counter()
    # phase name -> elapsed milliseconds, reported by _emit
    phase_ms = {}
    def _mark(name: str, started: float) -> None:
        # Record how long the phase that began at `started` took.
        phase_ms[name] = int((time.perf_counter() - started) * 1000)
    def _emit(result: str, **extra) -> None:
        # Log one "go_service_timing" event with the outcome, total time,
        # all recorded phases and any extra fields.
        payload = {
            "user_id": user.id,
            "service_slug": slug,
            "result": result,
            "total_ms": int((time.perf_counter() - total_started) * 1000),
        }
        payload.update(phase_ms)
        payload.update(extra)
        log_event("go_service_timing", **payload)
    log_event("session_open_requested", user_id=user.id, service_slug=slug, sw=sw, sh=sh)
    service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")
    if service.type == ServiceType.VNC:
        raise HTTPException(status_code=410, detail="VNC services are deprecated")
    if not has_access(db, user.id, service.id):
        raise HTTPException(status_code=403, detail="ACL denied")
    client_width, client_height = sanitize_client_resolution(sw, sh)
    log_event(
        "session_open_resolution",
        user_id=user.id,
        service_slug=slug,
        sw=sw,
        sh=sh,
        client_width=client_width,
        client_height=client_height,
    )
    user_lock_started = time.perf_counter()
    try:
        # Lock key is derived from the user id, so launches by the same user are
        # serialized; a timeout surfaces as LockTimeoutError (handled at the bottom).
        with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS):
            _mark("wait_user_lock_ms", user_lock_started)
            t_existing = time.perf_counter()
            existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
            _mark("check_existing_ms", t_existing)
            if existing_user_session:
                _emit("reuse_session", session_id=existing_user_session.id)
                # For slot-backed RDP sessions, trigger connect_rdp_slot in a background
                # thread; any failure here is ignored and the redirect still happens.
                if existing_user_session.container_id and existing_user_session.container_id.startswith("RDPSLOT:"):
                    try:
                        _rdp_slot_id = int(existing_user_session.container_id.split(":", 1)[1])
                        threading.Thread(target=connect_rdp_slot, args=(_rdp_slot_id,), daemon=True).start()
                    except Exception:
                        pass
                return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
            # --- per-user limit on distinct active services ---
            t_limit = time.perf_counter()
            cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
            active_rows = db.scalars(
                select(SessionModel).where(
                    SessionModel.user_id == user.id,
                    SessionModel.status == SessionStatus.ACTIVE,
                    SessionModel.last_access_at >= cutoff,
                )
            ).all()
            # Oldest first, so the rotation below closes the earliest-created session.
            active_rows = sorted(active_rows, key=lambda row: row.created_at)
            active_service_ids = {row.service_id for row in active_rows}
            _mark("check_limit_ms", t_limit)
            if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
                oldest = next((row for row in active_rows if row.service_id != service.id), None)
                if oldest:
                    t_rotate = time.perf_counter()
                    terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True)
                    db.commit()
                    _mark("rotate_oldest_ms", t_rotate)
                    log_event(
                        "session_rotated",
                        user_id=user.id,
                        closed_session_id=oldest.id,
                        closed_service_id=oldest.service_id,
                        new_service_id=service.id,
                    )
                else:
                    # Only reachable when there are no active rows at all, i.e. the
                    # configured limit is zero or negative.
                    _emit("max_services_redirect")
                    return RedirectResponse(url="/?launch_error=max_services", status_code=303)
            # --- RDP services ---
            if service.type == ServiceType.RDP:
                t_rdp_slots = time.perf_counter()
                slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == service.id)).all()
                _mark("check_rdp_slots_ms", t_rdp_slots)
                if slots:
                    session_id = str(uuid.uuid4())
                    try:
                        # Fixed lock key 91003: slot selection is serialized across users.
                        with allocator_lock(db, 91003, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
                            # Slot ids referenced by ACTIVE sessions of this service.
                            busy_slot_ids: set[int] = set()
                            for row in db.scalars(
                                select(SessionModel).where(
                                    SessionModel.status == SessionStatus.ACTIVE,
                                    SessionModel.service_id == service.id,
                                    SessionModel.container_id.like("RDPSLOT:%"),
                                )
                            ).all():
                                try:
                                    busy_slot_ids.add(int(row.container_id.split(":", 1)[1]))
                                except Exception:
                                    # Malformed container_id: skip the row.
                                    pass
                            free_slot = next((s for s in slots if s.id not in busy_slot_ids), None)
                            if not free_slot:
                                _emit("rdp_all_slots_busy")
                                raise HTTPException(
                                    status_code=503,
                                    detail="Все слоты этого RDP сервиса заняты. Попробуйте позже.",
                                )
                            session_obj = SessionModel(
                                id=session_id,
                                user_id=user.id,
                                service_id=service.id,
                                container_id=f"RDPSLOT:{free_slot.id}",
                                status=SessionStatus.ACTIVE,
                                created_at=now_utc(),
                                last_access_at=now_utc(),
                            )
                            db.add(session_obj)
                            db.commit()
                    except LockTimeoutError:
                        _emit("rdp_slot_lock_timeout")
                        raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.")
                    log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="rdp_slot", slot_id=free_slot.id)
                    audit(db, "SESSION_CREATE_RDP_SLOT", f"service={service.slug} session={session_id} slot={free_slot.id}", user_id=user.id)
                    _emit("session_created_rdp_slot", session_id=session_id, slot_id=free_slot.id)
                    # The slot connection is started in the background; the redirect does not wait for it.
                    threading.Thread(target=connect_rdp_slot, args=(free_slot.id,), daemon=True).start()
                    return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
                else:
                    # Legacy: no slots configured — exclusive single-session behaviour
                    active_owner = find_active_session_for_service(db, service.id)
                    if active_owner:
                        if active_owner.user_id != user.id:
                            _emit("rdp_busy_legacy")
                            raise HTTPException(status_code=503, detail="RDP сервис занят. Попробуйте позже.")
                        _emit("reuse_rdp_session", session_id=active_owner.id)
                        return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
                    # No active owner: fall through to the generic allocation below.
            session_id = str(uuid.uuid4())
            # --- shared WEB pool ---
            if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
                try:
                    t_pool_lock = time.perf_counter()
                    # Fixed lock key 91001 guards WEB pool slot allocation.
                    with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
                        _mark("wait_web_pool_lock_ms", t_pool_lock)
                        t_ensure = time.perf_counter()
                        ensure_web_pool()
                        _mark("ensure_web_pool_ms", t_ensure)
                        t_acquire = time.perf_counter()
                        slot = acquire_web_pool_slot(db)
                        _mark("acquire_web_slot_ms", t_acquire)
                        slot_cid = f"WEBPOOLIDX:{slot}"
                        t_dispatch = time.perf_counter()
                        # Close sessions still recorded on this slot before retargeting it.
                        terminate_active_slot_sessions(db, slot_cid)
                        dispatch_web_pool_target(slot, service, width=client_width, height=client_height)
                        _mark("dispatch_web_target_ms", t_dispatch)
                        t_commit = time.perf_counter()
                        session_obj = SessionModel(
                            id=session_id,
                            user_id=user.id,
                            service_id=service.id,
                            container_id=slot_cid,
                            status=SessionStatus.ACTIVE,
                            created_at=now_utc(),
                            last_access_at=now_utc(),
                        )
                        db.add(session_obj)
                        db.commit()
                        _mark("db_commit_ms", t_commit)
                except LockTimeoutError:
                    _emit("web_pool_lock_timeout")
                    raise HTTPException(status_code=503, detail="Пул WEB занят. Повторите через несколько секунд.")
                except Exception as exc:
                    logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
                    log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc))
                    audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
                    _emit("web_pool_create_failed", error=str(exc))
                    raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
                log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot)
                audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
                _emit("session_created_web_pool", session_id=session_id, slot=slot)
                return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
            # --- universal pool (same steps as the WEB pool, lock key 91002) ---
            if service_uses_universal_pool(service):
                try:
                    t_pool_lock = time.perf_counter()
                    with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
                        _mark("wait_universal_pool_lock_ms", t_pool_lock)
                        t_ensure = time.perf_counter()
                        ensure_universal_pool()
                        _mark("ensure_universal_pool_ms", t_ensure)
                        t_acquire = time.perf_counter()
                        slot = acquire_universal_slot(db)
                        _mark("acquire_universal_slot_ms", t_acquire)
                        slot_cid = f"POOLIDX:{slot}"
                        t_dispatch = time.perf_counter()
                        terminate_active_slot_sessions(db, slot_cid)
                        dispatch_universal_target(slot, service, width=client_width, height=client_height)
                        _mark("dispatch_universal_target_ms", t_dispatch)
                        t_commit = time.perf_counter()
                        session_obj = SessionModel(
                            id=session_id,
                            user_id=user.id,
                            service_id=service.id,
                            container_id=slot_cid,
                            status=SessionStatus.ACTIVE,
                            created_at=now_utc(),
                            last_access_at=now_utc(),
                        )
                        db.add(session_obj)
                        db.commit()
                        _mark("db_commit_ms", t_commit)
                except LockTimeoutError:
                    _emit("universal_pool_lock_timeout")
                    raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.")
                except Exception as exc:
                    logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
                    log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc))
                    audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
                    _emit("universal_pool_create_failed", error=str(exc))
                    raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
                log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot)
                audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
                _emit("session_created_universal_pool", session_id=session_id, slot=slot)
                return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
            # --- per-service warm pool (WEB only); no pool lock is taken here ---
            if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
                t_warm = time.perf_counter()
                ensure_warm_pool(service)
                open_warm_web_url(service, service.target)
                _mark("warm_pool_prepare_ms", t_warm)
                t_commit = time.perf_counter()
                session_obj = SessionModel(
                    id=session_id,
                    user_id=user.id,
                    service_id=service.id,
                    container_id=f"POOL:{service.slug}",
                    status=SessionStatus.ACTIVE,
                    created_at=now_utc(),
                    last_access_at=now_utc(),
                )
                db.add(session_obj)
                db.commit()
                _mark("db_commit_ms", t_commit)
                log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool")
                audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
                _emit("session_created_warm_pool", session_id=session_id)
                return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
            # --- fallback: dedicated runtime container for this session ---
            try:
                t_create = time.perf_counter()
                container_id = create_runtime_container(service, session_id)
                _mark("create_runtime_container_ms", t_create)
            except Exception as exc:
                logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
                log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc))
                audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
                _emit("single_runtime_create_failed", error=str(exc))
                raise HTTPException(status_code=502, detail="Session runtime failed to start")
            t_commit = time.perf_counter()
            session_obj = SessionModel(
                id=session_id,
                user_id=user.id,
                service_id=service.id,
                container_id=container_id,
                status=SessionStatus.ACTIVE,
                created_at=now_utc(),
                last_access_at=now_utc(),
            )
            db.add(session_obj)
            db.commit()
            _mark("db_commit_ms", t_commit)
            log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id)
            audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
            t_wait = time.perf_counter()
            # The redirect is issued whether or not the route reported ready;
            # `ready` is only logged.
            ready = wait_for_session_route(session_id)
            _mark("wait_session_route_ms", t_wait)
            log_event("session_route_ready", session_id=session_id, ready=ready)
            _emit("session_created_single_runtime", session_id=session_id, ready=ready)
            return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
    except LockTimeoutError:
        # Raised by the outer per-user lock; pool lock timeouts are handled above.
        _emit("user_lock_timeout")
        raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.")
@app.get("/svc/{slug}/", response_class=HTMLResponse)
def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
    """Serve the "service is starting" placeholder page for an active service.

    The page polls ``/api/services/<slug>/status`` once per second and reloads
    ``/svc/<slug>/`` as soon as the status payload reports ``ready``.

    Raises:
        HTTPException: 404 when no active service has this slug, 403 when the
            user has no access to it.
    """
    active_service_query = select(Service).where(Service.slug == slug, Service.active == True)
    service = db.scalar(active_service_query)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")

    allowed = has_access(db, user.id, service.id)
    if not allowed:
        raise HTTPException(status_code=403, detail="ACL denied")

    # Static page: the slug is read client-side from window.location, so
    # nothing user-controlled is interpolated into the markup here.
    waiting_page = """
<!doctype html>
<html>
<head>
<meta charset='utf-8'>
<title>Service Starting</title>
<style>
body { font-family: sans-serif; background: #f4f6f8; display: grid; place-items: center; height: 100vh; margin: 0; color:#1b3145; }
.card { background: #fff; padding: 1rem 1.2rem; border-radius: 10px; box-shadow: 0 8px 20px rgba(0,0,0,.08); min-width: 340px; }
.title { font-weight: 700; margin-bottom: 0.5rem; }
.state { margin-bottom: 0.6rem; }
ul { margin: 0; padding-left: 1.1rem; }
li { margin: 0.2rem 0; }
</style>
</head>
<body>
<div class="card">
<div class="title">Сервис запускается</div>
<div class="state" id="state">Проверка...</div>
<ul id="steps"></ul>
</div>
<script>
const slug = window.location.pathname.replace(/^\\/svc\\//, '').replace(/\\/$/, '');
async function tick() {
const r = await fetch(`/api/services/${slug}/status`, {credentials:'include'});
if (!r.ok) return;
const data = await r.json();
document.getElementById('state').textContent = data.message || 'Запуск...';
const ul = document.getElementById('steps');
ul.innerHTML = '';
(data.steps || []).forEach((x) => {
const li = document.createElement('li');
li.textContent = x;
ul.appendChild(li);
});
if (data.ready) window.location.replace(`/svc/${slug}/`);
}
setInterval(tick, 1000);
tick();
</script>
</body>
</html>
"""
    return HTMLResponse(content=waiting_page.strip(), status_code=200)
@app.get("/s/{session_id}/", response_class=HTMLResponse)
def session_wait_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
    """Serve the spinner page shown while a session is starting.

    The page polls ``/api/sessions/<id>/status`` once per second and, once the
    payload reports ``ready``, navigates to the payload's ``redirect_url`` or,
    when that is absent, to the URL computed by ``session_redirect_url``.

    Raises:
        HTTPException: 404 when the session does not exist or belongs to
            another user, 410 when it is no longer active.
    """
    sess = db.get(SessionModel, session_id)
    if not sess or sess.user_id != user.id:
        raise HTTPException(status_code=404, detail="Session not found")
    if sess.status != SessionStatus.ACTIVE:
        raise HTTPException(status_code=410, detail="Session is not active")
    service = db.get(Service, sess.service_id)
    service_title = service.name if service else "Сервис"
    # The service name is free-form admin input; escape it before placing it
    # into the <title> element so markup in the name cannot break the page.
    safe_title = str(escape(service_title))
    is_rdp = bool(service and service.type == ServiceType.RDP)
    label = "Ожидайте..." if is_rdp else "Сессия запускается..."
    redirect_target = session_redirect_url(sess)
    return HTMLResponse(
        content=f"""
<!doctype html>
<html>
<head>
<meta charset='utf-8'>
<title>{safe_title}</title>
<style>
*{{box-sizing:border-box}}
body{{font-family:sans-serif;background:#0f1720;display:grid;place-items:center;height:100vh;margin:0;color:#dce8f5}}
.card{{background:rgba(255,255,255,.06);border:1px solid rgba(255,255,255,.12);padding:1.6rem 2rem;border-radius:14px;
box-shadow:0 12px 32px rgba(0,0,0,.4);min-width:320px;max-width:440px;text-align:center}}
.spinner{{width:48px;height:48px;border:4px solid rgba(220,232,245,.15);border-top-color:#2a8cd6;
border-radius:50%;animation:spin .9s linear infinite;margin:0 auto 1.2rem}}
@keyframes spin{{to{{transform:rotate(360deg)}}}}
.title{{font-size:1.15rem;font-weight:700;margin-bottom:.5rem;color:#fff}}
.state{{font-size:.9rem;color:#a0b8cc;margin-bottom:.8rem;min-height:1.2em}}
ul{{margin:0;padding:0;list-style:none;font-size:.82rem;color:#7a99b0;text-align:left}}
li::before{{content:"· ";color:#2a8cd6}}
li+li{{margin-top:.2rem}}
.sid{{display:block;margin-top:1.2rem;font-size:.7rem;color:rgba(160,184,204,.4);word-break:break-all}}
</style>
</head>
<body>
<div class="card">
<div class="spinner"></div>
<div class="title">{label}</div>
<div class="state" id="state">Проверка...</div>
<ul id="steps"></ul>
<span class="sid">{session_id}</span>
</div>
<script>
const sessionId = "{session_id}";
async function tick() {{
const r = await fetch(`/api/sessions/${{sessionId}}/status`, {{credentials:'include'}});
if (!r.ok) return;
const data = await r.json();
document.getElementById('state').textContent = data.message || 'Запуск...';
const ul = document.getElementById('steps');
ul.innerHTML = '';
(data.steps || []).forEach((x) => {{
const li = document.createElement('li');
li.textContent = x;
ul.appendChild(li);
}});
if (data.ready) window.location.replace(data.redirect_url || "{redirect_target}");
}}
setInterval(tick, 1000);
tick();
</script>
</body>
</html>
""".strip(),
        status_code=200,
    )
@app.get("/s/{session_id}/view", response_class=HTMLResponse)
def session_view_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
    """Render the full-page iframe that hosts a running pooled session.

    The iframe URL is derived from the prefix of ``sess.container_id``:
    ``POOL:`` maps to ``/svc/<slug>/``, while ``WEBPOOLIDX:``, ``POOLIDX:`` and
    ``RDPSLOT:`` map to ``/w/<n>/``, ``/u/<n>/`` and ``/rdp/<n>/``. For non-RDP
    services with stored credentials an overlay panel with login, password and
    hint is added. When no iframe URL can be derived the client is redirected
    back to the wait page.

    Raises:
        HTTPException: 404 when the session or its service is missing or the
            session belongs to another user, 410 when it is not active.
    """
    sess = db.get(SessionModel, session_id)
    if not sess or sess.user_id != user.id:
        raise HTTPException(status_code=404, detail="Session not found")
    if sess.status != SessionStatus.ACTIVE:
        raise HTTPException(status_code=410, detail="Session is not active")
    service = db.get(Service, sess.service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")

    container_ref = sess.container_id or ""
    iframe_src = None
    if container_ref.startswith("POOL:"):
        iframe_src = f"/svc/{service.slug}/?sid={session_id}"
    else:
        # Indexed slots: "<PREFIX>:<n>" -> "/<route>/<n>/".
        slot_routes = (("WEBPOOLIDX:", "w"), ("POOLIDX:", "u"), ("RDPSLOT:", "rdp"))
        for prefix, route in slot_routes:
            if not container_ref.startswith(prefix):
                continue
            try:
                slot = int(container_ref.split(":", 1)[1])
            except ValueError:
                # Malformed slot index: fall through to the wait-page redirect.
                break
            iframe_src = f"/{route}/{slot}/?sid={session_id}"
            break

    if not iframe_src:
        return RedirectResponse(url=f"/s/{session_id}/", status_code=303)

    creds_html = ""
    if service.type != ServiceType.RDP and (service.svc_login or service.svc_password):
        # Credentials and hint are free-form text; escape them fully
        # (&, <, >, quotes) before interpolating into markup.
        rows = ""
        if service.svc_login:
            login_esc = escape(service.svc_login)
            rows += f'''<div class="cr-row"><span class="cr-label">Логин</span><span class="cr-val">{login_esc}</span></div>'''
        if service.svc_password:
            pass_esc = escape(service.svc_password)
            rows += f'''<div class="cr-row"><span class="cr-label">Пароль</span><span class="cr-val cr-masked">{pass_esc}</span></div>'''
        if service.svc_cred_hint:
            hint_esc = escape(service.svc_cred_hint)
            rows += f'''<p class="cr-hint">{hint_esc}</p>'''
        creds_html = f'''
<div class="creds-panel" id="creds-panel">
<button class="creds-close" id="creds-close" title="Закрыть">✕</button>
{rows}
</div>
<script>
document.getElementById("creds-close").onclick = function() {{
document.getElementById("creds-panel").style.display = "none";
}};
</script>'''

    # Title and iframe URL contain admin-supplied text (name, slug).
    title_esc = escape(service.name)
    iframe_src_esc = escape(iframe_src)
    return HTMLResponse(
        content=f"""
<!doctype html>
<html>
<head>
<meta charset='utf-8'>
<title>{title_esc}</title>
<style>
html,body,iframe {{ margin:0; width:100%; height:100%; border:0; background:#0f1720; }}
.creds-panel{{
position:fixed;right:16px;top:16px;z-index:999;
background:linear-gradient(180deg,rgba(15,24,36,.88),rgba(9,14,22,.94));
border:1px solid rgba(255,255,255,.22);backdrop-filter:blur(6px);
box-shadow:0 10px 28px rgba(0,0,0,.4);padding:10px 12px 11px;border-radius:14px;
min-width:220px;max-width:320px;
}}
.creds-close{{
position:absolute;top:6px;right:8px;background:none;border:none;
color:rgba(255,255,255,.55);font-size:14px;cursor:pointer;line-height:1;padding:2px 4px;
}}
.creds-close:hover{{color:#fff}}
.cr-row{{display:flex;align-items:center;gap:6px;margin-bottom:5px;}}
.cr-label{{font:600 11px/1 sans-serif;text-transform:uppercase;letter-spacing:.04em;
color:rgba(180,210,240,.7);min-width:46px;flex-shrink:0;}}
.cr-val{{font:600 13px/1 monospace;color:#dce8f5;flex:1;overflow:hidden;
text-overflow:ellipsis;white-space:nowrap;}}
.cr-masked{{letter-spacing:.1em;font-size:14px;}}
.cr-hint{{margin:4px 0 0;font:400 11px/1.35 sans-serif;color:rgba(180,210,240,.65);}}
</style>
</head>
<body>
<iframe src="{iframe_src_esc}" allow="clipboard-read; clipboard-write"></iframe>{creds_html}
</body>
</html>
""".strip()
    )
@app.post("/api/sessions/{session_id}/touch")
def touch_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
    """Update the last-access timestamp of the caller's active session.

    Returns ``{"ok": True}`` on success. For a session that is no longer
    active, a 410 JSON body carrying the close reason and current status is
    returned and the rejection is logged at WARNING level.

    Raises:
        HTTPException: 404 when the session is missing or owned by another user.
    """
    sess = db.get(SessionModel, session_id)
    if not (sess and sess.user_id == user.id):
        raise HTTPException(status_code=404, detail="Session not found")

    if sess.status == SessionStatus.ACTIVE:
        sess.last_access_at = now_utc()
        db.commit()
        return {"ok": True}

    # Inactive session: tell the client why instead of silently refreshing.
    reason = session_closed_reason(sess, db)
    status_value = sess.status.value
    log_event(
        "session_touch_rejected",
        level=logging.WARNING,
        session_id=session_id,
        user_id=user.id,
        status=status_value,
        reason=reason,
    )
    rejection = {"ok": False, "reason": reason, "status": status_value}
    return JSONResponse(status_code=410, content=rejection)
@app.post("/api/sessions/{session_id}/close")
def close_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
    """Terminate the caller's session at their request.

    An active session is terminated (with ``stop_container=True``) and
    committed. A session that is already closed is only logged, and its
    current status is echoed back. Both paths answer with ``ok: True``.

    Raises:
        HTTPException: 404 when the session is missing or owned by another user.
    """
    sess = db.get(SessionModel, session_id)
    if not (sess and sess.user_id == user.id):
        raise HTTPException(status_code=404, detail="Session not found")

    if sess.status == SessionStatus.ACTIVE:
        terminate_session_record(db, sess, SessionStatus.TERMINATED, stop_container=True)
        db.commit()
        log_event("session_closed_by_user", session_id=session_id, user_id=user.id)
        return {"ok": True, "status": "TERMINATED"}

    # Nothing to stop: record the repeated close and report the stored status.
    log_event(
        "session_close_already_closed",
        session_id=session_id,
        user_id=user.id,
        status=sess.status.value,
        reason=session_closed_reason(sess, db),
    )
    return {"ok": True, "status": sess.status.value}
@app.get("/api/services/{slug}/status")
def service_status(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
    """Report whether a service's route (and pool, if any) is ready.

    Returns a dict with ``ready``, a human-readable ``message`` and a list of
    ``steps`` describing the ACL, pool and route checks.

    Raises:
        HTTPException: 404 for an unknown or inactive slug, 410 for VNC
            services, 403 when the user has no access.
    """
    lookup = select(Service).where(Service.slug == slug, Service.active == True)
    service = db.scalar(lookup)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")
    if service.type == ServiceType.VNC:
        raise HTTPException(status_code=410, detail="VNC services are deprecated")
    if not has_access(db, user.id, service.id):
        raise HTTPException(status_code=403, detail="ACL denied")

    pool = get_pool_status_for_service(service)
    route_path = f"/svc/{slug}/"
    route_ok = route_ready(route_path)

    # Ready means: route answers, and if a pool is wanted at least one member runs.
    if not route_ok:
        ready = route_ok
    elif desired_pool_size(service) > 0:
        ready = pool["running"] > 0
    else:
        ready = True

    route_state = "OK" if route_ok else "ожидание"
    if ready:
        message = "Готово, открываем..."
    else:
        message = "Поднимаем контейнер и маршрут..."

    return {
        "ready": ready,
        "message": message,
        "steps": [
            f"ACL: OK ({user.username})",
            f"Пул: {pool['running']} / {pool['desired']}",
            f"Маршрут {route_path}: {route_state}",
        ],
    }
@app.get("/api/sessions/{session_id}/status")
def session_status(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
    """Report readiness of a session's container and route.

    The route that is probed depends on the ``container_id`` prefix
    (``POOL:``, ``WEBPOOLIDX:``, ``POOLIDX:``, ``RDPSLOT:``); anything else
    falls back to ``/s/<id>/``. For every recognised pooled/slot form the
    payload also carries ``redirect_url`` pointing at ``/s/<id>/view``.

    Raises:
        HTTPException: 404 when the session is missing or owned by another
            user, 410 when it is not active.
    """
    sess = db.get(SessionModel, session_id)
    if not sess or sess.user_id != user.id:
        raise HTTPException(status_code=404, detail="Session not found")
    if sess.status != SessionStatus.ACTIVE:
        raise HTTPException(status_code=410, detail="Session is not active")

    service = db.get(Service, sess.service_id)
    container_ref = sess.container_id or ""

    def _index_after(prefix):
        # Integer following "<prefix>" in the container id, or None.
        if not container_ref.startswith(prefix):
            return None
        try:
            return int(container_ref.split(":", 1)[1])
        except Exception:
            return None

    web_pool_idx = _index_after("WEBPOOLIDX:")
    universal_pool_idx = _index_after("POOLIDX:")
    rdp_slot_idx = _index_after("RDPSLOT:")

    in_named_pool = container_ref.startswith("POOL:")
    pooled_web = bool(in_named_pool and service and service.type == ServiceType.WEB)
    pooled_rdp = bool(in_named_pool and service and service.type == ServiceType.RDP)

    # Later checks in the original override earlier ones, hence this order.
    if universal_pool_idx is not None:
        route_path = f"/u/{universal_pool_idx}/"
    elif web_pool_idx is not None:
        route_path = f"/w/{web_pool_idx}/"
    elif pooled_web or pooled_rdp:
        route_path = f"/svc/{service.slug}/"
    elif rdp_slot_idx is not None:
        route_path = f"/rdp/{rdp_slot_idx}/"
    else:
        route_path = f"/s/{session_id}/"

    route_ok = route_ready(route_path)
    running = container_running(sess.container_id)
    ready = running and route_ok

    container_state = "running" if running else "starting"
    route_state = "OK" if route_ok else "ожидание"
    payload = {
        "ready": ready,
        "message": "Готово, открываем..." if ready else "Запуск сессии...",
        "steps": [
            f"Контейнер: {container_state}",
            f"Маршрут {route_path}: {route_state}",
        ],
    }

    has_slot = any(idx is not None for idx in (web_pool_idx, universal_pool_idx, rdp_slot_idx))
    if pooled_web or pooled_rdp or has_slot:
        payload["redirect_url"] = f"/s/{session_id}/view"
    return payload
@app.post("/api/admin/services")
def create_service(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Create a service from an admin-supplied JSON payload.

    Required keys: ``name``, ``slug``, ``type``, ``target``. Optional keys:
    ``comment``, ``svc_login``, ``svc_password``, ``svc_cred_hint``,
    ``active``, ``warm_pool_size``, ``category_ids``. After the commit the
    matching pool is brought up.

    Returns:
        ``{"id": <new service id>}``.

    Raises:
        HTTPException: 400 when a required key is missing, the type is unknown
            or VNC, or ``warm_pool_size`` is not an integer.
    """
    validate_csrf(request)

    # Reject malformed payloads with 400 instead of letting KeyError/ValueError
    # surface as an internal server error.
    missing = [field for field in ("name", "slug", "type", "target") if field not in payload]
    if missing:
        raise HTTPException(status_code=400, detail=f"Missing required fields: {', '.join(missing)}")
    try:
        service_type = ServiceType(payload["type"])
    except ValueError:
        raise HTTPException(status_code=400, detail="Unknown service type") from None
    if service_type == ServiceType.VNC:
        raise HTTPException(status_code=400, detail="VNC services are no longer supported")
    try:
        warm_pool_size = max(0, int(payload.get("warm_pool_size", 0)))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="warm_pool_size must be an integer") from None

    target = payload["target"]
    if service_type == ServiceType.WEB:
        target = normalize_web_target(target)
    elif service_type == ServiceType.RDP:
        # Called for validation only; the stored target stays as submitted.
        parse_rdp_target(target)

    service = Service(
        name=payload["name"],
        slug=payload["slug"],
        type=service_type,
        target=target,
        comment=payload.get("comment", ""),
        svc_login=payload.get("svc_login", ""),
        svc_password=payload.get("svc_password", ""),
        svc_cred_hint=payload.get("svc_cred_hint", ""),
        active=payload.get("active", True),
        warm_pool_size=warm_pool_size,
    )
    db.add(service)
    # Flush so service.id is available for the category links.
    db.flush()
    set_service_categories(db, service.id, payload.get("category_ids", []))
    db.commit()

    if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
        ensure_warm_pool(service)
    elif service_uses_universal_pool(service):
        ensure_universal_pool()
    return {"id": service.id}
@app.get("/api/admin/services/{service_id}/containers/status")
def service_containers_status(service_id: int, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Return the detailed pool status of a service plus its active-session count.

    Raises:
        HTTPException: 404 when the service does not exist.
    """
    service = db.get(Service, service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")

    status_report = get_pool_detailed_status(service)
    active_count = get_active_sessions_count(db, service.id)
    status_report["active_sessions"] = active_count
    return status_report
@app.post("/api/admin/services/{service_id}/icon")
async def upload_service_icon(
    service_id: int,
    request: Request,
    file: UploadFile = File(...),
    _: User = Depends(require_admin),
    db: Session = Depends(get_db),
):
    """Store an uploaded icon for a service and drop the one it replaces.

    Returns:
        ``{"ok": True, "icon_path": <stored path>}``.

    Raises:
        HTTPException: 404 when the service does not exist.
    """
    validate_csrf(request)
    service = db.get(Service, service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")

    stored_path = await store_service_icon(service, file)
    previous_path = service.icon_path
    service.icon_path = stored_path
    db.commit()

    # Only remove the old file after the new path is committed, and never
    # when both paths point at the same file.
    replaced = bool(previous_path) and previous_path != stored_path
    if replaced:
        remove_icon_file(previous_path)
    return {"ok": True, "icon_path": stored_path}
@app.delete("/api/admin/services/{service_id}/icon")
def delete_service_icon(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Clear a service's icon path and remove the icon file.

    Raises:
        HTTPException: 404 when the service does not exist.
    """
    validate_csrf(request)
    service = db.get(Service, service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")

    # Remember the path before blanking it; the file goes away after commit.
    stale_icon = service.icon_path
    service.icon_path = ""
    db.commit()
    remove_icon_file(stale_icon)
    return {"ok": True}
@app.put("/api/admin/services/{service_id}")
def edit_service(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Apply a partial update to a service, then refresh its pool.

    Only keys present in ``payload`` are applied: the plain fields listed
    below, plus ``type``, ``warm_pool_size`` and ``category_ids``.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 when the service does not exist, 400 when the
            resulting type is VNC.
    """
    validate_csrf(request)
    service = db.get(Service, service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")
    # Copy over the simple attributes the caller actually sent.
    for key in ["name", "slug", "target", "active", "comment", "svc_login", "svc_password", "svc_cred_hint"]:
        if key in payload:
            setattr(service, key, payload[key])
    if "type" in payload:
        service.type = ServiceType(payload["type"])
    # NOTE(review): nesting reconstructed from an indentation-stripped listing;
    # confirm whether this VNC check belongs inside the `"type" in payload` branch.
    if service.type == ServiceType.VNC:
        raise HTTPException(status_code=400, detail="VNC services are no longer supported")
    # Re-validate the target against the (possibly changed) type.
    if service.type == ServiceType.WEB:
        service.target = normalize_web_target(service.target)
    elif service.type == ServiceType.RDP:
        # Return value is unused; presumably called for validation only.
        parse_rdp_target(service.target)
    if "warm_pool_size" in payload:
        # Negative sizes are clamped to zero.
        service.warm_pool_size = max(0, int(payload["warm_pool_size"]))
    if "category_ids" in payload:
        set_service_categories(db, service.id, payload.get("category_ids", []))
    db.commit()
    # NOTE(review): the pairing of the `elif` below and the placement of
    # open_warm_web_url are reconstructed; verify against the original file.
    if service.type == ServiceType.WEB:
        if WEB_POOL_SIZE <= 0:
            ensure_warm_pool(service)
            open_warm_web_url(service, service.target)
    elif service_uses_universal_pool(service):
        ensure_universal_pool()
    return {"ok": True}
@app.delete("/api/admin/services/{service_id}")
def delete_service(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Delete a service, scale its warm pool to zero and remove its icon file.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 when the service does not exist.
    """
    validate_csrf(request)
    service = db.get(Service, service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")
    if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
        ensure_warm_pool(service, 0)

    # Capture the path now: the instance is gone after the commit. The file is
    # removed only once the delete is committed, so a failed commit does not
    # leave a surviving service that points at a missing icon.
    icon_path = service.icon_path
    db.delete(service)
    db.commit()
    remove_icon_file(icon_path)
    return {"ok": True}
@app.post("/api/admin/services/{service_id}/prewarm")
def prewarm_now(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Bring up the pool that backs a service and return that pool's status.

    WEB services use the shared web pool, universal-pool services use the
    universal pool, RDP services are reported without starting anything, and
    every other service gets its own warm pool ensured.

    Raises:
        HTTPException: 404 when the service does not exist.
    """
    validate_csrf(request)
    service = db.get(Service, service_id)
    if not service:
        raise HTTPException(status_code=404, detail="Service not found")

    if service.type == ServiceType.WEB:
        ensure_web_pool()
        pool_state = get_web_pool_status()
        return {"ok": True, "pool": pool_state}

    if service_uses_universal_pool(service):
        ensure_universal_pool()
        pool_state = get_universal_pool_status()
        return {"ok": True, "pool": pool_state}

    if service.type == ServiceType.RDP:
        # RDP has nothing to prewarm; just report the current state.
        pool_state = get_pool_status_for_service(service)
        return {"ok": True, "pool": pool_state, "message": "RDP запускается on-demand"}

    ensure_warm_pool(service)
    pool_state = get_pool_status_for_service(service)
    return {"ok": True, "pool": pool_state}
@app.post("/api/admin/services/{service_id}/rdp-slots")
def create_rdp_slot(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Create an RDP slot for a service and start its container.

    Reads ``rdp_username`` (required) and ``rdp_password`` from the payload,
    both stripped of surrounding whitespace.

    Returns:
        ``{"ok": True, "slot_id": ..., "container_name": ...}``.

    Raises:
        HTTPException: 404 when the service is missing or not RDP, 400 when
            the username is empty, 502 when the container fails to start.
    """
    validate_csrf(request)
    service = db.get(Service, service_id)
    is_rdp_service = bool(service) and service.type == ServiceType.RDP
    if not is_rdp_service:
        raise HTTPException(status_code=404, detail="RDP service not found")

    username_raw = payload.get("rdp_username") or ""
    password_raw = payload.get("rdp_password") or ""
    rdp_username = username_raw.strip()
    rdp_password = password_raw.strip()
    if not rdp_username:
        raise HTTPException(status_code=400, detail="rdp_username is required")

    slot = RdpSlot(service_id=service_id, rdp_username=rdp_username, rdp_password=rdp_password)
    db.add(slot)
    # Flush before starting the container so the slot row exists first.
    db.flush()
    try:
        slot.container_name = start_rdp_slot_container(slot, service)
    except Exception as exc:
        logger.exception("rdp_slot_container_start_failed service_id=%s", service_id)
        raise HTTPException(status_code=502, detail=f"Контейнер не запустился: {exc}")
    db.commit()

    audit_details = f"service={service.slug} slot={slot.id} user={rdp_username}"
    audit(db, "RDP_SLOT_CREATE", audit_details, user_id=None)
    return {"ok": True, "slot_id": slot.id, "container_name": slot.container_name}
@app.delete("/api/admin/rdp-slots/{slot_id}")
def delete_rdp_slot(slot_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Delete an RDP slot row and stop its container in a background thread.

    Raises:
        HTTPException: 404 when the slot does not exist.
    """
    validate_csrf(request)
    slot = db.get(RdpSlot, slot_id)
    if not slot:
        raise HTTPException(status_code=404, detail="Slot not found")

    # Read the name before the row is deleted.
    doomed_container = slot.container_name
    db.delete(slot)
    db.commit()

    if doomed_container:
        # Stopping runs on a daemon thread so the response is not held up.
        stopper = threading.Thread(target=stop_rdp_slot_container, args=(doomed_container,), daemon=True)
        stopper.start()
    return {"ok": True}
@app.post("/api/admin/categories")
def create_category(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Create a category from ``name`` and ``slug`` in the payload.

    The name is stripped; the slug is stripped, lower-cased and has spaces
    replaced with hyphens.

    Returns:
        ``{"id": <new category id>}``.

    Raises:
        HTTPException: 400 when name or slug is empty, 409 when a category
            with the same name or slug already exists.
    """
    validate_csrf(request)
    name = (payload.get("name") or "").strip()
    slug_source = (payload.get("slug") or "").strip()
    slug = slug_source.lower().replace(" ", "-")

    if not name:
        raise HTTPException(status_code=400, detail="Category name is required")
    if not slug:
        raise HTTPException(status_code=400, detail="Category slug is required")

    clash_filter = (Category.name == name) | (Category.slug == slug)
    clash = db.scalar(select(Category).where(clash_filter))
    if clash:
        raise HTTPException(status_code=409, detail="Category already exists")

    category = Category(name=name, slug=slug)
    db.add(category)
    db.commit()
    return {"id": category.id}
@app.delete("/api/admin/categories/{category_id}")
def delete_category(category_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Delete a category by id.

    Raises:
        HTTPException: 404 when the category does not exist.
    """
    validate_csrf(request)
    doomed = db.get(Category, category_id)
    if not doomed:
        raise HTTPException(status_code=404, detail="Category not found")

    db.delete(doomed)
    db.commit()
    return {"ok": True}
@app.put("/api/admin/web-pool-size")
def update_web_pool_size(payload: dict, request: Request, _: User = Depends(require_admin)):
    """Set this module's ``WEB_POOL_SIZE`` and re-run ``ensure_web_pool``.

    ``payload["size"]`` defaults to the current size and is clamped to a
    minimum of zero.

    Returns:
        ``{"ok": True, "size": ..., "pool": <web pool status>}``.
    """
    # NOTE(review): this rebinds the name in this module only; confirm that
    # ensure_web_pool() sees the new value.
    global WEB_POOL_SIZE
    validate_csrf(request)

    requested_size = int(payload.get("size", WEB_POOL_SIZE))
    WEB_POOL_SIZE = max(0, requested_size)
    ensure_web_pool()

    pool_state = get_web_pool_status()
    return {"ok": True, "size": WEB_POOL_SIZE, "pool": pool_state}
@app.post("/api/admin/users")
def create_user(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Create a user account from an admin-supplied JSON payload.

    Required keys: ``username``, ``password``, ``expires_at`` (ISO 8601
    string). Optional keys: ``active``, ``is_admin``, ``first_name``,
    ``last_name``.

    Returns:
        ``{"id": <new user id>}``.

    Raises:
        HTTPException: 400 when a required key is missing or ``expires_at``
            cannot be parsed.
    """
    validate_csrf(request)

    # Answer malformed payloads with 400 rather than an unhandled
    # KeyError/ValueError (HTTP 500).
    missing = [field for field in ("username", "password", "expires_at") if field not in payload]
    if missing:
        raise HTTPException(status_code=400, detail=f"Missing required fields: {', '.join(missing)}")
    try:
        expires_at = dt.datetime.fromisoformat(payload["expires_at"])
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="expires_at must be an ISO 8601 datetime string") from None

    user = User(
        username=payload["username"],
        password_hash=hash_password(payload["password"]),
        expires_at=expires_at,
        active=payload.get("active", True),
        is_admin=payload.get("is_admin", False),
        first_name=payload.get("first_name", ""),
        last_name=payload.get("last_name", ""),
    )
    db.add(user)
    db.commit()
    return {"id": user.id}
@app.put("/api/admin/users/{user_id}")
def edit_user(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Apply a partial update to a user account.

    Only keys present in ``payload`` are applied. A missing or empty
    ``password`` leaves the stored hash untouched.

    Returns:
        ``{"ok": True}``.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when
            ``expires_at`` is present but not a parsable ISO 8601 string.
    """
    validate_csrf(request)
    user = db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Parse first so a bad date is rejected with 400 before anything is
    # changed, instead of surfacing as an unhandled ValueError/TypeError.
    new_expiry = None
    if "expires_at" in payload:
        try:
            new_expiry = dt.datetime.fromisoformat(payload["expires_at"])
        except (TypeError, ValueError):
            raise HTTPException(status_code=400, detail="expires_at must be an ISO 8601 datetime string") from None

    for key in ("username", "active", "is_admin", "first_name", "last_name"):
        if key in payload:
            setattr(user, key, payload[key])
    if payload.get("password"):
        user.password_hash = hash_password(payload["password"])
    if "expires_at" in payload:
        user.expires_at = new_expiry
    db.commit()
    return {"ok": True}
@app.delete("/api/admin/users/{user_id}")
def delete_user(user_id: int, request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Delete a user account; the calling admin cannot delete themselves.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the target
            is the calling admin.
    """
    validate_csrf(request)
    target = db.get(User, user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    deleting_self = target.id == admin.id
    if deleting_self:
        raise HTTPException(status_code=400, detail="Cannot delete current admin")

    db.delete(target)
    db.commit()
    return {"ok": True}
@app.put("/api/admin/users/{user_id}/acl")
def set_acl(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
    """Replace a user's service access list with ``payload["service_ids"]``.

    Access rows are added for ids not yet granted and deleted for granted ids
    that are absent from the payload; rows present in both are left alone.

    Raises:
        HTTPException: 404 when the user does not exist.
    """
    validate_csrf(request)
    if not db.get(User, user_id):
        raise HTTPException(status_code=404, detail="User not found")

    wanted_ids = set(payload.get("service_ids", []))
    current_rows = db.scalars(
        select(UserServiceAccess).where(UserServiceAccess.user_id == user_id)
    ).all()
    granted = {row.service_id: row for row in current_rows}

    # Grant what is newly requested...
    to_grant = [service_id for service_id in wanted_ids if service_id not in granted]
    for service_id in to_grant:
        db.add(UserServiceAccess(user_id=user_id, service_id=service_id))

    # ...and revoke what is no longer requested.
    to_revoke = [row for service_id, row in granted.items() if service_id not in wanted_ids]
    for row in to_revoke:
        db.delete(row)

    db.commit()
    return {"ok": True}