mirror of
https://github.com/Diggsey/sqlxmq.git
synced 2024-10-31 21:58:58 +00:00
Add stress test
This commit is contained in:
parent
a02cc97e09
commit
326843db94
6 changed files with 438 additions and 2 deletions
|
@ -10,7 +10,7 @@ readme = "README.md"
|
|||
documentation = "https://docs.rs/sqlxmq"
|
||||
|
||||
[workspace]
|
||||
members = ["sqlxmq_macros"]
|
||||
members = ["sqlxmq_macros", "sqlxmq_stress"]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
|
@ -21,7 +21,7 @@ sqlx = { version = "0.5.1", features = [
|
|||
"chrono",
|
||||
"uuid",
|
||||
] }
|
||||
tokio = { version = "1.3.0", features = ["full"] }
|
||||
tokio = { version = "1.4.0", features = ["full"] }
|
||||
dotenv = "0.15.0"
|
||||
chrono = "0.4.19"
|
||||
uuid = { version = "0.8.2", features = ["v4"] }
|
||||
|
|
1
sqlxmq_stress/.env
Normal file
1
sqlxmq_stress/.env
Normal file
|
@ -0,0 +1 @@
|
|||
DATABASE_URL=postgres://postgres:password@localhost/sqlxmq_stress
|
16
sqlxmq_stress/Cargo.toml
Normal file
16
sqlxmq_stress/Cargo.toml
Normal file
|
@ -0,0 +1,16 @@
|
|||
[package]
|
||||
name = "sqlxmq_stress"
|
||||
version = "0.1.0"
|
||||
authors = ["Diggory Blake <diggsey@googlemail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
sqlxmq = { path = ".." }
|
||||
tokio = { version = "1.4.0", features = ["full"] }
|
||||
dotenv = "0.15.0"
|
||||
sqlx = "0.5.1"
|
||||
serde = "1.0.125"
|
||||
lazy_static = "1.4.0"
|
||||
futures = "0.3.13"
|
12
sqlxmq_stress/migrations/20210316025847_setup.down.sql
Normal file
12
sqlxmq_stress/migrations/20210316025847_setup.down.sql
Normal file
|
@ -0,0 +1,12 @@
|
|||
-- Reverts the setup migration by dropping the functions, tables and type it
-- created. The "uuid-ossp" extension enabled by the up migration is left in
-- place.
DROP FUNCTION mq_checkpoint;
|
||||
DROP FUNCTION mq_keep_alive;
|
||||
DROP FUNCTION mq_delete;
|
||||
DROP FUNCTION mq_commit;
|
||||
DROP FUNCTION mq_insert;
|
||||
DROP FUNCTION mq_poll;
|
||||
DROP FUNCTION mq_active_channels;
|
||||
DROP FUNCTION mq_latest_message;
|
||||
DROP TABLE mq_payloads;
|
||||
-- Dropping mq_msgs also removes its indexes, including the partial polling
-- index whose predicate calls mq_uuid_exists; the helper is dropped afterwards.
DROP TABLE mq_msgs;
|
||||
DROP FUNCTION mq_uuid_exists;
|
||||
-- mq_new_t is the element type of mq_insert's argument; mq_insert was dropped
-- above.
DROP TYPE mq_new_t;
|
288
sqlxmq_stress/migrations/20210316025847_setup.up.sql
Normal file
288
sqlxmq_stress/migrations/20210316025847_setup.up.sql
Normal file
|
@ -0,0 +1,288 @@
|
|||
-- Provides uuid_nil(), which this migration uses as the sentinel message ID.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
|
||||
|
||||
-- The UDT for creating messages: one value describes one message passed to
-- mq_insert.
|
||||
CREATE TYPE mq_new_t AS (
|
||||
-- Unique message ID
|
||||
id UUID,
|
||||
-- Delay before message is processed (added to NOW() by mq_insert)
|
||||
delay INTERVAL,
|
||||
-- Number of retries if initial processing fails (mq_insert stores
-- retries + 1 as the number of attempts)
|
||||
retries INT,
|
||||
-- Initial backoff between retries
|
||||
retry_backoff INTERVAL,
|
||||
-- Name of channel
|
||||
channel_name TEXT,
|
||||
-- Arguments to channel
|
||||
channel_args TEXT,
|
||||
-- Interval for two-phase commit (or NULL to disable two-phase commit)
|
||||
commit_interval INTERVAL,
|
||||
-- Whether this message should be processed in order with respect to other
|
||||
-- ordered messages.
|
||||
ordered BOOLEAN,
|
||||
-- Name of message
|
||||
name TEXT,
|
||||
-- JSON payload (text form; mq_insert casts it to JSONB)
|
||||
payload_json TEXT,
|
||||
-- Binary payload
|
||||
payload_bytes BYTEA
|
||||
);
|
||||
|
||||
-- Small, frequently updated table of messages. It holds only the scheduling
-- state of each message; the payload is stored in mq_payloads under the same ID.
|
||||
CREATE TABLE mq_msgs (
|
||||
-- Unique message ID
id UUID PRIMARY KEY,
|
||||
created_at TIMESTAMPTZ DEFAULT NOW(),
|
||||
-- Time from which the message may be polled. mq_poll only returns rows with
-- attempt_at <= NOW() and sets this to NULL once the last attempt is handed out.
attempt_at TIMESTAMPTZ DEFAULT NOW(),
|
||||
-- Remaining attempts; decremented by mq_poll each time the message is returned.
attempts INT NOT NULL DEFAULT 5,
|
||||
-- Delay before the next attempt; doubled by mq_poll each time the message is returned.
retry_backoff INTERVAL NOT NULL DEFAULT INTERVAL '1 second',
|
||||
channel_name TEXT NOT NULL,
|
||||
channel_args TEXT NOT NULL,
|
||||
-- Non-NULL while the message is still uncommitted (see mq_commit).
commit_interval INTERVAL,
|
||||
-- Ordering link written by mq_insert: NULL for unordered messages, otherwise
-- the ID of the message this one must follow (uuid_nil() when there is none).
-- ON DELETE SET DEFAULT resets the link to uuid_nil() when the referenced
-- message is deleted.
after_message_id UUID DEFAULT uuid_nil() REFERENCES mq_msgs(id) ON DELETE SET DEFAULT
|
||||
);
|
||||
|
||||
-- Insert dummy message so that the 'nil' UUID can be referenced by the
-- after_message_id foreign key. Queries exclude it with id != uuid_nil().
|
||||
INSERT INTO mq_msgs (id, channel_name, channel_args, after_message_id) VALUES (uuid_nil(), '', '', NULL);
|
||||
|
||||
-- Internal helper: TRUE only when the given UUID is set and is not the
-- all-zero 'nil' UUID. Both NULL and nil yield FALSE.
CREATE FUNCTION mq_uuid_exists(
    id UUID
) RETURNS BOOLEAN AS $$
    SELECT NOT (id IS NULL OR id = uuid_nil())
$$ LANGUAGE SQL IMMUTABLE;
|
||||
|
||||
-- Index for polling: its predicate matches the filters that mq_active_channels
-- and mq_poll apply (not the dummy row, not waiting on a preceding message).
|
||||
CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid_nil() AND NOT mq_uuid_exists(after_message_id);
|
||||
-- Index for adding messages: matches the filter and sort columns that
-- mq_latest_message uses to find the newest ordered message of a channel.
|
||||
CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;
|
||||
|
||||
-- Index for ensuring strict message order: within one channel, at most one
-- message may follow any given message.
|
||||
CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id);
|
||||
|
||||
|
||||
-- Large, less frequently updated table of message payloads. Rows are written
-- by mq_insert under the same ID as the matching mq_msgs row and are joined
-- back in by mq_poll; no foreign key ties the two tables together.
|
||||
CREATE TABLE mq_payloads(
|
||||
id UUID PRIMARY KEY,
|
||||
-- Name of message
name TEXT NOT NULL,
|
||||
-- JSON payload (may be NULL)
payload_json JSONB,
|
||||
-- Binary payload (may be NULL)
payload_bytes BYTEA
|
||||
);
|
||||
|
||||
-- Internal helper: returns the ID of the newest ordered message in the given
-- channel (latest created_at, ties broken by the larger id). Falls back to the
-- nil UUID when the channel holds no ordered message.
CREATE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
RETURNS UUID AS $$
    SELECT COALESCE(
        (
            SELECT m.id
            FROM mq_msgs AS m
            WHERE m.id <> uuid_nil()
              AND m.after_message_id IS NOT NULL
              AND m.channel_name = from_channel_name
              AND m.channel_args = from_channel_args
            ORDER BY m.created_at DESC, m.id DESC
            LIMIT 1
        ),
        uuid_nil()
    )
