import gradio as gr # from transformers import pipeline # from pydub import AudioSegment # from pydub.silence import split_on_silence # import io import time import yt_dlp from pydub.silence import detect_nonsilent from pydub import AudioSegment import os from openai import OpenAI import random import string import json from faster_whisper import WhisperModel # Initialize OpenAI API client OPEN_AI_KEY = os.getenv("OPEN_AI_KEY") OPEN_AI_CLIENT = OpenAI(api_key=OPEN_AI_KEY) PASSWORD = os.getenv("PASSWORD") # Initialize the Whisper model pipeline for transcription # transcription_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-large") def random_filename(length=10): letters = string.ascii_lowercase result_str = ''.join(random.choice(letters) for i in range(length)) return result_str def download_youtube_audio(youtube_url): """ Download the audio from a YouTube video and return the path to the audio file. """ # random_filename filename = random_filename() codec_name = "mp3" ydl_opts = { 'format': 'bestaudio/best', 'postprocessors': [{ 'key': 'FFmpegExtractAudio', 'preferredcodec': codec_name, 'preferredquality': '192' }], 'outtmpl': f'{filename}.%(ext)s', } with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([youtube_url]) return f"{filename}.{codec_name}" # def transcribe_audio_by_transformers_pipeline(audio_path): # """ # Convert audio to text using Whisper model and return chunks with timestamps. # """ # start_time = time.time() # 函数开始执行的时间 # # Load audio file # audio = AudioSegment.from_file(audio_path) # # Split audio on silence # chunks = split_on_silence(audio, # min_silence_len=500, # silence_thresh=audio.dBFS-14, # keep_silence=500) # transcriptions = [] # print( f"Transcribing {len(chunks)} chunks") # # Process each chunk # for i, chunk in enumerate(chunks): # byte_io = io.BytesIO() # chunk.export(byte_io, format="wav") # audio_data = byte_io.getvalue() # # Transcribe chunk # result = transcription_pipeline(audio_data) # text = result['text'] # # Calculate timestamps # start_time = sum(chunks[j].duration_seconds for j in range(i)) # end_time = start_time + chunk.duration_seconds # # Format timestamp and text # transcriptions.append(f"[{start_time:.3f} -> {end_time:.3f}] {text}") # print(f"Chunk {i+1}/{len(chunks)} transcribed") # transcription = "\n".join(transcriptions) # end_time = time.time() # 函数结束执行的时间 # processing_time = end_time - start_time # 计算处理时间 # print("start_time:", start_time) # print("end_time:", end_time) # print("processing_time:", processing_time) # return transcription, processing_time def transcribe_audio_by_whisper(audio_path): # Whisper模型的轉錄實現 start_time = time.time() model_size = "large-v3" # Run on GPU with FP16 # model = WhisperModel(model_size, device="cuda", compute_type="float16") # or run on GPU with INT8 # model = WhisperModel(model_size, device="cuda", compute_type="int8_float16") # or run on CPU with INT8 model = WhisperModel(model_size, device="cpu", compute_type="int8") segments, info = model.transcribe(audio_path, beam_size=5) print("Detected language '%s' with probability %f" % (info.language, info.language_probability)) transcription = "" for segment in segments: print("[%.2fs -> %.2fs] %s" % (segment.start, segment.end, segment.text)) # SRT格式 transcription += f"[{segment.start:.3f} -> {segment.end:.3f}] {segment.text}\n" end_time = time.time() # 函数结束执行的时间 processing_time = int(end_time - start_time) return transcription, processing_time def transcribe_audio_by_open_ai(audio_path): # OpenAI語音識別的轉錄實現 start_time = time.time() # # 讀取音頻文件 # audio = AudioSegment.from_wav(audio_path) # # 先找到所有非靜音片段的開始和結束時間 # nonsilent_ranges = detect_nonsilent(audio, min_silence_len=50, silence_thresh=-40) # def ms_to_srt_time(ms): # sec, ms = divmod(ms, 1000) # min, sec = divmod(sec, 60) # hr, min = divmod(min, 60) # return f"{hr:02}:{min:02}:{sec:02},{ms:03}" # def merge_short_ranges(ranges, min_duration=1500, max_duration=3000): # """ # Merge consecutive short durations into the previous range if merging doesn't exceed max_duration. # Args: # ranges (List[Tuple[int, int]]): List of start and end times. # min_duration (int): Minimum duration for a range to be considered valid. # max_duration (int): Maximum duration for a merged range. # Returns: # List[Tuple[int, int]]: Modified list of start and end times. # """ # merged_ranges = [] # for start, end in ranges: # if merged_ranges: # prev_start, prev_end = merged_ranges[-1] # # Check if current range is short and if merging doesn't exceed max_duration # if end - start < min_duration and (end - prev_start) <= max_duration: # # Modify the end time of the last range in the list # merged_ranges[-1] = (prev_start, end) # else: # merged_ranges.append((start, end)) # else: # merged_ranges.append((start, end)) # return merged_ranges # def filter_short_ranges(ranges, min_duration=100): # 0.1秒等於100毫秒 # """ # Filter out short durations. # Args: # ranges (List[Tuple[int, int]]): List of start and end times. # min_duration (int): Minimum duration for a range to be considered valid. # Returns: # List[Tuple[int, int]]: Filtered list of start and end times. # """ # return [r for r in ranges if (r[1] - r[0]) >= min_duration] # nonsilent_ranges = merge_short_ranges(nonsilent_ranges) # nonsilent_ranges = filter_short_ranges(nonsilent_ranges) # print(nonsilent_ranges) # srt_content = "" # counter = 1 # for start, end in nonsilent_ranges: # chunk = audio[start:end] # chunk.export("temp_chunk.wav", format="wav") # with open("temp_chunk.wav", "rb") as audio_file: # transcript = OPEN_AI_CLIENT.audio.transcriptions.create( # model="whisper-1", # file=audio_file, # response_format="vtt" # ) # srt_content += f"{counter}\n" # srt_content += f"{ms_to_srt_time(start)} --> {ms_to_srt_time(end)}\n" # srt_content += f"{transcript}\n\n" # counter += 1 # # 列印SRT # print(srt_content) with open(audio_path, "rb") as audio_file: srt_content = OPEN_AI_CLIENT.audio.transcriptions.create( model="whisper-1", file=audio_file, response_format="verbose_json", timestamp_granularities=["segment"], prompt="Transcribe the following audio file. if chinese, please using 'language: zh-TW' in the prompt.", ) # get segments segments = srt_content.segments transformed_data = [ { "text": item["text"], "start": round(item["start"], 3), "duration": round(item["end"] - item["start"], 3) } for item in segments ] transcription = json.dumps(transformed_data, indent=2, ensure_ascii=False) end_time = time.time() # 函数结束执行的时间 processing_time = int(end_time - start_time) return transcription, processing_time def process_youtube_video(password, url, method): """ Download YouTube audio and transcribe it. """ # Check password if password != PASSWORD: raise ValueError("Invalid password.") # Download audio audio_path = download_youtube_audio(url) # if method == 'transformers': # transcription, processing_time = transcribe_audio_by_transformers_pipeline(audio_path) # el if method == 'whisper': transcription, processing_time = transcribe_audio_by_whisper(audio_path) elif method == 'open_ai': transcription, processing_time = transcribe_audio_by_open_ai(audio_path) else: raise ValueError("Unknown transcription method.") return transcription, processing_time # Create Gradio interface with gr.Blocks() as demo: with gr.Row(): password = gr.Textbox(label="Password") url = gr.Textbox(label="YouTube URL") # send_btn = gr.Button() with gr.Row(): # Transformers Pipeline transcribe_transformers_result = gr.Textbox(label="Transcription by Transformers") transformers_time = gr.Textbox(label="Processing Time") transcribe_transformers_btn = gr.Button("Transcribe by Transformers") with gr.Row(): # Whisper transcribe_whisper_result = gr.Textbox(label="Transcription by Whisper") whisper_time = gr.Textbox(label="Processing Time") transcribe_whisper_btn = gr.Button("Transcribe by Whisper") with gr.Row(): # Open AI transcribe_openai_result = gr.Textbox(label="Transcription by Open AI") openai_time = gr.Textbox(label="Processing Time") transcribe_openai_btn = gr.Button("Transcribe by Open AI") transcribe_transformers_btn.click( process_youtube_video, inputs=[password, url, gr.Textbox(value="transformers")], outputs=[transcribe_transformers_result, transformers_time] ) transcribe_whisper_btn.click( process_youtube_video, inputs=[password, url, gr.Textbox(value="whisper")], outputs=[transcribe_whisper_result, whisper_time] ) transcribe_openai_btn.click( process_youtube_video, inputs=[password, url, gr.Textbox(value="open_ai")], outputs=[transcribe_openai_result, openai_time] ) demo.launch()