Skip to content
Snippets Groups Projects
Commit e44f1e1e authored by Robert Goldmann's avatar Robert Goldmann
Browse files

added route to retrieve latest measurement of a specific sensor

parent e7e15dd4
No related branches found
No related tags found
No related merge requests found
import logging
from datetime import datetime
from typing import Dict, List
from typing import Dict
from logic import Constants
from logic.database.DatabaseAccess import DatabaseAccess, FetchType
......@@ -9,50 +8,8 @@ LOGGER = logging.getLogger(Constants.APP_NAME)
class MeasurementAccess(DatabaseAccess):
TABLE_NAME = 'measurement'
def create_table(self):
self._query(f'''CREATE TABLE IF NOT EXISTS {self.TABLE_NAME} (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sensor_id INTEGER,
value TEXT NOT NULL,
timestamp TEXT NOT NULL)''', fetch_type=FetchType.CREATE)
def __get_current_datetime(self):
return datetime.strftime(datetime.now(), self.DATE_FORMAT)
def get_all_measurements(self, startDateTime: str, endDateTime: str) -> List[Dict[str, str]]:
if startDateTime and endDateTime:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE '
f'DATETIME(timestamp) BETWEEN DATETIME(?) AND DATETIME(?) '
f'ORDER BY sensor_id ASC, datetime(timestamp) DESC',
startDateTime,
endDateTime,
fetch_type=FetchType.ALL)
return self._query(f'SELECT * FROM {self.TABLE_NAME} ORDER BY sensor_id ASC, '
f'datetime(timestamp) DESC',
fetch_type=FetchType.ALL)
def get_all_measurements_for_sensor(self, sensorID: int,
startDateTime: str,
endDateTime: str) -> List[Dict[str, str]]:
if startDateTime and endDateTime:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE sensor_id = ? '
f'AND DATETIME(timestamp) BETWEEN DATETIME(?) AND DATETIME(?) '
f'ORDER BY datetime(timestamp) DESC',
sensorID,
startDateTime,
endDateTime,
fetch_type=FetchType.ALL)
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE sensor_id = ? ORDER BY datetime(timestamp) DESC',
sensorID,
fetch_type=FetchType.ALL)
def get_latest_measurements_for_sensor(self, sensorID: int) -> Dict[str, str] or None:
return self._query(f'SELECT * FROM {self.TABLE_NAME} WHERE sensor_id = ? '
f'ORDER BY datetime(timestamp) DESC LIMIT 1',
sensorID,
fetch_type=FetchType.ONE)
......@@ -83,6 +83,13 @@ def get_measurements_for_sensor(db: Session, startDateTime: str, endDateTime: st
return db.query(Models.Measurement).filter(Models.Measurement.sensorId == sensorId).all()
def get_latest_measurement_for_sensor(db: Session, sensorId: int):
return db.query(Models.Measurement) \
.filter(Models.Measurement.sensorId == sensorId) \
.order_by(Models.Measurement.timestamp.desc()) \
.first()
def get_measurement(db: Session, measurementId: int):
return db.query(Models.Measurement).filter(Models.Measurement.id == measurementId).first()
......
......@@ -75,3 +75,13 @@ async def get_sensor_measurements(sensorId: int,
if sensor is None:
raise HTTPException(status_code=404, detail='Sensor not found')
return Crud.get_measurements_for_sensor(db, startDateTime, endDateTime, sensorId)
@router.get('/{sensorId}/measurements/latest', response_model=Schemas.Measurement,
summary='Gets the latest measurement for a specific sensor',
responses={404: {'description': 'Sensor not found'}})
async def get_latest_measurements_for_sensor(sensorId: int, db: Session = Depends(get_database)):
sensor = Crud.get_sensor(db, sensorId=sensorId)
if sensor is None:
raise HTTPException(status_code=404, detail='Sensor not found')
return Crud.get_latest_measurement_for_sensor(db, sensorId)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment