from flask import Flask, render_template, request, redirect, url_for, session, flash from flask_login import LoginManager, login_user, logout_user, login_required, UserMixin, current_user from werkzeug.security import generate_password_hash, check_password_hash from models import * import smtplib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText import datetime import json from playhouse.shortcuts import model_to_dict from functools import wraps app = Flask(__name__) app.secret_key = 'sk_f098a9f7206d40f89bc2a0dd1d2d9182' # нужен для сессий app.jinja_env.filters['from_json'] = json.loads initialize_db() # Ensure default admin exists (for dev/tests). Controlled via env, with defaults. import os def ensure_default_admin(): username = os.environ.get('ADMIN_USERNAME', 'ruslan') password = os.environ.get('ADMIN_PASSWORD', '1234') email = os.environ.get('ADMIN_EMAIL', 'ruslan@example.com') user = User.get_or_none(User.username == username) if user: changed = False if not user.is_admin: user.is_admin = True changed = True # If password differs, reset to provided one try: # check_password_hash is available from werkzeug if not check_password_hash(user.password_hash, password): user.password_hash = generate_password_hash(password) changed = True except Exception: user.password_hash = generate_password_hash(password) changed = True if changed: user.save() else: User.create( username=username, email=email, full_name='Администратор', password_hash=generate_password_hash(password), is_admin=True ) # Seed admin on startup unless explicitly disabled if os.environ.get('SEED_ADMIN_DISABLED') != '1': ensure_default_admin() # Seed two test users (non-admin) for demos def ensure_test_users(): tests = [ ('test1', 'test1@example.com', 'Пользователь 1'), ('test2', 'test2@example.com', 'Пользователь 2'), ] for username, email, full_name in tests: u = User.get_or_none(User.username == username) if not u: User.create( username=username, email=email, full_name=full_name, password_hash=generate_password_hash(os.environ.get('TEST_USER_PASSWORD', '1234')), is_admin=False, ) if os.environ.get('SEED_TEST_USERS_DISABLED') != '1': ensure_test_users() # Admin-only decorator must be defined before routes that use it def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: return redirect(url_for('login')) if not getattr(current_user, 'is_admin', False): flash('Недостаточно прав', 'danger') return redirect(url_for('dashboard')) return f(*args, **kwargs) return decorated_function @app.route('/admin/users', methods=['GET', 'POST']) @admin_required def manage_users(): if request.method == 'POST': # Create user username = request.form['username'].strip() email = request.form['email'].strip() full_name = request.form.get('full_name', '').strip() password = request.form['password'] is_admin_flag = request.form.get('is_admin') == 'on' if not username or not email or not password: flash('Заполните обязательные поля', 'danger') return redirect(url_for('manage_users')) if User.select().where((User.username == username) | (User.email == email)).exists(): flash('Пользователь с таким логином или email уже существует', 'danger') return redirect(url_for('manage_users')) User.create( username=username, email=email, full_name=full_name or None, password_hash=generate_password_hash(password), is_admin=is_admin_flag, ) flash('Пользователь создан', 'success') return redirect(url_for('manage_users')) users = User.select().order_by(User.id) return render_template('admin/users.html', users=users, title='Пользователи') @app.route('/admin/users//reset_password', methods=['POST']) @admin_required def admin_reset_password(user_id): user = User.get_or_none(User.id == user_id) if not user: flash('Пользователь не найден', 'danger') return redirect(url_for('manage_users')) new_password = request.form.get('new_password') if not new_password: flash('Укажите новый пароль', 'danger') return redirect(url_for('manage_users')) user.password_hash = generate_password_hash(new_password) user.save() flash('Пароль обновлён', 'success') return redirect(url_for('manage_users')) @app.route('/admin/users//toggle_admin', methods=['POST']) @admin_required def admin_toggle_admin(user_id): if current_user.id == user_id: flash('Нельзя менять свои собственные права', 'warning') return redirect(url_for('manage_users')) user = User.get_or_none(User.id == user_id) if not user: flash('Пользователь не найден', 'danger') return redirect(url_for('manage_users')) user.is_admin = not user.is_admin user.save() flash('Права обновлены', 'success') return redirect(url_for('manage_users')) @app.route('/admin/users//delete', methods=['POST']) @admin_required def admin_delete_user(user_id): if current_user.id == user_id: flash('Нельзя удалить самого себя', 'warning') return redirect(url_for('manage_users')) user = User.get_or_none(User.id == user_id) if not user: flash('Пользователь не найден', 'danger') return redirect(url_for('manage_users')) try: user.delete_instance(recursive=True) flash('Пользователь удалён', 'success') except Exception: flash('Не удалось удалить пользователя', 'danger') return redirect(url_for('manage_users')) login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' # Фиксированный админ (можно позже перенести в БД) @login_manager.user_loader def load_user(user_id): return User.get_or_none(User.id == int(user_id)) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] user = User.get_or_none(User.username == username) if user and check_password_hash(user.password_hash, password): login_user(user) print("✅ Авторизация успешна:", current_user.is_authenticated) print("📍 current_user:", current_user.username) print("📍 Сессия:", dict(session)) return redirect(url_for('dashboard' if not user.is_admin else 'manage_surveys')) else: flash("Неверные учетные данные", "danger") flash("Неверные учетные данные", "danger") return render_template('login.html', title="Вход") def send_invite_email(to_email, link, sender_name, survey_name): msg = MIMEMultipart("alternative") msg['Subject'] = f"📨 Приглашение пройти опрос: {survey_name}" msg['From'] = "quiz@4mont.ru" msg['To'] = to_email plain_text = f"""\ Вы получили приглашение на участие в опросе от компании МОНТ. Опрос: {survey_name} Отправитель: {sender_name} Пройдите опрос по ссылке: {link} """ html = f"""\

