Newer
Older

Robert Goldmann
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import json
from typing import Dict, List
import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy.oauth2 import SpotifyPKCE
# Layout of every log line: level name (left-aligned, padded to 7 characters),
# timestamp, then the message.
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
# Module-wide logger named 'SpotifyAutoPlaylistCreator', configured with LOG_FORMAT.
# NOTE(review): presumably an already existing logger of that name is reused
# instead of being re-created - confirm against TheCodeLabs_BaseUtils.
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyAutoPlaylistCreator', logFormat=LOG_FORMAT)
class SpotifyAutoPlaylistCreator:
    """Fill a destination playlist with the newest tracks of several source playlists.

    A run empties the destination playlist, fetches every track of every
    source playlist, sorts them by their ``added_at`` timestamp (newest first)
    and adds the first ``numberOfTracks`` non-local tracks to the destination.

    Parameters:
        clientID: Spotify application client ID used for the PKCE login.
        clientSecret: Stored on the instance but not used by ``login`` (the
            PKCE flow is only given the client ID).
        playlistInfo: Source playlists; each entry is read via its ``'user'``
            and ``'id'`` keys.
        destinationPlaylistInfo: Destination playlist, read via its ``'user'``
            and ``'id'`` keys.
        numberOfTracks: Maximum number of tracks to add to the destination.
    """

    # The Spotify Web API accepts at most 100 items per add/remove request.
    _MAX_ITEMS_PER_REQUEST = 100

    def __init__(self, clientID: str, clientSecret: str, playlistInfo: List[Dict[str, str]],
                 destinationPlaylistInfo: Dict[str, str], numberOfTracks: int):
        self._clientID = clientID
        self._clientSecret = clientSecret
        self._playlistInfo = playlistInfo
        self._destinationPlaylistInfo = destinationPlaylistInfo
        self._numberOfTracks = numberOfTracks

        # Logging in happens at construction time.
        self._spotify = self.login()

    def login(self) -> 'spotipy.Spotify':
        """Create a Spotify client authorised to modify private and public playlists."""
        client_credentials_manager = SpotifyPKCE(client_id=self._clientID, redirect_uri='http://localhost:8080',
                                                 scope='playlist-modify-private,playlist-modify-public')
        return spotipy.Spotify(client_credentials_manager=client_credentials_manager)

    @staticmethod
    def _chunked(items: List, size: int) -> List[List]:
        """Split ``items`` into consecutive slices of at most ``size`` elements."""
        return [items[start:start + size] for start in range(0, len(items), size)]

    def run(self):
        """Replace the destination playlist's content with the latest source tracks."""
        destinationPlaylist = self.__get_playlist(self._destinationPlaylistInfo['user'],
                                                  self._destinationPlaylistInfo['id'])
        self.__CleanupDestinationPlaylist(destinationPlaylist)

        LOGGER.info('>>> Fetching tracks for all source playlists...')
        allTracks = []
        # Use the playlists handed to the constructor, not a module-level global,
        # so the class also works when it is imported from another module.
        for playlistInfo in self._playlistInfo:
            playlist = self.__get_playlist(playlistInfo['user'], playlistInfo['id'])
            allTracks.extend(self.__get_tracks(playlist))

        # 'added_at' can be null for very old playlists; treat it as the oldest
        # possible value so the sort never compares None with a string.
        sortedTracks = sorted(allTracks, key=lambda d: d['added_at'] or '', reverse=True)

        LOGGER.info(f'>>> Collecting latest tracks (limit: {self._numberOfTracks})...')
        latestTrackUris = self.__CollectLatestTracks(sortedTracks)
        LOGGER.info(f'>>> Found {len(latestTrackUris)} latest tracks (limit: {self._numberOfTracks})')

        LOGGER.info(f'>>> Adding tracks to destination playlist "{destinationPlaylist["name"]}"...')
        # Items are appended, so adding batch after batch keeps the sorted order.
        # An empty result produces no batches and therefore no API call.
        for batch in self._chunked(latestTrackUris, self._MAX_ITEMS_PER_REQUEST):
            self._spotify.playlist_add_items(self._destinationPlaylistInfo['id'], batch)

    def __CollectLatestTracks(self, sortedTracks):
        """Return the URIs of the first ``numberOfTracks`` usable tracks.

        ``sortedTracks`` is expected to be ordered newest first. Local tracks
        and items without track data are skipped and do not count towards the
        limit.
        """
        tracksToAdd = []
        for track in sortedTracks:
            if len(tracksToAdd) >= self._numberOfTracks:
                break

            # Items whose track is no longer available carry no track object.
            if track['track'] is None:
                LOGGER.info('Skipping unavailable track (no track data)')
                continue

            if track['is_local']:
                LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
                continue

            LOGGER.info(f'Appending track "{track["track"]["name"]}"')
            tracksToAdd.append(track)

        return self.__extract_track_uris(tracksToAdd)

    def __CleanupDestinationPlaylist(self, destinationPlaylist):
        """Remove every track currently contained in the destination playlist."""
        existingTracks = self.__get_tracks(destinationPlaylist)
        LOGGER.info(f'>>> Removing {len(existingTracks)} tracks from destination '
                    f'playlist "{destinationPlaylist["name"]}"...')

        # All occurrences of a URI are removed at once, so each URI is needed
        # only once; dict.fromkeys de-duplicates while keeping the order.
        existingTracksUris = list(dict.fromkeys(self.__extract_track_uris(existingTracks)))
        for batch in self._chunked(existingTracksUris, self._MAX_ITEMS_PER_REQUEST):
            self._spotify.playlist_remove_all_occurrences_of_items(self._destinationPlaylistInfo['id'], batch)

    def __get_playlist(self, username: str, playlistID: str) -> Dict:
        """Fetch and return the playlist ``playlistID`` owned by ``username``."""
        LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
        identifier = f'spotify:user:{username}:playlist:{playlistID}'
        playlist = self._spotify.playlist(identifier)
        LOGGER.info(f'Found playlist "{playlist["name"]}"')
        return playlist

    def __get_tracks(self, playlist) -> List[Dict]:
        """Return all track items of ``playlist``, following its paging links."""
        tracks = playlist['tracks']
        # Copy the first page so the caller's playlist dict is not mutated.
        results = list(tracks['items'])
        while tracks['next']:
            tracks = self._spotify.next(tracks)
            results.extend(tracks['items'])

        LOGGER.info(f'Fetched {len(results)} tracks')
        return results

    def __extract_track_uris(self, tracks: List) -> List[str]:
        """Return the track URI of every item, skipping items without track data."""
        return [track['track']['uri'] for track in tracks if track['track'] is not None]
if __name__ == '__main__':
    # Load the run configuration. SETTINGS deliberately stays a module-level
    # name because other code in this file may look it up as a global.
    with open('config/settings-creator.json', 'r', encoding='utf-8') as settingsFile:
        SETTINGS = json.load(settingsFile)

    # Build the creator from the configuration (this also performs the login)
    # and execute one full run.
    apiSettings = SETTINGS['spotifyAPI']
    creator = SpotifyAutoPlaylistCreator(clientID=apiSettings['clientID'],
                                         clientSecret=apiSettings['clientSecret'],
                                         playlistInfo=SETTINGS['playlists'],
                                         destinationPlaylistInfo=SETTINGS['destinationPlaylist'],
                                         numberOfTracks=SETTINGS['numberOfTracks'])
    creator.run()

    LOGGER.info('### DONE ###')