Spaces:
Sleeping
Sleeping
import logging | |
from semantic_kernel import Kernel | |
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion | |
from semantic_kernel.functions import kernel_function | |
from azure.cosmos import CosmosClient | |
from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import ( | |
AzureChatPromptExecutionSettings, | |
) | |
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior | |
from models.converterModels import PowerConverter | |
import os | |
from dotenv import load_dotenv | |
load_dotenv() | |
logger = logging.getLogger("kernel") | |
logger.setLevel(logging.DEBUG) | |
handler = logging.StreamHandler() | |
handler.setFormatter(logging.Formatter( | |
"[%(asctime)s - %(name)s:%(lineno)d - %(levelname)s] %(message)s" | |
)) | |
logger.addHandler(handler) | |
# Initialize Semantic Kernel | |
kernel = Kernel() | |
# Add Azure OpenAI Chat Service | |
kernel.add_service(AzureChatCompletion( | |
service_id="chat", | |
deployment_name=os.getenv("AZURE_OPENAI_DEPLOYMENT_NAME"), | |
endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"), | |
api_key=os.getenv("AZURE_OPENAI_KEY") | |
)) | |
# Database Plugin | |
class CosmosDBPlugin: | |
def __init__(self): | |
self.client = CosmosClient( | |
os.getenv("AZURE_COSMOS_DB_ENDPOINT"), | |
os.getenv("AZURE_COSMOS_DB_KEY") | |
) | |
self.database = self.client.get_database_client("TAL_DB") | |
self.container = self.database.get_container_client("Converters") | |
self.logger = logger | |
async def query_converters(self, query: str) -> str: | |
try: | |
print(f"Executing query: {query}") | |
items = list(self.container.query_items( | |
query=query, | |
enable_cross_partition_query=True | |
)) | |
print(f"Query returned {len(items)} items") | |
items = items[:10] | |
self.logger.debug(f"Raw items: {items}") | |
items = [PowerConverter(**item) for item in items] if items else [] | |
self.logger.info(f"Query returned {len(items)} items after conversion") | |
self.logger.debug(f"Items: {items}") | |
return str(items) | |
except Exception as e: | |
self.logger.info(f"Query failed: {str(e)}") | |
return f"Query failed: {str(e)}" | |
# SQL Generation Plugin | |
class NL2SQLPlugin: | |
async def generate_sql(self, question: str) -> str: | |
sql = await self._generate_sql_helper(question) | |
if "SELECT *" not in sql and "FROM c" in sql: | |
sql = sql.replace("SELECT c.*", "SELECT *") | |
sql = sql.replace("SELECT c", "SELECT *") | |
return sql | |
async def _generate_sql_helper(self, question: str) -> str: | |
from semantic_kernel.contents import ChatHistory | |
chat_service = kernel.get_service("chat") | |
chat_history = ChatHistory() | |
chat_history.add_user_message(f"""Convert to Cosmos DB SQL: {question} | |
Collection: converters (alias 'c') | |
Fields: | |
- type (e.g., '350mA') | |
- artnr (numeric (int) article number e.g., 930546) | |
- output_voltage_v: dictionary with min/max values for output voltage | |
- output_voltage_v.min (e.g., 15) | |
- output_voltage_v.max (e.g., 40) | |
- input_voltage_v: dictionary with min/max values for input voltage | |
- input_voltage_v.min (e.g., 198) | |
- input_voltage_v.max (e.g., 264) | |
- lamps: dictionary with min/max values for lamp types for this converter | |
- lamps["lamp_name"].min (e.g., 1) | |
- lamps["lamp_name"].max (e.g., 10) | |
- nom_input_voltage (e.g, '198-264V') | |
- class (safety class) | |
- dimmability (e.g., 'MAINS DIM LC') | |
- listprice (e.g., 58) | |
- lifecycle (e.g., 'Active') | |
- size (e.g., '150x30x30') | |
- dimlist_type (e.g., 'DALI') | |
- pdf_link (link to product PDF) | |
- converter_description (e.g., 'POWERLED CONVERTER REMOTE 180mA 8W IP20 1-10V') | |
- ip (Ingress Protection, integer values e.g., 20,67) | |
- efficiency_full_load (e.g., 0.9) | |
- name (e.g., 'Power Converter 350mA') | |
- unit (e.g., 'PC') | |
Return ONLY SQL without explanations""") | |
response = await chat_service.get_chat_message_content( | |
chat_history=chat_history, | |
settings=AzureChatPromptExecutionSettings() | |
) | |
return str(response) | |
# Register plugins | |
kernel.add_plugin(CosmosDBPlugin(), "CosmosDBPlugin") | |
kernel.add_plugin(NL2SQLPlugin(), "NL2SQLPlugin") | |
# Updated query handler using function calling | |
async def handle_query(user_input: str): | |
settings = AzureChatPromptExecutionSettings( | |
function_choice_behavior=FunctionChoiceBehavior.Auto(auto_invoke=True) | |
) | |
prompt = f""" | |
You are a converter database expert. Process this user query: | |
{user_input} | |
Available functions: | |
- generate_sql: Creates SQL queries from natural language | |
- query_converters: Executes SQL queries against the database | |
Follow these steps: | |
1. Generate SQL using generate_sql | |
2. Execute query with query_converters | |
3. Format results into natural language response | |
Query Guidelines: | |
1. When performing SELECT ALL queries always use SELECT *. Never use SELECT c.* or SELECT c | |
2. For questions about lamp compatibility, ALWAYS use SELECT * FROM c WHERE IS_DEFINED(c.lamps["lamp_name"]) | |
3. For questions about lamps that can be used with a converter, ALWAYS use SELECT c.lamps FROM c WHERE c.artnr = @artnr | |
5. For questions about lamp limits, query for the lamps dictionary and return min/max values | |
""" | |
result = await kernel.invoke_prompt( | |
prompt=prompt, | |
settings=settings | |
) | |
return str(result) | |
# Example usage | |
async def main(): | |
while True: | |
try: | |
query = input("User: ") | |
if query.lower() in ["exit", "quit"]: | |
break | |
response = await handle_query(query) | |
print(response) | |
except KeyboardInterrupt: | |
break | |
if __name__ == "__main__": | |
import asyncio | |
asyncio.run(main()) |