LinkedinMonitor / services /report_data_handler.py
GuglielmoTor's picture
Update services/report_data_handler.py
de146fb verified
raw
history blame
3.01 kB
# services/report_data_handler.py
import pandas as pd
import logging
from apis.Bubble_API_Calls import fetch_linkedin_posts_data_from_bubble, bulk_upload_to_bubble
from config import (
BUBBLE_REPORT_TABLE_NAME,
BUBBLE_OKR_TABLE_NAME,
BUBBLE_KEY_RESULTS_TABLE_NAME,
BUBBLE_TASKS_TABLE_NAME,
BUBBLE_KR_UPDATE_TABLE_NAME,
)
import json # For handling JSON data
logger = logging.getLogger(__name__)
def fetch_latest_agentic_analysis(org_urn: str):
"""
Fetches all agentic analysis data for a given org_urn from Bubble.
Returns the full dataframe and any error, or None, None.
"""
if not org_urn:
logger.warning("fetch_latest_agentic_analysis: org_urn is missing.")
return None, None
report_data_df, error = fetch_linkedin_posts_data_from_bubble(
data_type=BUBBLE_REPORT_TABLE_NAME,
org_urn=org_urn
)
if error or report_data_df is None or report_data_df.empty:
logger.info(f"No existing agentic analysis found in Bubble for org_urn {org_urn} or error: {error}")
return None, None
logger.info(f"Agentic analysis data fetched for org_urn {org_urn}")
return report_data_df, None # Return full dataframe and no error
def save_agentic_analysis_results(
org_urn: str,
report_markdown: str,
orchestration_raw_results: dict, # This is the full output from agentic pipeline
# key_results_for_selection: list, # Pass this too if you want to save it
generation_timestamp: pd.Timestamp
):
"""Saves the agentic pipeline results to Bubble."""
if not org_urn:
logger.error("Cannot save agentic results: org_urn missing.")
return False
# Extract what you need from orchestration_raw_results
actionable_okrs_dict = orchestration_raw_results.get("actionable_okrs_and_tasks", {})
# Assuming key_results_for_selection is also part of orchestration_raw_results or passed separately
key_results_selection_list = orchestration_raw_results.get("key_results_for_selection", [])
payload = {
"org_urn": org_urn,
BUBBLE_AGENTIC_TIMESTAMP_COLUMN: generation_timestamp.isoformat(), # Store as ISO string
"report_markdown": report_markdown if report_markdown else "N/A",
"okrs_json": json.dumps(actionable_okrs_dict) if actionable_okrs_dict else json.dumps({}),
"key_results_selection_json": json.dumps(key_results_selection_list) if key_results_selection_list else json.dumps([]),
# Add other fields as needed
}
logger.info(f"Attempting to save agentic analysis to Bubble for org_urn: {org_urn}")
success_ids = bulk_upload_to_bubble([payload], BUBBLE_AGENTIC_RESULTS_TABLE_NAME)
if success_ids and success_ids[0]: # bulk_upload_to_bubble returns list of IDs or False
logger.info(f"Successfully saved agentic analysis to Bubble. Record ID: {success_ids[0]}")
return True
else:
logger.error("Failed to save agentic analysis to Bubble.")
return False