Spaces:
Running
Running
from typing import Optional | |
import requests | |
API_BASE_URL = "https://cloud.getwren.ai/api/v1" | |
def ask( | |
api_key: str, | |
project_id: str, | |
question: str, | |
thread_id: str = "", | |
language: str = "English", | |
sample_size: int = 1000, | |
) -> tuple[dict, str]: | |
"""Ask a question and get an answer with SQL and explanation.""" | |
endpoint = f"{API_BASE_URL}/ask" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"projectId": project_id, | |
"question": question, | |
"sampleSize": sample_size, | |
"language": language, | |
} | |
if thread_id: | |
payload["threadId"] = thread_id | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def generate_sql( | |
api_key: str, | |
project_id: str, | |
question: str, | |
thread_id: str = "", | |
language: str = "English", | |
return_sql_dialect: bool = False, | |
) -> tuple[dict, str]: | |
"""Generate SQL from natural language query.""" | |
endpoint = f"{API_BASE_URL}/generate_sql" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"projectId": project_id, | |
"question": question, | |
"language": language, | |
"returnSqlDialect": return_sql_dialect, | |
} | |
if thread_id: | |
payload["threadId"] = thread_id | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def run_sql( | |
api_key: str, | |
project_id: str, | |
sql: str, | |
thread_id: str = "", | |
limit: int = 1000, | |
) -> tuple[dict, str]: | |
"""Execute SQL query and return results.""" | |
endpoint = f"{API_BASE_URL}/run_sql" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"projectId": project_id, | |
"sql": sql, | |
"limit": limit, | |
} | |
if thread_id: | |
payload["threadId"] = thread_id | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def generate_summary( | |
api_key: str, | |
project_id: str, | |
question: str, | |
sql: str, | |
thread_id: str = "", | |
language: str = "English", | |
sample_size: int = 1000, | |
) -> tuple[dict, str]: | |
"""Generate a summary of query results.""" | |
endpoint = f"{API_BASE_URL}/generate_summary" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"projectId": project_id, | |
"question": question, | |
"sql": sql, | |
"language": language, | |
"sampleSize": sample_size, | |
} | |
if thread_id: | |
payload["threadId"] = thread_id | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def stream_explanation( | |
api_key: str, | |
query_id: str, | |
) -> tuple[dict, str]: | |
"""Stream explanation for a query.""" | |
endpoint = f"{API_BASE_URL}/stream_explanation/{query_id}" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Accept": "text/event-stream", | |
} | |
try: | |
response = requests.get(endpoint, headers=headers, stream=True) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def generate_chart( | |
api_key: str, | |
project_id: str, | |
question: str, | |
sql: str, | |
thread_id: str = "", | |
sample_size: int = 1000, | |
) -> tuple[dict, str]: | |
"""Generate a chart from query results.""" | |
endpoint = f"{API_BASE_URL}/generate_vega_chart" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"projectId": project_id, | |
"question": question, | |
"sql": sql, | |
"sampleSize": sample_size | |
} | |
if thread_id: | |
payload["threadId"] = thread_id | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def stream_ask( | |
api_key: str, | |
project_id: str, | |
question: str, | |
thread_id: str = "", | |
language: str = "English", | |
sample_size: int = 1000, | |
) -> tuple[dict, str]: | |
"""Stream ask endpoint for real-time responses.""" | |
endpoint = f"{API_BASE_URL}/stream/ask" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json", | |
"Accept": "text/event-stream", | |
} | |
payload = { | |
"projectId": project_id, | |
"question": question, | |
"language": language, | |
"sampleSize": sample_size, | |
} | |
if thread_id: | |
payload["threadId"] = thread_id | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers, stream=True) | |
response.raise_for_status() | |
return response, "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def stream_generate_sql( | |
api_key: str, | |
project_id: str, | |
question: str, | |
thread_id: str = "", | |
language: str = "English", | |
return_sql_dialect: bool = False, | |
) -> tuple[dict, str]: | |
"""Stream SQL generation endpoint for real-time responses.""" | |
endpoint = f"{API_BASE_URL}/stream/generate_sql" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json", | |
"Accept": "text/event-stream", | |
} | |
payload = { | |
"projectId": project_id, | |
"question": question, | |
"language": language, | |
"returnSqlDialect": return_sql_dialect, | |
} | |
if thread_id: | |
payload["threadId"] = thread_id | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers, stream=True) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def get_models( | |
api_key: str, | |
project_id: str, | |
) -> tuple[dict, str]: | |
"""Get latest deployed models for a project.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/models" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
} | |
try: | |
response = requests.get(endpoint, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def get_instructions( | |
api_key: str, | |
project_id: str, | |
) -> tuple[dict, str]: | |
"""Get all instructions for a project.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
} | |
try: | |
response = requests.get(endpoint, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def create_instruction( | |
api_key: str, | |
project_id: str, | |
instruction: str, | |
is_global: bool = True, | |
questions: Optional[list[str]] = None, | |
) -> tuple[dict, str]: | |
"""Create a new instruction for a project.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"instruction": instruction, | |
"isGlobal": is_global, | |
"questions": questions or [], | |
} | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def update_instruction( | |
api_key: str, | |
project_id: str, | |
instruction_id: str, | |
instruction: str, | |
is_global: bool = True, | |
questions: Optional[list[str]] = None, | |
) -> tuple[dict, str]: | |
"""Update an existing instruction.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions/{instruction_id}" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"instruction": instruction, | |
"isGlobal": is_global, | |
"questions": questions or [], | |
} | |
try: | |
response = requests.put(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def delete_instruction( | |
api_key: str, | |
project_id: str, | |
instruction_id: str, | |
) -> str: | |
"""Delete an instruction.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions/{instruction_id}" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
} | |
try: | |
response = requests.delete(endpoint, headers=headers) | |
response.raise_for_status() | |
return "" | |
except requests.exceptions.RequestException as e: | |
return str(e) | |
def get_sql_pairs( | |
api_key: str, | |
project_id: str, | |
) -> tuple[dict, str]: | |
"""Get all SQL pairs for a project.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
} | |
try: | |
response = requests.get(endpoint, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def create_sql_pair( | |
api_key: str, | |
project_id: str, | |
question: str, | |
sql: str, | |
) -> tuple[dict, str]: | |
"""Create a new SQL pair for a project.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"question": question, | |
"sql": sql, | |
} | |
try: | |
response = requests.post(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def update_sql_pair( | |
api_key: str, | |
project_id: str, | |
sql_pair_id: str, | |
question: str, | |
sql: str, | |
) -> tuple[dict, str]: | |
"""Update an existing SQL pair.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs/{sql_pair_id}" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
} | |
payload = { | |
"question": question, | |
"sql": sql, | |
} | |
try: | |
response = requests.put(endpoint, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json(), "" | |
except requests.exceptions.RequestException as e: | |
return {}, str(e) | |
def delete_sql_pair( | |
api_key: str, | |
project_id: str, | |
sql_pair_id: str, | |
) -> str: | |
"""Delete an SQL pair.""" | |
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs/{sql_pair_id}" | |
headers = { | |
"Authorization": f"Bearer {api_key}", | |
} | |
try: | |
response = requests.delete(endpoint, headers=headers) | |
response.raise_for_status() | |
return "" | |
except requests.exceptions.RequestException as e: | |
return str(e) | |