diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..1f5b97c --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,47 @@ +from flask import Flask +from flask_login import LoginManager +import os +import json + +from .models import initialize_db, User +from .utils.auth import ensure_default_admin, ensure_test_users + +login_manager = LoginManager() + + +def create_app(): + app = Flask(__name__) + app.secret_key = os.environ.get('FLASK_SECRET', 'sk_f098a9f7206d40f89bc2a0dd1d2d9182') + app.jinja_env.filters['from_json'] = json.loads + + # DB init and seeds + initialize_db() + if os.environ.get('SEED_ADMIN_DISABLED') != '1': + ensure_default_admin() + if os.environ.get('SEED_TEST_USERS_DISABLED') != '1': + ensure_test_users() + + # Login manager + login_manager.init_app(app) + login_manager.login_view = 'login' + + @login_manager.user_loader + def load_user(user_id): + try: + return User.get_by_id(int(user_id)) + except Exception: + return None + + # Register blueprints + from .blueprints.auth import bp as auth_bp + from .blueprints.admin_users import bp as admin_users_bp + from .blueprints.core import bp as core_bp + from .blueprints.invite import bp as invite_bp + + app.register_blueprint(auth_bp) + app.register_blueprint(core_bp) + app.register_blueprint(invite_bp) + app.register_blueprint(admin_users_bp) + + return app + diff --git a/app/blueprints/admin_users.py b/app/blueprints/admin_users.py new file mode 100644 index 0000000..6184674 --- /dev/null +++ b/app/blueprints/admin_users.py @@ -0,0 +1,91 @@ +from flask import Blueprint, render_template, request, redirect, url_for, flash +from werkzeug.security import generate_password_hash +from app.models import User +from app.utils.auth import admin_required + +bp = Blueprint('admin_users', __name__) + + +@bp.route('/admin/users', methods=['GET', 'POST'], endpoint='manage_users') +@admin_required +def manage_users(): + if request.method == 'POST': + username = request.form['username'].strip() + email = request.form['email'].strip() + full_name = request.form.get('full_name', '').strip() + password = request.form['password'] + is_admin_flag = request.form.get('is_admin') == 'on' + + if not username, or not email or not password: + flash('Заполните обязательные поля', 'danger') + return redirect(url_for('manage_users')) + if User.select().where((User.username == username) | (User.email == email)).exists(): + flash('Пользователь с таким логином или email уже существует', 'danger') + return redirect(url_for('manage_users')) + + User.create( + username=username, + email=email, + full_name=full_name or None, + password_hash=generate_password_hash(password), + is_admin=is_admin_flag, + ) + flash('Пользователь создан', 'success') + return redirect(url_for('manage_users')) + + users = User.select().order_by(User.id) + return render_template('admin/users.html', users=users, title='Пользователи') + + +@bp.route('/admin/users//reset_password', methods=['POST'], endpoint='admin_reset_password') +@admin_required +def admin_reset_password(user_id): + user = User.get_or_none(User.id == user_id) + if not user: + flash('Пользователь не найден', 'danger') + return redirect(url_for('manage_users')) + new_password = request.form.get('new_password') + if not new_password: + flash('Укажите новый пароль', 'danger') + return redirect(url_for('manage_users')) + user.password_hash = generate_password_hash(new_password) + user.save() + flash('Пароль обновлён', 'success') + return redirect(url_for('manage_users')) + + +@bp.route('/admin/users//toggle_admin', methods=['POST'], endpoint='admin_toggle_admin') +@admin_required +def admin_toggle_admin(user_id): + from flask_login import current_user + if current_user.id == user_id: + flash('Нельзя менять свои собственные права', 'warning') + return redirect(url_for('manage_users')) + user = User.get_or_none(User.id == user_id) + if not user: + flash('Пользователь не найден', 'danger') + return redirect(url_for('manage_users')) + user.is_admin = not user.is_admin + user.save() + flash('Права обновлены', 'success') + return redirect(url_for('manage_users')) + + +@bp.route('/admin/users//delete', methods=['POST'], endpoint='admin_delete_user') +@admin_required +def admin_delete_user(user_id): + from flask_login import current_user + if current_user.id == user_id: + flash('Нельзя удалить самого себя', 'warning') + return redirect(url_for('manage_users')) + user = User.get_or_none(User.id == user_id) + if not user: + flash('Пользователь не найден', 'danger') + return redirect(url_for('manage_users')) + try: + user.delete_instance(recursive=True) + flash('Пользователь удалён', 'success') + except Exception: + flash('Не удалось удалить пользователя', 'danger') + return redirect(url_for('manage_users')) + diff --git a/app/blueprints/auth.py b/app/blueprints/auth.py new file mode 100644 index 0000000..606c069 --- /dev/null +++ b/app/blueprints/auth.py @@ -0,0 +1,29 @@ +from flask import Blueprint, render_template, request, redirect, url_for, flash +from flask_login import login_user, logout_user +from werkzeug.security import check_password_hash + +from app.models import User + +bp = Blueprint('auth', __name__) + + +@bp.route('/login', methods=['GET', 'POST'], endpoint='login') +def login(): + if request.method == 'POST': + username = request.form['username'] + password = request.form['password'] + user = User.get_or_none(User.username == username) + + if user and check_password_hash(user.password_hash, password): + login_user(user) + return redirect(url_for('dashboard' if not user.is_admin else 'manage_surveys')) + else: + flash('Неверный логин или пароль', 'danger') + return render_template('login.html', title='Вход') + + +@bp.route('/logout', endpoint='logout') +def logout(): + logout_user() + return redirect(url_for('choose_survey')) + diff --git a/app/blueprints/core.py b/app/blueprints/core.py new file mode 100644 index 0000000..0ec3b55 --- /dev/null +++ b/app/blueprints/core.py @@ -0,0 +1,54 @@ +from flask import Blueprint, render_template, redirect, url_for +from flask_login import login_required, current_user +from app.models import SurveyInvite, UserSurvey, SurveyType, User + +bp = Blueprint('core', __name__) + + +@bp.route('/', endpoint='home') +def home(): + return redirect(url_for('choose_survey')) + + +@bp.route('/dashboard', endpoint='dashboard') +@login_required +def dashboard(): + if current_user.is_admin: + invites = (SurveyInvite + .select() + .join(UserSurvey) + .order_by(SurveyInvite.sent_at.desc())) + else: + invites = (SurveyInvite + .select() + .join(UserSurvey) + .where(UserSurvey.user == current_user.id) + .order_by(SurveyInvite.sent_at.desc())) + return render_template('user/dashboard.html', invites=invites, title='Отчёты') + + +@bp.route('/quiz', endpoint='choose_survey') +@login_required +def choose_survey(): + if current_user.is_admin: + surveys = SurveyType.select() + users = User.select().where(User.is_admin == False) + assignments = { + s.id: [us.user.id for us in UserSurvey.select().where(UserSurvey.survey_type == s)] + for s in surveys + } + else: + surveys = (SurveyType + .select() + .join(UserSurvey) + .where(UserSurvey.user == current_user.id) + .distinct()) + users = [] + assignments = {} + + return render_template('choose_survey.html', + surveys=surveys, + users=users, + assignments=assignments, + title='Опросы') + diff --git a/app/blueprints/invite.py b/app/blueprints/invite.py new file mode 100644 index 0000000..646b38c --- /dev/null +++ b/app/blueprints/invite.py @@ -0,0 +1,90 @@ +from flask import Blueprint, render_template, request, redirect, url_for, flash +import datetime +import json +from app.models import SurveyInvite, Product, Feature, Platform, PlatformFeature, UserSurvey, SurveyType +from app.utils.mail import send_result_email + +bp = Blueprint('invite', __name__) + + +@bp.route('/invite/', methods=['GET', 'POST'], endpoint='handle_invite') +def handle_invite(uuid_str): + invite = SurveyInvite.get_or_none(SurveyInvite.uuid == uuid_str) + if not invite: + return "Ссылка недействительна", 404 + + user_survey = invite.survey + survey_type = user_survey.survey_type + products = Product.select().where(Product.survey_type == survey_type) + + if request.method == 'GET': + questions = {} + for product in products: + features = Feature.select().where(Feature.product == product) + questions[product.name] = {f.name: f.question_text for f in features} + return render_template("invite/form.html", invite=invite, survey=user_survey, questions=questions) + + full_name = request.form.get('full_name') + phone = request.form.get('phone') + organization = request.form.get('organization') + answers = dict(request.form) + for k in ['full_name', 'phone', 'organization']: + answers.pop(k, None) + selected_keys = list(answers.keys()) + + platforms = Platform.select().where(Platform.survey_type == survey_type) + features = Feature.select().join(Product).where( + Feature.name.in_(selected_keys), + Product.survey_type == survey_type + ) + comment = request.form.get('comment') + scores = {} + unsupported = {} + total = len(selected_keys) + + for platform in platforms: + match = 0 + unsup = [] + for feature in features: + pf = PlatformFeature.get_or_none(platform=platform, feature=feature) + if pf and pf.supported: + match += 1 + else: + unsup.append(feature.question_text) + scores[platform.name] = round((match / total) * 100) if total else 0 + unsupported[platform.name] = unsup + + SurveyResult = __import__('app.models', fromlist=['SurveyResult']).SurveyResult + SurveyResult.create( + invite=invite, + full_name=full_name, + phone=phone, + organization=organization, + answers=json.dumps(answers), + platform_scores=json.dumps(scores), + unsupported=json.dumps(unsupported), + submitted_at=datetime.datetime.now(), + comment=comment + ) + + invite.responded = True + invite.save() + + send_result_email( + to_email=invite.survey.user.email, + from_email=invite.recipient_email, + survey_name=survey_type.name, + scores=scores, + unsupported=unsupported, + sender_name=invite.survey.user.full_name + ) + + if not invite.show_result: + flash("Спасибо! Ваши ответы отправлены.", "success") + return redirect("https://www.mont.ru/ru-ru") + + return render_template("invite/result.html", + scores=scores, + unsupported_features=unsupported, + chart_labels=list(scores.keys()), + chart_data=list(scores.values())) diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..3a5c663 --- /dev/null +++ b/app/models.py @@ -0,0 +1,117 @@ +from peewee import * +from flask_login import UserMixin +import os +import datetime +import uuid + +USE_SQLITE = os.environ.get("USE_SQLITE") == "1" + +if USE_SQLITE: + db = SqliteDatabase(os.environ.get("SQLITE_PATH", "survey.db")) +else: + db = PostgresqlDatabase( + os.environ.get("POSTGRES_DB", "survey"), + user=os.environ.get("POSTGRES_USER", "servey"), + password=os.environ.get("POSTGRES_PASSWORD", "utOgbZ09servey"), + host=os.environ.get("POSTGRES_HOST", "db"), + port=int(os.environ.get("POSTGRES_PORT", 5432)) + ) + + +class BaseModel(Model): + class Meta: + database = db + + +class User(BaseModel, UserMixin): + username = CharField(unique=True) + email = CharField(unique=True) + password_hash = CharField() + full_name = CharField(null=True) + is_admin = BooleanField(default=False) + + def get_id(self): + return str(self.id) + + @property + def is_active(self): + return True + + +class SurveyType(BaseModel): + name = CharField(unique=True) + + +class Product(BaseModel): + name = CharField() + survey_type = ForeignKeyField(SurveyType, backref='products', on_delete='CASCADE') + + +class Feature(BaseModel): + question_text = TextField() + product = ForeignKeyField(Product, backref='features', on_delete='CASCADE') + + +class Platform(BaseModel): + name = CharField(unique=True) + survey_type = ForeignKeyField(SurveyType, backref='platforms', on_delete='CASCADE') + + +class PlatformFeature(BaseModel): + platform = ForeignKeyField(Platform, backref='features', on_delete='CASCADE') + feature = ForeignKeyField(Feature, backref='platforms', on_delete='CASCADE') + supported = BooleanField(default=True) + + +class UserSurvey(BaseModel): + user = ForeignKeyField(User, backref='surveys', on_delete='CASCADE') + survey_type = ForeignKeyField(SurveyType, on_delete='CASCADE') + name = CharField() + created_at = DateTimeField(default=datetime.datetime.now) + + +class SurveyInvite(BaseModel): + survey = ForeignKeyField(UserSurvey, backref='invites', on_delete='CASCADE') + recipient_email = CharField() + uuid = UUIDField(default=uuid.uuid4, unique=True) + show_result = BooleanField(default=False) + ask_full_name = BooleanField(default=False) + ask_phone = BooleanField(default=False) + ask_organization = BooleanField(default=False) + sent_at = DateTimeField(default=datetime.datetime.now) + responded = BooleanField(default=False) + + +class SurveyResult(BaseModel): + invite = ForeignKeyField(SurveyInvite, backref='result') + full_name = CharField(null=True) + phone = CharField(null=True) + organization = CharField(null=True) + answers = TextField(null=True) + platform_scores = TextField(null=True) + unsupported = TextField(null=True) + submitted_at = DateTimeField(default=datetime.datetime.now, null=False) + comment = TextField(null=True) + + +def initialize_db(): + try: + db.connect() + except Exception: + if not USE_SQLITE: + sqlite_db = SqliteDatabase(os.environ.get("SQLITE_PATH", "survey.db")) + sqlite_db.bind([ + User, SurveyType, Product, Feature, Platform, PlatformFeature, + UserSurvey, SurveyInvite, SurveyResult + ], bind_refs=False, bind_backrefs=False) + globals()['db'] = sqlite_db + sqlite_db.connect() + else: + raise + + db.create_tables([ + User, SurveyType, Product, Feature, Platform, PlatformFeature, + UserSurvey, SurveyInvite, SurveyResult + ], safe=True) + db.close() + diff --git a/app/utils/auth.py b/app/utils/auth.py new file mode 100644 index 0000000..f6c8732 --- /dev/null +++ b/app/utils/auth.py @@ -0,0 +1,67 @@ +from flask import redirect, url_for, flash +from flask_login import current_user +from functools import wraps +from werkzeug.security import generate_password_hash, check_password_hash +import os + +from app.models import User + + +def admin_required(f): + @wraps(f) + def decorated(*args, **kwargs): + if not current_user.is_authenticated: + return redirect(url_for('login')) + if not getattr(current_user, 'is_admin', False): + flash('Недостаточно прав', 'danger') + return redirect(url_for('dashboard')) + return f(*args, **kwargs) + return decorated + + +def ensure_default_admin(): + username = os.environ.get('ADMIN_USERNAME', 'ruslan') + password = os.environ.get('ADMIN_PASSWORD', '1234') + email = os.environ.get('ADMIN_EMAIL', 'ruslan@example.com') + + user = User.get_or_none(User.username == username) + if user: + changed = False + if not user.is_admin: + user.is_admin = True + changed = True + try: + if not check_password_hash(user.password_hash, password): + user.password_hash = generate_password_hash(password) + changed = True + except Exception: + user.password_hash = generate_password_hash(password) + changed = True + if changed: + user.save() + else: + User.create( + username=username, + email=email, + full_name='Администратор', + password_hash=generate_password_hash(password), + is_admin=True + ) + + +def ensure_test_users(): + tests = [ + ('test1', 'test1@example.com', 'Пользователь 1'), + ('test2', 'test2@example.com', 'Пользователь 2'), + ] + for username, email, full_name in tests: + u = User.get_or_none(User.username == username) + if not u: + User.create( + username=username, + email=email, + full_name=full_name, + password_hash=generate_password_hash(os.environ.get('TEST_USER_PASSWORD', '1234')), + is_admin=False, + ) + diff --git a/app/utils/mail.py b/app/utils/mail.py new file mode 100644 index 0000000..f5c8763 --- /dev/null +++ b/app/utils/mail.py @@ -0,0 +1,64 @@ +import smtplib +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + + +def send_invite_email(to_email, link, sender_name, survey_name): + msg = MIMEMultipart("alternative") + msg['Subject'] = f"Приглашение пройти опрос: {survey_name}" + msg['From'] = "quiz@4mont.ru" + msg['To'] = to_email + + plain_text = f"Опрос: {survey_name}\nОтправитель: {sender_name}\n\nСсылка: {link}\n" + html = f""" + + +

