Spaces:
Sleeping
Sleeping
import os | |
import torch | |
import time | |
import threading | |
import json | |
import gc | |
from flask import Flask, request, jsonify, send_file, Response, stream_with_context | |
from werkzeug.utils import secure_filename | |
from PIL import Image | |
import io | |
import zipfile | |
import uuid | |
import traceback | |
from huggingface_hub import snapshot_download | |
from flask_cors import CORS | |
import numpy as np | |
import trimesh | |
from transformers import pipeline | |
from scipy.ndimage import gaussian_filter, uniform_filter, median_filter | |
from scipy import interpolate | |
import cv2 | |
# Flask application; CORS is enabled for all routes.
app = Flask(__name__)
CORS(app)

# Working directories and the upload extensions that are accepted.
UPLOAD_FOLDER = '/tmp/uploads'
RESULTS_FOLDER = '/tmp/results'
CACHE_DIR = '/tmp/huggingface'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}

# Create the working directories up front.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)

# Point the Hugging Face caches at CACHE_DIR.
# NOTE(review): these variables are set after huggingface_hub/transformers
# were imported at the top of the file -- confirm the libraries still honour
# them at that point.
os.environ.update({
    'HF_HOME': CACHE_DIR,
    'TRANSFORMERS_CACHE': os.path.join(CACHE_DIR, 'transformers'),
    'HF_DATASETS_CACHE': os.path.join(CACHE_DIR, 'datasets'),
})

app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16MB max
)

# In-memory job registry, keyed by job id.
processing_jobs = {}

# Depth model state; populated lazily by load_model().
depth_estimator = None
model_loaded = False
model_loading = False

# Processing limits.
TIMEOUT_SECONDS = 240  # 4 minutes max for processing
MAX_DIMENSION = 512  # Max image dimension to process
# TimeoutError for handling timeouts
class TimeoutError(Exception):
    """Signals that a processing step exceeded its time budget.

    Note that within this module the name shadows the builtin
    ``TimeoutError``; this class derives directly from ``Exception``.
    """
