""" Core logic for the WellBe+ multi‑context AI assistant. This module handles MCP server instantiation, model selection, and single‑shot question answering. It is intentionally agnostic of any UI layer so that it can be re‑used from a command‑line script, a notebook, or the Gradio front‑end. """ from __future__ import annotations import asyncio from copy import deepcopy from typing import Tuple, Union, Optional import os import openai from agents import Agent, Runner from agents.extensions.models.litellm_model import LitellmModel from agents.mcp.server import MCPServerStreamableHttp, MCPServerSse from config import PROMPT_TEMPLATE, MCP_CONFIGS __all__ = [ "initialize_mcp_servers", "select_model", "answer_question", "answer_sync", ] # Global variables to store initialized servers _healthcare_server: Optional[Union[MCPServerSse, str]] = None _whoop_server: Optional[Union[MCPServerSse, str]] = None def initialize_mcp_servers(whoop_email: str, whoop_password: str) -> None: """Initialize and cache the MCP servers (once only).""" global _healthcare_server, _whoop_server if _healthcare_server is None: cfg = deepcopy(MCP_CONFIGS) os.environ["WHOOP_EMAIL"] = whoop_email os.environ["WHOOP_PASSWORD"] = whoop_password _healthcare_server = MCPServerSse( params={ "url": "https://agents-mcp-hackathon-healthcare-mcp-public.hf.space/gradio_api/mcp/sse" }, name="Healthcare MCP Server" ) _whoop_server = MCPServerSse( params={ "url": "https://agents-mcp-hackathon-whoop-mcp-server.hf.space/gradio_api/mcp/sse" }, name="Whoop MCP Server" ) def select_model() -> Union[str, LitellmModel]: return "o3-mini" async def answer_question( question: str, openai_key: str, whoop_email: str, whoop_password: str, ) -> str: """Run the WellBe+ agent on a single question and return the assistant reply.""" initialize_mcp_servers(whoop_email, whoop_password) if _healthcare_server is None: return "**Error:** MCP servers not initialized." async with _healthcare_server as hserver, _whoop_server as wserver: agent = Agent( name="WellBe+ Assistant", instructions=PROMPT_TEMPLATE, model=select_model(), mcp_servers=[ hserver, wserver ], ) result = await Runner.run(agent, question) return result.final_output def answer_sync(question: str, openai_key: str, email: str, password: str) -> str: """Blocking wrapper around :func:`answer_question`.""" api_key = openai_key or os.getenv("OPENAI_API_KEY") openai.api_key = api_key os.environ["OPENAI_API_KEY"] = api_key if not question.strip(): return "Please enter a question." try: return asyncio.run( answer_question( question, openai_key.strip(), email.strip(), password.strip() ) ) except Exception as exc: # noqa: BLE001 return f"**Error:** {exc}"