diff --git a/Cargo.toml b/Cargo.toml index be5fed3..e2b9287 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ optional = true [dependencies.tokio] version = "1.20" -features = ["rt", "time"] +features = ["rt", "time", "macros"] optional = true [dependencies.async-trait] @@ -68,6 +68,3 @@ optional = true [dependencies.async-recursion] version = "1" optional = true - -[dev-dependencies] -tokio = { version = "1.20", features = ["macros"] } diff --git a/fang_examples/simple_async_worker/src/lib.rs b/fang_examples/simple_async_worker/src/lib.rs index 28207d7..25951b3 100644 --- a/fang_examples/simple_async_worker/src/lib.rs +++ b/fang_examples/simple_async_worker/src/lib.rs @@ -18,19 +18,54 @@ impl MyTask { } } +#[derive(Serialize, Deserialize)] +#[serde(crate = "fang::serde")] +pub struct MyFailingTask { + pub number: u16, +} + +impl MyFailingTask { + pub fn new(number: u16) -> Self { + Self { number } + } +} + #[async_trait] #[typetag::serde] impl AsyncRunnable for MyTask { async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> { - log::info!("the current number is {}", self.number); - tokio::time::sleep(Duration::from_secs(3)).await; - let new_task = MyTask::new(self.number + 1); queue .insert_task(&new_task as &dyn AsyncRunnable) .await .unwrap(); + log::info!("the current number is {}", self.number); + tokio::time::sleep(Duration::from_secs(3)).await; + Ok(()) } } + +#[async_trait] +#[typetag::serde] +impl AsyncRunnable for MyFailingTask { + async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> { + let new_task = MyFailingTask::new(self.number + 1); + queue + .insert_task(&new_task as &dyn AsyncRunnable) + .await + .unwrap(); + + log::info!("the current number is {}", self.number); + tokio::time::sleep(Duration::from_secs(3)).await; + + let b = true; + + if b { + panic!("Hello!"); + } else { + Ok(()) + } + } +} diff --git a/fang_examples/simple_async_worker/src/main.rs b/fang_examples/simple_async_worker/src/main.rs index 61fb09e..ff58e8c 100644 --- a/fang_examples/simple_async_worker/src/main.rs +++ b/fang_examples/simple_async_worker/src/main.rs @@ -3,6 +3,7 @@ use fang::asynk::async_queue::AsyncQueueable; use fang::asynk::async_worker_pool::AsyncWorkerPool; use fang::AsyncRunnable; use fang::NoTls; +use simple_async_worker::MyFailingTask; use simple_async_worker::MyTask; use std::time::Duration; @@ -22,7 +23,7 @@ async fn main() { log::info!("Queue connected..."); let mut pool: AsyncWorkerPool> = AsyncWorkerPool::builder() - .number_of_workers(max_pool_size) + .number_of_workers(10_u32) .queue(queue.clone()) .build(); @@ -33,6 +34,7 @@ async fn main() { let task1 = MyTask::new(0); let task2 = MyTask::new(20_000); + let task3 = MyFailingTask::new(50_000); queue .insert_task(&task1 as &dyn AsyncRunnable) @@ -43,5 +45,10 @@ async fn main() { .await .unwrap(); + queue + .insert_task(&task3 as &dyn AsyncRunnable) + .await + .unwrap(); + tokio::time::sleep(Duration::from_secs(100)).await; } diff --git a/src/asynk/async_scheduler.rs b/src/asynk/async_scheduler.rs index aec1561..eed582a 100644 --- a/src/asynk/async_scheduler.rs +++ b/src/asynk/async_scheduler.rs @@ -5,56 +5,75 @@ use crate::asynk::Error; use async_recursion::async_recursion; use log::error; use std::time::Duration; +use tokio::task::JoinHandle; use tokio::time::sleep; use typed_builder::TypedBuilder; -#[derive(TypedBuilder)] -pub struct Scheduler<'a> { +#[derive(TypedBuilder, Clone)] +pub struct Scheduler +where + AQueue: AsyncQueueable + Clone + Sync + 'static, +{ #[builder(setter(into))] pub check_period: u64, #[builder(setter(into))] pub error_margin_seconds: u64, #[builder(setter(into))] - pub queue: &'a mut dyn AsyncQueueable, + pub queue: AQueue, #[builder(default = 0, setter(into))] pub number_of_restarts: u32, } -impl<'a> Scheduler<'a> { +impl Scheduler +where + AQueue: AsyncQueueable + Clone + Sync + 'static, +{ #[async_recursion(?Send)] pub async fn start(&mut self) -> Result<(), Error> { - let task_res = self.schedule_loop().await; + let join_handle: JoinHandle> = self.schedule_loop().await; - sleep(Duration::from_secs(1)).await; - - match task_res { + match join_handle.await { Err(err) => { error!( - "Scheduler failed, restarting {:?}. Number of restarts {}", + "Scheduler panicked, restarting {:?}. Number of restarts {}", err, self.number_of_restarts ); self.number_of_restarts += 1; + sleep(Duration::from_secs(1)).await; self.start().await } - Ok(_) => { - error!( - "Scheduler stopped. restarting. Number of restarts {}", - self.number_of_restarts - ); - self.number_of_restarts += 1; - self.start().await - } + Ok(task_res) => match task_res { + Err(err) => { + error!( + "Scheduler failed, restarting {:?}. Number of restarts {}", + err, self.number_of_restarts + ); + self.number_of_restarts += 1; + self.start().await + } + Ok(_) => { + error!( + "Scheduler stopped. restarting. Number of restarts {}", + self.number_of_restarts + ); + self.number_of_restarts += 1; + self.start().await + } + }, } } - pub async fn schedule_loop(&mut self) -> Result<(), Error> { - let sleep_duration = Duration::from_secs(self.check_period); + pub async fn schedule_loop(&mut self) -> JoinHandle> { + let mut scheduler = self.clone(); + tokio::spawn(async move { + let sleep_duration = Duration::from_secs(scheduler.check_period); - loop { - self.schedule().await?; + loop { + scheduler.schedule().await?; - sleep(sleep_duration).await; - } + sleep(sleep_duration).await; + } + }) } pub async fn schedule(&mut self) -> Result<(), Error> { @@ -86,8 +105,23 @@ impl<'a> Scheduler<'a> { } Ok(()) } +} - #[cfg(test)] +#[cfg(test)] +#[derive(TypedBuilder)] +pub struct SchedulerTest<'a> { + #[builder(setter(into))] + pub check_period: u64, + #[builder(setter(into))] + pub error_margin_seconds: u64, + #[builder(setter(into))] + pub queue: &'a mut dyn AsyncQueueable, + #[builder(default = 0, setter(into))] + pub number_of_restarts: u32, +} + +#[cfg(test)] +impl<'a> SchedulerTest<'a> { async fn schedule_test(&mut self) -> Result<(), Error> { let sleep_duration = Duration::from_secs(self.check_period); @@ -110,11 +144,28 @@ impl<'a> Scheduler<'a> { }; } } + + async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> { + match task.scheduled_at { + None => { + self.queue.schedule_next_task(task).await?; + } + Some(_) => { + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + self.queue.insert_task(&*actual_task).await?; + + self.queue.schedule_next_task(task).await?; + } + } + Ok(()) + } } #[cfg(test)] mod async_scheduler_tests { - use super::Scheduler; + use super::SchedulerTest; use crate::asynk::async_queue::AsyncQueueTest; use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::PeriodicTask; @@ -166,7 +217,7 @@ mod async_scheduler_tests { let check_period: u64 = 1; let error_margin_seconds: u64 = 2; - let mut scheduler = Scheduler::builder() + let mut scheduler = SchedulerTest::builder() .check_period(check_period) .error_margin_seconds(error_margin_seconds) .queue(&mut test as &mut dyn AsyncQueueable) diff --git a/src/asynk/async_worker_pool.rs b/src/asynk/async_worker_pool.rs index 8a60622..d84754c 100644 --- a/src/asynk/async_worker_pool.rs +++ b/src/asynk/async_worker_pool.rs @@ -4,7 +4,7 @@ use crate::asynk::Error; use crate::{RetentionMode, SleepParams}; use async_recursion::async_recursion; use log::error; -use std::time::Duration; +use tokio::task::JoinHandle; use typed_builder::TypedBuilder; #[derive(TypedBuilder, Clone)] @@ -22,55 +22,43 @@ where pub number_of_workers: u32, } -#[derive(TypedBuilder, Clone)] -pub struct WorkerParams { - #[builder(setter(into, strip_option), default)] - pub retention_mode: Option, - #[builder(setter(into, strip_option), default)] - pub sleep_params: Option, - #[builder(setter(into, strip_option), default)] - pub task_type: Option, -} - impl AsyncWorkerPool where AQueue: AsyncQueueable + Clone + Sync + 'static, { pub async fn start(&mut self) { - for _idx in 0..self.number_of_workers { - let queue = self.queue.clone(); - let sleep_params = self.sleep_params.clone(); - let retention_mode = self.retention_mode.clone(); - - tokio::spawn(async move { - Self::supervise_worker(queue, sleep_params, retention_mode).await - }); + for idx in 0..self.number_of_workers { + let pool = self.clone(); + tokio::spawn(Self::supervise_task(pool, 0, idx)); } } #[async_recursion] - pub async fn supervise_worker( - queue: AQueue, - sleep_params: SleepParams, - retention_mode: RetentionMode, - ) -> Result<(), Error> { - let result = - Self::run_worker(queue.clone(), sleep_params.clone(), retention_mode.clone()).await; + pub async fn supervise_task(pool: AsyncWorkerPool, restarts: u64, worker_number: u32) { + let restarts = restarts + 1; + let join_handle = Self::spawn_worker( + pool.queue.clone(), + pool.sleep_params.clone(), + pool.retention_mode.clone(), + ) + .await; - tokio::time::sleep(Duration::from_secs(1)).await; - - match result { - Err(err) => { - error!("Worker failed. Restarting. {:?}", err); - Self::supervise_worker(queue, sleep_params, retention_mode).await - } - Ok(_) => { - error!("Worker stopped. Restarting"); - Self::supervise_worker(queue, sleep_params, retention_mode).await - } + if (join_handle.await).is_err() { + error!( + "Worker {} stopped. Restarting. the number of restarts {}", + worker_number, restarts, + ); + Self::supervise_task(pool, restarts, worker_number).await; } } + pub async fn spawn_worker( + queue: AQueue, + sleep_params: SleepParams, + retention_mode: RetentionMode, + ) -> JoinHandle> { + tokio::spawn(async move { Self::run_worker(queue, sleep_params, retention_mode).await }) + } pub async fn run_worker( queue: AQueue, sleep_params: SleepParams,