GuglielmoTor committed on
Commit
8650d1b
·
verified ·
1 Parent(s): afab0ca

Update run_agentic_pipeline.py

Browse files
Files changed (1) hide show
  1. run_agentic_pipeline.py +113 -105
run_agentic_pipeline.py CHANGED
@@ -8,13 +8,25 @@ import pandas as pd
8
  from typing import Dict, Any, Optional
9
  import gradio as gr
10
 
 
 
 
 
 
 
 
 
11
  os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
12
  GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
13
  os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
14
 
15
  # Imports from your project structure
16
  from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
 
 
17
  from data_processing.analytics_data_processing import prepare_filtered_analytics_data
 
 
18
 
19
  try:
20
  from ui.insights_ui_generator import (
@@ -26,39 +38,48 @@ try:
26
  except ImportError as e:
27
  logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
28
  AGENTIC_MODULES_LOADED = False
29
- async def run_full_analytics_orchestration_streaming(*args, **kwargs):
30
- yield None
31
- def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable."
32
- def extract_key_results_for_selection(okrs_dict): return []
33
- def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable."
34
 
35
  logger = logging.getLogger(__name__)
36
 
37
- async def run_full_analytics_orchestration_streaming(
 
38
  token_state: Dict[str, Any],
39
  date_filter_selection: str,
40
  custom_start_date: Optional[datetime],
41
  custom_end_date: Optional[datetime]
42
- ):
43
  """
44
- Runs the full analytics pipeline with streaming results.
45
- Yields results as they become available.
 
 
 
 
 
 
 
 
46
  """
47
  if not GOOGLE_API_KEY:
48
  logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
49
- return
50
 
51
- logger.info("Starting streaming analytics orchestration process...")
52
 
53
  # 1. Prepare and filter data
54
  try:
55
  (
56
  filtered_posts_df,
57
  filtered_mentions_df,
58
- _date_filtered_follower_stats_df,
59
- raw_follower_stats_df,
60
- _start_dt,
61
- _end_dt
62
  ) = prepare_filtered_analytics_data(
63
  token_state, date_filter_selection, custom_start_date, custom_end_date
64
  )
@@ -66,48 +87,53 @@ async def run_full_analytics_orchestration_streaming(
66
 
67
  except Exception as e:
68
  logger.error(f"Error during data preparation: {e}", exc_info=True)
69
- return
70
 
71
- # Check if essential dataframes are empty
72
  if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
73
  logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
 
74
 
75
- # 2. Initialize and run the orchestrator with streaming
76
  try:
77
- llm_model_for_run = "gemini-2.5-flash-preview-05-20"
 
78
 
79
  orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
80
  api_key=GOOGLE_API_KEY,
81
- llm_model_name=llm_model_for_run,
82
  current_date_for_tasks=datetime.utcnow().date()
83
  )
84
 
85
- logger.info("Orchestrator initialized. Starting streaming analysis...")
86
-
87
- # Use the new streaming method
88
- async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
89
- follower_stats_df=raw_follower_stats_df,
 
90
  post_df=filtered_posts_df,
91
  mentions_df=filtered_mentions_df
92
- ):
93
- yield result
 
94
 
95
  except Exception as e:
96
  logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
97
- return
 
98
 
99
- async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st, selected_key_result_ids_st, key_results_for_selection_st):
 
100
  logging.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}")
101
-
102
  # Initial state before pipeline runs or if skipped
103
  initial_yield = (
104
- gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
105
- gr.update(choices=[], value=[], interactive=False),
106
- gr.update(value="Pipeline AI: In attesa dei dati necessari..."),
107
- orchestration_raw_results_st,
108
- selected_key_result_ids_st,
109
- key_results_for_selection_st,
110
- "Pipeline AI: In attesa dei dati..."
111
  )
112
 
113
  if not current_token_state_val or not current_token_state_val.get("token"):