# Thread-safe timeout implementation
def process_with_timeout(function, args, timeout):
    """Run ``function(*args)`` in a daemon thread, waiting at most ``timeout`` seconds.

    Args:
        function: Callable to execute.
        args: Positional arguments, unpacked into the call.
        timeout: Maximum number of seconds to wait for the thread.

    Returns:
        A ``(result, error)`` tuple. On success ``error`` is ``None``. If the
        call raised, ``result`` is ``None`` and ``error`` is the exception. If
        the thread is still running after ``timeout`` seconds, ``result`` is
        ``None`` and ``error`` is a ``TimeoutError``; the thread itself is not
        stopped and keeps running in the background.
    """
    outcome = {'value': None, 'error': None, 'done': False}

    def worker():
        try:
            outcome['value'] = function(*args)
            outcome['done'] = True
        except Exception as exc:
            outcome['error'] = exc

    runner = threading.Thread(target=worker)
    runner.daemon = True
    runner.start()
    runner.join(timeout)

    # Still running and nothing delivered: report a timeout.
    if not outcome['done'] and runner.is_alive():
        return None, TimeoutError(f"Processing timed out after {timeout} seconds")
    if outcome['error']:
        return None, outcome['error']
    return outcome['value'], None
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    Names without a dot are rejected.
    """
    _, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS
# Enhanced image preprocessing with better detail preservation
def preprocess_image(image_path):
    """Load an image, bound its size and boost its local contrast.

    The image is converted to RGB, scaled down with Lanczos resampling so
    that neither side exceeds ``MAX_DIMENSION`` (aspect ratio preserved), and
    then contrast-enhanced by applying CLAHE to the L channel in LAB space.

    Args:
        image_path: Path of the image file to open.

    Returns:
        The preprocessed image as an RGB ``PIL.Image.Image``.
    """
    with Image.open(image_path) as img:
        img = img.convert("RGB")

        # Resize if the image is too large, preserving the aspect ratio.
        if img.width > MAX_DIMENSION or img.height > MAX_DIMENSION:
            if img.width > img.height:
                new_width = MAX_DIMENSION
                # max(1, ...): int() truncation would otherwise yield a
                # 0-pixel side for extremely elongated images, which resize
                # cannot handle.
                new_height = max(1, int(img.height * (MAX_DIMENSION / img.width)))
            else:
                new_height = MAX_DIMENSION
                new_width = max(1, int(img.width * (MAX_DIMENSION / img.height)))
            # High-quality Lanczos resampling for better detail preservation.
            img = img.resize((new_width, new_height), Image.LANCZOS)

        img_array = np.array(img)

        # Adaptive histogram equalization on the lightness channel only, so
        # contrast improves without shifting colours.
        if len(img_array.shape) == 3 and img_array.shape[2] == 3:
            lab = cv2.cvtColor(img_array, cv2.COLOR_RGB2LAB)
            lightness, a_channel, b_channel = cv2.split(lab)
            clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
            enhanced_lab = cv2.merge((clahe.apply(lightness), a_channel, b_channel))
            img_array = cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2RGB)

        img = Image.fromarray(img_array)
    return img
def load_model():
    """Return the shared depth-estimation pipeline, loading it on first use.

    The first caller downloads the ``Intel/dpt-large`` snapshot (up to three
    attempts, with the delay between attempts doubling from 5 seconds) and
    builds a ``depth-estimation`` pipeline on CUDA when available, otherwise
    on CPU. Callers that arrive while a load is in progress poll every 0.5
    seconds until it finishes.

    Returns:
        The loaded pipeline object.

    Raises:
        RuntimeError: If the load this call was waiting on ended without
            producing a model.
        Exception: Any error raised by the download or pipeline construction
            is logged and re-raised.
    """
    global depth_estimator, model_loaded, model_loading

    if model_loaded:
        return depth_estimator

    if model_loading:
        # Wait for model to load if it's already in progress
        while model_loading and not model_loaded:
            time.sleep(0.5)
        # The other loader may have failed; surface that here instead of
        # handing back None and failing later with an unrelated error.
        if not model_loaded:
            raise RuntimeError("Model failed to load")
        return depth_estimator

    try:
        model_loading = True
        print("Starting model loading...")

        # Using DPT-Large which provides better detail than DPT-Hybrid
        # Alternatively, consider "vinvino02/glpn-nyu" for different detail characteristics
        model_name = "Intel/dpt-large"

        # Download model with retry mechanism
        max_retries = 3
        retry_delay = 5
        for attempt in range(max_retries):
            try:
                snapshot_download(
                    repo_id=model_name,
                    cache_dir=CACHE_DIR,
                    resume_download=True,
                )
                break
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"Download attempt {attempt+1} failed: {str(e)}. Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    raise

        device = "cuda" if torch.cuda.is_available() else "cpu"

        # Load depth estimator pipeline (-1 selects the CPU)
        depth_estimator = pipeline(
            "depth-estimation",
            model=model_name,
            device=device if device == "cuda" else -1,
            cache_dir=CACHE_DIR
        )

        # Optimize memory usage
        if device == "cuda":
            torch.cuda.empty_cache()

        model_loaded = True
        print(f"Model loaded successfully on {device}")
        return depth_estimator
    except Exception as e:
        print(f"Error loading model: {str(e)}")
        print(traceback.format_exc())
        raise
    finally:
        # Always clear the flag so waiting callers stop polling.
        model_loading = False
# Enhanced depth processing function to improve detail quality
def enhance_depth_map(depth_map, detail_level='medium'):
    """Normalise a depth map to [0, 1] and sharpen or smooth it.

    Args:
        depth_map: Any array-like depth map (numpy array, PIL image, ...).
            Inputs with a third axis are reduced to 2-D by averaging over it
            (or by taking the only channel when that axis has length 1).
        detail_level: ``'high'`` applies strong unsharp masking followed by a
            two-scale Gaussian recombination; ``'medium'`` applies mild
            unsharp masking and light smoothing; any other value applies
            Gaussian noise reduction only.

    Returns:
        A 2-D ``float32`` array with values clipped to [0, 1]. A depth map
        with no usable range (1st and 99th percentiles equal) yields all
        zeros.
    """
    depth = np.asarray(depth_map)

    # Make sure the depth map is 2D
    if depth.ndim > 2:
        depth = np.mean(depth, axis=2) if depth.shape[2] > 1 else depth[:, :, 0]

    enhanced_depth = depth.astype(np.float32)

    # Percentile clipping is more stable against outliers than min/max.
    p_low, p_high = (float(p) for p in np.percentile(enhanced_depth, [1, 99]))
    if not p_high > p_low:
        # Flat (or invalid) map: there is nothing to normalise, and leaving
        # the raw values in place would make the output depend on the
        # input's units.
        return np.zeros_like(enhanced_depth)

    enhanced_depth = (np.clip(enhanced_depth, p_low, p_high) - p_low) / (p_high - p_low)

    if detail_level == 'high':
        # Unsharp masking for edge enhancement.
        blurred = gaussian_filter(enhanced_depth, sigma=1.5)
        enhanced_depth = enhanced_depth + 1.5 * (enhanced_depth - blurred)

        # Approximate an edge-preserving filter: a lightly smoothed base plus
        # the detail removed by a wider blur.
        smooth_fine = gaussian_filter(enhanced_depth, sigma=0.5)
        smooth_coarse = gaussian_filter(enhanced_depth, sigma=2.0)
        enhanced_depth = smooth_fine + 1.2 * (enhanced_depth - smooth_coarse)
    elif detail_level == 'medium':
        # Mild unsharp masking, then light smoothing to reduce noise.
        blurred = gaussian_filter(enhanced_depth, sigma=1.0)
        enhanced_depth = enhanced_depth + 0.8 * (enhanced_depth - blurred)
        enhanced_depth = gaussian_filter(enhanced_depth, sigma=0.5)
    else:  # low
        # Noise reduction only.
        enhanced_depth = gaussian_filter(enhanced_depth, sigma=0.7)

    # Sharpening can overshoot; bring values back into range.
    return np.clip(enhanced_depth, 0, 1)
# Convert depth map to 3D mesh with significantly enhanced detail
def depth_to_mesh(depth_map, image, resolution=100, detail_level='medium'):
    """Build a vertex-coloured height-field mesh from a depth map.

    The depth map is enhanced with ``enhance_depth_map``, resampled on a
    ``resolution`` x ``resolution`` grid with a bivariate spline, scaled in
    depth according to ``detail_level`` and triangulated two triangles per
    grid cell.

    Args:
        depth_map: Depth map accepted by ``enhance_depth_map``.
        image: Optional colour source (PIL image or array-like, grayscale,
            RGB or RGBA). When given, each vertex receives a bilinearly
            sampled colour. Pass ``None`` for an uncoloured mesh.
        resolution: Number of vertices along each side of the grid (>= 2).
        detail_level: ``'high'``, ``'medium'`` or anything else (treated as
            low). Controls enhancement, depth scaling (2.5 / 2.0 / 1.5) and
            whether the result is Laplacian-smoothed (all levels but high).

    Returns:
        A ``trimesh.Trimesh``.

    Raises:
        ValueError: If ``resolution`` is smaller than 2.
    """
    # Local import so the block does not depend on how the package exposes
    # its submodules at import time.
    from trimesh import smoothing

    if resolution < 2:
        raise ValueError("resolution must be at least 2")

    enhanced_depth = enhance_depth_map(depth_map, detail_level)
    h, w = enhanced_depth.shape

    # Sampling grid in depth-map pixel coordinates.
    x = np.linspace(0, w - 1, resolution)
    y = np.linspace(0, h - 1, resolution)
    x_grid, y_grid = np.meshgrid(x, y)

    # Cubic spline where possible; the degree is lowered for maps too small
    # to support it.
    interp_func = interpolate.RectBivariateSpline(
        np.arange(h), np.arange(w), enhanced_depth,
        kx=min(3, h - 1), ky=min(3, w - 1)
    )
    z_values = interp_func(y, x, grid=True)

    if detail_level == 'high':
        # Push depth differences further where the local gradient is large.
        dx = np.gradient(z_values, axis=1)
        dy = np.gradient(z_values, axis=0)
        gradient_magnitude = np.sqrt(dx**2 + dy**2)
        edge_mask = np.clip(gradient_magnitude * 5, 0, 0.2)  # Scale and limit effect
        z_values = z_values + edge_mask * (z_values - gaussian_filter(z_values, sigma=1.0))

    # Rescale between the 2nd and 98th percentiles to ignore outliers.
    z_min, z_max = np.percentile(z_values, [2, 98])
    if z_max > z_min:
        z_values = (z_values - z_min) / (z_max - z_min)

    if detail_level == 'high':
        z_scaling = 2.5  # More pronounced depth variations
    elif detail_level == 'medium':
        z_scaling = 2.0  # Standard depth
    else:
        z_scaling = 1.5  # More subtle depth variations
    z_values = z_values * z_scaling

    # Normalize x and y coordinates to roughly -1..1.
    x_grid = (x_grid / w - 0.5) * 2.0
    y_grid = (y_grid / h - 0.5) * 2.0

    # y and z are negated when assembling the vertices.
    vertices = np.vstack([x_grid.flatten(), -y_grid.flatten(), -z_values.flatten()]).T

    # Corner indices of every grid cell, in row-major cell order.
    index_grid = np.arange(resolution * resolution).reshape(resolution, resolution)
    p1 = index_grid[:-1, :-1].ravel()
    p2 = index_grid[:-1, 1:].ravel()
    p3 = index_grid[1:, :-1].ravel()
    p4 = index_grid[1:, 1:].ravel()
    v1, v2, v3, v4 = vertices[p1], vertices[p2], vertices[p3], vertices[p4]

    # Compare the normals of the two triangles of the standard split; cells
    # where they disagree are split along the other diagonal instead.
    norm1 = np.cross(v2 - v1, v4 - v1)
    norm2 = np.cross(v4 - v3, v1 - v3)
    consistent = np.einsum('ij,ij->i', norm1, norm2) >= 0

    standard = np.stack(
        [np.stack([p1, p2, p4], axis=1), np.stack([p1, p4, p3], axis=1)], axis=1
    )
    alternative = np.stack(
        [np.stack([p1, p2, p3], axis=1), np.stack([p2, p4, p3], axis=1)], axis=1
    )
    faces = np.where(consistent[:, None, None], standard, alternative).reshape(-1, 3)

    mesh = trimesh.Trimesh(vertices=vertices, faces=faces)

    # "is not None" rather than truthiness: a numpy array has no single
    # truth value.
    if image is not None:
        pixels = np.asarray(image).astype(np.float64)
        if pixels.ndim == 2:
            pixels = pixels[:, :, None]
        img_h, img_w = pixels.shape[:2]

        # Image coordinates of every grid row/column. Colours are sampled for
        # any resolution, including grids denser than the image.
        img_x = np.arange(resolution) * (img_w - 1) / (resolution - 1)
        img_y = np.arange(resolution) * (img_h - 1) / (resolution - 1)
        x0 = img_x.astype(int)
        y0 = img_y.astype(int)
        x1 = np.minimum(x0 + 1, img_w - 1)
        y1 = np.minimum(y0 + 1, img_h - 1)
        wx = (img_x - x0)[None, :, None]
        wy = (img_y - y0)[:, None, None]

        # Bilinear interpolation for smooth colour transitions.
        blended = (
            (1 - wx) * (1 - wy) * pixels[np.ix_(y0, x0)]
            + wx * (1 - wy) * pixels[np.ix_(y0, x1)]
            + (1 - wx) * wy * pixels[np.ix_(y1, x0)]
            + wx * wy * pixels[np.ix_(y1, x1)]
        )
        blended = np.clip(blended, 0, 255).astype(np.uint8).reshape(resolution * resolution, -1)

        vertex_colors = np.full((vertices.shape[0], 4), 255, dtype=np.uint8)
        channels = blended.shape[1]
        if channels == 4:  # RGBA
            vertex_colors[:, :] = blended
        elif channels == 3:  # RGB, opaque alpha
            vertex_colors[:, :3] = blended
        else:  # grayscale replicated to RGB, opaque alpha
            vertex_colors[:, :3] = blended[:, :1]
        mesh.visual.vertex_colors = vertex_colors

    if detail_level != 'high':
        # One pass of Laplacian smoothing to soften staircase artifacts.
        # The volume constraint is disabled because the mesh is an open
        # surface, not a closed solid.
        mesh = smoothing.filter_laplacian(mesh, iterations=1, volume_constraint=False)

    # Calculate and fix normals for better rendering
    mesh.fix_normals()
    return mesh
def health_check():
    """Report service health, the model label and the compute device in use.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    payload = {
        "status": "healthy",
        "model": "Enhanced Depth-Based 3D Model Generator (DPT-Large)",
        "device": device,
    }
    return jsonify(payload), 200
