LinkedinMonitor / services /state_manager.py
GuglielmoTor's picture
Update services/state_manager.py
5b325ca verified
# services/state_manager.py
"""
Manages the application state by loading all necessary, pre-processed data
from Bubble.io. This includes the LinkedIn token, organizational data (posts,
mentions, stats), and the results of any pre-computed AI analysis.
"""
import pandas as pd
import logging
import os
# Assuming Bubble_API_Calls contains the necessary fetch functions
from apis.Bubble_API_Calls import (
fetch_linkedin_token_from_bubble,
fetch_linkedin_posts_data_from_bubble # Generic fetch function
)
# Assuming config.py contains all necessary constants
from config import (
LINKEDIN_CLIENT_ID_ENV_VAR,
BUBBLE_POSTS_TABLE_NAME,
BUBBLE_POST_STATS_TABLE_NAME,
BUBBLE_MENTIONS_TABLE_NAME,
BUBBLE_FOLLOWER_STATS_TABLE_NAME,
BUBBLE_OPERATIONS_LOG_TABLE_NAME # Kept for potential display/logging purposes
)
# The report_data_handler is responsible for fetching the already-computed AI analysis
from .report_data_handler import fetch_latest_agentic_analysis
def check_token_status(token_state):
    """Return a human-readable status string for the LinkedIn token.

    Args:
        token_state: Application state dict, or None. The token is read
            from its "token" key.

    Returns:
        "✅ Token available" when the state is truthy and holds a truthy
        "token" entry, otherwise "❌ Token not available".
    """
    # A missing/empty state and a missing/empty token are treated the same.
    has_token = bool(token_state) and bool(token_state.get("token"))
    # The check mark was previously stored as mis-decoded bytes ("βœ…");
    # it is written here as the intended U+2705 character.
    return "✅ Token available" if has_token else "❌ Token not available"
