419b495020
- RDP сервис может быть назначен только одному пользователю в ACL - Мобильная заглушка на dashboard при ширине < 1024px - rdp-proxy: кнопки навигации, спиннер Ожидайте, реконнект - session_wait_page: тёмная тема, CSS спиннер - kiosk/universal-runtime manager.py: xrandr + cvt --newmode для resolution - Dockerfiles: x11-xserver-utils, x11-utils
150 lines
5.4 KiB
Python
150 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import urllib.parse
|
|
import urllib.request
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
# X display that every xrandr invocation in this module targets.
DISPLAY = os.getenv("DISPLAY", ":1")

# Fallback "width,height" pair used when a request carries no resolution.
CHROME_WINDOW_SIZE = os.getenv("CHROME_WINDOW_SIZE", "1920,1080")

# Lower bounds applied to a requested resolution.
RESOLUTION_MIN_WIDTH = int(os.getenv("WEB_RESOLUTION_MIN_WIDTH", "1024"))
RESOLUTION_MIN_HEIGHT = int(os.getenv("WEB_RESOLUTION_MIN_HEIGHT", "720"))

# Upper bounds applied to a requested resolution.
RESOLUTION_MAX_WIDTH = int(os.getenv("WEB_RESOLUTION_MAX_WIDTH", "3840"))
RESOLUTION_MAX_HEIGHT = int(os.getenv("WEB_RESOLUTION_MAX_HEIGHT", "2160"))
|
|
|
|
|
|
def _json_get(path: str):
    """GET ``path`` from the local endpoint on port 9222 and return the decoded JSON.

    The request uses a 2-second timeout; network and JSON parsing errors
    propagate to the caller.
    """
    endpoint = f"http://127.0.0.1:9222{path}"
    with urllib.request.urlopen(endpoint, timeout=2) as response:
        payload = response.read()
    return json.loads(payload.decode("utf-8"))
|
|
|
|
|
|
def _json_put(path: str):
    """Issue a PUT for ``path`` on the local port-9222 endpoint and return the decoded JSON.

    The request uses a 2-second timeout; network and JSON parsing errors
    propagate to the caller.
    """
    request = urllib.request.Request(
        f"http://127.0.0.1:9222{path}",
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=2) as response:
        payload = response.read()
    return json.loads(payload.decode("utf-8"))
|
|
|
|
|
|
def chromium_open(url: str) -> None:
    """Open ``url`` in a new page and close every other listed page.

    The URL is percent-quoted (URL delimiters and ``%`` are left intact) and
    passed to ``/json/new``; afterwards every page reported by ``/json/list``
    whose id differs from the new one is closed via ``/json/close/<id>``.
    Failures while closing an individual page are ignored on purpose.
    """
    quoted_url = urllib.parse.quote(url, safe=":/?#[]@!$&'()*+,;=%")
    new_page_id = _json_put(f"/json/new?{quoted_url}").get("id")

    for entry in _json_get("/json/list"):
        entry_id = entry.get("id")
        # Skip entries without an id and the page that was just opened.
        if not entry_id or entry_id == new_page_id:
            continue
        try:
            _json_put(f"/json/close/{entry_id}")
        except Exception:
            # Best effort: a page that cannot be closed must not fail the open.
            pass
|
|
|
|
|
|
def _sanitize_resolution(width, height):
    """Return a usable ``(width, height)`` pair for the requested size.

    When both values are truthy they are converted with ``int()`` and clamped
    to the RESOLUTION_MIN_*/RESOLUTION_MAX_* bounds. When either is missing
    (or zero/empty) the pair parsed from CHROME_WINDOW_SIZE is returned as is,
    or ``(1920, 1080)`` if that setting cannot be parsed.

    Raises whatever ``int()`` raises for a truthy value it cannot convert.
    """
    if width and height:
        capped_w = min(int(width), RESOLUTION_MAX_WIDTH)
        bounded_w = max(RESOLUTION_MIN_WIDTH, capped_w)
        capped_h = min(int(height), RESOLUTION_MAX_HEIGHT)
        bounded_h = max(RESOLUTION_MIN_HEIGHT, capped_h)
        return bounded_w, bounded_h

    # No usable request values: fall back to the configured window size.
    try:
        fallback_w, fallback_h = (
            int(part) for part in CHROME_WINDOW_SIZE.split(",", 1)
        )
    except Exception:
        return 1920, 1080
    return fallback_w, fallback_h
|
|
|
|
|
|
def _xrandr_output_name():
    """Return the first token of the first xrandr line containing `` connected``.

    Runs ``xrandr -display DISPLAY`` and scans its stdout. Returns ``None``
    when no such line exists or when anything goes wrong while running or
    parsing the command.
    """
    try:
        listing = subprocess.run(
            ["xrandr", "-display", DISPLAY],
            capture_output=True,
            text=True,
            check=False,
        )
        # The leading space keeps "disconnected" lines from matching.
        return next(
            (row.split()[0] for row in listing.stdout.splitlines() if " connected" in row),
            None,
        )
    except Exception:
        return None
