fix shutdown test and add changelog entry
This commit is contained in:
parent
0473978460
commit
a74d943896
3 changed files with 61 additions and 14 deletions
|
@ -1,5 +1,9 @@
|
||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 0.5.0 (2021-12-05)
|
||||||
|
|
||||||
|
- Add graceful shutdown - [#14](https://github.com/ayrat555/fang/pull/14)
|
||||||
|
|
||||||
## 0.4.2 (2021-11-30)
|
## 0.4.2 (2021-11-30)
|
||||||
|
|
||||||
- Bump deps - [#13](https://github.com/ayrat555/fang/pull/13)
|
- Bump deps - [#13](https://github.com/ayrat555/fang/pull/13)
|
||||||
|
|
13
README.md
13
README.md
|
@ -14,7 +14,7 @@ Background job processing library for Rust. It uses Postgres DB as a task queue.
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
[dependencies]
|
[dependencies]
|
||||||
fang = "0.4.1"
|
fang = "0.5"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -96,6 +96,17 @@ use fang::WorkerPool;
|
||||||
WorkerPool::new(10).start();
|
WorkerPool::new(10).start();
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Use `shutdown` to stop worker threads, they will try to finish in-progress tasks.
|
||||||
|
|
||||||
|
```rust
|
||||||
|
|
||||||
|
use fang::WorkerPool;
|
||||||
|
|
||||||
|
worker_pool = WorkerPool::new(10).start().unwrap;
|
||||||
|
|
||||||
|
worker_pool.shutdown()
|
||||||
|
```
|
||||||
|
|
||||||
Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the
|
Using a library like [signal-hook][signal-hook], it's possible to gracefully shutdown a worker. See the
|
||||||
Simple Worker for an example implementation.
|
Simple Worker for an example implementation.
|
||||||
|
|
||||||
|
|
|
@ -270,9 +270,44 @@ mod job_pool_tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct ShutdownJob {
|
||||||
|
pub number: u16,
|
||||||
|
pub current_thread_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ShutdownJob {
|
||||||
|
pub fn new(number: u16) -> Self {
|
||||||
|
let handle = thread::current();
|
||||||
|
let current_thread_name = handle.name().unwrap().to_string();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
number,
|
||||||
|
current_thread_name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for ShutdownJob {
|
||||||
|
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
||||||
|
thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
|
let new_job = MyJob::new(self.number + 1);
|
||||||
|
|
||||||
|
Queue::push_task_query(connection, &new_job).unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"shutdown_test".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_all_tasks(conn: &PgConnection, job_type: &str) -> Vec<Task> {
|
||||||
fang_tasks::table
|
fang_tasks::table
|
||||||
.filter(fang_tasks::task_type.eq("worker_pool_test"))
|
.filter(fang_tasks::task_type.eq(job_type))
|
||||||
.get_results::<Task>(conn)
|
.get_results::<Task>(conn)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
@ -287,28 +322,25 @@ mod job_pool_tests {
|
||||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
||||||
let mut job_pool = WorkerPool::new_with_params(2, worker_params);
|
let mut job_pool = WorkerPool::new_with_params(2, worker_params);
|
||||||
|
|
||||||
queue.push_task(&MyJob::new(100)).unwrap();
|
queue.push_task(&ShutdownJob::new(100)).unwrap();
|
||||||
queue.push_task(&MyJob::new(200)).unwrap();
|
queue.push_task(&ShutdownJob::new(200)).unwrap();
|
||||||
|
|
||||||
job_pool.start().unwrap();
|
job_pool.start().unwrap();
|
||||||
thread::sleep(Duration::from_secs(1));
|
thread::sleep(Duration::from_secs(1));
|
||||||
job_pool.shutdown().unwrap();
|
job_pool.shutdown().unwrap();
|
||||||
thread::sleep(Duration::from_secs(5));
|
thread::sleep(Duration::from_secs(5));
|
||||||
|
|
||||||
let tasks = get_all_tasks(&queue.connection);
|
let tasks = get_all_tasks(&queue.connection, "shutdown_test");
|
||||||
// TODO - Replace with group_by when it's not nightly anymore
|
let in_progress_tasks = tasks
|
||||||
let new_tasks = tasks.iter().filter(|task| task.state == FangTaskState::New);
|
.iter()
|
||||||
// let in_progress_tasks = tasks
|
.filter(|task| task.state == FangTaskState::InProgress);
|
||||||
// .iter()
|
|
||||||
// .filter(|task| task.state == FangTaskState::InProgress);
|
|
||||||
let finished_tasks = tasks
|
let finished_tasks = tasks
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|task| task.state == FangTaskState::Finished);
|
.filter(|task| task.state == FangTaskState::Finished);
|
||||||
|
|
||||||
// Asserts first two tasks are allowed to finish, the tasks they spawn are not started
|
// Asserts first two tasks are allowed to finish, the tasks they spawn are not started
|
||||||
// though. No tasks should be in progress after a graceful shutdown.
|
// though. No tasks should be in progress after a graceful shutdown.
|
||||||
assert_eq!(new_tasks.count(), 2);
|
assert_eq!(in_progress_tasks.count(), 0);
|
||||||
// assert_eq!(in_progress_tasks.count(), 0); // TODO - Can't verify because of dirty DB
|
|
||||||
assert_eq!(finished_tasks.count(), 2);
|
assert_eq!(finished_tasks.count(), 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,7 +360,7 @@ mod job_pool_tests {
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(100));
|
thread::sleep(Duration::from_secs(100));
|
||||||
|
|
||||||
let tasks = get_all_tasks(&queue.connection);
|
let tasks = get_all_tasks(&queue.connection, "worker_pool_test");
|
||||||
|
|
||||||
assert!(tasks.len() > 40);
|
assert!(tasks.len() > 40);
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue