"""Generate speech audio for a list of phrases and log phoneme timings.

For every phrase the macOS ``say`` command renders an AIFF file, and the same
phrase is spoken through ``NSSpeechSynthesizer`` so that its delegate
callbacks can timestamp each phoneme.  The collected timings are written to a
CSV file next to the audio output directory.
"""
import objc
from AppKit import *
from Foundation import NSURL
from PyObjCTools import AppHelper
from time import time
import os
import sys
import random
import json
import csv
import subprocess
from tqdm import tqdm
from speech_tools import create_dir, format_filename

# Lookup table used to turn the value passed to the
# ``speechSynthesizer:willSpeakPhoneme:`` callback into a phoneme symbol.
apple_phonemes = [
    '%', '@', 'AE', 'EY', 'AO', 'AX', 'IY', 'EH', 'IH', 'AY', 'IX', 'AA',
    'UW', 'UH', 'UX', 'OW', 'AW', 'OY', 'b', 'C', 'd', 'D', 'f', 'g', 'h',
    'J', 'k', 'l', 'm', 'n', 'N', 'p', 'r', 's', 'S', 't', 'T', 'v', 'w',
    'y', 'z', 'Z'
]

OUTPUT_NAME = 'story_test_segments'
# Directory that receives the generated .aiff files (note trailing slash:
# file names are appended to it directly).
dest_dir = os.path.abspath('.') + '/outputs/' + OUTPUT_NAME + '/'
# CSV file that receives one row per phoneme transition.
csv_dest_file = os.path.abspath('.') + '/outputs/' + OUTPUT_NAME + '.csv'
create_dir(dest_dir)


def cli_gen_audio(speech_cmd, out_path):
    """Render ``speech_cmd`` to the audio file ``out_path`` using ``say``.

    Blocks until the ``say`` process exits; its return code is ignored.
    """
    # NOTE(review): the text is wrapped in literal single quotes. No shell is
    # involved, so the quotes reach ``say`` verbatim; kept as-is so the
    # generated audio does not change -- confirm whether this is intended.
    subprocess.call(['say', '-o', out_path, "'" + speech_cmd + "'"])


class SpeechDelegate (NSObject):
    """Speech synthesizer delegate that forwards events to Python callbacks.

    The callbacks must be installed with ``setC_W_Ph_`` before speaking.
    """

    def speechSynthesizer_willSpeakWord_ofString_(self, sender, word, text):
        '''Called by the synthesizer when it is about to speak a word.'''
        self.wordWillSpeak()

    def speechSynthesizer_willSpeakPhoneme_(self, sender, phoneme):
        '''Called by the synthesizer when it is about to speak a phoneme.'''
        # ``phoneme`` is used as an index into ``apple_phonemes``; a value
        # outside the table raises IndexError here.
        phon_ch = apple_phonemes[phoneme]
        self.phonemeWillSpeak(phon_ch)

    def speechSynthesizer_didFinishSpeaking_(self, synth, didFinishSpeaking):
        '''Called when speaking stops; fires the completion callback only
        when the flag passed by the synthesizer is true.'''
        if didFinishSpeaking:
            self.completeCB()

    def setC_W_Ph_(self, completed, word, phoneme):
        '''Install the completion, word and phoneme callbacks.'''
        self.completeCB = completed
        self.wordWillSpeak = word
        self.phonemeWillSpeak = phoneme


class Delegate (NSObject):
    """Application delegate that starts audio generation on launch."""

    def applicationDidFinishLaunching_(self, aNotification):
        '''Called automatically when the application has launched'''
        print("App Launched!")
        # Alternative phrase sources defined in this file: story_texts(),
        # test_texts(30).
        phrases = story_words()
        generate_audio(phrases)


class PhonemeTiming(object):
    """Timing record for one phoneme reported by the synthesizer.

    ``start`` is set at construction; ``duration`` and ``end`` stay ``None``
    and ``fraction`` stays 0 until they are filled in after the utterance
    completes.
    """

    def __init__(self, phon, start):
        super().__init__()
        self.phoneme = phon
        self.start = start
        self.fraction = 0
        self.duration = None
        self.end = None

    def is_audible(self):
        """Return False for the '%' and '~' markers, True otherwise."""
        return self.phoneme not in ['%', '~']

    def tune(self):
        """Return this phoneme's TUNE line: ``PH {D <ms>}`` or ``~``.

        Requires ``duration`` to have been set for audible phonemes.
        """
        if not self.is_audible():
            return '~'
        dur_ms = int(self.duration * 1000)
        return '{} {{D {}}}'.format(self.phoneme, dur_ms)

    def __repr__(self):
        return '[{}]({:0.4f})'.format(self.phoneme, self.fraction)

    @staticmethod
    def to_tune(phone_ts):
        """Join the TUNE lines of ``phone_ts`` between the
        ``[[inpt TUNE]]`` and ``[[inpt TEXT]]`` markers."""
        tune_list = ['[[inpt TUNE]]']
        tune_list.extend(ph.tune() for ph in phone_ts)
        tune_list.append('[[inpt TEXT]]')
        return '\n'.join(tune_list)


