import re
import textwrap

import fitz  # PyMuPDF
import gradio as gr
import nltk
import spacy
from transformers import pipeline

# Download the NLTK tokenizer data required by nltk.sent_tokenize.
nltk.download('punkt')
nltk.download('punkt_tab')

# Load the spaCy English model, downloading it first if it is not installed.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Summarization pipeline backed by BigBird-Pegasus fine-tuned for PubMed article summarization.
summarizer = pipeline("summarization", model="google/bigbird-pegasus-large-pubmed")
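# BigBird-Pegasus accepts long inputs (up to 4096 tokens), so the ~500-word chunks
# produced further below fit comfortably within its context window.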


def read_pdf_with_content_filter(file_path,
                                 keywords=("Abstract", "Introduction", "Methods", "Results", "Conclusions")):
    """
    Reads a PDF file and returns text only from pages that contain one of the specified keywords.
    This helps exclude pages that mainly contain header/metadata.
    """
    content_pages = []
    with fitz.open(file_path) as doc:
        for page in doc:
            page_text = page.get_text()
            if any(keyword.lower() in page_text.lower() for keyword in keywords):
                content_pages.append(page_text)
    return "\n".join(content_pages)


def clean_text(text):
    """
    Cleans the text by removing citations, extra whitespace, and unwanted characters.
    """
    text = re.sub(r'\[\d+\]', '', text)  # bracketed citation markers, e.g. [12]
    text = re.sub(r'\(\d+\)', '', text)  # parenthesised citation markers, e.g. (12)
    text = re.sub(r'\s+', ' ', text)     # collapse runs of whitespace
    return text.strip()


def extract_core_sections(text):
    """
    Attempts to extract core sections using common headings.
    Returns a dictionary with the section name (lowercase) as key and its content as value.
    """
    pattern = r'(?i)(Abstract|Introduction|Methods|Results|Conclusions|Discussion)\s*[:\n\.]'
    splits = re.split(pattern, text)
    sections = {}
    if len(splits) > 1:
        # re.split with a capturing group alternates between headings and the text that follows them.
        for i in range(1, len(splits), 2):
            heading = splits[i].strip().lower()
            content = splits[i + 1].strip() if i + 1 < len(splits) else ""
            sections[heading] = content
    return sections


def remove_header_metadata(text, marker="Competing Interests:"):
    """
    Removes header/metadata from the text by using a marker.
    If the marker is found, returns text after it; otherwise, returns the original text.
    """
    idx = text.find(marker)
    if idx != -1:
        return text[idx + len(marker):].strip()
    return text


def split_into_chunks(text, chunk_size=500):
    """
    Splits the text into chunks of approximately chunk_size words.
    """
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size):
        chunk = " ".join(words[i:i + chunk_size])
        chunks.append(chunk)
    return chunks


def summarize_text(text, max_length=200, min_length=50):
    """
    Summarizes the given text using BigBird-Pegasus.
    Adjusts output lengths if the input is very short.
    """
    input_length = len(text.split())
    if input_length < 60:
        # Tighten the length limits so the model is not asked to generate
        # a summary longer than the input itself.
        max_length = min(max_length, 40)
        min_length = min(min_length, 10)
    summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
    return summary[0]['summary_text']


def format_bullet_points(summary):
    """
    Splits the summary into sentences and formats each as a bullet point.
    """
    sentences = nltk.sent_tokenize(summary)
    bullets = ["- " + sentence for sentence in sentences]
    return "\n".join(bullets)


def bullet_to_paragraph_wrapped(bullet_text, width=80):
    """
    Converts a bullet-point summary into a paragraph and wraps the text to a specified width.
    """
    # Pegasus-style models often emit "<n>" as a newline token, so strip it along with the bullet markers.
    paragraph = bullet_text.replace("- ", "").replace("<n>", " ")
    paragraph = re.sub(r'\s+', ' ', paragraph).strip()
    wrapped_paragraph = textwrap.fill(paragraph, width=width)
    return wrapped_paragraph


def process_pdf(file_obj):
    """
    Processes the uploaded PDF file and returns a bullet summary and a wrapped paragraph summary.
    """
    # gr.File may pass a file path string or a tempfile-like object, depending on the Gradio version.
    file_path = file_obj if isinstance(file_obj, str) else file_obj.name

    full_text = read_pdf_with_content_filter(file_path)
    cleaned_text = clean_text(full_text)
    sections = extract_core_sections(cleaned_text)
    if not sections:
        core_text = remove_header_metadata(cleaned_text)
    else:
        order = ['abstract', 'introduction', 'methods', 'results', 'conclusions', 'discussion']
        core_content = [sections[sec] for sec in order if sec in sections]
        core_text = " ".join(core_content) if core_content else cleaned_text

    # Summarize each chunk individually, then summarize the concatenated chunk summaries.
    chunks = split_into_chunks(core_text, chunk_size=500)
    chunk_summaries = []
    for chunk in chunks:
        try:
            chunk_summary = summarize_text(chunk, max_length=200, min_length=50)
        except Exception:
            chunk_summary = ""
        chunk_summaries.append(chunk_summary)
    final_core_summary_text = " ".join(chunk_summaries)
    final_summary = summarize_text(final_core_summary_text, max_length=200, min_length=50)
    bullet_points = format_bullet_points(final_summary)
    paragraph_summary_wrapped = bullet_to_paragraph_wrapped(bullet_points, width=80)
    return bullet_points, paragraph_summary_wrapped


iface = gr.Interface(
    fn=process_pdf,
    inputs=gr.File(label="Upload a Medical PDF"),
    outputs=[
        gr.Textbox(label="Bullet Summary"),
        gr.Textbox(label="Paragraph Summary"),
    ],
    title="Medical Document Summarization",
    description="Upload a medical PDF document to get a bullet-point and paragraph summary of its core content.",
)


if __name__ == "__main__":
    iface.launch()