#!/usr/bin/env python3 import json import os import shutil import signal import subprocess import tempfile import threading from http.server import BaseHTTPRequestHandler, HTTPServer from urllib.parse import urlparse, urlunparse, quote DISPLAY = os.environ.get("DISPLAY", ":1") CHROME_WINDOW_SIZE = os.environ.get("CHROME_WINDOW_SIZE", "1920,1080") RESOLUTION_MIN_WIDTH = int(os.environ.get("WEB_RESOLUTION_MIN_WIDTH", "1024")) RESOLUTION_MIN_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MIN_HEIGHT", "720")) RESOLUTION_MAX_WIDTH = int(os.environ.get("WEB_RESOLUTION_MAX_WIDTH", "3840")) RESOLUTION_MAX_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MAX_HEIGHT", "2160")) _state = { "proc": None, "mode": "idle", "target": "", "resolution": CHROME_WINDOW_SIZE, "profile_dir": None, "extension_dir": None, } _lock = threading.Lock() _AUTOFILL_CONTENT_JS = r""" (function() { const CREDS = __CREDS__; console.log('[PortalAutofill] loaded for', location.href); function isVisible(el) { if (!el) return false; if (el.disabled || el.readOnly) return false; if (el.offsetParent === null && el.type !== 'email') return false; return true; } function findUserFieldByAttrs() { const candidates = document.querySelectorAll( 'input[type="email"], ' + 'input[autocomplete*="username"], ' + 'input[autocomplete*="email"], ' + 'input[name*="user" i]:not([type="password"]):not([type="hidden"]), ' + 'input[name*="login" i]:not([type="password"]):not([type="hidden"]), ' + 'input[name*="email" i]:not([type="password"]):not([type="hidden"]), ' + 'input[id*="user" i]:not([type="password"]):not([type="hidden"]), ' + 'input[id*="login" i]:not([type="password"]):not([type="hidden"]), ' + 'input[id*="email" i]:not([type="password"]):not([type="hidden"])' ); for (const el of candidates) { if (isVisible(el)) return el; } return null; } function findUserFieldNearPassword(passEl) { if (!passEl) return null; // Walk up to find the closest form-like container, then look for // any text/email input that comes BEFORE the password element. let container = passEl.closest('form'); if (!container) { let cur = passEl.parentElement; while (cur && cur !== document.body) { if (cur.querySelectorAll('input').length >= 2) { container = cur; break; } cur = cur.parentElement; } } if (!container) container = document.body; const inputs = container.querySelectorAll('input'); let candidate = null; for (const el of inputs) { if (el === passEl) break; const t = (el.type || 'text').toLowerCase(); if (t === 'password' || t === 'hidden' || t === 'submit' || t === 'button' || t === 'checkbox' || t === 'radio' || t === 'file') continue; if (!isVisible(el)) continue; candidate = el; } return candidate; } function findUserField(passEl) { return findUserFieldByAttrs() || findUserFieldNearPassword(passEl) || Array.from(document.querySelectorAll('input[type="text"], input:not([type])')) .find(isVisible) || null; } function findPassField() { const list = document.querySelectorAll('input[type="password"]'); for (const el of list) { if (isVisible(el)) return el; } return list[0] || null; } function setNativeValue(el, v) { if (!el) return false; if (el.value === v) return true; el.dispatchEvent(new FocusEvent('focus', { bubbles: true })); const proto = Object.getPrototypeOf(el); const desc = Object.getOwnPropertyDescriptor(proto, 'value') || Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value'); if (desc && desc.set) desc.set.call(el, v); else el.value = v; el.dispatchEvent(new InputEvent('input', { bubbles: true, data: v, inputType: 'insertText' })); el.dispatchEvent(new Event('change', { bubbles: true })); el.dispatchEvent(new KeyboardEvent('keyup', { bubbles: true, key: v.slice(-1) })); el.dispatchEvent(new FocusEvent('blur', { bubbles: true })); return true; } function tryFill() { const p = findPassField(); const u = findUserField(p); // Fill login FIRST so password-triggered re-render doesn't clear it if (CREDS.login && u) { if (setNativeValue(u, CREDS.login)) { console.log('[PortalAutofill] user filled'); } } if (CREDS.password && p) { if (setNativeValue(p, CREDS.password)) { console.log('[PortalAutofill] password filled'); } } } // Watch for SPA re-renders clearing the fields and re-fill continuously let _filling = false; function scheduleFill() { if (_filling) return; _filling = true; requestAnimationFrame(() => { tryFill(); _filling = false; }); } if (document.readyState === 'loading') { document.addEventListener('DOMContentLoaded', tryFill); } else { tryFill(); } // Periodic check for first 30s in case of async SPA resets let _checks = 0; const _interval = setInterval(() => { tryFill(); if (++_checks >= 30) clearInterval(_interval); }, 1000); const obs = new MutationObserver(scheduleFill); if (document.documentElement) { obs.observe(document.documentElement, { childList: true, subtree: true }); } const resetAndRefill = () => { _checks = 0; setTimeout(tryFill, 150); setTimeout(tryFill, 600); }; ['pushState', 'replaceState'].forEach(fn => { const orig = history[fn]; history[fn] = function() { const r = orig.apply(this, arguments); resetAndRefill(); return r; }; }); window.addEventListener('popstate', resetAndRefill); })(); """ _AUTOFILL_MANIFEST = { "manifest_version": 3, "name": "Portal Autofill", "version": "1.0", "description": "Auto-fill credentials for portal session", "background": {"service_worker": "background.js"}, "permissions": ["webRequest"], "host_permissions": [""], "content_scripts": [ { "matches": [""], "js": ["content.js"], "run_at": "document_idle", "all_frames": True, } ], } _AUTOFILL_BACKGROUND_JS = r""" const CREDS = __CREDS__; if (CREDS.login || CREDS.password) { chrome.webRequest.onAuthRequired.addListener( function(details, callback) { callback({ authCredentials: { username: CREDS.login || '', password: CREDS.password || '' } }); }, { urls: [''] }, ['asyncBlocking'] ); } """ def _create_autofill_extension(login: str, password: str) -> str | None: if not login and not password: return None ext_dir = tempfile.mkdtemp(prefix="chrome-autofill-ext-") creds_json = json.dumps({"login": login, "password": password}) content_js = _AUTOFILL_CONTENT_JS.replace("__CREDS__", creds_json) background_js = _AUTOFILL_BACKGROUND_JS.replace("__CREDS__", creds_json) with open(os.path.join(ext_dir, "manifest.json"), "w") as f: json.dump(_AUTOFILL_MANIFEST, f) with open(os.path.join(ext_dir, "content.js"), "w") as f: f.write(content_js) with open(os.path.join(ext_dir, "background.js"), "w") as f: f.write(background_js) return ext_dir def _create_chrome_profile() -> str: profile_dir = tempfile.mkdtemp(prefix="chrome-profile-") default_dir = os.path.join(profile_dir, "Default") os.makedirs(default_dir, exist_ok=True) prefs = { "intl": {"accept_languages": "ru-RU,ru,en", "selected_languages": "ru-RU,ru"}, "translate": {"enabled": False}, "translate_blocked_languages": ["ru"], } with open(os.path.join(default_dir, "Preferences"), "w") as f: json.dump(prefs, f) return profile_dir def _stop_current() -> None: proc = _state.get("proc") if proc: try: os.killpg(os.getpgid(proc.pid), signal.SIGTERM) proc.wait(timeout=4) except Exception: try: os.killpg(os.getpgid(proc.pid), signal.SIGKILL) except Exception: pass finally: _state["proc"] = None for key in ("profile_dir", "extension_dir"): path = _state.get(key) if path and os.path.isdir(path): shutil.rmtree(path, ignore_errors=True) _state[key] = None def _start_process(cmd: list[str], mode: str, target: str) -> None: _stop_current() logf = open("/tmp/session-app.log", "a", buffering=1) env = os.environ.copy() env["DISPLAY"] = DISPLAY proc = subprocess.Popen( # noqa: S603 cmd, stdout=logf, stderr=subprocess.STDOUT, env=env, start_new_session=True, ) _state["proc"] = proc _state["mode"] = mode _state["target"] = target def _sanitize_resolution(width: int | None, height: int | None) -> tuple[int, int]: if not width or not height: try: default_w, default_h = [int(x) for x in CHROME_WINDOW_SIZE.split(",", 1)] return default_w, default_h except Exception: return 1920, 1080 safe_w = max(RESOLUTION_MIN_WIDTH, min(int(width), RESOLUTION_MAX_WIDTH)) safe_h = max(RESOLUTION_MIN_HEIGHT, min(int(height), RESOLUTION_MAX_HEIGHT)) return safe_w, safe_h def _xrandr_output_name() -> str | None: try: out = subprocess.run( ["xrandr", "-display", DISPLAY], capture_output=True, text=True, check=False, ).stdout for line in out.splitlines(): if " connected" in line: return line.split()[0] except Exception: pass return None def _add_mode_via_cvt(width: int, height: int, output_name: str) -> bool: try: cvt = subprocess.run( ["cvt", str(width), str(height)], capture_output=True, text=True, check=False, ) if cvt.returncode != 0: return False modeline_line = next( (l for l in cvt.stdout.splitlines() if l.startswith("Modeline")), None ) if not modeline_line: return False parts = modeline_line.split() mode_name = parts[1].strip('"') mode_params = parts[2:] subprocess.run( ["xrandr", "-display", DISPLAY, "--newmode", mode_name] + mode_params, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) subprocess.run( ["xrandr", "-display", DISPLAY, "--addmode", output_name, mode_name], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) subprocess.run( ["xrandr", "-display", DISPLAY, "--output", output_name, "--mode", mode_name], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return True except Exception: return False def apply_resolution(width: int | None, height: int | None) -> tuple[int, int]: safe_w, safe_h = _sanitize_resolution(width, height) result = subprocess.run( ["xrandr", "-display", DISPLAY, "-s", f"{safe_w}x{safe_h}"], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) if result.returncode != 0: output_name = _xrandr_output_name() if output_name: _add_mode_via_cvt(safe_w, safe_h, output_name) _state["resolution"] = f"{safe_w},{safe_h}" return safe_w, safe_h def _url_with_credentials(url: str, login: str, password: str) -> str: """Embed login:password into URL so Chromium auto-handles Basic Auth.""" if not login and not password: return url parsed = urlparse(url) netloc = parsed.hostname or "" if parsed.port: netloc += f":{parsed.port}" user_info = quote(login, safe="") + ":" + quote(password, safe="") return urlunparse(parsed._replace(netloc=f"{user_info}@{netloc}")) def open_web( url: str, width: int | None = None, height: int | None = None, login: str = "", password: str = "", ) -> None: safe_w, safe_h = apply_resolution(width, height) profile_dir = _create_chrome_profile() extension_dir = _create_autofill_extension(login, password) url_with_creds = url # credentials in URL break SPA fetch; extension handles auth # Use the real Chromium binary directly to avoid the Debian wrapper which # injects an empty `--load-extension=` from /etc/chromium.d/extensions. chromium_bin = "/usr/lib/chromium/chromium" if not os.path.isfile(chromium_bin): chromium_bin = "chromium" cmd = [ chromium_bin, "--no-sandbox", "--disable-dev-shm-usage", "--disable-gpu", "--use-gl=swiftshader", "--kiosk", "--disable-translate", "--disable-features=TranslateUI,ExtensionsToolbarMenu", "--disable-pinch", "--overscroll-history-navigation=0", "--ignore-certificate-errors", "--allow-insecure-localhost", "--allow-running-insecure-content", f"--window-size={safe_w},{safe_h}", "--no-first-run", "--no-default-browser-check", "--enable-logging=stderr", "--v=0", "--lang=ru-RU", "--accept-lang=ru-RU,ru", "--password-store=basic", f"--user-data-dir={profile_dir}", ] if extension_dir: cmd.append(f"--load-extension={extension_dir}") cmd.append(f"--disable-extensions-except={extension_dir}") cmd.append(url_with_creds) _start_process(cmd, "web", url) _state["profile_dir"] = profile_dir _state["extension_dir"] = extension_dir def open_rdp(payload: dict) -> None: host = (payload.get("host") or "").strip() if not host: raise ValueError("host is required") port = str(payload.get("port") or "3389").strip() user = (payload.get("user") or "").strip() password = (payload.get("password") or "").strip() domain = (payload.get("domain") or "").strip() security = (payload.get("security") or "").strip().lower() cmd = [ "xfreerdp", f"/v:{host}:{port}", "/cert:ignore", "/f", "/dynamic-resolution", "/network:auto", "+clipboard", ] if security: cmd.append(f"/sec:{security}") if user: cmd.append(f"/u:{user}") if password: cmd.append(f"/p:{password}") if domain: cmd.append(f"/d:{domain}") safe_target = f"{host}:{port}" _start_process(cmd, "rdp", safe_target) class Handler(BaseHTTPRequestHandler): def _read_json(self): length = int(self.headers.get("Content-Length", "0")) if length <= 0: return {} raw = self.rfile.read(length) return json.loads(raw.decode("utf-8")) def _json(self, code: int, payload: dict): body = json.dumps(payload).encode("utf-8") self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self): if self.path == "/health": proc = _state.get("proc") running = bool(proc and proc.poll() is None) self._json( 200, { "ok": True, "mode": _state.get("mode", "idle"), "running": running, "target": _state.get("target", ""), "resolution": _state.get("resolution", CHROME_WINDOW_SIZE), }, ) return self._json(404, {"detail": "Not found"}) def do_POST(self): try: data = self._read_json() if self.path == "/open": url = (data.get("url") or "").strip() if not (url.startswith("http://") or url.startswith("https://")): self._json(400, {"detail": "Invalid URL"}) return width = data.get("width") height = data.get("height") login = (data.get("login") or "").strip() password = (data.get("password") or "").strip() with _lock: open_web(url, width=width, height=height, login=login, password=password) self._json( 200, { "ok": True, "mode": "web", "target": url, "resolution": _state.get("resolution", CHROME_WINDOW_SIZE), }, ) return if self.path == "/resolution": width = data.get("width") height = data.get("height") with _lock: safe_w, safe_h = apply_resolution(width, height) self._json(200, {"ok": True, "width": safe_w, "height": safe_h}) return if self.path == "/rdp": with _lock: open_rdp(data) self._json(200, {"ok": True, "mode": "rdp"}) return self._json(404, {"detail": "Not found"}) except Exception as exc: self._json(500, {"detail": str(exc)}) def log_message(self, fmt, *args): return if __name__ == "__main__": server = HTTPServer(("0.0.0.0", 7000), Handler) server.serve_forever()