Files
mont_vendor_maps/zkart_app/routes.py
T

970 lines
38 KiB
Python

from __future__ import annotations
import json
import secrets
import sqlite3
from datetime import datetime
from html import escape
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from zoneinfo import ZoneInfo
from flask import Blueprint, flash, jsonify, redirect, render_template, request, send_from_directory, session
from .config import (
ADMIN_LOGIN,
ADMIN_PASSWORD,
ADMIN_PATH,
BASE_DIR,
SUPER_ADMIN_LOGIN,
SUPER_ADMIN_PASSWORD,
TELEGRAM_BOT_TOKEN,
TELEGRAM_CHAT_ID,
)
from .db import fetch_scope_data, get_db, scope_tables
# Blueprint on which every route in this module is registered.
bp = Blueprint("main", __name__)
# Timezone used by local_now() when formatting timestamps.
MOSCOW_TZ = ZoneInfo("Europe/Moscow")
def require_admin() -> bool:
    """Report whether the current session carries a truthy ``is_admin`` flag."""
    admin_flag = session.get("is_admin")
    return True if admin_flag else False
def is_super_admin() -> bool:
    """Return True when the role stored in the session is ``"super"``."""
    role = session.get("admin_role")
    return role == "super"
def allowed_scopes() -> set[str]:
    """Return the scope names the current session is allowed to manage.

    Super admins always get both ``infra`` and ``ib``. For everyone else the
    comma-separated ``admin_scopes`` session value is parsed; a missing or
    empty value is treated as ``"infra,ib"``, and a value that yields no
    names at all (for example only commas) falls back to ``{"infra"}``.
    """
    if is_super_admin():
        return {"infra", "ib"}
    stored = session.get("admin_scopes") or "infra,ib"
    granted = set()
    for chunk in stored.split(","):
        name = chunk.strip()
        if name:
            granted.add(name)
    if granted:
        return granted
    return {"infra"}
def can_access_scope(scope: str) -> bool:
    """Return True when ``scope`` is among the scopes from ``allowed_scopes()``."""
    permitted = allowed_scopes()
    return scope in permitted
def parse_int(form_value: str | None) -> int | None:
    """Convert a form field to ``int``.

    Returns ``None`` when the value is missing, empty, or not a valid
    integer literal.
    """
    if form_value:
        try:
            return int(form_value)
        except ValueError:
            pass
    return None
def redirect_admin(scope: str):
    """Return a redirect response to the admin URL with ``scope`` as query parameter."""
    target = f"{ADMIN_PATH}?scope={scope}"
    return redirect(target)
def make_admin_password() -> str:
    """Generate a random URL-safe password built from 9 random bytes."""
    entropy_bytes = 9
    return secrets.token_urlsafe(entropy_bytes)
def parse_admin_scopes() -> str:
    """Build the comma-separated scope list from the submitted form.

    ``allow_infra`` and ``allow_ib`` are read from ``request.form``; each
    truthy field contributes its scope name, ``infra`` before ``ib``.
    """
    field_to_scope = (("allow_infra", "infra"), ("allow_ib", "ib"))
    chosen = [scope for field, scope in field_to_scope if request.form.get(field)]
    return ",".join(chosen)
def local_now() -> str:
    """Return the current time in ``MOSCOW_TZ`` as ``YYYY-MM-DD HH:MM:SS``."""
    moment = datetime.now(MOSCOW_TZ)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def scope_label(scope: str) -> str:
    """Return the Russian display label: one for ``"ib"``, another for anything else."""
    if scope == "ib":
        return "ИБ"
    return "Инфраструктура"
def rebuild_vendor_categories(conn, tables: dict[str, str]) -> None:
    """Recompute the vendor-to-category link table from product links.

    Empties ``tables['vendor_categories']`` and refills it with every distinct
    (vendor_id, category_id) pair reachable through the products table and
    the product-category link table. The caller is responsible for committing.
    """
    link_table = tables["vendor_categories"]
    conn.execute(f"DELETE FROM {link_table}")
    products_table = tables["products"]
    product_links_table = tables["product_categories"]
    refill_sql = (
        f"INSERT OR IGNORE INTO {link_table}(vendor_id, category_id)\n"
        "SELECT DISTINCT p.vendor_id, pc.category_id\n"
        f"FROM {products_table} p\n"
        f"JOIN {product_links_table} pc ON pc.product_id = p.id\n"
    )
    conn.execute(refill_sql)
