Files
Wireguard_server/gui/app.py

562 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import base64
import hmac
import io
import os
import sqlite3
import subprocess
import time
from datetime import datetime, timezone

import qrcode
from flask import Flask, redirect, render_template, request, url_for, flash, Response
# Flask application object; every route and request hook below is registered on it.
app = Flask(__name__)
# Key Flask uses to sign the session cookie that carries flash() messages.
# NOTE(review): the "dev-secret" fallback is a publicly known value — set
# APP_SECRET explicitly in any real deployment.
app.secret_key = os.environ.get("APP_SECRET", "dev-secret")
# Path of the SQLite database file that holds the `peers` table.
DB_PATH = os.environ.get("DB_PATH", "/opt/wg-admin-gui/data/wgadmin.db")
# WireGuard interface name: passed to `wg show` and used to locate
# /etc/wireguard/<interface>.conf.
WG_INTERFACE = os.environ.get("WG_INTERFACE", "wg0")
# KEY=VALUE file with server metadata, read by load_meta().
WG_META_FILE = os.environ.get("WG_META_FILE", "/etc/wireguard/wg-meta.env")
# HTTP Basic credentials checked by _auth().
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
# An empty password makes _auth() skip authentication for every request.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")
# A peer is shown as "online" when its latest handshake is at most this many
# seconds old.
ONLINE_WINDOW_SEC = int(os.environ.get("ONLINE_WINDOW_SEC", "120"))
class _ClosingConnection(sqlite3.Connection):
    """SQLite connection whose ``with`` block also closes the handle.

    The stock ``sqlite3.Connection`` context manager only commits (or rolls
    back on error) the open transaction; it leaves the connection open.
    Callers in this file use ``with db_conn() as conn:`` once per operation,
    so without this subclass every such block would leave a handle behind
    until the garbage collector reclaims it.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Keep the standard commit-on-success / rollback-on-error behaviour.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def db_conn():
    """Open the application database and return a connection.

    The parent directory of ``DB_PATH`` is created on demand. Rows are
    returned as ``sqlite3.Row`` so columns can be read by name. When the
    connection is used as a context manager it commits (or rolls back) and
    is then closed on exit.

    Returns:
        An open ``sqlite3.Connection`` (a ``_ClosingConnection`` instance).
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        # A bare filename has no directory component; nothing to create then.
        os.makedirs(db_dir, exist_ok=True)
    conn = sqlite3.connect(DB_PATH, factory=_ClosingConnection)
    conn.row_factory = sqlite3.Row
    return conn
def ensure_schema():
    """Create the ``peers`` table if it is missing and add columns older databases lack.

    The table is created with the full current layout. For a table that
    already exists, each column in the migration list below is added with
    ``ALTER TABLE`` when ``PRAGMA table_info`` does not report it.
    """
    # (column name, column definition) pairs that may be absent from an
    # already-existing table.
    migrations = (
        ("client_conf", "TEXT"),
        ("peer_psk", "TEXT"),
        ("peer_allowed_ips", "TEXT"),
        ("enabled", "INTEGER NOT NULL DEFAULT 1"),
    )
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("""
CREATE TABLE IF NOT EXISTS peers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
public_key TEXT UNIQUE NOT NULL,
client_address TEXT,
advertised_routes TEXT,
client_conf TEXT,
peer_psk TEXT,
peer_allowed_ips TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
""")
        # Index 1 of each table_info row is the column name.
        present = {info[1] for info in cur.execute("PRAGMA table_info(peers)")}
        for column, definition in migrations:
            if column not in present:
                cur.execute(f"ALTER TABLE peers ADD COLUMN {column} {definition}")
        conn.commit()
