From bca15c561cd70af9187f36df075d34234fc5024e Mon Sep 17 00:00:00 2001 From: Diggory Blake Date: Thu, 30 Jun 2022 17:21:27 +0100 Subject: [PATCH] Add function for testing a single job. --- Cargo.toml | 4 +-- sqlxmq_macros/Cargo.toml | 2 +- src/runner.rs | 54 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 17917f5..d9d40a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlxmq" -version = "0.4.0" +version = "0.4.1" authors = ["Diggory Blake "] edition = "2018" license = "MIT OR Apache-2.0" @@ -23,7 +23,7 @@ uuid = { version = "1.1.2", features = ["v4"] } log = "0.4.14" serde_json = "1.0.64" serde = "1.0.124" -sqlxmq_macros = { version = "0.4.0", path = "sqlxmq_macros" } +sqlxmq_macros = { version = "0.4.1", path = "sqlxmq_macros" } anymap2 = "0.13.0" [features] diff --git a/sqlxmq_macros/Cargo.toml b/sqlxmq_macros/Cargo.toml index 28e5be3..92059fa 100644 --- a/sqlxmq_macros/Cargo.toml +++ b/sqlxmq_macros/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sqlxmq_macros" -version = "0.4.0" +version = "0.4.1" authors = ["Diggory Blake "] edition = "2018" license = "MIT OR Apache-2.0" diff --git a/src/runner.rs b/src/runner.rs index 1442b84..a7cc953 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use sqlx::postgres::types::PgInterval; use sqlx::postgres::PgListener; use sqlx::{Pool, Postgres}; -use tokio::sync::Notify; +use tokio::sync::{oneshot, Notify}; use tokio::task; use uuid::Uuid; @@ -266,6 +266,58 @@ impl JobRunnerOptions { listener_task, )))) } + + /// Run a single job and then return. Intended for use by tests. The job should + /// have been spawned normally and be ready to run. + pub async fn test_one(&self) -> Result<(), sqlx::Error> { + let options = self.clone(); + let job_runner = Arc::new(JobRunner { + options, + running_jobs: AtomicUsize::new(0), + notify: Notify::new(), + }); + + log::info!("Polling for single message"); + let mut messages = sqlx::query_as::<_, PolledMessage>("SELECT * FROM mq_poll($1, 1)") + .bind(&self.channel_names) + .fetch_all(&self.pool) + .await?; + + assert_eq!(messages.len(), 1, "Expected one message to be ready"); + let msg = messages.pop().unwrap(); + + if let PolledMessage { + id: Some(id), + is_committed: Some(true), + name: Some(name), + payload_json, + payload_bytes, + .. + } = msg + { + let (tx, rx) = oneshot::channel::<()>(); + let keep_alive = Some(OwnedHandle::new(task::spawn(async move { + let _tx = tx; + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + } + }))); + let current_job = CurrentJob { + id, + name, + payload_json, + payload_bytes, + job_runner: job_runner.clone(), + keep_alive, + }; + job_runner.running_jobs.fetch_add(1, Ordering::SeqCst); + (self.dispatch)(current_job); + + // Wait for job to complete + let _ = rx.await; + } + Ok(()) + } } async fn start_listener(job_runner: Arc) -> Result {