|
from flask import Flask, request, jsonify, render_template, flash, redirect, url_for, current_app |
|
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user |
|
from werkzeug.security import generate_password_hash, check_password_hash |
|
from transformers import pipeline |
|
from flask import session |
|
import torch |
|
from pydub import AudioSegment |
|
import os |
|
import io |
|
import uuid |
|
from datetime import datetime |
|
import sqlite3 |
|
from pathlib import Path |
|
import whisper |
|
from extensions import db, login_manager |
|
import json |
|
from admin import admin_bp |
|
from flask_migrate import Migrate |
|
from models import User |
|
from werkzeug.utils import secure_filename |
|
from forms import EditProfileForm |
|
|
|
instance_path = Path(__file__).parent / 'instance' |
|
instance_path.mkdir(exist_ok=True, mode=0o755) |
|
|
|
app = Flask(__name__) |
|
app.secret_key = 'очень_сложный_секретный_ключ_здесь' |
|
db_path = instance_path / 'chats.db' |
|
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}' |
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False |
|
|
|
|
|
db.init_app(app) |
|
login_manager.init_app(app) |
|
login_manager.login_view = 'welcome' |
|
migrate = Migrate(app, db) |
|
|
|
|
|
|
|
def init_models(): |
|
try: |
|
emotion_map = { |
|
'joy': '😊 Радость', |
|
'neutral': '😐 Нейтрально', |
|
'anger': '😠 Злость', |
|
'sadness': '😢 Грусть', |
|
'surprise': '😲 Удивление' |
|
} |
|
|
|
speech_to_text_model = whisper.load_model("base") |
|
text_classifier = pipeline( |
|
"text-classification", |
|
model="cointegrated/rubert-tiny2-cedr-emotion-detection" |
|
) |
|
audio_classifier = pipeline( |
|
"audio-classification", |
|
model="superb/hubert-large-superb-er" |
|
) |
|
|
|
return { |
|
'emotion_map': emotion_map, |
|
'speech_to_text_model': speech_to_text_model, |
|
'text_classifier': text_classifier, |
|
'audio_classifier': audio_classifier |
|
} |
|
except Exception as e: |
|
print(f"Ошибка загрузки моделей: {e}") |
|
return None |
|
|
|
|
|
models = init_models() |
|
if not models: |
|
raise RuntimeError("Не удалось загрузить модели") |
|
|
|
|
|
@app.template_filter('datetimeformat') |
|
def datetimeformat(value, format='%d.%m.%Y %H:%M'): |
|
if value is None: |
|
return "" |
|
return value.strftime(format) |
|
|
|
|
|
@app.context_processor |
|
def utility_processor(): |
|
return { |
|
'emotion_map': { |
|
'joy': '😊 Радость', |
|
'neutral': '😐 Нейтрально', |
|
'anger': '😠 Злость', |
|
'sadness': '😢 Грусть', |
|
'surprise': '😲 Удивление' |
|
}, |
|
'get_emotion_color': lambda emotion: { |
|
'joy': '#00b894', |
|
'neutral': '#636e72', |
|
'anger': '#d63031', |
|
'sadness': '#0984e3', |
|
'surprise': '#fdcb6e' |
|
}.get(emotion, '#4a4ae8') |
|
} |
|
|
|
|
|
|
|
from auth import auth_bp |
|
from profile import profile_bp |
|
|
|
app.register_blueprint(auth_bp) |
|
app.register_blueprint(profile_bp) |
|
app.register_blueprint(admin_bp, url_prefix='/admin') |
|
|
|
|
|
emotion_map = models['emotion_map'] |
|
speech_to_text_model = models['speech_to_text_model'] |
|
text_classifier = models['text_classifier'] |
|
audio_classifier = models['audio_classifier'] |
|
|
|
|
|
@app.cli.command('create-admin') |
|
def create_admin(): |
|
"""Создание администратора""" |
|
email = input("Введите email: ") |
|
password = input("Введите пароль: ") |
|
user = User.query.filter_by(email=email).first() |
|
if user: |
|
user.is_admin = True |
|
user.set_password(password) |
|
else: |
|
user = User(email=email, username=email, is_admin=True) |
|
user.set_password(password) |
|
db.session.add(user) |
|
db.session.commit() |
|
print(f"Администратор {email} создан") |
|
|
|
|
|
def transcribe_audio(audio_path): |
|
"""Преобразование аудио в текст с помощью Whisper""" |
|
if not speech_to_text_model: |
|
return None |
|
try: |
|
result = speech_to_text_model.transcribe(audio_path, language="ru") |
|
return result["text"] |
|
except Exception as e: |
|
print(f"Ошибка преобразования аудио в текст: {e}") |
|
return None |
|
|
|
|
|
|
|
login_manager = LoginManager(app) |
|
login_manager.login_view = 'auth_bp.login' |
|
login_manager.login_message = "Для доступа к этой странице необходимо авторизоваться" |
|
login_manager.login_message_category = "info" |
|
|
|
|
|
@login_manager.user_loader |
|
def load_user(user_id): |
|
from models import User |
|
return User.query.get(int(user_id)) |
|
|
|
|
|
|
|
def get_db_connection(): |
|
instance_path = Path('instance') |
|
instance_path.mkdir(exist_ok=True) |
|
db_path = instance_path / 'chats.db' |
|
conn = sqlite3.connect(str(db_path)) |
|
conn.row_factory = sqlite3.Row |
|
return conn |
|
|
|
|
|
def init_db(): |
|
conn = get_db_connection() |
|
try: |
|
conn.execute(''' |
|
CREATE TABLE IF NOT EXISTS users ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
username TEXT UNIQUE NOT NULL, |
|
email TEXT UNIQUE NOT NULL, |
|
password_hash TEXT NOT NULL, |
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP |
|
) |
|
''') |
|
conn.execute(''' |
|
CREATE TABLE IF NOT EXISTS chats ( |
|
chat_id TEXT PRIMARY KEY, |
|
user_id INTEGER, |
|
created_at TEXT, |
|
title TEXT, |
|
FOREIGN KEY(user_id) REFERENCES users(id) |
|
) |
|
''') |
|
conn.execute(''' |
|
CREATE TABLE IF NOT EXISTS messages ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
chat_id TEXT, |
|
sender TEXT, |
|
content TEXT, |
|
timestamp TEXT, |
|
FOREIGN KEY(chat_id) REFERENCES chats(chat_id) |
|
) |
|
''') |
|
conn.execute(''' |
|
CREATE TABLE IF NOT EXISTS analysis_reports ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
user_id INTEGER, |
|
content TEXT, |
|
emotion TEXT, |
|
confidence REAL, |
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP, |
|
FOREIGN KEY(user_id) REFERENCES users(id) |
|
) |
|
''') |
|
conn.commit() |
|
finally: |
|
conn.close() |
|
|
|
|
|
init_db() |
|
|
|
|
|
|
|
@app.route('/login', methods=['GET', 'POST']) |
|
def login(): |
|
if request.method == 'POST': |
|
email = request.form.get('email') |
|
password = request.form.get('password') |
|
|
|
conn = get_db_connection() |
|
user = conn.execute( |
|
"SELECT id, username, email, password_hash FROM users WHERE email = ?", |
|
(email,) |
|
).fetchone() |
|
conn.close() |
|
|
|
if user and check_password_hash(user['password_hash'], password): |
|
user_obj = User(id=user['id'], username=user['username'], |
|
email=user['email'], password_hash=user['password_hash']) |
|
login_user(user_obj) |
|
session.pop('_flashes', None) |
|
return redirect(url_for('index')) |
|
else: |
|
flash('Неверный email или пароль', 'danger') |
|
|
|
return render_template('auth/login.html') |
|
|
|
|
|
@app.route('/register', methods=['GET', 'POST']) |
|
def register(): |
|
if request.method == 'POST': |
|
username = request.form.get('username') |
|
email = request.form.get('email') |
|
password = request.form.get('password') |
|
confirm_password = request.form.get('confirm_password') |
|
|
|
if password != confirm_password: |
|
flash('Пароли не совпадают', 'danger') |
|
return redirect(url_for('register')) |
|
|
|
conn = get_db_connection() |
|
try: |
|
password_hash = generate_password_hash(password) |
|
conn.execute( |
|
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)", |
|
(username, email, password_hash) |
|
) |
|
conn.commit() |
|
flash('Регистрация прошла успешно! Теперь вы можете войти.', 'success') |
|
return redirect(url_for('login')) |
|
except sqlite3.IntegrityError: |
|
flash('Пользователь с таким email или именем уже существует', 'danger') |
|
finally: |
|
conn.close() |
|
|
|
return render_template('auth/register.html') |
|
|
|
|
|
from werkzeug.security import check_password_hash, generate_password_hash |
|
|
|
@app.route('/edit_profile', methods=['GET', 'POST']) |
|
@login_required |
|
def edit_profile(): |
|
form = EditProfileForm(obj=current_user) |
|
if form.validate_on_submit(): |
|
|
|
current_user.username = form.username.data |
|
current_user.email = form.email.data |
|
|
|
|
|
if form.avatar.data: |
|
filename = secure_filename(form.avatar.data.filename) |
|
unique_filename = f"{uuid.uuid4().hex}_{filename}" |
|
avatar_path = os.path.join(current_app.root_path, 'static/avatars', unique_filename) |
|
form.avatar.data.save(avatar_path) |
|
current_user.avatar = unique_filename |
|
|
|
|
|
if form.current_password.data: |
|
|
|
if check_password_hash(current_user.password_hash, form.current_password.data): |
|
|
|
current_user.password_hash = generate_password_hash(form.new_password.data) |
|
flash('Пароль успешно изменён', 'success') |
|
else: |
|
flash('Текущий пароль неверный', 'danger') |
|
return redirect(url_for('edit_profile')) |
|
|
|
db.session.commit() |
|
flash('Профиль обновлён', 'success') |
|
return redirect(url_for('profile')) |
|
|
|
return render_template('edit_profile.html', form=form) |
|
|
|
|
|
|
|
|
|
@app.route("/welcome") |
|
def welcome(): |
|
return render_template("welcome.html") |
|
|
|
|
|
@app.route('/logout') |
|
@login_required |
|
def logout(): |
|
session.clear() |
|
logout_user() |
|
return redirect(url_for('welcome')) |
|
|
|
|
|
@app.route("/") |
|
def index(): |
|
if current_user.is_authenticated: |
|
conn = get_db_connection() |
|
try: |
|
chats = conn.execute( |
|
"SELECT chat_id, title FROM chats WHERE user_id = ? ORDER BY created_at DESC", |
|
(current_user.id,) |
|
).fetchall() |
|
return render_template("index.html", chats=chats) |
|
finally: |
|
conn.close() |
|
return redirect(url_for('welcome')) |
|
|
|
|
|
@app.route('/profile') |
|
@login_required |
|
def profile(): |
|
conn = get_db_connection() |
|
try: |
|
|
|
reports = conn.execute( |
|
"SELECT * FROM analysis_reports WHERE user_id = ? ORDER BY created_at DESC", |
|
(current_user.id,) |
|
).fetchall() |
|
|
|
|
|
total_reports = len(reports) |
|
|
|
|
|
emotion_counts = {} |
|
for r in reports: |
|
emotion_counts[r['emotion']] = emotion_counts.get(r['emotion'], 0) + 1 |
|
|
|
most_common_emotion = max(emotion_counts, key=emotion_counts.get) if emotion_counts else None |
|
|
|
return render_template( |
|
"profile.html", |
|
reports=reports, |
|
total_reports=total_reports, |
|
most_common_emotion=most_common_emotion, |
|
emotion_map={ |
|
'joy': '😊 Радость', |
|
'neutral': '😐 Нейтрально', |
|
'anger': '😠 Злость', |
|
'sadness': '😢 Грусть', |
|
'surprise': '😲 Удивление' |
|
} |
|
) |
|
except Exception as e: |
|
flash(f"Ошибка загрузки данных: {e}", "danger") |
|
return redirect(url_for('index')) |
|
finally: |
|
conn.close() |
|
|
|
|
|
@app.route("/analyze", methods=["POST"]) |
|
@login_required |
|
def analyze_text(): |
|
if not text_classifier: |
|
return jsonify({"error": "Model not loaded"}), 500 |
|
|
|
try: |
|
data = request.get_json() |
|
text = data.get("text", "").strip() |
|
|
|
if not text: |
|
return jsonify({"error": "Empty text"}), 400 |
|
|
|
|
|
result = text_classifier(text) |
|
|
|
|
|
if not result or not isinstance(result, list): |
|
return jsonify({"error": "Invalid model response"}), 500 |
|
|
|
|
|
prediction = result[0] if result else {} |
|
|
|
|
|
if not all(key in prediction for key in ['label', 'score']): |
|
return jsonify({"error": "Invalid prediction format"}), 500 |
|
|
|
|
|
conn = get_db_connection() |
|
conn.execute( |
|
"INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", |
|
(current_user.id, text, prediction['label'], prediction['score']) |
|
) |
|
conn.commit() |
|
conn.close() |
|
|
|
return jsonify({ |
|
"emotion": emotion_map.get(prediction['label'], "❓ Неизвестно"), |
|
"confidence": float(prediction['score']) |
|
}) |
|
|
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
|
|
|
|
@app.route('/analyze_audio', methods=['POST']) |
|
@login_required |
|
def analyze_audio(): |
|
if not audio_classifier or not speech_to_text_model: |
|
return jsonify({"error": "Model not loaded"}), 500 |
|
|
|
if 'audio' not in request.files: |
|
return jsonify({'error': 'No audio file'}), 400 |
|
|
|
try: |
|
audio_file = request.files['audio'] |
|
temp_path = "temp_audio.wav" |
|
|
|
audio = AudioSegment.from_file(io.BytesIO(audio_file.read())) |
|
audio = audio.set_frame_rate(16000).set_channels(1) |
|
audio.export(temp_path, format="wav", codec="pcm_s16le") |
|
|
|
transcribed_text = transcribe_audio(temp_path) |
|
result = audio_classifier(temp_path) |
|
os.remove(temp_path) |
|
|
|
emotion_mapping = { |
|
'hap': 'happy', |
|
'sad': 'sad', |
|
'neu': 'neutral', |
|
'ang': 'angry' |
|
} |
|
emotions = {emotion_mapping.get(item['label'].lower(), 'neutral'): item['score'] |
|
for item in result if item['label'].lower() in emotion_mapping} |
|
|
|
dominant_emotion = max(emotions.items(), key=lambda x: x[1]) |
|
response_map = { |
|
'happy': '😊 Радость', |
|
'sad': '😢 Грусть', |
|
'angry': '😠 Злость', |
|
'neutral': '😐 Нейтрально' |
|
} |
|
|
|
conn = get_db_connection() |
|
conn.execute( |
|
"INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", |
|
(current_user.id, transcribed_text, dominant_emotion[0], dominant_emotion[1]) |
|
) |
|
conn.commit() |
|
conn.close() |
|
|
|
return jsonify({ |
|
'emotion': response_map.get(dominant_emotion[0], 'неизвестно'), |
|
'confidence': round(dominant_emotion[1], 2), |
|
'transcribed_text': transcribed_text if transcribed_text else "Не удалось распознать текст" |
|
}) |
|
except Exception as e: |
|
return jsonify({'error': str(e)}), 500 |
|
|
|
|
|
@app.route('/analyze_telegram_chat', methods=['POST']) |
|
@login_required |
|
def analyze_telegram_chat(): |
|
if 'file' not in request.files: |
|
return jsonify({'error': 'No file uploaded'}), 400 |
|
file = request.files['file'] |
|
if file.filename.split('.')[-1].lower() != 'json': |
|
return jsonify({'error': 'Invalid file format. Only JSON allowed'}), 400 |
|
try: |
|
data = json.load(file) |
|
|
|
messages = [] |
|
for msg in data.get('messages', []): |
|
text = msg.get('text') |
|
sender = msg.get('from') or msg.get('sender') or 'Неизвестный пользователь' |
|
if isinstance(text, str) and len(text.strip()) > 5: |
|
messages.append({ |
|
'text': text, |
|
'timestamp': msg.get('date', datetime.now().isoformat()), |
|
'from': sender |
|
}) |
|
|
|
if not messages: |
|
return jsonify({'error': 'No valid text messages found'}), 400 |
|
|
|
results = [] |
|
for msg in messages[:500]: |
|
prediction = text_classifier(msg['text'])[0] |
|
results.append({ |
|
'text': msg['text'], |
|
'emotion': prediction['label'], |
|
'confidence': prediction['score'], |
|
'timestamp': msg['timestamp'], |
|
'from': msg['from'] |
|
}) |
|
|
|
conn = get_db_connection() |
|
conn.execute(''' |
|
CREATE TABLE IF NOT EXISTS telegram_analysis ( |
|
id INTEGER PRIMARY KEY AUTOINCREMENT, |
|
user_id INTEGER, |
|
data TEXT, |
|
created_at TEXT DEFAULT CURRENT_TIMESTAMP, |
|
FOREIGN KEY(user_id) REFERENCES users(id) |
|
) |
|
''') |
|
conn.execute( |
|
"INSERT INTO telegram_analysis (user_id, data) VALUES (?, ?)", |
|
(current_user.id, json.dumps(results)) |
|
) |
|
conn.commit() |
|
|
|
return jsonify({ |
|
'status': 'success', |
|
'message_count': len(results), |
|
}) |
|
|
|
except Exception as e: |
|
print(f"Error during analysis: {e}") |
|
return jsonify({'error': f'Server error: {str(e)}'}), 500 |
|
finally: |
|
conn.close() |
|
|
|
|
|
@app.route('/get_chats') |
|
@login_required |
|
def get_chats(): |
|
conn = get_db_connection() |
|
try: |
|
chats = conn.execute( |
|
"SELECT chat_id, title, created_at FROM chats WHERE user_id = ? ORDER BY created_at DESC", |
|
(current_user.id,) |
|
).fetchall() |
|
return jsonify([dict(chat) for chat in chats]) |
|
finally: |
|
conn.close() |
|
|
|
|
|
@app.route('/start_chat', methods=['POST']) |
|
@login_required |
|
def start_chat(): |
|
try: |
|
chat_id = str(uuid.uuid4()) |
|
title = f"Чат от {datetime.now().strftime('%d.%m.%Y %H:%M')}" |
|
|
|
conn = get_db_connection() |
|
conn.execute( |
|
"INSERT INTO chats (chat_id, user_id, created_at, title) VALUES (?, ?, ?, ?)", |
|
(chat_id, current_user.id, datetime.now(), title) |
|
) |
|
conn.commit() |
|
|
|
return jsonify({ |
|
"success": True, |
|
"chat_id": chat_id, |
|
"title": title |
|
}) |
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
finally: |
|
conn.close() |
|
|
|
|
|
@app.route('/delete_chat/<chat_id>', methods=['DELETE']) |
|
@login_required |
|
def delete_chat(chat_id): |
|
conn = get_db_connection() |
|
try: |
|
|
|
conn.execute("DELETE FROM messages WHERE chat_id = ?", (chat_id,)) |
|
|
|
|
|
|
|
conn.execute(""" |
|
DELETE FROM analysis_reports |
|
WHERE content IN ( |
|
SELECT content FROM messages WHERE chat_id = ? |
|
) AND user_id = ? |
|
""", (chat_id, current_user.id)) |
|
|
|
|
|
conn.execute("DELETE FROM chats WHERE chat_id = ? AND user_id = ?", |
|
(chat_id, current_user.id)) |
|
conn.commit() |
|
return jsonify({"success": True}) |
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
finally: |
|
conn.close() |
|
|
|
|
|
@app.route('/get_telegram_analysis') |
|
@login_required |
|
def get_telegram_analysis(): |
|
conn = get_db_connection() |
|
try: |
|
analyses = conn.execute( |
|
"SELECT id, data, created_at FROM telegram_analysis WHERE user_id = ?", |
|
(current_user.id,) |
|
).fetchall() |
|
return jsonify([dict(analysis) for analysis in analyses]) |
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
finally: |
|
conn.close() |
|
|
|
|
|
@app.route('/load_chat/<chat_id>') |
|
@login_required |
|
def load_chat(chat_id): |
|
conn = get_db_connection() |
|
try: |
|
|
|
chat = conn.execute( |
|
"SELECT chat_id, title FROM chats WHERE chat_id = ? AND user_id = ?", |
|
(chat_id, current_user.id) |
|
).fetchone() |
|
|
|
if not chat: |
|
return jsonify({"error": "Чат не найден"}), 404 |
|
|
|
|
|
messages = conn.execute( |
|
"SELECT sender, content FROM messages WHERE chat_id = ? ORDER BY timestamp ASC", |
|
(chat_id,) |
|
).fetchall() |
|
|
|
return jsonify({ |
|
"chat_id": chat["chat_id"], |
|
"title": chat["title"], |
|
"messages": [dict(msg) for msg in messages] |
|
}) |
|
finally: |
|
conn.close() |
|
|
|
|
|
@app.route('/save_message', methods=['POST']) |
|
@login_required |
|
def save_message(): |
|
data = request.get_json() |
|
if not data or 'chat_id' not in data or 'content' not in data or 'sender' not in data: |
|
return jsonify({"error": "Неверные данные"}), 400 |
|
|
|
conn = get_db_connection() |
|
try: |
|
|
|
chat = conn.execute( |
|
"SELECT chat_id FROM chats WHERE chat_id = ? AND user_id = ?", |
|
(data['chat_id'], current_user.id) |
|
).fetchone() |
|
|
|
if not chat: |
|
return jsonify({"error": "Чат не найден"}), 404 |
|
|
|
|
|
emotion = "neutral" |
|
confidence = 0.0 |
|
if text_classifier and data['content'].strip(): |
|
try: |
|
predictions = text_classifier(data['content'])[0] |
|
top_prediction = max(predictions, key=lambda x: x["score"]) |
|
emotion = top_prediction["label"] |
|
confidence = top_prediction["score"] |
|
|
|
|
|
conn.execute( |
|
"INSERT INTO analysis_reports (user_id, content, emotion, confidence) VALUES (?, ?, ?, ?)", |
|
(current_user.id, data['content'], emotion, confidence) |
|
) |
|
except Exception as e: |
|
print(f"Ошибка анализа эмоции: {e}") |
|
|
|
|
|
conn.execute( |
|
"INSERT INTO messages (chat_id, sender, content, timestamp) VALUES (?, ?, ?, ?)", |
|
(data['chat_id'], data['sender'], data['content'], datetime.now()) |
|
) |
|
conn.commit() |
|
|
|
return jsonify({ |
|
"status": "success", |
|
"emotion": emotion_map.get(emotion, "❓ Неизвестно"), |
|
"confidence": round(confidence, 2) |
|
}) |
|
except Exception as e: |
|
return jsonify({"error": str(e)}), 500 |
|
finally: |
|
conn.close() |
|
|
|
|
|
if __name__ == "__main__": |
|
app.run(debug=True) |
|
|