import json

import isodate
import requests
from tqdm import tqdm


class YTstats:
    """Collect channel statistics and per-video data from the YouTube Data API v3.

    Typical use::

        yt = YTstats(api_key)
        yt.extract_all(channel_id)
        yt.dump(channel_id)

    Results are stored on the instance in ``channel_statistics`` and
    ``video_data``; both are ``None`` until the matching getter has run.
    """

    # Root of every REST endpoint used by this class.
    API_BASE_URL = "https://www.googleapis.com/youtube/v3"
    # Seconds to wait for a response; without a timeout requests can block forever.
    REQUEST_TIMEOUT = 30
    # Video resource parts fetched (one request each) for every video.
    VIDEO_PARTS = ("snippet", "statistics", "contentDetails", "topicDetails")
    # Only videos strictly longer than this ISO 8601 duration are kept.
    MIN_VIDEO_DURATION = "PT4M"
    # Upper bound on follow-up search pages requested after the first one.
    MAX_EXTRA_PAGES = 10

    def __init__(self, api_key):
        """Store the API key and initialise empty result slots."""
        self.api_key = api_key
        self.channel_statistics = None
        self.video_data = None

    def extract_all(self, channel_id):
        """Fetch channel statistics and video data for *channel_id*.

        No progress bar is used; call ``get_channel_video_data`` directly to
        supply one.
        """
        self.get_channel_statistics(channel_id)
        self.get_channel_video_data(channel_id)

    def _get_json(self, url):
        """GET *url* and return the decoded JSON body.

        Network errors from ``requests`` and JSON decoding errors propagate to
        the caller.
        """
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        return json.loads(response.text)

    def get_channel_statistics(self, channel_id):
        """Extract the channel statistics.

        Returns the ``statistics`` dict of the first item in the response and
        stores it in ``self.channel_statistics``. When the response holds no
        such entry, a message is printed and an empty dict is stored/returned.
        """
        print('get channel statistics...')
        url = (
            f'{self.API_BASE_URL}/channels'
            f'?part=statistics&id={channel_id}&key={self.api_key}'
        )
        response_data = self._get_json(url)
        try:
            statistics = response_data['items'][0]['statistics']
        except (KeyError, IndexError):
            # KeyError: error payload without 'items' / 'statistics'.
            # IndexError: 'items' present but empty (e.g. unknown channel id).
            print('Could not get channel statistics')
            statistics = {}
        self.channel_statistics = statistics
        return statistics

    def get_channel_video_data(self, channel_id, loading_bar=None,
                               progress_text=None, item_limit=3):
        """Extract detailed information for the channel's longer videos.

        Lists the channel's videos, fetches every part in ``VIDEO_PARTS`` for
        each one, and keeps the videos whose duration is strictly greater than
        ``MIN_VIDEO_DURATION``. Collection stops once *item_limit* videos have
        been kept. The result (``video_id -> merged data``) is stored in
        ``self.video_data``.

        Args:
            channel_id: Channel whose videos are inspected.
            loading_bar: Optional object exposing
                ``progress(value, text=...)`` (for instance a UI progress
                bar). When ``None`` no progress is reported.
            progress_text: Text forwarded to ``loading_bar.progress``.
            item_limit: Number of kept videos after which collection stops.
        """
        print('get video data...')
        channel_videos, _ = self._get_channel_content(channel_id, limit=50)

        def report(value):
            # Accumulated rounded steps can drift slightly above 1.0, so clamp.
            if loading_bar is not None:
                loading_bar.progress(min(value, 1.0), text=progress_text)

        total_items = len(channel_videos)
        step_size = round(1 / total_items, 4) if total_items else 0
        min_duration = isodate.parse_duration(self.MIN_VIDEO_DURATION)

        progress = 0.0
        kept_videos = {}
        for video_id in tqdm(channel_videos):
            if len(kept_videos) == item_limit:
                break
            report(progress)

            video = channel_videos[video_id]
            for part in self.VIDEO_PARTS:
                video.update(self._get_single_video_data(video_id, part))

            # 'duration' is expected to come from the contentDetails part; it
            # is absent when that request failed, so skip the video rather
            # than crash with a KeyError.
            duration_text = video.get('duration')
            if duration_text is None:
                continue

            if isodate.parse_duration(duration_text) > min_duration:
                progress += step_size
                kept_videos[video_id] = video

        report(1.0)
        self.video_data = kept_videos

    def _get_single_video_data(self, video_id, part):
        """
        Extract further information for a single video
        parts can be: 'snippet', 'statistics', 'contentDetails', 'topicDetails'

        Returns the requested part of the first response item, or an empty
        dict (after printing the raw response) when it is not available.
        """
        url = (
            f"{self.API_BASE_URL}/videos"
            f"?part={part}&id={video_id}&key={self.api_key}"
        )
        response_data = self._get_json(url)
        try:
            return response_data['items'][0][part]
        except (KeyError, IndexError):
            # IndexError covers a well-formed response with an empty 'items'.
            print(f'Error! Could not get {part} part of data: \n{response_data}')
            return dict()

    def _get_channel_content(self, channel_id, limit=None, check_all_pages=True):
        """
        Extract all videos and playlists, can check all available search pages
        channel_videos = videoId: title, publishedAt
        channel_playlists = playlistId: title, publishedAt
        return channel_videos, channel_playlists

        *limit*, when an int, is sent as ``maxResults``. At most
        ``MAX_EXTRA_PAGES`` follow-up pages are requested after the first.
        """
        url = (
            f"{self.API_BASE_URL}/search"
            f"?key={self.api_key}&channelId={channel_id}"
            f"&part=snippet,id&order=date"
        )
        if isinstance(limit, int):
            url += "&maxResults=" + str(limit)

        videos, playlists, next_page_token = self._get_channel_content_per_page(url)
        extra_pages = 0
        while (check_all_pages and next_page_token is not None
               and extra_pages < self.MAX_EXTRA_PAGES):
            next_url = url + "&pageToken=" + next_page_token
            page_videos, page_playlists, next_page_token = (
                self._get_channel_content_per_page(next_url)
            )
            videos.update(page_videos)
            playlists.update(page_playlists)
            extra_pages += 1
        return videos, playlists

    def _get_channel_content_per_page(self, url):
        """
        Extract all videos and playlists per page
        return channel_videos, channel_playlists, nextPageToken

        On a response without 'items' the two dicts are empty and the token
        is None. Items missing an expected key are reported and skipped.
        """
        data = self._get_json(url)
        channel_videos = dict()
        channel_playlists = dict()
        if 'items' not in data:
            print('Error! Could not get correct channel data!\n', data)
            # Return two distinct dicts so callers never alias videos/playlists.
            return channel_videos, channel_playlists, None

        next_page_token = data.get("nextPageToken", None)
        for item in data['items']:
            try:
                kind = item['id']['kind']
                entry = {
                    'publishedAt': item['snippet']['publishedAt'],
                    'title': item['snippet']['title'],
                }
                if kind == 'youtube#video':
                    channel_videos[item['id']['videoId']] = entry
                elif kind == 'youtube#playlist':
                    channel_playlists[item['id']['playlistId']] = entry
            except KeyError:
                print('Error! Could not extract data from item:\n', item)

        return channel_videos, channel_playlists, next_page_token

    def dump(self, channel_id):
        """Dumps channel statistics and video data in a single json file

        The file is named after the channel title found in the video data
        (spaces replaced by underscores, lower-cased), falling back to
        *channel_id* when no video carries a 'channelTitle'. Nothing is
        written when either data set has not been fetched yet.
        """
        if self.channel_statistics is None or self.video_data is None:
            print('data is missing!\nCall get_channel_statistics() and get_channel_video_data() first!')
            return

        fused_data = {
            channel_id: {
                "channel_statistics": self.channel_statistics,
                "video_data": self.video_data,
            }
        }

        # Look the title up without mutating video_data: popping an entry here
        # would drop that video from the dumped JSON and fail on an empty dict.
        channel_title = next(
            (video['channelTitle'] for video in self.video_data.values()
             if 'channelTitle' in video),
            channel_id,
        )
        filename = channel_title.replace(" ", "_").lower() + '.json'
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(fused_data, f, indent=4)

        print('file dumped to', filename)