feat: RDP slot pool — multi-user RDP with per-account containers

- New RdpSlot model (rdp_slots table): service_id, rdp_username,
  rdp_password, container_name
- Each slot gets a dedicated portal-rdpslot-<slug>-<id> container with
  Traefik route /rdp/<slot_id>/ and restart_policy=unless-stopped
- go_service: RDP services with slots use pool allocation — finds first
  free slot (not occupied by active session), returns 503 if all busy
- session_status + session_view: handle RDPSLOT: container_id prefix
- terminate_session_record: restarts slot container in background on close
- session_redirect_url: RDPSLOT sessions redirect to /s/<id>/view
- startup_event: starts containers for all configured slots on boot
- Admin: POST /api/admin/services/{id}/rdp-slots, DELETE /api/admin/rdp-slots/{id}
- admin.html: slot management UI (list, add, delete); removed ACL exclusivity
- set_acl: removed RDP 1-user exclusivity — RDP services now assignable to many

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-28 06:32:02 +00:00
parent 67d361c5c9
commit 6847cbc078
2 changed files with 346 additions and 76 deletions
+286 -50
View File
@@ -231,6 +231,17 @@ class UserServiceAccess(Base):
granted_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
class RdpSlot(Base):
__tablename__ = "rdp_slots"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
service_id: Mapped[int] = mapped_column(ForeignKey("services.id", ondelete="CASCADE"), index=True)
rdp_username: Mapped[str] = mapped_column(String(128))
rdp_password: Mapped[str] = mapped_column(String(256), default="")
container_name: Mapped[Optional[str]] = mapped_column(String(128), nullable=True)
created_at: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), default=lambda: dt.datetime.now(dt.timezone.utc))
class SessionModel(Base):
__tablename__ = "sessions"
@@ -1058,6 +1069,20 @@ def container_running(container_id: Optional[str]) -> bool:
or container_id.startswith("WEBPOOLIDX:")
):
return True
if container_id.startswith("RDPSLOT:"):
try:
slot_id = int(container_id.split(":", 1)[1])
db = SessionLocal()
try:
slot = db.get(RdpSlot, slot_id)
if not slot or not slot.container_name:
return False
c = docker_client().containers.get(slot.container_name)
return c.status == "running"
finally:
db.close()
except Exception:
return False
try:
c = docker_client().containers.get(container_id)
return c.status == "running"
@@ -1065,6 +1090,107 @@ def container_running(container_id: Optional[str]) -> bool:
return False
def _rdp_slot_container_name(service_slug: str, slot_id: int) -> str:
return f"portal-rdpslot-{service_slug}-{slot_id}"
def start_rdp_slot_container(slot: RdpSlot, service: Service) -> str:
d = docker_client()
name = _rdp_slot_container_name(service.slug, slot.id)
slot_id = slot.id
path = f"/rdp/{slot_id}/"
router = f"rdpslot-{slot_id}"
labels = {
"traefik.enable": "true",
"traefik.docker.network": "portal_net",
f"traefik.http.routers.{router}.rule": f"PathPrefix(`{path}`)",
f"traefik.http.routers.{router}.entrypoints": "websecure",
f"traefik.http.routers.{router}.tls": "true",
f"traefik.http.routers.{router}.priority": "10000",
f"traefik.http.routers.{router}.middlewares": f"{router}-strip",
f"traefik.http.middlewares.{router}-strip.stripprefix.prefixes": path[:-1],
f"traefik.http.services.{router}.loadbalancer.server.port": "6080",
"portal.rdpslot": "1",
"portal.rdpslot.id": str(slot_id),
"portal.service.slug": service.slug,
}
cfg = parse_rdp_target(service.target)
env = {
"SESSION_ID": f"rdpslot-{slot_id}",
"IDLE_TIMEOUT": "86400",
"ENABLE_HEARTBEAT": "0",
"RDP_HOST": cfg["host"],
"RDP_PORT": cfg["port"],
"X11VNC_FLAGS": X11VNC_FLAGS,
}
if slot.rdp_username:
env["RDP_USER"] = slot.rdp_username
if slot.rdp_password:
env["RDP_PASSWORD"] = slot.rdp_password
if cfg.get("domain"):
env["RDP_DOMAIN"] = cfg["domain"]
if cfg.get("security"):
env["RDP_SECURITY"] = cfg["security"]
try:
existing = d.containers.get(name)
existing.stop(timeout=5)
existing.remove(force=True)
except docker.errors.NotFound:
pass
except Exception:
logger.exception("rdp_slot_container_cleanup_failed slot_id=%s", slot_id)
container = d.containers.run(
"portal-rdp-proxy:latest",
name=name,
detach=True,
restart_policy={"Name": "unless-stopped"},
network="portal_net",
labels=labels,
environment=env,
)
logger.info("rdp_slot_container_started slot_id=%s name=%s", slot_id, name)
return container.name
def stop_rdp_slot_container(container_name: str) -> None:
if not container_name:
return
try:
d = docker_client()
c = d.containers.get(container_name)
c.stop(timeout=5)
c.remove(force=True)
logger.info("rdp_slot_container_stopped name=%s", container_name)
except docker.errors.NotFound:
pass
except Exception:
logger.exception("rdp_slot_container_stop_failed container=%s", container_name)
def _restart_rdp_slot_bg(slot_id: int) -> None:
db = SessionLocal()
try:
slot = db.get(RdpSlot, slot_id)
if not slot or not slot.container_name:
return
service = db.get(Service, slot.service_id)
if not service:
return
try:
d = docker_client()
c = d.containers.get(slot.container_name)
c.restart(timeout=10)
logger.info("rdp_slot_container_restarted slot_id=%s", slot_id)
except docker.errors.NotFound:
start_rdp_slot_container(slot, service)
except Exception:
logger.exception("rdp_slot_container_restart_failed slot_id=%s", slot_id)
finally:
db.close()
def stop_runtime_container(container_id: Optional[str]) -> None:
if not container_id:
return
@@ -1087,8 +1213,14 @@ def terminate_session_record(
return
old_status = sess.status
cid = sess.container_id or ""
if stop_container and cid and not cid.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:")):
if stop_container and cid and not cid.startswith(("POOL:", "POOLIDX:", "WEBPOOLIDX:", "RDPSLOT:")):
stop_runtime_container(cid)
if cid.startswith("RDPSLOT:"):
try:
slot_id = int(cid.split(":", 1)[1])
threading.Thread(target=_restart_rdp_slot_bg, args=(slot_id,), daemon=True).start()
except Exception:
logger.exception("rdp_slot_restart_schedule_failed cid=%s", cid)
sess.status = new_status
sess.last_access_at = now_utc()
log_event(
@@ -1444,7 +1576,7 @@ def terminate_active_slot_sessions(db: Session, container_id: str) -> None:
def session_redirect_url(sess: SessionModel) -> str:
cid = sess.container_id or ""
if cid.startswith("POOL:") or cid.startswith("POOLIDX:") or cid.startswith("WEBPOOLIDX:"):
if cid.startswith("POOL:") or cid.startswith("POOLIDX:") or cid.startswith("WEBPOOLIDX:") or cid.startswith("RDPSLOT:"):
return f"/s/{sess.id}/view"
return f"/s/{sess.id}/"
@@ -1620,6 +1752,16 @@ def startup_event():
).all():
if svc.type == ServiceType.WEB and WEB_POOL_SIZE <= 0:
ensure_warm_pool(svc)
elif svc.type == ServiceType.RDP:
slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == svc.id)).all()
for slot in slots:
try:
start_rdp_slot_container(slot, svc)
slot.container_name = _rdp_slot_container_name(svc.slug, slot.id)
except Exception:
logger.exception("startup_rdp_slot_start_failed slot_id=%s", slot.id)
if slots:
db.commit()
finally:
db.close()
@@ -1801,19 +1943,38 @@ def admin_page(request: Request, admin: User = Depends(require_admin), db: Sessi
),
{"cutoff": cutoff},
).mappings().all()
rdp_occupied_by: dict[int, int] = {}
rdp_occupied_username: dict[int, str] = {}
rdp_ids = [s.id for s in rdp_services]
if rdp_ids:
rdp_acl_rows = db.execute(
select(UserServiceAccess.service_id, UserServiceAccess.user_id, User.username)
.join(User, User.id == UserServiceAccess.user_id)
.where(UserServiceAccess.service_id.in_(rdp_ids))
).all()
for row in rdp_acl_rows:
if row.service_id not in rdp_occupied_by:
rdp_occupied_by[row.service_id] = row.user_id
rdp_occupied_username[row.service_id] = row.username
rdp_slots: dict[int, list] = {}
cutoff_slot = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
for svc in rdp_services:
slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == svc.id).order_by(RdpSlot.id)).all()
slot_list = []
for slot in slots:
active_sess = db.scalar(
select(SessionModel).where(
SessionModel.container_id == f"RDPSLOT:{slot.id}",
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff_slot,
)
)
running = False
if slot.container_name:
try:
c = docker_client().containers.get(slot.container_name)
running = c.status == "running"
except Exception:
pass
occupied_username = None
if active_sess:
u = db.get(User, active_sess.user_id)
occupied_username = u.username if u else f"id={active_sess.user_id}"
slot_list.append({
"id": slot.id,
"rdp_username": slot.rdp_username,
"container_name": slot.container_name or "",
"running": running,
"occupied_username": occupied_username,
})
rdp_slots[svc.id] = slot_list
return templates.TemplateResponse(
"admin.html",
{
@@ -1836,8 +1997,7 @@ def admin_page(request: Request, admin: User = Depends(require_admin), db: Sessi
"online_sessions": online_sessions,
"csrf_token": request.cookies.get(CSRF_COOKIE, ""),
"max_active_services_per_user": MAX_ACTIVE_SERVICES_PER_USER,
"rdp_occupied_by": rdp_occupied_by,
"rdp_occupied_username": rdp_occupied_username,
"rdp_slots": rdp_slots,
},
)
@@ -1986,20 +2146,61 @@ def go_service(
return RedirectResponse(url="/?launch_error=max_services", status_code=303)
if service.type == ServiceType.RDP:
t_rdp_owner = time.perf_counter()
active_owner = find_active_session_for_service(db, service.id)
_mark("check_rdp_owner_ms", t_rdp_owner)
if active_owner:
if active_owner.user_id != user.id:
owner = db.get(User, active_owner.user_id)
owner_name = owner.username if owner else f"id={active_owner.user_id}"
_emit("rdp_busy", owner=owner_name)
raise HTTPException(
status_code=409,
detail=f"RDP сервис уже занят пользователем {owner_name}. Попробуйте позже.",
)
_emit("reuse_rdp_session", session_id=active_owner.id)
return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
t_rdp_slots = time.perf_counter()
slots = db.scalars(select(RdpSlot).where(RdpSlot.service_id == service.id)).all()
_mark("check_rdp_slots_ms", t_rdp_slots)
if slots:
session_id = str(uuid.uuid4())
try:
with allocator_lock(db, 91003, timeout_seconds=GO_POOL_LOCK_TIMEOUT_SECONDS):
cutoff = now_utc() - dt.timedelta(seconds=SESSION_IDLE_SECONDS)
busy_slot_ids: set[int] = set()
for row in db.scalars(
select(SessionModel).where(
SessionModel.status == SessionStatus.ACTIVE,
SessionModel.last_access_at >= cutoff,
SessionModel.service_id == service.id,
SessionModel.container_id.like("RDPSLOT:%"),
)
).all():
try:
busy_slot_ids.add(int(row.container_id.split(":", 1)[1]))
except Exception:
pass
free_slot = next((s for s in slots if s.id not in busy_slot_ids), None)
if not free_slot:
_emit("rdp_all_slots_busy")
raise HTTPException(
status_code=503,
detail="Все слоты этого RDP сервиса заняты. Попробуйте позже.",
)
session_obj = SessionModel(
id=session_id,
user_id=user.id,
service_id=service.id,
container_id=f"RDPSLOT:{free_slot.id}",
status=SessionStatus.ACTIVE,
created_at=now_utc(),
last_access_at=now_utc(),
)
db.add(session_obj)
db.commit()
except LockTimeoutError:
_emit("rdp_slot_lock_timeout")
raise HTTPException(status_code=503, detail="Пул RDP занят. Повторите через несколько секунд.")
log_event("session_created", user_id=user.id, service_slug=service.slug, session_id=session_id, mode="rdp_slot", slot_id=free_slot.id)
audit(db, "SESSION_CREATE_RDP_SLOT", f"service={service.slug} session={session_id} slot={free_slot.id}", user_id=user.id)
_emit("session_created_rdp_slot", session_id=session_id, slot_id=free_slot.id)
return RedirectResponse(url=f"/s/{session_id}/", status_code=303)
else:
# Legacy: no slots configured — exclusive single-session behaviour
active_owner = find_active_session_for_service(db, service.id)
if active_owner:
if active_owner.user_id != user.id:
_emit("rdp_busy_legacy")
raise HTTPException(status_code=503, detail="RDP сервис занят. Попробуйте позже.")
_emit("reuse_rdp_session", session_id=active_owner.id)
return RedirectResponse(url=session_redirect_url(active_owner), status_code=303)
session_id = str(uuid.uuid4())
if service.type == ServiceType.WEB and WEB_POOL_SIZE > 0:
@@ -2305,6 +2506,12 @@ def session_view_page(session_id: str, request: Request, user: User = Depends(re
iframe_src = f"/u/{slot}/?sid={session_id}"
except Exception:
iframe_src = None
elif sess.container_id and sess.container_id.startswith("RDPSLOT:"):
try:
slot = int(sess.container_id.split(":", 1)[1])
iframe_src = f"/rdp/{slot}/?sid={session_id}"
except Exception:
iframe_src = None
if iframe_src:
return HTMLResponse(
content=f"""
@@ -2420,10 +2627,18 @@ def session_status(session_id: str, user: User = Depends(require_user), db: Sess
except Exception:
universal_pool_idx = None
pooled_rdp = bool(sess.container_id and sess.container_id.startswith("POOL:") and service and service.type == ServiceType.RDP)
rdp_slot_idx = None
if sess.container_id and sess.container_id.startswith("RDPSLOT:"):
try:
rdp_slot_idx = int(sess.container_id.split(":", 1)[1])
except Exception:
rdp_slot_idx = None
if pooled_web and service:
route_path = f"/svc/{service.slug}/"
elif pooled_rdp and service:
route_path = f"/svc/{service.slug}/"
elif rdp_slot_idx is not None:
route_path = f"/rdp/{rdp_slot_idx}/"
else:
route_path = f"/s/{session_id}/"
if web_pool_idx is not None:
@@ -2448,6 +2663,8 @@ def session_status(session_id: str, user: User = Depends(require_user), db: Sess
payload["redirect_url"] = f"/s/{session_id}/view"
if universal_pool_idx is not None:
payload["redirect_url"] = f"/s/{session_id}/view"
if rdp_slot_idx is not None:
payload["redirect_url"] = f"/s/{session_id}/view"
return payload
@@ -2589,6 +2806,44 @@ def prewarm_now(service_id: int, request: Request, _: User = Depends(require_adm
return {"ok": True, "pool": get_pool_status_for_service(service)}
@app.post("/api/admin/services/{service_id}/rdp-slots")
def create_rdp_slot(service_id: int, payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
service = db.get(Service, service_id)
if not service or service.type != ServiceType.RDP:
raise HTTPException(status_code=404, detail="RDP service not found")
rdp_username = (payload.get("rdp_username") or "").strip()
rdp_password = (payload.get("rdp_password") or "").strip()
if not rdp_username:
raise HTTPException(status_code=400, detail="rdp_username is required")
slot = RdpSlot(service_id=service_id, rdp_username=rdp_username, rdp_password=rdp_password)
db.add(slot)
db.flush()
try:
container_name = start_rdp_slot_container(slot, service)
slot.container_name = container_name
except Exception as exc:
logger.exception("rdp_slot_container_start_failed service_id=%s", service_id)
raise HTTPException(status_code=502, detail=f"Контейнер не запустился: {exc}")
db.commit()
audit(db, "RDP_SLOT_CREATE", f"service={service.slug} slot={slot.id} user={rdp_username}", user_id=None)
return {"ok": True, "slot_id": slot.id, "container_name": slot.container_name}
@app.delete("/api/admin/rdp-slots/{slot_id}")
def delete_rdp_slot(slot_id: int, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
slot = db.get(RdpSlot, slot_id)
if not slot:
raise HTTPException(status_code=404, detail="Slot not found")
container_name = slot.container_name
db.delete(slot)
db.commit()
if container_name:
threading.Thread(target=stop_rdp_slot_container, args=(container_name,), daemon=True).start()
return {"ok": True}
@app.post("/api/admin/categories")
def create_category(payload: dict, request: Request, _: User = Depends(require_admin), db: Session = Depends(get_db)):
validate_csrf(request)
@@ -2685,25 +2940,6 @@ def set_acl(user_id: int, payload: dict, request: Request, _: User = Depends(req
existing = db.scalars(select(UserServiceAccess).where(UserServiceAccess.user_id == user_id)).all()
existing_map = {x.service_id: x for x in existing}
# Check RDP exclusivity: each RDP service can belong to only one user in ACL
all_rdp_ids_in_payload = set()
for sid in service_ids:
svc = db.get(Service, sid)
if svc and svc.type == ServiceType.RDP:
all_rdp_ids_in_payload.add(sid)
if all_rdp_ids_in_payload:
acl_conflicts = db.execute(
select(UserServiceAccess.service_id, User.username)
.join(User, User.id == UserServiceAccess.user_id)
.where(
UserServiceAccess.service_id.in_(all_rdp_ids_in_payload),
UserServiceAccess.user_id != user_id,
)
).all()
if acl_conflicts:
blocked = ", ".join(f'"{row.username}"' for row in acl_conflicts)
raise HTTPException(status_code=409, detail=f"RDP сервис уже назначен другому пользователю ({blocked}).")
for sid in service_ids:
if sid not in existing_map:
db.add(UserServiceAccess(user_id=user_id, service_id=sid))