Spaces:
Running
Running
File size: 9,917 Bytes
8ed7829 c0a4e63 8ed7829 6d6572c 8ed7829 2a7af3e e82dbcd 8ed7829 055c98e e4cf8e8 055c98e 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 8ed7829 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 e4cf8e8 6086ed3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 |
# run_agentic_pipeline.py
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import gradio as gr
import pandas as pd
# Force the google-genai SDK to use the public Gemini API, not Vertex AI.
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
# Only export the key when it is actually set: assigning None into
# os.environ raises TypeError and would crash the module at import time.
# The pipeline re-checks GOOGLE_API_KEY before running, so a missing key
# degrades gracefully instead of aborting startup.
if GOOGLE_API_KEY:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
else:
    logging.warning("GEMINI_API_KEY is not set; analytics pipeline will be unable to run.")
# Imports from your project structure
from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
from data_processing.analytics_data_processing import prepare_filtered_analytics_data
# Optional UI helpers: if the insights UI module cannot be imported, install
# no-op stubs so the rest of the app can still start (tabs 3 and 4 degrade).
try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False
    # Fallback stubs mirroring the real helpers' signatures.
    # NOTE(review): this stub generator is unconditionally shadowed by the
    # module-level definition of run_full_analytics_orchestration_streaming
    # further down in this file, so it is effectively dead code.
    async def run_full_analytics_orchestration_streaming(*args, **kwargs):
        yield None
    def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable."
    def extract_key_results_for_selection(okrs_dict): return []
    def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable."

# Module-level logger for this pipeline module.
logger = logging.getLogger(__name__)
async def run_full_analytics_orchestration_streaming(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
):
    """
    Run the full analytics pipeline as an async generator, yielding
    intermediate results as the orchestrator produces them.

    Args:
        token_state: Session state dict holding the auth token and raw data.
        date_filter_selection: Named date-filter preset (e.g. "Sempre").
        custom_start_date: Start of a custom date range, or None.
        custom_end_date: End of a custom date range, or None.

    Yields:
        Result dicts emitted by the orchestrator's streaming analysis
        (e.g. with status "report_ready" or "complete").
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return

    logger.info("Starting streaming analytics orchestration process...")

    # 1. Prepare and filter the input data.
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,
            raw_follower_stats_df,
            _start_dt,
            _end_dt
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        # Lazy %-style args so formatting is skipped when INFO is disabled.
        logger.info(
            "Data prepared: Posts(%d), Mentions(%d), FollowerStatsRaw(%d)",
            len(filtered_posts_df), len(filtered_mentions_df), len(raw_follower_stats_df)
        )
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return

    # Empty inputs are allowed; warn but let the orchestrator decide.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize the orchestrator and pass its streamed results through.
    try:
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            # datetime.utcnow() is deprecated (Python 3.12+); use an
            # aware UTC datetime and take its date.
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )
        logger.info("Orchestrator initialized. Starting streaming analysis...")
        async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        ):
            yield result
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return
async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
    """
    Autonomously run the agentic analytics pipeline with the 'Sempre'
    (all-time) date filter, streaming UI updates as analysis progresses.

    Args:
        current_token_state_val: Session token/state dict; may be None or empty.
        orchestration_raw_results_st: Current raw-results state (passed through until updated).
        selected_key_result_ids_st: Currently selected key-result IDs state.
        key_results_for_selection_st: Current key-results-for-selection state.

    Yields:
        7-tuples of Gradio updates / state values:
        (report markdown, KR checkbox group, OKR detail markdown,
         raw results state, selected KR IDs state,
         KRs-for-selection state, status text).
    """
    # Guard before dereferencing: current_token_state_val may be None, so
    # calling .get() on it unconditionally would raise AttributeError.
    has_token = bool(current_token_state_val) and bool(current_token_state_val.get("token"))
    logger.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if has_token else 'Not Set'}")

    # Initial state before the pipeline runs (or when it is skipped).
    initial_yield = (
        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Pipeline AI: In attesa dei dati..."
    )

    if not has_token:
        logger.info("Agentic pipeline: Token not available in token_state. Skipping.")
        yield initial_yield
        return

    logger.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
    # Signal that processing has started.
    yield (
        gr.update(value="Analisi AI (Sempre) in corso..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Esecuzione pipeline AI (Sempre)..."
    )

    if not AGENTIC_MODULES_LOADED:
        logger.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
        yield (
            gr.update(value="Moduli AI non caricati. Report non disponibile."),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="Moduli AI non caricati. OKR non disponibili."),
            None, [], [], "Pipeline AI: Moduli non caricati."
        )
        return

    try:
        # 'Sempre' (all-time) filter parameters for the agentic pipeline.
        date_filter_val_agentic = "Sempre"
        custom_start_val_agentic = None
        custom_end_val_agentic = None

        # Consume the streaming orchestration and translate each status
        # into a set of UI updates.
        async for orchestration_output in run_full_analytics_orchestration_streaming(
            current_token_state_val,
            date_filter_val_agentic,
            custom_start_val_agentic,
            custom_end_val_agentic
        ):
            if not orchestration_output:
                continue

            status = orchestration_output.get("status", "unknown")

            if status == "report_ready":
                # The report arrived first; OKRs are still being generated.
                logger.info("Report ready, displaying preliminary results...")
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
                yield (
                    agentic_report_md_update,
                    gr.update(choices=[], value=[], interactive=False),  # keep KR selection disabled for now
                    gr.update(value="OKR ancora in elaborazione..."),
                    orchestration_output,  # store partial results
                    selected_key_result_ids_st,
                    key_results_for_selection_st,
                    "Report AI completato. Generazione OKR in corso..."
                )
            elif status == "complete":
                # Final payload: report plus actionable OKRs and tasks.
                logger.info("Complete results ready, displaying final output...")
                orchestration_results_update = orchestration_output
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
                krs_for_selection_update = krs_for_ui_selection_list
                kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
                key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)

                # Render every OKR by default (no KR filtering applied yet).
                all_okrs_md_parts = []
                if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                    for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                        all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))

                if not all_okrs_md_parts:
                    okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
                else:
                    okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))

                selected_krs_update = []
                agentic_status_text = "Pipeline AI (Sempre) completata completamente."
                yield (
                    agentic_report_md_update,
                    key_results_cbg_update,
                    okr_detail_display_md_update,
                    orchestration_results_update,
                    selected_krs_update,
                    krs_for_selection_update,
                    agentic_status_text
                )
                break  # Final result yielded; stop consuming the stream.

    except Exception as e:
        logger.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
        yield (
            gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
            None, [], [], agentic_status_text
        )