import numpy as np
from pathlib import Path

import librosa
import paderbox as pb
import padertorch as pt
import torch
import torchaudio
import gradio as gr
from onnxruntime import InferenceSession
from IPython.display import display, Audio, clear_output

from pvq_manipulation.models.vits import Vits_NT
from pvq_manipulation.models.ffjord import FFJORD
from pvq_manipulation.models.hubert import HubertExtractor, SID_LARGE_LAYER
from pvq_manipulation.helper.vad import EnergyVAD

device = 'cpu'  # 'cuda' if torch.cuda.is_available() else 'cpu'

# load tts model
storage_dir_tts = Path("./models/tts_model/")
tts_model = Vits_NT.load_model(storage_dir_tts, "model.pt")

# load normalizing flow
storage_dir_normalizing_flow = Path("./models/norm_flow")
speaker_conditioning = pb.io.load(storage_dir_normalizing_flow / "speaker_conditioning.json")
normalizing_flow = FFJORD.load_model(storage_dir_normalizing_flow, checkpoint="model.pt", device=device)

# load hubert feature model
hubert_model = HubertExtractor(
    layer=SID_LARGE_LAYER,
    model_name="HUBERT_LARGE",
    backend="torchaudio",
    device=device,
    # storage_dir=  # target storage dir hubert model
)

# example synthesis
# speaker_id = 1034
# example_id = "1034_121119_000028_000001"
# wav_1 = tts_model.synthesize_from_example({
#     'text': "It took me quite a long time to develop a voice, and now that I have it I'm not going to be silent.",
#     'd_vector_storage_root': f"./Saved_models/Dataset/Embeddings/{speaker_id}/{example_id}.pth",
# })
# display(Audio(wav_1, rate=24_000, normalize=True))


# manipulation block
def get_manipulation(
        d_vector,
        labels,
        flow,
        tts_model,
        manipulation_idx=0,
        manipulation_fkt=1,
):
    # shift the selected PVQ label, map the speaker embedding into the latent space
    # of the normalizing flow, and sample it back under the manipulated conditioning
    labels_manipulated = labels.clone()
    labels_manipulated[:, manipulation_idx] += manipulation_fkt
    output_forward = flow.forward((d_vector.float(), labels))[0]
    sampled_class_manipulated = flow.sample((output_forward, labels_manipulated))[0]

    wav = tts_model.synthesize_from_example({
        'text': "It took me quite a long time to develop a voice, and now that I have it I'm not going to be silent.",
        'd_vector': d_vector.detach().numpy(),
        'd_vector_man': sampled_class_manipulated.detach().numpy(),
    })
    return wav


def extract_speaker_embedding(example):
    observation, sr = pb.io.load_audio(example['audio_path']['observation'], return_sample_rate=True)
    observation = librosa.resample(observation, orig_sr=sr, target_sr=16_000)

    # remove silence before computing the speaker embedding
    vad = EnergyVAD(sample_rate=16_000)
    if observation.ndim == 1:
        observation = observation[None, :]
    observation = vad({'audio_data': observation})['audio_data']

    with torch.no_grad():
        example = tts_model.speaker_manager.prepare_example({'audio_data': {'observation': observation}, **example})
        example = pt.data.utils.collate_fn([example])
        example['features'] = torch.tensor(np.array(example['features']))
        d_vector = tts_model.speaker_manager.forward(example)[0]
    return d_vector


# load speaker labels
def load_speaker_labels(example, speaker_conditioning, reg_stor_dir=Path('./models/pvq_extractor/')):
    audio, _ = torchaudio.load(example['audio_path']['observation'])
    audio = audio.to(device)
    num_samples = torch.tensor([audio.shape[-1]], device=device)
    providers = ["CPUExecutionProvider"]

    with torch.no_grad():
        features, seq_len = hubert_model(
            audio,
            24_000,
            sequence_lengths=num_samples,
        )
    # average the HuBERT features over time to get one vector per utterance
    features = np.mean(features.squeeze(0).detach().cpu().numpy(), axis=-1)

    # one ONNX regressor per perceptual voice quality (PVQ) dimension
    pvqd_predictions = {}
    for pvq in ['Breathiness', 'Loudness', 'Pitch', 'Resonance', 'Roughness', 'Strain', 'Weight']:
        with open(reg_stor_dir / f"{pvq}.onnx", "rb") as fid:
            onnx = fid.read()
        sess = InferenceSession(onnx, providers=providers)
        pred = sess.run(None, {"X": features[None]})[0].squeeze(1)
        pvqd_predictions[pvq] = pred.tolist()[0]

    # order the predictions as expected by the flow conditioning and scale to [0, 1]
    labels = []
    for key in speaker_conditioning:
        labels.append(pvqd_predictions[key] / 100)
    return torch.tensor(labels)


example = {
    'audio_path': {'observation': "audio/1034_121119_000028_000001.wav"},
    'speaker_id': 1034,
    'example_id': "1034_121119_000028_000001",
}
labels = load_speaker_labels(example, speaker_conditioning)
label_options = ['Weight', 'Resonance', 'Breathiness', 'Roughness', 'Loudness', 'Strain', 'Pitch']

# print('Estimated PVQ strengths of input speaker:')
# max_len = max(len(name) for name in label_options)
# for label_name, pvq in zip(label_options, labels):
#     print(f'{label_name:<{max_len}} : {pvq:6.2f}')


def update_manipulation(manipulation_idx, manipulation_fkt):
    d_vector = extract_speaker_embedding(example)
    labels = load_speaker_labels(example, speaker_conditioning)

    wav_manipulated = get_manipulation(
        # example=example,
        d_vector=d_vector,
        labels=labels[None, :],
        flow=normalizing_flow,
        tts_model=tts_model,
        manipulation_idx=manipulation_idx,
        manipulation_fkt=manipulation_fkt,
    )
    wav_unmanipulated = tts_model.synthesize_from_example({
        'text': "It took me quite a long time to develop a voice, and now that I have it I'm not going to be silent.",
        'd_vector': d_vector.detach().numpy(),
    })

    # gradio expects (sample_rate, waveform) tuples for audio outputs
    sr = 24_000
    return (sr, wav_unmanipulated), (sr, wav_manipulated)

    # with audio_output:
    #     clear_output(wait=True)
    #     print('Manipulated Speaker')
    #     display(Audio(wav_manipulated, rate=24_000, normalize=True))
    #     print('Unmanipulated Synthesis')
    #     display(Audio(wav_unmanipulated, rate=24_000, normalize=True))
    #     print('Original Speaker')
    #     display(Audio(example['audio_path']['observation'], rate=24_000, normalize=True))
    #     print(f"Manipulated {label_options[manipulation_idx]} with strength {manipulation_fkt}")


dropdown_options = [(label, i) for i, label in enumerate(label_options)]

demo = gr.Interface(
    title="Perceptual Voice Quality (PVQ) Manipulation",
    fn=update_manipulation,
    inputs=[
        gr.Dropdown(label="PVQ Feature", choices=dropdown_options, value=2, type="index"),
        gr.Slider(label="Manipulation Factor", minimum=-2.0, maximum=2.0, value=1.0, step=0.1),
    ],
    outputs=[
        gr.Audio(label="unmanipulated synthesis"),
        gr.Audio(label="manipulated synthesis"),
    ],
)

if __name__ == "__main__":
    demo.launch(share=True)
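
# Minimal usage sketch (assumption: run interactively, e.g. in a notebook, instead of
# launching the Gradio UI). `update_manipulation` returns (sample_rate, waveform)
# tuples, so the results can be auditioned directly with the IPython Audio widget
# imported above; index 2 in `label_options` corresponds to 'Breathiness'.
# (sr, wav_ref), (sr, wav_man) = update_manipulation(manipulation_idx=2, manipulation_fkt=1.0)
# display(Audio(wav_ref, rate=sr, normalize=True))
# display(Audio(wav_man, rate=sr, normalize=True))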