2021-03-29 02:05:20 +00:00
|
|
|
#![deny(missing_docs, unsafe_code)]
|
|
|
|
//! # sqlxmq
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! A job queue built on `sqlx` and `PostgreSQL`.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! This library allows a CRUD application to run background jobs without complicating its
|
2021-03-29 02:05:20 +00:00
|
|
|
//! deployment. The only runtime dependency is `PostgreSQL`, so this is ideal for applications
|
|
|
|
//! already using a `PostgreSQL` database.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Although using a SQL database as a job queue means compromising on latency of
|
|
|
|
//! delivered jobs, there are several show-stopping issues present in ordinary job
|
2021-03-29 02:05:20 +00:00
|
|
|
//! queues which are avoided altogether.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! With most other job queues, in-flight jobs are state that is not covered by normal
|
|
|
|
//! database backups. Even if jobs _are_ backed up, there is no way to restore both
|
|
|
|
//! a database and a job queue to a consistent point-in-time without manually
|
2021-03-29 02:05:20 +00:00
|
|
|
//! resolving conflicts.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! By storing jobs in the database, existing backup procedures will store a perfectly
|
|
|
|
//! consistent state of both in-flight jobs and persistent data. Additionally, jobs can
|
2021-03-29 02:05:20 +00:00
|
|
|
//! be spawned and completed as part of other transactions, making it easy to write correct
|
|
|
|
//! application code.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Leveraging the power of `PostgreSQL`, this job queue offers several features not
|
|
|
|
//! present in other job queues.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! # Features
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Send/receive multiple jobs at once.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! This reduces the number of queries to the database.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Send jobs to be executed at a future date and time.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! Avoids the need for a separate scheduling system.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Reliable delivery of jobs.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! - **Automatic retries with exponential backoff.**
|
|
|
|
//!
|
|
|
|
//! Number of retries and initial backoff parameters are configurable.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Transactional sending of jobs.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Avoids sending spurious jobs if a transaction is rolled back.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Transactional completion of jobs.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! If all side-effects of a job are updates to the database, this provides
|
|
|
|
//! true exactly-once execution of jobs.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Transactional check-pointing of jobs.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Long-running jobs can check-point their state to avoid having to restart
|
2021-03-29 02:05:20 +00:00
|
|
|
//! from the beginning if there is a failure: the next retry can continue
|
|
|
|
//! from the last check-point.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Opt-in strictly ordered job delivery.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Jobs within the same channel will be processed strictly in-order
|
|
|
|
//! if this option is enabled for the job.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Fair job delivery.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! A channel with a lot of jobs ready to run will not starve a channel with fewer
|
|
|
|
//! jobs.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! - **Opt-in two-phase commit.**
|
|
|
|
//!
|
|
|
|
//! This is particularly useful on an ordered channel where a position can be "reserved"
|
2021-03-29 20:39:07 +00:00
|
|
|
//! in the job order, but not committed until later.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! - **JSON and/or binary payloads.**
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Jobs can use whichever is most convenient.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Automatic keep-alive of jobs.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Long-running jobs will automatically be "kept alive" to prevent them being
|
2021-03-29 02:05:20 +00:00
|
|
|
//! retried whilst they're still ongoing.
|
|
|
|
//!
|
|
|
|
//! - **Concurrency limits.**
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Specify the minimum and maximum number of concurrent jobs each runner should
|
2021-03-29 02:05:20 +00:00
|
|
|
//! handle.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! - **Built-in job registry via an attribute macro.**
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Jobs can be easily registered with a runner, and default configuration specified
|
|
|
|
//! on a per-job basis.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! - **Implicit channels.**
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Channels are implicitly created and destroyed when jobs are sent and processed,
|
2021-03-29 02:05:20 +00:00
|
|
|
//! so no setup is required.
|
|
|
|
//!
|
|
|
|
//! - **Channel groups.**
|
|
|
|
//!
|
|
|
|
//! Easily subscribe to multiple channels at once, thanks to the separation of
|
|
|
|
//! channel name and channel arguments.
|
|
|
|
//!
|
|
|
|
//! - **NOTIFY-based polling.**
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! This saves resources when few jobs are being processed.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! # Getting started
|
|
|
|
//!
|
2021-03-30 00:00:51 +00:00
|
|
|
//! ## Database schema
|
|
|
|
//!
|
|
|
|
//! This crate expects certain database tables and stored procedures to exist.
|
|
|
|
//! You can copy the migration files from this crate into your own migrations
|
|
|
|
//! folder.
|
|
|
|
//!
|
|
|
|
//! All database items created by this crate are prefixed with `mq`, so as not
|
|
|
|
//! to conflict with your own schema.
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! ## Defining jobs
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! The first step is to define a function to be run on the job queue.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! ```rust
|
2021-03-29 21:33:07 +00:00
|
|
|
//! use std::error::Error;
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! use sqlxmq::{job, CurrentJob};
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! // Arguments to the `#[job]` attribute allow setting default job options.
|
|
|
|
//! #[job(channel_name = "foo")]
|
|
|
|
//! async fn example_job(
|
2021-07-17 17:45:06 +00:00
|
|
|
//! // The first argument should always be the current job.
|
2021-03-29 20:39:07 +00:00
|
|
|
//! mut current_job: CurrentJob,
|
2021-07-17 17:45:06 +00:00
|
|
|
//! // Additional arguments are optional, but can be used to access context
|
|
|
|
//! // provided via [`JobRegistry::set_context`].
|
|
|
|
//! message: &'static str,
|
2021-03-29 21:33:07 +00:00
|
|
|
//! ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
2021-03-29 02:05:20 +00:00
|
|
|
//! // Decode a JSON payload
|
2021-03-29 20:39:07 +00:00
|
|
|
//! let who: Option<String> = current_job.json()?;
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! // Do some work
|
2021-07-17 17:45:06 +00:00
|
|
|
//! println!("{}, {}!", message, who.as_deref().unwrap_or("world"));
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! // Mark the job as complete
|
|
|
|
//! current_job.complete().await?;
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! Ok(())
|
|
|
|
//! }
|
|
|
|
//! ```
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! ## Listening for jobs
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! Next we need to create a job runner: this is what listens for new jobs
|
2021-03-29 02:05:20 +00:00
|
|
|
//! and executes them.
|
|
|
|
//!
|
2021-03-29 21:33:07 +00:00
|
|
|
//! ```rust,no_run
|
|
|
|
//! use std::error::Error;
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! use sqlxmq::JobRegistry;
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 21:33:07 +00:00
|
|
|
//! # use sqlxmq::{job, CurrentJob};
|
|
|
|
//! #
|
|
|
|
//! # #[job]
|
|
|
|
//! # async fn example_job(
|
|
|
|
//! # current_job: CurrentJob,
|
|
|
|
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
|
|
|
|
//! #
|
|
|
|
//! # async fn connect_to_db() -> sqlx::Result<sqlx::Pool<sqlx::Postgres>> {
|
|
|
|
//! # unimplemented!()
|
|
|
|
//! # }
|
|
|
|
//!
|
2021-03-29 02:05:20 +00:00
|
|
|
//! #[tokio::main]
|
|
|
|
//! async fn main() -> Result<(), Box<dyn Error>> {
|
|
|
|
//! // You'll need to provide a Postgres connection pool.
|
|
|
|
//! let pool = connect_to_db().await?;
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! // Construct a job registry from our single job.
|
|
|
|
//! let mut registry = JobRegistry::new(&[example_job]);
|
2021-03-29 02:05:20 +00:00
|
|
|
//! // Here is where you can configure the registry
|
|
|
|
//! // registry.set_error_handler(...)
|
|
|
|
//!
|
2021-07-17 17:45:06 +00:00
|
|
|
//! // And add context
|
|
|
|
//! registry.set_context("Hello");
|
|
|
|
//!
|
2021-03-29 02:05:20 +00:00
|
|
|
//! let runner = registry
|
2021-03-29 20:39:07 +00:00
|
|
|
//! // Create a job runner using the connection pool.
|
2021-03-29 02:05:20 +00:00
|
|
|
//! .runner(&pool)
|
2021-03-29 20:39:07 +00:00
|
|
|
//! // Here is where you can configure the job runner
|
|
|
|
//! // Aim to keep 10-20 jobs running at a time.
|
2021-03-29 02:05:20 +00:00
|
|
|
//! .set_concurrency(10, 20)
|
2021-03-29 20:39:07 +00:00
|
|
|
//! // Start the job runner in the background.
|
2021-03-29 02:05:20 +00:00
|
|
|
//! .run()
|
|
|
|
//! .await?;
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! // The job runner will continue listening and running
|
|
|
|
//! // jobs until `runner` is dropped.
|
2021-03-29 21:33:07 +00:00
|
|
|
//! Ok(())
|
2021-03-29 02:05:20 +00:00
|
|
|
//! }
|
|
|
|
//! ```
|
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! ## Spawning a job
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
2021-03-29 20:39:07 +00:00
|
|
|
//! The final step is to actually run a job.
|
2021-03-29 02:05:20 +00:00
|
|
|
//!
|
|
|
|
//! ```rust
|
2021-03-29 21:33:07 +00:00
|
|
|
//! # use std::error::Error;
|
|
|
|
//! # use sqlxmq::{job, CurrentJob};
|
|
|
|
//! #
|
|
|
|
//! # #[job]
|
|
|
|
//! # async fn example_job(
|
|
|
|
//! # current_job: CurrentJob,
|
|
|
|
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> { Ok(()) }
|
|
|
|
//! #
|
|
|
|
//! # async fn example(
|
|
|
|
//! # pool: sqlx::Pool<sqlx::Postgres>
|
|
|
|
//! # ) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
2021-03-29 22:06:47 +00:00
|
|
|
//! example_job.builder()
|
2021-03-30 00:00:51 +00:00
|
|
|
//! // This is where we can override job configuration
|
2021-03-29 02:05:20 +00:00
|
|
|
//! .set_channel_name("bar")
|
2021-03-29 21:33:07 +00:00
|
|
|
//! .set_json("John")?
|
2021-03-29 02:05:20 +00:00
|
|
|
//! .spawn(&pool)
|
|
|
|
//! .await?;
|
2021-03-29 21:33:07 +00:00
|
|
|
//! # Ok(())
|
|
|
|
//! # }
|
2021-03-29 02:05:20 +00:00
|
|
|
//! ```
|
|
|
|
|
|
|
|
#[doc(hidden)]
|
|
|
|
pub mod hidden;
|
|
|
|
mod registry;
|
2021-03-28 01:57:17 +00:00
|
|
|
mod runner;
|
|
|
|
mod spawn;
|
|
|
|
mod utils;
|
|
|
|
|
2021-03-29 02:05:20 +00:00
|
|
|
pub use registry::*;
|
2021-03-28 01:57:17 +00:00
|
|
|
pub use runner::*;
|
|
|
|
pub use spawn::*;
|
2021-03-29 20:39:07 +00:00
|
|
|
pub use sqlxmq_macros::job;
|
|
|
|
pub use utils::OwnedHandle;
|
2021-03-28 01:57:17 +00:00
|
|
|
|
2021-09-20 12:27:48 +00:00
|
|
|
/// Helper function to determine if a particular error condition is retryable.
|
|
|
|
///
|
|
|
|
/// For best results, database operations should be automatically retried if one
|
|
|
|
/// of these errors is returned.
|
|
|
|
pub fn should_retry(error: &sqlx::Error) -> bool {
|
|
|
|
if let Some(db_error) = error.as_database_error() {
|
2021-09-20 12:39:13 +00:00
|
|
|
// It's more readable as a match
|
|
|
|
#[allow(clippy::match_like_matches_macro)]
|
2022-01-12 10:18:30 +00:00
|
|
|
match (db_error.code().as_deref(), db_error.constraint()) {
|
2021-09-21 11:14:32 +00:00
|
|
|
// Foreign key constraint violation on ordered channel
|
|
|
|
(Some("23503"), Some("mq_msgs_after_message_id_fkey")) => true,
|
2021-09-20 12:27:48 +00:00
|
|
|
// Unique constraint violation on ordered channel
|
|
|
|
(Some("23505"), Some("mq_msgs_channel_name_channel_args_after_message_id_idx")) => true,
|
|
|
|
// Serialization failure
|
|
|
|
(Some("40001"), _) => true,
|
|
|
|
// Deadlock detected
|
|
|
|
(Some("40P01"), _) => true,
|
|
|
|
// Other
|
|
|
|
_ => false,
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-28 01:57:17 +00:00
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use super::*;
|
2021-03-29 02:05:20 +00:00
|
|
|
use crate as sqlxmq;
|
2021-03-28 01:57:17 +00:00
|
|
|
|
|
|
|
use std::env;
|
2021-03-29 02:05:20 +00:00
|
|
|
use std::error::Error;
|
2021-03-28 01:57:17 +00:00
|
|
|
use std::future::Future;
|
|
|
|
use std::ops::Deref;
|
|
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
use std::sync::{Arc, Once};
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use futures::channel::mpsc;
|
|
|
|
use futures::StreamExt;
|
|
|
|
use sqlx::{Pool, Postgres};
|
|
|
|
use tokio::sync::{Mutex, MutexGuard};
|
|
|
|
use tokio::task;
|
|
|
|
|
|
|
|
struct TestGuard<T>(MutexGuard<'static, ()>, T);
|
|
|
|
|
|
|
|
impl<T> Deref for TestGuard<T> {
|
|
|
|
type Target = T;
|
|
|
|
|
|
|
|
fn deref(&self) -> &T {
|
|
|
|
&self.1
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn test_pool() -> TestGuard<Pool<Postgres>> {
|
|
|
|
static INIT_LOGGER: Once = Once::new();
|
|
|
|
static TEST_MUTEX: Mutex<()> = Mutex::const_new(());
|
|
|
|
|
|
|
|
let guard = TEST_MUTEX.lock().await;
|
|
|
|
|
|
|
|
let _ = dotenv::dotenv();
|
|
|
|
|
2021-09-21 12:14:53 +00:00
|
|
|
INIT_LOGGER.call_once(pretty_env_logger::init);
|
2021-03-28 01:57:17 +00:00
|
|
|
|
|
|
|
let pool = Pool::connect(&env::var("DATABASE_URL").unwrap())
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
sqlx::query("TRUNCATE TABLE mq_payloads")
|
|
|
|
.execute(&pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
sqlx::query("DELETE FROM mq_msgs WHERE id != uuid_nil()")
|
|
|
|
.execute(&pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
TestGuard(guard, pool)
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
async fn test_job_runner<F: Future + Send + 'static>(
|
2021-03-28 01:57:17 +00:00
|
|
|
pool: &Pool<Postgres>,
|
2021-03-29 20:39:07 +00:00
|
|
|
f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
|
2022-07-16 21:47:57 +00:00
|
|
|
) -> (JobRunnerHandle, Arc<AtomicUsize>)
|
2021-03-28 01:57:17 +00:00
|
|
|
where
|
|
|
|
F::Output: Send + 'static,
|
|
|
|
{
|
|
|
|
let counter = Arc::new(AtomicUsize::new(0));
|
|
|
|
let counter2 = counter.clone();
|
2021-03-29 20:39:07 +00:00
|
|
|
let runner = JobRunnerOptions::new(pool, move |job| {
|
2021-03-28 01:57:17 +00:00
|
|
|
counter2.fetch_add(1, Ordering::SeqCst);
|
2021-03-29 20:39:07 +00:00
|
|
|
task::spawn(f(job));
|
2021-03-28 01:57:17 +00:00
|
|
|
})
|
|
|
|
.run()
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
(runner, counter)
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
fn job_proto<'a, 'b>(builder: &'a mut JobBuilder<'b>) -> &'a mut JobBuilder<'b> {
|
2021-03-29 02:05:20 +00:00
|
|
|
builder.set_channel_name("bar")
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
#[job(channel_name = "foo", ordered, retries = 3, backoff_secs = 2.0)]
|
|
|
|
async fn example_job1(
|
|
|
|
mut current_job: CurrentJob,
|
2021-03-29 02:05:20 +00:00
|
|
|
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
2021-03-29 20:39:07 +00:00
|
|
|
current_job.complete().await?;
|
2021-03-29 02:05:20 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-03-29 20:39:07 +00:00
|
|
|
#[job(proto(job_proto))]
|
|
|
|
async fn example_job2(
|
|
|
|
mut current_job: CurrentJob,
|
2021-03-29 02:05:20 +00:00
|
|
|
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
2021-03-29 20:39:07 +00:00
|
|
|
current_job.complete().await?;
|
2021-03-29 02:05:20 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-07-17 17:45:06 +00:00
|
|
|
#[job]
|
|
|
|
async fn example_job_with_ctx(
|
|
|
|
mut current_job: CurrentJob,
|
|
|
|
ctx1: i32,
|
|
|
|
ctx2: &'static str,
|
|
|
|
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
|
|
|
assert_eq!(ctx1, 42);
|
|
|
|
assert_eq!(ctx2, "Hello, world!");
|
|
|
|
current_job.complete().await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-07-16 21:47:57 +00:00
|
|
|
async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
|
2021-07-17 17:45:06 +00:00
|
|
|
let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
|
|
|
|
registry.set_context(42).set_context("Hello, world!");
|
|
|
|
registry.runner(pool).run().await.unwrap()
|
2021-03-29 02:05:20 +00:00
|
|
|
}
|
|
|
|
|
2022-07-13 13:59:27 +00:00
|
|
|
fn is_ci() -> bool {
|
|
|
|
std::env::var("CI").ok().is_some()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn default_pause() -> u64 {
|
|
|
|
if is_ci() {
|
|
|
|
1000
|
|
|
|
} else {
|
|
|
|
200
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-28 01:57:17 +00:00
|
|
|
async fn pause() {
|
2022-07-13 13:59:27 +00:00
|
|
|
pause_ms(default_pause()).await;
|
2021-03-28 01:57:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn pause_ms(ms: u64) {
|
|
|
|
tokio::time::sleep(Duration::from_millis(ms)).await;
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
2021-03-29 20:39:07 +00:00
|
|
|
async fn it_can_spawn_job() {
|
2021-07-17 17:45:06 +00:00
|
|
|
{
|
|
|
|
let pool = &*test_pool().await;
|
|
|
|
let (_runner, counter) =
|
2021-09-21 12:14:53 +00:00
|
|
|
test_job_runner(pool, |mut job| async move { job.complete().await }).await;
|
2021-07-17 17:45:06 +00:00
|
|
|
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
|
|
JobBuilder::new("foo").spawn(pool).await.unwrap();
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
|
|
|
}
|
2021-03-28 01:57:17 +00:00
|
|
|
pause().await;
|
|
|
|
}
|
|
|
|
|
2021-09-20 12:27:48 +00:00
|
|
|
#[tokio::test]
|
|
|
|
async fn it_can_clear_jobs() {
|
|
|
|
{
|
|
|
|
let pool = &*test_pool().await;
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_channel_name("foo")
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_channel_name("foo")
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_channel_name("bar")
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_channel_name("bar")
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_channel_name("baz")
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_channel_name("baz")
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap();
|
|
|
|
|
|
|
|
let (_runner, counter) =
|
2021-09-21 12:14:53 +00:00
|
|
|
test_job_runner(pool, |mut job| async move { job.complete().await }).await;
|
2021-09-20 12:27:48 +00:00
|
|
|
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
|
|
}
|
|
|
|
pause().await;
|
|
|
|
}
|
|
|
|
|
2021-03-28 01:57:17 +00:00
|
|
|
#[tokio::test]
|
2021-03-29 20:39:07 +00:00
|
|
|
async fn it_runs_jobs_in_order() {
|
2021-07-17 17:45:06 +00:00
|
|
|
{
|
|
|
|
let pool = &*test_pool().await;
|
|
|
|
let (tx, mut rx) = mpsc::unbounded();
|
|
|
|
|
2021-09-21 12:14:53 +00:00
|
|
|
let (_runner, counter) = test_job_runner(pool, move |job| {
|
2021-07-17 17:45:06 +00:00
|
|
|
let tx = tx.clone();
|
|
|
|
async move {
|
|
|
|
tx.unbounded_send(job).unwrap();
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.await;
|
|
|
|
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_ordered(true)
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
JobBuilder::new("bar")
|
|
|
|
.set_ordered(true)
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
2021-03-28 01:57:17 +00:00
|
|
|
|
2021-07-17 17:45:06 +00:00
|
|
|
let mut job = rx.next().await.unwrap();
|
|
|
|
job.complete().await.unwrap();
|
2021-03-28 01:57:17 +00:00
|
|
|
|
2021-07-17 17:45:06 +00:00
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
|
|
}
|
2021-03-28 01:57:17 +00:00
|
|
|
pause().await;
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
2021-03-29 20:39:07 +00:00
|
|
|
async fn it_runs_jobs_in_parallel() {
|
2021-07-17 17:45:06 +00:00
|
|
|
{
|
|
|
|
let pool = &*test_pool().await;
|
|
|
|
let (tx, mut rx) = mpsc::unbounded();
|
|
|
|
|
2021-09-21 12:14:53 +00:00
|
|
|
let (_runner, counter) = test_job_runner(pool, move |job| {
|
2021-07-17 17:45:06 +00:00
|
|
|
let tx = tx.clone();
|
|
|
|
async move {
|
|
|
|
tx.unbounded_send(job).unwrap();
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.await;
|
|
|
|
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
|
|
JobBuilder::new("foo").spawn(pool).await.unwrap();
|
|
|
|
JobBuilder::new("bar").spawn(pool).await.unwrap();
|
|
|
|
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
|
|
|
|
|
|
for _ in 0..2 {
|
|
|
|
let mut job = rx.next().await.unwrap();
|
|
|
|
job.complete().await.unwrap();
|
2021-03-28 01:57:17 +00:00
|
|
|
}
|
|
|
|
}
|
2021-07-17 17:45:06 +00:00
|
|
|
pause().await;
|
2021-03-28 01:57:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
2021-03-29 20:39:07 +00:00
|
|
|
async fn it_retries_failed_jobs() {
|
2021-07-17 17:45:06 +00:00
|
|
|
{
|
|
|
|
let pool = &*test_pool().await;
|
2021-09-21 12:14:53 +00:00
|
|
|
let (_runner, counter) = test_job_runner(pool, move |_| async {}).await;
|
2021-07-17 17:45:06 +00:00
|
|
|
|
2022-07-13 13:59:27 +00:00
|
|
|
let backoff = default_pause() + 300;
|
2021-07-17 17:45:06 +00:00
|
|
|
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_retry_backoff(Duration::from_millis(backoff))
|
|
|
|
.set_retries(2)
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
// First attempt
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
|
|
|
|
|
|
|
// Second attempt
|
|
|
|
pause_ms(backoff).await;
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
|
|
|
|
|
|
// Third attempt
|
|
|
|
pause_ms(backoff * 2).await;
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 3);
|
|
|
|
|
|
|
|
// No more attempts
|
|
|
|
pause_ms(backoff * 5).await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 3);
|
|
|
|
}
|
2021-03-28 01:57:17 +00:00
|
|
|
pause().await;
|
|
|
|
}
|
2021-03-29 02:05:20 +00:00
|
|
|
|
|
|
|
#[tokio::test]
|
2021-03-29 20:39:07 +00:00
|
|
|
async fn it_can_checkpoint_jobs() {
|
2021-07-17 17:45:06 +00:00
|
|
|
{
|
|
|
|
let pool = &*test_pool().await;
|
2021-09-21 12:14:53 +00:00
|
|
|
let (_runner, counter) = test_job_runner(pool, move |mut current_job| async move {
|
2021-07-17 17:45:06 +00:00
|
|
|
let state: bool = current_job.json().unwrap().unwrap();
|
|
|
|
if state {
|
|
|
|
current_job.complete().await.unwrap();
|
|
|
|
} else {
|
|
|
|
current_job
|
|
|
|
.checkpoint(Checkpoint::new().set_json(&true).unwrap())
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.await;
|
|
|
|
|
2022-07-13 13:59:27 +00:00
|
|
|
let backoff = default_pause();
|
2021-07-17 17:45:06 +00:00
|
|
|
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 0);
|
|
|
|
JobBuilder::new("foo")
|
|
|
|
.set_retry_backoff(Duration::from_millis(backoff))
|
|
|
|
.set_retries(5)
|
|
|
|
.set_json(&false)
|
|
|
|
.unwrap()
|
|
|
|
.spawn(pool)
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
// First attempt
|
|
|
|
pause().await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
|
|
|
|
|
|
|
// Second attempt
|
|
|
|
pause_ms(backoff).await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
|
|
|
|
|
|
// No more attempts
|
|
|
|
pause_ms(backoff * 3).await;
|
|
|
|
assert_eq!(counter.load(Ordering::SeqCst), 2);
|
|
|
|
}
|
2021-03-29 02:05:20 +00:00
|
|
|
pause().await;
|
|
|
|
}
|
|
|
|
|
|
|
|
#[tokio::test]
|
|
|
|
async fn it_can_use_registry() {
|
2021-07-17 17:45:06 +00:00
|
|
|
{
|
|
|
|
let pool = &*test_pool().await;
|
|
|
|
let _runner = named_job_runner(pool).await;
|
|
|
|
|
|
|
|
example_job1.builder().spawn(pool).await.unwrap();
|
|
|
|
example_job2.builder().spawn(pool).await.unwrap();
|
|
|
|
example_job_with_ctx.builder().spawn(pool).await.unwrap();
|
|
|
|
pause().await;
|
|
|
|
}
|
2021-03-29 02:05:20 +00:00
|
|
|
pause().await;
|
|
|
|
}
|
2021-03-28 01:57:17 +00:00
|
|
|
}
|