Spaces:
Sleeping
Sleeping
from pdfminer.high_level import extract_text | |
from pdf2image import convert_from_path # Convert PDF pages to images | |
import base64 | |
import io | |
import os | |
from PIL import Image | |
import json | |
from openai import OpenAI | |
from dotenv import load_dotenv | |
from huggingface_hub import HfApi | |
import shutil | |
import gradio as gr | |
load_dotenv() | |
client = OpenAI() | |
# from huggingface_hub import login | |
# login(token=os.getenv("HF_API_KEY")) | |
# Function to encode image to Base64 | |
def encode_image(image_input): | |
""" | |
Encode an image to Base64. | |
Supports both file paths (str) and in-memory PIL images. | |
""" | |
if isinstance(image_input, str): # If input is a file path | |
with open(image_input, "rb") as image_file: | |
return base64.b64encode(image_file.read()).decode("utf-8") | |
elif isinstance(image_input, Image.Image): # If input is a PIL image | |
buffered = io.BytesIO() | |
image_input.save(buffered, format="JPEG") | |
return base64.b64encode(buffered.getvalue()).decode("utf-8") | |
else: | |
raise ValueError("Unsupported input type. Provide a file path or a PIL image.") | |
# Function to process image files | |
def process_image(image_path): | |
print(f"πΌοΈ Processing image file: {image_path}") | |
image_base64 = encode_image(image_path) | |
image_url = f"data:image/jpeg;base64,{image_base64}" | |
response = client.chat.completions.create( | |
model="gpt-4o", | |
messages=[ | |
{ | |
"role": "user", | |
"content": [ | |
{"type": "text", "text": "Extract all text from this image."}, | |
{"type": "image_url", "image_url": {"url": image_url}}, | |
], | |
} | |
], | |
) | |
extracted_text = response.choices[0].message.content.strip() | |
# print(f"π Extracted text: {extracted_text}") | |
return extracted_text | |
# Function to process text-based PDFs | |
def process_text_pdf(pdf_path): | |
text_content = extract_text(pdf_path).strip() | |
if text_content: | |
print(f"π Extracting text from PDF: {pdf_path}") | |
return text_content | |
return None # No text found, fallback to image processing | |
# Function to process scanned PDFs (image-based) | |
def process_image_pdf(pdf_path): | |
print(f"πΌοΈ No text found! Processing as an image-based (scanned) PDF: {pdf_path}") | |
images = convert_from_path(pdf_path) | |
extracted_text = [] | |
for i, image in enumerate(images): | |
image_text = process_image(image) | |
extracted_text.append(image_text) | |
return "\n\n".join(extracted_text) | |
# Function to detect file type and extract text accordingly | |
def process_file(file_path): | |
if not os.path.exists(file_path): | |
print(f"β Error: File not found: {file_path}") | |
return None | |
file_extension = file_path.lower().split(".")[-1] | |
if file_extension in ["jpg", "jpeg", "png"]: | |
return process_image(file_path) # Process images | |
elif file_extension == "pdf": | |
text_data = process_text_pdf(file_path) | |
if text_data: # If text extraction succeeds, return it | |
return text_data | |
return process_image_pdf(file_path) # Otherwise, process as image | |
else: | |
print(f"β Unsupported file type: {file_path}") | |
return None | |
def extract_certificate_details(certificate_path): | |
certificate_text = process_file(certificate_path) | |
print(f"πΌοΈ Extracting details from certificate: {certificate_path}") | |
if not certificate_text: | |
print(f"β Error: Certificate text could not be extracted from {certificate_path}") | |
return None | |
# Ask GPT-4o to compare the texts | |
response = client.chat.completions.create( | |
model="gpt-4o", | |
response_format={ "type": "json_object" }, | |
seed=123, | |
temperature=0, | |
messages=[ | |
{ | |
"role": "developer", | |
"content": f"""Extract the following details from the certificate text in JSON format, leave blank if not found: | |
{{ | |
"Certificate Name": "", | |
"Certificate ID": "", | |
"Ship Name": "", | |
"Date of Issue": "", | |
"Expiration Date": "" | |
}} | |
Certificate Text: | |
{certificate_text} | |
""" | |
} | |
], | |
) | |
result = response.choices[0].message.content | |
result_json = json.loads(result) # Parse the result as JSON | |
certificate_name = result_json.get("Certificate Name", "") | |
certificate_id = result_json.get("Certificate ID", "") | |
ship_name = result_json.get("Ship Name", "") | |
date_of_issue = result_json.get("Date of Issue", "") | |
expiration_date = result_json.get("Expiration Date", "") | |
print(f"β Extracted details:\n- Certificate Name: {certificate_name}\n- Certificate ID: {certificate_id}\n- Ship Name: {ship_name}\n- Date of Issue: {date_of_issue}\n- Expiration Date: {expiration_date}") | |
return certificate_text, certificate_name, certificate_id, ship_name, date_of_issue, expiration_date | |
# Function to compare two certificates using AI | |
def compare_certificates(new_cert_details, old_cert_details): | |
# Ask GPT-4o to compare the texts | |
response = client.chat.completions.create( | |
model="gpt-4o", | |
messages=[ | |
{ | |
"role": "user", | |
"content": f"""Compare the two certificates below and provide a structured summary highlighting key differences in the format below: | |
### Comparison Summary: | |
- Identify differences in terms of: | |
- Certificate ID | |
- Date of Issue | |
- Expiration Date | |
- Highlight any changes in other key details, if applicable. | |
### Take Note: | |
- Clearly structure the output for easy reading | |
- Do not include any structural changes in the text, only content changes | |
### Old Certificate: | |
{old_cert_details} | |
### New Certificate: | |
{new_cert_details}""" | |
} | |
], | |
) | |
comparison_result = response.choices[0].message.content.strip() | |
return comparison_result | |
def gradio_upload_certificate(uploaded_file): | |
# Save uploaded file to local path immediately | |
file_ext = os.path.splitext(uploaded_file.name)[-1] | |
temp_path = f"temp_uploaded_file{file_ext}" | |
shutil.copy(uploaded_file, temp_path) | |
extracted = extract_certificate_details(temp_path) | |
if not extracted: | |
return "β Failed to extract certificate details." | |
certificate_text, certificate_name, certificate_id, ship_name, date_of_issue, expiration_date = extracted | |
if not all([certificate_name, ship_name]): | |
return "β Missing key fields, unable to rename or upload." | |
safe_cert_name = certificate_name.replace(" ", "_") | |
safe_ship_name = ship_name.replace(" ", "_") | |
save_dir = os.path.join("hf_dataset_upload", safe_ship_name, safe_cert_name) | |
os.makedirs(save_dir, exist_ok=True) | |
# Check for existing certificates in the directory | |
existing_files = [ | |
f for f in os.listdir(save_dir) if os.path.isfile(os.path.join(save_dir, f)) | |
] | |
if existing_files: | |
old_cert_path = os.path.join(save_dir, existing_files[0]) | |
print(f"π Existing certificate found: {old_cert_path}") | |
old_text, old_name, old_id, old_ship_name, old_date_of_issue, old_expiration_date = extract_certificate_details(old_cert_path) | |
if not old_text: | |
return "β Failed to process the existing certificate for comparison." | |
new_cert_details = { | |
"Certificate Name": certificate_name, | |
"Certificate ID": certificate_id, | |
"Ship Name": ship_name, | |
"Date of Issue": date_of_issue, | |
"Expiration Date": expiration_date, | |
"Certificate Text": certificate_text | |
} | |
old_cert_details = { | |
"Certificate Name": old_name, | |
"Certificate ID": old_id, | |
"Ship Name": old_ship_name, | |
"Date of Issue": old_date_of_issue, | |
"Expiration Date": old_expiration_date, | |
"Certificate Text": old_text | |
} | |
# Compare the old and new certificates | |
comparison_result = compare_certificates(new_cert_details, old_cert_details) | |
# Always delete the existing file before saving the new one | |
for existing_file in existing_files: | |
os.remove(os.path.join(save_dir, existing_file)) | |
# Remove the file from Hugging Face as well | |
hf_file_path = f"{safe_ship_name}/{safe_cert_name}/{existing_file}" | |
api = HfApi(token=os.getenv("HF_API_KEY")) | |
api.delete_file( | |
path_in_repo=hf_file_path, | |
repo_id="MikeMai/Certificates_Management", | |
repo_type="dataset", | |
) | |
# Replace the existing file with the uploaded file | |
new_filename = f"{safe_ship_name}_{safe_cert_name}{file_ext}" | |
new_path = os.path.join(save_dir, new_filename) | |
shutil.copy(temp_path, new_path) | |
print(f"β Replaced the existing file with the uploaded file: {new_path}") | |
api = HfApi(token=os.getenv("HF_API_KEY")) | |
api.upload_folder( | |
folder_path="hf_dataset_upload", | |
repo_id="MikeMai/Certificates_Management", | |
repo_type="dataset", | |
) | |
hf_path = f"https://huggingface.co/datasets/MikeMai/Certificates_Management/blob/main/{safe_ship_name}/{safe_cert_name}/{new_filename}" | |
return f""" | |
β **Certificate Uploaded Successfully! Existing Certificate** | |
π [View on Hugging Face Hub]({hf_path}) | |
**New Certificate Details**: | |
**Certificate Name**: {new_cert_details['Certificate Name']} | |
**Certificate ID**: {new_cert_details['Certificate ID']} | |
**Ship Name**: {new_cert_details['Ship Name']} | |
**Date of Issue**: {new_cert_details['Date of Issue'] or "N/A"} | |
**Expiration Date**: {new_cert_details['Expiration Date'] or "N/A"} | |
**Old Certificate Details**: | |
**Certificate Name**: {old_cert_details['Certificate Name']} | |
**Certificate ID**: {old_cert_details['Certificate ID']} | |
**Ship Name**: {old_cert_details['Ship Name']} | |
**Date of Issue**: {old_cert_details['Date of Issue'] or "N/A"} | |
**Expiration Date**: {old_cert_details['Expiration Date'] or "N/A"} | |
{comparison_result} | |
""" | |
else: | |
# Save the new file if it doesn't exist | |
new_filename = f"{safe_ship_name}_{safe_cert_name}{file_ext}" | |
new_path = os.path.join(save_dir, new_filename) | |
shutil.copy(temp_path, new_path) | |
api = HfApi(token=os.getenv("HF_API_KEY")) | |
api.upload_folder( | |
folder_path="hf_dataset_upload", | |
repo_id="MikeMai/Certificates_Management", | |
repo_type="dataset", | |
) | |
hf_path = f"https://huggingface.co/datasets/MikeMai/Certificates_Management/blob/main/{safe_ship_name}/{safe_cert_name}/{new_filename}" | |
return f""" | |
β **Certificate Uploaded Successfully!** | |
**Certificate Name**: {certificate_name} | |
**Certificate ID**: {certificate_id} | |
**Ship Name**: {ship_name} | |
**Date of Issue**: {date_of_issue or "N/A"} | |
**Expiration Date**: {expiration_date or "N/A"} | |
π [View on Hugging Face Hub]({hf_path}) | |
""" | |
# Launch Gradio UI | |
gr.Interface( | |
fn=gradio_upload_certificate, | |
inputs=gr.File(label="Upload Certificate (PDF or Image)"), | |
outputs=gr.Markdown(label="Upload Result"), | |
title="π Certificate Manager", | |
description="Upload a certificate to extract certificate details, rename, and store in respective folders.", | |
show_progress='full', | |
allow_flagging="never" | |
).launch() | |
# # Run the script with your files | |
# old_cert = "load_line_cert_old.jpg" # Change to your old cert file | |
# new_cert = "load_line_cert_new.pdf" # Change to your new cert file | |
# extract_certificate_details(new_cert) | |
# comparison_result = compare_certificates(old_cert, new_cert, True) | |
# print("\nπ AI-Based Structured Comparison:\n") | |
# print(comparison_result) | |
# Gradio Interface ------------------------------ | |
# import gradio as gr | |
# from gradio.themes.base import Base | |
# interface = gr.Interface( | |
# fn=compare_certificates, | |
# title="Certificate Comparison Summarizer", | |
# inputs=[gr.File(label="Old Certificate"), gr.File(label="New Certificate")], | |
# outputs=[gr.Textbox(label="Comparison Summary")], | |
# allow_flagging="never", | |
# theme=Base() | |
# ) | |
# interface.launch() |