#!/usr/bin/env python3 | |
# import csv | |
import typing | |
import wave | |
import subprocess | |
import tempfile | |
from pathlib import Path | |
import gradio as gr | |
# from typing import Tuple, Dict | |
# import pandas as pd | |
import socket | |
# import yaml | |
# from os.path import exists | |
# def read_config(file) -> Dict: | |
# with open(file, 'r') as f: | |
# return yaml.safe_load(f) | |
# def write_config(file, config_data): | |
# with open(file, 'w') as yaml_file: | |
# yaml.dump(config_data, yaml_file, default_flow_style=False) | |
# def load_data(filename: str) -> pd.DataFrame: | |
# text_df = pd.read_csv(filename, header=None, names=['wav', 'text'], sep='|', on_bad_lines='skip') | |
# print(text_df.head(4)) | |
# return text_df | |
# def validate_index(index: int) -> int: | |
# max_index = Globals['max_index'] | |
# if index > max_index: | |
# index = max_index | |
# if index < 0: | |
# index = 0 | |
# return index | |
def process_text(text: str) -> typing.List[str]:
    """Convert free-form text into pipe-delimited metadata lines.

    The text is lower-cased and split on newlines. Every non-blank line
    becomes one entry of the form ``mucella_NNNN|500|<line>|ady-lt``, where
    ``NNNN`` is a zero-padded counter over the emitted entries (blank lines
    are skipped and do not consume a number).

    Args:
        text: Raw input text, possibly spanning several lines.

    Returns:
        The metadata entries in input order; an empty list when ``text``
        contains no non-blank line.
    """
    # Example of the target format:
    # mucella_001|500|Nartıme zı pşıj-themate gore yaağ.|ady-lt
    entries = []
    for raw_line in text.lower().split('\n'):
        # '|' is the field separator of the entry, so a literal '|' in the
        # user's text would add extra fields and break consumers that split
        # the entry on '|'. Replace it with a space before building the entry.
        line = raw_line.replace('|', ' ').strip()
        if not line:
            print('-D- Skipping an empty line')
            continue
        print(f'-D- Processing line: {line}')
        # len(entries) is the number of entries emitted so far, i.e. the
        # index of the entry being added.
        entries.append(f'mucella_{len(entries):04d}|500|{line}|ady-lt')
    print(f'-D- process_text() text\n-D- before:\n{text}\n-D- after:\n{entries}')
    return entries
# def get_item_data(index: int) -> Tuple[int, str, str, str]: | |
# index = validate_index(index) | |
# df = Globals['text_df'] | |
# text = df.at[index, 'text'] | |
# if not isinstance(text, str): | |
# text = '' | |
# # Limit text to 'max_characters' | |
# # if len(text) > Globals['max_characters']: | |
# # text = text[0:Globals['max_characters']] | |
# audio_tag = df.at[index, 'wav'] | |
# audio_file = None | |
# if isinstance(audio_tag, str): | |
# audio_file = Globals['audio_dir'] + '/' + audio_tag + '.wav' | |
# save_index(index) | |
# return index, text, audio_tag, audio_file | |
# Globals = { | |
# 'input_csv': '', | |
# 'output_csv': '', | |
# 'audio_dir': '', | |
# 'session_config_file': '', | |
# 'text_df': pd.DataFrame(), | |
# 'max_index': 0 | |
# } | |
# config_file = '/home/haroon/PycharmProjects/gradio_creating_web_apis/mucella_metadata.cfg' | |
# python eval.py --model-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella --log-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella/synthesize/kamzegur1 --data-dir=/home/haroon/git_repos/few-shot-transformer-tts/Samples/ --eval_meta=/home/haroon/git_repos/few-shot-transformer-tts/Samples/kamzegur1.txt --start_step=2580000 --no_wait=True | |
# Filesystem roots shared by several of the settings below.
_TTS_REPO_DIR = '/home/haroon/git_repos/few-shot-transformer-tts'
_SERVER_WORK_DIR = '/tmp/few-shot-transformer-tts-server'

