Blocking refactor (#74)
* Adapting code to new Fang tables structure and refactoring to make it more generic (Not finished) * Refactoring of the blocking module * Finishing blocking module, starting to modify tests * Worker tests done, Queue tests left * Maybe all tests done ?? * Makefile clippy * Examples fixed * asynk feature Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>
This commit is contained in:
parent
dffee33b9c
commit
360140d064
29 changed files with 1247 additions and 1637 deletions
11
Cargo.toml
11
Cargo.toml
|
@ -14,22 +14,22 @@ rust-version = "1.62"
|
||||||
[features]
|
[features]
|
||||||
default = ["blocking", "asynk"]
|
default = ["blocking", "asynk"]
|
||||||
blocking = ["diesel", "diesel-derive-enum", "dotenv"]
|
blocking = ["diesel", "diesel-derive-enum", "dotenv"]
|
||||||
asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder", "async-recursion"]
|
asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "async-recursion"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
cron = "0.11"
|
cron = "0.11"
|
||||||
hex = "0.4"
|
|
||||||
sha2 = "0.10"
|
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
|
hex = "0.4"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
serde = "1"
|
serde = "1"
|
||||||
serde_derive = "1.0.141"
|
serde_derive = "1.0.141"
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
|
sha2 = "0.10"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
|
typed-builder = "0.10"
|
||||||
typetag = "0.2"
|
typetag = "0.2"
|
||||||
uuid = { version = "0.8", features = ["v4"] }
|
uuid = { version = "0.8", features = ["v4"] }
|
||||||
|
|
||||||
|
|
||||||
[dependencies.diesel]
|
[dependencies.diesel]
|
||||||
version = "1.4"
|
version = "1.4"
|
||||||
features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"]
|
features = ["postgres", "serde_json", "chrono", "uuidv07", "r2d2"]
|
||||||
|
@ -63,9 +63,6 @@ optional = true
|
||||||
version = "0.1"
|
version = "0.1"
|
||||||
optional = true
|
optional = true
|
||||||
|
|
||||||
[dependencies.typed-builder]
|
|
||||||
version = "0.10"
|
|
||||||
optional = true
|
|
||||||
|
|
||||||
[dependencies.async-recursion]
|
[dependencies.async-recursion]
|
||||||
version = "1"
|
version = "1"
|
||||||
|
|
17
Makefile
Normal file
17
Makefile
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
db:
|
||||||
|
docker run --rm -d --name postgres -p 5432:5432 \
|
||||||
|
-e POSTGRES_DB=fang \
|
||||||
|
-e POSTGRES_USER=postgres \
|
||||||
|
-e POSTGRES_PASSWORD=postgres \
|
||||||
|
postgres:latest
|
||||||
|
clippy:
|
||||||
|
cargo clippy --all-features
|
||||||
|
diesel:
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost/fang diesel migration run
|
||||||
|
stop:
|
||||||
|
docker kill postgres
|
||||||
|
tests:
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture
|
||||||
|
|
||||||
|
ignored:
|
||||||
|
DATABASE_URL=postgres://postgres:postgres@localhost/fang cargo test --all-features -- --color always --nocapture --ignored
|
|
@ -1,10 +1,10 @@
|
||||||
use fang::async_trait;
|
use fang::async_trait;
|
||||||
use fang::asynk::async_queue::AsyncQueueable;
|
use fang::asynk::async_queue::AsyncQueueable;
|
||||||
use fang::asynk::async_runnable::Error;
|
use fang::asynk::async_runnable::Error;
|
||||||
use fang::asynk::async_runnable::Scheduled;
|
|
||||||
use fang::serde::{Deserialize, Serialize};
|
use fang::serde::{Deserialize, Serialize};
|
||||||
use fang::typetag;
|
use fang::typetag;
|
||||||
use fang::AsyncRunnable;
|
use fang::AsyncRunnable;
|
||||||
|
use fang::Scheduled;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
#[serde(crate = "fang::serde")]
|
#[serde(crate = "fang::serde")]
|
||||||
|
|
|
@ -6,7 +6,8 @@ edition = "2018"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
fang = { path = "../../" , features = ["blocking"]}
|
fang = { path = "../../../" , features = ["blocking"]}
|
||||||
signal-hook = "0.3.10"
|
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
env_logger = "0.9.0"
|
env_logger = "0.9.0"
|
||||||
|
log = "0.4.0"
|
||||||
|
diesel = "1.4.8"
|
35
fang_examples/blocking/simple_cron_worker/src/lib.rs
Normal file
35
fang_examples/blocking/simple_cron_worker/src/lib.rs
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
use fang::runnable::Runnable;
|
||||||
|
use fang::serde::{Deserialize, Serialize};
|
||||||
|
use fang::typetag;
|
||||||
|
use fang::Error;
|
||||||
|
use fang::Queueable;
|
||||||
|
use fang::Scheduled;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
#[serde(crate = "fang::serde")]
|
||||||
|
pub struct MyCronTask {}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for MyCronTask {
|
||||||
|
fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
|
||||||
|
log::info!("CRON !!!!!!!!!!!!!!!!!");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"cron_test".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cron(&self) -> Option<Scheduled> {
|
||||||
|
// sec min hour day of month month day of week year
|
||||||
|
// be careful works only with UTC hour.
|
||||||
|
// https://www.timeanddate.com/worldclock/timezone/utc
|
||||||
|
let expression = "0/20 * * * Aug-Sep * 2022/1";
|
||||||
|
Some(Scheduled::CronPattern(expression.to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn uniq(&self) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
43
fang_examples/blocking/simple_cron_worker/src/main.rs
Normal file
43
fang_examples/blocking/simple_cron_worker/src/main.rs
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
use diesel::r2d2;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use fang::PgConnection;
|
||||||
|
use fang::Queue;
|
||||||
|
use fang::Queueable;
|
||||||
|
use fang::RetentionMode;
|
||||||
|
use fang::WorkerPool;
|
||||||
|
use simple_worker::MyCronTask;
|
||||||
|
use std::env;
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
|
||||||
|
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||||
|
|
||||||
|
let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
|
||||||
|
|
||||||
|
r2d2::Pool::builder()
|
||||||
|
.max_size(pool_size)
|
||||||
|
.build(manager)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
|
env_logger::init();
|
||||||
|
|
||||||
|
let queue = Queue::builder().connection_pool(connection_pool(2)).build();
|
||||||
|
|
||||||
|
let mut worker_pool = WorkerPool::<Queue>::builder()
|
||||||
|
.queue(queue)
|
||||||
|
.retention_mode(RetentionMode::KeepAll)
|
||||||
|
.number_of_workers(2_u32)
|
||||||
|
.task_type("cron_test".to_string())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
worker_pool.queue.schedule_task(&MyCronTask {}).unwrap();
|
||||||
|
|
||||||
|
worker_pool.start().unwrap();
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(100))
|
||||||
|
}
|
2
fang_examples/blocking/simple_worker/.gitignore
vendored
Normal file
2
fang_examples/blocking/simple_worker/.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
/target
|
||||||
|
Cargo.lock
|
13
fang_examples/blocking/simple_worker/Cargo.toml
Normal file
13
fang_examples/blocking/simple_worker/Cargo.toml
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
[package]
|
||||||
|
name = "simple_worker"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
fang = { path = "../../../" , features = ["blocking"]}
|
||||||
|
dotenv = "0.15.0"
|
||||||
|
env_logger = "0.9.0"
|
||||||
|
log = "0.4.0"
|
||||||
|
diesel = "1.4.8"
|
3
fang_examples/blocking/simple_worker/README.md
Normal file
3
fang_examples/blocking/simple_worker/README.md
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
## Simple example
|
||||||
|
|
||||||
|
The job described in this example enqueues a new job during its execution saving thread name of the current worker to its metadata.
|
96
fang_examples/blocking/simple_worker/src/lib.rs
Normal file
96
fang_examples/blocking/simple_worker/src/lib.rs
Normal file
|
@ -0,0 +1,96 @@
|
||||||
|
use fang::runnable::Runnable;
|
||||||
|
use fang::serde::{Deserialize, Serialize};
|
||||||
|
use fang::typetag;
|
||||||
|
use fang::Error;
|
||||||
|
use fang::Queueable;
|
||||||
|
use std::thread;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
#[serde(crate = "fang::serde")]
|
||||||
|
pub struct MyTask {
|
||||||
|
pub number: u16,
|
||||||
|
pub current_thread_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MyTask {
|
||||||
|
pub fn new(number: u16) -> Self {
|
||||||
|
let handle = thread::current();
|
||||||
|
let current_thread_name = handle.name().unwrap().to_string();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
number,
|
||||||
|
current_thread_name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for MyTask {
|
||||||
|
fn run(&self, queue: &dyn Queueable) -> Result<(), Error> {
|
||||||
|
thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
|
let new_task = MyTask::new(self.number + 1);
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"The number is {}, thread name {}",
|
||||||
|
self.number,
|
||||||
|
self.current_thread_name
|
||||||
|
);
|
||||||
|
|
||||||
|
queue.insert_task(&new_task).unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"worker_pool_test".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
#[serde(crate = "fang::serde")]
|
||||||
|
pub struct MyFailingTask {
|
||||||
|
pub number: u16,
|
||||||
|
pub current_thread_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MyFailingTask {
|
||||||
|
pub fn new(number: u16) -> Self {
|
||||||
|
let handle = thread::current();
|
||||||
|
let current_thread_name = handle.name().unwrap().to_string();
|
||||||
|
Self {
|
||||||
|
number,
|
||||||
|
current_thread_name,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for MyFailingTask {
|
||||||
|
fn run(&self, queue: &dyn Queueable) -> Result<(), Error> {
|
||||||
|
let new_task = MyFailingTask::new(self.number + 1);
|
||||||
|
|
||||||
|
queue.insert_task(&new_task).unwrap();
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"Failing task number {}, Thread name:{}",
|
||||||
|
self.number,
|
||||||
|
self.current_thread_name
|
||||||
|
);
|
||||||
|
|
||||||
|
thread::sleep(Duration::from_secs(3));
|
||||||
|
|
||||||
|
let b = true;
|
||||||
|
|
||||||
|
if b {
|
||||||
|
panic!("Hello!");
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"worker_pool_test".to_string()
|
||||||
|
}
|
||||||
|
}
|
50
fang_examples/blocking/simple_worker/src/main.rs
Normal file
50
fang_examples/blocking/simple_worker/src/main.rs
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
use diesel::r2d2;
|
||||||
|
use dotenv::dotenv;
|
||||||
|
use fang::PgConnection;
|
||||||
|
use fang::Queue;
|
||||||
|
use fang::Queueable;
|
||||||
|
use fang::RetentionMode;
|
||||||
|
use fang::WorkerPool;
|
||||||
|
use simple_worker::MyFailingTask;
|
||||||
|
use simple_worker::MyTask;
|
||||||
|
use std::env;
|
||||||
|
use std::thread::sleep;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
pub fn connection_pool(pool_size: u32) -> r2d2::Pool<r2d2::ConnectionManager<PgConnection>> {
|
||||||
|
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
|
||||||
|
|
||||||
|
let manager = r2d2::ConnectionManager::<PgConnection>::new(database_url);
|
||||||
|
|
||||||
|
r2d2::Pool::builder()
|
||||||
|
.max_size(pool_size)
|
||||||
|
.build(manager)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
dotenv().ok();
|
||||||
|
|
||||||
|
env_logger::init();
|
||||||
|
|
||||||
|
let queue = Queue::builder().connection_pool(connection_pool(3)).build();
|
||||||
|
|
||||||
|
let mut worker_pool = WorkerPool::<Queue>::builder()
|
||||||
|
.queue(queue)
|
||||||
|
.retention_mode(RetentionMode::KeepAll)
|
||||||
|
.number_of_workers(3_u32)
|
||||||
|
.task_type("worker_pool_test".to_string())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
worker_pool.queue.insert_task(&MyTask::new(1)).unwrap();
|
||||||
|
worker_pool.queue.insert_task(&MyTask::new(1000)).unwrap();
|
||||||
|
|
||||||
|
worker_pool
|
||||||
|
.queue
|
||||||
|
.insert_task(&MyFailingTask::new(5000))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
worker_pool.start().unwrap();
|
||||||
|
|
||||||
|
sleep(Duration::from_secs(100))
|
||||||
|
}
|
|
@ -1,44 +0,0 @@
|
||||||
use fang::serde::{Deserialize, Serialize};
|
|
||||||
use fang::typetag;
|
|
||||||
use fang::Error;
|
|
||||||
use fang::PgConnection;
|
|
||||||
use fang::Queue;
|
|
||||||
use fang::Runnable;
|
|
||||||
use std::thread;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
#[serde(crate = "fang::serde")]
|
|
||||||
pub struct MyJob {
|
|
||||||
pub number: u16,
|
|
||||||
pub current_thread_name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MyJob {
|
|
||||||
pub fn new(number: u16) -> Self {
|
|
||||||
let handle = thread::current();
|
|
||||||
let current_thread_name = handle.name().unwrap().to_string();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
number,
|
|
||||||
current_thread_name,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for MyJob {
|
|
||||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
thread::sleep(Duration::from_secs(3));
|
|
||||||
|
|
||||||
let new_job = MyJob::new(self.number + 1);
|
|
||||||
|
|
||||||
Queue::push_task_query(connection, &new_job).unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
|
||||||
"worker_pool_test".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,41 +0,0 @@
|
||||||
use dotenv::dotenv;
|
|
||||||
use fang::Queue;
|
|
||||||
use fang::RetentionMode;
|
|
||||||
use fang::WorkerParams;
|
|
||||||
use fang::WorkerPool;
|
|
||||||
use signal_hook::{consts::signal::*, iterator::Signals};
|
|
||||||
use simple_worker::MyJob;
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
dotenv().ok();
|
|
||||||
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
let mut worker_params = WorkerParams::new();
|
|
||||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
|
||||||
|
|
||||||
let mut worker_pool = WorkerPool::new_with_params(2, worker_params);
|
|
||||||
worker_pool.start().unwrap();
|
|
||||||
|
|
||||||
let queue = Queue::new();
|
|
||||||
|
|
||||||
queue.push_task(&MyJob::new(1)).unwrap();
|
|
||||||
queue.push_task(&MyJob::new(1000)).unwrap();
|
|
||||||
|
|
||||||
let mut signals = Signals::new(&[SIGINT, SIGTERM]).unwrap();
|
|
||||||
for sig in signals.forever() {
|
|
||||||
match sig {
|
|
||||||
SIGINT => {
|
|
||||||
println!("Received SIGINT");
|
|
||||||
worker_pool.shutdown().unwrap();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
SIGTERM => {
|
|
||||||
println!("Received SIGTERM");
|
|
||||||
worker_pool.shutdown().unwrap();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => unreachable!("Received unexpected signal: {:?}", sig),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,5 +3,8 @@ pub mod async_runnable;
|
||||||
pub mod async_worker;
|
pub mod async_worker;
|
||||||
pub mod async_worker_pool;
|
pub mod async_worker_pool;
|
||||||
|
|
||||||
|
pub use async_queue::*;
|
||||||
pub use async_runnable::AsyncRunnable;
|
pub use async_runnable::AsyncRunnable;
|
||||||
pub use async_runnable::Error as AsyncError;
|
pub use async_runnable::Error as AsyncError;
|
||||||
|
pub use async_worker::*;
|
||||||
|
pub use async_worker_pool::*;
|
|
@ -1,6 +1,7 @@
|
||||||
use crate::async_runnable::Scheduled::*;
|
|
||||||
use crate::asynk::async_runnable::AsyncRunnable;
|
use crate::asynk::async_runnable::AsyncRunnable;
|
||||||
use crate::asynk::async_runnable::Error as FangError;
|
use crate::asynk::async_runnable::Error as FangError;
|
||||||
|
use crate::CronError;
|
||||||
|
use crate::Scheduled::*;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bb8_postgres::bb8::Pool;
|
use bb8_postgres::bb8::Pool;
|
||||||
use bb8_postgres::bb8::RunError;
|
use bb8_postgres::bb8::RunError;
|
||||||
|
@ -76,16 +77,6 @@ pub struct Task {
|
||||||
pub updated_at: DateTime<Utc>,
|
pub updated_at: DateTime<Utc>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum CronError {
|
|
||||||
#[error(transparent)]
|
|
||||||
LibraryError(#[from] cron::error::Error),
|
|
||||||
#[error("You have to implement method `cron()` in your AsyncRunnable")]
|
|
||||||
TaskNotSchedulableError,
|
|
||||||
#[error("No timestamps match with this cron pattern")]
|
|
||||||
NoTimestampsError,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
#[derive(Debug, Error)]
|
||||||
pub enum AsyncQueueError {
|
pub enum AsyncQueueError {
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
|
@ -717,9 +708,9 @@ mod async_queue_tests {
|
||||||
use super::AsyncQueueable;
|
use super::AsyncQueueable;
|
||||||
use super::FangTaskState;
|
use super::FangTaskState;
|
||||||
use super::Task;
|
use super::Task;
|
||||||
use crate::async_runnable::Scheduled;
|
|
||||||
use crate::asynk::AsyncError as Error;
|
use crate::asynk::AsyncError as Error;
|
||||||
use crate::asynk::AsyncRunnable;
|
use crate::asynk::AsyncRunnable;
|
||||||
|
use crate::Scheduled;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bb8_postgres::bb8::Pool;
|
use bb8_postgres::bb8::Pool;
|
||||||
use bb8_postgres::tokio_postgres::NoTls;
|
use bb8_postgres::tokio_postgres::NoTls;
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use crate::asynk::async_queue::AsyncQueueable;
|
use crate::asynk::async_queue::AsyncQueueable;
|
||||||
|
use crate::Scheduled;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::DateTime;
|
|
||||||
use chrono::Utc;
|
|
||||||
|
|
||||||
const COMMON_TYPE: &str = "common";
|
const COMMON_TYPE: &str = "common";
|
||||||
|
|
||||||
|
@ -10,11 +9,6 @@ pub struct Error {
|
||||||
pub description: String,
|
pub description: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Scheduled {
|
|
||||||
CronPattern(String),
|
|
||||||
ScheduleOnce(DateTime<Utc>),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[typetag::serde(tag = "type")]
|
#[typetag::serde(tag = "type")]
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait AsyncRunnable: Send + Sync {
|
pub trait AsyncRunnable: Send + Sync {
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
use crate::async_runnable::Scheduled::*;
|
|
||||||
use crate::asynk::async_queue::AsyncQueueable;
|
use crate::asynk::async_queue::AsyncQueueable;
|
||||||
use crate::asynk::async_queue::FangTaskState;
|
use crate::asynk::async_queue::FangTaskState;
|
||||||
use crate::asynk::async_queue::Task;
|
use crate::asynk::async_queue::Task;
|
||||||
use crate::asynk::async_queue::DEFAULT_TASK_TYPE;
|
use crate::asynk::async_queue::DEFAULT_TASK_TYPE;
|
||||||
use crate::asynk::async_runnable::AsyncRunnable;
|
use crate::asynk::async_runnable::AsyncRunnable;
|
||||||
use crate::asynk::AsyncError as Error;
|
use crate::asynk::AsyncError as Error;
|
||||||
|
use crate::Scheduled::*;
|
||||||
use crate::{RetentionMode, SleepParams};
|
use crate::{RetentionMode, SleepParams};
|
||||||
use log::error;
|
use log::error;
|
||||||
use typed_builder::TypedBuilder;
|
use typed_builder::TypedBuilder;
|
||||||
|
@ -242,7 +242,6 @@ impl<'a> AsyncWorkerTest<'a> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod async_worker_tests {
|
mod async_worker_tests {
|
||||||
use super::AsyncWorkerTest;
|
use super::AsyncWorkerTest;
|
||||||
use crate::async_runnable::Scheduled;
|
|
||||||
use crate::asynk::async_queue::AsyncQueueTest;
|
use crate::asynk::async_queue::AsyncQueueTest;
|
||||||
use crate::asynk::async_queue::AsyncQueueable;
|
use crate::asynk::async_queue::AsyncQueueable;
|
||||||
use crate::asynk::async_queue::FangTaskState;
|
use crate::asynk::async_queue::FangTaskState;
|
||||||
|
@ -250,6 +249,7 @@ mod async_worker_tests {
|
||||||
use crate::asynk::AsyncError as Error;
|
use crate::asynk::AsyncError as Error;
|
||||||
use crate::asynk::AsyncRunnable;
|
use crate::asynk::AsyncRunnable;
|
||||||
use crate::RetentionMode;
|
use crate::RetentionMode;
|
||||||
|
use crate::Scheduled;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bb8_postgres::bb8::Pool;
|
use bb8_postgres::bb8::Pool;
|
||||||
use bb8_postgres::tokio_postgres::NoTls;
|
use bb8_postgres::tokio_postgres::NoTls;
|
||||||
|
@ -283,7 +283,7 @@ mod async_worker_tests {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
fn cron(&self) -> Option<Scheduled> {
|
fn cron(&self) -> Option<Scheduled> {
|
||||||
Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(7)))
|
Some(Scheduled::ScheduleOnce(Utc::now() + Duration::seconds(1)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,7 +383,7 @@ mod async_worker_tests {
|
||||||
assert_eq!(id, task.id);
|
assert_eq!(id, task.id);
|
||||||
assert_eq!(FangTaskState::New, task.state);
|
assert_eq!(FangTaskState::New, task.state);
|
||||||
|
|
||||||
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
|
tokio::time::sleep(core::time::Duration::from_secs(3)).await;
|
||||||
|
|
||||||
worker.run_tasks_until_none().await.unwrap();
|
worker.run_tasks_until_none().await.unwrap();
|
||||||
|
|
||||||
|
|
|
@ -1,13 +1,12 @@
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod executor;
|
|
||||||
pub mod queue;
|
pub mod queue;
|
||||||
pub mod scheduler;
|
pub mod runnable;
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
|
pub mod worker;
|
||||||
pub mod worker_pool;
|
pub mod worker_pool;
|
||||||
|
|
||||||
pub use error::FangError;
|
pub use error::FangError;
|
||||||
pub use executor::*;
|
|
||||||
pub use queue::*;
|
pub use queue::*;
|
||||||
pub use scheduler::*;
|
|
||||||
pub use schema::*;
|
pub use schema::*;
|
||||||
|
pub use worker::*;
|
||||||
pub use worker_pool::*;
|
pub use worker_pool::*;
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::blocking::queue::QueueError;
|
||||||
use std::io::Error as IoError;
|
use std::io::Error as IoError;
|
||||||
use std::sync::PoisonError;
|
use std::sync::PoisonError;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
@ -6,7 +7,8 @@ use thiserror::Error;
|
||||||
pub enum FangError {
|
pub enum FangError {
|
||||||
#[error("The shared state in an executor thread became poisoned")]
|
#[error("The shared state in an executor thread became poisoned")]
|
||||||
PoisonedLock,
|
PoisonedLock,
|
||||||
|
#[error(transparent)]
|
||||||
|
QueueError(#[from] QueueError),
|
||||||
#[error("Failed to create executor thread")]
|
#[error("Failed to create executor thread")]
|
||||||
ExecutorThreadCreationFailed {
|
ExecutorThreadCreationFailed {
|
||||||
#[from]
|
#[from]
|
||||||
|
|
|
@ -1,328 +0,0 @@
|
||||||
use crate::error::FangError;
|
|
||||||
use crate::queue::Queue;
|
|
||||||
use crate::queue::Task;
|
|
||||||
use crate::worker_pool::{SharedState, WorkerState};
|
|
||||||
use crate::{RetentionMode, SleepParams};
|
|
||||||
use diesel::pg::PgConnection;
|
|
||||||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
|
||||||
use log::error;
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
pub struct Executor {
|
|
||||||
pub pooled_connection: PooledConnection<ConnectionManager<PgConnection>>,
|
|
||||||
pub task_type: Option<String>,
|
|
||||||
pub sleep_params: SleepParams,
|
|
||||||
pub retention_mode: RetentionMode,
|
|
||||||
shared_state: Option<SharedState>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Error {
|
|
||||||
pub description: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[typetag::serde(tag = "type")]
|
|
||||||
pub trait Runnable {
|
|
||||||
fn run(&self, connection: &PgConnection) -> Result<(), Error>;
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
|
||||||
"common".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Executor {
|
|
||||||
pub fn new(pooled_connection: PooledConnection<ConnectionManager<PgConnection>>) -> Self {
|
|
||||||
Self {
|
|
||||||
pooled_connection,
|
|
||||||
sleep_params: SleepParams::default(),
|
|
||||||
retention_mode: RetentionMode::RemoveFinished,
|
|
||||||
task_type: None,
|
|
||||||
shared_state: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_shared_state(&mut self, shared_state: SharedState) {
|
|
||||||
self.shared_state = Some(shared_state);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_task_type(&mut self, task_type: String) {
|
|
||||||
self.task_type = Some(task_type);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_sleep_params(&mut self, sleep_params: SleepParams) {
|
|
||||||
self.sleep_params = sleep_params;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) {
|
|
||||||
self.retention_mode = retention_mode;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(&self, task: Task) {
|
|
||||||
let result = self.execute_task(task);
|
|
||||||
self.finalize_task(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run_tasks(&mut self) -> Result<(), FangError> {
|
|
||||||
loop {
|
|
||||||
if let Some(ref shared_state) = self.shared_state {
|
|
||||||
let shared_state = shared_state.read()?;
|
|
||||||
if let WorkerState::Shutdown = *shared_state {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match Queue::fetch_and_touch_query(&self.pooled_connection, &self.task_type.clone()) {
|
|
||||||
Ok(Some(task)) => {
|
|
||||||
self.maybe_reset_sleep_period();
|
|
||||||
self.run(task);
|
|
||||||
}
|
|
||||||
Ok(None) => {
|
|
||||||
self.sleep();
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(error) => {
|
|
||||||
error!("Failed to fetch a task {:?}", error);
|
|
||||||
|
|
||||||
self.sleep();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn maybe_reset_sleep_period(&mut self) {
|
|
||||||
self.sleep_params.maybe_reset_sleep_period();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn sleep(&mut self) {
|
|
||||||
self.sleep_params.maybe_increase_sleep_period();
|
|
||||||
|
|
||||||
thread::sleep(self.sleep_params.sleep_period);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
|
||||||
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
|
||||||
let task_result = actual_task.run(&self.pooled_connection);
|
|
||||||
|
|
||||||
match task_result {
|
|
||||||
Ok(()) => Ok(task),
|
|
||||||
Err(error) => Err((task, error.description)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn finalize_task(&self, result: Result<Task, (Task, String)>) {
|
|
||||||
match self.retention_mode {
|
|
||||||
RetentionMode::KeepAll => {
|
|
||||||
match result {
|
|
||||||
Ok(task) => Queue::finish_task_query(&self.pooled_connection, &task).unwrap(),
|
|
||||||
Err((task, error)) => {
|
|
||||||
Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
RetentionMode::RemoveAll => {
|
|
||||||
match result {
|
|
||||||
Ok(task) => Queue::remove_task_query(&self.pooled_connection, task.id).unwrap(),
|
|
||||||
Err((task, _error)) => {
|
|
||||||
Queue::remove_task_query(&self.pooled_connection, task.id).unwrap()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
RetentionMode::RemoveFinished => match result {
|
|
||||||
Ok(task) => {
|
|
||||||
Queue::remove_task_query(&self.pooled_connection, task.id).unwrap();
|
|
||||||
}
|
|
||||||
Err((task, error)) => {
|
|
||||||
Queue::fail_task_query(&self.pooled_connection, &task, error).unwrap();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod executor_tests {
|
|
||||||
use super::Error;
|
|
||||||
use super::Executor;
|
|
||||||
use super::RetentionMode;
|
|
||||||
use super::Runnable;
|
|
||||||
use crate::queue::Queue;
|
|
||||||
use crate::schema::FangTaskState;
|
|
||||||
use crate::typetag;
|
|
||||||
use crate::NewTask;
|
|
||||||
use diesel::connection::Connection;
|
|
||||||
use diesel::pg::PgConnection;
|
|
||||||
use diesel::r2d2::{ConnectionManager, PooledConnection};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct ExecutorTaskTest {
|
|
||||||
pub number: u16,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for ExecutorTaskTest {
|
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
println!("the number is {}", self.number);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct FailedTask {
|
|
||||||
pub number: u16,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for FailedTask {
|
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
let message = format!("the number is {}", self.number);
|
|
||||||
|
|
||||||
Err(Error {
|
|
||||||
description: message,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct TaskType1 {}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for TaskType1 {
|
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
|
||||||
"type1".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct TaskType2 {}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for TaskType2 {
|
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
|
||||||
"type2".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn serialize(job: &dyn Runnable) -> serde_json::Value {
|
|
||||||
serde_json::to_value(job).unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn executes_and_finishes_task() {
|
|
||||||
let job = ExecutorTaskTest { number: 10 };
|
|
||||||
|
|
||||||
let new_task = NewTask {
|
|
||||||
metadata: serialize(&job),
|
|
||||||
task_type: "common".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut executor = Executor::new(pooled_connection());
|
|
||||||
executor.set_retention_mode(RetentionMode::KeepAll);
|
|
||||||
|
|
||||||
executor
|
|
||||||
.pooled_connection
|
|
||||||
.test_transaction::<(), Error, _>(|| {
|
|
||||||
let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(FangTaskState::New, task.state);
|
|
||||||
|
|
||||||
executor.run(task.clone());
|
|
||||||
|
|
||||||
let found_task =
|
|
||||||
Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(FangTaskState::Finished, found_task.state);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[ignore]
|
|
||||||
fn executes_task_only_of_specific_type() {
|
|
||||||
let task1 = TaskType1 {};
|
|
||||||
let task2 = TaskType2 {};
|
|
||||||
|
|
||||||
let new_task1 = NewTask {
|
|
||||||
metadata: serialize(&task1),
|
|
||||||
task_type: "type1".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let new_task2 = NewTask {
|
|
||||||
metadata: serialize(&task2),
|
|
||||||
task_type: "type2".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let executor = Executor::new(pooled_connection());
|
|
||||||
|
|
||||||
let task1 = Queue::insert_query(&executor.pooled_connection, &new_task1).unwrap();
|
|
||||||
let task2 = Queue::insert_query(&executor.pooled_connection, &new_task2).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(FangTaskState::New, task1.state);
|
|
||||||
assert_eq!(FangTaskState::New, task2.state);
|
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
let mut executor = Executor::new(pooled_connection());
|
|
||||||
executor.set_retention_mode(RetentionMode::KeepAll);
|
|
||||||
executor.set_task_type("type1".to_string());
|
|
||||||
|
|
||||||
executor.run_tasks().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
|
||||||
|
|
||||||
let found_task1 =
|
|
||||||
Queue::find_task_by_id_query(&executor.pooled_connection, task1.id).unwrap();
|
|
||||||
assert_eq!(FangTaskState::Finished, found_task1.state);
|
|
||||||
|
|
||||||
let found_task2 =
|
|
||||||
Queue::find_task_by_id_query(&executor.pooled_connection, task2.id).unwrap();
|
|
||||||
assert_eq!(FangTaskState::New, found_task2.state);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn saves_error_for_failed_task() {
|
|
||||||
let task = FailedTask { number: 10 };
|
|
||||||
|
|
||||||
let new_task = NewTask {
|
|
||||||
metadata: serialize(&task),
|
|
||||||
task_type: "common".to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let executor = Executor::new(pooled_connection());
|
|
||||||
|
|
||||||
executor
|
|
||||||
.pooled_connection
|
|
||||||
.test_transaction::<(), Error, _>(|| {
|
|
||||||
let task = Queue::insert_query(&executor.pooled_connection, &new_task).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(FangTaskState::New, task.state);
|
|
||||||
|
|
||||||
executor.run(task.clone());
|
|
||||||
|
|
||||||
let found_task =
|
|
||||||
Queue::find_task_by_id_query(&executor.pooled_connection, task.id).unwrap();
|
|
||||||
|
|
||||||
assert_eq!(FangTaskState::Failed, found_task.state);
|
|
||||||
assert_eq!(
|
|
||||||
"the number is 10".to_string(),
|
|
||||||
found_task.error_message.unwrap()
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn pooled_connection() -> PooledConnection<ConnectionManager<PgConnection>> {
|
|
||||||
Queue::connection_pool(5).get().unwrap()
|
|
||||||
}
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load diff
22
src/blocking/runnable.rs
Normal file
22
src/blocking/runnable.rs
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
use crate::queue::Queueable;
|
||||||
|
use crate::Error;
|
||||||
|
use crate::Scheduled;
|
||||||
|
|
||||||
|
pub const COMMON_TYPE: &str = "common";
|
||||||
|
|
||||||
|
#[typetag::serde(tag = "type")]
|
||||||
|
pub trait Runnable {
|
||||||
|
fn run(&self, _queueable: &dyn Queueable) -> Result<(), Error>;
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
COMMON_TYPE.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn uniq(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cron(&self) -> Option<Scheduled> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,125 +0,0 @@
|
||||||
use crate::executor::Runnable;
|
|
||||||
use crate::queue::Queue;
|
|
||||||
use crate::PeriodicTask;
|
|
||||||
use std::thread;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
pub struct Scheduler {
|
|
||||||
pub check_period: Duration,
|
|
||||||
pub error_margin: Duration,
|
|
||||||
pub queue: Queue,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for Scheduler {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
Scheduler::start(self.check_period, self.error_margin)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Scheduler {
|
|
||||||
pub fn start(check_period: Duration, error_margin: Duration) {
|
|
||||||
let queue = Queue::new();
|
|
||||||
let builder = thread::Builder::new().name("scheduler".to_string());
|
|
||||||
|
|
||||||
builder
|
|
||||||
.spawn(move || {
|
|
||||||
let scheduler = Self::new(check_period, error_margin, queue);
|
|
||||||
|
|
||||||
scheduler.schedule_loop();
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new(check_period: Duration, error_margin: Duration, queue: Queue) -> Self {
|
|
||||||
Self {
|
|
||||||
check_period,
|
|
||||||
queue,
|
|
||||||
error_margin,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn schedule_loop(&self) {
|
|
||||||
loop {
|
|
||||||
self.schedule();
|
|
||||||
|
|
||||||
thread::sleep(self.check_period);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn schedule(&self) {
|
|
||||||
if let Some(tasks) = self.queue.fetch_periodic_tasks(self.error_margin) {
|
|
||||||
for task in tasks {
|
|
||||||
self.process_task(task);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_task(&self, task: PeriodicTask) {
|
|
||||||
match task.scheduled_at {
|
|
||||||
None => {
|
|
||||||
self.queue.schedule_next_task_execution(&task).unwrap();
|
|
||||||
}
|
|
||||||
Some(_) => {
|
|
||||||
let actual_task: Box<dyn Runnable> =
|
|
||||||
serde_json::from_value(task.metadata.clone()).unwrap();
|
|
||||||
|
|
||||||
self.queue.push_task(&(*actual_task)).unwrap();
|
|
||||||
|
|
||||||
self.queue.schedule_next_task_execution(&task).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod task_scheduler_tests {
|
|
||||||
use super::Scheduler;
|
|
||||||
use crate::executor::Error;
|
|
||||||
use crate::executor::Runnable;
|
|
||||||
use crate::queue::Queue;
|
|
||||||
use crate::schema::fang_tasks;
|
|
||||||
use crate::typetag;
|
|
||||||
use crate::Task;
|
|
||||||
use diesel::pg::PgConnection;
|
|
||||||
use diesel::prelude::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::thread;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct ScheduledTask {}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for ScheduledTask {
|
|
||||||
fn run(&self, _connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
|
||||||
"schedule".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[ignore]
|
|
||||||
fn schedules_tasks() {
|
|
||||||
let queue = Queue::new();
|
|
||||||
|
|
||||||
queue.push_periodic_task(&ScheduledTask {}, 10000).unwrap();
|
|
||||||
Scheduler::start(Duration::from_secs(1), Duration::from_secs(2));
|
|
||||||
|
|
||||||
let sleep_duration = Duration::from_secs(15);
|
|
||||||
thread::sleep(sleep_duration);
|
|
||||||
|
|
||||||
let tasks = get_all_tasks(&queue.connection);
|
|
||||||
|
|
||||||
assert_eq!(1, tasks.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_all_tasks(conn: &PgConnection) -> Vec<Task> {
|
|
||||||
fang_tasks::table
|
|
||||||
.filter(fang_tasks::task_type.eq("schedule"))
|
|
||||||
.get_results::<Task>(conn)
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -24,17 +24,8 @@ table! {
|
||||||
error_message -> Nullable<Text>,
|
error_message -> Nullable<Text>,
|
||||||
state -> FangTaskStateMapping,
|
state -> FangTaskStateMapping,
|
||||||
task_type -> Varchar,
|
task_type -> Varchar,
|
||||||
created_at -> Timestamptz,
|
uniq_hash -> Nullable<Text>,
|
||||||
updated_at -> Timestamptz,
|
scheduled_at -> Timestamptz,
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
table! {
|
|
||||||
fang_periodic_tasks (id) {
|
|
||||||
id -> Uuid,
|
|
||||||
metadata -> Jsonb,
|
|
||||||
period_in_millis -> Int8,
|
|
||||||
scheduled_at -> Nullable<Timestamptz>,
|
|
||||||
created_at -> Timestamptz,
|
created_at -> Timestamptz,
|
||||||
updated_at -> Timestamptz,
|
updated_at -> Timestamptz,
|
||||||
}
|
}
|
||||||
|
|
330
src/blocking/worker.rs
Normal file
330
src/blocking/worker.rs
Normal file
|
@ -0,0 +1,330 @@
|
||||||
|
use crate::error::FangError;
|
||||||
|
use crate::queue::Queueable;
|
||||||
|
use crate::queue::Task;
|
||||||
|
use crate::runnable::Runnable;
|
||||||
|
use crate::runnable::COMMON_TYPE;
|
||||||
|
use crate::schema::FangTaskState;
|
||||||
|
use crate::Scheduled::*;
|
||||||
|
use crate::{RetentionMode, SleepParams};
|
||||||
|
use log::error;
|
||||||
|
use std::thread;
|
||||||
|
use typed_builder::TypedBuilder;
|
||||||
|
|
||||||
|
#[derive(TypedBuilder)]
|
||||||
|
pub struct Worker<BQueue>
|
||||||
|
where
|
||||||
|
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||||
|
{
|
||||||
|
#[builder(setter(into))]
|
||||||
|
pub queue: BQueue,
|
||||||
|
#[builder(default=COMMON_TYPE.to_string(), setter(into))]
|
||||||
|
pub task_type: String,
|
||||||
|
#[builder(default, setter(into))]
|
||||||
|
pub sleep_params: SleepParams,
|
||||||
|
#[builder(default, setter(into))]
|
||||||
|
pub retention_mode: RetentionMode,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Error {
|
||||||
|
pub description: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<BQueue> Worker<BQueue>
|
||||||
|
where
|
||||||
|
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||||
|
{
|
||||||
|
pub fn run(&self, task: Task) {
|
||||||
|
let result = self.execute_task(task);
|
||||||
|
self.finalize_task(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_tasks(&mut self) -> Result<(), FangError> {
|
||||||
|
loop {
|
||||||
|
match self.queue.fetch_and_touch_task(self.task_type.clone()) {
|
||||||
|
Ok(Some(task)) => {
|
||||||
|
let actual_task: Box<dyn Runnable> =
|
||||||
|
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||||
|
|
||||||
|
// check if task is scheduled or not
|
||||||
|
if let Some(CronPattern(_)) = actual_task.cron() {
|
||||||
|
// program task
|
||||||
|
self.queue.schedule_task(&*actual_task)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.maybe_reset_sleep_period();
|
||||||
|
self.run(task);
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
self.sleep();
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(error) => {
|
||||||
|
error!("Failed to fetch a task {:?}", error);
|
||||||
|
|
||||||
|
self.sleep();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn run_tasks_until_none(&mut self) -> Result<(), FangError> {
|
||||||
|
loop {
|
||||||
|
match self.queue.fetch_and_touch_task(self.task_type.clone()) {
|
||||||
|
Ok(Some(task)) => {
|
||||||
|
let actual_task: Box<dyn Runnable> =
|
||||||
|
serde_json::from_value(task.metadata.clone()).unwrap();
|
||||||
|
|
||||||
|
// check if task is scheduled or not
|
||||||
|
if let Some(CronPattern(_)) = actual_task.cron() {
|
||||||
|
// program task
|
||||||
|
self.queue.schedule_task(&*actual_task)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.maybe_reset_sleep_period();
|
||||||
|
self.run(task);
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
error!("Failed to fetch a task {:?}", error);
|
||||||
|
|
||||||
|
self.sleep();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn maybe_reset_sleep_period(&mut self) {
|
||||||
|
self.sleep_params.maybe_reset_sleep_period();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sleep(&mut self) {
|
||||||
|
self.sleep_params.maybe_increase_sleep_period();
|
||||||
|
|
||||||
|
thread::sleep(self.sleep_params.sleep_period);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
||||||
|
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
||||||
|
let task_result = actual_task.run(&self.queue);
|
||||||
|
|
||||||
|
match task_result {
|
||||||
|
Ok(()) => Ok(task),
|
||||||
|
Err(error) => Err((task, error.description)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn finalize_task(&self, result: Result<Task, (Task, String)>) {
|
||||||
|
match self.retention_mode {
|
||||||
|
RetentionMode::KeepAll => {
|
||||||
|
match result {
|
||||||
|
Ok(task) => self
|
||||||
|
.queue
|
||||||
|
.update_task_state(&task, FangTaskState::Finished)
|
||||||
|
.unwrap(),
|
||||||
|
Err((task, error)) => self.queue.fail_task(&task, error).unwrap(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
RetentionMode::RemoveAll => {
|
||||||
|
match result {
|
||||||
|
Ok(task) => self.queue.remove_task(task.id).unwrap(),
|
||||||
|
Err((task, _error)) => self.queue.remove_task(task.id).unwrap(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
RetentionMode::RemoveFinished => match result {
|
||||||
|
Ok(task) => {
|
||||||
|
self.queue.remove_task(task.id).unwrap();
|
||||||
|
}
|
||||||
|
Err((task, error)) => {
|
||||||
|
self.queue.fail_task(&task, error).unwrap();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod worker_tests {
|
||||||
|
use super::Error;
|
||||||
|
use super::RetentionMode;
|
||||||
|
use super::Runnable;
|
||||||
|
use super::Worker;
|
||||||
|
use crate::queue::Queue;
|
||||||
|
use crate::queue::Queueable;
|
||||||
|
use crate::schema::FangTaskState;
|
||||||
|
use crate::typetag;
|
||||||
|
use chrono::Utc;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct WorkerTaskTest {
|
||||||
|
pub number: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for WorkerTaskTest {
|
||||||
|
fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
|
||||||
|
println!("the number is {}", self.number);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"worker_task".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct FailedTask {
|
||||||
|
pub number: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for FailedTask {
|
||||||
|
fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
|
||||||
|
let message = format!("the number is {}", self.number);
|
||||||
|
|
||||||
|
Err(Error {
|
||||||
|
description: message,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"F_task".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct TaskType1 {}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for TaskType1 {
|
||||||
|
fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"type1".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct TaskType2 {}
|
||||||
|
|
||||||
|
#[typetag::serde]
|
||||||
|
impl Runnable for TaskType2 {
|
||||||
|
fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn task_type(&self) -> String {
|
||||||
|
"type2".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Worker tests has to commit because the worker operations commits
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
fn executes_and_finishes_task() {
|
||||||
|
let task = WorkerTaskTest { number: 10 };
|
||||||
|
|
||||||
|
let pool = Queue::connection_pool(5);
|
||||||
|
|
||||||
|
let queue = Queue::builder().connection_pool(pool).build();
|
||||||
|
|
||||||
|
let worker = Worker::<Queue>::builder()
|
||||||
|
.queue(queue)
|
||||||
|
.retention_mode(RetentionMode::KeepAll)
|
||||||
|
.task_type(task.task_type())
|
||||||
|
.build();
|
||||||
|
let pooled_connection = worker.queue.connection_pool.get().unwrap();
|
||||||
|
|
||||||
|
let task = Queue::insert_query(&pooled_connection, &task, Utc::now()).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(FangTaskState::New, task.state);
|
||||||
|
|
||||||
|
// this operation commits and thats why need to commit this test
|
||||||
|
worker.run(task.clone());
|
||||||
|
|
||||||
|
let found_task = Queue::find_task_by_id_query(&pooled_connection, task.id).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(FangTaskState::Finished, found_task.state);
|
||||||
|
|
||||||
|
Queue::remove_tasks_of_type_query(&pooled_connection, "worker_task").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
fn executes_task_only_of_specific_type() {
|
||||||
|
let task1 = TaskType1 {};
|
||||||
|
let task2 = TaskType2 {};
|
||||||
|
|
||||||
|
let pool = Queue::connection_pool(5);
|
||||||
|
|
||||||
|
let queue = Queue::builder().connection_pool(pool).build();
|
||||||
|
|
||||||
|
let mut worker = Worker::<Queue>::builder()
|
||||||
|
.queue(queue)
|
||||||
|
.task_type(task1.task_type())
|
||||||
|
.retention_mode(RetentionMode::KeepAll)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let pooled_connection = worker.queue.connection_pool.get().unwrap();
|
||||||
|
|
||||||
|
let task1 = Queue::insert_query(&pooled_connection, &task1, Utc::now()).unwrap();
|
||||||
|
let task2 = Queue::insert_query(&pooled_connection, &task2, Utc::now()).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(FangTaskState::New, task1.state);
|
||||||
|
assert_eq!(FangTaskState::New, task2.state);
|
||||||
|
|
||||||
|
worker.run_tasks_until_none().unwrap();
|
||||||
|
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||||
|
|
||||||
|
let found_task1 = Queue::find_task_by_id_query(&pooled_connection, task1.id).unwrap();
|
||||||
|
assert_eq!(FangTaskState::Finished, found_task1.state);
|
||||||
|
|
||||||
|
let found_task2 = Queue::find_task_by_id_query(&pooled_connection, task2.id).unwrap();
|
||||||
|
assert_eq!(FangTaskState::New, found_task2.state);
|
||||||
|
|
||||||
|
Queue::remove_tasks_of_type_query(&pooled_connection, "type1").unwrap();
|
||||||
|
Queue::remove_tasks_of_type_query(&pooled_connection, "type2").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[ignore]
|
||||||
|
fn saves_error_for_failed_task() {
|
||||||
|
let task = FailedTask { number: 10 };
|
||||||
|
|
||||||
|
let pool = Queue::connection_pool(5);
|
||||||
|
|
||||||
|
let queue = Queue::builder().connection_pool(pool).build();
|
||||||
|
|
||||||
|
let worker = Worker::<Queue>::builder()
|
||||||
|
.queue(queue)
|
||||||
|
.retention_mode(RetentionMode::KeepAll)
|
||||||
|
.task_type(task.task_type())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
let pooled_connection = worker.queue.connection_pool.get().unwrap();
|
||||||
|
|
||||||
|
let task = Queue::insert_query(&pooled_connection, &task, Utc::now()).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(FangTaskState::New, task.state);
|
||||||
|
|
||||||
|
worker.run(task.clone());
|
||||||
|
|
||||||
|
let found_task = Queue::find_task_by_id_query(&pooled_connection, task.id).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(FangTaskState::Failed, found_task.state);
|
||||||
|
assert_eq!(
|
||||||
|
"the number is 10".to_string(),
|
||||||
|
found_task.error_message.unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
|
Queue::remove_tasks_of_type_query(&pooled_connection, "F_task").unwrap();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,36 +1,38 @@
|
||||||
use crate::diesel::r2d2;
|
|
||||||
use crate::diesel::PgConnection;
|
|
||||||
use crate::error::FangError;
|
use crate::error::FangError;
|
||||||
use crate::executor::Executor;
|
use crate::queue::Queueable;
|
||||||
use crate::queue::Queue;
|
use crate::worker::Worker;
|
||||||
use crate::{RetentionMode, SleepParams};
|
use crate::RetentionMode;
|
||||||
|
use crate::SleepParams;
|
||||||
use log::error;
|
use log::error;
|
||||||
use log::info;
|
use log::info;
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
use typed_builder::TypedBuilder;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone, TypedBuilder)]
|
||||||
pub struct WorkerPool {
|
pub struct WorkerPool<BQueue>
|
||||||
|
where
|
||||||
|
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||||
|
{
|
||||||
|
#[builder(setter(into))]
|
||||||
pub number_of_workers: u32,
|
pub number_of_workers: u32,
|
||||||
pub worker_params: WorkerParams,
|
#[builder(setter(into))]
|
||||||
pub connection_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
|
pub queue: BQueue,
|
||||||
shared_state: SharedState,
|
#[builder(setter(into), default)]
|
||||||
thread_join_handles: Arc<RwLock<HashMap<String, thread::JoinHandle<()>>>>,
|
pub task_type: String,
|
||||||
|
#[builder(setter(into), default)]
|
||||||
|
pub sleep_params: SleepParams,
|
||||||
|
#[builder(setter(into), default)]
|
||||||
|
pub retention_mode: RetentionMode,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct WorkerThread {
|
#[derive(Clone, TypedBuilder)]
|
||||||
|
pub struct WorkerThread<BQueue>
|
||||||
|
where
|
||||||
|
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||||
|
{
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub restarts: u64,
|
pub restarts: u64,
|
||||||
pub worker_pool: WorkerPool,
|
pub worker_pool: WorkerPool<BQueue>,
|
||||||
graceful_shutdown: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type SharedState = Arc<RwLock<WorkerState>>;
|
|
||||||
|
|
||||||
pub enum WorkerState {
|
|
||||||
Running,
|
|
||||||
Shutdown,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -40,342 +42,73 @@ pub struct WorkerParams {
|
||||||
pub task_type: Option<String>,
|
pub task_type: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for WorkerParams {
|
impl<BQueue> WorkerPool<BQueue>
|
||||||
fn default() -> Self {
|
where
|
||||||
Self::new()
|
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||||
}
|
{
|
||||||
}
|
|
||||||
|
|
||||||
impl WorkerParams {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
retention_mode: None,
|
|
||||||
sleep_params: None,
|
|
||||||
task_type: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_retention_mode(&mut self, retention_mode: RetentionMode) {
|
|
||||||
self.retention_mode = Some(retention_mode);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_sleep_params(&mut self, sleep_params: SleepParams) {
|
|
||||||
self.sleep_params = Some(sleep_params);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_task_type(&mut self, task_type: String) {
|
|
||||||
self.task_type = Some(task_type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl WorkerPool {
|
|
||||||
pub fn new(number_of_workers: u32) -> Self {
|
|
||||||
let worker_params = WorkerParams::new();
|
|
||||||
let connection_pool = Queue::connection_pool(number_of_workers);
|
|
||||||
|
|
||||||
Self {
|
|
||||||
number_of_workers,
|
|
||||||
worker_params,
|
|
||||||
connection_pool,
|
|
||||||
shared_state: Arc::new(RwLock::new(WorkerState::Running)),
|
|
||||||
thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity(
|
|
||||||
number_of_workers as usize,
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_with_params(number_of_workers: u32, worker_params: WorkerParams) -> Self {
|
|
||||||
let connection_pool = Queue::connection_pool(number_of_workers);
|
|
||||||
|
|
||||||
Self {
|
|
||||||
number_of_workers,
|
|
||||||
worker_params,
|
|
||||||
connection_pool,
|
|
||||||
shared_state: Arc::new(RwLock::new(WorkerState::Running)),
|
|
||||||
thread_join_handles: Arc::new(RwLock::new(HashMap::with_capacity(
|
|
||||||
number_of_workers as usize,
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start(&mut self) -> Result<(), FangError> {
|
pub fn start(&mut self) -> Result<(), FangError> {
|
||||||
for idx in 1..self.number_of_workers + 1 {
|
for idx in 1..self.number_of_workers + 1 {
|
||||||
let worker_type = self
|
let name = format!("worker_{}{}", self.task_type, idx);
|
||||||
.worker_params
|
|
||||||
.task_type
|
|
||||||
.clone()
|
|
||||||
.unwrap_or_else(|| "".to_string());
|
|
||||||
let name = format!("worker_{}{}", worker_type, idx);
|
|
||||||
WorkerThread::spawn_in_pool(name.clone(), 0, self.clone())?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Attempt graceful shutdown of each job thread, blocks until all threads exit. Threads exit
|
let worker_thread = WorkerThread::builder()
|
||||||
/// when their current job finishes.
|
.name(name.clone())
|
||||||
pub fn shutdown(&mut self) -> Result<(), FangError> {
|
.restarts(0)
|
||||||
*self.shared_state.write()? = WorkerState::Shutdown;
|
.worker_pool(self.clone())
|
||||||
|
.build();
|
||||||
|
|
||||||
for (worker_name, thread) in self.thread_join_handles.write()?.drain() {
|
worker_thread.spawn()?;
|
||||||
if let Err(err) = thread.join() {
|
|
||||||
error!(
|
|
||||||
"Failed to exit executor thread '{}' cleanly: {:?}",
|
|
||||||
worker_name, err
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WorkerThread {
|
impl<BQueue> WorkerThread<BQueue>
|
||||||
pub fn new(name: String, restarts: u64, worker_pool: WorkerPool) -> Self {
|
where
|
||||||
Self {
|
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||||
name,
|
{
|
||||||
restarts,
|
fn spawn(self) -> Result<(), FangError> {
|
||||||
worker_pool,
|
|
||||||
graceful_shutdown: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn spawn_in_pool(
|
|
||||||
name: String,
|
|
||||||
restarts: u64,
|
|
||||||
worker_pool: WorkerPool,
|
|
||||||
) -> Result<(), FangError> {
|
|
||||||
info!(
|
info!(
|
||||||
"starting a worker thread {}, number of restarts {}",
|
"starting a worker thread {}, number of restarts {}",
|
||||||
name, restarts
|
self.name, self.restarts
|
||||||
);
|
);
|
||||||
|
|
||||||
let job = WorkerThread::new(name.clone(), restarts, worker_pool.clone());
|
let builder = thread::Builder::new().name(self.name.clone());
|
||||||
let join_handle = Self::spawn_thread(name.clone(), job)?;
|
|
||||||
worker_pool
|
|
||||||
.thread_join_handles
|
|
||||||
.write()?
|
|
||||||
.insert(name, join_handle);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn spawn_thread(
|
|
||||||
name: String,
|
|
||||||
mut job: WorkerThread,
|
|
||||||
) -> Result<thread::JoinHandle<()>, FangError> {
|
|
||||||
let builder = thread::Builder::new().name(name.clone());
|
|
||||||
builder
|
builder
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
match job.worker_pool.connection_pool.get() {
|
let mut worker: Worker<BQueue> = Worker::builder()
|
||||||
Ok(connection) => {
|
.queue(self.worker_pool.queue.clone())
|
||||||
let mut executor = Executor::new(connection);
|
.task_type(self.worker_pool.task_type.clone())
|
||||||
executor.set_shared_state(job.worker_pool.shared_state.clone());
|
.retention_mode(self.worker_pool.retention_mode.clone())
|
||||||
|
.sleep_params(self.worker_pool.sleep_params.clone())
|
||||||
|
.build();
|
||||||
|
|
||||||
if let Some(ref task_type_str) = job.worker_pool.worker_params.task_type {
|
// Run worker
|
||||||
executor.set_task_type(task_type_str.to_owned());
|
if let Err(error) = worker.run_tasks() {
|
||||||
}
|
error!(
|
||||||
|
"Error executing tasks in worker '{}': {:?}",
|
||||||
if let Some(ref retention_mode) =
|
self.name, error
|
||||||
job.worker_pool.worker_params.retention_mode
|
);
|
||||||
{
|
|
||||||
executor.set_retention_mode(retention_mode.to_owned());
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ref sleep_params) = job.worker_pool.worker_params.sleep_params {
|
|
||||||
executor.set_sleep_params(sleep_params.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run executor
|
|
||||||
match executor.run_tasks() {
|
|
||||||
Ok(_) => {
|
|
||||||
job.graceful_shutdown = true;
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
error!("Error executing tasks in worker '{}': {:?}", name, error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
error!("Failed to get postgres connection: {:?}", error);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.map_err(FangError::from)
|
.map_err(FangError::from)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for WorkerThread {
|
impl<BQueue> Drop for WorkerThread<BQueue>
|
||||||
|
where
|
||||||
|
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||||
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.graceful_shutdown {
|
self.restarts += 1;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
WorkerThread::spawn_in_pool(
|
error!(
|
||||||
self.name.clone(),
|
"Worker {} stopped. Restarting. The number of restarts {}",
|
||||||
self.restarts + 1,
|
self.name, self.restarts,
|
||||||
self.worker_pool.clone(),
|
);
|
||||||
)
|
|
||||||
.unwrap();
|
self.clone().spawn().unwrap();
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod task_pool_tests {
|
|
||||||
use super::WorkerParams;
|
|
||||||
use super::WorkerPool;
|
|
||||||
use crate::executor::Error;
|
|
||||||
use crate::executor::Runnable;
|
|
||||||
use crate::queue::Queue;
|
|
||||||
use crate::schema::{fang_tasks, FangTaskState};
|
|
||||||
use crate::typetag;
|
|
||||||
use crate::RetentionMode;
|
|
||||||
use crate::Task;
|
|
||||||
use diesel::pg::PgConnection;
|
|
||||||
use diesel::prelude::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::thread;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct MyTask {
|
|
||||||
pub number: u16,
|
|
||||||
pub current_thread_name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MyTask {
|
|
||||||
pub fn new(number: u16) -> Self {
|
|
||||||
let handle = thread::current();
|
|
||||||
let current_thread_name = handle.name().unwrap().to_string();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
number,
|
|
||||||
current_thread_name,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for MyTask {
|
|
||||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
thread::sleep(Duration::from_secs(3));
|
|
||||||
|
|
||||||
let new_task = MyTask::new(self.number + 1);
|
|
||||||
|
|
||||||
Queue::push_task_query(connection, &new_task).unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
|
||||||
"worker_pool_test".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct ShutdownTask {
|
|
||||||
pub number: u16,
|
|
||||||
pub current_thread_name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ShutdownTask {
|
|
||||||
pub fn new(number: u16) -> Self {
|
|
||||||
let handle = thread::current();
|
|
||||||
let current_thread_name = handle.name().unwrap().to_string();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
number,
|
|
||||||
current_thread_name,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[typetag::serde]
|
|
||||||
impl Runnable for ShutdownTask {
|
|
||||||
fn run(&self, connection: &PgConnection) -> Result<(), Error> {
|
|
||||||
thread::sleep(Duration::from_secs(3));
|
|
||||||
|
|
||||||
let new_task = MyTask::new(self.number + 1);
|
|
||||||
|
|
||||||
Queue::push_task_query(connection, &new_task).unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn task_type(&self) -> String {
|
|
||||||
"shutdown_test".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_all_tasks(conn: &PgConnection, job_type: &str) -> Vec<Task> {
|
|
||||||
fang_tasks::table
|
|
||||||
.filter(fang_tasks::task_type.eq(job_type))
|
|
||||||
.get_results::<Task>(conn)
|
|
||||||
.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Following tests ignored because they commit data to the db
|
|
||||||
#[test]
|
|
||||||
#[ignore]
|
|
||||||
fn tasks_are_finished_on_shutdown() {
|
|
||||||
let queue = Queue::new();
|
|
||||||
|
|
||||||
let mut worker_params = WorkerParams::new();
|
|
||||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
|
||||||
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
|
|
||||||
|
|
||||||
queue.push_task(&ShutdownTask::new(100)).unwrap();
|
|
||||||
queue.push_task(&ShutdownTask::new(200)).unwrap();
|
|
||||||
|
|
||||||
task_pool.start().unwrap();
|
|
||||||
thread::sleep(Duration::from_secs(1));
|
|
||||||
task_pool.shutdown().unwrap();
|
|
||||||
thread::sleep(Duration::from_secs(5));
|
|
||||||
|
|
||||||
let tasks = get_all_tasks(&queue.connection, "shutdown_test");
|
|
||||||
let in_progress_tasks = tasks
|
|
||||||
.iter()
|
|
||||||
.filter(|task| task.state == FangTaskState::InProgress);
|
|
||||||
let finished_tasks = tasks
|
|
||||||
.iter()
|
|
||||||
.filter(|task| task.state == FangTaskState::Finished);
|
|
||||||
|
|
||||||
// Asserts first two tasks are allowed to finish, the tasks they spawn are not started
|
|
||||||
// though. No tasks should be in progress after a graceful shutdown.
|
|
||||||
assert_eq!(in_progress_tasks.count(), 0);
|
|
||||||
assert_eq!(finished_tasks.count(), 2);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[ignore]
|
|
||||||
fn tasks_are_split_between_two_threads() {
|
|
||||||
let queue = Queue::new();
|
|
||||||
|
|
||||||
let mut worker_params = WorkerParams::new();
|
|
||||||
worker_params.set_retention_mode(RetentionMode::KeepAll);
|
|
||||||
let mut task_pool = WorkerPool::new_with_params(2, worker_params);
|
|
||||||
|
|
||||||
queue.push_task(&MyTask::new(100)).unwrap();
|
|
||||||
queue.push_task(&MyTask::new(200)).unwrap();
|
|
||||||
|
|
||||||
task_pool.start().unwrap();
|
|
||||||
|
|
||||||
thread::sleep(Duration::from_secs(100));
|
|
||||||
|
|
||||||
let tasks = get_all_tasks(&queue.connection, "worker_pool_test");
|
|
||||||
|
|
||||||
assert!(tasks.len() > 40);
|
|
||||||
|
|
||||||
let test_worker1_tasks = tasks.clone().into_iter().filter(|task| {
|
|
||||||
serde_json::to_string(&task.metadata)
|
|
||||||
.unwrap()
|
|
||||||
.contains("worker_1")
|
|
||||||
});
|
|
||||||
|
|
||||||
let test_worker2_tasks = tasks.into_iter().filter(|task| {
|
|
||||||
serde_json::to_string(&task.metadata)
|
|
||||||
.unwrap()
|
|
||||||
.contains("worker_2")
|
|
||||||
});
|
|
||||||
|
|
||||||
assert!(test_worker1_tasks.count() > 20);
|
|
||||||
assert!(test_worker2_tasks.count() > 20);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
16
src/lib.rs
16
src/lib.rs
|
@ -1,6 +1,22 @@
|
||||||
#![allow(clippy::extra_unused_lifetimes)]
|
#![allow(clippy::extra_unused_lifetimes)]
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use thiserror::Error;
|
||||||
|
|
||||||
|
pub enum Scheduled {
|
||||||
|
CronPattern(String),
|
||||||
|
ScheduleOnce(DateTime<Utc>),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum CronError {
|
||||||
|
#[error(transparent)]
|
||||||
|
LibraryError(#[from] cron::error::Error),
|
||||||
|
#[error("You have to implement method `cron()` in your AsyncRunnable")]
|
||||||
|
TaskNotSchedulableError,
|
||||||
|
#[error("No timestamps match with this cron pattern")]
|
||||||
|
NoTimestampsError,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum RetentionMode {
|
pub enum RetentionMode {
|
||||||
|
|
Loading…
Reference in a new issue