Files
Wireguard_server/gui/app.py
T
ruslan 2391007a81 feat(gui): custom login page, session-based auth
Replace nginx auth_basic + HTTP Basic Auth with a styled Flask login form.
- Session-based authentication (cookie, session.permanent)
- Custom login page with logo, error state, clean form design
- CSRF check skipped for /login route
- Logout button in sidebar footer
- nginx auth_basic removed; ADMIN_PASSWORD restored in .env

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 10:42:44 +03:00

679 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import base64
import io
import os
import secrets
import sqlite3
import subprocess
import time
from datetime import datetime
import qrcode
from flask import Flask, redirect, render_template, request, url_for, flash, Response, session, send_file
# Flask application object; the routes and request hooks in this file are registered on it.
app = Flask(__name__)
# Secret used by Flask to sign the session cookie.
# NOTE(review): the "dev-secret" fallback is a fixed, publicly visible value. If APP_SECRET
# is not set, anyone can forge a session cookie (including the ``logged_in`` flag) —
# confirm that every deployment sets APP_SECRET.
app.secret_key = os.environ.get("APP_SECRET", "dev-secret")
# Path of the SQLite database file; db_conn() creates its parent directory on demand.
DB_PATH = os.environ.get("DB_PATH", "/opt/wg-admin-gui/data/wgadmin.db")
# Name of the WireGuard interface queried via ``wg show`` and used to locate its .conf file.
WG_INTERFACE = os.environ.get("WG_INTERFACE", "wg0")
# KEY=VALUE file read by load_meta() (e.g. WG_NETWORK is looked up in new_peer()).
WG_META_FILE = os.environ.get("WG_META_FILE", "/etc/wireguard/wg-meta.env")
# Credentials checked by the /login form.
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
# NOTE(review): when this is empty the _auth hook lets every request through without login.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")
# A peer whose latest handshake is at most this many seconds old is shown as "online".
ONLINE_WINDOW_SEC = int(os.environ.get("ONLINE_WINDOW_SEC", "120"))
def db_conn():
    """Open a SQLite connection to ``DB_PATH`` whose rows are addressable by column name.

    The directory that holds the database file is created when it does not
    exist yet. The caller owns the returned connection.
    """
    parent = os.path.dirname(DB_PATH)
    if parent:
        os.makedirs(parent, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def ensure_schema():
    """Create the ``peers`` table when absent and add any columns it is missing."""
    create_peers_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS peers (\n"
        "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
        "name TEXT NOT NULL,\n"
        "public_key TEXT UNIQUE NOT NULL,\n"
        "client_address TEXT,\n"
        "advertised_routes TEXT,\n"
        "client_conf TEXT,\n"
        "peer_psk TEXT,\n"
        "peer_allowed_ips TEXT,\n"
        "enabled INTEGER NOT NULL DEFAULT 1,\n"
        "created_at TEXT NOT NULL DEFAULT (datetime('now'))\n"
        ");\n"
    )
    # Columns that an already existing table may lack, with the statement that adds each.
    optional_columns = (
        ("client_conf", "ALTER TABLE peers ADD COLUMN client_conf TEXT"),
        ("peer_psk", "ALTER TABLE peers ADD COLUMN peer_psk TEXT"),
        ("peer_allowed_ips", "ALTER TABLE peers ADD COLUMN peer_allowed_ips TEXT"),
        ("enabled", "ALTER TABLE peers ADD COLUMN enabled INTEGER NOT NULL DEFAULT 1"),
    )
    with db_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(create_peers_sql)
        # Index 1 of every PRAGMA table_info row is the column name.
        present = {info[1] for info in cursor.execute("PRAGMA table_info(peers)").fetchall()}
        for column, ddl in optional_columns:
            if column not in present:
                cursor.execute(ddl)
        conn.commit()
# Executed at import time, i.e. once in every worker process that loads this module,
# so the peers table exists (with all expected columns) before any request is served.
ensure_schema()
def run(cmd):
    """Execute *cmd* (an argument list) and return its stdout without surrounding whitespace.

    Raises:
        subprocess.CalledProcessError: the command exited with a non-zero status.
    """
    output = subprocess.check_output(cmd, text=True)
    return output.strip()
def load_meta(path=None):
    """Read a ``KEY=VALUE`` file into a dict.

    Args:
        path: file to read; defaults to ``WG_META_FILE``.

    Returns:
        Mapping of keys to values, both stripped of surrounding whitespace.
        Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
        An empty dict is returned when the file does not exist.
    """
    meta_path = WG_META_FILE if path is None else path
    meta = {}
    try:
        # Open directly instead of checking os.path.exists() first: the file
        # may disappear between the check and the open.
        with open(meta_path, "r", encoding="utf-8") as fh:
            for raw in fh:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                key, sep, value = line.partition("=")
                if not sep:
                    continue
                meta[key.strip()] = value.strip()
    except FileNotFoundError:
        return {}
    return meta
def parse_kv(text):
    """Turn multi-line ``KEY=VALUE`` text into a dict.

    Each line is split on its first ``=``; key and value are stripped.
    Lines without ``=`` are skipped and a repeated key keeps its last value.
    """
    pairs = (entry.partition("=") for entry in text.splitlines() if "=" in entry)
    return {key.strip(): value.strip() for key, _, value in pairs}
def parse_wg_conf_peer_meta(path=None):
    """Collect per-peer metadata from a WireGuard configuration file.

    A ``# managed-by=wg-peerctl ... client=<name>`` comment names the peer it
    belongs to. The comment may stand right before the ``[Peer]`` header or
    inside the section ahead of ``PublicKey``.
    NOTE(review): a marker placed *after* a peer's ``PublicKey`` is attributed
    to the following peer — confirm against the format wg-peerctl writes.

    Args:
        path: configuration file to read; defaults to
            ``/etc/wireguard/<WG_INTERFACE>.conf``.

    Returns:
        Dict keyed by peer public key. Each value has ``name`` (``"(external)"``
        when no marker was found), ``client_address`` (first AllowedIPs entry),
        ``routes`` (remaining AllowedIPs entries) and ``allowed_ips`` (the raw
        AllowedIPs value). Empty when the file does not exist.
    """
    conf_path = f"/etc/wireguard/{WG_INTERFACE}.conf" if path is None else path
    by_pub = {}
    pending_name = None
    in_peer = False
    current_pub = ""
    current_allowed = ""

    def flush():
        """Store the peer collected so far (if it has a public key) and reset state."""
        nonlocal current_pub, current_allowed, pending_name
        if not current_pub:
            return
        first, _, rest = current_allowed.partition(",")
        by_pub[current_pub] = {
            "name": pending_name or "(external)",
            "client_address": first.strip(),
            "routes": rest.strip(),
            "allowed_ips": current_allowed,
        }
        pending_name = None
        current_pub = ""
        current_allowed = ""

    try:
        fh = open(conf_path, "r", encoding="utf-8")
    except FileNotFoundError:
        return {}
    with fh:
        for raw in fh:
            line = raw.strip()
            if not line:
                continue
            if line.startswith("# managed-by=wg-peerctl"):
                # A marker opens a new record: store the previous peer first so
                # this name cannot overwrite the one that belongs to it.
                flush()
                _, found, tail = line.partition("client=")
                tokens = tail.split()
                if found and tokens:
                    pending_name = tokens[0]
                continue
            if line.startswith("#"):
                continue
            if line.startswith("["):
                # Any section header ends the peer that was being collected.
                flush()
                in_peer = line.lower() == "[peer]"
                current_pub = ""
                current_allowed = ""
                continue
            if not in_peer:
                continue
            key, sep, value = line.partition("=")
            if not sep:
                # Malformed line without "=": nothing to extract.
                continue
            key = key.strip().lower()
            if key == "publickey":
                current_pub = value.strip()
            elif key == "allowedips":
                current_allowed = value.strip()
    flush()
    return by_pub
def wg_dump():
    """Return the runtime state of every peer reported by ``wg show <iface> dump``.

    Returns:
        A list of dicts with ``public_key``, ``endpoint`` (``"-"`` when the
        field is empty), ``allowed_ips``, ``latest_handshake_ts`` (0 when there
        was no handshake), ``rx_bytes`` and ``tx_bytes``. An empty list is
        returned when the command cannot be executed.
    """
    try:
        raw = run(["sudo", "/usr/bin/wg", "show", WG_INTERFACE, "dump"])
    except Exception:
        return []

    result = []
    # The first output line is skipped (presumably the interface's own record).
    for record in raw.splitlines()[1:]:
        fields = record.split("\t")
        if len(fields) < 8:
            continue
        # dump format: pubkey psk endpoint allowed_ips handshake_ts rx tx keepalive
        public_key, _psk, endpoint, allowed_ips, handshake, rx, tx = fields[:7]
        handshake_ts = int(handshake) if handshake.isdigit() and int(handshake) > 0 else 0
        result.append(
            {
                "public_key": public_key,
                "endpoint": endpoint or "-",
                "allowed_ips": allowed_ips,
                "latest_handshake_ts": handshake_ts,
                "rx_bytes": int(rx or 0),
                "tx_bytes": int(tx or 0),
            }
        )
    return result
def bytes_h(n):
    """Format a byte count with binary units (B, KiB, MiB, GiB, TiB).

    Plain bytes are printed as an integer, larger units with one decimal.
    Values beyond the TiB range stay expressed in TiB.
    """
    value = float(n)
    if value < 1024:
        return f"{int(value)} B"
    suffixes = ("KiB", "MiB", "GiB", "TiB")
    position = 0
    value /= 1024
    while not (value < 1024) and position < len(suffixes) - 1:
        value /= 1024
        position += 1
    return f"{value:.1f} {suffixes[position]}"
def humanize_ago(ts: int, now: int) -> str:
    """Describe how long before *now* the timestamp *ts* lies.

    Returns the "never" label for a zero/empty *ts*, otherwise the elapsed time
    in seconds, minutes, hours or days, using the largest unit that fits.
    """
    if not ts:
        return "никогда"
    elapsed = now - ts
    if elapsed < 60:
        return f"{elapsed}с назад"
    for limit, unit_seconds, suffix in ((3600, 60, "м"), (86400, 3600, "ч")):
        if elapsed < limit:
            return f"{elapsed // unit_seconds}{suffix} назад"
    return f"{elapsed // 86400}д назад"
_dns_cache: dict = {}  # ip -> (hostname, expires_ts)
_DNS_TTL = 300  # 5 minutes


def resolve_hostname(ip: str) -> str:
    """Reverse-resolve *ip* to a host name, caching results for ``_DNS_TTL`` seconds.

    Returns an empty string for an empty/placeholder address (``"-"``,
    ``"(none)"``) and when the lookup fails; failed lookups are cached too.
    """
    import socket

    if ip in ("-", "(none)") or not ip:
        return ""
    now = time.time()
    cached = _dns_cache.get(ip)
    if cached is not None and now < cached[1]:
        return cached[0]
    try:
        name = socket.gethostbyaddr(ip)[0]
    except Exception:
        name = ""
    _dns_cache[ip] = (name, now + _DNS_TTL)
    return name
def next_free_ip(network: str) -> str:
    """Suggest the first address of *network* that no stored peer uses.

    Args:
        network: network in CIDR notation (host bits are tolerated).

    Returns:
        ``"<address>/<prefixlen>"`` for the first free host, or an empty string
        when *network* cannot be parsed or every host is taken.
    """
    import ipaddress
    from itertools import islice

    try:
        net = ipaddress.ip_network(network, strict=False)
    except ValueError:
        return ""
    with db_conn() as conn:
        used = {
            row[0].split("/")[0]
            for row in conn.execute("SELECT client_address FROM peers WHERE client_address IS NOT NULL")
        }
    # Skip the first host (the server usually takes .1). The hosts are walked
    # lazily so that a large network is never materialised as a list.
    for host in islice(net.hosts(), 1, None):
        if str(host) not in used:
            return f"{host}/{net.prefixlen}"
    return ""
def gen_keypair_psk():
    """Generate a WireGuard key pair and a preshared key using the ``wg`` tool.

    Returns:
        Tuple ``(private_key, public_key, preshared_key)``.
    """
    private_key = run(["wg", "genkey"])
    # The private key is passed on stdin, so it never appears in the argument list.
    derived = subprocess.check_output(["wg", "pubkey"], input=private_key, text=True)
    public_key = derived.strip()
    preshared_key = run(["wg", "genpsk"])
    return private_key, public_key, preshared_key
def to_png_b64(text):
    """Encode *text* as a QR code image and return the image bytes as base64 ASCII text."""
    stream = io.BytesIO()
    qrcode.make(text).save(stream)
    encoded = base64.b64encode(stream.getvalue())
    return encoded.decode("ascii")
def _get_csrf_token():
    """Return the session's CSRF token, generating and storing one on first use."""
    try:
        return session["csrf_token"]
    except KeyError:
        token = session["csrf_token"] = secrets.token_hex(32)
        return token


# Expose the helper to templates as ``csrf_token()``.
app.jinja_env.globals.update(csrf_token=_get_csrf_token)
PUBLIC_PATHS = {"/login", "/static/"}


@app.before_request
def _auth():
    """Redirect anonymous visitors to the login page.

    Static files and the login page itself are always reachable. When no
    ``ADMIN_PASSWORD`` is configured the check is skipped entirely.
    """
    path = request.path
    is_public = path == "/login" or path.startswith("/static/")
    if is_public or not ADMIN_PASSWORD or session.get("logged_in"):
        return None
    # Remember the requested path so the login view can return to it.
    return redirect(url_for("login", next=path))
@app.before_request
def _csrf_check():
    """Reject POST requests whose form lacks the session's CSRF token.

    Requests with another method and POSTs to ``/login`` are not checked.

    Returns:
        ``None`` to let the request proceed, or a 403 response.
    """
    if request.method != "POST" or request.path == "/login":
        return None
    submitted = request.form.get("csrf_token")
    expected = session.get("csrf_token")
    if not submitted or not expected:
        return Response("CSRF token invalid", 403)
    # Constant-time comparison; both sides are encoded because compare_digest
    # only accepts str arguments when they are pure ASCII.
    if not secrets.compare_digest(submitted.encode("utf-8"), str(expected).encode("utf-8")):
        return Response("CSRF token invalid", 403)
    return None
def _safe_next_path(target):
    """Return *target* when it is a local absolute path, otherwise ``None``.

    Only values that start with a single ``/`` are accepted. Backslashes and
    control characters are rejected because browsers may normalise them into
    ``//host`` and leave the site.
    """
    if not target or not target.startswith("/") or target.startswith("//"):
        return None
    if any(ch == "\\" or ord(ch) < 0x20 for ch in target):
        return None
    return target


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and start a session when the credentials match.

    An already logged-in user is sent to the index page. After a successful
    login the browser is redirected to the ``next`` query argument when it is
    a local path, otherwise to the index page.
    """
    if session.get("logged_in"):
        return redirect(url_for("index"))
    error = None
    if request.method == "POST":
        user = request.form.get("username", "").strip()
        pwd = request.form.get("password", "")
        # Constant-time comparisons; both are always evaluated so the response
        # time does not reveal which of the two fields was wrong.
        user_ok = secrets.compare_digest(user.encode("utf-8"), ADMIN_USER.encode("utf-8"))
        pwd_ok = secrets.compare_digest(pwd.encode("utf-8"), ADMIN_PASSWORD.encode("utf-8"))
        if user_ok and pwd_ok:
            # Start from an empty session so nothing set before login survives.
            session.clear()
            session["logged_in"] = True
            session.permanent = True
            target = _safe_next_path(request.args.get("next"))
            return redirect(target or url_for("index"))
        error = "Неверный логин или пароль"
    return render_template("login.html", error=error)
@app.route("/logout")
def logout():
    """Discard all session data and send the browser to the login page."""
    session.clear()
    login_url = url_for("login")
    return redirect(login_url)
def _endpoint_host(endpoint):
    """Extract the host from a ``host:port`` endpoint, including ``[v6]:port``."""
    if not endpoint or endpoint in ("-", "(none)"):
        return ""
    # Split on the last colon only: an IPv6 host contains colons itself.
    host = endpoint.rsplit(":", 1)[0] if ":" in endpoint else endpoint
    return host.strip("[]")


def _peer_item(peer_id, name, public_key, client_address, routes, enabled, rt, now):
    """Build the template row for one peer from stored data and runtime state *rt*."""
    ts = int(rt.get("latest_handshake_ts", 0) or 0)
    is_online = ts > 0 and (now - ts) <= ONLINE_WINDOW_SEC
    return {
        "id": peer_id,
        "name": name,
        "public_key": public_key,
        "client_address": client_address,
        "routes": routes,
        "allowed_ips": rt.get("allowed_ips", "-"),
        "endpoint": rt.get("endpoint", "-"),
        "hostname": resolve_hostname(_endpoint_host(rt.get("endpoint", ""))),
        "handshake_ago": humanize_ago(ts, now),
        "rx": bytes_h(rt.get("rx_bytes", 0)),
        "tx": bytes_h(rt.get("tx_bytes", 0)),
        "status": "online" if is_online else "offline",
        "enabled": enabled,
    }


@app.route("/")
def index():
    """Render the peer overview.

    Stored peers are listed newest first and enriched with runtime data from
    ``wg show``. Peers that exist at runtime but not in the database are
    inserted into it (named from the interface configuration when possible)
    and appended to the list.
    """
    meta = load_meta()
    runtime = {p["public_key"]: p for p in wg_dump()}
    conf_meta = parse_wg_conf_peer_meta()
    with db_conn() as conn:
        db_peers = [dict(r) for r in conn.execute("SELECT * FROM peers ORDER BY id DESC")]

    now = int(time.time())
    items = []
    seen = set()
    for row in db_peers:
        seen.add(row["public_key"])
        items.append(
            _peer_item(
                row["id"],
                row["name"],
                row["public_key"],
                row.get("client_address") or "-",
                row.get("advertised_routes") or "-",
                row.get("enabled", 1),
                runtime.get(row["public_key"], {}),
                now,
            )
        )

    external = [(pk, rt) for pk, rt in runtime.items() if pk not in seen]
    if external:
        # One connection for all imports instead of one per external peer.
        with db_conn() as conn:
            for pk, rt in external:
                cm = conf_meta.get(pk, {})
                imported_name = cm.get("name", "(external)")
                imported_addr = cm.get("client_address", rt.get("allowed_ips", "-").split(",", 1)[0])
                imported_routes = cm.get("routes", "-") or "-"
                conn.execute(
                    "INSERT OR IGNORE INTO peers(name, public_key, client_address, advertised_routes, enabled) VALUES (?,?,?,?,1)",
                    (imported_name, pk, imported_addr, imported_routes if imported_routes != "-" else ""),
                )
                got = conn.execute("SELECT id FROM peers WHERE public_key = ?", (pk,)).fetchone()
                ext_id = got["id"] if got else None
                items.append(
                    _peer_item(ext_id, imported_name, pk, imported_addr, imported_routes, 1, rt, now)
                )
            conn.commit()
    return render_template("index.html", peers=items, meta=meta)
@app.route("/peers/new", methods=["GET", "POST"])
def new_peer():
    """Show the "new peer" form (GET) or create a peer (POST).

    On POST a key pair and preshared key are generated, the peer is registered
    through ``wg-peerctl add``, a client configuration is assembled from the
    tool's ``KEY=VALUE`` output and stored, and the result page with the
    configuration and its QR code is rendered. Validation or tool failures are
    reported as a flash message and redirect back to the form.
    """
    meta = load_meta()
    default_network = meta.get("WG_NETWORK", "10.66.66.0/24")
    if request.method == "GET":
        next_ip = next_free_ip(default_network)
        return render_template("new_peer.html", meta=meta, next_ip=next_ip)

    name = request.form.get("name", "").strip()
    mode = request.form.get("mode", "full").strip()
    allowed_ips = request.form.get("allowed_ips", "").strip()
    routes = request.form.get("routes", "").strip()
    if not name:
        flash("Укажите имя клиента", "error")
        return redirect(url_for("new_peer"))
    # These values are written into configuration text; an embedded line break
    # would let the form inject additional configuration lines.
    if any(ch in value for value in (name, allowed_ips, routes) for ch in "\r\n"):
        flash("Поля не должны содержать переводы строк", "error")
        return redirect(url_for("new_peer"))
    if mode == "full":
        allowed_ips = "0.0.0.0/0,::/0"
    elif not allowed_ips:
        allowed_ips = default_network

    try:
        # Key generation is inside the try as well: a missing or failing `wg`
        # binary is reported to the user instead of producing an HTTP 500.
        client_priv, client_pub, client_psk = gen_keypair_psk()
        cmd = [
            "sudo", "/usr/local/sbin/wg-peerctl",
            "add",
            "--client-name", name,
            "--client-public-key", client_pub,
            "--client-preshared-key", client_psk,
            "--persistent-keepalive", "25",
        ]
        if routes:
            cmd += ["--client-routes", routes]
        resp = parse_kv(run(cmd))
    except (subprocess.CalledProcessError, OSError) as e:
        flash(f"Не удалось добавить peer: {e}", "error")
        return redirect(url_for("new_peer"))

    client_addr = resp.get("CLIENT_ADDRESS", "")
    server_pub = resp.get("SERVER_PUBLIC_KEY", "")
    endpoint = resp.get("SERVER_ENDPOINT", "")
    dns = resp.get("SERVER_DNS", "1.1.1.1")
    conf_lines = [
        "[Interface]",
        f"PrivateKey = {client_priv}",
        f"Address = {client_addr}",
        f"DNS = {dns}",
        "",
        "[Peer]",
        f"PublicKey = {server_pub}",
        f"PresharedKey = {client_psk}",
        f"Endpoint = {endpoint}",
        f"AllowedIPs = {allowed_ips}",
        "PersistentKeepalive = 25",
        "",
    ]
    client_conf = "\n".join(conf_lines)
    qr_b64 = to_png_b64(client_conf)
    # Stored peer_allowed_ips value: the client address plus any advertised routes.
    peer_allowed_ips = client_addr + (("," + routes) if routes else "")
    with db_conn() as conn:
        cur = conn.cursor()
        # Update the row if one with this public key exists, otherwise insert it.
        cur.execute(
            "UPDATE peers SET name=?, client_address=?, advertised_routes=?, client_conf=?, peer_psk=?, peer_allowed_ips=?, enabled=1 WHERE public_key=?",
            (name, client_addr, routes, client_conf, client_psk, peer_allowed_ips, client_pub),
        )
        if cur.rowcount == 0:
            cur.execute(
                "INSERT INTO peers(name, public_key, client_address, advertised_routes, client_conf, peer_psk, peer_allowed_ips, enabled) VALUES (?,?,?,?,?,?,?,1)",
                (name, client_pub, client_addr, routes, client_conf, client_psk, peer_allowed_ips),
            )
        conn.commit()
    return render_template(
        "peer_created.html",
        name=name,
        client_conf=client_conf,
        qr_b64=qr_b64,
        public_key=client_pub,
    )
@app.route("/peers/<int:peer_id>")
def peer_view(peer_id: int):
    """Show the stored client configuration of a peer together with its QR code.

    Redirects to the index page with an error message when the peer does not
    exist or has no stored configuration.
    """
    with db_conn() as conn:
        record = conn.cursor().execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
    if not record:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    peer = dict(record)
    stored_conf = peer.get("client_conf") or ""
    if stored_conf:
        return render_template(
            "peer_created.html",
            name=peer.get("name", "peer"),
            client_conf=stored_conf,
            qr_b64=to_png_b64(stored_conf),
            public_key=peer.get("public_key", ""),
        )
    flash("Для этого клиента не найден сохраненный конфиг", "error")
    return redirect(url_for("index"))
@app.route("/peers/<int:peer_id>/download")
def peer_download(peer_id: int):
    """Send the stored client configuration of a peer as a ``<name>.conf`` attachment.

    Redirects to the index page with an error message when the peer does not
    exist or has no stored configuration.
    """
    with db_conn() as conn:
        record = conn.cursor().execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
    if not record:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    peer = dict(record)
    stored_conf = peer.get("client_conf") or ""
    if not stored_conf:
        flash("Для этого клиента не найден сохраненный конфиг", "error")
        return redirect(url_for("index"))

    peer_name = peer.get("name", "peer")
    payload = io.BytesIO(stored_conf.encode("utf-8"))
    return send_file(payload, as_attachment=True, download_name=f"{peer_name}.conf", mimetype="text/plain")
@app.post("/peers/<int:peer_id>/disable")
def peer_disable(peer_id: int):
    """Remove a peer from WireGuard via ``wg-peerctl remove`` and mark it disabled.

    The database row is kept. It is only flagged ``enabled=0`` after the
    removal command succeeded.
    """
    with db_conn() as conn:
        record = conn.cursor().execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
    if not record:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    public_key = dict(record).get("public_key", "")
    if not public_key:
        flash("Не найден public key", "error")
        return redirect(url_for("index"))

    remove_cmd = ["sudo", "/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", public_key]
    try:
        run(remove_cmd)
    except subprocess.CalledProcessError as e:
        flash(f"Не удалось отключить peer: {e}", "error")
        return redirect(url_for("index"))

    with db_conn() as conn:
        conn.cursor().execute("UPDATE peers SET enabled=0 WHERE id = ?", (peer_id,))
        conn.commit()
    flash("Peer отключен", "ok")
    return redirect(url_for("index"))
@app.post("/peers/<int:peer_id>/enable")
def peer_enable(peer_id: int):
    """Re-add a stored peer to WireGuard via ``wg-peerctl add`` and mark it enabled.

    Requires the stored name, public key, address and preshared key; otherwise
    an error message is flashed and nothing is changed.
    """
    with db_conn() as conn:
        record = conn.cursor().execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
    if not record:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    peer = dict(record)
    peer_name = peer.get("name", "")
    public_key = peer.get("public_key", "")
    address = peer.get("client_address", "")
    extra_routes = peer.get("advertised_routes", "") or ""
    preshared = peer.get("peer_psk", "") or ""
    if not all((peer_name, public_key, address, preshared)):
        flash("Недостаточно данных для включения peer (нужны name/public key/address/psk)", "error")
        return redirect(url_for("index"))

    add_cmd = [
        "sudo", "/usr/local/sbin/wg-peerctl",
        "add",
        "--client-name", peer_name,
        "--client-public-key", public_key,
        "--client-address", address,
        "--client-preshared-key", preshared,
        "--persistent-keepalive", "25",
    ]
    if extra_routes:
        add_cmd.extend(["--client-routes", extra_routes])
    try:
        run(add_cmd)
    except subprocess.CalledProcessError as e:
        flash(f"Не удалось включить peer: {e}", "error")
        return redirect(url_for("index"))

    with db_conn() as conn:
        conn.cursor().execute("UPDATE peers SET enabled=1 WHERE id = ?", (peer_id,))
        conn.commit()
    flash("Peer включен", "ok")
    return redirect(url_for("index"))
@app.post("/peers/<int:peer_id>/delete")
def peer_delete(peer_id: int):
    """Delete a peer: remove it from WireGuard (best effort) and drop its database row.

    A failing ``wg-peerctl remove`` does not stop the deletion, because the
    peer may already be absent from WireGuard. The failure is logged instead
    of being silently discarded.
    """
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
        row = cur.fetchone()
    if not row:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))
    pk = dict(row).get("public_key", "")
    if pk:
        try:
            run(["sudo", "/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
        except Exception as exc:
            # Deliberately non-fatal; keep a trace so failures can be diagnosed.
            app.logger.warning("wg-peerctl remove failed for peer id %s: %s", peer_id, exc)
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM peers WHERE id = ?", (peer_id,))
        conn.commit()
    flash("Peer удален", "ok")
    return redirect(url_for("index"))
@app.post("/peers/delete-by-key")
def peer_delete_by_key():
    """Delete the peer identified by the ``public_key`` form field.

    The peer is removed from WireGuard on a best-effort basis (failures are
    logged, not raised) and its database row, if any, is deleted.
    """
    pk = (request.form.get("public_key") or "").strip()
    if not pk:
        flash("Не найден public key", "error")
        return redirect(url_for("index"))
    try:
        run(["sudo", "/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
    except Exception as exc:
        # Deliberately non-fatal; keep a trace so failures can be diagnosed.
        app.logger.warning("wg-peerctl remove failed for public key %s: %s", pk, exc)
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM peers WHERE public_key = ?", (pk,))
        conn.commit()
    flash("Peer удален", "ok")
    return redirect(url_for("index"))
@app.post("/peers/<int:peer_id>/rename")
def peer_rename(peer_id: int):
    """Store a new display name for a peer.

    Returns an empty 204 response after the update. An empty name flashes an
    error and redirects to the index page instead.
    """
    new_name = (request.form.get("name") or "").strip()
    if new_name:
        with db_conn() as conn:
            conn.cursor().execute("UPDATE peers SET name=? WHERE id=?", (new_name, peer_id))
            conn.commit()
        return ("", 204)
    flash("Имя не может быть пустым", "error")
    return redirect(url_for("index"))
@app.route("/scripts")
def scripts():
    """Render the reference page with install/apply commands and relevant file paths."""
    commands = {
        "server_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/server/install_server.sh\"",
        "client_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/client/install_client.sh\"",
        "apply_wg": "wg syncconf wg0 <(wg-quick strip /etc/wireguard/wg0.conf)",
    }
    # Filesystem paths only: the stray "sudo" entry was not a path and has been dropped.
    paths = [
        "/usr/local/sbin/wg-peerctl",
        "/etc/wireguard/wg0.conf",
        "/etc/wireguard/wg-meta.env",
        "/var/log/wireguard-server-install.log",
        "/var/log/wireguard-client-install.log",
        "/var/log/wireguard-peerctl.log",
    ]
    return render_template("scripts.html", commands=commands, paths=paths)
if __name__ == "__main__":
    # Direct start of the built-in server: listens on all interfaces, on the
    # port given by APP_PORT (5080 when unset), with debug mode off.
    listen_port = int(os.environ.get("APP_PORT", "5080"))
    app.run(host="0.0.0.0", port=listen_port, debug=False)