# bubble_api_calls.py
"""Helpers for reading from and writing to the Bubble.io Data API.

Every function reads the API key from the ``Bubble_API`` environment
variable and talks to the ``version-test`` Data API of the application.
"""

import json
import logging
import os

import pandas as pd
import requests

# Root of the Data API "object" endpoints shared by every call in this module.
_BUBBLE_OBJ_BASE_URL = "https://app.ingaze.ai/version-test/api/1.1/obj"

# Timeouts (seconds). Without one, ``requests`` waits forever on a stalled
# connection. Bulk uploads get a larger budget because the payload is bigger.
_REQUEST_TIMEOUT_SECONDS = 15
_BULK_REQUEST_TIMEOUT_SECONDS = 60


def _parse_token_from_raw_text(raw_text, url_user_token_str, first_result):
    """Turn the 'Raw_text' field of a Bubble record into a token dictionary.

    Args:
        raw_text: Value of the 'Raw_text' field (expected: a JSON string).
        url_user_token_str: The state value, used only in status messages.
        first_result: The Bubble record the field came from, used only in the
            "field missing" status message.

    Returns:
        tuple: (token_dict, status_message). token_dict is None unless
        raw_text decodes to a dictionary containing an 'access_token' key.
    """
    if not raw_text:
        # Field missing, null or empty.
        return None, (f"⚠️ Bubble API: Token field ('Raw_text') "
                      f"not found or is null in response for state: {url_user_token_str}. Result: {first_result}")

    if not isinstance(raw_text, str):
        return None, (f"⚠️ Bubble API: 'Raw_text' field was not a string. "
                      f"Type: {type(raw_text)}, Value: {raw_text}")

    try:
        parsed = json.loads(raw_text)
    except json.JSONDecodeError as e:
        return None, (f"⚠️ Bubble API: Error decoding 'Raw_text' JSON string: {e}. "
                      f"Content: {raw_text}")

    if not isinstance(parsed, dict):
        return None, (f"⚠️ Bubble API: 'Raw_text' field did not contain a valid JSON dictionary string. "
                      f"Content type: {type(raw_text)}, Value: {raw_text}")

    if "access_token" not in parsed:
        return None, (f"⚠️ Bubble API: 'access_token' key missing in parsed 'Raw_text' dictionary for state: {url_user_token_str}. Parsed: {parsed}")

    return parsed, f"✅ LinkedIn Token successfully fetched and parsed from Bubble 'Raw_text' for state: {url_user_token_str}"


