becker/postgres-locking

Dearest Reviewer,

In an attempt to allow more than one runner to run at a time, I have
added Postgres advisory locks to the polling call. This requires a
pre-calculated lock number, which is computed on insert.
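
To illustrate the lock number: the migration folds the first 64 bits of
the message UUID into a BIGINT and stores it in the new lock_number
column. A quick sketch of the expression it uses (the UUID literal here
is just an example value):

```
-- Drop the dashes, read the leading 64 bits of hex, cast to BIGINT.
SELECT ('x' || translate('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', '-', ''))::bit(64)::bigint;
```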

I have updated the stress test and added some ENV values and a README.
The stress test results are in line with what I get on the main branch.

```
Standard:
min: 0.038561309s
max: 37.321387572s
median: 29.594086093s
95th percentile: 34.564979967s
throughput: 263.7400277852498/s
Locking:
min: 0.979646111s
max: 37.211344452s
median: 29.578954612s
95th percentile: 34.722588948s
throughput: 266.5816441194382/s
```

I used pg_try_advisory_xact_lock because it is transaction-based rather
than session-based.
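
Because the lock is tied to the polling transaction, it is released
automatically at commit or rollback, so a crashed runner can never leave
a message locked. A minimal sketch of the behaviour (the key 42 is an
arbitrary example, not a real lock_number):

```
BEGIN;
-- Returns true only if no other open transaction holds key 42;
-- a concurrent runner polling at the same time simply skips it.
SELECT pg_try_advisory_xact_lock(42);
-- ... select and update the locked messages here ...
COMMIT; -- the lock is released automatically here
```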

Becker
This commit is contained in:
becker 2022-02-04 15:48:35 -08:00
parent 6d3ed6fb99
commit bcc3c2b587
8 changed files with 378 additions and 11 deletions

@@ -0,0 +1 @@
-- Add down migration script here

@@ -0,0 +1,126 @@
-- Add up migration script here
ALTER TABLE mq_msgs
ADD COLUMN lock_number BIGINT;
CREATE OR REPLACE FUNCTION mq_insert(new_messages mq_new_t[])
RETURNS VOID AS $$
BEGIN
PERFORM pg_notify(CONCAT('mq_', channel_name), '')
FROM unnest(new_messages) AS new_msgs
GROUP BY channel_name;
IF FOUND THEN
PERFORM pg_notify('mq', '');
END IF;
INSERT INTO mq_payloads (
id,
name,
payload_json,
payload_bytes
) SELECT
id,
name,
payload_json::JSONB,
payload_bytes
FROM UNNEST(new_messages);
INSERT INTO mq_msgs (
id,
lock_number,
attempt_at,
attempts,
retry_backoff,
channel_name,
channel_args,
commit_interval,
after_message_id
)
SELECT
id,
('x' || translate(id::text, '-', ''))::bit(64)::bigint,
NOW() + delay + COALESCE(commit_interval, INTERVAL '0'),
retries + 1,
retry_backoff,
channel_name,
channel_args,
commit_interval,
CASE WHEN ordered
THEN
LAG(id, 1, mq_latest_message(channel_name, channel_args))
OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id)
ELSE
NULL
END
FROM UNNEST(new_messages);
END;
$$ LANGUAGE plpgsql;
UPDATE mq_msgs SET lock_number = ('x' || translate(id::text, '-', ''))::bit(64)::bigint WHERE lock_number IS NULL;
-- Main entry-point for job runner: pulls a batch of messages from the queue.
CREATE FUNCTION mq_poll_locking(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE(
id UUID,
is_committed BOOLEAN,
name TEXT,
payload_json TEXT,
payload_bytes BYTEA,
retry_backoff INTERVAL,
wait_time INTERVAL
) AS $$
BEGIN
RETURN QUERY UPDATE mq_msgs
SET
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
attempts = mq_msgs.attempts - 1,
retry_backoff = mq_msgs.retry_backoff * 2
FROM (
SELECT
msgs.id,
msgs.lock_number
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT * FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
ORDER BY mq_msgs.attempt_at ASC
LIMIT batch_size
) AS msgs ON TRUE
WHERE pg_try_advisory_xact_lock(lock_number)
LIMIT batch_size
) AS messages_to_update
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
WHERE mq_msgs.id = messages_to_update.id
RETURNING
mq_msgs.id,
mq_msgs.commit_interval IS NULL,
mq_payloads.name,
mq_payloads.payload_json::TEXT,
mq_payloads.payload_bytes,
mq_msgs.retry_backoff / 2,
interval '0' AS wait_time;
IF NOT FOUND THEN
RETURN QUERY SELECT
NULL::UUID,
NULL::BOOLEAN,
NULL::TEXT,
NULL::TEXT,
NULL::BYTEA,
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
END;
$$ LANGUAGE plpgsql;
ALTER TABLE mq_msgs
ALTER COLUMN lock_number SET NOT NULL;

@@ -1 +1,3 @@
DATABASE_URL=postgres://postgres:password@localhost/sqlxmq_stress
MAX_JOBS=1000
MIN_JOBS=50

sqlxmq_stress/README Normal file

@@ -0,0 +1,8 @@
# Create database
The database URL is set in `.env`.
`psql -c 'create database sqlxmq_stress'`
# Run migrations
`sqlx migrate run`

@@ -0,0 +1 @@
-- Add down migration script here

@@ -0,0 +1,126 @@
-- Add up migration script here
ALTER TABLE mq_msgs
ADD COLUMN lock_number BIGINT;
CREATE OR REPLACE FUNCTION mq_insert(new_messages mq_new_t[])
RETURNS VOID AS $$
BEGIN
PERFORM pg_notify(CONCAT('mq_', channel_name), '')
FROM unnest(new_messages) AS new_msgs
GROUP BY channel_name;
IF FOUND THEN
PERFORM pg_notify('mq', '');
END IF;
INSERT INTO mq_payloads (
id,
name,
payload_json,
payload_bytes
) SELECT
id,
name,
payload_json::JSONB,
payload_bytes
FROM UNNEST(new_messages);
INSERT INTO mq_msgs (
id,
lock_number,
attempt_at,
attempts,
retry_backoff,
channel_name,
channel_args,
commit_interval,
after_message_id
)
SELECT
id,
('x' || translate(id::text, '-', ''))::bit(64)::bigint,
NOW() + delay + COALESCE(commit_interval, INTERVAL '0'),
retries + 1,
retry_backoff,
channel_name,
channel_args,
commit_interval,
CASE WHEN ordered
THEN
LAG(id, 1, mq_latest_message(channel_name, channel_args))
OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id)
ELSE
NULL
END
FROM UNNEST(new_messages);
END;
$$ LANGUAGE plpgsql;
UPDATE mq_msgs SET lock_number = ('x' || translate(id::text, '-', ''))::bit(64)::bigint WHERE lock_number IS NULL;
-- Main entry-point for job runner: pulls a batch of messages from the queue.
CREATE FUNCTION mq_poll_locking(channel_names TEXT[], batch_size INT DEFAULT 1)
RETURNS TABLE(
id UUID,
is_committed BOOLEAN,
name TEXT,
payload_json TEXT,
payload_bytes BYTEA,
retry_backoff INTERVAL,
wait_time INTERVAL
) AS $$
BEGIN
RETURN QUERY UPDATE mq_msgs
SET
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
attempts = mq_msgs.attempts - 1,
retry_backoff = mq_msgs.retry_backoff * 2
FROM (
SELECT
msgs.id,
msgs.lock_number
FROM mq_active_channels(channel_names, batch_size) AS active_channels
INNER JOIN LATERAL (
SELECT * FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND mq_msgs.attempt_at <= NOW()
AND mq_msgs.channel_name = active_channels.name
AND mq_msgs.channel_args = active_channels.args
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
ORDER BY mq_msgs.attempt_at ASC
LIMIT batch_size
) AS msgs ON TRUE
WHERE pg_try_advisory_xact_lock(lock_number)
LIMIT batch_size
) AS messages_to_update
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
WHERE mq_msgs.id = messages_to_update.id
RETURNING
mq_msgs.id,
mq_msgs.commit_interval IS NULL,
mq_payloads.name,
mq_payloads.payload_json::TEXT,
mq_payloads.payload_bytes,
mq_msgs.retry_backoff / 2,
interval '0' AS wait_time;
IF NOT FOUND THEN
RETURN QUERY SELECT
NULL::UUID,
NULL::BOOLEAN,
NULL::TEXT,
NULL::TEXT,
NULL::BYTEA,
NULL::INTERVAL,
MIN(mq_msgs.attempt_at) - NOW()
FROM mq_msgs
WHERE mq_msgs.id != uuid_nil()
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
END IF;
END;
$$ LANGUAGE plpgsql;
ALTER TABLE mq_msgs
ALTER COLUMN lock_number SET NOT NULL;

@@ -81,7 +81,20 @@ async fn schedule_tasks(num_jobs: usize, interval: Duration, pool: Pool<Postgres
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let _ = dotenv::dotenv();
standard().await?;
locking().await?;
Ok(())
}
async fn standard() -> Result<(), Box<dyn Error>> {
let min_jobs = env::var("MIN_JOBS")
.expect("MIN_JOBS should be set in .env or when called")
.parse::<usize>()
.expect("MIN_JOBS should be a usize number");
let max_jobs = env::var("MAX_JOBS")
.expect("MAX_JOBS should be set in .env or when called")
.parse::<usize>()
.expect("MAX_JOBS should be a usize number");
let pool = Pool::connect(&env::var("DATABASE_URL")?).await?;
// Make sure the queues are empty
@@ -91,7 +104,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let _runner = registry
.runner(&pool)
.set_concurrency(50, 100)
.set_concurrency(min_jobs, max_jobs)
.run()
.await?;
let num_jobs = 10000;
@@ -117,6 +130,64 @@ async fn main() -> Result<(), Box<dyn Error>> {
);
let throughput = num_jobs as f64 / total_duration.as_secs_f64();
println!("Standard:");
println!("min: {}s", min.as_secs_f64());
println!("max: {}s", max.as_secs_f64());
println!("median: {}s", median.as_secs_f64());
println!("95th percentile: {}s", pct.as_secs_f64());
println!("throughput: {}/s", throughput);
// The job runner will continue listening and running
// jobs until `runner` is dropped.
Ok(())
}
async fn locking() -> Result<(), Box<dyn Error>> {
let min_jobs = env::var("MIN_JOBS")
.expect("MIN_JOBS should be set in .env or when called")
.parse::<usize>()
.expect("MIN_JOBS should be a usize number");
let max_jobs = env::var("MAX_JOBS")
.expect("MAX_JOBS should be set in .env or when called")
.parse::<usize>()
.expect("MAX_JOBS should be a usize number");
let pool = Pool::connect(&env::var("DATABASE_URL")?).await?;
// Make sure the queues are empty
sqlxmq::clear_all(&pool).await?;
let registry = JobRegistry::new(&[example_job]);
let _runner = registry
.runner(&pool)
.set_concurrency(min_jobs, max_jobs)
.set_locking(true)
.run()
.await?;
let num_jobs = 10000;
let interval = Duration::from_nanos(700_000);
let (tx, rx) = mpsc::unbounded();
*CHANNEL.write()? = tx;
let start_time = Instant::now();
task::spawn(schedule_tasks(num_jobs, interval, pool.clone()));
let mut results: Vec<_> = rx.take(num_jobs).collect().await;
let total_duration = start_time.elapsed();
assert_eq!(results.len(), num_jobs);
results.sort_by_key(|r| r.duration);
let (min, max, median, pct) = (
results[0].duration,
results[num_jobs - 1].duration,
results[num_jobs / 2].duration,
results[(num_jobs * 19) / 20].duration,
);
let throughput = num_jobs as f64 / total_duration.as_secs_f64();
println!("Locking:");
println!("min: {}s", min.as_secs_f64());
println!("max: {}s", max.as_secs_f64());
println!("median: {}s", median.as_secs_f64());

@@ -23,6 +23,7 @@ pub struct JobRunnerOptions {
dispatch: Opaque<Arc<dyn Fn(CurrentJob) + Send + Sync + 'static>>,
pool: Pool<Postgres>,
keep_alive: bool,
locking: bool,
}
#[derive(Debug)]
@@ -220,8 +221,18 @@ impl JobRunnerOptions {
keep_alive: true,
dispatch: Opaque(Arc::new(f)),
pool: pool.clone(),
locking: false,
}
}
/// If locking is enabled, a Postgres advisory transaction lock (`pg_try_advisory_xact_lock`)
/// will be used when selecting jobs. This is useful when running more than one runner.
///
/// Defaults to `false`.
pub fn set_locking(&mut self, locking: bool) -> &mut Self {
self.locking = locking;
self
}
/// Set the concurrency limits for this job runner. When the number of active
/// jobs falls below the minimum, the runner will poll for more, up to the maximum.
///
@@ -314,19 +325,11 @@ fn to_duration(interval: PgInterval) -> Duration {
}
}
async fn poll_and_dispatch(
async fn handle_messages(
job_runner: &Arc<JobRunner>,
batch_size: i32,
messages: Vec<PolledMessage>,
) -> Result<Duration, sqlx::Error> {
log::info!("Polling for messages");
let options = &job_runner.options;
let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)")
.bind(&options.channel_names)
.bind(batch_size)
.fetch_all(&options.pool)
.await?;
let ids_to_delete: Vec<_> = messages
.iter()
.filter(|msg| msg.is_committed == Some(false))
@@ -387,6 +390,35 @@ async fn poll_and_dispatch(
Ok(wait_time)
}
async fn poll_and_dispatch(
job_runner: &Arc<JobRunner>,
batch_size: i32,
) -> Result<Duration, sqlx::Error> {
log::info!("Polling for messages");
let options = &job_runner.options;
if !options.locking {
let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, $2)")
.bind(&options.channel_names)
.bind(batch_size)
.fetch_all(&options.pool)
.await?;
handle_messages(job_runner, messages).await
} else {
let mut transaction = options.pool.begin().await?;
let messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll_locking($1, $2)")
.bind(&options.channel_names)
.bind(batch_size)
// Fetch on the transaction's connection so the advisory xact locks are held until the commit below.
.fetch_all(&mut *transaction)
.await?;
let results = handle_messages(job_runner, messages).await;
transaction.commit().await?;
results
}
}
async fn main_loop(job_runner: Arc<JobRunner>, _listener_task: OwnedHandle) {
let options = &job_runner.options;
let mut failures = 0;