refactor: introduce app factory + blueprints (auth, core, admin users, invite); add utils and wsgi entrypoint; keep legacy routes for now
This commit is contained in:
47
app/__init__.py
Normal file
47
app/__init__.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from flask import Flask
|
||||
from flask_login import LoginManager
|
||||
import os
|
||||
import json
|
||||
|
||||
from .models import initialize_db, User
|
||||
from .utils.auth import ensure_default_admin, ensure_test_users
|
||||
|
||||
login_manager = LoginManager()
|
||||
|
||||
|
||||
def create_app():
|
||||
app = Flask(__name__)
|
||||
app.secret_key = os.environ.get('FLASK_SECRET', 'sk_f098a9f7206d40f89bc2a0dd1d2d9182')
|
||||
app.jinja_env.filters['from_json'] = json.loads
|
||||
|
||||
# DB init and seeds
|
||||
initialize_db()
|
||||
if os.environ.get('SEED_ADMIN_DISABLED') != '1':
|
||||
ensure_default_admin()
|
||||
if os.environ.get('SEED_TEST_USERS_DISABLED') != '1':
|
||||
ensure_test_users()
|
||||
|
||||
# Login manager
|
||||
login_manager.init_app(app)
|
||||
login_manager.login_view = 'login'
|
||||
|
||||
@login_manager.user_loader
|
||||
def load_user(user_id):
|
||||
try:
|
||||
return User.get_by_id(int(user_id))
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
# Register blueprints
|
||||
from .blueprints.auth import bp as auth_bp
|
||||
from .blueprints.admin_users import bp as admin_users_bp
|
||||
from .blueprints.core import bp as core_bp
|
||||
from .blueprints.invite import bp as invite_bp
|
||||
|
||||
app.register_blueprint(auth_bp)
|
||||
app.register_blueprint(core_bp)
|
||||
app.register_blueprint(invite_bp)
|
||||
app.register_blueprint(admin_users_bp)
|
||||
|
||||
return app
|
||||
|
||||
91
app/blueprints/admin_users.py
Normal file
91
app/blueprints/admin_users.py
Normal file
@@ -0,0 +1,91 @@
|
||||
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
||||
from werkzeug.security import generate_password_hash
|
||||
from app.models import User
|
||||
from app.utils.auth import admin_required
|
||||
|
||||
bp = Blueprint('admin_users', __name__)
|
||||
|
||||
|
||||
@bp.route('/admin/users', methods=['GET', 'POST'], endpoint='manage_users')
|
||||
@admin_required
|
||||
def manage_users():
|
||||
if request.method == 'POST':
|
||||
username = request.form['username'].strip()
|
||||
email = request.form['email'].strip()
|
||||
full_name = request.form.get('full_name', '').strip()
|
||||
password = request.form['password']
|
||||
is_admin_flag = request.form.get('is_admin') == 'on'
|
||||
|
||||
if not username, or not email or not password:
|
||||
flash('Заполните обязательные поля', 'danger')
|
||||
return redirect(url_for('manage_users'))
|
||||
if User.select().where((User.username == username) | (User.email == email)).exists():
|
||||
flash('Пользователь с таким логином или email уже существует', 'danger')
|
||||
return redirect(url_for('manage_users'))
|
||||
|
||||
User.create(
|
||||
username=username,
|
||||
email=email,
|
||||
full_name=full_name or None,
|
||||
password_hash=generate_password_hash(password),
|
||||
is_admin=is_admin_flag,
|
||||
)
|
||||
flash('Пользователь создан', 'success')
|
||||
return redirect(url_for('manage_users'))
|
||||
|
||||
users = User.select().order_by(User.id)
|
||||
return render_template('admin/users.html', users=users, title='Пользователи')
|
||||
|
||||
|
||||
@bp.route('/admin/users/<int:user_id>/reset_password', methods=['POST'], endpoint='admin_reset_password')
|
||||
@admin_required
|
||||
def admin_reset_password(user_id):
|
||||
user = User.get_or_none(User.id == user_id)
|
||||
if not user:
|
||||
flash('Пользователь не найден', 'danger')
|
||||
return redirect(url_for('manage_users'))
|
||||
new_password = request.form.get('new_password')
|
||||
if not new_password:
|
||||
flash('Укажите новый пароль', 'danger')
|
||||
return redirect(url_for('manage_users'))
|
||||
user.password_hash = generate_password_hash(new_password)
|
||||
user.save()
|
||||
flash('Пароль обновлён', 'success')
|
||||
return redirect(url_for('manage_users'))
|
||||
|
||||
|
||||
@bp.route('/admin/users/<int:user_id>/toggle_admin', methods=['POST'], endpoint='admin_toggle_admin')
|
||||
@admin_required
|
||||
def admin_toggle_admin(user_id):
|
||||
from flask_login import current_user
|
||||
if current_user.id == user_id:
|
||||
flash('Нельзя менять свои собственные права', 'warning')
|
||||
return redirect(url_for('manage_users'))
|
||||
user = User.get_or_none(User.id == user_id)
|
||||
if not user:
|
||||
flash('Пользователь не найден', 'danger')
|
||||
return redirect(url_for('manage_users'))
|
||||
user.is_admin = not user.is_admin
|
||||
user.save()
|
||||
flash('Права обновлены', 'success')
|
||||
return redirect(url_for('manage_users'))
|
||||
|
||||
|
||||
@bp.route('/admin/users/<int:user_id>/delete', methods=['POST'], endpoint='admin_delete_user')
|
||||
@admin_required
|
||||
def admin_delete_user(user_id):
|
||||
from flask_login import current_user
|
||||
if current_user.id == user_id:
|
||||
flash('Нельзя удалить самого себя', 'warning')
|
||||
return redirect(url_for('manage_users'))
|
||||
user = User.get_or_none(User.id == user_id)
|
||||
if not user:
|
||||
flash('Пользователь не найден', 'danger')
|
||||
return redirect(url_for('manage_users'))
|
||||
try:
|
||||
user.delete_instance(recursive=True)
|
||||
flash('Пользователь удалён', 'success')
|
||||
except Exception:
|
||||
flash('Не удалось удалить пользователя', 'danger')
|
||||
return redirect(url_for('manage_users'))
|
||||
|
||||
29
app/blueprints/auth.py
Normal file
29
app/blueprints/auth.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
||||
from flask_login import login_user, logout_user
|
||||
from werkzeug.security import check_password_hash
|
||||
|
||||
from app.models import User
|
||||
|
||||
bp = Blueprint('auth', __name__)
|
||||
|
||||
|
||||
@bp.route('/login', methods=['GET', 'POST'], endpoint='login')
|
||||
def login():
|
||||
if request.method == 'POST':
|
||||
username = request.form['username']
|
||||
password = request.form['password']
|
||||
user = User.get_or_none(User.username == username)
|
||||
|
||||
if user and check_password_hash(user.password_hash, password):
|
||||
login_user(user)
|
||||
return redirect(url_for('dashboard' if not user.is_admin else 'manage_surveys'))
|
||||
else:
|
||||
flash('Неверный логин или пароль', 'danger')
|
||||
return render_template('login.html', title='Вход')
|
||||
|
||||
|
||||
@bp.route('/logout', endpoint='logout')
|
||||
def logout():
|
||||
logout_user()
|
||||
return redirect(url_for('choose_survey'))
|
||||
|
||||
54
app/blueprints/core.py
Normal file
54
app/blueprints/core.py
Normal file
@@ -0,0 +1,54 @@
|
||||
from flask import Blueprint, render_template, redirect, url_for
|
||||
from flask_login import login_required, current_user
|
||||
from app.models import SurveyInvite, UserSurvey, SurveyType, User
|
||||
|
||||
bp = Blueprint('core', __name__)
|
||||
|
||||
|
||||
@bp.route('/', endpoint='home')
|
||||
def home():
|
||||
return redirect(url_for('choose_survey'))
|
||||
|
||||
|
||||
@bp.route('/dashboard', endpoint='dashboard')
|
||||
@login_required
|
||||
def dashboard():
|
||||
if current_user.is_admin:
|
||||
invites = (SurveyInvite
|
||||
.select()
|
||||
.join(UserSurvey)
|
||||
.order_by(SurveyInvite.sent_at.desc()))
|
||||
else:
|
||||
invites = (SurveyInvite
|
||||
.select()
|
||||
.join(UserSurvey)
|
||||
.where(UserSurvey.user == current_user.id)
|
||||
.order_by(SurveyInvite.sent_at.desc()))
|
||||
return render_template('user/dashboard.html', invites=invites, title='Отчёты')
|
||||
|
||||
|
||||
@bp.route('/quiz', endpoint='choose_survey')
|
||||
@login_required
|
||||
def choose_survey():
|
||||
if current_user.is_admin:
|
||||
surveys = SurveyType.select()
|
||||
users = User.select().where(User.is_admin == False)
|
||||
assignments = {
|
||||
s.id: [us.user.id for us in UserSurvey.select().where(UserSurvey.survey_type == s)]
|
||||
for s in surveys
|
||||
}
|
||||
else:
|
||||
surveys = (SurveyType
|
||||
.select()
|
||||
.join(UserSurvey)
|
||||
.where(UserSurvey.user == current_user.id)
|
||||
.distinct())
|
||||
users = []
|
||||
assignments = {}
|
||||
|
||||
return render_template('choose_survey.html',
|
||||
surveys=surveys,
|
||||
users=users,
|
||||
assignments=assignments,
|
||||
title='Опросы')
|
||||
|
||||
90
app/blueprints/invite.py
Normal file
90
app/blueprints/invite.py
Normal file
@@ -0,0 +1,90 @@
|
||||
from flask import Blueprint, render_template, request, redirect, url_for, flash
|
||||
import datetime
|
||||
import json
|
||||
from app.models import SurveyInvite, Product, Feature, Platform, PlatformFeature, UserSurvey, SurveyType
|
||||
from app.utils.mail import send_result_email
|
||||
|
||||
bp = Blueprint('invite', __name__)
|
||||
|
||||
|
||||
@bp.route('/invite/<uuid_str>', methods=['GET', 'POST'], endpoint='handle_invite')
|
||||
def handle_invite(uuid_str):
|
||||
invite = SurveyInvite.get_or_none(SurveyInvite.uuid == uuid_str)
|
||||
if not invite:
|
||||
return "Ссылка недействительна", 404
|
||||
|
||||
user_survey = invite.survey
|
||||
survey_type = user_survey.survey_type
|
||||
products = Product.select().where(Product.survey_type == survey_type)
|
||||
|
||||
if request.method == 'GET':
|
||||
questions = {}
|
||||
for product in products:
|
||||
features = Feature.select().where(Feature.product == product)
|
||||
questions[product.name] = {f.name: f.question_text for f in features}
|
||||
return render_template("invite/form.html", invite=invite, survey=user_survey, questions=questions)
|
||||
|
||||
full_name = request.form.get('full_name')
|
||||
phone = request.form.get('phone')
|
||||
organization = request.form.get('organization')
|
||||
answers = dict(request.form)
|
||||
for k in ['full_name', 'phone', 'organization']:
|
||||
answers.pop(k, None)
|
||||
selected_keys = list(answers.keys())
|
||||
|
||||
platforms = Platform.select().where(Platform.survey_type == survey_type)
|
||||
features = Feature.select().join(Product).where(
|
||||
Feature.name.in_(selected_keys),
|
||||
Product.survey_type == survey_type
|
||||
)
|
||||
comment = request.form.get('comment')
|
||||
scores = {}
|
||||
unsupported = {}
|
||||
total = len(selected_keys)
|
||||
|
||||
for platform in platforms:
|
||||
match = 0
|
||||
unsup = []
|
||||
for feature in features:
|
||||
pf = PlatformFeature.get_or_none(platform=platform, feature=feature)
|
||||
if pf and pf.supported:
|
||||
match += 1
|
||||
else:
|
||||
unsup.append(feature.question_text)
|
||||
scores[platform.name] = round((match / total) * 100) if total else 0
|
||||
unsupported[platform.name] = unsup
|
||||
|
||||
SurveyResult = __import__('app.models', fromlist=['SurveyResult']).SurveyResult
|
||||
SurveyResult.create(
|
||||
invite=invite,
|
||||
full_name=full_name,
|
||||
phone=phone,
|
||||
organization=organization,
|
||||
answers=json.dumps(answers),
|
||||
platform_scores=json.dumps(scores),
|
||||
unsupported=json.dumps(unsupported),
|
||||
submitted_at=datetime.datetime.now(),
|
||||
comment=comment
|
||||
)
|
||||
|
||||
invite.responded = True
|
||||
invite.save()
|
||||
|
||||
send_result_email(
|
||||
to_email=invite.survey.user.email,
|
||||
from_email=invite.recipient_email,
|
||||
survey_name=survey_type.name,
|
||||
scores=scores,
|
||||
unsupported=unsupported,
|
||||
sender_name=invite.survey.user.full_name
|
||||
)
|
||||
|
||||
if not invite.show_result:
|
||||
flash("Спасибо! Ваши ответы отправлены.", "success")
|
||||
return redirect("https://www.mont.ru/ru-ru")
|
||||
|
||||
return render_template("invite/result.html",
|
||||
scores=scores,
|
||||
unsupported_features=unsupported,
|
||||
chart_labels=list(scores.keys()),
|
||||
chart_data=list(scores.values()))
|
||||
117
app/models.py
Normal file
117
app/models.py
Normal file
@@ -0,0 +1,117 @@
|
||||
from peewee import *
|
||||
from flask_login import UserMixin
|
||||
import os
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
USE_SQLITE = os.environ.get("USE_SQLITE") == "1"
|
||||
|
||||
if USE_SQLITE:
|
||||
db = SqliteDatabase(os.environ.get("SQLITE_PATH", "survey.db"))
|
||||
else:
|
||||
db = PostgresqlDatabase(
|
||||
os.environ.get("POSTGRES_DB", "survey"),
|
||||
user=os.environ.get("POSTGRES_USER", "servey"),
|
||||
password=os.environ.get("POSTGRES_PASSWORD", "utOgbZ09servey"),
|
||||
host=os.environ.get("POSTGRES_HOST", "db"),
|
||||
port=int(os.environ.get("POSTGRES_PORT", 5432))
|
||||
)
|
||||
|
||||
|
||||
class BaseModel(Model):
|
||||
class Meta:
|
||||
database = db
|
||||
|
||||
|
||||
class User(BaseModel, UserMixin):
|
||||
username = CharField(unique=True)
|
||||
email = CharField(unique=True)
|
||||
password_hash = CharField()
|
||||
full_name = CharField(null=True)
|
||||
is_admin = BooleanField(default=False)
|
||||
|
||||
def get_id(self):
|
||||
return str(self.id)
|
||||
|
||||
@property
|
||||
def is_active(self):
|
||||
return True
|
||||
|
||||
|
||||
class SurveyType(BaseModel):
|
||||
name = CharField(unique=True)
|
||||
|
||||
|
||||
class Product(BaseModel):
|
||||
name = CharField()
|
||||
survey_type = ForeignKeyField(SurveyType, backref='products', on_delete='CASCADE')
|
||||
|
||||
|
||||
class Feature(BaseModel):
|
||||
question_text = TextField()
|
||||
product = ForeignKeyField(Product, backref='features', on_delete='CASCADE')
|
||||
|
||||
|
||||
class Platform(BaseModel):
|
||||
name = CharField(unique=True)
|
||||
survey_type = ForeignKeyField(SurveyType, backref='platforms', on_delete='CASCADE')
|
||||
|
||||
|
||||
class PlatformFeature(BaseModel):
|
||||
platform = ForeignKeyField(Platform, backref='features', on_delete='CASCADE')
|
||||
feature = ForeignKeyField(Feature, backref='platforms', on_delete='CASCADE')
|
||||
supported = BooleanField(default=True)
|
||||
|
||||
|
||||
class UserSurvey(BaseModel):
|
||||
user = ForeignKeyField(User, backref='surveys', on_delete='CASCADE')
|
||||
survey_type = ForeignKeyField(SurveyType, on_delete='CASCADE')
|
||||
name = CharField()
|
||||
created_at = DateTimeField(default=datetime.datetime.now)
|
||||
|
||||
|
||||
class SurveyInvite(BaseModel):
|
||||
survey = ForeignKeyField(UserSurvey, backref='invites', on_delete='CASCADE')
|
||||
recipient_email = CharField()
|
||||
uuid = UUIDField(default=uuid.uuid4, unique=True)
|
||||
show_result = BooleanField(default=False)
|
||||
ask_full_name = BooleanField(default=False)
|
||||
ask_phone = BooleanField(default=False)
|
||||
ask_organization = BooleanField(default=False)
|
||||
sent_at = DateTimeField(default=datetime.datetime.now)
|
||||
responded = BooleanField(default=False)
|
||||
|
||||
|
||||
class SurveyResult(BaseModel):
|
||||
invite = ForeignKeyField(SurveyInvite, backref='result')
|
||||
full_name = CharField(null=True)
|
||||
phone = CharField(null=True)
|
||||
organization = CharField(null=True)
|
||||
answers = TextField(null=True)
|
||||
platform_scores = TextField(null=True)
|
||||
unsupported = TextField(null=True)
|
||||
submitted_at = DateTimeField(default=datetime.datetime.now, null=False)
|
||||
comment = TextField(null=True)
|
||||
|
||||
|
||||
def initialize_db():
|
||||
try:
|
||||
db.connect()
|
||||
except Exception:
|
||||
if not USE_SQLITE:
|
||||
sqlite_db = SqliteDatabase(os.environ.get("SQLITE_PATH", "survey.db"))
|
||||
sqlite_db.bind([
|
||||
User, SurveyType, Product, Feature, Platform, PlatformFeature,
|
||||
UserSurvey, SurveyInvite, SurveyResult
|
||||
], bind_refs=False, bind_backrefs=False)
|
||||
globals()['db'] = sqlite_db
|
||||
sqlite_db.connect()
|
||||
else:
|
||||
raise
|
||||
|
||||
db.create_tables([
|
||||
User, SurveyType, Product, Feature, Platform, PlatformFeature,
|
||||
UserSurvey, SurveyInvite, SurveyResult
|
||||
], safe=True)
|
||||
db.close()
|
||||
|
||||
67
app/utils/auth.py
Normal file
67
app/utils/auth.py
Normal file
@@ -0,0 +1,67 @@
|
||||
from flask import redirect, url_for, flash
|
||||
from flask_login import current_user
|
||||
from functools import wraps
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
import os
|
||||
|
||||
from app.models import User
|
||||
|
||||
|
||||
def admin_required(f):
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
if not current_user.is_authenticated:
|
||||
return redirect(url_for('login'))
|
||||
if not getattr(current_user, 'is_admin', False):
|
||||
flash('Недостаточно прав', 'danger')
|
||||
return redirect(url_for('dashboard'))
|
||||
return f(*args, **kwargs)
|
||||
return decorated
|
||||
|
||||
|
||||
def ensure_default_admin():
|
||||
username = os.environ.get('ADMIN_USERNAME', 'ruslan')
|
||||
password = os.environ.get('ADMIN_PASSWORD', '1234')
|
||||
email = os.environ.get('ADMIN_EMAIL', 'ruslan@example.com')
|
||||
|
||||
user = User.get_or_none(User.username == username)
|
||||
if user:
|
||||
changed = False
|
||||
if not user.is_admin:
|
||||
user.is_admin = True
|
||||
changed = True
|
||||
try:
|
||||
if not check_password_hash(user.password_hash, password):
|
||||
user.password_hash = generate_password_hash(password)
|
||||
changed = True
|
||||
except Exception:
|
||||
user.password_hash = generate_password_hash(password)
|
||||
changed = True
|
||||
if changed:
|
||||
user.save()
|
||||
else:
|
||||
User.create(
|
||||
username=username,
|
||||
email=email,
|
||||
full_name='Администратор',
|
||||
password_hash=generate_password_hash(password),
|
||||
is_admin=True
|
||||
)
|
||||
|
||||
|
||||
def ensure_test_users():
|
||||
tests = [
|
||||
('test1', 'test1@example.com', 'Пользователь 1'),
|
||||
('test2', 'test2@example.com', 'Пользователь 2'),
|
||||
]
|
||||
for username, email, full_name in tests:
|
||||
u = User.get_or_none(User.username == username)
|
||||
if not u:
|
||||
User.create(
|
||||
username=username,
|
||||
email=email,
|
||||
full_name=full_name,
|
||||
password_hash=generate_password_hash(os.environ.get('TEST_USER_PASSWORD', '1234')),
|
||||
is_admin=False,
|
||||
)
|
||||
|
||||
64
app/utils/mail.py
Normal file
64
app/utils/mail.py
Normal file
@@ -0,0 +1,64 @@
|
||||
import smtplib
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
|
||||
def send_invite_email(to_email, link, sender_name, survey_name):
|
||||
msg = MIMEMultipart("alternative")
|
||||
msg['Subject'] = f"Приглашение пройти опрос: {survey_name}"
|
||||
msg['From'] = "quiz@4mont.ru"
|
||||
msg['To'] = to_email
|
||||
|
||||
plain_text = f"Опрос: {survey_name}\nОтправитель: {sender_name}\n\nСсылка: {link}\n"
|
||||
html = f"""
|
||||
<html>
|
||||
<body style="font-family: sans-serif;">
|
||||
<h3>Приглашение пройти опрос</h3>
|
||||
<p><strong>Опрос:</strong> {survey_name}<br>
|
||||
<strong>Отправитель:</strong> {sender_name}</p>
|
||||
<p><a href="{link}" style="font-size: 16px; font-weight: bold;">Перейти к опросу</a></p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
msg.attach(MIMEText(plain_text, "plain"))
|
||||
msg.attach(MIMEText(html, "html"))
|
||||
|
||||
with smtplib.SMTP("mail.hosting.reg.ru", 587) as server:
|
||||
server.starttls()
|
||||
server.login("quiz@4mont.ru", "utOgbZ09quizpochta")
|
||||
server.send_message(msg)
|
||||
|
||||
|
||||
def send_result_email(to_email, from_email, survey_name, scores, unsupported, sender_name):
|
||||
report_link = "https://quiz.4mont.ru/dashboard"
|
||||
plain_text = (
|
||||
f"Результаты опроса\nОтправитель: {sender_name} ({from_email})\n"
|
||||
f"Опрос: {survey_name}\n\n" +
|
||||
"\n".join(f"{p}: {v}%" for p, v in scores.items()) +
|
||||
"\n\nПодробности: " + report_link
|
||||
)
|
||||
html = f"""
|
||||
<html>
|
||||
<body style="font-family: sans-serif;">
|
||||
<h3>Результаты опроса: {survey_name}</h3>
|
||||
<ul>
|
||||
{''.join(f'<li><strong>{p}:</strong> {v}%</li>' for p, v in scores.items())}
|
||||
</ul>
|
||||
<p><a href="{report_link}" style="font-weight: bold;">Открыть отчеты</a></p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
msg = MIMEMultipart("alternative")
|
||||
msg['Subject'] = f"Результаты опроса: {survey_name}"
|
||||
msg['From'] = "quiz@4mont.ru"
|
||||
msg['To'] = to_email
|
||||
msg.attach(MIMEText(plain_text, "plain"))
|
||||
msg.attach(MIMEText(html, "html"))
|
||||
|
||||
with smtplib.SMTP("mail.hosting.reg.ru", 587) as server:
|
||||
server.starttls()
|
||||
server.login("quiz@4mont.ru", "utOgbZ09quizpochta")
|
||||
server.send_message(msg)
|
||||
|
||||
Reference in New Issue
Block a user