# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime, timezone
import pandas as pd
from typing import Dict, Any, Optional
import gradio as gr


# Assuming this script sits at the same level as 'app.py', with 'insight_and_tasks/' as a subfolder.
# If 'insight_and_tasks' is not on the Python path, you may need to adjust sys.path.
# For example, if insight_and_tasks is a sibling of the directory containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir) # Or navigate to the correct root
# sys.path.insert(0, project_root)

os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

# Imports from your project structure
from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging may already be called in app.py; if not, call it here (and make sure it runs only once).
# from insight_and_tasks.utils.logging_config import setup_logging 
from data_processing.analytics_data_processing import prepare_filtered_analytics_data 
# Placeholder for UI generator import - to be created later
# from .insights_ui_generator import format_orchestration_results_for_ui 

try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False
    # Fallback stubs so the UI can still render if the agentic modules are missing.
    # (No stub is needed for run_full_analytics_orchestration: it is defined unconditionally below.)
    def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable."
    def extract_key_results_for_selection(okrs_dict): return []
    def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable."

from services.report_data_handler import save_report_results, save_actionable_okrs, fetch_and_reconstruct_data_from_bubble

logger = logging.getLogger(__name__)


async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Runs the full analytics pipeline using data from token_state and date filters,
    and returns the raw orchestration results.
    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String for date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.
    Returns:
        A dictionary containing the results from the analytics orchestrator,
        or None if a critical error occurs.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df, # This might be used if FollowerAgent specifically needs pre-filtered time series
            raw_follower_stats_df, # FollowerAgent typically processes raw historical for some metrics
            _start_dt, # Filtered start date, for logging or context if needed
            _end_dt    # Filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")

    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None

    # Check if essential dataframes are empty after filtering, which might make analysis trivial or erroneous
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
        # Depending on requirements, you might return a specific message or empty results structure.
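        # For instance (hypothetical sketch, not current behavior), one could
        # short-circuit here with an empty-but-well-formed result that uses the
        # same keys the callers below read:
        # return {
        #     "comprehensive_analysis_report": "No data available for the selected period.",
        #     "actionable_okrs_and_tasks": {"okrs": []},
        # }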

    # 2. Initialize and run the orchestrator
    try:
        # You can pass a specific model name or let the orchestrator fall back to its default.
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"  # e.g. token_state.get("config_llm_model_override")

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,  # Pass None to use the orchestrator's default
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )

        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
        # The orchestrator expects the primary follower stats DF to be the one it can process for
        # time-series ('follower_gains_monthly') and demographics.
        # The `raw_follower_stats_df` is usually better for this, as FollowerAgent does its own processing.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df, # Pass the full history for followers
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results

    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None
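
# --- Illustrative usage sketch (assumptions flagged) ---
# A minimal example of how run_full_analytics_orchestration might be invoked
# outside of Gradio, e.g. from a one-off script or smoke test. The token_state
# keys shown are hypothetical stand-ins; the real state is assembled by app.py
# and the data-loading step.
#
# async def _example_orchestration_run():
#     example_token_state = {
#         "token": "dummy-token",                   # hypothetical
#         "org_urn": "urn:li:organization:000000",  # hypothetical
#         # ...plus the raw dataframes expected by prepare_filtered_analytics_data...
#     }
#     results = await run_full_analytics_orchestration(
#         example_token_state, "Sempre", None, None
#     )
#     print(list(results.keys()) if results else "No results")
#
# asyncio.run(_example_orchestration_run())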



async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
    logger.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}")
    # Initial state before the pipeline runs, or if it is skipped
    initial_yield = (
        gr.update(value="AI pipeline: waiting for the required data..."),  # agentic_report_display_md
        gr.update(choices=[], value=[], interactive=False),                # key_results_cbg
        gr.update(value="AI pipeline: waiting for the required data..."),  # okr_detail_display_md
        orchestration_raw_results_st,  # Preserve current raw results
        selected_key_result_ids_st,    # Preserve current selection
        key_results_for_selection_st,  # Preserve current options
        "AI pipeline: waiting for data..."  # agentic_pipeline_status_md
    )

    if not current_token_state_val or not current_token_state_val.get("token"):
        logging.info("Agentic pipeline: Token not available in token_state. Skipping.")
        yield initial_yield
        return

    logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
    # Update status to indicate processing
    yield (
        gr.update(value="Analisi AI (Sempre) in corso..."),
        gr.update(choices=[], value=[], interactive=False), # Keep CBG disabled during run
        gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
        orchestration_raw_results_st, # Preserve
        selected_key_result_ids_st,   # Preserve
        key_results_for_selection_st, # Preserve
        "Esecuzione pipeline AI (Sempre)..."
    )

    if not AGENTIC_MODULES_LOADED:
        logging.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
        yield (
            gr.update(value="Moduli AI non caricati. Report non disponibile."),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="Moduli AI non caricati. OKR non disponibili."),
            None, [], [], "Pipeline AI: Moduli non caricati."
        )
        return

    if not current_token_state_val.get("agentic_pipeline_should_run_now", False):
        logging.info("Fetching existing data from Bubble as pipeline run is not required.")

        report_df = current_token_state_val.get('bubble_agentic_analysis_data')
        
        # Call the new function to get reconstructed data
        retrieved_data = fetch_and_reconstruct_data_from_bubble(report_df)

        if not retrieved_data:
            logging.warning(f"No data found in Bubble for org_urn {org_urn}. Informing user.")
            yield (
                gr.update(value="Nessun dato di analisi precedente trovato in Bubble."),
                gr.update(choices=[], value=[], interactive=False),
                gr.update(value="Eseguire la pipeline per generare un nuovo report."),
                None, [], [], "Pipeline AI: Dati non disponibili"
            )
            return

        # If data is found, format it for the UI
        report_str = retrieved_data.get('report_str')
        actionable_okrs = retrieved_data.get('actionable_okrs')

        agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
        
        krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
        kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
        key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)
        
        all_okrs_md_parts = []
        if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
            for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
        
        if not all_okrs_md_parts:
            okr_detail_display_md_update = gr.update(value="Nessun OKR trovato per il report più recente.")
        else:
            okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))

        # Yield the updates for the Gradio interface
        yield (
            agentic_report_md_update,
            key_results_cbg_update,
            okr_detail_display_md_update,
            retrieved_data,  # Store full retrieved data in state
            [],  # Reset selected KRs state
            krs_for_ui_selection_list,  # Update state with list of KR dicts
            "Pipeline AI: Dati caricati da Bubble"
        )
        return

    try:
        # Parameters for the 'Sempre' (all-time) filter used by the agentic pipeline
        date_filter_val_agentic = "Sempre"
        custom_start_val_agentic = None
        custom_end_val_agentic = None

        orchestration_output = await run_full_analytics_orchestration(
            current_token_state_val,
            date_filter_val_agentic,
            custom_start_val_agentic,
            custom_end_val_agentic
        )
        agentic_status_text = "Pipeline AI (Sempre) completata."
        logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")

        if orchestration_output:
            orchestration_results_update = orchestration_output # Store full results in state
            report_str = orchestration_output.get('comprehensive_analysis_report', "No detailed report provided.")
            agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

            quarter = orchestration_output.get('quarter', "quarter unavailable")
            year = orchestration_output.get('year', "year unavailable")
            org_urn = current_token_state_val.get('org_urn')

            report_id = None
            try:
                report_id = save_report_results(org_urn=org_urn, report_markdown=report_str, quarter=quarter, year=year, report_type='Quarter')
            except Exception as e:
                logger.error(f"Error saving report: {e}", exc_info=True)

            actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')  # The dict containing the 'okrs' list
            try:
                save_actionable_okrs(org_urn, actionable_okrs, report_id)
            except Exception as e:
                logger.error(f"Error saving OKRs: {e}", exc_info=True)
                
            krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs) # Expects the dict
            
            krs_for_selection_update = krs_for_ui_selection_list # Update state with list of KR dicts
            
            # Choices for CheckboxGroup: list of (label, value) tuples
            kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
            key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True) # Reset selection

            # Display all OKRs by default after pipeline run
            all_okrs_md_parts = []
            if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                    all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
            
            if not all_okrs_md_parts:
                okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
            else:
                okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
            
            selected_krs_update = [] # Reset selected KRs state
        else:
            agentic_report_md_update = gr.update(value="Nessun report generato dalla pipeline AI (Sempre).")
            key_results_cbg_update = gr.update(choices=[], value=[], interactive=False)
            okr_detail_display_md_update = gr.update(value="Nessun OKR generato o errore nella pipeline AI (Sempre).")
            orchestration_results_update = None
            selected_krs_update = []
            krs_for_selection_update = []

        yield (
            agentic_report_md_update,
            key_results_cbg_update,
            okr_detail_display_md_update,
            orchestration_results_update, # state
            selected_krs_update,          # state
            krs_for_selection_update,     # state
            agentic_status_text
        )
    except Exception as e:
        logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
        yield (
            gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
            None, [], [], agentic_status_text # Reset states on error
        )
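

if __name__ == "__main__":
    # Hedged smoke-test sketch: drives the autonomous pipeline generator outside
    # of Gradio and prints each status update (the last element of every yielded
    # tuple). The stub token_state is a hypothetical minimal input; with no token
    # set, the generator exits after its first yield, so no LLM calls or Bubble
    # writes are triggered.
    async def _demo_autonomous_run():
        stub_token_state = {"token": None}  # hypothetical stub
        async for update in run_agentic_pipeline_autonomously(stub_token_state, None, [], []):
            print(f"[pipeline status] {update[-1]}")

    asyncio.run(_demo_autonomous_run())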