Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import json
import os.path
import time
from typing import List, Dict
import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy import SpotifyOAuth, CacheFileHandler
from SpotifyAudioRecorder import SpotifyAudioRecorder
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyRecorder', logFormat=LOG_FORMAT)
class SpotifyRecorder:
_MAX_WAIT_TIME_TRACK_STARTING_IN_S = 10
_WAIT_TIME_TRACK_PLAYING_SHORT_IN_S = 1
_THRESHOLD_TRACK_END_IN_MS = 5000
def __init__(self, clientID: str,
clientSecret: str,
redirectUrl: str,
openBrowser: bool,
cacheFilePath: str,
playlist: Dict[str, str],
spotifyDeviceName: str,
audioDeviceName: str,
destinationFolder: str):
self._clientID = clientID
self._clientSecret = clientSecret
self._redirectUrl = redirectUrl
self._openBrowser = openBrowser
self._cacheFilePath = cacheFilePath
self._playlist = playlist
self._spotifyDeviceName = spotifyDeviceName
self._audioDeviceName = audioDeviceName
self._destinationFolder = destinationFolder
os.makedirs(self._destinationFolder, exist_ok=True)
# TODO: options specify range (start / stop track) or only offset index + limit
self._spotify = self.login()
def login(self) -> spotipy.Spotify:
client_credentials_manager = SpotifyOAuth(client_id=self._clientID,
client_secret=self._clientSecret,
redirect_uri=self._redirectUrl,
scope='user-read-playback-state,user-modify-playback-state',
open_browser=self._openBrowser,
cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))
return spotipy.Spotify(client_credentials_manager=client_credentials_manager)
def run(self):
LOGGER.info(f'>>> Fetching all tracks for playlist {self._playlist["name"]}...')
allTracks = []
playlist = self.__get_playlist(self._playlist['user'], self._playlist['id'])
allTracks.extend(self.__get_tracks(playlist))
LOGGER.info(f'>>> Found {len(allTracks)} tracks')
self.__record_tracks(allTracks)
def __get_playlist(self, username: str, playlistID: str) -> Dict:
LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
identifier = f'spotify:user:{username}:playlist:{playlistID}'
playlist = self._spotify.playlist(identifier)
LOGGER.info(f'Found playlist "{playlist["name"]}"')
return playlist
def __get_tracks(self, playlist) -> List[Dict]:
tracks = playlist['tracks']
results = tracks['items']
while tracks['next']:
tracks = self._spotify.next(tracks)
results.extend(tracks['items'])
LOGGER.info(f'Fetched {len(results)} tracks')
return results
def __extract_track_uris(self, tracks: List) -> List[str]:
return [track['track']['uri'] for track in tracks]
def __record_tracks(self, tracks: list):
deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)
recordedTracks = []
skippedTracks = []
for index, track in enumerate(tracks[:2]):
if track['is_local']:
# TODO:
# It's not possible to add a local track to a playlist using the web API.
# https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
skippedTracks.append(track['track']['name'])
continue
LOGGER.info(f'Recording track {index + 1}/{len(tracks)}: "{track["track"]["name"]}"...')
try:
filePath = self.__determine_file_path(index + 1, track)
recorder = SpotifyAudioRecorder(self._audioDeviceName, filePath)
with recorder.record():
self.__play_track(deviceId, track['track']['uri'])
timeWaitedForPlaying = self.__wait_for_track_playing()
self.__wait_for_track_end(track, timeWaitedForPlaying)
recordedTracks.append(track['track']['name'])
except Exception as e:
LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
errorTracks.append(track['track']['name'])
LOGGER.info('### DONE ###')
LOGGER.info(f'{len(tracks)} tracks, {len(recordedTracks)} recorded, {len(skippedTracks)} skipped, {len(errorTracks)} errors')
def __determine_file_path(self, index: int, track) -> str:
artists = track['track']['artists']
artists = ' & '.join(artist['name'] for artist in artists)
fileName = f'{index} - {artists} - {track["track"]["name"]}.wav'
return os.path.join(self._destinationFolder, fileName)
def __wait_for_track_end(self, track, timeWaitedForPlaying: float) -> None:
trackDurationInMs = track['track']['duration_ms']
trackDurationInSeconds = trackDurationInMs // 1000
LOGGER.info(f'Track duration: {self.__convert_seconds_to_duration(trackDurationInSeconds)}')
startTime = time.time()
while time.time() - startTime < trackDurationInSeconds:
currentPlayback = self._spotify.current_playback()
if currentPlayback['is_playing']:
remainingTimeInMs = trackDurationInMs - currentPlayback['progress_ms']
remainingTimeInSeconds = remainingTimeInMs // 1000
sleepTime = self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S
if remainingTimeInMs > self._THRESHOLD_TRACK_END_IN_MS:
sleepTime = remainingTimeInSeconds / 2
LOGGER.debug(f'Waiting for track to end (remaining: '
f'{self.__convert_seconds_to_duration(remainingTimeInSeconds)}, '
f'sleep: {self.__convert_seconds_to_duration(int(sleepTime))}s)...')
time.sleep(sleepTime)
else:
waitDuration = int(time.time() - startTime)
waitDuration += timeWaitedForPlaying
if waitDuration < trackDurationInSeconds - self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S:
raise RuntimeError(
f'Track finished too early (waited: {waitDuration}s, expected: {trackDurationInSeconds}s)')
if waitDuration > trackDurationInSeconds + self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S * 3:
raise RuntimeError(
f'Track finished too late (waited: {waitDuration}s, expected: {trackDurationInSeconds})')
LOGGER.info(f'Track finished. Waited {waitDuration}s, expected {trackDurationInSeconds}s, OK')
break
@staticmethod
def __convert_seconds_to_duration(seconds: int):
secs = seconds % 60
minutes = int(seconds / 60) % 60
hours = int(seconds / (60 * 60)) % 24
return f'{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'
def __get_device_id_by_name(self, deviceName: str) -> str:
devices = self._spotify.devices()['devices']
for device in devices:
if device['name'] == deviceName:
return device['id']
raise RuntimeError('Device not found')
def __play_track(self, deviceId: str, trackUri: str):
self._spotify.start_playback(device_id=deviceId, uris=[trackUri])
def __wait_for_track_playing(self) -> float:
LOGGER.debug(f'Wait for track to start playing...')
startTime = time.time()
while time.time() - startTime < self._MAX_WAIT_TIME_TRACK_STARTING_IN_S:
if self._spotify.current_playback()['is_playing']:
duration = time.time() - startTime
LOGGER.debug(f'Track started playing after {duration:.1f}s')
break
time.sleep(1)
if __name__ == '__main__':
with open('config/settings-recorder.json', 'r', encoding='utf-8') as f:
SETTINGS = json.load(f)
spotifyBackup = SpotifyRecorder(SETTINGS['spotifyAPI']['clientID'],
SETTINGS['spotifyAPI']['clientSecret'],
SETTINGS['redirectUrl'],
SETTINGS['openBrowser'],
SETTINGS['cacheFilePath'],
SETTINGS['playlist'],
SETTINGS['spotifyDeviceName'],
SETTINGS['audioDeviceName'],
SETTINGS['destinationFolder'])
spotifyBackup.run()
LOGGER.info('### DONE ###')