From f693c6381ce1c2fc62c66723f61b5f67d836724c Mon Sep 17 00:00:00 2001
From: Vlad Zagvozdkin
Date: Thu, 25 Apr 2024 17:54:42 +0500
Subject: [PATCH] add max_retry_backoff option to job runner (Diggsey#19)

Credits to https://github.com/StructionSite/sqlxmq/commit/0489b5178ae01ae828c7cbe69d60f298e3e71cb2
---
 .../20240424183215_add_max_retry.down.sql | 61 +++++++++++++++++++
 .../20240424183215_add_max_retry.up.sql   | 61 +++++++++++++++++++
 src/lib.rs                                | 52 ++++++++++++++++
 src/runner.rs                             | 14 ++++-
 4 files changed, 187 insertions(+), 1 deletion(-)
 create mode 100644 migrations/20240424183215_add_max_retry.down.sql
 create mode 100644 migrations/20240424183215_add_max_retry.up.sql

diff --git a/migrations/20240424183215_add_max_retry.down.sql b/migrations/20240424183215_add_max_retry.down.sql
new file mode 100644
index 0000000..d8f3728
--- /dev/null
+++ b/migrations/20240424183215_add_max_retry.down.sql
@@ -0,0 +1,61 @@
+-- Main entry-point for job runner: pulls a batch of messages from the queue.
+CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
+RETURNS TABLE(
+    id UUID,
+    is_committed BOOLEAN,
+    name TEXT,
+    payload_json TEXT,
+    payload_bytes BYTEA,
+    retry_backoff INTERVAL,
+    wait_time INTERVAL
+) AS $$
+BEGIN
+    RETURN QUERY UPDATE mq_msgs
+    SET
+        attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
+        attempts = mq_msgs.attempts - 1,
+        retry_backoff = mq_msgs.retry_backoff * 2
+    FROM (
+        SELECT
+            msgs.id
+        FROM mq_active_channels(channel_names, batch_size) AS active_channels
+        INNER JOIN LATERAL (
+            SELECT mq_msgs.id FROM mq_msgs
+            WHERE mq_msgs.id != uuid_nil()
+            AND mq_msgs.attempt_at <= NOW()
+            AND mq_msgs.channel_name = active_channels.name
+            AND mq_msgs.channel_args = active_channels.args
+            AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+            ORDER BY mq_msgs.attempt_at ASC
+            LIMIT batch_size
+        ) AS msgs ON TRUE
+        LIMIT batch_size
+    ) AS messages_to_update
+    LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
+    WHERE mq_msgs.id = messages_to_update.id
+    AND mq_msgs.attempt_at <= NOW()
+    RETURNING
+        mq_msgs.id,
+        mq_msgs.commit_interval IS NULL,
+        mq_payloads.name,
+        mq_payloads.payload_json::TEXT,
+        mq_payloads.payload_bytes,
+        mq_msgs.retry_backoff / 2,
+        interval '0' AS wait_time;
+
+    IF NOT FOUND THEN
+        RETURN QUERY SELECT
+            NULL::UUID,
+            NULL::BOOLEAN,
+            NULL::TEXT,
+            NULL::TEXT,
+            NULL::BYTEA,
+            NULL::INTERVAL,
+            MIN(mq_msgs.attempt_at) - NOW()
+        FROM mq_msgs
+        WHERE mq_msgs.id != uuid_nil()
+        AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+        AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
+    END IF;
+END;
+$$ LANGUAGE plpgsql;
diff --git a/migrations/20240424183215_add_max_retry.up.sql b/migrations/20240424183215_add_max_retry.up.sql
new file mode 100644
index 0000000..b8e067e
--- /dev/null
+++ b/migrations/20240424183215_add_max_retry.up.sql
@@ -0,0 +1,61 @@
+-- Main entry-point for job runner: pulls a batch of messages from the queue.
+CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1, max_retry_text INTERVAL DEFAULT NULL)
+RETURNS TABLE(
+    id UUID,
+    is_committed BOOLEAN,
+    name TEXT,
+    payload_json TEXT,
+    payload_bytes BYTEA,
+    retry_backoff INTERVAL,
+    wait_time INTERVAL
+) AS $$
+BEGIN
+    RETURN QUERY UPDATE mq_msgs
+    SET
+        attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
+        attempts = mq_msgs.attempts - 1,
+        retry_backoff = CASE WHEN max_retry_text IS NULL THEN mq_msgs.retry_backoff * 2 ELSE LEAST(mq_msgs.retry_backoff * 2, max_retry_text) END
+    FROM (
+        SELECT
+            msgs.id
+        FROM mq_active_channels(channel_names, batch_size) AS active_channels
+        INNER JOIN LATERAL (
+            SELECT mq_msgs.id FROM mq_msgs
+            WHERE mq_msgs.id != uuid_nil()
+            AND mq_msgs.attempt_at <= NOW()
+            AND mq_msgs.channel_name = active_channels.name
+            AND mq_msgs.channel_args = active_channels.args
+            AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+            ORDER BY mq_msgs.attempt_at ASC
+            LIMIT batch_size
+        ) AS msgs ON TRUE
+        LIMIT batch_size
+    ) AS messages_to_update
+    LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
+    WHERE mq_msgs.id = messages_to_update.id
+    AND mq_msgs.attempt_at <= NOW()
+    RETURNING
+        mq_msgs.id,
+        mq_msgs.commit_interval IS NULL,
+        mq_payloads.name,
+        mq_payloads.payload_json::TEXT,
+        mq_payloads.payload_bytes,
+        mq_msgs.retry_backoff / 2,
+        interval '0' AS wait_time;
+
+    IF NOT FOUND THEN
+        RETURN QUERY SELECT
+            NULL::UUID,
+            NULL::BOOLEAN,
+            NULL::TEXT,
+            NULL::TEXT,
+            NULL::BYTEA,
+            NULL::INTERVAL,
+            MIN(mq_msgs.attempt_at) - NOW()
+        FROM mq_msgs
+        WHERE mq_msgs.id != uuid_nil()
+        AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+        AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
+    END IF;
+END;
+$$ LANGUAGE plpgsql;
diff --git a/src/lib.rs b/src/lib.rs
index 3423821..1986c28 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -556,6 +556,58 @@ mod tests {
         pause().await;
     }
 
+    #[tokio::test]
+    async fn it_uses_max_retry_backoff_correctly() {
+        {
+            let pool = &*test_pool().await;
+
+            let backoff = default_pause() + 300;
+
+            let counter = Arc::new(AtomicUsize::new(0));
+            let counter2 = counter.clone();
+            let _runner = JobRunnerOptions::new(pool, move |_job| {
+                counter2.fetch_add(1, Ordering::SeqCst);
+            })
+            .set_max_retry_backoff(Duration::from_millis(backoff * 4))
+            .run()
+            .await
+            .unwrap();
+
+            assert_eq!(counter.load(Ordering::SeqCst), 0);
+            JobBuilder::new("foo")
+                .set_retry_backoff(Duration::from_millis(backoff))
+                .set_retries(4)
+                .spawn(pool)
+                .await
+                .unwrap();
+
+            // First attempt
+            pause().await;
+            assert_eq!(counter.load(Ordering::SeqCst), 1);
+
+            // Second attempt
+            pause_ms(backoff).await;
+            pause().await;
+            assert_eq!(counter.load(Ordering::SeqCst), 2);
+
+            // Third attempt
+            pause_ms(backoff * 2).await;
+            pause().await;
+            assert_eq!(counter.load(Ordering::SeqCst), 3);
+
+            // Fourth attempt
+            pause_ms(backoff * 4).await;
+            pause().await;
+            assert_eq!(counter.load(Ordering::SeqCst), 4);
+
+            // Fifth attempt: the backoff is now capped, so the pause stays constant
+            pause_ms(backoff * 4).await;
+            pause().await;
+            assert_eq!(counter.load(Ordering::SeqCst), 5);
+        }
+        pause().await;
+    }
+
     #[tokio::test]
     async fn it_can_checkpoint_jobs() {
         {
diff --git a/src/runner.rs b/src/runner.rs
index 85ddba5..57a1704 100644
--- a/src/runner.rs
+++ b/src/runner.rs
@@ -19,6 +19,7 @@ use crate::utils::{Opaque, OwnedHandle};
 pub struct JobRunnerOptions {
     min_concurrency: usize,
     max_concurrency: usize,
+    max_retry_backoff: Option<Duration>,
     channel_names: Option<Vec<String>>,
     dispatch: Opaque<Arc<dyn Fn(CurrentJob) + Send + Sync + 'static>>,
     pool: Pool<Postgres>,
@@ -222,12 +223,22 @@ impl JobRunnerOptions {
         Self {
             min_concurrency: 16,
             max_concurrency: 32,
+            max_retry_backoff: None,
             channel_names: None,
             keep_alive: true,
             dispatch: Opaque(Arc::new(f)),
             pool: pool.clone(),
         }
     }
 
+    /// Set the maximum retry backoff for this job runner. If the initial retry
+    /// backoff is 1 and `max_retry_backoff` is 8, the delays between retries
+    /// follow the sequence 1, 2, 4, 8, 8, 8...
+    ///
+    /// Defaults to `None`, meaning the backoff is unbounded.
+    pub fn set_max_retry_backoff(&mut self, duration: Duration) -> &mut Self {
+        self.max_retry_backoff = Some(duration);
+        self
+    }
     /// Set the concurrency limits for this job runner. When the number of active
     /// jobs falls below the minimum, the runner will poll for more, up to the maximum.
@@ -403,9 +414,10 @@ async fn poll_and_dispatch(
     log::info!("Polling for messages");
 
     let options = &job_runner.options;
-    let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)")
+    let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2, $3)")
         .bind(&options.channel_names)
         .bind(batch_size)
+        .bind(&options.max_retry_backoff)
         .fetch_all(&options.pool)
         .await?;
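
Note on the capping rule: the up migration computes the next delay as LEAST(mq_msgs.retry_backoff * 2, max_retry_text), so the backoff still doubles after every failed attempt but never exceeds the configured ceiling. A minimal Rust sketch of the same arithmetic (illustrative only, not part of the patch; the helper name `capped_backoff` is hypothetical):

    use std::time::Duration;

    /// Next retry delay: double the current backoff, clamped to an optional
    /// ceiling. Mirrors the SQL CASE/LEAST expression in the up migration.
    fn capped_backoff(current: Duration, max: Option<Duration>) -> Duration {
        let doubled = current * 2;
        match max {
            Some(limit) => doubled.min(limit),
            None => doubled,
        }
    }

    fn main() {
        // With an initial backoff of 1s and a cap of 8s, the delays follow
        // the sequence from the doc comment: 1, 2, 4, 8, 8, 8, ...
        let mut delay = Duration::from_secs(1);
        for _ in 0..6 {
            println!("{delay:?}");
            delay = capped_backoff(delay, Some(Duration::from_secs(8)));
        }
    }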
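
Usage is opt-in on the runner side. A sketch of wiring the new option into a runner, assuming a handler that never completes the job and so is retried with capped backoff (the function name and handler body are assumptions, not from the patch):

    use std::time::Duration;

    use sqlxmq::JobRunnerOptions;

    async fn start_runner(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
        // Dropping the returned handle stops the runner; a real service
        // would store it for as long as jobs should be processed.
        let _runner = JobRunnerOptions::new(pool, |job| {
            // A handler that never calls job.complete() will be retried.
            log::info!("received job {}", job.id());
        })
        // Delays double from the job's initial retry backoff, capped at 30s.
        .set_max_retry_backoff(Duration::from_secs(30))
        .run()
        .await?;
        Ok(())
    }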