import os import torch import time import threading import json import gc from flask import Flask, request, jsonify, send_file, Response, stream_with_context from werkzeug.utils import secure_filename from PIL import Image import io import zipfile import uuid import traceback from huggingface_hub import snapshot_download from flask_cors import CORS import numpy as np import trimesh from transformers import pipeline from scipy.ndimage import gaussian_filter, uniform_filter, median_filter from scipy import interpolate import cv2 app = Flask(__name__) CORS(app) # Enable CORS for all routes # Configure directories UPLOAD_FOLDER = '/tmp/uploads' RESULTS_FOLDER = '/tmp/results' CACHE_DIR = '/tmp/huggingface' ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'} # Create necessary directories os.makedirs(UPLOAD_FOLDER, exist_ok=True) os.makedirs(RESULTS_FOLDER, exist_ok=True) os.makedirs(CACHE_DIR, exist_ok=True) # Set Hugging Face cache environment variables os.environ['HF_HOME'] = CACHE_DIR os.environ['TRANSFORMERS_CACHE'] = os.path.join(CACHE_DIR, 'transformers') os.environ['HF_DATASETS_CACHE'] = os.path.join(CACHE_DIR, 'datasets') app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max # Job tracking dictionary processing_jobs = {} # Global model variables depth_estimator = None model_loaded = False model_loading = False # Configuration for processing TIMEOUT_SECONDS = 240 # 4 minutes max for processing MAX_DIMENSION = 512 # Max image dimension to process # TimeoutError for handling timeouts class TimeoutError(Exception): pass # Thread-safe timeout implementation def process_with_timeout(function, args, timeout): result = [None] error = [None] completed = [False] def target(): try: result[0] = function(*args) completed[0] = True except Exception as e: error[0] = e thread = threading.Thread(target=target) thread.daemon = True thread.start() thread.join(timeout) if not completed[0]: if thread.is_alive(): return None, TimeoutError(f"Processing timed out after {timeout} seconds") elif error[0]: return None, error[0] if error[0]: return None, error[0] return result[0], None def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS # Enhanced image preprocessing with better detail preservation def preprocess_image(image_path): with Image.open(image_path) as img: img = img.convert("RGB") # Resize if the image is too large if img.width > MAX_DIMENSION or img.height > MAX_DIMENSION: # Calculate new dimensions while preserving aspect ratio if img.width > img.height: new_width = MAX_DIMENSION new_height = int(img.height * (MAX_DIMENSION / img.width)) else: new_height = MAX_DIMENSION new_width = int(img.width * (MAX_DIMENSION / img.height)) # Use high-quality Lanczos resampling for better detail preservation img = img.resize((new_width, new_height), Image.LANCZOS) # Convert to numpy array for additional preprocessing img_array = np.array(img) # Optional: Apply adaptive histogram equalization for better contrast # This helps the depth model detect more details if len(img_array.shape) == 3 and img_array.shape[2] == 3: # Convert to LAB color space lab = cv2.cvtColor(img_array, cv2.COLOR_RGB2LAB) l, a, b = cv2.split(lab) # Apply CLAHE to L channel clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) cl = clahe.apply(l) # Merge channels back enhanced_lab = cv2.merge((cl, a, b)) # Convert back to RGB img_array = cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2RGB) # Convert back to PIL Image img = Image.fromarray(img_array) return img def load_model(): global depth_estimator, model_loaded, model_loading if model_loaded: return depth_estimator if model_loading: # Wait for model to load if it's already in progress while model_loading and not model_loaded: time.sleep(0.5) return depth_estimator try: model_loading = True print("Starting model loading...") # Using DPT-Large which provides better detail than DPT-Hybrid # Alternatively, consider "vinvino02/glpn-nyu" for different detail characteristics model_name = "Intel/dpt-large" # Download model with retry mechanism max_retries = 3 retry_delay = 5 for attempt in range(max_retries): try: snapshot_download( repo_id=model_name, cache_dir=CACHE_DIR, resume_download=True, ) break except Exception as e: if attempt < max_retries - 1: print(f"Download attempt {attempt+1} failed: {str(e)}. Retrying in {retry_delay} seconds...") time.sleep(retry_delay) retry_delay *= 2 else: raise # Initialize model with appropriate precision device = "cuda" if torch.cuda.is_available() else "cpu" # Load depth estimator pipeline depth_estimator = pipeline( "depth-estimation", model=model_name, device=device if device == "cuda" else -1, cache_dir=CACHE_DIR ) # Optimize memory usage if device == "cuda": torch.cuda.empty_cache() model_loaded = True print(f"Model loaded successfully on {device}") return depth_estimator except Exception as e: print(f"Error loading model: {str(e)}") print(traceback.format_exc()) raise finally: model_loading = False # Enhanced depth processing function to improve detail quality def enhance_depth_map(depth_map, detail_level='medium'): """Apply sophisticated processing to enhance depth map details""" # Convert to numpy array if needed if isinstance(depth_map, Image.Image): depth_map = np.array(depth_map) # Make sure the depth map is 2D if len(depth_map.shape) > 2: depth_map = np.mean(depth_map, axis=2) if depth_map.shape[2] > 1 else depth_map[:,:,0] # Create a copy for processing enhanced_depth = depth_map.copy().astype(np.float32) # Remove outliers using percentile clipping (more stable than min/max) p_low, p_high = np.percentile(enhanced_depth, [1, 99]) enhanced_depth = np.clip(enhanced_depth, p_low, p_high) # Normalize to 0-1 range for processing enhanced_depth = (enhanced_depth - p_low) / (p_high - p_low) if p_high > p_low else enhanced_depth # Apply different enhancement methods based on detail level if detail_level == 'high': # Apply unsharp masking for edge enhancement - simulating Hunyuan's detail technique # First apply gaussian blur blurred = gaussian_filter(enhanced_depth, sigma=1.5) # Create the unsharp mask mask = enhanced_depth - blurred # Apply the mask with strength factor enhanced_depth = enhanced_depth + 1.5 * mask # Apply bilateral filter to preserve edges while smoothing noise # Simulate using gaussian combinations smooth1 = gaussian_filter(enhanced_depth, sigma=0.5) smooth2 = gaussian_filter(enhanced_depth, sigma=2.0) edge_mask = enhanced_depth - smooth2 enhanced_depth = smooth1 + 1.2 * edge_mask elif detail_level == 'medium': # Less aggressive but still effective enhancement # Apply mild unsharp masking blurred = gaussian_filter(enhanced_depth, sigma=1.0) mask = enhanced_depth - blurred enhanced_depth = enhanced_depth + 0.8 * mask # Apply mild smoothing to reduce noise but preserve edges enhanced_depth = gaussian_filter(enhanced_depth, sigma=0.5) else: # low # Just apply noise reduction without too much detail enhancement enhanced_depth = gaussian_filter(enhanced_depth, sigma=0.7) # Normalize again after processing enhanced_depth = np.clip(enhanced_depth, 0, 1) return enhanced_depth # Convert depth map to 3D mesh with significantly enhanced detail def depth_to_mesh(depth_map, image, resolution=100, detail_level='medium'): """Convert depth map to 3D mesh with highly improved detail preservation""" # First, enhance the depth map for better details enhanced_depth = enhance_depth_map(depth_map, detail_level) # Get dimensions of depth map h, w = enhanced_depth.shape # Create a higher resolution grid for better detail x = np.linspace(0, w-1, resolution) y = np.linspace(0, h-1, resolution) x_grid, y_grid = np.meshgrid(x, y) # Use bicubic interpolation for smoother surface with better details # Create interpolation function interp_func = interpolate.RectBivariateSpline( np.arange(h), np.arange(w), enhanced_depth, kx=3, ky=3 ) # Sample depth at grid points with the interpolation function z_values = interp_func(y, x, grid=True) # Apply a post-processing step to enhance small details even further if detail_level == 'high': # Calculate local gradients to detect edges dx = np.gradient(z_values, axis=1) dy = np.gradient(z_values, axis=0) # Enhance edges by increasing depth differences at high gradient areas gradient_magnitude = np.sqrt(dx**2 + dy**2) edge_mask = np.clip(gradient_magnitude * 5, 0, 0.2) # Scale and limit effect # Apply edge enhancement z_values = z_values + edge_mask * (z_values - gaussian_filter(z_values, sigma=1.0)) # Normalize z-values with advanced scaling for better depth impression z_min, z_max = np.percentile(z_values, [2, 98]) # Remove outliers z_values = (z_values - z_min) / (z_max - z_min) if z_max > z_min else z_values # Apply depth scaling appropriate to the detail level if detail_level == 'high': z_scaling = 2.5 # More pronounced depth variations elif detail_level == 'medium': z_scaling = 2.0 # Standard depth else: z_scaling = 1.5 # More subtle depth variations z_values = z_values * z_scaling # Normalize x and y coordinates x_grid = (x_grid / w - 0.5) * 2.0 # Map to -1 to 1 y_grid = (y_grid / h - 0.5) * 2.0 # Map to -1 to 1 # Create vertices vertices = np.vstack([x_grid.flatten(), -y_grid.flatten(), -z_values.flatten()]).T # Create faces (triangles) with optimized winding for better normals faces = [] for i in range(resolution-1): for j in range(resolution-1): p1 = i * resolution + j p2 = i * resolution + (j + 1) p3 = (i + 1) * resolution + j p4 = (i + 1) * resolution + (j + 1) # Calculate normals to ensure consistent orientation v1 = vertices[p1] v2 = vertices[p2] v3 = vertices[p3] v4 = vertices[p4] # Calculate normals for both possible triangulations # and choose the one that's more consistent norm1 = np.cross(v2-v1, v4-v1) norm2 = np.cross(v4-v3, v1-v3) if np.dot(norm1, norm2) >= 0: # Standard triangulation faces.append([p1, p2, p4]) faces.append([p1, p4, p3]) else: # Alternative triangulation for smoother surface faces.append([p1, p2, p3]) faces.append([p2, p4, p3]) faces = np.array(faces) # Create mesh mesh = trimesh.Trimesh(vertices=vertices, faces=faces) # Apply advanced texturing if image is provided if image: # Convert to numpy array if needed if isinstance(image, Image.Image): img_array = np.array(image) else: img_array = image # Create vertex colors with improved sampling if resolution <= img_array.shape[0] and resolution <= img_array.shape[1]: # Create vertex colors by sampling the image with bilinear interpolation vertex_colors = np.zeros((vertices.shape[0], 4), dtype=np.uint8) # Get normalized coordinates for sampling for i in range(resolution): for j in range(resolution): # Calculate exact image coordinates with proper scaling img_x = j * (img_array.shape[1] - 1) / (resolution - 1) img_y = i * (img_array.shape[0] - 1) / (resolution - 1) # Bilinear interpolation for smooth color transitions x0, y0 = int(img_x), int(img_y) x1, y1 = min(x0 + 1, img_array.shape[1] - 1), min(y0 + 1, img_array.shape[0] - 1) # Calculate interpolation weights wx = img_x - x0 wy = img_y - y0 vertex_idx = i * resolution + j if len(img_array.shape) == 3 and img_array.shape[2] == 3: # RGB # Perform bilinear interpolation for each color channel r = int((1-wx)*(1-wy)*img_array[y0, x0, 0] + wx*(1-wy)*img_array[y0, x1, 0] + (1-wx)*wy*img_array[y1, x0, 0] + wx*wy*img_array[y1, x1, 0]) g = int((1-wx)*(1-wy)*img_array[y0, x0, 1] + wx*(1-wy)*img_array[y0, x1, 1] + (1-wx)*wy*img_array[y1, x0, 1] + wx*wy*img_array[y1, x1, 1]) b = int((1-wx)*(1-wy)*img_array[y0, x0, 2] + wx*(1-wy)*img_array[y0, x1, 2] + (1-wx)*wy*img_array[y1, x0, 2] + wx*wy*img_array[y1, x1, 2]) vertex_colors[vertex_idx, :3] = [r, g, b] vertex_colors[vertex_idx, 3] = 255 # Alpha elif len(img_array.shape) == 3 and img_array.shape[2] == 4: # RGBA for c in range(4): # For each RGBA channel vertex_colors[vertex_idx, c] = int((1-wx)*(1-wy)*img_array[y0, x0, c] + wx*(1-wy)*img_array[y0, x1, c] + (1-wx)*wy*img_array[y1, x0, c] + wx*wy*img_array[y1, x1, c]) else: # Handle grayscale with bilinear interpolation gray = int((1-wx)*(1-wy)*img_array[y0, x0] + wx*(1-wy)*img_array[y0, x1] + (1-wx)*wy*img_array[y1, x0] + wx*wy*img_array[y1, x1]) vertex_colors[vertex_idx, :3] = [gray, gray, gray] vertex_colors[vertex_idx, 3] = 255 mesh.visual.vertex_colors = vertex_colors # Apply smoothing to get rid of staircase artifacts if detail_level != 'high': # For medium and low detail, apply Laplacian smoothing # but preserve the overall shape mesh = mesh.smoothed(method='laplacian', iterations=1) # Calculate and fix normals for better rendering mesh.fix_normals() return mesh @app.route('/health', methods=['GET']) def health_check(): return jsonify({ "status": "healthy", "model": "Enhanced Depth-Based 3D Model Generator (DPT-Large)", "device": "cuda" if torch.cuda.is_available() else "cpu" }), 200 @app.route('/progress/', methods=['GET']) def progress(job_id): def generate(): if job_id not in processing_jobs: yield f"data: {json.dumps({'error': 'Job not found'})}\n\n" return job = processing_jobs[job_id] # Send initial progress yield f"data: {json.dumps({'status': 'processing', 'progress': job['progress']})}\n\n" # Wait for job to complete or update last_progress = job['progress'] check_count = 0 while job['status'] == 'processing': if job['progress'] != last_progress: yield f"data: {json.dumps({'status': 'processing', 'progress': job['progress']})}\n\n" last_progress = job['progress'] time.sleep(0.5) check_count += 1 # If client hasn't received updates for a while, check if job is still running if check_count > 60: # 30 seconds with no updates if 'thread_alive' in job and not job['thread_alive'](): job['status'] = 'error' job['error'] = 'Processing thread died unexpectedly' break check_count = 0 # Send final status if job['status'] == 'completed': yield f"data: {json.dumps({'status': 'completed', 'progress': 100, 'result_url': job['result_url'], 'preview_url': job['preview_url']})}\n\n" else: yield f"data: {json.dumps({'status': 'error', 'error': job['error']})}\n\n" return Response(stream_with_context(generate()), mimetype='text/event-stream') @app.route('/convert', methods=['POST']) def convert_image_to_3d(): # Check if image is in the request if 'image' not in request.files: return jsonify({"error": "No image provided"}), 400 file = request.files['image'] if file.filename == '': return jsonify({"error": "No image selected"}), 400 if not allowed_file(file.filename): return jsonify({"error": f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"}), 400 # Get optional parameters with defaults try: mesh_resolution = min(int(request.form.get('mesh_resolution', 100)), 200) # Limit max resolution output_format = request.form.get('output_format', 'obj').lower() detail_level = request.form.get('detail_level', 'medium').lower() # Parameter for detail level texture_quality = request.form.get('texture_quality', 'medium').lower() # New parameter for texture quality except ValueError: return jsonify({"error": "Invalid parameter values"}), 400 # Validate output format if output_format not in ['obj', 'glb']: return jsonify({"error": "Unsupported output format. Use 'obj' or 'glb'"}), 400 # Adjust mesh resolution based on detail level if detail_level == 'high': mesh_resolution = min(int(mesh_resolution * 1.5), 200) elif detail_level == 'low': mesh_resolution = max(int(mesh_resolution * 0.7), 50) # Create a job ID job_id = str(uuid.uuid4()) output_dir = os.path.join(RESULTS_FOLDER, job_id) os.makedirs(output_dir, exist_ok=True) # Save the uploaded file filename = secure_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}") file.save(filepath) # Initialize job tracking processing_jobs[job_id] = { 'status': 'processing', 'progress': 0, 'result_url': None, 'preview_url': None, 'error': None, 'output_format': output_format, 'created_at': time.time() } # Start processing in a separate thread def process_image(): thread = threading.current_thread() processing_jobs[job_id]['thread_alive'] = lambda: thread.is_alive() try: # Preprocess image with enhanced detail preservation processing_jobs[job_id]['progress'] = 5 image = preprocess_image(filepath) processing_jobs[job_id]['progress'] = 10 # Load model try: model = load_model() processing_jobs[job_id]['progress'] = 30 except Exception as e: processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"Error loading model: {str(e)}" return # Process image with thread-safe timeout try: def estimate_depth(): # Get depth map result = model(image) depth_map = result["depth"] # Convert to numpy array if needed if isinstance(depth_map, torch.Tensor): depth_map = depth_map.cpu().numpy() elif hasattr(depth_map, 'numpy'): depth_map = depth_map.numpy() elif isinstance(depth_map, Image.Image): depth_map = np.array(depth_map) return depth_map depth_map, error = process_with_timeout(estimate_depth, [], TIMEOUT_SECONDS) if error: if isinstance(error, TimeoutError): processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"Processing timed out after {TIMEOUT_SECONDS} seconds" return else: raise error processing_jobs[job_id]['progress'] = 60 # Create mesh from depth map with enhanced detail handling mesh_resolution_int = int(mesh_resolution) mesh = depth_to_mesh(depth_map, image, resolution=mesh_resolution_int, detail_level=detail_level) processing_jobs[job_id]['progress'] = 80 except Exception as e: error_details = traceback.format_exc() processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"Error during processing: {str(e)}" print(f"Error processing job {job_id}: {str(e)}") print(error_details) return # Export based on requested format with enhanced quality settings try: if output_format == 'obj': obj_path = os.path.join(output_dir, "model.obj") # Export with normal and texture coordinates mesh.export( obj_path, file_type='obj', include_normals=True, include_texture=True ) # Create a zip file with OBJ and MTL zip_path = os.path.join(output_dir, "model.zip") with zipfile.ZipFile(zip_path, 'w') as zipf: zipf.write(obj_path, arcname="model.obj") mtl_path = os.path.join(output_dir, "model.mtl") if os.path.exists(mtl_path): zipf.write(mtl_path, arcname="model.mtl") # Include texture file if it exists texture_path = os.path.join(output_dir, "model.png") if os.path.exists(texture_path): zipf.write(texture_path, arcname="model.png") processing_jobs[job_id]['result_url'] = f"/download/{job_id}" processing_jobs[job_id]['preview_url'] = f"/preview/{job_id}" elif output_format == 'glb': # Export as GLB with enhanced settings glb_path = os.path.join(output_dir, "model.glb") mesh.export( glb_path, file_type='glb' ) processing_jobs[job_id]['result_url'] = f"/download/{job_id}" processing_jobs[job_id]['preview_url'] = f"/preview/{job_id}" # Update job status processing_jobs[job_id]['status'] = 'completed' processing_jobs[job_id]['progress'] = 100 print(f"Job {job_id} completed successfully") except Exception as e: error_details = traceback.format_exc() processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"Error exporting model: {str(e)}" print(f"Error exporting model for job {job_id}: {str(e)}") print(error_details) # Clean up temporary file if os.path.exists(filepath): os.remove(filepath) # Force garbage collection to free memory gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() except Exception as e: # Handle errors error_details = traceback.format_exc() processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"{str(e)}\n{error_details}" print(f"Error processing job {job_id}: {str(e)}") print(error_details) # Clean up on error if os.path.exists(filepath): os.remove(filepath) # Start processing thread processing_thread = threading.Thread(target=process_image) processing_thread.daemon = True processing_thread.start() # Return job ID immediately return jsonify({"job_id": job_id}), 202 # 202 Accepted @app.route('/download/', methods=['GET']) def download_model(job_id): if job_id not in processing_jobs or processing_jobs[job_id]['status'] != 'completed': return jsonify({"error": "Model not found or processing not complete"}), 404 # Get the output directory for this job output_dir = os.path.join(RESULTS_FOLDER, job_id) # Determine file format from the job data output_format = processing_jobs[job_id].get('output_format', 'obj') if output_format == 'obj': zip_path = os.path.join(output_dir, "model.zip") if os.path.exists(zip_path): return send_file(zip_path, as_attachment=True, download_name="model.zip") else: # glb glb_path = os.path.join(output_dir, "model.glb") if os.path.exists(glb_path): return send_file(glb_path, as_attachment=True, download_name="model.glb") return jsonify({"error": "File not found"}), 404 @app.route('/preview/', methods=['GET']) def preview_model(job_id): if job_id not in processing_jobs or processing_jobs[job_id]['status'] != 'completed': return jsonify({"error": "Model not found or processing not complete"}), 404 # Get the output directory for this job output_dir = os.path.join(RESULTS_FOLDER, job_id) output_format = processing_jobs[job_id].get('output_format', 'obj') if output_format == 'obj': obj_path = os.path.join(output_dir, "model.obj") if os.path.exists(obj_path): return send_file(obj_path, mimetype='model/obj') else: # glb glb_path = os.path.join(output_dir, "model.glb") if os.path.exists(glb_path): return send_file(glb_path, mimetype='model/gltf-binary') return jsonify({"error": "Model file not found"}), 404 # Cleanup old jobs periodically def cleanup_old_jobs(): current_time = time.time() job_ids_to_remove = [] for job_id, job_data in processing_jobs.items(): # Remove completed jobs after 1 hour if job_data['status'] == 'completed' and (current_time - job_data.get('created_at', 0)) > 3600: job_ids_to_remove.append(job_id) # Remove error jobs after 30 minutes elif job_data['status'] == 'error' and (current_time - job_data.get('created_at', 0)) > 1800: job_ids_to_remove.append(job_id) # Remove the jobs for job_id in job_ids_to_remove: output_dir = os.path.join(RESULTS_FOLDER, job_id) try: import shutil if os.path.exists(output_dir): shutil.rmtree(output_dir) except Exception as e: print(f"Error cleaning up job {job_id}: {str(e)}") # Remove from tracking dictionary if job_id in processing_jobs: del processing_jobs[job_id] # Schedule the next cleanup threading.Timer(300, cleanup_old_jobs).start() # Run every 5 minutes # New endpoint to get detailed information about a model @app.route('/model-info/', methods=['GET']) def model_info(job_id): if job_id not in processing_jobs: return jsonify({"error": "Model not found"}), 404 job = processing_jobs[job_id] if job['status'] != 'completed': return jsonify({ "status": job['status'], "progress": job['progress'], "error": job.get('error') }), 200 # For completed jobs, include information about the model output_dir = os.path.join(RESULTS_FOLDER, job_id) model_stats = {} # Get file size if job['output_format'] == 'obj': obj_path = os.path.join(output_dir, "model.obj") zip_path = os.path.join(output_dir, "model.zip") if os.path.exists(obj_path): model_stats['obj_size'] = os.path.getsize(obj_path) if os.path.exists(zip_path): model_stats['package_size'] = os.path.getsize(zip_path) else: # glb glb_path = os.path.join(output_dir, "model.glb") if os.path.exists(glb_path): model_stats['model_size'] = os.path.getsize(glb_path) # Return detailed info return jsonify({ "status": job['status'], "model_format": job['output_format'], "download_url": job['result_url'], "preview_url": job['preview_url'], "model_stats": model_stats, "created_at": job.get('created_at'), "completed_at": job.get('completed_at') }), 200 @app.route('/', methods=['GET']) def index(): return jsonify({ "message": "Enhanced Image to 3D API (DPT-Large Model)", "endpoints": [ "/convert", "/progress/", "/download/", "/preview/", "/model-info/" ], "parameters": { "mesh_resolution": "Integer (50-200), controls mesh density", "output_format": "obj or glb", "detail_level": "low, medium, or high - controls the level of detail in the final model", "texture_quality": "low, medium, or high - controls the quality of textures" }, "description": "This API creates high-quality 3D models from 2D images with enhanced detail finishing similar to Hunyuan model" }), 200 # Example endpoint showing how to compare different detail levels @app.route('/detail-comparison', methods=['POST']) def compare_detail_levels(): # Check if image is in the request if 'image' not in request.files: return jsonify({"error": "No image provided"}), 400 file = request.files['image'] if file.filename == '': return jsonify({"error": "No image selected"}), 400 if not allowed_file(file.filename): return jsonify({"error": f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"}), 400 # Create a job ID job_id = str(uuid.uuid4()) output_dir = os.path.join(RESULTS_FOLDER, job_id) os.makedirs(output_dir, exist_ok=True) # Save the uploaded file filename = secure_filename(file.filename) filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}") file.save(filepath) # Initialize job tracking processing_jobs[job_id] = { 'status': 'processing', 'progress': 0, 'result_url': None, 'preview_url': None, 'error': None, 'output_format': 'glb', # Use GLB for comparison 'created_at': time.time(), 'comparison': True } # Process in separate thread to create 3 different detail levels def process_comparison(): thread = threading.current_thread() processing_jobs[job_id]['thread_alive'] = lambda: thread.is_alive() try: # Preprocess image image = preprocess_image(filepath) processing_jobs[job_id]['progress'] = 10 # Load model try: model = load_model() processing_jobs[job_id]['progress'] = 20 except Exception as e: processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"Error loading model: {str(e)}" return # Process image to get depth map try: depth_map = model(image)["depth"] if isinstance(depth_map, torch.Tensor): depth_map = depth_map.cpu().numpy() elif hasattr(depth_map, 'numpy'): depth_map = depth_map.numpy() elif isinstance(depth_map, Image.Image): depth_map = np.array(depth_map) processing_jobs[job_id]['progress'] = 40 except Exception as e: processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"Error estimating depth: {str(e)}" return # Create meshes at different detail levels result_urls = {} for detail_level in ['low', 'medium', 'high']: try: # Update progress if detail_level == 'low': processing_jobs[job_id]['progress'] = 50 elif detail_level == 'medium': processing_jobs[job_id]['progress'] = 70 else: processing_jobs[job_id]['progress'] = 90 # Create mesh with appropriate detail level mesh_resolution = 100 # Fixed resolution for fair comparison if detail_level == 'high': mesh_resolution = 150 elif detail_level == 'low': mesh_resolution = 80 mesh = depth_to_mesh(depth_map, image, resolution=mesh_resolution, detail_level=detail_level) # Export as GLB model_path = os.path.join(output_dir, f"model_{detail_level}.glb") mesh.export(model_path, file_type='glb') # Add to result URLs result_urls[detail_level] = f"/compare-download/{job_id}/{detail_level}" except Exception as e: print(f"Error processing {detail_level} detail level: {str(e)}") # Continue with other detail levels even if one fails # Update job status processing_jobs[job_id]['status'] = 'completed' processing_jobs[job_id]['progress'] = 100 processing_jobs[job_id]['result_urls'] = result_urls processing_jobs[job_id]['completed_at'] = time.time() # Clean up temporary file if os.path.exists(filepath): os.remove(filepath) # Force garbage collection gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() except Exception as e: # Handle errors processing_jobs[job_id]['status'] = 'error' processing_jobs[job_id]['error'] = f"Error during processing: {str(e)}" # Clean up on error if os.path.exists(filepath): os.remove(filepath) # Start processing thread processing_thread = threading.Thread(target=process_comparison) processing_thread.daemon = True processing_thread.start() # Return job ID immediately return jsonify({"job_id": job_id, "check_progress_at": f"/progress/{job_id}"}), 202 @app.route('/compare-download//', methods=['GET']) def download_comparison_model(job_id, detail_level): if job_id not in processing_jobs or processing_jobs[job_id]['status'] != 'completed': return jsonify({"error": "Model not found or processing not complete"}), 404 if 'comparison' not in processing_jobs[job_id] or not processing_jobs[job_id]['comparison']: return jsonify({"error": "This is not a comparison job"}), 400 if detail_level not in ['low', 'medium', 'high']: return jsonify({"error": "Invalid detail level"}), 400 # Get the output directory for this job output_dir = os.path.join(RESULTS_FOLDER, job_id) model_path = os.path.join(output_dir, f"model_{detail_level}.glb") if os.path.exists(model_path): return send_file(model_path, as_attachment=True, download_name=f"model_{detail_level}.glb") return jsonify({"error": "File not found"}), 404 if __name__ == '__main__': # Start the cleanup thread cleanup_old_jobs() # Use port 7860 which is standard for Hugging Face Spaces port = int(os.environ.get('PORT', 7860)) app.run(host='0.0.0.0', port=port)