Files
Wireguard_server/gui/app.py

304 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import base64
import io
import os
import subprocess
from datetime import datetime
import qrcode
from flask import Flask, redirect, render_template, request, url_for, flash, Response
from psycopg import connect
from psycopg.rows import dict_row
app = Flask(__name__)
app.secret_key = os.environ.get("APP_SECRET", "dev-secret")
DB_DSN = os.environ.get("DB_DSN", "postgresql://wgadmin:wgadmin@127.0.0.1:5432/wgadmin")
WG_INTERFACE = os.environ.get("WG_INTERFACE", "wg0")
WG_META_FILE = os.environ.get("WG_META_FILE", "/etc/wireguard/wg-meta.env")
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")
def db_conn():
return connect(DB_DSN, row_factory=dict_row)
def ensure_schema():
with db_conn() as conn, conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS peers (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
public_key TEXT UNIQUE NOT NULL,
client_address TEXT,
advertised_routes TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
"""
)
conn.commit()
def run(cmd):
return subprocess.check_output(cmd, text=True).strip()
def load_meta():
meta = {}
if not os.path.exists(WG_META_FILE):
return meta
with open(WG_META_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or "=" not in line:
continue
k, v = line.split("=", 1)
meta[k] = v
return meta
def parse_kv(text):
out = {}
for line in text.splitlines():
if "=" not in line:
continue
k, v = line.split("=", 1)
out[k.strip()] = v.strip()
return out
def wg_dump():
try:
out = run(["wg", "show", WG_INTERFACE, "dump"])
except Exception:
return []
rows = out.splitlines()
peers = []
for line in rows[1:]:
parts = line.split("\t")
if len(parts) < 8:
continue
latest = "never"
if parts[5].isdigit() and int(parts[5]) > 0:
latest = datetime.utcfromtimestamp(int(parts[5])).strftime("%Y-%m-%d %H:%M:%S UTC")
peers.append(
{
"public_key": parts[0],
"endpoint": parts[2] or "-",
"allowed_ips": parts[3],
"latest_handshake": latest,
"rx_bytes": int(parts[6] or 0),
"tx_bytes": int(parts[7] or 0),
}
)
return peers
def bytes_h(n):
units = ["B", "KiB", "MiB", "GiB", "TiB"]
x = float(n)
for u in units:
if x < 1024 or u == units[-1]:
return f"{x:.1f} {u}" if u != "B" else f"{int(x)} B"
x /= 1024
def gen_keypair_psk():
priv = run(["wg", "genkey"])
pub = subprocess.check_output(["wg", "pubkey"], input=priv, text=True).strip()
psk = run(["wg", "genpsk"])
return priv, pub, psk
def to_png_b64(text):
img = qrcode.make(text)
buf = io.BytesIO()
img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode("ascii")
def _unauthorized():
return Response(
"Auth required",
401,
{"WWW-Authenticate": 'Basic realm="WG Admin"'},
)
@app.before_request
def _auth():
if request.path.startswith("/static/"):
return None
if not ADMIN_PASSWORD:
return None
auth = request.authorization
if not auth:
return _unauthorized()
if auth.username != ADMIN_USER or auth.password != ADMIN_PASSWORD:
return _unauthorized()
return None
@app.before_request
def _schema():
ensure_schema()
@app.route("/")
def index():
meta = load_meta()
runtime = {p["public_key"]: p for p in wg_dump()}
with db_conn() as conn, conn.cursor() as cur:
cur.execute("SELECT * FROM peers ORDER BY id DESC")
db_peers = cur.fetchall()
items = []
seen = set()
for row in db_peers:
rt = runtime.get(row["public_key"], {})
seen.add(row["public_key"])
items.append(
{
"name": row["name"],
"public_key": row["public_key"],
"client_address": row.get("client_address") or "-",
"routes": row.get("advertised_routes") or "-",
"allowed_ips": rt.get("allowed_ips", "-"),
"endpoint": rt.get("endpoint", "-"),
"latest_handshake": rt.get("latest_handshake", "offline"),
"rx": bytes_h(rt.get("rx_bytes", 0)),
"tx": bytes_h(rt.get("tx_bytes", 0)),
"status": "online" if rt else "offline",
}
)
for pk, rt in runtime.items():
if pk in seen:
continue
items.append(
{
"name": "(external)",
"public_key": pk,
"client_address": rt.get("allowed_ips", "-").split(",", 1)[0],
"routes": "-",
"allowed_ips": rt.get("allowed_ips", "-"),
"endpoint": rt.get("endpoint", "-"),
"latest_handshake": rt.get("latest_handshake", "offline"),
"rx": bytes_h(rt.get("rx_bytes", 0)),
"tx": bytes_h(rt.get("tx_bytes", 0)),
"status": "online",
}
)
return render_template("index.html", peers=items, meta=meta)
@app.route("/peers/new", methods=["GET", "POST"])
def new_peer():
meta = load_meta()
if request.method == "GET":
return render_template("new_peer.html", meta=meta)
name = request.form.get("name", "").strip()
mode = request.form.get("mode", "full").strip()
allowed_ips = request.form.get("allowed_ips", "").strip()
routes = request.form.get("routes", "").strip()
if not name:
flash("Укажите имя клиента", "error")
return redirect(url_for("new_peer"))
if mode == "full":
allowed_ips = "0.0.0.0/0,::/0"
elif not allowed_ips:
allowed_ips = meta.get("WG_NETWORK", "10.66.66.0/24")
client_priv, client_pub, client_psk = gen_keypair_psk()
cmd = [
"/usr/local/sbin/wg-peerctl",
"add",
"--client-name",
name,
"--client-public-key",
client_pub,
"--client-preshared-key",
client_psk,
"--persistent-keepalive",
"25",
]
if routes:
cmd += ["--client-routes", routes]
try:
resp = parse_kv(run(cmd))
except subprocess.CalledProcessError as e:
flash(f"Не удалось добавить peer: {e}", "error")
return redirect(url_for("new_peer"))
client_addr = resp.get("CLIENT_ADDRESS", "")
server_pub = resp.get("SERVER_PUBLIC_KEY", "")
endpoint = resp.get("SERVER_ENDPOINT", "")
dns = resp.get("SERVER_DNS", "1.1.1.1")
conf_lines = [
"[Interface]",
f"PrivateKey = {client_priv}",
f"Address = {client_addr}",
f"DNS = {dns}",
"",
"[Peer]",
f"PublicKey = {server_pub}",
f"PresharedKey = {client_psk}",
f"Endpoint = {endpoint}",
f"AllowedIPs = {allowed_ips}",
"PersistentKeepalive = 25",
"",
]
client_conf = "\n".join(conf_lines)
qr_b64 = to_png_b64(client_conf)
with db_conn() as conn, conn.cursor() as cur:
cur.execute(
"""
INSERT INTO peers(name, public_key, client_address, advertised_routes)
VALUES (%s,%s,%s,%s)
ON CONFLICT(public_key)
DO UPDATE SET name=excluded.name, client_address=excluded.client_address, advertised_routes=excluded.advertised_routes
""",
(name, client_pub, client_addr, routes),
)
conn.commit()
return render_template(
"peer_created.html",
name=name,
client_conf=client_conf,
qr_b64=qr_b64,
public_key=client_pub,
)
@app.route("/scripts")
def scripts():
commands = {
"server_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/server/install_server.sh\"",
"client_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/client/install_client.sh\"",
"apply_wg": "wg syncconf wg0 <(wg-quick strip /etc/wireguard/wg0.conf)",
}
paths = [
"/usr/local/sbin/wg-peerctl",
"/etc/wireguard/wg0.conf",
"/etc/wireguard/wg-meta.env",
"/var/log/wireguard-server-install.log",
"/var/log/wireguard-client-install.log",
"/var/log/wireguard-peerctl.log",
]
return render_template("scripts.html", commands=commands, paths=paths)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(os.environ.get("APP_PORT", "5080")), debug=False)