# services/report_data_handler.py
import json  # For handling JSON data
import logging
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

from apis.Bubble_API_Calls import fetch_linkedin_posts_data_from_bubble, bulk_upload_to_bubble
from config import (
    BUBBLE_REPORT_TABLE_NAME,
    BUBBLE_OKR_TABLE_NAME,
    BUBBLE_KEY_RESULTS_TABLE_NAME,
    BUBBLE_TASKS_TABLE_NAME,
    BUBBLE_KR_UPDATE_TABLE_NAME,
)

logger = logging.getLogger(__name__)


def fetch_latest_agentic_analysis(org_urn: str):
    """
    Fetches all agentic analysis data for a given org_urn from Bubble.
    Returns the full dataframe and no error, or (None, None).
    """
    if not org_urn:
        logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
        return None, None

    report_data_df, error = fetch_linkedin_posts_data_from_bubble(
        data_type=BUBBLE_REPORT_TABLE_NAME, org_urn=org_urn
    )

    if error or report_data_df is None or report_data_df.empty:
        logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn} or error: {error}")
        return None, None

    logger.info(f"Agentic analysis data fetched for org_urn {org_urn}")
    return report_data_df, None  # Return full dataframe and no error


def save_report_results(
    org_urn: str,
    report_markdown: str,
    quarter,
    year,
    report_type: str,
):
    """Saves the agentic pipeline report to Bubble and returns the new record ID, or False on failure."""
    if not org_urn:
        logger.error("Cannot save agentic results: org_urn missing.")
        return False

    payload = {
        "organization_urn": org_urn,
        "report_text": report_markdown if report_markdown else "N/A",
        "quarter": quarter,
        "year": year,
        "report_type": report_type,
    }
    logger.info(f"Attempting to save agentic analysis to Bubble for org_urn: {org_urn}")
    success_ids = bulk_upload_to_bubble([payload], BUBBLE_REPORT_TABLE_NAME)
    if success_ids:  # bulk_upload_to_bubble returns a list of IDs or False
        logger.info(f"Successfully saved agentic analysis to Bubble. Record ID: {success_ids}")
        return success_ids[0]  # Single payload -> single new record ID
    else:
        logger.error(f"Failed to save agentic analysis to Bubble. {success_ids}")
        return False


# --- Data Saving Functions ---

def save_objectives(
    org_urn: str,
    objectives_data: List[Dict[str, Any]],
    report_id: str,
) -> Optional[List[str]]:
    """
    Saves Objective records to Bubble.

    Args:
        org_urn: The URN of the organization to associate these objectives with.
        objectives_data: A list of objective dictionaries from the main data structure.
        report_id: The Bubble ID of the parent report record to link each objective to.

    Returns:
        A list of the newly created Bubble record IDs for the objectives, or None on failure.
    """
    if not objectives_data:
        logger.info("No objectives to save.")
        return []

    payloads = []
    for objective in objectives_data:
        payloads.append({
            # "organization_urn": org_urn,
            "objective_description": objective.get("objective_description"),
            "objective_timeline": objective.get("objective_timeline"),
            "objective_owner": objective.get("objective_owner"),
            "report": report_id,
        })

    logger.info(f"Attempting to save {len(payloads)} objectives for org_urn: {org_urn}")
    objective_ids = bulk_upload_to_bubble(payloads, BUBBLE_OKR_TABLE_NAME)  # objectives live in the OKR table

    if objective_ids is None:
        logger.error("Failed to save objectives to Bubble.")
        return None

    logger.info(f"Successfully saved {len(objective_ids)} objectives.")
    return objective_ids
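# NOTE (illustrative): the pairing logic in the save functions below, and in the
# orchestrator at the end of the module, relies on bulk_upload_to_bubble returning
# the new record IDs in the same order as the payloads it was given, since the
# original dicts are matched back to their IDs with zip() (see the "preserves the
# original KR data in the correct order" comment in save_key_results). A minimal
# sketch of that pairing, with hypothetical values:
#
#     key_results = [{"key_result_description": "Grow newsletter signups"}]
#     new_ids = ["1700000000000x123"]  # hypothetical Bubble unique ID
#     list(zip(key_results, new_ids))
#     # -> [({"key_result_description": "Grow newsletter signups"}, "1700000000000x123")]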
def save_key_results(
    org_urn: str,
    objectives_with_ids: List[Tuple[Dict[str, Any], str]]
) -> Optional[List[Tuple[Dict[str, Any], str]]]:
    """
    Saves Key Result records to Bubble, linking them to their parent objectives.

    Args:
        org_urn: The URN of the organization.
        objectives_with_ids: A list of tuples, where each tuple contains an original
                             objective dictionary and its new Bubble ID.
                             Example: [(objective_dict, 'bubble_id_123'), ...]

    Returns:
        A list of tuples containing the original key result data and its new Bubble ID,
        or None on failure. This is needed to save the tasks in the next step.
    """
    key_result_payloads = []
    # This list preserves the original KR data in the correct order to match the returned IDs
    key_results_to_process = []

    for objective_data, parent_objective_id in objectives_with_ids:
        for kr in objective_data.get("key_results", []):
            key_results_to_process.append(kr)
            key_result_payloads.append({
                # "organization_urn": org_urn,
                "okr": parent_objective_id,  # Link to parent
                "description": kr.get("key_result_description"),
                "target_metric": kr.get("target_metric"),
                "target_value": kr.get("target_value"),
                "kr_type": kr.get("key_result_type"),
                "data_subject": kr.get("data_subject"),
            })

    if not key_result_payloads:
        logger.info("No key results to save.")
        return []

    logger.info(f"Attempting to save {len(key_result_payloads)} key results for org_urn: {org_urn}")
    key_result_ids = bulk_upload_to_bubble(key_result_payloads, BUBBLE_KEY_RESULTS_TABLE_NAME)

    if key_result_ids is None:
        logger.error("Failed to save key results to Bubble.")
        return None

    logger.info(f"Successfully saved {len(key_result_ids)} key results.")
    # Combine the original KR data with their new IDs
    return list(zip(key_results_to_process, key_result_ids))


def save_tasks(
    org_urn: str,
    key_results_with_ids: List[Tuple[Dict[str, Any], str]]
) -> Optional[List[str]]:
    """
    Saves Task records to Bubble, linking them to their parent key results.

    Args:
        org_urn: The URN of the organization.
        key_results_with_ids: A list of tuples from the save_key_results function.
                              Example: [(key_result_dict, 'bubble_id_456'), ...]

    Returns:
        A list of the newly created Bubble record IDs for the tasks, or None on failure.
    """
    task_payloads = []
    for key_result_data, parent_key_result_id in key_results_with_ids:
        for task in key_result_data.get("tasks", []):
            task_payloads.append({
                # "organization_urn": org_urn,
                "key_result": parent_key_result_id,  # Link to parent
                "description": task.get("task_description"),
                "objective_deliverable": task.get("objective_deliverable"),
                "category": task.get("task_category"),
                # "task_type": task.get("task_type"),
                "priority": task.get("priority"),
                "priority_justification": task.get("priority_justification"),
                "effort": task.get("effort"),
                "timeline": task.get("timeline"),
                # "data_subject": task.get("data_subject"),
                "responsible_party": task.get("responsible_party"),
                "success_criteria_metrics": task.get("success_criteria_metrics"),
                "dependencies": task.get("dependencies_prerequisites"),
                "why": task.get("why_proposed"),
            })

    if not task_payloads:
        logger.info("No tasks to save.")
        return []

    logger.info(f"Attempting to save {len(task_payloads)} tasks for org_urn: {org_urn}")
    task_ids = bulk_upload_to_bubble(task_payloads, BUBBLE_TASKS_TABLE_NAME)

    if task_ids is None:
        logger.error("Failed to save tasks to Bubble.")
        return None

    logger.info(f"Successfully saved {len(task_ids)} tasks.")
    return task_ids
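# Illustrative only: a minimal sketch of the `actionable_okrs` structure expected by
# the orchestrator below, inferred from the .get() calls in the save functions above.
# All values are hypothetical placeholders; the real structure comes from the agentic pipeline.
_EXAMPLE_ACTIONABLE_OKRS: Dict[str, Any] = {
    "okrs": [
        {
            "objective_description": "Grow LinkedIn audience engagement",
            "objective_timeline": "Q3",
            "objective_owner": "Marketing",
            "key_results": [
                {
                    "key_result_description": "Increase average post engagement rate",
                    "target_metric": "engagement_rate",
                    "target_value": "5%",
                    "key_result_type": "performance",
                    "data_subject": "posts",
                    "tasks": [
                        {
                            "task_description": "Publish two thought-leadership posts per week",
                            "objective_deliverable": "Publishing calendar",
                            "task_category": "Content",
                            "priority": "High",
                            "priority_justification": "Directly drives the engagement target",
                            "effort": "Medium",
                            "timeline": "Ongoing",
                            "responsible_party": "Content team",
                            "success_criteria_metrics": "Posts published on schedule",
                            "dependencies_prerequisites": "Approved content calendar",
                            "why_proposed": "Consistent posting correlates with higher engagement",
                        }
                    ],
                }
            ],
        }
    ],
}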
""" logger.info(f"--- Starting OKR save process for org_urn: {org_urn} ---") objectives_data = actionable_okrs.get("okrs", []) # Step 1: Save the top-level objectives objective_ids = save_objectives(org_urn, objectives_data, report_id) if objective_ids is None: logger.error("OKR save process aborted due to failure in saving objectives.") return # Combine the original objective data with their new IDs for the next step objectives_with_ids = list(zip(objectives_data, objective_ids)) # Step 2: Save the key results, linking them to the objectives key_results_with_ids = save_key_results(org_urn, objectives_with_ids) if key_results_with_ids is None: logger.error("OKR save process aborted due to failure in saving key results.") return # Step 3: Save the tasks, linking them to the key results save_tasks(org_urn, key_results_with_ids) logger.info(f"--- OKR save process completed for org_urn: {org_urn} ---")