From 5b13aa51d1293f015c7c5140f89e5ac731fec2a2 Mon Sep 17 00:00:00 2001
From: Robert Goldmann <deadlocker@gmx.de>
Date: Sun, 22 Aug 2021 18:49:13 +0200
Subject: [PATCH] #9 - determine closest measurement for each policy range

---
 src/logic/database/Crud.py            |   4 +-
 src/logic/database/DatabaseCleaner.py | 104 +++++++-----
 src/test/DatabaseCleanerTest.py       | 227 ++++++++++++++++++--------
 3 files changed, 225 insertions(+), 110 deletions(-)

diff --git a/src/logic/database/Crud.py b/src/logic/database/Crud.py
index 4c6c1ce..d60e7fe 100644
--- a/src/logic/database/Crud.py
+++ b/src/logic/database/Crud.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import List
+from typing import List, Set
 
 from sqlalchemy import and_
 from sqlalchemy.orm import Session
@@ -168,7 +168,7 @@ def delete_measurement(db: Session, measurement: Schemas.Measurement):
     db.commit()
 
 
-def delete_multiple_measurements(db: Session, measurementIds: List[int]):
+def delete_multiple_measurements(db: Session, measurementIds: Set[int]):
     db.query(Models.Measurement).filter(Models.Measurement.id.in_(measurementIds)).delete()
     db.commit()
 
diff --git a/src/logic/database/DatabaseCleaner.py b/src/logic/database/DatabaseCleaner.py
index e5be0f7..fa8df25 100644
--- a/src/logic/database/DatabaseCleaner.py
+++ b/src/logic/database/DatabaseCleaner.py
@@ -1,6 +1,6 @@
 import logging
 from datetime import datetime, timedelta
-from typing import List, Optional
+from typing import List, Optional, Tuple, Set
 
 from sqlalchemy.orm import Session
 
@@ -12,66 +12,82 @@ LOGGER = logging.getLogger(Constants.APP_NAME)
 
 
 class DatabaseCleaner:
