diff --git a/src/blueprints/Routes.py b/src/blueprints/Routes.py index e6a590610f5881dffa83cf792bed3ac6b5efcb5d..bec5af42a2d816e102a55d7bb449ecb137d81eb9 100644 --- a/src/blueprints/Routes.py +++ b/src/blueprints/Routes.py @@ -51,59 +51,59 @@ def construct_blueprint(settings, version): @routes.route('/devices', methods=['GET']) def get_all_devices(): database = Database(settings['database']['databasePath']) - return jsonify(database.get_all_devices()) + return jsonify(database.deviceAccess.get_all_devices()) @routes.route('/device/<int:deviceID>', methods=['GET']) def get_device(deviceID): database = Database(settings['database']['databasePath']) - return jsonify(database.get_device(deviceID)) + return jsonify(database.deviceAccess.get_device(deviceID)) @routes.route('/sensors', methods=['GET']) def get_all_sensors(): database = Database(settings['database']['databasePath']) - return jsonify(database.get_all_sensors()) + return jsonify(database.sensorAccess.get_all_sensors()) @routes.route('/sensor/<int:sensorID>', methods=['GET']) def get_sensor(sensorID): database = Database(settings['database']['databasePath']) - return jsonify(database.get_sensor(sensorID)) + return jsonify(database.sensorAccess.get_sensor(sensorID)) @routes.route('/device/<int:deviceID>/sensors/', methods=['GET']) def get_all_sensors_for_device(deviceID): database = Database(settings['database']['databasePath']) - device = database.get_device(deviceID) + device = database.deviceAccess.get_device(deviceID) if not device: return jsonify({'success': False, 'msg': f'No device with id "{deviceID}" existing'}) - return jsonify(database.get_all_sensors_for_device(deviceID)) + return jsonify(database.sensorAccess.get_all_sensors_for_device(deviceID)) @routes.route('/measurements', methods=['GET']) def get_all_measurements(): database = Database(settings['database']['databasePath']) - return jsonify(database.get_all_measurements()) + return jsonify(database.measurementAccess.get_all_measurements()) @routes.route('/measurement/<int:measurementID>', methods=['GET']) def get_measurement(measurementID): database = Database(settings['database']['databasePath']) - return jsonify(database.get_measurement(measurementID)) + return jsonify(database.measurementAccess.get_measurement(measurementID)) @routes.route('/sensor/<sensorID>/measurements', methods=['GET']) def get_all_measurements_for_sensor(sensorID): database = Database(settings['database']['databasePath']) - sensor = database.get_sensor(sensorID) + sensor = database.sensorAccess.get_sensor(sensorID) if not sensor: return jsonify({'success': False, 'msg': f'No sensor with id "{sensorID}" existing'}) - return jsonify(database.get_all_measurements_for_sensor(sensorID)) + return jsonify(database.measurementAccess.get_all_measurements_for_sensor(sensorID)) @routes.route('/sensor/<sensorID>/measurements/latest', methods=['GET']) def get_latest_measurements_for_sensor(sensorID): database = Database(settings['database']['databasePath']) - sensor = database.get_sensor(sensorID) + sensor = database.sensorAccess.get_sensor(sensorID) if not sensor: return jsonify({'success': False, 'msg': f'No sensor with id "{sensorID}" existing'}) - return jsonify(database.get_latest_measurements_for_sensor(sensorID)) + return jsonify(database.measurementAccess.get_latest_measurements_for_sensor(sensorID)) @routes.route('/measurements', methods=['POST']) @require_api_key(password=settings['api']['key']) @@ -113,9 +113,9 @@ def construct_blueprint(settings, version): database = Database(settings['database']['databasePath']) deviceName = parameters[DeviceParameters.DEVICE.value] - if not database.get_device_by_name(deviceName): - database.add_device(deviceName) - device = database.get_device_by_name(deviceName) + if not database.deviceAccess.get_device_by_name(deviceName): + database.deviceAccess.add_device(deviceName) + device = database.deviceAccess.get_device_by_name(deviceName) sensors = parameters[DeviceParameters.SENSORS.value] for sensor in sensors: @@ -123,7 +123,8 @@ def construct_blueprint(settings, version): SensorParameters.get_values(), f'sensor "{sensor}"') sensor = __add_sensor_if_not_exists(database, int(device['id']), sensorParams) - database.add_measurement(int(sensor['id']), sensorParams[SensorParameters.VALUE.value]) + database.measurementAccess.add_measurement(int(sensor['id']), + sensorParams[SensorParameters.VALUE.value]) except ValidationError as e: return e.response, 400 @@ -132,11 +133,11 @@ def construct_blueprint(settings, version): def __add_sensor_if_not_exists(database: Database, deviceID: int, sensorParams: Dict) -> Dict[str, str]: sensorName = sensorParams[SensorParameters.NAME.value] sensorType = sensorParams[SensorParameters.TYPE.value] - sensor = database.get_sensor_by_name_and_device_id(deviceID, sensorName) + sensor = database.sensorAccess.get_sensor_by_name_and_device_id(deviceID, sensorName) if sensor: return sensor - database.add_sensor(deviceID, sensorName, sensorType) - return database.get_sensor_by_name_and_device_id(deviceID, sensorName) + database.sensorAccess.add_sensor(deviceID, sensorName, sensorType) + return database.sensorAccess.get_sensor_by_name_and_device_id(deviceID, sensorName) return routes diff --git a/src/logic/Database.py b/src/logic/Database.py index e54445c41550f12532bb2b287358fa16cf95ea8d..2951ce23313eca82fdce8d332e584b957fb7de0a 100644 --- a/src/logic/Database.py +++ b/src/logic/Database.py @@ -1,140 +1,26 @@ -import sqlite3 -from datetime import datetime from enum import Enum -from typing import Dict, List from TheCodeLabs_BaseUtils import DefaultLogger from logic import Constants +from logic.DeviceAccess import DeviceAccess +from logic.MeasurementAccess import MeasurementAccess +from logic.SensorAccess import SensorAccess LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME) -class FetchType(Enum): - NONE = 1 - ONE = 2 - ALL = 3 - - class Database: - TABLE_DEVICE = 'device' - TABLE_SENSOR = 'sensor' - TABLE_MEASUREMENT = 'measurement' - - DATE_FORMAT = '%Y-%m-%d %H:%M:%S' - - @staticmethod - def namedtuple_factory(cursor, row): - """ - Returns sqlite rows as dicts. - """ - d = {} - for idx, col in enumerate(cursor.description): - d[col[0]] = row[idx] - return d - def __init__(self, databasePath): self._databasePath = databasePath + self.deviceAccess = DeviceAccess(databasePath) + self.sensorAccess = SensorAccess(databasePath) + self.measurementAccess = MeasurementAccess(databasePath) + self.__create_database() def __create_database(self): LOGGER.debug('Creating database tables...') - self.__query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_DEVICE} ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT NOT NULL)''', fetch_type=FetchType.NONE) - self.__query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_SENSOR} ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - device_id INTEGER, - name TEXT NOT NULL, - type TEXT NOT NULL)''', fetch_type=FetchType.NONE) - self.__query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_MEASUREMENT} ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - sensor_id INTEGER, - value TEXT NOT NULL, - timestamp TEXT NOT NULL)''', fetch_type=FetchType.NONE) - - def __query(self, query, *args, fetch_type=FetchType.ALL): - connection = sqlite3.connect(self._databasePath) - connection.row_factory = Database.namedtuple_factory - - with connection: - cursor = connection.cursor() - try: - cursor.execute(query, args) - - if fetch_type == FetchType.ONE: - return cursor.fetchone() - if fetch_type == FetchType.ALL: - return cursor.fetchall() - finally: - cursor.close() - - def __get_current_datetime(self): - return datetime.strftime(datetime.now(), self.DATE_FORMAT) - - def get_all_devices(self) -> List[Dict[str, str]]: - return self.__query(f'SELECT * FROM {self.TABLE_DEVICE} ORDER BY name', fetch_type=FetchType.ALL) - - def get_device(self, deviceID: int) -> Dict[str, str] or None: - return self.__query(f'SELECT * FROM {self.TABLE_DEVICE} WHERE id = ?', deviceID, fetch_type=FetchType.ONE) - - def get_device_by_name(self, deviceName: str) -> Dict[str, str] or None: - return self.__query(f'SELECT * FROM {self.TABLE_DEVICE} WHERE name = ?', deviceName, fetch_type=FetchType.ONE) - - def add_device(self, deviceName: str): - LOGGER.debug(f'Inserting new device "{deviceName}"') - self.__query(f'INSERT INTO {self.TABLE_DEVICE}(name) VALUES(?)', deviceName, fetch_type=FetchType.NONE) - - def get_all_sensors(self) -> List[Dict[str, str]]: - return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} ORDER BY device_id, name', fetch_type=FetchType.ALL) - - def get_all_sensors_for_device(self, deviceID: int) -> List[Dict[str, str]]: - return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} WHERE device_id = ? ORDER BY name', - deviceID, - fetch_type=FetchType.ALL) - - def get_sensor(self, sensorID: int) -> Dict[str, str] or None: - return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} WHERE id = ?', - sensorID, - fetch_type=FetchType.ONE) - - def get_sensor_by_name_and_device_id(self, deviceID: int, sensorName: str) -> Dict[str, str] or None: - return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} WHERE device_id = ? AND name = ?', - deviceID, sensorName, - fetch_type=FetchType.ONE) - - def add_sensor(self, deviceID: int, name: str, sensorType: str): - LOGGER.debug(f'Inserting new "{sensorType}" sensor "{name}" for device "{deviceID}"') - self.__query(f'INSERT INTO {self.TABLE_SENSOR}(name, device_id, type) ' - f'VALUES(?, ?, ?)', - name, deviceID, sensorType, - fetch_type=FetchType.NONE) - - def get_all_measurements(self) -> List[Dict[str, str]]: - return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} ORDER BY sensor_id ASC, datetime(timestamp) DESC', - fetch_type=FetchType.ALL) - - def get_measurement(self, measurementID: int) -> Dict[str, str] or None: - return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} WHERE id = ?', - measurementID, - fetch_type=FetchType.ALL) - - def get_all_measurements_for_sensor(self, sensorID: int) -> List[Dict[str, str]]: - return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} WHERE sensor_id = ? ' - f'ORDER BY datetime(timestamp) DESC', - sensorID, - fetch_type=FetchType.ALL) - - def get_latest_measurements_for_sensor(self, sensorID: int) -> Dict[str, str] or None: - return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} WHERE sensor_id = ? ' - f'ORDER BY datetime(timestamp) DESC LIMIT 1', - sensorID, - fetch_type=FetchType.ONE) - - def add_measurement(self, sensorID: int, value: str): - sensor = self.get_sensor(sensorID) - LOGGER.debug(f'Inserting new measurement for sensor "{sensor["name"]}" ' - f'(value: "{value}", device_id "{sensor["device_id"]}")') - self.__query(f'INSERT INTO {self.TABLE_MEASUREMENT}(sensor_id, value, timestamp ) VALUES(?, ?, ?)', - sensorID, value, self.__get_current_datetime(), - fetch_type=FetchType.NONE) + self.deviceAccess.create_table() + self.sensorAccess.create_table() + self.measurementAccess.create_table() diff --git a/src/logic/DatabaseAccess.py b/src/logic/DatabaseAccess.py new file mode 100644 index 0000000000000000000000000000000000000000..b4649ea07e1b4b68b6b16e999aa75f3156499393 --- /dev/null +++ b/src/logic/DatabaseAccess.py @@ -0,0 +1,53 @@ +import abc +import sqlite3 +from abc import ABC +from enum import Enum + +from TheCodeLabs_BaseUtils import DefaultLogger + +from logic import Constants + +LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME) + + +class FetchType(Enum): + NONE = 1 + ONE = 2 + ALL = 3 + + +class DatabaseAccess(ABC): + DATE_FORMAT = '%Y-%m-%d %H:%M:%S' + + @staticmethod + def namedtuple_factory(cursor, row): + """ + Returns sqlite rows as dicts. + """ + d = {} + for idx, col in enumerate(cursor.description): + d[col[0]] = row[idx] + return d + + def __init__(self, databasePath): + self._databasePath = databasePath + + @abc.abstractmethod + def create_table(self): + pass + + def _query(self, query, *args, fetch_type=FetchType.ALL): + connection = sqlite3.connect(self._databasePath) + connection.row_factory = DatabaseAccess.namedtuple_factory + + with connection: + cursor = connection.cursor() + try: + cursor.execute(query, args) + + if fetch_type == FetchType.ONE: + return cursor.fetchone() + if fetch_type == FetchType.ALL: + return cursor.fetchall() + finally: + cursor.close() diff --git a/src/logic/DeviceAccess.py b/src/logic/DeviceAccess.py new file mode 100644 index 0000000000000000000000000000000000000000..8be7f86efd9b1e823d22ee0ca1bbd0284d8cedd2 --- /dev/null +++ b/src/logic/DeviceAccess.py @@ -0,0 +1,30 @@ +from typing import Dict, List + +from TheCodeLabs_BaseUtils import DefaultLogger + +from logic import Constants +from logic.DatabaseAccess import DatabaseAccess, FetchType + +LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME) + + +class DeviceAccess(DatabaseAccess): + TABLE_NAME = 'device' + + def create_table(self): + self._query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT NOT NULL)''', fetch_type=FetchType.NONE) + + def get_all_devices(self) -> List[Dict[str, str]]: + return self._query(f'SELECT * FROM {self.TABLE_NAME} ORDER BY name', fetch_type=FetchType.ALL) + + def get_device(self, deviceID: int) -> Dict[str, str] or None: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE id = ?', deviceID, fetch_type=FetchType.ONE) + + def get_device_by_name(self, deviceName: str) -> Dict[str, str] or None: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE name = ?', deviceName, fetch_type=FetchType.ONE) + + def add_device(self, deviceName: str): + LOGGER.debug(f'Inserting new device "{deviceName}"') + self._query(f'INSERT INTO {self.TABLE_NAME}(name) VALUES(?)', deviceName, fetch_type=FetchType.NONE) diff --git a/src/logic/MeasurementAccess.py b/src/logic/MeasurementAccess.py new file mode 100644 index 0000000000000000000000000000000000000000..d156a3c0a86be7ca6b2011216ab2f74abd471aa0 --- /dev/null +++ b/src/logic/MeasurementAccess.py @@ -0,0 +1,50 @@ +from datetime import datetime +from typing import Dict, List + +from TheCodeLabs_BaseUtils import DefaultLogger + +from logic import Constants +from logic.DatabaseAccess import DatabaseAccess, FetchType + +LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME) + + +class MeasurementAccess(DatabaseAccess): + TABLE_NAME = 'measurement' + + def create_table(self): + self._query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + sensor_id INTEGER, + value TEXT NOT NULL, + timestamp TEXT NOT NULL)''', fetch_type=FetchType.NONE) + + def __get_current_datetime(self): + return datetime.strftime(datetime.now(), self.DATE_FORMAT) + + def get_all_measurements(self) -> List[Dict[str, str]]: + return self._query(f'SELECT * FROM {self.TABLE_NAME} ORDER BY sensor_id ASC, datetime(timestamp) DESC', + fetch_type=FetchType.ALL) + + def get_measurement(self, measurementID: int) -> Dict[str, str] or None: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE id = ?', + measurementID, + fetch_type=FetchType.ALL) + + def get_all_measurements_for_sensor(self, sensorID: int) -> List[Dict[str, str]]: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE sensor_id = ? ' + f'ORDER BY datetime(timestamp) DESC', + sensorID, + fetch_type=FetchType.ALL) + + def get_latest_measurements_for_sensor(self, sensorID: int) -> Dict[str, str] or None: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE sensor_id = ? ' + f'ORDER BY datetime(timestamp) DESC LIMIT 1', + sensorID, + fetch_type=FetchType.ONE) + + def add_measurement(self, sensorID: int, value: str): + LOGGER.debug(f'Inserting new measurement for sensor "{sensorID}" (value: "{value}")') + self._query(f'INSERT INTO {self.TABLE_NAME}(sensor_id, value, timestamp ) VALUES(?, ?, ?)', + sensorID, value, self.__get_current_datetime(), + fetch_type=FetchType.NONE) diff --git a/src/logic/SensorAccess.py b/src/logic/SensorAccess.py new file mode 100644 index 0000000000000000000000000000000000000000..59cedfd7d05137f410b78d6b0bec2f7048df002a --- /dev/null +++ b/src/logic/SensorAccess.py @@ -0,0 +1,44 @@ +from typing import Dict, List + +from TheCodeLabs_BaseUtils import DefaultLogger + +from logic import Constants +from logic.DatabaseAccess import DatabaseAccess, FetchType + +LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME) + + +class SensorAccess(DatabaseAccess): + TABLE_NAME = 'sensor' + + def create_table(self): + self._query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + device_id INTEGER, + name TEXT NOT NULL, + type TEXT NOT NULL)''', fetch_type=FetchType.NONE) + + def get_all_sensors(self) -> List[Dict[str, str]]: + return self._query(f'SELECT * FROM {self.TABLE_NAME} ORDER BY device_id, name', fetch_type=FetchType.ALL) + + def get_all_sensors_for_device(self, deviceID: int) -> List[Dict[str, str]]: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE device_id = ? ORDER BY name', + deviceID, + fetch_type=FetchType.ALL) + + def get_sensor(self, sensorID: int) -> Dict[str, str] or None: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE id = ?', + sensorID, + fetch_type=FetchType.ONE) + + def get_sensor_by_name_and_device_id(self, deviceID: int, sensorName: str) -> Dict[str, str] or None: + return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE device_id = ? AND name = ?', + deviceID, sensorName, + fetch_type=FetchType.ONE) + + def add_sensor(self, deviceID: int, name: str, sensorType: str): + LOGGER.debug(f'Inserting new "{sensorType}" sensor "{name}" for device "{deviceID}"') + self._query(f'INSERT INTO {self.TABLE_NAME}(name, device_id, type) ' + f'VALUES(?, ?, ?)', + name, deviceID, sensorType, + fetch_type=FetchType.NONE)