import spaces import os import gradio as gr import torch import torchaudio from transformers import pipeline from pytube import YouTube import re import numpy as np from scipy.signal import wiener from io import BytesIO import noisereduce as nr import soundfile as sf pipe = pipeline(model="anzorq/w2v-bert-2.0-kbd-v2", device=0) # Define the replacements for Kabardian transcription replacements = [ ('гъ', 'ɣ'), ('дж', 'j'), ('дз', 'ӡ'), ('жь', 'ʐ'), ('кӏ', 'қ'), ('кхъ', 'qҳ'), ('къ', 'q'), ('лъ', 'ɬ'), ('лӏ', 'ԯ'), ('пӏ', 'ԥ'), ('тӏ', 'ҭ'), ('фӏ', 'ჶ'), ('хь', 'h'), ('хъ', 'ҳ'), ('цӏ', 'ҵ'), ('щӏ', 'ɕ'), ('я', 'йа') ] # Reverse replacements for transcription reverse_replacements = {v: k for k, v in replacements} reverse_pattern = re.compile('|'.join(re.escape(key) for key in reverse_replacements)) def replace_symbols_back(text): return reverse_pattern.sub(lambda match: reverse_replacements[match.group(0)], text) def preprocess_audio(audio_tensor, original_sample_rate, apply_normalization): audio_tensor = audio_tensor.to(dtype=torch.float32) audio_tensor = torch.mean(audio_tensor, dim=0, keepdim=True) # Convert to mono if apply_normalization: audio_tensor = audio_tensor / torch.max(torch.abs(audio_tensor)) # Normalize audio_tensor = torchaudio.functional.resample(audio_tensor, orig_freq=original_sample_rate, new_freq=16000) # Resample return audio_tensor def spectral_gating(audio_tensor): audio_data = audio_tensor.numpy() reduced_noise = nr.reduce_noise(y=audio_data, sr=16_000) return torch.tensor(reduced_noise, dtype=audio_tensor.dtype) def wiener_filter(audio_tensor): audio_data = audio_tensor.numpy() filtered_audio = wiener(audio_data) return torch.tensor(filtered_audio, dtype=audio_tensor.dtype) @spaces.GPU def transcribe_speech(audio, apply_wiener_filter=False, apply_normalization=False, apply_spectral_gating=False, progress=gr.Progress()): if audio is None: return "No audio received.", None progress(0.1, desc="Preprocessing audio...") audio_tensor, original_sample_rate = torchaudio.load(audio) audio_tensor = preprocess_audio(audio_tensor, original_sample_rate, apply_normalization) if apply_wiener_filter: progress(0.3, desc="Applying Wiener filter...") audio_tensor = wiener_filter(audio_tensor) if apply_spectral_gating: progress(0.5, desc="Applying Spectral Gating filter...") audio_tensor = spectral_gating(audio_tensor) progress(0.7, desc="Transcribing audio...") audio_np = audio_tensor.numpy().squeeze() transcription = pipe(audio_np, chunk_length_s=10)['text'] transcription = replace_symbols_back(transcription) audio_np = audio_tensor.numpy().squeeze() sf.write("temp_audio.wav", audio_np, 16000, subtype='PCM_16') return transcription, "temp_audio.wav" def transcribe_from_youtube(url, apply_wiener_filter, apply_normalization, apply_spectral_gating, progress=gr.Progress()): progress(0, "Downloading YouTube audio...") try: yt = YouTube(url) stream = yt.streams.filter(only_audio=True).first() audio_data = BytesIO() stream.stream_to_buffer(audio_data) audio_data.seek(0) audio_tensor, original_sample_rate = torchaudio.load(audio_data) audio_tensor = preprocess_audio(audio_tensor, original_sample_rate, apply_normalization) if apply_wiener_filter: progress(0.4, "Applying Wiener filter...") audio_tensor = wiener_filter(audio_tensor) if apply_spectral_gating: progress(0.4, "Applying Spectral Gating filter...") audio_tensor = spectral_gating(audio_tensor) transcription, _ = transcribe_speech(audio_tensor) audio_np = audio_tensor.numpy().squeeze() sf.write("temp_audio.wav", audio_np, 16000, subtype='PCM_16') except Exception as e: return str(e), None return transcription, "temp_audio.wav" def populate_metadata(url): yt = YouTube(url) return yt.thumbnail_url, yt.title with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.HTML( """
Kabardian speech to text transcription using a fine-tuned Wav2Vec2-BERT model