import os import time import re from typing import Union, AnyStr from urllib.parse import urlparse, parse_qs import textwrap import streamlit as st import openai from openai import OpenAI from pydub import AudioSegment from youtube_transcript_api import YouTubeTranscriptApi from deep_translator import GoogleTranslator import yt_dlp as youtube_dl from transformers import AutoModelForCausalLM, GPT2Tokenizer import torch from tqdm import trange import torch.nn.functional as F client = OpenAI( api_key='sk-proj-Yzjez2g6rfAiVPpb3cfJT3BlbkFJRLU4ZQpMhyLJDf0XksF4' ) def generate_response(article_text, lang ): messages=[ {"role": "system", "content": "You are an expert in summarizing text in two languages: English and Vietnamese"}, {"role": "user", "content": f"summarize the following text professionally and return the summary according to the input language:\n{article_text}\nSummary:"} ] if lang == 'vi': messages=[ {"role": "system", "content": "Bạn là chuyên gia tóm tắt văn bản bằng hai ngôn ngữ: tiếng Anh và tiếng Việt"}, {"role": "user", "content": f"hãy tóm tắt văn bản sau đây một cách chuyên nghiệp và trả về bản tóm tắt theo ngôn ngữ đầu vào:\n{article_text}\nBản Tóm tắt:"} ] response = client.chat.completions.create( model='ft:gpt-3.5-turbo-0125:personal::9eZjpJwa' , messages=messages, max_tokens=150, # Tăng lên để có thêm không gian cho tóm tắt temperature=0.3, # Giảm xuống để tạo ra nội dung tập trung hơn top_p=0.95, # Tăng nhẹ để mở rộng phạm vi từ vựng frequency_penalty=0.5, # Tăng lên để khuyến khích đa dạng từ ngữ presence_penalty=0.5 # Tăng lên để khuyến khích đề cập đến các chủ đề mới ) # Extract and return the generated summary summary = response.choices[0].message.content.strip() return summary def cleaning_input(input_text): from html import unescape text = str(input_text) text = re.sub(r'\n\s*\n', '\n', text) text = re.sub(r'[ ]+', ' ', text) text = re.sub(r'\.{2,}', '.', text) text = re.sub(r',{2,}', ',', text) text = re.sub(r'-{2,}', '-', text) text = re.sub(r'_{2,}', '_', text) text = re.sub(r'!{2,}', '!', text) text = re.sub(r'\?{2,}', '?', text) text = re.sub(r'(\d)([A-Za-z])', r'\1 \2', text) text = re.sub(r'([A-Za-z])(\d)', r'\1 \2', text) text = unescape(text) text = re.sub(r'[^\w\s\[\]\(\)\$\\.\n\/:#<>{},_"!@\\-\\*=\\]', '', text) text = re.sub(r'\s+', ' ', text) return text def top_k_top_p_filtering(logits, top_k, top_p, filter_value=-float('Inf')): """ Filter a distribution of logits using top-k and/or nucleus (top-p) filtering Args: logits: logits distribution shape (vocabulary size) top_k > 0: keep only top k tokens with highest probability (top-k filtering). top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering). Nucleus filtering is described in Holtzman et al. (http://arxiv.org/abs/1904.09751) From: https://gist.github.com/thomwolf/1a5a29f6962089e871b94cbd09daf317 """ assert logits.dim() == 1 # batch size 1 for now - could be updated for more but the code would be less clear top_k = min(top_k, logits.size(-1)) # Safety check if top_k > 0: # Remove all tokens with a probability less than the last token of the top-k indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None] logits[indices_to_remove] = filter_value if top_p > 0.0: sorted_logits, sorted_indices = torch.sort(logits, descending=True) cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1) # Remove tokens with cumulative probability above the threshold sorted_indices_to_remove = cumulative_probs > top_p # Shift the indices to the right to keep also the first token above the threshold sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone() sorted_indices_to_remove[..., 0] = 0 indices_to_remove = sorted_indices[sorted_indices_to_remove] logits[indices_to_remove] = filter_value return logits def sample_seq(model, context, length, device, temperature, top_k, top_p): """ Generates a sequence of tokens Args: model: gpt/gpt2 model context: tokenized text using gpt/gpt2 tokenizer length: length of generated sequence. device: torch.device object. temperature >0: used to control the randomness of predictions by scaling the logits before applying softmax. top_k > 0: keep only top k tokens with highest probability (top-k filtering). top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering). """ context = torch.tensor(context, dtype=torch.long, device=device) context = context.unsqueeze(0) generated = context with torch.no_grad(): for _ in trange(length): inputs = {'input_ids': generated} outputs = model( **inputs) # Note: we could also use 'past' with GPT-2/Transfo-XL/XLNet (cached hidden-states) next_token_logits = outputs[0][0, -1, :] / temperature filtered_logits = top_k_top_p_filtering(next_token_logits, top_k=top_k, top_p=top_p) next_token = torch.multinomial(F.softmax(filtered_logits, dim=-1), num_samples=1) generated = torch.cat((generated, next_token.unsqueeze(0)), dim=1) return generated def add_special_tokens(lang): """ Returns GPT2 tokenizer after adding separator and padding tokens """ token = 'gpt2' if lang =='vi': token = 'NlpHUST/gpt2-vietnamese' tokenizer = GPT2Tokenizer.from_pretrained(token) special_tokens = {'pad_token': '<|pad|>', 'sep_token': '<|sep|>'} tokenizer.add_special_tokens(special_tokens) return tokenizer def gene(t,a): tokenizer = add_special_tokens(a) article = tokenizer.encode(t)[:900] # Load model directly model = AutoModelForCausalLM.from_pretrained("tiennlu/GPT2en_CNNen_3k") if a=="vi": model = AutoModelForCausalLM.from_pretrained("tiennlu/GPT2vi_CNNvi_3k") generated_text = sample_seq(model, article, 50, torch.device('cpu'), temperature=1, top_k=10, top_p=0.5) generated_text = generated_text[0, len(article):].tolist() text = tokenizer.convert_ids_to_tokens(generated_text, skip_special_tokens=True) text = tokenizer.convert_tokens_to_string(text) return text def find_audio_files(path, extension=".mp3"): audio_files = [] for root, dirs, files in os.walk(path): for f in files: if f.endswith(extension): audio_files.append(os.path.join(root, f)) return audio_files def youtube_to_mp3(youtube_url: str, output_dir: str) -> Union[AnyStr, str, bytes]: ydl_config = { "format": "bestaudio/best", "postprocessors": [ { "key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "192", } ], "outtmpl": os.path.join(output_dir, "%(title)s.%(ext)s"), "verbose": True, } if not os.path.exists(output_dir): os.makedirs(output_dir) with youtube_dl.YoutubeDL(ydl_config) as ydl: ydl.download([youtube_url]) return find_audio_files(output_dir)[0] def chunk_audio(filename, segment_length: int, output_dir): """segment lenght is in seconds""" # print(f"Chunking audio to {segment_length} second segments...") if not os.path.isdir(output_dir): os.mkdir(output_dir) # Load audio file audio = AudioSegment.from_mp3(filename) # Calculate duration in milliseconds duration = len(audio) # Calculate number of segments num_segments = duration // (segment_length * 1000) + 1 print(f"Chunking {num_segments} chunks...") # Iterate through segments and save them for i in range(num_segments): start = i * segment_length * 1000 end = min((i + 1) * segment_length * 1000, duration) segment = audio[start:end] segment.export(os.path.join(output_dir, f"segment_{i}.mp3"), format="mp3") chunked_audio_files = find_audio_files(output_dir) return sorted(chunked_audio_files) def translate_text(text): wrapped_text = textwrap.wrap(text, 3500) tran_text = "" for line in wrapped_text: translation = GoogleTranslator(source='en', target='vi').translate(line) tran_text += translation + " " return tran_text def transcribe_audio(audio_files: list, model_name="whisper-1"): transcripts = "" for audio_file in audio_files: audio = open(audio_file, "rb") try: response = completions_with_backoff( model=model_name, file=audio, response_format="text" ) transcripts += response.text + " " except openai.OpenAIError as e: print(f"An error occurred: {e}") return None return transcripts import random # define a retry decorator def retry_with_exponential_backoff( func, initial_delay: float = 1, exponential_base: float = 2, jitter: bool = True, max_retries: int = 10, errors: tuple = (openai.RateLimitError,), ): def wrapper(*args, **kwargs): num_retries = 0 delay = initial_delay while True: try: return func(*args, **kwargs) except errors as e: print(f"Error: {e}") num_retries += 1 if num_retries > max_retries: raise Exception(f"Maximum number of retries ({max_retries}) exceeded.") delay *= exponential_base * (1 + jitter * random.random()) time.sleep(delay) except Exception as e: raise e return wrapper @retry_with_exponential_backoff def completions_with_backoff(**kwargs): return client.audio.translations.create(**kwargs) def get_video_id(youtube_url): """Extract video ID from YouTube URL.""" parsed_url = urlparse(youtube_url) video_id = parse_qs(parsed_url.query).get("v") return video_id[0] if video_id else None def get_transcript(video_id): tran = [] transcript_list = YouTubeTranscriptApi.list_transcripts(video_id) transcript = transcript_list.find_generated_transcript(['vi','en']) translated_transcript = transcript.translate('en') transcript_data = translated_transcript.fetch() tran += [t['text'] for t in transcript_data if t['text'] != '[music]'] return ' '.join(tran) def chunk_text(text, chunk_size=1000, overlap_size=24): encoder = RecursiveCharacterTextSplitter().from_tiktoken_encoder(model_name="gpt-3.5-turbo", chunk_size=chunk_size, chunk_overlap=overlap_size) return encoder.split_text(text=text) def summarize_youtube_video(youtube_url, outputs_dir): # Tạo đường dẫn đầy đủ cho thư mục đầu ra video_id = get_video_id(youtube_url) en_transcript = get_transcript(video_id) if not os.path.exists(outputs_dir): os.makedirs(outputs_dir) if not en_transcript: outputs_dir = f"{outputs_dir}\\{video_id}" raw_audio_dir = f"{outputs_dir}\\raw_audio\\" chunks_dir = f"{outputs_dir}\\chunks" segment_length = 10 * 60 # chunk to 10 minute segments if not os.path.exists(outputs_dir): os.makedirs(outputs_dir) audio_filename = youtube_to_mp3(youtube_url, output_dir=raw_audio_dir) chunked_audio_files = chunk_audio( audio_filename, segment_length=segment_length, output_dir=chunks_dir ) en_transcript = transcribe_audio(chunked_audio_files) en_transcript = cleaning_input(en_transcript) vi_transcript = translate_text(en_transcript) summ_en = summary(en_transcript, 'en') summ_vi = summary(vi_transcript, 'vi') return tuple(summ_en), tuple(summ_vi) def main(): st.set_page_config(layout="wide") st.title("YouTube Video Summarizer 🎥") st.markdown('', unsafe_allow_html=True) st.subheader('Built with the GPT2, Streamlit and ❤️') st.markdown('', unsafe_allow_html=True) # Expander for app details with st.expander("About the App"): st.write("This app allows you to summarize while watching a YouTube video.") st.write( "Enter a YouTube URL in the input box below and click 'Submit' to start. This app is built by AI Anytime.") # Input box for YouTube URL youtube_url = st.text_input("Enter YouTube URL") # Submit button if st.button("Submit") and youtube_url: start_time = time.time() # Start the timer summ, tran = summarize_youtube_video(youtube_url, "./outputs") sum = summ[0] script = summ[1] sum_tran = tran[0] script_tran = tran[1] end_time = time.time() # End the timer elapsed_time = end_time - start_time # Centering the video and elapsed time st.markdown("""
Time taken: {elapsed_time:.2f} seconds