Skip to content
Snippets Groups Projects
SpotifyRecorder.py 10.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • import json
    import os.path
    import time
    from typing import List, Dict
    
    import spotipy
    from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
    from spotipy import SpotifyOAuth, CacheFileHandler
    
    from SpotifyAudioRecorder import SpotifyAudioRecorder
    
    # Layout of every log line: level (left-aligned, padded to 7 characters), timestamp, message.
    LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'

    # Module-wide logger named 'SpotifyRecorder', obtained via DefaultLogger with the format above.
    LOGGER = DefaultLogger().create_logger_if_not_exists(
        'SpotifyRecorder',
        logFormat=LOG_FORMAT,
    )
    
    
    class SpotifyRecorder:
        """Record the tracks of a Spotify playlist to ``.wav`` files, one file per track.

        For every selected track a ``SpotifyAudioRecorder`` is created for the configured
        audio device and target file, playback of the track is started on the configured
        Spotify device through the Web API, and the playback state is polled until the
        track has finished.
        """

        # Maximum time to wait for a freshly started track to be reported as playing.
        _MAX_WAIT_TIME_TRACK_STARTING_IN_S = 10
        # Short polling interval (waiting for playback to start / close to the end of a track).
        _WAIT_TIME_TRACK_PLAYING_SHORT_IN_S = 1
        # Remaining play time above which the polling interval is half of the remaining time.
        _THRESHOLD_TRACK_END_IN_MS = 5000

        def __init__(self, clientID: str,
                     clientSecret: str,
                     redirectUrl: str,
                     openBrowser: bool,
                     cacheFilePath: str,
                     playlist: Dict[str, str],
                     spotifyDeviceName: str,
                     audioDeviceName: str,
                     destinationFolder: str,
                     startNumber: int = 0,
                     limit: int = -1):
            """Store the configuration, create the destination folder and log in to Spotify.

            :param clientID: client ID passed to ``SpotifyOAuth``
            :param clientSecret: client secret passed to ``SpotifyOAuth``
            :param redirectUrl: redirect URI passed to ``SpotifyOAuth``
            :param openBrowser: passed to ``SpotifyOAuth`` as ``open_browser``
            :param cacheFilePath: path of the token cache file used by ``CacheFileHandler``
            :param playlist: playlist description; the keys ``name``, ``user`` and ``id`` are read
            :param spotifyDeviceName: name of the Spotify device used for playback
            :param audioDeviceName: audio device name handed to ``SpotifyAudioRecorder``
            :param destinationFolder: folder the recordings are written to (created if missing)
            :param startNumber: 1-based number of the first track to record; values below 1
                start with the first track of the playlist
            :param limit: maximum number of tracks to record, ``-1`` records up to the end
            """
            self._clientID = clientID
            self._clientSecret = clientSecret
            self._redirectUrl = redirectUrl
            self._openBrowser = openBrowser
            self._cacheFilePath = cacheFilePath
            self._playlist = playlist
            self._spotifyDeviceName = spotifyDeviceName
            self._audioDeviceName = audioDeviceName
            self._destinationFolder = destinationFolder

            self._startNumber = startNumber
            self._limit = limit

            os.makedirs(self._destinationFolder, exist_ok=True)

            self._spotify = self.login()

        def login(self) -> 'spotipy.Spotify':
            """Create a Spotify Web API client authorized to read and modify the playback state.

            :return: the ``spotipy.Spotify`` client
            """
            client_credentials_manager = SpotifyOAuth(client_id=self._clientID,
                                                      client_secret=self._clientSecret,
                                                      redirect_uri=self._redirectUrl,
                                                      scope='user-read-playback-state,user-modify-playback-state',
                                                      open_browser=self._openBrowser,
                                                      cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))
            return spotipy.Spotify(client_credentials_manager=client_credentials_manager)

        def run(self):
            """Fetch all tracks of the configured playlist and record the selected ones."""
            LOGGER.info(f'Fetching all tracks for playlist {self._playlist["name"]}...')

            playlist = self.__get_playlist(self._playlist['user'], self._playlist['id'])
            tracks = self.__get_tracks(playlist)

            # Without this call the fetched tracks would simply be discarded.
            self.__record_tracks(tracks)

        def __get_playlist(self, username: str, playlistID: str) -> Dict:
            """Fetch the playlist object identified by user name and playlist ID.

            :param username: user name used to build the playlist identifier
            :param playlistID: ID of the playlist
            :return: the playlist as returned by ``spotipy.Spotify.playlist``
            """
            LOGGER.info(f'Fetching playlist with ID: {playlistID} by {username}...')

            identifier = f'spotify:user:{username}:playlist:{playlistID}'
            playlist = self._spotify.playlist(identifier)
            LOGGER.info(f'Found playlist "{playlist["name"]}"')
            return playlist

        def __get_tracks(self, playlist) -> List[Dict]:
            """Collect the track items of all result pages of ``playlist``.

            :param playlist: playlist object whose ``tracks`` entry is the first result page
            :return: the items of all pages in playlist order
            """
            tracks = playlist['tracks']
            # Copy the first page so that paging does not grow the list stored inside ``playlist``.
            results = list(tracks['items'])

            while tracks['next']:
                tracks = self._spotify.next(tracks)
                results.extend(tracks['items'])

            # ``tracks`` is the last page (a dict) at this point, the collected items are in ``results``.
            LOGGER.info(f'Found {len(results)} tracks in playlist')

            return results

        def __extract_track_uris(self, tracks: List) -> List[str]:
            """Return the Spotify URI of every playlist item in ``tracks``."""
            return [track['track']['uri'] for track in tracks]

        def __select_tracks(self, tracks: list) -> list:
            """Return the part of ``tracks`` selected by the start number and the limit.

            :param tracks: all playlist items
            :return: the items from the start number on, at most ``limit`` items unless the limit is ``-1``
            """
            # A start number below 1 would produce a negative slice index and silently select
            # from the END of the list (the default 0 would record only the last track).
            firstIndex = max(self._startNumber - 1, 0)

            if self._limit == -1:
                return tracks[firstIndex:]
            return tracks[firstIndex:firstIndex + self._limit]

        def __record_tracks(self, tracks: list):
            """Record the selected tracks one after another and log a summary.

            Local tracks are skipped. An error while recording one track is logged and the
            next track is tried, so a single failure does not abort the whole run.

            :param tracks: all playlist items; start number and limit are applied here
            :raises RuntimeError: if the configured Spotify device cannot be found
            """
            deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)

            recordedTrackNumbers = []
            skippedTrackNumbers = []
            errorTrackNumbers = []

            if self._limit == -1:
                LOGGER.info(f'Recording track #{self._startNumber} to all')
            else:
                LOGGER.info(f'Recording track #{self._startNumber} to (including) #{self._startNumber + self._limit - 1}')
            tracks = self.__select_tracks(tracks)

            for index, track in enumerate(tracks):
                indexInPlaylist = self._startNumber + index

                if track['is_local']:
                    # It's not possible to add a local track to a playlist using the web API.
                    # https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
                    LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
                    skippedTrackNumbers.append(indexInPlaylist)
                    # A skipped track must not be recorded (and counted) as well.
                    continue

                LOGGER.info(
                    f'>>> Recording track {index + 1}/{len(tracks)}: #{indexInPlaylist} "{track["track"]["name"]}"...')

                try:
                    # NOTE(review): the file name uses indexInPlaylist + 1 while the log output uses
                    # indexInPlaylist - confirm which numbering is intended.
                    filePath = self.__determine_file_path(indexInPlaylist + 1, track)

                    recorder = SpotifyAudioRecorder(self._audioDeviceName, filePath)
                    with recorder.record():
                        self.__play_track(deviceId, track['track']['uri'])
                        timeWaitedForPlaying = self.__wait_for_track_playing(track['track']['id'])
                        self.__wait_for_track_end(track, timeWaitedForPlaying)

                    recordedTrackNumbers.append(indexInPlaylist)
                except Exception as e:
                    # Per-track boundary: log the failure and continue with the next track.
                    LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
                    errorTrackNumbers.append(indexInPlaylist)

            LOGGER.info('>>> Skipped <<<')
            for number in skippedTrackNumbers:
                LOGGER.info(f'Skipped #{number}')

            LOGGER.info('>>> Errors <<<')
            for number in errorTrackNumbers:
                LOGGER.info(f'Error #{number}')

            LOGGER.info(
                f'### {len(tracks)} tracks, {len(recordedTrackNumbers)} recorded, {len(skippedTrackNumbers)} skipped, {len(errorTrackNumbers)} errors ###')

        def __determine_file_path(self, index: int, track) -> str:
            """Build the target path ``<destination>/<index> - <artists> - <name>.wav``.

            :param index: number used as prefix of the file name
            :param track: playlist item; artist names and track name are read from ``track['track']``
            :return: path of the recording inside the destination folder
            """
            artists = ' & '.join(artist['name'] for artist in track['track']['artists'])
            fileName = f'{index} - {artists} - {track["track"]["name"]}.wav'

            # Artist or track names may contain path separators (e.g. "AC/DC"); left as they are,
            # they would be interpreted as sub folders of the destination folder.
            for separator in (os.sep, os.altsep):
                if separator:
                    fileName = fileName.replace(separator, '_')

            return os.path.join(self._destinationFolder, fileName)

        def __wait_for_track_end(self, track, timeWaitedForPlaying: float) -> None:
            """Block until the track stopped playing or its duration has elapsed.

            :param track: playlist item; ``duration_ms`` is read from ``track['track']``
            :param timeWaitedForPlaying: seconds already spent waiting for the track to start
            :raises RuntimeError: if playback stopped noticeably earlier or later than the
                track duration
            """
            trackDurationInMs = track['track']['duration_ms']
            trackDurationInSeconds = trackDurationInMs // 1000

            LOGGER.info(f'\tTrack duration: {self.__convert_seconds_to_duration(trackDurationInSeconds)}')

            startTime = time.time()

            while time.time() - startTime < trackDurationInSeconds:
                currentPlayback = self._spotify.current_playback()
                # current_playback() yields None when no playback state is available,
                # which is treated like "not playing".
                if currentPlayback and currentPlayback['is_playing']:
                    remainingTimeInMs = trackDurationInMs - currentPlayback['progress_ms']
                    remainingTimeInSeconds = remainingTimeInMs // 1000

                    # Poll rarely while much of the track is left and every second close to its end.
                    sleepTime = self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S
                    if remainingTimeInMs > self._THRESHOLD_TRACK_END_IN_MS:
                        sleepTime = remainingTimeInSeconds / 2

                    LOGGER.debug(f'\t\tWaiting for track to end (remaining: '
                                 f'{self.__convert_seconds_to_duration(remainingTimeInSeconds)}, '
                                 f'sleep: {self.__convert_seconds_to_duration(int(sleepTime))}s)...')

                    # Actually wait; otherwise the loop hammers the Web API without pause.
                    time.sleep(sleepTime)
                else:
                    waitDuration = int(time.time() - startTime)
                    waitDuration += timeWaitedForPlaying

                    if waitDuration < trackDurationInSeconds - self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S:
                        raise RuntimeError(
                            f'Track finished too early (waited: {waitDuration}s, expected: {trackDurationInSeconds}s)')

                    if waitDuration > trackDurationInSeconds + self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S * 3:
                        raise RuntimeError(
                            f'Track finished too late (waited: {waitDuration}s, expected: {trackDurationInSeconds})')

                    LOGGER.info(f'Track finished. Waited {waitDuration}s, expected {trackDurationInSeconds}s, OK')
                    break

        @staticmethod
        def __convert_seconds_to_duration(seconds: int) -> str:
            """Format a number of seconds as ``HH:MM:SS`` (hours wrap around after 24)."""
            minutes, secs = divmod(seconds, 60)
            hours, minutes = divmod(minutes, 60)
            return f'{str(hours % 24).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'

        def __get_device_id_by_name(self, deviceName: str) -> str:
            """Return the ID of the Spotify device called ``deviceName``.

            :raises RuntimeError: if no device with this name is reported by the Web API
            """
            devices = self._spotify.devices()['devices']
            for device in devices:
                if device['name'] == deviceName:
                    return device['id']

            raise RuntimeError('Device not found')

        def __play_track(self, deviceId: str, trackUri: str) -> None:
            """Start playback of exactly one track on the given device."""
            self._spotify.start_playback(device_id=deviceId, uris=[trackUri])

        def __stop_playback_if_playing(self, deviceId: str) -> None:
            """Pause playback on the given device if something is currently playing."""
            currentPlayback = self._spotify.current_playback()
            # None means there is no playback state at all, so there is nothing to pause.
            if currentPlayback and currentPlayback['is_playing']:
                self._spotify.pause_playback(device_id=deviceId)

        def __wait_for_track_playing(self, expectedTrackId: str) -> float:
            """Wait until the expected track is reported as playing.

            :param expectedTrackId: ID of the track that was started
            :return: seconds waited until the track was reported as playing
            :raises RuntimeError: if a different track is playing or the track did not start
                within ``_MAX_WAIT_TIME_TRACK_STARTING_IN_S`` seconds
            """
            LOGGER.debug(f'\t\tWait for track to start playing...')

            startTime = time.time()

            while time.time() - startTime < self._MAX_WAIT_TIME_TRACK_STARTING_IN_S:
                currentPlayback = self._spotify.current_playback()
                if currentPlayback and currentPlayback['is_playing']:
                    duration = time.time() - startTime

                    if currentPlayback['item']['id'] != expectedTrackId:
                        raise RuntimeError(f'Wrong track started playing (actual: {currentPlayback["item"]["id"]}, expected: {expectedTrackId})')

                    LOGGER.debug(f'\t\tTrack started playing after {duration:.1f}s')
                    return duration

                # Not playing yet: pause briefly before asking the Web API again.
                time.sleep(self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S)

            raise RuntimeError(
                f'Track did not start playing within {self._MAX_WAIT_TIME_TRACK_STARTING_IN_S}s')
    
    
    if __name__ == '__main__':
        # Load the recorder configuration from the JSON settings file.
        with open('config/settings-recorder.json', 'r', encoding='utf-8') as f:
            SETTINGS = json.load(f)

        spotifyBackup = SpotifyRecorder(SETTINGS['spotifyAPI']['clientID'],
                                        SETTINGS['spotifyAPI']['clientSecret'],
                                        SETTINGS['redirectUrl'],
                                        SETTINGS['openBrowser'],
                                        SETTINGS['cacheFilePath'],
                                        SETTINGS['playlist'],
                                        SETTINGS['spotifyDeviceName'],
                                        SETTINGS['audioDeviceName'],
                                        SETTINGS['destinationFolder'],
                                        SETTINGS['startNumber'],
                                        # Falls back to the constructor default when the key is absent;
                                        # -1 makes the recorder log "to all" and record to the playlist end.
                                        SETTINGS.get('limit', -1))

        spotifyBackup.run()

        LOGGER.info('### DONE ###')