add max_retry_backoff option to job runner (Diggsey#19)

Credits to 0489b5178a
This commit is contained in:
Vlad Zagvozdkin 2024-04-25 17:54:42 +05:00
parent 387b301656
commit f693c6381c
No known key found for this signature in database
GPG key ID: BA78F79338112F49
4 changed files with 187 additions and 1 deletions

View file

@@ -0,0 +1,61 @@
-- Main entry-point for job runner: pulls a batch of messages from the queue.
--
-- Parameters:
--   channel_names - channels to poll; NULL polls all channels (see the
--                   IF NOT FOUND branch below).
--   batch_size    - maximum number of messages to claim in one call.
--
-- Returns one row per claimed message; when nothing is due, returns a single
-- all-NULL row whose wait_time tells the caller how long to sleep.
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE(
id UUID,
is_committed BOOLEAN,
name TEXT,
payload_json TEXT,
payload_bytes BYTEA,
retry_backoff INTERVAL,
wait_time INTERVAL
) AS $$
BEGIN
-- Claim up to batch_size due messages: decrement the remaining attempt
-- count, double the backoff for the next retry, and set the next attempt
-- time (NULL - i.e. never - when this was the final attempt).
RETURN QUERY UPDATE mq_msgs
SET
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
attempts = mq_msgs.attempts - 1,
retry_backoff = mq_msgs.retry_backoff * 2
FROM (
SELECT
msgs.id
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
-- Oldest due messages on each active channel. Skips the uuid_nil()
-- row (presumably a sentinel - TODO confirm against the schema) and
-- messages whose predecessor (after_message_id) still exists, which
-- preserves ordered-message chains.
SELECT mq_msgs.id FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
ORDER BY mq_msgs.attempt_at ASC
LIMIT batch_size
) AS msgs ON TRUE
LIMIT batch_size
) AS messages_to_update
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
WHERE mq_msgs.id = messages_to_update.id
-- NOTE(review): attempt_at is re-checked here in addition to the subquery;
-- presumably this guards against a concurrent poller claiming the row
-- between the snapshot read and the update - confirm intended semantics.
AND mq_msgs.attempt_at <= NOW()
RETURNING
mq_msgs.id,
mq_msgs.commit_interval IS NULL,
mq_payloads.name,
mq_payloads.payload_json::TEXT,
mq_payloads.payload_bytes,
-- RETURNING sees the post-update row (backoff already doubled above),
-- so halve it to report the backoff that applied to this attempt.
mq_msgs.retry_backoff / 2,
interval '0' AS wait_time;
-- Nothing was due: emit one all-NULL row whose wait_time is the interval
-- until the next eligible message becomes due (NULL if queue is empty).
IF NOT FOUND THEN
RETURN QUERY SELECT
NULL::UUID,
NULL::BOOLEAN,
NULL::TEXT,
NULL::TEXT,
NULL::BYTEA,
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
END;
$$ LANGUAGE plpgsql;

View file

@@ -0,0 +1,61 @@
-- Main entry-point for job runner: pulls a batch of messages from the queue.
--
-- Parameters:
--   channel_names  - channels to poll; NULL polls all channels (see the
--                    IF NOT FOUND branch below).
--   batch_size     - maximum number of messages to claim in one call.
--   max_retry_text - optional cap on the exponential retry backoff; NULL
--                    means no cap. NOTE(review): the name reads like a typo
--                    for "max_retry_backoff"; kept as-is because renaming
--                    would break callers using named-argument notation.
--
-- Returns one row per claimed message; when nothing is due, returns a single
-- all-NULL row whose wait_time tells the caller how long to sleep.
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1, max_retry_text INTERVAL DEFAULT NULL)
RETURNS TABLE(
    id UUID,
    is_committed BOOLEAN,
    name TEXT,
    payload_json TEXT,
    payload_bytes BYTEA,
    retry_backoff INTERVAL,
    wait_time INTERVAL
) AS $$
BEGIN
    -- Claim up to batch_size due messages: decrement the remaining attempt
    -- count, double the backoff (capped by max_retry_text), and set the next
    -- attempt time (NULL - i.e. never - when this was the final attempt).
    RETURN QUERY UPDATE mq_msgs
    SET
        attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
        attempts = mq_msgs.attempts - 1,
        -- LEAST ignores NULL arguments in PostgreSQL, so when no cap is
        -- supplied this reduces to plain doubling; the explicit
        -- CASE WHEN max_retry_text IS NULL guard was redundant.
        retry_backoff = LEAST(mq_msgs.retry_backoff * 2, max_retry_text)
    FROM (
        SELECT
            msgs.id
        FROM mq_active_channels(channel_names, batch_size) AS active_channels
        INNER JOIN LATERAL (
            -- Oldest due messages on each active channel. Skips the
            -- uuid_nil() row (presumably a sentinel - TODO confirm against
            -- the schema) and messages whose predecessor (after_message_id)
            -- still exists, which preserves ordered-message chains.
            SELECT mq_msgs.id FROM mq_msgs
            WHERE mq_msgs.id != uuid_nil()
            AND mq_msgs.attempt_at <= NOW()
            AND mq_msgs.channel_name = active_channels.name
            AND mq_msgs.channel_args = active_channels.args
            AND NOT mq_uuid_exists(mq_msgs.after_message_id)
            ORDER BY mq_msgs.attempt_at ASC
            LIMIT batch_size
        ) AS msgs ON TRUE
        LIMIT batch_size
    ) AS messages_to_update
    LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
    WHERE mq_msgs.id = messages_to_update.id
    -- NOTE(review): attempt_at is re-checked here in addition to the
    -- subquery; presumably this guards against a concurrent poller claiming
    -- the row between the snapshot read and the update.
    AND mq_msgs.attempt_at <= NOW()
    RETURNING
        mq_msgs.id,
        mq_msgs.commit_interval IS NULL,
        mq_payloads.name,
        mq_payloads.payload_json::TEXT,
        mq_payloads.payload_bytes,
        -- RETURNING sees the post-update row (backoff already adjusted
        -- above), so halve it to report the backoff for this attempt.
        mq_msgs.retry_backoff / 2,
        interval '0' AS wait_time;
    -- Nothing was due: emit one all-NULL row whose wait_time is the interval
    -- until the next eligible message becomes due (NULL if queue is empty).
    IF NOT FOUND THEN
        RETURN QUERY SELECT
            NULL::UUID,
            NULL::BOOLEAN,
            NULL::TEXT,
            NULL::TEXT,
            NULL::BYTEA,
            NULL::INTERVAL,
            MIN(mq_msgs.attempt_at) - NOW()
        FROM mq_msgs
        WHERE mq_msgs.id != uuid_nil()
        AND NOT mq_uuid_exists(mq_msgs.after_message_id)
        AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
    END IF;
