import gradio as gr
# from transformers import pipeline
# from pydub import AudioSegment
# from pydub.silence import split_on_silence
# import io
import time
import yt_dlp
from pydub.silence import detect_nonsilent
from pydub import AudioSegment
import os
from openai import OpenAI
import random
import string
import json
from faster_whisper import WhisperModel
# Initialize OpenAI API client
OPEN_AI_KEY = os.getenv("OPEN_AI_KEY")
OPEN_AI_CLIENT = OpenAI(api_key=OPEN_AI_KEY)
PASSWORD = os.getenv("PASSWORD")
# Initialize the Whisper model pipeline for transcription
# transcription_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-large")
def random_filename(length=10):
    """Return a random lowercase string to use as a temporary audio file name."""
    letters = string.ascii_lowercase
    result_str = ''.join(random.choice(letters) for _ in range(length))
    return result_str
def download_youtube_audio(youtube_url):
"""
Download the audio from a YouTube video and return the path to the audio file.
"""
# random_filename
filename = random_filename()
codec_name = "mp3"
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': codec_name,
'preferredquality': '192'
}],
'outtmpl': f'{filename}.%(ext)s',
}
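    # The options above grab the best available audio stream and have FFmpeg
    # re-encode it to mp3 at 192 kbps; outtmpl keeps the random base filename
    # with whatever extension the postprocessor produces.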
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([youtube_url])
return f"{filename}.{codec_name}"
# def transcribe_audio_by_transformers_pipeline(audio_path):
# """
# Convert audio to text using Whisper model and return chunks with timestamps.
# """
# start_time = time.time()  # time when the function starts executing
# # Load audio file
# audio = AudioSegment.from_file(audio_path)
# # Split audio on silence
# chunks = split_on_silence(audio,
# min_silence_len=500,
# silence_thresh=audio.dBFS-14,
# keep_silence=500)
# transcriptions = []
# print( f"Transcribing {len(chunks)} chunks")
# # Process each chunk
# for i, chunk in enumerate(chunks):
# byte_io = io.BytesIO()
# chunk.export(byte_io, format="wav")
# audio_data = byte_io.getvalue()
# # Transcribe chunk
# result = transcription_pipeline(audio_data)
# text = result['text']
# # Calculate timestamps
# chunk_start = sum(chunks[j].duration_seconds for j in range(i))
# chunk_end = chunk_start + chunk.duration_seconds
# # Format timestamp and text
# transcriptions.append(f"[{chunk_start:.3f} -> {chunk_end:.3f}] {text}")
# print(f"Chunk {i+1}/{len(chunks)} transcribed")
# transcription = "\n".join(transcriptions)
# end_time = time.time()  # time when the function finishes executing
# processing_time = end_time - start_time  # compute the processing time
# print("start_time:", start_time)
# print("end_time:", end_time)
# print("processing_time:", processing_time)
# return transcription, processing_time
def transcribe_audio_by_whisper(audio_path):
    # Transcription implementation using the local faster-whisper model
start_time = time.time()
model_size = "large-v3"
# Run on GPU with FP16
# model = WhisperModel(model_size, device="cuda", compute_type="float16")
# or run on GPU with INT8
# model = WhisperModel(model_size, device="cuda", compute_type="int8_float16")
# or run on CPU with INT8
model = WhisperModel(model_size, device="cpu", compute_type="int8")
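    # faster-whisper fetches the converted large-v3 weights from the Hugging Face
    # Hub on first use; running on CPU with int8 keeps memory modest at the cost of speed.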
segments, info = model.transcribe(audio_path, beam_size=5)
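    # NOTE: faster-whisper returns `segments` as a lazy generator; the actual
    # decoding work happens while the for-loop below iterates over it.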
print("Detected language '%s' with probability %f" % (info.language, info.language_probability))
transcription = ""
for segment in segments:
print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text))
        # Timestamped line in "[start -> end] text" form (not a full SRT block)
transcription += f"[{segment.start:.3f} -> {segment.end:.3f}] {segment.text}\n"
    end_time = time.time()  # time when the function finishes executing
processing_time = int(end_time - start_time)
return transcription, processing_time
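# For reference, the returned transcription is one line per segment (contents illustrative):
#   [0.000 -> 3.480] Hello and welcome.
#   [3.480 -> 7.120] Today we are going to look at ...
# processing_time is the elapsed wall-clock time in whole seconds.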
def transcribe_audio_by_open_ai(audio_path):
    # Transcription implementation using the OpenAI speech-to-text API
start_time = time.time()
    # # Read the audio file
# audio = AudioSegment.from_wav(audio_path)
    # # First find the start and end times of all non-silent ranges
# nonsilent_ranges = detect_nonsilent(audio, min_silence_len=50, silence_thresh=-40)
# def ms_to_srt_time(ms):
# sec, ms = divmod(ms, 1000)
# min, sec = divmod(sec, 60)
# hr, min = divmod(min, 60)
# return f"{hr:02}:{min:02}:{sec:02},{ms:03}"
# def merge_short_ranges(ranges, min_duration=1500, max_duration=3000):
# """
# Merge consecutive short durations into the previous range if merging doesn't exceed max_duration.
# Args:
# ranges (List[Tuple[int, int]]): List of start and end times.
# min_duration (int): Minimum duration for a range to be considered valid.
# max_duration (int): Maximum duration for a merged range.
# Returns:
# List[Tuple[int, int]]: Modified list of start and end times.
# """
# merged_ranges = []
# for start, end in ranges:
# if merged_ranges:
# prev_start, prev_end = merged_ranges[-1]
# # Check if current range is short and if merging doesn't exceed max_duration
# if end - start < min_duration and (end - prev_start) <= max_duration:
# # Modify the end time of the last range in the list
# merged_ranges[-1] = (prev_start, end)
# else:
# merged_ranges.append((start, end))
# else:
# merged_ranges.append((start, end))
# return merged_ranges
    # def filter_short_ranges(ranges, min_duration=100):  # 0.1 seconds equals 100 milliseconds
# """
# Filter out short durations.
# Args:
# ranges (List[Tuple[int, int]]): List of start and end times.
# min_duration (int): Minimum duration for a range to be considered valid.
# Returns:
# List[Tuple[int, int]]: Filtered list of start and end times.
# """
# return [r for r in ranges if (r[1] - r[0]) >= min_duration]
# nonsilent_ranges = merge_short_ranges(nonsilent_ranges)
# nonsilent_ranges = filter_short_ranges(nonsilent_ranges)
# print(nonsilent_ranges)
# srt_content = ""
# counter = 1
# for start, end in nonsilent_ranges:
# chunk = audio[start:end]
# chunk.export("temp_chunk.wav", format="wav")
# with open("temp_chunk.wav", "rb") as audio_file:
# transcript = OPEN_AI_CLIENT.audio.transcriptions.create(
# model="whisper-1",
# file=audio_file,
# response_format="vtt"
# )
# srt_content += f"{counter}\n"
# srt_content += f"{ms_to_srt_time(start)} --> {ms_to_srt_time(end)}\n"
# srt_content += f"{transcript}\n\n"
# counter += 1
    # # Print the SRT content
# print(srt_content)
with open(audio_path, "rb") as audio_file:
srt_content = OPEN_AI_CLIENT.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="verbose_json",
timestamp_granularities=["segment"],
prompt="Transcribe the following audio file. if chinese, please using 'language: zh-TW' in the prompt.",
)
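    # NOTE: depending on the openai SDK version, the entries in `.segments` may be
    # plain dicts or typed objects; the dict-style access below assumes dicts.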
# get segments
segments = srt_content.segments
transformed_data = [
{
"text": item["text"],
"start": round(item["start"], 3),
"duration": round(item["end"] - item["start"], 3)
}
for item in segments
]
transcription = json.dumps(transformed_data, indent=2, ensure_ascii=False)
    end_time = time.time()  # time when the function finishes executing
processing_time = int(end_time - start_time)
return transcription, processing_time
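# For reference, transformed_data serializes to JSON of the form (values illustrative):
#   [{"text": "Hello world", "start": 0.0, "duration": 3.2}, ...]
# A minimal sketch (not wired into the UI) of turning those start/duration values
# into SRT-style timestamps; the name `seconds_to_srt_timestamp` is an illustrative
# assumption, not part of the original app.
def seconds_to_srt_timestamp(seconds):
    # Convert fractional seconds to whole milliseconds, then split into h/m/s.
    total_ms = int(round(seconds * 1000))
    sec, ms = divmod(total_ms, 1000)
    minute, sec = divmod(sec, 60)
    hour, minute = divmod(minute, 60)
    return f"{hour:02}:{minute:02}:{sec:02},{ms:03}"
# Example: seconds_to_srt_timestamp(3.5) -> "00:00:03,500"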
def process_youtube_video(password, url, method):
"""
Download YouTube audio and transcribe it.
"""
# Check password
if password != PASSWORD:
raise ValueError("Invalid password.")
# Download audio
audio_path = download_youtube_audio(url)
    # if method == 'transformers':
    #     transcription, processing_time = transcribe_audio_by_transformers_pipeline(audio_path)
    if method == 'whisper':
transcription, processing_time = transcribe_audio_by_whisper(audio_path)
elif method == 'open_ai':
transcription, processing_time = transcribe_audio_by_open_ai(audio_path)
else:
raise ValueError("Unknown transcription method.")
return transcription, processing_time
# Create Gradio interface
with gr.Blocks() as demo:
with gr.Row():
password = gr.Textbox(label="Password")
url = gr.Textbox(label="YouTube URL")
# send_btn = gr.Button()
with gr.Row():
# Transformers Pipeline
transcribe_transformers_result = gr.Textbox(label="Transcription by Transformers")
transformers_time = gr.Textbox(label="Processing Time")
transcribe_transformers_btn = gr.Button("Transcribe by Transformers")
with gr.Row():
# Whisper
transcribe_whisper_result = gr.Textbox(label="Transcription by Whisper")
whisper_time = gr.Textbox(label="Processing Time")
transcribe_whisper_btn = gr.Button("Transcribe by Whisper")
with gr.Row():
# Open AI
transcribe_openai_result = gr.Textbox(label="Transcription by Open AI")
openai_time = gr.Textbox(label="Processing Time")
transcribe_openai_btn = gr.Button("Transcribe by Open AI")
transcribe_transformers_btn.click(
process_youtube_video,
inputs=[password, url, gr.Textbox(value="transformers")],
outputs=[transcribe_transformers_result, transformers_time]
)
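    # NOTE: the 'transformers' branch in process_youtube_video is commented out, so
    # the Transformers button above currently raises ValueError("Unknown transcription method.").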
transcribe_whisper_btn.click(
process_youtube_video,
inputs=[password, url, gr.Textbox(value="whisper")],
outputs=[transcribe_whisper_result, whisper_time]
)
transcribe_openai_btn.click(
process_youtube_video,
inputs=[password, url, gr.Textbox(value="open_ai")],
outputs=[transcribe_openai_result, openai_time]
)
demo.launch()