feat: news/events pages, pagination, cookie banner, MONT→MONT rename, logo update, admin improvements, API endpoints, dynamic links by domain

This commit is contained in:
2026-06-01 17:44:25 +03:00
parent 7c0c2ea14a
commit b1fde8344e
117 changed files with 3993 additions and 70 deletions
+6 -1
View File
@@ -8,9 +8,14 @@ DB_PATH = BASE_DIR / "matrix.db"
XLSX_PATH = BASE_DIR / "Z-card_РФ.xlsx"
INFRA_JSON_FILES = [BASE_DIR / "infra1", BASE_DIR / "infra2", BASE_DIR / "infra3", BASE_DIR / "infra4"]
ADMIN_PATH = "/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj"
ADMIN_PATH = "/adminka"
ADMIN_LOGIN = "batman"
ADMIN_PASSWORD = "batmannotmont"
SUPER_ADMIN_LOGIN = os.getenv("SUPER_ADMIN_LOGIN", "Ruslan")
SUPER_ADMIN_PASSWORD = os.getenv("SUPER_ADMIN_PASSWORD", "utOgbZ09ruslanmaps")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID", "")
NEWS_API_TOKEN = os.getenv("NEWS_API_TOKEN", "")
SECRET_KEY = os.getenv("SECRET_KEY", "change-me-please")
ENABLE_BOOTSTRAP = os.getenv("ENABLE_BOOTSTRAP", "0").strip().lower() in {"1", "true", "yes", "on"}
+179
View File
@@ -294,6 +294,27 @@ def init_db() -> None:
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS news (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
body TEXT NOT NULL,
slug TEXT NOT NULL UNIQUE,
published INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
body TEXT NOT NULL,
slug TEXT NOT NULL UNIQUE,
image TEXT,
event_date TEXT NOT NULL,
register_url TEXT,
published INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
);
"""
)
try:
@@ -304,6 +325,10 @@ def init_db() -> None:
conn.execute("ALTER TABLE ib_products ADD COLUMN url TEXT")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE events ADD COLUMN register_url TEXT")
except sqlite3.OperationalError:
pass
try:
conn.execute("ALTER TABLE admin_users ADD COLUMN access_scopes TEXT NOT NULL DEFAULT 'infra,ib'")
except sqlite3.OperationalError:
@@ -744,3 +769,157 @@ IB_VENDOR_LINKS = {
}
IB_MATRIX = build_matrix_from_lists(IB_VENDORS, IB_CATEGORIES, IB_VENDOR_LINKS)
# ── News functions ──
def get_latest_news(limit: int = 3) -> list[dict]:
conn = get_db()
rows = [dict(r) for r in conn.execute(
"SELECT id, title, slug, image, created_at FROM news WHERE published=1 ORDER BY created_at DESC, id DESC LIMIT ?",
(limit,)
)]
conn.close()
return rows
def get_news_list(limit: int = 10, offset: int = 0) -> list[dict]:
conn = get_db()
rows = [dict(r) for r in conn.execute(
"SELECT id, title, slug, image, body, created_at FROM news WHERE published=1 ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?",
(limit, offset)
)]
conn.close()
return rows
def get_news_count(published_only: bool = True) -> int:
conn = get_db()
q = "SELECT COUNT(*) FROM news WHERE published=1" if published_only else "SELECT COUNT(*) FROM news"
count = conn.execute(q).fetchone()[0]
conn.close()
return count
def get_all_news_admin_paged(limit: int = 10, offset: int = 0) -> list[dict]:
conn = get_db()
rows = [dict(r) for r in conn.execute(
"SELECT id, title, slug, image, body, published, created_at FROM news ORDER BY created_at DESC, id DESC LIMIT ? OFFSET ?",
(limit, offset)
)]
conn.close()
return rows
def get_news_by_slug(slug: str) -> dict | None:
conn = get_db()
row = conn.execute(
"SELECT id, title, body, slug, image, created_at FROM news WHERE slug=? AND published=1",
(slug,)
).fetchone()
conn.close()
return dict(row) if row else None
def get_all_news_admin() -> list[dict]:
conn = get_db()
rows = [dict(r) for r in conn.execute(
"SELECT id, title, slug, image, body, published, created_at FROM news ORDER BY created_at DESC, id DESC"
)]
conn.close()
return rows
def create_news(title: str, body: str, slug: str, image: str | None = None) -> None:
conn = get_db()
conn.execute(
"INSERT INTO news(title, body, slug, image) VALUES (?, ?, ?, ?)",
(title, body, slug, image)
)
conn.commit()
conn.close()
def update_news(news_id: int, title: str, body: str, published: int, image: str | None = None, created_at: str | None = None) -> None:
conn = get_db()
if image is not None and created_at is not None:
conn.execute("UPDATE news SET title=?, body=?, published=?, image=?, created_at=? WHERE id=?",
(title, body, published, image, created_at, news_id))
elif image is not None:
conn.execute("UPDATE news SET title=?, body=?, published=?, image=? WHERE id=?",
(title, body, published, image, news_id))
elif created_at is not None:
conn.execute("UPDATE news SET title=?, body=?, published=?, created_at=? WHERE id=?",
(title, body, published, created_at, news_id))
else:
conn.execute("UPDATE news SET title=?, body=?, published=? WHERE id=?",
(title, body, published, news_id))
conn.commit()
conn.close()
def get_news_by_slug_any(slug: str) -> dict | None:
conn = get_db()
row = conn.execute(
"SELECT id, title, slug FROM news WHERE slug=?", (slug,)
).fetchone()
conn.close()
return dict(row) if row else None
def delete_news(news_id: int) -> None:
conn = get_db()
conn.execute("DELETE FROM news WHERE id=?", (news_id,))
conn.commit()
conn.close()
# ── Events ────────────────────────────────────────────────────────────────────
def get_upcoming_events(limit: int = 3) -> list[dict]:
conn = get_db()
rows = [dict(r) for r in conn.execute(
"SELECT id, title, slug, image, event_date FROM events "
"WHERE published=1 AND event_date >= date('now','localtime') "
"ORDER BY event_date ASC LIMIT ?",
(limit,)
)]
conn.close()
return rows
def get_all_events() -> list[dict]:
conn = get_db()
rows = [dict(r) for r in conn.execute(
"SELECT id, title, slug, image, body, event_date FROM events "
"WHERE published=1 AND event_date >= date('now','localtime') "
"ORDER BY event_date ASC"
)]
conn.close()
return rows
def get_event_by_slug(slug: str) -> dict | None:
conn = get_db()
row = conn.execute(
"SELECT id, title, body, slug, image, event_date, register_url FROM events WHERE slug=? AND published=1",
(slug,)
).fetchone()
conn.close()
return dict(row) if row else None
def create_event(title: str, body: str, slug: str, event_date: str, image: str | None = None) -> None:
conn = get_db()
conn.execute(
"INSERT INTO events(title, body, slug, image, event_date) VALUES (?,?,?,?,?)",
(title, body, slug, image, event_date)
)
conn.commit()
conn.close()
def is_event_saved(mont_id: str) -> bool:
conn = get_db()
row = conn.execute("SELECT id FROM events WHERE slug LIKE ?", (f"%-{mont_id}",)).fetchone()
conn.close()
return row is not None
+310 -17
View File
@@ -16,19 +16,50 @@ from .config import (
ADMIN_PASSWORD,
ADMIN_PATH,
BASE_DIR,
NEWS_API_TOKEN,
SUPER_ADMIN_LOGIN,
SUPER_ADMIN_PASSWORD,
TELEGRAM_BOT_TOKEN,
TELEGRAM_CHAT_ID,
)
from .db import fetch_scope_data, get_db, scope_tables
from .db import (fetch_scope_data, get_db, scope_tables,
get_latest_news, get_news_list, get_news_count, get_news_by_slug, get_news_by_slug_any, get_all_news_admin, get_all_news_admin_paged,
create_news, update_news, delete_news,
get_upcoming_events, get_all_events, get_event_by_slug)
import re as _re_slug
_TRANSLIT_MAP = {
'а':'a','б':'b','в':'v','г':'g','д':'d','е':'e','ё':'yo','ж':'zh',
'з':'z','и':'i','й':'y','к':'k','л':'l','м':'m','н':'n','о':'o',
'п':'p','р':'r','с':'s','т':'t','у':'u','ф':'f','х':'kh','ц':'ts',
'ч':'ch','ш':'sh','щ':'shch','ъ':'','ы':'y','ь':'','э':'e','ю':'yu','я':'ya',
}
def _translit(s: str) -> str:
return ''.join(_TRANSLIT_MAP.get(c, c) for c in s.lower())
def _make_slug(title: str) -> str:
import datetime as _dt
t = _translit(title)
t = _re_slug.sub(r'[^a-z0-9]+', '-', t)[:50].strip('-')
ts = int(_dt.datetime.now().timestamp())
return f'{t}-{ts}' if t else f'news-{ts}'
bp = Blueprint("main", __name__)
MOSCOW_TZ = ZoneInfo("Europe/Moscow")
def _base_url() -> str:
return f"https://{request.host}"
def require_admin() -> bool:
return bool(session.get("is_admin"))
def is_news_editor() -> bool:
return session.get("admin_role") in ("news_editor", "super", "admin")
def is_super_admin() -> bool:
return session.get("admin_role") == "super"
@@ -554,12 +585,16 @@ def index():
"SELECT name FROM (SELECT name FROM categories UNION SELECT name FROM ib_categories) ORDER BY lower(name)"
)]
conn.close()
proto = request.headers.get("X-Forwarded-Proto", "https")
canonical_url = f"{proto}://{request.host}"
latest_news = get_latest_news(3)
canonical_url = _base_url()
upcoming_events = get_upcoming_events(3)
return render_template("index.html",
ssr_vendors=ssr_vendors,
ssr_categories=ssr_categories,
latest_news=latest_news,
upcoming_events=upcoming_events,
canonical_url=canonical_url,
is_news_editor=is_news_editor(),
)
@@ -572,24 +607,25 @@ def robots_txt():
f"Disallow: {ADMIN_PATH}\n"
"Disallow: /sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj\n"
"\n"
f"Sitemap: https://{{host}}/sitemap.xml\n"
).replace("{host}", request.host)
"Sitemap: https://maps.4mont.ru/sitemap.xml\n"
)
return Response(body, mimetype="text/plain")
INDEXNOW_KEY = "a3f8c2e1b7d94056a3f8c2e1b7d94056"
@bp.get("/sitemap.xml")
def sitemap_xml():
from flask import Response
proto = request.headers.get("X-Forwarded-Proto", "https")
base = f"{proto}://{request.host}"
base = "https://maps.4mont.ru"
conn = get_db()
slugs = [r[0] for r in conn.execute(
vendor_slugs = [r[0] for r in conn.execute(
"SELECT slug FROM vendors WHERE slug IS NOT NULL "
"UNION SELECT slug FROM ib_vendors WHERE slug IS NOT NULL"
)]
conn.close()
urls = [f' <url><loc>{base}/</loc><changefreq>weekly</changefreq><priority>1.0</priority></url>']
for slug in slugs:
urls = [f' <url><loc>{base}/</loc><changefreq>daily</changefreq><priority>1.0</priority></url>']
for slug in vendor_slugs:
urls.append(f' <url><loc>{base}/vendor/{slug}</loc><changefreq>monthly</changefreq><priority>0.7</priority></url>')
body = (
'<?xml version="1.0" encoding="UTF-8"?>\n'
@@ -599,6 +635,35 @@ def sitemap_xml():
return Response(body, mimetype="application/xml")
@bp.get(f"/{INDEXNOW_KEY}.txt")
def indexnow_key_file():
from flask import Response
return Response(INDEXNOW_KEY, mimetype="text/plain")
def ping_indexnow(page_url: str):
"""Notify Yandex about a new/updated URL via IndexNow."""
import threading, urllib.request, json as _json
def _ping():
try:
payload = _json.dumps({
"host": "maps.4mont.ru",
"key": INDEXNOW_KEY,
"urlList": [page_url]
}).encode()
req = urllib.request.Request(
"https://yandex.com/indexnow",
data=payload,
headers={"Content-Type": "application/json"},
method="POST"
)
with urllib.request.urlopen(req, timeout=10) as r:
pass
except Exception:
pass
threading.Thread(target=_ping, daemon=True).start()
@bp.get("/vendor/<slug>")
def vendor_page(slug: str):
from flask import abort
@@ -633,8 +698,7 @@ def vendor_page(slug: str):
)]
conn.close()
proto = request.headers.get("X-Forwarded-Proto", "https")
base_url = f"{proto}://{request.host}"
base_url = _base_url()
canonical_url = f"{base_url}/vendor/{slug}"
return render_template("vendor.html",
vendor=vendor,
@@ -739,7 +803,10 @@ def admin_login_or_panel():
conn.close()
return redirect_admin(scope)
username = (request.form.get("username") or "").strip()
access = parse_admin_scopes()
new_role = request.form.get("new_role", "admin")
if new_role not in ("admin", "news_editor"):
new_role = "admin"
access = "news" if new_role == "news_editor" else parse_admin_scopes()
if not username or not access:
flash("Укажите логин и минимум один раздел доступа.", "error")
conn.close()
@@ -749,9 +816,9 @@ def admin_login_or_panel():
conn.execute(
"""
INSERT INTO admin_users(username, password, role, access_scopes, created_at)
VALUES (?, ?, 'admin', ?, ?)
VALUES (?, ?, ?, ?, ?)
""",
(username, password, access, local_now()),
(username, password, new_role, access, local_now()),
)
conn.commit()
session["created_admin_credentials"] = {
@@ -759,11 +826,11 @@ def admin_login_or_panel():
"password": password,
"access": access,
}
flash(f"Админ создан. Логин: {username}. Пароль: {password}", "ok")
flash(f"Пользователь создан. Логин: {username}. Пароль: {password}", "ok")
except sqlite3.IntegrityError:
conn.rollback()
session.pop("created_admin_credentials", None)
flash("Админ с таким логином уже существует.", "error")
flash("Пользователь с таким логином уже существует.", "error")
conn.close()
return redirect_admin(scope)
@@ -894,6 +961,232 @@ def admin_login_or_panel():
)
@bp.get("/news")
def news_list():
base_url = _base_url()
per_page = 10
page = max(1, request.args.get("page", 1, type=int))
offset = (page - 1) * per_page
articles = get_news_list(limit=per_page, offset=offset)
total = get_news_count()
total_pages = max(1, (total + per_page - 1) // per_page)
return render_template("news_list.html", articles=articles, base_url=base_url,
page=page, total_pages=total_pages,
is_news_editor=is_news_editor())
@bp.post("/api/news/publish")
def api_news_publish():
token = request.headers.get("X-API-Token", "") or request.json.get("token", "") if request.is_json else ""
if not NEWS_API_TOKEN or token != NEWS_API_TOKEN:
return jsonify({"error": "Unauthorized"}), 401
data = request.get_json(silent=True) or {}
title = str(data.get("title", "")).strip()
body = str(data.get("body", "")).strip()
image_url = str(data.get("image_url", "")).strip()
if not title or not body:
return jsonify({"error": "title и body обязательны"}), 422
import os as _os
from urllib.request import urlretrieve as _urlretrieve
slug = _make_slug(title)
image_path = None
if image_url:
try:
ext = _os.path.splitext(image_url.split("?")[0])[1].lower()
if ext not in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
ext = ".jpg"
fname = f"news_{secrets.token_hex(8)}{ext}"
save_dir = _os.path.join(BASE_DIR, "static", "news_images")
_os.makedirs(save_dir, exist_ok=True)
_urlretrieve(image_url, _os.path.join(save_dir, fname))
image_path = f"news_images/{fname}"
except Exception:
image_path = None
try:
create_news(title, body, slug, image_path)
except Exception:
slug = f"{slug}-{secrets.token_hex(3)}"
create_news(title, body, slug, image_path)
ping_indexnow(f"https://maps.4mont.ru/news/{slug}")
return jsonify({"ok": True, "slug": slug, "url": f"/news/{slug}"}), 201
@bp.delete("/api/news/<slug>")
def api_news_delete(slug: str):
token = request.headers.get("X-API-Token", "")
if not NEWS_API_TOKEN or token != NEWS_API_TOKEN:
return jsonify({"error": "Unauthorized"}), 401
article = get_news_by_slug_any(slug)
if not article:
return jsonify({"error": "Not found"}), 404
conn = get_db()
conn.execute("UPDATE news SET published=0 WHERE id=?", (article["id"],))
conn.commit()
conn.close()
return jsonify({"ok": True, "unpublished_slug": slug}), 200
@bp.get("/news/<slug>")
def news_article(slug: str):
from flask import abort
article = get_news_by_slug(slug)
if not article:
abort(404)
base_url = _base_url()
return render_template("news_article.html", article=article, base_url=base_url,
is_news_editor=is_news_editor())
NEWS_ADMIN_PATH = "/news-admin"
@bp.route(NEWS_ADMIN_PATH, methods=["GET", "POST"])
def news_admin():
action = request.form.get("action", "")
# Handle login before auth check
if request.method == "POST" and action == "news_login":
username = request.form.get("username", "").strip()
password = request.form.get("password", "").strip()
conn = get_db()
user = conn.execute(
"SELECT username, password, role FROM admin_users WHERE username=?", (username,)
).fetchone()
conn.close()
if user and password == user["password"] and user["role"] in ("news_editor", "super", "admin"):
session["is_admin"] = True
session["admin_role"] = user["role"]
session["admin_login"] = user["username"]
session["admin_scopes"] = "news"
return redirect(NEWS_ADMIN_PATH)
flash("Неверный логин или пароль", "error")
all_news = []
return render_template("news_admin.html", all_news=all_news,
is_news_editor=False, admin_login="")
if not require_admin():
return render_template("news_admin.html", all_news=[],
is_news_editor=False, admin_login="")
if request.method == "POST":
if action == "logout":
session.clear()
return redirect(NEWS_ADMIN_PATH)
if not is_news_editor():
flash("Нет прав для управления новостями.", "error")
return redirect(NEWS_ADMIN_PATH)
if action == "create_news":
title = request.form.get("title", "").strip()
body = request.form.get("body", "").strip()
if not title or not body:
flash("Заполните заголовок и текст новости.", "error")
else:
import os as _os
slug = _make_slug(title)
image_path = None
f = request.files.get("image")
if f and f.filename:
ext = _os.path.splitext(f.filename)[1].lower()
if ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
fname = f"news_{secrets.token_hex(8)}{ext}"
save_dir = _os.path.join(BASE_DIR, "static", "news_images")
_os.makedirs(save_dir, exist_ok=True)
f.save(_os.path.join(save_dir, fname))
image_path = f"news_images/{fname}"
try:
create_news(title, body, slug, image_path)
except Exception:
slug = f"{slug}-{secrets.token_hex(3)}"
create_news(title, body, slug, image_path)
ping_indexnow(f"https://maps.4mont.ru/news/{slug}")
flash("Новость опубликована.", "ok")
elif action == "update_news":
news_id = int(request.form.get("news_id", 0))
title = request.form.get("title", "").strip()
body = request.form.get("body", "").strip()
published = 1 if request.form.get("published") else 0
if news_id and title and body:
import os as _os
image_path = None
f = request.files.get("image")
if f and f.filename:
ext = _os.path.splitext(f.filename)[1].lower()
if ext in (".jpg", ".jpeg", ".png", ".webp", ".gif"):
fname = f"news_{secrets.token_hex(8)}{ext}"
save_dir = _os.path.join(BASE_DIR, "static", "news_images")
_os.makedirs(save_dir, exist_ok=True)
f.save(_os.path.join(save_dir, fname))
image_path = f"news_images/{fname}"
raw_dt = request.form.get("created_at_edit", "").strip()
created_at_val = None
if raw_dt:
try:
from datetime import datetime as _dt
created_at_val = _dt.strptime(raw_dt, "%d.%m.%Y %H:%M").strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
pass
update_news(news_id, title, body, published, image_path, created_at_val)
flash("Новость обновлена.", "ok")
elif action == "delete_news":
news_id = int(request.form.get("news_id", 0))
if news_id:
delete_news(news_id)
flash("Новость удалена.", "ok")
elif action == "toggle_published":
news_id = int(request.form.get("news_id", 0))
published = 1 if request.form.get("published") else 0
if news_id:
conn = get_db()
conn.execute("UPDATE news SET published=? WHERE id=?", (published, news_id))
conn.commit()
conn.close()
return redirect(NEWS_ADMIN_PATH)
per_page = 10
adm_page = max(1, request.args.get("page", 1, type=int))
adm_offset = (adm_page - 1) * per_page
total_news = get_news_count(published_only=False) if is_news_editor() else 0
total_adm_pages = max(1, (total_news + per_page - 1) // per_page)
all_news = get_all_news_admin_paged(per_page, adm_offset) if is_news_editor() else []
return render_template("news_admin.html",
all_news=all_news,
adm_page=adm_page, total_adm_pages=total_adm_pages,
is_news_editor=is_news_editor(),
admin_login=session.get("admin_login", ""),
)
@bp.get("/events")
def events_list():
from flask import abort
events = get_all_events()
base_url = _base_url()
return render_template("events_list.html", events=events, base_url=base_url)
@bp.get("/events/<slug>")
def event_article(slug: str):
from flask import abort
event = get_event_by_slug(slug)
if not event:
abort(404)
base_url = _base_url()
return render_template("events_article.html", event=event, base_url=base_url)
@bp.get("/health")
def health():
return {"status": "ok"}