diff --git a/migrations/2021-06-05-112912_create_fang_tasks/down.sql b/migrations/2021-06-05-112912_create_fang_tasks/down.sql
index e8becd4..3658a90 100644
--- a/migrations/2021-06-05-112912_create_fang_tasks/down.sql
+++ b/migrations/2021-06-05-112912_create_fang_tasks/down.sql
@@ -1,2 +1,4 @@
 DROP TABLE fang_tasks;
 DROP TYPE fang_task_state;
+
+DROP TABLE fang_periodic_tasks;
diff --git a/migrations/2021-06-05-112912_create_fang_tasks/up.sql b/migrations/2021-06-05-112912_create_fang_tasks/up.sql
index 075415a..a60140d 100644
--- a/migrations/2021-06-05-112912_create_fang_tasks/up.sql
+++ b/migrations/2021-06-05-112912_create_fang_tasks/up.sql
@@ -15,3 +15,16 @@ CREATE TABLE fang_tasks (
 CREATE INDEX fang_tasks_state_index ON fang_tasks(state);
 CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);
 CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);
+CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata);
+
+CREATE TABLE fang_periodic_tasks (
+    id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
+    metadata jsonb NOT NULL,
+    period_in_seconds INTEGER NOT NULL,
+    scheduled_at TIMESTAMP WITH TIME ZONE,
+    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
+);
+
+CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);
+CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);
diff --git a/src/lib.rs b/src/lib.rs
index 7d01087..fe20481 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,6 +10,7 @@ mod schema;
 pub mod executor;
 pub mod postgres;
+pub mod scheduler;
 pub mod worker_pool;
 
 pub use executor::*;
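Note on the data model before the Rust changes: the new `metadata` indexes back the exact-match lookups added in `src/postgres.rs` below, and `scheduled_at` is nullable because a freshly registered periodic task has no schedule yet; the scheduler picks up `NULL` rows and assigns them their first run. The next-run arithmetic that `schedule_next_task_execution` implements further down is simply "now plus period". A minimal sketch of that computation (the `next_run` helper is illustrative only, not part of this diff):

```rust
use chrono::{DateTime, Duration, Utc};

// Next runs are measured from the current time, not from the previous
// scheduled_at, so a late scheduler tick shifts the schedule forward
// rather than bunching up missed runs.
fn next_run(now: DateTime<Utc>, period_in_seconds: i32) -> DateTime<Utc> {
    now + Duration::seconds(period_in_seconds.into())
}

fn main() {
    println!("next run at {}", next_run(Utc::now(), 60));
}
```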
diff --git a/src/postgres.rs b/src/postgres.rs
index 05df5c5..56c8039 100644
--- a/src/postgres.rs
+++ b/src/postgres.rs
@@ -1,7 +1,10 @@
 use crate::executor::Runnable;
+use crate::schema::fang_periodic_tasks;
 use crate::schema::fang_tasks;
 use crate::schema::FangTaskState;
-use chrono::{DateTime, Utc};
+use chrono::DateTime;
+use chrono::Duration;
+use chrono::Utc;
 use diesel::pg::PgConnection;
 use diesel::prelude::*;
 use diesel::result::Error;
@@ -21,6 +24,17 @@ pub struct Task {
     pub updated_at: DateTime<Utc>,
 }
 
+#[derive(Queryable, Identifiable, Debug, Eq, PartialEq, Clone)]
+#[table_name = "fang_periodic_tasks"]
+pub struct PeriodicTask {
+    pub id: Uuid,
+    pub metadata: serde_json::Value,
+    pub period_in_seconds: i32,
+    pub scheduled_at: Option<DateTime<Utc>>,
+    pub created_at: DateTime<Utc>,
+    pub updated_at: DateTime<Utc>,
+}
+
 #[derive(Insertable)]
 #[table_name = "fang_tasks"]
 pub struct NewTask {
@@ -28,6 +42,13 @@ pub struct NewTask {
     pub task_type: String,
 }
 
+#[derive(Insertable)]
+#[table_name = "fang_periodic_tasks"]
+pub struct NewPeriodicTask {
+    pub metadata: serde_json::Value,
+    pub period_in_seconds: i32,
+}
+
 pub struct Postgres {
     pub connection: PgConnection,
 }
@@ -54,12 +75,38 @@ impl Postgres {
     pub fn push_task(&self, job: &dyn Runnable) -> Result<Task, Error> {
         let json_job = serde_json::to_value(job).unwrap();
 
-        let new_task = NewTask {
-            metadata: json_job,
-            task_type: job.task_type(),
-        };
+        match self.find_task_by_metadata(&json_job) {
+            Some(task) => Ok(task),
+            None => {
+                let new_task = NewTask {
+                    metadata: json_job.clone(),
+                    task_type: job.task_type(),
+                };
+                self.insert(&new_task)
+            }
+        }
+    }
 
-        self.insert(&new_task)
+    pub fn push_periodic_task(
+        &self,
+        job: &dyn Runnable,
+        period: i32,
+    ) -> Result<PeriodicTask, Error> {
+        let json_job = serde_json::to_value(job).unwrap();
+
+        match self.find_periodic_task_by_metadata(&json_job) {
+            Some(task) => Ok(task),
+            None => {
+                let new_task = NewPeriodicTask {
+                    metadata: json_job,
+                    period_in_seconds: period,
+                };
+
+                diesel::insert_into(fang_periodic_tasks::table)
+                    .values(new_task)
+                    .get_result::<PeriodicTask>(&self.connection)
+            }
+        }
     }
 
     pub fn enqueue_task(job: &dyn Runnable) -> Result<Task, Error> {
@@ -101,6 +148,50 @@ impl Postgres {
         .ok()
     }
 
+    pub fn find_periodic_task_by_id(&self, id: Uuid) -> Option<PeriodicTask> {
+        fang_periodic_tasks::table
+            .filter(fang_periodic_tasks::id.eq(id))
+            .first::<PeriodicTask>(&self.connection)
+            .ok()
+    }
+
+    pub fn fetch_periodic_tasks(&self, error_margin_seconds: i64) -> Option<Vec<PeriodicTask>> {
+        let current_time = Self::current_time();
+
+        let low_limit = current_time - Duration::seconds(error_margin_seconds);
+        let high_limit = current_time + Duration::seconds(error_margin_seconds);
+
+        fang_periodic_tasks::table
+            .filter(
+                fang_periodic_tasks::scheduled_at
+                    .gt(low_limit)
+                    .and(fang_periodic_tasks::scheduled_at.lt(high_limit)),
+            )
+            .or_filter(fang_periodic_tasks::scheduled_at.is_null())
+            .load::<PeriodicTask>(&self.connection)
+            .ok()
+    }
+
+    pub fn schedule_next_task_execution(&self, task: &PeriodicTask) -> Result<PeriodicTask, Error> {
+        let current_time = Self::current_time();
+        let scheduled_at = current_time + Duration::seconds(task.period_in_seconds.into());
+
+        diesel::update(task)
+            .set((
+                fang_periodic_tasks::scheduled_at.eq(scheduled_at),
+                fang_periodic_tasks::updated_at.eq(current_time),
+            ))
+            .get_result::<PeriodicTask>(&self.connection)
+    }
+
+    pub fn remove_all_tasks(&self) -> Result<usize, Error> {
+        diesel::delete(fang_tasks::table).execute(&self.connection)
+    }
+
+    pub fn remove_all_periodic_tasks(&self) -> Result<usize, Error> {
+        diesel::delete(fang_periodic_tasks::table).execute(&self.connection)
+    }
+
     pub fn remove_task(&self, id: Uuid) -> Result<usize, Error> {
         let query = fang_tasks::table.filter(fang_tasks::id.eq(id));
 
@@ -172,19 +263,41 @@ impl Postgres {
            .get_result::<Task>(&self.connection)
            .ok()
     }
+
+    fn find_periodic_task_by_metadata(&self, metadata: &serde_json::Value) -> Option<PeriodicTask> {
+        fang_periodic_tasks::table
+            .filter(fang_periodic_tasks::metadata.eq(metadata))
+            .first::<PeriodicTask>(&self.connection)
+            .ok()
+    }
+
+    fn find_task_by_metadata(&self, metadata: &serde_json::Value) -> Option<Task> {
+        fang_tasks::table
+            .filter(fang_tasks::metadata.eq(metadata))
+            .filter(
+                fang_tasks::state
+                    .eq(FangTaskState::New)
+                    .or(fang_tasks::state.eq(FangTaskState::InProgress)),
+            )
+            .first::<Task>(&self.connection)
+            .ok()
+    }
 }
 
 #[cfg(test)]
 mod postgres_tests {
     use super::NewTask;
+    use super::PeriodicTask;
     use super::Postgres;
     use super::Task;
     use crate::executor::Error as ExecutorError;
     use crate::executor::Runnable;
+    use crate::schema::fang_periodic_tasks;
     use crate::schema::fang_tasks;
     use crate::schema::FangTaskState;
     use crate::typetag;
     use crate::{Deserialize, Serialize};
+    use chrono::prelude::*;
     use chrono::{DateTime, Duration, Utc};
     use diesel::connection::Connection;
     use diesel::prelude::*;
@@ -312,6 +425,174 @@ mod postgres_tests {
         });
     }
 
+    #[test]
+    fn push_task_does_not_insert_the_same_task() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let job = Job { number: 10 };
+            let task2 = postgres.push_task(&job).unwrap();
+
+            let task1 = postgres.push_task(&job).unwrap();
+
+            assert_eq!(task1.id, task2.id);
+
+            Ok(())
+        });
+    }
+
+    #[test]
+    fn push_periodic_task() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let job = Job { number: 10 };
+            let task = postgres.push_periodic_task(&job, 60).unwrap();
+
+            assert_eq!(task.period_in_seconds, 60);
+            assert!(postgres.find_periodic_task_by_id(task.id).is_some());
+
+            Ok(())
+        });
+    }
+
+    #[test]
+    fn push_periodic_task_returns_existing_job() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let job = Job { number: 10 };
+            let task1 = postgres.push_periodic_task(&job, 60).unwrap();
+
+            let task2 = postgres.push_periodic_task(&job, 60).unwrap();
+
+            assert_eq!(task1.id, task2.id);
+
+            Ok(())
+        });
+    }
+
+    #[test]
+    fn fetch_periodic_tasks_fetches_periodic_task_without_scheduled_at() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let job = Job { number: 10 };
+            let task = postgres.push_periodic_task(&job, 60).unwrap();
+
+            let schedule_in_future = Utc::now() + Duration::hours(100);
+
+            insert_periodic_job(
+                serde_json::json!(true),
+                schedule_in_future,
+                100,
+                &postgres.connection,
+            );
+
+            let tasks = postgres.fetch_periodic_tasks(100).unwrap();
+
+            assert_eq!(tasks.len(), 1);
+            assert_eq!(tasks[0].id, task.id);
+
+            Ok(())
+        });
+    }
+
+    #[test]
+    fn schedule_next_task_execution() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let task = insert_periodic_job(
+                serde_json::json!(true),
+                Utc::now(),
+                100,
+                &postgres.connection,
+            );
+
+            let updated_task = postgres.schedule_next_task_execution(&task).unwrap();
+
+            let next_schedule = (task.scheduled_at.unwrap()
+                + Duration::seconds(task.period_in_seconds.into()))
+            .round_subsecs(0);
+
+            assert_eq!(
+                next_schedule,
+                updated_task.scheduled_at.unwrap().round_subsecs(0)
+            );
+
+            Ok(())
+        });
+    }
+
+    #[test]
+    fn remove_all_periodic_tasks() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let task = insert_periodic_job(
+                serde_json::json!(true),
+                Utc::now(),
+                100,
+                &postgres.connection,
+            );
+
+            let result = postgres.remove_all_periodic_tasks().unwrap();
+
+            assert_eq!(1, result);
+
+            assert_eq!(None, postgres.find_periodic_task_by_id(task.id));
+
+            Ok(())
+        });
+    }
+
+    #[test]
+    fn remove_all_tasks() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let task = insert_job(serde_json::json!(true), Utc::now(), &postgres.connection);
+            let result = postgres.remove_all_tasks().unwrap();
+
+            assert_eq!(1, result);
+
+            assert_eq!(None, postgres.find_task_by_id(task.id));
+
+            Ok(())
+        });
+    }
+
+    #[test]
+    fn fetch_periodic_tasks() {
+        let postgres = Postgres::new();
+
+        postgres.connection.test_transaction::<(), Error, _>(|| {
+            let schedule_in_future = Utc::now() + Duration::hours(100);
+
+            insert_periodic_job(
+                serde_json::json!(true),
+                schedule_in_future,
+                100,
+                &postgres.connection,
+            );
+
+            let task = insert_periodic_job(
+                serde_json::json!(true),
+                Utc::now(),
+                100,
+                &postgres.connection,
+            );
+
+            let tasks = postgres.fetch_periodic_tasks(100).unwrap();
+
+            assert_eq!(tasks.len(), 1);
+            assert_eq!(tasks[0].id, task.id);
+
+            Ok(())
+        });
+    }
+
     #[test]
     fn remove_task() {
         let postgres = Postgres::new();
@@ -351,13 +632,21 @@ mod postgres_tests {
         let postgres = Postgres::new();
 
         let timestamp1 = Utc::now() - Duration::hours(40);
-        let task1 = insert_job(serde_json::json!(true), timestamp1, &postgres.connection);
+        let task1 = insert_job(
+            serde_json::json!(Job { number: 12 }),
+            timestamp1,
+            &postgres.connection,
+        );
 
         let task1_id = task1.id;
 
         let timestamp2 = Utc::now() - Duration::hours(20);
-        let task2 = insert_job(serde_json::json!(false), timestamp2, &postgres.connection);
+        let task2 = insert_job(
+            serde_json::json!(Job { number: 11 }),
+            timestamp2,
+            &postgres.connection,
+        );
 
         let thread = std::thread::spawn(move || {
             let postgres = Postgres::new();
@@ -416,6 +705,22 @@ mod postgres_tests {
             .unwrap()
     }
 
+    fn insert_periodic_job(
+        metadata: serde_json::Value,
+        timestamp: DateTime<Utc>,
+        period_in_seconds: i32,
+        connection: &PgConnection,
+    ) -> PeriodicTask {
+        diesel::insert_into(fang_periodic_tasks::table)
+            .values(&vec![(
+                fang_periodic_tasks::metadata.eq(metadata),
+                fang_periodic_tasks::scheduled_at.eq(timestamp),
+                fang_periodic_tasks::period_in_seconds.eq(period_in_seconds),
+            )])
+            .get_result::<PeriodicTask>(connection)
+            .unwrap()
+    }
+
     fn insert_new_job(connection: &PgConnection) -> Task {
         diesel::insert_into(fang_tasks::table)
             .values(&vec![(fang_tasks::metadata.eq(serde_json::json!(true)),)])
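A behavioral note that is easy to miss in the hunks above: both `push_task` and `push_periodic_task` now have insert-or-return semantics, keyed on the serialized job. A sketch of a hypothetical caller (the `SendEmail` type is invented for illustration; like the crate's tests, this assumes `DATABASE_URL` points at a migrated database):

```rust
use fang::executor::{Error, Runnable};
use fang::postgres::Postgres;
use fang::typetag;
use fang::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct SendEmail {
    user_id: i32,
}

#[typetag::serde]
impl Runnable for SendEmail {
    fn run(&self) -> Result<(), Error> {
        // Job body elided; the struct's serialized form is what lands in `metadata`.
        Ok(())
    }

    fn task_type(&self) -> String {
        "email".to_string()
    }
}

fn main() {
    let postgres = Postgres::new();
    let job = SendEmail { user_id: 1 };

    // Identical metadata plus an existing New/InProgress row means the second
    // push returns the existing task instead of inserting a duplicate.
    let first = postgres.push_task(&job).unwrap();
    let second = postgres.push_task(&job).unwrap();
    assert_eq!(first.id, second.id);

    // Same idea for periodic tasks: re-registering on every boot is safe.
    let p1 = postgres.push_periodic_task(&job, 60).unwrap();
    let p2 = postgres.push_periodic_task(&job, 60).unwrap();
    assert_eq!(p1.id, p2.id);
}
```

The dedup key is the full serialized payload, so two `SendEmail { user_id: 1 }` jobs collapse into one row while `user_id: 2` does not; this is also why the `remove_task` test above switches to distinct `Job` payloads.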
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000..abcfaea
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,133 @@
+use crate::executor::Runnable;
+use crate::postgres::PeriodicTask;
+use crate::postgres::Postgres;
+use std::thread;
+use std::time::Duration;
+
+pub struct Scheduler {
+    pub check_period: u64,
+    pub error_margin_seconds: u64,
+    pub postgres: Postgres,
+}
+
+impl Drop for Scheduler {
+    fn drop(&mut self) {
+        Scheduler::start(
+            self.check_period,
+            self.error_margin_seconds,
+            Postgres::new(),
+        )
+    }
+}
+
+impl Scheduler {
+    pub fn start(check_period: u64, error_margin_seconds: u64, postgres: Postgres) {
+        let builder = thread::Builder::new().name("scheduler".to_string());
+
+        builder
+            .spawn(move || {
+                let scheduler = Self::new(check_period, error_margin_seconds, postgres);
+
+                scheduler.schedule_loop();
+            })
+            .unwrap();
+    }
+
+    pub fn new(check_period: u64, error_margin_seconds: u64, postgres: Postgres) -> Self {
+        Self {
+            check_period,
+            postgres,
+            error_margin_seconds,
+        }
+    }
+
+    pub fn schedule_loop(&self) {
+        let sleep_duration = Duration::from_secs(self.check_period);
+
+        loop {
+            self.schedule();
+
+            thread::sleep(sleep_duration);
+        }
+    }
+
+    pub fn schedule(&self) {
+        if let Some(tasks) = self
+            .postgres
+            .fetch_periodic_tasks(self.error_margin_seconds as i64)
+        {
+            for task in tasks {
+                self.process_task(task);
+            }
+        };
+    }
+
+    fn process_task(&self, task: PeriodicTask) {
+        match task.scheduled_at {
+            None => {
+                self.postgres.schedule_next_task_execution(&task).unwrap();
+            }
+            Some(_) => {
+                let actual_task: Box<dyn Runnable> =
+                    serde_json::from_value(task.metadata.clone()).unwrap();
+
+                self.postgres.push_task(&(*actual_task)).unwrap();
+
+                self.postgres.schedule_next_task_execution(&task).unwrap();
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod job_scheduler_tests {
+    use super::Scheduler;
+    use crate::executor::Error;
+    use crate::executor::Runnable;
+    use crate::postgres::Postgres;
+    use crate::postgres::Task;
+    use crate::schema::fang_tasks;
+    use crate::typetag;
+    use crate::{Deserialize, Serialize};
+    use diesel::pg::PgConnection;
+    use diesel::prelude::*;
+    use std::thread;
+    use std::time::Duration;
+
+    #[derive(Serialize, Deserialize)]
+    struct ScheduledJob {}
+
+    #[typetag::serde]
+    impl Runnable for ScheduledJob {
+        fn run(&self) -> Result<(), Error> {
+            Ok(())
+        }
+
+        fn task_type(&self) -> String {
+            "schedule".to_string()
+        }
+    }
+
+    #[test]
+    #[ignore]
+    fn schedules_jobs() {
+        let postgres = Postgres::new();
+
+        postgres.push_periodic_task(&ScheduledJob {}, 10).unwrap();
+        Scheduler::start(1, 2, Postgres::new());
+
+        let sleep_duration = Duration::from_secs(15);
+        thread::sleep(sleep_duration);
+
+        let tasks = get_all_tasks(&postgres.connection);
+
+        assert_eq!(1, tasks.len());
+    }
+
+    fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
+        fang_tasks::table
+            .filter(fang_tasks::task_type.eq("schedule"))
+            .get_results::<Task>(conn)
+            .unwrap()
+    }
+}
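For context, this is how the new module is meant to be driven, mirroring the ignored `schedules_jobs` test above (`MyCronJob` is an invented name; same `DATABASE_URL` assumption as before):

```rust
use fang::executor::{Error, Runnable};
use fang::postgres::Postgres;
use fang::scheduler::Scheduler;
use fang::typetag;
use fang::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct MyCronJob {}

#[typetag::serde]
impl Runnable for MyCronJob {
    fn run(&self) -> Result<(), Error> {
        println!("tick");
        Ok(())
    }
}

fn main() {
    // Register the periodic task; push_periodic_task dedups by metadata,
    // so running this on every boot leaves a single row.
    Postgres::new().push_periodic_task(&MyCronJob {}, 10).unwrap();

    // Poll fang_periodic_tasks every second; rows whose scheduled_at falls
    // within ±2s of now (or is NULL) count as due. Each due row gets a
    // regular fang_tasks entry pushed and its next run scheduled.
    Scheduler::start(1, 2, Postgres::new());

    // The scheduler runs on its own named thread; keep the process alive.
    std::thread::park();
}
```

One design note worth flagging: the `Drop` impl calls `Scheduler::start` again, which reads as a self-restart mechanism — if the scheduler thread panics and unwinds, dropping the `Scheduler` respawns a fresh thread against a new connection.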
diff --git a/src/schema.rs b/src/schema.rs
index 5a20286..a8cfc78 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -28,3 +28,14 @@ table! {
         updated_at -> Timestamptz,
     }
 }
+
+table! {
+    fang_periodic_tasks (id) {
+        id -> Uuid,
+        metadata -> Jsonb,
+        period_in_seconds -> Int4,
+        scheduled_at -> Nullable<Timestamptz>,
+        created_at -> Timestamptz,
+        updated_at -> Timestamptz,
+    }
+}
diff --git a/src/worker_pool.rs b/src/worker_pool.rs
index f00db58..2a32658 100644
--- a/src/worker_pool.rs
+++ b/src/worker_pool.rs
@@ -170,7 +170,10 @@ mod job_pool_tests {
     }
 
     fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
-        fang_tasks::table.get_results::<Task>(conn).unwrap()
+        fang_tasks::table
+            .filter(fang_tasks::task_type.eq("worker_pool_test"))
+            .get_results::<Task>(conn)
+            .unwrap()
     }
 
     #[typetag::serde]
@@ -186,6 +189,10 @@ mod job_pool_tests {
 
             Ok(())
         }
+
+        fn task_type(&self) -> String {
+            "worker_pool_test".to_string()
+        }
     }
 
     // this test is ignored because it commits data to the db
@@ -200,8 +207,8 @@ mod job_pool_tests {
         worker_params.set_retention_mode(RetentionMode::KeepAll);
         let job_pool = WorkerPool::new_with_params(2, worker_params);
 
-        postgres.push_task(&MyJob::new(0)).unwrap();
-        postgres.push_task(&MyJob::new(0)).unwrap();
+        postgres.push_task(&MyJob::new(100)).unwrap();
+        postgres.push_task(&MyJob::new(200)).unwrap();
 
         job_pool.start();