From a74d94389608ce98000b7d8c5337bd6033c23fdb Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Sun, 5 Dec 2021 08:43:17 +0200 Subject: [PATCH] fix shutdown test and add changelog entry --- CHANGELOG.md | 4 ++++ README.md | 13 ++++++++++- src/worker_pool.rs | 58 +++++++++++++++++++++++++++++++++++----------- 3 files changed, 61 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8adc33b..c08a739 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 0.5.0 (2021-12-05) + +- Add graceful shutdown - [#14](https://github.com/ayrat555/fang/pull/14) + ## 0.4.2 (2021-11-30) - Bump deps - [#13](https://github.com/ayrat555/fang/pull/13) diff --git a/README.md b/README.md index da87fe6..842d4e5 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Background job processing library for Rust. It uses Postgres DB as a task queue. ```toml [dependencies] -fang = "0.4.1" +fang = "0.5" serde = { version = "1.0", features = ["derive"] } ``` @@ -96,6 +96,17 @@ use fang::WorkerPool; WorkerPool::new(10).start(); ``` +Use `shutdown` to stop worker threads, they will try to finish in-progress tasks. + +```rust + +use fang::WorkerPool; + +worker_pool = WorkerPool::new(10).start().unwrap; + +worker_pool.shutdown() +``` + Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the Simple Worker for an example implementation. diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 00c452b..bb4d394 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -270,9 +270,44 @@ mod job_pool_tests { } } - fn get_all_tasks(conn: &PgConnection) -> Vec { + #[derive(Serialize, Deserialize)] + struct ShutdownJob { + pub number: u16, + pub current_thread_name: String, + } + + impl ShutdownJob { + pub fn new(number: u16) -> Self { + let handle = thread::current(); + let current_thread_name = handle.name().unwrap().to_string(); + + Self { + number, + current_thread_name, + } + } + } + + #[typetag::serde] + impl Runnable for ShutdownJob { + fn run(&self, connection: &PgConnection) -> Result<(), Error> { + thread::sleep(Duration::from_secs(3)); + + let new_job = MyJob::new(self.number + 1); + + Queue::push_task_query(connection, &new_job).unwrap(); + + Ok(()) + } + + fn task_type(&self) -> String { + "shutdown_test".to_string() + } + } + + fn get_all_tasks(conn: &PgConnection, job_type: &str) -> Vec { fang_tasks::table - .filter(fang_tasks::task_type.eq("worker_pool_test")) + .filter(fang_tasks::task_type.eq(job_type)) .get_results::(conn) .unwrap() } @@ -287,28 +322,25 @@ mod job_pool_tests { worker_params.set_retention_mode(RetentionMode::KeepAll); let mut job_pool = WorkerPool::new_with_params(2, worker_params); - queue.push_task(&MyJob::new(100)).unwrap(); - queue.push_task(&MyJob::new(200)).unwrap(); + queue.push_task(&ShutdownJob::new(100)).unwrap(); + queue.push_task(&ShutdownJob::new(200)).unwrap(); job_pool.start().unwrap(); thread::sleep(Duration::from_secs(1)); job_pool.shutdown().unwrap(); thread::sleep(Duration::from_secs(5)); - let tasks = get_all_tasks(&queue.connection); - // TODO - Replace with group_by when it's not nightly anymore - let new_tasks = tasks.iter().filter(|task| task.state == FangTaskState::New); - // let in_progress_tasks = tasks - // .iter() - // .filter(|task| task.state == FangTaskState::InProgress); + let tasks = get_all_tasks(&queue.connection, "shutdown_test"); + let in_progress_tasks = tasks + .iter() + .filter(|task| task.state == FangTaskState::InProgress); let finished_tasks = tasks .iter() .filter(|task| task.state == FangTaskState::Finished); // Asserts first two tasks are allowed to finish, the tasks they spawn are not started // though. No tasks should be in progress after a graceful shutdown. - assert_eq!(new_tasks.count(), 2); - // assert_eq!(in_progress_tasks.count(), 0); // TODO - Can't verify because of dirty DB + assert_eq!(in_progress_tasks.count(), 0); assert_eq!(finished_tasks.count(), 2); } @@ -328,7 +360,7 @@ mod job_pool_tests { thread::sleep(Duration::from_secs(100)); - let tasks = get_all_tasks(&queue.connection); + let tasks = get_all_tasks(&queue.connection, "worker_pool_test"); assert!(tasks.len() > 40);