# agents/mentions_agent.py
import pandas as pd
from typing import Dict, List, Any, Optional, Mapping
import logging
import os # Used by the __main__ test block below (os.environ)
import pandasai as pai # PandasAI; an LLM for it is assumed to be configured elsewhere (see utils.pandasai_setup)
from google.adk.agents import LlmAgent # Assuming this is the correct import path
# Project-specific imports
from utils.retry_mechanism import RetryMechanism
from data_models.metrics import AgentMetrics, TimeSeriesMetric
# Configure logger for this module
logger = logging.getLogger(__name__)
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"
class EnhancedMentionsAnalysisAgent:
"""
Enhanced mentions analysis agent with time-series metric extraction and sentiment processing.
"""
AGENT_NAME = "mentions_analyst"
AGENT_DESCRIPTION = "Expert analyst specializing in brand mention trends and sentiment patterns."
AGENT_INSTRUCTION = """
You are a specialized LinkedIn brand mentions expert focused on sentiment trends and mention patterns over time.
Your role includes:
1. MENTION TREND ANALYSIS (monthly, using 'date' column):
- Analyze mention volume trends over time.
- Identify periods with significant spikes or dips in mention activity.
2. SENTIMENT PATTERN ANALYSIS (monthly, using 'date' and 'sentiment_label'):
- Track the evolution of sentiment (e.g., positive, negative, neutral) associated with mentions.
- Calculate and analyze the average sentiment score over time (if sentiment can be quantified).
- Identify shifts in overall sentiment and potential drivers for these changes.
3. CORRELATION (Conceptual):
- Consider if mention spikes/dips or sentiment shifts correlate with any known company activities, campaigns, or external events (though this data might not be in the input DataFrame, mention the need to investigate).
4. METRIC EXTRACTION (for AgentMetrics):
- Extract time-series data for monthly mention volume.
- Extract time-series data for monthly sentiment distribution (e.g., count of positive/negative/neutral mentions) and average sentiment score.
- Provide aggregate metrics like total mentions, overall sentiment distribution, and average sentiment score for the period.
- Include categorical metrics like the distribution of sentiment labels.
Focus on identifying actionable insights from mention data. How is the brand being perceived? Are there emerging reputational risks or opportunities?
Use the provided DataFrame columns: 'date' (for mentions), 'sentiment_label' (e.g., 'Positive πŸ‘', 'Negative πŸ‘Ž', 'Neutral 😐'), and potentially 'mention_source' or 'mention_content' if available and relevant for deeper analysis (though focus on 'date' and 'sentiment_label' for core metrics).
"""
# Standardized sentiment mapping (can be expanded)
# This mapping is crucial for converting labels to scores.
SENTIMENT_MAPPING = {
'Positive πŸ‘': 1,
'Positive': 1, # Adding common variations
'Very Positive': 1.5, # Example for more granular sentiment
'Negative πŸ‘Ž': -1,
'Negative': -1,
'Very Negative': -1.5,
'Neutral 😐': 0,
'Neutral': 0,
'Mixed': 0, # Or handle mixed sentiment differently
'Unknown': 0 # Default score for unmapped or unknown sentiments
}
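# Note: with this mapping, scores range from -1.5 to 1.5 and lookups are exact
# (case- and emoji-sensitive); anything unmatched falls back to the 'Unknown' score of 0.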
def __init__(self, api_key: str, model_name: Optional[str] = None):
self.api_key = api_key
self.model_name = model_name or DEFAULT_AGENT_MODEL
self.agent = LlmAgent(
name=self.AGENT_NAME,
model=self.model_name,
description=self.AGENT_DESCRIPTION,
instruction=self.AGENT_INSTRUCTION
)
self.retry_mechanism = RetryMechanism()
logger.info(f"{self.AGENT_NAME} initialized with model {self.model_name}.")
def _get_sentiment_score(self, sentiment_label: Optional[str]) -> float:
"""Maps a sentiment label to a numerical score using SENTIMENT_MAPPING."""
if sentiment_label is None:
return float(self.SENTIMENT_MAPPING.get('Unknown', 0))
# Attempt to match known labels, case-insensitively for robustness if needed,
# but exact match is safer with the current emoji-inclusive keys.
return float(self.SENTIMENT_MAPPING.get(str(sentiment_label).strip(), self.SENTIMENT_MAPPING.get('Unknown',0)))
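# Illustrative behaviour (not a doctest):
#   _get_sentiment_score('Positive πŸ‘') -> 1.0
#   _get_sentiment_score('negative')    -> 0.0 (no exact match, falls back to 'Unknown')
#   _get_sentiment_score(None)          -> 0.0 (score for 'Unknown')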
def _preprocess_mentions_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""Cleans and prepares mentions data for analysis."""
if df is None or df.empty:
return pd.DataFrame()
df_processed = df.copy()
# Convert 'date' to datetime
if 'date' in df_processed.columns:
df_processed['date'] = pd.to_datetime(df_processed['date'], errors='coerce')
# df_processed.dropna(subset=['date'], inplace=True) # Keep for other metrics even if date is NaT
else:
logger.warning("'date' column not found in mentions data. Time-series analysis will be limited.")
# df_processed['date'] = pd.NaT # Add placeholder if critical
# Process 'sentiment_label' and create 'sentiment_score'
if 'sentiment_label' in df_processed.columns:
# Fill missing labels before casting to str, so NaN does not become the literal string 'nan'
df_processed['sentiment_label'] = df_processed['sentiment_label'].fillna('Unknown').astype(str)
df_processed['sentiment_score'] = df_processed['sentiment_label'].apply(self._get_sentiment_score)
else:
logger.info("'sentiment_label' column not found. Sentiment analysis will be limited.")
df_processed['sentiment_label'] = 'Unknown'
df_processed['sentiment_score'] = self._get_sentiment_score('Unknown')
return df_processed
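# Sketch of the expected transformation for a single row (illustrative values):
#   in : {'date': '2023-01-05', 'sentiment_label': 'Positive πŸ‘'}
#   out: {'date': Timestamp('2023-01-05'), 'sentiment_label': 'Positive πŸ‘', 'sentiment_score': 1.0}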
def _extract_time_series_metrics(self, df_processed: pd.DataFrame) -> List[TimeSeriesMetric]:
"""Extracts monthly time-series metrics from processed mentions data."""
ts_metrics = []
if df_processed.empty or 'date' not in df_processed.columns or df_processed['date'].isnull().all():
logger.info("Cannot extract time-series metrics for mentions: 'date' is missing or all null.")
return ts_metrics
df_ts = df_processed.dropna(subset=['date']).copy()
if df_ts.empty:
logger.info("No valid 'date' values for mentions time-series metrics after filtering NaT.")
return ts_metrics
df_ts['year_month'] = df_ts['date'].dt.strftime('%Y-%m')
# Monthly mention volume
monthly_volume = df_ts.groupby('year_month').size().reset_index(name='mention_count')
if not monthly_volume.empty:
ts_metrics.append(TimeSeriesMetric(
metric_name="monthly_mention_volume",
values=monthly_volume['mention_count'].tolist(),
timestamps=monthly_volume['year_month'].tolist(),
metric_type="time_series",
time_granularity="monthly",
unit="count"
))
# Monthly average sentiment score
if 'sentiment_score' in df_ts.columns:
monthly_avg_sentiment = df_ts.groupby('year_month')['sentiment_score'].mean().reset_index()
if not monthly_avg_sentiment.empty:
ts_metrics.append(TimeSeriesMetric(
metric_name="avg_monthly_sentiment_score",
values=monthly_avg_sentiment['sentiment_score'].tolist(),
timestamps=monthly_avg_sentiment['year_month'].tolist(),
metric_type="time_series",
time_granularity="monthly",
unit="score" # Score range depends on SENTIMENT_MAPPING
))
# Monthly distribution of sentiment labels
if 'sentiment_label' in df_ts.columns and df_ts['sentiment_label'].nunique() > 1:
# Ensure 'sentiment_label' is not all 'Unknown'
if not (df_ts['sentiment_label'] == 'Unknown').all():
sentiment_counts_by_month = df_ts.groupby(['year_month', 'sentiment_label']).size().unstack(fill_value=0)
for sentiment_val in sentiment_counts_by_month.columns:
if sentiment_val == 'Unknown' and (sentiment_counts_by_month[sentiment_val] == 0).all():
continue
ts_metrics.append(TimeSeriesMetric(
metric_name=f"monthly_mention_count_sentiment_{str(sentiment_val).lower().replace(' ', '_').replace('πŸ‘','positive').replace('πŸ‘Ž','negative').replace('😐','neutral')}",
values=sentiment_counts_by_month[sentiment_val].tolist(),
timestamps=sentiment_counts_by_month.index.tolist(), # year_month is index
metric_type="time_series",
time_granularity="monthly",
unit="count"
))
else:
logger.info("Sentiment label data is all 'Unknown', skipping sentiment distribution time series.")
return ts_metrics
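# Example of one emitted metric for two months of data (illustrative values only):
#   TimeSeriesMetric(metric_name="monthly_mention_volume", values=[2, 3],
#                    timestamps=["2023-01", "2023-02"], metric_type="time_series",
#                    time_granularity="monthly", unit="count")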
def _calculate_aggregate_metrics(self, df_processed: pd.DataFrame) -> Dict[str, float]:
"""Calculates aggregate metrics for mentions."""
agg_metrics = {}
if df_processed.empty:
return agg_metrics
agg_metrics['total_mentions_analyzed'] = float(len(df_processed))
if 'sentiment_score' in df_processed.columns and not df_processed['sentiment_score'].empty:
agg_metrics['overall_avg_sentiment_score'] = float(df_processed['sentiment_score'].mean())
if 'sentiment_label' in df_processed.columns:
total_valid_sentiments = len(df_processed.dropna(subset=['sentiment_label'])) # Count non-NaN labels
if total_valid_sentiments > 0:
# Iterate through our defined sentiment mapping to count occurrences
sentiment_counts = df_processed['sentiment_label'].value_counts()
for label in self.SENTIMENT_MAPPING:
# Use a clean key for the metric name
clean_label_key = str(label).lower().replace(' ', '_').replace('πŸ‘','positive').replace('πŸ‘Ž','negative').replace('😐','neutral')
count = sentiment_counts.get(label, 0)
if count > 0 or label == 'Unknown': # Report labels that actually occur, plus the 'Unknown' fallback bucket
agg_metrics[f'{clean_label_key}_mention_ratio'] = float(count / total_valid_sentiments)
agg_metrics[f'{clean_label_key}_mention_count'] = float(count)
# Mentions per day/week (if 'date' column is valid)
if 'date' in df_processed.columns and not df_processed['date'].isnull().all():
df_dated = df_processed.dropna(subset=['date']).sort_values('date')
if len(df_dated) > 1:
duration_days = (df_dated['date'].max() - df_dated['date'].min()).days
if duration_days > 0:
agg_metrics['avg_mentions_per_day'] = float(len(df_dated) / duration_days)
agg_metrics['avg_mentions_per_week'] = float(len(df_dated) / (duration_days / 7.0))
elif len(df_dated) == 1: # Single day with mentions
agg_metrics['avg_mentions_per_day'] = float(len(df_dated))
agg_metrics['avg_mentions_per_week'] = float(len(df_dated) * 7) # Extrapolate
return agg_metrics
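# Keys produced here (values are floats; per-label keys depend on SENTIMENT_MAPPING):
#   'total_mentions_analyzed', 'overall_avg_sentiment_score',
#   '<label>_mention_ratio', '<label>_mention_count',
#   'avg_mentions_per_day', 'avg_mentions_per_week'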
def _extract_categorical_metrics(self, df_processed: pd.DataFrame) -> Dict[str, Any]:
"""Extracts categorical distributions for mentions."""
cat_metrics = {}
if df_processed.empty:
return cat_metrics
# Sentiment label distribution (counts and percentages)
if 'sentiment_label' in df_processed.columns and df_processed['sentiment_label'].nunique() > 0:
cat_metrics['sentiment_label_distribution_percentage'] = df_processed['sentiment_label'].value_counts(normalize=True).apply(lambda x: f"{x:.2%}").to_dict()
cat_metrics['sentiment_label_counts'] = df_processed['sentiment_label'].value_counts().to_dict()
# Example: If 'mention_source' column existed:
# if 'mention_source' in df_processed.columns:
# cat_metrics['mention_source_distribution'] = df_processed['mention_source'].value_counts(normalize=True).to_dict()
# cat_metrics['mention_source_counts'] = df_processed['mention_source'].value_counts().to_dict()
return cat_metrics
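# Example output shape (illustrative):
#   {'sentiment_label_distribution_percentage': {'Positive πŸ‘': '50.00%', ...},
#    'sentiment_label_counts': {'Positive πŸ‘': 3, ...}}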
def _extract_time_periods(self, df_processed: pd.DataFrame) -> List[str]:
"""Extracts unique year-month time periods covered by the mentions data."""
if df_processed.empty or 'date' not in df_processed.columns or df_processed['date'].isnull().all():
return ["Data period not available or N/A"]
if 'year_month' in df_processed.columns: # Use a precomputed 'year_month' column if the caller supplied one
periods = sorted(df_processed['year_month'].dropna().unique().tolist(), reverse=True)
elif 'date' in df_processed.columns: # Derive if not present
dates = df_processed['date'].dropna()
if not dates.empty:
periods = sorted(dates.dt.strftime('%Y-%m').unique().tolist(), reverse=True)
else: return ["N/A"]
else: return ["N/A"]
return periods[:12] # Return up to the last 12 months
def analyze_mentions_data(self, mentions_df: pd.DataFrame) -> AgentMetrics:
"""
Generates comprehensive mentions analysis.
"""
if mentions_df is None or mentions_df.empty:
logger.warning("Mentions DataFrame is empty. Returning empty metrics.")
return AgentMetrics(
agent_name=self.AGENT_NAME,
analysis_summary="No mentions data provided for analysis.",
time_periods_covered=["N/A"]
)
# 1. Preprocess data
df_processed = self._preprocess_mentions_data(mentions_df)
if df_processed.empty: # mentions_df was non-empty, so preprocessing dropped every row
logger.warning("Mentions DataFrame became empty after preprocessing.")
return AgentMetrics(
agent_name=self.AGENT_NAME,
analysis_summary="Mentions data could not be processed.",
time_periods_covered=["N/A"]
)
# 2. Generate textual analysis using PandasAI
df_description_for_pandasai = "LinkedIn brand mentions data. Key columns: 'date' (date of mention), 'sentiment_label' (e.g., 'Positive πŸ‘', 'Negative πŸ‘Ž', 'Neutral 😐'), 'sentiment_score' (numeric score from -1.5 to 1.5)."
analysis_result_text = "PandasAI analysis for mentions could not be performed."
try:
pandas_ai_df = pai.DataFrame(df_processed, description=df_description_for_pandasai)
analysis_query = f"""
Analyze the provided LinkedIn brand mentions data. Focus on:
1. Monthly trends in mention volume.
2. Monthly trends in sentiment (average 'sentiment_score' and distribution of 'sentiment_label').
3. Identify any significant spikes/dips in mentions or shifts in sentiment.
Provide a concise summary of brand perception based on this data.
"""
def chat_operation():
if not pai.config.llm:
logger.warning("PandasAI LLM not configured for mentions agent. Attempting to configure.")
from utils.pandasai_setup import configure_pandasai
configure_pandasai(self.api_key, self.model_name)
if not pai.config.llm:
raise RuntimeError("PandasAI LLM could not be configured for mentions chat operation.")
logger.info(f"Executing PandasAI chat for mentions analysis with LLM: {pai.config.llm}")
return pandas_ai_df.chat(analysis_query)
analysis_result_raw = self.retry_mechanism.retry_with_backoff(
func=chat_operation, max_retries=2, base_delay=2.0, exceptions=(Exception,)
)
analysis_result_text = str(analysis_result_raw) if analysis_result_raw else "No textual analysis for mentions generated by PandasAI."
logger.info("Mentions analysis via PandasAI completed.")
except Exception as e:
logger.error(f"Mentions analysis with PandasAI failed: {e}", exc_info=True)
analysis_result_text = f"Mentions analysis using PandasAI failed. Error: {str(e)[:200]}"
# 3. Extract structured metrics
time_series_metrics = self._extract_time_series_metrics(df_processed)
aggregate_metrics = self._calculate_aggregate_metrics(df_processed)
categorical_metrics = self._extract_categorical_metrics(df_processed)
time_periods = self._extract_time_periods(df_processed)
return AgentMetrics(
agent_name=self.AGENT_NAME,
analysis_summary=analysis_result_text[:2000],
time_series_metrics=time_series_metrics,
aggregate_metrics=aggregate_metrics,
categorical_metrics=categorical_metrics,
time_periods_covered=time_periods,
data_sources_used=[f"mentions_df (shape: {mentions_df.shape}) -> df_processed (shape: {df_processed.shape})"]
)
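# Minimal usage sketch (assumes a valid Google API key and a PandasAI LLM configured via utils.pandasai_setup):
#   agent = EnhancedMentionsAnalysisAgent(api_key="YOUR_GOOGLE_API_KEY")
#   metrics = agent.analyze_mentions_data(mentions_df)
#   print(metrics.analysis_summary)
#   print(metrics.aggregate_metrics)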
if __name__ == '__main__':
try:
from utils.logging_config import setup_logging
setup_logging()
logger.info("Logging setup for EnhancedMentionsAnalysisAgent test.")
except ImportError:
logging.basicConfig(level=logging.INFO)
logger.warning("Could not import setup_logging. Using basicConfig.")
MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_mentions")
MODEL_NAME = DEFAULT_AGENT_MODEL
try:
from utils.pandasai_setup import configure_pandasai
if MOCK_API_KEY != "test_api_key_mentions":
configure_pandasai(MOCK_API_KEY, MODEL_NAME)
logger.info("PandasAI configured for testing EnhancedMentionsAnalysisAgent.")
else:
logger.warning("Using mock API key for mentions. PandasAI chat will likely fail or use a mock.")
class MockPandasAIDataFrame:
def __init__(self, df, description): self.df = df; self.description = description
def chat(self, query): return f"Mock PandasAI mentions response to: {query}"
pai.DataFrame = MockPandasAIDataFrame
except ImportError:
logger.error("utils.pandasai_setup not found. PandasAI will not be configured for mentions.")
class MockPandasAIDataFrame:
def __init__(self, df, description): self.df = df; self.description = description
def chat(self, query): return f"Mock PandasAI mentions response to: {query}"
pai.DataFrame = MockPandasAIDataFrame
sample_mentions_data = {
'date': pd.to_datetime(['2023-01-05', '2023-01-15', '2023-02-02', '2023-02-20', '2023-03-10', '2023-03-12']),
'sentiment_label': ['Positive πŸ‘', 'Negative πŸ‘Ž', 'Neutral 😐', 'Positive πŸ‘', 'Positive πŸ‘', 'Unknown'],
# 'mention_content': ['Great product!', 'Service was slow.', 'Just a mention.', 'Love the new feature!', 'Highly recommend.', 'Seen this around.']
}
sample_df_mentions = pd.DataFrame(sample_mentions_data)
mentions_agent = EnhancedMentionsAnalysisAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME)
logger.info("Analyzing sample mentions data...")
mentions_metrics_result = mentions_agent.analyze_mentions_data(sample_df_mentions)
print("\n--- EnhancedMentionsAnalysisAgent Results ---")
print(f"Agent Name: {mentions_metrics_result.agent_name}")
print(f"Analysis Summary: {mentions_metrics_result.analysis_summary}")
print("\nTime Series Metrics (Mentions):")
for ts_metric in mentions_metrics_result.time_series_metrics:
print(f" - {ts_metric.metric_name}: {len(ts_metric.values)} data points, e.g., values {ts_metric.values[:3]} at timestamps {ts_metric.timestamps[:3]} (Unit: {ts_metric.unit})")
print("\nAggregate Metrics (Mentions):")
for key, value in mentions_metrics_result.aggregate_metrics.items():
print(f" - {key}: {value}")
print("\nCategorical Metrics (Mentions):")
for key, value in mentions_metrics_result.categorical_metrics.items():
print(f" - {key}:")
if isinstance(value, dict):
for sub_key, sub_value in list(value.items())[:2]: # Print first 2 for brevity
print(f" - {sub_key}: {sub_value}")
else:
print(f" {value}")
print(f"\nTime Periods Covered (Mentions): {mentions_metrics_result.time_periods_covered}")
# Test with empty DataFrame
logger.info("\n--- Testing Mentions Agent with empty DataFrame ---")
empty_mentions_metrics = mentions_agent.analyze_mentions_data(pd.DataFrame())
print(f"Empty Mentions DF Analysis Summary: {empty_mentions_metrics.analysis_summary}")