cyyeh's picture
add new page and refactor
7df9c64
from typing import Optional
import requests
API_BASE_URL = "https://cloud.getwren.ai/api/v1"
def ask(
api_key: str,
project_id: str,
question: str,
thread_id: str = "",
language: str = "English",
sample_size: int = 1000,
) -> tuple[dict, str]:
"""Ask a question and get an answer with SQL and explanation."""
endpoint = f"{API_BASE_URL}/ask"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"projectId": project_id,
"question": question,
"sampleSize": sample_size,
"language": language,
}
if thread_id:
payload["threadId"] = thread_id
try:
response = requests.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def generate_sql(
api_key: str,
project_id: str,
question: str,
thread_id: str = "",
language: str = "English",
return_sql_dialect: bool = False,
) -> tuple[dict, str]:
"""Generate SQL from natural language query."""
endpoint = f"{API_BASE_URL}/generate_sql"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"projectId": project_id,
"question": question,
"language": language,
"returnSqlDialect": return_sql_dialect,
}
if thread_id:
payload["threadId"] = thread_id
try:
response = requests.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def run_sql(
api_key: str,
project_id: str,
sql: str,
thread_id: str = "",
limit: int = 1000,
) -> tuple[dict, str]:
"""Execute SQL query and return results."""
endpoint = f"{API_BASE_URL}/run_sql"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"projectId": project_id,
"sql": sql,
"limit": limit,
}
if thread_id:
payload["threadId"] = thread_id
try:
response = requests.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def generate_summary(
api_key: str,
project_id: str,
question: str,
sql: str,
thread_id: str = "",
language: str = "English",
sample_size: int = 1000,
) -> tuple[dict, str]:
"""Generate a summary of query results."""
endpoint = f"{API_BASE_URL}/generate_summary"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"projectId": project_id,
"question": question,
"sql": sql,
"language": language,
"sampleSize": sample_size,
}
if thread_id:
payload["threadId"] = thread_id
try:
response = requests.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def stream_explanation(
api_key: str,
query_id: str,
) -> tuple[dict, str]:
"""Stream explanation for a query."""
endpoint = f"{API_BASE_URL}/stream_explanation/{query_id}"
headers = {
"Authorization": f"Bearer {api_key}",
"Accept": "text/event-stream",
}
try:
response = requests.get(endpoint, headers=headers, stream=True)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def generate_chart(
api_key: str,
project_id: str,
question: str,
sql: str,
thread_id: str = "",
sample_size: int = 1000,
) -> tuple[dict, str]:
"""Generate a chart from query results."""
endpoint = f"{API_BASE_URL}/generate_vega_chart"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"projectId": project_id,
"question": question,
"sql": sql,
"sampleSize": sample_size
}
if thread_id:
payload["threadId"] = thread_id
try:
response = requests.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def stream_ask(
api_key: str,
project_id: str,
question: str,
thread_id: str = "",
language: str = "English",
sample_size: int = 1000,
) -> tuple[dict, str]:
"""Stream ask endpoint for real-time responses."""
endpoint = f"{API_BASE_URL}/stream/ask"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "text/event-stream",
}
payload = {
"projectId": project_id,
"question": question,
"language": language,
"sampleSize": sample_size,
}
if thread_id:
payload["threadId"] = thread_id
try:
response = requests.post(endpoint, json=payload, headers=headers, stream=True)
response.raise_for_status()
return response, ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def stream_generate_sql(
api_key: str,
project_id: str,
question: str,
thread_id: str = "",
language: str = "English",
return_sql_dialect: bool = False,
) -> tuple[dict, str]:
"""Stream SQL generation endpoint for real-time responses."""
endpoint = f"{API_BASE_URL}/stream/generate_sql"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"Accept": "text/event-stream",
}
payload = {
"projectId": project_id,
"question": question,
"language": language,
"returnSqlDialect": return_sql_dialect,
}
if thread_id:
payload["threadId"] = thread_id
try:
response = requests.post(endpoint, json=payload, headers=headers, stream=True)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def get_models(
api_key: str,
project_id: str,
) -> tuple[dict, str]:
"""Get latest deployed models for a project."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/models"
headers = {
"Authorization": f"Bearer {api_key}",
}
try:
response = requests.get(endpoint, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def get_instructions(
api_key: str,
project_id: str,
) -> tuple[dict, str]:
"""Get all instructions for a project."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions"
headers = {
"Authorization": f"Bearer {api_key}",
}
try:
response = requests.get(endpoint, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def create_instruction(
api_key: str,
project_id: str,
instruction: str,
is_global: bool = True,
questions: Optional[list[str]] = None,
) -> tuple[dict, str]:
"""Create a new instruction for a project."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"instruction": instruction,
"isGlobal": is_global,
"questions": questions or [],
}
try:
response = requests.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def update_instruction(
api_key: str,
project_id: str,
instruction_id: str,
instruction: str,
is_global: bool = True,
questions: Optional[list[str]] = None,
) -> tuple[dict, str]:
"""Update an existing instruction."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions/{instruction_id}"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"instruction": instruction,
"isGlobal": is_global,
"questions": questions or [],
}
try:
response = requests.put(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def delete_instruction(
api_key: str,
project_id: str,
instruction_id: str,
) -> str:
"""Delete an instruction."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions/{instruction_id}"
headers = {
"Authorization": f"Bearer {api_key}",
}
try:
response = requests.delete(endpoint, headers=headers)
response.raise_for_status()
return ""
except requests.exceptions.RequestException as e:
return str(e)
def get_sql_pairs(
api_key: str,
project_id: str,
) -> tuple[dict, str]:
"""Get all SQL pairs for a project."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs"
headers = {
"Authorization": f"Bearer {api_key}",
}
try:
response = requests.get(endpoint, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def create_sql_pair(
api_key: str,
project_id: str,
question: str,
sql: str,
) -> tuple[dict, str]:
"""Create a new SQL pair for a project."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"question": question,
"sql": sql,
}
try:
response = requests.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def update_sql_pair(
api_key: str,
project_id: str,
sql_pair_id: str,
question: str,
sql: str,
) -> tuple[dict, str]:
"""Update an existing SQL pair."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs/{sql_pair_id}"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"question": question,
"sql": sql,
}
try:
response = requests.put(endpoint, json=payload, headers=headers)
response.raise_for_status()
return response.json(), ""
except requests.exceptions.RequestException as e:
return {}, str(e)
def delete_sql_pair(
api_key: str,
project_id: str,
sql_pair_id: str,
) -> str:
"""Delete an SQL pair."""
endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs/{sql_pair_id}"
headers = {
"Authorization": f"Bearer {api_key}",
}
try:
response = requests.delete(endpoint, headers=headers)
response.raise_for_status()
return ""
except requests.exceptions.RequestException as e:
return str(e)