|
""" |
|
DeepSpeech features processing routines. |
|
NB: Based on VOCA code. See the corresponding license restrictions. |
|
""" |
|
|
|
__all__ = ['conv_audios_to_deepspeech'] |
|
|
|
import numpy as np |
|
import warnings |
|
import resampy |
|
from scipy.io import wavfile |
|
from python_speech_features import mfcc |
|
import tensorflow.compat.v1 as tf |
|
tf.disable_v2_behavior() |
|
|
|
def conv_audios_to_deepspeech(audios, |
|
out_files, |
|
num_frames_info, |
|
deepspeech_pb_path, |
|
audio_window_size=1, |
|
audio_window_stride=1): |
|
""" |
|
Convert list of audio files into files with DeepSpeech features. |
|
|
|
Parameters |
|
---------- |
|
audios : list of str or list of None |
|
Paths to input audio files. |
|
out_files : list of str |
|
Paths to output files with DeepSpeech features. |
|
num_frames_info : list of int |
|
List of numbers of frames. |
|
deepspeech_pb_path : str |
|
Path to DeepSpeech 0.1.0 frozen model. |
|
audio_window_size : int, default 16 |
|
Audio window size. |
|
audio_window_stride : int, default 1 |
|
Audio window stride. |
|
""" |
|
|
|
graph, logits_ph, input_node_ph, input_lengths_ph = prepare_deepspeech_net( |
|
deepspeech_pb_path) |
|
|
|
with tf.compat.v1.Session(graph=graph) as sess: |
|
for audio_file_path, out_file_path, num_frames in zip(audios, out_files, num_frames_info): |
|
print(audio_file_path) |
|
print(out_file_path) |
|
audio_sample_rate, audio = wavfile.read(audio_file_path) |
|
if audio.ndim != 1: |
|
warnings.warn( |
|
"Audio has multiple channels, the first channel is used") |
|
audio = audio[:, 0] |
|
ds_features = pure_conv_audio_to_deepspeech( |
|
audio=audio, |
|
audio_sample_rate=audio_sample_rate, |
|
audio_window_size=audio_window_size, |
|
audio_window_stride=audio_window_stride, |
|
num_frames=num_frames, |
|
net_fn=lambda x: sess.run( |
|
logits_ph, |
|
feed_dict={ |
|
input_node_ph: x[np.newaxis, ...], |
|
input_lengths_ph: [x.shape[0]]})) |
|
|
|
net_output = ds_features.reshape(-1, 29) |
|
win_size = 16 |
|
zero_pad = np.zeros((int(win_size / 2), net_output.shape[1])) |
|
net_output = np.concatenate( |
|
(zero_pad, net_output, zero_pad), axis=0) |
|
windows = [] |
|
for window_index in range(0, net_output.shape[0] - win_size, 2): |
|
windows.append( |
|
net_output[window_index:window_index + win_size]) |
|
print(np.array(windows).shape) |
|
np.save(out_file_path, np.array(windows)) |
|
|
|
|
|
def prepare_deepspeech_net(deepspeech_pb_path): |
|
""" |
|
Load and prepare DeepSpeech network. |
|
|
|
Parameters |
|
---------- |
|
deepspeech_pb_path : str |
|
Path to DeepSpeech 0.1.0 frozen model. |
|
|
|
Returns |
|
------- |
|
graph : obj |
|
ThensorFlow graph. |
|
logits_ph : obj |
|
ThensorFlow placeholder for `logits`. |
|
input_node_ph : obj |
|
ThensorFlow placeholder for `input_node`. |
|
input_lengths_ph : obj |
|
ThensorFlow placeholder for `input_lengths`. |
|
""" |
|
|
|
with tf.io.gfile.GFile(deepspeech_pb_path, "rb") as f: |
|
graph_def = tf.compat.v1.GraphDef() |
|
graph_def.ParseFromString(f.read()) |
|
|
|
graph = tf.compat.v1.get_default_graph() |
|
tf.import_graph_def(graph_def, name="deepspeech") |
|
logits_ph = graph.get_tensor_by_name("deepspeech/logits:0") |
|
input_node_ph = graph.get_tensor_by_name("deepspeech/input_node:0") |
|
input_lengths_ph = graph.get_tensor_by_name("deepspeech/input_lengths:0") |
|
|
|
return graph, logits_ph, input_node_ph, input_lengths_ph |
|
|
|
|
|
def pure_conv_audio_to_deepspeech(audio, |
|
audio_sample_rate, |
|
audio_window_size, |
|
audio_window_stride, |
|
num_frames, |
|
net_fn): |
|
""" |
|
Core routine for converting audion into DeepSpeech features. |
|
|
|
Parameters |
|
---------- |
|
audio : np.array |
|
Audio data. |
|
audio_sample_rate : int |
|
Audio sample rate. |
|
audio_window_size : int |
|
Audio window size. |
|
audio_window_stride : int |
|
Audio window stride. |
|
num_frames : int or None |
|
Numbers of frames. |
|
net_fn : func |
|
Function for DeepSpeech model call. |
|
|
|
Returns |
|
------- |
|
np.array |
|
DeepSpeech features. |
|
""" |
|
target_sample_rate = 16000 |
|
if audio_sample_rate != target_sample_rate: |
|
resampled_audio = resampy.resample( |
|
x=audio.astype(np.float), |
|
sr_orig=audio_sample_rate, |
|
sr_new=target_sample_rate) |
|
else: |
|
resampled_audio = audio.astype(np.float32) |
|
input_vector = conv_audio_to_deepspeech_input_vector( |
|
audio=resampled_audio.astype(np.int16), |
|
sample_rate=target_sample_rate, |
|
num_cepstrum=26, |
|
num_context=9) |
|
|
|
network_output = net_fn(input_vector) |
|
|
|
|
|
deepspeech_fps = 50 |
|
video_fps = 50 |
|
audio_len_s = float(audio.shape[0]) / audio_sample_rate |
|
if num_frames is None: |
|
num_frames = int(round(audio_len_s * video_fps)) |
|
else: |
|
video_fps = num_frames / audio_len_s |
|
network_output = interpolate_features( |
|
features=network_output[:, 0], |
|
input_rate=deepspeech_fps, |
|
output_rate=video_fps, |
|
output_len=num_frames) |
|
|
|
|
|
zero_pad = np.zeros((int(audio_window_size / 2), network_output.shape[1])) |
|
network_output = np.concatenate( |
|
(zero_pad, network_output, zero_pad), axis=0) |
|
windows = [] |
|
for window_index in range(0, network_output.shape[0] - audio_window_size, audio_window_stride): |
|
windows.append( |
|
network_output[window_index:window_index + audio_window_size]) |
|
|
|
return np.array(windows) |
|
|
|
|
|
def conv_audio_to_deepspeech_input_vector(audio, |
|
sample_rate, |
|
num_cepstrum, |
|
num_context): |
|
""" |
|
Convert audio raw data into DeepSpeech input vector. |
|
|
|
Parameters |
|
---------- |
|
audio : np.array |
|
Audio data. |
|
audio_sample_rate : int |
|
Audio sample rate. |
|
num_cepstrum : int |
|
Number of cepstrum. |
|
num_context : int |
|
Number of context. |
|
|
|
Returns |
|
------- |
|
np.array |
|
DeepSpeech input vector. |
|
""" |
|
|
|
features = mfcc( |
|
signal=audio, |
|
samplerate=sample_rate, |
|
numcep=num_cepstrum) |
|
|
|
|
|
features = features[::2] |
|
|
|
|
|
num_strides = len(features) |
|
|
|
|
|
empty_context = np.zeros((num_context, num_cepstrum), dtype=features.dtype) |
|
features = np.concatenate((empty_context, features, empty_context)) |
|
|
|
|
|
|
|
window_size = 2 * num_context + 1 |
|
train_inputs = np.lib.stride_tricks.as_strided( |
|
features, |
|
shape=(num_strides, window_size, num_cepstrum), |
|
strides=(features.strides[0], |
|
features.strides[0], features.strides[1]), |
|
writeable=False) |
|
|
|
|
|
train_inputs = np.reshape(train_inputs, [num_strides, -1]) |
|
|
|
train_inputs = np.copy(train_inputs) |
|
train_inputs = (train_inputs - np.mean(train_inputs)) / \ |
|
np.std(train_inputs) |
|
|
|
return train_inputs |
|
|
|
|
|
def interpolate_features(features, |
|
input_rate, |
|
output_rate, |
|
output_len): |
|
""" |
|
Interpolate DeepSpeech features. |
|
|
|
Parameters |
|
---------- |
|
features : np.array |
|
DeepSpeech features. |
|
input_rate : int |
|
input rate (FPS). |
|
output_rate : int |
|
Output rate (FPS). |
|
output_len : int |
|
Output data length. |
|
|
|
Returns |
|
------- |
|
np.array |
|
Interpolated data. |
|
""" |
|
input_len = features.shape[0] |
|
num_features = features.shape[1] |
|
input_timestamps = np.arange(input_len) / float(input_rate) |
|
output_timestamps = np.arange(output_len) / float(output_rate) |
|
output_features = np.zeros((output_len, num_features)) |
|
for feature_idx in range(num_features): |
|
output_features[:, feature_idx] = np.interp( |
|
x=output_timestamps, |
|
xp=input_timestamps, |
|
fp=features[:, feature_idx]) |
|
return output_features |
|
|