import gradio as gr
import fitz  # PyMuPDF
import os
import numpy as np
import matplotlib.pyplot as plt
from wordcloud import WordCloud
from io import BytesIO
from collections import Counter
import re
from PIL import Image

from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from transformers import pipeline
from sklearn.metrics.pairwise import cosine_similarity
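
# Load both models once at startup: a sentence embedder for retrieval and an
# extractive QA model that picks answer spans out of the retrieved context.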
embed_model = SentenceTransformer('all-MiniLM-L6-v2')
qa_pipeline = pipeline("question-answering", model="deepset/tinyroberta-squad2")
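
# In-memory index state, rebuilt each time a new batch of PDFs is processed.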
all_chunks = []
chunk_sources = []
chunk_embeddings = None
combined_text = ""


def extract_text_from_pdfs(pdf_files):
    """Extract text and page info from uploaded PDFs."""
    global chunk_sources, combined_text
    texts = []
    chunk_sources = []
    combined_text = ""

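    # Walk every page of every PDF, tagging each page's text with
    # "file - page N" so answers can cite their sources later.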
    for file in pdf_files:
        doc = fitz.open(file.name)
        for page_num, page in enumerate(doc):
            text = page.get_text()
            if text.strip():
                texts.append((text, f"{os.path.basename(file.name)} - Page {page_num + 1}"))
                combined_text += " " + text
        doc.close()

    return texts


def split_and_embed(texts_with_sources):
    """Split text into chunks and compute embeddings."""
    global all_chunks, chunk_sources, chunk_embeddings
    all_chunks = []
    chunk_sources = []

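    # 500-character chunks with 50-character overlap keep each chunk well
    # within the QA model's input limit while preserving continuity across
    # chunk boundaries.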
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)

    for text, source in texts_with_sources:
        docs = splitter.create_documents([text])
        for doc in docs:
            all_chunks.append(doc.page_content)
            chunk_sources.append(source)

    if all_chunks:
        chunk_embeddings = embed_model.encode(all_chunks, convert_to_numpy=True)
    else:
        chunk_embeddings = None


def generate_wordcloud():
    """Generate a word cloud from the combined PDF text."""
    global combined_text
    if not combined_text.strip():
        return None

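    # Keep only letters and whitespace so punctuation and digits don't
    # pollute the frequency counts.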
    cleaned = re.sub(r"[^a-zA-Z\s]", "", combined_text.lower())
    word_freq = Counter(cleaned.split())

    wc = WordCloud(width=800, height=400, background_color="white").generate_from_frequencies(word_freq)

    fig, ax = plt.subplots()
    ax.imshow(wc, interpolation='bilinear')
    ax.axis("off")

    buffer = BytesIO()
    fig.savefig(buffer, format="png", bbox_inches="tight")
    plt.close(fig)  # free the figure so repeated uploads don't leak memory
    buffer.seek(0)
    # gr.Image accepts a PIL image (or numpy array / filepath), not a raw BytesIO buffer.
    return Image.open(buffer)


def answer_question(question):
    """Retrieve the top chunks, answer the question, and report confidence."""
    global all_chunks, chunk_sources, chunk_embeddings
    print("📥 Question received:", question)

    if not all_chunks or chunk_embeddings is None:
        print("⚠️ PDF not processed or empty.")
        return "Please upload and process some PDFs first."

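    # Embed the question and rank every chunk by cosine similarity against it;
    # the three most similar chunks form the QA context.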
    q_emb = embed_model.encode([question], convert_to_numpy=True)
    sims = cosine_similarity(q_emb, chunk_embeddings)[0]
    top_k_idx = sims.argsort()[::-1][:3]

    selected_chunks = [all_chunks[i] for i in top_k_idx]
    selected_sources = [chunk_sources[i] for i in top_k_idx]
    context = "\n\n".join(selected_chunks)

    if not context.strip():
        print("⚠️ Empty context from chunks.")
        return "Could not extract relevant content from the PDFs."

    try:
        answer_dict = qa_pipeline(question=question, context=context)
        answer = answer_dict.get("answer", "No answer found.")
    except Exception as e:
        print("❌ Error from QA model:", e)
        return "Model failed to generate an answer."

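    # The displayed "confidence" is the mean retrieval similarity of the
    # selected chunks, not a probability from the QA model itself.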
    avg_conf = np.mean([sims[i] for i in top_k_idx]) * 100
    source_info = "\n".join([f"- {src}" for src in selected_sources])
    result = f"**Answer**: {answer}\n\n**Sources**:\n{source_info}\n\n**Confidence Score**: {avg_conf:.2f}%"
    print("✅ Answer generated.")
    return result


with gr.Blocks() as demo:
    gr.Markdown("# 📚 Enhanced RAG PDF Chatbot")
    gr.Markdown("Upload PDFs → Preview Keywords → Ask Questions → Get Answers with Confidence & Sources")

    with gr.Row():
        pdf_input = gr.File(file_types=[".pdf"], file_count="multiple", label="Upload PDFs")
        load_button = gr.Button("Extract & Index")
        cloud_output = gr.Image(label="Keyword Preview (Word Cloud)")

    with gr.Row():
        question_input = gr.Textbox(lines=2, placeholder="Ask your question here...", label="Question")
        ask_button = gr.Button("Get Answer")
        answer_output = gr.Markdown()

    def load_and_index(files):
        # Guard against the button being clicked before any file is uploaded.
        if not files:
            return None
        texts = extract_text_from_pdfs(files)
        split_and_embed(texts)
        return generate_wordcloud()

    load_button.click(fn=load_and_index, inputs=[pdf_input], outputs=[cloud_output])
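    # answer_question returns a single Markdown string, so only the answer
    # component is updated; sending None to the Image output here would blank
    # the word cloud on every question.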
    ask_button.click(fn=answer_question, inputs=[question_input], outputs=[answer_output])

demo.launch()
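
# One possible dependency set for this script (assumed, adjust as needed):
#   pip install gradio pymupdf wordcloud matplotlib langchain sentence-transformers \
#       transformers torch scikit-learn pillow numpy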