import gradio as gr
import os
import time
import uuid
import logging
from datetime import datetime
from typing import Dict, Generator, List

# Import required libraries
from huggingface_hub import InferenceClient
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.docstore.document import Document

# Import document parsers
import PyPDF2
from pptx import Presentation
import pandas as pd
from docx import Document as DocxDocument

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Get HuggingFace token from environment
HF_TOKEN = os.getenv("hf_token")
if not HF_TOKEN:
    raise ValueError("HuggingFace token not found in environment variables")

# Initialize HuggingFace Inference Client
client = InferenceClient(model="meta-llama/Llama-3.1-8B-Instruct", token=HF_TOKEN)

# Initialize embeddings
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")


class MCPMessage:
    """Model Context Protocol message structure."""

    def __init__(self, sender: str, receiver: str, msg_type: str,
                 trace_id: str = None, payload: Dict = None):
        self.sender = sender
        self.receiver = receiver
        self.type = msg_type
        self.trace_id = trace_id or str(uuid.uuid4())
        self.payload = payload or {}
        self.timestamp = datetime.now().isoformat()

    def to_dict(self):
        return {
            "sender": self.sender,
            "receiver": self.receiver,
            "type": self.type,
            "trace_id": self.trace_id,
            "payload": self.payload,
            "timestamp": self.timestamp,
        }


class MessageBus:
    """In-memory message bus for MCP communication."""

    def __init__(self):
        self.messages = []
        self.subscribers = {}

    def publish(self, message: MCPMessage):
        """Publish a message to the bus and deliver it synchronously to subscribers."""
        self.messages.append(message)
        logger.info(f"Message published: {message.sender} -> {message.receiver} [{message.type}]")
        # Notify subscribers
        if message.receiver in self.subscribers:
            for callback in self.subscribers[message.receiver]:
                callback(message)

    def subscribe(self, agent_name: str, callback):
        """Subscribe an agent to receive messages addressed to it."""
        if agent_name not in self.subscribers:
            self.subscribers[agent_name] = []
        self.subscribers[agent_name].append(callback)


# Global message bus
message_bus = MessageBus()
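
# To make the publish/subscribe contract concrete, here is a minimal,
# self-contained round trip over the bus. This helper is an illustrative
# addition (not part of the original app) and is never called:
def _demo_bus_roundtrip():
    bus = MessageBus()
    received = []
    bus.subscribe("DemoAgent", received.append)
    bus.publish(MCPMessage(sender="Caller", receiver="DemoAgent",
                           msg_type="PING", payload={"hello": "world"}))
    # Delivery is synchronous, so the message is available immediately.
    assert received[0].payload == {"hello": "world"}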
class IngestionAgent:
    """Agent responsible for document parsing and preprocessing."""

    def __init__(self, message_bus: MessageBus):
        self.name = "IngestionAgent"
        self.message_bus = message_bus
        self.message_bus.subscribe(self.name, self.handle_message)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def handle_message(self, message: MCPMessage):
        """Handle incoming MCP messages."""
        if message.type == "INGESTION_REQUEST":
            self.process_documents(message)

    def parse_pdf(self, file_path: str) -> str:
        """Parse a PDF document."""
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                text = ""
                for page in pdf_reader.pages:
                    # extract_text() can return None for pages without a text layer
                    text += (page.extract_text() or "") + "\n"
                return text
        except Exception as e:
            logger.error(f"Error parsing PDF: {e}")
            return ""

    def parse_pptx(self, file_path: str) -> str:
        """Parse a PPTX document."""
        try:
            prs = Presentation(file_path)
            text = ""
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text += shape.text + "\n"
            return text
        except Exception as e:
            logger.error(f"Error parsing PPTX: {e}")
            return ""

    def parse_csv(self, file_path: str) -> str:
        """Parse a CSV document."""
        try:
            df = pd.read_csv(file_path)
            return df.to_string()
        except Exception as e:
            logger.error(f"Error parsing CSV: {e}")
            return ""

    def parse_docx(self, file_path: str) -> str:
        """Parse a DOCX document."""
        try:
            doc = DocxDocument(file_path)
            text = ""
            for paragraph in doc.paragraphs:
                text += paragraph.text + "\n"
            return text
        except Exception as e:
            logger.error(f"Error parsing DOCX: {e}")
            return ""

    def parse_txt(self, file_path: str) -> str:
        """Parse a TXT/Markdown document."""
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                return file.read()
        except Exception as e:
            logger.error(f"Error parsing TXT: {e}")
            return ""

    def process_documents(self, message: MCPMessage):
        """Parse uploaded documents, chunk them, and forward them to the RetrievalAgent."""
        files = message.payload.get("files", [])
        processed_docs = []

        for file_path in files:
            file_ext = os.path.splitext(file_path)[1].lower()

            # Parse document based on file type
            if file_ext == '.pdf':
                text = self.parse_pdf(file_path)
            elif file_ext == '.pptx':
                text = self.parse_pptx(file_path)
            elif file_ext == '.csv':
                text = self.parse_csv(file_path)
            elif file_ext == '.docx':
                text = self.parse_docx(file_path)
            elif file_ext in ['.txt', '.md']:
                text = self.parse_txt(file_path)
            else:
                logger.warning(f"Unsupported file type: {file_ext}")
                continue

            if text:
                # Split text into chunks
                chunks = self.text_splitter.split_text(text)
                docs = [Document(page_content=chunk, metadata={"source": file_path})
                        for chunk in chunks]
                processed_docs.extend(docs)

        # Send processed documents to the RetrievalAgent
        response = MCPMessage(
            sender=self.name,
            receiver="RetrievalAgent",
            msg_type="INGESTION_COMPLETE",
            trace_id=message.trace_id,
            payload={"documents": processed_docs}
        )
        self.message_bus.publish(response)


class RetrievalAgent:
    """Agent responsible for embedding and semantic retrieval."""

    def __init__(self, message_bus: MessageBus):
        self.name = "RetrievalAgent"
        self.message_bus = message_bus
        self.message_bus.subscribe(self.name, self.handle_message)
        self.vector_store = None

    def handle_message(self, message: MCPMessage):
        """Handle incoming MCP messages."""
        if message.type == "INGESTION_COMPLETE":
            self.create_vector_store(message)
        elif message.type == "RETRIEVAL_REQUEST":
            self.retrieve_context(message)

    def create_vector_store(self, message: MCPMessage):
        """Create a FAISS vector store from the processed documents."""
        documents = message.payload.get("documents", [])
        if documents:
            try:
                self.vector_store = FAISS.from_documents(documents, embeddings)
                logger.info(f"Vector store created with {len(documents)} documents")
                # Notify completion
                response = MCPMessage(
                    sender=self.name,
                    receiver="CoordinatorAgent",
                    msg_type="VECTORSTORE_READY",
                    trace_id=message.trace_id,
                    payload={"status": "ready"}
                )
                self.message_bus.publish(response)
            except Exception as e:
                logger.error(f"Error creating vector store: {e}")

    def retrieve_context(self, message: MCPMessage):
        """Retrieve the most relevant chunks for a query."""
        query = message.payload.get("query", "")
        k = message.payload.get("k", 3)

        if self.vector_store and query:
            try:
                docs = self.vector_store.similarity_search(query, k=k)
                context = [{"content": doc.page_content,
                            "source": doc.metadata.get("source", "")}
                           for doc in docs]

                response = MCPMessage(
                    sender=self.name,
                    receiver="LLMResponseAgent",
                    msg_type="CONTEXT_RESPONSE",
                    trace_id=message.trace_id,
                    payload={
                        "query": query,
                        "retrieved_context": context,
                        "top_chunks": [doc.page_content for doc in docs]
                    }
                )
                self.message_bus.publish(response)
            except Exception as e:
                logger.error(f"Error retrieving context: {e}")
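
# For reference, a CONTEXT_RESPONSE payload produced above has this shape
# (all field values here are illustrative, not from a real run):
#
# {
#     "query": "What KPIs were tracked in Q1?",
#     "retrieved_context": [
#         {"content": "Revenue grew 12% ...", "source": "/tmp/metrics.csv"},
#         ...
#     ],
#     "top_chunks": ["Revenue grew 12% ...", ...],
# }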
class LLMResponseAgent:
    """Agent responsible for generating LLM responses."""

    def __init__(self, message_bus: MessageBus):
        self.name = "LLMResponseAgent"
        self.message_bus = message_bus
        self.message_bus.subscribe(self.name, self.handle_message)

    def handle_message(self, message: MCPMessage):
        """Handle incoming MCP messages."""
        if message.type == "CONTEXT_RESPONSE":
            self.generate_response(message)

    def generate_response(self, message: MCPMessage):
        """Generate a response from the retrieved context."""
        query = message.payload.get("query", "")
        context = message.payload.get("retrieved_context", [])

        # Build context string
        context_text = "\n\n".join(
            [f"Source: {ctx['source']}\nContent: {ctx['content']}" for ctx in context]
        )

        # Create messages in the conversational format
        messages = [
            {
                "role": "system",
                "content": ("You are a helpful assistant. Based on the provided context below, "
                            "answer the user's question accurately and comprehensively. "
                            "Cite the sources if possible."),
            },
            {
                "role": "user",
                "content": f"Context:\n\n{context_text}\n\nQuestion: {query}"
            }
        ]

        try:
            # Use client.chat_completion for conversational models
            response_stream = client.chat_completion(
                messages=messages,
                max_tokens=512,
                temperature=0.7,
                stream=True
            )

            # Hand the streaming response to the coordinator
            response = MCPMessage(
                sender=self.name,
                receiver="CoordinatorAgent",
                msg_type="LLM_RESPONSE_STREAM",
                trace_id=message.trace_id,
                payload={
                    "query": query,
                    "response_stream": response_stream,
                    "context": context
                }
            )
            self.message_bus.publish(response)
        except Exception as e:
            logger.error(f"Error generating response: {e}")
            # Send an error stream back; note this yields plain strings,
            # so the consumer must accept both strings and stream chunks.
            error_msg = f"Error from LLM: {e}"

            def error_generator():
                yield error_msg

            response = MCPMessage(
                sender=self.name,
                receiver="CoordinatorAgent",
                msg_type="LLM_RESPONSE_STREAM",
                trace_id=message.trace_id,
                payload={"response_stream": error_generator()}
            )
            self.message_bus.publish(response)


class CoordinatorAgent:
    """Coordinator agent that orchestrates the entire workflow."""

    def __init__(self, message_bus: MessageBus):
        self.name = "CoordinatorAgent"
        self.message_bus = message_bus
        self.message_bus.subscribe(self.name, self.handle_message)
        self.current_response_stream = None
        self.vector_store_ready = False

    def handle_message(self, message: MCPMessage):
        """Handle incoming MCP messages."""
        if message.type == "VECTORSTORE_READY":
            self.vector_store_ready = True
        elif message.type == "LLM_RESPONSE_STREAM":
            self.current_response_stream = message.payload.get("response_stream")

    def process_files(self, files):
        """Kick off ingestion for the uploaded files."""
        if not files:
            return "No files uploaded."

        # Gradio may hand back file objects (with .name) or plain path strings,
        # depending on version; accept both.
        file_paths = [f.name if hasattr(f, "name") else f for f in files]

        # Send ingestion request
        message = MCPMessage(
            sender=self.name,
            receiver="IngestionAgent",
            msg_type="INGESTION_REQUEST",
            payload={"files": file_paths}
        )
        self.message_bus.publish(message)

        return (f"Processing {len(files)} files: "
                f"{', '.join([os.path.basename(fp) for fp in file_paths])}")

    def handle_query(self, query: str, history: List) -> Generator[str, None, None]:
        """Handle a user query and yield the response token by token."""
        if not self.vector_store_ready:
            yield "Please upload and process documents first."
            return

        # Clear any stale stream from a previous query before requesting a new one
        self.current_response_stream = None

        # Send retrieval request; the bus delivers synchronously, so the stream
        # is normally set before publish() returns and the wait loop below is
        # only a safeguard.
        message = MCPMessage(
            sender=self.name,
            receiver="RetrievalAgent",
            msg_type="RETRIEVAL_REQUEST",
            payload={"query": query}
        )
        self.message_bus.publish(message)

        # Wait for the response stream
        timeout = 20  # seconds
        start_time = time.time()
        while not self.current_response_stream and (time.time() - start_time) < timeout:
            time.sleep(0.1)

        if self.current_response_stream:
            try:
                # Stream tokens directly
                for chunk in self.current_response_stream:
                    # chat_completion chunks carry the token in
                    # chunk.choices[0].delta.content; the error path yields
                    # plain strings, so handle both.
                    if isinstance(chunk, str):
                        token = chunk
                    else:
                        token = chunk.choices[0].delta.content
                    if token:
                        yield token
            except Exception as e:
                yield f"Error streaming response: {e}"
            finally:
                self.current_response_stream = None  # Reset for the next query
        else:
            yield "Timeout: No response received from LLM agent."
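
# End-to-end MCP message flow between the four agents, for reference
# (the message types match the handlers defined above):
#
#   CoordinatorAgent --INGESTION_REQUEST---> IngestionAgent
#   IngestionAgent   --INGESTION_COMPLETE--> RetrievalAgent
#   RetrievalAgent   --VECTORSTORE_READY---> CoordinatorAgent
#   CoordinatorAgent --RETRIEVAL_REQUEST---> RetrievalAgent
#   RetrievalAgent   --CONTEXT_RESPONSE----> LLMResponseAgent
#   LLMResponseAgent --LLM_RESPONSE_STREAM-> CoordinatorAgent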
# Initialize agents
ingestion_agent = IngestionAgent(message_bus)
retrieval_agent = RetrievalAgent(message_bus)
llm_response_agent = LLMResponseAgent(message_bus)
coordinator_agent = CoordinatorAgent(message_bus)
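
# A minimal headless usage sketch (no UI). The filename is hypothetical and
# this helper is illustrative only; it is never called by the app:
def _demo_headless_query():
    coordinator_agent.process_files(["report.pdf"])  # triggers the ingestion chain
    answer = "".join(coordinator_agent.handle_query("Summarize the report.", []))
    print(answer)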

# Gradio Interface
def create_interface():
    """Create a modern ChatGPT-like Gradio interface."""
    with gr.Blocks(
        theme=gr.themes.Soft(),
        css="""
        /* Main container styling */
        .gradio-container { max-width: 100vw !important; margin: 0 !important; padding: 0 !important; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; min-height: 100vh; }

        /* Header styling */
        .header-container { background: rgba(255, 255, 255, 0.95); backdrop-filter: blur(10px); border-bottom: 1px solid rgba(255, 255, 255, 0.2); padding: 1rem 2rem; box-shadow: 0 2px 20px rgba(0, 0, 0, 0.1); }
        .header-title { font-size: 2.5rem; font-weight: 700; background: linear-gradient(135deg, #667eea, #764ba2); -webkit-background-clip: text; -webkit-text-fill-color: transparent; text-align: center; margin: 0; }
        .header-subtitle { font-size: 1.1rem; color: #6b7280; text-align: center; margin-top: 0.5rem; font-weight: 400; }

        /* Sidebar styling */
        .sidebar { background: rgba(255, 255, 255, 0.95) !important; backdrop-filter: blur(10px); border-right: 1px solid rgba(255, 255, 255, 0.2); padding: 2rem 1.5rem !important; min-height: calc(100vh - 100px); box-shadow: 2px 0 20px rgba(0, 0, 0, 0.05); }
        .sidebar h3 { color: #374151; font-weight: 600; margin-bottom: 1rem; font-size: 1.2rem; }

        /* Upload area styling */
        .upload-area { background: linear-gradient(135deg, #f8fafc 0%, #e2e8f0 100%); border: 2px dashed #667eea; border-radius: 12px; padding: 2rem 1rem; margin: 1rem 0; transition: all 0.3s ease; text-align: center; }
        .upload-area:hover { border-color: #764ba2; background: linear-gradient(135deg, #f1f5f9 0%, #ddd6fe 100%); transform: translateY(-2px); box-shadow: 0 4px 15px rgba(102, 126, 234, 0.2); }

        /* Process button styling */
        .process-btn { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; border: none !important; border-radius: 8px !important; padding: 0.75rem 2rem !important; color: white !important; font-weight: 600 !important; transition: all 0.3s ease !important; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4) !important; }
        .process-btn:hover { transform: translateY(-2px) !important; box-shadow: 0 6px 20px rgba(102, 126, 234, 0.6) !important; }

        /* Chat container styling */
        .chat-container { background: rgba(255, 255, 255, 0.95) !important; backdrop-filter: blur(10px); border-radius: 16px !important; margin: 2rem; box-shadow: 0 8px 32px rgba(0, 0, 0, 0.1) !important; overflow: hidden; min-height: calc(100vh - 200px); }

        /* Chatbot styling */
        .chatbot { background: transparent !important; border: none !important; }

        /* Message input styling */
        .message-input { background: rgba(255, 255, 255, 0.9) !important; border: 2px solid rgba(102, 126, 234, 0.2) !important; border-radius: 25px !important; padding: 0.75rem 1.5rem !important; font-size: 1rem !important; transition: all 0.3s ease !important; }
        .message-input:focus { border-color: #667eea !important; box-shadow: 0 0 0 3px rgba(102, 126, 234, 0.1) !important; outline: none !important; }

        /* Send button styling */
        .send-btn { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%) !important; border: none !important; border-radius: 50% !important; width: 48px !important; height: 48px !important; display: flex !important; align-items: center !important; justify-content: center !important; color: white !important; font-weight: 600 !important; transition: all 0.3s ease !important; box-shadow: 0 4px 15px rgba(102, 126, 234, 0.4) !important; margin-left: 0.5rem !important; }
        .send-btn:hover { transform: scale(1.05) !important; box-shadow: 0 6px 20px rgba(102, 126, 234, 0.6) !important; }

        /* Status display styling */
        .status-display { background: linear-gradient(135deg, #f0f9ff 0%, #e0e7ff 100%) !important; border: 1px solid rgba(102, 126, 234, 0.2) !important; border-radius: 8px !important; padding: 1rem !important; margin: 1rem 0 !important; font-family: 'SF Mono', 'Monaco', monospace !important; font-size: 0.9rem !important; }

        /* Info section styling */
        .info-section { background: linear-gradient(135deg, #f8fafc 0%, #f1f5f9 100%); border-radius: 12px; padding: 1.5rem; margin-top: 2rem; border: 1px solid rgba(102, 126, 234, 0.1); }
        .info-section h4 { color: #374151; font-weight: 600; margin-bottom: 1rem; font-size: 1.1rem; }
        .info-section p { color: #6b7280; font-size: 0.9rem; line-height: 1.5; margin: 0.5rem 0; }

        /* Examples styling */
        .examples-container { padding: 1rem 2rem; }
        .example-btn { background: rgba(255, 255, 255, 0.8) !important; border: 1px solid rgba(102, 126, 234, 0.3) !important; border-radius: 20px !important; padding: 0.5rem 1rem !important; margin: 0.25rem !important; font-size: 0.9rem !important; color: #374151 !important; transition: all 0.3s ease !important; }
        .example-btn:hover { background: rgba(102, 126, 234, 0.1) !important; border-color: #667eea !important; transform: translateY(-1px) !important; }

        /* Responsive design */
        @media (max-width: 768px) {
            .gradio-container { padding: 0 !important; }
            .header-title { font-size: 2rem; }
            .sidebar { padding: 1rem !important; }
            .chat-container { margin: 1rem; }
        }
        """,
        title="AI Document Assistant"
    ) as iface:
        # Header (markup reconstructed to match the CSS classes defined above)
        gr.HTML("""
            <div class="header-container">
                <h1 class="header-title">AI Document Assistant</h1>
                <p class="header-subtitle">Intelligent multi-format document analysis with advanced RAG architecture</p>
            </div>
        """)

        # Sidebar info content describing the agent pipeline
        agent_info_html = """
            <div class="info-section">
                <h4>Agent Pipeline</h4>
                <p><strong>Ingestion:</strong> Document parsing &amp; preprocessing</p>
                <p><strong>Retrieval:</strong> Semantic search &amp; context extraction</p>
                <p><strong>Response:</strong> Intelligent answer generation</p>
                <p><strong>Coordinator:</strong> Workflow orchestration</p>
                <p>Built with Model Context Protocol (MCP) for seamless agent
                communication and advanced RAG capabilities.</p>
            </div>
        """
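
        # The original source is truncated at this point. Below is a minimal
        # sketch of the remaining wiring, assuming process_files feeds the
        # status box and handle_query streams tokens into the chat; the
        # component names (file_upload, process_btn, msg, etc.) are assumptions.
        with gr.Row():
            with gr.Column(scale=1, elem_classes="sidebar"):
                gr.HTML("<h3>Upload Documents</h3>")
                file_upload = gr.File(
                    label="Supported: PDF, PPTX, CSV, DOCX, TXT/MD",
                    file_count="multiple",
                    file_types=[".pdf", ".pptx", ".csv", ".docx", ".txt", ".md"],
                    elem_classes="upload-area",
                )
                process_btn = gr.Button("Process Documents", elem_classes="process-btn")
                status_display = gr.Textbox(label="Status", interactive=False,
                                            elem_classes="status-display")
                gr.HTML(agent_info_html)

            with gr.Column(scale=3, elem_classes="chat-container"):
                chatbot = gr.Chatbot(label="Chat", elem_classes="chatbot")
                with gr.Row():
                    msg = gr.Textbox(placeholder="Ask a question about your documents...",
                                     show_label=False, elem_classes="message-input", scale=9)
                    send_btn = gr.Button("Send", elem_classes="send-btn", scale=1)

        def respond(message, chat_history):
            """Append the user turn, then stream the assistant reply token by token."""
            chat_history = chat_history + [(message, "")]
            for token in coordinator_agent.handle_query(message, chat_history):
                chat_history[-1] = (message, chat_history[-1][1] + token)
                yield "", chat_history

        process_btn.click(coordinator_agent.process_files,
                          inputs=[file_upload], outputs=[status_display])
        msg.submit(respond, inputs=[msg, chatbot], outputs=[msg, chatbot])
        send_btn.click(respond, inputs=[msg, chatbot], outputs=[msg, chatbot])

    return iface


if __name__ == "__main__":
    create_interface().launch()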