1. Implemented spectrogram generator for audio files
2. Imported Siamese network class (WIP)
3. Added similarity-measure-based phoneme neighbor generator
4. Fixed samplegen variants code
5. Create triplets (WIP)
6. Miscellaneous updates
Branch: master
Malar Kannan, 2017-10-13 16:40:57 +05:30
parent 258356780f, commit dccbec7cba
8 changed files with 286 additions and 35 deletions

.gitignore (vendored)

@@ -138,3 +138,4 @@ Temporary Items
 # End of https://www.gitignore.io/api/macos
 outputs/*
+inputs/mnist

TODO.md (new file)

@@ -0,0 +1,4 @@
0. Generate the samples of phoneme-similarity variants.
1. Create spectrograms of 150 ms windows with 50 ms overlap for each word.
2. Train an RNN to output a vector using the spectrograms.
3. Train an NN to output True/False based on the acceptability of the RNN output -> Siamese network (implementation detail).
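
For step 1, the window arithmetic works out as follows; a minimal sketch, assuming an illustrative 44.1 kHz sample rate (nothing in the commit fixes the rate):

samplerate = 44100                  # example AIFF sample rate (assumption)
win = int(samplerate * 150 / 1000)  # 150 ms window  -> 6615 samples
hop = int(samplerate * 100 / 1000)  # 100 ms hop     -> 4410 samples
overlap = win - hop                 # 50 ms overlap  -> 2205 samples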


@@ -6,5 +6,5 @@ word_goups = audio_file.groupby('word')
 lst = [1, 2, 3, 1, 2, 3]
 s = pd.Series([1, 2, 3, 10, 20, 30], lst)
 df3 = pd.DataFrame({'X' : ['A', 'B', 'A', 'B'], 'Y' : [1, 4, 3, 2]})
-df3
 s.groupby(level=0).sum()
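
The word groups named in the hunk's context line behave like any pandas GroupBy; a minimal sketch, assuming audio_file is the DataFrame this file loads earlier (the identifier word_goups is spelled as in the source):

for word, rows in word_goups:
    print(word, len(rows))   # one group of rows per distinct word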


@@ -38,11 +38,40 @@ UW UW
 V v
 W w
 Y y
+X x
 Z z
 ZH Z
 """.strip().split('\n')}
-mapping
 sim_mat = pd.read_csv('./similarity.csv',header=0,index_col=0)
-[mapping[re.sub('[0-9]','',i)] for i in sim_mat.index.tolist()]
-# sim_mat.loc
+def convert_ph(ph):
+    stress_level = re.search("(\w+)([0-9])",ph)
+    if stress_level:
+        return stress_level.group(2)+mapping[stress_level.group(1)]
+    else:
+        return mapping[ph]
+def sim_mat_to_apple_table(smt):
+    colnames = [convert_ph(ph) for ph in smt.index.tolist()]
+    smt = pd.DataFrame(np.nan_to_num(smt.values))
+    fsmt = (smt.T+smt)
+    np.fill_diagonal(fsmt.values,100.0)
+    asmt = pd.DataFrame.copy(fsmt)
+    asmt.columns = colnames
+    asmt.index = colnames
+    apple_sim_lookup = asmt.stack().reset_index()
+    apple_sim_lookup.columns = ['q','r','s']
+    return apple_sim_lookup
+apple_sim_lookup = sim_mat_to_apple_table(sim_mat)
+def top_match(ph):
+    selected = apple_sim_lookup[(apple_sim_lookup.q == ph) & (apple_sim_lookup.s < 100) & (apple_sim_lookup.s >= 70)]
+    tm = ph
+    if len(selected) > 0:
+        tm = pd.DataFrame.sort_values(selected,'s',ascending=False).iloc[0].r
+    return tm
+def similar_phoneme(ph_str):
+    return ph_str
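
similar_phoneme is still a stub at this point. A plausible completion, purely hypothetical here, would run each symbol of an Apple phoneme string through top_match; the character-level tokenization below is an assumption about the phoneme string format, not something the commit specifies:

def similar_phoneme(ph_str):
    # Hypothetical sketch: keep stress digits and whitespace as-is,
    # replace every other symbol with its nearest neighbour (if any).
    return ''.join(ch if ch.isdigit() or ch.isspace() else top_match(ch)
                   for ch in ph_str)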

siamese_network.py (new file)

@@ -0,0 +1,90 @@
import tensorflow as tf
import numpy as np

class SiameseLSTM(object):
    """
    An LSTM-based deep Siamese network for text similarity.
    Uses a character embedding layer, followed by a biLSTM and an energy loss layer.
    """
    def BiRNN(self, x, dropout, scope, embedding_size, sequence_length):
        n_input = embedding_size
        n_steps = sequence_length
        n_hidden = n_steps
        n_layers = 3
        # Prepare data shape to match `bidirectional_rnn` function requirements
        # Current data input shape: (batch_size, n_steps, n_input) (?, seq_len, embedding_size)
        # Required shape: 'n_steps' tensors list of shape (batch_size, n_input)
        # Permuting batch_size and n_steps
        x = tf.transpose(x, [1, 0, 2])
        # Reshape to (n_steps*batch_size, n_input)
        x = tf.reshape(x, [-1, n_input])
        # Split to get a list of 'n_steps' tensors of shape (batch_size, n_input)
        x = tf.split(x, n_steps, 0)
        # Define lstm cells with tensorflow
        # Forward direction cell
        with tf.name_scope("fw"+scope), tf.variable_scope("fw"+scope):
            stacked_rnn_fw = []
            for _ in range(n_layers):
                fw_cell = tf.nn.rnn_cell.BasicLSTMCell(n_hidden, forget_bias=1.0, state_is_tuple=True)
                lstm_fw_cell = tf.contrib.rnn.DropoutWrapper(fw_cell, output_keep_prob=dropout)
                stacked_rnn_fw.append(lstm_fw_cell)
            lstm_fw_cell_m = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_rnn_fw, state_is_tuple=True)
        # Backward direction cell
        with tf.name_scope("bw"+scope), tf.variable_scope("bw"+scope):
            stacked_rnn_bw = []
            for _ in range(n_layers):
                bw_cell = tf.nn.rnn_cell.BasicLSTMCell(n_hidden, forget_bias=1.0, state_is_tuple=True)
                lstm_bw_cell = tf.contrib.rnn.DropoutWrapper(bw_cell, output_keep_prob=dropout)
                stacked_rnn_bw.append(lstm_bw_cell)
            lstm_bw_cell_m = tf.nn.rnn_cell.MultiRNNCell(cells=stacked_rnn_bw, state_is_tuple=True)
        # Get lstm cell output (the RNN's variables land under the reused "bw"+scope)
        with tf.name_scope("bw"+scope), tf.variable_scope("bw"+scope):
            outputs, _, _ = tf.nn.static_bidirectional_rnn(lstm_fw_cell_m, lstm_bw_cell_m, x, dtype=tf.float32)
        return outputs[-1]

    def contrastive_loss(self, y, d, batch_size):
        # Similar pairs (y=1) are pulled together; dissimilar pairs (y=0)
        # are pushed apart up to a margin of 1.
        tmp = y * tf.square(d)
        tmp2 = (1 - y) * tf.square(tf.maximum((1 - d), 0))
        return tf.reduce_sum(tmp + tmp2) / batch_size / 2

    def __init__(self, sequence_length, vocab_size, embedding_size, hidden_units, l2_reg_lambda, batch_size):
        # Placeholders for input, output and dropout
        self.input_x1 = tf.placeholder(tf.int32, [None, sequence_length], name="input_x1")
        self.input_x2 = tf.placeholder(tf.int32, [None, sequence_length], name="input_x2")
        self.input_y = tf.placeholder(tf.float32, [None], name="input_y")
        self.dropout_keep_prob = tf.placeholder(tf.float32, name="dropout_keep_prob")
        # Keeping track of l2 regularization loss (optional)
        l2_loss = tf.constant(0.0, name="l2_loss")
        # Embedding layer, shared between the two sides
        with tf.name_scope("embedding"):
            self.W = tf.Variable(
                tf.random_uniform([vocab_size, embedding_size], -1.0, 1.0),
                trainable=True, name="W")
            self.embedded_chars1 = tf.nn.embedding_lookup(self.W, self.input_x1)
            self.embedded_chars2 = tf.nn.embedding_lookup(self.W, self.input_x2)
        # Run each side through its own biLSTM stack
        with tf.name_scope("output"):
            self.out1 = self.BiRNN(self.embedded_chars1, self.dropout_keep_prob, "side1", embedding_size, sequence_length)
            self.out2 = self.BiRNN(self.embedded_chars2, self.dropout_keep_prob, "side2", embedding_size, sequence_length)
            # Normalized Euclidean distance between the two outputs, in [0, 1]
            self.distance = tf.sqrt(tf.reduce_sum(tf.square(tf.subtract(self.out1, self.out2)), 1, keep_dims=True))
            self.distance = tf.div(self.distance,
                                   tf.add(tf.sqrt(tf.reduce_sum(tf.square(self.out1), 1, keep_dims=True)),
                                          tf.sqrt(tf.reduce_sum(tf.square(self.out2), 1, keep_dims=True))))
            self.distance = tf.reshape(self.distance, [-1], name="distance")
        with tf.name_scope("loss"):
            self.loss = self.contrastive_loss(self.input_y, self.distance, batch_size)
        # Accuracy computation is outside of this class.
        with tf.name_scope("accuracy"):
            self.temp_sim = tf.subtract(tf.ones_like(self.distance), tf.rint(self.distance), name="temp_sim")  # auto threshold 0.5
            correct_predictions = tf.equal(self.temp_sim, self.input_y)
            self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
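
A minimal sketch of driving this class under TensorFlow 1.x; the hyperparameters, the Adam optimizer, and the random batch are illustrative choices, not part of the commit:

import numpy as np
import tensorflow as tf
from siamese_network import SiameseLSTM

batch, seq_len, vocab = 64, 15, 5000
model = SiameseLSTM(sequence_length=seq_len, vocab_size=vocab,
                    embedding_size=64, hidden_units=50,
                    l2_reg_lambda=0.0, batch_size=batch)
train_op = tf.train.AdamOptimizer(1e-3).minimize(model.loss)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    feed = {model.input_x1: np.random.randint(0, vocab, (batch, seq_len)),
            model.input_x2: np.random.randint(0, vocab, (batch, seq_len)),
            model.input_y: np.random.randint(0, 2, batch).astype(np.float32),
            model.dropout_keep_prob: 0.8}
    _, loss = sess.run([train_op, model.loss], feed)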

snippets.py (new file)

@@ -0,0 +1,12 @@
# import scipy.signal as sg
# import pysndfile.sndio as snd
#
# snd_data,samples,_ = snd.read('./outputs/sunflowers-Alex-150-normal-589.aiff')
# samples_per_seg = 3*int(samples*150/(3*1000))
# # samples/(len(snd_data)*1000.0)
# len(snd_data)
# samples_per_seg/2
#
# len(sg.spectrogram(snd_data,nperseg=samples_per_seg,noverlap=samples_per_seg/3)[2])
#
# from spectro_gen import generate_aiff_spectrogram
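
The commented arithmetic rounds the 150 ms segment length down to a multiple of 3 so that the one-third overlap passed to sg.spectrogram is an exact integer count of samples; spelled out with an example rate (the snippet itself does not fix the rate):

rate = 44100                                        # example sample rate
samples_per_seg = 3 * int(rate * 150 / (3 * 1000))  # 6615 samples, divisible by 3
noverlap = samples_per_seg // 3                     # 2205 samples, exactly one third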

spectro_gen.py (new file)

@@ -0,0 +1,110 @@
#!/usr/bin/env python
# coding: utf-8
""" This work is licensed under a Creative Commons Attribution 3.0 Unported License.
Frank Zalkow, 2012-2013
http://www.frank-zalkow.de/en/code-snippets/create-audio-spectrograms-with-python.html?i=1
"""
# %matplotlib inline
import numpy as np
from matplotlib import pyplot as plt
from pysndfile import sndio as snd
from numpy.lib import stride_tricks

""" short time fourier transform of audio signal """
def stft(sig, frameSize, overlapFac=0.5, window=np.hanning):
    win = window(frameSize)
    hopSize = int(frameSize - np.floor(overlapFac * frameSize))
    # zeros at beginning (thus center of 1st window should be for sample nr. 0)
    count = int(np.floor(frameSize/2.0))
    samples = np.append(np.zeros(count), sig)
    # cols for windowing
    cols = int(np.ceil((len(samples) - frameSize) / float(hopSize)) + 1)
    # zeros at end (thus samples can be fully covered by frames)
    samples = np.append(samples, np.zeros(frameSize))
    frames = stride_tricks.as_strided(samples, shape=(cols, frameSize),
                                      strides=(samples.strides[0]*hopSize, samples.strides[0])).copy()
    frames *= win
    return np.fft.rfft(frames)

""" scale frequency axis logarithmically """
def logscale_spec(spec, sr=44100, factor=20.):
    timebins, freqbins = np.shape(spec)
    scale = np.linspace(0, 1, freqbins) ** factor
    scale *= (freqbins-1)/max(scale)
    scale = np.unique(np.round(scale)).astype(np.uint32)
    # create spectrogram with new freq bins
    newspec = np.complex128(np.zeros([timebins, len(scale)]))
    for i in range(0, len(scale)):
        if i == len(scale)-1:
            newspec[:, i] = np.sum(spec[:, scale[i]:], axis=1)
        else:
            newspec[:, i] = np.sum(spec[:, scale[i]:scale[i+1]], axis=1)
    # list center freq of bins
    allfreqs = np.abs(np.fft.fftfreq(freqbins*2, 1./sr)[:freqbins+1])
    freqs = []
    for i in range(0, len(scale)):
        if i == len(scale)-1:
            freqs += [np.mean(allfreqs[scale[i]:])]
        else:
            freqs += [np.mean(allfreqs[scale[i]:scale[i+1]])]
    return newspec, freqs

""" generate spectrogram for aiff audio with 150ms windows and 50ms overlap """
def generate_aiff_spectrogram(audiopath):
    samples, samplerate, _ = snd.read(audiopath)
    # 150 ms window; an overlapFac of 1/3 gives a 100 ms hop, i.e. 50 ms overlap
    s = stft(samples, int(samplerate*150/1000), 1.0/3)
    sshow, freq = logscale_spec(s, factor=1.0, sr=samplerate)
    ims = 20.*np.log10(np.abs(sshow)/10e-6)  # amplitude to decibel
    return ims

""" plot spectrogram """
def plotstft(audiopath, binsize=2**10, plotpath=None, colormap="jet"):
    samples, samplerate, _ = snd.read(audiopath)
    s = stft(samples, int(samplerate*150/1000), 1.0/3)
    sshow, freq = logscale_spec(s, factor=1.0, sr=samplerate)
    ims = 20.*np.log10(np.abs(sshow)/10e-6)  # amplitude to decibel
    timebins, freqbins = np.shape(ims)
    plt.figure(figsize=(15, 7.5))
    plt.imshow(np.transpose(ims), origin="lower", aspect="auto", cmap=colormap, interpolation="none")
    plt.colorbar()
    plt.xlabel("time (s)")
    plt.ylabel("frequency (hz)")
    plt.xlim([0, timebins-1])
    plt.ylim([0, freqbins])
    xlocs = np.float32(np.linspace(0, timebins-1, 5))
    plt.xticks(xlocs, ["%.02f" % l for l in ((xlocs*len(samples)/timebins)+(0.5*binsize))/samplerate])
    ylocs = np.int16(np.round(np.linspace(0, freqbins-1, 10)))
    plt.yticks(ylocs, ["%.02f" % freq[i] for i in ylocs])
    if plotpath:
        plt.savefig(plotpath, bbox_inches="tight")
    else:
        plt.show()
    plt.clf()

if __name__ == '__main__':
    plotstft('./outputs/sunflowers-Alex-150-normal-589.aiff')
    plotstft('./outputs/sunflowers-Alex-180-normal-4763.aiff')
    plotstft('./outputs/sunflowers-Victoria-180-normal-870.aiff')
    plotstft('./outputs/sunflowers-Fred-180-phoneme-9733.aiff')
    plotstft('./outputs/sunflowers-Fred-180-normal-6515.aiff')
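
A minimal way to exercise the generator; the path is one of the sample files the __main__ block already plots, and the shape comment is an interpretation of the return value:

from spectro_gen import generate_aiff_spectrogram
ims = generate_aiff_spectrogram('./outputs/sunflowers-Alex-150-normal-589.aiff')
print(ims.shape)   # (time windows, log-scaled frequency bins), values in dB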


@@ -9,38 +9,39 @@ import subprocess
 OUTPUT_NAME = 'audio'
-def create_output_dir():
-    direc = os.path.abspath('.')+'/outputs/'+OUTPUT_NAME+'/'
+dest_dir = os.path.abspath('.')+'/outputs/'+OUTPUT_NAME+'/'
+dest_file = './outputs/'+OUTPUT_NAME+'.csv'
+def create_dir(direc):
     if not os.path.exists(direc):
-        os.mkdir(direc)
-create_output_dir()
+        os.makedirs(direc)  # makedirs, so the nested voice/rate dirs below work
 dest_filename = lambda n,v,r,t: '{}-{}-{}-{}-'.format(n,v,r,t)+str(random.randint(0,10000))+'.aiff'
-dest_path = lambda n: os.path.abspath('.')+'/outputs/'+OUTPUT_NAME+'/'+n
+dest_path = lambda v,r,n: dest_dir+v+'/'+str(r)+'/'+n
 dest_url = lambda p: NSURL.fileURLWithPath_(p)
-def cli_gen_audio(word,rate,voice,out_path):
-    subprocess.call(['say','-v',voice,'-r',str(rate),'-o',out_path,word])
+def cli_gen_audio(speech_cmd,rate,voice,out_path):
+    subprocess.call(['say','-v',voice,'-r',str(rate),'-o',out_path,speech_cmd])
 class SynthFile(object):
     """docstring for SynthFile."""
-    def __init__(self,word, filename,voice,rate,operation):
+    def __init__(self,word,phon, filename,voice,rate,operation):
         super(SynthFile, self).__init__()
         self.word = word
+        self.phoneme = phon
         self.filename = filename
         self.voice = voice
         self.rate = rate
-        self.operation = operation
+        self.variant = operation
     def get_json(self):
         return {'filename':self.filename,'voice':self.voice,
-                'rate':self.rate,'operation':self.operation}
+                'rate':self.rate,'variant':self.variant}
     def get_csv(self):
-        return '{},{},{},{},{}\n'.format(self.word,self.voice,self.rate,self.operation,self.filename)
+        return '{},{},{},{},{},{}\n'.format(self.word,self.phoneme,self.voice,self.rate,self.variant,self.filename)
 class SynthVariant(object):
     """docstring for SynthVariant."""
-    def __init__(self,identifier,rate,op):
+    def __init__(self,identifier,rate):
         super(SynthVariant, self).__init__()
         self.synth = NSSpeechSynthesizer.alloc().initWithVoice_(identifier)
         self.synth.setVolume_(100)
@@ -52,28 +53,31 @@ class SynthVariant(object):
         self.identifier = identifier
         self.rate = rate
         self.name = identifier.split('.')[-1]
-        self.operation = op
     def __repr__(self):
-        return 'Synthesizer[{} - {}]({})'.format(self.name,self.rate,self.operation)
+        return 'Synthesizer[{} - {}]'.format(self.name,self.rate)
-    def generate_audio(self,word):
-        fname = dest_filename(word,self.name,self.rate,self.operation)
-        d_path = dest_path(fname)
-        d_url = dest_url(d_path)
-        if self.operation == 'normal':
-            self.synth.startSpeakingString_toURL_(word,d_url)
-            # cli_gen_audio(word,self.rate,self.name,d_path)
-        else:
-            orig_phon = self.synth.phonemesFromText_(word)
-            phon = '[[inpt PHON]] '+re.sub('[0-9]','',orig_phon)
-            # phon = re.sub('[0-9]','',orig_phon)
-            cli_gen_audio(phon,self.rate,self.name,d_path)
-        return SynthFile(word,fname,self.name,self.rate,self.operation)
+    def generate_audio(self,word,variant):
+        orig_phon,phoneme,phon_cmd = self.synth.phonemesFromText_(word),'',word
+        if variant == 'low':
+            # self.synth.startSpeakingString_toURL_(word,d_url)
+            phoneme = orig_phon
+        elif variant == 'medium':
+            phoneme = re.sub('[0-9]','',orig_phon)
+            phon_cmd = '[[inpt PHON]] '+phoneme
+        elif variant == 'high':
+            phoneme = orig_phon
+            phon_cmd = word
+        # elif variant == 'long':
         # if phon != '':
         #     self.phone_synth.startSpeakingString_toURL_(phon,d_url)
         # else:
         #     self.synth.startSpeakingString_toURL_(word,d_url)
+        fname = dest_filename(word,phoneme,self.name,self.rate)
+        d_path = dest_path(self.name,self.rate,fname)
+        d_url = dest_url(d_path)
+        cli_gen_audio(phon_cmd,self.rate,self.name,d_path)
+        return SynthFile(word,phoneme,fname,self.name,self.rate,variant)
 def synth_generator():
@@ -83,18 +87,19 @@ def synth_generator():
     # us_voices_ids = ['com.apple.speech.synthesis.voice.Fred','com.apple.speech.synthesis.voice.Alex',
     #                  'com.apple.speech.synthesis.voice.Victoria']
     # voice_rates = list(range(150,221,(220-180)//4))
-    voice_rates = [150,180,210]
+    voice_rates = [150,180,210,250]
     voice_synths = []
-    variants = ['normal','phoneme']
+    create_dir(dest_dir)
     for v in us_voices_ids:
         for r in voice_rates:
-            for o in variants:
-                voice_synths.append(SynthVariant(v,r,o))
+            create_dir(dest_dir+v.split('.')[-1]+'/'+str(r))  # match SynthVariant.name
+            voice_synths.append(SynthVariant(v,r))
     def synth_for_words(words):
         all_synths = []
         for w in words:
             for s in voice_synths:
-                all_synths.append(s.generate_audio(w))
+                for v in ['low','medium','high']:
+                    all_synths.append(s.generate_audio(w,v))
         return all_synths
     return synth_for_words
@@ -126,5 +131,5 @@ def generate_audio_for_stories():
 synths = synth_generator()([OUTPUT_NAME])
 # synths = generate_audio_for_stories()
-write_synths(synths,'./outputs/'+OUTPUT_NAME+'.csv',True)
+write_synths(synths,dest_file,True)
 # write_synths(synths,'./outputs/synths.json')
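
End to end, the reworked module can be driven as sketched below; this assumes a macOS environment with pyobjc available, since generation shells out to say, and the word list is only an example:

synth_for_words = synth_generator()       # one SynthVariant per voice/rate pair
files = synth_for_words(['sunflowers'])   # low/medium/high variant per synthesizer
write_synths(files, dest_file, True)      # CSV manifest, as in the __main__ block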