from __future__ import annotations import json import secrets import sqlite3 from datetime import datetime from html import escape from urllib.parse import urlencode from urllib.request import Request, urlopen from zoneinfo import ZoneInfo from flask import Blueprint, flash, jsonify, redirect, render_template, request, send_from_directory, session from .config import ( ADMIN_LOGIN, ADMIN_PASSWORD, ADMIN_PATH, BASE_DIR, SUPER_ADMIN_LOGIN, SUPER_ADMIN_PASSWORD, TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, ) from .db import fetch_scope_data, get_db, scope_tables bp = Blueprint("main", __name__) MOSCOW_TZ = ZoneInfo("Europe/Moscow") def require_admin() -> bool: return bool(session.get("is_admin")) def is_super_admin() -> bool: return session.get("admin_role") == "super" def allowed_scopes() -> set[str]: if is_super_admin(): return {"infra", "ib"} raw = session.get("admin_scopes") or "infra,ib" scopes = {item.strip() for item in raw.split(",") if item.strip()} return scopes or {"infra"} def can_access_scope(scope: str) -> bool: return scope in allowed_scopes() def parse_int(form_value: str | None) -> int | None: if not form_value: return None try: return int(form_value) except ValueError: return None def redirect_admin(scope: str): return redirect(f"{ADMIN_PATH}?scope={scope}") def make_admin_password() -> str: return secrets.token_urlsafe(9) def parse_admin_scopes() -> str: scopes = [] if request.form.get("allow_infra"): scopes.append("infra") if request.form.get("allow_ib"): scopes.append("ib") return ",".join(scopes) def local_now() -> str: return datetime.now(MOSCOW_TZ).strftime("%Y-%m-%d %H:%M:%S") def scope_label(scope: str) -> str: return "ИБ" if scope == "ib" else "Инфраструктура" def rebuild_vendor_categories(conn, tables: dict[str, str]) -> None: conn.execute(f"DELETE FROM {tables['vendor_categories']}") conn.execute( f""" INSERT OR IGNORE INTO {tables['vendor_categories']}(vendor_id, category_id) SELECT DISTINCT p.vendor_id, pc.category_id FROM {tables['products']} p JOIN {tables['product_categories']} pc ON pc.product_id = p.id """ ) def collect_change_payload(action: str, scope: str, conn, tables: dict[str, str]) -> dict | None: if action == "add_vendor": name = (request.form.get("name") or "").strip() return {"name": name} if name else None if action == "add_category": name = (request.form.get("name") or "").strip() return {"name": name} if name else None if action == "add_product": vendor_id = parse_int(request.form.get("vendor_id")) name = (request.form.get("name") or "").strip() url = (request.form.get("url") or "").strip() if not vendor_id or not name: return None vendor = conn.execute(f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone() return { "vendor_id": vendor_id, "vendor_name": vendor["name"] if vendor else "", "name": name, "url": url, } if action == "update_product": product_id = parse_int(request.form.get("product_id")) name = (request.form.get("name") or "").strip() url = (request.form.get("url") or "").strip() if not product_id or not name: return None product = conn.execute( f""" SELECT p.name, p.url, v.name AS vendor_name FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id WHERE p.id = ? """, (product_id,), ).fetchone() return { "product_id": product_id, "vendor_name": product["vendor_name"] if product else "", "old_name": product["name"] if product else "", "old_url": product["url"] if product else "", "name": name, "url": url, } if action == "update_vendor": vendor_id = parse_int(request.form.get("vendor_id")) name = (request.form.get("name") or "").strip() website = (request.form.get("website") or "").strip() mont_page = (request.form.get("mont_page") or "").strip() if not vendor_id or not name: return None vendor = conn.execute(f"SELECT name, website, mont_page FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone() return { "vendor_id": vendor_id, "old_name": vendor["name"] if vendor else "", "name": name, "website": website, "mont_page": mont_page, } if action == "delete_vendor": vendor_id = parse_int(request.form.get("vendor_id")) if not vendor_id: return None vendor = conn.execute(f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone() return {"vendor_id": vendor_id, "vendor_name": vendor["name"] if vendor else ""} if action == "delete_category": category_id = parse_int(request.form.get("category_id")) if not category_id: return None category = conn.execute(f"SELECT name FROM {tables['categories']} WHERE id = ?", (category_id,)).fetchone() return {"category_id": category_id, "category_name": category["name"] if category else ""} if action == "delete_product": product_id = parse_int(request.form.get("product_id")) if not product_id: return None product = conn.execute( f""" SELECT p.name, v.name AS vendor_name FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id WHERE p.id = ? """, (product_id,), ).fetchone() return { "product_id": product_id, "product_name": product["name"] if product else "", "vendor_name": product["vendor_name"] if product else "", } if action == "save_matrix": products = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['products']}")] categories = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['categories']}")] pairs: list[list[int]] = [] for product_id in products: for category_id in categories: if request.form.get(f"pc_{product_id}_{category_id}"): pairs.append([product_id, category_id]) current_pairs = { (r["product_id"], r["category_id"]) for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}") } new_pairs = {tuple(pair) for pair in pairs} changed_pairs = sorted((new_pairs - current_pairs) | (current_pairs - new_pairs)) product_labels = { r["id"]: f"{r['vendor_name']} :: {r['name']}" for r in conn.execute( f""" SELECT p.id, p.name, v.name AS vendor_name FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id """ ) } category_labels = { r["id"]: r["name"] for r in conn.execute(f"SELECT id, name FROM {tables['categories']}") } added = [] removed = [] for product_id, category_id in changed_pairs: item = { "product_id": product_id, "category_id": category_id, "product": product_labels.get(product_id, str(product_id)), "category": category_labels.get(category_id, str(category_id)), } if (product_id, category_id) in new_pairs: added.append(item) else: removed.append(item) return {"pairs": pairs, "added": added, "removed": removed} return None def apply_change(conn, tables: dict[str, str], action: str, payload: dict) -> None: if action == "add_vendor": conn.execute(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (payload["name"],)) elif action == "add_category": conn.execute(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (payload["name"],)) elif action == "add_product": conn.execute( f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)", (payload["vendor_id"], payload["name"], payload.get("url") or None), ) if payload.get("url"): conn.execute( f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?", (payload["url"], payload["vendor_id"], payload["name"]), ) elif action == "update_product": conn.execute( f"UPDATE {tables['products']} SET name = ?, url = ? WHERE id = ?", (payload["name"], payload.get("url") or None, payload["product_id"]), ) elif action == "update_vendor": conn.execute( f"UPDATE {tables['vendors']} SET name = ?, website = ?, mont_page = ? WHERE id = ?", (payload["name"], payload.get("website") or None, payload.get("mont_page") or None, payload["vendor_id"]), ) elif action == "delete_vendor": conn.execute(f"DELETE FROM {tables['vendors']} WHERE id = ?", (payload["vendor_id"],)) elif action == "delete_category": conn.execute(f"DELETE FROM {tables['categories']} WHERE id = ?", (payload["category_id"],)) elif action == "delete_product": conn.execute(f"DELETE FROM {tables['products']} WHERE id = ?", (payload["product_id"],)) elif action == "save_matrix": conn.execute(f"DELETE FROM {tables['product_categories']}") conn.executemany( f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)", [tuple(pair) for pair in payload.get("pairs", [])], ) rebuild_vendor_categories(conn, tables) def queue_change(conn, scope: str, action: str, payload: dict) -> int: cur = conn.execute( """ INSERT INTO pending_changes(scope, action, payload, created_by, created_at) VALUES (?, ?, ?, ?, ?) """, ( scope, action, json.dumps(payload, ensure_ascii=False), session.get("admin_login") or ADMIN_LOGIN, local_now(), ), ) return int(cur.lastrowid) def describe_change(action: str, payload: dict) -> str: if action == "add_vendor": return f"Добавить вендора: {payload.get('name')}" if action == "add_category": return f"Добавить категорию: {payload.get('name')}" if action == "add_product": parts = [ "Добавить продукт", f"Вендор: {payload.get('vendor_name') or payload.get('vendor_id')}", f"Название: {payload.get('name')}", ] if payload.get("url"): parts.append(f"URL: {payload.get('url')}") return "\n".join(parts) if action == "update_product": return "\n".join( [ "Изменить продукт", f"Вендор: {payload.get('vendor_name') or '-'}", f"Было: {payload.get('old_name') or '-'}", f"Стало: {payload.get('name')}", f"URL был: {payload.get('old_url') or '-'}", f"URL станет: {payload.get('url') or '-'}", ] ) if action == "update_vendor": parts = [f"Изменить вендора: {payload.get('old_name') or payload.get('vendor_id')} → {payload.get('name')}"] if payload.get("website"): parts.append(f"Сайт: {payload['website']}") if payload.get("mont_page"): parts.append(f"MONT: {payload['mont_page']}") return "\n".join(parts) if action == "delete_vendor": return f"Удалить вендора: {payload.get('vendor_name') or payload.get('vendor_id')}" if action == "delete_category": return f"Удалить категорию: {payload.get('category_name') or payload.get('category_id')}" if action == "delete_product": product = payload.get("product_name") or payload.get("product_id") vendor = payload.get("vendor_name") or "-" return f"Удалить продукт: {vendor} :: {product}" if action == "save_matrix": lines = [ f"Изменить матрицу: связей выбрано {len(payload.get('pairs', []))}", f"Добавлено связей: {len(payload.get('added', []))}", f"Снято связей: {len(payload.get('removed', []))}", ] added = payload.get("added", []) removed = payload.get("removed", []) if added: lines.append("") lines.append("Добавить:") lines.extend([f"+ {item.get('product')} -> {item.get('category')}" for item in added[:80]]) if len(added) > 80: lines.append(f"... еще {len(added) - 80}") if removed: lines.append("") lines.append("Снять:") lines.extend([f"- {item.get('product')} -> {item.get('category')}" for item in removed[:80]]) if len(removed) > 80: lines.append(f"... еще {len(removed) - 80}") return "\n".join(lines) return json.dumps(payload, ensure_ascii=False, indent=2) def change_title(action: str) -> str: return { "add_vendor": "Добавление вендора", "add_category": "Добавление категории", "add_product": "Добавление продукта", "update_vendor": "Изменение вендора", "update_product": "Изменение продукта", "delete_vendor": "Удаление вендора", "delete_category": "Удаление категории", "delete_product": "Удаление продукта", "save_matrix": "Изменение матрицы", }.get(action, action) def notify_pending_change(change_id: int, scope: str, action: str, payload: dict) -> bool: if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: return False admin_url = request.url_root.rstrip("/") + f"{ADMIN_PATH}?scope={scope}" raw_details = describe_change(action, payload) if len(raw_details) > 3000: raw_details = raw_details[:3000] + "\n... сокращено, полный список в админке" details = escape(raw_details) text = "\n".join( [ "Zkart: нужно подтверждение", "", f"Заявка: #{change_id}", f"Тип: {escape(change_title(action))}", f"Раздел: {escape(scope_label(scope))}", f"Админ: {escape(session.get('admin_login') or ADMIN_LOGIN)}", "", f"Изменение:\n
{details}
", "", f"Открыть админку", ] ) data = urlencode( { "chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "HTML", "disable_web_page_preview": "1", } ).encode("utf-8") req = Request( f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage", data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) try: with urlopen(req, timeout=5) as response: return 200 <= response.status < 300 except Exception: return False def approve_pending_change(conn, row, reviewed_by: str) -> bool: savepoint = f"approve_change_{int(row['id'])}" conn.execute(f"SAVEPOINT {savepoint}") try: change_tables = scope_tables(row["scope"]) apply_change(conn, change_tables, row["action"], json.loads(row["payload"])) conn.execute( """ UPDATE pending_changes SET status = 'approved', reviewed_by = ?, reviewed_at = ? WHERE id = ? """, (reviewed_by, local_now(), row["id"]), ) conn.execute(f"RELEASE SAVEPOINT {savepoint}") return True except (KeyError, TypeError, ValueError, sqlite3.DatabaseError): conn.execute(f"ROLLBACK TO SAVEPOINT {savepoint}") conn.execute(f"RELEASE SAVEPOINT {savepoint}") return False def prepare_pending_change(change: dict) -> dict: change["payload"] = json.loads(change["payload"]) change["title"] = change_title(change["action"]) change["scope_label"] = scope_label(change["scope"]) change["description"] = describe_change(change["action"], change["payload"]) return change def fetch_admin_matrix(conn, tables: dict[str, str]) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]: vendors = [dict(r) for r in conn.execute( f"SELECT id, name, COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page " f"FROM {tables['vendors']} ORDER BY lower(name)" )] categories = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")] products = [ dict(r) for r in conn.execute( f""" SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name , p.url FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id ORDER BY lower(v.name), lower(p.name) """ ) ] links = { (r["product_id"], r["category_id"]) for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}") } return vendors, categories, products, links def apply_pending_overlay( vendors: list[dict], categories: list[dict], products: list[dict], links: set[tuple[int, int]], pending_changes: list[dict], ) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]: vendors = [dict(item) for item in vendors] categories = [dict(item) for item in categories] products = [dict(item) for item in products] links = set(links) for change in sorted(pending_changes, key=lambda item: item["id"]): if change["scope"] not in {"infra", "ib"}: continue action = change["action"] payload = change["payload"] if action == "add_vendor": vendors.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True}) elif action == "add_category": categories.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True}) elif action == "add_product": products.append( { "id": -change["id"], "name": payload.get("name", ""), "vendor_id": payload.get("vendor_id"), "vendor_name": payload.get("vendor_name", ""), "url": payload.get("url") or None, "pending": True, } ) elif action == "update_product": for product in products: if product["id"] == payload.get("product_id"): product["name"] = payload.get("name", product["name"]) product["url"] = payload.get("url") or None product["pending"] = True break elif action == "update_vendor": vendor_id = payload.get("vendor_id") for vendor in vendors: if vendor["id"] == vendor_id: vendor["name"] = payload.get("name", vendor["name"]) vendor["website"] = payload.get("website") or "" vendor["mont_page"] = payload.get("mont_page") or "" vendor["pending"] = True break elif action == "delete_vendor": vendor_id = payload.get("vendor_id") vendors = [item for item in vendors if item["id"] != vendor_id] products = [item for item in products if item.get("vendor_id") != vendor_id] elif action == "delete_category": category_id = payload.get("category_id") categories = [item for item in categories if item["id"] != category_id] links = {pair for pair in links if pair[1] != category_id} elif action == "delete_product": product_id = payload.get("product_id") products = [item for item in products if item["id"] != product_id] links = {pair for pair in links if pair[0] != product_id} elif action == "save_matrix": links = {tuple(pair) for pair in payload.get("pairs", [])} vendors.sort(key=lambda item: item["name"].lower()) categories.sort(key=lambda item: item["name"].lower()) products.sort(key=lambda item: (item.get("vendor_name", "").lower(), item.get("name", "").lower())) return vendors, categories, products, links @bp.get("/") def index(): return render_template("index.html") @bp.get("/api/data") def api_data(): scope = (request.args.get("scope") or "infra").strip().lower() if scope in {"ib", "sec", "security"}: scope = "ib" else: scope = "infra" return jsonify(fetch_scope_data(scope)) @bp.get("/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj") def legacy_admin_redirect(): raw_scope = (request.args.get("scope") or "infra").strip().lower() scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra" return redirect_admin(scope) @bp.route(ADMIN_PATH, methods=["GET", "POST"]) def admin_login_or_panel(): conn = get_db() raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower() scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra" tables = scope_tables(scope) if request.method == "POST" and not require_admin() and request.form.get("action") == "login": username = request.form.get("username") or "" password = request.form.get("password") or "" admin_user = conn.execute( "SELECT username, password, role, access_scopes FROM admin_users WHERE username = ?", (username,), ).fetchone() if admin_user and password == admin_user["password"]: session["is_admin"] = True session["admin_role"] = admin_user["role"] session["admin_login"] = admin_user["username"] session["admin_scopes"] = admin_user["access_scopes"] conn.close() return redirect(ADMIN_PATH) conn.close() return render_template("login.html", error="Неверный логин или пароль") if not require_admin(): conn.close() return render_template("login.html", error=None) if not can_access_scope(scope): conn.close() return redirect_admin(next(iter(allowed_scopes()))) if request.method == "POST": action = request.form.get("action", "") if action == "logout": session.pop("is_admin", None) session.pop("admin_role", None) session.pop("admin_login", None) session.pop("admin_scopes", None) conn.close() return redirect(ADMIN_PATH) if not can_access_scope(scope): flash("Нет доступа к этому разделу.", "error") conn.close() return redirect_admin(next(iter(allowed_scopes()))) if action == "approve_all_changes": if not is_super_admin(): flash("Недостаточно прав для подтверждения изменений.", "error") conn.close() return redirect_admin(scope) rows = conn.execute( "SELECT * FROM pending_changes WHERE status = 'pending' ORDER BY id" ).fetchall() reviewed_by = session.get("admin_login") or SUPER_ADMIN_LOGIN approved = 0 failed = 0 for row in rows: if approve_pending_change(conn, row, reviewed_by): approved += 1 else: failed += 1 conn.commit() conn.close() if failed: flash(f"Утверждено заявок: {approved}. Не удалось применить: {failed}.", "error") else: flash(f"Все заявки утверждены: {approved}.", "ok") return redirect_admin(scope) if action == "create_admin": if not is_super_admin(): flash("Недостаточно прав для управления админами.", "error") conn.close() return redirect_admin(scope) username = (request.form.get("username") or "").strip() access = parse_admin_scopes() if not username or not access: flash("Укажите логин и минимум один раздел доступа.", "error") conn.close() return redirect_admin(scope) password = make_admin_password() try: conn.execute( """ INSERT INTO admin_users(username, password, role, access_scopes, created_at) VALUES (?, ?, 'admin', ?, ?) """, (username, password, access, local_now()), ) conn.commit() session["created_admin_credentials"] = { "username": username, "password": password, "access": access, } flash(f"Админ создан. Логин: {username}. Пароль: {password}", "ok") except sqlite3.IntegrityError: conn.rollback() session.pop("created_admin_credentials", None) flash("Админ с таким логином уже существует.", "error") conn.close() return redirect_admin(scope) if action == "delete_admin": if not is_super_admin(): flash("Недостаточно прав для управления админами.", "error") conn.close() return redirect_admin(scope) admin_id = parse_int(request.form.get("admin_id")) if admin_id: target = conn.execute("SELECT role, username FROM admin_users WHERE id = ?", (admin_id,)).fetchone() if target and target["role"] == "super": flash("Супер-админа удалить нельзя.", "error") elif target and target["username"] == session.get("admin_login"): flash("Нельзя удалить текущего пользователя.", "error") else: conn.execute("DELETE FROM admin_users WHERE id = ? AND role <> 'super'", (admin_id,)) conn.commit() flash("Админ удален.", "ok") conn.close() return redirect_admin(scope) if action in {"approve_change", "reject_change"}: if not is_super_admin(): flash("Недостаточно прав для подтверждения изменений.", "error") conn.close() return redirect_admin(scope) change_id = parse_int(request.form.get("change_id")) row = None if change_id: row = conn.execute( "SELECT * FROM pending_changes WHERE id = ? AND status = 'pending'", (change_id,), ).fetchone() if not row: flash("Заявка не найдена или уже обработана.", "error") conn.close() return redirect_admin(scope) if action == "reject_change": conn.execute( """ UPDATE pending_changes SET status = 'rejected', reviewed_by = ?, reviewed_at = ? WHERE id = ? """, (session.get("admin_login") or SUPER_ADMIN_LOGIN, local_now(), change_id), ) flash("Заявка отклонена.", "ok") else: if approve_pending_change(conn, row, session.get("admin_login") or SUPER_ADMIN_LOGIN): flash("Заявка утверждена и применена.", "ok") else: flash("Не удалось применить заявку. Проверьте, не изменились ли связанные записи.", "error") conn.close() return redirect_admin(scope) conn.commit() conn.close() return redirect_admin(scope) payload = collect_change_payload(action, scope, conn, tables) if payload is not None: if is_super_admin(): try: apply_change(conn, tables, action, payload) flash("Изменения сохранены.", "ok") except sqlite3.IntegrityError: conn.rollback() flash("Не удалось сохранить: возможно, такое название уже есть у выбранного вендора.", "error") conn.close() return redirect_admin(scope) else: change_id = queue_change(conn, scope, action, payload) notify_pending_change(change_id, scope, action, payload) flash("Изменения отправлены супер админу. Они вступят в силу после утверждения.", "ok") conn.commit() conn.close() return redirect_admin(scope) vendors, categories, products, links = fetch_admin_matrix(conn, tables) if is_super_admin(): pending_rows = conn.execute( """ SELECT id, scope, action, payload, created_by, created_at FROM pending_changes WHERE status = 'pending' ORDER BY id DESC """ ) else: pending_rows = conn.execute( """ SELECT id, scope, action, payload, created_by, created_at FROM pending_changes WHERE status = 'pending' AND created_by = ? ORDER BY id DESC """, (session.get("admin_login") or ADMIN_LOGIN,), ) pending_changes = [prepare_pending_change(dict(r)) for r in pending_rows] if not is_super_admin(): scoped_pending = [change for change in pending_changes if change["scope"] == scope] vendors, categories, products, links = apply_pending_overlay(vendors, categories, products, links, scoped_pending) admin_users = [ dict(r) for r in conn.execute( """ SELECT id, username, role, access_scopes, created_at FROM admin_users ORDER BY role DESC, lower(username) """ ) ] created_admin_credentials = session.pop("created_admin_credentials", None) conn.close() return render_template( "admin.html", vendors=vendors, categories=categories, products=products, links=links, scope=scope, is_super_admin=is_super_admin(), allowed_scopes=allowed_scopes(), pending_changes=pending_changes, admin_users=admin_users, created_admin_credentials=created_admin_credentials, ) @bp.get("/health") def health(): return {"status": "ok"} @bp.post("/api/contact") def contact(): import re as _re data = request.get_json(silent=True) or {} name = str(data.get("name", "")).strip() email = str(data.get("email", "")).strip() phone = str(data.get("phone", "")).strip() text = str(data.get("text", "")).strip() if not all([name, email, phone, text]): return jsonify({"detail": "Заполните все обязательные поля"}), 422 if not _re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", email): return jsonify({"detail": "Некорректный email"}), 422 if not _re.match(r"^[\+\d][\d\s\-\(\)]{6,18}$", phone): return jsonify({"detail": "Некорректный номер телефона"}), 422 divider = "━" * 22 msg = ( f"🔔 *Сообщение через форму*\n{divider}\n\n" f"👤 *Имя:* {name}\n" f"📧 *Email:* {email}\n" f"📱 *Телефон:* {phone}\n\n" f"💬 *Сообщение:*\n{text}" ) payload = json.dumps({ "chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "Markdown", }).encode() url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" try: req = Request(url, data=payload, headers={"Content-Type": "application/json"}) with urlopen(req, timeout=10) as resp: resp.read() except Exception as e: return jsonify({"detail": f"Ошибка отправки: {e}"}), 502 return jsonify({"ok": True}) @bp.get("/assets/mont-logo") def mont_logo(): return send_from_directory(BASE_DIR, "mont_logo.png")