# mont_vendor_maps/zkart_app/routes.py
# NOTE: the hosting UI flagged this file as containing ambiguous Unicode
# characters (characters that might be confused with other characters).
from __future__ import annotations
import json
import secrets
import sqlite3
from datetime import datetime
from html import escape
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from zoneinfo import ZoneInfo
from flask import Blueprint, flash, jsonify, redirect, render_template, request, send_from_directory, session
from .config import (
ADMIN_LOGIN,
ADMIN_PASSWORD,
ADMIN_PATH,
BASE_DIR,
NEWS_API_TOKEN,
SUPER_ADMIN_LOGIN,
SUPER_ADMIN_PASSWORD,
TELEGRAM_BOT_TOKEN,
TELEGRAM_CHAT_ID,
)
from .db import (fetch_scope_data, get_db, scope_tables,
get_latest_news, get_news_list, get_news_count, get_news_by_slug, get_news_by_slug_any, get_all_news_admin, get_all_news_admin_paged,
create_news, update_news, delete_news,
get_upcoming_events, get_all_events, get_event_by_slug)
import re as _re_slug
# Lower-case Cyrillic letter -> Latin replacement (hard/soft signs are dropped).
_TRANSLIT_MAP = {
    'а': 'a', 'б': 'b', 'в': 'v', 'г': 'g', 'д': 'd', 'е': 'e', 'ё': 'yo', 'ж': 'zh',
    'з': 'z', 'и': 'i', 'й': 'y', 'к': 'k', 'л': 'l', 'м': 'm', 'н': 'n', 'о': 'o',
    'п': 'p', 'р': 'r', 'с': 's', 'т': 't', 'у': 'u', 'ф': 'f', 'х': 'kh', 'ц': 'ts',
    'ч': 'ch', 'ш': 'sh', 'щ': 'shch', 'ъ': '', 'ы': 'y', 'ь': '', 'э': 'e', 'ю': 'yu', 'я': 'ya',
}
# Same mapping in the form expected by str.translate.
_TRANSLIT_TABLE = str.maketrans(_TRANSLIT_MAP)


def _translit(s: str) -> str:
    """Lower-case *s* and replace every mapped Cyrillic letter with its Latin form.

    Characters that are not in ``_TRANSLIT_MAP`` pass through unchanged.
    """
    lowered = s.lower()
    return lowered.translate(_TRANSLIT_TABLE)
def _make_slug(title: str) -> str:
    """Build a URL slug from *title*, suffixed with the current Unix timestamp.

    The title is transliterated, every run of characters outside ``a-z0-9``
    becomes a single dash, and the result is cut to 50 characters with
    leading/trailing dashes removed. If nothing is left, ``news`` is used
    as the stem.
    """
    import datetime as _dt

    stamp = int(_dt.datetime.now().timestamp())
    latin = _translit(title)
    stem = _re_slug.sub(r'[^a-z0-9]+', '-', latin)[:50].strip('-')
    if not stem:
        return f'news-{stamp}'
    return f'{stem}-{stamp}'
# Flask blueprint on which every route in this module is registered.
bp = Blueprint("main", __name__)
# Time zone used by local_now() when stamping records.
MOSCOW_TZ = ZoneInfo("Europe/Moscow")
def _base_url() -> str:
    """Return the ``https://`` origin built from the current request's host."""
    host = request.host
    return f"https://{host}"
def require_admin() -> bool:
    """Return True when the session carries a truthy ``is_admin`` flag."""
    flag = session.get("is_admin")
    return True if flag else False
def is_news_editor() -> bool:
    """Return True when the session role is allowed to manage news."""
    role = session.get("admin_role")
    return role in ("news_editor", "super", "admin")
def is_super_admin() -> bool:
    """Return True when the session role is ``super``."""
    role = session.get("admin_role")
    return role == "super"
def allowed_scopes() -> set[str]:
    """Return the set of scope names the current session may work with.

    Super admins always get both ``infra`` and ``ib``. Everyone else gets the
    comma-separated ``admin_scopes`` session value (defaulting to
    ``"infra,ib"`` when it is missing or empty); if that parses to nothing,
    ``{"infra"}`` is returned.
    """
    if is_super_admin():
        return {"infra", "ib"}

    stored = session.get("admin_scopes") or "infra,ib"
    granted = set()
    for piece in stored.split(","):
        name = piece.strip()
        if name:
            granted.add(name)
    return granted if granted else {"infra"}
def can_access_scope(scope: str) -> bool:
    """Return True when *scope* is among the session's allowed scopes."""
    granted = allowed_scopes()
    return scope in granted
def parse_int(form_value: str | None) -> int | None:
    """Convert a form value to ``int``.

    Returns ``None`` when the value is missing, empty, or not a valid integer.
    """
    if form_value:
        try:
            return int(form_value)
        except ValueError:
            pass
    return None
def redirect_admin(scope: str):
    """Return a redirect to the admin panel with ``?scope=<scope>``."""
    target = f"{ADMIN_PATH}?scope={scope}"
    return redirect(target)
def make_admin_password() -> str:
    """Generate a random URL-safe password from 9 bytes of entropy."""
    return secrets.token_urlsafe(nbytes=9)
def parse_admin_scopes() -> str:
    """Return the comma-joined scopes ticked in the submitted form.

    ``allow_infra`` contributes ``infra`` and ``allow_ib`` contributes ``ib``;
    the result is an empty string when neither is set.
    """
    form = request.form
    checkboxes = (("allow_infra", "infra"), ("allow_ib", "ib"))
    return ",".join(name for field, name in checkboxes if form.get(field))
def local_now() -> str:
    """Return the current time in ``MOSCOW_TZ`` as ``YYYY-MM-DD HH:MM:SS``."""
    moment = datetime.now(MOSCOW_TZ)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def scope_label(scope: str) -> str:
    """Return the display label for *scope* (anything but ``ib`` is infrastructure)."""
    if scope == "ib":
        return "ИБ"
    return "Инфраструктура"
def rebuild_vendor_categories(conn, tables: dict[str, str]) -> None:
    """Recompute the vendor/category link table from product/category links.

    The table is emptied and then refilled with every distinct
    (vendor, category) pair reachable through a product. The caller is
    responsible for committing.
    """
    target = tables['vendor_categories']
    refill_sql = f"""
        INSERT OR IGNORE INTO {target}(vendor_id, category_id)
        SELECT DISTINCT p.vendor_id, pc.category_id
        FROM {tables['products']} p
        JOIN {tables['product_categories']} pc ON pc.product_id = p.id
        """
    conn.execute(f"DELETE FROM {target}")
    conn.execute(refill_sql)
