AdigaTTS / app.py
showgan's picture
testing
baf4ea1
raw
history blame
11.2 kB
#!/usr/bin/env python3
# import csv
import typing
import wave
import subprocess
import tempfile
from pathlib import Path
import gradio as gr
# from typing import Tuple, Dict
# import pandas as pd
import socket
# import yaml
# from os.path import exists
# def read_config(file) -> Dict:
# with open(file, 'r') as f:
# return yaml.safe_load(f)
# def write_config(file, config_data):
# with open(file, 'w') as yaml_file:
# yaml.dump(config_data, yaml_file, default_flow_style=False)
# def load_data(filename: str) -> pd.DataFrame:
# text_df = pd.read_csv(filename, header=None, names=['wav', 'text'], sep='|', on_bad_lines='skip')
# print(text_df.head(4))
# return text_df
# def validate_index(index: int) -> int:
# max_index = Globals['max_index']
# if index > max_index:
# index = max_index
# if index < 0:
# index = 0
# return index
def process_text(text: str) -> typing.List:
    """Convert free-form input text into synthesis metadata lines.

    The input is lower-cased and split on newlines. Every line is stripped of
    surrounding whitespace; blank lines are skipped. Each remaining line
    becomes one entry of the form ``mucella_NNNN|500|<line>|ady-lt``, where
    ``NNNN`` is a zero-padded counter that starts at 0 and only advances for
    lines that are kept.

    Example of the metadata format:
        mucella_001|500|Nartıme zı pşıj-themate gore yaağ.|ady-lt

    Returns:
        The list of metadata entries, in input order (empty if the text has
        no non-blank lines).
    """
    entries = []
    for raw_line in text.lower().split('\n'):
        stripped = raw_line.strip()
        if not stripped:
            print('-D- Skipping an empty line')
            continue
        print(f'-D- Processing line: {stripped}')
        # The number of entries collected so far doubles as the running index.
        entries.append(f'mucella_{len(entries):04d}|500|{stripped}|ady-lt')
    print(f'-D- process_text() text\n-D- before:\n{text}\n-D- after:\n{entries}')
    return entries
# def get_item_data(index: int) -> Tuple[int, str, str, str]:
# index = validate_index(index)
# df = Globals['text_df']
# text = df.at[index, 'text']
# if not isinstance(text, str):
# text = ''
# # Limit text to 'max_characters'
# # if len(text) > Globals['max_characters']:
# # text = text[0:Globals['max_characters']]
# audio_tag = df.at[index, 'wav']
# audio_file = None
# if isinstance(audio_tag, str):
# audio_file = Globals['audio_dir'] + '/' + audio_tag + '.wav'
# save_index(index)
# return index, text, audio_tag, audio_file
# Globals = {
# 'input_csv': '',
# 'output_csv': '',
# 'audio_dir': '',
# 'session_config_file': '',
# 'text_df': pd.DataFrame(),
# 'max_index': 0
# }
# config_file = '/home/haroon/PycharmProjects/gradio_creating_web_apis/mucella_metadata.cfg'
# python eval.py --model-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella --log-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella/synthesize/kamzegur1 --data-dir=/home/haroon/git_repos/few-shot-transformer-tts/Samples/ --eval_meta=/home/haroon/git_repos/few-shot-transformer-tts/Samples/kamzegur1.txt --start_step=2580000 --no_wait=True
# Path prefixes shared by several settings below.
_REPO_DIR = '/home/haroon/git_repos/few-shot-transformer-tts'
_SERVER_TMP_DIR = '/tmp/few-shot-transformer-tts-server'

