import json
from typing import Dict, List

import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy.oauth2 import SpotifyPKCE

LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyAutoPlaylistCreator', logFormat=LOG_FORMAT)


class SpotifyAutoPlaylistCreator:
    """Fill a destination playlist with the most recently added tracks of several source playlists.

    On ``run()`` the destination playlist is emptied, every source playlist is
    fetched, all their entries are sorted by ``added_at`` (newest first) and the
    first ``numberOfTracks`` non-local tracks are added to the destination.
    """

    # The Spotify Web API accepts at most 100 items per add/remove playlist-items request.
    _MAX_ITEMS_PER_REQUEST = 100

    def __init__(self, clientID: str,
                 clientSecret: str,
                 playlistInfo: List[Dict[str, str]],
                 destinationPlaylistInfo: Dict[str, str],
                 numberOfTracks: int,
                 redirectUrl: str):
        """Store the configuration and log in to Spotify.

        :param clientID: Spotify application client ID.
        :param clientSecret: Spotify application client secret (stored, but not
            passed to the PKCE auth manager created in ``login``).
        :param playlistInfo: source playlists; each entry needs the keys ``user`` and ``id``.
        :param destinationPlaylistInfo: destination playlist; needs the keys ``user`` and ``id``.
        :param numberOfTracks: maximum number of tracks to put into the destination playlist.
        :param redirectUrl: redirect URI used for the authorization flow.
        """
        self._clientID = clientID
        self._clientSecret = clientSecret
        self._playlistInfo = playlistInfo
        self._destinationPlaylistInfo = destinationPlaylistInfo
        self._numberOfTracks = numberOfTracks
        self._redirectUrl = redirectUrl

        self._spotify = self.login()

    def login(self) -> spotipy.Spotify:
        """Create a Spotify client authorized (PKCE) to modify private and public playlists."""
        client_credentials_manager = SpotifyPKCE(client_id=self._clientID,
                                                 redirect_uri=self._redirectUrl,
                                                 scope='playlist-modify-private,playlist-modify-public')
        return spotipy.Spotify(client_credentials_manager=client_credentials_manager)

    def run(self):
        """Empty the destination playlist and refill it with the latest source tracks."""
        destinationPlaylist = self.__get_playlist(self._destinationPlaylistInfo['user'],
                                                  self._destinationPlaylistInfo['id'])
        self.__CleanupDestinationPlaylist(destinationPlaylist)

        LOGGER.info('>>> Fetching tracks for all source playlists...')
        allTracks = []
        # Use the playlists handed to the constructor, not a module-level global,
        # so the class also works when imported from another module.
        for playlistInfo in self._playlistInfo:
            playlist = self.__get_playlist(playlistInfo['user'], playlistInfo['id'])
            allTracks.extend(self.__get_tracks(playlist))

        # 'added_at' may be null for some entries; treat those as the oldest ones
        # instead of failing on a None/str comparison.
        sortedTracks = sorted(allTracks, key=lambda d: d['added_at'] or '', reverse=True)

        LOGGER.info(f'>>> Collecting latest tracks (limit: {self._numberOfTracks})...')
        latestTrackUris = self.__CollectLatestTracks(sortedTracks)
        LOGGER.info(f'>>> Found {len(latestTrackUris)} latest tracks (limit: {self._numberOfTracks})')

        LOGGER.info(f'>>> Adding tracks to destination playlist "{destinationPlaylist["name"]}"...')
        # Batches are added in order, so the newest-first ordering is preserved.
        for batch in self.__chunked(latestTrackUris, self._MAX_ITEMS_PER_REQUEST):
            self._spotify.playlist_add_items(self._destinationPlaylistInfo['id'], batch)

    def __CollectLatestTracks(self, sortedTracks):
        """Return the URIs of the first ``numberOfTracks`` usable entries of ``sortedTracks``.

        Local tracks and entries without track data are skipped and do not count
        towards the limit.
        """
        tracksToAdd = []
        for track in sortedTracks:
            if len(tracksToAdd) >= self._numberOfTracks:
                break

            # An entry whose 'track' is null has no name/URI to work with.
            if not track.get('track'):
                LOGGER.info('Skipping playlist entry without track data')
                continue

            if track['is_local']:
                LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
                continue

            LOGGER.info(f'Appending track "{track["track"]["name"]}"')
            tracksToAdd.append(track)

        return self.__extract_track_uris(tracksToAdd)

    def __CleanupDestinationPlaylist(self, destinationPlaylist):
        """Remove every track currently contained in the destination playlist."""
        existingTracks = self.__get_tracks(destinationPlaylist)
        LOGGER.info(f'>>> Removing {len(existingTracks)} tracks from destination '
                    f'playlist "{destinationPlaylist["name"]}"...')
        existingTracksUris = self.__extract_track_uris(existingTracks)
        # Nothing is sent for an empty playlist; larger playlists are removed in batches.
        for batch in self.__chunked(existingTracksUris, self._MAX_ITEMS_PER_REQUEST):
            self._spotify.playlist_remove_all_occurrences_of_items(self._destinationPlaylistInfo['id'], batch)

    def __get_playlist(self, username: str, playlistID: str) -> Dict:
        """Fetch and return the playlist ``playlistID`` owned by ``username``."""
        LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
        identifier = f'spotify:user:{username}:playlist:{playlistID}'
        playlist = self._spotify.playlist(identifier)
        LOGGER.info(f'Found playlist "{playlist["name"]}"')
        return playlist

    def __get_tracks(self, playlist) -> List[Dict]:
        """Return all entries of ``playlist``, following the pagination links."""
        tracks = playlist['tracks']
        # Copy the first page so the caller's playlist dict is not modified.
        results = list(tracks['items'])
        while tracks['next']:
            tracks = self._spotify.next(tracks)
            results.extend(tracks['items'])

        LOGGER.info(f'Fetched {len(results)} tracks')
        return results

    def __extract_track_uris(self, tracks: List) -> List[str]:
        """Return the track URI of every entry in ``tracks`` that carries track data."""
        return [track['track']['uri'] for track in tracks if track.get('track')]

    @staticmethod
    def __chunked(items: List[str], size: int) -> List[List[str]]:
        """Split ``items`` into consecutive lists of at most ``size`` elements."""
        return [items[start:start + size] for start in range(0, len(items), size)]


if __name__ == '__main__':
    with open('config/settings-creator.json', 'r', encoding='utf-8') as f:
        SETTINGS = json.load(f)

    spotifyBackup = SpotifyAutoPlaylistCreator(SETTINGS['spotifyAPI']['clientID'],
                                               SETTINGS['spotifyAPI']['clientSecret'],
                                               SETTINGS['playlists'],
                                               SETTINGS['destinationPlaylist'],
                                               SETTINGS['numberOfTracks'],
                                               SETTINGS['redirectUrl'])

    spotifyBackup.run()

    LOGGER.info('### DONE ###')