from typing import Optional import requests API_BASE_URL = "https://cloud.getwren.ai/api/v1" def ask( api_key: str, project_id: str, question: str, thread_id: str = "", language: str = "English", sample_size: int = 1000, ) -> tuple[dict, str]: """Ask a question and get an answer with SQL and explanation.""" endpoint = f"{API_BASE_URL}/ask" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "projectId": project_id, "question": question, "sampleSize": sample_size, "language": language, } if thread_id: payload["threadId"] = thread_id try: response = requests.post(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def generate_sql( api_key: str, project_id: str, question: str, thread_id: str = "", language: str = "English", return_sql_dialect: bool = False, ) -> tuple[dict, str]: """Generate SQL from natural language query.""" endpoint = f"{API_BASE_URL}/generate_sql" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "projectId": project_id, "question": question, "language": language, "returnSqlDialect": return_sql_dialect, } if thread_id: payload["threadId"] = thread_id try: response = requests.post(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def run_sql( api_key: str, project_id: str, sql: str, thread_id: str = "", limit: int = 1000, ) -> tuple[dict, str]: """Execute SQL query and return results.""" endpoint = f"{API_BASE_URL}/run_sql" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "projectId": project_id, "sql": sql, "limit": limit, } if thread_id: payload["threadId"] = thread_id try: response = requests.post(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def generate_summary( api_key: str, project_id: str, question: str, sql: str, thread_id: str = "", language: str = "English", sample_size: int = 1000, ) -> tuple[dict, str]: """Generate a summary of query results.""" endpoint = f"{API_BASE_URL}/generate_summary" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "projectId": project_id, "question": question, "sql": sql, "language": language, "sampleSize": sample_size, } if thread_id: payload["threadId"] = thread_id try: response = requests.post(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def stream_explanation( api_key: str, query_id: str, ) -> tuple[dict, str]: """Stream explanation for a query.""" endpoint = f"{API_BASE_URL}/stream_explanation/{query_id}" headers = { "Authorization": f"Bearer {api_key}", "Accept": "text/event-stream", } try: response = requests.get(endpoint, headers=headers, stream=True) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def generate_chart( api_key: str, project_id: str, question: str, sql: str, thread_id: str = "", sample_size: int = 1000, ) -> tuple[dict, str]: """Generate a chart from query results.""" endpoint = f"{API_BASE_URL}/generate_vega_chart" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "projectId": project_id, "question": question, "sql": sql, "sampleSize": sample_size } if thread_id: payload["threadId"] = thread_id try: response = requests.post(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def stream_ask( api_key: str, project_id: str, question: str, thread_id: str = "", language: str = "English", sample_size: int = 1000, ) -> tuple[dict, str]: """Stream ask endpoint for real-time responses.""" endpoint = f"{API_BASE_URL}/stream/ask" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "text/event-stream", } payload = { "projectId": project_id, "question": question, "language": language, "sampleSize": sample_size, } if thread_id: payload["threadId"] = thread_id try: response = requests.post(endpoint, json=payload, headers=headers, stream=True) response.raise_for_status() return response, "" except requests.exceptions.RequestException as e: return {}, str(e) def stream_generate_sql( api_key: str, project_id: str, question: str, thread_id: str = "", language: str = "English", return_sql_dialect: bool = False, ) -> tuple[dict, str]: """Stream SQL generation endpoint for real-time responses.""" endpoint = f"{API_BASE_URL}/stream/generate_sql" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json", "Accept": "text/event-stream", } payload = { "projectId": project_id, "question": question, "language": language, "returnSqlDialect": return_sql_dialect, } if thread_id: payload["threadId"] = thread_id try: response = requests.post(endpoint, json=payload, headers=headers, stream=True) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def get_models( api_key: str, project_id: str, ) -> tuple[dict, str]: """Get latest deployed models for a project.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/models" headers = { "Authorization": f"Bearer {api_key}", } try: response = requests.get(endpoint, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def get_instructions( api_key: str, project_id: str, ) -> tuple[dict, str]: """Get all instructions for a project.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions" headers = { "Authorization": f"Bearer {api_key}", } try: response = requests.get(endpoint, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def create_instruction( api_key: str, project_id: str, instruction: str, is_global: bool = True, questions: Optional[list[str]] = None, ) -> tuple[dict, str]: """Create a new instruction for a project.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "instruction": instruction, "isGlobal": is_global, "questions": questions or [], } try: response = requests.post(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def update_instruction( api_key: str, project_id: str, instruction_id: str, instruction: str, is_global: bool = True, questions: Optional[list[str]] = None, ) -> tuple[dict, str]: """Update an existing instruction.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions/{instruction_id}" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "instruction": instruction, "isGlobal": is_global, "questions": questions or [], } try: response = requests.put(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def delete_instruction( api_key: str, project_id: str, instruction_id: str, ) -> str: """Delete an instruction.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/instructions/{instruction_id}" headers = { "Authorization": f"Bearer {api_key}", } try: response = requests.delete(endpoint, headers=headers) response.raise_for_status() return "" except requests.exceptions.RequestException as e: return str(e) def get_sql_pairs( api_key: str, project_id: str, ) -> tuple[dict, str]: """Get all SQL pairs for a project.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs" headers = { "Authorization": f"Bearer {api_key}", } try: response = requests.get(endpoint, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def create_sql_pair( api_key: str, project_id: str, question: str, sql: str, ) -> tuple[dict, str]: """Create a new SQL pair for a project.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "question": question, "sql": sql, } try: response = requests.post(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def update_sql_pair( api_key: str, project_id: str, sql_pair_id: str, question: str, sql: str, ) -> tuple[dict, str]: """Update an existing SQL pair.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs/{sql_pair_id}" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "question": question, "sql": sql, } try: response = requests.put(endpoint, json=payload, headers=headers) response.raise_for_status() return response.json(), "" except requests.exceptions.RequestException as e: return {}, str(e) def delete_sql_pair( api_key: str, project_id: str, sql_pair_id: str, ) -> str: """Delete an SQL pair.""" endpoint = f"{API_BASE_URL}/projects/{project_id}/knowledge/sql_pairs/{sql_pair_id}" headers = { "Authorization": f"Bearer {api_key}", } try: response = requests.delete(endpoint, headers=headers) response.raise_for_status() return "" except requests.exceptions.RequestException as e: return str(e)