import json
import os
import re
import time
import urllib.parse

import gradio as gr
import requests
import torch
from bs4 import BeautifulSoup
from markdown import markdown
from transformers import AutoModelForCausalLM, AutoTokenizer

# Set environment variables
os.environ["TOKENIZERS_PARALLELISM"] = "false"

print("Loading model... Please wait...")

# Start from an explicit "no model" state so the rest of the module can test
# `model is None` instead of probing globals() when every load attempt fails.
model = None
tokenizer = None

# Load the model with proper error handling
try:
    # Try with Phi-2
    MODEL_ID = "microsoft/phi-2"
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_ID,
        trust_remote_code=True
    )
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.float16,
        device_map="auto",
        trust_remote_code=True
    )
    print("Successfully loaded Phi-2 model")
except Exception as e:
    print(f"Error loading Phi-2: {e}")
    print("Trying fallback model...")
    try:
        # Fallback to FLAN-T5-base
        MODEL_ID = "google/flan-t5-base"
        tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
        from transformers import T5ForConditionalGeneration
        model = T5ForConditionalGeneration.from_pretrained(
            MODEL_ID,
            torch_dtype=torch.float16,
            device_map="auto"
        )
        print("Successfully loaded fallback model")
    except Exception as fallback_error:
        # A tokenizer may have loaded even though its model did not; reset
        # both so generate_response() takes the model-free path.
        model = None
        tokenizer = None
        print(f"Error loading fallback model: {fallback_error}")
        print("Operating in reduced functionality mode")


def _search_wikipedia(query, max_results):
    """Return up to ``max_results`` Wikipedia hits with intro-extract snippets.

    Best effort: any failure is printed and whatever was collected so far is
    returned. A failure on one page no longer discards the remaining pages.
    """
    hits = []
    try:
        wiki_url = f"https://en.wikipedia.org/w/api.php?action=opensearch&search={urllib.parse.quote(query)}&limit={max_results}&namespace=0&format=json"
        response = requests.get(wiki_url, timeout=5)
        if response.status_code != 200:
            return hits

        data = response.json()
        # The code reads element 1 as page titles and element 3 as page URLs.
        titles = data[1]
        urls = data[3]

        for title, page_link in zip(titles, urls):
            try:
                # Get summary for each page
                page_url = f"https://en.wikipedia.org/w/api.php?action=query&prop=extracts&exintro&explaintext&titles={urllib.parse.quote(title)}&format=json"
                page_response = requests.get(page_url, timeout=5)
                if page_response.status_code != 200:
                    continue

                pages = page_response.json()['query']['pages']
                page_id = next(iter(pages))
                if page_id == "-1":
                    continue

                extract = pages[page_id].get('extract', '')
                snippet = extract[:200] + "..." if len(extract) > 200 else extract
                hits.append({
                    'title': f"Wikipedia - {title}",
                    'url': page_link,
                    'snippet': snippet
                })
            except Exception as page_error:  # best effort per page
                print(f"Error extracting wiki data: {page_error}")
    except Exception as search_error:  # best effort for the whole backend
        print(f"Wikipedia search error: {search_error}")
    return hits


def _search_serpapi(query, limit):
    """Return up to ``limit`` organic results from the SerpAPI demo endpoint."""
    hits = []
    try:
        serpapi_url = f"https://serpapi.com/search.json?engine=google&q={urllib.parse.quote(query)}&api_key=demo"
        response = requests.get(serpapi_url, timeout=5)
        if response.status_code == 200:
            data = response.json()
            for result in data.get("organic_results", [])[:limit]:
                hits.append({
                    'title': result.get('title', ''),
                    'url': result.get('link', ''),
                    'snippet': result.get('snippet', '')
                })
    except Exception as search_error:  # best effort
        print(f"SerpAPI error: {search_error}")
    return hits


def _scrape_bing(query, limit):
    """Return up to ``limit`` results scraped from a Bing results page."""
    hits = []
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        search_url = f"https://www.bing.com/search?q={urllib.parse.quote(query)}"
        response = requests.get(search_url, headers=headers, timeout=10)
        if response.status_code != 200:
            return hits

        soup = BeautifulSoup(response.text, 'html.parser')
        for item in soup.find_all('li', class_='b_algo')[:limit]:
            title_elem = item.find('h2')
            link_elem = title_elem.find('a') if title_elem else None
            # Skip entries without a usable link instead of letting a missing
            # href abort the whole scrape.
            if link_elem is None or not link_elem.get('href'):
                continue

            caption = item.find('div', class_='b_caption')
            paragraph = caption.find('p') if caption else None
            hits.append({
                'title': title_elem.text,
                'url': link_elem['href'],
                'snippet': paragraph.text if paragraph else ""
            })
    except Exception as scrape_error:  # best effort
        print(f"Web scraping error: {scrape_error}")
    return hits


