"""Queue phrases through the macOS speech synthesizer and log phoneme timing.

For each phrase an audio file is rendered with the ``say`` command line tool,
and the same text is spoken through ``NSSpeechSynthesizer`` so that the
delegate callbacks can timestamp every phoneme.  The per-phoneme timings are
appended to a CSV file next to the generated audio.
"""
# NOTE(review): this module was reconstructed from a unified diff.  Module
# lines 1-2 lie before the first visible hunk and are not reproduced here
# beyond the ``from AppKit import *`` hunk heading -- confirm against the
# real file.
from AppKit import *
from Foundation import NSURL
from PyObjCTools import AppHelper
from time import time
import os
import sys
import random
import json
import csv
import subprocess
from tqdm import tqdm

from speech_samplegen import SynthVariant, format_filename
from speech_tools import create_dir

# Phoneme symbols indexed by the opcode handed to the willSpeakPhoneme
# delegate callback.
apple_phonemes = [
    '%', '@', 'AE', 'EY', 'AO', 'AX', 'IY', 'EH', 'IH', 'AY', 'IX', 'AA',
    'UW', 'UH', 'UX', 'OW', 'AW', 'OY', 'b', 'C', 'd', 'D', 'f', 'g', 'h',
    'J', 'k', 'l', 'm', 'n', 'N', 'p', 'r', 's', 'S', 't', 'T', 'v', 'w',
    'y', 'z', 'Z'
]

OUTPUT_NAME = 'test'

# dest_dir keeps its trailing slash: file names are appended to it directly.
dest_dir = os.path.abspath('.') + '/outputs/' + OUTPUT_NAME + '/'
csv_dest_file = os.path.abspath('.') + '/outputs/' + OUTPUT_NAME + '.csv'
create_dir(dest_dir)


def cli_gen_audio(speech_cmd, out_path):
    """Render ``speech_cmd`` to the audio file ``out_path`` using ``say``.

    The text is passed as a single argv entry.  No shell is involved, so it
    must not be wrapped in quote characters: they would reach ``say`` as part
    of the text and make it differ from what the synthesizer is asked to
    speak for the timing pass.

    Returns the exit status of the ``say`` process.
    """
    return subprocess.call(['say', '-o', out_path, speech_cmd])


class SpeechDelegate (NSObject):
    """NSSpeechSynthesizer delegate that forwards events to Python callbacks.

    The callbacks must be installed with ``setC_W_Ph_`` before speech starts.
    """

    def speechSynthesizer_willSpeakWord_ofString_(self, sender, word, text):
        '''Called by the synthesizer just before a word is spoken'''
        self.wordWillSpeak()

    def speechSynthesizer_willSpeakPhoneme_(self, sender, phoneme):
        '''Called by the synthesizer just before a phoneme is spoken'''
        phon_ch = apple_phonemes[phoneme]
        self.phonemeWillSpeak(phon_ch)

    def speechSynthesizer_didFinishSpeaking_(self, synth, didFinishSpeaking):
        '''Called when speech stops; only a normal finish is reported.

        When ``didFinishSpeaking`` is false the completion callback is not
        invoked, so the caller is never told about that utterance.
        '''
        if didFinishSpeaking:
            self.completeCB()

    def setC_W_Ph_(self, completed, word, phoneme):
        '''Install the completion, word and phoneme callbacks.'''
        self.completeCB = completed
        self.wordWillSpeak = word
        self.phonemeWillSpeak = phoneme


class Delegate (NSObject):
    """Application delegate that kicks off generation once the app is up."""

    def applicationDidFinishLaunching_(self, aNotification):
        '''Called automatically when the application has launched'''
        print("App Launched!")
        generate_audio()


class PhonemeTiming(object):
    """Timing record for one phoneme of a spoken phrase.

    ``start``/``end`` are ``time()`` timestamps in seconds, ``duration`` is
    their difference, and ``fraction`` is the share of the phrase's audible
    time taken by this phoneme.  ``duration`` and ``end`` stay ``None`` until
    the phrase has finished speaking.
    """

    def __init__(self, phon, start):
        super().__init__()
        self.phoneme = phon
        self.start = start
        self.fraction = 0
        self.duration = None
        self.end = None

    def is_audible(self):
        """Return False for the silence ('%') and word-break ('~') markers."""
        return self.phoneme not in ('%', '~')

    def tune(self):
        """Return this phoneme as one TUNE line with its duration in ms."""
        if not self.is_audible():
            return '~'
        dur_ms = int(self.duration * 1000)
        return '{} {{D {}}}'.format(self.phoneme, dur_ms)

    def __repr__(self):
        return '[{}]({:0.4f})'.format(self.phoneme, self.fraction)

    @staticmethod
    def to_tune(phone_ts):
        """Wrap the TUNE lines of ``phone_ts`` in the inpt TUNE/TEXT markers."""
        tune_list = ['[[inpt TUNE]]']
        tune_list.extend(ph.tune() for ph in phone_ts)
        tune_list.append('[[inpt TEXT]]')
        return '\n'.join(tune_list)


