# LinkedinMonitor / apis / Bubble_API_Calls.py
# Source page header: author GuglielmoTor, commit 28bee4a ("Update apis/Bubble_API_Calls.py", verified), 14.7 kB
# bubble_api_calls.py
import os
import json
import requests
import pandas as pd
import logging
logger = logging.getLogger(__name__)
def _parse_bubble_token_record(record, url_user_token_str):
    """
    Extracts the LinkedIn token dictionary from a single Bubble result record.

    Args:
        record: One entry of the Bubble ``results`` list.
        url_user_token_str: The state value used for the query (only used in messages).

    Returns:
        tuple: (token_dict_or_None, status_message).
    """
    raw_text = record.get("Raw_text", None)
    if not raw_text:  # Raw_text not found, null or empty
        return None, (f"⚠️ Bubble API: Token field ('Raw_text') "
                      f"not found or is null in response for state: {url_user_token_str}. Result: {record}")
    if not isinstance(raw_text, str):  # It exists but is not a string
        return None, (f"⚠️ Bubble API: 'Raw_text' field was not a string. "
                      f"Type: {type(raw_text)}, Value: {raw_text}")
    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as e:
        return None, (f"⚠️ Bubble API: Error decoding 'Raw_text' JSON string: {e}. "
                      f"Content: {raw_text}")
    if not isinstance(parsed, dict):
        return None, (f"⚠️ Bubble API: 'Raw_text' field did not contain a valid JSON dictionary string. "
                      f"Content type: {type(raw_text)}, Value: {raw_text}")
    if "access_token" not in parsed:
        return None, (f"⚠️ Bubble API: 'access_token' key missing in parsed 'Raw_text' dictionary "
                      f"for state: {url_user_token_str}. Parsed: {parsed}")
    return parsed, (f"✅ LinkedIn Token successfully fetched and parsed from Bubble 'Raw_text' "
                    f"for state: {url_user_token_str}")


def fetch_linkedin_token_from_bubble(url_user_token_str: str):
    """
    Fetches LinkedIn access token from Bubble.io API using the state value (url_user_token_str).
    The token is expected in a 'Raw_text' field as a JSON string, which is then parsed.

    Every code path returns the same shape (a dict or None). Previously the two
    early-exit paths returned a ``(None, message)`` tuple, which is truthy and
    therefore looked like a valid token to callers testing the result.

    Args:
        url_user_token_str: The state value (token from URL) to query Bubble.

    Returns:
        dict | None: The parsed token dictionary (it contains an "access_token" key),
        or None if the API key is missing, the state value is unusable, the request
        failed, or no valid token was found. The outcome is printed as a status message.
    """
    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
        print(error_msg)
        return None

    # The caller may pass a placeholder/error string instead of a real state value.
    if not url_user_token_str or "not found" in url_user_token_str or "Could not access" in url_user_token_str:
        status_msg = f"ℹ️ No valid user token from URL to query Bubble. ({url_user_token_str})"
        print(status_msg)
        return None

    base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/Linkedin_access"
    constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
    params = {'constraints': json.dumps(constraints)}
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    status_message = f"Attempting to fetch token from Bubble for state: {url_user_token_str}..."
    print(status_message)

    parsed_token_dict = None
    response = None
    try:
        response = requests.get(base_url, params=params, headers=headers, timeout=15)
        response.raise_for_status()
        data = response.json()
        results = data.get("response", {}).get("results", [])
        if results:
            # Only the first matching record is considered.
            parsed_token_dict, status_message = _parse_bubble_token_record(results[0], url_user_token_str)
        else:  # No results from Bubble for the given state
            status_message = f"❌ Bubble API: No results found for state: {url_user_token_str}"
    except requests.exceptions.HTTPError as http_err:
        # A Response is falsy for 4xx/5xx statuses, so compare against None explicitly;
        # otherwise the error body would never be reported.
        error_details = response.text if response is not None else "No response content"
        status_message = f"❌ Bubble API HTTP error: {http_err} - Response: {error_details}"
    except requests.exceptions.Timeout:
        status_message = "❌ Bubble API Request timed out."
    except json.JSONDecodeError as json_err:  # Error decoding the main Bubble response
        # Must precede RequestException: recent versions of requests raise a
        # JSONDecodeError that inherits from both classes.
        error_details = response.text if response is not None else "No response content"
        status_message = f"❌ Bubble API main response JSON decode error: {json_err}. Response: {error_details}"
    except requests.exceptions.RequestException as req_err:
        status_message = f"❌ Bubble API Request error: {req_err}"
    except Exception as e:
        status_message = f"❌ An unexpected error occurred while fetching from Bubble: {str(e)}"

    print(status_message)  # Log the final status message
    return parsed_token_dict
