# Source: LinkedinMonitor / run_agentic_pipeline.py
# Hosting-page metadata: commit e4cf8e8 (verified), "Update run_agentic_pipeline.py", author GuglielmoTor, 9.92 kB.
# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, Any, Optional
import gradio as gr
# Configure the Google GenAI client through environment variables before the
# project modules below are imported: disable the Vertex AI backend and expose
# the Gemini key under the GOOGLE_API_KEY name as well.
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
if GOOGLE_API_KEY is not None:
    # os.environ only accepts str values; assigning None raises TypeError and
    # would make this module unimportable whenever GEMINI_API_KEY is unset.
    # The missing-key case is reported later, when the pipeline is started.
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
# Imports from your project structure
from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
from data_processing.analytics_data_processing import prepare_filtered_analytics_data
# Module-level logger, created first so the import fallback below can use it.
logger = logging.getLogger(__name__)

# The UI formatting helpers are optional. When they cannot be imported the
# module still loads: AGENTIC_MODULES_LOADED is set to False and the three
# helper names are bound to stand-ins that return fixed placeholder values.
#
# No stand-in is defined for run_full_analytics_orchestration_streaming: the
# real implementation is defined unconditionally further down in this module,
# so a stub bound here would always be overwritten before anyone could call it.
try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display,
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logger.error(
        "Could not import agentic pipeline modules: %s. Tabs 3 and 4 will be disabled.",
        e,
    )
    AGENTIC_MODULES_LOADED = False

    def format_report_to_markdown(report_string):
        """Fallback: ignore the report and return a fixed 'unavailable' message."""
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        """Fallback: report that there are no selectable key results."""
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        """Fallback: ignore the OKR and return a fixed 'unavailable' message."""
        return "Agentic modules not loaded. OKR display unavailable."
async def run_full_analytics_orchestration_streaming(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime],
    llm_model_name: str = "gemini-2.5-flash-preview-05-20",
):
    """
    Run the full analytics pipeline and stream its results.

    The data is first filtered with ``prepare_filtered_analytics_data``; the
    resulting posts, mentions and raw follower-stats frames are then handed to
    an ``EnhancedLinkedInAnalyticsOrchestrator`` whose streaming method is
    iterated and relayed item by item.

    Args:
        token_state: Application state passed through to the data preparation step.
        date_filter_selection: Date filter choice passed through to the data
            preparation step.
        custom_start_date: Optional custom range start, passed through as is.
        custom_end_date: Optional custom range end, passed through as is.
        llm_model_name: Model identifier given to the orchestrator. Defaults to
            the value that was previously hard-coded.

    Yields:
        Each item produced by
        ``orchestrator.generate_full_analysis_and_tasks_streaming``, unchanged.

    Note:
        Failures never propagate to the caller. A missing API key, an error
        during data preparation or an error during orchestration is logged and
        simply ends the stream (possibly before anything was yielded).
    """
    # Local import: the file-level import only brings in `datetime` itself.
    from datetime import timezone

    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return

    logger.info("Starting streaming analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,
            raw_follower_stats_df,
            _start_dt,
            _end_dt
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(
            "Data prepared: Posts(%s), Mentions(%s), FollowerStatsRaw(%s)",
            len(filtered_posts_df),
            len(filtered_mentions_df),
            len(raw_follower_stats_df),
        )
    except Exception as e:
        logger.error("Error during data preparation: %s", e, exc_info=True)
        return

    # Empty inputs are only warned about; the orchestrator is still run.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize and run the orchestrator with streaming
    try:
        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_name,
            # Same calendar date as the deprecated datetime.utcnow().date().
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )
        logger.info("Orchestrator initialized. Starting streaming analysis...")

        # The raw (not date-filtered) follower stats are what the orchestrator receives.
        async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        ):
            yield result
    except Exception as e:
        logger.critical("Critical error during analytics orchestration: %s", e, exc_info=True)
def _format_all_okrs_markdown(actionable_okrs):
    """Render every entry of ``actionable_okrs["okrs"]`` into one markdown string."""
    okr_items = actionable_okrs.get("okrs") if isinstance(actionable_okrs, dict) else None
    if not isinstance(okr_items, list):
        okr_items = []
    rendered = [
        format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
        for okr_idx, okr_item in enumerate(okr_items)
    ]
    if not rendered:
        return "Nessun OKR generato o trovato (Sempre)."
    return "\n\n---\n\n".join(rendered)