def progress(job_id):
    """Stream the progress of a job as server-sent events.

    Emits the current progress immediately, then one event per progress
    change (polled every 0.5 seconds) while the job is processing, and a
    final ``completed`` or ``error`` event. Roughly every 30 seconds the
    worker thread is checked; if it has died the job is marked as failed.

    Args:
        job_id: Identifier of the job to follow.

    Returns:
        A streaming ``text/event-stream`` response. An unknown job yields a
        single ``{'error': 'Job not found'}`` event.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    def sse(payload):
        # One server-sent event carrying a JSON payload.
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        job = processing_jobs.get(job_id)
        if job is None:
            yield sse({'error': 'Job not found'})
            return

        # Send initial progress
        last_progress = job['progress']
        yield sse({'status': 'processing', 'progress': last_progress})

        check_count = 0
        while job['status'] == 'processing':
            if job['progress'] != last_progress:
                last_progress = job['progress']
                yield sse({'status': 'processing', 'progress': last_progress})
            time.sleep(0.5)
            check_count += 1
            # Every 60 polls (about 30 seconds) make sure the worker is still alive.
            if check_count > 60:
                if 'thread_alive' in job and not job['thread_alive']():
                    job['status'] = 'error'
                    job['error'] = 'Processing thread died unexpectedly'
                    break
                check_count = 0

        # Send final status
        if job['status'] == 'completed':
            final = {
                'status': 'completed',
                'progress': 100,
                'result_url': job['result_url'],
                'preview_url': job['preview_url'],
            }
            # Comparison jobs publish one URL per detail level under
            # 'result_urls' instead of a single result_url.
            if 'result_urls' in job:
                final['result_urls'] = job['result_urls']
            yield sse(final)
        else:
            yield sse({'status': 'error', 'error': job['error']})

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
def convert_image_to_3d():
    """Accept an uploaded image and start a background image-to-3D job.

    Form fields:
        image: The image file (required; extension must be allowed).
        mesh_resolution: Integer, clamped to 50-200 (default 100). It is then
            scaled by 1.5 for ``high`` and 0.7 for ``low`` detail, staying
            within 50-200.
        output_format: ``obj`` (delivered as a zip) or ``glb`` (default obj).
        detail_level: ``low``, ``medium`` or ``high`` (default medium).

    Returns:
        ``({"job_id": ...}, 202)`` once the job is queued, or a 400 response
        describing the invalid input.

    The worker thread updates the job's entry in ``processing_jobs`` as it
    goes and always removes the uploaded temporary file when it finishes,
    whether it succeeded or failed.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"error": "No image selected"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"}), 400

    # Optional parameters with defaults.
    # (A 'texture_quality' field may be sent but is not used by the pipeline.)
    try:
        requested_resolution = int(request.form.get('mesh_resolution', 100))
    except ValueError:
        return jsonify({"error": "Invalid parameter values"}), 400
    output_format = request.form.get('output_format', 'obj').lower()
    detail_level = request.form.get('detail_level', 'medium').lower()

    if output_format not in ['obj', 'glb']:
        return jsonify({"error": "Unsupported output format. Use 'obj' or 'glb'"}), 400
    if detail_level not in ['low', 'medium', 'high']:
        return jsonify({"error": "Unsupported detail level. Use 'low', 'medium' or 'high'"}), 400

    # Keep the grid within 50-200: tiny or negative values cannot produce a
    # mesh and large ones are too expensive.
    mesh_resolution = max(50, min(requested_resolution, 200))
    if detail_level == 'high':
        mesh_resolution = min(int(mesh_resolution * 1.5), 200)
    elif detail_level == 'low':
        mesh_resolution = max(int(mesh_resolution * 0.7), 50)

    job_id = str(uuid.uuid4())
    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    os.makedirs(output_dir, exist_ok=True)

    # Save the uploaded file under a job-specific name.
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}")
    file.save(filepath)

    job = {
        'status': 'processing',
        'progress': 0,
        'result_url': None,
        'preview_url': None,
        'error': None,
        'output_format': output_format,
        'created_at': time.time()
    }
    processing_jobs[job_id] = job

    def fail(message):
        # Store the message before flipping the status so a reader that sees
        # 'error' always finds the text.
        job['error'] = message
        job['status'] = 'error'

    def estimate_depth(model, image):
        # Run the depth pipeline and return the depth map as a numpy array.
        depth_map = model(image)["depth"]
        if isinstance(depth_map, torch.Tensor):
            depth_map = depth_map.cpu().numpy()
        elif hasattr(depth_map, 'numpy'):
            depth_map = depth_map.numpy()
        elif isinstance(depth_map, Image.Image):
            depth_map = np.array(depth_map)
        return depth_map

    def export_mesh(mesh):
        # Write the mesh in the requested format into output_dir.
        if output_format == 'obj':
            obj_path = os.path.join(output_dir, "model.obj")
            mesh.export(
                obj_path,
                file_type='obj',
                include_normals=True,
                include_texture=True
            )
            # Bundle the OBJ with its material and texture files, if any.
            zip_path = os.path.join(output_dir, "model.zip")
            with zipfile.ZipFile(zip_path, 'w') as zipf:
                zipf.write(obj_path, arcname="model.obj")
                for extra in ("model.mtl", "model.png"):
                    extra_path = os.path.join(output_dir, extra)
                    if os.path.exists(extra_path):
                        zipf.write(extra_path, arcname=extra)
        else:  # glb
            mesh.export(os.path.join(output_dir, "model.glb"), file_type='glb')

    def process_image():
        worker = threading.current_thread()
        job['thread_alive'] = worker.is_alive
        try:
            job['progress'] = 5
            image = preprocess_image(filepath)
            job['progress'] = 10

            try:
                model = load_model()
            except Exception as e:
                fail(f"Error loading model: {str(e)}")
                return
            job['progress'] = 30

            try:
                depth_map, error = process_with_timeout(
                    estimate_depth, [model, image], TIMEOUT_SECONDS
                )
                if error:
                    if isinstance(error, TimeoutError):
                        fail(f"Processing timed out after {TIMEOUT_SECONDS} seconds")
                        return
                    raise error
                job['progress'] = 60

                mesh = depth_to_mesh(depth_map, image, resolution=mesh_resolution, detail_level=detail_level)
                job['progress'] = 80
            except Exception as e:
                fail(f"Error during processing: {str(e)}")
                print(f"Error processing job {job_id}: {str(e)}")
                print(traceback.format_exc())
                return

            try:
                export_mesh(mesh)
                job['result_url'] = f"/download/{job_id}"
                job['preview_url'] = f"/preview/{job_id}"
                job['completed_at'] = time.time()
                job['progress'] = 100
                job['status'] = 'completed'
                print(f"Job {job_id} completed successfully")
            except Exception as e:
                fail(f"Error exporting model: {str(e)}")
                print(f"Error exporting model for job {job_id}: {str(e)}")
                print(traceback.format_exc())
        except Exception as e:
            # The traceback goes to the server log only, not to the client.
            fail(str(e))
            print(f"Error processing job {job_id}: {str(e)}")
            print(traceback.format_exc())
        finally:
            # Runs on every exit path so the upload never lingers.
            try:
                if os.path.exists(filepath):
                    os.remove(filepath)
            except OSError as e:
                print(f"Error removing temporary file {filepath}: {str(e)}")
            # Force garbage collection to free memory
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    processing_thread = threading.Thread(target=process_image)
    processing_thread.daemon = True
    processing_thread.start()

    # Return job ID immediately
    return jsonify({"job_id": job_id}), 202  # 202 Accepted
