Skip to content
Snippets Groups Projects
Commit f27ebdca authored by Robert Goldmann's avatar Robert Goldmann
Browse files

re-structured database access (splitted into separate classes)

parent 205d6b2d
Branches
Tags
No related merge requests found
...@@ -51,59 +51,59 @@ def construct_blueprint(settings, version): ...@@ -51,59 +51,59 @@ def construct_blueprint(settings, version):
@routes.route('/devices', methods=['GET']) @routes.route('/devices', methods=['GET'])
def get_all_devices(): def get_all_devices():
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
return jsonify(database.get_all_devices()) return jsonify(database.deviceAccess.get_all_devices())
@routes.route('/device/<int:deviceID>', methods=['GET']) @routes.route('/device/<int:deviceID>', methods=['GET'])
def get_device(deviceID): def get_device(deviceID):
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
return jsonify(database.get_device(deviceID)) return jsonify(database.deviceAccess.get_device(deviceID))
@routes.route('/sensors', methods=['GET']) @routes.route('/sensors', methods=['GET'])
def get_all_sensors(): def get_all_sensors():
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
return jsonify(database.get_all_sensors()) return jsonify(database.sensorAccess.get_all_sensors())
@routes.route('/sensor/<int:sensorID>', methods=['GET']) @routes.route('/sensor/<int:sensorID>', methods=['GET'])
def get_sensor(sensorID): def get_sensor(sensorID):
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
return jsonify(database.get_sensor(sensorID)) return jsonify(database.sensorAccess.get_sensor(sensorID))
@routes.route('/device/<int:deviceID>/sensors/', methods=['GET']) @routes.route('/device/<int:deviceID>/sensors/', methods=['GET'])
def get_all_sensors_for_device(deviceID): def get_all_sensors_for_device(deviceID):
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
device = database.get_device(deviceID) device = database.deviceAccess.get_device(deviceID)
if not device: if not device:
return jsonify({'success': False, 'msg': f'No device with id "{deviceID}" existing'}) return jsonify({'success': False, 'msg': f'No device with id "{deviceID}" existing'})
return jsonify(database.get_all_sensors_for_device(deviceID)) return jsonify(database.sensorAccess.get_all_sensors_for_device(deviceID))
@routes.route('/measurements', methods=['GET']) @routes.route('/measurements', methods=['GET'])
def get_all_measurements(): def get_all_measurements():
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
return jsonify(database.get_all_measurements()) return jsonify(database.measurementAccess.get_all_measurements())
@routes.route('/measurement/<int:measurementID>', methods=['GET']) @routes.route('/measurement/<int:measurementID>', methods=['GET'])
def get_measurement(measurementID): def get_measurement(measurementID):
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
return jsonify(database.get_measurement(measurementID)) return jsonify(database.measurementAccess.get_measurement(measurementID))
@routes.route('/sensor/<sensorID>/measurements', methods=['GET']) @routes.route('/sensor/<sensorID>/measurements', methods=['GET'])
def get_all_measurements_for_sensor(sensorID): def get_all_measurements_for_sensor(sensorID):
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
sensor = database.get_sensor(sensorID) sensor = database.sensorAccess.get_sensor(sensorID)
if not sensor: if not sensor:
return jsonify({'success': False, 'msg': f'No sensor with id "{sensorID}" existing'}) return jsonify({'success': False, 'msg': f'No sensor with id "{sensorID}" existing'})
return jsonify(database.get_all_measurements_for_sensor(sensorID)) return jsonify(database.measurementAccess.get_all_measurements_for_sensor(sensorID))
@routes.route('/sensor/<sensorID>/measurements/latest', methods=['GET']) @routes.route('/sensor/<sensorID>/measurements/latest', methods=['GET'])
def get_latest_measurements_for_sensor(sensorID): def get_latest_measurements_for_sensor(sensorID):
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
sensor = database.get_sensor(sensorID) sensor = database.sensorAccess.get_sensor(sensorID)
if not sensor: if not sensor:
return jsonify({'success': False, 'msg': f'No sensor with id "{sensorID}" existing'}) return jsonify({'success': False, 'msg': f'No sensor with id "{sensorID}" existing'})
return jsonify(database.get_latest_measurements_for_sensor(sensorID)) return jsonify(database.measurementAccess.get_latest_measurements_for_sensor(sensorID))
@routes.route('/measurements', methods=['POST']) @routes.route('/measurements', methods=['POST'])
@require_api_key(password=settings['api']['key']) @require_api_key(password=settings['api']['key'])
...@@ -113,9 +113,9 @@ def construct_blueprint(settings, version): ...@@ -113,9 +113,9 @@ def construct_blueprint(settings, version):
database = Database(settings['database']['databasePath']) database = Database(settings['database']['databasePath'])
deviceName = parameters[DeviceParameters.DEVICE.value] deviceName = parameters[DeviceParameters.DEVICE.value]
if not database.get_device_by_name(deviceName): if not database.deviceAccess.get_device_by_name(deviceName):
database.add_device(deviceName) database.deviceAccess.add_device(deviceName)
device = database.get_device_by_name(deviceName) device = database.deviceAccess.get_device_by_name(deviceName)
sensors = parameters[DeviceParameters.SENSORS.value] sensors = parameters[DeviceParameters.SENSORS.value]
for sensor in sensors: for sensor in sensors:
...@@ -123,7 +123,8 @@ def construct_blueprint(settings, version): ...@@ -123,7 +123,8 @@ def construct_blueprint(settings, version):
SensorParameters.get_values(), SensorParameters.get_values(),
f'sensor "{sensor}"') f'sensor "{sensor}"')
sensor = __add_sensor_if_not_exists(database, int(device['id']), sensorParams) sensor = __add_sensor_if_not_exists(database, int(device['id']), sensorParams)
database.add_measurement(int(sensor['id']), sensorParams[SensorParameters.VALUE.value]) database.measurementAccess.add_measurement(int(sensor['id']),
sensorParams[SensorParameters.VALUE.value])
except ValidationError as e: except ValidationError as e:
return e.response, 400 return e.response, 400
...@@ -132,11 +133,11 @@ def construct_blueprint(settings, version): ...@@ -132,11 +133,11 @@ def construct_blueprint(settings, version):
def __add_sensor_if_not_exists(database: Database, deviceID: int, sensorParams: Dict) -> Dict[str, str]: def __add_sensor_if_not_exists(database: Database, deviceID: int, sensorParams: Dict) -> Dict[str, str]:
sensorName = sensorParams[SensorParameters.NAME.value] sensorName = sensorParams[SensorParameters.NAME.value]
sensorType = sensorParams[SensorParameters.TYPE.value] sensorType = sensorParams[SensorParameters.TYPE.value]
sensor = database.get_sensor_by_name_and_device_id(deviceID, sensorName) sensor = database.sensorAccess.get_sensor_by_name_and_device_id(deviceID, sensorName)
if sensor: if sensor:
return sensor return sensor
database.add_sensor(deviceID, sensorName, sensorType) database.sensorAccess.add_sensor(deviceID, sensorName, sensorType)
return database.get_sensor_by_name_and_device_id(deviceID, sensorName) return database.sensorAccess.get_sensor_by_name_and_device_id(deviceID, sensorName)
return routes return routes
import sqlite3
from datetime import datetime
from enum import Enum from enum import Enum
from typing import Dict, List
from TheCodeLabs_BaseUtils import DefaultLogger from TheCodeLabs_BaseUtils import DefaultLogger
from logic import Constants from logic import Constants
from logic.DeviceAccess import DeviceAccess
from logic.MeasurementAccess import MeasurementAccess
from logic.SensorAccess import SensorAccess
LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME) LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME)
class FetchType(Enum):
NONE = 1
ONE = 2
ALL = 3
class Database: class Database:
TABLE_DEVICE = 'device'
TABLE_SENSOR = 'sensor'
TABLE_MEASUREMENT = 'measurement'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
@staticmethod
def namedtuple_factory(cursor, row):
"""
Returns sqlite rows as dicts.
"""
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def __init__(self, databasePath): def __init__(self, databasePath):
self._databasePath = databasePath self._databasePath = databasePath
self.deviceAccess = DeviceAccess(databasePath)
self.sensorAccess = SensorAccess(databasePath)
self.measurementAccess = MeasurementAccess(databasePath)
self.__create_database() self.__create_database()
def __create_database(self): def __create_database(self):
LOGGER.debug('Creating database tables...') LOGGER.debug('Creating database tables...')
self.__query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_DEVICE} ( self.deviceAccess.create_table()
id INTEGER PRIMARY KEY AUTOINCREMENT, self.sensorAccess.create_table()
name TEXT NOT NULL)''', fetch_type=FetchType.NONE) self.measurementAccess.create_table()
self.__query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_SENSOR} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id INTEGER,
name TEXT NOT NULL,
type TEXT NOT NULL)''', fetch_type=FetchType.NONE)
self.__query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_MEASUREMENT} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sensor_id INTEGER,
value TEXT NOT NULL,
timestamp TEXT NOT NULL)''', fetch_type=FetchType.NONE)
def __query(self, query, *args, fetch_type=FetchType.ALL):
connection = sqlite3.connect(self._databasePath)
connection.row_factory = Database.namedtuple_factory
with connection:
cursor = connection.cursor()
try:
cursor.execute(query, args)
if fetch_type == FetchType.ONE:
return cursor.fetchone()
if fetch_type == FetchType.ALL:
return cursor.fetchall()
finally:
cursor.close()
def __get_current_datetime(self):
return datetime.strftime(datetime.now(), self.DATE_FORMAT)
def get_all_devices(self) -> List[Dict[str, str]]:
return self.__query(f'SELECT * FROM {self.TABLE_DEVICE} ORDER BY name', fetch_type=FetchType.ALL)
def get_device(self, deviceID: int) -> Dict[str, str] or None:
return self.__query(f'SELECT * FROM {self.TABLE_DEVICE} WHERE id = ?', deviceID, fetch_type=FetchType.ONE)
def get_device_by_name(self, deviceName: str) -> Dict[str, str] or None:
return self.__query(f'SELECT * FROM {self.TABLE_DEVICE} WHERE name = ?', deviceName, fetch_type=FetchType.ONE)
def add_device(self, deviceName: str):
LOGGER.debug(f'Inserting new device "{deviceName}"')
self.__query(f'INSERT INTO {self.TABLE_DEVICE}(name) VALUES(?)', deviceName, fetch_type=FetchType.NONE)
def get_all_sensors(self) -> List[Dict[str, str]]:
return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} ORDER BY device_id, name', fetch_type=FetchType.ALL)
def get_all_sensors_for_device(self, deviceID: int) -> List[Dict[str, str]]:
return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} WHERE device_id = ? ORDER BY name',
deviceID,
fetch_type=FetchType.ALL)
def get_sensor(self, sensorID: int) -> Dict[str, str] or None:
return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} WHERE id = ?',
sensorID,
fetch_type=FetchType.ONE)
def get_sensor_by_name_and_device_id(self, deviceID: int, sensorName: str) -> Dict[str, str] or None:
return self.__query(f'SELECT * FROM {self.TABLE_SENSOR} WHERE device_id = ? AND name = ?',
deviceID, sensorName,
fetch_type=FetchType.ONE)
def add_sensor(self, deviceID: int, name: str, sensorType: str):
LOGGER.debug(f'Inserting new "{sensorType}" sensor "{name}" for device "{deviceID}"')
self.__query(f'INSERT INTO {self.TABLE_SENSOR}(name, device_id, type) '
f'VALUES(?, ?, ?)',
name, deviceID, sensorType,
fetch_type=FetchType.NONE)
def get_all_measurements(self) -> List[Dict[str, str]]:
return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} ORDER BY sensor_id ASC, datetime(timestamp) DESC',
fetch_type=FetchType.ALL)
def get_measurement(self, measurementID: int) -> Dict[str, str] or None:
return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} WHERE id = ?',
measurementID,
fetch_type=FetchType.ALL)
def get_all_measurements_for_sensor(self, sensorID: int) -> List[Dict[str, str]]:
return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} WHERE sensor_id = ? '
f'ORDER BY datetime(timestamp) DESC',
sensorID,
fetch_type=FetchType.ALL)
def get_latest_measurements_for_sensor(self, sensorID: int) -> Dict[str, str] or None:
return self.__query(f'SELECT * FROM {self.TABLE_MEASUREMENT} WHERE sensor_id = ? '
f'ORDER BY datetime(timestamp) DESC LIMIT 1',
sensorID,
fetch_type=FetchType.ONE)
def add_measurement(self, sensorID: int, value: str):
sensor = self.get_sensor(sensorID)
LOGGER.debug(f'Inserting new measurement for sensor "{sensor["name"]}" '
f'(value: "{value}", device_id "{sensor["device_id"]}")')
self.__query(f'INSERT INTO {self.TABLE_MEASUREMENT}(sensor_id, value, timestamp ) VALUES(?, ?, ?)',
sensorID, value, self.__get_current_datetime(),
fetch_type=FetchType.NONE)
import abc
import sqlite3
from abc import ABC
from enum import Enum
from TheCodeLabs_BaseUtils import DefaultLogger
from logic import Constants
LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME)
class FetchType(Enum):
NONE = 1
ONE = 2
ALL = 3
class DatabaseAccess(ABC):
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
@staticmethod
def namedtuple_factory(cursor, row):
"""
Returns sqlite rows as dicts.
"""
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
def __init__(self, databasePath):
self._databasePath = databasePath
@abc.abstractmethod
def create_table(self):
pass
def _query(self, query, *args, fetch_type=FetchType.ALL):
connection = sqlite3.connect(self._databasePath)
connection.row_factory = DatabaseAccess.namedtuple_factory
with connection:
cursor = connection.cursor()
try:
cursor.execute(query, args)
if fetch_type == FetchType.ONE:
return cursor.fetchone()
if fetch_type == FetchType.ALL:
return cursor.fetchall()
finally:
cursor.close()
from typing import Dict, List
from TheCodeLabs_BaseUtils import DefaultLogger
from logic import Constants
from logic.DatabaseAccess import DatabaseAccess, FetchType
LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME)
class DeviceAccess(DatabaseAccess):
TABLE_NAME = 'device'
def create_table(self):
self._query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL)''', fetch_type=FetchType.NONE)
def get_all_devices(self) -> List[Dict[str, str]]:
return self._query(f'SELECT * FROM {self.TABLE_NAME} ORDER BY name', fetch_type=FetchType.ALL)
def get_device(self, deviceID: int) -> Dict[str, str] or None:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE id = ?', deviceID, fetch_type=FetchType.ONE)
def get_device_by_name(self, deviceName: str) -> Dict[str, str] or None:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE name = ?', deviceName, fetch_type=FetchType.ONE)
def add_device(self, deviceName: str):
LOGGER.debug(f'Inserting new device "{deviceName}"')
self._query(f'INSERT INTO {self.TABLE_NAME}(name) VALUES(?)', deviceName, fetch_type=FetchType.NONE)
from datetime import datetime
from typing import Dict, List
from TheCodeLabs_BaseUtils import DefaultLogger
from logic import Constants
from logic.DatabaseAccess import DatabaseAccess, FetchType
LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME)
class MeasurementAccess(DatabaseAccess):
TABLE_NAME = 'measurement'
def create_table(self):
self._query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sensor_id INTEGER,
value TEXT NOT NULL,
timestamp TEXT NOT NULL)''', fetch_type=FetchType.NONE)
def __get_current_datetime(self):
return datetime.strftime(datetime.now(), self.DATE_FORMAT)
def get_all_measurements(self) -> List[Dict[str, str]]:
return self._query(f'SELECT * FROM {self.TABLE_NAME} ORDER BY sensor_id ASC, datetime(timestamp) DESC',
fetch_type=FetchType.ALL)
def get_measurement(self, measurementID: int) -> Dict[str, str] or None:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE id = ?',
measurementID,
fetch_type=FetchType.ALL)
def get_all_measurements_for_sensor(self, sensorID: int) -> List[Dict[str, str]]:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE sensor_id = ? '
f'ORDER BY datetime(timestamp) DESC',
sensorID,
fetch_type=FetchType.ALL)
def get_latest_measurements_for_sensor(self, sensorID: int) -> Dict[str, str] or None:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE sensor_id = ? '
f'ORDER BY datetime(timestamp) DESC LIMIT 1',
sensorID,
fetch_type=FetchType.ONE)
def add_measurement(self, sensorID: int, value: str):
LOGGER.debug(f'Inserting new measurement for sensor "{sensorID}" (value: "{value}")')
self._query(f'INSERT INTO {self.TABLE_NAME}(sensor_id, value, timestamp ) VALUES(?, ?, ?)',
sensorID, value, self.__get_current_datetime(),
fetch_type=FetchType.NONE)
from typing import Dict, List
from TheCodeLabs_BaseUtils import DefaultLogger
from logic import Constants
from logic.DatabaseAccess import DatabaseAccess, FetchType
LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME)
class SensorAccess(DatabaseAccess):
TABLE_NAME = 'sensor'
def create_table(self):
self._query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id INTEGER,
name TEXT NOT NULL,
type TEXT NOT NULL)''', fetch_type=FetchType.NONE)
def get_all_sensors(self) -> List[Dict[str, str]]:
return self._query(f'SELECT * FROM {self.TABLE_NAME} ORDER BY device_id, name', fetch_type=FetchType.ALL)
def get_all_sensors_for_device(self, deviceID: int) -> List[Dict[str, str]]:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE device_id = ? ORDER BY name',
deviceID,
fetch_type=FetchType.ALL)
def get_sensor(self, sensorID: int) -> Dict[str, str] or None:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE id = ?',
sensorID,
fetch_type=FetchType.ONE)
def get_sensor_by_name_and_device_id(self, deviceID: int, sensorName: str) -> Dict[str, str] or None:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE device_id = ? AND name = ?',
deviceID, sensorName,
fetch_type=FetchType.ONE)
def add_sensor(self, deviceID: int, name: str, sensorType: str):
LOGGER.debug(f'Inserting new "{sensorType}" sensor "{name}" for device "{deviceID}"')
self._query(f'INSERT INTO {self.TABLE_NAME}(name, device_id, type) '
f'VALUES(?, ?, ?)',
name, deviceID, sensorType,
fetch_type=FetchType.NONE)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment