From 133d1427611b6c26a69b545d31f3515a4743fcfb Mon Sep 17 00:00:00 2001 From: Ayrat Badykov Date: Sun, 31 Jul 2022 16:32:37 +0300 Subject: [PATCH] Async worker pool draft (#49) * Async worker pool draft * ignore all targets * simple worker example * add debug logs * fix * fix tests * remove log * remove debug logs * fix clippy --- .gitignore | 2 +- fang_examples/simple_async_worker/Cargo.toml | 16 ++++ fang_examples/simple_async_worker/src/lib.rs | 34 ++++++++ fang_examples/simple_async_worker/src/main.rs | 41 ++++++++++ src/asynk/async_queue.rs | 4 +- src/asynk/async_runnable.rs | 4 +- src/asynk/async_scheduler.rs | 2 +- src/asynk/async_worker.rs | 8 +- src/asynk/async_worker_pool.rs | 78 +++++++++++++++++++ src/asynk/mod.rs | 2 + src/lib.rs | 3 + 11 files changed, 184 insertions(+), 10 deletions(-) create mode 100644 fang_examples/simple_async_worker/Cargo.toml create mode 100644 fang_examples/simple_async_worker/src/lib.rs create mode 100644 fang_examples/simple_async_worker/src/main.rs create mode 100644 src/asynk/async_worker_pool.rs diff --git a/.gitignore b/.gitignore index 626b0a1..63b2ddb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ -/target +**/target Cargo.lock src/schema.rs diff --git a/fang_examples/simple_async_worker/Cargo.toml b/fang_examples/simple_async_worker/Cargo.toml new file mode 100644 index 0000000..0f138f5 --- /dev/null +++ b/fang_examples/simple_async_worker/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "simple_async_worker" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +fang = { path = "../../" , features = ["asynk"]} +env_logger = "0.9.0" +log = "0.4.0" +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1", features = ["full"] } +serde_json = "1" +async-trait = "0.1" +tokio-postgres = "0.7" diff --git a/fang_examples/simple_async_worker/src/lib.rs b/fang_examples/simple_async_worker/src/lib.rs new file mode 100644 index 0000000..58c206e --- /dev/null +++ b/fang_examples/simple_async_worker/src/lib.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use fang::asynk::async_queue::AsyncQueueable; +use fang::asynk::async_runnable::Error; +use fang::typetag; +use fang::AsyncRunnable; +use serde::Deserialize; +use serde::Serialize; +use std::time::Duration; + +#[derive(Serialize, Deserialize)] +pub struct MyTask { + pub number: u16, +} + +impl MyTask { + pub fn new(number: u16) -> Self { + Self { number } + } +} + +#[async_trait] +#[typetag::serde] +impl AsyncRunnable for MyTask { + async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> { + log::info!("the curreny number is {}", self.number); + tokio::time::sleep(Duration::from_secs(3)).await; + + let new_task = MyTask::new(self.number + 1); + let metadata = serde_json::to_value(&new_task as &dyn AsyncRunnable).unwrap(); + queue.insert_task(metadata, "common").await.unwrap(); + + Ok(()) + } +} diff --git a/fang_examples/simple_async_worker/src/main.rs b/fang_examples/simple_async_worker/src/main.rs new file mode 100644 index 0000000..c5012b0 --- /dev/null +++ b/fang_examples/simple_async_worker/src/main.rs @@ -0,0 +1,41 @@ +use fang::asynk::async_queue::AsyncQueue; +use fang::asynk::async_queue::AsyncQueueable; +use fang::asynk::async_worker_pool::AsyncWorkerPool; +use fang::AsyncRunnable; +use simple_async_worker::MyTask; +use std::time::Duration; +use tokio_postgres::NoTls; + +#[tokio::main] +async fn main() { + env_logger::init(); + + log::info!("Starting..."); + + let mut queue = AsyncQueue::connect("postgres://postgres:postgres@localhost/fang", NoTls, true) + .await + .unwrap(); + + log::info!("Queue connected..."); + + let mut pool = AsyncWorkerPool::builder() + .queue(queue.clone()) + .number_of_workers(2 as u16) + .build(); + + log::info!("Pool created ..."); + + pool.start().await; + log::info!("Workers started ..."); + + let task1 = MyTask::new(0); + let task2 = MyTask::new(20_000); + + let metadata1 = serde_json::to_value(&task1 as &dyn AsyncRunnable).unwrap(); + let metadata2 = serde_json::to_value(&task2 as &dyn AsyncRunnable).unwrap(); + + queue.insert_task(metadata1, "common").await.unwrap(); + queue.insert_task(metadata2, "common").await.unwrap(); + + tokio::time::sleep(Duration::from_secs(100)).await; +} diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index 2fb3e9f..24cdd47 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -109,7 +109,7 @@ impl From for FangError { } #[async_trait] -pub trait AsyncQueueable { +pub trait AsyncQueueable: Send { async fn fetch_and_touch_task( &mut self, task_type: Option, @@ -753,7 +753,7 @@ mod async_queue_tests { } #[typetag::serde] - #[async_trait(?Send)] + #[async_trait] impl AsyncRunnable for AsyncTask { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index 048b3bd..e14f698 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -9,8 +9,8 @@ pub struct Error { } #[typetag::serde(tag = "type")] -#[async_trait(?Send)] -pub trait AsyncRunnable { +#[async_trait] +pub trait AsyncRunnable: Send + Sync { async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>; fn task_type(&self) -> String { diff --git a/src/asynk/async_scheduler.rs b/src/asynk/async_scheduler.rs index e369f27..fa53a63 100644 --- a/src/asynk/async_scheduler.rs +++ b/src/asynk/async_scheduler.rs @@ -139,7 +139,7 @@ mod async_scheduler_tests { } #[typetag::serde] - #[async_trait(?Send)] + #[async_trait] impl AsyncRunnable for AsyncScheduledTask { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index 71ec481..63425fd 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -152,7 +152,7 @@ mod async_worker_tests { } #[typetag::serde] - #[async_trait(?Send)] + #[async_trait] impl AsyncRunnable for WorkerAsyncTask { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) @@ -164,7 +164,7 @@ mod async_worker_tests { } #[typetag::serde] - #[async_trait(?Send)] + #[async_trait] impl AsyncRunnable for AsyncFailedTask { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { let message = format!("number {} is wrong :(", self.number); @@ -179,7 +179,7 @@ mod async_worker_tests { struct AsyncTaskType1 {} #[typetag::serde] - #[async_trait(?Send)] + #[async_trait] impl AsyncRunnable for AsyncTaskType1 { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) @@ -194,7 +194,7 @@ mod async_worker_tests { struct AsyncTaskType2 {} #[typetag::serde] - #[async_trait(?Send)] + #[async_trait] impl AsyncRunnable for AsyncTaskType2 { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) diff --git a/src/asynk/async_worker_pool.rs b/src/asynk/async_worker_pool.rs new file mode 100644 index 0000000..876bcce --- /dev/null +++ b/src/asynk/async_worker_pool.rs @@ -0,0 +1,78 @@ +use crate::asynk::async_queue::AsyncQueue; +use crate::asynk::async_queue::AsyncQueueable; +use crate::asynk::async_worker::AsyncWorker; +use crate::asynk::Error; +use crate::RetentionMode; +use crate::SleepParams; +use async_recursion::async_recursion; +use bb8_postgres::tokio_postgres::tls::MakeTlsConnect; +use bb8_postgres::tokio_postgres::tls::TlsConnect; +use bb8_postgres::tokio_postgres::Socket; +use log::error; +use std::time::Duration; +use typed_builder::TypedBuilder; + +#[derive(TypedBuilder, Clone)] +pub struct AsyncWorkerPool +where + Tls: MakeTlsConnect + Clone + Send + Sync + 'static, + >::Stream: Send + Sync, + >::TlsConnect: Send, + <>::TlsConnect as TlsConnect>::Future: Send, +{ + #[builder(setter(into))] + pub queue: AsyncQueue, + #[builder(setter(into))] + pub number_of_workers: u16, +} + +#[derive(TypedBuilder, Clone)] +pub struct WorkerParams { + #[builder(setter(into, strip_option), default)] + pub retention_mode: Option, + #[builder(setter(into, strip_option), default)] + pub sleep_params: Option, + #[builder(setter(into, strip_option), default)] + pub task_type: Option, +} + +impl AsyncWorkerPool +where + Tls: MakeTlsConnect + Clone + Send + Sync + 'static, + >::Stream: Send + Sync, + >::TlsConnect: Send, + <>::TlsConnect as TlsConnect>::Future: Send, +{ + pub async fn start(&mut self) { + for _idx in 1..self.number_of_workers + 1 { + let queue = self.queue.clone(); + tokio::spawn(async move { Self::supervise_worker(queue).await }); + } + } + + #[async_recursion] + pub async fn supervise_worker(queue: AsyncQueue) -> Result<(), Error> { + let result = Self::run_worker(queue.clone()).await; + + tokio::time::sleep(Duration::from_secs(1)).await; + + match result { + Err(err) => { + error!("Worker failed. Restarting. {:?}", err); + Self::supervise_worker(queue).await + } + Ok(_) => { + error!("Worker stopped. Restarting"); + Self::supervise_worker(queue).await + } + } + } + + pub async fn run_worker(mut queue: AsyncQueue) -> Result<(), Error> { + let mut worker = AsyncWorker::builder() + .queue(&mut queue as &mut dyn AsyncQueueable) + .build(); + + worker.run_tasks().await + } +} diff --git a/src/asynk/mod.rs b/src/asynk/mod.rs index 45de86c..ab9fb02 100644 --- a/src/asynk/mod.rs +++ b/src/asynk/mod.rs @@ -2,5 +2,7 @@ pub mod async_queue; pub mod async_runnable; pub mod async_scheduler; pub mod async_worker; +pub mod async_worker_pool; + pub use async_runnable::AsyncRunnable; pub use async_runnable::Error; diff --git a/src/lib.rs b/src/lib.rs index c9d36f7..f0cec22 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,3 +59,6 @@ pub use blocking::*; #[cfg(feature = "asynk")] pub mod asynk; + +#[cfg(feature = "asynk")] +pub use asynk::*;