END;
$$ LANGUAGE plpgsql;

View file

@@ -556,6 +556,58 @@ mod tests {
pause().await; pause().await;
} }
// Verifies that `set_max_retry_backoff` caps the exponential retry backoff:
// with an initial per-retry delay of `backoff` and a cap of `backoff * 4`,
// the waits between attempts should follow backoff, 2x, 4x, then hold at 4x.
// NOTE(review): timing-based test - relies on the pause()/pause_ms() helpers
// defined elsewhere in this module to out-wait each scheduled retry.
#[tokio::test]
async fn it_uses_max_retry_backoff_correctly() {
{
let pool = &*test_pool().await;
// Base delay in ms; padded past the default pause so each retry window
// is observable with the pause helpers below.
let backoff = default_pause() + 300;
let counter = Arc::new(AtomicUsize::new(0));
let counter2 = counter.clone();
// Handler only counts invocations and never completes the job - so each
// run is retried, as the incrementing asserts below rely on.
let _runner = JobRunnerOptions::new(pool, move |_job| {
counter2.fetch_add(1, Ordering::SeqCst);
})
.set_max_retry_backoff(Duration::from_millis(backoff * 4))
.run()
.await
.unwrap();
// No job spawned yet, so the handler must not have run.
assert_eq!(counter.load(Ordering::SeqCst), 0);
JobBuilder::new("foo")
.set_retry_backoff(Duration::from_millis(backoff))
.set_retries(4)
.spawn(pool)
.await
.unwrap();
// First attempt
pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
// Second attempt
pause_ms(backoff).await;
pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
// Third attempt
pause_ms(backoff * 2).await;
pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 3);
// Fourth attempt
pause_ms(backoff * 4).await;
pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 4);
// Fifth attempt with now constant pause
pause_ms(backoff * 4).await;
pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 5);
}
pause().await;
}
#[tokio::test] #[tokio::test]
async fn it_can_checkpoint_jobs() { async fn it_can_checkpoint_jobs() {
{ {

View file

@@ -19,6 +19,7 @@ use crate::utils::{Opaque, OwnedHandle};
pub struct JobRunnerOptions { pub struct JobRunnerOptions {
min_concurrency: usize, min_concurrency: usize,
max_concurrency: usize, max_concurrency: usize,
max_retry_backoff: Option<Duration>,
channel_names: Option<Vec<String>>, channel_names: Option<Vec<String>>,
dispatch: Opaque<Arc<dyn Fn(CurrentJob) + Send + Sync + 'static>>, dispatch: Opaque<Arc<dyn Fn(CurrentJob) + Send + Sync + 'static>>,
pool: Pool<Postgres>, pool: Pool<Postgres>,
@@ -222,12 +223,22 @@ impl JobRunnerOptions {
Self { Self {
min_concurrency: 16, min_concurrency: 16,
max_concurrency: 32, max_concurrency: 32,
max_retry_backoff: None,
channel_names: None, channel_names: None,
keep_alive: true, keep_alive: true,
dispatch: Opaque(Arc::new(f)), dispatch: Opaque(Arc::new(f)),
pool: pool.clone(), pool: pool.clone(),
} }
} }
/// Cap the exponential retry backoff used by this job runner.
///
/// With an initial retry backoff of 1 and a `max_retry_backoff` of 8, the
/// delays between successive retries follow the sequence 1, 2, 4, 8, 8, 8...
///
/// Defaults to `None`, i.e. the backoff keeps doubling without bound.
pub fn set_max_retry_backoff(&mut self, duration: Duration) -> &mut Self {
    self.max_retry_backoff.replace(duration);
    self
}
/// Set the concurrency limits for this job runner. When the number of active /// Set the concurrency limits for this job runner. When the number of active
/// jobs falls below the minimum, the runner will poll for more, up to the maximum. /// jobs falls below the minimum, the runner will poll for more, up to the maximum.
/// ///
@@ -403,9 +414,10 @@
log::info!("Polling for messages"); log::info!("Polling for messages");
let options = &job_runner.options; let options = &job_runner.options;
let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)") let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2, $3)")
.bind(&options.channel_names) .bind(&options.channel_names)
.bind(batch_size) .bind(batch_size)
.bind(&options.max_retry_backoff)
.fetch_all(&options.pool) .fetch_all(&options.pool)
.await?; .await?;