File size: 9,917 Bytes
8ed7829
 
 
 
 
 
 
 
c0a4e63
8ed7829
6d6572c
 
 
8ed7829
 
2a7af3e
e82dbcd
8ed7829
055c98e
 
 
 
 
 
 
 
 
 
e4cf8e8
 
 
 
 
055c98e
8ed7829
 
e4cf8e8
8ed7829
 
 
 
e4cf8e8
8ed7829
e4cf8e8
 
8ed7829
 
 
e4cf8e8
8ed7829
e4cf8e8
8ed7829
 
 
 
 
 
e4cf8e8
 
 
 
8ed7829
 
 
 
 
 
 
e4cf8e8
8ed7829
e4cf8e8
8ed7829
 
 
e4cf8e8
8ed7829
e4cf8e8
8ed7829
 
 
e4cf8e8
8ed7829
 
 
e4cf8e8
 
 
 
 
8ed7829
 
e4cf8e8
 
8ed7829
 
 
e4cf8e8
6086ed3
e4cf8e8
6086ed3
e4cf8e8
6086ed3
 
e4cf8e8
 
 
 
 
 
 
6086ed3
 
 
 
 
 
 
 
e4cf8e8
6086ed3
 
 
e4cf8e8
6086ed3
e4cf8e8
 
 
6086ed3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e4cf8e8
 
6086ed3
 
 
 
e4cf8e8
 
 
 
 
6086ed3
e4cf8e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6086ed3
e4cf8e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6086ed3
 
 
 
 
 
 
 
e4cf8e8
6086ed3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# run_agentic_pipeline.py
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import gradio as gr
import pandas as pd

# Configure the Google GenAI SDK to use the Gemini API (not Vertex AI).
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
# Mirror GEMINI_API_KEY into GOOGLE_API_KEY, which the SDK reads.
# Guard against an unset key: os.environ values must be str, so assigning
# None would raise TypeError and crash the whole module at import time.
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
if GOOGLE_API_KEY:
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
else:
    logging.warning("GEMINI_API_KEY is not set; the analytics pipeline will not be able to run.")

# Imports from your project structure
from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
from data_processing.analytics_data_processing import prepare_filtered_analytics_data 

# Optional UI helper imports: if the ui package is unavailable, install stub
# implementations so the rest of the app can still load; the flag below lets
# callers disable the dependent UI tabs.
try:
    from ui.insights_ui_generator import (
        format_report_to_markdown,
        extract_key_results_for_selection,
        format_single_okr_for_display
    )
    AGENTIC_MODULES_LOADED = True  # Tabs 3 and 4 depend on these helpers.
except ImportError as e:
    logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
    AGENTIC_MODULES_LOADED = False
    # Stub async generator matching the real streaming entry point's shape.
    # NOTE(review): this name is unconditionally redefined by the real
    # implementation further down this module, so the stub appears to be
    # dead code — confirm before removing.
    async def run_full_analytics_orchestration_streaming(*args, **kwargs): 
        yield None
    # Placeholder formatters so the UI renders a friendly message instead of crashing.
    def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable."
    def extract_key_results_for_selection(okrs_dict): return []
    def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable."

logger = logging.getLogger(__name__)

