# agents/mentions_agent.py
import logging
import os
from typing import Dict, List, Any, Optional, Mapping

import pandas as pd
import pandasai as pai  # Assuming pandasai is imported as pai globally or configured
from google.adk.agents import LlmAgent  # Assuming this is the correct import path

# Project-specific imports
from utils.retry_mechanism import RetryMechanism
from data_models.metrics import AgentMetrics, TimeSeriesMetric
# Module-level logger; handlers and levels are left to the application's logging setup.
logger = logging.getLogger(__name__)

# Model identifier used when the agent is constructed without an explicit model_name.
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"
class EnhancedMentionsAnalysisAgent:
    """
    Enhanced mentions analysis agent with time-series metric extraction and sentiment processing.

    The agent combines two outputs into a single ``AgentMetrics`` object:

    * a free-text summary produced by a PandasAI chat over the mentions data, and
    * structured metrics (monthly time series, aggregates and categorical
      distributions) computed directly with pandas.

    The input DataFrame is expected to carry a 'date' column and a
    'sentiment_label' column; both are optional, and the metrics that depend
    on a missing column are simply omitted.
    """

    AGENT_NAME = "mentions_analyst"
    AGENT_DESCRIPTION = "Expert analyst specializing in brand mention trends and sentiment patterns."
    AGENT_INSTRUCTION = """
    You are a specialized LinkedIn brand mentions expert focused on sentiment trends and mention patterns over time.
    Your role includes:
    1. MENTION TREND ANALYSIS (monthly, using 'date' column):
       - Analyze mention volume trends over time.
       - Identify periods with significant spikes or dips in mention activity.
    2. SENTIMENT PATTERN ANALYSIS (monthly, using 'date' and 'sentiment_label'):
       - Track the evolution of sentiment (e.g., positive, negative, neutral) associated with mentions.
       - Calculate and analyze the average sentiment score over time (if sentiment can be quantified).
       - Identify shifts in overall sentiment and potential drivers for these changes.
    3. CORRELATION (Conceptual):
       - Consider if mention spikes/dips or sentiment shifts correlate with any known company activities, campaigns, or external events (though this data might not be in the input DataFrame, mention the need to investigate).
    4. METRIC EXTRACTION (for AgentMetrics):
       - Extract time-series data for monthly mention volume.
       - Extract time-series data for monthly sentiment distribution (e.g., count of positive/negative/neutral mentions) and average sentiment score.
       - Provide aggregate metrics like total mentions, overall sentiment distribution, and average sentiment score for the period.
       - Include categorical metrics like the distribution of sentiment labels.
    Focus on identifying actionable insights from mention data. How is the brand being perceived? Are there emerging reputational risks or opportunities?
    Use the provided DataFrame columns: 'date' (for mentions), 'sentiment_label' (e.g., 'Positive π', 'Negative π', 'Neutral π'), and potentially 'mention_source' or 'mention_content' if available and relevant for deeper analysis (though focus on 'date' and 'sentiment_label' for core metrics).
    """

    # Label -> numeric score. Lookups are exact (after stripping surrounding
    # whitespace); any label that is not listed scores as 'Unknown'.
    # NOTE(review): three keys end in the same 'π' glyph. It looks like a
    # stand-in for three different symbols - confirm these keys against the
    # labels the upstream pipeline actually emits.
    SENTIMENT_MAPPING = {
        'Positive π': 1,
        'Positive': 1,  # Adding common variations
        'Very Positive': 1.5,  # Example for more granular sentiment
        'Negative π': -1,
        'Negative': -1,
        'Very Negative': -1.5,
        'Neutral π': 0,
        'Neutral': 0,
        'Mixed': 0,  # Or handle mixed sentiment differently
        'Unknown': 0,  # Default score for unmapped or unknown sentiments
    }

    # Polarity words used to spell out the 'π' glyph when building metric keys.
    _POLARITY_WORDS = ('positive', 'negative', 'neutral')

    def __init__(self, api_key: str, model_name: Optional[str] = None):
        """
        Create the agent.

        Args:
            api_key: Key passed to the PandasAI configuration helper when the
                PandasAI LLM has not been configured yet.
            model_name: Model identifier; falls back to DEFAULT_AGENT_MODEL.
        """
        self.api_key = api_key
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self.AGENT_INSTRUCTION,
        )
        self.retry_mechanism = RetryMechanism()
        logger.info("%s initialized with model %s.", self.AGENT_NAME, self.model_name)

    @classmethod
    def _sentiment_metric_key(cls, label: Any) -> str:
        """
        Turn a sentiment label into a lowercase, underscore-separated metric key.

        The 'π' glyph is spelled out as the polarity word found in the label
        itself ('Negative π' -> 'negative_negative'). Replacing the same glyph
        three times in a row, as the previous inline code did, could only ever
        apply the first replacement and produced keys such as
        'negative_positive'. Labels without a polarity word keep the previous
        behaviour of spelling the glyph as 'positive'.
        """
        slug = str(label).lower().replace(' ', '_')
        if 'π' not in slug:
            return slug
        polarity = next((word for word in cls._POLARITY_WORDS if word in slug), 'positive')
        return slug.replace('π', polarity)

    def _get_sentiment_score(self, sentiment_label: Optional[str]) -> float:
        """
        Map a sentiment label to a numerical score using SENTIMENT_MAPPING.

        Returns the 'Unknown' score for ``None`` and for labels that are not in
        the mapping. The result is always a float.
        """
        unknown_score = float(self.SENTIMENT_MAPPING.get('Unknown', 0))
        if sentiment_label is None:
            return unknown_score
        # Exact match only: the keys are case-sensitive and may carry a symbol.
        return float(self.SENTIMENT_MAPPING.get(str(sentiment_label).strip(), unknown_score))

    def _preprocess_mentions_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and prepare mentions data for analysis.

        Works on a copy of ``df``:

        * 'date' is parsed to datetime; unparseable values become NaT and the
          rows are kept so that non-temporal metrics still count them.
        * 'sentiment_label' is converted to text with missing values replaced
          by 'Unknown', and a numeric 'sentiment_score' column is derived.
          When the column is absent both columns are filled with the
          'Unknown' defaults.

        Returns an empty DataFrame when ``df`` is None or empty.
        """
        if df is None or df.empty:
            return pd.DataFrame()

        df_processed = df.copy()

        if 'date' in df_processed.columns:
            df_processed['date'] = pd.to_datetime(df_processed['date'], errors='coerce')
        else:
            logger.warning("'date' column not found in mentions data. Time-series analysis will be limited.")

        if 'sentiment_label' in df_processed.columns:
            raw_labels = df_processed['sentiment_label']
            # Replace missing values BEFORE the string conversion; converting
            # first would turn them into the text 'nan'/'None', which the
            # fill step can no longer detect.
            labels = raw_labels.astype(object).where(raw_labels.notna(), 'Unknown').astype(str)
            df_processed['sentiment_label'] = labels
            df_processed['sentiment_score'] = labels.apply(self._get_sentiment_score)
        else:
            logger.info("'sentiment_label' column not found. Sentiment analysis will be limited.")
            df_processed['sentiment_label'] = 'Unknown'
            df_processed['sentiment_score'] = self._get_sentiment_score('Unknown')

        return df_processed

    def _extract_time_series_metrics(self, df_processed: pd.DataFrame) -> List["TimeSeriesMetric"]:
        """
        Extract monthly time-series metrics from processed mentions data.

        Produces, when the data allows it:

        * ``monthly_mention_volume`` - number of mentions per 'YYYY-MM';
        * ``avg_monthly_sentiment_score`` - mean 'sentiment_score' per month;
        * one ``monthly_mention_count_sentiment_<label>`` series per sentiment
          label, emitted only when at least two distinct labels are present.

        Rows without a valid date are ignored. Returns an empty list when no
        row has a valid date.
        """
        ts_metrics = []
        if df_processed.empty or 'date' not in df_processed.columns or df_processed['date'].isnull().all():
            logger.info("Cannot extract time-series metrics for mentions: 'date' is missing or all null.")
            return ts_metrics

        df_ts = df_processed.dropna(subset=['date']).copy()
        if df_ts.empty:
            logger.info("No valid 'date' values for mentions time-series metrics after filtering NaT.")
            return ts_metrics

        df_ts['year_month'] = df_ts['date'].dt.strftime('%Y-%m')

        # Monthly mention volume
        monthly_volume = df_ts.groupby('year_month').size().reset_index(name='mention_count')
        if not monthly_volume.empty:
            ts_metrics.append(TimeSeriesMetric(
                metric_name="monthly_mention_volume",
                values=monthly_volume['mention_count'].tolist(),
                timestamps=monthly_volume['year_month'].tolist(),
                metric_type="time_series",
                time_granularity="monthly",
                unit="count",
            ))

        # Monthly average sentiment score
        if 'sentiment_score' in df_ts.columns:
            monthly_avg_sentiment = df_ts.groupby('year_month')['sentiment_score'].mean().reset_index()
            if not monthly_avg_sentiment.empty:
                ts_metrics.append(TimeSeriesMetric(
                    metric_name="avg_monthly_sentiment_score",
                    values=monthly_avg_sentiment['sentiment_score'].tolist(),
                    timestamps=monthly_avg_sentiment['year_month'].tolist(),
                    metric_type="time_series",
                    time_granularity="monthly",
                    unit="score",  # Score range depends on SENTIMENT_MAPPING
                ))

        # Monthly distribution of sentiment labels
        if 'sentiment_label' in df_ts.columns:
            labels = df_ts['sentiment_label']
            if (labels == 'Unknown').all():
                logger.info("Sentiment label data is all 'Unknown', skipping sentiment distribution time series.")
            elif labels.nunique() > 1:
                sentiment_counts_by_month = (
                    df_ts.groupby(['year_month', 'sentiment_label']).size().unstack(fill_value=0)
                )
                for sentiment_val in sentiment_counts_by_month.columns:
                    monthly_counts = sentiment_counts_by_month[sentiment_val]
                    if sentiment_val == 'Unknown' and (monthly_counts == 0).all():
                        continue
                    ts_metrics.append(TimeSeriesMetric(
                        metric_name=f"monthly_mention_count_sentiment_{self._sentiment_metric_key(sentiment_val)}",
                        values=monthly_counts.tolist(),
                        timestamps=sentiment_counts_by_month.index.tolist(),  # year_month is index
                        metric_type="time_series",
                        time_granularity="monthly",
                        unit="count",
                    ))

        return ts_metrics

    def _calculate_aggregate_metrics(self, df_processed: pd.DataFrame) -> Dict[str, float]:
        """
        Calculate aggregate metrics for mentions.

        Keys produced (each only when the required column is present):

        * ``total_mentions_analyzed``
        * ``overall_avg_sentiment_score``
        * ``<label>_mention_count`` / ``<label>_mention_ratio`` for every
          SENTIMENT_MAPPING label that occurs in the data; the 'Unknown' pair
          is always reported, even when its count is zero
        * ``avg_mentions_per_day`` / ``avg_mentions_per_week``

        Returns an empty dict for an empty DataFrame.
        """
        agg_metrics: Dict[str, float] = {}
        if df_processed.empty:
            return agg_metrics

        agg_metrics['total_mentions_analyzed'] = float(len(df_processed))

        if 'sentiment_score' in df_processed.columns:
            agg_metrics['overall_avg_sentiment_score'] = float(df_processed['sentiment_score'].mean())

        if 'sentiment_label' in df_processed.columns:
            valid_labels = df_processed['sentiment_label'].dropna()
            total_valid_sentiments = len(valid_labels)
            if total_valid_sentiments > 0:
                label_counts = valid_labels.value_counts()
                for label in self.SENTIMENT_MAPPING:
                    count = int(label_counts.get(label, 0))
                    if count == 0 and label != 'Unknown':
                        continue
                    key = self._sentiment_metric_key(label)
                    agg_metrics[f'{key}_mention_ratio'] = float(count / total_valid_sentiments)
                    agg_metrics[f'{key}_mention_count'] = float(count)

        # Mentions per day/week (if 'date' column holds valid values)
        if 'date' in df_processed.columns:
            dates = df_processed['date'].dropna()
            if not dates.empty:
                mention_count = len(dates)
                duration_days = (dates.max() - dates.min()).days
                if duration_days > 0:
                    agg_metrics['avg_mentions_per_day'] = float(mention_count / duration_days)
                    agg_metrics['avg_mentions_per_week'] = float(mention_count / (duration_days / 7.0))
                else:
                    # Everything falls on one day (one mention or many):
                    # report that day's count and extrapolate it to a week.
                    agg_metrics['avg_mentions_per_day'] = float(mention_count)
                    agg_metrics['avg_mentions_per_week'] = float(mention_count * 7)

        return agg_metrics

    def _extract_categorical_metrics(self, df_processed: pd.DataFrame) -> Dict[str, Any]:
        """
        Extract categorical distributions for mentions.

        Returns a dict with ``sentiment_label_distribution_percentage``
        (label -> share formatted like '33.33%') and ``sentiment_label_counts``
        (label -> count), or an empty dict when there is nothing to report.
        """
        cat_metrics: Dict[str, Any] = {}
        if df_processed.empty:
            return cat_metrics

        if 'sentiment_label' in df_processed.columns and df_processed['sentiment_label'].nunique() > 0:
            labels = df_processed['sentiment_label']
            shares = labels.value_counts(normalize=True)
            cat_metrics['sentiment_label_distribution_percentage'] = (
                shares.apply(lambda share: f"{share:.2%}").to_dict()
            )
            cat_metrics['sentiment_label_counts'] = labels.value_counts().to_dict()

        return cat_metrics

    def _extract_time_periods(self, df_processed: pd.DataFrame) -> List[str]:
        """
        Return the 'YYYY-MM' periods covered by the mentions data.

        Periods are sorted newest first and capped at twelve entries. A
        one-element placeholder list is returned when there is no usable date.
        """
        if df_processed.empty or 'date' not in df_processed.columns or df_processed['date'].isnull().all():
            return ["Data period not available or N/A"]

        if 'year_month' in df_processed.columns:
            # Reuse the column if a caller has already derived it.
            month_values = df_processed['year_month'].dropna()
        else:
            month_values = df_processed['date'].dropna().dt.strftime('%Y-%m')

        periods = sorted(month_values.unique().tolist(), reverse=True)
        return periods[:12]  # Return up to the last 12 months

    def analyze_mentions_data(self, mentions_df: pd.DataFrame) -> "AgentMetrics":
        """
        Generate a comprehensive mentions analysis.

        Steps:
            1. Preprocess the data (date parsing, sentiment scoring).
            2. Ask PandasAI for a textual summary. Failures are logged and
               reported in the summary text instead of being raised.
            3. Compute structured metrics with pandas.

        Args:
            mentions_df: Raw mentions data.

        Returns:
            An ``AgentMetrics`` object. For missing or empty input it carries
            only an explanatory summary and an "N/A" period.
        """
        if mentions_df is None or mentions_df.empty:
            logger.warning("Mentions DataFrame is empty. Returning empty metrics.")
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary="No mentions data provided for analysis.",
                time_periods_covered=["N/A"],
            )

        # 1. Preprocess data
        df_processed = self._preprocess_mentions_data(mentions_df)
        if df_processed.empty:
            logger.warning("Mentions DataFrame became empty after preprocessing.")
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary="Mentions data could not be processed.",
                time_periods_covered=["N/A"],
            )

        # 2. Generate textual analysis using PandasAI
        df_description_for_pandasai = "LinkedIn brand mentions data. Key columns: 'date' (date of mention), 'sentiment_label' (e.g., 'Positive π', 'Negative π', 'Neutral π'), 'sentiment_score' (numeric score from -1.5 to 1.5)."
        analysis_result_text = "PandasAI analysis for mentions could not be performed."
        try:
            pandas_ai_df = pai.DataFrame(df_processed, description=df_description_for_pandasai)
            analysis_query = """
            Analyze the provided LinkedIn brand mentions data. Focus on:
            1. Monthly trends in mention volume.
            2. Monthly trends in sentiment (average 'sentiment_score' and distribution of 'sentiment_label').
            3. Identify any significant spikes/dips in mentions or shifts in sentiment.
            Provide a concise summary of brand perception based on this data.
            """

            def chat_operation():
                # Configure the PandasAI LLM lazily, once, if nobody did it yet.
                if not pai.config.llm:
                    logger.warning("PandasAI LLM not configured for mentions agent. Attempting to configure.")
                    from utils.pandasai_setup import configure_pandasai
                    configure_pandasai(self.api_key, self.model_name)
                    if not pai.config.llm:
                        raise RuntimeError("PandasAI LLM could not be configured for mentions chat operation.")
                logger.info("Executing PandasAI chat for mentions analysis with LLM: %s", pai.config.llm)
                return pandas_ai_df.chat(analysis_query)

            analysis_result_raw = self.retry_mechanism.retry_with_backoff(
                func=chat_operation, max_retries=2, base_delay=2.0, exceptions=(Exception,)
            )
            # Convert before testing for emptiness: a truthiness test on a
            # DataFrame-like result would raise instead of yielding text.
            raw_text = "" if analysis_result_raw is None else str(analysis_result_raw).strip()
            analysis_result_text = raw_text or "No textual analysis for mentions generated by PandasAI."
            logger.info("Mentions analysis via PandasAI completed.")
        except Exception as e:
            # Boundary handler: the structured metrics below are still useful
            # when the LLM step fails, so report the failure and carry on.
            logger.error("Mentions analysis with PandasAI failed: %s", e, exc_info=True)
            analysis_result_text = f"Mentions analysis using PandasAI failed. Error: {str(e)[:200]}"

        # 3. Extract structured metrics
        time_series_metrics = self._extract_time_series_metrics(df_processed)
        aggregate_metrics = self._calculate_aggregate_metrics(df_processed)
        categorical_metrics = self._extract_categorical_metrics(df_processed)
        time_periods = self._extract_time_periods(df_processed)

        return AgentMetrics(
            agent_name=self.AGENT_NAME,
            analysis_summary=analysis_result_text[:2000],
            time_series_metrics=time_series_metrics,
            aggregate_metrics=aggregate_metrics,
            categorical_metrics=categorical_metrics,
            time_periods_covered=time_periods,
            data_sources_used=[f"mentions_df (shape: {mentions_df.shape}) -> df_processed (shape: {df_processed.shape})"],
        )
if __name__ == '__main__':
    # Manual smoke test: runs the agent on a small in-memory sample and prints
    # the resulting metrics. Without a real GOOGLE_API_KEY, or when the
    # PandasAI setup helper is missing, pai.DataFrame is swapped for a mock.
    # Relies on the module-level `import os` for the environment lookup below.
    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for EnhancedMentionsAnalysisAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO)
        logger.warning("Could not import setup_logging. Using basicConfig.")

    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_mentions")
    MODEL_NAME = DEFAULT_AGENT_MODEL

    class MockPandasAIDataFrame:
        """Stand-in for pai.DataFrame whose chat() just echoes the query."""

        def __init__(self, df, description):
            self.df = df
            self.description = description

        def chat(self, query):
            return f"Mock PandasAI mentions response to: {query}"

    try:
        from utils.pandasai_setup import configure_pandasai
        if MOCK_API_KEY != "test_api_key_mentions":
            configure_pandasai(MOCK_API_KEY, MODEL_NAME)
            logger.info("PandasAI configured for testing EnhancedMentionsAnalysisAgent.")
        else:
            logger.warning("Using mock API key for mentions. PandasAI chat will likely fail or use a mock.")
            pai.DataFrame = MockPandasAIDataFrame
    except ImportError:
        logger.error("utils.pandasai_setup not found. PandasAI will not be configured for mentions.")
        pai.DataFrame = MockPandasAIDataFrame

    sample_mentions_data = {
        'date': pd.to_datetime(['2023-01-05', '2023-01-15', '2023-02-02', '2023-02-20', '2023-03-10', '2023-03-12']),
        'sentiment_label': ['Positive π', 'Negative π', 'Neutral π', 'Positive π', 'Positive π', 'Unknown'],
    }
    sample_df_mentions = pd.DataFrame(sample_mentions_data)

    mentions_agent = EnhancedMentionsAnalysisAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME)

    logger.info("Analyzing sample mentions data...")
    mentions_metrics_result = mentions_agent.analyze_mentions_data(sample_df_mentions)

    print("\n--- EnhancedMentionsAnalysisAgent Results ---")
    print(f"Agent Name: {mentions_metrics_result.agent_name}")
    print(f"Analysis Summary: {mentions_metrics_result.analysis_summary}")

    print("\nTime Series Metrics (Mentions):")
    for ts_metric in mentions_metrics_result.time_series_metrics:
        print(f" - {ts_metric.metric_name}: {len(ts_metric.values)} data points, e.g., {ts_metric.values[:3]} for ts {ts_metric.timestamps[:3]} (Unit: {ts_metric.unit})")

    print("\nAggregate Metrics (Mentions):")
    for key, value in mentions_metrics_result.aggregate_metrics.items():
        print(f" - {key}: {value}")

    print("\nCategorical Metrics (Mentions):")
    for key, value in mentions_metrics_result.categorical_metrics.items():
        print(f" - {key}:")
        if isinstance(value, dict):
            for sub_key, sub_value in list(value.items())[:2]:  # Print first 2 for brevity
                print(f" - {sub_key}: {sub_value}")
        else:
            print(f" {value}")

    print(f"\nTime Periods Covered (Mentions): {mentions_metrics_result.time_periods_covered}")

    # Test with empty DataFrame
    logger.info("\n--- Testing Mentions Agent with empty DataFrame ---")
    empty_mentions_metrics = mentions_agent.analyze_mentions_data(pd.DataFrame())
    print(f"Empty Mentions DF Analysis Summary: {empty_mentions_metrics.analysis_summary}")