import logging import json import os import random import sqlite3 import threading import time from dataclasses import dataclass from datetime import datetime, timezone from functools import wraps from logging.handlers import RotatingFileHandler from pathlib import Path from typing import List, Optional, Set, Tuple from zoneinfo import ZoneInfo import requests from dotenv import load_dotenv from flask import Flask, g, jsonify, redirect, render_template, request, session, url_for from werkzeug.security import check_password_hash, generate_password_hash BASE_DIR = Path(__file__).resolve().parent load_dotenv(BASE_DIR / '.env') STAR_VALUES = [5, 4, 3, 2, 1] DEFAULT_STARS = {5} AUTO_REPLY_STARS = {5, 4} AUTO_REPLY_SETTING_KEY = "auto_reply_enabled" AUTO_REPLY_LAST_RUN_KEY = "auto_reply_last_run_ts" AUTO_REPLY_POOL_5_KEY = "auto_reply_pool_5" AUTO_REPLY_POOL_4_KEY = "auto_reply_pool_4" AUTO_REPLY_POOL_3_KEY = "auto_reply_pool_3" AUTO_REPLY_POOL_2_KEY = "auto_reply_pool_2" AUTO_REPLY_POOL_1_KEY = "auto_reply_pool_1" AUTO_REPLY_STARS_KEY = "auto_reply_stars_json" AUTO_REPLY_FILTER_KEY = "auto_reply_filter" AUTO_REPLY_QUEUE_KEY = "auto_reply_queue_json" AUTO_REPLY_LAST_FETCH_KEY = "auto_reply_last_fetch_ts" AUTO_REPLY_INTERVAL_MINUTES = int(os.getenv("AUTO_REPLY_INTERVAL_MINUTES", "10")) AUTO_REPLY_INTERVAL_SECONDS = int( os.getenv("AUTO_REPLY_INTERVAL_SECONDS", str(AUTO_REPLY_INTERVAL_MINUTES * 60)) ) AUTO_REPLY_FETCH_INTERVAL_SECONDS = int( os.getenv("AUTO_REPLY_FETCH_INTERVAL_SECONDS", "300") ) AUTO_REPLY_BATCH_LIMIT = max(1, int(os.getenv("AUTO_REPLY_BATCH_LIMIT", "25"))) API_GLOBAL_COOLDOWN_UNTIL_KEY = "api_global_cooldown_until_ts" WB_REQUESTS_PER_SECOND = float(os.getenv("WB_REQUESTS_PER_SECOND", "0.8")) WB_MIN_REQUEST_INTERVAL_SECONDS = max(1.1, 1.0 / max(0.1, WB_REQUESTS_PER_SECOND)) APP_TIMEZONE = ZoneInfo(os.getenv("APP_TIMEZONE", "Europe/Moscow")) API_LOG_BODY_MAX = max(200, int(os.getenv("API_LOG_BODY_MAX", "1200"))) APP_LOG_FILE = os.getenv("APP_LOG_FILE", str(BASE_DIR / "app.log")) APP_LOG_LEVEL = os.getenv("APP_LOG_LEVEL", "INFO").upper() EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS = int( os.getenv("EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS", "60") ) AUTO_REPLY_LOOP_SLEEP_SECONDS = max(1, int(os.getenv("AUTO_REPLY_LOOP_SLEEP_SECONDS", "1"))) DEFAULT_REPLY_POOL_5 = [ "Спасибо за высокую оценку! Нам очень приятно.", "Благодарим за отзыв и доверие к нашему магазину!", "Спасибо! Рады, что товар вам понравился.", ] DEFAULT_REPLY_POOL_4 = [ "Спасибо за отзыв! Учтём ваши замечания и станем лучше.", "Благодарим за оценку! Работаем над тем, чтобы было на 5 звёзд.", "Спасибо за обратную связь! Нам важно ваше мнение.", ] def _setup_logger() -> logging.Logger: logger = logging.getLogger("wildberries-app") if logger.handlers: return logger level = getattr(logging, APP_LOG_LEVEL, logging.INFO) logger.setLevel(level) formatter = logging.Formatter( "%(asctime)s %(levelname)s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) file_handler = RotatingFileHandler(APP_LOG_FILE, maxBytes=5_000_000, backupCount=5) file_handler.setFormatter(formatter) stream_handler = logging.StreamHandler() stream_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(stream_handler) logger.propagate = False return logger logger = _setup_logger() def _short_text(value: Optional[str], max_len: int = API_LOG_BODY_MAX) -> str: text = value or "" if len(text) <= max_len: return text return f"{text[:max_len]}..." def _sanitize_headers(headers: dict) -> dict: safe = dict(headers) auth = safe.get("Authorization") if auth: safe["Authorization"] = f"***{auth[-6:]}" if len(auth) >= 6 else "***" return safe class FeedbackApiError(RuntimeError): """Raised when the Wildberries Feedback API returns an error.""" @dataclass class Review: id: str text: str pros: str cons: str rating: int created_at: datetime product_name: str nm_id: int answer: Optional[str] is_answered: bool user_name: str @classmethod def from_api(cls, payload: dict) -> "Review": created_at = payload.get("createdDate") created_dt = None if created_at: created_dt = datetime.fromisoformat(created_at.replace("Z", "+00:00")) product = payload.get("productDetails", {}) or {} answer_payload = payload.get("answer") if isinstance(answer_payload, dict): answer_value = answer_payload.get("text") else: answer_value = answer_payload return cls( id=payload.get("id") or "", text=payload.get("text") or "", pros=payload.get("pros") or "", cons=payload.get("cons") or "", rating=payload.get("productValuation") or 0, created_at=created_dt, product_name=product.get("productName") or "", nm_id=int(product.get("nmId") or 0), answer=answer_value, is_answered=bool(answer_value), user_name=payload.get("userName") or "", ) class Database: def __init__(self, db_path: Path) -> None: self.db_path = db_path self._ensure_db() def _connect(self) -> sqlite3.Connection: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def _ensure_db(self) -> None: conn = self._connect() try: conn.execute( """ CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, is_admin INTEGER DEFAULT 0, is_active INTEGER DEFAULT 0 ); """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, name TEXT NOT NULL, token TEXT NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ); """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS settings ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS auto_reply_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, review_id TEXT NOT NULL, rating INTEGER NOT NULL, product_name TEXT, user_name TEXT, review_text TEXT, reply_text TEXT NOT NULL, status TEXT NOT NULL, error_text TEXT ); """ ) self._ensure_admin(conn) self._ensure_token_user_column(conn) self._ensure_auto_reply_log_review_text(conn) conn.commit() finally: conn.close() def _ensure_admin(self, conn: sqlite3.Connection) -> None: admin = conn.execute("SELECT id FROM users WHERE username = ?", ("ruslan",)).fetchone() if not admin: password_hash = generate_password_hash("utOgbZ09ruslan+") conn.execute( "INSERT INTO users(username, password_hash, is_admin, is_active) VALUES (?, ?, 1, 1)", ("ruslan", password_hash), ) def _ensure_token_user_column(self, conn: sqlite3.Connection) -> None: info = conn.execute("PRAGMA table_info(tokens)").fetchall() columns = {row["name"] for row in info} if "user_id" not in columns: conn.execute("ALTER TABLE tokens ADD COLUMN user_id INTEGER") admin = conn.execute("SELECT id FROM users WHERE username = ?", ("ruslan",)).fetchone() admin_id = admin["id"] if admin else None if admin_id: conn.execute( "UPDATE tokens SET user_id = ? WHERE user_id IS NULL", (admin_id,), ) def _ensure_auto_reply_log_review_text(self, conn: sqlite3.Connection) -> None: info = conn.execute("PRAGMA table_info(auto_reply_logs)").fetchall() columns = {row["name"] for row in info} if "review_text" not in columns: conn.execute("ALTER TABLE auto_reply_logs ADD COLUMN review_text TEXT") if "nm_id" not in columns: conn.execute("ALTER TABLE auto_reply_logs ADD COLUMN nm_id INTEGER") if "review_created_at" not in columns: conn.execute("ALTER TABLE auto_reply_logs ADD COLUMN review_created_at TEXT") # User helpers def create_user(self, username: str, password_hash: str) -> None: conn = self._connect() try: conn.execute( "INSERT INTO users(username, password_hash, is_admin, is_active) VALUES (?, ?, 0, 0)", (username.lower(), password_hash), ) conn.commit() finally: conn.close() def get_user_by_username(self, username: str) -> Optional[sqlite3.Row]: conn = self._connect() try: return conn.execute( "SELECT * FROM users WHERE username = ?", (username.lower(),) ).fetchone() finally: conn.close() def get_user_by_id(self, user_id: int) -> Optional[sqlite3.Row]: conn = self._connect() try: return conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() finally: conn.close() def list_users(self) -> List[sqlite3.Row]: conn = self._connect() try: return conn.execute("SELECT * FROM users ORDER BY username").fetchall() finally: conn.close() def set_user_active(self, user_id: int, is_active: bool) -> None: conn = self._connect() try: conn.execute("UPDATE users SET is_active = ? WHERE id = ?", (1 if is_active else 0, user_id)) conn.commit() finally: conn.close() # Token helpers def add_token(self, user_id: int, name: str, token: str) -> None: conn = self._connect() try: conn.execute( "INSERT INTO tokens(user_id, name, token) VALUES (?, ?, ?)", (user_id, name.strip(), token.strip()), ) conn.commit() finally: conn.close() def update_token(self, token_id: int, name: str, token: str) -> None: conn = self._connect() try: conn.execute( "UPDATE tokens SET name = ?, token = ? WHERE id = ?", (name.strip(), token.strip(), token_id), ) conn.commit() finally: conn.close() def fetch_tokens_for_user(self, user_id: int, is_admin: bool) -> List[sqlite3.Row]: conn = self._connect() try: if is_admin: query = """ SELECT tokens.id, tokens.name, tokens.token, tokens.user_id, users.username AS owner FROM tokens LEFT JOIN users ON users.id = tokens.user_id ORDER BY tokens.id DESC """ return conn.execute(query).fetchall() query = """ SELECT tokens.id, tokens.name, tokens.token, tokens.user_id, users.username AS owner FROM tokens LEFT JOIN users ON users.id = tokens.user_id WHERE tokens.user_id = ? ORDER BY tokens.id DESC """ return conn.execute(query, (user_id,)).fetchall() finally: conn.close() def fetch_first_token_for_user(self, user_id: int) -> Optional[sqlite3.Row]: conn = self._connect() try: return conn.execute( "SELECT * FROM tokens WHERE user_id = ? ORDER BY id DESC LIMIT 1", (user_id,), ).fetchone() finally: conn.close() def fetch_first_token_any(self) -> Optional[sqlite3.Row]: conn = self._connect() try: return conn.execute("SELECT * FROM tokens ORDER BY id DESC LIMIT 1").fetchone() finally: conn.close() def get_token(self, token_id: int) -> Optional[sqlite3.Row]: conn = self._connect() try: return conn.execute("SELECT * FROM tokens WHERE id = ?", (token_id,)).fetchone() finally: conn.close() def get_setting(self, key: str) -> Optional[str]: conn = self._connect() try: row = conn.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone() return row["value"] if row else None finally: conn.close() def set_setting(self, key: str, value: str) -> None: conn = self._connect() try: conn.execute( """ INSERT INTO settings(key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value = excluded.value """, (key, value), ) conn.commit() finally: conn.close() def add_auto_reply_log( self, *, review_id: str, rating: int, product_name: str, nm_id: int = 0, user_name: str, review_text: str, review_created_at: str = "", reply_text: str, status: str, error_text: Optional[str] = None, ) -> None: conn = self._connect() try: conn.execute( """ INSERT INTO auto_reply_logs( created_at, review_id, rating, product_name, nm_id, user_name, review_text, review_created_at, reply_text, status, error_text ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( datetime.utcnow().replace(microsecond=0).isoformat(), review_id, rating, product_name, nm_id, user_name, review_text, review_created_at, reply_text, status, error_text, ), ) conn.commit() finally: conn.close() def list_auto_reply_logs(self, limit: int = 100) -> List[sqlite3.Row]: conn = self._connect() try: return conn.execute( """ SELECT * FROM auto_reply_logs ORDER BY id DESC LIMIT ? """, (limit,), ).fetchall() finally: conn.close() class FeedbackClient: BASE_URL = "https://feedbacks-api.wildberries.ru/api/v1/feedbacks" ANSWER_URL = "https://feedbacks-api.wildberries.ru/api/v1/feedbacks/answer" def __init__(self, token: Optional[str], page_size: int = 100, timeout: int = 15) -> None: self.token = token self.page_size = page_size self.timeout = timeout self._last_request_ts = 0.0 def _get_headers(self) -> dict: if not self.token: raise FeedbackApiError( "Токен Wildberries не найден. Добавьте токен в личном кабинете." ) return {"Authorization": self.token} def _throttle(self) -> None: now = time.time() wait_for = WB_MIN_REQUEST_INTERVAL_SECONDS - (now - self._last_request_ts) if wait_for > 0: time.sleep(wait_for) self._last_request_ts = time.time() @staticmethod def _extract_retry_seconds(response: requests.Response) -> Optional[float]: for header_name in ("X-Ratelimit-Retry", "Retry-After", "X-Ratelimit-Reset"): raw = response.headers.get(header_name) if not raw: continue try: value = float(raw) except ValueError: continue if value >= 0: return value return None def _request(self, *, is_answered: bool, skip: int, take: int) -> List[dict]: _check_api_cooldown_or_raise() params = { "isAnswered": str(is_answered).lower(), "skip": skip, "take": take, } headers = self._get_headers() safe_headers = _sanitize_headers(headers) max_attempts = 4 response = None for attempt in range(1, max_attempts + 1): self._throttle() logger.info( "WB API request method=GET url=%s params=%s attempt=%s/%s headers=%s", self.BASE_URL, params, attempt, max_attempts, safe_headers, ) response = requests.get(self.BASE_URL, params=params, headers=headers, timeout=self.timeout) logger.info( "WB API response method=GET url=%s status=%s headers=%s body=%s", self.BASE_URL, response.status_code, dict(response.headers), _short_text(response.text), ) if response.ok: remaining = response.headers.get("X-Ratelimit-Remaining") reset_seconds = self._extract_retry_seconds(response) if remaining == "0": if reset_seconds is not None and reset_seconds > 0: _set_api_cooldown(reset_seconds + 30) else: _set_api_cooldown(EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS) logger.warning( "WB API remaining=0 without reset headers. Fallback cooldown=%s sec", EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS, ) break if response.status_code == 429 and attempt < max_attempts: retry_seconds = self._extract_retry_seconds(response) if retry_seconds is not None and retry_seconds > 0: _set_api_cooldown(retry_seconds + 30) if retry_seconds is not None and retry_seconds > 30: logger.warning( "WB API long rate limit on GET. retry_after_seconds=%s. Stop retries for this call.", retry_seconds, ) raise FeedbackApiError( f"Лимит WB API. Повторите позже (примерно через {int(retry_seconds)} сек)." ) delay = retry_seconds if retry_seconds is not None else float(2 ** attempt) logger.warning( "WB API rate limit on GET. sleep_seconds=%s x_ratelimit_retry=%s x_ratelimit_reset=%s", max(1.0, delay), response.headers.get("X-Ratelimit-Retry"), response.headers.get("X-Ratelimit-Reset"), ) time.sleep(max(1.0, delay)) continue if response.status_code == 429: retry_seconds = self._extract_retry_seconds(response) if retry_seconds is not None and retry_seconds > 0: _set_api_cooldown(retry_seconds + 30) raise FeedbackApiError(f"Ошибка запроса: {response.status_code} {response.text}") if response is None: raise FeedbackApiError("Не удалось получить ответ от API Wildberries.") payload = response.json() if payload.get("error"): raise FeedbackApiError(payload.get("errorText") or "Не удалось получить отзывы.") data = payload.get("data") or {} return data.get("feedbacks") or [] def _paginate(self, *, is_answered: bool, max_items: int = 500) -> List[dict]: result: List[dict] = [] skip = 0 take = self.page_size while True: try: page = self._request(is_answered=is_answered, skip=skip, take=take) except FeedbackApiError: if not result: raise logger.info("_paginate stopping early due to API error, returning %s items collected", len(result)) break if not page: break result.extend(page) if len(page) < take: break if len(result) >= max_items: break skip += len(page) return result def fetch_reviews( self, limit: int = 50, unanswered_only: bool = False, allowed_ratings: Optional[Set[int]] = None, ) -> List[Review]: if unanswered_only: raw = self._paginate(is_answered=False, max_items=max(limit * 3, 300)) reviews = [Review.from_api(item) for item in raw] else: raw_unanswered = self._paginate(is_answered=False, max_items=limit) raw_answered = self._paginate(is_answered=True, max_items=limit) raw = raw_unanswered + raw_answered raw.sort(key=lambda r: r.get("createdDate") or "", reverse=True) reviews = [Review.from_api(item) for item in raw] allowed = allowed_ratings or DEFAULT_STARS filtered = [item for item in reviews if item.rating in allowed] return filtered[:limit] def validate_token(self) -> None: """Проверяет токен, выполняя минимальный запрос к API.""" # Используем минимальный набор данных, чтобы не тратить лимиты без необходимости. self._request(is_answered=False, skip=0, take=1) def send_answer(self, review_id: str, text: str) -> None: _check_api_cooldown_or_raise() payload = {"id": review_id, "text": text} max_attempts = 4 for attempt in range(1, max_attempts + 1): self._throttle() safe_headers = _sanitize_headers(self._get_headers()) logger.info( "WB API request method=POST url=%s payload=%s attempt=%s/%s headers=%s", self.ANSWER_URL, payload, attempt, max_attempts, safe_headers, ) response = requests.post( self.ANSWER_URL, json=payload, headers=self._get_headers(), timeout=self.timeout, ) logger.info( "WB API response method=POST url=%s status=%s headers=%s body=%s", self.ANSWER_URL, response.status_code, dict(response.headers), _short_text(response.text), ) if response.ok: remaining = response.headers.get("X-Ratelimit-Remaining") reset_seconds = self._extract_retry_seconds(response) if remaining == "0": if reset_seconds is not None and reset_seconds > 0: _set_api_cooldown(reset_seconds + 30) else: _set_api_cooldown(EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS) logger.warning( "WB API remaining=0 without reset headers. Fallback cooldown=%s sec", EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS, ) return if response.status_code == 429 and attempt < max_attempts: retry_seconds = self._extract_retry_seconds(response) if retry_seconds is not None and retry_seconds > 0: _set_api_cooldown(retry_seconds + 30) if retry_seconds is not None and retry_seconds > 30: logger.warning( "WB API long rate limit on POST. retry_after_seconds=%s. Stop retries for this call.", retry_seconds, ) raise FeedbackApiError( f"Лимит WB API. Повторите позже (примерно через {int(retry_seconds)} сек)." ) delay = retry_seconds if retry_seconds is not None else float(2 ** attempt) logger.warning( "WB API rate limit on POST. sleep_seconds=%s x_ratelimit_retry=%s x_ratelimit_reset=%s", max(1.0, delay), response.headers.get("X-Ratelimit-Retry"), response.headers.get("X-Ratelimit-Reset"), ) time.sleep(max(1.0, delay)) continue if response.status_code == 429: retry_seconds = self._extract_retry_seconds(response) if retry_seconds is not None and retry_seconds > 0: _set_api_cooldown(retry_seconds + 30) raise FeedbackApiError( f"Не удалось отправить ответ: {response.status_code} {response.text}" ) def answer_many(self, review_ids: List[str], text: str) -> int: sent = 0 for review_id in review_ids: if not review_id: continue self.send_answer(review_id, text) sent += 1 return sent def _parse_pool(value: Optional[str], fallback: List[str]) -> List[str]: if not value: return fallback normalized = value.replace("\r\n", "\n").replace("||", "\n") parsed = [item.strip() for item in normalized.split("\n") if item.strip()] return parsed or fallback ENV_REPLY_POOL_5 = os.getenv("REPLY_POOL_5") ENV_REPLY_POOL_4 = os.getenv("REPLY_POOL_4") app = Flask(__name__) app.config["SECRET_KEY"] = os.getenv("FLASK_SECRET_KEY", "dev-secret") db = Database(BASE_DIR / "tokens.db") def _parse_selected_stars(raw_values: List[str]) -> List[int]: normalized: List[int] = [] for value in raw_values: try: star = int(value) except ValueError: continue if star in STAR_VALUES and star not in normalized: normalized.append(star) if not normalized: return [5] return normalized def _get_session_token() -> Tuple[Optional[int], Optional[str], Optional[str]]: user = g.get("user") if not user: return (None, None, None) token_id = session.get("token_id") token_row = None if token_id is not None: token_row = db.get_token(int(token_id)) if not token_row or ( not user["is_admin"] and token_row["user_id"] != user["id"] ): session.pop("token_id", None) token_row = None if token_row is None: token_row = db.fetch_first_token_for_user(user["id"]) if token_row is None and user["is_admin"]: token_row = db.fetch_first_token_any() if token_row is not None: session["token_id"] = token_row["id"] if token_row is None: return (None, None, None) return token_row["id"], token_row["name"], token_row["token"] def _get_active_token() -> Tuple[Optional[str], Optional[str]]: token_id, token_name, token_value = _get_session_token() if token_value: return token_value, token_name return None, None def _get_background_token() -> Optional[str]: env_token = (os.getenv("WB_API_TOKEN") or "").strip() if env_token: return env_token token_row = db.fetch_first_token_any() if token_row: return token_row["token"] return None def get_client() -> FeedbackClient: token_value, _ = _get_active_token() if not token_value: raise FeedbackApiError("Токен не задан. Добавьте его в личном кабинете.") return FeedbackClient(token_value) def is_auto_reply_enabled() -> bool: return db.get_setting(AUTO_REPLY_SETTING_KEY) == "1" def set_auto_reply_enabled(value: bool) -> None: db.set_setting(AUTO_REPLY_SETTING_KEY, "1" if value else "0") def _get_api_cooldown_seconds_left() -> int: raw = db.get_setting(API_GLOBAL_COOLDOWN_UNTIL_KEY) if not raw: return 0 try: until_ts = float(raw) except ValueError: return 0 return max(0, int(until_ts - time.time())) def _set_api_cooldown(seconds: float) -> None: seconds_int = max(0, int(seconds)) if seconds_int <= 0: return until_ts = time.time() + seconds_int db.set_setting(API_GLOBAL_COOLDOWN_UNTIL_KEY, str(until_ts)) logger.warning("Global API cooldown set for %s seconds", seconds_int) def _check_api_cooldown_or_raise() -> None: seconds_left = _get_api_cooldown_seconds_left() if seconds_left > 0: raise FeedbackApiError( f"Лимит WB API активен. Повторите позже (примерно через {seconds_left} сек)." ) def _load_auto_reply_queue() -> List[dict]: raw = db.get_setting(AUTO_REPLY_QUEUE_KEY) if not raw: return [] try: payload = json.loads(raw) except (TypeError, ValueError): return [] if not isinstance(payload, list): return [] queue = [] for item in payload: if not isinstance(item, dict): continue if not item.get("id"): continue queue.append(item) return queue def _save_auto_reply_queue(queue: List[dict]) -> None: db.set_setting(AUTO_REPLY_QUEUE_KEY, json.dumps(queue, ensure_ascii=False)) def _build_auto_reply_queue(reviews: List[Review]) -> List[dict]: queue: List[dict] = [] filter_mode = _load_filter_mode() for review in reviews: if _has_review_content(review, filter_mode): logger.info("Auto-reply skip review_id=%s reason=has_review_text", review.id) db.add_auto_reply_log( review_id=review.id, rating=review.rating, product_name=review.product_name, nm_id=review.nm_id, user_name=review.user_name, review_text=review.text, review_created_at=review.created_at.isoformat() if review.created_at else "", reply_text="", status="skipped", error_text="Пропущен: у отзыва есть текст/достоинства/недостатки.", ) continue queue.append( { "id": review.id, "rating": review.rating, "product_name": review.product_name, "nm_id": review.nm_id, "user_name": review.user_name, "review_text": review.text, "review_created_at": review.created_at.isoformat() if review.created_at else "", } ) return queue def _fetch_window_open() -> bool: raw = db.get_setting(AUTO_REPLY_LAST_FETCH_KEY) if not raw: return True try: last_fetch = float(raw) except ValueError: return True return (time.time() - last_fetch) >= AUTO_REPLY_FETCH_INTERVAL_SECONDS def _load_reply_pool(star: int) -> List[str]: if star == 5: db_value = db.get_setting(AUTO_REPLY_POOL_5_KEY) return _parse_pool(db_value or ENV_REPLY_POOL_5, DEFAULT_REPLY_POOL_5) if star == 4: db_value = db.get_setting(AUTO_REPLY_POOL_4_KEY) return _parse_pool(db_value or ENV_REPLY_POOL_4, DEFAULT_REPLY_POOL_4) if star in (1, 2, 3): return _parse_pool(db.get_setting(f"auto_reply_pool_{star}") or "", []) return [] def _load_enabled_stars() -> Set[int]: raw = db.get_setting(AUTO_REPLY_STARS_KEY) if raw: try: parsed = json.loads(raw) if isinstance(parsed, list): result = {int(s) for s in parsed if isinstance(s, (int, float, str)) and str(s).isdigit() and 1 <= int(s) <= 5} if result: return result except (TypeError, ValueError): pass return AUTO_REPLY_STARS def _load_filter_mode() -> str: raw = db.get_setting(AUTO_REPLY_FILTER_KEY) if raw in ("no_text", "empty", "all"): return raw return "no_text" def _pool_to_multiline_text(pool: List[str]) -> str: return "\n".join(pool) def _pick_auto_reply(star: int) -> Optional[str]: pool = _load_reply_pool(star) if pool: return random.choice(pool) return None def _has_review_content(review: Review, filter_mode: str = "no_text") -> bool: if filter_mode == "no_text": return bool((review.text or "").strip()) if filter_mode == "empty": return bool( (review.text or "").strip() or (review.pros or "").strip() or (review.cons or "").strip() ) # filter_mode == "all": reply to everything, treat as no content to skip return False def process_auto_replies() -> int: if _get_api_cooldown_seconds_left() > 0: logger.info( "Auto-reply skipped: global cooldown active for %s seconds", _get_api_cooldown_seconds_left(), ) return 0 token_value = _get_background_token() if not token_value: logger.info("Auto-reply skipped: background token is missing.") return 0 client = FeedbackClient(token_value) queue = _load_auto_reply_queue() if not queue: if not _fetch_window_open(): logger.info( "Auto-reply queue empty, but fetch window is closed. fetch_interval_seconds=%s", AUTO_REPLY_FETCH_INTERVAL_SECONDS, ) return 0 logger.info("Auto-reply queue is empty. Fetch new reviews once.") try: reviews = client.fetch_reviews( limit=100, unanswered_only=True, allowed_ratings=_load_enabled_stars(), ) except FeedbackApiError: db.set_setting(AUTO_REPLY_LAST_FETCH_KEY, str(time.time())) raise db.set_setting(AUTO_REPLY_LAST_FETCH_KEY, str(time.time())) queue = _build_auto_reply_queue(reviews) _save_auto_reply_queue(queue) logger.info("Auto-reply queue rebuilt size=%s", len(queue)) cooldown_after_fetch = _get_api_cooldown_seconds_left() if cooldown_after_fetch > 0: logger.info("Auto-reply stop after fetch: cooldown active for %s seconds", cooldown_after_fetch) return 0 if not queue: logger.info("Auto-reply queue has no eligible reviews.") return 0 logger.info( "Auto-reply cycle start queue_size=%s batch_limit=%s", len(queue), AUTO_REPLY_BATCH_LIMIT, ) sent = 0 processed = 0 while queue and processed < AUTO_REPLY_BATCH_LIMIT: item = queue[0] review_id = item.get("id", "") rating = int(item.get("rating") or 0) reply_text = _pick_auto_reply(rating) if not reply_text: logger.info("Auto-reply skip review_id=%s reason=no_pool_reply rating=%s", review_id, rating) queue.pop(0) processed += 1 continue try: client.send_answer(review_id, reply_text) logger.info("Auto-reply sent review_id=%s rating=%s", review_id, rating) db.add_auto_reply_log( review_id=review_id, rating=rating, product_name=item.get("product_name", ""), nm_id=int(item.get("nm_id") or 0), review_created_at=item.get("review_created_at", ""), user_name=item.get("user_name", ""), review_text=item.get("review_text", ""), reply_text=reply_text, status="sent", ) sent += 1 queue.pop(0) processed += 1 except FeedbackApiError as exc: exc_str = str(exc) is_rate_limit = ( "Лимит WB API" in exc_str or "429" in exc_str or "cooldown" in exc_str.lower() or "rate limit" in exc_str.lower() or "too many" in exc_str.lower() ) if is_rate_limit: logger.info("Auto-reply rate limit, keeping queue head review_id=%s", review_id) break logger.warning("Auto-reply failed review_id=%s rating=%s error=%s", review_id, rating, exc) db.add_auto_reply_log( review_id=review_id, rating=rating, product_name=item.get("product_name", ""), nm_id=int(item.get("nm_id") or 0), review_created_at=item.get("review_created_at", ""), user_name=item.get("user_name", ""), review_text=item.get("review_text", ""), reply_text=reply_text, status="failed", error_text=str(exc), ) queue.pop(0) processed += 1 _save_auto_reply_queue(queue) logger.info("Auto-reply cycle finished sent_count=%s", sent) return sent def _scheduler_should_run(last_run_raw: Optional[str], now_ts: float) -> bool: if not last_run_raw: return True try: last_run = float(last_run_raw) except ValueError: return True return now_ts - last_run >= AUTO_REPLY_INTERVAL_SECONDS def _next_auto_reply_meta() -> Tuple[Optional[datetime], Optional[int]]: last_run_raw = db.get_setting(AUTO_REPLY_LAST_RUN_KEY) if not last_run_raw: return None, None try: last_run = float(last_run_raw) except ValueError: return None, None next_ts = last_run + AUTO_REPLY_INTERVAL_SECONDS seconds_left = max(0, int(next_ts - time.time())) next_dt = datetime.fromtimestamp(next_ts, tz=timezone.utc).astimezone(APP_TIMEZONE) return next_dt, seconds_left def _next_fetch_seconds_left() -> int: raw = db.get_setting(AUTO_REPLY_LAST_FETCH_KEY) if not raw: return 0 try: last_fetch = float(raw) except ValueError: return 0 return max(0, int(last_fetch + AUTO_REPLY_FETCH_INTERVAL_SECONDS - time.time())) def auto_reply_loop() -> None: while True: try: if is_auto_reply_enabled(): now_ts = time.time() last_run = db.get_setting(AUTO_REPLY_LAST_RUN_KEY) if _scheduler_should_run(last_run, now_ts): logger.info("Auto-reply loop trigger start last_run=%s now_ts=%s", last_run, now_ts) db.set_setting(AUTO_REPLY_LAST_RUN_KEY, str(now_ts)) process_auto_replies() else: logger.info("Auto-reply loop skip schedule last_run=%s now_ts=%s", last_run, now_ts) else: logger.info("Auto-reply loop skip: disabled") except Exception: logger.exception("Auto-reply loop crashed") time.sleep(AUTO_REPLY_LOOP_SLEEP_SECONDS) @app.template_filter("format_datetime") def format_datetime(value: Optional[datetime]) -> str: if not value: return "" dt = value if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(APP_TIMEZONE).strftime("%d.%m.%Y %H:%M") @app.template_filter("format_log_datetime") def format_log_datetime(value: Optional[str]) -> str: if not value: return "" try: dt = datetime.fromisoformat(value.replace("Z", "+00:00")) except ValueError: return value if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) return dt.astimezone(APP_TIMEZONE).strftime("%d.%m.%Y %H:%M") @app.before_request def load_current_user() -> None: user_id = session.get("user_id") g.user = None if user_id is not None: user = db.get_user_by_id(int(user_id)) if user and user["is_active"]: g.user = user else: session.pop("user_id", None) session.pop("token_id", None) logger.info( "HTTP request method=%s path=%s query=%s remote=%s user_id=%s", request.method, request.path, request.query_string.decode("utf-8", errors="ignore"), request.remote_addr, g.user["id"] if g.user else None, ) @app.after_request def log_response(response): logger.info( "HTTP response method=%s path=%s status=%s length=%s", request.method, request.path, response.status_code, response.calculate_content_length(), ) return response def login_required(view): @wraps(view) def wrapped(*args, **kwargs): if g.get("user") is None: return redirect(url_for("login", next=request.path)) return view(*args, **kwargs) return wrapped def admin_required(view): @wraps(view) def wrapped(*args, **kwargs): user = g.get("user") if not user or not user["is_admin"]: return redirect(url_for("index")) return view(*args, **kwargs) return wrapped @app.route("/cabinet", methods=["GET", "POST"]) @login_required def cabinet(): error_message: Optional[str] = None success_message: Optional[str] = None status = request.args.get("status") status_map = { "added": "Токен сохранён.", "updated": "Токен обновлён.", "selected": "Активирован выбранный магазин.", "checked": "Токен успешно проверен.", } if status in status_map: success_message = status_map[status] user = g.user if request.method == "POST": action = request.form.get("cabinet_action") if action == "add": name = (request.form.get("name") or "").strip() token_value = (request.form.get("token") or "").strip() if not name or not token_value: error_message = "Введите название и токен." else: db.add_token(user_id=user["id"], name=name, token=token_value) return redirect(url_for("cabinet", status="added")) elif action == "edit": token_id = request.form.get("token_id") token_row = db.get_token(int(token_id)) if token_id else None name = (request.form.get("name") or "").strip() token_value = (request.form.get("token") or "").strip() if not token_row or not (user["is_admin"] or token_row["user_id"] == user["id"]): error_message = "Недостаточно прав для изменения токена." elif not name or not token_value: error_message = "Введите название и токен." else: db.update_token(token_id=token_row["id"], name=name, token=token_value) return redirect(url_for("cabinet", status="updated")) elif action == "select": token_id = request.form.get("token_id") if token_id: token_row = db.get_token(int(token_id)) if token_row and (user["is_admin"] or token_row["user_id"] == user["id"]): session["token_id"] = int(token_id) return redirect(url_for("cabinet", status="selected")) error_message = "Не удалось выбрать токен." elif action == "check": token_id = request.form.get("token_id") token_row = db.get_token(int(token_id)) if token_id else None if token_row and (user["is_admin"] or token_row["user_id"] == user["id"]): try: FeedbackClient(token_row["token"]).validate_token() return redirect(url_for("cabinet", status="checked")) except FeedbackApiError as exc: error_message = str(exc) else: error_message = "Недостаточно прав для проверки токена." raw_tokens = db.fetch_tokens_for_user(user["id"], bool(user["is_admin"])) tokens = [ { "id": row["id"], "name": row["name"], "token": row["token"], "owner": row["owner"], "user_id": row["user_id"], } for row in raw_tokens ] active_token_id = session.get("token_id") return render_template( "cabinet.html", tokens=tokens, active_token_id=active_token_id, error_message=error_message, success_message=success_message, current_user=user, ) @app.route("/") @login_required def index(): action = request.args.get("action") or "all" status = request.args.get("status") selected_stars_list = _parse_selected_stars(request.args.getlist("stars")) selected_stars = set(selected_stars_list) selected_stars_display = sorted(selected_stars, reverse=True) next_auto_reply_at, next_auto_reply_in_seconds = _next_auto_reply_meta() api_cooldown_seconds_left = _get_api_cooldown_seconds_left() active_token_value, active_token_name = _get_active_token() client: Optional[FeedbackClient] = None client_error: Optional[str] = None try: client = get_client() except FeedbackApiError as exc: client_error = str(exc) reviews: List[Review] = [] current_filter: Optional[str] = None error_message: Optional[str] = None success_message: Optional[str] = None if action in {"all", "unanswered"}: if client: try: if action == "all": reviews = client.fetch_reviews( limit=50, unanswered_only=False, allowed_ratings=selected_stars, ) current_filter = "all" else: reviews = client.fetch_reviews( limit=50, unanswered_only=True, allowed_ratings=selected_stars, ) current_filter = "unanswered" except FeedbackApiError as exc: error_message = str(exc) else: error_message = client_error or "Токен не задан." elif action == "clear": return redirect(url_for("index")) if status == "reply_sent": count = request.args.get("count") or "0" success_message = f"Отправлено ответов: {count}" elif status == "pools_saved": success_message = "Пулы автоответов сохранены." elif status == "settings_saved": success_message = "Настройки сохранены. Очередь будет обновлена." elif status == "reply_failed": error_text = request.args.get("error") or "Не удалось отправить ответы." error_message = error_text return render_template( "index.html", reviews=reviews, current_filter=current_filter, error_message=error_message, success_message=success_message, selected_stars=selected_stars, selected_stars_display=selected_stars_display, selected_stars_list=selected_stars_list, active_token_name=active_token_name, has_token=bool(active_token_value), current_action=action or "all", auto_reply_enabled=is_auto_reply_enabled(), auto_reply_interval_minutes=AUTO_REPLY_INTERVAL_MINUTES, auto_reply_interval_seconds=AUTO_REPLY_INTERVAL_SECONDS, next_auto_reply_at=next_auto_reply_at, next_auto_reply_in_seconds=next_auto_reply_in_seconds, api_cooldown_seconds_left=api_cooldown_seconds_left, next_fetch_seconds_left=_next_fetch_seconds_left(), enabled_stars=list(_load_enabled_stars()), filter_mode=_load_filter_mode(), reply_pools={n: _pool_to_multiline_text(_load_reply_pool(n)) for n in range(1, 6)}, auto_reply_queue=_load_auto_reply_queue(), auto_reply_logs=db.list_auto_reply_logs(limit=100), current_user=g.user, ) @app.route("/yandex_b847b9b35f967fcc.html") def yandex_verify(): return 'Verification: b847b9b35f967fcc', 200, {"Content-Type": "text/html; charset=UTF-8"} @app.route("/api/status") @login_required def api_status(): logs = db.list_auto_reply_logs(limit=1) last_id = logs[0]["id"] if logs else None cooldown = _get_api_cooldown_seconds_left() next_fetch = _next_fetch_seconds_left() queue_len = len(_load_auto_reply_queue()) return jsonify({ "last_log_id": last_id, "cooldown": cooldown, "next_fetch_seconds": next_fetch, "queue_len": queue_len, "auto_reply_enabled": is_auto_reply_enabled(), }) @app.route("/auto-reply-toggle", methods=["POST"]) @login_required def auto_reply_toggle(): enabled = request.form.get("enabled") == "1" set_auto_reply_enabled(enabled) return redirect(url_for("index")) @app.route("/auto-reply-pools", methods=["POST"]) @login_required def auto_reply_pools(): pool_5 = [item.strip() for item in request.form.getlist("pool_5_item") if item.strip()] pool_4 = [item.strip() for item in request.form.getlist("pool_4_item") if item.strip()] is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest" if not pool_5 or not pool_4: if is_ajax: return jsonify({"ok": False, "error": "Для 5★ и 4★ укажите минимум по одному варианту ответа."}), 400 return redirect( url_for( "index", status="reply_failed", error="Для 5★ и 4★ укажите минимум по одному варианту ответа.", action="unanswered", stars=[5, 4], ) ) db.set_setting(AUTO_REPLY_POOL_5_KEY, _pool_to_multiline_text(pool_5)) db.set_setting(AUTO_REPLY_POOL_4_KEY, _pool_to_multiline_text(pool_4)) if is_ajax: return jsonify({"ok": True}) return redirect(url_for("index", status="pools_saved", action="unanswered", stars=[5, 4])) @app.route("/auto-reply-settings", methods=["POST"]) @login_required def auto_reply_settings(): if is_auto_reply_enabled(): return redirect(url_for("index")) stars = [int(s) for s in request.form.getlist("stars") if s.isdigit() and 1 <= int(s) <= 5] filter_mode = request.form.get("filter_mode", "no_text") if filter_mode not in ("no_text", "empty", "all"): filter_mode = "no_text" db.set_setting(AUTO_REPLY_STARS_KEY, json.dumps(stars)) db.set_setting(AUTO_REPLY_FILTER_KEY, filter_mode) for star in range(1, 6): items = [i.strip() for i in request.form.getlist(f"pool_{star}_item") if i.strip()] if items: db.set_setting(f"auto_reply_pool_{star}", _pool_to_multiline_text(items)) # Clear queue and reset fetch window so next cycle rebuilds immediately db.set_setting("auto_reply_queue_json", "[]") db.set_setting(AUTO_REPLY_LAST_FETCH_KEY, "0") return redirect(url_for("index", status="settings_saved")) @app.route("/reply", methods=["POST"]) @login_required def reply_all(): message = (request.form.get("message") or "").strip() review_ids = request.form.getlist("review_id") stars_from_form = request.form.getlist("stars") selected_star_values = _parse_selected_stars(stars_from_form) redirect_params: dict = {"action": "unanswered", "stars": selected_star_values} if not message: return redirect( url_for("index", status="reply_failed", error="Введите текст ответа.", **redirect_params) ) if len(message) < 2 or len(message) > 5000: return redirect( url_for( "index", status="reply_failed", error="Ответ должен содержать от 2 до 5000 символов.", **redirect_params, ) ) deduped_ids = [] for review_id in review_ids: if review_id and review_id not in deduped_ids: deduped_ids.append(review_id) if not deduped_ids: return redirect( url_for( "index", status="reply_failed", error="Нет отзывов для ответа.", **redirect_params, ) ) try: client = get_client() sent = client.answer_many(deduped_ids, message) except FeedbackApiError as exc: return redirect(url_for("index", status="reply_failed", error=str(exc), **redirect_params)) return redirect(url_for("index", status="reply_sent", count=sent, **redirect_params)) @app.route("/reply/", methods=["POST"]) @login_required def reply_single(review_id: str): message = (request.form.get("message") or "").strip() stars_from_form = request.form.getlist("stars") next_action = request.form.get("next_action") or "all" redirect_params: dict = {"action": next_action, "stars": _parse_selected_stars(stars_from_form)} if not message: return redirect( url_for("index", status="reply_failed", error="Введите текст ответа.", **redirect_params) ) if len(message) < 2 or len(message) > 5000: return redirect( url_for( "index", status="reply_failed", error="Ответ должен содержать от 2 до 5000 символов.", **redirect_params, ) ) try: client = get_client() client.send_answer(review_id, message) except FeedbackApiError as exc: return redirect(url_for("index", status="reply_failed", error=str(exc), **redirect_params)) return redirect(url_for("index", status="reply_sent", count=1, **redirect_params)) @app.route("/login", methods=["GET", "POST"]) def login(): if g.get("user"): return redirect(url_for("index")) error_message: Optional[str] = None if request.method == "POST": username = (request.form.get("username") or "").strip().lower() password = request.form.get("password") or "" user = db.get_user_by_username(username) if not user or not check_password_hash(user["password_hash"], password): error_message = "Неверный логин или пароль." elif not user["is_active"]: error_message = "Аккаунт ожидает подтверждения администратора." else: session.clear() session["user_id"] = user["id"] return redirect(request.args.get("next") or url_for("index")) return render_template("login.html", error_message=error_message) @app.route("/logout") def logout(): session.clear() return redirect(url_for("login")) @app.route("/register", methods=["GET", "POST"]) def register(): if g.get("user"): return redirect(url_for("index")) error_message: Optional[str] = None success_message: Optional[str] = None if request.method == "POST": username = (request.form.get("username") or "").strip().lower() password = request.form.get("password") or "" confirm = request.form.get("confirm") or "" if not username or not password: error_message = "Введите логин и пароль." elif password != confirm: error_message = "Пароли не совпадают." elif username == "ruslan": error_message = "Нельзя использовать зарезервированный логин." else: try: password_hash = generate_password_hash(password) db.create_user(username, password_hash) success_message = "Заявка создана. Дождитесь подтверждения администратора." except sqlite3.IntegrityError: error_message = "Такой пользователь уже существует." return render_template("register.html", error_message=error_message, success_message=success_message) @app.route("/admin", methods=["GET", "POST"]) @admin_required def admin_panel(): message: Optional[str] = None if request.args.get("status") == "updated": message = "Статус пользователя обновлён." if request.method == "POST": user_id = request.form.get("user_id") action = request.form.get("admin_action") if user_id and action in {"activate", "deactivate"}: db.set_user_active(int(user_id), action == "activate") return redirect(url_for("admin_panel", status="updated")) users = db.list_users() return render_template("admin.html", users=users, info_message=message, current_user=g.user) if __name__ == "__main__": logger.info( "App start debug=%s port=%s req_per_sec=%s batch_limit=%s interval_min=%s timezone=%s log_file=%s", True, os.getenv("FLASK_PORT", "5000"), WB_REQUESTS_PER_SECOND, AUTO_REPLY_BATCH_LIMIT, AUTO_REPLY_INTERVAL_MINUTES, APP_TIMEZONE, APP_LOG_FILE, ) is_reloader_process = os.environ.get("WERKZEUG_RUN_MAIN") == "true" if is_reloader_process or not app.debug: threading.Thread(target=auto_reply_loop, daemon=True).start() port = int(os.getenv("FLASK_PORT", "5000")) app.run(host="0.0.0.0", port=port, debug=True)