jobs-actix: simplify manager, add _with_threads option, add metrics

This commit is contained in:
asonix 2022-11-19 14:40:23 -06:00
parent 54b0b0fb0e
commit 3c34d66e48
5 changed files with 75 additions and 132 deletions

View file

@ -1,7 +1,7 @@
[package] [package]
name = "background-jobs-actix" name = "background-jobs-actix"
description = "in-process jobs processor based on Actix" description = "in-process jobs processor based on Actix"
version = "0.13.1" version = "0.14.0"
license = "AGPL-3.0" license = "AGPL-3.0"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
repository = "https://git.asonix.dog/asonix/background-jobs" repository = "https://git.asonix.dog/asonix/background-jobs"
@ -14,9 +14,10 @@ actix-rt = "2.5.1"
anyhow = "1.0" anyhow = "1.0"
async-mutex = "1.0.1" async-mutex = "1.0.1"
async-trait = "0.1.24" async-trait = "0.1.24"
background-jobs-core = { version = "0.13.0", path = "../jobs-core", features = [ background-jobs-core = { version = "0.14.0", path = "../jobs-core", features = [
"with-actix", "with-actix",
] } ] }
metrics = "0.20.1"
tracing = "0.1" tracing = "0.1"
tracing-futures = "0.2" tracing-futures = "0.2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View file

