Files
Stend_mont/app/main.py.bak_lockfix_20260424_121438
T

2566 lines
97 KiB
Plaintext

import datetime as dt
import enum
import fcntl
import json
import re
import logging
import os
from pathlib import Path
import secrets
import threading
import time
import uuid
import contextvars
from urllib.parse import parse_qs, unquote, urlparse
from typing import Optional
import docker
import requests
from fastapi import Depends, FastAPI, File, Form, HTTPException, Request, UploadFile, status
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from itsdangerous import BadSignature, URLSafeTimedSerializer
from markupsafe import Markup, escape
from passlib.context import CryptContext
from sqlalchemy import (
Boolean,
DateTime,
Enum,
ForeignKey,
Integer,
String,
Text,
UniqueConstraint,
create_engine,
select,
text,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, sessionmaker
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://portal:portal@db:5432/portal")
COOKIE_NAME = "portal_auth"
CSRF_COOKIE = "csrf_token"
COOKIE_MAX_AGE = 8 * 60 * 60
SESSION_IDLE_SECONDS = int(os.getenv("SESSION_IDLE_SECONDS", "300"))
PUBLIC_HOST = os.getenv("PUBLIC_HOST", "stend.4mont.ru")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_SLOW_REQUEST_MS = int(os.getenv("LOG_SLOW_REQUEST_MS", "2000"))
GO_USER_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_USER_LOCK_TIMEOUT_SECONDS", "2.0"))
GO_POOL_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_POOL_LOCK_TIMEOUT_SECONDS", "5.0"))
POOL_DISPATCH_RETRIES = int(os.getenv("POOL_DISPATCH_RETRIES", "4"))
POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS = float(os.getenv("POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS", "2.0"))
POOL_DISPATCH_SLEEP_SECONDS = float(os.getenv("POOL_DISPATCH_SLEEP_SECONDS", "0.3"))
TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik")
PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "0"))
UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "0"))
WEB_POOL_SIZE = int(os.getenv("WEB_POOL_SIZE", "5"))
WEB_POOL_BUFFER = int(os.getenv("WEB_POOL_BUFFER", "2"))
MAX_ACTIVE_SERVICES_PER_USER = int(os.getenv("MAX_ACTIVE_SERVICES_PER_USER", "4"))
ICON_UPLOAD_MAX_BYTES = 2 * 1024 * 1024
ICON_UPLOAD_TYPES = {
"image/png": "png",
"image/jpeg": "jpg",
"image/webp": "webp",
}
SERVICE_ICONS_DIR = Path("static/service-icons")
logging.basicConfig(
level=LOG_LEVEL,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
logger = logging.getLogger("portal")
request_id_ctx = contextvars.ContextVar("request_id", default="-")
def _normalize_log_value(value):
if isinstance(value, (str, int, float, bool)) or value is None:
return value
if isinstance(value, dt.datetime):
return value.isoformat()
return str(value)
def log_event(event: str, level: int = logging.INFO, **fields) -> None:
payload = {"event": event, "req_id": request_id_ctx.get()}
for key, value in fields.items():
payload[key] = _normalize_log_value(value)
logger.log(level, json.dumps(payload, ensure_ascii=False, separators=(",", ":")))
SIGNING_KEY = os.getenv("SIGNING_KEY", secrets.token_urlsafe(32))
serializer = URLSafeTimedSerializer(SIGNING_KEY, salt="portal-auth")
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
templates = Jinja2Templates(directory="templates")
app = FastAPI(title="МОНТ - инфрастуктурный полигон")
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
req_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
token = request_id_ctx.set(req_id)
started = time.time()
client_ip = request.client.host if request.client else "-"
user_agent = request.headers.get("user-agent", "-")
try:
response = await call_next(request)
except Exception:
log_event(
"request_failed",
level=logging.ERROR,
method=request.method,
path=request.url.path,
client_ip=client_ip,
user_agent=user_agent,
)
request_id_ctx.reset(token)
raise
duration_ms = int((time.time() - started) * 1000)
level = logging.INFO
if response.status_code >= 500:
level = logging.ERROR
elif response.status_code >= 400:
level = logging.WARNING
log_event(
"request",
level=level,
method=request.method,
path=request.url.path,
query=str(request.url.query or ""),
status=response.status_code,
duration_ms=duration_ms,
client_ip=client_ip,
user_agent=user_agent,
)
if duration_ms >= LOG_SLOW_REQUEST_MS:
log_event(
"slow_request",
level=logging.WARNING,
method=request.method,
path=request.url.path,
duration_ms=duration_ms,
threshold_ms=LOG_SLOW_REQUEST_MS,
)
response.headers["X-Request-ID"] = req_id
request_id_ctx.reset(token)
return response
class Base(DeclarativeBase):
pass
class ServiceType(str, enum.Enum):
WEB = "WEB"
VNC = "VNC"
RDP = "RDP"
class SessionStatus(str, enum.Enum):
ACTIVE = "ACTIVE"
EXPIRED = "EXPIRED"
TERMINATED = "TERMINATED"
ROTATED = "ROTATED"
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
username: Mapped[str] = mapped_column(String(64), unique=True, index=True)
password_hash: Mapped[str] = mapped_column(String(255))
expires_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), index=True)
active: Mapped[bool] = mapped_column(Boolean, default=True, index=True)
is_admin: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
class Service(Base):
__tablename__ = "services"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String(128))
slug: Mapped[str] = mapped_column(String(64), unique=True, index=True)
type: Mapped[ServiceType] = mapped_column(Enum(ServiceType), index=True)
target: Mapped[str] = mapped_column(Text)
comment: Mapped[str] = mapped_column(Text, default="")
icon_path: Mapped[str] = mapped_column(Text, default="")
active: Mapped[bool] = mapped_column(Boolean, default=True)
warm_pool_size: Mapped[int] = mapped_column(Integer, default=0)
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
class Category(Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
name: Mapped[str] = mapped_column(String(128), unique=True, index=True)
slug: Mapped[str] = mapped_column(String(64), unique=True, index=True)
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
class ServiceCategory(Base):
__tablename__ = "service_categories"
__table_args__ = (UniqueConstraint("service_id", "category_id", name="uq_service_category"),)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
category_id: Mapped[int] = mapped_column(ForeignKey("categories.id", ondelete="CASCADE"), index=True)
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
class UserServiceAccess(Base):
__tablename__ = "user_service_access"
__table_args__ = (UniqueConstraint("user_id", "service_id", name="uq_user_service"),)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True)
service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
granted_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
class SessionModel(Base):
__tablename__ = "sessions"
id: Mapped[str] = mapped_column(String(36), primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True)
service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
status: Mapped[SessionStatus] = mapped_column(Enum(SessionStatus), default=SessionStatus.ACTIVE, index=True)
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
last_access_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
container_id: Mapped[Optional[str]] = mapped_column(String(128), nullable=True)
class AuditLog(Base):
__tablename__ = "audit_logs"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
user_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True)
action: Mapped[str] = mapped_column(String(128), index=True)
details: Mapped[str] = mapped_column(Text)
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
def now_utc() -> dt.datetime:
return dt.datetime.now(dt.timezone.utc)
def session_closed_reason(sess: SessionModel, db: Session) -> str:
if not sess:
return "idle"
if sess.status == SessionStatus.EXPIRED:
return "idle"
if sess.status == SessionStatus.ROTATED:
return "limit"
if sess.status == SessionStatus.TERMINATED:
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
active_rows = db.scalars(
select(SessionModel).where(
SessionModel.user_id == sess.user_id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
).all()
active_service_ids = {row.service_id for row in active_rows}
if len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER and sess.service_id not in active_service_ids:
return "limit"
return "idle"
def normalize_web_target(url: str) -> str:
raw = (url or "").strip()
if not raw:
return raw
if raw.startswith(("http://", "https://")):
return raw
return f"http://{raw}"
def format_service_comment(raw_text: str) -> Markup:
raw = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n").strip()
if not raw:
return Markup("")
escaped = str(escape(raw))
# Support pasted/plain markdown-like bold fragments.
escaped = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped, flags=re.DOTALL)
# Allow a small safe subset of pasted HTML tags.
replacements = {
"&lt;b&gt;": "<b>",
"&lt;/b&gt;": "</b>",
"&lt;strong&gt;": "<strong>",
"&lt;/strong&gt;": "</strong>",
"&lt;i&gt;": "<i>",
"&lt;/i&gt;": "</i>",
"&lt;em&gt;": "<em>",
"&lt;/em&gt;": "</em>",
"&lt;u&gt;": "<u>",
"&lt;/u&gt;": "</u>",
"&lt;br&gt;": "<br>",
"&lt;br/&gt;": "<br>",
"&lt;br /&gt;": "<br>",
}
for src, dst in replacements.items():
escaped = escaped.replace(src, dst)
escaped = escaped.replace("\n", "<br>")
return Markup(escaped)
def parse_rdp_target(target: str) -> dict:
raw = (target or "").strip()
if not raw:
raise HTTPException(status_code=400, detail="Empty RDP target")
parsed = urlparse(raw if "://" in raw else f"//{raw}")
host = parsed.hostname
if not host:
raise HTTPException(status_code=400, detail="Invalid RDP target. Use host:port or rdp://user:pass@host:port")
port = parsed.port or 3389
username = unquote(parsed.username) if parsed.username else ""
password = unquote(parsed.password) if parsed.password else ""
query = parse_qs(parsed.query or "")
if not username:
username = (query.get("u", [""])[0] or query.get("user", [""])[0] or "").strip()
if not password:
password = (query.get("p", [""])[0] or query.get("password", [""])[0] or "").strip()
domain = (query.get("domain", [""])[0] or query.get("d", [""])[0] or "").strip()
security = (query.get("sec", [""])[0] or query.get("security", [""])[0] or "").strip().lower()
if security and security not in {"nla", "tls", "rdp"}:
raise HTTPException(status_code=400, detail="Invalid RDP security. Use one of: nla, tls, rdp")
return {
"host": host,
"port": str(port),
"user": username,
"password": password,
"domain": domain,
"security": security,
}
def set_service_categories(db: Session, service_id: int, category_ids: list[int]) -> None:
normalized = sorted({int(x) for x in (category_ids or [])})
if normalized:
existing_ids = set(db.scalars(select(Category.id).where(Category.id.in_(normalized))).all())
missing = sorted(set(normalized) - existing_ids)
if missing:
raise HTTPException(status_code=400, detail=f"Unknown category ids: {missing}")
existing_links = db.scalars(select(ServiceCategory).where(ServiceCategory.service_id == service_id)).all()
current = {row.category_id: row for row in existing_links}
wanted = set(normalized)
for cat_id in wanted:
if cat_id not in current:
db.add(ServiceCategory(service_id=service_id, category_id=cat_id))
for cat_id, row in current.items():
if cat_id not in wanted:
db.delete(row)
def service_uses_universal_pool(service: Service) -> bool:
return UNIVERSAL_POOL_SIZE > 0 and service.type == ServiceType.RDP
def universal_container_name(slot: int) -> str:
return f"portal-universal-{slot}"
def web_pool_container_name(slot: int) -> str:
return f"portal-webpool-{slot}"
def ensure_icons_dir() -> None:
SERVICE_ICONS_DIR.mkdir(parents=True, exist_ok=True)
def remove_icon_file(icon_path: str) -> None:
if not icon_path or not icon_path.startswith("/static/service-icons/"):
return
filename = icon_path.rsplit("/", 1)[-1]
candidate = SERVICE_ICONS_DIR / filename
try:
candidate.unlink(missing_ok=True)
except Exception:
logger.exception("icon_delete_failed path=%s", candidate)
async def store_service_icon(service: Service, upload: UploadFile) -> str:
ensure_icons_dir()
content_type = (upload.content_type or "").lower().strip()
ext = ICON_UPLOAD_TYPES.get(content_type)
if not ext:
raise HTTPException(status_code=400, detail="Unsupported file type. Use PNG/JPG/WEBP")
payload = await upload.read(ICON_UPLOAD_MAX_BYTES + 1)
if len(payload) > ICON_UPLOAD_MAX_BYTES:
raise HTTPException(status_code=400, detail="File too large. Max 2MB")
if not payload:
raise HTTPException(status_code=400, detail="Empty file")
stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d_%H%M%S")
filename = f"svc_{service.id}_{stamp}.{ext}"
target = SERVICE_ICONS_DIR / filename
target.write_bytes(payload)
return f"/static/service-icons/{filename}"
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def audit(db: Session, action: str, details: str, user_id: Optional[int] = None) -> None:
db.add(AuditLog(user_id=user_id, action=action, details=details))
db.commit()
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(password: str, password_hash: str) -> bool:
return pwd_context.verify(password, password_hash)
def user_is_valid(user: User) -> bool:
return bool(user.active and user.expires_at > now_utc())
def issue_auth_cookie(response: RedirectResponse, user: User) -> None:
token = serializer.dumps({"user_id": user.id})
response.set_cookie(
key=COOKIE_NAME,
value=token,
httponly=True,
secure=True,
samesite="strict",
max_age=COOKIE_MAX_AGE,
path="/",
)
def issue_csrf_cookie(response: RedirectResponse) -> str:
token = secrets.token_urlsafe(24)
response.set_cookie(
key=CSRF_COOKIE,
value=token,
httponly=False,
secure=True,
samesite="strict",
max_age=COOKIE_MAX_AGE,
path="/",
)
return token
def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]:
raw = request.cookies.get(COOKIE_NAME)
if not raw:
return None
try:
payload = serializer.loads(raw, max_age=COOKIE_MAX_AGE)
except BadSignature:
return None
user = db.get(User, int(payload["user_id"]))
if not user or not user_is_valid(user):
return None
return user
def require_user(user: Optional[User] = Depends(get_current_user)) -> User:
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized")
return user
def require_admin(user: User = Depends(require_user)) -> User:
if not user.is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only")
return user
def validate_csrf(request: Request) -> None:
cookie = request.cookies.get(CSRF_COOKIE)
form_val = request.headers.get("X-CSRF-Token")
if request.headers.get("content-type", "").startswith("application/x-www-form-urlencoded"):
return
if not cookie or not form_val or cookie != form_val:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed")
def has_access(db: Session, user_id: int, service_id: int) -> bool:
q = select(UserServiceAccess).where(
UserServiceAccess.user_id == user_id,
UserServiceAccess.service_id == service_id,
)
return db.scalar(q) is not None
def docker_client():
return docker.from_env()
def session_router_name(session_id: str) -> str:
return f"sess-{session_id.replace('-', '')[:16]}"
def _is_pool_name_conflict(exc: Exception) -> bool:
msg = str(exc).lower()
return ("already in use" in msg) or ("marked for removal" in msg)
def _remove_container_by_name(d, name: str) -> None:
try:
old = d.containers.get(name)
old.remove(force=True)
except docker.errors.NotFound:
return
except Exception:
logger.exception("pool_container_remove_failed name=%s", name)
def ensure_universal_pool() -> None:
if UNIVERSAL_POOL_SIZE <= 0:
return
d = docker_client()
image = "portal-universal-runtime:latest"
for i in range(UNIVERSAL_POOL_SIZE, 100):
name = universal_container_name(i)
try:
c = d.containers.get(name)
c.stop(timeout=5)
except docker.errors.NotFound:
break
except Exception:
logger.exception("universal_pool_scale_down_failed slot=%s", i)
for i in range(UNIVERSAL_POOL_SIZE):
name = universal_container_name(i)
path = f"/u/{i}/"
router = f"upool-{i}"
labels = {
"traefik.enable": "true",
"traefik.docker.network": "portal_net",
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
f"traefik.http.routers.{router}.entrypoints": "websecure",
f"traefik.http.routers.{router}.tls": "true",
f"traefik.http.routers.{router}.priority": "9400",
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
"portal.pool": "1",
"portal.pool.slot": str(i),
}
env = {
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
"ENABLE_HEARTBEAT": "0",
"SESSION_ID": f"universal-{i}",
}
try:
c = d.containers.get(name)
if c.status != "running":
c.start()
continue
except docker.errors.NotFound:
pass
except Exception:
logger.exception("universal_pool_check_failed slot=%s", i)
continue
d.containers.run(
image=image,
name=name,
detach=True,
auto_remove=True,
network="portal_net",
labels=labels,
environment=env,
)
logger.info("universal_pool_container_started slot=%s", i)
def ensure_web_pool(target_size: Optional[int] = None) -> None:
desired = max(0, WEB_POOL_SIZE if target_size is None else target_size)
d = docker_client()
image = "portal-universal-runtime:latest"
for i in range(desired, 100):
name = web_pool_container_name(i)
try:
c = d.containers.get(name)
c.stop(timeout=5)
except docker.errors.NotFound:
break
except Exception:
logger.exception("web_pool_scale_down_failed slot=%s", i)
for i in range(desired):
name = web_pool_container_name(i)
path = f"/w/{i}/"
router = f"wpool-{i}"
labels = {
"traefik.enable": "true",
"traefik.docker.network": "portal_net",
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
f"traefik.http.routers.{router}.entrypoints": "websecure",
f"traefik.http.routers.{router}.tls": "true",
f"traefik.http.routers.{router}.priority": "9450",
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
"portal.pool": "1",
"portal.pool.kind": "web",
"portal.pool.slot": str(i),
}
env = {
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
"ENABLE_HEARTBEAT": "0",
"SESSION_ID": f"webpool-{i}",
}
should_create = False
try:
c = d.containers.get(name)
if c.status != "running":
try:
c.start()
except docker.errors.APIError as exc:
if _is_pool_name_conflict(exc):
logger.warning("web_pool_recreate_needed slot=%s reason=name-conflict", i)
_remove_container_by_name(d, name)
should_create = True
else:
raise
if not should_create:
continue
except docker.errors.NotFound:
should_create = True
except Exception:
logger.exception("web_pool_check_failed slot=%s", i)
continue
for attempt in range(3):
try:
d.containers.run(
image=image,
name=name,
detach=True,
auto_remove=True,
network="portal_net",
labels=labels,
environment=env,
)
logger.info("web_pool_container_started slot=%s", i)
break
except docker.errors.APIError as exc:
if _is_pool_name_conflict(exc) and attempt < 2:
logger.warning("web_pool_run_conflict_retry slot=%s attempt=%s", i, attempt + 1)
_remove_container_by_name(d, name)
time.sleep(0.25)
continue
logger.exception("web_pool_run_failed slot=%s", i)
break
def get_universal_pool_status() -> dict:
desired = max(0, UNIVERSAL_POOL_SIZE)
if desired <= 0:
return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
d = docker_client()
names = [universal_container_name(i) for i in range(desired)]
containers = []
for name in names:
try:
containers.append(d.containers.get(name))
except Exception:
continue
running = sum(1 for c in containers if c.status == "running")
health = "ok" if running >= min(desired, 1) else "down"
return {
"desired": desired,
"running": running,
"total": len(containers),
"names": sorted(c.name for c in containers),
"health": health,
}
def get_web_pool_status() -> dict:
desired = max(0, WEB_POOL_SIZE)
if desired <= 0:
return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
d = docker_client()
names = [web_pool_container_name(i) for i in range(desired)]
containers = []
for name in names:
try:
containers.append(d.containers.get(name))
except Exception:
continue
running = sum(1 for c in containers if c.status == "running")
health = "ok" if running >= min(desired, 1) else "down"
return {
"desired": desired,
"running": running,
"total": len(containers),
"names": sorted(c.name for c in containers),
"health": health,
}
def acquire_universal_slot(db: Session) -> int:
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
q = select(SessionModel).where(
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.container_id.like("POOLIDX:%"),
SessionModel.last_access_at >= cutoff,
)
active = db.scalars(q).all()
busy = set()
for sess in active:
try:
busy.add(int((sess.container_id or "").split(":", 1)[1]))
except Exception:
continue
for i in range(max(0, UNIVERSAL_POOL_SIZE)):
if i not in busy:
return i
if active:
victim = min(active, key=lambda s: s.last_access_at)
victim.status = SessionStatus.TERMINATED
db.commit()
try:
return int((victim.container_id or "").split(":", 1)[1])
except Exception:
pass
return 0
def acquire_web_pool_slot(db: Session) -> int:
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
q = select(SessionModel).where(
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.container_id.like("WEBPOOLIDX:%"),
SessionModel.last_access_at >= cutoff,
)
active = db.scalars(q).all()
busy = set()
for sess in active:
try:
busy.add(int((sess.container_id or "").split(":", 1)[1]))
except Exception:
continue
# Keep headroom: when active sessions are close to hot pool capacity,
# proactively warm up extra slots.
auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER))
if auto_target > WEB_POOL_SIZE:
ensure_web_pool(auto_target)
for i in range(max(0, auto_target)):
if i not in busy:
return i
return 0
def dispatch_universal_target(slot: int, service: Service) -> None:
name = universal_container_name(slot)
url = ""
payload = {}
if service.type == ServiceType.WEB:
url = f"http://{name}:7000/open"
payload = {"url": normalize_web_target(service.target)}
elif service.type == ServiceType.RDP:
cfg = parse_rdp_target(service.target)
url = f"http://{name}:7000/rdp"
payload = {
"host": cfg["host"],
"port": cfg["port"],
"user": cfg["user"],
"password": cfg["password"],
"domain": cfg["domain"],
"security": cfg["security"],
}
else:
raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only")
last_exc = None
for _ in range(max(1, POOL_DISPATCH_RETRIES)):
try:
resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
resp.raise_for_status()
return
except Exception as exc:
last_exc = exc
time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
if last_exc:
raise last_exc
def dispatch_web_pool_target(slot: int, service: Service) -> None:
name = web_pool_container_name(slot)
target_url = normalize_web_target(service.target)
url = f"http://{name}:7000/open"
last_exc = None
for _ in range(max(1, POOL_DISPATCH_RETRIES)):
try:
resp = requests.post(url, json={"url": target_url}, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
resp.raise_for_status()
return
except Exception as exc:
last_exc = exc
time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
if last_exc:
raise last_exc
def create_runtime_container(service: Service, session_id: str):
d = docker_client()
router = session_router_name(session_id)
path = f"/s/{session_id}/"
labels = {
"traefik.enable": "true",
"traefik.docker.network": "portal_net",
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
f"traefik.http.routers.{router}.entrypoints": "websecure",
f"traefik.http.routers.{router}.tls": "true",
f"traefik.http.routers.{router}.priority": "10000",
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
}
env = {
"SESSION_ID": session_id,
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
"ENABLE_HEARTBEAT": "1",
"TOUCH_PATH": f"/api/sessions/{session_id}/touch",
}
image = "portal-kiosk:latest"
if service.type == ServiceType.WEB:
env["TARGET_URL"] = service.target
env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
elif service.type == ServiceType.RDP:
image = "portal-rdp-proxy:latest"
cfg = parse_rdp_target(service.target)
env["RDP_HOST"] = cfg["host"]
env["RDP_PORT"] = cfg["port"]
if cfg["user"]:
env["RDP_USER"] = cfg["user"]
if cfg["password"]:
env["RDP_PASSWORD"] = cfg["password"]
if cfg["domain"]:
env["RDP_DOMAIN"] = cfg["domain"]
if cfg["security"]:
env["RDP_SECURITY"] = cfg["security"]
else:
raise HTTPException(status_code=400, detail="Unsupported service type")
container = d.containers.run(
image=image,
name=f"portal-sess-{session_id[:8]}",
detach=True,
auto_remove=True,
network="portal_net",
labels=labels,
environment=env,
)
logger.info("session_container_started session_id=%s container_id=%s service_type=%s", session_id, container.id, service.type.value)
return container.id
def ensure_warm_pool(service: Service, pool_size: Optional[int] = None) -> None:
if service_uses_universal_pool(service):
return
if pool_size is None:
pool_size = desired_pool_size(service)
if pool_size <= 0:
# Stop stale warm containers for this service when pool is disabled.
prefix = f"portal-warm-{service.slug}-"
try:
d = docker_client()
for c in d.containers.list(all=True, filters={"name": prefix}):
if c.name.startswith(prefix):
c.stop(timeout=5)
except Exception:
logger.exception("warm_pool_disable_failed service=%s", service.slug)
return
d = docker_client()
router = f"warm-{service.slug}"
svc_name = f"warmsvc-{service.slug}"
path = f"/svc/{service.slug}/"
image = "portal-kiosk:latest"
base_env = {
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
"ENABLE_HEARTBEAT": "0",
"TOUCH_PATH": "",
}
if service.type == ServiceType.WEB:
base_env["UNIVERSAL_WEB"] = "1"
base_env["START_URL"] = normalize_web_target(service.target)
base_env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
elif service.type == ServiceType.RDP:
image = "portal-rdp-proxy:latest"
cfg = parse_rdp_target(service.target)
base_env["RDP_HOST"] = cfg["host"]
base_env["RDP_PORT"] = cfg["port"]
if cfg["user"]:
base_env["RDP_USER"] = cfg["user"]
if cfg["password"]:
base_env["RDP_PASSWORD"] = cfg["password"]
if cfg["domain"]:
base_env["RDP_DOMAIN"] = cfg["domain"]
if cfg["security"]:
base_env["RDP_SECURITY"] = cfg["security"]
else:
raise HTTPException(status_code=400, detail="Unsupported service type")
labels = {
"traefik.enable": "true",
"traefik.docker.network": "portal_net",
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
f"traefik.http.routers.{router}.entrypoints": "websecure",
f"traefik.http.routers.{router}.tls": "true",
f"traefik.http.routers.{router}.priority": "9500",
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
f"traefik.http.services.{svc_name}.loadbalancer.server.port": "6080",
f"traefik.http.routers.{router}.service": svc_name,
"portal.warm": "1",
"portal.service.slug": service.slug,
"portal.service.type": service.type.value,
}
# Ensure desired cardinality.
for i in range(pool_size, 50):
name = f"portal-warm-{service.slug}-{i}"
try:
c = d.containers.get(name)
c.stop(timeout=5)
except docker.errors.NotFound:
break
except Exception:
logger.exception("warm_pool_scale_down_failed service=%s idx=%s", service.slug, i)
for i in range(pool_size):
name = f"portal-warm-{service.slug}-{i}"
try:
c = d.containers.get(name)
if c.status != "running":
c.start()
continue
except docker.errors.NotFound:
pass
except Exception:
logger.exception("warm_pool_check_failed service=%s idx=%s", service.slug, i)
continue
env = dict(base_env)
env["SESSION_ID"] = f"warm-{service.slug}-{i}"
d.containers.run(
image=image,
name=name,
detach=True,
auto_remove=True,
network="portal_net",
labels=labels,
environment=env,
)
logger.info("warm_pool_container_started service=%s idx=%s", service.slug, i)
def wait_for_session_route(session_id: str, timeout_seconds: int = 6) -> bool:
target = f"{TRAEFIK_INTERNAL_URL}/s/{session_id}/"
deadline = time.time() + timeout_seconds
while time.time() < deadline:
try:
resp = requests.get(
target,
headers={"Host": PUBLIC_HOST},
allow_redirects=False,
timeout=1.5,
)
if resp.status_code != 404:
return True
except Exception:
pass
time.sleep(0.3)
return False
def route_ready(path: str) -> bool:
bases = [TRAEFIK_INTERNAL_URL]
if TRAEFIK_INTERNAL_URL.startswith("http://"):
bases.append("https://" + TRAEFIK_INTERNAL_URL[len("http://"):])
for base in bases:
try:
verify = not base.startswith("https://")
resp = requests.get(
f"{base}{path}",
headers={"Host": PUBLIC_HOST},
allow_redirects=False,
timeout=1.5,
verify=verify,
)
if resp.status_code != 404:
return True
except Exception:
continue
return False
def container_running(container_id: Optional[str]) -> bool:
if not container_id:
return False
if (
container_id.startswith("POOL:")
or container_id.startswith("POOLIDX:")
or container_id.startswith("WEBPOOLIDX:")
):
return True
try:
c = docker_client().containers.get(container_id)
return c.status == "running"
except Exception:
return False
def stop_runtime_container(container_id: Optional[str]) -> None:
if not container_id:
return
try:
d = docker_client()
c = d.containers.get(container_id)
c.stop(timeout=5)
except Exception:
logger.exception("session_container_stop_failed container_id=%s", container_id)
def terminate_session_record(
db: Session,
sess: SessionModel,
new_status: SessionStatus = SessionStatus.TERMINATED,
*,
stop_container: bool = True,
) -> None:
if not sess or sess.status != SessionStatus.ACTIVE:
return
old_status = sess.status
cid = sess.container_id or ""
if stop_container and cid and not cid.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:")):
stop_runtime_container(cid)
sess.status = new_status
sess.last_access_at = now_utc()
log_event(
"session_closed",
level=logging.INFO,
session_id=sess.id,
user_id=sess.user_id,
service_id=sess.service_id,
container_id=cid,
old_status=old_status.value if isinstance(old_status, SessionStatus) else str(old_status),
new_status=new_status.value,
reason=session_closed_reason(sess, db),
stop_container=stop_container,
)
def ensure_schema_compatibility() -> None:
# PostgreSQL requires enum value addition to be committed before usage in constraints.
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
conn.execute(
text(
"""
DO $$
BEGIN
BEGIN
ALTER TYPE servicetype ADD VALUE IF NOT EXISTS 'RDP';
EXCEPTION WHEN undefined_object THEN
NULL;
END;
END $$;
"""
)
)
conn.execute(
text(
"""
DO $$
BEGIN
BEGIN
ALTER TYPE sessionstatus ADD VALUE IF NOT EXISTS 'ROTATED';
EXCEPTION WHEN undefined_object THEN
NULL;
END;
END $$;
"""
)
)
with engine.begin() as conn:
conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0"))
conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS comment TEXT NOT NULL DEFAULT ''"))
conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS icon_path TEXT NOT NULL DEFAULT ''"))
conn.execute(
text(
"""
CREATE TABLE IF NOT EXISTS categories (
id SERIAL PRIMARY KEY,
name VARCHAR(128) NOT NULL UNIQUE,
slug VARCHAR(64) NOT NULL UNIQUE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
)
"""
)
)
conn.execute(
text(
"""
CREATE TABLE IF NOT EXISTS service_categories (
id SERIAL PRIMARY KEY,
service_id INT NOT NULL REFERENCES services(id) ON DELETE CASCADE,
category_id INT NOT NULL REFERENCES categories(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (service_id, category_id)
)
"""
)
)
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_service_id ON service_categories(service_id)"))
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_category_id ON service_categories(category_id)"))
# Handle installs where service type is VARCHAR + CHECK.
conn.execute(
text(
"""
DO $$
DECLARE c record;
BEGIN
FOR c IN
SELECT conname
FROM pg_constraint
WHERE conrelid = 'services'::regclass
AND contype = 'c'
AND pg_get_constraintdef(oid) ILIKE '%type%'
LOOP
EXECUTE format('ALTER TABLE services DROP CONSTRAINT %I', c.conname);
END LOOP;
ALTER TABLE services
ADD CONSTRAINT services_type_check
CHECK (type IN ('WEB','VNC','RDP'));
EXCEPTION WHEN duplicate_object THEN
NULL;
END $$;
"""
)
)
def desired_pool_size(service: Service) -> int:
if not service.active:
return 0
if service.type == ServiceType.RDP and not service_uses_universal_pool(service):
# RDP runs on-demand per user session; no prewarmed pool.
return 0
if service_uses_universal_pool(service):
return UNIVERSAL_POOL_SIZE
return service.warm_pool_size if service.warm_pool_size and service.warm_pool_size > 0 else PREWARM_POOL_SIZE
def get_warm_containers_for_service(service: Service) -> list:
prefix = f"portal-warm-{service.slug}-"
try:
d = docker_client()
containers = []
for c in d.containers.list(all=True, filters={"name": prefix}):
if c.name.startswith(prefix):
containers.append(c)
return containers
except Exception:
logger.exception("pool_status_failed service=%s", service.slug)
return []
def get_pool_status_for_service(service: Service) -> dict:
if service.type == ServiceType.WEB:
return get_web_pool_status()
if service.type == ServiceType.RDP and not service_uses_universal_pool(service):
return {"desired": 0, "running": 0, "total": 0, "names": [], "health": "n/a"}
if service_uses_universal_pool(service):
return get_universal_pool_status()
desired = desired_pool_size(service)
containers = get_warm_containers_for_service(service)
running = sum(1 for c in containers if c.status == "running")
states = [(c.attrs.get("State") or {}).get("Status", c.status) for c in containers]
has_bad = any(s in {"exited", "dead"} for s in states)
total = len(containers)
if running == 0:
health = "down"
elif running >= min(desired, 1) and not has_bad:
health = "ok"
else:
health = "degraded"
return {
"desired": desired,
"running": running,
"total": total,
"names": sorted(c.name for c in containers),
"health": health,
}
def get_pool_detailed_status(service: Service) -> dict:
if service.type == ServiceType.WEB:
d = docker_client()
pool = get_web_pool_status()
details = []
for i in range(max(0, pool["desired"])):
name = web_pool_container_name(i)
try:
c = d.containers.get(name)
except Exception:
continue
attrs = c.attrs or {}
state = (attrs.get("State") or {}).get("Status", c.status)
details.append(
{
"name": c.name,
"status": c.status,
"state": state,
"created": attrs.get("Created", ""),
"image": c.image.tags[0] if c.image.tags else "",
"labels_ok": True,
}
)
return {
"service_id": service.id,
"slug": service.slug,
"type": service.type.value,
"desired": pool["desired"],
"running": pool["running"],
"total": pool["total"],
"health": pool["health"],
"containers": details,
"updated_at": now_utc().isoformat(),
}
if service_uses_universal_pool(service):
d = docker_client()
pool = get_universal_pool_status()
details = []
for i in range(max(0, UNIVERSAL_POOL_SIZE)):
name = universal_container_name(i)
try:
c = d.containers.get(name)
except Exception:
continue
attrs = c.attrs or {}
state = (attrs.get("State") or {}).get("Status", c.status)
details.append(
{
"name": c.name,
"status": c.status,
"state": state,
"created": attrs.get("Created", ""),
"image": c.image.tags[0] if c.image.tags else "",
"labels_ok": True,
}
)
return {
"service_id": service.id,
"slug": service.slug,
"type": service.type.value,
"desired": pool["desired"],
"running": pool["running"],
"total": pool["total"],
"health": pool["health"],
"containers": details,
"updated_at": now_utc().isoformat(),
}
containers = get_warm_containers_for_service(service)
pool = get_pool_status_for_service(service)
details = []
for c in sorted(containers, key=lambda x: x.name):
attrs = c.attrs or {}
state = (attrs.get("State") or {}).get("Status", c.status)
created = attrs.get("Created", "")
labels = attrs.get("Config", {}).get("Labels", {}) or {}
labels_ok = (
labels.get("portal.warm") == "1"
and labels.get("portal.service.slug") == service.slug
and labels.get("portal.service.type") == service.type.value
)
details.append(
{
"name": c.name,
"status": c.status,
"state": state,
"created": created,
"image": c.image.tags[0] if c.image.tags else "",
"labels_ok": labels_ok,
}
)
return {
"service_id": service.id,
"slug": service.slug,
"type": service.type.value,
"desired": pool["desired"],
"running": pool["running"],
"total": pool["total"],
"health": pool["health"],
"containers": details,
"updated_at": now_utc().isoformat(),
}
def get_active_sessions_count(db: Session, service_id: int) -> int:
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
q = select(SessionModel).where(
SessionModel.service_id == service_id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
sessions = db.scalars(q).all()
# Avoid inflated stats when pooled slot sessions were duplicated by race:
# for pooled sessions, occupancy is unique container_id.
pooled = [s for s in sessions if (s.container_id or "").startswith(("WEBPOOLIDX:", "POOLIDX:", "POOL:"))]
direct = [s for s in sessions if s not in pooled]
unique_pooled = len({s.container_id for s in pooled if s.container_id})
return unique_pooled + len(direct)
def find_active_session_for_service(db: Session, service_id: int) -> Optional[SessionModel]:
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
q = (
select(SessionModel)
.where(
SessionModel.service_id == service_id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
.order_by(SessionModel.created_at.desc())
)
return db.scalars(q).first()
def find_active_session_for_user_service(db: Session, user_id: int, service_id: int) -> Optional[SessionModel]:
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
q = (
select(SessionModel)
.where(
SessionModel.user_id == user_id,
SessionModel.service_id == service_id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
.order_by(SessionModel.created_at.desc())
)
return db.scalars(q).first()
class LockTimeoutError(Exception):
pass
def allocator_lock(db: Session, lock_id: int, timeout_seconds: Optional[float] = None, poll_seconds: float = 0.05):
class _LockCtx:
def __enter__(self_nonlocal):
self_nonlocal._acquired = False
if timeout_seconds is None:
db.execute(text("SELECT pg_advisory_xact_lock(:lid)"), {"lid": lock_id})
self_nonlocal._acquired = True
return self_nonlocal
deadline = time.monotonic() + max(0.0, timeout_seconds)
while time.monotonic() <= deadline:
got = db.execute(text("SELECT pg_try_advisory_lock(:lid)"), {"lid": lock_id}).scalar()
if got:
self_nonlocal._acquired = True
return self_nonlocal
time.sleep(max(0.01, poll_seconds))
raise LockTimeoutError(f"advisory lock timeout lock_id={lock_id} timeout={timeout_seconds}")
return self_nonlocal
def __exit__(self_nonlocal, exc_type, exc, tb):
if getattr(self_nonlocal, "_acquired", False):
db.execute(text("SELECT pg_advisory_unlock(:lid)"), {"lid": lock_id})
return False
return _LockCtx()
def terminate_active_slot_sessions(db: Session, container_id: str) -> None:
if not container_id:
return
db.execute(
text(
"""
UPDATE sessions
SET status = 'TERMINATED'
WHERE container_id = :cid
AND status = 'ACTIVE'
"""
),
{"cid": container_id},
)
def session_redirect_url(sess: SessionModel) -> str:
cid = sess.container_id or ""
if cid.startswith("POOL:") or cid.startswith("POOLIDX:") or cid.startswith("WEBPOOLIDX:"):
return f"/s/{sess.id}/view"
return f"/s/{sess.id}/"
def open_warm_web_url(service: Service, target_url: str) -> None:
if service_uses_universal_pool(service):
return
if service.type != ServiceType.WEB:
return
target_url = normalize_web_target(target_url)
try:
d = docker_client()
containers = d.containers.list(
filters={
"label": [
"portal.warm=1",
f"portal.service.slug={service.slug}",
"portal.service.type=WEB",
]
}
)
for c in containers:
try:
resp = requests.post(
f"http://{c.name}:7000/open",
json={"url": target_url},
timeout=2,
)
resp.raise_for_status()
logger.info("warm_web_open_ok service=%s container=%s url=%s", service.slug, c.name, target_url)
except Exception:
logger.exception("warm_web_open_failed service=%s container=%s", service.slug, c.name)
except Exception:
logger.exception("warm_web_open_dispatch_failed service=%s", service.slug)
def cleanup_loop():
while True:
time.sleep(60)
db = SessionLocal()
try:
ensure_universal_pool()
ensure_web_pool()
for svc in db.scalars(
select(Service).where(
Service.active == True,
Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
)
).all():
if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
ensure_warm_pool(svc)
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
q = select(SessionModel).where(
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at < cutoff,
)
stale = db.scalars(q).all()
for sess in stale:
if sess.container_id and not (
sess.container_id.startswith("POOL:")
or sess.container_id.startswith("POOLIDX:")
or sess.container_id.startswith("WEBPOOLIDX:")
):
stop_runtime_container(sess.container_id)
sess.status = SessionStatus.EXPIRED
if stale:
db.commit()
except Exception:
db.rollback()
logger.exception("cleanup_loop_failed")
finally:
db.close()
def bootstrap_admin():
admin_user = os.getenv("ADMIN_USERNAME", "admin")
admin_password = os.getenv("ADMIN_PASSWORD", "admin123")
ttl_days = int(os.getenv("ADMIN_TTL_DAYS", "3650"))
db = SessionLocal()
try:
existing = db.scalar(select(User).where(User.username == admin_user))
if not existing:
db.add(
User(
username=admin_user,
password_hash=hash_password(admin_password),
active=True,
is_admin=True,
expires_at=now_utc() + dt.timedelta(days=ttl_days),
)
)
db.commit()
finally:
db.close()
@app.on_event("startup")
def startup_event():
# Multiple uvicorn workers run startup in parallel. Serialize schema bootstrap
# to avoid DDL races on first run and during schema extension.
with open("/tmp/portal-schema.lock", "w") as lock_file:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
Base.metadata.create_all(bind=engine)
ensure_schema_compatibility()
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
ensure_icons_dir()
bootstrap_admin()
db = SessionLocal()
try:
ensure_universal_pool()
ensure_web_pool()
for svc in db.scalars(
select(Service).where(
Service.active == True,
Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
)
).all():
if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
ensure_warm_pool(svc)
finally:
db.close()
thread = threading.Thread(target=cleanup_loop, daemon=True)
thread.start()
@app.get("/", response_class=HTMLResponse)
def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)):
session_closed = (request.query_params.get("session_closed") or "").strip().lower()
launch_error = (request.query_params.get("launch_error") or "").strip().lower()
session_notice = ""
if session_closed == "idle":
session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново."
elif session_closed == "limit":
session_notice = (
f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
"Освободите один сервис и попробуйте снова."
)
elif launch_error == "max_services":
session_notice = (
f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
"Освободите один сервис и попробуйте снова."
)
if not user:
csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
response = templates.TemplateResponse(
"login.html",
{
"request": request,
"csrf_token": csrf,
"login_error": "",
"session_notice": session_notice,
},
)
response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
return response
services = db.scalars(
select(Service)
.join(UserServiceAccess, UserServiceAccess.service_id == Service.id)
.where(
UserServiceAccess.user_id == user.id,
Service.active == True,
Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
)
.order_by(Service.name)
).all()
service_categories = {svc.id: [] for svc in services}
categories = []
if services:
service_ids = [svc.id for svc in services]
rows = db.execute(
select(ServiceCategory.service_id, Category.id, Category.name, Category.slug)
.join(Category, Category.id == ServiceCategory.category_id)
.where(ServiceCategory.service_id.in_(service_ids))
.order_by(Category.name)
).all()
category_map = {}
for service_id, category_id, category_name, category_slug in rows:
service_categories.setdefault(service_id, []).append(
{
"id": category_id,
"name": category_name,
"slug": category_slug,
}
)
if category_id not in category_map:
category_map[category_id] = {"id": category_id, "name": category_name, "slug": category_slug}
categories = sorted(category_map.values(), key=lambda x: x["name"].lower())
selected_category_slug = (request.query_params.get("category") or "").strip().lower()
if selected_category_slug:
services = [
svc for svc in services
if any(cat["slug"] == selected_category_slug for cat in service_categories.get(svc.id, []))
]
service_comment_html = {svc.id: format_service_comment(svc.comment) for svc in services}
return templates.TemplateResponse(
"dashboard.html",
{
"request": request,
"user": user,
"services": services,
"categories": categories,
"selected_category_slug": selected_category_slug,
"service_categories": service_categories,
"service_comment_html": service_comment_html,
"csrf_token": request.cookies.get(CSRF_COOKIE, ""),
"session_notice": session_notice,
},
)
@app.get("/admin", response_class=HTMLResponse)
def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
users = db.scalars(select(User).order_by(User.id)).all()
categories = db.scalars(select(Category).order_by(Category.name)).all()
services = db.scalars(select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)).all()
web_services = [s for s in services if s.type == ServiceType.WEB]
rdp_services = [s for s in services if s.type == ServiceType.RDP]
service_category_map = {s.id: [] for s in services}
if services:
service_rows = db.execute(
select(ServiceCategory.service_id, ServiceCategory.category_id).where(
ServiceCategory.service_id.in_([s.id for s in services])
)
).all()
for service_id, category_id in service_rows:
service_category_map.setdefault(service_id, []).append(category_id)
acl_rows = db.scalars(select(UserServiceAccess)).all()
acl = {}
for row in acl_rows:
acl.setdefault(row.user_id, []).append(row.service_id)
for user_id in acl:
acl[user_id] = sorted(acl[user_id])
pool_status = {s.id: get_pool_status_for_service(s) for s in services}
service_health = {}
for sid, st in pool_status.items():
service_health[sid] = {
"health": st["health"],
"running": st["running"],
"desired": st["desired"],
"active_sessions": get_active_sessions_count(db, sid),
}
web_pool = get_web_pool_status()
web_totals = {
"services": len(web_services),
"running": web_pool["running"],
"desired": web_pool["desired"],
"active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services),
}
recent_sessions = db.execute(
text(
"""
SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
s.status, s.created_at, s.last_access_at
FROM sessions s
JOIN users u ON u.id = s.user_id
JOIN services sv ON sv.id = s.service_id
WHERE sv.type IN ('WEB','RDP')
ORDER BY s.created_at DESC
LIMIT 200
"""
)
).mappings().all()
open_stats = db.execute(
text(
"""
SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens
FROM sessions s
JOIN users u ON u.id = s.user_id
JOIN services sv ON sv.id = s.service_id
WHERE sv.type IN ('WEB','RDP')
GROUP BY u.username, sv.name, sv.slug
ORDER BY opens DESC, u.username ASC
LIMIT 200
"""
)
).mappings().all()
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
online_sessions = db.execute(
text(
"""
SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
sv.type AS service_type, s.container_id, s.created_at, s.last_access_at
FROM sessions s
JOIN users u ON u.id = s.user_id
JOIN services sv ON sv.id = s.service_id
WHERE s.status = 'ACTIVE'
AND s.last_access_at >= :cutoff
AND sv.type IN ('WEB','RDP')
ORDER BY s.last_access_at DESC, s.created_at DESC
LIMIT 500
"""
),
{"cutoff": cutoff},
).mappings().all()
return templates.TemplateResponse(
"admin.html",
{
"request": request,
"admin": admin,
"users": users,
"web_services": web_services,
"rdp_services": rdp_services,
"services": services,
"categories": categories,
"service_category_map": service_category_map,
"acl": acl,
"pool_status": pool_status,
"service_health": service_health,
"web_totals": web_totals,
"web_pool_size": WEB_POOL_SIZE,
"web_pool_buffer": WEB_POOL_BUFFER,
"recent_sessions": recent_sessions,
"open_stats": open_stats,
"online_sessions": online_sessions,
"csrf_token": request.cookies.get(CSRF_COOKIE, ""),
"max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER,
},
)
@app.post("/login")
def login(
request: Request,
username: str = Form(...),
password: str = Form(...),
csrf_token: str = Form(...),
db: Session = Depends(get_db),
):
cookie_csrf = request.cookies.get(CSRF_COOKIE)
if not cookie_csrf or csrf_token != cookie_csrf:
raise HTTPException(status_code=403, detail="CSRF failed")
user = db.scalar(select(User).where(User.username == username))
if not user or not verify_password(password, user.password_hash):
csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
response = templates.TemplateResponse(
"login.html",
{
"request": request,
"csrf_token": csrf,
"login_error": "Неверный логин или пароль",
"session_notice": "",
},
status_code=401,
)
response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
return response
if not user_is_valid(user):
csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
response = templates.TemplateResponse(
"login.html",
{
"request": request,
"csrf_token": csrf,
"login_error": "Доступ к сервису приостоновлен, обратитесь к вашему менеджеру",
},
status_code=403,
)
response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
return response
response = RedirectResponse(url="/", status_code=303)
issue_auth_cookie(response, user)
issue_csrf_cookie(response)
audit(db, "LOGIN", f"login success: {username}", user_id=user.id)
return response
@app.post("/logout")
def logout(request: Request):
response = RedirectResponse(url="/", status_code=303)
response.delete_cookie(COOKIE_NAME, path="/")
response.delete_cookie(CSRF_COOKIE, path="/")
return response
@app.get("/go/{slug}")
def go_service(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
total_started = time.perf_counter()
phase_ms = {}
def _mark(name: str, started: float) -> None:
phase_ms[name] = int((time.perf_counter() - started) * 1000)
def _emit(result: str, **extra) -> None:
payload = {
"user_id": user.id,
"service_slug": slug,
"result": result,
"total_ms": int((time.perf_counter() - total_started) * 1000),
}
payload.update(phase_ms)
payload.update(extra)
log_event("go_service_timing", **payload)
log_event("session_open_requested", user_id=user.id, service_slug=slug)
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
if not service:
raise HTTPException(status_code=404, detail="Service not found")
if service.type == ServiceType.VNC:
raise HTTPException(status_code=410, detail="VNC services are deprecated")
if not has_access(db, user.id, service.id):
raise HTTPException(status_code=403, detail="ACL denied")
user_lock_started = time.perf_counter()
try:
with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS):
_mark("wait_user_lock_ms", user_lock_started)
t_existing = time.perf_counter()
existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
_mark("check_existing_ms", t_existing)
if existing_user_session:
_emit("reuse_session", session_id=existing_user_session.id)
return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
t_limit = time.perf_counter()
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
active_rows = db.scalars(
select(SessionModel).where(
SessionModel.user_id == user.id,
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
)
).all()
active_rows = sorted(active_rows, key=lambda row: row.created_at)
active_service_ids = {row.service_id for row in active_rows}
_mark("check_limit_ms", t_limit)
if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
oldest = next((row for row in active_rows if row.service_id != service.id), None)
if oldest:
t_rotate = time.perf_counter()
terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True)
db.commit()
_mark("rotate_oldest_ms", t_rotate)
log_event(
"session_rotated",
user_id=user.id,
closed_session_id=oldest.id,
closed_service_id=oldest.service_id,
new_service_id=service.id,
)
else:
_emit("max_services_redirect")
return RedirectResponse(url="/?launch_error=max_services", status_code=303)
if service.type == ServiceType.RDP:
t_rdp_owner = time.perf_counter()
active_owner = find_active_session_for_service(db, service.id)
_mark("check_rdp_owner_ms", t_rdp_owner)
if active_owner:
if active_owner.user_id != user.id:
owner = db.get(User, active_owner.user_id)
owner_name = owner.username if owner else f"id={active_owner.user_id}"
_emit("rdp_busy", owner=owner_name)
raise HTTPException(
status_code=409,
detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
)
_emit("reuse_rdp_session", session_id=active_owner.id)
return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
session_id = str(uuid.uuid4())
if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
try:
t_pool_lock = time.perf_counter()
with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
_mark("wait_web_pool_lock_ms", t_pool_lock)
t_ensure = time.perf_counter()
ensure_web_pool()
_mark("ensure_web_pool_ms", t_ensure)
t_acquire = time.perf_counter()
slot = acquire_web_pool_slot(db)
_mark("acquire_web_slot_ms", t_acquire)
slot_cid = f"WEBPOOLIDX:{slot}"
t_dispatch = time.perf_counter()
terminate_active_slot_sessions(db, slot_cid)
dispatch_web_pool_target(slot, service)
_mark("dispatch_web_target_ms", t_dispatch)
t_commit = time.perf_counter()
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=slot_cid,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
_mark("db_commit_ms", t_commit)
except LockTimeoutError:
_emit("web_pool_lock_timeout")
raise HTTPException(status_code=503, detail="Пул WEB занят. Повторите через несколько секунд.")
except Exception as exc:
logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc))
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
_emit("web_pool_create_failed", error=str(exc))
raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot)
audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
_emit("session_created_web_pool", session_id=session_id, slot=slot)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
if service_uses_universal_pool(service):
try:
t_pool_lock = time.perf_counter()
with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
_mark("wait_universal_pool_lock_ms", t_pool_lock)
t_ensure = time.perf_counter()
ensure_universal_pool()
_mark("ensure_universal_pool_ms", t_ensure)
t_acquire = time.perf_counter()
slot = acquire_universal_slot(db)
_mark("acquire_universal_slot_ms", t_acquire)
slot_cid = f"POOLIDX:{slot}"
t_dispatch = time.perf_counter()
terminate_active_slot_sessions(db, slot_cid)
dispatch_universal_target(slot, service)
_mark("dispatch_universal_target_ms", t_dispatch)
t_commit = time.perf_counter()
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=slot_cid,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
_mark("db_commit_ms", t_commit)
except LockTimeoutError:
_emit("universal_pool_lock_timeout")
raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.")
except Exception as exc:
logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc))
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
_emit("universal_pool_create_failed", error=str(exc))
raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot)
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
_emit("session_created_universal_pool", session_id=session_id, slot=slot)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
t_warm = time.perf_counter()
ensure_warm_pool(service)
open_warm_web_url(service, service.target)
_mark("warm_pool_prepare_ms", t_warm)
t_commit = time.perf_counter()
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=f"POOL:{service.slug}",
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
_mark("db_commit_ms", t_commit)
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool")
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
_emit("session_created_warm_pool", session_id=session_id)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
try:
t_create = time.perf_counter()
container_id = create_runtime_container(service, session_id)
_mark("create_runtime_container_ms", t_create)
except Exception as exc:
logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc))
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
_emit("single_runtime_create_failed", error=str(exc))
raise HTTPException(status_code=502, detail="Session runtime failed to start")
t_commit = time.perf_counter()
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=container_id,
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
_mark("db_commit_ms", t_commit)
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id)
audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
t_wait = time.perf_counter()
ready = wait_for_session_route(session_id)
_mark("wait_session_route_ms", t_wait)
log_event("session_route_ready", session_id=session_id, ready=ready)
_emit("session_created_single_runtime", session_id=session_id, ready=ready)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
except LockTimeoutError:
_emit("user_lock_timeout")
raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.")
@app.get("/svc/{slug}/", response_class=HTMLResponse)
def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
if not service:
raise HTTPException(status_code=404, detail="Service not found")
if not has_access(db, user.id, service.id):
raise HTTPException(status_code=403, detail="ACL denied")
return HTMLResponse(
content="""
<!doctype html>
<html>
<head>
<meta charset='utf-8'>
<title>Service Starting</title>
<style>
body { font-family: sans-serif; background: #f4f6f8; display: grid; place-items: center; height: 100vh; margin: 0; color:#1b3145; }
.card { background: #fff; padding: 1rem 1.2rem; border-radius: 10px; box-shadow: 0 8px 20px rgba(0,0,0,.08); min-width: 340px; }
.title { font-weight: 700; margin-bottom: 0.5rem; }
.state { margin-bottom: 0.6rem; }
ul { margin: 0; padding-left: 1.1rem; }
li { margin: 0.2rem 0; }
</style>
</head>
<body>
<div class="card">
<div class="title">Сервис запускается</div>
<div class="state" id="state">Проверка...</div>
<ul id="steps"></ul>
</div>
<script>
const slug = window.location.pathname.replace(/^\\/svc\\//, '').replace(/\\/$/, '');
async function tick() {
const r = await fetch(`/api/services/${slug}/status`, {credentials:'include'});
if (!r.ok) return;
const data = await r.json();
document.getElementById('state').textContent = data.message || 'Запуск...';
const ul = document.getElementById('steps');
ul.innerHTML = '';
(data.steps || []).forEach((x) => {
const li = document.createElement('li');
li.textContent = x;
ul.appendChild(li);
});
if (data.ready) window.location.replace(`/svc/${slug}/`);
}
setInterval(tick, 1000);
tick();
</script>
</body>
</html>
""".strip(),
status_code=200,
)
@app.get("/s/{session_id}/", response_class=HTMLResponse)
def session_wait_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
sess = db.get(SessionModel, session_id)
if not sess or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="Session not found")
if sess.status != SessionStatus.ACTIVE:
raise HTTPException(status_code=410, detail="Session is not active")
service = db.get(Service, sess.service_id)
service_title = service.name if service else "Сервис"
redirect_target = session_redirect_url(sess)
return HTMLResponse(
content=f"""
<!doctype html>
<html>
<head>
<meta charset='utf-8'>
<title>{service_title}</title>
<style>
body {{ font-family: sans-serif; background: #f4f6f8; display: grid; place-items: center; height: 100vh; margin: 0; color:#1b3145; }}
.card {{ background: #fff; padding: 1rem 1.2rem; border-radius: 10px; box-shadow: 0 8px 20px rgba(0,0,0,.08); min-width: 340px; }}
.title {{ font-weight: 700; margin-bottom: 0.5rem; }}
.state {{ margin-bottom: 0.6rem; }}
ul {{ margin: 0; padding-left: 1.1rem; }}
li {{ margin: 0.2rem 0; }}
</style>
</head>
<body>
<div class="card">
<div class="title">Сессия запускается</div>
<div class="state" id="state">Проверка...</div>
<ul id="steps"></ul>
<small>{session_id}</small>
</div>
<script>
const sessionId = "{session_id}";
async function tick() {{
const r = await fetch(`/api/sessions/${{sessionId}}/status`, {{credentials:'include'}});
if (!r.ok) return;
const data = await r.json();
document.getElementById('state').textContent = data.message || 'Запуск...';
const ul = document.getElementById('steps');
ul.innerHTML = '';
(data.steps || []).forEach((x) => {{
const li = document.createElement('li');
li.textContent = x;
ul.appendChild(li);
}});
if (data.ready) window.location.replace(data.redirect_url || "{redirect_target}");
}}
setInterval(tick, 1000);
tick();
</script>
</body>
</html>
""".strip(),
status_code=200,
)
@app.get("/s/{session_id}/view", response_class=HTMLResponse)
def session_view_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
sess = db.get(SessionModel, session_id)
if not sess or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="Session not found")
if sess.status != SessionStatus.ACTIVE:
raise HTTPException(status_code=410, detail="Session is not active")
service = db.get(Service, sess.service_id)
if not service:
raise HTTPException(status_code=404, detail="Service not found")
iframe_src = None
if sess.container_id and sess.container_id.startswith("POOL:"):
iframe_src = f"/svc/{service.slug}/?sid={session_id}"
elif sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"):
try:
slot = int(sess.container_id.split(":", 1)[1])
iframe_src = f"/w/{slot}/?sid={session_id}"
except Exception:
iframe_src = None
elif sess.container_id and sess.container_id.startswith("POOLIDX:"):
try:
slot = int(sess.container_id.split(":", 1)[1])
iframe_src = f"/u/{slot}/?sid={session_id}"
except Exception:
iframe_src = None
if iframe_src:
return HTMLResponse(
content=f"""
<!doctype html>
<html>
<head>
<meta charset='utf-8'>
<title>{service.name}</title>
<style>
html,body,iframe {{ margin:0; width:100%; height:100%; border:0; background:#0f1720; }}
</style>
</head>
<body>
<iframe src="{iframe_src}" allow="clipboard-read; clipboard-write"></iframe>
</body>
</html>
""".strip()
)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
@app.post("/api/sessions/{session_id}/touch")
def touch_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
sess = db.get(SessionModel, session_id)
if not sess or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="Session not found")
if sess.status != SessionStatus.ACTIVE:
reason = session_closed_reason(sess, db)
log_event(
"session_touch_rejected",
level=logging.WARNING,
session_id=session_id,
user_id=user.id,
status=sess.status.value,
reason=reason,
)
return JSONResponse(
status_code=410,
content={
"ok": False,
"reason": reason,
"status": sess.status.value,
},
)
sess.last_access_at = now_utc()
db.commit()
return {"ok": True}
@app.post("/api/sessions/{session_id}/close")
def close_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
sess = db.get(SessionModel, session_id)
if not sess or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="Session not found")
if sess.status != SessionStatus.ACTIVE:
log_event(
"session_close_already_closed",
session_id=session_id,
user_id=user.id,
status=sess.status.value,
reason=session_closed_reason(sess, db),
)
return {"ok": True, "status": sess.status.value}
terminate_session_record(db, sess, SessionStatus.TERMINATED, stop_container=True)
db.commit()
log_event("session_closed_by_user", session_id=session_id, user_id=user.id)
return {"ok": True, "status": "TERMINATED"}
@app.get("/api/services/{slug}/status")
def service_status(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
if not service:
raise HTTPException(status_code=404, detail="Service not found")
if service.type == ServiceType.VNC:
raise HTTPException(status_code=410, detail="VNC services are deprecated")
if not has_access(db, user.id, service.id):
raise HTTPException(status_code=403, detail="ACL denied")
pool = get_pool_status_for_service(service)
route_ok = route_ready(f"/svc/{slug}/")
ready = route_ok and (pool["running"] > 0 if desired_pool_size(service) > 0 else True)
steps = [
f"ACL: OK ({user.username})",
f"Пул: {pool['running']} / {pool['desired']}",
f"Маршрут /svc/{slug}/: {'OK' if route_ok else 'ожидание'}",
]
return {
"ready": ready,
"message": "Готово, открываем..." if ready else "Поднимаем контейнер и маршрут...",
"steps": steps,
}
@app.get("/api/sessions/{session_id}/status")
def session_status(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
sess = db.get(SessionModel, session_id)
if not sess or sess.user_id != user.id:
raise HTTPException(status_code=404, detail="Session not found")
if sess.status != SessionStatus.ACTIVE:
raise HTTPException(status_code=410, detail="Session is not active")
service = db.get(Service, sess.service_id)
pooled_web = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.WEB)
web_pool_idx = None
universal_pool_idx = None
if sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"):
try:
web_pool_idx = int(sess.container_id.split(":", 1)[1])
except Exception:
web_pool_idx = None
if sess.container_id and sess.container_id.startswith("POOLIDX:"):
try:
universal_pool_idx = int(sess.container_id.split(":", 1)[1])
except Exception:
universal_pool_idx = None
route_path = f"/svc/{service.slug}/" if pooled_web and service else f"/s/{session_id}/"
if web_pool_idx is not None:
route_path = f"/w/{web_pool_idx}/"
if universal_pool_idx is not None:
route_path = f"/u/{universal_pool_idx}/"
route_ok = route_ready(route_path)
running = container_running(sess.container_id)
ready = running and route_ok
steps = [
f"Контейнер: {'running' if running else 'starting'}",
f"Маршрут {route_path}: {'OK' if route_ok else 'ожидание'}",
]
payload = {
"ready": ready,
"message": "Готово, открываем..." if ready else "Запуск сессии...",
"steps": steps,
}
if pooled_web:
payload["redirect_url"] = f"/s/{session_id}/view"
if web_pool_idx is not None:
payload["redirect_url"] = f"/s/{session_id}/view"
if universal_pool_idx is not None:
payload["redirect_url"] = f"/s/{session_id}/view"
return payload
@app.post("/api/admin/services")
def create_service(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
service_type = ServiceType(payload["type"])
if service_type == ServiceType.VNC:
raise HTTPException(status_code=400, detail="VNC services are no longer supported")
target = payload["target"]
if service_type == ServiceType.WEB:
target = normalize_web_target(target)
elif service_type == ServiceType.RDP:
parse_rdp_target(target)
service = Service(
name=payload["name"],
slug=payload["slug"],
type=service_type,
target=target,
comment=payload.get("comment", ""),
active=payload.get("active", True),
warm_pool_size=max(0, int(payload.get("warm_pool_size", 0))),
)
db.add(service)
db.flush()
set_service_categories(db, service.id, payload.get("category_ids", []))
db.commit()
if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
ensure_warm_pool(service)
elif service_uses_universal_pool(service):
ensure_universal_pool()
return {"id": service.id}
@app.get("/api/admin/services/{service_id}/containers/status")
def service_containers_status(service_id: int, _: User = Depends(require_admin), db: Session = Depends(get_db)):
service = db.get(Service, service_id)
if not service:
raise HTTPException(status_code=404, detail="Service not found")
out = get_pool_detailed_status(service)
out["active_sessions"] = get_active_sessions_count(db, service.id)
return out
@app.post("/api/admin/services/{service_id}/icon")
async def upload_service_icon(
service_id: int,
request: Request,
file: UploadFile = File(...),
_: User = Depends(require_admin),
db: Session = Depends(get_db),
):
validate_csrf(request)
service = db.get(Service, service_id)
if not service:
raise HTTPException(status_code=404, detail="Service not found")
new_path = await store_service_icon(service, file)
old_path = service.icon_path
service.icon_path = new_path
db.commit()
if old_path and old_path != new_path:
remove_icon_file(old_path)
return {"ok": True, "icon_path": new_path}
@app.delete("/api/admin/services/{service_id}/icon")
def delete_service_icon(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
service = db.get(Service, service_id)
if not service:
raise HTTPException(status_code=404, detail="Service not found")
old_path = service.icon_path
service.icon_path = ""
db.commit()
remove_icon_file(old_path)
return {"ok": True}
@app.put("/api/admin/services/{service_id}")
def edit_service(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
service = db.get(Service, service_id)
if not service:
raise HTTPException(status_code=404, detail="Service not found")
for key in ["name", "slug", "target", "active", "comment"]:
if key in payload:
setattr(service, key, payload[key])
if "type" in payload:
service.type = ServiceType(payload["type"])
if service.type == ServiceType.VNC:
raise HTTPException(status_code=400, detail="VNC services are no longer supported")
if service.type == ServiceType.WEB:
service.target = normalize_web_target(service.target)
elif service.type == ServiceType.RDP:
parse_rdp_target(service.target)
if "warm_pool_size" in payload:
service.warm_pool_size = max(0, int(payload["warm_pool_size"]))
if "category_ids" in payload:
set_service_categories(db, service.id, payload.get("category_ids", []))
db.commit()
if service.type == ServiceType.WEB:
if WEB_POOL_SIZE <= 0:
ensure_warm_pool(service)
open_warm_web_url(service, service.target)
elif service_uses_universal_pool(service):
ensure_universal_pool()
return {"ok": True}
@app.delete("/api/admin/services/{service_id}")
def delete_service(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
service = db.get(Service, service_id)
if not service:
raise HTTPException(status_code=404, detail="Service not found")
if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
ensure_warm_pool(service, 0)
remove_icon_file(service.icon_path)
db.delete(service)
db.commit()
return {"ok": True}
@app.post("/api/admin/services/{service_id}/prewarm")
def prewarm_now(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
service = db.get(Service, service_id)
if not service:
raise HTTPException(status_code=404, detail="Service not found")
if service.type == ServiceType.WEB:
ensure_web_pool()
return {"ok": True, "pool": get_web_pool_status()}
if service_uses_universal_pool(service):
ensure_universal_pool()
return {"ok": True, "pool": get_universal_pool_status()}
if service.type == ServiceType.RDP:
return {"ok": True, "pool": get_pool_status_for_service(service), "message": "RDP запускается on-demand"}
ensure_warm_pool(service)
return {"ok": True, "pool": get_pool_status_for_service(service)}
@app.post("/api/admin/categories")
def create_category(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
name = (payload.get("name") or "").strip()
slug = (payload.get("slug") or "").strip().lower().replace(" ", "-")
if not name:
raise HTTPException(status_code=400, detail="Category name is required")
if not slug:
raise HTTPException(status_code=400, detail="Category slug is required")
exists = db.scalar(select(Category).where((Category.name == name) | (Category.slug == slug)))
if exists:
raise HTTPException(status_code=409, detail="Category already exists")
category = Category(name=name, slug=slug)
db.add(category)
db.commit()
return {"id": category.id}
@app.delete("/api/admin/categories/{category_id}")
def delete_category(category_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
category = db.get(Category, category_id)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
db.delete(category)
db.commit()
return {"ok": True}
@app.put("/api/admin/web-pool-size")
def update_web_pool_size(payload: dict, request: Request, _: User = Depends(require_admin)):
validate_csrf(request)
global WEB_POOL_SIZE
value = max(0, int(payload.get("size", WEB_POOL_SIZE)))
WEB_POOL_SIZE = value
ensure_web_pool()
return {"ok": True, "size": WEB_POOL_SIZE, "pool": get_web_pool_status()}
@app.post("/api/admin/users")
def create_user(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
expires_at = dt.datetime.fromisoformat(payload["expires_at"])
user = User(
username=payload["username"],
password_hash=hash_password(payload["password"]),
expires_at=expires_at,
active=payload.get("active", True),
is_admin=payload.get("is_admin", False),
)
db.add(user)
db.commit()
return {"id": user.id}
@app.put("/api/admin/users/{user_id}")
def edit_user(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
user = db.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
for key in ["username", "active", "is_admin"]:
if key in payload:
setattr(user, key, payload[key])
if "password" in payload and payload["password"]:
user.password_hash = hash_password(payload["password"])
if "expires_at" in payload:
user.expires_at = dt.datetime.fromisoformat(payload["expires_at"])
db.commit()
return {"ok": True}
@app.delete("/api/admin/users/{user_id}")
def delete_user(user_id: int, request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
user = db.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user.id == admin.id:
raise HTTPException(status_code=400, detail="Cannot delete current admin")
db.delete(user)
db.commit()
return {"ok": True}
@app.put("/api/admin/users/{user_id}/acl")
def set_acl(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
user = db.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
service_ids = set(payload.get("service_ids", []))
existing = db.scalars(select(UserServiceAccess).where(UserServiceAccess.user_id == user_id)).all()
existing_map = {x.service_id: x for x in existing}
for sid in service_ids:
if sid not in existing_map:
db.add(UserServiceAccess(user_id=user_id, service_id=sid))
for sid, row in existing_map.items():
if sid not in service_ids:
db.delete(row)
db.commit()
return {"ok": True}