Skip to content
Snippets Groups Projects
SpotifyAutoPlaylistCreator.py 7.28 KiB
Newer Older
  • Learn to ignore specific revisions
  • import spotipy
    from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
    
    from spotipy import CacheFileHandler
    
    from spotipy.oauth2 import SpotifyOAuth
    
    
    # Layout of every log line: the level name (left-aligned, padded to 7 characters),
    # then the timestamp, then the message.
    LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
    # Module-wide logger, obtained from the project's DefaultLogger helper and given LOG_FORMAT.
    LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyAutoPlaylistCreator', logFormat=LOG_FORMAT)
    
    
    
        def __init__(self, clientID: str,
                     clientSecret: str,
                     playlistInfo: List[Dict[str, str]],
                     destinationPlaylistInfo: Dict[str, str],
                     numberOfTracks: int,
                     redirectUrl: str,
                     openBrowser: bool,
                     cacheFilePath: str,
                     mode: 'str | None' = None):
            """
            Store the configuration and log in to Spotify.

            :param clientID: client ID of the Spotify application.
            :param clientSecret: client secret of the Spotify application.
            :param playlistInfo: source playlists; each entry is read via its
                                 'user' and 'id' keys.
            :param destinationPlaylistInfo: destination playlist, read via its
                                            'user' and 'id' keys.
            :param numberOfTracks: maximum number of tracks to put into the
                                   destination playlist.
            :param redirectUrl: redirect URI handed to the OAuth manager.
            :param openBrowser: forwarded to the OAuth manager as ``open_browser``.
            :param cacheFilePath: path handed to the cache file handler of the
                                  OAuth manager.
            :param mode: selection mode evaluated by ``run()`` (one of the
                         module's MODE_* constants). Defaults to MODE_LATEST
                         when omitted.
            """
            self._clientID = clientID
            self._clientSecret = clientSecret
            self._playlistInfo = playlistInfo
            self._destinationPlaylistInfo = destinationPlaylistInfo
            self._numberOfTracks = numberOfTracks
            self._redirectUrl = redirectUrl
            self._openBrowser = openBrowser
            self._cacheFilePath = cacheFilePath

            # run() dispatches on self._mode, so it must always be set here;
            # otherwise run() fails with an AttributeError.
            self._mode = MODE_LATEST if mode is None else mode

            # Logging in happens last because login() reads the attributes set above.
            self._spotify = self.login()
    
        def login(self) -> spotipy.Spotify:
            """
            Create a Spotify API client authenticated through OAuth.

            The OAuth manager is built from the stored client credentials,
            redirect URL and browser flag, requests the scopes needed to modify
            private and public playlists, and uses a cache file handler
            pointing at the configured cache file path.

            :return: the authenticated ``spotipy.Spotify`` client.
            """
            auth_manager = SpotifyOAuth(client_id=self._clientID,
                                        client_secret=self._clientSecret,
                                        redirect_uri=self._redirectUrl,
                                        scope='playlist-modify-private,playlist-modify-public',
                                        open_browser=self._openBrowser,
                                        cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))

            # A SpotifyOAuth object belongs in ``auth_manager``;
            # ``client_credentials_manager`` is documented for
            # SpotifyClientCredentials objects only.
            return spotipy.Spotify(auth_manager=auth_manager)
    
        def run(self):
            """
            Rebuild the destination playlist from the source playlists.

            Steps: validate the configured mode, fetch the destination playlist,
            remove its current tracks, collect the tracks of every source
            playlist and finally hand them to the handler of the configured mode.

            :raises RuntimeError: if the configured mode is unknown.
            """
            # Fail before anything is removed: an unknown mode must not leave
            # the destination playlist emptied.
            if self._mode not in (MODE_LATEST, MODE_RANDOM):
                raise RuntimeError(f'Unknown mode {self._mode}')

            destinationPlaylist = self.__get_playlist(self._destinationPlaylistInfo['user'],
                                                      self._destinationPlaylistInfo['id'])

            self.__CleanupDestinationPlaylist(destinationPlaylist)

            LOGGER.info('>>> Fetching tracks for all source playlists...')
            allTracks = []
            for playlistInfo in self._playlistInfo:
                playlist = self.__get_playlist(playlistInfo['user'], playlistInfo['id'])
                allTracks.extend(self.__get_tracks(playlist))

            LOGGER.info(f'>>> Run mode {self._mode}...')
            if self._mode == MODE_LATEST:
                self.__RunModeLatestTracks(allTracks, destinationPlaylist)
            else:
                # Only MODE_RANDOM is left after the validation above.
                self.__RunModeRandom(allTracks, destinationPlaylist)
    
        def __RunModeLatestTracks(self, allTracks: List, destinationPlaylist) -> None:
            """
            Add the most recently added tracks to the destination playlist.

            :param allTracks: playlist items of all source playlists; each item
                              is sorted by its 'added_at' value.
            :param destinationPlaylist: destination playlist object; only its
                                        'name' is read here (for logging).
            """
            # Newest first, so that collecting from the front yields the latest tracks.
            sortedTracks = sorted(allTracks, key=lambda d: d['added_at'], reverse=True)

            LOGGER.info(f'>>> Collecting latest tracks (limit: {self._numberOfTracks})...')
            latestTrackUris = self.__CollectLatestTracks(sortedTracks)

            LOGGER.info(f'>>> Found {len(latestTrackUris)} latest tracks (limit: {self._numberOfTracks})')

            LOGGER.info(f'>>> Adding tracks to destination playlist "{destinationPlaylist["name"]}"...')
            # The Web API accepts at most 100 items per request, so add in
            # batches (same as the random mode). An empty list yields no batch
            # and therefore no request.
            for batch in self.__chunk_list(latestTrackUris, 100):
                self._spotify.playlist_add_items(self._destinationPlaylistInfo['id'], batch)
    
    
        def __RunModeRandom(self, allTracks: List, destinationPlaylist) -> None:
            """
            Add randomly chosen tracks to the destination playlist.

            Local tracks are excluded. If fewer non-local tracks exist than
            requested, the pool is sampled repeatedly until the requested
            number is reached, so tracks may then occur more than once.

            :param allTracks: playlist items of all source playlists.
            :param destinationPlaylist: destination playlist object; only its
                                        'name' is read here (for logging).
            """
            randomTracks = []
            filteredTracks = [t for t in allTracks if not t['is_local']]

            if not filteredTracks:
                # Without this guard the loop below would never terminate:
                # sampling from an empty pool adds nothing on every pass.
                LOGGER.info('>>> No non-local tracks available, nothing to add')
                return

            while len(randomTracks) < self._numberOfTracks:
                remaining = self._numberOfTracks - len(randomTracks)
                randomTracks.extend(random.sample(filteredTracks, min(len(filteredTracks), remaining)))

            LOGGER.info(f'>>> Found {len(randomTracks)} random tracks (limit: {self._numberOfTracks})')

            LOGGER.info(f'>>> Adding tracks to destination playlist "{destinationPlaylist["name"]}"...')
            randomTrackUris = self.__extract_track_uris(randomTracks)

            # Items are added in batches of 100 per request.
            for batch in self.__chunk_list(randomTrackUris, 100):
                self._spotify.playlist_add_items(self._destinationPlaylistInfo['id'], batch)
    
        @staticmethod
        def __chunk_list(items, batchSize):
            """
            Lazily yield consecutive slices of ``items``.

            Every slice holds ``batchSize`` elements, except possibly the last
            one, which holds whatever is left over.
            """
            offsets = range(0, len(items), batchSize)
            yield from (items[start:start + batchSize] for start in offsets)
    
        def __CollectLatestTracks(self, sortedTracks):
            """
            Pick tracks from the front of ``sortedTracks`` until the limit is hit.

            Local tracks are skipped and do not count towards the limit.

            :param sortedTracks: playlist items in the order they should be considered.
            :return: the URIs of the picked tracks.
            """
            limit = self._numberOfTracks
            collected = []
            for track in sortedTracks:
                if len(collected) >= limit:
                    break

                if not track['is_local']:
                    LOGGER.info(f'Appending track "{track["track"]["name"]}"')
                    collected.append(track)
                    continue

                # Local tracks cannot be added to a playlist through the web API, see
                # https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
                LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')

            return self.__extract_track_uris(collected)
    
        def __CleanupDestinationPlaylist(self, destinationPlaylist):
            """
            Remove every track currently contained in the destination playlist.

            :param destinationPlaylist: destination playlist object as returned
                                        by the playlist lookup; its tracks are
                                        fetched and its 'name' is logged.
            """
            existingTracks = self.__get_tracks(destinationPlaylist)
            LOGGER.info(f'>>> Removing {len(existingTracks)} tracks from destination '
                        f'playlist "{destinationPlaylist["name"]}"...')
            existingTracksUris = self.__extract_track_uris(existingTracks)

            # The Web API accepts at most 100 items per removal request, so the
            # URIs are sent in batches. An empty playlist yields no batch and
            # therefore no request.
            for batch in self.__chunk_list(existingTracksUris, 100):
                self._spotify.playlist_remove_all_occurrences_of_items(self._destinationPlaylistInfo['id'], batch)
    
    
        def __get_playlist(self, username: str, playlistID: str) -> Dict:
            """
            Look up a playlist by owner and ID and return the playlist object.

            :param username: user name placed into the playlist URI.
            :param playlistID: ID of the playlist.
            :return: the playlist object returned by the Spotify client.
            """
            LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
            playlist = self._spotify.playlist(f'spotify:user:{username}:playlist:{playlistID}')
            LOGGER.info(f'Found playlist "{playlist["name"]}"')
            return playlist
    
        def __get_tracks(self, playlist) -> List[Dict]:
            """
            Return all items of a playlist, following the paging links.

            :param playlist: playlist object whose 'tracks' entry holds the first
                             page ('items' plus a 'next' link).
            :return: a new list with the items of all pages.
            """
            page = playlist['tracks']
            # Copy the first page: extending the caller's list in place would
            # alter the playlist object and duplicate items on a repeated call.
            results = list(page['items'])

            while page['next']:
                page = self._spotify.next(page)
                results.extend(page['items'])

            LOGGER.info(f'Fetched {len(results)} tracks')
            return results
    
        def __extract_track_uris(self, tracks: List) -> List[str]:
            """Return the 'uri' of the 'track' entry of every given playlist item, in order."""
            trackObjects = (item['track'] for item in tracks)
            return [trackObject['uri'] for trackObject in trackObjects]
    
    
    
    # Command line entry point: reads the JSON settings file given via --settings-path / -s and
    # builds a SpotifyAutoPlaylistCreator from its values.
    # (Deliberately documented with comments: click would show a docstring as the command's help text.)
    # NOTE(review): in the visible lines the created object is not used afterwards (run() is never
    # called) - presumably the function continues beyond this excerpt; confirm.
    @click.command()
    @click.option('--settings-path', '-s', 'settingsPath', help='Path to the settings file.', required=True)
    def start(settingsPath) -> None:
        with open(settingsPath, 'r', encoding='utf-8') as f:
    
            SETTINGS = json.load(f)
    
        # Arguments are passed positionally, in the parameter order of the constructor.
        spotifyBackup = SpotifyAutoPlaylistCreator(SETTINGS['spotifyAPI']['clientID'],
                                                   SETTINGS['spotifyAPI']['clientSecret'],
                                                   SETTINGS['playlists'],
                                                   SETTINGS['destinationPlaylist'],
    
                                                   SETTINGS['numberOfTracks'],
    
                                                   SETTINGS['redirectUrl'],
    
                                                   SETTINGS['openBrowser'],
                                                   SETTINGS['cacheFilePath'])