import json import random from typing import Dict, List import click import spotipy from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger from spotipy import CacheFileHandler from spotipy.oauth2 import SpotifyOAuth LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s' LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyAutoPlaylistCreator', logFormat=LOG_FORMAT) MODE_LATEST = 'LATEST' MODE_RANDOM = 'RANDOM' class SpotifyAutoPlaylistCreator: def __init__(self, clientID: str, clientSecret: str, playlistInfo: List[Dict[str, str]], destinationPlaylistInfo: Dict[str, str], numberOfTracks: int, mode: str, redirectUrl: str, openBrowser: bool, cacheFilePath: str): self._clientID = clientID self._clientSecret = clientSecret self._playlistInfo = playlistInfo self._destinationPlaylistInfo = destinationPlaylistInfo self._numberOfTracks = numberOfTracks self._mode = mode self._redirectUrl = redirectUrl self._openBrowser = openBrowser self._cacheFilePath = cacheFilePath self._spotify = self.login() def login(self) -> spotipy.Spotify: client_credentials_manager = SpotifyOAuth(client_id=self._clientID, client_secret=self._clientSecret, redirect_uri=self._redirectUrl, scope='playlist-modify-private,playlist-modify-public', open_browser=self._openBrowser, cache_handler=CacheFileHandler(cache_path=self._cacheFilePath)) return spotipy.Spotify(client_credentials_manager=client_credentials_manager) def run(self): destinationPlaylist = self.__get_playlist(self._destinationPlaylistInfo['user'], self._destinationPlaylistInfo['id']) self.__CleanupDestinationPlaylist(destinationPlaylist) LOGGER.info(f'>>> Fetching tracks for all source playlists...') allTracks = [] for playlistInfo in self._playlistInfo: playlist = self.__get_playlist(playlistInfo['user'], playlistInfo['id']) allTracks.extend(self.__get_tracks(playlist)) LOGGER.info(f'>>> Run mode {self._mode}...') if self._mode == MODE_LATEST: self.__RunModeLatestTracks(allTracks, destinationPlaylist) elif self._mode == MODE_RANDOM: self.__RunModeRandom(allTracks, destinationPlaylist) else: raise RuntimeError(f'Unknown mode {self._mode}') def __RunModeLatestTracks(self, allTracks: List, destinationPlaylist) -> None: sortedTracks = sorted(allTracks, key=lambda d: d['added_at'], reverse=True) LOGGER.info(f'>>> Collecting latest tracks (limit: {self._numberOfTracks})...') latestTrackUris = self.__CollectLatestTracks(sortedTracks) LOGGER.info(f'>>> Found {len(latestTrackUris)} latest tracks (limit: {self._numberOfTracks})') LOGGER.info(f'>>> Adding tracks to destination playlist "{destinationPlaylist["name"]}"...') self._spotify.playlist_add_items(self._destinationPlaylistInfo['id'], latestTrackUris) def __RunModeRandom(self, allTracks: List, destinationPlaylist) -> None: randomTracks = [] filteredTracks = [t for t in allTracks if not t['is_local']] while len(randomTracks) < self._numberOfTracks: numberOfExercisesToGenerate = self._numberOfTracks - len(randomTracks) randomTracks.extend(random.sample(filteredTracks, min(len(filteredTracks), numberOfExercisesToGenerate))) LOGGER.info(f'>>> Found {len(randomTracks)} random tracks (limit: {self._numberOfTracks})') LOGGER.info(f'>>> Adding tracks to destination playlist "{destinationPlaylist["name"]}"...') randomTrackUris = self.__extract_track_uris(randomTracks) for batch in self.__chunk_list(randomTrackUris, 100): self._spotify.playlist_add_items(self._destinationPlaylistInfo['id'], batch) @staticmethod def __chunk_list(items, batchSize): for i in range(0, len(items), batchSize): yield items[i:i + batchSize] def __CollectLatestTracks(self, sortedTracks): tracksToAdd = [] for track in sortedTracks: if len(tracksToAdd) >= self._numberOfTracks: break if track['is_local']: # It's not possible to add a local track to a playlist using the web API. # https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408 LOGGER.info(f'Skipping local track "{track["track"]["name"]}"') continue LOGGER.info(f'Appending track "{track["track"]["name"]}"') tracksToAdd.append(track) return self.__extract_track_uris(tracksToAdd) def __CleanupDestinationPlaylist(self, destinationPlaylist): existingTracks = self.__get_tracks(destinationPlaylist) LOGGER.info(f'>>> Removing {len(existingTracks)} tracks from destination ' f'playlist "{destinationPlaylist["name"]}"...') existingTracksUris = self.__extract_track_uris(existingTracks) if existingTracksUris: self._spotify.playlist_remove_all_occurrences_of_items(self._destinationPlaylistInfo['id'], existingTracksUris) def __get_playlist(self, username: str, playlistID: str) -> Dict: LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...') identifier = f'spotify:user:{username}:playlist:{playlistID}' playlist = self._spotify.playlist(identifier) LOGGER.info(f'Found playlist "{playlist["name"]}"') return playlist def __get_tracks(self, playlist) -> List[Dict]: tracks = playlist['tracks'] results = tracks['items'] while tracks['next']: tracks = self._spotify.next(tracks) results.extend(tracks['items']) LOGGER.info(f'Fetched {len(results)} tracks') return results def __extract_track_uris(self, tracks: List) -> List[str]: return [track['track']['uri'] for track in tracks] @click.command() @click.option('--settings-path', '-s', 'settingsPath', help='Path to the settings file.', required=True) def start(settingsPath) -> None: with open(settingsPath, 'r', encoding='utf-8') as f: SETTINGS = json.load(f) spotifyBackup = SpotifyAutoPlaylistCreator(SETTINGS['spotifyAPI']['clientID'], SETTINGS['spotifyAPI']['clientSecret'], SETTINGS['playlists'], SETTINGS['destinationPlaylist'], SETTINGS['numberOfTracks'], SETTINGS['mode'], SETTINGS['redirectUrl'], SETTINGS['openBrowser'], SETTINGS['cacheFilePath']) spotifyBackup.run() LOGGER.info('### DONE ###') if __name__ == '__main__': start()