Mirror of https://github.com/Diggsey/sqlxmq.git (synced 2024-06-02 06:12:32 +00:00)
Compare commits
31 commits (SHA1):
387b301656, c1d7ad4a2f, 5e4f160d91, 75e12bac6c, 2fdabd9a98, b332b6d826, 043774fba1, 1aff20c3e5, e1cbd9f551, 46c03f8e99, 9d07f31663, c121e6c997, bca15c561c, 069cfd75c7, 989b11ffe0, 7a13e04b21, 6fde3901a4, 6d3ed6fb99, 5b04ca15f7, 592b7e6cb1, ea408f7ab6, 15cafb3590, 59adc6313f, 262a3e29ff, 4718637891, ab35a58716, 4c52fe9f24, 05d59b0e39, 9424605cfb, 08b71b59ae, 5d287b7247
.github/workflows/publish.yml (23 changes)

@@ -9,23 +9,10 @@ jobs:
     name: Release
     runs-on: ubuntu-latest
    steps:
-      - uses: actions/checkout@v2
-      - uses: actions-rs/toolchain@v1
-        with:
-          profile: minimal
-          toolchain: stable
-          override: true
-      - uses: actions-rs/cargo@v1
-        with:
-          command: login
-          args: -- ${{secrets.CARGO_TOKEN}}
-      - uses: actions-rs/cargo@v1
-        with:
-          command: publish
-          args: --manifest-path sqlxmq_macros/Cargo.toml
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@stable
+      - run: cargo login ${{secrets.CARGO_TOKEN}}
+      - run: cargo publish --manifest-path sqlxmq_macros/Cargo.toml
       - name: Wait for crates.io to update
         run: sleep 30
-      - uses: actions-rs/cargo@v1
-        with:
-          command: publish
-          args: --manifest-path Cargo.toml
+      - run: cargo publish --manifest-path Cargo.toml
.github/workflows/toolchain.yml (91 changes)

@@ -7,47 +7,31 @@ jobs:
     name: Check
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions-rs/toolchain@v1
-        with:
-          profile: minimal
-          toolchain: stable
-          override: true
-      - uses: actions-rs/cargo@v1
-        with:
-          command: check
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@stable
+      - uses: Swatinem/rust-cache@v2
+      - run: cargo check

   fmt:
     name: Rustfmt
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions-rs/toolchain@v1
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@stable
         with:
-          profile: minimal
-          toolchain: stable
-          override: true
-      - run: rustup component add rustfmt
-      - uses: actions-rs/cargo@v1
-        with:
-          command: fmt
-          args: -- --check
+          components: rustfmt
+      - run: cargo fmt -- --check

   clippy:
     name: Clippy
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions-rs/toolchain@v1
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@stable
         with:
-          profile: minimal
-          toolchain: stable
-          override: true
-      - run: rustup component add clippy
-      - uses: actions-rs/cargo@v1
-        with:
-          command: clippy
-          args: -- -D warnings
+          components: clippy
+      - uses: Swatinem/rust-cache@v2
+      - run: cargo clippy --all-targets -- -D warnings

   test:
     name: Test

@@ -55,21 +39,12 @@ jobs:
     env:
       RUST_BACKTRACE: "1"
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions-rs/toolchain@v1
-        with:
-          profile: minimal
-          toolchain: stable
-          override: true
-      - uses: actions-rs/install@v0.1
-        with:
-          crate: sqlx-cli
-          use-tool-cache: true
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@stable
+      - uses: Swatinem/rust-cache@v2
+      - run: cargo install sqlx-cli --locked
       - uses: ./.github/actions/postgres
-      - uses: actions-rs/cargo@v1
-        with:
-          command: test
-          args: -- --nocapture
+      - run: cargo test -- --nocapture

   test_nightly:
     name: Test (Nightly)

@@ -77,18 +52,20 @@ jobs:
     env:
       RUST_BACKTRACE: "1"
     steps:
-      - uses: actions/checkout@v2
-      - uses: actions-rs/toolchain@v1
-        with:
-          profile: minimal
-          toolchain: nightly
-          override: true
-      - uses: actions-rs/install@v0.1
-        with:
-          crate: sqlx-cli
-          use-tool-cache: true
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@nightly
+      - uses: Swatinem/rust-cache@v2
+      - run: cargo install sqlx-cli --locked
       - uses: ./.github/actions/postgres
-      - uses: actions-rs/cargo@v1
-        with:
-          command: test
-          args: -- --nocapture
+      - run: cargo test -- --nocapture
+
+  readme:
+    name: Readme
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: dtolnay/rust-toolchain@stable
+      - uses: Swatinem/rust-cache@v2
+      - run: cargo install cargo-sync-readme --locked
+      - name: Sync readme
+        run: cargo sync-readme -c
.vscode/settings.json (5 changes)

@@ -1,4 +1,7 @@
 {
     "rust-analyzer.checkOnSave.allFeatures": false,
-    "rust-analyzer.cargo.allFeatures": false
+    "rust-analyzer.cargo.allFeatures": false,
+    "rust-analyzer.cargo.features": [
+        "runtime-tokio-native-tls"
+    ]
 }
Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "sqlxmq"
-version = "0.2.2"
+version = "0.5.0"
 authors = ["Diggory Blake <diggsey@googlemail.com>"]
 edition = "2018"
 license = "MIT OR Apache-2.0"

@@ -15,15 +15,15 @@ members = ["sqlxmq_macros", "sqlxmq_stress"]
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-sqlx = { version = "0.5.2", features = ["postgres", "chrono", "uuid"] }
+sqlx = { version = "0.7.1", features = ["postgres", "chrono", "uuid"] }
 tokio = { version = "1.8.3", features = ["full"] }
 dotenv = "0.15.0"
 chrono = "0.4.19"
-uuid = { version = "0.8.2", features = ["v4"] }
+uuid = { version = "1.1.2", features = ["v4"] }
 log = "0.4.14"
 serde_json = "1.0.64"
 serde = "1.0.124"
-sqlxmq_macros = { version = "0.2.2", path = "sqlxmq_macros" }
+sqlxmq_macros = { version = "0.5.0", path = "sqlxmq_macros" }
 anymap2 = "0.13.0"

 [features]

@@ -35,3 +35,4 @@ runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"]
 dotenv = "0.15.0"
 pretty_env_logger = "0.4.0"
 futures = "0.3.13"
+tokio = { version = "1", features = ["full"] }
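The jump from sqlx 0.5 to 0.7 is a breaking one: in sqlx 0.7 a `Transaction` no longer acts as an `Executor` directly, which is why the hunks in src/runner.rs further down switch from `&mut tx` to `&mut *tx`. A minimal sketch of the pattern (the `delete_and_commit` helper is illustrative, not part of the crate):

```rust
use sqlx::{Postgres, Transaction};
use uuid::Uuid;

// Illustrative helper (not from the crate) showing the sqlx 0.7 executor
// pattern: reborrow the transaction as its inner connection with `&mut *tx`.
async fn delete_and_commit(
    mut tx: Transaction<'_, Postgres>,
    id: Uuid,
) -> Result<(), sqlx::Error> {
    sqlx::query("DELETE FROM mq_msgs WHERE id = $1")
        .bind(id)
        .execute(&mut *tx) // `&mut tx` compiled on sqlx 0.5, but not on 0.7
        .await?;
    tx.commit().await
}
```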
README.md (38 changes)

@@ -1,9 +1,11 @@
 # sqlxmq

 [![CI Status](https://github.com/Diggsey/sqlxmq/workflows/CI/badge.svg)](https://github.com/Diggsey/sqlxmq/actions?query=workflow%3ACI)
 [![Documentation](https://docs.rs/sqlxmq/badge.svg)](https://docs.rs/sqlxmq)
 [![crates.io](https://img.shields.io/crates/v/sqlxmq.svg)](https://crates.io/crates/sqlxmq)

+<!-- cargo-sync-readme start -->
+
 # sqlxmq

 A job queue built on `sqlx` and `PostgreSQL`.

 This library allows a CRUD application to run background jobs without complicating its

@@ -122,6 +124,8 @@ to conflict with your own schema.
 The first step is to define a function to be run on the job queue.

 ```rust
+use std::error::Error;
+
 use sqlxmq::{job, CurrentJob};

 // Arguments to the `#[job]` attribute allow setting default job options.

@@ -130,9 +134,9 @@ async fn example_job(
     // The first argument should always be the current job.
     mut current_job: CurrentJob,
     // Additional arguments are optional, but can be used to access context
-    // provided via `JobRegistry::set_context`.
+    // provided via [`JobRegistry::set_context`].
     message: &'static str,
-) -> sqlx::Result<()> {
+) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
     // Decode a JSON payload
     let who: Option<String> = current_job.json()?;

@@ -151,9 +155,12 @@ async fn example_job(
 Next we need to create a job runner: this is what listens for new jobs
 and executes them.

-```rust
+```rust,no_run
+use std::error::Error;
+
 use sqlxmq::JobRegistry;

 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     // You'll need to provide a Postgres connection pool.

@@ -179,6 +186,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

     // The job runner will continue listening and running
     // jobs until `runner` is dropped.
+
     Ok(())
 }
 ```

@@ -190,7 +198,25 @@ The final step is to actually run a job.
 example_job.builder()
     // This is where we can override job configuration
     .set_channel_name("bar")
-    .set_json("John")
+    .set_json("John")?
     .spawn(&pool)
     .await?;
 ```
+
+<!-- cargo-sync-readme end -->
+
+## Note on README
+
+Most of the readme is automatically copied from the crate documentation by [cargo-readme-sync][].
+This way the readme is always in sync with the docs and examples are tested.
+
+So if you find a part of the readme you'd like to change between `<!-- cargo-sync-readme start -->`
+and `<!-- cargo-sync-readme end -->` markers, don't edit `README.md` directly, but rather change
+the documentation on top of `src/lib.rs` and then synchronize the readme with:
+```bash
+cargo sync-readme
+```
+(make sure the cargo command is installed):
+```bash
+cargo install cargo-sync-readme
+```
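Taken together, the updated README snippets compose into a runnable program along these lines (a minimal sketch, assuming a reachable `DATABASE_URL`; note that `set_json` is now fallible and the job function boxes its error):

```rust
use std::error::Error;

use sqlxmq::{job, CurrentJob, JobRegistry};

#[job]
async fn example_job(
    mut current_job: CurrentJob,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    // Decode the JSON payload and mark the job complete.
    let who: Option<String> = current_job.json()?;
    println!("Hello, {}!", who.as_deref().unwrap_or("world"));
    current_job.complete().await?;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;

    // Keep `_runner` alive for as long as jobs should be processed.
    let _runner = JobRegistry::new(&[example_job]).runner(&pool).run().await?;

    // `set_json` returns a Result as of this change, hence the extra `?`.
    example_job.builder().set_json("John")?.spawn(&pool).await?;

    // Give the runner a moment to pick the job up before exiting.
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Ok(())
}
```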
examples/graceful-shutdown.rs (new file, 37 lines)

@@ -0,0 +1,37 @@
+use sqlxmq::{job, CurrentJob, JobRegistry};
+use std::time::Duration;
+use tokio::time;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    dotenv::dotenv().ok();
+    let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?;
+
+    sleep.builder().set_json(&5u64)?.spawn(&db).await?;
+
+    let mut handle = JobRegistry::new(&[sleep]).runner(&db).run().await?;
+
+    // Emulate a stop signal a couple of seconds after running the job
+    time::sleep(Duration::from_secs(2)).await;
+    println!("A stop signal was received");
+
+    // Stop listening for new jobs
+    handle.stop().await;
+
+    // Wait at most 10 seconds for the running jobs to finish
+    handle.wait_jobs_finish(Duration::from_secs(10)).await;
+
+    Ok(())
+}
+
+#[job]
+pub async fn sleep(mut job: CurrentJob) -> sqlx::Result<()> {
+    let second = Duration::from_secs(1);
+    let mut to_sleep: u64 = job.json().unwrap().unwrap();
+    while to_sleep > 0 {
+        println!("job#{} {to_sleep} more seconds to sleep ...", job.id());
+        time::sleep(second).await;
+        to_sleep -= 1;
+    }
+    job.complete().await
+}
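In a real service the stop signal would come from the OS rather than a timer; the same shutdown sequence wired to Ctrl-C might look like this (a sketch reusing `handle` from the example above; tokio's "full" feature set includes signal handling):

```rust
// Sketch: swap the emulated signal for a real one.
tokio::signal::ctrl_c().await?;
println!("Stop signal received");

// Stop listening for new jobs, then give running jobs up to 10s to drain.
handle.stop().await;
handle.wait_jobs_finish(Duration::from_secs(10)).await;
```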
@@ -9,4 +9,4 @@ DROP FUNCTION mq_latest_message;
 DROP TABLE mq_payloads;
 DROP TABLE mq_msgs;
 DROP FUNCTION mq_uuid_exists;
-DROP TYPE mq_new_t;
\ No newline at end of file
+DROP TYPE mq_new_t;
@@ -56,7 +56,7 @@ CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid
 CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;

 -- Index for ensuring strict message order
-CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id);
+CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id);


 -- Large, less frequently updated table of message payloads

@@ -286,3 +286,4 @@ RETURNS VOID AS $$
 WHERE
     id = msg_id;
 $$ LANGUAGE SQL;
+
migrations/20210921115907_clear.down.sql (new file, 2 lines)

@@ -0,0 +1,2 @@
+DROP FUNCTION mq_clear;
+DROP FUNCTION mq_clear_all;

migrations/20210921115907_clear.up.sql (new file, 21 lines)

@@ -0,0 +1,21 @@
+-- Deletes all messages from a list of channel names.
+CREATE FUNCTION mq_clear(channel_names TEXT[])
+RETURNS VOID AS $$
+BEGIN
+    WITH deleted_ids AS (
+        DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) RETURNING id
+    )
+    DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
+END;
+$$ LANGUAGE plpgsql;
+
+-- Deletes all messages.
+CREATE FUNCTION mq_clear_all()
+RETURNS VOID AS $$
+BEGIN
+    WITH deleted_ids AS (
+        DELETE FROM mq_msgs RETURNING id
+    )
+    DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
+END;
+$$ LANGUAGE plpgsql;
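These SQL helpers are what the new crate-level wrappers call; the src/spawn.rs diff further down adds `sqlxmq::clear` and `sqlxmq::clear_all` on top of them. A minimal sketch of invoking the function directly with sqlx (the `clear_channels` helper name is illustrative):

```rust
use sqlx::PgPool;

// Illustrative direct call: `mq_clear` takes a TEXT[] of channel names and
// removes both the messages and their payloads.
async fn clear_channels(pool: &PgPool, channel_names: &[&str]) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT mq_clear($1)")
        .bind(channel_names)
        .execute(pool)
        .await?;
    Ok(())
}
```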
migrations/20211013151757_fix_mq_latest_message.down.sql (new file, 15 lines)

@@ -0,0 +1,15 @@
+CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
+RETURNS UUID AS $$
+    SELECT COALESCE(
+        (
+            SELECT id FROM mq_msgs
+            WHERE channel_name = from_channel_name
+            AND channel_args = from_channel_args
+            AND after_message_id IS NOT NULL
+            AND id != uuid_nil()
+            ORDER BY created_at DESC, id DESC
+            LIMIT 1
+        ),
+        uuid_nil()
+    )
+$$ LANGUAGE SQL STABLE;

migrations/20211013151757_fix_mq_latest_message.up.sql (new file, 19 lines)

@@ -0,0 +1,19 @@
+CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
+RETURNS UUID AS $$
+    SELECT COALESCE(
+        (
+            SELECT id FROM mq_msgs
+            WHERE channel_name = from_channel_name
+            AND channel_args = from_channel_args
+            AND after_message_id IS NOT NULL
+            AND id != uuid_nil()
+            AND NOT EXISTS(
+                SELECT * FROM mq_msgs AS mq_msgs2
+                WHERE mq_msgs2.after_message_id = mq_msgs.id
+            )
+            ORDER BY created_at DESC
+            LIMIT 1
+        ),
+        uuid_nil()
+    )
+$$ LANGUAGE SQL STABLE;
migrations/20220208120856_fix_concurrent_poll.down.sql (new file, 60 lines)

@@ -0,0 +1,60 @@
+-- Main entry-point for job runner: pulls a batch of messages from the queue.
+CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
+RETURNS TABLE(
+    id UUID,
+    is_committed BOOLEAN,
+    name TEXT,
+    payload_json TEXT,
+    payload_bytes BYTEA,
+    retry_backoff INTERVAL,
+    wait_time INTERVAL
+) AS $$
+BEGIN
+    RETURN QUERY UPDATE mq_msgs
+    SET
+        attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
+        attempts = mq_msgs.attempts - 1,
+        retry_backoff = mq_msgs.retry_backoff * 2
+    FROM (
+        SELECT
+            msgs.id
+        FROM mq_active_channels(channel_names, batch_size) AS active_channels
+        INNER JOIN LATERAL (
+            SELECT * FROM mq_msgs
+            WHERE mq_msgs.id != uuid_nil()
+            AND mq_msgs.attempt_at <= NOW()
+            AND mq_msgs.channel_name = active_channels.name
+            AND mq_msgs.channel_args = active_channels.args
+            AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+            ORDER BY mq_msgs.attempt_at ASC
+            LIMIT batch_size
+        ) AS msgs ON TRUE
+        LIMIT batch_size
+    ) AS messages_to_update
+    LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
+    WHERE mq_msgs.id = messages_to_update.id
+    RETURNING
+        mq_msgs.id,
+        mq_msgs.commit_interval IS NULL,
+        mq_payloads.name,
+        mq_payloads.payload_json::TEXT,
+        mq_payloads.payload_bytes,
+        mq_msgs.retry_backoff / 2,
+        interval '0' AS wait_time;
+
+    IF NOT FOUND THEN
+        RETURN QUERY SELECT
+            NULL::UUID,
+            NULL::BOOLEAN,
+            NULL::TEXT,
+            NULL::TEXT,
+            NULL::BYTEA,
+            NULL::INTERVAL,
+            MIN(mq_msgs.attempt_at) - NOW()
+        FROM mq_msgs
+        WHERE mq_msgs.id != uuid_nil()
+        AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+        AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
+    END IF;
+END;
+$$ LANGUAGE plpgsql;

migrations/20220208120856_fix_concurrent_poll.up.sql (new file, 62 lines)

@@ -0,0 +1,62 @@
+
+-- Main entry-point for job runner: pulls a batch of messages from the queue.
+CREATE OR REPLACE FUNCTION mq_poll(channel_names TEXT[], batch_size INT DEFAULT 1)
+RETURNS TABLE(
+    id UUID,
+    is_committed BOOLEAN,
+    name TEXT,
+    payload_json TEXT,
+    payload_bytes BYTEA,
+    retry_backoff INTERVAL,
+    wait_time INTERVAL
+) AS $$
+BEGIN
+    RETURN QUERY UPDATE mq_msgs
+    SET
+        attempt_at = CASE WHEN mq_msgs.attempts = 1 THEN NULL ELSE NOW() + mq_msgs.retry_backoff END,
+        attempts = mq_msgs.attempts - 1,
+        retry_backoff = mq_msgs.retry_backoff * 2
+    FROM (
+        SELECT
+            msgs.id
+        FROM mq_active_channels(channel_names, batch_size) AS active_channels
+        INNER JOIN LATERAL (
+            SELECT mq_msgs.id FROM mq_msgs
+            WHERE mq_msgs.id != uuid_nil()
+            AND mq_msgs.attempt_at <= NOW()
+            AND mq_msgs.channel_name = active_channels.name
+            AND mq_msgs.channel_args = active_channels.args
+            AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+            ORDER BY mq_msgs.attempt_at ASC
+            LIMIT batch_size
+        ) AS msgs ON TRUE
+        LIMIT batch_size
+    ) AS messages_to_update
+    LEFT JOIN mq_payloads ON mq_payloads.id = messages_to_update.id
+    WHERE mq_msgs.id = messages_to_update.id
+    AND mq_msgs.attempt_at <= NOW()
+    RETURNING
+        mq_msgs.id,
+        mq_msgs.commit_interval IS NULL,
+        mq_payloads.name,
+        mq_payloads.payload_json::TEXT,
+        mq_payloads.payload_bytes,
+        mq_msgs.retry_backoff / 2,
+        interval '0' AS wait_time;
+
+    IF NOT FOUND THEN
+        RETURN QUERY SELECT
+            NULL::UUID,
+            NULL::BOOLEAN,
+            NULL::TEXT,
+            NULL::TEXT,
+            NULL::BYTEA,
+            NULL::INTERVAL,
+            MIN(mq_msgs.attempt_at) - NOW()
+        FROM mq_msgs
+        WHERE mq_msgs.id != uuid_nil()
+        AND NOT mq_uuid_exists(mq_msgs.after_message_id)
+        AND (channel_names IS NULL OR mq_msgs.channel_name = ANY(channel_names));
+    END IF;
+END;
+$$ LANGUAGE plpgsql;
@@ -0,0 +1 @@
+-- Add down migration script here
@@ -0,0 +1,29 @@
+CREATE OR REPLACE FUNCTION mq_clear(channel_names TEXT[])
+RETURNS VOID AS $$
+BEGIN
+    WITH deleted_ids AS (
+        DELETE FROM mq_msgs
+        WHERE channel_name = ANY(channel_names)
+        AND id != uuid_nil()
+        RETURNING id
+    )
+    DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
+END;
+$$ LANGUAGE plpgsql;
+COMMENT ON FUNCTION mq_clear IS
+    'Deletes all messages with corresponding payloads from a list of channel names';
+
+
+CREATE OR REPLACE FUNCTION mq_clear_all()
+RETURNS VOID AS $$
+BEGIN
+    WITH deleted_ids AS (
+        DELETE FROM mq_msgs
+        WHERE id != uuid_nil()
+        RETURNING id
+    )
+    DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
+END;
+$$ LANGUAGE plpgsql;
+COMMENT ON FUNCTION mq_clear_all IS
+    'Deletes all messages with corresponding payloads';
sqlxmq_macros/Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "sqlxmq_macros"
-version = "0.2.2"
+version = "0.5.0"
 authors = ["Diggory Blake <diggsey@googlemail.com>"]
 edition = "2018"
 license = "MIT OR Apache-2.0"

@@ -14,5 +14,6 @@ documentation = "https://docs.rs/sqlxmq"
 proc-macro = true

 [dependencies]
-syn = { version = "1.0.64", features = ["full"] }
-quote = "1.0.9"
+syn = { version = "1.0.80", features = ["full"] }
+quote = "1.0.10"
+proc-macro2 = "1.0.30"
sqlxmq_macros/src/lib.rs

@@ -6,10 +6,12 @@
 use std::mem;

 use proc_macro::TokenStream;
-use quote::quote;
+use proc_macro2::TokenStream as TokenStream2;
+use quote::{quote, ToTokens, TokenStreamExt};
 use syn::{
-    parse_macro_input, parse_quote, AttributeArgs, Error, ItemFn, Lit, Meta, NestedMeta, Path,
-    Result, Visibility,
+    parse::{Parse, ParseStream},
+    parse_macro_input, parse_quote, AttrStyle, Attribute, AttributeArgs, Error, Lit, Meta,
+    NestedMeta, Path, Result, Signature, Visibility,
 };

 #[derive(Default)]

@@ -90,6 +92,44 @@ fn interpret_job_arg(options: &mut JobOptions, arg: NestedMeta) -> Result<()> {
     Ok(())
 }

+#[derive(Clone)]
+struct MaybeItemFn {
+    attrs: Vec<Attribute>,
+    vis: Visibility,
+    sig: Signature,
+    block: TokenStream2,
+}
+
+/// This parses a `TokenStream` into a `MaybeItemFn`
+/// (just like `ItemFn`, but skips parsing the body).
+impl Parse for MaybeItemFn {
+    fn parse(input: ParseStream<'_>) -> syn::Result<Self> {
+        let attrs = input.call(syn::Attribute::parse_outer)?;
+        let vis: Visibility = input.parse()?;
+        let sig: Signature = input.parse()?;
+        let block: TokenStream2 = input.parse()?;
+        Ok(Self {
+            attrs,
+            vis,
+            sig,
+            block,
+        })
+    }
+}
+
+impl ToTokens for MaybeItemFn {
+    fn to_tokens(&self, tokens: &mut TokenStream2) {
+        tokens.append_all(
+            self.attrs
+                .iter()
+                .filter(|attr| matches!(attr.style, AttrStyle::Outer)),
+        );
+        self.vis.to_tokens(tokens);
+        self.sig.to_tokens(tokens);
+        self.block.to_tokens(tokens);
+    }
+}
+
 /// Marks a function as being a background job.
 ///
 /// The first argument to the function must have type `CurrentJob`.

@@ -181,7 +221,7 @@
 #[proc_macro_attribute]
 pub fn job(attr: TokenStream, item: TokenStream) -> TokenStream {
     let args = parse_macro_input!(attr as AttributeArgs);
-    let mut inner_fn = parse_macro_input!(item as ItemFn);
+    let mut inner_fn = parse_macro_input!(item as MaybeItemFn);

     let mut options = JobOptions::default();
     let mut errors = Vec::new();

@@ -191,6 +231,11 @@ pub fn job(attr: TokenStream, item: TokenStream) -> TokenStream {
         }
     }

+    let outer_docs = inner_fn
+        .attrs
+        .iter()
+        .filter(|attr| attr.path.is_ident("doc"));
+
     let vis = mem::replace(&mut inner_fn.vis, Visibility::Inherited);
     let name = mem::replace(&mut inner_fn.sig.ident, parse_quote! {inner});
     let fq_name = if let Some(name) = options.name {

@@ -241,6 +286,7 @@ pub fn job(attr: TokenStream, item: TokenStream) -> TokenStream {

     let expanded = quote! {
         #(#errors)*
+        #(#outer_docs)*
         #[allow(non_upper_case_globals)]
         #vis static #name: &'static sqlxmq::NamedJob = &{
             #inner_fn
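The practical effect of switching from `ItemFn` to `MaybeItemFn`, plus the `outer_docs` filter added above, is that the macro no longer parses the function body and that doc comments survive expansion. A small, hypothetical illustration (this job is not part of the diff):

```rust
use sqlxmq::{job, CurrentJob};

/// Doc comments on a job function are now forwarded to the generated
/// `NamedJob` static, so they appear in rustdoc.
#[job]
async fn send_welcome_email(mut job: CurrentJob) -> sqlx::Result<()> {
    // ... do the work, then mark the job as complete ...
    job.complete().await
}
```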
sqlxmq_stress/Cargo.toml

@@ -10,7 +10,7 @@ edition = "2018"
 sqlxmq = { path = ".." }
 tokio = { version = "1.4.0", features = ["full"] }
 dotenv = "0.15.0"
-sqlx = "0.5.1"
+sqlx = "0.6.0"
 serde = "1.0.125"
 lazy_static = "1.4.0"
 futures = "0.3.13"
@@ -9,4 +9,4 @@ DROP FUNCTION mq_latest_message;
 DROP TABLE mq_payloads;
 DROP TABLE mq_msgs;
 DROP FUNCTION mq_uuid_exists;
-DROP TYPE mq_new_t;
\ No newline at end of file
+DROP TYPE mq_new_t;
@@ -56,7 +56,7 @@ CREATE INDEX ON mq_msgs(channel_name, channel_args, attempt_at) WHERE id != uuid
 CREATE INDEX ON mq_msgs(channel_name, channel_args, created_at, id) WHERE id != uuid_nil() AND after_message_id IS NOT NULL;

 -- Index for ensuring strict message order
-CREATE UNIQUE INDEX ON mq_msgs(channel_name, channel_args, after_message_id);
+CREATE UNIQUE INDEX mq_msgs_channel_name_channel_args_after_message_id_idx ON mq_msgs(channel_name, channel_args, after_message_id);


 -- Large, less frequently updated table of message payloads

@@ -286,3 +286,4 @@ RETURNS VOID AS $$
 WHERE
     id = msg_id;
 $$ LANGUAGE SQL;
+
sqlxmq_stress/migrations/20210921115907_clear.down.sql (new file, 2 lines)

@@ -0,0 +1,2 @@
+DROP FUNCTION mq_clear;
+DROP FUNCTION mq_clear_all;

sqlxmq_stress/migrations/20210921115907_clear.up.sql (new file, 21 lines)

@@ -0,0 +1,21 @@
+-- Deletes all messages from a list of channel names.
+CREATE FUNCTION mq_clear(channel_names TEXT[])
+RETURNS VOID AS $$
+BEGIN
+    WITH deleted_ids AS (
+        DELETE FROM mq_msgs WHERE channel_name = ANY(channel_names) RETURNING id
+    )
+    DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
+END;
+$$ LANGUAGE plpgsql;
+
+-- Deletes all messages.
+CREATE FUNCTION mq_clear_all()
+RETURNS VOID AS $$
+BEGIN
+    WITH deleted_ids AS (
+        DELETE FROM mq_msgs RETURNING id
+    )
+    DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
+END;
+$$ LANGUAGE plpgsql;
sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.down.sql (new file, 15 lines)

@@ -0,0 +1,15 @@
+CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
+RETURNS UUID AS $$
+    SELECT COALESCE(
+        (
+            SELECT id FROM mq_msgs
+            WHERE channel_name = from_channel_name
+            AND channel_args = from_channel_args
+            AND after_message_id IS NOT NULL
+            AND id != uuid_nil()
+            ORDER BY created_at DESC, id DESC
+            LIMIT 1
+        ),
+        uuid_nil()
+    )
+$$ LANGUAGE SQL STABLE;

sqlxmq_stress/migrations/20211013151757_fix_mq_latest_message.up.sql (new file, 19 lines)

@@ -0,0 +1,19 @@
+CREATE OR REPLACE FUNCTION mq_latest_message(from_channel_name TEXT, from_channel_args TEXT)
+RETURNS UUID AS $$
+    SELECT COALESCE(
+        (
+            SELECT id FROM mq_msgs
+            WHERE channel_name = from_channel_name
+            AND channel_args = from_channel_args
+            AND after_message_id IS NOT NULL
+            AND id != uuid_nil()
+            AND NOT EXISTS(
+                SELECT * FROM mq_msgs AS mq_msgs2
+                WHERE mq_msgs2.after_message_id = mq_msgs.id
+            )
+            ORDER BY created_at DESC
+            LIMIT 1
+        ),
+        uuid_nil()
+    )
+$$ LANGUAGE SQL STABLE;
sqlxmq_stress/src/main.rs

@@ -1,5 +1,6 @@
 use std::env;
 use std::error::Error;
+use std::process::abort;
 use std::sync::RwLock;
 use std::time::{Duration, Instant};

@@ -66,7 +67,13 @@ async fn start_job(
 async fn schedule_tasks(num_jobs: usize, interval: Duration, pool: Pool<Postgres>) {
     let mut stream = tokio::time::interval(interval);
     for i in 0..num_jobs {
-        task::spawn(start_job(pool.clone(), i));
+        let pool = pool.clone();
+        task::spawn(async move {
+            if let Err(e) = start_job(pool, i).await {
+                eprintln!("Failed to start job: {:?}", e);
+                abort();
+            }
+        });
         stream.tick().await;
     }
 }

@@ -77,6 +84,9 @@ async fn main() -> Result<(), Box<dyn Error>> {

     let pool = Pool::connect(&env::var("DATABASE_URL")?).await?;

+    // Make sure the queues are empty
+    sqlxmq::clear_all(&pool).await?;
+
     let registry = JobRegistry::new(&[example_job]);

     let _runner = registry
src/lib.rs (106 changes)

@@ -234,6 +234,31 @@ pub use spawn::*;
 pub use sqlxmq_macros::job;
 pub use utils::OwnedHandle;

+/// Helper function to determine if a particular error condition is retryable.
+///
+/// For best results, database operations should be automatically retried if one
+/// of these errors is returned.
+pub fn should_retry(error: &sqlx::Error) -> bool {
+    if let Some(db_error) = error.as_database_error() {
+        // It's more readable as a match
+        #[allow(clippy::match_like_matches_macro)]
+        match (db_error.code().as_deref(), db_error.constraint()) {
+            // Foreign key constraint violation on ordered channel
+            (Some("23503"), Some("mq_msgs_after_message_id_fkey")) => true,
+            // Unique constraint violation on ordered channel
+            (Some("23505"), Some("mq_msgs_channel_name_channel_args_after_message_id_idx")) => true,
+            // Serialization failure
+            (Some("40001"), _) => true,
+            // Deadlock detected
+            (Some("40P01"), _) => true,
+            // Other
+            _ => false,
+        }
+    } else {
+        false
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
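A sketch of how a caller might use the new `should_retry` helper to retry a spawn that hits one of these transient errors; the loop shape and `MAX_ATTEMPTS` constant are illustrative, not part of the crate:

```rust
use sqlx::{Pool, Postgres};
use sqlxmq::{should_retry, JobBuilder};

// Illustrative retry wrapper: re-run a spawn while `should_retry`
// classifies the failure as transient.
async fn spawn_with_retry(pool: &Pool<Postgres>) -> Result<(), sqlx::Error> {
    const MAX_ATTEMPTS: u32 = 5; // hypothetical limit
    let mut attempts = 0;
    loop {
        match JobBuilder::new("foo").spawn(pool).await {
            Ok(_) => return Ok(()),
            Err(e) if should_retry(&e) && attempts < MAX_ATTEMPTS => {
                attempts += 1;
            }
            Err(e) => return Err(e),
        }
    }
}
```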
@@ -271,7 +296,7 @@ mod tests {

     let _ = dotenv::dotenv();

-    INIT_LOGGER.call_once(|| pretty_env_logger::init());
+    INIT_LOGGER.call_once(pretty_env_logger::init);

     let pool = Pool::connect(&env::var("DATABASE_URL").unwrap())
         .await

@@ -292,7 +317,7 @@ mod tests {
     async fn test_job_runner<F: Future + Send + 'static>(
         pool: &Pool<Postgres>,
         f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
-    ) -> (OwnedHandle, Arc<AtomicUsize>)
+    ) -> (JobRunnerHandle, Arc<AtomicUsize>)
     where
         F::Output: Send + 'static,
     {

@@ -340,14 +365,26 @@ mod tests {
         Ok(())
     }

-    async fn named_job_runner(pool: &Pool<Postgres>) -> OwnedHandle {
+    async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
         let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
         registry.set_context(42).set_context("Hello, world!");
         registry.runner(pool).run().await.unwrap()
     }

+    fn is_ci() -> bool {
+        std::env::var("CI").ok().is_some()
+    }
+
+    fn default_pause() -> u64 {
+        if is_ci() {
+            1000
+        } else {
+            200
+        }
+    }
+
     async fn pause() {
-        pause_ms(200).await;
+        pause_ms(default_pause()).await;
     }

     async fn pause_ms(ms: u64) {

@@ -359,7 +396,7 @@ mod tests {
     {
         let pool = &*test_pool().await;
         let (_runner, counter) =
-            test_job_runner(&pool, |mut job| async move { job.complete().await }).await;
+            test_job_runner(pool, |mut job| async move { job.complete().await }).await;

         assert_eq!(counter.load(Ordering::SeqCst), 0);
         JobBuilder::new("foo").spawn(pool).await.unwrap();

@@ -369,13 +406,59 @@ mod tests {
         pause().await;
     }

+    #[tokio::test]
+    async fn it_can_clear_jobs() {
+        {
+            let pool = &*test_pool().await;
+            JobBuilder::new("foo")
+                .set_channel_name("foo")
+                .spawn(pool)
+                .await
+                .unwrap();
+            JobBuilder::new("foo")
+                .set_channel_name("foo")
+                .spawn(pool)
+                .await
+                .unwrap();
+            JobBuilder::new("foo")
+                .set_channel_name("bar")
+                .spawn(pool)
+                .await
+                .unwrap();
+            JobBuilder::new("foo")
+                .set_channel_name("bar")
+                .spawn(pool)
+                .await
+                .unwrap();
+            JobBuilder::new("foo")
+                .set_channel_name("baz")
+                .spawn(pool)
+                .await
+                .unwrap();
+            JobBuilder::new("foo")
+                .set_channel_name("baz")
+                .spawn(pool)
+                .await
+                .unwrap();
+
+            sqlxmq::clear(pool, &["foo", "baz"]).await.unwrap();
+
+            let (_runner, counter) =
+                test_job_runner(pool, |mut job| async move { job.complete().await }).await;
+
+            pause().await;
+            assert_eq!(counter.load(Ordering::SeqCst), 2);
+        }
+        pause().await;
+    }
+
     #[tokio::test]
     async fn it_runs_jobs_in_order() {
         {
             let pool = &*test_pool().await;
             let (tx, mut rx) = mpsc::unbounded();

-            let (_runner, counter) = test_job_runner(&pool, move |job| {
+            let (_runner, counter) = test_job_runner(pool, move |job| {
                 let tx = tx.clone();
                 async move {
                     tx.unbounded_send(job).unwrap();

@@ -413,7 +496,7 @@ mod tests {
         let pool = &*test_pool().await;
         let (tx, mut rx) = mpsc::unbounded();

-        let (_runner, counter) = test_job_runner(&pool, move |job| {
+        let (_runner, counter) = test_job_runner(pool, move |job| {
             let tx = tx.clone();
             async move {
                 tx.unbounded_send(job).unwrap();

@@ -440,9 +523,9 @@ mod tests {
     async fn it_retries_failed_jobs() {
         {
             let pool = &*test_pool().await;
-            let (_runner, counter) = test_job_runner(&pool, move |_| async {}).await;
+            let (_runner, counter) = test_job_runner(pool, move |_| async {}).await;

-            let backoff = 500;
+            let backoff = default_pause() + 300;

             assert_eq!(counter.load(Ordering::SeqCst), 0);
             JobBuilder::new("foo")

@@ -477,7 +560,7 @@ mod tests {
     async fn it_can_checkpoint_jobs() {
         {
             let pool = &*test_pool().await;
-            let (_runner, counter) = test_job_runner(&pool, move |mut current_job| async move {
+            let (_runner, counter) = test_job_runner(pool, move |mut current_job| async move {
                 let state: bool = current_job.json().unwrap().unwrap();
                 if state {
                     current_job.complete().await.unwrap();

@@ -490,7 +573,7 @@ mod tests {
             })
             .await;

-            let backoff = 200;
+            let backoff = default_pause();

             assert_eq!(counter.load(Ordering::SeqCst), 0);
             JobBuilder::new("foo")

@@ -508,7 +591,6 @@ mod tests {

             // Second attempt
             pause_ms(backoff).await;
-            pause().await;
             assert_eq!(counter.load(Ordering::SeqCst), 2);

             // No more attempts
src/registry.rs

@@ -4,6 +4,7 @@ use std::error::Error;
 use std::fmt::Display;
 use std::future::Future;
 use std::sync::Arc;
+use std::time::Instant;

 use anymap2::any::CloneAnySendSync;
 use anymap2::Map;

@@ -14,10 +15,13 @@
 use crate::hidden::{BuildFn, RunFn};
 use crate::utils::Opaque;
 use crate::{JobBuilder, JobRunnerOptions};

+type BoxedError = Box<dyn Error + Send + 'static>;
+
 /// Stores a mapping from job name to job. Can be used to construct
 /// a job runner.
 pub struct JobRegistry {
-    error_handler: Arc<dyn Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync>,
+    #[allow(clippy::type_complexity)]
+    error_handler: Arc<dyn Fn(&str, BoxedError) + Send + Sync>,
     job_map: HashMap<&'static str, &'static NamedJob>,
     context: Map<dyn CloneAnySendSync + Send + Sync>,
 }

@@ -52,7 +56,7 @@ impl JobRegistry {
     /// Set a function to be called whenever a job returns an error.
     pub fn set_error_handler(
         &mut self,
-        error_handler: impl Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync + 'static,
+        error_handler: impl Fn(&str, BoxedError) + Send + Sync + 'static,
     ) -> &mut Self {
         self.error_handler = Arc::new(error_handler);
         self

@@ -82,8 +86,8 @@ impl JobRegistry {
 }

 /// The default error handler implementation, which simply logs the error.
-pub fn default_error_handler(name: &str, error: Box<dyn Error + Send + 'static>) {
-    log::error!("Job {} failed: {}", name, error);
+pub fn default_error_handler(name: &str, error: BoxedError) {
+    log::error!("Job `{}` failed: {}", name, error);
 }

 #[doc(hidden)]

@@ -94,8 +98,16 @@ impl JobRegistry {
     ) {
         let error_handler = self.error_handler.clone();
         tokio::spawn(async move {
+            let start_time = Instant::now();
+            log::info!("Job `{}` started.", name);
             if let Err(e) = f.await {
                 error_handler(name, e.into());
+            } else {
+                log::info!(
+                    "Job `{}` completed in {}s.",
+                    name,
+                    start_time.elapsed().as_secs_f64()
+                );
             }
         });
     }
src/runner.rs (100 changes)

@@ -2,13 +2,13 @@ use std::borrow::Cow;
 use std::fmt::Debug;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use serde::{Deserialize, Serialize};
 use sqlx::postgres::types::PgInterval;
 use sqlx::postgres::PgListener;
 use sqlx::{Pool, Postgres};
-use tokio::sync::Notify;
+use tokio::sync::{oneshot, Notify};
 use tokio::task;
 use uuid::Uuid;

@@ -32,6 +32,12 @@ struct JobRunner {
     notify: Notify,
 }

+/// Job runner handle
+pub struct JobRunnerHandle {
+    runner: Arc<JobRunner>,
+    handle: Option<OwnedHandle>,
+}
+
 /// Type used to checkpoint a running job.
 #[derive(Debug, Clone, Default)]
 pub struct Checkpoint<'a> {

@@ -136,7 +142,7 @@ impl CurrentJob {
         &mut self,
         mut tx: sqlx::Transaction<'_, Postgres>,
     ) -> Result<(), sqlx::Error> {
-        self.delete(&mut tx).await?;
+        self.delete(&mut *tx).await?;
         tx.commit().await?;
         self.stop_keep_alive().await;
         Ok(())

@@ -155,7 +161,7 @@ impl CurrentJob {
         mut tx: sqlx::Transaction<'_, Postgres>,
         checkpoint: &Checkpoint<'_>,
     ) -> Result<(), sqlx::Error> {
-        checkpoint.execute(self.id, &mut tx).await?;
+        checkpoint.execute(self.id, &mut *tx).await?;
         tx.commit().await?;
         Ok(())
     }

@@ -253,7 +259,7 @@ impl JobRunnerOptions {

     /// Start the job runner in the background. The job runner will stop when the
     /// returned handle is dropped.
-    pub async fn run(&self) -> Result<OwnedHandle, sqlx::Error> {
+    pub async fn run(&self) -> Result<JobRunnerHandle, sqlx::Error> {
         let options = self.clone();
         let job_runner = Arc::new(JobRunner {
             options,

@@ -261,10 +267,86 @@ impl JobRunnerOptions {
             notify: Notify::new(),
         });
         let listener_task = start_listener(job_runner.clone()).await?;
-        Ok(OwnedHandle::new(task::spawn(main_loop(
-            job_runner,
-            listener_task,
-        ))))
+        let handle = OwnedHandle::new(task::spawn(main_loop(job_runner.clone(), listener_task)));
+        Ok(JobRunnerHandle {
+            runner: job_runner,
+            handle: Some(handle),
+        })
     }

+    /// Run a single job and then return. Intended for use by tests. The job should
+    /// have been spawned normally and be ready to run.
+    pub async fn test_one(&self) -> Result<(), sqlx::Error> {
+        let options = self.clone();
+        let job_runner = Arc::new(JobRunner {
+            options,
+            running_jobs: AtomicUsize::new(0),
+            notify: Notify::new(),
+        });
+
+        log::info!("Polling for single message");
+        let mut messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, 1)")
+            .bind(&self.channel_names)
+            .fetch_all(&self.pool)
+            .await?;
+
+        assert_eq!(messages.len(), 1, "Expected one message to be ready");
+        let msg = messages.pop().unwrap();
+
+        if let PolledMessage {
+            id: Some(id),
+            is_committed: Some(true),
+            name: Some(name),
+            payload_json,
+            payload_bytes,
+            ..
+        } = msg
+        {
+            let (tx, rx) = oneshot::channel::<()>();
+            let keep_alive = Some(OwnedHandle::new(task::spawn(async move {
+                let _tx = tx;
+                loop {
+                    tokio::time::sleep(Duration::from_secs(1)).await;
+                }
+            })));
+            let current_job = CurrentJob {
+                id,
+                name,
+                payload_json,
+                payload_bytes,
+                job_runner: job_runner.clone(),
+                keep_alive,
+            };
+            job_runner.running_jobs.fetch_add(1, Ordering::SeqCst);
+            (self.dispatch)(current_job);
+
+            // Wait for job to complete
+            let _ = rx.await;
+        }
+        Ok(())
+    }
 }
+
+impl JobRunnerHandle {
+    /// Return the number of still running jobs
+    pub fn num_running_jobs(&self) -> usize {
+        self.runner.running_jobs.load(Ordering::Relaxed)
+    }
+
+    /// Wait for the jobs to finish, but not more than `timeout`
+    pub async fn wait_jobs_finish(&self, timeout: Duration) {
+        let start = Instant::now();
+        let step = Duration::from_millis(10);
+        while self.num_running_jobs() > 0 && start.elapsed() < timeout {
+            tokio::time::sleep(step).await;
+        }
+    }
+
+    /// Stop the inner task and wait for it to finish.
+    pub async fn stop(&mut self) {
+        if let Some(handle) = self.handle.take() {
+            handle.stop().await
+        }
+    }
+}
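A sketch of how the new `test_one` helper might be used, assuming a job was already spawned (`example_job` here stands in for any registered `#[job]` function defined elsewhere):

```rust
use sqlxmq::JobRegistry;

// Illustrative: run exactly one already-spawned job to completion without
// starting the background listener.
async fn run_one_job(pool: &sqlx::PgPool) -> Result<(), sqlx::Error> {
    JobRegistry::new(&[example_job])
        .runner(pool)
        .test_one()
        .await
}
```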
src/spawn.rs (34 changes)

@@ -149,3 +149,37 @@ pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>(
         .await?;
     Ok(())
 }
+
+/// Clear jobs from the specified channels.
+pub async fn clear<'b, E: sqlx::Executor<'b, Database = Postgres>>(
+    executor: E,
+    channel_names: &[&str],
+) -> Result<(), sqlx::Error> {
+    sqlx::query("SELECT mq_clear($1)")
+        .bind(channel_names)
+        .execute(executor)
+        .await?;
+    Ok(())
+}
+
+/// Clear jobs from all channels.
+pub async fn clear_all<'b, E: sqlx::Executor<'b, Database = Postgres>>(
+    executor: E,
+) -> Result<(), sqlx::Error> {
+    sqlx::query("SELECT mq_clear_all()")
+        .execute(executor)
+        .await?;
+    Ok(())
+}
+
+/// Check if a job with that ID exists
+pub async fn exists<'b, E: sqlx::Executor<'b, Database = Postgres>>(
+    executor: E,
+    id: Uuid,
+) -> Result<bool, sqlx::Error> {
+    let exists = sqlx::query_scalar("SELECT EXISTS(SELECT id FROM mq_msgs WHERE id = $1)")
+        .bind(id)
+        .fetch_one(executor)
+        .await?;
+    Ok(exists)
+}
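A hedged round-trip sketch of the new helpers (the `demo` function name and the channel name are illustrative):

```rust
use sqlx::PgPool;
use sqlxmq::{clear, exists, JobBuilder};

// Illustrative: spawn a job on a known channel, confirm it exists, then
// clear the channel and confirm it is gone.
async fn demo(pool: &PgPool) -> Result<(), sqlx::Error> {
    let id = JobBuilder::new("foo")
        .set_channel_name("foo")
        .spawn(pool)
        .await?;
    assert!(exists(pool, id).await?);

    // Removes the messages and their payloads for the "foo" channel.
    clear(pool, &["foo"]).await?;
    assert!(!exists(pool, id).await?);
    Ok(())
}
```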