Skip to content
Snippets Groups Projects
Commit e20bb5df authored by Robert Goldmann's avatar Robert Goldmann
Browse files

SpotifyRecorder: record multiple playlists

parent 98bea529
No related branches found
No related tags found
No related merge requests found
import json import json
import os.path import os.path
import time import time
from typing import List, Dict
import click import click
import spotipy import spotipy
...@@ -25,28 +24,20 @@ class SpotifyRecorder: ...@@ -25,28 +24,20 @@ class SpotifyRecorder:
redirectUrl: str, redirectUrl: str,
openBrowser: bool, openBrowser: bool,
cacheFilePath: str, cacheFilePath: str,
playlist: Dict[str, str], playlists: list[dict[str, str | int]],
spotifyDeviceName: str, spotifyDeviceName: str,
audioDeviceName: str, audioDeviceName: str,
destinationFolder: str, destinationFolder: str):
startNumber: int = 0,
limit: int = -1):
self._clientID = clientID self._clientID = clientID
self._clientSecret = clientSecret self._clientSecret = clientSecret
self._redirectUrl = redirectUrl self._redirectUrl = redirectUrl
self._openBrowser = openBrowser self._openBrowser = openBrowser
self._cacheFilePath = cacheFilePath self._cacheFilePath = cacheFilePath
self._playlist = playlist self._playlists = playlists
self._spotifyDeviceName = spotifyDeviceName self._spotifyDeviceName = spotifyDeviceName
self._audioDeviceName = audioDeviceName self._audioDeviceName = audioDeviceName
self._destinationFolder = destinationFolder self._destinationFolder = destinationFolder
if startNumber <= 0:
raise ValueError(f'startNumber must be greater than 0')
self._startNumber = startNumber
self._limit = limit
os.makedirs(self._destinationFolder, exist_ok=True) os.makedirs(self._destinationFolder, exist_ok=True)
self._spotify = self.login() self._spotify = self.login()
...@@ -61,33 +52,45 @@ class SpotifyRecorder: ...@@ -61,33 +52,45 @@ class SpotifyRecorder:
return spotipy.Spotify(client_credentials_manager=client_credentials_manager) return spotipy.Spotify(client_credentials_manager=client_credentials_manager)
def run(self): def run(self):
LOGGER.info(f'Fetching all tracks for playlist {self._playlist["name"]}...') for playlist in self._playlists:
playlist = self.__get_playlist(self._playlist['user'], self._playlist['id']) self.__record_playlist(playlist)
def __record_playlist(self, playlistSettings: dict[str, str | int]):
LOGGER.info(f'Fetching all tracks for playlist "{playlistSettings["name"]}"...')
startNumber = playlistSettings['startNumber']
limit = playlistSettings['limit']
if startNumber <= 0:
raise ValueError(f'startNumber must be greater than 0')
playlist = self.__get_playlist(playlistSettings['user'], playlistSettings['id'])
tracks = self.__get_tracks(playlist) tracks = self.__get_tracks(playlist)
if self._limit == -1: if limit == -1:
LOGGER.info(f'Recording track #{self._startNumber} to end of playlist') LOGGER.info(f'Recording track #{startNumber} to end of playlist')
tracks = tracks[self._startNumber - 1:] tracks = tracks[startNumber - 1:]
else: else:
LOGGER.info(f'Recording track #{self._startNumber} to (including) #{self._startNumber + self._limit - 1}') LOGGER.info(f'Recording track #{startNumber} to (including) #{startNumber + limit - 1}')
tracks = tracks[self._startNumber - 1:self._startNumber - 1 + self._limit] tracks = tracks[startNumber - 1:startNumber - 1 + limit]
totalDurationInSeconds = sum([track['track']['duration_ms'] // 1000 for track in tracks if not track['is_local']]) totalDurationInSeconds = sum(
[track['track']['duration_ms'] // 1000 for track in tracks if not track['is_local']])
LOGGER.info(f'Total duration: {self.__convert_seconds_to_duration(totalDurationInSeconds)}') LOGGER.info(f'Total duration: {self.__convert_seconds_to_duration(totalDurationInSeconds)}')
if click.confirm('Do you want to start recording?', default=True): if click.confirm('Do you want to start recording?', default=True):
self.__record_tracks(tracks) destinationFolder = os.path.join(self._destinationFolder, playlistSettings['name'])
os.makedirs(destinationFolder, exist_ok=True)
self.__record_tracks(tracks, startNumber, destinationFolder)
else: else:
LOGGER.warning('Aborted') LOGGER.warning('Aborted')
def __get_playlist(self, username: str, playlistID: str) -> Dict: def __get_playlist(self, username: str, playlistID: str) -> dict:
LOGGER.info(f'Fetching playlist with ID: {playlistID} by {username}...') LOGGER.info(f'Fetching playlist with ID: {playlistID} by {username}...')
identifier = f'spotify:user:{username}:playlist:{playlistID}' identifier = f'spotify:user:{username}:playlist:{playlistID}'
playlist = self._spotify.playlist(identifier) playlist = self._spotify.playlist(identifier)
LOGGER.info(f'Found playlist "{playlist["name"]}"') LOGGER.info(f'Found playlist "{playlist["name"]}"')
return playlist return playlist
def __get_tracks(self, playlist) -> List[Dict]: def __get_tracks(self, playlist) -> list[dict]:
tracks = playlist['tracks'] tracks = playlist['tracks']
results = tracks['items'] results = tracks['items']
...@@ -98,7 +101,7 @@ class SpotifyRecorder: ...@@ -98,7 +101,7 @@ class SpotifyRecorder:
LOGGER.info(f'Found {len(results)} tracks in playlist') LOGGER.info(f'Found {len(results)} tracks in playlist')
return results return results
def __record_tracks(self, tracks: list): def __record_tracks(self, tracks: list, startNumber: int, destinationFolder: str):
deviceId = self.__get_device_id_by_name(self._spotifyDeviceName) deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)
recordedTrackNumbers = [] recordedTrackNumbers = []
...@@ -107,7 +110,7 @@ class SpotifyRecorder: ...@@ -107,7 +110,7 @@ class SpotifyRecorder:
errorTrackNumbers = [] errorTrackNumbers = []
for index, track in enumerate(tracks): for index, track in enumerate(tracks):
indexInPlaylist = self._startNumber + index indexInPlaylist = startNumber + index
if track['is_local']: if track['is_local']:
# It's not possible to add a local track to a playlist using the web API. # It's not possible to add a local track to a playlist using the web API.
...@@ -119,7 +122,7 @@ class SpotifyRecorder: ...@@ -119,7 +122,7 @@ class SpotifyRecorder:
LOGGER.info(f'>>> Recording track {index + 1}/{len(tracks)}: ' LOGGER.info(f'>>> Recording track {index + 1}/{len(tracks)}: '
f'#{indexInPlaylist} "{track["track"]["name"]}"...') f'#{indexInPlaylist} "{track["track"]["name"]}"...')
try: try:
recordingCreated = self.__record_single_track(deviceId, indexInPlaylist, track) recordingCreated = self.__record_single_track(deviceId, indexInPlaylist, track, destinationFolder)
if recordingCreated: if recordingCreated:
recordedTrackNumbers.append(indexInPlaylist) recordedTrackNumbers.append(indexInPlaylist)
else: else:
...@@ -128,12 +131,13 @@ class SpotifyRecorder: ...@@ -128,12 +131,13 @@ class SpotifyRecorder:
LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e) LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
errorTrackNumbers.append(indexInPlaylist) errorTrackNumbers.append(indexInPlaylist)
self.__print_end_statistics(errorTrackNumbers, recordedTrackNumbers, skippedTrackNumbers, alreadyExistingTrackNumbers, len(tracks)) self.__print_end_statistics(errorTrackNumbers, recordedTrackNumbers, skippedTrackNumbers,
alreadyExistingTrackNumbers, len(tracks))
def __record_single_track(self, deviceId, indexInPlaylist, track) -> bool: def __record_single_track(self, deviceId, indexInPlaylist, track, destinationFolder: str) -> bool:
self.__stop_playback_if_playing(deviceId) self.__stop_playback_if_playing(deviceId)
fileName = self.__determine_file_name(indexInPlaylist, track) fileName = self.__determine_file_name(indexInPlaylist, track, destinationFolder)
filePathWav = f'{fileName}.wav' filePathWav = f'{fileName}.wav'
filePathMp3 = f'{fileName}.mp3' filePathMp3 = f'{fileName}.mp3'
...@@ -181,7 +185,7 @@ class SpotifyRecorder: ...@@ -181,7 +185,7 @@ class SpotifyRecorder:
f'{len(alreadyExistingTrackNumbers)} existing tracks skipped, ' f'{len(alreadyExistingTrackNumbers)} existing tracks skipped, '
f'{len(errorTrackNumbers)} errors ###') f'{len(errorTrackNumbers)} errors ###')
def __determine_file_name(self, index: int, track) -> str: def __determine_file_name(self, index: int, track, destinationFolder: str) -> str:
artists = self.__join_artists(track) artists = self.__join_artists(track)
fileName = f'{index} - {artists} - {track["track"]["name"]}' fileName = f'{index} - {artists} - {track["track"]["name"]}'
fileName = fileName.replace('/', '_') fileName = fileName.replace('/', '_')
...@@ -193,7 +197,7 @@ class SpotifyRecorder: ...@@ -193,7 +197,7 @@ class SpotifyRecorder:
fileName = fileName.replace('>', '_') fileName = fileName.replace('>', '_')
fileName = fileName.replace('<', '_') fileName = fileName.replace('<', '_')
fileName = fileName.replace('|', '_') fileName = fileName.replace('|', '_')
return os.path.join(self._destinationFolder, fileName) return os.path.join(destinationFolder, fileName)
@staticmethod @staticmethod
def __join_artists(track) -> str: def __join_artists(track) -> str:
...@@ -291,12 +295,11 @@ if __name__ == '__main__': ...@@ -291,12 +295,11 @@ if __name__ == '__main__':
SETTINGS['redirectUrl'], SETTINGS['redirectUrl'],
SETTINGS['openBrowser'], SETTINGS['openBrowser'],
SETTINGS['cacheFilePath'], SETTINGS['cacheFilePath'],
SETTINGS['playlist'], SETTINGS['playlists'],
SETTINGS['spotifyDeviceName'], SETTINGS['spotifyDeviceName'],
SETTINGS['audioDeviceName'], SETTINGS['audioDeviceName'],
SETTINGS['destinationFolder'], SETTINGS['destinationFolder'],
SETTINGS['startNumber'], )
SETTINGS['limit'], )
spotifyBackup.run() spotifyBackup.run()
......
...@@ -3,15 +3,18 @@ ...@@ -3,15 +3,18 @@
"clientID": "", "clientID": "",
"clientSecret": "" "clientSecret": ""
}, },
"playlist": { "playlists": [
{
"user": "", "user": "",
"id": "" "name": "",
}, "id": "",
"startNumber": 1,
"limit": -1
}
],
"spotifyDeviceName": "MYDEVICE", "spotifyDeviceName": "MYDEVICE",
"audioDeviceName": "3/4 - Musik (2- GIGAPort HD Audio driver) [Loopback]", "audioDeviceName": "3/4 - Musik (2- GIGAPort HD Audio driver) [Loopback]",
"destinationFolder": "", "destinationFolder": "",
"startNumber": 1,
"limit": -1,
"redirectUrl": "http://localhost:8080", "redirectUrl": "http://localhost:8080",
"openBrowser": false, "openBrowser": false,
"cacheFilePath": ".cache" "cacheFilePath": ".cache"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment