Allow definition of custom error type

This commit is contained in:
Rafael Caricio 2023-03-13 17:46:59 +01:00
parent 2b42a27b72
commit aa1144e54f
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
6 changed files with 91 additions and 46 deletions

View file

@ -17,7 +17,6 @@ chrono = "0.4"
log = "0.4" log = "0.4"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
anyhow = "1"
thiserror = "1" thiserror = "1"
uuid = { version = "1.1", features = ["v4", "serde"] } uuid = { version = "1.1", features = ["v4", "serde"] }
async-trait = "0.1" async-trait = "0.1"

View file

@ -53,7 +53,6 @@ If you are not already using, you will also want to include the following depend
```toml ```toml
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
anyhow = "1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] } diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] }
diesel-async = { version = "0.2", features = ["postgres", "bb8"] } diesel-async = { version = "0.2", features = ["postgres", "bb8"] }
@ -75,6 +74,9 @@ the whole application. This attribute is critical for reconstructing the task ba
The [`BackgroundTask::AppData`] can be used to argument the task with your application specific contextual information. The [`BackgroundTask::AppData`] can be used to argument the task with your application specific contextual information.
This is useful for example to pass a database connection pool to the task or other application configuration. This is useful for example to pass a database connection pool to the task or other application configuration.
The [`BackgroundTask::Error`] is the error type that will be returned by the [`BackgroundTask::run`] method. You can
use this to define your own error type for your tasks.
The [`BackgroundTask::run`] method is where you define the behaviour of your background task execution. This method The [`BackgroundTask::run`] method is where you define the behaviour of your background task execution. This method
will be called by the task queue workers. will be called by the task queue workers.
@ -92,8 +94,9 @@ pub struct MyTask {
impl BackgroundTask for MyTask { impl BackgroundTask for MyTask {
const TASK_NAME: &'static str = "my_task_unique_name"; const TASK_NAME: &'static str = "my_task_unique_name";
type AppData = (); type AppData = ();
type Error = ();
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
// Do something // Do something
Ok(()) Ok(())
} }

View file

@ -34,8 +34,9 @@ impl MyTask {
impl BackgroundTask for MyTask { impl BackgroundTask for MyTask {
const TASK_NAME: &'static str = "my_task"; const TASK_NAME: &'static str = "my_task";
type AppData = MyApplicationContext; type AppData = MyApplicationContext;
type Error = anyhow::Error;
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> {
// let new_task = MyTask::new(self.number + 1); // let new_task = MyTask::new(self.number + 1);
// queue // queue
// .insert_task(&new_task) // .insert_task(&new_task)
@ -70,8 +71,9 @@ impl MyFailingTask {
impl BackgroundTask for MyFailingTask { impl BackgroundTask for MyFailingTask {
const TASK_NAME: &'static str = "my_failing_task"; const TASK_NAME: &'static str = "my_failing_task";
type AppData = MyApplicationContext; type AppData = MyApplicationContext;
type Error = anyhow::Error;
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> {
// let new_task = MyFailingTask::new(self.number + 1); // let new_task = MyFailingTask::new(self.number + 1);
// queue // queue
// .insert_task(&new_task) // .insert_task(&new_task)

View file

@ -1,6 +1,7 @@
use crate::task::{CurrentTask, TaskHash}; use crate::task::{CurrentTask, TaskHash};
use async_trait::async_trait; use async_trait::async_trait;
use serde::{de::DeserializeOwned, ser::Serialize}; use serde::{de::DeserializeOwned, ser::Serialize};
use std::fmt::Debug;
/// The [`BackgroundTask`] trait is used to define the behaviour of a task. You must implement this /// The [`BackgroundTask`] trait is used to define the behaviour of a task. You must implement this
/// trait for all tasks you want to execute. /// trait for all tasks you want to execute.
@ -29,8 +30,9 @@ use serde::{de::DeserializeOwned, ser::Serialize};
/// impl BackgroundTask for MyTask { /// impl BackgroundTask for MyTask {
/// const TASK_NAME: &'static str = "my_task_unique_name"; /// const TASK_NAME: &'static str = "my_task_unique_name";
/// type AppData = (); /// type AppData = ();
/// type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// ///
/// async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), anyhow::Error> { /// async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
/// // Do something /// // Do something
/// Ok(()) /// Ok(())
/// } /// }
@ -57,8 +59,11 @@ pub trait BackgroundTask: Serialize + DeserializeOwned + Sync + Send + 'static {
/// The application data provided to this task at runtime. /// The application data provided to this task at runtime.
type AppData: Clone + Send + 'static; type AppData: Clone + Send + 'static;
/// An application custom error type.
type Error: Debug + Send + 'static;
/// Execute the task. This method should define its logic /// Execute the task. This method should define its logic
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), anyhow::Error>; async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error>;
/// If set to true, no new tasks with the same metadata will be inserted /// If set to true, no new tasks with the same metadata will be inserted
/// By default it is set to false. /// By default it is set to false.

View file

@ -30,7 +30,7 @@ pub enum TaskExecError {
TaskDeserializationFailed(#[from] serde_json::Error), TaskDeserializationFailed(#[from] serde_json::Error),
#[error("Task execution failed: {0}")] #[error("Task execution failed: {0}")]
ExecutionFailed(#[from] anyhow::Error), ExecutionFailed(String),
#[error("Task panicked with: {0}")] #[error("Task panicked with: {0}")]
Panicked(String), Panicked(String),
@ -46,8 +46,10 @@ where
{ {
Box::pin(async move { Box::pin(async move {
let background_task: BT = serde_json::from_value(payload)?; let background_task: BT = serde_json::from_value(payload)?;
background_task.run(task_info, app_context).await?; match background_task.run(task_info, app_context).await {
Ok(()) Ok(_) => Ok(()),
Err(err) => Err(TaskExecError::ExecutionFailed(format!("{:?}", err))),
}
}) })
} }
@ -250,8 +252,9 @@ mod async_worker_tests {
impl BackgroundTask for WorkerAsyncTask { impl BackgroundTask for WorkerAsyncTask {
const TASK_NAME: &'static str = "WorkerAsyncTask"; const TASK_NAME: &'static str = "WorkerAsyncTask";
type AppData = (); type AppData = ();
type Error = ();
async fn run(&self, _: CurrentTask, _: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, _: CurrentTask, _: Self::AppData) -> Result<(), ()> {
Ok(()) Ok(())
} }
} }
@ -265,8 +268,9 @@ mod async_worker_tests {
impl BackgroundTask for WorkerAsyncTaskSchedule { impl BackgroundTask for WorkerAsyncTaskSchedule {
const TASK_NAME: &'static str = "WorkerAsyncTaskSchedule"; const TASK_NAME: &'static str = "WorkerAsyncTaskSchedule";
type AppData = (); type AppData = ();
type Error = ();
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), ()> {
Ok(()) Ok(())
} }
@ -284,11 +288,12 @@ mod async_worker_tests {
impl BackgroundTask for AsyncFailedTask { impl BackgroundTask for AsyncFailedTask {
const TASK_NAME: &'static str = "AsyncFailedTask"; const TASK_NAME: &'static str = "AsyncFailedTask";
type AppData = (); type AppData = ();
type Error = TaskError;
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), TaskError> {
let message = format!("number {} is wrong :(", self.number); let message = format!("number {} is wrong :(", self.number);
Err(TaskError::Custom(message).into()) Err(TaskError::Custom(message))
} }
fn max_retries(&self) -> i32 { fn max_retries(&self) -> i32 {
@ -303,9 +308,10 @@ mod async_worker_tests {
impl BackgroundTask for AsyncRetryTask { impl BackgroundTask for AsyncRetryTask {
const TASK_NAME: &'static str = "AsyncRetryTask"; const TASK_NAME: &'static str = "AsyncRetryTask";
type AppData = (); type AppData = ();
type Error = TaskError;
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), Self::Error> {
Err(TaskError::SomethingWrong.into()) Err(TaskError::SomethingWrong)
} }
} }
@ -316,8 +322,9 @@ mod async_worker_tests {
impl BackgroundTask for AsyncTaskType1 { impl BackgroundTask for AsyncTaskType1 {
const TASK_NAME: &'static str = "AsyncTaskType1"; const TASK_NAME: &'static str = "AsyncTaskType1";
type AppData = (); type AppData = ();
type Error = ();
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), Self::Error> {
Ok(()) Ok(())
} }
} }
@ -329,8 +336,9 @@ mod async_worker_tests {
impl BackgroundTask for AsyncTaskType2 { impl BackgroundTask for AsyncTaskType2 {
const TASK_NAME: &'static str = "AsyncTaskType2"; const TASK_NAME: &'static str = "AsyncTaskType2";
type AppData = (); type AppData = ();
type Error = ();
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> { async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), ()> {
Ok(()) Ok(())
} }
} }

View file

@ -240,7 +240,7 @@ mod tests {
use tokio::sync::Mutex; use tokio::sync::Mutex;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct ApplicationContext { pub struct ApplicationContext {
app_name: String, app_name: String,
} }
@ -261,17 +261,50 @@ mod tests {
person: String, person: String,
} }
/// This tests that one can customize the task parameters for the application.
#[async_trait] #[async_trait]
impl BackgroundTask for GreetingTask { trait MyAppTask {
const TASK_NAME: &'static str = "my_task"; const TASK_NAME: &'static str;
const QUEUE: &'static str = "default";
async fn run(
&self,
task_info: CurrentTask,
app_context: ApplicationContext,
) -> Result<(), ()>;
}
#[async_trait]
impl<T> BackgroundTask for T
where
T: MyAppTask + serde::de::DeserializeOwned + serde::ser::Serialize + Sync + Send + 'static,
{
const TASK_NAME: &'static str = T::TASK_NAME;
const QUEUE: &'static str = T::QUEUE;
type AppData = ApplicationContext; type AppData = ApplicationContext;
type Error = ();
async fn run( async fn run(
&self, &self,
task_info: CurrentTask, task_info: CurrentTask,
app_context: Self::AppData, app_context: Self::AppData,
) -> Result<(), anyhow::Error> { ) -> Result<(), Self::Error> {
self.run(task_info, app_context).await
}
}
#[async_trait]
impl MyAppTask for GreetingTask {
const TASK_NAME: &'static str = "my_task";
async fn run(
&self,
task_info: CurrentTask,
app_context: ApplicationContext,
) -> Result<(), ()> {
println!( println!(
"[{}] Hello {}! I'm {}.", "[{}] Hello {}! I'm {}.",
task_info.id(), task_info.id(),
@ -292,12 +325,9 @@ mod tests {
const QUEUE: &'static str = "other_queue"; const QUEUE: &'static str = "other_queue";
type AppData = ApplicationContext; type AppData = ApplicationContext;
type Error = ();
async fn run( async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
&self,
task: CurrentTask,
context: Self::AppData,
) -> Result<(), anyhow::Error> {
println!( println!(
"[{}] Other task with {}!", "[{}] Other task with {}!",
task.id(), task.id(),
@ -332,7 +362,7 @@ mod tests {
let (join_handle, queue) = let (join_handle, queue) =
WorkerPool::new(memory_store().await, move |_| my_app_context.clone()) WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>() .register_task_type::<GreetingTask>()
.configure_queue(GreetingTask::QUEUE.into()) .configure_queue(<GreetingTask as MyAppTask>::QUEUE.into())
.start(futures::future::ready(())) .start(futures::future::ready(()))
.await .await
.unwrap(); .unwrap();
@ -391,11 +421,9 @@ mod tests {
type AppData = NotifyFinishedContext; type AppData = NotifyFinishedContext;
async fn run( type Error = ();
&self,
task: CurrentTask, async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), ()> {
context: Self::AppData,
) -> Result<(), anyhow::Error> {
// Notify the test that the task ran // Notify the test that the task ran
match context.notify_finished.lock().await.take() { match context.notify_finished.lock().await.take() {
None => println!("Cannot notify, already done that!"), None => println!("Cannot notify, already done that!"),
@ -455,11 +483,13 @@ mod tests {
type AppData = NotifyUnknownRanContext; type AppData = NotifyUnknownRanContext;
type Error = ();
async fn run( async fn run(
&self, &self,
task: CurrentTask, task: CurrentTask,
context: Self::AppData, context: Self::AppData,
) -> Result<(), anyhow::Error> { ) -> Result<(), Self::Error> {
// Notify the test that the task ran // Notify the test that the task ran
match context.should_stop.lock().await.take() { match context.should_stop.lock().await.take() {
None => println!("Cannot notify, already done that!"), None => println!("Cannot notify, already done that!"),
@ -481,11 +511,9 @@ mod tests {
type AppData = NotifyUnknownRanContext; type AppData = NotifyUnknownRanContext;
async fn run( type Error = ();
&self,
task: CurrentTask, async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), ()> {
context: Self::AppData,
) -> Result<(), anyhow::Error> {
println!("[{}] Unknown task ran!", task.id()); println!("[{}] Unknown task ran!", task.id());
context.unknown_task_ran.store(true, Ordering::Relaxed); context.unknown_task_ran.store(true, Ordering::Relaxed);
Ok(()) Ok(())
@ -537,12 +565,9 @@ mod tests {
impl BackgroundTask for BrokenTask { impl BackgroundTask for BrokenTask {
const TASK_NAME: &'static str = "panic_me"; const TASK_NAME: &'static str = "panic_me";
type AppData = (); type AppData = ();
type Error = ();
async fn run( async fn run(&self, _task: CurrentTask, _context: Self::AppData) -> Result<(), ()> {
&self,
_task: CurrentTask,
_context: Self::AppData,
) -> Result<(), anyhow::Error> {
panic!("Oh no!"); panic!("Oh no!");
} }
} }
@ -609,11 +634,13 @@ mod tests {
type AppData = PlayerContext; type AppData = PlayerContext;
type Error = ();
async fn run( async fn run(
&self, &self,
_task: CurrentTask, _task: CurrentTask,
context: Self::AppData, context: Self::AppData,
) -> Result<(), anyhow::Error> { ) -> Result<(), Self::Error> {
loop { loop {
let msg = context.ping_rx.lock().await.recv().await.unwrap(); let msg = context.ping_rx.lock().await.recv().await.unwrap();
match msg { match msg {
@ -696,7 +723,8 @@ mod tests {
WorkerPool::new(pg_task_store().await, move |_| my_app_context.clone()) WorkerPool::new(pg_task_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>() .register_task_type::<GreetingTask>()
.configure_queue( .configure_queue(
QueueConfig::new(GreetingTask::QUEUE).retention_mode(RetentionMode::RemoveDone), QueueConfig::new(<GreetingTask as MyAppTask>::QUEUE)
.retention_mode(RetentionMode::RemoveDone),
) )
.start(futures::future::ready(())) .start(futures::future::ready(()))
.await .await