gemini_nl2sql / pipeline.py
acadiaway's picture
pipeline with the bug messages for Google API -v3 syntax
d339486
raw
history blame
7.42 kB
import os
import requests
import time
import re
import json
from db_utils import get_schema, execute_sql
def query_gemini_api(prompt, max_retries=3):
"""Query the Google Gemini API"""
api_key = os.getenv("GOOGLE_API_KEY")
print(f"=== DEBUG: API Key Loaded: {api_key[:5]}...") # Partial key for debug
if not api_key:
raise ValueError("GOOGLE_API_KEY not found in environment variables")
# Gemini API endpoint
url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={api_key}"
print(f"=== DEBUG: API URL: {url[:50]}...") # Fixed: Proper f-string syntax
headers = {
"Content-Type": "application/json"
}
payload = {
"contents": [{
"parts": [{
"text": prompt
}]
}],
"generationConfig": {
"temperature": 0.1,
"topK": 1,
"topP": 0.8,
"maxOutputTokens": 200,
"stopSequences": ["```", "\n\n"]
}
}
print(f"=== DEBUG: Payload: {json.dumps(payload, indent=2)}")
for attempt in range(max_retries):
try:
print(f"=== DEBUG: Attempt {attempt + 1} of {max_retries}")
response = requests.post(url, headers=headers, json=payload, timeout=30)
print(f"=== DEBUG: API Response Status: {response.status_code}")
print(f"=== DEBUG: Response Text: {response.text[:200]}...") # Partial response
if response.status_code == 200:
result = response.json()
print(f"=== DEBUG: API Response: {result}")
if "candidates" in result and len(result["candidates"]) > 0:
candidate = result["candidates"][0]
if "content" in candidate and "parts" in candidate["content"]:
generated_text = candidate["content"]["parts"][0]["text"].strip()
return generated_text
return "No valid response generated"
elif response.status_code == 429:
wait_time = 60 * (attempt + 1) # Rate limit - wait longer
print(f"=== DEBUG: Rate limited, waiting {wait_time} seconds...")
time.sleep(wait_time)
continue
else:
error_msg = f"Gemini API Error {response.status_code}: {response.text}"
print(f"=== DEBUG: {error_msg}")
if attempt == max_retries - 1:
raise Exception(error_msg)
except requests.exceptions.Timeout: # Previous fix retained
print(f"=== DEBUG: Timeout on attempt {attempt + 1}")
if attempt == max_retries - 1:
raise Exception("Request timed out after multiple attempts")
time.sleep(5)
except Exception as e:
print(f"=== DEBUG: Exception on attempt {attempt + 1}: {e}")
if attempt == max_retries - 1:
raise e
time.sleep(5)
raise Exception("Failed to get response after all retries")
def extract_user_requested_limit(nl_query):
"""Extract user-requested number from natural language query"""
patterns = [
r'\b(\d+)\s+(?:ships?|vessels?|boats?|records?|results?|entries?|names?)\b',
r'(?:show|list|find|get)\s+(?:me\s+)?(?:the\s+)?(?:top\s+|first\s+)?(\d+)',
r'(?:names\s+of\s+)(\d+)\s+',
r'\b(\d+)\s+(?:oldest|newest|biggest|smallest|largest)',
]
for pattern in patterns:
match = re.search(pattern, nl_query, re.IGNORECASE)
if match:
return int(match.group(1))
return None
def clean_sql_output(sql_text, user_limit=None):
"""Clean and validate SQL output from the model"""
sql_text = sql_text.strip()
# Remove markdown formatting
if sql_text.startswith("```"):
lines = sql_text.split('\n')
# Find SQL content between backticks
sql_lines = []
in_sql = False
for line in lines:
if line.strip().startswith("```"):
in_sql = not in_sql
continue
if in_sql:
sql_lines.append(line)
sql_text = '\n'.join(sql_lines)
# Handle multiple lines - extract the main SELECT query
lines = sql_text.split('\n')
sql = ""
for line in lines:
line = line.strip()
if line and (line.upper().startswith('SELECT') or sql):
sql += line + " "
if line.endswith(';'):
break
if not sql:
# If no SELECT found, take the first non-empty line that looks like SQL
for line in lines:
line = line.strip()
if line and any(keyword in line.upper() for keyword in ['SELECT', 'WITH', 'FROM']):
sql = line
break
sql = sql.strip().rstrip(';')
# Apply user-requested limit
if user_limit:
sql = re.sub(r'\s+LIMIT\s+\d+', '', sql, flags=re.IGNORECASE)
sql += f" LIMIT {user_limit}"
return sql
def text_to_sql(nl_query):
"""Convert natural language to SQL using Google Gemini"""
try:
print(f"=== DEBUG: Starting text_to_sql with query: {nl_query}")
# Get database schema
try:
schema = get_schema()
print(f"=== DEBUG: Schema retrieved, length: {len(schema)}")
except Exception as e:
print(f"=== DEBUG: Schema error: {e}")
return f"Error: Database schema access failed: {str(e)}", []
# Extract user limit
user_limit = extract_user_requested_limit(nl_query)
print(f"=== DEBUG: Extracted user limit: {user_limit}")
# Create optimized prompt for Gemini
prompt = f"""You are an expert PostgreSQL developer. Convert this natural language question to a precise SQL query.
Question: {nl_query}
Database Schema:
{schema[:1500]}
Requirements:
- Generate ONLY the SQL query, no explanation
- Use PostgreSQL syntax
- Be precise with table and column names from the schema
- Return a single SELECT statement
SQL Query:"""
print(f"=== DEBUG: Calling Google Gemini API...")
generated_sql = query_gemini_api(prompt)
print(f"=== DEBUG: Generated SQL raw: {generated_sql}")
if not generated_sql or "No valid response" in generated_sql:
return "Error: No SQL generated from Gemini", []
# Clean the SQL output
sql = clean_sql_output(generated_sql, user_limit)
print(f"=== DEBUG: Final cleaned SQL: {sql}")
if not sql or not sql.upper().strip().startswith('SELECT'):
return f"Error: Invalid SQL generated: {sql}", []
# Execute SQL
print(f"=== DEBUG: Executing SQL...")
try:
results = execute_sql(sql)
print(f"=== DEBUG: SQL executed successfully, {len(results)} results")
return sql, results
except Exception as e:
print(f"=== DEBUG: SQL execution error: {e}")
return f"Error: SQL execution failed: {str(e)}", []
except Exception as e:
print(f"=== DEBUG: General error in text_to_sql: {e}")
return f"Error: {str(e)}", []