import gradio as gr
import os
import tempfile
import uuid
from datetime import datetime
from typing import List, Dict, Any, Optional
import json
import asyncio
from dataclasses import dataclass, asdict
import logging

# Document processing imports
import PyPDF2
import pandas as pd
from docx import Document
from pptx import Presentation
import markdown

# ML/AI imports
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.schema import Document as LCDocument
from huggingface_hub import InferenceClient

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
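
# The imports above imply roughly this dependency set (an assumed, unpinned
# requirements sketch inferred from the imports, not a verified Space config):
# gradio, PyPDF2, pandas, python-docx, python-pptx, markdown, langchain,
# sentence-transformers, faiss-cpu, huggingface_hub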
# MCP Message Structure
@dataclass
class MCPMessage:
    sender: str
    receiver: str
    type: str
    trace_id: str
    payload: Dict[str, Any]
    timestamp: Optional[str] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()

    def to_dict(self):
        return asdict(self)
# MCP Communication Layer
class MCPCommunicator:
    def __init__(self):
        self.message_queue = asyncio.Queue()
        self.subscribers = {}

    async def send_message(self, message: MCPMessage):
        logger.info(f"MCP: {message.sender} -> {message.receiver}: {message.type}")
        await self.message_queue.put(message)

    async def receive_message(self, agent_name: str) -> MCPMessage:
        while True:
            message = await self.message_queue.get()
            if message.receiver == agent_name:
                return message
            # Re-queue if not for this agent
            await self.message_queue.put(message)

# Global MCP instance
mcp = MCPCommunicator()
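
# Illustrative sketch only (defined for reference, never called by the app):
# how one agent would push a message through the shared MCPCommunicator and
# another would pick it up. The message type and payload are hypothetical.
async def _mcp_roundtrip_example():
    msg = MCPMessage(
        sender="IngestionAgent",
        receiver="RetrievalAgent",
        type="DOCUMENT_PROCESSED",
        trace_id=str(uuid.uuid4()),
        payload={"chunks": 12},
    )
    await mcp.send_message(msg)
    # Blocks until a message addressed to "RetrievalAgent" shows up in the queue
    received = await mcp.receive_message("RetrievalAgent")
    return received.to_dict()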
# Base Agent Class
class BaseAgent:
    def __init__(self, name: str):
        self.name = name
        self.mcp = mcp

    async def send_mcp_message(self, receiver: str, msg_type: str, payload: Dict[str, Any], trace_id: str):
        message = MCPMessage(
            sender=self.name,
            receiver=receiver,
            type=msg_type,
            trace_id=trace_id,
            payload=payload
        )
        await self.mcp.send_message(message)

    async def receive_mcp_message(self) -> MCPMessage:
        return await self.mcp.receive_message(self.name)
# Document Ingestion Agent
class IngestionAgent(BaseAgent):
    def __init__(self):
        super().__init__("IngestionAgent")
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
        )

    def parse_pdf(self, file_path: str) -> str:
        """Parse PDF file and extract text"""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                text = ""
                for page in pdf_reader.pages:
                    text += page.extract_text() + "\n"
                return text
        except Exception as e:
            logger.error(f"Error parsing PDF: {e}")
            return ""

    def parse_docx(self, file_path: str) -> str:
        """Parse DOCX file and extract text"""
        try:
            doc = Document(file_path)
            text = ""
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
            return text
        except Exception as e:
            logger.error(f"Error parsing DOCX: {e}")
            return ""

    def parse_pptx(self, file_path: str) -> str:
        """Parse PPTX file and extract text"""
        try:
            prs = Presentation(file_path)
            text = ""
            for slide_num, slide in enumerate(prs.slides, 1):
                text += f"Slide {slide_num}:\n"
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text += shape.text + "\n"
                text += "\n"
            return text
        except Exception as e:
            logger.error(f"Error parsing PPTX: {e}")
            return ""

    def parse_csv(self, file_path: str) -> str:
        """Parse CSV file and convert to text"""
        try:
            df = pd.read_csv(file_path)
            return df.to_string()
        except Exception as e:
            logger.error(f"Error parsing CSV: {e}")
            return ""
    def parse_txt_md(self, file_path: str) -> str:
        """Parse TXT or MD file"""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                content = file.read()
            # If markdown, render it to HTML so headings/lists stay explicit in the text
            if file_path.lower().endswith('.md'):
                content = markdown.markdown(content)
            return content
        except Exception as e:
            logger.error(f"Error parsing TXT/MD: {e}")
            return ""
    async def process_documents(self, files: List[str], trace_id: str) -> List[LCDocument]:
        """Process uploaded documents and return chunked documents"""
        all_documents = []
        for file_path in files:
            file_ext = os.path.splitext(file_path)[1].lower()
            filename = os.path.basename(file_path)

            # Parse based on file extension
            if file_ext == '.pdf':
                content = self.parse_pdf(file_path)
            elif file_ext == '.docx':
                content = self.parse_docx(file_path)
            elif file_ext == '.pptx':
                content = self.parse_pptx(file_path)
            elif file_ext == '.csv':
                content = self.parse_csv(file_path)
            elif file_ext in ['.txt', '.md']:
                content = self.parse_txt_md(file_path)
            else:
                logger.warning(f"Unsupported file type: {file_ext}")
                continue

            if content.strip():
                # Split content into chunks
                chunks = self.text_splitter.split_text(content)
                # Create LangChain documents
                for i, chunk in enumerate(chunks):
                    doc = LCDocument(
                        page_content=chunk,
                        metadata={
                            "source": filename,
                            "chunk_id": i,
                            "file_type": file_ext
                        }
                    )
                    all_documents.append(doc)
        return all_documents
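
# Quick sanity-check sketch (not wired into the app): chunk a plain string with
# the same splitter settings IngestionAgent uses, to get a feel for how many
# chunks a document of a given size produces. The sample text is made up.
def _chunking_example(text: str = "lorem ipsum dolor sit amet " * 200):
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    chunks = splitter.split_text(text)
    return len(chunks), [len(c) for c in chunks[:3]]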
# Retrieval Agent
class RetrievalAgent(BaseAgent):
    def __init__(self):
        super().__init__("RetrievalAgent")
        self.embeddings = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"
        )
        self.vector_store = None

    async def create_vector_store(self, documents: List[LCDocument], trace_id: str):
        """Create vector store from documents"""
        try:
            if documents:
                self.vector_store = FAISS.from_documents(documents, self.embeddings)
                logger.info(f"Created vector store with {len(documents)} documents")
            else:
                logger.warning("No documents to create vector store")
        except Exception as e:
            logger.error(f"Error creating vector store: {e}")

    async def retrieve_relevant_chunks(self, query: str, k: int = 5, trace_id: str = None) -> List[Dict]:
        """Retrieve relevant chunks for a query"""
        if not self.vector_store:
            return []
        try:
            # Similarity search
            docs = self.vector_store.similarity_search(query, k=k)
            # Format results
            results = []
            for doc in docs:
                results.append({
                    "content": doc.page_content,
                    "source": doc.metadata.get("source", "Unknown"),
                    "chunk_id": doc.metadata.get("chunk_id", 0),
                    "file_type": doc.metadata.get("file_type", "Unknown")
                })
            return results
        except Exception as e:
            logger.error(f"Error retrieving chunks: {e}")
            return []
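
# Illustrative sketch only (never called by the app): index one hand-written
# chunk and query it, mirroring what RetrievalAgent does after ingestion. The
# document text and metadata below are invented for the example.
def _retrieval_example():
    agent = RetrievalAgent()  # downloads the MiniLM embedding model on first use
    docs = [LCDocument(
        page_content="Q1 revenue grew 25% year over year.",
        metadata={"source": "report.pdf", "chunk_id": 0, "file_type": ".pdf"},
    )]
    agent.vector_store = FAISS.from_documents(docs, agent.embeddings)
    return agent.vector_store.similarity_search("How did revenue change?", k=1)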
# LLM Response Agent
class LLMResponseAgent(BaseAgent):
    def __init__(self, hf_token: str = None):
        super().__init__("LLMResponseAgent")
        self.client = InferenceClient(
            model="meta-llama/Llama-3.1-8B-Instruct",
            token=hf_token
        )

    def format_prompt(self, query: str, context_chunks: List[Dict]) -> str:
        """Format prompt with context and query"""
        context_text = "\n\n".join([
            f"Source: {chunk['source']}\nContent: {chunk['content']}"
            for chunk in context_chunks
        ])
        prompt = f"""Based on the following context from uploaded documents, please answer the user's question.

Context:
{context_text}

Question: {query}

Please provide a comprehensive answer based on the context above. If the context doesn't contain enough information to fully answer the question, please mention what information is available and what might be missing.

Answer:"""
        return prompt

    async def generate_response(self, query: str, context_chunks: List[Dict], trace_id: str) -> str:
        """Generate response using LLM"""
        try:
            prompt = self.format_prompt(query, context_chunks)
            # Generate response using the HuggingFace Inference API
            response = self.client.text_generation(
                prompt,
                max_new_tokens=512,
                temperature=0.7,
                do_sample=True,
                return_full_text=False
            )
            return response
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            return f"I apologize, but I encountered an error while generating the response: {str(e)}"
# Coordinator Agent
class CoordinatorAgent(BaseAgent):
    def __init__(self, hf_token: str = None):
        super().__init__("CoordinatorAgent")
        self.ingestion_agent = IngestionAgent()
        self.retrieval_agent = RetrievalAgent()
        self.llm_agent = LLMResponseAgent(hf_token)
        self.documents_processed = False

    async def process_documents(self, files: List[str]) -> str:
        """Orchestrate document processing"""
        trace_id = str(uuid.uuid4())
        try:
            # Step 1: Ingestion
            await self.send_mcp_message(
                "IngestionAgent",
                "DOCUMENT_INGESTION_REQUEST",
                {"files": files},
                trace_id
            )
            documents = await self.ingestion_agent.process_documents(files, trace_id)

            await self.send_mcp_message(
                "RetrievalAgent",
                "VECTOR_STORE_CREATE_REQUEST",
                {"documents": len(documents)},
                trace_id
            )
            # Step 2: Create vector store
            await self.retrieval_agent.create_vector_store(documents, trace_id)

            self.documents_processed = True
            return f"Successfully processed {len(documents)} document chunks from {len(files)} files."
        except Exception as e:
            logger.error(f"Error in document processing: {e}")
            return f"Error processing documents: {str(e)}"

    async def answer_query(self, query: str) -> tuple[str, List[Dict]]:
        """Orchestrate query answering"""
        if not self.documents_processed:
            return "Please upload and process documents first.", []

        trace_id = str(uuid.uuid4())
        try:
            # Step 1: Retrieval
            await self.send_mcp_message(
                "RetrievalAgent",
                "RETRIEVAL_REQUEST",
                {"query": query},
                trace_id
            )
            context_chunks = await self.retrieval_agent.retrieve_relevant_chunks(query, k=5, trace_id=trace_id)

            # Step 2: LLM Response
            await self.send_mcp_message(
                "LLMResponseAgent",
                "LLM_GENERATION_REQUEST",
                {"query": query, "context_chunks": len(context_chunks)},
                trace_id
            )
            response = await self.llm_agent.generate_response(query, context_chunks, trace_id)

            return response, context_chunks
        except Exception as e:
            logger.error(f"Error in query processing: {e}")
            return f"Error processing query: {str(e)}", []
# Global coordinator instance
coordinator = None

def initialize_app(hf_token):
    """Initialize the application with HuggingFace token"""
    global coordinator
    coordinator = CoordinatorAgent(hf_token)
    return "✅ Application initialized successfully!"
async def process_files(files):
    """Process uploaded files"""
    if not coordinator:
        return "❌ Please set your HuggingFace token first!"
    if not files:
        return "❌ Please upload at least one file."

    # Copy uploads into the temp directory; Gradio may hand back plain file
    # paths or tempfile wrappers exposing a .name path, depending on version
    file_paths = []
    for file in files:
        src_path = file if isinstance(file, str) else file.name
        temp_path = os.path.join(tempfile.gettempdir(), os.path.basename(src_path))
        with open(src_path, 'rb') as src, open(temp_path, 'wb') as dst:
            dst.write(src.read())
        file_paths.append(temp_path)

    result = await coordinator.process_documents(file_paths)

    # Cleanup temporary files
    for path in file_paths:
        try:
            os.remove(path)
        except OSError:
            pass

    return result
async def answer_question(query, history):
    """Answer user question and append the exchange to the chat history"""
    history = history or []
    if not coordinator:
        history.append((query, "❌ Please set your HuggingFace token first!"))
        return history
    if not query.strip():
        # Nothing to answer; leave the conversation unchanged
        return history

    response, context_chunks = await coordinator.answer_query(query)

    # Format response with sources
    if context_chunks:
        sources = "\n\n**Sources:**\n"
        for i, chunk in enumerate(context_chunks[:3], 1):  # Show top 3 sources
            sources += f"{i}. {chunk['source']} (Chunk {chunk['chunk_id']})\n"
        response += sources

    history.append((query, response))
    return history
# Custom CSS
custom_css = """
/* Main container styling */
.gradio-container {
    max-width: 1200px !important;
    margin: 0 auto !important;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif !important;
}

/* Header styling */
.header-container {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
    color: white !important;
    padding: 2rem !important;
    border-radius: 15px !important;
    margin-bottom: 2rem !important;
    text-align: center !important;
    box-shadow: 0 8px 32px rgba(0,0,0,0.1) !important;
}

.header-title {
    font-size: 2.5rem !important;
    font-weight: 700 !important;
    margin-bottom: 0.5rem !important;
    text-shadow: 2px 2px 4px rgba(0,0,0,0.3) !important;
}

.header-subtitle {
    font-size: 1.2rem !important;
    opacity: 0.9 !important;
    font-weight: 300 !important;
}

/* Tab styling */
.tab-nav {
    background: white !important;
    border-radius: 12px !important;
    box-shadow: 0 4px 20px rgba(0,0,0,0.08) !important;
    padding: 0.5rem !important;
    margin-bottom: 1rem !important;
}

/* Card styling */
.setup-card, .upload-card, .chat-card {
    background: white !important;
    border-radius: 15px !important;
    padding: 2rem !important;
    box-shadow: 0 4px 20px rgba(0,0,0,0.08) !important;
    border: 1px solid #e1e5e9 !important;
    margin-bottom: 1.5rem !important;
}

/* Button styling */
.primary-button {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important;
    color: white !important;
    border: none !important;
    border-radius: 10px !important;
    padding: 0.75rem 2rem !important;
    font-weight: 600 !important;
    transition: all 0.3s ease !important;
    box-shadow: 0 4px 15px rgba(102, 126, 234, 0.3) !important;
}

.primary-button:hover {
    transform: translateY(-2px) !important;
    box-shadow: 0 6px 20px rgba(102, 126, 234, 0.4) !important;
}

/* Chat interface styling */
.chat-container {
    max-height: 600px !important;
    overflow-y: auto !important;
    background: #f8f9fa !important;
    border-radius: 15px !important;
    padding: 1rem !important;
    border: 1px solid #e1e5e9 !important;
}

/* Input styling */
.input-container input, .input-container textarea {
    border: 2px solid #e1e5e9 !important;
    border-radius: 10px !important;
    padding: 0.75rem 1rem !important;
    font-size: 1rem !important;
    transition: border-color 0.3s ease !important;
}

.input-container input:focus, .input-container textarea:focus {
    border-color: #667eea !important;
    box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important;
    outline: none !important;
}

/* Status indicators */
.status-success {
    color: #28a745 !important;
    background: #d4edda !important;
    padding: 0.75rem 1rem !important;
    border-radius: 8px !important;
    border: 1px solid #c3e6cb !important;
    margin: 1rem 0 !important;
}

.status-error {
    color: #dc3545 !important;
    background: #f8d7da !important;
    padding: 0.75rem 1rem !important;
    border-radius: 8px !important;
    border: 1px solid #f5c6cb !important;
    margin: 1rem 0 !important;
}

/* File upload styling */
.file-upload {
    border: 2px dashed #667eea !important;
    border-radius: 15px !important;
    padding: 2rem !important;
    text-align: center !important;
    background: #f8f9ff !important;
    transition: all 0.3s ease !important;
}

.file-upload:hover {
    border-color: #764ba2 !important;
    background: #f0f4ff !important;
}

/* Architecture diagram container */
.architecture-container {
    background: white !important;
    border-radius: 15px !important;
    padding: 2rem !important;
    margin: 1rem 0 !important;
    box-shadow: 0 4px 20px rgba(0,0,0,0.08) !important;
    text-align: center !important;
}

/* Responsive design */
@media (max-width: 768px) {
    .header-title {
        font-size: 2rem !important;
    }
    .setup-card, .upload-card, .chat-card {
        padding: 1.5rem !important;
    }
}

/* Animation for loading states */
@keyframes pulse {
    0% { opacity: 1; }
    50% { opacity: 0.5; }
    100% { opacity: 1; }
}

.loading {
    animation: pulse 1.5s ease-in-out infinite !important;
}
"""
# Create Gradio Interface
def create_interface():
    with gr.Blocks(css=custom_css, title="🤖 Agentic RAG Chatbot") as demo:
        gr.HTML("""
        <div class="header-container">
            <h1 class="header-title">🤖 Agentic RAG Chatbot</h1>
            <p class="header-subtitle">Multi-Format Document QA using Model Context Protocol (MCP)</p>
        </div>
        """)
        with gr.Tabs() as tabs:
            # Setup Tab
            with gr.TabItem("⚙️ Setup", elem_classes=["tab-nav"]):
                gr.HTML("""
                <div class="setup-card">
                    <h3>🔑 Configuration</h3>
                    <p>Enter your HuggingFace token to get started. This token is used to access the Llama-3.1-8B-Instruct model.</p>
                </div>
                """)
                with gr.Row():
                    hf_token_input = gr.Textbox(
                        label="HuggingFace Token",
                        placeholder="hf_xxxxxxxxxxxxxxxxxxxxxxxxx",
                        type="password",
                        elem_classes=["input-container"]
                    )
                with gr.Row():
                    init_button = gr.Button(
                        "Initialize Application",
                        variant="primary",
                        elem_classes=["primary-button"]
                    )
                    init_status = gr.Textbox(
                        label="Status",
                        interactive=False,
                        elem_classes=["input-container"]
                    )
            # Upload Tab
            with gr.TabItem("📄 Upload Documents", elem_classes=["tab-nav"]):
                gr.HTML("""
                <div class="upload-card">
                    <h3>📁 Document Upload</h3>
                    <p>Upload your documents in any supported format: PDF, DOCX, PPTX, CSV, TXT, or Markdown.</p>
                </div>
                """)
                file_upload = gr.File(
                    label="Choose Files",
                    file_count="multiple",
                    file_types=[".pdf", ".docx", ".pptx", ".csv", ".txt", ".md"],
                    elem_classes=["file-upload"]
                )
                upload_button = gr.Button(
                    "Process Documents",
                    variant="primary",
                    elem_classes=["primary-button"]
                )
                upload_status = gr.Textbox(
                    label="Processing Status",
                    interactive=False,
                    elem_classes=["input-container"]
                )
            # Chat Tab
            with gr.TabItem("💬 Chat", elem_classes=["tab-nav"]):
                gr.HTML("""
                <div class="chat-card">
                    <h3>🗨️ Ask Questions</h3>
                    <p>Ask questions about your uploaded documents. The AI will provide answers based on the document content.</p>
                </div>
                """)
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=400,
                    elem_classes=["chat-container"]
                )
                with gr.Row():
                    query_input = gr.Textbox(
                        label="Your Question",
                        placeholder="What are the key findings in the document?",
                        elem_classes=["input-container"]
                    )
                    ask_button = gr.Button(
                        "Ask",
                        variant="primary",
                        elem_classes=["primary-button"]
                    )
                gr.Examples(
                    examples=[
                        "What are the main topics covered in the documents?",
                        "Can you summarize the key findings?",
                        "What are the important metrics mentioned?",
                        "What recommendations are provided?",
                    ],
                    inputs=query_input,
                    label="Example Questions"
                )
            # Architecture Tab
            with gr.TabItem("🏗️ Architecture", elem_classes=["tab-nav"]):
                gr.HTML("""
                <div class="architecture-container">
                    <h3>🏗️ System Architecture</h3>
                    <p>This system uses an agentic architecture with Model Context Protocol (MCP) for inter-agent communication.</p>
                </div>
                """)
                gr.Markdown("""
## 🔄 Agent Flow Diagram

```
User Upload → CoordinatorAgent → IngestionAgent → RetrievalAgent → LLMResponseAgent
     ↓               ↓                  ↓                ↓                 ↓
 Documents      MCP Messages       Text Chunks     Vector Store     Final Response
```

## 🤖 Agent Descriptions

- **CoordinatorAgent**: Orchestrates the entire workflow and manages MCP communication
- **IngestionAgent**: Parses and preprocesses documents (PDF, DOCX, PPTX, CSV, TXT, MD)
- **RetrievalAgent**: Handles embeddings and semantic retrieval using FAISS
- **LLMResponseAgent**: Generates final responses using Llama-3.1-8B-Instruct

## 🛠️ Tech Stack

- **Frontend**: Gradio with custom CSS
- **LLM**: Meta Llama-3.1-8B-Instruct (via HuggingFace Inference)
- **Embeddings**: sentence-transformers/all-MiniLM-L6-v2
- **Vector Store**: FAISS
- **Document Processing**: PyPDF2, python-docx, python-pptx, pandas
- **Framework**: LangChain for document handling

## 📨 MCP Message Example

```json
{
    "sender": "RetrievalAgent",
    "receiver": "LLMResponseAgent",
    "type": "RETRIEVAL_RESULT",
    "trace_id": "rag-457",
    "payload": {
        "retrieved_context": ["Revenue increased by 25%", "Q1 KPIs exceeded targets"],
        "query": "What were the Q1 KPIs?"
    },
    "timestamp": "2025-07-21T10:30:00Z"
}
```
""")
        # Event handlers
        init_button.click(
            fn=initialize_app,
            inputs=[hf_token_input],
            outputs=[init_status]
        )
        upload_button.click(
            fn=process_files,
            inputs=[file_upload],
            outputs=[upload_status]
        )
        ask_button.click(
            fn=answer_question,
            inputs=[query_input, chatbot],
            outputs=[chatbot]
        )
        query_input.submit(
            fn=answer_question,
            inputs=[query_input, chatbot],
            outputs=[chatbot]
        )

    return demo
if __name__ == "__main__":
    demo = create_interface()
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_api=False
    )