from youtube_transcript_api import YouTubeTranscriptApi as yta
from youtube_transcript_api import NoTranscriptFound, TranscriptsDisabled
import streamlit as st
from yt_stats import YTstats
from datetime import datetime
import isodate
import pandas as pd
import deeppunkt
import time
import lexrank
import mysheet
def time_it(func):
    """Decorator that reports a function's wall-clock run time in the app.

    After each call the elapsed time is rendered via st.write as
    'Load time: <seconds> sec'.  The wrapped function's return value is
    passed through unchanged.
    """
    from functools import wraps  # local import: keeps the file's import block untouched

    @wraps(func)  # preserve __name__/__doc__ of the decorated function
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        elapsed = end - start
        st.write('Load time: '+str(round(elapsed,1))+' sec')
        return result
    return wrapper
def reset_session():
    """Drop all cached per-video results so a new video starts clean."""
    # Same three keys the original cleared, removed one by one if present.
    for key in ('punkt', 'extract', 'channel_id'):
        if key in st.session_state:
            del st.session_state[key]
def update_param_example():
    """Selectbox callback: mirror the chosen example into the ?vid= query param."""
    vid = get_id_from_link(st.session_state.ex_vid)
    st.experimental_set_query_params(vid=vid)
    # A new video invalidates every cached pipeline result.
    reset_session()
def update_param_textinput():
    """Text-input callback: mirror the typed URL/id into the ?vid= query param."""
    vid = get_id_from_link(st.session_state.ti_vid)
    st.experimental_set_query_params(vid=vid)
    # A new video invalidates every cached pipeline result.
    reset_session()
def get_link_from_id(video_id):
    """Return a full watch URL for a bare video id; pass full links through."""
    if "v=" in video_id:
        # Already looks like a complete watch URL.
        return video_id
    return 'https://www.youtube.com/watch?v=' + video_id
def get_id_from_link(link):
    """Extract the 11-char video id from a watch URL, or accept a bare id.

    Returns the literal string 'Error: Invalid Link.' for anything else
    (callers treat the return value as an id regardless).
    """
    if "v=" not in link:
        # No query parameter: either it's already a bare 11-char id or it's junk.
        return link if len(link) == 11 else "Error: Invalid Link."
    return link.split("v=")[1].split("&")[0]
# @st.cache(allow_output_mutation=True, suppress_st_warning=True)
# def retry_access_yt_object(url, max_retries=5, interval_secs=5, on_progress_callback=None):
# """
# Retries creating a YouTube object with the given URL and accessing its title several times
# with a given interval in seconds, until it succeeds or the maximum number of attempts is reached.
# If the object still cannot be created or the title cannot be accessed after the maximum number
# of attempts, the last exception is raised.
# """
# last_exception = None
# for i in range(max_retries):
# try:
# yt = YouTube(url, on_progress_callback=on_progress_callback)
# #title = yt.title # Access the title of the YouTube object.
# #views = yt.views
# return yt # Return the YouTube object if successful.
# except Exception as err:
# last_exception = err # Keep track of the last exception raised.
# st.write(f"Failed to create YouTube object or access title. Retrying... ({i+1}/{max_retries})")
# time.sleep(interval_secs) # Wait for the specified interval before retrying.
# # If the YouTube object still cannot be created or the title cannot be accessed after the maximum number of attempts, raise the last exception.
# raise last_exception
@st.cache_data()
def get_video_data(_yt, video_id):
    """Fetch display metadata for one video from the YouTube Data API.

    Parameters
    ----------
    _yt : YTstats
        API wrapper; the leading underscore tells st.cache_data not to hash it.
    video_id : str
        11-character YouTube video id.

    Returns
    -------
    tuple
        (data, keywords, channel_id): `data` is a one-row column dict ready
        for pd.DataFrame, `keywords` the author's tag list (empty if none),
        `channel_id` the owning channel's id.
    """
    yt_img = f'http://img.youtube.com/vi/{video_id}/mqdefault.jpg'
    # NOTE(review): these empty-string concatenations look like stripped HTML
    # markup for a thumbnail link — confirm against version control.
    yt_img_html = ''
    yt_img_html_link = ''+yt_img_html+''
    # BUG FIX: the body previously referenced the module-global `yt` instead of
    # the `_yt` parameter, silently ignoring the object the caller passed in.
    snippet = _yt._get_single_video_data(video_id,'snippet')
    yt_publish_date = snippet['publishedAt']
    yt_title = snippet['title']
    yt_author = snippet['channelTitle']
    yt_channel_id = snippet['channelId']
    try:
        yt_keywords = snippet['tags']
    except KeyError:  # 'tags' is absent when the author set no keywords
        yt_keywords = []
    statistics = _yt._get_single_video_data(video_id,'statistics')
    yt_views = statistics['viewCount']
    contentDetails = _yt._get_single_video_data(video_id,'contentDetails')
    yt_length = contentDetails['duration']
    # ISO-8601 duration (e.g. 'PT1H2M3S') -> 'H:MM:SS'-style display string.
    yt_length_isodate = isodate.parse_duration(yt_length)
    yt_length_isoformat = isodate.duration_isoformat(yt_length_isodate, "%H:%M:%S")[1:]
    data = {'Video':[yt_img_html_link],
            'Author': [yt_author],
            'Title': [yt_title],
            'Published': [datetime.strptime(yt_publish_date, '%Y-%m-%dT%H:%M:%SZ').strftime('%B %d, %Y')],
            'Views':[format(int(yt_views), ",").replace(",", "'")],  # 1'234'567 style
            'Length':[yt_length_isoformat]}
    return data, yt_keywords, yt_channel_id
