# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, Any, Optional
import gradio as gr
# Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder
# If 'insight_and_tasks' is not in python path, you might need to adjust sys.path
# For example, if insight_and_tasks is a sibling of the dir containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir) # Or navigate to the correct root
# sys.path.insert(0, project_root)
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
# Imports from your project structure
from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging may already be called in app.py; if not, call it here and make sure it runs only once.
# from insight_and_tasks.utils.logging_config import setup_logging
from data_processing.analytics_data_processing import prepare_filtered_analytics_data
# Placeholder for UI generator import - to be created later
# from .insights_ui_generator import format_orchestration_results_for_ui
try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False

    # Placeholder stubs so the rest of the app can still import and render.
    def format_report_to_markdown(report_string):
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        return "Agentic modules not loaded. OKR display unavailable."
from services.report_data_handler import save_report_results, save_actionable_okrs, fetch_and_reconstruct_data_from_bubble
logger = logging.getLogger(__name__)

async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Runs the full analytics pipeline using data from token_state and the date filters,
    and returns the raw orchestration results.

    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String for the date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        A dictionary containing the results from the analytics orchestrator,
        or None if a critical error occurs.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # Useful if FollowerAgent specifically needs a pre-filtered time series
            raw_follower_stats_df,  # FollowerAgent typically processes the raw history for some metrics
            _start_dt,  # Filtered start date, for logging or context if needed
            _end_dt  # Filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(
            f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), "
            f"FollowerStatsRaw({len(raw_follower_stats_df)})"
        )
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None

    # Check whether the essential DataFrames are all empty after filtering, which could make the analysis trivial or erroneous.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
        # Depending on requirements, you might return a specific message or an empty results structure here.
    # 2. Initialize and run the orchestrator
    try:
        # Pass a specific model name, or None to use the orchestrator's default.
        # Example: llm_model_for_run = token_state.get("config_llm_model_override")
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            current_date_for_tasks=datetime.utcnow().date()
        )
        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
        # The orchestrator expects the primary follower stats DataFrame to be one it can process for
        # time series ('follower_gains_monthly') and demographics. The raw history
        # (raw_follower_stats_df) is usually the better input, since FollowerAgent does its own processing.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,  # Pass the full follower history
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None

async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
    logging.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val and current_token_state_val.get('token') else 'Not Set'}")
    # Initial state before the pipeline runs, or if it is skipped
    initial_yield = (
        gr.update(value="AI pipeline: waiting for the required data..."),  # agentic_report_display_md
        gr.update(choices=[], value=[], interactive=False),  # key_results_cbg
        gr.update(value="AI pipeline: waiting for the required data..."),  # okr_detail_display_md
        orchestration_raw_results_st,  # Preserve current raw results
        selected_key_result_ids_st,  # Preserve current selection
        key_results_for_selection_st,  # Preserve current options
        "AI pipeline: waiting for data..."  # agentic_pipeline_status_md
    )
    if not current_token_state_val or not current_token_state_val.get("token"):
        logging.info("Agentic pipeline: token not available in token_state. Skipping.")
        yield initial_yield
        return
logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
# Update status to indicate processing
yield (
gr.update(value="Analisi AI (Sempre) in corso..."),
gr.update(choices=[], value=[], interactive=False), # Keep CBG disabled during run
gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
orchestration_raw_results_st, # Preserve
selected_key_result_ids_st, # Preserve
key_results_for_selection_st, # Preserve
"Esecuzione pipeline AI (Sempre)..."
)
if not AGENTIC_MODULES_LOADED:
logging.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
yield (
gr.update(value="Moduli AI non caricati. Report non disponibile."),
gr.update(choices=[], value=[], interactive=False),
gr.update(value="Moduli AI non caricati. OKR non disponibili."),
None, [], [], "Pipeline AI: Moduli non caricati."
)
return
    if not current_token_state_val.get("agentic_pipeline_should_run_now", False):
        logging.info("Fetching existing data from Bubble since a pipeline run is not required.")
        org_urn = current_token_state_val.get('org_urn')
        report_df = current_token_state_val.get('bubble_agentic_analysis_data')
        # Reconstruct previously saved results from the Bubble data
        retrieved_data = fetch_and_reconstruct_data_from_bubble(report_df)
        if not retrieved_data:
            logging.warning(f"No data found in Bubble for org_urn {org_urn}. Informing user.")
            yield (
                gr.update(value="No previous analysis data found in Bubble."),
                gr.update(choices=[], value=[], interactive=False),
                gr.update(value="Run the pipeline to generate a new report."),
                None, [], [], "AI pipeline: data not available"
            )
            return
        # If data is found, format it for the UI
        report_str = retrieved_data.get('report_str')
        actionable_okrs = retrieved_data.get('actionable_okrs')
        agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
        krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
        kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
        key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)
        all_okrs_md_parts = []
        if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
            for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
        if not all_okrs_md_parts:
            okr_detail_display_md_update = gr.update(value="No OKRs found for the most recent report.")
        else:
            okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
        # Yield the updates for the Gradio interface
        yield (
            agentic_report_md_update,
            key_results_cbg_update,
            okr_detail_display_md_update,
            retrieved_data,  # Store the full retrieved data in state
            [],  # Reset selected-KRs state
            krs_for_ui_selection_list,  # Update state with the list of KR dicts
            "AI pipeline: data loaded from Bubble"
        )
        return
    try:
        # Parameters for the 'Sempre' (all-time) filter used by the agentic pipeline
        date_filter_val_agentic = "Sempre"
        custom_start_val_agentic = None
        custom_end_val_agentic = None
        orchestration_output = await run_full_analytics_orchestration(
            current_token_state_val,
            date_filter_val_agentic,
            custom_start_val_agentic,
            custom_end_val_agentic
        )
        agentic_status_text = "AI pipeline ('Sempre') completed."
        logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")
        if orchestration_output:
            orchestration_results_update = orchestration_output  # Store the full results in state
            report_str = orchestration_output.get('comprehensive_analysis_report', "No detailed report provided.")
            agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
            quarter = orchestration_output.get('quarter', "quarter unavailable")
            year = orchestration_output.get('year', "year unavailable")
            org_urn = current_token_state_val.get('org_urn')
            report_id = None  # Ensure report_id exists even if saving the report fails
            try:
                report_id = save_report_results(org_urn=org_urn, report_markdown=report_str, quarter=quarter, year=year, report_type='Quarter')
            except Exception as e:
                logging.error(f"Error saving report: {e}")
            actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')  # The dict containing the 'okrs' list
            try:
                save_actionable_okrs(org_urn, actionable_okrs, report_id)
            except Exception as e:
                logging.error(f"Error saving OKRs: {e}")
            krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)  # Expects the dict
            krs_for_selection_update = krs_for_ui_selection_list  # Update state with the list of KR dicts
            # Choices for the CheckboxGroup: a list of (label, value) tuples
            kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
            key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)  # Reset selection
            # Display all OKRs by default after the pipeline run
            all_okrs_md_parts = []
            if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                    all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
            if not all_okrs_md_parts:
                okr_detail_display_md_update = gr.update(value="No OKRs generated or found ('Sempre').")
            else:
                okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
            selected_krs_update = []  # Reset selected-KRs state
        else:
            agentic_report_md_update = gr.update(value="No report generated by the AI pipeline ('Sempre').")
            key_results_cbg_update = gr.update(choices=[], value=[], interactive=False)
            okr_detail_display_md_update = gr.update(value="No OKRs generated, or an error occurred in the AI pipeline ('Sempre').")
            orchestration_results_update = None
            selected_krs_update = []
            krs_for_selection_update = []
        yield (
            agentic_report_md_update,
            key_results_cbg_update,
            okr_detail_display_md_update,
            orchestration_results_update,  # state
            selected_krs_update,  # state
            krs_for_selection_update,  # state
            agentic_status_text
        )
    except Exception as e:
        logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        agentic_status_text = f"AI pipeline error ('Sempre'): {str(e)}"
        yield (
            gr.update(value=f"Error generating AI report ('Sempre'): {str(e)}"),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value=f"Error generating AI OKRs ('Sempre'): {str(e)}"),
            None, [], [], agentic_status_text  # Reset states on error
        )
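

# ---------------------------------------------------------------------------
# Minimal standalone usage sketch for run_full_analytics_orchestration.
# Illustrative only: it assumes a token_state dict shaped like the one app.py
# builds (at minimum a "token" key plus whatever raw DataFrames
# prepare_filtered_analytics_data expects), and that GEMINI_API_KEY is set in
# the environment. The placeholder values below are hypothetical; the "Sempre"
# filter value mirrors the autonomous run above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_token_state: Dict[str, Any] = {
        "token": "dummy-token-for-local-testing",   # hypothetical placeholder
        "org_urn": "urn:li:organization:000000",    # hypothetical placeholder
    }

    async def _demo() -> None:
        # Run the orchestration end to end with the all-time ('Sempre') filter.
        results = await run_full_analytics_orchestration(
            example_token_state,
            date_filter_selection="Sempre",
            custom_start_date=None,
            custom_end_date=None,
        )
        if results:
            print(results.get("comprehensive_analysis_report", "No report in results."))
        else:
            print("Orchestration returned no results (check the API key and input data).")

    asyncio.run(_demo())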