Retries (#92)
* add retries and errors fields * Exponential backoff * async retries (#93)
This commit is contained in:
parent
b76e134ca1
commit
7c0aa2fab9
15 changed files with 396 additions and 109 deletions
2
diesel.toml
Normal file
2
diesel.toml
Normal file
|
@ -0,0 +1,2 @@
|
|||
[print_schema]
|
||||
file = "src/blocking/schema.rs"
|
|
@ -1,6 +1,6 @@
|
|||
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
|
||||
CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished');
|
||||
CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished', 'retried');
|
||||
|
||||
CREATE TABLE fang_tasks (
|
||||
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
|
||||
|
@ -9,6 +9,7 @@ CREATE TABLE fang_tasks (
|
|||
state fang_task_state DEFAULT 'new' NOT NULL,
|
||||
task_type VARCHAR DEFAULT 'common' NOT NULL,
|
||||
uniq_hash CHAR(64),
|
||||
retries INTEGER DEFAULT 0 NOT NULL,
|
||||
scheduled_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
||||
|
|
|
@ -10,6 +10,7 @@ use bb8_postgres::tokio_postgres::Socket;
|
|||
use bb8_postgres::tokio_postgres::Transaction;
|
||||
use bb8_postgres::PostgresConnectionManager;
|
||||
use chrono::DateTime;
|
||||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
use cron::Schedule;
|
||||
use postgres_types::{FromSql, ToSql};
|
||||
|
@ -35,6 +36,7 @@ const REMOVE_TASKS_TYPE_QUERY: &str = include_str!("queries/remove_tasks_type.sq
|
|||
const FETCH_TASK_TYPE_QUERY: &str = include_str!("queries/fetch_task_type.sql");
|
||||
const FIND_TASK_BY_UNIQ_HASH_QUERY: &str = include_str!("queries/find_task_by_uniq_hash.sql");
|
||||
const FIND_TASK_BY_ID_QUERY: &str = include_str!("queries/find_task_by_id.sql");
|
||||
const RETRY_TASK_QUERY: &str = include_str!("queries/retry_task.sql");
|
||||
|
||||
pub const DEFAULT_TASK_TYPE: &str = "common";
|
||||
|
||||
|
@ -49,6 +51,8 @@ pub enum FangTaskState {
|
|||
Failed,
|
||||
#[postgres(name = "finished")]
|
||||
Finished,
|
||||
#[postgres(name = "retried")]
|
||||
Retried,
|
||||
}
|
||||
|
||||
impl Default for FangTaskState {
|
||||
|
@ -72,6 +76,8 @@ pub struct Task {
|
|||
#[builder(setter(into))]
|
||||
pub uniq_hash: Option<String>,
|
||||
#[builder(setter(into))]
|
||||
pub retries: i32,
|
||||
#[builder(setter(into))]
|
||||
pub scheduled_at: DateTime<Utc>,
|
||||
#[builder(setter(into))]
|
||||
pub created_at: DateTime<Utc>,
|
||||
|
@ -141,6 +147,13 @@ pub trait AsyncQueueable: Send {
|
|||
-> Result<Task, AsyncQueueError>;
|
||||
|
||||
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;
|
||||
|
||||
async fn schedule_retry(
|
||||
&mut self,
|
||||
task: &Task,
|
||||
backoff_seconds: u32,
|
||||
error: &str,
|
||||
) -> Result<Task, AsyncQueueError>;
|
||||
}
|
||||
|
||||
#[derive(TypedBuilder, Debug, Clone)]
|
||||
|
@ -311,6 +324,17 @@ impl AsyncQueueable for AsyncQueueTest<'_> {
|
|||
|
||||
AsyncQueue::<NoTls>::fail_task_query(transaction, task, error_message).await
|
||||
}
|
||||
|
||||
async fn schedule_retry(
|
||||
&mut self,
|
||||
task: &Task,
|
||||
backoff_seconds: u32,
|
||||
error: &str,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
let transaction = &mut self.transaction;
|
||||
|
||||
AsyncQueue::<NoTls>::schedule_retry_query(transaction, task, backoff_seconds, error).await
|
||||
}
|
||||
}
|
||||
|
||||
impl<Tls> AsyncQueue<Tls>
|
||||
|
@ -420,6 +444,26 @@ where
|
|||
Ok(failed_task)
|
||||
}
|
||||
|
||||
async fn schedule_retry_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
task: &Task,
|
||||
backoff_seconds: u32,
|
||||
error: &str,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
let now = Utc::now();
|
||||
let scheduled_at = now + Duration::seconds(backoff_seconds as i64);
|
||||
let retries = task.retries + 1;
|
||||
|
||||
let row: Row = transaction
|
||||
.query_one(
|
||||
RETRY_TASK_QUERY,
|
||||
&[&error, &retries, &scheduled_at, &now, &task.id],
|
||||
)
|
||||
.await?;
|
||||
let failed_task = Self::row_to_task(row);
|
||||
Ok(failed_task)
|
||||
}
|
||||
|
||||
async fn fetch_and_touch_task_query(
|
||||
transaction: &mut Transaction<'_>,
|
||||
task_type: Option<String>,
|
||||
|
@ -568,6 +612,7 @@ where
|
|||
let uniq_hash: Option<String> = row.try_get("uniq_hash").ok();
|
||||
let state: FangTaskState = row.get("state");
|
||||
let task_type: String = row.get("task_type");
|
||||
let retries: i32 = row.get("retries");
|
||||
let created_at: DateTime<Utc> = row.get("created_at");
|
||||
let updated_at: DateTime<Utc> = row.get("updated_at");
|
||||
let scheduled_at: DateTime<Utc> = row.get("scheduled_at");
|
||||
|
@ -579,6 +624,7 @@ where
|
|||
.state(state)
|
||||
.uniq_hash(uniq_hash)
|
||||
.task_type(task_type)
|
||||
.retries(retries)
|
||||
.created_at(created_at)
|
||||
.updated_at(updated_at)
|
||||
.scheduled_at(scheduled_at)
|
||||
|
@ -781,6 +827,23 @@ where
|
|||
|
||||
Ok(task)
|
||||
}
|
||||
|
||||
async fn schedule_retry(
|
||||
&mut self,
|
||||
task: &Task,
|
||||
backoff_seconds: u32,
|
||||
error: &str,
|
||||
) -> Result<Task, AsyncQueueError> {
|
||||
self.check_if_connection()?;
|
||||
let mut connection = self.pool.as_ref().unwrap().get().await?;
|
||||
let mut transaction = connection.transaction().await?;
|
||||
|
||||
let task =
|
||||
Self::schedule_retry_query(&mut transaction, task, backoff_seconds, error).await?;
|
||||
transaction.commit().await?;
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -8,6 +8,7 @@ use bb8_postgres::tokio_postgres::Error as TokioPostgresError;
|
|||
use serde_json::Error as SerdeError;
|
||||
|
||||
const COMMON_TYPE: &str = "common";
|
||||
pub const RETRIES_NUMBER: i32 = 20;
|
||||
|
||||
impl From<AsyncQueueError> for FangError {
|
||||
fn from(error: AsyncQueueError) -> Self {
|
||||
|
@ -52,4 +53,12 @@ pub trait AsyncRunnable: Send + Sync {
|
|||
fn cron(&self) -> Option<Scheduled> {
|
||||
None
|
||||
}
|
||||
|
||||
fn max_retries(&self) -> i32 {
|
||||
RETRIES_NUMBER
|
||||
}
|
||||
|
||||
fn backoff(&self, attempt: u32) -> u32 {
|
||||
u32::pow(2, attempt)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,62 +31,64 @@ where
|
|||
pub async fn run(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
runnable: Box<dyn AsyncRunnable>,
|
||||
) -> Result<(), FangError> {
|
||||
let result = self.execute_task(task, actual_task).await;
|
||||
self.finalize_task(result).await
|
||||
let result = runnable.run(&mut self.queue).await;
|
||||
|
||||
match result {
|
||||
Ok(_) => self.finalize_task(task, &result).await?,
|
||||
|
||||
Err(ref error) => {
|
||||
if task.retries < runnable.max_retries() {
|
||||
let backoff_seconds = runnable.backoff(task.retries as u32);
|
||||
|
||||
self.queue
|
||||
.schedule_retry(&task, backoff_seconds, &error.description)
|
||||
.await?;
|
||||
} else {
|
||||
self.finalize_task(task, &result).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_task(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
) -> Result<Task, (Task, String)> {
|
||||
let task_result = actual_task.run(&mut self.queue).await;
|
||||
match task_result {
|
||||
Ok(()) => Ok(task),
|
||||
Err(error) => Err((task, error.description)),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finalize_task(
|
||||
&mut self,
|
||||
result: Result<Task, (Task, String)>,
|
||||
task: Task,
|
||||
result: &Result<(), FangError>,
|
||||
) -> Result<(), FangError> {
|
||||
match self.retention_mode {
|
||||
RetentionMode::KeepAll => match result {
|
||||
Ok(task) => {
|
||||
Ok(_) => {
|
||||
self.queue
|
||||
.update_task_state(task, FangTaskState::Finished)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
Err((task, error)) => {
|
||||
self.queue.fail_task(task, &error).await?;
|
||||
Ok(())
|
||||
Err(error) => {
|
||||
self.queue.fail_task(task, &error.description).await?;
|
||||
}
|
||||
},
|
||||
RetentionMode::RemoveAll => match result {
|
||||
Ok(task) => {
|
||||
Ok(_) => {
|
||||
self.queue.remove_task(task.id).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err((task, _error)) => {
|
||||
Err(_error) => {
|
||||
self.queue.remove_task(task.id).await?;
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
RetentionMode::RemoveFinished => match result {
|
||||
Ok(task) => {
|
||||
Ok(_) => {
|
||||
self.queue.remove_task(task.id).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err((task, error)) => {
|
||||
self.queue.fail_task(task, &error).await?;
|
||||
Ok(())
|
||||
Err(error) => {
|
||||
self.queue.fail_task(task, &error.description).await?;
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sleep(&mut self) {
|
||||
|
@ -148,62 +150,64 @@ impl<'a> AsyncWorkerTest<'a> {
|
|||
pub async fn run(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
runnable: Box<dyn AsyncRunnable>,
|
||||
) -> Result<(), FangError> {
|
||||
let result = self.execute_task(task, actual_task).await;
|
||||
self.finalize_task(result).await
|
||||
let result = runnable.run(self.queue).await;
|
||||
|
||||
match result {
|
||||
Ok(_) => self.finalize_task(task, &result).await?,
|
||||
|
||||
Err(ref error) => {
|
||||
if task.retries < runnable.max_retries() {
|
||||
let backoff_seconds = runnable.backoff(task.retries as u32);
|
||||
|
||||
self.queue
|
||||
.schedule_retry(&task, backoff_seconds, &error.description)
|
||||
.await?;
|
||||
} else {
|
||||
self.finalize_task(task, &result).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_task(
|
||||
&mut self,
|
||||
task: Task,
|
||||
actual_task: Box<dyn AsyncRunnable>,
|
||||
) -> Result<Task, (Task, String)> {
|
||||
let task_result = actual_task.run(self.queue).await;
|
||||
match task_result {
|
||||
Ok(()) => Ok(task),
|
||||
Err(error) => Err((task, error.description)),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn finalize_task(
|
||||
&mut self,
|
||||
result: Result<Task, (Task, String)>,
|
||||
task: Task,
|
||||
result: &Result<(), FangError>,
|
||||
) -> Result<(), FangError> {
|
||||
match self.retention_mode {
|
||||
RetentionMode::KeepAll => match result {
|
||||
Ok(task) => {
|
||||
Ok(_) => {
|
||||
self.queue
|
||||
.update_task_state(task, FangTaskState::Finished)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
Err((task, error)) => {
|
||||
self.queue.fail_task(task, &error).await?;
|
||||
Ok(())
|
||||
Err(error) => {
|
||||
self.queue.fail_task(task, &error.description).await?;
|
||||
}
|
||||
},
|
||||
RetentionMode::RemoveAll => match result {
|
||||
Ok(task) => {
|
||||
Ok(_) => {
|
||||
self.queue.remove_task(task.id).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err((task, _error)) => {
|
||||
Err(_error) => {
|
||||
self.queue.remove_task(task.id).await?;
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
RetentionMode::RemoveFinished => match result {
|
||||
Ok(task) => {
|
||||
Ok(_) => {
|
||||
self.queue.remove_task(task.id).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err((task, error)) => {
|
||||
self.queue.fail_task(task, &error).await?;
|
||||
Ok(())
|
||||
Err(error) => {
|
||||
self.queue.fail_task(task, &error.description).await?;
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn sleep(&mut self) {
|
||||
|
@ -308,6 +312,29 @@ mod async_worker_tests {
|
|||
description: message,
|
||||
})
|
||||
}
|
||||
|
||||
fn max_retries(&self) -> i32 {
|
||||
0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
struct AsyncRetryTask {}
|
||||
|
||||
#[typetag::serde]
|
||||
#[async_trait]
|
||||
impl AsyncRunnable for AsyncRetryTask {
|
||||
async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
|
||||
let message = "Failed".to_string();
|
||||
|
||||
Err(FangError {
|
||||
description: message,
|
||||
})
|
||||
}
|
||||
|
||||
fn max_retries(&self) -> i32 {
|
||||
2
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
|
@ -339,6 +366,7 @@ mod async_worker_tests {
|
|||
"type2".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn execute_and_finishes_task() {
|
||||
let pool = pool().await;
|
||||
|
@ -398,6 +426,51 @@ mod async_worker_tests {
|
|||
assert_eq!(FangTaskState::Finished, task.state);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retries_task_test() {
|
||||
let pool = pool().await;
|
||||
let mut connection = pool.get().await.unwrap();
|
||||
let transaction = connection.transaction().await.unwrap();
|
||||
|
||||
let mut test = AsyncQueueTest::builder().transaction(transaction).build();
|
||||
|
||||
let actual_task = AsyncRetryTask {};
|
||||
|
||||
let task = test.insert_task(&actual_task).await.unwrap();
|
||||
|
||||
let id = task.id;
|
||||
|
||||
let mut worker = AsyncWorkerTest::builder()
|
||||
.queue(&mut test as &mut dyn AsyncQueueable)
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.build();
|
||||
|
||||
worker.run_tasks_until_none().await.unwrap();
|
||||
|
||||
let task = worker.queue.find_task_by_id(id).await.unwrap();
|
||||
|
||||
assert_eq!(id, task.id);
|
||||
assert_eq!(FangTaskState::Retried, task.state);
|
||||
assert_eq!(1, task.retries);
|
||||
|
||||
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
|
||||
worker.run_tasks_until_none().await.unwrap();
|
||||
|
||||
let task = worker.queue.find_task_by_id(id).await.unwrap();
|
||||
|
||||
assert_eq!(id, task.id);
|
||||
assert_eq!(FangTaskState::Retried, task.state);
|
||||
assert_eq!(2, task.retries);
|
||||
|
||||
tokio::time::sleep(core::time::Duration::from_secs(10)).await;
|
||||
worker.run_tasks_until_none().await.unwrap();
|
||||
|
||||
let task = test.find_task_by_id(id).await.unwrap();
|
||||
assert_eq!(id, task.id);
|
||||
assert_eq!(FangTaskState::Failed, task.state);
|
||||
assert_eq!("Failed".to_string(), task.error_message.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn saves_error_for_failed_task() {
|
||||
let pool = pool().await;
|
||||
|
@ -426,6 +499,7 @@ mod async_worker_tests {
|
|||
);
|
||||
test.transaction.rollback().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn executes_task_only_of_specific_type() {
|
||||
let pool = pool().await;
|
||||
|
|
|
@ -1 +1 @@
|
|||
SELECT * FROM fang_tasks WHERE task_type = $1 AND state = 'new' AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
|
||||
SELECT * FROM fang_tasks WHERE task_type = $1 AND state in ('new', 'retried') AND $2 >= scheduled_at ORDER BY created_at ASC, scheduled_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED
|
||||
|
|
|
@ -1 +1 @@
|
|||
SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state = 'new' LIMIT 1
|
||||
SELECT * FROM fang_tasks WHERE uniq_hash = $1 AND state in ('new', 'retried') LIMIT 1
|
||||
|
|
1
src/asynk/queries/retry_task.sql
Normal file
1
src/asynk/queries/retry_task.sql
Normal file
|
@ -0,0 +1 @@
|
|||
UPDATE "fang_tasks" SET "state" = 'retried' , "error_message" = $1, "retries" = $2, scheduled_at = $3, "updated_at" = $4 WHERE id = $5 RETURNING *
|
|
@ -1,10 +1,12 @@
|
|||
pub mod error;
|
||||
pub mod fang_task_state;
|
||||
pub mod queue;
|
||||
pub mod runnable;
|
||||
pub mod schema;
|
||||
pub mod worker;
|
||||
pub mod worker_pool;
|
||||
|
||||
pub use fang_task_state::FangTaskState;
|
||||
pub use queue::*;
|
||||
pub use runnable::Runnable;
|
||||
pub use schema::*;
|
||||
|
|
9
src/blocking/fang_task_state.rs
Normal file
9
src/blocking/fang_task_state.rs
Normal file
|
@ -0,0 +1,9 @@
|
|||
#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)]
|
||||
#[DieselTypePath = "crate::schema::sql_types::FangTaskState"]
|
||||
pub enum FangTaskState {
|
||||
New,
|
||||
InProgress,
|
||||
Failed,
|
||||
Finished,
|
||||
Retried,
|
||||
}
|
|
@ -1,9 +1,10 @@
|
|||
use crate::fang_task_state::FangTaskState;
|
||||
use crate::runnable::Runnable;
|
||||
use crate::schema::fang_tasks;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::CronError;
|
||||
use crate::Scheduled::*;
|
||||
use chrono::DateTime;
|
||||
use chrono::Duration;
|
||||
use chrono::Utc;
|
||||
use cron::Schedule;
|
||||
use diesel::pg::PgConnection;
|
||||
|
@ -43,6 +44,8 @@ pub struct Task {
|
|||
#[builder(setter(into))]
|
||||
pub uniq_hash: Option<String>,
|
||||
#[builder(setter(into))]
|
||||
pub retries: i32,
|
||||
#[builder(setter(into))]
|
||||
pub scheduled_at: DateTime<Utc>,
|
||||
#[builder(setter(into))]
|
||||
pub created_at: DateTime<Utc>,
|
||||
|
@ -102,9 +105,16 @@ pub trait Queueable {
|
|||
|
||||
fn update_task_state(&self, task: &Task, state: FangTaskState) -> Result<Task, QueueError>;
|
||||
|
||||
fn fail_task(&self, task: &Task, error: String) -> Result<Task, QueueError>;
|
||||
fn fail_task(&self, task: &Task, error: &str) -> Result<Task, QueueError>;
|
||||
|
||||
fn schedule_task(&self, task: &dyn Runnable) -> Result<Task, QueueError>;
|
||||
|
||||
fn schedule_retry(
|
||||
&self,
|
||||
task: &Task,
|
||||
backoff_in_seconds: u32,
|
||||
error: &str,
|
||||
) -> Result<Task, QueueError>;
|
||||
}
|
||||
|
||||
#[derive(Clone, TypedBuilder)]
|
||||
|
@ -173,7 +183,7 @@ impl Queueable for Queue {
|
|||
Self::update_task_state_query(&mut connection, task, state)
|
||||
}
|
||||
|
||||
fn fail_task(&self, task: &Task, error: String) -> Result<Task, QueueError> {
|
||||
fn fail_task(&self, task: &Task, error: &str) -> Result<Task, QueueError> {
|
||||
let mut connection = self.get_connection()?;
|
||||
|
||||
Self::fail_task_query(&mut connection, task, error)
|
||||
|
@ -184,6 +194,17 @@ impl Queueable for Queue {
|
|||
|
||||
Self::find_task_by_id_query(&mut connection, id)
|
||||
}
|
||||
|
||||
fn schedule_retry(
|
||||
&self,
|
||||
task: &Task,
|
||||
backoff_seconds: u32,
|
||||
error: &str,
|
||||
) -> Result<Task, QueueError> {
|
||||
let mut connection = self.get_connection()?;
|
||||
|
||||
Self::schedule_retry_query(&mut connection, task, backoff_seconds, error)
|
||||
}
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
|
@ -357,7 +378,7 @@ impl Queue {
|
|||
pub fn fail_task_query(
|
||||
connection: &mut PgConnection,
|
||||
task: &Task,
|
||||
error: String,
|
||||
error: &str,
|
||||
) -> Result<Task, QueueError> {
|
||||
Ok(diesel::update(task)
|
||||
.set((
|
||||
|
@ -392,7 +413,7 @@ impl Queue {
|
|||
.order(fang_tasks::scheduled_at.asc())
|
||||
.limit(1)
|
||||
.filter(fang_tasks::scheduled_at.le(Utc::now()))
|
||||
.filter(fang_tasks::state.eq(FangTaskState::New))
|
||||
.filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried]))
|
||||
.filter(fang_tasks::task_type.eq(task_type))
|
||||
.for_update()
|
||||
.skip_locked()
|
||||
|
@ -406,10 +427,32 @@ impl Queue {
|
|||
) -> Option<Task> {
|
||||
fang_tasks::table
|
||||
.filter(fang_tasks::uniq_hash.eq(uniq_hash))
|
||||
.filter(fang_tasks::state.eq(FangTaskState::New))
|
||||
.filter(fang_tasks::state.eq_any(vec![FangTaskState::New, FangTaskState::Retried]))
|
||||
.first::<Task>(connection)
|
||||
.ok()
|
||||
}
|
||||
|
||||
pub fn schedule_retry_query(
|
||||
connection: &mut PgConnection,
|
||||
task: &Task,
|
||||
backoff_seconds: u32,
|
||||
error: &str,
|
||||
) -> Result<Task, QueueError> {
|
||||
let now = Self::current_time();
|
||||
let scheduled_at = now + Duration::seconds(backoff_seconds as i64);
|
||||
|
||||
let task = diesel::update(task)
|
||||
.set((
|
||||
fang_tasks::state.eq(FangTaskState::Retried),
|
||||
fang_tasks::error_message.eq(error),
|
||||
fang_tasks::retries.eq(task.retries + 1),
|
||||
fang_tasks::scheduled_at.eq(scheduled_at),
|
||||
fang_tasks::updated_at.eq(now),
|
||||
))
|
||||
.get_result::<Task>(connection)?;
|
||||
|
||||
Ok(task)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -417,9 +460,9 @@ mod queue_tests {
|
|||
use super::Queue;
|
||||
use super::Queueable;
|
||||
use crate::chrono::SubsecRound;
|
||||
use crate::fang_task_state::FangTaskState;
|
||||
use crate::runnable::Runnable;
|
||||
use crate::runnable::COMMON_TYPE;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::typetag;
|
||||
use crate::FangError;
|
||||
use crate::Scheduled;
|
||||
|
@ -586,7 +629,7 @@ mod queue_tests {
|
|||
|
||||
let error = "Failed".to_string();
|
||||
|
||||
let found_task = Queue::fail_task_query(conn, &task, error.clone()).unwrap();
|
||||
let found_task = Queue::fail_task_query(conn, &task, &error).unwrap();
|
||||
|
||||
let metadata = found_task.metadata.as_object().unwrap();
|
||||
let number = metadata["number"].as_u64();
|
||||
|
|
|
@ -3,6 +3,7 @@ use crate::FangError;
|
|||
use crate::Scheduled;
|
||||
|
||||
pub const COMMON_TYPE: &str = "common";
|
||||
pub const RETRIES_NUMBER: i32 = 20;
|
||||
|
||||
#[typetag::serde(tag = "type")]
|
||||
pub trait Runnable {
|
||||
|
@ -19,4 +20,12 @@ pub trait Runnable {
|
|||
fn cron(&self) -> Option<Scheduled> {
|
||||
None
|
||||
}
|
||||
|
||||
fn max_retries(&self) -> i32 {
|
||||
RETRIES_NUMBER
|
||||
}
|
||||
|
||||
fn backoff(&self, attempt: u32) -> u32 {
|
||||
u32::pow(2, attempt)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,27 +1,14 @@
|
|||
// @generated automatically by Diesel CLI.
|
||||
|
||||
pub mod sql_types {
|
||||
#[derive(diesel::sql_types::SqlType)]
|
||||
#[diesel(postgres_type(name = "fang_task_state"))]
|
||||
pub struct FangTaskState;
|
||||
}
|
||||
|
||||
#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)]
|
||||
#[DieselTypePath = "crate::blocking::schema::sql_types::FangTaskState"]
|
||||
pub enum FangTaskState {
|
||||
New,
|
||||
InProgress,
|
||||
Failed,
|
||||
Finished,
|
||||
}
|
||||
|
||||
table! {
|
||||
diesel::table! {
|
||||
use diesel::sql_types::*;
|
||||
use super::sql_types::FangTaskState;
|
||||
use diesel::sql_types::Jsonb;
|
||||
use diesel::sql_types::Nullable;
|
||||
use diesel::sql_types::Text;
|
||||
use diesel::sql_types::Timestamptz;
|
||||
use diesel::sql_types::Uuid;
|
||||
use diesel::sql_types::Varchar;
|
||||
use diesel::pg::sql_types::Bpchar;
|
||||
|
||||
fang_tasks (id) {
|
||||
id -> Uuid,
|
||||
|
@ -30,6 +17,7 @@ table! {
|
|||
state -> FangTaskState,
|
||||
task_type -> Varchar,
|
||||
uniq_hash -> Nullable<Bpchar>,
|
||||
retries -> Int4,
|
||||
scheduled_at -> Timestamptz,
|
||||
created_at -> Timestamptz,
|
||||
updated_at -> Timestamptz,
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
#![allow(clippy::borrowed_box)]
|
||||
#![allow(clippy::unnecessary_unwrap)]
|
||||
|
||||
use crate::fang_task_state::FangTaskState;
|
||||
use crate::queue::Queueable;
|
||||
use crate::queue::Task;
|
||||
use crate::runnable::Runnable;
|
||||
use crate::runnable::COMMON_TYPE;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::FangError;
|
||||
use crate::Scheduled::*;
|
||||
use crate::{RetentionMode, SleepParams};
|
||||
|
@ -30,8 +33,23 @@ where
|
|||
BQueue: Queueable + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub fn run(&self, task: Task) {
|
||||
let result = self.execute_task(task);
|
||||
self.finalize_task(result)
|
||||
let runnable: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
let result = runnable.run(&self.queue);
|
||||
|
||||
match result {
|
||||
Ok(_) => self.finalize_task(task, &result),
|
||||
Err(ref error) => {
|
||||
if task.retries < runnable.max_retries() {
|
||||
let backoff_seconds = runnable.backoff(task.retries as u32);
|
||||
|
||||
self.queue
|
||||
.schedule_retry(&task, backoff_seconds, &error.description)
|
||||
.expect("Failed to retry");
|
||||
} else {
|
||||
self.finalize_task(task, &result);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn run_tasks(&mut self) -> Result<(), FangError> {
|
||||
|
@ -102,39 +120,31 @@ where
|
|||
thread::sleep(self.sleep_params.sleep_period);
|
||||
}
|
||||
|
||||
fn execute_task(&self, task: Task) -> Result<Task, (Task, String)> {
|
||||
let actual_task: Box<dyn Runnable> = serde_json::from_value(task.metadata.clone()).unwrap();
|
||||
let task_result = actual_task.run(&self.queue);
|
||||
|
||||
match task_result {
|
||||
Ok(()) => Ok(task),
|
||||
Err(error) => Err((task, error.description)),
|
||||
}
|
||||
}
|
||||
|
||||
fn finalize_task(&self, result: Result<Task, (Task, String)>) {
|
||||
fn finalize_task(&self, task: Task, result: &Result<(), FangError>) {
|
||||
match self.retention_mode {
|
||||
RetentionMode::KeepAll => {
|
||||
match result {
|
||||
Ok(task) => self
|
||||
Ok(_) => self
|
||||
.queue
|
||||
.update_task_state(&task, FangTaskState::Finished)
|
||||
.unwrap(),
|
||||
Err((task, error)) => self.queue.fail_task(&task, error).unwrap(),
|
||||
Err(error) => self.queue.fail_task(&task, &error.description).unwrap(),
|
||||
};
|
||||
}
|
||||
|
||||
RetentionMode::RemoveAll => {
|
||||
match result {
|
||||
Ok(task) => self.queue.remove_task(task.id).unwrap(),
|
||||
Err((task, _error)) => self.queue.remove_task(task.id).unwrap(),
|
||||
Ok(_) => self.queue.remove_task(task.id).unwrap(),
|
||||
Err(_error) => self.queue.remove_task(task.id).unwrap(),
|
||||
};
|
||||
}
|
||||
|
||||
RetentionMode::RemoveFinished => match result {
|
||||
Ok(task) => {
|
||||
Ok(_) => {
|
||||
self.queue.remove_task(task.id).unwrap();
|
||||
}
|
||||
Err((task, error)) => {
|
||||
self.queue.fail_task(&task, error).unwrap();
|
||||
Err(error) => {
|
||||
self.queue.fail_task(&task, &error.description).unwrap();
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -146,9 +156,9 @@ mod worker_tests {
|
|||
use super::RetentionMode;
|
||||
use super::Runnable;
|
||||
use super::Worker;
|
||||
use crate::fang_task_state::FangTaskState;
|
||||
use crate::queue::Queue;
|
||||
use crate::queue::Queueable;
|
||||
use crate::schema::FangTaskState;
|
||||
use crate::typetag;
|
||||
use crate::FangError;
|
||||
use chrono::Utc;
|
||||
|
@ -187,11 +197,39 @@ mod worker_tests {
|
|||
})
|
||||
}
|
||||
|
||||
fn max_retries(&self) -> i32 {
|
||||
0
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"F_task".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct RetryTask {
|
||||
pub number: u16,
|
||||
}
|
||||
|
||||
#[typetag::serde]
|
||||
impl Runnable for RetryTask {
|
||||
fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
|
||||
let message = format!("Saving Pepe. Attempt {}", self.number);
|
||||
|
||||
Err(FangError {
|
||||
description: message,
|
||||
})
|
||||
}
|
||||
|
||||
fn max_retries(&self) -> i32 {
|
||||
2
|
||||
}
|
||||
|
||||
fn task_type(&self) -> String {
|
||||
"Retry_task".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct TaskType1 {}
|
||||
|
||||
|
@ -322,4 +360,53 @@ mod worker_tests {
|
|||
|
||||
Queue::remove_tasks_of_type_query(&mut pooled_connection, "F_task").unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn retries_task() {
|
||||
let task = RetryTask { number: 10 };
|
||||
|
||||
let pool = Queue::connection_pool(5);
|
||||
|
||||
let queue = Queue::builder().connection_pool(pool).build();
|
||||
|
||||
let mut worker = Worker::<Queue>::builder()
|
||||
.queue(queue)
|
||||
.retention_mode(RetentionMode::KeepAll)
|
||||
.task_type(task.task_type())
|
||||
.build();
|
||||
|
||||
let mut pooled_connection = worker.queue.connection_pool.get().unwrap();
|
||||
|
||||
let task = Queue::insert_query(&mut pooled_connection, &task, Utc::now()).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::New, task.state);
|
||||
|
||||
worker.run(task.clone());
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||
|
||||
let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::Retried, found_task.state);
|
||||
assert_eq!(1, found_task.retries);
|
||||
|
||||
worker.run_tasks_until_none().unwrap();
|
||||
|
||||
std::thread::sleep(std::time::Duration::from_millis(14000));
|
||||
|
||||
worker.run_tasks_until_none().unwrap();
|
||||
|
||||
let found_task = Queue::find_task_by_id_query(&mut pooled_connection, task.id).unwrap();
|
||||
|
||||
assert_eq!(FangTaskState::Failed, found_task.state);
|
||||
assert_eq!(2, found_task.retries);
|
||||
|
||||
assert_eq!(
|
||||
"Saving Pepe. Attempt 10".to_string(),
|
||||
found_task.error_message.unwrap()
|
||||
);
|
||||
|
||||
Queue::remove_tasks_of_type_query(&mut pooled_connection, "Retry_task").unwrap();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,6 @@ pub struct FangError {
|
|||
pub description: String,
|
||||
}
|
||||
|
||||
#[macro_use]
|
||||
#[cfg(feature = "blocking")]
|
||||
extern crate diesel;
|
||||
|
||||
|
|
Loading…
Reference in a new issue