@ -116,22 +116,24 @@
use actix_rt::{Arbiter, ArbiterHandle}; use actix_rt::{Arbiter, ArbiterHandle};
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{ use background_jobs_core::{
memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Stats, Storage, memory_storage::Timer, new_job, new_scheduled_job, Job, ProcessorMap, Storage,
}; };
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
marker::PhantomData, marker::PhantomData,
num::NonZeroUsize,
ops::Deref, ops::Deref,
sync::Arc, sync::Arc,
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use tokio::sync::Notify;
mod every; mod every;
mod server; mod server;
mod storage; mod storage;
mod worker; mod worker;
use self::{every::every, server::Server, worker::LocalWorkerStarter}; use self::{every::every, server::Server};
pub use background_jobs_core::ActixJob; pub use background_jobs_core::ActixJob;
@ -153,65 +155,61 @@ impl Timer for ActixTimer {
/// Manager for worker threads /// Manager for worker threads
/// ///
/// Manager attempts to restart workers as their arbiters die /// Manager attempts to restart workers as their arbiters die. Dropping the manager kills the
/// workers
pub struct Manager { pub struct Manager {
// the manager arbiter // the manager arbiter
arbiter_dropper: Option<ArbiterDropper>, arbiter: Option<Arbiter>,
// handle for queueing // handle for queueing
queue_handle: QueueHandle, queue_handle: QueueHandle,
} }
struct ArbiterDropper {
arbiter: Option<Arbiter>,
}
impl Manager { impl Manager {
/// Create a new manager to keep jobs alive /// Create a new manager to keep jobs alive
/// ///
/// Manager works by startinng a new Arbiter to run jobs, and if that arbiter ever dies, it /// Manager works by startinng a new Arbiter to run jobs, and if that arbiter ever dies, it
/// spins up another one and spawns the workers again /// spins up another one and spawns the workers again
fn new<State>(worker_config: WorkerConfig<State, Managed>) -> Self fn new<State>(worker_config: WorkerConfig<State, Managed>, thread_count: NonZeroUsize) -> Self
where where
State: Clone, State: Clone,
{ {
let manager_arbiter = Arbiter::new(); let manager_arbiter = Arbiter::new();
let worker_arbiter = Arbiter::new();
let notifier = Arc::new(tokio::sync::Notify::new());
let queue_handle = worker_config.queue_handle.clone(); let queue_handle = worker_config.queue_handle.clone();
let drop_notifier = DropNotifier::new(Arc::clone(&notifier)); for i in 0..thread_count.into() {
let worker_config = worker_config.clone();
manager_arbiter.spawn(async move { manager_arbiter.spawn(async move {
let mut drop_notifier = drop_notifier; let mut worker_arbiter = Arbiter::new();
let mut arbiter_dropper = ArbiterDropper::from(worker_arbiter);
loop { loop {
let notified = notifier.notified(); let notifier = DropNotifier::default();
worker_config.start_managed(&arbiter_dropper.handle(), &drop_notifier); worker_config.start_managed(&worker_arbiter.handle(), &());
notified.await; let notified = notifier.notify.notified();
tracing::warn!("Recovering from dead worker arbiter");
// drop_notifier needs to live at least as long as notifier.notified().await let drop_notifier = notifier.clone();
// in order to ensure we get notified by ticker or a worker, and not ourselves worker_arbiter.spawn(async move {
std::future::pending::<()>().await;
drop(drop_notifier); drop(drop_notifier);
// Assume arbiter is dead if we were notified
let online = arbiter_dropper.spawn(async {});
if online {
panic!("Arbiter should be dead by now");
}
drop(arbiter_dropper);
arbiter_dropper = ArbiterDropper::default();
drop_notifier = DropNotifier::new(Arc::clone(&notifier));
}
}); });
notified.await;
metrics::counter!("background-jobs.worker-arbiter.restart", 1, "number" => i.to_string());
tracing::warn!("Recovering from dead worker arbiter");
worker_arbiter.stop();
let _ = worker_arbiter.join();
worker_arbiter = Arbiter::new();
}
});
}
Manager { Manager {
arbiter_dropper: Some(ArbiterDropper::from(manager_arbiter)), arbiter: Some(manager_arbiter),
queue_handle, queue_handle,
} }
} }
@ -233,61 +231,21 @@ impl Deref for Manager {
impl Drop for Manager { impl Drop for Manager {
fn drop(&mut self) { fn drop(&mut self) {
tracing::warn!("Dropping manager, tearing down workers"); tracing::warn!("Dropping manager, tearing down workers");
self.arbiter_dropper.take(); if let Some(arbiter) = self.arbiter.take() {
}
}
impl Deref for ArbiterDropper {
type Target = Arbiter;
fn deref(&self) -> &Self::Target {
self.arbiter.as_ref().unwrap()
}
}
impl Drop for ArbiterDropper {
fn drop(&mut self) {
tracing::warn!("Stopping and joining arbiter");
let arbiter = self.arbiter.take().unwrap();
arbiter.stop(); arbiter.stop();
let _ = arbiter.join(); let _ = arbiter.join();
} }
}
impl From<Arbiter> for ArbiterDropper {
fn from(arbiter: Arbiter) -> Self {
Self {
arbiter: Some(arbiter),
}
} }
} }
impl Default for ArbiterDropper { #[derive(Clone, Default)]
fn default() -> Self {
ArbiterDropper {
arbiter: Some(Arbiter::new()),
}
}
}
#[derive(Clone)]
struct DropNotifier { struct DropNotifier {
inner: Arc<std::sync::Mutex<Option<Arc<tokio::sync::Notify>>>>, notify: Arc<Notify>,
}
impl DropNotifier {
fn new(notify: Arc<tokio::sync::Notify>) -> Self {
DropNotifier {
inner: Arc::new(std::sync::Mutex::new(Some(notify))),
}
}
} }
impl Drop for DropNotifier { impl Drop for DropNotifier {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(notifier) = self.inner.lock().unwrap().take() { self.notify.notify_waiters();
notifier.notify_waiters();
}
} }
} }
@ -306,8 +264,10 @@ where
} }
/// Marker type for Unmanaged workers /// Marker type for Unmanaged workers
#[derive(Clone)]
pub struct Unmanaged; pub struct Unmanaged;
/// Marker type for Managed workers /// Marker type for Managed workers
#[derive(Clone)]
pub struct Managed; pub struct Managed;
/// Worker Configuration /// Worker Configuration
@ -352,9 +312,14 @@ where
} }
} }
/// Start the workers on a managed arbiter, and return the manager struct /// Start the workers on a managed thread, returning the manager struct
pub fn start(self) -> Manager { pub fn start(self) -> Manager {
Manager::new(self) Self::start_with_threads(self, NonZeroUsize::try_from(1).expect("nonzero"))
}
/// Start the workers on the specified number of managed threads, returning the Manager struct
pub fn start_with_threads(self, thread_count: NonZeroUsize) -> Manager {
Manager::new(self, thread_count)
} }
} }
@ -448,7 +413,7 @@ where
let extras_2 = extras.clone(); let extras_2 = extras.clone();
arbiter.spawn_fn(move || { arbiter.spawn_fn(move || {
drop(LocalWorkerStarter::new( actix_rt::spawn(worker::local_worker(
queue, queue,
processors.cached(), processors.cached(),
server, server,
@ -506,9 +471,4 @@ impl QueueHandle {
{ {
actix_rt::spawn(every(self.clone(), duration, job)); actix_rt::spawn(every(self.clone(), duration, job));
} }
/// Return an overview of the processor's statistics
pub async fn get_stats(&self) -> Result<Stats, Error> {
self.inner.get_stats().await
}
} }

View file

@ -3,7 +3,7 @@ use crate::{
worker::Worker, worker::Worker,
}; };
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Stats, Storage}; use background_jobs_core::{NewJobInfo, ReturnJobInfo, Storage};
use std::sync::Arc; use std::sync::Arc;
use tracing::{error, trace}; use tracing::{error, trace};
@ -60,8 +60,4 @@ impl Server {
pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> {
self.storage.return_job(job).await self.storage.return_job(job).await
} }
pub(crate) async fn get_stats(&self) -> Result<Stats, Error> {
self.storage.get_stats().await
}
} }

View file

@ -1,5 +1,5 @@
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Stats, Storage}; use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage};
use uuid::Uuid; use uuid::Uuid;
#[async_trait::async_trait] #[async_trait::async_trait]
@ -9,8 +9,6 @@ pub(crate) trait ActixStorage {
async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error>; async fn request_job(&self, queue: &str, runner_id: Uuid) -> Result<JobInfo, Error>;
async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>; async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error>;
async fn get_stats(&self) -> Result<Stats, Error>;
} }
pub(crate) struct StorageWrapper<S>(pub(crate) S) pub(crate) struct StorageWrapper<S>(pub(crate) S)
@ -35,8 +33,4 @@ where
async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error> { async fn return_job(&self, ret: ReturnJobInfo) -> Result<(), Error> {
Ok(self.0.return_job(ret).await?) Ok(self.0.return_job(ret).await?)
} }
async fn get_stats(&self) -> Result<Stats, Error> {
Ok(self.0.get_stats().await?)
}
} }

