first commit

This commit is contained in:
2026-05-06 13:28:54 +03:00
commit 79768a4fac
13 changed files with 1827 additions and 0 deletions
+935
View File
@@ -0,0 +1,935 @@
import os
import random
import sqlite3
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import List, Optional, Set, Tuple
import requests
from dotenv import load_dotenv
from flask import Flask, g, redirect, render_template, request, session, url_for
from werkzeug.security import check_password_hash, generate_password_hash
BASE_DIR = Path(__file__).resolve().parent
load_dotenv(BASE_DIR / '.env')
STAR_VALUES = [5, 4, 3, 2, 1]
DEFAULT_STARS = {5}
AUTO_REPLY_STARS = {5, 4}
AUTO_REPLY_SETTING_KEY = "auto_reply_enabled"
AUTO_REPLY_LAST_RUN_KEY = "auto_reply_last_run_ts"
AUTO_REPLY_POOL_5_KEY = "auto_reply_pool_5"
AUTO_REPLY_POOL_4_KEY = "auto_reply_pool_4"
AUTO_REPLY_INTERVAL_MINUTES = int(os.getenv("AUTO_REPLY_INTERVAL_MINUTES", "10"))
DEFAULT_REPLY_POOL_5 = [
"Спасибо за высокую оценку! Нам очень приятно.",
"Благодарим за отзыв и доверие к нашему магазину!",
"Спасибо! Рады, что товар вам понравился.",
]
DEFAULT_REPLY_POOL_4 = [
"Спасибо за отзыв! Учтём ваши замечания и станем лучше.",
"Благодарим за оценку! Работаем над тем, чтобы было на 5 звёзд.",
"Спасибо за обратную связь! Нам важно ваше мнение.",
]
class FeedbackApiError(RuntimeError):
"""Raised when the Wildberries Feedback API returns an error."""
@dataclass
class Review:
id: str
text: str
pros: str
cons: str
rating: int
created_at: datetime
product_name: str
answer: Optional[str]
is_answered: bool
user_name: str
@classmethod
def from_api(cls, payload: dict) -> "Review":
created_at = payload.get("createdDate")
created_dt = None
if created_at:
created_dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
product = payload.get("productDetails", {}) or {}
answer_payload = payload.get("answer")
if isinstance(answer_payload, dict):
answer_value = answer_payload.get("text")
else:
answer_value = answer_payload
return cls(
id=payload.get("id") or "",
text=payload.get("text") or "",
pros=payload.get("pros") or "",
cons=payload.get("cons") or "",
rating=payload.get("productValuation") or 0,
created_at=created_dt,
product_name=product.get("productName") or "",
answer=answer_value,
is_answered=bool(answer_value),
user_name=payload.get("userName") or "",
)
class Database:
def __init__(self, db_path: Path) -> None:
self.db_path = db_path
self._ensure_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _ensure_db(self) -> None:
conn = self._connect()
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
is_admin INTEGER DEFAULT 0,
is_active INTEGER DEFAULT 0
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
name TEXT NOT NULL,
token TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS auto_reply_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
review_id TEXT NOT NULL,
rating INTEGER NOT NULL,
product_name TEXT,
user_name TEXT,
reply_text TEXT NOT NULL,
status TEXT NOT NULL,
error_text TEXT
);
"""
)
self._ensure_admin(conn)
self._ensure_token_user_column(conn)
conn.commit()
finally:
conn.close()
def _ensure_admin(self, conn: sqlite3.Connection) -> None:
admin = conn.execute("SELECT id FROM users WHERE username = ?", ("ruslan",)).fetchone()
if not admin:
password_hash = generate_password_hash("utOgbZ09ruslan+")
conn.execute(
"INSERT INTO users(username, password_hash, is_admin, is_active) VALUES (?, ?, 1, 1)",
("ruslan", password_hash),
)
def _ensure_token_user_column(self, conn: sqlite3.Connection) -> None:
info = conn.execute("PRAGMA table_info(tokens)").fetchall()
columns = {row["name"] for row in info}
if "user_id" not in columns:
conn.execute("ALTER TABLE tokens ADD COLUMN user_id INTEGER")
admin = conn.execute("SELECT id FROM users WHERE username = ?", ("ruslan",)).fetchone()
admin_id = admin["id"] if admin else None
if admin_id:
conn.execute(
"UPDATE tokens SET user_id = ? WHERE user_id IS NULL",
(admin_id,),
)
# User helpers
def create_user(self, username: str, password_hash: str) -> None:
conn = self._connect()
try:
conn.execute(
"INSERT INTO users(username, password_hash, is_admin, is_active) VALUES (?, ?, 0, 0)",
(username.lower(), password_hash),
)
conn.commit()
finally:
conn.close()
def get_user_by_username(self, username: str) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute(
"SELECT * FROM users WHERE username = ?", (username.lower(),)
).fetchone()
finally:
conn.close()
def get_user_by_id(self, user_id: int) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
finally:
conn.close()
def list_users(self) -> List[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM users ORDER BY username").fetchall()
finally:
conn.close()
def set_user_active(self, user_id: int, is_active: bool) -> None:
conn = self._connect()
try:
conn.execute("UPDATE users SET is_active = ? WHERE id = ?", (1 if is_active else 0, user_id))
conn.commit()
finally:
conn.close()
# Token helpers
def add_token(self, user_id: int, name: str, token: str) -> None:
conn = self._connect()
try:
conn.execute(
"INSERT INTO tokens(user_id, name, token) VALUES (?, ?, ?)",
(user_id, name.strip(), token.strip()),
)
conn.commit()
finally:
conn.close()
def fetch_tokens_for_user(self, user_id: int, is_admin: bool) -> List[sqlite3.Row]:
conn = self._connect()
try:
if is_admin:
query = """
SELECT tokens.id, tokens.name, tokens.token, tokens.user_id, users.username AS owner
FROM tokens
LEFT JOIN users ON users.id = tokens.user_id
ORDER BY tokens.id DESC
"""
return conn.execute(query).fetchall()
query = """
SELECT tokens.id, tokens.name, tokens.token, tokens.user_id, users.username AS owner
FROM tokens
LEFT JOIN users ON users.id = tokens.user_id
WHERE tokens.user_id = ?
ORDER BY tokens.id DESC
"""
return conn.execute(query, (user_id,)).fetchall()
finally:
conn.close()
def fetch_first_token_for_user(self, user_id: int) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute(
"SELECT * FROM tokens WHERE user_id = ? ORDER BY id DESC LIMIT 1",
(user_id,),
).fetchone()
finally:
conn.close()
def fetch_first_token_any(self) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM tokens ORDER BY id DESC LIMIT 1").fetchone()
finally:
conn.close()
def get_token(self, token_id: int) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM tokens WHERE id = ?", (token_id,)).fetchone()
finally:
conn.close()
def get_setting(self, key: str) -> Optional[str]:
conn = self._connect()
try:
row = conn.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
return row["value"] if row else None
finally:
conn.close()
def set_setting(self, key: str, value: str) -> None:
conn = self._connect()
try:
conn.execute(
"""
INSERT INTO settings(key, value) VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value
""",
(key, value),
)
conn.commit()
finally:
conn.close()
def add_auto_reply_log(
self,
*,
review_id: str,
rating: int,
product_name: str,
user_name: str,
reply_text: str,
status: str,
error_text: Optional[str] = None,
) -> None:
conn = self._connect()
try:
conn.execute(
"""
INSERT INTO auto_reply_logs(
created_at, review_id, rating, product_name, user_name, reply_text, status, error_text
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(
datetime.utcnow().isoformat(),
review_id,
rating,
product_name,
user_name,
reply_text,
status,
error_text,
),
)
conn.commit()
finally:
conn.close()
def list_auto_reply_logs(self, limit: int = 100) -> List[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute(
"""
SELECT * FROM auto_reply_logs
ORDER BY id DESC
LIMIT ?
""",
(limit,),
).fetchall()
finally:
conn.close()
class FeedbackClient:
BASE_URL = "https://feedbacks-api.wildberries.ru/api/v1/feedbacks"
ANSWER_URL = "https://feedbacks-api.wildberries.ru/api/v1/feedbacks/answer"
def __init__(self, token: Optional[str], page_size: int = 100, timeout: int = 15) -> None:
self.token = token
self.page_size = page_size
self.timeout = timeout
def _get_headers(self) -> dict:
if not self.token:
raise FeedbackApiError(
"Токен Wildberries не найден. Добавьте токен в личном кабинете."
)
return {"Authorization": f"Bearer {self.token}"}
def _request(self, *, is_answered: bool, skip: int, take: int) -> List[dict]:
params = {
"isAnswered": str(is_answered).lower(),
"skip": skip,
"take": take,
}
headers = self._get_headers()
response = requests.get(self.BASE_URL, params=params, headers=headers, timeout=self.timeout)
if not response.ok:
raise FeedbackApiError(f"Ошибка запроса: {response.status_code} {response.text}")
payload = response.json()
if payload.get("error"):
raise FeedbackApiError(payload.get("errorText") or "Не удалось получить отзывы.")
data = payload.get("data") or {}
return data.get("feedbacks") or []
def fetch_reviews(
self,
limit: int = 50,
unanswered_only: bool = False,
allowed_ratings: Optional[Set[int]] = None,
) -> List[Review]:
reviews: List[Review] = []
if unanswered_only:
raw_reviews = self._request(is_answered=False, skip=0, take=min(limit, self.page_size))
reviews = [Review.from_api(item) for item in raw_reviews]
else:
raw = []
raw.extend(self._request(is_answered=False, skip=0, take=self.page_size))
raw.extend(self._request(is_answered=True, skip=0, take=self.page_size))
raw.sort(key=lambda r: r.get("createdDate"), reverse=True)
reviews = [Review.from_api(item) for item in raw]
allowed = allowed_ratings or DEFAULT_STARS
filtered = [item for item in reviews if item.rating in allowed]
return filtered[:limit]
def validate_token(self) -> None:
"""Проверяет токен, выполняя минимальный запрос к API."""
# Используем минимальный набор данных, чтобы не тратить лимиты без необходимости.
self._request(is_answered=False, skip=0, take=1)
def send_answer(self, review_id: str, text: str) -> None:
payload = {"id": review_id, "text": text}
response = requests.post(
self.ANSWER_URL,
json=payload,
headers=self._get_headers(),
timeout=self.timeout,
)
if not response.ok:
raise FeedbackApiError(f"Не удалось отправить ответ: {response.status_code} {response.text}")
def answer_many(self, review_ids: List[str], text: str) -> int:
sent = 0
for review_id in review_ids:
if not review_id:
continue
self.send_answer(review_id, text)
sent += 1
return sent
def _parse_pool(value: Optional[str], fallback: List[str]) -> List[str]:
if not value:
return fallback
normalized = value.replace("\r\n", "\n").replace("||", "\n")
parsed = [item.strip() for item in normalized.split("\n") if item.strip()]
return parsed or fallback
ENV_REPLY_POOL_5 = os.getenv("REPLY_POOL_5")
ENV_REPLY_POOL_4 = os.getenv("REPLY_POOL_4")
app = Flask(__name__)
app.config["SECRET_KEY"] = os.getenv("FLASK_SECRET_KEY", "dev-secret")
db = Database(BASE_DIR / "tokens.db")
def _parse_selected_stars(raw_values: List[str]) -> List[int]:
normalized: List[int] = []
for value in raw_values:
try:
star = int(value)
except ValueError:
continue
if star in STAR_VALUES and star not in normalized:
normalized.append(star)
if not normalized:
return [5]
return normalized
def _get_session_token() -> Tuple[Optional[int], Optional[str], Optional[str]]:
user = g.get("user")
if not user:
return (None, None, None)
token_id = session.get("token_id")
token_row = None
if token_id is not None:
token_row = db.get_token(int(token_id))
if not token_row or (
not user["is_admin"] and token_row["user_id"] != user["id"]
):
session.pop("token_id", None)
token_row = None
if token_row is None:
token_row = db.fetch_first_token_for_user(user["id"])
if token_row is None and user["is_admin"]:
token_row = db.fetch_first_token_any()
if token_row is not None:
session["token_id"] = token_row["id"]
if token_row is None:
return (None, None, None)
return token_row["id"], token_row["name"], token_row["token"]
def _get_active_token() -> Tuple[Optional[str], Optional[str]]:
token_id, token_name, token_value = _get_session_token()
if token_value:
return token_value, token_name
return None, None
def _get_background_token() -> Optional[str]:
env_token = (os.getenv("WB_API_TOKEN") or "").strip()
if env_token:
return env_token
token_row = db.fetch_first_token_any()
if token_row:
return token_row["token"]
return None
def get_client() -> FeedbackClient:
token_value, _ = _get_active_token()
if not token_value:
raise FeedbackApiError("Токен не задан. Добавьте его в личном кабинете.")
return FeedbackClient(token_value)
def is_auto_reply_enabled() -> bool:
return db.get_setting(AUTO_REPLY_SETTING_KEY) == "1"
def set_auto_reply_enabled(value: bool) -> None:
db.set_setting(AUTO_REPLY_SETTING_KEY, "1" if value else "0")
def _load_reply_pool(star: int) -> List[str]:
if star == 5:
db_value = db.get_setting(AUTO_REPLY_POOL_5_KEY)
return _parse_pool(db_value or ENV_REPLY_POOL_5, DEFAULT_REPLY_POOL_5)
if star == 4:
db_value = db.get_setting(AUTO_REPLY_POOL_4_KEY)
return _parse_pool(db_value or ENV_REPLY_POOL_4, DEFAULT_REPLY_POOL_4)
return []
def _pool_to_multiline_text(pool: List[str]) -> str:
return "\n".join(pool)
def _pick_auto_reply(star: int) -> Optional[str]:
pool = _load_reply_pool(star)
if pool:
return random.choice(pool)
return None
def process_auto_replies() -> int:
token_value = _get_background_token()
if not token_value:
return 0
client = FeedbackClient(token_value)
reviews = client.fetch_reviews(limit=100, unanswered_only=True, allowed_ratings=AUTO_REPLY_STARS)
sent = 0
for review in reviews:
reply_text = _pick_auto_reply(review.rating)
if not reply_text:
continue
try:
client.send_answer(review.id, reply_text)
db.add_auto_reply_log(
review_id=review.id,
rating=review.rating,
product_name=review.product_name,
user_name=review.user_name,
reply_text=reply_text,
status="sent",
)
sent += 1
except FeedbackApiError as exc:
db.add_auto_reply_log(
review_id=review.id,
rating=review.rating,
product_name=review.product_name,
user_name=review.user_name,
reply_text=reply_text,
status="failed",
error_text=str(exc),
)
return sent
def _scheduler_should_run(last_run_raw: Optional[str], now_ts: float) -> bool:
if not last_run_raw:
return True
try:
last_run = float(last_run_raw)
except ValueError:
return True
return now_ts - last_run >= AUTO_REPLY_INTERVAL_MINUTES * 60
def auto_reply_loop() -> None:
while True:
try:
if is_auto_reply_enabled():
now_ts = time.time()
last_run = db.get_setting(AUTO_REPLY_LAST_RUN_KEY)
if _scheduler_should_run(last_run, now_ts):
process_auto_replies()
db.set_setting(AUTO_REPLY_LAST_RUN_KEY, str(now_ts))
except Exception:
pass
time.sleep(60)
@app.template_filter("format_datetime")
def format_datetime(value: Optional[datetime]) -> str:
if not value:
return ""
return value.strftime("%d.%m.%Y %H:%M")
@app.before_request
def load_current_user() -> None:
user_id = session.get("user_id")
g.user = None
if user_id is not None:
user = db.get_user_by_id(int(user_id))
if user and user["is_active"]:
g.user = user
else:
session.pop("user_id", None)
session.pop("token_id", None)
def login_required(view):
@wraps(view)
def wrapped(*args, **kwargs):
if g.get("user") is None:
return redirect(url_for("login", next=request.path))
return view(*args, **kwargs)
return wrapped
def admin_required(view):
@wraps(view)
def wrapped(*args, **kwargs):
user = g.get("user")
if not user or not user["is_admin"]:
return redirect(url_for("index"))
return view(*args, **kwargs)
return wrapped
@app.route("/cabinet", methods=["GET", "POST"])
@login_required
def cabinet():
error_message: Optional[str] = None
success_message: Optional[str] = None
status = request.args.get("status")
status_map = {
"added": "Токен сохранён.",
"selected": "Активирован выбранный магазин.",
"checked": "Токен успешно проверен.",
}
if status in status_map:
success_message = status_map[status]
user = g.user
if request.method == "POST":
action = request.form.get("cabinet_action")
if action == "add":
name = (request.form.get("name") or "").strip()
token_value = (request.form.get("token") or "").strip()
if not name or not token_value:
error_message = "Введите название и токен."
else:
db.add_token(user_id=user["id"], name=name, token=token_value)
return redirect(url_for("cabinet", status="added"))
elif action == "select":
token_id = request.form.get("token_id")
if token_id:
token_row = db.get_token(int(token_id))
if token_row and (user["is_admin"] or token_row["user_id"] == user["id"]):
session["token_id"] = int(token_id)
return redirect(url_for("cabinet", status="selected"))
error_message = "Не удалось выбрать токен."
elif action == "check":
token_id = request.form.get("token_id")
token_row = db.get_token(int(token_id)) if token_id else None
if token_row and (user["is_admin"] or token_row["user_id"] == user["id"]):
try:
FeedbackClient(token_row["token"]).validate_token()
return redirect(url_for("cabinet", status="checked"))
except FeedbackApiError as exc:
error_message = str(exc)
else:
error_message = "Недостаточно прав для проверки токена."
raw_tokens = db.fetch_tokens_for_user(user["id"], bool(user["is_admin"]))
tokens = [
{"id": row["id"], "name": row["name"], "owner": row["owner"], "user_id": row["user_id"]}
for row in raw_tokens
]
active_token_id = session.get("token_id")
return render_template(
"cabinet.html",
tokens=tokens,
active_token_id=active_token_id,
error_message=error_message,
success_message=success_message,
current_user=user,
)
@app.route("/")
@login_required
def index():
action = request.args.get("action") or "all"
status = request.args.get("status")
selected_stars_list = _parse_selected_stars(request.args.getlist("stars"))
selected_stars = set(selected_stars_list)
selected_stars_display = sorted(selected_stars, reverse=True)
active_token_value, active_token_name = _get_active_token()
client: Optional[FeedbackClient] = None
client_error: Optional[str] = None
try:
client = get_client()
except FeedbackApiError as exc:
client_error = str(exc)
reviews: List[Review] = []
current_filter: Optional[str] = None
error_message: Optional[str] = None
success_message: Optional[str] = None
if action in {"all", "unanswered"}:
if client:
try:
if action == "all":
reviews = client.fetch_reviews(
limit=50,
unanswered_only=False,
allowed_ratings=selected_stars,
)
current_filter = "all"
else:
reviews = client.fetch_reviews(
limit=50,
unanswered_only=True,
allowed_ratings=selected_stars,
)
current_filter = "unanswered"
except FeedbackApiError as exc:
error_message = str(exc)
else:
error_message = client_error or "Токен не задан."
elif action == "clear":
return redirect(url_for("index"))
if status == "reply_sent":
count = request.args.get("count") or "0"
success_message = f"Отправлено ответов: {count}"
elif status == "pools_saved":
success_message = "Пулы автоответов сохранены."
elif status == "reply_failed":
error_text = request.args.get("error") or "Не удалось отправить ответы."
error_message = error_text
return render_template(
"index.html",
reviews=reviews,
current_filter=current_filter,
error_message=error_message,
success_message=success_message,
selected_stars=selected_stars,
selected_stars_display=selected_stars_display,
selected_stars_list=selected_stars_list,
active_token_name=active_token_name,
has_token=bool(active_token_value),
current_action=action or "all",
auto_reply_enabled=is_auto_reply_enabled(),
auto_reply_interval_minutes=AUTO_REPLY_INTERVAL_MINUTES,
reply_pool_5_text=_pool_to_multiline_text(_load_reply_pool(5)),
reply_pool_4_text=_pool_to_multiline_text(_load_reply_pool(4)),
auto_reply_logs=db.list_auto_reply_logs(limit=100),
current_user=g.user,
)
@app.route("/auto-reply-toggle", methods=["POST"])
@login_required
def auto_reply_toggle():
enabled = request.form.get("enabled") == "1"
set_auto_reply_enabled(enabled)
return redirect(url_for("index", action="unanswered", stars=[5, 4]))
@app.route("/auto-reply-pools", methods=["POST"])
@login_required
def auto_reply_pools():
pool_5_raw = (request.form.get("pool_5") or "").strip()
pool_4_raw = (request.form.get("pool_4") or "").strip()
pool_5 = _parse_pool(pool_5_raw, [])
pool_4 = _parse_pool(pool_4_raw, [])
if not pool_5 or not pool_4:
return redirect(
url_for(
"index",
status="reply_failed",
error="Для 5★ и 4★ укажите минимум по одному варианту ответа.",
action="unanswered",
stars=[5, 4],
)
)
db.set_setting(AUTO_REPLY_POOL_5_KEY, _pool_to_multiline_text(pool_5))
db.set_setting(AUTO_REPLY_POOL_4_KEY, _pool_to_multiline_text(pool_4))
return redirect(url_for("index", status="pools_saved", action="unanswered", stars=[5, 4]))
@app.route("/reply", methods=["POST"])
@login_required
def reply_all():
message = (request.form.get("message") or "").strip()
review_ids = request.form.getlist("review_id")
stars_from_form = request.form.getlist("stars")
selected_star_values = _parse_selected_stars(stars_from_form)
redirect_params: dict = {"action": "unanswered", "stars": selected_star_values}
if not message:
return redirect(
url_for("index", status="reply_failed", error="Введите текст ответа.", **redirect_params)
)
if len(message) < 2 or len(message) > 5000:
return redirect(
url_for(
"index",
status="reply_failed",
error="Ответ должен содержать от 2 до 5000 символов.",
**redirect_params,
)
)
deduped_ids = []
for review_id in review_ids:
if review_id and review_id not in deduped_ids:
deduped_ids.append(review_id)
if not deduped_ids:
return redirect(
url_for(
"index",
status="reply_failed",
error="Нет отзывов для ответа.",
**redirect_params,
)
)
try:
client = get_client()
sent = client.answer_many(deduped_ids, message)
except FeedbackApiError as exc:
return redirect(url_for("index", status="reply_failed", error=str(exc), **redirect_params))
return redirect(url_for("index", status="reply_sent", count=sent, **redirect_params))
@app.route("/reply/<review_id>", methods=["POST"])
@login_required
def reply_single(review_id: str):
message = (request.form.get("message") or "").strip()
stars_from_form = request.form.getlist("stars")
next_action = request.form.get("next_action") or "all"
redirect_params: dict = {"action": next_action, "stars": _parse_selected_stars(stars_from_form)}
if not message:
return redirect(
url_for("index", status="reply_failed", error="Введите текст ответа.", **redirect_params)
)
if len(message) < 2 or len(message) > 5000:
return redirect(
url_for(
"index",
status="reply_failed",
error="Ответ должен содержать от 2 до 5000 символов.",
**redirect_params,
)
)
try:
client = get_client()
client.send_answer(review_id, message)
except FeedbackApiError as exc:
return redirect(url_for("index", status="reply_failed", error=str(exc), **redirect_params))
return redirect(url_for("index", status="reply_sent", count=1, **redirect_params))
@app.route("/login", methods=["GET", "POST"])
def login():
if g.get("user"):
return redirect(url_for("index"))
error_message: Optional[str] = None
if request.method == "POST":
username = (request.form.get("username") or "").strip().lower()
password = request.form.get("password") or ""
user = db.get_user_by_username(username)
if not user or not check_password_hash(user["password_hash"], password):
error_message = "Неверный логин или пароль."
elif not user["is_active"]:
error_message = "Аккаунт ожидает подтверждения администратора."
else:
session.clear()
session["user_id"] = user["id"]
return redirect(request.args.get("next") or url_for("index"))
return render_template("login.html", error_message=error_message)
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("login"))
@app.route("/register", methods=["GET", "POST"])
def register():
if g.get("user"):
return redirect(url_for("index"))
error_message: Optional[str] = None
success_message: Optional[str] = None
if request.method == "POST":
username = (request.form.get("username") or "").strip().lower()
password = request.form.get("password") or ""
confirm = request.form.get("confirm") or ""
if not username or not password:
error_message = "Введите логин и пароль."
elif password != confirm:
error_message = "Пароли не совпадают."
elif username == "ruslan":
error_message = "Нельзя использовать зарезервированный логин."
else:
try:
password_hash = generate_password_hash(password)
db.create_user(username, password_hash)
success_message = "Заявка создана. Дождитесь подтверждения администратора."
except sqlite3.IntegrityError:
error_message = "Такой пользователь уже существует."
return render_template("register.html", error_message=error_message, success_message=success_message)
@app.route("/admin", methods=["GET", "POST"])
@admin_required
def admin_panel():
message: Optional[str] = None
if request.args.get("status") == "updated":
message = "Статус пользователя обновлён."
if request.method == "POST":
user_id = request.form.get("user_id")
action = request.form.get("admin_action")
if user_id and action in {"activate", "deactivate"}:
db.set_user_active(int(user_id), action == "activate")
return redirect(url_for("admin_panel", status="updated"))
users = db.list_users()
return render_template("admin.html", users=users, info_message=message, current_user=g.user)
if __name__ == "__main__":
is_reloader_process = os.environ.get("WERKZEUG_RUN_MAIN") == "true"
if is_reloader_process or not app.debug:
threading.Thread(target=auto_reply_loop, daemon=True).start()
port = int(os.getenv("FLASK_PORT", "5000"))
app.run(host="0.0.0.0", port=port, debug=True)