webscarper / app.py
mobenta's picture
Update app.py
0d9d03d verified
raw
history blame
9.28 kB
import os
import re
from datetime import datetime, timedelta, timezone
from urllib.parse import quote

import gradio as gr
import requests
# Read the comma-separated API keys from the environment and turn them into a
# list. Blank entries (from a trailing comma, ",," or stray whitespace) are
# dropped so an empty string is never rotated in and sent to the API as a key.
YOUTUBE_API_KEYS = [
    key.strip()
    for key in (os.getenv("YOUTUBE_API_KEYS") or "").split(",")
    if key.strip()
]
if not YOUTUBE_API_KEYS:
    raise ValueError("API keys not found. Make sure the secret 'YOUTUBE_API_KEYS' is set.")

# Index into YOUTUBE_API_KEYS of the key that will be handed out next.
key_index = 0
def get_api_key():
    """Hand out the API keys in round-robin order.

    Returns the key stored at the module-level ``key_index`` and then moves
    that index one step forward, wrapping to 0 after the last key.
    """
    global key_index
    pool_size = len(YOUTUBE_API_KEYS)
    selected_key = YOUTUBE_API_KEYS[key_index]
    # Advance the shared cursor so the next caller receives the following key.
    key_index = (key_index + 1) % pool_size
    return selected_key
def get_iso8601_date(time_frame):
    """Return the UTC timestamp at which the given time frame starts.

    Args:
        time_frame: One of "Last Hour", "Today", "This Week", "This Month"
            or "This Year". Anything else (including "" and None) means
            "no date filter".

    Returns:
        The current UTC time minus the length of the time frame, formatted
        like ``2024-05-01T12:30:00.123456Z``, or "" when *time_frame* is not
        one of the recognised labels.
    """
    # The windows are rolling periods counted back from "now": "Today" is the
    # last 24 hours, "This Month" the last 30 days, "This Year" the last 365.
    windows = {
        "Last Hour": timedelta(hours=1),
        "Today": timedelta(days=1),
        "This Week": timedelta(weeks=1),
        "This Month": timedelta(days=30),
        "This Year": timedelta(days=365),
    }
    window = windows.get(time_frame)
    if window is None:
        return ""
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop its tzinfo so isoformat() does not append "+00:00" in
    # front of the explicit "Z" suffix.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return (now - window).isoformat("T") + "Z"
# Maps the "kind" of a search result to (key that holds its ID, type label
# used by the rest of the app).
_SEARCH_RESULT_KINDS = {
    "youtube#video": ("videoId", "video"),
    "youtube#channel": ("channelId", "channel"),
    "youtube#playlist": ("playlistId", "playlist"),
}

# Maps the duration labels offered in the UI to the API's videoDuration values.
_SEARCH_DURATION_FILTERS = {
    "Short (<4 mins)": "short",
    "Medium (4-20 mins)": "medium",
    "Long (>20 mins)": "long",
}


def _search_item_thumbnail(snippet):
    """Return the URL of the best available thumbnail in *snippet*, or None."""
    thumbnails = snippet.get("thumbnails") or {}
    for size in ("high", "medium", "default"):
        url = (thumbnails.get(size) or {}).get("url")
        if url:
            return url
    return None


def _parse_search_item(item):
    """Convert one raw search result into the app's result dict, or None to skip it."""
    id_info = item.get("id") or {}
    kind = _SEARCH_RESULT_KINDS.get(id_info.get("kind"))
    if kind is None:
        return None
    id_key, type_label = kind
    snippet = item.get("snippet") or {}
    resource_id = id_info.get(id_key)
    thumbnail_url = _search_item_thumbnail(snippet)
    # An entry without an ID cannot be played and one without a thumbnail
    # cannot be shown in the gallery, so both are skipped instead of raising.
    if not resource_id or not thumbnail_url:
        return None
    return {
        'thumbnail_url': thumbnail_url,
        'id': resource_id,
        'title': snippet.get("title", ""),
        'type': type_label,
    }


