#!/usr/bin/env python3
"""Flask admin UI for WireGuard peers.

Peers are created/removed by shelling out to ``/usr/local/sbin/wg-peerctl``;
live state comes from ``wg show <interface> dump``; generated client configs
and per-peer metadata are kept in a small SQLite database (``DB_PATH``).
"""
import base64
import hmac
import io
import os
import sqlite3
import subprocess
import time
from contextlib import closing
from datetime import datetime, timezone

import qrcode
from flask import Flask, redirect, render_template, request, url_for, flash, Response

app = Flask(__name__)
# NOTE(review): the "dev-secret" fallback is a publicly known signing key for
# the session/flash cookie; APP_SECRET should be set in any real deployment.
app.secret_key = os.environ.get("APP_SECRET", "dev-secret")

DB_PATH = os.environ.get("DB_PATH", "/opt/wg-admin-gui/data/wgadmin.db")
WG_INTERFACE = os.environ.get("WG_INTERFACE", "wg0")
WG_META_FILE = os.environ.get("WG_META_FILE", "/etc/wireguard/wg-meta.env")
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")
ONLINE_WINDOW_SEC = int(os.environ.get("ONLINE_WINDOW_SEC", "120"))

# Helper script that edits the WireGuard configuration on our behalf.
PEERCTL = "/usr/local/sbin/wg-peerctl"

# Failures we expect from running an external command: non-zero exit status
# (CalledProcessError is a SubprocessError) or a missing/non-executable binary.
_CMD_ERRORS = (subprocess.SubprocessError, OSError)


def db_conn():
    """Open a SQLite connection to ``DB_PATH`` with ``sqlite3.Row`` rows.

    The parent directory of ``DB_PATH`` is created if missing. The caller is
    responsible for closing the connection (this module uses
    ``contextlib.closing`` because ``with connection:`` alone only manages the
    transaction and leaves the connection open).
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def ensure_schema():
    """Create the ``peers`` table if needed and add any columns it lacks."""
    with closing(db_conn()) as conn:
        cur = conn.cursor()
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS peers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                public_key TEXT UNIQUE NOT NULL,
                client_address TEXT,
                advertised_routes TEXT,
                client_conf TEXT,
                peer_psk TEXT,
                peer_allowed_ips TEXT,
                enabled INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            """
        )
        # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
        cols = {row[1] for row in cur.execute("PRAGMA table_info(peers)").fetchall()}
        # Columns added after the first schema version; upgrade old DBs in place.
        migrations = (
            ("client_conf", "TEXT"),
            ("peer_psk", "TEXT"),
            ("peer_allowed_ips", "TEXT"),
            ("enabled", "INTEGER NOT NULL DEFAULT 1"),
        )
        for col, decl in migrations:
            if col not in cols:
                cur.execute(f"ALTER TABLE peers ADD COLUMN {col} {decl}")
        conn.commit()


def run(cmd):
    """Run ``cmd`` (an argv list, no shell) and return its stripped stdout.

    Raises ``subprocess.CalledProcessError`` on a non-zero exit status and
    ``OSError`` if the executable cannot be started.
    """
    return subprocess.check_output(cmd, text=True).strip()


