# Captured from a Hugging Face Space page (Space status at capture time: Sleeping).
import re | |
import json | |
import requests | |
import traceback | |
import time | |
import os | |
import asyncio | |
from typing import Dict, Any, List, Optional, Tuple | |
from datetime import datetime, timedelta | |
from functools import lru_cache | |
from concurrent.futures import ThreadPoolExecutor | |
# Updated imports for pydantic | |
from pydantic import BaseModel, Field | |
# Updated imports for LangChain | |
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate | |
from langchain_core.output_parsers import JsonOutputParser | |
from langchain_ollama import OllamaLLM | |
from langchain.chains import LLMChain | |
from langchain.callbacks.manager import CallbackManager | |
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler | |
from langchain_huggingface.embeddings import HuggingFaceEmbeddings | |
# Enhanced HuggingFace imports for improved functionality | |
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification | |
import numpy as np | |
# FastAPI and async HTTP client imports | |
from fastapi import FastAPI, HTTPException, BackgroundTasks, Depends | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import JSONResponse | |
import aiohttp | |
import httpx | |
from starlette.requests import Request | |
from starlette.responses import Response | |
# Import endpoints documentation | |
from endpoints_documentation import endpoints_documentation | |
# Point the HuggingFace cache at a writable location and silence the symlink warning.
# NOTE(review): these are assigned after the transformers / langchain_huggingface
# imports above; if those libraries read HF_HOME at import time the value may
# not take effect - confirm, and move the assignments above the imports if so.
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"

# Global thread pool used to run blocking model/LLM calls off the event loop.
thread_pool = ThreadPoolExecutor(max_workers=4)

# Global async HTTP client; created lazily by HealthcareChatbot._initialize_http_client.
http_client = None

# Process-wide fixed-window rate limiting state (shared by all users).
RATE_LIMIT_PER_MINUTE = 60
rate_limit_counter = 0
rate_limit_reset_time = time.time()
class ChatMessage(BaseModel):
    """Data model for an incoming chat message."""

    # Caller-supplied identifiers; both are required.
    message_id: str = Field(..., description="Unique identifier for the message")
    user_id: str = Field(..., description="User identifier")
    message: str = Field(..., description="The user's message")
    # Defaults to the (naive, local) time at which the model instance is created.
    timestamp: datetime = Field(default_factory=datetime.now, description="When the message was sent")
    language: str = Field(default="english", description="Detected language of the message")
class ChatResponse(BaseModel):
    """Data model for a chatbot response."""

    response_id: str = Field(..., description="Unique identifier for the response")
    response_type: str = Field(..., description="Type of response: 'conversation' or 'api_action'")
    message: str = Field(..., description="The chatbot's response message")
    api_call_made: bool = Field(default=False, description="Whether an API call was made")
    # Raw backend payload; stays None for purely conversational replies.
    api_data: Optional[Dict[str, Any]] = Field(default=None, description="API response data if applicable")
    language: str = Field(default="english", description="Language of the response")
    # Defaults to the (naive, local) time at which the model instance is created.
    timestamp: datetime = Field(default_factory=datetime.now, description="When the response was generated")
class RouterResponse(BaseModel):
    """Data model describing the router chain's JSON decision."""

    intent: str = Field(..., description="Either 'API_ACTION' or 'CONVERSATION'")
    confidence: float = Field(..., description="Confidence score between 0.0 and 1.0")
    reasoning: str = Field(..., description="Explanation of the decision")
    # endpoint/method are only meaningful when intent is API_ACTION.
    endpoint: Optional[str] = Field(default=None, description="API endpoint if intent is API_ACTION")
    method: Optional[str] = Field(default=None, description="HTTP method if intent is API_ACTION")
    params: Dict[str, Any] = Field(default_factory=dict, description="Parameters for API call")
    missing_required: List[str] = Field(default_factory=list, description="Missing required parameters")
class HealthcareChatbot: | |
def __init__(self,
             base_url: str = 'https://8ac0-197-54-54-66.ngrok-free.app',
             user_id: str = '9e889485-3db4-4f70-a7a2-e219beae6578',
             model_name: str = "gemma3",
             ollama_base_url: str = "http://localhost:11434"):
    """Configure the chatbot and load every model, chain and client it needs.

    All parameters are optional and default to the values that were
    previously hard-coded, so ``HealthcareChatbot()`` behaves as before.

    Args:
        base_url: Root URL of the backend API that endpoints are appended to.
        user_id: Default user/patient id used when a caller supplies none.
        model_name: Name of the Ollama model to run.
        ollama_base_url: URL of the Ollama server.
    """
    self.endpoints_documentation = endpoints_documentation
    self.ollama_base_url = ollama_base_url
    self.model_name = model_name
    self.BASE_URL = base_url
    self.headers = {'Content-type': 'application/json'}
    self.user_id = user_id
    self.max_retries = 3
    self.retry_delay = 2

    # Conversation history keyed by user id; each value is a list of exchanges.
    self.conversation_sessions = {}
    self.max_history_length = 10

    # Order matters: the chains built in _initialize_parsers_and_chains use self.llm.
    self._initialize_language_tools()
    self._initialize_llm()
    self._initialize_parsers_and_chains()
    self._initialize_date_parser()

    # Shared async HTTP client used for backend calls.
    self._initialize_http_client()

    print("Healthcare Chatbot initialized successfully!")
    self._print_welcome_message()
def _initialize_http_client(self):
    """Create the shared module-level async HTTP client if it does not exist yet.

    The connection limits are passed to the transport itself: when an
    explicit ``transport`` is handed to ``httpx.AsyncClient`` the client's own
    ``limits`` argument is not applied to it, so the pool settings would
    otherwise be silently ignored.
    """
    global http_client
    if http_client is None:
        pool_limits = httpx.Limits(max_keepalive_connections=100, max_connections=1000)
        http_client = httpx.AsyncClient(
            timeout=30.0,
            # retries=3 retries failed connection attempts at the transport level.
            transport=httpx.AsyncHTTPTransport(retries=3, limits=pool_limits),
        )
async def _close_http_client(self):
    """Close the shared HTTP client (if any) and reset the module-level reference."""
    global http_client
    if http_client is not None:
        await http_client.aclose()
        # Reset so that _initialize_http_client can create a fresh client later.
        http_client = None
def _get_user_session(self, user_id: str) -> List[Dict]: | |
"""Get or create user conversation session""" | |
if user_id not in self.conversation_sessions: | |
self.conversation_sessions[user_id] = [] | |
return self.conversation_sessions[user_id] | |
async def _check_rate_limit(self) -> bool:
    """Count this request against the global per-minute limit.

    Returns:
        True if the request is allowed (and has been counted), False if the
        limit for the current 60-second window is already used up.
    """
    global rate_limit_counter, rate_limit_reset_time
    current_time = time.time()

    # Start a new window once a full minute has elapsed since the last reset.
    if current_time - rate_limit_reset_time >= 60:
        rate_limit_counter = 0
        rate_limit_reset_time = current_time

    # Reject without counting when the window is exhausted.
    if rate_limit_counter >= RATE_LIMIT_PER_MINUTE:
        return False

    rate_limit_counter += 1
    return True
def _print_welcome_message(self): | |
"""Print welcome message in both languages""" | |
print("\n" + "="*60) | |
print("๐ฅ HEALTHCARE CHATBOT READY") | |
print("="*60) | |
print("English: Hello! I'm your healthcare assistant. I can help you with:") | |
print("โข Booking and managing appointments") | |
print("โข Finding hospital information") | |
print("โข Viewing your medical records") | |
print("โข General healthcare questions") | |
print() | |
print("Arabic: ู ุฑุญุจุงู! ุฃูุง ู ุณุงุนุฏู ุงูุทุจู. ูู ูููู ู ุณุงุนุฏุชู ูู:") | |
print("โข ุญุฌุฒ ูุฅุฏุงุฑุฉ ุงูู ูุงุนูุฏ") | |
print("โข ุงูุนุซูุฑ ุนูู ู ุนููู ุงุช ุงูู ุณุชุดูู") | |
print("โข ุนุฑุถ ุณุฌูุงุชู ุงูุทุจูุฉ") | |
print("โข ุงูุฃุณุฆูุฉ ุงูุทุจูุฉ ุงูุนุงู ุฉ") | |
print("="*60) | |
print("Type 'quit' or 'ุฎุฑูุฌ' to exit\n") | |
def _initialize_language_tools(self): | |
"""Initialize language processing tools""" | |
try: | |
self.embeddings = HuggingFaceEmbeddings(model_name="intfloat/multilingual-e5-large") | |
self.language_classifier = pipeline( | |
"text-classification", | |
model="papluca/xlm-roberta-base-language-detection", | |
top_k=1 | |
) | |
self.sentiment_analyzer = pipeline( | |
"sentiment-analysis", | |
model="cardiffnlp/twitter-xlm-roberta-base-sentiment" | |
) | |
print("โ Language processing models loaded successfully") | |
except Exception as e: | |
print(f"โ Warning: Some language models failed to load: {e}") | |
self.language_classifier = None | |
self.sentiment_analyzer = None | |
def _initialize_date_parser(self): | |
"""Initialize date parsing model""" | |
try: | |
self.date_parser = pipeline( | |
"token-classification", | |
model="Jean-Baptiste/roberta-large-ner-english", | |
aggregation_strategy="simple" | |
) | |
except Exception as e: | |
print(f"โ Warning: Date parsing model failed to load: {e}") | |
self.date_parser = None | |
def _initialize_llm(self):
    """Create the Ollama LLM client used by all chains.

    Reads ``self.model_name`` and ``self.ollama_base_url``; generated tokens
    are streamed to stdout through the callback handler.
    """
    callbacks = [StreamingStdOutCallbackHandler()]
    self.llm = OllamaLLM(
        model=self.model_name,
        base_url=self.ollama_base_url,
        callbacks=callbacks,
        temperature=0.7,
        num_ctx=8192,
        top_p=0.9,
        request_timeout=60,
    )
