|
import os |
|
import time |
|
import re |
|
from typing import Union, AnyStr |
|
from urllib.parse import urlparse, parse_qs |
|
import textwrap |
|
import streamlit as st |
|
import openai |
|
from openai import OpenAI |
|
from pydub import AudioSegment |
|
from youtube_transcript_api import YouTubeTranscriptApi |
|
from deep_translator import GoogleTranslator |
|
import yt_dlp as youtube_dl |
|
from transformers import AutoModelForCausalLM, GPT2Tokenizer |
|
import torch |
|
from tqdm import trange |
|
import torch.nn.functional as F |
|
# Shared OpenAI client used by the summarization and audio helpers below.
# The API key is read from the OPENAI_API_KEY environment variable so that no
# secret lives in source control.
# NOTE(review): the key that used to be hard-coded here has been exposed and
# should be revoked/rotated.
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)
|
|
|
def generate_response(article_text, lang):
    """Summarize ``article_text`` with the fine-tuned chat model.

    Args:
        article_text: Text to summarize; it is interpolated into the user prompt.
        lang: ``'vi'`` selects the Vietnamese prompts; any other value selects
            the English prompts.

    Returns:
        The message content of the first returned choice, with surrounding
        whitespace stripped.
    """
    if lang == 'vi':
        system_prompt = "Bạn là chuyên gia tóm tắt văn bản bằng hai ngôn ngữ: tiếng Anh và tiếng Việt"
        user_prompt = f"hãy tóm tắt văn bản sau đây một cách chuyên nghiệp và trả về bản tóm tắt theo ngôn ngữ đầu vào:\n{article_text}\nBản Tóm tắt:"
    else:
        system_prompt = "You are an expert in summarizing text in two languages: English and Vietnamese"
        user_prompt = f"summarize the following text professionally and return the summary according to the input language:\n{article_text}\nSummary:"

    request = {
        "model": 'ft:gpt-3.5-turbo-0125:personal::9eZjpJwa',
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        "max_tokens": 150,
        "temperature": 0.3,
        "top_p": 0.95,
        "frequency_penalty": 0.5,
        "presence_penalty": 0.5,
    }
    completion = client.chat.completions.create(**request)
    return completion.choices[0].message.content.strip()
|
def cleaning_input(input_text): |
|
from html import unescape |
|
text = str(input_text) |
|
text = re.sub(r'\n\s*\n', '\n', text) |
|
text = re.sub(r'[ ]+', ' ', text) |
|
text = re.sub(r'\.{2,}', '.', text) |
|
text = re.sub(r',{2,}', ',', text) |
|
text = re.sub(r'-{2,}', '-', text) |
|
text = re.sub(r'_{2,}', '_', text) |
|
text = re.sub(r'!{2,}', '!', text) |
|
text = re.sub(r'\?{2,}', '?', text) |
|
text = re.sub(r'(\d)([A-Za-z])', r'\1 \2', text) |
|
text = re.sub(r'([A-Za-z])(\d)', r'\1 \2', text) |
|
text = unescape(text) |
|
text = re.sub(r'[^\w\s\[\]\(\)\$\\.\n\/:#<>{},_"!@\\-\\*=\\]', '', text) |
|
text = re.sub(r'\s+', ' ', text) |
|
return text |
|
|
|
def top_k_top_p_filtering(logits, top_k, top_p, filter_value=-float('Inf')):
    """Mask a 1-D logits vector in place with top-k and/or nucleus (top-p) filtering.

    Args:
        logits: 1-D tensor of logits. It is modified in place.
        top_k: When > 0 (after clamping to the vector length), every logit
            strictly smaller than the k-th largest logit is overwritten.
        top_p: When > 0.0, logits are ranked in descending order and a token
            is overwritten once the cumulative softmax probability of the
            tokens ranked before it (itself excluded) already exceeds
            ``top_p``. The top-ranked token is always kept.
        filter_value: Value written over the filtered positions.

    Returns:
        The same ``logits`` tensor that was passed in.

    Nucleus filtering is described in Holtzman et al. (http://arxiv.org/abs/1904.09751)
    From: https://gist.github.com/thomwolf/1a5a29f6962089e871b94cbd09daf317
    """
    assert logits.dim() == 1

    k = min(top_k, logits.size(-1))
    if k > 0:
        # Smallest value among the k largest logits, kept as a length-1 tensor
        # so it broadcasts against the whole vector.
        kth_largest = torch.topk(logits, k).values[..., -1, None]
        logits[logits < kth_largest] = filter_value

    if top_p > 0.0:
        ranked_logits, ranked_indices = torch.sort(logits, descending=True)
        cumulative = F.softmax(ranked_logits, dim=-1).cumsum(dim=-1)
        over_budget = cumulative > top_p

        # Shift the mask one position to the right so the first token that
        # crosses the threshold is still kept; position 0 is never dropped.
        drop_mask = torch.roll(over_budget, shifts=1, dims=-1)
        drop_mask[..., 0] = False

        logits[ranked_indices[drop_mask]] = filter_value

    return logits
