GuglielmoTor committed
Commit e4cf8e8 · verified · 1 Parent(s): dc94b38

Update run_agentic_pipeline.py

Files changed (1):
  1. run_agentic_pipeline.py +105 -115
run_agentic_pipeline.py CHANGED
@@ -8,25 +8,13 @@ import pandas as pd
 from typing import Dict, Any, Optional
 import gradio as gr
 
-# Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder
-# If 'insight_and_tasks' is not in python path, you might need to adjust sys.path
-# For example, if insight_and_tasks is a sibling of the dir containing this file:
-# import sys
-# script_dir = os.path.dirname(os.path.abspath(__file__))
-# project_root = os.path.dirname(script_dir) # Or navigate to the correct root
-# sys.path.insert(0, project_root)
-
 os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
 GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
 os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
 
 # Imports from your project structure
 from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
-# setup_logging might be called in app.py, if not, call it here or ensure it's called once.
-# from insight_and_tasks.utils.logging_config import setup_logging
 from data_processing.analytics_data_processing import prepare_filtered_analytics_data
-# Placeholder for UI generator import - to be created later
-# from .insights_ui_generator import format_orchestration_results_for_ui
 
 try:
     from ui.insights_ui_generator import (
@@ -38,50 +26,39 @@ try:
 except ImportError as e:
     logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
     AGENTIC_MODULES_LOADED = False
-    async def run_full_analytics_orchestration(*args, **kwargs): return None # Placeholder
-    def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable." # Placeholder
-    def extract_key_results_for_selection(okrs_dict): return [] # Placeholder
-    def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable." # Placeholder
-
+    async def run_full_analytics_orchestration_streaming(*args, **kwargs):
+        yield None
+    def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable."
+    def extract_key_results_for_selection(okrs_dict): return []
+    def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable."
 
 logger = logging.getLogger(__name__)
 
-
-async def run_full_analytics_orchestration(
+async def run_full_analytics_orchestration_streaming(
     token_state: Dict[str, Any],
     date_filter_selection: str,
     custom_start_date: Optional[datetime],
     custom_end_date: Optional[datetime]
-) -> Optional[Dict[str, Any]]:
+):
     """
-    Runs the full analytics pipeline using data from token_state and date filters,
-    and returns the raw orchestration results.
-
-    Args:
-        token_state: Gradio token_state containing raw data and config.
-        date_filter_selection: String for date filter type.
-        custom_start_date: Optional custom start date.
-        custom_end_date: Optional custom end date.
-
-    Returns:
-        A dictionary containing the results from the analytics orchestrator,
-        or None if a critical error occurs.
+    Runs the full analytics pipeline with streaming results.
+    Yields results as they become available.
     """
     if not GOOGLE_API_KEY:
         logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
-        return None
+        return
 
-    logger.info("Starting full analytics orchestration process...")
+    logger.info("Starting streaming analytics orchestration process...")
 
     # 1. Prepare and filter data
     try:
         (
             filtered_posts_df,
             filtered_mentions_df,
-            _date_filtered_follower_stats_df, # This might be used if FollowerAgent specifically needs pre-filtered time series
-            raw_follower_stats_df, # FollowerAgent typically processes raw historical for some metrics
-            _start_dt, # Filtered start date, for logging or context if needed
-            _end_dt # Filtered end date
+            _date_filtered_follower_stats_df,
+            raw_follower_stats_df,
+            _start_dt,
+            _end_dt
         ) = prepare_filtered_analytics_data(
             token_state, date_filter_selection, custom_start_date, custom_end_date
         )
@@ -89,53 +66,48 @@ async def run_full_analytics_orchestration(
 
     except Exception as e:
         logger.error(f"Error during data preparation: {e}", exc_info=True)
-        return None
+        return
 
-    # Check if essential dataframes are empty after filtering, which might make analysis trivial or erroneous
+    # Check if essential dataframes are empty
     if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
         logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
-        # Depending on requirements, you might return a specific message or empty results structure.
 
-    # 2. Initialize and run the orchestrator
+    # 2. Initialize and run the orchestrator with streaming
    try:
-        # You can pass a specific model name or let the orchestrator use its default
-        llm_model_for_run = "gemini-2.5-flash-preview-05-20" #token_state.get("config_llm_model_override") # Example: if you store this in token_state
+        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
 
         orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
             api_key=GOOGLE_API_KEY,
-            llm_model_name=llm_model_for_run, # Pass None to use orchestrator's default
+            llm_model_name=llm_model_for_run,
             current_date_for_tasks=datetime.utcnow().date()
         )
 
-        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
-        # The orchestrator expects the primary follower stats DF to be the one it can process for
-        # time-series ('follower_gains_monthly') and demographics.
-        # The `raw_follower_stats_df` is usually better for this, as FollowerAgent does its own processing.
-        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
-            follower_stats_df=raw_follower_stats_df, # Pass the full history for followers
+        logger.info("Orchestrator initialized. Starting streaming analysis...")
+
+        # Use the new streaming method
+        async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
+            follower_stats_df=raw_follower_stats_df,
             post_df=filtered_posts_df,
             mentions_df=filtered_mentions_df
-        )
-        logger.info("Orchestration process completed.")
-        return orchestration_results
+        ):
+            yield result
 
     except Exception as e:
         logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
-        return None
-
-
+        return
 
-async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st,selected_key_result_ids_st, key_results_for_selection_st):
+async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
     logging.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}")
+
     # Initial state before pipeline runs or if skipped
     initial_yield = (
-        gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # agentic_report_display_md
-        gr.update(choices=[], value=[], interactive=False), # key_results_cbg
-        gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # okr_detail_display_md
-        orchestration_raw_results_st, # Preserve current raw results
-        selected_key_result_ids_st, # Preserve current selection
-        key_results_for_selection_st, # Preserve current options
-        "Pipeline AI: In attesa dei dati..." # agentic_pipeline_status_md
+        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
+        gr.update(choices=[], value=[], interactive=False),
+        gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
+        orchestration_raw_results_st,
+        selected_key_result_ids_st,
+        key_results_for_selection_st,
+        "Pipeline AI: In attesa dei dati..."
    )
 
     if not current_token_state_val or not current_token_state_val.get("token"):
@@ -144,14 +116,15 @@ async def run_agentic_pipeline_autonomously(current_token_state_val, orchestrati
         return
 
     logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
+
     # Update status to indicate processing
     yield (
         gr.update(value="Analisi AI (Sempre) in corso..."),
-        gr.update(choices=[], value=[], interactive=False), # Keep CBG disabled during run
+        gr.update(choices=[], value=[], interactive=False),
         gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
-        orchestration_raw_results_st, # Preserve
-        selected_key_result_ids_st, # Preserve
-        key_results_for_selection_st, # Preserve
+        orchestration_raw_results_st,
+        selected_key_result_ids_st,
+        key_results_for_selection_st,
         "Esecuzione pipeline AI (Sempre)..."
     )
 
@@ -171,58 +144,75 @@ async def run_agentic_pipeline_autonomously(current_token_state_val, orchestrati
         custom_start_val_agentic = None
         custom_end_val_agentic = None
 
-        orchestration_output = await run_full_analytics_orchestration(
+        # Use the streaming orchestration
+        async for orchestration_output in run_full_analytics_orchestration_streaming(
             current_token_state_val,
             date_filter_val_agentic,
             custom_start_val_agentic,
             custom_end_val_agentic
-        )
-        agentic_status_text = "Pipeline AI (Sempre) completata."
-        logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")
-
-        if orchestration_output:
-            orchestration_results_update = orchestration_output # Store full results in state
-            report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
-            agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
-
-            actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks') # This is the dict containing 'okrs' list
-            krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs) # Expects the dict
-
-            krs_for_selection_update = krs_for_ui_selection_list # Update state with list of KR dicts
-
-            # Choices for CheckboxGroup: list of (label, value) tuples
-            kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
-            key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True) # Reset selection
-
-            # Display all OKRs by default after pipeline run
-            all_okrs_md_parts = []
-            if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
-                for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
-                    all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
-
-            if not all_okrs_md_parts:
-                okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
-            else:
-                okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
-
-            selected_krs_update = [] # Reset selected KRs state
-        else:
-            agentic_report_md_update = gr.update(value="Nessun report generato dalla pipeline AI (Sempre).")
-            key_results_cbg_update = gr.update(choices=[], value=[], interactive=False)
-            okr_detail_display_md_update = gr.update(value="Nessun OKR generato o errore nella pipeline AI (Sempre).")
-            orchestration_results_update = None
-            selected_krs_update = []
-            krs_for_selection_update = []
+        ):
+            if not orchestration_output:
+                continue
+
+            status = orchestration_output.get("status", "unknown")
+
+            if status == "report_ready":
+                # Report is ready, but OKRs are not yet
+                logging.info("Report ready, displaying preliminary results...")
+
+                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
+                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
+
+                yield (
+                    agentic_report_md_update,
+                    gr.update(choices=[], value=[], interactive=False), # Keep OKRs disabled
+                    gr.update(value="OKR ancora in elaborazione..."),
+                    orchestration_output, # Store partial results
+                    selected_key_result_ids_st,
+                    key_results_for_selection_st,
+                    "Report AI completato. Generazione OKR in corso..."
+                )
+
+            elif status == "complete":
+                # Everything is ready
+                logging.info("Complete results ready, displaying final output...")
+
+                orchestration_results_update = orchestration_output
+                report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
+                agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
+
+                actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
+                krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
+
+                krs_for_selection_update = krs_for_ui_selection_list
+                kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
+                key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)
+
+                # Display all OKRs by default
+                all_okrs_md_parts = []
+                if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
+                    for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
+                        all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
+
+                if not all_okrs_md_parts:
+                    okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
+                else:
+                    okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
+
+                selected_krs_update = []
+                agentic_status_text = "Pipeline AI (Sempre) completata completamente."
+
+                yield (
+                    agentic_report_md_update,
+                    key_results_cbg_update,
+                    okr_detail_display_md_update,
+                    orchestration_results_update,
+                    selected_krs_update,
+                    krs_for_selection_update,
+                    agentic_status_text
+                )
+                break # Final result yielded, exit the loop
 
-        yield (
-            agentic_report_md_update,
-            key_results_cbg_update,
-            okr_detail_display_md_update,
-            orchestration_results_update, # state
-            selected_krs_update, # state
-            krs_for_selection_update, # state
-            agentic_status_text
-        )
     except Exception as e:
         logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
         agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
@@ -230,5 +220,5 @@ async def run_agentic_pipeline_autonomously(current_token_state_val, orchestrati
             gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
             gr.update(choices=[], value=[], interactive=False),
             gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
-            None, [], [], agentic_status_text # Reset states on error
+            None, [], [], agentic_status_text
         )
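
Note: the diff consumes `orchestrator.generate_full_analysis_and_tasks_streaming`, which lives in the orchestrator module and is not part of this commit. Judging only from the statuses the consumer handles, a minimal sketch of the contract that method appears to implement; the class internals and the `_build_report` / `_build_okrs` helpers are assumptions for illustration, and only the yielded dict keys (`status`, `comprehensive_analysis_report`, `actionable_okrs_and_tasks`) are taken from this diff:

    # Hypothetical producer-side sketch; helper coroutines are assumed.
    from typing import Any, AsyncGenerator, Dict

    import pandas as pd

    class EnhancedLinkedInAnalyticsOrchestrator:
        async def generate_full_analysis_and_tasks_streaming(
            self,
            follower_stats_df: pd.DataFrame,
            post_df: pd.DataFrame,
            mentions_df: pd.DataFrame,
        ) -> AsyncGenerator[Dict[str, Any], None]:
            # Stage 1: the comprehensive report finishes first.
            report = await self._build_report(follower_stats_df, post_df, mentions_df)  # assumed helper
            yield {
                "status": "report_ready",
                "comprehensive_analysis_report": report,
            }
            # Stage 2: OKRs and tasks arrive later; the final payload repeats the report.
            okrs = await self._build_okrs(report)  # assumed helper
            yield {
                "status": "complete",
                "comprehensive_analysis_report": report,
                "actionable_okrs_and_tasks": okrs,  # dict expected to contain an "okrs" list
            }

This two-stage contract is what lets the UI show the report as soon as it exists instead of blocking until the OKRs are generated.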
 
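
On the consumer side, `run_agentic_pipeline_autonomously` is an async generator, so it can be wired directly as a Gradio event handler: each `yield` pushes a progressive update to seven outputs. A minimal wiring sketch, assuming component names taken from the comments this commit removes (`agentic_report_display_md`, `key_results_cbg`, `okr_detail_display_md`, `agentic_pipeline_status_md`); the real app.py is not part of this commit, and a manual button stands in for whatever trigger the app actually uses:

    # Minimal wiring sketch; component names and the button trigger are assumptions.
    import gradio as gr

    from run_agentic_pipeline import run_agentic_pipeline_autonomously

    with gr.Blocks() as demo:
        token_state = gr.State({})
        orchestration_raw_results_st = gr.State(None)
        selected_key_result_ids_st = gr.State([])
        key_results_for_selection_st = gr.State([])

        agentic_report_display_md = gr.Markdown("Pipeline AI: In attesa dei dati necessari...")
        key_results_cbg = gr.CheckboxGroup(label="Key Results", choices=[], interactive=False)
        okr_detail_display_md = gr.Markdown()
        agentic_pipeline_status_md = gr.Markdown()
        run_btn = gr.Button("Run agentic pipeline")

        # Each yielded 7-tuple maps positionally onto these outputs:
        # report Markdown, KR CheckboxGroup, OKR Markdown, three gr.State slots, status text.
        run_btn.click(
            fn=run_agentic_pipeline_autonomously,
            inputs=[token_state, orchestration_raw_results_st,
                    selected_key_result_ids_st, key_results_for_selection_st],
            outputs=[agentic_report_display_md, key_results_cbg, okr_detail_display_md,
                     orchestration_raw_results_st, selected_key_result_ids_st,
                     key_results_for_selection_st, agentic_pipeline_status_md],
        )

    demo.launch()

With this wiring, the "report_ready" yield updates the report pane while the checkbox group stays disabled, and the "complete" yield enables key-result selection.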