d57acb416b
The previous approach pre-populated Chromium's Login Data SQLite database with schema version 30 and AES-128-CBC "v10"-encrypted passwords. Chromium 147 expects schema version 43, fails to migrate ("Unable to migrate database from 30 to 43"), and refuses to open Login Data altogether. Result: the row was written but Chromium never read it, so autofill never worked. Instead, generate a tiny Manifest V3 extension per session in a temp dir, with a content script that finds the username and password fields, sets their values, and dispatches input/change events. Pass it via --load-extension and --disable-extensions-except so that it is the only extension loaded. Benefits: (1) independent of the Chromium version and the Login Database schema; (2) works on SPAs (a MutationObserver re-runs the fill on DOM changes); (3) credentials live only in a temp file alongside the profile and are removed on session end via _stop_current; (4) no SQLite or cryptography dependency; (5) removes the silent failure mode of the Login Data migration. Removes the _chrome_encrypt_v10 helper and the sqlite3, hashlib, and urlparse imports. Adds _create_autofill_extension and tracks extension_dir alongside profile_dir in _state for cleanup symmetry.
415 lines
13 KiB
Python
415 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
# X display handed to every spawned session process and to each xrandr call.
DISPLAY = os.getenv("DISPLAY", ":1")

# Fallback window size as "WIDTH,HEIGHT"; used when a request carries no
# width/height and as the initial value of _state["resolution"].
CHROME_WINDOW_SIZE = os.getenv("CHROME_WINDOW_SIZE", "1920,1080")

# Clamp bounds applied to client-requested resolutions.
RESOLUTION_MIN_WIDTH = int(os.getenv("WEB_RESOLUTION_MIN_WIDTH", "1024"))
RESOLUTION_MIN_HEIGHT = int(os.getenv("WEB_RESOLUTION_MIN_HEIGHT", "720"))
RESOLUTION_MAX_WIDTH = int(os.getenv("WEB_RESOLUTION_MAX_WIDTH", "3840"))
RESOLUTION_MAX_HEIGHT = int(os.getenv("WEB_RESOLUTION_MAX_HEIGHT", "2160"))

# Mutable description of the single active session.
_state = {
    "proc": None,  # Popen handle of the running session process, if any
    "mode": "idle",  # "idle" until a session starts, then "web" or "rdp"
    "target": "",  # URL (web) or "host:port" (rdp) of the session
    "resolution": CHROME_WINDOW_SIZE,  # last applied "WIDTH,HEIGHT"
    "profile_dir": None,  # temp Chromium profile, removed on stop
    "extension_dir": None,  # temp autofill extension, removed on stop
}

