Spaces:
Running
Running
# bubble_api_calls.py | |
import os | |
import json | |
import requests | |
import pandas as pd | |
import logging | |
logger = logging.getLogger(__name__) | |
def _extract_token_from_bubble_result(result, state):
    """Parse the 'Raw_text' field of one Bubble result.

    Returns a ``(token_dict_or_None, status_message)`` pair; the dict is only
    returned when 'Raw_text' is a JSON object string containing 'access_token'.
    """
    raw_text = result.get("Raw_text", None)

    if not raw_text:  # Raw_text not found or null
        return None, (f"⚠️ Bubble API: Token field ('Raw_text') "
                      f"not found or is null in response for state: {state}. Result: {result}")

    if not isinstance(raw_text, str):  # It exists but is not a string
        return None, (f"⚠️ Bubble API: 'Raw_text' field was not a string. "
                      f"Type: {type(raw_text)}, Value: {raw_text}")

    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as e:
        return None, (f"⚠️ Bubble API: Error decoding 'Raw_text' JSON string: {e}. "
                      f"Content: {raw_text}")

    if not isinstance(parsed, dict):
        return None, (f"⚠️ Bubble API: 'Raw_text' field did not contain a valid JSON dictionary string. "
                      f"Content type: {type(raw_text)}, Value: {raw_text}")

    if "access_token" not in parsed:
        return None, (f"⚠️ Bubble API: 'access_token' key missing in parsed 'Raw_text' dictionary "
                      f"for state: {state}. Parsed: {parsed}")

    return parsed, (f"✅ LinkedIn Token successfully fetched and parsed from Bubble 'Raw_text' "
                    f"for state: {state}")


def fetch_linkedin_token_from_bubble(url_user_token_str: str):
    """
    Fetches LinkedIn access token from Bubble.io API using the state value (url_user_token_str).
    The token is expected in a 'Raw_text' field as a JSON string, which is then parsed.

    The outcome of every call is printed as a status message; it is not returned.

    Args:
        url_user_token_str: The state value (token from URL) to query Bubble.

    Returns:
        dict or None: The parsed token dictionary (e.g., {"access_token": "value"}),
        or None if the API key is missing, the state value is unusable, the
        request failed, or no valid token was found.
    """
    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
        print(error_msg)
        # Return a bare None (not a tuple) so every exit path has the same shape
        # as the main path below; a (None, msg) tuple would be truthy.
        return None

    if not url_user_token_str or "not found" in url_user_token_str or "Could not access" in url_user_token_str:
        status_msg = f"ℹ️ No valid user token from URL to query Bubble. ({url_user_token_str})"
        print(status_msg)
        return None

    base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/Linkedin_access"
    constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
    params = {'constraints': json.dumps(constraints)}
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    status_message = f"Attempting to fetch token from Bubble for state: {url_user_token_str}..."
    print(status_message)

    parsed_token_dict = None
    response = None
    try:
        response = requests.get(base_url, params=params, headers=headers, timeout=15)
        response.raise_for_status()
        data = response.json()
        results = data.get("response", {}).get("results", [])
        if results:
            parsed_token_dict, status_message = _extract_token_from_bubble_result(
                results[0], url_user_token_str
            )
        else:  # No results from Bubble for the given state
            status_message = f"❌ Bubble API: No results found for state: {url_user_token_str}"
    except requests.exceptions.HTTPError as http_err:
        # A Response is falsy for 4xx/5xx statuses, so test against None
        # explicitly or the error body would never be reported.
        error_details = response.text if response is not None else "No response content"
        status_message = f"❌ Bubble API HTTP error: {http_err} - Response: {error_details}"
    except requests.exceptions.Timeout:
        status_message = "❌ Bubble API Request timed out."
    except json.JSONDecodeError as json_err:  # Error decoding the main Bubble response
        # Listed before RequestException: newer versions of requests raise a
        # decode error that is also a RequestException, which would otherwise
        # shadow this more specific handler.
        error_details = response.text if response is not None else "No response content"
        status_message = f"❌ Bubble API main response JSON decode error: {json_err}. Response: {error_details}"
    except requests.exceptions.RequestException as req_err:
        status_message = f"❌ Bubble API Request error: {req_err}"
    except Exception as e:
        status_message = f"❌ An unexpected error occurred while fetching from Bubble: {str(e)}"

    print(status_message)  # Log the final status message
    return parsed_token_dict
