# Newer
# Older
from __future__ import unicode_literals

import json
import os
import sys

import googleapiclient.discovery
import googleapiclient.errors
import youtube_dl
VERSION = (2, 0, 0)
class MyLogger(object):
    """Logger object passed to youtube-dl through its ``logger`` option.

    Debug and warning messages are discarded; only error messages are
    reported, on standard error.
    """

    def debug(self, msg):
        """Discard a debug message."""

    def warning(self, msg):
        """Discard a warning message."""

    def error(self, msg):
        """Write ``msg`` followed by a newline to standard error."""
        # sys.stderr is looked up at call time, so stream redirection works.
        print(msg, file=sys.stderr)
def my_hook(d):
    """Print a notice on stdout when ``d['status']`` equals ``'finished'``.

    Only the ``'status'`` entry of ``d`` is read; any other status value is
    ignored silently.

    NOTE(review): presumably intended as a youtube-dl progress hook, but it
    is not registered anywhere in the visible code -- confirm.
    """
    if d['status'] != 'finished':
        return
    print('Download finished!')
class SaveMyPlaylist:
    """Fetch every item of a YouTube playlist and download the missing ones.

    The playlist is read through the YouTube Data API when the object is
    constructed; ``download_items`` then downloads each item that is not yet
    present as an ``.mp4`` file in the destination folder.
    """

    SCOPES = ['https://www.googleapis.com/auth/youtube.readonly']
    API_NAME = 'youtube'
    API_VERSION = 'v3'

    # Indices into the (channel, title, videoId) tuples kept in self._items.
    CHANNEL = 0
    TITLE = 1
    VIDEO_ID = 2

    # Substrings removed from generated file names.  Backslash and forward
    # slash must be stripped individually: left in place they act as path
    # separators in the output template and defeat the "already downloaded"
    # check in download_items.
    ILLEGAL_CHARS = ['NUL', '\\', '/', ':', '*', '"', '<', '>', '|']

    def __init__(self, apiKey, playlistId):
        """Build the API client and fetch all playlist items immediately.

        :param apiKey: developer key passed to the API client.
        :param playlistId: id of the playlist whose items are fetched.
        """
        print('### SaveMyPlaylist v{} ###'.format('.'.join(str(i) for i in VERSION)))
        print('=============================\n')
        self._apiKey = apiKey
        self._playlistId = playlistId
        self._youtubeApi = googleapiclient.discovery.build(
            self.API_NAME, self.API_VERSION, developerKey=self._apiKey)
        self._items = self.__fetch_all_playlist_items()

    def __fetch_all_playlist_items(self):
        """Return the (channel, title, videoId) tuples of every result page."""
        items = []
        nextPageToken = None
        while True:
            pageItems, nextPageToken = self.__fetch_playlist_items(nextPageToken)
            items.extend(pageItems)
            # The page fetch reports None once there is no further page.
            if nextPageToken is None:
                break
        print('\n>>> Found {} items in playlist\n'.format(len(items)))
        return items

    def __fetch_playlist_items(self, nextPageToken=None):
        """Fetch one page of playlist items.

        :param nextPageToken: token of the page to fetch; ``None`` or ``0``
            requests the first page.
        :return: ``(items, nextPageToken)`` where ``items`` is a list of
            (channel, title, videoId) tuples and ``nextPageToken`` is
            ``None`` when the response names no further page.
        """
        # NOTE(review): the argument lists of the list() calls were truncated
        # in the reviewed source.  'snippet' and the playlist id follow from
        # the fields read below; maxResults=50 is an assumption (documented
        # API maximum) -- confirm against the original.
        params = {
            'part': 'snippet',
            'playlistId': self._playlistId,
            'maxResults': 50,
        }
        if nextPageToken is not None and nextPageToken != 0:
            params['pageToken'] = nextPageToken
        request = self._youtubeApi.playlistItems().list(**params)
        response = request.execute()

        items = []
        for item in response['items']:
            snippet = item['snippet']
            title = snippet['title']
            channel = snippet['channelTitle']
            videoId = snippet['resourceId']['videoId']
            items.append((channel, title, videoId))
            print('{} - {} (videoId: {})'.format(channel, title, videoId))
        return items, response.get('nextPageToken')

    def download_items(self, destinationFolder, debug=False):
        """Download every playlist item not yet present in ``destinationFolder``.

        :param destinationFolder: target directory; created when missing.
        :param debug: when true, report what would be downloaded without
            downloading anything.
        """
        os.makedirs(destinationFolder, exist_ok=True)
        print('>>> Scanning destination folder...')
        # A set keeps the per-item "already downloaded" lookup O(1).
        downloadedVideos = {
            f for f in os.listdir(destinationFolder)
            if f.endswith('.mp4') and os.path.isfile(os.path.join(destinationFolder, f))
        }
        print('>>> Found {} videos in destination folder'.format(len(downloadedVideos)))
        print('\n>>> Started Downloading...')

        total = len(self._items)
        newCount = 0
        for position, item in enumerate(self._items, start=1):
            fileName = self.__escape_file_name(
                '{} - {}.mp4'.format(item[self.TITLE], item[self.CHANNEL]))
            if fileName in downloadedVideos:
                print('Skipping {}/{}: "{}" as it already exists'.format(position, total, fileName))
                continue
            print('Downloading {}/{}: "{}"'.format(position, total, fileName))
            newCount += 1
            if debug:
                # Dry run: the item is counted but nothing is downloaded.
                continue
            ydl_opts = {
                'merge_output_format': 'mp4',
                # youtube-dl treats outtmpl as a %-style template, so a
                # literal percent sign in the path must be doubled.
                'outtmpl': os.path.join(destinationFolder, fileName).replace('%', '%%'),
                'logger': MyLogger(),
            }
            with youtube_dl.YoutubeDL(ydl_opts) as ydl:
                ydl.download(['https://www.youtube.com/watch?v={}'.format(item[self.VIDEO_ID])])
        print('\n>>> Finished Downloading')
        print('Downloaded {} new videos'.format(newCount))

    def __escape_file_name(self, fileName):
        """Return ``fileName`` with every ILLEGAL_CHARS entry removed."""
        for char in self.ILLEGAL_CHARS:
            fileName = fileName.replace(char, '')
        return fileName
if __name__ == '__main__':
    # Settings are read from a JSON file located relative to the current
    # working directory.  Keys used: apiKey, playlistId, destinationFolder
    # and skipDownload.
    with open("config/settings.json", "r") as f:
        SETTINGS = json.load(f)

    saveMyPlaylist = SaveMyPlaylist(SETTINGS['apiKey'], SETTINGS['playlistId'])
    # skipDownload is passed as download_items' ``debug`` flag (dry run); a
    # missing key falls back to that parameter's default of False.
    saveMyPlaylist.download_items(SETTINGS['destinationFolder'], SETTINGS.get('skipDownload', False))