Skip to content
Snippets Groups Projects
Commit 7e77c2bd authored by Robert Goldmann's avatar Robert Goldmann
Browse files

#9 - get measurements by day and calculate measurement points for retention policy

parent b77bdc16
No related branches found
No related tags found
No related merge requests found
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List
from typing import List, Optional
from sqlalchemy.orm import Session
from logic import Constants
from logic.database import Crud
from logic.database import Crud, Schemas
from logic.database.RetentionPolicy import RetentionPolicy
LOGGER = logging.getLogger(Constants.APP_NAME)
@dataclass
class RetentionPolicy:
resolutionInMinutes: int
ageInDays: int
class DatabaseCleaner:
MIN_DATETIME = datetime(year=1970, month=1, day=1, hour=0, minute=0, second=0, microsecond=0)
......@@ -49,6 +43,21 @@ class DatabaseCleaner:
# TODO: force backup?
@staticmethod
def _get_measurements_by_day(db: Session, date: datetime.date) -> List[Schemas.Measurement]:
startTime = datetime(year=date.year, month=date.month, day=date.day, hour=0, minute=0, second=0, microsecond=0)
endTime = datetime(year=date.year, month=date.month, day=date.day, hour=23, minute=59, second=59, microsecond=0)
return Crud.get_measurements(db=db,
startDateTime=startTime.strftime(Crud.DATE_FORMAT),
endDateTime=endTime.strftime(Crud.DATE_FORMAT))
def _get_closest_measurement_for_point(self, measurements: List[Schemas.Measurement],
point: datetime,
upperLimit: datetime,
lowerLimit: datetime) -> Optional[Schemas.Measurement]:
pass
def __delete_old_measurements(self, affectedMeasurements, db, policy):
lastTimestamp = datetime.strptime(affectedMeasurements[0].timestamp, Crud.DATE_FORMAT)
nextAllowedTimestamp = lastTimestamp - timedelta(minutes=policy.resolutionInMinutes)
......@@ -63,5 +72,6 @@ class DatabaseCleaner:
lastTimestamp = timestamp
nextAllowedTimestamp = lastTimestamp - timedelta(minutes=policy.resolutionInMinutes)
LOGGER.debug(f'Scheduled {len(measurementsIdsToDelete)} measurements for deletion (keeping {len(affectedMeasurements) - len(measurementsIdsToDelete)})')
LOGGER.debug(
f'Scheduled {len(measurementsIdsToDelete)} measurements for deletion (keeping {len(affectedMeasurements) - len(measurementsIdsToDelete)})')
Crud.delete_multiple_measurements(db, measurementsIdsToDelete)
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List
@dataclass
class RetentionPolicy:
numberOfMeasurementsPerDay: int
ageInDays: int
def determine_measurement_points(self, date: datetime.date) -> List[datetime]:
if self.numberOfMeasurementsPerDay % 2 != 0:
raise ValueError('"numberOfMeasurementsPerDay" must be an even number!')
if self.numberOfMeasurementsPerDay <= 0:
raise ValueError('"numberOfMeasurementsPerDay" must be larger than zero!')
startTime = datetime(year=date.year, month=date.month, day=date.day, hour=0, minute=0, second=0)
stepSizeInMinutes = 24 * 60 // self.numberOfMeasurementsPerDay
points = []
for index in range(self.numberOfMeasurementsPerDay):
points.append(startTime + timedelta(minutes=stepSizeInMinutes * index))
return points
import unittest
from datetime import datetime, timedelta
from datetime import datetime
from unittest.mock import Mock, patch
from logic.database import Schemas
from logic.database.RetentionPolicy import RetentionPolicy
class TestDatabaseCleaner(unittest.TestCase):
CURRENT_DATE_TIME = datetime(year=2021, month=8, day=18, hour=22, minute=0, second=0)
def test_noRetentionPolicies_doNothing(self):
mockedCrud = Mock()
with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
mockedCrud.get_measurements.return_value = []
database = Mock()
from logic.database.DatabaseCleaner import DatabaseCleaner
DatabaseCleaner([]).clean(database, self.CURRENT_DATE_TIME)
class TestRetentionPolicy(unittest.TestCase):
def test_determineMeasurementPoints_oddNumberOfPoints_raise(self):
policy = RetentionPolicy(1, 10)
mockedCrud.get_measurements.assert_not_called()
with self.assertRaises(ValueError):
policy.determine_measurement_points(CURRENT_DATE_TIME)
def test_onePolicy_fetchMeasurementsOlderThanPolicyStart(self):
mockedCrud = Mock()
with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
mockedCrud.get_measurements.return_value = []
mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
def test_determineMeasurementPoints_zero_raise(self):
policy = RetentionPolicy(0, 10)
database = Mock()
from logic.database.DatabaseCleaner import DatabaseCleaner
from logic.database.DatabaseCleaner import RetentionPolicy
with self.assertRaises(ValueError):
policy.determine_measurement_points(CURRENT_DATE_TIME)
policy = RetentionPolicy(resolutionInMinutes=10, ageInDays=1)
DatabaseCleaner([policy]).clean(database, self.CURRENT_DATE_TIME)
def test_determineMeasurementPoints_twoPoints(self):
policy = RetentionPolicy(2, 10)
expectedEndTime = self.CURRENT_DATE_TIME - timedelta(days=policy.ageInDays)
mockedCrud.get_measurements.assert_called_once_with(db=database,
startDateTime=DatabaseCleaner.MIN_DATETIME.strftime(mockedCrud.DATE_FORMAT),
endDateTime=expectedEndTime.strftime(mockedCrud.DATE_FORMAT))
expected = [
datetime(year=2021, month=8, day=18, hour=0, minute=0, second=0),
datetime(year=2021, month=8, day=18, hour=12, minute=0, second=0)
]
def test_onePolicy_deleteMeasurements(self):
mockedCrud = Mock()
with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME))
def test_determineMeasurementPoints_fourPoints(self):
policy = RetentionPolicy(4, 10)
expected = [
datetime(year=2021, month=8, day=18, hour=0, minute=0, second=0),
datetime(year=2021, month=8, day=18, hour=6, minute=0, second=0),
datetime(year=2021, month=8, day=18, hour=12, minute=0, second=0),
datetime(year=2021, month=8, day=18, hour=18, minute=0, second=0),
]
self.assertEqual(expected, policy.determine_measurement_points(CURRENT_DATE_TIME))
def test_determineMeasurementPoints_moreThan24Hours(self):
policy = RetentionPolicy(30, 10)
result = policy.determine_measurement_points(CURRENT_DATE_TIME)
self.assertEqual(30, len(result))
self.assertIn(datetime(year=2021, month=8, day=18, hour=0, minute=0, second=0), result)
self.assertIn(datetime(year=2021, month=8, day=18, hour=0, minute=48, second=0), result)
measurementToBeDeleted1 = Schemas.Measurement(id=1, value='5', timestamp='2021-08-17 20:05:00', sensor_id=2)
measurementToKeep = Schemas.Measurement(id=2, value='5', timestamp='2021-08-17 20:09:12', sensor_id=2)
measurementToBeDeleted2 = Schemas.Measurement(id=3, value='5', timestamp='2021-08-17 21:07:12', sensor_id=2)
measurementToBeDeleted3 = Schemas.Measurement(id=4, value='5', timestamp='2021-08-17 21:09:12', sensor_id=2)
measurementAfterPolicyStart = Schemas.Measurement(id=5, value='5', timestamp='2021-08-17 21:10:12', sensor_id=2)
mockedCrud.get_measurements.return_value = [measurementToBeDeleted1, measurementToKeep, measurementToBeDeleted2, measurementToBeDeleted3, measurementAfterPolicyStart]
class TestDatabaseCleaner(unittest.TestCase):
def test_getMeasurementsByDay(self):
mockedCrud = Mock()
with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
mockedCrud.get_measurements.return_value = []
mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
database = Mock()
from logic.database.DatabaseCleaner import DatabaseCleaner
from logic.database.DatabaseCleaner import RetentionPolicy
DatabaseCleaner([])._get_measurements_by_day(database, CURRENT_DATE_TIME)
policy = RetentionPolicy(resolutionInMinutes=10, ageInDays=1)
DatabaseCleaner([policy]).clean(database, self.CURRENT_DATE_TIME)
mockedCrud.delete_multiple_measurements.assert_called_once_with(database, [4, 3, 1])
# TODO: test: if cleanup is performed on the next day: prevent deletion of already low resolution measurements
mockedCrud.get_measurements.assert_called_once_with(db=database,
startDateTime='2021-08-18 00:00:00',
endDateTime='2021-08-18 23:59:59')
# def test_noRetentionPolicies_doNothing(self):
# mockedCrud = Mock()
# with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
# mockedCrud.get_measurements.return_value = []
#
# database = Mock()
# from logic.database.DatabaseCleaner import DatabaseCleaner
# DatabaseCleaner([]).clean(database, CURRENT_DATE_TIME)
#
# mockedCrud.get_measurements.assert_not_called()
#
# def test_onePolicy_fetchMeasurementsOlderThanPolicyStart(self):
# mockedCrud = Mock()
# with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
# mockedCrud.get_measurements.return_value = []
# mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
#
# database = Mock()
# from logic.database.DatabaseCleaner import DatabaseCleaner
# from logic.database.DatabaseCleaner import RetentionPolicy
#
# policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=1)
# DatabaseCleaner([policy]).clean(database, CURRENT_DATE_TIME)
#
# expectedEndTime = CURRENT_DATE_TIME - timedelta(days=policy.ageInDays)
# mockedCrud.get_measurements.assert_called_once_with(db=database,
# startDateTime=DatabaseCleaner.MIN_DATETIME.strftime(mockedCrud.DATE_FORMAT),
# endDateTime=expectedEndTime.strftime(mockedCrud.DATE_FORMAT))
#
# def test_onePolicy_deleteMeasurements(self):
# mockedCrud = Mock()
# with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
#
# measurementToBeDeleted1 = Schemas.Measurement(id=1, value='5', timestamp='2021-08-17 20:05:00', sensor_id=2)
# measurementToKeep = Schemas.Measurement(id=2, value='5', timestamp='2021-08-17 20:09:12', sensor_id=2)
# measurementToBeDeleted2 = Schemas.Measurement(id=3, value='5', timestamp='2021-08-17 21:07:12', sensor_id=2)
# measurementToBeDeleted3 = Schemas.Measurement(id=4, value='5', timestamp='2021-08-17 21:09:12', sensor_id=2)
# measurementAfterPolicyStart = Schemas.Measurement(id=5, value='5', timestamp='2021-08-17 21:10:12', sensor_id=2)
#
# mockedCrud.get_measurements.return_value = [measurementToBeDeleted1, measurementToKeep, measurementToBeDeleted2, measurementToBeDeleted3, measurementAfterPolicyStart]
# mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
#
# database = Mock()
# from logic.database.DatabaseCleaner import DatabaseCleaner
# from logic.database.DatabaseCleaner import RetentionPolicy
#
# policy = RetentionPolicy(resolutionInMinutes=10, ageInDays=1)
# DatabaseCleaner([policy]).clean(database, self.CURRENT_DATE_TIME)
#
# mockedCrud.delete_multiple_measurements.assert_called_once_with(database, [4, 3, 1])
#
# # TODO: test: if cleanup is performed on the next day: prevent deletion of already low resolution measurements
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment