Recover from panics draft (#57)

* Recover from panics draft

* Handling worker pool panics and Scheduler panics , fix clippy also

* Update src/asynk/async_scheduler.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* add new task

Co-authored-by: pxp9 <48651252+pxp9@users.noreply.github.com>
This commit is contained in:
Ayrat Badykov 2022-08-04 18:22:53 +03:00 committed by GitHub
parent cf2ce19c97
commit ead367d33f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 149 additions and 71 deletions

View file

@ -54,7 +54,7 @@ optional = true
[dependencies.tokio] [dependencies.tokio]
version = "1.20" version = "1.20"
features = ["rt", "time"] features = ["rt", "time", "macros"]
optional = true optional = true
[dependencies.async-trait] [dependencies.async-trait]
@ -68,6 +68,3 @@ optional = true
[dependencies.async-recursion] [dependencies.async-recursion]
version = "1" version = "1"
optional = true optional = true
[dev-dependencies]
tokio = { version = "1.20", features = ["macros"] }

View file

@ -18,19 +18,54 @@ impl MyTask {
} }
} }
#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
pub struct MyFailingTask {
pub number: u16,
}
impl MyFailingTask {
pub fn new(number: u16) -> Self {
Self { number }
}
}
#[async_trait] #[async_trait]
#[typetag::serde] #[typetag::serde]
impl AsyncRunnable for MyTask { impl AsyncRunnable for MyTask {
async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> { async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
log::info!("the current number is {}", self.number);
tokio::time::sleep(Duration::from_secs(3)).await;
let new_task = MyTask::new(self.number + 1); let new_task = MyTask::new(self.number + 1);
queue queue
.insert_task(&new_task as &dyn AsyncRunnable) .insert_task(&new_task as &dyn AsyncRunnable)
.await .await
.unwrap(); .unwrap();
log::info!("the current number is {}", self.number);
tokio::time::sleep(Duration::from_secs(3)).await;
Ok(()) Ok(())
} }
} }
#[async_trait]
#[typetag::serde]
impl AsyncRunnable for MyFailingTask {
async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), Error> {
let new_task = MyFailingTask::new(self.number + 1);
queue
.insert_task(&new_task as &dyn AsyncRunnable)
.await
.unwrap();
log::info!("the current number is {}", self.number);
tokio::time::sleep(Duration::from_secs(3)).await;
let b = true;
if b {
panic!("Hello!");
} else {
Ok(())
}
}
}

View file

@ -3,6 +3,7 @@ use fang::asynk::async_queue::AsyncQueueable;
use fang::asynk::async_worker_pool::AsyncWorkerPool; use fang::asynk::async_worker_pool::AsyncWorkerPool;
use fang::AsyncRunnable; use fang::AsyncRunnable;
use fang::NoTls; use fang::NoTls;
use simple_async_worker::MyFailingTask;
use simple_async_worker::MyTask; use simple_async_worker::MyTask;
use std::time::Duration; use std::time::Duration;
@ -22,7 +23,7 @@ async fn main() {
log::info!("Queue connected..."); log::info!("Queue connected...");
let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder() let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()
.number_of_workers(max_pool_size) .number_of_workers(10_u32)
.queue(queue.clone()) .queue(queue.clone())
.build(); .build();
@ -33,6 +34,7 @@ async fn main() {
let task1 = MyTask::new(0); let task1 = MyTask::new(0);
let task2 = MyTask::new(20_000); let task2 = MyTask::new(20_000);
let task3 = MyFailingTask::new(50_000);
queue queue
.insert_task(&task1 as &dyn AsyncRunnable) .insert_task(&task1 as &dyn AsyncRunnable)
@ -43,5 +45,10 @@ async fn main() {
.await .await
.unwrap(); .unwrap();
queue
.insert_task(&task3 as &dyn AsyncRunnable)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(100)).await; tokio::time::sleep(Duration::from_secs(100)).await;
} }

View file