def download_model(job_id):
    """Send the finished model of a job as a file download.

    OBJ jobs are served as ``model.zip``, all other jobs as ``model.glb``.
    Responds with 404 when the job is unknown, not completed, or its file is
    missing.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404

    # The job record tells which artifact was produced.
    if job.get('output_format', 'obj') == 'obj':
        artifact = "model.zip"
    else:  # glb
        artifact = "model.glb"

    artifact_path = os.path.join(RESULTS_FOLDER, job_id, artifact)
    if os.path.exists(artifact_path):
        return send_file(artifact_path, as_attachment=True, download_name=artifact)
    return jsonify({"error": "File not found"}), 404
def preview_model(job_id):
    """Serve the finished model of a job inline for previewing.

    OBJ jobs return ``model.obj`` as ``model/obj``; all other jobs return
    ``model.glb`` as ``model/gltf-binary``. Responds with 404 when the job is
    unknown, not completed, or its file is missing.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404

    if job.get('output_format', 'obj') == 'obj':
        artifact, mimetype = "model.obj", 'model/obj'
    else:  # glb
        artifact, mimetype = "model.glb", 'model/gltf-binary'

    artifact_path = os.path.join(RESULTS_FOLDER, job_id, artifact)
    if os.path.exists(artifact_path):
        return send_file(artifact_path, mimetype=mimetype)
    return jsonify({"error": "Model file not found"}), 404
