tl / app.py
tiennlu's picture
Update app.py
d744d35 verified
raw
history blame
15.7 kB
import os
import time
import re
from typing import Union, AnyStr
from urllib.parse import urlparse, parse_qs
import textwrap
import streamlit as st
import openai
from openai import OpenAI
from pydub import AudioSegment
from youtube_transcript_api import YouTubeTranscriptApi
from deep_translator import GoogleTranslator
import yt_dlp as youtube_dl
from transformers import AutoModelForCausalLM, GPT2Tokenizer
import torch
from tqdm import trange
import torch.nn.functional as F
# Shared OpenAI client used by the summarization and transcription helpers.
# The API key is read from the OPENAI_API_KEY environment variable so that no
# secret lives in source control. NOTE(review): the key that was previously
# committed here must be treated as leaked and revoked.
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY")
)
def generate_response(article_text, lang):
    """Summarize one piece of text with the fine-tuned chat model.

    Args:
        article_text: Text to summarize; it is embedded verbatim in the user prompt.
        lang: ``'vi'`` selects the Vietnamese prompts; any other value uses
            the English prompts.

    Returns:
        The message content of the first returned choice, stripped of
        surrounding whitespace.
    """
    if lang == 'vi':
        system_prompt = "Bạn là chuyên gia tóm tắt văn bản bằng hai ngôn ngữ: tiếng Anh và tiếng Việt"
        user_prompt = f"hãy tóm tắt văn bản sau đây một cách chuyên nghiệp và trả về bản tóm tắt theo ngôn ngữ đầu vào:\n{article_text}\nBản Tóm tắt:"
    else:
        system_prompt = "You are an expert in summarizing text in two languages: English and Vietnamese"
        user_prompt = f"summarize the following text professionally and return the summary according to the input language:\n{article_text}\nSummary:"

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    completion = client.chat.completions.create(
        model='ft:gpt-3.5-turbo-0125:personal::9eZjpJwa',
        messages=conversation,
        max_tokens=150,         # raised to leave more room for the summary
        temperature=0.3,        # lowered to produce more focused content
        top_p=0.95,             # raised slightly to widen the vocabulary range
        frequency_penalty=0.5,  # raised to encourage more varied wording
        presence_penalty=0.5,   # raised to encourage mentioning new topics
    )

    # Only the first choice is used as the summary.
    first_choice = completion.choices[0]
    return first_choice.message.content.strip()
