# Spaces:
# Sleeping
# Sleeping
"""
item: one piece of data
item_name: data id
wav_fn: wave file path
spk: dataset name
ph_seq: phoneme sequence
ph_dur: phoneme durations
"""
import csv | |
import os | |
import pathlib | |
import random | |
from copy import deepcopy | |
import librosa | |
import numpy as np | |
import torch | |
from basics.base_binarizer import BaseBinarizer | |
from basics.base_pe import BasePE | |
from modules.fastspeech.tts_modules import LengthRegulator | |
from modules.pe import initialize_pe | |
from utils.binarizer_utils import ( | |
SinusoidalSmoothingConv1d, | |
get_mel_torch, | |
get_mel2ph_torch, | |
get_energy_librosa, | |
get_breathiness, | |
get_voicing, | |
get_tension_base_harmonic, | |
) | |
from utils.decomposed_waveform import DecomposedWaveform | |
from utils.hparams import hparams | |
# Pin OpenMP to a single thread for this process (set at import time).
os.environ["OMP_NUM_THREADS"] = "1"

# Attribute names passed to BaseBinarizer as ``data_attrs`` by AcousticBinarizer.
# The order is kept exactly as declared.
ACOUSTIC_ITEM_ATTRIBUTES = [
    'spk_id',
    'mel',
    'tokens',
    'mel2ph',
    'f0',
    # optional variance curves
    'energy',
    'breathiness',
    'voicing',
    'tension',
    # augmentation-related embeddings
    'key_shift',
    'speed',
]