def _initialize_parsers_and_chains(self):
    """Build the JSON parser, the three prompt templates and their LLM chains.

    Creates:
        self.router_chain: classifies intent and selects endpoint/params.
        self.conversation_chain: produces plain conversational replies.
        self.api_response_chain: turns backend API data into a user reply.

    Requires ``self.llm`` to be initialized first.
    """
    self.json_parser = JsonOutputParser(pydantic_object=RouterResponse)

    # UNIFIED ROUTER CHAIN - handles both intent classification AND API routing.
    # Literal braces of the JSON example are doubled ({{ }}) because the
    # template is rendered with str.format-style substitution.
    self.router_prompt_template = PromptTemplate(
        template="""
You are a routing system. Your job is simple:
1. Understand what the user wants
2. Handle any dates/times in their request with PRECISE calculations
3. Check if any endpoint can do what they want
4. If yes = API_ACTION, if no = CONVERSATION
## Available API Endpoints Documentation
{endpoints_documentation}
## User Query to Analyze
Query: "{user_query}"
Language: {detected_language}
Current Context:
- DateTime: {current_datetime}
- Timezone: {timezone}
- Current Day of Week: {current_day_name}
## Step-by-Step Analysis
**STEP 1: What does the user want?**
- If query is in Arabic, translate it to English first
- Identify the exact action or information the user is requesting
- Focus on understanding their underlying need, not just the words
**STEP 2: Handle Date/Time Processing with PRECISE Calculations**
IMPORTANT: Use the current datetime ({current_datetime}) and timezone ({timezone}) for ALL calculations.
### Current Date Reference Points:
- Today is: {current_datetime}
- Current day of week: {current_day_name}
- Current timezone: {timezone}
### Arabic Date/Time Expressions Processing:
**Basic Relative Dates:**
- "اليوم" (today) = {current_datetime} date portion
- "غدا" (tomorrow) = current date + 1 day
- "أمس" (yesterday) = current date - 1 day
- "بعد غد" (day after tomorrow) = current date + 2 days
**Weekly Expressions - CALCULATE PRECISELY:**
- "الأسبوع القادم" (next week) = current date + 7 days
- "الأسبوع الماضي" (last week) = current date - 7 days
**Specific Weekday Calculations - MOST IMPORTANT:**
For expressions like "يوم [weekday] القادم" (next [weekday]):
1. Identify the target weekday from Arabic names:
- الأحد (Sunday) = 0
- الاثنين (Monday) = 1
- الثلاثاء (Tuesday) = 2
- الأربعاء (Wednesday) = 3
- الخميس (Thursday) = 4
- الجمعة (Friday) = 5
- السبت (Saturday) = 6
2. Calculate days to add:
- Get current weekday number (0=Sunday, 1=Monday, etc.)
- Target weekday number
- If target > current: days_to_add = target - current
- If target <= current: days_to_add = 7 - (current - target)
- Final date = current_date + days_to_add
**Example Calculation:**
If today is Sunday (June 1, 2025) and user says "يوم الاربع القادم" (next Wednesday):
- Current weekday: 0 (Sunday)
- Target weekday: 3 (Wednesday)
- Days to add: 3 - 0 = 3
- Result: June 1 + 3 days = June 4, 2025
**Monthly/Yearly Expressions:**
- "الشهر القادم" (next month) = add 1 month to current date
- "الشهر الماضي" (last month) = subtract 1 month from current date
- "السنة القادمة" (next year) = add 1 year to current date
**Time Expressions:**
- "صباحًا" (morning/AM) = 09:00 if no specific time given
- "مساءً" (evening/PM) = 18:00 if no specific time given
- "ظهرًا" (noon) = 12:00
- "منتصف الليل" (midnight) = 00:00
- "بعد ساعتين" (in 2 hours) = current time + 2 hours
- "قبل ساعة" (1 hour ago) = current time - 1 hour
**Date Format Output:**
- Always convert final calculated date to ISO 8601 format: YYYY-MM-DDTHH:MM:SS
- Include timezone offset if available
- For date-only expressions, use 00:00:00 as default time
**STEP 3: Find matching endpoint**
- Read each endpoint description in the documentation
- Check if any endpoint's purpose can fulfill what the user wants
- Match based on functionality, not keywords
**STEP 4: Decision**
- Found matching endpoint = "API_ACTION"
- No matching endpoint = "CONVERSATION"
**STEP 5: Parameter Extraction (only if API_ACTION)**
- Extract parameter values from user query
- Use the CALCULATED dates/times from Step 2
- Convert all dates/times to ISO 8601 format (YYYY-MM-DDTHH:MM:SS)
- List any missing required parameters
- **CRITICAL: All parameters must be in English**
- Translate any Arabic text to English
- Convert names to English equivalents (e.g., "دكتور احمد" → "Dr. Ahmed")
- Use standard English terms for all parameters
## Output Format
{{
"intent": "CONVERSATION|API_ACTION",
"confidence": 0.8,
"reasoning": "User wants: [what user actually needs]. Date/time processing: [show exact calculation: current date + X days = final date]. Found endpoint: [endpoint path and why it matches] OR No endpoint matches this need",
"endpoint": "/exact/endpoint/path",
"method": "GET|POST|PUT|DELETE",
"params": {{
// ALL VALUES MUST BE IN ENGLISH
// Arabic terms must be translated to English equivalents
}},
"missing_required": [],
"calculated_datetime": "YYYY-MM-DDTHH:MM:SS (if date/time was processed)"
}}
## CRITICAL REMINDERS:
1. ALWAYS use the provided current_datetime ({current_datetime}) as your base for calculations
2. For "next weekday" expressions, calculate the exact number of days to add
3. Show your calculation work in the reasoning field
4. Double-check weekday numbers: Sunday=0, Monday=1, Tuesday=2, Wednesday=3, Thursday=4, Friday=5, Saturday=6
5. **ALL PARAMETERS MUST BE IN ENGLISH** - translate any Arabic text before output
**FINAL CHECK BEFORE OUTPUTTING:**
**MANDATORY LANGUAGE CHECK:**
1. Examine every value in the params object
2. If ANY value contains Arabic characters (ا-ي), you MUST:
- Translate it to English
- Convert names to English equivalents
- Replace Arabic terms with English counterparts
3. Only output JSON when ALL parameters are in English
Now analyze the user query step by step and give me the JSON response.
""",
        input_variables=["user_query", "detected_language", "extracted_keywords",
                         "sentiment_analysis", "endpoints_documentation", "current_datetime",
                         "timezone", "current_day_name"]
    )

    # CONVERSATION CHAIN - handles conversational responses.
    self.conversation_template = PromptTemplate(
        template="""
You are a friendly and professional healthcare chatbot assistant.
=== RESPONSE GUIDELINES ===
- Respond ONLY in {detected_language}
- Be helpful, empathetic, and professional
- Keep responses concise but informative
- Use appropriate medical terminology when needed
- Maintain a caring and supportive tone
=== CONTEXT ===
User Message: {user_query}
Language: {detected_language}
Sentiment: {sentiment_analysis}
Conversation History: {conversation_history}
=== LANGUAGE-SPECIFIC INSTRUCTIONS ===
FOR ARABIC RESPONSES:
- Use Modern Standard Arabic (الفصحى)
- Be respectful and formal as appropriate in Arabic culture
- Use proper Arabic medical terminology
- Keep sentences clear and grammatically correct
FOR ENGLISH RESPONSES:
- Use clear, professional English
- Be warm and approachable
- Use appropriate medical terminology
=== RESPONSE RULES ===
1. Address the user's question or comment directly
2. Provide helpful information when possible
3. If you cannot help with something specific, explain what you CAN help with
4. Never provide specific medical advice - always recommend consulting healthcare professionals
5. Be encouraging and supportive
6. Do NOT mix languages in your response
7. End responses naturally without asking multiple questions
Generate a helpful conversational response:""",
        input_variables=["user_query", "detected_language", "sentiment_analysis", "conversation_history"]
    )

    # API RESPONSE CHAIN - formats API responses for users.
    self.api_response_template = PromptTemplate(
        template="""
You are a professional healthcare assistant. Generate a natural language response to the user's query using ONLY the provided API data.
User Query: {user_query}
User Sentiment: {sentiment_analysis}
Response Language: {detected_language}
API Response Data:
{api_response}
=== CORE INSTRUCTIONS ===
1. Analyze the API response structure and extract relevant data points
2. Cross-reference with the user's query to determine what information to include
3. Respond in {detected_language} using a warm, conversational tone
4. Convert technical data into natural language appropriate for healthcare communication
=== DATE/TIME HANDLING ===
1. Identify all date/time fields in the API response (look for ISO 8601 format: YYYY-MM-DDTHH:MM:SS)
2. For English responses:
- Format dates as "Month Day, Year at HH:MM AM/PM"
- Convert times to 12-hour format with proper AM/PM
3. For Arabic responses:
- Format dates as "Day Month Year الساعة HH:MM صباحاً/مساءً"
- Use Arabic numerals (٠١٢٣٤٥٦٧٨٩)
- Use Arabic month names
4. Preserve all original date/time values - only change the formatting
=== RESPONSE GUIDELINES ===
1. Use ONLY data present in the API response
2. Maintain a professional yet friendly healthcare tone
3. Adapt to the user's sentiment:
- Positive: reinforce with encouraging language
- Neutral: provide clear, factual information
- Negative: show empathy and offer assistance
4. Structure the response to directly answer the user's query
5. Include relevant details from the API response that address the user's needs
=== CRITICAL RULES ===
1. Never invent or hallucinate information not present in the API response
2. If the API response doesn't contain requested information, say so politely
3. All dates/times must exactly match the API data
4. Maintain strict language consistency (respond only in {detected_language})
5. Format all technical data (IDs, codes, etc.) for easy understanding
Generate a helpful response that addresses the user's query using the API data.
""",
        input_variables=["user_query", "api_response", "detected_language", "sentiment_analysis"]
    )

    # Create the 3 chains.
    self.router_chain = LLMChain(llm=self.llm, prompt=self.router_prompt_template)
    self.conversation_chain = LLMChain(llm=self.llm, prompt=self.conversation_template)
    self.api_response_chain = LLMChain(llm=self.llm, prompt=self.api_response_template)
