"""
Enhanced Web Research Tool for GAIA Agent
Integrates with Exa API for advanced web search capabilities
"""
import os
import logging
import asyncio
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass
from datetime import datetime, timedelta
import json
import re
try:
    from exa_py import Exa
    EXA_AVAILABLE = True
except ImportError:
    EXA_AVAILABLE = False

try:
    import requests
    from bs4 import BeautifulSoup
    WEB_SCRAPING_AVAILABLE = True
except ImportError:
    WEB_SCRAPING_AVAILABLE = False

logger = logging.getLogger(__name__)
@dataclass
class SearchResult:
    """Structured search result with metadata."""
    title: str
    url: str
    content: str
    score: float
    source: str
    published_date: Optional[str] = None
    author: Optional[str] = None
    domain: str = ""

    def __post_init__(self):
        # Derive the domain from the URL when it was not provided explicitly.
        if self.url and not self.domain:
            try:
                from urllib.parse import urlparse
                self.domain = urlparse(self.url).netloc
            except Exception:
                self.domain = "unknown"

@dataclass
class SearchQuery:
    """Structured search query with parameters."""
    query: str
    query_type: str = "general"  # general, factual, biographical, historical, technical
    time_range: Optional[str] = None  # recent, year, month, week
    num_results: int = 10
    include_domains: Optional[List[str]] = None
    exclude_domains: Optional[List[str]] = None
    require_date: bool = False

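
# Illustrative construction of a SearchQuery (shown as a comment so nothing runs on
# import). The values are hypothetical and only demonstrate how the fields above
# combine into a single, domain-restricted, time-bounded query:
#
#   query = SearchQuery(
#       query="James Webb Space Telescope first images",
#       query_type="factual",
#       time_range="recent",            # roughly the last 30 days
#       num_results=5,
#       include_domains=["nasa.gov"],
#   )
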
class EnhancedWebSearchTool:
    """
    Enhanced web search tool with multiple search strategies and result ranking.

    Features:
    - Exa API integration for semantic search
    - Multi-source search aggregation
    - Result ranking and relevance scoring
    - Fallback search strategies
    - Content extraction and summarization
    """

    def __init__(self, exa_api_key: Optional[str] = None):
        """Initialize the enhanced web search tool."""
        self.exa_api_key = exa_api_key or os.getenv("EXA_API_KEY")
        self.exa_client = None

        if self.exa_api_key and EXA_AVAILABLE:
            try:
                self.exa_client = Exa(api_key=self.exa_api_key)
                logger.info("✅ Exa API client initialized successfully")
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize Exa client: {e}")
        else:
            logger.warning("⚠️ Exa API not available - check API key and dependencies")

        # Initialize fallback search capabilities
        self.fallback_available = WEB_SCRAPING_AVAILABLE

        # Search result cache for efficiency
        self._cache = {}
        self._cache_ttl = 3600  # 1 hour cache

    def search(self, query: Union[str, SearchQuery], **kwargs) -> List[SearchResult]:
        """
        Perform enhanced web search with multiple strategies.

        Args:
            query: Search query string or SearchQuery object
            **kwargs: Additional search parameters

        Returns:
            List of SearchResult objects ranked by relevance
        """
        # Convert string query to SearchQuery object
        if isinstance(query, str):
            search_query = SearchQuery(
                query=query,
                query_type=kwargs.get('query_type', 'general'),
                time_range=kwargs.get('time_range'),
                num_results=kwargs.get('num_results', 10),
                include_domains=kwargs.get('include_domains'),
                exclude_domains=kwargs.get('exclude_domains'),
                require_date=kwargs.get('require_date', False)
            )
        else:
            search_query = query

        logger.info(f"🔍 Searching: {search_query.query}")

        # Check cache first
        cache_key = self._get_cache_key(search_query)
        if cache_key in self._cache:
            cache_entry = self._cache[cache_key]
            if datetime.now() - cache_entry['timestamp'] < timedelta(seconds=self._cache_ttl):
                logger.info("📋 Returning cached results")
                return cache_entry['results']

        results = []

        # Primary search: Exa API
        if self.exa_client:
            try:
                exa_results = self._search_with_exa(search_query)
                results.extend(exa_results)
                logger.info(f"✅ Exa search returned {len(exa_results)} results")
            except Exception as e:
                logger.warning(f"⚠️ Exa search failed: {e}")

        # Fallback search strategies
        if len(results) < search_query.num_results // 2:
            try:
                fallback_results = self._fallback_search(search_query)
                results.extend(fallback_results)
                logger.info(f"✅ Fallback search returned {len(fallback_results)} results")
            except Exception as e:
                logger.warning(f"⚠️ Fallback search failed: {e}")

        # Rank and filter results
        ranked_results = self._rank_results(results, search_query)

        # Cache results
        self._cache[cache_key] = {
            'results': ranked_results,
            'timestamp': datetime.now()
        }

        logger.info(f"🎯 Returning {len(ranked_results)} ranked results")
        return ranked_results

    def _search_with_exa(self, search_query: SearchQuery) -> List[SearchResult]:
        """Search using Exa API with advanced parameters."""
        if not self.exa_client:
            return []

        try:
            # Configure Exa search parameters
            search_params = {
                'query': search_query.query,
                'num_results': min(search_query.num_results, 20),
                'include_domains': search_query.include_domains,
                'exclude_domains': search_query.exclude_domains,
                'use_autoprompt': True,  # Let Exa optimize the query
                'type': 'neural'  # Use neural search for better semantic matching
            }

            # Add time filtering if specified
            # ('recent' and 'month' both map to the last 30 days)
            if search_query.time_range:
                if search_query.time_range == 'recent':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=30)).isoformat()
                elif search_query.time_range == 'year':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=365)).isoformat()
                elif search_query.time_range == 'month':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=30)).isoformat()
                elif search_query.time_range == 'week':
                    search_params['start_published_date'] = (datetime.now() - timedelta(days=7)).isoformat()

            # Perform search
            response = self.exa_client.search_and_contents(**search_params)

            results = []
            for item in response.results:
                try:
                    result = SearchResult(
                        title=item.title or "No title",
                        url=item.url,
                        content=item.text or "",
                        score=item.score if hasattr(item, 'score') else 0.5,
                        source="exa",
                        published_date=item.published_date if hasattr(item, 'published_date') else None,
                        author=item.author if hasattr(item, 'author') else None
                    )
                    results.append(result)
                except Exception as e:
                    logger.warning(f"⚠️ Error processing Exa result: {e}")
                    continue

            return results

        except Exception as e:
            logger.error(f"❌ Exa search error: {e}")
            return []

    def _fallback_search(self, search_query: SearchQuery) -> List[SearchResult]:
        """Fallback search using DuckDuckGo or other methods."""
        if not WEB_SCRAPING_AVAILABLE:
            return []

        try:
            # Use DuckDuckGo search as fallback
            from duckduckgo_search import DDGS

            results = []
            with DDGS() as ddgs:
                search_results = ddgs.text(
                    search_query.query,
                    max_results=min(search_query.num_results, 10)
                )

                for item in search_results:
                    try:
                        result = SearchResult(
                            title=item.get('title', 'No title'),
                            url=item.get('href', ''),
                            content=item.get('body', ''),
                            score=0.3,  # Lower score for fallback results
                            source="duckduckgo"
                        )
                        results.append(result)
                    except Exception as e:
                        logger.warning(f"⚠️ Error processing DDG result: {e}")
                        continue

            return results

        except Exception as e:
            logger.warning(f"⚠️ Fallback search error: {e}")
            return []

    def _rank_results(self, results: List[SearchResult], search_query: SearchQuery) -> List[SearchResult]:
        """Rank search results by relevance and quality."""
        if not results:
            return []

        # Calculate relevance scores
        for result in results:
            relevance_score = self._calculate_relevance(result, search_query)
            quality_score = self._calculate_quality(result)
            # Combine scores (weighted average)
            result.score = (relevance_score * 0.7) + (quality_score * 0.3)

        # Sort by score (descending)
        ranked_results = sorted(results, key=lambda x: x.score, reverse=True)

        # Remove duplicates based on URL
        seen_urls = set()
        unique_results = []
        for result in ranked_results:
            if result.url not in seen_urls:
                seen_urls.add(result.url)
                unique_results.append(result)

        # Return top results
        return unique_results[:search_query.num_results]

    def _calculate_relevance(self, result: SearchResult, search_query: SearchQuery) -> float:
        """Calculate relevance score based on query matching."""
        query_terms = search_query.query.lower().split()
        title_lower = result.title.lower()
        content_lower = result.content.lower()

        # Count term matches in title (higher weight)
        title_matches = sum(1 for term in query_terms if term in title_lower)
        title_score = title_matches / len(query_terms) if query_terms else 0

        # Count term matches in content
        content_matches = sum(1 for term in query_terms if term in content_lower)
        content_score = content_matches / len(query_terms) if query_terms else 0

        # Combine scores
        relevance = (title_score * 0.6) + (content_score * 0.4)

        # Boost for exact phrase matches
        if search_query.query.lower() in title_lower:
            relevance += 0.3
        elif search_query.query.lower() in content_lower:
            relevance += 0.2

        return min(relevance, 1.0)

    def _calculate_quality(self, result: SearchResult) -> float:
        """Calculate quality score based on source and content characteristics."""
        quality = 0.5  # Base score

        # Domain reputation boost
        trusted_domains = [
            'wikipedia.org', 'britannica.com', 'reuters.com', 'bbc.com',
            'cnn.com', 'nytimes.com', 'washingtonpost.com', 'theguardian.com',
            'nature.com', 'science.org', 'arxiv.org', 'pubmed.ncbi.nlm.nih.gov'
        ]
        if any(domain in result.domain for domain in trusted_domains):
            quality += 0.3

        # Content length boost (longer content often more informative);
        # check the larger threshold first so the bigger boost is reachable
        if len(result.content) > 1000:
            quality += 0.2
        elif len(result.content) > 500:
            quality += 0.1

        # Published date boost (recent content)
        if result.published_date:
            try:
                pub_date = datetime.fromisoformat(result.published_date.replace('Z', '+00:00'))
                days_old = (datetime.now() - pub_date.replace(tzinfo=None)).days
                if days_old < 30:
                    quality += 0.1
                elif days_old < 365:
                    quality += 0.05
            except (ValueError, TypeError):
                pass

        # Source boost
        if result.source == "exa":
            quality += 0.1

        return min(quality, 1.0)

    def _get_cache_key(self, search_query: SearchQuery) -> str:
        """Generate cache key for search query."""
        key_data = {
            'query': search_query.query,
            'type': search_query.query_type,
            'time_range': search_query.time_range,
            'num_results': search_query.num_results
        }
        return str(hash(json.dumps(key_data, sort_keys=True)))

    def extract_content(self, url: str) -> Optional[str]:
        """Extract clean content from a URL."""
        if not WEB_SCRAPING_AVAILABLE:
            return None

        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # Remove script and style elements
            for script in soup(["script", "style"]):
                script.decompose()

            # Get text content
            text = soup.get_text()

            # Clean up text
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
            text = ' '.join(chunk for chunk in chunks if chunk)

            return text[:5000]  # Limit content length

        except Exception as e:
            logger.warning(f"⚠️ Content extraction failed for {url}: {e}")
            return None

    def search_for_factual_answer(self, question: str) -> Optional[str]:
        """
        Search for a specific factual answer to a question.

        Args:
            question: The factual question to answer

        Returns:
            The most likely answer or None if not found
        """
        # Create targeted search query
        search_query = SearchQuery(
            query=question,
            query_type="factual",
            num_results=5,
            require_date=False
        )

        results = self.search(search_query)
        if not results:
            return None

        # Extract potential answers from top results
        answers = []
        for result in results[:3]:  # Check top 3 results
            content = result.content
            if content:
                # Look for direct answers in the content
                answer = self._extract_answer_from_content(content, question)
                if answer:
                    answers.append(answer)

        # Return the first answer found (answers are ordered by result rank)
        if answers:
            return answers[0]
        return None

    def _extract_answer_from_content(self, content: str, question: str) -> Optional[str]:
        """Extract a direct answer from content based on the question."""
        # This is a simplified answer extraction;
        # a production system would use more sophisticated NLP
        sentences = content.split('.')
        question_lower = question.lower()

        # Look for sentences that might contain the answer
        for sentence in sentences:
            sentence = sentence.strip()
            if 10 < len(sentence) < 200:
                # Check if sentence is relevant to the question
                if any(word in sentence.lower() for word in question_lower.split() if len(word) > 3):
                    return sentence

        return None

    def get_search_suggestions(self, partial_query: str) -> List[str]:
        """Get search suggestions for a partial query."""
        # This would typically use a search suggestion API;
        # for now, return some basic templated suggestions
        suggestions = [
            f"{partial_query} definition",
            f"{partial_query} facts",
            f"{partial_query} history",
            f"{partial_query} recent news",
            f"what is {partial_query}"
        ]
        return suggestions[:5]
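

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the tool itself). It assumes
# EXA_API_KEY may be set in the environment; if it is not, the tool logs a
# warning and relies on the DuckDuckGo fallback. The example queries below are
# arbitrary placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    tool = EnhancedWebSearchTool()

    # Plain-string query; kwargs are converted into a SearchQuery internally.
    for r in tool.search("largest moon of Saturn", num_results=3):
        print(f"[{r.score:.2f}] {r.title} ({r.domain})")

    # Targeted factual lookup built on top of search().
    answer = tool.search_for_factual_answer("What is the largest moon of Saturn?")
    print(f"Answer: {answer}")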