Files
Stend_mont/universal-runtime/manager.py
T

515 lines
17 KiB
Python

#!/usr/bin/env python3
import json
import os
import shutil
import signal
import subprocess
import tempfile
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, urlunparse, quote
DISPLAY = os.environ.get("DISPLAY", ":1")
CHROME_WINDOW_SIZE = os.environ.get("CHROME_WINDOW_SIZE", "1920,1080")
RESOLUTION_MIN_WIDTH = int(os.environ.get("WEB_RESOLUTION_MIN_WIDTH", "1024"))
RESOLUTION_MIN_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MIN_HEIGHT", "720"))
RESOLUTION_MAX_WIDTH = int(os.environ.get("WEB_RESOLUTION_MAX_WIDTH", "3840"))
RESOLUTION_MAX_HEIGHT = int(os.environ.get("WEB_RESOLUTION_MAX_HEIGHT", "2160"))
_state = {
"proc": None,
"mode": "idle",
"target": "",
"resolution": CHROME_WINDOW_SIZE,
"profile_dir": None,
"extension_dir": None,
}
_lock = threading.Lock()
_AUTOFILL_CONTENT_JS = r"""
(function() {
const CREDS = __CREDS__;
console.log('[PortalAutofill] loaded for', location.href);
function isVisible(el) {
if (!el) return false;
if (el.disabled || el.readOnly) return false;
if (el.offsetParent === null && el.type !== 'email') return false;
return true;
}
function findUserFieldByAttrs() {
const candidates = document.querySelectorAll(
'input[type="email"], ' +
'input[autocomplete*="username"], ' +
'input[autocomplete*="email"], ' +
'input[name*="user" i]:not([type="password"]):not([type="hidden"]), ' +
'input[name*="login" i]:not([type="password"]):not([type="hidden"]), ' +
'input[name*="email" i]:not([type="password"]):not([type="hidden"]), ' +
'input[id*="user" i]:not([type="password"]):not([type="hidden"]), ' +
'input[id*="login" i]:not([type="password"]):not([type="hidden"]), ' +
'input[id*="email" i]:not([type="password"]):not([type="hidden"])'
);
for (const el of candidates) {
if (isVisible(el)) return el;
}
return null;
}
function findUserFieldNearPassword(passEl) {
if (!passEl) return null;
// Walk up to find the closest form-like container, then look for
// any text/email input that comes BEFORE the password element.
let container = passEl.closest('form');
if (!container) {
let cur = passEl.parentElement;
while (cur && cur !== document.body) {
if (cur.querySelectorAll('input').length >= 2) { container = cur; break; }
cur = cur.parentElement;
}
}
if (!container) container = document.body;
const inputs = container.querySelectorAll('input');
let candidate = null;
for (const el of inputs) {
if (el === passEl) break;
const t = (el.type || 'text').toLowerCase();
if (t === 'password' || t === 'hidden' || t === 'submit' || t === 'button' ||
t === 'checkbox' || t === 'radio' || t === 'file') continue;
if (!isVisible(el)) continue;
candidate = el;
}
return candidate;
}
function findUserField(passEl) {
return findUserFieldByAttrs() || findUserFieldNearPassword(passEl) ||
Array.from(document.querySelectorAll('input[type="text"], input:not([type])'))
.find(isVisible) || null;
}
function findPassField() {
const list = document.querySelectorAll('input[type="password"]');
for (const el of list) {
if (isVisible(el)) return el;
}
return list[0] || null;
}
function setNativeValue(el, v) {
if (!el) return false;
if (el.value === v) return true;
const proto = Object.getPrototypeOf(el);
const desc = Object.getOwnPropertyDescriptor(proto, 'value') ||
Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value');
if (desc && desc.set) desc.set.call(el, v); else el.value = v;
el.dispatchEvent(new Event('input', { bubbles: true }));
el.dispatchEvent(new Event('change', { bubbles: true }));
return true;
}
function tryFill() {
const p = findPassField();
const u = findUserField(p);
// Fill login FIRST so password-triggered re-render doesn't clear it
if (CREDS.login && u) {
if (setNativeValue(u, CREDS.login)) {
console.log('[PortalAutofill] user filled');
}
}
if (CREDS.password && p) {
if (setNativeValue(p, CREDS.password)) {
console.log('[PortalAutofill] password filled');
}
}
}
// Watch for SPA re-renders clearing the fields and re-fill continuously
let _filling = false;
function scheduleFill() {
if (_filling) return;
_filling = true;
requestAnimationFrame(() => { tryFill(); _filling = false; });
}
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', tryFill);
} else {
tryFill();
}
// Periodic check for first 30s in case of async SPA resets
let _checks = 0;
const _interval = setInterval(() => {
tryFill();
if (++_checks >= 30) clearInterval(_interval);
}, 1000);
const obs = new MutationObserver(scheduleFill);
if (document.documentElement) {
obs.observe(document.documentElement, { childList: true, subtree: true });
}
const resetAndRefill = () => {
_checks = 0;
setTimeout(tryFill, 150);
setTimeout(tryFill, 600);
};
['pushState', 'replaceState'].forEach(fn => {
const orig = history[fn];
history[fn] = function() { const r = orig.apply(this, arguments); resetAndRefill(); return r; };
});
window.addEventListener('popstate', resetAndRefill);
})();
"""
_AUTOFILL_MANIFEST = {
"manifest_version": 3,
"name": "Portal Autofill",
"version": "1.0",
"description": "Auto-fill credentials for portal session",
"background": {"service_worker": "background.js"},
"permissions": ["webRequest"],
"host_permissions": ["<all_urls>"],
"content_scripts": [
{
"matches": ["<all_urls>"],
"js": ["content.js"],
"run_at": "document_idle",
"all_frames": True,
}
],
}
_AUTOFILL_BACKGROUND_JS = r"""
const CREDS = __CREDS__;
if (CREDS.login || CREDS.password) {
chrome.webRequest.onAuthRequired.addListener(
function(details, callback) {
callback({ authCredentials: { username: CREDS.login || '', password: CREDS.password || '' } });
},
{ urls: ['<all_urls>'] },
['asyncBlocking']
);
}
"""
def _create_autofill_extension(login: str, password: str) -> str | None:
if not login and not password:
return None
ext_dir = tempfile.mkdtemp(prefix="chrome-autofill-ext-")
creds_json = json.dumps({"login": login, "password": password})
content_js = _AUTOFILL_CONTENT_JS.replace("__CREDS__", creds_json)
background_js = _AUTOFILL_BACKGROUND_JS.replace("__CREDS__", creds_json)
with open(os.path.join(ext_dir, "manifest.json"), "w") as f:
json.dump(_AUTOFILL_MANIFEST, f)
with open(os.path.join(ext_dir, "content.js"), "w") as f:
f.write(content_js)
with open(os.path.join(ext_dir, "background.js"), "w") as f:
f.write(background_js)
return ext_dir
def _create_chrome_profile() -> str:
profile_dir = tempfile.mkdtemp(prefix="chrome-profile-")
default_dir = os.path.join(profile_dir, "Default")
os.makedirs(default_dir, exist_ok=True)
prefs = {
"intl": {"accept_languages": "ru-RU,ru,en", "selected_languages": "ru-RU,ru"},
"translate": {"enabled": False},
"translate_blocked_languages": ["ru"],
}
with open(os.path.join(default_dir, "Preferences"), "w") as f:
json.dump(prefs, f)
return profile_dir
def _stop_current() -> None:
proc = _state.get("proc")
if proc:
try:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
proc.wait(timeout=4)
except Exception:
try:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except Exception:
pass
finally:
_state["proc"] = None
for key in ("profile_dir", "extension_dir"):
path = _state.get(key)
if path and os.path.isdir(path):
shutil.rmtree(path, ignore_errors=True)
_state[key] = None
def _start_process(cmd: list[str], mode: str, target: str) -> None:
_stop_current()
logf = open("/tmp/session-app.log", "a", buffering=1)
env = os.environ.copy()
env["DISPLAY"] = DISPLAY
proc = subprocess.Popen( # noqa: S603
cmd,
stdout=logf,
stderr=subprocess.STDOUT,
env=env,
start_new_session=True,
)
_state["proc"] = proc
_state["mode"] = mode
_state["target"] = target
def _sanitize_resolution(width: int | None, height: int | None) -> tuple[int, int]:
if not width or not height:
try:
default_w, default_h = [int(x) for x in CHROME_WINDOW_SIZE.split(",", 1)]
return default_w, default_h
except Exception:
return 1920, 1080
safe_w = max(RESOLUTION_MIN_WIDTH, min(int(width), RESOLUTION_MAX_WIDTH))
safe_h = max(RESOLUTION_MIN_HEIGHT, min(int(height), RESOLUTION_MAX_HEIGHT))
return safe_w, safe_h
def _xrandr_output_name() -> str | None:
try:
out = subprocess.run(
["xrandr", "-display", DISPLAY],
capture_output=True, text=True, check=False,
).stdout
for line in out.splitlines():
if " connected" in line:
return line.split()[0]
except Exception:
pass
return None
def _add_mode_via_cvt(width: int, height: int, output_name: str) -> bool:
try:
cvt = subprocess.run(
["cvt", str(width), str(height)],
capture_output=True, text=True, check=False,
)
if cvt.returncode != 0:
return False
modeline_line = next(
(l for l in cvt.stdout.splitlines() if l.startswith("Modeline")), None
)
if not modeline_line:
return False
parts = modeline_line.split()
mode_name = parts[1].strip('"')
mode_params = parts[2:]
subprocess.run(
["xrandr", "-display", DISPLAY, "--newmode", mode_name] + mode_params,
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
subprocess.run(
["xrandr", "-display", DISPLAY, "--addmode", output_name, mode_name],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
subprocess.run(
["xrandr", "-display", DISPLAY, "--output", output_name, "--mode", mode_name],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
return True
except Exception:
return False
def apply_resolution(width: int | None, height: int | None) -> tuple[int, int]:
safe_w, safe_h = _sanitize_resolution(width, height)
result = subprocess.run(
["xrandr", "-display", DISPLAY, "-s", f"{safe_w}x{safe_h}"],
check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
output_name = _xrandr_output_name()
if output_name:
_add_mode_via_cvt(safe_w, safe_h, output_name)
_state["resolution"] = f"{safe_w},{safe_h}"
return safe_w, safe_h
def _url_with_credentials(url: str, login: str, password: str) -> str:
"""Embed login:password into URL so Chromium auto-handles Basic Auth."""
if not login and not password:
return url
parsed = urlparse(url)
netloc = parsed.hostname or ""
if parsed.port:
netloc += f":{parsed.port}"
user_info = quote(login, safe="") + ":" + quote(password, safe="")
return urlunparse(parsed._replace(netloc=f"{user_info}@{netloc}"))
def open_web(
url: str,
width: int | None = None,
height: int | None = None,
login: str = "",
password: str = "",
) -> None:
safe_w, safe_h = apply_resolution(width, height)
profile_dir = _create_chrome_profile()
extension_dir = _create_autofill_extension(login, password)
url_with_creds = url # credentials in URL break SPA fetch; extension handles auth
# Use the real Chromium binary directly to avoid the Debian wrapper which
# injects an empty `--load-extension=` from /etc/chromium.d/extensions.
chromium_bin = "/usr/lib/chromium/chromium"
if not os.path.isfile(chromium_bin):
chromium_bin = "chromium"
cmd = [
chromium_bin,
"--no-sandbox",
"--disable-dev-shm-usage",
"--disable-gpu",
"--use-gl=swiftshader",
"--kiosk",
"--disable-translate",
"--disable-features=TranslateUI,ExtensionsToolbarMenu",
"--disable-pinch",
"--overscroll-history-navigation=0",
"--ignore-certificate-errors",
"--allow-insecure-localhost",
"--allow-running-insecure-content",
f"--window-size={safe_w},{safe_h}",
"--no-first-run",
"--no-default-browser-check",
"--enable-logging=stderr",
"--v=0",
"--lang=ru-RU",
"--accept-lang=ru-RU,ru",
"--password-store=basic",
f"--user-data-dir={profile_dir}",
]
if extension_dir:
cmd.append(f"--load-extension={extension_dir}")
cmd.append(f"--disable-extensions-except={extension_dir}")
cmd.append(url_with_creds)
_start_process(cmd, "web", url)
_state["profile_dir"] = profile_dir
_state["extension_dir"] = extension_dir
def open_rdp(payload: dict) -> None:
host = (payload.get("host") or "").strip()
if not host:
raise ValueError("host is required")
port = str(payload.get("port") or "3389").strip()
user = (payload.get("user") or "").strip()
password = (payload.get("password") or "").strip()
domain = (payload.get("domain") or "").strip()
security = (payload.get("security") or "").strip().lower()
cmd = [
"xfreerdp",
f"/v:{host}:{port}",
"/cert:ignore",
"/f",
"/dynamic-resolution",
"/network:auto",
"+clipboard",
]
if security:
cmd.append(f"/sec:{security}")
if user:
cmd.append(f"/u:{user}")
if password:
cmd.append(f"/p:{password}")
if domain:
cmd.append(f"/d:{domain}")
safe_target = f"{host}:{port}"
_start_process(cmd, "rdp", safe_target)
class Handler(BaseHTTPRequestHandler):
def _read_json(self):
length = int(self.headers.get("Content-Length", "0"))
if length <= 0:
return {}
raw = self.rfile.read(length)
return json.loads(raw.decode("utf-8"))
def _json(self, code: int, payload: dict):
body = json.dumps(payload).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self):
if self.path == "/health":
proc = _state.get("proc")
running = bool(proc and proc.poll() is None)
self._json(
200,
{
"ok": True,
"mode": _state.get("mode", "idle"),
"running": running,
"target": _state.get("target", ""),
"resolution": _state.get("resolution", CHROME_WINDOW_SIZE),
},
)
return
self._json(404, {"detail": "Not found"})
def do_POST(self):
try:
data = self._read_json()
if self.path == "/open":
url = (data.get("url") or "").strip()
if not (url.startswith("http://") or url.startswith("https://")):
self._json(400, {"detail": "Invalid URL"})
return
width = data.get("width")
height = data.get("height")
login = (data.get("login") or "").strip()
password = (data.get("password") or "").strip()
with _lock:
open_web(url, width=width, height=height, login=login, password=password)
self._json(
200,
{
"ok": True,
"mode": "web",
"target": url,
"resolution": _state.get("resolution", CHROME_WINDOW_SIZE),
},
)
return
if self.path == "/resolution":
width = data.get("width")
height = data.get("height")
with _lock:
safe_w, safe_h = apply_resolution(width, height)
self._json(200, {"ok": True, "width": safe_w, "height": safe_h})
return
if self.path == "/rdp":
with _lock:
open_rdp(data)
self._json(200, {"ok": True, "mode": "rdp"})
return
self._json(404, {"detail": "Not found"})
except Exception as exc:
self._json(500, {"detail": str(exc)})
def log_message(self, fmt, *args):
return
if __name__ == "__main__":
server = HTTPServer(("0.0.0.0", 7000), Handler)
server.serve_forever()