Select Git revision
-
Robert Goldmann authored
DatabaseCleaner.py 5.01 KiB
import logging
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Set
from sqlalchemy.orm import Session
from logic import Constants
from logic.database import Crud, Schemas
from logic.database.RetentionPolicy import RetentionPolicy
LOGGER = logging.getLogger(Constants.APP_NAME)
class DatabaseCleaner:
    """Thin out stored measurements according to a list of retention policies.

    For every policy and every sensor, each day from the policy's start date
    backwards is inspected. For each measurement point supplied by the policy
    the closest measurement is kept; every other measurement that was looked
    at for that day is deleted.

    NOTE(review): the ``datetime.date`` annotations used below resolve to the
    ``datetime.date()`` method (only the ``datetime`` class is imported), not
    to the ``date`` class. They are harmless at runtime and are left untouched
    so the signatures stay unchanged.
    """

    # Not referenced inside this class; kept because it is a public attribute.
    MIN_DATE = datetime(year=1970, month=1, day=1).date()

    def __init__(self, retentionPolicies: List[RetentionPolicy], forceBackupAfterCleanup: bool):
        """Store the policies to enforce and the backup flag.

        Args:
            retentionPolicies: Policies that are enforced, in order, by ``clean``.
            forceBackupAfterCleanup: When true, ``clean`` calls
                ``Crud.BACKUP_SERVICE.backup()`` after the cleanup finished.
        """
        self._policies = retentionPolicies
        self._forceBackupAfterCleanup = forceBackupAfterCleanup

    def clean(self, db: Session, currentDate: datetime.date):
        """Enforce every configured retention policy for every sensor.

        Args:
            db: Database session handed to all ``Crud`` calls.
            currentDate: Reference date; ``policy.ageInDays`` is subtracted
                from it to obtain the newest day a policy is applied to.
        """
        LOGGER.info('Performing database cleanup...')

        # The sensor list does not depend on the policy, so it is fetched once
        # instead of once per policy. list() guarantees it can be iterated
        # repeatedly regardless of what kind of iterable Crud returns.
        allSensors = list(Crud.get_sensors(db, skip=0, limit=1000000))

        for policy in self._policies:
            LOGGER.debug(f'Enforcing retention policy: {policy}')
            policyStart = currentDate - timedelta(days=policy.ageInDays)

            for sensor in allSensors:
                LOGGER.debug(f'Cleaning measurements for sensor "{sensor.name}" '
                             f'(id: {sensor.id}, device_id: {sensor.device_id})')
                self._cleanup_measurements_for_sensor(sensor, db, policy, policyStart)

        LOGGER.info('Database cleanup done')

        if self._forceBackupAfterCleanup:
            Crud.BACKUP_SERVICE.backup()

    @staticmethod
    def _cleanup_measurements_for_sensor(sensor: Schemas.Sensor, db: Session,
                                         policy: RetentionPolicy, policyStart: datetime.date):
        """Clean one sensor day by day, walking backwards from ``policyStart``.

        Does nothing when the sensor has no measurement at all.

        Args:
            sensor: Sensor whose measurements are cleaned.
            db: Database session handed to all ``Crud`` calls.
            policy: Retention policy to enforce.
            policyStart: Newest day that is cleaned.
        """
        firstMeasurement = Crud.get_first_measurement_for_sensor(db=db, sensorId=sensor.id)
        if firstMeasurement is None:
            return

        minDate = datetime.strptime(firstMeasurement.timestamp, Crud.DATE_FORMAT).date()

        # NOTE(review): the strict '>' means the day of the first measurement
        # itself is never cleaned - confirm that this is intended.
        processedDate = policyStart
        while processedDate > minDate:
            LOGGER.debug(f'Cleaning {processedDate.strftime("%Y-%m-%d")}...')
            DatabaseCleaner._cleanup_measurements_for_day(db, processedDate, policy, sensor.id)
            processedDate = processedDate - timedelta(days=1)

    @staticmethod
    def _cleanup_measurements_for_day(db: Session, date: datetime.date,
                                      policy: RetentionPolicy, sensor_id: int):
        """Delete the surplus measurements of one sensor for a single day.

        Args:
            db: Database session handed to all ``Crud`` calls.
            date: Day to clean.
            policy: Retention policy to enforce.
            sensor_id: Id of the sensor whose measurements are cleaned.
        """
        measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(db,
                                                                                       date=date,
                                                                                       policy=policy,
                                                                                       sensorId=sensor_id)
        if not idsToDelete:
            return

        LOGGER.debug(f'Scheduled {len(idsToDelete)} measurements for deletion '
                     f'(keeping: {len(measurementIds)}, max allowed: {policy.numberOfMeasurementsPerDay})')
        Crud.delete_multiple_measurements(db, idsToDelete)

    @staticmethod
    def _categorize_measurements_for_day(db: Session, date: datetime.date,
                                         policy: RetentionPolicy, sensorId: int) -> Tuple[List[int], Set[int]]:
        """Split the measurements of one day into ids to keep and ids to delete.

        For every point returned by ``policy.determine_measurement_points(date)``
        the measurements between the neighbouring points are queried and the
        one closest to the point is kept.

        Args:
            db: Database session handed to all ``Crud`` calls.
            date: Day to categorize.
            policy: Policy providing the measurement points for the day.
            sensorId: Id of the sensor whose measurements are inspected.

        Returns:
            A tuple ``(idsToKeep, idsToDelete)``. ``idsToKeep`` holds one entry
            per point that had at least one candidate, so an id may occur more
            than once when the same measurement is closest to several points.
            ``idsToDelete`` contains every queried id that is not kept.
        """
        points = policy.determine_measurement_points(date)

        measurementIdsToKeep = []
        allMeasurementIds = set()
        for index, point in enumerate(points):
            previousItem = DatabaseCleaner.__get_previous_item(index, point, points)
            nextItem = DatabaseCleaner.__get_next_item(index, point, points)

            # presumably returns the measurements whose timestamp lies between
            # the two formatted boundaries - verify against Crud
            possibleMeasurements = Crud.get_measurements_for_sensor(db, previousItem.strftime(Crud.DATE_FORMAT),
                                                                    nextItem.strftime(Crud.DATE_FORMAT), sensorId)
            allMeasurementIds.update(m.id for m in possibleMeasurements)

            closestMeasurement = DatabaseCleaner._get_closest_measurement_for_point(possibleMeasurements, point)
            if closestMeasurement is not None:
                measurementIdsToKeep.append(closestMeasurement.id)

        # Set difference instead of a list membership test per id.
        return measurementIdsToKeep, allMeasurementIds - set(measurementIdsToKeep)

    @staticmethod
    def __get_previous_item(index: int, point: datetime, points: List[datetime]) -> datetime:
        """Return the lower window boundary for ``point``.

        This is the preceding point, or ``point`` itself for the first entry.
        """
        if index == 0:
            return point
        return points[index - 1]

    @staticmethod
    def __get_next_item(index: int, point: datetime, points: List[datetime]) -> datetime:
        """Return the upper window boundary for ``point``.

        This is the following point, or 23:59:59 on the day of ``point`` for
        the last entry.
        """
        if index == (len(points) - 1):
            return datetime(year=point.year, month=point.month, day=point.day, hour=23, minute=59, second=59)
        return points[index + 1]

    @staticmethod
    def _get_closest_measurement_for_point(measurements: List[Schemas.Measurement],
                                           point: datetime) -> Optional[Schemas.Measurement]:
        """Return the measurement whose timestamp is nearest to ``point``.

        Args:
            measurements: Candidates; their ``timestamp`` is parsed with
                ``Crud.DATE_FORMAT``.
            point: Target time.

        Returns:
            The closest measurement (the first one on ties), or ``None`` when
            ``measurements`` is empty.
        """
        if not measurements:
            return None

        def distance_to_point(measurement):
            timestamp = datetime.strptime(measurement.timestamp, Crud.DATE_FORMAT)
            return abs(timestamp - point)

        return min(measurements, key=distance_to_point)