265 lines
8.1 KiB
Python
265 lines
8.1 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from pathlib import Path
|
|
from urllib import parse, request as urllib_request
|
|
|
|
from flask import Flask, redirect, render_template, request, send_from_directory, session, url_for
|
|
from werkzeug.security import check_password_hash
|
|
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent
|
|
DATA_DIR = BASE_DIR / "data"
|
|
DB_PATH = DATA_DIR / "infra.db"
|
|
|
|
DEFAULT_SETTINGS = {
|
|
"company_name": "InfraIT",
|
|
"phone_display": "+7 987 297-06-66",
|
|
"phone_link": "+79872970666",
|
|
"email": "maks@infrait.ru",
|
|
"site_url": "https://infrait.ru/",
|
|
"yandex_verification": "PASTE_YOUR_YANDEX_VERIFICATION_TOKEN",
|
|
"yandex_metrika_id": "",
|
|
"telegram_bot_token": "",
|
|
"telegram_chat_id": "",
|
|
"geo_primary": "Казань и Татарстан — выезд в день запроса",
|
|
"geo_secondary": "Россия — удалённая поддержка",
|
|
}
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = os.getenv("SECRET_KEY", "change-this-secret-key")
|
|
app.config.update(
|
|
SESSION_COOKIE_HTTPONLY=True,
|
|
SESSION_COOKIE_SAMESITE="Lax",
|
|
)
|
|
|
|
# Fixed admin password hash (plain password is never stored in code).
|
|
ADMIN_PASSWORD_HASH = (
|
|
"scrypt:32768:8:1$Ac0t7TD7bUhYLg04$25779398c765417771b888aa15d23dd72ee40bea4e48d0cd"
|
|
"7da9e8e386628a099b1f1e75019059be76c73264deb888959c236f6b776d12f4847e6762d5c76f0f"
|
|
)
|
|
|
|
|
|
def get_db() -> sqlite3.Connection:
|
|
DATA_DIR.mkdir(parents=True, exist_ok=True)
|
|
conn = sqlite3.connect(DB_PATH, timeout=30)
|
|
conn.execute("PRAGMA journal_mode=WAL;")
|
|
conn.execute("PRAGMA busy_timeout=30000;")
|
|
conn.execute("PRAGMA synchronous=NORMAL;")
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
|
|
def init_db() -> None:
|
|
with get_db() as conn:
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS settings (
|
|
key TEXT PRIMARY KEY,
|
|
value TEXT NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS leads (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
created_at TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
company TEXT NOT NULL,
|
|
phone TEXT NOT NULL,
|
|
email TEXT,
|
|
city TEXT,
|
|
computers TEXT NOT NULL,
|
|
message TEXT
|
|
)
|
|
"""
|
|
)
|
|
for key, value in DEFAULT_SETTINGS.items():
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO settings (key, value) VALUES (?, ?)",
|
|
(key, value),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def get_settings() -> dict[str, str]:
|
|
settings = DEFAULT_SETTINGS.copy()
|
|
with get_db() as conn:
|
|
rows = conn.execute("SELECT key, value FROM settings").fetchall()
|
|
for row in rows:
|
|
settings[row["key"]] = row["value"]
|
|
return settings
|
|
|
|
|
|
def update_settings(new_values: dict[str, str]) -> None:
|
|
with get_db() as conn:
|
|
for key, value in new_values.items():
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO settings (key, value)
|
|
VALUES (?, ?)
|
|
ON CONFLICT(key) DO UPDATE SET value=excluded.value
|
|
""",
|
|
(key, value),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def send_telegram_lead(form_data: dict[str, str], settings: dict[str, str]) -> None:
|
|
token = settings.get("telegram_bot_token", "").strip()
|
|
chat_id = settings.get("telegram_chat_id", "").strip()
|
|
if not token or not chat_id:
|
|
return
|
|
|
|
message = (
|
|
"Новая заявка InfraIT\n"
|
|
f"Имя: {form_data['name']}\n"
|
|
f"Компания: {form_data['company']}\n"
|
|
f"Телефон: {form_data['phone']}\n"
|
|
f"Email: {form_data['email'] or '-'}\n"
|
|
f"Город: {form_data['city'] or '-'}\n"
|
|
f"ПК: {form_data['computers']}\n"
|
|
f"Комментарий: {form_data['message'] or '-'}"
|
|
)
|
|
payload = parse.urlencode({"chat_id": chat_id, "text": message}).encode("utf-8")
|
|
req = urllib_request.Request(
|
|
f"https://api.telegram.org/bot{token}/sendMessage",
|
|
data=payload,
|
|
method="POST",
|
|
)
|
|
try:
|
|
urllib_request.urlopen(req, timeout=8).read()
|
|
except Exception:
|
|
# Ошибка телеграм-уведомления не должна ломать отправку формы.
|
|
return
|
|
|
|
|
|
def save_lead(form_data: dict[str, str], settings: dict[str, str]) -> None:
|
|
with get_db() as conn:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO leads (
|
|
created_at, name, company, phone, email, city, computers, message
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
""",
|
|
(
|
|
datetime.now().isoformat(timespec="seconds"),
|
|
form_data["name"],
|
|
form_data["company"],
|
|
form_data["phone"],
|
|
form_data["email"],
|
|
form_data["city"],
|
|
form_data["computers"],
|
|
form_data["message"],
|
|
),
|
|
)
|
|
conn.commit()
|
|
send_telegram_lead(form_data, settings)
|
|
|
|
|
|
def admin_required(view):
|
|
@wraps(view)
|
|
def wrapped(*args, **kwargs):
|
|
if not session.get("admin_logged_in"):
|
|
return redirect(url_for("admin_login"))
|
|
return view(*args, **kwargs)
|
|
|
|
return wrapped
|
|
|
|
|
|
def verify_admin_password(password: str) -> bool:
|
|
return check_password_hash(ADMIN_PASSWORD_HASH, password)
|
|
|
|
init_db()
|
|
|
|
|
|
@app.route("/", methods=["GET", "POST"])
|
|
def index():
|
|
success = request.args.get("success") == "1"
|
|
error = None
|
|
settings = get_settings()
|
|
|
|
if request.method == "POST":
|
|
form_data = {
|
|
"name": request.form.get("name", "").strip(),
|
|
"company": request.form.get("company", "").strip(),
|
|
"phone": request.form.get("phone", "").strip(),
|
|
"email": request.form.get("email", "").strip(),
|
|
"city": request.form.get("city", "").strip(),
|
|
"computers": request.form.get("computers", "").strip(),
|
|
"message": request.form.get("message", "").strip(),
|
|
}
|
|
|
|
required_fields = ["name", "phone", "company", "computers"]
|
|
if any(not form_data[field] for field in required_fields):
|
|
error = "Заполните обязательные поля: имя, компания, телефон и количество компьютеров."
|
|
else:
|
|
save_lead(form_data, settings)
|
|
return redirect(url_for("index", success=1) + "#contact")
|
|
|
|
return render_template("index.html", success=success, error=error, settings=settings)
|
|
|
|
|
|
@app.route("/admin/login", methods=["GET", "POST"])
|
|
def admin_login():
|
|
error = None
|
|
|
|
if request.method == "POST":
|
|
password = request.form.get("password", "")
|
|
if verify_admin_password(password):
|
|
session["admin_logged_in"] = True
|
|
return redirect(url_for("admin_settings"))
|
|
error = "Неверный пароль."
|
|
|
|
return render_template("admin_login.html", error=error)
|
|
|
|
|
|
@app.route("/admin/logout")
|
|
def admin_logout():
|
|
session.pop("admin_logged_in", None)
|
|
return redirect(url_for("admin_login"))
|
|
|
|
|
|
@app.route("/admin/settings", methods=["GET", "POST"])
|
|
@admin_required
|
|
def admin_settings():
|
|
success = request.args.get("saved") == "1"
|
|
settings = get_settings()
|
|
|
|
if request.method == "POST":
|
|
updates: dict[str, str] = {}
|
|
for field in DEFAULT_SETTINGS:
|
|
updates[field] = request.form.get(field, "").strip()
|
|
update_settings(updates)
|
|
return redirect(url_for("admin_settings", saved=1))
|
|
|
|
return render_template("admin_settings.html", settings=settings, success=success)
|
|
|
|
|
|
@app.route("/health")
|
|
def health():
|
|
try:
|
|
with get_db() as conn:
|
|
conn.execute("SELECT 1")
|
|
return {"status": "ok"}, 200
|
|
except Exception:
|
|
return {"status": "error"}, 503
|
|
|
|
|
|
@app.route("/favicon.ico")
|
|
def favicon():
|
|
return send_from_directory(
|
|
app.static_folder,
|
|
"img/favicon.png",
|
|
mimetype="image/png",
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
init_db()
|
|
app.run(debug=True)
|