# Module-wide settings and mutable state used by the request handlers.
Globals = {
    # Interpreter used to run the synthesis script.
    'python_interpreter': '/home/haroon/python_virtual_envs/few_shot_tts/bin/python3',
    # Checkout that contains eval.py.
    'code_repository_dir': _REPO_DIR,
    # Passed to eval.py as --model-dir.
    'model_dir': f'{_REPO_DIR}/Models/Mucella',
    # Passed to eval.py as --start_step; also names the eval_<step> output directory.
    'start_step': 2580000,
    # Passed to eval.py as --log-dir; the concatenated wav is written here too.
    'log_dir': f'{_SERVER_TMP_DIR}/log',
    # Passed to eval.py as --data-dir; the metadata text file is written here.
    'data_dir': f'{_SERVER_TMP_DIR}/text',
    # Initial content of the text box.
    'default_text': ' Maḣe keume sépĺı! \n \n Harun Şewgen \n',
    # Path of the most recently generated wav ('' when there is none).
    'outfile': '',
}
# def incr_index(current_index: int) -> int:
# current_index = current_index + 1
# current_index = validate_index(current_index)
# return current_index
# def decr_index(current_index: int) -> int:
# current_index = current_index - 1
# current_index = validate_index(current_index)
# return current_index
def concat_wavs(processed_text: typing.List[str]) -> str:
    """Concatenate the synthesized per-line wav files into a single wav file.

    For every metadata line, the part before the first ``|`` is taken as the
    audio tag, and the file
    ``<log_dir>/eval_<start_step>/<tag>.wav`` is read (``log_dir`` and
    ``start_step`` come from ``Globals``). The audio frames are written, in
    order, to a new uniquely named ``adiga_*.wav`` file inside ``log_dir``.
    The output uses the wave parameters of the first input file.

    Args:
        processed_text: Metadata lines of the form ``tag|...``.

    Returns:
        Path of the newly written wav file.

    Raises:
        ValueError: If ``processed_text`` is empty.
        FileNotFoundError: If one of the expected wav files does not exist.
        wave.Error: If an input file is not a readable wav file.
    """
    if not processed_text:
        raise ValueError('concat_wavs() needs at least one metadata line')

    segments = []
    for line in processed_text:
        print(f'-D- concat_wavs() line before split: {line}')
        # Only the first field is needed. Splitting once keeps this working
        # when the text field itself contains a '|' character.
        tag = line.split('|', 1)[0]
        # e.g. /tmp/few-shot-transformer-tts-server/log/eval_2580000/mucella_0000.wav
        wav_file = f"{Globals['log_dir']}/eval_{Globals['start_step']}/{tag}.wav"
        print(f'-D- wav_file: {wav_file}')
        # The context manager closes the file even if reading fails.
        with wave.open(wav_file, 'rb') as reader:
            segments.append((reader.getparams(), reader.readframes(reader.getnframes())))

    # Reserve a unique output name through the public tempfile API. The file
    # is created atomically, so two calls can never pick the same name.
    with tempfile.NamedTemporaryFile(prefix='adiga_', suffix='.wav',
                                     dir=Globals['log_dir'], delete=False) as placeholder:
        outfile = placeholder.name
    print(f'-D- Concatenating to a single wav file: {outfile}')

    with wave.open(outfile, 'wb') as writer:
        writer.setparams(segments[0][0])
        for _, frames in segments:
            writer.writeframes(frames)
    return outfile
