diff --git a/Cargo.toml b/Cargo.toml index 19e16ff..7528ff9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fang" -version = "0.4.2" +version = "0.5.0" authors = ["Ayrat Badykov "] description = "Background job processing library for Rust" repository = "https://github.com/ayrat555/fang" @@ -20,3 +20,4 @@ serde_json = "1.0" typetag = "0.1" log = "0.4" serde = { version = "1.0", features = ["derive"] } +thiserror = "1.0.29" diff --git a/README.md b/README.md index b02d4f0..da87fe6 100644 --- a/README.md +++ b/README.md @@ -96,10 +96,12 @@ use fang::WorkerPool; WorkerPool::new(10).start(); ``` +Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the +Simple Worker for an example implementation. Check out: -- [simple example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example +- [Simple Worker Example](https://github.com/ayrat555/fang/tree/master/fang_examples/simple_worker) - simple worker example - [El Monitorro](https://github.com/ayrat555/el_monitorro) - telegram feed reader. It uses Fang to synchronize feeds and deliver updates to users. ### Configuration @@ -231,6 +233,28 @@ In the example above, `push_periodic_task` is used to save the specified task to 4. Push to the branch (`git push origin my-new-feature`) 5. Create new Pull Request +### Running tests locally + +``` +cargo install diesel_cli + +docker run --rm -d --name postgres -p 5432:5432 \ + -e POSTGRES_DB=fang \ + -e POSTGRES_USER=postgres \ + -e POSTGRES_PASSWORD=postgres \ + postgres:latest + +DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run + +// Run regular tests +cargo test --all-features + +// Run dirty/long tests, DB must be recreated afterwards +cargo test --all-features -- --ignored --test-threads=1 + +docker kill postgres +``` + ## Author Ayrat Badykov (@ayrat555) @@ -242,3 +266,4 @@ Ayrat Badykov (@ayrat555) [docs]: https://docs.rs/fang/ [ga-test]: https://github.com/ayrat555/fang/actions/workflows/rust.yml/badge.svg [ga-style]: https://github.com/ayrat555/fang/actions/workflows/style.yml/badge.svg +[signal-hook]: https://crates.io/crates/signal-hook diff --git a/fang_examples/simple_worker/Cargo.toml b/fang_examples/simple_worker/Cargo.toml index c30f3f2..f1c3328 100644 --- a/fang_examples/simple_worker/Cargo.toml +++ b/fang_examples/simple_worker/Cargo.toml @@ -8,5 +8,6 @@ edition = "2018" [dependencies] fang = { path = "../../" } serde = { version = "1.0", features = ["derive"] } +signal-hook = "0.3.10" dotenv = "0.15.0" env_logger = "0.9.0" diff --git a/fang_examples/simple_worker/src/main.rs b/fang_examples/simple_worker/src/main.rs index e2b68ba..8a84982 100644 --- a/fang_examples/simple_worker/src/main.rs +++ b/fang_examples/simple_worker/src/main.rs @@ -4,6 +4,7 @@ use fang::RetentionMode; use fang::WorkerParams; use fang::WorkerPool; use simple_worker::MyJob; +use signal_hook::{consts::signal::*, iterator::Signals}; fn main() { dotenv().ok(); @@ -13,12 +14,28 @@ fn main() { let mut worker_params = WorkerParams::new(); worker_params.set_retention_mode(RetentionMode::KeepAll); - WorkerPool::new_with_params(2, worker_params).start(); + let mut worker_pool = WorkerPool::new_with_params(2, worker_params); + worker_pool.start().unwrap(); let queue = Queue::new(); queue.push_task(&MyJob::new(1)).unwrap(); queue.push_task(&MyJob::new(1000)).unwrap(); - std::thread::park(); + let mut signals = Signals::new(&[SIGINT, SIGTERM]).unwrap(); + for sig in signals.forever() { + match sig { + SIGINT => { + println!("Received SIGINT"); + worker_pool.shutdown().unwrap(); + break; + }, + SIGTERM => { + println!("Received SIGTERM"); + worker_pool.shutdown().unwrap(); + break; + }, + _ => unreachable!(format!("Received unexpected signal: {:?}", sig)), + } + } } diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..a1cb125 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,21 @@ +use std::io::Error as IoError; +use std::sync::PoisonError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum FangError { + #[error("The shared state in an executor thread became poisoned")] + PoisonedLock, + + #[error("Failed to create executor thread")] + ExecutorThreadCreationFailed { + #[from] + source: IoError, + }, +} + +impl From> for FangError { + fn from(_: PoisonError) -> Self { + Self::PoisonedLock + } +} diff --git a/src/executor.rs b/src/executor.rs index 2e6dd74..1255b7b 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,5 +1,7 @@ +use crate::error::FangError; use crate::queue::Queue; use crate::queue::Task; +use crate::worker_pool::{SharedState, WorkerState}; use diesel::pg::PgConnection; use diesel::r2d2::{ConnectionManager, PooledConnection}; use log::error; @@ -11,6 +13,7 @@ pub struct Executor { pub task_type: Option, pub sleep_params: SleepParams, pub retention_mode: RetentionMode, + shared_state: Option, } #[derive(Clone)] @@ -74,9 +77,14 @@ impl Executor { sleep_params: SleepParams::default(), retention_mode: RetentionMode::RemoveFinished, task_type: None, + shared_state: None, } } + pub fn set_shared_state(&mut self, shared_state: SharedState) { + self.shared_state = Some(shared_state); + } + pub fn set_task_type(&mut self, task_type: String) { self.task_type = Some(task_type); } @@ -91,12 +99,18 @@ impl Executor { pub fn run(&self, task: Task) { let result = self.execute_task(task); - self.finalize_task(result) } - pub fn run_tasks(&mut self) { + pub fn run_tasks(&mut self) -> Result<(), FangError> { loop { + if let Some(ref shared_state) = self.shared_state { + let shared_state = shared_state.read()?; + if let WorkerState::Shutdown = *shared_state { + return Ok(()); + } + } + match Queue::fetch_and_touch_query(&self.pooled_connection, &self.task_type.clone()) { Ok(Some(task)) => { self.maybe_reset_sleep_period(); @@ -301,7 +315,7 @@ mod executor_tests { executor.set_retention_mode(RetentionMode::KeepAll); executor.set_task_type("type1".to_string()); - executor.run_tasks(); + executor.run_tasks().unwrap(); }); std::thread::sleep(std::time::Duration::from_millis(1000)); diff --git a/src/lib.rs b/src/lib.rs index 10414fd..1ac552d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,12 +3,14 @@ #[macro_use] extern crate diesel; +pub mod error; pub mod executor; pub mod queue; pub mod scheduler; pub mod schema; pub mod worker_pool; +pub use error::FangError; pub use executor::*; pub use queue::*; pub use scheduler::*; diff --git a/src/worker_pool.rs b/src/worker_pool.rs index ef20219..00c452b 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -1,24 +1,37 @@ use crate::diesel::r2d2; use crate::diesel::PgConnection; +use crate::error::FangError; use crate::executor::Executor; use crate::executor::RetentionMode; use crate::executor::SleepParams; use crate::queue::Queue; use log::error; use log::info; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; use std::thread; +#[derive(Clone)] pub struct WorkerPool { pub number_of_workers: u32, pub worker_params: WorkerParams, pub connection_pool: r2d2::Pool>, + shared_state: SharedState, + thread_join_handles: Arc>>>, } pub struct WorkerThread { pub name: String, - pub worker_params: WorkerParams, pub restarts: u64, - pub connection_pool: r2d2::Pool>, + pub worker_pool: WorkerPool, + graceful_shutdown: bool, +} + +pub type SharedState = Arc>; + +pub enum WorkerState { + Running, + Shutdown, } #[derive(Clone)] @@ -65,6 +78,10 @@ impl WorkerPool { number_of_workers, worker_params, connection_pool, + shared_state: Arc::new(RwLock::new(WorkerState::Running)), + thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity( + number_of_workers as usize, + ))), } } @@ -75,10 +92,14 @@ impl WorkerPool { number_of_workers, worker_params, connection_pool, + shared_state: Arc::new(RwLock::new(WorkerState::Running)), + thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity( + number_of_workers as usize, + ))), } } - pub fn start(&self) { + pub fn start(&mut self) -> Result<(), FangError> { for idx in 1..self.number_of_workers + 1 { let worker_type = self .worker_params @@ -86,89 +107,114 @@ impl WorkerPool { .clone() .unwrap_or_else(|| "".to_string()); let name = format!("worker_{}{}", worker_type, idx); - WorkerThread::spawn_in_pool( - self.worker_params.clone(), - name, - 0, - self.connection_pool.clone(), - ) + WorkerThread::spawn_in_pool(name.clone(), 0, self.clone())?; } + Ok(()) + } + + /// Attempt graceful shutdown of each job thread, blocks until all threads exit. Threads exit + /// when their current job finishes. + pub fn shutdown(&mut self) -> Result<(), FangError> { + *self.shared_state.write()? = WorkerState::Shutdown; + + for (worker_name, thread) in self.thread_join_handles.write()?.drain() { + if let Err(err) = thread.join() { + error!( + "Failed to exit executor thread '{}' cleanly: {:?}", + worker_name, err + ); + } + } + Ok(()) } } impl WorkerThread { - pub fn new( - worker_params: WorkerParams, - name: String, - restarts: u64, - connection_pool: r2d2::Pool>, - ) -> Self { + pub fn new(name: String, restarts: u64, worker_pool: WorkerPool) -> Self { Self { name, - worker_params, restarts, - connection_pool, + worker_pool, + graceful_shutdown: false, } } pub fn spawn_in_pool( - worker_params: WorkerParams, name: String, restarts: u64, - connection_pool: r2d2::Pool>, - ) { - let builder = thread::Builder::new().name(name.clone()); - + worker_pool: WorkerPool, + ) -> Result<(), FangError> { info!( "starting a worker thread {}, number of restarts {}", name, restarts ); + let job = WorkerThread::new(name.clone(), restarts, worker_pool.clone()); + let join_handle = Self::spawn_thread(name.clone(), job)?; + worker_pool + .thread_join_handles + .write()? + .insert(name, join_handle); + Ok(()) + } + + fn spawn_thread( + name: String, + mut job: WorkerThread, + ) -> Result, FangError> { + let builder = thread::Builder::new().name(name.clone()); builder .spawn(move || { - // when _job is dropped, it will be restarted (see Drop trait impl) - let _job = WorkerThread::new( - worker_params.clone(), - name, - restarts, - connection_pool.clone(), - ); - - match connection_pool.get() { + match job.worker_pool.connection_pool.get() { Ok(connection) => { let mut executor = Executor::new(connection); + executor.set_shared_state(job.worker_pool.shared_state.clone()); - if let Some(task_type_str) = worker_params.task_type { - executor.set_task_type(task_type_str); + if let Some(ref task_type_str) = job.worker_pool.worker_params.task_type { + executor.set_task_type(task_type_str.to_owned()); } - if let Some(retention_mode) = worker_params.retention_mode { - executor.set_retention_mode(retention_mode); + if let Some(ref retention_mode) = + job.worker_pool.worker_params.retention_mode + { + executor.set_retention_mode(retention_mode.to_owned()); } - if let Some(sleep_params) = worker_params.sleep_params { - executor.set_sleep_params(sleep_params); + if let Some(ref sleep_params) = job.worker_pool.worker_params.sleep_params { + executor.set_sleep_params(sleep_params.clone()); } - executor.run_tasks(); + // Run executor + match executor.run_tasks() { + Ok(_) => { + job.graceful_shutdown = true; + } + Err(error) => { + error!("Error executing tasks in worker '{}': {:?}", name, error); + } + } } Err(error) => { error!("Failed to get postgres connection: {:?}", error); } } }) - .unwrap(); + .map_err(FangError::from) } } impl Drop for WorkerThread { fn drop(&mut self) { + if self.graceful_shutdown { + return; + } + WorkerThread::spawn_in_pool( - self.worker_params.clone(), self.name.clone(), self.restarts + 1, - self.connection_pool.clone(), + self.worker_pool.clone(), ) + .unwrap(); } } @@ -181,7 +227,7 @@ mod job_pool_tests { use crate::executor::Runnable; use crate::queue::Queue; use crate::queue::Task; - use crate::schema::fang_tasks; + use crate::schema::{fang_tasks, FangTaskState}; use crate::typetag; use diesel::pg::PgConnection; use diesel::prelude::*; @@ -207,13 +253,6 @@ mod job_pool_tests { } } - fn get_all_tasks(conn: &PgConnection) -> Vec { - fang_tasks::table - .filter(fang_tasks::task_type.eq("worker_pool_test")) - .get_results::(conn) - .unwrap() - } - #[typetag::serde] impl Runnable for MyJob { fn run(&self, connection: &PgConnection) -> Result<(), Error> { @@ -231,7 +270,48 @@ mod job_pool_tests { } } - // this test is ignored because it commits data to the db + fn get_all_tasks(conn: &PgConnection) -> Vec { + fang_tasks::table + .filter(fang_tasks::task_type.eq("worker_pool_test")) + .get_results::(conn) + .unwrap() + } + + // Following tests ignored because they commit data to the db + #[test] + #[ignore] + fn tasks_are_finished_on_shutdown() { + let queue = Queue::new(); + + let mut worker_params = WorkerParams::new(); + worker_params.set_retention_mode(RetentionMode::KeepAll); + let mut job_pool = WorkerPool::new_with_params(2, worker_params); + + queue.push_task(&MyJob::new(100)).unwrap(); + queue.push_task(&MyJob::new(200)).unwrap(); + + job_pool.start().unwrap(); + thread::sleep(Duration::from_secs(1)); + job_pool.shutdown().unwrap(); + thread::sleep(Duration::from_secs(5)); + + let tasks = get_all_tasks(&queue.connection); + // TODO - Replace with group_by when it's not nightly anymore + let new_tasks = tasks.iter().filter(|task| task.state == FangTaskState::New); + // let in_progress_tasks = tasks + // .iter() + // .filter(|task| task.state == FangTaskState::InProgress); + let finished_tasks = tasks + .iter() + .filter(|task| task.state == FangTaskState::Finished); + + // Asserts first two tasks are allowed to finish, the tasks they spawn are not started + // though. No tasks should be in progress after a graceful shutdown. + assert_eq!(new_tasks.count(), 2); + // assert_eq!(in_progress_tasks.count(), 0); // TODO - Can't verify because of dirty DB + assert_eq!(finished_tasks.count(), 2); + } + #[test] #[ignore] fn tasks_are_split_between_two_threads() { @@ -239,12 +319,12 @@ mod job_pool_tests { let mut worker_params = WorkerParams::new(); worker_params.set_retention_mode(RetentionMode::KeepAll); - let job_pool = WorkerPool::new_with_params(2, worker_params); + let mut job_pool = WorkerPool::new_with_params(2, worker_params); queue.push_task(&MyJob::new(100)).unwrap(); queue.push_task(&MyJob::new(200)).unwrap(); - job_pool.start(); + job_pool.start().unwrap(); thread::sleep(Duration::from_secs(100));