-    MIN_DATETIME = datetime(year=1970, month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
+    MIN_DATE = datetime(year=1970, month=1, day=1).date()
 
     # TODO DEBUG:
-    # MIN_DATETIME = datetime.now() - timedelta(days=31)
+    # MIN_DATE = (datetime.now() - timedelta(days=31)).date()
 
     def __init__(self, retentionPolicies: List[RetentionPolicy]):
         self._policies = retentionPolicies
 
-    def clean(self, db: Session, currentDateTime: datetime):
+    def clean(self, db: Session, currentDate: datetime.date):
+        """Enforce each policy per day, from ``ageInDays`` before ``currentDate`` back to (excluding) MIN_DATE."""
         LOGGER.info('Performing database cleanup...')
 
         for policy in self._policies:
             LOGGER.debug(f'Enforcing retention policy: {policy}')
 
-            policyStart = currentDateTime - timedelta(days=policy.ageInDays)
+            processedDate = currentDate - timedelta(days=policy.ageInDays)
+            while processedDate > self.MIN_DATE:
+                LOGGER.debug(f'Cleaning {processedDate.strftime("%Y-%m-%d")}...')
+                measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(db, date=processedDate,
+                                                                                               policy=policy)
 
-            affectedMeasurements = Crud.get_measurements(db=db,
-                                                         startDateTime=self.MIN_DATETIME.strftime(Crud.DATE_FORMAT),
-                                                         endDateTime=policyStart.strftime(Crud.DATE_FORMAT))
-            LOGGER.debug(f'Found {len(affectedMeasurements)} measurements older than {policyStart}')
-            if not affectedMeasurements:
-                continue
+                processedDate = processedDate - timedelta(days=1)
 
-            affectedMeasurements.reverse()
+                if not idsToDelete:
+                    continue
 
-            self.__delete_old_measurements(affectedMeasurements, db, policy)
+                # the keep list holds at most one entry per policy point, so an id can repeat: count distinct ids
+                LOGGER.debug(f'Scheduled {len(idsToDelete)} measurements for deletion '
+                             f'(keeping: {len(set(measurementIds))}, max allowed: {policy.numberOfMeasurementsPerDay})')
+                Crud.delete_multiple_measurements(db, idsToDelete)
 
         LOGGER.info('Database cleanup done')
 
         # TODO: force backup?
 
     @staticmethod
-    def _get_measurements_by_day(db: Session, date: datetime.date) -> List[Schemas.Measurement]:
-        startTime = datetime(year=date.year, month=date.month, day=date.day, hour=0, minute=0, second=0, microsecond=0)
-        endTime = datetime(year=date.year, month=date.month, day=date.day, hour=23, minute=59, second=59, microsecond=0)
-
-        return Crud.get_measurements(db=db,
-                                     startDateTime=startTime.strftime(Crud.DATE_FORMAT),
-                                     endDateTime=endTime.strftime(Crud.DATE_FORMAT))
-
-    def _get_closest_measurement_for_point(self, measurements: List[Schemas.Measurement],
-                                           point: datetime,
-                                           upperLimit: datetime,
-                                           lowerLimit: datetime) -> Optional[Schemas.Measurement]:
-        pass
-
-    def __delete_old_measurements(self, affectedMeasurements, db, policy):
-        lastTimestamp = datetime.strptime(affectedMeasurements[0].timestamp, Crud.DATE_FORMAT)
-        nextAllowedTimestamp = lastTimestamp - timedelta(minutes=policy.resolutionInMinutes)
-
-        measurementsIdsToDelete = []
-
-        for measurement in affectedMeasurements[1:]:
-            timestamp = datetime.strptime(measurement.timestamp, Crud.DATE_FORMAT)
-            if timestamp > nextAllowedTimestamp:
-                measurementsIdsToDelete.append(measurement.id)
-            else:
-                lastTimestamp = timestamp
-                nextAllowedTimestamp = lastTimestamp - timedelta(minutes=policy.resolutionInMinutes)
-
-        LOGGER.debug(
-            f'Scheduled {len(measurementsIdsToDelete)} measurements for deletion (keeping {len(affectedMeasurements) - len(measurementsIdsToDelete)})')
-        Crud.delete_multiple_measurements(db, measurementsIdsToDelete)
+    def _categorize_measurements_for_day(db: Session, date: datetime.date,
+                                         policy: RetentionPolicy) -> Tuple[List[int], Set[int]]:
+        """Return (ids to keep, ids to delete) for ``date``: per policy point the closest measurement is kept."""
+        points = policy.determine_measurement_points(date)
+        measurementIdsToKeep = []
+        allMeasurementIds = set()
+        for index, point in enumerate(points):
+            # the query window spans from the previous to the next point, so neighbouring windows overlap
+            previousItem = DatabaseCleaner.__get_previous_item(index, point, points)
+            nextItem = DatabaseCleaner.__get_next_item(index, point, points)
+            possibleMeasurements = Crud.get_measurements(db, previousItem.strftime(Crud.DATE_FORMAT),
+                                                         nextItem.strftime(Crud.DATE_FORMAT))
+            allMeasurementIds.update(m.id for m in possibleMeasurements)
+
+            closestMeasurement = DatabaseCleaner._get_closest_measurement_for_point(possibleMeasurements, point)
+            if closestMeasurement is not None:
+                measurementIdsToKeep.append(closestMeasurement.id)
+        # the keep list may repeat an id (one entry per point); a set difference avoids the O(n*m) list scans
+        return measurementIdsToKeep, allMeasurementIds - set(measurementIdsToKeep)
+
+    @staticmethod
+    def __get_previous_item(index: int, point: datetime, points: List[datetime]) -> datetime:
+        """Return the lower bound of the window searched around ``point``."""
+        if index != 0:
+            return points[index - 1]
+        # the first point has no predecessor, so it is its own lower bound
+        return point
+
+    @staticmethod
+    def __get_next_item(index: int, point: datetime, points: List[datetime]) -> datetime:
+        """Return the upper bound of the window searched around ``point``."""
+        if index != len(points) - 1:
+            return points[index + 1]
+        # the last point has no successor: the window ends at 23:59:59 of the point's own day
+        return datetime(year=point.year, month=point.month, day=point.day, hour=23, minute=59, second=59)
+
+    @staticmethod
+    def _get_closest_measurement_for_point(measurements: List[Schemas.Measurement],
+                                           point: datetime) -> Optional[Schemas.Measurement]:
+        """Return the measurement closest in time to ``point`` (first wins a tie), or None for an empty list."""
+        distances = [abs(datetime.strptime(m.timestamp, Crud.DATE_FORMAT) - point) for m in measurements]
+        # index() yields the first position holding the smallest distance, so ties resolve to the earliest entry
+        return measurements[distances.index(min(distances))] if measurements else None
diff --git a/src/test/DatabaseCleanerTest.py b/src/test/DatabaseCleanerTest.py
index 3eb1b6c..8e9e70d 100644
--- a/src/test/DatabaseCleanerTest.py
+++ b/src/test/DatabaseCleanerTest.py
@@ -2,9 +2,15 @@ import unittest
 from datetime import datetime
 from unittest.mock import Mock, patch
 
+from TheCodeLabs_BaseUtils.DefaultLogger import DefaultLogger
+
+from logic import Constants
+from logic.database import Schemas
 from logic.database.RetentionPolicy import RetentionPolicy
 
 CURRENT_DATE_TIME = datetime(year=2021, month=8, day=18, hour=22, minute=0, second=0)
+DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
+LOGGER = DefaultLogger().create_logger_if_not_exists(Constants.APP_NAME)
 
 
 class TestRetentionPolicy(unittest.TestCase):
@@ -12,13 +18,13 @@ class TestRetentionPolicy(unittest.TestCase):
         policy = RetentionPolicy(1, 10)
 
         with self.assertRaises(ValueError):
-            policy.determine_measurement_points(CURRENT_DATE_TIME)
+            policy.determine_measurement_points(CURRENT_DATE_TIME.date())
 
     def test_determineMeasurementPoints_zero_raise(self):
         policy = RetentionPolicy(0, 10)
 
         with self.assertRaises(ValueError):
-            policy.determine_measurement_points(CURRENT_DATE_TIME)
+            policy.determine_measurement_points(CURRENT_DATE_TIME.date())
 
     def test_determineMeasurementPoints_twoPoints(self):
         policy = RetentionPolicy(2, 10)
@@ -28,7 +34,7 @@ class TestRetentionPolicy(unittest.TestCase):
             datetime(year=2021, month=8, day=18, hour=12, minute=0, second=0)
         ]
 
