# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime, timezone
import pandas as pd
from typing import Dict, Any, Optional

# Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder.
# If 'insight_and_tasks' is not on the Python path, you might need to adjust sys.path.
# For example, if insight_and_tasks is a sibling of the dir containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir)  # Or navigate to the correct root
# sys.path.insert(0, project_root)
# Imports from your project structure
from insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging may already be called in app.py; if not, call it here and ensure it runs only once.
# from insight_and_tasks.utils.logging_config import setup_logging
from .analytics_data_processing import prepare_filtered_analytics_data

# Placeholder for the UI generator import, to be created later:
# from .insights_ui_generator import format_orchestration_results_for_ui
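# A minimal sketch of what that (hypothetical) UI formatter might look like, assuming
# it only consumes the two result keys this module already uses below:
#
# def format_orchestration_results_for_ui(results: Dict[str, Any]) -> str:
#     report = results.get("comprehensive_analysis_report", "N/A")
#     okrs = results.get("actionable_okrs_and_tasks")
#     okrs_block = json.dumps(okrs, indent=2) if okrs else "N/A"
#     return f"## Analysis Report\n{report}\n\n## OKRs and Tasks\n{okrs_block}"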
logger = logging.getLogger(__name__)
# GEMINI_API_KEY must be set in the environment where app.py runs.
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
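# For local testing you can export the key before launching, e.g.:
#   export GEMINI_API_KEY="your-key-here"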
async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Runs the full analytics pipeline using data from token_state and date filters,
    and returns the raw orchestration results.

    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String identifying the date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        A dictionary containing the results from the analytics orchestrator,
        or None if a critical error occurs.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GEMINI_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")
    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # Useful if FollowerAgent needs a pre-filtered time series
            raw_follower_stats_df,  # FollowerAgent typically processes the raw history for some metrics
            _start_dt,  # Filtered start date, for logging or context if needed
            _end_dt  # Filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None
    # Check if essential DataFrames are empty after filtering, which might make
    # the analysis trivial or erroneous.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
        # Depending on requirements, you might return a specific message or an empty results structure.
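        # A minimal sketch of such an early-return payload (the key names mirror the
        # results consumed by the standalone test below; the exact structure is an assumption):
        # return {
        #     "comprehensive_analysis_report": "No data available for the selected period.",
        #     "actionable_okrs_and_tasks": None,
        # }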
    # 2. Initialize and run the orchestrator
    try:
        # You can pass a specific model name or let the orchestrator use its default.
        # This could also come from config, e.g. token_state.get("config_llm_model_override").
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,  # Pass None to use the orchestrator's default
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )

        logger.info("Orchestrator initialized. Generating full analysis and tasks...")

        # The orchestrator expects the primary follower stats DF to be one it can process for
        # time series ('follower_gains_monthly') and demographics. `raw_follower_stats_df` is
        # usually the better choice here, as FollowerAgent does its own processing.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,  # Pass the full follower history
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )

        logger.info("Orchestration process completed.")
        return orchestration_results
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None
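
# A minimal sketch of how app.py might wire this coroutine into a Gradio event
# handler (the component names here are hypothetical; Gradio supports async callbacks):
#
# async def on_generate_insights(token_state, date_filter, start_date, end_date):
#     results = await run_full_analytics_orchestration(
#         token_state, date_filter, start_date, end_date
#     )
#     if results is None:
#         return "Analytics pipeline failed; check the logs."
#     return results.get("comprehensive_analysis_report", "N/A")
#
# generate_btn.click(
#     fn=on_generate_insights,
#     inputs=[token_state_component, date_filter_dd, start_date_picker, end_date_picker],
#     outputs=[report_markdown],
# )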
# Example of how app.py might call this (for testing this module standalone)
if __name__ == "__main__":
    # This block is for testing `run_agentic_pipeline.py` directly.
    # In a real scenario, `app.py` would import and call `run_full_analytics_orchestration`.

    # Ensure logging is set up for the standalone test
    from insight_and_tasks.utils.logging_config import setup_logging
    setup_logging()

    if not GOOGLE_API_KEY:
        print("Please set the GEMINI_API_KEY environment variable to run this test.")
    else:
        # Create mock token_state and filter parameters
        mock_token_state = {
            "bubble_posts_df": pd.DataFrame({
                'id': [1, 2, 3], 'published_at': pd.to_datetime(['2023-01-01', '2023-07-15', '2023-12-31']),
                'li_eb_label': ['A', 'B', 'A'], 'media_type': ['X', 'Y', 'X'], 'is_ad': [False, True, False],
                'sentiment': ['pos', 'neg', 'neu'], 'engagement': [10, 5, 8], 'impressionCount': [100, 50, 80],
                'clickCount': [1, 2, 3], 'likeCount': [8, 3, 6], 'commentCount': [1, 1, 1], 'shareCount': [1, 1, 1]
            }),
            "bubble_post_stats_df": pd.DataFrame({'post_id': [1, 2, 3]}),  # Simplified
"bubble_mentions_df": pd.DataFrame({ | |
'date': pd.to_datetime(['2023-06-01', '2023-08-20']), 'sentiment_label': ['Positive π', 'Negative π'] | |
}), | |
"bubble_follower_stats_df": pd.DataFrame({ | |
'category_name': ['2023-01-01', 'Industry A', '2023-02-01'], | |
'follower_count_organic': [100, 500, 120], | |
'follower_count_paid': [10, 50, 20], | |
'follower_count_type': ['follower_gains_monthly', 'follower_industry', 'follower_gains_monthly'] | |
}), | |
"config_date_col_posts": "published_at", | |
"config_date_col_mentions": "date", | |
} | |
mock_date_filter = "Ultimi 365 Giorni" # Example, adjust prepare_filtered_analytics_data if new options | |
# For "Ultimi 365 Giorni", prepare_filtered_analytics_data needs to handle it or be updated. | |
# Let's use a known filter for testing: | |
mock_date_filter = "Sempre" | |
mock_custom_start = None | |
mock_custom_end = None | |
logger.info("Running standalone test of run_full_analytics_orchestration...") | |
        async def test_run():
            results = await run_full_analytics_orchestration(
                mock_token_state, mock_date_filter, mock_custom_start, mock_custom_end
            )

            if results:
                print("\n\n" + "=" * 30 + " MOCK ORCHESTRATION RESULTS " + "=" * 30)
                print("\n--- Comprehensive Analysis Report ---")
                print(results.get("comprehensive_analysis_report", "N/A"))

                print("\n--- Actionable OKRs and Tasks ---")
                okrs_data = results.get("actionable_okrs_and_tasks")
                if okrs_data:
                    print(json.dumps(okrs_data, indent=2))
                else:
                    print("N/A")

                # Optionally print detailed agent metrics for debugging:
                # print("\n--- Detailed Agent Metrics ---")
                # print(json.dumps(results.get("detailed_metrics"), indent=2, default=str))
            else:
                print("Standalone test: Orchestration returned no results or failed.")

        asyncio.run(test_run())