diff --git a/Cargo.toml b/Cargo.toml index 8b62964..ece3ced 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlxmq" -version = "0.2.2" +version = "0.3.0" authors = ["Diggory Blake "] edition = "2018" license = "MIT OR Apache-2.0" @@ -23,7 +23,7 @@ uuid = { version = "0.8.2", features = ["v4"] } log = "0.4.14" serde_json = "1.0.64" serde = "1.0.124" -sqlxmq_macros = { version = "0.2.2", path = "sqlxmq_macros" } +sqlxmq_macros = { version = "0.3.0", path = "sqlxmq_macros" } anymap2 = "0.13.0" [features] diff --git a/migrations/20210316025847_setup.down.sql b/migrations/20210316025847_setup.down.sql index 933f6d4..f40b008 100644 --- a/migrations/20210316025847_setup.down.sql +++ b/migrations/20210316025847_setup.down.sql @@ -1,3 +1,5 @@ +DROP FUNCTION mq_clear; +DROP FUNCTION mq_clear_all; DROP FUNCTION mq_checkpoint; DROP FUNCTION mq_keep_alive; DROP FUNCTION mq_delete; @@ -9,4 +11,4 @@ DROP FUNCTION mq_latest_message; DROP TABLE mq_payloads; DROP TABLE mq_msgs; DROP FUNCTION mq_uuid_exists; -DROP TYPE mq_new_t; \ No newline at end of file +DROP TYPE mq_new_t; diff --git a/migrations/20210316025847_setup.up.sql b/migrations/20210316025847_setup.up.sql index bcb1ab5..187c7e3 100644 --- a/migrations/20210316025847_setup.up.sql +++ b/migrations/20210316025847_setup.up.sql @@ -56,7 +56,7 @@ CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; -- Index for ensuring strict message order -CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id); +CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id); -- Large, less frequently updated table of message payloads @@ -286,3 +286,25 @@ RETURNS VOID AS $$ WHERE id = msg_id; $$ LANGUAGE SQL; + +-- Deletes all messages from a list of channel names. +CREATE FUNCTION mq_clear(channel_names TEXT[]) +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; + +-- Deletes all messages. +CREATE FUNCTION mq_clear_all() +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; diff --git a/sqlxmq_macros/Cargo.toml b/sqlxmq_macros/Cargo.toml index 86d2f11..ff4b209 100644 --- a/sqlxmq_macros/Cargo.toml +++ b/sqlxmq_macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlxmq_macros" -version = "0.2.2" +version = "0.3.0" authors = ["Diggory Blake "] edition = "2018" license = "MIT OR Apache-2.0" diff --git a/sqlxmq_stress/migrations/20210316025847_setup.down.sql b/sqlxmq_stress/migrations/20210316025847_setup.down.sql index 933f6d4..f40b008 100644 --- a/sqlxmq_stress/migrations/20210316025847_setup.down.sql +++ b/sqlxmq_stress/migrations/20210316025847_setup.down.sql @@ -1,3 +1,5 @@ +DROP FUNCTION mq_clear; +DROP FUNCTION mq_clear_all; DROP FUNCTION mq_checkpoint; DROP FUNCTION mq_keep_alive; DROP FUNCTION mq_delete; @@ -9,4 +11,4 @@ DROP FUNCTION mq_latest_message; DROP TABLE mq_payloads; DROP TABLE mq_msgs; DROP FUNCTION mq_uuid_exists; -DROP TYPE mq_new_t; \ No newline at end of file +DROP TYPE mq_new_t; diff --git a/sqlxmq_stress/migrations/20210316025847_setup.up.sql b/sqlxmq_stress/migrations/20210316025847_setup.up.sql index bcb1ab5..187c7e3 100644 --- a/sqlxmq_stress/migrations/20210316025847_setup.up.sql +++ b/sqlxmq_stress/migrations/20210316025847_setup.up.sql @@ -56,7 +56,7 @@ CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; -- Index for ensuring strict message order -CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id); +CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id); -- Large, less frequently updated table of message payloads @@ -286,3 +286,25 @@ RETURNS VOID AS $$ WHERE id = msg_id; $$ LANGUAGE SQL; + +-- Deletes all messages from a list of channel names. +CREATE FUNCTION mq_clear(channel_names TEXT[]) +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; + +-- Deletes all messages. +CREATE FUNCTION mq_clear_all() +RETURNS VOID AS $$ +BEGIN + WITH deleted_ids AS ( + DELETE FROM mq_msgs RETURNING id + ) + DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids); +END; +$$ LANGUAGE plpgsql; diff --git a/sqlxmq_stress/src/main.rs b/sqlxmq_stress/src/main.rs index 5ed2d0c..1721b7b 100644 --- a/sqlxmq_stress/src/main.rs +++ b/sqlxmq_stress/src/main.rs @@ -1,5 +1,6 @@ use std::env; use std::error::Error; +use std::process::abort; use std::sync::RwLock; use std::time::{Duration, Instant}; @@ -66,7 +67,13 @@ async fn start_job( async fn schedule_tasks(num_jobs: usize, interval: Duration, pool: Pool) { let mut stream = tokio::time::interval(interval); for i in 0..num_jobs { - task::spawn(start_job(pool.clone(), i)); + let pool = pool.clone(); + task::spawn(async move { + if let Err(e) = start_job(pool, i).await { + eprintln!("Failed to start job: {:?}", e); + abort(); + } + }); stream.tick().await; } } @@ -77,6 +84,9 @@ async fn main() -> Result<(), Box> { let pool = Pool::connect(&env::var("DATABASE_URL")?).await?; + // Make sure the queues are empty + sqlxmq::clear_all(&pool).await?; + let registry = JobRegistry::new(&[example_job]); let _runner = registry diff --git a/src/lib.rs b/src/lib.rs index 3867955..558f707 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -234,6 +234,27 @@ pub use spawn::*; pub use sqlxmq_macros::job; pub use utils::OwnedHandle; +/// Helper function to determine if a particular error condition is retryable. +/// +/// For best results, database operations should be automatically retried if one +/// of these errors is returned. +pub fn should_retry(error: &sqlx::Error) -> bool { + if let Some(db_error) = error.as_database_error() { + match (db_error.code().as_deref(), db_error.constraint().as_deref()) { + // Unique constraint violation on ordered channel + (Some("23505"), Some("mq_msgs_channel_name_channel_args_after_message_id_idx")) => true, + // Serialization failure + (Some("40001"), _) => true, + // Deadlock detected + (Some("40P01"), _) => true, + // Other + _ => false, + } + } else { + false + } +} + #[cfg(test)] mod tests { use super::*; @@ -369,6 +390,52 @@ mod tests { pause().await; } + #[tokio::test] + async fn it_can_clear_jobs() { + { + let pool = &*test_pool().await; + JobBuilder::new("foo") + .set_channel_name("foo") + .spawn(pool) + .await + .unwrap(); + JobBuilder::new("foo") + .set_channel_name("foo") + .spawn(pool) + .await + .unwrap(); + JobBuilder::new("foo") + .set_channel_name("bar") + .spawn(pool) + .await + .unwrap(); + JobBuilder::new("foo") + .set_channel_name("bar") + .spawn(pool) + .await + .unwrap(); + JobBuilder::new("foo") + .set_channel_name("baz") + .spawn(pool) + .await + .unwrap(); + JobBuilder::new("foo") + .set_channel_name("baz") + .spawn(pool) + .await + .unwrap(); + + sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap(); + + let (_runner, counter) = + test_job_runner(&pool, |mut job| async move { job.complete().await }).await; + + pause().await; + assert_eq!(counter.load(Ordering::SeqCst), 2); + } + pause().await; + } + #[tokio::test] async fn it_runs_jobs_in_order() { { diff --git a/src/spawn.rs b/src/spawn.rs index 263753f..2035cad 100644 --- a/src/spawn.rs +++ b/src/spawn.rs @@ -149,3 +149,25 @@ pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>( .await?; Ok(()) } + +/// Clear jobs from the specified queues. +pub async fn clear<'b, E: sqlx::Executor<'b, Database = Postgres>>( + executor: E, + channel_names: &[&str], +) -> Result<(), sqlx::Error> { + sqlx::query("SELECT mq_clear($1)") + .bind(channel_names) + .execute(executor) + .await?; + Ok(()) +} + +/// Clear jobs from the specified queues. +pub async fn clear_all<'b, E: sqlx::Executor<'b, Database = Postgres>>( + executor: E, +) -> Result<(), sqlx::Error> { + sqlx::query("SELECT mq_clear_all()") + .execute(executor) + .await?; + Ok(()) +}