Spaces:
Sleeping
Sleeping
""" | |
HTTP utility functions for making synchronous and asynchronous HTTP requests. | |
This module provides a unified interface for HTTP operations using httpx, | |
with proper error handling, timeout configuration, and retry logic. | |
""" | |
import asyncio | |
import os | |
import time | |
import uuid | |
from typing import Any, Dict, Optional | |
import httpx | |
from utils.bio_logger import bio_logger as logger | |
class HTTPError(Exception): | |
"""Custom exception for HTTP-related errors.""" | |
def __init__(self, status_code: int, message: str, url: str): | |
self.status_code = status_code | |
self.message = message | |
self.url = url | |
super().__init__(f"HTTP {status_code}: {message} for {url}") | |
def _create_timeout(timeout: float = 10.0) -> httpx.Timeout: | |
"""Create a timeout configuration for HTTP requests.""" | |
return httpx.Timeout(timeout, connect=5.0) | |
def _handle_response(response: httpx.Response, url: str) -> Any: | |
"""Handle HTTP response and raise appropriate exceptions.""" | |
if response.status_code == 200: | |
return response.json() | |
logger.error(f"HTTP request failed: {response.status_code} for {url}") | |
raise HTTPError( | |
status_code=response.status_code, | |
message=f"Request failed with status {response.status_code}", | |
url=url, | |
) | |
async def async_http_get( | |
url: str, | |
params: Optional[Dict[str, Any]] = None, | |
timeout: float = 10.0, | |
headers: Optional[Dict[str, str]] = None, | |
) -> Any: | |
""" | |
Make an asynchronous HTTP GET request. | |
Args: | |
url: The URL to make the request to | |
params: Query parameters to include in the request | |
timeout: Request timeout in seconds | |
headers: Optional headers to include in the request | |
Returns: | |
The JSON response from the server | |
Raises: | |
HTTPError: If the request fails | |
httpx.RequestError: If there's a network error | |
""" | |
timeout_config = _create_timeout(timeout) | |
start_time = time.time() | |
async with httpx.AsyncClient(timeout=timeout_config) as client: | |
response = await client.get(url=url, params=params, headers=headers) | |
duration = time.time() - start_time | |
# Log the API call | |
logger.log_api_call("GET", url, response.status_code, duration) | |
return _handle_response(response, url) | |
def http_get( | |
url: str, | |
params: Optional[Dict[str, Any]] = None, | |
timeout: float = 10.0, | |
headers: Optional[Dict[str, str]] = None, | |
) -> Any: | |
""" | |
Make a synchronous HTTP GET request. | |
Args: | |
url: The URL to make the request to | |
params: Query parameters to include in the request | |
timeout: Request timeout in seconds | |
headers: Optional headers to include in the request | |
Returns: | |
The JSON response from the server | |
Raises: | |
HTTPError: If the request fails | |
httpx.RequestError: If there's a network error | |
""" | |
timeout_config = _create_timeout(timeout) | |
start_time = time.time() | |
with httpx.Client(timeout=timeout_config) as client: | |
response = client.get(url=url, params=params, headers=headers) | |
duration = time.time() - start_time | |
# Log the API call | |
logger.log_api_call("GET", url, response.status_code, duration) | |
return _handle_response(response, url) | |
def http_post( | |
url: str, data: Any, headers: Optional[Dict[str, Any]] = None, timeout: float = 10.0 | |
) -> Any: | |
""" | |
Make a synchronous HTTP POST request. | |
Args: | |
url: The URL to make the request to | |
data: The data to send in the request body | |
headers: Optional headers to include in the request | |
timeout: Request timeout in seconds | |
Returns: | |
The JSON response from the server | |
Raises: | |
HTTPError: If the request fails | |
httpx.RequestError: If there's a network error | |
""" | |
timeout_config = _create_timeout(timeout) | |
start_time = time.time() | |
with httpx.Client(timeout=timeout_config) as client: | |
response = client.post(url=url, json=data, headers=headers) | |
duration = time.time() - start_time | |
# Log the API call | |
logger.log_api_call("POST", url, response.status_code, duration) | |
return _handle_response(response, url) | |
async def async_http_post( | |
url: str, | |
data: Any, | |
headers: Optional[Dict[str, Any]] = None, | |
timeout: float = 10.0, | |
max_retries: int = 3, | |
retry_delay: float = 0.5, | |
) -> Any: | |
""" | |
Make an asynchronous HTTP POST request with retry logic. | |
Args: | |
url: The URL to make the request to | |
data: The data to send in the request body | |
headers: Optional headers to include in the request | |
timeout: Request timeout in seconds | |
max_retries: Maximum number of retry attempts | |
retry_delay: Delay between retries in seconds | |
Returns: | |
The JSON response from the server | |
Raises: | |
HTTPError: If the request fails after all retries | |
httpx.RequestError: If there's a network error | |
""" | |
timeout_config = _create_timeout(timeout) | |
async with httpx.AsyncClient(timeout=timeout_config) as client: | |
for attempt in range(1, max_retries + 1): | |
try: | |
start_time = time.time() | |
response = await client.post(url=url, json=data, headers=headers) | |
duration = time.time() - start_time | |
# Log the API call | |
logger.log_api_call("POST", url, response.status_code, duration) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
logger.error( | |
f"HTTP POST failed (attempt {attempt}/{max_retries}): " | |
f"{response.status_code} for {url}" | |
) | |
if attempt < max_retries: | |
await asyncio.sleep(retry_delay) | |
else: | |
raise HTTPError( | |
status_code=response.status_code, | |
message=f"Request failed after {max_retries} attempts", | |
url=url, | |
) | |
except httpx.RequestError as e: | |
logger.error(f"Network error on attempt {attempt}: {e}") | |
if attempt < max_retries: | |
await asyncio.sleep(retry_delay) | |
else: | |
raise HTTPError( | |
status_code=0, | |
message=f"Network error after {max_retries} attempts: {str(e)}", | |
url=url, | |
) from e | |
raise HTTPError( | |
status_code=0, | |
message=f"Failed to fetch data from {url} after {max_retries} attempts", | |
url=url, | |
) | |
def download_file( | |
file_url: str, directory_path: str, timeout: int = 60, verify_ssl: bool = True | |
) -> Optional[str]: | |
""" | |
Download a file from a URL to a local directory. | |
Args: | |
file_url: The URL of the file to download | |
directory_path: The directory to save the file in | |
timeout: Request timeout in seconds | |
verify_ssl: Whether to verify SSL certificates | |
Returns: | |
The path to the downloaded file, or None if download failed | |
""" | |
# Extract file extension from URL | |
file_extension = file_url.split(".")[-1].split("?")[0] # Remove query params | |
random_filename = f"{uuid.uuid4()}.{file_extension}" | |
# Create directory if it doesn't exist | |
os.makedirs(directory_path, exist_ok=True) | |
file_path = os.path.join(directory_path, random_filename) | |
try: | |
with httpx.Client(timeout=timeout, verify=verify_ssl) as client: | |
with client.stream("GET", file_url) as response: | |
if response.status_code == 200: | |
with open(file_path, "wb") as file: | |
for chunk in response.iter_bytes(chunk_size=8192): | |
file.write(chunk) | |
logger.info(f"Successfully downloaded file to {file_path}") | |
return file_path | |
else: | |
logger.error( | |
f"Download failed with status code: {response.status_code}" | |
) | |
return None | |
except httpx.TimeoutException: | |
logger.error("Download request timed out") | |
return None | |
except httpx.RequestError as e: | |
logger.error(f"Download request failed: {e}") | |
return None | |
except Exception as e: | |
logger.error(f"Unexpected error during download: {e}") | |
return None | |
# Backward compatibility functions | |
async def async_http_post_legacy(url: str, params: dict) -> Any: | |
""" | |
Legacy async HTTP POST function for backward compatibility. | |
This function maintains the old interface but uses the new implementation. | |
""" | |
return await async_http_post(url=url, data=params) | |