actix: imlement restarting for ticker, workers if Arbiter is still live

This commit is contained in:
Aode (lion) 2021-10-29 14:54:31 -05:00
parent c93fec98a0
commit c48d42ead6
4 changed files with 93 additions and 17 deletions

View file

@ -1,7 +1,7 @@
[package]
name = "background-jobs-actix"
description = "in-process jobs processor based on Actix"
version = "0.9.3"
version = "0.9.4"
license-file = "../LICENSE"
authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/Aardwolf/background-jobs"

View file

@ -143,7 +143,22 @@ pub fn create_server<S>(storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
{
let arbiter = Arbiter::current();
create_server_in_arbiter_handle(Arbiter::current(), storage)
}
/// Create a new server in the provided Arbiter
pub fn create_server_in_arbiter<S>(arbiter: &Arbiter, storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
{
create_server_in_arbiter_handle(arbiter.handle(), storage)
}
/// Create a new server in the provided ArbiterHandle
pub fn create_server_in_arbiter_handle<S>(arbiter: ArbiterHandle, storage: S) -> QueueHandle
where
S: Storage + Sync + 'static,
{
QueueHandle {
inner: Server::new(&arbiter, storage),
arbiter,

View file

@ -9,7 +9,7 @@ use actix_rt::{
use anyhow::Error;
use async_mutex::Mutex;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage};
use log::{error, trace};
use log::{error, trace, warn};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
@ -31,9 +31,46 @@ pub(crate) struct ServerCache {
pub(crate) struct Server {
storage: Arc<dyn ActixStorage + Send + Sync>,
cache: ServerCache,
arbiter: ArbiterHandle,
}
struct Ticker {
server: Server,
}
impl Drop for Ticker {
fn drop(&mut self) {
let online = self.server.arbiter.spawn(async move {});
if online {
let server = self.server.clone();
self.server.arbiter.spawn(async move {
// ensure new ticker is spawned when existing ticker dies
let _ticker = server.ticker();
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
loop {
interval.tick().await;
if let Err(e) = server.check_db().await {
error!("Error while checking database for new jobs, {}", e);
}
}
});
} else {
warn!("Not restarting ticker, arbiter has died");
}
}
}
impl Server {
fn ticker(&self) -> Ticker {
Ticker {
server: self.clone(),
}
}
/// Create a new Server from a compatible storage implementation
pub(crate) fn new<S>(arbiter: &ArbiterHandle, storage: S) -> Self
where
@ -42,21 +79,12 @@ impl Server {
let server = Server {
storage: Arc::new(StorageWrapper(storage)),
cache: ServerCache::new(),
arbiter: arbiter.clone(),
};
let server2 = server.clone();
arbiter.spawn(async move {
let mut interval = interval_at(Instant::now(), Duration::from_secs(1));
drop(server.ticker());
loop {
interval.tick().await;
if let Err(e) = server.check_db().await {
error!("Error while checking database for new jobs, {}", e);
}
}
});
server2
server
}
async fn check_db(&self) -> Result<(), Error> {

View file

@ -1,7 +1,7 @@
use crate::Server;
use actix_rt::spawn;
use actix_rt::{spawn, Arbiter};
use background_jobs_core::{CachedProcessorMap, JobInfo};
use log::{debug, error, warn};
use log::{debug, error, info, warn};
use tokio::sync::mpsc::{channel, Sender};
use uuid::Uuid;
@ -42,6 +42,32 @@ impl Worker for LocalWorkerHandle {
}
}
struct LocalWorkerStarter<State: Clone + 'static> {
queue: String,
processors: CachedProcessorMap<State>,
server: Server,
}
impl<State: Clone + 'static> Drop for LocalWorkerStarter<State> {
fn drop(&mut self) {
let res = std::panic::catch_unwind(|| {
let handle = Arbiter::current();
handle.spawn(async move {})
});
if let Ok(true) = res {
local_worker(
self.queue.clone(),
self.processors.clone(),
self.server.clone(),
)
} else {
warn!("Not restarting worker, arbiter has died");
}
}
}
struct WarnOnDrop(Uuid);
impl Drop for WarnOnDrop {
@ -57,6 +83,11 @@ pub(crate) fn local_worker<State>(
) where
State: Clone + 'static,
{
let starter = LocalWorkerStarter {
queue: queue.clone(),
processors: processors.clone(),
server: server.clone(),
};
let id = Uuid::new_v4();
let (tx, mut rx) = channel(16);
@ -64,6 +95,7 @@ pub(crate) fn local_worker<State>(
let handle = LocalWorkerHandle { tx, id, queue };
spawn(async move {
info!("Starting worker {}", id);
let warn_on_drop = WarnOnDrop(id);
debug!("Beginning worker loop for {}", id);
if let Err(e) = server.request_job(Box::new(handle.clone())).await {
@ -82,5 +114,6 @@ pub(crate) fn local_worker<State>(
}
}
drop(warn_on_drop);
drop(starter);
});
}