from __future__ import annotations from flask import Blueprint, jsonify, redirect, render_template, request, send_from_directory, session from .config import ADMIN_LOGIN, ADMIN_PASSWORD, ADMIN_PATH, BASE_DIR from .db import fetch_scope_data, get_db, scope_tables bp = Blueprint("main", __name__) def require_admin() -> bool: return bool(session.get("is_admin")) def parse_int(form_value: str | None) -> int | None: if not form_value: return None try: return int(form_value) except ValueError: return None @bp.get("/") def index(): return render_template("index.html") @bp.get("/api/data") def api_data(): scope = (request.args.get("scope") or "infra").strip().lower() if scope in {"ib", "sec", "security"}: scope = "ib" else: scope = "infra" return jsonify(fetch_scope_data(scope)) @bp.route(ADMIN_PATH, methods=["GET", "POST"]) def admin_login_or_panel(): conn = get_db() raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower() scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra" tables = scope_tables(scope) if request.method == "POST" and not require_admin() and request.form.get("action") == "login": if request.form.get("username") == ADMIN_LOGIN and request.form.get("password") == ADMIN_PASSWORD: session["is_admin"] = True conn.close() return redirect(ADMIN_PATH) conn.close() return render_template("login.html", error="Неверный логин или пароль") if not require_admin(): conn.close() return render_template("login.html", error=None) if request.method == "POST": action = request.form.get("action", "") if action == "logout": session.pop("is_admin", None) conn.close() return redirect(ADMIN_PATH) if action == "add_vendor": name = (request.form.get("name") or "").strip() if name: conn.execute(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (name,)) elif action == "add_category": name = (request.form.get("name") or "").strip() if name: conn.execute(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (name,)) elif action == "add_product": vendor_id = parse_int(request.form.get("vendor_id")) name = (request.form.get("name") or "").strip() url = (request.form.get("url") or "").strip() if vendor_id and name: conn.execute( f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)", (vendor_id, name, url or None), ) if url: conn.execute( f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?", (url, vendor_id, name), ) elif action == "delete_vendor": v_id = parse_int(request.form.get("vendor_id")) if v_id: conn.execute(f"DELETE FROM {tables['vendors']} WHERE id = ?", (v_id,)) elif action == "delete_category": c_id = parse_int(request.form.get("category_id")) if c_id: conn.execute(f"DELETE FROM {tables['categories']} WHERE id = ?", (c_id,)) elif action == "delete_product": p_id = parse_int(request.form.get("product_id")) if p_id: conn.execute(f"DELETE FROM {tables['products']} WHERE id = ?", (p_id,)) elif action == "save_matrix": products = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['products']}")] categories = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['categories']}")] new_pairs: list[tuple[int, int]] = [] for p_id in products: for c_id in categories: if request.form.get(f"pc_{p_id}_{c_id}"): new_pairs.append((p_id, c_id)) conn.execute(f"DELETE FROM {tables['product_categories']}") conn.executemany( f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)", new_pairs, ) conn.execute(f"DELETE FROM {tables['vendor_categories']}") conn.execute( f""" INSERT OR IGNORE INTO {tables['vendor_categories']}(vendor_id, category_id) SELECT DISTINCT p.vendor_id, pc.category_id FROM {tables['products']} p JOIN {tables['product_categories']} pc ON pc.product_id = p.id """ ) conn.commit() conn.close() return redirect(f"{ADMIN_PATH}?scope={scope}") vendors = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['vendors']} ORDER BY lower(name)")] categories = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")] products = [ dict(r) for r in conn.execute( f""" SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name , p.url FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id ORDER BY lower(v.name), lower(p.name) """ ) ] links = { (r["product_id"], r["category_id"]) for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}") } conn.close() return render_template( "admin.html", vendors=vendors, categories=categories, products=products, links=links, scope=scope, ) @bp.get("/health") def health(): return {"status": "ok"} @bp.get("/assets/mont-logo") def mont_logo(): return send_from_directory(BASE_DIR, "mont_logo.png")