# Source: rightnight / app.py (Hugging Face Space by mac9087)
# Revision: bf928c6 ("Update app.py"), 38.6 kB
import os
import torch
import time
import threading
import json
import gc
from flask import Flask, request, jsonify, send_file, Response, stream_with_context
from werkzeug.utils import secure_filename
from PIL import Image
import io
import zipfile
import uuid
import traceback
from huggingface_hub import snapshot_download
from flask_cors import CORS
import numpy as np
import trimesh
from transformers import pipeline
from scipy.ndimage import gaussian_filter, uniform_filter, median_filter
from scipy import interpolate
import cv2
# Flask application object; CORS is enabled for all routes.
app = Flask(__name__)
CORS(app)

# Filesystem locations used for uploads, generated models and model caches.
UPLOAD_FOLDER = '/tmp/uploads'
RESULTS_FOLDER = '/tmp/results'
CACHE_DIR = '/tmp/huggingface'
# Lower-case file extensions accepted by the upload endpoints.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}

# Make sure every working directory exists before the first request.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)

# Point the Hugging Face caches at CACHE_DIR.
# NOTE(review): these are assigned after `transformers`/`huggingface_hub`
# were imported above - confirm the libraries read them lazily.
os.environ.update({
    'HF_HOME': CACHE_DIR,
    'TRANSFORMERS_CACHE': os.path.join(CACHE_DIR, 'transformers'),
    'HF_DATASETS_CACHE': os.path.join(CACHE_DIR, 'datasets'),
})

app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,  # 16MB max
)

# job_id -> mutable job-state dict, shared between request handlers and
# the background worker threads.
processing_jobs = {}

# Lazily initialised depth-estimation pipeline and its loading flags.
depth_estimator = None
model_loaded = model_loading = False

# Processing limits.
TIMEOUT_SECONDS = 240  # 4 minutes max for processing
MAX_DIMENSION = 512  # Max image dimension to process
# Module-local timeout exception (shadows the built-in of the same name)
class TimeoutError(Exception):
    """Signal that a processing step ran longer than its time budget."""
# Thread-safe timeout implementation
def process_with_timeout(function, args, timeout):
    """Run ``function(*args)`` in a daemon thread with a time budget.

    Args:
        function: Callable to execute.
        args: Iterable of positional arguments passed to ``function``.
        timeout: Maximum number of seconds to wait (``None`` waits forever).

    Returns:
        A ``(result, error)`` tuple. On success ``error`` is ``None``. On
        failure ``result`` is ``None`` and ``error`` is the exception raised
        by ``function``, or a ``TimeoutError`` when the budget was exceeded.
        The worker thread is not stopped on timeout; it is a daemon thread
        and simply keeps running in the background.
    """
    outcome = {"result": None, "error": None}
    finished = threading.Event()

    def target():
        try:
            outcome["result"] = function(*args)
        except Exception as exc:
            outcome["error"] = exc
        except BaseException as exc:
            # SystemExit and friends would otherwise end the thread silently
            # and make the caller see (None, None) as if the call succeeded.
            # Wrap them so callers' ``except Exception`` handlers still apply.
            wrapped = RuntimeError(
                f"Worker aborted with {type(exc).__name__}: {exc}"
            )
            wrapped.__cause__ = exc
            outcome["error"] = wrapped
        finally:
            finished.set()

    worker = threading.Thread(target=target, daemon=True)
    worker.start()

    # Wait on the completion flag rather than on thread liveness so that a
    # call finishing right at the deadline is still reported as finished.
    if not finished.wait(timeout):
        return None, TimeoutError(f"Processing timed out after {timeout} seconds")
    if outcome["error"] is not None:
        return None, outcome["error"]
    return outcome["result"], None
def allowed_file(filename):
    """Return True when ``filename`` ends in an extension from ALLOWED_EXTENSIONS.

    The comparison is case-insensitive and only the text after the last dot
    is considered; names without a dot are rejected.
    """
    _, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
