Spaces:
Running
Running
import gradio as gr | |
import asyncio | |
import os | |
from typing import List, Tuple, Optional, Dict, Any | |
from datetime import datetime | |
import logging | |
import signal | |
import sys | |
import json | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
try: | |
from mcp_use import MCPClient | |
from langchain_mcp_adapters.client import MultiServerMCPClient | |
from langchain_community.tools.sleep.tool import SleepTool | |
from langchain_mcp_adapters.tools import load_mcp_tools | |
from langchain.agents import AgentExecutor, create_tool_calling_agent | |
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
from langchain_mistralai import ChatMistralAI | |
except ImportError as e: | |
logger.error(f"Import error: {e}") | |
raise | |
# Helper for running a coroutine from synchronous code (e.g. Gradio callbacks).
import threading

# One event loop shared by every sync_run() call; it lives in a daemon thread.
_background_loop = None
_background_loop_lock = threading.Lock()


def _get_background_loop():
    """Return the shared background event loop, starting its thread on first use."""
    global _background_loop
    with _background_loop_lock:
        if _background_loop is None or _background_loop.is_closed():
            loop = asyncio.new_event_loop()
            worker = threading.Thread(
                target=loop.run_forever, name="sync-run-loop", daemon=True
            )
            worker.start()
            _background_loop = loop
        return _background_loop


def sync_run(coro):
    """Run *coro* to completion from synchronous code and return its result.

    Every call is scheduled on the same long-lived background event loop.
    That matters because objects created by one coroutine (open sessions,
    subprocess transports) stay bound to the loop that created them; running
    each coroutine on a fresh, short-lived loop would leave them unusable in
    the next call.

    The previous approach called ``run_until_complete`` on an already running
    loop, which always raises ``RuntimeError``; scheduling on a separate loop
    thread avoids that, so this helper also works when the calling thread
    already has a running loop (the caller is blocked until the result is
    ready).

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        RuntimeError: If called from the background loop itself, which would
            deadlock waiting on its own result.
        Exception: Any exception raised by the coroutine is re-raised here.
    """
    loop = _get_background_loop()
    try:
        current = asyncio.get_running_loop()
    except RuntimeError:
        current = None
    if current is loop:
        # Close the coroutine so it does not trigger a "never awaited" warning.
        coro.close()
        raise RuntimeError("sync_run() cannot be called from its own event loop")
    return asyncio.run_coroutine_threadsafe(coro, loop).result()
