Spaces:
Sleeping
Sleeping
import gradio as gr | |
import numpy as np | |
import sqlite3 | |
import json | |
import time | |
from PIL import Image, ImageDraw | |
# ------ Mock MCP Server Implementation ------
class MockMCPServer:
    """Minimal in-process stand-in for an MCP tool server.

    Tools are plain callables stored by name together with a
    human-readable description; ``call_tool`` dispatches to them using
    keyword arguments.
    """

    def __init__(self):
        # Maps tool name -> {"function": callable, "description": str}.
        self.tools = {}

    def register_tool(self, name, func, description):
        """Register ``func`` under ``name``, replacing any earlier entry."""
        self.tools[name] = {
            "function": func,
            "description": description,
        }

    def call_tool(self, tool_name, params=None):
        """Invoke a registered tool.

        Args:
            tool_name: Name the tool was registered under.
            params: Mapping of keyword arguments passed to the tool.
                ``None`` (or an empty mapping) means "no arguments".

        Returns:
            Whatever the tool returns, or ``{"error": ...}`` when no tool
            with that name has been registered.
        """
        tool = self.tools.get(tool_name)
        if tool is None:
            return {"error": f"Tool {tool_name} not found"}
        # ``params or {}`` keeps zero-argument tools callable without
        # forcing callers to pass an empty dict.
        return tool["function"](**(params or {}))
# ------ Create Mock MCP Server ------
# Single module-level server instance; the tool functions defined below are
# registered on it and looked up by name at query time.
mcp_server = MockMCPServer()
# ------ Tool Implementations ------
def get_recipe_by_ingredients(ingredients):
    """Return candidate recipes for the given ingredients.

    This is a stub: the ingredients are only logged, and the same two
    hard-coded recipes are returned on every call.
    """
    print(f"Searching recipes with ingredients: {ingredients}")
    # Placeholder results; a real implementation would call a recipe API.
    stir_fry = dict(name="Vegetable Stir Fry", time=20, difficulty="Easy")
    primavera = dict(name="Pasta Primavera", time=30, difficulty="Medium")
    return {"recipes": [stir_fry, primavera]}
def get_recipe_image(recipe_name):
    """Build a placeholder picture captioned with the recipe name.

    Returns a 300x200 RGB PIL image with ``Image of: <recipe_name>``
    drawn near the top-left corner.
    """
    print(f"Generating image for: {recipe_name}")
    background = (73, 109, 137)
    caption_fill = (255, 255, 0)
    canvas = Image.new('RGB', (300, 200), color=background)
    ImageDraw.Draw(canvas).text(
        (10, 10), f"Image of: {recipe_name}", fill=caption_fill
    )
    return canvas
def convert_measurements(amount, from_unit, to_unit):
    """Convert a cooking quantity between units.

    Supported pairs are tbsp <-> tsp, cups <-> ml and oz <-> g, in either
    direction. Unit names are matched case-insensitively and common
    spelled-out forms ("milliliters", "tablespoon", ...) are accepted.

    Args:
        amount: Numeric quantity to convert.
        from_unit: Unit the amount is expressed in.
        to_unit: Unit to convert to.

    Returns:
        ``{"result": <value rounded to 2 decimals>, "unit": to_unit}`` on
        success (``to_unit`` is echoed back exactly as given), otherwise
        ``{"error": "Conversion not supported"}``.
    """
    print(f"Converting {amount} {from_unit} to {to_unit}")

    # Spelled-out and singular names mapped to the canonical abbreviations
    # used as keys in ``factors``.
    aliases = {
        "tablespoon": "tbsp",
        "tablespoons": "tbsp",
        "teaspoon": "tsp",
        "teaspoons": "tsp",
        "cup": "cups",
        "milliliter": "ml",
        "milliliters": "ml",
        "millilitre": "ml",
        "millilitres": "ml",
        "ounce": "oz",
        "ounces": "oz",
        "gram": "g",
        "grams": "g",
    }
    # Multiplier for each forward direction; the reverse direction divides.
    factors = {
        ("tbsp", "tsp"): 3,
        ("cups", "ml"): 240,
        ("oz", "g"): 28.35,
    }

    def canonical(unit):
        key = unit.strip().lower()
        return aliases.get(key, key)

    source = canonical(from_unit)
    target = canonical(to_unit)

    if (source, target) in factors:
        result = amount * factors[(source, target)]
    elif (target, source) in factors:
        result = amount / factors[(target, source)]
    else:
        return {"error": "Conversion not supported"}
    return {"result": round(result, 2), "unit": to_unit}
