From 2d4a0356e6653113457924a267dc15fb35afefca Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Mon, 29 Mar 2021 03:05:20 +0100 Subject: [PATCH] Add registry and document everything --- Cargo.toml | 4 + README.md | 176 +++++++++++++++++++++++++ sqlxmq_macros/Cargo.toml | 13 ++ sqlxmq_macros/src/lib.rs | 244 +++++++++++++++++++++++++++++++++++ src/hidden.rs | 6 + src/lib.rs | 269 ++++++++++++++++++++++++++++++++++++++- src/registry.rs | 129 +++++++++++++++++++ src/runner.rs | 71 ++++++++++- src/spawn.rs | 51 +++++++- src/utils.rs | 3 + 10 files changed, 957 insertions(+), 9 deletions(-) create mode 100644 README.md create mode 100644 sqlxmq_macros/Cargo.toml create mode 100644 sqlxmq_macros/src/lib.rs create mode 100644 src/hidden.rs create mode 100644 src/registry.rs diff --git a/Cargo.toml b/Cargo.toml index f56a9bc..aef6842 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,9 @@ version = "0.1.0" authors = ["Diggory Blake "] edition = "2018" +[workspace] +members = ["sqlxmq_macros"] + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] @@ -21,6 +24,7 @@ uuid = { version = "0.8.2", features = ["v4"] } log = "0.4.14" serde_json = "1.0.64" serde = "1.0.124" +sqlxmq_macros = { version = "0.1", path = "sqlxmq_macros" } [dev-dependencies] dotenv = "0.15.0" diff --git a/README.md b/README.md new file mode 100644 index 0000000..8b5bcac --- /dev/null +++ b/README.md @@ -0,0 +1,176 @@ +# sqlxmq + +A task queue built on `sqlx` and `PostgreSQL`. + +This library allows a CRUD application to run background tasks without complicating its +deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications +already using a `PostgreSQL` database. + +Although using a SQL database as a task queue means compromising on latency of +delivered tasks, there are several show-stopping issues present in ordinary task +queues which are avoided altogether. + +With any other task queue, in-flight tasks are state that is not covered by normal +database backups. Even if tasks _are_ backed up, there is no way to restore both +a database and a task queue to a consistent point-in-time without manually +resolving conflicts. + +By storing tasks in the database, existing backup procedures will store a perfectly +consistent state of both in-flight tasks and persistent data. Additionally, tasks can +be spawned and completed as part of other transactions, making it easy to write correct +application code. + +Leveraging the power of `PostgreSQL`, this task queue offers several features not +present in other task queues. + +# Features + +- **Send/receive multiple tasks at once.** + + This reduces the number of queries to the database. + +- **Send tasks to be executed at a future date and time.** + + Avoids the need for a separate scheduling system. + +- **Reliable delivery of tasks.** + +- **Automatic retries with exponential backoff.** + + Number of retries and initial backoff parameters are configurable. + +- **Transactional sending of tasks.** + + Avoids sending spurious tasks if a transaction is rolled back. + +- **Transactional completion of tasks.** + + If all side-effects of a task are updates to the database, this provides + true exactly-once execution of tasks. + +- **Transactional check-pointing of tasks.** + + Long-running tasks can check-point their state to avoid having to restart + from the beginning if there is a failure: the next retry can continue + from the last check-point. + +- **Opt-in strictly ordered task delivery.** + + Tasks within the same channel will be processed strictly in-order + if this option is enabled for the task. + +- **Fair task delivery.** + + A channel with a lot of tasks ready to run will not starve a channel with fewer + tasks. + +- **Opt-in two-phase commit.** + + This is particularly useful on an ordered channel where a position can be "reserved" + in the task order, but not committed until later. + +- **JSON and/or binary payloads.** + + Tasks can use whichever is most convenient. + +- **Automatic keep-alive of tasks.** + + Long-running tasks will automatically be "kept alive" to prevent them being + retried whilst they're still ongoing. + +- **Concurrency limits.** + + Specify the minimum and maximum number of concurrent tasks each runner should + handle. + +- **Built-in task registry via an attribute macro.** + + Tasks can be easily registered with a runner, and default configuration specified + on a per-task basis. + +- **Implicit channels.** + + Channels are implicitly created and destroyed when tasks are sent and processed, + so no setup is required. + +- **Channel groups.** + + Easily subscribe to multiple channels at once, thanks to the separation of + channel name and channel arguments. + +- **NOTIFY-based polling.** + + This saves resources when few tasks are being processed. + +# Getting started + +## Defining tasks + +The first step is to define a function to be run on the task queue. + +```rust +use sqlxmq::{task, CurrentTask}; + +// Arguments to the `#[task]` attribute allow setting default task options. +#[task(channel_name = "foo")] +async fn example_task( + mut current_task: CurrentTask, +) -> sqlx::Result<()> { + // Decode a JSON payload + let who: Option = current_task.json()?; + + // Do some work + println!("Hello, {}!", who.as_deref().unwrap_or("world")); + + // Mark the task as complete + current_task.complete().await?; + + Ok(()) +} +``` + +## Listening for tasks + +Next we need to create a task runner: this is what listens for new tasks +and executes them. + +```rust +use sqlxmq::TaskRegistry; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // You'll need to provide a Postgres connection pool. + let pool = connect_to_db().await?; + + // Construct a task registry from our single task. + let mut registry = TaskRegistry::new(&[example_task]); + // Here is where you can configure the registry + // registry.set_error_handler(...) + + let runner = registry + // Create a task runner using the connection pool. + .runner(&pool) + // Here is where you can configure the task runner + // Aim to keep 10-20 tasks running at a time. + .set_concurrency(10, 20) + // Start the task runner in the background. + .run() + .await?; + + // The task runner will continue listening and running + // tasks until `runner` is dropped. +} +``` + +## Spawning a task + +The final step is to actually run a task. + +```rust +example_task.new() + // This is where we override task configuration + .set_channel_name("bar") + .set_json("John") + .spawn(&pool) + .await?; +``` diff --git a/sqlxmq_macros/Cargo.toml b/sqlxmq_macros/Cargo.toml new file mode 100644 index 0000000..d70cda4 --- /dev/null +++ b/sqlxmq_macros/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "sqlxmq_macros" +version = "0.1.0" +authors = ["Diggory Blake "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +proc-macro = true + +[dependencies] +syn = { version = "1.0.64", features = ["derive"] } +quote = "1.0.9" diff --git a/sqlxmq_macros/src/lib.rs b/sqlxmq_macros/src/lib.rs new file mode 100644 index 0000000..6af2021 --- /dev/null +++ b/sqlxmq_macros/src/lib.rs @@ -0,0 +1,244 @@ +#![deny(missing_docs, unsafe_code)] +//! # sqlxmq_macros +//! +//! Provides procedural macros for the `sqlxmq` crate. + +use std::mem; + +use proc_macro::TokenStream; +use quote::quote; +use syn::{ + parse_macro_input, parse_quote, AttributeArgs, Error, ItemFn, Lit, Meta, NestedMeta, Path, + Result, Visibility, +}; + +#[derive(Default)] +struct TaskOptions { + proto: Option, + name: Option, + channel_name: Option, + retries: Option, + backoff_secs: Option, + ordered: Option, +} + +enum OptionValue<'a> { + None, + Lit(&'a Lit), + Path(&'a Path), +} + +fn interpret_task_arg(options: &mut TaskOptions, arg: NestedMeta) -> Result<()> { + fn error(arg: NestedMeta) -> Result<()> { + Err(Error::new_spanned(arg, "Unexpected attribute argument")) + } + match &arg { + NestedMeta::Lit(Lit::Str(s)) if options.name.is_none() => { + options.name = Some(s.value()); + } + NestedMeta::Meta(m) => { + if let Some(ident) = m.path().get_ident() { + let name = ident.to_string(); + let value = match &m { + Meta::List(l) => { + if let NestedMeta::Meta(Meta::Path(p)) = &l.nested[0] { + OptionValue::Path(p) + } else { + return error(arg); + } + } + Meta::Path(_) => OptionValue::None, + Meta::NameValue(nvp) => OptionValue::Lit(&nvp.lit), + }; + match (name.as_str(), value) { + ("proto", OptionValue::Path(p)) if options.proto.is_none() => { + options.proto = Some(p.clone()); + } + ("name", OptionValue::Lit(Lit::Str(s))) if options.name.is_none() => { + options.name = Some(s.value()); + } + ("channel_name", OptionValue::Lit(Lit::Str(s))) + if options.channel_name.is_none() => + { + options.channel_name = Some(s.value()); + } + ("retries", OptionValue::Lit(Lit::Int(n))) if options.retries.is_none() => { + options.name = Some(n.base10_parse()?); + } + ("backoff_secs", OptionValue::Lit(Lit::Float(n))) + if options.backoff_secs.is_none() => + { + options.backoff_secs = Some(n.base10_parse()?); + } + ("backoff_secs", OptionValue::Lit(Lit::Int(n))) + if options.backoff_secs.is_none() => + { + options.backoff_secs = Some(n.base10_parse()?); + } + ("ordered", OptionValue::None) if options.ordered.is_none() => { + options.ordered = Some(true); + } + ("ordered", OptionValue::Lit(Lit::Bool(b))) if options.ordered.is_none() => { + options.ordered = Some(b.value); + } + _ => return error(arg), + } + } + } + _ => return error(arg), + } + Ok(()) +} + +/// Marks a function as being a background task. +/// +/// The function must take a single `CurrentTask` argument, and should +/// be async or return a future. +/// +/// The async result must be a `Result<(), E>` type, where `E` is convertible +/// to a `Box`, which is the case for most +/// error types. +/// +/// Several options can be provided to the `#[task]` attribute: +/// +/// # Name +/// +/// ``` +/// #[task("example")] +/// #[task(name="example")] +/// ``` +/// +/// This overrides the name for this task. If unspecified, the fully-qualified +/// name of the function is used. If you move a task to a new module or rename +/// the function, you may which to override the task name to prevent it from +/// changing. +/// +/// # Channel name +/// +/// ``` +/// #[task(channel_name="foo")] +/// ``` +/// +/// This sets the default channel name on which the task will be spawned. +/// +/// # Retries +/// +/// ``` +/// #[task(retries = 3)] +/// ``` +/// +/// This sets the default number of retries for the task. +/// +/// # Retry backoff +/// +/// ``` +/// #[task(backoff_secs=1.5)] +/// #[task(backoff_secs=2)] +/// ``` +/// +/// This sets the default initial retry backoff for the task in seconds. +/// +/// # Ordered +/// +/// ``` +/// #[task(ordered)] +/// #[task(ordered=true)] +/// #[task(ordered=false)] +/// ``` +/// +/// This sets whether the task will be strictly ordered by default. +/// +/// # Prototype +/// +/// ``` +/// fn my_proto<'a, 'b>( +/// builder: &'a mut TaskBuilder<'b> +/// ) -> &'a mut TaskBuilder<'b> { +/// builder.set_channel_name("bar") +/// } +/// +/// #[task(proto(my_proto))] +/// ``` +/// +/// This allows setting several task options at once using the specified function, +/// and can be convient if you have several tasks which should have similar +/// defaults. +/// +/// # Combinations +/// +/// Multiple task options can be combined. The order is not important, but the +/// prototype will always be applied first so that explicit options can override it. +/// Each option can only be provided once in the attribute. +/// +/// ``` +/// #[task("my_task", proto(my_proto), retries=0, ordered)] +/// ``` +/// +#[proc_macro_attribute] +pub fn task(attr: TokenStream, item: TokenStream) -> TokenStream { + let args = parse_macro_input!(attr as AttributeArgs); + let mut inner_fn = parse_macro_input!(item as ItemFn); + + let mut options = TaskOptions::default(); + let mut errors = Vec::new(); + for arg in args { + if let Err(e) = interpret_task_arg(&mut options, arg) { + errors.push(e.into_compile_error()); + } + } + + let vis = mem::replace(&mut inner_fn.vis, Visibility::Inherited); + let name = mem::replace(&mut inner_fn.sig.ident, parse_quote! {inner}); + let fq_name = if let Some(name) = options.name { + quote! { #name } + } else { + let name_str = name.to_string(); + quote! { concat!(module_path!(), "::", #name_str) } + }; + + let mut chain = Vec::new(); + if let Some(proto) = &options.proto { + chain.push(quote! { + .set_proto(#proto) + }); + } + if let Some(channel_name) = &options.channel_name { + chain.push(quote! { + .set_channel_name(#channel_name) + }); + } + if let Some(retries) = &options.retries { + chain.push(quote! { + .set_retries(#retries) + }); + } + if let Some(backoff_secs) = &options.backoff_secs { + chain.push(quote! { + .set_retry_backoff(::std::time::Duration::from_secs_f64(#backoff_secs)) + }); + } + if let Some(ordered) = options.ordered { + chain.push(quote! { + .set_ordered(#ordered) + }); + } + + let expanded = quote! { + #(#errors)* + #[allow(non_upper_case_globals)] + #vis static #name: &'static sqlxmq::NamedTask = &{ + #inner_fn + sqlxmq::NamedTask::new_internal( + #fq_name, + sqlxmq::hidden::BuildFn(|builder| { + builder #(#chain)* + }), + sqlxmq::hidden::RunFn(|registry, current_task| { + registry.spawn_internal(#fq_name, inner(current_task)); + }), + ) + }; + }; + // Hand the output tokens back to the compiler. + TokenStream::from(expanded) +} diff --git a/src/hidden.rs b/src/hidden.rs new file mode 100644 index 0000000..1673871 --- /dev/null +++ b/src/hidden.rs @@ -0,0 +1,6 @@ +use crate::{CurrentTask, TaskBuilder, TaskRegistry}; + +#[doc(hidden)] +pub struct BuildFn(pub for<'a> fn(&'a mut TaskBuilder<'static>) -> &'a mut TaskBuilder<'static>); +#[doc(hidden)] +pub struct RunFn(pub fn(&TaskRegistry, CurrentTask)); diff --git a/src/lib.rs b/src/lib.rs index cba97bc..cf24930 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,201 @@ +#![deny(missing_docs, unsafe_code)] +//! # sqlxmq +//! +//! A task queue built on `sqlx` and `PostgreSQL`. +//! +//! This library allows a CRUD application to run background tasks without complicating its +//! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications +//! already using a `PostgreSQL` database. +//! +//! Although using a SQL database as a task queue means compromising on latency of +//! delivered tasks, there are several show-stopping issues present in ordinary task +//! queues which are avoided altogether. +//! +//! With any other task queue, in-flight tasks are state that is not covered by normal +//! database backups. Even if tasks _are_ backed up, there is no way to restore both +//! a database and a task queue to a consistent point-in-time without manually +//! resolving conflicts. +//! +//! By storing tasks in the database, existing backup procedures will store a perfectly +//! consistent state of both in-flight tasks and persistent data. Additionally, tasks can +//! be spawned and completed as part of other transactions, making it easy to write correct +//! application code. +//! +//! Leveraging the power of `PostgreSQL`, this task queue offers several features not +//! present in other task queues. +//! +//! # Features +//! +//! - **Send/receive multiple tasks at once.** +//! +//! This reduces the number of queries to the database. +//! +//! - **Send tasks to be executed at a future date and time.** +//! +//! Avoids the need for a separate scheduling system. +//! +//! - **Reliable delivery of tasks.** +//! +//! - **Automatic retries with exponential backoff.** +//! +//! Number of retries and initial backoff parameters are configurable. +//! +//! - **Transactional sending of tasks.** +//! +//! Avoids sending spurious tasks if a transaction is rolled back. +//! +//! - **Transactional completion of tasks.** +//! +//! If all side-effects of a task are updates to the database, this provides +//! true exactly-once execution of tasks. +//! +//! - **Transactional check-pointing of tasks.** +//! +//! Long-running tasks can check-point their state to avoid having to restart +//! from the beginning if there is a failure: the next retry can continue +//! from the last check-point. +//! +//! - **Opt-in strictly ordered task delivery.** +//! +//! Tasks within the same channel will be processed strictly in-order +//! if this option is enabled for the task. +//! +//! - **Fair task delivery.** +//! +//! A channel with a lot of tasks ready to run will not starve a channel with fewer +//! tasks. +//! +//! - **Opt-in two-phase commit.** +//! +//! This is particularly useful on an ordered channel where a position can be "reserved" +//! in the task order, but not committed until later. +//! +//! - **JSON and/or binary payloads.** +//! +//! Tasks can use whichever is most convenient. +//! +//! - **Automatic keep-alive of tasks.** +//! +//! Long-running tasks will automatically be "kept alive" to prevent them being +//! retried whilst they're still ongoing. +//! +//! - **Concurrency limits.** +//! +//! Specify the minimum and maximum number of concurrent tasks each runner should +//! handle. +//! +//! - **Built-in task registry via an attribute macro.** +//! +//! Tasks can be easily registered with a runner, and default configuration specified +//! on a per-task basis. +//! +//! - **Implicit channels.** +//! +//! Channels are implicitly created and destroyed when tasks are sent and processed, +//! so no setup is required. +//! +//! - **Channel groups.** +//! +//! Easily subscribe to multiple channels at once, thanks to the separation of +//! channel name and channel arguments. +//! +//! - **NOTIFY-based polling.** +//! +//! This saves resources when few tasks are being processed. +//! +//! # Getting started +//! +//! ## Defining tasks +//! +//! The first step is to define a function to be run on the task queue. +//! +//! ```rust +//! use sqlxmq::{task, CurrentTask}; +//! +//! // Arguments to the `#[task]` attribute allow setting default task options. +//! #[task(channel_name = "foo")] +//! async fn example_task( +//! mut current_task: CurrentTask, +//! ) -> sqlx::Result<()> { +//! // Decode a JSON payload +//! let who: Option = current_task.json()?; +//! +//! // Do some work +//! println!("Hello, {}!", who.as_deref().unwrap_or("world")); +//! +//! // Mark the task as complete +//! current_task.complete().await?; +//! +//! Ok(()) +//! } +//! ``` +//! +//! ## Listening for tasks +//! +//! Next we need to create a task runner: this is what listens for new tasks +//! and executes them. +//! +//! ```rust +//! use sqlxmq::TaskRegistry; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! // You'll need to provide a Postgres connection pool. +//! let pool = connect_to_db().await?; +//! +//! // Construct a task registry from our single task. +//! let mut registry = TaskRegistry::new(&[example_task]); +//! // Here is where you can configure the registry +//! // registry.set_error_handler(...) +//! +//! let runner = registry +//! // Create a task runner using the connection pool. +//! .runner(&pool) +//! // Here is where you can configure the task runner +//! // Aim to keep 10-20 tasks running at a time. +//! .set_concurrency(10, 20) +//! // Start the task runner in the background. +//! .run() +//! .await?; +//! +//! // The task runner will continue listening and running +//! // tasks until `runner` is dropped. +//! } +//! ``` +//! +//! ## Spawning a task +//! +//! The final step is to actually run a task. +//! +//! ```rust +//! example_task.new() +//! // This is where we override task configuration +//! .set_channel_name("bar") +//! .set_json("John") +//! .spawn(&pool) +//! .await?; +//! ``` + +#[doc(hidden)] +pub mod hidden; +mod registry; mod runner; mod spawn; mod utils; +pub use registry::*; pub use runner::*; pub use spawn::*; +pub use sqlxmq_macros::task; pub use utils::OwnedTask; #[cfg(test)] mod tests { use super::*; + use crate as sqlxmq; use std::env; + use std::error::Error; use std::future::Future; use std::ops::Deref; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -78,8 +263,36 @@ mod tests { (runner, counter) } + fn task_proto<'a, 'b>(builder: &'a mut TaskBuilder<'b>) -> &'a mut TaskBuilder<'b> { + builder.set_channel_name("bar") + } + + #[task(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)] + async fn example_task1( + mut current_task: CurrentTask, + ) -> Result<(), Box> { + current_task.complete().await?; + Ok(()) + } + + #[task(proto(task_proto))] + async fn example_task2( + mut current_task: CurrentTask, + ) -> Result<(), Box> { + current_task.complete().await?; + Ok(()) + } + + async fn named_task_runner(pool: &Pool) -> OwnedTask { + TaskRegistry::new(&[example_task1, example_task2]) + .runner(pool) + .run() + .await + .unwrap() + } + async fn pause() { - pause_ms(50).await; + pause_ms(100).await; } async fn pause_ms(ms: u64) { @@ -164,7 +377,7 @@ mod tests { let pool = &*test_pool().await; let (_runner, counter) = test_task_runner(&pool, move |_| async {}).await; - let backoff = 100; + let backoff = 200; assert_eq!(counter.load(Ordering::SeqCst), 0); TaskBuilder::new("foo") @@ -192,4 +405,56 @@ mod tests { pause_ms(backoff * 5).await; assert_eq!(counter.load(Ordering::SeqCst), 3); } + + #[tokio::test] + async fn it_can_checkpoint_tasks() { + let pool = &*test_pool().await; + let (_runner, counter) = test_task_runner(&pool, move |mut current_task| async move { + let state: bool = current_task.json().unwrap().unwrap(); + if state { + current_task.complete().await.unwrap(); + } else { + current_task + .checkpoint(Checkpoint::new().set_json(&true).unwrap()) + .await + .unwrap(); + } + }) + .await; + + let backoff = 200; + + assert_eq!(counter.load(Ordering::SeqCst), 0); + TaskBuilder::new("foo") + .set_retry_backoff(Duration::from_millis(backoff)) + .set_retries(5) + .set_json(&false) + .unwrap() + .spawn(pool) + .await + .unwrap(); + + // First attempt + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 1); + + // Second attempt + pause_ms(backoff).await; + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 2); + + // No more attempts + pause_ms(backoff * 3).await; + assert_eq!(counter.load(Ordering::SeqCst), 2); + } + + #[tokio::test] + async fn it_can_use_registry() { + let pool = &*test_pool().await; + let _runner = named_task_runner(pool).await; + + example_task1.new().spawn(pool).await.unwrap(); + example_task2.new().spawn(pool).await.unwrap(); + pause().await; + } } diff --git a/src/registry.rs b/src/registry.rs new file mode 100644 index 0000000..8d587fe --- /dev/null +++ b/src/registry.rs @@ -0,0 +1,129 @@ +use std::collections::HashMap; +use std::error::Error; +use std::fmt::Display; +use std::future::Future; +use std::sync::Arc; + +use sqlx::{Pool, Postgres}; +use uuid::Uuid; + +use crate::hidden::{BuildFn, RunFn}; +use crate::utils::Opaque; +use crate::{TaskBuilder, TaskRunnerOptions}; + +/// Stores a mapping from task name to task. Can be used to construct +/// a task runner. +pub struct TaskRegistry { + error_handler: Arc) + Send + Sync>, + task_map: HashMap<&'static str, &'static NamedTask>, +} + +/// Error returned when a task is received whose name is not in the registry. +#[derive(Debug)] +pub struct UnknownTaskError; + +impl Error for UnknownTaskError {} +impl Display for UnknownTaskError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Unknown task") + } +} + +impl TaskRegistry { + /// Construct a new task registry from the provided task list. + pub fn new(tasks: &[&'static NamedTask]) -> Self { + let mut task_map = HashMap::new(); + for &task in tasks { + if task_map.insert(task.name(), task).is_some() { + panic!("Duplicate task registered: {}", task.name()); + } + } + Self { + error_handler: Arc::new(Self::default_error_handler), + task_map, + } + } + + /// Set a function to be called whenever a task returns an error. + pub fn set_error_handler( + &mut self, + error_handler: impl Fn(&str, Box) + Send + Sync + 'static, + ) -> &mut Self { + self.error_handler = Arc::new(error_handler); + self + } + + /// Look-up a task by name. + pub fn resolve_task(&self, name: &str) -> Option<&'static NamedTask> { + self.task_map.get(name).copied() + } + + /// The default error handler implementation, which simply logs the error. + pub fn default_error_handler(name: &str, error: Box) { + log::error!("Task {} failed: {}", name, error); + } + + #[doc(hidden)] + pub fn spawn_internal>>( + &self, + name: &'static str, + f: impl Future> + Send + 'static, + ) { + let error_handler = self.error_handler.clone(); + tokio::spawn(async move { + if let Err(e) = f.await { + error_handler(name, e.into()); + } + }); + } + + /// Construct a task runner from this registry and the provided connection + /// pool. + pub fn runner(self, pool: &Pool) -> TaskRunnerOptions { + TaskRunnerOptions::new(pool, move |current_task| { + if let Some(task) = self.resolve_task(current_task.name()) { + (task.run_fn.0 .0)(&self, current_task); + } else { + (self.error_handler)(current_task.name(), Box::new(UnknownTaskError)) + } + }) + } +} + +/// Type for a named task. Functions annotated with `#[task]` are +/// transformed into static variables whose type is `&'static NamedTask`. +#[derive(Debug)] +pub struct NamedTask { + name: &'static str, + build_fn: Opaque, + run_fn: Opaque, +} + +impl NamedTask { + #[doc(hidden)] + pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self { + Self { + name, + build_fn: Opaque(build_fn), + run_fn: Opaque(run_fn), + } + } + /// Initialize a task builder with the name and defaults of this task. + pub fn new(&self) -> TaskBuilder<'static> { + let mut builder = TaskBuilder::new(self.name); + (self.build_fn.0 .0)(&mut builder); + builder + } + /// Initialize a task builder with the name and defaults of this task, + /// using the provided task ID. + pub fn new_with_id(&self, id: Uuid) -> TaskBuilder<'static> { + let mut builder = TaskBuilder::new_with_id(id, self.name); + (self.build_fn.0 .0)(&mut builder); + builder + } + + /// Returns the name of this task. + pub const fn name(&self) -> &'static str { + self.name + } +} diff --git a/src/runner.rs b/src/runner.rs index 550ad95..bec5d7b 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -14,12 +14,13 @@ use uuid::Uuid; use crate::utils::{Opaque, OwnedTask}; +/// Type used to build a task runner. #[derive(Debug, Clone)] pub struct TaskRunnerOptions { min_concurrency: usize, max_concurrency: usize, channel_names: Option>, - runner: Opaque>, + dispatch: Opaque>, pool: Pool, keep_alive: bool, } @@ -31,6 +32,7 @@ struct TaskRunner { notify: Notify, } +/// Type used to checkpoint a running task. #[derive(Debug, Clone)] pub struct Checkpoint<'a> { duration: Duration, @@ -40,7 +42,9 @@ pub struct Checkpoint<'a> { } impl<'a> Checkpoint<'a> { - pub fn new(duration: Duration) -> Self { + /// Construct a new checkpoint which also keeps the task alive + /// for the specified interval. + pub fn new_keep_alive(duration: Duration) -> Self { Self { duration, extra_retries: 0, @@ -48,18 +52,26 @@ impl<'a> Checkpoint<'a> { payload_bytes: None, } } + /// Construct a new checkpoint. + pub fn new() -> Self { + Self::new_keep_alive(Duration::from_secs(0)) + } + /// Add extra retries to the current task. pub fn set_extra_retries(&mut self, extra_retries: usize) -> &mut Self { self.extra_retries = extra_retries; self } + /// Specify a new raw JSON payload. pub fn set_raw_json(&mut self, raw_json: &'a str) -> &mut Self { self.payload_json = Some(Cow::Borrowed(raw_json)); self } + /// Specify a new raw binary payload. pub fn set_raw_bytes(&mut self, raw_bytes: &'a [u8]) -> &mut Self { self.payload_bytes = Some(raw_bytes); self } + /// Specify a new JSON payload. pub fn set_json(&mut self, value: &T) -> Result<&mut Self, serde_json::Error> { let value = serde_json::to_string(value)?; self.payload_json = Some(Cow::Owned(value)); @@ -82,6 +94,10 @@ impl<'a> Checkpoint<'a> { } } +/// Handle to the currently executing task. +/// When dropped, the task is assumed to no longer be running. +/// To prevent the task being retried, it must be explicitly completed using +/// one of the `.complete_` methods. #[derive(Debug)] pub struct CurrentTask { id: Uuid, @@ -93,6 +109,7 @@ pub struct CurrentTask { } impl CurrentTask { + /// Returns the database pool used to receive this task. pub fn pool(&self) -> &Pool { &self.task_runner.options.pool } @@ -106,6 +123,8 @@ impl CurrentTask { .await?; Ok(()) } + /// Complete this task and commit the provided transaction at the same time. + /// If the transaction cannot be committed, the task will not be completed. pub async fn complete_with_transaction( &mut self, mut tx: sqlx::Transaction<'_, Postgres>, @@ -115,11 +134,15 @@ impl CurrentTask { self.keep_alive = None; Ok(()) } + /// Complete this task. pub async fn complete(&mut self) -> Result<(), sqlx::Error> { self.delete(self.pool()).await?; self.keep_alive = None; Ok(()) } + /// Checkpoint this task and commit the provided transaction at the same time. + /// If the transaction cannot be committed, the task will not be checkpointed. + /// Checkpointing allows the task payload to be replaced for the next retry. pub async fn checkpoint_with_transaction( &mut self, mut tx: sqlx::Transaction<'_, Postgres>, @@ -129,10 +152,12 @@ impl CurrentTask { tx.commit().await?; Ok(()) } + /// Checkpointing allows the task payload to be replaced for the next retry. pub async fn checkpoint(&mut self, checkpoint: &Checkpoint<'_>) -> Result<(), sqlx::Error> { checkpoint.execute(self.id, self.pool()).await?; Ok(()) } + /// Prevent this task from being retried for the specified interval. pub async fn keep_alive(&mut self, duration: Duration) -> Result<(), sqlx::Error> { sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)") .bind(self.id) @@ -141,12 +166,15 @@ impl CurrentTask { .await?; Ok(()) } + /// Returns the ID of this task. pub fn id(&self) -> Uuid { self.id } + /// Returns the name of this task. pub fn name(&self) -> &str { &self.name } + /// Extracts the JSON payload belonging to this task (if present). pub fn json<'a, T: Deserialize<'a>>(&'a self) -> Result, serde_json::Error> { if let Some(payload_json) = &self.payload_json { serde_json::from_str(payload_json).map(Some) @@ -154,9 +182,11 @@ impl CurrentTask { Ok(None) } } + /// Returns the raw JSON payload for this task. pub fn raw_json(&self) -> Option<&str> { self.payload_json.as_deref() } + /// Returns the raw binary payload for this task. pub fn raw_bytes(&self) -> Option<&[u8]> { self.payload_bytes.as_deref() } @@ -176,16 +206,49 @@ impl Drop for CurrentTask { } impl TaskRunnerOptions { + /// Begin constructing a new task runner using the specified connection pool, + /// and the provided execution function. pub fn new(pool: &Pool, f: F) -> Self { Self { min_concurrency: 16, max_concurrency: 32, channel_names: None, keep_alive: true, - runner: Opaque(Arc::new(f)), + dispatch: Opaque(Arc::new(f)), pool: pool.clone(), } } + /// Set the concurrency limits for this task runner. When the number of active + /// tasks falls below the minimum, the runner will poll for more, up to the maximum. + /// + /// The difference between the min and max will dictate the maximum batch size which + /// can be received: larger batch sizes are more efficient. + pub fn set_concurrency(&mut self, min_concurrency: usize, max_concurrency: usize) -> &mut Self { + self.min_concurrency = min_concurrency; + self.max_concurrency = max_concurrency; + self + } + /// Set the channel names which this task runner will subscribe to. If unspecified, + /// the task runner will subscribe to all channels. + pub fn set_channel_names<'a>(&'a mut self, channel_names: &[&str]) -> &'a mut Self { + self.channel_names = Some( + channel_names + .iter() + .copied() + .map(ToOwned::to_owned) + .collect(), + ); + self + } + /// Choose whether to automatically keep tasks alive whilst they're still + /// running. Defaults to `true`. + pub fn set_keep_alive(&mut self, keep_alive: bool) -> &mut Self { + self.keep_alive = keep_alive; + self + } + + /// Start the task runner in the background. The task runner will stop when the + /// returned handle is dropped. pub async fn run(&self) -> Result { let options = self.clone(); let task_runner = Arc::new(TaskRunner { @@ -304,7 +367,7 @@ async fn poll_and_dispatch( keep_alive, }; task_runner.running_tasks.fetch_add(1, Ordering::SeqCst); - (options.runner)(current_task); + (options.dispatch)(current_task); } } diff --git a/src/spawn.rs b/src/spawn.rs index da90905..8f636a8 100644 --- a/src/spawn.rs +++ b/src/spawn.rs @@ -6,13 +6,14 @@ use serde::Serialize; use sqlx::Postgres; use uuid::Uuid; +/// Type for building a task to send. #[derive(Debug, Clone)] pub struct TaskBuilder<'a> { id: Uuid, delay: Duration, channel_name: &'a str, channel_args: &'a str, - retries: usize, + retries: u32, retry_backoff: Duration, commit_interval: Option, ordered: bool, @@ -22,9 +23,11 @@ pub struct TaskBuilder<'a> { } impl<'a> TaskBuilder<'a> { + /// Prepare to send a task with the specified name. pub fn new(name: &'a str) -> Self { Self::new_with_id(Uuid::new_v4(), name) } + /// Prepare to send a task with the specified name and ID. pub fn new_with_id(id: Uuid, name: &'a str) -> Self { Self { id, @@ -40,47 +43,76 @@ impl<'a> TaskBuilder<'a> { payload_bytes: None, } } + /// Use the provided function to set any number of configuration + /// options at once. + pub fn set_proto<'b>( + &'b mut self, + proto: impl FnOnce(&'b mut Self) -> &'b mut Self, + ) -> &'b mut Self { + proto(self) + } + /// Set the channel name (default ""). pub fn set_channel_name(&mut self, channel_name: &'a str) -> &mut Self { self.channel_name = channel_name; self } + /// Set the channel arguments (default ""). pub fn set_channel_args(&mut self, channel_args: &'a str) -> &mut Self { self.channel_args = channel_args; self } - pub fn set_retries(&mut self, retries: usize) -> &mut Self { + /// Set the number of retries after the initial attempt (default 4). + pub fn set_retries(&mut self, retries: u32) -> &mut Self { self.retries = retries; self } + /// Set the initial backoff for retries (default 1s). pub fn set_retry_backoff(&mut self, retry_backoff: Duration) -> &mut Self { self.retry_backoff = retry_backoff; self } + /// Set the commit interval for two-phase commit (default disabled). pub fn set_commit_interval(&mut self, commit_interval: Option) -> &mut Self { self.commit_interval = commit_interval; self } + /// Set whether this task is strictly ordered with respect to other ordered + /// task in the same channel (default false). pub fn set_ordered(&mut self, ordered: bool) -> &mut Self { self.ordered = ordered; self } + + /// Set a delay before this task is executed (default none). pub fn set_delay(&mut self, delay: Duration) -> &mut Self { self.delay = delay; self } + + /// Set a raw JSON payload for the task. pub fn set_raw_json(&mut self, raw_json: &'a str) -> &mut Self { self.payload_json = Some(Cow::Borrowed(raw_json)); self } + + /// Set a raw binary payload for the task. pub fn set_raw_bytes(&mut self, raw_bytes: &'a [u8]) -> &mut Self { self.payload_bytes = Some(raw_bytes); self } - pub fn set_json(&mut self, value: &T) -> Result<&mut Self, serde_json::Error> { + + /// Set a JSON payload for the task. + pub fn set_json( + &mut self, + value: &T, + ) -> Result<&mut Self, serde_json::Error> { let value = serde_json::to_string(value)?; self.payload_json = Some(Cow::Owned(value)); Ok(self) } + + /// Spawn the task using the given executor. This might be a connection + /// pool, a connection, or a transaction. pub async fn spawn<'b, E: sqlx::Executor<'b, Database = Postgres>>( &self, executor: E, @@ -104,3 +136,16 @@ impl<'a> TaskBuilder<'a> { Ok(self.id) } } + +/// Commit the specified tasks. The tasks should have been previously spawned +/// with the two-phase commit option enabled. +pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>( + executor: E, + task_ids: &[Uuid], +) -> Result<(), sqlx::Error> { + sqlx::query("SELECT mq_commit($1)") + .bind(task_ids) + .execute(executor) + .await?; + Ok(()) +} diff --git a/src/utils.rs b/src/utils.rs index 8ab8cfc..593b486 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -27,6 +27,9 @@ impl DerefMut for Opaque { } } +/// A handle to a background task which will be automatically cancelled if +/// the handle is dropped. Extract the inner join handle to prevent this +/// behaviour. #[derive(Debug)] pub struct OwnedTask(pub JoinHandle<()>);