Skip to content
Snippets Groups Projects
Commit 453d1c55 authored by Robert Goldmann
Browse files

#9 - only perform cleanup per sensor

parent 350f1e5c
No related branches found
No related tags found
No related merge requests found
......@@ -14,9 +14,6 @@ LOGGER = logging.getLogger(Constants.APP_NAME)
class DatabaseCleaner:
MIN_DATE = datetime(year=1970, month=1, day=1).date()
# TODO DEBUG:
# MIN_DATE = (datetime.now() - timedelta(days=31)).date()
def __init__(self, retentionPolicies: List[RetentionPolicy]):
self._policies = retentionPolicies
......@@ -27,18 +24,27 @@ class DatabaseCleaner:
LOGGER.debug(f'Enforcing retention policy: {policy}')
policyStart = currentDate - timedelta(days=policy.ageInDays)
allSensors = Crud.get_sensors(db, skip=0, limit=1000000)
for sensor in allSensors:
LOGGER.debug(f'Cleaning measurements for sensor "{sensor.name}" '
f'(id: {sensor.id}, device_id: {sensor.device_id})')
processedDate = policyStart
while processedDate > self.MIN_DATE:
LOGGER.debug(f'Cleaning {processedDate.strftime("%Y-%m-%d")}...')
measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(db, date=processedDate,
policy=policy)
measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(db,
date=processedDate,
policy=policy,
sensorId=sensor.id)
processedDate = processedDate - timedelta(days=1)
if not idsToDelete:
continue
LOGGER.debug(f'Scheduled {len(idsToDelete)} measurements for deletion (keeping: {len(measurementIds)}, '
LOGGER.debug(
f'Scheduled {len(idsToDelete)} measurements for deletion (keeping: {len(measurementIds)}, '
f'max allowed: {policy.numberOfMeasurementsPerDay})')
Crud.delete_multiple_measurements(db, idsToDelete)
......@@ -49,7 +55,7 @@ class DatabaseCleaner:
@staticmethod
def _categorize_measurements_for_day(db: Session, date: datetime.date,
policy: RetentionPolicy) -> Tuple[List[int], Set[int]]:
policy: RetentionPolicy, sensorId: int) -> Tuple[List[int], Set[int]]:
points = policy.determine_measurement_points(date)
measurementIdsToKeep = []
......@@ -58,8 +64,9 @@ class DatabaseCleaner:
previousItem = DatabaseCleaner.__get_previous_item(index, point, points)
nextItem = DatabaseCleaner.__get_next_item(index, point, points)
possibleMeasurements = Crud.get_measurements(db, previousItem.strftime(Crud.DATE_FORMAT),
nextItem.strftime(Crud.DATE_FORMAT))
possibleMeasurements = Crud.get_measurements_for_sensor(db, previousItem.strftime(Crud.DATE_FORMAT),
nextItem.strftime(Crud.DATE_FORMAT), sensorId)
allMeasurementIds.update([m.id for m in possibleMeasurements])
closestMeasurement = DatabaseCleaner._get_closest_measurement_for_point(possibleMeasurements, point)
......
......@@ -58,23 +58,32 @@ class TestRetentionPolicy(unittest.TestCase):
class TestDatabaseCleaner(unittest.TestCase):
MEASUREMENT1 = Schemas.Measurement(id=1, value=5, sensor_id=15,
MEASUREMENT1 = Schemas.Measurement(id=1, value=5, sensor_id=1,
timestamp=datetime(year=2021, month=8, day=18,
hour=6, minute=55, second=0).strftime(DATE_FORMAT))
MEASUREMENT2 = Schemas.Measurement(id=2, value=5, sensor_id=15,
MEASUREMENT2 = Schemas.Measurement(id=2, value=5, sensor_id=1,
timestamp=datetime(year=2021, month=8, day=18,
hour=13, minute=15, second=0).strftime(DATE_FORMAT))
MEASUREMENT3 = Schemas.Measurement(id=3, value=5, sensor_id=15,
MEASUREMENT3 = Schemas.Measurement(id=3, value=5, sensor_id=1,
timestamp=datetime(year=2021, month=8, day=18,
hour=13, minute=45, second=0).strftime(DATE_FORMAT))
MEASUREMENT4 = Schemas.Measurement(id=4, value=5, sensor_id=15,
MEASUREMENT4 = Schemas.Measurement(id=4, value=5, sensor_id=1,
timestamp=datetime(year=2021, month=8, day=18,
hour=13, minute=48, second=0).strftime(DATE_FORMAT))
MEASUREMENT5 = Schemas.Measurement(id=5, value=5, sensor_id=2,
timestamp=datetime(year=2021, month=8, day=18,
hour=11, minute=55, second=0).strftime(DATE_FORMAT))
MEASUREMENT6 = Schemas.Measurement(id=6, value=5, sensor_id=2,
timestamp=datetime(year=2021, month=8, day=18,
hour=11, minute=54, second=0).strftime(DATE_FORMAT))
@classmethod
def get_measurements_mocked(cls, db, startTime, endTime):
def get_measurements_mocked(cls, db, startTime, endTime, sensorId):
if sensorId == 1:
if startTime == '2021-08-18 00:00:00' and endTime == '2021-08-18 06:00:00':
return [cls.MEASUREMENT1]
if startTime == '2021-08-18 00:00:00' and endTime == '2021-08-18 12:00:00':
......@@ -87,6 +96,11 @@ class TestDatabaseCleaner(unittest.TestCase):
return []
else:
return []
else:
if startTime == '2021-08-18 00:00:00' and endTime == '2021-08-18 12:00:00':
return [cls.MEASUREMENT5, cls.MEASUREMENT6]
else:
return []
def test__GetClosestMeasurementForPoint_noMeasurementInRange(self):
mockedCrud = Mock()
......@@ -180,32 +194,37 @@ class TestDatabaseCleaner(unittest.TestCase):
mockedCrud = Mock()
with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
mockedCrud.DATE_FORMAT = DATE_FORMAT
mockedCrud.get_measurements.side_effect = self.get_measurements_mocked
mockedCrud.get_measurements_for_sensor.side_effect = self.get_measurements_mocked
from logic.database.DatabaseCleaner import DatabaseCleaner
database = Mock()
policy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=10)
measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(database, CURRENT_DATE_TIME.date(), policy)
self.assertEqual([self.MEASUREMENT1.id, self.MEASUREMENT1.id, self.MEASUREMENT2.id, self.MEASUREMENT4.id], measurementIds)
measurementIds, idsToDelete = DatabaseCleaner._categorize_measurements_for_day(database,
CURRENT_DATE_TIME.date(),
policy, 1)
self.assertEqual([self.MEASUREMENT1.id, self.MEASUREMENT1.id, self.MEASUREMENT2.id, self.MEASUREMENT4.id],
measurementIds)
self.assertEqual({self.MEASUREMENT3.id}, idsToDelete)
def test_noRetentionPolicies_doNothing(self):
mockedCrud = Mock()
with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
mockedCrud.get_measurements.return_value = []
mockedCrud.get_measurements_for_sensor.return_value = []
database = Mock()
from logic.database.DatabaseCleaner import DatabaseCleaner
DatabaseCleaner([]).clean(database, CURRENT_DATE_TIME)
mockedCrud.get_measurements.assert_not_called()
mockedCrud.get_measurements_for_sensor.assert_not_called()
def test_onePolicy_deleteMeasurements(self):
def test_onePolicy_deleteMeasurements_oneSensor(self):
mockedCrud = Mock()
with patch.dict('sys.modules', **{'logic.database.Crud': mockedCrud}):
mockedCrud.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
mockedCrud.get_measurements.side_effect = self.get_measurements_mocked
mockedCrud.get_measurements_for_sensor.side_effect = self.get_measurements_mocked
mockedCrud.get_sensors.return_value = [Schemas.Sensor(id=1, name="myTempSensor",
type="temperature", device_id=1)]
database = Mock()
from logic.database.DatabaseCleaner import DatabaseCleaner
......@@ -216,4 +235,27 @@ class TestDatabaseCleaner(unittest.TestCase):
mockedCrud.delete_multiple_measurements.assert_called_once_with(database, {3})
def test_onePolicy_deleteMeasurements_twoSensors(self):
    """Clean with a single policy while two sensors are registered.

    The mocked Crud module reports two sensors (ids 1 and 2) and serves
    measurements through ``get_measurements_mocked``. Cleaning for
    2021-08-19 with a policy of 4 measurements per day and an age of one
    day is expected to trigger exactly two delete calls: first with the
    id set ``{3}``, then with the id set ``{5}``.
    """
    crudMock = Mock()
    with patch.dict('sys.modules', **{'logic.database.Crud': crudMock}):
        crudMock.DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
        crudMock.get_measurements_for_sensor.side_effect = self.get_measurements_mocked

        temperatureSensor = Schemas.Sensor(id=1, name="myTempSensor",
                                           type="temperature", device_id=1)
        humiditySensor = Schemas.Sensor(id=2, name="myHumiditySensor",
                                        type="humidity", device_id=1)
        crudMock.get_sensors.return_value = [temperatureSensor, humiditySensor]

        db = Mock()

        # Imported while sys.modules is patched; presumably this is what makes
        # the cleaner pick up the mocked Crud module - verify against the
        # imports of logic.database.DatabaseCleaner.
        from logic.database.DatabaseCleaner import DatabaseCleaner
        from logic.database.DatabaseCleaner import RetentionPolicy

        retentionPolicy = RetentionPolicy(numberOfMeasurementsPerDay=4, ageInDays=1)
        cleanupDate = datetime(year=2021, month=8, day=19).date()
        DatabaseCleaner([retentionPolicy]).clean(db, cleanupDate)

        # One delete call per sensor, in the order the sensors were returned.
        deleteCalls = crudMock.delete_multiple_measurements.call_args_list
        self.assertEqual(2, len(deleteCalls))
        self.assertEqual((db, {3}), deleteCalls[0].args)
        self.assertEqual((db, {5}), deleteCalls[1].args)
# TODO: test: multiple policies
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment