"""Database cleanup that thins out stored sensor measurements according to retention policies."""
import logging
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Set

from sqlalchemy.orm import Session

from logic import Constants
from logic.database import Crud, Schemas
from logic.database.RetentionPolicy import RetentionPolicy
# Module-level logger, obtained under the application's name (Constants.APP_NAME).
LOGGER = logging.getLogger(Constants.APP_NAME)
class DatabaseCleaner:
    """Thin out stored measurements according to a list of retention policies.

    For every policy and every sensor, the days from the policy start date
    back to (but not including) the date of the sensor's first measurement
    are processed one by one. On each day only the measurement closest to
    each point in time requested by the policy is kept; every other
    measurement found for that day is scheduled for deletion.
    """

    MIN_DATE = datetime(year=1970, month=1, day=1).date()

    def __init__(self, retentionPolicies: List[RetentionPolicy], forceBackupAfterCleanup: bool):
        """Store the cleanup configuration.

        :param retentionPolicies: policies enforced, in order, by :meth:`clean`.
        :param forceBackupAfterCleanup: if true, ``Crud.BACKUP_SERVICE.backup()``
            is called once :meth:`clean` has processed all policies.
        """
        # clean() iterates self._policies, so the policies must be kept here.
        self._policies = retentionPolicies
        self._forceBackupAfterCleanup = forceBackupAfterCleanup

    def clean(self, db: Session, currentDate: datetime.date):
        """Enforce every configured retention policy on every sensor.

        :param db: database session handed through to all ``Crud`` calls.
        :param currentDate: reference date; a policy starts to apply
            ``policy.ageInDays`` days before it.
        """
        LOGGER.info('Performing database cleanup...')

        for policy in self._policies:
            LOGGER.debug(f'Enforcing retention policy: {policy}')
            policyStart = currentDate - timedelta(days=policy.ageInDays)

            allSensors = Crud.get_sensors(db, skip=0, limit=1000000)
            for sensor in allSensors:
                LOGGER.debug(f'Cleaning measurements for sensor "{sensor.name}" '
                             f'(id: {sensor.id}, device_id: {sensor.device_id})')
                self._cleanup_measurements_for_sensor(sensor, db, policy, policyStart)

        # Back up once, after all policies have been enforced.
        if self._forceBackupAfterCleanup:
            Crud.BACKUP_SERVICE.backup()

    @staticmethod
    def _cleanup_measurements_for_sensor(sensor: Schemas.Sensor, db: Session,
                                         policy: RetentionPolicy, policyStart: datetime.date):
        """Clean all days of one sensor that are covered by ``policy``.

        Walks backwards day by day from ``policyStart`` until the date of the
        sensor's first measurement is reached. Does nothing if the sensor has
        no measurements.
        """
        firstMeasurement = Crud.get_first_measurement_for_sensor(db=db, sensorId=sensor.id)
        if firstMeasurement is None:
            return

        minDate = datetime.strptime(firstMeasurement.timestamp, Crud.DATE_FORMAT).date()

        processedDate = policyStart
        # NOTE(review): the comparison is strict, so the day of the very first
        # measurement itself is never processed - confirm this is intended.
        while processedDate > minDate:
            LOGGER.debug(f'Cleaning {processedDate.strftime("%Y-%m-%d")}...')
            DatabaseCleaner._cleanup_measurements_for_day(db, processedDate, policy, sensor.id)
            processedDate = processedDate - timedelta(days=1)

    @staticmethod
    def _cleanup_measurements_for_day(db: Session, date: datetime.date,
                                      policy: RetentionPolicy, sensor_id: int):
        """Remove the superfluous measurements of one sensor on one day.

        The ids that are not kept are passed to
        ``Crud.delete_multiple_measurements``; nothing is called if there is
        nothing to remove.
        """
        measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(db,
                                                                                       date=date,
                                                                                       policy=policy,
                                                                                       sensorId=sensor_id)
        if not idsToDelete:
            return

        LOGGER.debug(f'Scheduled {len(idsToDelete)} measurements for deletion '
                     f'(keeping: {len(measurementIds)}, max allowed: {policy.numberOfMeasurementsPerDay})')
        Crud.delete_multiple_measurements(db, idsToDelete)

    @staticmethod
    def _categorize_measurements_for_day(db: Session, date: datetime.date,
                                         policy: RetentionPolicy, sensorId: int) -> Tuple[List[int], Set[int]]:
        """Split the measurements of one day into ids to keep and ids to delete.

        For each point returned by ``policy.determine_measurement_points(date)``
        the measurements between the neighbouring points are fetched and the
        one closest to the point is kept.

        :return: tuple of (ids to keep, in point order; set of all other ids
            that were found for the day).
        """
        points = policy.determine_measurement_points(date)

        measurementIdsToKeep = []
        allMeasurementIds = set()
        for index, point in enumerate(points):
            previousItem = DatabaseCleaner.__get_previous_item(index, point, points)
            nextItem = DatabaseCleaner.__get_next_item(index, point, points)

            possibleMeasurements = Crud.get_measurements_for_sensor(db, previousItem.strftime(Crud.DATE_FORMAT),
                                                                    nextItem.strftime(Crud.DATE_FORMAT), sensorId)
            allMeasurementIds.update(m.id for m in possibleMeasurements)

            closestMeasurement = DatabaseCleaner._get_closest_measurement_for_point(possibleMeasurements, point)
            if closestMeasurement is not None:
                measurementIdsToKeep.append(closestMeasurement.id)

        # Set difference instead of a per-id membership test against the list.
        return measurementIdsToKeep, allMeasurementIds - set(measurementIdsToKeep)

    @staticmethod
    def __get_previous_item(index: int, point: datetime, points: List[datetime]) -> datetime:
        """Return the lower bound of the search window for ``point``.

        This is the preceding point, or ``point`` itself for the first point.
        """
        if index == 0:
            return point
        return points[index - 1]

    @staticmethod
    def __get_next_item(index: int, point: datetime, points: List[datetime]) -> datetime:
        """Return the upper bound of the search window for ``point``.

        This is the following point, or 23:59:59 on the day of ``point`` for
        the last point.
        """
        if index == (len(points) - 1):
            return datetime(year=point.year, month=point.month, day=point.day, hour=23, minute=59, second=59)
        return points[index + 1]

    @staticmethod
    def _get_closest_measurement_for_point(measurements: List[Schemas.Measurement],
                                           point: datetime) -> Optional[Schemas.Measurement]:
        """Return the measurement whose timestamp is nearest to ``point``.

        Timestamps are parsed with ``Crud.DATE_FORMAT``. Returns ``None`` if
        ``measurements`` is empty.
        """
        if not measurements:
            return None

        return min(measurements, key=lambda m: abs(datetime.strptime(m.timestamp, Crud.DATE_FORMAT) - point))