# MikeMai's picture
# Update app.py
# da89fc6 verified
from pdfminer.high_level import extract_text
from pdf2image import convert_from_path # Convert PDF pages to images
import base64
import io
import os
from PIL import Image
import json
from openai import OpenAI
from dotenv import load_dotenv
from huggingface_hub import HfApi
import shutil
import gradio as gr
# Load variables from a .env file into the process environment before any
# client is constructed.
load_dotenv()
# Module-level OpenAI client shared by the GPT-4o calls in this file.
# NOTE(review): presumably picks up its API key from the environment — confirm.
client = OpenAI()
# Explicit Hugging Face login, currently disabled; HfApi is instead given the
# HF_API_KEY token directly where it is used.
# from huggingface_hub import login
# login(token=os.getenv("HF_API_KEY"))
# Function to encode image to Base64
def encode_image(image_input):
    """
    Encode an image as a Base64 string.

    Args:
        image_input: Either a file path (str), whose bytes are encoded
            unchanged, or an in-memory PIL image, which is serialized as
            JPEG before being encoded.

    Returns:
        The Base64 representation decoded as a UTF-8 string.

    Raises:
        ValueError: If ``image_input`` is neither a str nor a PIL image.
    """
    if isinstance(image_input, str):  # If input is a file path
        with open(image_input, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")
    elif isinstance(image_input, Image.Image):  # If input is a PIL image
        # The JPEG writer rejects alpha/palette modes (RGBA, LA, P, ...), so
        # convert those to RGB; modes it already accepts are left untouched.
        if image_input.mode not in ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr"):
            image_input = image_input.convert("RGB")
        buffered = io.BytesIO()
        image_input.save(buffered, format="JPEG")
        return base64.b64encode(buffered.getvalue()).decode("utf-8")
    else:
        raise ValueError("Unsupported input type. Provide a file path or a PIL image.")
# Function to process image files
def process_image(image_path):
    """
    Extract the text visible in an image by sending it to GPT-4o.

    Args:
        image_path: A file path (str) or an in-memory PIL image; it is passed
            to ``encode_image`` to obtain the Base64 payload.

    Returns:
        The text returned by the model with surrounding whitespace removed,
        or an empty string when the model returns no content.
    """
    import mimetypes  # stdlib; imported locally so the file's import block is unchanged

    print(f"πŸ–ΌοΈ Processing image file: {image_path}")
    image_base64 = encode_image(image_path)

    # PIL images are re-encoded as JPEG by encode_image, but files on disk are
    # sent byte-for-byte, so label them by their real type (e.g. image/png).
    mime_type = "image/jpeg"
    if isinstance(image_path, str):
        guessed_type, _ = mimetypes.guess_type(image_path)
        if guessed_type and guessed_type.startswith("image/"):
            mime_type = guessed_type
    image_url = f"data:{mime_type};base64,{image_base64}"

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Extract all text from this image."},
                    {"type": "image_url", "image_url": {"url": image_url}},
                ],
            }
        ],
    )
    # The message content can be None (e.g. on a refusal); treat it as "no text"
    # instead of crashing on .strip().
    content = response.choices[0].message.content
    return (content or "").strip()
# Function to process text-based PDFs
def process_text_pdf(pdf_path):
    """
    Return the embedded text of a PDF, or None when it has no text.

    The text is read with pdfminer's ``extract_text`` and stripped; an empty
    result yields None so the caller can fall back to image processing.
    """
    embedded_text = extract_text(pdf_path).strip()
    if not embedded_text:
        # No text found, fallback to image processing
        return None
    print(f"πŸ“„ Extracting text from PDF: {pdf_path}")
    return embedded_text
# Function to process scanned PDFs (image-based)
def process_image_pdf(pdf_path):
    """
    Extract text from a scanned PDF by running every page through process_image.

    Each page is rendered to an image with ``convert_from_path``; the per-page
    texts are returned joined by blank lines, in page order.
    """
    print(f"πŸ–ΌοΈ No text found! Processing as an image-based (scanned) PDF: {pdf_path}")
    page_images = convert_from_path(pdf_path)
    page_texts = [process_image(page) for page in page_images]
    return "\n\n".join(page_texts)
