Rename task -> job to avoid confusion with async tasks

This commit is contained in:
Diggory Blake 2021-03-29 21:39:07 +01:00
parent 2d4a0356e6
commit 37f70afa33
9 changed files with 343 additions and 346 deletions

124
README.md
View file

@ -1,96 +1,96 @@
# sqlxmq # sqlxmq
A task queue built on `sqlx` and `PostgreSQL`. A job queue built on `sqlx` and `PostgreSQL`.
This library allows a CRUD application to run background tasks without complicating its This library allows a CRUD application to run background jobs without complicating its
deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications
already using a `PostgreSQL` database. already using a `PostgreSQL` database.
Although using a SQL database as a task queue means compromising on latency of Although using a SQL database as a job queue means compromising on latency of
delivered tasks, there are several show-stopping issues present in ordinary task delivered jobs, there are several show-stopping issues present in ordinary job
queues which are avoided altogether. queues which are avoided altogether.
With any other task queue, in-flight tasks are state that is not covered by normal With most other job queues, in-flight jobs are state that is not covered by normal
database backups. Even if tasks _are_ backed up, there is no way to restore both database backups. Even if jobs _are_ backed up, there is no way to restore both
a database and a task queue to a consistent point-in-time without manually a database and a job queue to a consistent point-in-time without manually
resolving conflicts. resolving conflicts.
By storing tasks in the database, existing backup procedures will store a perfectly By storing jobs in the database, existing backup procedures will store a perfectly
consistent state of both in-flight tasks and persistent data. Additionally, tasks can consistent state of both in-flight jobs and persistent data. Additionally, jobs can
be spawned and completed as part of other transactions, making it easy to write correct be spawned and completed as part of other transactions, making it easy to write correct
application code. application code.
Leveraging the power of `PostgreSQL`, this task queue offers several features not Leveraging the power of `PostgreSQL`, this job queue offers several features not
present in other task queues. present in other job queues.
# Features # Features
- **Send/receive multiple tasks at once.** - **Send/receive multiple jobs at once.**
This reduces the number of queries to the database. This reduces the number of queries to the database.
- **Send tasks to be executed at a future date and time.** - **Send jobs to be executed at a future date and time.**
Avoids the need for a separate scheduling system. Avoids the need for a separate scheduling system.
- **Reliable delivery of tasks.** - **Reliable delivery of jobs.**
- **Automatic retries with exponential backoff.** - **Automatic retries with exponential backoff.**
Number of retries and initial backoff parameters are configurable. Number of retries and initial backoff parameters are configurable.
- **Transactional sending of tasks.** - **Transactional sending of jobs.**
Avoids sending spurious tasks if a transaction is rolled back. Avoids sending spurious jobs if a transaction is rolled back.
- **Transactional completion of tasks.** - **Transactional completion of jobs.**
If all side-effects of a task are updates to the database, this provides If all side-effects of a job are updates to the database, this provides
true exactly-once execution of tasks. true exactly-once execution of jobs.
- **Transactional check-pointing of tasks.** - **Transactional check-pointing of jobs.**
Long-running tasks can check-point their state to avoid having to restart Long-running jobs can check-point their state to avoid having to restart
from the beginning if there is a failure: the next retry can continue from the beginning if there is a failure: the next retry can continue
from the last check-point. from the last check-point.
- **Opt-in strictly ordered task delivery.** - **Opt-in strictly ordered job delivery.**
Tasks within the same channel will be processed strictly in-order Jobs within the same channel will be processed strictly in-order
if this option is enabled for the task. if this option is enabled for the job.
- **Fair task delivery.** - **Fair job delivery.**
A channel with a lot of tasks ready to run will not starve a channel with fewer A channel with a lot of jobs ready to run will not starve a channel with fewer
tasks. jobs.
- **Opt-in two-phase commit.** - **Opt-in two-phase commit.**
This is particularly useful on an ordered channel where a position can be "reserved" This is particularly useful on an ordered channel where a position can be "reserved"
in the task order, but not committed until later. in the job order, but not committed until later.
- **JSON and/or binary payloads.** - **JSON and/or binary payloads.**
Tasks can use whichever is most convenient. Jobs can use whichever is most convenient.
- **Automatic keep-alive of tasks.** - **Automatic keep-alive of jobs.**
Long-running tasks will automatically be "kept alive" to prevent them being Long-running jobs will automatically be "kept alive" to prevent them being
retried whilst they're still ongoing. retried whilst they're still ongoing.
- **Concurrency limits.** - **Concurrency limits.**
Specify the minimum and maximum number of concurrent tasks each runner should Specify the minimum and maximum number of concurrent jobs each runner should
handle. handle.
- **Built-in task registry via an attribute macro.** - **Built-in job registry via an attribute macro.**
Tasks can be easily registered with a runner, and default configuration specified Jobs can be easily registered with a runner, and default configuration specified
on a per-task basis. on a per-job basis.
- **Implicit channels.** - **Implicit channels.**
Channels are implicitly created and destroyed when tasks are sent and processed, Channels are implicitly created and destroyed when jobs are sent and processed,
so no setup is required. so no setup is required.
- **Channel groups.** - **Channel groups.**
@ -100,75 +100,75 @@ present in other task queues.
- **NOTIFY-based polling.** - **NOTIFY-based polling.**
This saves resources when few tasks are being processed. This saves resources when few jobs are being processed.
# Getting started # Getting started
## Defining tasks ## Defining jobs
The first step is to define a function to be run on the task queue. The first step is to define a function to be run on the job queue.
```rust ```rust
use sqlxmq::{task, CurrentTask}; use sqlxmq::{job, CurrentJob};
// Arguments to the `#[task]` attribute allow setting default task options. // Arguments to the `#[job]` attribute allow setting default job options.
#[task(channel_name = "foo")] #[job(channel_name = "foo")]
async fn example_task( async fn example_job(
mut current_task: CurrentTask, mut current_job: CurrentJob,
) -> sqlx::Result<()> { ) -> sqlx::Result<()> {
// Decode a JSON payload // Decode a JSON payload
let who: Option<String> = current_task.json()?; let who: Option<String> = current_job.json()?;
// Do some work // Do some work
println!("Hello, {}!", who.as_deref().unwrap_or("world")); println!("Hello, {}!", who.as_deref().unwrap_or("world"));
// Mark the task as complete // Mark the job as complete
current_task.complete().await?; current_job.complete().await?;
Ok(()) Ok(())
} }
``` ```
## Listening for tasks ## Listening for jobs
Next we need to create a task runner: this is what listens for new tasks Next we need to create a job runner: this is what listens for new jobs
and executes them. and executes them.
```rust ```rust
use sqlxmq::TaskRegistry; use sqlxmq::JobRegistry;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
// You'll need to provide a Postgres connection pool. // You'll need to provide a Postgres connection pool.
let pool = connect_to_db().await?; let pool = connect_to_db().await?;
// Construct a task registry from our single task. // Construct a job registry from our single job.
let mut registry = TaskRegistry::new(&[example_task]); let mut registry = JobRegistry::new(&[example_job]);
// Here is where you can configure the registry // Here is where you can configure the registry
// registry.set_error_handler(...) // registry.set_error_handler(...)
let runner = registry let runner = registry
// Create a task runner using the connection pool. // Create a job runner using the connection pool.
.runner(&pool) .runner(&pool)
// Here is where you can configure the task runner // Here is where you can configure the job runner
// Aim to keep 10-20 tasks running at a time. // Aim to keep 10-20 jobs running at a time.
.set_concurrency(10, 20) .set_concurrency(10, 20)
// Start the task runner in the background. // Start the job runner in the background.
.run() .run()
.await?; .await?;
// The task runner will continue listening and running // The job runner will continue listening and running
// tasks until `runner` is dropped. // jobs until `runner` is dropped.
} }
``` ```
## Spawning a task ## Spawning a job
The final step is to actually run a task. The final step is to actually run a job.
```rust ```rust
example_task.new() example_job.new()
// This is where we override task configuration // This is where we override job configuration
.set_channel_name("bar") .set_channel_name("bar")
.set_json("John") .set_json("John")
.spawn(&pool) .spawn(&pool)