def detect_language(self, text): | |
"""Detect language of the input text""" | |
if self.language_classifier and len(text.strip()) > 3: | |
try: | |
result = self.language_classifier(text) | |
detected_lang = result[0][0]['label'] | |
confidence = result[0][0]['score'] | |
if detected_lang in ['ar', 'arabic']: | |
return "arabic" | |
elif detected_lang in ['en', 'english']: | |
return "english" | |
elif confidence > 0.8: | |
return "english" # Default to English for unsupported languages | |
except: | |
pass | |
# Fallback: Basic Arabic detection | |
arabic_pattern = re.compile(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF]+') | |
if arabic_pattern.search(text): | |
return "arabic" | |
return "english" | |
def analyze_sentiment(self, text): | |
"""Analyze sentiment of the text""" | |
if self.sentiment_analyzer and len(text.strip()) > 3: | |
try: | |
result = self.sentiment_analyzer(text) | |
return { | |
"sentiment": result[0]['label'], | |
"score": result[0]['score'] | |
} | |
except: | |
pass | |
return {"sentiment": "NEUTRAL", "score": 0.5} | |
def extract_keywords(self, text): | |
"""Extract keywords from text""" | |
# Simple keyword extraction | |
words = re.findall(r'\b\w+\b', text.lower()) | |
# Filter out common words and keep meaningful ones | |
stopwords = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were'} | |
keywords = [w for w in words if len(w) > 3 and w not in stopwords] | |
return list(set(keywords))[:5] # Return top 5 unique keywords | |
def get_conversation_context(self, user_id: str) -> str: | |
"""Get recent conversation history as context""" | |
history = self._get_user_session(user_id) | |
if not history: | |
return "No previous conversation" | |
context = [] | |
for item in history[-3:]: # Last 3 exchanges | |
context.append(f"User: {item['user_message']}") | |
context.append(f"Bot: {item['bot_response'][:100]}...") # Truncate long responses | |
return " | ".join(context) | |
def add_to_history(self, user_id: str, user_message: str, bot_response: str, response_type: str): | |
"""Add exchange to conversation history""" | |
history = self._get_user_session(user_id) | |
history.append({ | |
'timestamp': datetime.now(), | |
'user_message': user_message, | |
'bot_response': bot_response, | |
'response_type': response_type | |
}) | |
# Keep only recent history | |
if len(history) > self.max_history_length: | |
self.conversation_sessions[user_id] = history[-self.max_history_length:] | |
def parse_relative_date(self, text, detected_language): | |
"""Parse relative dates from text using a combination of methods""" | |
today = datetime.now() | |
# Handle common relative date patterns in English and Arabic | |
tomorrow_patterns = { | |
'english': [r'\btomorrow\b', r'\bnext day\b'], | |
'arabic': [r'\bุบุฏุง\b', r'\bุจูุฑุฉ\b', r'\bุบุฏูุง\b', r'\bุงูุบุฏ\b'] | |
} | |
next_week_patterns = { | |
'english': [r'\bnext week\b'], | |
'arabic': [r'\bุงูุฃุณุจูุน ุงููุงุฏู \b', r'\bุงูุฃุณุจูุน ุงูู ูุจู\b', r'\bุงูุงุณุจูุน ุงูุฌุงู\b'] | |
} | |
# Check for "tomorrow" patterns | |
for pattern in tomorrow_patterns.get(detected_language, []) + tomorrow_patterns.get('english', []): | |
if re.search(pattern, text, re.IGNORECASE): | |
return (today + timedelta(days=1)).strftime('%Y-%m-%dT%H:%M:%S') | |
# Check for "next week" patterns | |
for pattern in next_week_patterns.get(detected_language, []) + next_week_patterns.get('english', []): | |
if re.search(pattern, text, re.IGNORECASE): | |
return (today + timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%S') | |
# If NER model is available, use it to extract date entities | |
if self.date_parser and detected_language == 'english': | |
try: | |
date_entities = self.date_parser(text) | |
for entity in date_entities: | |
if entity['entity_group'] == 'DATE': | |
print(f"Found date entity: {entity['word']}") | |
# Default to tomorrow if we detect any date | |
return (today + timedelta(days=1)).strftime('%Y-%m-%dT%H:%M:%S') | |
except Exception as e: | |
print(f"Error in date parsing: {e}") | |
# Default return None if no date pattern is recognized | |
return None | |
def parse_router_response(self, router_text): | |
"""Parse the router chain response into structured data""" | |
try: | |
# Clean the response text | |
cleaned_response = router_text | |
# Remove any comments (both single-line and multi-line) | |
cleaned_response = re.sub(r'//.*?$', '', cleaned_response, flags=re.MULTILINE) | |
cleaned_response = re.sub(r'/\*.*?\*/', '', cleaned_response, flags=re.DOTALL) | |
# Remove any trailing commas | |
cleaned_response = re.sub(r',(\s*[}\]])', r'\1', cleaned_response) | |
# Try different methods to parse the JSON response | |
try: | |
# First attempt: direct JSON parsing of cleaned response | |
parsed_response = json.loads(cleaned_response) | |
except json.JSONDecodeError: | |
try: | |
# Second attempt: extract JSON from markdown code block | |
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', cleaned_response, re.DOTALL) | |
if json_match: | |
parsed_response = json.loads(json_match.group(1)) | |
else: | |
raise ValueError("No JSON found in code block") | |
except (json.JSONDecodeError, ValueError): | |
try: | |
# Third attempt: find JSON-like content using regex | |
json_pattern = r'\{\s*"intent"\s*:.*?\}' | |
json_match = re.search(json_pattern, cleaned_response, re.DOTALL) | |
if json_match: | |
json_str = json_match.group(0) | |
# Additional cleaning for the extracted JSON | |
json_str = re.sub(r'//.*?$', '', json_str, flags=re.MULTILINE) | |
json_str = re.sub(r',(\s*[}\]])', r'\1', json_str) | |
parsed_response = json.loads(json_str) | |
else: | |
raise ValueError("Could not extract JSON using regex") | |
except (json.JSONDecodeError, ValueError): | |
print(f"Failed to parse JSON. Raw response: {router_text}") | |
print(f"Cleaned response: {cleaned_response}") | |
# Return default conversation response on parse failure | |
return { | |
"intent": "CONVERSATION", | |
"confidence": 0.5, | |
"reasoning": "Failed to parse router response - defaulting to conversation", | |
"endpoint": None, | |
"method": None, | |
"params": {}, | |
"missing_required": [] | |
} | |
# Validate required fields and set defaults | |
validated_response = { | |
"intent": parsed_response.get("intent", "CONVERSATION"), | |
"confidence": parsed_response.get("confidence", 0.5), | |
"reasoning": parsed_response.get("reasoning", "Router decision"), | |
"endpoint": parsed_response.get("endpoint"), | |
"method": parsed_response.get("method"), | |
"params": parsed_response.get("params", {}), | |
"missing_required": parsed_response.get("missing_required", []) | |
} | |
return validated_response | |
except Exception as e: | |
print(f"Error parsing router response: {e}") | |
return { | |
"intent": "CONVERSATION", | |
"confidence": 0.5, | |
"reasoning": f"Parse error: {str(e)}", | |
"endpoint": None, | |
"method": None, | |
"params": {}, | |
"missing_required": [] | |
} | |
def handle_conversation(self, user_query, detected_language, sentiment_result): | |
"""Handle conversational responses""" | |
try: | |
result = self.conversation_chain.invoke({ | |
"user_query": user_query, | |
"detected_language": detected_language, | |
"sentiment_analysis": json.dumps(sentiment_result), | |
"conversation_history": self.get_conversation_context(self.user_id) | |
}) | |
return result["text"].strip() | |
except Exception as e: | |
# Fallback response | |
if detected_language == "arabic": | |
return "ุฃุนุชุฐุฑุ ูุงุฌูุช ู ุดููุฉ ูู ุงูู ุนุงูุฌุฉ. ููู ูู ูููู ู ุณุงุนุฏุชูุ" | |
else: | |
return "I apologize, I encountered a processing issue. How can I help you?" | |
async def backend_call(self, data: Dict[str, Any]) -> Dict[str, Any]: | |
"""Make async API call to backend with retry logic""" | |
endpoint_url = data.get('endpoint') | |
endpoint_method = data.get('method') | |
endpoint_params = data.get('params', {}).copy() | |
print(f"๐ Making API call to {endpoint_method} {self.BASE_URL + endpoint_url} with params: {endpoint_params}") | |
# Inject patient_id if needed | |
if 'patient_id' in endpoint_params: | |
endpoint_params['patient_id'] = self.user_id | |
retries = 0 | |
while retries < self.max_retries: | |
try: | |
if endpoint_method.upper() == 'GET': | |
response = await http_client.get( | |
self.BASE_URL + endpoint_url, | |
params=endpoint_params, | |
headers=self.headers | |
) | |
else: | |
response = await http_client.request( | |
endpoint_method.upper(), | |
self.BASE_URL + endpoint_url, | |
json=endpoint_params, | |
headers=self.headers | |
) | |
response.raise_for_status() | |
return response.json() | |
except httpx.HTTPError as e: | |
retries += 1 | |
if retries >= self.max_retries: | |
return { | |
"error": "Backend API call failed after multiple retries", | |
"details": str(e), | |
"status_code": getattr(e.response, 'status_code', None) if hasattr(e, 'response') else None | |
} | |
await asyncio.sleep(self.retry_delay) | |
async def handle_api_action(self, user_query: str, detected_language: str,
                            sentiment_result: Dict, keywords: List[str],
                            router_data: Dict) -> Dict[str, Any]:
    """Execute the routed backend call and phrase the result for the user.

    Args:
        user_query: The user's message.
        detected_language: ``"english"`` or ``"arabic"``.
        sentiment_result: Sentiment dict; serialized to JSON for the prompt.
        keywords: Extracted keywords (currently unused by this method).
        router_data: Routing dict from parse_router_response. Its ``params``
            entry is updated in place with the patient id.

    Returns:
        ``{"response", "api_data", "routing_info"}``. On any failure the
        response is a fixed apology in the detected language, ``api_data``
        holds the error text and ``routing_info`` is None.
    """
    try:
        # The patient id is always set server-side, whether or not the model
        # supplied one.
        params = router_data.get('params')
        if not isinstance(params, dict):
            params = {}
            router_data['params'] = params
        params['patient_id'] = self.user_id
        print(f"Final API call data: {router_data}")

        # Make backend API call
        api_response = await self.backend_call(router_data)
        print("API response received:", api_response)

        # The chain call is blocking, so run it in the thread pool to keep
        # the event loop responsive.
        loop = asyncio.get_running_loop()
        user_response_result = await loop.run_in_executor(
            thread_pool,
            lambda: self.api_response_chain.invoke({
                "user_query": user_query,
                "api_response": json.dumps(api_response, indent=2),
                "detected_language": detected_language,
                "sentiment_analysis": json.dumps(sentiment_result),
            })
        )
        user_response = user_response_result["text"].strip()
        print("Final user response:", user_response)
        return {
            "response": user_response,
            "api_data": api_response,
            "routing_info": router_data
        }
    except Exception as e:
        # Fallback error response in the user's language.
        if detected_language == "arabic":
            error_msg = "أعتذر، لم أتمكن من معالجة طلبك. يرجى المحاولة مرة أخرى أو صياغة السؤال بطريقة مختلفة."
        else:
            error_msg = "I apologize, I couldn't process your request. Please try again or rephrase your question."
        return {
            "response": error_msg,
            "api_data": {"error": str(e)},
            "routing_info": None
        }
async def chat(self, user_message: str, user_id: str = None) -> ChatResponse: | |
"""Main chat method that handles user messages with async support""" | |
start_time = time.time() | |
# Use provided user_id or default | |
user_id = user_id or self.user_id | |
# Check rate limiting | |
if not await self._check_rate_limit(): | |
return ChatResponse( | |
response_id=str(time.time()), | |
response_type="conversation", | |
message="I'm currently processing too many requests. Please try again in a moment.", | |
api_call_made=False, | |
language="english" | |
) | |
# Check for exit commands | |
if user_message.lower().strip() in ['quit', 'exit', 'ุฎุฑูุฌ', 'bye', 'goodbye']: | |
if self.detect_language(user_message) == "arabic": | |
return ChatResponse( | |
response_id=str(time.time()), | |
response_type="conversation", | |
message="ู ุน ุงูุณูุงู ุฉ! ุฃุชู ูู ูู ููู ุงู ุณุนูุฏุงู. ๐", | |
language="arabic" | |
) | |
else: | |
return ChatResponse( | |
response_id=str(time.time()), | |
response_type="conversation", | |
message="Goodbye! Have a great day! ๐", | |
language="english" | |
) | |
try: | |
print(f"\n{'='*50}") | |
print(f"๐ Processing: '{user_message}'") | |
print(f"{'='*50}") | |
# Step 1: Language and sentiment analysis (CPU-bound operations in thread pool) | |
loop = asyncio.get_event_loop() | |
detected_language = await loop.run_in_executor( | |
thread_pool, self.detect_language, user_message | |
) | |
sentiment_result = await loop.run_in_executor( | |
thread_pool, self.analyze_sentiment, user_message | |
) | |
keywords = await loop.run_in_executor( | |
thread_pool, self.extract_keywords, user_message | |
) | |
print(f"๐ Detected Language: {detected_language}") | |
print(f"๐ Sentiment: {sentiment_result}") | |
print(f"๐ Keywords: {keywords}") | |
# Step 2: Router Chain (CPU-bound LLM operation in thread pool) | |
print(f"\n๐ค Running Router Chain...") | |
router_result = await loop.run_in_executor( | |
thread_pool, | |
lambda: self.router_chain.invoke({ | |
"user_query": user_message, | |
"detected_language": detected_language, | |
"extracted_keywords": json.dumps(keywords), | |
"sentiment_analysis": json.dumps(sentiment_result), | |
"conversation_history": self.get_conversation_context(user_id), | |
"endpoints_documentation": json.dumps(self.endpoints_documentation, indent=2), | |
"current_datetime": datetime.now().strftime('%Y-%m-%dT%H:%M:%S'), | |
"timezone": "UTC", | |
"current_day_name": datetime.now().strftime('%A'), | |
}) | |
) | |
# Parse router response | |
router_data = await loop.run_in_executor( | |
thread_pool, self.parse_router_response, router_result["text"] | |
) | |
print(f"๐ฏ Router Decision: {router_data}") | |
# Step 3: Handle based on intent | |
if router_data["intent"] == "CONVERSATION" and router_data['endpoint'] == '': | |
print(f"\n๐ฌ Handling as CONVERSATION") | |
response_text = await loop.run_in_executor( | |
thread_pool, | |
lambda: self.handle_conversation(user_message, detected_language, sentiment_result) | |
) | |
# Add to conversation history | |
self.add_to_history(user_id, user_message, response_text, "conversation") | |
return ChatResponse( | |
response_id=str(time.time()), | |
response_type="conversation", | |
message=response_text, | |
api_call_made=False, | |
language=detected_language, | |
api_data=None | |
) | |
elif router_data["intent"] == "API_ACTION": | |
print(f"\n๐ Handling as API_ACTION") | |
# Handle API action | |
api_result = await self.handle_api_action( | |
user_message, detected_language, sentiment_result, keywords, router_data | |
) | |
# Add to conversation history | |
self.add_to_history(user_id, user_message, api_result["response"], "api_action") | |
return ChatResponse( | |
response_id=str(time.time()), | |
response_type="api_action", | |
message=api_result["response"], | |
api_call_made=True, | |
language=detected_language | |
) | |
else: | |
# Fallback for unknown intent | |
print(f"โ ๏ธ Unknown intent: {router_data['intent']}") | |
fallback_response = await loop.run_in_executor( | |
thread_pool, | |
lambda: self.handle_conversation(user_message, detected_language, sentiment_result) | |
) | |
return ChatResponse( | |
response_id=str(time.time()), | |
response_type="conversation", | |
message=fallback_response, | |
api_call_made=False, | |
language=detected_language | |
) | |
except Exception as e: | |
print(f"โ Error in chat method: {str(e)}") | |
print(f"โ Traceback: {traceback.format_exc()}") | |
# Fallback error response | |
if self.detect_language(user_message) == "arabic": | |
error_message = "ุฃุนุชุฐุฑุ ุญุฏุซ ุฎุทุฃ ูู ู ุนุงูุฌุฉ ุฑุณุงูุชู. ูุฑุฌู ุงูู ุญุงููุฉ ู ุฑุฉ ุฃุฎุฑู." | |
else: | |
error_message = "I apologize, there was an error processing your message. Please try again." | |
return ChatResponse( | |
response_id=str(time.time()), | |
response_type="conversation", | |
message=error_message, | |
api_call_made=False, | |
language=self.detect_language(user_message) | |
) | |
finally: | |
end_time = time.time() | |
print(f"โฑ๏ธ Processing time: {end_time - start_time:.2f} seconds") | |
async def run_interactive_chat(self):
    """Read messages from stdin in a loop and print the bot's replies.

    Blank input is ignored. The loop ends after an exit word has been
    answered, or on Ctrl-C / end of input. Any other error raised while
    handling a turn is printed and the loop carries on.
    """
    exit_words = ['quit', 'exit', 'ุฎุฑูุฌ', 'bye', 'goodbye']
    try:
        while True:
            try:
                typed = input("\n๐ค You: ").strip()
                if typed:
                    # Let chat() answer first so exit words still get
                    # their farewell message before the loop stops.
                    reply = await self.chat(typed)
                    print(f"\n๐ค Bot: {reply.message}")
                    if typed.lower() in exit_words:
                        break
            except KeyboardInterrupt:
                print("\n\n๐ Chat interrupted. Goodbye!")
                break
            except EOFError:
                print("\n\n๐ Chat ended. Goodbye!")
                break
            except Exception as e:
                print(f"\nโ Error: {e}")
    except Exception as e:
        print(f"โ Fatal error in chat interface: {e}")
def clear_history(self):
    """Replace the stored conversation history with a new empty list."""
    # NOTE(review): chat() records turns through add_to_history(user_id, ...);
    # confirm that a plain list is the container those helpers expect.
    self.conversation_history = list()
    print("๐๏ธ Conversation history cleared.")
def main():
    """Start the chatbot and run its interactive console session.

    ``run_interactive_chat`` is a coroutine function, so it is driven with
    ``asyncio.run``; calling it bare only creates a coroutine object and
    the chat loop would never execute.
    Ctrl-C is reported as a graceful shutdown; any other exception is
    printed together with its traceback.
    """
    try:
        print("๐ Starting Healthcare Chatbot...")
        chatbot = HealthcareChatbot()
        asyncio.run(chatbot.run_interactive_chat())
    except KeyboardInterrupt:
        print("\n\n๐ Shutting down gracefully...")
    except Exception as e:
        print(f"โ Fatal error: {e}")
        print(f"โ Traceback: {traceback.format_exc()}")


# NOTE(review): this guard runs before the FastAPI objects further down the
# module are defined, so a script run starts the console chat first.
if __name__ == "__main__":
    main()
from fastapi import FastAPI, HTTPException | |
from pydantic import BaseModel | |
from typing import Dict, Any, Optional | |
# FastAPI application object that the HTTP handlers below belong to.
app = FastAPI(
    version="1.0.0",
    title="Healthcare AI Assistant",
    description="An AI-powered healthcare assistant that handles appointment booking and queries",
)

# CORS: every origin, method and header is accepted, with credentials enabled.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended outside development.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)

# Single chatbot instance shared by all request handlers in this module.
agent = HealthcareChatbot()
# Request body accepted by process_query.
class QueryRequest(BaseModel):
    # Text of the user's message.
    query: str
    # Caller-supplied user id; process_query forwards it to agent.chat,
    # which substitutes its own default when this is None.
    user_id: Optional[str] = None
# NOTE(review): no route decorator is visible on this handler in this part of
# the file - confirm it is registered on `app`.
async def process_query(request: QueryRequest):
    """
    Run the request's query through the shared agent and return the reply as a dict.

    Any exception raised while producing or serialising the reply is
    reported to the client as an HTTP 500 carrying the exception text.
    """
    try:
        reply = await agent.chat(request.query, request.user_id)
        payload = reply.dict()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return payload
# NOTE(review): no route decorator is visible on this handler in this part of
# the file - confirm it is registered on `app`.
async def health_check():
    """
    Report a fixed "healthy" status together with the service name.
    """
    return dict(status="healthy", service="healthcare-ai-assistant")
# NOTE(review): no route decorator is visible on this handler in this part of
# the file - confirm it is registered on `app`.
async def root():
    # Fixed greeting payload.
    return dict(message="Hello World")
# NOTE(review): no lifecycle decorator is visible on this hook in this part of
# the file - confirm it is registered on `app`.
async def startup_event():
    """Prepare the shared agent's HTTP client when the application starts."""
    # NOTE(review): the call is not awaited, unlike _close_http_client in the
    # shutdown hook - confirm _initialize_http_client is a plain function.
    agent._initialize_http_client()
# NOTE(review): no lifecycle decorator is visible on this hook in this part of
# the file - confirm it is registered on `app`.
async def shutdown_event():
    """Release the agent's HTTP client, then stop the shared thread pool."""
    await agent._close_http_client()
    # wait=True blocks until work already submitted to the pool has finished.
    thread_pool.shutdown(wait=True)
if __name__ == "__main__":
    import uvicorn

    # uvicorn only accepts workers > 1 when the application is given as an
    # import string; with an app object it logs a warning and exits instead
    # of serving. Run the in-process app with uvicorn's default single
    # worker so the server actually starts.
    # NOTE(review): to scale out, launch via the CLI with an import string,
    # e.g. `uvicorn <module>:app --workers 4`.
    uvicorn.run(app, host="0.0.0.0", port=8000)