# Function to detect file type and extract text accordingly
def process_file(file_path):
    """
    Extract text from a file, choosing the strategy from its extension.

    jpg/jpeg/png files go through process_image. PDFs are first read as text
    and, when that yields nothing, processed page by page as images. Returns
    None (after printing a message) for a missing file or any other extension.
    """
    if not os.path.exists(file_path):
        print(f"❌ Error: File not found: {file_path}")
        return None

    # Everything after the last dot, lower-cased (the whole name if no dot)
    extension = file_path.lower().rpartition(".")[2]

    if extension == "pdf":
        # Prefer the embedded text; otherwise treat the PDF as scanned images
        return process_text_pdf(file_path) or process_image_pdf(file_path)

    if extension in ("jpg", "jpeg", "png"):
        return process_image(file_path)  # Process images

    print(f"❌ Unsupported file type: {file_path}")
    return None
def extract_certificate_details(certificate_path):
    """
    Extract the raw text and key fields of a certificate file.

    The file is converted to text with ``process_file`` and GPT-4o is asked to
    return the key fields as a JSON object.

    Args:
        certificate_path: Path to the certificate (image or PDF).

    Returns:
        A tuple ``(certificate_text, certificate_name, certificate_id,
        ship_name, date_of_issue, expiration_date)`` where missing fields are
        empty strings, or None when the text could not be extracted or the
        model response could not be parsed as a JSON object.
    """
    certificate_text = process_file(certificate_path)
    print(f"πŸ–ΌοΈ Extracting details from certificate: {certificate_path}")
    if not certificate_text:
        print(f"❌ Error: Certificate text could not be extracted from {certificate_path}")
        return None
    # Ask GPT-4o to pull the structured fields out of the certificate text
    response = client.chat.completions.create(
        model="gpt-4o",
        response_format={ "type": "json_object" },
        seed=123,
        temperature=0,
        messages=[
            {
                "role": "developer",
                "content": f"""Extract the following details from the certificate text in JSON format, leave blank if not found:
{{
"Certificate Name": "",
"Certificate ID": "",
"Ship Name": "",
"Date of Issue": "",
"Expiration Date": ""
}}
Certificate Text:
{certificate_text}
"""
            }
        ],
    )
    result = response.choices[0].message.content
    try:
        # TypeError covers a None content; JSONDecodeError covers truncated/invalid JSON
        result_json = json.loads(result)
    except (TypeError, json.JSONDecodeError) as exc:
        print(f"❌ Error: Certificate details could not be parsed for {certificate_path}: {exc}")
        return None
    if not isinstance(result_json, dict):
        print(f"❌ Error: Certificate details could not be parsed for {certificate_path}: not a JSON object")
        return None

    def _field(key):
        # A null or missing field becomes ""; other values are passed on as text
        value = result_json.get(key, "")
        return "" if value is None else str(value)

    certificate_name = _field("Certificate Name")
    certificate_id = _field("Certificate ID")
    ship_name = _field("Ship Name")
    date_of_issue = _field("Date of Issue")
    expiration_date = _field("Expiration Date")
    print(f"βœ… Extracted details:\n- Certificate Name: {certificate_name}\n- Certificate ID: {certificate_id}\n- Ship Name: {ship_name}\n- Date of Issue: {date_of_issue}\n- Expiration Date: {expiration_date}")
    return certificate_text, certificate_name, certificate_id, ship_name, date_of_issue, expiration_date