# ------ Recipe Database ------
def init_recipe_db():
    """Create and seed an in-memory SQLite recipe database.

    Returns:
        An open ``sqlite3.Connection`` holding a ``recipes`` table with
        three sample rows. The ``ingredients`` column stores a
        JSON-encoded list of strings.
    """
    # The connection is created once at import time but is used from the
    # UI event handler, which may run on a different thread. With the
    # default check_same_thread=True sqlite3 would raise ProgrammingError
    # there. The app only reads after seeding; writes from several threads
    # would need external locking.
    conn = sqlite3.connect(':memory:', check_same_thread=False)
    c = conn.cursor()
    c.execute(
        '''CREATE TABLE recipes
        (id INTEGER PRIMARY KEY, name TEXT, ingredients TEXT, instructions TEXT, prep_time INT)'''
    )
    recipes = [
        (
            "Classic Pancakes",
            json.dumps(["flour", "eggs", "milk", "baking powder"]),
            "1. Mix dry ingredients\n2. Add wet ingredients\n3. Cook on griddle",
            15,
        ),
        (
            "Tomato Soup",
            json.dumps(["tomatoes", "onion", "garlic", "vegetable stock"]),
            "1. Sauté onions\n2. Add tomatoes\n3. Simmer and blend",
            30,
        ),
        (
            "Chocolate Cake",
            json.dumps(["flour", "sugar", "cocoa", "eggs", "milk"]),
            "1. Mix dry ingredients\n2. Add wet ingredients\n3. Bake at 350°F",
            45,
        ),
    ]
    c.executemany(
        "INSERT INTO recipes (name, ingredients, instructions, prep_time) VALUES (?,?,?,?)",
        recipes,
    )
    conn.commit()
    return conn
# ------ Voice Processing Functions ------
def text_to_speech(text):
    """Mock TTS: log the text and hand back two seconds of silence.

    Returns:
        ``(sample_rate, samples)`` where ``sample_rate`` is 44100 and
        ``samples`` is an all-zero float32 NumPy array.
    """
    print(f"[TTS]: {text}")
    sample_rate = 44100
    seconds = 2
    # Placeholder waveform; swap in a real TTS engine here.
    n_samples = int(seconds * sample_rate)
    silence = np.zeros(n_samples, dtype=np.float32)
    return sample_rate, silence
def speech_to_text(audio):
    """Mock STT: log the clip's metadata and return a canned transcript.

    ``audio`` is unpacked as ``(sample_rate, samples)``; ``samples`` must
    expose a ``.shape`` attribute. The audio content itself is ignored.
    """
    rate, samples = audio
    print(f"Received audio with sample rate {rate} and shape {samples.shape}")
    # Fixed demo transcript; a real implementation would decode ``samples``.
    return "What can I make with eggs and flour?"
# ------ Agent Logic ------
def _parse_conversion_request(query):
    """Extract ``(amount, from_unit, to_unit)`` from a conversion query.

    Expects the pattern ``convert <number> <unit> to <unit>`` anywhere in
    the query, matched case-insensitively and ignoring punctuation stuck
    to the words. Falls back to ``(2, "cups", "ml")`` when the pattern
    cannot be parsed.
    """
    words = [word.strip(".,!?;:") for word in query.split()]
    lowered = [word.lower() for word in words]
    try:
        start = lowered.index("convert")
        amount = float(words[start + 1])
        from_unit = words[start + 2]
        # Look for "to" only after the source unit so an earlier "to" in
        # the sentence is not mistaken for the separator.
        to_unit = words[lowered.index("to", start + 3) + 1]
    except (ValueError, IndexError):
        # Best-effort demo default when the query does not match.
        return 2, "cups", "ml"
    return amount, from_unit, to_unit


def process_query(query, db_conn):
    """Route a text query to the matching tool or to the recipe database.

    Intent is detected by simple keyword matching, checked in this order:
    recipe search, recipe image, unit conversion, then a name search in
    the database as the fallback.

    Args:
        query: The user's request as plain text.
        db_conn: Open SQLite connection containing a ``recipes`` table.

    Returns:
        A dict ``{"type": ..., "data": ...}`` where ``type`` is one of
        ``"recipes"``, ``"image"``, ``"conversion"`` or ``"db_recipes"``
        and ``data`` is the corresponding tool result or list of rows.
    """
    print(f"Processing query: {query}")
    text = query.lower()

    if any(keyword in text for keyword in ("recipe", "make", "cook")):
        known_ingredients = ("eggs", "flour", "milk", "tomatoes", "onion", "garlic")
        ingredients = [item for item in known_ingredients if item in text]
        if not ingredients:
            ingredients = ["eggs", "flour"]  # default
        return {
            "type": "recipes",
            "data": mcp_server.call_tool(
                "get_recipe_by_ingredients", {"ingredients": ingredients}
            ),
        }

    if any(keyword in text for keyword in ("image", "show", "look")):
        known_recipes = ("pancakes", "stir fry", "tomato soup", "chocolate cake")
        recipe_name = next(
            (name for name in known_recipes if name in text),
            "Classic Pancakes",  # default
        )
        return {
            "type": "image",
            "data": mcp_server.call_tool(
                "get_recipe_image", {"recipe_name": recipe_name}
            ),
        }

    if "convert" in text:
        amount, from_unit, to_unit = _parse_conversion_request(query)
        return {
            "type": "conversion",
            "data": mcp_server.call_tool(
                "convert_measurements",
                {"amount": amount, "from_unit": from_unit, "to_unit": to_unit},
            ),
        }

    # Fallback: substring match on the recipe name. The query is passed as
    # a bound parameter, never interpolated into the SQL text.
    cursor = db_conn.cursor()
    cursor.execute("SELECT * FROM recipes WHERE name LIKE ?", (f"%{query}%",))
    return {
        "type": "db_recipes",
        "data": cursor.fetchall(),
    }
