# Source: Hugging Face Space "Web_scrapper", file app.py (page header captured during extraction).
# Author: Zelyanoth · Commit 481b5bc (verified): "Update app.py" · Size: 8.84 kB
import asyncio
import json
import logging
import os
import signal
import sys
import threading
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
# Configure the root logger at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
from mcp_use import MCPClient
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_community.tools.sleep.tool import SleepTool
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_mistralai import ChatMistralAI
except ImportError as e:
logger.error(f"Import error: {e}")
raise
# 🤖 Helper to call a coroutine from a synchronous context.
#
# All coroutines are executed on ONE long-lived event loop running in a daemon
# thread. Objects that are bound to the loop they were created on (open
# sessions, subprocess transports, ...) therefore stay usable across calls,
# which would not be the case if each call created and closed its own loop.
_BACKGROUND_LOOP = None
_BACKGROUND_LOOP_LOCK = threading.Lock()


def _get_background_loop():
    """Return the shared background event loop, starting its thread on first use."""
    global _BACKGROUND_LOOP
    with _BACKGROUND_LOOP_LOCK:
        if _BACKGROUND_LOOP is None or _BACKGROUND_LOOP.is_closed():
            loop = asyncio.new_event_loop()
            worker = threading.Thread(
                target=loop.run_forever, name="sync-run-loop", daemon=True
            )
            worker.start()
            _BACKGROUND_LOOP = loop
        return _BACKGROUND_LOOP


def sync_run(coro):
    """Run *coro* to completion from synchronous code and return its result.

    The coroutine is scheduled on the shared background loop and the calling
    thread blocks until it finishes. Exceptions raised by the coroutine are
    re-raised in the caller.

    Raises:
        RuntimeError: if called from a coroutine that is itself running on the
            background loop (waiting there would deadlock).
    """
    loop = _get_background_loop()
    try:
        current = asyncio.get_running_loop()
    except RuntimeError:
        current = None
    if current is loop:
        # Blocking the background loop on its own work can never complete.
        coro.close()
        raise RuntimeError(
            "sync_run() cannot be called from the background event loop itself"
        )
    return asyncio.run_coroutine_threadsafe(coro, loop).result()