# Enhanced image preprocessing with better detail preservation
def preprocess_image(image_path):
    """Load an image, bound its size and boost local contrast.

    Args:
        image_path: Path of the image file to open.

    Returns:
        An RGB ``PIL.Image.Image`` whose longest side is at most
        ``MAX_DIMENSION`` pixels, with CLAHE applied to the lightness channel.
    """
    with Image.open(image_path) as source:
        # convert() returns a new image, so it stays usable after the
        # underlying file is closed.
        img = source.convert("RGB")

    # Resize if the image is too large, preserving the aspect ratio.
    if img.width > MAX_DIMENSION or img.height > MAX_DIMENSION:
        if img.width > img.height:
            new_width = MAX_DIMENSION
            new_height = int(img.height * (MAX_DIMENSION / img.width))
        else:
            new_height = MAX_DIMENSION
            new_width = int(img.width * (MAX_DIMENSION / img.height))
        # Extremely elongated images would otherwise truncate the short
        # side to 0 pixels, which resize() rejects.
        new_width = max(1, new_width)
        new_height = max(1, new_height)
        # Lanczos resampling keeps more detail than the cheaper filters.
        img = img.resize((new_width, new_height), Image.LANCZOS)

    # Contrast-limited adaptive histogram equalisation on the L channel of
    # LAB space, leaving the colour channels untouched. The array is always
    # H x W x 3 here because of the RGB conversion above.
    lab = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2LAB)
    lightness, a_channel, b_channel = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_lab = cv2.merge((clahe.apply(lightness), a_channel, b_channel))
    img_array = cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2RGB)

    return Image.fromarray(img_array)
# Serialises model initialisation so concurrent jobs never load it twice.
_model_load_lock = threading.Lock()


