File size: 4,235 Bytes
171bc27 641eeeb 171bc27 4e0c3e5 171bc27 722014e 747d7b4 4e0c3e5 722014e 4e0c3e5 171bc27 4e0c3e5 79892f2 4e3512b 2b0365b 195413c 722014e 171bc27 e298aea 9a5cdc0 4e0c3e5 67b38a5 c5e38e6 4b65bb8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
import torch
from speechbrain.inference.interfaces import Pretrained
import librosa
class ASR(Pretrained):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def encode_batch(self, device, wavs, wav_lens=None, normalize=False):
wavs = wavs.to(device)
wav_lens = wav_lens.to(device)
# Forward pass
encoded_outputs = self.mods.encoder_w2v2(wavs.detach())
# append
tokens_bos = torch.zeros((wavs.size(0), 1), dtype=torch.long).to(device)
embedded_tokens = self.mods.embedding(tokens_bos)
decoder_outputs, _ = self.mods.decoder(embedded_tokens, encoded_outputs, wav_lens)
# Output layer for seq2seq log-probabilities
predictions = self.hparams.test_search(encoded_outputs, wav_lens)[0]
# predicted_words = [self.hparams.tokenizer.decode_ids(prediction).split(" ") for prediction in predictions]
predicted_words = []
for prediction in predictions:
prediction = [token for token in prediction if token != 0]
predicted_words.append(self.hparams.tokenizer.decode_ids(prediction).split(" "))
prediction = []
for sent in predicted_words:
sent = self.filter_repetitions(sent, 3)
prediction.append(sent)
predicted_words = prediction
return predicted_words
def filter_repetitions(self, seq, max_repetition_length):
seq = list(seq)
output = []
max_n = len(seq) // 2
for n in range(max_n, 0, -1):
max_repetitions = max(max_repetition_length // n, 1)
# Don't need to iterate over impossible n values:
# len(seq) can change a lot during iteration
if (len(seq) <= n*2) or (len(seq) <= max_repetition_length):
continue
iterator = enumerate(seq)
# Fill first buffers:
buffers = [[next(iterator)[1]] for _ in range(n)]
for seq_index, token in iterator:
current_buffer = seq_index % n
if token != buffers[current_buffer][-1]:
# No repeat, we can flush some tokens
buf_len = sum(map(len, buffers))
flush_start = (current_buffer-buf_len) % n
# Keep n-1 tokens, but possibly mark some for removal
for flush_index in range(buf_len - buf_len%n):
if (buf_len - flush_index) > n-1:
to_flush = buffers[(flush_index + flush_start) % n].pop(0)
else:
to_flush = None
# Here, repetitions get removed:
if (flush_index // n < max_repetitions) and to_flush is not None:
output.append(to_flush)
elif (flush_index // n >= max_repetitions) and to_flush is None:
output.append(to_flush)
buffers[current_buffer].append(token)
# At the end, final flush
current_buffer += 1
buf_len = sum(map(len, buffers))
flush_start = (current_buffer-buf_len) % n
for flush_index in range(buf_len):
to_flush = buffers[(flush_index + flush_start) % n].pop(0)
# Here, repetitions just get removed:
if flush_index // n < max_repetitions:
output.append(to_flush)
seq = []
to_delete = 0
for token in output:
if token is None:
to_delete += 1
elif to_delete > 0:
to_delete -= 1
else:
seq.append(token)
output = []
return seq
def classify_file(self, path, device):
waveform, sr = librosa.load(path, sr=16000)
waveform = torch.tensor(waveform).to(device)
waveform = waveform.to(device)
# Fake a batch:
batch = waveform.unsqueeze(0)
rel_length = torch.tensor([1.0]).to(device)
outputs = self.encode_batch(device, batch, rel_length)
outputs = " ".join(outputs[0])
return outputs
|