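# Scraped web-page header, kept as a comment so the module can be parsed:
#   Author: GuglielmoTor
#   Commit: "Update insight_and_tasks/agents/follower_agent.py" (62d9a4c, verified)
#   Page links: raw / history / blame -- file size 26.7 kB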
# agents/follower_agent.py
import pandas as pd
from typing import Dict, List, Any, Optional
import logging
import pandasai as pai # Assuming pandasai is imported as pai globally or configured
from google.adk.agents import LlmAgent # Assuming this is the correct import path
# Project-specific imports
from utils.retry_mechanism import RetryMechanism
from data_models.metrics import AgentMetrics, TimeSeriesMetric
# Module-level logger, named after this module so handlers/filters can target it.
logger = logging.getLogger(__name__)
# Model used when EnhancedFollowerAnalysisAgent is constructed without an explicit
# `model_name`. It is a hard-coded constant for now; consider moving it to a shared
# config or an environment variable. Swap in your own model id if needed
# (e.g. "gemini-1.5-flash-preview-05-20").
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20"
class EnhancedFollowerAnalysisAgent:
    """
    Follower analysis agent that splits follower statistics by
    'follower_count_type' and extracts structured metrics from each slice.

    The input DataFrame is expected to carry the columns
    'follower_count_type', 'category_name', 'follower_count_organic' and
    'follower_count_paid'. Rows typed 'follower_gains_monthly' are treated as a
    monthly time series ('category_name' holds a date string); the other known
    types ('follower_industry', 'follower_seniority', 'follower_function',
    'follower_geo') are treated as demographic breakdowns.
    """

    AGENT_NAME = "follower_analyst"
    AGENT_DESCRIPTION = "Expert analyst specializing in follower growth patterns and demographic analysis."
    AGENT_INSTRUCTION = """
    You are a specialized LinkedIn follower analytics expert focused on temporal patterns and demographic trends.
    Your role includes:
    1. FOLLOWER TREND ANALYSIS:
    - Analyze follower growth trends over time (monthly data from 'follower_gains_monthly' type).
    - Identify growth acceleration/deceleration periods.
    - Calculate growth rates and velocity changes.
    - Detect seasonal patterns and anomalies.
    - Analyze organic vs paid follower counts over time.
    2. DEMOGRAPHIC ANALYSIS (based on 'follower_industry', 'follower_seniority', etc.):
    - Analyze follower distribution by industry, seniority, function, and geography.
    - Compare organic vs paid followers across these demographic segments.
    - Identify high-value audience segments based on counts and potential engagement.
    3. TIME-BASED INSIGHTS:
    - Provide month-over-month comparisons for growth data.
    - Identify critical inflection points in follower growth.
    - Calculate trend momentum and acceleration.
    4. METRIC EXTRACTION (for the AgentMetrics structure):
    - Extract time-series data for total, organic, and paid follower counts, and growth rates.
    - Provide aggregate metrics like average monthly growth, total organic/paid followers.
    - Provide demographic breakdowns as categorical metrics (e.g., top N industries by follower count).
    Focus on separating temporal analysis (monthly) from demographic analysis.
    When analyzing demographics, consider the top N segments (e.g., top 10 industries) for conciseness.
    Ensure your analysis summary is comprehensive and insightful.
    """

    def __init__(self, api_key: str, model_name: Optional[str] = None):
        """
        Initializes the Follower Analysis Agent.

        Args:
            api_key: API key for the LLM; also passed to `configure_pandasai`
                when PandasAI has no LLM configured at chat time.
            model_name: Name of the language model to use. Defaults to
                DEFAULT_AGENT_MODEL.
        """
        self.api_key = api_key
        self.model_name = model_name or DEFAULT_AGENT_MODEL
        self.agent = LlmAgent(
            name=self.AGENT_NAME,
            model=self.model_name,
            description=self.AGENT_DESCRIPTION,
            instruction=self.AGENT_INSTRUCTION
        )
        self.retry_mechanism = RetryMechanism()
        logger.info(f"{self.AGENT_NAME} initialized with model {self.model_name}.")

    @staticmethod
    def _numeric_column(df: pd.DataFrame, column: str) -> pd.Series:
        """
        Return `df[column]` coerced to numeric with unparseable/missing values
        replaced by 0, or an all-zero Series aligned to `df.index` when the
        column does not exist.
        """
        if column not in df.columns:
            return pd.Series(0, index=df.index)
        return pd.to_numeric(df[column], errors='coerce').fillna(0)

    def _separate_follower_data_by_type(self, df: pd.DataFrame) -> Dict[str, pd.DataFrame]:
        """
        Split follower data by 'follower_count_type' and pre-process each slice.

        Monthly rows go through `_process_monthly_data`; every demographic type
        is reduced to its top 10 segments.

        Args:
            df: Raw follower statistics.

        Returns:
            Mapping of follower type -> processed DataFrame. Types with no rows
            are omitted. Empty dict when `df` is None/empty or lacks the
            'follower_count_type' column.
        """
        separated_data: Dict[str, pd.DataFrame] = {}
        if df is None or df.empty or 'follower_count_type' not in df.columns:
            logger.warning("Input DataFrame is empty or 'follower_count_type' column is missing.")
            return separated_data

        # Expected 'follower_count_type' values; these should match the values
        # present in the upstream data.
        follower_types = [
            'follower_gains_monthly',  # For time-series analysis
            'follower_industry',       # For demographic analysis
            'follower_seniority',
            'follower_function',
            'follower_geo'
        ]
        for ftype in follower_types:
            type_data = df[df['follower_count_type'] == ftype].copy()
            if type_data.empty:
                logger.info(f"No data found for follower_count_type: {ftype}")
                continue
            if ftype == 'follower_gains_monthly':
                type_data = self._process_monthly_data(type_data)
            else:  # Demographic data
                type_data = self._get_top_demographic_segments(type_data, top_n=10)
            separated_data[ftype] = type_data
        return separated_data

    def _get_top_demographic_segments(self, demo_df: pd.DataFrame, top_n: int = 10) -> pd.DataFrame:
        """
        Return the `top_n` rows of `demo_df` ranked by organic + paid followers.

        The count columns are coerced to numeric (NaN -> 0) and created as
        zeros when absent, so a missing column no longer raises.

        Args:
            demo_df: Rows of a single demographic follower type.
            top_n: Maximum number of rows to keep.

        Returns:
            A new DataFrame (the input is not modified, unless it is empty, in
            which case it is returned as-is). If 'category_name' is missing,
            all rows are returned with only the numeric coercion applied.
        """
        if demo_df.empty:
            return demo_df

        demo_df = demo_df.copy()  # Never touch the caller's frame
        for col in ('follower_count_organic', 'follower_count_paid'):
            demo_df[col] = self._numeric_column(demo_df, col)
        demo_df['total_followers'] = demo_df['follower_count_organic'] + demo_df['follower_count_paid']

        # 'category_name' holds the demographic label (e.g., industry name)
        if 'category_name' not in demo_df.columns:
            logger.warning("'_get_top_demographic_segments' expects 'category_name' column for grouping.")
            return demo_df.drop(columns=['total_followers'], errors='ignore')

        # NOTE(review): rows are ranked individually; repeated 'category_name'
        # values are NOT aggregated here. Confirm upstream data is already one
        # row per category.
        top_segments = demo_df.nlargest(top_n, 'total_followers')
        return top_segments.drop(columns=['total_followers'], errors='ignore')

    def _process_monthly_data(self, monthly_df: pd.DataFrame) -> pd.DataFrame:
        """
        Prepare monthly follower rows: parse dates, coerce counts, sort by date.

        'category_name' is parsed as a date; rows that fail to parse are
        dropped. Adds 'date_for_analysis', 'year_month' ('%Y-%m') and
        'month_name' ('%B %Y'), and guarantees numeric
        'follower_count_organic' / 'follower_count_paid' columns.

        Args:
            monthly_df: Rows typed 'follower_gains_monthly'.

        Returns:
            Processed copy sorted by 'date_for_analysis'. The input is returned
            unchanged when it is empty or has no 'category_name' column.
        """
        if monthly_df.empty or 'category_name' not in monthly_df.columns:
            logger.warning("Monthly data DataFrame is empty or 'category_name' column is missing.")
            return monthly_df

        df_processed = monthly_df.copy()
        # 'category_name' for monthly data is expected to be a date string like 'YYYY-MM-DD'
        df_processed['date_for_analysis'] = pd.to_datetime(df_processed['category_name'], errors='coerce')
        # Drop rows where date conversion failed
        df_processed = df_processed.dropna(subset=['date_for_analysis'])
        if df_processed.empty:
            logger.warning("No valid dates found in 'category_name' for monthly data after processing.")
            return df_processed

        parsed_dates = df_processed['date_for_analysis']
        df_processed['year_month'] = parsed_dates.dt.strftime('%Y-%m')
        df_processed['month_name'] = parsed_dates.dt.strftime('%B %Y')

        # Missing count columns are added as zeros to keep later arithmetic safe.
        for col in ('follower_count_organic', 'follower_count_paid'):
            df_processed[col] = self._numeric_column(df_processed, col)
        return df_processed.sort_values('date_for_analysis')

    def _extract_time_series_metrics(self, monthly_df: pd.DataFrame) -> "List[TimeSeriesMetric]":
        """
        Build time-series metrics from processed monthly follower data.

        Emits 'total_follower_count', 'organic_follower_count' and
        'paid_follower_count', plus 'total_follower_growth_rate' when there is
        more than one data point.

        Args:
            monthly_df: Output of `_process_monthly_data`.

        Returns:
            List of TimeSeriesMetric; empty when the frame is empty or lacks
            'date_for_analysis'.
        """
        ts_metrics = []
        if monthly_df.empty or 'date_for_analysis' not in monthly_df.columns:
            logger.info("Cannot extract time-series metrics: monthly DataFrame is empty or lacks 'date_for_analysis'.")
            return ts_metrics

        # Growth rates are only meaningful on chronologically ordered rows.
        monthly_df_sorted = monthly_df.sort_values('date_for_analysis')
        timestamps = monthly_df_sorted['year_month'].tolist()

        organic = self._numeric_column(monthly_df_sorted, 'follower_count_organic')
        paid = self._numeric_column(monthly_df_sorted, 'follower_count_paid')
        total = organic + paid

        metric_definitions = {
            "total_follower_count": total,
            "organic_follower_count": organic,
            "paid_follower_count": paid
        }
        for name, values_series in metric_definitions.items():
            ts_metrics.append(TimeSeriesMetric(
                metric_name=name,
                values=values_series.tolist(),
                timestamps=timestamps,
                metric_type="time_series",
                time_granularity="monthly"
            ))

        if len(monthly_df_sorted) > 1:
            # pct_change yields NaN for the first element (and 0 -> 0) and
            # +/-inf when the previous value is 0; all of these undefined
            # rates are reported as 0 so the series stays finite.
            infinities = [float('inf'), float('-inf')]
            growth_rates = total.pct_change().replace(infinities, 0.0).fillna(0).tolist()
            # NOTE(review): values are fractions (0.2 == 20%) although the
            # metric is labelled unit="%"; left unchanged for consumers.
            ts_metrics.append(TimeSeriesMetric(
                metric_name="total_follower_growth_rate",
                values=growth_rates,
                timestamps=timestamps,
                metric_type="time_series",
                time_granularity="monthly",
                unit="%"
            ))
        else:
            logger.info("Not enough data points (<=1) to calculate growth rate.")
        return ts_metrics

    def _calculate_aggregate_metrics(self, separated_data: Dict[str, pd.DataFrame]) -> Dict[str, float]:
        """
        Compute scalar summary metrics from the separated follower data.

        Args:
            separated_data: Output of `_separate_follower_data_by_type`.

        Returns:
            Dict of metric name -> float: period totals, organic/paid ratios
            (only when the total is positive), avg/max/min monthly gain, and
            the number of rows kept for each demographic type. The input
            DataFrames are not modified.
        """
        agg_metrics: Dict[str, float] = {}

        monthly_df = separated_data.get('follower_gains_monthly')
        if monthly_df is not None and not monthly_df.empty:
            organic = self._numeric_column(monthly_df, 'follower_count_organic')
            paid = self._numeric_column(monthly_df, 'follower_count_paid')
            total_organic = organic.sum()
            total_paid = paid.sum()
            total_all_followers = total_organic + total_paid

            agg_metrics['total_organic_followers_gained_period'] = float(total_organic)
            agg_metrics['total_paid_followers_gained_period'] = float(total_paid)
            agg_metrics['overall_total_followers_gained_period'] = float(total_all_followers)
            if total_all_followers > 0:
                agg_metrics['overall_organic_follower_ratio_gained'] = float(total_organic / total_all_followers)
                agg_metrics['overall_paid_follower_ratio_gained'] = float(total_paid / total_all_followers)

            # Assumes the monthly counts are per-month GAINS, not cumulative
            # totals. Kept as a local Series so the caller's frame is untouched.
            monthly_total_gain = organic + paid
            agg_metrics['avg_monthly_follower_gain'] = float(monthly_total_gain.mean())
            agg_metrics['max_monthly_follower_gain'] = float(monthly_total_gain.max())
            agg_metrics['min_monthly_follower_gain'] = float(monthly_total_gain.min())

        # Number of segments retained (top N) for each demographic type
        for demo_type in ('follower_industry', 'follower_seniority', 'follower_function', 'follower_geo'):
            demo_df = separated_data.get(demo_type)
            if demo_df is not None and not demo_df.empty:
                agg_metrics[f'distinct_{demo_type}_segments_analyzed'] = float(len(demo_df))
        return agg_metrics

    def _extract_demographic_metrics(self, separated_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """
        Build per-demographic follower distributions (categorical metrics).

        Args:
            separated_data: Output of `_separate_follower_data_by_type`.

        Returns:
            For each demographic type present, '<name>_distribution' maps
            category -> {total, organic, paid, organic_ratio}, ordered by total
            followers descending, and '<name>_distribution_summary' holds the
            overall total, segment count and top segment.
        """
        cat_metrics: Dict[str, Any] = {}
        demographic_types_map = {
            'follower_industry': 'industry_distribution',
            'follower_seniority': 'seniority_distribution',
            'follower_function': 'function_distribution',
            'follower_geo': 'geographic_distribution'
        }
        for demo_type_key, metric_name_prefix in demographic_types_map.items():
            demo_df = separated_data.get(demo_type_key)
            if demo_df is None or demo_df.empty or 'category_name' not in demo_df.columns:
                continue

            distribution = {}
            for _, row in demo_df.iterrows():
                organic = float(row.get('follower_count_organic', 0))
                paid = float(row.get('follower_count_paid', 0))
                total = organic + paid
                # A repeated category overwrites its earlier entry.
                distribution[row['category_name']] = {
                    'total_followers': total,
                    'organic_followers': organic,
                    'paid_followers': paid,
                    'organic_ratio': organic / total if total > 0 else 0.0
                }

            # Largest segments first
            sorted_distribution = dict(
                sorted(distribution.items(), key=lambda item: item[1]['total_followers'], reverse=True)
            )
            cat_metrics[metric_name_prefix] = sorted_distribution
            cat_metrics[f'{metric_name_prefix}_summary'] = {
                'total_followers_in_top_segments': sum(
                    item['total_followers'] for item in distribution.values()
                ),
                'number_of_segments_reported': len(distribution),
                'top_segment': next(iter(sorted_distribution), "N/A")
            }
        return cat_metrics

    def _extract_time_periods(self, monthly_df: Optional[pd.DataFrame]) -> List[str]:
        """
        Return the distinct 'year_month' values covered by the monthly data.

        Args:
            monthly_df: Processed monthly data, or None.

        Returns:
            Up to the 12 most recent periods, newest first; a single
            placeholder entry when no period information is available.
        """
        if monthly_df is None or monthly_df.empty or 'year_month' not in monthly_df.columns:
            return ["Data period not available or N/A"]
        periods = sorted(monthly_df['year_month'].dropna().unique().tolist(), reverse=True)
        return periods[:12]

    def analyze_follower_data(self, follower_stats_df: pd.DataFrame) -> "AgentMetrics":
        """
        Generate a follower analysis: a textual summary via PandasAI plus
        structured metrics computed directly from the data.

        A failure of the PandasAI step does not abort the analysis; the error
        is reported in `analysis_summary` and structured metrics are still
        returned (except when the PandasAI DataFrame cannot be created, in
        which case only the summary and time periods are filled).

        Args:
            follower_stats_df: Raw follower statistics.

        Returns:
            AgentMetrics for this agent.
        """
        if follower_stats_df is None or follower_stats_df.empty:
            logger.warning("Follower statistics DataFrame is empty. Returning empty metrics.")
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary="No follower data provided for analysis.",
                time_periods_covered=["N/A"]
            )

        # 1. Pre-process and separate data
        separated_data = self._separate_follower_data_by_type(follower_stats_df)

        # PandasAI receives the original frame (all types); the query below
        # tells the LLM how to tell the row categories apart.
        df_description = "LinkedIn follower statistics. Contains 'follower_count_type' indicating data category (e.g., 'follower_gains_monthly', 'follower_industry'), 'category_name' (e.g., date for monthly, industry name for industry type), 'follower_count_organic', 'follower_count_paid'."
        try:
            pandas_ai_df = pai.DataFrame(follower_stats_df, description=df_description)
        except Exception as e:
            logger.error(f"Failed to create PandasAI DataFrame: {e}", exc_info=True)
            return AgentMetrics(
                agent_name=self.AGENT_NAME,
                analysis_summary=f"Error initializing PandasAI: {e}",
                time_periods_covered=self._extract_time_periods(separated_data.get('follower_gains_monthly'))
            )

        # 2. Generate textual analysis with a direct PandasAI chat call
        #    (self.agent / LlmAgent is not involved in this step).
        analysis_query = """
        Analyze the provided LinkedIn follower statistics. The DataFrame contains various 'follower_count_type' values.
        Focus on:
        1. For 'follower_gains_monthly': Analyze monthly follower growth trends (total, organic, paid). Identify key periods of growth or decline.
        2. For demographic types (industry, seniority, function, geo): Describe the distribution of followers. Which are the top segments? How do organic vs paid compare?
        3. Synthesize these findings into an overall summary of follower dynamics.
        Consider the data structure: 'category_name' holds the date for monthly data or the demographic label.
        'follower_count_organic' and 'follower_count_paid' are the key metrics.
        """

        def chat_operation():
            # Configure the PandasAI LLM lazily if nothing set it up beforehand.
            if not pai.config.llm:
                logger.warning("PandasAI LLM not configured. Attempting to configure now.")
                from utils.pandasai_setup import configure_pandasai
                configure_pandasai(self.api_key, self.model_name)
                if not pai.config.llm:
                    raise RuntimeError("PandasAI LLM could not be configured for chat operation.")
            logger.info(f"Executing PandasAI chat for follower analysis with LLM: {pai.config.llm}")
            return pandas_ai_df.chat(analysis_query)

        analysis_result_text = "PandasAI analysis could not be performed."  # Default
        try:
            analysis_result_raw = self.retry_mechanism.retry_with_backoff(
                func=chat_operation,
                max_retries=2,
                base_delay=2.0,
                exceptions=(Exception,)  # PandasAI can raise a wide range of errors
            )
            # Compare against None instead of relying on truthiness, which is
            # ambiguous (raises) for DataFrame-like results.
            rendered = "" if analysis_result_raw is None else str(analysis_result_raw)
            analysis_result_text = rendered or "No textual analysis generated by PandasAI."
            logger.info("Follower analysis via PandasAI completed.")
        except Exception as e:
            logger.error(f"Follower analysis with PandasAI failed after retries: {e}", exc_info=True)
            analysis_result_text = f"Follower analysis using PandasAI failed. Error: {str(e)[:200]}"

        # 3. Extract structured metrics using the separated and processed data
        monthly_data_for_metrics = separated_data.get('follower_gains_monthly', pd.DataFrame())
        time_series_metrics = self._extract_time_series_metrics(monthly_data_for_metrics)
        aggregate_metrics = self._calculate_aggregate_metrics(separated_data)
        categorical_metrics = self._extract_demographic_metrics(separated_data)
        time_periods = self._extract_time_periods(monthly_data_for_metrics)

        return AgentMetrics(
            agent_name=self.AGENT_NAME,
            analysis_summary=analysis_result_text[:2000],  # Truncate if too long
            time_series_metrics=time_series_metrics,
            aggregate_metrics=aggregate_metrics,
            categorical_metrics=categorical_metrics,
            time_periods_covered=time_periods,
            data_sources_used=[f"follower_stats_df (shape: {follower_stats_df.shape})"]
        )
if __name__ == '__main__':
    # Example / smoke-test entry point: runs the agent on a small in-memory
    # sample and prints the resulting metrics. Not used when imported.
    # `os` is imported here because only this script section needs it (the
    # original read os.environ without importing os, raising NameError).
    import os

    try:
        from utils.logging_config import setup_logging
        setup_logging()
        logger.info("Logging setup for EnhancedFollowerAnalysisAgent test.")
    except ImportError:
        logging.basicConfig(level=logging.INFO)
        logger.warning("Could not import setup_logging. Using basicConfig.")

    # IMPORTANT: PandasAI needs a valid API key and model setup to really run.
    # Without GOOGLE_API_KEY the placeholder key below is used and pai.DataFrame
    # is replaced by a mock.
    MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_followers")
    MODEL_NAME = DEFAULT_AGENT_MODEL

    class MockPandasAIDataFrame:
        """Minimal stand-in for pai.DataFrame that echoes the query back."""

        def __init__(self, df, description):
            self.df = df
            self.description = description

        def chat(self, query):
            return f"Mock PandasAI response to: {query}"

    # Configure PandasAI (essential for the .chat() part) or fall back to the mock
    try:
        from utils.pandasai_setup import configure_pandasai
        if MOCK_API_KEY != "test_api_key_followers":  # Only configure if a real key might be present
            configure_pandasai(MOCK_API_KEY, MODEL_NAME)
            logger.info("PandasAI configured for testing EnhancedFollowerAnalysisAgent.")
        else:
            logger.warning("Using mock API key. PandasAI chat will likely fail or use a default/mock LLM if available.")
            pai.DataFrame = MockPandasAIDataFrame
    except ImportError:
        logger.error("utils.pandasai_setup not found. PandasAI will not be configured.")
        pai.DataFrame = MockPandasAIDataFrame

    # Sample data: three monthly rows, four industry rows, two seniority rows
    sample_follower_data = {
        'follower_count_type': [
            'follower_gains_monthly', 'follower_gains_monthly', 'follower_gains_monthly',
            'follower_industry', 'follower_industry', 'follower_industry', 'follower_industry',
            'follower_seniority', 'follower_seniority'
        ],
        'category_name': [  # Dates for monthly, names for demographics
            '2023-01-01', '2023-02-01', '2023-03-01',
            'Technology', 'Finance', 'Healthcare', 'Retail',
            'Senior', 'Entry-Level'
        ],
        'follower_count_organic': [
            100, 120, 110,  # Monthly gains
            500, 300, 200, 150,  # Industry organic
            600, 400  # Seniority organic
        ],
        'follower_count_paid': [
            10, 15, 12,  # Monthly gains
            50, 30, 20, 10,  # Industry paid
            60, 40  # Seniority paid
        ]
    }
    sample_df = pd.DataFrame(sample_follower_data)

    follower_agent = EnhancedFollowerAnalysisAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME)
    logger.info("Analyzing sample follower data...")
    metrics_result = follower_agent.analyze_follower_data(sample_df)

    print("\n--- EnhancedFollowerAnalysisAgent Results ---")
    print(f"Agent Name: {metrics_result.agent_name}")
    print(f"Analysis Summary: {metrics_result.analysis_summary}")

    print("\nTime Series Metrics:")
    for ts_metric in metrics_result.time_series_metrics:
        print(f" - {ts_metric.metric_name}: {len(ts_metric.values)} data points, e.g., {ts_metric.values[:3]} for ts {ts_metric.timestamps[:3]}")

    print("\nAggregate Metrics:")
    for key, value in metrics_result.aggregate_metrics.items():
        print(f" - {key}: {value}")

    print("\nCategorical Metrics:")
    for key, value in metrics_result.categorical_metrics.items():
        print(f" - {key}: (details below)")
        if isinstance(value, dict):
            for sub_key, sub_value in list(value.items())[:2]:  # Print first 2 items for brevity
                print(f" - {sub_key}: {sub_value}")
        else:
            print(f" {value}")

    print(f"\nTime Periods Covered: {metrics_result.time_periods_covered}")
    print(f"Data Sources Used: {metrics_result.data_sources_used}")
    print(f"Generated Timestamp: {metrics_result.generation_timestamp}")

    # Test with empty DataFrame
    logger.info("\n--- Testing with empty DataFrame ---")
    empty_metrics_result = follower_agent.analyze_follower_data(pd.DataFrame())
    print(f"Empty DF Analysis Summary: {empty_metrics_result.analysis_summary}")