|
|
|
|
|
|
def _add_mode_via_cvt(width: int, height: int, output_name: str) -> bool:
    """Create a mode for ``width`` x ``height`` with ``cvt`` and switch to it.

    The ``Modeline`` printed by ``cvt`` is split into a mode name and its
    timing parameters, which are then fed to ``xrandr --newmode``,
    ``xrandr --addmode <output_name>`` and finally
    ``xrandr --output <output_name> --mode <name>``.

    Args:
        width: Target width in pixels.
        height: Target height in pixels.
        output_name: xrandr output to attach the mode to.

    Returns:
        True only when the final mode switch exits with status 0; False when
        ``cvt`` fails, prints no usable Modeline, the switch fails, or any
        exception occurs.
    """
    try:
        cvt = subprocess.run(
            ["cvt", str(width), str(height)],
            capture_output=True,
            text=True,
            check=False,
        )
        if cvt.returncode != 0:
            return False

        modeline = next(
            (line for line in cvt.stdout.splitlines() if line.startswith("Modeline")),
            None,
        )
        if not modeline:
            return False

        parts = modeline.split()
        # Need the keyword, the mode name and at least one timing parameter.
        if len(parts) < 3:
            return False
        mode_name = parts[1].strip('"')
        mode_params = parts[2:]

        xrandr = ["xrandr", "-display", DISPLAY]
        quiet = {
            "check": False,
            "stdout": subprocess.DEVNULL,
            "stderr": subprocess.DEVNULL,
        }

        # The exit codes of --newmode/--addmode are intentionally not checked:
        # presumably they fail when the mode already exists from an earlier
        # request, in which case the switch below can still succeed.
        subprocess.run(xrandr + ["--newmode", mode_name] + mode_params, **quiet)
        subprocess.run(xrandr + ["--addmode", output_name, mode_name], **quiet)

        # The switch itself decides whether the resolution was applied.
        switched = subprocess.run(
            xrandr + ["--output", output_name, "--mode", mode_name],
            **quiet,
        )
        return switched.returncode == 0
    except Exception:
        return False
|
|
|
|
|
|
def apply_resolution(width, height) -> tuple:
    """Try to switch the display to the requested size and return the size used.

    The request is first passed through ``_sanitize_resolution``. ``xrandr -s``
    is tried first; if it exits non-zero and a connected output can be found,
    a new mode is created through ``_add_mode_via_cvt``. The sanitized
    ``(width, height)`` pair is returned whether or not the switch worked.
    """
    target_w, target_h = _sanitize_resolution(width, height)

    size_command = ["xrandr", "-display", DISPLAY, "-s", f"{target_w}x{target_h}"]
    exit_code = subprocess.run(
        size_command,
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    ).returncode
    if exit_code == 0:
        return target_w, target_h

    # The size is not an existing mode: try to define one on the connected output.
    connected_output = _xrandr_output_name()
    if connected_output:
        _add_mode_via_cvt(target_w, target_h, connected_output)
    return target_w, target_h
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """HTTP handler exposing ``GET /health`` and ``POST /open``.

    ``POST /open`` expects a JSON object with a ``url`` (http/https) and
    optional ``width``/``height``; it applies the resolution and opens the
    URL. Malformed requests are answered with 400, processing failures
    with 500.
    """

    def _json(self, code: int, payload: dict):
        """Send ``payload`` as a JSON response with HTTP status ``code``."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _read_json_object(self) -> dict:
        """Read the request body and return it as a dict.

        An empty body yields ``{}``.

        Raises:
            ValueError: if Content-Length is not a non-negative integer, the
                body is not valid UTF-8 JSON, or the JSON value is not an
                object.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length < 0:
            # A negative size would make read() wait for end of stream.
            raise ValueError("negative Content-Length")
        raw = self.rfile.read(length)
        data = json.loads(raw.decode("utf-8")) if raw else {}
        if not isinstance(data, dict):
            raise ValueError("JSON body must be an object")
        return data

    def do_GET(self):
        """Answer ``/health`` with ``{"ok": true}``; everything else is 404."""
        if self.path == "/health":
            self._json(200, {"ok": True})
            return
        self._json(404, {"detail": "Not found"})

    def do_POST(self):
        """Handle ``POST /open``: validate the request, resize, open the URL."""
        if self.path != "/open":
            self._json(404, {"detail": "Not found"})
            return
        try:
            try:
                data = self._read_json_object()
            except ValueError:
                # Client-side mistake: report it as such instead of a 500.
                self._json(400, {"detail": "Invalid request body"})
                return

            url = data.get("url")
            # Non-string values are treated like a missing URL.
            url = url.strip() if isinstance(url, str) else ""
            if not url.startswith(("http://", "https://")):
                self._json(400, {"detail": "Invalid URL"})
                return

            apply_resolution(data.get("width"), data.get("height"))
            chromium_open(url)
            print(f"open_ok url={url}", flush=True)
            self._json(200, {"ok": True, "url": url})
        except Exception as exc:
            print(f"open_fail err={exc}", flush=True)
            self._json(500, {"detail": str(exc)})

    def log_message(self, format, *args):
        """Do nothing, so the base handler's per-request logging is silenced."""
        return
|
|
|
|
|
|
if __name__ == "__main__":
    # Listen on every interface, port 7000, and serve until the process stops.
    listen_address = ("0.0.0.0", 7000)
    server = HTTPServer(listen_address, Handler)
    server.serve_forever()
|