actix: simplify worker & server implementations

This commit is contained in:
asonix 2022-12-12 10:37:11 -06:00
parent 8af4af8374
commit 45c5bb946c
2 changed files with 60 additions and 119 deletions

View file

@ -1,11 +1,9 @@
use crate::{ use crate::storage::{ActixStorage, StorageWrapper};
storage::{ActixStorage, StorageWrapper},
worker::Worker,
};
use anyhow::Error; use anyhow::Error;
use background_jobs_core::{NewJobInfo, ReturnJobInfo, Storage}; use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage};
use std::sync::Arc; use std::sync::Arc;
use tracing::{error, trace}; use tracing::trace;
use uuid::Uuid;
/// The server Actor /// The server Actor
/// ///
@ -28,33 +26,16 @@ impl Server {
} }
pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> { pub(crate) async fn new_job(&self, job: NewJobInfo) -> Result<(), Error> {
let ready = job.is_ready(); self.storage.new_job(job).await.map(|_| ())
self.storage.new_job(job).await?;
if !ready {
trace!("New job is not ready for processing yet, returning");
return Ok(());
}
Ok(())
} }
pub(crate) async fn request_job( pub(crate) async fn request_job(
&self, &self,
worker: Box<dyn Worker + Send + Sync + 'static>, worker_id: Uuid,
) -> Result<(), Error> { worker_queue: &str,
trace!("Worker {} requested job", worker.id()); ) -> Result<JobInfo, Error> {
let job = self trace!("Worker {} requested job", worker_id);
.storage self.storage.request_job(worker_queue, worker_id).await
.request_job(worker.queue(), worker.id())
.await?;
if let Err(job) = worker.process(job).await {
error!("Worker {} has hung up", worker.id());
self.storage.return_job(job.unexecuted()).await?;
}
Ok(())
} }
pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> { pub(crate) async fn return_job(&self, job: ReturnJobInfo) -> Result<(), Error> {

View file

@ -1,63 +1,10 @@
use std::future::Future;
use crate::Server; use crate::Server;
use background_jobs_core::{CachedProcessorMap, JobInfo}; use background_jobs_core::CachedProcessorMap;
use tokio::sync::mpsc::{channel, Sender}; use std::future::Future;
use tracing::Span; use tracing::Span;
use tracing_futures::Instrument; use tracing_futures::Instrument;
use uuid::Uuid; use uuid::Uuid;
#[async_trait::async_trait]
pub trait Worker {
async fn process(&self, job: JobInfo) -> Result<(), JobInfo>;
fn id(&self) -> Uuid;
fn queue(&self) -> &str;
}
#[derive(Clone)]
pub(crate) struct LocalWorkerHandle {
tx: Sender<JobInfo>,
id: Uuid,
queue: String,
}
impl LocalWorkerHandle {
fn span(&self, operation: &str) -> Span {
tracing::info_span!(
"Worker",
worker.id = tracing::field::display(&self.id),
worker.queue = tracing::field::display(&self.queue),
worker.operation.id = tracing::field::display(&Uuid::new_v4()),
worker.operation.name = tracing::field::display(operation),
exception.message = tracing::field::Empty,
exception.details = tracing::field::Empty,
)
}
}
#[async_trait::async_trait]
impl Worker for LocalWorkerHandle {
async fn process(&self, job: JobInfo) -> Result<(), JobInfo> {
match self.tx.clone().send(job).await {
Err(e) => {
tracing::error!("Unable to send job");
Err(e.0)
}
_ => Ok(()),
}
}
fn id(&self) -> Uuid {
self.id
}
fn queue(&self) -> &str {
&self.queue
}
}
struct LocalWorkerStarter<State: Clone + 'static, Extras: 'static> { struct LocalWorkerStarter<State: Clone + 'static, Extras: 'static> {
queue: String, queue: String,
processors: CachedProcessorMap<State>, processors: CachedProcessorMap<State>,
@ -146,53 +93,66 @@ pub(crate) async fn local_worker<State, Extras>(
}; };
let id = Uuid::new_v4(); let id = Uuid::new_v4();
let (tx, mut rx) = channel(16);
let handle = LocalWorkerHandle { tx, id, queue }; let log_on_drop = LogOnDrop(|| make_span(id, &queue, "closing"));
let log_on_drop = LogOnDrop(|| handle.span("closing"));
loop { loop {
let span = handle.span("request"); let request_span = make_span(id, &queue, "request");
if let Err(e) = server
.request_job(Box::new(handle.clone())) let job = match request_span
.instrument(span.clone()) .in_scope(|| server.request_job(id, &queue))
.instrument(request_span.clone())
.await .await
{ {
Ok(job) => job,
Err(e) => {
metrics::counter!("background-jobs.worker.failed-request", 1); metrics::counter!("background-jobs.worker.failed-request", 1);
let display = format!("{}", e); let display_val = format!("{}", e);
let debug = format!("{:?}", e); let debug = format!("{:?}", e);
span.record("exception.message", &tracing::field::display(&display)); request_span.record("exception.message", &tracing::field::display(&display_val));
span.record("exception.details", &tracing::field::display(&debug)); request_span.record("exception.details", &tracing::field::display(&debug));
span.in_scope(|| tracing::error!("Failed to notify server of ready worker, {}", e)); request_span
.in_scope(|| tracing::error!("Failed to notify server of ready worker"));
break; break;
} }
drop(span); };
drop(request_span);
if let Some(job) = rx.recv().await {
let job_id = job.id(); let job_id = job.id();
let return_job = time_job(Box::pin(processors.process(job)), job_id) let return_job = time_job(Box::pin(processors.process(job)), job_id)
.instrument(handle.span("process")) .instrument(make_span(id, &queue, "process"))
.await; .await;
let span = handle.span("return"); let return_span = make_span(id, &queue, "return");
if let Err(e) = server.return_job(return_job).instrument(span.clone()).await { if let Err(e) = return_span
.in_scope(|| server.return_job(return_job))
.instrument(return_span.clone())
.await
{
metrics::counter!("background-jobs.worker.failed-return", 1); metrics::counter!("background-jobs.worker.failed-return", 1);
let display = format!("{}", e); let display_val = format!("{}", e);
let debug = format!("{:?}", e); let debug = format!("{:?}", e);
span.record("exception.message", &tracing::field::display(&display)); return_span.record("exception.message", &tracing::field::display(&display_val));
span.record("exception.details", &tracing::field::display(&debug)); return_span.record("exception.details", &tracing::field::display(&debug));
span.in_scope(|| tracing::warn!("Failed to return completed job, {}", e)); return_span.in_scope(|| tracing::warn!("Failed to return completed job"));
} }
drop(return_span);
continue;
}
break;
} }
drop(log_on_drop); drop(log_on_drop);
drop(starter); drop(starter);
} }
fn make_span(id: Uuid, queue: &str, operation: &str) -> Span {
tracing::info_span!(
"Worker",
worker.id = tracing::field::display(id),
worker.queue = tracing::field::display(queue),
worker.operation.id = tracing::field::display(&Uuid::new_v4()),
worker.operation.name = tracing::field::display(operation),
exception.message = tracing::field::Empty,
exception.details = tracing::field::Empty,
)
}