# run_agentic_pipeline.py
"""Autonomous agentic analytics pipeline runner.

Wires the LinkedIn analytics orchestrator to the Gradio UI: prepares filtered
analytics data, streams orchestration results (report first, then OKRs), and
yields incremental Gradio component updates so the UI refreshes as soon as
each stage completes.
"""
import asyncio
import os
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional

import pandas as pd
import gradio as gr

os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
# BUG FIX: the original unconditionally did os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY,
# which raises TypeError at import time when GEMINI_API_KEY is unset (value is None).
if GOOGLE_API_KEY:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
else:
    logging.warning("GEMINI_API_KEY is not set; the agentic analytics pipeline cannot run.")

# Imports from your project structure
from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
from data_processing.analytics_data_processing import prepare_filtered_analytics_data

try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False

    # Stub formatters so the UI can still render a "modules unavailable" message.
    # NOTE: the original also defined a dummy run_full_analytics_orchestration_streaming
    # here, but it was dead code — the real definition below is outside this try/except
    # and unconditionally shadows it.
    def format_report_to_markdown(report_string):
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        return "Agentic modules not loaded. OKR display unavailable."

logger = logging.getLogger(__name__)


async def run_full_analytics_orchestration_streaming(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
):
    """
    Runs the full analytics pipeline with streaming results.

    Async generator: prepares/filters the analytics DataFrames, then relays every
    partial result emitted by the orchestrator's streaming analysis.

    Args:
        token_state: Session state dict holding auth token and raw data.
        date_filter_selection: Named date-range filter (e.g. "Sempre").
        custom_start_date: Optional lower bound when using a custom range.
        custom_end_date: Optional upper bound when using a custom range.

    Yields:
        Dict results from the orchestrator as each stage completes.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return

    logger.info("Starting streaming analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,
            raw_follower_stats_df,
            _start_dt,
            _end_dt,
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return

    # Check if essential dataframes are empty
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize and run the orchestrator with streaming
    try:
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            # datetime.utcnow() is deprecated; timezone-aware now() yields the same date.
            current_date_for_tasks=datetime.now(timezone.utc).date(),
        )
        logger.info("Orchestrator initialized. Starting streaming analysis...")

        # Relay each partial result (report, then OKRs) as it becomes available.
        async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        ):
            yield result
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return


async def run_agentic_pipeline_autonomously(
    current_token_state_val,
    orchestration_raw_results_st,
    selected_key_result_ids_st,
    key_results_for_selection_st
):
    """
    Autonomously drives the agentic pipeline with the 'Sempre' (all-time) filter.

    Async generator yielding 7-tuples of Gradio updates / state values:
        (report_md, kr_checkbox_group, okr_detail_md,
         orchestration_results_state, selected_kr_ids_state,
         krs_for_selection_state, status_text)

    Skips execution when no auth token is available or the agentic modules
    failed to import; otherwise streams preliminary (report-only) and final
    (report + OKRs) UI updates.
    """
    logger.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}")

    # Initial state before pipeline runs or if skipped
    initial_yield = (
        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Pipeline AI: In attesa dei dati..."
    )

    # Guard: nothing to do without an auth token.
    if not current_token_state_val or not current_token_state_val.get("token"):
        logger.info("Agentic pipeline: Token not available in token_state. Skipping.")
        yield initial_yield
        return

    logger.info("Agentic pipeline starting autonomously with 'Sempre' filter.")

    # Update status to indicate processing
    yield (
        gr.update(value="Analisi AI (Sempre) in corso..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Esecuzione pipeline AI (Sempre)..."
    )

    if not AGENTIC_MODULES_LOADED:
        logger.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
        yield (
            gr.update(value="Moduli AI non caricati. Report non disponibile."),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="Moduli AI non caricati. OKR non disponibili."),
            None, [], [],
            "Pipeline AI: Moduli non caricati."
        )
        return

    try:
        # Parameters for 'Sempre' filter for the agentic pipeline
        date_filter_val_agentic = "Sempre"
        custom_start_val_agentic = None
        custom_end_val_agentic = None

        # Use the streaming orchestration
        async for orchestration_output in run_full_analytics_orchestration_streaming(
            current_token_state_val,
            date_filter_val_agentic,
            custom_start_val_agentic,
            custom_end_val_agentic
        ):
            if not orchestration_output:
                continue

            status = orchestration_output.get("status", "unknown")

            if status == "report_ready":
                # Report is ready, but OKRs are not yet
                logger.info("Report ready, displaying preliminary results...")

                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

                yield (
                    agentic_report_md_update,
                    gr.update(choices=[], value=[], interactive=False),  # Keep OKRs disabled
                    gr.update(value="OKR ancora in elaborazione..."),
                    orchestration_output,  # Store partial results
                    selected_key_result_ids_st,
                    key_results_for_selection_st,
                    "Report AI completato. Generazione OKR in corso..."
                )

            elif status == "complete":
                # Everything is ready
                logger.info("Complete results ready, displaying final output...")

                orchestration_results_update = orchestration_output
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
                krs_for_selection_update = krs_for_ui_selection_list

                # CheckboxGroup choices are (label, value) pairs.
                kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
                key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)

                # Display all OKRs by default
                all_okrs_md_parts = []
                if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                    for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                        all_okrs_md_parts.append(
                            format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
                        )

                if not all_okrs_md_parts:
                    okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
                else:
                    okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))

                selected_krs_update = []
                agentic_status_text = "Pipeline AI (Sempre) completata completamente."

                yield (
                    agentic_report_md_update,
                    key_results_cbg_update,
                    okr_detail_display_md_update,
                    orchestration_results_update,
                    selected_krs_update,
                    krs_for_selection_update,
                    agentic_status_text
                )
                break  # Final result yielded, exit the loop

    except Exception as e:
        logger.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
        yield (
            gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
            None, [], [],
            agentic_status_text
        )