# Cleanup old jobs periodically
def cleanup_old_jobs():
    """Delete expired jobs and schedule the next cleanup in 5 minutes.

    Completed jobs expire one hour after creation and failed jobs after 30
    minutes. For each expired job the result directory is removed
    (best-effort, failures are logged) and the entry is dropped from
    ``processing_jobs``. Jobs still processing are never removed.

    The next run is scheduled even if this one fails, using a daemon timer so
    a pending cleanup does not keep the process alive at shutdown.
    """
    import shutil

    # Maximum age in seconds, by final status.
    max_age = {'completed': 3600, 'error': 1800}
    try:
        current_time = time.time()
        # Iterate over a snapshot: request threads add jobs concurrently and
        # iterating the live dict could fail when its size changes.
        expired = [
            job_id
            for job_id, job_data in list(processing_jobs.items())
            if job_data['status'] in max_age
            and (current_time - job_data.get('created_at', 0)) > max_age[job_data['status']]
        ]

        for job_id in expired:
            output_dir = os.path.join(RESULTS_FOLDER, job_id)
            try:
                if os.path.exists(output_dir):
                    shutil.rmtree(output_dir)
            except Exception as e:
                print(f"Error cleaning up job {job_id}: {str(e)}")
            # Remove from tracking dictionary
            processing_jobs.pop(job_id, None)
    finally:
        # Schedule the next cleanup (every 5 minutes), whatever happened above.
        timer = threading.Timer(300, cleanup_old_jobs)
        timer.daemon = True
        timer.start()