def fetch_linkedin_posts_data_from_bubble(constraint_value: str, data_type:str, constraint_key:str, constraint_type:str, additional_constraints: list = None):
    """
    Queries a Bubble data type with one required constraint (plus optional extras)
    and returns the matching records as a DataFrame.

    Args:
        constraint_value: Value the primary constraint is compared against.
        data_type: Name of the Bubble data type; appended to the API object URL.
        constraint_key: Field name used by the primary constraint.
        constraint_type: Bubble constraint operator for the primary constraint.
        additional_constraints: Optional extra constraint dicts appended to the query.

    Returns:
        tuple: (dataframe, error_message)
            dataframe is a DataFrame of the results (empty when nothing matched),
            or None when the API key is missing or the request failed.
            error_message is None on success, otherwise a description of the failure.
    """
    api_key = os.environ.get("Bubble_API")
    if not api_key:
        missing_key_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
        print(missing_key_msg)
        return None, missing_key_msg

    endpoint = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}"
    # Primary constraint first, followed by any caller-supplied extras.
    query_constraints = [
        {"key": constraint_key, "constraint_type": constraint_type, "value": constraint_value},
        *(additional_constraints or []),
    ]
    query = {'constraints': json.dumps(query_constraints)}
    auth_headers = {"Authorization": f"Bearer {api_key}"}

    print(f"Attempting to fetch posts from Bubble for urn: {constraint_key}...")

    try:
        reply = requests.get(endpoint, params=query, headers=auth_headers, timeout=15)
        reply.raise_for_status()
        rows = reply.json().get("response", {}).get("results", [])
    except requests.exceptions.RequestException as exc:
        failure_msg = f"❌ Bubble API Error: {str(exc)}"
        print(failure_msg)
        return None, failure_msg
    else:
        if not rows:
            print("No posts found for the given org_urn.")
            return pd.DataFrame(), None

        frame = pd.DataFrame(rows)
        print(f"Successfully retrieved {len(frame)} posts.")
        return frame, None
# def bulk_upload_to_bubble(data, data_type): | |
# api_token = os.environ.get("Bubble_API") | |
# url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk" | |
# headers = { | |
# "Authorization": f"Bearer {api_token}", | |
# "Content-Type": "text/plain" | |
# } | |
# # Convert list of dicts to newline-separated JSON strings | |
# payload = "\n".join(json.dumps(item) for item in data) | |
# response = requests.post(url, headers=headers, data=payload) | |
# print("Payload being sent:") | |
# print(payload) | |
# if response.status_code == 200: | |
# print(f"Successfully uploaded {len(data)} records to {data_type}.") | |
# else: | |
# print(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}") | |
# Version f49ffdd was the last one that worked for uploading LinkedIn data.
def bulk_upload_to_bubble(data, data_type):
    """
    Uploads a list of dictionaries to a specified Bubble data type using the bulk endpoint.

    Records are sent as newline-delimited JSON (one JSON object per line, sent
    as text/plain), and the response is parsed the same way, one JSON document
    per non-empty line.

    Args:
        data (list): A list of dictionaries, where each dictionary represents a record.
        data_type (str): The name of the Bubble data type (table) to upload to.

    Returns:
        list: One parsed JSON object per non-empty response line (expected to
              describe the created records). Empty when there was nothing to
              upload or Bubble returned an empty body.
        bool: False if the API key is missing or the upload fails.
    """
    api_token = os.environ.get("Bubble_API")
    if not api_token:
        logger.error("Bubble_API environment variable not set.")
        return False

    url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "text/plain"
    }

    payload = "\n".join(json.dumps(item) for item in data)
    if not payload:
        # Nothing to send: skip the round trip instead of posting an empty body.
        logger.warning(f"No records to upload to {data_type}; skipping request.")
        return []

    logger.info(f"Sending bulk payload to Bubble data type: {data_type}")

    try:
        # Without a timeout requests waits indefinitely on a stalled server.
        # Bulk uploads get a longer allowance than the 15s used for reads.
        response = requests.post(url, headers=headers, data=payload.encode('utf-8'), timeout=60)
        response.raise_for_status()

        # Bubble answers the bulk endpoint with newline-delimited JSON.
        response_text = response.text.strip()
        if not response_text:
            logger.warning(f"Upload to {data_type} was successful but returned an empty response.")
            return []  # Success with no content

        created_records = [json.loads(line) for line in response_text.splitlines() if line]

        logger.info(f"Successfully uploaded {len(created_records)} records to {data_type}.")
        return created_records

    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred: {http_err}")
        logger.error(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")
        return False
    except json.JSONDecodeError as json_err:
        # Logged with the raw body in case the response format changes again.
        logger.error(f"JSON decoding failed: {json_err}. Response text: {response.text}")
        return False
    except Exception as err:
        # Covers connection errors and timeouts as well as anything unexpected.
        logger.error(f"An other error occurred: {err}", exc_info=True)
        return False
def update_record_in_bubble(table_name, record_id, payload_to_update):
    """
    Updates an existing record in a Bubble.io table using a PATCH request.

    Args:
        table_name (str): The name of the Bubble table (e.g., "User", "Product").
        record_id (str): The unique ID of the record to update.
        payload_to_update (dict): A dictionary where keys are field names (slugs)
                                  and values are the new values for those fields.

    Returns:
        bool: True if the update was successful (or there was nothing to update),
              False if the record ID or API key is missing or the request failed.
    """
    if not record_id:
        logging.error(f"Record ID is missing. Cannot update record in Bubble table '{table_name}'.")
        return False

    if not payload_to_update:
        logging.warning(f"Payload to update is empty for record_id '{record_id}' in Bubble table '{table_name}'. Nothing to update.")
        # Treated as success: there is no error, just no action to perform.
        return True

    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        # Without this guard the request would be sent with "Bearer None".
        logging.error(f"Bubble_API environment variable not set. Cannot update record '{record_id}' in Bubble table '{table_name}'.")
        return False

    # Endpoint for a specific record: .../api/1.1/obj/{table_name}/{record_id}
    api_endpoint = f"https://app.ingaze.ai/version-test/api/1.1/obj/{table_name}/{record_id}"

    headers = {
        "Authorization": f"Bearer {bubble_api_key}",
        "Content-Type": "application/json"
    }

    logging.debug(f"Attempting to update record '{record_id}' in table '{table_name}' at endpoint '{api_endpoint}' with payload: {payload_to_update}")

    try:
        # Timeout keeps a stalled server from blocking the caller forever.
        response = requests.patch(api_endpoint, json=payload_to_update, headers=headers, timeout=15)
        response.raise_for_status()  # Raises an HTTPError for bad responses (4XX or 5XX)
        logging.info(f"Successfully updated record '{record_id}' in Bubble table '{table_name}'.")
        return True
    except requests.exceptions.HTTPError as http_err:
        # Prefer the structured JSON error body; fall back to raw text.
        try:
            error_details = response.json()
        except ValueError:  # Response body is not JSON
            error_details = response.text
        logging.error(f"HTTP error occurred while updating record '{record_id}' in '{table_name}': {http_err}. Response: {error_details}")
    except requests.exceptions.RequestException as req_err:
        logging.error(f"Request exception occurred while updating record '{record_id}' in '{table_name}': {req_err}")
    except Exception as e:
        logging.error(f"An unexpected error occurred while updating record '{record_id}' in '{table_name}': {e}", exc_info=True)

    return False