def fetch_linkedin_posts_data_from_bubble(
    constraint_value: str,
    data_type: str,
    constraint_key: str,
    constraint_type: str,
    additional_constraints: list = None
):
    """
    Fetches data from the Bubble.io Data API, handling pagination to retrieve all results.

    Args:
        constraint_value: The value to match in the constraint.
        data_type: The Bubble data type (table name) to query.
        constraint_key: The field key for the constraint.
        constraint_type: The type of constraint (e.g., 'equals', 'contains').
        additional_constraints: A list of additional constraint dictionaries.

    Returns:
        A tuple containing a pandas DataFrame with all results and an error message string.
        If successful, the error message is None (the DataFrame is empty when nothing matched).
        If an error occurs (missing API key, request failure, non-JSON response),
        the DataFrame is None.
    """
    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
        print(error_msg)
        return None, error_msg

    base_url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}"
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    # --- Main Constraint Setup ---
    constraints = [{"key": constraint_key, "constraint_type": constraint_type, "value": constraint_value}]
    if additional_constraints:
        constraints.extend(additional_constraints)
    # The constraints do not change between pages, so serialize them once.
    constraints_json = json.dumps(constraints)

    # --- Pagination Logic ---
    all_results = []
    cursor = 0  # Start at the beginning
    print(f"Attempting to fetch data from Bubble for {constraint_key} = {constraint_value}...")

    while True:  # Loop until all pages are fetched
        # Parameters for the API call, including constraints and pagination info
        params = {
            'constraints': constraints_json,
            'cursor': cursor,
            'limit': 100  # Fetch up to 100 items per request (the max)
        }
        try:
            print(f"DEBUG: Requesting URL: {base_url}")
            print(f"DEBUG: Request PARAMS: {params}")
            # --- Make the API Request ---
            response = requests.get(base_url, params=params, headers=headers, timeout=30)
            response.raise_for_status()  # Raises an HTTPError for bad responses (4xx or 5xx)
            data = response.json().get("response", {})
        except requests.exceptions.RequestException as e:
            error_msg = f"❌ Bubble API Error: {str(e)}"
            print(error_msg)
            return None, error_msg
        except ValueError as e:
            # Older versions of requests raise a plain ValueError for a non-JSON body.
            error_msg = f"❌ Bubble API Error: Invalid JSON in response: {str(e)}"
            print(error_msg)
            return None, error_msg

        results_on_page = data.get("results", [])
        if results_on_page:
            all_results.extend(results_on_page)
            print(f"Retrieved {len(results_on_page)} results on this page. Total so far: {len(all_results)}.")

        # --- Check if there are more pages ---
        # The API returns the number of results remaining. If 0 (or absent/null), we're done.
        remaining = data.get("remaining", 0)
        if not remaining:
            print("No more pages to fetch.")
            break  # Exit the while loop

        if not results_on_page:
            # The cursor cannot advance on an empty page; stop instead of
            # requesting the same page forever.
            print("Received an empty page while results were reported as remaining. Stopping pagination.")
            break

        # --- Update cursor for the next page ---
        # The new cursor is the current cursor + number of items just received.
        cursor += len(results_on_page)

    # --- Final Processing ---
    if all_results:
        df = pd.DataFrame(all_results)
        print(f"✅ Successfully retrieved a total of {len(df)} posts.")
        return df, None

    print("No posts found for the given constraints.")
    # Return an empty DataFrame if nothing was found
    return pd.DataFrame(), None
