Multiple fixes:

- Add functions to clear channels.
- Clear all channels before stress test.
- Abort stress test if spawning a job fails.
- Add function to determine whether an operation can be retried.
- Bump version to 0.3.0.
This commit is contained in:
Diggory Blake 2021-09-20 13:27:48 +01:00
parent 4529494e98
commit 5d287b7247
No known key found for this signature in database
GPG key ID: E6BDFA83146ABD40
9 changed files with 155 additions and 8 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "sqlxmq" name = "sqlxmq"
version = "0.2.2" version = "0.3.0"
authors = ["Diggory Blake <diggsey@googlemail.com>"] authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018" edition = "2018"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
@ -23,7 +23,7 @@ uuid = { version = "0.8.2", features = ["v4"] }
log = "0.4.14" log = "0.4.14"
serde_json = "1.0.64" serde_json = "1.0.64"
serde = "1.0.124" serde = "1.0.124"
sqlxmq_macros = { version = "0.2.2", path = "sqlxmq_macros" } sqlxmq_macros = { version = "0.3.0", path = "sqlxmq_macros" }
anymap2 = "0.13.0" anymap2 = "0.13.0"
[features] [features]

View file

@ -1,3 +1,5 @@
-- Revert the channel-clearing helpers introduced by this migration's up script.
DROP FUNCTION mq_clear;
DROP FUNCTION mq_clear_all;
DROP FUNCTION mq_checkpoint; DROP FUNCTION mq_checkpoint;
DROP FUNCTION mq_keep_alive; DROP FUNCTION mq_keep_alive;
DROP FUNCTION mq_delete; DROP FUNCTION mq_delete;
@ -9,4 +11,4 @@ DROP FUNCTION mq_latest_message;
DROP TABLE mq_payloads; DROP TABLE mq_payloads;
DROP TABLE mq_msgs; DROP TABLE mq_msgs;
DROP FUNCTION mq_uuid_exists; DROP FUNCTION mq_uuid_exists;
DROP TYPE mq_new_t; DROP TYPE mq_new_t;

View file

@ -56,7 +56,7 @@ CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid
CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;
-- Index for ensuring strict message order -- Index for ensuring strict message order
CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id); CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id);
-- Large, less frequently updated table of message payloads -- Large, less frequently updated table of message payloads
@ -286,3 +286,25 @@ RETURNS VOID AS $$
WHERE WHERE
id = msg_id; id = msg_id;
$$ LANGUAGE SQL; $$ LANGUAGE SQL;
-- Deletes all messages from a list of channel names.
-- Removes both the queue rows (mq_msgs) and their payload rows (mq_payloads)
-- in one statement, using a data-modifying CTE to capture the deleted ids.
CREATE FUNCTION mq_clear(channel_names TEXT[])
RETURNS VOID AS $$
BEGIN
    WITH deleted_ids AS (
        DELETE FROM mq_msgs
        WHERE channel_name = ANY(channel_names)
        RETURNING id
    )
    DELETE FROM mq_payloads
    USING deleted_ids
    WHERE mq_payloads.id = deleted_ids.id;
END;
$$ LANGUAGE plpgsql;
-- Deletes all messages.
-- Unconditionally empties mq_msgs, then drops the matching payload rows in the
-- same statement via a data-modifying CTE.
CREATE FUNCTION mq_clear_all()
RETURNS VOID AS $$
BEGIN
    WITH deleted_ids AS (
        DELETE FROM mq_msgs
        RETURNING id
    )
    DELETE FROM mq_payloads
    USING deleted_ids
    WHERE mq_payloads.id = deleted_ids.id;
END;
$$ LANGUAGE plpgsql;

View file

@ -1,6 +1,6 @@
[package] [package]
name = "sqlxmq_macros" name = "sqlxmq_macros"
version = "0.2.2" version = "0.3.0"
authors = ["Diggory Blake <diggsey@googlemail.com>"] authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018" edition = "2018"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"

View file

@ -1,3 +1,5 @@
-- Revert the channel-clearing helpers introduced by this migration's up script.
DROP FUNCTION mq_clear;
DROP FUNCTION mq_clear_all;
DROP FUNCTION mq_checkpoint; DROP FUNCTION mq_checkpoint;
DROP FUNCTION mq_keep_alive; DROP FUNCTION mq_keep_alive;
DROP FUNCTION mq_delete; DROP FUNCTION mq_delete;
@ -9,4 +11,4 @@ DROP FUNCTION mq_latest_message;
DROP TABLE mq_payloads; DROP TABLE mq_payloads;
DROP TABLE mq_msgs; DROP TABLE mq_msgs;
DROP FUNCTION mq_uuid_exists; DROP FUNCTION mq_uuid_exists;
DROP TYPE mq_new_t; DROP TYPE mq_new_t;

View file

