"""API client functions for LLM interactions."""

import logging
import os
import time
from functools import lru_cache

import requests

logger = logging.getLogger(__name__)

together_models = [
    "Qwen/Qwen2.5-Coder-32B-Instruct",
    "nvidia/Llama-3.1-Nemotron-70B-Instruct-HF",
    "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
    "meta-llama/Llama-3.3-70B-Instruct-Turbo-Free",
]

anthropic_models = [
    "claude-3-7-sonnet-20250219",
    "claude-3-haiku-20240307",
    "claude-opus-4-20250514",
    "claude-sonnet-4-20250514",
]

all_models = together_models + anthropic_models

def get_api_key(provider: str) -> str:
    """Retrieve the API key for the given provider from the environment."""
    env_vars = {"together": "TOGETHER_API_KEY", "anthropic": "ANTHROPIC_API_KEY"}
    try:
        if provider not in env_vars:
            raise ValueError(f"Unknown provider: {provider}")
        api_key = os.getenv(env_vars[provider])
        if not api_key:
            raise ValueError("API key not configured. Please contact administrator.")
        return api_key
    except Exception as e:
        logger.error(f"Error retrieving API key: {e}")
        raise
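
# Example setup (illustrative): both keys are read from the environment and
# never hard-coded, e.g. exported in the shell before launching the app:
#
#     export TOGETHER_API_KEY="..."
#     export ANTHROPIC_API_KEY="..."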

def get_provider(model: str) -> str:
    """Determine the provider for a given model."""
    if model in together_models:
        return "together"
    elif model in anthropic_models:
        return "anthropic"
    else:
        raise ValueError(f"Unknown model: {model}")

def call_api_with_retry(api_func, *args, max_retries: int = 3, timeout: int = 30, **kwargs):
    """Call an API function with retries, exponential backoff, and a timeout.

    Returns the function's result, or a user-facing error string after the
    final failed attempt. `api_func` must accept a `timeout` keyword argument.
    """
    for attempt in range(max_retries):
        try:
            kwargs["timeout"] = timeout
            return api_func(*args, **kwargs)
        except requests.Timeout:
            if attempt == max_retries - 1:
                return "Request timed out. Please try again with a shorter input."
        except requests.ConnectionError:
            if attempt == max_retries - 1:
                return "Connection error. Please check your internet connection."
        except Exception as e:
            if attempt == max_retries - 1:
                return f"Error: {e}"
        # Exponential backoff before the next attempt: 1s, 2s, 4s, ...
        time.sleep(2 ** attempt)
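
# Example usage (illustrative): any callable that accepts a `timeout`
# keyword argument can be wrapped, e.g.
#
#     resp = call_api_with_retry(requests.get, "https://api.together.xyz/v1/models")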

def call_together_api(model: str, prompt: str, temperature: float = 0.7, max_tokens: int = 1500) -> str:
    """Call the Together AI chat completions API with retry and error handling."""
    from utils import handle_api_error

    api_key = get_api_key("together")
    system_message = (
        "You are a Salesforce B2B Commerce expert. Be CONCISE and PRECISE. "
        "Focus on CODE QUALITY over explanations. Use structured formats when requested. "
        "Always check for syntax errors, security issues, and performance problems."
    )

    def make_request(timeout: int = 30):
        # `timeout` is injected by call_api_with_retry via kwargs.
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
            "top_p": 0.9,
        }
        resp = requests.post(
            "https://api.together.xyz/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
        if resp.status_code != 200:
            return handle_api_error(resp.status_code, resp.text)
        data = resp.json()
        return data["choices"][0]["message"]["content"]

    return call_api_with_retry(make_request)

def call_anthropic_api(model: str, prompt: str, temperature: float = 0.7, max_tokens: int = 1500) -> str:
    """Call the Anthropic Messages API with retry and error handling."""
    from utils import handle_api_error

    api_key = get_api_key("anthropic")
    system_message = (
        "You are a Salesforce B2B Commerce expert. Be CONCISE and PRECISE. "
        "Focus on CODE QUALITY over explanations. Use structured formats when requested. "
        "Always check for syntax errors, security issues, and performance problems."
    )

    def make_request(timeout: int = 30):
        # `timeout` is injected by call_api_with_retry via kwargs.
        headers = {
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }
        payload = {
            "model": model,
            # Anthropic takes the system prompt as a top-level field,
            # not as a message role.
            "system": system_message,
            "messages": [
                {"role": "user", "content": prompt},
            ],
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        resp = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
        if resp.status_code != 200:
            return handle_api_error(resp.status_code, resp.text)
        data = resp.json()
        return data["content"][0]["text"]

    return call_api_with_retry(make_request)

@lru_cache(maxsize=100)
def cached_llm_call(model: str, prompt: str, temperature: float = 0.7, max_tokens: int = 1500) -> str:
    """Cached LLM call to avoid repeated API calls for the same inputs.

    lru_cache keys on the full argument tuple, so no manual hashing of the
    model or prompt is needed. Note that error strings are cached as well.
    """
    provider = get_provider(model)
    if provider == "together":
        return call_together_api(model, prompt, temperature, max_tokens)
    elif provider == "anthropic":
        return call_anthropic_api(model, prompt, temperature, max_tokens)
    else:
        return f"Error: Unknown provider for model {model}"


def call_llm(model: str, prompt: str, temperature: float = 0.7, max_tokens: int = 1500) -> str:
    """Call an LLM with caching support."""
    return cached_llm_call(model, prompt, temperature, max_tokens)
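
# Minimal usage sketch (illustrative; assumes the matching API key is set in
# the environment and that `utils.handle_api_error` is importable):
if __name__ == "__main__":
    print(call_llm(together_models[0], "List three Apex best practices."))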