Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • master
  • v1.0.0
  • v1.1.0
  • v1.10.0
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
8 results

Target

Select target project
  • deadlocker8/SpotifyBackup
1 result
Select Git revision
  • master
  • v1.0.0
  • v1.1.0
  • v1.10.0
  • v1.11.0
  • v1.12.0
  • v1.12.1
  • v1.12.2
8 results
Show changes
Commits on Source (5)
......@@ -2,6 +2,9 @@ exports/
config/settings.json
config/settings-import.json
config/settings-creator.json
config/settings-recorder.json
.cache
.cache_creator
.cache_recorder
recorder/
import time
import wave
from contextlib import contextmanager
import pyaudiowpatch as pyaudio
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyAudioRecorder', logFormat=LOG_FORMAT)
class SpotifyAudioRecorder:
_CHUNK_SIZE = 512
def __init__(self, deviceName: str, destinationFilePath: str):
self._deviceName = deviceName
self._destinationFilePath = destinationFilePath
def __get_device_by_name(self, pyaudioInstance, deviceName: str):
deviceNames = []
for loopback in pyaudioInstance.get_loopback_device_info_generator():
deviceNames.append(loopback['name'])
if deviceName in loopback['name']:
return loopback
else:
message = f'Device with name {self._deviceName} not found.'
message += f'\nAvailable devices:'
for deviceName in deviceNames:
message += f'\n\t{deviceName}'
raise RuntimeError(message)
@contextmanager
def record(self):
with pyaudio.PyAudio() as pyaudioInstance:
device = self.__get_device_by_name(pyaudioInstance, self._deviceName)
LOGGER.info(f'Recording from: ({device["index"]}) {device["name"]}')
waveFile = wave.open(self._destinationFilePath, 'wb')
waveFile.setnchannels(device['maxInputChannels'])
waveFile.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
waveFile.setframerate(int(device['defaultSampleRate']))
def callback(in_data, frame_count, time_info, status):
waveFile.writeframes(in_data)
return in_data, pyaudio.paContinue
with pyaudioInstance.open(format=pyaudio.paInt16,
channels=device['maxInputChannels'],
rate=int(device['defaultSampleRate']),
frames_per_buffer=self._CHUNK_SIZE,
input=True,
input_device_index=device['index'],
stream_callback=callback
) as stream:
yield stream
waveFile.close()
LOGGER.info(f'Recording stopped. File: "{self._destinationFilePath}"')
if __name__ == '__main__':
recorder = SpotifyAudioRecorder(
deviceName='3/4 - Musik (2- GIGAPort HD Audio driver) [Loopback]',
destinationFilePath='C:/Users/RobertG/Desktop/output.wav'
)
with recorder.record() as stream:
time.sleep(5)
import json
import os.path
import time
from typing import List, Dict
import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy import SpotifyOAuth, CacheFileHandler
from SpotifyAudioRecorder import SpotifyAudioRecorder
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyRecorder', logFormat=LOG_FORMAT)
class SpotifyRecorder:
_MAX_WAIT_TIME_TRACK_STARTING_IN_S = 10
_WAIT_TIME_TRACK_PLAYING_SHORT_IN_S = 1
_THRESHOLD_TRACK_END_IN_MS = 5000
def __init__(self, clientID: str,
clientSecret: str,
redirectUrl: str,
openBrowser: bool,
cacheFilePath: str,
playlist: Dict[str, str],
spotifyDeviceName: str,
audioDeviceName: str,
destinationFolder: str):
self._clientID = clientID
self._clientSecret = clientSecret
self._redirectUrl = redirectUrl
self._openBrowser = openBrowser
self._cacheFilePath = cacheFilePath
self._playlist = playlist
self._spotifyDeviceName = spotifyDeviceName
self._audioDeviceName = audioDeviceName
self._destinationFolder = destinationFolder
os.makedirs(self._destinationFolder, exist_ok=True)
# TODO: options specify range (start / stop track) or only offset index + limit
self._spotify = self.login()
def login(self) -> spotipy.Spotify:
client_credentials_manager = SpotifyOAuth(client_id=self._clientID,
client_secret=self._clientSecret,
redirect_uri=self._redirectUrl,
scope='user-read-playback-state,user-modify-playback-state',
open_browser=self._openBrowser,
cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))
return spotipy.Spotify(client_credentials_manager=client_credentials_manager)
def run(self):
LOGGER.info(f'>>> Fetching all tracks for playlist {self._playlist["name"]}...')
allTracks = []
playlist = self.__get_playlist(self._playlist['user'], self._playlist['id'])
allTracks.extend(self.__get_tracks(playlist))
LOGGER.info(f'>>> Found {len(allTracks)} tracks')
self.__record_tracks(allTracks)
def __get_playlist(self, username: str, playlistID: str) -> Dict:
LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
identifier = f'spotify:user:{username}:playlist:{playlistID}'
playlist = self._spotify.playlist(identifier)
LOGGER.info(f'Found playlist "{playlist["name"]}"')
return playlist
def __get_tracks(self, playlist) -> List[Dict]:
tracks = playlist['tracks']
results = tracks['items']
while tracks['next']:
tracks = self._spotify.next(tracks)
results.extend(tracks['items'])
LOGGER.info(f'Fetched {len(results)} tracks')
return results
def __extract_track_uris(self, tracks: List) -> List[str]:
return [track['track']['uri'] for track in tracks]
def __record_tracks(self, tracks: list):
deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)
recordedTracks = []
skippedTracks = []
errorTracks = []
for index, track in enumerate(tracks[:2]):
if track['is_local']:
# TODO:
# It's not possible to add a local track to a playlist using the web API.
# https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
skippedTracks.append(track['track']['name'])
continue
LOGGER.info(f'Recording track {index + 1}/{len(tracks)}: "{track["track"]["name"]}"...')
try:
filePath = self.__determine_file_path(index + 1, track)
recorder = SpotifyAudioRecorder(self._audioDeviceName, filePath)
with recorder.record():
self.__play_track(deviceId, track['track']['uri'])
self.__wait_for_track_playing()
self.__wait_for_track_end(track)
recordedTracks.append(track['track']['name'])
except Exception as e:
LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
errorTracks.append(track['track']['name'])
LOGGER.info('### DONE ###')
LOGGER.info(f'{len(tracks)} tracks, {len(recordedTracks)} recorded, {len(skippedTracks)} skipped, {len(errorTracks)} errors')
def __determine_file_path(self, index: int, track) -> str:
artists = track['track']['artists']
artists = ' & '.join(artist['name'] for artist in artists)
fileName = f'{index} - {artists} - {track["track"]["name"]}.wav'
return os.path.join(self._destinationFolder, fileName)
def __wait_for_track_end(self, track):
trackDurationInMs = track['track']['duration_ms']
trackDurationInSeconds = trackDurationInMs // 1000
LOGGER.info(f'Track duration: {self.__convert_seconds_to_duration(trackDurationInSeconds)}')
startTime = time.time()
while time.time() - startTime < trackDurationInSeconds:
currentPlayback = self._spotify.current_playback()
if currentPlayback['is_playing']:
remainingTimeInMs = trackDurationInMs - currentPlayback['progress_ms']
remainingTimeInSeconds = remainingTimeInMs // 1000
sleepTime = self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S
if remainingTimeInMs > self._THRESHOLD_TRACK_END_IN_MS:
sleepTime = remainingTimeInSeconds / 2
LOGGER.debug(f'Waiting for track to end (remaining: '
f'{self.__convert_seconds_to_duration(remainingTimeInSeconds)}, '
f'sleep: {self.__convert_seconds_to_duration(sleepTime)}s)...')
time.sleep(sleepTime)
else:
waitDuration = int(time.time() - startTime)
if waitDuration < trackDurationInSeconds:
raise RuntimeError(
f'Track finished too early (waited: {waitDuration}s, expected: {trackDurationInSeconds}s)')
if waitDuration > trackDurationInSeconds + self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S * 3:
raise RuntimeError(
f'Track finished too late (waited: {waitDuration}s, expected: {trackDurationInSeconds})')
LOGGER.debug(f'Track finished. Waited {waitDuration}s, expected {trackDurationInSeconds}s, OK')
break
@staticmethod
def __convert_seconds_to_duration(seconds: int):
secs = seconds % 60
minutes = int(seconds / 60) % 60
hours = int(seconds / (60 * 60)) % 24
return f'{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'
def __get_device_id_by_name(self, deviceName: str) -> str:
devices = self._spotify.devices()['devices']
for device in devices:
if device['name'] == deviceName:
return device['id']
raise RuntimeError('Device not found')
def __play_track(self, deviceId: str, trackUri: str):
self._spotify.start_playback(device_id=deviceId, uris=[trackUri])
def __wait_for_track_playing(self) -> None:
LOGGER.debug(f'Wait for track to start playing...')
startTime = time.time()
while time.time() - startTime < self._MAX_WAIT_TIME_TRACK_STARTING_IN_S:
if self._spotify.current_playback()['is_playing']:
LOGGER.debug(f'Track started playing after {time.time() - startTime:.1f}s')
break
time.sleep(1)
if __name__ == '__main__':
with open('config/settings-recorder.json', 'r', encoding='utf-8') as f:
SETTINGS = json.load(f)
spotifyBackup = SpotifyRecorder(SETTINGS['spotifyAPI']['clientID'],
SETTINGS['spotifyAPI']['clientSecret'],
SETTINGS['redirectUrl'],
SETTINGS['openBrowser'],
SETTINGS['cacheFilePath'],
SETTINGS['playlist'],
SETTINGS['spotifyDeviceName'],
SETTINGS['audioDeviceName'],
SETTINGS['destinationFolder'])
spotifyBackup.run()
LOGGER.info('### DONE ###')
......@@ -6,6 +6,7 @@
"playlists": [
{
"user": "",
"name": "",
"id": ""
}
],
......
{
"spotifyAPI": {
"clientID": "",
"clientSecret": ""
},
"playlist": {
"user": "",
"id": ""
},
"spotifyDeviceName": "MYDEVICE",
"audioDeviceName": "3/4 - Musik (2- GIGAPort HD Audio driver) [Loopback]",
"destinationFolder": "",
"redirectUrl": "http://localhost:8080",
"openBrowser": false,
"cacheFilePath": ".cache"
}
\ No newline at end of file
This diff is collapsed.
......@@ -13,6 +13,7 @@ priority = "explicit"
python = "^3.11"
thecodelabs-baseutils = {version = "*", source = "TheCodeLabs" }
spotipy = "2.22.0"
PyAudioWPatch = "0.2.12.6"
[tool.poetry.dev-dependencies]
......