# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, Any, Optional

# Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder.
# If 'insight_and_tasks' is not on the Python path, you may need to adjust sys.path.
# For example, if insight_and_tasks is a sibling of the directory containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir)  # Or navigate to the correct root
# sys.path.insert(0, project_root)

# Imports from your project structure
from insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging might be called in app.py; if not, call it here or ensure it is called once.
# from insight_and_tasks.utils.logging_config import setup_logging
from analytics_data_processing import prepare_filtered_analytics_data
# Placeholder for UI generator import - to be created later
# from .insights_ui_generator import format_orchestration_results_for_ui

logger = logging.getLogger(__name__)

# The GEMINI_API_KEY environment variable should be set wherever app.py runs.
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")


async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Runs the full analytics pipeline using data from token_state and date filters,
    and returns the raw orchestration results.

    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String for date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        A dictionary containing the results from the analytics orchestrator,
        or None if a critical error occurs.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GEMINI_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # Used if FollowerAgent specifically needs a pre-filtered time series
            raw_follower_stats_df,             # FollowerAgent typically processes the raw history for some metrics
            _start_dt,                         # Filtered start date, for logging or context if needed
            _end_dt                            # Filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(
            f"Data prepared: Posts({len(filtered_posts_df)}), "
            f"Mentions({len(filtered_mentions_df)}), "
            f"FollowerStatsRaw({len(raw_follower_stats_df)})"
        )
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None

    # Check whether the essential DataFrames are empty after filtering, which could
    # make the analysis trivial or erroneous.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
        # Depending on requirements, you might return a specific message or an empty results structure.
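    # One possible guard (hypothetical sketch; the code above deliberately proceeds
    # regardless): short-circuit with a stub payload instead of running the LLM
    # pipeline on empty inputs. The keys below are illustrative only; align them
    # with whatever shape generate_full_analysis_and_tasks actually returns.
    #
    # if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
    #     return {"status": "no_data", "message": "No analytics data in the selected date range."}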
    # 2. Initialize and run the orchestrator
    try:
        # You can pass a specific model name or let the orchestrator use its default.
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
        # Example: read an override from token_state instead:
        # llm_model_for_run = token_state.get("config_llm_model_override")

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,  # Pass None to use the orchestrator's default
            current_date_for_tasks=datetime.utcnow().date()
        )

        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
        # The orchestrator expects the primary follower-stats DataFrame to be one it
        # can process for time series ('follower_gains_monthly') and demographics.
        # `raw_follower_stats_df` is usually the better choice here, since
        # FollowerAgent does its own processing.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,  # Pass the full follower history
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None
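

# Illustrative entry point (a sketch, not part of the original pipeline):
# `_demo_token_state` is a hypothetical placeholder; the real keys are whatever
# prepare_filtered_analytics_data expects from the Gradio app's token_state.
# With an empty dict (or a missing GEMINI_API_KEY) the coroutine logs an error
# and returns None rather than raising, so this is safe as a smoke test.
if __name__ == "__main__":
    _demo_token_state: Dict[str, Any] = {}
    results = asyncio.run(
        run_full_analytics_orchestration(
            token_state=_demo_token_state,
            date_filter_selection="All Time",  # hypothetical filter label
            custom_start_date=None,
            custom_end_date=None,
        )
    )
    print(json.dumps(results, indent=2, default=str) if results else "No orchestration results.")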