From 326843db944ba04bdc4b87d37cead2df8a032ea2 Mon Sep 17 00:00:00 2001
From: Diggory Blake
Date: Sun, 11 Apr 2021 13:26:08 +0100
Subject: [PATCH] Add stress test

---
 Cargo.toml                                   |   4 +-
 sqlxmq_stress/.env                           |   1 +
 sqlxmq_stress/Cargo.toml                     |  16 +
 .../migrations/20210316025847_setup.down.sql |  12 +
 .../migrations/20210316025847_setup.up.sql   | 288 ++++++++++++++++++
 sqlxmq_stress/src/main.rs                    | 119 ++++++++
 6 files changed, 438 insertions(+), 2 deletions(-)
 create mode 100644 sqlxmq_stress/.env
 create mode 100644 sqlxmq_stress/Cargo.toml
 create mode 100644 sqlxmq_stress/migrations/20210316025847_setup.down.sql
 create mode 100644 sqlxmq_stress/migrations/20210316025847_setup.up.sql
 create mode 100644 sqlxmq_stress/src/main.rs

diff --git a/Cargo.toml b/Cargo.toml
index e3b8146..327bb41 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,7 @@ readme = "README.md"
 documentation = "https://docs.rs/sqlxmq"
 
 [workspace]
-members = ["sqlxmq_macros"]
+members = ["sqlxmq_macros", "sqlxmq_stress"]
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
@@ -21,7 +21,7 @@ sqlx = { version = "0.5.1", features = [
     "chrono",
     "uuid",
 ] }
-tokio = { version = "1.3.0", features = ["full"] }
+tokio = { version = "1.4.0", features = ["full"] }
 dotenv = "0.15.0"
 chrono = "0.4.19"
 uuid = { version = "0.8.2", features = ["v4"] }
diff --git a/sqlxmq_stress/.env b/sqlxmq_stress/.env
new file mode 100644
index 0000000..436fa61
--- /dev/null
+++ b/sqlxmq_stress/.env
@@ -0,0 +1 @@
+DATABASE_URL=postgres://postgres:password@localhost/sqlxmq_stress
diff --git a/sqlxmq_stress/Cargo.toml b/sqlxmq_stress/Cargo.toml
new file mode 100644
index 0000000..bd5829e
--- /dev/null
+++ b/sqlxmq_stress/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "sqlxmq_stress"
+version = "0.1.0"
+authors = ["Diggory Blake"]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+sqlxmq = { path = ".." }
+tokio = { version = "1.4.0", features = ["full"] }
+dotenv = "0.15.0"
+sqlx = "0.5.1"
+serde = "1.0.125"
+lazy_static = "1.4.0"
+futures = "0.3.13"
diff --git a/sqlxmq_stress/migrations/20210316025847_setup.down.sql b/sqlxmq_stress/migrations/20210316025847_setup.down.sql
new file mode 100644
index 0000000..933f6d4
--- /dev/null
+++ b/sqlxmq_stress/migrations/20210316025847_setup.down.sql
@@ -0,0 +1,12 @@
+DROP FUNCTION mq_checkpoint;
+DROP FUNCTION mq_keep_alive;
+DROP FUNCTION mq_delete;
+DROP FUNCTION mq_commit;
+DROP FUNCTION mq_insert;
+DROP FUNCTION mq_poll;
+DROP FUNCTION mq_active_channels;
+DROP FUNCTION mq_latest_message;
+DROP TABLE mq_payloads;
+DROP TABLE mq_msgs;
+DROP FUNCTION mq_uuid_exists;
+DROP TYPE mq_new_t;
\ No newline at end of file
diff --git a/sqlxmq_stress/migrations/20210316025847_setup.up.sql b/sqlxmq_stress/migrations/20210316025847_setup.up.sql
new file mode 100644
index 0000000..bcb1ab5
--- /dev/null
+++ b/sqlxmq_stress/migrations/20210316025847_setup.up.sql
@@ -0,0 +1,288 @@
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
+
+-- The UDT for creating messages
+CREATE TYPE mq_new_t AS (
+    -- Unique message ID
+    id UUID,
+    -- Delay before message is processed
+    delay INTERVAL,
+    -- Number of retries if initial processing fails
+    retries INT,
+    -- Initial backoff between retries
+    retry_backoff INTERVAL,
+    -- Name of channel
+    channel_name TEXT,
+    -- Arguments to channel
+    channel_args TEXT,
+    -- Interval for two-phase commit (or NULL to disable two-phase commit)
+    commit_interval INTERVAL,
+    -- Whether this message should be processed in order with respect to other
+    -- ordered messages.
+    ordered BOOLEAN,
+    -- Name of message
+    name TEXT,
+    -- JSON payload
+    payload_json TEXT,
+    -- Binary payload
+    payload_bytes BYTEA
+);
+
+-- Small, frequently updated table of messages
+CREATE TABLE mq_msgs (
+    id UUID PRIMARY KEY,
+    created_at TIMESTAMPTZ DEFAULT NOW(),
+    attempt_at TIMESTAMPTZ DEFAULT NOW(),
+    attempts INT NOT NULL DEFAULT 5,
+    retry_backoff INTERVAL NOT NULL DEFAULT INTERVAL '1 second',
+    channel_name TEXT NOT NULL,
+    channel_args TEXT NOT NULL,
+    commit_interval INTERVAL,
+    after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
+);
+
+-- Insert dummy message so that the 'nil' UUID can be referenced
+INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL);
+
+-- Internal helper function to check that a UUID is neither NULL nor NIL
+CREATE FUNCTION mq_uuid_exists(
+    id UUID
+) RETURNS BOOLEAN AS $$
+    SELECT id IS NOT NULL AND id != uuid_nil()
+$$ LANGUAGE SQL IMMUTABLE;
+
+-- Index for polling
+CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id);
+-- Index for adding messages
+CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;
+
+-- Index for ensuring strict message order
+CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id);
+
+
+-- Large, less frequently updated table of message payloads
+CREATE TABLE mq_payloads(
+    id UUID PRIMARY KEY,
+    name TEXT NOT NULL,
+    payload_json JSONB,
+    payload_bytes BYTEA
+);
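+
+-- Illustrative note (not part of the original patch): ordered messages form a
+-- chain through `after_message_id`. When a message is deleted, ON DELETE SET
+-- DEFAULT resets its successor's `after_message_id` to uuid_nil(), which is
+-- what makes the successor eligible for polling. A sketch of the readiness
+-- test that the partial indexes above are built to serve:
+--
+--     SELECT id FROM mq_msgs
+--     WHERE attempt_at <= NOW()
+--     AND NOT mq_uuid_exists(after_message_id);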
+
+-- Internal helper function to return the most recently added message in a queue.
+CREATE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
+RETURNS UUID AS $$
+    SELECT COALESCE(
+        (
+            SELECT id FROM mq_msgs
+            WHERE channel_name = from_channel_name
+            AND channel_args = from_channel_args
+            AND after_message_id IS NOT NULL
+            AND id != uuid_nil()
+            ORDER BY created_at DESC, id DESC
+            LIMIT 1
+        ),
+        uuid_nil()
+    )
+$$ LANGUAGE SQL STABLE;
+
+-- Internal helper function to randomly select a set of channels with "ready" messages.
+CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT)
+RETURNS TABLE(name TEXT, args TEXT) AS $$
+    SELECT channel_name, channel_args
+    FROM mq_msgs
+    WHERE id != uuid_nil()
+    AND attempt_at <= NOW()
+    AND (channel_names IS NULL OR channel_name = ANY(channel_names))
+    AND NOT mq_uuid_exists(after_message_id)
+    GROUP BY channel_name, channel_args
+    ORDER BY RANDOM()
+    LIMIT batch_size
+$$ LANGUAGE SQL STABLE;
+
+-- Main entry-point for job runner: pulls a batch of messages from the queue.
+CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
+RETURNS TABLE(
+    id UUID,
+    is_committed BOOLEAN,
+    name TEXT,
+    payload_json TEXT,
+    payload_bytes BYTEA,
+    retry_backoff INTERVAL,
+    wait_time INTERVAL
+) AS $$
+BEGIN
+    RETURN QUERY UPDATE mq_msgs
+    SET
+        attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
+        attempts = mq_msgs.attempts - 1,
+        retry_backoff = mq_msgs.retry_backoff * 2
+    FROM (
+        SELECT
+            msgs.id
+        FROM mq_active_channels(channel_names, batch_size) AS active_channels
+        INNER JOIN LATERAL (
+            SELECT * FROM mq_msgs
+            WHERE mq_msgs.id != uuid_nil()
+            AND mq_msgs.attempt_at <= NOW()
+            AND mq_msgs.channel_name = active_channels.name
+            AND mq_msgs.channel_args = active_channels.args
+            AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+            ORDER BY mq_msgs.attempt_at ASC
+            LIMIT batch_size
+        ) AS msgs ON TRUE
+        LIMIT batch_size
+    ) AS messages_to_update
+    LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
+    WHERE mq_msgs.id = messages_to_update.id
+    RETURNING
+        mq_msgs.id,
+        mq_msgs.commit_interval IS NULL,
+        mq_payloads.name,
+        mq_payloads.payload_json::TEXT,
+        mq_payloads.payload_bytes,
+        mq_msgs.retry_backoff / 2,
+        interval '0' AS wait_time;
+
+    IF NOT FOUND THEN
+        RETURN QUERY SELECT
+            NULL::UUID,
+            NULL::BOOLEAN,
+            NULL::TEXT,
+            NULL::TEXT,
+            NULL::BYTEA,
+            NULL::INTERVAL,
+            MIN(mq_msgs.attempt_at) - NOW()
+        FROM mq_msgs
+        WHERE mq_msgs.id != uuid_nil()
+        AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+        AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
+    END IF;
+END;
+$$ LANGUAGE plpgsql;
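+
+-- Illustrative example (not part of the original patch): a runner could fetch
+-- up to 10 ready messages across two channels with:
+--
+--     SELECT id, is_committed, name, payload_json, payload_bytes, wait_time
+--     FROM mq_poll(ARRAY['foo', 'bar'], 10);
+--
+-- If nothing is ready, a single all-NULL row is returned whose `wait_time`
+-- holds the interval until the next message is due, so callers can sleep
+-- instead of busy-polling.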
+
+-- Creates new messages
+CREATE FUNCTION mq_insert(new_messages mq_new_t[])
+RETURNS VOID AS $$
+BEGIN
+    PERFORM pg_notify(CONCAT('mq_', channel_name), '')
+    FROM unnest(new_messages) AS new_msgs
+    GROUP BY channel_name;
+
+    IF FOUND THEN
+        PERFORM pg_notify('mq', '');
+    END IF;
+
+    INSERT INTO mq_payloads (
+        id,
+        name,
+        payload_json,
+        payload_bytes
+    ) SELECT
+        id,
+        name,
+        payload_json::JSONB,
+        payload_bytes
+    FROM UNNEST(new_messages);
+
+    INSERT INTO mq_msgs (
+        id,
+        attempt_at,
+        attempts,
+        retry_backoff,
+        channel_name,
+        channel_args,
+        commit_interval,
+        after_message_id
+    )
+    SELECT
+        id,
+        NOW() + delay + COALESCE(commit_interval, INTERVAL '0'),
+        retries + 1,
+        retry_backoff,
+        channel_name,
+        channel_args,
+        commit_interval,
+        CASE WHEN ordered
+        THEN
+            LAG(id, 1, mq_latest_message(channel_name, channel_args))
+            OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id)
+        ELSE
+            NULL
+        END
+    FROM UNNEST(new_messages);
+END;
+$$ LANGUAGE plpgsql;
+
+-- Commits messages previously created with a non-NULL commit interval.
+CREATE FUNCTION mq_commit(msg_ids UUID[])
+RETURNS VOID AS $$
+BEGIN
+    UPDATE mq_msgs
+    SET
+        attempt_at = attempt_at - commit_interval,
+        commit_interval = NULL
+    WHERE id = ANY(msg_ids)
+    AND commit_interval IS NOT NULL;
+END;
+$$ LANGUAGE plpgsql;
+
+
+-- Deletes messages from the queue. This occurs when a message has been
+-- processed, or when it expires without being processed.
+CREATE FUNCTION mq_delete(msg_ids UUID[])
+RETURNS VOID AS $$
+BEGIN
+    PERFORM pg_notify(CONCAT('mq_', channel_name), '')
+    FROM mq_msgs
+    WHERE id = ANY(msg_ids)
+    AND after_message_id = uuid_nil()
+    GROUP BY channel_name;
+
+    IF FOUND THEN
+        PERFORM pg_notify('mq', '');
+    END IF;
+
+    DELETE FROM mq_msgs WHERE id = ANY(msg_ids);
+    DELETE FROM mq_payloads WHERE id = ANY(msg_ids);
+END;
+$$ LANGUAGE plpgsql;
+
+
+-- Can be called during the initial commit interval, or when processing
+-- a message. Indicates that the caller is still active and will prevent either
+-- the commit interval elapsing or the message being retried for the specified
+-- interval.
+CREATE FUNCTION mq_keep_alive(msg_ids UUID[], duration INTERVAL)
+RETURNS VOID AS $$
+    UPDATE mq_msgs
+    SET
+        attempt_at = NOW() + duration,
+        commit_interval = commit_interval + ((NOW() + duration) - attempt_at)
+    WHERE id = ANY(msg_ids)
+    AND attempt_at < NOW() + duration;
+$$ LANGUAGE SQL;
+
+
+-- Called during lengthy processing of a message to checkpoint the progress.
+-- As well as behaving like `mq_keep_alive`, the message payload can be
+-- updated.
+CREATE FUNCTION mq_checkpoint(
+    msg_id UUID,
+    duration INTERVAL,
+    new_payload_json TEXT,
+    new_payload_bytes BYTEA,
+    extra_retries INT
+)
+RETURNS VOID AS $$
+    UPDATE mq_msgs
+    SET
+        attempt_at = GREATEST(attempt_at, NOW() + duration),
+        attempts = attempts + COALESCE(extra_retries, 0)
+    WHERE id = msg_id;
+
+    UPDATE mq_payloads
+    SET
+        payload_json = COALESCE(new_payload_json::JSONB, payload_json),
+        payload_bytes = COALESCE(new_payload_bytes, payload_bytes)
+    WHERE
+        id = msg_id;
+$$ LANGUAGE SQL;
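+
+-- Illustrative example (not part of the original patch): a two-phase send.
+-- A non-NULL commit_interval delays the first attempt, giving the producer a
+-- window in which to confirm the message with mq_commit (or abandon it with
+-- mq_delete). Assuming a freshly generated UUID in :id, that flow would look
+-- like:
+--
+--     SELECT mq_insert(ARRAY[ROW(
+--         :id, INTERVAL '0', 4, INTERVAL '1 second',
+--         'foo', '', INTERVAL '5 minutes', FALSE,
+--         'my_message', '{"key": "value"}', NULL
+--     )::mq_new_t]);
+--     SELECT mq_commit(ARRAY[:id]);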
diff --git a/sqlxmq_stress/src/main.rs b/sqlxmq_stress/src/main.rs
new file mode 100644
index 0000000..5ed2d0c
--- /dev/null
+++ b/sqlxmq_stress/src/main.rs
@@ -0,0 +1,119 @@
+use std::env;
+use std::error::Error;
+use std::sync::RwLock;
+use std::time::{Duration, Instant};
+
+use futures::channel::mpsc;
+use futures::StreamExt;
+use lazy_static::lazy_static;
+use serde::{Deserialize, Serialize};
+use sqlx::{Pool, Postgres};
+use sqlxmq::{job, CurrentJob, JobRegistry};
+use tokio::task;
+
+lazy_static! {
+    static ref INSTANT_EPOCH: Instant = Instant::now();
+    static ref CHANNEL: RwLock<mpsc::UnboundedSender<JobResult>> =
+        RwLock::new(mpsc::unbounded().0);
+}
+
+struct JobResult {
+    duration: Duration,
+}
+
+#[derive(Serialize, Deserialize)]
+struct JobData {
+    start_time: Duration,
+}
+
+// Arguments to the `#[job]` attribute allow setting default job options.
+#[job(channel_name = "foo")]
+async fn example_job(
+    mut current_job: CurrentJob,
+) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
+    // Decode a JSON payload
+    let data: JobData = current_job.json()?.unwrap();
+
+    // Mark the job as complete
+    current_job.complete().await?;
+    let end_time = INSTANT_EPOCH.elapsed();
+
+    CHANNEL.read().unwrap().unbounded_send(JobResult {
+        duration: end_time - data.start_time,
+    })?;
+
+    Ok(())
+}
+
+async fn start_job(
+    pool: Pool<Postgres>,
+    seed: usize,
+) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
+    let channel_name = if seed % 3 == 0 { "foo" } else { "bar" };
+    let channel_args = format!("{}", seed / 32);
+    example_job
+        .builder()
+        // This is where we can override job configuration
+        .set_channel_name(channel_name)
+        .set_channel_args(&channel_args)
+        .set_json(&JobData {
+            start_time: INSTANT_EPOCH.elapsed(),
+        })?
+        .spawn(&pool)
+        .await?;
+    Ok(())
+}
+
+async fn schedule_tasks(num_jobs: usize, interval: Duration, pool: Pool<Postgres>) {
+    let mut stream = tokio::time::interval(interval);
+    for i in 0..num_jobs {
+        task::spawn(start_job(pool.clone(), i));
+        stream.tick().await;
+    }
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn Error>> {
+    let _ = dotenv::dotenv();
+
+    let pool = Pool::<Postgres>::connect(&env::var("DATABASE_URL")?).await?;
+
+    let registry = JobRegistry::new(&[example_job]);
+
+    let _runner = registry
+        .runner(&pool)
+        .set_concurrency(50, 100)
+        .run()
+        .await?;
+
+    let num_jobs = 10000;
+    // 700µs between spawned jobs, i.e. a target rate of ~1400 jobs/s.
+    let interval = Duration::from_nanos(700_000);
+
+    let (tx, rx) = mpsc::unbounded();
+    *CHANNEL.write()? = tx;
+
+    let start_time = Instant::now();
+    task::spawn(schedule_tasks(num_jobs, interval, pool.clone()));
+
+    let mut results: Vec<_> = rx.take(num_jobs).collect().await;
+    let total_duration = start_time.elapsed();
+
+    assert_eq!(results.len(), num_jobs);
+
+    results.sort_by_key(|r| r.duration);
+    let (min, max, median, pct) = (
+        results[0].duration,
+        results[num_jobs - 1].duration,
+        results[num_jobs / 2].duration,
+        results[(num_jobs * 19) / 20].duration,
+    );
+    let throughput = num_jobs as f64 / total_duration.as_secs_f64();
+
+    println!("min: {}s", min.as_secs_f64());
+    println!("max: {}s", max.as_secs_f64());
+    println!("median: {}s", median.as_secs_f64());
+    println!("95th percentile: {}s", pct.as_secs_f64());
+    println!("throughput: {}/s", throughput);
+
+    // The job runner will continue listening and running
+    // jobs until `runner` is dropped.
+    Ok(())
+}
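+
+// Illustrative note (not part of the original patch): to run the stress test,
+// create the database named in `.env` and apply the migrations (for example
+// with `sqlx database create` and `sqlx migrate run`, assuming sqlx-cli is
+// installed), then execute `cargo run --release`.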