# Module-wide settings plus the one piece of mutable state ('outfile').
Globals = {
    # Interpreter used to launch eval.py.
    'python_interpreter': '/home/haroon/python_virtual_envs/few_shot_tts/bin/python3',
    # Directory that contains eval.py.
    'code_repository_dir': _TTS_REPO_DIR,
    # Passed to eval.py as --model-dir.
    'model_dir': f'{_TTS_REPO_DIR}/Models/Mucella',
    # Passed to eval.py as --start_step; also part of the 'eval_<step>'
    # directory name the per-line wav files are read from.
    'start_step': 2580000,
    # Passed to eval.py as --log-dir; the concatenated wav is written here.
    'log_dir': f'{_SERVER_WORK_DIR}/log',
    # Passed to eval.py as --data-dir; the metadata text file is written here.
    'data_dir': f'{_SERVER_WORK_DIR}/text',
    # Initial content of the text box.
    'default_text': ' Maḣe keume sépĺı! \n \n Harun Şewgen \n',
    # Path of the most recently generated wav file ('' when there is none).
    'outfile': '',
}
# def incr_index(current_index: int) -> int: | |
# current_index = current_index + 1 | |
# current_index = validate_index(current_index) | |
# return current_index | |
# def decr_index(current_index: int) -> int: | |
# current_index = current_index - 1 | |
# current_index = validate_index(current_index) | |
# return current_index | |
def concat_wavs(processed_text: typing.List[str]) -> str:
    """Join the per-line wav files into one uniquely named wav file.

    The first '|'-separated field of every metadata line names an input file
    ``<log_dir>/eval_<start_step>/<name>.wav``. The frames of all inputs are
    written, in order, to a new file ``<log_dir>/adiga_<random>.wav`` that
    uses the wave parameters of the first input.

    Args:
        processed_text: Metadata lines whose first field is the wav base name
            (e.g. ``mucella_0000|500|...|ady-lt``).

    Returns:
        Path of the newly written wav file.

    Raises:
        ValueError: If ``processed_text`` is empty.
        OSError: If an input file cannot be opened or the output cannot be
            created.
        wave.Error: If an input is not a readable wav file.
    """
    if not processed_text:
        # Fail before touching the filesystem; there are no parameters to
        # copy into the output header.
        raise ValueError('concat_wavs() needs at least one line to concatenate')

    log_dir = Globals['log_dir']
    # e.g. /tmp/few-shot-transformer-tts-server/log/eval_2580000/mucella_0000.wav
    eval_dir = f"{log_dir}/eval_{Globals['start_step']}"

    chunks = []
    for line in processed_text:
        print(f'-D- concat_wavs() line before split: {line}')
        # Only the first field is needed; maxsplit keeps this working even
        # if a later field happens to contain '|'.
        wav_name = line.split('|', 1)[0]
        wav_file = f'{eval_dir}/{wav_name}.wav'
        print(f'-D- wav_file: {wav_file}')
        with wave.open(wav_file, 'rb') as reader:
            chunks.append((reader.getparams(), reader.readframes(reader.getnframes())))

    # Reserve a unique output name through the public tempfile API. The file
    # is kept (delete=False) and reopened by name for the wave writer.
    with tempfile.NamedTemporaryFile(prefix='adiga_', suffix='.wav', dir=log_dir, delete=False) as placeholder:
        outfile = placeholder.name
    print(f'-D- Concatenating to a single wav file: {outfile}')

    with wave.open(outfile, 'wb') as writer:
        # NOTE(review): all inputs are assumed to share the first file's
        # channel count, sample width and frame rate - confirm.
        writer.setparams(chunks[0][0])
        for _, frames in chunks:
            writer.writeframes(frames)
    return outfile
def speak(text: str) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]:
    """Synthesize speech for ``text`` and return the resulting wav file path.

    The text is converted to metadata lines, written to
    ``<data_dir>/text.txt``, and eval.py from the code repository is run on
    that file. The per-line wav files it leaves in the log directory are then
    concatenated into a single file, whose path is also stored in
    ``Globals['outfile']``.

    Args:
        text: Text to speak; each non-blank line is synthesized separately.

    Returns:
        The output path twice (one value per UI output component), or
        ``(None, None)`` when ``text`` has no non-blank line.

    Raises:
        subprocess.CalledProcessError: If eval.py exits with a non-zero status.
    """
    # NOTE(review): the metadata file name and Globals['outfile'] are shared,
    # so overlapping calls would overwrite each other's data - confirm whether
    # concurrent requests need to be supported.
    Globals['outfile'] = ''
    print('-I- Generating speech ...')
    print(f'-D- speak() text: {text}')
    processed_text = process_text(text)
    if not processed_text:
        # Nothing to synthesize; skip the (slow) model run instead of
        # failing later while concatenating zero files.
        print('-W- No text to synthesize.')
        return None, None

    # The working directories may not exist yet; create them on demand.
    for directory in (Globals['data_dir'], Globals['log_dir']):
        Path(directory).mkdir(parents=True, exist_ok=True)

    # Save the metadata lines where eval.py will read them. The text is
    # non-ASCII, so the encoding is set explicitly rather than taken from
    # the locale.
    text_file = f"{Globals['data_dir']}/text.txt"
    with open(text_file, 'w', encoding='utf-8') as f:
        f.writelines(f'{line}\n' for line in processed_text)

    # Build the argument list directly (no string splitting), so paths that
    # contain spaces stay intact. Equivalent command line:
    # python eval.py --model-dir=<model_dir> --log-dir=<log_dir> --data-dir=<data_dir> --eval_meta=<text_file> --start_step=<start_step> --no_wait=True
    cmd = [
        Globals['python_interpreter'],
        f"{Globals['code_repository_dir']}/eval.py",
        f"--model-dir={Globals['model_dir']}",
        f"--log-dir={Globals['log_dir']}",
        f"--data-dir={Globals['data_dir']}",
        f'--eval_meta={text_file}',
        f"--start_step={Globals['start_step']}",
        '--no_wait=True',
    ]
    print(f'-D- Speech synthesis command:\n{cmd}')
    # check=True: a failed run must not fall through to concatenating wav
    # files left over from an earlier request.
    subprocess.run(cmd, check=True)
    print('-D- Finished synthesizing speech.')

    outfile = concat_wavs(processed_text)
    Globals['outfile'] = outfile
    return outfile, outfile
