import json
import os.path
import time
from typing import List, Dict

import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy import SpotifyOAuth, CacheFileHandler

from SpotifyAudioRecorder import SpotifyAudioRecorder

LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyRecorder', logFormat=LOG_FORMAT)


class SpotifyRecorder:
    """Play the tracks of a Spotify playlist one by one and record each of them.

    For every non-local track the playback is started on the configured
    Spotify device while a ``SpotifyAudioRecorder`` is active for the
    configured audio device. The recorder is given a ``.wav`` file path
    inside the destination folder.
    """

    # Upper bound for polling until the requested track is reported as playing.
    _MAX_WAIT_TIME_TRACK_STARTING_IN_S = 10
    # Polling interval close to the end of a track; also used as tolerance
    # when checking whether a track finished too early / too late.
    _WAIT_TIME_TRACK_PLAYING_SHORT_IN_S = 1
    # While more than this is remaining, sleep for half of the remaining time.
    _THRESHOLD_TRACK_END_IN_MS = 5000

    def __init__(self, clientID: str,
                 clientSecret: str,
                 redirectUrl: str,
                 openBrowser: bool,
                 cacheFilePath: str,
                 playlist: Dict[str, str],
                 spotifyDeviceName: str,
                 audioDeviceName: str,
                 destinationFolder: str,
                 startNumber: int = 0,
                 limit: int = -1):
        """Store the settings, create the destination folder and log in.

        :param playlist: dict providing the keys ``name``, ``user`` and ``id``.
        :param startNumber: number of the first track to record; values below 1
            start at the first track of the playlist.
        :param limit: maximum number of tracks to record, ``-1`` for no limit.
        """
        self._clientID = clientID
        self._clientSecret = clientSecret
        self._redirectUrl = redirectUrl
        self._openBrowser = openBrowser
        self._cacheFilePath = cacheFilePath
        self._playlist = playlist
        self._spotifyDeviceName = spotifyDeviceName
        self._audioDeviceName = audioDeviceName
        self._destinationFolder = destinationFolder
        self._startNumber = startNumber
        self._limit = limit

        os.makedirs(self._destinationFolder, exist_ok=True)

        self._spotify = self.login()

    def login(self) -> spotipy.Spotify:
        """Create a Spotify client using OAuth with playback read/modify scopes."""
        client_credentials_manager = SpotifyOAuth(client_id=self._clientID,
                                                  client_secret=self._clientSecret,
                                                  redirect_uri=self._redirectUrl,
                                                  scope='user-read-playback-state,user-modify-playback-state',
                                                  open_browser=self._openBrowser,
                                                  cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))
        return spotipy.Spotify(client_credentials_manager=client_credentials_manager)

    def run(self):
        """Fetch the configured playlist and record its tracks."""
        LOGGER.info(f'Fetching all tracks for playlist {self._playlist["name"]}...')
        playlist = self.__get_playlist(self._playlist['user'], self._playlist['id'])
        tracks = self.__get_tracks(playlist)
        self.__record_tracks(tracks)

    def __get_playlist(self, username: str, playlistID: str) -> Dict:
        """Fetch the playlist object for the given user and playlist ID."""
        LOGGER.info(f'Fetching playlist with ID: {playlistID} by {username}...')
        identifier = f'spotify:user:{username}:playlist:{playlistID}'
        playlist = self._spotify.playlist(identifier)
        LOGGER.info(f'Found playlist "{playlist["name"]}"')
        return playlist

    def __get_tracks(self, playlist) -> List[Dict]:
        """Collect all playlist items by following the paging ``next`` links."""
        tracks = playlist['tracks']
        results = tracks['items']
        while tracks['next']:
            tracks = self._spotify.next(tracks)
            results.extend(tracks['items'])

        # Count the collected items; `tracks` is only the last paging object.
        LOGGER.info(f'Found {len(results)} tracks in playlist')
        return results

    def __extract_track_uris(self, tracks: List) -> List[str]:
        """Return the Spotify URI of every playlist item."""
        return [track['track']['uri'] for track in tracks]

    def __record_tracks(self, tracks: list):
        """Record the selected range of tracks and log a summary.

        Local tracks are skipped. Any exception raised while recording a
        single track is logged and the track is counted as an error, so the
        remaining tracks are still processed.

        :raises RuntimeError: if the configured Spotify device is not found.
        """
        deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)

        recordedTrackNumbers = []
        skippedTrackNumbers = []
        errorTrackNumbers = []

        # Clamp at 0: with the default start number 0 an unclamped "- 1" would
        # produce a negative slice index and select from the END of the list.
        startIndex = max(self._startNumber - 1, 0)
        if self._limit == -1:
            LOGGER.info(f'Recording track #{self._startNumber} to all')
            tracks = tracks[startIndex:]
        else:
            LOGGER.info(f'Recording track #{self._startNumber} to (including) #{self._startNumber + self._limit - 1}')
            tracks = tracks[startIndex:startIndex + self._limit]

        for index, track in enumerate(tracks):
            indexInPlaylist = self._startNumber + index

            if track['is_local']:
                # It's not possible to add a local track to a playlist using the web API.
                # https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
                LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
                skippedTrackNumbers.append(indexInPlaylist)
                continue

            LOGGER.info(
                f'>>> Recording track {index + 1}/{len(tracks)}: #{indexInPlaylist} "{track["track"]["name"]}"...')

            try:
                self.__stop_playback_if_playing(deviceId)

                # NOTE(review): file names are numbered with indexInPlaylist + 1 while the
                # log messages use indexInPlaylist - confirm which numbering is intended.
                filePath = self.__determine_file_path(indexInPlaylist + 1, track)
                recorder = SpotifyAudioRecorder(self._audioDeviceName, filePath)
                with recorder.record():
                    self.__play_track(deviceId, track['track']['uri'])
                    timeWaitedForPlaying = self.__wait_for_track_playing(track['track']['id'])
                    self.__wait_for_track_end(track, timeWaitedForPlaying)

                recordedTrackNumbers.append(indexInPlaylist)
            except Exception as e:
                # Deliberately broad: one failing track must not abort the whole run.
                LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
                errorTrackNumbers.append(indexInPlaylist)

        LOGGER.info('### DONE ###')

        LOGGER.info('>>> Skipped <<<')
        for number in skippedTrackNumbers:
            LOGGER.info(f'Skipped #{number}')

        LOGGER.info('>>> Errors <<<')
        for number in errorTrackNumbers:
            LOGGER.info(f'Error #{number}')

        LOGGER.info(
            f'### {len(tracks)} tracks, {len(recordedTrackNumbers)} recorded, '
            f'{len(skippedTrackNumbers)} skipped, {len(errorTrackNumbers)} errors ###')

    def __determine_file_path(self, index: int, track) -> str:
        """Build ``<destination>/<index> - <artists> - <track name>.wav``.

        Path separators inside artist or track names are replaced by ``_``,
        otherwise the name would be interpreted as a sub directory path.
        """
        artists = track['track']['artists']
        artists = ' & '.join(artist['name'] for artist in artists)

        fileName = f'{index} - {artists} - {track["track"]["name"]}.wav'
        for separator in (os.sep, os.altsep):
            # os.altsep is None on platforms with only one separator
            if separator:
                fileName = fileName.replace(separator, '_')

        return os.path.join(self._destinationFolder, fileName)

    def __wait_for_track_end(self, track, timeWaitedForPlaying: float) -> None:
        """Block until the playback stops or the track duration has elapsed.

        :param timeWaitedForPlaying: seconds already spent waiting for the
            track to start; added to the measured wait time.
        :raises RuntimeError: if the playback stopped noticeably earlier or
            later than the track duration.
        """
        trackDurationInMs = track['track']['duration_ms']
        trackDurationInSeconds = trackDurationInMs // 1000
        LOGGER.info(f'\tTrack duration: {self.__convert_seconds_to_duration(trackDurationInSeconds)}')

        startTime = time.time()

        while time.time() - startTime < trackDurationInSeconds:
            # current_playback() may return None when nothing is playing;
            # this is handled like a stopped playback.
            currentPlayback = self._spotify.current_playback()
            if currentPlayback and currentPlayback['is_playing']:
                remainingTimeInMs = trackDurationInMs - currentPlayback['progress_ms']
                remainingTimeInSeconds = remainingTimeInMs // 1000

                # Poll rarely while the end is far away, every second near the end.
                sleepTime = self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S
                if remainingTimeInMs > self._THRESHOLD_TRACK_END_IN_MS:
                    sleepTime = remainingTimeInSeconds / 2

                LOGGER.debug(f'\t\tWaiting for track to end (remaining: '
                             f'{self.__convert_seconds_to_duration(remainingTimeInSeconds)}, '
                             f'sleep: {self.__convert_seconds_to_duration(int(sleepTime))}s)...')
                time.sleep(sleepTime)
            else:
                waitDuration = int(time.time() - startTime)
                waitDuration += timeWaitedForPlaying

                if waitDuration < trackDurationInSeconds - self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S:
                    raise RuntimeError(
                        f'Track finished too early (waited: {waitDuration}s, expected: {trackDurationInSeconds}s)')

                if waitDuration > trackDurationInSeconds + self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S * 3:
                    raise RuntimeError(
                        f'Track finished too late (waited: {waitDuration}s, expected: {trackDurationInSeconds})')

                LOGGER.info(f'Track finished. Waited {waitDuration}s, expected {trackDurationInSeconds}s, OK')
                break

    @staticmethod
    def __convert_seconds_to_duration(seconds: int):
        """Format a number of seconds as ``HH:MM:SS`` (hours wrap at 24)."""
        secs = seconds % 60
        minutes = int(seconds / 60) % 60
        hours = int(seconds / (60 * 60)) % 24
        return f'{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'

    def __get_device_id_by_name(self, deviceName: str) -> str:
        """Return the ID of the Spotify device with the given name.

        :raises RuntimeError: if no device with that name is listed.
        """
        devices = self._spotify.devices()['devices']
        for device in devices:
            if device['name'] == deviceName:
                return device['id']

        raise RuntimeError('Device not found')

    def __play_track(self, deviceId: str, trackUri: str) -> None:
        """Start the playback of a single track on the given device."""
        self._spotify.start_playback(device_id=deviceId, uris=[trackUri])

    def __stop_playback_if_playing(self, deviceId: str) -> None:
        """Pause the playback on the given device if something is playing."""
        # current_playback() may return None when there is no active playback
        currentPlayback = self._spotify.current_playback()
        if currentPlayback and currentPlayback['is_playing']:
            self._spotify.pause_playback(device_id=deviceId)

    def __wait_for_track_playing(self, expectedTrackId: str) -> float:
        """Poll until the expected track is reported as playing.

        :return: seconds waited until the track was playing; ``0`` if the
            track did not start within the maximum wait time.
        :raises RuntimeError: if a different track is playing.
        """
        LOGGER.debug('\t\tWait for track to start playing...')

        startTime = time.time()
        duration = 0
        while time.time() - startTime < self._MAX_WAIT_TIME_TRACK_STARTING_IN_S:
            # current_playback() may return None until the playback has started
            currentPlayback = self._spotify.current_playback()
            if currentPlayback and currentPlayback['is_playing']:
                duration = time.time() - startTime
                if currentPlayback['item']['id'] == expectedTrackId:
                    LOGGER.debug(f'\t\tTrack started playing after {duration:.1f}s')
                    break
                else:
                    raise RuntimeError(f'Wrong track started playing (actual: {currentPlayback["item"]["id"]}, expected: {expectedTrackId})')

            time.sleep(1)

        return duration


if __name__ == '__main__':
    with open('config/settings-recorder.json', 'r', encoding='utf-8') as f:
        SETTINGS = json.load(f)

    spotifyBackup = SpotifyRecorder(SETTINGS['spotifyAPI']['clientID'],
                                    SETTINGS['spotifyAPI']['clientSecret'],
                                    SETTINGS['redirectUrl'],
                                    SETTINGS['openBrowser'],
                                    SETTINGS['cacheFilePath'],
                                    SETTINGS['playlist'],
                                    SETTINGS['spotifyDeviceName'],
                                    SETTINGS['audioDeviceName'],
                                    SETTINGS['destinationFolder'],
                                    SETTINGS['startNumber'],
                                    SETTINGS['limit'],
                                    )

    spotifyBackup.run()

    LOGGER.info('### DONE ###')