6f17193312
- dashboard.html: overlay div moved before <script> so getElementById works; double rAF ensures browser paints spinner before navigation - main.py: pooled_rdp route fix — session_status now returns /svc/<slug>/ route and redirect_url for POOL: RDP sessions (was always ready instantly) - docker-compose.yml: parametrise env vars via .env for easier tuning Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2716 lines
104 KiB
Python
2716 lines
104 KiB
Python
import datetime as dt
|
|
import enum
|
|
import fcntl
|
|
import json
|
|
import re
|
|
import logging
|
|
import os
|
|
from pathlib import Path
|
|
import secrets
|
|
import threading
|
|
import time
|
|
import uuid
|
|
import contextvars
|
|
from urllib.parse import parse_qs, unquote, urlparse
|
|
from typing import Optional
|
|
|
|
import docker
|
|
import requests
|
|
from fastapi import Depends, FastAPI, File, Form, HTTPException, Query, Request, UploadFile, status
|
|
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from itsdangerous import BadSignature, URLSafeTimedSerializer
|
|
from markupsafe import Markup, escape
|
|
from passlib.context import CryptContext
|
|
from sqlalchemy import (
|
|
Boolean,
|
|
DateTime,
|
|
Enum,
|
|
ForeignKey,
|
|
Integer,
|
|
String,
|
|
Text,
|
|
UniqueConstraint,
|
|
create_engine,
|
|
select,
|
|
text,
|
|
)
|
|
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, sessionmaker
|
|
|
|
|
|
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+psycopg2://portal:portal@db:5432/portal")
|
|
COOKIE_NAME = "portal_auth"
|
|
CSRF_COOKIE = "csrf_token"
|
|
COOKIE_MAX_AGE = 8 * 60 * 60
|
|
SESSION_IDLE_SECONDS = int(os.getenv("SESSION_IDLE_SECONDS", "7200"))
|
|
PUBLIC_HOST = os.getenv("PUBLIC_HOST", "stend.4mont.ru")
|
|
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
|
|
LOG_SLOW_REQUEST_MS = int(os.getenv("LOG_SLOW_REQUEST_MS", "2000"))
|
|
GO_USER_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_USER_LOCK_TIMEOUT_SECONDS", "8.0"))
|
|
GO_POOL_LOCK_TIMEOUT_SECONDS = float(os.getenv("GO_POOL_LOCK_TIMEOUT_SECONDS", "20.0"))
|
|
POOL_DISPATCH_RETRIES = int(os.getenv("POOL_DISPATCH_RETRIES", "6"))
|
|
POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS = float(os.getenv("POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS", "2.0"))
|
|
POOL_DISPATCH_SLEEP_SECONDS = float(os.getenv("POOL_DISPATCH_SLEEP_SECONDS", "0.3"))
|
|
TRAEFIK_INTERNAL_URL = os.getenv("TRAEFIK_INTERNAL_URL", "http://traefik")
|
|
PREWARM_POOL_SIZE = int(os.getenv("PREWARM_POOL_SIZE", "2"))
|
|
UNIVERSAL_POOL_SIZE = int(os.getenv("UNIVERSAL_POOL_SIZE", "0"))
|
|
WEB_POOL_SIZE = int(os.getenv("WEB_POOL_SIZE", "20"))
|
|
WEB_POOL_BUFFER = int(os.getenv("WEB_POOL_BUFFER", "2"))
|
|
X11VNC_FLAGS = os.getenv("X11VNC_FLAGS", "-wait 5 -defer 5 -threads")
|
|
MAX_ACTIVE_SERVICES_PER_USER = int(os.getenv("MAX_ACTIVE_SERVICES_PER_USER", "4"))
|
|
WEB_RESOLUTION_MIN_WIDTH = int(os.getenv("WEB_RESOLUTION_MIN_WIDTH", "1024"))
|
|
WEB_RESOLUTION_MIN_HEIGHT = int(os.getenv("WEB_RESOLUTION_MIN_HEIGHT", "720"))
|
|
WEB_RESOLUTION_MAX_WIDTH = int(os.getenv("WEB_RESOLUTION_MAX_WIDTH", "3840"))
|
|
WEB_RESOLUTION_MAX_HEIGHT = int(os.getenv("WEB_RESOLUTION_MAX_HEIGHT", "2160"))
|
|
ENABLE_STARTUP_MAINTENANCE = os.getenv("ENABLE_STARTUP_MAINTENANCE", "1") == "1"
|
|
ICON_UPLOAD_MAX_BYTES = 2 * 1024 * 1024
|
|
ICON_UPLOAD_TYPES = {
|
|
"image/png": "png",
|
|
"image/jpeg": "jpg",
|
|
"image/webp": "webp",
|
|
}
|
|
SERVICE_ICONS_DIR = Path("static/service-icons")
|
|
|
|
logging.basicConfig(
|
|
level=LOG_LEVEL,
|
|
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
|
)
|
|
logger = logging.getLogger("portal")
|
|
request_id_ctx = contextvars.ContextVar("request_id", default="-")
|
|
maintenance_lock_file = None
|
|
|
|
|
|
def _normalize_log_value(value):
|
|
if isinstance(value, (str, int, float, bool)) or value is None:
|
|
return value
|
|
if isinstance(value, dt.datetime):
|
|
return value.isoformat()
|
|
return str(value)
|
|
|
|
|
|
def log_event(event: str, level: int = logging.INFO, **fields) -> None:
|
|
payload = {"event": event, "req_id": request_id_ctx.get()}
|
|
for key, value in fields.items():
|
|
payload[key] = _normalize_log_value(value)
|
|
logger.log(level, json.dumps(payload, ensure_ascii=False, separators=(",", ":")))
|
|
|
|
SIGNING_KEY = os.getenv("SIGNING_KEY", secrets.token_urlsafe(32))
|
|
serializer = URLSafeTimedSerializer(SIGNING_KEY, salt="portal-auth")
|
|
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
|
|
|
|
engine = create_engine(DATABASE_URL, pool_pre_ping=True)
|
|
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
|
|
|
templates = Jinja2Templates(directory="templates")
|
|
app = FastAPI(title="МОНТ - инфрастуктурный полигон")
|
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
|
|
@app.middleware("http")
|
|
async def request_logging_middleware(request: Request, call_next):
|
|
req_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
|
|
token = request_id_ctx.set(req_id)
|
|
started = time.time()
|
|
client_ip = request.client.host if request.client else "-"
|
|
user_agent = request.headers.get("user-agent", "-")
|
|
try:
|
|
response = await call_next(request)
|
|
except Exception:
|
|
log_event(
|
|
"request_failed",
|
|
level=logging.ERROR,
|
|
method=request.method,
|
|
path=request.url.path,
|
|
client_ip=client_ip,
|
|
user_agent=user_agent,
|
|
)
|
|
request_id_ctx.reset(token)
|
|
raise
|
|
duration_ms = int((time.time() - started) * 1000)
|
|
level = logging.INFO
|
|
if response.status_code >= 500:
|
|
level = logging.ERROR
|
|
elif response.status_code >= 400:
|
|
level = logging.WARNING
|
|
log_event(
|
|
"request",
|
|
level=level,
|
|
method=request.method,
|
|
path=request.url.path,
|
|
query=str(request.url.query or ""),
|
|
status=response.status_code,
|
|
duration_ms=duration_ms,
|
|
client_ip=client_ip,
|
|
user_agent=user_agent,
|
|
)
|
|
if duration_ms >= LOG_SLOW_REQUEST_MS:
|
|
log_event(
|
|
"slow_request",
|
|
level=logging.WARNING,
|
|
method=request.method,
|
|
path=request.url.path,
|
|
duration_ms=duration_ms,
|
|
threshold_ms=LOG_SLOW_REQUEST_MS,
|
|
)
|
|
response.headers["X-Request-ID"] = req_id
|
|
request_id_ctx.reset(token)
|
|
return response
|
|
|
|
|
|
class Base(DeclarativeBase):
|
|
pass
|
|
|
|
|
|
class ServiceType(str, enum.Enum):
|
|
WEB = "WEB"
|
|
VNC = "VNC"
|
|
RDP = "RDP"
|
|
|
|
|
|
class SessionStatus(str, enum.Enum):
|
|
ACTIVE = "ACTIVE"
|
|
EXPIRED = "EXPIRED"
|
|
TERMINATED = "TERMINATED"
|
|
ROTATED = "ROTATED"
|
|
|
|
|
|
class User(Base):
|
|
__tablename__ = "users"
|
|
|
|
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
|
username: Mapped[str] = mapped_column(String(64), unique=True, index=True)
|
|
password_hash: Mapped[str] = mapped_column(String(255))
|
|
expires_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), index=True)
|
|
active: Mapped[bool] = mapped_column(Boolean, default=True, index=True)
|
|
is_admin: Mapped[bool] = mapped_column(Boolean, default=False)
|
|
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
|
|
|
|
|
|
class Service(Base):
|
|
__tablename__ = "services"
|
|
|
|
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
|
name: Mapped[str] = mapped_column(String(128))
|
|
slug: Mapped[str] = mapped_column(String(64), unique=True, index=True)
|
|
type: Mapped[ServiceType] = mapped_column(Enum(ServiceType), index=True)
|
|
target: Mapped[str] = mapped_column(Text)
|
|
comment: Mapped[str] = mapped_column(Text, default="")
|
|
icon_path: Mapped[str] = mapped_column(Text, default="")
|
|
active: Mapped[bool] = mapped_column(Boolean, default=True)
|
|
warm_pool_size: Mapped[int] = mapped_column(Integer, default=0)
|
|
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
|
|
|
|
|
|
class Category(Base):
|
|
__tablename__ = "categories"
|
|
|
|
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
|
name: Mapped[str] = mapped_column(String(128), unique=True, index=True)
|
|
slug: Mapped[str] = mapped_column(String(64), unique=True, index=True)
|
|
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
|
|
|
|
|
|
class ServiceCategory(Base):
|
|
__tablename__ = "service_categories"
|
|
__table_args__ = (UniqueConstraint("service_id", "category_id", name="uq_service_category"),)
|
|
|
|
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
|
service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
|
|
category_id: Mapped[int] = mapped_column(ForeignKey("categories.id", ondelete="CASCADE"), index=True)
|
|
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
|
|
|
|
|
|
class UserServiceAccess(Base):
|
|
__tablename__ = "user_service_access"
|
|
__table_args__ = (UniqueConstraint("user_id", "service_id", name="uq_user_service"),)
|
|
|
|
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
|
user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True)
|
|
service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
|
|
granted_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
|
|
|
|
|
|
class SessionModel(Base):
|
|
__tablename__ = "sessions"
|
|
|
|
id: Mapped[str] = mapped_column(String(36), primary_key=True)
|
|
user_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"), index=True)
|
|
service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
|
|
status: Mapped[SessionStatus] = mapped_column(Enum(SessionStatus), default=SessionStatus.ACTIVE, index=True)
|
|
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
|
|
last_access_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
|
|
container_id: Mapped[Optional[str]] = mapped_column(String(128), nullable=True)
|
|
|
|
|
|
class AuditLog(Base):
|
|
__tablename__ = "audit_logs"
|
|
|
|
id: Mapped[int] = mapped_column(Integer, primary_key=True)
|
|
user_id: Mapped[Optional[int]] = mapped_column(Integer, nullable=True, index=True)
|
|
action: Mapped[str] = mapped_column(String(128), index=True)
|
|
details: Mapped[str] = mapped_column(Text)
|
|
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc), index=True)
|
|
|
|
|
|
def now_utc() -> dt.datetime:
|
|
return dt.datetime.now(dt.timezone.utc)
|
|
|
|
|
|
def session_closed_reason(sess: SessionModel, db: Session) -> str:
|
|
if not sess:
|
|
return "idle"
|
|
if sess.status == SessionStatus.EXPIRED:
|
|
return "idle"
|
|
if sess.status == SessionStatus.ROTATED:
|
|
return "limit"
|
|
if sess.status == SessionStatus.TERMINATED:
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
active_rows = db.scalars(
|
|
select(SessionModel).where(
|
|
SessionModel.user_id == sess.user_id,
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.last_access_at >= cutoff,
|
|
)
|
|
).all()
|
|
active_service_ids = {row.service_id for row in active_rows}
|
|
if len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER and sess.service_id not in active_service_ids:
|
|
return "limit"
|
|
return "idle"
|
|
|
|
|
|
def normalize_web_target(url: str) -> str:
|
|
raw = (url or "").strip()
|
|
if not raw:
|
|
return raw
|
|
if raw.startswith(("http://", "https://")):
|
|
return raw
|
|
return f"http://{raw}"
|
|
|
|
|
|
def format_service_comment(raw_text: str) -> Markup:
|
|
raw = (raw_text or "").replace("\r\n", "\n").replace("\r", "\n").strip()
|
|
if not raw:
|
|
return Markup("")
|
|
escaped = str(escape(raw))
|
|
# Support pasted/plain markdown-like bold fragments.
|
|
escaped = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped, flags=re.DOTALL)
|
|
# Allow a small safe subset of pasted HTML tags.
|
|
replacements = {
|
|
"<b>": "<b>",
|
|
"</b>": "</b>",
|
|
"<strong>": "<strong>",
|
|
"</strong>": "</strong>",
|
|
"<i>": "<i>",
|
|
"</i>": "</i>",
|
|
"<em>": "<em>",
|
|
"</em>": "</em>",
|
|
"<u>": "<u>",
|
|
"</u>": "</u>",
|
|
"<br>": "<br>",
|
|
"<br/>": "<br>",
|
|
"<br />": "<br>",
|
|
}
|
|
for src, dst in replacements.items():
|
|
escaped = escaped.replace(src, dst)
|
|
escaped = escaped.replace("\n", "<br>")
|
|
return Markup(escaped)
|
|
|
|
|
|
def parse_rdp_target(target: str) -> dict:
|
|
raw = (target or "").strip()
|
|
if not raw:
|
|
raise HTTPException(status_code=400, detail="Empty RDP target")
|
|
|
|
parsed = urlparse(raw if "://" in raw else f"//{raw}")
|
|
host = parsed.hostname
|
|
if not host:
|
|
raise HTTPException(status_code=400, detail="Invalid RDP target. Use host:port or rdp://user:pass@host:port")
|
|
port = parsed.port or 3389
|
|
|
|
username = unquote(parsed.username) if parsed.username else ""
|
|
password = unquote(parsed.password) if parsed.password else ""
|
|
|
|
query = parse_qs(parsed.query or "")
|
|
if not username:
|
|
username = (query.get("u", [""])[0] or query.get("user", [""])[0] or "").strip()
|
|
if not password:
|
|
password = (query.get("p", [""])[0] or query.get("password", [""])[0] or "").strip()
|
|
|
|
domain = (query.get("domain", [""])[0] or query.get("d", [""])[0] or "").strip()
|
|
security = (query.get("sec", [""])[0] or query.get("security", [""])[0] or "").strip().lower()
|
|
if security and security not in {"nla", "tls", "rdp"}:
|
|
raise HTTPException(status_code=400, detail="Invalid RDP security. Use one of: nla, tls, rdp")
|
|
|
|
return {
|
|
"host": host,
|
|
"port": str(port),
|
|
"user": username,
|
|
"password": password,
|
|
"domain": domain,
|
|
"security": security,
|
|
}
|
|
|
|
|
|
def set_service_categories(db: Session, service_id: int, category_ids: list[int]) -> None:
|
|
normalized = sorted({int(x) for x in (category_ids or [])})
|
|
if normalized:
|
|
existing_ids = set(db.scalars(select(Category.id).where(Category.id.in_(normalized))).all())
|
|
missing = sorted(set(normalized) - existing_ids)
|
|
if missing:
|
|
raise HTTPException(status_code=400, detail=f"Unknown category ids: {missing}")
|
|
|
|
existing_links = db.scalars(select(ServiceCategory).where(ServiceCategory.service_id == service_id)).all()
|
|
current = {row.category_id: row for row in existing_links}
|
|
wanted = set(normalized)
|
|
|
|
for cat_id in wanted:
|
|
if cat_id not in current:
|
|
db.add(ServiceCategory(service_id=service_id, category_id=cat_id))
|
|
for cat_id, row in current.items():
|
|
if cat_id not in wanted:
|
|
db.delete(row)
|
|
|
|
|
|
def service_uses_universal_pool(service: Service) -> bool:
|
|
return UNIVERSAL_POOL_SIZE > 0 and service.type == ServiceType.RDP
|
|
|
|
|
|
def universal_container_name(slot: int) -> str:
|
|
return f"portal-universal-{slot}"
|
|
|
|
|
|
def web_pool_container_name(slot: int) -> str:
|
|
return f"portal-webpool-{slot}"
|
|
|
|
|
|
def ensure_icons_dir() -> None:
|
|
SERVICE_ICONS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def remove_icon_file(icon_path: str) -> None:
|
|
if not icon_path or not icon_path.startswith("/static/service-icons/"):
|
|
return
|
|
filename = icon_path.rsplit("/", 1)[-1]
|
|
candidate = SERVICE_ICONS_DIR / filename
|
|
try:
|
|
candidate.unlink(missing_ok=True)
|
|
except Exception:
|
|
logger.exception("icon_delete_failed path=%s", candidate)
|
|
|
|
|
|
async def store_service_icon(service: Service, upload: UploadFile) -> str:
|
|
ensure_icons_dir()
|
|
content_type = (upload.content_type or "").lower().strip()
|
|
ext = ICON_UPLOAD_TYPES.get(content_type)
|
|
if not ext:
|
|
raise HTTPException(status_code=400, detail="Unsupported file type. Use PNG/JPG/WEBP")
|
|
|
|
payload = await upload.read(ICON_UPLOAD_MAX_BYTES + 1)
|
|
if len(payload) > ICON_UPLOAD_MAX_BYTES:
|
|
raise HTTPException(status_code=400, detail="File too large. Max 2MB")
|
|
if not payload:
|
|
raise HTTPException(status_code=400, detail="Empty file")
|
|
|
|
stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%d_%H%M%S")
|
|
filename = f"svc_{service.id}_{stamp}.{ext}"
|
|
target = SERVICE_ICONS_DIR / filename
|
|
target.write_bytes(payload)
|
|
return f"/static/service-icons/{filename}"
|
|
|
|
|
|
def get_db():
|
|
db = SessionLocal()
|
|
try:
|
|
yield db
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def audit(db: Session, action: str, details: str, user_id: Optional[int] = None) -> None:
|
|
db.add(AuditLog(user_id=user_id, action=action, details=details))
|
|
db.commit()
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
return pwd_context.hash(password)
|
|
|
|
|
|
def verify_password(password: str, password_hash: str) -> bool:
|
|
return pwd_context.verify(password, password_hash)
|
|
|
|
|
|
def user_is_valid(user: User) -> bool:
|
|
return bool(user.active and user.expires_at > now_utc())
|
|
|
|
|
|
def issue_auth_cookie(response: RedirectResponse, user: User) -> None:
|
|
token = serializer.dumps({"user_id": user.id})
|
|
response.set_cookie(
|
|
key=COOKIE_NAME,
|
|
value=token,
|
|
httponly=True,
|
|
secure=True,
|
|
samesite="strict",
|
|
max_age=COOKIE_MAX_AGE,
|
|
path="/",
|
|
)
|
|
|
|
|
|
def issue_csrf_cookie(response: RedirectResponse) -> str:
|
|
token = secrets.token_urlsafe(24)
|
|
response.set_cookie(
|
|
key=CSRF_COOKIE,
|
|
value=token,
|
|
httponly=False,
|
|
secure=True,
|
|
samesite="strict",
|
|
max_age=COOKIE_MAX_AGE,
|
|
path="/",
|
|
)
|
|
return token
|
|
|
|
|
|
def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]:
|
|
raw = request.cookies.get(COOKIE_NAME)
|
|
if not raw:
|
|
return None
|
|
try:
|
|
payload = serializer.loads(raw, max_age=COOKIE_MAX_AGE)
|
|
except BadSignature:
|
|
return None
|
|
user = db.get(User, int(payload["user_id"]))
|
|
if not user or not user_is_valid(user):
|
|
return None
|
|
return user
|
|
|
|
|
|
def require_user(user: Optional[User] = Depends(get_current_user)) -> User:
|
|
if not user:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized")
|
|
return user
|
|
|
|
|
|
def require_admin(user: User = Depends(require_user)) -> User:
|
|
if not user.is_admin:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin only")
|
|
return user
|
|
|
|
|
|
def validate_csrf(request: Request) -> None:
|
|
cookie = request.cookies.get(CSRF_COOKIE)
|
|
form_val = request.headers.get("X-CSRF-Token")
|
|
if request.headers.get("content-type", "").startswith("application/x-www-form-urlencoded"):
|
|
return
|
|
if not cookie or not form_val or cookie != form_val:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="CSRF failed")
|
|
|
|
|
|
def has_access(db: Session, user_id: int, service_id: int) -> bool:
|
|
q = select(UserServiceAccess).where(
|
|
UserServiceAccess.user_id == user_id,
|
|
UserServiceAccess.service_id == service_id,
|
|
)
|
|
return db.scalar(q) is not None
|
|
|
|
|
|
def docker_client():
|
|
return docker.from_env()
|
|
|
|
|
|
def session_router_name(session_id: str) -> str:
|
|
return f"sess-{session_id.replace('-', '')[:16]}"
|
|
|
|
|
|
def _is_pool_name_conflict(exc: Exception) -> bool:
|
|
msg = str(exc).lower()
|
|
return ("already in use" in msg) or ("marked for removal" in msg)
|
|
|
|
|
|
def _remove_container_by_name(d, name: str) -> None:
|
|
try:
|
|
old = d.containers.get(name)
|
|
old.remove(force=True)
|
|
except docker.errors.NotFound:
|
|
return
|
|
except Exception:
|
|
logger.exception("pool_container_remove_failed name=%s", name)
|
|
|
|
|
|
def ensure_universal_pool() -> None:
|
|
if UNIVERSAL_POOL_SIZE <= 0:
|
|
return
|
|
d = docker_client()
|
|
image = "portal-universal-runtime:latest"
|
|
|
|
for i in range(UNIVERSAL_POOL_SIZE, 100):
|
|
name = universal_container_name(i)
|
|
try:
|
|
c = d.containers.get(name)
|
|
c.stop(timeout=5)
|
|
except docker.errors.NotFound:
|
|
break
|
|
except Exception:
|
|
logger.exception("universal_pool_scale_down_failed slot=%s", i)
|
|
|
|
for i in range(UNIVERSAL_POOL_SIZE):
|
|
name = universal_container_name(i)
|
|
path = f"/u/{i}/"
|
|
router = f"upool-{i}"
|
|
labels = {
|
|
"traefik.enable": "true",
|
|
"traefik.docker.network": "portal_net",
|
|
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
|
|
f"traefik.http.routers.{router}.entrypoints": "websecure",
|
|
f"traefik.http.routers.{router}.tls": "true",
|
|
f"traefik.http.routers.{router}.priority": "9400",
|
|
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
|
|
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
|
|
f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
|
|
"portal.pool": "1",
|
|
"portal.pool.slot": str(i),
|
|
}
|
|
env = {
|
|
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
|
|
"ENABLE_HEARTBEAT": "0",
|
|
"SESSION_ID": f"universal-{i}",
|
|
"X11VNC_FLAGS": X11VNC_FLAGS,
|
|
}
|
|
try:
|
|
c = d.containers.get(name)
|
|
if c.status != "running":
|
|
c.start()
|
|
continue
|
|
except docker.errors.NotFound:
|
|
pass
|
|
except Exception:
|
|
logger.exception("universal_pool_check_failed slot=%s", i)
|
|
continue
|
|
|
|
d.containers.run(
|
|
image=image,
|
|
name=name,
|
|
detach=True,
|
|
auto_remove=True,
|
|
network="portal_net",
|
|
labels=labels,
|
|
environment=env,
|
|
)
|
|
logger.info("universal_pool_container_started slot=%s", i)
|
|
|
|
|
|
def ensure_web_pool(target_size: Optional[int] = None) -> None:
|
|
desired = max(0, WEB_POOL_SIZE if target_size is None else target_size)
|
|
d = docker_client()
|
|
image = "portal-universal-runtime:latest"
|
|
|
|
for i in range(desired, 100):
|
|
name = web_pool_container_name(i)
|
|
try:
|
|
c = d.containers.get(name)
|
|
c.stop(timeout=5)
|
|
except docker.errors.NotFound:
|
|
break
|
|
except Exception:
|
|
logger.exception("web_pool_scale_down_failed slot=%s", i)
|
|
|
|
for i in range(desired):
|
|
name = web_pool_container_name(i)
|
|
path = f"/w/{i}/"
|
|
router = f"wpool-{i}"
|
|
labels = {
|
|
"traefik.enable": "true",
|
|
"traefik.docker.network": "portal_net",
|
|
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
|
|
f"traefik.http.routers.{router}.entrypoints": "websecure",
|
|
f"traefik.http.routers.{router}.tls": "true",
|
|
f"traefik.http.routers.{router}.priority": "9450",
|
|
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
|
|
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
|
|
f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
|
|
"portal.pool": "1",
|
|
"portal.pool.kind": "web",
|
|
"portal.pool.slot": str(i),
|
|
}
|
|
env = {
|
|
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
|
|
"ENABLE_HEARTBEAT": "0",
|
|
"SESSION_ID": f"webpool-{i}",
|
|
"X11VNC_FLAGS": X11VNC_FLAGS,
|
|
}
|
|
should_create = False
|
|
try:
|
|
c = d.containers.get(name)
|
|
if c.status != "running":
|
|
try:
|
|
c.start()
|
|
except docker.errors.APIError as exc:
|
|
if _is_pool_name_conflict(exc):
|
|
logger.warning("web_pool_recreate_needed slot=%s reason=name-conflict", i)
|
|
_remove_container_by_name(d, name)
|
|
should_create = True
|
|
else:
|
|
raise
|
|
if not should_create:
|
|
continue
|
|
except docker.errors.NotFound:
|
|
should_create = True
|
|
except Exception:
|
|
logger.exception("web_pool_check_failed slot=%s", i)
|
|
continue
|
|
|
|
for attempt in range(3):
|
|
try:
|
|
d.containers.run(
|
|
image=image,
|
|
name=name,
|
|
detach=True,
|
|
auto_remove=True,
|
|
network="portal_net",
|
|
labels=labels,
|
|
environment=env,
|
|
)
|
|
logger.info("web_pool_container_started slot=%s", i)
|
|
break
|
|
except docker.errors.APIError as exc:
|
|
if _is_pool_name_conflict(exc) and attempt < 2:
|
|
logger.warning("web_pool_run_conflict_retry slot=%s attempt=%s", i, attempt + 1)
|
|
_remove_container_by_name(d, name)
|
|
time.sleep(0.25)
|
|
continue
|
|
logger.exception("web_pool_run_failed slot=%s", i)
|
|
break
|
|
|
|
|
|
def get_universal_pool_status() -> dict:
|
|
desired = max(0, UNIVERSAL_POOL_SIZE)
|
|
if desired <= 0:
|
|
return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
|
|
d = docker_client()
|
|
names = [universal_container_name(i) for i in range(desired)]
|
|
containers = []
|
|
for name in names:
|
|
try:
|
|
containers.append(d.containers.get(name))
|
|
except Exception:
|
|
continue
|
|
running = sum(1 for c in containers if c.status == "running")
|
|
health = "ok" if running >= min(desired, 1) else "down"
|
|
return {
|
|
"desired": desired,
|
|
"running": running,
|
|
"total": len(containers),
|
|
"names": sorted(c.name for c in containers),
|
|
"health": health,
|
|
}
|
|
|
|
|
|
def get_web_pool_status() -> dict:
|
|
desired = max(0, WEB_POOL_SIZE)
|
|
if desired <= 0:
|
|
return {"desired": 0, "running": 0, "total": 0, "health": "down", "names": []}
|
|
d = docker_client()
|
|
names = [web_pool_container_name(i) for i in range(desired)]
|
|
containers = []
|
|
for name in names:
|
|
try:
|
|
containers.append(d.containers.get(name))
|
|
except Exception:
|
|
continue
|
|
running = sum(1 for c in containers if c.status == "running")
|
|
health = "ok" if running >= min(desired, 1) else "down"
|
|
return {
|
|
"desired": desired,
|
|
"running": running,
|
|
"total": len(containers),
|
|
"names": sorted(c.name for c in containers),
|
|
"health": health,
|
|
}
|
|
|
|
|
|
def acquire_universal_slot(db: Session) -> int:
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
q = select(SessionModel).where(
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.container_id.like("POOLIDX:%"),
|
|
SessionModel.last_access_at >= cutoff,
|
|
)
|
|
active = db.scalars(q).all()
|
|
busy = set()
|
|
for sess in active:
|
|
try:
|
|
busy.add(int((sess.container_id or "").split(":", 1)[1]))
|
|
except Exception:
|
|
continue
|
|
for i in range(max(0, UNIVERSAL_POOL_SIZE)):
|
|
if i not in busy:
|
|
return i
|
|
if active:
|
|
victim = min(active, key=lambda s: s.last_access_at)
|
|
victim.status = SessionStatus.TERMINATED
|
|
db.commit()
|
|
try:
|
|
return int((victim.container_id or "").split(":", 1)[1])
|
|
except Exception:
|
|
pass
|
|
return 0
|
|
|
|
|
|
def acquire_web_pool_slot(db: Session) -> int:
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
q = select(SessionModel).where(
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.container_id.like("WEBPOOLIDX:%"),
|
|
SessionModel.last_access_at >= cutoff,
|
|
)
|
|
active = db.scalars(q).all()
|
|
busy = set()
|
|
for sess in active:
|
|
try:
|
|
busy.add(int((sess.container_id or "").split(":", 1)[1]))
|
|
except Exception:
|
|
continue
|
|
|
|
# Keep headroom: when active sessions are close to hot pool capacity,
|
|
# proactively warm up extra slots.
|
|
auto_target = max(WEB_POOL_SIZE, len(active) + max(0, WEB_POOL_BUFFER))
|
|
if auto_target > WEB_POOL_SIZE:
|
|
ensure_web_pool(auto_target)
|
|
|
|
for i in range(max(0, auto_target)):
|
|
if i not in busy:
|
|
return i
|
|
return 0
|
|
|
|
|
|
def sanitize_client_resolution(width: Optional[int], height: Optional[int]) -> tuple[Optional[int], Optional[int]]:
|
|
if width is None or height is None:
|
|
return None, None
|
|
clamped_width = max(WEB_RESOLUTION_MIN_WIDTH, min(int(width), WEB_RESOLUTION_MAX_WIDTH))
|
|
clamped_height = max(WEB_RESOLUTION_MIN_HEIGHT, min(int(height), WEB_RESOLUTION_MAX_HEIGHT))
|
|
return clamped_width, clamped_height
|
|
|
|
|
|
def dispatch_universal_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None:
|
|
name = universal_container_name(slot)
|
|
url = ""
|
|
payload = {}
|
|
if service.type == ServiceType.WEB:
|
|
url = f"http://{name}:7000/open"
|
|
payload = {"url": normalize_web_target(service.target)}
|
|
width, height = sanitize_client_resolution(width, height)
|
|
if width and height:
|
|
payload["width"] = width
|
|
payload["height"] = height
|
|
elif service.type == ServiceType.RDP:
|
|
cfg = parse_rdp_target(service.target)
|
|
url = f"http://{name}:7000/rdp"
|
|
payload = {
|
|
"host": cfg["host"],
|
|
"port": cfg["port"],
|
|
"user": cfg["user"],
|
|
"password": cfg["password"],
|
|
"domain": cfg["domain"],
|
|
"security": cfg["security"],
|
|
}
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Universal pool supports WEB/RDP only")
|
|
|
|
last_exc = None
|
|
for _ in range(max(1, POOL_DISPATCH_RETRIES)):
|
|
try:
|
|
resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
|
|
resp.raise_for_status()
|
|
return
|
|
except Exception as exc:
|
|
last_exc = exc
|
|
time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
|
|
if last_exc:
|
|
raise last_exc
|
|
|
|
|
|
def dispatch_web_pool_target(slot: int, service: Service, width: Optional[int] = None, height: Optional[int] = None) -> None:
|
|
name = web_pool_container_name(slot)
|
|
target_url = normalize_web_target(service.target)
|
|
url = f"http://{name}:7000/open"
|
|
payload = {"url": target_url}
|
|
width, height = sanitize_client_resolution(width, height)
|
|
if width and height:
|
|
payload["width"] = width
|
|
payload["height"] = height
|
|
last_exc = None
|
|
for _ in range(max(1, POOL_DISPATCH_RETRIES)):
|
|
try:
|
|
resp = requests.post(url, json=payload, timeout=POOL_DISPATCH_REQUEST_TIMEOUT_SECONDS)
|
|
resp.raise_for_status()
|
|
return
|
|
except Exception as exc:
|
|
last_exc = exc
|
|
time.sleep(max(0.0, POOL_DISPATCH_SLEEP_SECONDS))
|
|
if last_exc:
|
|
raise last_exc
|
|
|
|
|
|
def create_runtime_container(service: Service, session_id: str):
|
|
d = docker_client()
|
|
router = session_router_name(session_id)
|
|
path = f"/s/{session_id}/"
|
|
labels = {
|
|
"traefik.enable": "true",
|
|
"traefik.docker.network": "portal_net",
|
|
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
|
|
f"traefik.http.routers.{router}.entrypoints": "websecure",
|
|
f"traefik.http.routers.{router}.tls": "true",
|
|
f"traefik.http.routers.{router}.priority": "10000",
|
|
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
|
|
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
|
|
f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
|
|
}
|
|
|
|
env = {
|
|
"SESSION_ID": session_id,
|
|
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
|
|
"ENABLE_HEARTBEAT": "1",
|
|
"TOUCH_PATH": f"/api/sessions/{session_id}/touch",
|
|
"X11VNC_FLAGS": X11VNC_FLAGS,
|
|
}
|
|
image = "portal-kiosk:latest"
|
|
|
|
if service.type == ServiceType.WEB:
|
|
env["TARGET_URL"] = service.target
|
|
env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
|
|
elif service.type == ServiceType.RDP:
|
|
image = "portal-rdp-proxy:latest"
|
|
cfg = parse_rdp_target(service.target)
|
|
env["RDP_HOST"] = cfg["host"]
|
|
env["RDP_PORT"] = cfg["port"]
|
|
if cfg["user"]:
|
|
env["RDP_USER"] = cfg["user"]
|
|
if cfg["password"]:
|
|
env["RDP_PASSWORD"] = cfg["password"]
|
|
if cfg["domain"]:
|
|
env["RDP_DOMAIN"] = cfg["domain"]
|
|
if cfg["security"]:
|
|
env["RDP_SECURITY"] = cfg["security"]
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Unsupported service type")
|
|
|
|
container = d.containers.run(
|
|
image=image,
|
|
name=f"portal-sess-{session_id[:8]}",
|
|
detach=True,
|
|
auto_remove=True,
|
|
network="portal_net",
|
|
labels=labels,
|
|
environment=env,
|
|
)
|
|
logger.info("session_container_started session_id=%s container_id=%s service_type=%s", session_id, container.id, service.type.value)
|
|
return container.id
|
|
|
|
|
|
def ensure_warm_pool(service: Service, pool_size: Optional[int] = None) -> None:
|
|
if service_uses_universal_pool(service):
|
|
return
|
|
if pool_size is None:
|
|
pool_size = desired_pool_size(service)
|
|
if pool_size <= 0:
|
|
# Stop stale warm containers for this service when pool is disabled.
|
|
prefix = f"portal-warm-{service.slug}-"
|
|
try:
|
|
d = docker_client()
|
|
for c in d.containers.list(all=True, filters={"name": prefix}):
|
|
if c.name.startswith(prefix):
|
|
c.stop(timeout=5)
|
|
except Exception:
|
|
logger.exception("warm_pool_disable_failed service=%s", service.slug)
|
|
return
|
|
d = docker_client()
|
|
router = f"warm-{service.slug}"
|
|
svc_name = f"warmsvc-{service.slug}"
|
|
path = f"/svc/{service.slug}/"
|
|
image = "portal-kiosk:latest"
|
|
base_env = {
|
|
"IDLE_TIMEOUT": str(SESSION_IDLE_SECONDS),
|
|
"ENABLE_HEARTBEAT": "0",
|
|
"TOUCH_PATH": "",
|
|
"X11VNC_FLAGS": X11VNC_FLAGS,
|
|
}
|
|
if service.type == ServiceType.WEB:
|
|
base_env["UNIVERSAL_WEB"] = "1"
|
|
base_env["START_URL"] = normalize_web_target(service.target)
|
|
base_env["HOME_URL"] = f"https://{PUBLIC_HOST}/"
|
|
elif service.type == ServiceType.RDP:
|
|
image = "portal-rdp-proxy:latest"
|
|
cfg = parse_rdp_target(service.target)
|
|
base_env["RDP_HOST"] = cfg["host"]
|
|
base_env["RDP_PORT"] = cfg["port"]
|
|
if cfg["user"]:
|
|
base_env["RDP_USER"] = cfg["user"]
|
|
if cfg["password"]:
|
|
base_env["RDP_PASSWORD"] = cfg["password"]
|
|
if cfg["domain"]:
|
|
base_env["RDP_DOMAIN"] = cfg["domain"]
|
|
if cfg["security"]:
|
|
base_env["RDP_SECURITY"] = cfg["security"]
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Unsupported service type")
|
|
|
|
labels = {
|
|
"traefik.enable": "true",
|
|
"traefik.docker.network": "portal_net",
|
|
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
|
|
f"traefik.http.routers.{router}.entrypoints": "websecure",
|
|
f"traefik.http.routers.{router}.tls": "true",
|
|
f"traefik.http.routers.{router}.priority": "9500",
|
|
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
|
|
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
|
|
f"traefik.http.services.{svc_name}.loadbalancer.server.port": "6080",
|
|
f"traefik.http.routers.{router}.service": svc_name,
|
|
"portal.warm": "1",
|
|
"portal.service.slug": service.slug,
|
|
"portal.service.type": service.type.value,
|
|
}
|
|
|
|
# Ensure desired cardinality.
|
|
for i in range(pool_size, 50):
|
|
name = f"portal-warm-{service.slug}-{i}"
|
|
try:
|
|
c = d.containers.get(name)
|
|
c.stop(timeout=5)
|
|
except docker.errors.NotFound:
|
|
break
|
|
except Exception:
|
|
logger.exception("warm_pool_scale_down_failed service=%s idx=%s", service.slug, i)
|
|
|
|
for i in range(pool_size):
|
|
name = f"portal-warm-{service.slug}-{i}"
|
|
try:
|
|
c = d.containers.get(name)
|
|
if c.status != "running":
|
|
c.start()
|
|
continue
|
|
except docker.errors.NotFound:
|
|
pass
|
|
except Exception:
|
|
logger.exception("warm_pool_check_failed service=%s idx=%s", service.slug, i)
|
|
continue
|
|
|
|
env = dict(base_env)
|
|
env["SESSION_ID"] = f"warm-{service.slug}-{i}"
|
|
d.containers.run(
|
|
image=image,
|
|
name=name,
|
|
detach=True,
|
|
auto_remove=True,
|
|
network="portal_net",
|
|
labels=labels,
|
|
environment=env,
|
|
)
|
|
logger.info("warm_pool_container_started service=%s idx=%s", service.slug, i)
|
|
|
|
|
|
def wait_for_session_route(session_id: str, timeout_seconds: int = 6) -> bool:
|
|
target = f"{TRAEFIK_INTERNAL_URL}/s/{session_id}/"
|
|
deadline = time.time() + timeout_seconds
|
|
while time.time() < deadline:
|
|
try:
|
|
resp = requests.get(
|
|
target,
|
|
headers={"Host": PUBLIC_HOST},
|
|
allow_redirects=False,
|
|
timeout=1.5,
|
|
)
|
|
if resp.status_code != 404:
|
|
return True
|
|
except Exception:
|
|
pass
|
|
time.sleep(0.3)
|
|
return False
|
|
|
|
|
|
def route_ready(path: str) -> bool:
|
|
bases = [TRAEFIK_INTERNAL_URL]
|
|
if TRAEFIK_INTERNAL_URL.startswith("http://"):
|
|
bases.append("https://" + TRAEFIK_INTERNAL_URL[len("http://"):])
|
|
for base in bases:
|
|
try:
|
|
verify = not base.startswith("https://")
|
|
resp = requests.get(
|
|
f"{base}{path}",
|
|
headers={"Host": PUBLIC_HOST},
|
|
allow_redirects=False,
|
|
timeout=1.5,
|
|
verify=verify,
|
|
)
|
|
if resp.status_code != 404:
|
|
return True
|
|
except Exception:
|
|
continue
|
|
return False
|
|
|
|
|
|
def container_running(container_id: Optional[str]) -> bool:
|
|
if not container_id:
|
|
return False
|
|
if (
|
|
container_id.startswith("POOL:")
|
|
or container_id.startswith("POOLIDX:")
|
|
or container_id.startswith("WEBPOOLIDX:")
|
|
):
|
|
return True
|
|
try:
|
|
c = docker_client().containers.get(container_id)
|
|
return c.status == "running"
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def stop_runtime_container(container_id: Optional[str]) -> None:
|
|
if not container_id:
|
|
return
|
|
try:
|
|
d = docker_client()
|
|
c = d.containers.get(container_id)
|
|
c.stop(timeout=5)
|
|
except Exception:
|
|
logger.exception("session_container_stop_failed container_id=%s", container_id)
|
|
|
|
|
|
def terminate_session_record(
|
|
db: Session,
|
|
sess: SessionModel,
|
|
new_status: SessionStatus = SessionStatus.TERMINATED,
|
|
*,
|
|
stop_container: bool = True,
|
|
) -> None:
|
|
if not sess or sess.status != SessionStatus.ACTIVE:
|
|
return
|
|
old_status = sess.status
|
|
cid = sess.container_id or ""
|
|
if stop_container and cid and not cid.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:")):
|
|
stop_runtime_container(cid)
|
|
sess.status = new_status
|
|
sess.last_access_at = now_utc()
|
|
log_event(
|
|
"session_closed",
|
|
level=logging.INFO,
|
|
session_id=sess.id,
|
|
user_id=sess.user_id,
|
|
service_id=sess.service_id,
|
|
container_id=cid,
|
|
old_status=old_status.value if isinstance(old_status, SessionStatus) else str(old_status),
|
|
new_status=new_status.value,
|
|
reason=session_closed_reason(sess, db),
|
|
stop_container=stop_container,
|
|
)
|
|
|
|
|
|
def ensure_schema_compatibility() -> None:
|
|
# PostgreSQL requires enum value addition to be committed before usage in constraints.
|
|
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
|
|
conn.execute(
|
|
text(
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
BEGIN
|
|
ALTER TYPE servicetype ADD VALUE IF NOT EXISTS 'RDP';
|
|
EXCEPTION WHEN undefined_object THEN
|
|
NULL;
|
|
END;
|
|
END $$;
|
|
"""
|
|
)
|
|
)
|
|
conn.execute(
|
|
text(
|
|
"""
|
|
DO $$
|
|
BEGIN
|
|
BEGIN
|
|
ALTER TYPE sessionstatus ADD VALUE IF NOT EXISTS 'ROTATED';
|
|
EXCEPTION WHEN undefined_object THEN
|
|
NULL;
|
|
END;
|
|
END $$;
|
|
"""
|
|
)
|
|
)
|
|
|
|
with engine.begin() as conn:
|
|
conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS warm_pool_size INTEGER NOT NULL DEFAULT 0"))
|
|
conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS comment TEXT NOT NULL DEFAULT ''"))
|
|
conn.execute(text("ALTER TABLE services ADD COLUMN IF NOT EXISTS icon_path TEXT NOT NULL DEFAULT ''"))
|
|
conn.execute(
|
|
text(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS categories (
|
|
id SERIAL PRIMARY KEY,
|
|
name VARCHAR(128) NOT NULL UNIQUE,
|
|
slug VARCHAR(64) NOT NULL UNIQUE,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
)
|
|
"""
|
|
)
|
|
)
|
|
conn.execute(
|
|
text(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS service_categories (
|
|
id SERIAL PRIMARY KEY,
|
|
service_id INT NOT NULL REFERENCES services(id) ON DELETE CASCADE,
|
|
category_id INT NOT NULL REFERENCES categories(id) ON DELETE CASCADE,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
UNIQUE (service_id, category_id)
|
|
)
|
|
"""
|
|
)
|
|
)
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_service_id ON service_categories(service_id)"))
|
|
conn.execute(text("CREATE INDEX IF NOT EXISTS idx_service_categories_category_id ON service_categories(category_id)"))
|
|
# Handle installs where service type is VARCHAR + CHECK.
|
|
conn.execute(
|
|
text(
|
|
"""
|
|
DO $$
|
|
DECLARE c record;
|
|
BEGIN
|
|
FOR c IN
|
|
SELECT conname
|
|
FROM pg_constraint
|
|
WHERE conrelid = 'services'::regclass
|
|
AND contype = 'c'
|
|
AND pg_get_constraintdef(oid) ILIKE '%type%'
|
|
LOOP
|
|
EXECUTE format('ALTER TABLE services DROP CONSTRAINT %I', c.conname);
|
|
END LOOP;
|
|
ALTER TABLE services
|
|
ADD CONSTRAINT services_type_check
|
|
CHECK (type IN ('WEB','VNC','RDP'));
|
|
EXCEPTION WHEN duplicate_object THEN
|
|
NULL;
|
|
END $$;
|
|
"""
|
|
)
|
|
)
|
|
|
|
|
|
def desired_pool_size(service: Service) -> int:
|
|
if not service.active:
|
|
return 0
|
|
if service.type == ServiceType.RDP and not service_uses_universal_pool(service):
|
|
# RDP runs on-demand per user session; no prewarmed pool.
|
|
return 0
|
|
if service_uses_universal_pool(service):
|
|
return UNIVERSAL_POOL_SIZE
|
|
return service.warm_pool_size if service.warm_pool_size and service.warm_pool_size > 0 else PREWARM_POOL_SIZE
|
|
|
|
|
|
def get_warm_containers_for_service(service: Service) -> list:
|
|
prefix = f"portal-warm-{service.slug}-"
|
|
try:
|
|
d = docker_client()
|
|
containers = []
|
|
for c in d.containers.list(all=True, filters={"name": prefix}):
|
|
if c.name.startswith(prefix):
|
|
containers.append(c)
|
|
return containers
|
|
except Exception:
|
|
logger.exception("pool_status_failed service=%s", service.slug)
|
|
return []
|
|
|
|
|
|
def get_pool_status_for_service(service: Service) -> dict:
|
|
if service.type == ServiceType.WEB:
|
|
return get_web_pool_status()
|
|
if service.type == ServiceType.RDP and not service_uses_universal_pool(service):
|
|
return {"desired": 0, "running": 0, "total": 0, "names": [], "health": "n/a"}
|
|
if service_uses_universal_pool(service):
|
|
return get_universal_pool_status()
|
|
desired = desired_pool_size(service)
|
|
containers = get_warm_containers_for_service(service)
|
|
running = sum(1 for c in containers if c.status == "running")
|
|
states = [(c.attrs.get("State") or {}).get("Status", c.status) for c in containers]
|
|
has_bad = any(s in {"exited", "dead"} for s in states)
|
|
total = len(containers)
|
|
if running == 0:
|
|
health = "down"
|
|
elif running >= min(desired, 1) and not has_bad:
|
|
health = "ok"
|
|
else:
|
|
health = "degraded"
|
|
return {
|
|
"desired": desired,
|
|
"running": running,
|
|
"total": total,
|
|
"names": sorted(c.name for c in containers),
|
|
"health": health,
|
|
}
|
|
|
|
|
|
def get_pool_detailed_status(service: Service) -> dict:
|
|
if service.type == ServiceType.WEB:
|
|
d = docker_client()
|
|
pool = get_web_pool_status()
|
|
details = []
|
|
for i in range(max(0, pool["desired"])):
|
|
name = web_pool_container_name(i)
|
|
try:
|
|
c = d.containers.get(name)
|
|
except Exception:
|
|
continue
|
|
attrs = c.attrs or {}
|
|
state = (attrs.get("State") or {}).get("Status", c.status)
|
|
details.append(
|
|
{
|
|
"name": c.name,
|
|
"status": c.status,
|
|
"state": state,
|
|
"created": attrs.get("Created", ""),
|
|
"image": c.image.tags[0] if c.image.tags else "",
|
|
"labels_ok": True,
|
|
}
|
|
)
|
|
return {
|
|
"service_id": service.id,
|
|
"slug": service.slug,
|
|
"type": service.type.value,
|
|
"desired": pool["desired"],
|
|
"running": pool["running"],
|
|
"total": pool["total"],
|
|
"health": pool["health"],
|
|
"containers": details,
|
|
"updated_at": now_utc().isoformat(),
|
|
}
|
|
if service_uses_universal_pool(service):
|
|
d = docker_client()
|
|
pool = get_universal_pool_status()
|
|
details = []
|
|
for i in range(max(0, UNIVERSAL_POOL_SIZE)):
|
|
name = universal_container_name(i)
|
|
try:
|
|
c = d.containers.get(name)
|
|
except Exception:
|
|
continue
|
|
attrs = c.attrs or {}
|
|
state = (attrs.get("State") or {}).get("Status", c.status)
|
|
details.append(
|
|
{
|
|
"name": c.name,
|
|
"status": c.status,
|
|
"state": state,
|
|
"created": attrs.get("Created", ""),
|
|
"image": c.image.tags[0] if c.image.tags else "",
|
|
"labels_ok": True,
|
|
}
|
|
)
|
|
return {
|
|
"service_id": service.id,
|
|
"slug": service.slug,
|
|
"type": service.type.value,
|
|
"desired": pool["desired"],
|
|
"running": pool["running"],
|
|
"total": pool["total"],
|
|
"health": pool["health"],
|
|
"containers": details,
|
|
"updated_at": now_utc().isoformat(),
|
|
}
|
|
containers = get_warm_containers_for_service(service)
|
|
pool = get_pool_status_for_service(service)
|
|
details = []
|
|
for c in sorted(containers, key=lambda x: x.name):
|
|
attrs = c.attrs or {}
|
|
state = (attrs.get("State") or {}).get("Status", c.status)
|
|
created = attrs.get("Created", "")
|
|
labels = attrs.get("Config", {}).get("Labels", {}) or {}
|
|
labels_ok = (
|
|
labels.get("portal.warm") == "1"
|
|
and labels.get("portal.service.slug") == service.slug
|
|
and labels.get("portal.service.type") == service.type.value
|
|
)
|
|
details.append(
|
|
{
|
|
"name": c.name,
|
|
"status": c.status,
|
|
"state": state,
|
|
"created": created,
|
|
"image": c.image.tags[0] if c.image.tags else "",
|
|
"labels_ok": labels_ok,
|
|
}
|
|
)
|
|
return {
|
|
"service_id": service.id,
|
|
"slug": service.slug,
|
|
"type": service.type.value,
|
|
"desired": pool["desired"],
|
|
"running": pool["running"],
|
|
"total": pool["total"],
|
|
"health": pool["health"],
|
|
"containers": details,
|
|
"updated_at": now_utc().isoformat(),
|
|
}
|
|
|
|
|
|
def get_active_sessions_count(db: Session, service_id: int) -> int:
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
q = select(SessionModel).where(
|
|
SessionModel.service_id == service_id,
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.last_access_at >= cutoff,
|
|
)
|
|
sessions = db.scalars(q).all()
|
|
# Avoid inflated stats when pooled slot sessions were duplicated by race:
|
|
# for pooled sessions, occupancy is unique container_id.
|
|
pooled = [s for s in sessions if (s.container_id or "").startswith(("WEBPOOLIDX:", "POOLIDX:", "POOL:"))]
|
|
direct = [s for s in sessions if s not in pooled]
|
|
unique_pooled = len({s.container_id for s in pooled if s.container_id})
|
|
return unique_pooled + len(direct)
|
|
|
|
|
|
def find_active_session_for_service(db: Session, service_id: int) -> Optional[SessionModel]:
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
q = (
|
|
select(SessionModel)
|
|
.where(
|
|
SessionModel.service_id == service_id,
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.last_access_at >= cutoff,
|
|
)
|
|
.order_by(SessionModel.created_at.desc())
|
|
)
|
|
return db.scalars(q).first()
|
|
|
|
|
|
def find_active_session_for_user_service(db: Session, user_id: int, service_id: int) -> Optional[SessionModel]:
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
q = (
|
|
select(SessionModel)
|
|
.where(
|
|
SessionModel.user_id == user_id,
|
|
SessionModel.service_id == service_id,
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.last_access_at >= cutoff,
|
|
)
|
|
.order_by(SessionModel.created_at.desc())
|
|
)
|
|
return db.scalars(q).first()
|
|
|
|
|
|
class LockTimeoutError(Exception):
|
|
pass
|
|
|
|
|
|
def allocator_lock(db: Session, lock_id: int, timeout_seconds: Optional[float] = None, poll_seconds: float = 0.05):
|
|
class _LockCtx:
|
|
def __enter__(self_nonlocal):
|
|
self_nonlocal._acquired = False
|
|
if timeout_seconds is None:
|
|
db.execute(text("SELECT pg_advisory_xact_lock(:lid)"), {"lid": lock_id})
|
|
self_nonlocal._acquired = True
|
|
return self_nonlocal
|
|
|
|
deadline = time.monotonic() + max(0.0, timeout_seconds)
|
|
while time.monotonic() <= deadline:
|
|
got = db.execute(text("SELECT pg_try_advisory_xact_lock(:lid)"), {"lid": lock_id}).scalar()
|
|
if got:
|
|
self_nonlocal._acquired = True
|
|
return self_nonlocal
|
|
time.sleep(max(0.01, poll_seconds))
|
|
raise LockTimeoutError(f"advisory lock timeout lock_id={lock_id} timeout={timeout_seconds}")
|
|
|
|
return self_nonlocal
|
|
|
|
def __exit__(self_nonlocal, exc_type, exc, tb):
|
|
return False
|
|
|
|
return _LockCtx()
|
|
|
|
|
|
def terminate_active_slot_sessions(db: Session, container_id: str) -> None:
|
|
if not container_id:
|
|
return
|
|
db.execute(
|
|
text(
|
|
"""
|
|
UPDATE sessions
|
|
SET status = 'TERMINATED'
|
|
WHERE container_id = :cid
|
|
AND status = 'ACTIVE'
|
|
"""
|
|
),
|
|
{"cid": container_id},
|
|
)
|
|
|
|
|
|
def session_redirect_url(sess: SessionModel) -> str:
|
|
cid = sess.container_id or ""
|
|
if cid.startswith("POOL:") or cid.startswith("POOLIDX:") or cid.startswith("WEBPOOLIDX:"):
|
|
return f"/s/{sess.id}/view"
|
|
return f"/s/{sess.id}/"
|
|
|
|
|
|
def open_warm_web_url(service: Service, target_url: str) -> None:
|
|
if service_uses_universal_pool(service):
|
|
return
|
|
if service.type != ServiceType.WEB:
|
|
return
|
|
target_url = normalize_web_target(target_url)
|
|
try:
|
|
d = docker_client()
|
|
containers = d.containers.list(
|
|
filters={
|
|
"label": [
|
|
"portal.warm=1",
|
|
f"portal.service.slug={service.slug}",
|
|
"portal.service.type=WEB",
|
|
]
|
|
}
|
|
)
|
|
for c in containers:
|
|
try:
|
|
resp = requests.post(
|
|
f"http://{c.name}:7000/open",
|
|
json={"url": target_url},
|
|
timeout=2,
|
|
)
|
|
resp.raise_for_status()
|
|
logger.info("warm_web_open_ok service=%s container=%s url=%s", service.slug, c.name, target_url)
|
|
except Exception:
|
|
logger.exception("warm_web_open_failed service=%s container=%s", service.slug, c.name)
|
|
except Exception:
|
|
logger.exception("warm_web_open_dispatch_failed service=%s", service.slug)
|
|
|
|
|
|
def cleanup_loop():
|
|
while True:
|
|
time.sleep(60)
|
|
db = SessionLocal()
|
|
try:
|
|
ensure_universal_pool()
|
|
ensure_web_pool()
|
|
for svc in db.scalars(
|
|
select(Service).where(
|
|
Service.active == True,
|
|
Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
|
|
)
|
|
).all():
|
|
if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
|
|
ensure_warm_pool(svc)
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
q = select(SessionModel).where(
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.last_access_at < cutoff,
|
|
)
|
|
stale = db.scalars(q).all()
|
|
for sess in stale:
|
|
if sess.container_id and not (
|
|
sess.container_id.startswith("POOL:")
|
|
or sess.container_id.startswith("POOLIDX:")
|
|
or sess.container_id.startswith("WEBPOOLIDX:")
|
|
):
|
|
stop_runtime_container(sess.container_id)
|
|
sess.status = SessionStatus.EXPIRED
|
|
if stale:
|
|
db.commit()
|
|
except Exception:
|
|
db.rollback()
|
|
logger.exception("cleanup_loop_failed")
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def bootstrap_admin():
|
|
admin_user = os.getenv("ADMIN_USERNAME", "admin")
|
|
admin_password = os.getenv("ADMIN_PASSWORD", "change_me")
|
|
ttl_days = int(os.getenv("ADMIN_TTL_DAYS", "3650"))
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
existing = db.scalar(select(User).where(User.username == admin_user))
|
|
if not existing:
|
|
db.add(
|
|
User(
|
|
username=admin_user,
|
|
password_hash=hash_password(admin_password),
|
|
active=True,
|
|
is_admin=True,
|
|
expires_at=now_utc() + dt.timedelta(days=ttl_days),
|
|
)
|
|
)
|
|
db.commit()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def try_acquire_maintenance_leader() -> bool:
|
|
global maintenance_lock_file
|
|
if maintenance_lock_file is not None:
|
|
return True
|
|
lock_file = open("/tmp/portal-maintenance.lock", "w")
|
|
try:
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
except BlockingIOError:
|
|
lock_file.close()
|
|
return False
|
|
maintenance_lock_file = lock_file
|
|
return True
|
|
|
|
|
|
|
|
def run_maintenance_service() -> None:
|
|
logger.info("maintenance_service_bootstrap_started")
|
|
with open("/tmp/portal-schema.lock", "w") as lock_file:
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
|
|
Base.metadata.create_all(bind=engine)
|
|
ensure_schema_compatibility()
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
|
|
|
ensure_icons_dir()
|
|
bootstrap_admin()
|
|
|
|
maintenance_lock = open("/tmp/portal-maintenance.lock", "w")
|
|
fcntl.flock(maintenance_lock.fileno(), fcntl.LOCK_EX)
|
|
logger.info("maintenance_service_leader_acquired")
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
ensure_universal_pool()
|
|
ensure_web_pool()
|
|
for svc in db.scalars(
|
|
select(Service).where(
|
|
Service.active == True,
|
|
Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
|
|
)
|
|
).all():
|
|
if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
|
|
ensure_warm_pool(svc)
|
|
finally:
|
|
db.close()
|
|
|
|
logger.info("maintenance_service_loop_started")
|
|
cleanup_loop()
|
|
|
|
@app.on_event("startup")
|
|
def startup_event():
|
|
# Multiple uvicorn workers run startup in parallel. Serialize schema bootstrap
|
|
# to avoid DDL races on first run and during schema extension.
|
|
with open("/tmp/portal-schema.lock", "w") as lock_file:
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
|
|
Base.metadata.create_all(bind=engine)
|
|
ensure_schema_compatibility()
|
|
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
|
|
ensure_icons_dir()
|
|
bootstrap_admin()
|
|
if not ENABLE_STARTUP_MAINTENANCE:
|
|
logger.info("startup_maintenance_disabled")
|
|
return
|
|
if not try_acquire_maintenance_leader():
|
|
logger.info("maintenance_leader_skipped")
|
|
return
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
ensure_universal_pool()
|
|
ensure_web_pool()
|
|
for svc in db.scalars(
|
|
select(Service).where(
|
|
Service.active == True,
|
|
Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
|
|
)
|
|
).all():
|
|
if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
|
|
ensure_warm_pool(svc)
|
|
finally:
|
|
db.close()
|
|
|
|
thread = threading.Thread(target=cleanup_loop, daemon=True)
|
|
thread.start()
|
|
logger.info("maintenance_leader_started")
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
|
|
def index(request: Request, user: Optional[User] = Depends(get_current_user), db: Session = Depends(get_db)):
|
|
session_closed = (request.query_params.get("session_closed") or "").strip().lower()
|
|
launch_error = (request.query_params.get("launch_error") or "").strip().lower()
|
|
session_notice = ""
|
|
if session_closed == "idle":
|
|
session_notice = "Сессия была закрыта из-за простоя. Откройте сервис заново."
|
|
elif session_closed == "limit":
|
|
session_notice = (
|
|
f"Сессия была закрыта из-за лимита в {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
|
|
"Освободите один сервис и попробуйте снова."
|
|
)
|
|
elif launch_error == "max_services":
|
|
session_notice = (
|
|
f"Есть ограничение на {MAX_ACTIVE_SERVICES_PER_USER} сервиса(ов). "
|
|
"Освободите один сервис и попробуйте снова."
|
|
)
|
|
if not user:
|
|
csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
|
|
response = templates.TemplateResponse(
|
|
"login.html",
|
|
{
|
|
"request": request,
|
|
"csrf_token": csrf,
|
|
"login_error": "",
|
|
"session_notice": session_notice,
|
|
},
|
|
)
|
|
response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
|
|
return response
|
|
|
|
services = db.scalars(
|
|
select(Service)
|
|
.join(UserServiceAccess, UserServiceAccess.service_id == Service.id)
|
|
.where(
|
|
UserServiceAccess.user_id == user.id,
|
|
Service.active == True,
|
|
Service.type.in_([ServiceType.WEB, ServiceType.RDP]),
|
|
)
|
|
.order_by(Service.name)
|
|
).all()
|
|
|
|
service_categories = {svc.id: [] for svc in services}
|
|
categories = []
|
|
if services:
|
|
service_ids = [svc.id for svc in services]
|
|
rows = db.execute(
|
|
select(ServiceCategory.service_id, Category.id, Category.name, Category.slug)
|
|
.join(Category, Category.id == ServiceCategory.category_id)
|
|
.where(ServiceCategory.service_id.in_(service_ids))
|
|
.order_by(Category.name)
|
|
).all()
|
|
category_map = {}
|
|
for service_id, category_id, category_name, category_slug in rows:
|
|
service_categories.setdefault(service_id, []).append(
|
|
{
|
|
"id": category_id,
|
|
"name": category_name,
|
|
"slug": category_slug,
|
|
}
|
|
)
|
|
if category_id not in category_map:
|
|
category_map[category_id] = {"id": category_id, "name": category_name, "slug": category_slug}
|
|
categories = sorted(category_map.values(), key=lambda x: x["name"].lower())
|
|
|
|
selected_category_slug = (request.query_params.get("category") or "").strip().lower()
|
|
if selected_category_slug:
|
|
services = [
|
|
svc for svc in services
|
|
if any(cat["slug"] == selected_category_slug for cat in service_categories.get(svc.id, []))
|
|
]
|
|
service_comment_html = {svc.id: format_service_comment(svc.comment) for svc in services}
|
|
|
|
return templates.TemplateResponse(
|
|
"dashboard.html",
|
|
{
|
|
"request": request,
|
|
"user": user,
|
|
"services": services,
|
|
"categories": categories,
|
|
"selected_category_slug": selected_category_slug,
|
|
"service_categories": service_categories,
|
|
"service_comment_html": service_comment_html,
|
|
"csrf_token": request.cookies.get(CSRF_COOKIE, ""),
|
|
"session_notice": session_notice,
|
|
},
|
|
)
|
|
|
|
|
|
@app.get("/admin", response_class=HTMLResponse)
|
|
def admin_page(request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
users = db.scalars(select(User).order_by(User.id)).all()
|
|
categories = db.scalars(select(Category).order_by(Category.name)).all()
|
|
services = db.scalars(select(Service).where(Service.type.in_([ServiceType.WEB, ServiceType.RDP])).order_by(Service.id)).all()
|
|
web_services = [s for s in services if s.type == ServiceType.WEB]
|
|
rdp_services = [s for s in services if s.type == ServiceType.RDP]
|
|
service_category_map = {s.id: [] for s in services}
|
|
if services:
|
|
service_rows = db.execute(
|
|
select(ServiceCategory.service_id, ServiceCategory.category_id).where(
|
|
ServiceCategory.service_id.in_([s.id for s in services])
|
|
)
|
|
).all()
|
|
for service_id, category_id in service_rows:
|
|
service_category_map.setdefault(service_id, []).append(category_id)
|
|
acl_rows = db.scalars(select(UserServiceAccess)).all()
|
|
acl = {}
|
|
for row in acl_rows:
|
|
acl.setdefault(row.user_id, []).append(row.service_id)
|
|
for user_id in acl:
|
|
acl[user_id] = sorted(acl[user_id])
|
|
pool_status = {s.id: get_pool_status_for_service(s) for s in services}
|
|
service_health = {}
|
|
for sid, st in pool_status.items():
|
|
service_health[sid] = {
|
|
"health": st["health"],
|
|
"running": st["running"],
|
|
"desired": st["desired"],
|
|
"active_sessions": get_active_sessions_count(db, sid),
|
|
}
|
|
web_pool = get_web_pool_status()
|
|
web_totals = {
|
|
"services": len(web_services),
|
|
"running": web_pool["running"],
|
|
"desired": web_pool["desired"],
|
|
"active_sessions": sum(service_health[s.id]["active_sessions"] for s in web_services),
|
|
}
|
|
recent_sessions = db.execute(
|
|
text(
|
|
"""
|
|
SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
|
|
s.status, s.created_at, s.last_access_at
|
|
FROM sessions s
|
|
JOIN users u ON u.id = s.user_id
|
|
JOIN services sv ON sv.id = s.service_id
|
|
WHERE sv.type IN ('WEB','RDP')
|
|
ORDER BY s.created_at DESC
|
|
LIMIT 200
|
|
"""
|
|
)
|
|
).mappings().all()
|
|
open_stats = db.execute(
|
|
text(
|
|
"""
|
|
SELECT u.username, sv.name AS service_name, sv.slug AS service_slug, COUNT(*) AS opens
|
|
FROM sessions s
|
|
JOIN users u ON u.id = s.user_id
|
|
JOIN services sv ON sv.id = s.service_id
|
|
WHERE sv.type IN ('WEB','RDP')
|
|
GROUP BY u.username, sv.name, sv.slug
|
|
ORDER BY opens DESC, u.username ASC
|
|
LIMIT 200
|
|
"""
|
|
)
|
|
).mappings().all()
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
online_sessions = db.execute(
|
|
text(
|
|
"""
|
|
SELECT s.id, u.username, sv.name AS service_name, sv.slug AS service_slug,
|
|
sv.type AS service_type, s.container_id, s.created_at, s.last_access_at
|
|
FROM sessions s
|
|
JOIN users u ON u.id = s.user_id
|
|
JOIN services sv ON sv.id = s.service_id
|
|
WHERE s.status = 'ACTIVE'
|
|
AND s.last_access_at >= :cutoff
|
|
AND sv.type IN ('WEB','RDP')
|
|
ORDER BY s.last_access_at DESC, s.created_at DESC
|
|
LIMIT 500
|
|
"""
|
|
),
|
|
{"cutoff": cutoff},
|
|
).mappings().all()
|
|
rdp_occupied_by: dict[int, int] = {}
|
|
rdp_occupied_username: dict[int, str] = {}
|
|
rdp_ids = [s.id for s in rdp_services]
|
|
if rdp_ids:
|
|
rdp_acl_rows = db.execute(
|
|
select(UserServiceAccess.service_id, UserServiceAccess.user_id, User.username)
|
|
.join(User, User.id == UserServiceAccess.user_id)
|
|
.where(UserServiceAccess.service_id.in_(rdp_ids))
|
|
).all()
|
|
for row in rdp_acl_rows:
|
|
if row.service_id not in rdp_occupied_by:
|
|
rdp_occupied_by[row.service_id] = row.user_id
|
|
rdp_occupied_username[row.service_id] = row.username
|
|
return templates.TemplateResponse(
|
|
"admin.html",
|
|
{
|
|
"request": request,
|
|
"admin": admin,
|
|
"users": users,
|
|
"web_services": web_services,
|
|
"rdp_services": rdp_services,
|
|
"services": services,
|
|
"categories": categories,
|
|
"service_category_map": service_category_map,
|
|
"acl": acl,
|
|
"pool_status": pool_status,
|
|
"service_health": service_health,
|
|
"web_totals": web_totals,
|
|
"web_pool_size": WEB_POOL_SIZE,
|
|
"web_pool_buffer": WEB_POOL_BUFFER,
|
|
"recent_sessions": recent_sessions,
|
|
"open_stats": open_stats,
|
|
"online_sessions": online_sessions,
|
|
"csrf_token": request.cookies.get(CSRF_COOKIE, ""),
|
|
"max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER,
|
|
"rdp_occupied_by": rdp_occupied_by,
|
|
"rdp_occupied_username": rdp_occupied_username,
|
|
},
|
|
)
|
|
|
|
|
|
@app.post("/login")
|
|
def login(
|
|
request: Request,
|
|
username: str = Form(...),
|
|
password: str = Form(...),
|
|
csrf_token: str = Form(...),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
cookie_csrf = request.cookies.get(CSRF_COOKIE)
|
|
if not cookie_csrf or csrf_token != cookie_csrf:
|
|
raise HTTPException(status_code=403, detail="CSRF failed")
|
|
|
|
user = db.scalar(select(User).where(User.username == username))
|
|
if not user or not verify_password(password, user.password_hash):
|
|
csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
|
|
response = templates.TemplateResponse(
|
|
"login.html",
|
|
{
|
|
"request": request,
|
|
"csrf_token": csrf,
|
|
"login_error": "Неверный логин или пароль",
|
|
"session_notice": "",
|
|
},
|
|
status_code=401,
|
|
)
|
|
response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
|
|
return response
|
|
if not user_is_valid(user):
|
|
csrf = request.cookies.get(CSRF_COOKIE) or secrets.token_urlsafe(24)
|
|
response = templates.TemplateResponse(
|
|
"login.html",
|
|
{
|
|
"request": request,
|
|
"csrf_token": csrf,
|
|
"login_error": "Доступ к сервису приостоновлен, обратитесь к вашему менеджеру",
|
|
},
|
|
status_code=403,
|
|
)
|
|
response.set_cookie(CSRF_COOKIE, csrf, httponly=False, secure=True, samesite="strict", path="/")
|
|
return response
|
|
|
|
response = RedirectResponse(url="/", status_code=303)
|
|
issue_auth_cookie(response, user)
|
|
issue_csrf_cookie(response)
|
|
audit(db, "LOGIN", f"login success: {username}", user_id=user.id)
|
|
return response
|
|
|
|
|
|
@app.post("/logout")
|
|
def logout(request: Request):
|
|
response = RedirectResponse(url="/", status_code=303)
|
|
response.delete_cookie(COOKIE_NAME, path="/")
|
|
response.delete_cookie(CSRF_COOKIE, path="/")
|
|
return response
|
|
|
|
|
|
@app.get("/go/{slug}")
|
|
def go_service(
|
|
slug: str,
|
|
sw: Optional[int] = Query(default=None, ge=320, le=7680),
|
|
sh: Optional[int] = Query(default=None, ge=240, le=4320),
|
|
user: User = Depends(require_user),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
total_started = time.perf_counter()
|
|
phase_ms = {}
|
|
|
|
def _mark(name: str, started: float) -> None:
|
|
phase_ms[name] = int((time.perf_counter() - started) * 1000)
|
|
|
|
def _emit(result: str, **extra) -> None:
|
|
payload = {
|
|
"user_id": user.id,
|
|
"service_slug": slug,
|
|
"result": result,
|
|
"total_ms": int((time.perf_counter() - total_started) * 1000),
|
|
}
|
|
payload.update(phase_ms)
|
|
payload.update(extra)
|
|
log_event("go_service_timing", **payload)
|
|
|
|
log_event("session_open_requested", user_id=user.id, service_slug=slug, sw=sw, sh=sh)
|
|
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
if service.type == ServiceType.VNC:
|
|
raise HTTPException(status_code=410, detail="VNC services are deprecated")
|
|
if not has_access(db, user.id, service.id):
|
|
raise HTTPException(status_code=403, detail="ACL denied")
|
|
|
|
client_width, client_height = sanitize_client_resolution(sw, sh)
|
|
log_event(
|
|
"session_open_resolution",
|
|
user_id=user.id,
|
|
service_slug=slug,
|
|
sw=sw,
|
|
sh=sh,
|
|
client_width=client_width,
|
|
client_height=client_height,
|
|
)
|
|
|
|
user_lock_started = time.perf_counter()
|
|
try:
|
|
with allocator_lock(db, 92000 + int(user.id), timeout_seconds=GO_USER_LOCK_TIMEOUT_SECONDS):
|
|
_mark("wait_user_lock_ms", user_lock_started)
|
|
|
|
t_existing = time.perf_counter()
|
|
existing_user_session = find_active_session_for_user_service(db, user.id, service.id)
|
|
_mark("check_existing_ms", t_existing)
|
|
if existing_user_session:
|
|
_emit("reuse_session", session_id=existing_user_session.id)
|
|
return RedirectResponse(url=session_redirect_url(existing_user_session), status_code=303)
|
|
|
|
t_limit = time.perf_counter()
|
|
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
|
|
active_rows = db.scalars(
|
|
select(SessionModel).where(
|
|
SessionModel.user_id == user.id,
|
|
SessionModel.status == SessionStatus.ACTIVE,
|
|
SessionModel.last_access_at >= cutoff,
|
|
)
|
|
).all()
|
|
active_rows = sorted(active_rows, key=lambda row: row.created_at)
|
|
active_service_ids = {row.service_id for row in active_rows}
|
|
_mark("check_limit_ms", t_limit)
|
|
if service.id not in active_service_ids and len(active_service_ids) >= MAX_ACTIVE_SERVICES_PER_USER:
|
|
oldest = next((row for row in active_rows if row.service_id != service.id), None)
|
|
if oldest:
|
|
t_rotate = time.perf_counter()
|
|
terminate_session_record(db, oldest, SessionStatus.ROTATED, stop_container=True)
|
|
db.commit()
|
|
_mark("rotate_oldest_ms", t_rotate)
|
|
log_event(
|
|
"session_rotated",
|
|
user_id=user.id,
|
|
closed_session_id=oldest.id,
|
|
closed_service_id=oldest.service_id,
|
|
new_service_id=service.id,
|
|
)
|
|
else:
|
|
_emit("max_services_redirect")
|
|
return RedirectResponse(url="/?launch_error=max_services", status_code=303)
|
|
|
|
if service.type == ServiceType.RDP:
|
|
t_rdp_owner = time.perf_counter()
|
|
active_owner = find_active_session_for_service(db, service.id)
|
|
_mark("check_rdp_owner_ms", t_rdp_owner)
|
|
if active_owner:
|
|
if active_owner.user_id != user.id:
|
|
owner = db.get(User, active_owner.user_id)
|
|
owner_name = owner.username if owner else f"id={active_owner.user_id}"
|
|
_emit("rdp_busy", owner=owner_name)
|
|
raise HTTPException(
|
|
status_code=409,
|
|
detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
|
|
)
|
|
_emit("reuse_rdp_session", session_id=active_owner.id)
|
|
return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
|
|
|
|
session_id = str(uuid.uuid4())
|
|
if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
|
|
try:
|
|
t_pool_lock = time.perf_counter()
|
|
with allocator_lock(db, 91001, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
|
|
_mark("wait_web_pool_lock_ms", t_pool_lock)
|
|
t_ensure = time.perf_counter()
|
|
ensure_web_pool()
|
|
_mark("ensure_web_pool_ms", t_ensure)
|
|
|
|
t_acquire = time.perf_counter()
|
|
slot = acquire_web_pool_slot(db)
|
|
_mark("acquire_web_slot_ms", t_acquire)
|
|
slot_cid = f"WEBPOOLIDX:{slot}"
|
|
|
|
t_dispatch = time.perf_counter()
|
|
terminate_active_slot_sessions(db, slot_cid)
|
|
dispatch_web_pool_target(slot, service, width=client_width, height=client_height)
|
|
_mark("dispatch_web_target_ms", t_dispatch)
|
|
|
|
t_commit = time.perf_counter()
|
|
session_obj = SessionModel(
|
|
id=session_id,
|
|
user_id=user.id,
|
|
service_id=service.id,
|
|
container_id=slot_cid,
|
|
status=SessionStatus.ACTIVE,
|
|
created_at=now_utc(),
|
|
last_access_at=now_utc(),
|
|
)
|
|
db.add(session_obj)
|
|
db.commit()
|
|
_mark("db_commit_ms", t_commit)
|
|
except LockTimeoutError:
|
|
_emit("web_pool_lock_timeout")
|
|
raise HTTPException(status_code=503, detail="Пул WEB занят. Повторите через несколько секунд.")
|
|
except Exception as exc:
|
|
logger.exception("web_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
|
|
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="web_pool", error=str(exc))
|
|
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
|
|
_emit("web_pool_create_failed", error=str(exc))
|
|
raise HTTPException(status_code=502, detail="WEB runtime failed to switch target")
|
|
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="web_pool", slot=slot)
|
|
audit(db, "SESSION_CREATE_WEB_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
|
|
_emit("session_created_web_pool", session_id=session_id, slot=slot)
|
|
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
|
|
|
|
if service_uses_universal_pool(service):
|
|
try:
|
|
t_pool_lock = time.perf_counter()
|
|
with allocator_lock(db, 91002, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
|
|
_mark("wait_universal_pool_lock_ms", t_pool_lock)
|
|
t_ensure = time.perf_counter()
|
|
ensure_universal_pool()
|
|
_mark("ensure_universal_pool_ms", t_ensure)
|
|
|
|
t_acquire = time.perf_counter()
|
|
slot = acquire_universal_slot(db)
|
|
_mark("acquire_universal_slot_ms", t_acquire)
|
|
slot_cid = f"POOLIDX:{slot}"
|
|
|
|
t_dispatch = time.perf_counter()
|
|
terminate_active_slot_sessions(db, slot_cid)
|
|
dispatch_universal_target(slot, service, width=client_width, height=client_height)
|
|
_mark("dispatch_universal_target_ms", t_dispatch)
|
|
|
|
t_commit = time.perf_counter()
|
|
session_obj = SessionModel(
|
|
id=session_id,
|
|
user_id=user.id,
|
|
service_id=service.id,
|
|
container_id=slot_cid,
|
|
status=SessionStatus.ACTIVE,
|
|
created_at=now_utc(),
|
|
last_access_at=now_utc(),
|
|
)
|
|
db.add(session_obj)
|
|
db.commit()
|
|
_mark("db_commit_ms", t_commit)
|
|
except LockTimeoutError:
|
|
_emit("universal_pool_lock_timeout")
|
|
raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.")
|
|
except Exception as exc:
|
|
logger.exception("universal_pool_dispatch_failed slug=%s user_id=%s", slug, user.id)
|
|
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="universal_pool", error=str(exc))
|
|
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
|
|
_emit("universal_pool_create_failed", error=str(exc))
|
|
raise HTTPException(status_code=502, detail="Universal runtime failed to switch target")
|
|
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="universal_pool", slot=slot)
|
|
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id} slot={slot}", user_id=user.id)
|
|
_emit("session_created_universal_pool", session_id=session_id, slot=slot)
|
|
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
|
|
|
|
if service.type == ServiceType.WEB and desired_pool_size(service) > 0:
|
|
t_warm = time.perf_counter()
|
|
ensure_warm_pool(service)
|
|
open_warm_web_url(service, service.target)
|
|
_mark("warm_pool_prepare_ms", t_warm)
|
|
|
|
t_commit = time.perf_counter()
|
|
session_obj = SessionModel(
|
|
id=session_id,
|
|
user_id=user.id,
|
|
service_id=service.id,
|
|
container_id=f"POOL:{service.slug}",
|
|
status=SessionStatus.ACTIVE,
|
|
created_at=now_utc(),
|
|
last_access_at=now_utc(),
|
|
)
|
|
db.add(session_obj)
|
|
db.commit()
|
|
_mark("db_commit_ms", t_commit)
|
|
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="warm_pool")
|
|
audit(db, "SESSION_CREATE_POOL", f"service={service.slug} session={session_id}", user_id=user.id)
|
|
_emit("session_created_warm_pool", session_id=session_id)
|
|
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
|
|
|
|
try:
|
|
t_create = time.perf_counter()
|
|
container_id = create_runtime_container(service, session_id)
|
|
_mark("create_runtime_container_ms", t_create)
|
|
except Exception as exc:
|
|
logger.exception("session_container_create_failed slug=%s user_id=%s", slug, user.id)
|
|
log_event("session_create_failed", level=logging.ERROR, user_id=user.id, service_slug=slug, mode="single_runtime", error=str(exc))
|
|
audit(db, "SESSION_CREATE_FAILED", f"slug={slug} err={str(exc)}", user_id=user.id)
|
|
_emit("single_runtime_create_failed", error=str(exc))
|
|
raise HTTPException(status_code=502, detail="Session runtime failed to start")
|
|
|
|
t_commit = time.perf_counter()
|
|
session_obj = SessionModel(
|
|
id=session_id,
|
|
user_id=user.id,
|
|
service_id=service.id,
|
|
container_id=container_id,
|
|
status=SessionStatus.ACTIVE,
|
|
created_at=now_utc(),
|
|
last_access_at=now_utc(),
|
|
)
|
|
db.add(session_obj)
|
|
db.commit()
|
|
_mark("db_commit_ms", t_commit)
|
|
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="single_runtime", container_id=container_id)
|
|
|
|
audit(db, "SESSION_CREATE", f"service={service.slug} session={session_id}", user_id=user.id)
|
|
t_wait = time.perf_counter()
|
|
ready = wait_for_session_route(session_id)
|
|
_mark("wait_session_route_ms", t_wait)
|
|
log_event("session_route_ready", session_id=session_id, ready=ready)
|
|
_emit("session_created_single_runtime", session_id=session_id, ready=ready)
|
|
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
|
|
except LockTimeoutError:
|
|
_emit("user_lock_timeout")
|
|
raise HTTPException(status_code=429, detail="Слишком много параллельных запусков. Повторите через несколько секунд.")
|
|
|
|
|
|
@app.get("/svc/{slug}/", response_class=HTMLResponse)
|
|
def service_wait_page(slug: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
|
|
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
if not has_access(db, user.id, service.id):
|
|
raise HTTPException(status_code=403, detail="ACL denied")
|
|
return HTMLResponse(
|
|
content="""
|
|
<!doctype html>
|
|
<html>
|
|
<head>
|
|
<meta charset='utf-8'>
|
|
<title>Service Starting</title>
|
|
<style>
|
|
body { font-family: sans-serif; background: #f4f6f8; display: grid; place-items: center; height: 100vh; margin: 0; color:#1b3145; }
|
|
.card { background: #fff; padding: 1rem 1.2rem; border-radius: 10px; box-shadow: 0 8px 20px rgba(0,0,0,.08); min-width: 340px; }
|
|
.title { font-weight: 700; margin-bottom: 0.5rem; }
|
|
.state { margin-bottom: 0.6rem; }
|
|
ul { margin: 0; padding-left: 1.1rem; }
|
|
li { margin: 0.2rem 0; }
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div class="card">
|
|
<div class="title">Сервис запускается</div>
|
|
<div class="state" id="state">Проверка...</div>
|
|
<ul id="steps"></ul>
|
|
</div>
|
|
<script>
|
|
const slug = window.location.pathname.replace(/^\\/svc\\//, '').replace(/\\/$/, '');
|
|
async function tick() {
|
|
const r = await fetch(`/api/services/${slug}/status`, {credentials:'include'});
|
|
if (!r.ok) return;
|
|
const data = await r.json();
|
|
document.getElementById('state').textContent = data.message || 'Запуск...';
|
|
const ul = document.getElementById('steps');
|
|
ul.innerHTML = '';
|
|
(data.steps || []).forEach((x) => {
|
|
const li = document.createElement('li');
|
|
li.textContent = x;
|
|
ul.appendChild(li);
|
|
});
|
|
if (data.ready) window.location.replace(`/svc/${slug}/`);
|
|
}
|
|
setInterval(tick, 1000);
|
|
tick();
|
|
</script>
|
|
</body>
|
|
</html>
|
|
""".strip(),
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@app.get("/s/{session_id}/", response_class=HTMLResponse)
|
|
def session_wait_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
|
|
sess = db.get(SessionModel, session_id)
|
|
if not sess or sess.user_id != user.id:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if sess.status != SessionStatus.ACTIVE:
|
|
raise HTTPException(status_code=410, detail="Session is not active")
|
|
service = db.get(Service, sess.service_id)
|
|
service_title = service.name if service else "Сервис"
|
|
is_rdp = service and service.type == ServiceType.RDP
|
|
label = "Ожидайте..." if is_rdp else "Сессия запускается..."
|
|
redirect_target = session_redirect_url(sess)
|
|
return HTMLResponse(
|
|
content=f"""
|
|
<!doctype html>
|
|
<html>
|
|
<head>
|
|
<meta charset='utf-8'>
|
|
<title>{service_title}</title>
|
|
<style>
|
|
*{{box-sizing:border-box}}
|
|
body{{font-family:sans-serif;background:#0f1720;display:grid;place-items:center;height:100vh;margin:0;color:#dce8f5}}
|
|
.card{{background:rgba(255,255,255,.06);border:1px solid rgba(255,255,255,.12);padding:1.6rem 2rem;border-radius:14px;
|
|
box-shadow:0 12px 32px rgba(0,0,0,.4);min-width:320px;max-width:440px;text-align:center}}
|
|
.spinner{{width:48px;height:48px;border:4px solid rgba(220,232,245,.15);border-top-color:#2a8cd6;
|
|
border-radius:50%;animation:spin .9s linear infinite;margin:0 auto 1.2rem}}
|
|
@keyframes spin{{to{{transform:rotate(360deg)}}}}
|
|
.title{{font-size:1.15rem;font-weight:700;margin-bottom:.5rem;color:#fff}}
|
|
.state{{font-size:.9rem;color:#a0b8cc;margin-bottom:.8rem;min-height:1.2em}}
|
|
ul{{margin:0;padding:0;list-style:none;font-size:.82rem;color:#7a99b0;text-align:left}}
|
|
li::before{{content:"· ";color:#2a8cd6}}
|
|
li+li{{margin-top:.2rem}}
|
|
.sid{{display:block;margin-top:1.2rem;font-size:.7rem;color:rgba(160,184,204,.4);word-break:break-all}}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<div class="card">
|
|
<div class="spinner"></div>
|
|
<div class="title">{label}</div>
|
|
<div class="state" id="state">Проверка...</div>
|
|
<ul id="steps"></ul>
|
|
<span class="sid">{session_id}</span>
|
|
</div>
|
|
<script>
|
|
const sessionId = "{session_id}";
|
|
async function tick() {{
|
|
const r = await fetch(`/api/sessions/${{sessionId}}/status`, {{credentials:'include'}});
|
|
if (!r.ok) return;
|
|
const data = await r.json();
|
|
document.getElementById('state').textContent = data.message || 'Запуск...';
|
|
const ul = document.getElementById('steps');
|
|
ul.innerHTML = '';
|
|
(data.steps || []).forEach((x) => {{
|
|
const li = document.createElement('li');
|
|
li.textContent = x;
|
|
ul.appendChild(li);
|
|
}});
|
|
if (data.ready) window.location.replace(data.redirect_url || "{redirect_target}");
|
|
}}
|
|
setInterval(tick, 1000);
|
|
tick();
|
|
</script>
|
|
</body>
|
|
</html>
|
|
""".strip(),
|
|
status_code=200,
|
|
)
|
|
|
|
|
|
@app.get("/s/{session_id}/view", response_class=HTMLResponse)
|
|
def session_view_page(session_id: str, request: Request, user: User = Depends(require_user), db: Session = Depends(get_db)):
|
|
sess = db.get(SessionModel, session_id)
|
|
if not sess or sess.user_id != user.id:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if sess.status != SessionStatus.ACTIVE:
|
|
raise HTTPException(status_code=410, detail="Session is not active")
|
|
service = db.get(Service, sess.service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
iframe_src = None
|
|
if sess.container_id and sess.container_id.startswith("POOL:"):
|
|
iframe_src = f"/svc/{service.slug}/?sid={session_id}"
|
|
elif sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"):
|
|
try:
|
|
slot = int(sess.container_id.split(":", 1)[1])
|
|
iframe_src = f"/w/{slot}/?sid={session_id}"
|
|
except Exception:
|
|
iframe_src = None
|
|
elif sess.container_id and sess.container_id.startswith("POOLIDX:"):
|
|
try:
|
|
slot = int(sess.container_id.split(":", 1)[1])
|
|
iframe_src = f"/u/{slot}/?sid={session_id}"
|
|
except Exception:
|
|
iframe_src = None
|
|
if iframe_src:
|
|
return HTMLResponse(
|
|
content=f"""
|
|
<!doctype html>
|
|
<html>
|
|
<head>
|
|
<meta charset='utf-8'>
|
|
<title>{service.name}</title>
|
|
<style>
|
|
html,body,iframe {{ margin:0; width:100%; height:100%; border:0; background:#0f1720; }}
|
|
</style>
|
|
</head>
|
|
<body>
|
|
<iframe src="{iframe_src}" allow="clipboard-read; clipboard-write"></iframe>
|
|
</body>
|
|
</html>
|
|
""".strip()
|
|
)
|
|
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
|
|
|
|
|
|
@app.post("/api/sessions/{session_id}/touch")
|
|
def touch_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
|
|
sess = db.get(SessionModel, session_id)
|
|
if not sess or sess.user_id != user.id:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if sess.status != SessionStatus.ACTIVE:
|
|
reason = session_closed_reason(sess, db)
|
|
log_event(
|
|
"session_touch_rejected",
|
|
level=logging.WARNING,
|
|
session_id=session_id,
|
|
user_id=user.id,
|
|
status=sess.status.value,
|
|
reason=reason,
|
|
)
|
|
return JSONResponse(
|
|
status_code=410,
|
|
content={
|
|
"ok": False,
|
|
"reason": reason,
|
|
"status": sess.status.value,
|
|
},
|
|
)
|
|
sess.last_access_at = now_utc()
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@app.post("/api/sessions/{session_id}/close")
|
|
def close_session(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
|
|
sess = db.get(SessionModel, session_id)
|
|
if not sess or sess.user_id != user.id:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if sess.status != SessionStatus.ACTIVE:
|
|
log_event(
|
|
"session_close_already_closed",
|
|
session_id=session_id,
|
|
user_id=user.id,
|
|
status=sess.status.value,
|
|
reason=session_closed_reason(sess, db),
|
|
)
|
|
return {"ok": True, "status": sess.status.value}
|
|
terminate_session_record(db, sess, SessionStatus.TERMINATED, stop_container=True)
|
|
db.commit()
|
|
log_event("session_closed_by_user", session_id=session_id, user_id=user.id)
|
|
return {"ok": True, "status": "TERMINATED"}
|
|
|
|
|
|
@app.get("/api/services/{slug}/status")
|
|
def service_status(slug: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
|
|
service = db.scalar(select(Service).where(Service.slug == slug, Service.active == True))
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
if service.type == ServiceType.VNC:
|
|
raise HTTPException(status_code=410, detail="VNC services are deprecated")
|
|
if not has_access(db, user.id, service.id):
|
|
raise HTTPException(status_code=403, detail="ACL denied")
|
|
pool = get_pool_status_for_service(service)
|
|
route_ok = route_ready(f"/svc/{slug}/")
|
|
ready = route_ok and (pool["running"] > 0 if desired_pool_size(service) > 0 else True)
|
|
steps = [
|
|
f"ACL: OK ({user.username})",
|
|
f"Пул: {pool['running']} / {pool['desired']}",
|
|
f"Маршрут /svc/{slug}/: {'OK' if route_ok else 'ожидание'}",
|
|
]
|
|
return {
|
|
"ready": ready,
|
|
"message": "Готово, открываем..." if ready else "Поднимаем контейнер и маршрут...",
|
|
"steps": steps,
|
|
}
|
|
|
|
|
|
@app.get("/api/sessions/{session_id}/status")
|
|
def session_status(session_id: str, user: User = Depends(require_user), db: Session = Depends(get_db)):
|
|
sess = db.get(SessionModel, session_id)
|
|
if not sess or sess.user_id != user.id:
|
|
raise HTTPException(status_code=404, detail="Session not found")
|
|
if sess.status != SessionStatus.ACTIVE:
|
|
raise HTTPException(status_code=410, detail="Session is not active")
|
|
service = db.get(Service, sess.service_id)
|
|
pooled_web = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.WEB)
|
|
web_pool_idx = None
|
|
universal_pool_idx = None
|
|
if sess.container_id and sess.container_id.startswith("WEBPOOLIDX:"):
|
|
try:
|
|
web_pool_idx = int(sess.container_id.split(":", 1)[1])
|
|
except Exception:
|
|
web_pool_idx = None
|
|
if sess.container_id and sess.container_id.startswith("POOLIDX:"):
|
|
try:
|
|
universal_pool_idx = int(sess.container_id.split(":", 1)[1])
|
|
except Exception:
|
|
universal_pool_idx = None
|
|
pooled_rdp = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.RDP)
|
|
if pooled_web and service:
|
|
route_path = f"/svc/{service.slug}/"
|
|
elif pooled_rdp and service:
|
|
route_path = f"/svc/{service.slug}/"
|
|
else:
|
|
route_path = f"/s/{session_id}/"
|
|
if web_pool_idx is not None:
|
|
route_path = f"/w/{web_pool_idx}/"
|
|
if universal_pool_idx is not None:
|
|
route_path = f"/u/{universal_pool_idx}/"
|
|
route_ok = route_ready(route_path)
|
|
running = container_running(sess.container_id)
|
|
ready = running and route_ok
|
|
steps = [
|
|
f"Контейнер: {'running' if running else 'starting'}",
|
|
f"Маршрут {route_path}: {'OK' if route_ok else 'ожидание'}",
|
|
]
|
|
payload = {
|
|
"ready": ready,
|
|
"message": "Готово, открываем..." if ready else "Запуск сессии...",
|
|
"steps": steps,
|
|
}
|
|
if pooled_web or pooled_rdp:
|
|
payload["redirect_url"] = f"/s/{session_id}/view"
|
|
if web_pool_idx is not None:
|
|
payload["redirect_url"] = f"/s/{session_id}/view"
|
|
if universal_pool_idx is not None:
|
|
payload["redirect_url"] = f"/s/{session_id}/view"
|
|
return payload
|
|
|
|
|
|
@app.post("/api/admin/services")
|
|
def create_service(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
service_type = ServiceType(payload["type"])
|
|
if service_type == ServiceType.VNC:
|
|
raise HTTPException(status_code=400, detail="VNC services are no longer supported")
|
|
target = payload["target"]
|
|
if service_type == ServiceType.WEB:
|
|
target = normalize_web_target(target)
|
|
elif service_type == ServiceType.RDP:
|
|
parse_rdp_target(target)
|
|
service = Service(
|
|
name=payload["name"],
|
|
slug=payload["slug"],
|
|
type=service_type,
|
|
target=target,
|
|
comment=payload.get("comment", ""),
|
|
active=payload.get("active", True),
|
|
warm_pool_size=max(0, int(payload.get("warm_pool_size", 0))),
|
|
)
|
|
db.add(service)
|
|
db.flush()
|
|
set_service_categories(db, service.id, payload.get("category_ids", []))
|
|
db.commit()
|
|
if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
|
|
ensure_warm_pool(service)
|
|
elif service_uses_universal_pool(service):
|
|
ensure_universal_pool()
|
|
return {"id": service.id}
|
|
|
|
|
|
@app.get("/api/admin/services/{service_id}/containers/status")
|
|
def service_containers_status(service_id: int, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
service = db.get(Service, service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
out = get_pool_detailed_status(service)
|
|
out["active_sessions"] = get_active_sessions_count(db, service.id)
|
|
return out
|
|
|
|
|
|
@app.post("/api/admin/services/{service_id}/icon")
|
|
async def upload_service_icon(
|
|
service_id: int,
|
|
request: Request,
|
|
file: UploadFile = File(...),
|
|
_: User = Depends(require_admin),
|
|
db: Session = Depends(get_db),
|
|
):
|
|
validate_csrf(request)
|
|
service = db.get(Service, service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
new_path = await store_service_icon(service, file)
|
|
old_path = service.icon_path
|
|
service.icon_path = new_path
|
|
db.commit()
|
|
if old_path and old_path != new_path:
|
|
remove_icon_file(old_path)
|
|
return {"ok": True, "icon_path": new_path}
|
|
|
|
|
|
@app.delete("/api/admin/services/{service_id}/icon")
|
|
def delete_service_icon(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
service = db.get(Service, service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
old_path = service.icon_path
|
|
service.icon_path = ""
|
|
db.commit()
|
|
remove_icon_file(old_path)
|
|
return {"ok": True}
|
|
|
|
|
|
@app.put("/api/admin/services/{service_id}")
|
|
def edit_service(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
service = db.get(Service, service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
for key in ["name", "slug", "target", "active", "comment"]:
|
|
if key in payload:
|
|
setattr(service, key, payload[key])
|
|
if "type" in payload:
|
|
service.type = ServiceType(payload["type"])
|
|
if service.type == ServiceType.VNC:
|
|
raise HTTPException(status_code=400, detail="VNC services are no longer supported")
|
|
if service.type == ServiceType.WEB:
|
|
service.target = normalize_web_target(service.target)
|
|
elif service.type == ServiceType.RDP:
|
|
parse_rdp_target(service.target)
|
|
if "warm_pool_size" in payload:
|
|
service.warm_pool_size = max(0, int(payload["warm_pool_size"]))
|
|
if "category_ids" in payload:
|
|
set_service_categories(db, service.id, payload.get("category_ids", []))
|
|
db.commit()
|
|
if service.type == ServiceType.WEB:
|
|
if WEB_POOL_SIZE <= 0:
|
|
ensure_warm_pool(service)
|
|
open_warm_web_url(service, service.target)
|
|
elif service_uses_universal_pool(service):
|
|
ensure_universal_pool()
|
|
return {"ok": True}
|
|
|
|
|
|
@app.delete("/api/admin/services/{service_id}")
|
|
def delete_service(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
service = db.get(Service, service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
if service.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
|
|
ensure_warm_pool(service, 0)
|
|
remove_icon_file(service.icon_path)
|
|
db.delete(service)
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@app.post("/api/admin/services/{service_id}/prewarm")
|
|
def prewarm_now(service_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
service = db.get(Service, service_id)
|
|
if not service:
|
|
raise HTTPException(status_code=404, detail="Service not found")
|
|
if service.type == ServiceType.WEB:
|
|
ensure_web_pool()
|
|
return {"ok": True, "pool": get_web_pool_status()}
|
|
if service_uses_universal_pool(service):
|
|
ensure_universal_pool()
|
|
return {"ok": True, "pool": get_universal_pool_status()}
|
|
if service.type == ServiceType.RDP:
|
|
return {"ok": True, "pool": get_pool_status_for_service(service), "message": "RDP запускается on-demand"}
|
|
ensure_warm_pool(service)
|
|
return {"ok": True, "pool": get_pool_status_for_service(service)}
|
|
|
|
|
|
@app.post("/api/admin/categories")
|
|
def create_category(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
name = (payload.get("name") or "").strip()
|
|
slug = (payload.get("slug") or "").strip().lower().replace(" ", "-")
|
|
if not name:
|
|
raise HTTPException(status_code=400, detail="Category name is required")
|
|
if not slug:
|
|
raise HTTPException(status_code=400, detail="Category slug is required")
|
|
exists = db.scalar(select(Category).where((Category.name == name) | (Category.slug == slug)))
|
|
if exists:
|
|
raise HTTPException(status_code=409, detail="Category already exists")
|
|
category = Category(name=name, slug=slug)
|
|
db.add(category)
|
|
db.commit()
|
|
return {"id": category.id}
|
|
|
|
|
|
@app.delete("/api/admin/categories/{category_id}")
|
|
def delete_category(category_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
category = db.get(Category, category_id)
|
|
if not category:
|
|
raise HTTPException(status_code=404, detail="Category not found")
|
|
db.delete(category)
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@app.put("/api/admin/web-pool-size")
|
|
def update_web_pool_size(payload: dict, request: Request, _: User = Depends(require_admin)):
|
|
validate_csrf(request)
|
|
global WEB_POOL_SIZE
|
|
value = max(0, int(payload.get("size", WEB_POOL_SIZE)))
|
|
WEB_POOL_SIZE = value
|
|
ensure_web_pool()
|
|
return {"ok": True, "size": WEB_POOL_SIZE, "pool": get_web_pool_status()}
|
|
|
|
|
|
@app.post("/api/admin/users")
|
|
def create_user(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
expires_at = dt.datetime.fromisoformat(payload["expires_at"])
|
|
user = User(
|
|
username=payload["username"],
|
|
password_hash=hash_password(payload["password"]),
|
|
expires_at=expires_at,
|
|
active=payload.get("active", True),
|
|
is_admin=payload.get("is_admin", False),
|
|
)
|
|
db.add(user)
|
|
db.commit()
|
|
return {"id": user.id}
|
|
|
|
|
|
@app.put("/api/admin/users/{user_id}")
|
|
def edit_user(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
user = db.get(User, user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
for key in ["username", "active", "is_admin"]:
|
|
if key in payload:
|
|
setattr(user, key, payload[key])
|
|
if "password" in payload and payload["password"]:
|
|
user.password_hash = hash_password(payload["password"])
|
|
if "expires_at" in payload:
|
|
user.expires_at = dt.datetime.fromisoformat(payload["expires_at"])
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@app.delete("/api/admin/users/{user_id}")
|
|
def delete_user(user_id: int, request: Request, admin: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
user = db.get(User, user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
if user.id == admin.id:
|
|
raise HTTPException(status_code=400, detail="Cannot delete current admin")
|
|
db.delete(user)
|
|
db.commit()
|
|
return {"ok": True}
|
|
|
|
|
|
@app.put("/api/admin/users/{user_id}/acl")
|
|
def set_acl(user_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
|
|
validate_csrf(request)
|
|
user = db.get(User, user_id)
|
|
if not user:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
service_ids = set(payload.get("service_ids", []))
|
|
|
|
existing = db.scalars(select(UserServiceAccess).where(UserServiceAccess.user_id == user_id)).all()
|
|
existing_map = {x.service_id: x for x in existing}
|
|
|
|
# Check RDP exclusivity: each RDP service can belong to only one user in ACL
|
|
all_rdp_ids_in_payload = set()
|
|
for sid in service_ids:
|
|
svc = db.get(Service, sid)
|
|
if svc and svc.type == ServiceType.RDP:
|
|
all_rdp_ids_in_payload.add(sid)
|
|
if all_rdp_ids_in_payload:
|
|
acl_conflicts = db.execute(
|
|
select(UserServiceAccess.service_id, User.username)
|
|
.join(User, User.id == UserServiceAccess.user_id)
|
|
.where(
|
|
UserServiceAccess.service_id.in_(all_rdp_ids_in_payload),
|
|
UserServiceAccess.user_id != user_id,
|
|
)
|
|
).all()
|
|
if acl_conflicts:
|
|
blocked = ", ".join(f'"{row.username}"' for row in acl_conflicts)
|
|
raise HTTPException(status_code=409, detail=f"RDP сервис уже назначен другому пользователю ({blocked}).")
|
|
|
|
for sid in service_ids:
|
|
if sid not in existing_map:
|
|
db.add(UserServiceAccess(user_id=user_id, service_id=sid))
|
|
for sid, row in existing_map.items():
|
|
if sid not in service_ids:
|
|
db.delete(row)
|
|
|
|
db.commit()
|
|
return {"ok": True}
|