$$ LANGUAGE SQL STABLE;
|
||||
|
||||
-- Internal helper: picks, in random order, up to batch_size distinct
-- (channel_name, channel_args) pairs that currently have a "ready" message,
-- i.e. one that is due and not waiting on a preceding message. A NULL
-- channel_names array means "any channel".
CREATE FUNCTION mq_active_channels(channel_names TEXT[], batch_size INT)
RETURNS TABLE(name TEXT, args TEXT) AS $$
    SELECT m.channel_name, m.channel_args
    FROM mq_msgs AS m
    WHERE m.id <> uuid_nil()
      AND NOT mq_uuid_exists(m.after_message_id)
      AND m.attempt_at <= NOW()
      AND (channel_names IS NULL OR m.channel_name = ANY(channel_names))
    GROUP BY m.channel_name, m.channel_args
    ORDER BY RANDOM()
    LIMIT batch_size
$$ LANGUAGE SQL STABLE;
|
||||
|
||||
-- Main entry-point for job runner: pulls a batch of messages from the queue.
--
-- Selection: mq_active_channels picks up to batch_size channels with ready
-- messages; for each channel up to batch_size ready messages are taken in
-- attempt_at order, and the combined result is capped at batch_size.
--
-- Every selected message is updated in the same statement: attempts is
-- decremented, retry_backoff is doubled, and attempt_at becomes NOW() plus the
-- previous backoff (or NULL when this was the last attempt).
--
-- Returned columns: the message ID, is_committed (TRUE when commit_interval
-- IS NULL), the payload columns from mq_payloads (NULL if no payload row
-- exists), the backoff before doubling (RETURNING sees the updated row, hence
-- the division by 2) and a wait_time of zero.
--
-- When no message was updated, a single row of NULLs is returned instead whose
-- wait_time is MIN(attempt_at) - NOW() over the messages that are not waiting
-- on a preceding message in the requested channels (NULL if there are none).
|
||||
CREATE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
|
||||
RETURNS TABLE(
|
||||
id UUID,
|
||||
is_committed BOOLEAN,
|
||||
name TEXT,
|
||||
payload_json TEXT,
|
||||
payload_bytes BYTEA,
|
||||
retry_backoff INTERVAL,
|
||||
wait_time INTERVAL
|
||||
) AS $$
|
||||
BEGIN
|
||||
RETURN QUERY UPDATE mq_msgs
|
||||
SET
|
||||
attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
|
||||
attempts = mq_msgs.attempts - 1,
|
||||
retry_backoff = mq_msgs.retry_backoff * 2
|
||||
FROM (
|
||||
SELECT
|
||||
msgs.id
|
||||
FROM mq_active_channels(channel_names, batch_size) AS active_channels
|
||||
INNER JOIN LATERAL (
|
||||
SELECT * FROM mq_msgs
|
||||
WHERE mq_msgs.id != uuid_nil()
|
||||
AND mq_msgs.attempt_at <= NOW()
|
||||
AND mq_msgs.channel_name = active_channels.name
|
||||
AND mq_msgs.channel_args = active_channels.args
|
||||
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||
ORDER BY mq_msgs.attempt_at ASC
|
||||
LIMIT batch_size
|
||||
) AS msgs ON TRUE
|
||||
LIMIT batch_size
|
||||
) AS messages_to_update
|
||||
LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
|
||||
WHERE mq_msgs.id = messages_to_update.id
|
||||
RETURNING
|
||||
mq_msgs.id,
|
||||
mq_msgs.commit_interval IS NULL,
|
||||
mq_payloads.name,
|
||||
mq_payloads.payload_json::TEXT,
|
||||
mq_payloads.payload_bytes,
|
||||
mq_msgs.retry_backoff / 2,
|
||||
interval '0' AS wait_time;
|
||||
|
||||
IF NOT FOUND THEN
|
||||
RETURN QUERY SELECT
|
||||
NULL::UUID,
|
||||
NULL::BOOLEAN,
|
||||
NULL::TEXT,
|
||||
NULL::TEXT,
|
||||
NULL::BYTEA,
|
||||
NULL::INTERVAL,
|
||||
MIN(mq_msgs.attempt_at) - NOW()
|
||||
FROM mq_msgs
|
||||
WHERE mq_msgs.id != uuid_nil()
|
||||
AND NOT mq_uuid_exists(mq_msgs.after_message_id)
|
||||
AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
|
||||
END IF;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Creates new messages.
--
-- Notifications: one pg_notify on 'mq_<channel_name>' per distinct channel
-- name in the batch, plus one on 'mq' when the batch is not empty.
--
-- Storage: payloads go to mq_payloads (the JSON text is cast to JSONB) and the
-- scheduling rows go to mq_msgs, with attempt_at = NOW() + delay + the commit
-- interval (if any) and attempts = retries + 1.
--
-- Ordering: unordered messages get a NULL after_message_id. Ordered messages
-- are chained per (channel_name, channel_args) in id order: each one follows
-- the previous ordered message of the batch, and the first one follows
-- whatever mq_latest_message returns for that channel.
|
||||
CREATE FUNCTION mq_insert(new_messages mq_new_t[])
|
||||
RETURNS VOID AS $$
|
||||
BEGIN
|
||||
PERFORM pg_notify(CONCAT('mq_', channel_name), '')
|
||||
FROM unnest(new_messages) AS new_msgs
|
||||
GROUP BY channel_name;
|
||||
|
||||
IF FOUND THEN
|
||||
PERFORM pg_notify('mq', '');
|
||||
END IF;
|
||||
|
||||
INSERT INTO mq_payloads (
|
||||
id,
|
||||
name,
|
||||
payload_json,
|
||||
payload_bytes
|
||||
) SELECT
|
||||
id,
|
||||
name,
|
||||
payload_json::JSONB,
|
||||
payload_bytes
|
||||
FROM UNNEST(new_messages);
|
||||
|
||||
INSERT INTO mq_msgs (
|
||||
id,
|
||||
attempt_at,
|
||||
attempts,
|
||||
retry_backoff,
|
||||
channel_name,
|
||||
channel_args,
|
||||
commit_interval,
|
||||
after_message_id
|
||||
)
|
||||
SELECT
|
||||
id,
|
||||
NOW() + delay + COALESCE(commit_interval, INTERVAL '0'),
|
||||
retries + 1,
|
||||
retry_backoff,
|
||||
channel_name,
|
||||
channel_args,
|
||||
commit_interval,
|
||||
CASE WHEN ordered
|
||||
THEN
|
||||
LAG(id, 1, mq_latest_message(channel_name, channel_args))
|
||||
OVER (PARTITION BY channel_name, channel_args, ordered ORDER BY id)
|
||||
ELSE
|
||||
NULL
|
||||
END
|
||||
FROM UNNEST(new_messages);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Commits the given messages if they still carry a commit interval: the
-- interval is subtracted from attempt_at and commit_interval is cleared.
-- Messages whose commit_interval is already NULL are left untouched.
CREATE FUNCTION mq_commit(msg_ids UUID[])
RETURNS VOID AS $$
BEGIN
    UPDATE mq_msgs AS m
    SET
        commit_interval = NULL,
        attempt_at = m.attempt_at - m.commit_interval
    WHERE m.commit_interval IS NOT NULL
      AND m.id = ANY(msg_ids);
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
|
||||
-- Removes messages (scheduling row and payload) from the queue, e.g. once a
-- message has been processed or has expired unprocessed.
--
-- Before deleting, a notification is sent on 'mq_<channel_name>' for each
-- channel that has a to-be-deleted message with after_message_id = uuid_nil(),
-- followed by one on 'mq' if any such message was found.
CREATE FUNCTION mq_delete(msg_ids UUID[])
RETURNS VOID AS $$
BEGIN
    PERFORM pg_notify(CONCAT('mq_', m.channel_name), '')
    FROM mq_msgs AS m
    WHERE m.after_message_id = uuid_nil()
      AND m.id = ANY(msg_ids)
    GROUP BY m.channel_name;

    IF FOUND THEN
        PERFORM pg_notify('mq', '');
    END IF;

    DELETE FROM mq_msgs AS m WHERE m.id = ANY(msg_ids);
    DELETE FROM mq_payloads AS p WHERE p.id = ANY(msg_ids);
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
|
||||
-- Signals that the caller is still working on the given messages, either
-- during the initial commit interval or while processing. Pushes attempt_at
-- out to NOW() + duration so that neither the commit interval elapses nor a
-- retry is triggered before then.
--
-- Only rows whose attempt_at lies before NOW() + duration are touched; their
-- commit_interval (when non-NULL) grows by the same amount attempt_at moved.
CREATE FUNCTION mq_keep_alive(msg_ids UUID[], duration INTERVAL)
RETURNS VOID AS $$
    UPDATE mq_msgs AS m
    SET
        commit_interval = m.commit_interval + ((NOW() + duration) - m.attempt_at),
        attempt_at = NOW() + duration
    WHERE m.attempt_at < NOW() + duration
      AND m.id = ANY(msg_ids);
