Create run_agentic_pipeline.py
run_agentic_pipeline.py  ADDED  (+173 -0)
@@ -0,0 +1,173 @@
# run_agentic_pipeline.py
import asyncio
import os
import json
import logging
from datetime import datetime
import pandas as pd
from typing import Dict, Any, Optional

# Assuming this script lives at the same level as 'app.py', with 'insight_and_tasks/'
# as a subfolder. If 'insight_and_tasks' is not on the Python path, adjust sys.path,
# e.g. if it is a sibling of the directory containing this file:
# import sys
# script_dir = os.path.dirname(os.path.abspath(__file__))
# project_root = os.path.dirname(script_dir)  # Or navigate to the correct root
# sys.path.insert(0, project_root)


# Imports from the project structure
from insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import EnhancedLinkedInAnalyticsOrchestrator
# setup_logging is normally called once in app.py; if it is not, call it here instead.
# from insight_and_tasks.utils.logging_config import setup_logging
try:
    from .analytics_data_processing import prepare_filtered_analytics_data
except ImportError:
    # Fallback so the standalone __main__ test below can run this file directly,
    # where no parent package exists for the relative import.
    from analytics_data_processing import prepare_filtered_analytics_data
# Placeholder for the UI generator import - to be created later
# from .insights_ui_generator import format_orchestration_results_for_ui

logger = logging.getLogger(__name__)

# The API key is read from the GEMINI_API_KEY environment variable, which must be
# set in the environment where app.py runs.
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")

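# Optional convenience for local development (an assumption, not part of this app's
# setup): python-dotenv can populate GEMINI_API_KEY from a .env file; call
# load_dotenv() before the os.environ.get lookup above, e.g.:
#   from dotenv import load_dotenv
#   load_dotenv()
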
async def run_full_analytics_orchestration(
    token_state: Dict[str, Any],
    date_filter_selection: str,
    custom_start_date: Optional[datetime],
    custom_end_date: Optional[datetime]
) -> Optional[Dict[str, Any]]:
    """
    Runs the full analytics pipeline using data from token_state and the date
    filters, and returns the raw orchestration results.

    Args:
        token_state: Gradio token_state containing raw data and config.
        date_filter_selection: String identifying the date filter type.
        custom_start_date: Optional custom start date.
        custom_end_date: Optional custom end date.

    Returns:
        A dictionary containing the results from the analytics orchestrator,
        or None if a critical error occurs.
    """
    if not GOOGLE_API_KEY:
        logger.critical("GEMINI_API_KEY is not set. Analytics pipeline cannot run.")
        return None

    logger.info("Starting full analytics orchestration process...")

    # 1. Prepare and filter data
    try:
        (
            filtered_posts_df,
            filtered_mentions_df,
            _date_filtered_follower_stats_df,  # Useful if FollowerAgent ever needs pre-filtered time series
            raw_follower_stats_df,  # FollowerAgent typically processes the raw history for some metrics
            _start_dt,  # Filtered start date, for logging or context if needed
            _end_dt  # Filtered end date
        ) = prepare_filtered_analytics_data(
            token_state, date_filter_selection, custom_start_date, custom_end_date
        )
        logger.info(f"Data prepared: Posts({len(filtered_posts_df)}), Mentions({len(filtered_mentions_df)}), FollowerStatsRaw({len(raw_follower_stats_df)})")

    except Exception as e:
        logger.error(f"Error during data preparation: {e}", exc_info=True)
        return None

    # If every essential DataFrame is empty after filtering, analysis would be trivial or erroneous.
    if filtered_posts_df.empty and filtered_mentions_df.empty and raw_follower_stats_df.empty:
        logger.warning("All essential DataFrames are empty after filtering. Orchestration might yield limited results.")
        # Depending on requirements, you might return a specific message or an empty results structure here.

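    # One possible early exit (a sketch, not current behavior): return a stub such as
    #   {"comprehensive_analysis_report": "No data for the selected period.",
    #    "actionable_okrs_and_tasks": None}
    # instead of invoking the orchestrator on empty inputs.
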
    # 2. Initialize and run the orchestrator
    try:
        # Pass a specific model name, or pass None to use the orchestrator's default.
        llm_model_for_run = "gemini-2.5-flash-preview-05-20"
        # Alternatively, read an override from state, e.g.:
        # llm_model_for_run = token_state.get("config_llm_model_override")

        orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
            api_key=GOOGLE_API_KEY,
            llm_model_name=llm_model_for_run,
            current_date_for_tasks=datetime.utcnow().date()
        )

        logger.info("Orchestrator initialized. Generating full analysis and tasks...")
        # The orchestrator expects a follower stats DF it can process for both
        # time series ('follower_gains_monthly') and demographics. The unfiltered
        # `raw_follower_stats_df` is usually the right input here, since
        # FollowerAgent does its own processing.
        orchestration_results = await orchestrator.generate_full_analysis_and_tasks(
            follower_stats_df=raw_follower_stats_df,  # Pass the full follower history
            post_df=filtered_posts_df,
            mentions_df=filtered_mentions_df
        )
        logger.info("Orchestration process completed.")
        return orchestration_results

    except Exception as e:
        logger.critical(f"Critical error during analytics orchestration: {e}", exc_info=True)
        return None

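# As consumed by the standalone test below, the orchestration result is expected to
# contain at least "comprehensive_analysis_report" (the analysis text),
# "actionable_okrs_and_tasks" (a JSON-serializable OKR/task structure), and
# optionally "detailed_metrics" for debugging; the authoritative schema is whatever
# EnhancedLinkedInAnalyticsOrchestrator.generate_full_analysis_and_tasks returns.
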
# Example of how app.py might call this (and a standalone test for this module)
if __name__ == "__main__":
    # This block is for testing `run_agentic_pipeline.py` directly.
    # In a real scenario, `app.py` would import and call `run_full_analytics_orchestration`.

    # Ensure logging is set up for the standalone test
    from insight_and_tasks.utils.logging_config import setup_logging
    setup_logging()

    if not GOOGLE_API_KEY:
        print("Please set the GEMINI_API_KEY environment variable to run this test.")
    else:
        # Create mock token_state and filter parameters
        mock_token_state = {
            "bubble_posts_df": pd.DataFrame({
                'id': [1, 2, 3], 'published_at': pd.to_datetime(['2023-01-01', '2023-07-15', '2023-12-31']),
                'li_eb_label': ['A', 'B', 'A'], 'media_type': ['X', 'Y', 'X'], 'is_ad': [False, True, False],
                'sentiment': ['pos', 'neg', 'neu'], 'engagement': [10, 5, 8], 'impressionCount': [100, 50, 80],
                'clickCount': [1, 2, 3], 'likeCount': [8, 3, 6], 'commentCount': [1, 1, 1], 'shareCount': [1, 1, 1]
            }),
            "bubble_post_stats_df": pd.DataFrame({'post_id': [1, 2, 3]}),  # Simplified
            "bubble_mentions_df": pd.DataFrame({
                'date': pd.to_datetime(['2023-06-01', '2023-08-20']), 'sentiment_label': ['Positive 👍', 'Negative 👎']
            }),
            "bubble_follower_stats_df": pd.DataFrame({
                'category_name': ['2023-01-01', 'Industry A', '2023-02-01'],
                'follower_count_organic': [100, 500, 120],
                'follower_count_paid': [10, 50, 20],
                'follower_count_type': ['follower_gains_monthly', 'follower_industry', 'follower_gains_monthly']
            }),
            "config_date_col_posts": "published_at",
            "config_date_col_mentions": "date",
        }
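        # Note: bubble_follower_stats_df mixes row kinds - 'follower_gains_monthly'
        # rows carry a date string in 'category_name', while demographic rows (e.g.
        # 'follower_industry') carry a category label there; FollowerAgent is
        # presumably expected to split rows on 'follower_count_type'.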
        # "Sempre" ("Always") is a filter option prepare_filtered_analytics_data is
        # known to handle; other labels such as "Ultimi 365 Giorni" ("Last 365 Days")
        # would require prepare_filtered_analytics_data to support them first.
        mock_date_filter = "Sempre"
        mock_custom_start = None
        mock_custom_end = None

        logger.info("Running standalone test of run_full_analytics_orchestration...")

        async def test_run():
            results = await run_full_analytics_orchestration(
                mock_token_state, mock_date_filter, mock_custom_start, mock_custom_end
            )

            if results:
                print("\n\n" + "=" * 30 + " MOCK ORCHESTRATION RESULTS " + "=" * 30)
                print("\n--- Comprehensive Analysis Report ---")
                print(results.get("comprehensive_analysis_report", "N/A"))

                print("\n--- Actionable OKRs and Tasks ---")
                okrs_data = results.get("actionable_okrs_and_tasks")
                if okrs_data:
                    print(json.dumps(okrs_data, indent=2))
                else:
                    print("N/A")

                # Optionally print detailed metrics when debugging:
                # print("\n--- Detailed Agent Metrics ---")
                # print(json.dumps(results.get("detailed_metrics"), indent=2, default=str))
            else:
                print("Standalone test: Orchestration returned no results or failed.")

        asyncio.run(test_run())
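
# A minimal sketch (an assumption, not part of this commit) of how app.py might wire
# this coroutine into a synchronous Gradio event handler:
#
#   from run_agentic_pipeline import run_full_analytics_orchestration
#
#   def on_generate_insights(token_state, date_filter, start_date, end_date):
#       results = asyncio.run(run_full_analytics_orchestration(
#           token_state, date_filter, start_date, end_date
#       ))
#       return results or {}
#
# (Gradio also accepts async handlers directly, in which case the coroutine can be
# awaited without asyncio.run.)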