Skip to content
Snippets Groups Projects
Select Git revision
  • 6b037a6465a1f61c9bf62ad96e389e84472f4d7e
  • master default
  • v1.12.2
  • v1.12.1
  • v1.12.0
  • v1.11.0
  • v1.10.0
  • v1.1.0
  • v1.0.0
9 results

SpotifyBackup.py

Blame
  • SpotifyBackup.py 5.11 KiB
    import csv
    import json
    import logging
    import os
    import sys
    from datetime import datetime, timedelta
    
    import spotipy
    from spotipy.oauth2 import SpotifyClientCredentials
    
    
    def prepare_logging() -> logging.Logger:
        LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
        DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
    
        LOG_FORMATTER = logging.Formatter(fmt=LOG_FORMAT, datefmt=DATE_FORMAT)
    
        logger = logging.getLogger('SpotifyBackup')
        logger.setLevel(logging.DEBUG)
    
        outHandler = logging.StreamHandler(sys.stdout)
        outHandler.setFormatter(LOG_FORMATTER)
        outHandler.setLevel(logging.DEBUG)
        outHandler.addFilter(lambda record: record.levelno <= logging.INFO)
        logger.addHandler(outHandler)
    
        errHandler = logging.StreamHandler(sys.stderr)
        errHandler.setFormatter(LOG_FORMATTER)
        errHandler.setLevel(logging.WARNING)
        logger.addHandler(errHandler)
        return logger
    
    
    LOGGER = prepare_logging()
    
    
    class SpotifyBackup:
        DATE_FORMAT = '%Y-%m-%d_%H-%M-%S'
        DATE_FORMAT_SHORT = '%Y-%m-%d'
        INVALID_CHARS = [' ', '\t', '\n']
        REPLACE_CHAR = '_'
    
        def __init__(self, clientID: str, clientSecret: str, exportFolder: str):
            self._exportFolder = exportFolder
            if self._exportFolder:
                os.makedirs(self._exportFolder, exist_ok=True)
    
            client_credentials_manager = SpotifyClientCredentials(client_id=clientID, client_secret=clientSecret)
            self._spotify = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
    
        def backup_playlist(self, username: str, playlistID: str):
            playlist = self.__get_playlist(username, playlistID)
            tracks = self.__get_tracks(playlist)
    
            exportPath = self.__determine_export_path(playlist['name'])
    
            LOGGER.info(f'>>> Exporting tracks to "{exportPath}"...')
            self.__export_csv(exportPath, tracks)
            LOGGER.info('Exporting DONE')
    
        def __get_playlist(self, username: str, playlistID: str):
            LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
            identifier = f'spotify:user:{username}:playlist:{playlistID}'
            playlist = self._spotify.playlist(identifier)
            LOGGER.info(f'Found playlist "{playlist["name"]}"')
            return playlist
    
        def __get_tracks(self, playlist):
            tracks = playlist['tracks']
            results = tracks['items']
            while tracks['next']:
                tracks = self._spotify.next(tracks)
                results.extend(tracks['items'])
    
            LOGGER.info(f'Fetched {len(results)} tracks')
            return results
    
        def __determine_export_path(self, playlistName):
            for item in self.INVALID_CHARS:
                playlistName = playlistName.replace(item, self.REPLACE_CHAR)
            fileName = f'{datetime.strftime(datetime.now(), self.DATE_FORMAT)}_{playlistName}.csv'
            exportPath = os.path.join(self._exportFolder, fileName)
            return exportPath
    
        def __export_csv(self, exportPath, tracks):
            with open(exportPath, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f, delimiter=';', quotechar='|', quoting=csv.QUOTE_MINIMAL)
                writer.writerow(('title', 'artists', 'album', 'addedBy', 'addedAt', 'url'))
    
                for track in tracks:
                    try:
                        trackInfo = track['track']
                        title = trackInfo['name']
                        artists = trackInfo['artists']
                        artists = ' & '.join(artist['name'] for artist in artists)
                        album = trackInfo['album']['name']
                        author = track['added_by']['id']
                        addedAt = track['added_at']
                        url = trackInfo['external_urls'].get('spotify', '')
    
                        writer.writerow((title, artists, album, author, addedAt, url))
                    except Exception:
                        LOGGER.exception(f'Error while exporting track "{track}"')
    
        def clean_old_exports(self, daysToKeep):
            if not daysToKeep:
                LOGGER.info('Skipping deletion of old files')
                return
    
            minimumDate = datetime.today() - timedelta(days=int(daysToKeep))
            LOGGER.info(f'>>> Deleting files older than {minimumDate} ({daysToKeep} days)')
    
            files = [file for file in sorted(os.listdir(self._exportFolder)) if
                     os.path.isfile(os.path.join(self._exportFolder, file))]
    
            for file in files:
                parts = file.split('_')
                creationDate = datetime.strptime(parts[0], self.DATE_FORMAT_SHORT)
                if creationDate < minimumDate:
                    LOGGER.info(f'Removing old file "{file}"')
                    os.remove(os.path.join(self._exportFolder, file))
    
    
    if __name__ == '__main__':
        with open('config/settings.json', 'r', encoding='utf-8') as f:
            SETTINGS = json.load(f)
    
        spotifyBackup = SpotifyBackup(SETTINGS['spotifyAPI']['clientID'],
                                      SETTINGS['spotifyAPI']['clientSecret'],
                                      SETTINGS['exportFolder'])
    
        spotifyBackup.clean_old_exports(SETTINGS['daysToKeep'])
    
        for playlist in SETTINGS['playlists']:
            spotifyBackup.backup_playlist(playlist['user'], playlist['id'])
    
        LOGGER.info('### DONE ###')