# agents/follower_agent.py
import logging
import os
from typing import Dict, List, Any, Optional

import pandas as pd
import pandasai as pai  # Assuming pandasai is imported as pai globally or configured
from google.adk.agents import LlmAgent  # Assuming this is the correct import path

# Project-specific imports
from utils.retry_mechanism import RetryMechanism
from data_models.metrics import AgentMetrics, TimeSeriesMetric
# Module-level logger; handlers and levels are left to the application entry point.
logger = logging.getLogger(__name__)

# Model used when a caller does not pass `model_name` to the agent.
# Consider moving this to a shared config or environment variable.
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"  # Replace with the model your deployment uses
class EnhancedFollowerAnalysisAgent:
    """
    Enhanced follower analysis agent with proper handling of different follower count types
    and structured metric extraction.

    Input rows are routed by their 'follower_count_type' value: 'follower_gains_monthly'
    rows feed the time-series metrics, the demographic types feed the categorical
    metrics. A free-text summary is requested from PandasAI on the unsplit DataFrame.
    """

    AGENT_NAME = "follower_analyst"
    AGENT_DESCRIPTION = "Expert analyst specializing in follower growth patterns and demographic analysis."
    AGENT_INSTRUCTION = """
    You are a specialized LinkedIn follower analytics expert focused on temporal patterns and demographic trends.
    Your role includes:
    1. FOLLOWER TREND ANALYSIS:
       - Analyze follower growth trends over time (monthly data from 'follower_gains_monthly' type).
       - Identify growth acceleration/deceleration periods.
       - Calculate growth rates and velocity changes.
       - Detect seasonal patterns and anomalies.
       - Analyze organic vs paid follower counts over time.
    2. DEMOGRAPHIC ANALYSIS (based on 'follower_industry', 'follower_seniority', etc.):
       - Analyze follower distribution by industry, seniority, function, and geography.
       - Compare organic vs paid followers across these demographic segments.
       - Identify high-value audience segments based on counts and potential engagement.
    3. TIME-BASED INSIGHTS:
       - Provide month-over-month comparisons for growth data.
       - Identify critical inflection points in follower growth.
       - Calculate trend momentum and acceleration.
    4. METRIC EXTRACTION (for the AgentMetrics structure):
       - Extract time-series data for total, organic, and paid follower counts, and growth rates.
       - Provide aggregate metrics like average monthly growth, total organic/paid followers.
       - Provide demographic breakdowns as categorical metrics (e.g., top N industries by follower count).
    Focus on separating temporal analysis (monthly) from demographic analysis.
    When analyzing demographics, consider the top N segments (e.g., top 10 industries) for conciseness.
    Ensure your analysis summary is comprehensive and insightful.
    """

    # 'follower_count_type' value that marks the monthly (time-series) rows.
    MONTHLY_TYPE = 'follower_gains_monthly'
    # Demographic 'follower_count_type' values -> name of the categorical metric they produce.
    # These should match the 'follower_count_type' values in your Bubble data.
    DEMOGRAPHIC_METRIC_NAMES = {
        'follower_industry': 'industry_distribution',
        'follower_seniority': 'seniority_distribution',
        'follower_function': 'function_distribution',
        'follower_geo': 'geographic_distribution',
    }
    # Columns holding the follower counts for every row type.
    COUNT_COLUMNS = ('follower_count_organic', 'follower_count_paid')

    def __init__(self, api_key: str, model_name: Optional[str] = None):
        """
        Initializes the Follower Analysis Agent.

        Args:
            api_key: API key for LLM and potentially PandasAI.
            model_name: Name of the language model to use. Defaults to DEFAULT_AGENT_MODEL.
        """
        self.api_key = api_key  # Used when PandasAI has to be configured lazily (see _run_pandasai_chat)
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self.AGENT_INSTRUCTION
        )
        self.retry_mechanism = RetryMechanism()
        logger.info(f"{self.AGENT_NAME} initialized with model {self.model_name}.")

    @staticmethod
    def _numeric_column(df: pd.DataFrame, column: str) -> pd.Series:
        """
        Return `df[column]` coerced to numbers, aligned with `df.index`.

        Unparseable or missing values become 0. If the column does not exist at all,
        a Series of zeros is returned so callers can do arithmetic unconditionally.
        """
        if column in df.columns:
            return pd.to_numeric(df[column], errors='coerce').fillna(0)
        return pd.Series(0, index=df.index)

    def _separate_follower_data_by_type(self, df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        """
        Separate follower data by follower_count_type and process appropriately.

        Args:
            df: Raw follower statistics; must contain a 'follower_count_type' column.

        Returns:
            Mapping of follower_count_type -> processed DataFrame. Monthly rows go through
            `_process_monthly_data`, demographic rows are reduced to their top 10 segments.
            Types with no rows are omitted; an empty dict is returned for unusable input.
        """
        separated_data: Dict[str, pd.DataFrame] = {}
        if df is None or df.empty or 'follower_count_type' not in df.columns:
            logger.warning("Input DataFrame is empty or 'follower_count_type' column is missing.")
            return separated_data

        for ftype in (self.MONTHLY_TYPE, *self.DEMOGRAPHIC_METRIC_NAMES):
            type_data = df[df['follower_count_type'] == ftype].copy()
            if type_data.empty:
                logger.info(f"No data found for follower_count_type: {ftype}")
                continue
            if ftype == self.MONTHLY_TYPE:
                separated_data[ftype] = self._process_monthly_data(type_data)
            else:  # Demographic data
                separated_data[ftype] = self._get_top_demographic_segments(type_data, top_n=10)
        return separated_data

    def _get_top_demographic_segments(self, demo_df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
        """
        Get top N demographic segments by total follower count (organic + paid).

        Args:
            demo_df: Rows of a single demographic type; 'category_name' holds the label.
            top_n: Maximum number of rows to keep.

        Returns:
            A copy of the top rows with both count columns coerced to numbers. A count
            column that is absent from the input is treated as all zeros instead of
            raising. Ranking is per row; rows sharing a label are not merged here.
        """
        if demo_df.empty:
            return demo_df

        working = demo_df.copy()  # Never modify the caller's frame
        for col in self.COUNT_COLUMNS:
            working[col] = self._numeric_column(working, col)

        # 'category_name' usually holds the demographic label (e.g., industry name)
        if 'category_name' not in working.columns:
            logger.warning("'_get_top_demographic_segments' expects 'category_name' column for grouping.")
            return working

        # Temporary ranking column, removed again before returning.
        working['total_followers'] = working['follower_count_organic'] + working['follower_count_paid']
        top_segments = working.nlargest(top_n, 'total_followers')
        return top_segments.drop(columns=['total_followers'], errors='ignore')

    def _process_monthly_data(self, monthly_df: pd.DataFrame) -> pd.DataFrame:
        """
        Process monthly follower data: parse dates, sort.

        'category_name' is expected to hold a date string such as 'YYYY-MM-DD'. Rows whose
        date cannot be parsed are dropped.

        Returns:
            A date-sorted copy with added 'date_for_analysis', 'year_month' ('%Y-%m') and
            'month_name' ('%B %Y') columns and numeric count columns. The input is
            returned as-is when it is empty or has no 'category_name' column.
        """
        if monthly_df.empty or 'category_name' not in monthly_df.columns:
            logger.warning("Monthly data DataFrame is empty or 'category_name' column is missing.")
            return monthly_df

        df_processed = monthly_df.copy()
        df_processed['date_for_analysis'] = pd.to_datetime(df_processed['category_name'], errors='coerce')
        # Drop rows where date conversion failed
        df_processed.dropna(subset=['date_for_analysis'], inplace=True)
        if df_processed.empty:
            logger.warning("No valid dates found in 'category_name' for monthly data after processing.")
            return df_processed

        df_processed['year_month'] = df_processed['date_for_analysis'].dt.strftime('%Y-%m')
        df_processed['month_name'] = df_processed['date_for_analysis'].dt.strftime('%B %Y')
        # Ensure numeric counts; a missing column becomes zeros so later sums cannot fail
        for col in self.COUNT_COLUMNS:
            df_processed[col] = self._numeric_column(df_processed, col)
        return df_processed.sort_values('date_for_analysis')

    def _extract_time_series_metrics(self, monthly_df: pd.DataFrame) -> List["TimeSeriesMetric"]:
        """
        Extract time-series metrics from processed monthly follower data.

        Emits total/organic/paid count series and, when there are at least two months,
        'total_follower_growth_rate'. The growth rate is expressed in percent to match
        its declared unit ("%"); the first month, and any month following a zero total,
        is reported as 0 rather than NaN/inf.

        Args:
            monthly_df: Output of `_process_monthly_data` (needs 'date_for_analysis').

        Returns:
            List of TimeSeriesMetric objects; empty when the input is unusable.
        """
        ts_metrics: List["TimeSeriesMetric"] = []
        if monthly_df.empty or 'date_for_analysis' not in monthly_df.columns:
            logger.info("Cannot extract time-series metrics: monthly DataFrame is empty or lacks 'date_for_analysis'.")
            return ts_metrics

        # Growth rates are only meaningful on chronologically ordered rows
        ordered = monthly_df.sort_values('date_for_analysis')
        if 'year_month' in ordered.columns:
            timestamps = ordered['year_month'].tolist()
        else:
            timestamps = pd.to_datetime(ordered['date_for_analysis']).dt.strftime('%Y-%m').tolist()

        organic = self._numeric_column(ordered, 'follower_count_organic')
        paid = self._numeric_column(ordered, 'follower_count_paid')
        total = organic + paid

        metric_definitions = {
            "total_follower_count": total,
            "organic_follower_count": organic,
            "paid_follower_count": paid,
        }
        for name, values_series in metric_definitions.items():
            ts_metrics.append(TimeSeriesMetric(
                metric_name=name,
                values=values_series.tolist(),
                timestamps=timestamps,
                metric_type="time_series",
                time_granularity="monthly"
            ))

        if len(ordered) > 1:
            previous = total.shift(1)
            # Mask zero baselines so the division yields NaN (-> 0) instead of +/-inf
            growth_pct = ((total - previous) / previous.where(previous != 0)) * 100
            ts_metrics.append(TimeSeriesMetric(
                metric_name="total_follower_growth_rate",
                values=growth_pct.fillna(0).tolist(),
                timestamps=timestamps,  # Timestamps align; first point has no predecessor (so 0)
                metric_type="time_series",
                time_granularity="monthly",
                unit="%"
            ))
        else:
            logger.info("Not enough data points (<=1) to calculate growth rate.")
        return ts_metrics

    def _calculate_aggregate_metrics(self, separated_data: Dict[str, pd.DataFrame]) -> Dict[str, float]:
        """
        Calculate aggregate metrics from all follower data.

        The monthly counts are treated as gains for that month (not cumulative totals).
        The DataFrames in `separated_data` are not modified.

        Returns:
            Flat mapping of metric name -> float: period totals, organic/paid ratios (only
            when the total is positive), avg/max/min monthly gain, and the number of rows
            available for each demographic type.
        """
        agg_metrics: Dict[str, float] = {}

        monthly_df = separated_data.get(self.MONTHLY_TYPE)
        if monthly_df is not None and not monthly_df.empty:
            organic = self._numeric_column(monthly_df, 'follower_count_organic')
            paid = self._numeric_column(monthly_df, 'follower_count_paid')
            monthly_total_gain = organic + paid  # local Series; the input frame stays untouched

            total_organic = organic.sum()
            total_paid = paid.sum()
            total_all_followers = total_organic + total_paid
            agg_metrics['total_organic_followers_gained_period'] = float(total_organic)
            agg_metrics['total_paid_followers_gained_period'] = float(total_paid)
            agg_metrics['overall_total_followers_gained_period'] = float(total_all_followers)
            if total_all_followers > 0:
                agg_metrics['overall_organic_follower_ratio_gained'] = float(total_organic / total_all_followers)
                agg_metrics['overall_paid_follower_ratio_gained'] = float(total_paid / total_all_followers)

            agg_metrics['avg_monthly_follower_gain'] = float(monthly_total_gain.mean())
            agg_metrics['max_monthly_follower_gain'] = float(monthly_total_gain.max())
            agg_metrics['min_monthly_follower_gain'] = float(monthly_total_gain.min())

        # Count of demographic segments available (top N for each type)
        for demo_type in self.DEMOGRAPHIC_METRIC_NAMES:
            demo_df = separated_data.get(demo_type)
            if demo_df is not None and not demo_df.empty:
                agg_metrics[f'distinct_{demo_type}_segments_analyzed'] = float(len(demo_df))
        return agg_metrics

    def _extract_demographic_metrics(self, separated_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """
        Extract demographic distributions (categorical metrics).

        For every demographic type present, produces '<name>' (category -> totals, ordered
        by total followers descending) and '<name>_summary'. Rows that share a
        'category_name' are summed instead of the last row replacing the earlier ones.
        """
        cat_metrics: Dict[str, Any] = {}
        for demo_type_key, metric_name_prefix in self.DEMOGRAPHIC_METRIC_NAMES.items():
            demo_df = separated_data.get(demo_type_key)
            if demo_df is None or demo_df.empty or 'category_name' not in demo_df.columns:
                continue

            organic_col = self._numeric_column(demo_df, 'follower_count_organic')
            paid_col = self._numeric_column(demo_df, 'follower_count_paid')

            # category -> [organic, paid], accumulated across duplicate labels
            counts: Dict[Any, List[float]] = {}
            for category, organic, paid in zip(demo_df['category_name'], organic_col, paid_col):
                bucket = counts.setdefault(category, [0.0, 0.0])
                bucket[0] += float(organic)
                bucket[1] += float(paid)

            distribution = {}
            for category, (organic, paid) in counts.items():
                total = organic + paid
                distribution[category] = {
                    'total_followers': total,
                    'organic_followers': organic,
                    'paid_followers': paid,
                    'organic_ratio': organic / total if total > 0 else 0.0
                }

            # Sort by total followers descending for the distribution
            sorted_distribution = dict(
                sorted(distribution.items(), key=lambda item: item[1]['total_followers'], reverse=True)
            )
            cat_metrics[metric_name_prefix] = sorted_distribution
            cat_metrics[f'{metric_name_prefix}_summary'] = {
                'total_followers_in_top_segments': sum(
                    item['total_followers'] for item in distribution.values()
                ),
                'number_of_segments_reported': len(distribution),
                'top_segment': next(iter(sorted_distribution), "N/A")
            }
        return cat_metrics

    def _extract_time_periods(self, monthly_df: Optional[pd.DataFrame]) -> List[str]:
        """
        Extract unique year-month time periods covered by the monthly data.

        Returns:
            Up to 12 'YYYY-MM' strings, most recent first, or a single placeholder entry
            when no monthly data with a 'year_month' column is available.
        """
        if monthly_df is None or monthly_df.empty or 'year_month' not in monthly_df.columns:
            return ["Data period not available or N/A"]
        periods = sorted(monthly_df['year_month'].dropna().unique().tolist(), reverse=True)
        return periods[:12]  # Most recent 12 months at most

    def _run_pandasai_chat(self, pandas_ai_df: Any, analysis_query: str) -> str:
        """
        Ask PandasAI for the textual analysis, retrying on failure.

        Best-effort by design: any error is logged and turned into an explanatory
        string so the structured metrics can still be returned by the caller.
        """
        def chat_operation():
            # NOTE(review): relies on `pai.config.llm` being readable in the installed
            # pandasai version - confirm against the version pinned by the project.
            if not pai.config.llm:  # Check if LLM is set for PandasAI
                logger.warning("PandasAI LLM not configured. Attempting to configure now.")
                # This assumes configure_pandasai is available and sets pai.config.llm
                from utils.pandasai_setup import configure_pandasai
                configure_pandasai(self.api_key, self.model_name)
                if not pai.config.llm:
                    raise RuntimeError("PandasAI LLM could not be configured for chat operation.")
            logger.info(f"Executing PandasAI chat for follower analysis with LLM: {pai.config.llm}")
            return pandas_ai_df.chat(analysis_query)

        try:
            analysis_result_raw = self.retry_mechanism.retry_with_backoff(
                func=chat_operation,
                max_retries=2,
                base_delay=2.0,
                exceptions=(Exception,)  # Catch broader exceptions for PandasAI calls
            )
            # Compare with None explicitly: a DataFrame-like response has no unambiguous
            # truth value and would raise ValueError in a plain `if result` test.
            result_text = "" if analysis_result_raw is None else str(analysis_result_raw)
            logger.info("Follower analysis via PandasAI completed.")
        except Exception as e:
            logger.error(f"Follower analysis with PandasAI failed after retries: {e}", exc_info=True)
            return f"Follower analysis using PandasAI failed. Error: {str(e)[:200]}"
        return result_text if result_text.strip() else "No textual analysis generated by PandasAI."

    def analyze_follower_data(self, follower_stats_df: pd.DataFrame) -> "AgentMetrics":
        """
        Generate comprehensive follower analysis using PandasAI and structured metric extraction.

        Args:
            follower_stats_df: Follower statistics with 'follower_count_type', 'category_name',
                'follower_count_organic' and 'follower_count_paid' columns.

        Returns:
            AgentMetrics with the textual summary (truncated to 2000 characters) and the
            time-series, aggregate and categorical metrics. For empty input, or when the
            PandasAI DataFrame cannot be created, only the summary and covered periods
            are populated.
        """
        if follower_stats_df is None or follower_stats_df.empty:
            logger.warning("Follower statistics DataFrame is empty. Returning empty metrics.")
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary="No follower data provided for analysis.",
                time_periods_covered=["N/A"]
            )

        # 1. Pre-process and separate data
        separated_data = self._separate_follower_data_by_type(follower_stats_df)
        monthly_data_for_metrics = separated_data.get(self.MONTHLY_TYPE, pd.DataFrame())
        time_periods = self._extract_time_periods(monthly_data_for_metrics)

        # PandasAI gets the original frame: it contains all types and the query tells the
        # LLM how to differentiate them.
        df_description = "LinkedIn follower statistics. Contains 'follower_count_type' indicating data category (e.g., 'follower_gains_monthly', 'follower_industry'), 'category_name' (e.g., date for monthly, industry name for industry type), 'follower_count_organic', 'follower_count_paid'."
        try:
            pandas_ai_df = pai.DataFrame(follower_stats_df, description=df_description)
        except Exception as e:
            logger.error(f"Failed to create PandasAI DataFrame: {e}", exc_info=True)
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary=f"Error initializing PandasAI: {e}",
                time_periods_covered=time_periods
            )

        # 2. Generate textual analysis with a direct PandasAI chat call
        analysis_query = """
        Analyze the provided LinkedIn follower statistics. The DataFrame contains various 'follower_count_type' values.
        Focus on:
        1. For 'follower_gains_monthly': Analyze monthly follower growth trends (total, organic, paid). Identify key periods of growth or decline.
        2. For demographic types (industry, seniority, function, geo): Describe the distribution of followers. Which are the top segments? How do organic vs paid compare?
        3. Synthesize these findings into an overall summary of follower dynamics.
        Consider the data structure: 'category_name' holds the date for monthly data or the demographic label.
        'follower_count_organic' and 'follower_count_paid' are the key metrics.
        """
        analysis_result_text = self._run_pandasai_chat(pandas_ai_df, analysis_query)

        # 3. Extract structured metrics using the separated and processed data
        return AgentMetrics(
            agent_name=self.AGENT_NAME,
            analysis_summary=analysis_result_text[:2000],  # Truncate if too long
            time_series_metrics=self._extract_time_series_metrics(monthly_data_for_metrics),
            aggregate_metrics=self._calculate_aggregate_metrics(separated_data),
            categorical_metrics=self._extract_demographic_metrics(separated_data),
            time_periods_covered=time_periods,
            data_sources_used=[f"follower_stats_df (shape: {follower_stats_df.shape})"]
        )
if __name__ == '__main__':
    # Example / smoke test for EnhancedFollowerAnalysisAgent.
    # Runs the agent on a small in-memory sample and prints the resulting metrics.
    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for EnhancedFollowerAnalysisAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO)
        logger.warning("Could not import setup_logging. Using basicConfig.")

    # Mock API Key and Model for testing
    # IMPORTANT: For PandasAI to run, a valid API key and model setup are needed.
    # This example might not fully execute PandasAI chat without proper environment setup.
    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_followers")
    MODEL_NAME = DEFAULT_AGENT_MODEL

    class MockPandasAIDataFrame:
        """Stand-in for pai.DataFrame that echoes the query instead of calling an LLM."""

        def __init__(self, df, description):
            self.df = df
            self.description = description

        def chat(self, query):
            return f"Mock PandasAI response to: {query}"

    # Configure PandasAI (essential for the .chat() part); fall back to the mock
    # when no real key is present or the setup helper cannot be imported.
    try:
        from utils.pandasai_setup import configure_pandasai
        if MOCK_API_KEY != "test_api_key_followers":  # Only configure if a real key might be present
            configure_pandasai(MOCK_API_KEY, MODEL_NAME)
            logger.info("PandasAI configured for testing EnhancedFollowerAnalysisAgent.")
        else:
            logger.warning("Using mock API key. PandasAI chat will likely fail or use a default/mock LLM if available.")
            pai.DataFrame = MockPandasAIDataFrame
    except ImportError:
        logger.error("utils.pandasai_setup not found. PandasAI will not be configured.")
        pai.DataFrame = MockPandasAIDataFrame

    # Sample Data
    sample_follower_data = {
        'follower_count_type': [
            'follower_gains_monthly', 'follower_gains_monthly', 'follower_gains_monthly',
            'follower_industry', 'follower_industry', 'follower_industry', 'follower_industry',
            'follower_seniority', 'follower_seniority'
        ],
        'category_name': [  # Dates for monthly, names for demographics
            '2023-01-01', '2023-02-01', '2023-03-01',
            'Technology', 'Finance', 'Healthcare', 'Retail',
            'Senior', 'Entry-Level'
        ],
        'follower_count_organic': [
            100, 120, 110,       # Monthly gains
            500, 300, 200, 150,  # Industry organic
            600, 400             # Seniority organic
        ],
        'follower_count_paid': [
            10, 15, 12,      # Monthly gains
            50, 30, 20, 10,  # Industry paid
            60, 40           # Seniority paid
        ]
    }
    sample_df = pd.DataFrame(sample_follower_data)

    # Initialize agent
    follower_agent = EnhancedFollowerAnalysisAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME)
    logger.info("Analyzing sample follower data...")
    metrics_result = follower_agent.analyze_follower_data(sample_df)

    print("\n--- EnhancedFollowerAnalysisAgent Results ---")
    print(f"Agent Name: {metrics_result.agent_name}")
    print(f"Analysis Summary: {metrics_result.analysis_summary}")
    print("\nTime Series Metrics:")
    for ts_metric in metrics_result.time_series_metrics:
        print(f" - {ts_metric.metric_name}: {len(ts_metric.values)} data points, e.g., {ts_metric.values[:3]} for ts {ts_metric.timestamps[:3]}")
    print("\nAggregate Metrics:")
    for key, value in metrics_result.aggregate_metrics.items():
        print(f" - {key}: {value}")
    print("\nCategorical Metrics:")
    for key, value in metrics_result.categorical_metrics.items():
        print(f" - {key}: (details below)")
        if isinstance(value, dict):
            for sub_key, sub_value in list(value.items())[:2]:  # Print first 2 items for brevity
                print(f" - {sub_key}: {sub_value}")
        else:
            print(f" {value}")
    print(f"\nTime Periods Covered: {metrics_result.time_periods_covered}")
    print(f"Data Sources Used: {metrics_result.data_sources_used}")
    print(f"Generated Timestamp: {metrics_result.generation_timestamp}")

    # Test with empty DataFrame
    logger.info("\n--- Testing with empty DataFrame ---")
    empty_metrics_result = follower_agent.analyze_follower_data(pd.DataFrame())
    print(f"Empty DF Analysis Summary: {empty_metrics_result.analysis_summary}")