def youtube_search(query, upload_date, video_type, duration, max_results=50):
    """Search YouTube and return the matching videos, channels and playlists.

    Args:
        query: Free-text search terms.
        upload_date: Time-frame label understood by get_iso8601_date(), or a
            falsy value for no date filter.
        video_type: "Video", "Channel", "Playlist", or "All"/falsy for any.
        duration: One of the UI duration labels ("Short (<4 mins)", ...), or
            a falsy/unknown value for no duration filter.
        max_results: Number of results to request; clamped to the 1-50 range.

    Returns:
        A list of dicts with the keys 'thumbnail_url', 'id', 'title' and
        'type' ("video", "channel" or "playlist"). The list is empty when the
        request fails or every configured API key is rejected.
    """
    search_url = "https://www.googleapis.com/youtube/v3/search"
    requested = 50 if max_results is None else int(max_results)
    params = {
        "part": "snippet",
        "q": query,
        "maxResults": max(1, min(requested, 50)),
        "order": "relevance",
    }
    if upload_date:
        published_after = get_iso8601_date(upload_date)
        if published_after:
            params["publishedAfter"] = published_after
    if video_type and video_type != "All":
        params["type"] = video_type.lower()

    # The API only accepts videoDuration together with type=video. With no
    # explicit type the search is narrowed to videos; for channel/playlist
    # searches the duration filter is dropped because it cannot apply.
    duration_filter = _SEARCH_DURATION_FILTERS.get(duration)
    if duration_filter and params.get("type", "video") == "video":
        params["type"] = "video"
        params["videoDuration"] = duration_filter

    # get_api_key() rotates through the pool, so one pass of this loop tries
    # every configured key at most once.
    for _ in range(len(YOUTUBE_API_KEYS)):
        params["key"] = get_api_key()
        try:
            # A timeout keeps a stalled connection from hanging the UI handler.
            response = requests.get(search_url, params=params, timeout=10)
            if response.status_code in (403, 429):
                print("Quota exceeded or forbidden for API key. Trying next key...")
                continue
            response.raise_for_status()
            items = response.json().get("items", [])
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Error during YouTube API request: {e}")
            return []
        parsed = (_parse_search_item(item) for item in items)
        return [entry for entry in parsed if entry is not None]

    # Every key was rejected with 403/429.
    return []
def get_channel_videos(channel_id, max_results=10):
    """Return the most recent videos of a channel.

    Args:
        channel_id: ID of the channel whose videos are listed.
        max_results: Maximum number of videos to request.

    Returns:
        A list of dicts with the keys 'thumbnail_url', 'id', 'title' and
        'type' (always "video"), ordered by date as returned by the API. The
        list is empty when the request fails or every configured API key is
        rejected.
    """
    search_url = "https://www.googleapis.com/youtube/v3/search"
    params = {
        "part": "snippet",
        "channelId": channel_id,
        "maxResults": max_results,
        "order": "date",
        "type": "video",
    }

    # get_api_key() rotates through the pool, so one pass of this loop tries
    # every configured key at most once before giving up.
    for _ in range(len(YOUTUBE_API_KEYS)):
        params["key"] = get_api_key()
        try:
            # A timeout keeps a stalled connection from hanging the UI handler.
            response = requests.get(search_url, params=params, timeout=10)
            if response.status_code in (403, 429):
                print("Quota exceeded or forbidden for API key. Trying next key...")
                continue
            response.raise_for_status()
            results = response.json().get("items", [])
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"Error during YouTube API request: {e}")
            return []

        channel_videos = []
        for result in results:
            snippet = result.get("snippet") or {}
            thumbnails = snippet.get("thumbnails") or {}
            # Prefer the "high" thumbnail but fall back to smaller sizes
            # instead of raising KeyError when it is missing.
            thumbnail_url = next(
                (
                    thumbnails[size]["url"]
                    for size in ("high", "medium", "default")
                    if (thumbnails.get(size) or {}).get("url")
                ),
                None,
            )
            video_id = (result.get("id") or {}).get("videoId")
            if not video_id or not thumbnail_url:
                continue
            channel_videos.append({
                'thumbnail_url': thumbnail_url,
                'id': video_id,
                'title': snippet.get("title", ""),
                'type': "video",
            })
        return channel_videos

    # Every key was rejected with 403/429.
    return []
def show_video(video_id, video_type):
    """Build the HTML ``<iframe>`` snippet that displays the selected item.

    Args:
        video_id: YouTube ID of the video, channel or playlist.
        video_type: "video", "channel" or "playlist".

    Returns:
        An HTML string containing the iframe, or a plain error message when
        *video_id* is empty or *video_type* is not one of the known types.
    """
    invalid_message = "Invalid YouTube URL. Please enter a valid YouTube video link."
    if not video_id:
        return invalid_message

    # The ID ends up inside an HTML attribute, so percent-encode it; a stray
    # quote or angle bracket must not be able to break out of src="...".
    # Regular YouTube IDs (letters, digits, "-", "_") are left unchanged.
    safe_id = quote(str(video_id), safe="")

    if video_type == "video":
        embed_url = f"https://www.youtube.com/embed/{safe_id}"
    elif video_type == "channel":
        # NOTE(review): this is the channel's web page, not a player URL, and
        # is unlikely to render inside an iframe - confirm and decide what a
        # channel should show here.
        embed_url = f"https://www.youtube.com/channel/{safe_id}"
    elif video_type == "playlist":
        # Playlists are embedded through the player's "videoseries" URL; the
        # regular playlist page is not an embeddable player.
        embed_url = f"https://www.youtube.com/embed/videoseries?list={safe_id}"
    else:
        return invalid_message

    html_code = f'''
<iframe width="100%" height="562" src="{embed_url}"
frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture"
allowfullscreen></iframe>
'''
    return html_code