@@ -116,15 +142,14 @@ async def run_agentic_pipeline_autonomously(current_token_state_val, orchestrati
116
  return
117
 
118
  logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
119
-
120
  # Update status to indicate processing
121
  yield (
122
  gr.update(value="Analisi AI (Sempre) in corso..."),
123
- gr.update(choices=[], value=[], interactive=False),
124
  gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
125
- orchestration_raw_results_st,
126
- selected_key_result_ids_st,
127
- key_results_for_selection_st,
128
  "Esecuzione pipeline AI (Sempre)..."
129
  )
130
 
@@ -144,75 +169,58 @@ async def run_agentic_pipeline_autonomously(current_token_state_val, orchestrati
144
  custom_start_val_agentic = None
145
  custom_end_val_agentic = None
146
 
147
- # Use the streaming orchestration
148
- async for orchestration_output in run_full_analytics_orchestration_streaming(
149
  current_token_state_val,
150
  date_filter_val_agentic,
151
  custom_start_val_agentic,
152
  custom_end_val_agentic
153
- ):
154
- if not orchestration_output:
155
- continue
156
-
157
- status = orchestration_output.get("status", "unknown")
 
 
 
 
 
 
158
 
159
- if status == "report_ready":
160
- # Report is ready, but OKRs are not yet
161
- logging.info("Report ready, displaying preliminary results...")
162
-
163
- report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
164
- agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
165
-
166
- yield (
167
- agentic_report_md_update,
168
- gr.update(choices=[], value=[], interactive=False), # Keep OKRs disabled
169
- gr.update(value="OKR ancora in elaborazione..."),
170
- orchestration_output, # Store partial results
171
- selected_key_result_ids_st,
172
- key_results_for_selection_st,
173
- "Report AI completato. Generazione OKR in corso..."
174
- )
175
 
176
- elif status == "complete":
177
- # Everything is ready
178
- logging.info("Complete results ready, displaying final output...")
179
-
180
- orchestration_results_update = orchestration_output
181
- report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
182
- agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
183
-
184
- actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks')
185
- krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs)
186
-
187
- krs_for_selection_update = krs_for_ui_selection_list
188
- kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
189
- key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True)
190
-
191
- # Display all OKRs by default
192
- all_okrs_md_parts = []
193
- if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
194
- for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
195
- all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
196
-
197
- if not all_okrs_md_parts:
198
- okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
199
- else:
200
- okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
201
-
202
- selected_krs_update = []
203
- agentic_status_text = "Pipeline AI (Sempre) completata completamente."
204
-
205
- yield (
206
- agentic_report_md_update,
207
- key_results_cbg_update,
208
- okr_detail_display_md_update,
209
- orchestration_results_update,
210
- selected_krs_update,
211
- krs_for_selection_update,
212
- agentic_status_text
213
- )
214
- break # Final result yielded, exit the loop
215
 
 
 
 
 
 
 
 
 
 
216
  except Exception as e:
217
  logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
218
  agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
@@ -220,5 +228,5 @@ async def run_agentic_pipeline_autonomously(current_token_state_val, orchestrati
220
  gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
221
  gr.update(choices=[], value=[], interactive=False),
222
  gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
223
- None, [], [], agentic_status_text
224
  )
 
8
  from typing import Dict, Any, Optional
9
  import gradio as gr
10
 
11
+ # Assuming this script is at the same level as 'app.py' and 'insight_and_tasks/' is a subfolder
12
+ # If 'insight_and_tasks' is not in python path, you might need to adjust sys.path
13
+ # For example, if insight_and_tasks is a sibling of the dir containing this file:
14
+ # import sys
15
+ # script_dir = os.path.dirname(os.path.abspath(__file__))
16
+ # project_root = os.path.dirname(script_dir) # Or navigate to the correct root
17
+ # sys.path.insert(0, project_root)
18
+
19
# Force the google-genai SDK to use the public Gemini API rather than Vertex AI.
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
if GOOGLE_API_KEY:
    # Mirror the key under the name the SDK actually reads.
    os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
