Files
Wireguard_server/gui/app.py
T
ruslan 904582e7fa feat(gui): security hardening, UI overhaul, light theme
- CSRF protection on all POST forms (session token)
- ensure_schema() moved to module-level, removed from before_request
- gunicorn now binds to 127.0.0.1 only, runs as unprivileged user wgadmin
- nginx reverse proxy with HTTPS (Let's Encrypt, wg.4mont.ru)
- HTTP → HTTPS redirect before Basic Auth prompt
- Auth moved to nginx level (auth_basic), wg-peerctl called via sudo
- ufw firewall: only 22/80/443/51820 open
- fail2ban: SSH + nginx (5 attempts → 1h ban)
- Add Enable/Disable toggle buttons in peer table
- Add .conf file download route
- Light theme: white background, blue accent, subtle shadows
- Modern sidebar layout, styled badges, responsive forms

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 10:10:19 +03:00

595 lines
19 KiB
Python

#!/usr/bin/env python3
import base64
import io
import os
import secrets
import sqlite3
import subprocess
import time
from datetime import datetime
import qrcode
from flask import Flask, redirect, render_template, request, url_for, flash, Response, session, send_file
app = Flask(__name__)
app.secret_key = os.environ.get("APP_SECRET", "dev-secret")
DB_PATH = os.environ.get("DB_PATH", "/opt/wg-admin-gui/data/wgadmin.db")
WG_INTERFACE = os.environ.get("WG_INTERFACE", "wg0")
WG_META_FILE = os.environ.get("WG_META_FILE", "/etc/wireguard/wg-meta.env")
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")
ONLINE_WINDOW_SEC = int(os.environ.get("ONLINE_WINDOW_SEC", "120"))
def db_conn():
db_dir = os.path.dirname(DB_PATH)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def ensure_schema():
with db_conn() as conn:
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS peers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
public_key TEXT UNIQUE NOT NULL,
client_address TEXT,
advertised_routes TEXT,
client_conf TEXT,
peer_psk TEXT,
peer_allowed_ips TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
""")
cols = {row[1] for row in cur.execute("PRAGMA table_info(peers)").fetchall()}
if "client_conf" not in cols:
cur.execute("ALTER TABLE peers ADD COLUMN client_conf TEXT")
if "peer_psk" not in cols:
cur.execute("ALTER TABLE peers ADD COLUMN peer_psk TEXT")
if "peer_allowed_ips" not in cols:
cur.execute("ALTER TABLE peers ADD COLUMN peer_allowed_ips TEXT")
if "enabled" not in cols:
cur.execute("ALTER TABLE peers ADD COLUMN enabled INTEGER NOT NULL DEFAULT 1")
conn.commit()
# Run once per worker process on import
ensure_schema()
def run(cmd):
return subprocess.check_output(cmd, text=True).strip()
def load_meta():
meta = {}
if not os.path.exists(WG_META_FILE):
return meta
with open(WG_META_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or "=" not in line:
continue
k, v = line.split("=", 1)
meta[k] = v
return meta
def parse_kv(text):
out = {}
for line in text.splitlines():
if "=" not in line:
continue
k, v = line.split("=", 1)
out[k.strip()] = v.strip()
return out
def parse_wg_conf_peer_meta():
path = f"/etc/wireguard/{WG_INTERFACE}.conf"
if not os.path.exists(path):
return {}
by_pub = {}
pending_name = None
in_peer = False
current_pub = ""
current_allowed = ""
def flush():
nonlocal current_pub, current_allowed, pending_name
if not current_pub:
return
first = current_allowed.split(",", 1)[0].strip() if current_allowed else ""
routes = ""
if "," in current_allowed:
routes = current_allowed.split(",", 1)[1].strip()
by_pub[current_pub] = {
"name": pending_name or "(external)",
"client_address": first,
"routes": routes,
"allowed_ips": current_allowed,
}
pending_name = None
current_pub = ""
current_allowed = ""
with open(path, "r", encoding="utf-8") as f:
for raw in f:
line = raw.strip()
if not line:
continue
if line.startswith("# managed-by=wg-peerctl"):
marker = "client="
if marker in line:
pending_name = line.split(marker, 1)[1].split()[0].strip()
continue
if line == "[Peer]":
if in_peer:
flush()
in_peer = True
continue
if in_peer and line.startswith("PublicKey"):
current_pub = line.split("=", 1)[1].strip()
continue
if in_peer and line.startswith("AllowedIPs"):
current_allowed = line.split("=", 1)[1].strip()
continue
if in_peer:
flush()
return by_pub
def wg_dump():
try:
out = run(["wg", "show", WG_INTERFACE, "dump"])
except Exception:
return []
rows = out.splitlines()
peers = []
for line in rows[1:]:
parts = line.split("\t")
if len(parts) < 8:
continue
latest_ts = 0
latest = "never"
if parts[5].isdigit() and int(parts[5]) > 0:
latest_ts = int(parts[5])
latest = datetime.utcfromtimestamp(latest_ts).strftime("%Y-%m-%d %H:%M:%S UTC")
peers.append(
{
"public_key": parts[0],
"endpoint": parts[2] or "-",
"allowed_ips": parts[3],
"latest_handshake": latest,
"latest_handshake_ts": latest_ts,
"rx_bytes": int(parts[6] or 0),
"tx_bytes": int(parts[7] or 0),
}
)
return peers
def bytes_h(n):
units = ["B", "KiB", "MiB", "GiB", "TiB"]
x = float(n)
for u in units:
if x < 1024 or u == units[-1]:
return f"{x:.1f} {u}" if u != "B" else f"{int(x)} B"
x /= 1024
def gen_keypair_psk():
priv = run(["wg", "genkey"])
pub = subprocess.check_output(["wg", "pubkey"], input=priv, text=True).strip()
psk = run(["wg", "genpsk"])
return priv, pub, psk
def to_png_b64(text):
img = qrcode.make(text)
buf = io.BytesIO()
img.save(buf)
return base64.b64encode(buf.getvalue()).decode("ascii")
def _unauthorized():
return Response(
"Auth required",
401,
{"WWW-Authenticate": 'Basic realm="WG Admin"'},
)
def _get_csrf_token():
if "csrf_token" not in session:
session["csrf_token"] = secrets.token_hex(32)
return session["csrf_token"]
app.jinja_env.globals["csrf_token"] = _get_csrf_token
@app.before_request
def _auth():
if request.path.startswith("/static/"):
return None
if not ADMIN_PASSWORD:
return None
auth = request.authorization
if not auth:
return _unauthorized()
if auth.username != ADMIN_USER or auth.password != ADMIN_PASSWORD:
return _unauthorized()
return None
@app.before_request
def _csrf_check():
if request.method != "POST":
return None
token = request.form.get("csrf_token")
if not token or token != session.get("csrf_token"):
return Response("CSRF token invalid", 403)
return None
@app.route("/")
def index():
meta = load_meta()
runtime = {p["public_key"]: p for p in wg_dump()}
conf_meta = parse_wg_conf_peer_meta()
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM peers ORDER BY id DESC")
db_peers = [dict(r) for r in cur.fetchall()]
items = []
seen = set()
now = int(time.time())
for row in db_peers:
rt = runtime.get(row["public_key"], {})
ts = int(rt.get("latest_handshake_ts", 0) or 0)
is_online = ts > 0 and (now - ts) <= ONLINE_WINDOW_SEC
seen.add(row["public_key"])
items.append(
{
"id": row["id"],
"name": row["name"],
"public_key": row["public_key"],
"client_address": row.get("client_address") or "-",
"routes": row.get("advertised_routes") or "-",
"allowed_ips": rt.get("allowed_ips", "-"),
"endpoint": rt.get("endpoint", "-"),
"latest_handshake": rt.get("latest_handshake", "offline"),
"rx": bytes_h(rt.get("rx_bytes", 0)),
"tx": bytes_h(rt.get("tx_bytes", 0)),
"status": "online" if is_online else "offline",
"enabled": row.get("enabled", 1),
}
)
for pk, rt in runtime.items():
if pk in seen:
continue
cm = conf_meta.get(pk, {})
imported_name = cm.get("name", "(external)")
imported_addr = cm.get("client_address", rt.get("allowed_ips", "-").split(",", 1)[0])
imported_routes = cm.get("routes", "-") or "-"
ext_id = None
with db_conn() as conn:
cur = conn.cursor()
cur.execute(
"INSERT OR IGNORE INTO peers(name, public_key, client_address, advertised_routes, enabled) VALUES (?,?,?,?,1)",
(imported_name, pk, imported_addr, imported_routes if imported_routes != "-" else ""),
)
cur.execute("SELECT id FROM peers WHERE public_key = ?", (pk,))
got = cur.fetchone()
if got:
ext_id = got["id"]
conn.commit()
ts = int(rt.get("latest_handshake_ts", 0) or 0)
is_online = ts > 0 and (now - ts) <= ONLINE_WINDOW_SEC
items.append(
{
"id": ext_id,
"name": imported_name,
"public_key": pk,
"client_address": imported_addr,
"routes": imported_routes,
"allowed_ips": rt.get("allowed_ips", "-"),
"endpoint": rt.get("endpoint", "-"),
"latest_handshake": rt.get("latest_handshake", "offline"),
"rx": bytes_h(rt.get("rx_bytes", 0)),
"tx": bytes_h(rt.get("tx_bytes", 0)),
"status": "online" if is_online else "offline",
"enabled": 1,
}
)
return render_template("index.html", peers=items, meta=meta)
@app.route("/peers/new", methods=["GET", "POST"])
def new_peer():
meta = load_meta()
if request.method == "GET":
return render_template("new_peer.html", meta=meta)
name = request.form.get("name", "").strip()
mode = request.form.get("mode", "full").strip()
allowed_ips = request.form.get("allowed_ips", "").strip()
routes = request.form.get("routes", "").strip()
if not name:
flash("Укажите имя клиента", "error")
return redirect(url_for("new_peer"))
if mode == "full":
allowed_ips = "0.0.0.0/0,::/0"
elif not allowed_ips:
allowed_ips = meta.get("WG_NETWORK", "10.66.66.0/24")
client_priv, client_pub, client_psk = gen_keypair_psk()
cmd = [
"sudo", "/usr/local/sbin/wg-peerctl",
"add",
"--client-name", name,
"--client-public-key", client_pub,
"--client-preshared-key", client_psk,
"--persistent-keepalive", "25",
]
if routes:
cmd += ["--client-routes", routes]
try:
resp = parse_kv(run(cmd))
except subprocess.CalledProcessError as e:
flash(f"Не удалось добавить peer: {e}", "error")
return redirect(url_for("new_peer"))
client_addr = resp.get("CLIENT_ADDRESS", "")
server_pub = resp.get("SERVER_PUBLIC_KEY", "")
endpoint = resp.get("SERVER_ENDPOINT", "")
dns = resp.get("SERVER_DNS", "1.1.1.1")
conf_lines = [
"[Interface]",
f"PrivateKey = {client_priv}",
f"Address = {client_addr}",
f"DNS = {dns}",
"",
"[Peer]",
f"PublicKey = {server_pub}",
f"PresharedKey = {client_psk}",
f"Endpoint = {endpoint}",
f"AllowedIPs = {allowed_ips}",
"PersistentKeepalive = 25",
"",
]
client_conf = "\n".join(conf_lines)
qr_b64 = to_png_b64(client_conf)
with db_conn() as conn:
cur = conn.cursor()
cur.execute(
"UPDATE peers SET name=?, client_address=?, advertised_routes=?, client_conf=?, peer_psk=?, peer_allowed_ips=?, enabled=1 WHERE public_key=?",
(name, client_addr, routes, client_conf, client_psk, client_addr + (("," + routes) if routes else ""), client_pub),
)
if cur.rowcount == 0:
cur.execute(
"INSERT INTO peers(name, public_key, client_address, advertised_routes, client_conf, peer_psk, peer_allowed_ips, enabled) VALUES (?,?,?,?,?,?,?,1)",
(name, client_pub, client_addr, routes, client_conf, client_psk, client_addr + (("," + routes) if routes else "")),
)
conn.commit()
return render_template(
"peer_created.html",
name=name,
client_conf=client_conf,
qr_b64=qr_b64,
public_key=client_pub,
)
@app.route("/peers/<int:peer_id>")
def peer_view(peer_id: int):
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
row = cur.fetchone()
if not row:
flash("Клиент не найден", "error")
return redirect(url_for("index"))
item = dict(row)
conf = item.get("client_conf") or ""
if not conf:
flash("Для этого клиента не найден сохраненный конфиг", "error")
return redirect(url_for("index"))
qr_b64 = to_png_b64(conf)
return render_template(
"peer_created.html",
name=item.get("name", "peer"),
client_conf=conf,
qr_b64=qr_b64,
public_key=item.get("public_key", ""),
)
@app.route("/peers/<int:peer_id>/download")
def peer_download(peer_id: int):
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
row = cur.fetchone()
if not row:
flash("Клиент не найден", "error")
return redirect(url_for("index"))
item = dict(row)
conf = item.get("client_conf") or ""
if not conf:
flash("Для этого клиента не найден сохраненный конфиг", "error")
return redirect(url_for("index"))
name = item.get("name", "peer")
buf = io.BytesIO(conf.encode("utf-8"))
return send_file(buf, as_attachment=True, download_name=f"{name}.conf", mimetype="text/plain")
@app.post("/peers/<int:peer_id>/disable")
def peer_disable(peer_id: int):
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
row = cur.fetchone()
if not row:
flash("Клиент не найден", "error")
return redirect(url_for("index"))
item = dict(row)
pk = item.get("public_key", "")
if not pk:
flash("Не найден public key", "error")
return redirect(url_for("index"))
try:
run(["sudo", "/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
except subprocess.CalledProcessError as e:
flash(f"Не удалось отключить peer: {e}", "error")
return redirect(url_for("index"))
with db_conn() as conn:
cur = conn.cursor()
cur.execute("UPDATE peers SET enabled=0 WHERE id = ?", (peer_id,))
conn.commit()
flash("Peer отключен", "ok")
return redirect(url_for("index"))
@app.post("/peers/<int:peer_id>/enable")
def peer_enable(peer_id: int):
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
row = cur.fetchone()
if not row:
flash("Клиент не найден", "error")
return redirect(url_for("index"))
item = dict(row)
name = item.get("name", "")
pk = item.get("public_key", "")
addr = item.get("client_address", "")
routes = item.get("advertised_routes", "") or ""
psk = item.get("peer_psk", "") or ""
if not (name and pk and addr and psk):
flash("Недостаточно данных для включения peer (нужны name/public key/address/psk)", "error")
return redirect(url_for("index"))
cmd = [
"sudo", "/usr/local/sbin/wg-peerctl",
"add",
"--client-name", name,
"--client-public-key", pk,
"--client-address", addr,
"--client-preshared-key", psk,
"--persistent-keepalive", "25",
]
if routes:
cmd += ["--client-routes", routes]
try:
run(cmd)
except subprocess.CalledProcessError as e:
flash(f"Не удалось включить peer: {e}", "error")
return redirect(url_for("index"))
with db_conn() as conn:
cur = conn.cursor()
cur.execute("UPDATE peers SET enabled=1 WHERE id = ?", (peer_id,))
conn.commit()
flash("Peer включен", "ok")
return redirect(url_for("index"))
@app.post("/peers/<int:peer_id>/delete")
def peer_delete(peer_id: int):
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
row = cur.fetchone()
if not row:
flash("Клиент не найден", "error")
return redirect(url_for("index"))
item = dict(row)
pk = item.get("public_key", "")
if pk:
try:
run(["sudo", "/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
except Exception:
pass
with db_conn() as conn:
cur = conn.cursor()
cur.execute("DELETE FROM peers WHERE id = ?", (peer_id,))
conn.commit()
flash("Peer удален", "ok")
return redirect(url_for("index"))
@app.post("/peers/delete-by-key")
def peer_delete_by_key():
pk = (request.form.get("public_key") or "").strip()
if not pk:
flash("Не найден public key", "error")
return redirect(url_for("index"))
try:
run(["sudo", "/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
except Exception:
pass
with db_conn() as conn:
cur = conn.cursor()
cur.execute("DELETE FROM peers WHERE public_key = ?", (pk,))
conn.commit()
flash("Peer удален", "ok")
return redirect(url_for("index"))
@app.route("/scripts")
def scripts():
commands = {
"server_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/server/install_server.sh\"",
"client_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/client/install_client.sh\"",
"apply_wg": "wg syncconf wg0 <(wg-quick strip /etc/wireguard/wg0.conf)",
}
paths = [
"sudo", "/usr/local/sbin/wg-peerctl",
"/etc/wireguard/wg0.conf",
"/etc/wireguard/wg-meta.env",
"/var/log/wireguard-server-install.log",
"/var/log/wireguard-client-install.log",
"/var/log/wireguard-peerctl.log",
]
return render_template("scripts.html", commands=commands, paths=paths)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(os.environ.get("APP_PORT", "5080")), debug=False)