@ -56,7 +56,7 @@ CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid
CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL; CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;
-- Index for ensuring strict message order -- Index for ensuring strict message order
CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id); CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id);
-- Large, less frequently updated table of message payloads -- Large, less frequently updated table of message payloads
@ -286,3 +286,25 @@ RETURNS VOID AS $$
WHERE WHERE
id = msg_id; id = msg_id;
$$ LANGUAGE SQL; $$ LANGUAGE SQL;
-- Deletes all messages from a list of channel names.
-- Removes both the queue rows (mq_msgs) and their payloads (mq_payloads).
CREATE FUNCTION mq_clear(channel_names TEXT[])
RETURNS VOID AS $$
BEGIN
-- The data-modifying CTE deletes the matching messages and captures their ids
-- so the corresponding payload rows can be deleted in the same statement.
WITH deleted_ids AS (
DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) RETURNING id
)
DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
END;
$$ LANGUAGE plpgsql;
-- Deletes all messages.
-- Empties mq_msgs entirely, together with the matching mq_payloads rows.
CREATE FUNCTION mq_clear_all()
RETURNS VOID AS $$
BEGIN
-- The data-modifying CTE captures every deleted message id so the payload
-- table can be purged in the same statement.
WITH deleted_ids AS (
DELETE FROM mq_msgs RETURNING id
)
DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
END;
$$ LANGUAGE plpgsql;

View file

@ -1,5 +1,6 @@
use std::env; use std::env;
use std::error::Error; use std::error::Error;
use std::process::abort;
use std::sync::RwLock; use std::sync::RwLock;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -66,7 +67,13 @@ async fn start_job(
async fn schedule_tasks(num_jobs: usize, interval: Duration, pool: Pool<Postgres>) { async fn schedule_tasks(num_jobs: usize, interval: Duration, pool: Pool<Postgres>) {
let mut stream = tokio::time::interval(interval); let mut stream = tokio::time::interval(interval);
for i in 0..num_jobs { for i in 0..num_jobs {
task::spawn(start_job(pool.clone(), i)); let pool = pool.clone();
task::spawn(async move {
if let Err(e) = start_job(pool, i).await {
eprintln!("Failed to start job: {:?}", e);
abort();
}
});
stream.tick().await; stream.tick().await;
} }
} }
@ -77,6 +84,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
let pool = Pool::connect(&env::var("DATABASE_URL")?).await?; let pool = Pool::connect(&env::var("DATABASE_URL")?).await?;
// Make sure the queues are empty
sqlxmq::clear_all(&pool).await?;
let registry = JobRegistry::new(&[example_job]); let registry = JobRegistry::new(&[example_job]);
let _runner = registry let _runner = registry

View file

@ -234,6 +234,27 @@ pub use spawn::*;
pub use sqlxmq_macros::job; pub use sqlxmq_macros::job;
pub use utils::OwnedHandle; pub use utils::OwnedHandle;
/// Helper function to determine if a particular error condition is retryable.
///
/// For best results, database operations should be automatically retried if one
/// of these errors is returned.
pub fn should_retry(error: &sqlx::Error) -> bool {
    // Only database-reported errors carry a SQLSTATE code worth inspecting.
    let db_error = match error.as_database_error() {
        Some(e) => e,
        None => return false,
    };
    match db_error.code().as_deref() {
        // Serialization failure (40001) or deadlock detected (40P01):
        // transient by definition, always safe to retry.
        Some("40001") | Some("40P01") => true,
        // Unique violation (23505): retryable only when it comes from the
        // index that enforces strict ordering of channel messages.
        Some("23505") => {
            db_error.constraint().as_deref()
                == Some("mq_msgs_channel_name_channel_args_after_message_id_idx")
        }
        _ => false,
    }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -369,6 +390,52 @@ mod tests {
pause().await; pause().await;
} }
/// Spawns two jobs on each of the channels "foo", "bar" and "baz", clears
/// "foo" and "baz", and verifies that only the two remaining "bar" jobs run.
#[tokio::test]
async fn it_can_clear_jobs() {
    {
        let pool = &*test_pool().await;
        // Two jobs per channel; the original spelled out six identical
        // builder stanzas — a loop keeps the fixture in one place.
        for &channel in &["foo", "foo", "bar", "bar", "baz", "baz"] {
            JobBuilder::new("foo")
                .set_channel_name(channel)
                .spawn(pool)
                .await
                .unwrap();
        }

        sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap();

        let (_runner, counter) =
            test_job_runner(&pool, |mut job| async move { job.complete().await }).await;

        pause().await;
        // Only the two jobs on the uncleared "bar" channel should have run.
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }
    pause().await;
}
#[tokio::test] #[tokio::test]
async fn it_runs_jobs_in_order() { async fn it_runs_jobs_in_order() {
{ {

View file

@ -149,3 +149,25 @@ pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>(
.await?; .await?;
Ok(()) Ok(())
} }
/// Clear jobs from the specified queues.
pub async fn clear<'b, E: sqlx::Executor<'b, Database = Postgres>>(
    executor: E,
    channel_names: &[&str],
) -> Result<(), sqlx::Error> {
    // Delegates to the `mq_clear` database function, which removes both the
    // queued messages and their payloads for the given channels.
    let query = sqlx::query("SELECT mq_clear($1)").bind(channel_names);
    query.execute(executor).await.map(|_| ())
}
/// Clear jobs from all queues.
///
/// Unlike `clear`, this takes no list of channel names: every queued message
/// (and its payload) is deleted via the `mq_clear_all` database function.
/// (The original doc comment was copy-pasted from `clear` and incorrectly
/// claimed this only cleared "the specified queues".)
pub async fn clear_all<'b, E: sqlx::Executor<'b, Database = Postgres>>(
    executor: E,
) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT mq_clear_all()")
        .execute(executor)
        .await?;
    Ok(())
}