From 75e12bac6cab666b754b32cada95d766df5b53f7 Mon Sep 17 00:00:00 2001 From: Imbolc Date: Sun, 17 Jul 2022 00:47:57 +0300 Subject: [PATCH] Wait for running jobs to finish (#40) Wait for running jobs to finish --- Cargo.toml | 1 + examples/graceful-shutdown.rs | 37 ++++++++++++++++++++++++++++++ src/lib.rs | 4 ++-- src/runner.rs | 42 ++++++++++++++++++++++++++++++----- 4 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 examples/graceful-shutdown.rs diff --git a/Cargo.toml b/Cargo.toml index d9d40a7..855f732 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,4 @@ runtime-tokio-rustls = ["sqlx/runtime-tokio-rustls"] dotenv = "0.15.0" pretty_env_logger = "0.4.0" futures = "0.3.13" +tokio = { version = "1", features = ["full"] } diff --git a/examples/graceful-shutdown.rs b/examples/graceful-shutdown.rs new file mode 100644 index 0000000..1af237e --- /dev/null +++ b/examples/graceful-shutdown.rs @@ -0,0 +1,37 @@ +use sqlxmq::{job, CurrentJob, JobRegistry}; +use std::time::Duration; +use tokio::time; + +#[tokio::main] +async fn main() -> Result<(), Box> { + dotenv::dotenv().ok(); + let db = sqlx::PgPool::connect(&std::env::var("DATABASE_URL").unwrap()).await?; + + sleep.builder().set_json(&5u64)?.spawn(&db).await?; + + let mut handle = JobRegistry::new(&[sleep]).runner(&db).run().await?; + + // Let's emulate a stop signal in a couple of seconts after running the job + time::sleep(Duration::from_secs(2)).await; + println!("A stop signal received"); + + // Stop listening for new jobs + handle.stop().await; + + // Wait for the running jobs to stop for maximum 10 seconds + handle.wait_jobs_finish(Duration::from_secs(10)).await; + + Ok(()) +} + +#[job] +pub async fn sleep(mut job: CurrentJob) -> sqlx::Result<()> { + let second = Duration::from_secs(1); + let mut to_sleep: u64 = job.json().unwrap().unwrap(); + while to_sleep > 0 { + println!("job#{} {to_sleep} more seconds to sleep ...", job.id()); + time::sleep(second).await; + to_sleep -= 1; + } + job.complete().await +} diff --git a/src/lib.rs b/src/lib.rs index 1e79370..3423821 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -317,7 +317,7 @@ mod tests { async fn test_job_runner( pool: &Pool, f: impl (Fn(CurrentJob) -> F) + Send + Sync + 'static, - ) -> (OwnedHandle, Arc) + ) -> (JobRunnerHandle, Arc) where F::Output: Send + 'static, { @@ -365,7 +365,7 @@ mod tests { Ok(()) } - async fn named_job_runner(pool: &Pool) -> OwnedHandle { + async fn named_job_runner(pool: &Pool) -> JobRunnerHandle { let mut registry = JobRegistry::new(&[example_job1, example_job2, example_job_with_ctx]); registry.set_context(42).set_context("Hello, world!"); registry.runner(pool).run().await.unwrap() diff --git a/src/runner.rs b/src/runner.rs index a7cc953..fcd27ab 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -2,7 +2,7 @@ use std::borrow::Cow; use std::fmt::Debug; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; use sqlx::postgres::types::PgInterval; @@ -32,6 +32,12 @@ struct JobRunner { notify: Notify, } +/// Job runner handle +pub struct JobRunnerHandle { + runner: Arc, + handle: Option, +} + /// Type used to checkpoint a running job. #[derive(Debug, Clone, Default)] pub struct Checkpoint<'a> { @@ -253,7 +259,7 @@ impl JobRunnerOptions { /// Start the job runner in the background. The job runner will stop when the /// returned handle is dropped. - pub async fn run(&self) -> Result { + pub async fn run(&self) -> Result { let options = self.clone(); let job_runner = Arc::new(JobRunner { options, @@ -261,10 +267,11 @@ impl JobRunnerOptions { notify: Notify::new(), }); let listener_task = start_listener(job_runner.clone()).await?; - Ok(OwnedHandle::new(task::spawn(main_loop( - job_runner, - listener_task, - )))) + let handle = OwnedHandle::new(task::spawn(main_loop(job_runner.clone(), listener_task))); + Ok(JobRunnerHandle { + runner: job_runner, + handle: Some(handle), + }) } /// Run a single job and then return. Intended for use by tests. The job should @@ -320,6 +327,29 @@ impl JobRunnerOptions { } } +impl JobRunnerHandle { + /// Return the number of still running jobs + pub fn num_running_jobs(&self) -> usize { + self.runner.running_jobs.load(Ordering::Relaxed) + } + + /// Wait for the jobs to finish, but not more than `timeout` + pub async fn wait_jobs_finish(&self, timeout: Duration) { + let start = Instant::now(); + let step = Duration::from_millis(10); + while self.num_running_jobs() > 0 && start.elapsed() < timeout { + tokio::time::sleep(step).await; + } + } + + /// Stop the inner task and wait for it to finish. + pub async fn stop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.stop().await + } + } +} + async fn start_listener(job_runner: Arc) -> Result { let mut listener = PgListener::connect_with(&job_runner.options.pool).await?; if let Some(channels) = &job_runner.options.channel_names {