|
|
|
def sample_seq(model, context, length, device, temperature, top_k, top_p):
    """Autoregressively sample ``length`` tokens after ``context``.

    Args:
        model: Callable language model; it is invoked as
            ``model(input_ids=...)`` and the first element of its output is
            indexed as ``[0, -1, :]`` to read the logits of the last position.
        context: Sequence of token ids used as the prompt.
        length: Number of tokens to sample.
        device: ``torch.device`` on which the id tensor is created.
        temperature: Divisor applied to the logits before filtering.
        top_k: Passed to ``top_k_top_p_filtering``.
        top_p: Passed to ``top_k_top_p_filtering``.

    Returns:
        A tensor with a leading batch dimension of size 1 holding the prompt
        ids followed by the sampled ids.
    """
    prompt_ids = torch.tensor(context, dtype=torch.long, device=device).unsqueeze(0)
    generated = prompt_ids

    with torch.no_grad():
        for _ in trange(length):
            model_output = model(input_ids=generated)
            # Logits of the last position of the single batch element.
            last_logits = model_output[0][0, -1, :] / temperature
            candidates = top_k_top_p_filtering(last_logits, top_k=top_k, top_p=top_p)
            probabilities = F.softmax(candidates, dim=-1)
            sampled = torch.multinomial(probabilities, num_samples=1)
            generated = torch.cat((generated, sampled.unsqueeze(0)), dim=1)

    return generated
|
def add_special_tokens(lang):
    """Load a GPT-2 tokenizer for ``lang`` and register pad/separator tokens.

    ``'vi'`` loads ``NlpHUST/gpt2-vietnamese``; any other value loads ``gpt2``.
    Returns the tokenizer after ``<|pad|>`` and ``<|sep|>`` have been added.
    """
    checkpoint = 'NlpHUST/gpt2-vietnamese' if lang == 'vi' else 'gpt2'
    tokenizer = GPT2Tokenizer.from_pretrained(checkpoint)
    tokenizer.add_special_tokens({'pad_token': '<|pad|>', 'sep_token': '<|sep|>'})
    return tokenizer
|
|
|
def gene(t, a):
    """Generate 50 sampled tokens continuing text ``t`` with a fine-tuned GPT-2.

    Args:
        t: Input text. It is tokenized and truncated to the first 900 tokens.
        a: Language code. ``"vi"`` selects the Vietnamese tokenizer and
            checkpoint; any other value selects the English ones.

    Returns:
        The decoded string of the newly generated tokens only (the prompt is
        sliced off and special tokens are skipped).
    """
    tokenizer = add_special_tokens(a)
    article = tokenizer.encode(t)[:900]

    # Choose the checkpoint before loading so that only one model is
    # instantiated; loading the English model and then replacing it for
    # Vietnamese wastes a full model load.
    checkpoint = "tiennlu/GPT2vi_CNNvi_3k" if a == "vi" else "tiennlu/GPT2en_CNNen_3k"
    model = AutoModelForCausalLM.from_pretrained(checkpoint)

    generated = sample_seq(
        model, article, 50, torch.device('cpu'), temperature=1, top_k=10, top_p=0.5
    )
    # Keep only the ids produced after the prompt.
    new_token_ids = generated[0, len(article):].tolist()
    tokens = tokenizer.convert_ids_to_tokens(new_token_ids, skip_special_tokens=True)
    return tokenizer.convert_tokens_to_string(tokens)
|
|
|
|
|
def find_audio_files(path, extension=".mp3"):
    """Recursively list files under ``path`` whose names end with ``extension``.

    Returns the matching paths (directory joined with file name) in the order
    ``os.walk`` yields them; an empty list when nothing matches.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(path)
        for name in names
        if name.endswith(extension)
    ]
|
|
|
|
|
def youtube_to_mp3(youtube_url: str, output_dir: str) -> Union[AnyStr, str, bytes]:
    """Download the audio of a YouTube video into ``output_dir`` as MP3.

    Args:
        youtube_url: URL passed to the downloader.
        output_dir: Directory for the download; created if it is missing.

    Returns:
        Path of the first ``.mp3`` file found under ``output_dir``.
        NOTE(review): if the directory already contains older ``.mp3`` files
        this may not be the file that was just downloaded; callers should
        use a per-video directory.

    Raises:
        FileNotFoundError: If no ``.mp3`` file exists under ``output_dir``
            after the download.
    """
    ydl_config = {
        "format": "bestaudio/best",
        "postprocessors": [
            {
                "key": "FFmpegExtractAudio",
                "preferredcodec": "mp3",
                "preferredquality": "192",
            }
        ],
        "outtmpl": os.path.join(output_dir, "%(title)s.%(ext)s"),
        "verbose": True,
    }

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    with youtube_dl.YoutubeDL(ydl_config) as ydl:
        ydl.download([youtube_url])

    audio_files = find_audio_files(output_dir)
    if not audio_files:
        raise FileNotFoundError(
            f"No .mp3 file was produced in {output_dir!r} for {youtube_url!r}"
        )
    return audio_files[0]
|
|
|
|
|
def chunk_audio(filename, segment_length: int, output_dir):
    """Split an MP3 file into consecutive segments and export each as MP3.

    Args:
        filename: Path of the source MP3 file.
        segment_length: Maximum segment length in seconds.
        output_dir: Directory receiving ``segment_<i>.mp3`` files; created if
            it is missing.

    Returns:
        Paths of the exported segments in playback order. Only files written
        by this call are returned, so leftovers from earlier runs in
        ``output_dir`` are not picked up, and ``segment_10`` correctly follows
        ``segment_9`` (a plain string sort would put it before ``segment_2``).
    """
    os.makedirs(output_dir, exist_ok=True)

    audio = AudioSegment.from_mp3(filename)
    duration = len(audio)  # compared against seconds * 1000 below, i.e. milliseconds
    segment_ms = segment_length * 1000

    # Ceiling division, so a duration that is an exact multiple of the segment
    # length does not yield a trailing empty segment.
    num_segments = -(-duration // segment_ms)

    print(f"Chunking {num_segments} chunks...")

    chunk_paths = []
    for i in range(num_segments):
        start = i * segment_ms
        end = min(start + segment_ms, duration)
        chunk_path = os.path.join(output_dir, f"segment_{i}.mp3")
        audio[start:end].export(chunk_path, format="mp3")
        chunk_paths.append(chunk_path)

    return chunk_paths
|
|
|
|
|
def translate_text(text):
    """Translate English ``text`` to Vietnamese in pieces of at most 3500 characters.

    The text is split with ``textwrap.wrap`` (presumably to stay under the
    translation service's per-request size limit; confirm), each piece is
    translated, and the translations are concatenated, each followed by a
    single space.

    Returns:
        The concatenated translation; an empty string when ``text`` produces
        no pieces.
    """
    # One translator instance is enough; it does not depend on the piece.
    translator = GoogleTranslator(source='en', target='vi')
    return "".join(
        translator.translate(piece) + " " for piece in textwrap.wrap(text, 3500)
    )
|
|
|
|
|
def transcribe_audio(audio_files: list, model_name="whisper-1"):
    """Send each audio file through ``completions_with_backoff`` and join the text.

    Args:
        audio_files: Paths of the audio files, processed in the given order.
        model_name: Model name forwarded to the API call.

    Returns:
        The concatenated text, each piece followed by a space, or ``None`` as
        soon as any request raises ``openai.OpenAIError`` (the error is
        printed).
    """
    pieces = []
    for audio_file in audio_files:
        try:
            # The context manager closes the handle even when the request fails.
            with open(audio_file, "rb") as audio:
                response = completions_with_backoff(
                    model=model_name, file=audio, response_format="text"
                )
        except openai.OpenAIError as e:
            print(f"An error occurred: {e}")
            return None
        # With response_format="text" the client may hand back a plain string
        # instead of an object exposing ``.text``; accept both shapes.
        text = response if isinstance(response, str) else response.text
        pieces.append(text + " ")
    return "".join(pieces)
|
|
|
|
|
import random |
|
|
|
|
|
|
|
def retry_with_exponential_backoff(
    func,
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 10,
    errors: tuple = (openai.RateLimitError,),
):
    """Decorate ``func`` so that selected errors are retried with growing delays.

    Args:
        func: Callable to wrap.
        initial_delay: Starting delay in seconds. It is multiplied before the
            first sleep, so the first wait is already larger than this value.
        exponential_base: Factor applied to the delay after every failure.
        jitter: When true, the factor is additionally scaled by a random
            value in ``[1, 2)``.
        max_retries: Number of retries allowed before giving up.
        errors: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A wrapper with the same call signature as ``func``.

    Raises:
        Exception: From the wrapper, when ``max_retries`` is exceeded; the
            last retried error is attached as ``__cause__``.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        num_retries = 0
        delay = initial_delay
        while True:
            try:
                return func(*args, **kwargs)
            except errors as e:
                print(f"Error: {e}")
                num_retries += 1
                if num_retries > max_retries:
                    raise Exception(
                        f"Maximum number of retries ({max_retries}) exceeded."
                    ) from e
                delay *= exponential_base * (1 + jitter * random.random())
                time.sleep(delay)

    return wrapper
|
|
|
|
|
@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):
    """Call ``client.audio.translations.create`` with the given keyword arguments.

    The call is wrapped by ``retry_with_exponential_backoff``; the API
    response is returned unchanged.
    """
    create_translation = client.audio.translations.create
    return create_translation(**kwargs)