# def bulk_upload_to_bubble(data, data_type):
# api_token = os.environ.get("Bubble_API")
# url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
# headers = {
# "Authorization": f"Bearer {api_token}",
# "Content-Type": "text/plain"
# }
# # Convert list of dicts to newline-separated JSON strings
# payload = "\n".join(json.dumps(item) for item in data)
# response = requests.post(url, headers=headers, data=payload)
# print("Payload being sent:")
# print(payload)
# if response.status_code == 200:
# print(f"Successfully uploaded {len(data)} records to {data_type}.")
# else:
# print(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")
# Version f49ffdd was the last one that worked for the LinkedIn data upload.
def bulk_upload_to_bubble(data, data_type):
    """
    Uploads a list of dictionaries to a specified Bubble data type using the bulk endpoint.

    The records are sent as newline-delimited JSON (one JSON object per line,
    Content-Type text/plain) and the response is parsed the same way.

    Args:
        data (list): A list of dictionaries, where each dictionary represents a record.
        data_type (str): The name of the Bubble data type (table) to upload to.

    Returns:
        list: The parsed JSON objects returned by Bubble, one per response line, if
              successful. An empty list is returned when `data` is empty (no request
              is made) or when the response body is empty.
        bool: False if the API key is missing or the upload fails.
    """
    # Same logger object as the module-level one; fetched here so every message
    # of this function goes through a single, consistently named logger.
    log = logging.getLogger(__name__)

    api_token = os.environ.get("Bubble_API")
    if not api_token:
        log.error("Bubble_API environment variable not set.")
        return False

    if not data:
        # Nothing to send: avoid posting an empty body to the bulk endpoint.
        log.warning("No records supplied for bulk upload to %s. Nothing was sent.", data_type)
        return []

    url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "text/plain"
    }
    # Convert list of dicts to newline-separated JSON strings
    payload = "\n".join(json.dumps(item) for item in data)
    log.info("Sending bulk payload to Bubble data type: %s", data_type)

    response = None
    try:
        # An explicit timeout keeps a stalled connection from blocking forever.
        response = requests.post(url, headers=headers, data=payload.encode('utf-8'), timeout=60)
        response.raise_for_status()

        # Handle the newline-delimited JSON response from Bubble.
        response_text = response.text.strip()
        if not response_text:
            log.warning("Upload to %s was successful but returned an empty response.", data_type)
            return []  # Return an empty list for success with no content

        created_records = [
            json.loads(line)
            for line in response_text.splitlines()
            if line.strip()  # Skip blank / whitespace-only lines
        ]
        log.info("Successfully uploaded %s records to %s.", len(created_records), data_type)
        return created_records
    except requests.exceptions.HTTPError as http_err:
        log.error("HTTP error occurred: %s", http_err)
        log.error(
            "Failed to upload data to %s. Status Code: %s, Response: %s",
            data_type, response.status_code, response.text,
        )
        return False
    except json.JSONDecodeError as json_err:
        # Logged with the raw body in case the response format changes again.
        log.error("JSON decoding failed: %s. Response text: %s", json_err, response.text)
        return False
    except Exception as err:
        log.error("An other error occurred: %s", err, exc_info=True)
        return False
def update_record_in_bubble(table_name, record_id, payload_to_update):
    """
    Updates an existing record in a Bubble.io table using a PATCH request.

    Args:
        table_name (str): The name of the Bubble table (e.g., "User", "Product").
        record_id (str): The unique ID of the record to update.
        payload_to_update (dict): A dictionary where keys are field names (slugs)
                                  and values are the new values for those fields.

    Returns:
        bool: True if the update was successful (or the payload was empty, in which
              case no request is made); False if the record ID or the 'Bubble_API'
              environment variable is missing, or if the request fails.
    """
    if not record_id:
        logging.error(f"Record ID is missing. Cannot update record in Bubble table '{table_name}'.")
        return False

    if not payload_to_update:
        logging.warning(f"Payload to update is empty for record_id '{record_id}' in Bubble table '{table_name}'. Nothing to update.")
        # Considered a success: there is no error, just no action to perform.
        return True

    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        # Without this check the request would be sent with "Bearer None".
        logging.error(f"Bubble_API environment variable not set. Cannot update record '{record_id}' in Bubble table '{table_name}'.")
        return False

    # Construct the API endpoint for a specific record
    # Example: https://<app_name>.bubbleapps.io/api/1.1/obj/<table_name>/<record_id>
    api_endpoint = f"https://app.ingaze.ai/version-test/api/1.1/obj/{table_name}/{record_id}"
    headers = {
        "Authorization": f"Bearer {bubble_api_key}",
        "Content-Type": "application/json"
    }
    logging.debug(f"Attempting to update record '{record_id}' in table '{table_name}' at endpoint '{api_endpoint}' with payload: {payload_to_update}")

    try:
        # An explicit timeout keeps a stalled connection from blocking forever.
        response = requests.patch(api_endpoint, json=payload_to_update, headers=headers, timeout=30)
        response.raise_for_status()  # Raises an HTTPError for bad responses (4XX or 5XX)
        logging.info(f"Successfully updated record '{record_id}' in Bubble table '{table_name}'.")
        return True
    except requests.exceptions.HTTPError as http_err:
        # Log more details from the response if available
        try:
            error_details = response.json()  # Bubble often returns JSON errors
        except ValueError:  # If response is not JSON
            error_details = response.text
        logging.error(f"HTTP error occurred while updating record '{record_id}' in '{table_name}': {http_err}. Response: {error_details}")
    except requests.exceptions.RequestException as req_err:
        logging.error(f"Request exception occurred while updating record '{record_id}' in '{table_name}': {req_err}")
    except Exception as e:
        logging.error(f"An unexpected error occurred while updating record '{record_id}' in '{table_name}': {e}", exc_info=True)
    return False