From 989b11ffe052c7903d4593569c92894843780ca2 Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Tue, 8 Feb 2022 13:10:05 +0000 Subject: [PATCH] Fix race condition with multiple runners --- Cargo.toml | 4 +- ...0220208120856_fix_concurrent_poll.down.sql | 60 ++++++++++++++++++ .../20220208120856_fix_concurrent_poll.up.sql | 62 +++++++++++++++++++ sqlxmq_macros/Cargo.toml | 2 +- 4 files changed, 125 insertions(+), 3 deletions(-) create mode 100644 migrations/20220208120856_fix_concurrent_poll.down.sql create mode 100644 migrations/20220208120856_fix_concurrent_poll.up.sql diff --git a/Cargo.toml b/Cargo.toml index 3aa19e5..5ec4227 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlxmq" -version = "0.3.4" +version = "0.3.5" authors = ["Diggory Blake "] edition = "2018" license = "MIT OR Apache-2.0" @@ -23,7 +23,7 @@ uuid = { version = "0.8.2", features = ["v4"] } log = "0.4.14" serde_json = "1.0.64" serde = "1.0.124" -sqlxmq_macros = { version = "0.3.4", path = "sqlxmq_macros" } +sqlxmq_macros = { version = "0.3.5", path = "sqlxmq_macros" } anymap2 = "0.13.0" [features] diff --git a/migrations/20220208120856_fix_concurrent_poll.down.sql b/migrations/20220208120856_fix_concurrent_poll.down.sql new file mode 100644 index 0000000..6cd2d21 --- /dev/null +++ b/migrations/20220208120856_fix_concurrent_poll.down.sql @@ -0,0 +1,60 @@ +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT * FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/migrations/20220208120856_fix_concurrent_poll.up.sql b/migrations/20220208120856_fix_concurrent_poll.up.sql new file mode 100644 index 0000000..cae6151 --- /dev/null +++ b/migrations/20220208120856_fix_concurrent_poll.up.sql @@ -0,0 +1,62 @@ + +-- Main entry-point for job runner: pulls a batch of messages from the queue. +CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1) +RETURNS TABLE( + id UUID, + is_committed BOOLEAN, + name TEXT, + payload_json TEXT, + payload_bytes BYTEA, + retry_backoff INTERVAL, + wait_time INTERVAL +) AS $$ +BEGIN + RETURN QUERY UPDATE mq_msgs + SET + attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END, + attempts = mq_msgs.attempts - 1, + retry_backoff = mq_msgs.retry_backoff * 2 + FROM ( + SELECT + msgs.id + FROM mq_active_channels(channel_names, batch_size) AS active_channels + INNER JOIN LATERAL ( + SELECT mq_msgs.id FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND mq_msgs.attempt_at <= NOW() + AND mq_msgs.channel_name = active_channels.name + AND mq_msgs.channel_args = active_channels.args + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + ORDER BY mq_msgs.attempt_at ASC + LIMIT batch_size + ) AS msgs ON TRUE + LIMIT batch_size + ) AS messages_to_update + LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id + WHERE mq_msgs.id = messages_to_update.id + AND mq_msgs.attempt_at <= NOW() + RETURNING + mq_msgs.id, + mq_msgs.commit_interval IS NULL, + mq_payloads.name, + mq_payloads.payload_json::TEXT, + mq_payloads.payload_bytes, + mq_msgs.retry_backoff / 2, + interval '0' AS wait_time; + + IF NOT FOUND THEN + RETURN QUERY SELECT + NULL::UUID, + NULL::BOOLEAN, + NULL::TEXT, + NULL::TEXT, + NULL::BYTEA, + NULL::INTERVAL, + MIN(mq_msgs.attempt_at) - NOW() + FROM mq_msgs + WHERE mq_msgs.id != uuid_nil() + AND NOT mq_uuid_exists(mq_msgs.after_message_id) + AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names)); + END IF; +END; +$$ LANGUAGE plpgsql; diff --git a/sqlxmq_macros/Cargo.toml b/sqlxmq_macros/Cargo.toml index cfa75b9..673b114 100644 --- a/sqlxmq_macros/Cargo.toml +++ b/sqlxmq_macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlxmq_macros" -version = "0.3.4" +version = "0.3.5" authors = ["Diggory Blake "] edition = "2018" license = "MIT OR Apache-2.0"