@st.cache_data()
def get_video_data_from_gsheed(df, video_id):
    """Read cached video metadata from the Google-sheet DataFrame.

    Returns the same (data, keywords, channel_id) triple as get_video_data,
    but sourced from the sheet row whose 'ID' column equals *video_id*.
    Assumes exactly one matching row exists (callers check membership first).
    """
    # Select the matching row once instead of re-filtering the frame per column.
    row = df.loc[df["ID"] == video_id].iloc[0]
    yt_keywords = row['Keywords'].split(';')  # keywords are stored ';'-joined
    data = {'Video':[row['Video']],
            'Author': [row['Author']],
            'Title': [row['Title']],
            'Published': [row['Published']],
            'Views':[row['Views']],
            'Length':[row['Length']]}
    return data, yt_keywords, row['Channel']
@time_it
def get_punctuated_text(raw_text):
    """Punctuate *raw_text* via the deeppunkt service and cache the result
    (a [text, words, sentences, chars, tokens] list) in session state."""
    st.session_state['punkt'] = deeppunkt.predict('sentences', raw_text)
def get_punctuated_text_to_dict(raw_text):
    """Cache already-punctuated text using the same list layout deeppunkt
    returns, with all metric slots zeroed."""
    st.session_state['punkt'] = [raw_text] + [0] * 4
@time_it
def get_extracted_text(raw_text):
    """Run lexrank extractive summarization on *raw_text* and cache the
    result in session state."""
    st.session_state['extract'] = lexrank.summarize(raw_text)
def get_extracted_text_to_dict(raw_text):
    """Cache precomputed extract text using the same list layout lexrank
    returns, with all metric slots zeroed."""
    st.session_state['extract'] = [raw_text] + [0] * 4