def cleaning_input(input_text):
    """Normalize raw transcript text before it is translated and summarized.

    Steps, in order:
      1. Collapse blank lines, runs of spaces and repeated punctuation
         (``.``, ``,``, ``-``, ``_``, ``!``, ``?``) to a single character.
      2. Insert a space between a digit and an adjacent ASCII letter.
      3. Decode HTML entities.
      4. Drop every character that is not a word character, whitespace or one
         of the allowed symbols ``[ ] ( ) $ \\ . / : # < > { } , " ! @ - * =``.
      5. Collapse all remaining whitespace (including newlines) to single spaces.

    Args:
        input_text: Any object; it is converted with ``str()`` first.

    Returns:
        The cleaned text as a single-line string.
    """
    from html import unescape

    text = str(input_text)
    text = re.sub(r'\n\s*\n', '\n', text)
    text = re.sub(r'[ ]+', ' ', text)
    text = re.sub(r'\.{2,}', '.', text)
    text = re.sub(r',{2,}', ',', text)
    text = re.sub(r'-{2,}', '-', text)
    text = re.sub(r'_{2,}', '_', text)
    text = re.sub(r'!{2,}', '!', text)
    text = re.sub(r'\?{2,}', '?', text)
    text = re.sub(r'(\d)([A-Za-z])', r'\1 \2', text)
    text = re.sub(r'([A-Za-z])(\d)', r'\1 \2', text)
    text = unescape(text)
    # The hyphen is escaped as `\-` so it is a literal member of the allow-list.
    # The previous pattern spelled it `\\-\\`, which the regex engine reads as a
    # backslash-to-backslash range, so every hyphen was silently stripped
    # ("well-known" became "wellknown").
    text = re.sub(r'[^\w\s\[\]\(\)\$\\.\n\/:#<>{},_"!@\-\*=]', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text
def top_k_top_p_filtering(logits, top_k, top_p, filter_value=-float('Inf')):
    """ Mask a logits distribution using top-k and/or nucleus (top-p) filtering.

    The tensor is modified in place and the same tensor object is returned.

        Args:
            logits: logits distribution, 1-D tensor of shape (vocabulary size)
            top_k > 0: keep only top k tokens with highest probability (top-k filtering).
            top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering).
                Nucleus filtering is described in Holtzman et al. (http://arxiv.org/abs/1904.09751)
            filter_value: value written over every filtered-out position.
        From: https://gist.github.com/thomwolf/1a5a29f6962089e871b94cbd09daf317
    """
    assert logits.dim() == 1  # only a single, unbatched distribution is handled

    k = min(top_k, logits.size(-1))  # never ask topk for more entries than exist
    if k > 0:
        # Everything strictly below the k-th largest logit is masked out.
        kth_largest = torch.topk(logits, k).values[-1]
        logits[logits < kth_largest] = filter_value

    if top_p > 0.0:
        ranked_logits, ranked_indices = torch.sort(logits, descending=True)
        cumulative = F.softmax(ranked_logits, dim=-1).cumsum(dim=-1)
        over_threshold = cumulative > top_p
        # Shift the mask one step to the right so the first token that crosses
        # the threshold is kept; position 0 is therefore always kept.
        drop = torch.zeros_like(over_threshold)
        drop[1:] = over_threshold[:-1]
        logits[ranked_indices[drop]] = filter_value

    return logits
def sample_seq(model, context, length, device, temperature, top_k, top_p):
    """ Autoregressively samples ``length`` new tokens after ``context``.

        Args:
            model: gpt/gpt2 model
            context: tokenized text using gpt/gpt2 tokenizer
            length: length of generated sequence.
            device: torch.device object.
            temperature >0: used to control the randomness of predictions by scaling the logits before applying softmax.
            top_k > 0: keep only top k tokens with highest probability (top-k filtering).
            top_p > 0.0: keep the top tokens with cumulative probability >= top_p (nucleus filtering).

        Returns:
            A tensor with a leading batch dimension of 1 holding the context
            tokens followed by the sampled tokens.
    """
    # Add a batch dimension of size 1 in front of the context tokens.
    generated = torch.tensor(context, dtype=torch.long, device=device).unsqueeze(0)

    with torch.no_grad():
        for _ in trange(length):
            # The whole sequence is re-fed on every step. Note: 'past' (cached
            # hidden states) could also be used with GPT-2/Transfo-XL/XLNet.
            outputs = model(input_ids=generated)
            # Logits of the last position, scaled by the temperature.
            scaled_logits = outputs[0][0, -1, :] / temperature
            filtered = top_k_top_p_filtering(scaled_logits, top_k=top_k, top_p=top_p)
            probabilities = F.softmax(filtered, dim=-1)
            sampled = torch.multinomial(probabilities, num_samples=1)
            generated = torch.cat((generated, sampled.unsqueeze(0)), dim=1)

    return generated
def add_special_tokens(lang):
    """ Returns a GPT2 tokenizer with separator and padding tokens registered.

    ``lang == 'vi'`` loads the 'NlpHUST/gpt2-vietnamese' tokenizer; any other
    value loads the stock 'gpt2' tokenizer.
    """
    checkpoint = 'NlpHUST/gpt2-vietnamese' if lang == 'vi' else 'gpt2'
    tokenizer = GPT2Tokenizer.from_pretrained(checkpoint)
    tokenizer.add_special_tokens({'pad_token': '<|pad|>', 'sep_token': '<|sep|>'})
    return tokenizer
def gene(t, a):
    """Generate a short continuation of ``t`` with a fine-tuned GPT-2 model.

    Args:
        t: Input text; it is tokenized and truncated to its first 900 tokens.
        a: Language code. ``"vi"`` selects the Vietnamese checkpoint; any
            other value selects the English one.

    Returns:
        The 50 sampled tokens decoded back to a string, with special tokens
        skipped.
    """
    tokenizer = add_special_tokens(a)
    article = tokenizer.encode(t)[:900]

    # Choose the checkpoint first so that exactly one model is loaded. The
    # previous code always loaded the English model and then, for "vi", threw
    # it away and loaded the Vietnamese one as well.
    checkpoint = "tiennlu/GPT2vi_CNNvi_3k" if a == "vi" else "tiennlu/GPT2en_CNNen_3k"
    model = AutoModelForCausalLM.from_pretrained(checkpoint)

    generated = sample_seq(model, article, 50, torch.device('cpu'), temperature=1, top_k=10, top_p=0.5)
    # Keep only the newly sampled tokens, dropping the prompt prefix.
    new_token_ids = generated[0, len(article):].tolist()
    tokens = tokenizer.convert_ids_to_tokens(new_token_ids, skip_special_tokens=True)
    return tokenizer.convert_tokens_to_string(tokens)
def find_audio_files(path, extension=".mp3"):
    """Recursively collect the files under ``path`` whose name ends with ``extension``.

    Returns a list of paths (directory joined with file name) in the order
    ``os.walk`` yields them; the list is empty when nothing matches.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(path)
        for name in names
        if name.endswith(extension)
    ]
def youtube_to_mp3(youtube_url: str, output_dir: str) -> Union[AnyStr, str, bytes]:
    """Download the audio track of a YouTube video as MP3 into ``output_dir``.

    The directory is created when it does not exist. The return value is the
    first ``.mp3`` path that ``find_audio_files`` reports for ``output_dir``.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Post-processing step that extracts the audio to MP3 via FFmpeg.
    extract_mp3 = {
        "key": "FFmpegExtractAudio",
        "preferredcodec": "mp3",
        "preferredquality": "192",
    }
    options = {
        "format": "bestaudio/best",
        "postprocessors": [extract_mp3],
        "outtmpl": os.path.join(output_dir, "%(title)s.%(ext)s"),
        "verbose": True,
    }

    with youtube_dl.YoutubeDL(options) as downloader:
        downloader.download([youtube_url])

    mp3_files = find_audio_files(output_dir)
    return mp3_files[0]
def chunk_audio(filename, segment_length: int, output_dir):
    """Split an MP3 file into consecutive segments and write them to ``output_dir``.

    Args:
        filename: Path of the MP3 file to split.
        segment_length: Segment length in seconds; must be positive.
        output_dir: Directory that receives ``segment_<i>.mp3`` files; it is
            created (including parents) when missing.

    Returns:
        The paths of the segments written by this call, in playback order.
        Zero-length audio yields an empty list.

    Raises:
        ValueError: If ``segment_length`` is not positive.
    """
    if segment_length <= 0:
        raise ValueError("segment_length must be a positive number of seconds")

    os.makedirs(output_dir, exist_ok=True)

    # Load audio file
    audio = AudioSegment.from_mp3(filename)
    # Duration in milliseconds
    duration = len(audio)
    segment_ms = segment_length * 1000
    # Ceiling division: the previous `// ... + 1` produced an extra, empty
    # segment whenever the duration was an exact multiple of the segment length.
    num_segments = -(-duration // segment_ms)
    print(f"Chunking {num_segments} chunks...")

    # Collect the paths as they are written instead of re-scanning the
    # directory and sorting names: a lexicographic sort puts segment_10 before
    # segment_2 (scrambling the transcript order) and a directory scan would
    # also pick up stale files from earlier runs.
    chunked_audio_files = []
    for i in range(num_segments):
        start = i * segment_ms
        end = min(start + segment_ms, duration)
        segment_path = os.path.join(output_dir, f"segment_{i}.mp3")
        audio[start:end].export(segment_path, format="mp3")
        chunked_audio_files.append(segment_path)
    return chunked_audio_files
def translate_text(text):
    """Translate English text to Vietnamese with GoogleTranslator.

    The text is wrapped into pieces of at most 3500 characters, each piece is
    translated separately, and every translated piece is followed by a single
    space in the returned string.
    """
    pieces = textwrap.wrap(text, 3500)
    return "".join(
        GoogleTranslator(source='en', target='vi').translate(piece) + " "
        for piece in pieces
    )
def transcribe_audio(audio_files: list, model_name="whisper-1"):
    """Send each audio file to ``completions_with_backoff`` and join the results.

    Args:
        audio_files: Paths of the audio files, processed in the given order.
        model_name: Model name forwarded to the API call.

    Returns:
        All transcripts concatenated, each followed by one space, or ``None``
        as soon as any request fails with an ``openai.OpenAIError``.
    """
    transcripts = []
    for audio_file in audio_files:
        try:
            # The context manager closes the handle even when the request
            # fails; the previous code leaked one open file per chunk.
            with open(audio_file, "rb") as audio:
                response = completions_with_backoff(
                    model=model_name, file=audio, response_format="text"
                )
        except openai.OpenAIError as e:
            print(f"An error occurred: {e}")
            return None
        # With response_format="text" the SDK hands back a plain string, which
        # has no `.text` attribute; accept both shapes.
        text = response if isinstance(response, str) else response.text
        transcripts.append(text + " ")
    return "".join(transcripts)
import random
# define a retry decorator
def retry_with_exponential_backoff(
    func,
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 10,
    errors: tuple = (openai.RateLimitError,),
):
    """Wrap ``func`` so that selected errors are retried with exponential backoff.

    Args:
        func: Callable to wrap.
        initial_delay: Starting delay in seconds; it is multiplied before the
            first sleep, so the first wait is already longer than this value.
        exponential_base: Factor applied to the delay after every failure.
        jitter: When true, the factor is further scaled by a random value in
            ``[1, 2)``.
        max_retries: Number of retries allowed before giving up.
        errors: Exception types that trigger a retry; anything else propagates
            immediately.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.

    Raises:
        Exception: From the wrapper, once more than ``max_retries`` retryable
            failures have occurred; the last failure is chained as the cause.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped function's metadata
    def wrapper(*args, **kwargs):
        num_retries = 0
        delay = initial_delay
        while True:
            try:
                return func(*args, **kwargs)
            except errors as e:
                print(f"Error: {e}")
                num_retries += 1
                if num_retries > max_retries:
                    # Chain the last failure so its traceback is not lost.
                    raise Exception(
                        f"Maximum number of retries ({max_retries}) exceeded."
                    ) from e
                delay *= exponential_base * (1 + jitter * random.random())
                time.sleep(delay)

    return wrapper
@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):
    """Forward ``kwargs`` to the OpenAI audio translations endpoint.

    The decorator retries the call with exponential backoff on the error
    types it is configured for.
    """
    translations_api = client.audio.translations
    return translations_api.create(**kwargs)
def get_video_id(youtube_url):
    """Extract video ID from YouTube URL.

    Supported forms:
      * ``https://www.youtube.com/watch?v=<id>`` (any URL with a ``v`` query parameter)
      * ``https://youtu.be/<id>``
      * ``https://www.youtube.com/embed/<id>``, ``/shorts/<id>``, ``/live/<id>``, ``/v/<id>``

    Args:
        youtube_url: The URL as a string.

    Returns:
        The video ID, or ``None`` when the input is empty, not a string, or no
        ID can be found.
    """
    if not isinstance(youtube_url, str) or not youtube_url:
        return None

    parsed_url = urlparse(youtube_url)

    # Standard watch URLs carry the ID in the `v` query parameter.
    video_id = parse_qs(parsed_url.query).get("v")
    if video_id:
        return video_id[0]

    host = (parsed_url.hostname or "").lower()
    segments = [segment for segment in parsed_url.path.split("/") if segment]

    # Short links: the ID is the first path segment.
    if host == "youtu.be" or host.endswith(".youtu.be"):
        return segments[0] if segments else None

    # Path-based forms on the main site: /embed/<id>, /shorts/<id>, ...
    on_youtube = host in ("youtube.com", "youtube-nocookie.com") or host.endswith(
        (".youtube.com", ".youtube-nocookie.com")
    )
    if on_youtube and len(segments) >= 2 and segments[0] in ("embed", "shorts", "live", "v"):
        return segments[1]

    return None
def get_transcript(video_id):
    """Fetch the auto-generated transcript of a video, translated to English.

    A generated transcript in Vietnamese or English is looked up (in that
    order of preference), translated to English, and its text fragments are
    joined with spaces. Fragments equal to ``'[music]'`` are skipped.

    Args:
        video_id: YouTube video ID.

    Returns:
        The transcript text, or an empty string when the transcript cannot be
        retrieved (none found, disabled, not translatable, ...). Returning a
        falsy value instead of raising lets a caller that checks
        ``if not transcript`` fall back to another source.
    """
    # Local import: base class of the library's transcript-retrieval errors.
    from youtube_transcript_api import CouldNotRetrieveTranscript

    try:
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        transcript = transcript_list.find_generated_transcript(['vi', 'en'])
        transcript_data = transcript.translate('en').fetch()
    except CouldNotRetrieveTranscript as e:
        print(f"Could not retrieve a transcript for video {video_id}: {e}")
        return ""

    return ' '.join(t['text'] for t in transcript_data if t['text'] != '[music]')
def chunk_text(text, chunk_size=1000, overlap_size=24):
    """Split ``text`` into chunks using a tiktoken-based recursive splitter.

    ``chunk_size`` and ``overlap_size`` are forwarded as the splitter's
    ``chunk_size`` and ``chunk_overlap``; the "gpt-3.5-turbo" encoding is used.
    Returns whatever ``split_text`` produces for ``text``.
    """
    splitter_options = {
        "model_name": "gpt-3.5-turbo",
        "chunk_size": chunk_size,
        "chunk_overlap": overlap_size,
    }
    splitter = RecursiveCharacterTextSplitter().from_tiktoken_encoder(**splitter_options)
    return splitter.split_text(text=text)
def summarize_youtube_video(youtube_url, outputs_dir):
    """Produce English and Vietnamese summaries of a YouTube video.

    The transcript is taken from ``get_transcript``. When that yields nothing,
    the audio is downloaded, split into 10-minute chunks and transcribed
    instead. The cleaned English transcript is translated to Vietnamese and
    both versions are summarized.

    Args:
        youtube_url: URL of the video.
        outputs_dir: Working directory for downloaded audio and chunks;
            created when missing.

    Returns:
        Two tuples, ``(summary_en, transcript_en)`` and
        ``(summary_vi, transcript_vi)``, as produced by ``summary``.

    Raises:
        ValueError: If no video ID can be extracted from ``youtube_url``.
        RuntimeError: If the audio fallback is used and transcription fails.
    """
    video_id = get_video_id(youtube_url)
    if not video_id:
        raise ValueError(f"Could not extract a video ID from URL: {youtube_url!r}")

    en_transcript = get_transcript(video_id)
    os.makedirs(outputs_dir, exist_ok=True)

    if not en_transcript:
        # Build the full paths for the output directories with os.path.join so
        # they are valid on every platform; hard-coded backslashes produced
        # literal "\" characters in directory names on POSIX systems.
        video_dir = os.path.join(outputs_dir, video_id)
        raw_audio_dir = os.path.join(video_dir, "raw_audio")
        chunks_dir = os.path.join(video_dir, "chunks")
        segment_length = 10 * 60  # chunk to 10 minute segments
        os.makedirs(video_dir, exist_ok=True)

        audio_filename = youtube_to_mp3(youtube_url, output_dir=raw_audio_dir)
        chunked_audio_files = chunk_audio(
            audio_filename, segment_length=segment_length, output_dir=chunks_dir
        )
        en_transcript = transcribe_audio(chunked_audio_files)
        if en_transcript is None:
            # transcribe_audio signals failure with None; without this check
            # the literal text "None" would be cleaned and summarized.
            raise RuntimeError("Audio transcription failed; no transcript is available to summarize.")

    en_transcript = cleaning_input(en_transcript)
    vi_transcript = translate_text(en_transcript)
    summ_en = summary(en_transcript, 'en')
    summ_vi = summary(vi_transcript, 'vi')
    return tuple(summ_en), tuple(summ_vi)
def main():
    """Render the Streamlit page and run the summarization on submit.

    The page shows a title, an "about" expander and a URL input. When the
    Submit button is pressed with a non-empty URL, the video is summarized and
    the page shows the embedded video, the elapsed time, and the English and
    Vietnamese transcripts and summaries side by side.
    """
    # Local import, in the same style as cleaning_input's use of html.unescape.
    from html import escape

    st.set_page_config(layout="wide")
    st.title("YouTube Video Summarizer 🎥")
    st.markdown('<style>h1{color: orange; text-align: center;}</style>', unsafe_allow_html=True)
    st.subheader('Built with the GPT2, Streamlit and ❤️')
    st.markdown('<style>h3{color: pink; text-align: center;}</style>', unsafe_allow_html=True)

    # Expander for app details
    with st.expander("About the App"):
        st.write("This app allows you to summarize while watching a YouTube video.")
        st.write(
            "Enter a YouTube URL in the input box below and click 'Submit' to start. This app is built by AI Anytime.")

    # Input box for YouTube URL
    youtube_url = st.text_input("Enter YouTube URL")

    # Submit button
    if st.button("Submit") and youtube_url:
        start_time = time.time()  # Start the timer
        summ, tran = summarize_youtube_video(youtube_url, "./outputs")
        # Named summary_en rather than `sum` to avoid shadowing the builtin.
        summary_en, script = summ[0], summ[1]
        summary_vi, script_tran = tran[0], tran[1]
        elapsed_time = time.time() - start_time  # End the timer

        # Build the embed URL from the video ID when one can be extracted; a
        # plain text replace leaves extra query parameters (e.g. "&t=10s")
        # glued onto the path and breaks the player.
        video_id = get_video_id(youtube_url)
        if video_id:
            embed_url = f"https://www.youtube.com/embed/{video_id}"
        else:
            embed_url = youtube_url.replace("watch?v=", "embed/")

        # Centering the video and elapsed time. The URL is user input and is
        # placed inside an HTML attribute, so it is escaped.
        st.markdown("""
<div style="display: flex; justify-content: center; flex-direction: column; align-items: center;">
<div style="width: 60%; max-width: 720px;">
<iframe width="100%" height="315" src="{youtube_url}" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
</div>
<h2>Summarization of YouTube Video</h2>
<p>Time taken: {elapsed_time:.2f} seconds</p>
</div>
""".format(youtube_url=escape(embed_url, quote=True), elapsed_time=elapsed_time),
                    unsafe_allow_html=True)

        col1, col2 = st.columns(2)
        # Transcript text originates from the video and is rendered with
        # unsafe_allow_html, so it is escaped to keep any markup inert.
        with col1:
            st.subheader("Transcript english")
            st.markdown(
                f'<div style="height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">{escape(script)}</div>',
                unsafe_allow_html=True)
            st.subheader("Summary english")
            st.write(summary_en)
        with col2:
            st.subheader("Transcript vietnamese")
            st.markdown(
                f'<div style="height: 400px; overflow-y: auto; border: 1px solid #ccc; padding: 10px;">{escape(script_tran)}</div>',
                unsafe_allow_html=True)
            st.subheader("Summary vietnamese")
            st.write(summary_vi)
from langchain.text_splitter import RecursiveCharacterTextSplitter
def chunk_overlap_text(text, chunk_size=1000, overlap_size=24):
    """Split ``text`` into overlapping chunks with a tiktoken-based recursive splitter.

    ``chunk_size`` and ``overlap_size`` are forwarded as the splitter's
    ``chunk_size`` and ``chunk_overlap``; the "gpt-3.5-turbo" encoding is used.
    Returns whatever ``split_text`` produces for ``text``.
    """
    splitter = RecursiveCharacterTextSplitter().from_tiktoken_encoder(
        model_name="gpt-3.5-turbo",
        chunk_size=chunk_size,
        chunk_overlap=overlap_size,
    )
    pieces = splitter.split_text(text=text)
    return pieces
def summary(text, lang):
    """Summarize ``text`` chunk by chunk.

    Args:
        text: Text to summarize; it is split with ``chunk_overlap_text``.
        lang: Language code forwarded to ``generate_response``.

    Returns:
        A tuple ``(summaries, chunked_text)``: the per-chunk summaries and the
        chunks themselves, each item followed by a single space. Both strings
        are empty when the text yields no chunks.
    """
    chunks = chunk_overlap_text(text)
    # Guarded debug output: indexing chunks[0] unconditionally raised
    # IndexError when the text was empty.
    if chunks:
        print(len(chunks[0]))
    print(f"Number of chunks: {len(chunks)}")

    rs = "".join(generate_response(chunk, lang) + " " for chunk in chunks)
    chunked_text = "".join(chunk + " " for chunk in chunks)
    return rs, chunked_text
# Script entry point: render the app only when this file is run as the main
# module, not when it is imported.
if __name__ == "__main__":
    main()