-        self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME))
+        self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME.date()))
 
     def test_determineMeasurementPoints_fourPoints(self):
         policy = RetentionPolicy(4, 10)
@@ -40,81 +46,174 @@ class TestRetentionPolicy(unittest.TestCase):
             datetime(year=2021, month=8, day=18, hour=18, minute=0, second=0),
         ]
 
-        self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME))
+        self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME.date()))
 
     def test_determineMeasurementPoints_moreThan24Hours(self):
         policy = RetentionPolicy(30, 10)
 
-        result = policy.determine_measurement_points(CURRENT_DATE_TIME)
+        result = policy.determine_measurement_points(CURRENT_DATE_TIME.date())
         self.assertEqual(30, len(result))
         self.assertIn(datetime(year=2021, month=8, day=18, hour=0, minute=0, second=0), result)
         self.assertIn(datetime(year=2021, month=8, day=18, hour=0, minute=48, second=0), result)
 
 
 class TestDatabaseCleaner(unittest.TestCase):
-    def test_getMeasurementsByDay(self):
+    # Fixture measurements of sensor 15 on 2021-08-18, taken at 06:55, 13:15, 13:45 and 13:48
+    MEASUREMENT1 = Schemas.Measurement(id=1, value=5, sensor_id=15,
+                                       timestamp=datetime(year=2021, month=8, day=18,
+                                                          hour=6, minute=55, second=0).strftime(DATE_FORMAT))
+    MEASUREMENT2 = Schemas.Measurement(id=2, value=5, sensor_id=15,
+                                       timestamp=datetime(year=2021, month=8, day=18,
+                                                          hour=13, minute=15, second=0).strftime(DATE_FORMAT))
+    MEASUREMENT3 = Schemas.Measurement(id=3, value=5, sensor_id=15,
+                                       timestamp=datetime(year=2021, month=8, day=18,
+                                                          hour=13, minute=45, second=0).strftime(DATE_FORMAT))
+
+    MEASUREMENT4 = Schemas.Measurement(id=4, value=5, sensor_id=15,
+                                       timestamp=datetime(year=2021, month=8, day=18,
+                                                          hour=13, minute=48, second=0).strftime(DATE_FORMAT))
+
+    @classmethod
+    def get_measurements_mocked(cls, db, startTime, endTime):
+        """Stand-in for Crud.get_measurements: canned results selected by the (startTime, endTime) strings."""
+        afternoon = [cls.MEASUREMENT2, cls.MEASUREMENT3, cls.MEASUREMENT4]
+        resultsByWindow = {
+            ('2021-08-18 00:00:00', '2021-08-18 06:00:00'): [cls.MEASUREMENT1],
+            ('2021-08-18 00:00:00', '2021-08-18 12:00:00'): [cls.MEASUREMENT1],
+            ('2021-08-18 06:00:00', '2021-08-18 18:00:00'): [cls.MEASUREMENT1] + afternoon,
+            ('2021-08-18 12:00:00', '2021-08-18 23:59:59'): afternoon,
+            ('2021-08-18 18:00:00', '2021-08-18 23:59:59'): [],
+        }
+
+        # any other window yields no measurements
+        return resultsByWindow.get((startTime, endTime), [])
+
+    def test__GetClosestMeasurementForPoint_noMeasurementInRange(self):
+        mockedCrud = Mock()
+        with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
+            from logic.database.DatabaseCleaner import DatabaseCleaner
+            # no candidates at all: there is nothing to pick, so None is expected
+            result = DatabaseCleaner._get_closest_measurement_for_point([], CURRENT_DATE_TIME)
+            self.assertIsNone(result)
+
+    def test__GetClosestMeasurementForPoint_getClosest_AllMeasurementsBeforePoint(self):
+        mockedCrud = Mock()
+        with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
+            mockedCrud.DATE_FORMAT = DATE_FORMAT
+
+            from logic.database.DatabaseCleaner import DatabaseCleaner
+            # candidates at 21:50 and 21:55, point at 22:00: the later candidate is the closest
+            expected = Schemas.Measurement(id=2, value=5, sensor_id=15,
+                                           timestamp=datetime(year=2021, month=8, day=18,
+                                                              hour=21, minute=55, second=0).strftime(DATE_FORMAT))
+            measurements = [
+                Schemas.Measurement(id=1, value=5, sensor_id=15,
+                                    timestamp=datetime(year=2021, month=8, day=18,
+                                                       hour=21, minute=50, second=0).strftime(DATE_FORMAT)),
+                expected
+            ]
+
+            result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME)
+            self.assertEqual(expected, result)
+
+    def test__GetClosestMeasurementForPoint_getClosest_AllMeasurementsAfterPoint(self):
+        mockedCrud = Mock()
+        with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
+            mockedCrud.DATE_FORMAT = DATE_FORMAT
+
+            from logic.database.DatabaseCleaner import DatabaseCleaner
+            # candidates at 22:15 and 22:30, point at 22:00: the earlier candidate is the closest
+            expected = Schemas.Measurement(id=1, value=5, sensor_id=15,
+                                           timestamp=datetime(year=2021, month=8, day=18,
+                                                              hour=22, minute=15, second=0).strftime(DATE_FORMAT))
+            measurements = [
+                expected,
+                Schemas.Measurement(id=2, value=5, sensor_id=15,
+                                    timestamp=datetime(year=2021, month=8, day=18,
+                                                       hour=22, minute=30, second=0).strftime(DATE_FORMAT))
+            ]
+
+            result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME)
+            self.assertEqual(expected, result)
+
+    def test__GetClosestMeasurementForPoint_getClosest_BothSidesOfPoint(self):
+        mockedCrud = Mock()
+        with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
+            mockedCrud.DATE_FORMAT = DATE_FORMAT
+
+            from logic.database.DatabaseCleaner import DatabaseCleaner
+            # candidates 5 minutes before and 10 minutes after the 22:00 point: the earlier one is the closest
+            expected = Schemas.Measurement(id=1, value=5, sensor_id=15,
+                                           timestamp=datetime(year=2021, month=8, day=18,
+                                                              hour=21, minute=55, second=0).strftime(DATE_FORMAT))
+            measurements = [
+                expected,
+                Schemas.Measurement(id=2, value=5, sensor_id=15,
+                                    timestamp=datetime(year=2021, month=8, day=18,
+                                                       hour=22, minute=10, second=0).strftime(DATE_FORMAT))
+            ]
+
+            result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME)
+            self.assertEqual(expected, result)
+
+    def test__GetClosestMeasurementForPoint_getClosest_EqualDistance(self):
+        mockedCrud = Mock()
+        with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
+            mockedCrud.DATE_FORMAT = DATE_FORMAT
+
+            from logic.database.DatabaseCleaner import DatabaseCleaner
+            # both candidates are 5 minutes away from the 22:00 point: the first list entry must win the tie
+            expected = Schemas.Measurement(id=1, value=5, sensor_id=15,
+                                           timestamp=datetime(year=2021, month=8, day=18,
+                                                              hour=21, minute=55, second=0).strftime(DATE_FORMAT))
+            measurements = [
+                expected,
+                Schemas.Measurement(id=2, value=5, sensor_id=15,
+                                    timestamp=datetime(year=2021, month=8, day=18,
+                                                       hour=22, minute=5, second=0).strftime(DATE_FORMAT))
+            ]
+
+            result = DatabaseCleaner._get_closest_measurement_for_point(measurements, CURRENT_DATE_TIME)
+            self.assertEqual(expected, result)
+
+    def test_GetMeasurementsForDay(self):
+        mockedCrud = Mock()
+        with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
+            mockedCrud.DATE_FORMAT = DATE_FORMAT
+            mockedCrud.get_measurements.side_effect = self.get_measurements_mocked
+
+            from logic.database.DatabaseCleaner import DatabaseCleaner
+            # the keep list carries one id per policy point, so MEASUREMENT1 is expected twice; only MEASUREMENT3 goes
+            database = Mock()
+            policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=10)
+            measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(database, CURRENT_DATE_TIME.date(), policy)
+            self.assertEqual([self.MEASUREMENT1.id, self.MEASUREMENT1.id, self.MEASUREMENT2.id, self.MEASUREMENT4.id], measurementIds)
+            self.assertEqual({self.MEASUREMENT3.id}, idsToDelete)
+
+    def test_noRetentionPolicies_doNothing(self):
         mockedCrud = Mock()
         with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
             mockedCrud.get_measurements.return_value = []