@ -5,56 +5,75 @@ use crate::asynk::Error;
use async_recursion::async_recursion; use async_recursion::async_recursion;
use log::error; use log::error;
use std::time::Duration; use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep; use tokio::time::sleep;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
#[derive(TypedBuilder)] #[derive(TypedBuilder, Clone)]
pub struct Scheduler<'a> { pub struct Scheduler<AQueue>
where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
#[builder(setter(into))] #[builder(setter(into))]
pub check_period: u64, pub check_period: u64,
#[builder(setter(into))] #[builder(setter(into))]
pub error_margin_seconds: u64, pub error_margin_seconds: u64,
#[builder(setter(into))] #[builder(setter(into))]
pub queue: &'a mut dyn AsyncQueueable, pub queue: AQueue,
#[builder(default = 0, setter(into))] #[builder(default = 0, setter(into))]
pub number_of_restarts: u32, pub number_of_restarts: u32,
} }
impl<'a> Scheduler<'a> { impl<AQueue> Scheduler<AQueue>
where
AQueue: AsyncQueueable + Clone + Sync + 'static,
{
#[async_recursion(?Send)] #[async_recursion(?Send)]
pub async fn start(&mut self) -> Result<(), Error> { pub async fn start(&mut self) -> Result<(), Error> {
let task_res = self.schedule_loop().await; let join_handle: JoinHandle<Result<(), Error>> = self.schedule_loop().await;
sleep(Duration::from_secs(1)).await; match join_handle.await {
match task_res {
Err(err) => { Err(err) => {
error!( error!(
"Scheduler failed, restarting {:?}. Number of restarts {}", "Scheduler panicked, restarting {:?}. Number of restarts {}",
err, self.number_of_restarts err, self.number_of_restarts
); );
self.number_of_restarts += 1; self.number_of_restarts += 1;
sleep(Duration::from_secs(1)).await;
self.start().await self.start().await
} }
Ok(_) => { Ok(task_res) => match task_res {
error!( Err(err) => {
"Scheduler stopped. restarting. Number of restarts {}", error!(
self.number_of_restarts "Scheduler failed, restarting {:?}. Number of restarts {}",
); err, self.number_of_restarts
self.number_of_restarts += 1; );
self.start().await self.number_of_restarts += 1;
} self.start().await
}
Ok(_) => {
error!(
"Scheduler stopped. restarting. Number of restarts {}",
self.number_of_restarts
);
self.number_of_restarts += 1;
self.start().await
}
},
} }
} }
pub async fn schedule_loop(&mut self) -> Result<(), Error> { pub async fn schedule_loop(&mut self) -> JoinHandle<Result<(), Error>> {
let sleep_duration = Duration::from_secs(self.check_period); let mut scheduler = self.clone();
tokio::spawn(async move {
let sleep_duration = Duration::from_secs(scheduler.check_period);
loop { loop {
self.schedule().await?; scheduler.schedule().await?;
sleep(sleep_duration).await; sleep(sleep_duration).await;
} }
})
} }
pub async fn schedule(&mut self) -> Result<(), Error> { pub async fn schedule(&mut self) -> Result<(), Error> {
@ -86,8 +105,23 @@ impl<'a> Scheduler<'a> {
} }
Ok(()) Ok(())
} }
}
#[cfg(test)] #[cfg(test)]
#[derive(TypedBuilder)]
pub struct SchedulerTest<'a> {
#[builder(setter(into))]
pub check_period: u64,
#[builder(setter(into))]
pub error_margin_seconds: u64,
#[builder(setter(into))]
pub queue: &'a mut dyn AsyncQueueable,
#[builder(default = 0, setter(into))]
pub number_of_restarts: u32,
}
#[cfg(test)]
impl<'a> SchedulerTest<'a> {
async fn schedule_test(&mut self) -> Result<(), Error> { async fn schedule_test(&mut self) -> Result<(), Error> {
let sleep_duration = Duration::from_secs(self.check_period); let sleep_duration = Duration::from_secs(self.check_period);
@ -110,11 +144,28 @@ impl<'a> Scheduler<'a> {
}; };
} }
} }
async fn process_task(&mut self, task: PeriodicTask) -> Result<(), Error> {
match task.scheduled_at {
None => {
self.queue.schedule_next_task(task).await?;
}
Some(_) => {
let actual_task: Box<dyn AsyncRunnable> =
serde_json::from_value(task.metadata.clone()).unwrap();
self.queue.insert_task(&*actual_task).await?;
self.queue.schedule_next_task(task).await?;
}
}
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]
mod async_scheduler_tests { mod async_scheduler_tests {
use super::Scheduler; use super::SchedulerTest;
use crate::asynk::async_queue::AsyncQueueTest; use crate::asynk::async_queue::AsyncQueueTest;
use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::AsyncQueueable;
use crate::asynk::async_queue::PeriodicTask; use crate::asynk::async_queue::PeriodicTask;
@ -166,7 +217,7 @@ mod async_scheduler_tests {
let check_period: u64 = 1; let check_period: u64 = 1;
let error_margin_seconds: u64 = 2; let error_margin_seconds: u64 = 2;
let mut scheduler = Scheduler::builder() let mut scheduler = SchedulerTest::builder()
.check_period(check_period) .check_period(check_period)
.error_margin_seconds(error_margin_seconds) .error_margin_seconds(error_margin_seconds)
.queue(&mut test as &mut dyn AsyncQueueable) .queue(&mut test as &mut dyn AsyncQueueable)

View file

@ -4,7 +4,7 @@ use crate::asynk::Error;
use crate::{RetentionMode, SleepParams}; use crate::{RetentionMode, SleepParams};
use async_recursion::async_recursion; use async_recursion::async_recursion;
use log::error; use log::error;
use std::time::Duration; use tokio::task::JoinHandle;
use typed_builder::TypedBuilder; use typed_builder::TypedBuilder;
#[derive(TypedBuilder, Clone)] #[derive(TypedBuilder, Clone)]
@ -22,55 +22,43 @@ where
pub number_of_workers: u32, pub number_of_workers: u32,
} }
#[derive(TypedBuilder, Clone)]
pub struct WorkerParams {
#[builder(setter(into, strip_option), default)]
pub retention_mode: Option<RetentionMode>,
#[builder(setter(into, strip_option), default)]
pub sleep_params: Option<SleepParams>,
#[builder(setter(into, strip_option), default)]
pub task_type: Option<String>,
}
impl<AQueue> AsyncWorkerPool<AQueue> impl<AQueue> AsyncWorkerPool<AQueue>
where where
AQueue: AsyncQueueable + Clone + Sync + 'static, AQueue: AsyncQueueable + Clone + Sync + 'static,
{ {
pub async fn start(&mut self) { pub async fn start(&mut self) {
for _idx in 0..self.number_of_workers { for idx in 0..self.number_of_workers {
let queue = self.queue.clone(); let pool = self.clone();
let sleep_params = self.sleep_params.clone(); tokio::spawn(Self::supervise_task(pool, 0, idx));
let retention_mode = self.retention_mode.clone();
tokio::spawn(async move {
Self::supervise_worker(queue, sleep_params, retention_mode).await
});
} }
} }
#[async_recursion] #[async_recursion]
pub async fn supervise_worker( pub async fn supervise_task(pool: AsyncWorkerPool<AQueue>, restarts: u64, worker_number: u32) {
queue: AQueue, let restarts = restarts + 1;
sleep_params: SleepParams, let join_handle = Self::spawn_worker(
retention_mode: RetentionMode, pool.queue.clone(),
) -> Result<(), Error> { pool.sleep_params.clone(),
let result = pool.retention_mode.clone(),
Self::run_worker(queue.clone(), sleep_params.clone(), retention_mode.clone()).await; )
.await;
tokio::time::sleep(Duration::from_secs(1)).await; if (join_handle.await).is_err() {
error!(
match result { "Worker {} stopped. Restarting. the number of restarts {}",
Err(err) => { worker_number, restarts,
error!("Worker failed. Restarting. {:?}", err); );
Self::supervise_worker(queue, sleep_params, retention_mode).await Self::supervise_task(pool, restarts, worker_number).await;
}
Ok(_) => {
error!("Worker stopped. Restarting");
Self::supervise_worker(queue, sleep_params, retention_mode).await
}
} }
} }
pub async fn spawn_worker(
queue: AQueue,
sleep_params: SleepParams,
retention_mode: RetentionMode,
) -> JoinHandle<Result<(), Error>> {
tokio::spawn(async move { Self::run_worker(queue, sleep_params, retention_mode).await })
}
pub async fn run_worker( pub async fn run_worker(
queue: AQueue, queue: AQueue,
sleep_params: SleepParams, sleep_params: SleepParams,