Async worker pool draft (#49)

* Async worker pool draft

* ignore all targets

* simple worker example

* add debug logs

* fix

* fix tests

* remove log

* remove debug logs

* fix clippy
This commit is contained in:
Ayrat Badykov 2022-07-31 16:32:37 +03:00 committed by GitHub
parent 6222c15d99
commit 133d142761
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 184 additions and 10 deletions

2
.gitignore vendored
View file

@ -1,3 +1,3 @@
/target **/target
Cargo.lock Cargo.lock
src/schema.rs src/schema.rs

View file

@ -0,0 +1,16 @@
[package]
name = "simple_async_worker"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
fang = { path = "../../" , features = ["asynk"]}
env_logger = "0.9.0"
log = "0.4.0"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
async-trait = "0.1"
tokio-postgres = "0.7"

View file

@ -0,0 +1,34 @@
use async_trait::async_trait;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_runnable::Error;
use fang::typetag;
use fang::AsyncRunnable;
use serde::Deserialize;
use serde::Serialize;
use std::time::Duration;
#[derive(Serialize, Deserialize)]
pub struct MyTask {
pub number: u16,
}
impl MyTask {
pub fn new(number: u16) -> Self {
Self { number }
}
}
#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyTask {
async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
log::info!("the curreny number is {}", self.number);
tokio::time::sleep(Duration::from_secs(3)).await;
let new_task = MyTask::new(self.number + 1);
let metadata = serde_json::to_value(&new_task as &dyn AsyncRunnable).unwrap();
queue.insert_task(metadata, "common").await.unwrap();
Ok(())
}
}

View file

@ -0,0 +1,41 @@
use fang::asynk::async_queue::AsyncQueue;
use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_worker_pool::AsyncWorkerPool;
use fang::AsyncRunnable;
use simple_async_worker::MyTask;
use std::time::Duration;
use tokio_postgres::NoTls;
#[tokio::main]
async fn main() {
env_logger::init();
log::info!("Starting...");
let mut queue = AsyncQueue::connect("postgres://postgres:postgres@localhost/fang", NoTls, true)
.await
.unwrap();
log::info!("Queue connected...");
let mut pool = AsyncWorkerPool::builder()
.queue(queue.clone())
.number_of_workers(2 as u16)
.build();
log::info!("Pool created ...");
pool.start().await;
log::info!("Workers started ...");
let task1 = MyTask::new(0);
let task2 = MyTask::new(20_000);
let metadata1 = serde_json::to_value(&task1 as &dyn AsyncRunnable).unwrap();
let metadata2 = serde_json::to_value(&task2 as &dyn AsyncRunnable).unwrap();
queue.insert_task(metadata1, "common").await.unwrap();
queue.insert_task(metadata2, "common").await.unwrap();
tokio::time::sleep(Duration::from_secs(100)).await;
}

View file

