From 37f70afa333bf3ba23d05e8793247c721920764f Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Mon, 29 Mar 2021 21:39:07 +0100 Subject: [PATCH] Rename task -> job to avoid confusion with async tasks --- README.md | 124 +++++++------- migrations/20210316025847_setup.up.sql | 2 +- sqlxmq_macros/src/lib.rs | 70 ++++---- src/hidden.rs | 6 +- src/lib.rs | 218 ++++++++++++------------- src/registry.rs | 82 +++++----- src/runner.rs | 151 +++++++++-------- src/spawn.rs | 30 ++-- src/utils.rs | 6 +- 9 files changed, 343 insertions(+), 346 deletions(-) diff --git a/README.md b/README.md index 8b5bcac..588e92f 100644 --- a/README.md +++ b/README.md @@ -1,96 +1,96 @@ # sqlxmq -A task queue built on `sqlx` and `PostgreSQL`. +A job queue built on `sqlx` and `PostgreSQL`. -This library allows a CRUD application to run background tasks without complicating its +This library allows a CRUD application to run background jobs without complicating its deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications already using a `PostgreSQL` database. -Although using a SQL database as a task queue means compromising on latency of -delivered tasks, there are several show-stopping issues present in ordinary task +Although using a SQL database as a job queue means compromising on latency of +delivered jobs, there are several show-stopping issues present in ordinary job queues which are avoided altogether. -With any other task queue, in-flight tasks are state that is not covered by normal -database backups. Even if tasks _are_ backed up, there is no way to restore both -a database and a task queue to a consistent point-in-time without manually +With most other job queues, in-flight jobs are state that is not covered by normal +database backups. Even if jobs _are_ backed up, there is no way to restore both +a database and a job queue to a consistent point-in-time without manually resolving conflicts. -By storing tasks in the database, existing backup procedures will store a perfectly -consistent state of both in-flight tasks and persistent data. Additionally, tasks can +By storing jobs in the database, existing backup procedures will store a perfectly +consistent state of both in-flight jobs and persistent data. Additionally, jobs can be spawned and completed as part of other transactions, making it easy to write correct application code. -Leveraging the power of `PostgreSQL`, this task queue offers several features not -present in other task queues. +Leveraging the power of `PostgreSQL`, this job queue offers several features not +present in other job queues. # Features -- **Send/receive multiple tasks at once.** +- **Send/receive multiple jobs at once.** This reduces the number of queries to the database. -- **Send tasks to be executed at a future date and time.** +- **Send jobs to be executed at a future date and time.** Avoids the need for a separate scheduling system. -- **Reliable delivery of tasks.** +- **Reliable delivery of jobs.** - **Automatic retries with exponential backoff.** Number of retries and initial backoff parameters are configurable. -- **Transactional sending of tasks.** +- **Transactional sending of jobs.** - Avoids sending spurious tasks if a transaction is rolled back. + Avoids sending spurious jobs if a transaction is rolled back. -- **Transactional completion of tasks.** +- **Transactional completion of jobs.** - If all side-effects of a task are updates to the database, this provides - true exactly-once execution of tasks. 
+ If all side-effects of a job are updates to the database, this provides + true exactly-once execution of jobs. -- **Transactional check-pointing of tasks.** +- **Transactional check-pointing of jobs.** - Long-running tasks can check-point their state to avoid having to restart + Long-running jobs can check-point their state to avoid having to restart from the beginning if there is a failure: the next retry can continue from the last check-point. -- **Opt-in strictly ordered task delivery.** +- **Opt-in strictly ordered job delivery.** - Tasks within the same channel will be processed strictly in-order - if this option is enabled for the task. + Jobs within the same channel will be processed strictly in-order + if this option is enabled for the job. -- **Fair task delivery.** +- **Fair job delivery.** - A channel with a lot of tasks ready to run will not starve a channel with fewer - tasks. + A channel with a lot of jobs ready to run will not starve a channel with fewer + jobs. - **Opt-in two-phase commit.** This is particularly useful on an ordered channel where a position can be "reserved" - in the task order, but not committed until later. + in the job order, but not committed until later. - **JSON and/or binary payloads.** - Tasks can use whichever is most convenient. + Jobs can use whichever is most convenient. -- **Automatic keep-alive of tasks.** +- **Automatic keep-alive of jobs.** - Long-running tasks will automatically be "kept alive" to prevent them being + Long-running jobs will automatically be "kept alive" to prevent them being retried whilst they're still ongoing. - **Concurrency limits.** - Specify the minimum and maximum number of concurrent tasks each runner should + Specify the minimum and maximum number of concurrent jobs each runner should handle. -- **Built-in task registry via an attribute macro.** +- **Built-in job registry via an attribute macro.** - Tasks can be easily registered with a runner, and default configuration specified - on a per-task basis. + Jobs can be easily registered with a runner, and default configuration specified + on a per-job basis. - **Implicit channels.** - Channels are implicitly created and destroyed when tasks are sent and processed, + Channels are implicitly created and destroyed when jobs are sent and processed, so no setup is required. - **Channel groups.** @@ -100,75 +100,75 @@ present in other task queues. - **NOTIFY-based polling.** - This saves resources when few tasks are being processed. + This saves resources when few jobs are being processed. # Getting started -## Defining tasks +## Defining jobs -The first step is to define a function to be run on the task queue. +The first step is to define a function to be run on the job queue. ```rust -use sqlxmq::{task, CurrentTask}; +use sqlxmq::{job, CurrentJob}; -// Arguments to the `#[task]` attribute allow setting default task options. -#[task(channel_name = "foo")] -async fn example_task( - mut current_task: CurrentTask, +// Arguments to the `#[job]` attribute allow setting default job options. 
+#[job(channel_name = "foo")]
+async fn example_job(
+    mut current_job: CurrentJob,
 ) -> sqlx::Result<()> {
     // Decode a JSON payload
-    let who: Option<String> = current_task.json()?;
+    let who: Option<String> = current_job.json()?;
 
     // Do some work
     println!("Hello, {}!", who.as_deref().unwrap_or("world"));
 
-    // Mark the task as complete
-    current_task.complete().await?;
+    // Mark the job as complete
+    current_job.complete().await?;
 
     Ok(())
 }
 ```
 
-## Listening for tasks
+## Listening for jobs
 
-Next we need to create a task runner: this is what listens for new tasks
+Next we need to create a job runner: this is what listens for new jobs
 and executes them.
 
 ```rust
-use sqlxmq::TaskRegistry;
+use sqlxmq::JobRegistry;
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     // You'll need to provide a Postgres connection pool.
     let pool = connect_to_db().await?;
 
-    // Construct a task registry from our single task.
-    let mut registry = TaskRegistry::new(&[example_task]);
+    // Construct a job registry from our single job.
+    let mut registry = JobRegistry::new(&[example_job]);
     // Here is where you can configure the registry
     // registry.set_error_handler(...)
 
     let runner = registry
-        // Create a task runner using the connection pool.
+        // Create a job runner using the connection pool.
         .runner(&pool)
-        // Here is where you can configure the task runner
-        // Aim to keep 10-20 tasks running at a time.
+        // Here is where you can configure the job runner
+        // Aim to keep 10-20 jobs running at a time.
         .set_concurrency(10, 20)
-        // Start the task runner in the background.
+        // Start the job runner in the background.
         .run()
         .await?;
 
-    // The task runner will continue listening and running
-    // tasks until `runner` is dropped.
+    // The job runner will continue listening and running
+    // jobs until `runner` is dropped.
 }
 ```
 
-## Spawning a task
+## Spawning a job
 
-The final step is to actually run a task.
+The final step is to actually run a job.
 
 ```rust
-example_task.new()
-    // This is where we override task configuration
+example_job.new()
+    // This is where we override job configuration
     .set_channel_name("bar")
     .set_json("John")
     .spawn(&pool)
diff --git a/migrations/20210316025847_setup.up.sql b/migrations/20210316025847_setup.up.sql
index 1623269..0b528fa 100644
--- a/migrations/20210316025847_setup.up.sql
+++ b/migrations/20210316025847_setup.up.sql
@@ -96,7 +96,7 @@ RETURNS TABLE(name TEXT, args TEXT) AS $$
     LIMIT batch_size
 $$ LANGUAGE SQL STABLE;
 
--- Main entry-point for task runner: pulls a batch of messages from the queue.
+-- Main entry-point for job runner: pulls a batch of messages from the queue.
 CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
 RETURNS TABLE(
     id UUID,
diff --git a/sqlxmq_macros/src/lib.rs b/sqlxmq_macros/src/lib.rs
index 6af2021..5f0af87 100644
--- a/sqlxmq_macros/src/lib.rs
+++ b/sqlxmq_macros/src/lib.rs
@@ -13,7 +13,7 @@ use syn::{
 };
 
 #[derive(Default)]
-struct TaskOptions {
+struct JobOptions {
     proto: Option<Path>,
     name: Option<String>,
     channel_name: Option<String>,
@@ -28,7 +28,7 @@ enum OptionValue<'a> {
     Path(&'a Path),
 }
 
-fn interpret_task_arg(options: &mut TaskOptions, arg: NestedMeta) -> Result<()> {
+fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
     fn error(arg: NestedMeta) -> Result<()> {
         Err(Error::new_spanned(arg, "Unexpected attribute argument"))
     }
@@ -90,99 +90,99 @@ fn interpret_task_arg(options: &mut TaskOptions, arg: NestedMeta) -> Result<()>
     Ok(())
 }
 
-/// Marks a function as being a background task.
+/// Marks a function as being a background job.
 ///
-/// The function must take a single `CurrentTask` argument, and should
+/// The function must take a single `CurrentJob` argument, and should
 /// be async or return a future.
 ///
 /// The async result must be a `Result<(), E>` type, where `E` is convertible
 /// to a `Box<dyn Error + Send + Sync + 'static>`, which is the case for most
 /// error types.
 ///
-/// Several options can be provided to the `#[task]` attribute:
+/// Several options can be provided to the `#[job]` attribute:
 ///
 /// # Name
 ///
 /// ```
-/// #[task("example")]
-/// #[task(name="example")]
+/// #[job("example")]
+/// #[job(name="example")]
 /// ```
 ///
-/// This overrides the name for this task. If unspecified, the fully-qualified
-/// name of the function is used. If you move a task to a new module or rename
-/// the function, you may which to override the task name to prevent it from
+/// This overrides the name for this job. If unspecified, the fully-qualified
+/// name of the function is used. If you move a job to a new module or rename
+/// the function, you may wish to override the job name to prevent it from
 /// changing.
 ///
 /// # Channel name
 ///
 /// ```
-/// #[task(channel_name="foo")]
+/// #[job(channel_name="foo")]
 /// ```
 ///
-/// This sets the default channel name on which the task will be spawned.
+/// This sets the default channel name on which the job will be spawned.
 ///
 /// # Retries
 ///
 /// ```
-/// #[task(retries = 3)]
+/// #[job(retries = 3)]
 /// ```
 ///
-/// This sets the default number of retries for the task.
+/// This sets the default number of retries for the job.
 ///
 /// # Retry backoff
 ///
 /// ```
-/// #[task(backoff_secs=1.5)]
-/// #[task(backoff_secs=2)]
+/// #[job(backoff_secs=1.5)]
+/// #[job(backoff_secs=2)]
 /// ```
 ///
-/// This sets the default initial retry backoff for the task in seconds.
+/// This sets the default initial retry backoff for the job in seconds.
 ///
 /// # Ordered
 ///
 /// ```
-/// #[task(ordered)]
-/// #[task(ordered=true)]
-/// #[task(ordered=false)]
+/// #[job(ordered)]
+/// #[job(ordered=true)]
+/// #[job(ordered=false)]
 /// ```
 ///
-/// This sets whether the task will be strictly ordered by default.
+/// This sets whether the job will be strictly ordered by default.
 ///
 /// # Prototype
 ///
 /// ```
 /// fn my_proto<'a, 'b>(
-///     builder: &'a mut TaskBuilder<'b>
-/// ) -> &'a mut TaskBuilder<'b> {
+///     builder: &'a mut JobBuilder<'b>
+/// ) -> &'a mut JobBuilder<'b> {
 ///     builder.set_channel_name("bar")
 /// }
 ///
-/// #[task(proto(my_proto))]
+/// #[job(proto(my_proto))]
 /// ```
 ///
-/// This allows setting several task options at once using the specified function,
-/// and can be convient if you have several tasks which should have similar
+/// This allows setting several job options at once using the specified function,
+/// and can be convenient if you have several jobs which should have similar
 /// defaults.
 ///
 /// # Combinations
 ///
-/// Multiple task options can be combined. The order is not important, but the
+/// Multiple job options can be combined. The order is not important, but the
 /// prototype will always be applied first so that explicit options can override it.
 /// Each option can only be provided once in the attribute.
/// /// ``` -/// #[task("my_task", proto(my_proto), retries=0, ordered)] +/// #[job("my_job", proto(my_proto), retries=0, ordered)] /// ``` /// #[proc_macro_attribute] -pub fn task(attr: TokenStream, item: TokenStream) -> TokenStream { +pub fn job(attr: TokenStream, item: TokenStream) -> TokenStream { let args = parse_macro_input!(attr as AttributeArgs); let mut inner_fn = parse_macro_input!(item as ItemFn); - let mut options = TaskOptions::default(); + let mut options = JobOptions::default(); let mut errors = Vec::new(); for arg in args { - if let Err(e) = interpret_task_arg(&mut options, arg) { + if let Err(e) = interpret_job_arg(&mut options, arg) { errors.push(e.into_compile_error()); } } @@ -226,15 +226,15 @@ pub fn task(attr: TokenStream, item: TokenStream) -> TokenStream { let expanded = quote! { #(#errors)* #[allow(non_upper_case_globals)] - #vis static #name: &'static sqlxmq::NamedTask = &{ + #vis static #name: &'static sqlxmq::NamedJob = &{ #inner_fn - sqlxmq::NamedTask::new_internal( + sqlxmq::NamedJob::new_internal( #fq_name, sqlxmq::hidden::BuildFn(|builder| { builder #(#chain)* }), - sqlxmq::hidden::RunFn(|registry, current_task| { - registry.spawn_internal(#fq_name, inner(current_task)); + sqlxmq::hidden::RunFn(|registry, current_job| { + registry.spawn_internal(#fq_name, inner(current_job)); }), ) }; diff --git a/src/hidden.rs b/src/hidden.rs index 1673871..cc6ebcb 100644 --- a/src/hidden.rs +++ b/src/hidden.rs @@ -1,6 +1,6 @@ -use crate::{CurrentTask, TaskBuilder, TaskRegistry}; +use crate::{CurrentJob, JobBuilder, JobRegistry}; #[doc(hidden)] -pub struct BuildFn(pub for<'a> fn(&'a mut TaskBuilder<'static>) -> &'a mut TaskBuilder<'static>); +pub struct BuildFn(pub for<'a> fn(&'a mut JobBuilder<'static>) -> &'a mut JobBuilder<'static>); #[doc(hidden)] -pub struct RunFn(pub fn(&TaskRegistry, CurrentTask)); +pub struct RunFn(pub fn(&JobRegistry, CurrentJob)); diff --git a/src/lib.rs b/src/lib.rs index cf24930..05171e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,97 +1,97 @@ #![deny(missing_docs, unsafe_code)] //! # sqlxmq //! -//! A task queue built on `sqlx` and `PostgreSQL`. +//! A job queue built on `sqlx` and `PostgreSQL`. //! -//! This library allows a CRUD application to run background tasks without complicating its +//! This library allows a CRUD application to run background jobs without complicating its //! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications //! already using a `PostgreSQL` database. //! -//! Although using a SQL database as a task queue means compromising on latency of -//! delivered tasks, there are several show-stopping issues present in ordinary task +//! Although using a SQL database as a job queue means compromising on latency of +//! delivered jobs, there are several show-stopping issues present in ordinary job //! queues which are avoided altogether. //! -//! With any other task queue, in-flight tasks are state that is not covered by normal -//! database backups. Even if tasks _are_ backed up, there is no way to restore both -//! a database and a task queue to a consistent point-in-time without manually +//! With most other job queues, in-flight jobs are state that is not covered by normal +//! database backups. Even if jobs _are_ backed up, there is no way to restore both +//! a database and a job queue to a consistent point-in-time without manually //! resolving conflicts. //! -//! By storing tasks in the database, existing backup procedures will store a perfectly -//! 
consistent state of both in-flight tasks and persistent data. Additionally, tasks can +//! By storing jobs in the database, existing backup procedures will store a perfectly +//! consistent state of both in-flight jobs and persistent data. Additionally, jobs can //! be spawned and completed as part of other transactions, making it easy to write correct //! application code. //! -//! Leveraging the power of `PostgreSQL`, this task queue offers several features not -//! present in other task queues. +//! Leveraging the power of `PostgreSQL`, this job queue offers several features not +//! present in other job queues. //! //! # Features //! -//! - **Send/receive multiple tasks at once.** +//! - **Send/receive multiple jobs at once.** //! //! This reduces the number of queries to the database. //! -//! - **Send tasks to be executed at a future date and time.** +//! - **Send jobs to be executed at a future date and time.** //! //! Avoids the need for a separate scheduling system. //! -//! - **Reliable delivery of tasks.** +//! - **Reliable delivery of jobs.** //! //! - **Automatic retries with exponential backoff.** //! //! Number of retries and initial backoff parameters are configurable. //! -//! - **Transactional sending of tasks.** +//! - **Transactional sending of jobs.** //! -//! Avoids sending spurious tasks if a transaction is rolled back. +//! Avoids sending spurious jobs if a transaction is rolled back. //! -//! - **Transactional completion of tasks.** +//! - **Transactional completion of jobs.** //! -//! If all side-effects of a task are updates to the database, this provides -//! true exactly-once execution of tasks. +//! If all side-effects of a job are updates to the database, this provides +//! true exactly-once execution of jobs. //! -//! - **Transactional check-pointing of tasks.** +//! - **Transactional check-pointing of jobs.** //! -//! Long-running tasks can check-point their state to avoid having to restart +//! Long-running jobs can check-point their state to avoid having to restart //! from the beginning if there is a failure: the next retry can continue //! from the last check-point. //! -//! - **Opt-in strictly ordered task delivery.** +//! - **Opt-in strictly ordered job delivery.** //! -//! Tasks within the same channel will be processed strictly in-order -//! if this option is enabled for the task. +//! Jobs within the same channel will be processed strictly in-order +//! if this option is enabled for the job. //! -//! - **Fair task delivery.** +//! - **Fair job delivery.** //! -//! A channel with a lot of tasks ready to run will not starve a channel with fewer -//! tasks. +//! A channel with a lot of jobs ready to run will not starve a channel with fewer +//! jobs. //! //! - **Opt-in two-phase commit.** //! //! This is particularly useful on an ordered channel where a position can be "reserved" -//! in the task order, but not committed until later. +//! in the job order, but not committed until later. //! //! - **JSON and/or binary payloads.** //! -//! Tasks can use whichever is most convenient. +//! Jobs can use whichever is most convenient. //! -//! - **Automatic keep-alive of tasks.** +//! - **Automatic keep-alive of jobs.** //! -//! Long-running tasks will automatically be "kept alive" to prevent them being +//! Long-running jobs will automatically be "kept alive" to prevent them being //! retried whilst they're still ongoing. //! //! - **Concurrency limits.** //! -//! Specify the minimum and maximum number of concurrent tasks each runner should +//! 
Specify the minimum and maximum number of concurrent jobs each runner should
 //!   handle.
 //!
-//! - **Built-in task registry via an attribute macro.**
+//! - **Built-in job registry via an attribute macro.**
 //!
-//!   Tasks can be easily registered with a runner, and default configuration specified
-//!   on a per-task basis.
+//!   Jobs can be easily registered with a runner, and default configuration specified
+//!   on a per-job basis.
 //!
 //! - **Implicit channels.**
 //!
-//!   Channels are implicitly created and destroyed when tasks are sent and processed,
+//!   Channels are implicitly created and destroyed when jobs are sent and processed,
 //!   so no setup is required.
 //!
 //! - **Channel groups.**
@@ -101,75 +101,75 @@
 //!
 //! - **NOTIFY-based polling.**
 //!
-//!   This saves resources when few tasks are being processed.
+//!   This saves resources when few jobs are being processed.
 //!
 //! # Getting started
 //!
-//! ## Defining tasks
+//! ## Defining jobs
 //!
-//! The first step is to define a function to be run on the task queue.
+//! The first step is to define a function to be run on the job queue.
 //!
 //! ```rust
-//! use sqlxmq::{task, CurrentTask};
+//! use sqlxmq::{job, CurrentJob};
 //!
-//! // Arguments to the `#[task]` attribute allow setting default task options.
-//! #[task(channel_name = "foo")]
-//! async fn example_task(
-//!     mut current_task: CurrentTask,
+//! // Arguments to the `#[job]` attribute allow setting default job options.
+//! #[job(channel_name = "foo")]
+//! async fn example_job(
+//!     mut current_job: CurrentJob,
 //! ) -> sqlx::Result<()> {
 //!     // Decode a JSON payload
-//!     let who: Option<String> = current_task.json()?;
+//!     let who: Option<String> = current_job.json()?;
 //!
 //!     // Do some work
 //!     println!("Hello, {}!", who.as_deref().unwrap_or("world"));
 //!
-//!     // Mark the task as complete
-//!     current_task.complete().await?;
+//!     // Mark the job as complete
+//!     current_job.complete().await?;
 //!
 //!     Ok(())
 //! }
 //! ```
 //!
-//! ## Listening for tasks
+//! ## Listening for jobs
 //!
-//! Next we need to create a task runner: this is what listens for new tasks
+//! Next we need to create a job runner: this is what listens for new jobs
 //! and executes them.
 //!
 //! ```rust
-//! use sqlxmq::TaskRegistry;
+//! use sqlxmq::JobRegistry;
 //!
 //! #[tokio::main]
 //! async fn main() -> Result<(), Box<dyn Error>> {
 //!     // You'll need to provide a Postgres connection pool.
 //!     let pool = connect_to_db().await?;
 //!
-//!     // Construct a task registry from our single task.
-//!     let mut registry = TaskRegistry::new(&[example_task]);
+//!     // Construct a job registry from our single job.
+//!     let mut registry = JobRegistry::new(&[example_job]);
 //!     // Here is where you can configure the registry
 //!     // registry.set_error_handler(...)
 //!
 //!     let runner = registry
-//!         // Create a task runner using the connection pool.
+//!         // Create a job runner using the connection pool.
 //!         .runner(&pool)
-//!         // Here is where you can configure the task runner
-//!         // Aim to keep 10-20 tasks running at a time.
+//!         // Here is where you can configure the job runner
+//!         // Aim to keep 10-20 jobs running at a time.
 //!         .set_concurrency(10, 20)
-//!         // Start the task runner in the background.
+//!         // Start the job runner in the background.
 //!         .run()
 //!         .await?;
 //!
-//!     // The task runner will continue listening and running
-//!     // tasks until `runner` is dropped.
+//!     // The job runner will continue listening and running
+//!     // jobs until `runner` is dropped.
 //! }
 //! ```
 //!
-//! ## Spawning a task
+//! 
## Spawning a job
 //!
-//! The final step is to actually run a task.
+//! The final step is to actually run a job.
 //!
 //! ```rust
-//! example_task.new()
-//!     // This is where we override task configuration
+//! example_job.new()
+//!     // This is where we override job configuration
 //!     .set_channel_name("bar")
 //!     .set_json("John")
 //!     .spawn(&pool)
@@ -186,8 +186,8 @@ mod utils;
 pub use registry::*;
 pub use runner::*;
 pub use spawn::*;
-pub use sqlxmq_macros::task;
-pub use utils::OwnedTask;
+pub use sqlxmq_macros::job;
+pub use utils::OwnedHandle;
 
 #[cfg(test)]
 mod tests {
@@ -244,18 +244,18 @@ mod tests {
         TestGuard(guard, pool)
     }
 
-    async fn test_task_runner<F: Future + Send + 'static>(
+    async fn test_job_runner<F: Future + Send + 'static>(
         pool: &Pool<Postgres>,
-        f: impl (Fn(CurrentTask) -> F) + Send + Sync + 'static,
-    ) -> (OwnedTask, Arc<AtomicUsize>)
+        f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
+    ) -> (OwnedHandle, Arc<AtomicUsize>)
     where
        F::Output: Send + 'static,
     {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter2 = counter.clone();
-        let runner = TaskRunnerOptions::new(pool, move |task| {
+        let runner = JobRunnerOptions::new(pool, move |job| {
             counter2.fetch_add(1, Ordering::SeqCst);
-            task::spawn(f(task));
+            task::spawn(f(job));
         })
         .run()
         .await
@@ -263,28 +263,28 @@ mod tests {
         (runner, counter)
     }
 
-    fn task_proto<'a, 'b>(builder: &'a mut TaskBuilder<'b>) -> &'a mut TaskBuilder<'b> {
+    fn job_proto<'a, 'b>(builder: &'a mut JobBuilder<'b>) -> &'a mut JobBuilder<'b> {
         builder.set_channel_name("bar")
     }
 
-    #[task(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
-    async fn example_task1(
-        mut current_task: CurrentTask,
+    #[job(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
+    async fn example_job1(
+        mut current_job: CurrentJob,
     ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
-        current_task.complete().await?;
+        current_job.complete().await?;
         Ok(())
     }
 
-    #[task(proto(task_proto))]
-    async fn example_task2(
-        mut current_task: CurrentTask,
+    #[job(proto(job_proto))]
+    async fn example_job2(
+        mut current_job: CurrentJob,
     ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
-        current_task.complete().await?;
+        current_job.complete().await?;
         Ok(())
     }
 
-    async fn named_task_runner(pool: &Pool<Postgres>) -> OwnedTask {
-        TaskRegistry::new(&[example_task1, example_task2])
+    async fn named_job_runner(pool: &Pool<Postgres>) -> OwnedHandle {
+        JobRegistry::new(&[example_job1, example_job2])
             .runner(pool)
             .run()
             .await
@@ -300,37 +300,37 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn it_can_spawn_task() {
+    async fn it_can_spawn_job() {
         let pool = &*test_pool().await;
         let (_runner, counter) =
-            test_task_runner(&pool, |mut task| async move { task.complete().await }).await;
+            test_job_runner(&pool, |mut job| async move { job.complete().await }).await;
 
         assert_eq!(counter.load(Ordering::SeqCst), 0);
-        TaskBuilder::new("foo").spawn(pool).await.unwrap();
+        JobBuilder::new("foo").spawn(pool).await.unwrap();
         pause().await;
         assert_eq!(counter.load(Ordering::SeqCst), 1);
     }
 
     #[tokio::test]
-    async fn it_runs_tasks_in_order() {
+    async fn it_runs_jobs_in_order() {
         let pool = &*test_pool().await;
         let (tx, mut rx) = mpsc::unbounded();
 
-        let (_runner, counter) = test_task_runner(&pool, move |task| {
+        let (_runner, counter) = test_job_runner(&pool, move |job| {
             let tx = tx.clone();
             async move {
-                tx.unbounded_send(task).unwrap();
+                tx.unbounded_send(job).unwrap();
             }
         })
         .await;
 
         assert_eq!(counter.load(Ordering::SeqCst), 0);
-        TaskBuilder::new("foo")
+        JobBuilder::new("foo")
             .set_ordered(true)
             .spawn(pool)
             .await
             .unwrap();
-        TaskBuilder::new("bar")
+        JobBuilder::new("bar")
             .set_ordered(true)
             .spawn(pool)
             .await
@@ -339,48 
+339,48 @@ mod tests { pause().await; assert_eq!(counter.load(Ordering::SeqCst), 1); - let mut task = rx.next().await.unwrap(); - task.complete().await.unwrap(); + let mut job = rx.next().await.unwrap(); + job.complete().await.unwrap(); pause().await; assert_eq!(counter.load(Ordering::SeqCst), 2); } #[tokio::test] - async fn it_runs_tasks_in_parallel() { + async fn it_runs_jobs_in_parallel() { let pool = &*test_pool().await; let (tx, mut rx) = mpsc::unbounded(); - let (_runner, counter) = test_task_runner(&pool, move |task| { + let (_runner, counter) = test_job_runner(&pool, move |job| { let tx = tx.clone(); async move { - tx.unbounded_send(task).unwrap(); + tx.unbounded_send(job).unwrap(); } }) .await; assert_eq!(counter.load(Ordering::SeqCst), 0); - TaskBuilder::new("foo").spawn(pool).await.unwrap(); - TaskBuilder::new("bar").spawn(pool).await.unwrap(); + JobBuilder::new("foo").spawn(pool).await.unwrap(); + JobBuilder::new("bar").spawn(pool).await.unwrap(); pause().await; assert_eq!(counter.load(Ordering::SeqCst), 2); for _ in 0..2 { - let mut task = rx.next().await.unwrap(); - task.complete().await.unwrap(); + let mut job = rx.next().await.unwrap(); + job.complete().await.unwrap(); } } #[tokio::test] - async fn it_retries_failed_tasks() { + async fn it_retries_failed_jobs() { let pool = &*test_pool().await; - let (_runner, counter) = test_task_runner(&pool, move |_| async {}).await; + let (_runner, counter) = test_job_runner(&pool, move |_| async {}).await; let backoff = 200; assert_eq!(counter.load(Ordering::SeqCst), 0); - TaskBuilder::new("foo") + JobBuilder::new("foo") .set_retry_backoff(Duration::from_millis(backoff)) .set_retries(2) .spawn(pool) @@ -407,14 +407,14 @@ mod tests { } #[tokio::test] - async fn it_can_checkpoint_tasks() { + async fn it_can_checkpoint_jobs() { let pool = &*test_pool().await; - let (_runner, counter) = test_task_runner(&pool, move |mut current_task| async move { - let state: bool = current_task.json().unwrap().unwrap(); + let (_runner, counter) = test_job_runner(&pool, move |mut current_job| async move { + let state: bool = current_job.json().unwrap().unwrap(); if state { - current_task.complete().await.unwrap(); + current_job.complete().await.unwrap(); } else { - current_task + current_job .checkpoint(Checkpoint::new().set_json(&true).unwrap()) .await .unwrap(); @@ -425,7 +425,7 @@ mod tests { let backoff = 200; assert_eq!(counter.load(Ordering::SeqCst), 0); - TaskBuilder::new("foo") + JobBuilder::new("foo") .set_retry_backoff(Duration::from_millis(backoff)) .set_retries(5) .set_json(&false) @@ -451,10 +451,10 @@ mod tests { #[tokio::test] async fn it_can_use_registry() { let pool = &*test_pool().await; - let _runner = named_task_runner(pool).await; + let _runner = named_job_runner(pool).await; - example_task1.new().spawn(pool).await.unwrap(); - example_task2.new().spawn(pool).await.unwrap(); + example_job1.new().spawn(pool).await.unwrap(); + example_job2.new().spawn(pool).await.unwrap(); pause().await; } } diff --git a/src/registry.rs b/src/registry.rs index 8d587fe..51c32e8 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -9,42 +9,42 @@ use uuid::Uuid; use crate::hidden::{BuildFn, RunFn}; use crate::utils::Opaque; -use crate::{TaskBuilder, TaskRunnerOptions}; +use crate::{JobBuilder, JobRunnerOptions}; -/// Stores a mapping from task name to task. Can be used to construct -/// a task runner. -pub struct TaskRegistry { +/// Stores a mapping from job name to job. Can be used to construct +/// a job runner. 
+pub struct JobRegistry {
     error_handler: Arc<dyn Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync>,
-    task_map: HashMap<&'static str, &'static NamedTask>,
+    job_map: HashMap<&'static str, &'static NamedJob>,
 }
 
-/// Error returned when a task is received whose name is not in the registry.
+/// Error returned when a job is received whose name is not in the registry.
 #[derive(Debug)]
-pub struct UnknownTaskError;
+pub struct UnknownJobError;
 
-impl Error for UnknownTaskError {}
-impl Display for UnknownTaskError {
+impl Error for UnknownJobError {}
+impl Display for UnknownJobError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.write_str("Unknown task")
+        f.write_str("Unknown job")
     }
 }
 
-impl TaskRegistry {
-    /// Construct a new task registry from the provided task list.
-    pub fn new(tasks: &[&'static NamedTask]) -> Self {
-        let mut task_map = HashMap::new();
-        for &task in tasks {
-            if task_map.insert(task.name(), task).is_some() {
-                panic!("Duplicate task registered: {}", task.name());
+impl JobRegistry {
+    /// Construct a new job registry from the provided job list.
+    pub fn new(jobs: &[&'static NamedJob]) -> Self {
+        let mut job_map = HashMap::new();
+        for &job in jobs {
+            if job_map.insert(job.name(), job).is_some() {
+                panic!("Duplicate job registered: {}", job.name());
             }
         }
         Self {
             error_handler: Arc::new(Self::default_error_handler),
-            task_map,
+            job_map,
         }
     }
 
-    /// Set a function to be called whenever a task returns an error.
+    /// Set a function to be called whenever a job returns an error.
     pub fn set_error_handler(
         &mut self,
         error_handler: impl Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync + 'static,
@@ -53,14 +53,14 @@ impl TaskRegistry {
         self
     }
 
-    /// Look-up a task by name.
-    pub fn resolve_task(&self, name: &str) -> Option<&'static NamedTask> {
-        self.task_map.get(name).copied()
+    /// Look-up a job by name.
+    pub fn resolve_job(&self, name: &str) -> Option<&'static NamedJob> {
+        self.job_map.get(name).copied()
     }
 
     /// The default error handler implementation, which simply logs the error.
     pub fn default_error_handler(name: &str, error: Box<dyn Error + Send + 'static>) {
-        log::error!("Task {} failed: {}", name, error);
+        log::error!("Job {} failed: {}", name, error);
     }
 
     #[doc(hidden)]
@@ -77,29 +77,29 @@ impl TaskRegistry {
         });
     }
 
-    /// Construct a task runner from this registry and the provided connection
+    /// Construct a job runner from this registry and the provided connection
     /// pool.
-    pub fn runner(self, pool: &Pool<Postgres>) -> TaskRunnerOptions {
-        TaskRunnerOptions::new(pool, move |current_task| {
-            if let Some(task) = self.resolve_task(current_task.name()) {
-                (task.run_fn.0 .0)(&self, current_task);
+    pub fn runner(self, pool: &Pool<Postgres>) -> JobRunnerOptions {
+        JobRunnerOptions::new(pool, move |current_job| {
+            if let Some(job) = self.resolve_job(current_job.name()) {
+                (job.run_fn.0 .0)(&self, current_job);
             } else {
-                (self.error_handler)(current_task.name(), Box::new(UnknownTaskError))
+                (self.error_handler)(current_job.name(), Box::new(UnknownJobError))
             }
         })
     }
 }
 
-/// Type for a named task. Functions annotated with `#[task]` are
-/// transformed into static variables whose type is `&'static NamedTask`.
+/// Type for a named job. Functions annotated with `#[job]` are
+/// transformed into static variables whose type is `&'static NamedJob`.
#[derive(Debug)]
-pub struct NamedTask {
+pub struct NamedJob {
     name: &'static str,
     build_fn: Opaque<BuildFn>,
     run_fn: Opaque<RunFn>,
 }
 
-impl NamedTask {
+impl NamedJob {
     #[doc(hidden)]
     pub const fn new_internal(name: &'static str, build_fn: BuildFn, run_fn: RunFn) -> Self {
         Self {
@@ -108,21 +108,21 @@ impl NamedTask {
             run_fn: Opaque(run_fn),
         }
     }
-    /// Initialize a task builder with the name and defaults of this task.
-    pub fn new(&self) -> TaskBuilder<'static> {
-        let mut builder = TaskBuilder::new(self.name);
+    /// Initialize a job builder with the name and defaults of this job.
+    pub fn new(&self) -> JobBuilder<'static> {
+        let mut builder = JobBuilder::new(self.name);
         (self.build_fn.0 .0)(&mut builder);
         builder
     }
 
-    /// Initialize a task builder with the name and defaults of this task,
-    /// using the provided task ID.
-    pub fn new_with_id(&self, id: Uuid) -> TaskBuilder<'static> {
-        let mut builder = TaskBuilder::new_with_id(id, self.name);
+    /// Initialize a job builder with the name and defaults of this job,
+    /// using the provided job ID.
+    pub fn new_with_id(&self, id: Uuid) -> JobBuilder<'static> {
+        let mut builder = JobBuilder::new_with_id(id, self.name);
         (self.build_fn.0 .0)(&mut builder);
         builder
     }
 
-    /// Returns the name of this task.
+    /// Returns the name of this job.
     pub const fn name(&self) -> &'static str {
         self.name
     }
diff --git a/src/runner.rs b/src/runner.rs
index bec5d7b..44b58e6 100644
--- a/src/runner.rs
+++ b/src/runner.rs
@@ -12,27 +12,27 @@ use tokio::sync::Notify;
 use tokio::task;
 use uuid::Uuid;
 
-use crate::utils::{Opaque, OwnedTask};
+use crate::utils::{Opaque, OwnedHandle};
 
-/// Type used to build a task runner.
+/// Type used to build a job runner.
 #[derive(Debug, Clone)]
-pub struct TaskRunnerOptions {
+pub struct JobRunnerOptions {
     min_concurrency: usize,
     max_concurrency: usize,
     channel_names: Option<Vec<String>>,
-    dispatch: Opaque<Arc<dyn Fn(CurrentTask) + Send + Sync>>,
+    dispatch: Opaque<Arc<dyn Fn(CurrentJob) + Send + Sync>>,
     pool: Pool<Postgres>,
     keep_alive: bool,
 }
 
 #[derive(Debug)]
-struct TaskRunner {
-    options: TaskRunnerOptions,
-    running_tasks: AtomicUsize,
+struct JobRunner {
+    options: JobRunnerOptions,
+    running_jobs: AtomicUsize,
     notify: Notify,
 }
 
-/// Type used to checkpoint a running task.
+/// Type used to checkpoint a running job.
 #[derive(Debug, Clone)]
 pub struct Checkpoint<'a> {
     duration: Duration,
@@ -42,7 +42,7 @@ pub struct Checkpoint<'a> {
 }
 
 impl<'a> Checkpoint<'a> {
-    /// Construct a new checkpoint which also keeps the task alive
+    /// Construct a new checkpoint which also keeps the job alive
     /// for the specified interval.
     pub fn new_keep_alive(duration: Duration) -> Self {
         Self {
@@ -56,7 +56,7 @@ impl<'a> Checkpoint<'a> {
     pub fn new() -> Self {
         Self::new_keep_alive(Duration::from_secs(0))
     }
-    /// Add extra retries to the current task.
+    /// Add extra retries to the current job.
     pub fn set_extra_retries(&mut self, extra_retries: usize) -> &mut Self {
         self.extra_retries = extra_retries;
         self
@@ -79,11 +79,11 @@ impl<'a> Checkpoint<'a> {
     }
     async fn execute<'b, E: sqlx::Executor<'b, Database = Postgres>>(
         &self,
-        task_id: Uuid,
+        job_id: Uuid,
         executor: E,
     ) -> Result<(), sqlx::Error> {
         sqlx::query("SELECT mq_checkpoint($1, $2, $3, $4, $5)")
-            .bind(task_id)
+            .bind(job_id)
             .bind(self.duration)
             .bind(self.payload_json.as_deref())
            .bind(self.payload_bytes)
@@ -94,24 +94,24 @@ impl<'a> Checkpoint<'a> {
     }
 }
 
-/// Handle to the currently executing task.
+/// Handle to the currently executing job.
+/// When dropped, the job is assumed to no longer be running.
+/// To prevent the job being retried, it must be explicitly completed using
 /// one of the `.complete_` methods.
 #[derive(Debug)]
-pub struct CurrentTask {
+pub struct CurrentJob {
     id: Uuid,
     name: String,
     payload_json: Option<String>,
     payload_bytes: Option<Vec<u8>>,
-    task_runner: Arc<TaskRunner>,
-    keep_alive: Option<OwnedTask>,
+    job_runner: Arc<JobRunner>,
+    keep_alive: Option<OwnedHandle>,
 }
 
-impl CurrentTask {
-    /// Returns the database pool used to receive this task.
+impl CurrentJob {
+    /// Returns the database pool used to receive this job.
     pub fn pool(&self) -> &Pool<Postgres> {
-        &self.task_runner.options.pool
+        &self.job_runner.options.pool
     }
     async fn delete(
         &self,
@@ -123,8 +123,8 @@ impl CurrentTask {
             .await?;
         Ok(())
     }
-    /// Complete this task and commit the provided transaction at the same time.
-    /// If the transaction cannot be committed, the task will not be completed.
+    /// Complete this job and commit the provided transaction at the same time.
+    /// If the transaction cannot be committed, the job will not be completed.
     pub async fn complete_with_transaction(
         &mut self,
         mut tx: sqlx::Transaction<'_, Postgres>,
@@ -134,15 +134,15 @@ impl CurrentTask {
         self.keep_alive = None;
         Ok(())
     }
-    /// Complete this task.
+    /// Complete this job.
     pub async fn complete(&mut self) -> Result<(), sqlx::Error> {
         self.delete(self.pool()).await?;
         self.keep_alive = None;
         Ok(())
     }
-    /// Checkpoint this task and commit the provided transaction at the same time.
-    /// If the transaction cannot be committed, the task will not be checkpointed.
-    /// Checkpointing allows the task payload to be replaced for the next retry.
+    /// Checkpoint this job and commit the provided transaction at the same time.
+    /// If the transaction cannot be committed, the job will not be checkpointed.
+    /// Checkpointing allows the job payload to be replaced for the next retry.
     pub async fn checkpoint_with_transaction(
         &mut self,
         mut tx: sqlx::Transaction<'_, Postgres>,
@@ -152,12 +152,12 @@ impl CurrentTask {
         tx.commit().await?;
         Ok(())
     }
-    /// Checkpointing allows the task payload to be replaced for the next retry.
+    /// Checkpointing allows the job payload to be replaced for the next retry.
     pub async fn checkpoint(&mut self, checkpoint: &Checkpoint<'_>) -> Result<(), sqlx::Error> {
         checkpoint.execute(self.id, self.pool()).await?;
         Ok(())
     }
-    /// Prevent this task from being retried for the specified interval.
+    /// Prevent this job from being retried for the specified interval.
     pub async fn keep_alive(&mut self, duration: Duration) -> Result<(), sqlx::Error> {
         sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)")
             .bind(self.id)
@@ -166,15 +166,15 @@ impl CurrentTask {
             .await?;
         Ok(())
     }
-    /// Returns the ID of this task.
+    /// Returns the ID of this job.
     pub fn id(&self) -> Uuid {
         self.id
     }
-    /// Returns the name of this task.
+    /// Returns the name of this job.
     pub fn name(&self) -> &str {
         &self.name
     }
-    /// Extracts the JSON payload belonging to this task (if present).
+    /// Extracts the JSON payload belonging to this job (if present).
     pub fn json<'a, T: Deserialize<'a>>(&'a self) -> Result<Option<T>, serde_json::Error> {
         if let Some(payload_json) = &self.payload_json {
             serde_json::from_str(payload_json).map(Some)
@@ -182,33 +182,30 @@ impl CurrentTask {
             Ok(None)
         }
     }
-    /// Returns the raw JSON payload for this task.
+    /// Returns the raw JSON payload for this job.
     pub fn raw_json(&self) -> Option<&str> {
         self.payload_json.as_deref()
     }
-    /// Returns the raw binary payload for this task.
+    /// Returns the raw binary payload for this job.
    pub fn raw_bytes(&self) -> Option<&[u8]> {
         self.payload_bytes.as_deref()
     }
 }
 
-impl Drop for CurrentTask {
+impl Drop for CurrentJob {
     fn drop(&mut self) {
-        if self
-            .task_runner
-            .running_tasks
-            .fetch_sub(1, Ordering::SeqCst)
-            == self.task_runner.options.min_concurrency
+        if self.job_runner.running_jobs.fetch_sub(1, Ordering::SeqCst)
+            == self.job_runner.options.min_concurrency
         {
-            self.task_runner.notify.notify_one();
+            self.job_runner.notify.notify_one();
         }
     }
 }
 
-impl TaskRunnerOptions {
-    /// Begin constructing a new task runner using the specified connection pool,
+impl JobRunnerOptions {
+    /// Begin constructing a new job runner using the specified connection pool,
     /// and the provided execution function.
-    pub fn new<F: Fn(CurrentTask) + Send + Sync + 'static>(pool: &Pool<Postgres>, f: F) -> Self {
+    pub fn new<F: Fn(CurrentJob) + Send + Sync + 'static>(pool: &Pool<Postgres>, f: F) -> Self {
         Self {
             min_concurrency: 16,
             max_concurrency: 32,
@@ -218,8 +215,8 @@ impl TaskRunnerOptions {
             pool: pool.clone(),
         }
     }
-    /// Set the concurrency limits for this task runner. When the number of active
-    /// tasks falls below the minimum, the runner will poll for more, up to the maximum.
+    /// Set the concurrency limits for this job runner. When the number of active
+    /// jobs falls below the minimum, the runner will poll for more, up to the maximum.
     ///
     /// The difference between the min and max will dictate the maximum batch size which
     /// can be received: larger batch sizes are more efficient.
@@ -228,8 +225,8 @@ impl TaskRunnerOptions {
         self.max_concurrency = max_concurrency;
         self
     }
-    /// Set the channel names which this task runner will subscribe to. If unspecified,
-    /// the task runner will subscribe to all channels.
+    /// Set the channel names which this job runner will subscribe to. If unspecified,
+    /// the job runner will subscribe to all channels.
     pub fn set_channel_names<'a>(&'a mut self, channel_names: &[&str]) -> &'a mut Self {
         self.channel_names = Some(
             channel_names
@@ -240,33 +237,33 @@ impl TaskRunnerOptions {
         );
         self
     }
-    /// Choose whether to automatically keep tasks alive whilst they're still
+    /// Choose whether to automatically keep jobs alive whilst they're still
     /// running. Defaults to `true`.
     pub fn set_keep_alive(&mut self, keep_alive: bool) -> &mut Self {
         self.keep_alive = keep_alive;
         self
     }
 
-    /// Start the task runner in the background. The task runner will stop when the
+    /// Start the job runner in the background. The job runner will stop when the
     /// returned handle is dropped.
-    pub async fn run(&self) -> Result<OwnedTask, sqlx::Error> {
+    pub async fn run(&self) -> Result<OwnedHandle, sqlx::Error> {
         let options = self.clone();
-        let task_runner = Arc::new(TaskRunner {
+        let job_runner = Arc::new(JobRunner {
             options,
-            running_tasks: AtomicUsize::new(0),
+            running_jobs: AtomicUsize::new(0),
             notify: Notify::new(),
         });
-        let listener_task = start_listener(task_runner.clone()).await?;
-        Ok(OwnedTask(task::spawn(main_loop(
-            task_runner,
+        let listener_task = start_listener(job_runner.clone()).await?;
+        Ok(OwnedHandle(task::spawn(main_loop(
+            job_runner,
             listener_task,
         ))))
     }
 }
 
-async fn start_listener(task_runner: Arc<TaskRunner>) -> Result<OwnedTask, sqlx::Error> {
-    let mut listener = PgListener::connect_with(&task_runner.options.pool).await?;
-    if let Some(channels) = &task_runner.options.channel_names {
+async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> {
+    let mut listener = PgListener::connect_with(&job_runner.options.pool).await?;
+    if let Some(channels) = &job_runner.options.channel_names {
         let names: Vec<String> = channels.iter().map(|c| format!("mq_{}", c)).collect();
         listener
             .listen_all(names.iter().map(|s| s.as_str()))
             .await?;
@@ -274,9 +271,9 @@ async fn start_listener(task_runner: Arc<TaskRunner>) -> Result<OwnedTask, sqlx
 Duration {
 }
 
 async fn poll_and_dispatch(
-    task_runner: &Arc<TaskRunner>,
+    job_runner: &Arc<JobRunner>,
     batch_size: i32,
 ) -> Result<Duration, sqlx::Error> {
     log::info!("Polling for messages");
 
-    let options = &task_runner.options;
+    let options = &job_runner.options;
     let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)")
         .bind(&options.channel_names)
         .bind(batch_size)
@@ -350,7 +347,7 @@ async fn poll_and_dispatch(
     {
         let retry_backoff = to_duration(retry_backoff);
         let keep_alive = if options.keep_alive {
-            Some(OwnedTask(task::spawn(keep_task_alive(
+            Some(OwnedHandle(task::spawn(keep_job_alive(
                 id,
                 options.pool.clone(),
                 retry_backoff,
@@ -358,31 +355,31 @@ async fn poll_and_dispatch(
         } else {
             None
         };
-        let current_task = CurrentTask {
+        let current_job = CurrentJob {
             id,
             name,
             payload_json,
             payload_bytes,
-            task_runner: task_runner.clone(),
+            job_runner: job_runner.clone(),
             keep_alive,
         };
-        task_runner.running_tasks.fetch_add(1, Ordering::SeqCst);
-        (options.dispatch)(current_task);
+        job_runner.running_jobs.fetch_add(1, Ordering::SeqCst);
+        (options.dispatch)(current_job);
     }
 
     Ok(wait_time)
 }
 
-async fn main_loop(task_runner: Arc<TaskRunner>, _listener_task: OwnedTask) {
-    let options = &task_runner.options;
+async fn main_loop(job_runner: Arc<JobRunner>, _listener_task: OwnedHandle) {
+    let options = &job_runner.options;
     let mut failures = 0;
     loop {
-        let running_tasks = task_runner.running_tasks.load(Ordering::SeqCst);
-        let duration = if running_tasks < options.min_concurrency {
-            let batch_size = (options.max_concurrency - running_tasks) as i32;
+        let running_jobs = job_runner.running_jobs.load(Ordering::SeqCst);
+        let duration = if running_jobs < options.min_concurrency {
+            let batch_size = (options.max_concurrency - running_jobs) as i32;
 
-            match poll_and_dispatch(&task_runner, batch_size).await {
+            match poll_and_dispatch(&job_runner, batch_size).await {
                 Ok(duration) => {
                     failures = 0;
                     duration
@@ -398,11 +395,11 @@ async fn main_loop(task_runner: Arc<TaskRunner>, _listener_task: OwnedTask) {
         };
 
         // Wait for us to be notified, or for the timeout to elapse
-        let _ = tokio::time::timeout(duration, task_runner.notify.notified()).await;
+        let _ = tokio::time::timeout(duration, job_runner.notify.notified()).await;
     }
 }
 
-async fn keep_task_alive(id: Uuid, pool: Pool<Postgres>, mut interval: Duration) {
+async fn keep_job_alive(id: Uuid, pool: Pool<Postgres>, mut interval: Duration) {
     loop {
        tokio::time::sleep(interval / 
2).await;
         interval *= 2;
@@ -412,7 +409,7 @@ async fn keep_task_alive(id: Uuid, pool: Pool<Postgres>, mut interval: Duration)
             .execute(&pool)
             .await
         {
-            log::error!("Failed to keep task {} alive: {}", id, e);
+            log::error!("Failed to keep job {} alive: {}", id, e);
             break;
         }
     }
diff --git a/src/spawn.rs b/src/spawn.rs
index 8f636a8..263753f 100644
--- a/src/spawn.rs
+++ b/src/spawn.rs
@@ -6,9 +6,9 @@ use serde::Serialize;
 use sqlx::Postgres;
 use uuid::Uuid;
 
-/// Type for building a task to send.
+/// Type for building a job to send.
 #[derive(Debug, Clone)]
-pub struct TaskBuilder<'a> {
+pub struct JobBuilder<'a> {
     id: Uuid,
     delay: Duration,
     channel_name: &'a str,
@@ -22,12 +22,12 @@ pub struct TaskBuilder<'a> {
     payload_bytes: Option<&'a [u8]>,
 }
 
-impl<'a> TaskBuilder<'a> {
-    /// Prepare to send a task with the specified name.
+impl<'a> JobBuilder<'a> {
+    /// Prepare to send a job with the specified name.
     pub fn new(name: &'a str) -> Self {
         Self::new_with_id(Uuid::new_v4(), name)
     }
-    /// Prepare to send a task with the specified name and ID.
+    /// Prepare to send a job with the specified name and ID.
     pub fn new_with_id(id: Uuid, name: &'a str) -> Self {
         Self {
             id,
@@ -76,32 +76,32 @@ impl<'a> TaskBuilder<'a> {
         self.commit_interval = commit_interval;
         self
     }
-    /// Set whether this task is strictly ordered with respect to other ordered
-    /// task in the same channel (default false).
+    /// Set whether this job is strictly ordered with respect to other ordered
+    /// jobs in the same channel (default false).
     pub fn set_ordered(&mut self, ordered: bool) -> &mut Self {
         self.ordered = ordered;
         self
     }
 
-    /// Set a delay before this task is executed (default none).
+    /// Set a delay before this job is executed (default none).
     pub fn set_delay(&mut self, delay: Duration) -> &mut Self {
         self.delay = delay;
         self
     }
 
-    /// Set a raw JSON payload for the task.
+    /// Set a raw JSON payload for the job.
     pub fn set_raw_json(&mut self, raw_json: &'a str) -> &mut Self {
         self.payload_json = Some(Cow::Borrowed(raw_json));
         self
     }
 
-    /// Set a raw binary payload for the task.
+    /// Set a raw binary payload for the job.
     pub fn set_raw_bytes(&mut self, raw_bytes: &'a [u8]) -> &mut Self {
         self.payload_bytes = Some(raw_bytes);
         self
    }
 
-    /// Set a JSON payload for the task.
+    /// Set a JSON payload for the job.
     pub fn set_json<T: Serialize>(
         &mut self,
         value: &T,
@@ -111,7 +111,7 @@ impl<'a> TaskBuilder<'a> {
         Ok(self)
     }
 
-    /// Spawn the task using the given executor. This might be a connection
+    /// Spawn the job using the given executor. This might be a connection
     /// pool, a connection, or a transaction.
     pub async fn spawn<'b, E: sqlx::Executor<'b, Database = Postgres>>(
         &self,
@@ -137,14 +137,14 @@ impl<'a> TaskBuilder<'a> {
     }
 }
 
-/// Commit the specified tasks. The tasks should have been previously spawned
+/// Commit the specified jobs. The jobs should have been previously spawned
 /// with the two-phase commit option enabled.
 pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>(
     executor: E,
-    task_ids: &[Uuid],
+    job_ids: &[Uuid],
 ) -> Result<(), sqlx::Error> {
     sqlx::query("SELECT mq_commit($1)")
-        .bind(task_ids)
+        .bind(job_ids)
         .execute(executor)
         .await?;
     Ok(())
diff --git a/src/utils.rs b/src/utils.rs
index 593b486..6da6697 100644
--- a/src/utils.rs
+++ b/src/utils.rs
@@ -27,13 +27,13 @@ impl<T> DerefMut for Opaque<T> {
     }
 }
 
-/// A handle to a background task which will be automatically cancelled if
 /// the handle is dropped. 
Extract the inner join handle to prevent this /// behaviour. #[derive(Debug)] -pub struct OwnedTask(pub JoinHandle<()>); +pub struct OwnedHandle(pub JoinHandle<()>); -impl Drop for OwnedTask { +impl Drop for OwnedHandle { fn drop(&mut self) { self.0.abort(); }
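
For anyone reviewing the rename, here is a minimal end-to-end sketch of the renamed API, assembled from the README examples above. It is illustrative only: `connect_to_db` is an assumed helper (as in the README), and error handling is condensed.

```rust
use std::error::Error;

use sqlxmq::{job, CurrentJob, JobRegistry};

// Defaults set in the attribute can still be overridden per-spawn.
#[job(channel_name = "foo")]
async fn example_job(mut current_job: CurrentJob) -> sqlx::Result<()> {
    // Decode the optional JSON payload.
    let who: Option<String> = current_job.json()?;
    println!("Hello, {}!", who.as_deref().unwrap_or("world"));
    // Completing the job removes it from the queue so it is not retried.
    current_job.complete().await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // `connect_to_db` is an assumed helper, as in the README examples.
    let pool = connect_to_db().await?;

    // The runner polls in the background until this handle is dropped.
    let _runner = JobRegistry::new(&[example_job])
        .runner(&pool)
        .set_concurrency(10, 20)
        .run()
        .await?;

    // Spawn one job, overriding its default channel name.
    let mut builder = example_job.new();
    builder.set_channel_name("bar").set_json("John")?;
    builder.spawn(&pool).await?;
    Ok(())
}
```

Note that `set_json` serializes eagerly and returns a `Result` (see `spawn.rs` above), which is why the builder is bound to a local before `spawn`.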