Spaces:
Running
Running
File size: 21,524 Bytes
3332e5b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# agents/mentions_agent.py
import pandas as pd
from typing import Dict, List, Any, Optional, Mapping
import logging
import pandasai as pai # Assuming pandasai is imported as pai globally or configured
from google.adk.agents import LlmAgent # Assuming this is the correct import path
# Project-specific imports
from utils.retry_mechanism import RetryMechanism
from data_models.metrics import AgentMetrics, TimeSeriesMetric
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Model identifier the agent falls back to when it is constructed without an
# explicit model_name.
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"
class EnhancedMentionsAnalysisAgent:
    """
    Enhanced mentions analysis agent with time-series metric extraction and sentiment processing.

    The public entry point is ``analyze_mentions_data``. It preprocesses a
    mentions DataFrame, asks PandasAI for a textual summary (best effort), and
    computes time-series, aggregate and categorical metrics with pandas.
    """

    AGENT_NAME = "mentions_analyst"
    AGENT_DESCRIPTION = "Expert analyst specializing in brand mention trends and sentiment patterns."
    # NOTE: the instruction text is sent to the model verbatim, so its lines are
    # intentionally kept flush-left inside the literal.
    AGENT_INSTRUCTION = """
You are a specialized LinkedIn brand mentions expert focused on sentiment trends and mention patterns over time.
Your role includes:
1. MENTION TREND ANALYSIS (monthly, using 'date' column):
- Analyze mention volume trends over time.
- Identify periods with significant spikes or dips in mention activity.
2. SENTIMENT PATTERN ANALYSIS (monthly, using 'date' and 'sentiment_label'):
- Track the evolution of sentiment (e.g., positive, negative, neutral) associated with mentions.
- Calculate and analyze the average sentiment score over time (if sentiment can be quantified).
- Identify shifts in overall sentiment and potential drivers for these changes.
3. CORRELATION (Conceptual):
- Consider if mention spikes/dips or sentiment shifts correlate with any known company activities, campaigns, or external events (though this data might not be in the input DataFrame, mention the need to investigate).
4. METRIC EXTRACTION (for AgentMetrics):
- Extract time-series data for monthly mention volume.
- Extract time-series data for monthly sentiment distribution (e.g., count of positive/negative/neutral mentions) and average sentiment score.
- Provide aggregate metrics like total mentions, overall sentiment distribution, and average sentiment score for the period.
- Include categorical metrics like the distribution of sentiment labels.
Focus on identifying actionable insights from mention data. How is the brand being perceived? Are there emerging reputational risks or opportunities?
Use the provided DataFrame columns: 'date' (for mentions), 'sentiment_label' (e.g., 'Positive π', 'Negative π', 'Neutral π'), and potentially 'mention_source' or 'mention_content' if available and relevant for deeper analysis (though focus on 'date' and 'sentiment_label' for core metrics).
"""

    # Standardized sentiment mapping (can be expanded).
    # This mapping is crucial for converting labels to scores; lookups are
    # exact-match on the stripped label.
    SENTIMENT_MAPPING = {
        'Positive π': 1,
        'Positive': 1,  # Adding common variations
        'Very Positive': 1.5,  # Example for more granular sentiment
        'Negative π': -1,
        'Negative': -1,
        'Very Negative': -1.5,
        'Neutral π': 0,
        'Neutral': 0,
        'Mixed': 0,  # Or handle mixed sentiment differently
        'Unknown': 0  # Default score for unmapped or unknown sentiments
    }

    def __init__(self, api_key: str, model_name: Optional[str] = None):
        """Store credentials, build the underlying LlmAgent and the retry helper.

        Args:
            api_key: Key handed to the PandasAI configuration helper when the
                LLM has to be configured lazily.
            model_name: Model identifier; DEFAULT_AGENT_MODEL is used when
                this is None or empty.
        """
        self.api_key = api_key
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self.AGENT_INSTRUCTION
        )
        self.retry_mechanism = RetryMechanism()
        logger.info(f"{self.AGENT_NAME} initialized with model {self.model_name}.")

    @staticmethod
    def _metric_key_for_label(label: Any) -> str:
        """Turn a sentiment label into a lowercase, ASCII-only metric-name fragment.

        Words are lowercased and joined with underscores. A token made only of
        non-ASCII characters (the symbol suffix in labels such as
        'Negative π') is spelled out as the word it decorates, so
        'Positive π' -> 'positive_positive' and
        'Negative π' -> 'negative_negative'.

        This replaces three chained ``.replace()`` calls that all searched for
        the same character: only the first could ever fire, so every symbol
        label was suffixed with 'positive'. The result does not depend on
        which symbol is used.
        """
        normalized = str(label).strip().lower()
        parts: List[str] = []
        for token in normalized.split():
            ascii_only = ''.join(ch for ch in token if ord(ch) < 128)
            if ascii_only:
                parts.append(ascii_only)
            elif parts:
                # Pure symbol token: repeat the word it is attached to.
                parts.append(parts[-1])
        if parts:
            return '_'.join(parts)
        # Nothing ASCII to build a key from; fall back to the raw label.
        return '_'.join(normalized.split())

    def _get_sentiment_score(self, sentiment_label: Optional[str]) -> float:
        """Maps a sentiment label to a numerical score using SENTIMENT_MAPPING.

        None and any label that is not a key of SENTIMENT_MAPPING (after
        stripping surrounding whitespace) receive the 'Unknown' score. The
        result is always a float.
        """
        unknown_score = self.SENTIMENT_MAPPING.get('Unknown', 0)
        if sentiment_label is None:
            return float(unknown_score)
        # Exact match only: the keys carry symbol suffixes, so case folding
        # or fuzzy matching is deliberately not attempted.
        return float(self.SENTIMENT_MAPPING.get(str(sentiment_label).strip(), unknown_score))

    def _preprocess_mentions_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Cleans and prepares mentions data for analysis.

        Works on a copy of ``df``. 'date' is parsed to datetime (unparseable
        values become NaT and the rows are kept), 'sentiment_label' is
        normalized to text with missing values replaced by 'Unknown', and a
        numeric 'sentiment_score' column is added.

        Returns:
            The processed copy, or an empty DataFrame when ``df`` is None or
            empty.
        """
        if df is None or df.empty:
            return pd.DataFrame()
        df_processed = df.copy()

        if 'date' in df_processed.columns:
            # Rows with NaT are kept on purpose: they still count toward the
            # non-time-series metrics.
            df_processed['date'] = pd.to_datetime(df_processed['date'], errors='coerce')
        else:
            logger.warning("'date' column not found in mentions data. Time-series analysis will be limited.")

        if 'sentiment_label' in df_processed.columns:
            # Fill missing values BEFORE casting to str: casting first turns
            # NaN/None into the text 'nan'/'None', so a later fillna() can
            # never match. Going through object dtype also avoids the
            # "new category" error a categorical column would raise.
            labels = df_processed['sentiment_label'].astype(object)
            df_processed['sentiment_label'] = labels.where(labels.notna(), 'Unknown').astype(str)
            df_processed['sentiment_score'] = df_processed['sentiment_label'].apply(self._get_sentiment_score)
        else:
            logger.info("'sentiment_label' column not found. Sentiment analysis will be limited.")
            df_processed['sentiment_label'] = 'Unknown'
            df_processed['sentiment_score'] = self._get_sentiment_score('Unknown')
        return df_processed

    def _extract_time_series_metrics(self, df_processed: pd.DataFrame) -> "List[TimeSeriesMetric]":
        """Extracts monthly time-series metrics from processed mentions data.

        Produces, when the data allows it: monthly mention volume, monthly
        average sentiment score, and one monthly count series per sentiment
        label. Rows without a valid 'date' are ignored. Returns an empty list
        when no dated rows exist.
        """
        ts_metrics = []
        if df_processed.empty or 'date' not in df_processed.columns or df_processed['date'].isnull().all():
            logger.info("Cannot extract time-series metrics for mentions: 'date' is missing or all null.")
            return ts_metrics
        df_ts = df_processed.dropna(subset=['date']).copy()
        if df_ts.empty:
            logger.info("No valid 'date' values for mentions time-series metrics after filtering NaT.")
            return ts_metrics
        # 'YYYY-MM' strings sort chronologically, so groupby output is in
        # calendar order.
        df_ts['year_month'] = df_ts['date'].dt.strftime('%Y-%m')

        # Monthly mention volume
        monthly_volume = df_ts.groupby('year_month').size().reset_index(name='mention_count')
        if not monthly_volume.empty:
            ts_metrics.append(TimeSeriesMetric(
                metric_name="monthly_mention_volume",
                values=monthly_volume['mention_count'].tolist(),
                timestamps=monthly_volume['year_month'].tolist(),
                metric_type="time_series",
                time_granularity="monthly",
                unit="count"
            ))

        # Monthly average sentiment score
        if 'sentiment_score' in df_ts.columns:
            monthly_avg_sentiment = df_ts.groupby('year_month')['sentiment_score'].mean().reset_index()
            if not monthly_avg_sentiment.empty:
                ts_metrics.append(TimeSeriesMetric(
                    metric_name="avg_monthly_sentiment_score",
                    values=monthly_avg_sentiment['sentiment_score'].tolist(),
                    timestamps=monthly_avg_sentiment['year_month'].tolist(),
                    metric_type="time_series",
                    time_granularity="monthly",
                    unit="score"  # Score range depends on SENTIMENT_MAPPING
                ))

        # Monthly distribution of sentiment labels; only emitted when more
        # than one distinct label is present.
        if 'sentiment_label' in df_ts.columns and df_ts['sentiment_label'].nunique() > 1:
            if not (df_ts['sentiment_label'] == 'Unknown').all():
                sentiment_counts_by_month = (
                    df_ts.groupby(['year_month', 'sentiment_label']).size().unstack(fill_value=0)
                )
                for sentiment_val in sentiment_counts_by_month.columns:
                    monthly_counts = sentiment_counts_by_month[sentiment_val]
                    if sentiment_val == 'Unknown' and (monthly_counts == 0).all():
                        continue
                    ts_metrics.append(TimeSeriesMetric(
                        metric_name=f"monthly_mention_count_sentiment_{self._metric_key_for_label(sentiment_val)}",
                        values=monthly_counts.tolist(),
                        timestamps=sentiment_counts_by_month.index.tolist(),  # year_month is index
                        metric_type="time_series",
                        time_granularity="monthly",
                        unit="count"
                    ))
            else:
                logger.info("Sentiment label data is all 'Unknown', skipping sentiment distribution time series.")
        return ts_metrics

    def _calculate_aggregate_metrics(self, df_processed: pd.DataFrame) -> Dict[str, float]:
        """Calculates aggregate metrics for mentions.

        Returns a flat ``name -> float`` mapping with the total mention count,
        the overall average sentiment score, a ratio/count pair for every
        SENTIMENT_MAPPING label that occurs (and always for 'Unknown'), and
        average mentions per day/week over the dated rows.
        """
        agg_metrics: Dict[str, float] = {}
        if df_processed.empty:
            return agg_metrics
        agg_metrics['total_mentions_analyzed'] = float(len(df_processed))

        if 'sentiment_score' in df_processed.columns and not df_processed['sentiment_score'].empty:
            agg_metrics['overall_avg_sentiment_score'] = float(df_processed['sentiment_score'].mean())

        if 'sentiment_label' in df_processed.columns:
            total_valid_sentiments = int(df_processed['sentiment_label'].notna().sum())
            if total_valid_sentiments > 0:
                sentiment_counts = df_processed['sentiment_label'].value_counts()
                for label in self.SENTIMENT_MAPPING:
                    count = sentiment_counts.get(label, 0)
                    # Labels that never occur are omitted, except 'Unknown',
                    # which is always reported (possibly as zero).
                    if count == 0 and label != 'Unknown':
                        continue
                    clean_label_key = self._metric_key_for_label(label)
                    agg_metrics[f'{clean_label_key}_mention_ratio'] = float(count / total_valid_sentiments)
                    agg_metrics[f'{clean_label_key}_mention_count'] = float(count)

        # Mentions per day/week (if 'date' column is valid)
        if 'date' in df_processed.columns and not df_processed['date'].isnull().all():
            df_dated = df_processed.dropna(subset=['date'])
            dated_count = len(df_dated)
            duration_days = (df_dated['date'].max() - df_dated['date'].min()).days
            if duration_days > 0:
                agg_metrics['avg_mentions_per_day'] = float(dated_count / duration_days)
                agg_metrics['avg_mentions_per_week'] = float(dated_count / (duration_days / 7.0))
            else:
                # Everything falls within one day (a single mention, or
                # several on the same day): treat the span as one day and
                # extrapolate the weekly rate. Previously the multi-mention
                # same-day case produced no rate metrics at all.
                agg_metrics['avg_mentions_per_day'] = float(dated_count)
                agg_metrics['avg_mentions_per_week'] = float(dated_count * 7)
        return agg_metrics

    def _extract_categorical_metrics(self, df_processed: pd.DataFrame) -> Dict[str, Any]:
        """Extracts categorical distributions for mentions.

        Returns the per-label counts and the per-label share formatted as a
        percentage string (two decimals), or an empty dict when there is no
        data.
        """
        cat_metrics: Dict[str, Any] = {}
        if df_processed.empty:
            return cat_metrics
        if 'sentiment_label' in df_processed.columns and df_processed['sentiment_label'].nunique() > 0:
            label_series = df_processed['sentiment_label']
            label_shares = label_series.value_counts(normalize=True)
            cat_metrics['sentiment_label_distribution_percentage'] = (
                label_shares.apply(lambda share: f"{share:.2%}").to_dict()
            )
            cat_metrics['sentiment_label_counts'] = label_series.value_counts().to_dict()
        return cat_metrics

    def _extract_time_periods(self, df_processed: pd.DataFrame) -> List[str]:
        """Extracts unique year-month time periods covered by the mentions data.

        Returns up to 12 'YYYY-MM' strings, most recent first, or a single
        placeholder entry when no valid dates are available.
        """
        if df_processed.empty or 'date' not in df_processed.columns or df_processed['date'].isnull().all():
            return ["Data period not available or N/A"]
        if 'year_month' in df_processed.columns:
            # Reuse the column when a caller has already derived it.
            month_keys = df_processed['year_month'].dropna()
        else:
            month_keys = df_processed['date'].dropna().dt.strftime('%Y-%m')
        periods = sorted(month_keys.unique().tolist(), reverse=True)
        return periods[:12]  # Return up to the last 12 months

    def analyze_mentions_data(self, mentions_df: pd.DataFrame) -> "AgentMetrics":
        """
        Generates comprehensive mentions analysis.

        Args:
            mentions_df: Raw mentions data; 'date' and 'sentiment_label' are
                the columns the metrics are built from.

        Returns:
            An AgentMetrics carrying the textual summary (truncated to 2000
            characters) and the structured metrics. A PandasAI failure is
            logged and reported inside the summary text; it does not prevent
            the structured metrics from being computed.
        """
        if mentions_df is None or mentions_df.empty:
            logger.warning("Mentions DataFrame is empty. Returning empty metrics.")
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary="No mentions data provided for analysis.",
                time_periods_covered=["N/A"]
            )

        # 1. Preprocess data
        df_processed = self._preprocess_mentions_data(mentions_df)
        if df_processed.empty:
            # mentions_df is known to be non-empty here, so this means
            # preprocessing lost the data.
            logger.warning("Mentions DataFrame became empty after preprocessing.")
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary="Mentions data could not be processed.",
                time_periods_covered=["N/A"]
            )

        # 2. Generate textual analysis using PandasAI
        df_description_for_pandasai = "LinkedIn brand mentions data. Key columns: 'date' (date of mention), 'sentiment_label' (e.g., 'Positive π', 'Negative π', 'Neutral π'), 'sentiment_score' (numeric score from -1.5 to 1.5)."
        analysis_result_text = "PandasAI analysis for mentions could not be performed."
        try:
            pandas_ai_df = pai.DataFrame(df_processed, description=df_description_for_pandasai)
            analysis_query = """
