import json
import requests
from tqdm import tqdm
import isodate


class YTstats:
    """Fetch channel statistics and per-video data from the YouTube Data API v3."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.channel_statistics = None
        self.video_data = None

    def extract_all(self, channel_id):
        self.get_channel_statistics(channel_id)
        self.get_channel_video_data(channel_id)

    def get_channel_statistics(self, channel_id):
        """Extract the channel statistics."""
        print('get channel statistics...')
        url = f'https://www.googleapis.com/youtube/v3/channels?part=statistics&id={channel_id}&key={self.api_key}'
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        try:
            data = data['items'][0]['statistics']
        except KeyError:
            print('Could not get channel statistics')
            data = {}
        self.channel_statistics = data
        return data

    def get_channel_video_data(self, channel_id, loading_bar=None, progress_text=None, item_limit=3):
        """
        Extract all video information of the channel.
        loading_bar is optional: any object exposing .progress(value, text=...)
        (e.g. a Streamlit st.progress widget) can report progress.
        """
        print('get video data...')
        channel_videos, channel_playlists = self._get_channel_content(channel_id, limit=50)
        channel_videos_out = dict()
        total_items = len(channel_videos)
        item = 0
        step_size = 0
        step = 0
        if total_items != 0:
            step_size = round(1 / total_items, 4)
        parts = ["snippet", "statistics", "contentDetails", "topicDetails"]
        for video_id in tqdm(channel_videos):
            if item == item_limit:
                break
            if loading_bar is not None:
                loading_bar.progress(step, text=progress_text)
            for part in parts:
                data = self._get_single_video_data(video_id, part)
                channel_videos[video_id].update(data)
            # keep only videos longer than 4 minutes, i.e. skip Shorts
            duration = isodate.parse_duration(channel_videos[video_id]['duration'])
            short_duration = isodate.parse_duration('PT4M')
            if duration > short_duration:
                item = item + 1
                channel_videos_out[video_id] = channel_videos[video_id]
            # advance the bar for every processed video, not only the kept ones
            step = step + step_size
        step = 1.0
        if loading_bar is not None:
            loading_bar.progress(step, text=progress_text)
        self.video_data = channel_videos_out

    def _get_single_video_data(self, video_id, part):
        """
        Extract further information for a single video
        parts can be: 'snippet', 'statistics', 'contentDetails', 'topicDetails'
        """
        url = f"https://www.googleapis.com/youtube/v3/videos?part={part}&id={video_id}&key={self.api_key}"
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        try:
            data = data['items'][0][part]
        except (KeyError, IndexError):
            print(f'Error! Could not get {part} part of data: \n{data}')
            data = dict()
        return data

    def _get_channel_content(self, channel_id, limit=None, check_all_pages=True):
        """
        Extract all videos and playlists, can check all available search pages
        channel_videos = videoId: title, publishedAt
        channel_playlists = playlistId: title, publishedAt
        return channel_videos, channel_playlists
        """
        url = f"https://www.googleapis.com/youtube/v3/search?key={self.api_key}&channelId={channel_id}&part=snippet,id&order=date"
        if limit is not None and isinstance(limit, int):
            url += "&maxResults=" + str(limit)
        vid, pl, npt = self._get_channel_content_per_page(url)
        idx = 0
        # follow nextPageToken links, capped at 10 pages to bound API quota usage
        while check_all_pages and npt is not None and idx < 10:
            nexturl = url + "&pageToken=" + npt
            next_vid, next_pl, npt = self._get_channel_content_per_page(nexturl)
            vid.update(next_vid)
            pl.update(next_pl)
            idx += 1
        return vid, pl

    def _get_channel_content_per_page(self, url):
        """
        Extract all videos and playlists per page
        return channel_videos, channel_playlists, nextPageToken
        """
        json_url = requests.get(url)
        data = json.loads(json_url.text)
        channel_videos = dict()
        channel_playlists = dict()
        if 'items' not in data:
            print('Error! Could not get correct channel data!\n', data)
            return channel_videos, channel_playlists, None
        nextPageToken = data.get("nextPageToken", None)
        item_data = data['items']
        for item in item_data:
            try:
                kind = item['id']['kind']
                published_at = item['snippet']['publishedAt']
                title = item['snippet']['title']
                if kind == 'youtube#video':
                    video_id = item['id']['videoId']
                    channel_videos[video_id] = {'publishedAt': published_at, 'title': title}
                elif kind == 'youtube#playlist':
                    playlist_id = item['id']['playlistId']
                    channel_playlists[playlist_id] = {'publishedAt': published_at, 'title': title}
            except KeyError:
                print('Error! Could not extract data from item:\n', item)
        return channel_videos, channel_playlists, nextPageToken

    def dump(self, channel_id):
        """Dumps channel statistics and video data into a single json file."""
        if self.channel_statistics is None or self.video_data is None:
            print('data is missing!\nCall get_channel_statistics() and get_channel_video_data() first!')
            return
        fused_data = {channel_id: {"channel_statistics": self.channel_statistics,
                                   "video_data": self.video_data}}
        # derive the filename from the channel title of the first video,
        # without mutating video_data (popitem would drop an entry from the dump)
        first_video = next(iter(self.video_data.values()), {})
        channel_title = first_video.get('channelTitle', channel_id)
        channel_title = channel_title.replace(" ", "_").lower()
        filename = channel_title + '.json'
        with open(filename, 'w') as f:
            json.dump(fused_data, f, indent=4)
        print('file dumped to', filename)
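

if __name__ == '__main__':
    # Minimal usage sketch. 'YOUR_API_KEY' is a placeholder, not a real
    # credential; supply your own YouTube Data API v3 key. The channel id
    # below is an illustrative example (the Google for Developers channel).
    API_KEY = 'YOUR_API_KEY'
    CHANNEL_ID = 'UC_x5XG1OV2P6uZZ5FSM9Ttw'

    yt = YTstats(API_KEY)
    yt.get_channel_statistics(CHANNEL_ID)
    # loading_bar is optional; in a Streamlit app, pass an st.progress widget
    # (or any object with a .progress(value, text=...) method).
    yt.get_channel_video_data(CHANNEL_ID, item_limit=3)
    yt.dump(CHANNEL_ID)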