# NOTE(review): the nesting of rows/columns below is reconstructed; confirm
# it matches the intended layout.
with gr.Blocks(css="""
#search_output {
max-width: 1300px;
}
#search_output img {
width: 150px !important;
height: 150px !important;
margin-right: 10px;
}
#search_output .gallery-item {
display: flex !important;
align-items: center !important;
margin-bottom: 30px !important;
}
#search_output .gallery-caption {
text-align: left !important;
padding-left: 20px;
font-size: 24px !important;
}
#video_container {
display: flex;
justify-content: center;
align-items: center;
max-width: 1500px;
margin: 0 auto;
padding: 20px;
background-color: #f9f9f9;
border-radius: 8px;
}
""") as demo:
    gr.Markdown("## YouTube Video Search, Selection, and Playback")

    # IDs and types of the items shown in the search gallery, in gallery
    # order, so a click (which only reports an index) can be resolved.
    video_ids_state = gr.State()
    video_types_state = gr.State()
    # IDs of the videos currently shown in the channel gallery.
    channel_video_ids_state = gr.State()

    with gr.Row(elem_id="video_container"):
        video_output = gr.HTML(label="Video Player", elem_id="video_output")

    with gr.Row():
        selected_video_link = gr.Textbox(label="Selected Video Link", interactive=False)
        selected_video_type = gr.Textbox(label="Selected Video Type", visible=False, interactive=False)
        play_video_button = gr.Button("Play Video")

    with gr.Row():
        with gr.Column(scale=3):
            search_query_input = gr.Textbox(label="Search YouTube",
                                            placeholder="Enter your search query here",
                                            elem_id="search_query_input")
            upload_date_input = gr.Dropdown(label="Upload Date", choices=["", "Last Hour", "Today", "This Week", "This Month", "This Year"])
            video_type_input = gr.Dropdown(label="Type", choices=["All", "Video", "Channel", "Playlist"])
            duration_input = gr.Dropdown(label="Duration", choices=["", "Short (<4 mins)", "Medium (4-20 mins)", "Long (>20 mins)"])
            search_button = gr.Button("Search")
            search_output = gr.Gallery(label="Search Results", columns=1, height="800px", elem_id="search_output")
            channel_videos_output = gr.Gallery(label="Channel Videos", columns=1, height="800px", visible=False)

    def update_search_results(query, upload_date, video_type, duration):
        """Run the search and return (gallery items, result IDs, result types)."""
        search_results = youtube_search(query, upload_date, video_type, duration)
        gallery_items = [(item['thumbnail_url'], f"{item['title']}") for item in search_results]
        video_ids = [item['id'] for item in search_results]
        video_types = [item['type'] for item in search_results]
        return gallery_items, video_ids, video_types

    def on_video_select(evt: gr.SelectData, video_ids, video_types):
        """Handle a click on a search result.

        Always returns four values, one per wired output: the link to show,
        the item type, an update for the channel gallery and the IDs of the
        videos in that gallery.
        """
        index = evt.index
        hide_channel_gallery = gr.update(value=[], visible=False)
        # Guard against a click arriving with empty or out-of-date state.
        if (
            not video_ids
            or not video_types
            or not isinstance(index, int)
            or not 0 <= index < min(len(video_ids), len(video_types))
        ):
            return "", "", hide_channel_gallery, []

        selected_video_id = video_ids[index]
        selected_type = video_types[index]

        if selected_type == "channel":
            # A channel is not playable itself: list its videos in the
            # channel gallery and clear the current selection.
            channel_videos = get_channel_videos(selected_video_id)
            gallery_items = [(video['thumbnail_url'], video['title']) for video in channel_videos]
            channel_video_ids = [video['id'] for video in channel_videos]
            return "", "", gr.update(value=gallery_items, visible=True), channel_video_ids

        if selected_type == "playlist":
            link = f"https://www.youtube.com/playlist?list={selected_video_id}"
        else:
            link = f"https://www.youtube.com/watch?v={selected_video_id}"
        return link, selected_type, hide_channel_gallery, []

    def on_channel_video_select(evt: gr.SelectData, channel_video_ids):
        """Handle a click on a channel video: return its watch link and type."""
        index = evt.index
        if (
            not channel_video_ids
            or not isinstance(index, int)
            or not 0 <= index < len(channel_video_ids)
        ):
            return "", ""
        return f"https://www.youtube.com/watch?v={channel_video_ids[index]}", "video"

    def play_video(video_id, video_type):
        """Render the player for the selected link (the ID follows the last '=')."""
        return show_video((video_id or "").split("=")[-1], video_type)

    search_button.click(
        update_search_results,
        inputs=[search_query_input, upload_date_input, video_type_input, duration_input],
        outputs=[search_output, video_ids_state, video_types_state]
    )
    search_output.select(
        on_video_select,
        inputs=[video_ids_state, video_types_state],
        outputs=[selected_video_link, selected_video_type, channel_videos_output, channel_video_ids_state]
    )
    channel_videos_output.select(
        on_channel_video_select,
        inputs=[channel_video_ids_state],
        outputs=[selected_video_link, selected_video_type]
    )
    play_video_button.click(
        play_video,
        inputs=[selected_video_link, selected_video_type],
        outputs=video_output
    )

demo.launch()