# Held by the POST handlers while they start a session or change resolution.
_lock = threading.Lock()
|
|
|
|
|
|
# Template for the extension's content script.  The __CREDS__ placeholder is
# replaced with a JSON object ({"login": ..., "password": ...}) before the
# script is written to disk.  The script fills the first usable username and
# password inputs, fires input/change events so framework-driven forms notice
# the new values, and retries on DOM mutations until a fill has succeeded.
#
# NOTE(review): content scripts normally run in an isolated JavaScript world,
# so the history.pushState/replaceState wrappers below may never see calls
# made by the page's own scripts -- confirm SPA navigation really re-arms the
# fill, or rely on the MutationObserver/popstate paths only.
_AUTOFILL_CONTENT_JS = r"""
(function() {
  const CREDS = __CREDS__;
  let filled = false;

  function findUserField() {
    const candidates = document.querySelectorAll(
      'input[type="email"], ' +
      'input[autocomplete*="username"], ' +
      'input[autocomplete*="email"], ' +
      'input[name*="user" i], ' +
      'input[name*="login" i], ' +
      'input[name*="email" i], ' +
      'input[id*="user" i], ' +
      'input[id*="login" i], ' +
      'input[id*="email" i], ' +
      'input[type="text"]'
    );
    for (const el of candidates) {
      if (el.type === 'password' || el.type === 'hidden') continue;
      if (el.offsetParent === null && el.type !== 'email') continue;
      return el;
    }
    return null;
  }

  function findPassField() {
    const list = document.querySelectorAll('input[type="password"]');
    for (const el of list) {
      if (el.offsetParent !== null) return el;
    }
    return list[0] || null;
  }

  function setNativeValue(el, v) {
    if (!el) return false;
    if (el.value === v) return true;
    const proto = Object.getPrototypeOf(el);
    const desc = Object.getOwnPropertyDescriptor(proto, 'value') ||
      Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value');
    if (desc && desc.set) desc.set.call(el, v); else el.value = v;
    el.dispatchEvent(new Event('input', { bubbles: true }));
    el.dispatchEvent(new Event('change', { bubbles: true }));
    return true;
  }

  function tryFill() {
    if (filled) return;
    const p = findPassField();
    if (!p) return;
    const u = findUserField();
    let did = false;
    if (CREDS.login && u) did = setNativeValue(u, CREDS.login) || did;
    if (CREDS.password) did = setNativeValue(p, CREDS.password) || did;
    if (did) filled = true;
  }

  if (document.readyState === 'loading') {
    document.addEventListener('DOMContentLoaded', tryFill);
  } else {
    tryFill();
  }

  const obs = new MutationObserver(() => { if (!filled) tryFill(); });
  if (document.documentElement) {
    obs.observe(document.documentElement, { childList: true, subtree: true });
  }

  const resetAndRefill = () => { filled = false; setTimeout(tryFill, 150); };
  ['pushState', 'replaceState'].forEach(fn => {
    const orig = history[fn];
    history[fn] = function() { const r = orig.apply(this, arguments); resetAndRefill(); return r; };
  });
  window.addEventListener('popstate', resetAndRefill);
})();
"""
|
|
|
|
# Manifest V3 description of the per-session autofill extension.  It is
# serialised verbatim to manifest.json next to content.js.
_AUTOFILL_MANIFEST = {
    "manifest_version": 3,
    "name": "Portal Autofill",
    "version": "1.0",
    "description": "Auto-fill credentials for portal session",
    "content_scripts": [
        # One content script, injected into every URL and every frame once
        # the document is idle.
        {
            "matches": ["<all_urls>"],
            "js": ["content.js"],
            "run_at": "document_idle",
            "all_frames": True,
        },
    ],
}
|
|
|
|
|
|
def _create_autofill_extension(login: str, password: str) -> str | None:
|
|
if not login and not password:
|
|
return None
|
|
ext_dir = tempfile.mkdtemp(prefix="chrome-autofill-ext-")
|
|
creds_json = json.dumps({"login": login, "password": password})
|
|
content_js = _AUTOFILL_CONTENT_JS.replace("__CREDS__", creds_json)
|
|
with open(os.path.join(ext_dir, "manifest.json"), "w") as f:
|
|
json.dump(_AUTOFILL_MANIFEST, f)
|
|
with open(os.path.join(ext_dir, "content.js"), "w") as f:
|
|
f.write(content_js)
|
|
return ext_dir
|
|
|
|
|
|
def _create_chrome_profile() -> str:
    """Create a fresh temporary Chromium user-data directory.

    A ``Default/Preferences`` file is seeded so the browser starts with
    Russian as the preferred language and with page translation switched off.

    Returns:
        Path of the new user-data directory (the parent of ``Default``).
    """
    preferences = {
        "intl": {"accept_languages": "ru-RU,ru,en", "selected_languages": "ru-RU,ru"},
        "translate": {"enabled": False},
        "translate_blocked_languages": ["ru"],
    }

    user_data_root = tempfile.mkdtemp(prefix="chrome-profile-")
    default_profile = os.path.join(user_data_root, "Default")
    os.makedirs(default_profile, exist_ok=True)

    prefs_path = os.path.join(default_profile, "Preferences")
    with open(prefs_path, "w") as handle:
        handle.write(json.dumps(preferences))

    return user_data_root
|
|
|
|
|
|
def _stop_current() -> None:
    """Stop the active session process, if any, and delete its temp dirs.

    The whole process group of ``_state["proc"]`` receives SIGTERM and gets
    four seconds to exit.  If that fails for any reason the group is sent
    SIGKILL and the process is given two more seconds to be reaped.  All of
    this is best effort: errors are swallowed so that cleanup always
    continues.  Afterwards ``_state["proc"]``, ``_state["profile_dir"]`` and
    ``_state["extension_dir"]`` are reset to ``None``.
    """
    proc = _state.get("proc")
    if proc:
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
            proc.wait(timeout=4)
        except Exception:
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
                # Reap the killed process so it does not linger as a zombie
                # and so its files are released before they are deleted.
                proc.wait(timeout=2)
            except Exception:
                # The process is already gone or cannot be signalled;
                # nothing more can be done here.
                pass
        finally:
            _state["proc"] = None

    for key in ("profile_dir", "extension_dir"):
        path = _state.get(key)
        if path and os.path.isdir(path):
            shutil.rmtree(path, ignore_errors=True)
        _state[key] = None