async def run_full_analytics_orchestration_streaming(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
):
    """
    Run the full analytics pipeline, yielding results as they become available.

    Args:
        token_state: Application state dict consumed by
            prepare_filtered_analytics_data (token plus fetched dataframes).
        date_filter_selection: Named date-range filter (e.g. "Sempre").
        custom_start_date: Start of a custom date range, or None.
        custom_end_date: End of a custom date range, or None.

    Yields:
        Result dicts emitted by the orchestrator's streaming analysis
        (e.g. a "report_ready" payload followed by a "complete" payload).
        Yields nothing when the API key is missing or data preparation fails.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return

    logger.info("Starting streaming analytics orchestration process...")

    # 1. Prepare and filter data for the selected date range.
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,
            raw_follower_stats_df,
            _start_dt,
            _end_dt
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")

    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return

    # Empty inputs are not fatal — the orchestrator can still run, but with
    # limited results — so only warn.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize the orchestrator and stream its analysis output.
    try:
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            # datetime.utcnow() is deprecated (Python 3.12+) and returns a
            # naive datetime; derive today's UTC date from an aware timestamp.
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )

        logger.info("Orchestrator initialized. Starting streaming analysis...")

        # Forward each streamed result to the caller as soon as it arrives.
        async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        ):
            yield result

    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return

async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
    """
    Autonomously drive the agentic analytics pipeline (always using the
    'Sempre' / all-time filter) and stream Gradio UI updates.

    Args:
        current_token_state_val: Application token/state dict; may be None or
            missing the "token" key, in which case the run is skipped.
        orchestration_raw_results_st: Currently stored raw orchestration results.
        selected_key_result_ids_st: Currently selected key-result ids.
        key_results_for_selection_st: Current list of selectable key results.

    Yields:
        7-tuples of (report markdown update, KR checkbox-group update,
        OKR detail markdown update, raw-results state, selected-KR-ids state,
        KR-selection-list state, status text).
    """
    # Guard BEFORE dereferencing: current_token_state_val may be None, and
    # calling .get() on None raises AttributeError.
    token_present = bool(current_token_state_val and current_token_state_val.get("token"))
    logger.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if token_present else 'Not Set'}")

    # Initial UI state before the pipeline runs (or when it is skipped).
    initial_yield = (
        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Pipeline AI: In attesa dei dati..."
    )

    if not token_present:
        logger.info("Agentic pipeline: Token not available in token_state. Skipping.")
        yield initial_yield
        return

    logger.info("Agentic pipeline starting autonomously with 'Sempre' filter.")

    # Tell the UI that processing has started.
    yield (
        gr.update(value="Analisi AI (Sempre) in corso..."),
        gr.update(choices=[], value=[], interactive=False),
        gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
        orchestration_raw_results_st,
        selected_key_result_ids_st,
        key_results_for_selection_st,
        "Esecuzione pipeline AI (Sempre)..."
    )

    if not AGENTIC_MODULES_LOADED:
        logger.warning("Agentic modules not loaded. Skipping autonomous pipeline.")
        yield (
            gr.update(value="Moduli AI non caricati. Report non disponibile."),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value="Moduli AI non caricati. OKR non disponibili."),
            None, [], [], "Pipeline AI: Moduli non caricati."
        )
        return

    try:
        # The autonomous run always uses the 'Sempre' (all-time) filter.
        date_filter_val_agentic = "Sempre"
        custom_start_val_agentic = None
        custom_end_val_agentic = None

        # Consume the streaming orchestration, forwarding UI updates per stage.
        async for orchestration_output in run_full_analytics_orchestration_streaming(
            current_token_state_val,
            date_filter_val_agentic,
            custom_start_val_agentic,
            custom_end_val_agentic
        ):
            if not orchestration_output:
                continue

            status = orchestration_output.get("status", "unknown")

            if status == "report_ready":
                # The report arrives before the OKRs; show it immediately.
                logger.info("Report ready, displaying preliminary results...")

                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

                yield (
                    agentic_report_md_update,
                    gr.update(choices=[], value=[], interactive=False),  # Keep OKRs disabled
                    gr.update(value="OKR ancora in elaborazione..."),
                    orchestration_output,  # Store partial results
                    selected_key_result_ids_st,
                    key_results_for_selection_st,
                    "Report AI completato. Generazione OKR in corso..."
                )

            elif status == "complete":
                # Report and OKRs are both ready: render the final output.
                logger.info("Complete results ready, displaying final output...")

                orchestration_results_update = orchestration_output
                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))

                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
                krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)

                krs_for_selection_update = krs_for_ui_selection_list
                # CheckboxGroup choices are (label, value) pairs.
                kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
                key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)

                # Display all OKRs by default (none pre-selected).
                all_okrs_md_parts = []
                if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
                    for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
                        all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))

                if not all_okrs_md_parts:
                    okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
                else:
                    okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))

                selected_krs_update = []
                agentic_status_text = "Pipeline AI (Sempre) completata completamente."

                yield (
                    agentic_report_md_update,
                    key_results_cbg_update,
                    okr_detail_display_md_update,
                    orchestration_results_update,
                    selected_krs_update,
                    krs_for_selection_update,
                    agentic_status_text
                )
                break  # Final result yielded, exit the loop

    except Exception as e:
        logger.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
        agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
        yield (
            gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
            gr.update(choices=[], value=[], interactive=False),
            gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
            None, [], [], agentic_status_text
        )