move codestyle to black

master
Malar Kannan 2019-07-03 18:10:16 +05:30
parent 108ce2b079
commit 9413fb73b9
12 changed files with 1719 additions and 27 deletions

audio_processing.py (new file)

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
import torch
import numpy as np
from scipy.signal import get_window
import librosa.util as librosa_util
def window_sumsquare(
window,
n_frames,
hop_length=200,
win_length=800,
n_fft=800,
dtype=np.float32,
norm=None,
):
"""
# from librosa 0.6
Compute the sum-square envelope of a window function at a given hop length.
This is used to estimate modulation effects induced by windowing
observations in short-time Fourier transforms.
Parameters
----------
window : string, tuple, number, callable, or list-like
Window specification, as in `get_window`
n_frames : int > 0
The number of analysis frames
hop_length : int > 0
The number of samples to advance between frames
win_length : [optional]
The length of the window function. By default, this matches `n_fft`.
n_fft : int > 0
The length of each analysis frame.
dtype : np.dtype
The data type of the output
Returns
-------
wss : np.ndarray, shape=`(n_fft + hop_length * (n_frames - 1))`
The sum-squared envelope of the window function
"""
if win_length is None:
win_length = n_fft
n = n_fft + hop_length * (n_frames - 1)
x = np.zeros(n, dtype=dtype)
# Compute the squared window at the desired length
win_sq = get_window(window, win_length, fftbins=True)
win_sq = librosa_util.normalize(win_sq, norm=norm) ** 2
win_sq = librosa_util.pad_center(win_sq, n_fft)
# Fill the envelope
for i in range(n_frames):
sample = i * hop_length
x[sample : min(n, sample + n_fft)] += win_sq[
: max(0, min(n_fft, n - sample))
]
return x
def griffin_lim(magnitudes, stft_fn, n_iters=30):
"""
PARAMS
------
magnitudes: spectrogram magnitudes
stft_fn: STFT class with transform (STFT) and inverse (ISTFT) methods
"""
angles = np.angle(np.exp(2j * np.pi * np.random.rand(*magnitudes.size())))
angles = angles.astype(np.float32)
angles = torch.autograd.Variable(torch.from_numpy(angles))
signal = stft_fn.inverse(magnitudes, angles).squeeze(1)
for i in range(n_iters):
_, angles = stft_fn.transform(signal)
signal = stft_fn.inverse(magnitudes, angles).squeeze(1)
return signal
def dynamic_range_compression(x, C=1, clip_val=1e-5):
"""
PARAMS
------
C: compression factor
"""
return torch.log(torch.clamp(x, min=clip_val) * C)
def dynamic_range_decompression(x, C=1):
"""
PARAMS
------
C: compression factor used to compress
"""
return torch.exp(x) / C
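
A minimal usage sketch for the helpers above, assuming the STFT class defined in stft.py later in this diff; sizes are illustrative, not taken from the repo:

import torch

stft = STFT(filter_length=1024, hop_length=256, win_length=1024)  # from stft.py
magnitudes, _ = stft.transform(torch.randn(1, 16000))  # stand-in waveform, phase dropped
recovered = griffin_lim(magnitudes, stft, n_iters=30)  # iterative phase re-estimation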

data_utils.py (new file)

@@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
import random
import numpy as np
import torch
import torch.utils.data
from . import layers
from .utils import load_wav_to_torch, load_filepaths_and_text
from .text import text_to_sequence
class TextMelLoader(torch.utils.data.Dataset):
"""
1) loads audio, text pairs
2) normalizes text and converts them to sequences of one-hot vectors
3) computes mel-spectrograms from audio files.
"""
def __init__(self, audiopaths_and_text, hparams):
self.audiopaths_and_text = load_filepaths_and_text(audiopaths_and_text)
self.text_cleaners = hparams.text_cleaners
self.max_wav_value = hparams.max_wav_value
self.sampling_rate = hparams.sampling_rate
self.load_mel_from_disk = hparams.load_mel_from_disk
self.stft = layers.TacotronSTFT(
hparams.filter_length,
hparams.hop_length,
hparams.win_length,
hparams.n_mel_channels,
hparams.sampling_rate,
hparams.mel_fmin,
hparams.mel_fmax,
)
random.seed(1234)
random.shuffle(self.audiopaths_and_text)
def get_mel_text_pair(self, audiopath_and_text):
# separate filename and text
audiopath, text = audiopath_and_text[0], audiopath_and_text[1]
text = self.get_text(text)
mel = self.get_mel(audiopath)
return (text, mel)
def get_mel(self, filename):
if not self.load_mel_from_disk:
audio, sampling_rate = load_wav_to_torch(filename)
if sampling_rate != self.stft.sampling_rate:
                raise ValueError(
                    "{} SR doesn't match target {} SR".format(
                        sampling_rate, self.stft.sampling_rate
                    )
                )
audio_norm = audio / self.max_wav_value
audio_norm = audio_norm.unsqueeze(0)
audio_norm = torch.autograd.Variable(
audio_norm, requires_grad=False
)
melspec = self.stft.mel_spectrogram(audio_norm)
melspec = torch.squeeze(melspec, 0)
else:
melspec = torch.from_numpy(np.load(filename))
assert (
melspec.size(0) == self.stft.n_mel_channels
), "Mel dimension mismatch: given {}, expected {}".format(
melspec.size(0), self.stft.n_mel_channels
)
return melspec
def get_text(self, text):
text_norm = torch.IntTensor(text_to_sequence(text, self.text_cleaners))
return text_norm
def __getitem__(self, index):
return self.get_mel_text_pair(self.audiopaths_and_text[index])
def __len__(self):
return len(self.audiopaths_and_text)
class TextMelCollate:
""" Zero-pads model inputs and targets based on number of frames per setep
"""
def __init__(self, n_frames_per_step):
self.n_frames_per_step = n_frames_per_step
def __call__(self, batch):
"""Collate's training batch from normalized text and mel-spectrogram
PARAMS
------
batch: [text_normalized, mel_normalized]
"""
# Right zero-pad all one-hot text sequences to max input length
input_lengths, ids_sorted_decreasing = torch.sort(
torch.LongTensor([len(x[0]) for x in batch]),
dim=0,
descending=True,
)
max_input_len = input_lengths[0]
text_padded = torch.LongTensor(len(batch), max_input_len)
text_padded.zero_()
for i in range(len(ids_sorted_decreasing)):
text = batch[ids_sorted_decreasing[i]][0]
text_padded[i, : text.size(0)] = text
# Right zero-pad mel-spec
num_mels = batch[0][1].size(0)
max_target_len = max([x[1].size(1) for x in batch])
if max_target_len % self.n_frames_per_step != 0:
rest = max_target_len % self.n_frames_per_step
max_target_len += self.n_frames_per_step - rest
assert max_target_len % self.n_frames_per_step == 0
# include mel padded and gate padded
mel_padded = torch.FloatTensor(len(batch), num_mels, max_target_len)
mel_padded.zero_()
gate_padded = torch.FloatTensor(len(batch), max_target_len)
gate_padded.zero_()
output_lengths = torch.LongTensor(len(batch))
for i in range(len(ids_sorted_decreasing)):
mel = batch[ids_sorted_decreasing[i]][1]
mel_padded[i, :, : mel.size(1)] = mel
gate_padded[i, mel.size(1) - 1 :] = 1
output_lengths[i] = mel.size(1)
return (
text_padded,
input_lengths,
mel_padded,
gate_padded,
output_lengths,
)
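
A sketch of how these two classes are typically wired into training; hparams here is assumed to come from create_hparams in hparams.py below:

from torch.utils.data import DataLoader

trainset = TextMelLoader(hparams.training_files, hparams)
collate_fn = TextMelCollate(hparams.n_frames_per_step)
train_loader = DataLoader(
    trainset,
    batch_size=hparams.batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)
# each batch: (text_padded, input_lengths, mel_padded, gate_padded, output_lengths)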


@@ -7,7 +7,7 @@ from .tts import player_gen
 def tts_player():
     player = player_gen()
-    channel = grpc.insecure_channel('localhost:50060')
+    channel = grpc.insecure_channel("localhost:50060")
     stub = tts_pb2_grpc.ServerStub(channel)
     def play(t):
@@ -20,10 +20,11 @@ def tts_player():
 def main():
     play = tts_player()
-    play('How may I help you today?')
+    play("How may I help you today?")
     import pdb
     pdb.set_trace()
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()

glow.py (new file)

@@ -0,0 +1,349 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the NVIDIA CORPORATION nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# *****************************************************************************
import torch
from torch.autograd import Variable
import torch.nn.functional as F
@torch.jit.script
def fused_add_tanh_sigmoid_multiply(input_a, input_b, n_channels):
n_channels_int = n_channels[0]
in_act = input_a + input_b
    t_act = torch.tanh(in_act[:, :n_channels_int, :])
    s_act = torch.sigmoid(in_act[:, n_channels_int:, :])
acts = t_act * s_act
return acts
class WaveGlowLoss(torch.nn.Module):
def __init__(self, sigma=1.0):
super(WaveGlowLoss, self).__init__()
self.sigma = sigma
def forward(self, model_output):
z, log_s_list, log_det_W_list = model_output
for i, log_s in enumerate(log_s_list):
if i == 0:
log_s_total = torch.sum(log_s)
log_det_W_total = log_det_W_list[i]
else:
log_s_total = log_s_total + torch.sum(log_s)
log_det_W_total += log_det_W_list[i]
loss = (
torch.sum(z * z) / (2 * self.sigma * self.sigma)
- log_s_total
- log_det_W_total
)
return loss / (z.size(0) * z.size(1) * z.size(2))
class Invertible1x1Conv(torch.nn.Module):
"""
The layer outputs both the convolution, and the log determinant
of its weight matrix. If reverse=True it does convolution with
inverse
"""
def __init__(self, c):
super(Invertible1x1Conv, self).__init__()
self.conv = torch.nn.Conv1d(
c, c, kernel_size=1, stride=1, padding=0, bias=False
)
# Sample a random orthonormal matrix to initialize weights
W = torch.qr(torch.FloatTensor(c, c).normal_())[0]
# Ensure determinant is 1.0 not -1.0
if torch.det(W) < 0:
W[:, 0] = -1 * W[:, 0]
W = W.view(c, c, 1)
self.conv.weight.data = W
def forward(self, z, reverse=False):
# shape
batch_size, group_size, n_of_groups = z.size()
W = self.conv.weight.squeeze()
if reverse:
if not hasattr(self, "W_inverse"):
# Reverse computation
W_inverse = W.inverse()
W_inverse = Variable(W_inverse[..., None])
if z.type() == "torch.cuda.HalfTensor":
W_inverse = W_inverse.half()
self.W_inverse = W_inverse
z = F.conv1d(z, self.W_inverse, bias=None, stride=1, padding=0)
return z
else:
# Forward computation
log_det_W = batch_size * n_of_groups * torch.logdet(W)
z = self.conv(z)
return z, log_det_W
class WN(torch.nn.Module):
"""
This is the WaveNet like layer for the affine coupling. The primary
difference from WaveNet is the convolutions need not be causal. There is
also no dilation size reset. The dilation only doubles on each layer
"""
def __init__(
self, n_in_channels, n_mel_channels, n_layers, n_channels, kernel_size
):
super(WN, self).__init__()
assert kernel_size % 2 == 1
assert n_channels % 2 == 0
self.n_layers = n_layers
self.n_channels = n_channels
self.in_layers = torch.nn.ModuleList()
self.res_skip_layers = torch.nn.ModuleList()
self.cond_layers = torch.nn.ModuleList()
start = torch.nn.Conv1d(n_in_channels, n_channels, 1)
start = torch.nn.utils.weight_norm(start, name="weight")
self.start = start
# Initializing last layer to 0 makes the affine coupling layers
# do nothing at first. This helps with training stability
end = torch.nn.Conv1d(n_channels, 2 * n_in_channels, 1)
end.weight.data.zero_()
end.bias.data.zero_()
self.end = end
for i in range(n_layers):
dilation = 2 ** i
padding = int((kernel_size * dilation - dilation) / 2)
in_layer = torch.nn.Conv1d(
n_channels,
2 * n_channels,
kernel_size,
dilation=dilation,
padding=padding,
)
in_layer = torch.nn.utils.weight_norm(in_layer, name="weight")
self.in_layers.append(in_layer)
cond_layer = torch.nn.Conv1d(n_mel_channels, 2 * n_channels, 1)
cond_layer = torch.nn.utils.weight_norm(cond_layer, name="weight")
self.cond_layers.append(cond_layer)
# last one is not necessary
if i < n_layers - 1:
res_skip_channels = 2 * n_channels
else:
res_skip_channels = n_channels
res_skip_layer = torch.nn.Conv1d(n_channels, res_skip_channels, 1)
res_skip_layer = torch.nn.utils.weight_norm(
res_skip_layer, name="weight"
)
self.res_skip_layers.append(res_skip_layer)
def forward(self, forward_input):
audio, spect = forward_input
audio = self.start(audio)
for i in range(self.n_layers):
acts = fused_add_tanh_sigmoid_multiply(
self.in_layers[i](audio),
self.cond_layers[i](spect),
torch.IntTensor([self.n_channels]),
)
res_skip_acts = self.res_skip_layers[i](acts)
if i < self.n_layers - 1:
audio = res_skip_acts[:, : self.n_channels, :] + audio
skip_acts = res_skip_acts[:, self.n_channels :, :]
else:
skip_acts = res_skip_acts
if i == 0:
output = skip_acts
else:
output = skip_acts + output
return self.end(output)
class WaveGlow(torch.nn.Module):
def __init__(
self,
n_mel_channels,
n_flows,
n_group,
n_early_every,
n_early_size,
WN_config,
):
super(WaveGlow, self).__init__()
self.upsample = torch.nn.ConvTranspose1d(
n_mel_channels, n_mel_channels, 1024, stride=256
)
assert n_group % 2 == 0
self.n_flows = n_flows
self.n_group = n_group
self.n_early_every = n_early_every
self.n_early_size = n_early_size
self.WN = torch.nn.ModuleList()
self.convinv = torch.nn.ModuleList()
n_half = int(n_group / 2)
# Set up layers with the right sizes based on how many dimensions
# have been output already
n_remaining_channels = n_group
for k in range(n_flows):
if k % self.n_early_every == 0 and k > 0:
n_half = n_half - int(self.n_early_size / 2)
n_remaining_channels = n_remaining_channels - self.n_early_size
self.convinv.append(Invertible1x1Conv(n_remaining_channels))
self.WN.append(WN(n_half, n_mel_channels * n_group, **WN_config))
self.n_remaining_channels = n_remaining_channels
# Useful during inference
def forward(self, forward_input):
"""
forward_input[0] = mel_spectrogram: batch x n_mel_channels x frames
forward_input[1] = audio: batch x time
"""
spect, audio = forward_input
# Upsample spectrogram to size of audio
spect = self.upsample(spect)
assert spect.size(2) >= audio.size(1)
if spect.size(2) > audio.size(1):
spect = spect[:, :, : audio.size(1)]
spect = spect.unfold(2, self.n_group, self.n_group).permute(0, 2, 1, 3)
spect = (
spect.contiguous()
.view(spect.size(0), spect.size(1), -1)
.permute(0, 2, 1)
)
audio = audio.unfold(1, self.n_group, self.n_group).permute(0, 2, 1)
output_audio = []
log_s_list = []
log_det_W_list = []
for k in range(self.n_flows):
if k % self.n_early_every == 0 and k > 0:
output_audio.append(audio[:, : self.n_early_size, :])
audio = audio[:, self.n_early_size :, :]
audio, log_det_W = self.convinv[k](audio)
log_det_W_list.append(log_det_W)
n_half = int(audio.size(1) / 2)
audio_0 = audio[:, :n_half, :]
audio_1 = audio[:, n_half:, :]
output = self.WN[k]((audio_0, spect))
log_s = output[:, n_half:, :]
b = output[:, :n_half, :]
audio_1 = torch.exp(log_s) * audio_1 + b
log_s_list.append(log_s)
audio = torch.cat([audio_0, audio_1], 1)
output_audio.append(audio)
return torch.cat(output_audio, 1), log_s_list, log_det_W_list
def infer(self, spect, sigma=1.0):
spect = self.upsample(spect)
# trim conv artifacts. maybe pad spec to kernel multiple
time_cutoff = self.upsample.kernel_size[0] - self.upsample.stride[0]
spect = spect[:, :, :-time_cutoff]
spect = spect.unfold(2, self.n_group, self.n_group).permute(0, 2, 1, 3)
spect = (
spect.contiguous()
.view(spect.size(0), spect.size(1), -1)
.permute(0, 2, 1)
)
if spect.type() == "torch.cuda.HalfTensor":
audio = torch.cuda.HalfTensor(
spect.size(0), self.n_remaining_channels, spect.size(2)
).normal_()
else:
# cuda.FloatTensor -> FloatTensor
audio = torch.FloatTensor(
spect.size(0), self.n_remaining_channels, spect.size(2)
).normal_()
audio = torch.autograd.Variable(sigma * audio)
for k in reversed(range(self.n_flows)):
n_half = int(audio.size(1) / 2)
audio_0 = audio[:, :n_half, :]
audio_1 = audio[:, n_half:, :]
output = self.WN[k]((audio_0, spect))
s = output[:, n_half:, :]
b = output[:, :n_half, :]
audio_1 = (audio_1 - b) / torch.exp(s)
audio = torch.cat([audio_0, audio_1], 1)
audio = self.convinv[k](audio, reverse=True)
if k % self.n_early_every == 0 and k > 0:
if spect.type() == "torch.cuda.HalfTensor":
z = torch.cuda.HalfTensor(
spect.size(0), self.n_early_size, spect.size(2)
).normal_()
else:
# cuda.FloatTensor -> FloatTensor
z = torch.FloatTensor(
spect.size(0), self.n_early_size, spect.size(2)
).normal_()
audio = torch.cat((sigma * z, audio), 1)
audio = (
audio.permute(0, 2, 1).contiguous().view(audio.size(0), -1).data
)
return audio
@staticmethod
def remove_weightnorm(model):
waveglow = model
for WN in waveglow.WN:
WN.start = torch.nn.utils.remove_weight_norm(WN.start)
WN.in_layers = remove(WN.in_layers)
WN.cond_layers = remove(WN.cond_layers)
WN.res_skip_layers = remove(WN.res_skip_layers)
return waveglow
def remove(conv_list):
new_conv_list = torch.nn.ModuleList()
for old_conv in conv_list:
old_conv = torch.nn.utils.remove_weight_norm(old_conv)
new_conv_list.append(old_conv)
return new_conv_list
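
How the pieces above fit together at training time, as a sketch; the config mirrors WAVEGLOW_CONFIG in tts.py, and n_flows=12 is an assumption (the upstream WaveGlow default), not a value visible in this diff:

import torch

glow = WaveGlow(
    n_mel_channels=80, n_flows=12, n_group=8,
    n_early_every=4, n_early_size=2,
    WN_config={"n_layers": 8, "n_channels": 256, "kernel_size": 3},
)
criterion = WaveGlowLoss(sigma=1.0)
mel = torch.randn(1, 80, 63)          # batch x n_mel_channels x frames
audio = torch.randn(1, 16000)         # batch x time
loss = criterion(glow((mel, audio)))  # forward returns (z, log_s_list, log_det_W_list)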

hparams.py (new file)

@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
import tensorflow as tf
from .text import symbols
# changed path, sampling rate and batch size
def create_hparams(hparams_string=None, verbose=False):
"""Create model hyperparameters. Parse nondefault from given string."""
hparams = tf.contrib.training.HParams(
################################
# Experiment Parameters #
################################
epochs=500,
iters_per_checkpoint=1000,
seed=1234,
dynamic_loss_scaling=True,
fp16_run=False,
distributed_run=False,
dist_backend="nccl",
dist_url="tcp://localhost:54321",
cudnn_enabled=True,
cudnn_benchmark=False,
ignore_layers=["embedding.weight"],
################################
# Data Parameters #
################################
load_mel_from_disk=False,
training_files="lists/tts_data_train_processed.txt",
validation_files="filelists/tts_data_val_processed.txt",
text_cleaners=["english_cleaners"],
################################
# Audio Parameters #
################################
max_wav_value=32768.0,
sampling_rate=16000,
filter_length=1024,
hop_length=256,
win_length=1024,
n_mel_channels=80,
mel_fmin=0.0,
mel_fmax=8000.0,
################################
# Model Parameters #
################################
n_symbols=len(symbols),
symbols_embedding_dim=512,
# Encoder parameters
encoder_kernel_size=5,
encoder_n_convolutions=3,
encoder_embedding_dim=512,
# Decoder parameters
n_frames_per_step=1, # currently only 1 is supported
decoder_rnn_dim=1024,
prenet_dim=256,
max_decoder_steps=1000,
gate_threshold=0.5,
p_attention_dropout=0.1,
p_decoder_dropout=0.1,
# Attention parameters
attention_rnn_dim=1024,
attention_dim=128,
# Location Layer parameters
attention_location_n_filters=32,
attention_location_kernel_size=31,
# Mel-post processing network parameters
postnet_embedding_dim=512,
postnet_kernel_size=5,
postnet_n_convolutions=5,
################################
# Optimization Hyperparameters #
################################
use_saved_learning_rate=False,
learning_rate=1e-3,
weight_decay=1e-6,
grad_clip_thresh=1.0,
batch_size=4,
mask_padding=True, # set model's padded outputs to padded values
)
if hparams_string:
tf.logging.info("Parsing command line hparams: %s", hparams_string)
hparams.parse(hparams_string)
if verbose:
tf.logging.info("Final parsed hparams: %s", hparams.values())
return hparams
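
Non-default values can be supplied as a comma-separated override string; this relies on the TF 1.x HParams.parse API:

hparams = create_hparams("batch_size=8,learning_rate=5e-4")
assert hparams.batch_size == 8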

layers.py (new file)

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
import torch
from librosa.filters import mel as librosa_mel_fn
from .audio_processing import dynamic_range_compression
from .audio_processing import dynamic_range_decompression
from .stft import STFT
class LinearNorm(torch.nn.Module):
def __init__(self, in_dim, out_dim, bias=True, w_init_gain="linear"):
super(LinearNorm, self).__init__()
self.linear_layer = torch.nn.Linear(in_dim, out_dim, bias=bias)
torch.nn.init.xavier_uniform_(
self.linear_layer.weight,
gain=torch.nn.init.calculate_gain(w_init_gain),
)
def forward(self, x):
return self.linear_layer(x)
class ConvNorm(torch.nn.Module):
def __init__(
self,
in_channels,
out_channels,
kernel_size=1,
stride=1,
padding=None,
dilation=1,
bias=True,
w_init_gain="linear",
):
super(ConvNorm, self).__init__()
if padding is None:
assert kernel_size % 2 == 1
padding = int(dilation * (kernel_size - 1) / 2)
self.conv = torch.nn.Conv1d(
in_channels,
out_channels,
kernel_size=kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
bias=bias,
)
torch.nn.init.xavier_uniform_(
self.conv.weight, gain=torch.nn.init.calculate_gain(w_init_gain)
)
def forward(self, signal):
conv_signal = self.conv(signal)
return conv_signal
class TacotronSTFT(torch.nn.Module):
def __init__(
self,
filter_length=1024,
hop_length=256,
win_length=1024,
n_mel_channels=80,
sampling_rate=22050,
mel_fmin=0.0,
mel_fmax=8000.0,
):
super(TacotronSTFT, self).__init__()
self.n_mel_channels = n_mel_channels
self.sampling_rate = sampling_rate
self.stft_fn = STFT(filter_length, hop_length, win_length)
mel_basis = librosa_mel_fn(
sampling_rate, filter_length, n_mel_channels, mel_fmin, mel_fmax
)
mel_basis = torch.from_numpy(mel_basis).float()
self.register_buffer("mel_basis", mel_basis)
def spectral_normalize(self, magnitudes):
output = dynamic_range_compression(magnitudes)
return output
def spectral_de_normalize(self, magnitudes):
output = dynamic_range_decompression(magnitudes)
return output
def mel_spectrogram(self, y):
"""Computes mel-spectrograms from a batch of waves
PARAMS
------
y: Variable(torch.FloatTensor) with shape (B, T) in range [-1, 1]
RETURNS
-------
mel_output: torch.FloatTensor of shape (B, n_mel_channels, T)
"""
assert torch.min(y.data) >= -1
assert torch.max(y.data) <= 1
magnitudes, phases = self.stft_fn.transform(y)
magnitudes = magnitudes.data
mel_output = torch.matmul(self.mel_basis, magnitudes)
mel_output = self.spectral_normalize(mel_output)
return mel_output
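
A usage sketch for TacotronSTFT on a dummy waveform; the sampling rate matches hparams.py, and the input must already be scaled to [-1, 1]:

import torch

taco_stft = TacotronSTFT(sampling_rate=16000)
wave = torch.randn(1, 16000).clamp(-1, 1)  # (B, T), satisfies the range asserts
mel = taco_stft.mel_spectrogram(wave)      # (B, 80, T_frames), log-compressed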

loss_function.py (new file)

@@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
from torch import nn
class Tacotron2Loss(nn.Module):
def __init__(self):
super(Tacotron2Loss, self).__init__()
def forward(self, model_output, targets):
mel_target, gate_target = targets[0], targets[1]
mel_target.requires_grad = False
gate_target.requires_grad = False
gate_target = gate_target.view(-1, 1)
mel_out, mel_out_postnet, gate_out, _ = model_output
gate_out = gate_out.view(-1, 1)
mel_loss = nn.MSELoss()(mel_out, mel_target) + nn.MSELoss()(
mel_out_postnet, mel_target
)
gate_loss = nn.BCEWithLogitsLoss()(gate_out, gate_target)
return mel_loss + gate_loss
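
A self-contained sketch of the loss on dummy tensors; shapes are illustrative, and in real training the targets come from TextMelCollate and the outputs from Tacotron2:

import torch

criterion = Tacotron2Loss()
B, n_mels, T = 2, 80, 50
mel_out = torch.randn(B, n_mels, T, requires_grad=True)
mel_post = torch.randn(B, n_mels, T, requires_grad=True)
gate_out = torch.randn(B, T, requires_grad=True)
targets = (torch.randn(B, n_mels, T), torch.bernoulli(torch.rand(B, T)))
loss = criterion((mel_out, mel_post, gate_out, None), targets)
loss.backward()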

model.py (new file)

@@ -0,0 +1,644 @@
# -*- coding: utf-8 -*-
from math import sqrt
import torch
from torch.autograd import Variable
from torch import nn
from torch.nn import functional as F
from .layers import ConvNorm, LinearNorm
from .utils import to_gpu, get_mask_from_lengths
class LocationLayer(nn.Module):
def __init__(
self, attention_n_filters, attention_kernel_size, attention_dim
):
super(LocationLayer, self).__init__()
padding = int((attention_kernel_size - 1) / 2)
self.location_conv = ConvNorm(
2,
attention_n_filters,
kernel_size=attention_kernel_size,
padding=padding,
bias=False,
stride=1,
dilation=1,
)
self.location_dense = LinearNorm(
attention_n_filters, attention_dim, bias=False, w_init_gain="tanh"
)
def forward(self, attention_weights_cat):
processed_attention = self.location_conv(attention_weights_cat)
processed_attention = processed_attention.transpose(1, 2)
processed_attention = self.location_dense(processed_attention)
return processed_attention
class Attention(nn.Module):
def __init__(
self,
attention_rnn_dim,
embedding_dim,
attention_dim,
attention_location_n_filters,
attention_location_kernel_size,
):
super(Attention, self).__init__()
self.query_layer = LinearNorm(
attention_rnn_dim, attention_dim, bias=False, w_init_gain="tanh"
)
self.memory_layer = LinearNorm(
embedding_dim, attention_dim, bias=False, w_init_gain="tanh"
)
self.v = LinearNorm(attention_dim, 1, bias=False)
self.location_layer = LocationLayer(
attention_location_n_filters,
attention_location_kernel_size,
attention_dim,
)
self.score_mask_value = -float("inf")
def get_alignment_energies(
self, query, processed_memory, attention_weights_cat
):
"""
PARAMS
------
query: decoder output (batch, n_mel_channels * n_frames_per_step)
processed_memory: processed encoder outputs (B, T_in, attention_dim)
attention_weights_cat: cumul. and prev. att weights (B, 2, max_time)
RETURNS
-------
alignment (batch, max_time)
"""
processed_query = self.query_layer(query.unsqueeze(1))
processed_attention_weights = self.location_layer(
attention_weights_cat
)
energies = self.v(
torch.tanh(
processed_query
+ processed_attention_weights
+ processed_memory
)
)
energies = energies.squeeze(-1)
return energies
def forward(
self,
attention_hidden_state,
memory,
processed_memory,
attention_weights_cat,
mask,
):
"""
PARAMS
------
attention_hidden_state: attention rnn last output
memory: encoder outputs
processed_memory: processed encoder outputs
        attention_weights_cat: previous and cumulative attention weights
mask: binary mask for padded data
"""
alignment = self.get_alignment_energies(
attention_hidden_state, processed_memory, attention_weights_cat
)
if mask is not None:
alignment.data.masked_fill_(mask, self.score_mask_value)
attention_weights = F.softmax(alignment, dim=1)
attention_context = torch.bmm(attention_weights.unsqueeze(1), memory)
attention_context = attention_context.squeeze(1)
return attention_context, attention_weights
class Prenet(nn.Module):
def __init__(self, in_dim, sizes):
super(Prenet, self).__init__()
in_sizes = [in_dim] + sizes[:-1]
self.layers = nn.ModuleList(
[
LinearNorm(in_size, out_size, bias=False)
for (in_size, out_size) in zip(in_sizes, sizes)
]
)
def forward(self, x):
for linear in self.layers:
x = F.dropout(F.relu(linear(x)), p=0.5, training=True)
return x
class Postnet(nn.Module):
"""Postnet
    - Five 1-d convolutions with 512 channels and kernel size 5
"""
def __init__(self, hparams):
super(Postnet, self).__init__()
self.convolutions = nn.ModuleList()
self.convolutions.append(
nn.Sequential(
ConvNorm(
hparams.n_mel_channels,
hparams.postnet_embedding_dim,
kernel_size=hparams.postnet_kernel_size,
stride=1,
padding=int((hparams.postnet_kernel_size - 1) / 2),
dilation=1,
w_init_gain="tanh",
),
nn.BatchNorm1d(hparams.postnet_embedding_dim),
)
)
for i in range(1, hparams.postnet_n_convolutions - 1):
self.convolutions.append(
nn.Sequential(
ConvNorm(
hparams.postnet_embedding_dim,
hparams.postnet_embedding_dim,
kernel_size=hparams.postnet_kernel_size,
stride=1,
padding=int((hparams.postnet_kernel_size - 1) / 2),
dilation=1,
w_init_gain="tanh",
),
nn.BatchNorm1d(hparams.postnet_embedding_dim),
)
)
self.convolutions.append(
nn.Sequential(
ConvNorm(
hparams.postnet_embedding_dim,
hparams.n_mel_channels,
kernel_size=hparams.postnet_kernel_size,
stride=1,
padding=int((hparams.postnet_kernel_size - 1) / 2),
dilation=1,
w_init_gain="linear",
),
nn.BatchNorm1d(hparams.n_mel_channels),
)
)
def forward(self, x):
for i in range(len(self.convolutions) - 1):
x = F.dropout(
torch.tanh(self.convolutions[i](x)), 0.5, self.training
)
x = F.dropout(self.convolutions[-1](x), 0.5, self.training)
return x
class Encoder(nn.Module):
"""Encoder module:
- Three 1-d convolution banks
- Bidirectional LSTM
"""
def __init__(self, hparams):
super(Encoder, self).__init__()
convolutions = []
for _ in range(hparams.encoder_n_convolutions):
conv_layer = nn.Sequential(
ConvNorm(
hparams.encoder_embedding_dim,
hparams.encoder_embedding_dim,
kernel_size=hparams.encoder_kernel_size,
stride=1,
padding=int((hparams.encoder_kernel_size - 1) / 2),
dilation=1,
w_init_gain="relu",
),
nn.BatchNorm1d(hparams.encoder_embedding_dim),
)
convolutions.append(conv_layer)
self.convolutions = nn.ModuleList(convolutions)
self.lstm = nn.LSTM(
hparams.encoder_embedding_dim,
int(hparams.encoder_embedding_dim / 2),
1,
batch_first=True,
bidirectional=True,
)
def forward(self, x, input_lengths):
for conv in self.convolutions:
x = F.dropout(F.relu(conv(x)), 0.5, self.training)
x = x.transpose(1, 2)
        # PyTorch tensors are not reversible, hence the conversion to numpy
input_lengths = input_lengths.cpu().numpy()
x = nn.utils.rnn.pack_padded_sequence(
x, input_lengths, batch_first=True
)
self.lstm.flatten_parameters()
outputs, _ = self.lstm(x)
outputs, _ = nn.utils.rnn.pad_packed_sequence(
outputs, batch_first=True
)
return outputs
def inference(self, x):
for conv in self.convolutions:
x = F.dropout(F.relu(conv(x)), 0.5, self.training)
x = x.transpose(1, 2)
self.lstm.flatten_parameters()
outputs, _ = self.lstm(x)
return outputs
class Decoder(nn.Module):
def __init__(self, hparams):
super(Decoder, self).__init__()
self.n_mel_channels = hparams.n_mel_channels
self.n_frames_per_step = hparams.n_frames_per_step
self.encoder_embedding_dim = hparams.encoder_embedding_dim
self.attention_rnn_dim = hparams.attention_rnn_dim
self.decoder_rnn_dim = hparams.decoder_rnn_dim
self.prenet_dim = hparams.prenet_dim
self.max_decoder_steps = hparams.max_decoder_steps
self.gate_threshold = hparams.gate_threshold
self.p_attention_dropout = hparams.p_attention_dropout
self.p_decoder_dropout = hparams.p_decoder_dropout
self.prenet = Prenet(
hparams.n_mel_channels * hparams.n_frames_per_step,
[hparams.prenet_dim, hparams.prenet_dim],
)
self.attention_rnn = nn.LSTMCell(
hparams.prenet_dim + hparams.encoder_embedding_dim,
hparams.attention_rnn_dim,
)
self.attention_layer = Attention(
hparams.attention_rnn_dim,
hparams.encoder_embedding_dim,
hparams.attention_dim,
hparams.attention_location_n_filters,
hparams.attention_location_kernel_size,
)
        self.decoder_rnn = nn.LSTMCell(
            hparams.attention_rnn_dim + hparams.encoder_embedding_dim,
            hparams.decoder_rnn_dim,
            1,  # third positional argument is `bias`; truthy, so bias is enabled
        )
self.linear_projection = LinearNorm(
hparams.decoder_rnn_dim + hparams.encoder_embedding_dim,
hparams.n_mel_channels * hparams.n_frames_per_step,
)
self.gate_layer = LinearNorm(
hparams.decoder_rnn_dim + hparams.encoder_embedding_dim,
1,
bias=True,
w_init_gain="sigmoid",
)
def get_go_frame(self, memory):
""" Gets all zeros frames to use as first decoder input
PARAMS
------
        memory: encoder outputs
RETURNS
-------
decoder_input: all zeros frames
"""
B = memory.size(0)
decoder_input = Variable(
memory.data.new(
B, self.n_mel_channels * self.n_frames_per_step
).zero_()
)
return decoder_input
def initialize_decoder_states(self, memory, mask):
""" Initializes attention rnn states, decoder rnn states, attention
weights, attention cumulative weights, attention context, stores memory
and stores processed memory
PARAMS
------
memory: Encoder outputs
mask: Mask for padded data if training, expects None for inference
"""
B = memory.size(0)
MAX_TIME = memory.size(1)
self.attention_hidden = Variable(
memory.data.new(B, self.attention_rnn_dim).zero_()
)
self.attention_cell = Variable(
memory.data.new(B, self.attention_rnn_dim).zero_()
)
self.decoder_hidden = Variable(
memory.data.new(B, self.decoder_rnn_dim).zero_()
)
self.decoder_cell = Variable(
memory.data.new(B, self.decoder_rnn_dim).zero_()
)
self.attention_weights = Variable(memory.data.new(B, MAX_TIME).zero_())
self.attention_weights_cum = Variable(
memory.data.new(B, MAX_TIME).zero_()
)
self.attention_context = Variable(
memory.data.new(B, self.encoder_embedding_dim).zero_()
)
self.memory = memory
self.processed_memory = self.attention_layer.memory_layer(memory)
self.mask = mask
def parse_decoder_inputs(self, decoder_inputs):
""" Prepares decoder inputs, i.e. mel outputs
PARAMS
------
decoder_inputs: inputs used for teacher-forced training, i.e. mel-specs
RETURNS
-------
inputs: processed decoder inputs
"""
# (B, n_mel_channels, T_out) -> (B, T_out, n_mel_channels)
decoder_inputs = decoder_inputs.transpose(1, 2)
decoder_inputs = decoder_inputs.view(
decoder_inputs.size(0),
int(decoder_inputs.size(1) / self.n_frames_per_step),
-1,
)
# (B, T_out, n_mel_channels) -> (T_out, B, n_mel_channels)
decoder_inputs = decoder_inputs.transpose(0, 1)
return decoder_inputs
def parse_decoder_outputs(self, mel_outputs, gate_outputs, alignments):
""" Prepares decoder outputs for output
PARAMS
------
mel_outputs:
gate_outputs: gate output energies
alignments:
RETURNS
-------
mel_outputs:
        gate_outputs: gate output energies
alignments:
"""
# (T_out, B) -> (B, T_out)
alignments = torch.stack(alignments).transpose(0, 1)
# (T_out, B) -> (B, T_out)
gate_outputs = torch.stack(gate_outputs).transpose(0, 1)
gate_outputs = gate_outputs.contiguous()
# (T_out, B, n_mel_channels) -> (B, T_out, n_mel_channels)
mel_outputs = torch.stack(mel_outputs).transpose(0, 1).contiguous()
# decouple frames per step
mel_outputs = mel_outputs.view(
mel_outputs.size(0), -1, self.n_mel_channels
)
# (B, T_out, n_mel_channels) -> (B, n_mel_channels, T_out)
mel_outputs = mel_outputs.transpose(1, 2)
return mel_outputs, gate_outputs, alignments
def decode(self, decoder_input):
""" Decoder step using stored states, attention and memory
PARAMS
------
decoder_input: previous mel output
RETURNS
-------
mel_output:
gate_output: gate output energies
attention_weights:
"""
cell_input = torch.cat((decoder_input, self.attention_context), -1)
self.attention_hidden, self.attention_cell = self.attention_rnn(
cell_input, (self.attention_hidden, self.attention_cell)
)
self.attention_hidden = F.dropout(
self.attention_hidden, self.p_attention_dropout, self.training
)
attention_weights_cat = torch.cat(
(
self.attention_weights.unsqueeze(1),
self.attention_weights_cum.unsqueeze(1),
),
dim=1,
)
self.attention_context, self.attention_weights = self.attention_layer(
self.attention_hidden,
self.memory,
self.processed_memory,
attention_weights_cat,
self.mask,
)
self.attention_weights_cum += self.attention_weights
decoder_input = torch.cat(
(self.attention_hidden, self.attention_context), -1
)
self.decoder_hidden, self.decoder_cell = self.decoder_rnn(
decoder_input, (self.decoder_hidden, self.decoder_cell)
)
self.decoder_hidden = F.dropout(
self.decoder_hidden, self.p_decoder_dropout, self.training
)
decoder_hidden_attention_context = torch.cat(
(self.decoder_hidden, self.attention_context), dim=1
)
decoder_output = self.linear_projection(
decoder_hidden_attention_context
)
gate_prediction = self.gate_layer(decoder_hidden_attention_context)
return decoder_output, gate_prediction, self.attention_weights
def forward(self, memory, decoder_inputs, memory_lengths):
""" Decoder forward pass for training
PARAMS
------
memory: Encoder outputs
decoder_inputs: Decoder inputs for teacher forcing. i.e. mel-specs
memory_lengths: Encoder output lengths for attention masking.
RETURNS
-------
mel_outputs: mel outputs from the decoder
gate_outputs: gate outputs from the decoder
alignments: sequence of attention weights from the decoder
"""
decoder_input = self.get_go_frame(memory).unsqueeze(0)
decoder_inputs = self.parse_decoder_inputs(decoder_inputs)
decoder_inputs = torch.cat((decoder_input, decoder_inputs), dim=0)
decoder_inputs = self.prenet(decoder_inputs)
self.initialize_decoder_states(
memory, mask=~get_mask_from_lengths(memory_lengths)
)
mel_outputs, gate_outputs, alignments = [], [], []
while len(mel_outputs) < decoder_inputs.size(0) - 1:
decoder_input = decoder_inputs[len(mel_outputs)]
mel_output, gate_output, attention_weights = self.decode(
decoder_input
)
mel_outputs += [mel_output.squeeze(1)]
gate_outputs += [gate_output.squeeze()]
alignments += [attention_weights]
mel_outputs, gate_outputs, alignments = self.parse_decoder_outputs(
mel_outputs, gate_outputs, alignments
)
return mel_outputs, gate_outputs, alignments
def inference(self, memory):
""" Decoder inference
PARAMS
------
memory: Encoder outputs
RETURNS
-------
mel_outputs: mel outputs from the decoder
gate_outputs: gate outputs from the decoder
alignments: sequence of attention weights from the decoder
"""
decoder_input = self.get_go_frame(memory)
self.initialize_decoder_states(memory, mask=None)
mel_outputs, gate_outputs, alignments = [], [], []
while True:
decoder_input = self.prenet(decoder_input)
mel_output, gate_output, alignment = self.decode(decoder_input)
mel_outputs += [mel_output.squeeze(1)]
gate_outputs += [gate_output]
alignments += [alignment]
if torch.sigmoid(gate_output.data) > self.gate_threshold:
break
elif len(mel_outputs) == self.max_decoder_steps:
print("Warning! Reached max decoder steps")
break
decoder_input = mel_output
mel_outputs, gate_outputs, alignments = self.parse_decoder_outputs(
mel_outputs, gate_outputs, alignments
)
return mel_outputs, gate_outputs, alignments
class Tacotron2(nn.Module):
def __init__(self, hparams):
super(Tacotron2, self).__init__()
self.mask_padding = hparams.mask_padding
self.fp16_run = hparams.fp16_run
self.n_mel_channels = hparams.n_mel_channels
self.n_frames_per_step = hparams.n_frames_per_step
self.embedding = nn.Embedding(
hparams.n_symbols, hparams.symbols_embedding_dim
)
std = sqrt(2.0 / (hparams.n_symbols + hparams.symbols_embedding_dim))
val = sqrt(3.0) * std # uniform bounds for std
self.embedding.weight.data.uniform_(-val, val)
self.encoder = Encoder(hparams)
self.decoder = Decoder(hparams)
self.postnet = Postnet(hparams)
def parse_batch(self, batch):
text_padded, input_lengths, mel_padded, gate_padded, output_lengths = (
batch
)
text_padded = to_gpu(text_padded).long()
input_lengths = to_gpu(input_lengths).long()
max_len = torch.max(input_lengths.data).item()
mel_padded = to_gpu(mel_padded).float()
gate_padded = to_gpu(gate_padded).float()
output_lengths = to_gpu(output_lengths).long()
return (
(text_padded, input_lengths, mel_padded, max_len, output_lengths),
(mel_padded, gate_padded),
)
def parse_output(self, outputs, output_lengths=None):
if self.mask_padding and output_lengths is not None:
mask = ~get_mask_from_lengths(output_lengths)
mask = mask.expand(self.n_mel_channels, mask.size(0), mask.size(1))
mask = mask.permute(1, 0, 2)
outputs[0].data.masked_fill_(mask, 0.0)
outputs[1].data.masked_fill_(mask, 0.0)
outputs[2].data.masked_fill_(mask[:, 0, :], 1e3) # gate energies
return outputs
def forward(self, inputs):
text_inputs, text_lengths, mels, max_len, output_lengths = inputs
text_lengths, output_lengths = text_lengths.data, output_lengths.data
embedded_inputs = self.embedding(text_inputs).transpose(1, 2)
encoder_outputs = self.encoder(embedded_inputs, text_lengths)
mel_outputs, gate_outputs, alignments = self.decoder(
encoder_outputs, mels, memory_lengths=text_lengths
)
mel_outputs_postnet = self.postnet(mel_outputs)
mel_outputs_postnet = mel_outputs + mel_outputs_postnet
return self.parse_output(
[mel_outputs, mel_outputs_postnet, gate_outputs, alignments],
output_lengths,
)
def inference(self, inputs):
embedded_inputs = self.embedding(inputs).transpose(1, 2)
encoder_outputs = self.encoder.inference(embedded_inputs)
mel_outputs, gate_outputs, alignments = self.decoder.inference(
encoder_outputs
)
mel_outputs_postnet = self.postnet(mel_outputs)
mel_outputs_postnet = mel_outputs + mel_outputs_postnet
outputs = self.parse_output(
[mel_outputs, mel_outputs_postnet, gate_outputs, alignments]
)
return outputs
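
A minimal training-step sketch tying this together with the earlier pieces; hparams and train_loader are assumed from the create_hparams and DataLoader sketches above, and everything stays on CPU since to_gpu's CUDA move is commented out in utils.py:

import torch

model = Tacotron2(hparams)
criterion = Tacotron2Loss()  # from loss_function.py
optimizer = torch.optim.Adam(model.parameters(), lr=hparams.learning_rate)
for batch in train_loader:
    x, y = model.parse_batch(batch)
    y_pred = model(x)
    loss = criterion(y_pred, y)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()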

server.py (new file)

@@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
import grpc
import time
from sia.proto import tts_pb2
from sia.proto import tts_pb2_grpc
from concurrent import futures
from .tts import TTSModel
class TTSServer:
def __init__(self):
self.tts_model = TTSModel()
    def TextToSpeechAPI(self, request, context):
        input_text = request.text
        speech_response = self.tts_model.synth_speech(input_text)
        return tts_pb2.SpeechResponse(response=speech_response)
def main():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
tts_server = TTSServer()
tts_pb2_grpc.add_ServerServicer_to_server(tts_server, server)
server.add_insecure_port("localhost:50060")
server.start()
print("TTSServer started!")
try:
while True:
time.sleep(10000)
    except KeyboardInterrupt:
        server.stop(0)
if __name__ == "__main__":
main()
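
A matching client call would look roughly like the sketch below; note that the request message name (SpeechRequest) is a guess, since only SpeechResponse and the text field appear in this diff:

import grpc
from sia.proto import tts_pb2, tts_pb2_grpc

channel = grpc.insecure_channel("localhost:50060")
stub = tts_pb2_grpc.ServerStub(channel)
# SpeechRequest is hypothetical; check the actual .proto for the request type
reply = stub.TextToSpeechAPI(tts_pb2.SpeechRequest(text="How may I help you today?"))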

stft.py (new file)

@@ -0,0 +1,172 @@
# -*- coding: utf-8 -*-
"""
BSD 3-Clause License
Copyright (c) 2017, Prem Seetharaman
All rights reserved.
* Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import torch
import numpy as np
import torch.nn.functional as F
from torch.autograd import Variable
from scipy.signal import get_window
from librosa.util import pad_center, tiny
from .audio_processing import window_sumsquare
class STFT(torch.nn.Module):
"""
adapted from Prem Seetharaman's
https://github.com/pseeth/pytorch-stft
"""
def __init__(
self, filter_length=800, hop_length=200, win_length=800, window="hann"
):
super(STFT, self).__init__()
self.filter_length = filter_length
self.hop_length = hop_length
self.win_length = win_length
self.window = window
self.forward_transform = None
scale = self.filter_length / self.hop_length
fourier_basis = np.fft.fft(np.eye(self.filter_length))
cutoff = int((self.filter_length / 2 + 1))
fourier_basis = np.vstack(
[
np.real(fourier_basis[:cutoff, :]),
np.imag(fourier_basis[:cutoff, :]),
]
)
forward_basis = torch.FloatTensor(fourier_basis[:, None, :])
inverse_basis = torch.FloatTensor(
np.linalg.pinv(scale * fourier_basis).T[:, None, :]
)
if window is not None:
assert filter_length >= win_length
# get window and zero center pad it to filter_length
fft_window = get_window(window, win_length, fftbins=True)
fft_window = pad_center(fft_window, filter_length)
fft_window = torch.from_numpy(fft_window).float()
# window the bases
forward_basis *= fft_window
inverse_basis *= fft_window
self.register_buffer("forward_basis", forward_basis.float())
self.register_buffer("inverse_basis", inverse_basis.float())
def transform(self, input_data):
num_batches = input_data.size(0)
num_samples = input_data.size(1)
self.num_samples = num_samples
# similar to librosa, reflect-pad the input
input_data = input_data.view(num_batches, 1, num_samples)
input_data = F.pad(
input_data.unsqueeze(1),
(int(self.filter_length / 2), int(self.filter_length / 2), 0, 0),
mode="reflect",
)
input_data = input_data.squeeze(1)
forward_transform = F.conv1d(
input_data,
Variable(self.forward_basis, requires_grad=False),
stride=self.hop_length,
padding=0,
)
cutoff = int((self.filter_length / 2) + 1)
real_part = forward_transform[:, :cutoff, :]
imag_part = forward_transform[:, cutoff:, :]
magnitude = torch.sqrt(real_part ** 2 + imag_part ** 2)
phase = torch.autograd.Variable(
torch.atan2(imag_part.data, real_part.data)
)
return magnitude, phase
def inverse(self, magnitude, phase):
recombine_magnitude_phase = torch.cat(
[magnitude * torch.cos(phase), magnitude * torch.sin(phase)], dim=1
)
inverse_transform = F.conv_transpose1d(
recombine_magnitude_phase,
Variable(self.inverse_basis, requires_grad=False),
stride=self.hop_length,
padding=0,
)
if self.window is not None:
window_sum = window_sumsquare(
self.window,
magnitude.size(-1),
hop_length=self.hop_length,
win_length=self.win_length,
n_fft=self.filter_length,
dtype=np.float32,
)
# remove modulation effects
approx_nonzero_indices = torch.from_numpy(
np.where(window_sum > tiny(window_sum))[0]
)
window_sum = torch.autograd.Variable(
torch.from_numpy(window_sum), requires_grad=False
)
            # upstream moves window_sum to the GPU here
            # (window_sum = window_sum.cuda() if magnitude.is_cuda else window_sum);
            # left disabled in this CPU-only port
inverse_transform[:, :, approx_nonzero_indices] /= window_sum[
approx_nonzero_indices
]
# scale by hop ratio
inverse_transform *= float(self.filter_length) / self.hop_length
inverse_transform = inverse_transform[
:, :, int(self.filter_length / 2) :
]
        inverse_transform = inverse_transform[
            :, :, : -int(self.filter_length / 2)
        ]
return inverse_transform
def forward(self, input_data):
self.magnitude, self.phase = self.transform(input_data)
reconstruction = self.inverse(self.magnitude, self.phase)
return reconstruction
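
A round-trip sanity check for the layer; sizes are illustrative, and the reconstruction matches the input length but carries small error near the padded edges:

import torch

stft = STFT(filter_length=1024, hop_length=256, win_length=1024)
wave = torch.randn(1, 4096)
recon = stft(wave)  # transform + inverse; shape (1, 1, 4096)
err = (recon.squeeze(1) - wave).abs().mean()  # small, dominated by edge frames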

tts.py (modified)

@@ -6,6 +6,7 @@ import torch
 from .hparams import create_hparams
 from .text import text_to_sequence
 from .glow import WaveGlow
 # import os
 # import soundfile as sf
 import pyaudio
@@ -26,11 +27,7 @@ WAVEGLOW_CONFIG = {
     "n_group": 8,
     "n_early_every": 4,
     "n_early_size": 2,
-    "WN_config": {
-        "n_layers": 8,
-        "n_channels": 256,
-        "kernel_size": 3
-    }
+    "WN_config": {"n_layers": 8, "n_channels": 256, "kernel_size": 3},
 }
@@ -44,32 +41,36 @@ class TTSModel(object):
         self.model = Tacotron2(hparams)
         tacotron2_path = cached_model_path("tacotron2_model")
         self.model.load_state_dict(
-            torch.load(tacotron2_path, map_location='cpu')['state_dict'])
+            torch.load(tacotron2_path, map_location="cpu")["state_dict"]
+        )
         self.model.eval()
-        waveglow_path = cached_model_path('waveglow_model')
+        waveglow_path = cached_model_path("waveglow_model")
         self.waveglow = WaveGlow(**WAVEGLOW_CONFIG)
-        wave_params = torch.load(waveglow_path, map_location='cpu')
+        wave_params = torch.load(waveglow_path, map_location="cpu")
         self.waveglow.load_state_dict(wave_params)
         self.waveglow.eval()
         for k in self.waveglow.convinv:
             k.float()
         self.k_cache = klepto.archives.file_archive(cached=False)
         self.synth_speech = klepto.safe.inf_cache(cache=self.k_cache)(
-            self.synth_speech)
+            self.synth_speech
+        )
         # https://github.com/NVIDIA/waveglow/issues/127
         for m in self.waveglow.modules():
-            if 'Conv' in str(type(m)):
-                setattr(m, 'padding_mode', 'zeros')
+            if "Conv" in str(type(m)):
+                setattr(m, "padding_mode", "zeros")
     @do_time
     def synth_speech(self, t):
         text = t
-        sequence = np.array(text_to_sequence(text,
-                            ['english_cleaners']))[None, :]
+        sequence = np.array(text_to_sequence(text, ["english_cleaners"]))[
+            None, :
+        ]
         sequence = torch.autograd.Variable(torch.from_numpy(sequence)).long()
         mel_outputs, mel_outputs_postnet, _, alignments = self.model.inference(
-            sequence)
+            sequence
+        )
         with torch.no_grad():
             audio_t = self.waveglow.infer(mel_outputs_postnet, sigma=0.666)
             audio = audio_t[0].data.cpu().numpy()
@@ -92,7 +93,7 @@ class TTSModel(object):
 # https://github.com/mgeier/python-audio/blob/master/audio-files/utility.py
-def float2pcm(sig, dtype='int16'):
+def float2pcm(sig, dtype="int16"):
     """Convert floating point signal with a range from -1 to 1 to PCM.
     Any signal values outside the interval [-1.0, 1.0) are clipped.
     No dithering is used.
@@ -116,30 +117,33 @@ def float2pcm(sig, dtype='int16'):
     pcm2float, dtype
     """
     sig = np.asarray(sig)
-    if sig.dtype.kind != 'f':
+    if sig.dtype.kind != "f":
         raise TypeError("'sig' must be a float array")
     dtype = np.dtype(dtype)
-    if dtype.kind not in 'iu':
+    if dtype.kind not in "iu":
         raise TypeError("'dtype' must be an integer type")
     i = np.iinfo(dtype)
-    abs_max = 2**(i.bits - 1)
+    abs_max = 2 ** (i.bits - 1)
     offset = i.min + abs_max
     return (sig * abs_max + offset).clip(i.min, i.max).astype(dtype)
 def display(data):
     import IPython.display as ipd
     aud = ipd.Audio(data, rate=16000)
     return aud
 def player_gen():
     audio_interface = pyaudio.PyAudio()
-    _audio_stream = audio_interface.open(format=pyaudio.paInt16,
-                                         channels=1,
-                                         rate=OUTPUT_SAMPLE_RATE,
-                                         output=True)
+    _audio_stream = audio_interface.open(
+        format=pyaudio.paInt16,
+        channels=1,
+        rate=OUTPUT_SAMPLE_RATE,
+        output=True,
+    )
     def play_device(data):
         _audio_stream.write(data)
@@ -151,7 +155,7 @@ def player_gen():
 def synthesize_corpus():
     tts_model = TTSModel()
     all_data = []
-    for (i, line) in enumerate(open('corpus.txt').readlines()):
+    for (i, line) in enumerate(open("corpus.txt").readlines()):
         print('synthesizing... "{}"'.format(line.strip()))
         data = tts_model.synth_speech(line.strip())
         all_data.append(data)
@@ -168,8 +172,9 @@ def main():
     corpus_synth_data = synthesize_corpus()
     play_corpus(corpus_synth_data)
     import ipdb
     ipdb.set_trace()
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()
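
For reference, a worked example of float2pcm (defined in tts.py above); the exact values follow from abs_max = 2**15 = 32768 for int16:

import numpy as np

sig = np.array([-1.0, 0.0, 0.5, 1.0], dtype=np.float32)
float2pcm(sig)  # array([-32768, 0, 16384, 32767], dtype=int16); 1.0 clips to 32767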

utils.py (new file)

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
import numpy as np
from scipy.io.wavfile import read
import torch
def get_mask_from_lengths(lengths):
max_len = torch.max(lengths).item()
    ids = torch.arange(
        0, max_len, out=torch.LongTensor(max_len)
    )  # upstream uses torch.cuda.LongTensor(max_len) here
mask = (ids < lengths.unsqueeze(1)).byte()
return mask
def load_wav_to_torch(full_path):
sampling_rate, data = read(full_path)
return torch.FloatTensor(data.astype(np.float32)), sampling_rate
def load_filepaths_and_text(filename, split="|"):
with open(filename, encoding="utf-8") as f:
filepaths_and_text = [line.strip().split(split) for line in f]
return filepaths_and_text
def to_gpu(x):
x = x.contiguous()
# if torch.cuda.is_available(): #initially not commented out
# x = x.cuda(non_blocking=True) # initially not commented out
return torch.autograd.Variable(x)
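
A worked example for get_mask_from_lengths (uint8 mask, one row per sequence):

import torch

mask = get_mask_from_lengths(torch.LongTensor([3, 5]))
# tensor([[1, 1, 1, 0, 0],
#         [1, 1, 1, 1, 1]], dtype=torch.uint8)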