$$ LANGUAGE SQL;
|
||||
|
||||
|
||||
-- Checkpoints the progress of a long-running message.
--
-- Scheduling: attempt_at is moved to NOW() + duration unless it is already
-- later, and extra_retries (NULL counts as 0) is added to the remaining
-- attempts.
-- Payload: each of the JSON and binary payloads is replaced when a non-NULL
-- new value is supplied and kept as-is otherwise.
CREATE FUNCTION mq_checkpoint(
    msg_id UUID,
    duration INTERVAL,
    new_payload_json TEXT,
    new_payload_bytes BYTEA,
    extra_retries INT
)
RETURNS VOID AS $$
    UPDATE mq_msgs AS m
    SET
        attempts = m.attempts + COALESCE(extra_retries, 0),
        attempt_at = GREATEST(m.attempt_at, NOW() + duration)
    WHERE m.id = msg_id;

    UPDATE mq_payloads AS p
    SET
        payload_bytes = COALESCE(new_payload_bytes, p.payload_bytes),
        payload_json = COALESCE(new_payload_json::JSONB, p.payload_json)
    WHERE p.id = msg_id;
$$ LANGUAGE SQL;
|
119
sqlxmq_stress/src/main.rs
Normal file
119
sqlxmq_stress/src/main.rs
Normal file
|
@ -0,0 +1,119 @@
|
|||
use std::env;
|
||||
use std::error::Error;
|
||||
use std::sync::RwLock;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::channel::mpsc;
|
||||
use futures::StreamExt;
|
||||
use lazy_static::lazy_static;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::{Pool, Postgres};
|
||||
use sqlxmq::{job, CurrentJob, JobRegistry};
|
||||
use tokio::task;
|
||||
|
||||
lazy_static! {
    /// Reference point for all timestamps in this program; fixed on first access.
    static ref INSTANT_EPOCH: Instant = Instant::now();

    /// Sender through which finished jobs report their `JobResult`.
    ///
    /// It is initialised with the sending half of a channel whose receiver is
    /// discarded right away, so it has to be replaced (as `main` does) before
    /// any result can actually be delivered.
    static ref CHANNEL: RwLock<mpsc::UnboundedSender<JobResult>> = {
        let (placeholder_tx, _) = mpsc::unbounded();
        RwLock::new(placeholder_tx)
    };
}
|
||||
|
||||
/// Outcome of one finished job, sent from `example_job` to `main` over `CHANNEL`.
struct JobResult {
|
||||
/// Time between the job being enqueued (`JobData::start_time`) and the
/// moment just after it was marked complete.
duration: Duration,
|
||||
}
|
||||
|
||||
/// JSON payload attached to every job spawned by `start_job`.
#[derive(Serialize, Deserialize)]
|
||||
struct JobData {
|
||||
/// `INSTANT_EPOCH.elapsed()` at the moment the job was enqueued. The value is
/// relative to the enqueueing process's epoch, not to wall-clock time.
start_time: Duration,
|
||||
}
|
||||
|
||||
// Arguments to the `#[job]` attribute allow setting default job options.
|
||||
#[job(channel_name = "foo")]
|
||||
async fn example_job(
|
||||
mut current_job: CurrentJob,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
||||
// Decode a JSON payload
|
||||
let data: JobData = current_job.json()?.unwrap();
|
||||
|
||||
// Mark the job as complete
|
||||
current_job.complete().await?;
|
||||
let end_time = INSTANT_EPOCH.elapsed();
|
||||
|
||||
CHANNEL.read().unwrap().unbounded_send(JobResult {
|
||||
duration: end_time - data.start_time,
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start_job(
|
||||
pool: Pool<Postgres>,
|
||||
seed: usize,
|
||||
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
|
||||
let channel_name = if seed % 3 == 0 { "foo" } else { "bar" };
|
||||
let channel_args = format!("{}", seed / 32);
|
||||
example_job
|
||||
.builder()
|
||||
// This is where we can override job configuration
|
||||
.set_channel_name(channel_name)
|
||||
.set_channel_args(&channel_args)
|
||||
.set_json(&JobData {
|
||||
start_time: INSTANT_EPOCH.elapsed(),
|
||||
})?
|
||||
.spawn(&pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn schedule_tasks(num_jobs: usize, interval: Duration, pool: Pool<Postgres>) {
|
||||
let mut stream = tokio::time::interval(interval);
|
||||
for i in 0..num_jobs {
|
||||
task::spawn(start_job(pool.clone(), i));
|
||||
stream.tick().await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let _ = dotenv::dotenv();
|
||||
|
||||
let pool = Pool::connect(&env::var("DATABASE_URL")?).await?;
|
||||
|
||||
let registry = JobRegistry::new(&[example_job]);
|
||||
|
||||
let _runner = registry
|
||||
.runner(&pool)
|
||||
.set_concurrency(50, 100)
|
||||
.run()
|
||||
.await?;
|
||||
let num_jobs = 10000;
|
||||
let interval = Duration::from_nanos(700_000);
|
||||
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
*CHANNEL.write()? = tx;
|
||||
|
||||
let start_time = Instant::now();
|
||||
task::spawn(schedule_tasks(num_jobs, interval, pool.clone()));
|
||||
|
||||
let mut results: Vec<_> = rx.take(num_jobs).collect().await;
|
||||
let total_duration = start_time.elapsed();
|
||||
|
||||
assert_eq!(results.len(), num_jobs);
|
||||
|
||||
results.sort_by_key(|r| r.duration);
|
||||
let (min, max, median, pct) = (
|
||||
results[0].duration,
|
||||
results[num_jobs - 1].duration,
|
||||
results[num_jobs / 2].duration,
|
||||
results[(num_jobs * 19) / 20].duration,
|
||||
);
|
||||
let throughput = num_jobs as f64 / total_duration.as_secs_f64();
|
||||
|
||||
println!("min: {}s", min.as_secs_f64());
|
||||
println!("max: {}s", max.as_secs_f64());
|
||||
println!("median: {}s", median.as_secs_f64());
|
||||
println!("95th percentile: {}s", pct.as_secs_f64());
|
||||
println!("throughput: {}/s", throughput);
|
||||
|
||||
// The job runner will continue listening and running
|
||||
// jobs until `runner` is dropped.
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in a new issue