def search_web(query, max_results=5):
    """Perform real web searches using multiple search endpoints.

    Backends are tried in order (Wikipedia API, SerpAPI demo endpoint, Bing
    page scraping); each later backend only runs while fewer than
    ``max_results`` results have been collected.

    Args:
        query: Search text.
        max_results: Maximum number of results to return.

    Returns:
        A list of at most ``max_results`` dicts with the keys ``'title'``,
        ``'url'`` and ``'snippet'``. If every backend comes back empty, a
        single placeholder entry linking to a Google search is used so that
        callers always have something to display.
    """
    results = _search_wikipedia(query, max_results)

    if len(results) < max_results:
        results.extend(_search_serpapi(query, max_results - len(results)))

    if len(results) < max_results:
        results.extend(_scrape_bing(query, max_results - len(results)))

    # If we still don't have results, create minimal placeholder results.
    # This ensures the UI doesn't break if all search methods fail.
    if not results:
        results = [
            {
                'title': f"Search: {query}",
                'url': f"https://www.google.com/search?q={urllib.parse.quote(query)}",
                'snippet': "Search engine results for your query."
            }
        ]

    return results[:max_results]


def _extract_query(prompt):
    """Pull the user's question out of a prompt.

    Looks for text after ``Query:`` first, then after ``question:`` (any
    capitalisation), up to the end of that line. Falls back to the first line
    of the prompt.
    """
    for pattern, flags in ((r"Query:[ \t]*(.*)", 0),
                           (r"question:[ \t]*(.*)", re.IGNORECASE)):
        match = re.search(pattern, prompt, flags)
        if match:
            return match.group(1).strip()
    return prompt.split("\n")[0].strip()


def _offline_response(prompt):
    """Build an answer without a model from the sources embedded in ``prompt``.

    Recognises ``Source N: <title>`` lines followed by ``Content: <snippet>``
    lines. Returns an empty string when the prompt contains none, so callers
    can apply their own short-answer fallback.
    """
    sources = re.findall(r"Source (\d+): (.*)\nContent: (.*)", prompt)
    if not sources:
        return ""

    query = _extract_query(prompt)
    response = f"Based on the search results for '{query}', I can provide the following information:\n\n"

    # Add a section for each of the first three sources
    for number, title, content in sources[:3]:
        title = title.replace("Wikipedia - ", "").strip()
        response += f"**{title}**: {content.strip()} [{number}]\n\n"

    # Add a conclusion
    response += f"These sources provide information about {query} from different perspectives. For more detailed information, you can explore the full sources listed below."
    return response


def _generate_seq2seq(prompt, max_new_tokens):
    """Generate with a T5-style model; retry with beam search if too short."""
    simple_prompt = prompt
    if len(simple_prompt) > 512:
        # Keep only the question and the trailing instruction paragraph.
        instruction_part = prompt.split("\n\n")[-1]
        simple_prompt = f"Query: {_extract_query(prompt)}\n\n{instruction_part}"

    inputs = tokenizer(simple_prompt, return_tensors="pt", truncation=True, max_length=512).to(model.device)

    with torch.no_grad():
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=max_new_tokens,
            temperature=0.8,
            do_sample=True,
            top_k=50,
            repetition_penalty=1.2
        )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)

        # If response is too short, try again with different parameters
        if len(response) < 50:
            outputs = model.generate(
                inputs.input_ids,
                attention_mask=inputs.attention_mask,
                max_new_tokens=max_new_tokens,
                num_beams=4,
                temperature=1.0,
                do_sample=False
            )
            response = tokenizer.decode(outputs[0], skip_special_tokens=True)

    return response


