Compare commits
2 commits
new-equeue
...
main
Author | SHA1 | Date | |
---|---|---|---|
912127857f | |||
f198d3983a |
7 changed files with 51 additions and 143 deletions
|
@ -1,6 +1,5 @@
|
||||||
<p align="center"><img src="logo.png" alt="Backie" width="400"></p>
|
<p align="center"><img src="logo.png" alt="Backie" width="400"></p>
|
||||||
|
|
||||||
---
|
|
||||||
Async persistent background task processing for Rust applications with Tokio. Queue asynchronous tasks
|
Async persistent background task processing for Rust applications with Tokio. Queue asynchronous tasks
|
||||||
to be processed by workers. It's designed to be easy to use and horizontally scalable. It uses Postgres as
|
to be processed by workers. It's designed to be easy to use and horizontally scalable. It uses Postgres as
|
||||||
a storage backend and can also be extended to support other types of storage.
|
a storage backend and can also be extended to support other types of storage.
|
||||||
|
|
|
@ -1,33 +1,20 @@
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use backie::{BackgroundTask, CurrentTask, QueueConfig, RetentionMode};
|
use backie::{BackgroundTask, CurrentTask};
|
||||||
use backie::{PgTaskStore, Queue, WorkerPool};
|
use backie::{PgTaskStore, Queue, WorkerPool};
|
||||||
use diesel_async::pg::AsyncPgConnection;
|
use diesel_async::pg::AsyncPgConnection;
|
||||||
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
|
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::BTreeMap;
|
use std::time::Duration;
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
use tokio::sync::Mutex;
|
|
||||||
use tokio::task::JoinSet;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct MyApplicationContext {
|
pub struct MyApplicationContext {
|
||||||
app_name: String,
|
app_name: String,
|
||||||
notify_finished: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MyApplicationContext {
|
impl MyApplicationContext {
|
||||||
pub fn new(app_name: &str, notify_finished: tokio::sync::oneshot::Sender<()>) -> Self {
|
pub fn new(app_name: &str) -> Self {
|
||||||
Self {
|
Self {
|
||||||
app_name: app_name.to_string(),
|
app_name: app_name.to_string(),
|
||||||
notify_finished: Arc::new(Mutex::new(Some(notify_finished))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn notify_finished(&self) {
|
|
||||||
let mut lock = self.notify_finished.lock().await;
|
|
||||||
if let Some(sender) = lock.take() {
|
|
||||||
sender.send(()).unwrap();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -50,6 +37,12 @@ impl BackgroundTask for MyTask {
|
||||||
type Error = anyhow::Error;
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> {
|
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> {
|
||||||
|
// let new_task = MyTask::new(self.number + 1);
|
||||||
|
// queue
|
||||||
|
// .insert_task(&new_task)
|
||||||
|
// .await
|
||||||
|
// .unwrap();
|
||||||
|
|
||||||
log::info!(
|
log::info!(
|
||||||
"[{}] Hello from {}! the current number is {}",
|
"[{}] Hello from {}! the current number is {}",
|
||||||
task.id(),
|
task.id(),
|
||||||
|
@ -81,6 +74,17 @@ impl BackgroundTask for MyFailingTask {
|
||||||
type Error = anyhow::Error;
|
type Error = anyhow::Error;
|
||||||
|
|
||||||
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> {
|
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> {
|
||||||
|
// let new_task = MyFailingTask::new(self.number + 1);
|
||||||
|
// queue
|
||||||
|
// .insert_task(&new_task)
|
||||||
|
// .await
|
||||||
|
// .unwrap();
|
||||||
|
|
||||||
|
// task.id();
|
||||||
|
// task.keep_alive().await?;
|
||||||
|
// task.previous_error();
|
||||||
|
// task.retry_count();
|
||||||
|
|
||||||
log::info!("[{}] the current number is {}", task.id(), self.number);
|
log::info!("[{}] the current number is {}", task.id(), self.number);
|
||||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
|
@ -89,106 +93,36 @@ impl BackgroundTask for MyFailingTask {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct EmptyTask {
|
|
||||||
pub idx: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl BackgroundTask for EmptyTask {
|
|
||||||
const TASK_NAME: &'static str = "empty_task";
|
|
||||||
const QUEUE: &'static str = "loaded_queue";
|
|
||||||
type AppData = MyApplicationContext;
|
|
||||||
type Error = anyhow::Error;
|
|
||||||
|
|
||||||
async fn run(&self, _task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
|
||||||
struct FinalTask;
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl BackgroundTask for FinalTask {
|
|
||||||
const TASK_NAME: &'static str = "final_task";
|
|
||||||
const QUEUE: &'static str = "loaded_queue";
|
|
||||||
type AppData = MyApplicationContext;
|
|
||||||
type Error = anyhow::Error;
|
|
||||||
|
|
||||||
async fn run(&self, _task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> {
|
|
||||||
ctx.notify_finished().await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> anyhow::Result<()> {
|
async fn main() {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let connection_url = "postgres://postgres:password@localhost/backie";
|
let connection_url = "postgres://postgres:password@localhost/backie";
|
||||||
|
|
||||||
log::info!("Starting...");
|
log::info!("Starting...");
|
||||||
|
let max_pool_size: u32 = 3;
|
||||||
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(connection_url);
|
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(connection_url);
|
||||||
let pool = Pool::builder()
|
let pool = Pool::builder()
|
||||||
.max_size(300)
|
.max_size(max_pool_size)
|
||||||
.min_idle(Some(1))
|
.min_idle(Some(1))
|
||||||
.build(manager)
|
.build(manager)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
log::info!("Pool created ...");
|
log::info!("Pool created ...");
|
||||||
|
|
||||||
|
let task_store = PgTaskStore::new(pool);
|
||||||
|
|
||||||
let (tx, mut rx) = tokio::sync::watch::channel(false);
|
let (tx, mut rx) = tokio::sync::watch::channel(false);
|
||||||
|
|
||||||
let (notify_finished, wait_done) = tokio::sync::oneshot::channel();
|
|
||||||
|
|
||||||
// Some global application context I want to pass to my background tasks
|
// Some global application context I want to pass to my background tasks
|
||||||
let my_app_context = MyApplicationContext::new("Backie Example App", notify_finished);
|
let my_app_context = MyApplicationContext::new("Backie Example App");
|
||||||
|
|
||||||
// queue.enqueue(task1).await.unwrap();
|
|
||||||
// queue.enqueue(task2).await.unwrap();
|
|
||||||
// queue.enqueue(task3).await.unwrap();
|
|
||||||
|
|
||||||
// Store all task to join them later
|
|
||||||
let mut tasks = JoinSet::new();
|
|
||||||
|
|
||||||
for i in 0..1_000 {
|
|
||||||
tasks.spawn({
|
|
||||||
let pool = pool.clone();
|
|
||||||
async move {
|
|
||||||
let mut connection = pool.get().await.unwrap();
|
|
||||||
|
|
||||||
let task = EmptyTask { idx: i };
|
|
||||||
task.enqueue(&mut connection).await.unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
while let Some(result) = tasks.join_next().await {
|
|
||||||
let _ = result?;
|
|
||||||
}
|
|
||||||
|
|
||||||
(FinalTask {})
|
|
||||||
.enqueue(&mut pool.get().await.unwrap())
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
log::info!("Tasks created ...");
|
|
||||||
|
|
||||||
let started = Instant::now();
|
|
||||||
|
|
||||||
// Register the task types I want to use and start the worker pool
|
// Register the task types I want to use and start the worker pool
|
||||||
let join_handle = WorkerPool::new(PgTaskStore::new(pool.clone()), move || my_app_context.clone())
|
let join_handle =
|
||||||
|
WorkerPool::new(task_store.clone(), move || my_app_context.clone())
|
||||||
.register_task_type::<MyTask>()
|
.register_task_type::<MyTask>()
|
||||||
.register_task_type::<MyFailingTask>()
|
.register_task_type::<MyFailingTask>()
|
||||||
.register_task_type::<EmptyTask>()
|
|
||||||
.register_task_type::<FinalTask>()
|
|
||||||
.configure_queue("default".into())
|
.configure_queue("default".into())
|
||||||
.configure_queue(
|
|
||||||
QueueConfig::new("loaded_queue")
|
|
||||||
.pull_interval(Duration::from_millis(100))
|
|
||||||
.retention_mode(RetentionMode::RemoveDone)
|
|
||||||
.num_workers(300),
|
|
||||||
)
|
|
||||||
.start(async move {
|
.start(async move {
|
||||||
let _ = rx.changed().await;
|
let _ = rx.changed().await;
|
||||||
})
|
})
|
||||||
|
@ -197,16 +131,20 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
log::info!("Workers started ...");
|
log::info!("Workers started ...");
|
||||||
|
|
||||||
wait_done.await.unwrap();
|
let task1 = MyTask::new(0);
|
||||||
let elapsed = started.elapsed();
|
let task2 = MyTask::new(20_000);
|
||||||
println!("Ran 50k jobs in {} seconds", elapsed.as_secs());
|
let task3 = MyFailingTask::new(50_000);
|
||||||
|
|
||||||
|
let queue = Queue::new(task_store);
|
||||||
|
queue.enqueue(task1).await.unwrap();
|
||||||
|
queue.enqueue(task2).await.unwrap();
|
||||||
|
queue.enqueue(task3).await.unwrap();
|
||||||
|
log::info!("Tasks created ...");
|
||||||
|
|
||||||
// Wait for Ctrl+C
|
// Wait for Ctrl+C
|
||||||
// let _ = tokio::signal::ctrl_c().await;
|
let _ = tokio::signal::ctrl_c().await;
|
||||||
log::info!("Stopping ...");
|
log::info!("Stopping ...");
|
||||||
tx.send(true).unwrap();
|
tx.send(true).unwrap();
|
||||||
join_handle.await.unwrap();
|
join_handle.await.unwrap();
|
||||||
log::info!("Workers Stopped!");
|
log::info!("Workers Stopped!");
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
BIN
logo.png
BIN
logo.png
Binary file not shown.
Before Width: | Height: | Size: 56 KiB After Width: | Height: | Size: 131 KiB |
|
@ -7,10 +7,7 @@ use chrono::Duration;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use diesel::ExpressionMethods;
|
use diesel::ExpressionMethods;
|
||||||
use diesel::query_builder::{Query, QueryFragment, QueryId};
|
use diesel_async::{pg::AsyncPgConnection, RunQueryDsl};
|
||||||
use diesel::sql_types::{HasSqlType, SingleValue};
|
|
||||||
use diesel_async::return_futures::GetResult;
|
|
||||||
use diesel_async::{pg::AsyncPgConnection, AsyncConnection, RunQueryDsl};
|
|
||||||
|
|
||||||
impl Task {
|
impl Task {
|
||||||
pub(crate) async fn remove(
|
pub(crate) async fn remove(
|
||||||
|
|
|
@ -25,7 +25,7 @@ where
|
||||||
{
|
{
|
||||||
// TODO: Add option to specify the timeout of a task
|
// TODO: Add option to specify the timeout of a task
|
||||||
self.task_store
|
self.task_store
|
||||||
.create_task(NewTask::with_timeout(background_task, Duration::from_secs(10))?)
|
.create_task(NewTask::new(background_task, Duration::from_secs(10))?)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
19
src/store.rs
19
src/store.rs
|
@ -1,7 +1,5 @@
|
||||||
use crate::errors::AsyncQueueError;
|
use crate::errors::AsyncQueueError;
|
||||||
use crate::task::{NewTask, Task, TaskId, TaskState};
|
use crate::task::{NewTask, Task, TaskId, TaskState};
|
||||||
use crate::BackgroundTask;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use diesel::result::Error::QueryBuilderError;
|
use diesel::result::Error::QueryBuilderError;
|
||||||
use diesel_async::scoped_futures::ScopedFutureExt;
|
use diesel_async::scoped_futures::ScopedFutureExt;
|
||||||
use diesel_async::AsyncConnection;
|
use diesel_async::AsyncConnection;
|
||||||
|
@ -19,23 +17,6 @@ impl PgTaskStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A trait that is used to enqueue tasks for the PostgreSQL backend.
|
|
||||||
#[async_trait::async_trait]
|
|
||||||
pub trait PgQueueTask {
|
|
||||||
async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> PgQueueTask for T
|
|
||||||
where
|
|
||||||
T: BackgroundTask,
|
|
||||||
{
|
|
||||||
async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError> {
|
|
||||||
let new_task = NewTask::new::<T>(self)?;
|
|
||||||
Task::insert(connection, new_task).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl TaskStore for PgTaskStore {
|
impl TaskStore for PgTaskStore {
|
||||||
async fn pull_next_task(
|
async fn pull_next_task(
|
||||||
|
|
|
@ -117,7 +117,7 @@ pub struct NewTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NewTask {
|
impl NewTask {
|
||||||
pub(crate) fn with_timeout<T>(background_task: T, timeout: Duration) -> Result<Self, serde_json::Error>
|
pub(crate) fn new<T>(background_task: T, timeout: Duration) -> Result<Self, serde_json::Error>
|
||||||
where
|
where
|
||||||
T: BackgroundTask,
|
T: BackgroundTask,
|
||||||
{
|
{
|
||||||
|
@ -134,13 +134,6 @@ impl NewTask {
|
||||||
max_retries,
|
max_retries,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn new<T>(background_task: T) -> Result<Self, serde_json::Error>
|
|
||||||
where
|
|
||||||
T: BackgroundTask,
|
|
||||||
{
|
|
||||||
Self::with_timeout(background_task, Duration::from_secs(120))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
Loading…
Reference in a new issue