Files
Wireguard_server/gui/app.py

309 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import base64
import io
import os
import sqlite3
import subprocess
from datetime import datetime
import qrcode
from flask import Flask, redirect, render_template, request, url_for, flash, Response
app = Flask(__name__)
app.secret_key = os.environ.get("APP_SECRET", "dev-secret")
DB_PATH = os.environ.get("DB_PATH", "/opt/wg-admin-gui/data/wgadmin.db")
WG_INTERFACE = os.environ.get("WG_INTERFACE", "wg0")
WG_META_FILE = os.environ.get("WG_META_FILE", "/etc/wireguard/wg-meta.env")
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "")
def db_conn():
db_dir = os.path.dirname(DB_PATH)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def ensure_schema():
with db_conn() as conn:
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS peers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
public_key TEXT UNIQUE NOT NULL,
client_address TEXT,
advertised_routes TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
""")
conn.commit()
def run(cmd):
return subprocess.check_output(cmd, text=True).strip()
def load_meta():
meta = {}
if not os.path.exists(WG_META_FILE):
return meta
with open(WG_META_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or "=" not in line:
continue
k, v = line.split("=", 1)
meta[k] = v
return meta
def parse_kv(text):
out = {}
for line in text.splitlines():
if "=" not in line:
continue
k, v = line.split("=", 1)
out[k.strip()] = v.strip()
return out
def wg_dump():
try:
out = run(["wg", "show", WG_INTERFACE, "dump"])
except Exception:
return []
rows = out.splitlines()
peers = []
for line in rows[1:]:
parts = line.split("\t")
if len(parts) < 8:
continue
latest = "never"
if parts[5].isdigit() and int(parts[5]) > 0:
latest = datetime.utcfromtimestamp(int(parts[5])).strftime("%Y-%m-%d %H:%M:%S UTC")
peers.append(
{
"public_key": parts[0],
"endpoint": parts[2] or "-",
"allowed_ips": parts[3],
"latest_handshake": latest,
"rx_bytes": int(parts[6] or 0),
"tx_bytes": int(parts[7] or 0),
}
)
return peers
def bytes_h(n):
units = ["B", "KiB", "MiB", "GiB", "TiB"]
x = float(n)
for u in units:
if x < 1024 or u == units[-1]:
return f"{x:.1f} {u}" if u != "B" else f"{int(x)} B"
x /= 1024
def gen_keypair_psk():
priv = run(["wg", "genkey"])
pub = subprocess.check_output(["wg", "pubkey"], input=priv, text=True).strip()
psk = run(["wg", "genpsk"])
return priv, pub, psk
def to_png_b64(text):
img = qrcode.make(text)
buf = io.BytesIO()
img.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode("ascii")
def _unauthorized():
return Response(
"Auth required",
401,
{"WWW-Authenticate": 'Basic realm="WG Admin"'},
)
@app.before_request
def _auth():
if request.path.startswith("/static/"):
return None
if not ADMIN_PASSWORD:
return None
auth = request.authorization
if not auth:
return _unauthorized()
if auth.username != ADMIN_USER or auth.password != ADMIN_PASSWORD:
return _unauthorized()
return None
@app.before_request
def _schema():
ensure_schema()
@app.route("/")
def index():
meta = load_meta()
runtime = {p["public_key"]: p for p in wg_dump()}
with db_conn() as conn:
cur = conn.cursor()
cur.execute("SELECT * FROM peers ORDER BY id DESC")
db_peers = [dict(r) for r in cur.fetchall()]
items = []
seen = set()
for row in db_peers:
rt = runtime.get(row["public_key"], {})
seen.add(row["public_key"])
items.append(
{
"name": row["name"],
"public_key": row["public_key"],
"client_address": row.get("client_address") or "-",
"routes": row.get("advertised_routes") or "-",
"allowed_ips": rt.get("allowed_ips", "-"),
"endpoint": rt.get("endpoint", "-"),
"latest_handshake": rt.get("latest_handshake", "offline"),
"rx": bytes_h(rt.get("rx_bytes", 0)),
"tx": bytes_h(rt.get("tx_bytes", 0)),
"status": "online" if rt else "offline",
}
)
for pk, rt in runtime.items():
if pk in seen:
continue
items.append(
{
"name": "(external)",
"public_key": pk,
"client_address": rt.get("allowed_ips", "-").split(",", 1)[0],
"routes": "-",
"allowed_ips": rt.get("allowed_ips", "-"),
"endpoint": rt.get("endpoint", "-"),
"latest_handshake": rt.get("latest_handshake", "offline"),
"rx": bytes_h(rt.get("rx_bytes", 0)),
"tx": bytes_h(rt.get("tx_bytes", 0)),
"status": "online",
}
)
return render_template("index.html", peers=items, meta=meta)
@app.route("/peers/new", methods=["GET", "POST"])
def new_peer():
meta = load_meta()
if request.method == "GET":
return render_template("new_peer.html", meta=meta)
name = request.form.get("name", "").strip()
mode = request.form.get("mode", "full").strip()
allowed_ips = request.form.get("allowed_ips", "").strip()
routes = request.form.get("routes", "").strip()
if not name:
flash("Укажите имя клиента", "error")
return redirect(url_for("new_peer"))
if mode == "full":
allowed_ips = "0.0.0.0/0,::/0"
elif not allowed_ips:
allowed_ips = meta.get("WG_NETWORK", "10.66.66.0/24")
client_priv, client_pub, client_psk = gen_keypair_psk()
cmd = [
"/usr/local/sbin/wg-peerctl",
"add",
"--client-name",
name,
"--client-public-key",
client_pub,
"--client-preshared-key",
client_psk,
"--persistent-keepalive",
"25",
]
if routes:
cmd += ["--client-routes", routes]
try:
resp = parse_kv(run(cmd))
except subprocess.CalledProcessError as e:
flash(f"Не удалось добавить peer: {e}", "error")
return redirect(url_for("new_peer"))
client_addr = resp.get("CLIENT_ADDRESS", "")
server_pub = resp.get("SERVER_PUBLIC_KEY", "")
endpoint = resp.get("SERVER_ENDPOINT", "")
dns = resp.get("SERVER_DNS", "1.1.1.1")
conf_lines = [
"[Interface]",
f"PrivateKey = {client_priv}",
f"Address = {client_addr}",
f"DNS = {dns}",
"",
"[Peer]",
f"PublicKey = {server_pub}",
f"PresharedKey = {client_psk}",
f"Endpoint = {endpoint}",
f"AllowedIPs = {allowed_ips}",
"PersistentKeepalive = 25",
"",
]
client_conf = "\n".join(conf_lines)
qr_b64 = to_png_b64(client_conf)
with db_conn() as conn:
cur = conn.cursor()
cur.execute(
"UPDATE peers SET name=?, client_address=?, advertised_routes=? WHERE public_key=?",
(name, client_addr, routes, client_pub),
)
if cur.rowcount == 0:
cur.execute(
"INSERT INTO peers(name, public_key, client_address, advertised_routes) VALUES (?,?,?,?)",
(name, client_pub, client_addr, routes),
)
conn.commit()
return render_template(
"peer_created.html",
name=name,
client_conf=client_conf,
qr_b64=qr_b64,
public_key=client_pub,
)
@app.route("/scripts")
def scripts():
commands = {
"server_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/server/install_server.sh\"",
"client_install": "tmp=\"$(mktemp -d)\" && curl -fL \"https://git.ruslan.xyz/ruslan/Wireguard_server/archive/main.tar.gz\" -o \"$tmp/repo.tar.gz\" && tar -xzf \"$tmp/repo.tar.gz\" -C \"$tmp\" && bash \"$tmp/wireguard_server/client/install_client.sh\"",
"apply_wg": "wg syncconf wg0 <(wg-quick strip /etc/wireguard/wg0.conf)",
}
paths = [
"/usr/local/sbin/wg-peerctl",
"/etc/wireguard/wg0.conf",
"/etc/wireguard/wg-meta.env",
"/var/log/wireguard-server-install.log",
"/var/log/wireguard-client-install.log",
"/var/log/wireguard-peerctl.log",
]
return render_template("scripts.html", commands=commands, paths=paths)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=int(os.environ.get("APP_PORT", "5080")), debug=False)