class SegData(object):
    """Phoneme segments collected for one spoken text and its audio file."""

    def __init__(self, text, filename):
        super().__init__()
        self.text = text
        self.tune = ''
        self.filename = filename
        self.segments = []

    def csv_rows(self):
        """Return one CSV row per consecutive pair of segments.

        Each row is ``[text, filename, phoneme, next_phoneme, start_ms,
        end_ms]`` with times relative to the first segment's start.  The last
        segment only ever appears as a "next" phoneme.  Returns ``[]`` when
        no segments were recorded.
        """
        if not self.segments:
            # Nothing was reported for this text; avoid indexing segments[0].
            return []
        s_tim = self.segments[0].start
        return [
            [self.text, self.filename, cs.phoneme, ns.phoneme,
             (cs.start - s_tim) * 1000, (cs.end - s_tim) * 1000]
            for cs, ns in zip(self.segments, self.segments[1:])
        ]


class SynthesizerQueue(object):
    """Speaks one text at a time and records per-phoneme timings.

    Set ``didComplete`` to a callable taking the finished ``SegData``; it is
    invoked after each utterance completes.
    """

    def __init__(self):
        super().__init__()
        self.synth = NSSpeechSynthesizer.alloc().init()
        self.didComplete = None
        # Initialised here so callbacks never hit a missing attribute.
        self.phoneme_timing = []
        self.data = None
        q_delg = SpeechDelegate.alloc().init()
        # Keep our own reference to the delegate.
        # NOTE(review): Cocoa delegates are conventionally not retained by
        # the object they are attached to -- confirm for NSSpeechSynthesizer.
        self._delegate = q_delg
        self.synth.setDelegate_(q_delg)

        def synth_complete():
            end_time = time()
            timings = self.phoneme_timing
            # Each phoneme ends where the next one starts ...
            for current, following in zip(timings, timings[1:]):
                current.end = following.start
                current.duration = following.start - current.start
            # ... and the last one ends when the utterance completes.
            if timings:
                last = timings[-1]
                last.end = end_time
                last.duration = end_time - last.start
            total_time = sum(
                ph.duration for ph in timings if ph.is_audible())
            if total_time:
                # Skipped when the total is zero to avoid dividing by zero.
                for ph in timings:
                    if ph.is_audible():
                        ph.fraction = ph.duration / total_time
            if self.didComplete:
                self.data.segments = timings
                self.data.tune = PhonemeTiming.to_tune(timings)
                self.didComplete(self.data)

        def will_speak_phoneme(phon):
            self.phoneme_timing.append(PhonemeTiming(phon, time()))

        def will_speak_word():
            # Intentionally a no-op: this callback arrives after the first
            # phoneme of the word has already started.
            pass

        q_delg.setC_W_Ph_(synth_complete, will_speak_word, will_speak_phoneme)

    def queueTask(self, text):
        """Render ``text`` to an .aiff file with ``say``, then speak it
        through the synthesizer with a fresh timing list."""
        # A random suffix is appended to the file name built from the text.
        rand_no = str(random.randint(0, 10000))
        fname = '{}-{}.aiff'.format(text, rand_no)
        sanitized = format_filename(fname)
        dest_file = dest_dir + sanitized
        cli_gen_audio(text, dest_file)
        self.phoneme_timing = []
        self.data = SegData(text, sanitized)
        self.synth.startSpeakingString_(text)


def story_texts():
    """Return the sorted unique items of every list in all_stories.json."""
    story_file = './inputs/all_stories.json'
    with open(story_file) as story_fp:
        stories_data = json.load(story_fp)
    return sorted({t for i in stories_data.values() for t in i})


def story_words():
    """Return the sorted unique first elements of the entries in
    all_stories_hs.json."""
    story_file = './inputs/all_stories_hs.json'
    with open(story_file) as story_fp:
        stories_data = json.load(story_fp)
    return sorted({t[0] for i in stories_data.values() for t in i})


def test_texts(count=10):
    """Return ``count`` randomly sampled unique words from wordlist.txt,
    sorted.  Newlines and underscores are stripped from both ends."""
    with open('./inputs/wordlist.txt', 'r') as words_fp:
        word_list = [line.strip('\n_') for line in words_fp]
    return sorted(random.sample(list(set(word_list)), count))


def generate_audio(phrases):
    """Speak every phrase in turn, writing phoneme rows to the CSV file.

    Returns immediately after queueing the first phrase; the remaining
    phrases are driven by the synthesizer's completion callback.  Once all
    phrases are done the CSV is closed and the application is terminated.
    """
    synthQ = SynthesizerQueue()
    # newline='' is what the csv module recommends for files it writes to.
    f = open(csv_dest_file, 'w', newline='')
    s_csv_w = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
    i = 0
    p = tqdm(total=len(phrases))

    def nextTask(seg_data=None):
        nonlocal i
        # Write the finished phrase's rows first: on the final callback the
        # file is closed and the app terminated below, so writing afterwards
        # would lose (or fail on) the last phrase.
        if seg_data is not None:
            s_csv_w.writerows(seg_data.csv_rows())
        if i < len(phrases):
            p.set_postfix(phrase=phrases[i])
            p.update()
            synthQ.queueTask(phrases[i])
            i += 1
        else:
            p.close()
            f.close()
            dg = NSApplication.sharedApplication().delegate()
            print('App terminated.')
            NSApp().terminate_(dg)

    synthQ.didComplete = nextTask
    nextTask()


def main():
    """Create the application, attach its delegate and run the event loop."""
    # Create a new application instance ...
    a = NSApplication.sharedApplication()
    # ... and create its delegate.  Note the use of the Objective-C
    # constructors below, because Delegate is a subclass of an Objective-C
    # class, NSObject.
    delegate = Delegate.alloc().init()
    # Tell the application which delegate object to use.
    a.setDelegate_(delegate)
    AppHelper.runEventLoop()


if __name__ == '__main__':
    main()