# Function to compare two certificates using AI
def compare_certificates(new_cert_details, old_cert_details):
    """
    Ask GPT-4o for a structured summary of the differences between two certificates.

    Both arguments are interpolated into the prompt as-is; the model's reply
    is returned with surrounding whitespace stripped.
    """
    prompt = f"""Compare the two certificates below and provide a structured summary highlighting key differences in the format below:
### Comparison Summary:
- Identify differences in terms of:
- Certificate ID
- Date of Issue
- Expiration Date
- Highlight any changes in other key details, if applicable.
### Take Note:
- Clearly structure the output for easy reading
- Do not include any structural changes in the text, only content changes
### Old Certificate:
{old_cert_details}
### New Certificate:
{new_cert_details}"""
    chat_messages = [{"role": "user", "content": prompt}]
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=chat_messages,
    )
    return completion.choices[0].message.content.strip()
_HF_DATASET_REPO = "MikeMai/Certificates_Management"
_HF_UPLOAD_ROOT = "hf_dataset_upload"


def _safe_path_component(value):
    """Return ``value`` as one path component: spaces become "_", separators are removed."""
    component = str(value).replace(" ", "_")
    # The value is model output derived from an uploaded document, so it must
    # not be able to add directory levels or climb out of the upload folder
    # (files found in the resulting directory are deleted further down).
    for separator in ("/", "\\"):
        component = component.replace(separator, "_")
    if component in (".", ".."):
        component = component.replace(".", "_")
    return component