def collect_change_payload(action: str, scope: str, conn, tables: dict[str, str]) -> dict | None:
    """Turn the submitted admin form into a payload dict for ``action``.

    Fields are read from ``request.form``. For most actions the current names
    of the affected rows are looked up through ``conn`` and stored in the
    payload alongside the submitted values.

    Args:
        action: Name of the requested change (``add_vendor``, ``save_matrix``, ...).
        scope: Accepted but not used inside this function.
        conn: Database connection whose rows support access by column name.
        tables: Mapping from logical table names to the actual table names.

    Returns:
        The payload dict, or ``None`` when a required field is missing or the
        action is not recognised.
    """
    form = request.form

    def text_field(key: str) -> str:
        # Missing fields and whitespace-only values both collapse to "".
        return (form.get(key) or "").strip()

    if action in ("add_vendor", "add_category"):
        title = text_field("name")
        return {"name": title} if title else None

    if action == "add_product":
        vendor_id = parse_int(form.get("vendor_id"))
        title = text_field("name")
        link = text_field("url")
        if not (vendor_id and title):
            return None
        vendor_row = conn.execute(
            f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {
            "vendor_id": vendor_id,
            "vendor_name": vendor_row["name"] if vendor_row else "",
            "name": title,
            "url": link,
        }

    if action == "update_product":
        product_id = parse_int(form.get("product_id"))
        title = text_field("name")
        link = text_field("url")
        if not (product_id and title):
            return None
        lookup_sql = f"""
            SELECT p.name, p.url, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            WHERE p.id = ?
            """
        current = conn.execute(lookup_sql, (product_id,)).fetchone()
        # A product that no longer exists still yields a payload, with blanks
        # for the "old" values.
        return {
            "product_id": product_id,
            "vendor_name": current["vendor_name"] if current else "",
            "old_name": current["name"] if current else "",
            "old_url": current["url"] if current else "",
            "name": title,
            "url": link,
        }

    if action == "update_vendor":
        vendor_id = parse_int(form.get("vendor_id"))
        title = text_field("name")
        website = text_field("website")
        mont_page = text_field("mont_page")
        if not (vendor_id and title):
            return None
        vendor_row = conn.execute(
            f"SELECT name, website, mont_page FROM {tables['vendors']} WHERE id = ?",
            (vendor_id,),
        ).fetchone()
        return {
            "vendor_id": vendor_id,
            "old_name": vendor_row["name"] if vendor_row else "",
            "name": title,
            "website": website,
            "mont_page": mont_page,
        }

    if action == "delete_vendor":
        vendor_id = parse_int(form.get("vendor_id"))
        if not vendor_id:
            return None
        vendor_row = conn.execute(
            f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {"vendor_id": vendor_id, "vendor_name": vendor_row["name"] if vendor_row else ""}

    if action == "delete_category":
        category_id = parse_int(form.get("category_id"))
        if not category_id:
            return None
        category_row = conn.execute(
            f"SELECT name FROM {tables['categories']} WHERE id = ?", (category_id,)
        ).fetchone()
        return {
            "category_id": category_id,
            "category_name": category_row["name"] if category_row else "",
        }

    if action == "delete_product":
        product_id = parse_int(form.get("product_id"))
        if not product_id:
            return None
        lookup_sql = f"""
            SELECT p.name, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            WHERE p.id = ?
            """
        current = conn.execute(lookup_sql, (product_id,)).fetchone()
        return {
            "product_id": product_id,
            "product_name": current["name"] if current else "",
            "vendor_name": current["vendor_name"] if current else "",
        }

    if action == "save_matrix":
        product_ids = [row["id"] for row in conn.execute(f"SELECT id FROM {tables['products']}")]
        category_ids = [row["id"] for row in conn.execute(f"SELECT id FROM {tables['categories']}")]
        # A checked box arrives as the form field "pc_<product>_<category>".
        pairs: list[list[int]] = [
            [pid, cid]
            for pid in product_ids
            for cid in category_ids
            if form.get(f"pc_{pid}_{cid}")
        ]
        existing = {
            (row["product_id"], row["category_id"])
            for row in conn.execute(
                f"SELECT product_id, category_id FROM {tables['product_categories']}"
            )
        }
        requested = {tuple(pair) for pair in pairs}
        product_labels = {}
        label_sql = f"""
            SELECT p.id, p.name, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            """
        for row in conn.execute(label_sql):
            product_labels[row["id"]] = f"{row['vendor_name']} :: {row['name']}"
        category_labels = {}
        for row in conn.execute(f"SELECT id, name FROM {tables['categories']}"):
            category_labels[row["id"]] = row["name"]
        added = []
        removed = []
        # The symmetric difference is exactly the set of links whose state flips.
        for pid, cid in sorted(requested ^ existing):
            entry = {
                "product_id": pid,
                "category_id": cid,
                "product": product_labels.get(pid, str(pid)),
                "category": category_labels.get(cid, str(cid)),
            }
            bucket = added if (pid, cid) in requested else removed
            bucket.append(entry)
        return {"pairs": pairs, "added": added, "removed": removed}

    return None
def apply_change(conn, tables: dict[str, str], action: str, payload: dict) -> None:
    """Execute the SQL for one catalogue change, then rebuild vendor-category links.

    Args:
        conn: Open database connection; this function does not commit.
        tables: Mapping from logical table names to the actual table names.
        action: Name of the change, as produced for ``collect_change_payload``.
        payload: Data for the change. Required keys are read with ``[]`` and
            raise ``KeyError`` when absent.

    An unrecognised ``action`` runs no statement of its own, but
    ``rebuild_vendor_categories`` is still called afterwards.
    """
    if action == "add_vendor":
        sql = f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)"
        conn.execute(sql, (payload["name"],))
    elif action == "add_category":
        sql = f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)"
        conn.execute(sql, (payload["name"],))
    elif action == "add_product":
        insert_sql = (
            f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)"
        )
        conn.execute(
            insert_sql,
            (payload["vendor_id"], payload["name"], payload.get("url") or None),
        )
        link = payload.get("url")
        if link:
            # If the INSERT was ignored because the row already exists, this
            # still stores the submitted URL on that row.
            update_sql = (
                f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?"
            )
            conn.execute(update_sql, (link, payload["vendor_id"], payload["name"]))
    elif action == "update_product":
        sql = f"UPDATE {tables['products']} SET name = ?, url = ? WHERE id = ?"
        params = (payload["name"], payload.get("url") or None, payload["product_id"])
        conn.execute(sql, params)
    elif action == "update_vendor":
        sql = f"UPDATE {tables['vendors']} SET name = ?, website = ?, mont_page = ? WHERE id = ?"
        params = (
            payload["name"],
            payload.get("website") or None,
            payload.get("mont_page") or None,
            payload["vendor_id"],
        )
        conn.execute(sql, params)
    elif action == "delete_vendor":
        sql = f"DELETE FROM {tables['vendors']} WHERE id = ?"
        conn.execute(sql, (payload["vendor_id"],))
    elif action == "delete_category":
        sql = f"DELETE FROM {tables['categories']} WHERE id = ?"
        conn.execute(sql, (payload["category_id"],))
    elif action == "delete_product":
        sql = f"DELETE FROM {tables['products']} WHERE id = ?"
        conn.execute(sql, (payload["product_id"],))
    elif action == "save_matrix":
        # The matrix is replaced wholesale: wipe every link, insert the submitted ones.
        conn.execute(f"DELETE FROM {tables['product_categories']}")
        link_rows = [tuple(pair) for pair in payload.get("pairs", [])]
        conn.executemany(
            f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)",
            link_rows,
        )
    rebuild_vendor_categories(conn, tables)
