Compare commits

...

12 commits

10 changed files with 209 additions and 144 deletions

1
.gitignore vendored
View file

@ -2,3 +2,4 @@
Cargo.lock
docs/content/docs/CHANGELOG.md
docs/content/docs/README.md
.DS_Store

View file

@ -1,6 +1,6 @@
[package]
name = "backie"
version = "0.2.0"
version = "0.6.0"
authors = [
"Rafael Caricio <rafael@caricio.com>",
]
@ -17,7 +17,6 @@ chrono = "0.4"
log = "0.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
anyhow = "1"
thiserror = "1"
uuid = { version = "1.1", features = ["v4", "serde"] }
async-trait = "0.1"

View file

@ -1,4 +1,4 @@
# Backie 🚲
<p align="center"><img src="logo.png" alt="Backie" width="400"></p>
Async persistent background task processing for Rust applications with Tokio. Queue asynchronous tasks
to be processed by workers. It's designed to be easy to use and horizontally scalable. It uses Postgres as
@ -31,6 +31,25 @@ Here are some of the Backie's key features:
- Task timeout: Tasks are retried if they are not completed in time
- Scheduling of tasks: Tasks can be scheduled to be executed at a specific time
## Task execution protocol
The following diagram shows the protocol used to execute tasks:
```mermaid
stateDiagram-v2
[*] --> Ready
Ready --> Running: Task is picked up by a worker
Running --> Done: Task is finished
Running --> Failed: Task failed
Failed --> Ready: Task is retried
Failed --> [*]: Task is not retried anymore, max retries reached
Done --> [*]
```
When a task goes from `Running` to `Failed` it is retried. The number of retries is controlled by the
[`BackgroundTask::MAX_RETRIES`] attribute. The default implementation uses `3` retries.
## Safety
This crate uses `#![forbid(unsafe_code)]` to ensure everything is implemented in 100% safe Rust.
@ -53,7 +72,6 @@ If you are not already using, you will also want to include the following depend
```toml
[dependencies]
async-trait = "0.1"
anyhow = "1"
serde = { version = "1.0", features = ["derive"] }
diesel = { version = "2.0", features = ["postgres", "serde_json", "chrono", "uuid"] }
diesel-async = { version = "0.2", features = ["postgres", "bb8"] }
@ -75,6 +93,9 @@ the whole application. This attribute is critical for reconstructing the task ba
The [`BackgroundTask::AppData`] can be used to argument the task with your application specific contextual information.
This is useful for example to pass a database connection pool to the task or other application configuration.
The [`BackgroundTask::Error`] is the error type that will be returned by the [`BackgroundTask::run`] method. You can
use this to define your own error type for your tasks.
The [`BackgroundTask::run`] method is where you define the behaviour of your background task execution. This method
will be called by the task queue workers.
@ -92,8 +113,9 @@ pub struct MyTask {
impl BackgroundTask for MyTask {
const TASK_NAME: &'static str = "my_task_unique_name";
type AppData = ();
type Error = ();
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
// Do something
Ok(())
}

View file

@ -4,7 +4,6 @@ use backie::{PgTaskStore, Queue, WorkerPool};
use diesel_async::pg::AsyncPgConnection;
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone, Debug)]
@ -35,8 +34,9 @@ impl MyTask {
impl BackgroundTask for MyTask {
const TASK_NAME: &'static str = "my_task";
type AppData = MyApplicationContext;
type Error = anyhow::Error;
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> {
// let new_task = MyTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
@ -71,8 +71,9 @@ impl MyFailingTask {
impl BackgroundTask for MyFailingTask {
const TASK_NAME: &'static str = "my_failing_task";
type AppData = MyApplicationContext;
type Error = anyhow::Error;
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> {
// let new_task = MyFailingTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
@ -117,8 +118,8 @@ async fn main() {
let my_app_context = MyApplicationContext::new("Backie Example App");
// Register the task types I want to use and start the worker pool
let (join_handle, _queue) =
WorkerPool::new(task_store.clone(), move |_| my_app_context.clone())
let join_handle =
WorkerPool::new(task_store.clone(), move || my_app_context.clone())
.register_task_type::<MyTask>()
.register_task_type::<MyFailingTask>()
.configure_queue("default".into())
@ -134,7 +135,7 @@ async fn main() {
let task2 = MyTask::new(20_000);
let task3 = MyFailingTask::new(50_000);
let queue = Queue::new(Arc::new(task_store)); // or use the `queue` instance returned by the worker pool
let queue = Queue::new(task_store);
queue.enqueue(task1).await.unwrap();
queue.enqueue(task2).await.unwrap();
queue.enqueue(task3).await.unwrap();

BIN
logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 131 KiB

View file

@ -26,7 +26,7 @@ impl Default for RetentionMode {
pub use queue::Queue;
pub use runnable::BackgroundTask;
pub use store::{PgTaskStore, TaskStore};
pub use task::{CurrentTask, Task, TaskId, TaskState};
pub use task::{CurrentTask, NewTask, Task, TaskId, TaskState};
pub use worker::Worker;
pub use worker_pool::{QueueConfig, WorkerPool};

View file

@ -2,22 +2,20 @@ use crate::errors::BackieError;
use crate::runnable::BackgroundTask;
use crate::store::TaskStore;
use crate::task::NewTask;
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone)]
pub struct Queue<S>
where
S: TaskStore + Clone,
S: TaskStore,
{
task_store: Arc<S>,
task_store: S,
}
impl<S> Queue<S>
where
S: TaskStore + Clone,
S: TaskStore,
{
pub fn new(task_store: Arc<S>) -> Self {
pub fn new(task_store: S) -> Self {
Queue { task_store }
}
@ -25,9 +23,21 @@ where
where
BT: BackgroundTask,
{
// TODO: Add option to specify the timeout of a task
self.task_store
.create_task(NewTask::new(background_task, Duration::from_secs(10))?)
.await?;
Ok(())
}
}
impl<S> Clone for Queue<S>
where
S: TaskStore + Clone,
{
fn clone(&self) -> Self {
Self {
task_store: self.task_store.clone(),
}
}
}

View file

@ -1,6 +1,7 @@
use crate::task::{CurrentTask, TaskHash};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, ser::Serialize};
use std::fmt::Debug;
/// The [`BackgroundTask`] trait is used to define the behaviour of a task. You must implement this
/// trait for all tasks you want to execute.
@ -29,8 +30,9 @@ use serde::{de::DeserializeOwned, ser::Serialize};
/// impl BackgroundTask for MyTask {
/// const TASK_NAME: &'static str = "my_task_unique_name";
/// type AppData = ();
/// type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
///
/// async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), anyhow::Error> {
/// async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
/// // Do something
/// Ok(())
/// }
@ -51,14 +53,17 @@ pub trait BackgroundTask: Serialize + DeserializeOwned + Sync + Send + 'static {
/// Number of retries for tasks.
///
/// By default, it is set to 5.
const MAX_RETRIES: i32 = 5;
/// By default, it is set to 3.
const MAX_RETRIES: i32 = 3;
/// The application data provided to this task at runtime.
type AppData: Clone + Send + 'static;
/// An application custom error type.
type Error: Debug + Send + 'static;
/// Execute the task. This method should define its logic
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), anyhow::Error>;
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error>;
/// If set to true, no new tasks with the same metadata will be inserted
/// By default it is set to false.

View file

@ -30,7 +30,7 @@ pub enum TaskExecError {
TaskDeserializationFailed(#[from] serde_json::Error),
#[error("Task execution failed: {0}")]
ExecutionFailed(#[from] anyhow::Error),
ExecutionFailed(String),
#[error("Task panicked with: {0}")]
Panicked(String),
@ -46,8 +46,10 @@ where
{
Box::pin(async move {
let background_task: BT = serde_json::from_value(payload)?;
background_task.run(task_info, app_context).await?;
Ok(())
match background_task.run(task_info, app_context).await {
Ok(_) => Ok(()),
Err(err) => Err(TaskExecError::ExecutionFailed(format!("{:?}", err))),
}
})
}
@ -57,7 +59,7 @@ where
AppData: Clone + Send + 'static,
S: TaskStore + Clone,
{
store: Arc<S>,
store: S,
queue_name: String,
@ -79,7 +81,7 @@ where
S: TaskStore + Clone,
{
pub(crate) fn new(
store: Arc<S>,
store: S,
queue_name: String,
retention_mode: RetentionMode,
pull_interval: Duration,
@ -250,8 +252,9 @@ mod async_worker_tests {
impl BackgroundTask for WorkerAsyncTask {
const TASK_NAME: &'static str = "WorkerAsyncTask";
type AppData = ();
type Error = ();
async fn run(&self, _: CurrentTask, _: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, _: CurrentTask, _: Self::AppData) -> Result<(), ()> {
Ok(())
}
}
@ -265,8 +268,9 @@ mod async_worker_tests {
impl BackgroundTask for WorkerAsyncTaskSchedule {
const TASK_NAME: &'static str = "WorkerAsyncTaskSchedule";
type AppData = ();
type Error = ();
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), ()> {
Ok(())
}
@ -284,11 +288,12 @@ mod async_worker_tests {
impl BackgroundTask for AsyncFailedTask {
const TASK_NAME: &'static str = "AsyncFailedTask";
type AppData = ();
type Error = TaskError;
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), TaskError> {
let message = format!("number {} is wrong :(", self.number);
Err(TaskError::Custom(message).into())
Err(TaskError::Custom(message))
}
fn max_retries(&self) -> i32 {
@ -303,9 +308,10 @@ mod async_worker_tests {
impl BackgroundTask for AsyncRetryTask {
const TASK_NAME: &'static str = "AsyncRetryTask";
type AppData = ();
type Error = TaskError;
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> {
Err(TaskError::SomethingWrong.into())
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), Self::Error> {
Err(TaskError::SomethingWrong)
}
}
@ -316,8 +322,9 @@ mod async_worker_tests {
impl BackgroundTask for AsyncTaskType1 {
const TASK_NAME: &'static str = "AsyncTaskType1";
type AppData = ();
type Error = ();
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), Self::Error> {
Ok(())
}
}
@ -329,8 +336,9 @@ mod async_worker_tests {
impl BackgroundTask for AsyncTaskType2 {
const TASK_NAME: &'static str = "AsyncTaskType2";
type AppData = ();
type Error = ();
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), anyhow::Error> {
async fn run(&self, _task: CurrentTask, _data: Self::AppData) -> Result<(), ()> {
Ok(())
}
}

View file

@ -1,5 +1,4 @@
use crate::errors::BackieError;
use crate::queue::Queue;
use crate::runnable::BackgroundTask;
use crate::store::TaskStore;
use crate::worker::{runnable, ExecuteTaskFn};
@ -19,10 +18,7 @@ where
S: TaskStore + Clone,
{
/// Storage of tasks.
task_store: Arc<S>,
/// Queue used to spawn tasks.
queue: Queue<S>,
task_store: S,
/// Make possible to load the application data.
///
@ -49,17 +45,10 @@ where
/// Create a new worker pool.
pub fn new<A>(task_store: S, application_data_fn: A) -> Self
where
A: Fn(Queue<S>) -> AppData + Send + Sync + 'static,
A: Fn() -> AppData + Send + Sync + 'static,
{
let queue_store = Arc::new(task_store);
let queue = Queue::new(queue_store.clone());
let application_data_fn = {
let queue = queue.clone();
move || application_data_fn(queue.clone())
};
Self {
task_store: queue_store,
queue,
task_store,
application_data_fn: Arc::new(application_data_fn),
task_registry: BTreeMap::new(),
queue_tasks: BTreeMap::new(),
@ -86,10 +75,7 @@ where
self
}
pub async fn start<F>(
self,
graceful_shutdown: F,
) -> Result<(JoinHandle<()>, Queue<S>), BackieError>
pub async fn start<F>(self, graceful_shutdown: F) -> Result<JoinHandle<()>, BackieError>
where
F: Future<Output = ()> + Send + 'static,
{
@ -128,28 +114,25 @@ where
}
}
Ok((
tokio::spawn(async move {
graceful_shutdown.await;
if let Err(err) = tx.send(()) {
log::warn!("Failed to send shutdown signal to worker pool: {}", err);
Ok(tokio::spawn(async move {
graceful_shutdown.await;
if let Err(err) = tx.send(()) {
log::warn!("Failed to send shutdown signal to worker pool: {}", err);
} else {
// Wait for all workers to finish processing
let results = join_all(worker_handles)
.await
.into_iter()
.filter(Result::is_err)
.map(Result::unwrap_err)
.collect::<Vec<_>>();
if !results.is_empty() {
log::error!("Worker pool stopped with errors: {:?}", results);
} else {
// Wait for all workers to finish processing
let results = join_all(worker_handles)
.await
.into_iter()
.filter(Result::is_err)
.map(Result::unwrap_err)
.collect::<Vec<_>>();
if !results.is_empty() {
log::error!("Worker pool stopped with errors: {:?}", results);
} else {
log::info!("Worker pool stopped gracefully");
}
log::info!("Worker pool stopped gracefully");
}
}),
self.queue,
))
}
}))
}
}
@ -233,6 +216,7 @@ mod tests {
use crate::store::test_store::MemoryTaskStore;
use crate::store::PgTaskStore;
use crate::task::CurrentTask;
use crate::Queue;
use async_trait::async_trait;
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
use diesel_async::AsyncPgConnection;
@ -241,7 +225,7 @@ mod tests {
use tokio::sync::Mutex;
#[derive(Clone, Debug)]
struct ApplicationContext {
pub struct ApplicationContext {
app_name: String,
}
@ -262,17 +246,50 @@ mod tests {
person: String,
}
/// This tests that one can customize the task parameters for the application.
#[async_trait]
impl BackgroundTask for GreetingTask {
const TASK_NAME: &'static str = "my_task";
trait MyAppTask {
const TASK_NAME: &'static str;
const QUEUE: &'static str = "default";
async fn run(
&self,
task_info: CurrentTask,
app_context: ApplicationContext,
) -> Result<(), ()>;
}
#[async_trait]
impl<T> BackgroundTask for T
where
T: MyAppTask + serde::de::DeserializeOwned + serde::ser::Serialize + Sync + Send + 'static,
{
const TASK_NAME: &'static str = T::TASK_NAME;
const QUEUE: &'static str = T::QUEUE;
type AppData = ApplicationContext;
type Error = ();
async fn run(
&self,
task_info: CurrentTask,
app_context: Self::AppData,
) -> Result<(), anyhow::Error> {
) -> Result<(), Self::Error> {
self.run(task_info, app_context).await
}
}
#[async_trait]
impl MyAppTask for GreetingTask {
const TASK_NAME: &'static str = "my_task";
async fn run(
&self,
task_info: CurrentTask,
app_context: ApplicationContext,
) -> Result<(), ()> {
println!(
"[{}] Hello {}! I'm {}.",
task_info.id(),
@ -293,12 +310,9 @@ mod tests {
const QUEUE: &'static str = "other_queue";
type AppData = ApplicationContext;
type Error = ();
async fn run(
&self,
task: CurrentTask,
context: Self::AppData,
) -> Result<(), anyhow::Error> {
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), Self::Error> {
println!(
"[{}] Other task with {}!",
task.id(),
@ -312,7 +326,7 @@ mod tests {
async fn validate_all_registered_tasks_queues_are_configured() {
let my_app_context = ApplicationContext::new();
let result = WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
let result = WorkerPool::new(memory_store(), move || my_app_context.clone())
.register_task_type::<GreetingTask>()
.start(futures::future::ready(()))
.await;
@ -330,14 +344,16 @@ mod tests {
async fn test_worker_pool_with_task() {
let my_app_context = ApplicationContext::new();
let (join_handle, queue) =
WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>()
.configure_queue(GreetingTask::QUEUE.into())
.start(futures::future::ready(()))
.await
.unwrap();
let task_store = memory_store();
let join_handle = WorkerPool::new(task_store.clone(), move || my_app_context.clone())
.register_task_type::<GreetingTask>()
.configure_queue(<GreetingTask as MyAppTask>::QUEUE.into())
.start(futures::future::ready(()))
.await
.unwrap();
let queue = Queue::new(task_store);
queue
.enqueue(GreetingTask {
person: "Rafael".to_string(),
@ -352,16 +368,17 @@ mod tests {
async fn test_worker_pool_with_multiple_task_types() {
let my_app_context = ApplicationContext::new();
let (join_handle, queue) =
WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>()
.register_task_type::<OtherTask>()
.configure_queue("default".into())
.configure_queue("other_queue".into())
.start(futures::future::ready(()))
.await
.unwrap();
let task_store = memory_store();
let join_handle = WorkerPool::new(task_store.clone(), move || my_app_context.clone())
.register_task_type::<GreetingTask>()
.register_task_type::<OtherTask>()
.configure_queue("default".into())
.configure_queue("other_queue".into())
.start(futures::future::ready(()))
.await
.unwrap();
let queue = Queue::new(task_store.clone());
queue
.enqueue(GreetingTask {
person: "Rafael".to_string(),
@ -392,11 +409,9 @@ mod tests {
type AppData = NotifyFinishedContext;
async fn run(
&self,
task: CurrentTask,
context: Self::AppData,
) -> Result<(), anyhow::Error> {
type Error = ();
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), ()> {
// Notify the test that the task ran
match context.notify_finished.lock().await.take() {
None => println!("Cannot notify, already done that!"),
@ -415,17 +430,19 @@ mod tests {
notify_finished: Arc::new(Mutex::new(Some(tx))),
};
let (join_handle, queue) =
WorkerPool::new(memory_store().await, move |_| my_app_context.clone())
.register_task_type::<NotifyFinished>()
.configure_queue("default".into())
.start(async move {
rx.await.unwrap();
println!("Worker pool got notified to stop");
})
.await
.unwrap();
let memory_store = memory_store();
let join_handle = WorkerPool::new(memory_store.clone(), move || my_app_context.clone())
.register_task_type::<NotifyFinished>()
.configure_queue("default".into())
.start(async move {
rx.await.unwrap();
println!("Worker pool got notified to stop");
})
.await
.unwrap();
let queue = Queue::new(memory_store);
// Notifies the worker pool to stop after the task is executed
queue.enqueue(NotifyFinished).await.unwrap();
@ -456,11 +473,13 @@ mod tests {
type AppData = NotifyUnknownRanContext;
type Error = ();
async fn run(
&self,
task: CurrentTask,
context: Self::AppData,
) -> Result<(), anyhow::Error> {
) -> Result<(), Self::Error> {
// Notify the test that the task ran
match context.should_stop.lock().await.take() {
None => println!("Cannot notify, already done that!"),
@ -482,11 +501,9 @@ mod tests {
type AppData = NotifyUnknownRanContext;
async fn run(
&self,
task: CurrentTask,
context: Self::AppData,
) -> Result<(), anyhow::Error> {
type Error = ();
async fn run(&self, task: CurrentTask, context: Self::AppData) -> Result<(), ()> {
println!("[{}] Unknown task ran!", task.id());
context.unknown_task_ran.store(true, Ordering::Relaxed);
Ok(())
@ -500,11 +517,11 @@ mod tests {
unknown_task_ran: Arc::new(AtomicBool::new(false)),
};
let task_store = memory_store().await;
let task_store = memory_store();
let (join_handle, queue) = WorkerPool::new(task_store, {
let join_handle = WorkerPool::new(task_store.clone(), {
let my_app_context = my_app_context.clone();
move |_| my_app_context.clone()
move || my_app_context.clone()
})
.register_task_type::<NotifyStopDuringRun>()
.configure_queue("default".into())
@ -515,6 +532,7 @@ mod tests {
.await
.unwrap();
let queue = Queue::new(task_store);
// Enqueue a task that is not registered
queue.enqueue(UnknownTask).await.unwrap();
@ -538,21 +556,18 @@ mod tests {
impl BackgroundTask for BrokenTask {
const TASK_NAME: &'static str = "panic_me";
type AppData = ();
type Error = ();
async fn run(
&self,
_task: CurrentTask,
_context: Self::AppData,
) -> Result<(), anyhow::Error> {
async fn run(&self, _task: CurrentTask, _context: Self::AppData) -> Result<(), ()> {
panic!("Oh no!");
}
}
let (notify_stop_worker_pool, should_stop) = tokio::sync::oneshot::channel();
let task_store = memory_store().await;
let task_store = memory_store();
let (worker_pool_finished, queue) = WorkerPool::new(task_store.clone(), |_| ())
let worker_pool_finished = WorkerPool::new(task_store.clone(), || ())
.register_task_type::<BrokenTask>()
.configure_queue("default".into())
.start(async move {
@ -561,6 +576,7 @@ mod tests {
.await
.unwrap();
let queue = Queue::new(task_store.clone());
// Enqueue a task that will panic
queue.enqueue(BrokenTask).await.unwrap();
@ -610,11 +626,13 @@ mod tests {
type AppData = PlayerContext;
type Error = ();
async fn run(
&self,
_task: CurrentTask,
context: Self::AppData,
) -> Result<(), anyhow::Error> {
) -> Result<(), Self::Error> {
loop {
let msg = context.ping_rx.lock().await.recv().await.unwrap();
match msg {
@ -644,11 +662,11 @@ mod tests {
ping_rx: Arc::new(Mutex::new(ping_rx)),
};
let task_store = memory_store().await;
let task_store = memory_store();
let (worker_pool_finished, queue) = WorkerPool::new(task_store, {
let worker_pool_finished = WorkerPool::new(task_store.clone(), {
let player_context = player_context.clone();
move |_| player_context.clone()
move || player_context.clone()
})
.register_task_type::<KeepAliveTask>()
.configure_queue("default".into())
@ -659,6 +677,7 @@ mod tests {
.await
.unwrap();
let queue = Queue::new(task_store);
queue.enqueue(KeepAliveTask).await.unwrap();
// Make sure task is running
@ -684,7 +703,7 @@ mod tests {
ping_tx.send(PingPongGame::StopThisNow).await.unwrap();
}
async fn memory_store() -> MemoryTaskStore {
fn memory_store() -> MemoryTaskStore {
MemoryTaskStore::default()
}
@ -693,15 +712,15 @@ mod tests {
async fn test_worker_pool_with_pg_store() {
let my_app_context = ApplicationContext::new();
let (join_handle, _queue) =
WorkerPool::new(pg_task_store().await, move |_| my_app_context.clone())
.register_task_type::<GreetingTask>()
.configure_queue(
QueueConfig::new(GreetingTask::QUEUE).retention_mode(RetentionMode::RemoveDone),
)
.start(futures::future::ready(()))
.await
.unwrap();
let join_handle = WorkerPool::new(pg_task_store().await, move || my_app_context.clone())
.register_task_type::<GreetingTask>()
.configure_queue(
QueueConfig::new(<GreetingTask as MyAppTask>::QUEUE)
.retention_mode(RetentionMode::RemoveDone),
)
.start(futures::future::ready(()))
.await
.unwrap();
join_handle.await.unwrap();
}