def run(cmd):
    """Execute *cmd* (an argv list) and return its stdout without surrounding whitespace.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    captured = subprocess.check_output(cmd, text=True)
    return captured.strip()
def load_meta():
    """Read ``WG_META_FILE`` and return its ``KEY=VALUE`` pairs as a dict.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Each line is split on the first ``=`` only, so values may themselves
    contain ``=``. Whitespace around keys and values is removed, matching
    what ``parse_kv`` does for command output.

    Returns:
        A dict of string keys to string values; empty when the file does
        not exist.
    """
    try:
        # Open directly instead of checking os.path.exists() first: the file
        # could disappear between the check and the open.
        meta_file = open(WG_META_FILE, "r", encoding="utf-8")
    except FileNotFoundError:
        return {}
    meta = {}
    with meta_file:
        for raw in meta_file:
            line = raw.strip()
            if not line or line.startswith("#"):
                # A comment that happens to contain "=" must not become a key.
                continue
            key, sep, value = line.partition("=")
            if not sep:
                continue
            meta[key.strip()] = value.strip()
    return meta
def parse_kv(text):
    """Turn multi-line ``KEY=VALUE`` text into a dict.

    Lines without ``=`` are skipped. Each remaining line is split on its
    first ``=``; key and value are stripped of surrounding whitespace. When
    a key occurs more than once the last occurrence wins.
    """
    pairs = (line.partition("=") for line in text.splitlines())
    # partition() yields an empty separator when the line has no "=".
    return {key.strip(): value.strip() for key, sep, value in pairs if sep}
def parse_wg_conf_peer_meta():
    """Collect per-peer metadata from ``/etc/wireguard/<WG_INTERFACE>.conf``.

    For every ``[Peer]`` section that has a ``PublicKey`` the result holds
    the client name taken from a ``# managed-by=wg-peerctl ... client=NAME``
    comment (``"(external)"`` when none was seen), the first ``AllowedIPs``
    entry as the client address, the remaining entries as routes, and the
    raw ``AllowedIPs`` value.

    Returns:
        Dict mapping public key -> ``{"name", "client_address", "routes",
        "allowed_ips"}``; empty when the config file does not exist.
    """
    conf_path = f"/etc/wireguard/{WG_INTERFACE}.conf"
    if not os.path.exists(conf_path):
        return {}

    peers = {}
    # Fields gathered for the [Peer] section currently being read.
    state = {"name": None, "pub": "", "allowed": ""}

    def commit():
        """Store the gathered section under its public key and reset the state."""
        pub = state["pub"]
        if not pub:
            # No key seen yet: nothing is stored and the state is left as is.
            return
        allowed = state["allowed"]
        address, comma, rest = allowed.partition(",")
        peers[pub] = {
            "name": state["name"] or "(external)",
            "client_address": address.strip(),
            "routes": rest.strip() if comma else "",
            "allowed_ips": allowed,
        }
        state.update(name=None, pub="", allowed="")

    inside_peer = False
    with open(conf_path, "r", encoding="utf-8") as conf:
        for line in map(str.strip, conf):
            if not line:
                continue
            if line.startswith("# managed-by=wg-peerctl"):
                # NOTE(review): the name is attached to whichever peer is
                # committed next, which presumably expects the marker inside
                # the [Peer] section it labels — confirm against the format
                # wg-peerctl actually writes.
                if "client=" in line:
                    state["name"] = line.split("client=", 1)[1].split()[0].strip()
            elif line == "[Peer]":
                # A new section header closes the previous one.
                if inside_peer:
                    commit()
                inside_peer = True
            elif inside_peer and line.startswith("PublicKey"):
                state["pub"] = line.split("=", 1)[1].strip()
            elif inside_peer and line.startswith("AllowedIPs"):
                state["allowed"] = line.split("=", 1)[1].strip()
    if inside_peer:
        # The last section has no following header to trigger the commit.
        commit()
    return peers
def wg_dump():
    """Return runtime state of the peers on ``WG_INTERFACE`` from ``wg show <iface> dump``.

    The first output line is skipped; every further tab-separated line with
    at least eight fields becomes one dict with the keys ``public_key``,
    ``endpoint``, ``allowed_ips``, ``latest_handshake`` (formatted UTC time
    or ``"never"``), ``latest_handshake_ts``, ``rx_bytes`` and ``tx_bytes``.

    Returns:
        A list of peer dicts; empty when the ``wg`` command cannot be run
        or fails.
    """

    def to_int(text):
        # A malformed counter should not abort the whole listing.
        try:
            return int(text)
        except ValueError:
            return 0

    try:
        out = run(["wg", "show", WG_INTERFACE, "dump"])
    except (OSError, subprocess.SubprocessError, ValueError):
        # Missing binary / no permission, non-zero exit, or undecodable output.
        return []

    peers = []
    for line in out.splitlines()[1:]:
        parts = line.split("\t")
        if len(parts) < 8:
            continue
        latest_ts = to_int(parts[5]) if parts[5].isdigit() else 0
        if latest_ts > 0:
            # Timezone-aware conversion; datetime.utcfromtimestamp() is
            # deprecated since Python 3.12.
            moment = datetime.fromtimestamp(latest_ts, tz=timezone.utc)
            latest = moment.strftime("%Y-%m-%d %H:%M:%S UTC")
        else:
            latest_ts = 0
            latest = "never"
        peers.append(
            {
                "public_key": parts[0],
                "endpoint": parts[2] or "-",
                "allowed_ips": parts[3],
                "latest_handshake": latest,
                "latest_handshake_ts": latest_ts,
                "rx_bytes": to_int(parts[6] or 0),
                "tx_bytes": to_int(parts[7] or 0),
            }
        )
    return peers
def bytes_h(n):
    """Format a byte count with binary units (B, KiB, MiB, GiB, TiB).

    Values below 1024 are shown as a whole number of bytes; larger values
    are shown with one decimal in the first unit where they drop below
    1024, with TiB as the largest unit.
    """
    suffixes = ("B", "KiB", "MiB", "GiB", "TiB")
    value = float(n)
    idx = 0
    # Step up one unit at a time until the value fits or the units run out.
    while idx < len(suffixes) - 1 and not value < 1024:
        value /= 1024
        idx += 1
    if idx == 0:
        return f"{int(value)} B"
    return f"{value:.1f} {suffixes[idx]}"
def gen_keypair_psk():
    """Create a new private key, its public key and a preshared key via the ``wg`` CLI.

    Returns:
        Tuple ``(private_key, public_key, preshared_key)`` of stripped strings.

    Raises:
        subprocess.CalledProcessError: if any ``wg`` invocation exits non-zero.
    """
    private_key = run(["wg", "genkey"])
    # The private key is fed to `wg pubkey` on stdin rather than via argv.
    pubkey_output = subprocess.check_output(["wg", "pubkey"], input=private_key, text=True)
    public_key = pubkey_output.strip()
    preshared_key = run(["wg", "genpsk"])
    return private_key, public_key, preshared_key
def to_png_b64(text):
    """Encode *text* as a QR code image and return the image bytes as base64 ASCII text.

    The image is written with ``qrcode``'s default ``save()`` format —
    presumably PNG, as the function name suggests; no format is passed
    explicitly.
    """
    with io.BytesIO() as sink:
        qrcode.make(text).save(sink)
        encoded = base64.b64encode(sink.getvalue())
    return encoded.decode("ascii")
def _unauthorized():
    """Build the 401 response that asks the client for HTTP Basic credentials."""
    challenge = {"WWW-Authenticate": 'Basic realm="WG Admin"'}
    return Response("Auth required", 401, challenge)
@app.before_request
def _auth():
    """Enforce HTTP Basic authentication before each request.

    Requests for ``/static/`` are always let through. When
    ``ADMIN_PASSWORD`` is empty, authentication is disabled entirely.
    Otherwise the supplied credentials must match ``ADMIN_USER`` and
    ``ADMIN_PASSWORD``.

    Returns:
        ``None`` to let the request proceed, or the 401 challenge response.
    """
    if request.path.startswith("/static/"):
        return None
    if not ADMIN_PASSWORD:
        # NOTE(review): an unset password leaves the whole admin UI open.
        return None
    auth = request.authorization
    if not auth:
        return _unauthorized()
    # username/password may be None for non-Basic schemes; treat as empty.
    # Compare as bytes: compare_digest() rejects non-ASCII str arguments.
    supplied_user = (auth.username or "").encode("utf-8")
    supplied_password = (auth.password or "").encode("utf-8")
    # Constant-time comparison, and both checks always run, so the response
    # time does not reveal how much of a credential matched.
    user_ok = hmac.compare_digest(supplied_user, ADMIN_USER.encode("utf-8"))
    password_ok = hmac.compare_digest(supplied_password, ADMIN_PASSWORD.encode("utf-8"))
    if not (user_ok and password_ok):
        return _unauthorized()
    return None
@app.before_request
def _schema():
    """Make sure the ``peers`` table and its columns exist before the request is handled."""
    ensure_schema()
    # Returning None lets Flask continue with normal request handling.
    return None
def _peer_item(peer_id, name, public_key, client_address, routes, rt, now):
    """Build one peer entry for the index page from stored fields and runtime state ``rt``."""
    ts = int(rt.get("latest_handshake_ts", 0) or 0)
    is_online = ts > 0 and (now - ts) <= ONLINE_WINDOW_SEC
    return {
        "id": peer_id,
        "name": name,
        "public_key": public_key,
        "client_address": client_address,
        "routes": routes,
        "allowed_ips": rt.get("allowed_ips", "-"),
        "endpoint": rt.get("endpoint", "-"),
        "latest_handshake": rt.get("latest_handshake", "offline"),
        "rx": bytes_h(rt.get("rx_bytes", 0)),
        "tx": bytes_h(rt.get("tx_bytes", 0)),
        "status": "online" if is_online else "offline",
    }


@app.route("/")
def index():
    """Render the peer overview.

    Peers stored in the database are listed first (newest id first), each
    merged with its runtime state from ``wg_dump()``. Peers that exist on
    the interface but not in the database are inserted into the database
    (name, address and routes taken from the WireGuard config when
    available) and listed after them. All database work uses a single
    connection.
    """
    meta = load_meta()
    runtime = {p["public_key"]: p for p in wg_dump()}
    conf_meta = parse_wg_conf_peer_meta()
    now = int(time.time())
    items = []
    seen = set()
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM peers ORDER BY id DESC")
        db_peers = [dict(r) for r in cur.fetchall()]
        for row in db_peers:
            seen.add(row["public_key"])
            items.append(
                _peer_item(
                    row["id"],
                    row["name"],
                    row["public_key"],
                    row.get("client_address") or "-",
                    row.get("advertised_routes") or "-",
                    runtime.get(row["public_key"], {}),
                    now,
                )
            )

        imported_any = False
        for pk, rt in runtime.items():
            if pk in seen:
                continue
            cm = conf_meta.get(pk, {})
            imported_name = cm.get("name", "(external)")
            # Without config metadata, fall back to the first runtime AllowedIPs entry.
            imported_addr = cm.get("client_address", rt.get("allowed_ips", "-").split(",", 1)[0])
            imported_routes = cm.get("routes", "-") or "-"
            # "-" is only a display placeholder; store an empty string instead.
            cur.execute(
                "INSERT OR IGNORE INTO peers(name, public_key, client_address, advertised_routes, enabled) VALUES (?,?,?,?,1)",
                (imported_name, pk, imported_addr, imported_routes if imported_routes != "-" else ""),
            )
            cur.execute("SELECT id FROM peers WHERE public_key = ?", (pk,))
            got = cur.fetchone()
            ext_id = got["id"] if got else None
            imported_any = True
            items.append(_peer_item(ext_id, imported_name, pk, imported_addr, imported_routes, rt, now))
        if imported_any:
            conn.commit()
    return render_template("index.html", peers=items, meta=meta)
@app.route("/peers/new", methods=["GET", "POST"])
def new_peer():
    """Show the new-peer form (GET) or create a peer (POST).

    On POST a key pair and preshared key are generated, the peer is
    registered through ``wg-peerctl add``, a client configuration is built
    from the command's ``KEY=VALUE`` output, the peer is stored in the
    database, and the configuration is shown together with its QR code.
    Failures are reported with a flash message and a redirect back to the
    form.
    """
    meta = load_meta()
    if request.method == "GET":
        return render_template("new_peer.html", meta=meta)
    name = request.form.get("name", "").strip()
    mode = request.form.get("mode", "full").strip()
    allowed_ips = request.form.get("allowed_ips", "").strip()
    routes = request.form.get("routes", "").strip()
    if not name:
        flash("Укажите имя клиента", "error")
        return redirect(url_for("new_peer"))
    if mode == "full":
        # Full-tunnel mode overrides whatever was typed into the form.
        allowed_ips = "0.0.0.0/0,::/0"
    elif not allowed_ips:
        allowed_ips = meta.get("WG_NETWORK", "10.66.66.0/24")
    try:
        client_priv, client_pub, client_psk = gen_keypair_psk()
    except (subprocess.CalledProcessError, OSError) as e:
        # Neither command line carries a secret (the private key goes via
        # stdin), so only the exception type is shown.
        flash(f"Не удалось сгенерировать ключи: {type(e).__name__}", "error")
        return redirect(url_for("new_peer"))
    cmd = [
        "/usr/local/sbin/wg-peerctl",
        "add",
        "--client-name",
        name,
        "--client-public-key",
        client_pub,
        "--client-preshared-key",
        client_psk,
        "--persistent-keepalive",
        "25",
    ]
    if routes:
        cmd += ["--client-routes", routes]
    try:
        resp = parse_kv(run(cmd))
    except subprocess.CalledProcessError as e:
        # str(e) contains the full argv, including the preshared key, so
        # only the exit status is reported.
        app.logger.error("wg-peerctl add exited with status %s", e.returncode)
        flash(f"Не удалось добавить peer: код возврата {e.returncode}", "error")
        return redirect(url_for("new_peer"))
    except OSError as e:
        # Raised when wg-peerctl is missing or not executable; the message
        # names the program path only, not its arguments.
        flash(f"Не удалось добавить peer: {e}", "error")
        return redirect(url_for("new_peer"))
    client_addr = resp.get("CLIENT_ADDRESS", "")
    server_pub = resp.get("SERVER_PUBLIC_KEY", "")
    endpoint = resp.get("SERVER_ENDPOINT", "")
    dns = resp.get("SERVER_DNS", "1.1.1.1")
    conf_lines = [
        "[Interface]",
        f"PrivateKey = {client_priv}",
        f"Address = {client_addr}",
        f"DNS = {dns}",
        "",
        "[Peer]",
        f"PublicKey = {server_pub}",
        f"PresharedKey = {client_psk}",
        f"Endpoint = {endpoint}",
        f"AllowedIPs = {allowed_ips}",
        "PersistentKeepalive = 25",
        "",
    ]
    client_conf = "\n".join(conf_lines)
    qr_b64 = to_png_b64(client_conf)
    # Server-side AllowedIPs for this peer: its address plus advertised routes.
    peer_allowed_ips = client_addr + (("," + routes) if routes else "")
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute(
            "UPDATE peers SET name=?, client_address=?, advertised_routes=?, client_conf=?, peer_psk=?, peer_allowed_ips=?, enabled=1 WHERE public_key=?",
            (name, client_addr, routes, client_conf, client_psk, peer_allowed_ips, client_pub),
        )
        if cur.rowcount == 0:
            # No existing row for this key: insert a new one.
            cur.execute(
                "INSERT INTO peers(name, public_key, client_address, advertised_routes, client_conf, peer_psk, peer_allowed_ips, enabled) VALUES (?,?,?,?,?,?,?,1)",
                (name, client_pub, client_addr, routes, client_conf, client_psk, peer_allowed_ips),
            )
        conn.commit()
    return render_template(
        "peer_created.html",
        name=name,
        client_conf=client_conf,
        qr_b64=qr_b64,
        public_key=client_pub,
    )
@app.route("/peers/<int:peer_id>")
def peer_view(peer_id: int):
    """Show the stored client configuration and its QR code for one peer.

    Redirects to the index with an error message when the peer does not
    exist or has no stored configuration.
    """
    with db_conn() as conn:
        record = conn.cursor().execute("SELECT * FROM peers WHERE id = ?", (peer_id,)).fetchone()
    if not record:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))

    peer = dict(record)
    stored_conf = peer.get("client_conf") or ""
    if not stored_conf:
        flash("Для этого клиента не найден сохраненный конфиг", "error")
        return redirect(url_for("index"))

    # The "peer created" template is reused to display an existing config.
    return render_template(
        "peer_created.html",
        name=peer.get("name", "peer"),
        client_conf=stored_conf,
        qr_b64=to_png_b64(stored_conf),
        public_key=peer.get("public_key", ""),
    )
@app.post("/peers/<int:peer_id>/disable")
def peer_disable(peer_id: int):
    """Remove a peer from WireGuard via ``wg-peerctl remove`` and mark it disabled.

    The database row is kept (with ``enabled=0``). Every outcome ends in a
    flash message and a redirect to the index.
    """
    with db_conn() as conn:
        cur = conn.cursor()
        # Only the public key is needed; avoid loading the stored config and PSK.
        cur.execute("SELECT public_key FROM peers WHERE id = ?", (peer_id,))
        row = cur.fetchone()
    if not row:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))
    pk = row["public_key"] or ""
    if not pk:
        flash("Не найден public key", "error")
        return redirect(url_for("index"))
    try:
        run(["/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable wg-peerctl, which would
        # otherwise surface as an HTTP 500. The command line holds only the
        # public key, so the message is safe to show.
        flash(f"Не удалось отключить peer: {e}", "error")
        return redirect(url_for("index"))
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("UPDATE peers SET enabled=0 WHERE id = ?", (peer_id,))
        conn.commit()
    flash("Peer отключен", "ok")
    return redirect(url_for("index"))
@app.post("/peers/<int:peer_id>/enable")
def peer_enable(peer_id: int):
    """Add a stored peer back to WireGuard via ``wg-peerctl add`` and mark it enabled.

    The stored name, public key, address and preshared key are all
    required; advertised routes are passed on when present. Every outcome
    ends in a flash message and a redirect to the index.
    """
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
        row = cur.fetchone()
    if not row:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))
    item = dict(row)
    name = item.get("name", "")
    pk = item.get("public_key", "")
    addr = item.get("client_address", "")
    routes = item.get("advertised_routes", "") or ""
    psk = item.get("peer_psk", "") or ""
    if not (name and pk and addr and psk):
        flash("Недостаточно данных для включения peer (нужны name/public key/address/psk)", "error")
        return redirect(url_for("index"))
    cmd = [
        "/usr/local/sbin/wg-peerctl",
        "add",
        "--client-name",
        name,
        "--client-public-key",
        pk,
        "--client-address",
        addr,
        "--client-preshared-key",
        psk,
        "--persistent-keepalive",
        "25",
    ]
    if routes:
        cmd += ["--client-routes", routes]
    try:
        run(cmd)
    except subprocess.CalledProcessError as e:
        # str(e) contains the full argv, including the preshared key, so
        # only the exit status is reported.
        app.logger.error("wg-peerctl add exited with status %s for peer id %s", e.returncode, peer_id)
        flash(f"Не удалось включить peer: код возврата {e.returncode}", "error")
        return redirect(url_for("index"))
    except OSError as e:
        # Raised when wg-peerctl is missing or not executable; the message
        # names the program path only, not its arguments.
        flash(f"Не удалось включить peer: {e}", "error")
        return redirect(url_for("index"))
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("UPDATE peers SET enabled=1 WHERE id = ?", (peer_id,))
        conn.commit()
    flash("Peer включен", "ok")
    return redirect(url_for("index"))
@app.post("/peers/<int:peer_id>/delete")
def peer_delete(peer_id: int):
    """Delete a peer: remove it from WireGuard (best effort) and drop its database row.

    A failing ``wg-peerctl remove`` does not stop the deletion, but it is
    written to the application log instead of being discarded silently.
    """
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM peers WHERE id = ?", (peer_id,))
        row = cur.fetchone()
    if not row:
        flash("Клиент не найден", "error")
        return redirect(url_for("index"))
    item = dict(row)
    pk = item.get("public_key", "")
    if pk:
        try:
            run(["/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
        except Exception as e:
            # Deliberately best-effort: the row is deleted regardless. The
            # command line holds only the public key, so logging it is safe.
            app.logger.warning("wg-peerctl remove failed for peer id %s: %s", peer_id, e)
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM peers WHERE id = ?", (peer_id,))
        conn.commit()
    flash("Peer удален", "ok")
    return redirect(url_for("index"))
@app.post("/peers/delete-by-key")
def peer_delete_by_key():
    """Delete the peer identified by the ``public_key`` form field.

    The peer is removed from WireGuard on a best-effort basis and its
    database row, if any, is dropped. A failing ``wg-peerctl remove`` does
    not stop the deletion, but it is written to the application log instead
    of being discarded silently.
    """
    pk = (request.form.get("public_key") or "").strip()
    if not pk:
        flash("Не найден public key", "error")
        return redirect(url_for("index"))
    try:
        run(["/usr/local/sbin/wg-peerctl", "remove", "--client-public-key", pk])
    except Exception as e:
        # Deliberately best-effort: the key comes straight from the form, so
        # any failure (including a malformed value) must not abort the
        # cleanup of the database row.
        app.logger.warning("wg-peerctl remove failed for a key submitted via form: %s", type(e).__name__)
    with db_conn() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM peers WHERE public_key = ?", (pk,))
        conn.commit()
    flash("Peer удален", "ok")
    return redirect(url_for("index"))
@app.route("/scripts")
def scripts():
    """Render the help page listing install/apply commands and relevant file paths.

    The interface name and metadata file shown on the page follow
    ``WG_INTERFACE`` and ``WG_META_FILE``; with the default settings the
    output is identical to the previously hard-coded ``wg0`` values.
    """
    wg_conf_path = f"/etc/wireguard/{WG_INTERFACE}.conf"
    commands = {
        "server_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/server/install_server.sh\"",
        "client_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/client/install_client.sh\"",
        # Built from the configured interface so the page matches the
        # interface this application actually manages.
        "apply_wg": f"wg syncconf {WG_INTERFACE} <(wg-quick strip {wg_conf_path})",
    }
    paths = [
        "/usr/local/sbin/wg-peerctl",
        wg_conf_path,
        WG_META_FILE,
        "/var/log/wireguard-server-install.log",
        "/var/log/wireguard-client-install.log",
        "/var/log/wireguard-peerctl.log",
    ]
    return render_template("scripts.html", commands=commands, paths=paths)
if __name__ == "__main__":
    # Direct execution: serve on all interfaces, port taken from APP_PORT
    # (5080 when unset), with Flask's debug mode off.
    listen_port = int(os.environ.get("APP_PORT", "5080"))
    app.run(host="0.0.0.0", port=listen_port, debug=False)