From 8d0a23e2f9ebc9d81107c51442f79f46ed690cea Mon Sep 17 00:00:00 2001 From: Pmarquez <48651252+pxp9@users.noreply.github.com> Date: Sat, 23 Jul 2022 14:24:22 +0000 Subject: [PATCH] Async Worker (#44) * stating with async worker * async sleep solve and update state test * uncomenting line * retention mode and sleep params in lib.rs * fixing code style * trying to solve * dont like changes * Add AsyncQueueable trait (#45) * add trait * update runnable trait * fix test Co-authored-by: Ayrat Badykov --- Cargo.toml | 1 + src/asynk/async_queue.rs | 356 +++++++++++++++--------- src/asynk/async_runnable.rs | 6 +- src/asynk/async_worker.rs | 109 ++++++++ src/asynk/mod.rs | 2 +- src/asynk/queries/update_task_state.sql | 2 +- src/blocking/executor.rs | 44 +-- src/blocking/worker_pool.rs | 5 +- src/lib.rs | 36 +++ 9 files changed, 383 insertions(+), 178 deletions(-) create mode 100644 src/asynk/async_worker.rs diff --git a/Cargo.toml b/Cargo.toml index 0f2b123..fb739d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ rust-version = "1.62" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +default = ["blocking", "asynk"] blocking = ["diesel", "diesel-derive-enum", "dotenv"] asynk = ["bb8-postgres", "postgres-types", "tokio", "async-trait", "typed-builder"] diff --git a/src/asynk/async_queue.rs b/src/asynk/async_queue.rs index e404272..628ffd8 100644 --- a/src/asynk/async_queue.rs +++ b/src/asynk/async_queue.rs @@ -1,4 +1,4 @@ -use crate::asynk::AsyncRunnable; +use async_trait::async_trait; use bb8_postgres::bb8::Pool; use bb8_postgres::bb8::RunError; use bb8_postgres::tokio_postgres::row::Row; @@ -90,6 +90,7 @@ pub struct NewPeriodicTask { #[builder(setter(into))] pub period_in_seconds: i32, } + #[derive(Debug, Error)] pub enum AsyncQueueError { #[error(transparent)] @@ -100,6 +101,36 @@ pub enum AsyncQueueError { ResultError { expected: u64, found: u64 }, } +#[async_trait] +pub trait AsyncQueueable { + async fn fetch_and_touch_task( + &mut self, + task_type: &Option, + ) -> Result, AsyncQueueError>; + + async fn insert_task( + &mut self, + task: serde_json::Value, + task_type: &str, + ) -> Result; + + async fn remove_all_tasks(&mut self) -> Result; + + async fn remove_task(&mut self, task: Task) -> Result; + + async fn remove_tasks_type(&mut self, task_type: &str) -> Result; + + async fn update_task_state( + &mut self, + task: Task, + state: FangTaskState, + ) -> Result; + + async fn fail_task(&mut self, task: Task, error_message: &str) + -> Result; +} + +#[derive(Debug, Clone)] pub struct AsyncQueue where Tls: MakeTlsConnect + Clone + Send + Sync + 'static, @@ -117,80 +148,11 @@ where >::TlsConnect: Send, <>::TlsConnect as TlsConnect>::Future: Send, { - pub fn new(pool: Pool>) -> Self { - AsyncQueue { pool } - } + pub async fn connect(uri: impl ToString, tls: Tls) -> Result { + let manager = PostgresConnectionManager::new_from_stringlike(uri, tls)?; + let pool = Pool::builder().build(manager).await?; - pub async fn fetch_and_touch_task( - &mut self, - task_type: &Option, - ) -> Result { - let mut connection = self.pool.get().await?; - let mut transaction = connection.transaction().await?; - - let task = Self::fetch_and_touch_task_query(&mut transaction, task_type).await?; - - transaction.commit().await?; - - Ok(task) - } - - pub async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result { - let mut connection = self.pool.get().await?; - let mut transaction = connection.transaction().await?; - - let task = Self::insert_task_query(&mut transaction, task).await?; - - transaction.commit().await?; - - Ok(task) - } - - pub async fn remove_all_tasks(&mut self) -> Result { - let mut connection = self.pool.get().await?; - let mut transaction = connection.transaction().await?; - - let result = Self::remove_all_tasks_query(&mut transaction).await?; - - transaction.commit().await?; - - Ok(result) - } - - pub async fn remove_task(&mut self, task: &Task) -> Result { - let mut connection = self.pool.get().await?; - let mut transaction = connection.transaction().await?; - - let result = Self::remove_task_query(&mut transaction, task).await?; - - transaction.commit().await?; - - Ok(result) - } - - pub async fn remove_tasks_type(&mut self, task_type: &str) -> Result { - let mut connection = self.pool.get().await?; - let mut transaction = connection.transaction().await?; - - let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?; - - transaction.commit().await?; - - Ok(result) - } - - pub async fn fail_task( - &mut self, - task: Task, - error_message: &str, - ) -> Result { - let mut connection = self.pool.get().await?; - let mut transaction = connection.transaction().await?; - - let task = Self::fail_task_query(&mut transaction, task, error_message).await?; - transaction.commit().await?; - - Ok(task) + Ok(Self { pool }) } pub async fn remove_all_tasks_query( @@ -201,7 +163,7 @@ where pub async fn remove_task_query( transaction: &mut Transaction<'_>, - task: &Task, + task: Task, ) -> Result { Self::execute_query(transaction, REMOVE_TASK_QUERY, &[&task.id], Some(1)).await } @@ -238,19 +200,25 @@ where pub async fn fetch_and_touch_task_query( transaction: &mut Transaction<'_>, task_type: &Option, - ) -> Result { + ) -> Result, AsyncQueueError> { let task_type = match task_type { Some(passed_task_type) => passed_task_type, None => DEFAULT_TASK_TYPE, }; - let mut task = Self::get_task_type_query(transaction, task_type).await?; - - Self::update_task_state_query(transaction, &task, FangTaskState::InProgress).await?; - - task.state = FangTaskState::InProgress; - - Ok(task) + let task = match Self::get_task_type_query(transaction, task_type).await { + Ok(some_task) => Some(some_task), + Err(_) => None, + }; + let result_task = if let Some(some_task) = task { + Some( + Self::update_task_state_query(transaction, some_task, FangTaskState::InProgress) + .await?, + ) + } else { + None + }; + Ok(result_task) } pub async fn get_task_type_query( @@ -268,27 +236,23 @@ where pub async fn update_task_state_query( transaction: &mut Transaction<'_>, - task: &Task, + task: Task, state: FangTaskState, - ) -> Result { + ) -> Result { let updated_at = Utc::now(); - Self::execute_query( - transaction, - UPDATE_TASK_STATE_QUERY, - &[&state, &updated_at, &task.id], - Some(1), - ) - .await + let row: Row = transaction + .query_one(UPDATE_TASK_STATE_QUERY, &[&state, &updated_at, &task.id]) + .await?; + let task = Self::row_to_task(row); + Ok(task) } pub async fn insert_task_query( transaction: &mut Transaction<'_>, - task: &dyn AsyncRunnable, + metadata: serde_json::Value, + task_type: &str, ) -> Result { - let metadata = serde_json::to_value(task).unwrap(); - let task_type = task.task_type(); - let row: Row = transaction .query_one(INSERT_TASK_QUERY, &[&metadata, &task_type]) .await?; @@ -339,16 +303,117 @@ where } } +#[async_trait] +impl AsyncQueueable for AsyncQueue +where + Tls: MakeTlsConnect + Clone + Send + Sync + 'static, + >::Stream: Send + Sync, + >::TlsConnect: Send, + <>::TlsConnect as TlsConnect>::Future: Send, +{ + async fn fetch_and_touch_task( + &mut self, + task_type: &Option, + ) -> Result, AsyncQueueError> { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let task = Self::fetch_and_touch_task_query(&mut transaction, task_type).await?; + + transaction.commit().await?; + + Ok(task) + } + + async fn insert_task( + &mut self, + metadata: serde_json::Value, + task_type: &str, + ) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let task = Self::insert_task_query(&mut transaction, metadata, task_type).await?; + + transaction.commit().await?; + + Ok(task) + } + + async fn remove_all_tasks(&mut self) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::remove_all_tasks_query(&mut transaction).await?; + + transaction.commit().await?; + + Ok(result) + } + + async fn remove_task(&mut self, task: Task) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::remove_task_query(&mut transaction, task).await?; + + transaction.commit().await?; + + Ok(result) + } + + async fn remove_tasks_type(&mut self, task_type: &str) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let result = Self::remove_tasks_type_query(&mut transaction, task_type).await?; + + transaction.commit().await?; + + Ok(result) + } + + async fn update_task_state( + &mut self, + task: Task, + state: FangTaskState, + ) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let task = Self::update_task_state_query(&mut transaction, task, state).await?; + transaction.commit().await?; + + Ok(task) + } + + async fn fail_task( + &mut self, + task: Task, + error_message: &str, + ) -> Result { + let mut connection = self.pool.get().await?; + let mut transaction = connection.transaction().await?; + + let task = Self::fail_task_query(&mut transaction, task, error_message).await?; + transaction.commit().await?; + + Ok(task) + } +} + #[cfg(test)] mod async_queue_tests { use super::AsyncQueue; + use super::AsyncQueueable; use super::FangTaskState; + use super::Task; use crate::asynk::AsyncRunnable; use crate::asynk::Error; use async_trait::async_trait; use bb8_postgres::bb8::Pool; - use bb8_postgres::tokio_postgres::Client; use bb8_postgres::tokio_postgres::NoTls; + use bb8_postgres::tokio_postgres::Transaction; use bb8_postgres::PostgresConnectionManager; use serde::{Deserialize, Serialize}; @@ -358,9 +423,9 @@ mod async_queue_tests { } #[typetag::serde] - #[async_trait] + #[async_trait(?Send)] impl AsyncRunnable for AsyncTask { - async fn run(&self, _connection: &Client) -> Result<(), Error> { + async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> { Ok(()) } } @@ -371,10 +436,14 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); + let task = AsyncTask { number: 1 }; + let metadata = serde_json::to_value(&task as &dyn AsyncRunnable).unwrap(); + let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) + AsyncQueue::::insert_task_query(&mut transaction, metadata, &task.task_type()) .await .unwrap(); + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -383,29 +452,62 @@ mod async_queue_tests { assert_eq!(Some("AsyncTask"), type_task); transaction.rollback().await.unwrap(); } + + #[tokio::test] + async fn update_task_state_test() { + let pool = pool().await; + let mut connection = pool.get().await.unwrap(); + let mut transaction = connection.transaction().await.unwrap(); + + let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await; + + let metadata = task.metadata.as_object().unwrap(); + let number = metadata["number"].as_u64(); + let type_task = metadata["type"].as_str(); + let id = task.id; + + assert_eq!(Some(1), number); + assert_eq!(Some("AsyncTask"), type_task); + + let finished_task = AsyncQueue::::update_task_state_query( + &mut transaction, + task, + FangTaskState::Finished, + ) + .await + .unwrap(); + + assert_eq!(id, finished_task.id); + assert_eq!(FangTaskState::Finished, finished_task.state); + + transaction.rollback().await.unwrap(); + } + #[tokio::test] async fn failed_task_query_test() { let pool = pool().await; let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) - .await - .unwrap(); + let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await; + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); let id = task.id; + assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); + let failed_task = AsyncQueue::::fail_task_query(&mut transaction, task, "Some error") .await .unwrap(); + assert_eq!(id, failed_task.id); assert_eq!(Some("Some error"), failed_task.error_message.as_deref()); assert_eq!(FangTaskState::Failed, failed_task.state); + transaction.rollback().await.unwrap(); } @@ -415,10 +517,7 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) - .await - .unwrap(); + let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await; let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); @@ -427,10 +526,8 @@ mod async_queue_tests { assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) - .await - .unwrap(); + let task = insert_task(&mut transaction, &AsyncTask { number: 2 }).await; + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -452,10 +549,8 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) - .await - .unwrap(); + let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await; + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -463,10 +558,8 @@ mod async_queue_tests { assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) - .await - .unwrap(); + let task = insert_task(&mut transaction, &AsyncTask { number: 2 }).await; + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -476,7 +569,9 @@ mod async_queue_tests { let task = AsyncQueue::::fetch_and_touch_task_query(&mut transaction, &None) .await + .unwrap() .unwrap(); + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -486,6 +581,7 @@ mod async_queue_tests { let task = AsyncQueue::::fetch_and_touch_task_query(&mut transaction, &None) .await + .unwrap() .unwrap(); let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); @@ -503,10 +599,8 @@ mod async_queue_tests { let mut connection = pool.get().await.unwrap(); let mut transaction = connection.transaction().await.unwrap(); - let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 1 }) - .await - .unwrap(); + let task = insert_task(&mut transaction, &AsyncTask { number: 1 }).await; + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -514,10 +608,8 @@ mod async_queue_tests { assert_eq!(Some(1), number); assert_eq!(Some("AsyncTask"), type_task); - let task = - AsyncQueue::::insert_task_query(&mut transaction, &AsyncTask { number: 2 }) - .await - .unwrap(); + let task = insert_task(&mut transaction, &AsyncTask { number: 2 }).await; + let metadata = task.metadata.as_object().unwrap(); let number = metadata["number"].as_u64(); let type_task = metadata["type"].as_str(); @@ -547,4 +639,12 @@ mod async_queue_tests { Pool::builder().build(pg_mgr).await.unwrap() } + + async fn insert_task(transaction: &mut Transaction<'_>, task: &dyn AsyncRunnable) -> Task { + let metadata = serde_json::to_value(task).unwrap(); + + AsyncQueue::::insert_task_query(transaction, metadata, &task.task_type()) + .await + .unwrap() + } } diff --git a/src/asynk/async_runnable.rs b/src/asynk/async_runnable.rs index a933a8d..048b3bd 100644 --- a/src/asynk/async_runnable.rs +++ b/src/asynk/async_runnable.rs @@ -1,5 +1,5 @@ +use crate::asynk::async_queue::AsyncQueueable; use async_trait::async_trait; -use bb8_postgres::tokio_postgres::Client; const COMMON_TYPE: &str = "common"; @@ -9,9 +9,9 @@ pub struct Error { } #[typetag::serde(tag = "type")] -#[async_trait] +#[async_trait(?Send)] pub trait AsyncRunnable { - async fn run(&self, client: &Client) -> Result<(), Error>; + async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), Error>; fn task_type(&self) -> String { COMMON_TYPE.to_string() diff --git a/src/asynk/async_worker.rs b/src/asynk/async_worker.rs new file mode 100644 index 0000000..fa4c1cc --- /dev/null +++ b/src/asynk/async_worker.rs @@ -0,0 +1,109 @@ +use crate::asynk::async_queue::AsyncQueue; +use crate::asynk::async_queue::AsyncQueueable; +use crate::asynk::async_queue::FangTaskState; +use crate::asynk::async_queue::Task; +use crate::asynk::async_runnable::AsyncRunnable; +use crate::asynk::Error; +use crate::{RetentionMode, SleepParams}; +use bb8_postgres::tokio_postgres::tls::MakeTlsConnect; +use bb8_postgres::tokio_postgres::tls::TlsConnect; +use bb8_postgres::tokio_postgres::Socket; +use log::error; +use std::time::Duration; +use typed_builder::TypedBuilder; + +#[derive(TypedBuilder, Debug)] +pub struct AsyncWorker +where + Tls: MakeTlsConnect + Clone + Send + Sync + 'static, + >::Stream: Send + Sync, + >::TlsConnect: Send, + <>::TlsConnect as TlsConnect>::Future: Send, +{ + #[builder(setter(into))] + pub queue: AsyncQueue, + #[builder(setter(into))] + pub task_type: Option, + #[builder(setter(into))] + pub sleep_params: SleepParams, + #[builder(setter(into))] + pub retention_mode: RetentionMode, +} +impl AsyncWorker +where + Tls: MakeTlsConnect + Clone + Send + Sync + 'static, + >::Stream: Send + Sync, + >::TlsConnect: Send, + <>::TlsConnect as TlsConnect>::Future: Send, +{ + pub async fn run(&mut self, task: Task) { + let result = self.execute_task(task).await; + self.finalize_task(result).await + } + async fn execute_task(&mut self, task: Task) -> Result { + let actual_task: Box = + serde_json::from_value(task.metadata.clone()).unwrap(); + + let task_result = actual_task.run(&mut self.queue).await; + match task_result { + Ok(()) => Ok(task), + Err(error) => Err((task, error.description)), + } + } + async fn finalize_task(&mut self, result: Result) { + match self.retention_mode { + RetentionMode::KeepAll => { + match result { + Ok(task) => self + .queue + .update_task_state(task, FangTaskState::Finished) + .await + .unwrap(), + Err((task, error)) => self.queue.fail_task(task, &error).await.unwrap(), + }; + } + RetentionMode::RemoveAll => { + match result { + Ok(task) => self.queue.remove_task(task).await.unwrap(), + Err((task, _error)) => self.queue.remove_task(task).await.unwrap(), + }; + } + RetentionMode::RemoveFinished => match result { + Ok(task) => { + self.queue.remove_task(task).await.unwrap(); + } + Err((task, error)) => { + self.queue.fail_task(task, &error).await.unwrap(); + } + }, + } + } + pub async fn sleep(&mut self) { + self.sleep_params.maybe_increase_sleep_period(); + + tokio::time::sleep(Duration::from_secs(self.sleep_params.sleep_period)).await; + } + pub async fn run_tasks(&mut self) -> Result<(), Error> { + loop { + match self + .queue + .fetch_and_touch_task(&self.task_type.clone()) + .await + { + Ok(Some(task)) => { + self.sleep_params.maybe_reset_sleep_period(); + self.run(task).await; + } + Ok(None) => { + self.sleep().await; + } + + Err(error) => { + error!("Failed to fetch a task {:?}", error); + + self.sleep().await; + } + }; + } + } +} diff --git a/src/asynk/mod.rs b/src/asynk/mod.rs index 171d637..bf6e0cf 100644 --- a/src/asynk/mod.rs +++ b/src/asynk/mod.rs @@ -1,5 +1,5 @@ pub mod async_queue; pub mod async_runnable; - +pub mod async_worker; pub use async_runnable::AsyncRunnable; pub use async_runnable::Error; diff --git a/src/asynk/queries/update_task_state.sql b/src/asynk/queries/update_task_state.sql index 21b9e82..afca5c0 100644 --- a/src/asynk/queries/update_task_state.sql +++ b/src/asynk/queries/update_task_state.sql @@ -1 +1 @@ -UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 +UPDATE "fang_tasks" SET "state" = $1 , "updated_at" = $2 WHERE id = $3 RETURNING id , state , metadata , error_message , task_type , created_at , updated_at diff --git a/src/blocking/executor.rs b/src/blocking/executor.rs index 69f9bcf..353bedc 100644 --- a/src/blocking/executor.rs +++ b/src/blocking/executor.rs @@ -1,7 +1,8 @@ use crate::error::FangError; use crate::queue::Queue; +use crate::queue::Task; use crate::worker_pool::{SharedState, WorkerState}; -use crate::Task; +use crate::{RetentionMode, SleepParams}; use diesel::pg::PgConnection; use diesel::r2d2::{ConnectionManager, PooledConnection}; use log::error; @@ -15,47 +16,6 @@ pub struct Executor { pub retention_mode: RetentionMode, shared_state: Option, } - -#[derive(Clone)] -pub enum RetentionMode { - KeepAll, - RemoveAll, - RemoveFinished, -} - -#[derive(Clone)] -pub struct SleepParams { - pub sleep_period: u64, - pub max_sleep_period: u64, - pub min_sleep_period: u64, - pub sleep_step: u64, -} - -impl SleepParams { - pub fn maybe_reset_sleep_period(&mut self) { - if self.sleep_period != self.min_sleep_period { - self.sleep_period = self.min_sleep_period; - } - } - - pub fn maybe_increase_sleep_period(&mut self) { - if self.sleep_period < self.max_sleep_period { - self.sleep_period += self.sleep_step; - } - } -} - -impl Default for SleepParams { - fn default() -> Self { - SleepParams { - sleep_period: 5, - max_sleep_period: 15, - min_sleep_period: 5, - sleep_step: 5, - } - } -} - #[derive(Debug)] pub struct Error { pub description: String, diff --git a/src/blocking/worker_pool.rs b/src/blocking/worker_pool.rs index 97f0ec5..651c787 100644 --- a/src/blocking/worker_pool.rs +++ b/src/blocking/worker_pool.rs @@ -2,9 +2,8 @@ use crate::diesel::r2d2; use crate::diesel::PgConnection; use crate::error::FangError; use crate::executor::Executor; -use crate::executor::RetentionMode; -use crate::executor::SleepParams; use crate::queue::Queue; +use crate::{RetentionMode, SleepParams}; use log::error; use log::info; use std::collections::HashMap; @@ -223,11 +222,11 @@ mod task_pool_tests { use super::WorkerParams; use super::WorkerPool; use crate::executor::Error; - use crate::executor::RetentionMode; use crate::executor::Runnable; use crate::queue::Queue; use crate::schema::{fang_tasks, FangTaskState}; use crate::typetag; + use crate::RetentionMode; use crate::Task; use diesel::pg::PgConnection; use diesel::prelude::*; diff --git a/src/lib.rs b/src/lib.rs index cf12c64..d4fc169 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,40 @@ #![allow(clippy::extra_unused_lifetimes)] +#[derive(Clone, Debug)] +pub enum RetentionMode { + KeepAll, + RemoveAll, + RemoveFinished, +} +#[derive(Clone, Debug)] +pub struct SleepParams { + pub sleep_period: u64, + pub max_sleep_period: u64, + pub min_sleep_period: u64, + pub sleep_step: u64, +} +impl SleepParams { + pub fn maybe_reset_sleep_period(&mut self) { + if self.sleep_period != self.min_sleep_period { + self.sleep_period = self.min_sleep_period; + } + } + + pub fn maybe_increase_sleep_period(&mut self) { + if self.sleep_period < self.max_sleep_period { + self.sleep_period += self.sleep_step; + } + } +} +impl Default for SleepParams { + fn default() -> Self { + SleepParams { + sleep_period: 5, + max_sleep_period: 15, + min_sleep_period: 5, + sleep_step: 5, + } + } +} #[macro_use] #[cfg(feature = "blocking")]