# Module-level singletons. They start as None and are created lazily, on first
# use, inside AcousticBinarizer.process_item via ``global`` statements.
pitch_extractor: BasePE = None
energy_smooth: SinusoidalSmoothingConv1d = None
breathiness_smooth: SinusoidalSmoothingConv1d = None
voicing_smooth: SinusoidalSmoothingConv1d = None
tension_smooth: SinusoidalSmoothingConv1d = None
class AcousticBinarizer(BaseBinarizer):
    """Binarizer for acoustic model training data.

    Loads utterance metadata from ``transcriptions.csv``, extracts per-utterance
    features (mel, mel2ph, f0 and optional variance curves) and plans data
    augmentation tasks.

    NOTE(review): ``self.items``, ``self.speakers``, ``self.spk_ids``,
    ``self.device``, ``self.timestep``, ``self.phone_encoder``,
    ``self.raw_data_dirs`` and ``self.augmentation_args`` are not set in this
    class; presumably they are provided by ``BaseBinarizer`` - confirm there.
    """

    def __init__(self):
        """Set up the length regulator and read the variance-embedding switches from hparams.

        Raises:
            AssertionError: if ``hparams['mel_base']`` is not ``'e'``.
        """
        super().__init__(data_attrs=ACOUSTIC_ITEM_ATTRIBUTES)
        self.lr = LengthRegulator()
        # Which optional variance curves have to be extracted in process_item().
        self.need_energy = hparams['use_energy_embed']
        self.need_breathiness = hparams['use_breathiness_embed']
        self.need_voicing = hparams['use_voicing_embed']
        self.need_tension = hparams['use_tension_embed']
        assert hparams['mel_base'] == 'e', (
            "Mel base must be set to \'e\' according to 2nd stage of the migration plan. "
            "See https://github.com/openvpi/DiffSinger/releases/tag/v2.3.0 for more details."
        )

    def load_meta_data(self, raw_data_dir: pathlib.Path, ds_id, spk_id):
        """Read ``transcriptions.csv`` of one dataset and register its utterances in ``self.items``.

        Args:
            raw_data_dir: dataset directory containing ``transcriptions.csv`` and a ``wavs`` folder.
            ds_id: dataset index; used as the key prefix and to look up ``self.speakers``.
            spk_id: speaker id stored with every utterance of this dataset.

        Raises:
            AssertionError: if ``ph_seq`` and ``ph_dur`` differ in length, or a duration is negative.
        """
        meta_data_dict = {}
        # newline='' lets the csv module handle line endings itself, as the csv docs require.
        with open(raw_data_dir / 'transcriptions.csv', 'r', encoding='utf-8', newline='') as f:
            for utterance_label in csv.DictReader(f):
                item_name = utterance_label['name']
                temp_dict = {
                    'wav_fn': str(raw_data_dir / 'wavs' / f'{item_name}.wav'),
                    'ph_seq': utterance_label['ph_seq'].split(),
                    'ph_dur': [float(x) for x in utterance_label['ph_dur'].split()],
                    'spk_id': spk_id,
                    'spk_name': self.speakers[ds_id],
                }
                assert len(temp_dict['ph_seq']) == len(temp_dict['ph_dur']), \
                    f'Lengths of ph_seq and ph_dur mismatch in \'{item_name}\'.'
                assert all(ph_dur >= 0 for ph_dur in temp_dict['ph_dur']), \
                    f'Negative ph_dur found in \'{item_name}\'.'
                # Items are keyed as '<ds_id>:<item_name>'.
                meta_data_dict[f'{ds_id}:{item_name}'] = temp_dict
        self.items.update(meta_data_dict)

    def process_item(self, item_name, meta_data, binarization_args):
        """Extract all features of one utterance.

        Args:
            item_name: key of the utterance; used in the result and in log messages.
            meta_data: dict with 'wav_fn', 'ph_seq', 'ph_dur', 'spk_id' and 'spk_name'.
            binarization_args: not read by this implementation.

        Returns:
            A dict of extracted features, or ``None`` when the utterance is skipped
            (entirely unvoiced f0, or NaN values in the tension curve).
        """
        waveform, _ = librosa.load(meta_data['wav_fn'], sr=hparams['audio_sample_rate'], mono=True)
        mel = get_mel_torch(
            waveform, hparams['audio_sample_rate'], num_mel_bins=hparams['audio_num_mel_bins'],
            hop_size=hparams['hop_size'], win_size=hparams['win_size'], fft_size=hparams['fft_size'],
            fmin=hparams['fmin'], fmax=hparams['fmax'],
            device=self.device
        )
        length = mel.shape[0]
        seconds = length * hparams['hop_size'] / hparams['audio_sample_rate']
        processed_input = {
            'name': item_name,
            'wav_fn': meta_data['wav_fn'],
            'spk_id': meta_data['spk_id'],
            'spk_name': meta_data['spk_name'],
            'seconds': seconds,
            'length': length,
            'mel': mel,
            'tokens': np.array(self.phone_encoder.encode(meta_data['ph_seq']), dtype=np.int64),
            'ph_dur': np.array(meta_data['ph_dur']).astype(np.float32),
        }

        # get ground truth dur
        processed_input['mel2ph'] = get_mel2ph_torch(
            self.lr, torch.from_numpy(processed_input['ph_dur']), length, self.timestep, device=self.device
        ).cpu().numpy()

        # get ground truth f0 (the pitch extractor is a lazily created module-level singleton)
        global pitch_extractor
        if pitch_extractor is None:
            pitch_extractor = initialize_pe()
        gt_f0, uv = pitch_extractor.get_pitch(
            waveform, samplerate=hparams['audio_sample_rate'], length=length,
            hop_size=hparams['hop_size'], f0_min=hparams['f0_min'], f0_max=hparams['f0_max'],
            interp_uv=True
        )
        if uv.all():  # All unvoiced
            print(f'Skipped \'{item_name}\': empty gt f0')
            return None
        processed_input['f0'] = gt_f0.astype(np.float32)

        if self.need_energy:
            # get ground truth energy
            energy = get_energy_librosa(
                waveform, length, hop_size=hparams['hop_size'], win_size=hparams['win_size']
            ).astype(np.float32)

            global energy_smooth
            if energy_smooth is None:
                energy_smooth = SinusoidalSmoothingConv1d(
                    round(hparams['energy_smooth_width'] / self.timestep)
                ).eval().to(self.device)
            # [None] adds a leading axis for the smoothing module, [0] removes it again.
            energy = energy_smooth(torch.from_numpy(energy).to(self.device)[None])[0]

            processed_input['energy'] = energy.cpu().numpy()

        # create a DecomposedWaveform object for further feature extraction;
        # `gt_f0 * ~uv` zeroes the f0 of unvoiced frames
        dec_waveform = DecomposedWaveform(
            waveform, samplerate=hparams['audio_sample_rate'], f0=gt_f0 * ~uv,
            hop_size=hparams['hop_size'], fft_size=hparams['fft_size'], win_size=hparams['win_size'],
            algorithm=hparams['hnsep']
        )

        if self.need_breathiness:
            # get ground truth breathiness
            breathiness = get_breathiness(
                dec_waveform, None, None, length=length
            )

            global breathiness_smooth
            if breathiness_smooth is None:
                breathiness_smooth = SinusoidalSmoothingConv1d(
                    round(hparams['breathiness_smooth_width'] / self.timestep)
                ).eval().to(self.device)
            breathiness = breathiness_smooth(torch.from_numpy(breathiness).to(self.device)[None])[0]

            processed_input['breathiness'] = breathiness.cpu().numpy()

        if self.need_voicing:
            # get ground truth voicing
            voicing = get_voicing(
                dec_waveform, None, None, length=length
            )

            global voicing_smooth
            if voicing_smooth is None:
                voicing_smooth = SinusoidalSmoothingConv1d(
                    round(hparams['voicing_smooth_width'] / self.timestep)
                ).eval().to(self.device)
            voicing = voicing_smooth(torch.from_numpy(voicing).to(self.device)[None])[0]

            processed_input['voicing'] = voicing.cpu().numpy()

        if self.need_tension:
            # get ground truth tension
            tension = get_tension_base_harmonic(
                dec_waveform, None, None, length=length, domain='logit'
            )

            global tension_smooth
            if tension_smooth is None:
                tension_smooth = SinusoidalSmoothingConv1d(
                    round(hparams['tension_smooth_width'] / self.timestep)
                ).eval().to(self.device)
            tension = tension_smooth(torch.from_numpy(tension).to(self.device)[None])[0]
            if tension.isnan().any():
                # A tension curve containing NaN makes the item unusable; report and skip it.
                print('Error:', item_name)
                print(tension)
                return None

            processed_input['tension'] = tension.cpu().numpy()

        # Un-augmented items carry the neutral key shift / speed values.
        if hparams['use_key_shift_embed']:
            processed_input['key_shift'] = 0.

        if hparams['use_speed_embed']:
            processed_input['speed'] = 1.

        return processed_input

    def arrange_data_augmentation(self, data_iterator):
        """Plan augmentation tasks for the given items.

        Each task is a dict ``{'name': item_name, 'func': callable, 'kwargs': dict}``.

        Args:
            data_iterator: iterable of ``(item_name, _)`` pairs; only the names are used.

        Returns:
            dict mapping an item name to the list of augmentation tasks planned for it.

        Raises:
            AssertionError: if an enabled augmentation conflicts with hparams or has an invalid range.
        """
        aug_map = {}
        aug_list = []
        all_item_names = [item_name for item_name, _ in data_iterator]
        total_scale = 0
        aug_pe = initialize_pe()
        if self.augmentation_args['random_pitch_shifting']['enabled']:
            from augmentation.spec_stretch import SpectrogramStretchAugmentation
            aug_args = self.augmentation_args['random_pitch_shifting']
            key_shift_min, key_shift_max = aug_args['range']
            assert hparams['use_key_shift_embed'], \
                'Random pitch shifting augmentation requires use_key_shift_embed == True.'
            assert key_shift_min < 0 < key_shift_max, \
                'Random pitch shifting augmentation must have a range where min < 0 < max.'

            aug_ins = SpectrogramStretchAugmentation(self.raw_data_dirs, aug_args, pe=aug_pe)
            scale = aug_args['scale']
            aug_item_names = random.choices(all_item_names, k=int(scale * len(all_item_names)))

            for aug_item_name in aug_item_names:
                # Negative draws scale towards key_shift_min, positive ones towards key_shift_max.
                rand = random.uniform(-1, 1)
                if rand < 0:
                    key_shift = key_shift_min * abs(rand)
                else:
                    key_shift = key_shift_max * rand
                aug_task = {
                    'name': aug_item_name,
                    'func': aug_ins.process_item,
                    'kwargs': {'key_shift': key_shift}
                }
                aug_map.setdefault(aug_item_name, []).append(aug_task)
                aug_list.append(aug_task)

            total_scale += scale

        if self.augmentation_args['fixed_pitch_shifting']['enabled']:
            from augmentation.spec_stretch import SpectrogramStretchAugmentation
            aug_args = self.augmentation_args['fixed_pitch_shifting']
            targets = aug_args['targets']
            scale = aug_args['scale']
            spk_id_size = max(self.spk_ids) + 1
            min_num_spk = (1 + len(targets)) * spk_id_size
            assert not self.augmentation_args['random_pitch_shifting']['enabled'], \
                'Fixed pitch shifting augmentation is not compatible with random pitch shifting.'
            assert len(targets) == len(set(targets)), \
                'Fixed pitch shifting augmentation requires having no duplicate targets.'
            assert hparams['use_spk_id'], 'Fixed pitch shifting augmentation requires use_spk_id == True.'
            assert hparams['num_spk'] >= min_num_spk, \
                'Fixed pitch shifting augmentation requires num_spk >= (1 + len(targets)) * (max(spk_ids) + 1).'
            assert scale < 1, 'Fixed pitch shifting augmentation requires scale < 1.'

            aug_ins = SpectrogramStretchAugmentation(self.raw_data_dirs, aug_args, pe=aug_pe)
            for i, target in enumerate(targets):
                aug_item_names = random.choices(all_item_names, k=int(scale * len(all_item_names)))
                for aug_item_name in aug_item_names:
                    # The item key is '<ds_id>:<name>'; every target gets its own block of speaker ids.
                    ds_id = int(aug_item_name.split(':', maxsplit=1)[0])
                    replace_spk_id = self.spk_ids[ds_id] + (i + 1) * spk_id_size
                    aug_task = {
                        'name': aug_item_name,
                        'func': aug_ins.process_item,
                        'kwargs': {'key_shift': target, 'replace_spk_id': replace_spk_id}
                    }
                    aug_map.setdefault(aug_item_name, []).append(aug_task)
                    aug_list.append(aug_task)

            total_scale += scale * len(targets)

        if self.augmentation_args['random_time_stretching']['enabled']:
            from augmentation.spec_stretch import SpectrogramStretchAugmentation
            aug_args = self.augmentation_args['random_time_stretching']
            speed_min, speed_max = aug_args['range']
            assert hparams['use_speed_embed'], \
                'Random time stretching augmentation requires use_speed_embed == True.'
            assert 0 < speed_min < 1 < speed_max, \
                'Random time stretching augmentation must have a range where 0 < min < 1 < max.'

            aug_ins = SpectrogramStretchAugmentation(self.raw_data_dirs, aug_args, pe=aug_pe)
            scale = aug_args['scale']
            # Type 0: stretch raw items. Type 1: stretched copies of existing tasks.
            # Type 2: add a speed change to existing tasks in place.
            k_from_raw = int(scale / (1 + total_scale) * len(all_item_names))
            k_from_aug = int(total_scale * scale / (1 + total_scale) * len(all_item_names))
            k_mutate = int(total_scale * scale / (1 + scale) * len(all_item_names))
            if not aug_list:
                # Earlier stages may have produced no tasks (int() rounding to zero);
                # random.choices() raises IndexError on an empty population when k > 0.
                k_from_aug = k_mutate = 0
            aug_types = [0] * k_from_raw + [1] * k_from_aug + [2] * k_mutate
            aug_items = random.choices(all_item_names, k=k_from_raw) + random.choices(aug_list, k=k_from_aug + k_mutate)

            for aug_type, aug_item in zip(aug_types, aug_items):
                # Uniform distribution in log domain
                speed = speed_min * (speed_max / speed_min) ** random.random()
                if aug_type == 0:
                    # aug_item is an item name here.
                    aug_task = {
                        'name': aug_item,
                        'func': aug_ins.process_item,
                        'kwargs': {'speed': speed}
                    }
                    aug_map.setdefault(aug_item, []).append(aug_task)
                    aug_list.append(aug_task)
                elif aug_type == 1:
                    # aug_item is an existing task dict here; store its item name (not the
                    # whole task) so 'name' has the same meaning for every task.
                    aug_task = {
                        'name': aug_item['name'],
                        'func': aug_item['func'],
                        'kwargs': deepcopy(aug_item['kwargs'])
                    }
                    aug_task['kwargs']['speed'] = speed
                    aug_map.setdefault(aug_item['name'], []).append(aug_task)
                    aug_list.append(aug_task)
                elif aug_type == 2:
                    # Mutate the existing task (already referenced by aug_map) in place.
                    aug_item['kwargs']['speed'] = speed

            total_scale += scale

        return aug_map