📨 Вас пригласили пройти опрос от компании МОНТ

Опрос: {survey_name}
Отправитель: {sender_name}

👉 Перейти к опросу


Если вы не ожидали этого письма — просто проигнорируйте его. """ msg.attach(MIMEText(plain_text, "plain")) msg.attach(MIMEText(html, "html")) with smtplib.SMTP("mail.hosting.reg.ru", 587) as server: server.starttls() server.login("quiz@4mont.ru", "utOgbZ09quizpochta") server.send_message(msg) def send_result_email(to_email, from_email, survey_name, scores, unsupported, sender_name): report_link = "https://quiz.4mont.ru/dashboard" # Текстовая версия plain_text = f"""\ Компания МОНТ Отправитель: {sender_name} ({from_email}) Опрос: {survey_name} Результаты: {chr(10).join(f"{p}: {v}%" for p, v in scores.items())} Неподдерживаемые функции: {chr(10).join(f"{p}: {', '.join(u)}" for p, u in unsupported.items() if u) or 'Все функции поддерживаются'} Ссылка на отчёты: {report_link} """ # HTML-версия письма html = f"""\

📩 Опрос от компании МОНТ

Отправитель: {sender_name} ({from_email})
Опрос: {survey_name}

📊 Результаты:

    {''.join(f"
  • {p}: {v}%
  • " for p, v in scores.items())}

🚫 Неподдерживаемые функции:

    {''.join(f"
  • {p}: {', '.join(u)}
  • " for p, u in unsupported.items() if u) or '
  • Все функции поддерживаются
  • '}

🔗 Перейти к отчётам


Это автоматическое уведомление. Пожалуйста, не отвечайте на него. """ msg = MIMEMultipart("alternative") msg['Subject'] = f"📥 Ответ на опрос: {survey_name}" msg['From'] = "quiz@4mont.ru" msg['To'] = to_email msg.attach(MIMEText(plain_text, "plain")) msg.attach(MIMEText(html, "html")) with smtplib.SMTP("mail.hosting.reg.ru", 587) as server: server.starttls() server.login("quiz@4mont.ru", "utOgbZ09quizpochta") server.send_message(msg) @app.route('/invite/', methods=['GET', 'POST']) def handle_invite(uuid_str): invite = SurveyInvite.get_or_none(SurveyInvite.uuid == uuid_str) if not invite: return "Ссылка недействительна", 404 user_survey = invite.survey survey_type = user_survey.survey_type products = Product.select().where(Product.survey_type == survey_type) if request.method == 'GET': questions = {} for product in products: features = Feature.select().where(Feature.product == product) questions[product.name] = {f.name: f.question_text for f in features} return render_template("invite/form.html", invite=invite, survey=user_survey, questions=questions) # POST: обработка результатов full_name = request.form.get('full_name') phone = request.form.get('phone') organization = request.form.get('organization') answers = dict(request.form) for k in ['full_name', 'phone', 'organization']: answers.pop(k, None) selected_keys = list(answers.keys()) # Расчёт результатов platforms = Platform.select().where(Platform.survey_type == survey_type) features = Feature.select().join(Product).where( Feature.name.in_(selected_keys), Product.survey_type == survey_type ) comment = request.form.get('comment') scores = {} unsupported = {} total = len(selected_keys) for platform in platforms: match = 0 unsup = [] for feature in features: pf = PlatformFeature.get_or_none(platform=platform, feature=feature) if pf and pf.supported: match += 1 else: unsup.append(feature.question_text) scores[platform.name] = round((match / total) * 100) if total else 0 unsupported[platform.name] = unsup # Сохраняем результат SurveyResult.create( invite=invite, full_name=full_name, phone=phone, organization=organization, answers=json.dumps(answers), platform_scores=json.dumps(scores), unsupported=json.dumps(unsupported), submitted_at=datetime.datetime.now(), comment=comment # ← добавлено ) invite.responded = True invite.save() send_result_email( to_email=invite.survey.user.email, from_email=invite.recipient_email, survey_name=survey_type.name, scores=scores, unsupported=unsupported, sender_name=invite.survey.user.full_name ) # ✅ если результат не нужно показывать — редирект if not invite.show_result: flash("Спасибо, ваш ответ сохранён!", "success") return redirect("https://www.mont.ru/ru-ru") return render_template("invite/result.html", scores=scores, unsupported_features=unsupported, chart_labels=list(scores.keys()), chart_data=list(scores.values())) @app.route('/send-survey/', methods=['POST']) @login_required def send_survey(survey_id): user_survey = UserSurvey.get_or_none(UserSurvey.id == survey_id, UserSurvey.user == current_user.id) if not user_survey: flash("Опрос не найден", "danger") return redirect(url_for('dashboard')) recipient_email = request.form['email'] show_result = 'show_result' in request.form ask_full_name = 'ask_full_name' in request.form ask_phone = 'ask_phone' in request.form ask_organization = 'ask_organization' in request.form invite = SurveyInvite.create( survey=user_survey, recipient_email=recipient_email, show_result=show_result, ask_full_name=ask_full_name, ask_phone=ask_phone, ask_organization=ask_organization ) link = f"http://quiz.4mont.ru/invite/{invite.uuid}" send_invite_email( to_email=recipient_email, link=link, sender_name=current_user.full_name, survey_name=user_survey.survey_type.name ) flash("Опрос отправлен!", "success") return redirect(url_for('dashboard')) @app.route('/admin/features//delete', methods=['POST']) @login_required def delete_feature(feature_id): feature = Feature.get_or_none(Feature.id == feature_id) if not feature: flash("Вопрос не найден", "danger") return redirect(request.referrer) # Удалим все связанные записи в PlatformFeature PlatformFeature.delete().where(PlatformFeature.feature == feature).execute() feature.delete_instance() flash("Вопрос удалён", "success") return redirect(request.referrer) @app.route('/register', methods=['GET', 'POST']) def register(): if request.method == 'POST': username = request.form['username'] email = request.form['email'] password = request.form['password'] full_name = request.form['full_name'] # Корректная проверка уникальности логина/почты и создание пользователя if User.select().where((User.username == username) | (User.email == email)).exists(): flash("Пользователь с таким логином или e-mail уже существует", "danger") return redirect(url_for('register')) user = User.create( username=username, email=email, full_name=full_name, password_hash=generate_password_hash(password), is_admin=False ) login_user(user) return redirect(url_for('dashboard')) if User.get_or_none(User.username == username or User.email == email): flash("Пользователь с таким логином или почтой уже существует", "danger") return redirect(url_for('register')) user = User.create( username=username, email=email, full_name=full_name, password_hash=generate_password_hash(password), is_admin=False ) login_user(user) return redirect(url_for('dashboard')) return render_template("register.html", title="Регистрация") @app.route('/logout') def logout(): logout_user() return redirect(url_for('choose_survey')) # Защита всех админских маршрутов from functools import wraps @app.route('/dashboard') @login_required def dashboard(): if current_user.is_admin: invites = (SurveyInvite .select() .join(UserSurvey) .order_by(SurveyInvite.sent_at.desc())) else: invites = (SurveyInvite .select() .join(UserSurvey) .where(UserSurvey.user == current_user.id) .order_by(SurveyInvite.sent_at.desc())) results = { r.invite.id: r for r in SurveyResult.select().where(SurveyResult.invite.in_([i.id for i in invites])) } return render_template("user/dashboard.html", invites=invites, results=results) @app.route('/dashboard/invite//delete', methods=['POST']) @login_required def delete_invite(invite_id): invite = SurveyInvite.get_or_none(SurveyInvite.id == invite_id) if invite and invite.survey.user == current_user: # Удалим связанный результат, если есть SurveyResult.delete().where(SurveyResult.invite == invite).execute() invite.delete_instance() flash("Приглашение удалено", "success") else: flash("Удаление невозможно", "danger") return redirect(url_for('dashboard')) def admin_required(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: return redirect(url_for('login')) if not getattr(current_user, 'is_admin', False): flash("Доступ запрещен: требуется администратор", "danger") return redirect(url_for('dashboard')) return f(*args, **kwargs) return decorated_function @app.route('/admin/support', methods=['GET', 'POST']) @admin_required def manage_support(): platforms = list(Platform.select()) features = list(Feature.select()) # Словарь [platform_id][feature_id] = True/False support = {p.id: {f.id: False for f in features} for p in platforms} # Заполнение текущей поддержки for pf in PlatformFeature.select(): support[pf.platform.id][pf.feature.id] = pf.supported if request.method == 'POST': for platform in platforms: for feature in features: key = f"support_{platform.id}_{feature.id}" is_checked = key in request.form pf, created = PlatformFeature.get_or_create( platform=platform, feature=feature, defaults={'supported': is_checked} ) if not created and pf.supported != is_checked: pf.supported = is_checked pf.save() return redirect(url_for('manage_support')) return render_template("admin/support.html", platforms=platforms, features=features, support=support, title="Матрица поддержки") @app.route('/admin/platforms/delete/', methods=['POST']) @admin_required def delete_platform(platform_id): platform = Platform.get_or_none(Platform.id == platform_id) if platform: platform.delete_instance(recursive=True) # также удалит связанные PlatformFeature return redirect(url_for('manage_platforms')) @app.route('/admin/platforms', methods=['GET', 'POST']) @admin_required def manage_platforms(): if request.method == 'POST': name = request.form['name'] Platform.get_or_create(name=name) return redirect(url_for('manage_platforms')) platforms = Platform.select() return render_template('admin/platforms.html', platforms=platforms, title="Платформы") @app.route('/admin/products//features', methods=['GET', 'POST']) @admin_required def manage_features(product_id): product = Product.get_by_id(product_id) if request.method == 'POST': text = request.form['question_text'] Feature.get_or_create(question_text=text, product=product) return redirect(url_for('manage_features', product_id=product_id)) features = Feature.select().where(Feature.product == product) return render_template('admin/features.html', product=product, features=features, title="Вопросы") #Удаление вопроса @app.route('/admin/products//features//delete', methods=['POST']) @admin_required def delete_product_feature(product_id, feature_id): feature = Feature.get_or_none(Feature.id == feature_id) if feature: feature.delete_instance() return redirect(url_for('manage_features', product_id=product_id)) @app.route('/admin/surveys', methods=['GET', 'POST']) @login_required def manage_surveys(): if request.method == 'POST': name = request.form['name'].strip() if not name: flash("Название опроса не может быть пустым", "danger") return redirect(url_for('manage_surveys')) # Создаём или получаем SurveyType survey_type, created = SurveyType.get_or_create(name=name) # Назначаем текущему пользователю (если не назначен ранее) if not UserSurvey.get_or_none(survey_type=survey_type, user=current_user): UserSurvey.create(survey_type=survey_type, user=current_user, name=name) flash("Опрос создан и назначен вам", "success") return redirect(url_for('manage_surveys')) if current_user.is_admin: surveys = SurveyType.select() users = User.select().where(User.is_admin == False) assignments = { s.id: [us.user.id for us in UserSurvey.select().where(UserSurvey.survey_type == s)] for s in surveys } else: surveys = (SurveyType .select() .join(UserSurvey) .where(UserSurvey.user == current_user.id) .distinct()) users = [] assignments = {} return render_template('admin/surveys.html', surveys=surveys, users=users, assignments=assignments, current_user=current_user, title="Опросники") @app.route('/admin/surveys/assign_bulk', methods=['POST']) @admin_required def assign_bulk_surveys(): survey_id = int(request.form['survey_id']) selected_user_ids = request.form.getlist('user_ids') # Удалить все старые назначения UserSurvey.delete().where(UserSurvey.survey_type == survey_id).execute() # Добавить новые for uid in selected_user_ids: UserSurvey.create( survey_type=survey_id, user=User.get_by_id(uid), name=f"Опрос {survey_id} для пользователя {uid}" ) flash("Назначения обновлены", "success") return redirect(url_for('manage_surveys')) @app.route('/admin/surveys/assign', methods=['POST']) @admin_required def assign_survey_to_user(): survey_id = request.form['survey_id'] user_id = request.form['user_id'] # Проверка: если такой связи нет — создать if not UserSurvey.get_or_none(survey_type=survey_id, user=user_id): UserSurvey.create(survey_type=survey_id, user=user_id, name=f"Опрос {survey_id} для пользователя {user_id}") flash("Опрос назначен пользователю", "success") return redirect(url_for('manage_surveys')) @app.route('/admin/surveys//products', methods=['GET', 'POST']) @admin_required def manage_products(survey_id): survey = SurveyType.get_by_id(survey_id) if request.method == 'POST': name = request.form['name'] Product.get_or_create(name=name, survey_type=survey) return redirect(url_for('manage_products', survey_id=survey_id)) products = Product.select().where(Product.survey_type == survey) return render_template('admin/products.html', survey=survey, products=products, title="Категории опроса") @app.route('/admin/surveys//products//delete', methods=['POST']) @admin_required def delete_product(survey_id, product_id): product = Product.get_or_none(Product.id == product_id) if product: product.delete_instance(recursive=True) # удалит и связанные features return redirect(url_for('manage_products', survey_id=survey_id)) @app.route('/quiz') @login_required def choose_survey(): if current_user.is_admin: surveys = SurveyType.select() users = User.select().where(User.is_admin == False) # Словарь: survey_id -> [user_ids] assignments = { s.id: [us.user.id for us in UserSurvey.select().where(UserSurvey.survey_type == s)] for s in surveys } return render_template("choose_survey.html", surveys=surveys, users=users, assignments=assignments, current_user=current_user) else: surveys = (UserSurvey .select() .where(UserSurvey.user == current_user.id) .order_by(UserSurvey.created_at.desc())) return render_template("choose_survey.html", surveys=surveys) @app.route('/quiz/assign', methods=['POST']) @admin_required def assign_user_survey(): user_id = request.form['user_id'] survey_id = request.form['survey_id'] us = UserSurvey.get_or_none(UserSurvey.id == survey_id) if not us: flash("Опрос не найден", "danger") return redirect(url_for('choose_survey')) UserSurvey.create( user=User.get_by_id(user_id), survey_type=us.survey_type, name=f"{us.name} (копия)", ) flash("Опрос назначен пользователю", "success") return redirect(url_for('choose_survey')) @app.route('/quiz/', methods=['GET', 'POST']) def show_quiz(survey_id): survey = SurveyType.get_by_id(survey_id) products = Product.select().where(Product.survey_type == survey) if request.method == 'GET': questions = {} for product in products: features = Feature.select().where(Feature.product == product) questions[product.name] = list(features) return render_template("quiz.html", survey=survey, questions=questions) # POST: обработка результатов selected_feature_ids = [] for key in request.form.keys(): if key.startswith('feature_'): try: fid = int(key.replace('feature_', '')) selected_feature_ids.append(fid) except ValueError: continue platforms = Platform.select().where(Platform.survey_type == survey) features = Feature.select().where(Feature.id.in_(selected_feature_ids)) scores = {p.name: 0 for p in platforms} unsupported = {p.name: [] for p in platforms} total = len(selected_feature_ids) for platform in platforms: for feature in features: pf = PlatformFeature.get_or_none(platform=platform, feature=feature) if pf and pf.supported: scores[platform.name] += 1 else: unsupported[platform.name].append(feature.question_text) for p in scores: scores[p] = (scores[p] / total) * 100 if total else 0 return render_template("result.html", scores=scores, unsupported_features=unsupported, chart_labels=list(scores.keys()), chart_data=list(scores.values())) @app.route('/quiz/assign_bulk', methods=['POST']) @admin_required def assign_user_survey_bulk(): survey_id = int(request.form['survey_id']) user_ids = request.form.getlist('user_ids') # Удалить старые назначения UserSurvey.delete().where(UserSurvey.survey_type == survey_id).execute() # Добавить новые for uid in user_ids: UserSurvey.create( survey_type=survey_id, user=User.get_by_id(uid), name=f"Опрос {survey_id} для пользователя {uid}" ) flash("Назначения обновлены", "success") return redirect(url_for('choose_survey')) @app.route('/admin/surveys/delete/', methods=['POST']) @admin_required def delete_survey(survey_id): survey = SurveyType.get_or_none(SurveyType.id == survey_id) if survey: survey.delete_instance(recursive=True) return redirect(url_for('manage_surveys')) @app.route('/admin/surveys//platforms', methods=['GET', 'POST']) @admin_required def manage_platforms_by_survey(survey_id): survey = SurveyType.get_by_id(survey_id) if request.method == 'POST': name = request.form['name'] Platform.get_or_create(name=name, survey_type=survey) return redirect(url_for('manage_platforms_by_survey', survey_id=survey.id)) platforms = Platform.select().where(Platform.survey_type == survey) return render_template('admin/platforms_by_survey.html', survey=survey, platforms=platforms, title="Платформы опроса") @app.route('/admin/surveys//platforms/delete/', methods=['POST']) @admin_required def delete_platform_by_survey(survey_id, platform_id): platform = Platform.get_or_none(id=platform_id, survey_type=survey_id) if platform: platform.delete_instance(recursive=True) return redirect(url_for('manage_platforms_by_survey', survey_id=survey_id)) @app.route('/admin/surveys//support', methods=['GET', 'POST']) @admin_required def manage_support_by_survey(survey_id): survey = SurveyType.get_by_id(survey_id) platforms = list(Platform.select().where(Platform.survey_type == survey)) products = list(Product.select().where(Product.survey_type == survey)) features = Feature.select().where(Feature.product.in_(products)) support = {p.id: {f.id: False for f in features} for p in platforms} for pf in PlatformFeature.select().where(PlatformFeature.platform.in_(platforms)): support[pf.platform.id][pf.feature.id] = pf.supported if request.method == 'POST': for p in platforms: for f in features: key = f"support_{p.id}_{f.id}" checked = key in request.form pf, created = PlatformFeature.get_or_create( platform=p, feature=f, defaults={'supported': checked} ) if not created and pf.supported != checked: pf.supported = checked pf.save() return redirect(url_for('manage_support_by_survey', survey_id=survey_id)) return render_template("admin/support_by_survey.html", survey=survey, platforms=platforms, features=features, support=support, title="Матрица поддержки") @app.route('/') def home(): return redirect(url_for('choose_survey')) if __name__ == '__main__': app.run(host='0.0.0.0', debug=True)