Analyze the provided LinkedIn brand mentions data. Focus on:
1. Monthly trends in mention volume.
2. Monthly trends in sentiment (average 'sentiment_score' and distribution of 'sentiment_label').
3. Identify any significant spikes/dips in mentions or shifts in sentiment.
Provide a concise summary of brand perception based on this data.
"""

            def chat_operation():
                # Configure the PandasAI LLM lazily if nothing has set it yet.
                if not pai.config.llm:
                    logger.warning("PandasAI LLM not configured for mentions agent. Attempting to configure.")
                    from utils.pandasai_setup import configure_pandasai
                    configure_pandasai(self.api_key, self.model_name)
                    if not pai.config.llm:
                        raise RuntimeError("PandasAI LLM could not be configured for mentions chat operation.")
                logger.info(f"Executing PandasAI chat for mentions analysis with LLM: {pai.config.llm}")
                return pandas_ai_df.chat(analysis_query)

            analysis_result_raw = self.retry_mechanism.retry_with_backoff(
                func=chat_operation, max_retries=2, base_delay=2.0, exceptions=(Exception,)
            )
            analysis_result_text = str(analysis_result_raw) if analysis_result_raw else "No textual analysis for mentions generated by PandasAI."
            logger.info("Mentions analysis via PandasAI completed.")
        except Exception as e:
            # Deliberate best effort: the structured metrics below do not
            # depend on the LLM, so the failure is reported, not re-raised.
            logger.error(f"Mentions analysis with PandasAI failed: {e}", exc_info=True)
            analysis_result_text = f"Mentions analysis using PandasAI failed. Error: {str(e)[:200]}"

        # 3. Extract structured metrics
        time_series_metrics = self._extract_time_series_metrics(df_processed)
        aggregate_metrics = self._calculate_aggregate_metrics(df_processed)
        categorical_metrics = self._extract_categorical_metrics(df_processed)
        time_periods = self._extract_time_periods(df_processed)
        return AgentMetrics(
            agent_name=self.AGENT_NAME,
            analysis_summary=analysis_result_text[:2000],
            time_series_metrics=time_series_metrics,
            aggregate_metrics=aggregate_metrics,
            categorical_metrics=categorical_metrics,
            time_periods_covered=time_periods,
            data_sources_used=[f"mentions_df (shape: {mentions_df.shape}) -> df_processed (shape: {df_processed.shape})"]
        )
if __name__ == '__main__':
    # Manual smoke test: runs the agent on a small in-memory sample and prints
    # the resulting metrics. With no real GOOGLE_API_KEY in the environment,
    # pai.DataFrame is swapped for a stub so no LLM call is attempted.
    import os  # only this script section needs it; it was previously used without being imported

    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for EnhancedMentionsAnalysisAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO)
        logger.warning("Could not import setup_logging. Using basicConfig.")

    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_mentions")
    MODEL_NAME = DEFAULT_AGENT_MODEL

    class MockPandasAIDataFrame:
        """Stand-in for pai.DataFrame that echoes the query instead of calling an LLM."""

        def __init__(self, df, description):
            self.df = df
            self.description = description

        def chat(self, query):
            return f"Mock PandasAI mentions response to: {query}"

    try:
        from utils.pandasai_setup import configure_pandasai
        if MOCK_API_KEY != "test_api_key_mentions":
            configure_pandasai(MOCK_API_KEY, MODEL_NAME)
            logger.info("PandasAI configured for testing EnhancedMentionsAnalysisAgent.")
        else:
            logger.warning("Using mock API key for mentions. PandasAI chat will likely fail or use a mock.")
            pai.DataFrame = MockPandasAIDataFrame
    except ImportError:
        logger.error("utils.pandasai_setup not found. PandasAI will not be configured for mentions.")
        pai.DataFrame = MockPandasAIDataFrame

    sample_mentions_data = {
        'date': pd.to_datetime(['2023-01-05', '2023-01-15', '2023-02-02', '2023-02-20', '2023-03-10', '2023-03-12']),
        'sentiment_label': ['Positive π', 'Negative π', 'Neutral π', 'Positive π', 'Positive π', 'Unknown'],
    }
    sample_df_mentions = pd.DataFrame(sample_mentions_data)

    mentions_agent = EnhancedMentionsAnalysisAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME)
    logger.info("Analyzing sample mentions data...")
    mentions_metrics_result = mentions_agent.analyze_mentions_data(sample_df_mentions)

    print("\n--- EnhancedMentionsAnalysisAgent Results ---")
    print(f"Agent Name: {mentions_metrics_result.agent_name}")
    print(f"Analysis Summary: {mentions_metrics_result.analysis_summary}")

    print("\nTime Series Metrics (Mentions):")
    for ts_metric in mentions_metrics_result.time_series_metrics:
        print(f" - {ts_metric.metric_name}: {len(ts_metric.values)} data points, e.g., {ts_metric.values[:3]} for ts {ts_metric.timestamps[:3]} (Unit: {ts_metric.unit})")

    print("\nAggregate Metrics (Mentions):")
    for key, value in mentions_metrics_result.aggregate_metrics.items():
        print(f" - {key}: {value}")

    print("\nCategorical Metrics (Mentions):")
    for key, value in mentions_metrics_result.categorical_metrics.items():
        print(f" - {key}:")
        if isinstance(value, dict):
            for sub_key, sub_value in list(value.items())[:2]:  # Print first 2 for brevity
                print(f" - {sub_key}: {sub_value}")
        else:
            print(f" {value}")

    print(f"\nTime Periods Covered (Mentions): {mentions_metrics_result.time_periods_covered}")

    # Test with empty DataFrame
    logger.info("\n--- Testing Mentions Agent with empty DataFrame ---")
    empty_mentions_metrics = mentions_agent.analyze_mentions_data(pd.DataFrame())
    print(f"Empty Mentions DF Analysis Summary: {empty_mentions_metrics.analysis_summary}")
|