Spaces:
Starting
Starting
File size: 4,324 Bytes
4701375 751d628 4701375 751d628 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
import logging
import os
import asyncio
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from typing import Optional, List
from duckduckgo_search import DDGS
from serpapi import GoogleSearch
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DuckDuckGoSearchInput(BaseModel):
    # Argument schema for the search tool (passed as ``args_schema`` when the
    # tool is built). Documented with comments rather than a class docstring
    # so that no extra description text ends up in the generated schema.

    # Required: the search query.
    query: str = Field(..., description="Search query")
    # Required: the original query, carried along for context.
    original_query: str = Field(..., description="Original query for context")
    # Optional: defaults to None when the caller does not supply an embedder.
    embedder: Optional[object] = Field(default=None, description="SentenceTransformer embedder")
async def duckduckgo_search_func(query: str, original_query: str, embedder: Optional[object] = None) -> List[str]:
    """
    Perform a DuckDuckGo search with retries and fall back to SerpAPI if needed.

    Args:
        query (str): Search query.
        original_query (str): Original query for context.
        embedder (Optional[object]): SentenceTransformer for result filtering.

    Returns:
        List[str]: List of search result snippets.
    """
    # NOTE(review): the docstring wording above is intentionally left as it
    # was. The tool registration in this file passes no explicit description,
    # so the docstring presumably doubles as the tool description shown to the
    # model -- confirm before rewording it.
    #
    # Behaviour:
    #   * DuckDuckGo is tried first (up to 3 attempts, backing off on rate
    #     limits). Any failure or empty result falls through to SerpAPI
    #     instead of aborting the whole search.
    #   * Blocking HTTP calls run in the default executor so the event loop
    #     is not stalled while waiting on the network.
    #   * At most 3 snippets are returned; when an embedder is given they are
    #     the 3 most similar to ``original_query``. A ranking failure degrades
    #     to the unranked snippets rather than dropping them.
    #   * The function never raises: unexpected errors are logged and [] is
    #     returned.
    loop = asyncio.get_running_loop()

    async def try_duckduckgo(query: str, max_retries: int = 3) -> List[str]:
        """Return DuckDuckGo snippets, or [] when the search cannot be completed."""

        def fetch() -> List[str]:
            # Synchronous client call; executed in a worker thread.
            with DDGS() as ddgs:
                return [hit["body"] for hit in ddgs.text(query, max_results=5) if hit.get("body")]

        for attempt in range(max_retries):
            try:
                logger.info(f"DuckDuckGo search attempt {attempt + 1} for query: {query}")
                return await loop.run_in_executor(None, fetch)
            except Exception as e:
                # Match on the exception class name as well as the message so
                # a rate-limit error is recognised regardless of message case.
                rate_limited = "ratelimit" in f"{type(e).__name__} {e}".lower()
                if rate_limited and attempt < max_retries - 1:
                    # Exponential backoff: 1s, then 2s (no wait after the last attempt).
                    wait_time = 2 ** attempt
                    logger.warning(f"DuckDuckGo rate limit hit, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    # Report "no results" instead of raising so the caller can
                    # still fall back to SerpAPI.
                    logger.error(f"DuckDuckGo search failed for query '{query}': {e}")
                    return []
        return []

    async def try_serpapi(query: str, max_retries: int = 3) -> List[str]:
        """Return SerpAPI snippets, or [] when no key is set or every attempt fails."""
        api_key = os.getenv("SERPAPI_API_KEY")
        if not api_key:
            logger.warning("SERPAPI_API_KEY not set, cannot use SerpAPI fallback")
            return []

        def fetch() -> List[str]:
            # Synchronous client call; executed in a worker thread.
            params = {
                "q": query,
                "api_key": api_key,
                "num": 5,
            }
            organic = GoogleSearch(params).get_dict().get("organic_results", [])
            return [item["snippet"] for item in organic if item.get("snippet")]

        for attempt in range(max_retries):
            try:
                logger.info(f"SerpAPI search attempt {attempt + 1} for query: {query}")
                return await loop.run_in_executor(None, fetch)
            except Exception as e:
                if attempt < max_retries - 1:
                    # Exponential backoff: 1s, then 2s (no wait after the last attempt).
                    wait_time = 2 ** attempt
                    logger.warning(f"SerpAPI search failed, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"SerpAPI search failed for query '{query}': {e}")
        return []

    def rank_snippets(snippets: List[str]) -> List[str]:
        """Return the 3 snippets most similar to ``original_query``, best first."""
        # Imported lazily so the dependency is only needed when ranking is used.
        from sentence_transformers import util
        query_embedding = embedder.encode(original_query, convert_to_tensor=True)
        result_embeddings = embedder.encode(snippets, convert_to_tensor=True)
        scores = util.cos_sim(query_embedding, result_embeddings)[0]
        # Convert to plain ints before indexing the Python list.
        order = scores.argsort(descending=True).tolist()
        return [snippets[i] for i in order[:3]]

    try:
        # Try DuckDuckGo with retries
        logger.info(f"Executing DuckDuckGo search for query: {query}")
        results = await try_duckduckgo(query)

        # Fall back to SerpAPI if DuckDuckGo fails or finds nothing
        if not results:
            logger.info(f"DuckDuckGo returned no results, falling back to SerpAPI for query: {query}")
            results = await try_serpapi(query)

        if not results:
            return []

        # Rank results if embedder is provided
        if embedder is not None:
            try:
                return rank_snippets(results)
            except Exception as e:
                # Keep the snippets that were already fetched.
                logger.warning(f"Result ranking failed, returning unranked results: {e}")

        return results[:3]
    except Exception as e:
        # Last-resort boundary: this function reports failure as an empty list.
        logger.error(f"Search failed for query '{query}': {e}")
        return []
# Expose the search coroutine as a LangChain StructuredTool whose arguments
# are validated against DuckDuckGoSearchInput.
#
# NOTE(review): duckduckgo_search_func is a coroutine function, yet it is also
# registered as the synchronous ``func``. A synchronous tool call would
# therefore hand back an un-awaited coroutine rather than search results, so
# use the tool through its async API. ``func`` is left in place because other
# code may access it directly -- confirm before removing it.
duckduckgo_search_tool = StructuredTool.from_function(
    func=duckduckgo_search_func,
    name="duckduckgo_search_tool",
    args_schema=DuckDuckGoSearchInput,
    coroutine=duckduckgo_search_func,
)