Spaces:
Sleeping
Sleeping
| # run_agentic_pipeline.py | |
| import asyncio | |
| import os | |
| import json | |
| import logging | |
| from datetime import datetime | |
| import pandas as pd | |
| from typing import Dict, Any, Optional | |
| import gradio as gr | |
| # Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder | |
| # If 'insight_and_tasks' is not in python path, you might need to adjust sys.path | |
| # For example, if insight_and_tasks is a sibling of the dir containing this file: | |
| # import sys | |
| # script_dir = os.path.dirname(os.path.abspath(__file__)) | |
| # project_root = os.path.dirname(script_dir) # Or navigate to the correct root | |
| # sys.path.insert(0, project_root) | |
# Force the google-genai client to use the public Gemini API rather than Vertex AI.
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"

# The key is supplied as GEMINI_API_KEY and re-exported as GOOGLE_API_KEY.
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
if GOOGLE_API_KEY is not None:
    # os.environ only accepts str values: assigning None would raise TypeError
    # at import time and take the whole module down. When the key is missing we
    # leave the environment untouched and let the pipeline entry point report it.
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
| # Imports from your project structure | |
| from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator | |
| # setup_logging might be called in app.py, if not, call it here or ensure it's called once. | |
| # from insight_and_tasks.utils.logging_config import setup_logging | |
| from data_processing.analytics_data_processing import prepare_filtered_analytics_data | |
| # Placeholder for UI generator import - to be created later | |
| # from .insights_ui_generator import format_orchestration_results_for_ui | |
# Import the UI formatting helpers. If they are unavailable, record that in
# AGENTIC_MODULES_LOADED and install inert stand-ins with the same names so the
# rest of this module can still be imported and called.
try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False

    # NOTE: no stand-in is defined for run_full_analytics_orchestration; the
    # real coroutine is defined unconditionally further down in this module,
    # so a placeholder here would always be overwritten.

    def format_report_to_markdown(report_string):
        """Fallback: return a fixed notice instead of a formatted report."""
        return "Agentic modules not loaded. Report unavailable."

    def extract_key_results_for_selection(okrs_dict):
        """Fallback: report that there are no key results to select."""
        return []

    def format_single_okr_for_display(okr_data, **kwargs):
        """Fallback: return a fixed notice instead of a formatted OKR."""
        return "Agentic modules not loaded. OKR display unavailable."
