from __future__ import annotations import os import json import sqlite3 from pathlib import Path from typing import Iterable from flask import Flask, jsonify, redirect, render_template_string, request, send_from_directory, session try: from openpyxl import load_workbook except ImportError: load_workbook = None BASE_DIR = Path(__file__).resolve().parent DB_PATH = BASE_DIR / "matrix.db" XLSX_PATH = BASE_DIR / "Z-card_РФ.xlsx" INFRA_JSON_FILES = [BASE_DIR / "infra1", BASE_DIR / "infra2", BASE_DIR / "infra3", BASE_DIR / "infra4"] ADMIN_PATH = "/sdjlkfhsjkadahjksdhjgfkhsssssdjkjfljsdfjklsdajfkldsjflksdjfkldsj" ADMIN_LOGIN = "batman" ADMIN_PASSWORD = "batmannotmont" app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY", "change-me-please") ENABLE_BOOTSTRAP = os.getenv("ENABLE_BOOTSTRAP", "0").strip().lower() in {"1", "true", "yes", "on"} def get_db() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def seed_data(conn: sqlite3.Connection) -> None: categories = [ "Augmented Reality", "IoT", "Robotic Process Automation (RPA)", "Автоматизация бизнес-процессов", "Видеосвязь и веб-конференции", "Визуализация и анализ данных", "Виртуализация", "Виртуализация рабочих мест, VDI", "Виртуализация, гиперконвергенция", "Виртуализация, классическая виртуализация", "Графические редакторы (замена Visio)", "Компьютерная техника", "Контейнерные платформы", "Корпоративные почтовые серверы", "Корпоративные коммуникации", "Облачные платформы", "Облачные сервисы и сопроводительные решения", "Онлайн-переводчик", "Операционные системы", "СУБД", "Оцифровка бумажных документов", "Платформы для онлайн-обучения", "ПО в сфере ИИ", "Программные маркетплейсы", "Программы для смартфонов", "Работа с PDF", "Работа с мультимедиа (видео, фото, графика)", "Разработка в ИИ", "Резервное копирование и управление данными", "Речевые технологии, компьютерное зрение", "САПР", "Серверное и WiFi оборудование", "Системы хранения данных", "Системы ЭДО", "Средства разработки ПО", "Техническая поддержка и консалтинг", "Удаленное управление устройствами", "Файлы и диски", ] vendors = [ "Adobe", "AliveColors", "Amazon Web Services (AWS)", "ANWORK", "CommuniGate Pro (СБК)", "Content AI (ex-ABBYY)", "DocTrix", "EvaTeam", "eXpress", "FanRuan", "GStarCAD", "Handy Backup", "InfoWatch", "iSpring", "Just AI", "Kairos Digital", "LITEBIM", "LiteManager", "livedigital", "Master PDF (Code Industry)", "MIND Software", "Monq", "NextBox", "Paragon Software Group", "SL Soft", "Positive Technologies", "Postgres Pro", "Pragmatic Tools", "Pro32", "PROMT", "Quasar", "Radmin", "RDW Computers", "Renga Software", "SETERE Group", "Sharx DC", "SpaceVM", "TestIT", "Uncom OS", "Utinet", "Valo Cloud", "Vinteo", "VK Tech", "АЛМИ Партнер", "АСКОН", "Базальт", "БФТ", "ГазИнформСервис", "Гравитон", "ГрафТех", "Группа Астра", "ИТ Роут", "Киберпротект", "Контур", "Кредо-Диалог", "Лаборатория Касперского", "Лаборатория Числитель", "Мовавика", "МойОфис", "МТС Линк", "Нанософт разработка", "НЛПК", "Облакотека", "Р7", "РЕД СОФТ", "РОСА", "Росплатформа", "Сакура ПРО", "Салют для бизнеса (SberDevices)", "Труконф", "Флант (Deckhouse)", "ЦРТ", "ЦИТИП", "Яндекс 360 для бизнеса", ] vendor_links = { "Adobe": ["Работа с PDF", "Оцифровка бумажных документов", "Работа с мультимедиа (видео, фото, графика)"], "Amazon Web Services (AWS)": ["Облачные платформы", "Облачные сервисы и сопроводительные решения", "ПО в сфере ИИ"], "CommuniGate Pro (СБК)": ["Корпоративные почтовые серверы", "Корпоративные коммуникации"], "Content AI (ex-ABBYY)": ["Оцифровка бумажных документов", "Онлайн-переводчик", "Работа с PDF"], "eXpress": ["Корпоративные коммуникации", "Программы для смартфонов"], "FanRuan": ["Визуализация и анализ данных"], "GStarCAD": ["САПР"], "Handy Backup": ["Резервное копирование и управление данными"], "iSpring": ["Платформы для онлайн-обучения"], "Just AI": ["ПО в сфере ИИ", "Речевые технологии, компьютерное зрение"], "LiteManager": ["Удаленное управление устройствами"], "Master PDF (Code Industry)": ["Работа с PDF"], "Paragon Software Group": ["Файлы и диски", "Резервное копирование и управление данными"], "Postgres Pro": ["СУБД"], "PROMT": ["Онлайн-переводчик"], "Radmin": ["Удаленное управление устройствами"], "Renga Software": ["САПР"], "SpaceVM": ["Виртуализация", "Виртуализация рабочих мест, VDI"], "Uncom OS": ["Операционные системы"], "VK Tech": ["Облачные платформы", "Корпоративные коммуникации", "ПО в сфере ИИ"], "Базальт": ["Операционные системы"], "ГазИнформСервис": ["Системы ЭДО", "Техническая поддержка и консалтинг"], "Группа Астра": ["Операционные системы", "Виртуализация", "СУБД"], "Киберпротект": ["Резервное копирование и управление данными"], "Контур": ["Системы ЭДО", "Корпоративные коммуникации"], "Лаборатория Касперского": ["Техническая поддержка и консалтинг", "Средства разработки ПО"], "МойОфис": ["Корпоративные коммуникации", "Программы для смартфонов", "Файлы и диски"], "МТС Линк": ["Видеосвязь и веб-конференции", "Платформы для онлайн-обучения"], "Р7": ["Корпоративные коммуникации", "Файлы и диски"], "РЕД СОФТ": ["Операционные системы", "СУБД"], "РОСА": ["Операционные системы"], "Росплатформа": ["Облачные платформы", "Виртуализация, гиперконвергенция"], "Салют для бизнеса (SberDevices)": ["ПО в сфере ИИ", "Речевые технологии, компьютерное зрение"], "Труконф": ["Видеосвязь и веб-конференции", "Корпоративные коммуникации"], "Флант (Deckhouse)": ["Контейнерные платформы", "Облачные платформы"], "ЦРТ": ["Речевые технологии, компьютерное зрение", "ПО в сфере ИИ"], "Яндекс 360 для бизнеса": ["Корпоративные коммуникации", "Файлы и диски", "Программы для смартфонов"], } conn.executemany("INSERT INTO categories(name) VALUES (?)", [(name,) for name in categories]) conn.executemany("INSERT INTO vendors(name) VALUES (?)", [(name,) for name in vendors]) category_ids = {r["name"]: r["id"] for r in conn.execute("SELECT id, name FROM categories")} vendor_ids = {r["name"]: r["id"] for r in conn.execute("SELECT id, name FROM vendors")} pairs: list[tuple[int, int]] = [] for vendor, cats in vendor_links.items(): v_id = vendor_ids.get(vendor) if not v_id: continue for cat in cats: c_id = category_ids.get(cat) if c_id: pairs.append((v_id, c_id)) conn.executemany( "INSERT OR IGNORE INTO vendor_categories(vendor_id, category_id) VALUES (?, ?)", pairs, ) def init_db() -> None: conn = get_db() conn.executescript( """ CREATE TABLE IF NOT EXISTS vendors ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE ); CREATE TABLE IF NOT EXISTS categories ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE ); CREATE TABLE IF NOT EXISTS vendor_categories ( vendor_id INTEGER NOT NULL, category_id INTEGER NOT NULL, PRIMARY KEY (vendor_id, category_id), FOREIGN KEY(vendor_id) REFERENCES vendors(id) ON DELETE CASCADE, FOREIGN KEY(category_id) REFERENCES categories(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS ib_vendors ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE ); CREATE TABLE IF NOT EXISTS ib_categories ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE ); CREATE TABLE IF NOT EXISTS ib_vendor_categories ( vendor_id INTEGER NOT NULL, category_id INTEGER NOT NULL, PRIMARY KEY (vendor_id, category_id), FOREIGN KEY(vendor_id) REFERENCES ib_vendors(id) ON DELETE CASCADE, FOREIGN KEY(category_id) REFERENCES ib_categories(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS products ( id INTEGER PRIMARY KEY AUTOINCREMENT, vendor_id INTEGER NOT NULL, name TEXT NOT NULL, url TEXT, UNIQUE(vendor_id, name), FOREIGN KEY(vendor_id) REFERENCES vendors(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS product_categories ( product_id INTEGER NOT NULL, category_id INTEGER NOT NULL, PRIMARY KEY (product_id, category_id), FOREIGN KEY(product_id) REFERENCES products(id) ON DELETE CASCADE, FOREIGN KEY(category_id) REFERENCES categories(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS ib_products ( id INTEGER PRIMARY KEY AUTOINCREMENT, vendor_id INTEGER NOT NULL, name TEXT NOT NULL, url TEXT, UNIQUE(vendor_id, name), FOREIGN KEY(vendor_id) REFERENCES ib_vendors(id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS ib_product_categories ( product_id INTEGER NOT NULL, category_id INTEGER NOT NULL, PRIMARY KEY (product_id, category_id), FOREIGN KEY(product_id) REFERENCES ib_products(id) ON DELETE CASCADE, FOREIGN KEY(category_id) REFERENCES ib_categories(id) ON DELETE CASCADE ); """ ) try: conn.execute("ALTER TABLE products ADD COLUMN url TEXT") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE ib_products ADD COLUMN url TEXT") except sqlite3.OperationalError: pass has_data = conn.execute("SELECT EXISTS(SELECT 1 FROM vendors)").fetchone()[0] if not has_data and ENABLE_BOOTSTRAP: seed_data(conn) has_ib_data = conn.execute("SELECT EXISTS(SELECT 1 FROM ib_vendors)").fetchone()[0] if not has_ib_data and ENABLE_BOOTSTRAP: ib_matrix = None from_xlsx = load_matrices_from_xlsx() if from_xlsx: ib_matrix = from_xlsx.get("ib") if not ib_matrix: ib_matrix = IB_MATRIX seed_ib_data(conn, ib_matrix) if ENABLE_BOOTSTRAP: bootstrap_products_from_vendor_links(conn, "infra") bootstrap_products_from_vendor_links(conn, "ib") import_infra_products_from_json(conn) conn.commit() conn.close() def fetch_matrix() -> dict: conn = get_db() vendors = [dict(r) for r in conn.execute("SELECT id, name FROM vendors ORDER BY lower(name)")] categories = [dict(r) for r in conn.execute("SELECT id, name FROM categories ORDER BY lower(name)")] links = [dict(r) for r in conn.execute("SELECT vendor_id, category_id FROM vendor_categories")] conn.close() return {"vendors": vendors, "categories": categories, "links": links} def scope_tables(scope: str) -> dict[str, str]: if scope == "ib": return { "vendors": "ib_vendors", "categories": "ib_categories", "vendor_categories": "ib_vendor_categories", "products": "ib_products", "product_categories": "ib_product_categories", } return { "vendors": "vendors", "categories": "categories", "vendor_categories": "vendor_categories", "products": "products", "product_categories": "product_categories", } def seed_ib_data(conn: sqlite3.Connection, matrix: dict) -> None: categories = [item["name"] for item in matrix.get("categories", [])] vendors = [item["name"] for item in matrix.get("vendors", [])] links = matrix.get("links", []) conn.executemany("INSERT OR IGNORE INTO ib_categories(name) VALUES (?)", [(name,) for name in categories]) conn.executemany("INSERT OR IGNORE INTO ib_vendors(name) VALUES (?)", [(name,) for name in vendors]) category_ids = {r["name"]: r["id"] for r in conn.execute("SELECT id, name FROM ib_categories")} vendor_ids = {r["name"]: r["id"] for r in conn.execute("SELECT id, name FROM ib_vendors")} src_category_by_id = {item["id"]: item["name"] for item in matrix.get("categories", [])} src_vendor_by_id = {item["id"]: item["name"] for item in matrix.get("vendors", [])} pairs: list[tuple[int, int]] = [] for link in links: src_vendor_name = src_vendor_by_id.get(link["vendor_id"]) src_category_name = src_category_by_id.get(link["category_id"]) if not src_vendor_name or not src_category_name: continue db_vendor_id = vendor_ids.get(src_vendor_name) db_category_id = category_ids.get(src_category_name) if db_vendor_id and db_category_id: pairs.append((db_vendor_id, db_category_id)) conn.executemany( "INSERT OR IGNORE INTO ib_vendor_categories(vendor_id, category_id) VALUES (?, ?)", pairs, ) def fetch_ib_matrix() -> dict: conn = get_db() vendors = [dict(r) for r in conn.execute("SELECT id, name FROM ib_vendors ORDER BY lower(name)")] categories = [dict(r) for r in conn.execute("SELECT id, name FROM ib_categories ORDER BY lower(name)")] links = [dict(r) for r in conn.execute("SELECT vendor_id, category_id FROM ib_vendor_categories")] conn.close() return {"vendors": vendors, "categories": categories, "links": links} def fetch_scope_data(scope: str) -> dict: tables = scope_tables(scope) conn = get_db() vendors = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['vendors']} ORDER BY lower(name)")] categories = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")] products = [ dict(r) for r in conn.execute( f""" SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name , p.url FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id ORDER BY lower(v.name), lower(p.name) """ ) ] product_links = [ dict(r) for r in conn.execute( f"SELECT product_id, category_id FROM {tables['product_categories']}" ) ] links = [ dict(r) for r in conn.execute( f"SELECT vendor_id, category_id FROM {tables['vendor_categories']}" ) ] conn.close() return { "vendors": vendors, "categories": categories, "products": products, "product_links": product_links, "links": links, } def bootstrap_products_from_vendor_links(conn: sqlite3.Connection, scope: str) -> None: tables = scope_tables(scope) has_products = conn.execute(f"SELECT EXISTS(SELECT 1 FROM {tables['products']})").fetchone()[0] if has_products: return vendors = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['vendors']}")] for vendor in vendors: cur = conn.execute( f"INSERT INTO {tables['products']}(vendor_id, name) VALUES (?, ?)", (vendor["id"], "Базовый продукт"), ) product_id = cur.lastrowid categories = [ r["category_id"] for r in conn.execute( f"SELECT category_id FROM {tables['vendor_categories']} WHERE vendor_id = ?", (vendor["id"],), ) ] conn.executemany( f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)", [(product_id, c_id) for c_id in categories], ) def import_infra_products_from_json(conn: sqlite3.Connection) -> None: present_files = [p for p in INFRA_JSON_FILES if p.exists()] if not present_files: return marker_exists = conn.execute("SELECT EXISTS(SELECT 1 FROM products WHERE url IS NOT NULL AND trim(url) <> '')").fetchone()[0] if marker_exists: return tables = scope_tables("infra") vendors = {r["name"]: r["id"] for r in conn.execute(f"SELECT id, name FROM {tables['vendors']}")} categories = {r["name"]: r["id"] for r in conn.execute(f"SELECT id, name FROM {tables['categories']}")} imported_products = 0 imported_links = 0 skipped = 0 for path in present_files: try: payload = json.loads(path.read_text(encoding="utf-8")) except Exception: continue if not isinstance(payload, list): continue for item in payload: if not isinstance(item, dict): continue vendor_name = (item.get("vendor") or "").strip() product_name = (item.get("product") or "").strip() if not vendor_name or not product_name: skipped += 1 continue if "нет подтвержденного соответствия" in product_name.lower(): skipped += 1 continue vendor_id = vendors.get(vendor_name) if not vendor_id: skipped += 1 continue product_url = "" evidence = item.get("evidence") or [] if isinstance(evidence, list): for entry in evidence: if isinstance(entry, dict): url = (entry.get("url") or "").strip() if url: product_url = url break conn.execute( f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)", (vendor_id, product_name, product_url or None), ) conn.execute( f"UPDATE {tables['products']} SET url = COALESCE(NULLIF(url, ''), ?) WHERE vendor_id = ? AND name = ?", (product_url or None, vendor_id, product_name), ) product_id_row = conn.execute( f"SELECT id FROM {tables['products']} WHERE vendor_id = ? AND name = ?", (vendor_id, product_name), ).fetchone() if not product_id_row: skipped += 1 continue product_id = product_id_row["id"] imported_products += 1 category_names = item.get("categories") or [] if isinstance(category_names, list): for category_name_raw in category_names: category_name = str(category_name_raw).strip() category_id = categories.get(category_name) if not category_id: continue conn.execute( f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)", (product_id, category_id), ) imported_links += 1 conn.execute(f"DELETE FROM {tables['vendor_categories']}") conn.execute( f""" INSERT OR IGNORE INTO {tables['vendor_categories']}(vendor_id, category_id) SELECT DISTINCT p.vendor_id, pc.category_id FROM {tables['products']} p JOIN {tables['product_categories']} pc ON pc.product_id = p.id """ ) if imported_products == 0 and skipped > 0: # Preserve non-empty startup state if JSON couldn't be mapped. bootstrap_products_from_vendor_links(conn, "infra") def build_matrix_from_lists( vendors: list[str], categories: list[str], vendor_links: dict[str, list[str]], ) -> dict: categories_payload = [{"id": i + 1, "name": name} for i, name in enumerate(categories)] vendors_payload = [{"id": i + 1, "name": name} for i, name in enumerate(vendors)] category_ids = {item["name"]: item["id"] for item in categories_payload} vendor_ids = {item["name"]: item["id"] for item in vendors_payload} links_payload: list[dict[str, int]] = [] for vendor_name, linked_categories in vendor_links.items(): v_id = vendor_ids.get(vendor_name) if not v_id: continue for category_name in linked_categories: c_id = category_ids.get(category_name) if c_id: links_payload.append({"vendor_id": v_id, "category_id": c_id}) return {"vendors": vendors_payload, "categories": categories_payload, "links": links_payload} def parse_xlsx_matrix_sheet( sheet, *, header_row: int, data_start_row: int, category_start_col: int, ) -> dict: category_cols: list[tuple[int, str]] = [] for col in range(category_start_col, sheet.max_column + 1): raw = sheet.cell(header_row, col).value if raw is None: continue name = str(raw).strip() if name: category_cols.append((col, name)) categories_payload = [{"id": i + 1, "name": name} for i, (_, name) in enumerate(category_cols)] category_id_by_col = {col: idx + 1 for idx, (col, _) in enumerate(category_cols)} vendors_payload: list[dict[str, str | int]] = [] links_payload: list[dict[str, int]] = [] for row in range(data_start_row, sheet.max_row + 1): raw_vendor = sheet.cell(row, 1).value if raw_vendor is None: continue vendor_name = str(raw_vendor).strip() if not vendor_name: continue lowered = vendor_name.lower() if "вендор" in lowered or "решение" in lowered or "категория" in lowered: continue vendor_id = len(vendors_payload) + 1 vendors_payload.append({"id": vendor_id, "name": vendor_name}) for col, _ in category_cols: mark = sheet.cell(row, col).value if mark is None: continue if str(mark).strip() == "": continue links_payload.append({"vendor_id": vendor_id, "category_id": category_id_by_col[col]}) return {"vendors": vendors_payload, "categories": categories_payload, "links": links_payload} def load_matrices_from_xlsx() -> dict[str, dict] | None: if load_workbook is None: return None if not XLSX_PATH.exists(): return None wb = load_workbook(XLSX_PATH, data_only=True) if "инфра" not in wb.sheetnames or "инфобез" not in wb.sheetnames: return None infra = parse_xlsx_matrix_sheet( wb["инфра"], header_row=1, data_start_row=2, category_start_col=4, ) ib = parse_xlsx_matrix_sheet( wb["инфобез"], header_row=2, data_start_row=4, category_start_col=3, ) return {"infra": infra, "ib": ib} IB_CATEGORIES = [ "Защита конечных устройств (EDR/EPP)", "Безопасность мобильных устройств", "Межсетевые экраны и NGFW", "Удаленный доступ (VPN)", "Защита от DDoS", "Защита виртуальных сред", "NTA / анализ сетевого трафика", "Защита АСУ ТП", "Sandbox", "Управление уязвимостями (VM)", "Управление событиями (SIEM)", "SOAR", "SGRC / комплаенс", "Поведенческий анализ (UEBA)", "Антифрод", "KMS / криптозащита", "DLP", "Классификация и маркировка данных", "Защита баз данных", "DRM", "DAM / доступ к секретам", "Биометрическая аутентификация", "MFA", "Менеджер паролей", "SWG / веб-безопасность", "Родительский контроль", ] IB_VENDORS = [ "Bifit Mitigator", "BI.ZONE", "Check Point", "F6", "InfoWatch", "Positive Technologies", "Лаборатория Касперского", "Киберпротект", "Код Безопасности", "Р7", "Контур", "UserGate", "С-Терра", "Гарда", "КриптоПро", "Эшелон", "R-Vision", "RuSIEM", "SkyDNS", "IKOD", "StaffCop", "Zecurion", "Nano Security", "StopPhish", ] IB_VENDOR_LINKS = { "Bifit Mitigator": ["Антифрод", "UEBA", "SIEM"], "BI.ZONE": ["SIEM", "SOAR", "SGRC / комплаенс", "VM", "DLP", "Антифрод"], "Check Point": ["Межсетевые экраны и NGFW", "VPN", "Защита конечных устройств (EDR/EPP)", "SWG / веб-безопасность"], "F6": ["Антифрод", "Защита от DDoS", "NTA / анализ сетевого трафика"], "InfoWatch": ["DLP", "Классификация и маркировка данных", "DRM"], "Positive Technologies": ["VM", "NTA / анализ сетевого трафика", "SIEM", "SOAR", "SGRC / комплаенс"], "Лаборатория Касперского": ["Защита конечных устройств (EDR/EPP)", "Sandbox", "SIEM", "SWG / веб-безопасность"], "Киберпротект": ["DLP", "Защита баз данных", "DRM"], "Код Безопасности": ["Межсетевые экраны и NGFW", "VPN", "Защита виртуальных сред", "KMS / криптозащита"], "Р7": ["MFA", "Менеджер паролей"], "Контур": ["MFA", "Биометрическая аутентификация"], "UserGate": ["Межсетевые экраны и NGFW", "SWG / веб-безопасность", "VPN"], "С-Терра": ["VPN", "KMS / криптозащита"], "Гарда": ["DLP", "Классификация и маркировка данных", "SIEM"], "КриптоПро": ["KMS / криптозащита", "MFA", "Биометрическая аутентификация"], "Эшелон": ["VM", "SIEM", "SGRC / комплаенс"], "R-Vision": ["SIEM", "SOAR", "SGRC / комплаенс", "UEBA"], "RuSIEM": ["SIEM", "SOAR"], "SkyDNS": ["SWG / веб-безопасность", "Родительский контроль"], "IKOD": ["DAM / доступ к секретам", "Менеджер паролей"], "StaffCop": ["UEBA", "DLP"], "Zecurion": ["DLP", "Классификация и маркировка данных", "Защита баз данных"], "Nano Security": ["Защита конечных устройств (EDR/EPP)", "Sandbox"], "StopPhish": ["Антифрод", "MFA"], } IB_MATRIX = build_matrix_from_lists(IB_VENDORS, IB_CATEGORIES, IB_VENDOR_LINKS) def require_admin() -> bool: return bool(session.get("is_admin")) def parse_int(form_value: str | None) -> int | None: if not form_value: return None try: return int(form_value) except ValueError: return None @app.get("/") def index(): return render_template_string(INDEX_HTML) @app.get("/api/data") def api_data(): scope = (request.args.get("scope") or "infra").strip().lower() if scope in {"ib", "sec", "security"}: scope = "ib" else: scope = "infra" return jsonify(fetch_scope_data(scope)) @app.route(ADMIN_PATH, methods=["GET", "POST"]) def admin_login_or_panel(): conn = get_db() raw_scope = (request.args.get("scope") or request.form.get("scope") or "infra").strip().lower() scope = "ib" if raw_scope in {"ib", "sec", "security"} else "infra" tables = scope_tables(scope) if request.method == "POST" and not require_admin() and request.form.get("action") == "login": if request.form.get("username") == ADMIN_LOGIN and request.form.get("password") == ADMIN_PASSWORD: session["is_admin"] = True conn.close() return redirect(ADMIN_PATH) conn.close() return render_template_string(LOGIN_HTML, error="Неверный логин или пароль") if not require_admin(): conn.close() return render_template_string(LOGIN_HTML, error=None) if request.method == "POST": action = request.form.get("action", "") if action == "logout": session.pop("is_admin", None) conn.close() return redirect(ADMIN_PATH) if action == "add_vendor": name = (request.form.get("name") or "").strip() if name: conn.execute(f"INSERT OR IGNORE INTO {tables['vendors']}(name) VALUES (?)", (name,)) elif action == "add_category": name = (request.form.get("name") or "").strip() if name: conn.execute(f"INSERT OR IGNORE INTO {tables['categories']}(name) VALUES (?)", (name,)) elif action == "add_product": vendor_id = parse_int(request.form.get("vendor_id")) name = (request.form.get("name") or "").strip() url = (request.form.get("url") or "").strip() if vendor_id and name: conn.execute( f"INSERT OR IGNORE INTO {tables['products']}(vendor_id, name, url) VALUES (?, ?, ?)", (vendor_id, name, url or None), ) if url: conn.execute( f"UPDATE {tables['products']} SET url = ? WHERE vendor_id = ? AND name = ?", (url, vendor_id, name), ) elif action == "delete_vendor": v_id = parse_int(request.form.get("vendor_id")) if v_id: conn.execute(f"DELETE FROM {tables['vendors']} WHERE id = ?", (v_id,)) elif action == "delete_category": c_id = parse_int(request.form.get("category_id")) if c_id: conn.execute(f"DELETE FROM {tables['categories']} WHERE id = ?", (c_id,)) elif action == "delete_product": p_id = parse_int(request.form.get("product_id")) if p_id: conn.execute(f"DELETE FROM {tables['products']} WHERE id = ?", (p_id,)) elif action == "save_matrix": products = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['products']}")] categories = [r["id"] for r in conn.execute(f"SELECT id FROM {tables['categories']}")] new_pairs: list[tuple[int, int]] = [] for p_id in products: for c_id in categories: if request.form.get(f"pc_{p_id}_{c_id}"): new_pairs.append((p_id, c_id)) conn.execute(f"DELETE FROM {tables['product_categories']}") conn.executemany( f"INSERT OR IGNORE INTO {tables['product_categories']}(product_id, category_id) VALUES (?, ?)", new_pairs, ) conn.execute(f"DELETE FROM {tables['vendor_categories']}") conn.execute( f""" INSERT OR IGNORE INTO {tables['vendor_categories']}(vendor_id, category_id) SELECT DISTINCT p.vendor_id, pc.category_id FROM {tables['products']} p JOIN {tables['product_categories']} pc ON pc.product_id = p.id """ ) conn.commit() conn.close() return redirect(f"{ADMIN_PATH}?scope={scope}") vendors = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['vendors']} ORDER BY lower(name)")] categories = [dict(r) for r in conn.execute(f"SELECT id, name FROM {tables['categories']} ORDER BY lower(name)")] products = [ dict(r) for r in conn.execute( f""" SELECT p.id, p.name, p.vendor_id, v.name AS vendor_name , p.url FROM {tables['products']} p JOIN {tables['vendors']} v ON v.id = p.vendor_id ORDER BY lower(v.name), lower(p.name) """ ) ] links = { (r["product_id"], r["category_id"]) for r in conn.execute(f"SELECT product_id, category_id FROM {tables['product_categories']}") } conn.close() return render_template_string( ADMIN_HTML, vendors=vendors, categories=categories, products=products, links=links, scope=scope, ) @app.get("/health") def health(): return {"status": "ok"} @app.get("/assets/mont-logo") def mont_logo(): return send_from_directory(BASE_DIR, "mont_logo.png") INDEX_HTML = """ Корзина МОНТ

Вендоры в корзине МОНТ

Актуальная матрица вендоров, продуктов и категорий. Выбирайте вендоров или категории, чтобы видеть релевантные продуктовые линейки в Инфраструктуре и ИБ.

Вендоры

Категории

Вендоры и продукты (после фильтрации)

Made by Galyaviev
RGalyaviev@mont.com
""" LOGIN_HTML = """ Admin Login

Редактор матрицы

{% if error %}

{{ error }}

{% endif %}
""" ADMIN_HTML = """ Admin Matrix
Админ-панель матрицы

Добавить вендора

Добавить категорию

Добавить продукт

Удалить вендора

{% for v in vendors %}
{{ v.name }}
{% endfor %}

Удалить категорию

{% for c in categories %}
{{ c.name }}
{% endfor %}

Удалить продукт

{% for p in products %}
{{ p.vendor_name }} :: {{ p.name }} {% if p.url %}ссылка{% endif %}
{% endfor %}

Прокрутка: колесо/тачпад вниз-вверх внутри таблицы, полосой ниже - влево-вправо.

{% for c in categories %} {% endfor %} {% for p in products %} {% for c in categories %} {% endfor %} {% endfor %}
Вендор / Продукт{{ c.name }}
{{ p.vendor_name }}
{{ p.name }}
""" if __name__ == "__main__": init_db() app.run(host="0.0.0.0", port=5000, debug=True) else: init_db()