from __future__ import unicode_literals import json import logging import os import sys import googleapiclient.discovery import googleapiclient.errors import youtube_dl def prepare_logging() -> logging.Logger: LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s' DATE_FORMAT = '%Y-%m-%d %H:%M:%S' LOG_FORMATTER = logging.Formatter(fmt=LOG_FORMAT, datefmt=DATE_FORMAT) logger = logging.getLogger('SaveMyPlaylist') logger.setLevel(logging.DEBUG) outHandler = logging.StreamHandler(sys.stdout) outHandler.setFormatter(LOG_FORMATTER) outHandler.setLevel(logging.DEBUG) outHandler.addFilter(lambda record: record.levelno <= logging.INFO) logger.addHandler(outHandler) errHandler = logging.StreamHandler(sys.stderr) errHandler.setFormatter(LOG_FORMATTER) errHandler.setLevel(logging.WARNING) logger.addHandler(errHandler) return logger logger = prepare_logging() class MyLogger(object): def debug(self, msg): pass def warning(self, msg): pass def error(self, msg): print(msg, file=sys.stderr) def my_hook(d): if d['status'] == 'finished': logger.info('Download finished for {}'.format(d['filename'])) class SaveMyPlaylist: SCOPES = ['https://www.googleapis.com/auth/youtube.readonly'] API_NAME = 'youtube' API_VERSION = 'v3' CHANNEL = 0 TITLE = 1 VIDEO_ID = 2 ILLEGAL_CHARS = ['NUL', '\',''//', ':', '*', '"', '<', '>', '|'] def __init__(self, apiKey, playlistId): with open('version.json', 'r', encoding='utf-8') as f: VERSION = json.load(f)['version'] logger.info('### SaveMyPlaylist {} ###'.format(VERSION['name'])) logger.info('=============================') self._apiKey = apiKey self._playlistId = playlistId self._youtubeApi = googleapiclient.discovery.build(self.API_NAME, self.API_VERSION, developerKey=self._apiKey) self._items = self.__fetch_all_playlist_items() def __fetch_all_playlist_items(self): items = [] nextPageToken = 0 while nextPageToken is not None: pageItems, nextPageToken = self.__fetch_playlist_items(nextPageToken) items.extend(pageItems) logger.info('>>> Found {} items in playlist'.format(len(items))) return items def __fetch_playlist_items(self, nextPageToken=None): if nextPageToken is None or nextPageToken == 0: request = self._youtubeApi.playlistItems().list( part='snippet', playlistId=self._playlistId, maxResults=50, ) else: request = self._youtubeApi.playlistItems().list( part='snippet', playlistId=self._playlistId, maxResults=50, pageToken=nextPageToken ) response = request.execute() items = [] for item in response['items']: snippet = item['snippet'] title = snippet['title'] videoId = snippet['resourceId']['videoId'] channelName = self.__get_channel_name(videoId) items.append((channelName, title, videoId)) logger.info(f'{channelName} - {title} (videoId: {videoId})') nextPageToken = None if 'nextPageToken' in response: nextPageToken = response['nextPageToken'] return items, nextPageToken def __get_channel_name(self, videoId): request = self._youtubeApi.videos().list( part='snippet', id=videoId, maxResults=1 ) response = request.execute() if not response['items']: return '' return response['items'][0]['snippet']['channelTitle'] def download_items(self, destinationFolder, debug=False): os.makedirs(destinationFolder, exist_ok=True) logger.info('>>> Scanning destination folder...') downloadedVideos = [f for f in os.listdir(destinationFolder) if os.path.isfile(os.path.join(destinationFolder, f)) and f.endswith('.mp4')] logger.info('>>> Found {} videos in destination folder'.format(len(downloadedVideos))) logger.info('>>> Started Downloading...') newVideos = [] for idx, item in enumerate(self._items): fileName = '{} - {}.mp4'.format(item[self.TITLE], item[self.CHANNEL]) fileName = self.__escape_file_name(fileName) if fileName in downloadedVideos: logger.info('Skipping {}/{}: "{}" as it already exists'.format(idx + 1, len(self._items), fileName)) continue try: logger.info('Downloading {}/{}: "{}"'.format(idx + 1, len(self._items), fileName)) newVideos.append(item) ydl_opts = { 'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]', 'merge_output_format': 'mp4', 'outtmpl': os.path.join(destinationFolder, fileName), 'logger': MyLogger(), 'progress_hooks': [my_hook] } if debug: continue with youtube_dl.YoutubeDL(ydl_opts) as ydl: ydl.download(['https://www.youtube.com/watch?v={}'.format(item[self.VIDEO_ID])]) except Exception as e: logger.error(f'Error while downloading video "{fileName}" with ID: "{item[self.VIDEO_ID]}"', exc_info=e) logger.info('>>> Finished Downloading') logger.info('Downloaded {} new videos'.format(len(newVideos))) def __escape_file_name(self, fileName): for char in self.ILLEGAL_CHARS: fileName = fileName.replace(char, '') return fileName if __name__ == '__main__': with open("config/settings.json", "r") as f: SETTINGS = json.load(f) saveMyPlaylist = SaveMyPlaylist(SETTINGS['apiKey'], SETTINGS['playlistId']) saveMyPlaylist.download_items(SETTINGS['destinationFolder'], SETTINGS['skipDownload'])