def fetch_linkedin_token_from_bubble(url_user_token_str: str):
    """
    Fetches LinkedIn access token from Bubble.io API using the state value (url_user_token_str).
    The token is expected in a 'Raw_text' field as a JSON string, which is then parsed.

    The outcome of the call is printed as a status message; it is not returned.

    Args:
        url_user_token_str: The state value (token from URL) to query Bubble.

    Returns:
        dict or None: The dictionary containing the token
        (e.g., {"access_token": "value"}), or None if the API key is missing,
        the state value is unusable, the request failed, or no valid token
        was found. Every failure path returns None so that callers can rely
        on a simple truthiness check.
    """
    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
        print(error_msg)
        return None

    # The state value may carry an upstream error text instead of a real token.
    if not url_user_token_str or "not found" in url_user_token_str or "Could not access" in url_user_token_str:
        status_msg = f"ℹ️ No valid user token from URL to query Bubble. ({url_user_token_str})"
        print(status_msg)
        return None

    base_url = f"{_BUBBLE_OBJ_BASE_URL}/Linkedin_access"
    constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
    params = {'constraints': json.dumps(constraints)}
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    status_message = f"Attempting to fetch token from Bubble for state: {url_user_token_str}..."
    print(status_message)

    parsed_token_dict = None
    response = None

    try:
        response = requests.get(base_url, params=params, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()

        data = response.json()
        results = data.get("response", {}).get("results", [])

        if results:
            first_result = results[0]
            parsed_token_dict, status_message = _parse_token_from_raw_text(
                first_result.get("Raw_text", None), url_user_token_str, first_result
            )
        else:
            # No results from Bubble for the given state
            status_message = f"❌ Bubble API: No results found for state: {url_user_token_str}"

    except requests.exceptions.HTTPError as http_err:
        # A Response is falsy for 4xx/5xx statuses, so test identity rather
        # than truthiness or the error body would never be reported.
        error_details = response.text if response is not None else "No response content"
        status_message = f"❌ Bubble API HTTP error: {http_err} - Response: {error_details}"
    except requests.exceptions.Timeout:
        status_message = "❌ Bubble API Request timed out."
    except json.JSONDecodeError as json_err:
        # Error decoding the main Bubble response. Handled before
        # RequestException because the decode error raised by
        # ``response.json()`` can also be a RequestException subclass.
        error_details = response.text if response is not None else "No response content"
        status_message = f"❌ Bubble API main response JSON decode error: {json_err}. Response: {error_details}"
    except requests.exceptions.RequestException as req_err:
        status_message = f"❌ Bubble API Request error: {req_err}"
    except Exception as e:
        status_message = f"❌ An unexpected error occurred while fetching from Bubble: {str(e)}"

    print(status_message)  # Log the final status message
    return parsed_token_dict


def fetch_linkedin_posts_data_from_bubble(org_urn: str, data_type: str):
    """
    Fetches every record of a Bubble data type that belongs to an organization.

    Records are selected with an ``organization_urn equals org_urn``
    constraint. The Data API returns results one page at a time, so the
    request is repeated with an increasing ``cursor`` while the response
    reports remaining records.

    Args:
        org_urn: The organization URN to filter on.
        data_type: The name of the Bubble data type (table) to query.

    Returns:
        tuple: (dataframe, error_message).
            On success: a DataFrame with one row per record (empty when
            nothing matched) and None.
            On failure: None and a string describing the error. Pages
            fetched before the failure are discarded.
    """
    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        error_msg = "❌ Bubble API Error: The 'Bubble_API' environment variable is not set."
        print(error_msg)
        return None, error_msg

    base_url = f"{_BUBBLE_OBJ_BASE_URL}/{data_type}"
    constraints = [{"key": "organization_urn", "constraint_type": "equals", "value": org_urn}]
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    status_message = f"Attempting to fetch posts from Bubble for urn: {org_urn}..."
    print(status_message)

    all_results = []
    cursor = 0

    try:
        while True:
            params = {'constraints': json.dumps(constraints)}
            if cursor:
                # The first request is sent without a cursor, as before.
                params['cursor'] = cursor

            response = requests.get(base_url, params=params, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS)
            response.raise_for_status()

            page = response.json().get("response", {})
            results = page.get("results", [])
            all_results.extend(results)

            remaining = int(page.get("remaining") or 0)
            # An empty page also ends the loop so that an inconsistent
            # 'remaining' value can never cause an endless request cycle.
            if not results or remaining <= 0:
                break
            cursor += len(results)

    except requests.exceptions.RequestException as e:
        error_msg = f"❌ Bubble API Error: {str(e)}"
        print(error_msg)
        return None, error_msg
    except ValueError as e:
        # Body was not valid JSON (not a RequestException on older versions
        # of requests) or 'remaining' was not a number.
        error_msg = f"❌ Bubble API Error: {str(e)}"
        print(error_msg)
        return None, error_msg

    if all_results:
        df = pd.DataFrame(all_results)
        print(f"Successfully retrieved {len(df)} posts.")
        return df, None

    print("No posts found for the given org_urn.")
    return pd.DataFrame(), None


# Previous implementation, kept for reference:
# def bulk_upload_to_bubble(data, data_type):
#     api_token = os.environ.get("Bubble_API")
#     url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
#     headers = {
#         "Authorization": f"Bearer {api_token}",
#         "Content-Type": "text/plain"
#     }

#     # Convert list of dicts to newline-separated JSON strings
#     payload = "\n".join(json.dumps(item) for item in data)
#     response = requests.post(url, headers=headers, data=payload)

#     print("Payload being sent:")
#     print(payload)

#     if response.status_code == 200:
#         print(f"Successfully uploaded {len(data)} records to {data_type}.")
#     else:
#         print(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")

# Version f49ffdd: the last one that worked for uploading LinkedIn data.


def _parse_bulk_response_lines(body_text):
    """Extract created record IDs from a newline-delimited JSON response body.

    Each non-empty line is decoded as JSON. Lines shaped like
    ``{"status": "success", "id": ...}`` contribute their id; any other line
    is logged as a warning and skipped.

    Raises:
        ValueError: If a non-empty line is not valid JSON.
    """
    created_ids = []
    for raw_line in body_text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        entry = json.loads(line)
        if isinstance(entry, dict) and entry.get("status") == "success" and "id" in entry:
            created_ids.append(entry["id"])
        else:
            logging.warning(f"Bubble bulk upload reported a non-success line: {line}")
    return created_ids


def bulk_upload_to_bubble(data, data_type):
    """
    Uploads a list of dictionaries to a specified Bubble data type using the bulk endpoint.

    Args:
        data (list): A list of dictionaries, where each dictionary represents a record.
        data_type (str): The name of the Bubble data type (table) to upload to.

    Returns:
        The decoded response when the whole response body is a single JSON
        document; otherwise a list of the record IDs taken from a
        newline-delimited JSON body (one object per line).
        bool: False if the API key is missing, the request fails, or the
        response cannot be parsed.
    """
    api_token = os.environ.get("Bubble_API")
    if not api_token:
        logging.error("Bubble_API environment variable not set.")
        return False

    url = f"{_BUBBLE_OBJ_BASE_URL}/{data_type}/bulk"
    headers = {
        "Authorization": f"Bearer {api_token}",
        # The bulk endpoint expects newline-delimited JSON objects with a text/plain Content-Type
        "Content-Type": "text/plain"
    }

    # Convert the list of dictionaries into a single string,
    # with each JSON object separated by a newline character.
    payload = "\n".join(json.dumps(item) for item in data)

    logging.info(f"Sending bulk payload to Bubble data type: {data_type}")
    # print("Payload being sent:\n" + payload)  # Uncomment for deep debugging

    try:
        response = requests.post(
            url,
            headers=headers,
            data=payload.encode('utf-8'),
            timeout=_BULK_REQUEST_TIMEOUT_SECONDS,
        )

        # Raise an exception for bad status codes (4xx or 5xx)
        response.raise_for_status()

        try:
            created_ids = response.json()
        except ValueError:
            # NOTE(review): Bubble's bulk endpoint is documented to answer
            # with one JSON object per line rather than a single JSON
            # document - confirm against the live API. Such a body cannot be
            # decoded in one go, so it is parsed line by line instead of
            # reporting a successful upload as a failure.
            created_ids = _parse_bulk_response_lines(response.text)

        logging.info(f"Successfully uploaded {len(data)} records to {data_type}.")
        return created_ids

    except requests.exceptions.HTTPError as http_err:
        logging.error(f"HTTP error occurred: {http_err}")
        logging.error(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")
        return False
    except Exception as err:
        logging.error(f"An other error occurred: {err}")
        return False


def update_record_in_bubble(table_name, record_id, payload_to_update):
    """
    Updates an existing record in a Bubble.io table using a PATCH request.

    Args:
        table_name (str): The name of the Bubble table (e.g., "User", "Product").
        record_id (str): The unique ID of the record to update.
        payload_to_update (dict): A dictionary where keys are field names (slugs)
                                  and values are the new values for those fields.

    Returns:
        bool: True if the update was successful (or there was nothing to
              update), False otherwise.
    """
    bubble_api_key = os.environ.get("Bubble_API")
    if not bubble_api_key:
        # Without a key the request would be sent with "Bearer None".
        logging.error("Bubble_API environment variable not set.")
        return False

    if not record_id:
        logging.error(f"Record ID is missing. Cannot update record in Bubble table '{table_name}'.")
        return False

    if not payload_to_update:
        logging.warning(f"Payload to update is empty for record_id '{record_id}' in Bubble table '{table_name}'. Nothing to update.")
        # Consider this a success as there's no error, just no action.
        # Depending on desired behavior, you might return False or raise an error.
        return True

    # Construct the API endpoint for a specific record
    # Example: https://{app}.bubbleapps.io/api/1.1/obj/{table_name}/{record_id}
    api_endpoint = f"{_BUBBLE_OBJ_BASE_URL}/{table_name}/{record_id}"

    headers = {
        "Authorization": f"Bearer {bubble_api_key}",
        "Content-Type": "application/json"
    }

    logging.debug(f"Attempting to update record '{record_id}' in table '{table_name}' at endpoint '{api_endpoint}' with payload: {payload_to_update}")

    try:
        response = requests.patch(
            api_endpoint,
            json=payload_to_update,
            headers=headers,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()  # Raises an HTTPError for bad responses (4XX or 5XX)
        logging.info(f"Successfully updated record '{record_id}' in Bubble table '{table_name}'.")
        return True
    except requests.exceptions.HTTPError as http_err:
        # Log more details from the response if available
        error_details = ""
        try:
            error_details = response.json()  # Bubble often returns JSON errors
        except ValueError:  # If response is not JSON
            error_details = response.text
        logging.error(f"HTTP error occurred while updating record '{record_id}' in '{table_name}': {http_err}. Response: {error_details}")
    except requests.exceptions.RequestException as req_err:
        logging.error(f"Request exception occurred while updating record '{record_id}' in '{table_name}': {req_err}")
    except Exception as e:
        logging.error(f"An unexpected error occurred while updating record '{record_id}' in '{table_name}': {e}", exc_info=True)

    return False