File size: 4,324 Bytes
4701375
751d628
 
 
 
 
 
 
4701375
 
 
751d628
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
import os
import asyncio
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
from typing import Optional, List
from duckduckgo_search import DDGS
from serpapi import GoogleSearch

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Argument schema for the search tool: the query to run, the original query
# kept for context, and an optional embedder object (defaults to None).
class DuckDuckGoSearchInput(BaseModel):
    # `...` marks the field as required (no default value).
    query: str = Field(..., description="Search query")
    original_query: str = Field(..., description="Original query for context")
    embedder: Optional[object] = Field(
        default=None,
        description="SentenceTransformer embedder",
    )

async def duckduckgo_search_func(query: str, original_query: str, embedder: Optional[object] = None) -> List[str]:
    """
    Search DuckDuckGo with retries, fall back to SerpAPI, and return the top snippets.

    DuckDuckGo is tried first. If it fails or yields no snippets, SerpAPI is
    queried instead (only when the SERPAPI_API_KEY environment variable is set).
    When an embedder is supplied, the snippets are ordered by cosine similarity
    to ``original_query`` before the top three are returned. This function does
    not raise for search or ranking errors; they are logged instead.

    Args:
        query (str): Search query sent to the search backends.
        original_query (str): Original query; used only as the reference text for ranking.
        embedder (Optional[object]): Object exposing ``encode(..., convert_to_tensor=True)``
            (presumably a SentenceTransformer). When None, results keep backend order.

    Returns:
        List[str]: Up to three result snippets, or an empty list if nothing was found.
    """
    # The search clients below are synchronous; running them in the default
    # executor keeps their network I/O from stalling the event loop.
    loop = asyncio.get_running_loop()

    def fetch_duckduckgo(q: str) -> List[str]:
        # Blocking DuckDuckGo call; hits without a usable 'body' are skipped so
        # one malformed hit cannot fail the whole attempt.
        with DDGS() as ddgs:
            return [r["body"] for r in ddgs.text(q, max_results=5) if r.get("body")]

    def fetch_serpapi(q: str, api_key: str) -> List[str]:
        # Blocking SerpAPI call; only organic results carrying a snippet are kept.
        search = GoogleSearch({"q": q, "api_key": api_key, "num": 5})
        organic = search.get_dict().get("organic_results", [])
        return [item["snippet"] for item in organic if item.get("snippet")]

    async def try_duckduckgo(query: str, max_retries: int = 3) -> List[str]:
        for attempt in range(max_retries):
            try:
                logger.info(f"DuckDuckGo search attempt {attempt + 1} for query: {query}")
                return await loop.run_in_executor(None, fetch_duckduckgo, query)
            except Exception as e:
                if "Ratelimit" in str(e) and attempt < max_retries - 1:
                    # Exponential backoff between attempts: 1s, then 2s.
                    wait_time = 2 ** attempt
                    logger.warning(f"DuckDuckGo rate limit hit, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    # Return empty instead of re-raising so the caller can
                    # still fall back to SerpAPI.
                    logger.error(f"DuckDuckGo search failed for query '{query}': {e}")
                    return []
        return []

    async def try_serpapi(query: str, max_retries: int = 3) -> List[str]:
        api_key = os.getenv("SERPAPI_API_KEY")
        if not api_key:
            logger.warning("SERPAPI_API_KEY not set, cannot use SerpAPI fallback")
            return []
        for attempt in range(max_retries):
            try:
                logger.info(f"SerpAPI search attempt {attempt + 1} for query: {query}")
                return await loop.run_in_executor(None, fetch_serpapi, query, api_key)
            except Exception as e:
                if attempt < max_retries - 1:
                    # Exponential backoff between attempts: 1s, then 2s.
                    wait_time = 2 ** attempt
                    logger.warning(f"SerpAPI search failed, retrying in {wait_time}s: {e}")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"SerpAPI search failed for query '{query}': {e}")
                    return []
        # Reached only when max_retries <= 0; keep the return type a list.
        return []

    try:
        # Try DuckDuckGo with retries
        logger.info(f"Executing DuckDuckGo search for query: {query}")
        results = await try_duckduckgo(query)

        # Fall back to SerpAPI if DuckDuckGo failed or found nothing
        if not results:
            logger.info(f"DuckDuckGo returned no results, falling back to SerpAPI for query: {query}")
            results = await try_serpapi(query)

        if not results:
            return []

        # Rank results if an embedder is provided
        if embedder is not None:
            try:
                from sentence_transformers import util
                query_embedding = embedder.encode(original_query, convert_to_tensor=True)
                result_embeddings = embedder.encode(results, convert_to_tensor=True)
                scores = util.cos_sim(query_embedding, result_embeddings)[0]
                # Plain ints make the list indexing independent of tensor types.
                order = scores.argsort(descending=True).tolist()
                return [results[i] for i in order[:3]]
            except Exception as e:
                # A ranking problem should not discard snippets already fetched.
                logger.warning(f"Ranking failed for query '{query}', returning unranked results: {e}")

        return results[:3]
    except Exception as e:
        logger.error(f"Search failed for query '{query}': {e}")
        return []

def _duckduckgo_search_sync(query: str, original_query: str, embedder: Optional[object] = None) -> List[str]:
    # Synchronous entry point for the tool: runs the async search to completion.
    # asyncio.run() raises RuntimeError when called from a thread that already
    # has a running event loop; async callers should use the coroutine path.
    return asyncio.run(duckduckgo_search_func(query, original_query, embedder))


# Reuse the async function's docstring so a tool description derived from the
# function docstring stays the same as before.
_duckduckgo_search_sync.__doc__ = duckduckgo_search_func.__doc__

# `func` must be a regular callable: passing the coroutine function there makes
# synchronous tool calls return an un-awaited coroutine object instead of results.
duckduckgo_search_tool = StructuredTool.from_function(
    func=_duckduckgo_search_sync,
    name="duckduckgo_search_tool",
    args_schema=DuckDuckGoSearchInput,
    coroutine=duckduckgo_search_func,
)