def get_videos_from_yt(yt, item_limit=3):
    """Build a column dict describing up to *item_limit* videos from yt.video_data.

    Parameters
    ----------
    yt : YTstats
        Wrapper whose .video_data maps video ids to metadata dicts
        ('title', 'duration', 'publishedAt', 'viewCount').
    item_limit : int, optional
        Maximum number of videos included.  BUG FIX: this was previously read
        from a module-level global (a NameError if unset); both call sites set
        it to 3, so a defaulted parameter is backward-compatible.

    Returns
    -------
    dict
        Column-name -> list mapping, ready for pd.DataFrame.
    """
    vids_thumbnails = []
    vids_videoIds = []
    vids_titles = []
    vids_lengths = []
    vids_published = []
    vids_views = []
    for item, video in enumerate(yt.video_data):
        if item == item_limit:
            break
        vids_video_id = video
        yt_img = f'http://img.youtube.com/vi/{vids_video_id}/mqdefault.jpg'
        # NOTE(review): the empty-string concatenations below look like stripped
        # HTML markup for thumbnail/id links — confirm against version control.
        yt_img_html = ''
        yt_img_html_link = '' + yt_img_html + ''
        vids_thumbnails.append(yt_img_html_link)
        vids_video_id_link = '' + vids_video_id + ''
        vids_videoIds.append(vids_video_id_link)
        vids_titles.append(yt.video_data[video]['title'])
        # ISO-8601 duration -> 'H:MM:SS'-style display string.
        yt_length = yt.video_data[video]['duration']
        yt_length_isodate = isodate.parse_duration(yt_length)
        yt_length_isoformat = isodate.duration_isoformat(yt_length_isodate, "%H:%M:%S")[1:]
        vids_lengths.append(yt_length_isoformat)
        yt_publish_date = yt.video_data[video]['publishedAt']
        yt_publish_date_formatted = datetime.strptime(yt_publish_date, '%Y-%m-%dT%H:%M:%SZ').strftime('%B %d, %Y')
        vids_published.append(yt_publish_date_formatted)
        yt_views = yt.video_data[video]['viewCount']
        yt_views_formatted = format(int(yt_views), ",").replace(",", "'")  # 1'234'567 style
        vids_views.append(yt_views_formatted)
    df_videos = {'Video': vids_thumbnails,
                 'Video ID': vids_videoIds,
                 'Title': vids_titles,
                 'Published': vids_published,
                 'Views': vids_views,
                 'Length': vids_lengths}
    return df_videos
def get_transcript(video_id):
    """Fetch the English transcript of a video.

    Returns
    -------
    tuple
        (transcript_text, is_generated) on success; (None, False) when no
        transcript is available.  BUG FIX: the function previously returned a
        bare None, which broke tuple unpacking at the call site, and never
        caught the NoTranscriptFound/TranscriptsDisabled exceptions imported
        at the top of the file.
    """
    try:
        transcript_list = yta.list_transcripts(video_id)
        transcript_item = transcript_list.find_transcript(['en'])
        transcript_item_is_generated = transcript_item.is_generated
        transcript_raw = transcript_item.fetch()
    except (NoTranscriptFound, TranscriptsDisabled):
        return None, False
    if transcript_raw is None:
        return None, False
    # One caption segment per line; embedded newlines collapsed to spaces.
    transcript_text = '\n'.join([i['text'].replace('\n', ' ') for i in transcript_raw])
    return transcript_text, transcript_item_is_generated
def get_meta_info(video_id, url):
    """Assemble a one-row DataFrame of video metadata plus extraction metrics,
    ready to be appended to the Google sheet.  *url* is accepted for interface
    compatibility but not used."""
    yt_img = f'http://img.youtube.com/vi/{video_id}/mqdefault.jpg'
    yt_img_html = ''
    yt_img_html_link = '' + yt_img_html + ''
    # Hoist the repeated session-state lookups.
    video_data = st.session_state["video_data"]
    extract = st.session_state.extract
    video_info = {'ID': [video_id],
                  'Video': [yt_img_html_link],
                  'Author': [video_data["Author"][0]],
                  'Channel': [st.session_state["channel_id"]],
                  'Title': [video_data["Title"][0]],
                  'Published': [video_data["Published"][0]],
                  'Views': [video_data["Views"][0]],
                  'Length': [video_data["Length"][0]],
                  'Keywords': ['; '.join(st.session_state["keywords"])]}
    transcript_info = {'Words': [int(extract[1])],
                       'Sentences': [int(extract[2])],
                       'Characters': [int(extract[3])],
                       'Tokens': [int(extract[4])],
                       'Lextext': [extract[0]],
                       'GPTSummary': [0]}
    df_current_ts = pd.DataFrame({**video_info, **transcript_info})
    return df_current_ts