|
|
|
|
|
|
def _start_process(cmd: list[str], mode: str, target: str) -> None:
    """Replace the active session with a new process running *cmd*.

    Any previous session is stopped first.  The child runs in its own session
    (and therefore its own process group) with ``DISPLAY`` set, and its
    stdout/stderr are appended to ``/tmp/session-app.log``.

    Args:
        cmd: Argument vector to execute (no shell is involved).
        mode: Session kind recorded in ``_state["mode"]``.
        target: Human-readable target recorded in ``_state["target"]``.

    Raises:
        OSError: If the log file cannot be opened or the program cannot be
            started.
    """
    _stop_current()
    env = os.environ.copy()
    env["DISPLAY"] = DISPLAY
    # The child receives its own duplicate of the log descriptor, so the
    # parent's handle is closed as soon as the process has been spawned
    # instead of leaking one descriptor per session.
    with open("/tmp/session-app.log", "a", buffering=1) as logf:
        proc = subprocess.Popen(  # noqa: S603
            cmd,
            stdout=logf,
            stderr=subprocess.STDOUT,
            env=env,
            start_new_session=True,
        )
    _state["proc"] = proc
    _state["mode"] = mode
    _state["target"] = target
|
|
|
|
|
|
def _sanitize_resolution(width: int | None, height: int | None) -> tuple[int, int]:
|
|
if not width or not height:
|
|
try:
|
|
default_w, default_h = [int(x) for x in CHROME_WINDOW_SIZE.split(",", 1)]
|
|
return default_w, default_h
|
|
except Exception:
|
|
return 1920, 1080
|
|
safe_w = max(RESOLUTION_MIN_WIDTH, min(int(width), RESOLUTION_MAX_WIDTH))
|
|
safe_h = max(RESOLUTION_MIN_HEIGHT, min(int(height), RESOLUTION_MAX_HEIGHT))
|
|
return safe_w, safe_h
|
|
|
|
|
|
def _xrandr_output_name() -> str | None:
|
|
try:
|
|
out = subprocess.run(
|
|
["xrandr", "-display", DISPLAY],
|
|
capture_output=True, text=True, check=False,
|
|
).stdout
|
|
for line in out.splitlines():
|
|
if " connected" in line:
|
|
return line.split()[0]
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _add_mode_via_cvt(width: int, height: int, output_name: str) -> bool:
    """Register a custom xrandr mode for *width* x *height* and switch to it.

    ``cvt`` computes the modeline; ``xrandr --newmode`` / ``--addmode``
    register it on *output_name*; ``xrandr --output ... --mode`` activates it.

    Args:
        width: Desired width in pixels.
        height: Desired height in pixels.
        output_name: xrandr output the mode is attached to.

    Returns:
        ``True`` only if the final mode switch succeeded; ``False`` if
        ``cvt`` or ``xrandr`` is unavailable, ``cvt`` produced no usable
        modeline, or the switch itself failed.
    """
    quiet = {"check": False, "stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
    try:
        cvt = subprocess.run(
            ["cvt", str(width), str(height)],
            capture_output=True,
            text=True,
            check=False,
        )
        if cvt.returncode != 0:
            return False

        modeline_line = next(
            (line for line in cvt.stdout.splitlines() if line.startswith("Modeline")),
            None,
        )
        if not modeline_line:
            return False

        # Expected shape: Modeline "<name>" <clock> <timings...> <flags...>
        parts = modeline_line.split()
        if len(parts) < 3:
            return False
        mode_name = parts[1].strip('"')
        mode_params = parts[2:]

        xrandr = ["xrandr", "-display", DISPLAY]
        # --newmode/--addmode legitimately fail when the mode is already
        # registered from an earlier call, so their status is not checked.
        subprocess.run(xrandr + ["--newmode", mode_name] + mode_params, **quiet)
        subprocess.run(xrandr + ["--addmode", output_name, mode_name], **quiet)
        switched = subprocess.run(
            xrandr + ["--output", output_name, "--mode", mode_name], **quiet
        )
        # Only the final switch tells whether the resolution is really active.
        return switched.returncode == 0
    except (OSError, ValueError):
        # cvt/xrandr not installed or not runnable, or undecodable output.
        return False
|
|
|
|
|
|
def apply_resolution(width: int | None, height: int | None) -> tuple[int, int]:
    """Switch the X display to the requested (sanitised) resolution.

    ``xrandr -s`` is tried first.  If it exits non-zero and a connected
    output can be found, a custom mode is registered through
    ``_add_mode_via_cvt``.  ``_state["resolution"]`` is updated to the
    sanitised value in every case.

    Args:
        width: Requested width in pixels, or ``None`` for the default.
        height: Requested height in pixels, or ``None`` for the default.

    Returns:
        The sanitised ``(width, height)`` that was requested from xrandr.
    """
    safe_w, safe_h = _sanitize_resolution(width, height)

    direct_switch = subprocess.run(
        ["xrandr", "-display", DISPLAY, "-s", f"{safe_w}x{safe_h}"],
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )

    # Fall back to a custom modeline only when the plain size switch failed.
    output_name = _xrandr_output_name() if direct_switch.returncode != 0 else None
    if output_name:
        _add_mode_via_cvt(safe_w, safe_h, output_name)

    _state["resolution"] = f"{safe_w},{safe_h}"
    return safe_w, safe_h
|
|
|
|
|
|
def open_web(
    url: str,
    width: int | None = None,
    height: int | None = None,
    login: str = "",
    password: str = "",
) -> None:
    """Start a kiosk-mode Chromium session showing *url*.

    The display resolution is applied first, then a fresh temporary profile
    and (when credentials are given) a temporary autofill extension are
    created and Chromium is launched with them.  On success both directories
    are recorded in ``_state`` so ``_stop_current`` can remove them later.

    Args:
        url: Page to open; passed to Chromium as the last argument.
        width: Requested width in pixels, or ``None`` for the default.
        height: Requested height in pixels, or ``None`` for the default.
        login: User name for the autofill extension; may be empty.
        password: Password for the autofill extension; may be empty.

    Raises:
        OSError: If the temp directories cannot be created or Chromium
            cannot be started.  Temp directories created by this call are
            removed before the error propagates.
    """
    safe_w, safe_h = apply_resolution(width, height)

    profile_dir = None
    extension_dir = None
    try:
        profile_dir = _create_chrome_profile()
        extension_dir = _create_autofill_extension(login, password)

        cmd = [
            "chromium",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--use-gl=swiftshader",
            "--kiosk",
            "--disable-translate",
            "--disable-features=TranslateUI,ExtensionsToolbarMenu",
            "--disable-pinch",
            "--overscroll-history-navigation=0",
            "--ignore-certificate-errors",
            "--allow-insecure-localhost",
            "--allow-running-insecure-content",
            f"--window-size={safe_w},{safe_h}",
            "--no-first-run",
            "--no-default-browser-check",
            "--lang=ru-RU",
            "--accept-lang=ru-RU,ru",
            "--password-store=basic",
            f"--user-data-dir={profile_dir}",
        ]
        if extension_dir:
            # Load the autofill extension and nothing else.
            cmd.append(f"--load-extension={extension_dir}")
            cmd.append(f"--disable-extensions-except={extension_dir}")
        cmd.append(url)

        _start_process(cmd, "web", url)
    except BaseException:
        # The directories are not yet tracked in _state, so nobody else would
        # ever delete them (the extension dir holds the credentials).
        for leftover in (profile_dir, extension_dir):
            if leftover:
                shutil.rmtree(leftover, ignore_errors=True)
        raise

    # Recorded only after _start_process, because it calls _stop_current,
    # which clears these keys for the previous session.
    _state["profile_dir"] = profile_dir
    _state["extension_dir"] = extension_dir
|
|
|
|
|
|
def open_rdp(payload: dict) -> None:
    """Start a full-screen xfreerdp session described by *payload*.

    Recognised keys: ``host`` (required), ``port`` (default ``"3389"``),
    ``user``, ``password``, ``domain`` and ``security``.  Empty optional
    values are simply left off the command line.

    Args:
        payload: Decoded JSON request body.

    Raises:
        ValueError: If ``host`` is missing or blank.
    """
    host = (payload.get("host") or "").strip()
    if not host:
        raise ValueError("host is required")
    port = str(payload.get("port") or "3389").strip()

    # Optional switches in the order they are appended to the command line.
    # NOTE(review): the password is passed as a process argument and is
    # therefore visible in the process list -- confirm this is acceptable.
    optional_switches = (
        ("/sec:", (payload.get("security") or "").strip().lower()),
        ("/u:", (payload.get("user") or "").strip()),
        ("/p:", (payload.get("password") or "").strip()),
        ("/d:", (payload.get("domain") or "").strip()),
    )

    cmd = [
        "xfreerdp",
        f"/v:{host}:{port}",
        "/cert:ignore",
        "/f",
        "/dynamic-resolution",
        "/network:auto",
        "+clipboard",
    ]
    cmd.extend(f"{switch}{value}" for switch, value in optional_switches if value)

    # Only host and port are recorded as the session target.
    _start_process(cmd, "rdp", f"{host}:{port}")
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """JSON control API for the session daemon.

    Routes:
        ``GET  /health``      current mode, target, resolution, liveness.
        ``POST /open``        start a Chromium session (``url`` required).
        ``POST /resolution``  change the display resolution.
        ``POST /rdp``         start an xfreerdp session (``host`` required).

    Malformed requests are answered with 400, unknown paths with 404 and
    unexpected failures with 500; every response body is a JSON object.
    """

    def _read_json(self):
        """Return the decoded JSON request body (``{}`` when there is none).

        Raises:
            ValueError: If ``Content-Length`` is not an integer or the body
                is not valid UTF-8 encoded JSON.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length <= 0:
            return {}
        raw = self.rfile.read(length)
        return json.loads(raw.decode("utf-8"))

    def _json(self, code: int, payload: dict):
        """Send *payload* as a JSON response with HTTP status *code*."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Serve ``/health``; everything else is a 404."""
        if self.path == "/health":
            proc = _state.get("proc")
            running = bool(proc and proc.poll() is None)
            self._json(
                200,
                {
                    "ok": True,
                    "mode": _state.get("mode", "idle"),
                    "running": running,
                    "target": _state.get("target", ""),
                    "resolution": _state.get("resolution", CHROME_WINDOW_SIZE),
                },
            )
            return
        self._json(404, {"detail": "Not found"})

    def do_POST(self):
        """Dispatch ``/open``, ``/resolution`` and ``/rdp`` requests."""
        try:
            data = self._read_json()
            if not isinstance(data, dict):
                # Every route reads named fields, so only objects make sense.
                raise ValueError("JSON body must be an object")

            if self.path == "/open":
                url = (data.get("url") or "").strip()
                if not url.startswith(("http://", "https://")):
                    self._json(400, {"detail": "Invalid URL"})
                    return
                width = data.get("width")
                height = data.get("height")
                login = (data.get("login") or "").strip()
                password = (data.get("password") or "").strip()
                with _lock:
                    open_web(url, width=width, height=height, login=login, password=password)
                self._json(
                    200,
                    {
                        "ok": True,
                        "mode": "web",
                        "target": url,
                        "resolution": _state.get("resolution", CHROME_WINDOW_SIZE),
                    },
                )
                return

            if self.path == "/resolution":
                width = data.get("width")
                height = data.get("height")
                with _lock:
                    safe_w, safe_h = apply_resolution(width, height)
                self._json(200, {"ok": True, "width": safe_w, "height": safe_h})
                return

            if self.path == "/rdp":
                with _lock:
                    open_rdp(data)
                self._json(200, {"ok": True, "mode": "rdp"})
                return

            self._json(404, {"detail": "Not found"})
        except ValueError as exc:
            # Undecodable body or a value rejected by validation (for
            # example a missing RDP host): the client's fault, not ours.
            self._json(400, {"detail": str(exc)})
        except Exception as exc:
            self._json(500, {"detail": str(exc)})

    def log_message(self, fmt, *args):
        """Suppress the default per-request logging to stderr."""
        return
|
|
|
|
|
|
if __name__ == "__main__":
    # Listen on every interface, port 7000, and handle requests until the
    # process is terminated.
    HTTPServer(("0.0.0.0", 7000), Handler).serve_forever()
|