|
'''RSS MCP server demonstration client app.''' |
|
|
|
import os |
|
import logging |
|
from pathlib import Path |
|
from logging.handlers import RotatingFileHandler |
|
|
|
import gradio as gr |
|
import client.interface as interface |
|
from client.mcp_client import MCPClientWrapper |
|
from client.anthropic_bridge import AnthropicBridge |
|
|
|
|
|
Path('logs').mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
logger = logging.getLogger() |
|
|
|
logging.basicConfig( |
|
handlers=[RotatingFileHandler( |
|
'logs/rss_client.log', |
|
maxBytes=100000, |
|
backupCount=10, |
|
mode='w' |
|
)], |
|
level=logging.INFO, |
|
format='%(levelname)s - %(name)s - %(message)s' |
|
) |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
RSS_CLIENT = MCPClientWrapper( |
|
'https://agents-mcp-hackathon-rss-mcp-server.hf.space/gradio_api/mcp/sse' |
|
) |
|
|
|
|
|
BRIDGE = AnthropicBridge( |
|
RSS_CLIENT, |
|
api_key=os.environ['ANTHROPIC_API_KEY'] |
|
) |
|
|
|
async def send_message(message: str, chat_history: list) -> str: |
|
'''Submits user message to agent. |
|
|
|
Args: |
|
message: the new message from the user as a string |
|
chat_history: list containing conversation history where each element is |
|
a dictionary with keys 'role' and 'content' |
|
|
|
Returns |
|
New chat history with model's response to user added. |
|
''' |
|
|
|
function_logger = logging.getLogger(__name__ + '.submit_input') |
|
function_logger.info('Submitting user message: %s', message) |
|
|
|
chat_history.append({"role": "user", "content": message}) |
|
chat_history = await interface.agent_input(BRIDGE, chat_history) |
|
|
|
return '', chat_history |
|
|
|
|
|
with gr.Blocks(title='MCP RSS client') as demo: |
|
gr.Markdown('# Agentic RSS reader') |
|
gr.Markdown(""" |
|
Uses sister Space |
|
[RSS feed reader](https://huggingface.co/spaces/Agents-MCP-Hackathon/rss-mcp-server) |
|
via MCP. Click 'Connect to MCP server' to get started. Check out the |
|
[main project repo on GitHub](https://github.com/gperdrizet/MCP-hackathon/tree/main). |
|
Both Spaces by [George Perdrizet](https://www.linkedin.com/in/gperdrizet/). |
|
""") |
|
|
|
connect_btn = gr.Button('Connect to MCP server') |
|
status = gr.Textbox(label='MCP server tool dump', interactive=False, lines=4) |
|
|
|
chatbot = gr.Chatbot( |
|
value=[], |
|
height=800, |
|
type='messages', |
|
show_copy_button=True |
|
) |
|
|
|
msg = gr.Textbox( |
|
'Are there any new posts on Hacker News?', |
|
label='Ask about content or articles on a site or platform', |
|
placeholder='Is there anything new on Hacker News?', |
|
scale=4 |
|
) |
|
|
|
connect_btn.click(RSS_CLIENT.list_tools, outputs=status) |
|
|
|
msg.submit( |
|
send_message, |
|
[msg, chatbot], |
|
[msg, chatbot] |
|
) |
|
|
|
if __name__ == '__main__': |
|
|
|
current_directory = os.getcwd() |
|
|
|
if 'pyrite' in current_directory: |
|
demo.launch(server_name="0.0.0.0", server_port=7860) |
|
|
|
else: |
|
demo.launch() |
|
|