Skip to content
Snippets Groups Projects
Commit 9e84ffa7 authored by Robert Goldmann's avatar Robert Goldmann
Browse files

#41 - added database migrator and database version

parent 2846da2d
No related branches found
No related tags found
1 merge request!1Fixed #41 - Add option to hide roadmap
......@@ -8,6 +8,7 @@ from blueprints.RoadmapAPI import RoadmapParameters
from blueprints.MilestoneAPI import MilestoneParameters
from blueprints.TaskAPI import TaskParameters
from blueprints.SubTaskAPI import SubTaskParameters
from logic.DatabaseMigrator import DatabaseMigrator
class FetchType(Enum):
......@@ -17,6 +18,8 @@ class FetchType(Enum):
class Database:
VERSION = 2
def __init__(self, database_settings):
self.__host = database_settings["host"]
self.__port = database_settings["port"]
......@@ -44,7 +47,7 @@ class Database:
if self.__connection is not None:
self.__connection.close()
def __query(self, query, *args, fetch_type=FetchType.ALL):
def _query(self, query, *args, fetch_type=FetchType.ALL):
try:
self.__cursor.execute(query, args)
except psycopg2.InterfaceError:
......@@ -58,14 +61,27 @@ class Database:
return self.__cursor.fetchall()
def __create_tables(self):
self.__create_table_version()
self.__create_table_roadmaps()
self.__create_table_milestones()
self.__create_table_tasks()
self.__create_table_subtasks()
databaseMigrator = DatabaseMigrator(self)
databaseMigrator.migrate()
def __create_table_version(self):
queryTable = f'CREATE TABLE IF NOT EXISTS "public"."version" ' \
f'("version" int4 NOT NULL DEFAULT {self.VERSION});'
self._query(queryTable, fetch_type=FetchType.NONE)
if not self.get_version():
queryInsert = f'INSERT INTO version ("version") VALUES (%s);'
self._query(queryInsert, self.VERSION, fetch_type=FetchType.NONE)
def __create_table_roadmaps(self):
query = 'CREATE SEQUENCE IF NOT EXISTS "roadmaps_ID_seq"'
self.__query(query, fetch_type=FetchType.NONE)
self._query(query, fetch_type=FetchType.NONE)
queryTable = f'CREATE TABLE IF NOT EXISTS "public"."roadmaps" ' \
f'("{RoadmapParameters.ID.value}" int4 NOT NULL DEFAULT ' \
......@@ -73,11 +89,11 @@ class Database:
f'"{RoadmapParameters.PROJECT_NAME.value}" text NOT NULL, ' \
f'"{RoadmapParameters.HIDDEN.value}" boolean NOT NULL DEFAULT false, ' \
f'PRIMARY KEY ("{RoadmapParameters.ID.value}"));'
self.__query(queryTable, fetch_type=FetchType.NONE)
self._query(queryTable, fetch_type=FetchType.NONE)
def __create_table_milestones(self):
query = 'CREATE SEQUENCE IF NOT EXISTS "milestones_ID_seq"'
self.__query(query, fetch_type=FetchType.NONE)
self._query(query, fetch_type=FetchType.NONE)
queryTable = f'CREATE TABLE IF NOT EXISTS "public"."milestones" ' \
f'("{MilestoneParameters.ID.value}" int4 NOT NULL DEFAULT ' \
......@@ -90,11 +106,11 @@ class Database:
f'"{MilestoneParameters.COMPLETION_DATE.value}" date NOT NULL, ' \
f'"{MilestoneParameters.STATUS.value}" int4 NOT NULL, ' \
f'PRIMARY KEY ("{MilestoneParameters.ID.value}"));'
self.__query(queryTable, fetch_type=FetchType.NONE)
self._query(queryTable, fetch_type=FetchType.NONE)
def __create_table_tasks(self):
query = 'CREATE SEQUENCE IF NOT EXISTS "tasks_ID_seq"'
self.__query(query, fetch_type=FetchType.NONE)
self._query(query, fetch_type=FetchType.NONE)
queryTable = f'CREATE TABLE IF NOT EXISTS "public"."tasks" ' \
f'("{TaskParameters.ID.value}" int4 NOT NULL DEFAULT ' \
......@@ -104,11 +120,11 @@ class Database:
f'"{TaskParameters.DESCRIPTION.value}" text NOT NULL, ' \
f'"{TaskParameters.STATUS.value}" int4 NOT NULL, ' \
f'PRIMARY KEY ("{TaskParameters.ID.value}"));'
self.__query(queryTable, fetch_type=FetchType.NONE)
self._query(queryTable, fetch_type=FetchType.NONE)
def __create_table_subtasks(self):
query = 'CREATE SEQUENCE IF NOT EXISTS "subtasks_ID_seq"'
self.__query(query, fetch_type=FetchType.NONE)
self._query(query, fetch_type=FetchType.NONE)
queryTable = f'CREATE TABLE IF NOT EXISTS "public"."subtasks" ' \
f'("{SubTaskParameters.ID.value}" int4 NOT NULL DEFAULT ' \
......@@ -118,135 +134,148 @@ class Database:
f'"{SubTaskParameters.DESCRIPTION.value}" text NOT NULL, ' \
f'"{SubTaskParameters.STATUS.value}" int4 NOT NULL, ' \
f'PRIMARY KEY ("{SubTaskParameters.ID.value}"));'
self.__query(queryTable, fetch_type=FetchType.NONE)
self._query(queryTable, fetch_type=FetchType.NONE)
# VERSION
def get_version(self) -> int or None:
query = f'SELECT * FROM version;'
result = self._query(query, fetch_type=FetchType.ONE)
if result is None:
return None
return result['version']
def update_version(self, version: int):
query = f'UPDATE version SET "version"=%s;'
self._query(query, version, fetch_type=FetchType.NONE)
# ROADMAPS
def get_roadmaps(self):
query = f'SELECT * FROM roadmaps ORDER BY "{RoadmapParameters.ID.value}";'
return self.__query(query)
return self._query(query)
def get_visible_roadmaps(self):
query = f'SELECT * FROM roadmaps WHERE "{RoadmapParameters.HIDDEN.value}"=FALSE ORDER BY "{RoadmapParameters.ID.value}";'
return self.__query(query)
return self._query(query)
def get_roadmap(self, roadmapID):
query = f'SELECT * FROM roadmaps WHERE "{RoadmapParameters.ID.value}"=%s;'
return self.__query(query, roadmapID, fetch_type=FetchType.ONE)
return self._query(query, roadmapID, fetch_type=FetchType.ONE)
def add_roadmap(self, name):
query = f'INSERT INTO roadmaps ("{RoadmapParameters.PROJECT_NAME.value}") VALUES (%s);'
self.__query(query, name, fetch_type=FetchType.NONE)
self._query(query, name, fetch_type=FetchType.NONE)
def update_roadmap(self, roadmapID, name, hidden):
query = f'UPDATE roadmaps SET "{RoadmapParameters.PROJECT_NAME.value}"=%s, "{RoadmapParameters.HIDDEN.value}"=%s WHERE "{RoadmapParameters.ID.value}"=%s;'
self.__query(query, name, hidden, roadmapID, fetch_type=FetchType.NONE)
self._query(query, name, hidden, roadmapID, fetch_type=FetchType.NONE)
def delete_roadmap(self, roadmapID):
query = f'DELETE FROM roadmaps WHERE "{RoadmapParameters.ID.value}"=%s;'
self.__query(query, roadmapID, fetch_type=FetchType.NONE)
self._query(query, roadmapID, fetch_type=FetchType.NONE)
# MILESTONES
def get_all_milestones(self):
query = f'SELECT * FROM milestones ORDER BY "{MilestoneParameters.VERSION_CODE.value}" DESC;'
return self.__query(query)
return self._query(query)
def get_milestones(self, roadmapID):
query = f'SELECT * FROM milestones WHERE "{MilestoneParameters.ROADMAP_ID.value}"=%s ORDER BY "{MilestoneParameters.VERSION_CODE.value}" DESC;'
return self.__query(query, roadmapID)
return self._query(query, roadmapID)
def get_open_milestones(self, roadmapID):
query = f'SELECT * FROM milestones WHERE "{MilestoneParameters.ROADMAP_ID.value}"=%s AND "Status"=0 ORDER BY "{MilestoneParameters.VERSION_CODE.value}" DESC;'
return self.__query(query, roadmapID)
return self._query(query, roadmapID)
def get_milestone(self, milestoneID):
query = f'SELECT * FROM milestones WHERE "{MilestoneParameters.ID.value}"=%s;'
return self.__query(query, milestoneID, fetch_type=FetchType.ONE)
return self._query(query, milestoneID, fetch_type=FetchType.ONE)
def get_latest_milestone(self, roadmapID):
query = f'SELECT * FROM milestones WHERE "{MilestoneParameters.ROADMAP_ID.value}"=%s AND "{MilestoneParameters.STATUS.value}" = 1 ORDER BY "{MilestoneParameters.VERSION_CODE.value}" DESC;'
return self.__query(query, roadmapID, fetch_type=FetchType.ONE)
return self._query(query, roadmapID, fetch_type=FetchType.ONE)
def add_milestone(self, roadmapID, versionCode, versionName, title, dueDate, completionDate, status):
query = f'INSERT INTO milestones ("{MilestoneParameters.ROADMAP_ID.value}", "{MilestoneParameters.VERSION_CODE.value}", "{MilestoneParameters.VERSION_NAME.value}", "{MilestoneParameters.TITLE.value}", "{MilestoneParameters.DUE_DATE.value}", "{MilestoneParameters.COMPLETION_DATE.value}", "{MilestoneParameters.STATUS.value}") VALUES (%s, %s, %s, %s, %s, %s, %s);'
self.__query(query, roadmapID, versionCode, versionName, title, dueDate, completionDate, status,
self._query(query, roadmapID, versionCode, versionName, title, dueDate, completionDate, status,
fetch_type=FetchType.NONE)
def update_milestone(self, milestoneID, roadmapID, versionCode, versionName, title, dueDate, completionDate,
status):
query = f'UPDATE milestones SET "{MilestoneParameters.ROADMAP_ID.value}"=%s, "{MilestoneParameters.VERSION_CODE.value}"=%s, "{MilestoneParameters.VERSION_NAME.value}"=%s, "{MilestoneParameters.TITLE.value}"=%s, "{MilestoneParameters.DUE_DATE.value}"=%s, "{MilestoneParameters.COMPLETION_DATE.value}"=%s, "{MilestoneParameters.STATUS.value}"=%s WHERE "{MilestoneParameters.ID.value}"=%s;'
self.__query(query, roadmapID, versionCode, versionName, title, dueDate, completionDate, status, milestoneID,
self._query(query, roadmapID, versionCode, versionName, title, dueDate, completionDate, status, milestoneID,
fetch_type=FetchType.NONE)
def finish_milestone(self, milestoneID):
query = f'UPDATE milestones SET "{MilestoneParameters.COMPLETION_DATE.value}"=%s, "{MilestoneParameters.STATUS.value}"=%s WHERE "{MilestoneParameters.ID.value}"=%s;'
self.__query(query, Date.today(), 1, milestoneID, fetch_type=FetchType.NONE)
self._query(query, Date.today(), 1, milestoneID, fetch_type=FetchType.NONE)
def delete_milestone(self, milestoneID):
query = f'DELETE FROM milestones WHERE "{MilestoneParameters.ID.value}"=%s;'
self.__query(query, milestoneID, fetch_type=FetchType.NONE)
self._query(query, milestoneID, fetch_type=FetchType.NONE)
# TASKS
def get_all_tasks(self):
query = f'SELECT * FROM tasks ORDER BY "{TaskParameters.ID.value}";'
return self.__query(query)
return self._query(query)
def get_tasks(self, milestoneID):
query = f'SELECT * FROM tasks WHERE "{TaskParameters.MILESTONE_ID.value}"=%s ORDER BY "{TaskParameters.ID.value}";'
return self.__query(query, milestoneID)
return self._query(query, milestoneID)
def get_open_tasks(self, milestoneID):
query = f'SELECT * FROM tasks WHERE "{TaskParameters.MILESTONE_ID.value}"=%s AND "{TaskParameters.STATUS.value}"=0 ORDER BY "{TaskParameters.ID.value}";'
return self.__query(query, milestoneID)
return self._query(query, milestoneID)
def get_task(self, taskID):
query = f'SELECT * FROM tasks WHERE "{TaskParameters.ID.value}"=%s;'
return self.__query(query, taskID, fetch_type=FetchType.ONE)
return self._query(query, taskID, fetch_type=FetchType.ONE)
def add_task(self, milestoneID, title, description, status):
query = f'INSERT INTO tasks ("{TaskParameters.MILESTONE_ID.value}", "{TaskParameters.TITLE.value}", "{TaskParameters.DESCRIPTION.value}", "{TaskParameters.STATUS.value}") VALUES (%s, %s, %s, %s);'
self.__query(query, milestoneID, title, description, status, fetch_type=FetchType.NONE)
self._query(query, milestoneID, title, description, status, fetch_type=FetchType.NONE)
def update_task(self, taskID, milestoneID, title, description, status):
query = f'UPDATE tasks SET "{TaskParameters.MILESTONE_ID.value}"=%s, "{TaskParameters.TITLE.value}"=%s, "{TaskParameters.DESCRIPTION.value}"=%s, "{TaskParameters.STATUS.value}"=%s WHERE "{TaskParameters.ID.value}"=%s;'
self.__query(query, milestoneID, title, description, status, taskID, fetch_type=FetchType.NONE)
self._query(query, milestoneID, title, description, status, taskID, fetch_type=FetchType.NONE)
def finish_task(self, taskID):
query = f'UPDATE tasks SET "{TaskParameters.STATUS.value}"=%s WHERE "{TaskParameters.ID.value}"=%s;'
self.__query(query, 1, taskID, fetch_type=FetchType.NONE)
self._query(query, 1, taskID, fetch_type=FetchType.NONE)
def delete_task(self, taskID):
query = f'DELETE FROM tasks WHERE "{TaskParameters.ID.value}"=%s;'
self.__query(query, taskID, fetch_type=FetchType.NONE)
self._query(query, taskID, fetch_type=FetchType.NONE)
# SUBTASKS
def get_all_sub_tasks(self):
query = f'SELECT * FROM subtasks ORDER BY "{SubTaskParameters.ID.value}";'
return self.__query(query)
return self._query(query)
def get_sub_tasks(self, taskID):
query = f'SELECT * FROM subtasks WHERE "{SubTaskParameters.TASK_ID.value}"=%s ORDER BY "{SubTaskParameters.ID.value}";'
return self.__query(query, taskID)
return self._query(query, taskID)
def get_open_sub_tasks(self, taskID):
query = f'SELECT * FROM subtasks WHERE "{SubTaskParameters.TASK_ID.value}"=%s AND "{SubTaskParameters.STATUS.value}"=0 ORDER BY "{SubTaskParameters.ID.value}";'
return self.__query(query, taskID)
return self._query(query, taskID)
def get_sub_task(self, subTaskID):
query = f'SELECT * FROM subtasks WHERE "{SubTaskParameters.ID.value}"=%s;'
return self.__query(query, subTaskID, fetch_type=FetchType.ONE)
return self._query(query, subTaskID, fetch_type=FetchType.ONE)
def add_sub_task(self, taskID, title, description, status):
query = f'INSERT INTO subtasks ("{SubTaskParameters.TASK_ID.value}", "{SubTaskParameters.TITLE.value}", "{SubTaskParameters.DESCRIPTION.value}", "{SubTaskParameters.STATUS.value}") VALUES (%s, %s, %s, %s);'
self.__query(query, taskID, title, description, status, fetch_type=FetchType.NONE)
self._query(query, taskID, title, description, status, fetch_type=FetchType.NONE)
def update_sub_task(self, subTaskID, taskID, title, description, status):
query = f'UPDATE subtasks SET "{SubTaskParameters.TASK_ID.value}"=%s, "{SubTaskParameters.TITLE.value}"=%s, "{SubTaskParameters.DESCRIPTION.value}"=%s, "{SubTaskParameters.STATUS.value}"=%s WHERE "{SubTaskParameters.ID.value}"=%s;'
self.__query(query, taskID, title, description, status, subTaskID, fetch_type=FetchType.NONE)
self._query(query, taskID, title, description, status, subTaskID, fetch_type=FetchType.NONE)
def finish_sub_task(self, subTaskID):
query = f'UPDATE subtasks SET "{SubTaskParameters.STATUS.value}"=%s WHERE "{SubTaskParameters.ID.value}"=%s;'
self.__query(query, 1, subTaskID, fetch_type=FetchType.NONE)
self._query(query, 1, subTaskID, fetch_type=FetchType.NONE)
def delete_sub_task(self, subTaskID):
query = f'DELETE FROM subtasks WHERE "{SubTaskParameters.ID.value}"=%s;'
self.__query(query, subTaskID, fetch_type=FetchType.NONE)
self._query(query, subTaskID, fetch_type=FetchType.NONE)
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from blueprints.RoadmapAPI import RoadmapParameters
from logic import Constants
if TYPE_CHECKING:
from logic.Database import Database
LOGGER = logging.getLogger(Constants.APP_NAME)
class DatabaseMigrator:
def __init__(self, database: Database):
self._database = database
def migrate(self):
latestVersion = self._database.VERSION
currentVersion = self._database.get_version()
while currentVersion < latestVersion:
currentVersion = self._database.get_version()
self.__migrate_version(currentVersion)
if latestVersion == currentVersion:
LOGGER.debug(f'Database version: {latestVersion} (latest)')
return
def __migrate_version(self, currentVersion):
from logic.Database import FetchType
if currentVersion == 1:
LOGGER.debug('Migrating database from version 1 to 2...')
queryColumnExists = f'SELECT EXISTS (SELECT 1 FROM information_schema.columns ' \
f'WHERE table_name=\'roadmaps\' AND column_name=\'{RoadmapParameters.HIDDEN.value}\');'
exists = self._database._query(queryColumnExists, fetch_type=FetchType.ONE)
if not exists['exists']:
query = f'ALTER TABLE "roadmaps" ADD "{RoadmapParameters.HIDDEN.value}" boolean NOT NULL DEFAULT false;'
self._database._query(query, fetch_type=FetchType.NONE)
self._database.update_version(2)
return
raise ValueError(f'No migration handler for version {currentVersion} defined')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment