manu's picture
Update app.py
0a4c43d verified
raw
history blame
24.4 kB
# app.py — ColPali + MCP (search-only) + GPT-5 follow-up responses
# Images are injected by the app in new calls; no base64 is passed through MCP.
import os
import base64
import tempfile
from io import BytesIO
from urllib.request import urlretrieve
from typing import List, Tuple, Dict, Any, Optional
import gradio as gr
from gradio_pdf import PDF
import torch
from pdf2image import convert_from_path
from PIL import Image
from torch.utils.data import DataLoader
from tqdm import tqdm
from colpali_engine.models import ColQwen2, ColQwen2Processor
# Streaming Responses API
from openai import OpenAI
# =============================
# Globals & Config
# =============================
api_key_env = os.getenv("OPENAI_API_KEY", "").strip()
ds: List[torch.Tensor] = [] # page embeddings
images: List[Image.Image] = [] # PIL images in page order
current_pdf_path: Optional[str] = None
device_map = (
"cuda:0"
if torch.cuda.is_available()
else ("mps" if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available() else "cpu")
)
# =============================
# Load Model & Processor
# =============================
model = ColQwen2.from_pretrained(
"vidore/colqwen2-v1.0",
torch_dtype=torch.bfloat16,
device_map=device_map,
attn_implementation="flash_attention_2",
).eval()
processor = ColQwen2Processor.from_pretrained("vidore/colqwen2-v1.0")
# =============================
# Utilities
# =============================
def _ensure_model_device() -> str:
dev = (
"cuda:0"
if torch.cuda.is_available()
else ("mps" if getattr(torch.backends, "mps", None) and torch.backends.mps.is_available() else "cpu")
)
if str(model.device) != dev:
model.to(dev)
return dev
def encode_image_to_base64(image: Image.Image) -> str:
"""Encodes a PIL image to base64 (JPEG)."""
buffered = BytesIO()
image.save(buffered, format="JPEG")
return base64.b64encode(buffered.getvalue()).decode("utf-8")
# =============================
# Indexing Helpers
# =============================
def convert_files(pdf_path: str) -> List[Image.Image]:
"""Convert a single PDF path into a list of PIL Images (pages)."""
imgs = convert_from_path(pdf_path, thread_count=4)
if len(imgs) >= 800:
raise gr.Error("The number of images in the dataset should be less than 800.")
return imgs
def index_gpu(imgs: List[Image.Image]) -> str:
"""Embed a list of images (pages) with ColQwen2 (ColPali) and store in globals."""
global ds, images
device = _ensure_model_device()
# reset previous dataset
ds = []
images = imgs
dataloader = DataLoader(
images,
batch_size=4,
shuffle=False,
collate_fn=lambda x: processor.process_images(x).to(model.device),
)
for batch_doc in tqdm(dataloader, desc="Indexing pages"):
with torch.no_grad():
batch_doc = {k: v.to(device) for k, v in batch_doc.items()}
embeddings_doc = model(**batch_doc)
ds.extend(list(torch.unbind(embeddings_doc.to("cpu"))))
return f"Indexed {len(images)} pages successfully."
def index_from_path(pdf_path: str) -> str:
imgs = convert_files(pdf_path)
return index_gpu(imgs)
def index_from_url(url: str) -> Tuple[str, str]:
"""
Download a PDF from URL and index it.
Returns: (status_message, saved_pdf_path)
"""
tmp_dir = tempfile.mkdtemp(prefix="colpali_")
local_path = os.path.join(tmp_dir, "document.pdf")
urlretrieve(url, local_path)
status = index_from_path(local_path)
return status, local_path
# =============================
# Local Search (ColPali)
# =============================
def search(query: str, k: int = 5) -> List[int]:
"""
Search within a PDF document for the most relevant pages to answer a query and return the page indexes as a list.
MCP tool description:
- name: mcp_test_search
- description: Search within a PDF document for the most relevant pages to answer a query.
- input_schema:
type: object
properties:
query: {type: string, description: "User query in natural language."}
k: {type: integer, minimum: 1, maximum: 10, default: 5. description: "Number of top pages to retrieve."}
required: ["query"]
Args:
query (str): Natural-language question to search for.
k (int): Number of top results to return (1–10).
Returns:
indices (List[int]): Indices of the k most relevant pages
"""
global ds, images
if not images or not ds:
return []
k = max(1, min(int(k), len(images)))
device = _ensure_model_device()
with torch.no_grad():
batch_query = processor.process_queries([query]).to(model.device)
embeddings_query = model(**batch_query)
q_vecs = list(torch.unbind(embeddings_query.to("cpu")))
scores = processor.score(q_vecs, ds, device=device)
top_k_indices = scores[0].topk(k).indices.tolist()
print("[search]", query, top_k_indices)
return top_k_indices
def _build_image_parts_from_indices(indices: List[int]) -> List[Dict[str, Any]]:
"""Turn page indices into OpenAI vision content parts."""
parts: List[Dict[str, Any]] = []
seen = sorted({i for i in indices if 0 <= i < len(images)})
for idx in seen:
b64 = encode_image_to_base64(images[idx])
parts.append({
"type": "input_image",
"image_url": f"data:image/jpeg;base64,{b64}",
})
return parts
# =============================
# Agent System Prompt
# =============================
SYSTEM = (
"""
You are a PDF research agent with a single tool: mcp_test_search(query: string, k: int).
Act iteratively:
1) Split the user question into 1–4 focused sub-queries. Subqueries should be asked as natural language questions in the english language, not just keywords.
2) For each sub-query, call mcp_test_search (k=5 by default; increase to up to 10 if you need to go deep).
3) You will receive the output of mcp_test_search as a list of indices corresponding to page numbers. Stop generating once all the tool calls end. You will be fed the corresponding pages as images in a follow-up message.
4) Stop early when confident; otherwise run new search calls using the tool to find additional missing information. Use up to 5 rounds of iterations and 20 searches in total. If info is missing, try to continue searching using new keywords and queries.
Workflow:
• Use ONLY the provided images for grounding and cite as (p.<page>).
• If an answer is not present, say “Not found in the provided pages.”
Deliverable:
• Return a clear, standalone Markdown answer in the user's language. Include concise tables for lists of dates/items when useful, and cite the page numbers used for each fact.
"""
).strip()
# =============================
# MCP config (search-only)
# =============================
DEFAULT_MCP_SERVER_URL = "https://manu-mcp-test.hf.space/gradio_api/mcp/"
DEFAULT_MCP_SERVER_LABEL = "colpali_rag"
DEFAULT_ALLOWED_TOOLS = "mcp_test_search" # search-only; no get_pages
# =============================
# Streaming Agent (multi-round with previous_response_id)
# =============================
def stream_agent(question: str,
api_key: str,
model_name: str,
server_url: str,
server_label: str,
require_approval: str,
allowed_tools: str):
"""
Multi-round streaming:
• Seed: optional local ColPali search on the user question to attach initial pages.
• Each round: open a GPT-5 stream with *attached images* (if any).
• If the model calls mcp_test_search and returns indices, we end the stream and
start a NEW API call with previous_response_id + the requested pages attached.
"""
if not api_key:
yield "⚠️ **Please provide your OpenAI API key.**", "", ""
return
if not images or not ds:
yield "⚠️ **Index a PDF first in tab 1.**", "", ""
return
client = OpenAI(api_key=api_key)
# Optional seeding: attach some likely pages on round 1
try:
seed_indices = [] # search(question, k=5) or []
except Exception as e:
yield f"❌ Search failed: {e}", "", ""
return
log_lines = ["Log", f"[seed] indices={seed_indices}"]
prev_response_id: Optional[str] = None
# MCP tool routing (search-only)
tools = [{
"type": "mcp",
"server_label": server_label or DEFAULT_MCP_SERVER_LABEL,
"server_url": server_url or DEFAULT_MCP_SERVER_URL,
"allowed_tools": [t.strip() for t in (allowed_tools or DEFAULT_ALLOWED_TOOLS).split(",") if t.strip()],
"require_approval": require_approval or "never",
}]
# Shared mutable state for each round
round_state: Dict[str, Any] = {
"last_search_indices": [],
"final_text": "",
"summary_text": "",
}
def run_round(round_idx: int, attached_indices: List[int]):
"""
Stream one round. If tool results (indices) arrive, store them in round_state["last_search_indices"].
"""
nonlocal prev_response_id
round_state["last_search_indices"] = []
round_state["final_text"] = "" # reset final text
# round_state["summary_text"] = ""
# Build the user content for this round
parts: List[Dict[str, Any]] = []
if round_idx == 1:
parts.append({"type": "input_text", "text": question})
else:
parts.append({"type": "input_text", "text": "Continue reasoning with the newly attached pages. Remember you can ask further questions to the search tool."})
parts += _build_image_parts_from_indices(attached_indices)
if attached_indices:
pages_str = ", ".join(str(i + 1) for i in sorted(set(attached_indices)))
parts.append({"type": "input_text", "text": f"(Attached pages from round {round_idx}: {pages_str}). Ground your answer in these images, or query for new pages."})
# First call includes system; follow-ups use previous_response_id
if prev_response_id:
req_input = [{"role": "user", "content": parts}]
else:
req_input = [
{"role": "system", "content": SYSTEM},
{"role": "user", "content": parts},
]
req_kwargs = dict(
model=model_name,
input=req_input,
reasoning={"effort": "medium", "summary": "auto"},
tools=tools,
store=True, # persist conversation state on server
)
if prev_response_id:
req_kwargs["previous_response_id"] = prev_response_id
# Helper: parse a JSON array of ints from tool result text
def _maybe_parse_indices(chunk: str) -> List[int]:
import json, re
arrs = re.findall(r'\[[^\]]*\]', chunk)
for s in reversed(arrs):
try:
val = json.loads(s)
if isinstance(val, list) and all(isinstance(x, int) for x in val):
return sorted({x for x in val if 0 <= x < len(images)})
except Exception:
pass
return []
tool_result_buffer = "" # accumulate tool result deltas
try:
with client.responses.stream(**req_kwargs) as stream:
for event in stream:
etype = getattr(event, "type", "")
if etype == "response.output_text.delta":
round_state["final_text"] += event.delta
yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])
elif etype == "response.reasoning_summary_text.delta":
round_state["summary_text"] += event.delta
yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])
# Log tool call argument deltas (optional)
elif etype in ("response.mcp_call_arguments.delta", "response.tool_call_arguments.delta"):
delta = getattr(event, "delta", None)
if delta:
log_lines.append("[call] " + str(delta))
round_state["summary_text"] += "\nQuery call: " + event.delta + "\n"
yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])
# Capture tool RESULT text and try to parse indices
elif etype.startswith("response.output_item.done"):
delta_text = getattr(event.item, "output", "")
if delta_text:
tool_result_buffer += str(delta_text)
parsed_now = _maybe_parse_indices(tool_result_buffer)
if parsed_now:
round_state["last_search_indices"] += parsed_now
log_lines.append(f"[tool-result] indices={parsed_now}")
yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])
else:
print(etype)
# Finalize this response; remember ID for follow-ups
_final = stream.get_final_response()
try:
prev_response_id = getattr(_final, "id", None)
except Exception:
prev_response_id = None
# Emit one last update after stream ends
yield round_state["final_text"] or " ", round_state["summary_text"] or " ", "\n".join(log_lines[-400:])
except Exception as e:
log_lines.append(f"[round {round_idx}] stream error: {e}")
yield f"❌ {e}", round_state["summary_text"] or "", "\n".join(log_lines[-400:])
return
# Controller: iterate rounds; if the model searched, attach those pages next
max_rounds = 5
round_idx = 1
pending_indices = list(seed_indices)
while round_idx <= max_rounds:
print("Round ", round_idx, ", Indices: ", pending_indices)
for final_md, summary_md, log_md in run_round(round_idx, pending_indices):
yield final_md, summary_md, log_md
# If the model returned indices via the tool, use them in a fresh call
next_indices = round_state.get("last_search_indices") or []
if next_indices:
# Neighbor expansion for context
base = set(next_indices)
expanded = set(base)
for i in base:
expanded.add(i - 1)
expanded.add(i + 1)
expanded = {i for i in expanded if 0 <= i < len(images)}
pending_indices = sorted(expanded)
round_idx += 1
continue
# No further tool-driven retrieval → done
break
return
# =============================
# Gradio UI
# =============================
CUSTOM_CSS = """
:root {
--bg: #0e1117;
--panel: #111827;
--accent: #7c3aed;
--accent-2: #06b6d4;
--text: #e5e7eb;
--muted: #9ca3af;
--border: #1f2937;
}
.gradio-container {max-width: 1180px !important; margin: 0 auto !important;}
body {background: radial-gradient(1200px 600px at 20% -10%, rgba(124,58,237,.25), transparent 60%),
radial-gradient(1000px 500px at 120% 10%, rgba(6,182,212,.2), transparent 60%),
var(--bg) !important;}
.app-header {
display:flex; gap:16px; align-items:center; padding:20px 18px; margin:8px 0 12px;
border:1px solid var(--border); border-radius:20px;
background: linear-gradient(180deg, rgba(255,255,255,.02), rgba(255,255,255,.01));
box-shadow: 0 10px 30px rgba(0,0,0,.25), inset 0 1px 0 rgba(255,255,255,.05);
}
.app-header .icon {
width:48px; height:48px; display:grid; place-items:center; border-radius:14px;
background: linear-gradient(135deg, var(--accent), var(--accent-2));
color:white; font-size:26px;
}
.app-header h1 {font-size:22px; margin:0; color:var(--text); letter-spacing:.2px;}
.app-header p {margin:2px 0 0; color:var(--muted); font-size:14px;}
.card {
border:1px solid var(--border); border-radius:18px; padding:14px 16px;
background: linear-gradient(180deg, rgba(255,255,255,.02), rgba(255,255,255,.01));
box-shadow: 0 12px 28px rgba(0,0,0,.18), inset 0 1px 0 rgba(255,255,255,.04);
}
.gr-button-primary {border-radius:12px !important; font-weight:600;}
.gradio-container .tabs {border-radius:16px; overflow:hidden; border:1px solid var(--border);}
.markdown-wrap {min-height: 260px;}
.summary-wrap {min-height: 180px;}
.gr-markdown, .gr-prose { color: var(--text) !important; }
.gr-markdown h1, .gr-markdown h2, .gr-markdown h3 {color: #f3f4f6;}
.gr-markdown a {color: var(--accent-2); text-decoration: none;}
.gr-markdown a:hover {text-decoration: underline;}
.gr-markdown table {width: 100%; border-collapse: collapse; margin: 10px 0 16px;}
.gr-markdown th, .gr-markdown td {border: 1px solid var(--border); padding: 8px 10px;}
.gr-markdown th {background: rgba(255,255,255,.03);}
.gr-markdown pre, .gr-markdown code { background: #0b1220; color: #eaeaf0; border-radius: 12px; border: 1px solid #172036; }
.gr-markdown pre {padding: 12px 14px; overflow:auto;}
.gr-markdown blockquote { border-left: 4px solid var(--accent); padding: 6px 12px; margin: 8px 0; color: #d1d5db; background: rgba(124,58,237,.06); border-radius: 8px; }
.log-box { font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace; white-space: pre-wrap; color: #d1d5db; background:#0b1220; border:1px solid #172036; border-radius:14px; padding:12px; max-height:280px; overflow:auto; }
"""
def build_ui():
theme = gr.themes.Soft()
with gr.Blocks(title="ColPali PDF RAG + Follow-up Responses", theme=theme, css=CUSTOM_CSS) as demo:
gr.HTML(
"""
<div class="app-header">
<div class="icon">📚</div>
<div>
<h1>ColPali PDF Search + Streaming Agent (Follow-up Responses)</h1>
<p>Index PDFs with ColQwen2. The agent attaches images in follow-up GPT-5 calls; MCP is search-only.</p>
</div>
</div>
"""
)
# ---- Tab 1: Index & Preview
with gr.Tab("1) Index & Preview"):
with gr.Row():
with gr.Column(scale=1):
pdf_input = gr.File(label="Upload PDF", file_types=[".pdf"])
index_btn = gr.Button("📥 Index Uploaded PDF", variant="secondary")
url_box = gr.Textbox(
label="Or index from URL",
placeholder="https://example.com/file.pdf",
value="",
)
index_url_btn = gr.Button("🌐 Load From URL", variant="secondary")
status_box = gr.Textbox(label="Status", interactive=False)
with gr.Column(scale=2):
pdf_view = PDF(label="PDF Preview")
# wiring
def handle_upload(file):
global current_pdf_path
if file is None:
return "Please upload a PDF.", None
path = getattr(file, "name", file)
status = index_from_path(path)
current_pdf_path = path
return status, path
def handle_url(url: str):
global current_pdf_path
if not url or not url.lower().endswith(".pdf"):
return "Please provide a direct PDF URL ending in .pdf", None
status, path = index_from_url(url)
current_pdf_path = path
return status, path
index_btn.click(handle_upload, inputs=[pdf_input], outputs=[status_box, pdf_view])
index_url_btn.click(handle_url, inputs=[url_box], outputs=[status_box, pdf_view])
# ---- Tab 2: Ask (Direct — returns indices)
with gr.Tab("2) Direct Search"):
with gr.Row():
with gr.Column(scale=1):
query_box = gr.Textbox(placeholder="Enter your question…", label="Query", lines=4)
k_slider = gr.Slider(minimum=1, maximum=10, step=1, label="Number of results (k)", value=5)
search_button = gr.Button("🔍 Search", variant="primary")
with gr.Column(scale=2):
output_text = gr.Textbox(label="Indices (0-based)", lines=12, placeholder="[0, 1, 2, ...]")
search_button.click(search, inputs=[query_box, k_slider], outputs=[output_text])
# ---- Tab 3: Agent (Streaming)
with gr.Tab("3) Deep Search"):
with gr.Row(equal_height=True):
with gr.Column(scale=1):
with gr.Group():
question = gr.Textbox(
label="Your question",
placeholder="Enter your question…",
lines=8,
elem_classes=["card"],
)
run_btn = gr.Button("Run", variant="primary")
with gr.Accordion("Connection & Model", open=False, elem_classes=["card"]):
with gr.Row():
api_key_box = gr.Textbox(
label="OpenAI API Key",
placeholder="sk-...",
type="password",
value=api_key_env,
)
model_box = gr.Dropdown(
label="Model",
choices=["gpt-5", "gpt-4.1", "gpt-4o"],
value="gpt-5",
)
with gr.Row():
server_url_box = gr.Textbox(
label="MCP Server URL (search-only)",
value=DEFAULT_MCP_SERVER_URL,
)
server_label_box = gr.Textbox(
label="MCP Server Label",
value=DEFAULT_MCP_SERVER_LABEL,
)
with gr.Row():
allowed_tools_box = gr.Textbox(
label="Allowed Tools (comma-separated)",
value=DEFAULT_ALLOWED_TOOLS,
)
require_approval_box = gr.Dropdown(
label="Require Approval",
choices=["never", "auto", "always"],
value="never",
)
with gr.Column(scale=3):
with gr.Tab("Answer"):
final_md = gr.Markdown(value="", elem_classes=["card", "markdown-wrap"])
with gr.Tab("Live Reasoning"):
summary_md = gr.Markdown(value="", elem_classes=["card", "summary-wrap"])
with gr.Tab("Event Log"):
log_md = gr.Markdown(value="", elem_classes=["card", "log-box"])
run_btn.click(
stream_agent,
inputs=[
question,
api_key_box,
model_box,
server_url_box,
server_label_box,
require_approval_box,
allowed_tools_box,
],
outputs=[final_md, summary_md, log_md],
)
return demo
if __name__ == "__main__":
demo = build_ui()
# mcp_server=True exposes this app's MCP endpoint at /gradio_api/mcp/
# We keep the MCP server available, but the agent never uses MCP to pass images.
demo.queue(max_size=5).launch(debug=True, mcp_server=True)