Compare commits

...

12 commits

Author SHA1 Message Date
Diggory Blake 387b301656
Fix CI (#49) 2023-10-15 16:06:13 +01:00
Diggory Blake c1d7ad4a2f
Bump version and fix clippy warning 2023-10-11 23:37:28 +01:00
Incomplete 5e4f160d91
Upgrade sqlx to 0.7.1 (#48) 2023-10-11 19:37:25 +01:00
Imbolc 75e12bac6c
Wait for running jobs to finish (#40)
Wait for running jobs to finish
2022-07-16 22:47:57 +01:00
Imbolc 2fdabd9a98
Fixes #41 mq_clear_ must keep the nil message (#43) 2022-07-13 15:11:37 +01:00
Imbolc b332b6d826
Ci tests (#42)
* Increase pause during test runs in CI
2022-07-13 14:59:27 +01:00
Diggory Blake 043774fba1
Merge pull request #35 from imbolc/master
Sync readme
2022-07-06 18:04:21 +01:00
imbolc 1aff20c3e5 Sync readme 2022-07-06 19:16:38 +03:00
Diggory Blake e1cbd9f551
Merge pull request #33 from imbolc/patch-3
Fix a few docstrings
2022-07-05 16:10:10 +01:00
Diggory Blake 46c03f8e99
Merge pull request #32 from imbolc/patch-2
Readme example forgotten unwrap
2022-07-05 16:09:41 +01:00
Imbolc 9d07f31663
Fix a few docstrings 2022-07-05 18:04:42 +03:00
Imbolc c121e6c997
Readme example forgotten unwrap 2022-07-05 12:46:35 +03:00
13 changed files with 210 additions and 105 deletions

View file

@@ -9,23 +9,10 @@ jobs:
name: Release
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: login
args: -- ${{secrets.CARGO_TOKEN}}
- uses: actions-rs/cargo@v1
with:
command: publish
args: --manifest-path sqlxmq_macros/Cargo.toml
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- run: cargo login ${{secrets.CARGO_TOKEN}}
- run: cargo publish --manifest-path sqlxmq_macros/Cargo.toml
- name: Wait for crates.io to update
run: sleep 30
- uses: actions-rs/cargo@v1
with:
command: publish
args: --manifest-path Cargo.toml
- run: cargo publish --manifest-path Cargo.toml

View file

@@ -7,47 +7,31 @@ jobs:
name: Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/cargo@v1
with:
command: check
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- run: cargo check
fmt:
name: Rustfmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
profile: minimal
toolchain: stable
override: true
- run: rustup component add rustfmt
- uses: actions-rs/cargo@v1
with:
command: fmt
args: -- --check
components: rustfmt
- run: cargo fmt -- --check
clippy:
name: Clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
with:
profile: minimal
toolchain: stable
override: true
- run: rustup component add clippy
- uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets -- -D warnings
components: clippy
- uses: Swatinem/rust-cache@v2
- run: cargo clippy --all-targets -- -D warnings
test:
name: Test
@@ -55,21 +39,12 @@ jobs:
env:
RUST_BACKTRACE: "1"
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- uses: actions-rs/install@v0.1
with:
crate: sqlx-cli
use-tool-cache: true
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- run: cargo install sqlx-cli --locked
- uses: ./.github/actions/postgres
- uses: actions-rs/cargo@v1
with:
command: test
args: -- --nocapture
- run: cargo test -- --nocapture
test_nightly:
name: Test (Nightly)
@@ -77,18 +52,20 @@ jobs:
env:
RUST_BACKTRACE: "1"
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly
override: true
- uses: actions-rs/install@v0.1
with:
crate: sqlx-cli
use-tool-cache: true
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@nightly
- uses: Swatinem/rust-cache@v2
- run: cargo install sqlx-cli --locked
- uses: ./.github/actions/postgres
- uses: actions-rs/cargo@v1
with:
command: test
args: -- --nocapture
- run: cargo test -- --nocapture
readme:
name: Readme
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- run: cargo install cargo-sync-readme --locked
- name: Sync readme
run: cargo sync-readme -c

View file

@@ -1,4 +1,7 @@
{
"rust-analyzer.checkOnSave.allFeatures": false,
"rust-analyzer.cargo.allFeatures": false
"rust-analyzer.cargo.allFeatures": false,
"rust-analyzer.cargo.features": [
"runtime-tokio-native-tls"
]
}

View file

@@ -1,6 +1,6 @@
[package]
name = "sqlxmq"
version = "0.4.1"
version = "0.5.0"
authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018"
license = "MIT OR Apache-2.0"
@@ -15,7 +15,7 @@ members = ["sqlxmq_macros", "sqlxmq_stress"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
sqlx = { version = "0.6.0", features = ["postgres", "chrono", "uuid"] }
sqlx = { version = "0.7.1", features = ["postgres", "chrono", "uuid"] }
tokio = { version = "1.8.3", features = ["full"] }
dotenv = "0.15.0"
chrono = "0.4.19"
@@ -23,7 +23,7 @@ uuid = { version = "1.1.2", features = ["v4"] }
log = "0.4.14"
serde_json = "1.0.64"
serde = "1.0.124"
sqlxmq_macros = { version = "0.4.1", path = "sqlxmq_macros" }
sqlxmq_macros = { version = "0.5.0", path = "sqlxmq_macros" }
anymap2 = "0.13.0"
[features]
@@ -35,3 +35,4 @@ runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"]
dotenv = "0.15.0"
pretty_env_logger = "0.4.0"
futures = "0.3.13"
tokio = { version = "1", features = ["full"] }

View file

@@ -1,9 +1,11 @@
# sqlxmq
[![CI Status](https://github.com/Diggsey/sqlxmq/workflows/CI/badge.svg)](https://github.com/Diggsey/sqlxmq/actions?query=workflow%3ACI)
[![Documentation](https://docs.rs/sqlxmq/badge.svg)](https://docs.rs/sqlxmq)
[![crates.io](https://img.shields.io/crates/v/sqlxmq.svg)](https://crates.io/crates/sqlxmq)
<!-- cargo-sync-readme start -->
# sqlxmq
A job queue built on `sqlx` and `PostgreSQL`.
This library allows a CRUD application to run background jobs without complicating its
@@ -122,6 +124,8 @@ to conflict with your own schema.
The first step is to define a function to be run on the job queue.
```rust
use std::error::Error;
use sqlxmq::{job, CurrentJob};
// Arguments to the `#[job]` attribute allow setting default job options.
@@ -130,9 +134,9 @@ async fn example_job(
// The first argument should always be the current job.
mut current_job: CurrentJob,
// Additional arguments are optional, but can be used to access context
// provided via `JobRegistry::set_context`.
// provided via [`JobRegistry::set_context`].
message: &'static str,
) -> sqlx::Result<()> {
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
// Decode a JSON payload
let who: Option<String> = current_job.json()?;
@@ -151,9 +155,12 @@ async fn example_job(
Next we need to create a job runner: this is what listens for new jobs
and executes them.
```rust
```rust,no_run
use std::error::Error;
use sqlxmq::JobRegistry;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// You'll need to provide a Postgres connection pool.
@@ -179,6 +186,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// The job runner will continue listening and running
// jobs until `runner` is dropped.
Ok(())
}
```
@@ -190,7 +198,25 @@ The final step is to actually run a job.
example_job.builder()
// This is where we can override job configuration
.set_channel_name("bar")
.set_json("John")
.set_json("John")?
.spawn(&pool)
.await?;
```
<!-- cargo-sync-readme end -->
## Note on README
Most of the readme is automatically copied from the crate documentation by [cargo-readme-sync][].
This way the readme is always in sync with the docs and examples are tested.
So if you find a part of the readme you'd like to change between `<!-- cargo-sync-readme start -->`
and `<!-- cargo-sync-readme end -->` markers, don't edit `README.md` directly, but rather change
the documentation on top of `src/lib.rs` and then synchronize the readme with:
```bash
cargo sync-readme
```
(make sure the cargo command is installed):
```bash
cargo install cargo-sync-readme

View file

@@ -0,0 +1,37 @@
use sqlxmq::{job, CurrentJob, JobRegistry};
use std::time::Duration;
use tokio::time;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenv::dotenv().ok();
let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?;
sleep.builder().set_json(&5u64)?.spawn(&db).await?;
let mut handle = JobRegistry::new(&[sleep]).runner(&db).run().await?;
// Let's emulate a stop signal in a couple of seconds after running the job
time::sleep(Duration::from_secs(2)).await;
println!("A stop signal received");
// Stop listening for new jobs
handle.stop().await;
// Wait for the running jobs to stop for maximum 10 seconds
handle.wait_jobs_finish(Duration::from_secs(10)).await;
Ok(())
}
#[job]
pub async fn sleep(mut job: CurrentJob) -> sqlx::Result<()> {
let second = Duration::from_secs(1);
let mut to_sleep: u64 = job.json().unwrap().unwrap();
while to_sleep > 0 {
println!("job#{} {to_sleep} more seconds to sleep ...", job.id());
time::sleep(second).await;
to_sleep -= 1;
}
job.complete().await
}

View file

@@ -0,0 +1 @@
-- Add down migration script here

View file

@@ -0,0 +1,29 @@
CREATE OR REPLACE FUNCTION mq_clear(channel_names TEXT[])
RETURNS VOID AS $$
BEGIN
WITH deleted_ids AS (
DELETE FROM mq_msgs
WHERE channel_name = ANY(channel_names)
AND id != uuid_nil()
RETURNING id
)
DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION mq_clear IS
'Deletes all messages with corresponding payloads from a list of channel names';
CREATE OR REPLACE FUNCTION mq_clear_all()
RETURNS VOID AS $$
BEGIN
WITH deleted_ids AS (
DELETE FROM mq_msgs
WHERE id != uuid_nil()
RETURNING id
)
DELETE FROM mq_payloads WHERE id IN (SELECT id FROM deleted_ids);
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION mq_clear_all IS
'Deletes all messages with corresponding payloads';

View file

@@ -1,6 +1,6 @@
[package]
name = "sqlxmq_macros"
version = "0.4.1"
version = "0.5.0"
authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018"
license = "MIT OR Apache-2.0"

View file

@@ -317,7 +317,7 @@ mod tests {
async fn test_job_runner<F: Future + Send + 'static>(
pool: &Pool<Postgres>,
f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static,
) -> (OwnedHandle, Arc<AtomicUsize>)
) -> (JobRunnerHandle, Arc<AtomicUsize>)
where
F::Output: Send + 'static,
{
@@ -365,14 +365,26 @@
Ok(())
}
async fn named_job_runner(pool: &Pool<Postgres>) -> OwnedHandle {
async fn named_job_runner(pool: &Pool<Postgres>) -> JobRunnerHandle {
let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]);
registry.set_context(42).set_context("Hello, world!");
registry.runner(pool).run().await.unwrap()
}
fn is_ci() -> bool {
std::env::var("CI").ok().is_some()
}
fn default_pause() -> u64 {
if is_ci() {
1000
} else {
200
}
}
async fn pause() {
pause_ms(200).await;
pause_ms(default_pause()).await;
}
async fn pause_ms(ms: u64) {
@@ -513,7 +525,7 @@ mod tests {
let pool = &*test_pool().await;
let (_runner, counter) = test_job_runner(pool, move |_| async {}).await;
let backoff = 500;
let backoff = default_pause() + 300;
assert_eq!(counter.load(Ordering::SeqCst), 0);
JobBuilder::new("foo")
@@ -561,7 +573,7 @@ mod tests {
})
.await;
let backoff = 200;
let backoff = default_pause();
assert_eq!(counter.load(Ordering::SeqCst), 0);
JobBuilder::new("foo")
@@ -579,7 +591,6 @@ mod tests {
// Second attempt
pause_ms(backoff).await;
pause().await;
assert_eq!(counter.load(Ordering::SeqCst), 2);
// No more attempts

View file

@@ -15,10 +15,13 @@ use crate::hidden::{BuildFn, RunFn};
use crate::utils::Opaque;
use crate::{JobBuilder, JobRunnerOptions};
type BoxedError = Box<dyn Error + Send + 'static>;
/// Stores a mapping from job name to job. Can be used to construct
/// a job runner.
pub struct JobRegistry {
error_handler: Arc<dyn Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync>,
#[allow(clippy::type_complexity)]
error_handler: Arc<dyn Fn(&str, BoxedError) + Send + Sync>,
job_map: HashMap<&'static str, &'static NamedJob>,
context: Map<dyn CloneAnySendSync + Send + Sync>,
}
@@ -53,7 +56,7 @@ impl JobRegistry {
/// Set a function to be called whenever a job returns an error.
pub fn set_error_handler(
&mut self,
error_handler: impl Fn(&str, Box<dyn Error + Send + 'static>) + Send + Sync + 'static,
error_handler: impl Fn(&str, BoxedError) + Send + Sync + 'static,
) -> &mut Self {
self.error_handler = Arc::new(error_handler);
self
@@ -83,7 +86,7 @@ impl JobRegistry {
}
/// The default error handler implementation, which simply logs the error.
pub fn default_error_handler(name: &str, error: Box<dyn Error + Send + 'static>) {
pub fn default_error_handler(name: &str, error: BoxedError) {
log::error!("Job `{}` failed: {}", name, error);
}

View file

@@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use sqlx::postgres::types::PgInterval;
@@ -32,6 +32,12 @@ struct JobRunner {
notify: Notify,
}
/// Job runner handle
pub struct JobRunnerHandle {
runner: Arc<JobRunner>,
handle: Option<OwnedHandle>,
}
/// Type used to checkpoint a running job.
#[derive(Debug, Clone, Default)]
pub struct Checkpoint<'a> {
@@ -136,7 +142,7 @@ impl CurrentJob {
&mut self,
mut tx: sqlx::Transaction<'_, Postgres>,
) -> Result<(), sqlx::Error> {
self.delete(&mut tx).await?;
self.delete(&mut *tx).await?;
tx.commit().await?;
self.stop_keep_alive().await;
Ok(())
@@ -155,7 +161,7 @@ impl CurrentJob {
mut tx: sqlx::Transaction<'_, Postgres>,
checkpoint: &Checkpoint<'_>,
) -> Result<(), sqlx::Error> {
checkpoint.execute(self.id, &mut tx).await?;
checkpoint.execute(self.id, &mut *tx).await?;
tx.commit().await?;
Ok(())
}
@@ -253,7 +259,7 @@ impl JobRunnerOptions {
/// Start the job runner in the background. The job runner will stop when the
/// returned handle is dropped.
pub async fn run(&self) -> Result<OwnedHandle, sqlx::Error> {
pub async fn run(&self) -> Result<JobRunnerHandle, sqlx::Error> {
let options = self.clone();
let job_runner = Arc::new(JobRunner {
options,
@@ -261,10 +267,11 @@ impl JobRunnerOptions {
notify: Notify::new(),
});
let listener_task = start_listener(job_runner.clone()).await?;
Ok(OwnedHandle::new(task::spawn(main_loop(
job_runner,
listener_task,
))))
let handle = OwnedHandle::new(task::spawn(main_loop(job_runner.clone(), listener_task)));
Ok(JobRunnerHandle {
runner: job_runner,
handle: Some(handle),
})
}
/// Run a single job and then return. Intended for use by tests. The job should
@@ -320,6 +327,29 @@ impl JobRunnerOptions {
}
}
impl JobRunnerHandle {
/// Return the number of still running jobs
pub fn num_running_jobs(&self) -> usize {
self.runner.running_jobs.load(Ordering::Relaxed)
}
/// Wait for the jobs to finish, but not more than `timeout`
pub async fn wait_jobs_finish(&self, timeout: Duration) {
let start = Instant::now();
let step = Duration::from_millis(10);
while self.num_running_jobs() > 0 && start.elapsed() < timeout {
tokio::time::sleep(step).await;
}
}
/// Stop the inner task and wait for it to finish.
pub async fn stop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.stop().await
}
}
}
async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> {
let mut listener = PgListener::connect_with(&job_runner.options.pool).await?;
if let Some(channels) = &job_runner.options.channel_names {

View file

@@ -150,7 +150,7 @@ pub async fn commit<'b, E: sqlx::Executor<'b, Database = Postgres>>(
Ok(())
}
/// Clear jobs from the specified queues.
/// Clear jobs from the specified channels.
pub async fn clear<'b, E: sqlx::Executor<'b, Database = Postgres>>(
executor: E,
channel_names: &[&str],
@@ -162,7 +162,7 @@ pub async fn clear<'b, E: sqlx::Executor<'b, Database = Postgres>>(
Ok(())
}
/// Clear jobs from the specified queues.
/// Clear jobs from all channels.
pub async fn clear_all<'b, E: sqlx::Executor<'b, Database = Postgres>>(
executor: E,
) -> Result<(), sqlx::Error> {