Spaces:
Sleeping
Sleeping
# frontend.py
"""
Streamlit frontend for MCP Agent.
Provides a chat interface for interacting with the MCP backend.
"""
import streamlit as st | |
import asyncio | |
from backend import get_agent, MCPAgent | |
import time | |
# Page configuration: browser-tab title and icon, wide layout, sidebar open by default.
st.set_page_config(
    page_title="MCP Agent Chat",
    page_icon="🤖",  # repaired: the literal was mojibake, not a valid emoji
    layout="wide",
    initial_sidebar_state="expanded",
)
# Custom CSS for better chat appearance.
# Kept in a named constant so the stylesheet can be read apart from the call
# that injects it into the page.
_CHAT_STYLESHEET = """
<style>
    .stChatMessage {
        padding: 1rem;
        border-radius: 0.5rem;
        margin-bottom: 1rem;
    }
    .user-message {
        background-color: #e3f2fd;
    }
    .assistant-message {
        background-color: #f5f5f5;
    }
    .sidebar-info {
        padding: 1rem;
        background-color: #f0f2f6;
        border-radius: 0.5rem;
        margin-bottom: 1rem;
    }
</style>
"""
# The markup is passed with unsafe_allow_html=True so the <style> tag is
# emitted as HTML.
st.markdown(_CHAT_STYLESHEET, unsafe_allow_html=True)
# Initialize session state.
# Each key is defaulted independently: gating every default on "messages"
# alone would leave later keys undefined in a session that already has
# "messages" (e.g. after the script is edited and hot-reloaded), and the
# attribute reads further down would then raise AttributeError.
# The tuple is rebuilt on every script run, so each session gets fresh lists.
for _state_key, _state_default in (
    ("messages", []),            # chat transcript rendered in the UI
    ("history", []),             # For agent history
    ("agent_initialized", False),
    ("available_tools", []),
    ("pending_query", None),     # For example queries
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# Sidebar: connection status, tool list, example queries, clear button, about text.
# NOTE(review): the emoji below were mojibake in the source and have been
# restored from context; confirm the check mark against the deployed app.
with st.sidebar:
    st.title("🤖 MCP Agent")
    st.markdown("---")

    # Status indicator
    if st.session_state.agent_initialized:
        st.success("✅ Agent Connected")

        # Show available tools
        st.markdown("### 🛠️ Available Tools")
        available_tools = st.session_state.get("available_tools", [])
        for tool in available_tools:
            st.markdown(f"• `{tool}`")
    else:
        st.info("⏳ Agent Initializing...")

    st.markdown("---")

    # Example queries
    st.markdown("### 💡 Example Queries")
    example_queries = [
        "What's the price of AAPL?",
        "Show me the market summary",
        "Get news about TSLA",
        "Calculate 25 * 4",
        "What's 100 divided by 7?",
        "Add 456 and 789",
    ]
    for query in example_queries:
        # The query text is part of the widget key so every button is unique.
        if st.button(query, key=f"example_{query}"):
            # Stash the query and rerun; the main script body picks it up
            # from session state and sends it to the agent.
            st.session_state.pending_query = query
            st.rerun()

    st.markdown("---")

    # Clear chat button: wipes both the displayed transcript and the agent history.
    if st.button("🗑️ Clear Chat", type="secondary"):
        st.session_state.messages = []
        st.session_state.history = []
        st.rerun()

    # Info section
    st.markdown("### ℹ️ About")
    # The blank line before the last sentence keeps Markdown from folding it
    # into the final bullet item.
    st.markdown("""
    This chat interface connects to:
    - **Math Server**: Basic arithmetic operations
    - **Stock Server**: Real-time market data

    The agent uses LangChain and MCP to intelligently route your queries to the appropriate tools.
    """)
# Main chat interface: page heading and a one-line usage hint.
st.title("💬 MCP Agent Chat")  # repaired: the emoji was mojibake in the source
st.markdown("Ask me about stocks, math calculations, or general questions!")
# Initialize agent asynchronously
async def initialize_agent() -> bool:
    """Initialize the agent if this session has not done so yet.

    On success, stores the value returned by ``agent.initialize()`` in
    ``st.session_state.available_tools`` and sets
    ``st.session_state.agent_initialized``.

    Returns:
        True if the agent was already initialized or initialization
        succeeded; False if ``agent.initialize()`` raised. In the failure
        case the error and a hint are shown in the UI.
    """
    # Guard clause: nothing to do once the session is marked initialized.
    if st.session_state.agent_initialized:
        return True

    agent = get_agent()
    with st.spinner("🔧 Initializing MCP servers..."):
        try:
            tools = await agent.initialize()
        except Exception as e:
            # UI boundary: report the failure instead of aborting the script run.
            st.error(f"Failed to initialize agent: {e}")
            st.info("Please make sure the stock server is running: `python stock_server.py`")
            return False

    st.session_state.available_tools = tools
    st.session_state.agent_initialized = True
    return True
# Process user message
async def process_user_message(user_input: str) -> str:
    """Send *user_input* to the agent and return its reply.

    The user turn is appended to ``st.session_state.history`` before the
    agent is called, and the assistant turn is appended after a successful
    reply. If the agent raises, the user turn is removed again so the history
    never contains a question without an answer, and the error is returned as
    text for display.

    Args:
        user_input: The text the user submitted.

    Returns:
        The agent's response, or ``"Error: <message>"`` if the call failed.
    """
    agent = get_agent()
    history = st.session_state.history

    # Add user message to history
    user_turn = {"role": "user", "content": user_input}
    history.append(user_turn)

    try:
        # NOTE(review): the history handed to the agent already contains the
        # current user turn — confirm the backend expects that.
        response = await agent.process_message(user_input, history)
    except Exception as e:
        # Roll back the unanswered turn; otherwise the next request would
        # carry two consecutive user turns in its history.
        if history and history[-1] is user_turn:
            history.pop()
        return f"Error: {e}"

    # Add assistant response to history
    history.append({"role": "assistant", "content": response})
    return response
# Replay the stored transcript, one chat bubble per entry, oldest first.
for past_message in st.session_state.messages:
    speaker = past_message["role"]
    with st.chat_message(speaker):
        st.markdown(past_message["content"])
# Shared by the example-button path and the chat-input path, which previously
# each defined an identical nested coroutine.
async def _answer_query(user_text: str) -> str:
    """Make sure the agent is ready, then return its reply to *user_text*."""
    # initialize_agent() returns True immediately when the session is already
    # initialized, so no separate flag check is needed here.
    if not await initialize_agent():
        return "Failed to initialize agent. Please check the servers."
    return await process_user_message(user_text)


# Process pending query from example buttons
if st.session_state.pending_query:
    query = st.session_state.pending_query
    st.session_state.pending_query = None  # Clear it so the rerun below does not resend

    # Add to messages
    st.session_state.messages.append({"role": "user", "content": query})

    # Echo the question while the agent works; the stored transcript is
    # redrawn in full by the rerun below.
    with st.chat_message("user"):
        st.markdown(query)

    # Get response
    with st.spinner("Processing..."):
        response = asyncio.run(_answer_query(query))

    # Add response to messages
    st.session_state.messages.append({"role": "assistant", "content": response})

    # Rerun to display the new messages
    st.rerun()

# Chat input
if prompt := st.chat_input("Type your message here..."):
    # Add user message to chat
    st.session_state.messages.append({"role": "user", "content": prompt})

    # Display user message
    with st.chat_message("user"):
        st.markdown(prompt)

    # Get and display assistant response
    with st.chat_message("assistant"):
        message_placeholder = st.empty()

        # Run the coroutine to completion on a fresh event loop
        with st.spinner("Thinking..."):
            response = asyncio.run(_answer_query(prompt))

        message_placeholder.markdown(response)

    st.session_state.messages.append({"role": "assistant", "content": response})
# Auto-initialize agent on first load.
# The sidebar is rendered before this point, so without a rerun it would keep
# showing "Agent Initializing..." until the user's next interaction. Rerun
# once on success so the status and tool list refresh immediately. On failure
# no rerun is issued, which keeps the error message on screen.
if not st.session_state.agent_initialized:
    with st.spinner("🔧 Initializing agent..."):
        _agent_ready = asyncio.run(initialize_agent())
    if _agent_ready:
        st.rerun()
# Footer: horizontal rule followed by a centered attribution line.
st.markdown("---")
st.markdown(
    """
    <div style='text-align: center; color: #666;'>
        Powered by LangChain, MCP, and Hugging Face 🤗
    </div>
    """,
    unsafe_allow_html=True,  # the <div> must be emitted as HTML
)