Spaces:
Running
Running
# agents/post_agent.py | |
import pandas as pd | |
from typing import Dict, List, Any, Optional | |
import logging | |
import pandasai as pai # Assuming pandasai is imported as pai globally or configured | |
from google.adk.agents import LlmAgent # Assuming this is the correct import path | |
# Project-specific imports | |
from utils.retry_mechanism import RetryMechanism | |
from data_models.metrics import AgentMetrics, TimeSeriesMetric | |
# Configure logger for this module | |
logger = logging.getLogger(__name__) | |
DEFAULT_AGENT_MODEL = "gemini-2.5-flash-preview-05-20" | |
class EnhancedPostPerformanceAgent: | |
""" | |
Enhanced post performance agent with time-series metric extraction and detailed analysis. | |
""" | |
AGENT_NAME = "post_analyst" | |
AGENT_DESCRIPTION = "Expert analyst specializing in content performance trends and engagement patterns." | |
AGENT_INSTRUCTION = """ | |
You are a specialized LinkedIn content performance expert focused on temporal engagement patterns, | |
content type effectiveness, and audience interaction. | |
Your role includes: | |
1. ENGAGEMENT TREND ANALYSIS (monthly, using 'published_at'): | |
- Analyze trends for key engagement metrics: likes, comments, shares, overall engagement ('engagement' column), impressions, clicks. | |
- Calculate and analyze engagement rate (engagement / impressionCount) over time. | |
- Calculate and analyze click-through rate (CTR: clickCount / impressionCount) over time. | |
- Identify periods of high/low engagement and potential drivers. | |
2. CONTENT TYPE & TOPIC PERFORMANCE: | |
- Compare performance across different media types (using 'media_type' column). | |
- Analyze performance by content topic/pillar (using 'li_eb_label' column). | |
- Identify which content types/topics drive the most engagement, impressions, or clicks. | |
- Analyze if the effectiveness of certain media types or topics changes over time. | |
3. POSTING BEHAVIOR ANALYSIS: | |
- Analyze posting frequency (e.g., posts per week/month) and its potential impact on overall engagement or reach. | |
- Identify if there are optimal posting times or days based on engagement patterns (if 'published_at' provides time detail). | |
4. SENTIMENT ANALYSIS (if 'sentiment' column is available): | |
- Analyze the distribution of sentiment (e.g., positive, negative, neutral) associated with posts. | |
- Track how average sentiment of posts evolves over time. | |
5. AD PERFORMANCE (if 'is_ad' column is available): | |
- Compare performance (engagement, reach, CTR) of ad posts vs. organic posts. | |
6. METRIC EXTRACTION (for AgentMetrics): | |
- Extract time-series data for average monthly engagement metrics (likes, comments, engagement rate, CTR, etc.). | |
- Provide aggregate performance metrics (e.g., overall average engagement rate, total impressions, top performing media type). | |
- Include distributions for content types, topics, and sentiment as categorical metrics. | |
Focus on actionable insights. What content resonates most? When is the audience most active? How can strategy be improved? | |
Structure your analysis clearly. Use the provided DataFrame columns ('published_at', 'media_type', 'li_eb_label', | |
'likeCount', 'commentCount', 'shareCount', 'engagement', 'impressionCount', 'clickCount', 'sentiment', 'is_ad'). | |
""" | |
def __init__(self, api_key: str, model_name: Optional[str] = None): | |
self.api_key = api_key | |
self.model_name = model_name or DEFAULT_AGENT_MODEL | |
self.agent = LlmAgent( | |
name=self.AGENT_NAME, | |
model=self.model_name, | |
description=self.AGENT_DESCRIPTION, | |
instruction=self.AGENT_INSTRUCTION | |
) | |
self.retry_mechanism = RetryMechanism() | |
logger.info(f"{self.AGENT_NAME} initialized with model {self.model_name}.") | |
def _preprocess_post_data(self, df: pd.DataFrame) -> pd.DataFrame: | |
"""Cleans and prepares post data for analysis.""" | |
if df is None or df.empty: | |
return pd.DataFrame() | |
df_processed = df.copy() | |
# Convert 'published_at' to datetime | |
if 'published_at' in df_processed.columns: | |
df_processed['published_at'] = pd.to_datetime(df_processed['published_at'], errors='coerce') | |
# df_processed.dropna(subset=['published_at'], inplace=True) # Keep rows even if date is NaT for other metrics | |
else: | |
logger.warning("'published_at' column not found. Time-series analysis will be limited.") | |
# Add a placeholder if critical for downstream, or handle absence gracefully | |
# df_processed['published_at'] = pd.NaT | |
# Ensure numeric types for engagement metrics, coercing errors and filling NaNs | |
metric_cols = ['likeCount', 'commentCount', 'shareCount', 'engagement', 'impressionCount', 'clickCount'] | |
for col in metric_cols: | |
if col in df_processed.columns: | |
df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce').fillna(0) | |
else: | |
logger.info(f"Metric column '{col}' not found in post data. Will be treated as 0.") | |
df_processed[col] = 0 # Add column with zeros if missing | |
# Calculate Engagement Rate and CTR where possible | |
if 'impressionCount' in df_processed.columns and 'engagement' in df_processed.columns: | |
df_processed['engagement_rate'] = df_processed.apply( | |
lambda row: (row['engagement'] / row['impressionCount']) if row['impressionCount'] > 0 else 0.0, axis=1 | |
) | |
else: | |
df_processed['engagement_rate'] = 0.0 | |
if 'impressionCount' in df_processed.columns and 'clickCount' in df_processed.columns: | |
df_processed['ctr'] = df_processed.apply( | |
lambda row: (row['clickCount'] / row['impressionCount']) if row['impressionCount'] > 0 else 0.0, axis=1 | |
) | |
else: | |
df_processed['ctr'] = 0.0 | |
# Handle 'is_ad' boolean conversion if it exists | |
if 'is_ad' in df_processed.columns: | |
df_processed['is_ad'] = df_processed['is_ad'].astype(bool) | |
else: | |
df_processed['is_ad'] = False # Assume organic if not specified | |
# Handle 'sentiment' - ensure it's string, fill NaNs | |
if 'sentiment' in df_processed.columns: | |
df_processed['sentiment'] = df_processed['sentiment'].astype(str).fillna('Unknown') | |
else: | |
df_processed['sentiment'] = 'Unknown' | |
# Handle 'media_type' and 'li_eb_label' - ensure string, fill NaNs | |
for col in ['media_type', 'li_eb_label']: | |
if col in df_processed.columns: | |
df_processed[col] = df_processed[col].astype(str).fillna('Unknown') | |
else: | |
df_processed[col] = 'Unknown' | |
return df_processed | |
def _extract_time_series_metrics(self, df_processed: pd.DataFrame) -> List[TimeSeriesMetric]: | |
"""Extracts monthly time-series metrics from processed post data.""" | |
ts_metrics = [] | |
if df_processed.empty or 'published_at' not in df_processed.columns or df_processed['published_at'].isnull().all(): | |
logger.info("Cannot extract time-series metrics for posts: 'published_at' is missing or all null.") | |
return ts_metrics | |
# Filter out rows where 'published_at' is NaT for time-series aggregation | |
df_ts = df_processed.dropna(subset=['published_at']).copy() | |
if df_ts.empty: | |
logger.info("No valid 'published_at' dates for post time-series metrics after filtering NaT.") | |
return ts_metrics | |
df_ts['year_month'] = df_ts['published_at'].dt.strftime('%Y-%m') | |
# Metrics to average monthly | |
metrics_to_agg = { | |
'likeCount': 'mean', 'commentCount': 'mean', 'shareCount': 'mean', | |
'engagement': 'mean', 'impressionCount': 'mean', 'clickCount': 'mean', | |
'engagement_rate': 'mean', 'ctr': 'mean' | |
} | |
# Filter out metrics not present in the DataFrame | |
available_metrics_to_agg = {k: v for k, v in metrics_to_agg.items() if k in df_ts.columns} | |
if not available_metrics_to_agg: | |
logger.info("No standard engagement metric columns found for time-series aggregation.") | |
else: | |
monthly_stats = df_ts.groupby('year_month').agg(available_metrics_to_agg).reset_index() | |
timestamps = monthly_stats['year_month'].tolist() | |
for metric_col, agg_type in available_metrics_to_agg.items(): | |
# Use original column name, or a more descriptive one like "avg_monthly_likes" | |
ts_metrics.append(TimeSeriesMetric( | |
metric_name=f"avg_monthly_{metric_col.lower()}", | |
values=monthly_stats[metric_col].fillna(0).tolist(), | |
timestamps=timestamps, | |
metric_type="time_series", | |
time_granularity="monthly", | |
unit="%" if "_rate" in metric_col or "ctr" in metric_col else "count" | |
)) | |
# Time series for sentiment distribution (count of posts by sentiment per month) | |
if 'sentiment' in df_ts.columns and df_ts['sentiment'].nunique() > 1 : # if sentiment data exists | |
# Ensure 'sentiment' is not all 'Unknown' | |
if not (df_ts['sentiment'] == 'Unknown').all(): | |
sentiment_by_month = df_ts.groupby(['year_month', 'sentiment']).size().unstack(fill_value=0) | |
for sentiment_value in sentiment_by_month.columns: | |
if sentiment_value == 'Unknown' and (sentiment_by_month[sentiment_value] == 0).all(): | |
continue # Skip if 'Unknown' sentiment has no posts | |
ts_metrics.append(TimeSeriesMetric( | |
metric_name=f"monthly_post_count_sentiment_{str(sentiment_value).lower().replace(' ', '_')}", | |
values=sentiment_by_month[sentiment_value].tolist(), | |
timestamps=sentiment_by_month.index.tolist(), # year_month is the index | |
metric_type="time_series", | |
time_granularity="monthly", | |
unit="count" | |
)) | |
else: | |
logger.info("Sentiment data is all 'Unknown', skipping sentiment time series.") | |
# Time series for post count | |
monthly_post_counts = df_ts.groupby('year_month').size().reset_index(name='post_count') | |
if not monthly_post_counts.empty: | |
ts_metrics.append(TimeSeriesMetric( | |
metric_name="monthly_post_count", | |
values=monthly_post_counts['post_count'].tolist(), | |
timestamps=monthly_post_counts['year_month'].tolist(), | |
metric_type="time_series", | |
time_granularity="monthly", | |
unit="count" | |
)) | |
return ts_metrics | |
def _calculate_aggregate_metrics(self, df_processed: pd.DataFrame) -> Dict[str, Any]: | |
"""Calculates aggregate performance metrics for posts.""" | |
agg_metrics = {} | |
if df_processed.empty: | |
return agg_metrics | |
# Overall averages and totals | |
metric_cols_for_agg = ['likeCount', 'commentCount', 'shareCount', 'engagement', | |
'impressionCount', 'clickCount', 'engagement_rate', 'ctr'] | |
for col in metric_cols_for_agg: | |
if col in df_processed.columns and pd.api.types.is_numeric_dtype(df_processed[col]): | |
agg_metrics[f'overall_avg_{col.lower()}'] = float(df_processed[col].mean()) | |
if col not in ['engagement_rate', 'ctr']: # Totals make sense for counts | |
agg_metrics[f'overall_total_{col.lower()}'] = float(df_processed[col].sum()) | |
agg_metrics['total_posts_analyzed'] = float(len(df_processed)) | |
# Posting frequency (posts per week) | |
if 'published_at' in df_processed.columns and not df_processed['published_at'].isnull().all(): | |
df_dated = df_processed.dropna(subset=['published_at']).sort_values('published_at') | |
if len(df_dated) > 1: | |
# Calculate total duration in days | |
duration_days = (df_dated['published_at'].max() - df_dated['published_at'].min()).days | |
if duration_days > 0: | |
agg_metrics['avg_posts_per_week'] = float(len(df_dated) / (duration_days / 7.0)) | |
elif len(df_dated) > 0: # All posts on the same day or within a day | |
agg_metrics['avg_posts_per_week'] = float(len(df_dated) * 7) # Extrapolate | |
elif len(df_dated) == 1: | |
agg_metrics['avg_posts_per_week'] = 7.0 # One post, extrapolate to 7 per week | |
# Performance by media type and topic (as tables/structured dicts) | |
agg_metrics['performance_by_media_type'] = self._create_performance_table(df_processed, 'media_type') | |
agg_metrics['performance_by_topic'] = self._create_performance_table(df_processed, 'li_eb_label') | |
return agg_metrics | |
def _create_performance_table(self, df: pd.DataFrame, group_column: str) -> Dict[str, Any]: | |
"""Helper to create a structured performance table for categorical analysis.""" | |
if group_column not in df.columns or df[group_column].isnull().all() or (df[group_column] == 'Unknown').all(): | |
return {"message": f"No data or only 'Unknown' values for grouping by {group_column}."} | |
# Filter out 'Unknown' category if it's the only one or for cleaner tables | |
df_filtered = df[df[group_column] != 'Unknown'] | |
if df_filtered.empty: # If filtering 'Unknown' leaves no data, use original df but acknowledge | |
df_filtered = df | |
logger.info(f"Performance table for {group_column} includes 'Unknown' as it's the only/main category.") | |
# Define metrics to aggregate | |
agg_config = { | |
'engagement': 'mean', | |
'impressionCount': 'mean', | |
'clickCount': 'mean', | |
'likeCount': 'mean', | |
'commentCount': 'mean', | |
'shareCount': 'mean', | |
'engagement_rate': 'mean', | |
'ctr': 'mean', | |
'published_at': 'count' # To get number of posts per category | |
} | |
# Filter config for columns that actually exist in df_filtered | |
valid_agg_config = {k: v for k, v in agg_config.items() if k in df_filtered.columns or k == 'published_at'} # 'published_at' for count | |
if not valid_agg_config or 'published_at' not in valid_agg_config : # Need at least one metric or count | |
return {"message": f"Not enough relevant metric columns to create performance table for {group_column}."} | |
try: | |
# Group by the specified column and aggregate | |
# Rename 'published_at' count to 'num_posts' for clarity | |
grouped = df_filtered.groupby(group_column).agg(valid_agg_config).rename( | |
columns={'published_at': 'num_posts'} | |
).reset_index() | |
# Sort by a primary engagement metric, e.g., average engagement rate or num_posts | |
sort_key = 'num_posts' | |
if 'engagement_rate' in grouped.columns: | |
sort_key = 'engagement_rate' | |
elif 'engagement' in grouped.columns: | |
sort_key = 'engagement' | |
grouped = grouped.sort_values(by=sort_key, ascending=False) | |
# Prepare for JSON serializable output | |
table_data = [] | |
for _, row in grouped.iterrows(): | |
row_dict = {'category': row[group_column]} | |
for col in grouped.columns: | |
if col == group_column: continue | |
value = row[col] | |
if isinstance(value, (int, float)): | |
if "_rate" in col or "ctr" in col: | |
row_dict[col] = f"{value:.2%}" # Percentage | |
else: | |
row_dict[col] = round(value, 2) if isinstance(value, float) else value | |
else: | |
row_dict[col] = str(value) | |
table_data.append(row_dict) | |
return { | |
"grouping_column": group_column, | |
"columns_reported": [col for col in grouped.columns.tolist() if col != group_column], | |
"data": table_data, | |
"note": f"Top categories by {sort_key}." | |
} | |
except Exception as e: | |
logger.error(f"Error creating performance table for {group_column}: {e}", exc_info=True) | |
return {"error": f"Could not generate table for {group_column}: {e}"} | |
def _extract_categorical_metrics(self, df_processed: pd.DataFrame) -> Dict[str, Any]: | |
"""Extracts distributions and other categorical insights for posts.""" | |
cat_metrics = {} | |
if df_processed.empty: | |
return cat_metrics | |
# Media type distribution | |
if 'media_type' in df_processed.columns and df_processed['media_type'].nunique() > 0: | |
cat_metrics['media_type_distribution'] = df_processed['media_type'].value_counts(normalize=True).apply(lambda x: f"{x:.2%}").to_dict() | |
cat_metrics['media_type_counts'] = df_processed['media_type'].value_counts().to_dict() | |
# Topic distribution (li_eb_label) | |
if 'li_eb_label' in df_processed.columns and df_processed['li_eb_label'].nunique() > 0: | |
cat_metrics['topic_distribution'] = df_processed['li_eb_label'].value_counts(normalize=True).apply(lambda x: f"{x:.2%}").to_dict() | |
cat_metrics['topic_counts'] = df_processed['li_eb_label'].value_counts().to_dict() | |
# Sentiment distribution | |
if 'sentiment' in df_processed.columns and df_processed['sentiment'].nunique() > 0: | |
cat_metrics['sentiment_distribution'] = df_processed['sentiment'].value_counts(normalize=True).apply(lambda x: f"{x:.2%}").to_dict() | |
cat_metrics['sentiment_counts'] = df_processed['sentiment'].value_counts().to_dict() | |
# Ad vs. Organic performance summary | |
if 'is_ad' in df_processed.columns: | |
ad_summary = {} | |
for ad_status in [True, False]: | |
subset = df_processed[df_processed['is_ad'] == ad_status] | |
if not subset.empty: | |
label = "ad" if ad_status else "organic" | |
ad_summary[f'{label}_post_count'] = int(len(subset)) | |
ad_summary[f'{label}_avg_engagement_rate'] = float(subset['engagement_rate'].mean()) | |
ad_summary[f'{label}_avg_impressions'] = float(subset['impressionCount'].mean()) | |
ad_summary[f'{label}_avg_ctr'] = float(subset['ctr'].mean()) | |
if ad_summary: | |
cat_metrics['ad_vs_organic_summary'] = ad_summary | |
return cat_metrics | |
def _extract_time_periods(self, df_processed: pd.DataFrame) -> List[str]: | |
"""Extracts unique year-month time periods covered by the post data.""" | |
if df_processed.empty or 'published_at' not in df_processed.columns or df_processed['published_at'].isnull().all(): | |
return ["Data period not available or N/A"] | |
# Use already created 'year_month' if available from preprocessing, or derive it | |
if 'year_month' in df_processed.columns: | |
periods = sorted(df_processed['year_month'].dropna().unique().tolist(), reverse=True) | |
elif 'published_at' in df_processed.columns: # Derive if not present | |
dates = df_processed['published_at'].dropna() | |
if not dates.empty: | |
periods = sorted(dates.dt.strftime('%Y-%m').unique().tolist(), reverse=True) | |
else: return ["N/A"] | |
else: return ["N/A"] | |
return periods[:12] # Return up to the last 12 months | |
def analyze_post_data(self, post_df: pd.DataFrame) -> AgentMetrics: | |
""" | |
Generates comprehensive post performance analysis. | |
""" | |
if post_df is None or post_df.empty: | |
logger.warning("Post DataFrame is empty. Returning empty metrics.") | |
return AgentMetrics( | |
agent_name=self.AGENT_NAME, | |
analysis_summary="No post data provided for analysis.", | |
time_periods_covered=["N/A"] | |
) | |
# 1. Preprocess data | |
df_processed = self._preprocess_post_data(post_df) | |
if df_processed.empty and not post_df.empty : # Preprocessing resulted in empty df | |
logger.warning("Post DataFrame became empty after preprocessing. Original data might have issues.") | |
return AgentMetrics( | |
agent_name=self.AGENT_NAME, | |
analysis_summary="Post data could not be processed (e.g., all dates invalid).", | |
time_periods_covered=["N/A"] | |
) | |
elif df_processed.empty and post_df.empty: # Was already empty | |
# This case is handled by the initial check, but as a safeguard: | |
return AgentMetrics(agent_name=self.AGENT_NAME, analysis_summary="No post data provided.") | |
# 2. Generate textual analysis using PandasAI (similar to follower agent) | |
df_description_for_pandasai = "LinkedIn post performance data. Key columns: 'published_at' (date of post), 'media_type' (e.g., IMAGE, VIDEO, ARTICLE), 'li_eb_label' (content topic/pillar), 'likeCount', 'commentCount', 'shareCount', 'engagement' (sum of reactions, comments, shares), 'impressionCount', 'clickCount', 'sentiment' (post sentiment), 'is_ad' (boolean), 'engagement_rate', 'ctr'." | |
analysis_result_text = "PandasAI analysis for posts could not be performed." | |
try: | |
# Ensure PandasAI is configured | |
pandas_ai_df = pai.DataFrame(df_processed, description=df_description_for_pandasai) | |
analysis_query = f""" | |
Analyze the provided LinkedIn post performance data. Focus on: | |
1. Monthly trends for key metrics (engagement, impressions, engagement rate, CTR). | |
2. Performance comparison by 'media_type' and 'li_eb_label'. Which ones are most effective? | |
3. Impact of posting frequency (if derivable from 'published_at' timestamps). | |
4. Sentiment trends and distribution. | |
5. Differences in performance between ad posts ('is_ad'=True) and organic posts. | |
Provide a concise summary of findings and actionable recommendations. | |
""" | |
def chat_operation(): | |
if not pai.config.llm: | |
logger.warning("PandasAI LLM not configured for post agent. Attempting to configure.") | |
from utils.pandasai_setup import configure_pandasai | |
configure_pandasai(self.api_key, self.model_name) | |
if not pai.config.llm: | |
raise RuntimeError("PandasAI LLM could not be configured for post chat operation.") | |
logger.info(f"Executing PandasAI chat for post analysis with LLM: {pai.config.llm}") | |
return pandas_ai_df.chat(analysis_query) | |
analysis_result_raw = self.retry_mechanism.retry_with_backoff( | |
func=chat_operation, max_retries=2, base_delay=2.0, exceptions=(Exception,) | |
) | |
analysis_result_text = str(analysis_result_raw) if analysis_result_raw else "No textual analysis for posts generated by PandasAI." | |
logger.info("Post performance analysis via PandasAI completed.") | |
except Exception as e: | |
logger.error(f"Post analysis with PandasAI failed: {e}", exc_info=True) | |
analysis_result_text = f"Post analysis using PandasAI failed. Error: {str(e)[:200]}" | |
# 3. Extract structured metrics | |
time_series_metrics = self._extract_time_series_metrics(df_processed) | |
aggregate_metrics = self._calculate_aggregate_metrics(df_processed) | |
categorical_metrics = self._extract_categorical_metrics(df_processed) | |
time_periods = self._extract_time_periods(df_processed) | |
return AgentMetrics( | |
agent_name=self.AGENT_NAME, | |
analysis_summary=analysis_result_text[:2000], | |
time_series_metrics=time_series_metrics, | |
aggregate_metrics=aggregate_metrics, | |
categorical_metrics=categorical_metrics, | |
time_periods_covered=time_periods, | |
data_sources_used=[f"post_df (shape: {post_df.shape}) -> df_processed (shape: {df_processed.shape})"] | |
) | |
if __name__ == '__main__': | |
try: | |
from utils.logging_config import setup_logging | |
setup_logging() | |
logger.info("Logging setup for EnhancedPostPerformanceAgent test.") | |
except ImportError: | |
logging.basicConfig(level=logging.INFO) | |
logger.warning("Could not import setup_logging. Using basicConfig.") | |
MOCK_API_KEY = os.environ.get("GOOGLE_API_KEY", "test_api_key_posts") | |
MODEL_NAME = DEFAULT_AGENT_MODEL | |
try: | |
from utils.pandasai_setup import configure_pandasai | |
if MOCK_API_KEY != "test_api_key_posts": | |
configure_pandasai(MOCK_API_KEY, MODEL_NAME) | |
logger.info("PandasAI configured for testing EnhancedPostPerformanceAgent.") | |
else: | |
logger.warning("Using mock API key for posts. PandasAI chat will likely fail or use a mock.") | |
class MockPandasAIDataFrame: | |
def __init__(self, df, description): self.df = df; self.description = description | |
def chat(self, query): return f"Mock PandasAI post response to: {query}" | |
pai.DataFrame = MockPandasAIDataFrame | |
except ImportError: | |
logger.error("utils.pandasai_setup not found. PandasAI will not be configured for posts.") | |
class MockPandasAIDataFrame: | |
def __init__(self, df, description): self.df = df; self.description = description | |
def chat(self, query): return f"Mock PandasAI post response to: {query}" | |
pai.DataFrame = MockPandasAIDataFrame | |
sample_post_data = { | |
'published_at': pd.to_datetime(['2023-01-15', '2023-01-20', '2023-02-10', '2023-02-25', '2023-03-05', None]), | |
'media_type': ['IMAGE', 'VIDEO', 'IMAGE', 'ARTICLE', 'IMAGE', 'IMAGE'], | |
'li_eb_label': ['Product Update', 'Company Culture', 'Product Update', 'Industry Insights', 'Company Culture', 'Product Update'], | |
'likeCount': [100, 150, 120, 80, 200, 50], | |
'commentCount': [10, 20, 15, 5, 25, 3], | |
'shareCount': [5, 10, 8, 2, 12, 1], | |
'engagement': [115, 180, 143, 87, 237, 54], # Sum of likes, comments, shares | |
'impressionCount': [1000, 1500, 1200, 900, 2000, 600], | |
'clickCount': [50, 70, 60, 30, 90, 20], | |
'sentiment': ['Positive π', 'Positive π', 'Neutral π', 'Positive π', 'Negative π', 'Positive π'], | |
'is_ad': [False, False, True, False, False, True] | |
} | |
sample_df_posts = pd.DataFrame(sample_post_data) | |
post_agent = EnhancedPostPerformanceAgent(api_key=MOCK_API_KEY, model_name=MODEL_NAME) | |
logger.info("Analyzing sample post data...") | |
post_metrics_result = post_agent.analyze_post_data(sample_df_posts) | |
print("\n--- EnhancedPostPerformanceAgent Results ---") | |
print(f"Agent Name: {post_metrics_result.agent_name}") | |
print(f"Analysis Summary: {post_metrics_result.analysis_summary}") | |
print("\nTime Series Metrics (Post):") | |
for ts_metric in post_metrics_result.time_series_metrics: | |
print(f" - {ts_metric.metric_name}: {len(ts_metric.values)} data points, e.g., {ts_metric.values[:3]} for ts {ts_metric.timestamps[:3]} (Unit: {ts_metric.unit})") | |
print("\nAggregate Metrics (Post):") | |
for key, value in post_metrics_result.aggregate_metrics.items(): | |
if isinstance(value, dict) and 'data' in value: # Performance table | |
print(f" - {key}: (Table - {value.get('grouping_column', '')}) - {len(value['data'])} categories") | |
for item in value['data'][:1]: # Print first item for brevity | |
print(f" Example Category '{item.get('category')}': { {k:v for k,v in item.items() if k!='category'} }") | |
else: | |
print(f" - {key}: {value}") | |
print("\nCategorical Metrics (Post):") | |
for key, value in post_metrics_result.categorical_metrics.items(): | |
print(f" - {key}:") | |
if isinstance(value, dict): | |
for sub_key, sub_value in list(value.items())[:2]: | |
print(f" - {sub_key}: {sub_value}") | |
else: | |
print(f" {value}") | |
print(f"\nTime Periods Covered (Post): {post_metrics_result.time_periods_covered}") | |
# Test with empty DataFrame | |
logger.info("\n--- Testing Post Agent with empty DataFrame ---") | |
empty_post_metrics = post_agent.analyze_post_data(pd.DataFrame()) | |
print(f"Empty Post DF Analysis Summary: {empty_post_metrics.analysis_summary}") | |