async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
    """
    Run the agentic pipeline with the 'Sempre' date filter and stream UI updates.

    Every yielded value is a 7-tuple, in this order:
        1. update for the report markdown,
        2. update for the key-result selector (choices / value / interactive),
        3. update for the OKR detail markdown,
        4. raw orchestration results to keep in state,
        5. selected key-result ids to keep in state,
        6. key results available for selection to keep in state,
        7. status text.

    Args:
        current_token_state_val: Token state; the pipeline only runs when it
            holds a truthy ``"token"`` entry. ``None`` is treated as "no token".
        orchestration_raw_results_st: Current raw-results state, passed through
            until new results arrive.
        selected_key_result_ids_st: Current selection state, passed through
            until the final result resets it.
        key_results_for_selection_st: Current selectable-KR state, passed
            through until the final result replaces it.

    Yields:
        A waiting state when no token is available; otherwise an "in progress"
        state followed by a preliminary state for each ``"report_ready"`` item
        and a final state for the ``"complete"`` item. If the stream raises, an
        error state is yielded. If the stream ends without a ``"complete"``
        item, a closing state is yielded so the UI does not stay "in progress".
    """
    # Evaluate the token safely: the state may be None, so it must not be
    # dereferenced before it has been checked.
    has_token = bool(current_token_state_val and current_token_state_val.get("token"))
    logger.info(
        "Agentic pipeline check triggered for token_state update. Current token: %s",
        "Set" if has_token else "Not Set",
    )

    if not has_token:
        logger.info("Agentic pipeline: Token not available in token_state. Skipping.")
        # Waiting state: all incoming state values are passed through untouched.
        yield (
            gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
            orchestration_raw_results_st,
            selected_key_result_ids_st,
            key_results_for_selection_st,
            "Pipeline AI: In attesa dei dati..."
        )
        return

    logger.info("Agentic pipeline starting autonomously with 'Sempre' filter.")

    # Update status to indicate processing
    yield (
        gr.update(value="Analisi AI (Sempre) in corso..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Esecuzione pipeline AI (Sempre)..."
    )

    if not AGENTIC_MODULES_LOADED:
        logger.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
        yield (
            gr.update(value="Moduli AI non caricati. Report non disponibile."),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="Moduli AI non caricati. OKR non disponibili."),
            None, [], [], "Pipeline AI: Moduli non caricati."
        )
        return

    # Track progress so a stream that stops early can still be closed cleanly.
    latest_results = orchestration_raw_results_st
    latest_report_update = None
    completed = False

    try:
        # 'Sempre' filter, no custom date range.
        async for orchestration_output in run_full_analytics_orchestration_streaming(
            current_token_state_val,
            "Sempre",
            None,
            None
        ):
            if not orchestration_output:
                continue

            status = orchestration_output.get("status", "unknown")

            if status == "report_ready":
                # Report is ready, but OKRs are not yet
                logger.info("Report ready, displaying preliminary results...")
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                latest_report_update = gr.update(value=format_report_to_markdown(report_str))
                latest_results = orchestration_output  # Store partial results

                yield (
                    latest_report_update,
                    gr.update(choices=[], value=[], interactive=False),  # Keep OKRs disabled
                    gr.update(value="OKR ancora in elaborazione..."),
                    latest_results,
                    selected_key_result_ids_st,
                    key_results_for_selection_st,
                    "Report AI completato. Generazione OKR in corso..."
                )

            elif status == "complete":
                # Everything is ready
                logger.info("Complete results ready, displaying final output...")
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                report_update = gr.update(value=format_report_to_markdown(report_str))

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
                # Selector entries are (label, value) pairs.
                kr_choices = [
                    (kr['kr_description'], kr['unique_kr_id'])
                    for kr in krs_for_ui_selection_list
                ]

                # Display all OKRs by default
                okr_detail_update = gr.update(value=_format_all_okrs_markdown(actionable_okrs))

                completed = True
                yield (
                    report_update,
                    gr.update(choices=kr_choices, value=[], interactive=True),
                    okr_detail_update,
                    orchestration_output,
                    [],  # a fresh result set starts with nothing selected
                    krs_for_ui_selection_list,
                    "Pipeline AI (Sempre) completata completamente."
                )
                break  # Final result yielded, exit the loop

    except Exception as e:
        logger.error("Error during autonomous agentic pipeline execution: %s", e, exc_info=True)
        yield (
            gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
            None, [], [], f"Errore pipeline AI (Sempre): {str(e)}"
        )
        return

    if not completed:
        # The orchestration stream logs its own failures and simply stops, so
        # an early end is only visible here. Without this closing state the UI
        # would keep showing the last "in progress" message.
        logger.warning("Agentic pipeline stream ended without a 'complete' result.")
        if latest_report_update is None:
            latest_report_update = gr.update(
                value="Analisi AI (Sempre) non completata. Nessun report disponibile."
            )
        yield (
            latest_report_update,
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="OKR non disponibili: la pipeline AI (Sempre) non ha prodotto risultati completi."),
            latest_results,
            selected_key_result_ids_st,
            key_results_for_selection_st,
            "Pipeline AI (Sempre) terminata senza risultati completi."
        )