Add function for testing a single job.

This commit is contained in:
Diggory Blake 2022-06-30 17:21:27 +01:00
parent 069cfd75c7
commit bca15c561c
No known key found for this signature in database
GPG key ID: E6BDFA83146ABD40
3 changed files with 56 additions and 4 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "sqlxmq" name = "sqlxmq"
version = "0.4.0" version = "0.4.1"
authors = ["Diggory Blake <diggsey@googlemail.com>"] authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018" edition = "2018"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
@ -23,7 +23,7 @@ uuid = { version = "1.1.2", features = ["v4"] }
log = "0.4.14" log = "0.4.14"
serde_json = "1.0.64" serde_json = "1.0.64"
serde = "1.0.124" serde = "1.0.124"
sqlxmq_macros = { version = "0.4.0", path = "sqlxmq_macros" } sqlxmq_macros = { version = "0.4.1", path = "sqlxmq_macros" }
anymap2 = "0.13.0" anymap2 = "0.13.0"
[features] [features]

View file

@ -1,6 +1,6 @@
[package] [package]
name = "sqlxmq_macros" name = "sqlxmq_macros"
version = "0.4.0" version = "0.4.1"
authors = ["Diggory Blake <diggsey@googlemail.com>"] authors = ["Diggory Blake <diggsey@googlemail.com>"]
edition = "2018" edition = "2018"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"

View file

@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use sqlx::postgres::types::PgInterval; use sqlx::postgres::types::PgInterval;
use sqlx::postgres::PgListener; use sqlx::postgres::PgListener;
use sqlx::{Pool, Postgres}; use sqlx::{Pool, Postgres};
use tokio::sync::Notify; use tokio::sync::{oneshot, Notify};
use tokio::task; use tokio::task;
use uuid::Uuid; use uuid::Uuid;
@ -266,6 +266,58 @@ impl JobRunnerOptions {
listener_task, listener_task,
)))) ))))
} }
/// Run a single job and then return. Intended for use by tests. The job should
/// have been spawned normally and be ready to run.
pub async fn test_one(&self) -> Result<(), sqlx::Error> {
let options = self.clone();
let job_runner = Arc::new(JobRunner {
options,
running_jobs: AtomicUsize::new(0),
notify: Notify::new(),
});
log::info!("Polling for single message");
let mut messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, 1)")
.bind(&self.channel_names)
.fetch_all(&self.pool)
.await?;
assert_eq!(messages.len(), 1, "Expected one message to be ready");
let msg = messages.pop().unwrap();
if let PolledMessage {
id: Some(id),
is_committed: Some(true),
name: Some(name),
payload_json,
payload_bytes,
..
} = msg
{
let (tx, rx) = oneshot::channel::<()>();
let keep_alive = Some(OwnedHandle::new(task::spawn(async move {
let _tx = tx;
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
}
})));
let current_job = CurrentJob {
id,
name,
payload_json,
payload_bytes,
job_runner: job_runner.clone(),
keep_alive,
};
job_runner.running_jobs.fetch_add(1, Ordering::SeqCst);
(self.dispatch)(current_job);
// Wait for job to complete
let _ = rx.await;
}
Ok(())
}
} }
async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> { async fn start_listener(job_runner: Arc<JobRunner>) -> Result<OwnedHandle, sqlx::Error> {