from __future__ import annotations import json import secrets import sqlite3 from datetime import datetime from html import escape from urllib.parse import urlencode from urllib.request import Request, urlopen from zoneinfo import ZoneInfo from flask import Blueprint, flash, jsonify, redirect, render_template, request, send_from_directory, session from .config import ( ADMIN_LOGIN, ADMIN_PASSWORD, ADMIN_PATH, BASE_DIR, NEWS_API_TOKEN, SUPER_ADMIN_LOGIN, SUPER_ADMIN_PASSWORD, TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID, ) from .db import (fetch_scope_data, get_db, scope_tables, get_latest_news, get_news_list, get_news_count, get_news_by_slug, get_news_by_slug_any, get_all_news_admin, get_all_news_admin_paged, create_news, update_news, delete_news, get_upcoming_events, get_all_events, get_event_by_slug) import re as _re_slug _TRANSLIT_MAP = { 'а':'a','б':'b','в':'v','г':'g','д':'d','е':'e','ё':'yo','ж':'zh', 'з':'z','и':'i','й':'y','к':'k','л':'l','м':'m','н':'n','о':'o', 'п':'p','р':'r','с':'s','т':'t','у':'u','ф':'f','х':'kh','ц':'ts', 'ч':'ch','ш':'sh','щ':'shch','ъ':'','ы':'y','ь':'','э':'e','ю':'yu','я':'ya', } def _translit(s: str) -> str: return ''.join(_TRANSLIT_MAP.get(c, c) for c in s.lower()) def _make_slug(title: str) -> str: import datetime as _dt t = _translit(title) t = _re_slug.sub(r'[^a-z0-9]+', '-', t)[:50].strip('-') ts = int(_dt.datetime.now().timestamp()) return f'{t}-{ts}' if t else f'news-{ts}' bp = Blueprint("main", __name__) MOSCOW_TZ = ZoneInfo("Europe/Moscow") def _base_url() -> str: return f"https://{request.host}" def require_admin() -> bool: return bool(session.get("is_admin")) def is_news_editor() -> bool: return session.get("admin_role") in ("news_editor", "super", "admin") def is_super_admin() -> bool: return session.get("admin_role") == "super" def allowed_scopes() -> set[str]: if is_super_admin(): return {"infra", "ib"} raw = session.get("admin_scopes") or "infra,ib" scopes = {item.strip() for item in raw.split(",") if item.strip()} return scopes or {"infra"} def can_access_scope(scope: str) -> bool: return scope in allowed_scopes() def parse_int(form_value: str | None) -> int | None: if not form_value: return None try: return int(form_value) except ValueError: return None def redirect_admin(scope: str): return redirect(f"{ADMIN_PATH}?scope={scope}") def make_admin_password() -> str: return secrets.token_urlsafe(9) def parse_admin_scopes() -> str: scopes = [] if request.form.get("allow_infra"): scopes.append("infra") if request.form.get("allow_ib"): scopes.append("ib") return ",".join(scopes) def local_now() -> str: return datetime.now(MOSCOW_TZ).strftime("%Y-%m-%d %H:%M:%S") def scope_label(scope: str) -> str: return "ИБ" if scope == "ib" else "Инфраструктура" def rebuild_vendor_categories(conn, tables: dict[str, str]) -> None: conn.execute(f"DELETE FROM {tables['vendor_categories']}") conn.execute( f""" INSERT OR IGNORE INTO {tables['vendor_categories']}(vendor_id, category_id) SELECT DISTINCT p.vendor_id, pc.category_id FROM {tables['products']} p JOIN {tables['product_categories']} pc ON pc.product_id = p.id """ ) def collect_change_payload(action: str, scope: str, conn, tables: dict[str, str]) -> dict | None: if action == "add_vendor": name = (request.form.get("name") or "").strip() return {"name": name} if name else None if action == "add_category": name = (request.form.get("name") or "").strip() return {"name": name} if name else None if action == "add_product": vendor_id = parse_int(request.form.get("vendor_id")) name = (request.form.get("name") or "").strip() url = (request.form.get("url") or "").strip() if not vendor_id or not name: return None vendor = conn.execute(f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone() return { "vendor_id": vendor_id, "vendor_name": vendor["name"] if vendor else "", "name": name, "url": url, } if action == "update_product": product_id = parse_int(request.form.get("product_id")) name = (request.form.get("name") or "").strip() url = (request.form.get("url") or "").strip() if not product_id or not name: return None product = conn.execute( f""" SELECT p.name, p.url, v.name AS vendor_name FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id WHERE p.id = ? """, (product_id,), ).fetchone() return { "product_id": product_id, "vendor_name": product["vendor_name"] if product else "", "old_name": product["name"] if product else "", "old_url": product["url"] if product else "", "name": name, "url": url, } if action == "update_vendor": vendor_id = parse_int(request.form.get("vendor_id")) name = (request.form.get("name") or "").strip() website = (request.form.get("website") or "").strip() mont_page = (request.form.get("mont_page") or "").strip() if not vendor_id or not name: return None vendor = conn.execute(f"SELECT name, website, mont_page FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone() return { "vendor_id": vendor_id, "old_name": vendor["name"] if vendor else "", "name": name, "website": website, "mont_page": mont_page, } if action == "delete_vendor": vendor_id = parse_int(request.form.get("vendor_id")) if not vendor_id: return None vendor = conn.execute(f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone() return {"vendor_id": vendor_id, "vendor_name": vendor["name"] if vendor else ""} if action == "delete_category": category_id = parse_int(request.form.get("category_id")) if not category_id: return None category = conn.execute(f"SELECT name FROM {tables['categories']} WHERE id = ?", (category_id,)).fetchone() return {"category_id": category_id, "category_name": category["name"] if category else ""} if action == "delete_product": product_id = parse_int(request.form.get("product_id")) if not product_id: return None product = conn.execute( f""" SELECT p.name, v.name AS vendor_name FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id WHERE p.id = ? """, (product_id,), ).fetchone() return { "product_id": product_id, "product_name": product["name"] if product else "", "vendor_name": product["vendor_name"] if product else "", } if action == "save_matrix": products = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['products']}")] categories = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['categories']}")] pairs: list[list[int]] = [] for product_id in products: for category_id in categories: if request.form.get(f"pc_{product_id}_{category_id}"): pairs.append([product_id, category_id]) current_pairs = { (r["product_id"], r["category_id"]) for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}") } new_pairs = {tuple(pair) for pair in pairs} changed_pairs = sorted((new_pairs - current_pairs) | (current_pairs - new_pairs)) product_labels = { r["id"]: f"{r['vendor_name']} :: {r['name']}" for r in conn.execute( f""" SELECT p.id, p.name, v.name AS vendor_name FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id """ ) } category_labels = { r["id"]: r["name"] for r in conn.execute(f"SELECT id, name FROM {tables['categories']}") } added = [] removed = [] for product_id, category_id in changed_pairs: item = { "product_id": product_id, "category_id": category_id, "product": product_labels.get(product_id, str(product_id)), "category": category_labels.get(category_id, str(category_id)), } if (product_id, category_id) in new_pairs: added.append(item) else: removed.append(item) return {"pairs": pairs, "added": added, "removed": removed} return None def apply_change(conn, tables: dict[str, str], action: str, payload: dict) -> None: if action == "add_vendor": conn.execute(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (payload["name"],)) elif action == "add_category": conn.execute(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (payload["name"],)) elif action == "add_product": conn.execute( f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)", (payload["vendor_id"], payload["name"], payload.get("url") or None), ) if payload.get("url"): conn.execute( f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?", (payload["url"], payload["vendor_id"], payload["name"]), ) elif action == "update_product": conn.execute( f"UPDATE {tables['products']} SET name = ?, url = ? WHERE id = ?", (payload["name"], payload.get("url") or None, payload["product_id"]), ) elif action == "update_vendor": conn.execute( f"UPDATE {tables['vendors']} SET name = ?, website = ?, mont_page = ? WHERE id = ?", (payload["name"], payload.get("website") or None, payload.get("mont_page") or None, payload["vendor_id"]), ) elif action == "delete_vendor": conn.execute(f"DELETE FROM {tables['vendors']} WHERE id = ?", (payload["vendor_id"],)) elif action == "delete_category": conn.execute(f"DELETE FROM {tables['categories']} WHERE id = ?", (payload["category_id"],)) elif action == "delete_product": conn.execute(f"DELETE FROM {tables['products']} WHERE id = ?", (payload["product_id"],)) elif action == "save_matrix": conn.execute(f"DELETE FROM {tables['product_categories']}") conn.executemany( f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)", [tuple(pair) for pair in payload.get("pairs", [])], ) rebuild_vendor_categories(conn, tables) def queue_change(conn, scope: str, action: str, payload: dict) -> int: cur = conn.execute( """ INSERT INTO pending_changes(scope, action, payload, created_by, created_at) VALUES (?, ?, ?, ?, ?) """, ( scope, action, json.dumps(payload, ensure_ascii=False), session.get("admin_login") or ADMIN_LOGIN, local_now(), ), ) return int(cur.lastrowid) def describe_change(action: str, payload: dict) -> str: if action == "add_vendor": return f"Добавить вендора: {payload.get('name')}" if action == "add_category": return f"Добавить категорию: {payload.get('name')}" if action == "add_product": parts = [ "Добавить продукт", f"Вендор: {payload.get('vendor_name') or payload.get('vendor_id')}", f"Название: {payload.get('name')}", ] if payload.get("url"): parts.append(f"URL: {payload.get('url')}") return "\n".join(parts) if action == "update_product": return "\n".join( [ "Изменить продукт", f"Вендор: {payload.get('vendor_name') or '-'}", f"Было: {payload.get('old_name') or '-'}", f"Стало: {payload.get('name')}", f"URL был: {payload.get('old_url') or '-'}", f"URL станет: {payload.get('url') or '-'}", ] ) if action == "update_vendor": parts = [f"Изменить вендора: {payload.get('old_name') or payload.get('vendor_id')} → {payload.get('name')}"] if payload.get("website"): parts.append(f"Сайт: {payload['website']}") if payload.get("mont_page"): parts.append(f"MONT: {payload['mont_page']}") return "\n".join(parts) if action == "delete_vendor": return f"Удалить вендора: {payload.get('vendor_name') or payload.get('vendor_id')}" if action == "delete_category": return f"Удалить категорию: {payload.get('category_name') or payload.get('category_id')}" if action == "delete_product": product = payload.get("product_name") or payload.get("product_id") vendor = payload.get("vendor_name") or "-" return f"Удалить продукт: {vendor} :: {product}" if action == "save_matrix": lines = [ f"Изменить матрицу: связей выбрано {len(payload.get('pairs', []))}", f"Добавлено связей: {len(payload.get('added', []))}", f"Снято связей: {len(payload.get('removed', []))}", ] added = payload.get("added", []) removed = payload.get("removed", []) if added: lines.append("") lines.append("Добавить:") lines.extend([f"+ {item.get('product')} -> {item.get('category')}" for item in added[:80]]) if len(added) > 80: lines.append(f"... еще {len(added) - 80}") if removed: lines.append("") lines.append("Снять:") lines.extend([f"- {item.get('product')} -> {item.get('category')}" for item in removed[:80]]) if len(removed) > 80: lines.append(f"... еще {len(removed) - 80}") return "\n".join(lines) return json.dumps(payload, ensure_ascii=False, indent=2) def change_title(action: str) -> str: return { "add_vendor": "Добавление вендора", "add_category": "Добавление категории", "add_product": "Добавление продукта", "update_vendor": "Изменение вендора", "update_product": "Изменение продукта", "delete_vendor": "Удаление вендора", "delete_category": "Удаление категории", "delete_product": "Удаление продукта", "save_matrix": "Изменение матрицы", }.get(action, action) def notify_pending_change(change_id: int, scope: str, action: str, payload: dict) -> bool: if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: return False admin_url = request.url_root.rstrip("/") + f"{ADMIN_PATH}?scope={scope}" raw_details = describe_change(action, payload) if len(raw_details) > 3000: raw_details = raw_details[:3000] + "\n... сокращено, полный список в админке" details = escape(raw_details) text = "\n".join( [ "Zkart: нужно подтверждение", "", f"Заявка: #{change_id}", f"Тип: {escape(change_title(action))}", f"Раздел: {escape(scope_label(scope))}", f"Админ: {escape(session.get('admin_login') or ADMIN_LOGIN)}", "", f"Изменение:\n
{details}
", "", f"Открыть админку", ] ) data = urlencode( { "chat_id": TELEGRAM_CHAT_ID, "text": text, "parse_mode": "HTML", "disable_web_page_preview": "1", } ).encode("utf-8") req = Request( f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage", data=data, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) try: with urlopen(req, timeout=5) as response: return 200 <= response.status < 300 except Exception: return False def approve_pending_change(conn, row, reviewed_by: str) -> bool: savepoint = f"approve_change_{int(row['id'])}" conn.execute(f"SAVEPOINT {savepoint}") try: change_tables = scope_tables(row["scope"]) apply_change(conn, change_tables, row["action"], json.loads(row["payload"])) conn.execute( """ UPDATE pending_changes SET status = 'approved', reviewed_by = ?, reviewed_at = ? WHERE id = ? """, (reviewed_by, local_now(), row["id"]), ) conn.execute(f"RELEASE SAVEPOINT {savepoint}") return True except (KeyError, TypeError, ValueError, sqlite3.DatabaseError): conn.execute(f"ROLLBACK TO SAVEPOINT {savepoint}") conn.execute(f"RELEASE SAVEPOINT {savepoint}") return False def prepare_pending_change(change: dict) -> dict: change["payload"] = json.loads(change["payload"]) change["title"] = change_title(change["action"]) change["scope_label"] = scope_label(change["scope"]) change["description"] = describe_change(change["action"], change["payload"]) return change def fetch_admin_matrix(conn, tables: dict[str, str]) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]: vendors = [dict(r) for r in conn.execute( f"SELECT id, name, COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page " f"FROM {tables['vendors']} ORDER BY lower(name)" )] categories = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")] products = [ dict(r) for r in conn.execute( f""" SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name , p.url FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id ORDER BY lower(v.name), lower(p.name) """ ) ] links = { (r["product_id"], r["category_id"]) for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}") } return vendors, categories, products, links def apply_pending_overlay( vendors: list[dict], categories: list[dict], products: list[dict], links: set[tuple[int, int]], pending_changes: list[dict], ) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]: vendors = [dict(item) for item in vendors] categories = [dict(item) for item in categories] products = [dict(item) for item in products] links = set(links) for change in sorted(pending_changes, key=lambda item: item["id"]): if change["scope"] not in {"infra", "ib"}: continue action = change["action"] payload = change["payload"] if action == "add_vendor": vendors.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True}) elif action == "add_category": categories.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True}) elif action == "add_product": products.append( { "id": -change["id"], "name": payload.get("name", ""), "vendor_id": payload.get("vendor_id"), "vendor_name": payload.get("vendor_name", ""), "url": payload.get("url") or None, "pending": True, } ) elif action == "update_product": for product in products: if product["id"] == payload.get("product_id"): product["name"] = payload.get("name", product["name"]) product["url"] = payload.get("url") or None product["pending"] = True break elif action == "update_vendor": vendor_id = payload.get("vendor_id") for vendor in vendors: if vendor["id"] == vendor_id: vendor["name"] = payload.get("name", vendor["name"]) vendor["website"] = payload.get("website") or "" vendor["mont_page"] = payload.get("mont_page") or "" vendor["pending"] = True break elif action == "delete_vendor": vendor_id = payload.get("vendor_id") vendors = [item for item in vendors if item["id"] != vendor_id] products = [item for item in products if item.get("vendor_id") != vendor_id] elif action == "delete_category": category_id = payload.get("category_id") categories = [item for item in categories if item["id"] != category_id] links = {pair for pair in links if pair[1] != category_id} elif action == "delete_product": product_id = payload.get("product_id") products = [item for item in products if item["id"] != product_id] links = {pair for pair in links if pair[0] != product_id} elif action == "save_matrix": links = {tuple(pair) for pair in payload.get("pairs", [])} vendors.sort(key=lambda item: item["name"].lower()) categories.sort(key=lambda item: item["name"].lower()) products.sort(key=lambda item: (item.get("vendor_name", "").lower(), item.get("name", "").lower())) return vendors, categories, products, links @bp.get("/") def index(): conn = get_db() ssr_vendors = [r[0] for r in conn.execute("SELECT name FROM vendors ORDER BY lower(name)")] ssr_categories = [r[0] for r in conn.execute( "SELECT name FROM (SELECT name FROM categories UNION SELECT name FROM ib_categories) ORDER BY lower(name)" )] conn.close() latest_news = get_latest_news(3) canonical_url = _base_url() upcoming_events = get_upcoming_events(3) return render_template("index.html", ssr_vendors=ssr_vendors, ssr_categories=ssr_categories, latest_news=latest_news, upcoming_events=upcoming_events, canonical_url=canonical_url, is_news_editor=is_news_editor(), ) @bp.get("/robots.txt") def robots_txt(): from flask import Response body = ( "User-agent: *\n" "Allow: /\n" f"Disallow: {ADMIN_PATH}\n" "Disallow: /sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj\n" "\n" "Sitemap: https://maps.4mont.ru/sitemap.xml\n" ) return Response(body, mimetype="text/plain") INDEXNOW_KEY = "a3f8c2e1b7d94056a3f8c2e1b7d94056" @bp.get("/sitemap.xml") def sitemap_xml(): from flask import Response base = "https://maps.4mont.ru" conn = get_db() vendor_slugs = [r[0] for r in conn.execute( "SELECT slug FROM vendors WHERE slug IS NOT NULL " "UNION SELECT slug FROM ib_vendors WHERE slug IS NOT NULL" )] conn.close() urls = [f' {base}/daily1.0'] for slug in vendor_slugs: urls.append(f' {base}/vendor/{slug}monthly0.7') body = ( '\n' '\n' + '\n'.join(urls) + '\n' ) return Response(body, mimetype="application/xml") @bp.get(f"/{INDEXNOW_KEY}.txt") def indexnow_key_file(): from flask import Response return Response(INDEXNOW_KEY, mimetype="text/plain") def ping_indexnow(page_url: str): """Notify Yandex about a new/updated URL via IndexNow.""" import threading, urllib.request, json as _json def _ping(): try: payload = _json.dumps({ "host": "maps.4mont.ru", "key": INDEXNOW_KEY, "urlList": [page_url] }).encode() req = urllib.request.Request( "https://yandex.com/indexnow", data=payload, headers={"Content-Type": "application/json"}, method="POST" ) with urllib.request.urlopen(req, timeout=10) as r: pass except Exception: pass threading.Thread(target=_ping, daemon=True).start() @bp.get("/vendor/") def vendor_page(slug: str): from flask import abort conn = get_db() # Search in both tables, prefer infra vendor = conn.execute( "SELECT id, name, COALESCE(logo,'') as logo, COALESCE(description,'') as description, " "COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page, 'infra' as scope " "FROM vendors WHERE slug = ?", (slug,) ).fetchone() if not vendor: vendor = conn.execute( "SELECT id, name, COALESCE(logo,'') as logo, COALESCE(description,'') as description, " "COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page, 'ib' as scope " "FROM ib_vendors WHERE slug = ?", (slug,) ).fetchone() if not vendor: conn.close() abort(404) vendor = dict(vendor) tables = scope_tables(vendor['scope']) products = [dict(r) for r in conn.execute( f"SELECT name, COALESCE(url,'') as url FROM {tables['products']} " f"WHERE vendor_id = ? ORDER BY lower(name)", (vendor['id'],) )] categories = [r[0] for r in conn.execute( f"SELECT c.name FROM {tables['categories']} c " f"JOIN {tables['vendor_categories']} vc ON vc.category_id = c.id " f"WHERE vc.vendor_id = ? ORDER BY lower(c.name)", (vendor['id'],) )] conn.close() base_url = _base_url() canonical_url = f"{base_url}/vendor/{slug}" return render_template("vendor.html", vendor=vendor, products=products, categories=categories, canonical_url=canonical_url, base_url=base_url, ) @bp.get("/api/data") def api_data(): scope = (request.args.get("scope") or "infra").strip().lower() if scope in {"ib", "sec", "security"}: scope = "ib" else: scope = "infra" return jsonify(fetch_scope_data(scope)) @bp.get("/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj") def legacy_admin_redirect(): raw_scope = (request.args.get("scope") or "infra").strip().lower() scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra" return redirect_admin(scope) @bp.route(ADMIN_PATH, methods=["GET", "POST"]) def admin_login_or_panel(): conn = get_db() raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower() scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra" tables = scope_tables(scope) if request.method == "POST" and not require_admin() and request.form.get("action") == "login": username = request.form.get("username") or "" password = request.form.get("password") or "" admin_user = conn.execute( "SELECT username, password, role, access_scopes FROM admin_users WHERE username = ?", (username,), ).fetchone() if admin_user and password == admin_user["password"]: session["is_admin"] = True session["admin_role"] = admin_user["role"] session["admin_login"] = admin_user["username"] session["admin_scopes"] = admin_user["access_scopes"] conn.close() return redirect(ADMIN_PATH) conn.close() return render_template("login.html", error="Неверный логин или пароль") if not require_admin(): conn.close() return render_template("login.html", error=None) if not can_access_scope(scope): conn.close() return redirect_admin(next(iter(allowed_scopes()))) if request.method == "POST": action = request.form.get("action", "") if action == "logout": session.pop("is_admin", None) session.pop("admin_role", None) session.pop("admin_login", None) session.pop("admin_scopes", None) conn.close() return redirect(ADMIN_PATH) if not can_access_scope(scope): flash("Нет доступа к этому разделу.", "error") conn.close() return redirect_admin(next(iter(allowed_scopes()))) if action == "approve_all_changes": if not is_super_admin(): flash("Недостаточно прав для подтверждения изменений.", "error") conn.close() return redirect_admin(scope) rows = conn.execute( "SELECT * FROM pending_changes WHERE status = 'pending' ORDER BY id" ).fetchall() reviewed_by = session.get("admin_login") or SUPER_ADMIN_LOGIN approved = 0 failed = 0 for row in rows: if approve_pending_change(conn, row, reviewed_by): approved += 1 else: failed += 1 conn.commit() conn.close() if failed: flash(f"Утверждено заявок: {approved}. Не удалось применить: {failed}.", "error") else: flash(f"Все заявки утверждены: {approved}.", "ok") return redirect_admin(scope) if action == "create_admin": if not is_super_admin(): flash("Недостаточно прав для управления админами.", "error") conn.close() return redirect_admin(scope) username = (request.form.get("username") or "").strip() new_role = request.form.get("new_role", "admin") if new_role not in ("admin", "news_editor"): new_role = "admin" access = "news" if new_role == "news_editor" else parse_admin_scopes() if not username or not access: flash("Укажите логин и минимум один раздел доступа.", "error") conn.close() return redirect_admin(scope) password = make_admin_password() try: conn.execute( """ INSERT INTO admin_users(username, password, role, access_scopes, created_at) VALUES (?, ?, ?, ?, ?) """, (username, password, new_role, access, local_now()), ) conn.commit() session["created_admin_credentials"] = { "username": username, "password": password, "access": access, } flash(f"Пользователь создан. Логин: {username}. Пароль: {password}", "ok") except sqlite3.IntegrityError: conn.rollback() session.pop("created_admin_credentials", None) flash("Пользователь с таким логином уже существует.", "error") conn.close() return redirect_admin(scope) if action == "delete_admin": if not is_super_admin(): flash("Недостаточно прав для управления админами.", "error") conn.close() return redirect_admin(scope) admin_id = parse_int(request.form.get("admin_id")) if admin_id: target = conn.execute("SELECT role, username FROM admin_users WHERE id = ?", (admin_id,)).fetchone() if target and target["role"] == "super": flash("Супер-админа удалить нельзя.", "error") elif target and target["username"] == session.get("admin_login"): flash("Нельзя удалить текущего пользователя.", "error") else: conn.execute("DELETE FROM admin_users WHERE id = ? AND role <> 'super'", (admin_id,)) conn.commit() flash("Админ удален.", "ok") conn.close() return redirect_admin(scope) if action in {"approve_change", "reject_change"}: if not is_super_admin(): flash("Недостаточно прав для подтверждения изменений.", "error") conn.close() return redirect_admin(scope) change_id = parse_int(request.form.get("change_id")) row = None if change_id: row = conn.execute( "SELECT * FROM pending_changes WHERE id = ? AND status = 'pending'", (change_id,), ).fetchone() if not row: flash("Заявка не найдена или уже обработана.", "error") conn.close() return redirect_admin(scope) if action == "reject_change": conn.execute( """ UPDATE pending_changes SET status = 'rejected', reviewed_by = ?, reviewed_at = ? WHERE id = ? """, (session.get("admin_login") or SUPER_ADMIN_LOGIN, local_now(), change_id), ) flash("Заявка отклонена.", "ok") else: if approve_pending_change(conn, row, session.get("admin_login") or SUPER_ADMIN_LOGIN): flash("Заявка утверждена и применена.", "ok") else: flash("Не удалось применить заявку. Проверьте, не изменились ли связанные записи.", "error") conn.close() return redirect_admin(scope) conn.commit() conn.close() return redirect_admin(scope) payload = collect_change_payload(action, scope, conn, tables) if payload is not None: if is_super_admin(): try: apply_change(conn, tables, action, payload) flash("Изменения сохранены.", "ok") except sqlite3.IntegrityError: conn.rollback() flash("Не удалось сохранить: возможно, такое название уже есть у выбранного вендора.", "error") conn.close() return redirect_admin(scope) else: change_id = queue_change(conn, scope, action, payload) notify_pending_change(change_id, scope, action, payload) flash("Изменения отправлены супер админу. Они вступят в силу после утверждения.", "ok") conn.commit() conn.close() return redirect_admin(scope) vendors, categories, products, links = fetch_admin_matrix(conn, tables) if is_super_admin(): pending_rows = conn.execute( """ SELECT id, scope, action, payload, created_by, created_at FROM pending_changes WHERE status = 'pending' ORDER BY id DESC """ ) else: pending_rows = conn.execute( """ SELECT id, scope, action, payload, created_by, created_at FROM pending_changes WHERE status = 'pending' AND created_by = ? ORDER BY id DESC """, (session.get("admin_login") or ADMIN_LOGIN,), ) pending_changes = [prepare_pending_change(dict(r)) for r in pending_rows] if not is_super_admin(): scoped_pending = [change for change in pending_changes if change["scope"] == scope] vendors, categories, products, links = apply_pending_overlay(vendors, categories, products, links, scoped_pending) admin_users = [ dict(r) for r in conn.execute( """ SELECT id, username, role, access_scopes, created_at FROM admin_users ORDER BY role DESC, lower(username) """ ) ] created_admin_credentials = session.pop("created_admin_credentials", None) conn.close() return render_template( "admin.html", vendors=vendors, categories=categories, products=products, links=links, scope=scope, is_super_admin=is_super_admin(), allowed_scopes=allowed_scopes(), pending_changes=pending_changes, admin_users=admin_users, created_admin_credentials=created_admin_credentials, ) @bp.get("/news") def news_list(): base_url = _base_url() per_page = 10 page = max(1, request.args.get("page", 1, type=int)) offset = (page - 1) * per_page articles = get_news_list(limit=per_page, offset=offset) total = get_news_count() total_pages = max(1, (total + per_page - 1) // per_page) return render_template("news_list.html", articles=articles, base_url=base_url, page=page, total_pages=total_pages, is_news_editor=is_news_editor()) @bp.post("/api/news/publish") def api_news_publish(): token = request.headers.get("X-API-Token", "") or request.json.get("token", "") if request.is_json else "" if not NEWS_API_TOKEN or token != NEWS_API_TOKEN: return jsonify({"error": "Unauthorized"}), 401 data = request.get_json(silent=True) or {} title = str(data.get("title", "")).strip() body = str(data.get("body", "")).strip() image_url = str(data.get("image_url", "")).strip() if not title or not body: return jsonify({"error": "title и body обязательны"}), 422 import os as _os from urllib.request import urlretrieve as _urlretrieve slug = _make_slug(title) image_path = None if image_url: try: ext = _os.path.splitext(image_url.split("?")[0])[1].lower() if ext not in (".jpg", ".jpeg", ".png", ".webp", ".gif"): ext = ".jpg" fname = f"news_{secrets.token_hex(8)}{ext}" save_dir = _os.path.join(BASE_DIR, "static", "news_images") _os.makedirs(save_dir, exist_ok=True) _urlretrieve(image_url, _os.path.join(save_dir, fname)) image_path = f"news_images/{fname}" except Exception: image_path = None try: create_news(title, body, slug, image_path) except Exception: slug = f"{slug}-{secrets.token_hex(3)}" create_news(title, body, slug, image_path) ping_indexnow(f"https://maps.4mont.ru/news/{slug}") return jsonify({"ok": True, "slug": slug, "url": f"/news/{slug}"}), 201 @bp.delete("/api/news/") def api_news_delete(slug: str): token = request.headers.get("X-API-Token", "") if not NEWS_API_TOKEN or token != NEWS_API_TOKEN: return jsonify({"error": "Unauthorized"}), 401 article = get_news_by_slug_any(slug) if not article: return jsonify({"error": "Not found"}), 404 conn = get_db() conn.execute("UPDATE news SET published=0 WHERE id=?", (article["id"],)) conn.commit() conn.close() return jsonify({"ok": True, "unpublished_slug": slug}), 200 @bp.get("/news/") def news_article(slug: str): from flask import abort article = get_news_by_slug(slug) if not article: abort(404) base_url = _base_url() return render_template("news_article.html", article=article, base_url=base_url, is_news_editor=is_news_editor()) NEWS_ADMIN_PATH = "/news-admin" @bp.route(NEWS_ADMIN_PATH, methods=["GET", "POST"]) def news_admin(): action = request.form.get("action", "") # Handle login before auth check if request.method == "POST" and action == "news_login": username = request.form.get("username", "").strip() password = request.form.get("password", "").strip() conn = get_db() user = conn.execute( "SELECT username, password, role FROM admin_users WHERE username=?", (username,) ).fetchone() conn.close() if user and password == user["password"] and user["role"] in ("news_editor", "super", "admin"): session["is_admin"] = True session["admin_role"] = user["role"] session["admin_login"] = user["username"] session["admin_scopes"] = "news" return redirect(NEWS_ADMIN_PATH) flash("Неверный логин или пароль", "error") all_news = [] return render_template("news_admin.html", all_news=all_news, is_news_editor=False, admin_login="") if not require_admin(): return render_template("news_admin.html", all_news=[], is_news_editor=False, admin_login="") if request.method == "POST": if action == "logout": session.clear() return redirect(NEWS_ADMIN_PATH) if not is_news_editor(): flash("Нет прав для управления новостями.", "error") return redirect(NEWS_ADMIN_PATH) if action == "create_news": title = request.form.get("title", "").strip() body = request.form.get("body", "").strip() if not title or not body: flash("Заполните заголовок и текст новости.", "error") else: import os as _os slug = _make_slug(title) image_path = None f = request.files.get("image") if f and f.filename: ext = _os.path.splitext(f.filename)[1].lower() if ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"): fname = f"news_{secrets.token_hex(8)}{ext}" save_dir = _os.path.join(BASE_DIR, "static", "news_images") _os.makedirs(save_dir, exist_ok=True) f.save(_os.path.join(save_dir, fname)) image_path = f"news_images/{fname}" try: create_news(title, body, slug, image_path) except Exception: slug = f"{slug}-{secrets.token_hex(3)}" create_news(title, body, slug, image_path) ping_indexnow(f"https://maps.4mont.ru/news/{slug}") flash("Новость опубликована.", "ok") elif action == "update_news": news_id = int(request.form.get("news_id", 0)) title = request.form.get("title", "").strip() body = request.form.get("body", "").strip() published = 1 if request.form.get("published") else 0 if news_id and title and body: import os as _os image_path = None f = request.files.get("image") if f and f.filename: ext = _os.path.splitext(f.filename)[1].lower() if ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"): fname = f"news_{secrets.token_hex(8)}{ext}" save_dir = _os.path.join(BASE_DIR, "static", "news_images") _os.makedirs(save_dir, exist_ok=True) f.save(_os.path.join(save_dir, fname)) image_path = f"news_images/{fname}" raw_dt = request.form.get("created_at_edit", "").strip() created_at_val = None if raw_dt: try: from datetime import datetime as _dt created_at_val = _dt.strptime(raw_dt, "%d.%m.%Y %H:%M").strftime("%Y-%m-%d %H:%M:%S") except ValueError: pass update_news(news_id, title, body, published, image_path, created_at_val) flash("Новость обновлена.", "ok") elif action == "delete_news": news_id = int(request.form.get("news_id", 0)) if news_id: delete_news(news_id) flash("Новость удалена.", "ok") elif action == "toggle_published": news_id = int(request.form.get("news_id", 0)) published = 1 if request.form.get("published") else 0 if news_id: conn = get_db() conn.execute("UPDATE news SET published=? WHERE id=?", (published, news_id)) conn.commit() conn.close() return redirect(NEWS_ADMIN_PATH) per_page = 10 adm_page = max(1, request.args.get("page", 1, type=int)) adm_offset = (adm_page - 1) * per_page total_news = get_news_count(published_only=False) if is_news_editor() else 0 total_adm_pages = max(1, (total_news + per_page - 1) // per_page) all_news = get_all_news_admin_paged(per_page, adm_offset) if is_news_editor() else [] return render_template("news_admin.html", all_news=all_news, adm_page=adm_page, total_adm_pages=total_adm_pages, is_news_editor=is_news_editor(), admin_login=session.get("admin_login", ""), ) @bp.get("/events") def events_list(): from flask import abort events = get_all_events() base_url = _base_url() return render_template("events_list.html", events=events, base_url=base_url) @bp.get("/events/") def event_article(slug: str): from flask import abort event = get_event_by_slug(slug) if not event: abort(404) base_url = _base_url() return render_template("events_article.html", event=event, base_url=base_url) @bp.get("/health") def health(): return {"status": "ok"} @bp.get("/yandex_8addde0be1e0ee72.html") def yandex_verify(): from flask import Response return Response( '' 'Verification: 8addde0be1e0ee72', mimetype="text/html" ) @bp.post("/api/contact") def contact(): import re as _re data = request.get_json(silent=True) or {} name = str(data.get("name", "")).strip() email = str(data.get("email", "")).strip() phone = str(data.get("phone", "")).strip() text = str(data.get("text", "")).strip() if not all([name, email, phone, text]): return jsonify({"detail": "Заполните все обязательные поля"}), 422 if not _re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", email): return jsonify({"detail": "Некорректный email"}), 422 if not _re.match(r"^[\+\d][\d\s\-\(\)]{6,18}$", phone): return jsonify({"detail": "Некорректный номер телефона"}), 422 # Geo lookup ip = request.remote_addr or "" geo_line = "" try: geo_url = f"http://ip-api.com/json/{ip}?lang=ru&fields=status,country,regionName,city,query" geo_req = Request(geo_url, headers={"User-Agent": "Mozilla/5.0"}) with urlopen(geo_req, timeout=5) as geo_resp: geo = json.loads(geo_resp.read()) if geo.get("status") == "success": parts = [p for p in [geo.get("country"), geo.get("regionName"), geo.get("city")] if p] geo_line = f"🌍 *Местоположение:* {', '.join(parts)}\n🖥 *IP:* {geo.get('query', ip)}\n" except Exception: geo_line = f"🖥 *IP:* {ip}\n" if ip else "" divider = "━" * 22 msg = ( f"🔔 *Сообщение через форму*\n{divider}\n\n" f"👤 *Имя:* {name}\n" f"📧 *Email:* {email}\n" f"📱 *Телефон:* {phone}\n" f"{geo_line}\n" f"💬 *Сообщение:*\n{text}" ) payload = json.dumps({ "chat_id": TELEGRAM_CHAT_ID, "text": msg, "parse_mode": "Markdown", }).encode() url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" try: req = Request(url, data=payload, headers={"Content-Type": "application/json"}) with urlopen(req, timeout=10) as resp: resp.read() except Exception as e: return jsonify({"detail": f"Ошибка отправки: {e}"}), 502 return jsonify({"ok": True}) @bp.get("/assets/mont-logo") def mont_logo(): return send_from_directory(BASE_DIR, "mont_logo.png")