from flask import Flask, request, jsonify
from flask_cors import CORS
import os
import json
import re
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import torch
from typing import List, Dict
import random
import datetime
from fuzzywuzzy import fuzz

app = Flask(__name__)
CORS(app)

class EnhancedMultilingualEidQABot:
    def __init__(self, data_file='dataSet.json'):
        print("🔄 Initializing multilingual Eid QA bot...")
        # Models and embeddings are created lazily on the first question,
        # which keeps server startup fast.
        self.bi_encoder = None
        self.cross_encoder = None
        print("📖 Processing dataset...")
        self.data = self._load_dataset(data_file)
        self.knowledge_chunks = self._create_chunks()
        self.chunk_embeddings = None
        self.question_patterns = self._initialize_question_patterns()
        print("✅ Bot ready!\n")

    def _ensure_embeddings(self):
        if self.chunk_embeddings is None:
            self._load_models()
            print("🧠 Creating embeddings...")
            self.chunk_embeddings = self.bi_encoder.encode(
                [chunk['text'] for chunk in self.knowledge_chunks],
                convert_to_tensor=True,
                show_progress_bar=True
            )

    def _load_dataset(self, data_file):
        try:
            with open(data_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading dataset: {e}")
            return []
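
    # Expected dataSet.json shape, inferred from how the entries are read in
    # _create_chunks below; the sample rows are illustrative, not taken from
    # the real file:
    #
    # [
    #   {"text": "Eid prayer is offered shortly after sunrise.", "tag": "prayer_timing"},
    #   {"text": "Qurbani may be performed on the 10th, 11th, or 12th of Dhul-Hijjah.", "tag": "qurbani_days"}
    # ]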

    def _create_chunks(self):
        chunks = []
        for item in self.data:
            text = item.get('text', '')
            if not text:
                continue  # skip malformed entries rather than crashing
            tag = item.get('tag', 'General')
            chunks.append({
                'text': text,
                'tag': tag,
                'type': 'original',
                'score_boost': 1.0
            })
            # Add prefixed duplicates with a score boost (or penalty) so
            # retrieval favours topically important entries.
            if 'eid' in text.lower() or 'عید' in text:
                chunks.append({'text': f"Eid info: {text}", 'tag': tag, 'type': 'enhanced', 'score_boost': 1.1})
            if 'prayer' in text.lower() or 'نماز' in text:
                chunks.append({'text': f"Prayer info: {text}", 'tag': tag, 'type': 'enhanced', 'score_boost': 1.2})
            if 'qurbani' in text.lower() or 'قربانی' in text or 'sacrifice' in text.lower():
                chunks.append({'text': f"Qurbani info: {text}", 'tag': tag, 'type': 'enhanced', 'score_boost': 1.2})
            if 'funny' in tag.lower() or 'shair' in tag.lower():
                chunks.append({'text': f"Fun: {text}", 'tag': tag, 'type': 'enhanced', 'score_boost': 0.9})
            if 'gaza' in text.lower() or 'غزہ' in text:
                chunks.append({'text': f"Gaza info: {text}", 'tag': tag, 'type': 'enhanced', 'score_boost': 1.3})
        return chunks
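
    # For example, an entry {"text": "Eid prayer is offered after sunrise.",
    # "tag": "prayer_timing"} yields the original chunk plus two boosted
    # variants: "Eid info: ..." (boost 1.1) and "Prayer info: ..." (boost 1.2).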

    def _load_models(self):
        if self.bi_encoder is None:
            print("🔄 Loading bi-encoder...")
            self.bi_encoder = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
        if self.cross_encoder is None:
            print("🔄 Loading cross-encoder...")
            self.cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')

    def _initialize_question_patterns(self):
        tag_keywords = {}
        for item in self.data:
            tag = item.get("tag", "").lower()
            if tag not in tag_keywords:
                tag_keywords[tag] = set()
            # Seed each tag with its own words, then add curated
            # English/Roman-Urdu/Urdu synonyms for the main topics.
            tag_keywords[tag].update(tag.replace('_', ' ').split())

            if "greeting" in tag:
                tag_keywords[tag].update(["hi", "hello", "salaam", "eid mubarak", "السلام"])
            elif "prayer" in tag:
                tag_keywords[tag].update(["prayer", "namaz", "salah", "نماز"])
            elif "qurbani" in tag or "sacrifice" in tag:
                tag_keywords[tag].update(["qurbani", "sacrifice", "janwar", "bakra", "قربانی"])
            elif "gaza" in tag:
                tag_keywords[tag].update(["gaza", "غزہ", "palestine", "فلسطین"])

        return {k: list(v) for k, v in tag_keywords.items()}
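
    # A dataset tag like "prayer_timing" therefore maps to keywords such as
    # ["prayer", "timing", "namaz", "salah", "نماز"] (order is arbitrary
    # because the lists are built from sets).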

    def _clean_input(self, text: str) -> str:
        text = re.sub(r'\s+', ' ', text.strip().lower())
        # \w matches Unicode word characters in Python 3, so Urdu/Arabic
        # script survives; only punctuation other than ؟ and ! is dropped.
        text = re.sub(r'[^\w\s؟!]', '', text)
        return text

    def _fuzzy_match(self, word: str, keywords: List[str]) -> bool:
        return any(fuzz.ratio(word, keyword) > 80 for keyword in keywords)
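
    # The 80 threshold tolerates common misspellings: fuzz.ratio("namaaz",
    # "namaz") is about 91, while unrelated words score far lower.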

    def _detect_question_type(self, question: str) -> str:
        cleaned_question = self._clean_input(question)
        words = cleaned_question.split()
        for category, keywords in self.question_patterns.items():
            if any(self._fuzzy_match(word, keywords) for word in words):
                return category
        return 'general'

    def _get_contextual_boost(self, chunk: Dict, question_type: str) -> float:
        boost = chunk.get('score_boost', 1.0)
        if question_type in chunk['tag'].lower():
            boost *= 1.3
        return boost

    def _is_time_sensitive(self, question: str) -> bool:
        time_keywords = ['time', 'waqt', 'kab', 'when', 'کب', 'وقت']
        return any(self._fuzzy_match(word, time_keywords) for word in question.lower().split())

    def answer_question(self, question: str) -> str:
        self._ensure_embeddings()  # also loads the models on first call

        cleaned_question = self._clean_input(question)
        if not cleaned_question:
            return self._get_default_response('empty')

        question_type = self._detect_question_type(cleaned_question)
        question_embedding = self.bi_encoder.encode(cleaned_question, convert_to_tensor=True)
        cos_scores = util.cos_sim(question_embedding, self.chunk_embeddings)[0]

        boosted_scores = [
            float(score) * self._get_contextual_boost(self.knowledge_chunks[i], question_type)
            for i, score in enumerate(cos_scores)
        ]

        # Stage 1: bi-encoder retrieval of the top candidates.
        top_k = min(15, len(self.knowledge_chunks))
        top_results = torch.topk(torch.tensor(boosted_scores), k=top_k)
        top_chunks = [self.knowledge_chunks[i]['text'] for i in top_results.indices.tolist()]
        top_scores = top_results.values.tolist()

        # Stage 2: cross-encoder reranking, blended 70/30 with the retrieval
        # score (note the two scores live on different scales).
        rerank_pairs = [(cleaned_question, chunk) for chunk in top_chunks]
        rerank_scores = self.cross_encoder.predict(rerank_pairs)

        combined_scores = [(rerank_scores[i] * 0.7 + top_scores[i] * 0.3) for i in range(len(rerank_scores))]
        best_idx = max(range(len(combined_scores)), key=lambda i: combined_scores[i])
        best_chunk = top_chunks[best_idx]
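
        # Illustrative numbers: a chunk with cosine similarity 0.62 and boost
        # 1.2 enters retrieval at 0.744; a cross-encoder score of 4.1 then
        # gives a combined score of 0.7 * 4.1 + 0.3 * 0.744 ≈ 3.09.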

        # Strip the retrieval prefixes added in _create_chunks before replying.
        for prefix in ["Eid info: ", "Prayer info: ", "Qurbani info: ", "Fun: ", "Gaza info: "]:
            if best_chunk.startswith(prefix):
                best_chunk = best_chunk[len(prefix):]
                break

        if self._is_time_sensitive(cleaned_question):
            date = datetime.datetime.now().strftime('%B %d, %Y')
            # Urdu: "Today is {date}. Eid al-Adha usually falls on 10th Dhul-Hijjah."
            best_chunk += f"\n\n🕒 آج {date} ہے۔ عید الاضحیٰ عام طور پر 10th Dhul-Hijjah کو ہوتی ہے۔"

        return best_chunk + "\n\nThis is a demo. Your feedback matters."

    def _get_default_response(self, question_type: str) -> str:
        return {
            'empty': "❓ Ask something about Eid!",
            'general': "🌟 I'm your Eid Assistant. Ask me anything about Eid!"
        }.get(question_type, "🌟 I'm your Eid Assistant. Ask me anything about Eid!")

    def get_random_by_tag(self, tag_keyword: str) -> str:
        matches = [c['text'] for c in self.knowledge_chunks if tag_keyword in c['tag'].lower()]
        return random.choice(matches) if matches else "No info found."


bot = EnhancedMultilingualEidQABot('dataSet.json')


@app.route('/ask', methods=['POST'])
def ask():
    # Guard against missing or non-JSON bodies instead of raising a 500.
    payload = request.get_json(silent=True) or {}
    question = payload.get('question', '')
    return jsonify({'answer': bot.answer_question(question)})
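
# Example request (server running locally on the default port; the question
# text is illustrative):
#   curl -X POST http://localhost:5000/ask \
#        -H "Content-Type: application/json" \
#        -d '{"question": "Eid ki namaz kab hoti hai?"}'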


@app.route('/tags', methods=['GET'])
def tags():
    unique_tags = sorted({chunk['tag'] for chunk in bot.knowledge_chunks})
    return jsonify({'tags': unique_tags})


@app.route('/tag/<tag>', methods=['GET'])
def get_by_tag(tag):
    results = [chunk['text'] for chunk in bot.knowledge_chunks if tag.lower() in chunk['tag'].lower()]
    return jsonify({'results': results})


@app.route('/')
def home():
    return "✅ Eid Assistant API is running."


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 5000)))
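
# Quick smoke test without the HTTP layer (assumes this file is saved as
# app.py; the module name is hypothetical):
#   python -c "from app import bot; print(bot.answer_question('Eid Mubarak!'))"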