# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, Any, Optional
# Assuming this script sits at the same level as 'app.py', with 'insight_and_tasks/' as a subfolder.
# If 'insight_and_tasks' is not on the Python path, you may need to adjust sys.path.
# For example, if insight_and_tasks is a sibling of the directory containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir)  # Or navigate to the correct root
# sys.path.insert(0, project_root)
# Imports from your project structure
from insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging might be called in app.py; if not, call it here or ensure it runs exactly once.
# from insight_and_tasks.utils.logging_config import setup_logging
try:
    from .analytics_data_processing import prepare_filtered_analytics_data
except ImportError:
    # Fallback so the __main__ test below can also run this file directly,
    # outside the parent package (relative imports fail in that case).
    from analytics_data_processing import prepare_filtered_analytics_data
# Placeholder for UI generator import - to be created later
# from .insights_ui_generator import format_orchestration_results_for_ui
logger = logging.getLogger(__name__)
# The GEMINI_API_KEY environment variable should be set where app.py runs.
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
"""
Runs the full analytics pipeline using data from token_state and date filters,
and returns the raw orchestration results.
Args:
token_state: Gradio token_state containing raw data and config.
date_filter_selection: String for date filter type.
custom_start_date: Optional custom start date.
custom_end_date: Optional custom end date.
Returns:
A dictionary containing the results from the analytics orchestrator,
or None if a critical error occurs.
"""
    if not GOOGLE_API_KEY:
        logger.critical("GEMINI_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")
    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # Available if FollowerAgent needs a pre-filtered time series
            raw_follower_stats_df,  # FollowerAgent typically processes the raw history for some metrics
            _start_dt,  # Filtered start date, for logging or context if needed
            _end_dt  # Filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(
            f"Data prepared: Posts({len(filtered_posts_df)}), "
            f"Mentions({len(filtered_mentions_df)}), "
            f"FollowerStatsRaw({len(raw_follower_stats_df)})"
        )
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None
    # If all essential DataFrames are empty after filtering, the analysis would be
    # trivial or erroneous; warn and continue.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
        # Depending on requirements, you might instead return a specific message or an
        # empty results structure, as sketched below.
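        # A hedged, illustrative early return (commented out by default; the keys
        # mirror the ones read from the results later in this module, but the exact
        # shape of the orchestrator's output is an assumption):
        # return {
        #     "comprehensive_analysis_report": "No data available for the selected period.",
        #     "actionable_okrs_and_tasks": None,
        # }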
    # 2. Initialize and run the orchestrator
    try:
        # A specific model name can be pinned here; pass None to use the orchestrator's
        # default, or read an override from token_state
        # (e.g. token_state.get("config_llm_model_override")).
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            current_date_for_tasks=datetime.utcnow().date()
        )
        logger.info("Orchestrator initialized. Generating full analysis and tasks...")

        # The orchestrator expects a follower stats DF it can process for both the
        # 'follower_gains_monthly' time series and the demographic breakdowns.
        # `raw_follower_stats_df` is usually the better choice here, since
        # FollowerAgent does its own processing.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,  # Pass the full follower history
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None
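
# A minimal, hypothetical sketch of how app.py might wire this function into a
# Gradio handler. The component names, the handler signature, and the use of
# asyncio.run() in a synchronous callback are assumptions, not the actual app.py code:
#
#   import asyncio
#   import gradio as gr
#
#   def on_generate_insights(token_state, date_filter, start_date, end_date):
#       results = asyncio.run(run_full_analytics_orchestration(
#           token_state, date_filter, start_date, end_date
#       ))
#       if not results:
#           return "Analytics pipeline failed or returned no results."
#       return results.get("comprehensive_analysis_report", "N/A")
#
#   generate_btn.click(
#       fn=on_generate_insights,
#       inputs=[token_state, date_filter_dd, start_date_picker, end_date_picker],
#       outputs=[report_markdown],
#   )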

# Standalone test entry point. In the real app, app.py imports and calls
# run_full_analytics_orchestration (as sketched above) instead of running this block.
if __name__ == "__main__":
    # Ensure logging is set up for the standalone test.
    from insight_and_tasks.utils.logging_config import setup_logging
    setup_logging()

    if not GOOGLE_API_KEY:
        print("Please set the GEMINI_API_KEY environment variable to run this test.")
    else:
        # Create mock token_state and filter parameters.
        mock_token_state = {
            "bubble_posts_df": pd.DataFrame({
                'id': [1, 2, 3],
                'published_at': pd.to_datetime(['2023-01-01', '2023-07-15', '2023-12-31']),
                'li_eb_label': ['A', 'B', 'A'],
                'media_type': ['X', 'Y', 'X'],
                'is_ad': [False, True, False],
                'sentiment': ['pos', 'neg', 'neu'],
                'engagement': [10, 5, 8],
                'impressionCount': [100, 50, 80],
                'clickCount': [1, 2, 3],
                'likeCount': [8, 3, 6],
                'commentCount': [1, 1, 1],
                'shareCount': [1, 1, 1]
            }),
            "bubble_post_stats_df": pd.DataFrame({'post_id': [1, 2, 3]}),  # Simplified
"bubble_mentions_df": pd.DataFrame({
'date': pd.to_datetime(['2023-06-01', '2023-08-20']), 'sentiment_label': ['Positive π', 'Negative π']
}),
"bubble_follower_stats_df": pd.DataFrame({
'category_name': ['2023-01-01', 'Industry A', '2023-02-01'],
'follower_count_organic': [100, 500, 120],
'follower_count_paid': [10, 50, 20],
'follower_count_type': ['follower_gains_monthly', 'follower_industry', 'follower_gains_monthly']
}),
"config_date_col_posts": "published_at",
"config_date_col_mentions": "date",
}
        # "Ultimi 365 Giorni" ("Last 365 Days") would be another option, but
        # prepare_filtered_analytics_data would need to support it; use a known
        # filter value for this test:
        mock_date_filter = "Sempre"  # i.e. "Always", no date filtering
        mock_custom_start = None
        mock_custom_end = None
logger.info("Running standalone test of run_full_analytics_orchestration...")
async def test_run():
results = await run_full_analytics_orchestration(
mock_token_state, mock_date_filter, mock_custom_start, mock_custom_end
)
if results:
print("\n\n" + "="*30 + " MOCK ORCHESTRATION RESULTS " + "="*30)
print("\n--- Comprehensive Analysis Report ---")
print(results.get("comprehensive_analysis_report", "N/A"))
print("\n--- Actionable OKRs and Tasks ---")
okrs_data = results.get("actionable_okrs_and_tasks")
if okrs_data:
print(json.dumps(okrs_data, indent=2))
else:
print("N/A")
# Optionally print detailed metrics if needed for debugging
# print("\n--- Detailed Agent Metrics ---")
# print(json.dumps(results.get("detailed_metrics"), indent=2, default=str))
else:
print("Standalone test: Orchestration returned no results or failed.")
asyncio.run(test_run())
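
# To run the standalone test above (assuming the sibling modules are importable
# from this directory; the exact invocation depends on your project layout):
#   GEMINI_API_KEY=<your-key> python run_agentic_pipeline.py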