|
|
|
|
|
def get_video_id(youtube_url):
    """Extract the video ID from a YouTube URL.

    Supported shapes:
      * any URL carrying a ``v`` query parameter (``.../watch?v=<id>``),
      * ``youtu.be/<id>`` short links,
      * ``/embed/<id>``, ``/shorts/<id>``, ``/live/<id>`` and ``/v/<id>`` paths
        on ``youtube.com`` / ``youtube-nocookie.com`` hosts.

    Returns:
        The video ID string, or ``None`` when none can be found.
    """
    parsed_url = urlparse(youtube_url)

    # The query parameter takes precedence, regardless of host.
    query_ids = parse_qs(parsed_url.query).get("v")
    if query_ids:
        return query_ids[0]

    host = (parsed_url.hostname or "").lower()
    segments = [part for part in parsed_url.path.split("/") if part]

    if host in ("youtu.be", "www.youtu.be"):
        return segments[0] if segments else None

    is_youtube_host = any(
        host == domain or host.endswith("." + domain)
        for domain in ("youtube.com", "youtube-nocookie.com")
    )
    if is_youtube_host and len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
        return segments[1]

    return None
|
|
|
|
|
def get_transcript(video_id):
    """Fetch the auto-generated transcript of a video, translated to English.

    Looks for a generated transcript in Vietnamese or English, requests its
    English translation and joins the caption texts with spaces. Captions
    whose text is just a ``[music]`` marker are skipped.

    Args:
        video_id: YouTube video ID.

    Returns:
        The transcript as one string, or ``""`` when the transcript API
        reports that no transcript can be retrieved. Returning an empty
        string (instead of raising) lets the caller's ``if not en_transcript``
        check fall back to downloading and transcribing the audio.
    """
    # Imported locally; the module itself is already a dependency of this file.
    from youtube_transcript_api import CouldNotRetrieveTranscript

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        transcript = transcript_list.find_generated_transcript(['vi', 'en'])
        transcript_data = transcript.translate('en').fetch()
    except CouldNotRetrieveTranscript as e:
        print(f"No usable transcript for video {video_id!r}: {e}")
        return ''

    # Case-insensitive comparison so that "[Music]" markers are dropped too.
    return ' '.join(
        t['text'] for t in transcript_data
        if t['text'].strip().lower() != '[music]'
    )
