diff --git a/diesel.toml b/diesel.toml new file mode 100644 index 0000000..73a18e3 --- /dev/null +++ b/diesel.toml @@ -0,0 +1,2 @@ +[print_schema] +file = "src/blocking/schema.rs" \ No newline at end of file diff --git a/migrations/2022-08-20-151615_create_fang_tasks/up.sql b/migrations/2022-08-20-151615_create_fang_tasks/up.sql index ebebb6e..cd4b354 100644 --- a/migrations/2022-08-20-151615_create_fang_tasks/up.sql +++ b/migrations/2022-08-20-151615_create_fang_tasks/up.sql @@ -1,6 +1,6 @@ CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished'); +CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished', 'retried'); CREATE TABLE fang_tasks ( id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), @@ -9,6 +9,7 @@ CREATE TABLE fang_tasks ( state fang_task_state DEFAULT 'new' NOT NULL, task_type VARCHAR DEFAULT 'common' NOT NULL, uniq_hash CHAR(64), + retries INTEGER DEFAULT 0 NOT NULL, scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW() diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index fc16953..641655c 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -10,6 +10,7 @@ use bb8_postgres::tokio_postgres::Socket; use bb8_postgres::tokio_postgres::Transaction; use bb8_postgres::PostgresConnectionManager; use chrono::DateTime; +use chrono::Duration; use chrono::Utc; use cron::Schedule; use postgres_types::{FromSql, ToSql}; @@ -35,6 +36,7 @@ const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sq const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql"); const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql"); const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql"); +const RETRY_TASK_QUERY: &str = include_str!("queries/retry_task.sql"); pub const DEFAULT_TASK_TYPE: &str = "common"; @@ -49,6 +51,8 @@ pub enum FangTaskState { Failed, #[postgres(name = "finished")] Finished, + #[postgres(name = "retried")] + Retried, } impl Default for FangTaskState { @@ -72,6 +76,8 @@ pub struct Task { #[builder(setter(into))] pub uniq_hash: Option, #[builder(setter(into))] + pub retries: i32, + #[builder(setter(into))] pub scheduled_at: DateTime, #[builder(setter(into))] pub created_at: DateTime, @@ -141,6 +147,13 @@ pub trait AsyncQueueable: Send { -> Result; async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result; + + async fn schedule_retry( + &mut self, + task: &Task, + backoff_seconds: u32, + error: &str, + ) -> Result; } #[derive(TypedBuilder, Debug, Clone)] @@ -311,6 +324,17 @@ impl AsyncQueueable for AsyncQueueTest<'_> { AsyncQueue::::fail_task_query(transaction, task, error_message).await } + + async fn schedule_retry( + &mut self, + task: &Task, + backoff_seconds: u32, + error: &str, + ) -> Result { + let transaction = &mut self.transaction; + + AsyncQueue::::schedule_retry_query(transaction, task, backoff_seconds, error).await + } } impl AsyncQueue @@ -420,6 +444,26 @@ where Ok(failed_task) } + async fn schedule_retry_query( + transaction: &mut Transaction<'_>, + task: &Task, + backoff_seconds: u32, + error: &str, + ) -> Result { + let now = Utc::now(); + let scheduled_at = now + Duration::seconds(backoff_seconds as i64); + let retries = task.retries + 1; + + let row: Row = transaction + .query_one( + RETRY_TASK_QUERY, + &[&error, &retries, &scheduled_at, &now, &task.id], + ) + .await?; + let failed_task = Self::row_to_task(row); + Ok(failed_task) + } + async fn fetch_and_touch_task_query( transaction: &mut Transaction<'_>, task_type: Option, @@ -568,6 +612,7 @@ where let uniq_hash: Option = row.try_get("uniq_hash").ok(); let state: FangTaskState = row.get("state"); let task_type: String = row.get("task_type"); + let retries: i32 = row.get("retries"); let created_at: DateTime = row.get("created_at"); let updated_at: DateTime = row.get("updated_at"); let scheduled_at: DateTime = row.get("scheduled_at"); @@ -579,6 +624,7 @@ where .state(state) .uniq_hash(uniq_hash) .task_type(task_type) + .retries(retries) .created_at(created_at) .updated_at(updated_at) .scheduled_at(scheduled_at) @@ -781,6 +827,23 @@ where Ok(task) } + + async fn schedule_retry( + &mut self, + task: &Task, + backoff_seconds: u32, + error: &str, + ) -> Result { + self.check_if_connection()?; + let mut connection = self.pool.as_ref().unwrap().get().await?; + let mut transaction = connection.transaction().await?; + + let task = + Self::schedule_retry_query(&mut transaction, task, backoff_seconds, error).await?; + transaction.commit().await?; + + Ok(task) + } } #[cfg(test)] diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index 49b0122..05efa99 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -8,6 +8,7 @@ use bb8_postgres::tokio_postgres::Error as TokioPostgresError; use serde_json::Error as SerdeError; const COMMON_TYPE: &str = "common"; +pub const RETRIES_NUMBER: i32 = 20; impl From for FangError { fn from(error: AsyncQueueError) -> Self { @@ -52,4 +53,12 @@ pub trait AsyncRunnable: Send + Sync { fn cron(&self) -> Option { None } + + fn max_retries(&self) -> i32 { + RETRIES_NUMBER + } + + fn backoff(&self, attempt: u32) -> u32 { + u32::pow(2, attempt) + } } diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs index e362dcf..97543f1 100644 --- a/src/asynk/async_worker.rs +++ b/src/asynk/async_worker.rs @@ -31,62 +31,64 @@ where pub async fn run( &mut self, task: Task, - actual_task: Box, + runnable: Box, ) -> Result<(), FangError> { - let result = self.execute_task(task, actual_task).await; - self.finalize_task(result).await - } + let result = runnable.run(&mut self.queue).await; - async fn execute_task( - &mut self, - task: Task, - actual_task: Box, - ) -> Result { - let task_result = actual_task.run(&mut self.queue).await; - match task_result { - Ok(()) => Ok(task), - Err(error) => Err((task, error.description)), + match result { + Ok(_) => self.finalize_task(task, &result).await?, + + Err(ref error) => { + if task.retries < runnable.max_retries() { + let backoff_seconds = runnable.backoff(task.retries as u32); + + self.queue + .schedule_retry(&task, backoff_seconds, &error.description) + .await?; + } else { + self.finalize_task(task, &result).await?; + } + } } + + Ok(()) } async fn finalize_task( &mut self, - result: Result, + task: Task, + result: &Result<(), FangError>, ) -> Result<(), FangError> { match self.retention_mode { RetentionMode::KeepAll => match result { - Ok(task) => { + Ok(_) => { self.queue .update_task_state(task, FangTaskState::Finished) .await?; - Ok(()) } - Err((task, error)) => { - self.queue.fail_task(task, &error).await?; - Ok(()) + Err(error) => { + self.queue.fail_task(task, &error.description).await?; } }, RetentionMode::RemoveAll => match result { - Ok(task) => { + Ok(_) => { self.queue.remove_task(task.id).await?; - Ok(()) } - Err((task, _error)) => { + Err(_error) => { self.queue.remove_task(task.id).await?; - Ok(()) } }, RetentionMode::RemoveFinished => match result { - Ok(task) => { + Ok(_) => { self.queue.remove_task(task.id).await?; - Ok(()) } - Err((task, error)) => { - self.queue.fail_task(task, &error).await?; - Ok(()) + Err(error) => { + self.queue.fail_task(task, &error.description).await?; } }, - } + }; + + Ok(()) } pub async fn sleep(&mut self) { @@ -148,62 +150,64 @@ impl<'a> AsyncWorkerTest<'a> { pub async fn run( &mut self, task: Task, - actual_task: Box, + runnable: Box, ) -> Result<(), FangError> { - let result = self.execute_task(task, actual_task).await; - self.finalize_task(result).await - } + let result = runnable.run(self.queue).await; - async fn execute_task( - &mut self, - task: Task, - actual_task: Box, - ) -> Result { - let task_result = actual_task.run(self.queue).await; - match task_result { - Ok(()) => Ok(task), - Err(error) => Err((task, error.description)), + match result { + Ok(_) => self.finalize_task(task, &result).await?, + + Err(ref error) => { + if task.retries < runnable.max_retries() { + let backoff_seconds = runnable.backoff(task.retries as u32); + + self.queue + .schedule_retry(&task, backoff_seconds, &error.description) + .await?; + } else { + self.finalize_task(task, &result).await?; + } + } } + + Ok(()) } async fn finalize_task( &mut self, - result: Result, + task: Task, + result: &Result<(), FangError>, ) -> Result<(), FangError> { match self.retention_mode { RetentionMode::KeepAll => match result { - Ok(task) => { + Ok(_) => { self.queue .update_task_state(task, FangTaskState::Finished) .await?; - Ok(()) } - Err((task, error)) => { - self.queue.fail_task(task, &error).await?; - Ok(()) + Err(error) => { + self.queue.fail_task(task, &error.description).await?; } }, RetentionMode::RemoveAll => match result { - Ok(task) => { + Ok(_) => { self.queue.remove_task(task.id).await?; - Ok(()) } - Err((task, _error)) => { + Err(_error) => { self.queue.remove_task(task.id).await?; - Ok(()) } }, RetentionMode::RemoveFinished => match result { - Ok(task) => { + Ok(_) => { self.queue.remove_task(task.id).await?; - Ok(()) } - Err((task, error)) => { - self.queue.fail_task(task, &error).await?; - Ok(()) + Err(error) => { + self.queue.fail_task(task, &error.description).await?; } }, - } + }; + + Ok(()) } pub async fn sleep(&mut self) { @@ -308,6 +312,29 @@ mod async_worker_tests { description: message, }) } + + fn max_retries(&self) -> i32 { + 0 + } + } + + #[derive(Serialize, Deserialize, Clone)] + struct AsyncRetryTask {} + + #[typetag::serde] + #[async_trait] + impl AsyncRunnable for AsyncRetryTask { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> { + let message = "Failed".to_string(); + + Err(FangError { + description: message, + }) + } + + fn max_retries(&self) -> i32 { + 2 + } } #[derive(Serialize, Deserialize)] @@ -339,6 +366,7 @@ mod async_worker_tests { "type2".to_string() } } + #[tokio::test] async fn execute_and_finishes_task() { let pool = pool().await; @@ -398,6 +426,51 @@ mod async_worker_tests { assert_eq!(FangTaskState::Finished, task.state); } + #[tokio::test] + async fn retries_task_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let transaction = connection.transaction().await.unwrap(); + + let mut test = AsyncQueueTest::builder().transaction(transaction).build(); + + let actual_task = AsyncRetryTask {}; + + let task = test.insert_task(&actual_task).await.unwrap(); + + let id = task.id; + + let mut worker = AsyncWorkerTest::builder() + .queue(&mut test as &mut dyn AsyncQueueable) + .retention_mode(RetentionMode::KeepAll) + .build(); + + worker.run_tasks_until_none().await.unwrap(); + + let task = worker.queue.find_task_by_id(id).await.unwrap(); + + assert_eq!(id, task.id); + assert_eq!(FangTaskState::Retried, task.state); + assert_eq!(1, task.retries); + + tokio::time::sleep(core::time::Duration::from_secs(5)).await; + worker.run_tasks_until_none().await.unwrap(); + + let task = worker.queue.find_task_by_id(id).await.unwrap(); + + assert_eq!(id, task.id); + assert_eq!(FangTaskState::Retried, task.state); + assert_eq!(2, task.retries); + + tokio::time::sleep(core::time::Duration::from_secs(10)).await; + worker.run_tasks_until_none().await.unwrap(); + + let task = test.find_task_by_id(id).await.unwrap(); + assert_eq!(id, task.id); + assert_eq!(FangTaskState::Failed, task.state); + assert_eq!("Failed".to_string(), task.error_message.unwrap()); + } + #[tokio::test] async fn saves_error_for_failed_task() { let pool = pool().await; @@ -426,6 +499,7 @@ mod async_worker_tests { ); test.transaction.rollback().await.unwrap(); } + #[tokio::test] async fn executes_task_only_of_specific_type() { let pool = pool().await; diff --git a/src/asynk/queries/fetch_task_type.sql b/src/asynk/queries/fetch_task_type.sql index 91e2f85..e055820 100644 --- a/src/asynk/queries/fetch_task_type.sql +++ b/src/asynk/queries/fetch_task_type.sql @@ -1 +1 @@ -SELECT * FROM fang_tasks WHERE task_type = $1 AND state = 'new' AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED +SELECT * FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED diff --git a/src/asynk/queries/find_task_by_uniq_hash.sql b/src/asynk/queries/find_task_by_uniq_hash.sql index 3694a58..cb53f45 100644 --- a/src/asynk/queries/find_task_by_uniq_hash.sql +++ b/src/asynk/queries/find_task_by_uniq_hash.sql @@ -1 +1 @@ -SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state = 'new' LIMIT 1 +SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1 diff --git a/src/asynk/queries/retry_task.sql b/src/asynk/queries/retry_task.sql new file mode 100644 index 0000000..f26267c --- /dev/null +++ b/src/asynk/queries/retry_task.sql @@ -0,0 +1 @@ +UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = $3, "updated_at" = $4 WHERE id = $5 RETURNING * diff --git a/src/blocking.rs b/src/blocking.rs index 7f4f296..1f4f214 100644 --- a/src/blocking.rs +++ b/src/blocking.rs @@ -1,10 +1,12 @@ pub mod error; +pub mod fang_task_state; pub mod queue; pub mod runnable; pub mod schema; pub mod worker; pub mod worker_pool; +pub use fang_task_state::FangTaskState; pub use queue::*; pub use runnable::Runnable; pub use schema::*; diff --git a/src/blocking/fang_task_state.rs b/src/blocking/fang_task_state.rs new file mode 100644 index 0000000..14c983f --- /dev/null +++ b/src/blocking/fang_task_state.rs @@ -0,0 +1,9 @@ +#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)] +#[DieselTypePath = "crate::schema::sql_types::FangTaskState"] +pub enum FangTaskState { + New, + InProgress, + Failed, + Finished, + Retried, +} diff --git a/src/blocking/queue.rs b/src/blocking/queue.rs index 47060fd..8fa570c 100644 --- a/src/blocking/queue.rs +++ b/src/blocking/queue.rs @@ -1,9 +1,10 @@ +use crate::fang_task_state::FangTaskState; use crate::runnable::Runnable; use crate::schema::fang_tasks; -use crate::schema::FangTaskState; use crate::CronError; use crate::Scheduled::*; use chrono::DateTime; +use chrono::Duration; use chrono::Utc; use cron::Schedule; use diesel::pg::PgConnection; @@ -43,6 +44,8 @@ pub struct Task { #[builder(setter(into))] pub uniq_hash: Option, #[builder(setter(into))] + pub retries: i32, + #[builder(setter(into))] pub scheduled_at: DateTime, #[builder(setter(into))] pub created_at: DateTime, @@ -102,9 +105,16 @@ pub trait Queueable { fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result; - fn fail_task(&self, task: &Task, error: String) -> Result; + fn fail_task(&self, task: &Task, error: &str) -> Result; fn schedule_task(&self, task: &dyn Runnable) -> Result; + + fn schedule_retry( + &self, + task: &Task, + backoff_in_seconds: u32, + error: &str, + ) -> Result; } #[derive(Clone, TypedBuilder)] @@ -173,7 +183,7 @@ impl Queueable for Queue { Self::update_task_state_query(&mut connection, task, state) } - fn fail_task(&self, task: &Task, error: String) -> Result { + fn fail_task(&self, task: &Task, error: &str) -> Result { let mut connection = self.get_connection()?; Self::fail_task_query(&mut connection, task, error) @@ -184,6 +194,17 @@ impl Queueable for Queue { Self::find_task_by_id_query(&mut connection, id) } + + fn schedule_retry( + &self, + task: &Task, + backoff_seconds: u32, + error: &str, + ) -> Result { + let mut connection = self.get_connection()?; + + Self::schedule_retry_query(&mut connection, task, backoff_seconds, error) + } } impl Queue { @@ -357,7 +378,7 @@ impl Queue { pub fn fail_task_query( connection: &mut PgConnection, task: &Task, - error: String, + error: &str, ) -> Result { Ok(diesel::update(task) .set(( @@ -392,7 +413,7 @@ impl Queue { .order(fang_tasks::scheduled_at.asc()) .limit(1) .filter(fang_tasks::scheduled_at.le(Utc::now())) - .filter(fang_tasks::state.eq(FangTaskState::New)) + .filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried])) .filter(fang_tasks::task_type.eq(task_type)) .for_update() .skip_locked() @@ -406,10 +427,32 @@ impl Queue { ) -> Option { fang_tasks::table .filter(fang_tasks::uniq_hash.eq(uniq_hash)) - .filter(fang_tasks::state.eq(FangTaskState::New)) + .filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried])) .first::(connection) .ok() } + + pub fn schedule_retry_query( + connection: &mut PgConnection, + task: &Task, + backoff_seconds: u32, + error: &str, + ) -> Result { + let now = Self::current_time(); + let scheduled_at = now + Duration::seconds(backoff_seconds as i64); + + let task = diesel::update(task) + .set(( + fang_tasks::state.eq(FangTaskState::Retried), + fang_tasks::error_message.eq(error), + fang_tasks::retries.eq(task.retries + 1), + fang_tasks::scheduled_at.eq(scheduled_at), + fang_tasks::updated_at.eq(now), + )) + .get_result::(connection)?; + + Ok(task) + } } #[cfg(test)] @@ -417,9 +460,9 @@ mod queue_tests { use super::Queue; use super::Queueable; use crate::chrono::SubsecRound; + use crate::fang_task_state::FangTaskState; use crate::runnable::Runnable; use crate::runnable::COMMON_TYPE; - use crate::schema::FangTaskState; use crate::typetag; use crate::FangError; use crate::Scheduled; @@ -586,7 +629,7 @@ mod queue_tests { let error = "Failed".to_string(); - let found_task = Queue::fail_task_query(conn, &task, error.clone()).unwrap(); + let found_task = Queue::fail_task_query(conn, &task, &error).unwrap(); let metadata = found_task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); diff --git a/src/blocking/runnable.rs b/src/blocking/runnable.rs index 0ddad54..6a84150 100644 --- a/src/blocking/runnable.rs +++ b/src/blocking/runnable.rs @@ -3,6 +3,7 @@ use crate::FangError; use crate::Scheduled; pub const COMMON_TYPE: &str = "common"; +pub const RETRIES_NUMBER: i32 = 20; #[typetag::serde(tag = "type")] pub trait Runnable { @@ -19,4 +20,12 @@ pub trait Runnable { fn cron(&self) -> Option { None } + + fn max_retries(&self) -> i32 { + RETRIES_NUMBER + } + + fn backoff(&self, attempt: u32) -> u32 { + u32::pow(2, attempt) + } } diff --git a/src/blocking/schema.rs b/src/blocking/schema.rs index 28a0fd2..4f9dff2 100644 --- a/src/blocking/schema.rs +++ b/src/blocking/schema.rs @@ -1,27 +1,14 @@ +// @generated automatically by Diesel CLI. + pub mod sql_types { #[derive(diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "fang_task_state"))] pub struct FangTaskState; } -#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)] -#[DieselTypePath = "crate::blocking::schema::sql_types::FangTaskState"] -pub enum FangTaskState { - New, - InProgress, - Failed, - Finished, -} - -table! { +diesel::table! { + use diesel::sql_types::*; use super::sql_types::FangTaskState; - use diesel::sql_types::Jsonb; - use diesel::sql_types::Nullable; - use diesel::sql_types::Text; - use diesel::sql_types::Timestamptz; - use diesel::sql_types::Uuid; - use diesel::sql_types::Varchar; - use diesel::pg::sql_types::Bpchar; fang_tasks (id) { id -> Uuid, @@ -30,6 +17,7 @@ table! { state -> FangTaskState, task_type -> Varchar, uniq_hash -> Nullable, + retries -> Int4, scheduled_at -> Timestamptz, created_at -> Timestamptz, updated_at -> Timestamptz, diff --git a/src/blocking/worker.rs b/src/blocking/worker.rs index 1fc451a..590d637 100644 --- a/src/blocking/worker.rs +++ b/src/blocking/worker.rs @@ -1,8 +1,11 @@ +#![allow(clippy::borrowed_box)] +#![allow(clippy::unnecessary_unwrap)] + +use crate::fang_task_state::FangTaskState; use crate::queue::Queueable; use crate::queue::Task; use crate::runnable::Runnable; use crate::runnable::COMMON_TYPE; -use crate::schema::FangTaskState; use crate::FangError; use crate::Scheduled::*; use crate::{RetentionMode, SleepParams}; @@ -30,8 +33,23 @@ where BQueue: Queueable + Clone + Sync + Send + 'static, { pub fn run(&self, task: Task) { - let result = self.execute_task(task); - self.finalize_task(result) + let runnable: Box = serde_json::from_value(task.metadata.clone()).unwrap(); + let result = runnable.run(&self.queue); + + match result { + Ok(_) => self.finalize_task(task, &result), + Err(ref error) => { + if task.retries < runnable.max_retries() { + let backoff_seconds = runnable.backoff(task.retries as u32); + + self.queue + .schedule_retry(&task, backoff_seconds, &error.description) + .expect("Failed to retry"); + } else { + self.finalize_task(task, &result); + } + } + } } pub fn run_tasks(&mut self) -> Result<(), FangError> { @@ -102,39 +120,31 @@ where thread::sleep(self.sleep_params.sleep_period); } - fn execute_task(&self, task: Task) -> Result { - let actual_task: Box = serde_json::from_value(task.metadata.clone()).unwrap(); - let task_result = actual_task.run(&self.queue); - - match task_result { - Ok(()) => Ok(task), - Err(error) => Err((task, error.description)), - } - } - - fn finalize_task(&self, result: Result) { + fn finalize_task(&self, task: Task, result: &Result<(), FangError>) { match self.retention_mode { RetentionMode::KeepAll => { match result { - Ok(task) => self + Ok(_) => self .queue .update_task_state(&task, FangTaskState::Finished) .unwrap(), - Err((task, error)) => self.queue.fail_task(&task, error).unwrap(), + Err(error) => self.queue.fail_task(&task, &error.description).unwrap(), }; } + RetentionMode::RemoveAll => { match result { - Ok(task) => self.queue.remove_task(task.id).unwrap(), - Err((task, _error)) => self.queue.remove_task(task.id).unwrap(), + Ok(_) => self.queue.remove_task(task.id).unwrap(), + Err(_error) => self.queue.remove_task(task.id).unwrap(), }; } + RetentionMode::RemoveFinished => match result { - Ok(task) => { + Ok(_) => { self.queue.remove_task(task.id).unwrap(); } - Err((task, error)) => { - self.queue.fail_task(&task, error).unwrap(); + Err(error) => { + self.queue.fail_task(&task, &error.description).unwrap(); } }, } @@ -146,9 +156,9 @@ mod worker_tests { use super::RetentionMode; use super::Runnable; use super::Worker; + use crate::fang_task_state::FangTaskState; use crate::queue::Queue; use crate::queue::Queueable; - use crate::schema::FangTaskState; use crate::typetag; use crate::FangError; use chrono::Utc; @@ -187,11 +197,39 @@ mod worker_tests { }) } + fn max_retries(&self) -> i32 { + 0 + } + fn task_type(&self) -> String { "F_task".to_string() } } + #[derive(Serialize, Deserialize)] + struct RetryTask { + pub number: u16, + } + + #[typetag::serde] + impl Runnable for RetryTask { + fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> { + let message = format!("Saving Pepe. Attempt {}", self.number); + + Err(FangError { + description: message, + }) + } + + fn max_retries(&self) -> i32 { + 2 + } + + fn task_type(&self) -> String { + "Retry_task".to_string() + } + } + #[derive(Serialize, Deserialize)] struct TaskType1 {} @@ -322,4 +360,53 @@ mod worker_tests { Queue::remove_tasks_of_type_query(&mut pooled_connection, "F_task").unwrap(); } + + #[test] + #[ignore] + fn retries_task() { + let task = RetryTask { number: 10 }; + + let pool = Queue::connection_pool(5); + + let queue = Queue::builder().connection_pool(pool).build(); + + let mut worker = Worker::::builder() + .queue(queue) + .retention_mode(RetentionMode::KeepAll) + .task_type(task.task_type()) + .build(); + + let mut pooled_connection = worker.queue.connection_pool.get().unwrap(); + + let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap(); + + assert_eq!(FangTaskState::New, task.state); + + worker.run(task.clone()); + + std::thread::sleep(std::time::Duration::from_millis(1000)); + + let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap(); + + assert_eq!(FangTaskState::Retried, found_task.state); + assert_eq!(1, found_task.retries); + + worker.run_tasks_until_none().unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(14000)); + + worker.run_tasks_until_none().unwrap(); + + let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap(); + + assert_eq!(FangTaskState::Failed, found_task.state); + assert_eq!(2, found_task.retries); + + assert_eq!( + "Saving Pepe. Attempt 10".to_string(), + found_task.error_message.unwrap() + ); + + Queue::remove_tasks_of_type_query(&mut pooled_connection, "Retry_task").unwrap(); + } } diff --git a/src/lib.rs b/src/lib.rs index 17ef76e..edb2b18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,7 +68,6 @@ pub struct FangError { pub description: String, } -#[macro_use] #[cfg(feature = "blocking")] extern crate diesel;