feat: vendor edit with name/website/mont_page in admin panel

This commit is contained in:
2026-05-12 16:13:55 +03:00
parent 26f17a8bb6
commit 5a8471e85c
3 changed files with 938 additions and 136 deletions
+35 -22
View File
@@ -4,12 +4,29 @@
const matrixTable = document.getElementById("matrixTable");
const topScroll = document.getElementById("matrixHScroll");
const topScrollInner = document.getElementById("matrixHScrollInner");
document.addEventListener("click", async (event) => {
const button = event.target.closest("[data-copy-target]");
if (!button) return;
const field = document.getElementById(button.dataset.copyTarget);
if (!field) return;
field.select();
field.setSelectionRange(0, field.value.length);
try {
await navigator.clipboard.writeText(field.value);
} catch (error) {
document.execCommand("copy");
}
const originalText = button.textContent;
button.textContent = "Скопировано";
setTimeout(() => {
button.textContent = originalText;
}, 1600);
});
if (!matrixForm || !matrixScroll || !matrixTable || !topScroll || !topScrollInner) return;
let isDirty = false;
let syncing = false;
let saveTimer = null;
let saveInFlight = false;
function markDirty() {
isDirty = true;
@@ -33,29 +50,9 @@
syncing = false;
}
async function autoSaveMatrix() {
if (saveInFlight) return;
saveInFlight = true;
try {
const formData = new FormData(matrixForm);
const response = await fetch(window.location.href, {
method: "POST",
body: formData,
credentials: "same-origin",
});
if (!response.ok) throw new Error("save failed");
isDirty = false;
} catch (error) {
} finally {
saveInFlight = false;
}
}
matrixForm.addEventListener("change", (event) => {
if (!(event.target && event.target.matches('input[type="checkbox"]'))) return;
markDirty();
if (saveTimer) clearTimeout(saveTimer);
saveTimer = setTimeout(autoSaveMatrix, 250);
});
matrixForm.addEventListener("submit", () => {
@@ -89,4 +86,20 @@
window.addEventListener("resize", updateTopScrollWidth);
updateTopScrollWidth();
syncScrollFromMatrix();
document.addEventListener("click", (event) => {
const button = event.target.closest("[data-edit-product]");
if (!button) return;
const form = document.querySelector(`[data-product-edit="${button.dataset.editProduct}"]`);
if (!form) return;
form.hidden = !form.hidden;
});
document.addEventListener("click", (event) => {
const button = event.target.closest("[data-edit-vendor]");
if (!button) return;
const form = document.querySelector(`[data-vendor-edit="${button.dataset.editVendor}"]`);
if (!form) return;
form.hidden = !form.hidden;
});
})();
+156 -10
View File
@@ -15,11 +15,16 @@
<div>
<strong>Админ-панель матрицы</strong>
<div class="scope-switch" style="margin-top:8px;">
{% if 'infra' in allowed_scopes %}
<a class="scope-chip {% if scope == 'infra' %}active{% endif %}" href="{{ request.path }}?scope=infra">Инфраструктура</a>
{% endif %}
{% if 'ib' in allowed_scopes %}
<a class="scope-chip {% if scope == 'ib' %}active{% endif %}" href="{{ request.path }}?scope=ib">ИБ</a>
{% endif %}
</div>
</div>
<div style="display:flex; gap:8px;">
<div class="top-actions">
<button class="pri" type="submit" form="matrixForm">Сохранить</button>
<a href="/" style="text-decoration:none;"><button class="warn" type="button">На сайт</button></a>
<form method="post" style="margin:0;">
<input type="hidden" name="scope" value="{{ scope }}" />
@@ -29,6 +34,102 @@
</div>
</section>
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
<section class="alerts">
{% for category, message in messages %}
<div class="alert {{ category }}">{{ message }}</div>
{% endfor %}
</section>
{% endif %}
{% endwith %}
{% if pending_changes %}
<section class="box pending-box">
<div class="pending-head">
<h3>{% if is_super_admin %}Ожидают утверждения{% else %}Мои изменения на проверке{% endif %}</h3>
{% if is_super_admin %}
<form method="post">
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="approve_all_changes" />
<button class="pri" type="submit">Утвердить все</button>
</form>
{% endif %}
</div>
<div class="pending-list">
{% for change in pending_changes %}
<div class="pending-item">
<div>
<strong>#{{ change.id }} · {{ change.title }}</strong>
<span>{{ change.created_at }} · {{ change.scope_label }} · {{ change.created_by }}</span>
<div class="pending-desc">{{ change.description }}</div>
</div>
{% if is_super_admin %}
<div class="pending-actions">
<form method="post">
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="approve_change" />
<input type="hidden" name="change_id" value="{{ change.id }}" />
<button class="pri" type="submit">Утвердить</button>
</form>
<form method="post">
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="reject_change" />
<input type="hidden" name="change_id" value="{{ change.id }}" />
<button class="danger" type="submit">Отклонить</button>
</form>
</div>
{% endif %}
</div>
{% endfor %}
</div>
</section>
{% endif %}
{% if is_super_admin %}
<section class="box admin-users-box">
<h3>Администраторы</h3>
{% if created_admin_credentials %}
<div class="created-admin-card">
<strong>Новый админ создан</strong>
<div>Логин: <code>{{ created_admin_credentials.username }}</code></div>
<div>Пароль: <code>{{ created_admin_credentials.password }}</code></div>
<div>Доступ: <code>{{ created_admin_credentials.access }}</code></div>
<div class="created-admin-share">
<textarea id="createdAdminShare" readonly>Админка: {{ request.url_root.rstrip('/') }}{{ request.path }}
Логин: {{ created_admin_credentials.username }}
Пароль: {{ created_admin_credentials.password }}</textarea>
<button class="pri" type="button" data-copy-target="createdAdminShare">Скопировать</button>
</div>
</div>
{% endif %}
<form method="post" class="admin-create">
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="create_admin" />
<input type="text" name="username" placeholder="Логин нового админа" required />
<label><input type="checkbox" name="allow_infra" checked /> Инфраструктура</label>
<label><input type="checkbox" name="allow_ib" checked /> ИБ</label>
<button class="pri" type="submit">Создать</button>
</form>
<div class="admin-users-list">
{% for user in admin_users %}
<form class="admin-user-item" method="post">
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="delete_admin" />
<input type="hidden" name="admin_id" value="{{ user.id }}" />
<span>
<strong>{{ user.username }}</strong>
<small>{{ user.role }} · {{ user.access_scopes }}</small>
</span>
{% if user.role != 'super' %}
<button class="danger" type="submit">Удалить</button>
{% endif %}
</form>
{% endfor %}
</div>
</section>
{% endif %}
<section class="grid">
<div class="box">
<h3>Добавить вендора</h3>
@@ -55,7 +156,7 @@
<input type="hidden" name="action" value="add_product" />
<select name="vendor_id" required style="padding:9px 10px; border:1px solid #c6d8fb; border-radius:9px;">
{% for v in vendors %}
<option value="{{ v.id }}">{{ v.name }}</option>
<option value="{{ v.id }}" {% if v.pending %}disabled{% endif %}>{{ v.name }}{% if v.pending %} (на проверке){% endif %}</option>
{% endfor %}
</select>
<input type="text" name="name" placeholder="Название продукта" required />
@@ -67,16 +168,40 @@
<section class="lists">
<div class="box">
<h3>Удалить вендора</h3>
<h3>Вендоры</h3>
<div class="list-box">
{% for v in vendors %}
<form class="list-item" method="post">
<div class="vendor-item">
<div class="list-item">
<span>
{{ v.name }}{% if v.pending %} <small>(на проверке)</small>{% endif %}
{% if v.website %}<a href="{{ v.website }}" target="_blank" rel="noopener" style="margin-left:6px;font-size:11px;">сайт</a>{% endif %}
{% if v.mont_page %}<a href="{{ v.mont_page }}" target="_blank" rel="noopener" style="margin-left:4px;font-size:11px;">MONT</a>{% endif %}
</span>
{% if not v.pending %}
<div class="product-actions">
<button class="warn" type="button" data-edit-vendor="{{ v.id }}">Изменить</button>
<form method="post" style="margin:0">
<input type="hidden" name="scope" value="{{ scope }}" />
<span>{{ v.name }}</span>
<input type="hidden" name="action" value="delete_vendor" />
<input type="hidden" name="vendor_id" value="{{ v.id }}" />
<button class="danger" type="submit">Удалить</button>
</form>
</div>
{% endif %}
</div>
{% if not v.pending %}
<form class="product-edit" method="post" data-vendor-edit="{{ v.id }}" hidden>
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="update_vendor" />
<input type="hidden" name="vendor_id" value="{{ v.id }}" />
<input type="text" name="name" value="{{ v.name }}" placeholder="Название вендора" required />
<input type="url" name="website" value="{{ v.website or '' }}" placeholder="Сайт вендора (https://...)" />
<input type="url" name="mont_page" value="{{ v.mont_page or '' }}" placeholder="Страница на MONT (https://...)" />
<button class="pri" type="submit">Сохранить</button>
</form>
{% endif %}
</div>
{% endfor %}
</div>
</div>
@@ -86,28 +211,48 @@
{% for c in categories %}
<form class="list-item" method="post">
<input type="hidden" name="scope" value="{{ scope }}" />
<span>{{ c.name }}</span>
<span>{{ c.name }}{% if c.pending %} <small>(на проверке)</small>{% endif %}</span>
<input type="hidden" name="action" value="delete_category" />
<input type="hidden" name="category_id" value="{{ c.id }}" />
<button class="danger" type="submit">Удалить</button>
{% if not c.pending %}<button class="danger" type="submit">Удалить</button>{% endif %}
</form>
{% endfor %}
</div>
</div>
<div class="box">
<h3>Удалить продукт</h3>
<h3>Продукты</h3>
<div class="list-box">
{% for p in products %}
<form class="list-item" method="post">
<input type="hidden" name="scope" value="{{ scope }}" />
<div class="product-item">
<div class="product-row">
<span>
{{ p.vendor_name }} :: {{ p.name }}
{% if p.pending %}<small>(на проверке)</small>{% endif %}
{% if p.url %}<a href="{{ p.url }}" target="_blank" rel="noopener noreferrer" style="margin-left:6px; font-size:11px;">ссылка</a>{% endif %}
</span>
{% if not p.pending %}
<div class="product-actions">
<button class="warn" type="button" data-edit-product="{{ p.id }}">Изменить</button>
<form method="post">
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="delete_product" />
<input type="hidden" name="product_id" value="{{ p.id }}" />
<button class="danger" type="submit">Удалить</button>
</form>
</div>
{% endif %}
</div>
{% if not p.pending %}
<form class="product-edit" method="post" data-product-edit="{{ p.id }}" hidden>
<input type="hidden" name="scope" value="{{ scope }}" />
<input type="hidden" name="action" value="update_product" />
<input type="hidden" name="product_id" value="{{ p.id }}" />
<input type="text" name="name" value="{{ p.name }}" placeholder="Название продукта" required />
<input type="text" name="url" value="{{ p.url or '' }}" placeholder="URL продукта" />
<button class="pri" type="submit">Сохранить</button>
</form>
{% endif %}
</div>
{% endfor %}
</div>
</div>
@@ -136,6 +281,7 @@
type="checkbox"
name="pc_{{ p.id }}_{{ c.id }}"
{% if (p.id, c.id) in links %}checked{% endif %}
{% if p.pending or c.pending %}disabled{% endif %}
/>
</td>
{% endfor %}
+733 -90
View File
@@ -1,16 +1,51 @@
from __future__ import annotations
from flask import Blueprint, jsonify, redirect, render_template, request, send_from_directory, session
import json
import secrets
import sqlite3
from datetime import datetime
from html import escape
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from zoneinfo import ZoneInfo
from .config import ADMIN_LOGIN, ADMIN_PASSWORD, ADMIN_PATH, BASE_DIR
from flask import Blueprint, flash, jsonify, redirect, render_template, request, send_from_directory, session
from .config import (
ADMIN_LOGIN,
ADMIN_PASSWORD,
ADMIN_PATH,
BASE_DIR,
SUPER_ADMIN_LOGIN,
SUPER_ADMIN_PASSWORD,
TELEGRAM_BOT_TOKEN,
TELEGRAM_CHAT_ID,
)
from .db import fetch_scope_data, get_db, scope_tables
bp = Blueprint("main", __name__)
MOSCOW_TZ = ZoneInfo("Europe/Moscow")
def require_admin() -> bool:
return bool(session.get("is_admin"))
def is_super_admin() -> bool:
return session.get("admin_role") == "super"
def allowed_scopes() -> set[str]:
if is_super_admin():
return {"infra", "ib"}
raw = session.get("admin_scopes") or "infra,ib"
scopes = {item.strip() for item in raw.split(",") if item.strip()}
return scopes or {"infra"}
def can_access_scope(scope: str) -> bool:
return scope in allowed_scopes()
def parse_int(form_value: str | None) -> int | None:
if not form_value:
return None
@@ -20,100 +55,32 @@ def parse_int(form_value: str | None) -> int | None:
return None
@bp.get("/")
def index():
return render_template("index.html")
def redirect_admin(scope: str):
return redirect(f"{ADMIN_PATH}?scope={scope}")
@bp.get("/api/data")
def api_data():
scope = (request.args.get("scope") or "infra").strip().lower()
if scope in {"ib", "sec", "security"}:
scope = "ib"
else:
scope = "infra"
return jsonify(fetch_scope_data(scope))
def make_admin_password() -> str:
return secrets.token_urlsafe(9)
@bp.route(ADMIN_PATH, methods=["GET", "POST"])
def admin_login_or_panel():
conn = get_db()
raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower()
scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra"
tables = scope_tables(scope)
def parse_admin_scopes() -> str:
scopes = []
if request.form.get("allow_infra"):
scopes.append("infra")
if request.form.get("allow_ib"):
scopes.append("ib")
return ",".join(scopes)
if request.method == "POST" and not require_admin() and request.form.get("action") == "login":
if request.form.get("username") == ADMIN_LOGIN and request.form.get("password") == ADMIN_PASSWORD:
session["is_admin"] = True
conn.close()
return redirect(ADMIN_PATH)
conn.close()
return render_template("login.html", error="Неверный логин или пароль")
if not require_admin():
conn.close()
return render_template("login.html", error=None)
def local_now() -> str:
return datetime.now(MOSCOW_TZ).strftime("%Y-%m-%d %H:%M:%S")
if request.method == "POST":
action = request.form.get("action", "")
if action == "logout":
session.pop("is_admin", None)
conn.close()
return redirect(ADMIN_PATH)
if action == "add_vendor":
name = (request.form.get("name") or "").strip()
if name:
conn.execute(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (name,))
def scope_label(scope: str) -> str:
return "ИБ" if scope == "ib" else "Инфраструктура"
elif action == "add_category":
name = (request.form.get("name") or "").strip()
if name:
conn.execute(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (name,))
elif action == "add_product":
vendor_id = parse_int(request.form.get("vendor_id"))
name = (request.form.get("name") or "").strip()
url = (request.form.get("url") or "").strip()
if vendor_id and name:
conn.execute(
f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)",
(vendor_id, name, url or None),
)
if url:
conn.execute(
f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?",
(url, vendor_id, name),
)
elif action == "delete_vendor":
v_id = parse_int(request.form.get("vendor_id"))
if v_id:
conn.execute(f"DELETE FROM {tables['vendors']} WHERE id = ?", (v_id,))
elif action == "delete_category":
c_id = parse_int(request.form.get("category_id"))
if c_id:
conn.execute(f"DELETE FROM {tables['categories']} WHERE id = ?", (c_id,))
elif action == "delete_product":
p_id = parse_int(request.form.get("product_id"))
if p_id:
conn.execute(f"DELETE FROM {tables['products']} WHERE id = ?", (p_id,))
elif action == "save_matrix":
products = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['products']}")]
categories = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['categories']}")]
new_pairs: list[tuple[int, int]] = []
for p_id in products:
for c_id in categories:
if request.form.get(f"pc_{p_id}_{c_id}"):
new_pairs.append((p_id, c_id))
conn.execute(f"DELETE FROM {tables['product_categories']}")
conn.executemany(
f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)",
new_pairs,
)
def rebuild_vendor_categories(conn, tables: dict[str, str]) -> None:
conn.execute(f"DELETE FROM {tables['vendor_categories']}")
conn.execute(
f"""
@@ -124,11 +91,371 @@ def admin_login_or_panel():
"""
)
conn.commit()
conn.close()
return redirect(f"{ADMIN_PATH}?scope={scope}")
vendors = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['vendors']} ORDER BY lower(name)")]
def collect_change_payload(action: str, scope: str, conn, tables: dict[str, str]) -> dict | None:
if action == "add_vendor":
name = (request.form.get("name") or "").strip()
return {"name": name} if name else None
if action == "add_category":
name = (request.form.get("name") or "").strip()
return {"name": name} if name else None
if action == "add_product":
vendor_id = parse_int(request.form.get("vendor_id"))
name = (request.form.get("name") or "").strip()
url = (request.form.get("url") or "").strip()
if not vendor_id or not name:
return None
vendor = conn.execute(f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone()
return {
"vendor_id": vendor_id,
"vendor_name": vendor["name"] if vendor else "",
"name": name,
"url": url,
}
if action == "update_product":
product_id = parse_int(request.form.get("product_id"))
name = (request.form.get("name") or "").strip()
url = (request.form.get("url") or "").strip()
if not product_id or not name:
return None
product = conn.execute(
f"""
SELECT p.name, p.url, v.name AS vendor_name
FROM {tables['products']} p
JOIN {tables['vendors']} v ON v.id = p.vendor_id
WHERE p.id = ?
""",
(product_id,),
).fetchone()
return {
"product_id": product_id,
"vendor_name": product["vendor_name"] if product else "",
"old_name": product["name"] if product else "",
"old_url": product["url"] if product else "",
"name": name,
"url": url,
}
if action == "update_vendor":
vendor_id = parse_int(request.form.get("vendor_id"))
name = (request.form.get("name") or "").strip()
website = (request.form.get("website") or "").strip()
mont_page = (request.form.get("mont_page") or "").strip()
if not vendor_id or not name:
return None
vendor = conn.execute(f"SELECT name, website, mont_page FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone()
return {
"vendor_id": vendor_id,
"old_name": vendor["name"] if vendor else "",
"name": name,
"website": website,
"mont_page": mont_page,
}
if action == "delete_vendor":
vendor_id = parse_int(request.form.get("vendor_id"))
if not vendor_id:
return None
vendor = conn.execute(f"SELECT name FROM {tables['vendors']} WHERE id = ?", (vendor_id,)).fetchone()
return {"vendor_id": vendor_id, "vendor_name": vendor["name"] if vendor else ""}
if action == "delete_category":
category_id = parse_int(request.form.get("category_id"))
if not category_id:
return None
category = conn.execute(f"SELECT name FROM {tables['categories']} WHERE id = ?", (category_id,)).fetchone()
return {"category_id": category_id, "category_name": category["name"] if category else ""}
if action == "delete_product":
product_id = parse_int(request.form.get("product_id"))
if not product_id:
return None
product = conn.execute(
f"""
SELECT p.name, v.name AS vendor_name
FROM {tables['products']} p
JOIN {tables['vendors']} v ON v.id = p.vendor_id
WHERE p.id = ?
""",
(product_id,),
).fetchone()
return {
"product_id": product_id,
"product_name": product["name"] if product else "",
"vendor_name": product["vendor_name"] if product else "",
}
if action == "save_matrix":
products = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['products']}")]
categories = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['categories']}")]
pairs: list[list[int]] = []
for product_id in products:
for category_id in categories:
if request.form.get(f"pc_{product_id}_{category_id}"):
pairs.append([product_id, category_id])
current_pairs = {
(r["product_id"], r["category_id"])
for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}")
}
new_pairs = {tuple(pair) for pair in pairs}
changed_pairs = sorted((new_pairs - current_pairs) | (current_pairs - new_pairs))
product_labels = {
r["id"]: f"{r['vendor_name']} :: {r['name']}"
for r in conn.execute(
f"""
SELECT p.id, p.name, v.name AS vendor_name
FROM {tables['products']} p
JOIN {tables['vendors']} v ON v.id = p.vendor_id
"""
)
}
category_labels = {
r["id"]: r["name"]
for r in conn.execute(f"SELECT id, name FROM {tables['categories']}")
}
added = []
removed = []
for product_id, category_id in changed_pairs:
item = {
"product_id": product_id,
"category_id": category_id,
"product": product_labels.get(product_id, str(product_id)),
"category": category_labels.get(category_id, str(category_id)),
}
if (product_id, category_id) in new_pairs:
added.append(item)
else:
removed.append(item)
return {"pairs": pairs, "added": added, "removed": removed}
return None
def apply_change(conn, tables: dict[str, str], action: str, payload: dict) -> None:
if action == "add_vendor":
conn.execute(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (payload["name"],))
elif action == "add_category":
conn.execute(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (payload["name"],))
elif action == "add_product":
conn.execute(
f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)",
(payload["vendor_id"], payload["name"], payload.get("url") or None),
)
if payload.get("url"):
conn.execute(
f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?",
(payload["url"], payload["vendor_id"], payload["name"]),
)
elif action == "update_product":
conn.execute(
f"UPDATE {tables['products']} SET name = ?, url = ? WHERE id = ?",
(payload["name"], payload.get("url") or None, payload["product_id"]),
)
elif action == "update_vendor":
conn.execute(
f"UPDATE {tables['vendors']} SET name = ?, website = ?, mont_page = ? WHERE id = ?",
(payload["name"], payload.get("website") or None, payload.get("mont_page") or None, payload["vendor_id"]),
)
elif action == "delete_vendor":
conn.execute(f"DELETE FROM {tables['vendors']} WHERE id = ?", (payload["vendor_id"],))
elif action == "delete_category":
conn.execute(f"DELETE FROM {tables['categories']} WHERE id = ?", (payload["category_id"],))
elif action == "delete_product":
conn.execute(f"DELETE FROM {tables['products']} WHERE id = ?", (payload["product_id"],))
elif action == "save_matrix":
conn.execute(f"DELETE FROM {tables['product_categories']}")
conn.executemany(
f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)",
[tuple(pair) for pair in payload.get("pairs", [])],
)
rebuild_vendor_categories(conn, tables)
def queue_change(conn, scope: str, action: str, payload: dict) -> int:
cur = conn.execute(
"""
INSERT INTO pending_changes(scope, action, payload, created_by, created_at)
VALUES (?, ?, ?, ?, ?)
""",
(
scope,
action,
json.dumps(payload, ensure_ascii=False),
session.get("admin_login") or ADMIN_LOGIN,
local_now(),
),
)
return int(cur.lastrowid)
def describe_change(action: str, payload: dict) -> str:
if action == "add_vendor":
return f"Добавить вендора: {payload.get('name')}"
if action == "add_category":
return f"Добавить категорию: {payload.get('name')}"
if action == "add_product":
parts = [
"Добавить продукт",
f"Вендор: {payload.get('vendor_name') or payload.get('vendor_id')}",
f"Название: {payload.get('name')}",
]
if payload.get("url"):
parts.append(f"URL: {payload.get('url')}")
return "\n".join(parts)
if action == "update_product":
return "\n".join(
[
"Изменить продукт",
f"Вендор: {payload.get('vendor_name') or '-'}",
f"Было: {payload.get('old_name') or '-'}",
f"Стало: {payload.get('name')}",
f"URL был: {payload.get('old_url') or '-'}",
f"URL станет: {payload.get('url') or '-'}",
]
)
if action == "update_vendor":
parts = [f"Изменить вендора: {payload.get('old_name') or payload.get('vendor_id')}{payload.get('name')}"]
if payload.get("website"):
parts.append(f"Сайт: {payload['website']}")
if payload.get("mont_page"):
parts.append(f"MONT: {payload['mont_page']}")
return "\n".join(parts)
if action == "delete_vendor":
return f"Удалить вендора: {payload.get('vendor_name') or payload.get('vendor_id')}"
if action == "delete_category":
return f"Удалить категорию: {payload.get('category_name') or payload.get('category_id')}"
if action == "delete_product":
product = payload.get("product_name") or payload.get("product_id")
vendor = payload.get("vendor_name") or "-"
return f"Удалить продукт: {vendor} :: {product}"
if action == "save_matrix":
lines = [
f"Изменить матрицу: связей выбрано {len(payload.get('pairs', []))}",
f"Добавлено связей: {len(payload.get('added', []))}",
f"Снято связей: {len(payload.get('removed', []))}",
]
added = payload.get("added", [])
removed = payload.get("removed", [])
if added:
lines.append("")
lines.append("Добавить:")
lines.extend([f"+ {item.get('product')} -> {item.get('category')}" for item in added[:80]])
if len(added) > 80:
lines.append(f"... еще {len(added) - 80}")
if removed:
lines.append("")
lines.append("Снять:")
lines.extend([f"- {item.get('product')} -> {item.get('category')}" for item in removed[:80]])
if len(removed) > 80:
lines.append(f"... еще {len(removed) - 80}")
return "\n".join(lines)
return json.dumps(payload, ensure_ascii=False, indent=2)
def change_title(action: str) -> str:
return {
"add_vendor": "Добавление вендора",
"add_category": "Добавление категории",
"add_product": "Добавление продукта",
"update_vendor": "Изменение вендора",
"update_product": "Изменение продукта",
"delete_vendor": "Удаление вендора",
"delete_category": "Удаление категории",
"delete_product": "Удаление продукта",
"save_matrix": "Изменение матрицы",
}.get(action, action)
def notify_pending_change(change_id: int, scope: str, action: str, payload: dict) -> bool:
if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
return False
admin_url = request.url_root.rstrip("/") + f"{ADMIN_PATH}?scope={scope}"
raw_details = describe_change(action, payload)
if len(raw_details) > 3000:
raw_details = raw_details[:3000] + "\n... сокращено, полный список в админке"
details = escape(raw_details)
text = "\n".join(
[
"<b>Zkart: нужно подтверждение</b>",
"",
f"<b>Заявка:</b> #{change_id}",
f"<b>Тип:</b> {escape(change_title(action))}",
f"<b>Раздел:</b> {escape(scope_label(scope))}",
f"<b>Админ:</b> {escape(session.get('admin_login') or ADMIN_LOGIN)}",
"",
f"<b>Изменение:</b>\n<pre>{details}</pre>",
"",
f"<a href=\"{escape(admin_url)}\">Открыть админку</a>",
]
)
data = urlencode(
{
"chat_id": TELEGRAM_CHAT_ID,
"text": text,
"parse_mode": "HTML",
"disable_web_page_preview": "1",
}
).encode("utf-8")
req = Request(
f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage",
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
try:
with urlopen(req, timeout=5) as response:
return 200 <= response.status < 300
except Exception:
return False
def approve_pending_change(conn, row, reviewed_by: str) -> bool:
savepoint = f"approve_change_{int(row['id'])}"
conn.execute(f"SAVEPOINT {savepoint}")
try:
change_tables = scope_tables(row["scope"])
apply_change(conn, change_tables, row["action"], json.loads(row["payload"]))
conn.execute(
"""
UPDATE pending_changes
SET status = 'approved', reviewed_by = ?, reviewed_at = ?
WHERE id = ?
""",
(reviewed_by, local_now(), row["id"]),
)
conn.execute(f"RELEASE SAVEPOINT {savepoint}")
return True
except (KeyError, TypeError, ValueError, sqlite3.DatabaseError):
conn.execute(f"ROLLBACK TO SAVEPOINT {savepoint}")
conn.execute(f"RELEASE SAVEPOINT {savepoint}")
return False
def prepare_pending_change(change: dict) -> dict:
change["payload"] = json.loads(change["payload"])
change["title"] = change_title(change["action"])
change["scope_label"] = scope_label(change["scope"])
change["description"] = describe_change(change["action"], change["payload"])
return change
def fetch_admin_matrix(conn, tables: dict[str, str]) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
vendors = [dict(r) for r in conn.execute(
f"SELECT id, name, COALESCE(website,'') as website, COALESCE(mont_page,'') as mont_page "
f"FROM {tables['vendors']} ORDER BY lower(name)"
)]
categories = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")]
products = [
dict(r)
@@ -146,6 +473,317 @@ def admin_login_or_panel():
(r["product_id"], r["category_id"])
for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}")
}
return vendors, categories, products, links
def apply_pending_overlay(
vendors: list[dict],
categories: list[dict],
products: list[dict],
links: set[tuple[int, int]],
pending_changes: list[dict],
) -> tuple[list[dict], list[dict], list[dict], set[tuple[int, int]]]:
vendors = [dict(item) for item in vendors]
categories = [dict(item) for item in categories]
products = [dict(item) for item in products]
links = set(links)
for change in sorted(pending_changes, key=lambda item: item["id"]):
if change["scope"] not in {"infra", "ib"}:
continue
action = change["action"]
payload = change["payload"]
if action == "add_vendor":
vendors.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True})
elif action == "add_category":
categories.append({"id": -change["id"], "name": payload.get("name", ""), "pending": True})
elif action == "add_product":
products.append(
{
"id": -change["id"],
"name": payload.get("name", ""),
"vendor_id": payload.get("vendor_id"),
"vendor_name": payload.get("vendor_name", ""),
"url": payload.get("url") or None,
"pending": True,
}
)
elif action == "update_product":
for product in products:
if product["id"] == payload.get("product_id"):
product["name"] = payload.get("name", product["name"])
product["url"] = payload.get("url") or None
product["pending"] = True
break
elif action == "update_vendor":
vendor_id = payload.get("vendor_id")
for vendor in vendors:
if vendor["id"] == vendor_id:
vendor["name"] = payload.get("name", vendor["name"])
vendor["website"] = payload.get("website") or ""
vendor["mont_page"] = payload.get("mont_page") or ""
vendor["pending"] = True
break
elif action == "delete_vendor":
vendor_id = payload.get("vendor_id")
vendors = [item for item in vendors if item["id"] != vendor_id]
products = [item for item in products if item.get("vendor_id") != vendor_id]
elif action == "delete_category":
category_id = payload.get("category_id")
categories = [item for item in categories if item["id"] != category_id]
links = {pair for pair in links if pair[1] != category_id}
elif action == "delete_product":
product_id = payload.get("product_id")
products = [item for item in products if item["id"] != product_id]
links = {pair for pair in links if pair[0] != product_id}
elif action == "save_matrix":
links = {tuple(pair) for pair in payload.get("pairs", [])}
vendors.sort(key=lambda item: item["name"].lower())
categories.sort(key=lambda item: item["name"].lower())
products.sort(key=lambda item: (item.get("vendor_name", "").lower(), item.get("name", "").lower()))
return vendors, categories, products, links
@bp.get("/")
def index():
return render_template("index.html")
@bp.get("/api/data")
def api_data():
scope = (request.args.get("scope") or "infra").strip().lower()
if scope in {"ib", "sec", "security"}:
scope = "ib"
else:
scope = "infra"
return jsonify(fetch_scope_data(scope))
@bp.get("/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj")
def legacy_admin_redirect():
raw_scope = (request.args.get("scope") or "infra").strip().lower()
scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra"
return redirect_admin(scope)
@bp.route(ADMIN_PATH, methods=["GET", "POST"])
def admin_login_or_panel():
conn = get_db()
raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower()
scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra"
tables = scope_tables(scope)
if request.method == "POST" and not require_admin() and request.form.get("action") == "login":
username = request.form.get("username") or ""
password = request.form.get("password") or ""
admin_user = conn.execute(
"SELECT username, password, role, access_scopes FROM admin_users WHERE username = ?",
(username,),
).fetchone()
if admin_user and password == admin_user["password"]:
session["is_admin"] = True
session["admin_role"] = admin_user["role"]
session["admin_login"] = admin_user["username"]
session["admin_scopes"] = admin_user["access_scopes"]
conn.close()
return redirect(ADMIN_PATH)
conn.close()
return render_template("login.html", error="Неверный логин или пароль")
if not require_admin():
conn.close()
return render_template("login.html", error=None)
if not can_access_scope(scope):
conn.close()
return redirect_admin(next(iter(allowed_scopes())))
if request.method == "POST":
action = request.form.get("action", "")
if action == "logout":
session.pop("is_admin", None)
session.pop("admin_role", None)
session.pop("admin_login", None)
session.pop("admin_scopes", None)
conn.close()
return redirect(ADMIN_PATH)
if not can_access_scope(scope):
flash("Нет доступа к этому разделу.", "error")
conn.close()
return redirect_admin(next(iter(allowed_scopes())))
if action == "approve_all_changes":
if not is_super_admin():
flash("Недостаточно прав для подтверждения изменений.", "error")
conn.close()
return redirect_admin(scope)
rows = conn.execute(
"SELECT * FROM pending_changes WHERE status = 'pending' ORDER BY id"
).fetchall()
reviewed_by = session.get("admin_login") or SUPER_ADMIN_LOGIN
approved = 0
failed = 0
for row in rows:
if approve_pending_change(conn, row, reviewed_by):
approved += 1
else:
failed += 1
conn.commit()
conn.close()
if failed:
flash(f"Утверждено заявок: {approved}. Не удалось применить: {failed}.", "error")
else:
flash(f"Все заявки утверждены: {approved}.", "ok")
return redirect_admin(scope)
if action == "create_admin":
if not is_super_admin():
flash("Недостаточно прав для управления админами.", "error")
conn.close()
return redirect_admin(scope)
username = (request.form.get("username") or "").strip()
access = parse_admin_scopes()
if not username or not access:
flash("Укажите логин и минимум один раздел доступа.", "error")
conn.close()
return redirect_admin(scope)
password = make_admin_password()
try:
conn.execute(
"""
INSERT INTO admin_users(username, password, role, access_scopes, created_at)
VALUES (?, ?, 'admin', ?, ?)
""",
(username, password, access, local_now()),
)
conn.commit()
session["created_admin_credentials"] = {
"username": username,
"password": password,
"access": access,
}
flash(f"Админ создан. Логин: {username}. Пароль: {password}", "ok")
except sqlite3.IntegrityError:
conn.rollback()
session.pop("created_admin_credentials", None)
flash("Админ с таким логином уже существует.", "error")
conn.close()
return redirect_admin(scope)
if action == "delete_admin":
if not is_super_admin():
flash("Недостаточно прав для управления админами.", "error")
conn.close()
return redirect_admin(scope)
admin_id = parse_int(request.form.get("admin_id"))
if admin_id:
target = conn.execute("SELECT role, username FROM admin_users WHERE id = ?", (admin_id,)).fetchone()
if target and target["role"] == "super":
flash("Супер-админа удалить нельзя.", "error")
elif target and target["username"] == session.get("admin_login"):
flash("Нельзя удалить текущего пользователя.", "error")
else:
conn.execute("DELETE FROM admin_users WHERE id = ? AND role <> 'super'", (admin_id,))
conn.commit()
flash("Админ удален.", "ok")
conn.close()
return redirect_admin(scope)
if action in {"approve_change", "reject_change"}:
if not is_super_admin():
flash("Недостаточно прав для подтверждения изменений.", "error")
conn.close()
return redirect_admin(scope)
change_id = parse_int(request.form.get("change_id"))
row = None
if change_id:
row = conn.execute(
"SELECT * FROM pending_changes WHERE id = ? AND status = 'pending'",
(change_id,),
).fetchone()
if not row:
flash("Заявка не найдена или уже обработана.", "error")
conn.close()
return redirect_admin(scope)
if action == "reject_change":
conn.execute(
"""
UPDATE pending_changes
SET status = 'rejected', reviewed_by = ?, reviewed_at = ?
WHERE id = ?
""",
(session.get("admin_login") or SUPER_ADMIN_LOGIN, local_now(), change_id),
)
flash("Заявка отклонена.", "ok")
else:
if approve_pending_change(conn, row, session.get("admin_login") or SUPER_ADMIN_LOGIN):
flash("Заявка утверждена и применена.", "ok")
else:
flash("Не удалось применить заявку. Проверьте, не изменились ли связанные записи.", "error")
conn.close()
return redirect_admin(scope)
conn.commit()
conn.close()
return redirect_admin(scope)
payload = collect_change_payload(action, scope, conn, tables)
if payload is not None:
if is_super_admin():
try:
apply_change(conn, tables, action, payload)
flash("Изменения сохранены.", "ok")
except sqlite3.IntegrityError:
conn.rollback()
flash("Не удалось сохранить: возможно, такое название уже есть у выбранного вендора.", "error")
conn.close()
return redirect_admin(scope)
else:
change_id = queue_change(conn, scope, action, payload)
notify_pending_change(change_id, scope, action, payload)
flash("Изменения отправлены супер админу. Они вступят в силу после утверждения.", "ok")
conn.commit()
conn.close()
return redirect_admin(scope)
vendors, categories, products, links = fetch_admin_matrix(conn, tables)
if is_super_admin():
pending_rows = conn.execute(
"""
SELECT id, scope, action, payload, created_by, created_at
FROM pending_changes
WHERE status = 'pending'
ORDER BY id DESC
"""
)
else:
pending_rows = conn.execute(
"""
SELECT id, scope, action, payload, created_by, created_at
FROM pending_changes
WHERE status = 'pending' AND created_by = ?
ORDER BY id DESC
""",
(session.get("admin_login") or ADMIN_LOGIN,),
)
pending_changes = [prepare_pending_change(dict(r)) for r in pending_rows]
if not is_super_admin():
scoped_pending = [change for change in pending_changes if change["scope"] == scope]
vendors, categories, products, links = apply_pending_overlay(vendors, categories, products, links, scoped_pending)
admin_users = [
dict(r)
for r in conn.execute(
"""
SELECT id, username, role, access_scopes, created_at
FROM admin_users
ORDER BY role DESC, lower(username)
"""
)
]
created_admin_credentials = session.pop("created_admin_credentials", None)
conn.close()
return render_template(
"admin.html",
@@ -154,6 +792,11 @@ def admin_login_or_panel():
products=products,
links=links,
scope=scope,
is_super_admin=is_super_admin(),
allowed_scopes=allowed_scopes(),
pending_changes=pending_changes,
admin_users=admin_users,
created_admin_credentials=created_admin_credentials,
)