Newer
Older
import json
import os.path
import time
from typing import List, Dict
import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy import SpotifyOAuth, CacheFileHandler
from SpotifyAudioRecorder import SpotifyAudioRecorder
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyRecorder', logFormat=LOG_FORMAT)
class SpotifyRecorder:
_MAX_WAIT_TIME_TRACK_STARTING_IN_S = 10
_WAIT_TIME_TRACK_PLAYING_SHORT_IN_S = 1
_THRESHOLD_TRACK_END_IN_MS = 5000
def __init__(self, clientID: str,
clientSecret: str,
redirectUrl: str,
openBrowser: bool,
cacheFilePath: str,
playlist: Dict[str, str],
spotifyDeviceName: str,
audioDeviceName: str,
destinationFolder: str,
startNumber: int = 0,
limit: int = -1):
self._clientID = clientID
self._clientSecret = clientSecret
self._redirectUrl = redirectUrl
self._openBrowser = openBrowser
self._cacheFilePath = cacheFilePath
self._playlist = playlist
self._spotifyDeviceName = spotifyDeviceName
self._audioDeviceName = audioDeviceName
self._destinationFolder = destinationFolder
self._startNumber = startNumber
self._limit = limit
os.makedirs(self._destinationFolder, exist_ok=True)
self._spotify = self.login()
def login(self) -> spotipy.Spotify:
client_credentials_manager = SpotifyOAuth(client_id=self._clientID,
client_secret=self._clientSecret,
redirect_uri=self._redirectUrl,
scope='user-read-playback-state,user-modify-playback-state',
open_browser=self._openBrowser,
cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))
return spotipy.Spotify(client_credentials_manager=client_credentials_manager)
def run(self):
LOGGER.info(f'Fetching all tracks for playlist {self._playlist["name"]}...')
playlist = self.__get_playlist(self._playlist['user'], self._playlist['id'])
tracks = self.__get_tracks(playlist)
self.__record_tracks(tracks)
def __get_playlist(self, username: str, playlistID: str) -> Dict:
LOGGER.info(f'Fetching playlist with ID: {playlistID} by {username}...')
identifier = f'spotify:user:{username}:playlist:{playlistID}'
playlist = self._spotify.playlist(identifier)
LOGGER.info(f'Found playlist "{playlist["name"]}"')
return playlist
def __get_tracks(self, playlist) -> List[Dict]:
tracks = playlist['tracks']
results = tracks['items']
while tracks['next']:
tracks = self._spotify.next(tracks)
results.extend(tracks['items'])
LOGGER.info(f'Found {len(tracks)} tracks in playlist')
return results
def __extract_track_uris(self, tracks: List) -> List[str]:
return [track['track']['uri'] for track in tracks]
def __record_tracks(self, tracks: list):
deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)
recordedTrackNumbers = []
skippedTrackNumbers = []
errorTrackNumbers = []
if self._limit == -1:
LOGGER.info(f'Recording track #{self._startNumber} to all')
tracks = tracks[self._startNumber - 1:]
else:
LOGGER.info(f'Recording track #{self._startNumber} to (including) #{self._startNumber + self._limit - 1}')
tracks = tracks[self._startNumber - 1:self._startNumber - 1 + self._limit]
for index, track in enumerate(tracks):
indexInPlaylist = self._startNumber + index
if track['is_local']:
# It's not possible to add a local track to a playlist using the web API.
# https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
skippedTrackNumbers.append(indexInPlaylist)
LOGGER.info(
f'>>> Recording track {index + 1}/{len(tracks)}: #{indexInPlaylist} "{track["track"]["name"]}"...')

