160 lines
4.5 KiB
Python
160 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import os
|
|
import signal
|
|
import subprocess
|
|
import threading
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
DISPLAY = os.environ.get("DISPLAY", ":1")
|
|
CHROME_WINDOW_SIZE = os.environ.get("CHROME_WINDOW_SIZE", "1920,1080")
|
|
|
|
_state = {
|
|
"proc": None,
|
|
"mode": "idle",
|
|
"target": "",
|
|
}
|
|
_lock = threading.Lock()
|
|
|
|
|
|
def _stop_current() -> None:
    """Terminate the tracked application, if any, and forget it.

    The process group of ``_state["proc"]`` is sent SIGTERM and given up to
    four seconds to exit. If signalling fails or the wait times out, the
    group is sent SIGKILL and the child is reaped. ``_state["proc"]`` is
    reset to ``None`` in every case. Failures while stopping are swallowed
    on purpose: this is a best-effort cleanup before the next launch.
    """
    proc = _state.get("proc")
    if not proc:
        return
    try:
        # Signal the whole group, not just the leader, so helper processes
        # spawned by the application are stopped as well.
        os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        proc.wait(timeout=4)
    except (OSError, subprocess.TimeoutExpired):
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
        except OSError:
            # The group is already gone; nothing left to kill.
            pass
        try:
            # Reap the child so it does not linger as a zombie.
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            pass
    finally:
        _state["proc"] = None
|
|
|
|
|
|
def _start_process(cmd: list[str], mode: str, target: str) -> None:
    """Replace the currently tracked application with a new one.

    Any running application is stopped first. ``cmd`` is then started in its
    own session with ``DISPLAY`` set in its environment and with stdout and
    stderr appended to ``/tmp/session-app.log``.

    Args:
        cmd: Argument vector to execute (no shell is involved).
        mode: Label stored in ``_state["mode"]``.
        target: Label stored in ``_state["target"]``.

    Raises:
        OSError: If the log file cannot be opened or ``cmd`` cannot be
            executed. ``_state["mode"]`` and ``_state["target"]`` are left
            unchanged in that case.
    """
    _stop_current()

    env = os.environ.copy()
    env["DISPLAY"] = DISPLAY

    # The child receives its own copy of the log descriptor, so the parent's
    # handle is closed as soon as Popen returns instead of leaking one open
    # file per launch.
    with open("/tmp/session-app.log", "a", buffering=1) as logf:
        proc = subprocess.Popen(  # noqa: S603
            cmd,
            stdout=logf,
            stderr=subprocess.STDOUT,
            env=env,
            start_new_session=True,
        )

    _state["proc"] = proc
    _state["mode"] = mode
    _state["target"] = target
|
|
|
|
|
|
def open_web(url: str) -> None:
    """Show ``url`` in a kiosk-mode Chromium, replacing the current app.

    The launch is recorded with mode ``"web"`` and ``url`` as the target.
    """
    chromium_flags = (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--use-gl=swiftshader",
        "--kiosk",
        "--disable-translate",
        "--disable-features=TranslateUI,ExtensionsToolbarMenu",
        "--disable-pinch",
        "--overscroll-history-navigation=0",
        "--ignore-certificate-errors",
        "--allow-insecure-localhost",
        "--allow-running-insecure-content",
        f"--window-size={CHROME_WINDOW_SIZE}",
        "--no-first-run",
        "--no-default-browser-check",
    )
    # The URL goes last, after every switch.
    argv = ["chromium", *chromium_flags, url]
    _start_process(argv, "web", url)
|
|
|
|
|
|
def open_rdp(payload: dict) -> None:
    """Start a full-screen xfreerdp session described by ``payload``.

    Recognised keys: ``host`` (required), ``port`` (defaults to ``"3389"``),
    ``user``, ``password``, ``domain`` and ``security``. Missing, null or
    empty values are treated as not supplied. Values are converted to text
    and stripped of surrounding whitespace; ``security`` is lower-cased.

    The launch is recorded with mode ``"rdp"`` and ``host:port`` as the
    target, so credentials never end up in the reported state.

    Raises:
        ValueError: If ``host`` is missing or blank.
    """

    def _text(key: str, default: str = "") -> str:
        # JSON clients may send numbers (e.g. an all-digit password or a
        # numeric port); coerce every field to text instead of failing on
        # ``.strip()``.
        return str(payload.get(key) or default).strip()

    host = _text("host")
    if not host:
        raise ValueError("host is required")
    port = _text("port", "3389")
    user = _text("user")
    password = _text("password")
    domain = _text("domain")
    security = _text("security").lower()

    cmd = [
        "xfreerdp",
        f"/v:{host}:{port}",
        "/cert:ignore",
        "/f",
        "/dynamic-resolution",
        "/network:auto",
        "+clipboard",
    ]
    # NOTE(review): the password is passed on the command line and is
    # therefore visible to other local users in the process list.
    optional_args = (
        ("/sec:", security),
        ("/u:", user),
        ("/p:", password),
        ("/d:", domain),
    )
    cmd.extend(f"{switch}{value}" for switch, value in optional_args if value)

    safe_target = f"{host}:{port}"
    _start_process(cmd, "rdp", safe_target)
|
|
|
|
|
|
class Handler(BaseHTTPRequestHandler):
    """HTTP control endpoint for the managed session application.

    Routes:
        GET  /health  - report mode, target and whether the process is alive.
        POST /open    - body ``{"url": ...}``; open an http(s) URL.
        POST /rdp     - body with RDP connection fields; open an RDP session.

    Every response is a JSON document. Malformed requests are answered with
    400, unknown paths with 404 and unexpected failures with 500.
    """

    def _read_json(self):
        """Return the request body parsed as a JSON object.

        A missing or non-positive ``Content-Length`` yields ``{}``.

        Raises:
            ValueError: If ``Content-Length`` is not an integer, the body is
                not valid UTF-8 / JSON, or the JSON value is not an object.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length <= 0:
            return {}
        raw = self.rfile.read(length)
        data = json.loads(raw.decode("utf-8"))
        if not isinstance(data, dict):
            raise ValueError("JSON body must be an object")
        return data

    def _json(self, code: int, payload: dict):
        """Send ``payload`` as a JSON response with status ``code``."""
        body = json.dumps(payload).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Serve ``/health``; any other path is a 404."""
        if self.path == "/health":
            proc = _state.get("proc")
            # poll() returns None while the process is still running.
            running = bool(proc and proc.poll() is None)
            self._json(
                200,
                {
                    "ok": True,
                    "mode": _state.get("mode", "idle"),
                    "running": running,
                    "target": _state.get("target", ""),
                },
            )
            return
        self._json(404, {"detail": "Not found"})

    def _dispatch_post(self, data: dict):
        """Run the action for the current POST path.

        Returns:
            A ``(status_code, payload)`` pair to send back to the client.

        Raises:
            ValueError: If the request is invalid (propagated from the
                launch helpers).
        """
        if self.path == "/open":
            # str() so a non-string "url" is rejected as invalid instead of
            # crashing on .strip().
            url = str(data.get("url") or "").strip()
            if not url.startswith(("http://", "https://")):
                return 400, {"detail": "Invalid URL"}
            with _lock:
                open_web(url)
            return 200, {"ok": True, "mode": "web", "target": url}
        if self.path == "/rdp":
            with _lock:
                open_rdp(data)
            return 200, {"ok": True, "mode": "rdp"}
        return 404, {"detail": "Not found"}

    def do_POST(self):
        """Handle ``/open`` and ``/rdp``, mapping failures to 400 or 500."""
        try:
            code, payload = self._dispatch_post(self._read_json())
        except ValueError as exc:
            # Bad Content-Length, undecodable or non-object JSON, or a
            # validation error: the client's fault.
            code, payload = 400, {"detail": str(exc)}
        except Exception as exc:
            code, payload = 500, {"detail": str(exc)}
        # The response is written exactly once, outside the try block, so a
        # failed write is not followed by a second write attempt.
        self._json(code, payload)

    def log_message(self, fmt, *args):
        """Suppress the default per-request logging."""
        return
|
|
|
|
|
|
if __name__ == "__main__":
    # Listen on every interface, port 7000, and handle requests until the
    # process is terminated.
    listen_on = ("0.0.0.0", 7000)
    server = HTTPServer(listen_on, Handler)
    server.serve_forever()
|