From 250d2f0a33369a657563f58b5e8218338a267ec4 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Sat, 2 Jul 2022 14:03:26 -0500 Subject: [PATCH] Ensure memory storage timer works --- examples/basic-example/src/main.rs | 8 +++++++- jobs-actix/src/lib.rs | 24 +++++++++++++++++++++--- jobs-core/src/storage.rs | 2 +- jobs-sled/Cargo.toml | 1 + jobs-sled/src/lib.rs | 15 +++++++++++---- src/lib.rs | 9 ++++++++- 6 files changed, 49 insertions(+), 10 deletions(-) diff --git a/examples/basic-example/src/main.rs b/examples/basic-example/src/main.rs index 38e09a7..a3c37f8 100644 --- a/examples/basic-example/src/main.rs +++ b/examples/basic-example/src/main.rs @@ -1,6 +1,11 @@ use actix_rt::Arbiter; use anyhow::Error; -use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; +use background_jobs::{ + // memory_storage::{ActixTimer, Storage}, + ActixJob as Job, + MaxRetries, + WorkerConfig, +}; use background_jobs_sled_storage::Storage; use std::{ future::{ready, Ready}, @@ -33,6 +38,7 @@ async fn main() -> Result<(), Error> { // Set up our Storage let db = sled::Config::new().temporary(true).open()?; let storage = Storage::new(db)?; + // let storage = Storage::new(ActixTimer); let arbiter = Arbiter::new(); diff --git a/jobs-actix/src/lib.rs b/jobs-actix/src/lib.rs index 97d3118..b387607 100644 --- a/jobs-actix/src/lib.rs +++ b/jobs-actix/src/lib.rs @@ -14,7 +14,7 @@ //! ```rust //! use anyhow::Error; //! use background_jobs_core::{Backoff, Job, MaxRetries}; -//! use background_jobs_actix::WorkerConfig; +//! use background_jobs_actix::{ActixTimer, WorkerConfig}; //! use std::future::{ready, Ready}; //! //! const DEFAULT_QUEUE: &'static str = "default"; @@ -35,7 +35,7 @@ //! // Set up our Storage //! // For this example, we use the default in-memory storage mechanism //! use background_jobs_core::memory_storage::Storage; -//! let storage = Storage::new(); +//! let storage = Storage::new(ActixTimer); //! //! // Configure and start our workers //! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App")) @@ -115,7 +115,9 @@ use actix_rt::{Arbiter, ArbiterHandle}; use anyhow::Error; -use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; +use background_jobs_core::{ + memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage, +}; use std::{ collections::BTreeMap, marker::PhantomData, @@ -133,6 +135,22 @@ use self::{every::every, server::Server, worker::LocalWorkerStarter}; pub use background_jobs_core::ActixJob; +/// A timer implementation for the Memory Storage backend +#[derive(Debug, Clone)] +pub struct ActixTimer; + +#[async_trait::async_trait] +impl Timer for ActixTimer { + async fn timeout(&self, duration: Duration, future: F) -> Result + where + F: std::future::Future + Send + Sync, + { + actix_rt::time::timeout(duration, future) + .await + .map_err(|_| ()) + } +} + /// Manager for worker threads /// /// Manager attempts to restart workers as their arbiters die diff --git a/jobs-core/src/storage.rs b/jobs-core/src/storage.rs index 8f58c9d..dd6e358 100644 --- a/jobs-core/src/storage.rs +++ b/jobs-core/src/storage.rs @@ -146,7 +146,7 @@ pub mod memory_storage { /// Race a future against the clock, returning an empty tuple if the clock wins async fn timeout(&self, duration: Duration, future: F) -> Result where - F: std::future::Future; + F: std::future::Future + Send + Sync; } #[derive(Clone)] diff --git a/jobs-sled/Cargo.toml b/jobs-sled/Cargo.toml index 05215dc..156eeed 100644 --- a/jobs-sled/Cargo.toml +++ b/jobs-sled/Cargo.toml @@ -19,4 +19,5 @@ sled = "0.34" serde_cbor = "0.11" thiserror = "1.0" tokio = { version = "1", default-features = false, features = ["rt", "sync"] } +tracing = "0.1" uuid = { version = "1", features = ["v4", "serde"] } diff --git a/jobs-sled/src/lib.rs b/jobs-sled/src/lib.rs index 284f86b..7b23bc8 100644 --- a/jobs-sled/src/lib.rs +++ b/jobs-sled/src/lib.rs @@ -22,7 +22,7 @@ use sled::{Db, Tree}; use std::{ collections::HashMap, sync::{Arc, Mutex}, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use tokio::sync::Notify; use uuid::Uuid; @@ -114,6 +114,12 @@ impl background_jobs_core::Storage for Storage { async fn fetch_job_from_queue(&self, queue: &str) -> Result { loop { + let notifier = self.notifier(queue.to_owned()); + + let notified = notifier.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + let this = self.clone(); let queue2 = queue.to_owned(); @@ -194,9 +200,10 @@ impl background_jobs_core::Storage for Storage { }) .await?; - let notifier = self.notifier(queue.to_owned()); - - let _ = timeout(duration, notifier.notified()).await; + let before = Instant::now(); + tracing::debug!("Waiting for notification for at most {:?}", duration); + let _ = timeout(duration, notified).await; + tracing::debug!("Notified after {:?}", before.elapsed()); } } diff --git a/src/lib.rs b/src/lib.rs index 6eedc0c..516c0bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -161,7 +161,7 @@ //! `background-jobs-core` crate, which provides the Job trait, as well as some //! other useful types for implementing a jobs processor and job store. -pub use background_jobs_core::{memory_storage, Backoff, Job, JobStat, MaxRetries, Stats}; +pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Stats}; pub mod dev { //! Useful types and methods for developing Storage and Processor implementations. @@ -171,5 +171,12 @@ pub mod dev { }; } +pub mod memory_storage { + pub use background_jobs_core::memory_storage::{Storage, Timer}; + + #[cfg(feature = "background-jobs-actix")] + pub use background_jobs_actix::ActixTimer; +} + #[cfg(feature = "background-jobs-actix")] pub use background_jobs_actix::{ActixJob, Manager, QueueHandle, WorkerConfig};