diff --git a/src/logic/database/Crud.py b/src/logic/database/Crud.py index 4c6c1cecc41e9bd9da2e3a2f8cf332ad5aeed9bf..d60e7fee1870779286969fa0f4affcd5275fa559 100644 --- a/src/logic/database/Crud.py +++ b/src/logic/database/Crud.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List +from typing import List, Set from sqlalchemy import and_ from sqlalchemy.orm import Session @@ -168,7 +168,7 @@ def delete_measurement(db: Session, measurement: Schemas.Measurement): db.commit() -def delete_multiple_measurements(db: Session, measurementIds: List[int]): +def delete_multiple_measurements(db: Session, measurementIds: Set[int]): db.query(Models.Measurement).filter(Models.Measurement.id.in_(measurementIds)).delete() db.commit() diff --git a/src/logic/database/DatabaseCleaner.py b/src/logic/database/DatabaseCleaner.py index e5be0f758144791428294679e2ada5789da04f57..fa8df25fd74b99b7bc459e73d70e104ae3439227 100644 --- a/src/logic/database/DatabaseCleaner.py +++ b/src/logic/database/DatabaseCleaner.py @@ -1,6 +1,6 @@ import logging from datetime import datetime, timedelta -from typing import List, Optional +from typing import List, Optional, Tuple, Set from sqlalchemy.orm import Session @@ -12,66 +12,82 @@ LOGGER = logging.getLogger(Constants.APP_NAME) class DatabaseCleaner: - MIN_DATETIME = datetime(year=1970, month=1, day=1, hour=0, minute=0, second=0, microsecond=0) + MIN_DATE = datetime(year=1970, month=1, day=1).date() # TODO DEBUG: - # MIN_DATETIME = datetime.now() - timedelta(days=31) + # MIN_DATE = (datetime.now() - timedelta(days=31)).date() def __init__(self, retentionPolicies: List[RetentionPolicy]): self._policies = retentionPolicies - def clean(self, db: Session, currentDateTime: datetime): + def clean(self, db: Session, currentDate: datetime.date): LOGGER.info('Performing database cleanup...') for policy in self._policies: LOGGER.debug(f'Enforcing retention policy: {policy}') - policyStart = currentDateTime - timedelta(days=policy.ageInDays) + policyStart = currentDate - timedelta(days=policy.ageInDays) + processedDate = policyStart + while processedDate > self.MIN_DATE: + LOGGER.debug(f'Cleaning {processedDate.strftime("%Y-%m-%d")}...') + measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(db, date=processedDate, + policy=policy) - affectedMeasurements = Crud.get_measurements(db=db, - startDateTime=self.MIN_DATETIME.strftime(Crud.DATE_FORMAT), - endDateTime=policyStart.strftime(Crud.DATE_FORMAT)) - LOGGER.debug(f'Found {len(affectedMeasurements)} measurements older than {policyStart}') - if not affectedMeasurements: - continue + processedDate = processedDate - timedelta(days=1) - affectedMeasurements.reverse() + if not idsToDelete: + continue - self.__delete_old_measurements(affectedMeasurements, db, policy) + LOGGER.debug(f'Scheduled {len(idsToDelete)} measurements for deletion (keeping: {len(measurementIds)}, ' + f'max allowed: {policy.numberOfMeasurementsPerDay})') + + Crud.delete_multiple_measurements(db, idsToDelete) LOGGER.info('Database cleanup done') # TODO: force backup? @staticmethod - def _get_measurements_by_day(db: Session, date: datetime.date) -> List[Schemas.Measurement]: - startTime = datetime(year=date.year, month=date.month, day=date.day, hour=0, minute=0, second=0, microsecond=0) - endTime = datetime(year=date.year, month=date.month, day=date.day, hour=23, minute=59, second=59, microsecond=0) - - return Crud.get_measurements(db=db, - startDateTime=startTime.strftime(Crud.DATE_FORMAT), - endDateTime=endTime.strftime(Crud.DATE_FORMAT)) - - def _get_closest_measurement_for_point(self, measurements: List[Schemas.Measurement], - point: datetime, - upperLimit: datetime, - lowerLimit: datetime) -> Optional[Schemas.Measurement]: - pass - - def __delete_old_measurements(self, affectedMeasurements, db, policy): - lastTimestamp = datetime.strptime(affectedMeasurements[0].timestamp, Crud.DATE_FORMAT) - nextAllowedTimestamp = lastTimestamp - timedelta(minutes=policy.resolutionInMinutes) - - measurementsIdsToDelete = [] - - for measurement in affectedMeasurements[1:]: - timestamp = datetime.strptime(measurement.timestamp, Crud.DATE_FORMAT) - if timestamp > nextAllowedTimestamp: - measurementsIdsToDelete.append(measurement.id) - else: - lastTimestamp = timestamp - nextAllowedTimestamp = lastTimestamp - timedelta(minutes=policy.resolutionInMinutes) - - LOGGER.debug( - f'Scheduled {len(measurementsIdsToDelete)} measurements for deletion (keeping {len(affectedMeasurements) - len(measurementsIdsToDelete)})') - Crud.delete_multiple_measurements(db, measurementsIdsToDelete) + def _categorize_measurements_for_day(db: Session, date: datetime.date, + policy: RetentionPolicy) -> Tuple[List[int], Set[int]]: + points = policy.determine_measurement_points(date) + + measurementIdsToKeep = [] + allMeasurementIds = set() + for index, point in enumerate(points): + previousItem = DatabaseCleaner.__get_previous_item(index, point, points) + nextItem = DatabaseCleaner.__get_next_item(index, point, points) + + possibleMeasurements = Crud.get_measurements(db, previousItem.strftime(Crud.DATE_FORMAT), + nextItem.strftime(Crud.DATE_FORMAT)) + allMeasurementIds.update([m.id for m in possibleMeasurements]) + + closestMeasurement = DatabaseCleaner._get_closest_measurement_for_point(possibleMeasurements, point) + if closestMeasurement is not None: + measurementIdsToKeep.append(closestMeasurement.id) + + return measurementIdsToKeep, {m for m in allMeasurementIds if m not in measurementIdsToKeep} + + @staticmethod + def __get_previous_item(index: int, point: datetime, points: List[datetime]) -> datetime: + if index == 0: + previousItem = point + else: + previousItem = points[index - 1] + return previousItem + + @staticmethod + def __get_next_item(index: int, point: datetime, points: List[datetime]) -> datetime: + if index == (len(points) - 1): + nextItem = datetime(year=point.year, month=point.month, day=point.day, hour=23, minute=59, second=59) + else: + nextItem = points[index + 1] + return nextItem + + @staticmethod + def _get_closest_measurement_for_point(measurements: List[Schemas.Measurement], + point: datetime) -> Optional[Schemas.Measurement]: + if not measurements: + return None + + return min(measurements, key=lambda m: abs(datetime.strptime(m.timestamp, Crud.DATE_FORMAT) - point)) diff --git a/src/test/DatabaseCleanerTest.py b/src/test/DatabaseCleanerTest.py index 3eb1b6c5854976bf422203912b85e39ce2cbb3ea..8e9e70de6d15f6a9ba13cf3ef0644f7b0b388941 100644 --- a/src/test/DatabaseCleanerTest.py +++ b/src/test/DatabaseCleanerTest.py @@ -2,9 +2,15 @@ import unittest from datetime import datetime from unittest.mock import Mock, patch +from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger + +from logic import Constants +from logic.database import Schemas from logic.database.RetentionPolicy import RetentionPolicy CURRENT_DATE_TIME = datetime(year=2021, month=8, day=18, hour=22, minute=0, second=0) +DATE_FORMAT = '%Y-%m-%d %H:%M:%S' +LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME) class TestRetentionPolicy(unittest.TestCase): @@ -12,13 +18,13 @@ class TestRetentionPolicy(unittest.TestCase): policy = RetentionPolicy(1, 10) with self.assertRaises(ValueError): - policy.determine_measurement_points(CURRENT_DATE_TIME) + policy.determine_measurement_points(CURRENT_DATE_TIME.date()) def test_determineMeasurementPoints_zero_raise(self): policy = RetentionPolicy(0, 10) with self.assertRaises(ValueError): - policy.determine_measurement_points(CURRENT_DATE_TIME) + policy.determine_measurement_points(CURRENT_DATE_TIME.date()) def test_determineMeasurementPoints_twoPoints(self): policy = RetentionPolicy(2, 10) @@ -28,7 +34,7 @@ class TestRetentionPolicy(unittest.TestCase): datetime(year=2021, month=8, day=18, hour=12, minute=0, second=0) ] - self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME)) + self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME.date())) def test_determineMeasurementPoints_fourPoints(self): policy = RetentionPolicy(4, 10) @@ -40,81 +46,174 @@ class TestRetentionPolicy(unittest.TestCase): datetime(year=2021, month=8, day=18, hour=18, minute=0, second=0), ] - self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME)) + self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME.date())) def test_determineMeasurementPoints_moreThan24Hours(self): policy = RetentionPolicy(30, 10) - result = policy.determine_measurement_points(CURRENT_DATE_TIME) + result = policy.determine_measurement_points(CURRENT_DATE_TIME.date()) self.assertEqual(30, len(result)) self.assertIn(datetime(year=2021, month=8, day=18, hour=0, minute=0, second=0), result) self.assertIn(datetime(year=2021, month=8, day=18, hour=0, minute=48, second=0), result) class TestDatabaseCleaner(unittest.TestCase): - def test_getMeasurementsByDay(self): + MEASUREMENT1 = Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=6, minute=55, second=0).strftime(DATE_FORMAT)) + MEASUREMENT2 = Schemas.Measurement(id=2, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=13, minute=15, second=0).strftime(DATE_FORMAT)) + + MEASUREMENT3 = Schemas.Measurement(id=3, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=13, minute=45, second=0).strftime(DATE_FORMAT)) + + MEASUREMENT4 = Schemas.Measurement(id=4, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=13, minute=48, second=0).strftime(DATE_FORMAT)) + + @classmethod + def get_measurements_mocked(cls, db, startTime, endTime): + if startTime == '2021-08-18 00:00:00' and endTime == '2021-08-18 06:00:00': + return [cls.MEASUREMENT1] + if startTime == '2021-08-18 00:00:00' and endTime == '2021-08-18 12:00:00': + return [cls.MEASUREMENT1] + elif startTime == '2021-08-18 06:00:00' and endTime == '2021-08-18 18:00:00': + return [cls.MEASUREMENT1, cls.MEASUREMENT2, cls.MEASUREMENT3, cls.MEASUREMENT4] + elif startTime == '2021-08-18 12:00:00' and endTime == '2021-08-18 23:59:59': + return [cls.MEASUREMENT2, cls.MEASUREMENT3, cls.MEASUREMENT4] + elif startTime == '2021-08-18 18:00:00' and endTime == '2021-08-18 23:59:59': + return [] + else: + return [] + + def test__GetClosestMeasurementForPoint_noMeasurementInRange(self): + mockedCrud = Mock() + with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): + from logic.database.DatabaseCleaner import DatabaseCleaner + + result = DatabaseCleaner._get_closest_measurement_for_point([], CURRENT_DATE_TIME) + self.assertIsNone(result) + + def test__GetClosestMeasurementForPoint_getClosest_AllMeasurementsBeforePoint(self): + mockedCrud = Mock() + with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): + mockedCrud.DATE_FORMAT = DATE_FORMAT + + from logic.database.DatabaseCleaner import DatabaseCleaner + + expected = Schemas.Measurement(id=2, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=21, minute=55, second=0).strftime(DATE_FORMAT)) + measurements = [ + Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=21, minute=50, second=0).strftime(DATE_FORMAT)), + expected + ] + + result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME) + self.assertEqual(expected, result) + + def test__GetClosestMeasurementForPoint_getClosest_AllMeasurementsAfterPoint(self): + mockedCrud = Mock() + with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): + mockedCrud.DATE_FORMAT = DATE_FORMAT + + from logic.database.DatabaseCleaner import DatabaseCleaner + + expected = Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=22, minute=15, second=0).strftime(DATE_FORMAT)) + measurements = [ + expected, + Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=22, minute=30, second=0).strftime(DATE_FORMAT)) + ] + + result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME) + self.assertEqual(expected, result) + + def test__GetClosestMeasurementForPoint_getClosest_BothSidesOfPoint(self): + mockedCrud = Mock() + with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): + mockedCrud.DATE_FORMAT = DATE_FORMAT + + from logic.database.DatabaseCleaner import DatabaseCleaner + + expected = Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=21, minute=55, second=0).strftime(DATE_FORMAT)) + measurements = [ + expected, + Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=22, minute=10, second=0).strftime(DATE_FORMAT)) + ] + + result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME) + self.assertEqual(expected, result) + + def test__GetClosestMeasurementForPoint_getClosest_EqualDistance(self): + mockedCrud = Mock() + with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): + mockedCrud.DATE_FORMAT = DATE_FORMAT + + from logic.database.DatabaseCleaner import DatabaseCleaner + + expected = Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=21, minute=55, second=0).strftime(DATE_FORMAT)) + measurements = [ + expected, + Schemas.Measurement(id=1, value=5, sensor_id=15, + timestamp=datetime(year=2021, month=8, day=18, + hour=22, minute=5, second=0).strftime(DATE_FORMAT)) + ] + + result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME) + self.assertEqual(expected, result) + + def test_GetMeasurementsForDay(self): + mockedCrud = Mock() + with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): + mockedCrud.DATE_FORMAT = DATE_FORMAT + mockedCrud.get_measurements.side_effect = self.get_measurements_mocked + + from logic.database.DatabaseCleaner import DatabaseCleaner + + database = Mock() + policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=10) + measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(database, CURRENT_DATE_TIME.date(), policy) + self.assertEqual([self.MEASUREMENT1.id, self.MEASUREMENT1.id, self.MEASUREMENT2.id, self.MEASUREMENT4.id], measurementIds) + self.assertEqual({self.MEASUREMENT3.id}, idsToDelete) + + def test_noRetentionPolicies_doNothing(self): mockedCrud = Mock() with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): mockedCrud.get_measurements.return_value = [] + + database = Mock() + from logic.database.DatabaseCleaner import DatabaseCleaner + DatabaseCleaner([]).clean(database, CURRENT_DATE_TIME) + + mockedCrud.get_measurements.assert_not_called() + + def test_onePolicy_deleteMeasurements(self): + mockedCrud = Mock() + with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S' + mockedCrud.get_measurements.side_effect = self.get_measurements_mocked database = Mock() from logic.database.DatabaseCleaner import DatabaseCleaner - DatabaseCleaner([])._get_measurements_by_day(database, CURRENT_DATE_TIME) - - mockedCrud.get_measurements.assert_called_once_with(db=database, - startDateTime='2021-08-18 00:00:00', - endDateTime='2021-08-18 23:59:59') - - # def test_noRetentionPolicies_doNothing(self): - # mockedCrud = Mock() - # with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): - # mockedCrud.get_measurements.return_value = [] - # - # database = Mock() - # from logic.database.DatabaseCleaner import DatabaseCleaner - # DatabaseCleaner([]).clean(database, CURRENT_DATE_TIME) - # - # mockedCrud.get_measurements.assert_not_called() - # - # def test_onePolicy_fetchMeasurementsOlderThanPolicyStart(self): - # mockedCrud = Mock() - # with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): - # mockedCrud.get_measurements.return_value = [] - # mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S' - # - # database = Mock() - # from logic.database.DatabaseCleaner import DatabaseCleaner - # from logic.database.DatabaseCleaner import RetentionPolicy - # - # policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=1) - # DatabaseCleaner([policy]).clean(database, CURRENT_DATE_TIME) - # - # expectedEndTime = CURRENT_DATE_TIME - timedelta(days=policy.ageInDays) - # mockedCrud.get_measurements.assert_called_once_with(db=database, - # startDateTime=DatabaseCleaner.MIN_DATETIME.strftime(mockedCrud.DATE_FORMAT), - # endDateTime=expectedEndTime.strftime(mockedCrud.DATE_FORMAT)) - # - # def test_onePolicy_deleteMeasurements(self): - # mockedCrud = Mock() - # with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}): - # - # measurementToBeDeleted1 = Schemas.Measurement(id=1, value='5', timestamp='2021-08-17 20:05:00', sensor_id=2) - # measurementToKeep = Schemas.Measurement(id=2, value='5', timestamp='2021-08-17 20:09:12', sensor_id=2) - # measurementToBeDeleted2 = Schemas.Measurement(id=3, value='5', timestamp='2021-08-17 21:07:12', sensor_id=2) - # measurementToBeDeleted3 = Schemas.Measurement(id=4, value='5', timestamp='2021-08-17 21:09:12', sensor_id=2) - # measurementAfterPolicyStart = Schemas.Measurement(id=5, value='5', timestamp='2021-08-17 21:10:12', sensor_id=2) - # - # mockedCrud.get_measurements.return_value = [measurementToBeDeleted1, measurementToKeep, measurementToBeDeleted2, measurementToBeDeleted3, measurementAfterPolicyStart] - # mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S' - # - # database = Mock() - # from logic.database.DatabaseCleaner import DatabaseCleaner - # from logic.database.DatabaseCleaner import RetentionPolicy - # - # policy = RetentionPolicy(resolutionInMinutes=10, ageInDays=1) - # DatabaseCleaner([policy]).clean(database, self.CURRENT_DATE_TIME) - # - # mockedCrud.delete_multiple_measurements.assert_called_once_with(database, [4, 3, 1]) - # - # # TODO: test: if cleanup is performed on the next day: prevent deletion of already low resolution measurements + from logic.database.DatabaseCleaner import RetentionPolicy + + policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=1) + DatabaseCleaner([policy]).clean(database, datetime(year=2021, month=8, day=19).date()) + + mockedCrud.delete_multiple_measurements.assert_called_once_with(database, {3}) + + # TODO: test: multiple policies