maahi2412's picture
Update app.py
a8d28cf verified
raw
history blame
18.7 kB
# from flask import Flask, request, jsonify
# import os
# import pdfplumber
# import pytesseract
# from PIL import Image
# from transformers import PegasusForConditionalGeneration, PegasusTokenizer
# import torch
# import logging
# app = Flask(__name__)
# # Set up logging
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__)
# # Load Pegasus Model (load once globally)
# logger.info("Loading Pegasus model and tokenizer...")
# tokenizer = PegasusTokenizer.from_pretrained("google/pegasus-xsum")
# model = PegasusForConditionalGeneration.from_pretrained("google/pegasus-xsum").to("cpu") # Force CPU to manage memory
# logger.info("Model loaded successfully.")
# # Extract text from PDF with page limit
# def extract_text_from_pdf(file_path, max_pages=5):
# text = ""
# try:
# with pdfplumber.open(file_path) as pdf:
# total_pages = len(pdf.pages)
# pages_to_process = min(total_pages, max_pages)
# logger.info(f"Extracting text from {pages_to_process} of {total_pages} pages in {file_path}")
# for i, page in enumerate(pdf.pages[:pages_to_process]):
# try:
# extracted = page.extract_text()
# if extracted:
# text += extracted + "\n"
# else:
# logger.info(f"No text on page {i+1}, attempting OCR...")
# image = page.to_image().original
# text += pytesseract.image_to_string(image) + "\n"
# except Exception as e:
# logger.warning(f"Error processing page {i+1}: {e}")
# continue
# except Exception as e:
# logger.error(f"Failed to process PDF {file_path}: {e}")
# return ""
# return text.strip()
# # Extract text from image (OCR)
# def extract_text_from_image(file_path):
# try:
# logger.info(f"Extracting text from image {file_path} using OCR...")
# image = Image.open(file_path)
# text = pytesseract.image_to_string(image)
# return text.strip()
# except Exception as e:
# logger.error(f"Failed to process image {file_path}: {e}")
# return ""
# # Summarize text with chunking for large inputs
# def summarize_text(text, max_input_length=512, max_output_length=150):
# try:
# logger.info("Summarizing text...")
# # Tokenize and truncate to max_input_length
# inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=max_input_length, padding=True)
# input_length = inputs["input_ids"].shape[1]
# logger.info(f"Input length: {input_length} tokens")
# # Adjust generation params for efficiency
# summary_ids = model.generate(
# inputs["input_ids"],
# max_length=max_output_length,
# min_length=30,
# num_beams=2, # Reduce beams for speedup
# early_stopping=True,
# length_penalty=1.0, # Encourage shorter outputs
# )
# summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
# logger.info("Summarization completed.")
# return summary
# except Exception as e:
# logger.error(f"Error during summarization: {e}")
# return ""
# @app.route('/summarize', methods=['POST'])
# def summarize_document():
# if 'file' not in request.files:
# logger.error("No file uploaded in request.")
# return jsonify({"error": "No file uploaded"}), 400
# file = request.files['file']
# filename = file.filename
# if not filename:
# logger.error("Empty filename in request.")
# return jsonify({"error": "No file uploaded"}), 400
# file_path = os.path.join("/tmp", filename)
# try:
# file.save(file_path)
# logger.info(f"File saved to {file_path}")
# if filename.lower().endswith('.pdf'):
# text = extract_text_from_pdf(file_path, max_pages=2) # Reduce to 2 pages
# elif filename.lower().endswith(('.png', '.jpeg', '.jpg')):
# text = extract_text_from_image(file_path)
# else:
# logger.error(f"Unsupported file format: {filename}")
# return jsonify({"error": "Unsupported file format. Use PDF, PNG, JPEG, or JPG"}), 400
# if not text:
# logger.warning(f"No text extracted from {filename}")
# return jsonify({"error": "No text extracted from the file"}), 400
# summary = summarize_text(text)
# if not summary:
# logger.warning("Summarization failed to produce output.")
# return jsonify({"error": "Failed to generate summary"}), 500
# logger.info(f"Summary generated for {filename}")
# return jsonify({"summary": summary})
# except Exception as e:
# logger.error(f"Unexpected error processing {filename}: {e}")
# return jsonify({"error": str(e)}), 500
# finally:
# if os.path.exists(file_path):
# try:
# os.remove(file_path)
# logger.info(f"Cleaned up file: {file_path}")
# except Exception as e:
# logger.warning(f"Failed to delete {file_path}: {e}")
# if __name__ == '__main__':
# logger.info("Starting Flask app...")
# app.run(host='0.0.0.0', port=7860)
import os
import pdfplumber
from PIL import Image
import pytesseract
import numpy as np
from flask import Flask, request, jsonify
from flask_cors import CORS
from transformers import PegasusForConditionalGeneration, PegasusTokenizer, BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from datasets import load_dataset, concatenate_datasets
import torch
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
app = Flask(__name__)
CORS(app)
UPLOAD_FOLDER = 'uploads'
PEGASUS_MODEL_DIR = 'fine_tuned_pegasus'
BERT_MODEL_DIR = 'fine_tuned_bert'
LEGALBERT_MODEL_DIR = 'fine_tuned_legalbert'
MAX_FILE_SIZE = 100 * 1024 * 1024
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
transformers.logging.set_verbosity_error()
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"
# Pegasus Fine-Tuning
def load_or_finetune_pegasus():
if os.path.exists(PEGASUS_MODEL_DIR):
print("Loading fine-tuned Pegasus model...")
tokenizer = PegasusTokenizer.from_pretrained(PEGASUS_MODEL_DIR)
model = PegasusForConditionalGeneration.from_pretrained(PEGASUS_MODEL_DIR)
else:
print("Fine-tuning Pegasus on CNN/Daily Mail and XSUM...")
tokenizer = PegasusTokenizer.from_pretrained("google/pegasus-xsum")
model = PegasusForConditionalGeneration.from_pretrained("google/pegasus-xsum")
# Load and combine datasets
cnn_dm = load_dataset("cnn_dailymail", "3.0.0", split="train[:5000]") # 5K samples
xsum = load_dataset("xsum", split="train[:5000]") # 5K samples
combined_dataset = concatenate_datasets([cnn_dm, xsum])
def preprocess_function(examples):
inputs = tokenizer(examples["article"] if "article" in examples else examples["document"],
max_length=512, truncation=True, padding="max_length")
targets = tokenizer(examples["highlights"] if "highlights" in examples else examples["summary"],
max_length=400, truncation=True, padding="max_length")
inputs["labels"] = targets["input_ids"]
return inputs
tokenized_dataset = combined_dataset.map(preprocess_function, batched=True)
train_dataset = tokenized_dataset.select(range(8000)) # 80%
eval_dataset = tokenized_dataset.select(range(8000, 10000)) # 20%
training_args = TrainingArguments(
output_dir="./pegasus_finetune",
num_train_epochs=3, # Increased for better fine-tuning
per_device_train_batch_size=1,
per_device_eval_batch_size=1,
warmup_steps=500,
weight_decay=0.01,
logging_dir="./logs",
logging_steps=10,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
)
trainer.train()
trainer.save_model(PEGASUS_MODEL_DIR)
tokenizer.save_pretrained(PEGASUS_MODEL_DIR)
print(f"Fine-tuned Pegasus saved to {PEGASUS_MODEL_DIR}")
return tokenizer, model
# BERT Fine-Tuning
def load_or_finetune_bert():
if os.path.exists(BERT_MODEL_DIR):
print("Loading fine-tuned BERT model...")
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_DIR)
model = BertForSequenceClassification.from_pretrained(BERT_MODEL_DIR, num_labels=2)
else:
print("Fine-tuning BERT on CNN/Daily Mail for extractive summarization...")
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = BertForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=2)
# Load dataset and preprocess for sentence classification
cnn_dm = load_dataset("cnn_dailymail", "3.0.0", split="train[:5000]")
def preprocess_for_extractive(examples):
sentences = []
labels = []
for article, highlights in zip(examples["article"], examples["highlights"]):
article_sents = article.split(". ")
highlight_sents = highlights.split(". ")
for sent in article_sents:
if sent.strip():
# Label as 1 if sentence is similar to any highlight, else 0
is_summary = any(sent.strip() in h for h in highlight_sents)
sentences.append(sent)
labels.append(1 if is_summary else 0)
return {"sentence": sentences, "label": labels}
dataset = cnn_dm.map(preprocess_for_extractive, batched=True, remove_columns=["article", "highlights", "id"])
tokenized_dataset = dataset.map(
lambda x: tokenizer(x["sentence"], max_length=512, truncation=True, padding="max_length"),
batched=True
)
tokenized_dataset = tokenized_dataset.remove_columns(["sentence"])
train_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset))))
eval_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset)), len(tokenized_dataset)))
training_args = TrainingArguments(
output_dir="./bert_finetune",
num_train_epochs=3,
per_device_train_batch_size=8,
per_device_eval_batch_size=8,
warmup_steps=500,
weight_decay=0.01,
logging_dir="./logs",
logging_steps=10,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
)
trainer.train()
trainer.save_model(BERT_MODEL_DIR)
tokenizer.save_pretrained(BERT_MODEL_DIR)
print(f"Fine-tuned BERT saved to {BERT_MODEL_DIR}")
return tokenizer, model
# LegalBERT Fine-Tuning
def load_or_finetune_legalbert():
if os.path.exists(LEGALBERT_MODEL_DIR):
print("Loading fine-tuned LegalBERT model...")
tokenizer = BertTokenizer.from_pretrained(LEGALBERT_MODEL_DIR)
model = BertForSequenceClassification.from_pretrained(LEGALBERT_MODEL_DIR, num_labels=2)
else:
print("Fine-tuning LegalBERT on Billsum for extractive summarization...")
tokenizer = BertTokenizer.from_pretrained("nlpaueb/legal-bert-base-uncased")
model = BertForSequenceClassification.from_pretrained("nlpaueb/legal-bert-base-uncased", num_labels=2)
# Load dataset
billsum = load_dataset("billsum", split="train[:5000]")
def preprocess_for_extractive(examples):
sentences = []
labels = []
for text, summary in zip(examples["text"], examples["summary"]):
text_sents = text.split(". ")
summary_sents = summary.split(". ")
for sent in text_sents:
if sent.strip():
is_summary = any(sent.strip() in s for s in summary_sents)
sentences.append(sent)
labels.append(1 if is_summary else 0)
return {"sentence": sentences, "label": labels}
dataset = billsum.map(preprocess_for_extractive, batched=True, remove_columns=["text", "summary", "title"])
tokenized_dataset = dataset.map(
lambda x: tokenizer(x["sentence"], max_length=512, truncation=True, padding="max_length"),
batched=True
)
tokenized_dataset = tokenized_dataset.remove_columns(["sentence"])
train_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset))))
eval_dataset = tokenized_dataset.select(range(int(0.8 * len(tokenized_dataset)), len(tokenized_dataset)))
training_args = TrainingArguments(
output_dir="./legalbert_finetune",
num_train_epochs=3,
per_device_train_batch_size=8,
per_device_eval_batch_size=8,
warmup_steps=500,
weight_decay=0.01,
logging_dir="./logs",
logging_steps=10,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
)
trainer.train()
trainer.save_model(LEGALBERT_MODEL_DIR)
tokenizer.save_pretrained(LEGALBERT_MODEL_DIR)
print(f"Fine-tuned LegalBERT saved to {LEGALBERT_MODEL_DIR}")
return tokenizer, model
# Load models
pegasus_tokenizer, pegasus_model = load_or_finetune_pegasus()
bert_tokenizer, bert_model = load_or_finetune_bert()
legalbert_tokenizer, legalbert_model = load_or_finetune_legalbert()
def extract_text_from_pdf(file_path):
text = ""
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
text += page.extract_text() or ""
return text
def extract_text_from_image(file_path):
image = Image.open(file_path)
text = pytesseract.image_to_string(image)
return text
def choose_model(text):
legal_keywords = ["court", "legal", "law", "judgment", "contract", "statute", "case"]
tfidf = TfidfVectorizer(vocabulary=legal_keywords)
tfidf_matrix = tfidf.fit_transform([text.lower()])
score = np.sum(tfidf_matrix.toarray())
if score > 0.1:
return "legalbert"
elif len(text.split()) > 50:
return "pegasus"
else:
return "bert"
def summarize_with_pegasus(text):
inputs = pegasus_tokenizer(text, truncation=True, padding="longest", return_tensors="pt", max_length=512)
summary_ids = pegasus_model.generate(
inputs["input_ids"],
max_length=400, min_length=80, length_penalty=1.5, num_beams=4
)
return pegasus_tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def summarize_with_bert(text):
sentences = text.split(". ")
if len(sentences) < 6: # Ensure enough for 5 sentences
return text
inputs = bert_tokenizer(sentences, return_tensors="pt", padding=True, truncation=True, max_length=512)
with torch.no_grad():
outputs = bert_model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=1)[:, 1] # Probability of being a summary sentence
key_sentence_idx = probs.argsort(descending=True)[:5] # Top 5 sentences
return ". ".join([sentences[idx] for idx in key_sentence_idx if sentences[idx].strip()])
def summarize_with_legalbert(text):
sentences = text.split(". ")
if len(sentences) < 6:
return text
inputs = legalbert_tokenizer(sentences, return_tensors="pt", padding=True, truncation=True, max_length=512)
with torch.no_grad():
outputs = legalbert_model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=1)[:, 1]
key_sentence_idx = probs.argsort(descending=True)[:5]
return ". ".join([sentences[idx] for idx in key_sentence_idx if sentences[idx].strip()])
@app.route('/summarize', methods=['POST'])
def summarize_document():
if 'file' not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files['file']
filename = file.filename
file.seek(0, os.SEEK_END)
file_size = file.tell()
if file_size > MAX_FILE_SIZE:
return jsonify({"error": f"File size exceeds {MAX_FILE_SIZE // (1024 * 1024)} MB"}), 413
file.seek(0)
file_path = os.path.join(UPLOAD_FOLDER, filename)
try:
file.save(file_path)
except Exception as e:
return jsonify({"error": f"Failed to save file: {str(e)}"}), 500
try:
if filename.endswith('.pdf'):
text = extract_text_from_pdf(file_path)
elif filename.endswith(('.png', '.jpeg', '.jpg')):
text = extract_text_from_image(file_path)
else:
os.remove(file_path)
return jsonify({"error": "Unsupported file format."}), 400
except Exception as e:
os.remove(file_path)
return jsonify({"error": f"Text extraction failed: {str(e)}"}), 500
if not text.strip():
os.remove(file_path)
return jsonify({"error": "No text extracted"}), 400
try:
model = choose_model(text)
if model == "pegasus":
summary = summarize_with_pegasus(text)
elif model == "bert":
summary = summarize_with_bert(text)
elif model == "legalbert":
summary = summarize_with_legalbert(text)
except Exception as e:
os.remove(file_path)
return jsonify({"error": f"Summarization failed: {str(e)}"}), 500
os.remove(file_path)
return jsonify({"model_used": model, "summary": summary})
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)