811 lines
32 KiB
Python
811 lines
32 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import secrets
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from html import escape
|
|
from urllib.parse import urlencode
|
|
from urllib.request import Request, urlopen
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from flask import Blueprint, flash, jsonify, redirect, render_template, request, send_from_directory, session
|
|
|
|
from .config import (
|
|
ADMIN_LOGIN,
|
|
ADMIN_PASSWORD,
|
|
ADMIN_PATH,
|
|
BASE_DIR,
|
|
SUPER_ADMIN_LOGIN,
|
|
SUPER_ADMIN_PASSWORD,
|
|
TELEGRAM_BOT_TOKEN,
|
|
TELEGRAM_CHAT_ID,
|
|
)
|
|
from .db import fetch_scope_data, get_db, scope_tables
|
|
|
|
# Blueprint on which the route handlers in this module are registered.
bp = Blueprint("main", __name__)
|
|
# Time zone used by local_now() when producing timestamps.
MOSCOW_TZ = ZoneInfo("Europe/Moscow")
|
|
|
|
def require_admin() -> bool:
    """Report whether the current session carries a truthy ``is_admin`` flag."""
    admin_flag = session.get("is_admin")
    return True if admin_flag else False
|
|
|
|
|
|
def is_super_admin() -> bool:
    """Report whether the session's ``admin_role`` equals ``"super"``."""
    role = session.get("admin_role")
    return role == "super"
|
|
|
|
|
|
def allowed_scopes() -> set[str]:
    """Return the scopes the current session is allowed to work with.

    Super admins always get both ``infra`` and ``ib``. For everyone else the
    comma-separated ``admin_scopes`` session value is parsed; a missing or
    empty value is treated as ``"infra,ib"``, and a value that yields no
    names at all falls back to ``{"infra"}``.
    """
    if is_super_admin():
        return {"infra", "ib"}

    stored = session.get("admin_scopes") or "infra,ib"
    granted: set[str] = set()
    for chunk in stored.split(","):
        cleaned = chunk.strip()
        if cleaned:
            granted.add(cleaned)

    if granted:
        return granted
    return {"infra"}
|
|
|
|
|
|
def can_access_scope(scope: str) -> bool:
    """Report whether ``scope`` is among the session's allowed scopes."""
    permitted = allowed_scopes()
    return scope in permitted
|
|
|
|
|
|
def parse_int(form_value: str | None) -> int | None:
    """Convert a form value to ``int``.

    Returns ``None`` when the value is missing/empty or is not a valid
    integer literal.
    """
    if form_value:
        try:
            parsed = int(form_value)
        except ValueError:
            parsed = None
        return parsed
    return None
|
|
|
|
|
|
def redirect_admin(scope: str):
    """Return a redirect response to the admin page with ``scope`` in the query string."""
    target = f"{ADMIN_PATH}?scope={scope}"
    return redirect(target)
|
|
|
|
|
|
def make_admin_password() -> str:
    """Generate a random URL-safe password from 9 random bytes."""
    entropy_bytes = 9
    generated = secrets.token_urlsafe(entropy_bytes)
    return generated
|
|
|
|
|
|
def parse_admin_scopes() -> str:
    """Build the comma-separated scope list from the submitted form.

    ``allow_infra`` contributes ``infra`` and ``allow_ib`` contributes
    ``ib`` (in that order); the result is an empty string when neither
    field is set.
    """
    field_to_scope = (("allow_infra", "infra"), ("allow_ib", "ib"))
    return ",".join(
        scope_name
        for field_name, scope_name in field_to_scope
        if request.form.get(field_name)
    )
|
|
|
|
|
|
def local_now() -> str:
    """Return the current time in ``MOSCOW_TZ`` as ``YYYY-MM-DD HH:MM:SS``."""
    moment = datetime.now(MOSCOW_TZ)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def scope_label(scope: str) -> str:
    """Return the display label for a scope; anything other than ``ib`` gets the infra label."""
    if scope == "ib":
        return "ИБ"
    return "Инфраструктура"
|
|
|
|
|
|
def rebuild_vendor_categories(conn, tables: dict[str, str]) -> None:
    """Recompute the vendor/category link table from product data.

    The link table is emptied and then refilled with every distinct
    (vendor, category) pair reachable through a product and its
    product/category links. The caller is responsible for committing.
    """
    wipe_sql = f"DELETE FROM {tables['vendor_categories']}"
    conn.execute(wipe_sql)

    refill_sql = f"""
        INSERT OR IGNORE INTO {tables['vendor_categories']}(vendor_id, category_id)
        SELECT DISTINCT p.vendor_id, pc.category_id
        FROM {tables['products']} p
        JOIN {tables['product_categories']} pc ON pc.product_id = p.id
        """
    conn.execute(refill_sql)