class ConversationManager:
    """Keep a short window of chat history plus a record of the last action."""

    def __init__(self, max_history_pairs: int = 3, max_context_chars: int = 2000):
        """Store the limits and start with an empty session context.

        Args:
            max_history_pairs: number of most recent (user, assistant) pairs
                kept by ``get_optimized_history``; values <= 0 keep none.
            max_context_chars: stored on the instance; no method of this class
                currently enforces it.
        """
        self.max_history_pairs = max_history_pairs
        self.max_context_chars = max_context_chars
        self.session_context = {}

    def update_session_context(self, action: str, result: str):
        """Record the latest action, the first 500 chars of its result, and a timestamp."""
        # The agent output is not guaranteed to be a string (or even present).
        result_text = "" if result is None else str(result)
        self.session_context.update({
            'last_action': action,
            'last_result': result_text[:500],
            'timestamp': datetime.now().isoformat()
        })

    def get_optimized_history(self, full_history: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """Return the most recent pairs, preceded by a session marker when one exists.

        The input is never mutated. When a session context has been recorded,
        a synthetic ``("system", "[SESSION_CONTEXT] ...")`` entry is placed at
        index 0 of the returned list.
        """
        # Guard the limit explicitly: a slice starting at -0 would return the
        # WHOLE history instead of nothing.
        if full_history and self.max_history_pairs > 0:
            recent = list(full_history[-self.max_history_pairs:])
        else:
            recent = []
        if self.session_context:
            msg = f"[SESSION_CONTEXT] Last action: {self.session_context.get('last_action','none')}"
            recent.insert(0, ("system", msg))
        return recent

    def get_context_summary(self) -> str:
        """Return a one-line description of the session state."""
        if not self.session_context:
            return "Browser session not active."
        return f"Browser session active. Last action: {self.session_context.get('last_action')} at {self.session_context.get('timestamp')}"
class BrowserAgent:
    """Tool-calling agent that drives a browser through a Playwright MCP server.

    The MCP session is opened once in ``initialize_async`` and kept until
    ``cleanup_async`` so that consecutive queries share the same session.
    """

    def __init__(self, api_key: str):
        """Create an uninitialized agent; call ``initialize_async`` before use."""
        self.api_key = api_key
        self.client = None
        self.session = None
        # Async context manager returned by client.session(); kept so it can
        # be exited in cleanup_async.
        self.session_context = None
        self.agent_executor = None
        self.model = None
        self.initialized = False
        self.available_tools = {}
        self.system_prompt = ""
        self.conversation_manager = ConversationManager()

    async def generate_tools_prompt(self):
        """Return a prompt section listing the loaded tools, one per line.

        Only the first line of each tool description is used.
        """
        if not self.available_tools:
            return "\n🛠️ Available tools: none loaded.\n"
        lines = ["", "🛠️ Available tools:"]
        for name in sorted(self.available_tools):
            tool = self.available_tools[name]
            description = (getattr(tool, "description", "") or "").strip()
            summary = description.splitlines()[0] if description else "No description provided."
            lines.append(f"- {name}: {summary}")
        return "\n".join(lines) + "\n"

    async def get_system_prompt_with_tools(self):
        """Return the base system prompt followed by the tools section."""
        base = (
            "🌐 Browser Agent — Persistent Session & Optimized Memory\n"
            "You are an intelligent browser automation agent (Playwright via MCP)...\n"
        )
        tools_section = await self.generate_tools_prompt()
        return base + tools_section

    async def initialize_async(self):
        """Open the MCP browser session, load its tools and build the agent.

        The key passed to the constructor is used; the ``mistralkey``
        environment variable is the fallback when it is empty.

        Returns:
            True once the agent is ready.

        Raises:
            ValueError: if no Mistral API key is available.
            Exception: any failure while starting the session or building the
                agent is re-raised after already-opened resources are released.
        """
        mistral_key = (self.api_key or "").strip() or os.getenv("mistralkey")
        if not mistral_key:
            raise ValueError("Mistral API key missing")
        try:
            self.model = ChatMistralAI(model="mistral-small-latest", api_key=mistral_key)
            self.client = MultiServerMCPClient({
                "browser": {
                    "command": "npx",
                    "args": ["@playwright/mcp@latest", "--browser", "chromium"],
                    "transport": "stdio"
                }
            })
            # Record the context manager only once it has been entered, so
            # cleanup never exits a context that was never opened.
            session_cm = self.client.session("browser")
            self.session = await session_cm.__aenter__()
            self.session_context = session_cm

            tools = await load_mcp_tools(self.session)
            tools.append(SleepTool(description="Wait 4 seconds"))
            self.available_tools = {t.name: t for t in tools}

            install = self.available_tools.get("browser_install")
            if install:
                # Best effort: a failure here is reported but does not block start-up.
                try:
                    await install.arun({})
                except Exception as exc:
                    logger.warning("browser_install failed (continuing): %s", exc)

            self.system_prompt = await self.get_system_prompt_with_tools()
            # Literal braces coming from tool descriptions must be escaped or
            # the prompt template would treat them as input variables.
            template_safe_prompt = self.system_prompt.replace("{", "{{").replace("}", "}}")
            prompt = ChatPromptTemplate.from_messages([
                ("system", template_safe_prompt),
                MessagesPlaceholder(variable_name="chat_history"),
                ("human", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ])
            agent = create_tool_calling_agent(
                llm=self.model, tools=tools, prompt=prompt
            )
            # NOTE(review): "force" is used because tool-calling (runnable)
            # agents reject "generate" when the iteration/time limit is hit —
            # confirm against the installed LangChain version.
            self.agent_executor = AgentExecutor(
                agent=agent, tools=tools, verbose=True,
                max_iterations=15, early_stopping_method="force",
                handle_parsing_errors=True, return_intermediate_steps=True,
                max_execution_time=180
            )
        except Exception:
            # Do not leave a half-open session behind.
            await self.cleanup_async()
            raise
        self.initialized = True
        return True

    async def cleanup_async(self):
        """Release the MCP session and client; safe to call more than once.

        Instance state is reset first so the agent is reported as not
        initialized even if closing a resource fails; such failures are
        logged rather than raised.
        """
        session_cm, self.session_context = self.session_context, None
        client, self.client = self.client, None
        self.session = None
        self.agent_executor = None
        self.initialized = False

        if session_cm is not None:
            try:
                await session_cm.__aexit__(None, None, None)
            except Exception as exc:
                logger.warning("Error while closing MCP session: %s", exc)

        # Not every client version exposes close(); call it only when present.
        close = getattr(client, "close", None)
        if callable(close):
            try:
                await close()
            except Exception as exc:
                logger.warning("Error while closing MCP client: %s", exc)

    async def process_query_async(self, query: str, chat_history: List[Tuple[str, str]]) -> str:
        """Run one user query through the agent and return its textual output.

        Args:
            query: the user's message.
            chat_history: previous (user, assistant) pairs.

        Raises:
            RuntimeError: if the agent has not been initialized.
        """
        if self.agent_executor is None:
            raise RuntimeError("Agent is not initialized")

        opt_hist = self.conversation_manager.get_optimized_history(chat_history)
        msgs = []
        for user_text, ai_text in opt_hist:
            # The conversation manager prepends a synthetic
            # ("system", "[SESSION_CONTEXT] ...") entry. It is not a real
            # exchange, and the same information is sent via [SESSION_INFO]
            # below, so it must not be replayed as human/ai turns.
            if (
                user_text == "system"
                and isinstance(ai_text, str)
                and ai_text.startswith("[SESSION_CONTEXT]")
            ):
                continue
            if user_text:
                msgs.append(("human", user_text))
            if ai_text:
                msgs.append(("ai", ai_text))

        summary = self.conversation_manager.get_context_summary()
        enhanced = f"{query}\n\n[SESSION_INFO]: {summary}"
        resp = await self.agent_executor.ainvoke({
            "input": enhanced,
            "chat_history": msgs
        })
        out = resp["output"]
        self.conversation_manager.update_session_context(query, out)
        return out
# Single agent instance shared by every UI callback and the signal handler.
agent: Optional["BrowserAgent"] = None


def initialize_agent_sync(api_key: str) -> str:
    """Create (or re-create) the global agent and return a status message.

    Any previously created agent is cleaned up first. Problems during that
    cleanup are logged and do not prevent the new agent from being created.

    Args:
        api_key: Mistral API key entered by the user; must not be blank.

    Returns:
        A user-facing status string (success with a prompt excerpt, or the
        failure reason).
    """
    global agent
    if not api_key or not api_key.strip():
        return "❌ Clé Mistral requise"
    try:
        # Clean up even a half-initialized agent: a failed start-up may have
        # left resources open.
        if agent is not None:
            try:
                sync_run(agent.cleanup_async())
            except Exception:
                logger.exception("Cleanup of the previous agent failed; continuing")
        agent = BrowserAgent(api_key.strip())
        sync_run(agent.initialize_async())
        info = agent.system_prompt[:1000]
        return f"✅ Agent initialisé !\n\n{info}..."
    except Exception as e:
        logger.exception("Agent initialization failed")
        return f"❌ Échec init. {e}"
def process_message_sync(message: str, history: List[List[str]]) -> Tuple[str, List[List[str]]]:
    """Handle one chat message and return ``("", updated_history)``.

    The empty string clears the input box. Every outcome, including errors,
    is appended to the history as a ``[message, reply]`` pair.

    Args:
        message: text typed by the user.
        history: list of ``[user, assistant]`` pairs shown in the chatbot;
            ``None`` is treated as an empty conversation.
    """
    if history is None:
        history = []
    if not agent or not agent.initialized:
        err = "❌ Agent non initialisé."
        history.append([message, err])
        return "", history
    if not message or not message.strip():
        err = "Veuillez entrer un message."
        history.append([message, err])
        return "", history

    agent_hist = [(m[0], m[1]) for m in history]
    try:
        reply = sync_run(agent.process_query_async(message, agent_hist))
    except Exception as e:
        logger.exception("Query processing failed")
        reply = f"❌ Erreur: {e}"
    history.append([message, reply])
    return "", history
def get_token_stats_sync(history: List[List[str]]) -> str:
    """Report how many history entries exist before and after optimization.

    Args:
        history: list of ``[user, assistant]`` pairs; ``None`` counts as empty.

    Returns:
        ``"📊 Paires: <original> → <optimized>"``, or a notice when the agent
        has not been initialized. The optimized count is whatever
        ``get_optimized_history`` returns, including any entry it adds.
    """
    if not agent or not agent.initialized:
        return "Agent non initialisé"
    pairs = [(m[0], m[1]) for m in (history or [])]
    orig = len(pairs)
    opt = len(agent.conversation_manager.get_optimized_history(pairs))
    # A separator is required: printing the two counts back to back made
    # 5 and 3 read as "53".
    return f"📊 Paires: {orig} → {opt}"
def create_interface():
    """Build and return the Gradio Blocks UI for the browser agent.

    Components are created top to bottom in display order: the initialization
    controls, the chat area, then the statistics controls.
    """
    with gr.Blocks(title="MCP Browser Agent", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🌐 MCP Browser Agent")

        # --- Agent initialization ---
        key_box = gr.Textbox(label="Clé Mistral", type="password")
        init_button = gr.Button("Initialiser")
        status_box = gr.Textbox(label="Statut", interactive=False)
        init_button.click(
            fn=initialize_agent_sync,
            inputs=[key_box],
            outputs=[status_box],
        )

        # --- Conversation ---
        chat = gr.Chatbot(label="Conversation")
        user_box = gr.Textbox(placeholder="Écris ton message...", lines=2)
        send_button = gr.Button("Envoyer")
        # The button and the Enter key trigger the exact same handler wiring.
        send_wiring = dict(
            fn=process_message_sync,
            inputs=[user_box, chat],
            outputs=[user_box, chat],
        )
        send_button.click(**send_wiring)
        user_box.submit(**send_wiring)

        # --- History statistics ---
        stats_button = gr.Button("Stats tokens")
        stats_box = gr.Textbox(label="Token Stats", interactive=False)
        stats_button.click(
            fn=get_token_stats_sync,
            inputs=[chat],
            outputs=[stats_box],
        )
    return demo
def signal_handler(signum, frame):
    """Clean up the global agent, then exit the process with status 0.

    Cleanup is best effort: a failure is logged and the process still exits,
    so a broken session can never make the application ignore the signal.

    Args:
        signum: number of the received signal.
        frame: current stack frame (unused).
    """
    try:
        if agent and agent.initialized:
            sync_run(agent.cleanup_async())
    except Exception:
        logger.exception("Cleanup failed during shutdown (signal %s)", signum)
    finally:
        sys.exit(0)
if __name__ == "__main__":
    # Route both Ctrl+C and termination requests to the same shutdown handler.
    for _signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(_signum, signal_handler)

    # Build the UI and serve it on all interfaces, port 7860.
    interface = create_interface()
    interface.launch(server_name="0.0.0.0", server_port=7860)