Приглашение пройти опрос

+

Опрос: {survey_name}
+ Отправитель: {sender_name}

+

Перейти к опросу

+ + + """ + + msg.attach(MIMEText(plain_text, "plain")) + msg.attach(MIMEText(html, "html")) + + with smtplib.SMTP("mail.hosting.reg.ru", 587) as server: + server.starttls() + server.login("quiz@4mont.ru", "utOgbZ09quizpochta") + server.send_message(msg) + + +def send_result_email(to_email, from_email, survey_name, scores, unsupported, sender_name): + report_link = "https://quiz.4mont.ru/dashboard" + plain_text = ( + f"Результаты опроса\nОтправитель: {sender_name} ({from_email})\n" + f"Опрос: {survey_name}\n\n" + + "\n".join(f"{p}: {v}%" for p, v in scores.items()) + + "\n\nПодробности: " + report_link + ) + html = f""" + + +

Результаты опроса: {survey_name}

+
    + {''.join(f'
  • {p}: {v}%
  • ' for p, v in scores.items())} +
+

Открыть отчеты

+ + + """ + + msg = MIMEMultipart("alternative") + msg['Subject'] = f"Результаты опроса: {survey_name}" + msg['From'] = "quiz@4mont.ru" + msg['To'] = to_email + msg.attach(MIMEText(plain_text, "plain")) + msg.attach(MIMEText(html, "html")) + + with smtplib.SMTP("mail.hosting.reg.ru", 587) as server: + server.starttls() + server.login("quiz@4mont.ru", "utOgbZ09quizpochta") + server.send_message(msg) + diff --git a/wsgi.py b/wsgi.py new file mode 100644 index 0000000..ae4d675 --- /dev/null +++ b/wsgi.py @@ -0,0 +1,7 @@ +from app import create_app + +app = create_app() + +if __name__ == '__main__': + app.run(host='0.0.0.0', debug=True) +