mirror of
https://github.com/Diggsey/sqlxmq.git
synced 2025-06-07 14:38:47 +00:00
fix: lock rows and skip them during concurrent poll
This commit is contained in:
parent
387b301656
commit
2dcd240ec0
2 changed files with 125 additions and 0 deletions
|
@ -0,0 +1,62 @@
|
||||||
|
|
||||||
|
-- Main entry-point for job runner: pulls a batch of messages from the queue.
|
||||||
|
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
|
||||||
|
RETURNS TABLE(
|
||||||
|
id UUID,
|
||||||
|
is_committed BOOLEAN,
|
||||||
|
name TEXT,
|
||||||
|
payload_json TEXT,
|
||||||
|
payload_bytes BYTEA,
|
||||||
|
retry_backoff INTERVAL,
|
||||||
|
wait_time INTERVAL
|
||||||
|
) AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN QUERY UPDATE mq_msgs
|
||||||
|
SET
|
||||||
|
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
|
||||||
|
attempts = mq_msgs.attempts - 1,
|
||||||
|
retry_backoff = mq_msgs.retry_backoff * 2
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
msgs.id
|
||||||
|
FROM mq_active_channels(channel_names, batch_size) AS active_channels
|
||||||
|
INNER JOIN LATERAL (
|
||||||
|
SELECT mq_msgs.id FROM mq_msgs
|
||||||
|
WHERE mq_msgs.id != uuid_nil()
|
||||||
|
AND mq_msgs.attempt_at <= NOW()
|
||||||
|
AND mq_msgs.channel_name = active_channels.name
|
||||||
|
AND mq_msgs.channel_args = active_channels.args
|
||||||
|
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||||
|
ORDER BY mq_msgs.attempt_at ASC
|
||||||
|
LIMIT batch_size
|
||||||
|
) AS msgs ON TRUE
|
||||||
|
LIMIT batch_size
|
||||||
|
) AS messages_to_update
|
||||||
|
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
|
||||||
|
WHERE mq_msgs.id = messages_to_update.id
|
||||||
|
AND mq_msgs.attempt_at <= NOW()
|
||||||
|
RETURNING
|
||||||
|
mq_msgs.id,
|
||||||
|
mq_msgs.commit_interval IS NULL,
|
||||||
|
mq_payloads.name,
|
||||||
|
mq_payloads.payload_json::TEXT,
|
||||||
|
mq_payloads.payload_bytes,
|
||||||
|
mq_msgs.retry_backoff / 2,
|
||||||
|
interval '0' AS wait_time;
|
||||||
|
|
||||||
|
IF NOT FOUND THEN
|
||||||
|
RETURN QUERY SELECT
|
||||||
|
NULL::UUID,
|
||||||
|
NULL::BOOLEAN,
|
||||||
|
NULL::TEXT,
|
||||||
|
NULL::TEXT,
|
||||||
|
NULL::BYTEA,
|
||||||
|
NULL::INTERVAL,
|
||||||
|
MIN(mq_msgs.attempt_at) - NOW()
|
||||||
|
FROM mq_msgs
|
||||||
|
WHERE mq_msgs.id != uuid_nil()
|
||||||
|
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||||
|
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;- Add down migration script here
|
|
@ -0,0 +1,63 @@
|
||||||
|
|
||||||
|
-- Main entry-point for job runner: pulls a batch of messages from the queue.
|
||||||
|
CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
|
||||||
|
RETURNS TABLE(
|
||||||
|
id UUID,
|
||||||
|
is_committed BOOLEAN,
|
||||||
|
name TEXT,
|
||||||
|
payload_json TEXT,
|
||||||
|
payload_bytes BYTEA,
|
||||||
|
retry_backoff INTERVAL,
|
||||||
|
wait_time INTERVAL
|
||||||
|
) AS $$
|
||||||
|
BEGIN
|
||||||
|
RETURN QUERY UPDATE mq_msgs
|
||||||
|
SET
|
||||||
|
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
|
||||||
|
attempts = mq_msgs.attempts - 1,
|
||||||
|
retry_backoff = mq_msgs.retry_backoff * 2
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
msgs.id
|
||||||
|
FROM mq_active_channels(channel_names, batch_size) AS active_channels
|
||||||
|
INNER JOIN LATERAL (
|
||||||
|
SELECT mq_msgs.id FROM mq_msgs
|
||||||
|
WHERE mq_msgs.id != uuid_nil()
|
||||||
|
AND mq_msgs.attempt_at <= NOW()
|
||||||
|
AND mq_msgs.channel_name = active_channels.name
|
||||||
|
AND mq_msgs.channel_args = active_channels.args
|
||||||
|
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||||
|
ORDER BY mq_msgs.attempt_at ASC
|
||||||
|
LIMIT batch_size
|
||||||
|
FOR UPDATE SKIP LOCKED
|
||||||
|
) AS msgs ON TRUE
|
||||||
|
LIMIT batch_size
|
||||||
|
) AS messages_to_update
|
||||||
|
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
|
||||||
|
WHERE mq_msgs.id = messages_to_update.id
|
||||||
|
AND mq_msgs.attempt_at <= NOW()
|
||||||
|
RETURNING
|
||||||
|
mq_msgs.id,
|
||||||
|
mq_msgs.commit_interval IS NULL,
|
||||||
|
mq_payloads.name,
|
||||||
|
mq_payloads.payload_json::TEXT,
|
||||||
|
mq_payloads.payload_bytes,
|
||||||
|
mq_msgs.retry_backoff / 2,
|
||||||
|
interval '0' AS wait_time;
|
||||||
|
|
||||||
|
IF NOT FOUND THEN
|
||||||
|
RETURN QUERY SELECT
|
||||||
|
NULL::UUID,
|
||||||
|
NULL::BOOLEAN,
|
||||||
|
NULL::TEXT,
|
||||||
|
NULL::TEXT,
|
||||||
|
NULL::BYTEA,
|
||||||
|
NULL::INTERVAL,
|
||||||
|
MIN(mq_msgs.attempt_at) - NOW()
|
||||||
|
FROM mq_msgs
|
||||||
|
WHERE mq_msgs.id != uuid_nil()
|
||||||
|
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||||
|
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
Loading…
Reference in a new issue