Compare commits


No commits in common. "f1e82a2539f7b8b2d692208379533cb0a246a375" and "e6f0c8b21b276962677679c7848ada02ff317c04" have entirely different histories.

9 changed files with 323 additions and 354 deletions

arpabet-to-apple.py Normal file

@@ -0,0 +1,55 @@
#!/usr/bin/env python3
"""
Convert ARPABET <http://www.speech.cs.cmu.edu/cgi-bin/cmudict>
to Apple's codes <https://developer.apple.com/library/content/documentation/UserExperience/Conceptual/SpeechSynthesisProgrammingGuide/Phonemes/Phonemes.html>
"""
import sys
mapping = {s.split()[0]: s.split()[1] for s in """
AA AA
AE AE
AH UX
AO AO
AW AW
AY AY
B b
CH C
D d
DH D
EH EH
ER UXr
EY EY
F f
G g
HH h
IH IH
IY IY
JH J
K k
L l
M m
N n
NG N
OW OW
OY OY
P p
R r
S s
SH S
T t
TH T
UH UH
UW UW
V v
W w
Y y
Z z
ZH Z
""".strip().split('\n')}
arpabet_phonemes = sys.stdin.read().split()
# strip CMUdict stress digits (e.g. AA1 -> AA) so the lookup key matches the table above
apple_phonemes = [mapping[p.upper().rstrip('012')] for p in arpabet_phonemes]
print('[[inpt PHON]] ' + ''.join(apple_phonemes))
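A minimal usage sketch (not part of the diff), assuming the script above is saved as arpabet-to-apple.py and that macOS's say command is available; the ARPABET string for "sunflowers" is only illustrative:
import subprocess
arpabet = 'S AH N F L AW ER Z'  # hypothetical CMUdict-style pronunciation
phon = subprocess.run(['python3', 'arpabet-to-apple.py'],
                      input=arpabet, capture_output=True, text=True).stdout.strip()
subprocess.run(['say', phon])  # `say` interprets the leading [[inpt PHON]] command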

create_triplets.py Normal file

@@ -0,0 +1,10 @@
import pandas as pd
audio_file = pd.read_csv('./outputs/audio.csv',names=['word','voice','rate','type','filename'])
word_groups = audio_file.groupby('word')
# audio
lst = [1, 2, 3, 1, 2, 3]
s = pd.Series([1, 2, 3, 10, 20, 30], lst)
df3 = pd.DataFrame({'X' : ['A', 'B', 'A', 'B'], 'Y' : [1, 4, 3, 2]})
s.groupby(level=0).sum()
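The groupby lines above stop short of producing any triplets. A possible continuation (an assumption, not in the diff): here a triplet is taken to mean (anchor, positive, negative) filenames where the first two share a word and the third does not; make_triplets and n_neg are hypothetical names.
import itertools
import random

def make_triplets(df, n_neg=1):
    by_word = {w: g['filename'].tolist() for w, g in df.groupby('word')}
    triplets = []
    for word, files in by_word.items():
        negatives = [f for other, fs in by_word.items() if other != word for f in fs]
        for anchor, positive in itertools.combinations(files, 2):
            for negative in random.sample(negatives, min(n_neg, len(negatives))):
                triplets.append((anchor, positive, negative))
    return triplets

# triplets = make_triplets(audio_file)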

View File

@@ -1,42 +1,22 @@
import pyaudio
import numpy as np
# from matplotlib import pyplot as plt
from spectro_gen import plot_stft, generate_spectrogram
from matplotlib import pyplot as plt
CHUNKSIZE = 1024 # fixed chunk size
def record_spectrogram(n_sec, plot=False, playback=False):
SAMPLE_RATE = 22050
N_CHANNELS = 2
N_SEC = n_sec
CHUNKSIZE = int(SAMPLE_RATE * N_SEC / N_CHANNELS) # fixed chunk size
# show_record_prompt()
input('Press [Enter] to start recording sample... ')
p_inp = pyaudio.PyAudio()
stream = p_inp.open(
format=pyaudio.paFloat32,
channels=N_CHANNELS,
rate=SAMPLE_RATE,
input=True,
frames_per_buffer=CHUNKSIZE)
data = stream.read(CHUNKSIZE)
numpydata = np.frombuffer(data, dtype=np.float32)
multi_channel = np.abs(np.reshape(numpydata, (-1, 2))).mean(axis=1)
one_channel = np.asarray([multi_channel, -1 * multi_channel]).T.reshape(-1)
mean_channel_data = one_channel.tobytes()
stream.stop_stream()
stream.close()
p_inp.terminate()
if plot:
plot_stft(one_channel, SAMPLE_RATE)
if playback:
p_oup = pyaudio.PyAudio()
stream = p_oup.open(
format=pyaudio.paFloat32,
channels=2,
rate=SAMPLE_RATE,
output=True)
stream.write(mean_channel_data)
stream.close()
p_oup.terminate()
ims, _ = generate_spectrogram(one_channel, SAMPLE_RATE)
return ims
# initialize portaudio
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16, channels=1, rate=44100, input=True, frames_per_buffer=CHUNKSIZE)
# do this as long as you want fresh samples
data = stream.read(CHUNKSIZE)
numpydata = np.fromstring(data, dtype=np.int16)
# plot data
plt.plot(numpydata)
plt.show()
# close stream
stream.stop_stream()
stream.close()
p.terminate()
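A small usage sketch for the new record_spectrogram (an assumption, not part of either commit): grab two short recordings and confirm their shapes before handing them to a model.
def _demo_record():
    first = record_spectrogram(n_sec=1.2)
    second = record_spectrogram(n_sec=1.2)
    print('spectrogram shapes:', first.shape, second.shape)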

snippets.py Normal file

@@ -0,0 +1,12 @@
# import scipy.signal as sg
# import pysndfile.sndio as snd
#
# snd_data,samples,_ = snd.read('./outputs/sunflowers-Alex-150-normal-589.aiff')
# samples_per_seg = 3*int(samples*150/(3*1000))
# # samples/(len(snd_data)*1000.0)
# len(snd_data)
# samples_per_seg/2
#
# len(sg.spectrogram(snd_data,nperseg=samples_per_seg,noverlap=samples_per_seg/3)[2])
#
# from spectro_gen import generate_aiff_spectrogram

View File

@@ -1,19 +1,16 @@
#!/usr/bin/env python
""" This work is licensed under a Creative Commons Attribution 3.0 Unported
License.
#coding: utf-8
""" This work is licensed under a Creative Commons Attribution 3.0 Unported License.
Frank Zalkow, 2012-2013
http://www.frank-zalkow.de/en/code-snippets/create-audio-spectrograms-with-python.html?i=1
"""
# %matplotlib inline
import numpy as np
import pyaudio
from matplotlib import pyplot as plt
from pysndfile import sndio as snd
from numpy.lib import stride_tricks
""" short time fourier transform of audio signal """
def stft(sig, frameSize, overlapFac=0.5, window=np.hanning):
win = window(frameSize)
hopSize = int(frameSize - np.floor(overlapFac * frameSize))
@@ -21,128 +18,94 @@ def stft(sig, frameSize, overlapFac=0.5, window=np.hanning):
# zeros at beginning (thus center of 1st window should be for sample nr. 0)
# sig = (sig*255).astype(np.uint8)
# import pdb;pdb.set_trace()
count = int(np.floor(frameSize / 2.0))
count = int(np.floor(frameSize/2.0))
# import pdb;pdb.set_trace()
samples = np.append(np.zeros(count), sig)
# cols for windowing
cols = int(np.ceil((len(samples) - frameSize) / float(hopSize)) + 1)
cols = int(np.ceil( (len(samples) - frameSize) / float(hopSize)) + 1)
# zeros at end (thus samples can be fully covered by frames)
samples = np.append(samples, np.zeros(frameSize))
frames = stride_tricks.as_strided(
samples,
shape=(cols, frameSize),
strides=(samples.strides[0] * hopSize, samples.strides[0])).copy()
frames = stride_tricks.as_strided(samples, shape=(cols, frameSize), strides=(samples.strides[0]*hopSize, samples.strides[0])).copy()
frames *= win
return np.fft.rfft(frames)
""" scale frequency axis logarithmically """
def logscale_spec(spec, sr=44100, factor=20.):
timebins, freqbins = np.shape(spec)
scale = np.linspace(0, 1, freqbins)**factor
scale *= (freqbins - 1) / max(scale)
scale = np.linspace(0, 1, freqbins) ** factor
scale *= (freqbins-1)/max(scale)
scale = np.unique(np.round(scale)).astype(np.uint32)
# import pdb;pdb.set_trace()
# create spectrogram with new freq bins
newspec = np.complex128(np.zeros([timebins, len(scale)]))
for i in range(0, len(scale)):
if i == len(scale) - 1:
newspec[:, i] = np.sum(spec[:, scale[i]:], axis=1)
if i == len(scale)-1:
newspec[:,i] = np.sum(spec[:,scale[i]:], axis=1)
else:
newspec[:, i] = np.sum(spec[:, scale[i]:scale[i + 1]], axis=1)
newspec[:,i] = np.sum(spec[:,scale[i]:scale[i+1]], axis=1)
# list center freq of bins
allfreqs = np.abs(np.fft.fftfreq(freqbins * 2, 1. / sr)[:freqbins + 1])
allfreqs = np.abs(np.fft.fftfreq(freqbins*2, 1./sr)[:freqbins+1])
freqs = []
for i in range(0, len(scale)):
if i == len(scale) - 1:
if i == len(scale)-1:
freqs += [np.mean(allfreqs[scale[i]:])]
else:
freqs += [np.mean(allfreqs[scale[i]:scale[i + 1]])]
freqs += [np.mean(allfreqs[scale[i]:scale[i+1]])]
return newspec, freqs
""" generate spectrogram for aiff audio with 150ms windows and 50ms overlap"""
def generate_spectrogram(samples, samplerate):
def generate_aiff_spectrogram(audiopath):
samples,samplerate,_ = snd.read(audiopath)
# samplerate, samples = wav.read(audiopath)
# s = stft(samples, binsize)
s = stft(samples, samplerate * 150 // 1000, 1.0 / 3)
s = stft(samples, samplerate*150//1000,1.0/3)
sshow, freq = logscale_spec(s, factor=1.0, sr=samplerate)
ims = 20. * np.log10(np.abs(sshow) / 10e-6)
return ims, freq
def generate_aiff_spectrogram(audiopath):
samples, samplerate, _ = snd.read(audiopath)
ims, _ = generate_spectrogram(samples, samplerate)
ims = 20.*np.log10(np.abs(sshow)/10e-6)
return ims
""" plot spectrogram"""
def plotstft(audiopath, binsize=2**10, plotpath=None, colormap="jet"):
samples,samplerate,_ = snd.read(audiopath)
# samplerate, samples = wav.read(audiopath)
# s = stft(samples, binsize)
# print(samplerate*150//1000)
s = stft(samples, samplerate*150//1000,1.0/3)
sshow, freq = logscale_spec(s, factor=1.0, sr=samplerate)
ims = 20.*np.log10(np.abs(sshow)/10e-6) # amplitude to decibel
def plot_stft(samples, samplerate, binsize=2**10, plotpath=None, colormap="jet"):
(ims, freq) = generate_spectrogram(samples, samplerate)
timebins, freqbins = np.shape(ims)
# import pdb;pdb.set_trace()
plt.figure(figsize=(15, 7.5))
plt.imshow(
np.transpose(ims),
origin="lower",
aspect="auto",
cmap=colormap,
interpolation="none")
plt.imshow(np.transpose(ims), origin="lower", aspect="auto", cmap=colormap, interpolation="none")
plt.colorbar()
plt.xlabel("time (s)")
plt.ylabel("frequency (hz)")
plt.xlim([0, timebins - 1])
plt.xlim([0, timebins-1])
plt.ylim([0, freqbins])
xlocs = np.float32(np.linspace(0, timebins - 1, 5))
plt.xticks(xlocs, [
"%.02f" % l
for l in (
(xlocs * len(samples) / timebins) + (0.5 * binsize)) / samplerate
])
ylocs = np.int16(np.round(np.linspace(0, freqbins - 1, 10)))
xlocs = np.float32(np.linspace(0, timebins-1, 5))
plt.xticks(xlocs, ["%.02f" % l for l in ((xlocs*len(samples)/timebins)+(0.5*binsize))/samplerate])
ylocs = np.int16(np.round(np.linspace(0, freqbins-1, 10)))
plt.yticks(ylocs, ["%.02f" % freq[i] for i in ylocs])
if plotpath:
plt.savefig(plotpath, bbox_inches="tight")
else:
plt.show()
plt.clf()
def plot_aiff_stft(audiopath, binsize=2**10, plotpath=None, colormap="jet"):
samples, samplerate, _ = snd.read(audiopath)
plot_stft(samples, samplerate)
def play_sunflower():
sample_r = snd.get_info('./outputs/sunflowers-Alex-150-normal-589.aiff')[0]
snd_data_f64 = snd.read('./outputs/sunflowers-Alex-150-normal-589.aiff')[0]
snd_data_f32 = snd_data_f64.astype(np.float32)
print(snd_data_f32.shape)
snd_data = snd_data_f32.tobytes()
p_oup = pyaudio.PyAudio()
stream = p_oup.open(
format=pyaudio.paFloat32, channels=1, rate=sample_r, output=True)
stream.write(snd_data)
stream.close()
p_oup.terminate()
plot_stft(snd_data_f32, sample_r)
if __name__ == '__main__':
play_sunflower()
# plot_aiff_stft('./outputs/sunflowers-Alex-150-normal-589.aiff')
# plot_aiff_stft('./outputs/sunflowers-Alex-180-normal-4763.aiff')
# plot_aiff_stft('./outputs/sunflowers-Victoria-180-normal-870.aiff')
# plot_aiff_stft('./outputs/sunflowers-Fred-180-phoneme-9733.aiff')
# plot_aiff_stft('./outputs/sunflowers-Fred-180-normal-6515.aiff')
plotstft('./outputs/sunflowers-Alex-150-normal-589.aiff')
plotstft('./outputs/sunflowers-Alex-180-normal-4763.aiff')
plotstft('./outputs/sunflowers-Victoria-180-normal-870.aiff')
plotstft('./outputs/sunflowers-Fred-180-phoneme-9733.aiff')
plotstft('./outputs/sunflowers-Fred-180-normal-6515.aiff')
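A sanity-check sketch (not in the diff): running the stft → logscale_spec → dB pipeline above on a synthetic tone makes it possible to verify the 150 ms window / one-third overlap arithmetic without an .aiff file or a sound card. The tiny noise floor only avoids log-of-zero warnings; _demo_tone_spectrogram is a hypothetical helper.
def _demo_tone_spectrogram(sr=22050, seconds=1.0, freq=440.0):
    t = np.arange(int(sr * seconds)) / sr
    tone = 0.5 * np.sin(2 * np.pi * freq * t) + 1e-6 * np.random.randn(t.size)
    s = stft(tone, sr * 150 // 1000, 1.0 / 3)
    sshow, freqs = logscale_spec(s, factor=1.0, sr=sr)
    ims = 20. * np.log10(np.abs(sshow) / 10e-6)  # same amplitude-to-dB step as above
    print('time x freq bins:', ims.shape, 'highest bin centre (Hz):', freqs[-1])
    return ims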

View File

@@ -3,111 +3,135 @@ import numpy as np
from spectro_gen import generate_aiff_spectrogram
from sklearn.model_selection import train_test_split
import itertools
import gc
import pickle,gc
def get_siamese_pairs(groupF1, groupF2):
group1 = [r for (i, r) in groupF1.iterrows()]
group2 = [r for (i, r) in groupF2.iterrows()]
f = [(g1, g2) for g2 in group2 for g1 in group1]
t = [i for i in itertools.combinations(group1, 2)
] + [i for i in itertools.combinations(group2, 2)]
return (t, f)
def create_X(sp, max_samples):
def sunflower_data():
audio_samples = pd.read_csv('./outputs/audio.csv',names=['word','voice','rate','variant','file'])
sunflowers = audio_samples.loc[audio_samples['word'] == 'sunflowers'].reset_index(drop=True)
sunflowers.loc[:,'file'] = sunflowers.loc[:,'file'].apply(lambda x:'outputs/'+x).apply(generate_aiff_spectrogram)
y_data = sunflowers['variant'].apply(lambda x:x=='normal').values
max_samples = sunflowers['file'].apply(lambda x:x.shape[0]).max()
sample_size = sunflowers['file'][0].shape[1]
sample_count = sunflowers['file'].shape[0]
sunflowers['file'][0].shape[0]
def append_zeros(spgr):
return np.lib.pad(spgr, [(0, max_samples - spgr.shape[0]), (0, 0)],
'median')
l_sample = append_zeros(sp[0]['spectrogram'])
r_sample = append_zeros(sp[1]['spectrogram'])
return np.asarray([l_sample, r_sample])
orig = spgr.shape[0]
return np.lib.pad(spgr,[(0, max_samples-orig), (0,0)],'median')
pad_sun = sunflowers['file'].apply(append_zeros).values
x_data = np.vstack(pad_sun).reshape((sample_count,max_samples,sample_size,))
return train_test_split(x_data,y_data,test_size=0.33)
def get_siamese_pairs(groupF1,groupF2):
group1 = [r for (i,r) in groupF1.iterrows()]
group2 = [r for (i,r) in groupF2.iterrows()]
f = [(g1,g2) for g2 in group2 for g1 in group1]
t = [i for i in itertools.combinations(group1,2)]+[i for i in itertools.combinations(group2,2)]
return (t,f)
def sunflower_pairs_data():
audio_samples = pd.read_csv(
'./outputs/audio.csv',
names=['word', 'voice', 'rate', 'variant', 'file'])
audio_samples = audio_samples.loc[audio_samples['word'] ==
'sunflowers'].reset_index(drop=True)
audio_samples.loc[:, 'spectrogram'] = audio_samples.loc[:, 'file'].apply(
lambda x: 'outputs/audio/' + x).apply(generate_aiff_spectrogram)
max_samples = audio_samples['spectrogram'].apply(
lambda x: x.shape[0]).max()
same_data, diff_data = [], []
for (w, g) in audio_samples.groupby(audio_samples['word']):
audio_samples = pd.read_csv('./outputs/audio.csv',names=['word','voice','rate','variant','file'])
audio_samples = audio_samples.loc[audio_samples['word'] == 'sunflowers'].reset_index(drop=True)
audio_samples.loc[:,'spectrogram'] = audio_samples.loc[:,'file'].apply(lambda x:'outputs/audio/'+x).apply(generate_aiff_spectrogram)
max_samples = audio_samples['spectrogram'].apply(lambda x:x.shape[0]).max()
sample_size = audio_samples['spectrogram'][0].shape[1]
same_data,diff_data = [],[]
for (w,g) in audio_samples.groupby(audio_samples['word']):
sample_norm = g.loc[audio_samples['variant'] == 'normal']
sample_phon = g.loc[audio_samples['variant'] == 'phoneme']
same, diff = get_siamese_pairs(sample_norm, sample_phon)
same , diff = get_siamese_pairs(sample_norm,sample_phon)
same_data.extend(same)
diff_data.extend(diff)
Y = np.hstack([np.ones(len(same_data)), np.zeros(len(diff_data))])
X_sample_pairs = same_data + diff_data
X_list = (create_X(sp, max_samples) for sp in X_sample_pairs)
Y = np.hstack([np.ones(len(same_data)),np.zeros(len(diff_data))])
X_sample_pairs = same_data+diff_data
def append_zeros(spgr):
sample = np.lib.pad(spgr,[(0, max_samples-spgr.shape[0]), (0,0)],'median')
return np.expand_dims(sample,axis=0)
def create_X(sp):
# sample_count = sp[0]['file'].shape[0]
l_sample = append_zeros(sp[0]['spectrogram'])
r_sample = append_zeros(sp[1]['spectrogram'])#.apply(append_zeros).values
# x_data = np.vstack(pad_sun).reshape((sample_count,max_samples,sample_size))
return np.expand_dims(np.vstack([l_sample,r_sample]),axis=0)
X_list = (create_X(sp) for sp in X_sample_pairs)
X = np.vstack(X_list)
tr_pairs, te_pairs, tr_y, te_y = train_test_split(X, Y, test_size=0.1)
return train_test_split(X, Y, test_size=0.1)
tr_pairs,te_pairs,tr_y,te_y = train_test_split(X,Y,test_size=0.1)
return train_test_split(X,Y,test_size=0.1)
def create_spectrogram_data(audio_group='audio'):
audio_samples = pd.read_csv(
'./outputs/' + audio_group + '.csv',
names=['word', 'voice', 'rate', 'variant', 'file'])
# audio_samples = audio_samples.loc[audio_samples['word'] ==
# 'sunflowers'].reset_index(drop=True)
audio_samples.loc[:, 'spectrogram'] = audio_samples.loc[:, 'file'].apply(
lambda x: 'outputs/' + audio_group + '/' + x).apply(
generate_aiff_spectrogram)
audio_samples = pd.read_csv('./outputs/'+audio_group+'.csv',names=['word','voice','rate','variant','file'])
# audio_samples = audio_samples.loc[audio_samples['word'] == 'sunflowers'].reset_index(drop=True)
audio_samples.loc[:,'spectrogram'] = audio_samples.loc[:,'file'].apply(lambda x:'outputs/'+audio_group+'/'+x).apply(generate_aiff_spectrogram)
audio_samples.to_pickle('outputs/spectrogram.pkl')
def create_speech_pairs_data(audio_group='audio'):
audio_samples = pd.read_pickle('outputs/spectrogram.pkl')
max_samples = audio_samples['spectrogram'].apply(
lambda x: x.shape[0]).max()
# sample_size = audio_samples['spectrogram'][0].shape[1]
max_samples = audio_samples['spectrogram'].apply(lambda x:x.shape[0]).max()
sample_size = audio_samples['spectrogram'][0].shape[1]
def append_zeros(spgr):
sample = np.lib.pad(spgr,[(0, max_samples-spgr.shape[0]), (0,0)],'median')
return sample
def create_X(sp):
l_sample = append_zeros(sp[0]['spectrogram'])
r_sample = append_zeros(sp[1]['spectrogram'])
return np.asarray([l_sample,r_sample])
print('generating siamese speech pairs')
same_data, diff_data = [], []
for (w, g) in audio_samples.groupby(audio_samples['word']):
sample_norm = g.loc[audio_samples['variant'] == 'normal']
sample_phon = g.loc[audio_samples['variant'] == 'phoneme']
same, diff = get_siamese_pairs(sample_norm, sample_phon)
same_data.extend([create_X(s, max_samples) for s in same[:10]])
diff_data.extend([create_X(d, max_samples) for d in diff[:10]])
same_data,diff_data = [],[]
for (w,g) in audio_samples.groupby(audio_samples['word']):
sample_norm = g.loc[audio_samples['variant'] == 'normal']#.reset_index(drop=True)
sample_phon = g.loc[audio_samples['variant'] == 'phoneme']#.reset_index(drop=True)
same , diff = get_siamese_pairs(sample_norm,sample_phon)
same_data.extend([create_X(s) for s in same[:10]])
diff_data.extend([create_X(d) for d in diff[:10]])
print('creating all speech pairs')
Y = np.hstack([np.ones(len(same_data)), np.zeros(len(diff_data))])
Y = np.hstack([np.ones(len(same_data)),np.zeros(len(diff_data))])
print('casting as array speech pairs')
X = np.asarray(same_data + diff_data)
X = np.asarray(same_data+diff_data)
print('pickling X/Y')
np.save('outputs/X.npy', X)
np.save('outputs/Y.npy', Y)
del same_data
del diff_data
np.save('outputs/X.npy',X)
np.save('outputs/Y.npy',Y)
del X
gc.collect()
print('train/test splitting speech pairs')
tr_pairs, te_pairs, tr_y, te_y = train_test_split(X, Y, test_size=0.1)
tr_pairs,te_pairs,tr_y,te_y = train_test_split(X,Y,test_size=0.1)
print('pickling train/test')
np.save('outputs/tr_pairs.npy', tr_pairs)
np.save('outputs/te_pairs.npy', te_pairs)
np.save('outputs/tr_y.npy', tr_y)
np.save('outputs/te_y.npy', te_y)
np.save('outputs/tr_pairs.npy',tr_pairs)
np.save('outputs/te_pairs.npy',te_pairs)
np.save('outputs/tr_y.npy',tr_y)
np.save('outputs/te_y.npy',te_y)
# def create_speech_model_data():
# (max_samples,sample_size) = pickle.load(open('./spectrogram_vars.pkl','rb'))
# x_data_pos = np.load('outputs/x_data_pos.npy')
# x_data_neg = np.load('outputs/x_data_neg.npy')
# x_pos_train, x_pos_test, x_neg_train, x_neg_test =train_test_split(x_data_pos,x_data_neg,test_size=0.1)
# del x_data_pos
# del x_data_neg
# gc.collect()
# print('split train and test')
# tr_y = np.array(x_pos_train.shape[0]*[1])
# te_y = np.array(x_pos_test.shape[0]*[[1,0]])
# tr_pairs = np.array([x_pos_train,x_neg_train]).reshape(x_pos_train.shape[0],2,max_samples,sample_size)
# te_pairs = np.array([x_pos_test,x_neg_test]).reshape(x_pos_test.shape[0],2,max_samples,sample_size)
# print('reshaped to input dim')
# np.save('outputs/tr_pairs.npy',tr_pairs)
# np.save('outputs/te_pairs.npy',te_pairs)
# np.save('outputs/tr_y.npy',tr_y)
# np.save('outputs/te_y.npy',te_y)
# print('pickled speech model data')
def speech_model_data():
tr_pairs = np.load('outputs/tr_pairs.npy') / 255.0
te_pairs = np.load('outputs/te_pairs.npy') / 255.0
tr_pairs = np.load('outputs/tr_pairs.npy')/255.0
te_pairs = np.load('outputs/te_pairs.npy')/255.0
tr_pairs[tr_pairs < 0] = 0
te_pairs[te_pairs < 0] = 0
tr_y = np.load('outputs/tr_y.npy')
te_y = np.load('outputs/te_y.npy')
return tr_pairs, te_pairs, tr_y, te_y
return tr_pairs,te_pairs,tr_y,te_y
if __name__ == '__main__':
# sunflower_pairs_data()
# create_spectrogram_data()
#create_spectrogram_data()
create_speech_pairs_data()
# print(speech_model_data())
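A shape-check sketch (an assumption, not part of the diff): once create_speech_pairs_data() has written the .npy files, the arrays loaded by speech_model_data() should come back as (n_pairs, 2, max_samples, sample_size) for the pairs and (n_pairs,) for the labels, with the two siamese branches fed pairs[:, 0] and pairs[:, 1]. check_pair_shapes is a hypothetical helper.
def check_pair_shapes():
    tr_pairs, te_pairs, tr_y, te_y = speech_model_data()
    assert tr_pairs.ndim == 4 and tr_pairs.shape[1] == 2
    assert tr_pairs.shape[0] == tr_y.shape[0]
    print('train pairs:', tr_pairs.shape, 'test pairs:', te_pairs.shape)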

View File

@@ -1,18 +1,19 @@
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
from speech_data import speech_model_data
from keras.models import Model,load_model
from keras.models import Model
from keras.layers import Input, Dense, Dropout, LSTM, Lambda
from keras.optimizers import RMSprop
from keras.optimizers import RMSprop, SGD
from keras.callbacks import TensorBoard, ModelCheckpoint
from keras import backend as K
def euclidean_distance(vects):
x, y = vects
return K.sqrt(
K.maximum(K.sum(K.square(x - y), axis=1, keepdims=True), K.epsilon()))
return K.sqrt(K.maximum(K.sum(K.square(x - y), axis=1, keepdims=True),
K.epsilon()))
def eucl_dist_output_shape(shapes):
@@ -63,86 +64,53 @@ def accuracy(y_true, y_pred):
return K.mean(K.equal(y_true, K.cast(y_pred < 0.5, y_true.dtype)))
def train_siamese():
# the data, shuffled and split between train and test sets
tr_pairs, te_pairs, tr_y, te_y = speech_model_data()
input_dim = (tr_pairs.shape[2], tr_pairs.shape[3])
# the data, shuffled and split between train and test sets
tr_pairs, te_pairs, tr_y, te_y = speech_model_data()
input_dim = (tr_pairs.shape[2], tr_pairs.shape[3])
# network definition
base_network = create_base_rnn_network(input_dim)
input_a = Input(shape=input_dim)
input_b = Input(shape=input_dim)
# network definition
base_network = create_base_rnn_network(input_dim)
input_a = Input(shape=input_dim)
input_b = Input(shape=input_dim)
# because we re-use the same instance `base_network`,
# the weights of the network
# will be shared across the two branches
processed_a = base_network(input_a)
processed_b = base_network(input_b)
# because we re-use the same instance `base_network`,
# the weights of the network
# will be shared across the two branches
processed_a = base_network(input_a)
processed_b = base_network(input_b)
distance = Lambda(
euclidean_distance,
output_shape=eucl_dist_output_shape)([processed_a, processed_b])
distance = Lambda(euclidean_distance,
output_shape=eucl_dist_output_shape)(
[processed_a, processed_b]
)
model = Model([input_a, input_b], distance)
model = Model([input_a, input_b], distance)
tb_cb = TensorBoard(
log_dir='./logs/siamese_logs',
histogram_freq=1,
batch_size=32,
write_graph=True,
write_grads=True,
write_images=True,
embeddings_freq=0,
embeddings_layer_names=None,
embeddings_metadata=None)
cp_file_fmt = './models/siamese_speech_model-{epoch:02d}-epoch-{val_acc:0.2f}\
-acc.h5'
tb_cb = TensorBoard(log_dir='./logs/siamese_logs', histogram_freq=1,
batch_size=32, write_graph=True, write_grads=True,
write_images=True, embeddings_freq=0,
embeddings_layer_names=None, embeddings_metadata=None)
cp_file_fmt = './models/siamese_speech_model-{epoch:02d}-epoch-{val_acc:0.2f}\
-acc.h5'
cp_cb = ModelCheckpoint(cp_file_fmt, monitor='val_acc', verbose=0,
save_best_only=False, save_weights_only=False,
mode='auto', period=1)
# train
rms = RMSprop(lr=0.001)
sgd = SGD(lr=0.001)
model.compile(loss=contrastive_loss, optimizer=rms, metrics=[accuracy])
model.fit([tr_pairs[:, 0], tr_pairs[:, 1]], tr_y,
batch_size=128,
epochs=50,
validation_data=([te_pairs[:, 0], te_pairs[:, 1]], te_y),
callbacks=[tb_cb, cp_cb])
cp_cb = ModelCheckpoint(
cp_file_fmt,
monitor='val_acc',
verbose=0,
save_best_only=False,
save_weights_only=False,
mode='auto',
period=1)
# train
rms = RMSprop(lr=0.001)
model.compile(loss=contrastive_loss, optimizer=rms, metrics=[accuracy])
model.fit(
[tr_pairs[:, 0], tr_pairs[:, 1]],
tr_y,
batch_size=128,
epochs=50,
validation_data=([te_pairs[:, 0], te_pairs[:, 1]], te_y),
callbacks=[tb_cb, cp_cb])
model.save('./models/siamese_speech_model-final.h5')
# compute final accuracy on training and test sets
y_pred = model.predict([tr_pairs[:, 0], tr_pairs[:, 1]])
tr_acc = compute_accuracy(tr_y, y_pred)
y_pred = model.predict([te_pairs[:, 0], te_pairs[:, 1]])
te_acc = compute_accuracy(te_y, y_pred)
model.save('./models/siamese_speech_model-final.h5')
# compute final accuracy on training and test sets
y_pred = model.predict([tr_pairs[:, 0], tr_pairs[:, 1]])
tr_acc = compute_accuracy(tr_y, y_pred)
y_pred = model.predict([te_pairs[:, 0], te_pairs[:, 1]])
te_acc = compute_accuracy(te_y, y_pred)
print('* Accuracy on training set: %0.2f%%' % (100 * tr_acc))
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))
def trained_siamese_model():
# input_dim = (15, 1654)
# base_network = create_base_rnn_network(input_dim)
# input_a = Input(shape=input_dim)
# input_b = Input(shape=input_dim)
# processed_a = base_network(input_a)
# processed_b = base_network(input_b)
# distance = Lambda(
# euclidean_distance,
# output_shape=eucl_dist_output_shape)([processed_a, processed_b])
#
# model = Model([input_a, input_b], distance)
model = load_model('./models/siamese_speech_model-final.h5')
return model
if __name__ == '__main__':
train_siamese()
print('* Accuracy on training set: %0.2f%%' % (100 * tr_acc))
print('* Accuracy on test set: %0.2f%%' % (100 * te_acc))
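create_base_rnn_network, contrastive_loss and compute_accuracy are defined in the part of this file that the hunk does not show. For orientation only, a plausible shape for the shared branch given the imports above; this is an assumption, not the author's actual definition:
def create_base_rnn_network_sketch(input_dim):
    inp = Input(shape=input_dim)        # (timesteps, features) of one spectrogram
    x = LSTM(128)(inp)                  # summarise the sequence into one vector
    x = Dropout(0.1)(x)
    x = Dense(128, activation='relu')(x)
    return Model(inp, x)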

View File

@@ -1,7 +0,0 @@
# from speech_siamese import trained_siamese_model
from keras.models import load_model
from record_mic_speech import record_spectrogram
model = load_model('./models/siamese_speech_model-final.h5')
spec1 = record_spectrogram(n_sec=1.2)
spec2 = record_spectrogram(n_sec=1.2)
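The deleted script stops after recording. A possible completion (an assumption: compare is a hypothetical helper, the 0.5 distance threshold mirrors accuracy() in speech_siamese.py, and the /255 scaling used during training is omitted here): pad or trim both recordings to the model's expected input and predict their distance.
import numpy as np

def compare(spec_a, spec_b):
    timesteps, features = model.input_shape[0][1:]   # shape of one siamese branch
    def fit(s):
        s = s[:timesteps, :features]
        s = np.pad(s, [(0, timesteps - s.shape[0]), (0, features - s.shape[1])], mode='median')
        return s[None, ...]                           # add batch dimension
    dist = model.predict([fit(spec_a), fit(spec_b)])[0][0]
    print('distance %.3f -> %s' % (dist, 'same' if dist < 0.5 else 'different'))

compare(spec1, spec2)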

View File

@@ -1,42 +1,29 @@
import objc
from AppKit import NSSpeechSynthesizer, NSSpeechInputModeProperty
from AppKit import NSSpeechModePhoneme
from Foundation import NSURL
from AppKit import NSSpeechSynthesizer,NSSpeechInputModeProperty,NSSpeechModePhoneme
from Foundation import NSURL,NSError,NSObject
import json
import random
import os
import re
import subprocess
OUTPUT_NAME = 'audio'
dest_dir = os.path.abspath('.') + '/outputs/' + OUTPUT_NAME + '/'
dest_file = './outputs/' + OUTPUT_NAME + '.csv'
dest_dir = os.path.abspath('.')+'/outputs/'+OUTPUT_NAME+'/'
dest_file = './outputs/'+OUTPUT_NAME+'.csv'
def create_dir(direc):
if not os.path.exists(direc):
os.mkdir(direc)
dest_filename = lambda n,v,r,t: '{}-{}-{}-{}-'.format(n,v,r,t)+str(random.randint(0,10000))+'.aiff'
dest_path = lambda v,r,n: dest_dir+v+'/'+str(r)+'/'+n  # r is an int rate; str() avoids a TypeError
dest_url = lambda p: NSURL.fileURLWithPath_(p)
def dest_filename(n, v, r, t):
return '{}-{}-{}-{}-'.format(n, v, r,
t) + str(random.randint(0, 10000)) + '.aiff'
def dest_path(v, r, n):
return dest_dir + v + '/' + r + '/' + n
def cli_gen_audio(speech_cmd, rate, voice, out_path):
subprocess.call(
['say', '-v', voice, '-r',
str(rate), '-o', out_path, speech_cmd])
def cli_gen_audio(speech_cmd,rate,voice,out_path):
subprocess.call(['say','-v',voice,'-r',str(rate),'-o',out_path,speech_cmd])
class SynthFile(object):
"""docstring for SynthFile."""
def __init__(self, word, phon, filename, voice, rate, operation):
def __init__(self,word,phon, filename,voice,rate,operation):
super(SynthFile, self).__init__()
self.word = word
self.phoneme = phon
@@ -46,114 +33,91 @@ class SynthFile(object):
self.variant = operation
def get_json(self):
return {
'filename': self.filename,
'voice': self.voice,
'rate': self.rate,
'operation': self.operation
}
return {'filename':self.filename,'voice':self.voice,
'rate':self.rate,'operation':self.operation}
def get_csv(self):
return '{},{},{},{},{}\n'.format(self.word, self.phoneme, self.voice,
self.rate, self.variant,
self.filename)
return '{},{},{},{},{},{}\n'.format(self.word,self.phoneme,self.voice,self.rate,self.variant,self.filename)  # six fields; a missing placeholder was silently dropping the filename
class SynthVariant(object):
"""docstring for SynthVariant."""
def __init__(self, identifier, rate):
def __init__(self,identifier,rate):
super(SynthVariant, self).__init__()
self.synth = NSSpeechSynthesizer.alloc().initWithVoice_(identifier)
self.synth.setVolume_(100)
self.synth.setRate_(rate)
self.phone_synth = NSSpeechSynthesizer.alloc().initWithVoice_(
identifier)
self.phone_synth = NSSpeechSynthesizer.alloc().initWithVoice_(identifier)
self.phone_synth.setVolume_(100)
self.phone_synth.setRate_(rate)
self.phone_synth.setObject_forProperty_error_(
NSSpeechModePhoneme, NSSpeechInputModeProperty, None)
self.phone_synth.setObject_forProperty_error_(NSSpeechModePhoneme,NSSpeechInputModeProperty,None)
self.identifier = identifier
self.rate = rate
self.name = identifier.split('.')[-1]
def __repr__(self):
return 'Synthesizer[{} - {}]({})'.format(self.name, self.rate)
return 'Synthesizer[{} - {}]'.format(self.name,self.rate)  # repr had a third placeholder with no argument
def generate_audio(self, word, variant):
orig_phon, phoneme, phon_cmd = self.synth.phonemesFromText_(
word), '', word
def generate_audio(self,word,variant):
orig_phon,phoneme,phon_cmd = self.synth.phonemesFromText_(word),'',word
if variant == 'low':
# self.synth.startSpeakingString_toURL_(word,d_url)
phoneme = orig_phon
elif variant == 'medium':
phoneme = re.sub('[0-9]', '', orig_phon)
phon_cmd = '[[inpt PHON]] ' + phoneme
phoneme = re.sub('[0-9]','',orig_phon)
phon_cmd = '[[inpt PHON]] '+phoneme
elif variant == 'high':
phoneme = orig_phon
phon_cmd = word
# elif variant == 'long':
# if phon != '':
# self.phone_synth.startSpeakingString_toURL_(phon,d_url)
# else:
# self.synth.startSpeakingString_toURL_(word,d_url)
fname = dest_filename(word, phoneme, self.name, self.rate)
d_path = dest_path(self.name, self.rate, fname)
# d_url = NSURL.fileURLWithPath_(d_path)
cli_gen_audio(phon_cmd, self.rate, self.name, d_path)
return SynthFile(word, phoneme, fname, self.name, self.rate, variant)
# if phon != '':
# self.phone_synth.startSpeakingString_toURL_(phon,d_url)
# else:
# self.synth.startSpeakingString_toURL_(word,d_url)
fname = dest_filename(word,phoneme,self.name,self.rate)
d_path = dest_path(self.name,self.rate,fname)
d_url = dest_url(d_path)
cli_gen_audio(phon_cmd,self.rate,self.name,d_path)
return SynthFile(word,phoneme,fname,self.name,self.rate,variant)
def synth_generator():
voices_installed = NSSpeechSynthesizer.availableVoices()
voice_attrs = [
NSSpeechSynthesizer.attributesForVoice_(v) for v in voices_installed
]
us_voices_ids = [
v['VoiceIdentifier'] for v in voice_attrs
if v['VoiceLanguage'] == 'en-US'
and v['VoiceIdentifier'].split('.')[-1][0].isupper()
]
# us_voices_ids = ['com.apple.speech.synthesis.voice.Fred',
# 'com.apple.speech.synthesis.voice.Alex',
voice_attrs = [NSSpeechSynthesizer.attributesForVoice_(v) for v in voices_installed]
us_voices_ids = [v['VoiceIdentifier'] for v in voice_attrs if v['VoiceLanguage'] == 'en-US' and v['VoiceIdentifier'].split('.')[-1][0].isupper()]
# us_voices_ids = ['com.apple.speech.synthesis.voice.Fred','com.apple.speech.synthesis.voice.Alex',
# 'com.apple.speech.synthesis.voice.Victoria']
# voice_rates = list(range(150,221,(220-180)//4))
voice_rates = [150, 180, 210, 250]
voice_rates = [150,180,210,250]
voice_synths = []
create_dir(dest_dir)
for v in us_voices_ids:
for r in voice_rates:
create_dir(dest_dir + v + '/' + r)
voice_synths.append(SynthVariant(v, r))
create_dir(dest_dir+v+'/'+str(r))  # r is an int rate; str() avoids a TypeError
voice_synths.append(SynthVariant(v,r))
def synth_for_words(words):
all_synths = []
for w in words:
for s in voice_synths:
for v in ['low', 'medium', 'high']:
all_synths.append(s.generate_audio(w, v))
for v in ['low','medium','high']:
all_synths.append(s.generate_audio(w,v))
return all_synths
return synth_for_words
def write_synths(synth_list, fname, csv=False):
f = open(fname, 'w')
def write_synths(synth_list,fname,csv=False):
f = open(fname,'w')
if csv:
for s in synth_list:
f.write(s.get_csv())
else:
json.dump([s.get_json() for s in synth_list], f)
json.dump([s.get_json() for s in synth_list],f)
f.close()
def generate_audio_for_stories():
stories_data = json.load(open('./inputs/all_stories_hs.json'))
word_list = [t[0] for i in stories_data.values() for t in i]
words_audio_synth = synth_generator()
return words_audio_synth(word_list)
# words_audio_synth = synth_generator()
# synth = NSSpeechSynthesizer.alloc().init()
# voices_installed = NSSpeechSynthesizer.availableVoices()
@@ -167,5 +131,5 @@ def generate_audio_for_stories():
synths = synth_generator()([OUTPUT_NAME])
# synths = generate_audio_for_stories()
write_synths(synths, dest_file, True)
write_synths(synths,dest_file,True)
# write_synths(synths,'./outputs/synths.json')
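A small inspection sketch (not in the diff; preview_variants is a hypothetical helper): print the raw phoneme string NSSpeechSynthesizer produces for a word and the digit-stripped form that the 'medium' variant feeds to [[inpt PHON]], using the first installed voice and without writing any .aiff files.
def preview_variants(word='sunflowers'):
    voice_id = NSSpeechSynthesizer.availableVoices()[0]
    sv = SynthVariant(voice_id, 180)
    raw = sv.synth.phonemesFromText_(word)
    print('raw phonemes   :', raw)
    print('digits stripped:', re.sub('[0-9]', '', raw))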