def gradio_upload_certificate(uploaded_file):
    """
    Gradio handler: extract, file, compare and upload a certificate.

    The upload is copied to a local temp file and its details are extracted.
    It is stored under ``hf_dataset_upload/<ship>/<certificate>/``; if that
    folder already holds a certificate, the two are compared with GPT-4o and
    the old files are deleted locally and from the Hugging Face dataset repo.
    Finally the whole upload folder is pushed to the repo.

    Args:
        uploaded_file: The value delivered by the ``gr.File`` input; either an
            object exposing the path as ``.name`` or a plain path string.

    Returns:
        A Markdown string describing the result or the reason for failure.
    """
    if uploaded_file is None:
        return "❌ No file uploaded."

    # Save uploaded file to local path immediately
    source_path = getattr(uploaded_file, "name", uploaded_file)
    file_ext = os.path.splitext(source_path)[-1]
    temp_path = f"temp_uploaded_file{file_ext}"
    shutil.copy(source_path, temp_path)

    extracted = extract_certificate_details(temp_path)
    if not extracted:
        return "❌ Failed to extract certificate details."
    certificate_text, certificate_name, certificate_id, ship_name, date_of_issue, expiration_date = extracted
    if not all([certificate_name, ship_name]):
        return "❌ Missing key fields, unable to rename or upload."

    safe_cert_name = _safe_path_component(certificate_name)
    safe_ship_name = _safe_path_component(ship_name)
    save_dir = os.path.join(_HF_UPLOAD_ROOT, safe_ship_name, safe_cert_name)
    os.makedirs(save_dir, exist_ok=True)

    # Check for existing certificates in the directory
    existing_files = [
        f for f in os.listdir(save_dir) if os.path.isfile(os.path.join(save_dir, f))
    ]
    api = HfApi(token=os.getenv("HF_API_KEY"))

    new_cert_details = None
    old_cert_details = None
    comparison_result = None
    if existing_files:
        old_cert_path = os.path.join(save_dir, existing_files[0])
        print(f"πŸ“‚ Existing certificate found: {old_cert_path}")
        # Check the result before unpacking: extraction returns None on failure
        old_extracted = extract_certificate_details(old_cert_path)
        if not old_extracted or not old_extracted[0]:
            return "❌ Failed to process the existing certificate for comparison."
        old_text, old_name, old_id, old_ship_name, old_date_of_issue, old_expiration_date = old_extracted
        new_cert_details = {
            "Certificate Name": certificate_name,
            "Certificate ID": certificate_id,
            "Ship Name": ship_name,
            "Date of Issue": date_of_issue,
            "Expiration Date": expiration_date,
            "Certificate Text": certificate_text
        }
        old_cert_details = {
            "Certificate Name": old_name,
            "Certificate ID": old_id,
            "Ship Name": old_ship_name,
            "Date of Issue": old_date_of_issue,
            "Expiration Date": old_expiration_date,
            "Certificate Text": old_text
        }
        # Compare the old and new certificates
        comparison_result = compare_certificates(new_cert_details, old_cert_details)
        # Always delete the existing files (locally and on the Hub) before
        # saving the new one; this must precede the copy below because the new
        # file may carry the same name as an old one.
        for existing_file in existing_files:
            os.remove(os.path.join(save_dir, existing_file))
            api.delete_file(
                path_in_repo=f"{safe_ship_name}/{safe_cert_name}/{existing_file}",
                repo_id=_HF_DATASET_REPO,
                repo_type="dataset",
            )

    # Store the upload under its canonical name and push the folder to the Hub
    new_filename = f"{safe_ship_name}_{safe_cert_name}{file_ext}"
    new_path = os.path.join(save_dir, new_filename)
    shutil.copy(temp_path, new_path)
    if existing_files:
        print(f"βœ… Replaced the existing file with the uploaded file: {new_path}")
    api.upload_folder(
        folder_path=_HF_UPLOAD_ROOT,
        repo_id=_HF_DATASET_REPO,
        repo_type="dataset",
    )
    hf_path = f"https://huggingface.co/datasets/MikeMai/Certificates_Management/blob/main/{safe_ship_name}/{safe_cert_name}/{new_filename}"

    if existing_files:
        return f"""
βœ… **Certificate Uploaded Successfully! Existing Certificate**
πŸ”— [View on Hugging Face Hub]({hf_path})
**New Certificate Details**:
**Certificate Name**: {new_cert_details['Certificate Name']}
**Certificate ID**: {new_cert_details['Certificate ID']}
**Ship Name**: {new_cert_details['Ship Name']}
**Date of Issue**: {new_cert_details['Date of Issue'] or "N/A"}
**Expiration Date**: {new_cert_details['Expiration Date'] or "N/A"}
**Old Certificate Details**:
**Certificate Name**: {old_cert_details['Certificate Name']}
**Certificate ID**: {old_cert_details['Certificate ID']}
**Ship Name**: {old_cert_details['Ship Name']}
**Date of Issue**: {old_cert_details['Date of Issue'] or "N/A"}
**Expiration Date**: {old_cert_details['Expiration Date'] or "N/A"}
{comparison_result}
"""
    return f"""
βœ… **Certificate Uploaded Successfully!**
**Certificate Name**: {certificate_name}
**Certificate ID**: {certificate_id}
**Ship Name**: {ship_name}
**Date of Issue**: {date_of_issue or "N/A"}
**Expiration Date**: {expiration_date or "N/A"}
πŸ”— [View on Hugging Face Hub]({hf_path})
"""
# Build the single-input Gradio UI around the upload handler, then serve it
demo = gr.Interface(
    fn=gradio_upload_certificate,
    inputs=gr.File(label="Upload Certificate (PDF or Image)"),
    outputs=gr.Markdown(label="Upload Result"),
    title="πŸ“œ Certificate Manager",
    description="Upload a certificate to extract certificate details, rename, and store in respective folders.",
    show_progress="full",
    allow_flagging="never",
)
demo.launch()
# # Run the script with your files
# old_cert = "load_line_cert_old.jpg" # Change to your old cert file
# new_cert = "load_line_cert_new.pdf" # Change to your new cert file
# extract_certificate_details(new_cert)
# comparison_result = compare_certificates(old_cert, new_cert, True)
# print("\nπŸ”Ž AI-Based Structured Comparison:\n")
# print(comparison_result)
# Gradio Interface ------------------------------
# import gradio as gr
# from gradio.themes.base import Base
# interface = gr.Interface(
# fn=compare_certificates,
# title="Certificate Comparison Summarizer",
# inputs=[gr.File(label="Old Certificate"), gr.File(label="New Certificate")],
# outputs=[gr.Textbox(label="Comparison Summary")],
# allow_flagging="never",
# theme=Base()
# )
# interface.launch()