GuglielmoTor committed
Commit dc94b38 · verified · 1 Parent(s): a9e9029

Update features/insight_and_tasks/orchestrators/linkedin_analytics_orchestrator.py

features/insight_and_tasks/orchestrators/linkedin_analytics_orchestrator.py CHANGED
@@ -1,9 +1,9 @@
  # orchestrators/linkedin_analytics_orchestrator.py
  import pandas as pd
  import logging
- from typing import Dict, Any, Optional
- from datetime import date, datetime # For TaskExtractionAgent date
- from dataclasses import asdict # For converting AgentMetrics to dict if needed for final output
  import os

  os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
@@ -11,289 +11,115 @@ GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
  os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

  # Project-specific imports
- from features.insight_and_tasks.utils.pandasai_setup import configure_pandasai # Centralized PandasAI config
  from features.insight_and_tasks.coordinators.employer_branding_coordinator import EnhancedEmployerBrandingCoordinator
  from features.insight_and_tasks.agents.task_extraction_agent import TaskExtractionAgent
- from features.insight_and_tasks.data_models.metrics import AgentMetrics # For type hinting
- from features.insight_and_tasks.data_models.tasks import TaskExtractionOutput # For type hinting
  from features.insight_and_tasks.agents.task_extraction_model import extract_tasks_from_text

- # Configure logger for this module
  logger = logging.getLogger(__name__)

  class EnhancedLinkedInAnalyticsOrchestrator:
      """
-     Orchestrates the end-to-end LinkedIn analytics process, from data input through
-     specialized agent analysis, coordinator synthesis, and actionable task extraction.
      """

      def __init__(self, api_key: str, llm_model_name: Optional[str] = None, current_date_for_tasks: Optional[date] = None):
-         """
-         Initializes the orchestrator.
-
-         Args:
-             api_key: The API key for Google services (used by PandasAI and LlmAgents).
-             llm_model_name: Optional. The primary LLM model name to be used by agents.
-                 Specific agents/coordinator might override with their defaults if not set.
-             current_date_for_tasks: Optional. The date to be used by TaskExtractionAgent for quarter calculations. Defaults to today.
-         """
          self.api_key = api_key
-         self.llm_model_name = llm_model_name # Can be passed down or agents use their defaults

-         # Configure PandasAI globally at the start of orchestration.
-         # Pass the model_name if specified, otherwise pandasai_setup might use its own default.
          try:
              configure_pandasai(api_key=self.api_key, model_name=self.llm_model_name)
              logger.info(f"PandasAI configured by orchestrator with model hint: {self.llm_model_name or 'default'}.")
          except Exception as e:
              logger.error(f"Failed to configure PandasAI in orchestrator: {e}", exc_info=True)
-             # Decide if this is a critical failure or if agents can proceed (they might try to reconfigure)

-         # Initialize the coordinator, which in turn initializes its specialized agents.
-         # Pass the model_name hint to the coordinator.
          self.coordinator = EnhancedEmployerBrandingCoordinator(api_key=self.api_key, model_name=self.llm_model_name)
-
-         # Initialize the TaskExtractionAgent.
-         # It uses its own default model unless overridden here.
          self.task_extractor = TaskExtractionAgent(
              api_key=self.api_key,
-             model_name=self.llm_model_name, # Pass model hint
-             current_date=current_date_for_tasks # Defaults to today if None
          )
          logger.info("EnhancedLinkedInAnalyticsOrchestrator initialized.")

-     async def generate_full_analysis_and_tasks(
          self,
          follower_stats_df: pd.DataFrame,
          post_df: pd.DataFrame,
          mentions_df: pd.DataFrame
-     ) -> Dict[str, Any]:
          """
-         Executes the full pipeline: agent analyses, coordinator synthesis, and task extraction.
-
-         Args:
-             follower_stats_df: DataFrame containing follower statistics.
-             post_df: DataFrame containing post performance data.
-             mentions_df: DataFrame containing brand mentions data.
-
-         Returns:
-             A dictionary containing the comprehensive analysis text, actionable tasks (OKRs),
-             and the detailed metrics from each specialized agent.
          """
-         logger.info("Starting full analysis and task generation pipeline...")

-         # Step 1: Get analyses and metrics from specialized agents.
-         # The coordinator's internal agents are used here.
          logger.info("Running follower analysis...")
          follower_agent_metrics: AgentMetrics = self.coordinator.follower_agent.analyze_follower_data(follower_stats_df)
-         logger.info(f"Follower analysis complete. Summary: {follower_agent_metrics.analysis_summary[:100]}...")

          logger.info("Running post performance analysis...")
          post_agent_metrics: AgentMetrics = self.coordinator.post_agent.analyze_post_data(post_df)
-         logger.info(f"Post analysis complete. Summary: {post_agent_metrics.analysis_summary[:100]}...")

          logger.info("Running mentions analysis...")
          mentions_agent_metrics: AgentMetrics = self.coordinator.mentions_agent.analyze_mentions_data(mentions_df)
-         logger.info(f"Mentions analysis complete. Summary: {mentions_agent_metrics.analysis_summary[:100]}...")

-         # Step 2: Coordinator synthesizes these metrics into a comprehensive analysis text.
          logger.info("Running coordinator for synthesis...")
          comprehensive_analysis_text: str = await self.coordinator.generate_comprehensive_analysis(
              follower_agent_metrics, post_agent_metrics, mentions_agent_metrics
          )
          logger.info(f"Coordinator synthesis complete. Report length: {len(comprehensive_analysis_text)} chars.")
-         if not comprehensive_analysis_text or comprehensive_analysis_text.startswith("Error"):
-             logger.error(f"Coordinator synthesis failed or produced an error message: {comprehensive_analysis_text}")
-             # Potentially stop here or proceed with task extraction on whatever text was generated.

-         # Step 3: TaskExtractionAgent extracts actionable tasks (OKRs) from the comprehensive text.
          logger.info("Running task extraction...")
-         # actionable_tasks_okrs: TaskExtractionOutput = await self.task_extractor.extract_tasks(comprehensive_analysis_text)
          actionable_tasks_okrs: TaskExtractionOutput = extract_tasks_from_text(comprehensive_analysis_text, GOOGLE_API_KEY)
          logger.info(f"Task extraction complete. Number of OKRs: {len(actionable_tasks_okrs.okrs) if actionable_tasks_okrs else 'Error'}.")

-         # Step 4: Compile and return all results.
-         # Convert Pydantic/dataclass objects to dicts for easier JSON serialization if the final output needs it.
-         # The `actionable_tasks_okrs` is already a Pydantic model, which can be serialized with .model_dump() / .json().
-         # `AgentMetrics` are dataclasses, use `asdict`.
-
          final_results = {
              "comprehensive_analysis_report": comprehensive_analysis_text,
-             "actionable_okrs_and_tasks": actionable_tasks_okrs.model_dump() if actionable_tasks_okrs else None, # Pydantic v2
-             # "actionable_okrs_and_tasks": actionable_tasks_okrs.dict() if actionable_tasks_okrs else None, # Pydantic v1
              "detailed_metrics": {
                  "follower_agent": asdict(follower_agent_metrics) if follower_agent_metrics else None,
                  "post_agent": asdict(post_agent_metrics) if post_agent_metrics else None,
                  "mentions_agent": asdict(mentions_agent_metrics) if mentions_agent_metrics else None,
-             }
          }
-         logger.info("Full analysis and task generation pipeline finished successfully.")
-         return final_results
-
- # Example usage (similar to the original script's main execution block)
- if __name__ == '__main__':
-     import asyncio
-     import os
-     from utils.logging_config import setup_logging
-     from utils.data_fetching import fetch_linkedin_data_from_bubble, VALID_DATA_TYPES
-
-     setup_logging() # Configure logging for the application
-
-     # --- Configuration ---
-     # Attempt to get API key from environment variable
-     # IMPORTANT: Set GOOGLE_API_KEY and BUBBLE_API_KEY in your environment for this to run.
-     GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
-     BUBBLE_API_KEY_ENV = os.environ.get("BUBBLE_API_KEY") # Used by data_fetching
-
-     if not GOOGLE_API_KEY:
-         logger.critical("GOOGLE_API_KEY environment variable not set. Orchestrator cannot initialize LLM agents.")
-         exit(1)
-     if not BUBBLE_API_KEY_ENV: # data_fetching will also check, but good to note here
-         logger.warning("BUBBLE_API_KEY environment variable not set. Data fetching from Bubble will fail.")
-         # You might want to exit or use mock data if Bubble is essential.
-
-     # Set the Google Vertex AI environment variable if not using Vertex AI (as in original)
-     os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
-
-     # Orchestrator settings
-     ORG_URN_EXAMPLE = "urn:li:organization:19010008" # Example, replace with actual
-     # Specify a model or let orchestrator/agents use their defaults
-     # LLM_MODEL_FOR_ORCHESTRATION = "gemini-2.5-flash-preview-05-20" # Example: use a powerful model
-     LLM_MODEL_FOR_ORCHESTRATION = None # Let agents use their defaults or pass a specific one
-
-     # --- Initialize Orchestrator ---
-     orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
-         api_key=GOOGLE_API_KEY,
-         llm_model_name=LLM_MODEL_FOR_ORCHESTRATION,
-         current_date_for_tasks=datetime.utcnow().date() # Use today for task planning
-     )
-
-     # --- Data Fetching ---
-     logger.info(f"Fetching data for organization URN: {ORG_URN_EXAMPLE}")
-
-     # Helper to fetch and log
-     def get_data(data_type: VALID_DATA_TYPES, org_urn: str) -> pd.DataFrame:
-         df, error = fetch_linkedin_data_from_bubble(org_urn=org_urn, data_type=data_type)
-         if error:
-             logger.error(f"Error fetching {data_type}: {error}. Using empty DataFrame.")
-             return pd.DataFrame()
-         if df is None: # Should not happen if error is None, but as a safeguard
-             logger.warning(f"Fetched {data_type} is None but no error reported. Using empty DataFrame.")
-             return pd.DataFrame()
-         logger.info(f"Successfully fetched {data_type} with {len(df)} rows.")
-         return df
-
-     follower_stats_df_raw = get_data("li_follower_stats", ORG_URN_EXAMPLE)
-     posts_df_raw = get_data("LI_posts", ORG_URN_EXAMPLE) # Contains post content, media_type, etc.
-     mentions_df_raw = get_data("Li_mentions", ORG_URN_EXAMPLE)
-     post_stats_df_raw = get_data("LI_post_stats", ORG_URN_EXAMPLE) # Contains engagement numbers for posts

-     # --- Data Preprocessing/Merging (as in original example) ---
-
-     # Select relevant columns for follower_stats_df
-     if not follower_stats_df_raw.empty:
-         follower_stats_df = follower_stats_df_raw[[
-             'category_name', "follower_count_organic", "follower_count_paid", "follower_count_type"
-         ]].copy()
-     else:
-         follower_stats_df = pd.DataFrame() # Ensure it's an empty DF if raw is empty
-
-     # Merge posts_df and post_stats_df
-     # This logic assumes 'id' in posts_df_raw and 'post_id' in post_stats_df_raw
-     merged_posts_df = pd.DataFrame()
-     if not posts_df_raw.empty and not post_stats_df_raw.empty:
-         if 'id' in posts_df_raw.columns and 'post_id' in post_stats_df_raw.columns:
-             # Ensure 'id' in posts_df_raw is unique before merge if it's a left table key
-             # posts_df_raw.drop_duplicates(subset=['id'], keep='first', inplace=True)
-             merged_posts_df = pd.merge(posts_df_raw, post_stats_df_raw, left_on='id', right_on='post_id', how='left', suffixes=('', '_stats'))
-             logger.info(f"Merged posts_df ({len(posts_df_raw)}) and post_stats_df ({len(post_stats_df_raw)}) into merged_posts_df ({len(merged_posts_df)}).")
-         else:
-             logger.warning("Cannot merge posts_df and post_stats_df due to missing 'id' or 'post_id'. Using posts_df_raw.")
-             merged_posts_df = posts_df_raw.copy() # Fallback to posts_df_raw
-     elif not posts_df_raw.empty:
-         logger.info("post_stats_df is empty. Using posts_df_raw for post analysis.")
-         merged_posts_df = posts_df_raw.copy()
-     else:
-         logger.warning("Both posts_df_raw and post_stats_df_raw are empty.")
-         merged_posts_df = pd.DataFrame() # Empty DF
-
-     # Select and ensure essential columns for merged_posts_df
-     # These are columns expected by EnhancedPostPerformanceAgent
-     expected_post_cols = [
-         'li_eb_label', 'media_type', 'is_ad', 'id', 'published_at', 'sentiment',
-         'engagement', 'impressionCount', 'clickCount', 'likeCount', 'commentCount', 'shareCount'
-     ]
-     if not merged_posts_df.empty:
-         final_post_df_cols = {}
-         for col in expected_post_cols:
-             if col in merged_posts_df.columns:
-                 final_post_df_cols[col] = merged_posts_df[col]
-             elif f"{col}_stats" in merged_posts_df.columns: # Check for suffixed columns from merge
-                 final_post_df_cols[col] = merged_posts_df[f"{col}_stats"]
-             else:
-                 logger.debug(f"Expected column '{col}' not found in merged_posts_df. Will be created as empty/default by agent if needed.")
-                 # Agent preprocessing should handle missing columns by creating them with defaults (0 or 'Unknown')
-
-         # Create the final DataFrame with only the selected/available columns
-         # This ensures that if a column is missing, it doesn't cause an error here,
-         # but the agent's preprocessing will handle it.
-         # However, it's better to ensure they exist with NAs if the agent expects them.
-         temp_post_df = pd.DataFrame(final_post_df_cols)
-         # Ensure all expected columns are present, filling with NA if missing from selection
-         for col in expected_post_cols:
-             if col not in temp_post_df.columns:
-                 temp_post_df[col] = pd.NA # Or appropriate default like 0 for numeric, 'Unknown' for categorical
-         merged_posts_df = temp_post_df[expected_post_cols].copy() # Ensure correct order and all columns
-
-     else: # If merged_posts_df started empty and stayed empty
-         merged_posts_df = pd.DataFrame(columns=expected_post_cols)
-
-
-     # Mentions DataFrame - select relevant columns if necessary, or pass as is
-     # Assuming mentions_df_raw is already in the correct shape or agent handles it.
-     # For example, if it needs specific columns:
-     # mentions_df = mentions_df_raw[['date', 'sentiment_label', 'mention_content']].copy() if not mentions_df_raw.empty else pd.DataFrame()
-     mentions_df = mentions_df_raw.copy() # Pass as is, agent will preprocess
-
-
-     # --- Run Orchestration ---
-     async def main_orchestration():
-         if follower_stats_df.empty and merged_posts_df.empty and mentions_df.empty:
-             logger.error("All input DataFrames are empty. Aborting orchestration.")
-             return None
-
-         logger.info("Orchestrator starting generate_full_analysis_and_tasks...")
-         results = await orchestrator.generate_full_analysis_and_tasks(
-             follower_stats_df=follower_stats_df,
-             post_df=merged_posts_df,
-             mentions_df=mentions_df
-         )
-         return results
-
-     orchestration_results = asyncio.run(main_orchestration())
-
-     # --- Output Results ---
-     if orchestration_results:
-         print("\n\n" + "="*30 + " COMPREHENSIVE ANALYSIS REPORT " + "="*30)
-         print(orchestration_results.get("comprehensive_analysis_report", "Report not generated."))
-
-         print("\n\n" + "="*30 + " ACTIONABLE TASKS (OKRs) " + "="*30)
-         okrs_data = orchestration_results.get("actionable_okrs_and_tasks")
-         if okrs_data:
-             # okrs_data is already a dict from .model_dump()
-             print(json.dumps(okrs_data, indent=2))
-         else:
-             print("No actionable tasks (OKRs) generated or an error occurred.")
-
-         print("\n\n" + "="*30 + " DETAILED AGENT METRICS " + "="*30)
-         detailed_metrics = orchestration_results.get("detailed_metrics", {})
-         for agent_name, metrics_dict in detailed_metrics.items():
-             print(f"\n--- {agent_name.replace('_', ' ').title()} Metrics ---")
-             if metrics_dict:
-                 print(json.dumps(metrics_dict, indent=2, default=str)) # default=str for any non-serializable types
-             else:
-                 print("Metrics not available for this agent.")
-     else:
-         logger.info("Orchestration did not produce results (likely due to empty input data).")
-
-     logger.info("Orchestration example finished.")
 
  # orchestrators/linkedin_analytics_orchestrator.py
  import pandas as pd
  import logging
+ from typing import Dict, Any, Optional, AsyncGenerator
+ from datetime import date, datetime
+ from dataclasses import asdict
  import os

  os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "False"
  GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
  os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

  # Project-specific imports
+ from features.insight_and_tasks.utils.pandasai_setup import configure_pandasai
  from features.insight_and_tasks.coordinators.employer_branding_coordinator import EnhancedEmployerBrandingCoordinator
  from features.insight_and_tasks.agents.task_extraction_agent import TaskExtractionAgent
+ from features.insight_and_tasks.data_models.metrics import AgentMetrics
+ from features.insight_and_tasks.data_models.tasks import TaskExtractionOutput
  from features.insight_and_tasks.agents.task_extraction_model import extract_tasks_from_text

  logger = logging.getLogger(__name__)

  class EnhancedLinkedInAnalyticsOrchestrator:
      """
+     Orchestrates the end-to-end LinkedIn analytics process with streaming results.
      """

      def __init__(self, api_key: str, llm_model_name: Optional[str] = None, current_date_for_tasks: Optional[date] = None):
          self.api_key = api_key
+         self.llm_model_name = llm_model_name

          try:
              configure_pandasai(api_key=self.api_key, model_name=self.llm_model_name)
              logger.info(f"PandasAI configured by orchestrator with model hint: {self.llm_model_name or 'default'}.")
          except Exception as e:
              logger.error(f"Failed to configure PandasAI in orchestrator: {e}", exc_info=True)

          self.coordinator = EnhancedEmployerBrandingCoordinator(api_key=self.api_key, model_name=self.llm_model_name)
          self.task_extractor = TaskExtractionAgent(
              api_key=self.api_key,
+             model_name=self.llm_model_name,
+             current_date=current_date_for_tasks
          )
          logger.info("EnhancedLinkedInAnalyticsOrchestrator initialized.")

+     async def generate_full_analysis_and_tasks_streaming(
          self,
          follower_stats_df: pd.DataFrame,
          post_df: pd.DataFrame,
          mentions_df: pd.DataFrame
+     ) -> AsyncGenerator[Dict[str, Any], None]:
          """
+         Executes the full pipeline with streaming results.
+         Yields intermediate results as they become available.
          """
+         logger.info("Starting streaming analysis and task generation pipeline...")

+         # Step 1: Get analyses and metrics from specialized agents
          logger.info("Running follower analysis...")
          follower_agent_metrics: AgentMetrics = self.coordinator.follower_agent.analyze_follower_data(follower_stats_df)
+         logger.info("Follower analysis complete.")

          logger.info("Running post performance analysis...")
          post_agent_metrics: AgentMetrics = self.coordinator.post_agent.analyze_post_data(post_df)
+         logger.info("Post analysis complete.")

          logger.info("Running mentions analysis...")
          mentions_agent_metrics: AgentMetrics = self.coordinator.mentions_agent.analyze_mentions_data(mentions_df)
+         logger.info("Mentions analysis complete.")

+         # Step 2: Coordinator synthesizes these metrics into a comprehensive analysis text
          logger.info("Running coordinator for synthesis...")
          comprehensive_analysis_text: str = await self.coordinator.generate_comprehensive_analysis(
              follower_agent_metrics, post_agent_metrics, mentions_agent_metrics
          )
          logger.info(f"Coordinator synthesis complete. Report length: {len(comprehensive_analysis_text)} chars.")

+         # Yield the report as soon as it's ready
+         partial_results = {
+             "comprehensive_analysis_report": comprehensive_analysis_text,
+             "actionable_okrs_and_tasks": None, # Not ready yet
+             "detailed_metrics": {
+                 "follower_agent": asdict(follower_agent_metrics) if follower_agent_metrics else None,
+                 "post_agent": asdict(post_agent_metrics) if post_agent_metrics else None,
+                 "mentions_agent": asdict(mentions_agent_metrics) if mentions_agent_metrics else None,
+             },
+             "status": "report_ready" # Indicate what's available
+         }
+         logger.info("Yielding report results...")
+         yield partial_results
+
+         # Step 3: TaskExtractionAgent extracts actionable tasks (OKRs) from the comprehensive text
          logger.info("Running task extraction...")
          actionable_tasks_okrs: TaskExtractionOutput = extract_tasks_from_text(comprehensive_analysis_text, GOOGLE_API_KEY)
          logger.info(f"Task extraction complete. Number of OKRs: {len(actionable_tasks_okrs.okrs) if actionable_tasks_okrs else 'Error'}.")

+         # Yield the final complete results
          final_results = {
              "comprehensive_analysis_report": comprehensive_analysis_text,
+             "actionable_okrs_and_tasks": actionable_tasks_okrs.model_dump() if actionable_tasks_okrs else None,
              "detailed_metrics": {
                  "follower_agent": asdict(follower_agent_metrics) if follower_agent_metrics else None,
                  "post_agent": asdict(post_agent_metrics) if post_agent_metrics else None,
                  "mentions_agent": asdict(mentions_agent_metrics) if mentions_agent_metrics else None,
+             },
+             "status": "complete" # Indicate everything is ready
          }
+         logger.info("Yielding final complete results...")
+         yield final_results

+     # Keep the original method for backward compatibility
+     async def generate_full_analysis_and_tasks(
+         self,
+         follower_stats_df: pd.DataFrame,
+         post_df: pd.DataFrame,
+         mentions_df: pd.DataFrame
+     ) -> Dict[str, Any]:
+         """
+         Original method: returns complete results only when everything is done.
+         """
+         async for result in self.generate_full_analysis_and_tasks_streaming(follower_stats_df, post_df, mentions_df):
+             if result.get("status") == "complete":
+                 return result
+         # Fallback if no complete result
+         return {"error": "Pipeline did not complete successfully"}
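
Below is a minimal consumption sketch for the streaming pipeline this commit introduces. It is illustrative rather than part of the commit: the demo coroutine, the empty placeholder DataFrames, and reading the key from GEMINI_API_KEY are assumptions; a real caller would pass fetched follower, post, and mentions data.

# Hypothetical usage sketch (not part of the commit).
import asyncio
import os

import pandas as pd

from features.insight_and_tasks.orchestrators.linkedin_analytics_orchestrator import (
    EnhancedLinkedInAnalyticsOrchestrator,
)

async def demo() -> None:
    orchestrator = EnhancedLinkedInAnalyticsOrchestrator(
        api_key=os.environ["GEMINI_API_KEY"],  # same env var the module itself reads
        llm_model_name=None,  # let agents fall back to their default models
    )
    empty = pd.DataFrame()  # placeholders; real callers supply fetched LinkedIn data
    async for result in orchestrator.generate_full_analysis_and_tasks_streaming(
        follower_stats_df=empty, post_df=empty, mentions_df=empty
    ):
        if result["status"] == "report_ready":
            # The synthesized report and per-agent metrics arrive first.
            print(result["comprehensive_analysis_report"][:200])
        elif result["status"] == "complete":
            # The OKR payload arrives once task extraction finishes.
            print(result["actionable_okrs_and_tasks"])

asyncio.run(demo())

Blocking callers need no changes: the retained generate_full_analysis_and_tasks method drains the same generator and returns only the payload whose "status" is "complete".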