#!/usr/bin/env python3 import hashlib import json import os import shutil import signal import sqlite3 import subprocess import tempfile import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import urlparse DISPLAY = os.environ.get("DISPLAY", ":1") CHROME_WINDOW_SIZE = os.environ.get("CHROME_WINDOW_SIZE", "1920,1080") RESOLUTION_MIN_WIDTH = int(os.environ.get("WEB_RESOLUTION_MIN_WIDTH", "1024")) RESOLUTION_MIN_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MIN_HEIGHT", "720")) RESOLUTION_MAX_WIDTH = int(os.environ.get("WEB_RESOLUTION_MAX_WIDTH", "3840")) RESOLUTION_MAX_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MAX_HEIGHT", "2160")) _state = { "proc": None, "mode": "idle", "target": "", "resolution": CHROME_WINDOW_SIZE, "profile_dir": None, } _lock = threading.Lock() def _chrome_encrypt_v10(plaintext: str) -> bytes: try: from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend except ImportError: return plaintext.encode("utf-8") key = hashlib.pbkdf2_hmac("sha1", b"peanuts", b"saltysalt", 1, dklen=16) iv = b" " * 16 data = plaintext.encode("utf-8") pad = 16 - (len(data) % 16) data += bytes([pad] * pad) cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) enc = cipher.encryptor() return b"v10" + enc.update(data) + enc.finalize() def _create_chrome_profile(login: str, password: str, url: str) -> str: profile_dir = tempfile.mkdtemp(prefix="chrome-profile-") default_dir = os.path.join(profile_dir, "Default") os.makedirs(default_dir, exist_ok=True) prefs = { "intl": {"accept_languages": "ru-RU,ru,en", "selected_languages": "ru-RU,ru"}, "translate": {"enabled": False}, "translate_blocked_languages": ["ru"], "credentials_enable_service": True, "credentials_enable_autosign_in": False, } with open(os.path.join(default_dir, "Preferences"), "w") as f: json.dump(prefs, f) if not login and not password: return profile_dir parsed = urlparse(url) origin = f"{parsed.scheme}://{parsed.netloc}/" db_path = os.path.join(default_dir, "Login Data") conn = sqlite3.connect(db_path) conn.execute( "CREATE TABLE IF NOT EXISTS meta " "(key LONGVARCHAR NOT NULL UNIQUE PRIMARY KEY, value LONGVARCHAR)" ) conn.execute("INSERT OR REPLACE INTO meta VALUES ('version', '30')") conn.execute("INSERT OR REPLACE INTO meta VALUES ('last_compatible_version', '30')") conn.execute(""" CREATE TABLE IF NOT EXISTS logins ( origin_url VARCHAR NOT NULL, action_url VARCHAR, username_element VARCHAR, username_value VARCHAR, password_element VARCHAR, password_value BLOB, submit_element VARCHAR, signon_realm VARCHAR NOT NULL, date_created INTEGER NOT NULL, blacklisted_by_user INTEGER NOT NULL, scheme INTEGER NOT NULL, password_type INTEGER DEFAULT 0, times_used INTEGER DEFAULT 0, form_data BLOB DEFAULT '', display_name VARCHAR DEFAULT '', icon_url VARCHAR DEFAULT '', federation_url VARCHAR DEFAULT '', skip_zero_click INTEGER DEFAULT 0, generation_upload_status INTEGER DEFAULT 0, possible_username_pairs BLOB DEFAULT '', id INTEGER PRIMARY KEY AUTOINCREMENT, date_last_used INTEGER DEFAULT 0, moving_blocked_for BLOB DEFAULT '', date_password_modified INTEGER DEFAULT 0 )""") enc_password = _chrome_encrypt_v10(password) if password else b"" now = int(time.time() * 1_000_000) conn.execute( "INSERT INTO logins " "(origin_url, action_url, username_element, username_value, " "password_element, password_value, submit_element, signon_realm, " "date_created, blacklisted_by_user, scheme, times_used, date_last_used) " "VALUES (?,?,?,?,?,?,?,?,?,0,0,1,?)", (origin, origin, "", login, "", enc_password, "", origin, now, now), ) conn.commit() conn.close() return profile_dir def _stop_current() -> None: proc = _state.get("proc") if proc: try: os.killpg(os.getpgid(proc.pid), signal.SIGTERM) proc.wait(timeout=4) except Exception: try: os.killpg(os.getpgid(proc.pid), signal.SIGKILL) except Exception: pass finally: _state["proc"] = None profile_dir = _state.get("profile_dir") if profile_dir and os.path.isdir(profile_dir): shutil.rmtree(profile_dir, ignore_errors=True) _state["profile_dir"] = None def _start_process(cmd: list[str], mode: str, target: str) -> None: _stop_current() logf = open("/tmp/session-app.log", "a", buffering=1) env = os.environ.copy() env["DISPLAY"] = DISPLAY proc = subprocess.Popen( # noqa: S603 cmd, stdout=logf, stderr=subprocess.STDOUT, env=env, start_new_session=True, ) _state["proc"] = proc _state["mode"] = mode _state["target"] = target def _sanitize_resolution(width: int | None, height: int | None) -> tuple[int, int]: if not width or not height: try: default_w, default_h = [int(x) for x in CHROME_WINDOW_SIZE.split(",", 1)] return default_w, default_h except Exception: return 1920, 1080 safe_w = max(RESOLUTION_MIN_WIDTH, min(int(width), RESOLUTION_MAX_WIDTH)) safe_h = max(RESOLUTION_MIN_HEIGHT, min(int(height), RESOLUTION_MAX_HEIGHT)) return safe_w, safe_h def _xrandr_output_name() -> str | None: try: out = subprocess.run( ["xrandr", "-display", DISPLAY], capture_output=True, text=True, check=False, ).stdout for line in out.splitlines(): if " connected" in line: return line.split()[0] except Exception: pass return None def _add_mode_via_cvt(width: int, height: int, output_name: str) -> bool: try: cvt = subprocess.run( ["cvt", str(width), str(height)], capture_output=True, text=True, check=False, ) if cvt.returncode != 0: return False modeline_line = next( (l for l in cvt.stdout.splitlines() if l.startswith("Modeline")), None ) if not modeline_line: return False parts = modeline_line.split() mode_name = parts[1].strip('"') mode_params = parts[2:] subprocess.run( ["xrandr", "-display", DISPLAY, "--newmode", mode_name] + mode_params, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) subprocess.run( ["xrandr", "-display", DISPLAY, "--addmode", output_name, mode_name], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) subprocess.run( ["xrandr", "-display", DISPLAY, "--output", output_name, "--mode", mode_name], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return True except Exception: return False def apply_resolution(width: int | None, height: int | None) -> tuple[int, int]: safe_w, safe_h = _sanitize_resolution(width, height) result = subprocess.run( ["xrandr", "-display", DISPLAY, "-s", f"{safe_w}x{safe_h}"], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) if result.returncode != 0: output_name = _xrandr_output_name() if output_name: _add_mode_via_cvt(safe_w, safe_h, output_name) _state["resolution"] = f"{safe_w},{safe_h}" return safe_w, safe_h def open_web( url: str, width: int | None = None, height: int | None = None, login: str = "", password: str = "", ) -> None: safe_w, safe_h = apply_resolution(width, height) profile_dir = _create_chrome_profile(login, password, url) cmd = [ "chromium", "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--use-gl=swiftshader", "--kiosk", "--disable-translate", "--disable-features=TranslateUI,ExtensionsToolbarMenu", "--disable-pinch", "--overscroll-history-navigation=0", "--ignore-certificate-errors", "--allow-insecure-localhost", "--allow-running-insecure-content", f"--window-size={safe_w},{safe_h}", "--no-first-run", "--no-default-browser-check", "--lang=ru-RU", "--accept-lang=ru-RU,ru", "--password-store=basic", f"--user-data-dir={profile_dir}", url, ] _start_process(cmd, "web", url) _state["profile_dir"] = profile_dir def open_rdp(payload: dict) -> None: host = (payload.get("host") or "").strip() if not host: raise ValueError("host is required") port = str(payload.get("port") or "3389").strip() user = (payload.get("user") or "").strip() password = (payload.get("password") or "").strip() domain = (payload.get("domain") or "").strip() security = (payload.get("security") or "").strip().lower() cmd = [ "xfreerdp", f"/v:{host}:{port}", "/cert:ignore", "/f", "/dynamic-resolution", "/network:auto", "+clipboard", ] if security: cmd.append(f"/sec:{security}") if user: cmd.append(f"/u:{user}") if password: cmd.append(f"/p:{password}") if domain: cmd.append(f"/d:{domain}") safe_target = f"{host}:{port}" _start_process(cmd, "rdp", safe_target) class Handler(BaseHTTPRequestHandler): def _read_json(self): length = int(self.headers.get("Content-Length", "0")) if length <= 0: return {} raw = self.rfile.read(length) return json.loads(raw.decode("utf-8")) def _json(self, code: int, payload: dict): body = json.dumps(payload).encode("utf-8") self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self): if self.path == "/health": proc = _state.get("proc") running = bool(proc and proc.poll() is None) self._json( 200, { "ok": True, "mode": _state.get("mode", "idle"), "running": running, "target": _state.get("target", ""), "resolution": _state.get("resolution", CHROME_WINDOW_SIZE), }, ) return self._json(404, {"detail": "Not found"}) def do_POST(self): try: data = self._read_json() if self.path == "/open": url = (data.get("url") or "").strip() if not (url.startswith("http://") or url.startswith("https://")): self._json(400, {"detail": "Invalid URL"}) return width = data.get("width") height = data.get("height") login = (data.get("login") or "").strip() password = (data.get("password") or "").strip() with _lock: open_web(url, width=width, height=height, login=login, password=password) self._json( 200, { "ok": True, "mode": "web", "target": url, "resolution": _state.get("resolution", CHROME_WINDOW_SIZE), }, ) return if self.path == "/resolution": width = data.get("width") height = data.get("height") with _lock: safe_w, safe_h = apply_resolution(width, height) self._json(200, {"ok": True, "width": safe_w, "height": safe_h}) return if self.path == "/rdp": with _lock: open_rdp(data) self._json(200, {"ok": True, "mode": "rdp"}) return self._json(404, {"detail": "Not found"}) except Exception as exc: self._json(500, {"detail": str(exc)}) def log_message(self, fmt, *args): return if __name__ == "__main__": server = HTTPServer(("0.0.0.0", 7000), Handler) server.serve_forever()