|
|
|
|
|
def chunk_text(text, chunk_size=1000, overlap_size=24):
    """Split ``text`` into chunks measured with the gpt-3.5-turbo tokenizer.

    Args:
        text: Text to split.
        chunk_size: Passed to the splitter as ``chunk_size``.
        overlap_size: Passed to the splitter as ``chunk_overlap``.

    Returns:
        The list of chunk strings produced by the splitter.
    """
    # from_tiktoken_encoder is called on the class directly; building a
    # throwaway default instance first is unnecessary.
    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        model_name="gpt-3.5-turbo",
        chunk_size=chunk_size,
        chunk_overlap=overlap_size,
    )
    return splitter.split_text(text=text)
|
|
|
|
|
def summarize_youtube_video(youtube_url, outputs_dir):
    """Produce English and Vietnamese summaries of a YouTube video.

    The transcript is taken from ``get_transcript``; when that yields nothing,
    the audio is downloaded, split into 10-minute chunks and transcribed. The
    English text is cleaned, translated to Vietnamese, and both versions are
    summarized.

    Args:
        youtube_url: URL of the video.
        outputs_dir: Working directory; audio files go into
            ``<outputs_dir>/<video_id>/``.

    Returns:
        A pair ``(english, vietnamese)`` where each element is the tuple
        returned by ``summary`` for that language.

    Raises:
        ValueError: If no video ID can be extracted from ``youtube_url``.
        RuntimeError: If the audio fallback produces no transcript.
    """
    video_id = get_video_id(youtube_url)
    if not video_id:
        raise ValueError(f"Could not extract a video ID from URL: {youtube_url!r}")

    en_transcript = get_transcript(video_id)
    os.makedirs(outputs_dir, exist_ok=True)

    if not en_transcript:
        # os.path.join keeps the paths valid on every platform; hard-coded
        # backslashes only work as separators on Windows.
        video_dir = os.path.join(outputs_dir, video_id)
        raw_audio_dir = os.path.join(video_dir, "raw_audio")
        chunks_dir = os.path.join(video_dir, "chunks")
        segment_length = 10 * 60  # seconds
        os.makedirs(video_dir, exist_ok=True)

        audio_filename = youtube_to_mp3(youtube_url, output_dir=raw_audio_dir)
        chunked_audio_files = chunk_audio(
            audio_filename, segment_length=segment_length, output_dir=chunks_dir
        )
        en_transcript = transcribe_audio(chunked_audio_files)
        if not en_transcript:
            # transcribe_audio returns None on API errors; without this check
            # cleaning_input would turn that into the literal text "None".
            raise RuntimeError(f"Could not transcribe audio for video {video_id!r}")

    en_transcript = cleaning_input(en_transcript)
    vi_transcript = translate_text(en_transcript)

    summ_en = summary(en_transcript, 'en')
    summ_vi = summary(vi_transcript, 'vi')
    return tuple(summ_en), tuple(summ_vi)
