Spaces:
Sleeping
Sleeping
import streamlit as st | |
import sparknlp | |
import os | |
import pandas as pd | |
import librosa | |
from sparknlp.base import * | |
from sparknlp.common import * | |
from sparknlp.annotator import * | |
from pyspark.ml import Pipeline | |
from sparknlp.pretrained import PretrainedPipeline | |
from pyspark.sql.types import * | |
import pyspark.sql.functions as F | |
# Page configuration | |
st.set_page_config( | |
layout="wide", | |
initial_sidebar_state="auto" | |
) | |
# Custom CSS for styling | |
st.markdown(""" | |
<style> | |
.main-title { | |
font-size: 36px; | |
color: #4A90E2; | |
font-weight: bold; | |
text-align: center; | |
} | |
.section { | |
background-color: #f9f9f9; | |
padding: 10px; | |
border-radius: 10px; | |
margin-top: 10px; | |
} | |
.section p, .section ul { | |
color: #666666; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
def init_spark(): | |
"""Initialize Spark NLP.""" | |
return sparknlp.start() | |
def create_pipeline(model): | |
"""Create a Spark NLP pipeline for audio processing.""" | |
audio_assembler = AudioAssembler() \ | |
.setInputCol("audio_content") \ | |
.setOutputCol("audio_assembler") | |
speech_to_text = HubertForCTC \ | |
.pretrained(model)\ | |
.setInputCols("audio_assembler") \ | |
.setOutputCol("text") | |
pipeline = Pipeline(stages=[ | |
audio_assembler, | |
speech_to_text | |
]) | |
return pipeline | |
def fit_data(pipeline, fed_data): | |
"""Fit the data into the pipeline and return the transcription.""" | |
data, sampling_rate = librosa.load(fed_data, sr=16000) | |
data = data.tolist() | |
spark_df = spark.createDataFrame([[data]], ["audio_content"]) | |
model = pipeline.fit(spark_df) | |
lp = LightPipeline(model) | |
lp_result = lp.fullAnnotate(data)[0] | |
return lp_result | |
def save_uploadedfile(uploadedfile, path): | |
"""Save the uploaded file to the specified path.""" | |
filepath = os.path.join(path, uploadedfile.name) | |
with open(filepath, "wb") as f: | |
if hasattr(uploadedfile, 'getbuffer'): | |
f.write(uploadedfile.getbuffer()) | |
else: | |
f.write(uploadedfile.read()) | |
# Sidebar content | |
model_list = ["asr_hubert_large_ls960"] | |
model = st.sidebar.selectbox( | |
"Choose the pretrained model", | |
model_list, | |
help="For more info about the models visit: https://sparknlp.org/models" | |
) | |
# Main content | |
st.markdown('<div class="main-title">Speech Recognition With HubertForCTC</div>', unsafe_allow_html=True) | |
st.markdown('<div class="section"><p>This demo transcribes audio files into texts using the <code>HubertForCTC</code> Annotator and advanced speech recognition models.</p></div>', unsafe_allow_html=True) | |
# Reference notebook link in sidebar | |
st.sidebar.markdown('Reference notebook:') | |
st.sidebar.markdown(""" | |
<a href="https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/open-source-nlp/17.0.Speech_Recognition.ipynb"> | |
<img src="https://colab.research.google.com/assets/colab-badge.svg" style="zoom: 1.3" alt="Open In Colab"/> | |
</a> | |
""", unsafe_allow_html=True) | |
# Load examples | |
AUDIO_FILE_PATH = "inputs" | |
audio_files = sorted(os.listdir(AUDIO_FILE_PATH)) | |
selected_audio = st.selectbox("Select an audio", audio_files) | |
# Creating a simplified Python list of audio file types | |
audio_file_types = ["mp3", "flac", "wav", "aac", "ogg", "aiff", "wma", "m4a", "ape", "dsf", "dff", "midi", "mid", "opus", "amr"] | |
uploadedfile = st.file_uploader("Try it for yourself!", type=audio_file_types) | |
if uploadedfile: | |
selected_audio = f"{AUDIO_FILE_PATH}/{uploadedfile.name}" | |
save_uploadedfile(uploadedfile, AUDIO_FILE_PATH) | |
elif selected_audio: | |
selected_audio = f"{AUDIO_FILE_PATH}/{selected_audio}" | |
# Audio playback and transcription | |
st.subheader("Play Audio") | |
with open(selected_audio, 'rb') as audio_file: | |
audio_bytes = audio_file.read() | |
st.audio(audio_bytes) | |
spark = init_spark() | |
pipeline = create_pipeline(model) | |
output = fit_data(pipeline, selected_audio) | |
st.subheader(f"Transcription:") | |
st.markdown(f"{(output['text'][0].result).title()}") |