import aiofiles
import asyncio
import base64
import fitz
import glob
import logging
import os
import pandas as pd
import pytz
import random
import re
import requests
import shutil
import streamlit as st
import time
import torch
import zipfile
from dataclasses import dataclass
from datetime import datetime
from diffusers import StableDiffusionPipeline
from io import BytesIO
from openai import OpenAI
from PIL import Image
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel
from typing import Optional
# OpenAI wizardry: summon your API magic!
client = OpenAI(
    api_key=os.getenv('OPENAI_API_KEY'),
    organization=os.getenv('OPENAI_ORG_ID')
)
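# Illustrative sanity check (not executed): assuming OPENAI_API_KEY (and optionally
# OPENAI_ORG_ID) are set in the environment, a minimal round trip with this client is:
#   client.chat.completions.create(model="gpt-4o-mini",
#                                  messages=[{"role": "user", "content": "ping"}],
#                                  max_tokens=5)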
# Logging activated: capturing chaos and calm!
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
log_records = []
class LogCaptureHandler(logging.Handler):
    def emit(self, record):
        log_records.append(record)
logger.addHandler(LogCaptureHandler())
# Streamlit styling: designing a cosmic interface!
st.set_page_config(
    page_title="AI Vision & SFT Titans",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/awacke1',
        'Report a Bug': 'https://huggingface.co/spaces/awacke1',
        'About': "AI Vision & SFT Titans: PDFs, OCR, Image Gen, Line Drawings, Custom Diffusion, and SFT on CPU!"
    }
)
st.session_state.setdefault('history', [])  # History: starting fresh if empty!
st.session_state.setdefault('builder', None)  # Builder: set up the builder if it's missing!
st.session_state.setdefault('model_loaded', False)  # Model Loaded: mark as not loaded by default!
st.session_state.setdefault('processing', {})  # Processing: initialize processing state as an empty dict!
st.session_state.setdefault('asset_checkboxes', {})  # Asset Checkboxes: default to an empty dictionary!
st.session_state.setdefault('downloaded_pdfs', {})  # Downloaded PDFs: start with no PDFs downloaded!
st.session_state.setdefault('unique_counter', 0)  # Unique Counter: initialize the counter to zero!
st.session_state.setdefault('selected_model_type', "Causal LM")  # Selected Model Type: default to "Causal LM"!
st.session_state.setdefault('selected_model', "None")  # Selected Model: set to "None" if not already set!
st.session_state.setdefault('cam0_file', None)  # Cam0 File: no file loaded by default!
st.session_state.setdefault('cam1_file', None)  # Cam1 File: no file loaded by default!
# ModelConfig: a blueprint for model configurations!
@dataclass
class ModelConfig:
    name: str
    base_model: str
    size: str
    domain: Optional[str] = None
    model_type: str = "causal_lm"
    @property
    def model_path(self): return f"models/{self.name}"  # Model Path: home base for brilliance!
# DiffusionConfig: where diffusion magic takes shape!
@dataclass
class DiffusionConfig:
    name: str
    base_model: str
    size: str
    domain: Optional[str] = None
    @property
    def model_path(self): return f"diffusion_models/{self.name}"  # Diffusion Path: let the diffusion begin!
class ModelBuilder:  # ModelBuilder: crafting AI wonders with wit!
    def __init__(self):  # Initialize: setting up the AI factory!
        self.config = None  # No config yet - waiting for genius!
        self.model = None  # Model not built until the magic happens!
        self.tokenizer = None  # Tokenizer: ready to speak in AI!
        self.jokes = [  # Jokes to keep the circuits laughing!
            "Why did the AI go to therapy? Too many layers to unpack!",
            "Training complete! Time for a binary coffee break.",
            "I told my neural network a joke; it couldn't stop dropping bits!",
            "I asked the AI for a pun, and it said, 'I'm punning on parallel processing!'",
            "Debugging my code is like a stand-up routine - always a series of exceptions!"
        ]
    def load_model(self, model_path: str, config: Optional[ModelConfig] = None):  # load_model: booting up genius!
        with st.spinner(f"Loading {model_path}..."):  # Spinner: genius loading...
            self.model = AutoModelForCausalLM.from_pretrained(model_path)
            self.tokenizer = AutoTokenizer.from_pretrained(model_path)
            if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token  # Fix pad token if missing!
            if config: self.config = config  # Config loaded - setting the stage!
            self.model.to("cuda" if torch.cuda.is_available() else "cpu")  # Deploy the model to its device!
        st.success(f"Model loaded! {random.choice(self.jokes)}")  # Success: model is now in orbit!
        return self
    def save_model(self, path: str):  # save_model: securing your masterpiece!
        with st.spinner("Saving model..."):  # Spinner: saving brilliance...
            os.makedirs(os.path.dirname(path), exist_ok=True); self.model.save_pretrained(path); self.tokenizer.save_pretrained(path)  # Directory magic: creating and saving!
        st.success(f"Model saved at {path}!")  # Success: your model is safely stored!
class DiffusionBuilder:
    def __init__(self):
        self.config = None
        self.pipeline = None
    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        with st.spinner(f"Loading diffusion model {model_path}..."):
            self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu")
            if config:
                self.config = config
        st.success("Diffusion model loaded!")
        return self
    def save_model(self, path: str):
        with st.spinner("Saving diffusion model..."):
            os.makedirs(os.path.dirname(path), exist_ok=True)
            self.pipeline.save_pretrained(path)
        st.success(f"Diffusion model saved at {path}!")
    def generate(self, prompt: str):
        return self.pipeline(prompt, num_inference_steps=20).images[0]
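# Usage sketch (not executed): the diffusion path is analogous, e.g.
#   DiffusionBuilder().load_model("OFA-Sys/small-stable-diffusion-v0").generate("a tiny titan")
# generate() returns a PIL image rendered with 20 inference steps on CPU.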
def generate_filename(sequence, ext="png"): return f"{sequence}_{time.strftime('%d%m%Y%H%M%S')}.{ext}"  # Generate a filename with timestamp magic!
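# Example (illustrative): generate_filename("cam0") might yield "cam0_25032025143007.png";
# the exact value depends on the clock since the timestamp format is %d%m%Y%H%M%S.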
def pdf_url_to_filename(url):
    return re.sub(r'[<>:"/\\|?*]', '_', url) + ".pdf"  # Convert the URL to a safe PDF filename - no hackers allowed!
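# Example (illustrative): pdf_url_to_filename("https://arxiv.org/pdf/1706.03762")
#   -> "https___arxiv.org_pdf_1706.03762.pdf"  (every <>:"/\|?* character becomes "_")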
def get_download_link(file_path, mime_type="application/pdf", label="Download"): return f'<a href="data:{mime_type};base64,{base64.b64encode(open(file_path, "rb").read()).decode()}" download="{os.path.basename(file_path)}">{label}</a>'  # Create a download link - click it like it's hot!
def zip_directory(directory_path, zip_path):
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: [zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.dirname(directory_path))) for root, _, files in os.walk(directory_path) for file in files]  # Zip directory: packing files faster than Santa on Christmas Eve!
def get_model_files(model_type="causal_lm"): return [d for d in glob.glob("models/*" if model_type == "causal_lm" else "diffusion_models/*") if os.path.isdir(d)] or ["None"]  # Get model files: hunting directories like a pro!
def get_gallery_files(file_types=["png", "pdf"]): return sorted(list({f for ext in file_types for f in glob.glob(f"*.{ext}")}))  # Get gallery files: finding art in a digital haystack!
def get_pdf_files(): return sorted(glob.glob("*.pdf"))  # Get PDF files: sorted and served - no paper cuts here!
# Download PDF: delivering docs faster than a caffeinated courier!
def download_pdf(url, output_path):
    try:
        response = requests.get(url, stream=True, timeout=10)
        if response.status_code != 200:
            return False
        with open(output_path, "wb") as f:  # Open once and stream chunks - reopening per chunk would clobber earlier data!
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return True
    except requests.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        return False
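# Usage sketch (not executed): callers branch on the boolean result, e.g.
#   if download_pdf(url, pdf_url_to_filename(url)): ...
# which is exactly what the Robo-Download button in the Download PDFs tab does.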
# Async PDF Snapshot: snap your PDF pages without blocking - juggle pages like a ninja!
async def process_pdf_snapshot(pdf_path, mode="single"):
    start_time = time.time(); status = st.empty(); status.text(f"Processing PDF Snapshot ({mode})... (0s)")
    try:
        doc = fitz.open(pdf_path); output_files = []
        if mode == "single": page = doc[0]; pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); output_file = generate_filename("single", "png"); pix.save(output_file); output_files.append(output_file)
        elif mode == "twopage":
            for i in range(min(2, len(doc))): page = doc[i]; pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); output_file = generate_filename(f"twopage_{i}", "png"); pix.save(output_file); output_files.append(output_file)
        elif mode == "allpages":
            for i in range(len(doc)): page = doc[i]; pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); output_file = generate_filename(f"page_{i}", "png"); pix.save(output_file); output_files.append(output_file)
        doc.close(); elapsed = int(time.time() - start_time); status.text(f"PDF Snapshot ({mode}) completed in {elapsed}s!"); update_gallery(); return output_files
    except Exception as e: status.error(f"Failed to process PDF: {str(e)}"); return []
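# Driven synchronously elsewhere, e.g. asyncio.run(process_pdf_snapshot(pdf_path, "twopage")),
# which returns the list of PNG filenames it wrote (such as "twopage_0_<timestamp>.png").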
# Async OCR: convert images to text while your app keeps on groovin' - no blocking, just rocking!
async def process_ocr(image, output_file):
    start_time = time.time(); status = st.empty(); status.text("Processing GOT-OCR2_0... (0s)")
    tokenizer = AutoTokenizer.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True); model = AutoModel.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True, torch_dtype=torch.float32).to("cpu").eval()
    temp_file = f"temp_{int(time.time())}.png"; image.save(temp_file)
    result = model.chat(tokenizer, temp_file, ocr_type='ocr'); os.remove(temp_file)
    elapsed = int(time.time() - start_time); status.text(f"GOT-OCR2_0 completed in {elapsed}s!")
    async with aiofiles.open(output_file, "w") as f: await f.write(result)
    update_gallery(); return result
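# Usage sketch (not executed): result = asyncio.run(process_ocr(Image.open("page.png"), "page.txt"))
# ("page.png"/"page.txt" are placeholder names; the OCR tab below generates real ones.)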
# Async Image Gen: your image genie - wishing up pictures while the event loop keeps the party going!
async def process_image_gen(prompt, output_file):
    start_time = time.time(); status = st.empty(); status.text("Processing Image Gen... (0s)")
    pipeline = st.session_state['builder'].pipeline if st.session_state.get('builder') and isinstance(st.session_state['builder'], DiffusionBuilder) and st.session_state['builder'].pipeline else StableDiffusionPipeline.from_pretrained("OFA-Sys/small-stable-diffusion-v0", torch_dtype=torch.float32).to("cpu")
    gen_image = pipeline(prompt, num_inference_steps=20).images[0]; elapsed = int(time.time() - start_time)
    status.text(f"Image Gen completed in {elapsed}s!"); gen_image.save(output_file); update_gallery(); return gen_image
# GPT-Image Interpreter: turning pixels into prose!
def process_image_with_prompt(image, prompt, model="gpt-4o-mini", detail="auto"):
    buffered = BytesIO(); image.save(buffered, format="PNG")  # Save the image in-memory as PNG - no hard drives harmed!
    img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")  # Encode the image data as Base64 for inline transmission!
    messages = [{"role": "user", "content": [{"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_str}", "detail": detail}}]}]  # Build the GPT conversation with your prompt and image!
    try:
        response = client.chat.completions.create(model=model, messages=messages, max_tokens=300); return response.choices[0].message.content  # Invoke GPT's magic and return its dazzling output!
    except Exception as e: return f"Error processing image with GPT: {str(e)}"  # Oops - GPT encountered a snag, so we catch and report the error!
# GPT-Text Alchemist: merging your prompt and text into digital gold!
def process_text_with_prompt(text, prompt, model="gpt-4o-mini"):
    messages = [{"role": "user", "content": f"{prompt}\n\n{text}"}]  # Construct the conversation input like a master wordsmith!
    try:
        response = client.chat.completions.create(model=model, messages=messages, max_tokens=300); return response.choices[0].message.content  # Summon GPT's wisdom and return its brilliant answer!
    except Exception as e: return f"Error processing text with GPT: {str(e)}"  # Oops, GPT stumbled - catching and reporting the error!
st.sidebar.subheader("Gallery Settings") # ๐จ Sidebar Gallery: Customize your creative space! | |
st.session_state.setdefault('gallery_size', 2) # ๐ง Setting default gallery size to 2 if it's missing! | |
st.session_state['gallery_size'] = st.sidebar.slider("Gallery Size", 1, 10, st.session_state['gallery_size'], key="gallery_size_slider") # ๐๏ธ Slide to adjust your gallery size and bring balance to your art! | |
# Gallery Updater: making your assets dazzle and disappear faster than a magician's rabbit!
def update_gallery():
    all_files = get_gallery_files()  # Grab all gallery files like a digital treasure hunt!
    if all_files:  # If assets are found, let the show begin!
        st.sidebar.subheader("Asset Gallery"); cols = st.sidebar.columns(2)  # Set up a stylish 2-column layout in the sidebar!
        for idx, file in enumerate(all_files[:st.session_state['gallery_size']]):  # Loop through your favorite files, limited by gallery size!
            with cols[idx % 2]:  # Alternate columns - because balance is key (and funny)!
                st.session_state['unique_counter'] += 1; unique_id = st.session_state['unique_counter']  # Increment the asset counter - every asset gets its moment in the spotlight!
                if file.endswith('.png'): st.image(Image.open(file), caption=os.path.basename(file), use_container_width=True)  # Display the image like a masterpiece!
                else:  # For PDFs, we snap their first page like a paparazzo!
                    doc = fitz.open(file); pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5)); img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples); st.image(img, caption=os.path.basename(file), use_container_width=True); doc.close()
                checkbox_key = f"asset_{file}_{unique_id}"  # Create a unique key - because every asset deserves VIP treatment!
                st.session_state['asset_checkboxes'][file] = st.checkbox("Use for SFT/Input", value=st.session_state['asset_checkboxes'].get(file, False), key=checkbox_key)  # Checkbox: pick your asset for magic (or SFT)!
                mime_type = "image/png" if file.endswith('.png') else "application/pdf"  # Determine the MIME type - like sorting your socks, but cooler!
                st.markdown(get_download_link(file, mime_type, "Snag It!"), unsafe_allow_html=True)  # Provide a download link - grab your asset faster than a flash sale!
                if st.button("Zap It!", key=f"delete_{file}_{unique_id}"):  # "Zap It!" button: because sometimes you just gotta make stuff disappear!
                    os.remove(file); st.session_state['asset_checkboxes'].pop(file, None); st.sidebar.success(f"Asset {os.path.basename(file)} vaporized!"); st.rerun()  # Delete the file and refresh the gallery - poof, it's gone!
#update_gallery()  # Launch the gallery update - let the art party commence! (Joke: Why did the asset cross the road? To get zapped on the other side!)
st.sidebar.subheader("Action Logs ๐") # ๐ Action Logs: Where our system whispers its secrets! | |
with st.sidebar: [st.write(f"{record.asctime} - {record.levelname} - {record.message}") for record in log_records] # ๐ Loop through log records and display them like diary entries! | |
st.sidebar.subheader("History ๐") # ๐ฐ๏ธ History: A walk down memory lane, one log at a time! | |
with st.sidebar: [st.write(entry) for entry in st.session_state['history']] # โณ Display every historic moment with style! | |
tabs = st.tabs(["Camera Snap", "Download PDFs", "Test OCR", "Build Titan", "Test Image Gen", "PDF Process", "Image Process", "MD Gallery"])  # Tabs: navigate your AI universe like a boss!
(tab_camera, tab_download, tab_ocr, tab_build, tab_imggen, tab_pdf_process, tab_image_process, tab_md_gallery) = tabs  # Unpack the tabs and get ready to explore - because even tabs need to party!
with tab_camera:
    st.header("Camera Snap")  # Header: let's capture those Kodak moments!
    st.subheader("Single Capture")  # Subheader: one snap at a time, no double exposure!
    cols = st.columns(2)  # Create two columns for double-camera action!
    with cols[0]:
        cam0_img = st.camera_input("Take a picture - Cam 0", key="cam0")  # Cam 0: say cheese!
        if cam0_img:
            filename = generate_filename("cam0")  # Filename for the Cam 0 snapshot generated!
            if st.session_state['cam0_file'] and os.path.exists(st.session_state['cam0_file']): os.remove(st.session_state['cam0_file'])  # Out with the old Cam 0 snap!
            with open(filename, "wb") as f: f.write(cam0_img.getvalue())  # Save the Cam 0 image like a boss!
            st.session_state['cam0_file'] = filename  # Update session state for the Cam 0 file!
            entry = f"Snapshot from Cam 0: {filename}"  # History entry: Cam 0 snapshot recorded!
            if entry not in st.session_state['history']:
                st.session_state['history'] = [e for e in st.session_state['history'] if not e.startswith("Snapshot from Cam 0:")] + [entry]  # Clean and update history!
            st.image(Image.open(filename), caption="Camera 0", use_container_width=True)  # Display the fresh Cam 0 image!
            logger.info(f"Saved snapshot from Camera 0: {filename}")  # Logging: Cam 0 snapshot saved!
            update_gallery()  # Refresh the gallery to show the new snap!
    with cols[1]:
        cam1_img = st.camera_input("Take a picture - Cam 1", key="cam1")  # Cam 1: capture your best side!
        if cam1_img:
            filename = generate_filename("cam1")  # Filename for the Cam 1 snapshot generated!
            if st.session_state['cam1_file'] and os.path.exists(st.session_state['cam1_file']): os.remove(st.session_state['cam1_file'])  # Out with the old Cam 1 snap!
            with open(filename, "wb") as f: f.write(cam1_img.getvalue())  # Save the Cam 1 image like a pro!
            st.session_state['cam1_file'] = filename  # Update session state for the Cam 1 file!
            entry = f"Snapshot from Cam 1: {filename}"  # History entry: Cam 1 snapshot recorded!
            if entry not in st.session_state['history']:
                st.session_state['history'] = [e for e in st.session_state['history'] if not e.startswith("Snapshot from Cam 1:")] + [entry]  # Clean and update history!
            st.image(Image.open(filename), caption="Camera 1", use_container_width=True)  # Display the fresh Cam 1 image!
            logger.info(f"Saved snapshot from Camera 1: {filename}")  # Logging: Cam 1 snapshot saved!
            update_gallery()  # Refresh the gallery to show the new snap!
# === Tab: Download PDFs ===
with tab_download:
    st.header("Download PDFs")  # Header: ready to snag PDFs like a digital ninja!
    if st.button("Examples"):  # Button: load up some scholarly URLs for instant fun!
        example_urls = ["https://arxiv.org/pdf/2308.03892", "https://arxiv.org/pdf/1912.01703", "https://arxiv.org/pdf/2408.11039", "https://arxiv.org/pdf/2109.10282", "https://arxiv.org/pdf/2112.10752", "https://arxiv.org/pdf/2308.11236", "https://arxiv.org/pdf/1706.03762", "https://arxiv.org/pdf/2006.11239", "https://arxiv.org/pdf/2305.11207", "https://arxiv.org/pdf/2106.09685", "https://arxiv.org/pdf/2005.11401", "https://arxiv.org/pdf/2106.10504"]; st.session_state['pdf_urls'] = "\n".join(example_urls)  # Examples loaded into session!
    url_input = st.text_area("Enter PDF URLs (one per line)", value=st.session_state.get('pdf_urls', ""), height=200)  # Text area: paste your PDF URLs here - no commas needed!
    # --- Download PDFs Tab (modified section) ---
    if st.button("Robo-Download"):
        urls = url_input.strip().split("\n")
        progress_bar = st.progress(0)
        status_text = st.empty()
        total_urls = len(urls)
        existing_pdfs = get_pdf_files()
        for idx, url in enumerate(urls):
            if url:
                output_path = pdf_url_to_filename(url)
                status_text.text(f"Fetching {idx + 1}/{total_urls}: {os.path.basename(output_path)}...")
                if output_path not in existing_pdfs:
                    if download_pdf(url, output_path):
                        st.session_state['downloaded_pdfs'][url] = output_path
                        logger.info(f"Downloaded PDF from {url} to {output_path}")
                        entry = f"Downloaded PDF: {output_path}"
                        if entry not in st.session_state['history']:
                            st.session_state['history'].append(entry)
                        st.session_state['asset_checkboxes'][output_path] = True
                    else:
                        st.error(f"Failed to nab {url}")
                else:
                    st.info(f"Already got {os.path.basename(output_path)}! Skipping...")
                    st.session_state['downloaded_pdfs'][url] = output_path
            progress_bar.progress((idx + 1) / total_urls)
        status_text.text("Robo-Download complete!")
        update_gallery()
    mode = st.selectbox("Snapshot Mode", ["Single Page (High-Res)", "Two Pages (High-Res)", "All Pages (High-Res)"], key="download_mode")  # Selectbox: choose your snapshot resolution!
    if st.button("Snapshot Selected"):
        selected_pdfs = [path for path in get_gallery_files()
                         if path.endswith('.pdf') and st.session_state['asset_checkboxes'].get(path, False)]
        if selected_pdfs:
            for pdf_path in selected_pdfs:
                if not os.path.exists(pdf_path):
                    st.warning(f"File not found: {pdf_path}. Skipping.")
                    continue
                mode_key = {"Single Page (High-Res)": "single",
                            "Two Pages (High-Res)": "twopage",
                            "All Pages (High-Res)": "allpages"}[mode]
                snapshots = asyncio.run(process_pdf_snapshot(pdf_path, mode_key))
                for snapshot in snapshots:
                    st.image(Image.open(snapshot), caption=snapshot, use_container_width=True)
                    st.session_state['asset_checkboxes'][snapshot] = True
            update_gallery()
        else:
            st.warning("No PDFs selected for snapshotting! Check some boxes in the sidebar.")
# === Tab: Test OCR ===
with tab_ocr:
    st.header("Test OCR")  # Header: time to turn images into text - magic for your eyeballs!
    all_files = get_gallery_files()  # Gather all assets from the gallery!
    if all_files:
        if st.button("OCR All Assets"):  # Button: blast OCR on every asset in one go!
            full_text = "# OCR Results\n\n"  # Start a full OCR report!
            for file in all_files:
                if file.endswith('.png'): image = Image.open(file)  # PNG? Open the image directly!
                else:
                    doc = fitz.open(file); pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples); doc.close()  # PDF? Grab a snapshot of the first page!
                output_file = generate_filename(f"ocr_{os.path.basename(file)}", "txt")  # Create a unique filename for the OCR text!
                result = asyncio.run(process_ocr(image, output_file))  # Run OCR asynchronously - non-blocking wizardry!
                full_text += f"## {os.path.basename(file)}\n\n{result}\n\n"  # Append the OCR result to the full report!
                entry = f"OCR Test: {file} -> {output_file}"  # Log this OCR operation!
                if entry not in st.session_state['history']: st.session_state['history'].append(entry)  # Update history if this entry is new!
            md_output_file = f"full_ocr_{int(time.time())}.md"  # Generate a markdown filename for the full OCR report!
            with open(md_output_file, "w") as f: f.write(full_text)  # Write the full OCR report to disk!
            st.success(f"Full OCR saved to {md_output_file}")  # Success: the full OCR report is saved!
            st.markdown(get_download_link(md_output_file, "text/markdown", "Download Full OCR Markdown"), unsafe_allow_html=True)  # Provide a download link for your OCR masterpiece!
        selected_file = st.selectbox("Select Image or PDF", all_files, key="ocr_select")  # Selectbox: pick an asset for individual OCR!
        if selected_file:
            if selected_file.endswith('.png'): image = Image.open(selected_file)  # Open the selected PNG image!
            else:
                doc = fitz.open(selected_file); pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples); doc.close()  # For PDFs, extract a snapshot from the first page!
            st.image(image, caption="Input Image", use_container_width=True)  # Display the selected asset for OCR review!
            if st.button("Run OCR", key="ocr_run"):  # Button: run OCR on the selected asset!
                output_file = generate_filename("ocr_output", "txt"); st.session_state['processing']['ocr'] = True  # Generate the output filename and flag processing!
                result = asyncio.run(process_ocr(image, output_file))  # Execute OCR asynchronously!
                entry = f"OCR Test: {selected_file} -> {output_file}"  # Create a log entry for this OCR run!
                if entry not in st.session_state['history']: st.session_state['history'].append(entry)  # Update history if new!
                st.text_area("OCR Result", result, height=200, key="ocr_result")  # Show the OCR result in a text area!
                st.success(f"OCR output saved to {output_file}"); st.session_state['processing']['ocr'] = False  # Success: OCR result saved and processing flag reset!
            if selected_file.endswith('.pdf') and st.button("OCR All Pages", key="ocr_all_pages"):  # Button: run OCR on every page of a PDF!
                doc = fitz.open(selected_file); full_text = f"# OCR Results for {os.path.basename(selected_file)}\n\n"  # Start a report for multi-page PDF OCR!
                for i in range(len(doc)):
                    pix = doc[i].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)  # Capture each page as an image!
                    output_file = generate_filename(f"ocr_page_{i}", "txt"); result = asyncio.run(process_ocr(image, output_file))  # Generate a filename and process OCR for the page!
                    full_text += f"## Page {i + 1}\n\n{result}\n\n"  # Append the page's OCR result to the report!
                    entry = f"OCR Test: {selected_file} Page {i + 1} -> {output_file}"  # Log this page's OCR operation!
                    if entry not in st.session_state['history']: st.session_state['history'].append(entry)  # Update history if this entry is new!
                md_output_file = f"full_ocr_{os.path.basename(selected_file)}_{int(time.time())}.md"  # Create a markdown filename for the full multi-page OCR report!
                with open(md_output_file, "w") as f: f.write(full_text)  # Write the full multi-page OCR report to disk!
                st.success(f"Full OCR saved to {md_output_file}")  # Success: the multi-page OCR report is saved!
                st.markdown(get_download_link(md_output_file, "text/markdown", "Download Full OCR Markdown"), unsafe_allow_html=True)  # Provide a download link for the multi-page OCR report!
    else:
        st.warning("No assets in gallery yet. Use Camera Snap or Download PDFs!")  # Warning: your gallery is empty - capture or download some assets first!
# === Tab: Build Titan ===
with tab_build:
    st.header("Build Titan")  # Header: build your own Titan - tiny models, huge ambitions!
    model_type = st.selectbox("Model Type", ["Causal LM", "Diffusion"], key="build_type")  # Choose your model flavor!
    base_model = st.selectbox(
        "Select Tiny Model",
        ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"] if model_type == "Causal LM"
        else ["OFA-Sys/small-stable-diffusion-v0", "stabilityai/stable-diffusion-2-base"]
    )  # Pick a tiny model based on your choice!
    model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}")  # Auto-generate a cool model name with a timestamp!
    domain = st.text_input("Target Domain", "general")  # Specify your target domain (default: general)!
    if st.button("Download Model"):  # Button: download your model and get ready to unleash the Titan!
        config = (ModelConfig if model_type == "Causal LM" else DiffusionConfig)(
            name=model_name, base_model=base_model, size="small", domain=domain
        )  # Create the model configuration on the fly!
        builder = ModelBuilder() if model_type == "Causal LM" else DiffusionBuilder()  # Instantiate the builder for your model type!
        builder.load_model(base_model, config); builder.save_model(config.model_path)  # Load and save the model - instant Titan assembly!
        st.session_state['builder'] = builder; st.session_state['model_loaded'] = True  # Update session state: the model is now loaded!
        st.session_state['selected_model_type'] = model_type; st.session_state['selected_model'] = config.model_path  # Store your selection for posterity!
        entry = f"Built {model_type} model: {model_name}"  # Log the build event in history!
        if entry not in st.session_state['history']: st.session_state['history'].append(entry)
        st.success(f"Model downloaded and saved to {config.model_path}!"); st.rerun()  # Success: Titan built, now re-run to refresh the interface!
# === Tab: Test Image Gen ===
with tab_imggen:
    st.header("Test Image Gen")  # Header: time to get creative with AI image generation!
    all_files = get_gallery_files()  # Retrieve all gallery assets for selection.
    if all_files:
        selected_file = st.selectbox("Select Image or PDF", all_files, key="gen_select")  # Select an asset to spark creativity!
        if selected_file:
            if selected_file.endswith('.png'):
                image = Image.open(selected_file)  # Directly open PNG images!
            else:
                doc = fitz.open(selected_file); pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
                image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples); doc.close()  # For PDFs, extract the first page as an image!
            st.image(image, caption="Reference Image", use_container_width=True)  # Display the chosen asset as reference.
            prompt = st.text_area("Prompt", "Generate a neon superhero version of this image", key="gen_prompt")  # Enter a creative prompt to transform the image!
            if st.button("Run Image Gen", key="gen_run"):  # Button: ignite the image generator!
                output_file = generate_filename("gen_output", "png"); st.session_state['processing']['gen'] = True  # Create the output filename and flag processing status.
                result = asyncio.run(process_image_gen(prompt, output_file))  # Run the async image generation - non-blocking magic in action!
                entry = f"Image Gen Test: {prompt} -> {output_file}"  # Log the image generation event!
                if entry not in st.session_state['history']: st.session_state['history'].append(entry)
                st.image(result, caption="Generated Image", use_container_width=True)  # Showcase the newly generated image!
                st.success(f"Image saved to {output_file}"); st.session_state['processing']['gen'] = False  # Success: your masterpiece is saved and processing is complete!
    else:
        st.warning("No images or PDFs in gallery yet. Use Camera Snap or Download PDFs!")  # Warning: no assets available - capture or download some first!
    update_gallery()  # Refresh the gallery to display any updates!
# === Updated Tab: PDF Process ===
with tab_pdf_process:
    st.header("PDF Process")  # Header: ready to transform your PDFs into text with GPT magic!
    st.subheader("Upload PDFs for GPT-based text extraction")  # Subheader: upload your PDFs and let the AI do the reading!
    gpt_models = ["gpt-4o", "gpt-4o-mini"]  # GPT Models: pick your AI wizard - more vision-capable models may join the party!
    selected_gpt_model = st.selectbox("Select GPT Model", gpt_models, key="pdf_gpt_model")  # Select your GPT model and let it work its charm!
    detail_level = st.selectbox("Detail Level", ["auto", "low", "high"], key="pdf_detail_level")  # Detail Level: fine-tune your extraction's precision!
    uploaded_pdfs = st.file_uploader("Upload PDF files", type=["pdf"], accept_multiple_files=True, key="pdf_process_uploader")  # Uploader: drag & drop your PDFs for processing!
    view_mode = st.selectbox("View Mode", ["Single Page", "Double Page"], key="pdf_view_mode")  # View Mode: choose single or double page snapshots!
    if st.button("Process Uploaded PDFs", key="process_pdfs"):  # Button: kick off the PDF processing extravaganza!
        combined_text = ""  # Initialize a blank slate for the GPT output!
        for pdf_file in uploaded_pdfs:  # Loop through each uploaded PDF file!
            pdf_bytes = pdf_file.read()  # Read the PDF bytes into memory!
            temp_pdf_path = f"temp_{pdf_file.name}"  # Create a temporary filename for processing!
            with open(temp_pdf_path, "wb") as f: f.write(pdf_bytes)  # Write the PDF to a temporary file!
            try:
                doc = fitz.open(temp_pdf_path)  # Open the temporary PDF document!
                st.write(f"Processing {pdf_file.name} with {len(doc)} pages")  # Log: display the file name and page count!
                if view_mode == "Single Page":  # Single Page Mode: process each page separately!
                    for i, page in enumerate(doc):
                        pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0))  # Create a high-res pixmap of the page!
                        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)  # Convert the pixmap to an image!
                        st.image(img, caption=f"{pdf_file.name} Page {i+1}")  # Display the page image!
                        gpt_text = process_image_with_prompt(
                            img, "Extract the electronic text from image", model=selected_gpt_model, detail=detail_level
                        )  # Run GPT to extract text from the image!
                        combined_text += f"\n## {pdf_file.name} - Page {i+1}\n\n{gpt_text}\n"  # Append the result to the combined text!
                else:  # Double Page Mode: process pages in pairs!
                    pages = list(doc)  # Convert the document's pages to a list!
                    for i in range(0, len(pages), 2):
                        if i+1 < len(pages):  # Process two pages if available!
                            pix1 = pages[i].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); img1 = Image.frombytes("RGB", [pix1.width, pix1.height], pix1.samples)  # Process the first page!
                            pix2 = pages[i+1].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); img2 = Image.frombytes("RGB", [pix2.width, pix2.height], pix2.samples)  # Process the second page!
                            total_width = img1.width + img2.width; max_height = max(img1.height, img2.height)  # Calculate dimensions for the combined image!
                            combined_img = Image.new("RGB", (total_width, max_height))  # Create a blank canvas for the two pages!
                            combined_img.paste(img1, (0, 0)); combined_img.paste(img2, (img1.width, 0))  # Paste the images side by side!
                            st.image(combined_img, caption=f"{pdf_file.name} Pages {i+1}-{i+2}")  # Display the combined image!
                            gpt_text = process_image_with_prompt(
                                combined_img, "Extract the electronic text from image", model=selected_gpt_model, detail=detail_level
                            )  # Extract text from the combined image!
                            combined_text += f"\n## {pdf_file.name} - Pages {i+1}-{i+2}\n\n{gpt_text}\n"  # Append the result to the combined text!
                        else:  # If there's an odd page out, process it solo!
                            pix = pages[i].get_pixmap(matrix=fitz.Matrix(2.0, 2.0)); img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)  # Process the single remaining page!
                            st.image(img, caption=f"{pdf_file.name} Page {i+1}")  # Display the solo page image!
                            gpt_text = process_image_with_prompt(
                                img, "Extract the electronic text from image", model=selected_gpt_model, detail=detail_level
                            )  # Run GPT extraction on the solo page!
                            combined_text += f"\n## {pdf_file.name} - Page {i+1}\n\n{gpt_text}\n"  # Append the result!
                doc.close()  # Close the PDF document to free up resources!
            except Exception as e:
                st.error(f"Error processing {pdf_file.name}: {str(e)}")  # Error: report any issues during processing!
            finally:
                os.remove(temp_pdf_path)  # Cleanup: remove the temporary PDF file!
        output_filename = generate_filename("processed_pdf", "md")  # Generate a unique filename for the Markdown output!
        with open(output_filename, "w", encoding="utf-8") as f: f.write(combined_text)  # Write the combined GPT text to the Markdown file!
        st.success(f"PDF processing complete. MD file saved as {output_filename}")  # Success: notify the user of completion!
        st.markdown(get_download_link(output_filename, "text/markdown", "Download Processed PDF MD"), unsafe_allow_html=True)  # Provide a download link for your processed file!
# === Updated Tab: Image Process ===
with tab_image_process:
    st.header("Image Process")  # Header: transform images into text with GPT magic!
    st.subheader("Upload Images for GPT-based OCR")  # Subheader: let your images speak for themselves!
    gpt_models = ["gpt-4o", "gpt-4o-mini"]  # GPT Models: choose your image wizard!
    selected_gpt_model = st.selectbox("Select GPT Model", gpt_models, key="img_gpt_model")  # Pick your GPT model for image processing!
    detail_level = st.selectbox("Detail Level", ["auto", "low", "high"], key="img_detail_level")  # Detail Level: set your extraction precision!
    prompt_img = st.text_input("Enter prompt for image processing", "Extract the electronic text from image", key="img_process_prompt")  # Prompt: tell GPT what to extract!
    uploaded_images = st.file_uploader("Upload image files", type=["png", "jpg", "jpeg"], accept_multiple_files=True, key="image_process_uploader")  # Uploader: drag & drop your images here!
    if st.button("Process Uploaded Images", key="process_images"):  # Button: fire up the image processing!
        combined_text = ""  # Initialize the combined text output!
        for img_file in uploaded_images:
            try:
                img = Image.open(img_file); st.image(img, caption=img_file.name)  # Display each uploaded image!
                gpt_text = process_image_with_prompt(img, prompt_img, model=selected_gpt_model, detail=detail_level)  # Process the image with GPT magic!
                combined_text += f"\n## {img_file.name}\n\n{gpt_text}\n"  # Append the GPT output with a file header!
            except Exception as e: st.error(f"Error processing image {img_file.name}: {str(e)}")  # Oops: report errors if any!
        output_filename = generate_filename("processed_image", "md")  # Generate a unique filename for the Markdown output!
        with open(output_filename, "w", encoding="utf-8") as f: f.write(combined_text)  # Save the combined GPT output!
        st.success(f"Image processing complete. MD file saved as {output_filename}")  # Success: notify the user!
        st.markdown(get_download_link(output_filename, "text/markdown", "Download Processed Image MD"), unsafe_allow_html=True)  # Provide a download link!
# === Updated Tab: MD Gallery ===
with tab_md_gallery:
    st.header("MD Gallery and GPT Processing")  # Header: where markdown meets GPT wizardry!
    gpt_models = ["gpt-4o", "gpt-4o-mini"]  # GPT Models: pick your processing partner!
    selected_gpt_model = st.selectbox("Select GPT Model", gpt_models, key="md_gpt_model")  # Select a GPT model for MD processing!
    md_files = sorted(glob.glob("*.md"))  # Gather all Markdown files in the directory!
    if md_files:
        st.subheader("Individual File Processing")  # Subheader: process files one at a time!
        cols = st.columns(2)  # Set up two columns for a balanced view!
        for idx, md_file in enumerate(md_files):
            with cols[idx % 2]:
                st.write(md_file)  # Show the filename!
                if st.button(f"Process {md_file}", key=f"process_md_{md_file}"):  # Button: process this file!
                    try:
                        with open(md_file, "r", encoding="utf-8") as f: content = f.read()  # Read the file content!
                        prompt_md = "Summarize this into markdown outline with emojis and number the topics 1..12"  # Prompt: summarize with style!
                        result_text = process_text_with_prompt(content, prompt_md, model=selected_gpt_model)  # Let GPT work its magic!
                        st.markdown(result_text)  # Display the GPT output!
                        output_filename = generate_filename(f"processed_{os.path.splitext(md_file)[0]}", "md")  # Create a unique output filename!
                        with open(output_filename, "w", encoding="utf-8") as f: f.write(result_text)  # Save the processed content!
                        st.markdown(get_download_link(output_filename, "text/markdown", f"Download {output_filename}"), unsafe_allow_html=True)  # Provide a download link!
                    except Exception as e: st.error(f"Error processing {md_file}: {str(e)}")  # Report errors if processing fails!
        st.subheader("Batch Processing")  # Subheader: combine and process multiple files at once!
        st.write("Select MD files to combine and process:")  # Instruction: choose files for batch processing!
        selected_md = {}  # Initialize the selection dictionary!
        for md_file in md_files: selected_md[md_file] = st.checkbox(md_file, key=f"checkbox_md_{md_file}")  # Create checkboxes for each file!
        batch_prompt = st.text_input("Enter batch processing prompt", "Summarize this into markdown outline with emojis and number the topics 1..12", key="batch_prompt")  # Batch prompt: set your summarization style!
        if st.button("Process Selected MD Files", key="process_batch_md"):  # Button: process the selected files!
            combined_content = ""  # Initialize the combined content string!
            for md_file, selected in selected_md.items():
                if selected:
                    try:
                        with open(md_file, "r", encoding="utf-8") as f: combined_content += f"\n## {md_file}\n" + f.read() + "\n"  # Append each selected file's content!
                    except Exception as e: st.error(f"Error reading {md_file}: {str(e)}")  # Report errors if file reading fails!
            if combined_content:
                result_text = process_text_with_prompt(combined_content, batch_prompt, model=selected_gpt_model)  # Process the batch with GPT!
                st.markdown(result_text)  # Display the combined GPT output!
                output_filename = generate_filename("batch_processed_md", "md")  # Generate a unique filename for the batch output!
                with open(output_filename, "w", encoding="utf-8") as f: f.write(result_text)  # Save the batch processed text!
                st.success(f"Batch processing complete. MD file saved as {output_filename}")  # Notify success!
                st.markdown(get_download_link(output_filename, "text/markdown", "Download Batch Processed MD"), unsafe_allow_html=True)  # Provide a download link!
            else:
                st.warning("No MD files selected.")  # Warning: no files were chosen for batch processing!
    else:
        st.warning("No MD files found.")  # Warning: no markdown files are available!