@ -109,7 +109,7 @@ impl From<AsyncQueueError> for FangError {
} }
#[async_trait] #[async_trait]
pub trait AsyncQueueable { pub trait AsyncQueueable: Send {
async fn fetch_and_touch_task( async fn fetch_and_touch_task(
&mut self, &mut self,
task_type: Option<String>, task_type: Option<String>,
@ -753,7 +753,7 @@ mod async_queue_tests {
} }
#[typetag::serde] #[typetag::serde]
#[async_trait(?Send)] #[async_trait]
impl AsyncRunnable for AsyncTask { impl AsyncRunnable for AsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(()) Ok(())

View file

@ -9,8 +9,8 @@ pub struct Error {
} }
#[typetag::serde(tag = "type")] #[typetag::serde(tag = "type")]
#[async_trait(?Send)] #[async_trait]
pub trait AsyncRunnable { pub trait AsyncRunnable: Send + Sync {
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>; async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>;
fn task_type(&self) -> String { fn task_type(&self) -> String {

View file

@ -139,7 +139,7 @@ mod async_scheduler_tests {
} }
#[typetag::serde] #[typetag::serde]
#[async_trait(?Send)] #[async_trait]
impl AsyncRunnable for AsyncScheduledTask { impl AsyncRunnable for AsyncScheduledTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(()) Ok(())

View file

@ -152,7 +152,7 @@ mod async_worker_tests {
} }
#[typetag::serde] #[typetag::serde]
#[async_trait(?Send)] #[async_trait]
impl AsyncRunnable for WorkerAsyncTask { impl AsyncRunnable for WorkerAsyncTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(()) Ok(())
@ -164,7 +164,7 @@ mod async_worker_tests {
} }
#[typetag::serde] #[typetag::serde]
#[async_trait(?Send)] #[async_trait]
impl AsyncRunnable for AsyncFailedTask { impl AsyncRunnable for AsyncFailedTask {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
let message = format!("number {} is wrong :(", self.number); let message = format!("number {} is wrong :(", self.number);
@ -179,7 +179,7 @@ mod async_worker_tests {
struct AsyncTaskType1 {} struct AsyncTaskType1 {}
#[typetag::serde] #[typetag::serde]
#[async_trait(?Send)] #[async_trait]
impl AsyncRunnable for AsyncTaskType1 { impl AsyncRunnable for AsyncTaskType1 {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(()) Ok(())
@ -194,7 +194,7 @@ mod async_worker_tests {
struct AsyncTaskType2 {} struct AsyncTaskType2 {}
#[typetag::serde] #[typetag::serde]
#[async_trait(?Send)] #[async_trait]
impl AsyncRunnable for AsyncTaskType2 { impl AsyncRunnable for AsyncTaskType2 {
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {
Ok(()) Ok(())

View file

@ -0,0 +1,78 @@
use crate::asynk::async_queue::AsyncQueue;
use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_worker::AsyncWorker;
use crate::asynk::Error;
use crate::RetentionMode;
use crate::SleepParams;
use async_recursion::async_recursion;
use bb8_postgres::tokio_postgres::tls::MakeTlsConnect;
use bb8_postgres::tokio_postgres::tls::TlsConnect;
use bb8_postgres::tokio_postgres::Socket;
use log::error;
use std::time::Duration;
use typed_builder::TypedBuilder;
#[derive(TypedBuilder, Clone)]
pub struct AsyncWorkerPool<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
#[builder(setter(into))]
pub queue: AsyncQueue<Tls>,
#[builder(setter(into))]
pub number_of_workers: u16,
}
#[derive(TypedBuilder, Clone)]
pub struct WorkerParams {
#[builder(setter(into, strip_option), default)]
pub retention_mode: Option<RetentionMode>,
#[builder(setter(into, strip_option), default)]
pub sleep_params: Option<SleepParams>,
#[builder(setter(into, strip_option), default)]
pub task_type: Option<String>,
}
impl<Tls> AsyncWorkerPool<Tls>
where
Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
<Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
pub async fn start(&mut self) {
for _idx in 1..self.number_of_workers + 1 {
let queue = self.queue.clone();
tokio::spawn(async move { Self::supervise_worker(queue).await });
}
}
#[async_recursion]
pub async fn supervise_worker(queue: AsyncQueue<Tls>) -> Result<(), Error> {
let result = Self::run_worker(queue.clone()).await;
tokio::time::sleep(Duration::from_secs(1)).await;
match result {
Err(err) => {
error!("Worker failed. Restarting. {:?}", err);
Self::supervise_worker(queue).await
}
Ok(_) => {
error!("Worker stopped. Restarting");
Self::supervise_worker(queue).await
}
}
}
pub async fn run_worker(mut queue: AsyncQueue<Tls>) -> Result<(), Error> {
let mut worker = AsyncWorker::builder()
.queue(&mut queue as &mut dyn AsyncQueueable)
.build();
worker.run_tasks().await
}
}

View file

@ -2,5 +2,7 @@ pub mod async_queue;
pub mod async_runnable; pub mod async_runnable;
pub mod async_scheduler; pub mod async_scheduler;
pub mod async_worker; pub mod async_worker;
pub mod async_worker_pool;
pub use async_runnable::AsyncRunnable; pub use async_runnable::AsyncRunnable;
pub use async_runnable::Error; pub use async_runnable::Error;

View file

@ -59,3 +59,6 @@ pub use blocking::*;
#[cfg(feature = "asynk")] #[cfg(feature = "asynk")]
pub mod asynk; pub mod asynk;
#[cfg(feature = "asynk")]
pub use asynk::*;