|
|
|
|
|
""" |
|
Enhanced Track 3: Gmail AI Agent with Advanced Behaviors |
|
Sophisticated agent decision-making and workflow automation |
|
""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
import json |
|
import logging |
|
import os |
|
from typing import List, Dict, Any, Optional, Tuple |
|
from dataclasses import dataclass |
|
import pandas as pd |
|
from datetime import datetime, timedelta |
|
import re |
|
import requests |
|
import time |
|
from collections import Counter |
|
|
|
|
|
from google.oauth2.credentials import Credentials |
|
from google.auth.transport.requests import Request |
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
|
|
|
|
from dotenv import load_dotenv |
|
load_dotenv() |
|
from mcp_client import GmailMCPClientSync |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger("enhanced-gmail-agent") |
|
|
|
|
|
GMAIL_CLIENT_ID = os.getenv("GMAIL_CLIENT_ID") |
|
GMAIL_CLIENT_SECRET = os.getenv("GMAIL_CLIENT_SECRET") |
|
GMAIL_TOKEN_JSON = os.getenv("GMAIL_TOKEN_JSON") |
|
MODAL_API_URL = os.getenv("MODAL_API_URL") |
|
|
|
|
|
logger.info(f"GMAIL_CLIENT_ID available: {GMAIL_CLIENT_ID is not None}") |
|
logger.info(f"GMAIL_CLIENT_SECRET available: {GMAIL_CLIENT_SECRET is not None}") |
|
logger.info(f"GMAIL_TOKEN_JSON available: {GMAIL_TOKEN_JSON is not None}") |
|
logger.info(f"MODAL_API_URL available: {MODAL_API_URL is not None}") |
|
|
|
|
|
if GMAIL_CLIENT_ID and GMAIL_CLIENT_SECRET: |
|
credentials_data = { |
|
"installed": { |
|
"client_id": GMAIL_CLIENT_ID, |
|
"client_secret": GMAIL_CLIENT_SECRET, |
|
"auth_uri": "https://accounts.google.com/o/oauth2/auth", |
|
"token_uri": "https://oauth2.googleapis.com/token", |
|
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", |
|
"redirect_uris": ["urn:ietf:wg:oauth:2.0:oob", "http://localhost"] |
|
} |
|
} |
|
|
|
|
|
with open("credentials.json", "w") as f: |
|
json.dump(credentials_data, f) |
|
logger.info("Created credentials.json from environment variables") |
|
|
|
|
|
if GMAIL_TOKEN_JSON: |
|
with open("token.json", "w") as f: |
|
f.write(GMAIL_TOKEN_JSON) |
|
logger.info("Created token.json from environment variable") |
|
|
|
class QwenClient: |
|
"""Client for the Modal-hosted Qwen model - Updated to properly handle thinking model""" |
|
|
|
def __init__(self, api_url: str = None): |
|
self.api_url = api_url |
|
self.first_request = True |
|
logger.info(f"Initializing QwenClient with API URL: {self.api_url}") |
|
|
|
def _strip_thinking_tags(self, text: str) -> str: |
|
"""Strip <think> sections from the response to get clean output""" |
|
import re |
|
|
|
return re.sub(r'<think>.*?(?:</think>|$)', '', text, flags=re.DOTALL).strip() |
|
|
|
def generate_content(self, prompt: str, max_tokens: int = 2048, |
|
temperature: float = 0.7, strip_thinking: bool = True, |
|
retries: int = 3, retry_delay: float = 2.0, |
|
timeout: float = 180.0) -> Any: |
|
"""Generate content using the Modal-hosted Qwen model with proper thinking model support""" |
|
|
|
|
|
if len(prompt) > 4000: |
|
logger.warning(f"Prompt too long ({len(prompt)} chars), truncating to 4000 chars") |
|
prompt = prompt[:4000] + "... [truncated]" |
|
|
|
|
|
if self.first_request: |
|
logger.info("⚠️ First request might take longer due to cold start...") |
|
self.first_request = False |
|
|
|
for attempt in range(retries): |
|
try: |
|
payload = { |
|
"message": prompt, |
|
"max_tokens": max_tokens, |
|
"temperature": temperature, |
|
"top_p": 0.9, |
|
"strip_thinking": strip_thinking |
|
} |
|
|
|
logger.info(f"Sending request to Qwen API (attempt {attempt+1}/{retries}) with prompt length: {len(prompt)}") |
|
start_time = time.time() |
|
|
|
response = requests.post( |
|
self.api_url, |
|
json=payload, |
|
headers={"Content-Type": "application/json"}, |
|
timeout=timeout |
|
) |
|
|
|
duration = time.time() - start_time |
|
logger.info(f"⏱️ Response received in {duration:.1f} seconds") |
|
|
|
|
|
response.raise_for_status() |
|
|
|
result = response.json() |
|
response_text = result.get("response", "") |
|
|
|
logger.info(f"Received response from Qwen API: {len(response_text)} chars") |
|
|
|
|
|
if response_text and len(response_text.strip()) > 0: |
|
|
|
class QwenResponse: |
|
def __init__(self, text): |
|
self.text = text |
|
|
|
|
|
|
|
|
|
if not strip_thinking and '<think>' in response_text: |
|
|
|
final_text = response_text |
|
else: |
|
|
|
final_text = self._strip_thinking_tags(response_text) |
|
|
|
if len(final_text.strip()) < 10 and '<think>' in response_text: |
|
|
|
parts = response_text.split('</think>') |
|
if len(parts) > 1: |
|
final_text = parts[-1].strip() |
|
else: |
|
|
|
final_text = response_text.split('>')[-1].strip() |
|
|
|
|
|
if 'tokens_used' in result: |
|
logger.info(f"📊 Token usage: {result.get('input_tokens', 'N/A')} input, {result['tokens_used']} output") |
|
|
|
return QwenResponse(final_text) |
|
else: |
|
logger.warning(f"Empty response received from Qwen API (attempt {attempt+1}/{retries})") |
|
if attempt < retries - 1: |
|
logger.info(f"Retrying in {retry_delay} seconds...") |
|
time.sleep(retry_delay) |
|
|
|
except requests.exceptions.Timeout: |
|
logger.error(f"Timeout calling Qwen API (attempt {attempt+1}/{retries}) - request took too long") |
|
if attempt < retries - 1: |
|
next_delay = retry_delay * (attempt + 1) |
|
logger.info(f"Retrying in {next_delay} seconds...") |
|
time.sleep(next_delay) |
|
else: |
|
|
|
class QwenResponse: |
|
def __init__(self, text): |
|
self.text = text |
|
return QwenResponse("I apologize, but I'm having difficulty accessing enhanced AI capabilities right now due to timeout. Please try again later when the service may be more responsive.") |
|
|
|
except requests.RequestException as e: |
|
logger.error(f"Error calling Qwen API (attempt {attempt+1}/{retries}): {str(e)}") |
|
if hasattr(e, 'response') and e.response is not None: |
|
logger.error(f"Response: {e.response.text}") |
|
if attempt < retries - 1: |
|
next_delay = retry_delay * (attempt + 1) |
|
logger.info(f"Retrying in {next_delay} seconds...") |
|
time.sleep(next_delay) |
|
|
|
logger.error("All retry attempts failed for Qwen API request") |
|
|
|
|
|
class QwenResponse: |
|
def __init__(self, text): |
|
self.text = text |
|
return QwenResponse("I apologize, but I can't access my advanced AI capabilities right now. Let me provide a simplified response based on your request. You might want to try again later when the network connection to the AI service is more stable.") |
|
|
|
@dataclass |
|
class AgentMemory: |
|
"""Memory system for the agent to track context and user patterns""" |
|
user_preferences: Dict[str, Any] |
|
conversation_context: List[Dict[str, Any]] |
|
email_patterns: Dict[str, Any] |
|
workflow_history: List[Dict[str, Any]] |
|
|
|
class EnhancedGmailAgent: |
|
"""Enhanced Gmail AI Agent with sophisticated behaviors""" |
|
|
|
def __init__(self, mcp_server_path: str = "gmail_mcp_server.py", modal_api_url: str = None): |
|
self.mcp_client = GmailMCPClientSync(mcp_server_path) |
|
self.memory = AgentMemory( |
|
user_preferences={}, |
|
conversation_context=[], |
|
email_patterns={}, |
|
workflow_history=[] |
|
) |
|
|
|
|
|
self.modal_api_url = modal_api_url or MODAL_API_URL |
|
|
|
try: |
|
logger.info("Initializing Qwen model client from Modal") |
|
if not self.modal_api_url: |
|
logger.warning("No Modal API URL provided, Qwen functionality will be limited") |
|
self.model = None |
|
self.model_status = "Not configured" |
|
else: |
|
self.model = QwenClient(api_url=self.modal_api_url) |
|
logger.info("Qwen model client initialized successfully") |
|
self.model_status = "Initialized" |
|
except Exception as e: |
|
logger.error(f"Error initializing Qwen model: {str(e)}") |
|
self.model = None |
|
self.model_status = "Error during initialization" |
|
|
|
|
|
self.last_analysis = {} |
|
self.active_workflows = [] |
|
|
|
def intelligent_email_triage(self, max_emails: int = 20) -> Dict[str, Any]: |
|
""" |
|
Sophisticated email triage with AI-powered categorization and priority scoring - SINGLE API CALL VERSION |
|
""" |
|
logger.info("🧠 Starting intelligent email triage...") |
|
|
|
try: |
|
|
|
emails = self.mcp_client.fetch_emails(query="newer_than:3d", max_results=max_emails) |
|
|
|
if not emails or 'emails' not in emails: |
|
return {"error": "No emails to analyze"} |
|
|
|
email_list = emails['emails'] |
|
logger.info(f"Fetched {len(email_list)} emails for analysis") |
|
|
|
|
|
triage_results = { |
|
"high_priority": [], |
|
"meetings_and_calls": [], |
|
"action_required": [], |
|
"newsletters_and_updates": [], |
|
"personal": [], |
|
"low_priority": [], |
|
"analysis_summary": "", |
|
"recommendations": [] |
|
} |
|
|
|
if self.model and len(email_list) > 0: |
|
logger.info(f"Processing ALL {len(email_list)} emails in SINGLE API call") |
|
|
|
|
|
email_summaries = [] |
|
for i, email in enumerate(email_list): |
|
|
|
subject = email.get('subject', 'No subject')[:80] |
|
sender = self._clean_sender(email.get('sender', 'Unknown'))[:50] |
|
snippet = email.get('snippet', 'No preview')[:60] |
|
is_unread = email.get('is_unread', False) |
|
|
|
email_summaries.append(f"Email {i+1}: Subject=\"{subject}\" | From={sender} | Preview=\"{snippet}\" | Unread={is_unread}") |
|
|
|
|
|
master_prompt = f"""You are an expert email analyst and productivity coach. Analyze ALL {len(email_list)} emails below and provide detailed, actionable insights. |
|
|
|
EMAILS TO ANALYZE: |
|
{chr(10).join(email_summaries)} |
|
|
|
TASK: For each email, determine: |
|
1. Category (select exactly one): |
|
- high_priority: Urgent emails needing immediate attention (deadlines, emergencies) |
|
- meetings_and_calls: Calendar items, meeting invites, calls |
|
- action_required: Emails requiring specific action but not urgent |
|
- newsletters_and_updates: Marketing, newsletters, product updates |
|
- personal: Personal communications |
|
- low_priority: Everything else with minimal importance |
|
|
|
2. Priority score: 0.0 to 1.0 (0.0 = lowest, 1.0 = highest priority) |
|
- 0.9-1.0: Critical/urgent - requires immediate attention |
|
- 0.7-0.8: Important - handle today |
|
- 0.5-0.6: Moderate - handle within 48 hours |
|
- 0.3-0.4: Low - handle when convenient |
|
- 0.1-0.2: Very low - can be archived or ignored |
|
|
|
3. Detailed reasoning: Explain WHY you categorized this way (deadline mentions, sender importance, action words, etc.) |
|
|
|
RESPONSE FORMAT - Respond with ONLY this JSON array (no extra text): |
|
[ |
|
{{ |
|
"email_num": 1, |
|
"category": "category_name", |
|
"priority_score": 0.8, |
|
"reasoning": "Detailed reason with specific insights from email content" |
|
}}, |
|
{{ |
|
"email_num": 2, |
|
"category": "category_name", |
|
"priority_score": 0.6, |
|
"reasoning": "Detailed reason with specific insights from email content" |
|
}}, |
|
...continue for all {len(email_list)} emails... |
|
] |
|
|
|
IMPORTANT ANALYSIS GUIDELINES: |
|
- Look for urgency indicators: words like "urgent", "ASAP", "deadline", "today", "overdue" |
|
- Consider sender importance: work contacts vs marketing emails |
|
- Identify action verbs: "confirm", "review", "approve", "respond", "complete" |
|
- Check for meeting details: times, dates, calendar invites |
|
- Detect personal communication markers: friendly tone, personal questions |
|
- Evaluate if the email requires a response or action |
|
- Consider unread status as potentially more important |
|
|
|
Respond with ONLY the JSON array - no introduction, explanation or additional text. |
|
""" |
|
|
|
try: |
|
|
|
logger.info(f"Sending SINGLE request to Qwen API for ALL {len(email_list)} emails") |
|
response = self.model.generate_content( |
|
master_prompt, |
|
max_tokens=2048, |
|
temperature=0.2, |
|
strip_thinking=True, |
|
timeout=240.0 |
|
) |
|
|
|
if response and hasattr(response, 'text') and response.text: |
|
response_text = response.text.strip() |
|
logger.info(f"Received comprehensive response: {len(response_text)} chars") |
|
|
|
|
|
json_start = response_text.find('[') |
|
json_end = response_text.rfind(']') + 1 |
|
|
|
if json_start >= 0 and json_end > json_start: |
|
try: |
|
json_text = response_text[json_start:json_end] |
|
results = json.loads(json_text) |
|
|
|
logger.info(f"Successfully parsed JSON with {len(results)} email analyses") |
|
|
|
|
|
valid_categories = ['high_priority', 'meetings_and_calls', 'action_required', |
|
'newsletters_and_updates', 'personal', 'low_priority'] |
|
|
|
for result in results: |
|
try: |
|
email_num = int(result.get('email_num', 1)) - 1 |
|
if 0 <= email_num < len(email_list): |
|
email = email_list[email_num] |
|
|
|
|
|
category = result.get('category', '').lower() |
|
if category not in valid_categories: |
|
category = 'newsletters_and_updates' |
|
|
|
|
|
try: |
|
priority_score = float(result.get('priority_score', 0.5)) |
|
if not (0.0 <= priority_score <= 1.0): |
|
priority_score = 0.5 |
|
except: |
|
priority_score = 0.5 |
|
|
|
reasoning = result.get('reasoning', 'AI analysis completed') |
|
|
|
|
|
email_analysis = { |
|
**email, |
|
"priority_score": priority_score, |
|
"category": category, |
|
"ai_reasoning": reasoning, |
|
"suggested_actions": self._suggest_email_actions(email, category, priority_score) |
|
} |
|
|
|
triage_results[category].append(email_analysis) |
|
logger.info(f"✅ Email {email_num+1}: '{email.get('subject', '')[:40]}...' → {category} (score: {priority_score})") |
|
|
|
except Exception as e: |
|
logger.warning(f"Error processing email result {result}: {str(e)}") |
|
|
|
if 0 <= email_num < len(email_list): |
|
self._apply_fallback_categorization(email_list[email_num], triage_results) |
|
|
|
|
|
processed_count = sum(len(emails) for category, emails in triage_results.items() |
|
if category not in ['analysis_summary', 'recommendations']) |
|
|
|
if processed_count < len(email_list): |
|
logger.warning(f"Only processed {processed_count}/{len(email_list)} emails, using fallback for remaining") |
|
|
|
for i, email in enumerate(email_list): |
|
|
|
email_found = False |
|
for category in ['high_priority', 'meetings_and_calls', 'action_required', |
|
'newsletters_and_updates', 'personal', 'low_priority']: |
|
if any(e.get('id') == email.get('id') for e in triage_results[category]): |
|
email_found = True |
|
break |
|
|
|
if not email_found: |
|
logger.info(f"Adding fallback categorization for email {i+1}") |
|
self._apply_fallback_categorization(email, triage_results) |
|
|
|
logger.info(f"🎉 Successfully analyzed {len(email_list)} emails with AI in single API call!") |
|
|
|
except json.JSONDecodeError as e: |
|
logger.warning(f"Failed to parse JSON from AI response: {e}") |
|
logger.warning(f"Response was: {response_text[:500]}...") |
|
|
|
for email in email_list: |
|
self._apply_fallback_categorization(email, triage_results) |
|
else: |
|
logger.warning("Could not find valid JSON array in AI response") |
|
logger.warning(f"Response was: {response_text[:500]}...") |
|
|
|
for email in email_list: |
|
self._apply_fallback_categorization(email, triage_results) |
|
else: |
|
logger.warning("Empty or invalid response from AI") |
|
|
|
for email in email_list: |
|
self._apply_fallback_categorization(email, triage_results) |
|
|
|
except Exception as e: |
|
logger.error(f"Error in AI analysis: {str(e)}") |
|
|
|
for email in email_list: |
|
self._apply_fallback_categorization(email, triage_results) |
|
else: |
|
|
|
logger.info("No AI model available, using fallback categorization for all emails") |
|
for email in email_list: |
|
self._apply_fallback_categorization(email, triage_results) |
|
|
|
|
|
triage_results["analysis_summary"] = self._generate_triage_summary(triage_results) |
|
triage_results["recommendations"] = self._generate_workflow_recommendations(triage_results) |
|
|
|
|
|
self._update_email_patterns(email_list) |
|
|
|
|
|
logger.info("🎨 Formatting triage results for display") |
|
formatted_output = self._display_triage_results(triage_results) |
|
|
|
|
|
triage_results["formatted_display"] = formatted_output |
|
|
|
return triage_results |
|
|
|
except Exception as e: |
|
logger.error(f"Error in intelligent triage: {e}") |
|
return {"error": str(e)} |
|
|
|
def _apply_fallback_categorization(self, email: Dict, triage_results: Dict): |
|
"""Apply fallback categorization and add to triage results""" |
|
category, priority_score, reasoning = self._fallback_categorization(email) |
|
|
|
email_analysis = { |
|
**email, |
|
"priority_score": priority_score, |
|
"category": category, |
|
"ai_reasoning": reasoning, |
|
"suggested_actions": self._suggest_email_actions(email, category, priority_score) |
|
} |
|
|
|
triage_results[category].append(email_analysis) |
|
|
|
def _analyze_email_batch_with_ai(self, emails: List[Dict]) -> List[Tuple[str, float, str]]: |
|
"""Use AI to analyze a batch of emails at once""" |
|
if not self.model or not emails: |
|
return [self._fallback_categorization(email) for email in emails] |
|
|
|
try: |
|
|
|
|
|
email_prompts = [] |
|
for i, email in enumerate(emails): |
|
|
|
snippet = email.get('snippet', 'No preview') |
|
snippet = snippet[:50] if snippet else 'No preview' |
|
|
|
|
|
email_prompts.append(f"""Email #{i+1}: Subject: "{email.get('subject', 'No subject')}" | From: {email.get('sender', 'Unknown')} | Preview: "{snippet}" | Unread: {email.get('is_unread', False)}""") |
|
|
|
batch_prompt = f""" |
|
You are an email categorization expert. Analyze each email below and categorize them. |
|
Respond ONLY with a JSON array containing one object per email. |
|
|
|
Emails to analyze: |
|
{chr(10).join(email_prompts)} |
|
|
|
Categories: high_priority, meetings_and_calls, action_required, newsletters_and_updates, personal, low_priority |
|
|
|
Response format (JSON array only): |
|
[ |
|
{{"category": "category_name", "priority_score": 0.1_to_1.0, "reasoning": "brief reason"}} |
|
] |
|
""" |
|
|
|
|
|
response = self.model.generate_content( |
|
batch_prompt, |
|
max_tokens=512, |
|
temperature=0.2, |
|
timeout=120.0 |
|
) |
|
|
|
|
|
if not response or not hasattr(response, 'text') or not response.text: |
|
logger.warning("Empty response from Qwen model for batch analysis") |
|
return [self._fallback_categorization(email) for email in emails] |
|
|
|
response_text = response.text.strip() |
|
logger.debug(f"Qwen batch response: {response_text}") |
|
|
|
|
|
json_start = response_text.find('[') |
|
json_end = response_text.rfind(']') + 1 |
|
|
|
if json_start == -1 or json_end == 0: |
|
logger.warning("No valid JSON array found in batch response") |
|
return [self._fallback_categorization(email) for email in emails] |
|
|
|
json_text = response_text[json_start:json_end] |
|
|
|
try: |
|
|
|
results = json.loads(json_text) |
|
|
|
if not isinstance(results, list) or len(results) == 0: |
|
logger.warning("JSON response is not a valid array") |
|
return [self._fallback_categorization(email) for email in emails] |
|
|
|
valid_categories = [ |
|
'high_priority', 'meetings_and_calls', 'action_required', |
|
'newsletters_and_updates', 'personal', 'low_priority' |
|
] |
|
|
|
|
|
processed_results = [] |
|
for i, result in enumerate(results): |
|
if i >= len(emails): |
|
break |
|
|
|
|
|
if not all(key in result for key in ['category', 'priority_score', 'reasoning']): |
|
logger.warning(f"Missing required fields in JSON for email #{i+1}") |
|
processed_results.append(self._fallback_categorization(emails[i])) |
|
continue |
|
|
|
category = result['category'] |
|
if category not in valid_categories: |
|
logger.warning(f"Invalid category '{category}' returned for email #{i+1}") |
|
processed_results.append(self._fallback_categorization(emails[i])) |
|
continue |
|
|
|
|
|
try: |
|
priority_score = float(result['priority_score']) |
|
if not 0.0 <= priority_score <= 1.0: |
|
priority_score = 0.5 |
|
except (ValueError, TypeError): |
|
priority_score = 0.5 |
|
|
|
reasoning = str(result.get('reasoning', 'AI analysis completed')) |
|
|
|
logger.info(f"AI categorized email '{emails[i].get('subject', '')[:50]}...' as {category} (score: {priority_score})") |
|
processed_results.append((category, priority_score, reasoning)) |
|
|
|
|
|
while len(processed_results) < len(emails): |
|
missing_idx = len(processed_results) |
|
processed_results.append(self._fallback_categorization(emails[missing_idx])) |
|
|
|
return processed_results |
|
|
|
except json.JSONDecodeError as e: |
|
logger.warning(f"JSON parsing failed for batch: {e}. Response was: {json_text}") |
|
return [self._fallback_categorization(email) for email in emails] |
|
|
|
except Exception as e: |
|
logger.warning(f"Batch AI analysis failed with error: {e}") |
|
return [self._fallback_categorization(email) for email in emails] |
|
|
|
def _analyze_email_with_ai(self, email: Dict) -> Tuple[str, float, str]: |
|
"""Use AI to analyze email and determine category, priority, and reasoning""" |
|
if not self.model: |
|
logger.info("No Qwen model available, using rule-based categorization") |
|
return self._fallback_categorization(email) |
|
|
|
try: |
|
|
|
prompt = f""" |
|
You are an email categorization expert. Your task is to analyze the email details below and categorize it. |
|
You MUST respond with ONLY a valid JSON object in the exact format requested at the end. |
|
|
|
Email Details: |
|
- Subject: {email.get('subject', 'No subject')} |
|
- From: {email.get('sender', 'Unknown sender')} |
|
- Content Preview: {email.get('snippet', 'No preview')} |
|
- Is Unread: {email.get('is_unread', False)} |
|
- Date: {email.get('date', 'Unknown date')} |
|
|
|
Available Categories (choose exactly one): |
|
1. high_priority: For urgent emails that need immediate attention |
|
2. meetings_and_calls: For meeting invites, call schedules, or appointment-related emails |
|
3. action_required: For emails that require a specific action or response, but aren't urgent |
|
4. newsletters_and_updates: For subscription emails, product updates, marketing content |
|
5. personal: For emails that are personal in nature but not urgent |
|
6. low_priority: For emails that can be handled later or are low importance |
|
|
|
Your response MUST be ONLY a JSON object in this exact format: |
|
{{"category": "one_of_the_categories_above", "priority_score": 0.1_to_1.0, "reasoning": "Brief explanation of your categorization"}} |
|
|
|
Examples of valid responses: |
|
{{"category": "high_priority", "priority_score": 0.9, "reasoning": "Contains urgent deadline requiring immediate action"}} |
|
{{"category": "newsletters_and_updates", "priority_score": 0.3, "reasoning": "Marketing newsletter with no action required"}} |
|
""" |
|
|
|
|
|
response = self.model.generate_content( |
|
prompt, |
|
max_tokens=512, |
|
temperature=0.3, |
|
retries=3 |
|
) |
|
|
|
|
|
if not response or not hasattr(response, 'text') or not response.text: |
|
logger.warning("Empty response from Qwen model after retries") |
|
return self._fallback_categorization(email) |
|
|
|
response_text = response.text.strip() |
|
logger.debug(f"Qwen response: {response_text}") |
|
|
|
|
|
json_start = response_text.find('{') |
|
json_end = response_text.rfind('}') + 1 |
|
|
|
if json_start == -1 or json_end == 0: |
|
logger.warning("No valid JSON found in response") |
|
|
|
try: |
|
import re |
|
|
|
category_match = re.search(r'"category"\s*:\s*"([^"]+)"', response_text) |
|
score_match = re.search(r'"priority_score"\s*:\s*([\d.]+)', response_text) |
|
reasoning_match = re.search(r'"reasoning"\s*:\s*"([^"]+)"', response_text) |
|
|
|
if category_match and score_match: |
|
category = category_match.group(1) |
|
priority_score = float(score_match.group(1)) |
|
reasoning = reasoning_match.group(1) if reasoning_match else "AI analysis" |
|
|
|
valid_categories = [ |
|
'high_priority', 'meetings_and_calls', 'action_required', |
|
'newsletters_and_updates', 'personal', 'low_priority' |
|
] |
|
|
|
if category in valid_categories and 0 <= priority_score <= 1: |
|
logger.info(f"Recovered partial JSON data from malformed response") |
|
return category, priority_score, reasoning |
|
|
|
except Exception as e: |
|
logger.warning(f"Could not recover data from malformed response: {e}") |
|
|
|
return self._fallback_categorization(email) |
|
|
|
json_text = response_text[json_start:json_end] |
|
|
|
try: |
|
|
|
result = json.loads(json_text) |
|
|
|
|
|
if not all(key in result for key in ['category', 'priority_score', 'reasoning']): |
|
logger.warning("Missing required fields in JSON response") |
|
return self._fallback_categorization(email) |
|
|
|
|
|
valid_categories = [ |
|
'high_priority', 'meetings_and_calls', 'action_required', |
|
'newsletters_and_updates', 'personal', 'low_priority' |
|
] |
|
|
|
category = result['category'] |
|
if category not in valid_categories: |
|
logger.warning(f"Invalid category '{category}' returned") |
|
return self._fallback_categorization(email) |
|
|
|
|
|
try: |
|
priority_score = float(result['priority_score']) |
|
if not 0.0 <= priority_score <= 1.0: |
|
priority_score = 0.5 |
|
except (ValueError, TypeError): |
|
priority_score = 0.5 |
|
|
|
reasoning = str(result.get('reasoning', 'AI analysis completed')) |
|
|
|
logger.info(f"AI categorized email '{email.get('subject', '')[:50]}...' as {category} (score: {priority_score})") |
|
return category, priority_score, reasoning |
|
|
|
except json.JSONDecodeError as e: |
|
logger.warning(f"JSON parsing failed: {e}. Response was: {json_text}") |
|
return self._fallback_categorization(email) |
|
|
|
except Exception as e: |
|
logger.warning(f"AI analysis failed with error: {e}") |
|
return self._fallback_categorization(email) |
|
|
|
def _fallback_categorization(self, email: Dict) -> Tuple[str, float, str]: |
|
"""Fallback categorization when AI is not available""" |
|
subject = email.get('subject', '').lower() |
|
sender = email.get('sender', '').lower() |
|
is_unread = email.get('is_unread', False) |
|
|
|
|
|
if any(word in subject for word in ['urgent', 'asap', 'critical', 'emergency']): |
|
return "high_priority", 0.9, "Contains urgent keywords" |
|
elif any(word in subject for word in ['meeting', 'call', 'zoom', 'appointment']): |
|
return "meetings_and_calls", 0.7, "Meeting or call related" |
|
elif any(word in subject for word in ['action', 'required', 'todo', 'task']): |
|
return "action_required", 0.6, "Appears to require action" |
|
elif any(word in subject for word in ['newsletter', 'digest', 'update', 'notification']): |
|
return "newsletters_and_updates", 0.3, "Newsletter or update" |
|
elif is_unread: |
|
return "personal", 0.5, "Unread personal email" |
|
else: |
|
return "low_priority", 0.2, "Standard email" |
|
|
|
def _suggest_email_actions(self, email: Dict, category: str, priority_score: float) -> List[str]: |
|
"""Suggest specific actions for each email""" |
|
actions = [] |
|
|
|
if category == "high_priority": |
|
actions.extend(["📞 Call sender immediately", "⚡ Respond within 1 hour", "📌 Add to priority list"]) |
|
elif category == "meetings_and_calls": |
|
actions.extend(["📅 Add to calendar", "✅ Send confirmation", "📋 Prepare agenda"]) |
|
elif category == "action_required": |
|
actions.extend(["✏️ Create task", "⏰ Set reminder", "📝 Draft response"]) |
|
elif category == "newsletters_and_updates": |
|
actions.extend(["📖 Schedule reading time", "🗂️ Archive after reading"]) |
|
elif priority_score > 0.6: |
|
actions.extend(["👀 Review carefully", "📝 Respond today"]) |
|
else: |
|
actions.extend(["📁 Archive if not important", "👁️ Quick scan"]) |
|
|
|
return actions |
|
|
|
def _generate_triage_summary(self, triage_results: Dict) -> str: |
|
"""Generate intelligent summary of triage results based on actual email content""" |
|
total_emails = sum(len(emails) for key, emails in triage_results.items() |
|
if key not in ['analysis_summary', 'recommendations']) |
|
|
|
if total_emails == 0: |
|
return "📭 **No emails to analyze**\n\nYour inbox is empty or no emails match the search criteria." |
|
|
|
|
|
high_priority_count = len(triage_results.get('high_priority', [])) |
|
action_required_count = len(triage_results.get('action_required', [])) |
|
meetings_count = len(triage_results.get('meetings_and_calls', [])) |
|
newsletters_count = len(triage_results.get('newsletters_and_updates', [])) |
|
personal_count = len(triage_results.get('personal', [])) |
|
|
|
|
|
urgent_subjects = [email['subject'][:30] + "..." for email in triage_results.get('high_priority', [])[:2]] |
|
action_subjects = [email['subject'][:30] + "..." for email in triage_results.get('action_required', [])[:2]] |
|
|
|
|
|
all_senders = [] |
|
for category in triage_results: |
|
if category in ['analysis_summary', 'recommendations']: |
|
continue |
|
for email in triage_results[category]: |
|
all_senders.append(self._clean_sender(email.get('sender', 'Unknown'))) |
|
|
|
top_senders = Counter(all_senders).most_common(3) |
|
|
|
|
|
unread_count = sum(1 for cat in triage_results.keys() |
|
if cat not in ['analysis_summary', 'recommendations'] |
|
for email in triage_results[cat] |
|
if email.get('is_unread', False)) |
|
unread_percentage = (unread_count / total_emails * 100) if total_emails > 0 else 0 |
|
|
|
|
|
summary = f""" |
|
🧠 **Intelligent Email Triage Analysis** |
|
|
|
📊 **Overview**: Analyzed {total_emails} emails from the last 3 days |
|
|
|
""" |
|
|
|
|
|
if high_priority_count > 0: |
|
summary += f"🚨 **Immediate Attention**: {high_priority_count} high-priority emails require urgent response\n" |
|
if urgent_subjects: |
|
summary += " • " + "\n • ".join(f'"{subject}"' for subject in urgent_subjects) |
|
if high_priority_count > len(urgent_subjects): |
|
summary += f"\n • ...and {high_priority_count - len(urgent_subjects)} more" |
|
summary += "\n\n" |
|
else: |
|
summary += "✅ **No urgent emails requiring immediate attention**\n\n" |
|
|
|
|
|
if action_required_count > 0: |
|
summary += f"✅ **Action Items**: {action_required_count} emails need specific actions\n" |
|
if action_subjects: |
|
summary += " • " + "\n • ".join(f'"{subject}"' for subject in action_subjects) |
|
if action_required_count > len(action_subjects): |
|
summary += f"\n • ...and {action_required_count - len(action_subjects)} more" |
|
summary += "\n\n" |
|
|
|
|
|
if meetings_count > 0: |
|
summary += f"📅 **Calendar Items**: {meetings_count} meeting-related emails\n\n" |
|
|
|
|
|
summary += f"📦 **Email Distribution**:\n" |
|
if newsletters_count > 0: |
|
summary += f" • Newsletters/Updates: {newsletters_count} ({newsletters_count/total_emails*100:.0f}%)\n" |
|
if personal_count > 0: |
|
summary += f" • Personal: {personal_count} ({personal_count/total_emails*100:.0f}%)\n" |
|
if action_required_count > 0: |
|
summary += f" • Action Required: {action_required_count} ({action_required_count/total_emails*100:.0f}%)\n" |
|
if high_priority_count > 0: |
|
summary += f" • High Priority: {high_priority_count} ({high_priority_count/total_emails*100:.0f}%)\n" |
|
|
|
|
|
summary += f"\n📬 **Inbox Status**: {unread_count} unread emails ({unread_percentage:.0f}% of analyzed emails)\n" |
|
|
|
|
|
if top_senders: |
|
summary += f"\n👥 **Top Senders**:\n" |
|
for sender, count in top_senders: |
|
summary += f" • {sender}: {count} emails\n" |
|
|
|
|
|
summary += f""" |
|
💡 **AI Insights**: |
|
- {'High' if unread_percentage > 70 else 'Moderate' if unread_percentage > 30 else 'Low'} unread email ratio ({unread_percentage:.0f}%) |
|
- {'High' if high_priority_count > 3 else 'Normal'} priority workload ({high_priority_count} urgent emails) |
|
- {'Consider' if action_required_count > 5 else 'Manageable'} task delegation for action items ({action_required_count} tasks) |
|
- {top_senders[0][0] if top_senders else 'No single sender'} is your most frequent correspondent ({top_senders[0][1] if top_senders else 0} emails) |
|
""" |
|
|
|
return summary |
|
|
|
def _generate_workflow_recommendations(self, triage_results: Dict) -> List[str]: |
|
"""Generate intelligent workflow recommendations""" |
|
recommendations = [] |
|
|
|
high_priority_count = len(triage_results.get('high_priority', [])) |
|
action_count = len(triage_results.get('action_required', [])) |
|
|
|
if high_priority_count > 0: |
|
recommendations.append(f"🚨 Handle {high_priority_count} urgent emails first") |
|
|
|
if action_count > 5: |
|
recommendations.append("📋 Consider batching similar action items") |
|
recommendations.append("⏰ Set aside 2-3 hour block for email processing") |
|
|
|
recommendations.extend([ |
|
"📅 Schedule 15-min email review sessions", |
|
"🔄 Set up automated filters for newsletters", |
|
"📱 Enable smart notifications for high-priority senders" |
|
]) |
|
|
|
return recommendations |
|
|
|
def _update_email_patterns(self, emails: List[Dict]): |
|
"""Update agent memory with email patterns""" |
|
|
|
current_time = datetime.now().isoformat() |
|
self.memory.email_patterns[current_time] = { |
|
"total_emails": len(emails), |
|
"unread_count": sum(1 for e in emails if e.get('is_unread', False)), |
|
"top_senders": self._get_sender_stats(emails) |
|
} |
|
|
|
def _get_sender_stats(self, emails: List[Dict]) -> Dict[str, int]: |
|
"""Get sender statistics""" |
|
senders = {} |
|
for email in emails: |
|
sender = email.get('sender', 'Unknown') |
|
senders[sender] = senders.get(sender, 0) + 1 |
|
return dict(sorted(senders.items(), key=lambda x: x[1], reverse=True)[:5]) |
|
|
|
def proactive_assistant_chat(self, user_message: str, chat_history: List) -> Tuple[str, List]: |
|
"""Enhanced chat with proactive suggestions and agent reasoning""" |
|
try: |
|
|
|
intent, confidence = self._analyze_user_intent(user_message) |
|
|
|
|
|
chat_history.append((user_message, None)) |
|
|
|
|
|
try: |
|
|
|
logger.info("Fetching recent emails for chat context") |
|
recent_emails = self.mcp_client.fetch_emails(query="newer_than:3d", max_results=10) |
|
|
|
if recent_emails and 'emails' in recent_emails: |
|
|
|
response = self._generate_smart_response_batch(user_message, intent, confidence, recent_emails['emails'][:5]) |
|
else: |
|
|
|
response = self._handle_intent_fallback(user_message, intent, confidence) |
|
|
|
|
|
self.memory.conversation_context.append({ |
|
"user_message": user_message, |
|
"intent": intent, |
|
"confidence": confidence, |
|
"timestamp": datetime.now().isoformat() |
|
}) |
|
|
|
except requests.exceptions.Timeout: |
|
logger.error(f"Timeout when processing chat response") |
|
response = f"🧠 **AI Analysis**\n💭 *Understanding: {intent.replace('_', ' ').title()}*\n\nI apologize, but I'm having trouble connecting to the advanced AI service at the moment. Here's a simplified response based on your query:\n\n{self._handle_intent_fallback(user_message, intent, confidence)}" |
|
except Exception as e: |
|
logger.error(f"Error generating chat response: {e}") |
|
response = f"🧠 **AI Analysis**\n💭 *Understanding: {intent.replace('_', ' ').title()}*\n\nI apologize, but I encountered an error while analyzing your emails. Let me provide a simple response instead:\n\n{self._handle_intent_fallback(user_message, intent, confidence)}" |
|
|
|
|
|
chat_history[-1] = (user_message, response) |
|
|
|
return "", chat_history |
|
|
|
except Exception as e: |
|
error_response = f"🧠 **Agent Analysis**\n\n❌ Error: {str(e)}\n\nI apologize for the inconvenience. Please try a different question or check the email connection status." |
|
|
|
|
|
if not chat_history: |
|
chat_history = [] |
|
|
|
|
|
chat_history.append((user_message, error_response)) |
|
return "", chat_history |
|
|
|
def _generate_smart_response_batch(self, message: str, intent: str, confidence: float, emails: List[Dict]) -> str: |
|
"""Generate intelligent response based on intent analysis and email processing - SINGLE API CALL""" |
|
|
|
|
|
response_header = f"🧠 **AI Analysis**\n" |
|
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n" |
|
|
|
try: |
|
if self.model and emails: |
|
logger.info(f"Generating smart response with email context ({len(emails)} emails)") |
|
|
|
|
|
email_summaries = [] |
|
for i, email in enumerate(emails[:5]): |
|
sender = self._clean_sender(email.get('sender', 'Unknown')) |
|
subject = email.get('subject', 'No subject')[:60] |
|
is_unread = "unread" if email.get('is_unread', False) else "read" |
|
|
|
email_summaries.append(f"Email {i+1}: \"{subject}\" from {sender} ({is_unread})") |
|
|
|
|
|
chat_prompt = f"""You are an intelligent email assistant with access to the user's actual emails. The user asked: "{message}" |
|
|
|
I've detected their intent as: {intent} |
|
|
|
Here are their recent emails for context: |
|
{chr(10).join(email_summaries)} |
|
|
|
IMPORTANT GUIDELINES: |
|
1. Be concise but informative - keep responses under 250 words |
|
2. Reference specific emails by subject or sender when relevant |
|
3. Format your response with clear sections and bullet points when appropriate |
|
4. Provide actionable advice based on the actual emails |
|
5. Use a friendly, helpful tone |
|
6. For search queries, mention specific matching emails |
|
7. For summaries, group similar emails together |
|
8. For workflow questions, suggest specific organization strategies |
|
|
|
Make your response personalized to their actual emails. Be direct and helpful without unnecessary explanations. |
|
""" |
|
|
|
logger.info(f"Sending chat request to Qwen API with {len(emails)} email context") |
|
response = self.model.generate_content( |
|
chat_prompt, |
|
max_tokens=512, |
|
temperature=0.7, |
|
strip_thinking=True, |
|
timeout=120.0 |
|
) |
|
|
|
if response and hasattr(response, 'text') and response.text: |
|
ai_response = response.text.strip() |
|
logger.info(f"Received chat response: {len(ai_response)} chars") |
|
|
|
|
|
if len(ai_response) > 10: |
|
|
|
final_response = response_header + ai_response |
|
return final_response |
|
else: |
|
logger.warning("AI response too short, using fallback") |
|
return response_header + self._handle_intent_fallback(message, intent, confidence) |
|
else: |
|
|
|
logger.warning("Empty response from Qwen for chat, using fallback") |
|
return response_header + self._handle_intent_fallback(message, intent, confidence) |
|
else: |
|
|
|
logger.info("Using fallback response (no model or emails available)") |
|
return response_header + self._handle_intent_fallback(message, intent, confidence) |
|
|
|
except requests.exceptions.Timeout: |
|
logger.warning(f"Timeout in Qwen response generation - using fallback") |
|
return response_header + "I'll help with your request using my basic capabilities:\n\n" + self._handle_intent_fallback(message, intent, confidence) |
|
except Exception as e: |
|
logger.warning(f"Error generating smart response: {e}") |
|
return response_header + self._handle_intent_fallback(message, intent, confidence) |
|
|
|
def _analyze_user_intent(self, message: str) -> Tuple[str, float]: |
|
"""Analyze user intent and confidence level""" |
|
message_lower = message.lower() |
|
|
|
|
|
intent_patterns = { |
|
"email_search": (["find", "search", "look for", "show me"], 0.8), |
|
"email_summary": (["summarize", "summary", "overview", "brief"], 0.9), |
|
"workflow_automation": (["automate", "organize", "cleanup", "triage"], 0.7), |
|
"meeting_prep": (["meeting", "call", "appointment", "schedule"], 0.8), |
|
"priority_focus": (["urgent", "important", "priority", "critical"], 0.9), |
|
"general_help": (["help", "how to", "what can", "assistance"], 0.6) |
|
} |
|
|
|
best_intent = "general_help" |
|
best_confidence = 0.3 |
|
|
|
for intent, (keywords, base_confidence) in intent_patterns.items(): |
|
matches = sum(1 for keyword in keywords if keyword in message_lower) |
|
if matches > 0: |
|
confidence = min(base_confidence + (matches - 1) * 0.1, 1.0) |
|
if confidence > best_confidence: |
|
best_intent = intent |
|
best_confidence = confidence |
|
|
|
return best_intent, best_confidence |
|
|
|
def _generate_smart_response(self, message: str, intent: str, confidence: float) -> str: |
|
"""Generate intelligent response based on intent analysis using real email data""" |
|
|
|
|
|
response_header = f"🧠 **AI Analysis**\n" |
|
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n" |
|
|
|
try: |
|
|
|
recent_emails = self.mcp_client.fetch_emails(query="newer_than:7d", max_results=15) |
|
|
|
if self.model and recent_emails and 'emails' in recent_emails: |
|
|
|
return self._generate_qwen_response(message, intent, confidence, recent_emails['emails']) |
|
else: |
|
|
|
if intent == "email_search" and confidence > 0.7: |
|
return response_header + self._handle_search_intent(message) |
|
elif intent == "email_summary" and confidence > 0.8: |
|
return response_header + self._handle_summary_intent(message) |
|
elif intent == "workflow_automation": |
|
return response_header + self._handle_automation_intent(message) |
|
elif intent == "meeting_prep": |
|
return response_header + self._handle_meeting_intent(message) |
|
elif intent == "priority_focus": |
|
return response_header + self._handle_priority_intent(message) |
|
else: |
|
return response_header + self._handle_general_intent(message) |
|
|
|
except Exception as e: |
|
return f"Error generating smart response: {str(e)}" |
|
|
|
def _handle_search_intent(self, message: str) -> str: |
|
"""Handle search-related queries""" |
|
|
|
search_terms = self._extract_search_terms(message) |
|
|
|
if search_terms: |
|
|
|
results = self.mcp_client.search_emails( |
|
subject_contains=" OR ".join(search_terms), |
|
max_results=10 |
|
) |
|
|
|
if results and 'emails' in results: |
|
count = len(results['emails']) |
|
return f"🔍 **Search Results**: Found {count} emails matching your criteria.\n\n" + \ |
|
f"**Search Terms Used**: {', '.join(search_terms)}\n\n" + \ |
|
"📧 **Top Matches**:\n" + \ |
|
"\n".join([f"• {email['subject']} (from {self._clean_sender(email['sender'])})" |
|
for email in results['emails'][:3]]) |
|
|
|
return "🔍 I can help you search your emails! Try being more specific, like:\n" + \ |
|
"• 'Find emails from John about the project'\n" + \ |
|
"• 'Search for meeting emails from last week'\n" + \ |
|
"• 'Show me emails with attachments'" |
|
|
|
def _handle_summary_intent(self, message: str) -> str: |
|
"""Handle summary-related queries""" |
|
|
|
if "week" in message.lower(): |
|
days = 7 |
|
elif "month" in message.lower(): |
|
days = 30 |
|
elif "today" in message.lower(): |
|
days = 1 |
|
else: |
|
days = 3 |
|
|
|
summary_result = self.mcp_client.summarize_emails(days=days, include_body=True) |
|
|
|
if summary_result: |
|
return f"📊 **Email Summary - Last {days} Days**\n\n" + \ |
|
f"📧 Total: {summary_result.get('total_emails', 0)} emails\n" + \ |
|
f"🔴 Unread: {summary_result.get('unread_count', 0)} emails\n\n" + \ |
|
"💡 **Agent Recommendation**: Focus on unread emails first for maximum efficiency." |
|
|
|
return f"📊 No emails found for the last {days} days." |
|
|
|
def _handle_automation_intent(self, message: str) -> str: |
|
"""Handle workflow automation queries""" |
|
return """🤖 **Email Organization Options**: |
|
|
|
📊 **Email Analysis** - Get insights about your emails |
|
🎯 **Priority Focus** - Focus on high-priority emails |
|
🧹 **Inbox Management** - Get organization suggestions |
|
📅 **Meeting Preparation** - Find meeting-related emails |
|
|
|
💡 **Try asking**: "Help me organize my inbox" or "Analyze my emails and provide recommendations" |
|
""" |
|
|
|
def _handle_meeting_intent(self, message: str) -> str: |
|
"""Handle meeting-related queries""" |
|
meeting_workflow = self._meeting_preparation_workflow() |
|
return f"📅 **Meeting Assistant Activated**\n\n{meeting_workflow}" |
|
|
|
def _handle_priority_intent(self, message: str) -> str: |
|
"""Handle priority/urgent email queries""" |
|
priority_workflow = self._priority_focus_workflow() |
|
return f"🚨 **Priority Mode Activated**\n\n{priority_workflow}" |
|
|
|
def _handle_general_intent(self, message: str) -> str: |
|
"""Handle general queries with proactive suggestions""" |
|
return """🤖 **How can I help you today?** |
|
|
|
I'm your intelligent email assistant with advanced capabilities: |
|
|
|
🧠 **Smart Features**: |
|
• Intent recognition and context awareness |
|
• Automated email triage and categorization |
|
• Intelligent recommendations |
|
• Email pattern analysis |
|
|
|
💬 **Try asking me**: |
|
• "Show me my most important emails" |
|
• "Help me organize my inbox" |
|
• "What emails do I have from [specific sender]?" |
|
• "Summarize emails from this week" |
|
|
|
🎯 **Proactive Suggestion**: Let me analyze your emails to provide personalized recommendations!""" |
|
|
|
def _extract_search_terms(self, message: str) -> List[str]: |
|
"""Extract search terms from user message""" |
|
|
|
words = message.lower().split() |
|
stop_words = {'find', 'search', 'show', 'me', 'my', 'from', 'about', 'with', 'for', 'the', 'a', 'an'} |
|
return [word for word in words if word not in stop_words and len(word) > 2] |
|
|
|
def _clean_sender(self, sender: str) -> str: |
|
"""Clean sender email for display""" |
|
if '<' in sender: |
|
email_match = re.search(r'<([^>]+)>', sender) |
|
if email_match: |
|
return email_match.group(1) |
|
return sender |
|
|
|
def _generate_qwen_response(self, message: str, intent: str, confidence: float, emails: List[Dict]) -> str: |
|
"""Generate response using Qwen with real email context""" |
|
try: |
|
|
|
email_context = [] |
|
|
|
for email in emails[:2]: |
|
email_context.append({ |
|
'subject': email.get('subject', 'No subject')[:20], |
|
'sender': self._clean_sender(email.get('sender', 'Unknown')), |
|
'is_unread': email.get('is_unread', False) |
|
|
|
}) |
|
|
|
|
|
system_prompt = f""" |
|
You are an email assistant. Answer the user's query about their emails briefly. |
|
USER QUERY: "{message}" |
|
INTENT: {intent} |
|
EMAIL SAMPLE: {len(email_context)} emails |
|
""" |
|
|
|
|
|
response = self.model.generate_content( |
|
system_prompt, |
|
max_tokens=512, |
|
temperature=0.7, |
|
strip_thinking=True, |
|
retries=2 |
|
) |
|
|
|
if response and hasattr(response, 'text') and response.text: |
|
ai_response = response.text.strip() |
|
|
|
|
|
final_response = f"🧠 **AI Analysis**\n" |
|
final_response += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n" |
|
final_response += ai_response |
|
|
|
return final_response |
|
else: |
|
|
|
logger.warning("Empty response from Qwen, using fallback") |
|
return self._handle_intent_fallback(message, intent, confidence) |
|
|
|
except requests.exceptions.Timeout: |
|
logger.warning(f"Timeout in Qwen response generation - using fallback") |
|
response_header = f"🧠 **AI Analysis**\n" |
|
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n" |
|
response_header += "I'll help with your request using my basic capabilities:\n\n" |
|
return response_header + self._handle_intent_fallback(message, intent, confidence) |
|
except Exception as e: |
|
logger.warning(f"Qwen response generation failed: {e}") |
|
return self._handle_intent_fallback(message, intent, confidence) |
|
|
|
def _handle_intent_fallback(self, message: str, intent: str, confidence: float) -> str: |
|
"""Fallback intent handling when Qwen is not available""" |
|
response_header = f"🧠 **AI Analysis**\n" |
|
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n" |
|
|
|
if intent == "email_search" and confidence > 0.7: |
|
return response_header + self._handle_search_intent(message) |
|
elif intent == "email_summary" and confidence > 0.8: |
|
return response_header + self._handle_summary_intent(message) |
|
elif intent == "workflow_automation": |
|
return response_header + self._handle_automation_intent(message) |
|
elif intent == "meeting_prep": |
|
return response_header + self._handle_meeting_intent(message) |
|
elif intent == "priority_focus": |
|
return response_header + self._handle_priority_intent(message) |
|
else: |
|
return response_header + self._handle_general_intent(message) |
|
|
|
def get_connection_status(self) -> Tuple[str, str]: |
|
"""Get the current connection status of the MCP client""" |
|
try: |
|
|
|
test_result = self.mcp_client.fetch_emails(query="newer_than:1d", max_results=1) |
|
|
|
if test_result and 'emails' in test_result: |
|
total_count = test_result.get('total_count', 0) |
|
return f"Connected ✅ ({total_count} emails accessible)", "success" |
|
else: |
|
return "Connected but no emails found 📭", "warning" |
|
|
|
except Exception as e: |
|
return f"Connection Error ❌ ({str(e)[:50]}...)", "error" |
|
|
|
def _display_triage_results(self, triage_results: Dict) -> str: |
|
"""Format triage results for beautiful, readable display in Gradio""" |
|
if "error" in triage_results: |
|
return f"❌ **Error:** {triage_results['error']}" |
|
|
|
|
|
output = "# 🧠 **AI Email Analysis Results**\n\n" |
|
|
|
|
|
summary = triage_results.get("analysis_summary", "") |
|
if summary: |
|
output += summary + "\n\n" |
|
output += "---\n\n" |
|
|
|
|
|
categories = [ |
|
("high_priority", "🚨 **High Priority**", "Urgent attention needed"), |
|
("meetings_and_calls", "📅 **Meetings & Calls**", "Calendar items"), |
|
("action_required", "✅ **Action Required**", "Tasks to complete"), |
|
("newsletters_and_updates", "📰 **Newsletters**", "Updates & info"), |
|
("personal", "👤 **Personal**", "Personal messages"), |
|
("low_priority", "📁 **Low Priority**", "Can wait") |
|
] |
|
|
|
|
|
total_processed = sum(len(triage_results.get(cat, [])) for cat, _, _ in categories) |
|
|
|
if total_processed == 0: |
|
return output + "📭 **No emails found to analyze.**" |
|
|
|
|
|
output += f"## 📊 **Quick Overview** ({total_processed} emails analyzed)\n\n" |
|
|
|
|
|
output += "| Category | Count | Distribution |\n" |
|
output += "|----------|-------|-------------|\n" |
|
|
|
for category_key, category_icon, category_desc in categories: |
|
count = len(triage_results.get(category_key, [])) |
|
if total_processed > 0: |
|
percentage = (count / total_processed * 100) |
|
|
|
bar_length = int(percentage / 5) |
|
bar = "█" * bar_length if bar_length > 0 else "" |
|
output += f"| {category_icon} | {count} | {bar} {percentage:.0f}% |\n" |
|
|
|
output += "\n---\n\n" |
|
|
|
|
|
priority_categories = ["high_priority", "meetings_and_calls", "action_required"] |
|
|
|
for category_key, category_title, category_desc in categories: |
|
emails = triage_results.get(category_key, []) |
|
if not emails: |
|
continue |
|
|
|
|
|
show_full_details = category_key in priority_categories |
|
|
|
output += f"## {category_title} ({len(emails)} emails)\n" |
|
output += f"*{category_desc}*\n\n" |
|
|
|
if show_full_details: |
|
|
|
for i, email in enumerate(emails[:5]): |
|
priority_score = email.get('priority_score', 0.5) |
|
priority_icon = "🔥" if priority_score > 0.8 else "⚠️" if priority_score > 0.6 else "📌" |
|
|
|
subject = email.get('subject', 'No subject') |
|
sender = self._clean_sender(email.get('sender', 'Unknown')) |
|
|
|
|
|
if len(subject) > 60: |
|
subject = subject[:57] + "..." |
|
if len(sender) > 35: |
|
sender = sender[:32] + "..." |
|
|
|
output += f"### {priority_icon} {subject}\n" |
|
output += f"📤 **From:** {sender} | ⭐ **Priority:** {priority_score:.1f}/1.0\n\n" |
|
|
|
|
|
reasoning = email.get('ai_reasoning', '') |
|
if reasoning and len(reasoning) > 5: |
|
output += f"🧠 **AI Analysis:** {reasoning}\n\n" |
|
|
|
|
|
actions = email.get('suggested_actions', []) |
|
if actions: |
|
output += "💡 **Suggested Actions:**\n" |
|
for action in actions[:3]: |
|
output += f"- {action}\n" |
|
output += "\n" |
|
|
|
output += "---\n\n" |
|
|
|
|
|
if len(emails) > 5: |
|
output += f"*...and {len(emails) - 5} more emails in this category*\n\n" |
|
|
|
else: |
|
|
|
output += "<div class='email-summary-table'>\n\n" |
|
output += "| Subject | From | Priority |\n" |
|
output += "|---------|------|----------|\n" |
|
|
|
for email in emails[:5]: |
|
subject = email.get('subject', 'No subject') |
|
if len(subject) > 40: |
|
subject = subject[:37] + "..." |
|
|
|
sender = self._clean_sender(email.get('sender', 'Unknown')) |
|
if len(sender) > 25: |
|
sender = sender[:22] + "..." |
|
|
|
priority = email.get('priority_score', 0.5) |
|
priority_icon = "🔥" if priority > 0.8 else "⚠️" if priority > 0.6 else "📌" |
|
|
|
output += f"| {subject} | {sender} | {priority_icon} {priority:.1f} |\n" |
|
|
|
output += "\n</div>\n\n" |
|
|
|
if len(emails) > 5: |
|
output += f"*...and {len(emails) - 5} more emails in this category*\n\n" |
|
|
|
|
|
recommendations = triage_results.get("recommendations", []) |
|
if recommendations: |
|
output += "---\n\n" |
|
output += "## 💡 **AI Recommendations**\n\n" |
|
for i, rec in enumerate(recommendations[:5], 1): |
|
output += f"{i}. {rec}\n" |
|
output += "\n" |
|
|
|
|
|
high_count = len(triage_results.get('high_priority', [])) |
|
action_count = len(triage_results.get('action_required', [])) |
|
|
|
output += "---\n\n" |
|
output += "## 🎯 **What to Do Next**\n\n" |
|
|
|
output += "<div class='next-steps'>\n\n" |
|
|
|
if high_count > 0: |
|
output += f"1. 🚨 **URGENT:** Handle {high_count} high-priority emails first\n\n" |
|
|
|
if action_count > 0: |
|
output += f"2. ✅ **TODAY:** Complete {action_count} action items\n\n" |
|
|
|
output += f"3. 📅 **SCHEDULE:** Review meeting emails and update calendar\n\n" |
|
output += f"4. 🗂️ **ORGANIZE:** Archive newsletters and low-priority items\n\n" |
|
|
|
output += "</div>\n\n" |
|
|
|
output += "---\n\n" |
|
output += "*🤖 Analysis powered by Qwen AI • Results based on your actual email content*" |
|
|
|
return output |
|
|
|
|
|
def create_enhanced_gradio_interface(modal_api_url: str = None): |
|
"""Create the enhanced Gradio interface for the Gmail AI Agent""" |
|
|
|
|
|
modal_api_url = modal_api_url or MODAL_API_URL |
|
|
|
|
|
agent = EnhancedGmailAgent(modal_api_url=modal_api_url) |
|
|
|
|
|
mcp_status, model_status = agent.get_connection_status() |
|
|
|
|
|
theme = gr.Theme( |
|
primary_hue="indigo", |
|
secondary_hue="blue", |
|
neutral_hue="slate", |
|
) |
|
|
|
|
|
custom_css = """ |
|
/* ... existing code ... */ |
|
""" |
|
|
|
with gr.Blocks(theme=theme, title="Enhanced Gmail AI Agent - Track 3") as app: |
|
|
|
|
|
gr.HTML(""" |
|
<div class="main-header"> |
|
<h1>🤖 Enhanced Gmail AI Agent</h1> |
|
<p>Track 3: Sophisticated Agent Behaviors & Workflow Automation</p> |
|
<p style="font-size: 1.1rem; margin-top: 10px;">Intelligent Decision-Making • Workflow Automation • Proactive Assistance</p> |
|
</div> |
|
""") |
|
|
|
|
|
gr.HTML(""" |
|
<div style="background-color: #FF9800; color: #000000; padding: 15px; border-radius: 5px; margin-bottom: 20px; border: 2px solid #FF5722; box-shadow: 0 2px 5px rgba(0,0,0,0.2);"> |
|
<h3 style="margin-top: 0; font-size: 1.3rem; color: #000000;">⚠️ IMPORTANT: Initial Loading Time</h3> |
|
<p style="font-size: 1.1rem; font-weight: 500; margin: 10px 0;">On first run, AI responses may take <strong style="color: #D32F2F; font-size: 1.2rem;">120-200 seconds</strong> because the Qwen LLM is hosted on Modal and requires time for cold start.</p> |
|
<p style="font-size: 1.1rem;">Subsequent requests will be much faster after the initial cold start. Please be patient during your first interaction.</p> |
|
</div> |
|
""") |
|
|
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
status_text = gr.HTML(elem_classes=["status-card"]) |
|
|
|
def update_status(): |
|
email_status, email_status_type = agent.get_connection_status() |
|
|
|
|
|
qwen_status = agent.model_status if hasattr(agent, 'model_status') else "Unknown" |
|
qwen_class = "status-connected" if agent.model else "status-error" |
|
|
|
return f'''<div class="status-card" style="background-color: #1A237E; color: white; padding: 12px; border-radius: 5px; margin-bottom: 15px; box-shadow: 0 2px 5px rgba(0,0,0,0.2);"> |
|
<div style="font-size: 1.1rem; font-weight: bold; margin-bottom: 5px;">🧠 Qwen Model Status: <span style="color: {'#4CAF50' if agent.model else '#F44336'};">{qwen_status}</span></div> |
|
<p style="margin: 5px 0; font-size: 0.9rem;">Note: The Qwen API will be tested when you first use an AI feature</p> |
|
</div>''' |
|
|
|
status_text.value = update_status() |
|
|
|
with gr.Tabs() as tabs: |
|
|
|
|
|
with gr.Tab("🧠 Intelligent Triage"): |
|
with gr.Column(elem_classes=["content-card"]): |
|
gr.HTML('<h2 class="section-header">🧠 AI-Powered Email Triage & Analysis</h2>') |
|
|
|
gr.HTML(""" |
|
<div class="info-box"> |
|
<h3>✨ Smart Email Analysis</h3> |
|
<p>The agent analyzes your emails using AI to categorize, prioritize, and suggest specific actions. Each email gets a priority score and reasoning.</p> |
|
<p><strong>⏱️ Expected Time:</strong> 20-30 seconds for thorough analysis</p> |
|
</div> |
|
""") |
|
|
|
|
|
gr.HTML(""" |
|
<div style="background-color: #FF9800; color: #000000; padding: 12px; border-radius: 5px; margin-bottom: 15px; border: 2px solid #FF5722; box-shadow: 0 2px 5px rgba(0,0,0,0.2);"> |
|
<p style="margin: 0; font-weight: 600; font-size: 1rem;"><span style="font-size: 1.2rem;">⚠️</span> <strong>IMPORTANT:</strong> First-time AI operations may take <strong style="color: #D32F2F;">120-200 seconds</strong> due to Modal cold start.</p> |
|
</div> |
|
""") |
|
|
|
with gr.Row(): |
|
max_emails_triage = gr.Slider( |
|
minimum=10, maximum=50, value=20, step=5, |
|
label="📊 Emails to Analyze", |
|
info="More emails = more thorough analysis but longer wait time" |
|
) |
|
triage_btn = gr.Button("🧠 Run Intelligent Triage", variant="primary", size="lg") |
|
|
|
|
|
triage_loading = gr.HTML(visible=False) |
|
|
|
|
|
triage_output = gr.Markdown( |
|
label="🎯 Triage Results", |
|
value="Click 'Run Intelligent Triage' to see AI-powered email analysis with specific recommendations", |
|
elem_classes=["triage-results"], |
|
show_label=True |
|
) |
|
|
|
def run_triage_with_loading(max_emails): |
|
loading_html = f""" |
|
<div class="loading-container"> |
|
<div class="loading-spinner"></div> |
|
<h3>🧠 Analyzing Your {max_emails} Most Recent Emails...</h3> |
|
<p>AI is categorizing, prioritizing, and generating recommendations</p> |
|
<div class="progress-bar"> |
|
<div class="progress-fill"></div> |
|
</div> |
|
<p style="color: #666; font-size: 14px;">⏱️ This usually takes 20-30 seconds • Please wait patiently</p> |
|
<p style="color: #888; font-size: 12px;">📊 Processing: Fetch emails → AI analysis → Priority scoring → Recommendations</p> |
|
</div> |
|
""" |
|
return ( |
|
gr.update(value=loading_html, visible=True), |
|
gr.update(visible=False), |
|
gr.update(interactive=False) |
|
) |
|
|
|
def run_triage_complete(max_emails): |
|
results = agent.intelligent_email_triage(max_emails) |
|
if "error" in results: |
|
output = f"❌ **Error:** {results['error']}" |
|
else: |
|
|
|
output = results.get("formatted_display", "Analysis complete but no formatted output available.") |
|
|
|
return ( |
|
gr.update(visible=False), |
|
gr.update(value=output, visible=True), |
|
gr.update(interactive=True) |
|
) |
|
|
|
triage_btn.click( |
|
run_triage_with_loading, |
|
inputs=[max_emails_triage], |
|
outputs=[triage_loading, triage_output, triage_btn] |
|
).then( |
|
run_triage_complete, |
|
inputs=[max_emails_triage], |
|
outputs=[triage_loading, triage_output, triage_btn] |
|
) |
|
|
|
|
|
with gr.Tab("🤖 Proactive Assistant"): |
|
with gr.Column(elem_classes=["content-card"]): |
|
gr.HTML('<h2 class="section-header">🤖 Intelligent AI Assistant</h2>') |
|
|
|
gr.HTML(""" |
|
<div class="info-box"> |
|
<h3>🧠 Advanced AI Capabilities</h3> |
|
<p>This AI assistant analyzes your <strong>actual email data</strong> using Qwen model to provide personalized, specific recommendations:</p> |
|
<ul> |
|
<li><strong>Intent Recognition:</strong> Understands what you want with confidence scoring</li> |
|
<li><strong>Real Email Analysis:</strong> Uses your actual recent emails for context</li> |
|
<li><strong>Specific Recommendations:</strong> Mentions actual email subjects and senders</li> |
|
<li><strong>Proactive Suggestions:</strong> Based on your email patterns and priorities</li> |
|
</ul> |
|
</div> |
|
""") |
|
|
|
|
|
gr.HTML(""" |
|
<div style="background-color: #FF9800; color: #000000; padding: 12px; border-radius: 5px; margin-bottom: 15px; border: 2px solid #FF5722; box-shadow: 0 2px 5px rgba(0,0,0,0.2);"> |
|
<p style="margin: 0; font-weight: 600; font-size: 1rem;"><span style="font-size: 1.2rem;">⚠️</span> <strong>IMPORTANT:</strong> First-time AI operations may take <strong style="color: #D32F2F;">120-200 seconds</strong> due to Modal cold start.</p> |
|
</div> |
|
""") |
|
|
|
|
|
with gr.Column(elem_classes=["chat-container"]): |
|
chatbot = gr.Chatbot( |
|
height=500, |
|
container=True, |
|
value=[(None, "🤖 **Welcome to your Enhanced Email Assistant!**\n\nI'm powered by advanced AI and can analyze your actual emails to provide specific, personalized recommendations.\n\n⚠️ **IMPORTANT: First-time responses may take 120-200 seconds** due to the Qwen model's cold start on Modal. Please be patient!\n\n💡 **Try asking me:**\n• \"What emails should I focus on today?\"\n• \"Help me organize my inbox based on my recent emails\"\n• \"Show me my most important unread emails\"\n• \"Summarize my emails from this week\"\n\nI'll analyze your real email data and give you specific advice! ✨")], |
|
elem_classes=["enhanced-chatbot", "chat-messages"] |
|
) |
|
|
|
with gr.Row(elem_classes=["chat-input-container"]): |
|
chat_input = gr.Textbox( |
|
placeholder="Ask me about your emails or request email analysis...", |
|
scale=4, |
|
lines=2, |
|
max_lines=8, |
|
elem_classes=["enhanced-input"], |
|
autofocus=True, |
|
show_label=False |
|
) |
|
chat_send = gr.Button("📤 Send", variant="primary", scale=1, elem_classes=["send-button"]) |
|
|
|
chat_send.click( |
|
agent.proactive_assistant_chat, |
|
inputs=[chat_input, chatbot], |
|
outputs=[chat_input, chatbot] |
|
) |
|
|
|
chat_input.submit( |
|
agent.proactive_assistant_chat, |
|
inputs=[chat_input, chatbot], |
|
outputs=[chat_input, chatbot] |
|
) |
|
|
|
|
|
gr.HTML(""" |
|
<div class="enhanced-footer"> |
|
<h3>🏆 Track 3: Enhanced Agentic Demo Features</h3> |
|
<p> |
|
✨ <strong>Specific Email Recommendations</strong> • |
|
🔄 <strong>Real Data Analysis</strong> • |
|
🧠 <strong>Qwen-Powered Intelligence</strong> |
|
</p> |
|
<p style="font-size: 14px;"> |
|
Enhanced with real-time email analysis, smart loading states, and beautiful UI design |
|
</p> |
|
</div> |
|
""") |
|
|
|
return app |
|
|
|
if __name__ == "__main__": |
|
print("🚀 Starting Enhanced Gmail AI Agent (Track 3)") |
|
print("🧠 Features: Intelligent Triage | Smart Workflows | Proactive Assistant") |
|
|
|
|
|
modal_api_url = MODAL_API_URL |
|
if modal_api_url: |
|
print(f"📡 Using Modal API URL from environment: {modal_api_url}") |
|
else: |
|
print("⚠️ No Modal API URL found in environment variables. Some features may be limited.") |
|
|
|
app = create_enhanced_gradio_interface(modal_api_url=modal_api_url) |
|
app.launch( |
|
server_name="0.0.0.0", |
|
server_port=7860, |
|
share=False, |
|
show_error=True |
|
) |
|
|
|
|
|
app = create_enhanced_gradio_interface(modal_api_url=MODAL_API_URL) |