def queue_change(conn, scope: str, action: str, payload: dict) -> int:
    """Insert a row into ``pending_changes`` and return its id.

    The payload is stored as JSON, the author is the session's
    ``admin_login`` (falling back to ``ADMIN_LOGIN``), and the creation time
    comes from ``local_now()``. The caller is responsible for committing.
    """
    serialized = json.dumps(payload, ensure_ascii=False)
    author = session.get("admin_login") or ADMIN_LOGIN
    created_at = local_now()
    insert_sql = """
        INSERT INTO pending_changes(scope, action, payload, created_by, created_at)
        VALUES (?, ?, ?, ?, ?)
        """
    cursor = conn.execute(insert_sql, (scope, action, serialized, author, created_at))
    return int(cursor.lastrowid)
def _describe_matrix_links(heading: str, marker: str, items: list, limit: int = 80) -> list[str]:
    """Format one section (added or removed links) of a matrix change summary."""
    if not items:
        return []
    section = ["", heading]
    section.extend(
        f"{marker} {item.get('product')} -> {item.get('category')}" for item in items[:limit]
    )
    if len(items) > limit:
        section.append(f"... еще {len(items) - limit}")
    return section


def describe_change(action: str, payload: dict) -> str:
    """Render a human-readable (Russian) summary of a change.

    Args:
        action: Name of the change (``add_vendor``, ``save_matrix``, ...).
        payload: Payload dict for that action; missing keys are tolerated.

    Returns:
        A single- or multi-line description. For ``save_matrix`` at most 80
        added and 80 removed links are listed, followed by a count of the
        rest. Unknown actions fall back to the payload pretty-printed as JSON.
    """
    if action == "add_vendor":
        return f"Добавить вендора: {payload.get('name')}"
    if action == "add_category":
        return f"Добавить категорию: {payload.get('name')}"
    if action == "add_product":
        parts = [
            "Добавить продукт",
            f"Вендор: {payload.get('vendor_name') or payload.get('vendor_id')}",
            f"Название: {payload.get('name')}",
        ]
        if payload.get("url"):
            parts.append(f"URL: {payload.get('url')}")
        return "\n".join(parts)
    if action == "update_product":
        return "\n".join(
            [
                "Изменить продукт",
                f"Вендор: {payload.get('vendor_name') or '-'}",
                f"Было: {payload.get('old_name') or '-'}",
                f"Стало: {payload.get('name')}",
                f"URL был: {payload.get('old_url') or '-'}",
                f"URL станет: {payload.get('url') or '-'}",
            ]
        )
    if action == "update_vendor":
        previous = payload.get("old_name") or payload.get("vendor_id")
        # Old and new name need a visible separator; without it the two
        # values run together into one unreadable token.
        parts = [f"Изменить вендора: {previous} -> {payload.get('name')}"]
        if payload.get("website"):
            parts.append(f"Сайт: {payload['website']}")
        if payload.get("mont_page"):
            parts.append(f"MONT: {payload['mont_page']}")
        return "\n".join(parts)
    if action == "delete_vendor":
        return f"Удалить вендора: {payload.get('vendor_name') or payload.get('vendor_id')}"
    if action == "delete_category":
        return f"Удалить категорию: {payload.get('category_name') or payload.get('category_id')}"
    if action == "delete_product":
        product = payload.get("product_name") or payload.get("product_id")
        vendor = payload.get("vendor_name") or "-"
        return f"Удалить продукт: {vendor} :: {product}"
    if action == "save_matrix":
        added = payload.get("added", [])
        removed = payload.get("removed", [])
        lines = [
            f"Изменить матрицу: связей выбрано {len(payload.get('pairs', []))}",
            f"Добавлено связей: {len(added)}",
            f"Снято связей: {len(removed)}",
        ]
        lines.extend(_describe_matrix_links("Добавить:", "+", added))
        lines.extend(_describe_matrix_links("Снять:", "-", removed))
        return "\n".join(lines)
    return json.dumps(payload, ensure_ascii=False, indent=2)
def change_title(action: str) -> str:
    """Return the Russian title for ``action``; unknown actions are returned unchanged."""
    titles = dict(
        add_vendor="Добавление вендора",
        add_category="Добавление категории",
        add_product="Добавление продукта",
        update_vendor="Изменение вендора",
        update_product="Изменение продукта",
        delete_vendor="Удаление вендора",
        delete_category="Удаление категории",
        delete_product="Удаление продукта",
        save_matrix="Изменение матрицы",
    )
    try:
        return titles[action]
    except KeyError:
        return action
