Files
wildberries/app.py
T
ruslan de278227ac Store picker in topbar, redesign cabinet, rename to WBfeed.ru
- index.html: store dropdown in topbar (switch without leaving page)
- cabinet.html: store-cards with Активировать/Проверить, edit hidden in <details>
- Removed 'Используется' button, active shown as green card + label
- next=index param to return to main page after store switch
- Brand name changed to WBfeed.ru across all templates

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-15 21:30:03 +03:00

1515 lines
56 KiB
Python

import logging
import json
import os
import random
import sqlite3
import threading
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import wraps
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import List, Optional, Set, Tuple
from zoneinfo import ZoneInfo
import requests
from dotenv import load_dotenv
from flask import Flask, g, jsonify, redirect, render_template, request, session, url_for
from werkzeug.security import check_password_hash, generate_password_hash
BASE_DIR = Path(__file__).resolve().parent
load_dotenv(BASE_DIR / '.env')
STAR_VALUES = [5, 4, 3, 2, 1]
DEFAULT_STARS = {5}
AUTO_REPLY_STARS = {5, 4}
AUTO_REPLY_SETTING_KEY = "auto_reply_enabled"
AUTO_REPLY_LAST_RUN_KEY = "auto_reply_last_run_ts"
AUTO_REPLY_POOL_5_KEY = "auto_reply_pool_5"
AUTO_REPLY_POOL_4_KEY = "auto_reply_pool_4"
AUTO_REPLY_POOL_3_KEY = "auto_reply_pool_3"
AUTO_REPLY_POOL_2_KEY = "auto_reply_pool_2"
AUTO_REPLY_POOL_1_KEY = "auto_reply_pool_1"
AUTO_REPLY_STARS_KEY = "auto_reply_stars_json"
AUTO_REPLY_FILTER_KEY = "auto_reply_filter"
AUTO_REPLY_QUEUE_KEY = "auto_reply_queue_json"
AUTO_REPLY_LAST_FETCH_KEY = "auto_reply_last_fetch_ts"
AUTO_REPLY_INTERVAL_MINUTES = int(os.getenv("AUTO_REPLY_INTERVAL_MINUTES", "10"))
AUTO_REPLY_INTERVAL_SECONDS = int(
os.getenv("AUTO_REPLY_INTERVAL_SECONDS", str(AUTO_REPLY_INTERVAL_MINUTES * 60))
)
AUTO_REPLY_FETCH_INTERVAL_SECONDS = int(
os.getenv("AUTO_REPLY_FETCH_INTERVAL_SECONDS", "300")
)
AUTO_REPLY_BATCH_LIMIT = max(1, int(os.getenv("AUTO_REPLY_BATCH_LIMIT", "25")))
API_GLOBAL_COOLDOWN_UNTIL_KEY = "api_global_cooldown_until_ts"
WB_REQUESTS_PER_SECOND = float(os.getenv("WB_REQUESTS_PER_SECOND", "0.8"))
WB_MIN_REQUEST_INTERVAL_SECONDS = max(1.1, 1.0 / max(0.1, WB_REQUESTS_PER_SECOND))
APP_TIMEZONE = ZoneInfo(os.getenv("APP_TIMEZONE", "Europe/Moscow"))
API_LOG_BODY_MAX = max(200, int(os.getenv("API_LOG_BODY_MAX", "1200")))
APP_LOG_FILE = os.getenv("APP_LOG_FILE", str(BASE_DIR / "app.log"))
APP_LOG_LEVEL = os.getenv("APP_LOG_LEVEL", "INFO").upper()
EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS = int(
os.getenv("EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS", "60")
)
AUTO_REPLY_LOOP_SLEEP_SECONDS = max(1, int(os.getenv("AUTO_REPLY_LOOP_SLEEP_SECONDS", "1")))
DEFAULT_REPLY_POOL_5 = [
"Спасибо за высокую оценку! Нам очень приятно.",
"Благодарим за отзыв и доверие к нашему магазину!",
"Спасибо! Рады, что товар вам понравился.",
]
DEFAULT_REPLY_POOL_4 = [
"Спасибо за отзыв! Учтём ваши замечания и станем лучше.",
"Благодарим за оценку! Работаем над тем, чтобы было на 5 звёзд.",
"Спасибо за обратную связь! Нам важно ваше мнение.",
]
def _setup_logger() -> logging.Logger:
logger = logging.getLogger("wildberries-app")
if logger.handlers:
return logger
level = getattr(logging, APP_LOG_LEVEL, logging.INFO)
logger.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s %(levelname)s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
file_handler = RotatingFileHandler(APP_LOG_FILE, maxBytes=5_000_000, backupCount=5)
file_handler.setFormatter(formatter)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logger.propagate = False
return logger
logger = _setup_logger()
def _short_text(value: Optional[str], max_len: int = API_LOG_BODY_MAX) -> str:
text = value or ""
if len(text) <= max_len:
return text
return f"{text[:max_len]}...<trimmed {len(text) - max_len} chars>"
def _sanitize_headers(headers: dict) -> dict:
safe = dict(headers)
auth = safe.get("Authorization")
if auth:
safe["Authorization"] = f"***{auth[-6:]}" if len(auth) >= 6 else "***"
return safe
class FeedbackApiError(RuntimeError):
"""Raised when the Wildberries Feedback API returns an error."""
@dataclass
class Review:
id: str
text: str
pros: str
cons: str
rating: int
created_at: datetime
product_name: str
nm_id: int
answer: Optional[str]
is_answered: bool
user_name: str
@classmethod
def from_api(cls, payload: dict) -> "Review":
created_at = payload.get("createdDate")
created_dt = None
if created_at:
created_dt = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
product = payload.get("productDetails", {}) or {}
answer_payload = payload.get("answer")
if isinstance(answer_payload, dict):
answer_value = answer_payload.get("text")
else:
answer_value = answer_payload
return cls(
id=payload.get("id") or "",
text=payload.get("text") or "",
pros=payload.get("pros") or "",
cons=payload.get("cons") or "",
rating=payload.get("productValuation") or 0,
created_at=created_dt,
product_name=product.get("productName") or "",
nm_id=int(product.get("nmId") or 0),
answer=answer_value,
is_answered=bool(answer_value),
user_name=payload.get("userName") or "",
)
class Database:
def __init__(self, db_path: Path) -> None:
self.db_path = db_path
self._ensure_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _ensure_db(self) -> None:
conn = self._connect()
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
is_admin INTEGER DEFAULT 0,
is_active INTEGER DEFAULT 0
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
name TEXT NOT NULL,
token TEXT NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id)
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS settings (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS auto_reply_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
review_id TEXT NOT NULL,
rating INTEGER NOT NULL,
product_name TEXT,
user_name TEXT,
review_text TEXT,
reply_text TEXT NOT NULL,
status TEXT NOT NULL,
error_text TEXT
);
"""
)
self._ensure_admin(conn)
self._ensure_token_user_column(conn)
self._ensure_auto_reply_log_review_text(conn)
conn.commit()
finally:
conn.close()
def _ensure_admin(self, conn: sqlite3.Connection) -> None:
admin = conn.execute("SELECT id FROM users WHERE username = ?", ("ruslan",)).fetchone()
if not admin:
password_hash = generate_password_hash("utOgbZ09ruslan+")
conn.execute(
"INSERT INTO users(username, password_hash, is_admin, is_active) VALUES (?, ?, 1, 1)",
("ruslan", password_hash),
)
def _ensure_token_user_column(self, conn: sqlite3.Connection) -> None:
info = conn.execute("PRAGMA table_info(tokens)").fetchall()
columns = {row["name"] for row in info}
if "user_id" not in columns:
conn.execute("ALTER TABLE tokens ADD COLUMN user_id INTEGER")
admin = conn.execute("SELECT id FROM users WHERE username = ?", ("ruslan",)).fetchone()
admin_id = admin["id"] if admin else None
if admin_id:
conn.execute(
"UPDATE tokens SET user_id = ? WHERE user_id IS NULL",
(admin_id,),
)
def _ensure_auto_reply_log_review_text(self, conn: sqlite3.Connection) -> None:
info = conn.execute("PRAGMA table_info(auto_reply_logs)").fetchall()
columns = {row["name"] for row in info}
if "review_text" not in columns:
conn.execute("ALTER TABLE auto_reply_logs ADD COLUMN review_text TEXT")
if "nm_id" not in columns:
conn.execute("ALTER TABLE auto_reply_logs ADD COLUMN nm_id INTEGER")
if "review_created_at" not in columns:
conn.execute("ALTER TABLE auto_reply_logs ADD COLUMN review_created_at TEXT")
# User helpers
def create_user(self, username: str, password_hash: str) -> None:
conn = self._connect()
try:
conn.execute(
"INSERT INTO users(username, password_hash, is_admin, is_active) VALUES (?, ?, 0, 0)",
(username.lower(), password_hash),
)
conn.commit()
finally:
conn.close()
def get_user_by_username(self, username: str) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute(
"SELECT * FROM users WHERE username = ?", (username.lower(),)
).fetchone()
finally:
conn.close()
def get_user_by_id(self, user_id: int) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
finally:
conn.close()
def list_users(self) -> List[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM users ORDER BY username").fetchall()
finally:
conn.close()
def set_user_active(self, user_id: int, is_active: bool) -> None:
conn = self._connect()
try:
conn.execute("UPDATE users SET is_active = ? WHERE id = ?", (1 if is_active else 0, user_id))
conn.commit()
finally:
conn.close()
# Token helpers
def add_token(self, user_id: int, name: str, token: str) -> None:
conn = self._connect()
try:
conn.execute(
"INSERT INTO tokens(user_id, name, token) VALUES (?, ?, ?)",
(user_id, name.strip(), token.strip()),
)
conn.commit()
finally:
conn.close()
def update_token(self, token_id: int, name: str, token: str) -> None:
conn = self._connect()
try:
conn.execute(
"UPDATE tokens SET name = ?, token = ? WHERE id = ?",
(name.strip(), token.strip(), token_id),
)
conn.commit()
finally:
conn.close()
def fetch_tokens_for_user(self, user_id: int, is_admin: bool) -> List[sqlite3.Row]:
conn = self._connect()
try:
if is_admin:
query = """
SELECT tokens.id, tokens.name, tokens.token, tokens.user_id, users.username AS owner
FROM tokens
LEFT JOIN users ON users.id = tokens.user_id
ORDER BY tokens.id DESC
"""
return conn.execute(query).fetchall()
query = """
SELECT tokens.id, tokens.name, tokens.token, tokens.user_id, users.username AS owner
FROM tokens
LEFT JOIN users ON users.id = tokens.user_id
WHERE tokens.user_id = ?
ORDER BY tokens.id DESC
"""
return conn.execute(query, (user_id,)).fetchall()
finally:
conn.close()
def fetch_first_token_for_user(self, user_id: int) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute(
"SELECT * FROM tokens WHERE user_id = ? ORDER BY id DESC LIMIT 1",
(user_id,),
).fetchone()
finally:
conn.close()
def fetch_first_token_any(self) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM tokens ORDER BY id DESC LIMIT 1").fetchone()
finally:
conn.close()
def get_token(self, token_id: int) -> Optional[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute("SELECT * FROM tokens WHERE id = ?", (token_id,)).fetchone()
finally:
conn.close()
def get_setting(self, key: str) -> Optional[str]:
conn = self._connect()
try:
row = conn.execute("SELECT value FROM settings WHERE key = ?", (key,)).fetchone()
return row["value"] if row else None
finally:
conn.close()
def set_setting(self, key: str, value: str) -> None:
conn = self._connect()
try:
conn.execute(
"""
INSERT INTO settings(key, value) VALUES (?, ?)
ON CONFLICT(key) DO UPDATE SET value = excluded.value
""",
(key, value),
)
conn.commit()
finally:
conn.close()
def add_auto_reply_log(
self,
*,
review_id: str,
rating: int,
product_name: str,
nm_id: int = 0,
user_name: str,
review_text: str,
review_created_at: str = "",
reply_text: str,
status: str,
error_text: Optional[str] = None,
) -> None:
conn = self._connect()
try:
conn.execute(
"""
INSERT INTO auto_reply_logs(
created_at, review_id, rating, product_name, nm_id, user_name, review_text, review_created_at, reply_text, status, error_text
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
datetime.utcnow().replace(microsecond=0).isoformat(),
review_id,
rating,
product_name,
nm_id,
user_name,
review_text,
review_created_at,
reply_text,
status,
error_text,
),
)
conn.commit()
finally:
conn.close()
def list_auto_reply_logs(self, limit: int = 100) -> List[sqlite3.Row]:
conn = self._connect()
try:
return conn.execute(
"""
SELECT * FROM auto_reply_logs
WHERE status != 'skipped'
ORDER BY id DESC
LIMIT ?
""",
(limit,),
).fetchall()
finally:
conn.close()
class FeedbackClient:
BASE_URL = "https://feedbacks-api.wildberries.ru/api/v1/feedbacks"
ANSWER_URL = "https://feedbacks-api.wildberries.ru/api/v1/feedbacks/answer"
def __init__(self, token: Optional[str], page_size: int = 100, timeout: int = 15) -> None:
self.token = token
self.page_size = page_size
self.timeout = timeout
self._last_request_ts = 0.0
def _get_headers(self) -> dict:
if not self.token:
raise FeedbackApiError(
"Токен Wildberries не найден. Добавьте токен в личном кабинете."
)
return {"Authorization": self.token}
def _throttle(self) -> None:
now = time.time()
wait_for = WB_MIN_REQUEST_INTERVAL_SECONDS - (now - self._last_request_ts)
if wait_for > 0:
time.sleep(wait_for)
self._last_request_ts = time.time()
@staticmethod
def _extract_retry_seconds(response: requests.Response) -> Optional[float]:
for header_name in ("X-Ratelimit-Retry", "Retry-After", "X-Ratelimit-Reset"):
raw = response.headers.get(header_name)
if not raw:
continue
try:
value = float(raw)
except ValueError:
continue
if value >= 0:
return value
return None
def _request(self, *, is_answered: bool, skip: int, take: int) -> List[dict]:
_check_api_cooldown_or_raise()
params = {
"isAnswered": str(is_answered).lower(),
"skip": skip,
"take": take,
}
headers = self._get_headers()
safe_headers = _sanitize_headers(headers)
max_attempts = 4
response = None
for attempt in range(1, max_attempts + 1):
self._throttle()
logger.info(
"WB API request method=GET url=%s params=%s attempt=%s/%s headers=%s",
self.BASE_URL,
params,
attempt,
max_attempts,
safe_headers,
)
response = requests.get(self.BASE_URL, params=params, headers=headers, timeout=self.timeout)
logger.info(
"WB API response method=GET url=%s status=%s headers=%s body=%s",
self.BASE_URL,
response.status_code,
dict(response.headers),
_short_text(response.text),
)
if response.ok:
remaining = response.headers.get("X-Ratelimit-Remaining")
reset_seconds = self._extract_retry_seconds(response)
if remaining == "0":
if reset_seconds is not None and reset_seconds > 0:
_set_api_cooldown(reset_seconds + 30)
else:
_set_api_cooldown(EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS)
logger.warning(
"WB API remaining=0 without reset headers. Fallback cooldown=%s sec",
EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS,
)
break
if response.status_code == 429 and attempt < max_attempts:
retry_seconds = self._extract_retry_seconds(response)
if retry_seconds is not None and retry_seconds > 0:
_set_api_cooldown(retry_seconds + 30)
if retry_seconds is not None and retry_seconds > 30:
logger.warning(
"WB API long rate limit on GET. retry_after_seconds=%s. Stop retries for this call.",
retry_seconds,
)
raise FeedbackApiError(
f"Лимит WB API. Повторите позже (примерно через {int(retry_seconds)} сек)."
)
delay = retry_seconds if retry_seconds is not None else float(2 ** attempt)
logger.warning(
"WB API rate limit on GET. sleep_seconds=%s x_ratelimit_retry=%s x_ratelimit_reset=%s",
max(1.0, delay),
response.headers.get("X-Ratelimit-Retry"),
response.headers.get("X-Ratelimit-Reset"),
)
time.sleep(max(1.0, delay))
continue
if response.status_code == 429:
retry_seconds = self._extract_retry_seconds(response)
if retry_seconds is not None and retry_seconds > 0:
_set_api_cooldown(retry_seconds + 30)
raise FeedbackApiError(f"Ошибка запроса: {response.status_code} {response.text}")
if response is None:
raise FeedbackApiError("Не удалось получить ответ от API Wildberries.")
payload = response.json()
if payload.get("error"):
raise FeedbackApiError(payload.get("errorText") or "Не удалось получить отзывы.")
data = payload.get("data") or {}
return data.get("feedbacks") or []
def _paginate(self, *, is_answered: bool, max_items: int = 500) -> List[dict]:
result: List[dict] = []
skip = 0
take = self.page_size
while True:
try:
page = self._request(is_answered=is_answered, skip=skip, take=take)
except FeedbackApiError:
if not result:
raise
logger.info("_paginate stopping early due to API error, returning %s items collected", len(result))
break
if not page:
break
result.extend(page)
if len(page) < take:
break
if len(result) >= max_items:
break
skip += len(page)
return result
def fetch_reviews(
self,
limit: int = 50,
unanswered_only: bool = False,
allowed_ratings: Optional[Set[int]] = None,
) -> List[Review]:
if unanswered_only:
raw = self._paginate(is_answered=False, max_items=max(limit * 3, 300))
reviews = [Review.from_api(item) for item in raw]
else:
raw_unanswered = self._paginate(is_answered=False, max_items=limit)
raw_answered = self._paginate(is_answered=True, max_items=limit)
raw = raw_unanswered + raw_answered
raw.sort(key=lambda r: r.get("createdDate") or "", reverse=True)
reviews = [Review.from_api(item) for item in raw]
allowed = allowed_ratings or DEFAULT_STARS
filtered = [item for item in reviews if item.rating in allowed]
return filtered[:limit]
def validate_token(self) -> None:
"""Проверяет токен, выполняя минимальный запрос к API."""
# Используем минимальный набор данных, чтобы не тратить лимиты без необходимости.
self._request(is_answered=False, skip=0, take=1)
def send_answer(self, review_id: str, text: str) -> None:
_check_api_cooldown_or_raise()
payload = {"id": review_id, "text": text}
max_attempts = 4
for attempt in range(1, max_attempts + 1):
self._throttle()
safe_headers = _sanitize_headers(self._get_headers())
logger.info(
"WB API request method=POST url=%s payload=%s attempt=%s/%s headers=%s",
self.ANSWER_URL,
payload,
attempt,
max_attempts,
safe_headers,
)
response = requests.post(
self.ANSWER_URL,
json=payload,
headers=self._get_headers(),
timeout=self.timeout,
)
logger.info(
"WB API response method=POST url=%s status=%s headers=%s body=%s",
self.ANSWER_URL,
response.status_code,
dict(response.headers),
_short_text(response.text),
)
if response.ok:
remaining = response.headers.get("X-Ratelimit-Remaining")
reset_seconds = self._extract_retry_seconds(response)
if remaining == "0":
if reset_seconds is not None and reset_seconds > 0:
_set_api_cooldown(reset_seconds + 30)
else:
_set_api_cooldown(EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS)
logger.warning(
"WB API remaining=0 without reset headers. Fallback cooldown=%s sec",
EMPTY_REMAINING_FALLBACK_COOLDOWN_SECONDS,
)
return
if response.status_code == 429 and attempt < max_attempts:
retry_seconds = self._extract_retry_seconds(response)
if retry_seconds is not None and retry_seconds > 0:
_set_api_cooldown(retry_seconds + 30)
if retry_seconds is not None and retry_seconds > 30:
logger.warning(
"WB API long rate limit on POST. retry_after_seconds=%s. Stop retries for this call.",
retry_seconds,
)
raise FeedbackApiError(
f"Лимит WB API. Повторите позже (примерно через {int(retry_seconds)} сек)."
)
delay = retry_seconds if retry_seconds is not None else float(2 ** attempt)
logger.warning(
"WB API rate limit on POST. sleep_seconds=%s x_ratelimit_retry=%s x_ratelimit_reset=%s",
max(1.0, delay),
response.headers.get("X-Ratelimit-Retry"),
response.headers.get("X-Ratelimit-Reset"),
)
time.sleep(max(1.0, delay))
continue
if response.status_code == 429:
retry_seconds = self._extract_retry_seconds(response)
if retry_seconds is not None and retry_seconds > 0:
_set_api_cooldown(retry_seconds + 30)
raise FeedbackApiError(
f"Не удалось отправить ответ: {response.status_code} {response.text}"
)
def answer_many(self, review_ids: List[str], text: str) -> int:
sent = 0
for review_id in review_ids:
if not review_id:
continue
self.send_answer(review_id, text)
sent += 1
return sent
def _parse_pool(value: Optional[str], fallback: List[str]) -> List[str]:
if not value:
return fallback
normalized = value.replace("\r\n", "\n").replace("||", "\n")
parsed = [item.strip() for item in normalized.split("\n") if item.strip()]
return parsed or fallback
ENV_REPLY_POOL_5 = os.getenv("REPLY_POOL_5")
ENV_REPLY_POOL_4 = os.getenv("REPLY_POOL_4")
app = Flask(__name__)
app.config["SECRET_KEY"] = os.getenv("FLASK_SECRET_KEY", "dev-secret")
db = Database(BASE_DIR / "tokens.db")
def _parse_selected_stars(raw_values: List[str]) -> List[int]:
normalized: List[int] = []
for value in raw_values:
try:
star = int(value)
except ValueError:
continue
if star in STAR_VALUES and star not in normalized:
normalized.append(star)
if not normalized:
return [5]
return normalized
def _get_session_token() -> Tuple[Optional[int], Optional[str], Optional[str]]:
user = g.get("user")
if not user:
return (None, None, None)
token_id = session.get("token_id")
token_row = None
if token_id is not None:
token_row = db.get_token(int(token_id))
if not token_row or (
not user["is_admin"] and token_row["user_id"] != user["id"]
):
session.pop("token_id", None)
token_row = None
if token_row is None:
token_row = db.fetch_first_token_for_user(user["id"])
if token_row is None and user["is_admin"]:
token_row = db.fetch_first_token_any()
if token_row is not None:
session["token_id"] = token_row["id"]
if token_row is None:
return (None, None, None)
return token_row["id"], token_row["name"], token_row["token"]
def _get_active_token() -> Tuple[Optional[str], Optional[str]]:
token_id, token_name, token_value = _get_session_token()
if token_value:
return token_value, token_name
return None, None
def _get_background_token() -> Optional[str]:
env_token = (os.getenv("WB_API_TOKEN") or "").strip()
if env_token:
return env_token
token_row = db.fetch_first_token_any()
if token_row:
return token_row["token"]
return None
def get_client() -> FeedbackClient:
token_value, _ = _get_active_token()
if not token_value:
raise FeedbackApiError("Токен не задан. Добавьте его в личном кабинете.")
return FeedbackClient(token_value)
def is_auto_reply_enabled() -> bool:
return db.get_setting(AUTO_REPLY_SETTING_KEY) == "1"
def set_auto_reply_enabled(value: bool) -> None:
db.set_setting(AUTO_REPLY_SETTING_KEY, "1" if value else "0")
def _get_api_cooldown_seconds_left() -> int:
raw = db.get_setting(API_GLOBAL_COOLDOWN_UNTIL_KEY)
if not raw:
return 0
try:
until_ts = float(raw)
except ValueError:
return 0
return max(0, int(until_ts - time.time()))
def _set_api_cooldown(seconds: float) -> None:
seconds_int = max(0, int(seconds))
if seconds_int <= 0:
return
until_ts = time.time() + seconds_int
db.set_setting(API_GLOBAL_COOLDOWN_UNTIL_KEY, str(until_ts))
logger.warning("Global API cooldown set for %s seconds", seconds_int)
def _check_api_cooldown_or_raise() -> None:
seconds_left = _get_api_cooldown_seconds_left()
if seconds_left > 0:
raise FeedbackApiError(
f"Лимит WB API активен. Повторите позже (примерно через {seconds_left} сек)."
)
def _load_auto_reply_queue() -> List[dict]:
raw = db.get_setting(AUTO_REPLY_QUEUE_KEY)
if not raw:
return []
try:
payload = json.loads(raw)
except (TypeError, ValueError):
return []
if not isinstance(payload, list):
return []
queue = []
for item in payload:
if not isinstance(item, dict):
continue
if not item.get("id"):
continue
queue.append(item)
return queue
def _save_auto_reply_queue(queue: List[dict]) -> None:
db.set_setting(AUTO_REPLY_QUEUE_KEY, json.dumps(queue, ensure_ascii=False))
def _build_auto_reply_queue(reviews: List[Review]) -> List[dict]:
queue: List[dict] = []
filter_mode = _load_filter_mode()
for review in reviews:
if _has_review_content(review, filter_mode):
logger.info("Auto-reply skip review_id=%s reason=has_review_text", review.id)
db.add_auto_reply_log(
review_id=review.id,
rating=review.rating,
product_name=review.product_name,
nm_id=review.nm_id,
user_name=review.user_name,
review_text=review.text,
review_created_at=review.created_at.isoformat() if review.created_at else "",
reply_text="",
status="skipped",
error_text="Пропущен: у отзыва есть текст/достоинства/недостатки.",
)
continue
queue.append(
{
"id": review.id,
"rating": review.rating,
"product_name": review.product_name,
"nm_id": review.nm_id,
"user_name": review.user_name,
"review_text": review.text,
"review_created_at": review.created_at.isoformat() if review.created_at else "",
}
)
return queue
def _fetch_window_open() -> bool:
raw = db.get_setting(AUTO_REPLY_LAST_FETCH_KEY)
if not raw:
return True
try:
last_fetch = float(raw)
except ValueError:
return True
return (time.time() - last_fetch) >= AUTO_REPLY_FETCH_INTERVAL_SECONDS
def _load_reply_pool(star: int) -> List[str]:
if star == 5:
db_value = db.get_setting(AUTO_REPLY_POOL_5_KEY)
return _parse_pool(db_value or ENV_REPLY_POOL_5, DEFAULT_REPLY_POOL_5)
if star == 4:
db_value = db.get_setting(AUTO_REPLY_POOL_4_KEY)
return _parse_pool(db_value or ENV_REPLY_POOL_4, DEFAULT_REPLY_POOL_4)
if star in (1, 2, 3):
return _parse_pool(db.get_setting(f"auto_reply_pool_{star}") or "", [])
return []
def _load_enabled_stars() -> Set[int]:
raw = db.get_setting(AUTO_REPLY_STARS_KEY)
if raw:
try:
parsed = json.loads(raw)
if isinstance(parsed, list):
result = {int(s) for s in parsed if isinstance(s, (int, float, str)) and str(s).isdigit() and 1 <= int(s) <= 5}
if result:
return result
except (TypeError, ValueError):
pass
return AUTO_REPLY_STARS
def _load_filter_mode() -> str:
raw = db.get_setting(AUTO_REPLY_FILTER_KEY)
if raw in ("no_text", "empty", "all"):
return raw
return "no_text"
def _pool_to_multiline_text(pool: List[str]) -> str:
return "\n".join(pool)
def _pick_auto_reply(star: int) -> Optional[str]:
pool = _load_reply_pool(star)
if pool:
return random.choice(pool)
return None
def _has_review_content(review: Review, filter_mode: str = "no_text") -> bool:
if filter_mode == "no_text":
return bool((review.text or "").strip())
if filter_mode == "empty":
return bool(
(review.text or "").strip()
or (review.pros or "").strip()
or (review.cons or "").strip()
)
# filter_mode == "all": reply to everything, treat as no content to skip
return False
def process_auto_replies() -> int:
if _get_api_cooldown_seconds_left() > 0:
logger.info(
"Auto-reply skipped: global cooldown active for %s seconds",
_get_api_cooldown_seconds_left(),
)
return 0
token_value = _get_background_token()
if not token_value:
logger.info("Auto-reply skipped: background token is missing.")
return 0
client = FeedbackClient(token_value)
queue = _load_auto_reply_queue()
if not queue:
if not _fetch_window_open():
logger.info(
"Auto-reply queue empty, but fetch window is closed. fetch_interval_seconds=%s",
AUTO_REPLY_FETCH_INTERVAL_SECONDS,
)
return 0
logger.info("Auto-reply queue is empty. Fetch new reviews once.")
try:
reviews = client.fetch_reviews(
limit=100,
unanswered_only=True,
allowed_ratings=_load_enabled_stars(),
)
except FeedbackApiError:
db.set_setting(AUTO_REPLY_LAST_FETCH_KEY, str(time.time()))
raise
db.set_setting(AUTO_REPLY_LAST_FETCH_KEY, str(time.time()))
queue = _build_auto_reply_queue(reviews)
_save_auto_reply_queue(queue)
logger.info("Auto-reply queue rebuilt size=%s", len(queue))
cooldown_after_fetch = _get_api_cooldown_seconds_left()
if cooldown_after_fetch > 0:
logger.info("Auto-reply stop after fetch: cooldown active for %s seconds", cooldown_after_fetch)
return 0
if not queue:
logger.info("Auto-reply queue has no eligible reviews.")
return 0
logger.info(
"Auto-reply cycle start queue_size=%s batch_limit=%s",
len(queue),
AUTO_REPLY_BATCH_LIMIT,
)
sent = 0
processed = 0
while queue and processed < AUTO_REPLY_BATCH_LIMIT:
item = queue[0]
review_id = item.get("id", "")
rating = int(item.get("rating") or 0)
reply_text = _pick_auto_reply(rating)
if not reply_text:
logger.info("Auto-reply skip review_id=%s reason=no_pool_reply rating=%s", review_id, rating)
queue.pop(0)
processed += 1
continue
try:
client.send_answer(review_id, reply_text)
logger.info("Auto-reply sent review_id=%s rating=%s", review_id, rating)
db.add_auto_reply_log(
review_id=review_id,
rating=rating,
product_name=item.get("product_name", ""),
nm_id=int(item.get("nm_id") or 0),
review_created_at=item.get("review_created_at", ""),
user_name=item.get("user_name", ""),
review_text=item.get("review_text", ""),
reply_text=reply_text,
status="sent",
)
sent += 1
queue.pop(0)
processed += 1
except FeedbackApiError as exc:
exc_str = str(exc)
is_rate_limit = (
"Лимит WB API" in exc_str
or "429" in exc_str
or "cooldown" in exc_str.lower()
or "rate limit" in exc_str.lower()
or "too many" in exc_str.lower()
)
if is_rate_limit:
logger.info("Auto-reply rate limit, keeping queue head review_id=%s", review_id)
break
logger.warning("Auto-reply failed review_id=%s rating=%s error=%s", review_id, rating, exc)
db.add_auto_reply_log(
review_id=review_id,
rating=rating,
product_name=item.get("product_name", ""),
nm_id=int(item.get("nm_id") or 0),
review_created_at=item.get("review_created_at", ""),
user_name=item.get("user_name", ""),
review_text=item.get("review_text", ""),
reply_text=reply_text,
status="failed",
error_text=str(exc),
)
queue.pop(0)
processed += 1
_save_auto_reply_queue(queue)
logger.info("Auto-reply cycle finished sent_count=%s", sent)
return sent
def _scheduler_should_run(last_run_raw: Optional[str], now_ts: float) -> bool:
if not last_run_raw:
return True
try:
last_run = float(last_run_raw)
except ValueError:
return True
return now_ts - last_run >= AUTO_REPLY_INTERVAL_SECONDS
def _next_auto_reply_meta() -> Tuple[Optional[datetime], Optional[int]]:
last_run_raw = db.get_setting(AUTO_REPLY_LAST_RUN_KEY)
if not last_run_raw:
return None, None
try:
last_run = float(last_run_raw)
except ValueError:
return None, None
next_ts = last_run + AUTO_REPLY_INTERVAL_SECONDS
seconds_left = max(0, int(next_ts - time.time()))
next_dt = datetime.fromtimestamp(next_ts, tz=timezone.utc).astimezone(APP_TIMEZONE)
return next_dt, seconds_left
def _next_fetch_seconds_left() -> int:
raw = db.get_setting(AUTO_REPLY_LAST_FETCH_KEY)
if not raw:
return 0
try:
last_fetch = float(raw)
except ValueError:
return 0
return max(0, int(last_fetch + AUTO_REPLY_FETCH_INTERVAL_SECONDS - time.time()))
def auto_reply_loop() -> None:
while True:
try:
if is_auto_reply_enabled():
now_ts = time.time()
last_run = db.get_setting(AUTO_REPLY_LAST_RUN_KEY)
if _scheduler_should_run(last_run, now_ts):
logger.info("Auto-reply loop trigger start last_run=%s now_ts=%s", last_run, now_ts)
db.set_setting(AUTO_REPLY_LAST_RUN_KEY, str(now_ts))
process_auto_replies()
else:
logger.info("Auto-reply loop skip schedule last_run=%s now_ts=%s", last_run, now_ts)
else:
logger.info("Auto-reply loop skip: disabled")
except Exception:
logger.exception("Auto-reply loop crashed")
time.sleep(AUTO_REPLY_LOOP_SLEEP_SECONDS)
@app.template_filter("format_datetime")
def format_datetime(value: Optional[datetime]) -> str:
if not value:
return ""
dt = value
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(APP_TIMEZONE).strftime("%d.%m.%Y %H:%M")
@app.template_filter("format_log_datetime")
def format_log_datetime(value: Optional[str]) -> str:
if not value:
return ""
try:
dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
return value
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(APP_TIMEZONE).strftime("%d.%m.%Y %H:%M")
@app.before_request
def load_current_user() -> None:
user_id = session.get("user_id")
g.user = None
if user_id is not None:
user = db.get_user_by_id(int(user_id))
if user and user["is_active"]:
g.user = user
else:
session.pop("user_id", None)
session.pop("token_id", None)
logger.info(
"HTTP request method=%s path=%s query=%s remote=%s user_id=%s",
request.method,
request.path,
request.query_string.decode("utf-8", errors="ignore"),
request.remote_addr,
g.user["id"] if g.user else None,
)
@app.after_request
def log_response(response):
logger.info(
"HTTP response method=%s path=%s status=%s length=%s",
request.method,
request.path,
response.status_code,
response.calculate_content_length(),
)
return response
def login_required(view):
@wraps(view)
def wrapped(*args, **kwargs):
if g.get("user") is None:
return redirect(url_for("login", next=request.path))
return view(*args, **kwargs)
return wrapped
def admin_required(view):
@wraps(view)
def wrapped(*args, **kwargs):
user = g.get("user")
if not user or not user["is_admin"]:
return redirect(url_for("index"))
return view(*args, **kwargs)
return wrapped
@app.route("/cabinet", methods=["GET", "POST"])
@login_required
def cabinet():
error_message: Optional[str] = None
success_message: Optional[str] = None
status = request.args.get("status")
status_map = {
"added": "Токен сохранён.",
"updated": "Токен обновлён.",
"selected": "Активирован выбранный магазин.",
"checked": "Токен успешно проверен.",
}
if status in status_map:
success_message = status_map[status]
user = g.user
if request.method == "POST":
action = request.form.get("cabinet_action")
if action == "add":
name = (request.form.get("name") or "").strip()
token_value = (request.form.get("token") or "").strip()
if not name or not token_value:
error_message = "Введите название и токен."
else:
db.add_token(user_id=user["id"], name=name, token=token_value)
return redirect(url_for("cabinet", status="added"))
elif action == "edit":
token_id = request.form.get("token_id")
token_row = db.get_token(int(token_id)) if token_id else None
name = (request.form.get("name") or "").strip()
token_value = (request.form.get("token") or "").strip()
if not token_row or not (user["is_admin"] or token_row["user_id"] == user["id"]):
error_message = "Недостаточно прав для изменения токена."
elif not name or not token_value:
error_message = "Введите название и токен."
else:
db.update_token(token_id=token_row["id"], name=name, token=token_value)
return redirect(url_for("cabinet", status="updated"))
elif action == "select":
token_id = request.form.get("token_id")
if token_id:
token_row = db.get_token(int(token_id))
if token_row and (user["is_admin"] or token_row["user_id"] == user["id"]):
session["token_id"] = int(token_id)
if request.form.get("next") == "index":
return redirect(url_for("index"))
return redirect(url_for("cabinet", status="selected"))
error_message = "Не удалось выбрать токен."
elif action == "check":
token_id = request.form.get("token_id")
token_row = db.get_token(int(token_id)) if token_id else None
if token_row and (user["is_admin"] or token_row["user_id"] == user["id"]):
try:
FeedbackClient(token_row["token"]).validate_token()
return redirect(url_for("cabinet", status="checked"))
except FeedbackApiError as exc:
error_message = str(exc)
else:
error_message = "Недостаточно прав для проверки токена."
raw_tokens = db.fetch_tokens_for_user(user["id"], bool(user["is_admin"]))
tokens = [
{
"id": row["id"],
"name": row["name"],
"token": row["token"],
"owner": row["owner"],
"user_id": row["user_id"],
}
for row in raw_tokens
]
active_token_id = session.get("token_id")
return render_template(
"cabinet.html",
tokens=tokens,
active_token_id=active_token_id,
error_message=error_message,
success_message=success_message,
current_user=user,
)
@app.route("/")
@login_required
def index():
status = request.args.get("status")
api_cooldown_seconds_left = _get_api_cooldown_seconds_left()
_, active_token_name = _get_active_token()
active_token_id = session.get("token_id")
raw_tokens = db.fetch_tokens_for_user(g.user["id"], bool(g.user["is_admin"]))
tokens = [{"id": r["id"], "name": r["name"]} for r in raw_tokens]
error_message: Optional[str] = None
success_message: Optional[str] = None
if status == "reply_sent":
count = request.args.get("count") or "0"
success_message = f"Отправлено ответов: {count}"
elif status == "pools_saved":
success_message = "Пулы автоответов сохранены."
elif status == "settings_saved":
success_message = "Настройки сохранены. Очередь будет обновлена."
elif status == "reply_failed":
error_text = request.args.get("error") or "Не удалось отправить ответы."
error_message = error_text
return render_template(
"index.html",
error_message=error_message,
success_message=success_message,
active_token_name=active_token_name,
active_token_id=active_token_id,
tokens=tokens,
auto_reply_enabled=is_auto_reply_enabled(),
api_cooldown_seconds_left=api_cooldown_seconds_left,
next_fetch_seconds_left=_next_fetch_seconds_left(),
enabled_stars=list(_load_enabled_stars()),
filter_mode=_load_filter_mode(),
reply_pools={n: _pool_to_multiline_text(_load_reply_pool(n)) for n in range(1, 6)},
auto_reply_queue=_load_auto_reply_queue(),
auto_reply_logs=db.list_auto_reply_logs(limit=100),
current_user=g.user,
)
@app.route("/yandex_b847b9b35f967fcc.html")
def yandex_verify():
return '<html><head><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head><body>Verification: b847b9b35f967fcc</body></html>', 200, {"Content-Type": "text/html; charset=UTF-8"}
@app.route("/api/status")
@login_required
def api_status():
logs = db.list_auto_reply_logs(limit=1)
last_id = logs[0]["id"] if logs else None
cooldown = _get_api_cooldown_seconds_left()
next_fetch = _next_fetch_seconds_left()
queue_len = len(_load_auto_reply_queue())
return jsonify({
"last_log_id": last_id,
"cooldown": cooldown,
"next_fetch_seconds": next_fetch,
"queue_len": queue_len,
"auto_reply_enabled": is_auto_reply_enabled(),
})
@app.route("/auto-reply-toggle", methods=["POST"])
@login_required
def auto_reply_toggle():
enabled = request.form.get("enabled") == "1"
set_auto_reply_enabled(enabled)
return redirect(url_for("index"))
@app.route("/auto-reply-pools", methods=["POST"])
@login_required
def auto_reply_pools():
pool_5 = [item.strip() for item in request.form.getlist("pool_5_item") if item.strip()]
pool_4 = [item.strip() for item in request.form.getlist("pool_4_item") if item.strip()]
is_ajax = request.headers.get("X-Requested-With") == "XMLHttpRequest"
if not pool_5 or not pool_4:
if is_ajax:
return jsonify({"ok": False, "error": "Для 5★ и 4★ укажите минимум по одному варианту ответа."}), 400
return redirect(
url_for(
"index",
status="reply_failed",
error="Для 5★ и 4★ укажите минимум по одному варианту ответа.",
action="unanswered",
stars=[5, 4],
)
)
db.set_setting(AUTO_REPLY_POOL_5_KEY, _pool_to_multiline_text(pool_5))
db.set_setting(AUTO_REPLY_POOL_4_KEY, _pool_to_multiline_text(pool_4))
if is_ajax:
return jsonify({"ok": True})
return redirect(url_for("index", status="pools_saved", action="unanswered", stars=[5, 4]))
@app.route("/auto-reply-settings", methods=["POST"])
@login_required
def auto_reply_settings():
if is_auto_reply_enabled():
return redirect(url_for("index"))
stars = [int(s) for s in request.form.getlist("stars") if s.isdigit() and 1 <= int(s) <= 5]
filter_mode = request.form.get("filter_mode", "no_text")
if filter_mode not in ("no_text", "empty", "all"):
filter_mode = "no_text"
db.set_setting(AUTO_REPLY_STARS_KEY, json.dumps(stars))
db.set_setting(AUTO_REPLY_FILTER_KEY, filter_mode)
for star in range(1, 6):
items = [i.strip() for i in request.form.getlist(f"pool_{star}_item") if i.strip()]
if items:
db.set_setting(f"auto_reply_pool_{star}", _pool_to_multiline_text(items))
# Clear queue and reset fetch window so next cycle rebuilds immediately
db.set_setting("auto_reply_queue_json", "[]")
db.set_setting(AUTO_REPLY_LAST_FETCH_KEY, "0")
return redirect(url_for("index", status="settings_saved"))
@app.route("/reply", methods=["POST"])
@login_required
def reply_all():
message = (request.form.get("message") or "").strip()
review_ids = request.form.getlist("review_id")
stars_from_form = request.form.getlist("stars")
selected_star_values = _parse_selected_stars(stars_from_form)
redirect_params: dict = {"action": "unanswered", "stars": selected_star_values}
if not message:
return redirect(
url_for("index", status="reply_failed", error="Введите текст ответа.", **redirect_params)
)
if len(message) < 2 or len(message) > 5000:
return redirect(
url_for(
"index",
status="reply_failed",
error="Ответ должен содержать от 2 до 5000 символов.",
**redirect_params,
)
)
deduped_ids = []
for review_id in review_ids:
if review_id and review_id not in deduped_ids:
deduped_ids.append(review_id)
if not deduped_ids:
return redirect(
url_for(
"index",
status="reply_failed",
error="Нет отзывов для ответа.",
**redirect_params,
)
)
try:
client = get_client()
sent = client.answer_many(deduped_ids, message)
except FeedbackApiError as exc:
return redirect(url_for("index", status="reply_failed", error=str(exc), **redirect_params))
return redirect(url_for("index", status="reply_sent", count=sent, **redirect_params))
@app.route("/reply/<review_id>", methods=["POST"])
@login_required
def reply_single(review_id: str):
message = (request.form.get("message") or "").strip()
stars_from_form = request.form.getlist("stars")
next_action = request.form.get("next_action") or "all"
redirect_params: dict = {"action": next_action, "stars": _parse_selected_stars(stars_from_form)}
if not message:
return redirect(
url_for("index", status="reply_failed", error="Введите текст ответа.", **redirect_params)
)
if len(message) < 2 or len(message) > 5000:
return redirect(
url_for(
"index",
status="reply_failed",
error="Ответ должен содержать от 2 до 5000 символов.",
**redirect_params,
)
)
try:
client = get_client()
client.send_answer(review_id, message)
except FeedbackApiError as exc:
return redirect(url_for("index", status="reply_failed", error=str(exc), **redirect_params))
return redirect(url_for("index", status="reply_sent", count=1, **redirect_params))
@app.route("/login", methods=["GET", "POST"])
def login():
if g.get("user"):
return redirect(url_for("index"))
error_message: Optional[str] = None
if request.method == "POST":
username = (request.form.get("username") or "").strip().lower()
password = request.form.get("password") or ""
user = db.get_user_by_username(username)
if not user or not check_password_hash(user["password_hash"], password):
error_message = "Неверный логин или пароль."
elif not user["is_active"]:
error_message = "Аккаунт ожидает подтверждения администратора."
else:
session.clear()
session["user_id"] = user["id"]
return redirect(request.args.get("next") or url_for("index"))
return render_template("login.html", error_message=error_message)
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("login"))
@app.route("/register", methods=["GET", "POST"])
def register():
if g.get("user"):
return redirect(url_for("index"))
error_message: Optional[str] = None
success_message: Optional[str] = None
if request.method == "POST":
username = (request.form.get("username") or "").strip().lower()
password = request.form.get("password") or ""
confirm = request.form.get("confirm") or ""
if not username or not password:
error_message = "Введите логин и пароль."
elif password != confirm:
error_message = "Пароли не совпадают."
elif username == "ruslan":
error_message = "Нельзя использовать зарезервированный логин."
else:
try:
password_hash = generate_password_hash(password)
db.create_user(username, password_hash)
success_message = "Заявка создана. Дождитесь подтверждения администратора."
except sqlite3.IntegrityError:
error_message = "Такой пользователь уже существует."
return render_template("register.html", error_message=error_message, success_message=success_message)
@app.route("/admin", methods=["GET", "POST"])
@admin_required
def admin_panel():
message: Optional[str] = None
if request.args.get("status") == "updated":
message = "Статус пользователя обновлён."
if request.method == "POST":
user_id = request.form.get("user_id")
action = request.form.get("admin_action")
if user_id and action in {"activate", "deactivate"}:
db.set_user_active(int(user_id), action == "activate")
return redirect(url_for("admin_panel", status="updated"))
users = db.list_users()
return render_template("admin.html", users=users, info_message=message, current_user=g.user)
if __name__ == "__main__":
logger.info(
"App start debug=%s port=%s req_per_sec=%s batch_limit=%s interval_min=%s timezone=%s log_file=%s",
True,
os.getenv("FLASK_PORT", "5000"),
WB_REQUESTS_PER_SECOND,
AUTO_REPLY_BATCH_LIMIT,
AUTO_REPLY_INTERVAL_MINUTES,
APP_TIMEZONE,
APP_LOG_FILE,
)
is_reloader_process = os.environ.get("WERKZEUG_RUN_MAIN") == "true"
if is_reloader_process or not app.debug:
threading.Thread(target=auto_reply_loop, daemon=True).start()
port = int(os.getenv("FLASK_PORT", "5000"))
app.run(host="0.0.0.0", port=port, debug=True)