|
|
|
|
|
def main():
    """Streamlit entry point: read a YouTube URL, summarize it, render the result.

    After "Submit" is pressed with a non-empty URL, the video is summarized
    via ``summarize_youtube_video`` and the page shows the embedded player,
    the elapsed time, and the English/Vietnamese transcripts and summaries in
    two columns.
    """
    from html import escape

    st.set_page_config(layout="wide")

    st.title("YouTube Video Summarizer 🎥")
    st.markdown('<style>h1{color: orange; text-align: center;}</style>', unsafe_allow_html=True)
    st.subheader('Built with the GPT2, Streamlit and ❤️')
    st.markdown('<style>h3{color: pink; text-align: center;}</style>', unsafe_allow_html=True)

    with st.expander("About the App"):
        st.write("This app allows you to summarize while watching a YouTube video.")
        st.write(
            "Enter a YouTube URL in the input box below and click 'Submit' to start. This app is built by AI Anytime.")

    youtube_url = st.text_input("Enter YouTube URL")

    if st.button("Submit") and youtube_url:
        start_time = time.time()
        summ, tran = summarize_youtube_video(youtube_url, "./outputs")

        # Each element is (summary, transcript); names avoid shadowing the
        # builtin ``sum``.
        summary_en, script_en = summ[0], summ[1]
        summary_vi, script_vi = tran[0], tran[1]

        elapsed_time = time.time() - start_time

        # The URL is user input placed inside an HTML attribute: escape it.
        embed_url = escape(youtube_url.replace("watch?v=", "embed/"), quote=True)
        header_html = textwrap.dedent("""
            <div style="display: flex; justify-content: center; flex-direction: column; align-items: center;">
            <div style="width: 60%; max-width: 720px;">
            <iframe width="100%" height="315" src="{youtube_url}" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
            </div>
            <h2>Summarization of YouTube Video</h2>
            <p>Time taken: {elapsed_time:.2f} seconds</p>
            </div>
            """).format(youtube_url=embed_url, elapsed_time=elapsed_time)
        st.markdown(header_html, unsafe_allow_html=True)

        col1, col2 = st.columns(2)

        # Transcripts originate from external captions/audio and are rendered
        # with unsafe_allow_html, so they are HTML-escaped before insertion.
        with col1:
            st.subheader("Transcript english")
            st.markdown(
                f'<div style="height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">{escape(script_en)}</div>',
                unsafe_allow_html=True)
            st.subheader("Summary english")
            st.write(summary_en)

        with col2:
            st.subheader("Transcript vietnamese")
            st.markdown(
                f'<div style="height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">{escape(script_vi)}</div>',
                unsafe_allow_html=True)
            st.subheader("Summary vietnamese")
            st.write(summary_vi)
|
|
|
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
|
|
|
def chunk_overlap_text(text, chunk_size=1000, overlap_size=24):
    """Split ``text`` into overlapping chunks measured with the gpt-3.5-turbo tokenizer.

    Args:
        text: Text to split.
        chunk_size: Passed to the splitter as ``chunk_size``.
        overlap_size: Passed to the splitter as ``chunk_overlap``.

    Returns:
        The list of chunk strings produced by the splitter.
    """
    # from_tiktoken_encoder is called on the class directly; building a
    # throwaway default instance first is unnecessary.
    splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        model_name="gpt-3.5-turbo",
        chunk_size=chunk_size,
        chunk_overlap=overlap_size,
    )
    return splitter.split_text(text=text)
|
|
|
|
|
def summary(text, lang):
    """Summarize ``text`` chunk by chunk and return the summary and the chunked text.

    Args:
        text: Text to summarize.
        lang: Language code forwarded to ``generate_response``.

    Returns:
        A tuple ``(summary, text)``: the per-chunk summaries concatenated
        (each followed by a space), and the chunks themselves concatenated the
        same way. Both are empty strings when the text yields no chunks.
        NOTE(review): the chunks are created with an overlap, so the rebuilt
        text may repeat the overlapping passages; confirm this is acceptable
        for display.
    """
    chunks = chunk_overlap_text(text)

    # Debug output; guarded so empty input does not raise IndexError.
    if chunks:
        print(len(chunks[0]))
    print(f"Number of chunks: {len(chunks)}")

    rs = "".join(generate_response(chunk, lang) + " " for chunk in chunks)
    text = "".join(chunk + " " for chunk in chunks)
    return rs, text
|
|
|
|
|
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|