def speak(text: str) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Synthesize speech for ``text`` and return the resulting wav file.

    Steps:
      1. Convert the text to metadata lines with ``process_text()``.
      2. Write those lines to ``<data_dir>/text.txt``.
      3. Run ``eval.py`` from the code repository on that file.
      4. Join the per-line wav files with ``concat_wavs()``.

    ``Globals['outfile']`` is cleared at the start and set to the new file on
    success.

    Args:
        text: Raw text from the UI; one utterance per line.

    Returns:
        ``(outfile, outfile)`` - the same path twice, one for each output
        component wired to this handler - or ``(None, None)`` when the text
        contains no non-blank lines.

    Raises:
        subprocess.CalledProcessError: If the synthesis command exits with a
            non-zero status.
    """
    Globals['outfile'] = ''
    print('-I- Generating speech ...')
    print(f'-D- speak() text: {text}')
    processed_text = process_text(text)
    if not processed_text:
        # Nothing to synthesize: skip the model run instead of failing later
        # while concatenating zero files.
        print('-W- Nothing to synthesize: the text has no non-empty lines')
        return None, None

    # Save text to a temporary file (the directory may not exist yet).
    Path(Globals['data_dir']).mkdir(parents=True, exist_ok=True)
    text_file = f"{Globals['data_dir']}/text.txt"
    # The text is not ASCII, so do not depend on the locale's default encoding.
    with open(text_file, 'w', encoding='utf-8') as f:
        f.writelines(f'{line}\n' for line in processed_text)

    # Prepare speech synthesis command. It is built as a list (not by splitting
    # a string) so that configured paths containing spaces stay intact.
    # Example:
    # python eval.py --model-dir=<model dir> --log-dir=<log dir> --data-dir=<data dir> --eval_meta=<text file> --start_step=2580000 --no_wait=True
    cmd = [
        str(Globals['python_interpreter']),
        f"{Globals['code_repository_dir']}/eval.py",
        f"--model-dir={Globals['model_dir']}",
        f"--log-dir={Globals['log_dir']}",
        f"--data-dir={Globals['data_dir']}",
        f"--eval_meta={text_file}",
        f"--start_step={Globals['start_step']}",
        '--no_wait=True',
    ]
    print(f'-D- Speech synthesis command:\n{cmd}')
    # check=True: a failed run must not fall through to concat_wavs(), which
    # would otherwise pick up missing files or wavs left by an earlier request.
    subprocess.run(cmd, check=True)
    print('-D- Finished synthesizing speech.')

    outfile = concat_wavs(processed_text)
    Globals['outfile'] = outfile
    return outfile, outfile
def download() -> str:
    """Return the path of the most recently generated wav file.

    Reads ``Globals['outfile']``. Returns ``None`` when that entry is empty
    or does not point to an existing file.
    """
    candidate = Globals['outfile']
    if candidate == '' or not Path(candidate).is_file():
        return None
    print(f'-I- Downloading {candidate}')
    return candidate
# def handle_text_editing(index: int, text: str) -> dict:
# index = int(index)
# index = validate_index(index)
# new_text = text
# if "\n" in new_text:
# if index == Globals['max_index']:
# print("-W- Can't split text since there are no audio tags left. Please add more audio tags.")
# return gr.update(value=new_text)
# [new_text1, new_text2] = new_text.split("\n", 1)
# orig_text = Globals['text_df'].iat[index, 1]
# orig_text2_index = orig_text.find(new_text2)
# if orig_text2_index == -1:
# # If text after the new line has been modified then just take the whole original text of this item to the
# # next item (not easy way to figure out how to cut the original text).
# new_text2 = orig_text
# else:
# new_text2 = orig_text[orig_text2_index:]
# next_item_orig_text = Globals['text_df'].iat[index + 1, 1]
# if not isinstance(next_item_orig_text, str):
# next_item_orig_text = ''
# new_text2 = new_text2 + ' ' + next_item_orig_text
# save_text(index, new_text1)
# save_text(index + 1, new_text2)
# return gr.update(value=new_text1)
# save_text(index, new_text)
# return gr.update(value=new_text)
# def update_output_file(output_file: str):
# Globals['output_csv'] = output_file
# def save_text(index: int, text: str):
# index = int(index)
# index = validate_index(index)
# Globals['text_df'].iat[index, 1] = text
# Globals['text_df'].to_csv(Globals['output_csv'], index=False, sep='|', header=False, quoting=csv.QUOTE_NONE)
# def save_index(index: int):
# index = validate_index(index)
# session_config_data = {'current_index': int(index)}
# write_config(Globals['session_config_file'], session_config_data)
def _detect_host_ip() -> str:
    """Return an IPv4 address of this machine to bind the web server to.

    Resolution order:
      1. The first non-loopback address the local hostname resolves to.
      2. The local address of a UDP socket "connected" to 8.8.8.8:53.
      3. ``'0.0.0.0'`` (all interfaces) if both of the above fail.
    """
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # The hostname may not be resolvable (e.g. inside a container).
        addresses = []
    for ip in addresses:
        if not ip.startswith('127.'):
            return ip
    try:
        # The context manager closes the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 53))
            return probe.getsockname()[0]
    except OSError:
        return '0.0.0.0'


def main():
    """Build the Gradio UI (text box + 'Speak' button) and serve it."""
    # Close port(s) in case it's still open from previous session
    gr.close_all()
    with gr.Blocks() as app:
        text_elem = gr.Text(show_label=False, value=Globals['default_text'], interactive=True, max_lines=5)
        speak_btn = gr.Button('Speak')
        # speak() returns the wav path twice: once for the audio player and
        # once for the file component.
        speak_btn.click(fn=speak, inputs=[text_elem], outputs=[gr.Audio(show_label=False), gr.File()])
    hostname = _detect_host_ip()
    print(f'-D- Hostname: {hostname}')
    # Alternative used previously (fixed port, no public share link):
    # app.launch(server_name=hostname, server_port=6012, share=False)
    app.launch(server_name=hostname, share=True)
    # SystemExit instead of exit(): exit() is installed by the `site` module
    # and is not guaranteed to be available.
    raise SystemExit(0)


if __name__ == '__main__':
    main()