# app.py
import gradio as gr
import pandas as pd
import os
import logging
import matplotlib
matplotlib.use('Agg') # Set backend for Matplotlib
import matplotlib.pyplot as plt
import time
from datetime import datetime, timedelta
import numpy as np
from collections import OrderedDict, defaultdict # Added defaultdict
import asyncio
# --- Module Imports ---
from utils.gradio_utils import get_url_user_token
# Functions from newly created/refactored modules
from config import (
LINKEDIN_CLIENT_ID_ENV_VAR, BUBBLE_APP_NAME_ENV_VAR,
BUBBLE_API_KEY_PRIVATE_ENV_VAR, BUBBLE_API_ENDPOINT_ENV_VAR,
PLOT_ID_TO_FORMULA_KEY_MAP # Keep this if used by AnalyticsTab
)
from services.state_manager import process_and_store_bubble_token
from services.sync_logic import sync_all_linkedin_data_orchestrator
from ui.ui_generators import (
display_main_dashboard,
build_analytics_tab_plot_area, # This will be passed to AnalyticsTab
BOMB_ICON, EXPLORE_ICON, FORMULA_ICON, ACTIVE_ICON # These will be passed
)
from ui.analytics_plot_generator import update_analytics_plots_figures, create_placeholder_plot # Pass these
from formulas import PLOT_FORMULAS # Keep this if used by AnalyticsTab
# --- EXISTING CHATBOT MODULE IMPORTS ---
from features.chatbot.chatbot_prompts import get_initial_insight_prompt_and_suggestions # Pass this
from features.chatbot.chatbot_handler import generate_llm_response # Pass this
# --- NEW AGENTIC PIPELINE IMPORTS ---
try:
from run_agentic_pipeline import run_full_analytics_orchestration
from ui.insights_ui_generator import (
format_report_to_markdown,
extract_key_results_for_selection,
format_single_okr_for_display
)
AGENTIC_MODULES_LOADED = True
except ImportError as e:
logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
AGENTIC_MODULES_LOADED = False
async def run_full_analytics_orchestration(*args, **kwargs): return None # Placeholder
def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable." # Placeholder
def extract_key_results_for_selection(okrs_dict): return [] # Placeholder
def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable." # Placeholder
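    # These no-op fallbacks keep the app importable when the agentic modules are missing;
    # Tabs 3 and 4 are then hidden and their callbacks skipped via AGENTIC_MODULES_LOADED below.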
# --- IMPORT THE NEW ANALYTICS TAB MODULE ---
from services.analytics_tab_module import AnalyticsTab # Assuming analytics_tab_module.py is in the services directory
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(module)s - %(message)s')
# API Key Setup
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
user_provided_api_key = os.environ.get("GEMINI_API_KEY")
if user_provided_api_key:
os.environ["GOOGLE_API_KEY"] = user_provided_api_key
logging.info("GOOGLE_API_KEY environment variable has been set from GEMINI_API_KEY.")
else:
logging.error("CRITICAL ERROR: The API key environment variable 'GEMINI_API_KEY' was not found.")
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue", secondary_hue="sky"),
title="LinkedIn Organization Dashboard") as app:
token_state = gr.State(value={
"token": None, "client_id": None, "org_urn": None,
"bubble_posts_df": pd.DataFrame(), "bubble_post_stats_df": pd.DataFrame(),
"bubble_mentions_df": pd.DataFrame(),
"bubble_follower_stats_df": pd.DataFrame(),
"fetch_count_for_api": 0, "url_user_token_temp_storage": None,
"config_date_col_posts": "published_at", "config_date_col_mentions": "date",
"config_date_col_followers": "date", "config_media_type_col": "media_type",
"config_eb_labels_col": "li_eb_label"
})
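    # The config_* entries above record the dataframe column names (post publish date, mention date,
    # follower date, media type, employer-brand label) that the sync and analytics modules are
    # expected to read from this shared state.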
# States for existing analytics tab chatbot - these are passed to AnalyticsTab
chat_histories_st = gr.State({})
current_chat_plot_id_st = gr.State(None)
plot_data_for_chatbot_st = gr.State({}) # This will be populated by the analytics module's refresh
# --- STATES FOR AGENTIC PIPELINE ---
orchestration_raw_results_st = gr.State(None) # Stores the full raw output from the agentic pipeline
key_results_for_selection_st = gr.State([]) # Stores the list of dicts for KR selection (label, id, etc.)
selected_key_result_ids_st = gr.State([]) # Stores the unique_kr_ids selected in the CheckboxGroup
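    # Each entry of key_results_for_selection_st is expected (based on its usage below) to be a dict like:
    #   {"unique_kr_id": "...", "kr_description": "...", "okr_index": 0, "kr_index": 1}
    # where unique_kr_id is the CheckboxGroup value and the two indices locate the KR inside the OKR list;
    # the exact id format is produced by extract_key_results_for_selection and is illustrative here.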
gr.Markdown("# 🚀 LinkedIn Organization Dashboard")
    url_user_token_display = gr.Textbox(label="User Token (Hidden)", interactive=False, visible=False)
    status_box = gr.Textbox(label="Overall LinkedIn Token Status", interactive=False, value="Initializing...")
    org_urn_display = gr.Textbox(label="Organization URN (Hidden)", interactive=False, visible=False)
app.load(fn=get_url_user_token, inputs=None, outputs=[url_user_token_display, org_urn_display], api_name="get_url_params", show_progress=False)
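    # app.load reads the user token and org URN from the page URL into the hidden textboxes;
    # the org_urn_display.change event defined under "Event Handling" below then drives the initial load.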
def initial_load_sequence(url_token, org_urn_val, current_state):
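        """Process the user token from the URL via process_and_store_bubble_token and render the main dashboard.

        Returns (status message, updated token_state, sync-button update, dashboard HTML),
        matching the outputs wired to org_urn_display.change below."""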
status_msg, new_state, btn_update = process_and_store_bubble_token(url_token, org_urn_val, current_state)
dashboard_content = display_main_dashboard(new_state)
return status_msg, new_state, btn_update, dashboard_content
# --- Instantiate the AnalyticsTab module ---
analytics_icons = {
'bomb': BOMB_ICON, 'explore': EXPLORE_ICON,
'formula': FORMULA_ICON, 'active': ACTIVE_ICON
}
analytics_tab_instance = AnalyticsTab(
token_state=token_state,
chat_histories_st=chat_histories_st,
current_chat_plot_id_st=current_chat_plot_id_st,
plot_data_for_chatbot_st=plot_data_for_chatbot_st,
plot_id_to_formula_map=PLOT_ID_TO_FORMULA_KEY_MAP,
plot_formulas_data=PLOT_FORMULAS,
icons=analytics_icons,
fn_build_plot_area=build_analytics_tab_plot_area,
fn_update_plot_figures=update_analytics_plots_figures,
fn_create_placeholder_plot=create_placeholder_plot,
fn_get_initial_insight=get_initial_insight_prompt_and_suggestions,
fn_generate_llm_response=generate_llm_response
)
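    # The AnalyticsTab instance owns the whole of Tab 2: create_tab_ui() builds its layout, and its
    # date-filter components and _refresh_analytics_graphs_ui callback are reused in the event wiring below.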
with gr.Tabs() as tabs:
with gr.TabItem("1️⃣ Dashboard & Sync", id="tab_dashboard_sync"):
gr.Markdown("Il sistema controlla i dati esistenti da Bubble. 'Sincronizza' si attiva se sono necessari nuovi dati.")
sync_data_btn = gr.Button("🔄 Sincronizza Dati LinkedIn", variant="primary", visible=False, interactive=False)
sync_status_html_output = gr.HTML("<p style='text-align:center;'>Stato sincronizzazione...</p>")
dashboard_display_html = gr.HTML("<p style='text-align:center;'>Caricamento dashboard...</p>")
# --- Use the AnalyticsTab module to create Tab 2 ---
analytics_tab_instance.create_tab_ui()
# --- Tab 3: Agentic Analysis Report ---
with gr.TabItem("3️⃣ Agentic Analysis Report", id="tab_agentic_report", visible=AGENTIC_MODULES_LOADED):
gr.Markdown("## 🤖 Comprehensive Analysis Report (AI Generated)")
            agentic_pipeline_status_md = gr.Markdown("AI pipeline status ('Sempre' filter): waiting...", visible=True)
            gr.Markdown("This report is generated by an AI agent using the 'Sempre' (all-time) filter on the available data. Review it critically.")
            agentic_report_display_md = gr.Markdown("The AI pipeline runs automatically after the initial data load or after a sync.")
if not AGENTIC_MODULES_LOADED:
gr.Markdown("🔴 **Error:** Agentic pipeline modules could not be loaded. This tab is disabled.")
# --- Tab 4: Agentic OKRs & Tasks ---
with gr.TabItem("4️⃣ Agentic OKRs & Tasks", id="tab_agentic_okrs", visible=AGENTIC_MODULES_LOADED):
gr.Markdown("## 🎯 AI Generated OKRs and Actionable Tasks (filtro 'Sempre')")
gr.Markdown("Basato sull'analisi AI (filtro 'Sempre'), l'agente ha proposto i seguenti OKR e task. Seleziona i Key Results per dettagli.")
if not AGENTIC_MODULES_LOADED:
gr.Markdown("🔴 **Error:** Agentic pipeline modules could not be loaded. This tab is disabled.")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Suggested Key Results (da analisi 'Sempre')")
key_results_cbg = gr.CheckboxGroup(label="Select Key Results", choices=[], value=[], interactive=True)
with gr.Column(scale=3):
gr.Markdown("### Detailed OKRs and Tasks for Selected Key Results")
                    okr_detail_display_md = gr.Markdown("OKR details will appear here after the AI pipeline has run.")
def update_okr_display_on_selection(selected_kr_unique_ids: list, raw_orchestration_results: dict, all_krs_for_selection: list):
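                """Rebuild the OKR detail Markdown from the Key Results ticked in the CheckboxGroup.

                With no selection, every OKR is rendered in full; with a selection, only OKRs containing
                at least one selected KR are rendered, restricted to those KRs."""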
                if not raw_orchestration_results or not AGENTIC_MODULES_LOADED:
                    return gr.update(value="No data from the AI pipeline, or agentic modules not loaded.")
                actionable_okrs_dict = raw_orchestration_results.get("actionable_okrs_and_tasks")
                if not actionable_okrs_dict or not isinstance(actionable_okrs_dict.get("okrs"), list):
                    return gr.update(value="No OKRs found in the pipeline results.")
                okrs_list = actionable_okrs_dict["okrs"]
                # Ensure all_krs_for_selection is a list of dicts with the expected keys
                if not all_krs_for_selection or not isinstance(all_krs_for_selection, list) or \
                   not all(isinstance(kr, dict) and 'unique_kr_id' in kr and 'okr_index' in kr and 'kr_index' in kr for kr in all_krs_for_selection):
                    logging.error("all_krs_for_selection is not in the expected format.")
                    return gr.update(value="Internal error: invalid KR data format.")
kr_id_to_indices = {kr_info['unique_kr_id']: (kr_info['okr_index'], kr_info['kr_index']) for kr_info in all_krs_for_selection}
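                # Group the indices of the selected KRs by their parent OKR, so each OKR is rendered once
                # with only its accepted KRs.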
selected_krs_by_okr_idx = defaultdict(list)
if selected_kr_unique_ids:
for kr_unique_id in selected_kr_unique_ids:
if kr_unique_id in kr_id_to_indices:
okr_idx, kr_idx = kr_id_to_indices[kr_unique_id]
selected_krs_by_okr_idx[okr_idx].append(kr_idx)
output_md_parts = []
                if not okrs_list:
                    output_md_parts.append("No OKRs generated.")
else:
for okr_idx, okr_data in enumerate(okrs_list):
accepted_indices_for_this_okr = selected_krs_by_okr_idx.get(okr_idx)
# If specific KRs are selected, only show OKRs that have at least one of the selected KRs
# OR if no KRs are selected at all, show all OKRs.
if selected_kr_unique_ids: # User has made a selection
if accepted_indices_for_this_okr is not None: # This OKR has some of the selected KRs
output_md_parts.append(format_single_okr_for_display(okr_data, accepted_kr_indices=accepted_indices_for_this_okr, okr_main_index=okr_idx))
else: # No KRs selected, show all OKRs with all their KRs
output_md_parts.append(format_single_okr_for_display(okr_data, accepted_kr_indices=None, okr_main_index=okr_idx))
                if not output_md_parts and selected_kr_unique_ids:
                    final_md = "No OKRs match the current selection, or the selected KRs have no detailed tasks."
                elif not output_md_parts and not selected_kr_unique_ids: # Normally already covered by "No OKRs generated."
                    final_md = "No OKRs generated."
else:
final_md = "\n\n---\n\n".join(output_md_parts)
return gr.update(value=final_md)
if AGENTIC_MODULES_LOADED:
key_results_cbg.change(
fn=update_okr_display_on_selection,
inputs=[key_results_cbg, orchestration_raw_results_st, key_results_for_selection_st],
outputs=[okr_detail_display_md],
api_name="update_okr_display_on_selection_module"
)
async def run_agentic_pipeline_autonomously(current_token_state_val):
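        """Run the full agentic analytics pipeline with the 'Sempre' (all-time) filter whenever token_state changes.

        Async generator: every yield emits a 7-tuple in the order of agentic_pipeline_outputs_list
        (report Markdown, KR CheckboxGroup, OKR detail Markdown, raw-results state, selected-KR state,
        KR-options state, status Markdown), so intermediate yields act as progress updates."""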
logging.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}")
# Initial state before pipeline runs or if skipped
initial_yield = (
gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # agentic_report_display_md
gr.update(choices=[], value=[], interactive=False), # key_results_cbg
gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # okr_detail_display_md
orchestration_raw_results_st.value, # Preserve current raw results
selected_key_result_ids_st.value, # Preserve current selection
key_results_for_selection_st.value, # Preserve current options
"Pipeline AI: In attesa dei dati..." # agentic_pipeline_status_md
)
if not current_token_state_val or not current_token_state_val.get("token"):
logging.info("Agentic pipeline: Token not available in token_state. Skipping.")
yield initial_yield
return
logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
# Update status to indicate processing
yield (
gr.update(value="Analisi AI (Sempre) in corso..."),
gr.update(choices=[], value=[], interactive=False), # Keep CBG disabled during run
gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
orchestration_raw_results_st.value, # Preserve
selected_key_result_ids_st.value, # Preserve
key_results_for_selection_st.value, # Preserve
"Esecuzione pipeline AI (Sempre)..."
)
if not AGENTIC_MODULES_LOADED:
logging.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
yield (
gr.update(value="Moduli AI non caricati. Report non disponibile."),
gr.update(choices=[], value=[], interactive=False),
gr.update(value="Moduli AI non caricati. OKR non disponibili."),
None, [], [], "Pipeline AI: Moduli non caricati."
)
return
try:
# Parameters for 'Sempre' filter for the agentic pipeline
date_filter_val_agentic = "Sempre"
custom_start_val_agentic = None
custom_end_val_agentic = None
orchestration_output = await run_full_analytics_orchestration(
current_token_state_val,
date_filter_val_agentic,
custom_start_val_agentic,
custom_end_val_agentic
)
agentic_status_text = "Pipeline AI (Sempre) completata."
logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")
if orchestration_output:
orchestration_results_update = orchestration_output # Store full results in state
                report_str = orchestration_output.get('comprehensive_analysis_report', "No detailed report provided.")
agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks') # This is the dict containing 'okrs' list
krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs) # Expects the dict
krs_for_selection_update = krs_for_ui_selection_list # Update state with list of KR dicts
# Choices for CheckboxGroup: list of (label, value) tuples
kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True) # Reset selection
# Display all OKRs by default after pipeline run
all_okrs_md_parts = []
if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
                if not all_okrs_md_parts:
                    okr_detail_display_md_update = gr.update(value="No OKRs generated or found (Sempre).")
else:
okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
selected_krs_update = [] # Reset selected KRs state
else:
                agentic_report_md_update = gr.update(value="No report generated by the AI pipeline (Sempre).")
                key_results_cbg_update = gr.update(choices=[], value=[], interactive=False)
                okr_detail_display_md_update = gr.update(value="No OKRs generated, or the AI pipeline (Sempre) failed.")
orchestration_results_update = None
selected_krs_update = []
krs_for_selection_update = []
yield (
agentic_report_md_update,
key_results_cbg_update,
okr_detail_display_md_update,
orchestration_results_update, # state
selected_krs_update, # state
krs_for_selection_update, # state
agentic_status_text
)
except Exception as e:
logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
yield (
gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
gr.update(choices=[], value=[], interactive=False),
gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
None, [], [], agentic_status_text # Reset states on error
)
# Define the output list for the agentic pipeline callbacks
# Order: Report MD, KR CBG, OKR Detail MD, RawResults State, SelectedKRIDs State, KRList State, Status MD
agentic_pipeline_outputs_list = [
agentic_report_display_md,
key_results_cbg,
okr_detail_display_md,
orchestration_raw_results_st,
selected_key_result_ids_st,
key_results_for_selection_st,
agentic_pipeline_status_md
]
agentic_pipeline_inputs = [token_state] # Input for the autonomous run
# --- Event Handling ---
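    # Initial-load chain: the URL parameters fetched by app.load populate the hidden textboxes, and the
    # resulting org_urn_display.change event validates the token, renders the dashboard, refreshes the
    # analytics plots, and finally runs the agentic pipeline.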
initial_load_event = org_urn_display.change(
fn=initial_load_sequence,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_data_btn, dashboard_display_html],
show_progress="full"
)
initial_load_event.then(
fn=analytics_tab_instance._refresh_analytics_graphs_ui,
inputs=[
token_state,
analytics_tab_instance.date_filter_selector,
analytics_tab_instance.custom_start_date_picker,
analytics_tab_instance.custom_end_date_picker,
chat_histories_st
],
outputs=analytics_tab_instance.graph_refresh_outputs_list,
show_progress="full"
).then(
fn=run_agentic_pipeline_autonomously, # Generator function
inputs=agentic_pipeline_inputs,
outputs=agentic_pipeline_outputs_list,
show_progress="minimal" # Use minimal for generators that yield status
)
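    # Sync chain: the sync button re-fetches LinkedIn data, re-validates the Bubble token, re-runs the
    # agentic pipeline, then re-renders the dashboard and refreshes the analytics plots.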
sync_event_part1 = sync_data_btn.click(
fn=sync_all_linkedin_data_orchestrator,
inputs=[token_state],
outputs=[sync_status_html_output, token_state],
show_progress="full"
)
sync_event_part2 = sync_event_part1.then(
fn=process_and_store_bubble_token,
inputs=[url_user_token_display, org_urn_display, token_state],
outputs=[status_box, token_state, sync_data_btn],
show_progress=False
)
sync_event_part2.then(
fn=run_agentic_pipeline_autonomously, # Generator function
inputs=agentic_pipeline_inputs,
outputs=agentic_pipeline_outputs_list,
show_progress="minimal"
)
sync_event_part3 = sync_event_part2.then(
fn=display_main_dashboard,
inputs=[token_state],
outputs=[dashboard_display_html],
show_progress=False
)
sync_event_graphs_after_sync = sync_event_part3.then(
fn=analytics_tab_instance._refresh_analytics_graphs_ui,
inputs=[
token_state,
analytics_tab_instance.date_filter_selector,
analytics_tab_instance.custom_start_date_picker,
analytics_tab_instance.custom_end_date_picker,
chat_histories_st
],
outputs=analytics_tab_instance.graph_refresh_outputs_list,
show_progress="full"
)
if __name__ == "__main__":
    if not os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR):
        logging.warning(f"WARNING: '{LINKEDIN_CLIENT_ID_ENV_VAR}' is not set.")
    if not all(os.environ.get(var) for var in [BUBBLE_APP_NAME_ENV_VAR, BUBBLE_API_KEY_PRIVATE_ENV_VAR, BUBBLE_API_ENDPOINT_ENV_VAR]):
        logging.warning("WARNING: one or more Bubble environment variables (BUBBLE_APP_NAME, BUBBLE_API_KEY_PRIVATE, BUBBLE_API_ENDPOINT) are not set.")
    if not AGENTIC_MODULES_LOADED:
        logging.warning("CRITICAL: Agentic pipeline modules failed to load. Tabs 3 and 4 (Agentic Report & OKRs) will be non-functional.")
    if not os.environ.get("GEMINI_API_KEY"): # Check GEMINI_API_KEY directly, since GOOGLE_API_KEY is derived from it
        logging.warning("WARNING: 'GEMINI_API_KEY' is not set. It is required for the AI features, including the agentic tabs and the per-chart chatbot.")
try:
logging.info(f"Gradio version: {gr.__version__}")
logging.info(f"Pandas version: {pd.__version__}")
logging.info(f"Matplotlib version: {matplotlib.__version__}, Backend: {matplotlib.get_backend()}")
except Exception as e:
logging.warning(f"Could not log library versions: {e}")
app.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), debug=True)