|
|
|
|
|
|
def collect_change_payload(action: str, scope: str, conn, tables: dict[str, str]) -> dict | None:
    """Build the JSON-serialisable payload describing a requested change.

    Values come from the submitted form (``request.form``). For most actions
    the current names are also looked up in the database and stored in the
    payload next to the ids.

    Args:
        action: Requested operation (``add_vendor``, ``save_matrix``, ...).
        scope: Not used by this function.
        conn: Database connection; rows must support access by column name.
        tables: Mapping from logical table names to actual table names.

    Returns:
        The payload dict, or ``None`` when the action is not recognised or a
        required form value is missing/invalid.
    """

    def text_field(field: str) -> str:
        # A missing field counts as empty; surrounding whitespace is dropped.
        return (request.form.get(field) or "").strip()

    if action in ("add_vendor", "add_category"):
        title = text_field("name")
        if not title:
            return None
        return {"name": title}

    if action == "add_product":
        vendor_id = parse_int(request.form.get("vendor_id"))
        title = text_field("name")
        link = text_field("url")
        if not (vendor_id and title):
            return None
        vendor_row = conn.execute(
            f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {
            "vendor_id": vendor_id,
            "vendor_name": vendor_row["name"] if vendor_row else "",
            "name": title,
            "url": link,
        }

    if action == "update_product":
        product_id = parse_int(request.form.get("product_id"))
        title = text_field("name")
        link = text_field("url")
        if not (product_id and title):
            return None
        current = conn.execute(
            f"""
            SELECT p.name, p.url, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            WHERE p.id = ?
            """,
            (product_id,),
        ).fetchone()
        # When the product row is not found the "old" values stay empty.
        vendor_name, old_name, old_url = (
            (current["vendor_name"], current["name"], current["url"]) if current else ("", "", "")
        )
        return {
            "product_id": product_id,
            "vendor_name": vendor_name,
            "old_name": old_name,
            "old_url": old_url,
            "name": title,
            "url": link,
        }

    if action == "update_vendor":
        vendor_id = parse_int(request.form.get("vendor_id"))
        title = text_field("name")
        site = text_field("website")
        mont = text_field("mont_page")
        if not (vendor_id and title):
            return None
        vendor_row = conn.execute(
            f"SELECT name, website, mont_page FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {
            "vendor_id": vendor_id,
            "old_name": vendor_row["name"] if vendor_row else "",
            "name": title,
            "website": site,
            "mont_page": mont,
        }

    if action == "delete_vendor":
        vendor_id = parse_int(request.form.get("vendor_id"))
        if not vendor_id:
            return None
        vendor_row = conn.execute(
            f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {"vendor_id": vendor_id, "vendor_name": vendor_row["name"] if vendor_row else ""}

    if action == "delete_category":
        category_id = parse_int(request.form.get("category_id"))
        if not category_id:
            return None
        category_row = conn.execute(
            f"SELECT name FROM {tables['categories']} WHERE id = ?", (category_id,)
        ).fetchone()
        return {"category_id": category_id, "category_name": category_row["name"] if category_row else ""}

    if action == "delete_product":
        product_id = parse_int(request.form.get("product_id"))
        if not product_id:
            return None
        current = conn.execute(
            f"""
            SELECT p.name, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            WHERE p.id = ?
            """,
            (product_id,),
        ).fetchone()
        return {
            "product_id": product_id,
            "product_name": current["name"] if current else "",
            "vendor_name": current["vendor_name"] if current else "",
        }

    if action == "save_matrix":
        product_ids = [row["id"] for row in conn.execute(f"SELECT id FROM {tables['products']}")]
        category_ids = [row["id"] for row in conn.execute(f"SELECT id FROM {tables['categories']}")]

        # A checkbox named pc_<product>_<category> marks a selected link.
        pairs: list[list[int]] = [
            [pid, cid]
            for pid in product_ids
            for cid in category_ids
            if request.form.get(f"pc_{pid}_{cid}")
        ]

        current_pairs = {
            (row["product_id"], row["category_id"])
            for row in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}")
        }
        new_pairs = {tuple(pair) for pair in pairs}
        # Links present on exactly one side are the ones that change.
        changed_pairs = sorted(new_pairs ^ current_pairs)

        product_labels = {}
        for row in conn.execute(
            f"""
            SELECT p.id, p.name, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            """
        ):
            product_labels[row["id"]] = f"{row['vendor_name']} :: {row['name']}"

        category_labels = {}
        for row in conn.execute(f"SELECT id, name FROM {tables['categories']}"):
            category_labels[row["id"]] = row["name"]

        added = []
        removed = []
        for pid, cid in changed_pairs:
            entry = {
                "product_id": pid,
                "category_id": cid,
                "product": product_labels.get(pid, str(pid)),
                "category": category_labels.get(cid, str(cid)),
            }
            bucket = added if (pid, cid) in new_pairs else removed
            bucket.append(entry)
        return {"pairs": pairs, "added": added, "removed": removed}

    return None
|
|
|
|
|
|
def apply_change(conn, tables: dict[str, str], action: str, payload: dict) -> None:
    """Execute the SQL for one change against the tables of a scope.

    Args:
        conn: Database connection; nothing is committed here.
        tables: Mapping from logical table names to actual table names.
        action: Operation name; unrecognised names do nothing.
        payload: Change data of the shape used for that action.

    Raises:
        KeyError: If the payload lacks a key the action requires.
    """
    if action == "add_vendor":
        conn.execute(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (payload["name"],))
        return

    if action == "add_category":
        conn.execute(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (payload["name"],))
        return

    if action == "add_product":
        insert_args = (payload["vendor_id"], payload["name"], payload.get("url") or None)
        conn.execute(
            f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)",
            insert_args,
        )
        # The insert is skipped on conflict, so a supplied URL is written
        # with a separate UPDATE keyed by vendor and name.
        if payload.get("url"):
            update_args = (payload["url"], payload["vendor_id"], payload["name"])
            conn.execute(
                f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?",
                update_args,
            )
        return

    if action == "update_product":
        product_args = (payload["name"], payload.get("url") or None, payload["product_id"])
        conn.execute(
            f"UPDATE {tables['products']} SET name = ?, url = ? WHERE id = ?",
            product_args,
        )
        return

    if action == "update_vendor":
        vendor_args = (
            payload["name"],
            payload.get("website") or None,
            payload.get("mont_page") or None,
            payload["vendor_id"],
        )
        conn.execute(
            f"UPDATE {tables['vendors']} SET name = ?, website = ?, mont_page = ? WHERE id = ?",
            vendor_args,
        )
        return

    if action == "delete_vendor":
        conn.execute(f"DELETE FROM {tables['vendors']} WHERE id = ?", (payload["vendor_id"],))
        return

    if action == "delete_category":
        conn.execute(f"DELETE FROM {tables['categories']} WHERE id = ?", (payload["category_id"],))
        return

    if action == "delete_product":
        conn.execute(f"DELETE FROM {tables['products']} WHERE id = ?", (payload["product_id"],))
        return

    if action == "save_matrix":
        # The whole link table is replaced by the pairs stored in the payload.
        conn.execute(f"DELETE FROM {tables['product_categories']}")
        conn.executemany(
            f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)",
            list(map(tuple, payload.get("pairs", []))),
        )
        rebuild_vendor_categories(conn, tables)