def notify_pending_change(change_id: int, scope: str, action: str, payload: dict) -> bool:
    """Send a Telegram message announcing a change that awaits approval.

    Best-effort: returns ``False`` when the bot token or chat id is not
    configured, or when the HTTP call fails for any reason; returns ``True``
    only for a 2xx response.
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        return False

    admin_url = f"{request.url_root.rstrip('/')}{ADMIN_PATH}?scope={scope}"
    summary = describe_change(action, payload)
    summary_limit = 3000
    if len(summary) > summary_limit:
        summary = summary[:summary_limit] + "\n... сокращено, полный список в админке"
    # The message uses HTML parse mode, so every dynamic piece is escaped.
    details = escape(summary)
    author = session.get("admin_login") or ADMIN_LOGIN
    message_lines = (
        "<b>Zkart: нужно подтверждение</b>",
        "",
        f"<b>Заявка:</b> #{change_id}",
        f"<b>Тип:</b> {escape(change_title(action))}",
        f"<b>Раздел:</b> {escape(scope_label(scope))}",
        f"<b>Админ:</b> {escape(author)}",
        "",
        f"<b>Изменение:</b>\n<pre>{details}</pre>",
        "",
        f'<a href="{escape(admin_url)}">Открыть админку</a>',
    )
    form_fields = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": "\n".join(message_lines),
        "parse_mode": "HTML",
        "disable_web_page_preview": "1",
    }
    api_request = Request(
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
        data=urlencode(form_fields).encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urlopen(api_request, timeout=5) as response:
            status = response.status
            return 200 <= status < 300
    except Exception:
        # A failed notification must never break the admin request.
        return False
def approve_pending_change(conn, row, reviewed_by: str) -> bool:
    """Apply one pending change inside a savepoint and mark it approved.

    Args:
        conn: Open database connection; this function does not commit.
        row: ``pending_changes`` row providing ``id``, ``scope``, ``action``
            and the JSON ``payload``.
        reviewed_by: Login recorded as the reviewer.

    Returns:
        ``True`` when the change was applied and marked approved. ``False``
        when applying it raised ``KeyError``, ``TypeError``, ``ValueError`` or
        ``sqlite3.DatabaseError``; in that case everything done since the
        savepoint is rolled back.
    """
    savepoint_name = "approve_change_%d" % int(row["id"])
    conn.execute("SAVEPOINT " + savepoint_name)
    mark_approved_sql = """
        UPDATE pending_changes
        SET status = 'approved', reviewed_by = ?, reviewed_at = ?
        WHERE id = ?
        """
    try:
        target_tables = scope_tables(row["scope"])
        decoded_payload = json.loads(row["payload"])
        apply_change(conn, target_tables, row["action"], decoded_payload)
        conn.execute(mark_approved_sql, (reviewed_by, local_now(), row["id"]))
        conn.execute("RELEASE SAVEPOINT " + savepoint_name)
        return True
    except (KeyError, TypeError, ValueError, sqlite3.DatabaseError):
        # Undo the partial work, then drop the savepoint itself.
        conn.execute("ROLLBACK TO SAVEPOINT " + savepoint_name)
        conn.execute("RELEASE SAVEPOINT " + savepoint_name)
        return False
def prepare_pending_change(change: dict) -> dict:
    """Decode and annotate a pending-change dict in place.

    Replaces the JSON ``payload`` string with the decoded object and adds the
    ``title``, ``scope_label`` and ``description`` keys. The same dict that
    was passed in is returned.
    """
    decoded = json.loads(change["payload"])
    change["payload"] = decoded
    action = change["action"]
    change["title"] = change_title(action)
    change["scope_label"] = scope_label(change["scope"])
    change["description"] = describe_change(action, decoded)
    return change
def fetch_admin_matrix(conn, tables: dict[str, str]) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
    """Load the data behind the admin product/category matrix.

    Returns:
        ``(vendors, categories, products, links)``. Vendors and categories are
        ordered case-insensitively by name, products by vendor name and then
        product name. ``links`` is the set of (product_id, category_id) pairs.
        NULL vendor ``website`` / ``mont_page`` values come back as ``""``.
    """
    vendor_sql = (
        f"SELECT id, name, COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page "
        f"FROM {tables['vendors']} ORDER BY lower(name)"
    )
    vendors = list(map(dict, conn.execute(vendor_sql)))

    category_sql = f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)"
    categories = list(map(dict, conn.execute(category_sql)))

    product_sql = f"""
        SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name
        , p.url
        FROM {tables['products']} p
        JOIN {tables['vendors']} v ON v.id = p.vendor_id
        ORDER BY lower(v.name), lower(p.name)
        """
    products = list(map(dict, conn.execute(product_sql)))

    links = set()
    link_sql = f"SELECT product_id, category_id FROM {tables['product_categories']}"
    for row in conn.execute(link_sql):
        links.add((row["product_id"], row["category_id"]))
    return vendors, categories, products, links
def apply_pending_overlay(
    vendors: list[dict],
    categories: list[dict],
    products: list[dict],
    links: set[tuple[int, int]],
    pending_changes: list[dict],
) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
    """Return a preview of the matrix data with pending changes applied.

    The inputs are copied, never mutated. Changes are replayed in ascending
    ``id`` order; entries whose scope is neither ``infra`` nor ``ib`` are
    skipped. Rows created by a pending "add" get the negated change id as
    their id, and added or edited rows are flagged with ``pending: True``.
    The resulting lists are re-sorted case-insensitively by name.
    """
    preview_vendors = list(map(dict, vendors))
    preview_categories = list(map(dict, categories))
    preview_products = list(map(dict, products))
    preview_links = set(links)

    for entry in sorted(pending_changes, key=lambda candidate: candidate["id"]):
        if entry["scope"] not in {"infra", "ib"}:
            continue
        action, payload = entry["action"], entry["payload"]

        if action == "add_vendor":
            preview_vendors.append(
                {"id": -entry["id"], "name": payload.get("name", ""), "pending": True}
            )
        elif action == "add_category":
            preview_categories.append(
                {"id": -entry["id"], "name": payload.get("name", ""), "pending": True}
            )
        elif action == "add_product":
            preview_products.append(
                {
                    "id": -entry["id"],
                    "name": payload.get("name", ""),
                    "vendor_id": payload.get("vendor_id"),
                    "vendor_name": payload.get("vendor_name", ""),
                    "url": payload.get("url") or None,
                    "pending": True,
                }
            )
        elif action == "update_product":
            wanted = payload.get("product_id")
            hit = next((row for row in preview_products if row["id"] == wanted), None)
            if hit is not None:
                hit["name"] = payload.get("name", hit["name"])
                hit["url"] = payload.get("url") or None
                hit["pending"] = True
        elif action == "update_vendor":
            wanted = payload.get("vendor_id")
            hit = next((row for row in preview_vendors if row["id"] == wanted), None)
            if hit is not None:
                hit["name"] = payload.get("name", hit["name"])
                hit["website"] = payload.get("website") or ""
                hit["mont_page"] = payload.get("mont_page") or ""
                hit["pending"] = True
        elif action == "delete_vendor":
            doomed = payload.get("vendor_id")
            # Removing a vendor also hides its products from the preview.
            preview_vendors = [row for row in preview_vendors if row["id"] != doomed]
            preview_products = [row for row in preview_products if row.get("vendor_id") != doomed]
        elif action == "delete_category":
            doomed = payload.get("category_id")
            preview_categories = [row for row in preview_categories if row["id"] != doomed]
            preview_links = {link for link in preview_links if link[1] != doomed}
        elif action == "delete_product":
            doomed = payload.get("product_id")
            preview_products = [row for row in preview_products if row["id"] != doomed]
            preview_links = {link for link in preview_links if link[0] != doomed}
        elif action == "save_matrix":
            # A matrix save replaces the whole link set.
            preview_links = {tuple(pair) for pair in payload.get("pairs", [])}

    preview_vendors.sort(key=lambda row: row["name"].lower())
    preview_categories.sort(key=lambda row: row["name"].lower())
    preview_products.sort(
        key=lambda row: (row.get("vendor_name", "").lower(), row.get("name", "").lower())
    )
    return preview_vendors, preview_categories, preview_products, preview_links
@bp.get("/")
def index():
    """Render the public landing page.

    Vendor names and the merged, de-duplicated category names of both
    category tables are passed to the template, together with a canonical
    URL built from the ``X-Forwarded-Proto`` header (default ``https``) and
    the request host.
    """
    conn = get_db()
    try:
        ssr_vendors = [r[0] for r in conn.execute("SELECT name FROM vendors ORDER BY lower(name)")]
        ssr_categories = [r[0] for r in conn.execute(
            "SELECT name FROM (SELECT name FROM categories UNION SELECT name FROM ib_categories) ORDER BY lower(name)"
        )]
    finally:
        # Close the connection even when a query raises.
        conn.close()
    proto = request.headers.get("X-Forwarded-Proto", "https")
    canonical_url = f"{proto}://{request.host}"
    return render_template(
        "index.html",
        ssr_vendors=ssr_vendors,
        ssr_categories=ssr_categories,
        canonical_url=canonical_url,
    )
@bp.get("/robots.txt")
def robots_txt():
    """Serve robots.txt: allow everything except the admin URLs, and point to the sitemap.

    The sitemap URL uses the ``X-Forwarded-Proto`` header (default ``https``)
    and the request host, the same way ``sitemap_xml`` builds its URLs.
    """
    from flask import Response

    proto = request.headers.get("X-Forwarded-Proto", "https")
    # NOTE(review): listing ADMIN_PATH here publishes the admin URL to anyone
    # who reads robots.txt -- confirm that this is intended.
    lines = [
        "User-agent: *",
        "Allow: /",
        f"Disallow: {ADMIN_PATH}",
        "Disallow: /sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj",
        "",
        f"Sitemap: {proto}://{request.host}/sitemap.xml",
    ]
    # The host is interpolated directly instead of string-replacing a
    # "{host}" placeholder across the whole body, which would also rewrite
    # any other line that happened to contain that text.
    body = "\n".join(lines) + "\n"
    return Response(body, mimetype="text/plain")
@bp.get("/sitemap.xml")
def sitemap_xml():
    """Serve an XML sitemap with the landing page and one URL per vendor slug.

    Slugs are collected from both vendor tables (NULL slugs excluded). The
    base URL and every slug are entity-escaped before being placed in
    ``<loc>`` so characters such as ``&`` cannot produce malformed XML.
    """
    from flask import Response

    proto = request.headers.get("X-Forwarded-Proto", "https")
    base = escape(f"{proto}://{request.host}")
    conn = get_db()
    try:
        slugs = [r[0] for r in conn.execute(
            "SELECT slug FROM vendors WHERE slug IS NOT NULL "
            "UNION SELECT slug FROM ib_vendors WHERE slug IS NOT NULL"
        )]
    finally:
        # Close the connection even when the query raises.
        conn.close()
    urls = [f' <url><loc>{base}/</loc><changefreq>weekly</changefreq><priority>1.0</priority></url>']
    urls.extend(
        f' <url><loc>{base}/vendor/{escape(str(slug))}</loc><changefreq>monthly</changefreq><priority>0.7</priority></url>'
        for slug in slugs
    )
    body = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
        + '\n'.join(urls) + '\n</urlset>'
    )
    return Response(body, mimetype="application/xml")
@bp.get("/vendor/<slug>")
def vendor_page(slug: str):
    """Render the public page of the vendor identified by ``slug``.

    The slug is looked up in ``vendors`` first and in ``ib_vendors`` second;
    the table that matched decides the scope used to load the vendor's
    products and categories. Responds with 404 when neither table matches.
    """
    from flask import abort

    conn = get_db()
    try:
        vendor = None
        # Search both tables, preferring infra.
        for table, scope in (("vendors", "infra"), ("ib_vendors", "ib")):
            row = conn.execute(
                "SELECT id, name, COALESCE(logo,'') as logo, COALESCE(description,'') as description, "
                "COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page "
                f"FROM {table} WHERE slug = ?",
                (slug,),
            ).fetchone()
            if row:
                vendor = dict(row)
                vendor["scope"] = scope
                break
        if vendor is None:
            abort(404)
        tables = scope_tables(vendor["scope"])
        products = [dict(r) for r in conn.execute(
            f"SELECT name, COALESCE(url,'') as url FROM {tables['products']} "
            f"WHERE vendor_id = ? ORDER BY lower(name)", (vendor["id"],)
        )]
        categories = [r[0] for r in conn.execute(
            f"SELECT c.name FROM {tables['categories']} c "
            f"JOIN {tables['vendor_categories']} vc ON vc.category_id = c.id "
            f"WHERE vc.vendor_id = ? ORDER BY lower(c.name)", (vendor["id"],)
        )]
    finally:
        # Runs for the 404 abort and for query errors alike, so the
        # connection is never left open.
        conn.close()
    proto = request.headers.get("X-Forwarded-Proto", "https")
    base_url = f"{proto}://{request.host}"
    canonical_url = f"{base_url}/vendor/{slug}"
    return render_template(
        "vendor.html",
        vendor=vendor,
        products=products,
        categories=categories,
        canonical_url=canonical_url,
        base_url=base_url,
    )
@bp.get("/api/data")
def api_data():
    """Return the catalogue data for the requested scope as JSON.

    The ``scope`` query parameter is normalised: ``ib``, ``sec`` and
    ``security`` (any case, surrounding whitespace ignored) select ``ib``;
    anything else, including a missing parameter, selects ``infra``.
    """
    requested = (request.args.get("scope") or "infra").strip().lower()
    scope = "ib" if requested in {"ib", "sec", "security"} else "infra"
    return jsonify(fetch_scope_data(scope))
@bp.get("/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj")
def legacy_admin_redirect():
    """Redirect this fixed path to the admin URL, carrying the normalised scope along."""
    requested = (request.args.get("scope") or "infra").strip().lower()
    if requested in {"ib", "sec", "security"}:
        return redirect_admin("ib")
    return redirect_admin("infra")
def _handle_admin_request(conn):
    """Process one request to the admin URL using the open connection ``conn``.

    Handles, in order: the login POST, the unauthenticated case (login form),
    the scope access check, POST actions (logout, bulk approval, admin user
    management, single approval/rejection, catalogue changes) and finally the
    GET rendering of the panel. The caller owns ``conn`` and closes it.
    """
    raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower()
    scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra"
    tables = scope_tables(scope)

    if request.method == "POST" and not require_admin() and request.form.get("action") == "login":
        username = request.form.get("username") or ""
        password = request.form.get("password") or ""
        admin_user = conn.execute(
            "SELECT username, password, role, access_scopes FROM admin_users WHERE username = ?",
            (username,),
        ).fetchone()
        stored_password = admin_user["password"] if admin_user else None
        # NOTE(review): passwords are stored and compared as plaintext;
        # switching to hashes needs a data migration outside this function.
        # compare_digest avoids leaking the match position through timing; it
        # is given bytes because it rejects non-ASCII str arguments.
        password_ok = isinstance(stored_password, str) and secrets.compare_digest(
            password.encode("utf-8"), stored_password.encode("utf-8")
        )
        if password_ok:
            session["is_admin"] = True
            session["admin_role"] = admin_user["role"]
            session["admin_login"] = admin_user["username"]
            session["admin_scopes"] = admin_user["access_scopes"]
            return redirect(ADMIN_PATH)
        return render_template("login.html", error="Неверный логин или пароль")

    if not require_admin():
        return render_template("login.html", error=None)

    if not can_access_scope(scope):
        return redirect_admin(next(iter(allowed_scopes())))

    if request.method == "POST":
        action = request.form.get("action", "")

        if action == "logout":
            for key in ("is_admin", "admin_role", "admin_login", "admin_scopes"):
                session.pop(key, None)
            return redirect(ADMIN_PATH)

        if action == "approve_all_changes":
            if not is_super_admin():
                flash("Недостаточно прав для подтверждения изменений.", "error")
                return redirect_admin(scope)
            rows = conn.execute(
                "SELECT * FROM pending_changes WHERE status = 'pending' ORDER BY id"
            ).fetchall()
            reviewed_by = session.get("admin_login") or SUPER_ADMIN_LOGIN
            approved = 0
            failed = 0
            for row in rows:
                if approve_pending_change(conn, row, reviewed_by):
                    approved += 1
                else:
                    failed += 1
            conn.commit()
            if failed:
                flash(f"Утверждено заявок: {approved}. Не удалось применить: {failed}.", "error")
            else:
                flash(f"Все заявки утверждены: {approved}.", "ok")
            return redirect_admin(scope)

        if action == "create_admin":
            if not is_super_admin():
                flash("Недостаточно прав для управления админами.", "error")
                return redirect_admin(scope)
            username = (request.form.get("username") or "").strip()
            access = parse_admin_scopes()
            if not username or not access:
                flash("Укажите логин и минимум один раздел доступа.", "error")
                return redirect_admin(scope)
            password = make_admin_password()
            try:
                conn.execute(
                    """
                    INSERT INTO admin_users(username, password, role, access_scopes, created_at)
                    VALUES (?, ?, 'admin', ?, ?)
                    """,
                    (username, password, access, local_now()),
                )
                conn.commit()
                # Kept in the session so the next GET can show the generated
                # credentials once (they are popped when the panel renders).
                session["created_admin_credentials"] = {
                    "username": username,
                    "password": password,
                    "access": access,
                }
                flash(f"Админ создан. Логин: {username}. Пароль: {password}", "ok")
            except sqlite3.IntegrityError:
                conn.rollback()
                session.pop("created_admin_credentials", None)
                flash("Админ с таким логином уже существует.", "error")
            return redirect_admin(scope)

        if action == "delete_admin":
            if not is_super_admin():
                flash("Недостаточно прав для управления админами.", "error")
                return redirect_admin(scope)
            admin_id = parse_int(request.form.get("admin_id"))
            if admin_id:
                target = conn.execute(
                    "SELECT role, username FROM admin_users WHERE id = ?", (admin_id,)
                ).fetchone()
                if target is None:
                    # Do not report a successful deletion for an id that
                    # matches no row.
                    flash("Админ не найден.", "error")
                elif target["role"] == "super":
                    flash("Супер-админа удалить нельзя.", "error")
                elif target["username"] == session.get("admin_login"):
                    flash("Нельзя удалить текущего пользователя.", "error")
                else:
                    conn.execute("DELETE FROM admin_users WHERE id = ? AND role <> 'super'", (admin_id,))
                    conn.commit()
                    flash("Админ удален.", "ok")
            return redirect_admin(scope)

        if action in {"approve_change", "reject_change"}:
            if not is_super_admin():
                flash("Недостаточно прав для подтверждения изменений.", "error")
                return redirect_admin(scope)
            change_id = parse_int(request.form.get("change_id"))
            row = None
            if change_id:
                row = conn.execute(
                    "SELECT * FROM pending_changes WHERE id = ? AND status = 'pending'",
                    (change_id,),
                ).fetchone()
            if not row:
                flash("Заявка не найдена или уже обработана.", "error")
                return redirect_admin(scope)
            reviewer = session.get("admin_login") or SUPER_ADMIN_LOGIN
            if action == "reject_change":
                conn.execute(
                    """
                    UPDATE pending_changes
                    SET status = 'rejected', reviewed_by = ?, reviewed_at = ?
                    WHERE id = ?
                    """,
                    (reviewer, local_now(), change_id),
                )
                flash("Заявка отклонена.", "ok")
            elif approve_pending_change(conn, row, reviewer):
                flash("Заявка утверждена и применена.", "ok")
            else:
                flash("Не удалось применить заявку. Проверьте, не изменились ли связанные записи.", "error")
                return redirect_admin(scope)
            conn.commit()
            return redirect_admin(scope)

        # Every remaining action is a catalogue change built from the form.
        payload = collect_change_payload(action, scope, conn, tables)
        if payload is not None:
            if is_super_admin():
                try:
                    apply_change(conn, tables, action, payload)
                    flash("Изменения сохранены.", "ok")
                except sqlite3.IntegrityError:
                    conn.rollback()
                    flash("Не удалось сохранить: возможно, такое название уже есть у выбранного вендора.", "error")
                    return redirect_admin(scope)
            else:
                change_id = queue_change(conn, scope, action, payload)
                # Commit before the network call so the database write lock
                # is not held while waiting for Telegram, and so the
                # notification never refers to an uncommitted row.
                conn.commit()
                notify_pending_change(change_id, scope, action, payload)
                flash("Изменения отправлены супер админу. Они вступят в силу после утверждения.", "ok")
        conn.commit()
        return redirect_admin(scope)

    vendors, categories, products, links = fetch_admin_matrix(conn, tables)
    if is_super_admin():
        pending_rows = conn.execute(
            """
            SELECT id, scope, action, payload, created_by, created_at
            FROM pending_changes
            WHERE status = 'pending'
            ORDER BY id DESC
            """
        )
    else:
        # Regular admins only see the changes they queued themselves.
        pending_rows = conn.execute(
            """
            SELECT id, scope, action, payload, created_by, created_at
            FROM pending_changes
            WHERE status = 'pending' AND created_by = ?
            ORDER BY id DESC
            """,
            (session.get("admin_login") or ADMIN_LOGIN,),
        )
    pending_changes = [prepare_pending_change(dict(r)) for r in pending_rows]
    if not is_super_admin():
        # Show a regular admin the matrix as it would look after approval.
        scoped_pending = [change for change in pending_changes if change["scope"] == scope]
        vendors, categories, products, links = apply_pending_overlay(
            vendors, categories, products, links, scoped_pending
        )
    admin_users = [
        dict(r)
        for r in conn.execute(
            """
            SELECT id, username, role, access_scopes, created_at
            FROM admin_users
            ORDER BY role DESC, lower(username)
            """
        )
    ]
    created_admin_credentials = session.pop("created_admin_credentials", None)
    return render_template(
        "admin.html",
        vendors=vendors,
        categories=categories,
        products=products,
        links=links,
        scope=scope,
        is_super_admin=is_super_admin(),
        allowed_scopes=allowed_scopes(),
        pending_changes=pending_changes,
        admin_users=admin_users,
        created_admin_credentials=created_admin_credentials,
    )


@bp.route(ADMIN_PATH, methods=["GET", "POST"])
def admin_login_or_panel():
    """Serve the admin login form and, once authenticated, the admin panel.

    Opens a database connection, delegates to ``_handle_admin_request`` and
    closes the connection on every exit path, including exceptions.
    """
    conn = get_db()
    try:
        return _handle_admin_request(conn)
    finally:
        conn.close()
@bp.get("/health")
def health():
    """Liveness endpoint that always reports status ``ok``."""
    return dict(status="ok")
@bp.get("/yandex_8addde0be1e0ee72.html")
def yandex_verify():
    """Serve the static HTML verification page with the fixed verification code."""
    from flask import Response

    head = '<html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>'
    page_body = '<body>Verification: 8addde0be1e0ee72</body></html>'
    return Response(head + page_body, mimetype="text/html")
@bp.post("/api/contact")
def contact():
    """Validate a contact-form submission and forward it to the Telegram chat.

    Expects a JSON object with ``name``, ``email``, ``phone`` and ``text``.

    Returns:
        ``{"ok": true}`` on success; HTTP 422 with a ``detail`` message when
        validation fails; HTTP 502 when Telegram is not configured or the
        message could not be delivered.
    """
    import re as _re

    from flask import current_app

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body has no fields; treat it as empty
        # instead of failing on ``.get``.
        data = {}
    name = str(data.get("name", "")).strip()
    email = str(data.get("email", "")).strip()
    phone = str(data.get("phone", "")).strip()
    text = str(data.get("text", "")).strip()
    if not all([name, email, phone, text]):
        return jsonify({"detail": "Заполните все обязательные поля"}), 422
    if len(email) > 254 or not _re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", email):
        return jsonify({"detail": "Некорректный email"}), 422
    if not _re.match(r"^[\+\d][\d\s\-\(\)]{6,18}$", phone):
        return jsonify({"detail": "Некорректный номер телефона"}), 422
    if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
        return jsonify({"detail": "Ошибка отправки: уведомления не настроены"}), 502

    # Keep the message below Telegram's size limit for a single message.
    name = name[:200]
    if len(text) > 3000:
        text = text[:3000] + "\n..."

    # Geo lookup (best-effort; any failure falls back to the bare IP line).
    ip = request.remote_addr or ""
    geo_line = ""
    try:
        geo_url = f"http://ip-api.com/json/{ip}?lang=ru&fields=status,country,regionName,city,query"
        geo_req = Request(geo_url, headers={"User-Agent": "Mozilla/5.0"})
        with urlopen(geo_req, timeout=5) as geo_resp:
            geo = json.loads(geo_resp.read())
        if geo.get("status") == "success":
            parts = [p for p in [geo.get("country"), geo.get("regionName"), geo.get("city")] if p]
            geo_line = (
                f"🌍 <b>Местоположение:</b> {escape(', '.join(parts))}\n"
                f"🖥 <b>IP:</b> {escape(str(geo.get('query', ip)))}\n"
            )
    except Exception:
        geo_line = f"🖥 <b>IP:</b> {escape(ip)}\n" if ip else ""

    # NOTE(review): this evaluates to an empty string; the divider character
    # was probably lost at some point -- confirm the intended character.
    divider = "" * 22
    # HTML parse mode with escaped values: in Markdown mode a stray "_" or
    # "*" in user input makes Telegram reject the whole message.
    msg = (
        f"🔔 <b>Сообщение через форму</b>\n{divider}\n\n"
        f"👤 <b>Имя:</b> {escape(name)}\n"
        f"📧 <b>Email:</b> {escape(email)}\n"
        f"📱 <b>Телефон:</b> {escape(phone)}\n"
        f"{geo_line}\n"
        f"💬 <b>Сообщение:</b>\n{escape(text)}"
    )
    payload = json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": msg,
        "parse_mode": "HTML",
    }).encode()
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    try:
        req = Request(url, data=payload, headers={"Content-Type": "application/json"})
        with urlopen(req, timeout=10) as resp:
            resp.read()
    except Exception:
        # Log the details server-side; the anonymous client only gets a
        # generic message.
        current_app.logger.exception("Failed to deliver contact form message to Telegram")
        return jsonify({"detail": "Ошибка отправки: попробуйте позже"}), 502
    return jsonify({"ok": True})
@bp.get("/assets/mont-logo")
def mont_logo():
    """Serve ``mont_logo.png`` from ``BASE_DIR``."""
    logo_filename = "mont_logo.png"
    return send_from_directory(BASE_DIR, logo_filename)