File size: 3,187 Bytes
3454584 098ee5c 5531e98 57d39e3 3454584 12907b9 3454584 098ee5c 3454584 098ee5c 3454584 098ee5c a61bafb 098ee5c 12907b9 9889694 3454584 098ee5c 3454584 9889694 3454584 c2d6c32 9889694 c2d6c32 3454584 5531e98 d64fafd 3454584 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
"""
Core logic for the WellBe+ multi‑context AI assistant.
This module handles MCP server instantiation, model selection, and single‑shot
question answering. It is intentionally agnostic of any UI layer so that it can
be re‑used from a command‑line script, a notebook, or the Gradio front‑end.
"""
from __future__ import annotations
import asyncio
from copy import deepcopy
from typing import Tuple, Union, Optional
import os
import openai
from agents import Agent, Runner
from agents.extensions.models.litellm_model import LitellmModel
from agents.mcp.server import MCPServerStdio, MCPServerStreamableHttp, MCPServerSse
from config import PROMPT_TEMPLATE, MCP_CONFIGS
# Public API of this module: the names exported by ``from <module> import *``.
__all__ = [
"initialize_mcp_servers",
"select_model",
"answer_question",
"answer_sync",
]
# Module-level cache of the MCP server objects. Both names stay ``None`` until
# initialize_mcp_servers() assigns them; answer_question() reads them afterwards.
_healthcare_server: Optional[MCPServerStreamableHttp] = None
# initialize_mcp_servers() currently assigns an MCPServerSse instance here.
_whoop_server: Optional[Union[MCPServerStdio, MCPServerSse]] = None
def initialize_mcp_servers(whoop_email: str, whoop_password: str) -> None:
    """Create the healthcare and Whoop MCP server objects on first use.

    The first call exports the Whoop credentials to the process environment
    (``WHOOP_EMAIL`` / ``WHOOP_PASSWORD``) and stores both server objects in
    module-level globals. Once ``_healthcare_server`` is set, further calls
    return immediately and leave the environment and the cached servers
    untouched.

    Args:
        whoop_email: Value written to ``WHOOP_EMAIL`` on the first call.
        whoop_password: Value written to ``WHOOP_PASSWORD`` on the first call.
    """
    global _healthcare_server, _whoop_server

    # Already initialised: keep the cached servers.
    if _healthcare_server is not None:
        return

    # Work on a private copy so the shared MCP_CONFIGS mapping is never mutated.
    configs = deepcopy(MCP_CONFIGS)

    os.environ.update(WHOOP_EMAIL=whoop_email, WHOOP_PASSWORD=whoop_password)

    healthcare_params = configs["healthcare-mcp-public"]
    _healthcare_server = MCPServerStreamableHttp(
        params=healthcare_params,
        name="Healthcare MCP Server",
    )

    # The Whoop server is reached over SSE at a fixed URL; the stdio variant
    # driven by configs["whoop"] is not used.
    whoop_params = {
        "url": "https://agents-mcp-hackathon-whoop-mcp-server.hf.space/gradio_api/mcp/sse"
    }
    _whoop_server = MCPServerSse(params=whoop_params, name="Whoop MCP Server")
def select_model() -> Union[str, LitellmModel]:
    """Return the model the agent runs on.

    At present this is always the model name ``"o3-mini"``; the return
    annotation additionally admits a ``LitellmModel`` instance.
    """
    model_name = "o3-mini"
    return model_name
async def answer_question(
    question: str,
    openai_key: str,
    whoop_email: str,
    whoop_password: str,
) -> str:
    """Run the WellBe+ agent on a single question and return the assistant reply.

    Args:
        question: The user's question, handed unchanged to the agent run.
        openai_key: Accepted for interface compatibility; this coroutine does
            not read it. ``answer_sync`` exports the key before calling here.
        whoop_email: Forwarded to ``initialize_mcp_servers``.
        whoop_password: Forwarded to ``initialize_mcp_servers``.

    Returns:
        The ``final_output`` of the agent run, or an ``**Error:**`` message
        when either MCP server is still unset after initialization.
    """
    initialize_mcp_servers(whoop_email, whoop_password)

    # Both servers are entered as async context managers below, so both must
    # exist; checking only one would let a missing server fail with an opaque
    # context-manager error instead of this message.
    if _healthcare_server is None or _whoop_server is None:
        return "**Error:** MCP servers not initialized."

    async with _healthcare_server as hserver, _whoop_server as wserver:
        agent = Agent(
            name="WellBe+ Assistant",
            instructions=PROMPT_TEMPLATE,
            model=select_model(),
            mcp_servers=[
                hserver,
                wserver,
            ],
        )
        result = await Runner.run(agent, question)
        return result.final_output
def answer_sync(question: str, openai_key: str, email: str, password: str) -> str:
    """Blocking wrapper around :func:`answer_question`.

    Resolves the OpenAI API key (explicit argument first, then the
    ``OPENAI_API_KEY`` environment variable), exports it via ``openai.api_key``
    and ``os.environ``, and runs the coroutine to completion with
    ``asyncio.run``.

    Args:
        question: The user's question. Blank or ``None`` yields a prompt to
            enter one.
        openai_key: API key; may be empty or ``None`` to use the environment.
        email: Whoop account e-mail, stripped before being forwarded.
        password: Whoop account password, stripped before being forwarded.

    Returns:
        The assistant reply, ``"Please enter a question."`` for blank input,
        or a string starting with ``**Error:**`` on any failure. This function
        never raises for missing inputs.
    """
    # Normalise to a stripped string so a missing key never reaches
    # os.environ, which accepts only str values and would raise TypeError.
    api_key = (openai_key or "").strip() or (os.getenv("OPENAI_API_KEY") or "").strip()
    if api_key:
        openai.api_key = api_key
        os.environ["OPENAI_API_KEY"] = api_key

    if not question or not question.strip():
        return "Please enter a question."
    if not api_key:
        return "**Error:** OpenAI API key is missing."

    try:
        return asyncio.run(
            answer_question(
                question,
                api_key,
                (email or "").strip(),
                (password or "").strip(),
            )
        )
    except Exception as exc:  # noqa: BLE001 - top-level boundary: report, don't crash
        return f"**Error:** {exc}"
|