From 7fcb63f75c8966da5f7f7780f9fcf495c303825e Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Thu, 9 Mar 2023 19:12:50 +0100 Subject: [PATCH] Update readme --- README.md | 4 +- src/worker.rs | 127 +++++++++++++++++++++++++------------------------- 2 files changed, 65 insertions(+), 66 deletions(-) diff --git a/README.md b/README.md index 22d1100..cba2f3f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,4 @@ -[![Crates.io][s1]][ci] [![docs page][docs-badge]][docs] ![test][ga-test] ![style][ga-style] - -# Backie +# Backie 🚲 Async background job processing library with Diesel and Tokio. It's a heavily modified fork of [fang](https://github.com/ayrat555/fang). diff --git a/src/worker.rs b/src/worker.rs index ef8ca47..aecc558 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -32,6 +32,70 @@ impl Worker where Q: Queueable + Clone + Sync + 'static, { + pub(crate) async fn run_tasks(&mut self) -> Result<(), BackieError> { + loop { + // Need to check if has to stop before pulling next task + match self.queue.pull_next_task(self.task_type.clone()).await? { + Some(task) => { + let actual_task: Box = + serde_json::from_value(task.payload.clone())?; + + // check if task is scheduled or not + if let Some(CronPattern(_)) = actual_task.cron() { + // program task + //self.queue.schedule_task(&*actual_task).await?; + } + // run scheduled task + // TODO: what do we do if the task fails? it's an internal error, inform the logs + let _ = self.run(task, actual_task).await; + } + None => { + // Listen to watchable future + // All that until a max timeout + match &mut self.shutdown { + Some(recv) => { + // Listen to watchable future + // All that until a max timeout + select! { + _ = recv.changed().fuse() => { + log::info!("Shutting down worker"); + return Ok(()); + } + _ = tokio::time::sleep(std::time::Duration::from_secs(1)).fuse() => {} + } + } + None => { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }; + } + }; + } + } + + #[cfg(test)] + pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> { + loop { + match self.queue.pull_next_task(self.task_type.clone()).await? { + Some(task) => { + let actual_task: Box = + serde_json::from_value(task.payload.clone()).unwrap(); + + // check if task is scheduled or not + if let Some(CronPattern(_)) = actual_task.cron() { + // program task + // self.queue.schedule_task(&*actual_task).await?; + } + // run scheduled task + self.run(task, actual_task).await?; + } + None => { + return Ok(()); + } + }; + } + } + async fn run( &mut self, task: Task, @@ -101,69 +165,6 @@ where Ok(()) } - - pub(crate) async fn run_tasks(&mut self) -> Result<(), BackieError> { - loop { - match self.queue.pull_next_task(self.task_type.clone()).await? { - Some(task) => { - let actual_task: Box = - serde_json::from_value(task.payload.clone())?; - - // check if task is scheduled or not - if let Some(CronPattern(_)) = actual_task.cron() { - // program task - //self.queue.schedule_task(&*actual_task).await?; - } - // run scheduled task - // TODO: what do we do if the task fails? it's an internal error, inform the logs - let _ = self.run(task, actual_task).await; - } - None => { - // Listen to watchable future - // All that until a max timeout - match &mut self.shutdown { - Some(recv) => { - // Listen to watchable future - // All that until a max timeout - select! { - _ = recv.changed().fuse() => { - log::info!("Shutting down worker"); - return Ok(()); - } - _ = tokio::time::sleep(std::time::Duration::from_secs(1)).fuse() => {} - } - } - None => { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }; - } - }; - } - } - - #[cfg(test)] - pub async fn run_tasks_until_none(&mut self) -> Result<(), BackieError> { - loop { - match self.queue.pull_next_task(self.task_type.clone()).await? { - Some(task) => { - let actual_task: Box = - serde_json::from_value(task.payload.clone()).unwrap(); - - // check if task is scheduled or not - if let Some(CronPattern(_)) = actual_task.cron() { - // program task - // self.queue.schedule_task(&*actual_task).await?; - } - // run scheduled task - self.run(task, actual_task).await?; - } - None => { - return Ok(()); - } - }; - } - } } #[cfg(test)]