|
|
|
|
|
|
def queue_change(conn, scope: str, action: str, payload: dict) -> int:
    """Insert a row into ``pending_changes`` and return its id.

    The payload is stored as JSON text (non-ASCII characters kept as-is).
    The author is the session's ``admin_login``, falling back to
    ``ADMIN_LOGIN``. The caller is responsible for committing.
    """
    serialized = json.dumps(payload, ensure_ascii=False)
    author = session.get("admin_login") or ADMIN_LOGIN
    stamped = local_now()
    cursor = conn.execute(
        """
        INSERT INTO pending_changes(scope, action, payload, created_by, created_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        (scope, action, serialized, author, stamped),
    )
    return int(cursor.lastrowid)
|
|
|
|
|
|
def describe_change(action: str, payload: dict) -> str:
    """Render a human-readable, possibly multi-line description of a change.

    Each known action has its own layout. For ``save_matrix`` at most 80
    added and 80 removed links are listed, followed by a count of the rest.
    Unknown actions are rendered as indented JSON of the payload.
    """
    field = payload.get

    if action == "add_vendor":
        return f"Добавить вендора: {field('name')}"

    if action == "add_category":
        return f"Добавить категорию: {field('name')}"

    if action == "add_product":
        rows = [
            "Добавить продукт",
            f"Вендор: {field('vendor_name') or field('vendor_id')}",
            f"Название: {field('name')}",
        ]
        if field("url"):
            rows.append(f"URL: {field('url')}")
        return "\n".join(rows)

    if action == "update_product":
        rows = [
            "Изменить продукт",
            f"Вендор: {field('vendor_name') or '-'}",
            f"Было: {field('old_name') or '-'}",
            f"Стало: {field('name')}",
            f"URL был: {field('old_url') or '-'}",
            f"URL станет: {field('url') or '-'}",
        ]
        return "\n".join(rows)

    if action == "update_vendor":
        rows = [f"Изменить вендора: {field('old_name') or field('vendor_id')} → {field('name')}"]
        if field("website"):
            rows.append(f"Сайт: {payload['website']}")
        if field("mont_page"):
            rows.append(f"MONT: {payload['mont_page']}")
        return "\n".join(rows)

    if action == "delete_vendor":
        return f"Удалить вендора: {field('vendor_name') or field('vendor_id')}"

    if action == "delete_category":
        return f"Удалить категорию: {field('category_name') or field('category_id')}"

    if action == "delete_product":
        product_ref = field("product_name") or field("product_id")
        vendor_ref = field("vendor_name") or "-"
        return f"Удалить продукт: {vendor_ref} :: {product_ref}"

    if action == "save_matrix":
        added = field("added", [])
        removed = field("removed", [])
        report = [
            f"Изменить матрицу: связей выбрано {len(field('pairs', []))}",
            f"Добавлено связей: {len(added)}",
            f"Снято связей: {len(removed)}",
        ]
        sections = (("Добавить:", "+", added), ("Снять:", "-", removed))
        for heading, sign, entries in sections:
            if not entries:
                continue
            report.append("")
            report.append(heading)
            for entry in entries[:80]:
                report.append(f"{sign} {entry.get('product')} -> {entry.get('category')}")
            overflow = len(entries) - 80
            if overflow > 0:
                report.append(f"... еще {overflow}")
        return "\n".join(report)

    return json.dumps(payload, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def change_title(action: str) -> str:
    """Return the display title for an action; unknown actions are returned unchanged."""
    titles = {
        "add_vendor": "Добавление вендора",
        "add_category": "Добавление категории",
        "add_product": "Добавление продукта",
        "update_vendor": "Изменение вендора",
        "update_product": "Изменение продукта",
        "delete_vendor": "Удаление вендора",
        "delete_category": "Удаление категории",
        "delete_product": "Удаление продукта",
        "save_matrix": "Изменение матрицы",
    }
    if action in titles:
        return titles[action]
    return action
|
|
|
|
|
|
def notify_pending_change(change_id: int, scope: str, action: str, payload: dict) -> bool:
    """Send a Telegram message announcing a change that awaits approval.

    Nothing is sent (and ``False`` is returned) when the bot token or chat
    id is not configured. Delivery is best-effort: any exception raised
    while sending yields ``False`` instead of propagating.

    Returns:
        ``True`` only when the HTTP response status is in the 2xx range.
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        return False

    panel_link = request.url_root.rstrip("/") + f"{ADMIN_PATH}?scope={scope}"

    summary = describe_change(action, payload)
    if len(summary) > 3000:
        # Long descriptions are cut before escaping and sending.
        summary = summary[:3000] + "\n... сокращено, полный список в админке"
    safe_summary = escape(summary)

    message_lines = [
        "<b>Zkart: нужно подтверждение</b>",
        "",
        f"<b>Заявка:</b> #{change_id}",
        f"<b>Тип:</b> {escape(change_title(action))}",
        f"<b>Раздел:</b> {escape(scope_label(scope))}",
        f"<b>Админ:</b> {escape(session.get('admin_login') or ADMIN_LOGIN)}",
        "",
        f"<b>Изменение:</b>\n<pre>{safe_summary}</pre>",
        "",
        f"<a href=\"{escape(panel_link)}\">Открыть админку</a>",
    ]
    message = "\n".join(message_lines)

    form_fields = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": message,
        "parse_mode": "HTML",
        "disable_web_page_preview": "1",
    }
    body = urlencode(form_fields).encode("utf-8")

    outgoing = Request(
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urlopen(outgoing, timeout=5) as reply:
            delivered = 200 <= reply.status < 300
    except Exception:
        return False
    return delivered
|
|
|
|
|
|
def approve_pending_change(conn, row, reviewed_by: str) -> bool:
    """Apply one pending change and mark it approved, inside a savepoint.

    Args:
        conn: Database connection; the caller commits.
        row: ``pending_changes`` row with ``id``, ``scope``, ``action`` and
            ``payload`` (JSON text).
        reviewed_by: Login recorded as the reviewer.

    Returns:
        ``True`` when the change was applied and marked approved; ``False``
        when applying failed, in which case the work done since the
        savepoint is rolled back.
    """
    marker = f"approve_change_{int(row['id'])}"
    conn.execute(f"SAVEPOINT {marker}")
    try:
        target_tables = scope_tables(row["scope"])
        requested_action = row["action"]
        decoded_payload = json.loads(row["payload"])
        apply_change(conn, target_tables, requested_action, decoded_payload)
        conn.execute(
            """
            UPDATE pending_changes
            SET status = 'approved', reviewed_by = ?, reviewed_at = ?
            WHERE id = ?
            """,
            (reviewed_by, local_now(), row["id"]),
        )
        conn.execute(f"RELEASE SAVEPOINT {marker}")
    except (KeyError, TypeError, ValueError, sqlite3.DatabaseError):
        # Undo the partial work, then drop the savepoint itself.
        conn.execute(f"ROLLBACK TO SAVEPOINT {marker}")
        conn.execute(f"RELEASE SAVEPOINT {marker}")
        return False
    else:
        return True
|
|
|
|
|
|
def prepare_pending_change(change: dict) -> dict:
    """Decode a pending-change record and add display fields to it.

    The dict is modified in place and also returned: ``payload`` is replaced
    by the decoded JSON, and ``title``, ``scope_label`` and ``description``
    are added.
    """
    decoded = json.loads(change["payload"])
    change["payload"] = decoded

    action_name = change["action"]
    change["title"] = change_title(action_name)
    change["scope_label"] = scope_label(change["scope"])
    change["description"] = describe_change(action_name, decoded)
    return change
|
|
|
|
|
|
def fetch_admin_matrix(conn, tables: dict[str, str]) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
    """Load the data shown on the admin matrix page.

    Returns:
        ``(vendors, categories, products, links)`` where the first three are
        lists of row dicts sorted case-insensitively by name (products by
        vendor name, then product name) and ``links`` is the set of
        ``(product_id, category_id)`` pairs. NULL ``website``/``mont_page``
        values of vendors come back as empty strings.
    """

    def rows_as_dicts(sql: str) -> list[dict]:
        # Rows are converted with dict(), so the connection must yield mapping-like rows.
        return [dict(record) for record in conn.execute(sql)]

    vendors = rows_as_dicts(
        f"SELECT id, name, COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page "
        f"FROM {tables['vendors']} ORDER BY lower(name)"
    )
    categories = rows_as_dicts(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")
    products = rows_as_dicts(
        f"""
        SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name
        , p.url
        FROM {tables['products']} p
        JOIN {tables['vendors']} v ON v.id = p.vendor_id
        ORDER BY lower(v.name), lower(p.name)
        """
    )

    links = set()
    for record in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}"):
        links.add((record["product_id"], record["category_id"]))

    return vendors, categories, products, links
|
|
|
|
|
|
def apply_pending_overlay(
    vendors: list[dict],
    categories: list[dict],
    products: list[dict],
    links: set[tuple[int, int]],
    pending_changes: list[dict],
) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
    """Return a preview of the matrix data with pending changes applied.

    The inputs are copied first, so the caller's lists, dicts and set are
    not modified. Changes are replayed in ascending ``id`` order; changes
    whose scope is neither ``infra`` nor ``ib`` are skipped.

    Args:
        vendors: Vendor row dicts (``id``, ``name``, ...).
        categories: Category row dicts (``id``, ``name``).
        products: Product row dicts (``id``, ``name``, ``vendor_id``,
            ``vendor_name``, ``url``).
        links: ``(product_id, category_id)`` pairs.
        pending_changes: Change dicts with ``id``, ``scope``, ``action`` and
            an already-decoded ``payload`` dict.

    Returns:
        ``(vendors, categories, products, links)`` after the overlay, with
        the three lists re-sorted case-insensitively by name. Items created
        by a pending "add" get the negated change id as a placeholder id,
        and added or edited items carry ``pending=True``.
    """
    vendors = [dict(item) for item in vendors]
    categories = [dict(item) for item in categories]
    products = [dict(item) for item in products]
    links = set(links)

    for change in sorted(pending_changes, key=lambda item: item["id"]):
        if change["scope"] not in {"infra", "ib"}:
            continue
        action = change["action"]
        payload = change["payload"]

        if action == "add_vendor":
            vendors.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True})
        elif action == "add_category":
            categories.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True})
        elif action == "add_product":
            products.append(
                {
                    "id": -change["id"],
                    "name": payload.get("name", ""),
                    "vendor_id": payload.get("vendor_id"),
                    "vendor_name": payload.get("vendor_name", ""),
                    "url": payload.get("url") or None,
                    "pending": True,
                }
            )
        elif action == "update_product":
            for product in products:
                if product["id"] == payload.get("product_id"):
                    product["name"] = payload.get("name", product["name"])
                    product["url"] = payload.get("url") or None
                    product["pending"] = True
                    break
        elif action == "update_vendor":
            vendor_id = payload.get("vendor_id")
            for vendor in vendors:
                if vendor["id"] == vendor_id:
                    vendor["name"] = payload.get("name", vendor["name"])
                    vendor["website"] = payload.get("website") or ""
                    vendor["mont_page"] = payload.get("mont_page") or ""
                    vendor["pending"] = True
                    # Products carry a denormalised vendor_name; keep it in
                    # step with the rename so the preview (and its sort
                    # order) shows the new vendor name on the products too.
                    for product in products:
                        if product.get("vendor_id") == vendor_id:
                            product["vendor_name"] = vendor["name"]
                    break
        elif action == "delete_vendor":
            vendor_id = payload.get("vendor_id")
            vendors = [item for item in vendors if item["id"] != vendor_id]
            products = [item for item in products if item.get("vendor_id") != vendor_id]
        elif action == "delete_category":
            category_id = payload.get("category_id")
            categories = [item for item in categories if item["id"] != category_id]
            links = {pair for pair in links if pair[1] != category_id}
        elif action == "delete_product":
            product_id = payload.get("product_id")
            products = [item for item in products if item["id"] != product_id]
            links = {pair for pair in links if pair[0] != product_id}
        elif action == "save_matrix":
            links = {tuple(pair) for pair in payload.get("pairs", [])}

    def lowered(value) -> str:
        # A missing or None name sorts as an empty string instead of raising.
        return (value or "").lower()

    vendors.sort(key=lambda item: lowered(item.get("name")))
    categories.sort(key=lambda item: lowered(item.get("name")))
    products.sort(key=lambda item: (lowered(item.get("vendor_name")), lowered(item.get("name"))))
    return vendors, categories, products, links
|
|
|
|
|
|
@bp.get("/")
def index():
    """Render the ``index.html`` template for the site root."""
    template_name = "index.html"
    return render_template(template_name)
|
|
|
|
|
|
@bp.get("/api/data")
def api_data():
    """Return the data of one scope as JSON.

    The ``scope`` query parameter selects the scope: ``ib``, ``sec`` and
    ``security`` (case-insensitive, whitespace ignored) mean ``ib``;
    anything else, including a missing value, means ``infra``.
    """
    requested = (request.args.get("scope") or "infra").strip().lower()
    scope = "ib" if requested in {"ib", "sec", "security"} else "infra"
    return jsonify(fetch_scope_data(scope))
|
|
|
|
|
|
@bp.get("/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj")
def legacy_admin_redirect():
    """Redirect this URL to the admin page, carrying over the normalised scope."""
    requested = (request.args.get("scope") or "infra").strip().lower()
    if requested in {"ib", "sec", "security"}:
        scope = "ib"
    else:
        scope = "infra"
    return redirect_admin(scope)
|
|
|
|
|
|
@bp.route(ADMIN_PATH, methods=["GET", "POST"])
def admin_login_or_panel():
    """Serve the admin login form, handle admin actions, and render the panel.

    Flow:
      1. A POST with ``action=login`` from a non-authenticated session is
         checked against ``admin_users``; on success the session is filled
         in and the client is redirected to the admin page.
      2. Non-authenticated requests get the login template.
      3. Requests for a scope the session may not access are redirected to
         one of the allowed scopes.
      4. POST requests dispatch on ``action``: logout, bulk approval, admin
         management, single approve/reject, or a data change. Data changes
         are applied directly for super admins and queued (with a
         notification attempt) for everyone else. Every POST ends in a
         redirect.
      5. GET requests render ``admin.html``. Non-super admins see the data
         with their own pending changes for this scope overlaid.

    The connection obtained from ``get_db()`` is closed on every return
    path below.
    """
    conn = get_db()
    raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower()
    scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra"
    tables = scope_tables(scope)

    if request.method == "POST" and not require_admin() and request.form.get("action") == "login":
        username = request.form.get("username") or ""
        password = request.form.get("password") or ""
        admin_user = conn.execute(
            "SELECT username, password, role, access_scopes FROM admin_users WHERE username = ?",
            (username,),
        ).fetchone()
        stored_password = admin_user["password"] if admin_user else None
        # Constant-time comparison so response timing does not reveal how
        # much of the password matched. Both sides are encoded because
        # compare_digest() rejects non-ASCII str arguments. A non-string
        # stored value never matches, same as the previous `==` check.
        password_ok = isinstance(stored_password, str) and secrets.compare_digest(
            password.encode("utf-8"), stored_password.encode("utf-8")
        )
        if admin_user and password_ok:
            session["is_admin"] = True
            session["admin_role"] = admin_user["role"]
            session["admin_login"] = admin_user["username"]
            session["admin_scopes"] = admin_user["access_scopes"]
            conn.close()
            return redirect(ADMIN_PATH)
        conn.close()
        return render_template("login.html", error="Неверный логин или пароль")

    if not require_admin():
        conn.close()
        return render_template("login.html", error=None)

    if not can_access_scope(scope):
        conn.close()
        return redirect_admin(next(iter(allowed_scopes())))

    if request.method == "POST":
        action = request.form.get("action", "")
        if action == "logout":
            session.pop("is_admin", None)
            session.pop("admin_role", None)
            session.pop("admin_login", None)
            session.pop("admin_scopes", None)
            conn.close()
            return redirect(ADMIN_PATH)

        if not can_access_scope(scope):
            flash("Нет доступа к этому разделу.", "error")
            conn.close()
            return redirect_admin(next(iter(allowed_scopes())))

        if action == "approve_all_changes":
            if not is_super_admin():
                flash("Недостаточно прав для подтверждения изменений.", "error")
                conn.close()
                return redirect_admin(scope)
            rows = conn.execute(
                "SELECT * FROM pending_changes WHERE status = 'pending' ORDER BY id"
            ).fetchall()
            reviewed_by = session.get("admin_login") or SUPER_ADMIN_LOGIN
            approved = 0
            failed = 0
            # Each change runs in its own savepoint, so one failure does not
            # undo the changes approved before it.
            for row in rows:
                if approve_pending_change(conn, row, reviewed_by):
                    approved += 1
                else:
                    failed += 1
            conn.commit()
            conn.close()
            if failed:
                flash(f"Утверждено заявок: {approved}. Не удалось применить: {failed}.", "error")
            else:
                flash(f"Все заявки утверждены: {approved}.", "ok")
            return redirect_admin(scope)

        if action == "create_admin":
            if not is_super_admin():
                flash("Недостаточно прав для управления админами.", "error")
                conn.close()
                return redirect_admin(scope)
            username = (request.form.get("username") or "").strip()
            access = parse_admin_scopes()
            if not username or not access:
                flash("Укажите логин и минимум один раздел доступа.", "error")
                conn.close()
                return redirect_admin(scope)
            password = make_admin_password()
            try:
                conn.execute(
                    """
                    INSERT INTO admin_users(username, password, role, access_scopes, created_at)
                    VALUES (?, ?, 'admin', ?, ?)
                    """,
                    (username, password, access, local_now()),
                )
                conn.commit()
                # NOTE(review): the generated password is placed in the
                # session and in a flash message so it can be shown once on
                # the next page render.
                session["created_admin_credentials"] = {
                    "username": username,
                    "password": password,
                    "access": access,
                }
                flash(f"Админ создан. Логин: {username}. Пароль: {password}", "ok")
            except sqlite3.IntegrityError:
                conn.rollback()
                session.pop("created_admin_credentials", None)
                flash("Админ с таким логином уже существует.", "error")
            conn.close()
            return redirect_admin(scope)

        if action == "delete_admin":
            if not is_super_admin():
                flash("Недостаточно прав для управления админами.", "error")
                conn.close()
                return redirect_admin(scope)
            admin_id = parse_int(request.form.get("admin_id"))
            if admin_id:
                target = conn.execute("SELECT role, username FROM admin_users WHERE id = ?", (admin_id,)).fetchone()
                if target and target["role"] == "super":
                    flash("Супер-админа удалить нельзя.", "error")
                elif target and target["username"] == session.get("admin_login"):
                    flash("Нельзя удалить текущего пользователя.", "error")
                else:
                    conn.execute("DELETE FROM admin_users WHERE id = ? AND role <> 'super'", (admin_id,))
                    conn.commit()
                    flash("Админ удален.", "ok")
            conn.close()
            return redirect_admin(scope)

        if action in {"approve_change", "reject_change"}:
            if not is_super_admin():
                flash("Недостаточно прав для подтверждения изменений.", "error")
                conn.close()
                return redirect_admin(scope)
            change_id = parse_int(request.form.get("change_id"))
            row = None
            if change_id:
                row = conn.execute(
                    "SELECT * FROM pending_changes WHERE id = ? AND status = 'pending'",
                    (change_id,),
                ).fetchone()
            if not row:
                flash("Заявка не найдена или уже обработана.", "error")
                conn.close()
                return redirect_admin(scope)
            if action == "reject_change":
                conn.execute(
                    """
                    UPDATE pending_changes
                    SET status = 'rejected', reviewed_by = ?, reviewed_at = ?
                    WHERE id = ?
                    """,
                    (session.get("admin_login") or SUPER_ADMIN_LOGIN, local_now(), change_id),
                )
                flash("Заявка отклонена.", "ok")
            else:
                if approve_pending_change(conn, row, session.get("admin_login") or SUPER_ADMIN_LOGIN):
                    flash("Заявка утверждена и применена.", "ok")
                else:
                    # The failed change was already rolled back to its
                    # savepoint; close without committing.
                    flash("Не удалось применить заявку. Проверьте, не изменились ли связанные записи.", "error")
                    conn.close()
                    return redirect_admin(scope)
            conn.commit()
            conn.close()
            return redirect_admin(scope)

        # Anything else is treated as a data change for the current scope.
        payload = collect_change_payload(action, scope, conn, tables)
        if payload is not None:
            if is_super_admin():
                try:
                    apply_change(conn, tables, action, payload)
                    flash("Изменения сохранены.", "ok")
                except sqlite3.IntegrityError:
                    conn.rollback()
                    flash("Не удалось сохранить: возможно, такое название уже есть у выбранного вендора.", "error")
                    conn.close()
                    return redirect_admin(scope)
            else:
                change_id = queue_change(conn, scope, action, payload)
                # The notification result is ignored; the change is queued
                # whether or not the message was delivered.
                notify_pending_change(change_id, scope, action, payload)
                flash("Изменения отправлены супер админу. Они вступят в силу после утверждения.", "ok")

        conn.commit()
        conn.close()
        return redirect_admin(scope)

    vendors, categories, products, links = fetch_admin_matrix(conn, tables)
    if is_super_admin():
        # Super admins see every pending change.
        pending_rows = conn.execute(
            """
            SELECT id, scope, action, payload, created_by, created_at
            FROM pending_changes
            WHERE status = 'pending'
            ORDER BY id DESC
            """
        )
    else:
        # Other admins see only the changes they created themselves.
        pending_rows = conn.execute(
            """
            SELECT id, scope, action, payload, created_by, created_at
            FROM pending_changes
            WHERE status = 'pending' AND created_by = ?
            ORDER BY id DESC
            """,
            (session.get("admin_login") or ADMIN_LOGIN,),
        )
    pending_changes = [prepare_pending_change(dict(r)) for r in pending_rows]
    if not is_super_admin():
        scoped_pending = [change for change in pending_changes if change["scope"] == scope]
        vendors, categories, products, links = apply_pending_overlay(vendors, categories, products, links, scoped_pending)
    admin_users = [
        dict(r)
        for r in conn.execute(
            """
            SELECT id, username, role, access_scopes, created_at
            FROM admin_users
            ORDER BY role DESC, lower(username)
            """
        )
    ]
    # One-shot: the freshly created credentials are removed from the session
    # as soon as they are handed to the template.
    created_admin_credentials = session.pop("created_admin_credentials", None)
    conn.close()
    return render_template(
        "admin.html",
        vendors=vendors,
        categories=categories,
        products=products,
        links=links,
        scope=scope,
        is_super_admin=is_super_admin(),
        allowed_scopes=allowed_scopes(),
        pending_changes=pending_changes,
        admin_users=admin_users,
        created_admin_credentials=created_admin_credentials,
    )
|
|
|
|
|
|
@bp.get("/health")
def health():
    """Return a fixed ``{"status": "ok"}`` body."""
    body = {"status": "ok"}
    return body
|
|
|
|
|
|
@bp.get("/assets/mont-logo")
def mont_logo():
    """Serve ``mont_logo.png`` from ``BASE_DIR``."""
    logo_filename = "mont_logo.png"
    return send_from_directory(BASE_DIR, logo_filename)
|