Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import json
import os.path
import time
from typing import List, Dict
import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy import SpotifyOAuth, CacheFileHandler
from SpotifyAudioRecorder import SpotifyAudioRecorder
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyRecorder', logFormat=LOG_FORMAT)
class SpotifyRecorder:
_MAX_WAIT_TIME_TRACK_STARTING_IN_S = 10
_WAIT_TIME_TRACK_PLAYING_SHORT_IN_S = 1
_THRESHOLD_TRACK_END_IN_MS = 5000
def __init__(self, clientID: str,
clientSecret: str,
redirectUrl: str,
openBrowser: bool,
cacheFilePath: str,
playlist: Dict[str, str],
spotifyDeviceName: str,
audioDeviceName: str,
destinationFolder: str):
self._clientID = clientID
self._clientSecret = clientSecret
self._redirectUrl = redirectUrl
self._openBrowser = openBrowser
self._cacheFilePath = cacheFilePath
self._playlist = playlist
self._spotifyDeviceName = spotifyDeviceName
self._audioDeviceName = audioDeviceName
self._destinationFolder = destinationFolder
os.makedirs(self._destinationFolder, exist_ok=True)
# TODO: options specify range (start / stop track) or only offset index + limit
self._spotify = self.login()
def login(self) -> spotipy.Spotify:
client_credentials_manager = SpotifyOAuth(client_id=self._clientID,
client_secret=self._clientSecret,
redirect_uri=self._redirectUrl,
scope='user-read-playback-state,user-modify-playback-state',
open_browser=self._openBrowser,
cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))
return spotipy.Spotify(client_credentials_manager=client_credentials_manager)
def run(self):
LOGGER.info(f'>>> Fetching all tracks for playlist {self._playlist["name"]}...')
allTracks = []
playlist = self.__get_playlist(self._playlist['user'], self._playlist['id'])
allTracks.extend(self.__get_tracks(playlist))
LOGGER.info(f'>>> Found {len(allTracks)} tracks')
self.__record_tracks(allTracks)
def __get_playlist(self, username: str, playlistID: str) -> Dict:
LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
identifier = f'spotify:user:{username}:playlist:{playlistID}'
playlist = self._spotify.playlist(identifier)
LOGGER.info(f'Found playlist "{playlist["name"]}"')
return playlist
def __get_tracks(self, playlist) -> List[Dict]:
tracks = playlist['tracks']
results = tracks['items']
while tracks['next']:
tracks = self._spotify.next(tracks)
results.extend(tracks['items'])
LOGGER.info(f'Fetched {len(results)} tracks')
return results
def __extract_track_uris(self, tracks: List) -> List[str]:
return [track['track']['uri'] for track in tracks]
def __record_tracks(self, tracks: list):
deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)
recordedTracks = []
skippedTracks = []
for index, track in enumerate(tracks[:2]):
if track['is_local']:
# TODO:
# It's not possible to add a local track to a playlist using the web API.
# https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
skippedTracks.append(track['track']['name'])
continue
LOGGER.info(f'Recording track {index + 1}/{len(tracks)}: "{track["track"]["name"]}"...')
try:
filePath = self.__determine_file_path(index + 1, track)
recorder = SpotifyAudioRecorder(self._audioDeviceName, filePath)
with recorder.record():
self.__play_track(deviceId, track['track']['uri'])
self.__wait_for_track_playing()
self.__wait_for_track_end(track)
recordedTracks.append(track['track']['name'])
except Exception as e:
LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
errorTracks.append(track['track']['name'])
LOGGER.info('### DONE ###')
LOGGER.info(f'{len(tracks)} tracks, {len(recordedTracks)} recorded, {len(skippedTracks)} skipped, {len(errorTracks)} errors')
def __determine_file_path(self, index: int, track) -> str:
artists = track['track']['artists']
artists = ' & '.join(artist['name'] for artist in artists)
fileName = f'{index} - {artists} - {track["track"]["name"]}.wav'
return os.path.join(self._destinationFolder, fileName)
def __wait_for_track_end(self, track):
trackDurationInMs = track['track']['duration_ms']
trackDurationInSeconds = trackDurationInMs // 1000
LOGGER.info(f'Track duration: {self.__convert_seconds_to_duration(trackDurationInSeconds)}')
startTime = time.time()
while time.time() - startTime < trackDurationInSeconds:
currentPlayback = self._spotify.current_playback()
if currentPlayback['is_playing']:
remainingTimeInMs = trackDurationInMs - currentPlayback['progress_ms']
remainingTimeInSeconds = remainingTimeInMs // 1000
sleepTime = self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S
if remainingTimeInMs > self._THRESHOLD_TRACK_END_IN_MS:
sleepTime = remainingTimeInSeconds / 2
LOGGER.debug(f'Waiting for track to end (remaining: '
f'{self.__convert_seconds_to_duration(remainingTimeInSeconds)}, '
f'sleep: {self.__convert_seconds_to_duration(sleepTime)}s)...')
time.sleep(sleepTime)
else:
waitDuration = int(time.time() - startTime)
if waitDuration < trackDurationInSeconds:
raise RuntimeError(
f'Track finished too early (waited: {waitDuration}s, expected: {trackDurationInSeconds}s)')
if waitDuration > trackDurationInSeconds + self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S * 3:
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
raise RuntimeError(
f'Track finished too late (waited: {waitDuration}s, expected: {trackDurationInSeconds})')
LOGGER.debug(f'Track finished. Waited {waitDuration}s, expected {trackDurationInSeconds}s, OK')
break
@staticmethod
def __convert_seconds_to_duration(seconds: int):
secs = seconds % 60
minutes = int(seconds / 60) % 60
hours = int(seconds / (60 * 60)) % 24
return f'{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'
def __get_device_id_by_name(self, deviceName: str) -> str:
devices = self._spotify.devices()['devices']
for device in devices:
if device['name'] == deviceName:
return device['id']
raise RuntimeError('Device not found')
def __play_track(self, deviceId: str, trackUri: str):
self._spotify.start_playback(device_id=deviceId, uris=[trackUri])
def __wait_for_track_playing(self) -> None:
LOGGER.debug(f'Wait for track to start playing...')
startTime = time.time()
while time.time() - startTime < self._MAX_WAIT_TIME_TRACK_STARTING_IN_S:
if self._spotify.current_playback()['is_playing']:
LOGGER.debug(f'Track started playing after {time.time() - startTime:.1f}s')
break
time.sleep(1)
if __name__ == '__main__':
with open('config/settings-recorder.json', 'r', encoding='utf-8') as f:
SETTINGS = json.load(f)
spotifyBackup = SpotifyRecorder(SETTINGS['spotifyAPI']['clientID'],
SETTINGS['spotifyAPI']['clientSecret'],
SETTINGS['redirectUrl'],
SETTINGS['openBrowser'],
SETTINGS['cacheFilePath'],
SETTINGS['playlist'],
SETTINGS['spotifyDeviceName'],
SETTINGS['audioDeviceName'],
SETTINGS['destinationFolder'])
spotifyBackup.run()
LOGGER.info('### DONE ###')