Skip to content
Snippets Groups Projects
Commit 6af7341e authored by Robert Goldmann's avatar Robert Goldmann
Browse files

coding style: set type hints

parent 071c37eb
No related branches found
No related tags found
No related merge requests found
from datetime import datetime from datetime import datetime
from typing import List
from sqlalchemy import and_ from sqlalchemy import and_
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
...@@ -27,7 +28,7 @@ def notify_backup_service(backupService: BackupService): ...@@ -27,7 +28,7 @@ def notify_backup_service(backupService: BackupService):
# ===== devices ===== # ===== devices =====
def get_devices(db: Session, skip: int = 0, limit: int = 100): def get_devices(db: Session, skip: int = 0, limit: int = 100) -> List[Models.Device]:
return db.query(Models.Device).offset(skip).limit(limit).all() return db.query(Models.Device).offset(skip).limit(limit).all()
...@@ -35,12 +36,12 @@ def get_device(db: Session, deviceId: int) -> Models.Device: ...@@ -35,12 +36,12 @@ def get_device(db: Session, deviceId: int) -> Models.Device:
return db.query(Models.Device).filter(Models.Device.id == deviceId).first() return db.query(Models.Device).filter(Models.Device.id == deviceId).first()
def get_device_by_name(db: Session, name: str): def get_device_by_name(db: Session, name: str) -> Models.Device:
return db.query(Models.Device).filter(Models.Device.name == name).first() return db.query(Models.Device).filter(Models.Device.name == name).first()
@notify_backup_service(BACKUP_SERVICE) @notify_backup_service(BACKUP_SERVICE)
def create_device(db: Session, device: Schemas.DeviceCreate): def create_device(db: Session, device: Schemas.DeviceCreate) -> Models.Device:
dbDevice = Models.Device(name=device.name) dbDevice = Models.Device(name=device.name)
db.add(dbDevice) db.add(dbDevice)
db.commit() db.commit()
...@@ -49,7 +50,7 @@ def create_device(db: Session, device: Schemas.DeviceCreate): ...@@ -49,7 +50,7 @@ def create_device(db: Session, device: Schemas.DeviceCreate):
@notify_backup_service(BACKUP_SERVICE) @notify_backup_service(BACKUP_SERVICE)
def update_device(db: Session, deviceId: int, device: Schemas.DeviceCreate): def update_device(db: Session, deviceId: int, device: Schemas.DeviceCreate) -> Models.Device:
existingDevice = get_device(db, deviceId) existingDevice = get_device(db, deviceId)
existingDevice.name = device.name existingDevice.name = device.name
db.commit() db.commit()
...@@ -65,21 +66,21 @@ def delete_device(db: Session, device: Schemas.Device): ...@@ -65,21 +66,21 @@ def delete_device(db: Session, device: Schemas.Device):
# ===== sensors ===== # ===== sensors =====
def get_sensors(db: Session, skip: int = 0, limit: int = 100): def get_sensors(db: Session, skip: int = 0, limit: int = 100) -> List[Models.Sensor]:
return db.query(Models.Sensor).offset(skip).limit(limit).all() return db.query(Models.Sensor).offset(skip).limit(limit).all()
def get_sensor(db: Session, sensorId: int): def get_sensor(db: Session, sensorId: int) -> Models.Sensor:
return db.query(Models.Sensor).filter(Models.Sensor.id == sensorId).first() return db.query(Models.Sensor).filter(Models.Sensor.id == sensorId).first()
def get_sensor_by_name_and_device_id(db: Session, sensorName: str, deviceId: int): def get_sensor_by_name_and_device_id(db: Session, sensorName: str, deviceId: int) -> Models.Sensor:
return db.query(Models.Sensor).filter( return db.query(Models.Sensor).filter(and_(Models.Sensor.name == sensorName,
Models.Sensor.name == sensorName and Models.Sensor.deviceId == deviceId).first() Models.Sensor.deviceId == deviceId)).first()
@notify_backup_service(BACKUP_SERVICE) @notify_backup_service(BACKUP_SERVICE)
def create_sensor(db: Session, sensor: Schemas.SensorCreate): def create_sensor(db: Session, sensor: Schemas.SensorCreate) -> Models.Sensor:
dbSensor = Models.Sensor(**sensor.dict()) dbSensor = Models.Sensor(**sensor.dict())
db.add(dbSensor) db.add(dbSensor)
db.commit() db.commit()
...@@ -88,7 +89,7 @@ def create_sensor(db: Session, sensor: Schemas.SensorCreate): ...@@ -88,7 +89,7 @@ def create_sensor(db: Session, sensor: Schemas.SensorCreate):
@notify_backup_service(BACKUP_SERVICE) @notify_backup_service(BACKUP_SERVICE)
def update_sensor(db: Session, sensorId: int, sensor: Schemas.SensorUpdate): def update_sensor(db: Session, sensorId: int, sensor: Schemas.SensorUpdate) -> Models.Sensor:
existingSensor = get_sensor(db, sensorId) existingSensor = get_sensor(db, sensorId)
existingSensor.name = sensor.name existingSensor.name = sensor.name
existingSensor.type = sensor.type existingSensor.type = sensor.type
...@@ -105,7 +106,7 @@ def delete_sensor(db: Session, sensor: Schemas.Sensor): ...@@ -105,7 +106,7 @@ def delete_sensor(db: Session, sensor: Schemas.Sensor):
# ===== measurements ===== # ===== measurements =====
def get_measurements(db: Session, startDateTime: str, endDateTime: str): def get_measurements(db: Session, startDateTime: str, endDateTime: str) -> List[Models.Measurement]:
if startDateTime and endDateTime: if startDateTime and endDateTime:
return db.query(Models.Measurement).filter(and_(startDateTime <= Models.Measurement.timestamp, return db.query(Models.Measurement).filter(and_(startDateTime <= Models.Measurement.timestamp,
endDateTime >= Models.Measurement.timestamp)).all() endDateTime >= Models.Measurement.timestamp)).all()
...@@ -113,7 +114,8 @@ def get_measurements(db: Session, startDateTime: str, endDateTime: str): ...@@ -113,7 +114,8 @@ def get_measurements(db: Session, startDateTime: str, endDateTime: str):
return db.query(Models.Measurement).all() return db.query(Models.Measurement).all()
def get_measurements_for_sensor(db: Session, startDateTime: str, endDateTime: str, sensorId: int): def get_measurements_for_sensor(db: Session, startDateTime: str,
endDateTime: str, sensorId: int) -> List[Models.Measurement]:
if startDateTime and endDateTime: if startDateTime and endDateTime:
return db.query(Models.Measurement).filter(and_(startDateTime <= Models.Measurement.timestamp, return db.query(Models.Measurement).filter(and_(startDateTime <= Models.Measurement.timestamp,
endDateTime >= Models.Measurement.timestamp, endDateTime >= Models.Measurement.timestamp,
...@@ -122,19 +124,19 @@ def get_measurements_for_sensor(db: Session, startDateTime: str, endDateTime: st ...@@ -122,19 +124,19 @@ def get_measurements_for_sensor(db: Session, startDateTime: str, endDateTime: st
return db.query(Models.Measurement).filter(Models.Measurement.sensorId == sensorId).all() return db.query(Models.Measurement).filter(Models.Measurement.sensorId == sensorId).all()
def get_latest_measurement_for_sensor(db: Session, sensorId: int): def get_latest_measurement_for_sensor(db: Session, sensorId: int) -> Models.Measurement:
return db.query(Models.Measurement) \ return db.query(Models.Measurement) \
.filter(Models.Measurement.sensorId == sensorId) \ .filter(Models.Measurement.sensorId == sensorId) \
.order_by(Models.Measurement.timestamp.desc()) \ .order_by(Models.Measurement.timestamp.desc()) \
.first() .first()
def get_measurement(db: Session, measurementId: int): def get_measurement(db: Session, measurementId: int) -> Models.Measurement:
return db.query(Models.Measurement).filter(Models.Measurement.id == measurementId).first() return db.query(Models.Measurement).filter(Models.Measurement.id == measurementId).first()
@notify_backup_service(BACKUP_SERVICE) @notify_backup_service(BACKUP_SERVICE)
def create_measurement(db: Session, measurement: Schemas.MeasurementCreate): def create_measurement(db: Session, measurement: Schemas.MeasurementCreate) -> Models.Measurement:
dbMeasurement = Models.Measurement(**measurement.dict(), timestamp=__get_current_datetime()) dbMeasurement = Models.Measurement(**measurement.dict(), timestamp=__get_current_datetime())
db.add(dbMeasurement) db.add(dbMeasurement)
db.commit() db.commit()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment