From c0ce6f303e0f5798c61f16e960a0babe1df3c416 Mon Sep 17 00:00:00 2001 From: "Aode (Lion)" Date: Wed, 6 Oct 2021 20:31:27 -0500 Subject: [PATCH] Move more out of spawned tasks --- Cargo.toml | 4 +- examples/actix-example/Cargo.toml | 2 +- examples/actix-example/src/main.rs | 12 +++-- jobs-actix/Cargo.toml | 4 +- jobs-actix/src/every.rs | 7 +-- jobs-actix/src/lib.rs | 79 ++++++++++++++++++------------ jobs-actix/src/server.rs | 2 +- jobs-actix/src/worker.rs | 72 +++++++++++++-------------- 8 files changed, 101 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c36c3e8..24e2144 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs" description = "Background Jobs implemented with actix and futures" -version = "0.10.0" +version = "0.11.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -27,6 +27,6 @@ version = "0.10.0" path = "jobs-core" [dependencies.background-jobs-actix] -version = "0.10.0" +version = "0.11.0" path = "jobs-actix" optional = true diff --git a/examples/actix-example/Cargo.toml b/examples/actix-example/Cargo.toml index 51b7573..fe4fecc 100644 --- a/examples/actix-example/Cargo.toml +++ b/examples/actix-example/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" actix-rt = "2.0.0" anyhow = "1.0" async-trait = "0.1.24" -background-jobs = { version = "0.10.0", path = "../..", features = ["error-logging"] } +background-jobs = { version = "0.11.0", path = "../..", features = ["error-logging"] } background-jobs-sled-storage = { version = "0.10.0", path = "../../jobs-sled" } chrono = "0.4" tracing = "0.1" diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index adb8d7d..c51c875 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -46,14 +46,16 @@ async fn main() -> Result<(), Error> { // Queue some panicking job for _ in 0..32 { - queue_handle.queue(PanickingJob)?; + queue_handle.queue(PanickingJob).await?; } // Queue our jobs - queue_handle.queue(MyJob::new(1, 2))?; - queue_handle.queue(MyJob::new(3, 4))?; - queue_handle.queue(MyJob::new(5, 6))?; - queue_handle.schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2))?; + queue_handle.queue(MyJob::new(1, 2)).await?; + queue_handle.queue(MyJob::new(3, 4)).await?; + queue_handle.queue(MyJob::new(5, 6)).await?; + queue_handle + .schedule(MyJob::new(7, 8), Utc::now() + Duration::seconds(2)) + .await?; // Block on Actix actix_rt::signal::ctrl_c().await?; diff --git a/jobs-actix/Cargo.toml b/jobs-actix/Cargo.toml index c791536..692673b 100644 --- a/jobs-actix/Cargo.toml +++ b/jobs-actix/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "background-jobs-actix" description = "in-process jobs processor based on Actix" -version = "0.10.1" +version = "0.11.0" license = "AGPL-3.0" authors = ["asonix "] repository = "https://git.asonix.dog/Aardwolf/background-jobs" @@ -22,5 +22,5 @@ num_cpus = "1.10.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1", default-features = false, features = ["sync"] } +tokio = { version = "1", default-features = false, features = ["rt", "sync"] } uuid = { version ="0.8.1", features = ["v4", "serde"] } diff --git a/jobs-actix/src/every.rs b/jobs-actix/src/every.rs index 4aae3a7..efa117e 100644 --- a/jobs-actix/src/every.rs +++ b/jobs-actix/src/every.rs @@ -14,14 +14,15 @@ where J: Job + Clone + Send, { let spawner_clone = spawner.clone(); - spawner.arbiter.spawn(async move { + spawner.tokio_rt.spawn(async move { let mut interval = interval_at(Instant::now(), duration); loop { interval.tick().await; - if spawner_clone.queue(job.clone()).is_err() { - error!("Failed to queue job"); + let job = job.clone(); + if spawner_clone.queue::(job).await.is_err() { + error!("Failed to queue job: {}", J::NAME); } } }); diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 3788705..13421b0 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -121,7 +121,6 @@ use anyhow::Error; use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use chrono::{DateTime, Utc}; use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use tracing::error; mod every; mod server; @@ -143,10 +142,25 @@ pub fn create_server(storage: S) -> QueueHandle where S: Storage + Sync + 'static, { - let arbiter = Arbiter::current(); + create_server_in_arbiter(storage, &Arbiter::current()) +} + +/// Create a new Server +/// +/// In previous versions of this library, the server itself was run on it's own dedicated threads +/// and guarded access to jobs via messages. Since we now have futures-aware synchronization +/// primitives, the Server has become an object that gets shared between client threads. +/// +/// This method will panic if not called from an actix runtime +pub fn create_server_in_arbiter(storage: S, arbiter: &ArbiterHandle) -> QueueHandle +where + S: Storage + Sync + 'static, +{ + let tokio_rt = tokio::runtime::Handle::current(); + QueueHandle { inner: Server::new(&arbiter, storage), - arbiter, + tokio_rt, } } @@ -209,19 +223,11 @@ where /// /// This method will panic if not called from an actix runtime pub fn start(self, queue_handle: QueueHandle) { - for (key, count) in self.queues.into_iter() { - for _ in 0..count { - local_worker( - key.clone(), - self.processors.cached(), - queue_handle.inner.clone(), - ); - } - } + self.start_in_arbiter(&Arbiter::current(), queue_handle) } /// Start the workers in the provided arbiter - pub fn start_in_arbiter(self, arbiter: &Arbiter, queue_handle: QueueHandle) { + pub fn start_in_arbiter(self, arbiter: &ArbiterHandle, queue_handle: QueueHandle) { for (key, count) in self.queues.into_iter() { for _ in 0..count { let key = key.clone(); @@ -229,7 +235,7 @@ where let server = queue_handle.inner.clone(); arbiter.spawn_fn(move || { - local_worker(key, processors.cached(), server); + actix_rt::spawn(local_worker(key, processors.cached(), server)); }); } } @@ -243,7 +249,7 @@ where #[derive(Clone)] pub struct QueueHandle { inner: Server, - arbiter: ArbiterHandle, + tokio_rt: tokio::runtime::Handle, } impl QueueHandle { @@ -251,17 +257,37 @@ impl QueueHandle { /// /// This job will be sent to the server for storage, and will execute whenever a worker for the /// job's queue is free to do so. - pub fn queue(&self, job: J) -> Result<(), Error> + pub async fn queue(&self, job: J) -> Result<(), Error> where J: Job, { let job = new_job(job)?; + self.inner.new_job(job).await?; + Ok(()) + } + + /// Queues a job for execution + /// + /// This job will be sent to the server for storage, and will execute whenever a worker for the + /// job's queue is free to do so. + pub async fn blocking_queue(&self, job: J) -> Result<(), Error> + where + J: Job, + { + self.tokio_rt.block_on(self.queue(job)) + } + + /// Schedule a job for execution later + /// + /// This job will be sent to the server for storage, and will execute after the specified time + /// and when a worker for the job's queue is free to do so. + pub async fn schedule(&self, job: J, after: DateTime) -> Result<(), Error> + where + J: Job, + { + let job = new_scheduled_job(job, after)?; let server = self.inner.clone(); - self.arbiter.spawn(async move { - if let Err(e) = server.new_job(job).await { - error!("Error creating job, {}", e); - } - }); + server.new_job(job).await?; Ok(()) } @@ -269,18 +295,11 @@ impl QueueHandle { /// /// This job will be sent to the server for storage, and will execute after the specified time /// and when a worker for the job's queue is free to do so. - pub fn schedule(&self, job: J, after: DateTime) -> Result<(), Error> + pub fn blocking_schedule(&self, job: J, after: DateTime) -> Result<(), Error> where J: Job, { - let job = new_scheduled_job(job, after)?; - let server = self.inner.clone(); - self.arbiter.spawn(async move { - if let Err(e) = server.new_job(job).await { - error!("Error creating job, {}", e); - } - }); - Ok(()) + self.tokio_rt.block_on(self.schedule(job, after)) } /// Queues a job for recurring execution diff --git a/jobs-actix/src/server.rs b/jobs-actix/src/server.rs index 229a8f0..fe9c687 100644 --- a/jobs-actix/src/server.rs +++ b/jobs-actix/src/server.rs @@ -109,7 +109,7 @@ impl Server { trace!("Trying to find job for worker {}", worker.id()); if let Ok(Some(job)) = self.storage.request_job(&queue, worker.id()).await { if let Err(job) = worker.process(job).await { - error!("Worker has hung up"); + error!("Worker {} has hung up", worker.id()); self.storage.return_job(job.unexecuted()).await? } } else { diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index dfb0c2c..4567f77 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -1,5 +1,4 @@ use crate::Server; -use actix_rt::spawn; use background_jobs_core::{CachedProcessorMap, JobInfo}; use tokio::sync::mpsc::{channel, Sender}; use tracing::{error, info, warn, Span}; @@ -57,55 +56,54 @@ impl Worker for LocalWorkerHandle { } } -pub(crate) fn local_worker( +pub(crate) async fn local_worker( queue: String, processors: CachedProcessorMap, server: Server, ) where State: Clone + 'static, { - spawn(async move { - let id = Uuid::new_v4(); - let (tx, mut rx) = channel(16); + let id = Uuid::new_v4(); + let (tx, mut rx) = channel(16); - let handle = LocalWorkerHandle { tx, id, queue }; + let handle = LocalWorkerHandle { tx, id, queue }; - loop { - let span = handle.span("request"); - if let Err(e) = server - .request_job(Box::new(handle.clone())) - .instrument(span.clone()) - .await - { + loop { + let span = handle.span("request"); + if let Err(e) = server + .request_job(Box::new(handle.clone())) + .instrument(span.clone()) + .await + { + let display = format!("{}", e); + let debug = format!("{:?}", e); + span.record("exception.message", &tracing::field::display(&display)); + span.record("exception.details", &tracing::field::display(&debug)); + span.in_scope(|| error!("Failed to notify server of ready worker, {}", e)); + break; + } + drop(span); + + if let Some(job) = rx.recv().await { + let return_job = processors + .process(job) + .instrument(handle.span("process")) + .await; + + let span = handle.span("return"); + if let Err(e) = server.return_job(return_job).instrument(span.clone()).await { let display = format!("{}", e); let debug = format!("{:?}", e); span.record("exception.message", &tracing::field::display(&display)); span.record("exception.details", &tracing::field::display(&debug)); - span.in_scope(|| error!("Failed to notify server of ready worker, {}", e)); - break; - } - drop(span); - - if let Some(job) = rx.recv().await { - let return_job = processors - .process(job) - .instrument(handle.span("process")) - .await; - - let span = handle.span("return"); - if let Err(e) = server.return_job(return_job).instrument(span.clone()).await { - let display = format!("{}", e); - let debug = format!("{:?}", e); - span.record("exception.message", &tracing::field::display(&display)); - span.record("exception.details", &tracing::field::display(&debug)); - span.in_scope(|| warn!("Failed to return completed job, {}", e)); - } - - continue; + span.in_scope(|| warn!("Failed to return completed job, {}", e)); } - break; + continue; } - handle.span("closing").in_scope(|| info!("Worker closing")); - }); + + break; + } + + handle.span("closing").in_scope(|| info!("Worker closing")); }