mirror of
https://github.com/Diggsey/sqlxmq.git
synced 2025-01-04 21:38:51 +00:00
Fix race condition with multiple runners
This commit is contained in:
parent
7a13e04b21
commit
989b11ffe0
4 changed files with 125 additions and 3 deletions
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "sqlxmq"
|
||||
version = "0.3.4"
|
||||
version = "0.3.5"
|
||||
authors = ["Diggory Blake <diggsey@googlemail.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT OR Apache-2.0"
|
||||
|
@ -23,7 +23,7 @@ uuid = { version = "0.8.2", features = ["v4"] }
|
|||
log = "0.4.14"
|
||||
serde_json = "1.0.64"
|
||||
serde = "1.0.124"
|
||||
sqlxmq_macros = { version = "0.3.4", path = "sqlxmq_macros" }
|
||||
sqlxmq_macros = { version = "0.3.5", path = "sqlxmq_macros" }
|
||||
anymap2 = "0.13.0"
|
||||
|
||||
[features]
|
||||
|
|
60
migrations/20220208120856_fix_concurrent_poll.down.sql
Normal file
60
migrations/20220208120856_fix_concurrent_poll.down.sql
Normal file
|
@ -0,0 +1,60 @@
|
|||
-- Main entry-point for job runner: pulls a batch of messages from the queue.
|
||||
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
|
||||
RETURNS TABLE(
|
||||
id UUID,
|
||||
is_committed BOOLEAN,
|
||||
name TEXT,
|
||||
payload_json TEXT,
|
||||
payload_bytes BYTEA,
|
||||
retry_backoff INTERVAL,
|
||||
wait_time INTERVAL
|
||||
) AS $$
|
||||
BEGIN
|
||||
RETURN QUERY UPDATE mq_msgs
|
||||
SET
|
||||
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
|
||||
attempts = mq_msgs.attempts - 1,
|
||||
retry_backoff = mq_msgs.retry_backoff * 2
|
||||
FROM (
|
||||
SELECT
|
||||
msgs.id
|
||||
FROM mq_active_channels(channel_names, batch_size) AS active_channels
|
||||
INNER JOIN LATERAL (
|
||||
SELECT * FROM mq_msgs
|
||||
WHERE mq_msgs.id != uuid_nil()
|
||||
AND mq_msgs.attempt_at <= NOW()
|
||||
AND mq_msgs.channel_name = active_channels.name
|
||||
AND mq_msgs.channel_args = active_channels.args
|
||||
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||
ORDER BY mq_msgs.attempt_at ASC
|
||||
LIMIT batch_size
|
||||
) AS msgs ON TRUE
|
||||
LIMIT batch_size
|
||||
) AS messages_to_update
|
||||
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
|
||||
WHERE mq_msgs.id = messages_to_update.id
|
||||
RETURNING
|
||||
mq_msgs.id,
|
||||
mq_msgs.commit_interval IS NULL,
|
||||
mq_payloads.name,
|
||||
mq_payloads.payload_json::TEXT,
|
||||
mq_payloads.payload_bytes,
|
||||
mq_msgs.retry_backoff / 2,
|
||||
interval '0' AS wait_time;
|
||||
|
||||
IF NOT FOUND THEN
|
||||
RETURN QUERY SELECT
|
||||
NULL::UUID,
|
||||
NULL::BOOLEAN,
|
||||
NULL::TEXT,
|
||||
NULL::TEXT,
|
||||
NULL::BYTEA,
|
||||
NULL::INTERVAL,
|
||||
MIN(mq_msgs.attempt_at) - NOW()
|
||||
FROM mq_msgs
|
||||
WHERE mq_msgs.id != uuid_nil()
|
||||
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
62
migrations/20220208120856_fix_concurrent_poll.up.sql
Normal file
62
migrations/20220208120856_fix_concurrent_poll.up.sql
Normal file
|
@ -0,0 +1,62 @@
|
|||
|
||||
-- Main entry-point for job runner: pulls a batch of messages from the queue.
|
||||
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
|
||||
RETURNS TABLE(
|
||||
id UUID,
|
||||
is_committed BOOLEAN,
|
||||
name TEXT,
|
||||
payload_json TEXT,
|
||||
payload_bytes BYTEA,
|
||||
retry_backoff INTERVAL,
|
||||
wait_time INTERVAL
|
||||
) AS $$
|
||||
BEGIN
|
||||
RETURN QUERY UPDATE mq_msgs
|
||||
SET
|
||||
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
|
||||
attempts = mq_msgs.attempts - 1,
|
||||
retry_backoff = mq_msgs.retry_backoff * 2
|
||||
FROM (
|
||||
SELECT
|
||||
msgs.id
|
||||
FROM mq_active_channels(channel_names, batch_size) AS active_channels
|
||||
INNER JOIN LATERAL (
|
||||
SELECT mq_msgs.id FROM mq_msgs
|
||||
WHERE mq_msgs.id != uuid_nil()
|
||||
AND mq_msgs.attempt_at <= NOW()
|
||||
AND mq_msgs.channel_name = active_channels.name
|
||||
AND mq_msgs.channel_args = active_channels.args
|
||||
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||
ORDER BY mq_msgs.attempt_at ASC
|
||||
LIMIT batch_size
|
||||
) AS msgs ON TRUE
|
||||
LIMIT batch_size
|
||||
) AS messages_to_update
|
||||
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
|
||||
WHERE mq_msgs.id = messages_to_update.id
|
||||
AND mq_msgs.attempt_at <= NOW()
|
||||
RETURNING
|
||||
mq_msgs.id,
|
||||
mq_msgs.commit_interval IS NULL,
|
||||
mq_payloads.name,
|
||||
mq_payloads.payload_json::TEXT,
|
||||
mq_payloads.payload_bytes,
|
||||
mq_msgs.retry_backoff / 2,
|
||||
interval '0' AS wait_time;
|
||||
|
||||
IF NOT FOUND THEN
|
||||
RETURN QUERY SELECT
|
||||
NULL::UUID,
|
||||
NULL::BOOLEAN,
|
||||
NULL::TEXT,
|
||||
NULL::TEXT,
|
||||
NULL::BYTEA,
|
||||
NULL::INTERVAL,
|
||||
MIN(mq_msgs.attempt_at) - NOW()
|
||||
FROM mq_msgs
|
||||
WHERE mq_msgs.id != uuid_nil()
|
||||
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "sqlxmq_macros"
|
||||
version = "0.3.4"
|
||||
version = "0.3.5"
|
||||
authors = ["Diggory Blake <diggsey@googlemail.com>"]
|
||||
edition = "2018"
|
||||
license = "MIT OR Apache-2.0"
|
||||
|
|
Loading…
Reference in a new issue