import concurrent.futures
import multiprocessing
import os
import subprocess
import time
from dataclasses import dataclass

import tyro


@dataclass
class Args:
    workers_per_gpu: int
    """number of workers per gpu"""
    input_dir: str
    save_dir: str
    num_gpus: int = 8
    """number of gpus to use. -1 means all available gpus"""
    engine: str = "BLENDER_EEVEE"
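
# Example invocation (script name and paths are illustrative; tyro spells
# CLI flags with dashes by default):
#   python batch_render.py --workers-per-gpu 2 --num-gpus -1 \
#       --input-dir /data/vroid --save-dir /data/renders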


def check_already_rendered(save_path):
    # A directory counts as rendered once its semantic map exists.
    return os.path.exists(os.path.join(save_path, '02419_semantic.png'))


def process_file(file):
    # Return the (vrm_path, output_dir) pair if it still needs rendering, else None.
    if not check_already_rendered(file[1]):
        return file
    return None


def worker(queue, count, gpu):
    while True:
        try:
            item = queue.get()
            if item is None:
                # Sentinel received: this worker is done.
                queue.task_done()
                break
            data_path, save_path, engine, log_name = item
            print(f"Processing: {data_path} on GPU {gpu}")
            start = time.time()
            if check_already_rendered(save_path):
                queue.task_done()
                print('========', item, 'rendered', '========')
                continue

            os.makedirs(save_path, exist_ok=True)
            # Pin the render to one GPU via DISPLAY and CUDA_VISIBLE_DEVICES.
            command = (f"export DISPLAY=:0.{gpu} &&"
                       f" CUDA_VISIBLE_DEVICES={gpu} "
                       f" blender -b -P blender_lrm_script.py --"
                       f" --object_path {data_path} --output_dir {save_path} --engine {engine}")

            try:
                subprocess.run(command, shell=True, timeout=3600, check=True)
                # Value carries its own lock; use it so concurrent increments don't race.
                with count.get_lock():
                    count.value += 1
                end = time.time()
                with open(log_name, 'a') as f:
                    f.write(f'{end - start}\n')
            except subprocess.CalledProcessError as e:
                print(f"Subprocess error processing {item}: {e}")
            except subprocess.TimeoutExpired as e:
                print(f"Timeout expired processing {item}: {e}")
            except Exception as e:
                print(f"Error processing {item}: {e}")
            finally:
                queue.task_done()

        except Exception as e:
            print(f"Error processing {item}: {e}")
            queue.task_done()
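
# The __main__ block below builds the work list, filters out finished renders,
# and drives the GPU-pinned workers through a single JoinableQueue.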

if __name__ == "__main__":
    args = tyro.cli(Args)
    queue = multiprocessing.JoinableQueue()
    count = multiprocessing.Value("i", 0)
    log_name = f'time_log_{args.workers_per_gpu}_{args.num_gpus}_{args.engine}.txt'

    if args.num_gpus == -1:
        # Count the GPUs reported by nvidia-smi.
        result = subprocess.run(['nvidia-smi', '--list-gpus'], stdout=subprocess.PIPE)
        output = result.stdout.decode('utf-8')
        args.num_gpus = output.count('GPU')

    # Enumerate every .vrm file under the ten top-level group folders.
    files = []
    for group in [str(i) for i in range(10)]:
        for folder in os.listdir(f'{args.input_dir}/{group}'):
            filename = f'{args.input_dir}/{group}/{folder}/{folder}.vrm'
            outputdir = f'{args.save_dir}/{group}/{folder}'
            files.append([filename, outputdir])

    # Sort for a deterministic processing order.
    files = sorted(files, key=lambda x: x[0])

    # Filter out already-rendered files in parallel; the check is I/O-bound,
    # so a thread pool is sufficient.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(process_file, files))

    unprocessed_files = [file for file in results if file is not None]
    print(f'Unprocessed files: {len(unprocessed_files)}')
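
    # Fan-out: one shared queue consumed by num_gpus * workers_per_gpu
    # daemon workers, each pinned to a single GPU index.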

    start = time.time()
    for gpu_i in range(args.num_gpus):
        for worker_i in range(args.workers_per_gpu):
            process = multiprocessing.Process(
                target=worker, args=(queue, count, gpu_i)
            )
            process.daemon = True
            process.start()

    for file in unprocessed_files:
        queue.put((file[0], file[1], args.engine, log_name))

    # Exactly one sentinel per worker; extra sentinels would never have
    # task_done() called on them and queue.join() would hang forever.
    for _ in range(args.num_gpus * args.workers_per_gpu):
        queue.put(None)

    queue.join()
    end = time.time()
    print(f'Rendered {count.value} files in {end - start:.1f} seconds')