import queue
import time
from multiprocessing import Manager, Process

from tqdm import tqdm

# How long the collector waits on the result queue before checking whether
# any worker process is still alive.
_RESULT_POLL_SECONDS = 1.0


def _describe_item(item):
    """Return a short label for *item* for use in error messages.

    Uses the item's 'idx' key, falling back to 'id', then 'unknown item'.
    Items without a dict-style ``get`` (tuples, strings, ...) are reported
    as 'unknown item' instead of raising from inside an error handler.
    """
    try:
        return item.get('idx', item.get('id', 'unknown item'))
    except (AttributeError, TypeError):
        return 'unknown item'


def worker_main(work_queue, result_queue, process_func, config):
    """Consume items from *work_queue* until a ``None`` sentinel arrives.

    For every item, exactly one ``(results, cost)`` tuple is put on
    *result_queue*: the pair returned by ``process_func(config, item)`` on
    success, or ``(None, 0)`` if processing raised. When the worker stops --
    normally or because of an unexpected error -- it puts a single ``None``
    on *result_queue* so the collector knows this worker is done.
    """
    try:
        while True:
            item = work_queue.get()
            if item is None:
                break
            try:
                results, cost = process_func(config, item)
                result_queue.put((results, cost))
            except Exception as e:
                print(f"Error processing item {_describe_item(item)}: {e}")
                # Report the failure as an empty result, not ``None``: ``None``
                # means "worker finished", and sending it here would make the
                # collector stop waiting for this worker too early.
                result_queue.put((None, 0))
            finally:
                work_queue.task_done()
    finally:
        # Always signal completion; otherwise the collector would block
        # forever waiting for this worker's sentinel.
        result_queue.put(None)


def _record_result(result_item, process_results):
    """Fold one worker ``(results, cost)`` tuple into the running totals.

    Appends *results* to *process_results* when it is not ``None`` and
    returns the cost to add (a ``None`` cost counts as 0).
    """
    results, cost = result_item
    if results is not None:
        process_results.append(results)
    return cost if cost is not None else 0


def run_parallel_evaluation(dataset, process_func, config, num_workers, description):
    """
    Runs parallel evaluation on the given dataset and returns the results.

    Args:
        dataset (list or datasets.Dataset): Data to evaluate. Any iterable is
            accepted; non-lists are materialized with ``list()``.
        process_func (callable): Called as ``process_func(config, item)`` in a
            worker process; must return a ``(results, cost)`` pair. Under the
            'spawn'/'forkserver' start methods it and *config* must be
            picklable.
        config (dict): Configuration for the process_func.
        num_workers (int): Number of worker processes to use (must be >= 1).
        description (str): Description to display on the tqdm progress bar.

    Returns:
        tuple: (list of evaluation results, total cost). Results are in
        completion order, not dataset order; items whose ``results`` is
        ``None`` or whose processing raised are left out of the list.

    Raises:
        ValueError: If ``num_workers`` is less than 1.
    """
    if num_workers < 1:
        # With no workers nothing would ever be processed and the dataset
        # would be silently dropped.
        raise ValueError(f"num_workers must be at least 1, got {num_workers}")

    dataset_list = dataset if isinstance(dataset, list) else list(dataset)
    process_results = []
    process_cost = 0

    with Manager() as manager:
        work_queue = manager.Queue()
        result_queue = manager.Queue()

        # Enqueue every item, then one stop sentinel per worker.
        for data in dataset_list:
            work_queue.put(data)
        for _ in range(num_workers):
            work_queue.put(None)

        processes = []
        try:
            for _ in range(num_workers):
                proc = Process(
                    target=worker_main,
                    args=(work_queue, result_queue, process_func, config),
                )
                proc.start()
                processes.append(proc)

            with tqdm(total=len(dataset_list), desc=description) as pbar:
                completed_workers = 0
                while completed_workers < num_workers:
                    try:
                        result_item = result_queue.get(timeout=_RESULT_POLL_SECONDS)
                    except queue.Empty:
                        # A worker killed from outside (e.g. by the OS) never
                        # sends its sentinel; stop waiting once none are alive.
                        if not any(proc.is_alive() for proc in processes):
                            break
                        continue
                    if result_item is None:
                        completed_workers += 1
                    else:
                        process_cost += _record_result(result_item, process_results)
                        pbar.update(1)

                for proc in processes:
                    proc.join()

                # Pick up anything still queued (e.g. when the loop above
                # stopped because every worker had already exited).
                while not result_queue.empty():
                    result_item = result_queue.get_nowait()
                    if result_item is not None:
                        process_cost += _record_result(result_item, process_results)
                        pbar.update(1)
        finally:
            # On an error or interrupt in this process, don't leave workers
            # running against a manager that is about to be shut down.
            for proc in processes:
                if proc.is_alive():
                    proc.terminate()
                proc.join()

    return process_results, process_cost