View file

@ -3,7 +3,7 @@ use std::future::Future;
use crate::Server; use crate::Server;
use background_jobs_core::{CachedProcessorMap, JobInfo}; use background_jobs_core::{CachedProcessorMap, JobInfo};
use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::{channel, Sender};
use tracing::{error, info, warn, Span}; use tracing::Span;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use uuid::Uuid; use uuid::Uuid;
@ -42,7 +42,7 @@ impl Worker for LocalWorkerHandle {
async fn process(&self, job: JobInfo) -> Result<(), JobInfo> { async fn process(&self, job: JobInfo) -> Result<(), JobInfo> {
match self.tx.clone().send(job).await { match self.tx.clone().send(job).await {
Err(e) => { Err(e) => {
error!("Unable to send job"); tracing::error!("Unable to send job");
Err(e.0) Err(e.0)
} }
_ => Ok(()), _ => Ok(()),
@ -58,31 +58,17 @@ impl Worker for LocalWorkerHandle {
} }
} }
pub(crate) struct LocalWorkerStarter<State: Clone + 'static, Extras: 'static> { struct LocalWorkerStarter<State: Clone + 'static, Extras: 'static> {
queue: String, queue: String,
processors: CachedProcessorMap<State>, processors: CachedProcessorMap<State>,
server: Server, server: Server,
extras: Option<Extras>, extras: Option<Extras>,
} }
impl<State: Clone + 'static, Extras: 'static> LocalWorkerStarter<State, Extras> {
pub(super) fn new(
queue: String,
processors: CachedProcessorMap<State>,
server: Server,
extras: Extras,
) -> Self {
LocalWorkerStarter {
queue,
processors,
server,
extras: Some(extras),
}
}
}
impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State, Extras> { impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State, Extras> {
fn drop(&mut self) { fn drop(&mut self) {
metrics::gauge!("background-jobs.worker.running", -1.0, "queue" => self.queue.clone());
let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {})); let res = std::panic::catch_unwind(|| actix_rt::Arbiter::current().spawn(async move {}));
let extras = self.extras.take().unwrap(); let extras = self.extras.take().unwrap();
@ -95,7 +81,7 @@ impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State,
extras, extras,
)); ));
} else { } else {
warn!("Not restarting worker, Arbiter is dead"); tracing::warn!("Not restarting worker, Arbiter is dead");
drop(extras); drop(extras);
} }
} }
@ -110,7 +96,7 @@ where
F: Fn() -> Span, F: Fn() -> Span,
{ {
fn drop(&mut self) { fn drop(&mut self) {
(self.0)().in_scope(|| info!("Worker closing")); (self.0)().in_scope(|| tracing::info!("Worker closing"));
} }
} }
@ -127,21 +113,21 @@ async fn time_job<F: Future + Unpin>(mut future: F, job_id: Uuid) -> <F as Futur
if count > (60 * 60) { if count > (60 * 60) {
if count % (60 * 20) == 0 { if count % (60 * 20) == 0 {
warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60); tracing::warn!("Job {} is taking a long time: {} hours", job_id, count / 60 / 60);
} }
} else if count > 60 { } else if count > 60 {
if count % 20 == 0 { if count % 20 == 0 {
warn!("Job {} is taking a long time: {} minutes", job_id, count / 60); tracing::warn!("Job {} is taking a long time: {} minutes", job_id, count / 60);
} }
} else { } else {
info!("Job {} is taking a long time: {} seconds", job_id, count); tracing::info!("Job {} is taking a long time: {} seconds", job_id, count);
} }
} }
} }
} }
} }
async fn local_worker<State, Extras>( pub(crate) async fn local_worker<State, Extras>(
queue: String, queue: String,
processors: CachedProcessorMap<State>, processors: CachedProcessorMap<State>,
server: Server, server: Server,
@ -150,6 +136,8 @@ async fn local_worker<State, Extras>(
State: Clone + 'static, State: Clone + 'static,
Extras: 'static, Extras: 'static,
{ {
metrics::gauge!("background-jobs.worker.running", 1.0, "queue" => queue.clone());
let starter = LocalWorkerStarter { let starter = LocalWorkerStarter {
queue: queue.clone(), queue: queue.clone(),
processors: processors.clone(), processors: processors.clone(),
@ -171,11 +159,13 @@ async fn local_worker<State, Extras>(
.instrument(span.clone()) .instrument(span.clone())
.await .await
{ {
metrics::counter!("background-jobs.worker.failed-request", 1);
let display = format!("{}", e); let display = format!("{}", e);
let debug = format!("{:?}", e); let debug = format!("{:?}", e);
span.record("exception.message", &tracing::field::display(&display)); span.record("exception.message", &tracing::field::display(&display));
span.record("exception.details", &tracing::field::display(&debug)); span.record("exception.details", &tracing::field::display(&debug));
span.in_scope(|| error!("Failed to notify server of ready worker, {}", e)); span.in_scope(|| tracing::error!("Failed to notify server of ready worker, {}", e));
break; break;
} }
drop(span); drop(span);
@ -188,11 +178,13 @@ async fn local_worker<State, Extras>(
let span = handle.span("return"); let span = handle.span("return");
if let Err(e) = server.return_job(return_job).instrument(span.clone()).await { if let Err(e) = server.return_job(return_job).instrument(span.clone()).await {
metrics::counter!("background-jobs.worker.failed-return", 1);
let display = format!("{}", e); let display = format!("{}", e);
let debug = format!("{:?}", e); let debug = format!("{:?}", e);
span.record("exception.message", &tracing::field::display(&display)); span.record("exception.message", &tracing::field::display(&display));
span.record("exception.details", &tracing::field::display(&debug)); span.record("exception.details", &tracing::field::display(&debug));
span.in_scope(|| warn!("Failed to return completed job, {}", e)); span.in_scope(|| tracing::warn!("Failed to return completed job, {}", e));
} }
continue; continue;