#######################################################################################
# Application Start
#######################################################################################
st.title("Transcriptifier")
st.subheader("Youtube Transcript Downloader")
# Curated demo videos offered in the example selectbox below.
example_urls = [
'https://www.youtube.com/watch?v=8uQDDUfGNPA', # blog
'https://www.youtube.com/watch?v=ofZEo0Rzo5s', # h-educate
'https://www.youtube.com/watch?v=ReHGSGwV4-A', #wholesale ted
'https://www.youtube.com/watch?v=n8JHnLgodRI', #kevindavid
'https://www.youtube.com/watch?v=6MI0f6YjJIk', # Nicholas
'https://www.youtube.com/watch?v=nr4kmlTr9xw', # Linus
'https://www.youtube.com/watch?v=64Izfm24FKA', # Yannic
'https://www.youtube.com/watch?v=Mt1P7p9HmkU', # Fogarty
'https://www.youtube.com/watch?v=bj9snrsSook', #Geldschnurrbart
'https://www.youtube.com/watch?v=0kJz0q0pvgQ', # fcc
'https://www.youtube.com/watch?v=gNRGkMeITVU', # iman
'https://www.youtube.com/watch?v=vAuQuL8dlXo', #ghiorghiu
'https://www.youtube.com/watch?v=5scEDopRAi0', #infohaus
'https://www.youtube.com/watch?v=lCnHfTHkhbE', #fcc tutorial
'https://www.youtube.com/watch?v=QI2okshNv_4'
]
# Restore a video id from the ?vid= query parameter so app links are shareable.
par_vid = st.experimental_get_query_params().get("vid")
if par_vid:
par_url = par_vid[0]
else:
par_url = None
# The text input takes priority: it is prefilled from the query param if present,
# otherwise from the example selectbox.  Both widgets sync back to ?vid= on change.
select_examples = st.selectbox(label="Choose an example",options=example_urls, key='ex_vid', on_change=update_param_example)
url = st.text_input("Or Enter the YouTube video URL or ID:", value=par_url if par_url else select_examples, key='ti_vid', on_change=update_param_textinput)
########################
# Load the data for a given video
########################
API_KEY = st.secrets["api_key"]
yt = YTstats(API_KEY)
#yt = retry_access_yt_object(get_link_from_id(url))
if url:
video_id = get_id_from_link(url)
# The Google sheet acts as a cache of previously processed videos; read it once
# per session and keep it in session state.
if 'gsheed' not in st.session_state:
df = mysheet.read_gspread()
st.session_state.gsheed = df
#st.write("reading spradsheet")
else:
df = st.session_state.gsheed
#st.write("getting spreadsheed from session_state")
gslist=[]
try:
gslist = df.ID.to_list()
# NOTE(review): bare except silently hides any failure (missing 'ID' column,
# df not a DataFrame) — consider narrowing to AttributeError/KeyError.
except:
st.write('no items available.')
# Cache hit: reuse the sheet row; cache miss: hit the YouTube Data API.
if video_id in gslist:
#st.write(df.loc[df["ID"] == video_id])
st.write("reading from sheet")
#transcript_item_is_generated = False
#transcript_text = df.loc[df["ID"] == video_id]['Punkttext'].to_list()[0]
#get_punctuated_text_to_dict(transcript_text)
extracted_text = df.loc[df["ID"] == video_id]['Lextext'].to_list()[0]
get_extracted_text_to_dict(extracted_text)
video_data, yt_keywords, yt_channel_id = get_video_data_from_gsheed(df, video_id)
else:
st.write("reading from api")
video_data, yt_keywords, yt_channel_id = get_video_data(yt, video_id)
st.session_state["video_data"] = video_data
st.session_state["keywords"] = yt_keywords
st.session_state["channel_id"] = yt_channel_id
df = pd.DataFrame(st.session_state["video_data"])
# Render without the index column; Video cell may contain raw HTML.
st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
st.write("")
###########################
# Load Transcript
###########################
# NOTE(review): get_transcript returns a bare None when no transcript exists,
# which would raise TypeError on this tuple unpack before the None check below
# can run — get_transcript should return (None, False) instead.
transcript_text, transcript_item_is_generated = get_transcript(video_id)
if transcript_text is None:
st.error("No transcript available.")
st.stop()
########################
# Load Author Keywords, that are not viewable by users
########################
keywords_data = {'Authors Keywords':yt_keywords}
st.table(keywords_data)
st.write("")
# TODO
# or this video (bj9snrsSook) transcripts are available in the following languages:
# (MANUALLY CREATED)
# None
# (GENERATED)
# - de ("Deutsch (automatisch erzeugt)")[TRANSLATABLE]
# (TRANSLATION LANGUAGES)
# - af ("Afrikaans")
########################
# Display the transcript along with the download button
########################
with st.expander('Preview Transcript'):
st.code(transcript_text, language=None)
st.download_button('Download Transcript', transcript_text)
########################
# API Call to deeppunkt-gr
########################
st.subheader("Restore Punctuations of Transcript")
# Author-provided (non-generated) transcripts already carry punctuation.
if not transcript_item_is_generated:
st.write("Transcript is punctuated by author.")
# TODO
#check if the transcript contains more than 5 sentences
if st.button('Load Punctuated Transcript'):
with st.spinner('Loading Punctuation...'):
# Only call the (slow) punctuation service once per session and only for
# auto-generated transcripts; authored ones are cached as-is.
if 'punkt' not in st.session_state:
# first figure out if transcript is already punctuated
if transcript_item_is_generated:
get_punctuated_text(transcript_text)
else:
get_punctuated_text_to_dict(transcript_text)
#st.write('Load time: '+str(round(st.session_state.punkt['duration'],1))+' sec')
# punkt layout: [text, words, sentences, characters, tokens]
metrics_data = {'Words':[int(st.session_state.punkt[1])],
'Sentences': [int(st.session_state.punkt[2])],
'Characters': [int(st.session_state.punkt[3])],
'Tokens':[int(st.session_state.punkt[4])]}
df = pd.DataFrame(metrics_data)
st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
st.write("")
with st.expander('Preview Transcript'):
st.code(st.session_state.punkt[0], language=None)
########################
# Call to lexrank-gr
########################
st.subheader("Extract Core Sentences from Transcript")
if st.button('Extract Sentences'):
# decide if the extract is already available, if not, text has to be punctuated first
with st.spinner('Loading Extractions ...'):
if 'extract' not in st.session_state:
with st.spinner('Loading Punctuation for Extraction ...'):
if 'punkt' not in st.session_state:
# first figure out if transcript is already punctuated
if transcript_item_is_generated:
get_punctuated_text(transcript_text)
else:
get_punctuated_text_to_dict(transcript_text)
get_extracted_text(st.session_state.punkt[0])
# extract layout: [text, words, sentences, characters, tokens]
metrics_data = {'Words':[int(st.session_state.extract[1])],
'Sentences': [int(st.session_state.extract[2])],
'Characters': [int(st.session_state.extract[3])],
'Tokens':[int(st.session_state.extract[4])]}
df = pd.DataFrame(metrics_data)
st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
st.write("")
with st.expander('Preview Transcript'):
st.code(st.session_state.extract[0], language=None)
################
# Persist the extraction result: append a row for this video to the Google
# sheet unless it is already present.
if 'extract' not in st.session_state:
st.error('Please run extraction first.', icon="🚨")
else:
df_current_ts = get_meta_info(video_id, url)
# initial write.
#df_new_sheet = pd.concat([df_sheet,df_current_ts])
#mysheet.write_gspread(df_new_sheet)
#st.write(video_info)
if 'gsheed' not in st.session_state:
df = mysheet.read_gspread()
st.session_state.gsheed = df
df_sheet = st.session_state.gsheed
df_current_ts_id = list(df_current_ts.ID)[0]
# Dedupe by video id before appending and rewriting the sheet.
if df_current_ts_id not in list(df_sheet.ID):
df_new_sheet = pd.concat([df_sheet,df_current_ts])
mysheet.write_gspread(df_new_sheet)
st.session_state.gsheed = df_new_sheet
st.write('video added to sheet')
#else:
# st.write('video already in sheet')
# st.write(df_sheet)
#######################
# write to gspread file
########################
if st.button('Read Spreadsheet'):
if 'gsheed' not in st.session_state:
df = mysheet.read_gspread()
st.session_state.gsheed = df
st.write(st.session_state.gsheed)
#if st.button('Add to Spreadsheet'):
#if st.button('Add to Spreadsheet'):
#######################
# API Call to summarymachine
########################
# def get_summarized_text(raw_text):
# response = requests.post("https://wldmr-summarymachine.hf.space/run/predict", json={
# "data": [
# raw_text,
# ]})
# #response_id = response
# if response.status_code == 504:
# raise "Error: Request took too long (>60sec), please try a shorter text."
# return response.json()
# st.subheader("Summarize Extracted Sentences with Flan-T5-large")
# if st.button('Summarize Sentences'):
# command = 'Summarize the transcript in one sentence:\n\n'
# with st.spinner('Loading Punctuation (Step 1/3)...'):
# if 'punkt' not in st.session_state:
# # first figure out if transcript is already punctuated
# if transcript_item.is_generated:
# get_punctuated_text(transcript_text)
# else:
# get_punctuated_text_to_dict(transcript_text)
# with st.spinner('Loading Extraction (Step 2/3)...'):
# if 'extract' not in st.session_state:
# get_extracted_text(st.session_state.punkt['data'][0])
# with st.spinner('Loading Summary (Step 3/3)...'):
# summary_text = get_summarized_text(command+st.session_state.extract['data'][0])
# st.write('Load time: '+str(round(summary_text['duration'],1))+' sec')
# with st.expander('Preview Transcript'):
# st.write(summary_text['data'][0], language=None)
########################
# Channel
########################
st.subheader("Other Videos of the Channel")
#st.write(st.session_state["channel_id"])
if 'channel_id' not in st.session_state:
st.error('Channel ID not available.', icon="🚨")
else:
# Populates yt.channel_statistics as a side effect.
yt.get_channel_statistics(st.session_state["channel_id"])
stats_data = {'Channel ID': [st.session_state["channel_id"]],
'Total Views':[format(int(yt.channel_statistics["viewCount"]), ",").replace(",", "'")],
'Total Subscribers':[format(int(yt.channel_statistics["subscriberCount"]), ",").replace(",", "'")],
'Total Videos':[format(int(yt.channel_statistics["videoCount"]), ",").replace(",", "'")],
}
df = pd.DataFrame(stats_data)
st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
st.write("")
if st.button('Load Videos'):
if 'gsheed' not in st.session_state:
df = mysheet.read_gspread()
st.session_state.gsheed = df
progress_text = 'Loading...'
loading_bar = st.progress(0, text=progress_text)
# NOTE(review): item_limit is assigned here as a module-level global that
# get_videos_from_yt reads implicitly — fragile; prefer passing it explicitly.
item_limit=3
df = st.session_state.gsheed
yt.get_channel_video_data(st.session_state["channel_id"],df, loading_bar, progress_text, item_limit)
df_videos = get_videos_from_yt(yt)
dataset = pd.DataFrame(df_videos)
st.markdown(dataset.style.hide(axis="index").to_html(), unsafe_allow_html=True)
########################
# Sequence Loader
########################
st.subheader("Sequence Loader")
# input hash as secret
input_hash = st.text_input("Enter Hash:")
# Batch pipeline (punctuate + extract) over the channel's videos, gated by a
# shared-secret hash so casual visitors cannot trigger the expensive calls.
if st.button('Load Sequence'):
HASH_KEY = st.secrets["hash_key"]
if input_hash == HASH_KEY:
st.write("Access granted")
# read in spreadsheet
if 'gsheed' not in st.session_state:
df = mysheet.read_gspread()
st.session_state.gsheed = df
progress_text = 'Loading...'
loading_bar = st.progress(0, text=progress_text)
item_limit=3
df = st.session_state.gsheed
yt.get_channel_video_data(st.session_state["channel_id"], df,loading_bar, progress_text, item_limit)
df_videos = get_videos_from_yt(yt)
dataset = pd.DataFrame(df_videos)
for sng in dataset['Video ID']:
# NOTE(review): sng.find('') is always 0, so this slice always yields '' —
# the expression looks like it once parsed an id out of an HTML anchor whose
# markup has been stripped from this source; verify against version control.
subsng = sng[sng.find('>')+1:sng.find('')]
print(subsng)
transcript_text, transcript_item_is_generated = get_transcript(subsng)
if transcript_item_is_generated:
get_punctuated_text(transcript_text)
else:
get_punctuated_text_to_dict(transcript_text)
get_extracted_text(st.session_state.punkt[0])
else:
st.write("Access denied")
###############
# End of File #
###############
# hide_streamlit_style = """
#
# """
# st.markdown(hide_streamlit_style, unsafe_allow_html=True)