# services/report_data_handler.py
"""
This module is responsible for fetching pre-computed agentic analysis data
(reports, OKRs, etc.) from Bubble.io and reconstructing it into a nested
dictionary format that the Gradio UI can easily display.
"""
import logging
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

# This is the only function needed from the Bubble API module for this handler
from apis.Bubble_API_Calls import fetch_linkedin_posts_data_from_bubble
from config import (
    BUBBLE_REPORT_TABLE_NAME,
    BUBBLE_OKR_TABLE_NAME,
    BUBBLE_KEY_RESULTS_TABLE_NAME,
    BUBBLE_TASKS_TABLE_NAME,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def fetch_latest_agentic_analysis(org_urn: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetch all agentic analysis report data for a given org_urn from Bubble.

    This function is called once during the initial data load.

    Args:
        org_urn: The organization URN used as the Bubble query constraint.

    Returns:
        A tuple of (DataFrame, error_message). On success the DataFrame holds
        the report records (empty DataFrame when no reports exist) and the
        error message is None; on failure the DataFrame is None and the error
        message describes the problem.
    """
    logger.info(f"Fetching latest agentic analysis data from Bubble for org_urn: {org_urn}")
    if not org_urn:
        logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
        return None, "org_urn is missing."

    try:
        report_data_df, error = fetch_linkedin_posts_data_from_bubble(
            data_type=BUBBLE_REPORT_TABLE_NAME,
            constraint_value=org_urn,
            constraint_key='organization_urn',
            constraint_type='equals'
        )
        if error:
            logger.error(f"Error fetching agentic reports from Bubble for org_urn {org_urn}: {error}")
            return None, str(error)

        if report_data_df is None or report_data_df.empty:
            logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn}.")
            return pd.DataFrame(), None  # Return empty DataFrame, no error

        logger.info(f"Successfully fetched {len(report_data_df)} agentic report records for org_urn {org_urn}")
        return report_data_df, None

    except Exception as e:
        logger.exception(f"An unexpected error occurred in fetch_latest_agentic_analysis for org_urn {org_urn}: {e}")
        return None, str(e)


def _ids_from(df: Optional[pd.DataFrame]) -> List[Any]:
    """Return the '_id' column of *df* as a list, or [] if df is None/empty."""
    if df is None or df.empty:
        return []
    return df['_id'].tolist()


def _records_by_parent(df: Optional[pd.DataFrame], parent_key: str) -> Dict[Any, List[dict]]:
    """
    Group the rows of a child table into lists of record dicts keyed by the
    parent-id column *parent_key*. Returns {} if df is None/empty.
    """
    if df is None or df.empty:
        return {}
    return df.groupby(parent_key).apply(lambda g: g.to_dict('records')).to_dict()


def _fetch_children(
    table_name: str, parent_key: str, parent_ids: List[Any], label: str
) -> pd.DataFrame:
    """
    Best-effort fetch of child rows whose *parent_key* is in *parent_ids*.

    Errors are logged and an empty DataFrame is returned so reconstruction can
    continue with whatever data is available (preserves the original module's
    "log and keep going" behavior, but without the risk of handing a None
    DataFrame to downstream code).
    """
    if not parent_ids:
        return pd.DataFrame()
    df, error = fetch_linkedin_posts_data_from_bubble(
        data_type=table_name,
        constraint_value=parent_ids,
        constraint_key=parent_key,
        constraint_type='in'
    )
    if error:
        logger.error(f"Error fetching {label}: {error}")
        return pd.DataFrame()
    # Defensive: treat a None result (no error reported) as "no rows".
    return df if df is not None else pd.DataFrame()


def fetch_and_reconstruct_data_from_bubble(report_series: pd.Series, session_cache: dict) -> Tuple[Optional[Dict[str, Any]], dict]:
    """
    MODIFIED: Takes a pandas Series of a single report and a session-specific
    cache dictionary. It fetches all related child items from Bubble,
    reconstructs the full nested dictionary, and uses the cache to avoid
    redundant API calls.

    Args:
        report_series: A pandas Series representing a single report to be processed.
        session_cache: The session-specific cache dictionary from a Gradio State.

    Returns:
        A tuple containing:
        - The reconstructed data dictionary (None on failure).
        - The updated session_cache dictionary.
    """
    logger.info("Attempting to get or reconstruct data for a Bubble report using session cache.")
    if report_series is None or report_series.empty:
        logger.warning("Cannot reconstruct data, the provided report Series is empty.")
        return None, session_cache

    report_id = report_series.get('_id')
    if not report_id:
        logger.error("Fetched report series is missing a Bubble '_id', cannot reconstruct children.")
        return None, session_cache

    # --- CACHE CHECK ---
    if report_id in session_cache:
        logger.info(f"CACHE HIT: Found reconstructed data for report ID {report_id} in session cache.")
        return session_cache[report_id], session_cache

    logger.info(f"CACHE MISS: No data for report ID {report_id}. Starting reconstruction from Bubble.io.")
    try:
        # 1. Fetch all related OKRs using the report_id. An error here is
        #    fatal for reconstruction (unlike the child fetches below).
        okrs_df, error = fetch_linkedin_posts_data_from_bubble(
            data_type=BUBBLE_OKR_TABLE_NAME,
            constraint_value=report_id,
            constraint_key='report',
            constraint_type='equals'
        )
        if error:
            logger.error(f"Error fetching OKRs for report_id {report_id}: {error}")
            return None, session_cache

        # 2. Fetch all related Key Results using the OKR IDs (best-effort).
        okr_ids = _ids_from(okrs_df)
        krs_df = _fetch_children(BUBBLE_KEY_RESULTS_TABLE_NAME, 'okr', okr_ids, "Key Results")

        # 3. Fetch all related Tasks using the Key Result IDs (best-effort).
        kr_ids = _ids_from(krs_df)
        tasks_df = _fetch_children(BUBBLE_TASKS_TABLE_NAME, 'key_result', kr_ids, "Tasks")

        # 4. Reconstruct the nested dictionary: tasks nested under their key
        #    result, key results nested under their OKR.
        tasks_by_kr_id = _records_by_parent(tasks_df, 'key_result')
        krs_by_okr_id = _records_by_parent(krs_df, 'okr')

        reconstructed_okrs = []
        if okrs_df is not None and not okrs_df.empty:
            for okr_data in okrs_df.to_dict('records'):
                okr_id = okr_data['_id']
                key_results_list = krs_by_okr_id.get(okr_id, [])
                for kr_data in key_results_list:
                    kr_id = kr_data['_id']
                    kr_data['tasks'] = tasks_by_kr_id.get(kr_id, [])
                okr_data['key_results'] = key_results_list
                reconstructed_okrs.append(okr_data)

        # 5. Assemble the final payload for the UI.
        actionable_okrs = {"okrs": reconstructed_okrs}
        final_reconstructed_data = {
            "report_str": report_series.get("report_text", "Report text not found."),
            "quarter": report_series.get("quarter"),
            "year": report_series.get("year"),
            "actionable_okrs": actionable_okrs,
            "report_id": report_id
        }

        # --- STORE IN SESSION CACHE ---
        session_cache[report_id] = final_reconstructed_data
        logger.info(f"Successfully reconstructed and cached data for report {report_id} in the current session.")

        return final_reconstructed_data, session_cache

    except Exception as e:
        logger.exception(f"An unexpected error occurred during data reconstruction: {e}")
        return None, session_cache