Ensure memory storage timer works

This commit is contained in:
Aode (lion) 2022-07-02 14:03:26 -05:00
parent 1ac3c0bc86
commit 250d2f0a33
6 changed files with 49 additions and 10 deletions

View file

@ -1,6 +1,11 @@
use actix_rt::Arbiter; use actix_rt::Arbiter;
use anyhow::Error; use anyhow::Error;
use background_jobs::{ActixJob as Job, MaxRetries, WorkerConfig}; use background_jobs::{
// memory_storage::{ActixTimer, Storage},
ActixJob as Job,
MaxRetries,
WorkerConfig,
};
use background_jobs_sled_storage::Storage; use background_jobs_sled_storage::Storage;
use std::{ use std::{
future::{ready, Ready}, future::{ready, Ready},
@ -33,6 +38,7 @@ async fn main() -> Result<(), Error> {
// Set up our Storage // Set up our Storage
let db = sled::Config::new().temporary(true).open()?; let db = sled::Config::new().temporary(true).open()?;
let storage = Storage::new(db)?; let storage = Storage::new(db)?;
// let storage = Storage::new(ActixTimer);
let arbiter = Arbiter::new(); let arbiter = Arbiter::new();

View file

@ -14,7 +14,7 @@
//! ```rust //! ```rust
//! use anyhow::Error; //! use anyhow::Error;
//! use background_jobs_core::{Backoff, Job, MaxRetries}; //! use background_jobs_core::{Backoff, Job, MaxRetries};
//! use background_jobs_actix::WorkerConfig; //! use background_jobs_actix::{ActixTimer, WorkerConfig};
//! use std::future::{ready, Ready}; //! use std::future::{ready, Ready};
//! //!
//! const DEFAULT_QUEUE: &'static str = "default"; //! const DEFAULT_QUEUE: &'static str = "default";
@ -35,7 +35,7 @@
//! // Set up our Storage //! // Set up our Storage
//! // For this example, we use the default in-memory storage mechanism //! // For this example, we use the default in-memory storage mechanism
//! use background_jobs_core::memory_storage::Storage; //! use background_jobs_core::memory_storage::Storage;
//! let storage = Storage::new(); //! let storage = Storage::new(ActixTimer);
//! //!
//! // Configure and start our workers //! // Configure and start our workers
//! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App")) //! let queue_handle = WorkerConfig::new(storage, move |_| MyState::new("My App"))
@ -115,7 +115,9 @@
use actix_rt::{Arbiter, ArbiterHandle}; use actix_rt::{Arbiter, ArbiterHandle};
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage}; use background_jobs_core::{
memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage,
};
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
marker::PhantomData, marker::PhantomData,
@ -133,6 +135,22 @@ use self::{every::every, server::Server, worker::LocalWorkerStarter};
pub use background_jobs_core::ActixJob; pub use background_jobs_core::ActixJob;
/// A timer implementation for the Memory Storage backend
#[derive(Debug, Clone)]
pub struct ActixTimer;
#[async_trait::async_trait]
impl Timer for ActixTimer {
async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
where
F: std::future::Future + Send + Sync,
{
actix_rt::time::timeout(duration, future)
.await
.map_err(|_| ())
}
}
/// Manager for worker threads /// Manager for worker threads
/// ///
/// Manager attempts to restart workers as their arbiters die /// Manager attempts to restart workers as their arbiters die

View file

@ -146,7 +146,7 @@ pub mod memory_storage {
/// Race a future against the clock, returning an empty tuple if the clock wins /// Race a future against the clock, returning an empty tuple if the clock wins
async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()> async fn timeout<F>(&self, duration: Duration, future: F) -> Result<F::Output, ()>
where where
F: std::future::Future; F: std::future::Future + Send + Sync;
} }
#[derive(Clone)] #[derive(Clone)]

View file

@ -19,4 +19,5 @@ sled = "0.34"
serde_cbor = "0.11" serde_cbor = "0.11"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["rt", "sync"] } tokio = { version = "1", default-features = false, features = ["rt", "sync"] }
tracing = "0.1"
uuid = { version = "1", features = ["v4", "serde"] } uuid = { version = "1", features = ["v4", "serde"] }

View file

@ -22,7 +22,7 @@ use sled::{Db, Tree};
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::{Duration, SystemTime}, time::{Duration, Instant, SystemTime},
}; };
use tokio::sync::Notify; use tokio::sync::Notify;
use uuid::Uuid; use uuid::Uuid;
@ -114,6 +114,12 @@ impl background_jobs_core::Storage for Storage {
async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo> { async fn fetch_job_from_queue(&self, queue: &str) -> Result<JobInfo> {
loop { loop {
let notifier = self.notifier(queue.to_owned());
let notified = notifier.notified();
tokio::pin!(notified);
notified.as_mut().enable();
let this = self.clone(); let this = self.clone();
let queue2 = queue.to_owned(); let queue2 = queue.to_owned();
@ -194,9 +200,10 @@ impl background_jobs_core::Storage for Storage {
}) })
.await?; .await?;
let notifier = self.notifier(queue.to_owned()); let before = Instant::now();
tracing::debug!("Waiting for notification for at most {:?}", duration);
let _ = timeout(duration, notifier.notified()).await; let _ = timeout(duration, notified).await;
tracing::debug!("Notified after {:?}", before.elapsed());
} }
} }

View file

@ -161,7 +161,7 @@
//! `background-jobs-core` crate, which provides the Job trait, as well as some //! `background-jobs-core` crate, which provides the Job trait, as well as some
//! other useful types for implementing a jobs processor and job store. //! other useful types for implementing a jobs processor and job store.
pub use background_jobs_core::{memory_storage, Backoff, Job, JobStat, MaxRetries, Stats}; pub use background_jobs_core::{Backoff, Job, JobStat, MaxRetries, Stats};
pub mod dev { pub mod dev {
//! Useful types and methods for developing Storage and Processor implementations. //! Useful types and methods for developing Storage and Processor implementations.
@ -171,5 +171,12 @@ pub mod dev {
}; };
} }
pub mod memory_storage {
pub use background_jobs_core::memory_storage::{Storage, Timer};
#[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::ActixTimer;
}
#[cfg(feature = "background-jobs-actix")] #[cfg(feature = "background-jobs-actix")]
pub use background_jobs_actix::{ActixJob, Manager, QueueHandle, WorkerConfig}; pub use background_jobs_actix::{ActixJob, Manager, QueueHandle, WorkerConfig};