from flask import Flask, request, jsonify
from flask_cors import CORS
import os
import json
import re
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import torch
from typing import List, Dict
import random
import datetime
from fuzzywuzzy import fuzz

app = Flask(__name__)
CORS(app)

class EnhancedMultilingualEidQABot:
    # Prefixes added to the "enhanced" chunks in _create_chunks; they are
    # stripped again before any chunk text is returned to the user.
    ENHANCEMENT_PREFIXES = (
        "Eid information: ",
        "Prayer information: ",
        "Qurbani rules: ",
        "Fun fact: ",
        "Gaza context: ",
    )

    def __init__(self, data_file='dataSet.json'):
        print("🔄 Initializing multilingual Eid QA bot...")
        # Models and embeddings are loaded lazily (see _load_models and
        # _ensure_embeddings) so that constructing the bot stays cheap.
        self.bi_encoder = None
        self.cross_encoder = None
        print("📖 Processing dataset...")
        self.data = self._load_dataset(data_file)
        self.knowledge_chunks = self._create_chunks()
        self.chunk_embeddings = None
        self.question_patterns = self._initialize_question_patterns()
        print("✅ Bot ready!\n")

    def _ensure_embeddings(self):
        # Compute the chunk embeddings once, on first use, and cache them.
        if self.chunk_embeddings is None:
            self._load_models()
            print("🧠 Creating embeddings...")
            self.chunk_embeddings = self.bi_encoder.encode(
                [chunk['text'] for chunk in self.knowledge_chunks],
                convert_to_tensor=True,
                show_progress_bar=True
            )

    def _load_dataset(self, data_file):
        try:
            with open(data_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            print(f"Error loading dataset: {e}")
            return []

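    # Illustrative (assumed) shape of dataSet.json, inferred from the fields read
    # in _create_chunks below ('text' and 'tag'); the real file may carry more keys:
    # [
    #   {"tag": "Eid_Overview", "text": "Eid ul-Adha commemorates ..."},
    #   {"tag": "Funny_Shair_o_Shairi", "text": "..."}
    # ]
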
    def _create_chunks(self):
        chunks = []
        for item in self.data:
            text = item['text']
            tag = item.get('tag', 'General')
            chunks.append({
                'text': text,
                'tag': tag,
                'type': 'original',
                'score_boost': 1.0
            })
            if 'eid' in text.lower() or 'عید' in text:
                chunks.append({
                    'text': f"Eid information: {text}",
                    'tag': tag,
                    'type': 'enhanced',
                    'score_boost': 1.1
                })
            if 'prayer' in text.lower() or 'نماز' in text:
                chunks.append({
                    'text': f"Prayer information: {text}",
                    'tag': tag,
                    'type': 'enhanced',
                    'score_boost': 1.2
                })
            if 'qurbani' in text.lower() or 'قربانی' in text or 'sacrifice' in text.lower():
                chunks.append({
                    'text': f"Qurbani rules: {text}",
                    'tag': tag,
                    'type': 'enhanced',
                    'score_boost': 1.2
                })
            if 'funny' in tag.lower() or 'shair' in tag.lower():
                chunks.append({
                    'text': f"Fun fact: {text}",
                    'tag': tag,
                    'type': 'enhanced',
                    'score_boost': 0.9
                })
            if 'gaza' in text.lower() or 'غزہ' in text:
                chunks.append({
                    'text': f"Gaza context: {text}",
                    'tag': tag,
                    'type': 'enhanced',
                    'score_boost': 1.3
                })
        return chunks

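    # Worked example for _create_chunks above (illustrative): an item tagged
    # e.g. "Eid_Prayer" whose text contains "prayer" yields two chunks, the
    # original (score_boost 1.0) and a "Prayer information: <text>" duplicate
    # (score_boost 1.2) that is favored for prayer-related questions.
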
    def _load_models(self):
        if self.bi_encoder is None:
            print("🔄 Loading bi-encoder model...")
            self.bi_encoder = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
        if self.cross_encoder is None:
            print("🔄 Loading cross-encoder model...")
            self.cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-12-v2')

    def _initialize_question_patterns(self):
        return {
            'greeting': ['eid mubarak', 'عید مبارک', 'hello', 'hi', 'salaam', 'سلام', 'mubarak', 'eid maz', 'eid mub', 'id mubarak'],
            'prayer': ['namaz', 'prayer', 'salah', 'eid ki namaz', 'نماز', 'how to pray', 'kaise parhein', 'nmaz', 'nmax', 'namaaz', 'salat'],
            'qurbani': ['qurbani', 'sacrifice', 'bakra', 'janwar', 'قربانی', 'ذبح', 'qurbni', 'kurbani', 'sacrifise'],
            'rules': ['rules', 'ahkam', 'قوانین', 'kya karna', 'what to do', 'kaise karna', 'rulez', 'ahkaam'],
            'time': ['time', 'waqt', 'kab', 'وقت', 'when', 'konsa din', 'kab hai'],
            'story': ['story', 'kahani', 'ibrahim', 'ismail', 'قصہ', 'واقعہ', 'history', 'kahaniya'],
            'food': ['food', 'khana', 'mithai', 'کھانا', 'سویاں', 'biryani', 'khane', 'meethi'],
            'funny': ['funny', 'shair', 'mazah', 'مزاح', 'joke', 'shairi', 'شاعری', 'mazak', 'maza'],
            'gaza': ['gaza', 'palestine', 'غزہ', 'فلسطین', 'war zone', 'gazah'],
            'general': ['kya hai', 'what is', 'بتائیں', 'معلومات', 'eid kya', 'عید کیا', 'eid hai']
        }

    def _clean_input(self, text: str) -> str:
        # Lowercase, collapse whitespace, and drop punctuation while keeping
        # word characters (including Urdu/Arabic letters), '؟' and '!'.
        text = re.sub(r'\s+', ' ', text.strip().lower())
        text = re.sub(r'[^\w\s؟!]', '', text)
        return text

    def _fuzzy_match(self, word: str, keywords: List[str]) -> bool:
        return any(fuzz.ratio(word, keyword) > 80 for keyword in keywords)

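    # Example (illustrative): fuzz.ratio('nmaz', 'namaz') is roughly 89, so the
    # > 80 cutoff in _fuzzy_match lets common misspellings hit the keyword lists
    # defined in _initialize_question_patterns.
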
    def _detect_question_type(self, question: str) -> str:
        cleaned_question = self._clean_input(question)
        words = cleaned_question.split()
        for category, keywords in self.question_patterns.items():
            if any(self._fuzzy_match(word, keywords) for word in words):
                return category
        return 'general'

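    # Example (illustrative): "eid ki nmaz kaise parhein" should resolve to
    # 'prayer', since 'nmaz' appears verbatim in the prayer keyword list and no
    # earlier category matches above the fuzzy threshold.
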
    def _get_contextual_boost(self, chunk: Dict, question_type: str) -> float:
        boost = chunk.get('score_boost', 1.0)
        if question_type == 'greeting' and 'greeting' in chunk['tag'].lower():
            boost *= 1.4
        elif question_type == 'prayer' and 'prayer' in chunk['tag'].lower():
            boost *= 1.3
        elif question_type == 'qurbani' and ('qurbani' in chunk['tag'].lower() or 'sacrifice' in chunk['tag'].lower()):
            boost *= 1.3
        elif question_type == 'story' and 'story' in chunk['tag'].lower():
            boost *= 1.2
        elif question_type == 'funny' and 'funny' in chunk['tag'].lower():
            boost *= 1.1
        elif question_type == 'gaza' and 'gaza' in chunk['tag'].lower():
            boost *= 1.3
        return boost

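    # Example (illustrative): for a 'prayer' question, an enhanced chunk whose
    # source item is tagged e.g. 'Eid_Prayer' ends up with 1.2 * 1.3 = 1.56.
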
    def _is_time_sensitive(self, question: str) -> bool:
        time_keywords = ['time', 'waqt', 'kab', 'وقت', 'when', 'konsa din', 'kab hai']
        return any(self._fuzzy_match(word, time_keywords) for word in question.lower().split())

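    # answer_question pipeline: clean the input, detect the question type,
    # retrieve the top-k chunks by boosted bi-encoder cosine similarity, rerank
    # them with the cross-encoder, and fall back to a canned default when the
    # best blended score sits below a relative threshold.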
    def answer_question(self, question: str) -> str:
        self._load_models()
        self._ensure_embeddings()

        cleaned_question = self._clean_input(question)
        if not cleaned_question:
            return self._get_default_response('empty')

        question_type = self._detect_question_type(cleaned_question)
        question_embedding = self.bi_encoder.encode(cleaned_question, convert_to_tensor=True)
        cos_scores = util.cos_sim(question_embedding, self.chunk_embeddings)[0]

        # Apply per-chunk contextual boosts to the cosine similarities.
        boosted_scores = []
        for i, score in enumerate(cos_scores):
            boost = self._get_contextual_boost(self.knowledge_chunks[i], question_type)
            boosted_scores.append(score * boost)
        boosted_scores = torch.stack(boosted_scores)

        # Retrieve the top-k candidates for reranking.
        top_k = min(15, len(self.knowledge_chunks))
        top_results = torch.topk(boosted_scores, k=top_k)
        top_indices = top_results.indices.tolist()
        top_chunks = [self.knowledge_chunks[i]['text'] for i in top_indices]
        top_scores = top_results.values.tolist()

        # Rerank with the cross-encoder and blend: 70% rerank score, 30% retrieval score.
        rerank_pairs = [(cleaned_question, chunk) for chunk in top_chunks]
        rerank_scores = self.cross_encoder.predict(rerank_pairs)

        combined_scores = []
        for i, rerank_score in enumerate(rerank_scores):
            combined_score = (rerank_score * 0.7) + (top_scores[i] * 0.3)
            combined_scores.append(combined_score)

        best_idx = max(range(len(combined_scores)), key=lambda i: combined_scores[i])
        best_chunk = top_chunks[best_idx]
        best_score = combined_scores[best_idx]

        # Relative-threshold fallback: if the best blended score falls below 80%
        # of the candidate pool's average, return a canned answer instead.
        avg_score = sum(combined_scores) / len(combined_scores)
        threshold = avg_score * 0.8
        if best_score < threshold:
            return self._get_default_response(question_type)

        # Strip the internal enhancement prefix before returning the chunk text.
        response = best_chunk
        for prefix in self.ENHANCEMENT_PREFIXES:
            if response.startswith(prefix):
                response = response[len(prefix):]
                break

        if self._is_time_sensitive(cleaned_question):
            current_date = datetime.datetime.now()
            islamic_date = "10th Dhul-Hijjah"  # hardcoded; not derived from a Hijri calendar
            # Urdu: "Today is <date>. Eid ul-Adha usually falls on the 10th of Dhul-Hijjah."
            response += f"\n\n🕒 آج {current_date.strftime('%B %d, %Y')} ہے۔ عید الاضحیٰ عام طور پر {islamic_date} کو ہوتی ہے۔"

        response += "\n\nThis is a demo. I'm working on this project, and its continuation depends on user feedback. Please share your suggestions by visiting our 'Contact Us' screen."
        return response

    def _get_default_response(self, question_type: str) -> str:
        defaults = {
            'greeting': "🌙 Eid Mubarak! May Allah accept your prayers.",
            'prayer': "🕌 Eid prayer is 2 rakahs with extra takbeerat. Consult scholars for details.",
            'qurbani': "🐐 Qurbani is obligatory for those who meet nisab. The animal must be healthy.",
            'rules': "📜 Qurbani rules: animal age, health, and intention are key.",
            'time': "⏰ Eid ul-Adha is from the 10th to the 12th of Dhul-Hijjah.",
            'story': "📖 Eid ul-Adha commemorates Prophet Ibrahim's (AS) sacrifice.",
            'food': "🍲 Eid foods include sheer khurma, biryani, and sweets.",
            'funny': "😄 Eid fun: eat sweets, collect Eidi!",
            'gaza': "🤲 Pray for the people of Gaza. They are in hardship.",
            'empty': "Ask something about Eid!",
            'general': "🌟 I am your Eid Assistant, created by OCi Lab. I am currently in progress and have limited data, focusing on small fun activities for Eid. I will improve after Eid."
        }
        return defaults.get(question_type, defaults['general'])

    def get_random_eid_fact(self) -> str:
        facts = [chunk for chunk in self.knowledge_chunks if chunk['tag'] in ['Eid_Overview', 'Prophet_Story', 'Eid_Prayer', 'Qurbani_Rules']]
        if facts:
            fact_text = random.choice(facts)['text']
            for prefix in self.ENHANCEMENT_PREFIXES:
                if fact_text.startswith(prefix):
                    fact_text = fact_text[len(prefix):]
                    break
            return f"💡 {fact_text}"
        return "🌙 Eid Mubarak!"

    def get_random_greeting(self) -> str:
        greetings = [chunk for chunk in self.knowledge_chunks if 'greeting' in chunk['tag'].lower()]
        if greetings:
            greeting_text = random.choice(greetings)['text']
            for prefix in self.ENHANCEMENT_PREFIXES:
                if greeting_text.startswith(prefix):
                    greeting_text = greeting_text[len(prefix):]
                    break
            return f"🎉 {greeting_text}"
        return "🌙 Eid Mubarak!"

    def get_random_shair(self) -> str:
        shairs = [chunk for chunk in self.knowledge_chunks if 'funny_shair_o_shairi' in chunk['tag'].lower()]
        if shairs:
            shair_text = random.choice(shairs)['text']
            for prefix in self.ENHANCEMENT_PREFIXES:
                if shair_text.startswith(prefix):
                    shair_text = shair_text[len(prefix):]
                    break
            return f"😄 شاعری: {shair_text}"  # "شاعری" means "poetry"
        return "😂 No shairi found, just Eid Mubarak!"

    def get_contextual_info(self) -> str:
        current_date = datetime.datetime.now()
        islamic_date = "10th Dhul-Hijjah"  # hardcoded; not derived from a Hijri calendar
        return f"🕒 {current_date.strftime('%B %d, %Y')} | {islamic_date}"


bot = EnhancedMultilingualEidQABot('dataSet.json')


@app.route('/ask', methods=['POST'])
def ask_question():
    try:
        # silent=True avoids an exception when the body is missing or not JSON.
        data = request.get_json(silent=True) or {}
        question = data.get('question', '')
        if not question:
            return jsonify({'answer': bot._get_default_response('empty')})
        answer = bot.answer_question(question)
        return jsonify({'answer': answer})
    except Exception as e:
        return jsonify({'error': str(e), 'answer': 'Sorry, something went wrong!'})

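# Example request (illustrative; assumes the server is running locally on port 5000):
#   curl -X POST http://localhost:5000/ask \
#        -H "Content-Type: application/json" \
#        -d '{"question": "Eid ki namaz kaise parhein?"}'
# Response shape: {"answer": "..."}
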
@app.route('/random', methods=['GET'])
def random_fact():
    fact = bot.get_random_eid_fact()
    return jsonify({'answer': fact})

@app.route('/greet', methods=['GET'])
def random_greeting():
    greeting = bot.get_random_greeting()
    return jsonify({'answer': greeting})

@app.route('/shair', methods=['GET'])
def random_shair():
    shair = bot.get_random_shair()
    return jsonify({'answer': shair})

@app.route('/context', methods=['GET'])
def contextual_info():
    info = bot.get_contextual_info()
    return jsonify({'answer': info})

@app.route('/warmup', methods=['GET'])
def warmup():
    try:
        bot._load_models()
        bot._ensure_embeddings()
        return jsonify({'status': 'Models warmed up and embeddings ready.'})
    except Exception as e:
        return jsonify({'error': str(e)})

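# Hitting /warmup once after deployment (illustrative: `curl http://localhost:5000/warmup`)
# front-loads the model downloads and the embedding pass, so the first /ask
# request does not pay that cost.
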

if __name__ == '__main__':
    port = int(os.environ.get('PORT', 5000))
    app.run(host='0.0.0.0', port=port)