Skip to content
Snippets Groups Projects
MeasurementRouter.py 4.64 KiB
Newer Older
  • Learn to ignore specific revisions
  • from typing import List
    
    
    from fastapi import APIRouter, HTTPException, Depends, Query
    
    from sqlalchemy.orm import Session
    
    
    from logic.Dependencies import get_database, check_api_key, START_DATE_TIME, END_DATE_TIME
    
    from logic.database import Schemas, Crud
    from logic.database.Schemas import Status, MinMax
    
    router = APIRouter(tags=['measurement'])
    
    @router.get('/measurement', response_model=List[Schemas.Measurement],
    
                summary='Gets all measurements',
                description='Number of results can be limited by specifying a date range')
    async def read_measurements(startDateTime: str = START_DATE_TIME,
                                endDateTime: str = END_DATE_TIME,
    
                                db: Session = Depends(get_database)):
        return Crud.get_measurements(db, startDateTime=startDateTime, endDateTime=endDateTime)
    
    @router.get('/measurement/{measurementId}', response_model=Schemas.Measurement,
    
                summary='Gets a specific measurement',
                responses={404: {'description': 'Measurement not found'}})
    async def read_measurement(measurementId: int, db: Session = Depends(get_database)):
        measurement = Crud.get_measurement(db, measurementId=measurementId)
        if measurement is None:
            raise HTTPException(status_code=404, detail='Measurement not found')
        return measurement
    
    
    
    @router.post('/measurement/', response_model=Schemas.Measurement,
    
                 summary='Adds a new measurement',
    
                 responses={404: {'description': 'No sensor with id "{measurement.sensor_id}" existing'}},
    
                 dependencies=[Depends(check_api_key)])
    async def create_measurement(measurement: Schemas.MeasurementCreate, db: Session = Depends(get_database)):
    
        existingSensor = Crud.get_sensor(db, measurement.sensor_id)
    
        if not existingSensor:
    
            raise HTTPException(status_code=404, detail=f'No sensor with id "{measurement.sensor_id}" existing')
    
    
        return Crud.create_measurement(db=db, measurement=measurement)
    
    
    
    @router.delete('/measurement/{measurementId}', response_model=Status,
    
                   summary='Deletes a specific measurementId',
                   responses={404: {'description': 'Measurement not found'}},
                   dependencies=[Depends(check_api_key)])
    async def delete_measurement(measurementId: int, db: Session = Depends(get_database)):
        measurement = Crud.get_measurement(db, measurementId=measurementId)
        if measurement is None:
            raise HTTPException(status_code=404, detail='Measurement not found')
    
        Crud.delete_measurement(db, measurement)
        return Status(message=f'Deleted measurement {measurement.id}')
    
    
    
    @router.get('/measurements/minMax', response_model=Schemas.MinMax,
                summary='Gets the minimum and maximum values for the given sensor ids',
                description='Number of checked values can be limited by specifying a date range')
    async def get_min_and_max_for_sensor_ids(sensorIds: List[int] = Query(None),
                                             startDateTime: str = START_DATE_TIME,
                                             endDateTime: str = END_DATE_TIME,
                                             db: Session = Depends(get_database)):
        values = []
        for sensorId in sensorIds:
            measurementsForSensor = Crud.get_measurements_for_sensor(db, startDateTime, endDateTime, sensorId)
            for measurement in measurementsForSensor:
                values.append(float(measurement.value))
    
        if values:
            return MinMax(min=min(values), max=max(values))
    
    
    
    
    @router.post('/measurements/', response_model=Schemas.Status,
                 summary='Adds multiple measurements',
                 description='Non-existent device and sensors will be created automatically',
                 dependencies=[Depends(check_api_key)])
    async def create_multiple_measurements(measurementsToAdd: Schemas.MultipleMeasurements,
                                           db: Session = Depends(get_database)):
        existingDevice = Crud.get_device_by_name(db, measurementsToAdd.deviceName)
        if not existingDevice:
            existingDevice = Crud.create_device(db, Schemas.DeviceCreate(name=measurementsToAdd.deviceName))
    
        for sensor in measurementsToAdd.sensors:
            existingSensor = Crud.get_sensor_by_name_and_device_id(db, sensor.name, existingDevice.id)
            if not existingSensor:
                existingSensor = Crud.create_sensor(db, Schemas.SensorCreate(name=sensor.name,
                                                                             type=sensor.type,
    
                                                                             device_id=existingDevice.id))
            Crud.create_measurement(db, Schemas.MeasurementCreate(value=sensor.value, sensor_id=existingSensor.id))