Update app.py

app.py CHANGED
@@ -8,6 +8,7 @@ import json
 import asyncio
 from dataclasses import dataclass, asdict
 import logging
+import sys # Import sys for exiting if token is missing
 
 # Document processing imports
 import PyPDF2
@@ -27,7 +28,7 @@ from huggingface_hub import InferenceClient
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-# Get HF token from environment
+# --- Get HF token from environment and perform a crucial check ---
 HF_TOKEN = os.getenv('hf_token')
 
 if HF_TOKEN is None:
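The hunks above show the new `import sys # Import sys for exiting if token is missing` line and the guard's condition (`if HF_TOKEN is None:`), but the body of the check sits outside the visible diff context. A minimal sketch of what such an exit-on-missing-token guard could look like; the message text and exit code are assumptions, not taken from the commit:

```python
# Sketch only: startup guard implied by the "import sys for exiting if token is missing"
# comment. The exact wording and exit code are assumptions.
import os
import sys

HF_TOKEN = os.getenv('hf_token')

if HF_TOKEN is None:
    # Without a token the InferenceClient cannot authenticate, so fail fast.
    print("Environment variable 'hf_token' is not set; aborting startup.")
    sys.exit(1)
```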
@@ -45,11 +46,11 @@ class MCPMessage:
     trace_id: str
     payload: Dict[str, Any]
     timestamp: str = None
-    
+
     def __post_init__(self):
         if self.timestamp is None:
             self.timestamp = datetime.now().isoformat()
-    
+
     def to_dict(self):
         return asdict(self)
 
@@ -58,11 +59,11 @@ class MCPCommunicator:
     def __init__(self):
         self.message_queue = asyncio.Queue()
         self.subscribers = {}
-    
+
     async def send_message(self, message: MCPMessage):
         logger.info(f"MCP: {message.sender} -> {message.receiver}: {message.type}")
         await self.message_queue.put(message)
-    
+
     async def receive_message(self, agent_name: str) -> MCPMessage:
         while True:
             message = await self.message_queue.get()
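For reference, this is roughly how the `MCPMessage` dataclass in the hunk above serializes. A small standalone sketch: the field names come from the diff and from the MCP JSON example later in the file, while the payload contents and the surrounding imports are illustrative assumptions:

```python
# Standalone sketch of MCPMessage serialization; mirrors the dataclass shown in the diff
# (sender/receiver/type/trace_id/payload plus an auto-filled ISO timestamp).
import json
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict

@dataclass
class MCPMessage:
    sender: str
    receiver: str
    type: str
    trace_id: str
    payload: Dict[str, Any]
    timestamp: str = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()

    def to_dict(self):
        return asdict(self)

msg = MCPMessage(
    sender="RetrievalAgent",
    receiver="LLMResponseAgent",
    type="RETRIEVAL_RESULT",
    trace_id="rag-457",
    payload={"top_chunks": 5},  # illustrative payload, not from the commit
)
print(json.dumps(msg.to_dict(), indent=2))  # timestamp is filled in automatically
```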
@@ -79,7 +80,7 @@ class BaseAgent:
     def __init__(self, name: str):
         self.name = name
         self.mcp = mcp
-    
+
     async def send_mcp_message(self, receiver: str, msg_type: str, payload: Dict[str, Any], trace_id: str):
         message = MCPMessage(
             sender=self.name,
@@ -89,7 +90,7 @@ class BaseAgent:
             payload=payload
         )
         await self.mcp.send_message(message)
-    
+
     async def receive_mcp_message(self) -> MCPMessage:
         return await self.mcp.receive_message(self.name)
 
@@ -102,7 +103,7 @@ class IngestionAgent(BaseAgent):
             chunk_overlap=200,
             length_function=len,
         )
-    
+
     def parse_pdf(self, file_path: str) -> str:
         """Parse PDF file and extract text"""
         try:
@@ -115,7 +116,7 @@ class IngestionAgent(BaseAgent):
         except Exception as e:
             logger.error(f"Error parsing PDF: {e}")
             return ""
-    
+
     def parse_docx(self, file_path: str) -> str:
         """Parse DOCX file and extract text"""
         try:
@@ -127,7 +128,7 @@ class IngestionAgent(BaseAgent):
         except Exception as e:
             logger.error(f"Error parsing DOCX: {e}")
             return ""
-    
+
     def parse_pptx(self, file_path: str) -> str:
         """Parse PPTX file and extract text"""
         try:
@@ -141,9 +142,9 @@ class IngestionAgent(BaseAgent):
                     text += "\n"
             return text
         except Exception as e:
-
-
-
+            logger.error(f"Error parsing PPTX: {e}")
+            return ""
+
     def parse_csv(self, file_path: str) -> str:
         """Parse CSV file and convert to text"""
         try:
@@ -152,7 +153,7 @@ class IngestionAgent(BaseAgent):
         except Exception as e:
             logger.error(f"Error parsing CSV: {e}")
             return ""
-    
+
     def parse_txt_md(self, file_path: str) -> str:
         """Parse TXT or MD file"""
         try:
@@ -165,15 +166,15 @@ class IngestionAgent(BaseAgent):
         except Exception as e:
             logger.error(f"Error parsing TXT/MD: {e}")
             return ""
-    
+
     async def process_documents(self, files: List[str], trace_id: str) -> List[LCDocument]:
         """Process uploaded documents and return chunked documents"""
         all_documents = []
-        
+
         for file_path in files:
             file_ext = os.path.splitext(file_path)[1].lower()
             filename = os.path.basename(file_path)
-            
+
             # Parse based on file extension
             if file_ext == '.pdf':
                 content = self.parse_pdf(file_path)
@@ -188,11 +189,11 @@ class IngestionAgent(BaseAgent):
             else:
                 logger.warning(f"Unsupported file type: {file_ext}")
                 continue
-            
+
             if content.strip():
                 # Split content into chunks
                 chunks = self.text_splitter.split_text(content)
-                
+
                 # Create LangChain documents
                 for i, chunk in enumerate(chunks):
                     doc = LCDocument(
@@ -204,7 +205,7 @@ class IngestionAgent(BaseAgent):
                         }
                     )
                     all_documents.append(doc)
-        
+
         return all_documents
 
 # Retrieval Agent
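The ingestion path above configures a text splitter with `chunk_overlap=200` and `length_function=len` and later calls `split_text` on the parsed content; the splitter's class name and `chunk_size` sit above the visible hunks. A small sketch of that chunking step, assuming LangChain's `RecursiveCharacterTextSplitter` and a `chunk_size` of 1000 (both assumptions):

```python
# Sketch of the chunking step used by IngestionAgent.process_documents.
# The splitter class and chunk_size are assumptions; only chunk_overlap=200 and
# length_function=len are visible in the diff.
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,      # assumed value, not shown in the hunks
    chunk_overlap=200,    # as configured in IngestionAgent.__init__
    length_function=len,
)

content = "Page 1 text about the document.\n\nPage 2 text with more detail.\n\n" * 50
chunks = text_splitter.split_text(content)
print(f"{len(chunks)} chunks; first chunk has {len(chunks[0])} characters")
```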
@@ -215,7 +216,7 @@ class RetrievalAgent(BaseAgent):
             model_name="sentence-transformers/all-MiniLM-L6-v2"
         )
         self.vector_store = None
-    
+
     async def create_vector_store(self, documents: List[LCDocument], trace_id: str):
         """Create vector store from documents"""
         try:
@@ -226,16 +227,16 @@ class RetrievalAgent(BaseAgent):
                 logger.warning("No documents to create vector store")
         except Exception as e:
             logger.error(f"Error creating vector store: {e}")
-    
+
     async def retrieve_relevant_chunks(self, query: str, k: int = 5, trace_id: str = None) -> List[Dict]:
         """Retrieve relevant chunks for a query"""
         if not self.vector_store:
             return []
-        
+
         try:
             # Similarity search
             docs = self.vector_store.similarity_search(query, k=k)
-            
+
             # Format results
             results = []
             for doc in docs:
@@ -245,7 +246,7 @@ class RetrievalAgent(BaseAgent):
                     "chunk_id": doc.metadata.get("chunk_id", 0),
                     "file_type": doc.metadata.get("file_type", "Unknown")
                 })
-            
+
             return results
         except Exception as e:
             logger.error(f"Error retrieving chunks: {e}")
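The RetrievalAgent above embeds chunks with `sentence-transformers/all-MiniLM-L6-v2`, builds a FAISS store, and answers queries with `similarity_search(query, k=k)`. A standalone sketch of that flow; the `langchain_community` import paths and the sample documents are assumptions, since the app's own import block is not visible in these hunks:

```python
# Sketch of the FAISS retrieval flow used by RetrievalAgent (import paths assumed).
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.docstore.document import Document as LCDocument

embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

docs = [
    LCDocument(page_content="FAISS is a library for efficient similarity search.",
               metadata={"source": "notes.txt", "chunk_id": 0, "file_type": ".txt"}),
    LCDocument(page_content="MCP messages carry sender, receiver, type, trace_id and payload.",
               metadata={"source": "notes.txt", "chunk_id": 1, "file_type": ".txt"}),
]

# Build the vector store, then retrieve the k most similar chunks for a query.
vector_store = FAISS.from_documents(docs, embeddings)
for doc in vector_store.similarity_search("What is FAISS used for?", k=5):
    print(doc.metadata.get("source"), "->", doc.page_content[:60])
```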
@@ -255,19 +256,25 @@
 class LLMResponseAgent(BaseAgent):
     def __init__(self):
         super().__init__("LLMResponseAgent")
+        # Use the global HF_TOKEN which is validated at script start
         self.client = InferenceClient(
             model="meta-llama/Llama-3.1-8B-Instruct",
-            token=HF_TOKEN
+            token=HF_TOKEN # Pass the token here
         )
-    
-    def
-        """
+
+    def format_prompt_for_conversational(self, query: str, context_chunks: List[Dict]) -> str:
+        """
+        Format prompt with context and query as a single 'user' input
+        suitable for a conversational model.
+        """
         context_text = "\n\n".join([
             f"Source: {chunk['source']}\nContent: {chunk['content']}"
             for chunk in context_chunks
         ])
-    
-    
+
+        # We are putting the RAG prompt into the 'user' input for the conversational model.
+        # This is a common way to use a conversational model for RAG if text_generation isn't available.
+        prompt_as_user_input = f"""Based on the following context from uploaded documents, please answer the user's question.
 
 Context:
 {context_text}
@@ -276,27 +283,40 @@ Question: {query}
 
 Please provide a comprehensive answer based on the context above. If the context doesn't contain enough information to fully answer the question, please mention what information is available and what might be missing.
 
-Answer:"""
-
-
-
+Answer:""" # Keeping "Answer:" to guide the model to start generating the answer directly.
+        return prompt_as_user_input
+
     async def generate_response(self, query: str, context_chunks: List[Dict], trace_id: str) -> str:
-        """Generate response using LLM"""
+        """Generate response using LLM via the conversational task."""
         try:
-            prompt
-
-
-
-
-
-
-
-
+            # Format the RAG prompt as the user's input for the conversational model
+            formatted_input = self.format_prompt_for_conversational(query, context_chunks)
+
+            # Use the conversational task
+            response = self.client.conversational(
+                inputs=formatted_input, # This is the current user turn
+                # No past_user_inputs or generated_responses are provided initially
+                # to keep it stateless per query, akin to text_generation.
+                parameters={
+                    "temperature": 0.7,
+                    "max_new_tokens": 512,
+                    # Add other parameters if needed, e.g., do_sample, top_p, top_k
+                    # "do_sample": True,
+                    # "top_p": 0.95,
+                    # "top_k": 50,
+                }
             )
-
-
+
+            # The conversational response has a list of generated responses.
+            # We assume the first one is the primary answer.
+            if response.generated_responses:
+                return response.generated_responses[0]
+            else:
+                logger.warning("LLM generated an empty response via conversational API.")
+                return "I apologize, the model did not generate a response."
+
         except Exception as e:
-            logger.error(f"Error generating response: {e}")
+            logger.error(f"Error generating response with conversational LLM: {e}")
             return f"I apologize, but I encountered an error while generating the response: {str(e)}"
 
 # Coordinator Agent
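The hunk above routes generation through `self.client.conversational(...)`, whose availability and exact signature depend on the installed `huggingface_hub` version. For comparison, a stateless RAG call along the same lines can be expressed with `InferenceClient.chat_completion`, the chat-style entry point in recent `huggingface_hub` releases. This is a hedged alternative sketch, not the author's code; the parameter values simply mirror the diff's temperature 0.7 and 512 new tokens:

```python
# Alternative sketch using InferenceClient.chat_completion (recent huggingface_hub).
# Not the commit's implementation; parameters mirror the diff's choices.
import os
from huggingface_hub import InferenceClient

client = InferenceClient(
    model="meta-llama/Llama-3.1-8B-Instruct",
    token=os.getenv("hf_token"),
)

def generate_answer(prompt_as_user_input: str) -> str:
    completion = client.chat_completion(
        messages=[{"role": "user", "content": prompt_as_user_input}],
        max_tokens=512,
        temperature=0.7,
    )
    # chat_completion returns an OpenAI-style object; the first choice holds the reply.
    return completion.choices[0].message.content
```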
@@ -305,72 +325,72 @@ class CoordinatorAgent(BaseAgent):
         super().__init__("CoordinatorAgent")
         self.ingestion_agent = IngestionAgent()
         self.retrieval_agent = RetrievalAgent()
-        self.llm_agent = LLMResponseAgent()
+        self.llm_agent = LLMResponseAgent() # LLMResponseAgent will use the global HF_TOKEN
         self.documents_processed = False
-    
+
     async def process_documents(self, files: List[str]) -> str:
         """Orchestrate document processing"""
         trace_id = str(uuid.uuid4())
-        
+
         try:
             # Step 1: Ingestion
             await self.send_mcp_message(
-                "IngestionAgent",
-                "DOCUMENT_INGESTION_REQUEST",
-                {"files": files},
+                "IngestionAgent",
+                "DOCUMENT_INGESTION_REQUEST",
+                {"files": files},
                 trace_id
             )
-            
+
             documents = await self.ingestion_agent.process_documents(files, trace_id)
-            
+
             await self.send_mcp_message(
-                "RetrievalAgent",
-                "VECTOR_STORE_CREATE_REQUEST",
-                {"documents": len(documents)},
+                "RetrievalAgent",
+                "VECTOR_STORE_CREATE_REQUEST",
+                {"documents": len(documents)},
                 trace_id
             )
-            
+
             # Step 2: Create vector store
             await self.retrieval_agent.create_vector_store(documents, trace_id)
-            
+
             self.documents_processed = True
-            
+
             return f"Successfully processed {len(documents)} document chunks from {len(files)} files."
-            
+
         except Exception as e:
             logger.error(f"Error in document processing: {e}")
             return f"Error processing documents: {str(e)}"
-    
+
     async def answer_query(self, query: str) -> tuple[str, List[Dict]]:
         """Orchestrate query answering"""
         if not self.documents_processed:
             return "Please upload and process documents first.", []
-        
+
         trace_id = str(uuid.uuid4())
-        
+
         try:
             # Step 1: Retrieval
             await self.send_mcp_message(
-                "RetrievalAgent",
-                "RETRIEVAL_REQUEST",
-                {"query": query},
+                "RetrievalAgent",
+                "RETRIEVAL_REQUEST",
+                {"query": query},
                 trace_id
             )
-            
+
             context_chunks = await self.retrieval_agent.retrieve_relevant_chunks(query, k=5, trace_id=trace_id)
-            
+
             # Step 2: LLM Response
             await self.send_mcp_message(
-                "LLMResponseAgent",
-                "LLM_GENERATION_REQUEST",
-                {"query": query, "context_chunks": len(context_chunks)},
+                "LLMResponseAgent",
+                "LLM_GENERATION_REQUEST",
+                {"query": query, "context_chunks": len(context_chunks)},
                 trace_id
             )
-            
+
             response = await self.llm_agent.generate_response(query, context_chunks, trace_id)
-            
+
             return response, context_chunks
-            
+
         except Exception as e:
             logger.error(f"Error in query processing: {e}")
             return f"Error processing query: {str(e)}", []
@@ -382,41 +402,49 @@ async def process_files(files):
     """Process uploaded files"""
     if not files:
         return "❌ Please upload at least one file."
-    
-    # Save uploaded files to temporary directory
+
     file_paths = []
     for file in files:
-
-
-
-
-
-
-
+        temp_dir = tempfile.gettempdir()
+        unique_filename = f"{uuid.uuid4()}_{os.path.basename(file.name)}"
+        temp_path = os.path.join(temp_dir, unique_filename)
+        try:
+            file_content = file.read()
+            with open(temp_path, 'wb') as f:
+                f.write(file_content)
+            file_paths.append(temp_path)
+        except Exception as e:
+            logger.error(f"Error saving uploaded file {file.name}: {e}")
+            return f"❌ Error saving uploaded file {file.name}: {e}"
+
     result = await coordinator.process_documents(file_paths)
-    
+
+    for path in file_paths:
+        try:
+            os.remove(path)
+        except Exception as e:
+            logger.warning(f"Could not remove temporary file {path}: {e}")
+
     return result
 
 async def answer_question(query, history):
     """Answer user question"""
     if not query.strip():
         return history, ""
-    
+
     response, context_chunks = await coordinator.answer_query(query)
-    
-    # Format response with sources
+
     if context_chunks:
         sources = "\n\n**Sources:**\n"
-        for i, chunk in enumerate(context_chunks[:3], 1):
+        for i, chunk in enumerate(context_chunks[:3], 1):
             sources += f"{i}. {chunk['source']} (Chunk {chunk['chunk_id']})\n"
         response += sources
-    
-    # Add to chat history
+
     history.append((query, response))
-    
+
     return history, ""
 
-# Custom CSS
+# Custom CSS (unchanged)
 custom_css = """
 /* Main container styling */
 .gradio-container {
@@ -459,7 +487,7 @@ custom_css = """
 }
 
 /* Card styling */
-.
+.upload-card, .chat-card {
     background: white !important;
     border-radius: 15px !important;
     padding: 2rem !important;
@@ -559,8 +587,8 @@ custom_css = """
     .header-title {
         font-size: 2rem !important;
     }
-    
-    .
+
+    .upload-card, .chat-card {
         padding: 1.5rem !important;
     }
 }
@@ -586,9 +614,9 @@ def create_interface():
             <p class="header-subtitle">Multi-Format Document QA using Model Context Protocol (MCP)</p>
         </div>
         """)
-        
+
         with gr.Tabs() as tabs:
-            # Upload Tab
+            # Upload Tab (now the first tab)
             with gr.TabItem("📁 Upload Documents", elem_classes=["tab-nav"]):
                 gr.HTML("""
                 <div class="upload-card">
@@ -596,26 +624,26 @@ def create_interface():
                 <p>Upload your documents in any supported format: PDF, DOCX, PPTX, CSV, TXT, or Markdown.</p>
                 </div>
                 """)
-                
+
                 file_upload = gr.File(
                     label="Choose Files",
                     file_count="multiple",
                     file_types=[".pdf", ".docx", ".pptx", ".csv", ".txt", ".md"],
                     elem_classes=["file-upload"]
                 )
-                
+
                 upload_button = gr.Button(
-                    "Process Documents",
+                    "Process Documents",
                     variant="primary",
                     elem_classes=["primary-button"]
                 )
-                
+
                 upload_status = gr.Textbox(
                     label="Processing Status",
                     interactive=False,
                     elem_classes=["input-container"]
                 )
-                
+
             # Chat Tab
             with gr.TabItem("💬 Chat", elem_classes=["tab-nav"]):
                 gr.HTML("""
@@ -624,13 +652,13 @@ def create_interface():
                 <p>Ask questions about your uploaded documents. The AI will provide answers based on the document content.</p>
                 </div>
                 """)
-                
+
                 chatbot = gr.Chatbot(
                     label="Conversation",
                     height=400,
                     elem_classes=["chat-container"]
                 )
-                
+
                 with gr.Row():
                     query_input = gr.Textbox(
                         label="Your Question",
@@ -638,11 +666,11 @@ def create_interface():
                         elem_classes=["input-container"]
                     )
                     ask_button = gr.Button(
-                        "Ask",
+                        "Ask",
                         variant="primary",
                         elem_classes=["primary-button"]
                     )
-                
+
                 gr.Examples(
                     examples=[
                         "What are the main topics covered in the documents?",
@@ -653,7 +681,7 @@ def create_interface():
                     inputs=query_input,
                     label="Example Questions"
                 )
-                
+
             # Architecture Tab
             with gr.TabItem("🏗️ Architecture", elem_classes=["tab-nav"]):
                 gr.HTML("""
@@ -662,38 +690,38 @@ def create_interface():
                 <p>This system uses an agentic architecture with Model Context Protocol (MCP) for inter-agent communication.</p>
                 </div>
                 """)
-                
+
                 gr.Markdown("""
 ## 🔄 Agent Flow Diagram
-
+
 ```
 User Upload → CoordinatorAgent → IngestionAgent → RetrievalAgent → LLMResponseAgent
      ↓               ↓                 ↓                ↓                 ↓
  Documents     MCP Messages      Text Chunks      Vector Store     Final Response
 ```
-
+
 ## 🤖 Agent Descriptions
-
+
 - **CoordinatorAgent**: Orchestrates the entire workflow and manages MCP communication
 - **IngestionAgent**: Parses and preprocesses documents (PDF, DOCX, PPTX, CSV, TXT, MD)
 - **RetrievalAgent**: Handles embeddings and semantic retrieval using FAISS
 - **LLMResponseAgent**: Generates final responses using Llama-3.1-8B-Instruct
-
+
 ## 🛠️ Tech Stack
-
+
 - **Frontend**: Gradio with custom CSS
 - **LLM**: Meta Llama-3.1-8B-Instruct (via HuggingFace Inference)
 - **Embeddings**: sentence-transformers/all-MiniLM-L6-v2
 - **Vector Store**: FAISS
 - **Document Processing**: PyPDF2, python-docx, python-pptx, pandas
 - **Framework**: LangChain for document handling
-
+
 ## 📨 MCP Message Example
-
+
 ```json
 {
     "sender": "RetrievalAgent",
-    "receiver": "LLMResponseAgent",
+    "receiver": "LLMResponseAgent",
     "type": "RETRIEVAL_RESULT",
     "trace_id": "rag-457",
     "payload": {
@@ -704,26 +732,26 @@ def create_interface():
 }
 ```
 """)
-        
+
         # Event handlers
         upload_button.click(
             fn=process_files,
             inputs=[file_upload],
             outputs=[upload_status]
         )
-        
+
         ask_button.click(
             fn=answer_question,
             inputs=[query_input, chatbot],
             outputs=[chatbot, query_input]
         )
-        
+
         query_input.submit(
             fn=answer_question,
             inputs=[query_input, chatbot],
             outputs=[chatbot, query_input]
        )
-        
+
     return demo
 
 if __name__ == "__main__":