diff --git a/src/asynk.rs b/src/asynk.rs index b63cf51..a75dd03 100644 --- a/src/asynk.rs +++ b/src/asynk.rs @@ -5,6 +5,5 @@ pub mod async_worker_pool; pub use async_queue::*; pub use async_runnable::AsyncRunnable; -pub use async_runnable::Error as AsyncError; pub use async_worker::*; pub use async_worker_pool::*; diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index 98508b1..35faebe 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -1,5 +1,4 @@ use crate::asynk::async_runnable::AsyncRunnable; -use crate::asynk::async_runnable::Error as FangError; use crate::CronError; use crate::Scheduled::*; use async_trait::async_trait; @@ -103,15 +102,6 @@ impl From for AsyncQueueError { } } -impl From for FangError { - fn from(error: AsyncQueueError) -> Self { - let message = format!("{:?}", error); - FangError { - description: message, - } - } -} - #[async_trait] pub trait AsyncQueueable: Send { async fn fetch_and_touch_task( @@ -708,8 +698,8 @@ mod async_queue_tests { use super::AsyncQueueable; use super::FangTaskState; use super::Task; - use crate::asynk::AsyncError as Error; use crate::asynk::AsyncRunnable; + use crate::FangError; use crate::Scheduled; use async_trait::async_trait; use bb8_postgres::bb8::Pool; @@ -729,7 +719,7 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } } @@ -743,7 +733,7 @@ mod async_queue_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index 752329d..49b0122 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -1,18 +1,45 @@ +use crate::async_queue::AsyncQueueError; use crate::asynk::async_queue::AsyncQueueable; +use crate::FangError; use crate::Scheduled; use async_trait::async_trait; +use bb8_postgres::bb8::RunError; +use bb8_postgres::tokio_postgres::Error as TokioPostgresError; +use serde_json::Error as SerdeError; const COMMON_TYPE: &str = "common"; -#[derive(Debug)] -pub struct Error { - pub description: String, +impl From for FangError { + fn from(error: AsyncQueueError) -> Self { + let message = format!("{:?}", error); + FangError { + description: message, + } + } +} + +impl From for FangError { + fn from(error: TokioPostgresError) -> Self { + Self::from(AsyncQueueError::PgError(error)) + } +} + +impl From> for FangError { + fn from(error: RunError) -> Self { + Self::from(AsyncQueueError::PoolError(error)) + } +} + +impl From for FangError { + fn from(error: SerdeError) -> Self { + Self::from(AsyncQueueError::SerdeError(error)) + } } #[typetag::serde(tag = "type")] #[async_trait] pub trait AsyncRunnable: Send + Sync { - async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>; + async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>; fn task_type(&self) -> String { COMMON_TYPE.to_string() diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index f63057f..6dbd7e8 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -3,7 +3,7 @@ use crate::asynk::async_queue::FangTaskState; use crate::asynk::async_queue::Task; use crate::asynk::async_queue::DEFAULT_TASK_TYPE; use crate::asynk::async_runnable::AsyncRunnable; -use crate::asynk::AsyncError as Error; +use crate::FangError; use crate::Scheduled::*; use crate::{RetentionMode, SleepParams}; use log::error; @@ -32,7 +32,7 @@ where &mut self, task: Task, actual_task: Box, - ) -> Result<(), Error> { + ) -> Result<(), FangError> { let result = self.execute_task(task, actual_task).await; self.finalize_task(result).await } @@ -49,7 +49,10 @@ where } } - async fn finalize_task(&mut self, result: Result) -> Result<(), Error> { + async fn finalize_task( + &mut self, + result: Result, + ) -> Result<(), FangError> { match self.retention_mode { RetentionMode::KeepAll => match result { Ok(task) => { @@ -92,7 +95,7 @@ where tokio::time::sleep(self.sleep_params.sleep_period).await; } - pub async fn run_tasks(&mut self) -> Result<(), Error> { + pub async fn run_tasks(&mut self) -> Result<(), FangError> { loop { //fetch task match self @@ -146,7 +149,7 @@ impl<'a> AsyncWorkerTest<'a> { &mut self, task: Task, actual_task: Box, - ) -> Result<(), Error> { + ) -> Result<(), FangError> { let result = self.execute_task(task, actual_task).await; self.finalize_task(result).await } @@ -163,7 +166,10 @@ impl<'a> AsyncWorkerTest<'a> { } } - async fn finalize_task(&mut self, result: Result) -> Result<(), Error> { + async fn finalize_task( + &mut self, + result: Result, + ) -> Result<(), FangError> { match self.retention_mode { RetentionMode::KeepAll => match result { Ok(task) => { @@ -206,7 +212,7 @@ impl<'a> AsyncWorkerTest<'a> { tokio::time::sleep(self.sleep_params.sleep_period).await; } - pub async fn run_tasks_until_none(&mut self) -> Result<(), Error> { + pub async fn run_tasks_until_none(&mut self) -> Result<(), FangError> { loop { match self .queue @@ -246,8 +252,8 @@ mod async_worker_tests { use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::FangTaskState; use crate::asynk::async_worker::Task; - use crate::asynk::AsyncError as Error; use crate::asynk::AsyncRunnable; + use crate::FangError; use crate::RetentionMode; use crate::Scheduled; use async_trait::async_trait; @@ -266,7 +272,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for WorkerAsyncTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } } @@ -279,7 +285,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for WorkerAsyncTaskSchedule { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } fn cron(&self) -> Option { @@ -295,10 +301,10 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncFailedTask { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { let message = format!("number {} is wrong :(", self.number); - Err(Error { + Err(FangError { description: message, }) } @@ -310,7 +316,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskType1 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } @@ -325,7 +331,7 @@ mod async_worker_tests { #[typetag::serde] #[async_trait] impl AsyncRunnable for AsyncTaskType2 { - async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { Ok(()) } diff --git a/src/asynk/async_worker_pool.rs b/src/asynk/async_worker_pool.rs index 57796c4..6a2e000 100644 --- a/src/asynk/async_worker_pool.rs +++ b/src/asynk/async_worker_pool.rs @@ -1,7 +1,7 @@ use crate::asynk::async_queue::AsyncQueueable; use crate::asynk::async_queue::DEFAULT_TASK_TYPE; use crate::asynk::async_worker::AsyncWorker; -use crate::asynk::AsyncError as Error; +use crate::FangError; use crate::{RetentionMode, SleepParams}; use async_recursion::async_recursion; use log::error; @@ -61,7 +61,7 @@ where sleep_params: SleepParams, retention_mode: RetentionMode, task_type: String, - ) -> JoinHandle> { + ) -> JoinHandle> { tokio::spawn(async move { Self::run_worker(queue, sleep_params, retention_mode, task_type).await }) @@ -71,7 +71,7 @@ where sleep_params: SleepParams, retention_mode: RetentionMode, task_type: String, - ) -> Result<(), Error> { + ) -> Result<(), FangError> { let mut worker: AsyncWorker = AsyncWorker::builder() .queue(queue) .sleep_params(sleep_params) diff --git a/src/blocking.rs b/src/blocking.rs index 80cf811..7f4f296 100644 --- a/src/blocking.rs +++ b/src/blocking.rs @@ -5,7 +5,6 @@ pub mod schema; pub mod worker; pub mod worker_pool; -pub use error::FangError; pub use queue::*; pub use runnable::Runnable; pub use schema::*; diff --git a/src/blocking/error.rs b/src/blocking/error.rs index f7da9bc..a5205fb 100644 --- a/src/blocking/error.rs +++ b/src/blocking/error.rs @@ -1,23 +1,31 @@ use crate::blocking::queue::QueueError; +use crate::FangError; +use diesel::r2d2::PoolError; +use diesel::result::Error as DieselError; use std::io::Error as IoError; -use std::sync::PoisonError; -use thiserror::Error; -#[derive(Error, Debug)] -pub enum FangError { - #[error("The shared state in an executor thread became poisoned")] - PoisonedLock, - #[error(transparent)] - QueueError(#[from] QueueError), - #[error("Failed to create executor thread")] - ExecutorThreadCreationFailed { - #[from] - source: IoError, - }, -} - -impl From> for FangError { - fn from(_: PoisonError) -> Self { - Self::PoisonedLock +impl From for FangError { + fn from(error: IoError) -> Self { + let description = format!("{:?}", error); + FangError { description } + } +} + +impl From for FangError { + fn from(error: QueueError) -> Self { + let description = format!("{:?}", error); + FangError { description } + } +} + +impl From for FangError { + fn from(error: DieselError) -> Self { + Self::from(QueueError::DieselError(error)) + } +} + +impl From for FangError { + fn from(error: PoolError) -> Self { + Self::from(QueueError::PoolError(error)) } } diff --git a/src/blocking/queue.rs b/src/blocking/queue.rs index 249d4dd..1b5a81f 100644 --- a/src/blocking/queue.rs +++ b/src/blocking/queue.rs @@ -369,7 +369,7 @@ mod queue_tests { use crate::runnable::COMMON_TYPE; use crate::schema::FangTaskState; use crate::typetag; - use crate::worker::Error as WorkerError; + use crate::FangError; use chrono::Utc; use diesel::connection::Connection; use diesel::result::Error; @@ -382,7 +382,7 @@ mod queue_tests { #[typetag::serde] impl Runnable for PepeTask { - fn run(&self, _queue: &dyn Queueable) -> Result<(), WorkerError> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { println!("the number is {}", self.number); Ok(()) @@ -399,7 +399,7 @@ mod queue_tests { #[typetag::serde] impl Runnable for AyratTask { - fn run(&self, _queue: &dyn Queueable) -> Result<(), WorkerError> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { println!("the number is {}", self.number); Ok(()) diff --git a/src/blocking/runnable.rs b/src/blocking/runnable.rs index 1d787cb..0ddad54 100644 --- a/src/blocking/runnable.rs +++ b/src/blocking/runnable.rs @@ -1,12 +1,12 @@ use crate::queue::Queueable; -use crate::Error; +use crate::FangError; use crate::Scheduled; pub const COMMON_TYPE: &str = "common"; #[typetag::serde(tag = "type")] pub trait Runnable { - fn run(&self, _queueable: &dyn Queueable) -> Result<(), Error>; + fn run(&self, _queueable: &dyn Queueable) -> Result<(), FangError>; fn task_type(&self) -> String { COMMON_TYPE.to_string() diff --git a/src/blocking/worker.rs b/src/blocking/worker.rs index 67f7255..bae864d 100644 --- a/src/blocking/worker.rs +++ b/src/blocking/worker.rs @@ -1,9 +1,9 @@ -use crate::error::FangError; use crate::queue::Queueable; use crate::queue::Task; use crate::runnable::Runnable; use crate::runnable::COMMON_TYPE; use crate::schema::FangTaskState; +use crate::FangError; use crate::Scheduled::*; use crate::{RetentionMode, SleepParams}; use log::error; @@ -25,11 +25,6 @@ where pub retention_mode: RetentionMode, } -#[derive(Debug)] -pub struct Error { - pub description: String, -} - impl Worker where BQueue: Queueable + Clone + Sync + Send + 'static, @@ -148,7 +143,6 @@ where #[cfg(test)] mod worker_tests { - use super::Error; use super::RetentionMode; use super::Runnable; use super::Worker; @@ -156,6 +150,7 @@ mod worker_tests { use crate::queue::Queueable; use crate::schema::FangTaskState; use crate::typetag; + use crate::FangError; use chrono::Utc; use serde::{Deserialize, Serialize}; @@ -166,7 +161,7 @@ mod worker_tests { #[typetag::serde] impl Runnable for WorkerTaskTest { - fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { println!("the number is {}", self.number); Ok(()) @@ -184,10 +179,10 @@ mod worker_tests { #[typetag::serde] impl Runnable for FailedTask { - fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { let message = format!("the number is {}", self.number); - Err(Error { + Err(FangError { description: message, }) } @@ -202,7 +197,7 @@ mod worker_tests { #[typetag::serde] impl Runnable for TaskType1 { - fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { Ok(()) } @@ -216,7 +211,7 @@ mod worker_tests { #[typetag::serde] impl Runnable for TaskType2 { - fn run(&self, _queue: &dyn Queueable) -> Result<(), Error> { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { Ok(()) } diff --git a/src/blocking/worker_pool.rs b/src/blocking/worker_pool.rs index 1bfada8..d445535 100644 --- a/src/blocking/worker_pool.rs +++ b/src/blocking/worker_pool.rs @@ -1,6 +1,6 @@ -use crate::error::FangError; use crate::queue::Queueable; use crate::worker::Worker; +use crate::FangError; use crate::RetentionMode; use crate::SleepParams; use log::error; diff --git a/src/lib.rs b/src/lib.rs index fe0f7c1..47d3f27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,6 +62,11 @@ impl Default for SleepParams { } } +#[derive(Debug)] +pub struct FangError { + pub description: String, +} + #[macro_use] #[cfg(feature = "blocking")] extern crate diesel;