def load_data_from_bubble(url_user_token, org_urn, current_state):
    """
    Fetches the LinkedIn token and all relevant pre-processed data (posts, mentions,
    follower stats, operations log and agentic analysis results) from Bubble.io.

    This function assumes data is populated in Bubble by an external process. It
    only retrieves the data for display and does not trigger any syncing or
    data processing.

    Every remote fetch is best-effort: a failure is logged and the affected
    state entry falls back to None (token) or an empty DataFrame (tables),
    so one failing fetch never discards the results of the others.

    Args:
        url_user_token: Value forwarded to fetch_linkedin_token_from_bubble.
            A falsy value, or a string containing "not found" or
            "Could not access", is treated as "no token" and skips the fetch.
        org_urn: Organization URN used as the "organization_urn" constraint
            for every table fetch. When falsy, no table is fetched.
        current_state: Previous state dict, or None. It is shallow-copied,
            so the caller's dict is not modified.

    Returns:
        A tuple (token_status_message, new_state), where token_status_message
        comes from check_token_status(new_state) and new_state is the updated
        state dict.
    """
    logging.info(f"Loading all data from Bubble for Org URN: '{org_urn}'")

    # Initialize a new state from a shallow copy of the old one. All data
    # frames are reset so stale data never survives a reload.
    new_state = current_state.copy() if current_state else {}
    new_state.update({
        # Placeholders only: "client_id" and "token" are always reassigned
        # in steps 1 and 2 below.
        "token": new_state.get("token"),
        "client_id": new_state.get("client_id"),
        "org_urn": org_urn,
        "bubble_posts_df": pd.DataFrame(),
        "bubble_post_stats_df": pd.DataFrame(),
        "bubble_mentions_df": pd.DataFrame(),
        "bubble_follower_stats_df": pd.DataFrame(),
        "bubble_operations_log_df": pd.DataFrame(),
        "bubble_agentic_analysis_data": pd.DataFrame(),
        "url_user_token_temp_storage": url_user_token,
    })

    # 1. Get Client ID from environment
    client_id = os.environ.get(LINKEDIN_CLIENT_ID_ENV_VAR)
    new_state["client_id"] = client_id if client_id else "ENV VAR MISSING"
    if not client_id:
        logging.error(f"CRITICAL ERROR: '{LINKEDIN_CLIENT_ID_ENV_VAR}' environment variable not set.")

    # 2. Fetch LinkedIn Access Token from Bubble
    # The token defaults to None; it is only set when a valid one is fetched.
    new_state["token"] = None
    if url_user_token and "not found" not in url_user_token and "Could not access" not in url_user_token:
        logging.info("Attempting to fetch LinkedIn token from Bubble.")
        try:
            parsed_linkedin_token = fetch_linkedin_token_from_bubble(url_user_token)
        except Exception as e:
            logging.error(f"❌ Exception while fetching LinkedIn token from Bubble: {e}", exc_info=True)
        else:
            if isinstance(parsed_linkedin_token, dict) and "access_token" in parsed_linkedin_token:
                new_state["token"] = parsed_linkedin_token
                logging.info("✅ LinkedIn Token successfully fetched from Bubble.")
            else:
                logging.warning(f"❌ Failed to fetch a valid LinkedIn token from Bubble. Response: {parsed_linkedin_token}")
    else:
        logging.info("No valid URL user token provided for LinkedIn token fetch.")

    # 3. Fetch all data tables from Bubble if an Org URN is present
    if org_urn:
        # State key -> Bubble table name.
        data_tables_to_fetch = {
            "bubble_posts_df": BUBBLE_POSTS_TABLE_NAME,
            "bubble_post_stats_df": BUBBLE_POST_STATS_TABLE_NAME,
            "bubble_mentions_df": BUBBLE_MENTIONS_TABLE_NAME,
            "bubble_follower_stats_df": BUBBLE_FOLLOWER_STATS_TABLE_NAME,
            "bubble_operations_log_df": BUBBLE_OPERATIONS_LOG_TABLE_NAME,
        }
        for state_key, table_name in data_tables_to_fetch.items():
            logging.info(f"Attempting to fetch '{table_name}' from Bubble for org_urn: {org_urn}")
            try:
                fetched_df, error_message = fetch_linkedin_posts_data_from_bubble(
                    constraint_value=org_urn,
                    data_type=table_name,
                    constraint_key="organization_urn",
                    constraint_type="equals",
                )
            except Exception as e:
                logging.error(f"❌ Exception while fetching '{table_name}' from Bubble: {e}.", exc_info=True)
                new_state[state_key] = pd.DataFrame()
                continue

            # A reported error or a missing frame both degrade to an empty frame.
            if error_message or fetched_df is None:
                new_state[state_key] = pd.DataFrame()
            else:
                new_state[state_key] = fetched_df

            if error_message:
                logging.warning(f"Error fetching '{table_name}' from Bubble: {error_message}.")
            else:
                logging.info(f"✅ Successfully fetched {len(new_state[state_key])} records for '{table_name}'.")

        # 4. Fetch the pre-computed Agentic Analysis data
        logging.info(f"Attempting to fetch agentic analysis data from Bubble for org_urn: {org_urn}")
        try:
            all_analysis_data, error = fetch_latest_agentic_analysis(org_urn)
        except Exception as e:
            # Guarded like the other fetches so a failure here does not
            # abort the load and lose the token/tables already retrieved.
            logging.error(f"❌ Exception while fetching agentic analysis data from Bubble: {e}.", exc_info=True)
            new_state["bubble_agentic_analysis_data"] = pd.DataFrame()
        else:
            if error:
                logging.warning(f"Error fetching agentic analysis data: {error}")
                new_state["bubble_agentic_analysis_data"] = pd.DataFrame()
            elif all_analysis_data is not None and not all_analysis_data.empty:
                new_state["bubble_agentic_analysis_data"] = all_analysis_data
                logging.info(f"✅ Successfully fetched {len(all_analysis_data)} records for agentic analysis.")
            else:
                new_state["bubble_agentic_analysis_data"] = pd.DataFrame()
                logging.info("No agentic analysis data found in Bubble for this org.")
    else:
        # Every data frame was already reset to empty during initialization,
        # so there is nothing further to clear here.
        logging.warning("Org URN not available in state. Cannot fetch any data from Bubble.")

    token_status_message = check_token_status(new_state)
    logging.info(f"Data loading from Bubble complete. Status: {token_status_message}.")

    # This function only returns the status message and the updated state;
    # it does not drive any sync-button logic.
    return token_status_message, new_state