diff --git a/CHANGELOG.md b/CHANGELOG.md index 723e79e..7cbcfb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ ## Unreleased - Execute different types of tasks in separate workers - [#1](https://github.com/ayrat555/fang/pull/1) - +- Add retention mode for tasks - [#2](https://github.com/ayrat555/fang/pull/2) ## 0.2.0 (2021-06-24) diff --git a/src/executor.rs b/src/executor.rs index 29267af..cdc1dcb 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -8,12 +8,50 @@ use std::time::Duration; pub struct Executor { pub storage: Postgres, pub task_type: Option, + pub sleep_params: SleepParams, + pub retention_mode: RetentionMode, +} + +#[derive(Clone)] +pub enum RetentionMode { + KeepAll, + RemoveAll, + RemoveFinished, +} + +#[derive(Clone)] +pub struct SleepParams { pub sleep_period: u64, pub max_sleep_period: u64, pub min_sleep_period: u64, pub sleep_step: u64, } +impl SleepParams { + pub fn maybe_reset_sleep_period(&mut self) { + if self.sleep_period != self.min_sleep_period { + self.sleep_period = self.min_sleep_period; + } + } + + pub fn maybe_increase_sleep_period(&mut self) { + if self.sleep_period < self.max_sleep_period { + self.sleep_period += self.sleep_step; + } + } +} + +impl Default for SleepParams { + fn default() -> Self { + SleepParams { + sleep_period: 5, + max_sleep_period: 15, + min_sleep_period: 5, + sleep_step: 5, + } + } +} + #[derive(Debug)] pub struct Error { pub description: String, @@ -35,10 +73,8 @@ impl Executor { pub fn new(storage: Postgres) -> Self { Self { storage, - sleep_period: 5, - max_sleep_period: 15, - min_sleep_period: 5, - sleep_step: 5, + sleep_params: SleepParams::default(), + retention_mode: RetentionMode::RemoveFinished, task_type: None, } } @@ -47,24 +83,18 @@ impl Executor { self.task_type = Some(task_type); } - pub fn run(&self, task: &Task) { - let actual_task: Box = serde_json::from_value(task.metadata.clone()).unwrap(); + pub fn set_sleep_params(&mut self, sleep_params: SleepParams) { + self.sleep_params = sleep_params; + } - let task_result = panic::catch_unwind(|| actual_task.run()); + pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) { + self.retention_mode = retention_mode; + } - match task_result { - Ok(result) => { - match result { - Ok(()) => self.storage.finish_task(task).unwrap(), - Err(error) => self.storage.fail_task(task, error.description).unwrap(), - }; - } + pub fn run(&self, task: Task) { + let result = self.execute_task(task); - Err(error) => { - let message = format!("panicked during tak execution {:?}", error); - self.storage.fail_task(task, message).unwrap(); - } - } + self.finalize_task(result) } pub fn run_tasks(&mut self) { @@ -72,7 +102,7 @@ impl Executor { match self.storage.fetch_and_touch(&self.task_type.clone()) { Ok(Some(task)) => { self.maybe_reset_sleep_period(); - self.run(&task); + self.run(task); } Ok(None) => { self.sleep(); @@ -88,17 +118,62 @@ impl Executor { } pub fn maybe_reset_sleep_period(&mut self) { - if self.sleep_period != self.min_sleep_period { - self.sleep_period = self.min_sleep_period; - } + self.sleep_params.maybe_reset_sleep_period(); } pub fn sleep(&mut self) { - if self.sleep_period < self.max_sleep_period { - self.sleep_period += self.sleep_step; - } + self.sleep_params.maybe_increase_sleep_period(); - thread::sleep(Duration::from_secs(self.sleep_period)); + thread::sleep(Duration::from_secs(self.sleep_params.sleep_period)); + } + + fn execute_task(&self, task: Task) -> Result { + let actual_task: Box = serde_json::from_value(task.metadata.clone()).unwrap(); + let task_result = panic::catch_unwind(|| actual_task.run()); + + match task_result { + Ok(result) => match result { + Ok(()) => Ok(task), + Err(error) => Err((task, error.description)), + }, + + Err(error) => { + let message = format!("panicked during task execution {:?}", error); + + Err((task, message)) + } + } + } + + fn finalize_task(&self, result: Result) { + match self.retention_mode { + RetentionMode::KeepAll => { + match result { + Ok(task) => self.storage.finish_task(&task).unwrap(), + Err((task, error)) => self.storage.fail_task(&task, error).unwrap(), + }; + + () + } + RetentionMode::RemoveAll => { + match result { + Ok(task) => self.storage.remove_task(task.id).unwrap(), + Err((task, _error)) => self.storage.remove_task(task.id).unwrap(), + }; + + () + } + RetentionMode::RemoveFinished => match result { + Ok(task) => { + self.storage.remove_task(task.id).unwrap(); + () + } + Err((task, error)) => { + self.storage.fail_task(&task, error).unwrap(); + () + } + }, + } } } @@ -106,6 +181,7 @@ impl Executor { mod executor_tests { use super::Error; use super::Executor; + use super::RetentionMode; use super::Runnable; use crate::postgres::NewTask; use crate::postgres::Postgres; @@ -198,7 +274,8 @@ mod executor_tests { task_type: "common".to_string(), }; - let executor = Executor::new(Postgres::new(None)); + let mut executor = Executor::new(Postgres::new()); + executor.set_retention_mode(RetentionMode::KeepAll); executor .storage @@ -208,7 +285,7 @@ mod executor_tests { assert_eq!(FangTaskState::New, task.state); - executor.run(&task); + executor.run(task.clone()); let found_task = executor.storage.find_task_by_id(task.id).unwrap(); @@ -234,7 +311,7 @@ mod executor_tests { task_type: "type2".to_string(), }; - let executor = Executor::new(Postgres::new(None)); + let executor = Executor::new(Postgres::new()); let task1 = executor.storage.insert(&new_task1).unwrap(); let task2 = executor.storage.insert(&new_task2).unwrap(); @@ -243,8 +320,9 @@ mod executor_tests { assert_eq!(FangTaskState::New, task2.state); std::thread::spawn(move || { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); let mut executor = Executor::new(postgres); + executor.set_retention_mode(RetentionMode::KeepAll); executor.set_task_type("type1".to_string()); executor.run_tasks(); @@ -268,7 +346,7 @@ mod executor_tests { task_type: "common".to_string(), }; - let executor = Executor::new(Postgres::new(None)); + let executor = Executor::new(Postgres::new()); executor .storage @@ -278,7 +356,7 @@ mod executor_tests { assert_eq!(FangTaskState::New, task.state); - executor.run(&task); + executor.run(task.clone()); let found_task = executor.storage.find_task_by_id(task.id).unwrap(); @@ -301,7 +379,7 @@ mod executor_tests { task_type: "common".to_string(), }; - let executor = Executor::new(Postgres::new(None)); + let executor = Executor::new(Postgres::new()); executor .storage @@ -311,13 +389,13 @@ mod executor_tests { assert_eq!(FangTaskState::New, task.state); - executor.run(&task); + executor.run(task.clone()); let found_task = executor.storage.find_task_by_id(task.id).unwrap(); assert_eq!(FangTaskState::Failed, found_task.state); assert_eq!( - "panicked during tak execution Any".to_string(), + "panicked during task execution Any".to_string(), found_task.error_message.unwrap() ); diff --git a/src/postgres.rs b/src/postgres.rs index b2196a6..a6be18a 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -33,8 +33,14 @@ pub struct Postgres { } impl Postgres { - pub fn new(database_url: Option) -> Self { - let connection = Self::pg_connection(database_url); + pub fn new() -> Self { + let connection = Self::pg_connection(None); + + Self { connection } + } + + pub fn new_with_url(database_url: String) -> Self { + let connection = Self::pg_connection(Some(database_url)); Self { connection } } @@ -51,7 +57,7 @@ impl Postgres { } pub fn enqueue_task(job: &dyn Runnable) -> Result { - Self::new(None).push_task(job) + Self::new().push_task(job) } pub fn insert(&self, params: &NewTask) -> Result { @@ -83,13 +89,16 @@ impl Postgres { } pub fn find_task_by_id(&self, id: Uuid) -> Option { - match fang_tasks::table + fang_tasks::table .filter(fang_tasks::id.eq(id)) .first::(&self.connection) - { - Ok(record) => Some(record), - _ => None, - } + .ok() + } + + pub fn remove_task(&self, id: Uuid) -> Result { + let query = fang_tasks::table.filter(fang_tasks::id.eq(id)); + + diesel::delete(query).execute(&self.connection) } pub fn finish_task(&self, task: &Task) -> Result { @@ -136,21 +145,18 @@ impl Postgres { } fn fetch_any_task(&self) -> Option { - match fang_tasks::table + fang_tasks::table .order(fang_tasks::created_at.asc()) .limit(1) .filter(fang_tasks::state.eq(FangTaskState::New)) .for_update() .skip_locked() .get_result::(&self.connection) - { - Ok(record) => Some(record), - _ => None, - } + .ok() } fn fetch_task_of_type(&self, task_type: &String) -> Option { - match fang_tasks::table + fang_tasks::table .order(fang_tasks::created_at.asc()) .limit(1) .filter(fang_tasks::state.eq(FangTaskState::New)) @@ -158,10 +164,7 @@ impl Postgres { .for_update() .skip_locked() .get_result::(&self.connection) - { - Ok(record) => Some(record), - _ => None, - } + .ok() } } @@ -182,7 +185,7 @@ mod postgres_tests { #[test] fn insert_inserts_task() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); let new_task = NewTask { metadata: serde_json::json!(true), @@ -199,7 +202,7 @@ mod postgres_tests { #[test] fn fetch_task_fetches_the_oldest_task() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); postgres.connection.test_transaction::<(), Error, _>(|| { let timestamp1 = Utc::now() - Duration::hours(40); @@ -220,7 +223,7 @@ mod postgres_tests { #[test] fn finish_task_updates_state_field() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); postgres.connection.test_transaction::<(), Error, _>(|| { let task = insert_new_job(&postgres.connection); @@ -235,7 +238,7 @@ mod postgres_tests { #[test] fn fail_task_updates_state_field_and_sets_error_message() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); postgres.connection.test_transaction::<(), Error, _>(|| { let task = insert_new_job(&postgres.connection); @@ -252,7 +255,7 @@ mod postgres_tests { #[test] fn fetch_and_touch_updates_state() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); postgres.connection.test_transaction::<(), Error, _>(|| { let _task = insert_new_job(&postgres.connection); @@ -267,7 +270,7 @@ mod postgres_tests { #[test] fn fetch_and_touch_returns_none() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); postgres.connection.test_transaction::<(), Error, _>(|| { let task = postgres.fetch_and_touch(&None).unwrap(); @@ -280,7 +283,7 @@ mod postgres_tests { #[test] fn push_task_serializes_and_inserts_task() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); postgres.connection.test_transaction::<(), Error, _>(|| { let job = Job { number: 10 }; @@ -302,11 +305,43 @@ mod postgres_tests { }); } + #[test] + fn remove_task() { + let postgres = Postgres::new(); + + let new_task1 = NewTask { + metadata: serde_json::json!(true), + task_type: "common".to_string(), + }; + + let new_task2 = NewTask { + metadata: serde_json::json!(true), + task_type: "common".to_string(), + }; + + postgres.connection.test_transaction::<(), Error, _>(|| { + let task1 = postgres.insert(&new_task1).unwrap(); + assert!(postgres.find_task_by_id(task1.id).is_some()); + + let task2 = postgres.insert(&new_task2).unwrap(); + assert!(postgres.find_task_by_id(task2.id).is_some()); + + postgres.remove_task(task1.id).unwrap(); + assert!(postgres.find_task_by_id(task1.id).is_none()); + assert!(postgres.find_task_by_id(task2.id).is_some()); + + postgres.remove_task(task2.id).unwrap(); + assert!(postgres.find_task_by_id(task2.id).is_none()); + + Ok(()) + }); + } + // this test is ignored because it commits data to the db #[test] #[ignore] fn fetch_task_locks_the_record() { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); let timestamp1 = Utc::now() - Duration::hours(40); let task1 = insert_job(serde_json::json!(true), timestamp1, &postgres.connection); @@ -318,7 +353,7 @@ mod postgres_tests { let task2 = insert_job(serde_json::json!(false), timestamp2, &postgres.connection); let thread = std::thread::spawn(move || { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); postgres.connection.transaction::<(), Error, _>(|| { let found_task = postgres.fetch_task(&None).unwrap(); diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 8a68adb..27a1640 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -1,48 +1,89 @@ use crate::executor::Executor; +use crate::executor::RetentionMode; +use crate::executor::SleepParams; use crate::postgres::Postgres; use std::thread; pub struct WorkerPool { pub number_of_workers: u16, - pub task_type: Option, + pub worker_params: WorkerParams, } pub struct WorkerThread { pub name: String, - pub task_type: Option, + pub worker_params: WorkerParams, pub restarts: u64, } +#[derive(Clone)] +pub struct WorkerParams { + pub retention_mode: Option, + pub sleep_params: Option, + pub task_type: Option, +} + +impl WorkerParams { + pub fn new() -> Self { + Self { + retention_mode: None, + sleep_params: None, + task_type: None, + } + } + + pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) { + self.retention_mode = Some(retention_mode); + } + + pub fn set_sleep_params(&mut self, sleep_params: SleepParams) { + self.sleep_params = Some(sleep_params); + } + + pub fn set_task_type(&mut self, task_type: String) { + self.task_type = Some(task_type); + } +} + impl WorkerPool { - pub fn new(number_of_workers: u16, task_type: Option) -> Self { + pub fn new(number_of_workers: u16) -> Self { + let worker_params = WorkerParams::new(); + Self { number_of_workers, - task_type, + worker_params, + } + } + + pub fn new_with_params(number_of_workers: u16, worker_params: WorkerParams) -> Self { + Self { + number_of_workers, + worker_params, } } pub fn start(&self) { for idx in 1..self.number_of_workers + 1 { - let name = format!( - "worker_{}{}", - self.task_type.clone().unwrap_or("".to_string()), - idx - ); - WorkerThread::spawn_in_pool(self.task_type.clone(), name, 0) + let worker_type = self + .worker_params + .task_type + .clone() + .unwrap_or("".to_string()); + let name = format!("worker_{}{}", worker_type, idx); + WorkerThread::spawn_in_pool(self.worker_params.clone(), name, 0) } } } impl WorkerThread { - pub fn new(task_type: Option, name: String, restarts: u64) -> Self { + pub fn new(worker_params: WorkerParams, name: String, restarts: u64) -> Self { Self { name, - task_type, + worker_params, restarts, } } - pub fn spawn_in_pool(task_type: Option, name: String, restarts: u64) { + pub fn spawn_in_pool(worker_params: WorkerParams, name: String, restarts: u64) { let builder = thread::Builder::new().name(name.clone()); info!( @@ -53,16 +94,24 @@ impl WorkerThread { builder .spawn(move || { // when _job is dropped, it will be restarted (see Drop trait impl) - let _job = WorkerThread::new(task_type.clone(), name, restarts); + let _job = WorkerThread::new(worker_params.clone(), name, restarts); - let postgres = Postgres::new(None); + let postgres = Postgres::new(); let mut executor = Executor::new(postgres); - if let Some(task_type_str) = task_type { + if let Some(task_type_str) = worker_params.task_type { executor.set_task_type(task_type_str); } + if let Some(retention_mode) = worker_params.retention_mode { + executor.set_retention_mode(retention_mode); + } + + if let Some(sleep_params) = worker_params.sleep_params { + executor.set_sleep_params(sleep_params); + } + executor.run_tasks(); }) .unwrap(); @@ -71,7 +120,11 @@ impl WorkerThread { impl Drop for WorkerThread { fn drop(&mut self) { - WorkerThread::spawn_in_pool(self.task_type.clone(), self.name.clone(), self.restarts + 1) + WorkerThread::spawn_in_pool( + self.worker_params.clone(), + self.name.clone(), + self.restarts + 1, + ) } } @@ -114,7 +167,7 @@ mod job_pool_tests { #[typetag::serde] impl Runnable for MyJob { fn run(&self) -> Result<(), Error> { - let postgres = Postgres::new(None); + let postgres = Postgres::new(); thread::sleep(Duration::from_secs(3)); @@ -132,8 +185,8 @@ mod job_pool_tests { fn tasks_are_split_between_two_threads() { env_logger::init(); - let postgres = Postgres::new(None); - let job_pool = WorkerPool::new(2, None); + let postgres = Postgres::new(); + let job_pool = WorkerPool::new(2); postgres.push_task(&MyJob::new(0)).unwrap(); postgres.push_task(&MyJob::new(0)).unwrap();