From be0d1d73fb5b3e687caacf67821424f2bd9d14db Mon Sep 17 00:00:00 2001 From: Laszlo Fogas Date: Tue, 25 Jun 2019 12:07:36 +0200 Subject: [PATCH] Task dependencies to survive restarts --- model/queue.go | 37 ++++++++++++------- store/datastore/ddl/mysql/ddl_gen.go | 20 ++++++++++ .../ddl/mysql/files/022_add_task_columns.sql | 6 +++ store/datastore/ddl/postgres/ddl_gen.go | 20 ++++++++++ .../postgres/files/022_add_task_columns.sql | 6 +++ store/datastore/ddl/sqlite/ddl_gen.go | 20 ++++++++++ .../ddl/sqlite/files/022_add_task_columns.sql | 6 +++ store/datastore/sql/mysql/files/task.sql | 2 + store/datastore/sql/mysql/sql_gen.go | 2 + store/datastore/sql/postgres/files/tasks.sql | 2 + store/datastore/sql/postgres/sql_gen.go | 2 + store/datastore/sql/sqlite/files/task.sql | 2 + store/datastore/sql/sqlite/sql_gen.go | 2 + 13 files changed, 114 insertions(+), 13 deletions(-) create mode 100644 store/datastore/ddl/mysql/files/022_add_task_columns.sql create mode 100644 store/datastore/ddl/postgres/files/022_add_task_columns.sql create mode 100644 store/datastore/ddl/sqlite/files/022_add_task_columns.sql diff --git a/model/queue.go b/model/queue.go index 433fdcc00..3b5952687 100644 --- a/model/queue.go +++ b/model/queue.go @@ -23,9 +23,11 @@ import ( // Task defines scheduled pipeline Task. type Task struct { - ID string `meddler:"task_id"` - Data []byte `meddler:"task_data"` - Labels map[string]string `meddler:"task_labels,json"` + ID string `meddler:"task_id"` + Data []byte `meddler:"task_data"` + Labels map[string]string `meddler:"task_labels,json"` + Dependencies []string `meddler:"task_dependencies,json"` + RunOn []string `meddler:"task_run_on,json"` } // TaskStore defines storage for scheduled Tasks. @@ -39,13 +41,18 @@ type TaskStore interface { // ensures the task Queue can be restored when the system starts. func WithTaskStore(q queue.Queue, s TaskStore) queue.Queue { tasks, _ := s.TaskList() + toEnqueue := []*queue.Task{} for _, task := range tasks { - q.Push(context.Background(), &queue.Task{ - ID: task.ID, - Data: task.Data, - Labels: task.Labels, + toEnqueue = append(toEnqueue, &queue.Task{ + ID: task.ID, + Data: task.Data, + Labels: task.Labels, + Dependencies: task.Dependencies, + RunOn: task.RunOn, + DepStatus: make(map[string]bool), }) } + q.PushAtOnce(context.Background(), toEnqueue) return &persistentQueue{q, s} } @@ -57,9 +64,11 @@ type persistentQueue struct { // Push pushes a task to the tail of this queue. func (q *persistentQueue) Push(c context.Context, task *queue.Task) error { q.store.TaskInsert(&Task{ - ID: task.ID, - Data: task.Data, - Labels: task.Labels, + ID: task.ID, + Data: task.Data, + Labels: task.Labels, + Dependencies: task.Dependencies, + RunOn: task.RunOn, }) err := q.Queue.Push(c, task) if err != nil { @@ -72,9 +81,11 @@ func (q *persistentQueue) Push(c context.Context, task *queue.Task) error { func (q *persistentQueue) PushAtOnce(c context.Context, tasks []*queue.Task) error { for _, task := range tasks { q.store.TaskInsert(&Task{ - ID: task.ID, - Data: task.Data, - Labels: task.Labels, + ID: task.ID, + Data: task.Data, + Labels: task.Labels, + Dependencies: task.Dependencies, + RunOn: task.RunOn, }) } err := q.Queue.PushAtOnce(c, tasks) diff --git a/store/datastore/ddl/mysql/ddl_gen.go b/store/datastore/ddl/mysql/ddl_gen.go index 79da2b3f9..23c77f2de 100644 --- a/store/datastore/ddl/mysql/ddl_gen.go +++ b/store/datastore/ddl/mysql/ddl_gen.go @@ -172,6 +172,14 @@ var migrations = []struct { name: "populate-build-config", stmt: populateBuildConfig, }, + { + name: "alter-table-add-task-dependencies", + stmt: alterTableAddTaskDependencies, + }, + { + name: "alter-table-add-task-run-on", + stmt: alterTableAddTaskRunOn, + }, } // Migrate performs the database migration. If the migration fails @@ -673,3 +681,15 @@ var populateBuildConfig = ` INSERT INTO build_config (config_id, build_id) SELECT build_config_id, build_id FROM builds ` + +// +// 022_add_task_columns.sql +// + +var alterTableAddTaskDependencies = ` +ALTER TABLE tasks ADD COLUMN task_dependencies MEDIUMBLOB +` + +var alterTableAddTaskRunOn = ` +ALTER TABLE tasks ADD COLUMN task_run_on MEDIUMBLOB +` diff --git a/store/datastore/ddl/mysql/files/022_add_task_columns.sql b/store/datastore/ddl/mysql/files/022_add_task_columns.sql new file mode 100644 index 000000000..b98a2180a --- /dev/null +++ b/store/datastore/ddl/mysql/files/022_add_task_columns.sql @@ -0,0 +1,6 @@ +-- name: alter-table-add-task-dependencies +ALTER TABLE tasks ADD COLUMN task_dependencies MEDIUMBLOB + +-- name: alter-table-add-task-run-on + +ALTER TABLE tasks ADD COLUMN task_run_on MEDIUMBLOB \ No newline at end of file diff --git a/store/datastore/ddl/postgres/ddl_gen.go b/store/datastore/ddl/postgres/ddl_gen.go index 1fe2828e9..06dfdd9e4 100644 --- a/store/datastore/ddl/postgres/ddl_gen.go +++ b/store/datastore/ddl/postgres/ddl_gen.go @@ -172,6 +172,14 @@ var migrations = []struct { name: "populate-build-config", stmt: populateBuildConfig, }, + { + name: "alter-table-add-task-dependencies", + stmt: alterTableAddTaskDependencies, + }, + { + name: "alter-table-add-task-run-on", + stmt: alterTableAddTaskRunOn, + }, } // Migrate performs the database migration. If the migration fails @@ -675,3 +683,15 @@ var populateBuildConfig = ` INSERT INTO build_config (config_id, build_id) SELECT build_config_id, build_id FROM builds ` + +// +// 022_add_task_columns.sql +// + +var alterTableAddTaskDependencies = ` +ALTER TABLE tasks ADD COLUMN task_dependencies BYTEA +` + +var alterTableAddTaskRunOn = ` +ALTER TABLE tasks ADD COLUMN task_run_on BYTEA +` diff --git a/store/datastore/ddl/postgres/files/022_add_task_columns.sql b/store/datastore/ddl/postgres/files/022_add_task_columns.sql new file mode 100644 index 000000000..555e1d882 --- /dev/null +++ b/store/datastore/ddl/postgres/files/022_add_task_columns.sql @@ -0,0 +1,6 @@ +-- name: alter-table-add-task-dependencies +ALTER TABLE tasks ADD COLUMN task_dependencies BYTEA + +-- name: alter-table-add-task-run-on + +ALTER TABLE tasks ADD COLUMN task_run_on BYTEA diff --git a/store/datastore/ddl/sqlite/ddl_gen.go b/store/datastore/ddl/sqlite/ddl_gen.go index 1da2de86b..b898bbed9 100644 --- a/store/datastore/ddl/sqlite/ddl_gen.go +++ b/store/datastore/ddl/sqlite/ddl_gen.go @@ -176,6 +176,14 @@ var migrations = []struct { name: "populate-build-config", stmt: populateBuildConfig, }, + { + name: "alter-table-add-task-dependencies", + stmt: alterTableAddTaskDependencies, + }, + { + name: "alter-table-add-task-run-on", + stmt: alterTableAddTaskRunOn, + }, } // Migrate performs the database migration. If the migration fails @@ -674,3 +682,15 @@ var populateBuildConfig = ` INSERT INTO build_config (config_id, build_id) SELECT build_config_id, build_id FROM builds ` + +// +// 022_add_task_columns.sql +// + +var alterTableAddTaskDependencies = ` +ALTER TABLE tasks ADD COLUMN task_dependencies BLOB +` + +var alterTableAddTaskRunOn = ` +ALTER TABLE tasks ADD COLUMN task_run_on BLOB +` diff --git a/store/datastore/ddl/sqlite/files/022_add_task_columns.sql b/store/datastore/ddl/sqlite/files/022_add_task_columns.sql new file mode 100644 index 000000000..01b16d89a --- /dev/null +++ b/store/datastore/ddl/sqlite/files/022_add_task_columns.sql @@ -0,0 +1,6 @@ +-- name: alter-table-add-task-dependencies +ALTER TABLE tasks ADD COLUMN task_dependencies BLOB + +-- name: alter-table-add-task-run-on + +ALTER TABLE tasks ADD COLUMN task_run_on BLOB diff --git a/store/datastore/sql/mysql/files/task.sql b/store/datastore/sql/mysql/files/task.sql index 5b61c7c5e..61890f721 100644 --- a/store/datastore/sql/mysql/files/task.sql +++ b/store/datastore/sql/mysql/files/task.sql @@ -4,6 +4,8 @@ SELECT task_id ,task_data ,task_labels +,task_dependencies +,task_run_on FROM tasks -- name: task-delete diff --git a/store/datastore/sql/mysql/sql_gen.go b/store/datastore/sql/mysql/sql_gen.go index d0669c3c9..73990a460 100644 --- a/store/datastore/sql/mysql/sql_gen.go +++ b/store/datastore/sql/mysql/sql_gen.go @@ -554,6 +554,8 @@ SELECT task_id ,task_data ,task_labels +,task_dependencies +,task_run_on FROM tasks ` diff --git a/store/datastore/sql/postgres/files/tasks.sql b/store/datastore/sql/postgres/files/tasks.sql index 896c66e51..515ec5e83 100644 --- a/store/datastore/sql/postgres/files/tasks.sql +++ b/store/datastore/sql/postgres/files/tasks.sql @@ -4,6 +4,8 @@ SELECT task_id ,task_data ,task_labels +,task_dependencies +,task_run_on FROM tasks -- name: task-delete diff --git a/store/datastore/sql/postgres/sql_gen.go b/store/datastore/sql/postgres/sql_gen.go index e64d661af..84eb1df9a 100644 --- a/store/datastore/sql/postgres/sql_gen.go +++ b/store/datastore/sql/postgres/sql_gen.go @@ -559,6 +559,8 @@ SELECT task_id ,task_data ,task_labels +,task_dependencies +,task_run_on FROM tasks ` diff --git a/store/datastore/sql/sqlite/files/task.sql b/store/datastore/sql/sqlite/files/task.sql index 5b61c7c5e..61890f721 100644 --- a/store/datastore/sql/sqlite/files/task.sql +++ b/store/datastore/sql/sqlite/files/task.sql @@ -4,6 +4,8 @@ SELECT task_id ,task_data ,task_labels +,task_dependencies +,task_run_on FROM tasks -- name: task-delete diff --git a/store/datastore/sql/sqlite/sql_gen.go b/store/datastore/sql/sqlite/sql_gen.go index 4d2798f9d..08a2c1b43 100644 --- a/store/datastore/sql/sqlite/sql_gen.go +++ b/store/datastore/sql/sqlite/sql_gen.go @@ -554,6 +554,8 @@ SELECT task_id ,task_data ,task_labels +,task_dependencies +,task_run_on FROM tasks `