def collect_change_payload(action: str, scope: str, conn, tables: dict[str, str]) -> dict | None:
    """Turn the submitted admin form into a payload dict for *action*.

    Besides the submitted values, payloads carry the current names/URLs read
    from the database so the change can be described later.

    Args:
        action: Name of the requested change (``add_vendor``, ``save_matrix``, ...).
        scope: Current admin scope; not used by this function.
        conn: Open database connection used for lookups.
        tables: Mapping of logical table names to the scope's real tables.

    Returns:
        The payload dict, or ``None`` when the action is unknown or a
        required field is missing.
    """
    form = request.form

    def text(field: str) -> str:
        # Missing and blank fields both collapse to "".
        return (form.get(field) or "").strip()

    if action in ("add_vendor", "add_category"):
        title = text("name")
        if not title:
            return None
        return {"name": title}

    if action == "add_product":
        vendor_id = parse_int(form.get("vendor_id"))
        title, link = text("name"), text("url")
        if not (vendor_id and title):
            return None
        owner = conn.execute(
            f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {
            "vendor_id": vendor_id,
            "vendor_name": owner["name"] if owner else "",
            "name": title,
            "url": link,
        }

    if action == "update_product":
        product_id = parse_int(form.get("product_id"))
        title, link = text("name"), text("url")
        if not (product_id and title):
            return None
        current = conn.execute(
            f"""
            SELECT p.name, p.url, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            WHERE p.id = ?
            """,
            (product_id,),
        ).fetchone()
        # Unknown product ids still yield a payload, just with empty "old" values.
        if current:
            vendor_name, old_name, old_url = current["vendor_name"], current["name"], current["url"]
        else:
            vendor_name = old_name = old_url = ""
        return {
            "product_id": product_id,
            "vendor_name": vendor_name,
            "old_name": old_name,
            "old_url": old_url,
            "name": title,
            "url": link,
        }

    if action == "update_vendor":
        vendor_id = parse_int(form.get("vendor_id"))
        title = text("name")
        site = text("website")
        mont = text("mont_page")
        if not (vendor_id and title):
            return None
        current = conn.execute(
            f"SELECT name, website, mont_page FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {
            "vendor_id": vendor_id,
            "old_name": current["name"] if current else "",
            "name": title,
            "website": site,
            "mont_page": mont,
        }

    if action == "delete_vendor":
        vendor_id = parse_int(form.get("vendor_id"))
        if not vendor_id:
            return None
        current = conn.execute(
            f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)
        ).fetchone()
        return {"vendor_id": vendor_id, "vendor_name": current["name"] if current else ""}

    if action == "delete_category":
        category_id = parse_int(form.get("category_id"))
        if not category_id:
            return None
        current = conn.execute(
            f"SELECT name FROM {tables['categories']} WHERE id = ?", (category_id,)
        ).fetchone()
        return {"category_id": category_id, "category_name": current["name"] if current else ""}

    if action == "delete_product":
        product_id = parse_int(form.get("product_id"))
        if not product_id:
            return None
        current = conn.execute(
            f"""
            SELECT p.name, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            WHERE p.id = ?
            """,
            (product_id,),
        ).fetchone()
        return {
            "product_id": product_id,
            "product_name": current["name"] if current else "",
            "vendor_name": current["vendor_name"] if current else "",
        }

    if action == "save_matrix":
        product_ids = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['products']}")]
        category_ids = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['categories']}")]
        # One checkbox per (product, category) cell, named pc_<product>_<category>.
        pairs: list[list[int]] = [
            [pid, cid]
            for pid in product_ids
            for cid in category_ids
            if form.get(f"pc_{pid}_{cid}")
        ]
        stored_pairs = {
            (r["product_id"], r["category_id"])
            for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}")
        }
        wanted_pairs = {tuple(pair) for pair in pairs}
        # Cells present on exactly one side are the ones that changed.
        changed_pairs = sorted(wanted_pairs ^ stored_pairs)

        product_labels = {}
        for r in conn.execute(
            f"""
            SELECT p.id, p.name, v.name AS vendor_name
            FROM {tables['products']} p
            JOIN {tables['vendors']} v ON v.id = p.vendor_id
            """
        ):
            product_labels[r["id"]] = f"{r['vendor_name']} :: {r['name']}"
        category_labels = {}
        for r in conn.execute(f"SELECT id, name FROM {tables['categories']}"):
            category_labels[r["id"]] = r["name"]

        added, removed = [], []
        for pid, cid in changed_pairs:
            entry = {
                "product_id": pid,
                "category_id": cid,
                "product": product_labels.get(pid, str(pid)),
                "category": category_labels.get(cid, str(cid)),
            }
            bucket = added if (pid, cid) in wanted_pairs else removed
            bucket.append(entry)
        return {"pairs": pairs, "added": added, "removed": removed}

    return None
def apply_change(conn, tables: dict[str, str], action: str, payload: dict) -> None:
    """Execute the SQL for one change and refresh vendor/category links.

    Unknown actions run no statement of their own. The caller commits.

    Args:
        conn: Open database connection.
        tables: Mapping of logical table names to the scope's real tables.
        action: Name of the change to apply.
        payload: Values collected for that action.
    """
    run = conn.execute

    if action == "add_vendor":
        run(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (payload["name"],))
    elif action == "add_category":
        run(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (payload["name"],))
    elif action == "add_product":
        run(
            f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)",
            (payload["vendor_id"], payload["name"], payload.get("url") or None),
        )
        # If the insert was ignored (row already there), still store the new URL.
        if payload.get("url"):
            run(
                f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?",
                (payload["url"], payload["vendor_id"], payload["name"]),
            )
    elif action == "update_product":
        new_values = (payload["name"], payload.get("url") or None, payload["product_id"])
        run(f"UPDATE {tables['products']} SET name = ?, url = ? WHERE id = ?", new_values)
    elif action == "update_vendor":
        new_values = (
            payload["name"],
            payload.get("website") or None,
            payload.get("mont_page") or None,
            payload["vendor_id"],
        )
        run(
            f"UPDATE {tables['vendors']} SET name = ?, website = ?, mont_page = ? WHERE id = ?",
            new_values,
        )
    elif action == "delete_vendor":
        run(f"DELETE FROM {tables['vendors']} WHERE id = ?", (payload["vendor_id"],))
    elif action == "delete_category":
        run(f"DELETE FROM {tables['categories']} WHERE id = ?", (payload["category_id"],))
    elif action == "delete_product":
        run(f"DELETE FROM {tables['products']} WHERE id = ?", (payload["product_id"],))
    elif action == "save_matrix":
        # The matrix is replaced wholesale with the submitted pairs.
        run(f"DELETE FROM {tables['product_categories']}")
        rows = [tuple(pair) for pair in payload.get("pairs", [])]
        conn.executemany(
            f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)",
            rows,
        )

    rebuild_vendor_categories(conn, tables)
def queue_change(conn, scope: str, action: str, payload: dict) -> int:
    """Insert a row into ``pending_changes`` and return its id.

    The payload is stored as JSON; the author is the logged-in admin
    (falling back to ``ADMIN_LOGIN``). The caller commits.
    """
    author = session.get("admin_login") or ADMIN_LOGIN
    values = (
        scope,
        action,
        json.dumps(payload, ensure_ascii=False),
        author,
        local_now(),
    )
    cursor = conn.execute(
        """
        INSERT INTO pending_changes(scope, action, payload, created_by, created_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        values,
    )
    return int(cursor.lastrowid)
def describe_change(action: str, payload: dict) -> str:
    """Return a human-readable (Russian) description of a change.

    Args:
        action: Name of the change (``add_vendor``, ``save_matrix``, ...).
        payload: Payload dict stored with the change.

    Returns:
        A possibly multi-line description. Unknown actions fall back to the
        pretty-printed JSON of *payload*.
    """

    def listing(header: str, sign: str, items: list, limit: int = 80) -> list[str]:
        # Section of the matrix summary: at most `limit` entries plus a
        # "... еще N" line when the list was cut.
        if not items:
            return []
        section = ["", header]
        section.extend(
            f"{sign} {item.get('product')} -> {item.get('category')}" for item in items[:limit]
        )
        if len(items) > limit:
            section.append(f"... еще {len(items) - limit}")
        return section

    if action == "add_vendor":
        return f"Добавить вендора: {payload.get('name')}"
    if action == "add_category":
        return f"Добавить категорию: {payload.get('name')}"
    if action == "add_product":
        parts = [
            "Добавить продукт",
            f"Вендор: {payload.get('vendor_name') or payload.get('vendor_id')}",
            f"Название: {payload.get('name')}",
        ]
        if payload.get("url"):
            parts.append(f"URL: {payload.get('url')}")
        return "\n".join(parts)
    if action == "update_product":
        return "\n".join(
            [
                "Изменить продукт",
                f"Вендор: {payload.get('vendor_name') or '-'}",
                f"Было: {payload.get('old_name') or '-'}",
                f"Стало: {payload.get('name')}",
                f"URL был: {payload.get('old_url') or '-'}",
                f"URL станет: {payload.get('url') or '-'}",
            ]
        )
    if action == "update_vendor":
        before = payload.get('old_name') or payload.get('vendor_id')
        # The old and new names need a separator; without it they run
        # together into one unreadable word.
        parts = [f"Изменить вендора: {before} -> {payload.get('name')}"]
        if payload.get("website"):
            parts.append(f"Сайт: {payload['website']}")
        if payload.get("mont_page"):
            parts.append(f"MONT: {payload['mont_page']}")
        return "\n".join(parts)
    if action == "delete_vendor":
        return f"Удалить вендора: {payload.get('vendor_name') or payload.get('vendor_id')}"
    if action == "delete_category":
        return f"Удалить категорию: {payload.get('category_name') or payload.get('category_id')}"
    if action == "delete_product":
        product = payload.get("product_name") or payload.get("product_id")
        vendor = payload.get("vendor_name") or "-"
        return f"Удалить продукт: {vendor} :: {product}"
    if action == "save_matrix":
        added = payload.get("added", [])
        removed = payload.get("removed", [])
        lines = [
            f"Изменить матрицу: связей выбрано {len(payload.get('pairs', []))}",
            f"Добавлено связей: {len(added)}",
            f"Снято связей: {len(removed)}",
        ]
        lines.extend(listing("Добавить:", "+", added))
        lines.extend(listing("Снять:", "-", removed))
        return "\n".join(lines)
    return json.dumps(payload, ensure_ascii=False, indent=2)
def change_title(action: str) -> str:
    """Return the short display title for *action*, or *action* itself if unknown."""
    titles = {
        "add_vendor": "Добавление вендора",
        "add_category": "Добавление категории",
        "add_product": "Добавление продукта",
        "update_vendor": "Изменение вендора",
        "update_product": "Изменение продукта",
        "delete_vendor": "Удаление вендора",
        "delete_category": "Удаление категории",
        "delete_product": "Удаление продукта",
        "save_matrix": "Изменение матрицы",
    }
    if action in titles:
        return titles[action]
    return action
def notify_pending_change(change_id: int, scope: str, action: str, payload: dict) -> bool:
    """Send a Telegram message announcing a change that awaits approval.

    Returns:
        ``True`` when the Telegram API answered with a 2xx status; ``False``
        when the bot token/chat id is not configured or the request failed.
    """
    if not (TELEGRAM_BOT_TOKEN and TELEGRAM_CHAT_ID):
        return False

    panel_link = request.url_root.rstrip("/") + f"{ADMIN_PATH}?scope={scope}"
    summary = describe_change(action, payload)
    # Cut the raw text first, then escape, so the cut never splits an entity.
    if len(summary) > 3000:
        summary = summary[:3000] + "\n... сокращено, полный список в админке"
    details = escape(summary)
    author = session.get('admin_login') or ADMIN_LOGIN

    message_lines = [
        "<b>Zkart: нужно подтверждение</b>",
        "",
        f"<b>Заявка:</b> #{change_id}",
        f"<b>Тип:</b> {escape(change_title(action))}",
        f"<b>Раздел:</b> {escape(scope_label(scope))}",
        f"<b>Админ:</b> {escape(author)}",
        "",
        f"<b>Изменение:</b>\n<pre>{details}</pre>",
        "",
        f"<a href=\"{escape(panel_link)}\">Открыть админку</a>",
    ]
    form_fields = {
        "chat_id": TELEGRAM_CHAT_ID,
        "text": "\n".join(message_lines),
        "parse_mode": "HTML",
        "disable_web_page_preview": "1",
    }
    telegram_request = Request(
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
        data=urlencode(form_fields).encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
    try:
        with urlopen(telegram_request, timeout=5) as response:
            delivered = 200 <= response.status < 300
    except Exception:
        # Notification is best-effort: any failure is reported as False.
        return False
    return delivered
def approve_pending_change(conn, row, reviewed_by: str) -> bool:
    """Apply one pending change inside a savepoint and mark it approved.

    Args:
        conn: Open database connection; the caller commits.
        row: ``pending_changes`` row with ``id``, ``scope``, ``action`` and ``payload``.
        reviewed_by: Login recorded as the reviewer.

    Returns:
        ``True`` on success. ``False`` when the payload is malformed or the
        database rejects the change; the savepoint is rolled back in that case.
    """
    name = f"approve_change_{int(row['id'])}"
    conn.execute(f"SAVEPOINT {name}")
    try:
        target_tables = scope_tables(row["scope"])
        payload = json.loads(row["payload"])
        apply_change(conn, target_tables, row["action"], payload)
        review_stamp = (reviewed_by, local_now(), row["id"])
        conn.execute(
            """
            UPDATE pending_changes
            SET status = 'approved', reviewed_by = ?, reviewed_at = ?
            WHERE id = ?
            """,
            review_stamp,
        )
        conn.execute(f"RELEASE SAVEPOINT {name}")
    except (KeyError, TypeError, ValueError, sqlite3.DatabaseError):
        # Undo this change only, then drop the savepoint.
        conn.execute(f"ROLLBACK TO SAVEPOINT {name}")
        conn.execute(f"RELEASE SAVEPOINT {name}")
        return False
    return True
def prepare_pending_change(change: dict) -> dict:
    """Decode a pending-change row in place and add display fields.

    The JSON ``payload`` is parsed, and ``title``, ``scope_label`` and
    ``description`` are added. The same dict is returned.
    """
    payload = json.loads(change["payload"])
    action = change["action"]
    change["payload"] = payload
    change["title"] = change_title(action)
    change["scope_label"] = scope_label(change["scope"])
    change["description"] = describe_change(action, payload)
    return change
def fetch_admin_matrix(conn, tables: dict[str, str]) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
    """Load vendors, categories, products and product/category links for a scope.

    Returns:
        ``(vendors, categories, products, links)``: the first three are lists
        of row dicts ordered case-insensitively by name (products by vendor
        name, then product name); ``links`` is a set of
        ``(product_id, category_id)`` tuples.
    """

    def as_dicts(sql: str) -> list[dict]:
        return [dict(row) for row in conn.execute(sql)]

    vendors = as_dicts(
        f"SELECT id, name, COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page "
        f"FROM {tables['vendors']} ORDER BY lower(name)"
    )
    categories = as_dicts(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")
    products = as_dicts(
        f"""
        SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name
        , p.url
        FROM {tables['products']} p
        JOIN {tables['vendors']} v ON v.id = p.vendor_id
        ORDER BY lower(v.name), lower(p.name)
        """
    )
    links = set()
    for row in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}"):
        links.add((row["product_id"], row["category_id"]))
    return vendors, categories, products, links
def apply_pending_overlay(
    vendors: list[dict],
    categories: list[dict],
    products: list[dict],
    links: set[tuple[int, int]],
    pending_changes: list[dict],
) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
    """Preview the admin matrix as it would look with pending changes applied.

    The inputs are copied, never mutated. Changes are replayed in ascending
    ``id`` order; changes whose scope is neither ``infra`` nor ``ib`` are
    skipped. Rows added by a pending change get the negated change id as
    their ``id``, and added or edited rows are flagged ``pending``.

    Returns:
        New ``(vendors, categories, products, links)`` with the three lists
        re-sorted case-insensitively by name.
    """
    vendors = [dict(row) for row in vendors]
    categories = [dict(row) for row in categories]
    products = [dict(row) for row in products]
    links = set(links)

    def first_with_id(rows: list[dict], wanted):
        # First row whose id matches, or None.
        return next((row for row in rows if row["id"] == wanted), None)

    for change in sorted(pending_changes, key=lambda entry: entry["id"]):
        if change["scope"] not in {"infra", "ib"}:
            continue
        kind = change["action"]
        data = change["payload"]
        placeholder_id = -change["id"]

        if kind == "add_vendor":
            vendors.append({"id": placeholder_id, "name": data.get("name", ""), "pending": True})
        elif kind == "add_category":
            categories.append({"id": placeholder_id, "name": data.get("name", ""), "pending": True})
        elif kind == "add_product":
            products.append(
                {
                    "id": placeholder_id,
                    "name": data.get("name", ""),
                    "vendor_id": data.get("vendor_id"),
                    "vendor_name": data.get("vendor_name", ""),
                    "url": data.get("url") or None,
                    "pending": True,
                }
            )
        elif kind == "update_product":
            target = first_with_id(products, data.get("product_id"))
            if target is not None:
                target["name"] = data.get("name", target["name"])
                target["url"] = data.get("url") or None
                target["pending"] = True
        elif kind == "update_vendor":
            target = first_with_id(vendors, data.get("vendor_id"))
            if target is not None:
                target["name"] = data.get("name", target["name"])
                target["website"] = data.get("website") or ""
                target["mont_page"] = data.get("mont_page") or ""
                target["pending"] = True
        elif kind == "delete_vendor":
            gone = data.get("vendor_id")
            vendors = [row for row in vendors if row["id"] != gone]
            products = [row for row in products if row.get("vendor_id") != gone]
        elif kind == "delete_category":
            gone = data.get("category_id")
            categories = [row for row in categories if row["id"] != gone]
            links = {(pid, cid) for pid, cid in links if cid != gone}
        elif kind == "delete_product":
            gone = data.get("product_id")
            products = [row for row in products if row["id"] != gone]
            links = {(pid, cid) for pid, cid in links if pid != gone}
        elif kind == "save_matrix":
            links = {tuple(pair) for pair in data.get("pairs", [])}

    def by_name(row: dict) -> str:
        return row["name"].lower()

    vendors.sort(key=by_name)
    categories.sort(key=by_name)
    products.sort(key=lambda row: (row.get("vendor_name", "").lower(), row.get("name", "").lower()))
    return vendors, categories, products, links
@bp.get("/")
def index():
    """Render the landing page.

    Vendor and category names are queried up front and passed to the
    template together with the three latest news items and three upcoming
    events.
    """
    conn = get_db()
    try:
        ssr_vendors = [r[0] for r in conn.execute("SELECT name FROM vendors ORDER BY lower(name)")]
        ssr_categories = [r[0] for r in conn.execute(
            "SELECT name FROM (SELECT name FROM categories UNION SELECT name FROM ib_categories) ORDER BY lower(name)"
        )]
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()
    latest_news = get_latest_news(3)
    canonical_url = _base_url()
    upcoming_events = get_upcoming_events(3)
    return render_template(
        "index.html",
        ssr_vendors=ssr_vendors,
        ssr_categories=ssr_categories,
        latest_news=latest_news,
        upcoming_events=upcoming_events,
        canonical_url=canonical_url,
        is_news_editor=is_news_editor(),
    )
@bp.get("/robots.txt")
def robots_txt():
    """Serve robots.txt: allow everything except the admin paths, and point to the sitemap."""
    from flask import Response

    lines = [
        "User-agent: *",
        "Allow: /",
        f"Disallow: {ADMIN_PATH}",
        "Disallow: /sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj",
        "",
        "Sitemap: https://maps.4mont.ru/sitemap.xml",
    ]
    body = "\n".join(lines) + "\n"
    return Response(body, mimetype="text/plain")
# IndexNow key: served verbatim at /<key>.txt and sent with every IndexNow ping.
INDEXNOW_KEY = "a3f8c2e1b7d94056a3f8c2e1b7d94056"
@bp.get("/sitemap.xml")
def sitemap_xml():
    """Serve an XML sitemap with the home page and one entry per vendor slug.

    Slugs are taken from both the ``vendors`` and ``ib_vendors`` tables.
    """
    from flask import Response

    base = "https://maps.4mont.ru"
    conn = get_db()
    try:
        vendor_slugs = [r[0] for r in conn.execute(
            "SELECT slug FROM vendors WHERE slug IS NOT NULL "
            "UNION SELECT slug FROM ib_vendors WHERE slug IS NOT NULL"
        )]
    finally:
        # Release the connection even when the query raises.
        conn.close()
    urls = [f' <url><loc>{base}/</loc><changefreq>daily</changefreq><priority>1.0</priority></url>']
    # Escape the slug so characters such as & or < cannot break the XML.
    urls.extend(
        f' <url><loc>{base}/vendor/{escape(str(slug))}</loc>'
        '<changefreq>monthly</changefreq><priority>0.7</priority></url>'
        for slug in vendor_slugs
    )
    body = (
        '<?xml version="1.0" encoding="UTF-8"?>\n'
        '<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">\n'
        + '\n'.join(urls) + '\n</urlset>'
    )
    return Response(body, mimetype="application/xml")
@bp.get(f"/{INDEXNOW_KEY}.txt")
def indexnow_key_file():
    """Serve the IndexNow key as plain text at ``/<key>.txt``."""
    from flask import Response

    response = Response(INDEXNOW_KEY, mimetype="text/plain")
    return response
def ping_indexnow(page_url: str):
    """Notify Yandex about a new/updated URL via IndexNow.

    The request is sent from a daemon thread so the caller never waits for
    it. The ping is best-effort: a failure is logged as a warning and
    otherwise ignored.

    Args:
        page_url: Absolute URL of the page that was added or changed.
    """
    import json as _json
    import logging
    import threading
    import urllib.request

    log = logging.getLogger(__name__)

    def _ping():
        try:
            payload = _json.dumps({
                "host": "maps.4mont.ru",
                "key": INDEXNOW_KEY,
                "urlList": [page_url],
            }).encode()
            req = urllib.request.Request(
                "https://yandex.com/indexnow",
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            # Only delivery matters; the response body is not used.
            with urllib.request.urlopen(req, timeout=10):
                pass
        except Exception:
            # Never let a failed ping surface to the caller, but leave a trace.
            log.warning("IndexNow ping failed for %s", page_url, exc_info=True)

    threading.Thread(target=_ping, daemon=True).start()
@bp.get("/vendor/<slug>")
def vendor_page(slug: str):
    """Render the public page of the vendor identified by *slug*.

    The slug is looked up in ``vendors`` first (scope ``infra``) and then in
    ``ib_vendors`` (scope ``ib``); a 404 is raised when neither matches.
    The vendor's products and categories are loaded from that scope's tables.
    """
    from flask import abort

    conn = get_db()
    try:
        # Search in both tables, prefer infra
        vendor = conn.execute(
            "SELECT id, name, COALESCE(logo,'') as logo, COALESCE(description,'') as description, "
            "COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page, 'infra' as scope "
            "FROM vendors WHERE slug = ?", (slug,)
        ).fetchone()
        if not vendor:
            vendor = conn.execute(
                "SELECT id, name, COALESCE(logo,'') as logo, COALESCE(description,'') as description, "
                "COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page, 'ib' as scope "
                "FROM ib_vendors WHERE slug = ?", (slug,)
            ).fetchone()
        if not vendor:
            abort(404)
        vendor = dict(vendor)
        tables = scope_tables(vendor['scope'])
        products = [dict(r) for r in conn.execute(
            f"SELECT name, COALESCE(url,'') as url FROM {tables['products']} "
            f"WHERE vendor_id = ? ORDER BY lower(name)", (vendor['id'],)
        )]
        categories = [r[0] for r in conn.execute(
            f"SELECT c.name FROM {tables['categories']} c "
            f"JOIN {tables['vendor_categories']} vc ON vc.category_id = c.id "
            f"WHERE vc.vendor_id = ? ORDER BY lower(c.name)", (vendor['id'],)
        )]
    finally:
        # Covers the 404 path and any query failure as well as success.
        conn.close()
    base_url = _base_url()
    canonical_url = f"{base_url}/vendor/{slug}"
    return render_template(
        "vendor.html",
        vendor=vendor,
        products=products,
        categories=categories,
        canonical_url=canonical_url,
        base_url=base_url,
    )
@bp.get("/api/data")
def api_data():
    """Return the JSON data set for the requested scope.

    ``ib``, ``sec`` and ``security`` select the ``ib`` scope; any other
    value, or none, selects ``infra``.
    """
    requested = (request.args.get("scope") or "infra").strip().lower()
    scope = "ib" if requested in {"ib", "sec", "security"} else "infra"
    return jsonify(fetch_scope_data(scope))
@bp.get("/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj")
def legacy_admin_redirect():
    """Redirect this fixed path to the admin panel, keeping the requested scope."""
    requested = (request.args.get("scope") or "infra").strip().lower()
    if requested in {"ib", "sec", "security"}:
        return redirect_admin("ib")
    return redirect_admin("infra")
def _admin_request_scope() -> str:
    """Return ``ib`` or ``infra`` from the ``scope`` query/form value (default ``infra``)."""
    raw = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower()
    return "ib" if raw in {"ib", "sec", "security"} else "infra"


def _admin_reviewer() -> str:
    """Return the login recorded as reviewer of a pending change."""
    return session.get("admin_login") or SUPER_ADMIN_LOGIN


def _admin_login(conn):
    """Check the submitted credentials and start an admin session on success."""
    username = request.form.get("username") or ""
    password = request.form.get("password") or ""
    admin_user = conn.execute(
        "SELECT username, password, role, access_scopes FROM admin_users WHERE username = ?",
        (username,),
    ).fetchone()
    stored = admin_user["password"] if admin_user else None
    # NOTE(review): the submitted password is compared with the stored column
    # value as-is, so passwords appear to be kept in plaintext; consider
    # storing salted hashes. The comparison itself is constant-time.
    if isinstance(stored, str) and secrets.compare_digest(
        password.encode("utf-8"), stored.encode("utf-8")
    ):
        session["is_admin"] = True
        session["admin_role"] = admin_user["role"]
        session["admin_login"] = admin_user["username"]
        session["admin_scopes"] = admin_user["access_scopes"]
        return redirect(ADMIN_PATH)
    return render_template("login.html", error="Неверный логин или пароль")


def _admin_redirect_without_access():
    """Send a user who may not open the requested scope somewhere they can go."""
    usable = [name for name in ("infra", "ib") if can_access_scope(name)]
    if usable:
        return redirect_admin(usable[0])
    # No matrix scope at all (e.g. scopes == "news"): redirecting back to this
    # panel would loop forever, so hand over to the news admin instead.
    return redirect(NEWS_ADMIN_PATH)


def _admin_approve_all(conn, scope: str):
    """Approve every pending change, counting successes and failures."""
    if not is_super_admin():
        flash("Недостаточно прав для подтверждения изменений.", "error")
        return redirect_admin(scope)
    rows = conn.execute(
        "SELECT * FROM pending_changes WHERE status = 'pending' ORDER BY id"
    ).fetchall()
    reviewed_by = _admin_reviewer()
    approved = 0
    failed = 0
    for row in rows:
        if approve_pending_change(conn, row, reviewed_by):
            approved += 1
        else:
            failed += 1
    conn.commit()
    if failed:
        flash(f"Утверждено заявок: {approved}. Не удалось применить: {failed}.", "error")
    else:
        flash(f"Все заявки утверждены: {approved}.", "ok")
    return redirect_admin(scope)


def _admin_create_user(conn, scope: str):
    """Create an admin or news-editor account with a generated password."""
    if not is_super_admin():
        flash("Недостаточно прав для управления админами.", "error")
        return redirect_admin(scope)
    username = (request.form.get("username") or "").strip()
    new_role = request.form.get("new_role", "admin")
    if new_role not in ("admin", "news_editor"):
        new_role = "admin"
    access = "news" if new_role == "news_editor" else parse_admin_scopes()
    if not username or not access:
        flash("Укажите логин и минимум один раздел доступа.", "error")
        return redirect_admin(scope)
    password = make_admin_password()
    try:
        conn.execute(
            """
            INSERT INTO admin_users(username, password, role, access_scopes, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (username, password, new_role, access, local_now()),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        conn.rollback()
        session.pop("created_admin_credentials", None)
        flash("Пользователь с таким логином уже существует.", "error")
    else:
        # Kept in the session so the next panel render can show them once.
        session["created_admin_credentials"] = {
            "username": username,
            "password": password,
            "access": access,
        }
        flash(f"Пользователь создан. Логин: {username}. Пароль: {password}", "ok")
    return redirect_admin(scope)


def _admin_delete_user(conn, scope: str):
    """Delete an admin account; super admins and the current user are protected."""
    if not is_super_admin():
        flash("Недостаточно прав для управления админами.", "error")
        return redirect_admin(scope)
    admin_id = parse_int(request.form.get("admin_id"))
    if admin_id:
        target = conn.execute(
            "SELECT role, username FROM admin_users WHERE id = ?", (admin_id,)
        ).fetchone()
        if target and target["role"] == "super":
            flash("Супер-админа удалить нельзя.", "error")
        elif target and target["username"] == session.get("admin_login"):
            flash("Нельзя удалить текущего пользователя.", "error")
        else:
            conn.execute("DELETE FROM admin_users WHERE id = ? AND role <> 'super'", (admin_id,))
            conn.commit()
            flash("Админ удален.", "ok")
    return redirect_admin(scope)


def _admin_review_change(conn, scope: str, action: str):
    """Approve or reject a single pending change."""
    if not is_super_admin():
        flash("Недостаточно прав для подтверждения изменений.", "error")
        return redirect_admin(scope)
    change_id = parse_int(request.form.get("change_id"))
    row = None
    if change_id:
        row = conn.execute(
            "SELECT * FROM pending_changes WHERE id = ? AND status = 'pending'",
            (change_id,),
        ).fetchone()
    if not row:
        flash("Заявка не найдена или уже обработана.", "error")
        return redirect_admin(scope)
    if action == "reject_change":
        conn.execute(
            """
            UPDATE pending_changes
            SET status = 'rejected', reviewed_by = ?, reviewed_at = ?
            WHERE id = ?
            """,
            (_admin_reviewer(), local_now(), change_id),
        )
        flash("Заявка отклонена.", "ok")
    elif approve_pending_change(conn, row, _admin_reviewer()):
        flash("Заявка утверждена и применена.", "ok")
    else:
        # Nothing to commit: the failed change was rolled back to its savepoint.
        flash("Не удалось применить заявку. Проверьте, не изменились ли связанные записи.", "error")
        return redirect_admin(scope)
    conn.commit()
    return redirect_admin(scope)


def _admin_submit_change(conn, scope: str, tables: dict, action: str):
    """Apply a data change directly (super admin) or queue it for approval."""
    payload = collect_change_payload(action, scope, conn, tables)
    if payload is None:
        return redirect_admin(scope)
    if is_super_admin():
        try:
            apply_change(conn, tables, action, payload)
        except sqlite3.IntegrityError:
            conn.rollback()
            flash("Не удалось сохранить: возможно, такое название уже есть у выбранного вендора.", "error")
            return redirect_admin(scope)
        flash("Изменения сохранены.", "ok")
    else:
        change_id = queue_change(conn, scope, action, payload)
        notify_pending_change(change_id, scope, action, payload)
        flash("Изменения отправлены супер админу. Они вступят в силу после утверждения.", "ok")
    conn.commit()
    return redirect_admin(scope)


def _admin_render_panel(conn, scope: str, tables: dict):
    """Render the admin panel for *scope*."""
    vendors, categories, products, links = fetch_admin_matrix(conn, tables)
    if is_super_admin():
        pending_rows = conn.execute(
            """
            SELECT id, scope, action, payload, created_by, created_at
            FROM pending_changes
            WHERE status = 'pending'
            ORDER BY id DESC
            """
        )
    else:
        # Regular admins only see the changes they submitted themselves.
        pending_rows = conn.execute(
            """
            SELECT id, scope, action, payload, created_by, created_at
            FROM pending_changes
            WHERE status = 'pending' AND created_by = ?
            ORDER BY id DESC
            """,
            (session.get("admin_login") or ADMIN_LOGIN,),
        )
    pending_changes = [prepare_pending_change(dict(r)) for r in pending_rows]
    if not is_super_admin():
        # Show the matrix as it will look once this admin's changes are approved.
        scoped_pending = [change for change in pending_changes if change["scope"] == scope]
        vendors, categories, products, links = apply_pending_overlay(
            vendors, categories, products, links, scoped_pending
        )
    admin_users = [
        dict(r)
        for r in conn.execute(
            """
            SELECT id, username, role, access_scopes, created_at
            FROM admin_users
            ORDER BY role DESC, lower(username)
            """
        )
    ]
    created_admin_credentials = session.pop("created_admin_credentials", None)
    return render_template(
        "admin.html",
        vendors=vendors,
        categories=categories,
        products=products,
        links=links,
        scope=scope,
        is_super_admin=is_super_admin(),
        allowed_scopes=allowed_scopes(),
        pending_changes=pending_changes,
        admin_users=admin_users,
        created_admin_credentials=created_admin_credentials,
    )


def _admin_dispatch(conn):
    """Route one admin request to the login form, an action handler or the panel."""
    scope = _admin_request_scope()
    tables = scope_tables(scope)
    is_post = request.method == "POST"

    if is_post and not require_admin() and request.form.get("action") == "login":
        return _admin_login(conn)
    if not require_admin():
        return render_template("login.html", error=None)

    action = request.form.get("action", "") if is_post else ""
    # Logout is handled before the scope check so that every logged-in user,
    # including one without access to any matrix scope, can end the session.
    if action == "logout":
        for key in ("is_admin", "admin_role", "admin_login", "admin_scopes"):
            session.pop(key, None)
        return redirect(ADMIN_PATH)

    if not can_access_scope(scope):
        return _admin_redirect_without_access()
    if not is_post:
        return _admin_render_panel(conn, scope, tables)

    if action == "approve_all_changes":
        return _admin_approve_all(conn, scope)
    if action == "create_admin":
        return _admin_create_user(conn, scope)
    if action == "delete_admin":
        return _admin_delete_user(conn, scope)
    if action in {"approve_change", "reject_change"}:
        return _admin_review_change(conn, scope, action)
    return _admin_submit_change(conn, scope, tables, action)


@bp.route(ADMIN_PATH, methods=["GET", "POST"])
def admin_login_or_panel():
    """Serve the admin login form and the admin panel.

    GET renders the login form (not logged in) or the panel for the
    requested scope. POST handles login, logout, admin-user management,
    approval/rejection of pending changes and data changes; non-super
    admins' data changes are queued for approval instead of being applied.

    The database connection is closed in a ``finally`` block, so it is
    released on every path, including unexpected exceptions.
    """
    conn = get_db()
    try:
        return _admin_dispatch(conn)
    finally:
        conn.close()
@bp.get("/news")
def news_list():
    """Render one page (10 items) of the public news list.

    The ``page`` query parameter selects the page; values below 1 are
    treated as 1.
    """
    per_page = 10
    page = max(1, request.args.get("page", 1, type=int))
    articles = get_news_list(limit=per_page, offset=(page - 1) * per_page)
    total = get_news_count()
    # Ceiling division; an empty list still has one (empty) page.
    total_pages = max(1, -(-total // per_page))
    return render_template(
        "news_list.html",
        articles=articles,
        base_url=_base_url(),
        page=page,
        total_pages=total_pages,
        is_news_editor=is_news_editor(),
    )
@bp.post("/api/news/publish")
def api_news_publish():
    """Create a news article from a JSON body, authenticated by API token.

    The token is taken from the ``X-API-Token`` header or, failing that,
    from the ``token`` field of the JSON body. The body must contain
    ``title`` and ``body``; ``image_url`` is optional and, when it is an
    http(s) URL, the image is downloaded into ``static/news_images``.

    Returns:
        401 when the token is missing or wrong, 422 when ``title`` or
        ``body`` is empty, otherwise 201 with the new slug and URL.
    """
    import os as _os
    from urllib.parse import urlsplit as _urlsplit
    from urllib.request import urlretrieve as _urlretrieve

    # silent=True: a missing or malformed JSON body yields None instead of
    # an error response, so authentication is always evaluated first.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    token = request.headers.get("X-API-Token", "") or str(data.get("token") or "")
    # Constant-time comparison so the token cannot be probed via timing.
    if not NEWS_API_TOKEN or not secrets.compare_digest(
        token.encode("utf-8"), str(NEWS_API_TOKEN).encode("utf-8")
    ):
        return jsonify({"error": "Unauthorized"}), 401

    # `or ""` keeps a JSON null from turning into the literal string "None".
    title = str(data.get("title") or "").strip()
    body = str(data.get("body") or "").strip()
    image_url = str(data.get("image_url") or "").strip()
    if not title or not body:
        return jsonify({"error": "title и body обязательны"}), 422

    slug = _make_slug(title)
    image_path = None
    # Only fetch over http(s): urlretrieve also understands schemes such as
    # file://, which would copy a local file into the public static folder.
    if image_url and _urlsplit(image_url).scheme.lower() in ("http", "https"):
        try:
            ext = _os.path.splitext(_urlsplit(image_url).path)[1].lower()
            if ext not in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
                ext = ".jpg"
            fname = f"news_{secrets.token_hex(8)}{ext}"
            save_dir = _os.path.join(BASE_DIR, "static", "news_images")
            _os.makedirs(save_dir, exist_ok=True)
            _urlretrieve(image_url, _os.path.join(save_dir, fname))
            image_path = f"news_images/{fname}"
        except Exception:
            # A failed download must not block publishing; go on without an image.
            image_path = None

    try:
        create_news(title, body, slug, image_path)
    except Exception:
        # Presumably a slug collision - retry once with a random suffix.
        # TODO confirm which exception create_news raises and narrow this.
        slug = f"{slug}-{secrets.token_hex(3)}"
        create_news(title, body, slug, image_path)
    ping_indexnow(f"https://maps.4mont.ru/news/{slug}")
    return jsonify({"ok": True, "slug": slug, "url": f"/news/{slug}"}), 201
@bp.delete("/api/news/<slug>")
def api_news_delete(slug: str):
    """Unpublish the news article with the given slug (the row is kept).

    Authenticated with the ``X-API-Token`` header.

    Returns:
        401 when the token is missing or wrong, 404 when no article has
        this slug, otherwise 200.
    """
    token = request.headers.get("X-API-Token", "")
    # Constant-time comparison so the token cannot be probed via timing.
    if not NEWS_API_TOKEN or not secrets.compare_digest(
        token.encode("utf-8"), str(NEWS_API_TOKEN).encode("utf-8")
    ):
        return jsonify({"error": "Unauthorized"}), 401
    article = get_news_by_slug_any(slug)
    if not article:
        return jsonify({"error": "Not found"}), 404
    conn = get_db()
    try:
        conn.execute("UPDATE news SET published=0 WHERE id=?", (article["id"],))
        conn.commit()
    finally:
        # Release the connection even when the update fails.
        conn.close()
    return jsonify({"ok": True, "unpublished_slug": slug}), 200
@bp.get("/news/<slug>")
def news_article(slug: str):
    """Render a single news article, or raise 404 when the slug is unknown."""
    from flask import abort

    article = get_news_by_slug(slug)
    if article:
        return render_template(
            "news_article.html",
            article=article,
            base_url=_base_url(),
            is_news_editor=is_news_editor(),
        )
    abort(404)
# URL path of the news administration page (login form and news management).
NEWS_ADMIN_PATH = "/news-admin"
@bp.route(NEWS_ADMIN_PATH, methods=["GET", "POST"])
def news_admin():
action = request.form.get("action", "")
# Handle login before auth check
if request.method == "POST" and action == "news_login":
username = request.form.get("username", "").strip()
password = request.form.get("password", "").strip()
conn = get_db()
user = conn.execute(
"SELECT username, password, role FROM admin_users WHERE username=?", (username,)
).fetchone()
conn.close()
if user and password == user["password"] and user["role"] in ("news_editor", "super", "admin"):
session["is_admin"] = True
session["admin_role"] = user["role"]
session["admin_login"] = user["username"]
session["admin_scopes"] = "news"
return redirect(NEWS_ADMIN_PATH)
flash("Неверный логин или пароль", "error")
all_news = []
return render_template("news_admin.html", all_news=all_news,
is_news_editor=False, admin_login="")
if not require_admin():
return render_template("news_admin.html", all_news=[],
is_news_editor=False, admin_login="")
if request.method == "POST":
if action == "logout":
session.clear()
return redirect(NEWS_ADMIN_PATH)
if not is_news_editor():
flash("Нет прав для управления новостями.", "error")
return redirect(NEWS_ADMIN_PATH)
if action == "create_news":
title = request.form.get("title", "").strip()
body = request.form.get("body", "").strip()
if not title or not body:
flash("Заполните заголовок и текст новости.", "error")
else:
import os as _os
slug = _make_slug(title)
image_path = None
f = request.files.get("image")
if f and f.filename:
ext = _os.path.splitext(f.filename)[1].lower()
if ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
fname = f"news_{secrets.token_hex(8)}{ext}"
save_dir = _os.path.join(BASE_DIR, "static", "news_images")
_os.makedirs(save_dir, exist_ok=True)
f.save(_os.path.join(save_dir, fname))
image_path = f"news_images/{fname}"
try:
create_news(title, body, slug, image_path)
except Exception:
slug = f"{slug}-{secrets.token_hex(3)}"
create_news(title, body, slug, image_path)
ping_indexnow(f"https://maps.4mont.ru/news/{slug}")
flash("Новость опубликована.", "ok")
elif action == "update_news":
news_id = int(request.form.get("news_id", 0))
title = request.form.get("title", "").strip()
body = request.form.get("body", "").strip()
published = 1 if request.form.get("published") else 0
if news_id and title and body:
import os as _os
image_path = None
f = request.files.get("image")
if f and f.filename:
ext = _os.path.splitext(f.filename)[1].lower()
if ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
fname = f"news_{secrets.token_hex(8)}{ext}"
save_dir = _os.path.join(BASE_DIR, "static", "news_images")
_os.makedirs(save_dir, exist_ok=True)
f.save(_os.path.join(save_dir, fname))
image_path = f"news_images/{fname}"
raw_dt = request.form.get("created_at_edit", "").strip()
created_at_val = None
if raw_dt:
try:
from datetime import datetime as _dt
created_at_val = _dt.strptime(raw_dt, "%d.%m.%Y %H:%M").strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
pass
update_news(news_id, title, body, published, image_path, created_at_val)
flash("Новость обновлена.", "ok")
elif action == "delete_news":
news_id = int(request.form.get("news_id", 0))
if news_id:
delete_news(news_id)
flash("Новость удалена.", "ok")
elif action == "toggle_published":
news_id = int(request.form.get("news_id", 0))
published = 1 if request.form.get("published") else 0
if news_id:
conn = get_db()
conn.execute("UPDATE news SET published=? WHERE id=?", (published, news_id))
conn.commit()
conn.close()
return redirect(NEWS_ADMIN_PATH)
per_page = 10
adm_page = max(1, request.args.get("page", 1, type=int))
adm_offset = (adm_page - 1) * per_page
total_news = get_news_count(published_only=False) if is_news_editor() else 0
total_adm_pages = max(1, (total_news + per_page - 1) // per_page)
all_news = get_all_news_admin_paged(per_page, adm_offset) if is_news_editor() else []
return render_template("news_admin.html",
all_news=all_news,
adm_page=adm_page, total_adm_pages=total_adm_pages,
is_news_editor=is_news_editor(),
admin_login=session.get("admin_login", ""),
)
@bp.get("/events")
def events_list():
    """Render the events listing page.

    The template receives everything returned by ``get_all_events`` plus
    the request's base URL.
    """
    return render_template(
        "events_list.html",
        events=get_all_events(),
        base_url=_base_url(),
    )
@bp.get("/events/<slug>")
def event_article(slug: str):
    """Render the page of a single event, or abort with 404.

    The event is looked up with ``get_event_by_slug``; a falsy result
    aborts the request with 404.
    """
    from flask import abort

    found = get_event_by_slug(slug)
    if found:
        return render_template(
            "events_article.html",
            event=found,
            base_url=_base_url(),
        )
    abort(404)
@bp.get("/health")
def health():
    """Health-check endpoint: always returns a static ``ok`` status."""
    return dict(status="ok")
@bp.get("/yandex_8addde0be1e0ee72.html")
def yandex_verify():
    """Serve a fixed HTML page containing the verification code.

    NOTE(review): presumably the Yandex site-ownership verification file,
    judging by the route name - confirm.
    """
    from flask import Response

    page = "".join((
        '<html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>',
        '<body>Verification: 8addde0be1e0ee72</body></html>',
    ))
    return Response(page, mimetype="text/html")
# Characters that open an entity in Telegram's legacy "Markdown" parse mode;
# the Bot API asks for them to be backslash-escaped outside of entities.
_TG_MARKDOWN_ESCAPES = str.maketrans({"_": "\\_", "*": "\\*", "`": "\\`", "[": "\\["})


def _escape_tg_markdown(value) -> str:
    """Return *value* as text with legacy-Markdown control characters escaped."""
    return str(value).translate(_TG_MARKDOWN_ESCAPES)


@bp.post("/api/contact")
def contact():
    """Validate a contact-form submission and forward it to Telegram.

    Expects a JSON object with non-empty ``name``, ``email``, ``phone`` and
    ``text`` fields. The sender's IP is looked up via ip-api.com on a
    best-effort basis and appended to the message.

    Returns:
        ``{"ok": True}`` on success; a ``(json, 422)`` pair when a field is
        missing or malformed; a ``(json, 502)`` pair when the Telegram
        request fails.
    """
    import re as _re

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A missing, invalid or non-object JSON body (e.g. a list) is treated
        # as an empty form so it fails validation instead of crashing on .get().
        data = {}
    name = str(data.get("name", "")).strip()
    email = str(data.get("email", "")).strip()
    phone = str(data.get("phone", "")).strip()
    text = str(data.get("text", "")).strip()

    if not all([name, email, phone, text]):
        return jsonify({"detail": "Заполните все обязательные поля"}), 422
    if not _re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", email):
        return jsonify({"detail": "Некорректный email"}), 422
    if not _re.match(r"^[\+\d][\d\s\-\(\)]{6,18}$", phone):
        return jsonify({"detail": "Некорректный номер телефона"}), 422

    # Geo lookup (best effort: any failure degrades to showing the bare IP).
    ip = request.remote_addr or ""
    geo_line = ""
    try:
        geo_url = f"http://ip-api.com/json/{ip}?lang=ru&fields=status,country,regionName,city,query"
        geo_req = Request(geo_url, headers={"User-Agent": "Mozilla/5.0"})
        with urlopen(geo_req, timeout=5) as geo_resp:
            geo = json.loads(geo_resp.read())
        if geo.get("status") == "success":
            parts = [
                _escape_tg_markdown(p)
                for p in (geo.get("country"), geo.get("regionName"), geo.get("city"))
                if p
            ]
            shown_ip = _escape_tg_markdown(geo.get("query", ip))
            geo_line = f"🌍 *Местоположение:* {', '.join(parts)}\n🖥 *IP:* {shown_ip}\n"
    except Exception:
        geo_line = f"🖥 *IP:* {_escape_tg_markdown(ip)}\n" if ip else ""

    # The original multiplied an empty string, which produced no divider.
    divider = "─" * 22
    # User-supplied values are escaped: with parse_mode=Markdown an unbalanced
    # "_", "*", "`" or "[" (e.g. an underscore in an email address) makes
    # Telegram reject the whole message.
    msg = (
        f"🔔 *Сообщение через форму*\n{divider}\n\n"
        f"👤 *Имя:* {_escape_tg_markdown(name)}\n"
        f"📧 *Email:* {_escape_tg_markdown(email)}\n"
        f"📱 *Телефон:* {_escape_tg_markdown(phone)}\n"
        f"{geo_line}\n"
        f"💬 *Сообщение:*\n{_escape_tg_markdown(text)}"
    )
    payload = json.dumps({
        "chat_id": TELEGRAM_CHAT_ID,
        "text": msg,
        "parse_mode": "Markdown",
    }).encode()
    url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
    try:
        req = Request(url, data=payload, headers={"Content-Type": "application/json"})
        with urlopen(req, timeout=10) as resp:
            resp.read()
    except Exception as e:
        return jsonify({"detail": f"Ошибка отправки: {e}"}), 502
    return jsonify({"ok": True})
@bp.get("/assets/mont-logo")
def mont_logo():
    """Serve ``mont_logo.png`` from the application's ``BASE_DIR``."""
    logo_filename = "mont_logo.png"
    return send_from_directory(BASE_DIR, logo_filename)