Files
Stend_mont/universal-runtime/manager.py
T

378 lines
13 KiB
Python

#!/usr/bin/env python3
import hashlib
import json
import os
import shutil
import signal
import sqlite3
import subprocess
import tempfile
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse
DISPLAY = os.environ.get("DISPLAY", ":1")
CHROME_WINDOW_SIZE = os.environ.get("CHROME_WINDOW_SIZE", "1920,1080")
RESOLUTION_MIN_WIDTH = int(os.environ.get("WEB_RESOLUTION_MIN_WIDTH", "1024"))
RESOLUTION_MIN_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MIN_HEIGHT", "720"))
RESOLUTION_MAX_WIDTH = int(os.environ.get("WEB_RESOLUTION_MAX_WIDTH", "3840"))
RESOLUTION_MAX_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MAX_HEIGHT", "2160"))
_state = {
"proc": None,
"mode": "idle",
"target": "",
"resolution": CHROME_WINDOW_SIZE,
"profile_dir": None,
}
_lock = threading.Lock()
def _chrome_encrypt_v10(plaintext: str) -> bytes:
try:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
except ImportError:
return plaintext.encode("utf-8")
key = hashlib.pbkdf2_hmac("sha1", b"peanuts", b"saltysalt", 1, dklen=16)
iv = b" " * 16
data = plaintext.encode("utf-8")
pad = 16 - (len(data) % 16)
data += bytes([pad] * pad)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
enc = cipher.encryptor()
return b"v10" + enc.update(data) + enc.finalize()
def _create_chrome_profile(login: str, password: str, url: str) -> str:
profile_dir = tempfile.mkdtemp(prefix="chrome-profile-")
default_dir = os.path.join(profile_dir, "Default")
os.makedirs(default_dir, exist_ok=True)
prefs = {
"intl": {"accept_languages": "ru-RU,ru,en", "selected_languages": "ru-RU,ru"},
"translate": {"enabled": False},
"translate_blocked_languages": ["ru"],
"credentials_enable_service": True,
"credentials_enable_autosign_in": False,
}
with open(os.path.join(default_dir, "Preferences"), "w") as f:
json.dump(prefs, f)
if not login and not password:
return profile_dir
parsed = urlparse(url)
origin = f"{parsed.scheme}://{parsed.netloc}/"
db_path = os.path.join(default_dir, "Login Data")
conn = sqlite3.connect(db_path)
conn.execute(
"CREATE TABLE IF NOT EXISTS meta "
"(key LONGVARCHAR NOT NULL UNIQUE PRIMARY KEY, value LONGVARCHAR)"
)
conn.execute("INSERT OR REPLACE INTO meta VALUES ('version', '30')")
conn.execute("INSERT OR REPLACE INTO meta VALUES ('last_compatible_version', '30')")
conn.execute("""
CREATE TABLE IF NOT EXISTS logins (
origin_url VARCHAR NOT NULL,
action_url VARCHAR,
username_element VARCHAR,
username_value VARCHAR,
password_element VARCHAR,
password_value BLOB,
submit_element VARCHAR,
signon_realm VARCHAR NOT NULL,
date_created INTEGER NOT NULL,
blacklisted_by_user INTEGER NOT NULL,
scheme INTEGER NOT NULL,
password_type INTEGER DEFAULT 0,
times_used INTEGER DEFAULT 0,
form_data BLOB DEFAULT '',
display_name VARCHAR DEFAULT '',
icon_url VARCHAR DEFAULT '',
federation_url VARCHAR DEFAULT '',
skip_zero_click INTEGER DEFAULT 0,
generation_upload_status INTEGER DEFAULT 0,
possible_username_pairs BLOB DEFAULT '',
id INTEGER PRIMARY KEY AUTOINCREMENT,
date_last_used INTEGER DEFAULT 0,
moving_blocked_for BLOB DEFAULT '',
date_password_modified INTEGER DEFAULT 0
)""")
enc_password = _chrome_encrypt_v10(password) if password else b""
now = int(time.time() * 1_000_000)
conn.execute(
"INSERT INTO logins "
"(origin_url, action_url, username_element, username_value, "
"password_element, password_value, submit_element, signon_realm, "
"date_created, blacklisted_by_user, scheme, times_used, date_last_used) "
"VALUES (?,?,?,?,?,?,?,?,?,0,0,1,?)",
(origin, origin, "", login, "", enc_password, "", origin, now, now),
)
conn.commit()
conn.close()
return profile_dir
def _stop_current() -> None:
proc = _state.get("proc")
if proc:
try:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
proc.wait(timeout=4)
except Exception:
try:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except Exception:
pass
finally:
_state["proc"] = None
profile_dir = _state.get("profile_dir")
if profile_dir and os.path.isdir(profile_dir):
shutil.rmtree(profile_dir, ignore_errors=True)
_state["profile_dir"] = None
def _start_process(cmd: list[str], mode: str, target: str) -> None:
_stop_current()
logf = open("/tmp/session-app.log", "a", buffering=1)
env = os.environ.copy()
env["DISPLAY"] = DISPLAY
proc = subprocess.Popen( # noqa: S603
cmd,
stdout=logf,
stderr=subprocess.STDOUT,
env=env,
start_new_session=True,
)
_state["proc"] = proc
_state["mode"] = mode
_state["target"] = target
def _sanitize_resolution(width: int | None, height: int | None) -> tuple[int, int]:
if not width or not height:
try:
default_w, default_h = [int(x) for x in CHROME_WINDOW_SIZE.split(",", 1)]
return default_w, default_h
except Exception:
return 1920, 1080
safe_w = max(RESOLUTION_MIN_WIDTH, min(int(width), RESOLUTION_MAX_WIDTH))
safe_h = max(RESOLUTION_MIN_HEIGHT, min(int(height), RESOLUTION_MAX_HEIGHT))
return safe_w, safe_h
def _xrandr_output_name() -> str | None:
try:
out = subprocess.run(
["xrandr", "-display", DISPLAY],
capture_output=True, text=True, check=False,
).stdout
for line in out.splitlines():
if " connected" in line:
return line.split()[0]
except Exception:
pass
return None
def _add_mode_via_cvt(width: int, height: int, output_name: str) -> bool:
try:
cvt = subprocess.run(
["cvt", str(width), str(height)],
capture_output=True, text=True, check=False,
)
if cvt.returncode != 0:
return False
modeline_line = next(
(l for l in cvt.stdout.splitlines() if l.startswith("Modeline")), None
)
if not modeline_line:
return False
parts = modeline_line.split()
mode_name = parts[1].strip('"')
mode_params = parts[2:]
subprocess.run(
["xrandr", "-display", DISPLAY, "--newmode", mode_name] + mode_params,
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
subprocess.run(
["xrandr", "-display", DISPLAY, "--addmode", output_name, mode_name],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
subprocess.run(
["xrandr", "-display", DISPLAY, "--output", output_name, "--mode", mode_name],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
return True
except Exception:
return False
def apply_resolution(width: int | None, height: int | None) -> tuple[int, int]:
safe_w, safe_h = _sanitize_resolution(width, height)
result = subprocess.run(
["xrandr", "-display", DISPLAY, "-s", f"{safe_w}x{safe_h}"],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
output_name = _xrandr_output_name()
if output_name:
_add_mode_via_cvt(safe_w, safe_h, output_name)
_state["resolution"] = f"{safe_w},{safe_h}"
return safe_w, safe_h
def open_web(
url: str,
width: int | None = None,
height: int | None = None,
login: str = "",
password: str = "",
) -> None:
safe_w, safe_h = apply_resolution(width, height)
profile_dir = _create_chrome_profile(login, password, url)
_state["profile_dir"] = profile_dir
cmd = [
"chromium",
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--use-gl=swiftshader",
"--kiosk",
"--disable-translate",
"--disable-features=TranslateUI,ExtensionsToolbarMenu",
"--disable-pinch",
"--overscroll-history-navigation=0",
"--ignore-certificate-errors",
"--allow-insecure-localhost",
"--allow-running-insecure-content",
f"--window-size={safe_w},{safe_h}",
"--no-first-run",
"--no-default-browser-check",
"--lang=ru-RU",
"--accept-lang=ru-RU,ru",
"--password-store=basic",
f"--user-data-dir={profile_dir}",
url,
]
_start_process(cmd, "web", url)
def open_rdp(payload: dict) -> None:
host = (payload.get("host") or "").strip()
if not host:
raise ValueError("host is required")
port = str(payload.get("port") or "3389").strip()
user = (payload.get("user") or "").strip()
password = (payload.get("password") or "").strip()
domain = (payload.get("domain") or "").strip()
security = (payload.get("security") or "").strip().lower()
cmd = [
"xfreerdp",
f"/v:{host}:{port}",
"/cert:ignore",
"/f",
"/dynamic-resolution",
"/network:auto",
"+clipboard",
]
if security:
cmd.append(f"/sec:{security}")
if user:
cmd.append(f"/u:{user}")
if password:
cmd.append(f"/p:{password}")
if domain:
cmd.append(f"/d:{domain}")
safe_target = f"{host}:{port}"
_start_process(cmd, "rdp", safe_target)
class Handler(BaseHTTPRequestHandler):
def _read_json(self):
length = int(self.headers.get("Content-Length", "0"))
if length <= 0:
return {}
raw = self.rfile.read(length)
return json.loads(raw.decode("utf-8"))
def _json(self, code: int, payload: dict):
body = json.dumps(payload).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self):
if self.path == "/health":
proc = _state.get("proc")
running = bool(proc and proc.poll() is None)
self._json(
200,
{
"ok": True,
"mode": _state.get("mode", "idle"),
"running": running,
"target": _state.get("target", ""),
"resolution": _state.get("resolution", CHROME_WINDOW_SIZE),
},
)
return
self._json(404, {"detail": "Not found"})
def do_POST(self):
try:
data = self._read_json()
if self.path == "/open":
url = (data.get("url") or "").strip()
if not (url.startswith("http://") or url.startswith("https://")):
self._json(400, {"detail": "Invalid URL"})
return
width = data.get("width")
height = data.get("height")
login = (data.get("login") or "").strip()
password = (data.get("password") or "").strip()
with _lock:
open_web(url, width=width, height=height, login=login, password=password)
self._json(
200,
{
"ok": True,
"mode": "web",
"target": url,
"resolution": _state.get("resolution", CHROME_WINDOW_SIZE),
},
)
return
if self.path == "/resolution":
width = data.get("width")
height = data.get("height")
with _lock:
safe_w, safe_h = apply_resolution(width, height)
self._json(200, {"ok": True, "width": safe_w, "height": safe_h})
return
if self.path == "/rdp":
with _lock:
open_rdp(data)
self._json(200, {"ok": True, "mode": "rdp"})
return
self._json(404, {"detail": "Not found"})
except Exception as exc:
self._json(500, {"detail": str(exc)})
def log_message(self, fmt, *args):
return
if __name__ == "__main__":
server = HTTPServer(("0.0.0.0", 7000), Handler)
server.serve_forever()