Robert Goldmann
committed
self.__stop_playback_if_playing(deviceId)
filePath = self.__determine_file_path(indexInPlaylist + 1, track)
recorder = SpotifyAudioRecorder(self._audioDeviceName, filePath)
with recorder.record():
self.__play_track(deviceId, track['track']['uri'])
timeWaitedForPlaying = self.__wait_for_track_playing(track['track']['id'])
self.__wait_for_track_end(track, timeWaitedForPlaying)
recordedTrackNumbers.append(indexInPlaylist)
except Exception as e:
LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
errorTrackNumbers.append(indexInPlaylist)
LOGGER.info('### DONE ###')
LOGGER.info('>>> Skipped <<<')
for number in skippedTrackNumbers:
LOGGER.info(f'Skipped #{number}')
LOGGER.info('>>> Errors <<<')
for number in errorTrackNumbers:
LOGGER.info(f'Error #{number}')
LOGGER.info(
f'### {len(tracks)} tracks, {len(recordedTrackNumbers)} recorded, {len(skippedTrackNumbers)} skipped, {len(errorTrackNumbers)} errors ###')
def __determine_file_path(self, index: int, track) -> str:
artists = track['track']['artists']
artists = ' & '.join(artist['name'] for artist in artists)
fileName = f'{index} - {artists} - {track["track"]["name"]}.wav'
return os.path.join(self._destinationFolder, fileName)
def __wait_for_track_end(self, track, timeWaitedForPlaying: float) -> None:
trackDurationInMs = track['track']['duration_ms']
trackDurationInSeconds = trackDurationInMs // 1000
LOGGER.info(f'\tTrack duration: {self.__convert_seconds_to_duration(trackDurationInSeconds)}')
startTime = time.time()
while time.time() - startTime < trackDurationInSeconds:
currentPlayback = self._spotify.current_playback()
if currentPlayback['is_playing']:
remainingTimeInMs = trackDurationInMs - currentPlayback['progress_ms']
remainingTimeInSeconds = remainingTimeInMs // 1000
sleepTime = self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S
if remainingTimeInMs > self._THRESHOLD_TRACK_END_IN_MS:
sleepTime = remainingTimeInSeconds / 2
LOGGER.debug(f'\t\tWaiting for track to end (remaining: '
f'{self.__convert_seconds_to_duration(remainingTimeInSeconds)}, '
f'sleep: {self.__convert_seconds_to_duration(int(sleepTime))}s)...')
time.sleep(sleepTime)
else:
waitDuration = int(time.time() - startTime)
waitDuration += timeWaitedForPlaying
if waitDuration < trackDurationInSeconds - self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S:
raise RuntimeError(
f'Track finished too early (waited: {waitDuration}s, expected: {trackDurationInSeconds}s)')
if waitDuration > trackDurationInSeconds + self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S * 3:
raise RuntimeError(
f'Track finished too late (waited: {waitDuration}s, expected: {trackDurationInSeconds})')
LOGGER.info(f'Track finished. Waited {waitDuration}s, expected {trackDurationInSeconds}s, OK')
break
@staticmethod
def __convert_seconds_to_duration(seconds: int):
secs = seconds % 60
minutes = int(seconds / 60) % 60
hours = int(seconds / (60 * 60)) % 24
return f'{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'
def __get_device_id_by_name(self, deviceName: str) -> str:
devices = self._spotify.devices()['devices']
for device in devices:
if device['name'] == deviceName:
return device['id']
raise RuntimeError('Device not found')
def __play_track(self, deviceId: str, trackUri: str) -> None:
self._spotify.start_playback(device_id=deviceId, uris=[trackUri])

Robert Goldmann
committed
def __stop_playback_if_playing(self, deviceId: str) -> None:
if self._spotify.current_playback()['is_playing']:
self._spotify.pause_playback(device_id=deviceId)
def __wait_for_track_playing(self, expectedTrackId: str) -> float:
LOGGER.debug(f'\t\tWait for track to start playing...')
startTime = time.time()
while time.time() - startTime < self._MAX_WAIT_TIME_TRACK_STARTING_IN_S:
currentPlayback = self._spotify.current_playback()
if currentPlayback['is_playing']:
duration = time.time() - startTime
if currentPlayback['item']['id'] == expectedTrackId:
LOGGER.debug(f'\t\tTrack started playing after {duration:.1f}s')
break
else:
raise RuntimeError(f'Wrong track started playing (actual: {currentPlayback["item"]["id"]}, expected: {expectedTrackId})')
time.sleep(1)
if __name__ == '__main__':
with open('config/settings-recorder.json', 'r', encoding='utf-8') as f:
SETTINGS = json.load(f)
spotifyBackup = SpotifyRecorder(SETTINGS['spotifyAPI']['clientID'],
SETTINGS['spotifyAPI']['clientSecret'],
SETTINGS['redirectUrl'],
SETTINGS['openBrowser'],
SETTINGS['cacheFilePath'],
SETTINGS['playlist'],
SETTINGS['spotifyDeviceName'],
SETTINGS['audioDeviceName'],
SETTINGS['destinationFolder'],
SETTINGS['startNumber'],
SETTINGS['limit'], )
spotifyBackup.run()
LOGGER.info('### DONE ###')