# New endpoint to get detailed information about a model
def model_info(job_id):
    """Describe a job: its state while running, file details once completed.

    Unknown jobs get a 404. Jobs that are not completed return their status,
    progress and error. Completed jobs return the format, download and
    preview URLs, timestamps and the sizes of the files found on disk.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    job = processing_jobs.get(job_id)
    if job is None:
        return jsonify({"error": "Model not found"}), 404

    if job['status'] != 'completed':
        pending = {
            "status": job['status'],
            "progress": job['progress'],
            "error": job.get('error')
        }
        return jsonify(pending), 200

    # Which files to measure, and under which key each size is reported.
    if job['output_format'] == 'obj':
        tracked_files = (('obj_size', "model.obj"), ('package_size', "model.zip"))
    else:  # glb
        tracked_files = (('model_size', "model.glb"),)

    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    model_stats = {}
    for stat_key, file_name in tracked_files:
        file_path = os.path.join(output_dir, file_name)
        if os.path.exists(file_path):
            model_stats[stat_key] = os.path.getsize(file_path)

    details = {
        "status": job['status'],
        "model_format": job['output_format'],
        "download_url": job['result_url'],
        "preview_url": job['preview_url'],
        "model_stats": model_stats,
        "created_at": job.get('created_at'),
        "completed_at": job.get('completed_at')
    }
    return jsonify(details), 200
def index():
    """Return a JSON overview of the API: endpoints, parameters and description.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    endpoints = [
        "/convert",
        "/progress/<job_id>",
        "/download/<job_id>",
        "/preview/<job_id>",
        "/model-info/<job_id>"
    ]
    parameters = {
        "mesh_resolution": "Integer (50-200), controls mesh density",
        "output_format": "obj or glb",
        "detail_level": "low, medium, or high - controls the level of detail in the final model",
        "texture_quality": "low, medium, or high - controls the quality of textures"
    }
    overview = {
        "message": "Enhanced Image to 3D API (DPT-Large Model)",
        "endpoints": endpoints,
        "parameters": parameters,
        "description": "This API creates high-quality 3D models from 2D images with enhanced detail finishing similar to Hunyuan model"
    }
    return jsonify(overview), 200
