#![deny(missing_docs, unsafe_code)]
//! # sqlxmq
//!
//! A job queue built on `sqlx` and `PostgreSQL`.
//!
//! This library allows a CRUD application to run background jobs without complicating its
//! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications
//! already using a `PostgreSQL` database.
//!
//! Although using a SQL database as a job queue means compromising on latency of
//! delivered jobs, several show-stopping issues present in ordinary job queues are
//! avoided altogether.
//!
//! With most other job queues, in-flight jobs are state that is not covered by normal
//! database backups. Even if jobs _are_ backed up, there is no way to restore both
//! a database and a job queue to a consistent point in time without manually
//! resolving conflicts.
//!
//! By storing jobs in the database, existing backup procedures will store a perfectly
//! consistent state of both in-flight jobs and persistent data. Additionally, jobs can
//! be spawned and completed as part of other transactions, making it easy to write correct
//! application code.
//!
//! Leveraging the power of `PostgreSQL`, this job queue offers several features not
//! present in other job queues.
//!
//! # Features
//!
//! - **Send/receive multiple jobs at once.**
//!
//!   This reduces the number of queries to the database.
//!
//! - **Send jobs to be executed at a future date and time.**
//!
//!   Avoids the need for a separate scheduling system.
//!
//! - **Reliable delivery of jobs.**
//!
//! - **Automatic retries with exponential backoff.**
//!
//!   The number of retries and the initial backoff parameters are configurable.
//!
//! - **Transactional sending of jobs.**
//!
//!   Avoids sending spurious jobs if a transaction is rolled back (see the sketch
//!   after this list).
//!
//! - **Transactional completion of jobs.**
//!
//!   If all side-effects of a job are updates to the database, this provides
//!   true exactly-once execution of jobs.
//!
//! - **Transactional check-pointing of jobs.**
//!
//!   Long-running jobs can check-point their state to avoid having to restart
//!   from the beginning if there is a failure: the next retry can continue
//!   from the last check-point.
//!
//! - **Opt-in strictly ordered job delivery.**
//!
//!   Jobs within the same channel will be processed strictly in order
//!   if this option is enabled for the job.
//!
//! - **Fair job delivery.**
//!
//!   A channel with a lot of jobs ready to run will not starve a channel with fewer
//!   jobs.
//!
//! - **Opt-in two-phase commit.**
//!
//!   This is particularly useful on an ordered channel, where a position can be
//!   "reserved" in the job order but not committed until later.
//!
//! - **JSON and/or binary payloads.**
//!
//!   Jobs can use whichever is most convenient.
//!
//! - **Automatic keep-alive of jobs.**
//!
//!   Long-running jobs will automatically be "kept alive" to prevent them from being
//!   retried whilst they're still ongoing.
//!
//! - **Concurrency limits.**
//!
//!   Specify the minimum and maximum number of concurrent jobs each runner should
//!   handle.
//!
//! - **Built-in job registry via an attribute macro.**
//!
//!   Jobs can be easily registered with a runner, and default configuration can be
//!   specified on a per-job basis.
//!
//! - **Implicit channels.**
//!
//!   Channels are implicitly created and destroyed when jobs are sent and processed,
//!   so no setup is required.
//!
//! - **Channel groups.**
//!
//!   Easily subscribe to multiple channels at once, thanks to the separation of
//!   channel name and channel arguments.
//!
//! - **NOTIFY-based polling.**
//!
//!   This saves resources when few jobs are being processed.
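//!
//! As a minimal sketch of how transactional sending can look in practice (the
//! `"send_email"` job name and the one-minute delay are illustrative, not part
//! of the crate):
//!
//! ```rust
//! # use std::error::Error;
//! # use std::time::Duration;
//! # use sqlxmq::JobBuilder;
//! # async fn example(
//! #     pool: sqlx::Pool<sqlx::Postgres>
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//! // Begin an ordinary database transaction.
//! let mut tx = pool.begin().await?;
//!
//! // Spawn a job through the transaction: if the transaction is rolled back,
//! // the job is never sent. The delay also schedules it for the future.
//! JobBuilder::new("send_email")
//!     .set_delay(Duration::from_secs(60))
//!     .spawn(&mut tx)
//!     .await?;
//!
//! // ... other updates made with `tx` commit or roll back atomically
//! // with the job ...
//!
//! tx.commit().await?;
//! # Ok(())
//! # }
//! ```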
//!
//! # Getting started
//!
//! ## Database schema
//!
//! This crate expects certain database tables and stored procedures to exist.
//! You can copy the migration files from this crate into your own migrations
//! folder.
//!
//! All database items created by this crate are prefixed with `mq`, so as not
//! to conflict with your own schema.
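//!
//! For example, if you manage your schema with `sqlx` migrations, one possible
//! approach is to apply the copied files at application startup. This is a
//! sketch, assuming the files live in your crate's default `migrations/`
//! directory:
//!
//! ```rust,ignore
//! # async fn migrate(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), sqlx::migrate::MigrateError> {
//! // Embeds the contents of `migrations/` at compile time and applies any
//! // migrations which have not yet been run.
//! sqlx::migrate!().run(pool).await?;
//! # Ok(())
//! # }
//! ```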
//!
//! ## Defining jobs
//!
//! The first step is to define a function to be run on the job queue.
//!
//! ```rust
//! use std::error::Error;
//!
//! use sqlxmq::{job, CurrentJob};
//!
//! // Arguments to the `#[job]` attribute allow setting default job options.
//! #[job(channel_name = "foo")]
//! async fn example_job(
//!     // The first argument should always be the current job.
//!     mut current_job: CurrentJob,
//!     // Additional arguments are optional, but can be used to access context
//!     // provided via [`JobRegistry::set_context`].
//!     message: &'static str,
//! ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//!     // Decode a JSON payload
//!     let who: Option<String> = current_job.json()?;
//!
//!     // Do some work
//!     println!("{}, {}!", message, who.as_deref().unwrap_or("world"));
//!
//!     // Mark the job as complete
//!     current_job.complete().await?;
//!
//!     Ok(())
//! }
//! ```
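//!
//! If a job's side-effects are all database updates, those updates can share a
//! transaction with the job's completion, giving exactly-once execution. A
//! minimal sketch (the `UPDATE` statement is illustrative):
//!
//! ```rust
//! # use std::error::Error;
//! # use sqlxmq::{job, CurrentJob};
//! #[job]
//! async fn transactional_job(
//!     mut current_job: CurrentJob,
//! ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//!     // Open a transaction on the pool the job runner is using.
//!     let mut tx = current_job.pool().begin().await?;
//!
//!     // Apply the job's side-effects...
//!     sqlx::query("UPDATE users SET welcomed = TRUE")
//!         .execute(&mut tx)
//!         .await?;
//!
//!     // ...and complete the job in the same transaction, so the side-effects
//!     // are applied if and only if the job is marked complete.
//!     current_job.complete_with_transaction(tx).await?;
//!     Ok(())
//! }
//! ```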
//!
//! ## Listening for jobs
//!
//! Next we need to create a job runner: this is what listens for new jobs
//! and executes them.
//!
//! ```rust,no_run
//! use std::error::Error;
//!
//! use sqlxmq::JobRegistry;
//!
//! # use sqlxmq::{job, CurrentJob};
//! #
//! # #[job]
//! # async fn example_job(
//! #     current_job: CurrentJob,
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
//! #
//! # async fn connect_to_db() -> sqlx::Result<sqlx::Pool<sqlx::Postgres>> {
//! #     unimplemented!()
//! # }
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn Error>> {
//!     // You'll need to provide a Postgres connection pool.
//!     let pool = connect_to_db().await?;
//!
//!     // Construct a job registry from our single job.
//!     let mut registry = JobRegistry::new(&[example_job]);
//!     // Here is where you can configure the registry:
//!     // registry.set_error_handler(...)
//!
//!     // And add context.
//!     registry.set_context("Hello");
//!
//!     let runner = registry
//!         // Create a job runner using the connection pool.
//!         .runner(&pool)
//!         // Here is where you can configure the job runner.
//!         // Aim to keep 10-20 jobs running at a time.
//!         .set_concurrency(10, 20)
//!         // Start the job runner in the background.
//!         .run()
//!         .await?;
//!
//!     // The job runner will continue listening and running
//!     // jobs until `runner` is dropped.
//!     Ok(())
//! }
//! ```
//!
//! ## Spawning a job
//!
//! The final step is to actually run a job.
//!
//! ```rust
//! # use std::error::Error;
//! # use sqlxmq::{job, CurrentJob};
//! #
//! # #[job]
//! # async fn example_job(
//! #     current_job: CurrentJob,
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
//! #
//! # async fn example(
//! #     pool: sqlx::Pool<sqlx::Postgres>
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
//! example_job.builder()
//!     // This is where we can override job configuration.
//!     .set_channel_name("bar")
//!     .set_json("John")?
//!     .spawn(&pool)
//!     .await?;
//! # Ok(())
//! # }
//! ```

#[doc(hidden)]
pub mod hidden;
mod registry;
mod runner;
mod spawn;
mod utils;

pub use registry::*;
pub use runner::*;
pub use spawn::*;
pub use sqlxmq_macros::job;
pub use utils::OwnedHandle;

/// Helper function to determine if a particular error condition is retryable.
///
/// For best results, database operations should be automatically retried if one
/// of these errors is returned.
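///
/// # Example
///
/// A minimal sketch of a retry loop around spawning a job (the job name,
/// attempt limit, and `spawn_with_retry` wrapper are illustrative only):
///
/// ```rust,no_run
/// # use sqlxmq::{should_retry, JobBuilder};
/// # async fn spawn_with_retry(pool: &sqlx::Pool<sqlx::Postgres>) -> Result<(), sqlx::Error> {
/// const MAX_ATTEMPTS: u32 = 3;
/// for attempt in 1..=MAX_ATTEMPTS {
///     match JobBuilder::new("example").spawn(pool).await {
///         Ok(_) => break,
///         // Transient errors (serialization failures, deadlocks, and
///         // constraint races on ordered channels) are worth retrying.
///         Err(e) if should_retry(&e) && attempt < MAX_ATTEMPTS => continue,
///         Err(e) => return Err(e),
///     }
/// }
/// # Ok(())
/// # }
/// ```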
pub fn should_retry(error: &sqlx::Error) -> bool {
    if let Some(db_error) = error.as_database_error() {
        // It's more readable as a match
        #[allow(clippy::match_like_matches_macro)]
        match (db_error.code().as_deref(), db_error.constraint()) {
            // Foreign key constraint violation on ordered channel
            (Some("23503"), Some("mq_msgs_after_message_id_fkey")) => true,
            // Unique constraint violation on ordered channel
            (Some("23505"), Some("mq_msgs_channel_name_channel_args_after_message_id_idx")) => true,
            // Serialization failure
            (Some("40001"), _) => true,
            // Deadlock detected
            (Some("40P01"), _) => true,
            // Other
            _ => false,
        }
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate as sqlxmq;

    use std::env;
    use std::error::Error;
    use std::future::Future;
    use std::ops::Deref;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Once};
    use std::time::Duration;

    use futures::channel::mpsc;
    use futures::StreamExt;
    use sqlx::{Pool, Postgres};
    use tokio::sync::{Mutex, MutexGuard};
    use tokio::task;

    struct TestGuard<T>(MutexGuard<'static, ()>, T);

    impl<T> Deref for TestGuard<T> {
        type Target = T;

        fn deref(&self) -> &T {
            &self.1
        }
    }

    async fn test_pool() -> TestGuard<Pool<Postgres>> {
        static INIT_LOGGER: Once = Once::new();
        static TEST_MUTEX: Mutex<()> = Mutex::const_new(());

        let guard = TEST_MUTEX.lock().await;

        let _ = dotenv::dotenv();

        INIT_LOGGER.call_once(pretty_env_logger::init);

        let pool = Pool::<Postgres>::connect(&env::var("DATABASE_URL").unwrap())
            .await
            .unwrap();

        sqlx::query("TRUNCATE TABLE mq_payloads")
            .execute(&pool)
            .await
            .unwrap();
        sqlx::query("DELETE FROM mq_msgs WHERE id != uuid_nil()")
            .execute(&pool)
            .await
            .unwrap();

        TestGuard(guard, pool)
    }

    async fn test_job_runner<F: Future + Send + 'static>(
        pool: &Pool<Postgres>,
        f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
    ) -> (JobRunnerHandle, Arc<AtomicUsize>)
    where
        F::Output: Send + 'static,
    {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter2 = counter.clone();
        let runner = JobRunnerOptions::new(pool, move |job| {
            counter2.fetch_add(1, Ordering::SeqCst);
            task::spawn(f(job));
        })
        .run()
        .await
        .unwrap();
        (runner, counter)
    }

    fn job_proto<'a, 'b>(builder: &'a mut JobBuilder<'b>) -> &'a mut JobBuilder<'b> {
        builder.set_channel_name("bar")
    }

    #[job(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
    async fn example_job1(
        mut current_job: CurrentJob,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        current_job.complete().await?;
        Ok(())
    }

    #[job(proto(job_proto))]
    async fn example_job2(
        mut current_job: CurrentJob,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        current_job.complete().await?;
        Ok(())
    }

    #[job]
    async fn example_job_with_ctx(
        mut current_job: CurrentJob,
        ctx1: i32,
        ctx2: &'static str,
    ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
        assert_eq!(ctx1, 42);
        assert_eq!(ctx2, "Hello, world!");
        current_job.complete().await?;
        Ok(())
    }

    async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
        let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
        registry.set_context(42).set_context("Hello, world!");
        registry.runner(pool).run().await.unwrap()
    }

    fn is_ci() -> bool {
        std::env::var("CI").ok().is_some()
    }

    fn default_pause() -> u64 {
        if is_ci() {
            1000
        } else {
            200
        }
    }

    async fn pause() {
        pause_ms(default_pause()).await;
    }

    async fn pause_ms(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn it_can_spawn_job() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) =
                test_job_runner(pool, |mut job| async move { job.complete().await }).await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo").spawn(pool).await.unwrap();
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_clear_jobs() {
        {
            let pool = &*test_pool().await;
            JobBuilder::new("foo")
                .set_channel_name("foo")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("foo")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("bar")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("bar")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("baz")
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("foo")
                .set_channel_name("baz")
                .spawn(pool)
                .await
                .unwrap();

            sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap();

            let (_runner, counter) =
                test_job_runner(pool, |mut job| async move { job.complete().await }).await;

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_runs_jobs_in_order() {
        {
            let pool = &*test_pool().await;
            let (tx, mut rx) = mpsc::unbounded();

            let (_runner, counter) = test_job_runner(pool, move |job| {
                let tx = tx.clone();
                async move {
                    tx.unbounded_send(job).unwrap();
                }
            })
            .await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_ordered(true)
                .spawn(pool)
                .await
                .unwrap();
            JobBuilder::new("bar")
                .set_ordered(true)
                .spawn(pool)
                .await
                .unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            let mut job = rx.next().await.unwrap();
            job.complete().await.unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_runs_jobs_in_parallel() {
        {
            let pool = &*test_pool().await;
            let (tx, mut rx) = mpsc::unbounded();

            let (_runner, counter) = test_job_runner(pool, move |job| {
                let tx = tx.clone();
                async move {
                    tx.unbounded_send(job).unwrap();
                }
            })
            .await;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo").spawn(pool).await.unwrap();
            JobBuilder::new("bar").spawn(pool).await.unwrap();

            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            for _ in 0..2 {
                let mut job = rx.next().await.unwrap();
                job.complete().await.unwrap();
            }
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_retries_failed_jobs() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) = test_job_runner(pool, move |_| async {}).await;

            let backoff = default_pause() + 300;

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_retry_backoff(Duration::from_millis(backoff))
                .set_retries(2)
                .spawn(pool)
                .await
                .unwrap();

            // First attempt
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Second attempt
            pause_ms(backoff).await;
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            // Third attempt
            pause_ms(backoff * 2).await;
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 3);

            // No more attempts
            pause_ms(backoff * 5).await;
            assert_eq!(counter.load(Ordering::SeqCst), 3);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_checkpoint_jobs() {
        {
            let pool = &*test_pool().await;
            let (_runner, counter) = test_job_runner(pool, move |mut current_job| async move {
                let state: bool = current_job.json().unwrap().unwrap();
                if state {
                    current_job.complete().await.unwrap();
                } else {
                    current_job
                        .checkpoint(Checkpoint::new().set_json(&true).unwrap())
                        .await
                        .unwrap();
                }
            })
            .await;

            let backoff = default_pause();

            assert_eq!(counter.load(Ordering::SeqCst), 0);
            JobBuilder::new("foo")
                .set_retry_backoff(Duration::from_millis(backoff))
                .set_retries(5)
                .set_json(&false)
                .unwrap()
                .spawn(pool)
                .await
                .unwrap();

            // First attempt
            pause().await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);

            // Second attempt
            pause_ms(backoff).await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);

            // No more attempts
            pause_ms(backoff * 3).await;
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        }
        pause().await;
    }

    #[tokio::test]
    async fn it_can_use_registry() {
        {
            let pool = &*test_pool().await;
            let _runner = named_job_runner(pool).await;

            example_job1.builder().spawn(pool).await.unwrap();
            example_job2.builder().spawn(pool).await.unwrap();
            example_job_with_ctx.builder().spawn(pool).await.unwrap();
            pause().await;
        }
        pause().await;
    }
}