from youtube_transcript_api import YouTubeTranscriptApi as yta
from youtube_transcript_api import NoTranscriptFound, TranscriptsDisabled
import streamlit as st
from yt_stats import YTstats
from datetime import datetime
import isodate
import pandas as pd
import deeppunkt
import time
import lexrank
import mysheet


def time_it(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        elapsed = end - start
        #st.write(f"Elapsed time: {end - start}")
        st.write('Load time: '+str(round(elapsed,1))+' sec')
        return result
    return wrapper


def reset_session():
    if 'punkt' in st.session_state:
        del st.session_state.punkt
    if 'extract' in st.session_state:
        del st.session_state.extract
    if 'channel_id' in st.session_state:
        del st.session_state.channel_id


def update_param_example():
    #st.session_state.url_vid = st.session_state.ex_vid
    video_id = get_id_from_link(st.session_state.ex_vid)
    st.experimental_set_query_params(vid=video_id)
    reset_session()


def update_param_textinput():
    #st.session_state.url_vid = st.session_state.ti_vid
    video_id = get_id_from_link(st.session_state.ti_vid)
    st.experimental_set_query_params(vid=video_id)
    reset_session()


def get_link_from_id(video_id):
    if "v=" not in video_id:
        return 'https://www.youtube.com/watch?v='+video_id
    else:
        return video_id


def get_id_from_link(link):
    if "v=" in link:
        return link.split("v=")[1].split("&")[0]
    elif len(link) == 11:
        return link
    else:
        return "Error: Invalid Link."
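
# Usage sketch for the two link helpers above (values are illustrative):
#   get_id_from_link('https://www.youtube.com/watch?v=8uQDDUfGNPA&t=42')  # -> '8uQDDUfGNPA'
#   get_id_from_link('8uQDDUfGNPA')   # -> '8uQDDUfGNPA' (a bare 11-character ID passes through)
#   get_link_from_id('8uQDDUfGNPA')   # -> 'https://www.youtube.com/watch?v=8uQDDUfGNPA'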

# @st.cache(allow_output_mutation=True, suppress_st_warning=True)
# def retry_access_yt_object(url, max_retries=5, interval_secs=5, on_progress_callback=None):
#     """
#     Retries creating a YouTube object with the given URL and accessing its title several times
#     with a given interval in seconds, until it succeeds or the maximum number of attempts is reached.
#     If the object still cannot be created or the title cannot be accessed after the maximum number
#     of attempts, the last exception is raised.
#     """
#     last_exception = None
#     for i in range(max_retries):
#         try:
#             yt = YouTube(url, on_progress_callback=on_progress_callback)
#             #title = yt.title  # Access the title of the YouTube object.
#             #views = yt.views
#             return yt  # Return the YouTube object if successful.
#         except Exception as err:
#             last_exception = err  # Keep track of the last exception raised.
#             st.write(f"Failed to create YouTube object or access title. Retrying... ({i+1}/{max_retries})")
#             time.sleep(interval_secs)  # Wait for the specified interval before retrying.
#     # If the YouTube object still cannot be created or the title cannot be accessed
#     # after the maximum number of attempts, raise the last exception.
#     raise last_exception


@st.cache_data()
def get_video_data(_yt, video_id):
    yt_img = f'http://img.youtube.com/vi/{video_id}/mqdefault.jpg'
    # Thumbnail markup, rendered later via unsafe_allow_html (the img width is an assumed value)
    yt_img_html = '<img src="'+yt_img+'" width="200">'
    yt_img_html_link = '<a href="https://www.youtube.com/watch?v='+video_id+'">'+yt_img_html+'</a>'
    snippet = _yt._get_single_video_data(video_id, 'snippet')
    yt_publish_date = snippet['publishedAt']
    yt_title = snippet['title']
    yt_author = snippet['channelTitle']
    yt_channel_id = snippet['channelId']
    try:
        yt_keywords = snippet['tags']
    except KeyError:
        yt_keywords = []
    statistics = _yt._get_single_video_data(video_id, 'statistics')
    yt_views = statistics['viewCount']
    contentDetails = _yt._get_single_video_data(video_id, 'contentDetails')
    yt_length = contentDetails['duration']
    yt_length_isodate = isodate.parse_duration(yt_length)
    yt_length_isoformat = isodate.duration_isoformat(yt_length_isodate, "%H:%M:%S")[1:]
    data = {'Video': [yt_img_html_link],
            'Author': [yt_author],
            'Title': [yt_title],
            'Published': [datetime.strptime(yt_publish_date, '%Y-%m-%dT%H:%M:%SZ').strftime('%B %d, %Y')],
            'Views': [format(int(yt_views), ",").replace(",", "'")],
            'Length': [yt_length_isoformat]}
    return data, yt_keywords, yt_channel_id


@st.cache_data()
def get_video_data_from_gsheed(df, video_id):
    row = df.loc[df["ID"] == video_id]
    yt_img_html_link = row['Video'].to_list()[0]
    yt_author = row['Author'].to_list()[0]
    yt_title = row['Title'].to_list()[0]
    yt_publish_date = row['Published'].to_list()[0]
    yt_views = row['Views'].to_list()[0]
    yt_length_isoformat = row['Length'].to_list()[0]
    yt_keywords = row['Keywords'].to_list()[0].split(';')
    yt_channel_id = row['Channel'].to_list()[0]
    data = {'Video': [yt_img_html_link],
            'Author': [yt_author],
            'Title': [yt_title],
            'Published': [yt_publish_date],
            'Views': [yt_views],
            'Length': [yt_length_isoformat]}
    return data, yt_keywords, yt_channel_id


@time_it
def get_punctuated_text(raw_text):
    response = deeppunkt.predict('sentences', raw_text)
    st.session_state['punkt'] = response


def get_punctuated_text_to_dict(raw_text):
    #st.session_state['punkt'] = {'data':[raw_text,0,0,0,0], 'duration':0}
    st.session_state['punkt'] = [raw_text, 0, 0, 0, 0]


@time_it
def get_extracted_text(raw_text):
    response = lexrank.summarize(raw_text)
    st.session_state['extract'] = response


def get_extracted_text_to_dict(raw_text):
    st.session_state['extract'] = [raw_text, 0, 0, 0, 0]


def get_videos_from_yt(yt):
    vids_thumbnails = []
    vids_videoIds = []
    vids_titles = []
    vids_lengths = []
    vids_published = []
    vids_views = []
    item = 0
    for video in yt.video_data:
        # item_limit is set at module level before this function is called
        if item == item_limit:
            break
        item = item + 1
        vids_video_id = video
        vids_url = 'https://www.youtube.com/watch?v='+vids_video_id
        yt_img = f'http://img.youtube.com/vi/{vids_video_id}/mqdefault.jpg'
        yt_img_html = '<img src="'+yt_img+'" width="200">'
        yt_img_html_link = '<a href="'+vids_url+'">'+yt_img_html+'</a>'
        vids_thumbnails.append(yt_img_html_link)
        vids_video_id_link = '<a href="'+vids_url+'">'+vids_video_id+'</a>'
        vids_videoIds.append(vids_video_id_link)
        vids_titles.append(yt.video_data[video]['title'])
        yt_length = yt.video_data[video]['duration']
        yt_length_isodate = isodate.parse_duration(yt_length)
        yt_length_isoformat = isodate.duration_isoformat(yt_length_isodate, "%H:%M:%S")[1:]
        vids_lengths.append(yt_length_isoformat)
        yt_publish_date = yt.video_data[video]['publishedAt']
        yt_publish_date_formatted = datetime.strptime(yt_publish_date, '%Y-%m-%dT%H:%M:%SZ').strftime('%B %d, %Y')
        vids_published.append(yt_publish_date_formatted)
        yt_views = yt.video_data[video]['viewCount']
        yt_views_formatted = format(int(yt_views), ",").replace(",", "'")
        vids_views.append(yt_views_formatted)
    df_videos = {'Video': vids_thumbnails,
                 'Video ID': vids_videoIds,
                 'Title': vids_titles,
                 'Published': vids_published,
                 'Views': vids_views,
                 'Length': vids_lengths}
    return df_videos


def get_transcript(video_id):
    try:
        transcript_list = yta.list_transcripts(video_id)
        transcript_item = transcript_list.find_transcript(['en'])
        transcript_item_is_generated = transcript_item.is_generated
        transcript_raw = transcript_item.fetch()
    except (NoTranscriptFound, TranscriptsDisabled):
        return None, None
    if transcript_raw is None:
        return None, None
    transcript_text = '\n'.join([i['text'].replace('\n', ' ') for i in transcript_raw])
    return transcript_text, transcript_item_is_generated


def get_meta_info(video_id, url):
    yt_img = f'http://img.youtube.com/vi/{video_id}/mqdefault.jpg'
    yt_img_html = '<img src="'+yt_img+'" width="200">'
    yt_img_html_link = '<a href="'+url+'">'+yt_img_html+'</a>'
    video_info = {'ID': [video_id],
                  'Video': [yt_img_html_link],
                  'Author': [st.session_state["video_data"]["Author"][0]],
                  'Channel': [st.session_state["channel_id"]],
                  'Title': [st.session_state["video_data"]["Title"][0]],
                  'Published': [st.session_state["video_data"]["Published"][0]],
                  'Views': [st.session_state["video_data"]["Views"][0]],
                  'Length': [st.session_state["video_data"]["Length"][0]],
                  'Keywords': ['; '.join(st.session_state["keywords"])]}
    transcript_info = {'Words': [int(st.session_state.extract[1])],
                       'Sentences': [int(st.session_state.extract[2])],
                       'Characters': [int(st.session_state.extract[3])],
                       'Tokens': [int(st.session_state.extract[4])],
                       'Lextext': [st.session_state.extract[0]],
                       'GPTSummary': [0]}
    df_current_ts = pd.DataFrame({**video_info, **transcript_info})
    return df_current_ts

#######################################################################################
# Application Start
#######################################################################################

st.title("Transcriptifier")
st.subheader("Youtube Transcript Downloader")

example_urls = [
    'https://www.youtube.com/watch?v=8uQDDUfGNPA',  # blog
    'https://www.youtube.com/watch?v=ofZEo0Rzo5s',  # h-educate
    'https://www.youtube.com/watch?v=ReHGSGwV4-A',  # wholesale ted
    'https://www.youtube.com/watch?v=n8JHnLgodRI',  # kevindavid
    'https://www.youtube.com/watch?v=6MI0f6YjJIk',  # Nicholas
    'https://www.youtube.com/watch?v=nr4kmlTr9xw',  # Linus
    'https://www.youtube.com/watch?v=64Izfm24FKA',  # Yannic
    'https://www.youtube.com/watch?v=Mt1P7p9HmkU',  # Fogarty
    'https://www.youtube.com/watch?v=bj9snrsSook',  # Geldschnurrbart
    'https://www.youtube.com/watch?v=0kJz0q0pvgQ',  # fcc
    'https://www.youtube.com/watch?v=gNRGkMeITVU',  # iman
    'https://www.youtube.com/watch?v=vAuQuL8dlXo',  # ghiorghiu
    'https://www.youtube.com/watch?v=5scEDopRAi0',  # infohaus
    'https://www.youtube.com/watch?v=lCnHfTHkhbE',  # fcc tutorial
    'https://www.youtube.com/watch?v=QI2okshNv_4'
]

par_vid = st.experimental_get_query_params().get("vid")
if par_vid:
    par_url = par_vid[0]
else:
    par_url = None

select_examples = st.selectbox(label="Choose an example", options=example_urls, key='ex_vid', on_change=update_param_example)
url = st.text_input("Or Enter the YouTube video URL or ID:", value=par_url if par_url else select_examples, key='ti_vid', on_change=update_param_textinput)

########################
# Load the data for a given video
########################

API_KEY = st.secrets["api_key"]
yt = YTstats(API_KEY)
#yt = retry_access_yt_object(get_link_from_id(url))

if url:
    video_id = get_id_from_link(url)

if 'gsheed' not in st.session_state:
    df = mysheet.read_gspread()
    st.session_state.gsheed = df
    #st.write("reading spreadsheet")
else:
    df = st.session_state.gsheed
    #st.write("getting spreadsheet from session_state")

gslist = []
try:
    gslist = df.ID.to_list()
except AttributeError:
    st.write('no items available.')

if video_id in gslist:
    #st.write(df.loc[df["ID"] == video_id])
    st.write("reading from sheet")
    #transcript_item_is_generated = False
    #transcript_text = df.loc[df["ID"] == video_id]['Punkttext'].to_list()[0]
    #get_punctuated_text_to_dict(transcript_text)
    extracted_text = df.loc[df["ID"] == video_id]['Lextext'].to_list()[0]
    get_extracted_text_to_dict(extracted_text)
    video_data, yt_keywords, yt_channel_id = get_video_data_from_gsheed(df, video_id)
else:
    st.write("reading from api")
    video_data, yt_keywords, yt_channel_id = get_video_data(yt, video_id)

st.session_state["video_data"] = video_data
st.session_state["keywords"] = yt_keywords
st.session_state["channel_id"] = yt_channel_id

df = pd.DataFrame(st.session_state["video_data"])
st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
st.write("")

###########################
# Load Transcript
###########################

transcript_text, transcript_item_is_generated = get_transcript(video_id)

if transcript_text is None:
    st.error("No transcript available.")
    st.stop()

########################
# Load Author Keywords, which are not viewable by users
########################

keywords_data = {'Author Keywords': yt_keywords}
st.table(keywords_data)
st.write("")

# TODO
# For this video (bj9snrsSook) transcripts are available in the following languages:
# (MANUALLY CREATED) None
# (GENERATED)
# - de ("Deutsch (automatisch erzeugt)") [TRANSLATABLE]
# (TRANSLATION LANGUAGES)
# - af ("Afrikaans")

########################
# Display the transcript along with the download button
########################

with st.expander('Preview Transcript'):
    st.code(transcript_text, language=None)
st.download_button('Download Transcript', transcript_text)

########################
# API Call to deeppunkt-gr
########################

st.subheader("Restore Punctuation of Transcript")

if not transcript_item_is_generated:
    st.write("Transcript is punctuated by author.")

# TODO: check if the transcript contains more than 5 sentences

if st.button('Load Punctuated Transcript'):
    with st.spinner('Loading Punctuation...'):
        if 'punkt' not in st.session_state:
            # first figure out if the transcript is already punctuated
            if transcript_item_is_generated:
                get_punctuated_text(transcript_text)
            else:
                get_punctuated_text_to_dict(transcript_text)
    #st.write('Load time: '+str(round(st.session_state.punkt['duration'],1))+' sec')
    metrics_data = {'Words': [int(st.session_state.punkt[1])],
                    'Sentences': [int(st.session_state.punkt[2])],
                    'Characters': [int(st.session_state.punkt[3])],
                    'Tokens': [int(st.session_state.punkt[4])]}
    df = pd.DataFrame(metrics_data)
    st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
    st.write("")
    with st.expander('Preview Transcript'):
        st.code(st.session_state.punkt[0], language=None)

########################
# Call to lexrank-gr
########################

st.subheader("Extract Core Sentences from Transcript")

if st.button('Extract Sentences'):
    # decide if the extract is already available; if not, the text has to be punctuated first
    with st.spinner('Loading Extractions ...'):
        if 'extract' not in st.session_state:
            with st.spinner('Loading Punctuation for Extraction ...'):
                if 'punkt' not in st.session_state:
                    # first figure out if the transcript is already punctuated
                    if transcript_item_is_generated:
                        get_punctuated_text(transcript_text)
                    else:
                        get_punctuated_text_to_dict(transcript_text)
            get_extracted_text(st.session_state.punkt[0])
    metrics_data = {'Words': [int(st.session_state.extract[1])],
                    'Sentences': [int(st.session_state.extract[2])],
                    'Characters': [int(st.session_state.extract[3])],
                    'Tokens': [int(st.session_state.extract[4])]}
    df = pd.DataFrame(metrics_data)
    st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
    st.write("")
    with st.expander('Preview Transcript'):
        st.code(st.session_state.extract[0], language=None)

    ################
    if 'extract' not in st.session_state:
        st.error('Please run extraction first.', icon="🚨")
    else:
        df_current_ts = get_meta_info(video_id, url)
        # initial write:
        #df_new_sheet = pd.concat([df_current_ts])
        #mysheet.write_gspread(df_new_sheet)
        #st.write(video_info)
        if 'gsheed' not in st.session_state:
            df = mysheet.read_gspread()
            st.session_state.gsheed = df
        df_sheet = st.session_state.gsheed
        df_current_ts_id = list(df_current_ts.ID)[0]
        if df_current_ts_id not in list(df_sheet.ID):
            df_new_sheet = pd.concat([df_sheet, df_current_ts])
            mysheet.write_gspread(df_new_sheet)
            st.session_state.gsheed = df_new_sheet
            st.write('video added to sheet')
        #else:
        #    st.write('video already in sheet')
        #    st.write(df_sheet)

#######################
# Read from gspread file
########################

if st.button('Read Spreadsheet'):
    if 'gsheed' not in st.session_state:
        df = mysheet.read_gspread()
        st.session_state.gsheed = df
    st.write(st.session_state.gsheed)

#if st.button('Add to Spreadsheet'):
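
# Note: mysheet is a local helper module. From its use here, read_gspread() is assumed to
# return the whole sheet as a pandas DataFrame and write_gspread() to write a DataFrame
# back to the sheet; both are assumptions about the local module, not a documented API.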

#######################
# API Call to summarymachine
########################

# def get_summarized_text(raw_text):
#     response = requests.post("https://wldmr-summarymachine.hf.space/run/predict", json={
#         "data": [
#             raw_text,
#         ]})
#     #response_id = response
#     if response.status_code == 504:
#         raise RuntimeError("Error: Request took too long (>60sec), please try a shorter text.")
#     return response.json()

# st.subheader("Summarize Extracted Sentences with Flan-T5-large")
# if st.button('Summarize Sentences'):
#     command = 'Summarize the transcript in one sentence:\n\n'
#     with st.spinner('Loading Punctuation (Step 1/3)...'):
#         if 'punkt' not in st.session_state:
#             # first figure out if the transcript is already punctuated
#             if transcript_item.is_generated:
#                 get_punctuated_text(transcript_text)
#             else:
#                 get_punctuated_text_to_dict(transcript_text)
#     with st.spinner('Loading Extraction (Step 2/3)...'):
#         if 'extract' not in st.session_state:
#             get_extracted_text(st.session_state.punkt['data'][0])
#     with st.spinner('Loading Summary (Step 3/3)...'):
#         summary_text = get_summarized_text(command+st.session_state.extract['data'][0])
#     st.write('Load time: '+str(round(summary_text['duration'],1))+' sec')
#     with st.expander('Preview Transcript'):
#         st.write(summary_text['data'][0], language=None)

########################
# Channel
########################

st.subheader("Other Videos of the Channel")
#st.write(st.session_state["channel_id"])

if 'channel_id' not in st.session_state:
    st.error('Channel ID not available.', icon="🚨")
else:
    yt.get_channel_statistics(st.session_state["channel_id"])
    stats_data = {'Channel ID': [st.session_state["channel_id"]],
                  'Total Views': [format(int(yt.channel_statistics["viewCount"]), ",").replace(",", "'")],
                  'Total Subscribers': [format(int(yt.channel_statistics["subscriberCount"]), ",").replace(",", "'")],
                  'Total Videos': [format(int(yt.channel_statistics["videoCount"]), ",").replace(",", "'")]}
    df = pd.DataFrame(stats_data)
    st.markdown(df.style.hide(axis="index").to_html(), unsafe_allow_html=True)
    st.write("")

if st.button('Load Videos'):
    if 'gsheed' not in st.session_state:
        df = mysheet.read_gspread()
        st.session_state.gsheed = df
    progress_text = 'Loading...'
    loading_bar = st.progress(0, text=progress_text)
    item_limit = 3
    df = st.session_state.gsheed
    yt.get_channel_video_data(st.session_state["channel_id"], df, loading_bar, progress_text, item_limit)
    df_videos = get_videos_from_yt(yt)
    dataset = pd.DataFrame(df_videos)
    st.markdown(dataset.style.hide(axis="index").to_html(), unsafe_allow_html=True)

########################
# Sequence Loader
########################

st.subheader("Sequence Loader")

# input hash as secret
input_hash = st.text_input("Enter Hash:")

if st.button('Load Sequence'):
    HASH_KEY = st.secrets["hash_key"]
    if input_hash == HASH_KEY:
        st.write("Access granted")
        # read in spreadsheet
        if 'gsheed' not in st.session_state:
            df = mysheet.read_gspread()
            st.session_state.gsheed = df
        progress_text = 'Loading...'
        loading_bar = st.progress(0, text=progress_text)
        item_limit = 3
        df = st.session_state.gsheed
        yt.get_channel_video_data(st.session_state["channel_id"], df, loading_bar, progress_text, item_limit)
        df_videos = get_videos_from_yt(yt)
        dataset = pd.DataFrame(df_videos)
        for sng in dataset['Video ID']:
            # strip the anchor markup to recover the bare video ID
            subsng = sng[sng.find('>')+1:sng.find('</a>')]

# hide_streamlit_style = """
# <style>
# #MainMenu {visibility: hidden;}
# footer {visibility: hidden;}
# </style>
# """
# st.markdown(hide_streamlit_style, unsafe_allow_html=True)
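
# Worked example of the ID extraction in the Sequence Loader loop above:
#   sng = '<a href="https://www.youtube.com/watch?v=8uQDDUfGNPA">8uQDDUfGNPA</a>'
#   sng[sng.find('>')+1:sng.find('</a>')]  # -> '8uQDDUfGNPA'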