import json

import isodate
import requests
from tqdm import tqdm

class YTstats:
    """Fetch channel statistics and per-video data from the YouTube Data API v3."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.channel_statistics = None
        self.video_data = None

    def extract_all(self, channel_id, loading_bar, progress_text):
        """Fetch channel statistics and video data in one call."""
        self.get_channel_statistics(channel_id)
        self.get_channel_video_data(channel_id, loading_bar, progress_text)

    def get_channel_statistics(self, channel_id):
        """Extract the channel statistics"""
        print('get channel statistics...')
        url = f'https://www.googleapis.com/youtube/v3/channels?part=statistics&id={channel_id}&key={self.api_key}'
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        try:
            data = data['items'][0]['statistics']
        except KeyError:
            print('Could not get channel statistics')
            data = {}
        self.channel_statistics = data
        return data
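
    # Shape note (assumption, based on the YouTube Data API v3 channels().list
    # 'statistics' part; values illustrative, not real):
    # {"viewCount": "123456", "subscriberCount": "7890",
    #  "hiddenSubscriberCount": false, "videoCount": "321"}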

    def get_channel_video_data(self, channel_id, loading_bar, progress_text, item_limit=3):
        """Extract all video information of the channel"""
        print('get video data...')
        channel_videos, channel_playlists = self._get_channel_content(channel_id, limit=50)
        channel_videos_out = dict()
        total_items = len(channel_videos)
        item = 0
        step_size = 0
        step = 0
        if total_items != 0:
            step_size = round(1 / total_items, 4)
        parts = ["snippet", "statistics", "contentDetails", "topicDetails"]
        for video_id in tqdm(channel_videos):
            if item == item_limit:
                break
            loading_bar.progress(step, text=progress_text)
            for part in parts:
                data = self._get_single_video_data(video_id, part)
                channel_videos[video_id].update(data)
            # keep only videos longer than 4 minutes (i.e. skip Shorts);
            # a missing duration (e.g. failed contentDetails fetch) counts as 0s
            duration = isodate.parse_duration(channel_videos[video_id].get('duration', 'PT0S'))
            short_duration = isodate.parse_duration('PT4M')
            if duration > short_duration:
                item = item + 1
                step = step + step_size
                channel_videos_out[video_id] = channel_videos[video_id]
        step = 1.0
        loading_bar.progress(step, text=progress_text)
        self.video_data = channel_videos_out

    def _get_single_video_data(self, video_id, part):
        """
        Extract further information for a single video
        parts can be: 'snippet', 'statistics', 'contentDetails', 'topicDetails'
        """
        url = f"https://www.googleapis.com/youtube/v3/videos?part={part}&id={video_id}&key={self.api_key}"
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        try:
            data = data['items'][0][part]
        except KeyError:
            print(f'Error! Could not get {part} part of data: \n{data}')
            data = dict()
        return data
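
    # Shape note (assumption, YouTube Data API v3 'contentDetails' part; values
    # illustrative): {"duration": "PT11M32S", "definition": "hd", "caption": "false", ...}
    # The ISO 8601 duration string is what isodate.parse_duration() consumes above.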

    def _get_channel_content(self, channel_id, limit=None, check_all_pages=True):
        """
        Extract all videos and playlists, can check all available search pages
        channel_videos = videoId: title, publishedAt
        channel_playlists = playlistId: title, publishedAt
        return channel_videos, channel_playlists
        """
        url = f"https://www.googleapis.com/youtube/v3/search?key={self.api_key}&channelId={channel_id}&part=snippet,id&order=date"
        if limit is not None and isinstance(limit, int):
            url += "&maxResults=" + str(limit)
        vid, pl, npt = self._get_channel_content_per_page(url)
        idx = 0
        # follow nextPageToken links, capped at 10 extra pages
        while check_all_pages and npt is not None and idx < 10:
            nexturl = url + "&pageToken=" + npt
            next_vid, next_pl, npt = self._get_channel_content_per_page(nexturl)
            vid.update(next_vid)
            pl.update(next_pl)
            idx += 1
        return vid, pl

    def _get_channel_content_per_page(self, url):
        """
        Extract all videos and playlists per page
        return channel_videos, channel_playlists, nextPageToken
        """
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        channel_videos = dict()
        channel_playlists = dict()
        if 'items' not in data:
            print('Error! Could not get correct channel data!\n', data)
            return channel_videos, channel_playlists, None
        nextPageToken = data.get("nextPageToken", None)
        item_data = data['items']
        for item in item_data:
            try:
                kind = item['id']['kind']
                published_at = item['snippet']['publishedAt']
                title = item['snippet']['title']
                if kind == 'youtube#video':
                    video_id = item['id']['videoId']
                    channel_videos[video_id] = {'publishedAt': published_at, 'title': title}
                elif kind == 'youtube#playlist':
                    playlist_id = item['id']['playlistId']
                    channel_playlists[playlist_id] = {'publishedAt': published_at, 'title': title}
            except KeyError:
                print('Error! Could not extract data from item:\n', item)
        return channel_videos, channel_playlists, nextPageToken

    def dump(self, channel_id):
        """Dumps channel statistics and video data in a single json file"""
        if self.channel_statistics is None or self.video_data is None:
            print('data is missing!\nCall get_channel_statistics() and get_channel_video_data() first!')
            return
        fused_data = {channel_id: {"channel_statistics": self.channel_statistics,
                                   "video_data": self.video_data}}
        # derive the file name from the first video's channelTitle without
        # mutating video_data (popitem would silently remove an entry)
        first_video = next(iter(self.video_data.values()), {})
        channel_title = first_video.get('channelTitle', channel_id)
        channel_title = channel_title.replace(" ", "_").lower()
        filename = channel_title + '.json'
        with open(filename, 'w') as f:
            json.dump(fused_data, f, indent=4)
        print('file dumped to', filename)
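
# --- Usage sketch (not in the original Space) ---
# A minimal example under the assumption that this class is driven from a
# Streamlit app, which is what the loading_bar.progress(value, text=...) calls
# above imply. The API key and channel id are placeholders, not real values.
if __name__ == "__main__":
    import streamlit as st

    API_KEY = "YOUR_YOUTUBE_DATA_API_KEY"    # placeholder
    CHANNEL_ID = "UCxxxxxxxxxxxxxxxxxxxxxx"  # placeholder channel id

    yt = YTstats(API_KEY)
    text = "Fetching video data..."
    bar = st.progress(0.0, text=text)  # returns an element with a .progress() update method
    yt.extract_all(CHANNEL_ID, bar, text)
    yt.dump(CHANNEL_ID)  # writes <channel_title>.json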