# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime
from typing import Dict, Any, Optional

import pandas as pd
import gradio as gr
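
# Route the Google GenAI client to the public Gemini API (not Vertex AI) and
# mirror the key into GOOGLE_API_KEY, which Google client libraries conventionally
# read. Which variable the orchestrator's client actually checks is an assumption;
# the key is also passed to it explicitly further below.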
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False" | |
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY") | |
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY | |
# Imports from your project structure
from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
from data_processing.analytics_data_processing import prepare_filtered_analytics_data

try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False

    # Fallback stubs so the module still imports and the UI can show a clear message.
    def format_report_to_markdown(report_string):
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        return "Agentic modules not loaded. OKR display unavailable."

logger = logging.getLogger(__name__)

async def run_full_analytics_orchestration_streaming(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
):
    """
    Runs the full analytics pipeline with streaming results.
    Yields results as they become available.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return
logger.info("Starting streaming analytics orchestration process...") | |
# 1. Prepare and filter data | |
try: | |
( | |
filtered_posts_df, | |
filtered_mentions_df, | |
_date_filtered_follower_stats_df, | |
raw_follower_stats_df, | |
_start_dt, | |
_end_dt | |
) = prepare_filtered_analytics_data( | |
token_state, date_filter_selection, custom_start_date, custom_end_date | |
) | |
logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})") | |
except Exception as e: | |
logger.error(f"Error during data preparation: {e}", exc_info=True) | |
return | |
    # Check if essential dataframes are empty
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize and run the orchestrator with streaming
    try:
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            current_date_for_tasks=datetime.utcnow().date()
        )
        logger.info("Orchestrator initialized. Starting streaming analysis...")

        # Use the streaming method and forward each partial result
        async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        ):
            yield result
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return
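
# The orchestrator streams dicts carrying at least a "status" key; the consumer
# below reacts to "report_ready" (comprehensive_analysis_report available) and
# "complete" (actionable_okrs_and_tasks available as well) and ignores other statuses.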
async def run_agentic_pipeline_autonomously(
    current_token_state_val,
    orchestration_raw_results_st,
    selected_key_result_ids_st,
    key_results_for_selection_st
):
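    """
    Drives the agentic pipeline whenever token_state is updated.

    Yields 7-tuples in the order expected by the Gradio outputs this generator
    is bound to (the wiring lives outside this module): report markdown update,
    key-result checkbox-group update, OKR detail markdown update, raw
    orchestration results state, selected key-result IDs state,
    key-results-for-selection state, and a status string.
    """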
logging.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}") | |
# Initial state before pipeline runs or if skipped | |
initial_yield = ( | |
gr.update(value="Pipeline AI: In attesa dei dati necessari..."), | |
gr.update(choices=[], value=[], interactive=False), | |
gr.update(value="Pipeline AI: In attesa dei dati necessari..."), | |
orchestration_raw_results_st, | |
selected_key_result_ids_st, | |
key_results_for_selection_st, | |
"Pipeline AI: In attesa dei dati..." | |
) | |
if not current_token_state_val or not current_token_state_val.get("token"): | |
logging.info("Agentic pipeline: Token not available in token_state. Skipping.") | |
yield initial_yield | |
return | |
logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.") | |
# Update status to indicate processing | |
yield ( | |
gr.update(value="Analisi AI (Sempre) in corso..."), | |
gr.update(choices=[], value=[], interactive=False), | |
gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."), | |
orchestration_raw_results_st, | |
selected_key_result_ids_st, | |
key_results_for_selection_st, | |
"Esecuzione pipeline AI (Sempre)..." | |
) | |
if not AGENTIC_MODULES_LOADED: | |
logging.warning("Agentic modules not loaded. Skipping autonomous pipeline.") | |
yield ( | |
gr.update(value="Moduli AI non caricati. Report non disponibile."), | |
gr.update(choices=[], value=[], interactive=False), | |
gr.update(value="Moduli AI non caricati. OKR non disponibili."), | |
None, [], [], "Pipeline AI: Moduli non caricati." | |
) | |
return | |
    try:
        # Parameters for the 'Sempre' ("always", i.e. no date restriction) filter used by the agentic pipeline
        date_filter_val_agentic = "Sempre"
        custom_start_val_agentic = None
        custom_end_val_agentic = None

        # Use the streaming orchestration
        async for orchestration_output in run_full_analytics_orchestration_streaming(
            current_token_state_val,
            date_filter_val_agentic,
            custom_start_val_agentic,
            custom_end_val_agentic
        ):
            if not orchestration_output:
                continue
            status = orchestration_output.get("status", "unknown")

            if status == "report_ready":
                # Report is ready, but OKRs are not yet
                logging.info("Report ready, displaying preliminary results...")
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
                yield (
                    agentic_report_md_update,
                    gr.update(choices=[], value=[], interactive=False),  # Keep OKRs disabled
                    gr.update(value="OKR ancora in elaborazione..."),
                    orchestration_output,  # Store partial results
                    selected_key_result_ids_st,
                    key_results_for_selection_st,
                    "Report AI completato. Generazione OKR in corso..."
                )

            elif status == "complete":
                # Everything is ready
                logging.info("Complete results ready, displaying final output...")
                orchestration_results_update = orchestration_output
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
                krs_for_selection_update = krs_for_ui_selection_list
                kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
                key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)

                # Display all OKRs by default
                all_okrs_md_parts = []
                if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                    for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                        all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))

                if not all_okrs_md_parts:
                    okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
                else:
                    okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))

                selected_krs_update = []
                agentic_status_text = "Pipeline AI (Sempre) completata completamente."

                yield (
                    agentic_report_md_update,
                    key_results_cbg_update,
                    okr_detail_display_md_update,
                    orchestration_results_update,
                    selected_krs_update,
                    krs_for_selection_update,
                    agentic_status_text
                )
                break  # Final result yielded, exit the loop
    except Exception as e:
        logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
        yield (
            gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
            None, [], [], agentic_status_text
        )
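
# Minimal local smoke-test sketch (not part of the Gradio app). The token_state
# contents below are placeholder assumptions; the real structure is whatever
# prepare_filtered_analytics_data expects, so adapt it before running.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def _debug_stream():
        dummy_token_state = {"token": "dummy-token"}  # placeholder assumption, see note above
        async for chunk in run_full_analytics_orchestration_streaming(
            token_state=dummy_token_state,
            date_filter_selection="Sempre",
            custom_start_date=None,
            custom_end_date=None,
        ):
            # Print only the status of each streamed payload
            print("streamed status:", (chunk or {}).get("status", "unknown"))

    asyncio.run(_debug_stream())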