Spaces:
Sleeping
Sleeping
import gradio as gr | |
import torch | |
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration | |
from PIL import Image | |
import requests | |
import pandas as pd | |
import numpy as np | |
import uuid | |
import os | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 1. Load Qwen2-VL OCR Model & Processor (once at startup) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
MODEL_ID = "prithivMLmods/Qwen2-VL-OCR-2B-Instruct" | |
DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True) | |
model = Qwen2VLForConditionalGeneration.from_pretrained( | |
MODEL_ID, | |
trust_remote_code=True, | |
torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32, | |
).to(DEVICE).eval() | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 2. OCR Helper: Extract text from a single PIL image | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def run_qwen_ocr(pil_image: Image.Image) -> str: | |
""" | |
Use Qwen2-VL to OCR the given PIL image. | |
Returns extracted text. | |
""" | |
# Build prompt: text + image | |
user_message = [ | |
{"type": "text", "text": "OCR the text in the image."}, | |
{"type": "image", "image": pil_image}, | |
] | |
messages = [{"role": "user", "content": user_message}] | |
# Create full prompt | |
prompt_full = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True) | |
inputs = processor( | |
text=[prompt_full], | |
images=[pil_image], | |
return_tensors="pt", | |
padding=True, | |
).to(DEVICE) | |
outputs = model.generate(**inputs, max_new_tokens=1024) | |
decoded = processor.decode(outputs[0], skip_special_tokens=True).strip() | |
return decoded.replace("<|im_end|>", "").strip() | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 3. OpenLibrary Lookup Helper | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def query_openlibrary(title_text: str, author_text: str = None) -> dict | None: | |
""" | |
Query OpenLibrary by title (and optional author). | |
Returns a dict with title, author_name, publisher, first_publish_year. | |
""" | |
base_url = "https://openlibrary.org/search.json" | |
params = {"title": title_text} | |
if author_text: | |
params["author"] = author_text | |
try: | |
resp = requests.get(base_url, params=params, timeout=5) | |
resp.raise_for_status() | |
data = resp.json() | |
if data.get("docs"): | |
doc = data["docs"][0] | |
return { | |
"title": doc.get("title", ""), | |
"author_name": ", ".join(doc.get("author_name", [])), | |
"publisher": ", ".join(doc.get("publisher", [])), | |
"first_publish_year": doc.get("first_publish_year", ""), | |
} | |
except Exception as e: | |
print(f"OpenLibrary query failed: {e}") | |
return None | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 4. Main Processing: OCR β Parse β OpenLibrary β CSV/DF | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def process_image_list(filepaths: list[str]): | |
""" | |
Takes a list of file paths (each a single-cover image). | |
Runs OCR on each via Qwen2-VL, parses first two lines as title/author, | |
queries OpenLibrary, and returns a DataFrame + CSV path. | |
""" | |
records = [] | |
for path in filepaths: | |
try: | |
pil_img = Image.open(path).convert("RGB") | |
except Exception as e: | |
print(f"Failed to open image {path}: {e}") | |
continue | |
# 1) OCR | |
try: | |
ocr_text = run_qwen_ocr(pil_img) | |
except Exception as e: | |
print(f"OCR failed on {path}: {e}") | |
continue | |
# 2) Parse lines | |
lines = [line.strip() for line in ocr_text.splitlines() if line.strip()] | |
if not lines: | |
continue | |
title_guess = lines[0] | |
author_guess = lines[1] if len(lines) > 1 else None | |
# 3) OpenLibrary lookup | |
meta = query_openlibrary(title_guess, author_guess) | |
if meta: | |
records.append(meta) | |
else: | |
records.append({ | |
"title": title_guess, | |
"author_name": author_guess or "", | |
"publisher": "", | |
"first_publish_year": "", | |
}) | |
# 4) Build DataFrame | |
df = pd.DataFrame(records, columns=["title", "author_name", "publisher", "first_publish_year"]) | |
csv_bytes = df.to_csv(index=False).encode() | |
# 5) Write CSV to temp file | |
unique_name = f"books_{uuid.uuid4().hex}.csv" | |
temp_path = os.path.join("/tmp", unique_name) | |
with open(temp_path, "wb") as f: | |
f.write(csv_bytes) | |
return df, temp_path | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 5. Gradio Interface | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def build_interface(): | |
with gr.Blocks(title="Book Cover Scanner (Qwen2-VL OCR)") as demo: | |
gr.Markdown( | |
""" | |
# π Book Cover Scanner + Metadata Lookup | |
1. Upload **one or more** image files, each containing a single book cover. | |
2. The app will OCR each cover (via Qwen2-VL), take: | |
- the **first nonempty line** as a βtitleβ guess, and | |
- the **second nonempty line** (if present) as an βauthorβ guess, then | |
- query OpenLibrary once per image for metadata. | |
3. A table appears below with Title, Author(s), Publisher, Year. | |
4. Click βDownload CSVβ to export all results. | |
**Tips:** | |
- Use clear, high-contrast photos (text should be legible). | |
- Each image should contain exactly one book cover. | |
- If Qwen2-VL OCR fails on any image, that image is skipped. | |
""" | |
) | |
with gr.Row(): | |
file_input = gr.File( | |
label="Upload Book Cover(s)", | |
file_count="multiple", | |
type="filepath", | |
file_types=[".jpg", ".jpeg", ".png"] | |
) | |
run_button = gr.Button("OCR & Lookup") | |
output_table = gr.Dataframe( | |
headers=["title", "author_name", "publisher", "first_publish_year"], | |
label="Detected Books + Metadata", | |
datatype="pandas", | |
) | |
download_file = gr.File(label="Download CSV") | |
def on_run(filepaths): | |
# filepaths is a list of local file paths | |
df, csv_path = process_image_list(filepaths or []) | |
return df, csv_path | |
run_button.click( | |
fn=on_run, | |
inputs=[file_input], | |
outputs=[output_table, download_file], | |
) | |
return demo | |
if __name__ == "__main__": | |
build_interface().launch() | |