def _generate_causal(prompt, max_new_tokens):
    """Generate with a decoder-only model (Phi and others)."""
    query = _extract_query(prompt)

    # Accept both "Search Results:" and "Search Results Summary:" as the
    # start of the source listing.
    search_results_text = ""
    marker = re.search(r"Search Results(?: Summary)?:", prompt)
    if marker:
        search_results_text = prompt[marker.end():].split("Based on")[0].strip()

    # Create a simpler prompt format for better results
    simple_prompt = f"Answer this question based on these search results:\n\nQuestion: {query}\n\nSearch Results: {search_results_text[:500]}...\n\nAnswer:"

    # Adjust format based on model
    if "phi" in MODEL_ID.lower():
        formatted_prompt = f"Instruct: {simple_prompt}\nOutput:"
    else:
        formatted_prompt = simple_prompt

    inputs = tokenizer(formatted_prompt, return_tensors="pt", truncation=True, max_length=512).to(model.device)
    prompt_length = inputs.input_ids.size(1)

    with torch.no_grad():
        outputs = model.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_new_tokens=max_new_tokens,
            temperature=0.85,
            top_p=0.92,
            top_k=50,
            do_sample=True,
            pad_token_id=tokenizer.eos_token_id
        )
        # Decode only the tokens generated after the prompt.
        response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

        # Check if response is empty or too short
        if not response or len(response) < 20:
            print("First generation attempt failed, trying alternative method")
            outputs = model.generate(
                inputs.input_ids,
                attention_mask=inputs.attention_mask,
                max_new_tokens=max_new_tokens,
                num_beams=3,  # Use beam search
                temperature=1.0,
                do_sample=False,  # Deterministic generation
                repetition_penalty=1.2,
                pad_token_id=tokenizer.eos_token_id
            )
            response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True).strip()

    # If still no good response, use a minimal reliable response
    if not response or len(response) < 20:
        print("Second generation attempt failed, using fallback response")
        if query:
            base_response = f"Based on the search results, I can provide information about {query}. "
            base_response += "The sources contain relevant details about this topic. "
            base_response += "You can refer to them for more in-depth information."
            return base_response
        return "Based on the search results, I can provide information related to your query. Please check the sources for more details."

    return response


def generate_response(prompt, max_new_tokens=256):
    """Generate response using the AI model with robust fallbacks.

    Args:
        prompt: Full prompt text. A ``Query:`` line and
            ``Source N:`` / ``Content:`` pairs are recognised when present.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The generated text. When no model is loaded, an answer is assembled
        from the sources listed in ``prompt``, or ``""`` if it lists none.
        If generation raises, a generic fallback sentence is returned instead
        of propagating the error.
    """
    # Check if model is loaded properly
    if model is None or tokenizer is None:
        print("Model not available for generation")
        return _offline_response(prompt)

    try:
        if "t5" in MODEL_ID.lower():
            return _generate_seq2seq(prompt, max_new_tokens)
        return _generate_causal(prompt, max_new_tokens)
    except Exception as e:
        print(f"Error in generate_response: {e}")
        # Return a guaranteed fallback response
        return "Based on the search results, I found information related to your query. The sources listed below contain more detailed information about this topic."
# Whole-word match for interrogatives, so that e.g. "shows" or "somewhat" is
# not mistaken for a question word.
_QUESTION_WORD_RE = re.compile(r"\b(?:what|how|why|when|where|who)\b", re.IGNORECASE)


def parse_related_topics(text, query):
    """Extract related topics from generated text with better fallbacks.

    Args:
        text: Raw model output, one candidate topic per line. Leading list
            markers (digits, ``-``, ``*``, ``•``, ``.``) are stripped.
        query: The user's query, used to build default topics.

    Returns:
        A list of at most three topic strings. Lines of ten characters or
        fewer are ignored. A line containing a question word gets a trailing
        ``?`` if it lacks one. When fewer than three topics are found, generic
        questions about ``query`` fill the remainder.
    """
    topics = []

    for line in (text or "").split('\n'):
        # Clean up line from numbers and symbols
        clean_line = re.sub(r'^[\d\-\*\•\.\s]+', '', line.strip())
        if len(clean_line) <= 10:
            continue

        if _QUESTION_WORD_RE.search(clean_line) and not clean_line.endswith('?'):
            # Drop trailing punctuation first so "…blue." becomes "…blue?"
            # rather than "…blue.?".
            clean_line = clean_line.rstrip(' .!:;,') + '?'
        topics.append(clean_line)

    # If we don't have enough topics, generate some based on the query
    if len(topics) < 3:
        base_queries = [
            f"What is the history of {query}?",
            f"How does {query} work?",
            f"What are the latest developments in {query}?",
            f"What are common applications of {query}?",
            f"How is {query} used today?"
        ]
        # Add base queries until we have at least 3
        for bq in base_queries:
            if len(topics) >= 3:
                break
            if not any(bq.lower() in t.lower() for t in topics):
                topics.append(bq)

    return topics[:3]  # Return top 3 topics


