Spaces:
Starting
Starting
import logging | |
import os | |
import asyncio | |
from langchain_core.tools import StructuredTool | |
from pydantic import BaseModel, Field | |
from typing import Optional, List | |
from duckduckgo_search import DDGS | |
from serpapi import GoogleSearch | |
logger = logging.getLogger(__name__) | |
class DuckDuckGoSearchInput(BaseModel):
    # Argument schema for the DuckDuckGo search tool. The field names match
    # the parameters of duckduckgo_search_func one-to-one.
    # Documented with comments rather than a class docstring so the schema
    # generated from this model stays exactly as it was.

    # Text handed to the search backend.
    query: str = Field(..., description="Search query")

    # The user's original question; used as the reference text when the
    # results are ranked with the embedder.
    original_query: str = Field(..., description="Original query for context")

    # Optional object exposing ``encode(..., convert_to_tensor=True)``;
    # when omitted the results are returned in backend order.
    embedder: Optional[object] = Field(
        default=None,
        description="SentenceTransformer embedder",
    )
def _ddg_text_snippets(query: str, max_results: int = 5) -> List[str]:
    """Run one blocking DuckDuckGo text search and return the result bodies."""
    with DDGS() as ddgs:
        hits = ddgs.text(query, max_results=max_results) or []
        # Skip hits without a usable body instead of failing the whole
        # search with a KeyError on one malformed entry.
        return [hit["body"] for hit in hits if hit.get("body")]


def _serpapi_snippets(query: str, api_key: str, max_results: int = 5) -> List[str]:
    """Run one blocking SerpAPI search and return the organic-result snippets."""
    search = GoogleSearch({"q": query, "api_key": api_key, "num": max_results})
    organic = search.get_dict().get("organic_results", [])
    return [hit["snippet"] for hit in organic if hit.get("snippet")]


async def _duckduckgo_with_retries(query: str, max_retries: int = 3) -> List[str]:
    """Search DuckDuckGo, retrying with backoff on rate limits; re-raise other errors."""
    loop = asyncio.get_running_loop()
    for attempt in range(max_retries):
        logger.info("DuckDuckGo search attempt %d for query: %s", attempt + 1, query)
        try:
            # The DDGS client is synchronous; run it in the default executor
            # so the event loop is not blocked during the HTTP round trip.
            return await loop.run_in_executor(None, _ddg_text_snippets, query)
        except Exception as exc:
            # Match on the exception class name as well as the message, and
            # ignore case, so differently worded rate-limit errors still retry.
            rate_limited = "ratelimit" in f"{type(exc).__name__} {exc}".lower()
            if not rate_limited or attempt == max_retries - 1:
                logger.error("DuckDuckGo search failed for query '%s': %s", query, exc)
                raise
            # Exponential backoff: 1s, then 2s with the default three attempts
            # (the last attempt raises instead of sleeping).
            wait_time = 2 ** attempt
            logger.warning("DuckDuckGo rate limit hit, retrying in %ds: %s", wait_time, exc)
            await asyncio.sleep(wait_time)
    return []


async def _serpapi_with_retries(query: str, max_retries: int = 3) -> List[str]:
    """Search SerpAPI with backoff retries; return [] if unconfigured or exhausted."""
    api_key = os.getenv("SERPAPI_API_KEY")
    if not api_key:
        logger.warning("SERPAPI_API_KEY not set, cannot use SerpAPI fallback")
        return []

    loop = asyncio.get_running_loop()
    for attempt in range(max_retries):
        logger.info("SerpAPI search attempt %d for query: %s", attempt + 1, query)
        try:
            # GoogleSearch.get_dict() is a blocking HTTP call; keep it off the loop.
            return await loop.run_in_executor(None, _serpapi_snippets, query, api_key)
        except Exception as exc:
            if attempt == max_retries - 1:
                logger.error("SerpAPI search failed for query '%s': %s", query, exc)
                break
            wait_time = 2 ** attempt
            logger.warning("SerpAPI search failed, retrying in %ds: %s", wait_time, exc)
            await asyncio.sleep(wait_time)
    return []


def _rank_by_similarity(results: List[str], original_query: str, embedder: object) -> List[str]:
    """Order snippets by cosine similarity to the original query, best first."""
    # Imported lazily so the module still loads when no embedder is ever used.
    from sentence_transformers import util

    query_embedding = embedder.encode(original_query, convert_to_tensor=True)
    result_embeddings = embedder.encode(results, convert_to_tensor=True)
    scores = util.cos_sim(query_embedding, result_embeddings)[0]
    return [results[i] for i in scores.argsort(descending=True).tolist()]


async def duckduckgo_search_func(query: str, original_query: str, embedder: Optional[object] = None) -> List[str]:
    """
    Perform a DuckDuckGo search with retries and fall back to SerpAPI if needed.

    Args:
        query (str): Search query.
        original_query (str): Original query for context.
        embedder (Optional[object]): SentenceTransformer for result filtering.

    Returns:
        List[str]: List of search result snippets (at most three).
    """
    # NOTE(review): the tool built from this function passes no explicit
    # description, so this docstring presumably doubles as the tool
    # description -- keep it short and put implementation notes in comments.
    logger.info("Executing DuckDuckGo search for query: %s", query)

    try:
        results = await _duckduckgo_with_retries(query)
    except Exception:
        # Already logged by the helper. A DuckDuckGo failure must lead to the
        # SerpAPI fallback rather than ending the search with no results.
        results = []

    if not results:
        logger.info("DuckDuckGo returned no results, falling back to SerpAPI for query: %s", query)
        try:
            results = await _serpapi_with_retries(query)
        except Exception as exc:
            # Preserve the contract that this function never raises.
            logger.error("Search failed for query '%s': %s", query, exc)
            return []

    if not results:
        return []

    if embedder is not None:
        try:
            return _rank_by_similarity(results, original_query, embedder)[:3]
        except Exception as exc:
            # Ranking is an optional refinement: keep the snippets we already
            # fetched instead of discarding them when it fails.
            logger.warning("Result ranking failed, returning unranked results: %s", exc)

    return results[:3]
# Expose the search coroutine as a LangChain structured tool; call arguments
# are described by DuckDuckGoSearchInput.
# NOTE(review): `func` (the synchronous entry point) is given the same async
# function as `coroutine`, so a synchronous call would presumably hand back an
# un-awaited coroutine instead of results -- confirm callers only use the
# async path before changing this.
duckduckgo_search_tool = StructuredTool.from_function(
    name="duckduckgo_search_tool",
    args_schema=DuckDuckGoSearchInput,
    func=duckduckgo_search_func,
    coroutine=duckduckgo_search_func,
)