else:
    # Previously this assignment raised TypeError at import time when
    # GEMINI_API_KEY was unset (os.environ values must be str). Log instead;
    # run_full_analytics_orchestration guards on GOOGLE_API_KEY anyway.
    logging.warning("GEMINI_API_KEY is not set; the agentic pipeline will be unable to run.")
22
 
23
  # Imports from your project structure
24
  from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
25
+ # setup_logging might be called in app.py, if not, call it here or ensure it's called once.
26
+ # from insight_and_tasks.utils.logging_config import setup_logging
27
  from data_processing.analytics_data_processing import prepare_filtered_analytics_data
28
+ # Placeholder for UI generator import - to be created later
29
+ # from .insights_ui_generator import format_orchestration_results_for_ui
30
 
31
  try:
32
  from ui.insights_ui_generator import (
 
38
  except ImportError as e:
39
  logging.error(f"Could not import agentic pipeline modules: {e}. Tabs 3 and 4 will be disabled.")
40
  AGENTIC_MODULES_LOADED = False
41
+ async def run_full_analytics_orchestration(*args, **kwargs): return None # Placeholder
42
+ def format_report_to_markdown(report_string): return "Agentic modules not loaded. Report unavailable." # Placeholder
43
+ def extract_key_results_for_selection(okrs_dict): return [] # Placeholder
44
+ def format_single_okr_for_display(okr_data, **kwargs): return "Agentic modules not loaded. OKR display unavailable." # Placeholder
45
+
46
 
47
  logger = logging.getLogger(__name__)
48
 
49
+
50
async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Run the full analytics pipeline using data from token_state and date filters.

    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String describing the date-filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        The raw results dictionary produced by the analytics orchestrator, or
        None on any critical failure (missing API key, data preparation error,
        or orchestration error).
    """
    if not GOOGLE_API_KEY:
        logger.critical("GOOGLE_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data.
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # pre-filtered time series; unused here
            raw_follower_stats_df,  # FollowerAgent processes the raw history itself
            _start_dt,  # filtered start date, kept for context if needed
            _end_dt  # filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None

    # Empty inputs are not fatal, but the analysis will be of limited value.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")

    # 2. Initialize and run the orchestrator.
    try:
        # Specific model for this run; pass None to use the orchestrator's default.
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"

        # datetime.utcnow() is deprecated (Python 3.12+); use an aware UTC "now".
        from datetime import timezone
        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            current_date_for_tasks=datetime.now(timezone.utc).date()
        )

        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
        # Pass the full follower history: the orchestrator derives its own
        # time-series ('follower_gains_monthly') and demographics from it,
        # while posts and mentions are already date-filtered.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results

    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None
123
+
124
 
125
+
126
+ async def run_agentic_pipeline_autonomously(current_token_state_val, orchestration_raw_results_st,selected_key_result_ids_st, key_results_for_selection_st):
127
  logging.info(f"Agentic pipeline check triggered for token_state update. Current token: {'Set' if current_token_state_val.get('token') else 'Not Set'}")
 
128
  # Initial state before pipeline runs or if skipped
129
  initial_yield = (
130
+ gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # agentic_report_display_md
131
+ gr.update(choices=[], value=[], interactive=False), # key_results_cbg
132
+ gr.update(value="Pipeline AI: In attesa dei dati necessari..."), # okr_detail_display_md
133
+ orchestration_raw_results_st, # Preserve current raw results
134
+ selected_key_result_ids_st, # Preserve current selection
135
+ key_results_for_selection_st, # Preserve current options
136
+ "Pipeline AI: In attesa dei dati..." # agentic_pipeline_status_md
137
  )
138
 
139
  if not current_token_state_val or not current_token_state_val.get("token"):
 
142
  return
143
 
144
  logging.info("Agentic pipeline starting autonomously with 'Sempre' filter.")
 
145
  # Update status to indicate processing
146
  yield (
147
  gr.update(value="Analisi AI (Sempre) in corso..."),
148
+ gr.update(choices=[], value=[], interactive=False), # Keep CBG disabled during run
149
  gr.update(value="Dettagli OKR (Sempre) in corso di generazione..."),
150
+ orchestration_raw_results_st, # Preserve
151
+ selected_key_result_ids_st, # Preserve
152
+ key_results_for_selection_st, # Preserve
153
  "Esecuzione pipeline AI (Sempre)..."
154
  )
155
 
 
169
  custom_start_val_agentic = None
170
  custom_end_val_agentic = None
171
 
172
+ orchestration_output = await run_full_analytics_orchestration(
 
173
  current_token_state_val,
174
  date_filter_val_agentic,
175
  custom_start_val_agentic,
176
  custom_end_val_agentic
177
+ )
178
+ agentic_status_text = "Pipeline AI (Sempre) completata."
179
+ logging.info(f"Autonomous agentic pipeline finished. Output keys: {orchestration_output.keys() if orchestration_output else 'None'}")
180
+
181
+ if orchestration_output:
182
+ orchestration_results_update = orchestration_output # Store full results in state
183
+ report_str = orchestration_output.get('comprehensive_analysis_report', "Nessun report dettagliato fornito.")
184
+ agentic_report_md_update = gr.update(value=format_report_to_markdown(report_str))
185
+
186
+ actionable_okrs = orchestration_output.get('actionable_okrs_and_tasks') # This is the dict containing 'okrs' list
187
+ krs_for_ui_selection_list = extract_key_results_for_selection(actionable_okrs) # Expects the dict
188
 
189
+ krs_for_selection_update = krs_for_ui_selection_list # Update state with list of KR dicts
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
190
 
191
+ # Choices for CheckboxGroup: list of (label, value) tuples
192
+ kr_choices_for_cbg = [(kr['kr_description'], kr['unique_kr_id']) for kr in krs_for_ui_selection_list]
193
+ key_results_cbg_update = gr.update(choices=kr_choices_for_cbg, value=[], interactive=True) # Reset selection
194
+
195
+ # Display all OKRs by default after pipeline run
196
+ all_okrs_md_parts = []
197
+ if actionable_okrs and isinstance(actionable_okrs.get("okrs"), list):
198
+ for okr_idx, okr_item in enumerate(actionable_okrs["okrs"]):
199
+ all_okrs_md_parts.append(format_single_okr_for_display(okr_item, accepted_kr_indices=None, okr_main_index=okr_idx))
200
+
201
+ if not all_okrs_md_parts:
202
+ okr_detail_display_md_update = gr.update(value="Nessun OKR generato o trovato (Sempre).")
203
+ else:
204
+ okr_detail_display_md_update = gr.update(value="\n\n---\n\n".join(all_okrs_md_parts))
205
+
206
+ selected_krs_update = [] # Reset selected KRs state
207
+ else:
208
+ agentic_report_md_update = gr.update(value="Nessun report generato dalla pipeline AI (Sempre).")
209
+ key_results_cbg_update = gr.update(choices=[], value=[], interactive=False)
210
+ okr_detail_display_md_update = gr.update(value="Nessun OKR generato o errore nella pipeline AI (Sempre).")
211
+ orchestration_results_update = None
212
+ selected_krs_update = []
213
+ krs_for_selection_update = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
214
 
215
+ yield (
216
+ agentic_report_md_update,
217
+ key_results_cbg_update,
218
+ okr_detail_display_md_update,
219
+ orchestration_results_update, # state
220
+ selected_krs_update, # state
221
+ krs_for_selection_update, # state
222
+ agentic_status_text
223
+ )
224
  except Exception as e:
225
  logging.error(f"Error during autonomous agentic pipeline execution: {e}", exc_info=True)
226
  agentic_status_text = f"Errore pipeline AI (Sempre): {str(e)}"
 
228
  gr.update(value=f"Errore generazione report AI (Sempre): {str(e)}"),
229
  gr.update(choices=[], value=[], interactive=False),
230
  gr.update(value=f"Errore generazione OKR AI (Sempre): {str(e)}"),
231
+ None, [], [], agentic_status_text # Reset states on error
232
  )