def ensure_citations(text, search_results):
    """Ensure citations are properly added to the text.

    Args:
        text: Generated answer.
        search_results: List of dicts; the ``'snippet'`` value of each is
            matched against ``text``. Sources are numbered from 1.

    Returns:
        * A generic apology when ``text`` is empty or shorter than 10
          characters after stripping.
        * ``text`` unchanged when it already contains a ``[n]`` marker.
        * Otherwise ``text`` with ``[i]`` inserted after the first occurrence
          of each snippet sentence (longer than 15 characters) found in it,
          or with ``" [1]"`` appended when nothing matched.
    """
    # If text is too short, return a generic message
    if not text or len(text.strip()) < 10:
        return "I couldn't generate a proper response for this query. Please try a different search term."

    # Citations already present: nothing to do.
    if re.search(r'\[\d+\]', text):
        return text

    # Try to find snippets in the answer
    for i, result in enumerate(search_results, 1):
        snippet = result.get('snippet') or ''
        for raw_phrase in snippet.split('.'):
            # Strip before both the test and the replacement; searching for
            # the stripped phrase but replacing the unstripped one silently
            # skipped matches that lacked the snippet's leading space.
            phrase = raw_phrase.strip()
            if len(phrase) > 15 and phrase in text:
                text = text.replace(phrase, f"{phrase} [{i}]", 1)

    # If still no citations, add a generic one at the end
    if not re.search(r'\[\d+\]', text):
        text += " [1]"

    return text


def process_query(query):
    """Main function to process a query with robust response generation.

    Searches the web, builds a prompt from the results, generates an answer,
    adds citations and produces related topics.

    Args:
        query: The user's question.

    Returns:
        A dict with the keys ``"answer"`` (str), ``"sources"`` (the list
        returned by ``search_web``) and ``"related_topics"`` (list of str).
        On any error a minimal result with the same keys is returned.
    """
    # Bound before the try so the error path can tell whether the search
    # itself completed.
    search_results = None
    try:
        # Step 1: Search the web for real results
        search_results = search_web(query, max_results=5)

        # Step 2: Create context from search results - shorter and more focused
        context_parts = [f"Query: {query}\n\n", "Search Results Summary:\n\n"]
        for i, result in enumerate(search_results, 1):
            # Use shorter context to avoid token limits
            context_parts.append(f"Source {i}: {result['title']}\n")
            context_parts.append(f"Content: {result['snippet'][:150]}\n\n")
        context = "".join(context_parts)

        # Step 3: Create a simpler prompt for the AI model
        prompt = (
            f"Answer this question based on the search results: {query} {context} "
            "Provide a clear answer using information from these sources. "
            "Include citations like [1], [2] to reference sources."
        )

        # Step 4: Generate answer using the improved generation function
        answer = generate_response(prompt, max_new_tokens=384)

        # Step 5: Ensure we have some answer content
        if not answer or len(answer.strip()) < 30:
            print("Fallback to generic response")
            answer = f"Based on the search results for '{query}', I found relevant information in the sources listed below. They provide details about this topic that you may find useful."

        # Step 6: Ensure citations
        answer = ensure_citations(answer, search_results)

        # Step 7: Generate related topics
        # Kept in its own try so a failure here does not discard the answer.
        try:
            related_prompt = f"Generate 3 questions related to: {query}"
            related_raw = generate_response(related_prompt, max_new_tokens=150)
            related_topics = parse_related_topics(related_raw, query)
        except Exception as e:
            print(f"Error generating related topics: {e}")
            # Fallback topics
            related_topics = [
                f"What is the history of {query}?",
                f"How does {query} work?",
                f"What are applications of {query}?"
            ]

        # Return the complete result
        return {
            "answer": answer,
            "sources": search_results,
            "related_topics": related_topics
        }

    except Exception as e:
        print(f"Error in process_query: {e}")

        if search_results is None:
            # The search itself failed; retry small, but never let the error
            # handler raise.
            try:
                search_results = search_web(query, max_results=2)
            except Exception as search_error:
                print(f"Error in fallback search: {search_error}")
                search_results = []

        # Return a minimal result that won't break the UI
        return {
            "answer": f"I found information about '{query}' in the sources below. They provide details about this topic that may be helpful.",
            "sources": search_results,
            "related_topics": [f"What is {query}?", f"History of {query}", f"How to use {query}"]
        }


def format_sources(sources):
    """Format sources for display.

    Returns ``""`` for an empty or missing list; otherwise one template
    string is appended per source.

    NOTE(review): as visible here the per-source template is a bare newline
    and uses neither ``i`` nor ``source`` — the original markup appears to
    have been lost from this copy of the file. Reproduced unchanged; confirm
    against the full source.
    """
    if not sources:
        return ""

    html = ""
    for i, source in enumerate(sources, 1):
        html += f"""
"""
    return html


# NOTE(review): this view of the file is cut off inside format_related().
# The visible fragment is preserved verbatim below as a comment (it ends in
# an unterminated string and cannot be parsed as code); restore the full
# definition from the complete file.
#
# def format_related(topics):
#     """Format related topics for display with reliable click handlers"""
#     if not topics:
#         return ""
#     # Create HTML with unique IDs for each topic
#     html = "Get comprehensive answers with real sources for any question.