From 98f0951d3378ad690ceb34b4290439f5cb855dcb Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Wed, 22 Mar 2023 10:51:33 +0100 Subject: [PATCH] Update example --- examples/simple_worker/src/main.rs | 154 ++++++++++++++++++++--------- 1 file changed, 108 insertions(+), 46 deletions(-) diff --git a/examples/simple_worker/src/main.rs b/examples/simple_worker/src/main.rs index 8ba1beb..d61b0cf 100644 --- a/examples/simple_worker/src/main.rs +++ b/examples/simple_worker/src/main.rs @@ -1,20 +1,33 @@ use async_trait::async_trait; -use backie::{BackgroundTask, CurrentTask}; +use backie::{BackgroundTask, CurrentTask, QueueConfig, RetentionMode}; use backie::{PgTaskStore, Queue, WorkerPool}; use diesel_async::pg::AsyncPgConnection; use diesel_async::pooled_connection::{bb8::Pool, AsyncDieselConnectionManager}; use serde::{Deserialize, Serialize}; -use std::time::Duration; +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::Mutex; +use tokio::task::JoinSet; #[derive(Clone, Debug)] pub struct MyApplicationContext { app_name: String, + notify_finished: Arc>>>, } impl MyApplicationContext { - pub fn new(app_name: &str) -> Self { + pub fn new(app_name: &str, notify_finished: tokio::sync::oneshot::Sender<()>) -> Self { Self { app_name: app_name.to_string(), + notify_finished: Arc::new(Mutex::new(Some(notify_finished))), + } + } + + pub async fn notify_finished(&self) { + let mut lock = self.notify_finished.lock().await; + if let Some(sender) = lock.take() { + sender.send(()).unwrap(); } } } @@ -37,12 +50,6 @@ impl BackgroundTask for MyTask { type Error = anyhow::Error; async fn run(&self, task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> { - // let new_task = MyTask::new(self.number + 1); - // queue - // .insert_task(&new_task) - // .await - // .unwrap(); - log::info!( "[{}] Hello from {}! the current number is {}", task.id(), @@ -74,17 +81,6 @@ impl BackgroundTask for MyFailingTask { type Error = anyhow::Error; async fn run(&self, task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> { - // let new_task = MyFailingTask::new(self.number + 1); - // queue - // .insert_task(&new_task) - // .await - // .unwrap(); - - // task.id(); - // task.keep_alive().await?; - // task.previous_error(); - // task.retry_count(); - log::info!("[{}] the current number is {}", task.id(), self.number); tokio::time::sleep(Duration::from_secs(3)).await; @@ -93,58 +89,124 @@ impl BackgroundTask for MyFailingTask { } } +#[derive(Serialize, Deserialize)] +struct EmptyTask { + pub idx: u64, +} + +#[async_trait] +impl BackgroundTask for EmptyTask { + const TASK_NAME: &'static str = "empty_task"; + const QUEUE: &'static str = "loaded_queue"; + type AppData = MyApplicationContext; + type Error = anyhow::Error; + + async fn run(&self, _task: CurrentTask, _ctx: Self::AppData) -> Result<(), Self::Error> { + Ok(()) + } +} + +#[derive(Serialize, Deserialize)] +struct FinalTask; + +#[async_trait] +impl BackgroundTask for FinalTask { + const TASK_NAME: &'static str = "final_task"; + const QUEUE: &'static str = "loaded_queue"; + type AppData = MyApplicationContext; + type Error = anyhow::Error; + + async fn run(&self, _task: CurrentTask, ctx: Self::AppData) -> Result<(), Self::Error> { + ctx.notify_finished().await; + Ok(()) + } +} + #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { env_logger::init(); let connection_url = "postgres://postgres:password@localhost/backie"; log::info!("Starting..."); - let max_pool_size: u32 = 3; let manager = AsyncDieselConnectionManager::::new(connection_url); let pool = Pool::builder() - .max_size(max_pool_size) + .max_size(300) .min_idle(Some(1)) .build(manager) .await .unwrap(); log::info!("Pool created ..."); - let task_store = PgTaskStore::new(pool); - let (tx, mut rx) = tokio::sync::watch::channel(false); + let (notify_finished, wait_done) = tokio::sync::oneshot::channel(); + // Some global application context I want to pass to my background tasks - let my_app_context = MyApplicationContext::new("Backie Example App"); + let my_app_context = MyApplicationContext::new("Backie Example App", notify_finished); + + // queue.enqueue(task1).await.unwrap(); + // queue.enqueue(task2).await.unwrap(); + // queue.enqueue(task3).await.unwrap(); + + // Store all task to join them later + let mut tasks = JoinSet::new(); + + for i in 0..1_000 { + tasks.spawn({ + let pool = pool.clone(); + async move { + let mut connection = pool.get().await.unwrap(); + + let task = EmptyTask { idx: i }; + task.enqueue(&mut connection).await.unwrap(); + } + }); + } + + while let Some(result) = tasks.join_next().await { + let _ = result?; + } + + (FinalTask {}) + .enqueue(&mut pool.get().await.unwrap()) + .await + .unwrap(); + log::info!("Tasks created ..."); + + let started = Instant::now(); // Register the task types I want to use and start the worker pool - let join_handle = - WorkerPool::new(task_store.clone(), move || my_app_context.clone()) - .register_task_type::() - .register_task_type::() - .configure_queue("default".into()) - .start(async move { - let _ = rx.changed().await; - }) - .await - .unwrap(); + let join_handle = WorkerPool::new(PgTaskStore::new(pool.clone()), move || my_app_context.clone()) + .register_task_type::() + .register_task_type::() + .register_task_type::() + .register_task_type::() + .configure_queue("default".into()) + .configure_queue( + QueueConfig::new("loaded_queue") + .pull_interval(Duration::from_millis(100)) + .retention_mode(RetentionMode::RemoveDone) + .num_workers(300), + ) + .start(async move { + let _ = rx.changed().await; + }) + .await + .unwrap(); log::info!("Workers started ..."); - let task1 = MyTask::new(0); - let task2 = MyTask::new(20_000); - let task3 = MyFailingTask::new(50_000); - - let queue = Queue::new(task_store); - queue.enqueue(task1).await.unwrap(); - queue.enqueue(task2).await.unwrap(); - queue.enqueue(task3).await.unwrap(); - log::info!("Tasks created ..."); + wait_done.await.unwrap(); + let elapsed = started.elapsed(); + println!("Ran 50k jobs in {} seconds", elapsed.as_secs()); // Wait for Ctrl+C - let _ = tokio::signal::ctrl_c().await; + // let _ = tokio::signal::ctrl_c().await; log::info!("Stopping ..."); tx.send(true).unwrap(); join_handle.await.unwrap(); log::info!("Workers Stopped!"); + + Ok(()) }