+            # without policies clean() has nothing to enforce and must not query any measurements
+            database = Mock()
+            from logic.database.DatabaseCleaner import DatabaseCleaner
+            DatabaseCleaner([]).clean(database, CURRENT_DATE_TIME.date())
+
+            mockedCrud.get_measurements.assert_not_called()
+
+    def test_onePolicy_deleteMeasurements(self):
+        mockedCrud = Mock()
+        with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
             mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
+            mockedCrud.get_measurements.side_effect = self.get_measurements_mocked
 
             database = Mock()
             from logic.database.DatabaseCleaner import DatabaseCleaner
-            DatabaseCleaner([])._get_measurements_by_day(database, CURRENT_DATE_TIME)
-
-            mockedCrud.get_measurements.assert_called_once_with(db=database,
-                                                                startDateTime='2021-08-18 00:00:00',
-                                                                endDateTime='2021-08-18 23:59:59')
-
-    # def test_noRetentionPolicies_doNothing(self):
-    #     mockedCrud = Mock()
-    #     with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
-    #         mockedCrud.get_measurements.return_value = []
-    #
-    #         database = Mock()
-    #         from logic.database.DatabaseCleaner import DatabaseCleaner
-    #         DatabaseCleaner([]).clean(database, CURRENT_DATE_TIME)
-    #
-    #         mockedCrud.get_measurements.assert_not_called()
-    #
-    # def test_onePolicy_fetchMeasurementsOlderThanPolicyStart(self):
-    #     mockedCrud = Mock()
-    #     with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
-    #         mockedCrud.get_measurements.return_value = []
-    #         mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
-    #
-    #         database = Mock()
-    #         from logic.database.DatabaseCleaner import DatabaseCleaner
-    #         from logic.database.DatabaseCleaner import RetentionPolicy
-    #
-    #         policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=1)
-    #         DatabaseCleaner([policy]).clean(database, CURRENT_DATE_TIME)
-    #
-    #         expectedEndTime = CURRENT_DATE_TIME - timedelta(days=policy.ageInDays)
-    #         mockedCrud.get_measurements.assert_called_once_with(db=database,
-    #                                                             startDateTime=DatabaseCleaner.MIN_DATETIME.strftime(mockedCrud.DATE_FORMAT),
-    #                                                             endDateTime=expectedEndTime.strftime(mockedCrud.DATE_FORMAT))
-    #
-    # def test_onePolicy_deleteMeasurements(self):
-    #     mockedCrud = Mock()
-    #     with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
-    #
-    #         measurementToBeDeleted1 = Schemas.Measurement(id=1, value='5', timestamp='2021-08-17 20:05:00', sensor_id=2)
-    #         measurementToKeep = Schemas.Measurement(id=2, value='5', timestamp='2021-08-17 20:09:12', sensor_id=2)
-    #         measurementToBeDeleted2 = Schemas.Measurement(id=3, value='5', timestamp='2021-08-17 21:07:12', sensor_id=2)
-    #         measurementToBeDeleted3 = Schemas.Measurement(id=4, value='5', timestamp='2021-08-17 21:09:12', sensor_id=2)
-    #         measurementAfterPolicyStart = Schemas.Measurement(id=5, value='5', timestamp='2021-08-17 21:10:12', sensor_id=2)
-    #
-    #         mockedCrud.get_measurements.return_value = [measurementToBeDeleted1, measurementToKeep, measurementToBeDeleted2, measurementToBeDeleted3, measurementAfterPolicyStart]
-    #         mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
-    #
-    #         database = Mock()
-    #         from logic.database.DatabaseCleaner import DatabaseCleaner
-    #         from logic.database.DatabaseCleaner import RetentionPolicy
-    #
-    #         policy = RetentionPolicy(resolutionInMinutes=10, ageInDays=1)
-    #         DatabaseCleaner([policy]).clean(database, self.CURRENT_DATE_TIME)
-    #
-    #         mockedCrud.delete_multiple_measurements.assert_called_once_with(database, [4, 3, 1])
-    #
-    # # TODO: test: if cleanup is performed on the next day: prevent deletion of already low resolution measurements
+            from logic.database.DatabaseCleaner import RetentionPolicy
+
+            policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=1)
+            DatabaseCleaner([policy]).clean(database, datetime(year=2021, month=8, day=19).date())
+
+            mockedCrud.delete_multiple_measurements.assert_called_once_with(database, {3})
+
+    # TODO: test: multiple policies
-- 
GitLab