import gradio as gr
# from transformers import pipeline
# from pydub import AudioSegment
# from pydub.silence import split_on_silence
# import io
import time
import yt_dlp
# pydub imports below are only needed if the commented-out chunking path is re-enabled
from pydub.silence import detect_nonsilent
from pydub import AudioSegment
import os
from openai import OpenAI
import random
import string
import json
from faster_whisper import WhisperModel

# Initialize OpenAI API client
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
OPEN_AI_CLIENT = OpenAI(api_key=OPEN_AI_KEY)
PASSWORD = os.getenv("PASSWORD")

# Initialize the Whisper model pipeline for transcription
# transcription_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-large")
def random_filename(length=10):
    """Return a random lowercase string to use as a temporary filename stem."""
    letters = string.ascii_lowercase
    result_str = ''.join(random.choice(letters) for _ in range(length))
    return result_str
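
# Example: random_filename(6) might return "qkzvbn" (six random lowercase letters).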

def download_youtube_audio(youtube_url):
    """
    Download the audio from a YouTube video and return the path to the audio file.
    """
    filename = random_filename()
    codec_name = "mp3"
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': codec_name,
            'preferredquality': '192'
        }],
        'outtmpl': f'{filename}.%(ext)s',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
    return f"{filename}.{codec_name}"

# def transcribe_audio_by_transformers_pipeline(audio_path):
#     """
#     Convert audio to text using Whisper model and return chunks with timestamps.
#     """
#     start_time = time.time()  # time when the function starts
#     # Load audio file
#     audio = AudioSegment.from_file(audio_path)
#     # Split audio on silence
#     chunks = split_on_silence(audio,
#                               min_silence_len=500,
#                               silence_thresh=audio.dBFS - 14,
#                               keep_silence=500)
#     transcriptions = []
#     print(f"Transcribing {len(chunks)} chunks")
#     # Process each chunk
#     for i, chunk in enumerate(chunks):
#         byte_io = io.BytesIO()
#         chunk.export(byte_io, format="wav")
#         audio_data = byte_io.getvalue()
#         # Transcribe chunk
#         result = transcription_pipeline(audio_data)
#         text = result['text']
#         # Calculate timestamps (chunk_* names avoid shadowing the outer start_time
#         # used for measuring processing time)
#         chunk_start = sum(chunks[j].duration_seconds for j in range(i))
#         chunk_end = chunk_start + chunk.duration_seconds
#         # Format timestamp and text
#         transcriptions.append(f"[{chunk_start:.3f} -> {chunk_end:.3f}] {text}")
#         print(f"Chunk {i+1}/{len(chunks)} transcribed")
#     transcription = "\n".join(transcriptions)
#     end_time = time.time()  # time when the function finishes
#     processing_time = end_time - start_time  # compute elapsed time
#     print("start_time:", start_time)
#     print("end_time:", end_time)
#     print("processing_time:", processing_time)
#     return transcription, processing_time

def transcribe_audio_by_whisper(audio_path):
    # Transcription via the local faster-whisper model
    start_time = time.time()
    model_size = "large-v3"
    # Run on GPU with FP16
    # model = WhisperModel(model_size, device="cuda", compute_type="float16")
    # or run on GPU with INT8
    # model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")
    # or run on CPU with INT8
    model = WhisperModel(model_size, device="cpu", compute_type="int8")
    segments, info = model.transcribe(audio_path, beam_size=5)
    print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
    transcription = ""
    for segment in segments:
        print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
        # Timestamped line (not true SRT; see segments_to_srt below for real SRT output)
        transcription += f"[{segment.start:.3f} -> {segment.end:.3f}] {segment.text}\n"
    end_time = time.time()  # time when the function finishes
    processing_time = int(end_time - start_time)
    return transcription, processing_time
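
# The loop above emits "[start -> end] text" lines, which is not valid SRT. A minimal
# sketch of a true SRT converter, assuming each segment exposes .start / .end (seconds)
# and .text, as faster-whisper segments do; segments_to_srt is a name introduced here,
# not part of any library:
def segments_to_srt(segments):
    """Convert an iterable of faster-whisper segments into an SRT-formatted string."""
    def fmt(seconds):
        # SRT timestamps look like HH:MM:SS,mmm
        ms = int(round(seconds * 1000))
        hr, rem = divmod(ms, 3_600_000)
        mn, rem = divmod(rem, 60_000)
        sec, ms = divmod(rem, 1000)
        return f"{hr:02}:{mn:02}:{sec:02},{ms:03}"
    blocks = []
    for i, seg in enumerate(segments, start=1):
        blocks.append(f"{i}\n{fmt(seg.start)} --> {fmt(seg.end)}\n{seg.text.strip()}\n")
    return "\n".join(blocks)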

def transcribe_audio_by_open_ai(audio_path):
    # Transcription via the OpenAI speech-to-text API
    start_time = time.time()
    # # Read the audio file
    # audio = AudioSegment.from_wav(audio_path)
    # # First find the start and end times of all non-silent segments
    # nonsilent_ranges = detect_nonsilent(audio, min_silence_len=50, silence_thresh=-40)
    # def ms_to_srt_time(ms):
    #     sec, ms = divmod(ms, 1000)
    #     min, sec = divmod(sec, 60)
    #     hr, min = divmod(min, 60)
    #     return f"{hr:02}:{min:02}:{sec:02},{ms:03}"
    # def merge_short_ranges(ranges, min_duration=1500, max_duration=3000):
    #     """
    #     Merge consecutive short durations into the previous range if merging doesn't exceed max_duration.
    #     Args:
    #         ranges (List[Tuple[int, int]]): List of start and end times.
    #         min_duration (int): Minimum duration for a range to be considered valid.
    #         max_duration (int): Maximum duration for a merged range.
    #     Returns:
    #         List[Tuple[int, int]]: Modified list of start and end times.
    #     """
    #     merged_ranges = []
    #     for start, end in ranges:
    #         if merged_ranges:
    #             prev_start, prev_end = merged_ranges[-1]
    #             # Check if current range is short and if merging doesn't exceed max_duration
    #             if end - start < min_duration and (end - prev_start) <= max_duration:
    #                 # Modify the end time of the last range in the list
    #                 merged_ranges[-1] = (prev_start, end)
    #             else:
    #                 merged_ranges.append((start, end))
    #         else:
    #             merged_ranges.append((start, end))
    #     return merged_ranges
    # def filter_short_ranges(ranges, min_duration=100):  # 0.1 s equals 100 ms
    #     """
    #     Filter out short durations.
    #     Args:
    #         ranges (List[Tuple[int, int]]): List of start and end times.
    #         min_duration (int): Minimum duration for a range to be considered valid.
    #     Returns:
    #         List[Tuple[int, int]]: Filtered list of start and end times.
    #     """
    #     return [r for r in ranges if (r[1] - r[0]) >= min_duration]
    # nonsilent_ranges = merge_short_ranges(nonsilent_ranges)
    # nonsilent_ranges = filter_short_ranges(nonsilent_ranges)
    # print(nonsilent_ranges)
    # srt_content = ""
    # counter = 1
    # for start, end in nonsilent_ranges:
    #     chunk = audio[start:end]
    #     chunk.export("temp_chunk.wav", format="wav")
    #     with open("temp_chunk.wav", "rb") as audio_file:
    #         transcript = OPEN_AI_CLIENT.audio.transcriptions.create(
    #             model="whisper-1",
    #             file=audio_file,
    #             response_format="vtt"
    #         )
    #     srt_content += f"{counter}\n"
    #     srt_content += f"{ms_to_srt_time(start)} --> {ms_to_srt_time(end)}\n"
    #     srt_content += f"{transcript}\n\n"
    #     counter += 1
    # # Print the SRT
    # print(srt_content)
    with open(audio_path, "rb") as audio_file:
        response = OPEN_AI_CLIENT.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
            response_format="verbose_json",
            timestamp_granularities=["segment"],
            # Note: the prompt is only a style hint for whisper-1; to force a language,
            # the API's `language` parameter (e.g. language="zh") is the supported way.
            prompt="Transcribe the following audio file. If the audio is Chinese, use Traditional Chinese (zh-TW).",
        )
    # Get segments (plain dicts here; newer openai-python versions return typed
    # segment objects, in which case attribute access such as item.text is needed)
    segments = response.segments
    transformed_data = [
        {
            "text": item["text"],
            "start": round(item["start"], 3),
            "duration": round(item["end"] - item["start"], 3)
        }
        for item in segments
    ]
    transcription = json.dumps(transformed_data, indent=2, ensure_ascii=False)
    end_time = time.time()  # time when the function finishes
    processing_time = int(end_time - start_time)
    return transcription, processing_time
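
# Illustrative shape of the JSON string returned above (values are made up):
# [
#   {
#     "text": " Hello and welcome to the channel.",
#     "start": 0.0,
#     "duration": 3.52
#   }
# ]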

def process_youtube_video(password, url, method):
    """
    Download YouTube audio and transcribe it.
    """
    # Check password
    if password != PASSWORD:
        raise ValueError("Invalid password.")
    # Download audio
    audio_path = download_youtube_audio(url)
    # if method == 'transformers':
    #     transcription, processing_time = transcribe_audio_by_transformers_pipeline(audio_path)
    # el
    if method == 'whisper':
        transcription, processing_time = transcribe_audio_by_whisper(audio_path)
    elif method == 'open_ai':
        transcription, processing_time = transcribe_audio_by_open_ai(audio_path)
    else:
        # 'transformers' also lands here while its branch above stays commented out
        raise ValueError("Unknown transcription method.")
    return transcription, processing_time
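
# Example call (assumes the PASSWORD env var is set and the URL is reachable):
#   transcription, seconds = process_youtube_video(PASSWORD, "https://youtu.be/<video-id>", "whisper")
#   print(f"done in {seconds}s")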

# Create Gradio interface
with gr.Blocks() as demo:
    with gr.Row():
        password = gr.Textbox(label="Password")
        url = gr.Textbox(label="YouTube URL")
        # send_btn = gr.Button()
    with gr.Row():
        # Transformers Pipeline
        transcribe_transformers_result = gr.Textbox(label="Transcription by Transformers")
        transformers_time = gr.Textbox(label="Processing Time")
        transcribe_transformers_btn = gr.Button("Transcribe by Transformers")
    with gr.Row():
        # Whisper
        transcribe_whisper_result = gr.Textbox(label="Transcription by Whisper")
        whisper_time = gr.Textbox(label="Processing Time")
        transcribe_whisper_btn = gr.Button("Transcribe by Whisper")
    with gr.Row():
        # OpenAI
        transcribe_openai_result = gr.Textbox(label="Transcription by OpenAI")
        openai_time = gr.Textbox(label="Processing Time")
        transcribe_openai_btn = gr.Button("Transcribe by OpenAI")
    # Pass the constant method name via gr.State so it is not rendered
    # as an extra visible textbox in the layout
    transcribe_transformers_btn.click(
        process_youtube_video,
        inputs=[password, url, gr.State("transformers")],
        outputs=[transcribe_transformers_result, transformers_time]
    )
    transcribe_whisper_btn.click(
        process_youtube_video,
        inputs=[password, url, gr.State("whisper")],
        outputs=[transcribe_whisper_result, whisper_time]
    )
    transcribe_openai_btn.click(
        process_youtube_video,
        inputs=[password, url, gr.State("open_ai")],
        outputs=[transcribe_openai_result, openai_time]
    )

demo.launch()