LinkedinMonitor / services /report_data_handler.py
GuglielmoTor's picture
Update services/report_data_handler.py
bf7d3b2 verified
# services/report_data_handler.py
"""
This module is responsible for fetching pre-computed agentic analysis data
(reports, OKRs, etc.) from Bubble.io and reconstructing it into a nested
dictionary format that the Gradio UI can easily display.
"""
import pandas as pd
import logging
from typing import Dict, Any, Optional, Tuple
# This is the only function needed from the Bubble API module for this handler
from apis.Bubble_API_Calls import fetch_linkedin_posts_data_from_bubble
from config import (
BUBBLE_REPORT_TABLE_NAME,
BUBBLE_OKR_TABLE_NAME,
BUBBLE_KEY_RESULTS_TABLE_NAME,
BUBBLE_TASKS_TABLE_NAME
)
# Configure the root logger at import time so INFO-level records are emitted.
# NOTE: logging.basicConfig is a no-op when the root logger already has
# handlers, so an application that configured logging earlier keeps its setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by every function below.
logger = logging.getLogger(__name__)
def fetch_latest_agentic_analysis(org_urn: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Load every agentic analysis report stored in Bubble for one organization.

    Args:
        org_urn: Organization URN used as the 'organization_urn' equality constraint.

    Returns:
        A ``(dataframe, error_message)`` tuple:
        - ``(None, message)`` when ``org_urn`` is falsy, the Bubble call reports
          an error, or an unexpected exception is raised;
        - ``(empty DataFrame, None)`` when the call succeeds but yields no rows;
        - ``(dataframe, None)`` with the fetched report rows otherwise.
    """
    logger.info(f"Fetching latest agentic analysis data from Bubble for org_urn: {org_urn}")

    # Guard clause: without an organization there is nothing to query.
    if not org_urn:
        logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
        return None, "org_urn is missing."

    try:
        reports, fetch_error = fetch_linkedin_posts_data_from_bubble(
            data_type=BUBBLE_REPORT_TABLE_NAME,
            constraint_value=org_urn,
            constraint_key='organization_urn',
            constraint_type='equals'
        )

        if fetch_error:
            logger.error(f"Error fetching agentic reports from Bubble for org_urn {org_urn}: {fetch_error}")
            return None, str(fetch_error)

        has_rows = reports is not None and not reports.empty
        if has_rows:
            logger.info(f"Successfully fetched {len(reports)} agentic report records for org_urn {org_urn}")
            return reports, None

        # "No data yet" is a normal outcome, not an error: hand back an empty frame.
        logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn}.")
        return pd.DataFrame(), None
    except Exception as exc:
        logger.exception(f"An unexpected error occurred in fetch_latest_agentic_analysis for org_urn {org_urn}: {exc}")
        return None, str(exc)
def _rows_as_records(frame: Optional[pd.DataFrame]) -> list:
    """Return the rows of ``frame`` as a list of dicts (``[]`` for ``None`` or an empty frame)."""
    if frame is None or frame.empty:
        return []
    return frame.to_dict('records')


def _group_records_by(records: list, parent_key: str) -> Dict[Any, list]:
    """Bucket record dicts by ``record[parent_key]``, skipping records whose key is missing or null."""
    grouped: Dict[Any, list] = {}
    for record in records:
        parent_id = record.get(parent_key)
        # Null parents can never be looked up by a real id, so drop them
        # (this mirrors groupby's default of discarding NaN group keys).
        if pd.api.types.is_scalar(parent_id) and pd.isna(parent_id):
            continue
        grouped.setdefault(parent_id, []).append(record)
    return grouped


def fetch_and_reconstruct_data_from_bubble(report_series: pd.Series, session_cache: dict) -> Tuple[Optional[Dict[str, Any]], dict]:
    """
    Rebuild the nested report -> OKRs -> key results -> tasks structure for one report.

    The child items are fetched from Bubble using the report's '_id'. Results
    are memoized in ``session_cache`` (keyed by report id) so repeated requests
    for the same report within a session do not hit the API again.

    Args:
        report_series: A pandas Series representing a single report to be processed.
        session_cache: The session-specific cache dictionary from a Gradio State.
            ``None`` is treated as an empty cache.

    Returns:
        A tuple containing:
        - The reconstructed data dictionary with keys 'report_str', 'quarter',
          'year', 'actionable_okrs' and 'report_id', or ``None`` when the report
          is empty, has no '_id', the OKR fetch fails, or an unexpected error occurs.
        - The (possibly updated) session_cache dictionary.

    Notes:
        If fetching Key Results or Tasks reports an error, the partial structure
        is still returned (best effort) but it is NOT cached, so a later call
        can retry instead of being served incomplete data for the whole session.
    """
    logger.info("Attempting to get or reconstruct data for a Bubble report using session cache.")

    # Tolerate a missing cache instead of raising on the membership test below.
    if session_cache is None:
        session_cache = {}

    if report_series is None or report_series.empty:
        logger.warning("Cannot reconstruct data, the provided report Series is empty.")
        return None, session_cache

    report_id = report_series.get('_id')
    if not report_id:
        logger.error("Fetched report series is missing a Bubble '_id', cannot reconstruct children.")
        return None, session_cache

    # --- CACHE CHECK ---
    if report_id in session_cache:
        logger.info(f"CACHE HIT: Found reconstructed data for report ID {report_id} in session cache.")
        return session_cache[report_id], session_cache

    logger.info(f"CACHE MISS: No data for report ID {report_id}. Starting reconstruction from Bubble.io.")

    try:
        # Tracks whether every child fetch succeeded; only complete results are cached.
        children_complete = True

        # 1. Fetch all related OKRs using the report_id
        okrs_df, error = fetch_linkedin_posts_data_from_bubble(
            data_type=BUBBLE_OKR_TABLE_NAME,
            constraint_value=report_id,
            constraint_key='report',
            constraint_type='equals'
        )
        if error:
            logger.error(f"Error fetching OKRs for report_id {report_id}: {error}")
            return None, session_cache
        okr_records = _rows_as_records(okrs_df)

        # 2. Fetch all related Key Results using the OKR IDs
        okr_ids = [okr['_id'] for okr in okr_records]
        kr_records = []
        if okr_ids:
            krs_df, error = fetch_linkedin_posts_data_from_bubble(
                data_type=BUBBLE_KEY_RESULTS_TABLE_NAME,
                constraint_value=okr_ids,
                constraint_key='okr',
                constraint_type='in'
            )
            if error:
                logger.error(f"Error fetching Key Results: {error}")
                children_complete = False
            # The frame may be None after a failed fetch; treat that as "no rows".
            kr_records = _rows_as_records(krs_df)

        # 3. Fetch all related Tasks using the Key Result IDs
        kr_ids = [kr['_id'] for kr in kr_records]
        task_records = []
        if kr_ids:
            tasks_df, error = fetch_linkedin_posts_data_from_bubble(
                data_type=BUBBLE_TASKS_TABLE_NAME,
                constraint_value=kr_ids,
                constraint_key='key_result',
                constraint_type='in'
            )
            if error:
                logger.error(f"Error fetching Tasks: {error}")
                children_complete = False
            task_records = _rows_as_records(tasks_df)

        # 4. Reconstruct the nested dictionary.
        # Plain dict bucketing is used instead of groupby(...).apply(...): passing
        # the grouping column to the applied function is deprecated in recent
        # pandas, which would change the contents of the generated records.
        tasks_by_kr_id = _group_records_by(task_records, 'key_result')
        krs_by_okr_id = _group_records_by(kr_records, 'okr')

        reconstructed_okrs = []
        for okr_data in okr_records:
            key_results_list = krs_by_okr_id.get(okr_data['_id'], [])
            for kr_data in key_results_list:
                kr_data['tasks'] = tasks_by_kr_id.get(kr_data['_id'], [])
            okr_data['key_results'] = key_results_list
            reconstructed_okrs.append(okr_data)

        # 5. Assemble the final payload for the UI
        final_reconstructed_data = {
            "report_str": report_series.get("report_text", "Report text not found."),
            "quarter": report_series.get("quarter"),
            "year": report_series.get("year"),
            "actionable_okrs": {"okrs": reconstructed_okrs},
            "report_id": report_id
        }

        # --- STORE IN SESSION CACHE (complete results only) ---
        if children_complete:
            session_cache[report_id] = final_reconstructed_data
            logger.info(f"Successfully reconstructed and cached data for report {report_id} in the current session.")
        else:
            logger.warning(f"Reconstructed data for report {report_id} is incomplete; skipping session cache so it can be retried.")

        return final_reconstructed_data, session_cache
    except Exception as e:
        logger.exception(f"An unexpected error occurred during data reconstruction: {e}")
        return None, session_cache