def load_model():
    """Return the shared depth-estimation pipeline, loading it on first use.

    The first caller downloads the model snapshot (with retries) and builds
    the pipeline while holding a lock; concurrent callers block on that lock
    and then reuse the loaded pipeline. If loading fails, the exception is
    re-raised and the next caller will attempt the load again instead of
    receiving ``None``.

    Returns:
        The ``transformers`` depth-estimation pipeline.

    Raises:
        Exception: Whatever the download or pipeline construction raised.
    """
    global depth_estimator, model_loaded, model_loading

    # Fast path: no locking once the model is available.
    if model_loaded:
        return depth_estimator

    with _model_load_lock:
        # Another thread may have finished loading while we were waiting.
        if model_loaded:
            return depth_estimator

        model_loading = True
        try:
            print("Starting model loading...")
            # Using DPT-Large which provides better detail than DPT-Hybrid
            # Alternatively, consider "vinvino02/glpn-nyu" for different detail characteristics
            model_name = "Intel/dpt-large"

            # Download model with retry mechanism (delay doubles each time).
            max_retries = 3
            retry_delay = 5
            for attempt in range(max_retries):
                try:
                    snapshot_download(
                        repo_id=model_name,
                        cache_dir=CACHE_DIR,
                        resume_download=True,
                    )
                    break
                except Exception as e:
                    if attempt >= max_retries - 1:
                        raise
                    print(f"Download attempt {attempt+1} failed: {str(e)}. Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2

            device = "cuda" if torch.cuda.is_available() else "cpu"

            # Load depth estimator pipeline (-1 selects the CPU).
            depth_estimator = pipeline(
                "depth-estimation",
                model=model_name,
                device=device if device == "cuda" else -1,
                cache_dir=CACHE_DIR
            )

            # Optimize memory usage
            if device == "cuda":
                torch.cuda.empty_cache()

            model_loaded = True
            print(f"Model loaded successfully on {device}")
            return depth_estimator
        except Exception as e:
            print(f"Error loading model: {str(e)}")
            print(traceback.format_exc())
            raise
        finally:
            model_loading = False
# Enhanced depth processing function to improve detail quality
def enhance_depth_map(depth_map, detail_level='medium'):
    """Normalise a depth map to [0, 1] and sharpen or smooth it.

    Args:
        depth_map: 2-D depth values as any array-like object (NumPy array,
            PIL image, nested list). A third axis, if present, is averaged
            away (or squeezed when it has length 1).
        detail_level: ``'high'`` applies strong unsharp masking plus an
            edge-preserving recombination, ``'medium'`` applies mild unsharp
            masking followed by light smoothing, and any other value applies
            noise reduction only.

    Returns:
        A 2-D floating-point array with values clipped to [0, 1]. A depth map
        without any variation yields all zeros.

    Raises:
        ValueError: If ``depth_map`` contains no elements.
    """
    depth = np.asarray(depth_map)
    if depth.size == 0:
        raise ValueError("depth_map is empty")

    # Make sure the depth map is 2D
    if depth.ndim > 2:
        depth = np.mean(depth, axis=2) if depth.shape[2] > 1 else depth[:, :, 0]

    # astype() copies, so the caller's data is never modified.
    enhanced_depth = depth.astype(np.float32)

    # Remove outliers using percentile clipping (more stable than min/max)
    p_low, p_high = np.percentile(enhanced_depth, [1, 99])
    if p_high > p_low:
        enhanced_depth = np.clip(enhanced_depth, p_low, p_high)
        enhanced_depth = (enhanced_depth - p_low) / (p_high - p_low)
    else:
        # A flat map has no usable range; without this the raw values would
        # later be clipped to all-ones or all-zeros depending on magnitude.
        enhanced_depth = np.zeros_like(enhanced_depth)

    if detail_level == 'high':
        # Unsharp masking: add back the difference to a blurred copy.
        blurred = gaussian_filter(enhanced_depth, sigma=1.5)
        enhanced_depth = enhanced_depth + 1.5 * (enhanced_depth - blurred)
        # Approximate an edge-preserving filter by combining a lightly
        # smoothed base with the detail removed by a wider blur.
        smooth1 = gaussian_filter(enhanced_depth, sigma=0.5)
        smooth2 = gaussian_filter(enhanced_depth, sigma=2.0)
        edge_mask = enhanced_depth - smooth2
        enhanced_depth = smooth1 + 1.2 * edge_mask
    elif detail_level == 'medium':
        # Mild unsharp masking followed by mild noise smoothing.
        blurred = gaussian_filter(enhanced_depth, sigma=1.0)
        enhanced_depth = enhanced_depth + 0.8 * (enhanced_depth - blurred)
        enhanced_depth = gaussian_filter(enhanced_depth, sigma=0.5)
    else:  # low
        # Noise reduction only.
        enhanced_depth = gaussian_filter(enhanced_depth, sigma=0.7)

    # Sharpening can overshoot the unit range, so clip once more.
    return np.clip(enhanced_depth, 0, 1)
# Convert depth map to 3D mesh with significantly enhanced detail
def _grid_faces(vertices, resolution):
    """Triangulate a ``resolution`` x ``resolution`` vertex grid.

    Each grid cell yields two triangles. The diagonal is chosen per cell by
    comparing the normals of the two candidate triangles, and triangles are
    emitted cell by cell in row-major order.
    """
    index = np.arange(resolution * resolution).reshape(resolution, resolution)
    p1 = index[:-1, :-1].ravel()  # top-left corner of each cell
    p2 = index[:-1, 1:].ravel()   # top-right
    p3 = index[1:, :-1].ravel()   # bottom-left
    p4 = index[1:, 1:].ravel()    # bottom-right

    v1, v2, v3, v4 = vertices[p1], vertices[p2], vertices[p3], vertices[p4]
    norm1 = np.cross(v2 - v1, v4 - v1)
    norm2 = np.cross(v4 - v3, v1 - v3)
    standard = (np.einsum('ij,ij->i', norm1, norm2) >= 0)[:, None]

    first = np.where(standard,
                     np.column_stack([p1, p2, p4]),
                     np.column_stack([p1, p2, p3]))
    second = np.where(standard,
                      np.column_stack([p1, p4, p3]),
                      np.column_stack([p2, p4, p3]))
    # Interleave so both triangles of a cell stay adjacent in the face list.
    return np.stack([first, second], axis=1).reshape(-1, 3)


def _sample_vertex_colors(img_array, resolution):
    """Bilinearly sample ``img_array`` at every grid vertex.

    Returns a ``(resolution**2, 4)`` uint8 RGBA array in row-major vertex
    order. RGBA input keeps its alpha channel; RGB and grayscale input get an
    opaque alpha of 255.
    """
    height, width = img_array.shape[:2]
    steps = np.arange(resolution)
    img_x = steps * (width - 1) / (resolution - 1)
    img_y = steps * (height - 1) / (resolution - 1)

    x0 = img_x.astype(int)
    y0 = img_y.astype(int)
    x1 = np.minimum(x0 + 1, width - 1)
    y1 = np.minimum(y0 + 1, height - 1)
    wx = (img_x - x0)[None, :, None]
    wy = (img_y - y0)[:, None, None]

    pixels = img_array if img_array.ndim == 3 else img_array[:, :, None]
    pixels = pixels.astype(np.float64)
    blended = ((1 - wx) * (1 - wy) * pixels[np.ix_(y0, x0)]
               + wx * (1 - wy) * pixels[np.ix_(y0, x1)]
               + (1 - wx) * wy * pixels[np.ix_(y1, x0)]
               + wx * wy * pixels[np.ix_(y1, x1)])

    colors = np.empty((resolution, resolution, 4), dtype=np.uint8)
    channels = blended.shape[2]
    if channels == 4:  # RGBA
        colors[...] = blended.astype(np.uint8)
    else:
        if channels == 3:  # RGB
            colors[..., :3] = blended.astype(np.uint8)
        else:  # grayscale: replicate the first channel
            colors[..., :3] = blended[..., :1].astype(np.uint8)
        colors[..., 3] = 255
    return colors.reshape(-1, 4)


def depth_to_mesh(depth_map, image, resolution=100, detail_level='medium'):
    """Convert a depth map into a vertex-coloured height-field mesh.

    Args:
        depth_map: Depth values accepted by ``enhance_depth_map``.
        image: Optional colour source (PIL image or NumPy array) sampled for
            vertex colours; ``None`` produces an uncoloured mesh.
        resolution: Number of vertices per grid side (at least 2).
        detail_level: ``'low'``, ``'medium'`` or ``'high'``; controls depth
            enhancement, depth scaling and whether the mesh is smoothed.

    Returns:
        A ``trimesh.Trimesh`` whose x/y coordinates lie roughly in [-1, 1]
        and whose z coordinate is the negated, scaled depth.

    Raises:
        ValueError: If ``resolution`` is smaller than 2.
    """
    # Imported by name so the module-level ``trimesh`` binding stays usable.
    from trimesh.smoothing import filter_laplacian

    if resolution < 2:
        raise ValueError("resolution must be at least 2")

    # First, enhance the depth map for better details
    enhanced_depth = enhance_depth_map(depth_map, detail_level)
    h, w = enhanced_depth.shape

    # Regular sampling grid over the depth map.
    x = np.linspace(0, w - 1, resolution)
    y = np.linspace(0, h - 1, resolution)
    x_grid, y_grid = np.meshgrid(x, y)

    # Bicubic spline interpolation gives a smoother surface than nearest or
    # linear lookup.
    interp_func = interpolate.RectBivariateSpline(
        np.arange(h), np.arange(w), enhanced_depth, kx=3, ky=3
    )
    z_values = interp_func(y, x, grid=True)

    if detail_level == 'high':
        # Exaggerate depth differences where the local gradient is large.
        dx = np.gradient(z_values, axis=1)
        dy = np.gradient(z_values, axis=0)
        gradient_magnitude = np.sqrt(dx**2 + dy**2)
        edge_mask = np.clip(gradient_magnitude * 5, 0, 0.2)  # Scale and limit effect
        z_values = z_values + edge_mask * (z_values - gaussian_filter(z_values, sigma=1.0))

    # Normalise against the 2nd/98th percentiles to ignore outliers.
    z_min, z_max = np.percentile(z_values, [2, 98])
    if z_max > z_min:
        z_values = (z_values - z_min) / (z_max - z_min)

    # Depth exaggeration grows with the requested detail level.
    z_scaling = {'high': 2.5, 'medium': 2.0}.get(detail_level, 1.5)
    z_values = z_values * z_scaling

    # Normalize x and y coordinates
    x_grid = (x_grid / w - 0.5) * 2.0
    y_grid = (y_grid / h - 0.5) * 2.0

    # y and z are negated: image rows grow downwards and depth points away.
    vertices = np.column_stack([x_grid.ravel(), -y_grid.ravel(), -z_values.ravel()])
    faces = _grid_faces(vertices, resolution)
    mesh = trimesh.Trimesh(vertices=vertices, faces=faces)

    # ``is not None`` rather than truthiness: NumPy arrays raise on bool().
    if image is not None:
        img_array = np.asarray(image)
        # Colours are only applied when the image is at least as large as
        # the vertex grid in both directions.
        if resolution <= img_array.shape[0] and resolution <= img_array.shape[1]:
            mesh.visual.vertex_colors = _sample_vertex_colors(img_array, resolution)

    if detail_level != 'high':
        # One Laplacian pass removes staircase artifacts. The volume
        # constraint is disabled because the surface is open, not a closed
        # solid. The filter moves the mesh vertices in place.
        filter_laplacian(mesh, lamb=0.5, iterations=1, volume_constraint=False)

    # Calculate and fix normals for better rendering
    mesh.fix_normals()
    return mesh
@app.route('/health', methods=['GET'])
def health_check():
    """Report that the service is up and which compute device is available."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    payload = {
        "status": "healthy",
        "model": "Enhanced Depth-Based 3D Model Generator (DPT-Large)",
        "device": device,
    }
    return jsonify(payload), 200
@app.route('/progress/<job_id>', methods=['GET'])
def progress(job_id):
    """Stream a job's progress to the client as server-sent events.

    Emits a ``processing`` event whenever the progress value changes and a
    final ``completed`` or ``error`` event. Completed comparison jobs also
    carry their per-level ``result_urls``.
    """
    def sse(payload):
        # One SSE message: a single "data:" line followed by a blank line.
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        job = processing_jobs.get(job_id)
        if job is None:
            yield sse({'error': 'Job not found'})
            return

        # Send initial progress
        last_progress = job['progress']
        yield sse({'status': 'processing', 'progress': last_progress})

        idle_checks = 0
        while job['status'] == 'processing':
            if job['progress'] != last_progress:
                last_progress = job['progress']
                yield sse({'status': 'processing', 'progress': last_progress})
                idle_checks = 0
            time.sleep(0.5)
            idle_checks += 1

            # After ~30 seconds without progress, verify the worker is alive.
            if idle_checks > 60:
                is_alive = job.get('thread_alive')
                # Re-check the status first: a worker that finished during
                # the sleep is no longer alive but did not fail.
                if (job['status'] == 'processing'
                        and is_alive is not None and not is_alive()):
                    job['error'] = 'Processing thread died unexpectedly'
                    job['status'] = 'error'
                    break
                idle_checks = 0

        # Send final status
        if job['status'] == 'completed':
            final = {
                'status': 'completed',
                'progress': 100,
                'result_url': job['result_url'],
                'preview_url': job['preview_url'],
            }
            if 'result_urls' in job:
                final['result_urls'] = job['result_urls']
            yield sse(final)
        else:
            yield sse({'status': 'error', 'error': job['error']})

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
@app.route('/convert', methods=['POST'])
def convert_image_to_3d():
    """Accept an image upload and start an asynchronous 3D conversion job.

    Form fields:
        image: The uploaded file (png/jpg/jpeg).
        mesh_resolution: Integer, clamped to 50-200 (default 100).
        output_format: ``obj`` (zipped) or ``glb`` (default ``obj``).
        detail_level: ``low``, ``medium`` or ``high`` (default ``medium``).
        texture_quality: Accepted for compatibility; not used by this handler.

    Returns:
        ``202`` with ``{"job_id": ...}`` once the worker thread has started,
        or ``400`` with an error message for invalid input.
    """
    # Check if image is in the request
    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"error": "No image selected"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"}), 400

    # Get optional parameters with defaults
    try:
        requested_resolution = int(request.form.get('mesh_resolution', 100))
    except ValueError:
        return jsonify({"error": "Invalid parameter values"}), 400
    # Clamp to the documented 50-200 range; zero or negative values would
    # otherwise break mesh generation.
    mesh_resolution = max(50, min(requested_resolution, 200))
    output_format = request.form.get('output_format', 'obj').lower()
    detail_level = request.form.get('detail_level', 'medium').lower()

    # Validate output format
    if output_format not in ['obj', 'glb']:
        return jsonify({"error": "Unsupported output format. Use 'obj' or 'glb'"}), 400

    # Adjust mesh resolution based on detail level
    if detail_level == 'high':
        mesh_resolution = min(int(mesh_resolution * 1.5), 200)
    elif detail_level == 'low':
        mesh_resolution = max(int(mesh_resolution * 0.7), 50)

    # Create a job ID and its output directory
    job_id = str(uuid.uuid4())
    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    os.makedirs(output_dir, exist_ok=True)

    # Save the uploaded file
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}")
    file.save(filepath)

    # Initialize job tracking
    job = {
        'status': 'processing',
        'progress': 0,
        'result_url': None,
        'preview_url': None,
        'error': None,
        'output_format': output_format,
        'created_at': time.time()
    }
    processing_jobs[job_id] = job

    def fail(message):
        # Store the message before flipping the status so a reader that sees
        # 'error' always finds the message as well.
        job['error'] = message
        job['status'] = 'error'

    def estimate_depth(model, image):
        """Run the pipeline and return its depth output as a NumPy array."""
        depth_map = model(image)["depth"]
        if isinstance(depth_map, torch.Tensor):
            depth_map = depth_map.cpu().numpy()
        elif hasattr(depth_map, 'numpy'):
            depth_map = depth_map.numpy()
        elif isinstance(depth_map, Image.Image):
            depth_map = np.array(depth_map)
        return depth_map

    def export_mesh(mesh):
        """Write the mesh in the requested format into ``output_dir``."""
        if output_format == 'obj':
            obj_path = os.path.join(output_dir, "model.obj")
            # Export with normal and texture coordinates
            mesh.export(
                obj_path,
                file_type='obj',
                include_normals=True,
                include_texture=True
            )
            # Bundle the OBJ with its optional MTL and texture files.
            zip_path = os.path.join(output_dir, "model.zip")
            with zipfile.ZipFile(zip_path, 'w') as zipf:
                zipf.write(obj_path, arcname="model.obj")
                for extra in ("model.mtl", "model.png"):
                    extra_path = os.path.join(output_dir, extra)
                    if os.path.exists(extra_path):
                        zipf.write(extra_path, arcname=extra)
        else:  # glb
            mesh.export(os.path.join(output_dir, "model.glb"), file_type='glb')

    def process_image():
        thread = threading.current_thread()
        job['thread_alive'] = thread.is_alive
        try:
            # Preprocess image with enhanced detail preservation
            job['progress'] = 5
            image = preprocess_image(filepath)
            job['progress'] = 10

            # Load model
            try:
                model = load_model()
            except Exception as e:
                fail(f"Error loading model: {str(e)}")
                return
            job['progress'] = 30

            # Estimate depth under a timeout, then build the mesh.
            try:
                depth_map, error = process_with_timeout(
                    estimate_depth, [model, image], TIMEOUT_SECONDS
                )
                if error is not None:
                    if isinstance(error, TimeoutError):
                        fail(f"Processing timed out after {TIMEOUT_SECONDS} seconds")
                        return
                    raise error
                job['progress'] = 60
                mesh = depth_to_mesh(depth_map, image,
                                     resolution=mesh_resolution,
                                     detail_level=detail_level)
                job['progress'] = 80
            except Exception as e:
                print(f"Error processing job {job_id}: {str(e)}")
                print(traceback.format_exc())
                fail(f"Error during processing: {str(e)}")
                return

            # Export based on requested format
            try:
                export_mesh(mesh)
            except Exception as e:
                print(f"Error exporting model for job {job_id}: {str(e)}")
                print(traceback.format_exc())
                fail(f"Error exporting model: {str(e)}")
                return

            job['result_url'] = f"/download/{job_id}"
            job['preview_url'] = f"/preview/{job_id}"
            job['progress'] = 100
            job['completed_at'] = time.time()
            # Status goes last so readers of 'completed' see every field set.
            job['status'] = 'completed'
            print(f"Job {job_id} completed successfully")
        except Exception as e:
            # The traceback is logged on the server only; the client gets
            # just the message.
            print(f"Error processing job {job_id}: {str(e)}")
            print(traceback.format_exc())
            fail(str(e))
        finally:
            # Runs on every exit path, so early returns above no longer leak
            # the uploaded file.
            try:
                if os.path.exists(filepath):
                    os.remove(filepath)
            except OSError as e:
                print(f"Error removing upload for job {job_id}: {str(e)}")
            # Force garbage collection to free memory
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    # Start processing thread
    processing_thread = threading.Thread(target=process_image)
    processing_thread.daemon = True
    processing_thread.start()

    # Return job ID immediately
    return jsonify({"job_id": job_id}), 202  # 202 Accepted
@app.route('/download/<job_id>', methods=['GET'])
def download_model(job_id):
    """Send the finished model of a completed job as a file download.

    OBJ jobs are served as ``model.zip``; every other format is served as
    ``model.glb``. Responds with 404 when the job is unknown, unfinished, or
    its file is missing.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404

    # Pick the artefact name from the format recorded on the job.
    is_obj = job.get('output_format', 'obj') == 'obj'
    artefact = "model.zip" if is_obj else "model.glb"
    artefact_path = os.path.join(RESULTS_FOLDER, job_id, artefact)

    if not os.path.exists(artefact_path):
        return jsonify({"error": "File not found"}), 404
    return send_file(artefact_path, as_attachment=True, download_name=artefact)
@app.route('/preview/<job_id>', methods=['GET'])
def preview_model(job_id):
    """Serve the raw model file of a completed job for in-browser preview.

    OBJ jobs return ``model.obj`` and every other format returns
    ``model.glb``, each with a matching MIME type. Responds with 404 when the
    job is unknown, unfinished, or its file is missing.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404

    if job.get('output_format', 'obj') == 'obj':
        model_name, mimetype = "model.obj", 'model/obj'
    else:  # glb
        model_name, mimetype = "model.glb", 'model/gltf-binary'

    model_path = os.path.join(RESULTS_FOLDER, job_id, model_name)
    if not os.path.exists(model_path):
        return jsonify({"error": "Model file not found"}), 404
    return send_file(model_path, mimetype=mimetype)
# Cleanup old jobs periodically
def cleanup_old_jobs():
    """Delete expired jobs and their output, then reschedule itself.

    Completed jobs are removed one hour after creation and failed jobs after
    30 minutes. The next run is scheduled five minutes later on a daemon
    timer, even when this run hit an error.
    """
    import shutil

    # Maximum age in seconds per terminal status.
    max_age = {'completed': 3600, 'error': 1800}
    try:
        current_time = time.time()
        # Iterate over a snapshot: request threads add jobs concurrently and
        # would otherwise trigger "dictionary changed size during iteration".
        expired = [
            job_id
            for job_id, job_data in list(processing_jobs.items())
            if job_data['status'] in max_age
            and (current_time - job_data.get('created_at', 0)) > max_age[job_data['status']]
        ]

        for job_id in expired:
            output_dir = os.path.join(RESULTS_FOLDER, job_id)
            try:
                if os.path.exists(output_dir):
                    shutil.rmtree(output_dir)
            except Exception as e:
                print(f"Error cleaning up job {job_id}: {str(e)}")
            # Remove from tracking dictionary
            processing_jobs.pop(job_id, None)
    except Exception as e:
        # Best-effort maintenance: log and keep the schedule alive.
        print(f"Error during job cleanup: {str(e)}")
    finally:
        # Daemon timer so the pending cleanup never blocks interpreter exit.
        timer = threading.Timer(300, cleanup_old_jobs)  # Run every 5 minutes
        timer.daemon = True
        timer.start()
# New endpoint to get detailed information about a model
@app.route('/model-info/<job_id>', methods=['GET'])
def model_info(job_id):
    """Return status and, for completed jobs, file details of a job.

    Unfinished or failed jobs report ``status``, ``progress`` and ``error``.
    Completed jobs report format, URLs, file sizes and timestamps; comparison
    jobs additionally include their per-level ``result_urls``.
    """
    job = processing_jobs.get(job_id)
    if job is None:
        return jsonify({"error": "Model not found"}), 404

    if job['status'] != 'completed':
        return jsonify({
            "status": job['status'],
            "progress": job['progress'],
            "error": job.get('error')
        }), 200

    # Map each reported statistic to the file it measures.
    if job['output_format'] == 'obj':
        size_sources = {'obj_size': "model.obj", 'package_size': "model.zip"}
    else:  # glb
        size_sources = {'model_size': "model.glb"}

    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    model_stats = {}
    for stat_name, file_name in size_sources.items():
        file_path = os.path.join(output_dir, file_name)
        if os.path.exists(file_path):
            model_stats[stat_name] = os.path.getsize(file_path)

    info = {
        "status": job['status'],
        "model_format": job['output_format'],
        "download_url": job['result_url'],
        "preview_url": job['preview_url'],
        "model_stats": model_stats,
        "created_at": job.get('created_at'),
        "completed_at": job.get('completed_at')
    }
    # Comparison jobs store their downloads under 'result_urls'; without
    # this they would report only empty URLs.
    if 'result_urls' in job:
        info["result_urls"] = job['result_urls']
    return jsonify(info), 200
@app.route('/', methods=['GET'])
def index():
    """Describe the API: its endpoints and the accepted conversion parameters."""
    endpoints = [
        "/convert",
        "/progress/<job_id>",
        "/download/<job_id>",
        "/preview/<job_id>",
        "/model-info/<job_id>",
    ]
    parameters = {
        "mesh_resolution": "Integer (50-200), controls mesh density",
        "output_format": "obj or glb",
        "detail_level": "low, medium, or high - controls the level of detail in the final model",
        "texture_quality": "low, medium, or high - controls the quality of textures",
    }
    overview = {
        "message": "Enhanced Image to 3D API (DPT-Large Model)",
        "endpoints": endpoints,
        "parameters": parameters,
        "description": "This API creates high-quality 3D models from 2D images with enhanced detail finishing similar to Hunyuan model",
    }
    return jsonify(overview), 200
# Example endpoint showing how to compare different detail levels
@app.route('/detail-comparison', methods=['POST'])
def compare_detail_levels():
    """Start a job that builds low, medium and high detail GLB models.

    Expects an ``image`` file upload. Returns ``202`` with the job id and the
    progress URL, or ``400`` for invalid input. On completion the job stores
    one download URL per successfully generated level under ``result_urls``;
    the job fails if no level could be generated.
    """
    # Check if image is in the request
    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"error": "No image selected"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"}), 400

    # Create a job ID and its output directory
    job_id = str(uuid.uuid4())
    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    os.makedirs(output_dir, exist_ok=True)

    # Save the uploaded file
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}")
    file.save(filepath)

    # Initialize job tracking
    job = {
        'status': 'processing',
        'progress': 0,
        'result_url': None,
        'preview_url': None,
        'error': None,
        'output_format': 'glb',  # Use GLB for comparison
        'created_at': time.time(),
        'comparison': True
    }
    processing_jobs[job_id] = job

    # detail level -> (progress reported when it starts, mesh resolution)
    level_plan = {
        'low': (50, 80),
        'medium': (70, 100),
        'high': (90, 150),
    }

    def fail(message):
        # Message first, status second, so readers of 'error' see the text.
        job['error'] = message
        job['status'] = 'error'

    def estimate_depth(model, image):
        """Run the pipeline and return its depth output as a NumPy array."""
        depth_map = model(image)["depth"]
        if isinstance(depth_map, torch.Tensor):
            depth_map = depth_map.cpu().numpy()
        elif hasattr(depth_map, 'numpy'):
            depth_map = depth_map.numpy()
        elif isinstance(depth_map, Image.Image):
            depth_map = np.array(depth_map)
        return depth_map

    def process_comparison():
        thread = threading.current_thread()
        job['thread_alive'] = thread.is_alive
        try:
            # Preprocess image
            image = preprocess_image(filepath)
            job['progress'] = 10

            # Load model
            try:
                model = load_model()
            except Exception as e:
                fail(f"Error loading model: {str(e)}")
                return
            job['progress'] = 20

            # Estimate depth under the same timeout used by /convert so a
            # stuck model call cannot leave the job 'processing' forever.
            depth_map, error = process_with_timeout(
                estimate_depth, [model, image], TIMEOUT_SECONDS
            )
            if error is not None:
                if isinstance(error, TimeoutError):
                    fail(f"Processing timed out after {TIMEOUT_SECONDS} seconds")
                else:
                    fail(f"Error estimating depth: {str(error)}")
                return
            job['progress'] = 40

            # Create meshes at different detail levels
            result_urls = {}
            for detail_level, (progress_value, mesh_resolution) in level_plan.items():
                job['progress'] = progress_value
                try:
                    mesh = depth_to_mesh(depth_map, image,
                                         resolution=mesh_resolution,
                                         detail_level=detail_level)
                    model_path = os.path.join(output_dir, f"model_{detail_level}.glb")
                    mesh.export(model_path, file_type='glb')
                    result_urls[detail_level] = f"/compare-download/{job_id}/{detail_level}"
                except Exception as e:
                    # Continue with other detail levels even if one fails
                    print(f"Error processing {detail_level} detail level: {str(e)}")

            if not result_urls:
                # A "completed" job without a single model would be useless.
                fail("Error during processing: no detail level could be generated")
                return

            job['result_urls'] = result_urls
            job['progress'] = 100
            job['completed_at'] = time.time()
            # Status goes last so readers of 'completed' see every field set.
            job['status'] = 'completed'
        except Exception as e:
            fail(f"Error during processing: {str(e)}")
        finally:
            # Runs on every exit path, including the early returns above.
            try:
                if os.path.exists(filepath):
                    os.remove(filepath)
            except OSError as e:
                print(f"Error removing upload for job {job_id}: {str(e)}")
            # Force garbage collection
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    # Start processing thread
    processing_thread = threading.Thread(target=process_comparison)
    processing_thread.daemon = True
    processing_thread.start()

    # Return job ID immediately
    return jsonify({"job_id": job_id, "check_progress_at": f"/progress/{job_id}"}), 202
@app.route('/compare-download/<job_id>/<detail_level>', methods=['GET'])
def download_comparison_model(job_id, detail_level):
    """Download one detail level of a completed comparison job as GLB.

    Responds with 404 for unknown or unfinished jobs and missing files, and
    with 400 when the job is not a comparison job or the level is invalid.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404
    if not job.get('comparison'):
        return jsonify({"error": "This is not a comparison job"}), 400
    if detail_level not in ('low', 'medium', 'high'):
        return jsonify({"error": "Invalid detail level"}), 400

    # Files are stored per level inside the job's result directory.
    file_name = f"model_{detail_level}.glb"
    model_path = os.path.join(RESULTS_FOLDER, job_id, file_name)
    if not os.path.exists(model_path):
        return jsonify({"error": "File not found"}), 404
    return send_file(model_path, as_attachment=True, download_name=file_name)
if __name__ == '__main__':
    # Run the first cleanup pass; it schedules its own follow-up runs.
    cleanup_old_jobs()
    # PORT overrides the default of 7860 (the usual Hugging Face Spaces port).
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)))