consilium_mcp / research_tools /research_agent.py
azettl's picture
remove google scholar
6d0f82e
"""
Enhanced Research Agent with Multi-Source Integration
"""
from typing import Dict, List, Any, Optional, Tuple
import re
from collections import Counter
from .base_tool import BaseTool
from .web_search import WebSearchTool
from .wikipedia_search import WikipediaSearchTool
from .arxiv_search import ArxivSearchTool
from .github_search import GitHubSearchTool
from .sec_search import SECSearchTool
class EnhancedResearchAgent:
    """Enhanced research agent with multi-source synthesis and smart routing.

    The agent owns one instance of each research tool (web, Wikipedia, arXiv,
    GitHub, SEC) and offers:

    * ``search`` - single-source ("standard") or multi-source ("deep") search.
    * Helpers that turn a running discussion into follow-up research queries.
    * Simple health checks for the underlying tools.

    Every tool is expected to expose ``search(query)``,
    ``should_use_for_query(query)`` and ``score_research_quality(result, name)``;
    those are the only tool methods this class calls.
    """

    # Routing priority, highest first. The first three entries are the
    # specialised tools that may pre-empt keyword-based routing.
    _TOOL_PRIORITY = ('arxiv', 'sec', 'github', 'wikipedia', 'web')
    _SPECIALIZED_TOOLS = _TOOL_PRIORITY[:3]

    # Keyword fallbacks, checked in order when no specialised tool claims the query.
    _KEYWORD_ROUTES = (
        ('sec', ('company', 'stock', 'financial', 'revenue')),
        ('arxiv', ('research', 'study', 'academic', 'paper')),
        ('github', ('technology', 'framework', 'programming')),
        ('wikipedia', ('what is', 'definition', 'history')),
    )

    # Whole-word modal/assertion markers used to spot factual claims.
    _CLAIM_MARKER = re.compile(
        r'\b(?:should|will|is|are|must|can|would|could)\b', re.IGNORECASE
    )

    # Assertion patterns for claims that may need research backing. Word
    # boundaries stop "is" from matching inside "this" or "analysis".
    _ASSERTION_PATTERNS = (
        r'\b(?:should|must|will|is|are)\b\s+[^.]{20,100}',
        r'\b(?:studies show|research indicates|data suggests)\b\s+[^.]{20,100}',
        r'\b(?:according to|based on)\b\s+[^.]{20,100}',
    )

    # Numbers with an optional decimal part and optional trailing percent sign.
    _NUMBER_PATTERN = re.compile(r'\b\d+(?:\.\d+)?(?:%|\b)')

    # Sentence terminators only count when followed by whitespace or the end
    # of the text, so decimals ("3.5") and domains ("example.com") stay intact.
    _SENTENCE_BREAK = re.compile(r'[.!?]+(?:\s+|$)')

    # (positive, negative) marker pairs for the simplified conflict detector.
    # Function words match as whole words; content words match as stems.
    _OPPOSING_PATTERNS = (
        (re.compile(r'\bshould\b'), re.compile(r'\bshould\s+not\b')),
        (re.compile(r'\bwill\b'), re.compile(r'\bwill\s+not\b')),
        (re.compile(r'\bis\b'), re.compile(r'\bis\s+not\b')),
        (re.compile(r'\bincrease\w*'), re.compile(r'\bdecrease\w*')),
        (re.compile(r'\bbetter\b'), re.compile(r'\bworse\b')),
        (re.compile(r'\byes\b'), re.compile(r'\bno\b')),
        (re.compile(r'\bsupport\w*'), re.compile(r'\boppose\w*')),
        (re.compile(r'\bbenefit\w*'), re.compile(r'\bharm\w*')),
        (re.compile(r'\beffective\w*'), re.compile(r'\bineffective\w*')),
    )

    def __init__(self):
        """Create one instance of every research tool and mark all as available."""
        self.tools = {
            'web': WebSearchTool(),
            'wikipedia': WikipediaSearchTool(),
            'arxiv': ArxivSearchTool(),
            'github': GitHubSearchTool(),
            'sec': SECSearchTool()
        }
        # Availability flags; refreshed by test_tool_connections().
        self.tool_status = dict.fromkeys(self.tools, True)

    # ------------------------------------------------------------------
    # Search entry points
    # ------------------------------------------------------------------

    def search(self, query: str, research_depth: str = "standard") -> str:
        """Run a search for ``query``.

        Args:
            query: The research question.
            research_depth: ``"deep"`` selects multi-source search with
                synthesis; any other value selects single-source search.

        Returns:
            A formatted text report.
        """
        if research_depth == "deep":
            return self._deep_multi_source_search(query)
        return self._standard_search(query)

    def search_wikipedia(self, topic: str) -> str:
        """Search Wikipedia directly (kept for backward compatibility)."""
        return self.tools['wikipedia'].search(topic)

    def _standard_search(self, query: str) -> str:
        """Search the single best-matching tool, falling back to web search.

        If both the routed tool and the web fallback fail, a short
        "temporarily unavailable" message is returned instead of raising.
        """
        best_tool = self._route_query_to_tool(query)
        try:
            return self.tools[best_tool].search(query)
        except Exception as e:
            failure = e

        # The routed tool failed; retry on the web tool unless that was it.
        if best_tool != 'web':
            try:
                return self.tools['web'].search(query)
            except Exception as e2:
                failure = e2
        return (
            f"**Research for: {query}**\n\n"
            f"Research temporarily unavailable: {str(failure)[:100]}..."
        )

    def _deep_multi_source_search(self, query: str) -> str:
        """Query every relevant tool and synthesise the results.

        A source only contributes when both its search and its quality
        scoring succeed, so the result and score maps always share the
        same keys. Tool failures are printed and skipped.
        """
        results = {}
        quality_scores = {}

        for tool_name in self._get_relevant_tools(query):
            tool = self.tools[tool_name]
            try:
                result = tool.search(query)
                # Ignore empty or trivially short answers.
                if not result or len(result.strip()) <= 50:
                    continue
                quality = tool.score_research_quality(result, tool_name)
            except Exception as e:
                print(f"Error with {tool_name}: {e}")
                continue
            # Store both together so a scoring failure cannot leave a
            # result without a score.
            results[tool_name] = result
            quality_scores[tool_name] = quality

        if not results:
            return f"**Deep Research for: {query}**\n\nNo sources were able to provide results. Please try a different query."

        return self._synthesize_multi_source_results(query, results, quality_scores)

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def _route_query_to_tool(self, query: str) -> str:
        """Pick the most appropriate tool name for ``query``.

        Specialised tools are consulted in priority order (arxiv, sec,
        github); the first one whose ``should_use_for_query`` accepts the
        query wins. Otherwise keyword indicators decide, and ``'web'`` is
        the default.
        """
        # Walk the priority list itself so the result does not depend on
        # the insertion order of self.tools.
        for tool_name in self._SPECIALIZED_TOOLS:
            tool = self.tools.get(tool_name)
            if tool is not None and tool.should_use_for_query(query):
                return tool_name

        query_lower = query.lower()
        for tool_name, indicators in self._KEYWORD_ROUTES:
            if any(indicator in query_lower for indicator in indicators):
                return tool_name
        return 'web'  # Default fallback

    def _get_relevant_tools(self, query: str) -> List[str]:
        """Return the tool names to use for a deep search.

        Web search is listed first, followed by every other tool whose
        ``should_use_for_query`` accepts the query. At most four names are
        returned.
        """
        relevant_tools = ['web']
        relevant_tools.extend(
            tool_name
            for tool_name, tool in self.tools.items()
            if tool_name != 'web' and tool.should_use_for_query(query)
        )

        if len(relevant_tools) > 4:
            # NOTE(review): trimming by priority drops 'web' (lowest priority)
            # when every tool matches, which contradicts "always include web".
            # Behaviour kept as-is; confirm which is intended.
            selected = set(relevant_tools)
            relevant_tools = [name for name in self._TOOL_PRIORITY if name in selected][:4]
        return relevant_tools

    # ------------------------------------------------------------------
    # Synthesis
    # ------------------------------------------------------------------

    def _synthesize_multi_source_results(self, query: str, results: Dict[str, str], quality_scores: Dict[str, Dict]) -> str:
        """Build the combined report for a deep search.

        Args:
            query: The original research question.
            results: Raw result text keyed by source name.
            quality_scores: Per-source score dicts; the ``'overall'`` entry
                orders the sources (missing entries count as 0.0).

        Returns:
            Header, key findings, per-source excerpts (best first, each
            capped at 800 characters) and a quality assessment.
        """
        source_list = ', '.join(results.keys()).replace('_', ' ').title()
        sections = [
            f"**Comprehensive Research Analysis: {query}**\n\n",
            f"**Research Sources Used:** {source_list}\n\n",
            self._format_key_findings(self._extract_key_findings(results)),
            "**Detailed Source Results:**\n\n",
        ]

        ranked = sorted(
            quality_scores.items(),
            key=lambda item: item[1].get('overall', 0.0),
            reverse=True,
        )
        for source_name, quality in ranked:
            if source_name not in results:
                continue
            source_result = results[source_name]
            # Condense long results so one source cannot dominate the report.
            if len(source_result) > 800:
                source_result = source_result[:800] + "...\n[Result truncated for synthesis]"
            title = source_name.replace('_', ' ').title()
            overall = quality.get('overall', 0.0)
            sections.append(f"**{title} (Quality: {overall:.2f}/1.0):**\n")
            sections.append(f"{source_result}\n\n")

        sections.append(self._format_research_quality_assessment(quality_scores))
        return "".join(sections)

    def _extract_key_findings(self, results: Dict[str, str]) -> Dict[str, List[str]]:
        """Extract shared themes and numeric data points from the results.

        Returns:
            A dict with keys ``'agreements'``, ``'contradictions'``,
            ``'unique_insights'`` and ``'data_points'``. Only agreements and
            data points are populated by this simplified implementation.
        """
        findings = {
            'agreements': [],
            'contradictions': [],
            'unique_insights': [],
            'data_points': []
        }

        source_sentences = {
            source: self._extract_key_sentences(text)
            for source, text in results.items()
        }
        all_sentences = [
            sentence
            for sentences in source_sentences.values()
            for sentence in sentences
        ]

        # Count in how many *sources* each 4+ character word appears, so a
        # "multiple sources" theme really is backed by more than one source.
        source_mentions = Counter()
        for sentences in source_sentences.values():
            words = re.findall(r'\b\w{4,}\b', ' '.join(sentences).lower())
            # De-duplicate per source while keeping first-seen order.
            source_mentions.update(list(dict.fromkeys(words)))
        common_themes = [
            word for word, count in source_mentions.most_common(10) if count > 1
        ]

        # Up to 10 unique numbers, in first-seen order (deterministic).
        numbers = self._NUMBER_PATTERN.findall(' '.join(all_sentences))
        findings['data_points'] = list(dict.fromkeys(numbers))[:10]

        if len(source_sentences) > 1:
            findings['agreements'] = [
                f"Multiple sources mention: {theme}" for theme in common_themes[:3]
            ]
        return findings

    def _extract_key_sentences(self, text: str) -> List[str]:
        """Return up to five sentences that contain an evidence indicator.

        A sentence qualifies when it is longer than 30 characters and
        contains one of the indicator phrases (case-insensitive substring
        match). Terminal punctuation is not included in the result.
        """
        if not text:
            return []

        key_indicators = [
            'research shows', 'study found', 'according to', 'data indicates',
            'results suggest', 'analysis reveals', 'evidence shows', 'reported that',
            'concluded that', 'demonstrated that', 'increased', 'decreased',
            'growth', 'decline', 'significant', 'important', 'critical'
        ]

        key_sentences = []
        for raw_sentence in self._SENTENCE_BREAK.split(text):
            sentence = raw_sentence.strip()
            if len(sentence) <= 30:
                continue
            lowered = sentence.lower()
            if any(indicator in lowered for indicator in key_indicators):
                key_sentences.append(sentence)
        return key_sentences[:5]

    def _format_key_findings(self, findings: Dict[str, List[str]]) -> str:
        """Render the findings dict as a bulleted markdown summary.

        Empty categories are omitted; at most five data points are shown.
        """
        parts = ["**Key Research Synthesis:**\n\n"]
        sections = (
            ("**Common Themes:**\n", findings['agreements']),
            ("**Key Data Points:**\n", findings['data_points'][:5]),
            ("**Unique Insights:**\n", findings['unique_insights']),
        )
        for heading, items in sections:
            if items:
                parts.append(heading)
                parts.extend(f"• {item}\n" for item in items)
                parts.append("\n")
        return "".join(parts)

    def _format_research_quality_assessment(self, quality_scores: Dict[str, Dict]) -> str:
        """Summarise the average quality metrics across all scored sources.

        Returns an empty string when there are no scores. Metrics missing
        from a score dict count as 0.0 rather than raising ``KeyError``.
        """
        if not quality_scores:
            return ""

        def average(metric: str) -> float:
            total = sum(scores.get(metric, 0.0) for scores in quality_scores.values())
            return total / len(quality_scores)

        avg_overall = average('overall')
        avg_authority = average('authority')
        avg_recency = average('recency')
        avg_specificity = average('specificity')

        if avg_overall >= 0.8:
            quality_level = "Excellent"
        elif avg_overall >= 0.6:
            quality_level = "Good"
        elif avg_overall >= 0.4:
            quality_level = "Moderate"
        else:
            quality_level = "Limited"

        lines = [
            "**Research Quality Assessment:**\n\n",
            f"• Overall Research Quality: {avg_overall:.2f}/1.0\n",
            f"• Source Authority: {avg_authority:.2f}/1.0\n",
            f"• Information Recency: {avg_recency:.2f}/1.0\n",
            f"• Data Specificity: {avg_specificity:.2f}/1.0\n",
            f"• Sources Consulted: {len(quality_scores)}\n\n",
            f"**Research Reliability: {quality_level}**\n",
        ]
        if avg_authority >= 0.8:
            lines.append("• High-authority sources with strong credibility\n")
        if avg_recency >= 0.7:
            lines.append("• Current and up-to-date information\n")
        if avg_specificity >= 0.6:
            lines.append("• Specific data points and quantitative evidence\n")
        return "".join(lines)

    # ------------------------------------------------------------------
    # Query generation from discussions
    # ------------------------------------------------------------------

    @staticmethod
    def _message_text(message: Dict, keys: Tuple[str, ...]) -> str:
        """Return the first non-empty value found under ``keys``, else ``''``.

        Messages in this file are read under both ``'text'`` and
        ``'content'``; accepting either keeps every caller working
        regardless of which key the message was stored under.
        """
        for key in keys:
            value = message.get(key)
            if value:
                return value
        return ''

    def generate_research_queries(self, question: str, current_discussion: List[Dict]) -> List[str]:
        """Auto-generate up to three research queries from discussion gaps.

        Args:
            question: The question under discussion.
            current_discussion: Message dicts; the text is read from
                ``'text'`` (falling back to ``'content'``).

        Returns:
            Queries derived from unverified claims, plus a statistics query
            when the discussion contains no percentages, plus a
            recent-developments query; truncated to three entries.
        """
        discussion_text = "\n".join(
            self._message_text(msg, ('text', 'content')) for msg in current_discussion
        )

        queries = []
        for claim in self._find_unsubstantiated_claims(discussion_text)[:3]:
            query = self._convert_claim_to_query(claim)
            if query:
                queries.append(query)

        # Ask for numbers when the discussion has none.
        if not re.search(r'\d+%', discussion_text):
            queries.append(f"{question} statistics data percentages")

        queries.append(f"{question} 2024 2025 recent developments")
        return queries[:3]

    def _find_unsubstantiated_claims(self, discussion_text: str) -> List[str]:
        """Find assertion-like fragments that might need research backing.

        Returns at most two matches per assertion pattern; each match
        starts at the trigger word and runs for 20-100 characters without
        crossing a period.
        """
        claims = []
        for pattern in self._ASSERTION_PATTERNS:
            matches = re.findall(pattern, discussion_text, re.IGNORECASE)
            claims.extend(matches[:2])
        return claims

    def _convert_claim_to_query(self, claim: str) -> Optional[str]:
        """Turn a claim into a query made of its first four 4+ letter words.

        Returns ``None`` when the claim is shorter than 10 characters or
        contains fewer than two such words.
        """
        if not claim or len(claim) < 10:
            return None
        key_terms = re.findall(r'\b\w{4,}\b', claim.lower())
        if len(key_terms) < 2:
            return None
        return " ".join(key_terms[:4])

    def prioritize_research_needs(self, expert_positions: List[Dict], question: str) -> List[str]:
        """Build up to three queries aimed at resolving expert disagreements.

        Args:
            expert_positions: Dicts with ``'speaker'`` and ``'text'``
                (``'content'`` is accepted as a fallback for the text).
            question: The question under discussion.
        """
        expert_claims = {}
        for position in expert_positions:
            speaker = position.get('speaker', 'Unknown')
            text = self._message_text(position, ('text', 'content'))
            # Accumulate, so a speaker with several positions keeps them all.
            expert_claims.setdefault(speaker, []).extend(self._extract_key_claims(text))

        disagreements = self._find_expert_disagreements(expert_claims)
        return [
            f"{question} {disagreement['topic']} evidence data"
            for disagreement in disagreements[:3]
        ]

    def _extract_key_claims(self, expert_text: str) -> List[str]:
        """Return up to three sentences that look like factual claims.

        A sentence qualifies when it is longer than 20 characters and
        contains a modal/assertion word (should, will, is, are, must, can,
        would, could) as a whole word.
        """
        if not expert_text:
            return []
        claims = []
        for raw_sentence in expert_text.split('.'):
            sentence = raw_sentence.strip()
            if len(sentence) > 20 and self._CLAIM_MARKER.search(sentence):
                claims.append(sentence)
        return claims[:3]

    def _find_expert_disagreements(self, expert_claims: Dict[str, List[str]]) -> List[Dict]:
        """Compare every pair of experts and report those with conflicting claims.

        Each entry holds the two expert names, a short topic string and
        the main (first) conflict description.
        """
        disagreements = []
        experts = list(expert_claims.keys())
        for i, expert1 in enumerate(experts):
            for expert2 in experts[i + 1:]:
                conflicts = self._find_conflicting_claims(
                    expert_claims[expert1], expert_claims[expert2]
                )
                if conflicts:
                    disagreements.append({
                        'experts': [expert1, expert2],
                        'topic': self._extract_conflict_topic(conflicts[0]),
                        'conflicts': conflicts[:1]  # Just the main conflict
                    })
        return disagreements

    @staticmethod
    def _sides_oppose(text1: str, text2: str, positive, negative) -> bool:
        """Return True when one text is positive and the other negative.

        A text counts as positive only if the positive marker still occurs
        after its negated form has been removed, so "should not" is never
        mistaken for "should".
        """
        negative1 = bool(negative.search(text1))
        negative2 = bool(negative.search(text2))
        positive1 = bool(positive.search(negative.sub(' ', text1)))
        positive2 = bool(positive.search(negative.sub(' ', text2)))
        return (positive1 and negative2) or (negative1 and positive2)

    def _find_conflicting_claims(self, claims1: List[str], claims2: List[str]) -> List[str]:
        """Identify potentially conflicting claim pairs (simplified heuristic).

        Returns one ``"<claim1> vs <claim2>"`` entry per pair of claims
        that use opposing markers (e.g. "should" vs "should not").
        """
        conflicts = []
        for claim1 in claims1:
            text1 = claim1.lower()
            for claim2 in claims2:
                text2 = claim2.lower()
                if any(
                    self._sides_oppose(text1, text2, positive, negative)
                    for positive, negative in self._OPPOSING_PATTERNS
                ):
                    conflicts.append(f"{claim1} vs {claim2}")
        return conflicts

    def _extract_conflict_topic(self, conflict: str) -> str:
        """Return the first three non-stopword 4+ letter words of ``conflict``."""
        words = re.findall(r'\b\w{4,}\b', conflict.lower())
        stopwords = {'should', 'will', 'would', 'could', 'this', 'that', 'with', 'from', 'they', 'them'}
        topic_words = [word for word in words if word not in stopwords]
        return " ".join(topic_words[:3])

    def suggest_research_follow_ups(self, discussion_log: List[Dict], question: str) -> List[str]:
        """Suggest up to three follow-up queries from the last six messages.

        Message text is read from ``'content'`` (falling back to
        ``'text'``). Suggestions are triggered by percentages, trend
        vocabulary, and mentions of examples or case studies.
        """
        recent_text = "\n".join(
            self._message_text(msg, ('content', 'text')) for msg in discussion_log[-6:]
        )
        recent_lower = recent_text.lower()

        follow_ups = []
        # Statistics were quoted: suggest verifying them.
        if re.search(r'\d+%', recent_text):
            follow_ups.append(f"{question} statistics verification current data")

        trend_keywords = ['trend', 'growing', 'increasing', 'declining', 'emerging']
        if any(keyword in recent_lower for keyword in trend_keywords):
            follow_ups.append(f"{question} current trends 2024 2025")

        if 'example' in recent_lower or 'case study' in recent_lower:
            follow_ups.append(f"{question} case studies examples evidence")
        return follow_ups[:3]

    # ------------------------------------------------------------------
    # Tool health
    # ------------------------------------------------------------------

    def get_tool_status(self) -> Dict[str, bool]:
        """Return the availability flag of every tool (default ``True``)."""
        return {name: self.tool_status.get(name, True) for name in self.tools}

    def test_tool_connections(self) -> Dict[str, str]:
        """Run a tiny test query against every tool and record the outcome.

        Updates ``self.tool_status`` and returns a human-readable status
        string per tool.
        """
        results = {}
        for name, tool in self.tools.items():
            try:
                # assumes every tool's search() accepts max_results - TODO confirm
                test_result = tool.search("test", max_results=1)
            except Exception as e:
                results[name] = f"❌ Error: {str(e)[:50]}..."
                self.tool_status[name] = False
                continue

            if test_result and len(test_result) > 20:
                results[name] = "✅ Working"
                self.tool_status[name] = True
            else:
                results[name] = "⚠️ Limited response"
                self.tool_status[name] = False
        return results