Skip to content
Snippets Groups Projects
Commit 29a6f060 authored by Robert Goldmann's avatar Robert Goldmann
Browse files

added new class for recording spotify tracks as wav file

parent 671e13af
No related branches found
No related tags found
No related merge requests found
...@@ -2,6 +2,8 @@ exports/ ...@@ -2,6 +2,8 @@ exports/
config/settings.json config/settings.json
config/settings-import.json config/settings-import.json
config/settings-creator.json config/settings-creator.json
config/settings-recorder.json
.cache .cache
.cache_creator .cache_creator
.cache_recorder
import threading
import time
import wave
import pyaudiowpatch as pyaudio
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyAudioRecorder', logFormat=LOG_FORMAT)
class SpotifyAudioRecorder(threading.Thread):
_CHUNK_SIZE = 512
def __init__(self, deviceName: str, destinationFilePath: str):
threading.Thread.__init__(self, daemon=True)
self._deviceName = deviceName
self._destinationFilePath = destinationFilePath
self._stopEvent = threading.Event()
def __get_device_by_name(self, pyaudioInstance, deviceName: str):
deviceNames = []
for loopback in pyaudioInstance.get_loopback_device_info_generator():
deviceNames.append(loopback['name'])
if deviceName in loopback['name']:
return loopback
else:
message = f'Device with name {self._deviceName} not found.'
message += f'\nAvailable devices:'
for deviceName in deviceNames:
message += f'\n\t{deviceName}'
raise RuntimeError(message)
def run(self):
with pyaudio.PyAudio() as pyaudioInstance:
device = self.__get_device_by_name(pyaudioInstance, self._deviceName)
LOGGER.info(f'Recording from: ({device["index"]}) {device["name"]}')
waveFile = wave.open(self._destinationFilePath, 'wb')
waveFile.setnchannels(device['maxInputChannels'])
waveFile.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
waveFile.setframerate(int(device['defaultSampleRate']))
def callback(in_data, frame_count, time_info, status):
waveFile.writeframes(in_data)
return in_data, pyaudio.paContinue
with pyaudioInstance.open(format=pyaudio.paInt16,
channels=device['maxInputChannels'],
rate=int(device['defaultSampleRate']),
frames_per_buffer=self._CHUNK_SIZE,
input=True,
input_device_index=device['index'],
stream_callback=callback
):
while not self._stopEvent.is_set():
time.sleep(1)
waveFile.close()
LOGGER.info(f'Recording stopped. File: "{self._destinationFilePath}"')
def stop(self):
if not self._stopEvent.is_set():
self._stopEvent.set()
if __name__ == '__main__':
recorder = SpotifyAudioRecorder(
deviceName='3/4 - Musik (2- GIGAPort HD Audio driver) [Loopback]',
destinationFilePath='C:/Users/RobertG/Desktop/output.wav'
)
recorder.start()
time.sleep(5)
recorder.stop()
import json
import os.path
import time
from typing import List, Dict
import spotipy
from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
from spotipy import SpotifyOAuth, CacheFileHandler
from SpotifyAudioRecorder import SpotifyAudioRecorder
LOG_FORMAT = '[%(levelname)-7s] - %(asctime)s - %(message)s'
LOGGER = DefaultLogger().create_logger_if_not_exists('SpotifyRecorder', logFormat=LOG_FORMAT)
class SpotifyRecorder:
_MAX_WAIT_TIME_TRACK_STARTING_IN_S = 10
_WAIT_TIME_TRACK_PLAYING_IN_S = 10
_WAIT_TIME_TRACK_PLAYING_SHORT_IN_S = 1
_THRESHOLD_TRACK_END_IN_MS = 5000
def __init__(self, clientID: str,
clientSecret: str,
redirectUrl: str,
openBrowser: bool,
cacheFilePath: str,
playlist: Dict[str, str],
spotifyDeviceName: str,
audioDeviceName: str,
destinationFolder: str):
self._clientID = clientID
self._clientSecret = clientSecret
self._redirectUrl = redirectUrl
self._openBrowser = openBrowser
self._cacheFilePath = cacheFilePath
self._playlist = playlist
self._spotifyDeviceName = spotifyDeviceName
self._audioDeviceName = audioDeviceName
self._destinationFolder = destinationFolder
os.makedirs(self._destinationFolder, exist_ok=True)
# TODO: options specify range (start / stop track) or only offset index + limit
self._spotify = self.login()
def login(self) -> spotipy.Spotify:
client_credentials_manager = SpotifyOAuth(client_id=self._clientID,
client_secret=self._clientSecret,
redirect_uri=self._redirectUrl,
scope='user-read-playback-state,user-modify-playback-state',
open_browser=self._openBrowser,
cache_handler=CacheFileHandler(cache_path=self._cacheFilePath))
return spotipy.Spotify(client_credentials_manager=client_credentials_manager)
def run(self):
LOGGER.info(f'>>> Fetching all tracks for playlist {self._playlist["name"]}...')
allTracks = []
playlist = self.__get_playlist(self._playlist['user'], self._playlist['id'])
allTracks.extend(self.__get_tracks(playlist))
LOGGER.info(f'>>> Found {len(allTracks)} tracks')
self.__record_tracks(allTracks)
def __get_playlist(self, username: str, playlistID: str) -> Dict:
LOGGER.info(f'>>> Fetching playlist with ID: {playlistID} by {username}...')
identifier = f'spotify:user:{username}:playlist:{playlistID}'
playlist = self._spotify.playlist(identifier)
LOGGER.info(f'Found playlist "{playlist["name"]}"')
return playlist
def __get_tracks(self, playlist) -> List[Dict]:
tracks = playlist['tracks']
results = tracks['items']
while tracks['next']:
tracks = self._spotify.next(tracks)
results.extend(tracks['items'])
LOGGER.info(f'Fetched {len(results)} tracks')
return results
def __extract_track_uris(self, tracks: List) -> List[str]:
return [track['track']['uri'] for track in tracks]
def __record_tracks(self, tracks: list):
deviceId = self.__get_device_id_by_name(self._spotifyDeviceName)
recordedTracks = []
skippedTracks = []
for index, track in enumerate(tracks[:2]):
if track['is_local']:
# TODO:
# It's not possible to add a local track to a playlist using the web API.
# https://github.com/plamere/spotipy/issues/793#issuecomment-1082421408
LOGGER.info(f'Skipping local track "{track["track"]["name"]}"')
skippedTracks.append(track['track']['name'])
continue
LOGGER.info(f'Recording track {index + 1}/{len(tracks)}: "{track["track"]["name"]}"...')
recorder = None
try:
filePath = self.__determine_file_path(index, track)
recorder = SpotifyAudioRecorder(self._audioDeviceName, filePath)
recorder.start()
self.__play_track(deviceId, track['track']['uri'])
self.__wait_for_track_playing()
self.__wait_for_track_end(track)
recorder.stop()
recordedTracks.append(track['track']['name'])
except Exception as e:
LOGGER.error(f'An error occurred while recording track "{track["track"]["name"]}"', exc_info=e)
skippedTracks.append(track['track']['name'])
if recorder is not None and recorder.is_alive():
recorder.stop()
LOGGER.info('### DONE ###')
LOGGER.info(f'{len(tracks)} tracks, {len(recordedTracks)} recorded, {len(skippedTracks)} skipped')
def __determine_file_path(self, index: int, track) -> str:
artists = track['track']['artists']
artists = ' & '.join(artist['name'] for artist in artists)
fileName = f'{index} - {artists} - {track["track"]["name"]}.wav'
return os.path.join(self._destinationFolder, fileName)
def __wait_for_track_end(self, track):
trackDurationInMs = track['track']['duration_ms']
trackDurationInSeconds = trackDurationInMs // 1000
LOGGER.info(f'Track duration: {self.__convert_seconds_to_duration(trackDurationInSeconds)}')
startTime = time.time()
while time.time() - startTime < trackDurationInSeconds:
currentPlayback = self._spotify.current_playback()
if currentPlayback['is_playing']:
remainingTimeInMs = trackDurationInMs - currentPlayback['progress_ms']
if remainingTimeInMs < self._THRESHOLD_TRACK_END_IN_MS:
LOGGER.debug(f'Waiting for track to end (remaining: '
f'{self.__convert_seconds_to_duration(remainingTimeInMs // 1000)}, '
f'sleep: {self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S}s)...')
time.sleep(self._WAIT_TIME_TRACK_PLAYING_SHORT_IN_S)
else:
LOGGER.debug(f'Waiting for track to end (remaining:'
f' {self.__convert_seconds_to_duration(remainingTimeInMs // 1000)}, '
f'sleep: {self._WAIT_TIME_TRACK_PLAYING_IN_S}s)...')
time.sleep(self._WAIT_TIME_TRACK_PLAYING_IN_S)
continue
else:
waitDuration = int(time.time() - startTime)
if waitDuration < trackDurationInSeconds:
raise RuntimeError(
f'Track finished too early (waited: {waitDuration}s, expected: {trackDurationInSeconds}s)')
if waitDuration > trackDurationInSeconds + self._WAIT_TIME_TRACK_PLAYING_IN_S:
raise RuntimeError(
f'Track finished too late (waited: {waitDuration}s, expected: {trackDurationInSeconds})')
LOGGER.debug(f'Track finished. Waited {waitDuration}s, expected {trackDurationInSeconds}s, OK')
break
@staticmethod
def __convert_seconds_to_duration(seconds: int):
secs = seconds % 60
minutes = int(seconds / 60) % 60
hours = int(seconds / (60 * 60)) % 24
return f'{str(hours).zfill(2)}:{str(minutes).zfill(2)}:{str(secs).zfill(2)}'
def __get_device_id_by_name(self, deviceName: str) -> str:
devices = self._spotify.devices()['devices']
for device in devices:
if device['name'] == deviceName:
return device['id']
raise RuntimeError('Device not found')
def __play_track(self, deviceId: str, trackUri: str):
self._spotify.start_playback(device_id=deviceId, uris=[trackUri])
def __wait_for_track_playing(self) -> None:
LOGGER.debug(f'Wait for track to start playing...')
startTime = time.time()
while time.time() - startTime < self._MAX_WAIT_TIME_TRACK_STARTING_IN_S:
if self._spotify.current_playback()['is_playing']:
LOGGER.debug(f'Track started playing after {time.time() - startTime}s')
break
time.sleep(1)
if __name__ == '__main__':
with open('config/settings-recorder.json', 'r', encoding='utf-8') as f:
SETTINGS = json.load(f)
spotifyBackup = SpotifyRecorder(SETTINGS['spotifyAPI']['clientID'],
SETTINGS['spotifyAPI']['clientSecret'],
SETTINGS['redirectUrl'],
SETTINGS['openBrowser'],
SETTINGS['cacheFilePath'],
SETTINGS['playlist'],
SETTINGS['spotifyDeviceName'],
SETTINGS['audioDeviceName'],
SETTINGS['destinationFolder'])
spotifyBackup.run()
LOGGER.info('### DONE ###')
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
"playlists": [ "playlists": [
{ {
"user": "", "user": "",
"name": "",
"id": "" "id": ""
} }
], ],
......
{
"spotifyAPI": {
"clientID": "",
"clientSecret": ""
},
"playlist": {
"user": "",
"id": ""
},
"spotifyDeviceName": "MYDEVICE",
"audioDeviceName": "3/4 - Musik (2- GIGAPort HD Audio driver) [Loopback]",
"destinationFolder": "",
"redirectUrl": "http://localhost:8080",
"openBrowser": false,
"cacheFilePath": ".cache"
}
\ No newline at end of file
This diff is collapsed.
...@@ -13,6 +13,7 @@ priority = "explicit" ...@@ -13,6 +13,7 @@ priority = "explicit"
python = "^3.11" python = "^3.11"
thecodelabs-baseutils = {version = "*", source = "TheCodeLabs" } thecodelabs-baseutils = {version = "*", source = "TheCodeLabs" }
spotipy = "2.22.0" spotipy = "2.22.0"
PyAudioWPatch = "0.2.12.6"
[tool.poetry.dev-dependencies] [tool.poetry.dev-dependencies]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment