Spaces:
Running
Running
File size: 14,665 Bytes
6140a89 dec8c1d ca925bb 6140a89 ce630e0 a056cb8 6140a89 fc82495 a056cb8 54da3a4 49502e6 a056cb8 182a396 54da3a4 634b798 a2fb9ee 49502e6 54da3a4 28bee4a 54da3a4 49502e6 54da3a4 a056cb8 627b699 a056cb8 627b699 fc46cca 627b699 a056cb8 627b699 a056cb8 fc46cca a056cb8 9b2b6de 1ff33df 627b699 fc46cca 627b699 fc46cca 627b699 fc46cca 627b699 9b2b6de 284b445 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 |
# bubble_api_calls.py
import os
import json
import requests
import pandas as pd
import logging
logger = logging.getLogger(__name__)
def fetch_linkedin_token_from_bubble(url_user_token_str: str):
"""
Fetches LinkedIn access token from Bubble.io API using the state value (url_user_token_str).
The token is expected in a 'Raw_text' field as a JSON string, which is then parsed.
Args:
url_user_token_str: The state value (token from URL) to query Bubble.
Returns:
tuple: (parsed_token_dict, status_message)
parsed_token_dict is the dictionary containing the token (e.g., {"access_token": "value"})
or None if an error occurred or token not found.
status_message is a string describing the outcome of the API call.
"""
bubble_api_key = os.environ.get("Bubble_API")
if not bubble_api_key:
error_msg = "β Bubble API Error: The 'Bubble_API' environment variable is not set."
print(error_msg)
return None, error_msg
if not url_user_token_str or "not found" in url_user_token_str or "Could not access" in url_user_token_str:
status_msg = f"βΉοΈ No valid user token from URL to query Bubble. ({url_user_token_str})"
print(status_msg)
return None, status_msg
base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/Linkedin_access"
constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
params = {'constraints': json.dumps(constraints)}
headers = {"Authorization": f"Bearer {bubble_api_key}"}
status_message = f"Attempting to fetch token from Bubble for state: {url_user_token_str}..."
print(status_message)
parsed_token_dict = None
response = None
try:
response = requests.get(base_url, params=params, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
results = data.get("response", {}).get("results", [])
if results:
raw_text_from_bubble = results[0].get("Raw_text", None)
if raw_text_from_bubble and isinstance(raw_text_from_bubble, str):
try:
temp_parsed_dict = json.loads(raw_text_from_bubble)
if isinstance(temp_parsed_dict, dict) and "access_token" in temp_parsed_dict:
parsed_token_dict = temp_parsed_dict # Successfully parsed and has access_token
status_message = f"β
LinkedIn Token successfully fetched and parsed from Bubble 'Raw_text' for state: {url_user_token_str}"
elif isinstance(temp_parsed_dict, dict):
status_message = (f"β οΈ Bubble API: 'access_token' key missing in parsed 'Raw_text' dictionary for state: {url_user_token_str}. Parsed: {temp_parsed_dict}")
else: # Not a dict
status_message = (f"β οΈ Bubble API: 'Raw_text' field did not contain a valid JSON dictionary string. "
f"Content type: {type(raw_text_from_bubble)}, Value: {raw_text_from_bubble}")
except json.JSONDecodeError as e:
status_message = (f"β οΈ Bubble API: Error decoding 'Raw_text' JSON string: {e}. "
f"Content: {raw_text_from_bubble}")
elif raw_text_from_bubble: # It exists but is not a string
status_message = (f"β οΈ Bubble API: 'Raw_text' field was not a string. "
f"Type: {type(raw_text_from_bubble)}, Value: {raw_text_from_bubble}")
else: # Raw_text not found or null
status_message = (f"β οΈ Bubble API: Token field ('Raw_text') "
f"not found or is null in response for state: {url_user_token_str}. Result: {results[0]}")
else: # No results from Bubble for the given state
status_message = f"β Bubble API: No results found for state: {url_user_token_str}"
except requests.exceptions.HTTPError as http_err:
error_details = response.text if response else "No response content"
status_message = f"β Bubble API HTTP error: {http_err} - Response: {error_details}"
except requests.exceptions.Timeout:
status_message = "β Bubble API Request timed out."
except requests.exceptions.RequestException as req_err:
status_message = f"β Bubble API Request error: {req_err}"
except json.JSONDecodeError as json_err: # Error decoding the main Bubble response
error_details = response.text if response else "No response content"
status_message = f"β Bubble API main response JSON decode error: {json_err}. Response: {error_details}"
except Exception as e:
status_message = f"β An unexpected error occurred while fetching from Bubble: {str(e)}"
print(status_message) # Log the final status message
return parsed_token_dict
def fetch_linkedin_posts_data_from_bubble(
constraint_value: str,
data_type: str,
constraint_key: str,
constraint_type: str,
additional_constraints: list = None
):
"""
Fetches data from the Bubble.io Data API, handling pagination to retrieve all results.
Args:
constraint_value: The value to match in the constraint.
data_type: The Bubble data type (table name) to query.
constraint_key: The field key for the constraint.
constraint_type: The type of constraint (e.g., 'equals', 'contains').
additional_constraints: A list of additional constraint dictionaries.
Returns:
A tuple containing a pandas DataFrame with all results and an error message string.
If successful, the error message is None.
If an error occurs, the DataFrame is None.
"""
bubble_api_key = os.environ.get("Bubble_API")
if not bubble_api_key:
error_msg = "β Bubble API Error: The 'Bubble_API' environment variable is not set."
print(error_msg)
return None, error_msg
base_url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}"
headers = {"Authorization": f"Bearer {bubble_api_key}"}
# --- Main Constraint Setup ---
constraints = [{"key": constraint_key, "constraint_type": constraint_type, "value": constraint_value}]
if additional_constraints:
constraints.extend(additional_constraints)
# --- Pagination Logic ---
all_results = []
cursor = 0 # Start at the beginning
print(f"Attempting to fetch data from Bubble for {constraint_key} = {constraint_value}...")
while True: # Loop until all pages are fetched
# Parameters for the API call, including constraints and pagination info
params = {
'constraints': json.dumps(constraints),
'cursor': cursor,
'limit': 100 # Fetch up to 100 items per request (the max)
}
try:
print(f"DEBUG: Requesting URL: {base_url}")
print(f"DEBUG: Request PARAMS: {params}")
# --- Make the API Request ---
response = requests.get(base_url, params=params, headers=headers, timeout=30)
response.raise_for_status() # Raises an HTTPError for bad responses (4xx or 5xx)
data = response.json().get("response", {})
results_on_page = data.get("results", [])
if results_on_page:
all_results.extend(results_on_page)
print(f"Retrieved {len(results_on_page)} results on this page. Total so far: {len(all_results)}.")
# --- Check if there are more pages ---
# The API returns the number of results remaining. If 0, we're done.
remaining = data.get("remaining", 0)
if remaining == 0:
print("No more pages to fetch.")
break # Exit the while loop
# --- Update cursor for the next page ---
# The new cursor is the current cursor + number of items just received.
cursor += len(results_on_page)
except requests.exceptions.RequestException as e:
error_msg = f"β Bubble API Error: {str(e)}"
print(error_msg)
return None, error_msg
# --- Final Processing ---
if all_results:
df = pd.DataFrame(all_results)
print(f"β
Successfully retrieved a total of {len(df)} posts.")
return df, None
else:
print("No posts found for the given constraints.")
# Return an empty DataFrame if nothing was found
return pd.DataFrame(), None
# def bulk_upload_to_bubble(data, data_type):
# api_token = os.environ.get("Bubble_API")
# url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
# headers = {
# "Authorization": f"Bearer {api_token}",
# "Content-Type": "text/plain"
# }
# # Convert list of dicts to newline-separated JSON strings
# payload = "\n".join(json.dumps(item) for item in data)
# response = requests.post(url, headers=headers, data=payload)
# print("Payload being sent:")
# print(payload)
# if response.status_code == 200:
# print(f"Successfully uploaded {len(data)} records to {data_type}.")
# else:
# print(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")
#versione f49ffdd ultima che funzionava per upload dati linkedin
def bulk_upload_to_bubble(data, data_type):
"""
Uploads a list of dictionaries to a specified Bubble data type using the bulk endpoint.
Args:
data (list): A list of dictionaries, where each dictionary represents a record.
data_type (str): The name of the Bubble data type (table) to upload to.
Returns:
list: A list of dictionaries (each containing an 'id') for the created records if successful.
bool: False if the upload fails.
"""
api_token = os.environ.get("Bubble_API")
if not api_token:
logger.error("Bubble_API environment variable not set.")
return False
url = f"https://app.ingaze.ai/version-test/api/1.1/obj/{data_type}/bulk"
headers = {
"Authorization": f"Bearer {api_token}",
"Content-Type": "text/plain"
}
payload = "\n".join(json.dumps(item) for item in data)
logging.info(f"Sending bulk payload to Bubble data type: {data_type}")
try:
response = requests.post(url, headers=headers, data=payload.encode('utf-8'))
response.raise_for_status()
# FIX: Handle the newline-delimited JSON response from Bubble.
response_text = response.text.strip()
if not response_text:
logger.warning(f"Upload to {data_type} was successful but returned an empty response.")
return [] # Return an empty list for success with no content
created_records = []
for line in response_text.splitlines():
if line: # Ensure the line is not empty
created_records.append(json.loads(line))
logging.info(f"Successfully uploaded {len(created_records)} records to {data_type}.")
return created_records
except requests.exceptions.HTTPError as http_err:
logger.error(f"HTTP error occurred: {http_err}")
logger.error(f"Failed to upload data to {data_type}. Status Code: {response.status_code}, Response: {response.text}")
return False
except json.JSONDecodeError as json_err:
# This error is what you were seeing. We log it in case the format changes again.
logger.error(f"JSON decoding failed: {json_err}. Response text: {response.text}")
return False
except Exception as err:
logger.error(f"An other error occurred: {err}", exc_info=True)
return False
def update_record_in_bubble(table_name, record_id, payload_to_update):
"""
Updates an existing record in a Bubble.io table using a PATCH request.
Args:
table_name (str): The name of the Bubble table (e.g., "User", "Product").
record_id (str): The unique ID of the record to update.
payload_to_update (dict): A dictionary where keys are field names (slugs)
and values are the new values for those fields.
Returns:
bool: True if the update was successful, False otherwise.
"""
bubble_api_key = os.environ.get("Bubble_API")
if not record_id:
logging.error(f"Record ID is missing. Cannot update record in Bubble table '{table_name}'.")
return False
if not payload_to_update:
logging.warning(f"Payload to update is empty for record_id '{record_id}' in Bubble table '{table_name}'. Nothing to update.")
# Consider this a success as there's no error, just no action.
# Depending on desired behavior, you might return False or raise an error.
return True
# Construct the API endpoint for a specific record
# Example: https://<app_name>.bubbleapps.io/api/1.1/obj/<table_name>/<record_id>
api_endpoint = f"https://app.ingaze.ai/version-test/api/1.1/obj/{table_name}/{record_id}"
headers = {
"Authorization": f"Bearer {bubble_api_key}",
"Content-Type": "application/json"
}
logging.debug(f"Attempting to update record '{record_id}' in table '{table_name}' at endpoint '{api_endpoint}' with payload: {payload_to_update}")
try:
response = requests.patch(api_endpoint, json=payload_to_update, headers=headers)
response.raise_for_status() # Raises an HTTPError for bad responses (4XX or 5XX)
logging.info(f"Successfully updated record '{record_id}' in Bubble table '{table_name}'.")
return True
except requests.exceptions.HTTPError as http_err:
# Log more details from the response if available
error_details = ""
try:
error_details = response.json() # Bubble often returns JSON errors
except ValueError: # If response is not JSON
error_details = response.text
logging.error(f"HTTP error occurred while updating record '{record_id}' in '{table_name}': {http_err}. Response: {error_details}")
except requests.exceptions.RequestException as req_err:
logging.error(f"Request exception occurred while updating record '{record_id}' in '{table_name}': {req_err}")
except Exception as e:
logging.error(f"An unexpected error occurred while updating record '{record_id}' in '{table_name}': {e}", exc_info=True)
return False
|