| from services.report_data_handler import save_report_results | |
| logger = logging.getLogger(__name__) | |
async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Prepare the analytics data and run the LinkedIn analytics orchestrator on it.

    The inputs are forwarded unchanged to ``prepare_filtered_analytics_data``;
    the resulting posts, mentions and raw follower-stats DataFrames are then
    handed to ``EnhancedLinkedInAnalyticsOrchestrator``.

    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String for date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        Whatever ``generate_full_analysis_and_tasks`` returns, or ``None`` when
        the API key is missing, data preparation fails, or the orchestrator
        raises. Failures are logged, never propagated.
    """
    # Local import: the module header only imports `datetime` from this module.
    from datetime import timezone

    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # unused here; kept for a possible pre-filtered time series
            raw_follower_stats_df,             # unfiltered follower history, passed to the orchestrator
            _start_dt,                         # filtered start date (unused)
            _end_dt                            # filtered end date (unused)
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None

    # Empty inputs are only warned about; the orchestrator still runs.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize and run the orchestrator
    try:
        # Hard-coded model for now; a token_state override
        # (e.g. "config_llm_model_override") could be plugged in here.
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
            # "now" yields the same calendar date.
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )
        logger.info("Orchestrator initialized. Generating full analysis and tasks...")

        # The raw (unfiltered) follower stats are passed on purpose: the
        # follower analysis does its own processing of the full history.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results
    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None
def _agentic_reset_outputs(report_msg, okr_msg, status_msg):
    """Build the 7-item output tuple that shows messages and clears all agentic state."""
    return (
        gr.update(value=report_msg),                         # report markdown
        gr.update(choices=[], value=[], interactive=False),  # key-results checkbox group
        gr.update(value=okr_msg),                            # OKR detail markdown
        None,                                                # raw orchestration results state
        [],                                                  # selected key-result ids state
        [],                                                  # key-results-for-selection state
        status_msg,                                          # pipeline status text
    )


async def run_agentic_pipeline_autonomously(
    current_token_state_val,
    orchestration_raw_results_st,
    selected_key_result_ids_st,
    key_results_for_selection_st,
):
    """
    Async generator that runs the agentic pipeline with the 'Sempre' date filter
    and streams UI/state updates.

    Every yielded value is a 7-tuple, in this order: report markdown update,
    key-results CheckboxGroup update, OKR detail markdown update, raw
    orchestration results (state), selected key-result ids (state), key results
    available for selection (state), status text.

    Args:
        current_token_state_val: Token state mapping; may be None or empty.
            Keys read here: "token", "agentic_pipeline_should_run_now", "org_urn".
        orchestration_raw_results_st: Current raw-results state, passed through
            unchanged while waiting or running.
        selected_key_result_ids_st: Current selection state, passed through likewise.
        key_results_for_selection_st: Current options state, passed through likewise.

    Yields:
        A "waiting" tuple when no token is available; otherwise a "running"
        tuple followed by exactly one final tuple (results, "modules not
        loaded", "data stored in db", or an error message). Exceptions raised
        while running are logged and reported through the outputs, not re-raised.
    """
    # Same logger object as the module-level `logger` (same name).
    log = logging.getLogger(__name__)

    # Evaluate the guard BEFORE touching the mapping: the state can be None,
    # and calling .get on it would raise instead of yielding the waiting state.
    has_token = bool(current_token_state_val and current_token_state_val.get("token"))
    log.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if has_token else 'Not Set'}")

    if not has_token:
        log.info("Agentic pipeline: Token not available in token_state. Skipping.")
        yield (
            gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
            orchestration_raw_results_st,   # preserve current raw results
            selected_key_result_ids_st,     # preserve current selection
            key_results_for_selection_st,   # preserve current options
            "Pipeline AI: In attesa dei dati...",
        )
        return

    log.info("Agentic pipeline starting autonomously with 'Sempre' filter.")

    # Announce that processing has started; the checkbox group stays disabled.
    yield (
        gr.update(value="Analisi AI (Sempre) in corso..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Esecuzione pipeline AI (Sempre)...",
    )

    if not AGENTIC_MODULES_LOADED:
        log.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
        yield _agentic_reset_outputs(
            "Moduli AI non caricati. Report non disponibile.",
            "Moduli AI non caricati. OKR non disponibili.",
            "Pipeline AI: Moduli non caricati.",
        )
        return

    # TODO: update this branch to return the data fetched from Bubble,
    # formatted the right way, instead of clearing the outputs.
    if not current_token_state_val.get("agentic_pipeline_should_run_now", False):
        log.warning("no need for agentic pipeline to run. Data stored in db.")
        yield _agentic_reset_outputs(
            "Data stored in db",
            "Data stored in db",
            "Pipeline AI: Data stored in db",
        )
        return

    try:
        # 'Sempre' filter with no custom date range.
        orchestration_output = await run_full_analytics_orchestration(
            current_token_state_val, "Sempre", None, None
        )
        agentic_status_text = "Pipeline AI (Sempre) completata."
        log.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")

        if orchestration_output:
            report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
            agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

            quarter = orchestration_output.get('quarter', "quarter non disponibile")
            year = orchestration_output.get('year', "year non disponibile")
            org_urn = current_token_state_val.get('org_urn')

            # Saving the report is best-effort: a failure is logged (with
            # traceback) and must not prevent the results from being shown.
            try:
                save_report_results(org_urn=org_urn, report_markdown=report_str, quarter=quarter, year=year, report_type='Quarter')
            except Exception as e:
                log.error(f"error saving report {e}", exc_info=True)

            # Dict expected to hold an 'okrs' list.
            actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
            krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)

            # CheckboxGroup choices are (label, value) tuples; selection is reset.
            kr_choices_for_cbg = [
                (kr['kr_description'], kr['unique_kr_id'])
                for kr in krs_for_ui_selection_list
            ]
            key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)

            # Show every OKR by default right after a pipeline run.
            all_okrs_md_parts = []
            if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                all_okrs_md_parts = [
                    format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx)
                    for okr_idx, okr_item in enumerate(actionable_okrs["okrs"])
                ]

            if all_okrs_md_parts:
                okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
            else:
                okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")

            final_outputs = (
                agentic_report_md_update,
                key_results_cbg_update,
                okr_detail_display_md_update,
                orchestration_output,        # full results stored in state
                [],                          # selected KRs reset
                krs_for_ui_selection_list,   # list of KR dicts stored in state
                agentic_status_text,
            )
        else:
            # The orchestration returned nothing: clear outputs, keep the "completed" status.
            final_outputs = _agentic_reset_outputs(
                "Nessun report generato dalla pipeline AI (Sempre).",
                "Nessun OKR generato o errore nella pipeline AI (Sempre).",
                agentic_status_text,
            )
    except Exception as e:
        log.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        # Reset all states on error and surface the message in every output.
        yield _agentic_reset_outputs(
            f"Errore generazione report AI (Sempre): {str(e)}",
            f"Errore generazione OKR AI (Sempre): {str(e)}",
            f"Errore pipeline AI (Sempre): {str(e)}",
        )
        return

    yield final_outputs