import cProfile
import pstats
import time
from pathlib import Path
from typing import Optional, Tuple
import h5py
import numpy as np


def append_suffix_to_file(file_path: Path, suffix: str = '_INF', ext: Optional[str] = None) -> Path:
    """
    Adds a suffix to the given file name.

    :param file_path: `Path` object to the original file.
    :param suffix: `str` suffix to append to the file name, before the extension.
    :param ext: `str` optional new file extension, e.g. '.h5'.
    :return: Updated `Path`.
    """
    if ext:
        file_path = file_path.with_suffix(ext)
    new_file_name = file_path.stem + suffix + file_path.suffix
    return file_path.with_name(new_file_name)
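
# Example usage (illustrative file names, shown for a POSIX system):
# >>> append_suffix_to_file(Path('scan.npy'), suffix='_INF', ext='.h5')
# PosixPath('scan_INF.h5')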


def array4d_to_h5(array_4ds: Tuple, output_file: Path, group: Optional[str] = None,
                  datasets: Tuple = ('array_data',)):
    """
    Writes a tuple of 4D arrays as gzip-compressed HDF5 datasets, optionally inside a group.
    """
    if len(array_4ds) != len(datasets):
        raise ValueError(f'Amount of arrays {len(array_4ds)} must match amount of dataset names {len(datasets)}.')
    with h5py.File(output_file, 'a') as h5f:
        # Write into a sub-group if one is requested, otherwise into the file root.
        target = h5f.create_group(group) if group is not None else h5f
        for name, array in zip(datasets, array_4ds):
            target.create_dataset(name=name, data=array, compression='gzip', compression_opts=9)
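
# Example usage (illustrative names): write two 4D arrays under one group.
# >>> array4d_to_h5((labeled_arr, unlabeled_arr), Path('out.h5'),
# ...               group='take_0', datasets=('labeled', 'unlabeled'))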


def h5_to_array4d(input_file: Path) -> np.ndarray:
    """Loads every top-level dataset in the file and stacks them along the first axis."""
    with h5py.File(input_file, 'r') as h5f:
        return np.vstack([np.array(h5f[key]) for key in h5f.keys()])


def combined_test_h5_to_array4d(input_file: Path, pc_size: int = 1024, merged: bool = True) -> np.ndarray:
    """
    Loads the 'labeled' and 'unlabeled' datasets from every group in the file,
    merges each pair into a fixed-size point cloud, and stacks the results.
    Note: the `merged` flag is currently unused.
    """
    with h5py.File(input_file, 'r') as h5f:
        data = []
        for grp_name in h5f.keys():
            grp = h5f[grp_name]
            labeled = np.array(grp['labeled'])
            unlabeled = np.array(grp['unlabeled'])
            data.append(merge_labeled_and_unlabeled_data(labeled, unlabeled, pc_size=pc_size))
        return np.vstack(data)
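
# Expected file layout (group names illustrative): each group holds a 'labeled'
# and an 'unlabeled' dataset of shape (n_frames, 15, n_points), e.g.:
#   /take_0/labeled, /take_0/unlabeled, /take_1/labeled, ...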


def merge_labeled_and_unlabeled_data(labeled: np.ndarray, unlabeled: np.ndarray, pc_size: int,
                                     augment: Optional[str] = None) -> np.ndarray:
    """
    Concatenates labeled and unlabeled markers into one point cloud of exactly
    `pc_size` points per frame, padding with filler markers when needed.
    """
    missing = pc_size - (labeled.shape[2] + unlabeled.shape[2])
    if missing <= 0:
        # Enough (or too many) points: keep the last `pc_size` along the point axis.
        # Returns shape (n_frames, 15, pc_size).
        return np.concatenate((unlabeled, labeled), axis=2)[:, :, -pc_size:]
    # This mirrors the way that TrainDataset.fill_point_cloud() fills values.
    if augment is None:
        missing_markers = np.ones((labeled.shape[0], labeled.shape[1], missing))
    elif augment == 'normal':
        # Note: np.random.rand() samples uniformly from [0, 1), not from a normal distribution.
        missing_markers = np.random.rand(labeled.shape[0], labeled.shape[1], missing)
    else:
        missing_markers = np.zeros((labeled.shape[0], labeled.shape[1], missing))
    # Zero out the first two channels of the filler markers.
    missing_markers[:, 0] = 0.
    missing_markers[:, 1] = 0.
    # Returns shape (n_frames, 15, pc_size).
    return np.concatenate((missing_markers, unlabeled, labeled), axis=2)
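
# Shape example (illustrative): 900 labeled + 100 unlabeled points are padded
# with 24 filler markers to reach pc_size=1024.
# >>> merge_labeled_and_unlabeled_data(np.ones((2, 15, 900)),
# ...                                  np.ones((2, 15, 100)), pc_size=1024).shape
# (2, 15, 1024)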


class Timer:
    """Context manager that reports wall-clock time and, optionally, a cProfile report."""

    def __init__(self, txt: str = 'Execution time', profiler: bool = False):
        self.txt = txt
        self.profiler = profiler

    def __enter__(self):
        # perf_counter() is the recommended monotonic clock for measuring elapsed time.
        self.start_time = time.perf_counter()
        if self.profiler:
            self.p = cProfile.Profile()
            self.p.enable()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = time.perf_counter()
        dif = self.end_time - self.start_time
        print(f'{self.txt}: {dif:.4f} seconds')
        if self.profiler:
            self.p.disable()
            stats = pstats.Stats(self.p).sort_stats('time')
            stats.print_stats()
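

# Minimal self-test sketch (illustrative; 'demo_INF.h5' is a throwaway file
# name, not part of any real pipeline): round-trips two arrays through HDF5
# and merges them into a fixed-size point cloud.
if __name__ == '__main__':
    demo_file = append_suffix_to_file(Path('demo.npy'), suffix='_INF', ext='.h5')
    demo_file.unlink(missing_ok=True)  # start fresh; array4d_to_h5 appends to existing files
    arrays = (np.zeros((2, 15, 512)), np.ones((2, 15, 512)))
    with Timer('HDF5 round trip'):
        array4d_to_h5(arrays, demo_file, datasets=('labeled', 'unlabeled'))
        restored = h5_to_array4d(demo_file)
    print(restored.shape)  # (4, 15, 512): both datasets stacked along axis 0
    merged = merge_labeled_and_unlabeled_data(arrays[0], arrays[1], pc_size=1024)
    print(merged.shape)  # (2, 15, 1024): 512 + 512 points, no padding needed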