rss-mcp-client / rss_client.py
gperdrizet's picture
Improvements to the UI.
33412db verified
'''RSS MCP server demonstration client app.'''
import os
import logging
from pathlib import Path
from logging.handlers import RotatingFileHandler
import gradio as gr
import client.interface as interface
from client.mcp_client import MCPClientWrapper
from client.anthropic_bridge import AnthropicBridge
# Make sure log directory exists
Path('logs').mkdir(parents=True, exist_ok=True)
# Set-up logger
logger = logging.getLogger()
logging.basicConfig(
handlers=[RotatingFileHandler(
'logs/rss_client.log',
maxBytes=100000,
backupCount=10,
mode='w'
)],
level=logging.INFO,
format='%(levelname)s - %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Handle MCP server connection and interactions
RSS_CLIENT = MCPClientWrapper(
'https://agents-mcp-hackathon-rss-mcp-server.hf.space/gradio_api/mcp/sse'
)
# Handles Anthropic API I/O
BRIDGE = AnthropicBridge(
RSS_CLIENT,
api_key=os.environ['ANTHROPIC_API_KEY']
)
async def send_message(message: str, chat_history: list) -> str:
'''Submits user message to agent.
Args:
message: the new message from the user as a string
chat_history: list containing conversation history where each element is
a dictionary with keys 'role' and 'content'
Returns
New chat history with model's response to user added.
'''
function_logger = logging.getLogger(__name__ + '.submit_input')
function_logger.info('Submitting user message: %s', message)
chat_history.append({"role": "user", "content": message})
chat_history = await interface.agent_input(BRIDGE, chat_history)
return '', chat_history
with gr.Blocks(title='MCP RSS client') as demo:
gr.Markdown('# Agentic RSS reader')
gr.Markdown("""
Uses sister Space
[RSS feed reader](https://huggingface.co/spaces/Agents-MCP-Hackathon/rss-mcp-server)
via MCP. Click 'Connect to MCP server' to get started. Check out the
[main project repo on GitHub](https://github.com/gperdrizet/MCP-hackathon/tree/main).
Both Spaces by [George Perdrizet](https://www.linkedin.com/in/gperdrizet/).
""")
connect_btn = gr.Button('Connect to MCP server')
status = gr.Textbox(label='MCP server tool dump', interactive=False, lines=4)
chatbot = gr.Chatbot(
value=[],
height=800,
type='messages',
show_copy_button=True
)
msg = gr.Textbox(
'Are there any new posts on Hacker News?',
label='Ask about content or articles on a site or platform',
placeholder='Is there anything new on Hacker News?',
scale=4
)
connect_btn.click(RSS_CLIENT.list_tools, outputs=status) # pylint: disable=no-member
msg.submit( # pylint: disable=no-member
send_message,
[msg, chatbot],
[msg, chatbot]
)
if __name__ == '__main__':
current_directory = os.getcwd()
if 'pyrite' in current_directory:
demo.launch(server_name="0.0.0.0", server_port=7860)
else:
demo.launch()