Spaces:
Running
Running
File size: 26,727 Bytes
9981a7c 62d9a4c 9981a7c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 |
# agents/follower_agent.py
import pandas as pd
from typing import Dict, List, Any, Optional
import logging
import pandasai as pai # Assuming pandasai is imported as pai globally or configured
from google.adk.agents import LlmAgent # Assuming this is the correct import path
# Project-specific imports
from utils.retry_mechanism import RetryMechanism
from data_models.metrics import AgentMetrics, TimeSeriesMetric
# Module-level logger, named after this module so its output can be filtered per module.
logger = logging.getLogger(__name__)
# Fallback model identifier, used when no explicit `model_name` is passed to the agent.
# Consider moving this to a shared config or environment variable.
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20" # Or your specific model like "gemini-1.5-flash-preview-05-20"
class EnhancedFollowerAnalysisAgent:
    """
    Enhanced follower analysis agent with proper handling of different follower count types
    and structured metric extraction.

    The input DataFrame is expected to carry a 'follower_count_type' column that
    tags each row as either monthly gains ('follower_gains_monthly') or one of the
    demographic breakdowns ('follower_industry', 'follower_seniority',
    'follower_function', 'follower_geo'). 'category_name' holds the date string
    for monthly rows and the segment label for demographic rows, while
    'follower_count_organic' / 'follower_count_paid' hold the counts.

    The free-text summary is produced by PandasAI; the structured metrics are
    computed locally with pandas and do not depend on the LLM call succeeding.
    """

    AGENT_NAME = "follower_analyst"
    AGENT_DESCRIPTION = "Expert analyst specializing in follower growth patterns and demographic analysis."
    # Prompt text is kept flush-left so no source indentation leaks into the prompt.
    AGENT_INSTRUCTION = """
You are a specialized LinkedIn follower analytics expert focused on temporal patterns and demographic trends.
Your role includes:
1. FOLLOWER TREND ANALYSIS:
- Analyze follower growth trends over time (monthly data from 'follower_gains_monthly' type).
- Identify growth acceleration/deceleration periods.
- Calculate growth rates and velocity changes.
- Detect seasonal patterns and anomalies.
- Analyze organic vs paid follower counts over time.
2. DEMOGRAPHIC ANALYSIS (based on 'follower_industry', 'follower_seniority', etc.):
- Analyze follower distribution by industry, seniority, function, and geography.
- Compare organic vs paid followers across these demographic segments.
- Identify high-value audience segments based on counts and potential engagement.
3. TIME-BASED INSIGHTS:
- Provide month-over-month comparisons for growth data.
- Identify critical inflection points in follower growth.
- Calculate trend momentum and acceleration.
4. METRIC EXTRACTION (for the AgentMetrics structure):
- Extract time-series data for total, organic, and paid follower counts, and growth rates.
- Provide aggregate metrics like average monthly growth, total organic/paid followers.
- Provide demographic breakdowns as categorical metrics (e.g., top N industries by follower count).
Focus on separating temporal analysis (monthly) from demographic analysis.
When analyzing demographics, consider the top N segments (e.g., top 10 industries) for conciseness.
Ensure your analysis summary is comprehensive and insightful.
"""

    # 'follower_count_type' value that marks the monthly time-series rows.
    _MONTHLY_TYPE = 'follower_gains_monthly'
    # Demographic 'follower_count_type' values mapped to the categorical metric
    # name each one is reported under. Insertion order is the processing order.
    _DEMOGRAPHIC_METRIC_NAMES = {
        'follower_industry': 'industry_distribution',
        'follower_seniority': 'seniority_distribution',
        'follower_function': 'function_distribution',
        'follower_geo': 'geographic_distribution',
    }
    _ORGANIC_COLUMN = 'follower_count_organic'
    _PAID_COLUMN = 'follower_count_paid'

    # Question sent to PandasAI for the free-text summary (flush-left, see above).
    _ANALYSIS_QUERY = """
Analyze the provided LinkedIn follower statistics. The DataFrame contains various 'follower_count_type' values.
Focus on:
1. For 'follower_gains_monthly': Analyze monthly follower growth trends (total, organic, paid). Identify key periods of growth or decline.
2. For demographic types (industry, seniority, function, geo): Describe the distribution of followers. Which are the top segments? How do organic vs paid compare?
3. Synthesize these findings into an overall summary of follower dynamics.
Consider the data structure: 'category_name' holds the date for monthly data or the demographic label.
'follower_count_organic' and 'follower_count_paid' are the key metrics.
"""

    def __init__(self, api_key: str, model_name: Optional[str] = None):
        """
        Initializes the Follower Analysis Agent.

        Args:
            api_key: API key for LLM and potentially PandasAI.
            model_name: Name of the language model to use. Defaults to DEFAULT_AGENT_MODEL.
        """
        # Kept so PandasAI can be configured lazily inside analyze_follower_data.
        self.api_key = api_key
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self.AGENT_INSTRUCTION
        )
        self.retry_mechanism = RetryMechanism()
        logger.info(f"{self.AGENT_NAME} initialized with model {self.model_name}.")

    @staticmethod
    def _numeric_column(frame: pd.DataFrame, column: str) -> pd.Series:
        """Return `column` of `frame` as numbers, with unparseable/missing values as 0.

        A column that does not exist yields an all-zero Series aligned to
        `frame.index`, so callers can always do arithmetic on the result.
        """
        if column in frame.columns:
            return pd.to_numeric(frame[column], errors='coerce').fillna(0)
        return pd.Series(0, index=frame.index)

    def _separate_follower_data_by_type(self, df: pd.DataFrame, top_n: int = 10) -> Dict[str, pd.DataFrame]:
        """Separate follower data by follower_count_type and process appropriately.

        Args:
            df: Raw follower statistics; must contain 'follower_count_type'.
            top_n: Number of segments kept per demographic type.

        Returns:
            Mapping of follower_count_type to its processed rows. Types with no
            rows are omitted; an unusable input yields an empty dict.
        """
        separated_data: Dict[str, pd.DataFrame] = {}
        if df is None or df.empty or 'follower_count_type' not in df.columns:
            logger.warning("Input DataFrame is empty or 'follower_count_type' column is missing.")
            return separated_data

        # These should match the 'follower_count_type' values in your Bubble data
        for ftype in (self._MONTHLY_TYPE, *self._DEMOGRAPHIC_METRIC_NAMES):
            type_data = df[df['follower_count_type'] == ftype].copy()
            if type_data.empty:
                logger.info(f"No data found for follower_count_type: {ftype}")
                continue
            if ftype == self._MONTHLY_TYPE:
                type_data = self._process_monthly_data(type_data)
            else:  # Demographic data
                type_data = self._get_top_demographic_segments(type_data, top_n=top_n)
            separated_data[ftype] = type_data
        return separated_data

    def _get_top_demographic_segments(self, demo_df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
        """Get top N demographic segments by total follower count (organic + paid).

        The count columns are coerced to numbers (missing columns are created
        as zeros) on a copy; the input frame is never modified.

        Returns:
            Up to `top_n` rows ordered by descending total. If 'category_name'
            is absent the cleaned frame is returned without ranking.
        """
        if demo_df.empty:
            return demo_df

        cleaned = demo_df.copy()
        # A missing count column must not crash the ranking; treat it as zero.
        cleaned[self._ORGANIC_COLUMN] = self._numeric_column(cleaned, self._ORGANIC_COLUMN)
        cleaned[self._PAID_COLUMN] = self._numeric_column(cleaned, self._PAID_COLUMN)

        # 'category_name' usually holds the demographic label (e.g., industry name)
        if 'category_name' not in cleaned.columns:
            logger.warning("'_get_top_demographic_segments' expects 'category_name' column for grouping.")
            return cleaned

        # Temporary ranking key; dropped again so the output schema is unchanged.
        cleaned['total_followers'] = cleaned[self._ORGANIC_COLUMN] + cleaned[self._PAID_COLUMN]
        top_segments = cleaned.nlargest(top_n, 'total_followers')
        return top_segments.drop(columns=['total_followers'], errors='ignore')

    def _process_monthly_data(self, monthly_df: pd.DataFrame) -> pd.DataFrame:
        """Process monthly follower data: parse dates, sort.

        Adds 'date_for_analysis' (datetime), 'year_month' ('YYYY-MM') and
        'month_name' ('Month YYYY'), drops rows whose 'category_name' is not a
        parseable date, and makes both count columns numeric.

        Returns:
            A processed copy sorted by date. The input is returned unchanged
            when it is empty or has no 'category_name' column.
        """
        if monthly_df.empty or 'category_name' not in monthly_df.columns:
            logger.warning("Monthly data DataFrame is empty or 'category_name' column is missing.")
            return monthly_df

        df_processed = monthly_df.copy()
        # 'category_name' for monthly data is expected to be a date string like 'YYYY-MM-DD'
        df_processed['date_for_analysis'] = pd.to_datetime(df_processed['category_name'], errors='coerce')
        # Drop rows where date conversion failed
        df_processed.dropna(subset=['date_for_analysis'], inplace=True)
        if df_processed.empty:
            logger.warning("No valid dates found in 'category_name' for monthly data after processing.")
            return df_processed

        parsed_dates = df_processed['date_for_analysis']
        df_processed['year_month'] = parsed_dates.dt.strftime('%Y-%m')
        df_processed['month_name'] = parsed_dates.dt.strftime('%B %Y')

        # Ensure numeric types for follower counts (zeros if a column is missing)
        for col in (self._ORGANIC_COLUMN, self._PAID_COLUMN):
            df_processed[col] = self._numeric_column(df_processed, col)
        return df_processed.sort_values('date_for_analysis')

    def _extract_time_series_metrics(self, monthly_df: pd.DataFrame) -> List["TimeSeriesMetric"]:
        """Extract time-series metrics from processed monthly follower data.

        Emits total/organic/paid series and, when there are at least two
        points, a month-over-month growth-rate series for the total.
        """
        ts_metrics = []
        if monthly_df.empty or 'date_for_analysis' not in monthly_df.columns:
            logger.info("Cannot extract time-series metrics: monthly DataFrame is empty or lacks 'date_for_analysis'.")
            return ts_metrics

        # Ensure data is sorted by date for correct growth rate calculation
        ordered = monthly_df.sort_values('date_for_analysis')
        timestamps = ordered['year_month'].tolist()

        organic = self._numeric_column(ordered, self._ORGANIC_COLUMN)
        paid = self._numeric_column(ordered, self._PAID_COLUMN)
        total = organic + paid

        series_by_metric = {
            "total_follower_count": total,
            "organic_follower_count": organic,
            "paid_follower_count": paid,
        }
        for name, values_series in series_by_metric.items():
            ts_metrics.append(TimeSeriesMetric(
                metric_name=name,
                values=values_series.tolist(),
                timestamps=timestamps,
                metric_type="time_series",
                time_granularity="monthly"
            ))

        if len(ordered) <= 1:
            logger.info("Not enough data points (<=1) to calculate growth rate.")
            return ts_metrics

        # pct_change yields NaN for the first point and +/-inf after a zero
        # month; both are undefined growth and are reported as 0 so the series
        # stays finite (inf is not valid JSON).
        growth_rates = (
            total.pct_change()
            .replace([float('inf'), float('-inf')], float('nan'))
            .fillna(0)
            .tolist()
        )
        # NOTE(review): values are fractions (0.2 == +20%) although the unit is
        # declared as "%" — confirm which one downstream consumers expect.
        ts_metrics.append(TimeSeriesMetric(
            metric_name="total_follower_growth_rate",
            values=growth_rates,
            timestamps=timestamps,  # Timestamps align, first growth rate is vs non-existent previous point (so 0)
            metric_type="time_series",
            time_granularity="monthly",
            unit="%"
        ))
        return ts_metrics

    def _calculate_aggregate_metrics(self, separated_data: Dict[str, pd.DataFrame]) -> Dict[str, float]:
        """Calculate aggregate metrics from all follower data.

        Reads the frames in `separated_data` without modifying them.

        Returns:
            Flat mapping of metric name to float: period totals, organic/paid
            ratios (only when the total is positive), mean/max/min monthly
            gain, and the number of segments kept per demographic type.
        """
        agg_metrics: Dict[str, float] = {}

        monthly_df = separated_data.get(self._MONTHLY_TYPE)
        if monthly_df is not None and not monthly_df.empty:
            organic = self._numeric_column(monthly_df, self._ORGANIC_COLUMN)
            paid = self._numeric_column(monthly_df, self._PAID_COLUMN)
            total_organic = organic.sum()
            total_paid = paid.sum()
            total_all_followers = total_organic + total_paid

            agg_metrics['total_organic_followers_gained_period'] = float(total_organic)
            agg_metrics['total_paid_followers_gained_period'] = float(total_paid)
            agg_metrics['overall_total_followers_gained_period'] = float(total_all_followers)
            if total_all_followers > 0:
                agg_metrics['overall_organic_follower_ratio_gained'] = float(total_organic / total_all_followers)
                agg_metrics['overall_paid_follower_ratio_gained'] = float(total_paid / total_all_followers)

            # Assuming 'follower_count_organic/paid' in 'follower_gains_monthly' are indeed GAINS for that month.
            # Computed as a local Series so the caller's DataFrame gains no extra column.
            monthly_total_gain = organic + paid
            agg_metrics['avg_monthly_follower_gain'] = float(monthly_total_gain.mean())
            agg_metrics['max_monthly_follower_gain'] = float(monthly_total_gain.max())
            agg_metrics['min_monthly_follower_gain'] = float(monthly_total_gain.min())

        # Count of distinct demographic segments identified (top N for each)
        for demo_type in self._DEMOGRAPHIC_METRIC_NAMES:
            demo_df = separated_data.get(demo_type)
            if demo_df is not None and not demo_df.empty:
                agg_metrics[f'distinct_{demo_type}_segments_analyzed'] = float(len(demo_df))
        return agg_metrics

    def _extract_demographic_metrics(self, separated_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Extract demographic distributions (categorical metrics).

        For each demographic type present, produces '<name>' (per-segment
        totals ordered by descending total) and '<name>_summary'. If the same
        'category_name' occurs on several rows, the last row wins.
        """
        cat_metrics: Dict[str, Any] = {}
        for demo_type_key, metric_name_prefix in self._DEMOGRAPHIC_METRIC_NAMES.items():
            demo_df = separated_data.get(demo_type_key)
            if demo_df is None or demo_df.empty or 'category_name' not in demo_df.columns:
                continue

            categories = demo_df['category_name'].tolist()
            organic_counts = self._numeric_column(demo_df, self._ORGANIC_COLUMN).tolist()
            paid_counts = self._numeric_column(demo_df, self._PAID_COLUMN).tolist()

            distribution = {}
            for category, organic, paid in zip(categories, organic_counts, paid_counts):
                organic, paid = float(organic), float(paid)
                total = organic + paid
                distribution[category] = {
                    'total_followers': total,
                    'organic_followers': organic,
                    'paid_followers': paid,
                    'organic_ratio': organic / total if total > 0 else 0.0
                }

            # Sort by total followers descending for the distribution
            sorted_distribution = dict(
                sorted(distribution.items(), key=lambda item: item[1]['total_followers'], reverse=True)
            )
            cat_metrics[metric_name_prefix] = sorted_distribution
            # Summary for this demographic type
            cat_metrics[f'{metric_name_prefix}_summary'] = {
                'total_followers_in_top_segments': sum(
                    item['total_followers'] for item in distribution.values()
                ),
                'number_of_segments_reported': len(distribution),
                'top_segment': next(iter(sorted_distribution), "N/A")
            }
        return cat_metrics

    def _extract_time_periods(self, monthly_df: Optional[pd.DataFrame]) -> List[str]:
        """Extract unique year-month time periods covered by the monthly data.

        Returns:
            Up to the 12 most recent 'YYYY-MM' labels, newest first, or a
            single placeholder entry when no monthly data is available.
        """
        if monthly_df is None or monthly_df.empty or 'year_month' not in monthly_df.columns:
            return ["Data period not available or N/A"]
        periods = sorted(monthly_df['year_month'].dropna().unique().tolist(), reverse=True)
        return periods[:12]  # Return up to the last 12 months if available

    def _chat_with_pandasai(self, pandas_ai_df: Any) -> Any:
        """Run the analysis query through PandasAI, configuring its LLM on demand.

        Raises:
            RuntimeError: If no LLM is configured even after attempting setup.
        """
        if not pai.config.llm:  # Check if LLM is set for PandasAI
            logger.warning("PandasAI LLM not configured. Attempting to configure now.")
            # This assumes configure_pandasai is available and sets pai.config.llm
            from utils.pandasai_setup import configure_pandasai
            configure_pandasai(self.api_key, self.model_name)
            if not pai.config.llm:
                raise RuntimeError("PandasAI LLM could not be configured for chat operation.")
        logger.info(f"Executing PandasAI chat for follower analysis with LLM: {pai.config.llm}")
        return pandas_ai_df.chat(self._ANALYSIS_QUERY)

    def analyze_follower_data(self, follower_stats_df: pd.DataFrame) -> "AgentMetrics":
        """
        Generate comprehensive follower analysis using PandasAI and structured metric extraction.

        Args:
            follower_stats_df: Raw follower statistics (all follower_count_type values mixed).

        Returns:
            AgentMetrics with the textual summary (truncated to 2000 characters)
            and the locally computed metrics. A PandasAI failure is reported in
            the summary text rather than raised; structured metrics are still
            returned in that case.
        """
        if follower_stats_df is None or follower_stats_df.empty:
            logger.warning("Follower statistics DataFrame is empty. Returning empty metrics.")
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary="No follower data provided for analysis.",
                time_periods_covered=["N/A"]
            )

        # 1. Pre-process and separate data
        separated_data = self._separate_follower_data_by_type(follower_stats_df)
        monthly_data_for_metrics = separated_data.get(self._MONTHLY_TYPE, pd.DataFrame())

        # PandasAI receives the original frame: it contains every type and the
        # query tells the LLM how to tell them apart.
        df_description = "LinkedIn follower statistics. Contains 'follower_count_type' indicating data category (e.g., 'follower_gains_monthly', 'follower_industry'), 'category_name' (e.g., date for monthly, industry name for industry type), 'follower_count_organic', 'follower_count_paid'."
        try:
            pandas_ai_df = pai.DataFrame(follower_stats_df, description=df_description)
        except Exception as e:
            logger.error(f"Failed to create PandasAI DataFrame: {e}", exc_info=True)
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary=f"Error initializing PandasAI: {e}",
                time_periods_covered=self._extract_time_periods(separated_data.get(self._MONTHLY_TYPE))
            )

        # 2. Generate textual analysis with a direct PandasAI chat call
        analysis_result_text = "PandasAI analysis could not be performed."  # Default
        try:
            analysis_result_raw = self.retry_mechanism.retry_with_backoff(
                func=lambda: self._chat_with_pandasai(pandas_ai_df),
                max_retries=2,  # Adjusted retries
                base_delay=2.0,
                exceptions=(Exception,)  # Catch broader exceptions for PandasAI calls
            )
            # Compare against None instead of truth-testing: DataFrame-like
            # results raise on bool(), and answers such as 0 are still answers.
            rendered = "" if analysis_result_raw is None else str(analysis_result_raw)
            analysis_result_text = rendered if rendered.strip() else "No textual analysis generated by PandasAI."
            logger.info("Follower analysis via PandasAI completed.")
        except Exception as e:
            logger.error(f"Follower analysis with PandasAI failed after retries: {e}", exc_info=True)
            analysis_result_text = f"Follower analysis using PandasAI failed. Error: {str(e)[:200]}"

        # 3. Extract structured metrics using the separated and processed data
        time_series_metrics = self._extract_time_series_metrics(monthly_data_for_metrics)
        aggregate_metrics = self._calculate_aggregate_metrics(separated_data)  # Uses all separated types
        categorical_metrics = self._extract_demographic_metrics(separated_data)  # Uses demographic types
        time_periods = self._extract_time_periods(monthly_data_for_metrics)

        return AgentMetrics(
            agent_name=self.AGENT_NAME,
            analysis_summary=analysis_result_text[:2000],  # Truncate if too long
            time_series_metrics=time_series_metrics,
            aggregate_metrics=aggregate_metrics,
            categorical_metrics=categorical_metrics,
            time_periods_covered=time_periods,
            data_sources_used=[f"follower_stats_df (shape: {follower_stats_df.shape})"]
        )
def _run_follower_agent_demo():
    """Run the agent against a small in-memory sample and print the results.

    This is for example and testing purposes. Without a real GOOGLE_API_KEY
    (or without utils.pandasai_setup) `pai.DataFrame` is replaced by a stub so
    the structured-metric path can still be exercised.
    """
    # Imported here because only this demo entry point needs it.
    import os

    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for EnhancedFollowerAnalysisAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO)
        logger.warning("Could not import setup_logging. Using basicConfig.")

    class MockPandasAIDataFrame:
        """Minimal stand-in for pai.DataFrame that echoes the query back."""

        def __init__(self, df, description):
            self.df = df
            self.description = description

        def chat(self, query):
            return f"Mock PandasAI response to: {query}"

    # Mock API Key and Model for testing
    # IMPORTANT: For PandasAI to run, a valid API key and model setup are needed.
    # This example might not fully execute PandasAI chat without proper environment setup.
    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_followers")
    MODEL_NAME = DEFAULT_AGENT_MODEL

    # Configure PandasAI (essential for the .chat() part)
    try:
        from utils.pandasai_setup import configure_pandasai
        if MOCK_API_KEY != "test_api_key_followers":  # Only configure if a real key might be present
            configure_pandasai(MOCK_API_KEY, MODEL_NAME)
            logger.info("PandasAI configured for testing EnhancedFollowerAnalysisAgent.")
        else:
            logger.warning("Using mock API key. PandasAI chat will likely fail or use a default/mock LLM if available.")
            # Mock pai.DataFrame if pandasai is not fully set up to avoid errors
            pai.DataFrame = MockPandasAIDataFrame
    except ImportError:
        logger.error("utils.pandasai_setup not found. PandasAI will not be configured.")
        pai.DataFrame = MockPandasAIDataFrame

    # Sample Data
    sample_follower_data = {
        'follower_count_type': [
            'follower_gains_monthly', 'follower_gains_monthly', 'follower_gains_monthly',
            'follower_industry', 'follower_industry', 'follower_industry', 'follower_industry',
            'follower_seniority', 'follower_seniority'
        ],
        'category_name': [  # Dates for monthly, names for demographics
            '2023-01-01', '2023-02-01', '2023-03-01',
            'Technology', 'Finance', 'Healthcare', 'Retail',
            'Senior', 'Entry-Level'
        ],
        'follower_count_organic': [
            100, 120, 110,  # Monthly gains
            500, 300, 200, 150,  # Industry organic
            600, 400  # Seniority organic
        ],
        'follower_count_paid': [
            10, 15, 12,  # Monthly gains
            50, 30, 20, 10,  # Industry paid
            60, 40  # Seniority paid
        ]
    }
    sample_df = pd.DataFrame(sample_follower_data)

    # Initialize agent
    follower_agent = EnhancedFollowerAnalysisAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME)
    logger.info("Analyzing sample follower data...")
    metrics_result = follower_agent.analyze_follower_data(sample_df)

    print("\n--- EnhancedFollowerAnalysisAgent Results ---")
    print(f"Agent Name: {metrics_result.agent_name}")
    print(f"Analysis Summary: {metrics_result.analysis_summary}")
    print("\nTime Series Metrics:")
    for ts_metric in metrics_result.time_series_metrics:
        print(f" - {ts_metric.metric_name}: {len(ts_metric.values)} data points, e.g., {ts_metric.values[:3]} for ts {ts_metric.timestamps[:3]}")
    print("\nAggregate Metrics:")
    for key, value in metrics_result.aggregate_metrics.items():
        print(f" - {key}: {value}")
    print("\nCategorical Metrics:")
    for key, value in metrics_result.categorical_metrics.items():
        print(f" - {key}: (details below)")
        if isinstance(value, dict):
            for sub_key, sub_value in list(value.items())[:2]:  # Print first 2 items for brevity
                print(f" - {sub_key}: {sub_value}")
        else:
            print(f" {value}")
    print(f"\nTime Periods Covered: {metrics_result.time_periods_covered}")
    print(f"Data Sources Used: {metrics_result.data_sources_used}")
    print(f"Generated Timestamp: {metrics_result.generation_timestamp}")

    # Test with empty DataFrame
    logger.info("\n--- Testing with empty DataFrame ---")
    empty_metrics_result = follower_agent.analyze_follower_data(pd.DataFrame())
    print(f"Empty DF Analysis Summary: {empty_metrics_result.analysis_summary}")


if __name__ == '__main__':
    _run_follower_agent_demo()
|