def load_meta():
    """Read ``KEY=VALUE`` lines from ``WG_META_FILE`` into a dict.

    Returns an empty dict when the file does not exist. Lines that are blank
    or contain no ``=`` are skipped; keys and values are stored as found after
    stripping the whole line (no unquoting is performed).
    """
    meta = {}
    try:
        with open(WG_META_FILE, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line or "=" not in line:
                    continue
                k, v = line.split("=", 1)
                meta[k] = v
    except FileNotFoundError:
        return {}
    return meta


def parse_kv(text):
    """Parse ``KEY=VALUE`` lines of ``text`` into a dict, stripping both sides."""
    out = {}
    for line in text.splitlines():
        if "=" not in line:
            continue
        k, v = line.split("=", 1)
        out[k.strip()] = v.strip()
    return out


def parse_wg_conf_peer_meta():
    """Extract per-peer metadata from ``/etc/wireguard/<WG_INTERFACE>.conf``.

    Returns a dict keyed by peer public key with ``name`` (taken from a
    ``# managed-by=wg-peerctl ... client=<name>`` comment, else
    ``"(external)"``), ``client_address`` (first AllowedIPs entry), ``routes``
    (the remaining AllowedIPs entries) and ``allowed_ips`` (the raw value).
    Returns ``{}`` when the config file does not exist.
    """
    path = f"/etc/wireguard/{WG_INTERFACE}.conf"
    by_pub = {}
    pending_name = None
    in_peer = False
    current_pub = ""
    current_allowed = ""

    def flush():
        """Store the peer collected so far (if any) and reset the state."""
        nonlocal current_pub, current_allowed, pending_name
        if not current_pub:
            return
        first, _, rest = current_allowed.partition(",")
        by_pub[current_pub] = {
            "name": pending_name or "(external)",
            "client_address": first.strip(),
            "routes": rest.strip(),
            "allowed_ips": current_allowed,
        }
        pending_name = None
        current_pub = ""
        current_allowed = ""

    try:
        f = open(path, "r", encoding="utf-8")
    except FileNotFoundError:
        return {}
    with f:
        for raw in f:
            line = raw.strip()
            if not line:
                continue
            if line.startswith("# managed-by=wg-peerctl"):
                # NOTE(review): the name is attached to whichever peer is
                # flushed next. If wg-peerctl writes this comment *before*
                # each [Peer] header, a marker seen while a peer is still
                # open would name the previous peer — confirm the layout
                # wg-peerctl actually emits.
                marker = "client="
                if marker in line:
                    tail = line.split(marker, 1)[1].split()
                    if tail:  # tolerate a bare "client=" with nothing after it
                        pending_name = tail[0].strip()
                continue
            if line == "[Peer]":
                if in_peer:
                    flush()
                in_peer = True
                continue
            if in_peer and line.startswith("PublicKey"):
                # partition() yields "" instead of raising when "=" is absent.
                current_pub = line.partition("=")[2].strip()
                continue
            if in_peer and line.startswith("AllowedIPs"):
                current_allowed = line.partition("=")[2].strip()
                continue
    if in_peer:
        flush()
    return by_pub


def _to_int(text):
    """Return ``int(text)`` for an all-digit string, else 0."""
    return int(text) if text.isdigit() else 0


def wg_dump():
    """Return live peer state parsed from ``wg show <WG_INTERFACE> dump``.

    Per wg(8), the first output line describes the interface and each
    following tab-separated line describes a peer, in this order:
    public-key, preshared-key, endpoint, allowed-ips, latest-handshake,
    transfer-rx, transfer-tx, persistent-keepalive.

    Returns a list of dicts (``public_key``, ``endpoint``, ``allowed_ips``,
    ``latest_handshake``, ``latest_handshake_ts``, ``rx_bytes``,
    ``tx_bytes``), or ``[]`` if the command cannot be run or fails.
    """
    try:
        out = run(["wg", "show", WG_INTERFACE, "dump"])
    except _CMD_ERRORS + (ValueError,):
        # Best effort: the UI still renders DB peers without live data.
        return []
    peers = []
    for line in out.splitlines()[1:]:  # skip the interface line
        parts = line.split("\t")
        if len(parts) < 8:
            continue
        latest_ts = _to_int(parts[4])
        latest = "never"
        if latest_ts > 0:
            latest = datetime.fromtimestamp(latest_ts, tz=timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S UTC"
            )
        # wg prints "(none)" for a peer without a known endpoint.
        endpoint = parts[2] if parts[2] not in ("", "(none)") else "-"
        peers.append(
            {
                "public_key": parts[0],
                "endpoint": endpoint,
                "allowed_ips": parts[3],
                "latest_handshake": latest,
                "latest_handshake_ts": latest_ts,
                "rx_bytes": _to_int(parts[5]),
                "tx_bytes": _to_int(parts[6]),
            }
        )
    return peers


def bytes_h(n):
    """Format a byte count with binary units, e.g. ``512 B`` or ``1.5 KiB``."""
    units = ["B", "KiB", "MiB", "GiB", "TiB"]
    x = float(n)
    for u in units:
        if x < 1024 or u == units[-1]:
            return f"{x:.1f} {u}" if u != "B" else f"{int(x)} B"
        x /= 1024


def gen_keypair_psk():
    """Generate ``(private_key, public_key, preshared_key)`` via the ``wg`` CLI."""
    priv = run(["wg", "genkey"])
    pub = subprocess.check_output(["wg", "pubkey"], input=priv, text=True).strip()
    psk = run(["wg", "genpsk"])
    return priv, pub, psk


def to_png_b64(text):
    """Render ``text`` as a QR code image and return it base64-encoded (ASCII)."""
    img = qrcode.make(text)
    buf = io.BytesIO()
    img.save(buf)
    return base64.b64encode(buf.getvalue()).decode("ascii")


def _unauthorized():
    """Build the 401 response that asks the browser for Basic credentials."""
    return Response(
        "Auth required",
        401,
        {"WWW-Authenticate": 'Basic realm="WG Admin"'},
    )


def _credentials_match(username, password):
    """Compare supplied credentials with the configured ones in constant time."""
    # Encode to bytes: compare_digest rejects non-ASCII str arguments.
    user_ok = hmac.compare_digest(
        (username or "").encode("utf-8"), ADMIN_USER.encode("utf-8")
    )
    pass_ok = hmac.compare_digest(
        (password or "").encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
    )
    # Both comparisons are evaluated before combining, so a wrong username
    # does not short-circuit the password check.
    return user_ok and pass_ok


@app.before_request
def _auth():
    """Enforce HTTP Basic auth on everything except ``/static/``.

    NOTE(review): when ADMIN_PASSWORD is empty, authentication is disabled
    entirely; combined with binding to 0.0.0.0 this exposes the admin UI.
    """
    if request.path.startswith("/static/"):
        return None
    if not ADMIN_PASSWORD:
        return None
    auth = request.authorization
    if not auth:
        return _unauthorized()
    if not _credentials_match(auth.username, auth.password):
        return _unauthorized()
    return None


@app.before_request
def _schema():
    """Make sure the database schema exists before handling the request."""
    ensure_schema()


def _fetch_peer(peer_id):
    """Return the ``peers`` row with this id as a dict, or ``None``."""
    with closing(db_conn()) as conn:
        row = conn.execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
    return dict(row) if row else None


def _runtime_fields(rt, now):
    """Build the live-status part of a peer list item from a ``wg_dump`` entry.

    ``rt`` may be ``{}`` for a peer that is not currently loaded in WireGuard.
    """
    ts = int(rt.get("latest_handshake_ts", 0) or 0)
    is_online = ts > 0 and (now - ts) <= ONLINE_WINDOW_SEC
    return {
        "allowed_ips": rt.get("allowed_ips", "-"),
        "endpoint": rt.get("endpoint", "-"),
        "latest_handshake": rt.get("latest_handshake", "offline"),
        "rx": bytes_h(rt.get("rx_bytes", 0)),
        "tx": bytes_h(rt.get("tx_bytes", 0)),
        "status": "online" if is_online else "offline",
    }


def _remove_from_wireguard(pk):
    """Best-effort ``wg-peerctl remove``; failures are logged, not raised."""
    try:
        run([PEERCTL, "remove", "--client-public-key", pk])
    except _CMD_ERRORS as e:
        # Deletion from the DB proceeds regardless (the peer may already be
        # gone from the interface), but leave a trace for the operator.
        app.logger.warning("wg-peerctl remove failed for %s: %s", pk, e)


@app.route("/")
def index():
    """List all peers: DB records merged with live ``wg`` state.

    Peers present on the interface but unknown to the DB are imported
    (``INSERT OR IGNORE``) so they get an id and can be managed from the UI.
    """
    meta = load_meta()
    runtime = {p["public_key"]: p for p in wg_dump()}
    conf_meta = parse_wg_conf_peer_meta()

    with closing(db_conn()) as conn:
        rows = conn.execute("SELECT * FROM peers ORDER BY id DESC").fetchall()
    db_peers = [dict(r) for r in rows]

    items = []
    seen = set()
    now = int(time.time())

    for row in db_peers:
        rt = runtime.get(row["public_key"], {})
        seen.add(row["public_key"])
        items.append(
            {
                "id": row["id"],
                "name": row["name"],
                "public_key": row["public_key"],
                "client_address": row.get("client_address") or "-",
                "routes": row.get("advertised_routes") or "-",
                **_runtime_fields(rt, now),
            }
        )

    external = [(pk, rt) for pk, rt in runtime.items() if pk not in seen]
    if external:
        # One connection for the whole batch instead of one per peer.
        with closing(db_conn()) as conn:
            cur = conn.cursor()
            for pk, rt in external:
                cm = conf_meta.get(pk, {})
                imported_name = cm.get("name", "(external)")
                imported_addr = cm.get(
                    "client_address", rt.get("allowed_ips", "-").split(",", 1)[0]
                )
                imported_routes = cm.get("routes", "-") or "-"
                cur.execute(
                    "INSERT OR IGNORE INTO peers(name, public_key, client_address, advertised_routes, enabled) VALUES (?,?,?,?,1)",
                    (
                        imported_name,
                        pk,
                        imported_addr,
                        imported_routes if imported_routes != "-" else "",
                    ),
                )
                cur.execute("SELECT id FROM peers WHERE public_key = ?", (pk,))
                got = cur.fetchone()
                ext_id = got["id"] if got else None
                items.append(
                    {
                        "id": ext_id,
                        "name": imported_name,
                        "public_key": pk,
                        "client_address": imported_addr,
                        "routes": imported_routes,
                        **_runtime_fields(rt, now),
                    }
                )
            conn.commit()

    return render_template("index.html", peers=items, meta=meta)


@app.route("/peers/new", methods=["GET", "POST"])
def new_peer():
    """Show the new-peer form (GET) or create a peer and its config (POST).

    On POST: generates keys, registers the peer through ``wg-peerctl add``,
    builds the client config from the values that command prints, stores it
    in the DB and renders it together with a QR code.
    """
    meta = load_meta()
    if request.method == "GET":
        return render_template("new_peer.html", meta=meta)

    name = request.form.get("name", "").strip()
    mode = request.form.get("mode", "full").strip()
    allowed_ips = request.form.get("allowed_ips", "").strip()
    routes = request.form.get("routes", "").strip()

    if not name:
        flash("Укажите имя клиента", "error")
        return redirect(url_for("new_peer"))

    if mode == "full":
        allowed_ips = "0.0.0.0/0,::/0"
    elif not allowed_ips:
        allowed_ips = meta.get("WG_NETWORK", "10.66.66.0/24")

    try:
        client_priv, client_pub, client_psk = gen_keypair_psk()
        cmd = [
            PEERCTL,
            "add",
            "--client-name",
            name,
            "--client-public-key",
            client_pub,
            "--client-preshared-key",
            client_psk,
            "--persistent-keepalive",
            "25",
        ]
        if routes:
            cmd += ["--client-routes", routes]
        resp = parse_kv(run(cmd))
    except _CMD_ERRORS as e:
        # Covers a failing command as well as a missing wg / wg-peerctl binary.
        flash(f"Не удалось добавить peer: {e}", "error")
        return redirect(url_for("new_peer"))

    client_addr = resp.get("CLIENT_ADDRESS", "")
    server_pub = resp.get("SERVER_PUBLIC_KEY", "")
    endpoint = resp.get("SERVER_ENDPOINT", "")
    dns = resp.get("SERVER_DNS", "1.1.1.1")

    conf_lines = [
        "[Interface]",
        f"PrivateKey = {client_priv}",
        f"Address = {client_addr}",
        f"DNS = {dns}",
        "",
        "[Peer]",
        f"PublicKey = {server_pub}",
        f"PresharedKey = {client_psk}",
        f"Endpoint = {endpoint}",
        f"AllowedIPs = {allowed_ips}",
        "PersistentKeepalive = 25",
        "",
    ]
    client_conf = "\n".join(conf_lines)
    qr_b64 = to_png_b64(client_conf)

    # Server-side AllowedIPs for this peer: its address plus advertised routes.
    peer_allowed_ips = client_addr + (("," + routes) if routes else "")

    with closing(db_conn()) as conn:
        cur = conn.cursor()
        # Update first: the row may already exist (same public key).
        cur.execute(
            "UPDATE peers SET name=?, client_address=?, advertised_routes=?, client_conf=?, peer_psk=?, peer_allowed_ips=?, enabled=1 WHERE public_key=?",
            (name, client_addr, routes, client_conf, client_psk, peer_allowed_ips, client_pub),
        )
        if cur.rowcount == 0:
            cur.execute(
                "INSERT INTO peers(name, public_key, client_address, advertised_routes, client_conf, peer_psk, peer_allowed_ips, enabled) VALUES (?,?,?,?,?,?,?,1)",
                (name, client_pub, client_addr, routes, client_conf, client_psk, peer_allowed_ips),
            )
        conn.commit()

    return render_template(
        "peer_created.html",
        name=name,
        client_conf=client_conf,
        qr_b64=qr_b64,
        public_key=client_pub,
    )


@app.route("/peers/<int:peer_id>")
def peer_view(peer_id: int):
    """Show the stored client config and QR code for an existing peer."""
    item = _fetch_peer(peer_id)
    if not item:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    conf = item.get("client_conf") or ""
    if not conf:
        flash("Для этого клиента не найден сохраненный конфиг", "error")
        return redirect(url_for("index"))

    qr_b64 = to_png_b64(conf)
    return render_template(
        "peer_created.html",
        name=item.get("name", "peer"),
        client_conf=conf,
        qr_b64=qr_b64,
        public_key=item.get("public_key", ""),
    )


@app.post("/peers/<int:peer_id>/disable")
def peer_disable(peer_id: int):
    """Remove the peer from WireGuard but keep its DB record (``enabled=0``)."""
    item = _fetch_peer(peer_id)
    if not item:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    pk = item.get("public_key", "")
    if not pk:
        flash("Не найден public key", "error")
        return redirect(url_for("index"))

    try:
        run([PEERCTL, "remove", "--client-public-key", pk])
    except _CMD_ERRORS as e:
        flash(f"Не удалось отключить peer: {e}", "error")
        return redirect(url_for("index"))

    with closing(db_conn()) as conn:
        conn.execute("UPDATE peers SET enabled=0 WHERE id = ?", (peer_id,))
        conn.commit()

    flash("Peer отключен", "ok")
    return redirect(url_for("index"))


@app.post("/peers/<int:peer_id>/enable")
def peer_enable(peer_id: int):
    """Re-add a previously disabled peer to WireGuard from its stored data."""
    item = _fetch_peer(peer_id)
    if not item:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    name = item.get("name", "")
    pk = item.get("public_key", "")
    addr = item.get("client_address", "")
    routes = item.get("advertised_routes", "") or ""
    psk = item.get("peer_psk", "") or ""

    # Imported ("external") peers have no stored PSK and cannot be re-added.
    if not (name and pk and addr and psk):
        flash("Недостаточно данных для включения peer (нужны name/public key/address/psk)", "error")
        return redirect(url_for("index"))

    cmd = [
        PEERCTL,
        "add",
        "--client-name",
        name,
        "--client-public-key",
        pk,
        "--client-address",
        addr,
        "--client-preshared-key",
        psk,
        "--persistent-keepalive",
        "25",
    ]
    if routes:
        cmd += ["--client-routes", routes]

    try:
        run(cmd)
    except _CMD_ERRORS as e:
        flash(f"Не удалось включить peer: {e}", "error")
        return redirect(url_for("index"))

    with closing(db_conn()) as conn:
        conn.execute("UPDATE peers SET enabled=1 WHERE id = ?", (peer_id,))
        conn.commit()

    flash("Peer включен", "ok")
    return redirect(url_for("index"))


@app.post("/peers/<int:peer_id>/delete")
def peer_delete(peer_id: int):
    """Delete a peer by id: best-effort removal from WireGuard, then the DB row."""
    item = _fetch_peer(peer_id)
    if not item:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    pk = item.get("public_key", "")
    if pk:
        _remove_from_wireguard(pk)

    with closing(db_conn()) as conn:
        conn.execute("DELETE FROM peers WHERE id = ?", (peer_id,))
        conn.commit()

    flash("Peer удален", "ok")
    return redirect(url_for("index"))


@app.post("/peers/delete-by-key")
def peer_delete_by_key():
    """Delete a peer identified by the ``public_key`` form field."""
    pk = (request.form.get("public_key") or "").strip()
    if not pk:
        flash("Не найден public key", "error")
        return redirect(url_for("index"))

    _remove_from_wireguard(pk)

    with closing(db_conn()) as conn:
        conn.execute("DELETE FROM peers WHERE public_key = ?", (pk,))
        conn.commit()

    flash("Peer удален", "ok")
    return redirect(url_for("index"))


@app.route("/scripts")
def scripts():
    """Render a reference page of install commands and relevant file paths."""
    commands = {
        "server_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/server/install_server.sh\"",
        "client_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/client/install_client.sh\"",
        "apply_wg": "wg syncconf wg0 <(wg-quick strip /etc/wireguard/wg0.conf)",
    }
    paths = [
        "/usr/local/sbin/wg-peerctl",
        "/etc/wireguard/wg0.conf",
        "/etc/wireguard/wg-meta.env",
        "/var/log/wireguard-server-install.log",
        "/var/log/wireguard-client-install.log",
        "/var/log/wireguard-peerctl.log",
    ]
    return render_template("scripts.html", commands=commands, paths=paths)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=int(os.environ.get("APP_PORT", "5080")), debug=False)