def download() -> typing.Optional[str]:
    """Return the path of the most recently generated wav file.

    Returns:
        ``Globals['outfile']`` when it is non-empty and names an existing
        file; otherwise ``None``.
    """
    outfile = Globals['outfile']
    if not outfile or not Path(outfile).is_file():
        return None
    print(f'-I- Downloading {outfile}')
    return outfile
# def handle_text_editing(index: int, text: str) -> dict: | |
# index = int(index) | |
# index = validate_index(index) | |
# new_text = text | |
# if "\n" in new_text: | |
# if index == Globals['max_index']: | |
# print("-W- Can't split text since there are no audio tags left. Please add more audio tags.") | |
# return gr.update(value=new_text) | |
# [new_text1, new_text2] = new_text.split("\n", 1) | |
# orig_text = Globals['text_df'].iat[index, 1] | |
# orig_text2_index = orig_text.find(new_text2) | |
# if orig_text2_index == -1: | |
# # If the text after the new line has been modified, just carry the whole original text of this item over to the
# # next item (there is no easy way to work out where to cut the original text).
# new_text2 = orig_text | |
# else: | |
# new_text2 = orig_text[orig_text2_index:] | |
# next_item_orig_text = Globals['text_df'].iat[index + 1, 1] | |
# if not isinstance(next_item_orig_text, str): | |
# next_item_orig_text = '' | |
# new_text2 = new_text2 + ' ' + next_item_orig_text | |
# save_text(index, new_text1) | |
# save_text(index + 1, new_text2) | |
# return gr.update(value=new_text1) | |
# save_text(index, new_text) | |
# return gr.update(value=new_text) | |
# def update_output_file(output_file: str): | |
# Globals['output_csv'] = output_file | |
# def save_text(index: int, text: str): | |
# index = int(index) | |
# index = validate_index(index) | |
# Globals['text_df'].iat[index, 1] = text | |
# Globals['text_df'].to_csv(Globals['output_csv'], index=False, sep='|', header=False, quoting=csv.QUOTE_NONE) | |
# def save_index(index: int): | |
# index = validate_index(index) | |
# session_config_data = {'current_index': int(index)} | |
# write_config(Globals['session_config_file'], session_config_data) | |
def _detect_host_ip() -> str:
    """Return an IPv4 address of this machine to bind the server to.

    Tries, in order: the first non-loopback address the host name resolves
    to; the local address the OS selects for a UDP socket connected to
    8.8.8.8; and finally the loopback address.
    """
    try:
        addresses = socket.gethostbyname_ex(socket.gethostname())[2]
    except OSError:
        # The host name may not be resolvable; fall through to the probe.
        addresses = []
    for ip in addresses:
        if not ip.startswith('127.'):
            return ip

    try:
        # Connecting a datagram socket makes the OS choose the outgoing
        # interface; its local address is what we are after.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 53))
            return probe.getsockname()[0]
    except OSError:
        # No usable route: serve on loopback rather than pass a
        # non-address string to the server.
        return '127.0.0.1'


def main():
    """Build the Gradio UI, launch the server, and exit when it returns.

    The UI is a text box pre-filled with ``Globals['default_text']`` and a
    'Speak' button; clicking it calls ``speak()`` and shows the result in an
    audio component and a file component.

    Raises:
        SystemExit: Always, with status 0, after ``launch()`` returns.
    """
    # Close port(s) in case it's still open from previous session
    gr.close_all()

    with gr.Blocks() as app:
        text_elem = gr.Text(show_label=False, value=Globals['default_text'], interactive=True, max_lines=5)
        speak_btn = gr.Button('Speak')
        audio_elem = gr.Audio(show_label=False)
        file_elem = gr.File()
        speak_btn.click(fn=speak, inputs=[text_elem], outputs=[audio_elem, file_elem])

    hostname = _detect_host_ip()
    print(f'-D- Hostname: {hostname}')
    # Alternative used previously: a fixed port and no share link, i.e.
    # launch(server_name=hostname, server_port=6012, share=False).
    app.launch(server_name=hostname, share=True)
    # Same effect as the interactive helper exit(0), without relying on the
    # 'site' module having installed it.
    raise SystemExit(0)
# Start the app only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()