Compare commits

..

2 commits

Author SHA1 Message Date
Rafael Caricio 912127857f
Update readme 2023-03-22 11:00:14 +01:00
Rafael Caricio f198d3983a
Update logo with turbofish 2023-03-22 10:52:45 +01:00
7 changed files with 51 additions and 143 deletions

View file

@ -1,6 +1,5 @@
<p align="center"><img src="logo.png" alt="Backie" width="400"></p>
---
Async persistent background task processing for Rust applications with Tokio. Queue asynchronous tasks
to be processed by workers. It's designed to be easy to use and horizontally scalable. It uses Postgres as
a storage backend and can also be extended to support other types of storage.

View file

@ -1,33 +1,20 @@
use async_trait::async_trait;
use backie::{BackgroundTask, CurrentTask, QueueConfig, RetentionMode};
use backie::{BackgroundTask, CurrentTask};
use backie::{PgTaskStore, Queue, WorkerPool};
use diesel_async::pg::AsyncPgConnection;
use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use std::time::Duration;
#[derive(Clone, Debug)]
pub struct MyApplicationContext {
app_name: String,
notify_finished: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
}
impl MyApplicationContext {
pub fn new(app_name: &str, notify_finished: tokio::sync::oneshot::Sender<()>) -> Self {
pub fn new(app_name: &str) -> Self {
Self {
app_name: app_name.to_string(),
notify_finished: Arc::new(Mutex::new(Some(notify_finished))),
}
}
pub async fn notify_finished(&self) {
let mut lock = self.notify_finished.lock().await;
if let Some(sender) = lock.take() {
sender.send(()).unwrap();
}
}
}
@ -50,6 +37,12 @@ impl BackgroundTask for MyTask {
type Error = anyhow::Error;
async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> {
// let new_task = MyTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
// .await
// .unwrap();
log::info!(
"[{}] Hello from {}! the current number is {}",
task.id(),
@ -81,6 +74,17 @@ impl BackgroundTask for MyFailingTask {
type Error = anyhow::Error;
async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> {
// let new_task = MyFailingTask::new(self.number + 1);
// queue
// .insert_task(&new_task)
// .await
// .unwrap();
// task.id();
// task.keep_alive().await?;
// task.previous_error();
// task.retry_count();
log::info!("[{}] the current number is {}", task.id(), self.number);
tokio::time::sleep(Duration::from_secs(3)).await;
@ -89,124 +93,58 @@ impl BackgroundTask for MyFailingTask {
}
}
#[derive(Serialize, Deserialize)]
struct EmptyTask {
pub idx: u64,
}
#[async_trait]
impl BackgroundTask for EmptyTask {
const TASK_NAME: &'static str = "empty_task";
const QUEUE: &'static str = "loaded_queue";
type AppData = MyApplicationContext;
type Error = anyhow::Error;
async fn run(&self, _task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> {
Ok(())
}
}
#[derive(Serialize, Deserialize)]
struct FinalTask;
#[async_trait]
impl BackgroundTask for FinalTask {
const TASK_NAME: &'static str = "final_task";
const QUEUE: &'static str = "loaded_queue";
type AppData = MyApplicationContext;
type Error = anyhow::Error;
async fn run(&self, _task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> {
ctx.notify_finished().await;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
async fn main() {
env_logger::init();
let connection_url = "postgres://postgres:password@localhost/backie";
log::info!("Starting...");
let max_pool_size: u32 = 3;
let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(connection_url);
let pool = Pool::builder()
.max_size(300)
.max_size(max_pool_size)
.min_idle(Some(1))
.build(manager)
.await
.unwrap();
log::info!("Pool created ...");
let task_store = PgTaskStore::new(pool);
let (tx, mut rx) = tokio::sync::watch::channel(false);
let (notify_finished, wait_done) = tokio::sync::oneshot::channel();
// Some global application context I want to pass to my background tasks
let my_app_context = MyApplicationContext::new("Backie Example App", notify_finished);
// queue.enqueue(task1).await.unwrap();
// queue.enqueue(task2).await.unwrap();
// queue.enqueue(task3).await.unwrap();
// Store all task to join them later
let mut tasks = JoinSet::new();
for i in 0..1_000 {
tasks.spawn({
let pool = pool.clone();
async move {
let mut connection = pool.get().await.unwrap();
let task = EmptyTask { idx: i };
task.enqueue(&mut connection).await.unwrap();
}
});
}
while let Some(result) = tasks.join_next().await {
let _ = result?;
}
(FinalTask {})
.enqueue(&mut pool.get().await.unwrap())
.await
.unwrap();
log::info!("Tasks created ...");
let started = Instant::now();
let my_app_context = MyApplicationContext::new("Backie Example App");
// Register the task types I want to use and start the worker pool
let join_handle = WorkerPool::new(PgTaskStore::new(pool.clone()), move || my_app_context.clone())
.register_task_type::<MyTask>()
.register_task_type::<MyFailingTask>()
.register_task_type::<EmptyTask>()
.register_task_type::<FinalTask>()
.configure_queue("default".into())
.configure_queue(
QueueConfig::new("loaded_queue")
.pull_interval(Duration::from_millis(100))
.retention_mode(RetentionMode::RemoveDone)
.num_workers(300),
)
.start(async move {
let _ = rx.changed().await;
})
.await
.unwrap();
let join_handle =
WorkerPool::new(task_store.clone(), move || my_app_context.clone())
.register_task_type::<MyTask>()
.register_task_type::<MyFailingTask>()
.configure_queue("default".into())
.start(async move {
let _ = rx.changed().await;
})
.await
.unwrap();
log::info!("Workers started ...");
wait_done.await.unwrap();
let elapsed = started.elapsed();
println!("Ran 50k jobs in {} seconds", elapsed.as_secs());
let task1 = MyTask::new(0);
let task2 = MyTask::new(20_000);
let task3 = MyFailingTask::new(50_000);
let queue = Queue::new(task_store);
queue.enqueue(task1).await.unwrap();
queue.enqueue(task2).await.unwrap();
queue.enqueue(task3).await.unwrap();
log::info!("Tasks created ...");
// Wait for Ctrl+C
// let _ = tokio::signal::ctrl_c().await;
let _ = tokio::signal::ctrl_c().await;
log::info!("Stopping ...");
tx.send(true).unwrap();
join_handle.await.unwrap();
log::info!("Workers Stopped!");
Ok(())
}

BIN
logo.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 56 KiB

After

Width:  |  Height:  |  Size: 131 KiB

View file

@ -7,10 +7,7 @@ use chrono::Duration;
use chrono::Utc;
use diesel::prelude::*;
use diesel::ExpressionMethods;
use diesel::query_builder::{Query, QueryFragment, QueryId};
use diesel::sql_types::{HasSqlType, SingleValue};
use diesel_async::return_futures::GetResult;
use diesel_async::{pg::AsyncPgConnection, AsyncConnection, RunQueryDsl};
use diesel_async::{pg::AsyncPgConnection, RunQueryDsl};
impl Task {
pub(crate) async fn remove(

View file

@ -25,7 +25,7 @@ where
{
// TODO: Add option to specify the timeout of a task
self.task_store
.create_task(NewTask::with_timeout(background_task, Duration::from_secs(10))?)
.create_task(NewTask::new(background_task, Duration::from_secs(10))?)
.await?;
Ok(())
}

View file

@ -1,7 +1,5 @@
use crate::errors::AsyncQueueError;
use crate::task::{NewTask, Task, TaskId, TaskState};
use crate::BackgroundTask;
use async_trait::async_trait;
use diesel::result::Error::QueryBuilderError;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
@ -19,23 +17,6 @@ impl PgTaskStore {
}
}
/// A trait that is used to enqueue tasks for the PostgreSQL backend.
#[async_trait::async_trait]
pub trait PgQueueTask {
async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError>;
}
impl<T> PgQueueTask for T
where
T: BackgroundTask,
{
async fn enqueue(self, connection: &mut AsyncPgConnection) -> Result<(), AsyncQueueError> {
let new_task = NewTask::new::<T>(self)?;
Task::insert(connection, new_task).await?;
Ok(())
}
}
#[async_trait::async_trait]
impl TaskStore for PgTaskStore {
async fn pull_next_task(

View file

@ -117,9 +117,9 @@ pub struct NewTask {
}
impl NewTask {
pub(crate) fn with_timeout<T>(background_task: T, timeout: Duration) -> Result<Self, serde_json::Error>
where
T: BackgroundTask,
pub(crate) fn new<T>(background_task: T, timeout: Duration) -> Result<Self, serde_json::Error>
where
T: BackgroundTask,
{
let max_retries = background_task.max_retries();
let uniq_hash = background_task.uniq();
@ -134,13 +134,6 @@ impl NewTask {
max_retries,
})
}
pub(crate) fn new<T>(background_task: T) -> Result<Self, serde_json::Error>
where
T: BackgroundTask,
{
Self::with_timeout(background_task, Duration::from_secs(120))
}
}
#[cfg(test)]