GuglielmoTor committed
Commit b2864a2 · verified · 1 Parent(s): b45a5ab

Create orchestrators/linkedin_analytics_orchestrator.py

insight_and_tasks/orchestrators/linkedin_analytics_orchestrator.py ADDED
@@ -0,0 +1,292 @@
+ # orchestrators/linkedin_analytics_orchestrator.py
+ import pandas as pd
+ import logging
+ from typing import Dict, Any, Optional
+ from datetime import date, datetime  # For TaskExtractionAgent date handling
+ from dataclasses import asdict  # For converting AgentMetrics dataclasses to dicts in the final output
+
+ # Project-specific imports
+ from utils.pandasai_setup import configure_pandasai  # Centralized PandasAI config
+ from coordinators.employer_branding_coordinator import EnhancedEmployerBrandingCoordinator
+ from agents.task_extraction_agent import TaskExtractionAgent
+ from data_models.metrics import AgentMetrics  # For type hinting
+ from data_models.tasks import TaskExtractionOutput  # For type hinting
+
+ # Configure logger for this module
+ logger = logging.getLogger(__name__)
+
+ class EnhancedLinkedInAnalyticsOrchestrator:
+     """
+     Orchestrates the end-to-end LinkedIn analytics process, from data input through
+     specialized agent analysis, coordinator synthesis, and actionable task extraction.
+     """
+
+     def __init__(self, api_key: str, llm_model_name: Optional[str] = None, current_date_for_tasks: Optional[date] = None):
+         """
+         Initializes the orchestrator.
+
+         Args:
+             api_key: The API key for Google services (used by PandasAI and LlmAgents).
+             llm_model_name: Optional. The primary LLM model name to be used by agents.
+                 Specific agents/the coordinator might override with their defaults if not set.
+             current_date_for_tasks: Optional. The date used by TaskExtractionAgent for quarter calculations. Defaults to today.
+         """
+         self.api_key = api_key
+         self.llm_model_name = llm_model_name  # Can be passed down, or agents use their defaults
+
+         # Configure PandasAI globally at the start of orchestration.
+         # Pass the model_name if specified; otherwise pandasai_setup might use its own default.
+         try:
+             configure_pandasai(api_key=self.api_key, model_name=self.llm_model_name)
+             logger.info(f"PandasAI configured by orchestrator with model hint: {self.llm_model_name or 'default'}.")
+         except Exception as e:
+             logger.error(f"Failed to configure PandasAI in orchestrator: {e}", exc_info=True)
+             # Decide whether this is a critical failure or whether the agents can proceed (they may try to reconfigure).
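+             # A stricter variant (hypothetical, not the flow used here) could fail fast
+             # if PandasAI is mandatory for the downstream agents:
+             #     raise RuntimeError("PandasAI configuration failed") from e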
+
+         # Initialize the coordinator, which in turn initializes its specialized agents.
+         # Pass the model_name hint to the coordinator.
+         self.coordinator = EnhancedEmployerBrandingCoordinator(api_key=self.api_key, model_name=self.llm_model_name)
+
+         # Initialize the TaskExtractionAgent.
+         # It uses its own default model unless overridden here.
+         self.task_extractor = TaskExtractionAgent(
+             api_key=self.api_key,
+             model_name=self.llm_model_name,  # Pass model hint
+             current_date=current_date_for_tasks  # Defaults to today if None
+         )
+         logger.info("EnhancedLinkedInAnalyticsOrchestrator initialized.")
+
+     async def generate_full_analysis_and_tasks(
+         self,
+         follower_stats_df: pd.DataFrame,
+         post_df: pd.DataFrame,
+         mentions_df: pd.DataFrame
+     ) -> Dict[str, Any]:
+         """
+         Executes the full pipeline: agent analyses, coordinator synthesis, and task extraction.
+
+         Args:
+             follower_stats_df: DataFrame containing follower statistics.
+             post_df: DataFrame containing post performance data.
+             mentions_df: DataFrame containing brand mentions data.
+
+         Returns:
+             A dictionary containing the comprehensive analysis text, actionable tasks (OKRs),
+             and the detailed metrics from each specialized agent.
+         """
+         logger.info("Starting full analysis and task generation pipeline...")
+
+         # Step 1: Get analyses and metrics from specialized agents.
+         # The coordinator's internal agents are used here.
+         logger.info("Running follower analysis...")
+         follower_agent_metrics: AgentMetrics = self.coordinator.follower_agent.analyze_follower_data(follower_stats_df)
+         logger.info(f"Follower analysis complete. Summary: {follower_agent_metrics.analysis_summary[:100]}...")
+
+         logger.info("Running post performance analysis...")
+         post_agent_metrics: AgentMetrics = self.coordinator.post_agent.analyze_post_data(post_df)
+         logger.info(f"Post analysis complete. Summary: {post_agent_metrics.analysis_summary[:100]}...")
+
+         logger.info("Running mentions analysis...")
+         mentions_agent_metrics: AgentMetrics = self.coordinator.mentions_agent.analyze_mentions_data(mentions_df)
+         logger.info(f"Mentions analysis complete. Summary: {mentions_agent_metrics.analysis_summary[:100]}...")
+
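+         # The three analyses above are synchronous and run sequentially. If they ever
+         # become a bottleneck, one hypothetical option is to offload them to threads
+         # (e.g. asyncio.to_thread) and gather the results; sequential keeps ordering
+         # and logging simple.
+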
+         # Step 2: Coordinator synthesizes these metrics into a comprehensive analysis text.
+         logger.info("Running coordinator for synthesis...")
+         comprehensive_analysis_text: str = await self.coordinator.generate_comprehensive_analysis(
+             follower_agent_metrics, post_agent_metrics, mentions_agent_metrics
+         )
+         logger.info(f"Coordinator synthesis complete. Report length: {len(comprehensive_analysis_text)} chars.")
+         if not comprehensive_analysis_text or comprehensive_analysis_text.startswith("Error"):
+             logger.error(f"Coordinator synthesis failed or produced an error message: {comprehensive_analysis_text}")
+             # Potentially stop here, or proceed with task extraction on whatever text was generated.
+
+         # Step 3: TaskExtractionAgent extracts actionable tasks (OKRs) from the comprehensive text.
+         logger.info("Running task extraction...")
+         actionable_tasks_okrs: TaskExtractionOutput = await self.task_extractor.extract_tasks(comprehensive_analysis_text)
+         logger.info(f"Task extraction complete. Number of OKRs: {len(actionable_tasks_okrs.okrs) if actionable_tasks_okrs else 'Error'}.")
+
+         # Step 4: Compile and return all results.
+         # Convert Pydantic/dataclass objects to dicts for easier JSON serialization if the final output needs it.
+         # The `actionable_tasks_okrs` is already a Pydantic model, which can be serialized with .model_dump() / .json().
+         # `AgentMetrics` are dataclasses, so use `asdict`.
+
+         final_results = {
+             "comprehensive_analysis_report": comprehensive_analysis_text,
+             "actionable_okrs_and_tasks": actionable_tasks_okrs.model_dump() if actionable_tasks_okrs else None,  # Pydantic v2
+             # "actionable_okrs_and_tasks": actionable_tasks_okrs.dict() if actionable_tasks_okrs else None,  # Pydantic v1
+             "detailed_metrics": {
+                 "follower_agent": asdict(follower_agent_metrics) if follower_agent_metrics else None,
+                 "post_agent": asdict(post_agent_metrics) if post_agent_metrics else None,
+                 "mentions_agent": asdict(mentions_agent_metrics) if mentions_agent_metrics else None,
+             }
+         }
+         logger.info("Full analysis and task generation pipeline finished successfully.")
+         return final_results
+
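+ # Serialization note: final_results is a plain dict. A caller could persist it with,
+ # for example, json.dump(final_results, fh, indent=2, default=str); the default=str
+ # fallback is an assumption about the metrics' contents, covering dates and other
+ # non-JSON-native values.
+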
+ # Example usage (similar to the original script's main execution block)
+ if __name__ == '__main__':
+     import asyncio
+     import json  # Needed for the json.dumps calls in the output section below
+     import os
+     from utils.logging_config import setup_logging
+     from utils.data_fetching import fetch_linkedin_data_from_bubble, VALID_DATA_TYPES
+
+     setup_logging()  # Configure logging for the application
+
+     # --- Configuration ---
+     # Attempt to get the API keys from environment variables.
+     # IMPORTANT: Set GOOGLE_API_KEY and BUBBLE_API_KEY in your environment for this to run.
+     GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
+     BUBBLE_API_KEY_ENV = os.environ.get("BUBBLE_API_KEY")  # Used by data_fetching
+
+     if not GOOGLE_API_KEY:
+         logger.critical("GOOGLE_API_KEY environment variable not set. Orchestrator cannot initialize LLM agents.")
+         exit(1)
+     if not BUBBLE_API_KEY_ENV:  # data_fetching will also check, but good to note here
+         logger.warning("BUBBLE_API_KEY environment variable not set. Data fetching from Bubble will fail.")
+         # You might want to exit or use mock data if Bubble is essential.
+
+     # Disable Vertex AI routing for Google GenAI (as in the original script).
+     os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
+
+     # Orchestrator settings
+     ORG_URN_EXAMPLE = "urn:li:organization:19010008"  # Example; replace with an actual URN
+     # Specify a model, or let the orchestrator/agents use their defaults.
+     # LLM_MODEL_FOR_ORCHESTRATION = "gemini-2.5-flash-preview-05-20"  # Example: use a powerful model
+     LLM_MODEL_FOR_ORCHESTRATION = None  # Let agents use their defaults, or pass a specific one
+
+     # --- Initialize Orchestrator ---
+     orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
+         api_key=GOOGLE_API_KEY,
+         llm_model_name=LLM_MODEL_FOR_ORCHESTRATION,
+         current_date_for_tasks=datetime.utcnow().date()  # Use today's UTC date for task planning
+     )
+
+     # --- Data Fetching ---
+     logger.info(f"Fetching data for organization URN: {ORG_URN_EXAMPLE}")
+
+     # Helper to fetch and log
+     def get_data(data_type: VALID_DATA_TYPES, org_urn: str) -> pd.DataFrame:
+         df, error = fetch_linkedin_data_from_bubble(org_urn=org_urn, data_type=data_type)
+         if error:
+             logger.error(f"Error fetching {data_type}: {error}. Using empty DataFrame.")
+             return pd.DataFrame()
+         if df is None:  # Should not happen if error is None, but as a safeguard
+             logger.warning(f"Fetched {data_type} is None but no error reported. Using empty DataFrame.")
+             return pd.DataFrame()
+         logger.info(f"Successfully fetched {data_type} with {len(df)} rows.")
+         return df
+
+     follower_stats_df_raw = get_data("li_follower_stats", ORG_URN_EXAMPLE)
+     posts_df_raw = get_data("LI_posts", ORG_URN_EXAMPLE)  # Contains post content, media_type, etc.
+     mentions_df_raw = get_data("Li_mentions", ORG_URN_EXAMPLE)
+     post_stats_df_raw = get_data("LI_post_stats", ORG_URN_EXAMPLE)  # Contains engagement numbers for posts
+
+     # --- Data Preprocessing/Merging (as in original example) ---
+
+     # Select relevant columns for follower_stats_df
+     if not follower_stats_df_raw.empty:
+         follower_stats_df = follower_stats_df_raw[[
+             'category_name', 'follower_count_organic', 'follower_count_paid', 'follower_count_type'
+         ]].copy()
+     else:
+         follower_stats_df = pd.DataFrame()  # Ensure it's an empty DF if raw is empty
+
+     # Merge posts_df and post_stats_df.
+     # This logic assumes 'id' in posts_df_raw and 'post_id' in post_stats_df_raw.
+     merged_posts_df = pd.DataFrame()
+     if not posts_df_raw.empty and not post_stats_df_raw.empty:
+         if 'id' in posts_df_raw.columns and 'post_id' in post_stats_df_raw.columns:
+             # Ensure 'id' in posts_df_raw is unique before merging if it's the left table key:
+             # posts_df_raw.drop_duplicates(subset=['id'], keep='first', inplace=True)
+             merged_posts_df = pd.merge(posts_df_raw, post_stats_df_raw, left_on='id', right_on='post_id', how='left', suffixes=('', '_stats'))
+             logger.info(f"Merged posts_df ({len(posts_df_raw)}) and post_stats_df ({len(post_stats_df_raw)}) into merged_posts_df ({len(merged_posts_df)}).")
+         else:
+             logger.warning("Cannot merge posts_df and post_stats_df due to missing 'id' or 'post_id'. Using posts_df_raw.")
+             merged_posts_df = posts_df_raw.copy()  # Fallback to posts_df_raw
+     elif not posts_df_raw.empty:
+         logger.info("post_stats_df is empty. Using posts_df_raw for post analysis.")
+         merged_posts_df = posts_df_raw.copy()
+     else:
+         logger.warning("Both posts_df_raw and post_stats_df_raw are empty.")
+         merged_posts_df = pd.DataFrame()  # Empty DF
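+     # Hypothetical sanity check (a sketch): a left merge can duplicate rows if
+     # 'post_id' is not unique in post_stats_df_raw; a row count larger than
+     # len(posts_df_raw) would be the symptom.
+     # if len(merged_posts_df) > len(posts_df_raw):
+     #     logger.warning("Merge increased row count; check for duplicate post_id values.")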
+
+     # Select and ensure essential columns for merged_posts_df.
+     # These are the columns expected by EnhancedPostPerformanceAgent.
+     expected_post_cols = [
+         'li_eb_label', 'media_type', 'is_ad', 'id', 'published_at', 'sentiment',
+         'engagement', 'impressionCount', 'clickCount', 'likeCount', 'commentCount', 'shareCount'
+     ]
+     if not merged_posts_df.empty:
+         final_post_df_cols = {}
+         for col in expected_post_cols:
+             if col in merged_posts_df.columns:
+                 final_post_df_cols[col] = merged_posts_df[col]
+             elif f"{col}_stats" in merged_posts_df.columns:  # Check for suffixed columns from the merge
+                 final_post_df_cols[col] = merged_posts_df[f"{col}_stats"]
+             else:
+                 logger.debug(f"Expected column '{col}' not found in merged_posts_df. Will be created as empty/default by agent if needed.")
+                 # Agent preprocessing should handle missing columns by creating them with defaults (0 or 'Unknown').
+
+         # Build the final DataFrame from the selected/available columns, then ensure every
+         # expected column exists (filled with NA) so the agent sees a stable schema.
+         temp_post_df = pd.DataFrame(final_post_df_cols)
+         for col in expected_post_cols:
+             if col not in temp_post_df.columns:
+                 temp_post_df[col] = pd.NA  # Or an appropriate default: 0 for numeric, 'Unknown' for categorical
+         merged_posts_df = temp_post_df[expected_post_cols].copy()  # Ensure correct column order and presence
+
+     else:  # If merged_posts_df started empty and stayed empty
+         merged_posts_df = pd.DataFrame(columns=expected_post_cols)
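+     # A more compact near-equivalent of the normalization above (a sketch; unlike the
+     # loop, reindex would not pick up the '_stats'-suffixed merge columns):
+     #     merged_posts_df = merged_posts_df.reindex(columns=expected_post_cols)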
+
+
+     # Mentions DataFrame: select relevant columns if necessary, or pass as is.
+     # Assuming mentions_df_raw is already in the correct shape, or the agent handles it.
+     # For example, if it needs specific columns:
+     # mentions_df = mentions_df_raw[['date', 'sentiment_label', 'mention_content']].copy() if not mentions_df_raw.empty else pd.DataFrame()
+     mentions_df = mentions_df_raw.copy()  # Pass as is; the agent will preprocess
+
+
+     # --- Run Orchestration ---
+     async def main_orchestration():
+         if follower_stats_df.empty and merged_posts_df.empty and mentions_df.empty:
+             logger.error("All input DataFrames are empty. Aborting orchestration.")
+             return None
+
+         logger.info("Orchestrator starting generate_full_analysis_and_tasks...")
+         results = await orchestrator.generate_full_analysis_and_tasks(
+             follower_stats_df=follower_stats_df,
+             post_df=merged_posts_df,
+             mentions_df=mentions_df
+         )
+         return results
+
+     orchestration_results = asyncio.run(main_orchestration())
+
+     # --- Output Results ---
+     if orchestration_results:
+         print("\n\n" + "="*30 + " COMPREHENSIVE ANALYSIS REPORT " + "="*30)
+         print(orchestration_results.get("comprehensive_analysis_report", "Report not generated."))
+
+         print("\n\n" + "="*30 + " ACTIONABLE TASKS (OKRs) " + "="*30)
+         okrs_data = orchestration_results.get("actionable_okrs_and_tasks")
+         if okrs_data:
+             # okrs_data is already a dict from .model_dump()
+             print(json.dumps(okrs_data, indent=2))
+         else:
+             print("No actionable tasks (OKRs) generated or an error occurred.")
+
+         print("\n\n" + "="*30 + " DETAILED AGENT METRICS " + "="*30)
+         detailed_metrics = orchestration_results.get("detailed_metrics", {})
+         for agent_name, metrics_dict in detailed_metrics.items():
+             print(f"\n--- {agent_name.replace('_', ' ').title()} Metrics ---")
+             if metrics_dict:
+                 print(json.dumps(metrics_dict, indent=2, default=str))  # default=str for any non-serializable types
+             else:
+                 print("Metrics not available for this agent.")
+     else:
+         logger.info("Orchestration did not produce results (likely due to empty input data).")
+
+     logger.info("Orchestration example finished.")