Skip to content
Snippets Groups Projects
Select Git revision
  • b7b7f397b60939fe3bb50b51174c12488a165a55
  • master default
  • v2.21.0
  • v2.20.1
  • v2.20.0
  • v2.19.0
  • v2.18.1
  • v2.18.0
  • v2.17.0
  • v2.16.0
  • v2.15.0
  • v2.14.0
  • v2.13.1
  • v2.13.0
  • v2.12.0
  • v2.11.0
  • v2.10.0
  • v2.9.0
  • v2.8.0
  • v2.7.0
  • v2.6.0
  • v2.5.0
22 results

Models.py

Blame
  • DatabaseCleaner.py 5.01 KiB
    import logging
    from datetime import datetime, timedelta
    from typing import List, Optional, Tuple, Set
    
    from sqlalchemy.orm import Session
    
    from logic import Constants
    from logic.database import Crud, Schemas
    from logic.database.RetentionPolicy import RetentionPolicy
    
    LOGGER = logging.getLogger(Constants.APP_NAME)
    
    
    class DatabaseCleaner:
        MIN_DATE = datetime(year=1970, month=1, day=1).date()
    
        def __init__(self, retentionPolicies: List[RetentionPolicy], forceBackupAfterCleanup: bool):
            self._policies = retentionPolicies
            self._forceBackupAfterCleanup = forceBackupAfterCleanup
    
        def clean(self, db: Session, currentDate: datetime.date):
            LOGGER.info('Performing database cleanup...')
    
            for policy in self._policies:
                LOGGER.debug(f'Enforcing retention policy: {policy}')
    
                policyStart = currentDate - timedelta(days=policy.ageInDays)
    
                allSensors = Crud.get_sensors(db, skip=0, limit=1000000)
                for sensor in allSensors:
                    LOGGER.debug(f'Cleaning measurements for sensor "{sensor.name}" '
                                 f'(id: {sensor.id}, device_id: {sensor.device_id})')
                    self._cleanup_measurements_for_sensor(sensor, db, policy, policyStart)
    
            LOGGER.info('Database cleanup done')
    
            if self._forceBackupAfterCleanup:
                Crud.BACKUP_SERVICE.backup()
    
        @staticmethod
        def _cleanup_measurements_for_sensor(sensor: Schemas.Sensor, db: Session,
                                             policy: RetentionPolicy, policyStart: datetime.date):
            firstMeasurement = Crud.get_first_measurement_for_sensor(db=db, sensorId=sensor.id)
            if firstMeasurement is None:
                return
    
            minDate = datetime.strptime(firstMeasurement.timestamp, Crud.DATE_FORMAT).date()
    
            processedDate = policyStart
            while processedDate > minDate:
                LOGGER.debug(f'Cleaning {processedDate.strftime("%Y-%m-%d")}...')
                DatabaseCleaner._cleanup_measurements_for_day(db, processedDate, policy, sensor.id)
                processedDate = processedDate - timedelta(days=1)
    
        @staticmethod
        def _cleanup_measurements_for_day(db: Session, date: datetime.date,
                                          policy: RetentionPolicy, sensor_id: int):
            measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(db,
                                                                                           date=date,
                                                                                           policy=policy,
                                                                                           sensorId=sensor_id)
    
            if not idsToDelete:
                return
    
            LOGGER.debug(f'Scheduled {len(idsToDelete)} measurements for deletion '
                         f'(keeping: {len(measurementIds)}, max allowed: {policy.numberOfMeasurementsPerDay})')
            Crud.delete_multiple_measurements(db, idsToDelete)
    
        @staticmethod
        def _categorize_measurements_for_day(db: Session, date: datetime.date,
                                             policy: RetentionPolicy, sensorId: int) -> Tuple[List[int], Set[int]]:
            points = policy.determine_measurement_points(date)
    
            measurementIdsToKeep = []
            allMeasurementIds = set()
            for index, point in enumerate(points):
                previousItem = DatabaseCleaner.__get_previous_item(index, point, points)
                nextItem = DatabaseCleaner.__get_next_item(index, point, points)
    
                possibleMeasurements = Crud.get_measurements_for_sensor(db, previousItem.strftime(Crud.DATE_FORMAT),
                                                                        nextItem.strftime(Crud.DATE_FORMAT), sensorId)
    
                allMeasurementIds.update([m.id for m in possibleMeasurements])
    
                closestMeasurement = DatabaseCleaner._get_closest_measurement_for_point(possibleMeasurements, point)
                if closestMeasurement is not None:
                    measurementIdsToKeep.append(closestMeasurement.id)
    
            return measurementIdsToKeep, {m for m in allMeasurementIds if m not in measurementIdsToKeep}
    
        @staticmethod
        def __get_previous_item(index: int, point: datetime, points: List[datetime]) -> datetime:
            if index == 0:
                previousItem = point
            else:
                previousItem = points[index - 1]
            return previousItem
    
        @staticmethod
        def __get_next_item(index: int, point: datetime, points: List[datetime]) -> datetime:
            if index == (len(points) - 1):
                nextItem = datetime(year=point.year, month=point.month, day=point.day, hour=23, minute=59, second=59)
            else:
                nextItem = points[index + 1]
            return nextItem
    
        @staticmethod
        def _get_closest_measurement_for_point(measurements: List[Schemas.Measurement],
                                               point: datetime) -> Optional[Schemas.Measurement]:
            if not measurements:
                return None
    
            return min(measurements, key=lambda m: abs(datetime.strptime(m.timestamp, Crud.DATE_FORMAT) - point))