# Keeps a short window of chat history plus a summary of the last agent action.
class ConversationManager:
    """Trim chat history and remember the most recent action of the session.

    Attributes are plain instance fields:
      * ``max_history_pairs`` - how many trailing (user, assistant) pairs
        ``get_optimized_history`` keeps.
      * ``max_context_chars`` - stored as given; no method of this class
        enforces it.
      * ``session_context`` - dict with ``last_action``, ``last_result`` and
        ``timestamp`` once ``update_session_context`` has been called.
    """

    def __init__(self, max_history_pairs: int = 3, max_context_chars: int = 2000):
        self.max_history_pairs = max_history_pairs
        self.max_context_chars = max_context_chars
        self.session_context = {}

    def update_session_context(self, action: str, result: str):
        """Record the latest action and the first 500 characters of its result."""
        # Coerce defensively: an agent may return None or a non-string payload.
        result_text = "" if result is None else str(result)
        self.session_context.update({
            'last_action': action,
            'last_result': result_text[:500],
            'timestamp': datetime.now().isoformat()
        })

    def get_optimized_history(self, full_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """Return the last ``max_history_pairs`` pairs as a new list.

        When a session context exists, a synthetic ``("system", "[SESSION_CONTEXT] ...")``
        pair is placed first. The input list is never modified.
        """
        # Guard the limit explicitly: ``seq[-0:]`` would return the whole history.
        if full_history and self.max_history_pairs > 0:
            recent = list(full_history[-self.max_history_pairs:])
        else:
            recent = []
        if self.session_context:
            msg = f"[SESSION_CONTEXT] Last action: {self.session_context.get('last_action','none')}"
            recent.insert(0, ("system", msg))
        return recent

    def get_context_summary(self) -> str:
        """Return a one-line description of the session state."""
        if not self.session_context:
            return "Browser session not active."
        return f"Browser session active. Last action: {self.session_context.get('last_action')} at {self.session_context.get('timestamp')}"
class BrowserAgent:
    """Tool-calling agent that drives a browser through a Playwright MCP server.

    ``initialize_async`` starts the MCP server (``npx @playwright/mcp@latest``
    over stdio), loads its tools, and builds a LangChain ``AgentExecutor``
    around a Mistral chat model. ``process_query_async`` answers one user
    query; ``cleanup_async`` releases the session.
    """

    def __init__(self, api_key: str):
        """Store the API key and set every runtime handle to its idle value."""
        self.api_key = api_key
        self.client = None
        self.session = None
        # Async context manager returned by client.session(); kept for __aexit__.
        self.session_context = None
        self.agent_executor = None
        self.model = None
        self.initialized = False
        self.available_tools = {}
        self.system_prompt = ""
        self.conversation_manager = ConversationManager()

    async def generate_tools_prompt(self):
        """Return a text section listing every loaded tool, one per line.

        Built from ``self.available_tools`` (name -> tool). Descriptions are
        collapsed onto a single line so the list stays compact.
        """
        if not self.available_tools:
            return "\n🔧 Available tools: none\n"
        lines = ["", "🔧 Available tools:"]
        for name, tool in self.available_tools.items():
            description = " ".join(str(getattr(tool, "description", "") or "").split())
            lines.append(f"- {name}: {description}" if description else f"- {name}")
        return "\n".join(lines) + "\n"

    async def get_system_prompt_with_tools(self):
        """Return the fixed system prompt followed by the tool listing."""
        base = """🌐 Browser Agent — Persistent Session & Optimized Memory
You are an intelligent browser automation agent (Playwright via MCP)...
"""
        tools_section = await self.generate_tools_prompt()
        return base + tools_section

    async def initialize_async(self):
        """Start the MCP browser session and build the agent executor.

        The key passed to the constructor is used when non-blank; otherwise
        the ``mistralkey`` environment variable is the fallback.

        Returns:
            True once the agent is ready.

        Raises:
            ValueError: If no Mistral API key is available.
            Exception: Any start-up failure is re-raised after the partially
                opened session has been cleaned up.
        """
        mistral_key = (self.api_key or "").strip() or os.getenv("mistralkey")
        if not mistral_key:
            raise ValueError("Mistral API key missing")
        try:
            self.model = ChatMistralAI(model="mistral-small-latest", api_key=mistral_key)
            self.client = MultiServerMCPClient({
                "browser": {
                    "command": "npx",
                    "args": ["@playwright/mcp@latest", "--browser", "chromium"],
                    "transport": "stdio"
                }
            })
            self.session_context = self.client.session("browser")
            self.session = await self.session_context.__aenter__()
            tools = await load_mcp_tools(self.session)
            tools.append(SleepTool(description="Wait 4 seconds"))
            self.available_tools = {t.name: t for t in tools}
            install = self.available_tools.get("browser_install")
            if install:
                try:
                    await install.arun({})
                except Exception as exc:
                    # Best effort: a failed install must not abort start-up,
                    # but it should be visible in the logs.
                    logger.warning("browser_install failed (continuing): %s", exc)
            self.system_prompt = await self.get_system_prompt_with_tools()
            # Tool descriptions may contain braces; escape them so the prompt
            # template does not treat them as input variables.
            template_text = self.system_prompt.replace("{", "{{").replace("}", "}}")
            prompt = ChatPromptTemplate.from_messages([
                ("system", template_text),
                MessagesPlaceholder(variable_name="chat_history"),
                ("human", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ])
            agent = create_tool_calling_agent(
                llm=self.model, tools=tools, prompt=prompt
            )
            self.agent_executor = AgentExecutor(
                agent=agent, tools=tools, verbose=True,
                max_iterations=15, early_stopping_method="generate",
                handle_parsing_errors=True, return_intermediate_steps=True,
                max_execution_time=180
            )
            self.initialized = True
        except Exception:
            # Do not leak a half-opened session when start-up fails.
            await self.cleanup_async()
            raise
        return True

    async def cleanup_async(self):
        """Close the MCP session and reset the agent to its idle state.

        Cleanup is best effort: failures while closing are logged and do not
        propagate, so the instance always ends up uninitialized.
        """
        session_cm, self.session_context = self.session_context, None
        client, self.client = self.client, None
        self.session = None
        self.agent_executor = None
        self.initialized = False
        if session_cm is not None:
            try:
                await session_cm.__aexit__(None, None, None)
            except Exception as exc:
                logger.warning("Error while closing the MCP session: %s", exc)
        # NOTE(review): the client may not expose close() in every
        # langchain-mcp-adapters version - call it only when present.
        close = getattr(client, "close", None)
        if callable(close):
            try:
                outcome = close()
                if hasattr(outcome, "__await__"):
                    await outcome
            except Exception as exc:
                logger.warning("Error while closing the MCP client: %s", exc)

    async def process_query_async(self, query: str, chat_history: List[Tuple[str, str]]) -> str:
        """Run one user query through the agent and return its text output.

        Args:
            query: The user's message.
            chat_history: Previous (user, assistant) pairs.

        Returns:
            The ``"output"`` field of the executor's response.

        Raises:
            RuntimeError: If the agent has not been initialized.
        """
        if not self.initialized or self.agent_executor is None:
            raise RuntimeError("Agent not initialized")
        opt_hist = self.conversation_manager.get_optimized_history(chat_history)
        msgs = []
        for h, a in opt_hist:
            # The conversation manager prepends a synthetic
            # ("system", "[SESSION_CONTEXT] ...") pair; forward it as one
            # system message instead of a fake human/ai exchange.
            if h == "system" and isinstance(a, str) and a.startswith("[SESSION_CONTEXT]"):
                msgs.append(("system", a))
                continue
            if h:
                msgs.append(("human", h))
            if a:
                msgs.append(("ai", a))
        summary = self.conversation_manager.get_context_summary()
        enhanced = f"{query}\n\n[SESSION_INFO]: {summary}"
        resp = await self.agent_executor.ainvoke({
            "input": enhanced,
            "chat_history": msgs
        })
        out = resp["output"]
        self.conversation_manager.update_session_context(query, out)
        return out
# Module-wide agent instance shared by every UI callback.
agent: Optional["BrowserAgent"] = None


def initialize_agent_sync(api_key: str) -> str:
    """Create and initialize the global agent, replacing any previous one.

    Args:
        api_key: Mistral API key entered in the UI.

    Returns:
        A status message for the UI: a success message followed by the first
        1000 characters of the system prompt, or an error message.
    """
    global agent
    if not api_key or not api_key.strip():
        return "❌ Clé Mistral requise"
    try:
        previous = agent
        if previous is not None:
            # Clean up even a half-initialized agent, and never let a cleanup
            # failure of the old agent block creation of the new one.
            try:
                sync_run(previous.cleanup_async())
            except Exception:
                logger.exception("Cleanup of the previous agent failed")
        agent = BrowserAgent(api_key)
        sync_run(agent.initialize_async())
        info = agent.system_prompt[:1000]
        return f"✅ Agent initialisé !\n\n{info}..."
    except Exception as e:
        logger.exception("Agent initialization failed")
        return f"❌ Échec init. {e}"
def process_message_sync(message: str, history: List[List[str]]) -> Tuple[str, List[List[str]]]:
    """Handle one chat message from the UI.

    Args:
        message: Text typed by the user.
        history: Chat history as ``[user, assistant]`` pairs; ``None`` is
            treated as an empty history.

    Returns:
        ``("", history)`` - an empty string to clear the input box, and the
        history with the new ``[message, reply-or-error]`` pair appended.
    """
    if history is None:
        history = []
    if not agent or not agent.initialized:
        err = "❌ Agent non initialisé."
        history.append([message, err])
        return "", history
    if not message or not message.strip():
        err = "Veuillez entrer un message."
        history.append([message, err])
        return "", history
    # Snapshot the history before the new pair is appended.
    agent_hist = [(m[0], m[1]) for m in history]
    try:
        resp = sync_run(agent.process_query_async(message, agent_hist))
        history.append([message, resp])
        return "", history
    except Exception as e:
        logger.exception("Query processing failed")
        err = f"❌ Erreur: {e}"
        history.append([message, err])
        return "", history
def get_token_stats_sync(history: List[List[str]]) -> str:
    """Report how many history entries exist before and after trimming.

    Args:
        history: Chat history as ``[user, assistant]`` pairs; ``None`` is
            treated as an empty history.

    Returns:
        ``"📊 Paires: <original> → <optimized>"``, or a notice when the agent
        is not initialized. The optimized count is the length of whatever the
        conversation manager returns for this history.
    """
    if not agent or not agent.initialized:
        return "Agent non initialisé"
    pairs = [(m[0], m[1]) for m in (history or [])]
    orig = len(pairs)
    opt = len(agent.conversation_manager.get_optimized_history(pairs))
    return f"📊 Paires: {orig} → {opt}"
def create_interface():
    """Build the Gradio UI and return the ``gr.Blocks`` application.

    Layout, top to bottom: title, API-key box with its init button and status
    field, the chat view with its message box and send button, and a stats
    button with its output field.
    """
    app = gr.Blocks(title="MCP Browser Agent", theme=gr.themes.Soft())
    with app:
        gr.Markdown("# 🌐 MCP Browser Agent")

        # Agent initialization controls.
        key_box = gr.Textbox(label="Clé Mistral", type="password")
        init_button = gr.Button("Initialiser")
        init_status = gr.Textbox(label="Statut", interactive=False)
        init_button.click(
            fn=initialize_agent_sync, inputs=[key_box], outputs=[init_status]
        )

        # Chat controls: the button and the Enter key trigger the same handler.
        chat_view = gr.Chatbot(label="Conversation")
        message_box = gr.Textbox(placeholder="Écris ton message...", lines=2)
        send_button = gr.Button("Envoyer")
        for trigger in (send_button.click, message_box.submit):
            trigger(
                fn=process_message_sync,
                inputs=[message_box, chat_view],
                outputs=[message_box, chat_view],
            )

        # History statistics.
        stats_button = gr.Button("Stats tokens")
        stats_output = gr.Textbox(label="Token Stats", interactive=False)
        stats_button.click(
            fn=get_token_stats_sync, inputs=[chat_view], outputs=[stats_output]
        )
    return app
def signal_handler(signum, frame):
    """Clean up the global agent, then exit the process with status 0.

    Args:
        signum: Signal number (unused).
        frame: Current stack frame (unused).

    A failure during cleanup is logged and does not prevent the exit.
    """
    try:
        if agent and agent.initialized:
            sync_run(agent.cleanup_async())
    except Exception:
        logger.exception("Cleanup failed during shutdown")
    finally:
        sys.exit(0)
if __name__ == "__main__":
    # Ctrl-C and termination requests share the same shutdown handler.
    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, signal_handler)
    # Listen on all interfaces, port 7860.
    interface = create_interface()
    interface.launch(server_name="0.0.0.0", server_port=7860)