View file

@ -96,7 +96,7 @@ RETURNS TABLE(name TEXT, args TEXT) AS $$
LIMIT batch_size LIMIT batch_size
$$ LANGUAGE SQL STABLE; $$ LANGUAGE SQL STABLE;
-- Main entry-point for task runner: pulls a batch of messages from the queue. -- Main entry-point for job runner: pulls a batch of messages from the queue.
CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE( RETURNS TABLE(
id UUID, id UUID,

View file

@ -13,7 +13,7 @@ use syn::{
}; };
#[derive(Default)] #[derive(Default)]
struct TaskOptions { struct JobOptions {
proto: Option<Path>, proto: Option<Path>,
name: Option<String>, name: Option<String>,
channel_name: Option<String>, channel_name: Option<String>,
@ -28,7 +28,7 @@ enum OptionValue<'a> {
Path(&'a Path), Path(&'a Path),
} }
fn interpret_task_arg(options: &mut TaskOptions, arg: NestedMeta) -> Result<()> { fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
fn error(arg: NestedMeta) -> Result<()> { fn error(arg: NestedMeta) -> Result<()> {
Err(Error::new_spanned(arg, "Unexpected attribute argument")) Err(Error::new_spanned(arg, "Unexpected attribute argument"))
} }
@ -90,99 +90,99 @@ fn interpret_task_arg(options: &mut TaskOptions, arg: NestedMeta) -> Result<()>
Ok(()) Ok(())
} }
/// Marks a function as being a background task. /// Marks a function as being a background job.
/// ///
/// The function must take a single `CurrentTask` argument, and should /// The function must take a single `CurrentJob` argument, and should
/// be async or return a future. /// be async or return a future.
/// ///
/// The async result must be a `Result<(), E>` type, where `E` is convertible /// The async result must be a `Result<(), E>` type, where `E` is convertible
/// to a `Box<dyn Error + Send + Sync + 'static>`, which is the case for most /// to a `Box<dyn Error + Send + Sync + 'static>`, which is the case for most
/// error types. /// error types.
/// ///
/// Several options can be provided to the `#[task]` attribute: /// Several options can be provided to the `#[job]` attribute:
/// ///
/// # Name /// # Name
/// ///
/// ``` /// ```
/// #[task("example")] /// #[job("example")]
/// #[task(name="example")] /// #[job(name="example")]
/// ``` /// ```
/// ///
/// This overrides the name for this task. If unspecified, the fully-qualified /// This overrides the name for this job. If unspecified, the fully-qualified
/// name of the function is used. If you move a task to a new module or rename /// name of the function is used. If you move a job to a new module or rename
/// the function, you may which to override the task name to prevent it from /// the function, you may which to override the job name to prevent it from
/// changing. /// changing.
/// ///
/// # Channel name /// # Channel name
/// ///
/// ``` /// ```
/// #[task(channel_name="foo")] /// #[job(channel_name="foo")]
/// ``` /// ```
/// ///
/// This sets the default channel name on which the task will be spawned. /// This sets the default channel name on which the job will be spawned.
/// ///
/// # Retries /// # Retries
/// ///
/// ``` /// ```
/// #[task(retries = 3)] /// #[job(retries = 3)]
/// ``` /// ```
/// ///
/// This sets the default number of retries for the task. /// This sets the default number of retries for the job.
/// ///
/// # Retry backoff /// # Retry backoff
/// ///
/// ``` /// ```
/// #[task(backoff_secs=1.5)] /// #[job(backoff_secs=1.5)]
/// #[task(backoff_secs=2)] /// #[job(backoff_secs=2)]
/// ``` /// ```
/// ///
/// This sets the default initial retry backoff for the task in seconds. /// This sets the default initial retry backoff for the job in seconds.
/// ///
/// # Ordered /// # Ordered
/// ///
/// ``` /// ```
/// #[task(ordered)] /// #[job(ordered)]
/// #[task(ordered=true)] /// #[job(ordered=true)]
/// #[task(ordered=false)] /// #[job(ordered=false)]
/// ``` /// ```
/// ///
/// This sets whether the task will be strictly ordered by default. /// This sets whether the job will be strictly ordered by default.
/// ///
/// # Prototype /// # Prototype
/// ///
/// ``` /// ```
/// fn my_proto<'a, 'b>( /// fn my_proto<'a, 'b>(
/// builder: &'a mut TaskBuilder<'b> /// builder: &'a mut JobBuilder<'b>
/// ) -> &'a mut TaskBuilder<'b> { /// ) -> &'a mut JobBuilder<'b> {
/// builder.set_channel_name("bar") /// builder.set_channel_name("bar")
/// } /// }
/// ///
/// #[task(proto(my_proto))] /// #[job(proto(my_proto))]
/// ``` /// ```
/// ///
/// This allows setting several task options at once using the specified function, /// This allows setting several job options at once using the specified function,
/// and can be convient if you have several tasks which should have similar /// and can be convient if you have several jobs which should have similar
/// defaults. /// defaults.
/// ///
/// # Combinations /// # Combinations
/// ///
/// Multiple task options can be combined. The order is not important, but the /// Multiple job options can be combined. The order is not important, but the
/// prototype will always be applied first so that explicit options can override it. /// prototype will always be applied first so that explicit options can override it.
/// Each option can only be provided once in the attribute. /// Each option can only be provided once in the attribute.
/// ///
/// ``` /// ```
/// #[task("my_task", proto(my_proto), retries=0, ordered)] /// #[job("my_job", proto(my_proto), retries=0, ordered)]
/// ``` /// ```
/// ///
#[proc_macro_attribute] #[proc_macro_attribute]
pub fn task(attr: TokenStream, item: TokenStream) -> TokenStream { pub fn job(attr: TokenStream, item: TokenStream) -> TokenStream {
let args = parse_macro_input!(attr as AttributeArgs); let args = parse_macro_input!(attr as AttributeArgs);
let mut inner_fn = parse_macro_input!(item as ItemFn); let mut inner_fn = parse_macro_input!(item as ItemFn);
let mut options = TaskOptions::default(); let mut options = JobOptions::default();
let mut errors = Vec::new(); let mut errors = Vec::new();
for arg in args { for arg in args {
if let Err(e) = interpret_task_arg(&mut options, arg) { if let Err(e) = interpret_job_arg(&mut options, arg) {
errors.push(e.into_compile_error()); errors.push(e.into_compile_error());
} }
} }
@ -226,15 +226,15 @@ pub fn task(attr: TokenStream, item: TokenStream) -> TokenStream {
let expanded = quote! { let expanded = quote! {
#(#errors)* #(#errors)*
#[allow(non_upper_case_globals)] #[allow(non_upper_case_globals)]
#vis static #name: &'static sqlxmq::NamedTask = &{ #vis static #name: &'static sqlxmq::NamedJob = &{
#inner_fn #inner_fn
sqlxmq::NamedTask::new_internal( sqlxmq::NamedJob::new_internal(
#fq_name, #fq_name,
sqlxmq::hidden::BuildFn(|builder| { sqlxmq::hidden::BuildFn(|builder| {
builder #(#chain)* builder #(#chain)*
}), }),
sqlxmq::hidden::RunFn(|registry, current_task| { sqlxmq::hidden::RunFn(|registry, current_job| {
registry.spawn_internal(#fq_name, inner(current_task)); registry.spawn_internal(#fq_name, inner(current_job));
}), }),
) )
}; };

View file

@ -1,6 +1,6 @@
use crate::{CurrentTask, TaskBuilder, TaskRegistry}; use crate::{CurrentJob, JobBuilder, JobRegistry};
#[doc(hidden)] #[doc(hidden)]
pub struct BuildFn(pub for<'a> fn(&'a mut TaskBuilder<'static>) -> &'a mut TaskBuilder<'static>); pub struct BuildFn(pub for<'a> fn(&'a mut JobBuilder<'static>) -> &'a mut JobBuilder<'static>);
#[doc(hidden)] #[doc(hidden)]
pub struct RunFn(pub fn(&TaskRegistry, CurrentTask)); pub struct RunFn(pub fn(&JobRegistry, CurrentJob));

View file

@ -1,97 +1,97 @@
#![deny(missing_docs, unsafe_code)] #![deny(missing_docs, unsafe_code)]
//! # sqlxmq //! # sqlxmq
//! //!
//! A task queue built on `sqlx` and `PostgreSQL`. //! A job queue built on `sqlx` and `PostgreSQL`.
//! //!
//! This library allows a CRUD application to run background tasks without complicating its //! This library allows a CRUD application to run background jobs without complicating its
//! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications //! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications
//! already using a `PostgreSQL` database. //! already using a `PostgreSQL` database.
//! //!
//! Although using a SQL database as a task queue means compromising on latency of //! Although using a SQL database as a job queue means compromising on latency of
//! delivered tasks, there are several show-stopping issues present in ordinary task //! delivered jobs, there are several show-stopping issues present in ordinary job
//! queues which are avoided altogether. //! queues which are avoided altogether.
//! //!
//! With any other task queue, in-flight tasks are state that is not covered by normal //! With most other job queues, in-flight jobs are state that is not covered by normal
//! database backups. Even if tasks _are_ backed up, there is no way to restore both //! database backups. Even if jobs _are_ backed up, there is no way to restore both
//! a database and a task queue to a consistent point-in-time without manually //! a database and a job queue to a consistent point-in-time without manually
//! resolving conflicts. //! resolving conflicts.
//! //!
//! By storing tasks in the database, existing backup procedures will store a perfectly //! By storing jobs in the database, existing backup procedures will store a perfectly
//! consistent state of both in-flight tasks and persistent data. Additionally, tasks can //! consistent state of both in-flight jobs and persistent data. Additionally, jobs can
//! be spawned and completed as part of other transactions, making it easy to write correct //! be spawned and completed as part of other transactions, making it easy to write correct
//! application code. //! application code.
//! //!
//! Leveraging the power of `PostgreSQL`, this task queue offers several features not //! Leveraging the power of `PostgreSQL`, this job queue offers several features not
//! present in other task queues. //! present in other job queues.
//! //!
//! # Features //! # Features
//! //!
//! - **Send/receive multiple tasks at once.** //! - **Send/receive multiple jobs at once.**
//! //!
//! This reduces the number of queries to the database. //! This reduces the number of queries to the database.
//! //!
//! - **Send tasks to be executed at a future date and time.** //! - **Send jobs to be executed at a future date and time.**
//! //!
//! Avoids the need for a separate scheduling system. //! Avoids the need for a separate scheduling system.
//! //!
//! - **Reliable delivery of tasks.** //! - **Reliable delivery of jobs.**
//! //!
//! - **Automatic retries with exponential backoff.** //! - **Automatic retries with exponential backoff.**
//! //!
//! Number of retries and initial backoff parameters are configurable. //! Number of retries and initial backoff parameters are configurable.
//! //!
//! - **Transactional sending of tasks.** //! - **Transactional sending of jobs.**
//! //!
//! Avoids sending spurious tasks if a transaction is rolled back. //! Avoids sending spurious jobs if a transaction is rolled back.
//! //!
//! - **Transactional completion of tasks.** //! - **Transactional completion of jobs.**
//! //!
//! If all side-effects of a task are updates to the database, this provides //! If all side-effects of a job are updates to the database, this provides
//! true exactly-once execution of tasks. //! true exactly-once execution of jobs.
//! //!
//! - **Transactional check-pointing of tasks.** //! - **Transactional check-pointing of jobs.**
//! //!
//! Long-running tasks can check-point their state to avoid having to restart //! Long-running jobs can check-point their state to avoid having to restart
//! from the beginning if there is a failure: the next retry can continue //! from the beginning if there is a failure: the next retry can continue
//! from the last check-point. //! from the last check-point.
//! //!
//! - **Opt-in strictly ordered task delivery.** //! - **Opt-in strictly ordered job delivery.**
//! //!
//! Tasks within the same channel will be processed strictly in-order //! Jobs within the same channel will be processed strictly in-order
//! if this option is enabled for the task. //! if this option is enabled for the job.
//! //!
//! - **Fair task delivery.** //! - **Fair job delivery.**
//! //!
//! A channel with a lot of tasks ready to run will not starve a channel with fewer //! A channel with a lot of jobs ready to run will not starve a channel with fewer
//! tasks. //! jobs.
//! //!
//! - **Opt-in two-phase commit.** //! - **Opt-in two-phase commit.**
//! //!
//! This is particularly useful on an ordered channel where a position can be "reserved" //! This is particularly useful on an ordered channel where a position can be "reserved"
//! in the task order, but not committed until later. //! in the job order, but not committed until later.
//! //!
//! - **JSON and/or binary payloads.** //! - **JSON and/or binary payloads.**
//! //!
//! Tasks can use whichever is most convenient. //! Jobs can use whichever is most convenient.
//! //!
//! - **Automatic keep-alive of tasks.** //! - **Automatic keep-alive of jobs.**
//! //!
//! Long-running tasks will automatically be "kept alive" to prevent them being //! Long-running jobs will automatically be "kept alive" to prevent them being
//! retried whilst they're still ongoing. //! retried whilst they're still ongoing.
//! //!
//! - **Concurrency limits.** //! - **Concurrency limits.**
//! //!
//! Specify the minimum and maximum number of concurrent tasks each runner should //! Specify the minimum and maximum number of concurrent jobs each runner should
//! handle. //! handle.
//! //!
//! - **Built-in task registry via an attribute macro.** //! - **Built-in job registry via an attribute macro.**
//! //!
//! Tasks can be easily registered with a runner, and default configuration specified //! Jobs can be easily registered with a runner, and default configuration specified
//! on a per-task basis. //! on a per-job basis.
//! //!
//! - **Implicit channels.** //! - **Implicit channels.**
//! //!
//! Channels are implicitly created and destroyed when tasks are sent and processed, //! Channels are implicitly created and destroyed when jobs are sent and processed,
//! so no setup is required. //! so no setup is required.
//! //!
//! - **Channel groups.** //! - **Channel groups.**
@ -101,75 +101,75 @@
//! //!
//! - **NOTIFY-based polling.** //! - **NOTIFY-based polling.**
//! //!
//! This saves resources when few tasks are being processed. //! This saves resources when few jobs are being processed.
//! //!
//! # Getting started //! # Getting started
//! //!
//! ## Defining tasks //! ## Defining jobs
//! //!
//! The first step is to define a function to be run on the task queue. //! The first step is to define a function to be run on the job queue.
//! //!
//! ```rust //! ```rust
//! use sqlxmq::{task, CurrentTask}; //! use sqlxmq::{job, CurrentJob};
//! //!
//! // Arguments to the `#[task]` attribute allow setting default task options. //! // Arguments to the `#[job]` attribute allow setting default job options.
//! #[task(channel_name = "foo")] //! #[job(channel_name = "foo")]
//! async fn example_task( //! async fn example_job(
//! mut current_task: CurrentTask, //! mut current_job: CurrentJob,
//! ) -> sqlx::Result<()> { //! ) -> sqlx::Result<()> {
//! // Decode a JSON payload //! // Decode a JSON payload
//! let who: Option<String> = current_task.json()?; //! let who: Option<String> = current_job.json()?;
//! //!
//! // Do some work //! // Do some work
//! println!("Hello, {}!", who.as_deref().unwrap_or("world")); //! println!("Hello, {}!", who.as_deref().unwrap_or("world"));
//! //!
//! // Mark the task as complete //! // Mark the job as complete
//! current_task.complete().await?; //! current_job.complete().await?;
//! //!
//! Ok(()) //! Ok(())
//! } //! }
//! ``` //! ```
//! //!
//! ## Listening for tasks //! ## Listening for jobs
//! //!
//! Next we need to create a task runner: this is what listens for new tasks //! Next we need to create a job runner: this is what listens for new jobs
//! and executes them. //! and executes them.
//! //!
//! ```rust //! ```rust
//! use sqlxmq::TaskRegistry; //! use sqlxmq::JobRegistry;
//! //!
//! #[tokio::main] //! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn Error>> { //! async fn main() -> Result<(), Box<dyn Error>> {
//! // You'll need to provide a Postgres connection pool. //! // You'll need to provide a Postgres connection pool.
//! let pool = connect_to_db().await?; //! let pool = connect_to_db().await?;
//! //!
//! // Construct a task registry from our single task. //! // Construct a job registry from our single job.
//! let mut registry = TaskRegistry::new(&[example_task]); //! let mut registry = JobRegistry::new(&[example_job]);
//! // Here is where you can configure the registry //! // Here is where you can configure the registry
//! // registry.set_error_handler(...) //! // registry.set_error_handler(...)
//! //!
//! let runner = registry //! let runner = registry
//! // Create a task runner using the connection pool. //! // Create a job runner using the connection pool.
//! .runner(&pool) //! .runner(&pool)
//! // Here is where you can configure the task runner //! // Here is where you can configure the job runner
//! // Aim to keep 10-20 tasks running at a time. //! // Aim to keep 10-20 jobs running at a time.
//! .set_concurrency(10, 20) //! .set_concurrency(10, 20)
//! // Start the task runner in the background. //! // Start the job runner in the background.
//! .run() //! .run()
//! .await?; //! .await?;
//! //!
//! // The task runner will continue listening and running //! // The job runner will continue listening and running
//! // tasks until `runner` is dropped. //! // jobs until `runner` is dropped.
//! } //! }
//! ``` //! ```
//! //!
//! ## Spawning a task //! ## Spawning a job
//! //!
//! The final step is to actually run a task. //! The final step is to actually run a job.
//! //!
//! ```rust //! ```rust
//! example_task.new() //! example_job.new()
//! // This is where we override task configuration //! // This is where we override job configuration
//! .set_channel_name("bar") //! .set_channel_name("bar")
//! .set_json("John") //! .set_json("John")
//! .spawn(&pool) //! .spawn(&pool)
@ -186,8 +186,8 @@ mod utils;
pub use registry::*; pub use registry::*;
pub use runner::*; pub use runner::*;
pub use spawn::*; pub use spawn::*;
pub use sqlxmq_macros::task; pub use sqlxmq_macros::job;
pub use utils::OwnedTask; pub use utils::OwnedHandle;
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@ -244,18 +244,18 @@ mod tests {
TestGuard(guard, pool) TestGuard(guard, pool)
} }
async fn test_task_runner<F: Future + Send + 'static>( async fn test_job_runner<F: Future + Send + 'static>(
pool: &Pool<Postgres>, pool: &Pool<Postgres>,
f: impl (Fn(CurrentTask) -> F) + Send + Sync + 'static, f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
) -> (OwnedTask, Arc<AtomicUsize>) ) -> (OwnedHandle, Arc<AtomicUsize>)
where where
F::Output: Send + 'static, F::Output: Send + 'static,
{ {
let counter = Arc::new(AtomicUsize::new(0)); let counter = Arc::new(AtomicUsize::new(0));
let counter2 = counter.clone(); let counter2 = counter.clone();
let runner = TaskRunnerOptions::new(pool, move |task| { let runner = JobRunnerOptions::new(pool, move |job| {
counter2.fetch_add(1, Ordering::SeqCst); counter2.fetch_add(1, Ordering::SeqCst);
task::spawn(f(task)); task::spawn(f(job));
}) })
.run() .run()
.await .await
@ -263,28 +263,28 @@ mod tests {
(runner, counter) (runner, counter)
} }
fn task_proto<'a, 'b>(builder: &'a mut TaskBuilder<'b>) -> &'a mut TaskBuilder<'b> { fn job_proto<'a, 'b>(builder: &'a mut JobBuilder<'b>) -> &'a mut JobBuilder<'b> {
builder.set_channel_name("bar") builder.set_channel_name("bar")
} }
#[task(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)] #[job(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
async fn example_task1( async fn example_job1(
mut current_task: CurrentTask, mut current_job: CurrentJob,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
current_task.complete().await?; current_job.complete().await?;
Ok(()) Ok(())
} }
#[task(proto(task_proto))] #[job(proto(job_proto))]
async fn example_task2( async fn example_job2(
mut current_task: CurrentTask, mut current_job: CurrentJob,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
current_task.complete().await?; current_job.complete().await?;
Ok(()) Ok(())
} }
async fn named_task_runner(pool: &Pool<Postgres>) -> OwnedTask { async fn named_job_runner(pool: &Pool<Postgres>) -> OwnedHandle {
TaskRegistry::new(&[example_task1, example_task2]) JobRegistry::new(&[example_job1, example_job2])
.runner(pool) .runner(pool)
.run() .run()
.await .await
@ -300,37 +300,37 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn it_can_spawn_task() { async fn it_can_spawn_job() {
let pool = &*test_pool().await; let pool = &*test_pool().await;
let (_runner, counter) = let (_runner, counter) =
test_task_runner(&pool, |mut task| async move { task.complete().await }).await; test_job_runner(&pool, |mut job| async move { job.complete().await }).await;
assert_eq!(counter.load(Ordering::SeqCst), 0); assert_eq!(counter.load(Ordering::SeqCst), 0);
TaskBuilder::new("foo").spawn(pool).await.unwrap(); JobBuilder::new("foo").spawn(pool).await.unwrap();
pause().await; pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 1); assert_eq!(counter.load(Ordering::SeqCst), 1);
} }
#[tokio::test] #[tokio::test]
async fn it_runs_tasks_in_order() { async fn it_runs_jobs_in_order() {
let pool = &*test_pool().await; let pool = &*test_pool().await;
let (tx, mut rx) = mpsc::unbounded(); let (tx, mut rx) = mpsc::unbounded();
let (_runner, counter) = test_task_runner(&pool, move |task| { let (_runner, counter) = test_job_runner(&pool, move |job| {
let tx = tx.clone(); let tx = tx.clone();
async move { async move {
tx.unbounded_send(task).unwrap(); tx.unbounded_send(job).unwrap();
} }
}) })
.await; .await;
assert_eq!(counter.load(Ordering::SeqCst), 0); assert_eq!(counter.load(Ordering::SeqCst), 0);
TaskBuilder::new("foo") JobBuilder::new("foo")
.set_ordered(true) .set_ordered(true)
.spawn(pool) .spawn(pool)
.await .await
.unwrap(); .unwrap();
TaskBuilder::new("bar") JobBuilder::new("bar")
.set_ordered(true) .set_ordered(true)
.spawn(pool) .spawn(pool)
.await .await
@ -339,48 +339,48 @@ mod tests {
pause().await; pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 1); assert_eq!(counter.load(Ordering::SeqCst), 1);
let mut task = rx.next().await.unwrap(); let mut job = rx.next().await.unwrap();
task.complete().await.unwrap(); job.complete().await.unwrap();
pause().await; pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 2); assert_eq!(counter.load(Ordering::SeqCst), 2);
} }
#[tokio::test] #[tokio::test]
async fn it_runs_tasks_in_parallel() { async fn it_runs_jobs_in_parallel() {
let pool = &*test_pool().await; let pool = &*test_pool().await;
let (tx, mut rx) = mpsc::unbounded(); let (tx, mut rx) = mpsc::unbounded();
let (_runner, counter) = test_task_runner(&pool, move |task| { let (_runner, counter) = test_job_runner(&pool, move |job| {
let tx = tx.clone(); let tx = tx.clone();
async move { async move {
tx.unbounded_send(task).unwrap(); tx.unbounded_send(job).unwrap();
} }
}) })
.await; .await;
assert_eq!(counter.load(Ordering::SeqCst), 0); assert_eq!(counter.load(Ordering::SeqCst), 0);
TaskBuilder::new("foo").spawn(pool).await.unwrap(); JobBuilder::new("foo").spawn(pool).await.unwrap();
TaskBuilder::new("bar").spawn(pool).await.unwrap(); JobBuilder::new("bar").spawn(pool).await.unwrap();
pause().await; pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 2); assert_eq!(counter.load(Ordering::SeqCst), 2);
for _ in 0..2 { for _ in 0..2 {
let mut task = rx.next().await.unwrap(); let mut job = rx.next().await.unwrap();
task.complete().await.unwrap(); job.complete().await.unwrap();
} }
} }
#[tokio::test] #[tokio::test]
async fn it_retries_failed_tasks() { async fn it_retries_failed_jobs() {
let pool = &*test_pool().await; let pool = &*test_pool().await;
let (_runner, counter) = test_task_runner(&pool, move |_| async {}).await; let (_runner, counter) = test_job_runner(&pool, move |_| async {}).await;
let backoff = 200; let backoff = 200;
assert_eq!(counter.load(Ordering::SeqCst), 0); assert_eq!(counter.load(Ordering::SeqCst), 0);
TaskBuilder::new("foo") JobBuilder::new("foo")
.set_retry_backoff(Duration::from_millis(backoff)) .set_retry_backoff(Duration::from_millis(backoff))
.set_retries(2) .set_retries(2)
.spawn(pool) .spawn(pool)
@ -407,14 +407,14 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn it_can_checkpoint_tasks() { async fn it_can_checkpoint_jobs() {
let pool = &*test_pool().await; let pool = &*test_pool().await;
let (_runner, counter) = test_task_runner(&pool, move |mut current_task| async move { let (_runner, counter) = test_job_runner(&pool, move |mut current_job| async move {
let state: bool = current_task.json().unwrap().unwrap(); let state: bool = current_job.json().unwrap().unwrap();
if state { if state {
current_task.complete().await.unwrap(); current_job.complete().await.unwrap();
} else { } else {
current_task current_job
.checkpoint(Checkpoint::new().set_json(&true).unwrap()) .checkpoint(Checkpoint::new().set_json(&true).unwrap())
.await .await
.unwrap(); .unwrap();
@ -425,7 +425,7 @@ mod tests {
let backoff = 200; let backoff = 200;
assert_eq!(counter.load(Ordering::SeqCst), 0); assert_eq!(counter.load(Ordering::SeqCst), 0);
TaskBuilder::new("foo") JobBuilder::new("foo")
.set_retry_backoff(Duration::from_millis(backoff)) .set_retry_backoff(Duration::from_millis(backoff))
.set_retries(5) .set_retries(5)
.set_json(&false) .set_json(&false)
@ -451,10 +451,10 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn it_can_use_registry() { async fn it_can_use_registry() {
let pool = &*test_pool().await; let pool = &*test_pool().await;
let _runner = named_task_runner(pool).await; let _runner = named_job_runner(pool).await;
example_task1.new().spawn(pool).await.unwrap(); example_job1.new().spawn(pool).await.unwrap();
example_task2.new().spawn(pool).await.unwrap(); example_job2.new().spawn(pool).await.unwrap();
pause().await; pause().await;
} }
} }

View file

@ -9,42 +9,42 @@ use uuid::Uuid;
use crate::hidden::{BuildFn, RunFn}; use crate::hidden::{BuildFn, RunFn};
use crate::utils::Opaque; use crate::utils::Opaque;
use crate::{TaskBuilder, TaskRunnerOptions}; use crate::{JobBuilder, JobRunnerOptions};
/// Stores a mapping from task name to task. Can be used to construct /// Stores a mapping from job name to job. Can be used to construct
/// a task runner. /// a job runner.
pub struct TaskRegistry { pub struct JobRegistry {
error_handler: Arc<dyn Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync>, error_handler: Arc<dyn Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync>,
task_map: HashMap<&'static str, &'static NamedTask>, job_map: HashMap<&'static str, &'static NamedJob>,
} }
/// Error returned when a task is received whose name is not in the registry. /// Error returned when a job is received whose name is not in the registry.
#[derive(Debug)] #[derive(Debug)]
pub struct UnknownTaskError; pub struct UnknownJobError;
impl Error for UnknownTaskError {} impl Error for UnknownJobError {}
impl Display for UnknownTaskError { impl Display for UnknownJobError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("Unknown task") f.write_str("Unknown job")
} }
} }
impl TaskRegistry { impl JobRegistry {
/// Construct a new task registry from the provided task list. /// Construct a new job registry from the provided job list.
pub fn new(tasks: &[&'static NamedTask]) -> Self { pub fn new(jobs: &[&'static NamedJob]) -> Self {
let mut task_map = HashMap::new(); let mut job_map = HashMap::new();
for &task in tasks { for &job in jobs {
if task_map.insert(task.name(), task).is_some() { if job_map.insert(job.name(), job).is_some() {
panic!("Duplicate task registered: {}", task.name()); panic!("Duplicate job registered: {}", job.name());
} }
} }
Self { Self {
error_handler: Arc::new(Self::default_error_handler), error_handler: Arc::new(Self::default_error_handler),
task_map, job_map,
} }
} }
/// Set a function to be called whenever a task returns an error. /// Set a function to be called whenever a job returns an error.
pub fn set_error_handler( pub fn set_error_handler(
&mut self, &mut self,
error_handler: impl Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync + 'static, error_handler: impl Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync + 'static,
@ -53,14 +53,14 @@ impl TaskRegistry {
self self
} }
/// Look-up a task by name. /// Look-up a job by name.
pub fn resolve_task(&self, name: &str) -> Option<&'static NamedTask> { pub fn resolve_job(&self, name: &str) -> Option<&'static NamedJob> {
self.task_map.get(name).copied() self.job_map.get(name).copied()
} }
/// The default error handler implementation, which simply logs the error. /// The default error handler implementation, which simply logs the error.
pub fn default_error_handler(name: &str, error: Box<dyn Error + Send + 'static>) { pub fn default_error_handler(name: &str, error: Box<dyn Error + Send + 'static>) {
log::error!("Task {} failed: {}", name, error); log::error!("Job {} failed: {}", name, error);
} }
#[doc(hidden)] #[doc(hidden)]
@ -77,29 +77,29 @@ impl TaskRegistry {
}); });
} }
/// Construct a task runner from this registry and the provided connection /// Construct a job runner from this registry and the provided connection
/// pool. /// pool.
pub fn runner(self, pool: &Pool<Postgres>) -> TaskRunnerOptions { pub fn runner(self, pool: &Pool<Postgres>) -> JobRunnerOptions {
TaskRunnerOptions::new(pool, move |current_task| { JobRunnerOptions::new(pool, move |current_job| {
if let Some(task) = self.resolve_task(current_task.name()) { if let Some(job) = self.resolve_job(current_job.name()) {
(task.run_fn.0 .0)(&self, current_task); (job.run_fn.0 .0)(&self, current_job);
} else { } else {
(self.error_handler)(current_task.name(), Box::new(UnknownTaskError)) (self.error_handler)(current_job.name(), Box::new(UnknownJobError))
} }
}) })
} }
} }
/// Type for a named task. Functions annotated with `#[task]` are /// Type for a named job. Functions annotated with `#[job]` are
/// transformed into static variables whose type is `&'static NamedTask`. /// transformed into static variables whose type is `&'static NamedJob`.
#[derive(Debug)] #[derive(Debug)]
pub struct NamedTask { pub struct NamedJob {
name: &'static str, name: &'static str,
build_fn: Opaque<BuildFn>, build_fn: Opaque<BuildFn>,
run_fn: Opaque<RunFn>, run_fn: Opaque<RunFn>,
} }
impl NamedTask { impl NamedJob {
#[doc(hidden)] #[doc(hidden)]
pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self { pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self {
Self { Self {
@ -108,21 +108,21 @@ impl NamedTask {
run_fn: Opaque(run_fn), run_fn: Opaque(run_fn),
} }
} }
/// Initialize a task builder with the name and defaults of this task. /// Initialize a job builder with the name and defaults of this job.
pub fn new(&self) -> TaskBuilder<'static> { pub fn new(&self) -> JobBuilder<'static> {
let mut builder = TaskBuilder::new(self.name); let mut builder = JobBuilder::new(self.name);
(self.build_fn.0 .0)(&mut builder); (self.build_fn.0 .0)(&mut builder);
builder builder
} }
/// Initialize a task builder with the name and defaults of this task, /// Initialize a job builder with the name and defaults of this job,
/// using the provided task ID. /// using the provided job ID.
pub fn new_with_id(&self, id: Uuid) -> TaskBuilder<'static> { pub fn new_with_id(&self, id: Uuid) -> JobBuilder<'static> {
let mut builder = TaskBuilder::new_with_id(id, self.name); let mut builder = JobBuilder::new_with_id(id, self.name);
(self.build_fn.0 .0)(&mut builder); (self.build_fn.0 .0)(&mut builder);
builder builder
} }
/// Returns the name of this task. /// Returns the name of this job.
pub const fn name(&self) -> &'static str { pub const fn name(&self) -> &'static str {
self.name self.name
} }

View file

@ -12,27 +12,27 @@ use tokio::sync::Notify;
use tokio::task; use tokio::task;
use uuid::Uuid; use uuid::Uuid;
use crate::utils::{Opaque, OwnedTask}; use crate::utils::{Opaque, OwnedHandle};
/// Type used to build a task runner. /// Type used to build a job runner.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TaskRunnerOptions { pub struct JobRunnerOptions {
min_concurrency: usize, min_concurrency: usize,
max_concurrency: usize, max_concurrency: usize,
channel_names: Option<Vec<String>>, channel_names: Option<Vec<String>>,
dispatch: Opaque<Arc<dyn Fn(CurrentTask) + Send + Sync + 'static>>, dispatch: Opaque<Arc<dyn Fn(CurrentJob) + Send + Sync + 'static>>,
pool: Pool<Postgres>, pool: Pool<Postgres>,
keep_alive: bool, keep_alive: bool,
} }
#[derive(Debug)] #[derive(Debug)]
struct TaskRunner { struct JobRunner {
options: TaskRunnerOptions, options: JobRunnerOptions,
running_tasks: AtomicUsize, running_jobs: AtomicUsize,
notify: Notify, notify: Notify,
} }
/// Type used to checkpoint a running task. /// Type used to checkpoint a running job.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Checkpoint<'a> { pub struct Checkpoint<'a> {
duration: Duration, duration: Duration,
@ -42,7 +42,7 @@ pub struct Checkpoint<'a> {
} }
impl<'a> Checkpoint<'a> { impl<'a> Checkpoint<'a> {
/// Construct a new checkpoint which also keeps the task alive /// Construct a new checkpoint which also keeps the job alive
/// for the specified interval. /// for the specified interval.
pub fn new_keep_alive(duration: Duration) -> Self { pub fn new_keep_alive(duration: Duration) -> Self {
Self { Self {
@ -56,7 +56,7 @@ impl<'a> Checkpoint<'a> {
pub fn new() -> Self { pub fn new() -> Self {
Self::new_keep_alive(Duration::from_secs(0)) Self::new_keep_alive(Duration::from_secs(0))
} }
/// Add extra retries to the current task. /// Add extra retries to the current job.
pub fn set_extra_retries(&mut self, extra_retries: usize) -> &mut Self { pub fn set_extra_retries(&mut self, extra_retries: usize) -> &mut Self {
self.extra_retries = extra_retries; self.extra_retries = extra_retries;
self self
@ -79,11 +79,11 @@ impl<'a> Checkpoint<'a> {
} }
async fn execute<'b, E: sqlx::Executor<'b, Database = Postgres>>( async fn execute<'b, E: sqlx::Executor<'b, Database = Postgres>>(
&self, &self,
task_id: Uuid, job_id: Uuid,
executor: E, executor: E,
) -> Result<(), sqlx::Error> { ) -> Result<(), sqlx::Error> {
sqlx::query("SELECT mq_checkpoint($1, $2, $3, $4, $5)") sqlx::query("SELECT mq_checkpoint($1, $2, $3, $4, $5)")
.bind(task_id) .bind(job_id)
.bind(self.duration) .bind(self.duration)
.bind(self.payload_json.as_deref()) .bind(self.payload_json.as_deref())
.bind(self.payload_bytes) .bind(self.payload_bytes)
@ -94,24 +94,24 @@ impl<'a> Checkpoint<'a> {
} }
} }
/// Handle to the currently executing task. /// Handle to the currently executing job.
/// When dropped, the task is assumed to no longer be running. /// When dropped, the job is assumed to no longer be running.
/// To prevent the task being retried, it must be explicitly completed using /// To prevent the job being retried, it must be explicitly completed using
/// one of the `.complete_` methods. /// one of the `.complete_` methods.
#[derive(Debug)] #[derive(Debug)]
pub struct CurrentTask { pub struct CurrentJob {
id: Uuid, id: Uuid,
name: String, name: String,
payload_json: Option<String>, payload_json: Option<String>,
payload_bytes: Option<Vec<u8>>, payload_bytes: Option<Vec<u8>>,
task_runner: Arc<TaskRunner>, job_runner: Arc<JobRunner>,
keep_alive: Option<OwnedTask>, keep_alive: Option<OwnedHandle>,
} }
impl CurrentTask { impl CurrentJob {
/// Returns the database pool used to receive this task. /// Returns the database pool used to receive this job.
pub fn pool(&self) -> &Pool<Postgres> { pub fn pool(&self) -> &Pool<Postgres> {
&self.task_runner.options.pool &self.job_runner.options.pool
} }
async fn delete( async fn delete(
&self, &self,
@ -123,8 +123,8 @@ impl CurrentTask {
.await?; .await?;
Ok(()) Ok(())
} }
/// Complete this task and commit the provided transaction at the same time. /// Complete this job and commit the provided transaction at the same time.
/// If the transaction cannot be committed, the task will not be completed. /// If the transaction cannot be committed, the job will not be completed.
pub async fn complete_with_transaction( pub async fn complete_with_transaction(
&mut self, &mut self,
mut tx: sqlx::Transaction<'_, Postgres>, mut tx: sqlx::Transaction<'_, Postgres>,
@ -134,15 +134,15 @@ impl CurrentTask {
self.keep_alive = None; self.keep_alive = None;
Ok(()) Ok(())
} }
/// Complete this task. /// Complete this job.
pub async fn complete(&mut self) -> Result<(), sqlx::Error> { pub async fn complete(&mut self) -> Result<(), sqlx::Error> {
self.delete(self.pool()).await?; self.delete(self.pool()).await?;
self.keep_alive = None; self.keep_alive = None;
Ok(()) Ok(())
} }
/// Checkpoint this task and commit the provided transaction at the same time. /// Checkpoint this job and commit the provided transaction at the same time.
/// If the transaction cannot be committed, the task will not be checkpointed. /// If the transaction cannot be committed, the job will not be checkpointed.
/// Checkpointing allows the task payload to be replaced for the next retry. /// Checkpointing allows the job payload to be replaced for the next retry.
pub async fn checkpoint_with_transaction( pub async fn checkpoint_with_transaction(
&mut self, &mut self,
mut tx: sqlx::Transaction<'_, Postgres>, mut tx: sqlx::Transaction<'_, Postgres>,
@ -152,12 +152,12 @@ impl CurrentTask {
tx.commit().await?; tx.commit().await?;
Ok(()) Ok(())
} }
/// Checkpointing allows the task payload to be replaced for the next retry. /// Checkpointing allows the job payload to be replaced for the next retry.
pub async fn checkpoint(&mut self, checkpoint: &Checkpoint<'_>) -> Result<(), sqlx::Error> { pub async fn checkpoint(&mut self, checkpoint: &Checkpoint<'_>) -> Result<(), sqlx::Error> {
checkpoint.execute(self.id, self.pool()).await?; checkpoint.execute(self.id, self.pool()).await?;
Ok(()) Ok(())
} }
/// Prevent this task from being retried for the specified interval. /// Prevent this job from being retried for the specified interval.
pub async fn keep_alive(&mut self, duration: Duration) -> Result<(), sqlx::Error> { pub async fn keep_alive(&mut self, duration: Duration) -> Result<(), sqlx::Error> {
sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)") sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)")
.bind(self.id) .bind(self.id)
@ -166,15 +166,15 @@ impl CurrentTask {
.await?; .await?;
Ok(()) Ok(())
} }
/// Returns the ID of this task. /// Returns the ID of this job.
pub fn id(&self) -> Uuid { pub fn id(&self) -> Uuid {
self.id self.id
} }
/// Returns the name of this task. /// Returns the name of this job.
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
&self.name &self.name
} }
/// Extracts the JSON payload belonging to this task (if present). /// Extracts the JSON payload belonging to this job (if present).
pub fn json<'a, T: Deserialize<'a>>(&'a self) -> Result<Option<T>, serde_json::Error> { pub fn json<'a, T: Deserialize<'a>>(&'a self) -> Result<Option<T>, serde_json::Error> {
if let Some(payload_json) = &self.payload_json { if let Some(payload_json) = &self.payload_json {
serde_json::from_str(payload_json).map(Some) serde_json::from_str(payload_json).map(Some)
@ -182,33 +182,30 @@ impl CurrentTask {
Ok(None) Ok(None)
} }
} }
/// Returns the raw JSON payload for this task. /// Returns the raw JSON payload for this job.
pub fn raw_json(&self) -> Option<&str> { pub fn raw_json(&self) -> Option<&str> {
self.payload_json.as_deref() self.payload_json.as_deref()
} }
/// Returns the raw binary payload for this task. /// Returns the raw binary payload for this job.
pub fn raw_bytes(&self) -> Option<&[u8]> { pub fn raw_bytes(&self) -> Option<&[u8]> {
self.payload_bytes.as_deref() self.payload_bytes.as_deref()
} }
} }
impl Drop for CurrentTask { impl Drop for CurrentJob {
fn drop(&mut self) { fn drop(&mut self) {
if self if self.job_runner.running_jobs.fetch_sub(1, Ordering::SeqCst)
.task_runner == self.job_runner.options.min_concurrency
.running_tasks
.fetch_sub(1, Ordering::SeqCst)
== self.task_runner.options.min_concurrency
{ {
self.task_runner.notify.notify_one(); self.job_runner.notify.notify_one();
} }
} }
} }
impl TaskRunnerOptions { impl JobRunnerOptions {
/// Begin constructing a new task runner using the specified connection pool, /// Begin constructing a new job runner using the specified connection pool,
/// and the provided execution function. /// and the provided execution function.
pub fn new<F: Fn(CurrentTask) + Send + Sync + 'static>(pool: &Pool<Postgres>, f: F) -> Self { pub fn new<F: Fn(CurrentJob) + Send + Sync + 'static>(pool: &Pool<Postgres>, f: F) -> Self {
Self { Self {
min_concurrency: 16, min_concurrency: 16,
max_concurrency: 32, max_concurrency: 32,
@ -218,8 +215,8 @@ impl TaskRunnerOptions {
pool: pool.clone(), pool: pool.clone(),
} }
} }
/// Set the concurrency limits for this task runner. When the number of active /// Set the concurrency limits for this job runner. When the number of active
/// tasks falls below the minimum, the runner will poll for more, up to the maximum. /// jobs falls below the minimum, the runner will poll for more, up to the maximum.
/// ///
/// The difference between the min and max will dictate the maximum batch size which /// The difference between the min and max will dictate the maximum batch size which
/// can be received: larger batch sizes are more efficient. /// can be received: larger batch sizes are more efficient.
@ -228,8 +225,8 @@ impl TaskRunnerOptions {
self.max_concurrency = max_concurrency; self.max_concurrency = max_concurrency;
self self
} }
/// Set the channel names which this task runner will subscribe to. If unspecified, /// Set the channel names which this job runner will subscribe to. If unspecified,
/// the task runner will subscribe to all channels. /// the job runner will subscribe to all channels.
pub fn set_channel_names<'a>(&'a mut self, channel_names: &[&str]) -> &'a mut Self { pub fn set_channel_names<'a>(&'a mut self, channel_names: &[&str]) -> &'a mut Self {
self.channel_names = Some( self.channel_names = Some(
channel_names channel_names
@ -240,33 +237,33 @@ impl TaskRunnerOptions {
); );
self self
} }
/// Choose whether to automatically keep tasks alive whilst they're still /// Choose whether to automatically keep jobs alive whilst they're still
/// running. Defaults to `true`. /// running. Defaults to `true`.
pub fn set_keep_alive(&mut self, keep_alive: bool) -> &mut Self { pub fn set_keep_alive(&mut self, keep_alive: bool) -> &mut Self {
self.keep_alive = keep_alive; self.keep_alive = keep_alive;
self self
} }
/// Start the task runner in the background. The task runner will stop when the /// Start the job runner in the background. The job runner will stop when the
/// returned handle is dropped. /// returned handle is dropped.
pub async fn run(&self) -> Result<OwnedTask, sqlx::Error> { pub async fn run(&self) -> Result<OwnedHandle, sqlx::Error> {
let options = self.clone(); let options = self.clone();
let task_runner = Arc::new(TaskRunner { let job_runner = Arc::new(JobRunner {
options, options,
running_tasks: AtomicUsize::new(0), running_jobs: AtomicUsize::new(0),
notify: Notify::new(), notify: Notify::new(),
}); });
let listener_task = start_listener(task_runner.clone()).await?; let listener_task = start_listener(job_runner.clone()).await?;
Ok(OwnedTask(task::spawn(main_loop( Ok(OwnedHandle(task::spawn(main_loop(
task_runner, job_runner,
listener_task, listener_task,
)))) ))))
} }
} }
async fn start_listener(task_runner: Arc<TaskRunner>) -> Result<OwnedTask, sqlx::Error> { async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> {
let mut listener = PgListener::connect_with(&task_runner.options.pool).await?; let mut listener = PgListener::connect_with(&job_runner.options.pool).await?;
if let Some(channels) = &task_runner.options.channel_names { if let Some(channels) = &job_runner.options.channel_names {
let names: Vec<String> = channels.iter().map(|c| format!("mq_{}", c)).collect(); let names: Vec<String> = channels.iter().map(|c| format!("mq_{}", c)).collect();
listener listener
.listen_all(names.iter().map(|s| s.as_str())) .listen_all(names.iter().map(|s| s.as_str()))
@ -274,9 +271,9 @@ async fn start_listener(task_runner: Arc<TaskRunner>) -> Result<OwnedTask, sqlx:
} else { } else {
listener.listen("mq").await?; listener.listen("mq").await?;
} }
Ok(OwnedTask(task::spawn(async move { Ok(OwnedHandle(task::spawn(async move {
while let Ok(_) = listener.recv().await { while let Ok(_) = listener.recv().await {
task_runner.notify.notify_one(); job_runner.notify.notify_one();
} }
}))) })))
} }
@ -304,12 +301,12 @@ fn to_duration(interval: PgInterval) -> Duration {
} }
async fn poll_and_dispatch( async fn poll_and_dispatch(
task_runner: &Arc<TaskRunner>, job_runner: &Arc<JobRunner>,
batch_size: i32, batch_size: i32,
) -> Result<Duration, sqlx::Error> { ) -> Result<Duration, sqlx::Error> {
log::info!("Polling for messages"); log::info!("Polling for messages");
let options = &task_runner.options; let options = &job_runner.options;
let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)") let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)")
.bind(&options.channel_names) .bind(&options.channel_names)
.bind(batch_size) .bind(batch_size)
@ -350,7 +347,7 @@ async fn poll_and_dispatch(
{ {
let retry_backoff = to_duration(retry_backoff); let retry_backoff = to_duration(retry_backoff);
let keep_alive = if options.keep_alive { let keep_alive = if options.keep_alive {
Some(OwnedTask(task::spawn(keep_task_alive( Some(OwnedHandle(task::spawn(keep_job_alive(
id, id,
options.pool.clone(), options.pool.clone(),
retry_backoff, retry_backoff,
@ -358,31 +355,31 @@ async fn poll_and_dispatch(
} else { } else {
None None
}; };
let current_task = CurrentTask { let current_job = CurrentJob {
id, id,
name, name,
payload_json, payload_json,
payload_bytes, payload_bytes,
task_runner: task_runner.clone(), job_runner: job_runner.clone(),
keep_alive, keep_alive,
}; };
task_runner.running_tasks.fetch_add(1, Ordering::SeqCst); job_runner.running_jobs.fetch_add(1, Ordering::SeqCst);
(options.dispatch)(current_task); (options.dispatch)(current_job);
} }
} }
Ok(wait_time) Ok(wait_time)
} }
async fn main_loop(task_runner: Arc<TaskRunner>, _listener_task: OwnedTask) { async fn main_loop(job_runner: Arc<JobRunner>, _listener_task: OwnedHandle) {
let options = &task_runner.options; let options = &job_runner.options;
let mut failures = 0; let mut failures = 0;
loop { loop {
let running_tasks = task_runner.running_tasks.load(Ordering::SeqCst); let running_jobs = job_runner.running_jobs.load(Ordering::SeqCst);
let duration = if running_tasks < options.min_concurrency { let duration = if running_jobs < options.min_concurrency {
let batch_size = (options.max_concurrency - running_tasks) as i32; let batch_size = (options.max_concurrency - running_jobs) as i32;
match poll_and_dispatch(&task_runner, batch_size).await { match poll_and_dispatch(&job_runner, batch_size).await {
Ok(duration) => { Ok(duration) => {
failures = 0; failures = 0;
duration duration
@ -398,11 +395,11 @@ async fn main_loop(task_runner: Arc<TaskRunner>, _listener_task: OwnedTask) {
}; };
// Wait for us to be notified, or for the timeout to elapse // Wait for us to be notified, or for the timeout to elapse
let _ = tokio::time::timeout(duration, task_runner.notify.notified()).await; let _ = tokio::time::timeout(duration, job_runner.notify.notified()).await;
} }
} }
async fn keep_task_alive(id: Uuid, pool: Pool<Postgres>, mut interval: Duration) { async fn keep_job_alive(id: Uuid, pool: Pool<Postgres>, mut interval: Duration) {
loop { loop {
tokio::time::sleep(interval / 2).await; tokio::time::sleep(interval / 2).await;
interval *= 2; interval *= 2;
@ -412,7 +409,7 @@ async fn keep_task_alive(id: Uuid, pool: Pool<Postgres>, mut interval: Duration)
.execute(&pool) .execute(&pool)
.await .await
{ {
log::error!("Failed to keep task {} alive: {}", id, e); log::error!("Failed to keep job {} alive: {}", id, e);
break; break;
} }
} }

View file

@ -6,9 +6,9 @@ use serde::Serialize;
use sqlx::Postgres; use sqlx::Postgres;
use uuid::Uuid; use uuid::Uuid;
/// Type for building a task to send. /// Type for building a job to send.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TaskBuilder<'a> { pub struct JobBuilder<'a> {
id: Uuid, id: Uuid,
delay: Duration, delay: Duration,
channel_name: &'a str, channel_name: &'a str,
@ -22,12 +22,12 @@ pub struct TaskBuilder<'a> {
payload_bytes: Option<&'a [u8]>, payload_bytes: Option<&'a [u8]>,
} }
impl<'a> TaskBuilder<'a> { impl<'a> JobBuilder<'a> {
/// Prepare to send a task with the specified name. /// Prepare to send a job with the specified name.
pub fn new(name: &'a str) -> Self { pub fn new(name: &'a str) -> Self {
Self::new_with_id(Uuid::new_v4(), name) Self::new_with_id(Uuid::new_v4(), name)
} }
/// Prepare to send a task with the specified name and ID. /// Prepare to send a job with the specified name and ID.
pub fn new_with_id(id: Uuid, name: &'a str) -> Self { pub fn new_with_id(id: Uuid, name: &'a str) -> Self {
Self { Self {
id, id,
@ -76,32 +76,32 @@ impl<'a> TaskBuilder<'a> {
self.commit_interval = commit_interval; self.commit_interval = commit_interval;
self self
} }
/// Set whether this task is strictly ordered with respect to other ordered /// Set whether this job is strictly ordered with respect to other ordered
/// task in the same channel (default false). /// job in the same channel (default false).
pub fn set_ordered(&mut self, ordered: bool) -> &mut Self { pub fn set_ordered(&mut self, ordered: bool) -> &mut Self {
self.ordered = ordered; self.ordered = ordered;
self self
} }
/// Set a delay before this task is executed (default none). /// Set a delay before this job is executed (default none).
pub fn set_delay(&mut self, delay: Duration) -> &mut Self { pub fn set_delay(&mut self, delay: Duration) -> &mut Self {
self.delay = delay; self.delay = delay;
self self
} }
/// Set a raw JSON payload for the task. /// Set a raw JSON payload for the job.
pub fn set_raw_json(&mut self, raw_json: &'a str) -> &mut Self { pub fn set_raw_json(&mut self, raw_json: &'a str) -> &mut Self {
self.payload_json = Some(Cow::Borrowed(raw_json)); self.payload_json = Some(Cow::Borrowed(raw_json));
self self
} }
/// Set a raw binary payload for the task. /// Set a raw binary payload for the job.
pub fn set_raw_bytes(&mut self, raw_bytes: &'a [u8]) -> &mut Self { pub fn set_raw_bytes(&mut self, raw_bytes: &'a [u8]) -> &mut Self {
self.payload_bytes = Some(raw_bytes); self.payload_bytes = Some(raw_bytes);
self self
} }
/// Set a JSON payload for the task. /// Set a JSON payload for the job.
pub fn set_json<T: ?Sized + Serialize>( pub fn set_json<T: ?Sized + Serialize>(
&mut self, &mut self,
value: &T, value: &T,
@ -111,7 +111,7 @@ impl<'a> TaskBuilder<'a> {
Ok(self) Ok(self)
} }
/// Spawn the task using the given executor. This might be a connection /// Spawn the job using the given executor. This might be a connection
/// pool, a connection, or a transaction. /// pool, a connection, or a transaction.
pub async fn spawn<'b, E: sqlx::Executor<'b, Database = Postgres>>( pub async fn spawn<'b, E: sqlx::Executor<'b, Database = Postgres>>(
&self, &self,
@ -137,14 +137,14 @@ impl<'a> TaskBuilder<'a> {
} }
} }
/// Commit the specified tasks. The tasks should have been previously spawned /// Commit the specified jobs. The jobs should have been previously spawned
/// with the two-phase commit option enabled. /// with the two-phase commit option enabled.
pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>( pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>(
executor: E, executor: E,
task_ids: &[Uuid], job_ids: &[Uuid],
) -> Result<(), sqlx::Error> { ) -> Result<(), sqlx::Error> {
sqlx::query("SELECT mq_commit($1)") sqlx::query("SELECT mq_commit($1)")
.bind(task_ids) .bind(job_ids)
.execute(executor) .execute(executor)
.await?; .await?;
Ok(()) Ok(())

View file

@ -27,13 +27,13 @@ impl<T: Any> DerefMut for Opaque<T> {
} }
} }
/// A handle to a background task which will be automatically cancelled if /// A handle to a background job which will be automatically cancelled if
/// the handle is dropped. Extract the inner join handle to prevent this /// the handle is dropped. Extract the inner join handle to prevent this
/// behaviour. /// behaviour.
#[derive(Debug)] #[derive(Debug)]
pub struct OwnedTask(pub JoinHandle<()>); pub struct OwnedHandle(pub JoinHandle<()>);
impl Drop for OwnedTask { impl Drop for OwnedHandle {
fn drop(&mut self) { fn drop(&mut self) {
self.0.abort(); self.0.abort();
} }