from __future__ import unicode_literals import json import os import sys import googleapiclient.discovery import googleapiclient.errors import youtube_dl VERSION = (2, 0, 0) class MyLogger(object): def debug(self, msg): pass def warning(self, msg): pass def error(self, msg): print(msg, file=sys.stderr) def my_hook(d): if d['status'] == 'finished': print('Download finished!') class SaveMyPlaylist: SCOPES = ['https://www.googleapis.com/auth/youtube.readonly'] API_NAME = 'youtube' API_VERSION = 'v3' CHANNEL = 0 TITLE = 1 VIDEO_ID = 2 ILLEGAL_CHARS = ['NUL', '\',''//', ':', '*', '"', '<', '>', '|'] def __init__(self, apiKey, playlistId): print('### SaveMyPlaylist v{} ###'.format('.'.join(str(i) for i in VERSION))) print('=============================\n') self.__apiKey = apiKey self.__playlistId = playlistId self.__youtubeApi = googleapiclient.discovery.build(self.API_NAME, self.API_VERSION, developerKey=self.__apiKey) self.__items = self.__fetch_all_playlist_items() def __fetch_all_playlist_items(self): items = [] nextPageToken = 0 while nextPageToken is not None: pageItems, nextPageToken = self.__fetch_playlist_items(nextPageToken) items.extend(pageItems) print('\n>>> Found {} items in playlist\n'.format(len(items))) return items def __fetch_playlist_items(self, nextPageToken=None): if nextPageToken is None or nextPageToken == 0: request = self.__youtubeApi.playlistItems().list( part='snippet', playlistId=self.__playlistId, maxResults=50, ) else: request = self.__youtubeApi.playlistItems().list( part='snippet', playlistId=self.__playlistId, maxResults=50, pageToken=nextPageToken ) response = request.execute() items = [] for item in response['items']: snippet = item['snippet'] title = snippet['title'] channel = snippet['channelTitle'] videoId = snippet['resourceId']['videoId'] items.append((channel, title, videoId)) print('{} - {} (videoId: {})'.format(channel, title, videoId)) nextPageToken = None if 'nextPageToken' in response: nextPageToken = response['nextPageToken'] return items, nextPageToken def download_items(self, destinationFolder, debug=False): print('>>> Scanning destination folder...') downloadedVideos = [f for f in os.listdir(destinationFolder) if os.path.isfile(os.path.join(destinationFolder, f)) and f.endswith('.mp4')] print('>>> Found {} videos in destination folder'.format(len(downloadedVideos))) print('\n>>> Started Downloading...') newVideos = [] for idx, item in enumerate(self.__items): fileName = '{} - {}.mp4'.format(item[self.TITLE], item[self.CHANNEL]) fileName = self.__escape_file_name(fileName) if fileName in downloadedVideos: print('Skipping {}/{}: "{}" as it already exists'.format(idx + 1, len(self.__items), fileName)) continue print('Downloading {}/{}: "{}"'.format(idx + 1, len(self.__items), fileName)) newVideos.append(item) ydl_opts = { 'format': 'bestaudio/best', 'merge_output_format': 'mp4', 'outtmpl': os.path.join(destinationFolder, fileName), 'logger': MyLogger(), 'progress_hooks': [my_hook], } if debug: continue with youtube_dl.YoutubeDL(ydl_opts) as ydl: ydl.download(['https://www.youtube.com/watch?v={}'.format(item[self.VIDEO_ID])]) print('\n>>> Finished Downloading') print('Downloaded {} new videos'.format(len(newVideos))) def __escape_file_name(self, fileName): for char in self.ILLEGAL_CHARS: fileName = fileName.replace(char, '') return fileName if __name__ == '__main__': with open("settings.json", "r") as f: SETTINGS = json.load(f) saveMyPlaylist = SaveMyPlaylist(SETTINGS['apiKey'], SETTINGS['playlistId']) saveMyPlaylist.download_items(SETTINGS['destinationFolder'], SETTINGS['skipDownload'])