# Example endpoint showing how to compare different detail levels
def compare_detail_levels():
    """Accept an image and build GLB meshes at low, medium and high detail.

    The depth map is estimated once; each detail level is then meshed at its
    own resolution (80 / 100 / 150) and exported as ``model_<level>.glb``. A
    level that fails is logged and skipped. The job completes when at least
    one level was produced and is marked as failed when none was.

    Returns:
        ``({"job_id": ..., "check_progress_at": ...}, 202)`` once the job is
        queued, or a 400 response describing the invalid upload.

    The worker thread always removes the uploaded temporary file when it
    finishes, whether it succeeded or failed.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"error": "No image selected"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"}), 400

    job_id = str(uuid.uuid4())
    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    os.makedirs(output_dir, exist_ok=True)

    # Save the uploaded file under a job-specific name.
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}")
    file.save(filepath)

    job = {
        'status': 'processing',
        'progress': 0,
        'result_url': None,
        'preview_url': None,
        'error': None,
        'output_format': 'glb',  # Use GLB for comparison
        'created_at': time.time(),
        'comparison': True
    }
    processing_jobs[job_id] = job

    # Per detail level: (progress reported when it starts, mesh resolution).
    level_plan = {
        'low': (50, 80),
        'medium': (70, 100),
        'high': (90, 150),
    }

    def fail(message):
        # Store the message before flipping the status so a reader that sees
        # 'error' always finds the text.
        job['error'] = message
        job['status'] = 'error'

    def process_comparison():
        worker = threading.current_thread()
        job['thread_alive'] = worker.is_alive
        try:
            image = preprocess_image(filepath)
            job['progress'] = 10

            try:
                model = load_model()
                job['progress'] = 20
            except Exception as e:
                fail(f"Error loading model: {str(e)}")
                return

            # Estimate the depth map once; all detail levels share it.
            try:
                depth_map = model(image)["depth"]
                if isinstance(depth_map, torch.Tensor):
                    depth_map = depth_map.cpu().numpy()
                elif hasattr(depth_map, 'numpy'):
                    depth_map = depth_map.numpy()
                elif isinstance(depth_map, Image.Image):
                    depth_map = np.array(depth_map)
                job['progress'] = 40
            except Exception as e:
                fail(f"Error estimating depth: {str(e)}")
                return

            result_urls = {}
            for detail_level, (level_progress, mesh_resolution) in level_plan.items():
                try:
                    job['progress'] = level_progress
                    mesh = depth_to_mesh(depth_map, image,
                                         resolution=mesh_resolution,
                                         detail_level=detail_level)
                    model_path = os.path.join(output_dir, f"model_{detail_level}.glb")
                    mesh.export(model_path, file_type='glb')
                    result_urls[detail_level] = f"/compare-download/{job_id}/{detail_level}"
                except Exception as e:
                    # Continue with other detail levels even if one fails
                    print(f"Error processing {detail_level} detail level: {str(e)}")

            if not result_urls:
                # Nothing was produced, so "completed" would be misleading.
                fail("Error during processing: no detail level could be generated")
                return

            job['result_urls'] = result_urls
            job['completed_at'] = time.time()
            job['progress'] = 100
            job['status'] = 'completed'
        except Exception as e:
            fail(f"Error during processing: {str(e)}")
        finally:
            # Runs on every exit path so the upload never lingers.
            try:
                if os.path.exists(filepath):
                    os.remove(filepath)
            except OSError as e:
                print(f"Error removing temporary file {filepath}: {str(e)}")
            # Force garbage collection
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    processing_thread = threading.Thread(target=process_comparison)
    processing_thread.daemon = True
    processing_thread.start()

    # Return job ID immediately
    return jsonify({"job_id": job_id, "check_progress_at": f"/progress/{job_id}"}), 202
def download_comparison_model(job_id, detail_level):
    """Send one detail-level GLB of a finished comparison job as a download.

    Responds with 404 when the job is unknown, not completed, or the file is
    missing, and with 400 when the job is not a comparison job or
    ``detail_level`` is not ``low``, ``medium`` or ``high``.

    NOTE(review): no route decorator is visible on this handler in this view;
    confirm how it is registered.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404
    if not job.get('comparison'):
        return jsonify({"error": "This is not a comparison job"}), 400
    if detail_level not in ('low', 'medium', 'high'):
        return jsonify({"error": "Invalid detail level"}), 400

    # Each detail level is stored as its own GLB in the job's result directory.
    glb_name = f"model_{detail_level}.glb"
    glb_path = os.path.join(RESULTS_FOLDER, job_id, glb_name)
    if not os.path.exists(glb_path):
        return jsonify({"error": "File not found"}), 404
    return send_file(glb_path, as_attachment=True, download_name=glb_name)
if __name__ == '__main__':
    # Kick off the periodic cleanup; it reschedules itself after each run.
    cleanup_old_jobs()
    # Listen on PORT when set, otherwise 7860 (the standard port for
    # Hugging Face Spaces), on all interfaces.
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))