class SegData(object):
    """Phoneme segmentation collected for one phrase and its audio file."""

    def __init__(self, text, filename):
        super().__init__()
        self.text = text
        self.tune = ''
        self.filename = filename
        self.segments = []

    def csv_rows(self):
        """Return one CSV row per phoneme that has a successor.

        Each row is ``[text, filename, phoneme, next_phoneme, start_ms,
        end_ms]`` with times relative to the start of the first segment.  The
        final segment has no successor and therefore yields no row.  An empty
        segment list yields no rows.
        """
        if not self.segments:
            return []
        origin = self.segments[0].start
        return [
            [self.text, self.filename, cur.phoneme, nxt.phoneme,
             (cur.start - origin) * 1000, (cur.end - origin) * 1000]
            for cur, nxt in zip(self.segments, self.segments[1:])
        ]


class SynthesizerQueue(object):
    """Speak one phrase at a time and collect its phoneme timings.

    Set ``didComplete`` to a callable taking a ``SegData``; it is invoked
    each time a phrase queued with ``queueTask`` has finished speaking.
    """

    def __init__(self):
        super().__init__()
        self.synth = NSSpeechSynthesizer.alloc().init()
        self.didComplete = None
        # Defined up front so the callbacks never hit a missing attribute.
        self.phoneme_timing = []
        self.data = None
        q_delg = SpeechDelegate.alloc().init()
        # Keep our own reference: Cocoa delegating objects conventionally do
        # not retain their delegate.
        self._delegate = q_delg
        self.synth.setDelegate_(q_delg)

        def synth_complete():
            end_time = time()
            timings = self.phoneme_timing
            # Every phoneme ends where the next one starts; the last one ends
            # when the synthesizer reports completion.
            ends = [t.start for t in timings[1:]] + [end_time]
            for timing, end in zip(timings, ends):
                timing.end = end
                timing.duration = end - timing.start

            total_time = sum(
                t.duration for t in timings if t.is_audible())
            # Guard against phrases with no audible time (fraction stays 0).
            if total_time > 0:
                for timing in timings:
                    if timing.is_audible():
                        timing.fraction = timing.duration / total_time
            if self.didComplete:
                self.data.segments = timings
                self.data.tune = PhonemeTiming.to_tune(timings)
                self.didComplete(self.data)

        def will_speak_phoneme(phon):
            self.phoneme_timing.append(PhonemeTiming(phon, time()))

        def will_speak_word():
            # Deliberately a no-op: this callback arrives after the first
            # phoneme of the word has already started, so a word-break marker
            # inserted here would be misplaced.
            pass

        q_delg.setC_W_Ph_(synth_complete, will_speak_word, will_speak_phoneme)

    def queueTask(self, text):
        """Render ``text`` to an audio file, then speak it to time phonemes."""
        rand_no = str(random.randint(0, 10000))
        fname = '{}-{}.aiff'.format(text, rand_no)
        sanitized = format_filename(fname)
        dest_file = dest_dir + sanitized
        cli_gen_audio(text, dest_file)
        self.phoneme_timing = []
        self.data = SegData(text, sanitized)
        self.synth.startSpeakingString_(text)


def story_texts():
    """Return the sorted, de-duplicated texts from the stories JSON file."""
    story_file = './inputs/all_stories.json'
    with open(story_file) as story_fp:
        stories_data = json.load(story_fp)
    unique_texts = {t for texts in stories_data.values() for t in texts}
    return sorted(unique_texts)


def generate_audio():
    """Synthesize a random sample of story texts and write timings to CSV.

    Phrases are processed one at a time: each completion callback writes the
    finished phrase's rows and then queues the next phrase.  When no phrases
    remain the CSV file is closed and the application is terminated.
    """
    synth_queue = SynthesizerQueue()
    texts = story_texts()
    # Never ask for more samples than there are texts.
    phrases = random.sample(texts, min(5, len(texts)))
    # The file outlives this call (it is written from callbacks), so it is
    # closed explicitly in nextTask.  newline='' is what the csv module
    # expects for files it writes to.
    csv_file = open(csv_dest_file, 'w', newline='')
    writer = csv.writer(csv_file, quoting=csv.QUOTE_MINIMAL)
    progress = tqdm(total=len(phrases))
    next_index = 0

    def nextTask(seg_data=None):
        nonlocal next_index
        # Persist the finished phrase before anything else, so the last
        # phrase is written while the file is still open.
        if seg_data:
            writer.writerows(seg_data.csv_rows())
        if next_index < len(phrases):
            phrase = phrases[next_index]
            next_index += 1
            progress.set_postfix(phrase=phrase)
            progress.update()
            synth_queue.queueTask(phrase)
            return
        progress.close()
        csv_file.close()
        dg = NSApplication.sharedApplication().delegate()
        print('App terminated.')
        NSApp().terminate_(dg)

    synth_queue.didComplete = nextTask
    nextTask()


def main():
    """Create the application, attach its delegate and run the event loop."""
    # Create a new application instance ...
    a = NSApplication.sharedApplication()
    # ... and create its delegate.  Note the use of the
    # Objective C constructors below, because Delegate
    # is a subclass of an Objective C class, NSObject
    # NOTE(review): the source lines between the two diff hunks are not
    # visible.  The delegate wiring below is reconstructed from the comment
    # above and from generate_audio() reading the application's delegate --
    # confirm against the real file.
    delegate = Delegate.alloc().init()
    a.setDelegate_(delegate)
    AppHelper.runEventLoop()


if __name__ == '__main__':
    main()