"""
Enhanced Web Research Tool for GAIA Agent
Integrates with Exa API for advanced web search capabilities
"""
import os
import logging
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
try:
    from exa_py import Exa
    EXA_AVAILABLE = True
except ImportError:
    EXA_AVAILABLE = False

try:
    import requests
    from bs4 import BeautifulSoup
    WEB_SCRAPING_AVAILABLE = True
except ImportError:
    WEB_SCRAPING_AVAILABLE = False

logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
    """Structured search result with metadata."""
    title: str
    url: str
    content: str
    score: float
    source: str
    published_date: Optional[str] = None
    author: Optional[str] = None
    domain: str = ""

    def __post_init__(self):
        if self.url and not self.domain:
            try:
                from urllib.parse import urlparse
                self.domain = urlparse(self.url).netloc
            except Exception:
                self.domain = "unknown"

@dataclass
class SearchQuery:
    """Structured search query with parameters."""
    query: str
    query_type: str = "general"  # general, factual, biographical, historical, technical
    time_range: Optional[str] = None  # recent, year, month, week
    num_results: int = 10
    include_domains: Optional[List[str]] = None
    exclude_domains: Optional[List[str]] = None
    require_date: bool = False

class EnhancedWebSearchTool:
    """
    Enhanced web search tool with multiple search strategies and result ranking.

    Features:
    - Exa API integration for semantic search
    - Multi-source search aggregation
    - Result ranking and relevance scoring
    - Fallback search strategies
    - Content extraction and summarization
    """

    def __init__(self, exa_api_key: Optional[str] = None):
        """Initialize the enhanced web search tool."""
        self.exa_api_key = exa_api_key or os.getenv("EXA_API_KEY")
        self.exa_client = None

        if self.exa_api_key and EXA_AVAILABLE:
            try:
                self.exa_client = Exa(api_key=self.exa_api_key)
                logger.info("✅ Exa API client initialized successfully")
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize Exa client: {e}")
        else:
            logger.warning("⚠️ Exa API not available - check API key and dependencies")

        # Initialize fallback search capabilities
        self.fallback_available = WEB_SCRAPING_AVAILABLE

        # Search result cache for efficiency
        self._cache = {}
        self._cache_ttl = 3600  # 1 hour cache

    def search(self, query: Union[str, SearchQuery], **kwargs) -> List[SearchResult]:
        """
        Perform enhanced web search with multiple strategies.

        Args:
            query: Search query string or SearchQuery object
            **kwargs: Additional search parameters

        Returns:
            List of SearchResult objects ranked by relevance
        """
        # Convert string query to SearchQuery object
        if isinstance(query, str):
            search_query = SearchQuery(
                query=query,
                query_type=kwargs.get('query_type', 'general'),
                time_range=kwargs.get('time_range'),
                num_results=kwargs.get('num_results', 10),
                include_domains=kwargs.get('include_domains'),
                exclude_domains=kwargs.get('exclude_domains'),
                require_date=kwargs.get('require_date', False)
            )
        else:
            search_query = query

        logger.info(f"🔍 Searching: {search_query.query}")

        # Check cache first
        cache_key = self._get_cache_key(search_query)
        if cache_key in self._cache:
            cache_entry = self._cache[cache_key]
            if datetime.now() - cache_entry['timestamp'] < timedelta(seconds=self._cache_ttl):
                logger.info("📋 Returning cached results")
                return cache_entry['results']

        results = []

        # Primary search: Exa API
        if self.exa_client:
            try:
                exa_results = self._search_with_exa(search_query)
                results.extend(exa_results)
                logger.info(f"✅ Exa search returned {len(exa_results)} results")
            except Exception as e:
                logger.warning(f"⚠️ Exa search failed: {e}")

        # Fallback search strategies
        if len(results) < search_query.num_results // 2:
            try:
                fallback_results = self._fallback_search(search_query)
                results.extend(fallback_results)
                logger.info(f"✅ Fallback search returned {len(fallback_results)} results")
            except Exception as e:
                logger.warning(f"⚠️ Fallback search failed: {e}")

        # Rank and filter results
        ranked_results = self._rank_results(results, search_query)

        # Cache results
        self._cache[cache_key] = {
            'results': ranked_results,
            'timestamp': datetime.now()
        }

        logger.info(f"🎯 Returning {len(ranked_results)} ranked results")
        return ranked_results

    def _search_with_exa(self, search_query: SearchQuery) -> List[SearchResult]:
        """Search using Exa API with advanced parameters."""
        if not self.exa_client:
            return []

        try:
            # Configure Exa search parameters
            search_params = {
                'query': search_query.query,
                'num_results': min(search_query.num_results, 20),
                'include_domains': search_query.include_domains,
                'exclude_domains': search_query.exclude_domains,
                'use_autoprompt': True,  # Let Exa optimize the query
                'type': 'neural'  # Use neural search for better semantic matching
            }

            # Add time filtering if specified
            if search_query.time_range:
                if search_query.time_range == 'recent':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=30)).isoformat()
                elif search_query.time_range == 'year':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=365)).isoformat()
                elif search_query.time_range == 'month':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=30)).isoformat()
                elif search_query.time_range == 'week':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=7)).isoformat()

            # Perform search
            response = self.exa_client.search_and_contents(**search_params)

            results = []
            for item in response.results:
                try:
                    result = SearchResult(
                        title=item.title or "No title",
                        url=item.url,
                        content=item.text or "",
                        score=item.score if hasattr(item, 'score') else 0.5,
                        source="exa",
                        published_date=item.published_date if hasattr(item, 'published_date') else None,
                        author=item.author if hasattr(item, 'author') else None
                    )
                    results.append(result)
                except Exception as e:
                    logger.warning(f"⚠️ Error processing Exa result: {e}")
                    continue

            return results
        except Exception as e:
            logger.error(f"❌ Exa search error: {e}")
            return []

    def _fallback_search(self, search_query: SearchQuery) -> List[SearchResult]:
        """Fallback search using DuckDuckGo or other methods."""
        if not WEB_SCRAPING_AVAILABLE:
            return []

        try:
            # Use DuckDuckGo search as fallback
            from duckduckgo_search import DDGS

            results = []
            with DDGS() as ddgs:
                search_results = ddgs.text(
                    search_query.query,
                    max_results=min(search_query.num_results, 10)
                )
                for item in search_results:
                    try:
                        result = SearchResult(
                            title=item.get('title', 'No title'),
                            url=item.get('href', ''),
                            content=item.get('body', ''),
                            score=0.3,  # Lower score for fallback results
                            source="duckduckgo"
                        )
                        results.append(result)
                    except Exception as e:
                        logger.warning(f"⚠️ Error processing DDG result: {e}")
                        continue

            return results
        except Exception as e:
            logger.warning(f"⚠️ Fallback search error: {e}")
            return []

    def _rank_results(self, results: List[SearchResult], search_query: SearchQuery) -> List[SearchResult]:
        """Rank search results by relevance and quality."""
        if not results:
            return []

        # Calculate relevance scores
        for result in results:
            relevance_score = self._calculate_relevance(result, search_query)
            quality_score = self._calculate_quality(result)
            # Combine scores (weighted average)
            result.score = (relevance_score * 0.7) + (quality_score * 0.3)

        # Sort by score (descending)
        ranked_results = sorted(results, key=lambda x: x.score, reverse=True)

        # Remove duplicates based on URL
        seen_urls = set()
        unique_results = []
        for result in ranked_results:
            if result.url not in seen_urls:
                seen_urls.add(result.url)
                unique_results.append(result)

        # Return top results
        return unique_results[:search_query.num_results]

    def _calculate_relevance(self, result: SearchResult, search_query: SearchQuery) -> float:
        """Calculate relevance score based on query matching."""
        query_terms = search_query.query.lower().split()
        title_lower = result.title.lower()
        content_lower = result.content.lower()

        # Count term matches in title (higher weight)
        title_matches = sum(1 for term in query_terms if term in title_lower)
        title_score = title_matches / len(query_terms) if query_terms else 0

        # Count term matches in content
        content_matches = sum(1 for term in query_terms if term in content_lower)
        content_score = content_matches / len(query_terms) if query_terms else 0

        # Combine scores
        relevance = (title_score * 0.6) + (content_score * 0.4)

        # Boost for exact phrase matches
        if search_query.query.lower() in title_lower:
            relevance += 0.3
        elif search_query.query.lower() in content_lower:
            relevance += 0.2

        return min(relevance, 1.0)

    def _calculate_quality(self, result: SearchResult) -> float:
        """Calculate quality score based on source and content characteristics."""
        quality = 0.5  # Base score

        # Domain reputation boost
        trusted_domains = [
            'wikipedia.org', 'britannica.com', 'reuters.com', 'bbc.com',
            'cnn.com', 'nytimes.com', 'washingtonpost.com', 'theguardian.com',
            'nature.com', 'science.org', 'arxiv.org', 'pubmed.ncbi.nlm.nih.gov'
        ]
        if any(domain in result.domain for domain in trusted_domains):
            quality += 0.3

        # Content length boost (longer content is often more informative);
        # check the larger threshold first so the bigger boost is reachable
        if len(result.content) > 1000:
            quality += 0.2
        elif len(result.content) > 500:
            quality += 0.1

        # Published date boost (recent content)
        if result.published_date:
            try:
                pub_date = datetime.fromisoformat(result.published_date.replace('Z', '+00:00'))
                days_old = (datetime.now() - pub_date.replace(tzinfo=None)).days
                if days_old < 30:
                    quality += 0.1
                elif days_old < 365:
                    quality += 0.05
            except Exception:
                pass

        # Source boost
        if result.source == "exa":
            quality += 0.1

        return min(quality, 1.0)

    def _get_cache_key(self, search_query: SearchQuery) -> str:
        """Generate cache key for search query."""
        key_data = {
            'query': search_query.query,
            'type': search_query.query_type,
            'time_range': search_query.time_range,
            'num_results': search_query.num_results
        }
        return str(hash(json.dumps(key_data, sort_keys=True)))

    def extract_content(self, url: str) -> Optional[str]:
        """Extract clean content from a URL."""
        if not WEB_SCRAPING_AVAILABLE:
            return None

        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get text content
            text = soup.get_text()

            # Clean up text
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
            text = ' '.join(chunk for chunk in chunks if chunk)

            return text[:5000]  # Limit content length
        except Exception as e:
            logger.warning(f"⚠️ Content extraction failed for {url}: {e}")
            return None

    def search_for_factual_answer(self, question: str) -> Optional[str]:
        """
        Search for a specific factual answer to a question.

        Args:
            question: The factual question to answer

        Returns:
            The most likely answer or None if not found
        """
        # Create targeted search query
        search_query = SearchQuery(
            query=question,
            query_type="factual",
            num_results=5,
            require_date=False
        )

        results = self.search(search_query)
        if not results:
            return None

        # Extract potential answers from top results
        answers = []
        for result in results[:3]:  # Check top 3 results
            content = result.content
            if content:
                # Look for direct answers in the content
                answer = self._extract_answer_from_content(content, question)
                if answer:
                    answers.append(answer)

        # Return the first answer found (from the highest-ranked result)
        if answers:
            return answers[0]
        return None

    def _extract_answer_from_content(self, content: str, question: str) -> Optional[str]:
        """Extract a direct answer from content based on the question."""
        # This is a simplified answer extraction.
        # In a production system, you'd use more sophisticated NLP.
        sentences = content.split('.')
        question_lower = question.lower()

        # Look for sentences that might contain the answer
        for sentence in sentences:
            sentence = sentence.strip()
            if 10 < len(sentence) < 200:
                # Check if the sentence is relevant to the question
                if any(word in sentence.lower() for word in question_lower.split() if len(word) > 3):
                    return sentence

        return None

    def get_search_suggestions(self, partial_query: str) -> List[str]:
        """Get search suggestions for a partial query."""
        # This would typically use a search suggestion API.
        # For now, return some basic suggestions.
        suggestions = [
            f"{partial_query} definition",
            f"{partial_query} facts",
            f"{partial_query} history",
            f"{partial_query} recent news",
            f"what is {partial_query}"
        ]
        return suggestions[:5]
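
# --- Usage sketch (illustrative only, not part of the original tool) ---
# A minimal example of how the tool might be driven end to end. It assumes
# EXA_API_KEY may be unset, in which case the DuckDuckGo fallback path is
# exercised (requires the optional duckduckgo_search package). The query
# strings below are hypothetical placeholders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    tool = EnhancedWebSearchTool()

    # 1. Plain string query with keyword overrides
    for hit in tool.search("largest moon of Saturn", num_results=3):
        print(f"{hit.score:.2f}  {hit.domain}  {hit.title}")

    # 2. Structured query restricted to a trusted domain and the past year
    structured = SearchQuery(
        query="Titan atmosphere composition",
        query_type="factual",
        time_range="year",
        num_results=5,
        include_domains=["wikipedia.org"],
    )
    for hit in tool.search(structured):
        print(f"{hit.score:.2f}  {hit.url}")

    # 3. One-shot factual lookup returning a single candidate sentence
    print(tool.search_for_factual_answer("What is the largest moon of Saturn?"))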