# ------ Register Tools with MCP Server ------
# Each tool is exposed under the same name as the function implementing it.
mcp_server.register_tool(
    name="get_recipe_by_ingredients",
    func=get_recipe_by_ingredients,
    description="Find recipes based on available ingredients",
)
mcp_server.register_tool(
    name="get_recipe_image",
    func=get_recipe_image,
    description="Generate an image of the finished recipe",
)
mcp_server.register_tool(
    name="convert_measurements",
    func=convert_measurements,
    description="Convert cooking measurements between units",
)

# ------ Initialize System ------
# One shared in-memory database connection for the lifetime of the process.
db_conn = init_recipe_db()
# ------ Gradio Interface ------
def process_voice_command(audio):
    """Run one voice command through the agent pipeline.

    Steps: speech-to-text, query routing via ``process_query``, formatting
    of the result as text, then text-to-speech of that text.

    Args:
        audio: ``(sample_rate, samples)`` tuple from the audio input, or
            ``None`` when nothing was recorded.

    Returns:
        A 3-tuple ``(audio_out, response_text, image)``: ``audio_out`` is
        ``(sample_rate, samples)`` or ``None``, and ``image`` is the tool's
        image result for image queries, otherwise ``None``.
    """
    # Pressing the button with nothing recorded yields no audio; bail out
    # with a readable message instead of failing on tuple unpacking.
    if audio is None:
        return None, "No audio received. Please record a command first.", None

    query = speech_to_text(audio)
    result = process_query(query, db_conn)
    kind = result["type"]
    data = result["data"]

    image = None
    if kind == "recipes":
        recipes = data["recipes"]
        lines = [f"Found {len(recipes)} recipes:"]
        lines.extend(
            f"- {recipe['name']} ({recipe['time']} mins, {recipe['difficulty']})"
            for recipe in recipes
        )
        response_text = "\n".join(lines) + "\n"
    elif kind == "image":
        image = data  # PIL image produced by the image tool
        response_text = "Here is an image of the recipe!"
    elif kind == "conversion":
        if "error" in data:
            response_text = f"Error: {data['error']}"
        else:
            response_text = f"{data['result']} {data['unit']}"
    elif kind == "db_recipes":
        if data:
            # Rows come from ``SELECT *``: index 1 is the name, index 4 the
            # prep time in minutes.
            lines = [f"Found {len(data)} recipes in database:"]
            lines.extend(f"- {row[1]} ({row[4]} mins)" for row in data)
            response_text = "\n".join(lines) + "\n"
        else:
            response_text = "No recipes found."
    else:
        response_text = "I'm not sure how to help with that."

    sr, audio_data = text_to_speech(response_text)
    return (sr, audio_data), response_text, image
# ------ Hugging Face Space UI ------
# Gradio 4 replaced Audio's ``source="microphone"`` keyword with
# ``sources=["microphone"]``. Choose the spelling that matches the installed
# major version so the app starts on either line.
if int(gr.__version__.split(".")[0]) >= 4:
    _microphone_kwargs = {"sources": ["microphone"]}
else:
    _microphone_kwargs = {"source": "microphone"}

with gr.Blocks(title="MCP Culinary Voice Assistant") as demo:
    gr.Markdown("# 🧑🍳 MCP-Powered Culinary Voice Assistant")
    gr.Markdown("Speak to your cooking assistant about recipes, conversions, and more!")

    with gr.Row():
        with gr.Column():
            audio_input = gr.Audio(
                type="numpy",
                label="Speak to Chef Assistant",
                **_microphone_kwargs,
            )
            submit_btn = gr.Button("Process Command", variant="primary")
        with gr.Column():
            audio_output = gr.Audio(label="Assistant Response", interactive=False)

    with gr.Row():
        # NOTE(review): this box is labelled "Transcription" but receives the
        # assistant's response text from process_voice_command.
        text_output = gr.Textbox(label="Transcription", interactive=False)
        image_output = gr.Image(label="Recipe Image", interactive=False)

    submit_btn.click(
        fn=process_voice_command,
        inputs=[audio_input],
        outputs=[audio_output, text_output, image_output],
    )

    # NOTE(review): the examples target the output textbox, so selecting one
    # only displays the phrase; it is not fed to process_voice_command.
    gr.Examples(
        examples=[
            ["What can I make with eggs and flour?"],
            ["Show me how tomato soup looks"],
            ["Convert 2 cups to milliliters"],
            ["Find chocolate cake recipes"],
        ],
        inputs=[text_output],
        label="Example Queries",
    )

if __name__ == "__main__":
    demo.launch()