configure workers (#52)

This commit is contained in:
Pmarquez 2022-08-01 16:55:36 +00:00 committed by GitHub
parent 7e66196373
commit 785dc88b8f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2,8 +2,7 @@ use crate::asynk::async_queue::AsyncQueue;
use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_worker::AsyncWorker;
use crate::asynk::Error;
use crate::RetentionMode;
use crate::SleepParams;
use crate::{RetentionMode, SleepParams};
use async_recursion::async_recursion;
use bb8_postgres::tokio_postgres::tls::MakeTlsConnect;
use bb8_postgres::tokio_postgres::tls::TlsConnect;
@ -22,6 +21,10 @@ where
{
#[builder(setter(into))]
pub queue: AsyncQueue<Tls>,
#[builder(default, setter(into))]
pub sleep_params: SleepParams,
#[builder(default, setter(into))]
pub retention_mode: RetentionMode,
#[builder(setter(into))]
pub number_of_workers: u16,
}
@ -46,31 +49,47 @@ where
pub async fn start(&mut self) {
for _idx in 1..self.number_of_workers + 1 {
let queue = self.queue.clone();
tokio::spawn(async move { Self::supervise_worker(queue).await });
let sleep_params = self.sleep_params.clone();
let retention_mode = self.retention_mode.clone();
tokio::spawn(async move {
Self::supervise_worker(queue, sleep_params, retention_mode).await
});
}
}
#[async_recursion]
pub async fn supervise_worker(queue: AsyncQueue<Tls>) -> Result<(), Error> {
let result = Self::run_worker(queue.clone()).await;
pub async fn supervise_worker(
queue: AsyncQueue<Tls>,
sleep_params: SleepParams,
retention_mode: RetentionMode,
) -> Result<(), Error> {
let result =
Self::run_worker(queue.clone(), sleep_params.clone(), retention_mode.clone()).await;
tokio::time::sleep(Duration::from_secs(1)).await;
match result {
Err(err) => {
error!("Worker failed. Restarting. {:?}", err);
Self::supervise_worker(queue).await
Self::supervise_worker(queue, sleep_params, retention_mode).await
}
Ok(_) => {
error!("Worker stopped. Restarting");
Self::supervise_worker(queue).await
Self::supervise_worker(queue, sleep_params, retention_mode).await
}
}
}
pub async fn run_worker(mut queue: AsyncQueue<Tls>) -> Result<(), Error> {
pub async fn run_worker(
mut queue: AsyncQueue<Tls>,
sleep_params: SleepParams,
retention_mode: RetentionMode,
) -> Result<(), Error> {
let mut worker = AsyncWorker::builder()
.queue(&mut queue as &mut dyn AsyncQueueable)
.sleep_params(sleep_params)
.retention_mode(retention_mode)
.build();
worker.run_tasks().await