#!/usr/bin/env python3
import gradio as gr
import os
import hashlib
import traceback
import zipfile
import tempfile
import shutil
from pathlib import Path
import requests
import threading
from typing import List, Dict, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor, as_completed

# Global state for progress tracking, shared between worker threads and the UI
upload_progress = {"current": 0, "total": 0, "status": "", "files_processed": [], "errors": [], "final_summary": ""}
upload_lock = threading.Lock()

@dataclass
class ProcessResult:
    filename: str
    status: str  # "skipped", "uploaded", or "error"
    message: str
    file_hash: Optional[str] = None

def calculate_sha256(filepath: Path) -> str:
    """Calculate the SHA256 hash of a file."""
    sha256_hash = hashlib.sha256()
    with open(filepath, "rb") as f:
        # Read in 100MB chunks for better performance on large model files
        for byte_block in iter(lambda: f.read(104857600), b""):
            sha256_hash.update(byte_block)
    return sha256_hash.hexdigest()
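
# Example (hypothetical path) -- hashing an empty file yields the well-known
# SHA256 digest of empty input:
#
#   calculate_sha256(Path("/tmp/empty.bin"))
#   -> 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
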
def check_hash_exists(file_hash: str) -> bool:
    """Check if the file hash already exists on datadrones.com."""
    try:
        hash_request = requests.get(
            f"https://dl.datadrones.com/api/model/sha256sum/{file_hash}",
            timeout=10
        )
        return hash_request.status_code == 200
    except Exception as e:
        print(f"Error checking hash existence: {e}")
        return False

def find_by_hash(file_hash: str) -> Optional[Dict]:
    """Find metadata by hash from Civitai, falling back to CivitaiArchive."""
    # Get the Civitai API key from an environment variable (HuggingFace Spaces secret)
    civitai_api_key = os.getenv("CIVITAI_API_KEY")
    header = {
        "Content-Type": "application/json",
    }
    # Only add the Authorization header if an API key is available
    if civitai_api_key:
        header["Authorization"] = f"Bearer {civitai_api_key}"
    else:
        print("⚠️ Warning: CIVITAI_API_KEY not found in environment variables")
    print(f"Retrieving metadata by hash {file_hash}")

    # Try Civitai first
    try:
        response = requests.get(
            f"https://civitai.com/api/v1/model-versions/by-hash/{file_hash}",
            headers=header,
            timeout=15
        )
        if response.status_code == 200:
            return {"civitai": response.json()}
    except Exception as e:
        print(f"Civitai API error: {e}")

    # Try CivitaiArchive as a fallback
    try:
        response = requests.get(f"https://civitaiarchive.com/api/sha256/{file_hash}", timeout=15)
        if response.status_code == 200:
            return {"civitai": response.json()}
    except Exception as e:
        print(f"CivitaiArchive API error: {e}")

    return None
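
# Rough shape of the metadata this returns (a sketch, not the full schema --
# field names follow Civitai's public API; the values are illustrative):
#
#   {"civitai": {
#       "name": "...",                      # version name
#       "baseModel": "SDXL 1.0",
#       "images": [{"url": "https://..."}],
#       "model": {                          # nested on the by-hash (version) endpoint
#           "name": "...", "type": "LORA",
#           "nsfw": False, "tags": ["..."], "description": "..."
#       }
#   }}
#
# submit_to_datadrones() below handles both this version-shaped payload and a
# model-shaped payload that carries "modelVersions" at the top level.
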
def submit_to_datadrones(model_path: Path, metadata: Dict) -> bool:
    """Submit a file to datadrones.com."""
    image_url = None
    model_versions = None
    base_model = None
    tags = None
    model_type = None
    # Start with the model name if available
    description = ""
    model_name = None
    try:
        print(f"🚀 Starting upload of {model_path.name} to datadrones.com...")
        civitai = metadata.get("civitai", {})
        model_name = (metadata.get("model_name")
                      or civitai.get("name")
                      or metadata.get("name"))
        is_nsfw = civitai.get("nsfw", False)
        if civitai and "modelVersions" in civitai:
            model_versions = civitai.get("modelVersions")
        # Add a preview image if available
        if civitai and "images" in civitai and len(civitai["images"]) > 0:
            image_url = civitai["images"][0].get("url")
        if not image_url and model_versions:
            # Try the first model version instead
            images = model_versions[0].get("images") or []
            if images:
                image_url = images[0].get("url")
        if image_url:
            # Embed the preview image in the description
            description += f'\n\n<img src="{image_url}">'
        if civitai and "type" in civitai:
            model_type = civitai.get("type")
        # The by-hash endpoint returns a version object with a nested "model"
        if civitai and "model" in civitai:
            model = civitai["model"]
            model_type = model.get("type")
            is_nsfw = model.get("nsfw")
            model_name = model.get("name")
            model_description = model.get("description")
            tags = model.get("tags")
            if model_description:
                description += f"\n\n{model_description}"
        if model_name:
            description = f"{model_name} \n" + description
        if civitai and "description" in civitai:
            if description:
                description += f"\n\n{civitai['description']}"
        if not description:
            description = "Possibly deleted"
        if not tags and metadata.get("tags"):
            tags = metadata.get("tags", [])
        if not tags and civitai and "tags" in civitai:
            tags = civitai.get("tags", [])
        if isinstance(tags, list):
            tags = ",".join(tags)
        if civitai and "baseModel" in civitai:
            base_model = civitai.get("baseModel")
        if model_versions:
            base_model = model_versions[0].get("baseModel", base_model)
        if base_model == "Hunyuan Video":
            base_model = "HunyuanVideo"

        # Prepare form data for submission
        data = {
            "description": description,
            "base_model": base_model if base_model else "Other",
            "tags": tags if tags else "",
            "model_type": model_type if model_type else "LoRA",
            "is_nsfw": is_nsfw,
        }

        print(f"📋 Upload data for {model_path.name}:")
        print(f"  - Model name: {model_name}")
        print(f"  - Model type: {data['model_type']}")
        print(f"  - Base model: {data['base_model']}")
        print(f"  - NSFW: {data['is_nsfw']}")
        print(f"  - Tags: {data['tags']}")
        print(f"  - Image URL: {image_url}")
        print(f"  - Description length: {len(data['description'])} chars")
        print(f"  - File size: {model_path.stat().st_size / (1024*1024):.1f} MB")

        # Submit to datadrones.com; the explicit Host header routes the request
        # past Cloudflare to the upload backend
        with open(model_path, "rb") as f:
            files = {"file": f}
            headers = {'Host': 'up.datadrones.com'}
            print(f"📤 Making POST request to https://up.datadrones.com/upload for {model_path.name}...")
            response = requests.post("https://up.datadrones.com/upload", files=files, data=data, headers=headers, timeout=300)
        print(f"📡 Response for {model_path.name}:")
        print(f"  - Status code: {response.status_code}")
        if response.status_code != 200:
            print(f"  - Response text: {response.text}")
        return response.status_code == 200
    except Exception as e:
        print(f"💥 Exception during upload of {model_path.name}: {e}")
        traceback.print_exc()
        return False

def extract_model_files(uploaded_files: List) -> List[Path]:
    """Extract model files from uploaded files, handling both direct files and zip archives."""
    model_files = []
    temp_dir = Path(tempfile.mkdtemp())

    # Supported model file extensions
    supported_extensions = {'.safetensors', '.pt', '.bin'}

    for file_info in uploaded_files:
        file_path = Path(file_info.name)

        if file_path.suffix.lower() in supported_extensions:
            # Direct model file: copy into the temp dir
            dest_path = temp_dir / file_path.name
            shutil.copy2(file_path, dest_path)
            model_files.append(dest_path)
        elif file_path.suffix.lower() == '.zip':
            # Extract each archive into its own subdirectory so we only pick up
            # files from this zip, not files copied or extracted earlier
            extract_dir = Path(tempfile.mkdtemp(dir=temp_dir))
            try:
                with zipfile.ZipFile(file_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_dir)
                # Find all model files in the extracted content
                for extension in supported_extensions:
                    model_files.extend(extract_dir.rglob(f"*{extension}"))
            except Exception as e:
                print(f"Error extracting {file_path}: {e}")

    return model_files
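
# Note: extracted files live in a fresh tempfile.mkdtemp() directory; the
# caller (process_files_async) deletes them once uploads finish, so nothing
# here cleans up on success.
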
def process_single_file(model_file: Path) -> ProcessResult:
    """Process a single model file."""
    try:
        print(f"\n🔍 Processing file: {model_file.name}")

        # Check the file size (skip if over 4GB)
        file_size = model_file.stat().st_size
        if file_size > 4 * 1024 * 1024 * 1024:  # 4GB
            print(f"⏭️ Skipping {model_file.name} - over 4GB limit")
            return ProcessResult(
                filename=model_file.name,
                status="skipped",
                message="File over 4GB size limit"
            )

        # Calculate the hash
        print(f"🔢 Calculating hash for {model_file.name}...")
        file_hash = calculate_sha256(model_file)
        print(f"🔑 Hash: {file_hash}")

        # Check if the file already exists on datadrones
        print(f"🔍 Checking if {file_hash} already exists on datadrones.com...")
        if check_hash_exists(file_hash):
            print(f"⏭️ File {model_file.name} already exists on datadrones.com")
            return ProcessResult(
                filename=model_file.name,
                status="skipped",
                message="Already exists on datadrones.com",
                file_hash=file_hash
            )

        # Find metadata by hash
        print(f"🔍 Looking up metadata for {file_hash}...")
        metadata = find_by_hash(file_hash)
        if not metadata:
            print(f"❌ No metadata found for {model_file.name}")
            return ProcessResult(
                filename=model_file.name,
                status="error",
                message="No metadata found for this file",
                file_hash=file_hash
            )
        print(f"✅ Found metadata for {model_file.name}")

        # Submit to datadrones
        print(f"🚀 Attempting upload of {model_file.name} to datadrones.com...")
        if submit_to_datadrones(model_file, metadata):
            print(f"✅ Successfully uploaded {model_file.name} to datadrones.com")
            return ProcessResult(
                filename=model_file.name,
                status="uploaded",
                message="Successfully uploaded to datadrones.com",
                file_hash=file_hash
            )
        else:
            print(f"❌ Failed to upload {model_file.name} to datadrones.com")
            return ProcessResult(
                filename=model_file.name,
                status="error",
                message="Failed to upload to datadrones.com",
                file_hash=file_hash
            )
    except Exception as e:
        print(f"💥 Error processing {model_file.name}: {e}")
        traceback.print_exc()
        return ProcessResult(
            filename=model_file.name,
            status="error",
            message=f"Processing error: {str(e)}"
        )

def update_progress(current: int, total: int, status: str, file_result: ProcessResult = None, final_summary: str = None):
    """Update the global progress tracking state."""
    with upload_lock:
        upload_progress["current"] = current
        upload_progress["total"] = total
        upload_progress["status"] = status

        # Store the final summary when processing is complete
        if final_summary:
            upload_progress["final_summary"] = final_summary

        if file_result:
            upload_progress["files_processed"].append({
                "filename": file_result.filename,
                "status": file_result.status,
                "message": file_result.message,
                "hash": file_result.file_hash
            })
            if file_result.status == "error":
                upload_progress["errors"].append(f"{file_result.filename}: {file_result.message}")
def process_files_async(uploaded_files: List) -> str:
    """Process uploaded files in a background thread."""
    try:
        print("\n🚀 Starting bulk upload process...")

        # Reset progress
        with upload_lock:
            upload_progress.update({
                "current": 0,
                "total": 0,
                "status": "Extracting files...",
                "files_processed": [],
                "errors": [],
                "final_summary": ""
            })

        # Extract model files
        print("📦 Extracting model files from uploaded content...")
        model_files = extract_model_files(uploaded_files)
        total_files = len(model_files)
        print(f"📊 Found {total_files} model files to process")
        for i, file in enumerate(model_files, 1):
            print(f"  {i}. {file.name} ({file.stat().st_size / (1024*1024):.1f} MB)")

        if total_files == 0:
            print("❌ No supported model files found")
            return "No supported model files (.safetensors, .pt, .bin) found in uploaded content."

        update_progress(0, total_files, "Processing files...")

        # Process files with a thread pool for better throughput
        print(f"🚀 Processing {total_files} files with ThreadPoolExecutor...")
        results = []
        with ThreadPoolExecutor(max_workers=3) as executor:
            future_to_file = {
                executor.submit(process_single_file, file): file
                for file in model_files
            }
            for i, future in enumerate(as_completed(future_to_file), 1):
                result = future.result()
                results.append(result)
                print(f"📊 Completed {i}/{total_files}: {result.filename} -> {result.status}")
                update_progress(i, total_files, f"Processed {i}/{total_files} files", result)

        # Generate a summary
        uploaded_count = sum(1 for r in results if r.status == "uploaded")
        skipped_count = sum(1 for r in results if r.status == "skipped")
        error_count = sum(1 for r in results if r.status == "error")

        summary = f"""Processing Complete!

Total files: {total_files}
✅ Uploaded: {uploaded_count}
⏭️ Skipped: {skipped_count}
❌ Errors: {error_count}"""

        # Update progress with the final summary
        update_progress(total_files, total_files, "Complete", None, summary)
        print(f"🎉 Bulk upload completed: {uploaded_count} uploaded, {skipped_count} skipped, {error_count} errors")

        # Clean up temp files
        print("🧹 Cleaning up temporary files...")
        for file in model_files:
            try:
                if file.exists():
                    file.unlink()
                # Also remove the parent temp directory if it is now empty
                parent = file.parent
                if parent.exists() and not any(parent.iterdir()):
                    parent.rmdir()
            except Exception:
                pass

        return summary
    except Exception as e:
        error_msg = f"Processing failed: {str(e)}"
        print(f"💥 Bulk processing failed: {e}")
        traceback.print_exc()
        update_progress(0, 0, error_msg, None, error_msg)
        return error_msg

def get_progress_update():
    """Get the current progress status."""
    with upload_lock:
        if upload_progress["total"] == 0:
            return "No active uploads", ""

        current = upload_progress["current"]
        total = upload_progress["total"]
        status = upload_progress["status"]

        # Show the final summary once processing is complete
        if current == total and total > 0 and upload_progress["final_summary"]:
            progress_text = upload_progress["final_summary"]
        else:
            progress_text = f"Progress: {current}/{total} - {status}"

        # Build a detailed log
        log_lines = []
        for file_info in upload_progress["files_processed"][-10:]:  # Show the last 10
            status_emoji = {"uploaded": "✅", "skipped": "⏭️", "error": "❌"}.get(file_info["status"], "?")
            log_lines.append(f"{status_emoji} {file_info['filename']}: {file_info['message']}")

        if upload_progress["errors"]:
            log_lines.append(f"\nRecent Errors ({len(upload_progress['errors'])}):")
            log_lines.extend(upload_progress["errors"][-5:])  # Show the last 5 errors

        detailed_log = "\n".join(log_lines)
        return progress_text, detailed_log

def start_upload(files):
    """Start the upload process in a background thread."""
    if not files:
        return "No files selected", ""

    # Start processing in a background daemon thread so the UI returns immediately
    thread = threading.Thread(target=process_files_async, args=(files,))
    thread.daemon = True
    thread.start()

    return "Upload started! Check progress below...", ""

# Create the Gradio interface
def create_interface():
    with gr.Blocks(title="DataDrones Bulk Uploader", theme=gr.themes.Soft()) as iface:
        gr.Markdown("""
        # 🚀 DataDrones Bulk Uploader

        Upload multiple model files (`.safetensors`, `.pt`, `.bin`) or zip archives containing model files to datadrones.com.

        **Features:**
        - Supports direct model file uploads (.safetensors, .pt, .bin) and zip archives
        - Automatic hash checking to avoid duplicates
        - Metadata retrieval from Civitai and other sources
        - Real-time progress tracking
        - Concurrent processing for faster uploads
        """)

        with gr.Row():
            with gr.Column(scale=2):
                file_input = gr.File(
                    label="Select model files (.safetensors, .pt, .bin) or .zip archives",
                    file_count="multiple",
                    file_types=[".safetensors", ".pt", ".bin", ".zip"]
                )
                upload_btn = gr.Button("🚀 Start Upload", variant="primary", size="lg")
            with gr.Column(scale=1):
                gr.Markdown("""
                ### Instructions:
                1. Select model files (`.safetensors`, `.pt`, `.bin`) directly, or upload `.zip` archives containing model files
                2. Click "Start Upload" to begin processing
                3. Monitor progress in real time below

                **Note:** Files over 4GB will be skipped.
                """)

        gr.Markdown("---")

        with gr.Row():
            with gr.Column():
                progress_display = gr.Textbox(
                    label="Upload Progress",
                    value="Ready to upload",
                    interactive=False
                )
                refresh_btn = gr.Button("🔄 Refresh Progress", size="sm")
                detailed_log = gr.Textbox(
                    label="Detailed Log",
                    value="",
                    lines=15,
                    interactive=False
                )

        # Set up event handlers
        upload_btn.click(
            fn=start_upload,
            inputs=[file_input],
            outputs=[progress_display, detailed_log]
        )

        # Manual refresh for progress updates
        refresh_btn.click(
            fn=get_progress_update,
            outputs=[progress_display, detailed_log]
        )

    return iface

if __name__ == "__main__":
    app = create_interface()
    app.queue(max_size=10)  # Enable queuing for background processing
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )
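
# To exercise the pipeline without the UI (hypothetical local path -- the
# file must exist, and metadata lookup only succeeds if Civitai or
# CivitaiArchive indexes its hash):
#
#   from pathlib import Path
#   result = process_single_file(Path("/tmp/example_lora.safetensors"))
#   print(result.status, result.message)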