mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-22 03:51:00 +00:00
Don't have long-running spans
This commit is contained in:
parent
1d30a9472c
commit
cce5a97306
2 changed files with 45 additions and 33 deletions
|
@ -1,7 +1,7 @@
|
||||||
[package]
|
[package]
|
||||||
name = "background-jobs-actix"
|
name = "background-jobs-actix"
|
||||||
description = "in-process jobs processor based on Actix"
|
description = "in-process jobs processor based on Actix"
|
||||||
version = "0.10.0"
|
version = "0.10.1"
|
||||||
license = "AGPL-3.0"
|
license = "AGPL-3.0"
|
||||||
authors = ["asonix <asonix@asonix.dog>"]
|
authors = ["asonix <asonix@asonix.dog>"]
|
||||||
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
repository = "https://git.asonix.dog/Aardwolf/background-jobs"
|
||||||
|
|
|
@ -2,7 +2,7 @@ use crate::Server;
|
||||||
use actix_rt::spawn;
|
use actix_rt::spawn;
|
||||||
use background_jobs_core::{CachedProcessorMap, JobInfo};
|
use background_jobs_core::{CachedProcessorMap, JobInfo};
|
||||||
use tokio::sync::mpsc::{channel, Sender};
|
use tokio::sync::mpsc::{channel, Sender};
|
||||||
use tracing::{debug, error, info, warn, Span};
|
use tracing::{error, info, warn, Span};
|
||||||
use tracing_futures::Instrument;
|
use tracing_futures::Instrument;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
@ -22,6 +22,20 @@ pub(crate) struct LocalWorkerHandle {
|
||||||
queue: String,
|
queue: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl LocalWorkerHandle {
|
||||||
|
fn span(&self, operation: &str) -> Span {
|
||||||
|
tracing::info_span!(
|
||||||
|
"Worker",
|
||||||
|
worker.id = tracing::field::display(&self.id),
|
||||||
|
worker.queue = tracing::field::display(&self.queue),
|
||||||
|
worker.operation.id = tracing::field::display(&Uuid::new_v4()),
|
||||||
|
worker.operation.name = tracing::field::display(operation),
|
||||||
|
exception.message = tracing::field::Empty,
|
||||||
|
exception.details = tracing::field::Empty,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl Worker for LocalWorkerHandle {
|
impl Worker for LocalWorkerHandle {
|
||||||
async fn process(&self, job: JobInfo) -> Result<(), JobInfo> {
|
async fn process(&self, job: JobInfo) -> Result<(), JobInfo> {
|
||||||
|
@ -50,50 +64,48 @@ pub(crate) fn local_worker<State>(
|
||||||
) where
|
) where
|
||||||
State: Clone + 'static,
|
State: Clone + 'static,
|
||||||
{
|
{
|
||||||
let id = Uuid::new_v4();
|
spawn(async move {
|
||||||
|
let id = Uuid::new_v4();
|
||||||
|
let (tx, mut rx) = channel(16);
|
||||||
|
|
||||||
let span = tracing::info_span!(
|
let handle = LocalWorkerHandle { tx, id, queue };
|
||||||
"Worker",
|
|
||||||
worker.id = tracing::field::display(&id),
|
|
||||||
worker.queue = tracing::field::display(&queue),
|
|
||||||
exception.message = tracing::field::Empty,
|
|
||||||
exception.details = tracing::field::Empty,
|
|
||||||
);
|
|
||||||
|
|
||||||
spawn(
|
loop {
|
||||||
async move {
|
let span = handle.span("request");
|
||||||
let (tx, mut rx) = channel(16);
|
if let Err(e) = server
|
||||||
|
.request_job(Box::new(handle.clone()))
|
||||||
let handle = LocalWorkerHandle { tx, id, queue };
|
.instrument(span.clone())
|
||||||
|
.await
|
||||||
let span = Span::current();
|
{
|
||||||
|
|
||||||
debug!("Beginning worker loop for {}", id);
|
|
||||||
if let Err(e) = server.request_job(Box::new(handle.clone())).await {
|
|
||||||
let display = format!("{}", e);
|
let display = format!("{}", e);
|
||||||
let debug = format!("{:?}", e);
|
let debug = format!("{:?}", e);
|
||||||
span.record("exception.message", &tracing::field::display(&display));
|
span.record("exception.message", &tracing::field::display(&display));
|
||||||
span.record("exception.details", &tracing::field::display(&debug));
|
span.record("exception.details", &tracing::field::display(&debug));
|
||||||
error!("Failed to notify server of new worker, {}", e);
|
span.in_scope(|| error!("Failed to notify server of ready worker, {}", e));
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
while let Some(job) = rx.recv().await {
|
drop(span);
|
||||||
let return_job = processors.process(job).await;
|
|
||||||
|
|
||||||
if let Err(e) = server.return_job(return_job).await {
|
if let Some(job) = rx.recv().await {
|
||||||
warn!("Failed to return completed job, {}", e);
|
let return_job = processors
|
||||||
}
|
.process(job)
|
||||||
if let Err(e) = server.request_job(Box::new(handle.clone())).await {
|
.instrument(handle.span("process"))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let span = handle.span("return");
|
||||||
|
if let Err(e) = server.return_job(return_job).instrument(span.clone()).await {
|
||||||
let display = format!("{}", e);
|
let display = format!("{}", e);
|
||||||
let debug = format!("{:?}", e);
|
let debug = format!("{:?}", e);
|
||||||
span.record("exception.message", &tracing::field::display(&display));
|
span.record("exception.message", &tracing::field::display(&display));
|
||||||
span.record("exception.details", &tracing::field::display(&debug));
|
span.record("exception.details", &tracing::field::display(&debug));
|
||||||
error!("Failed to notify server of ready worker, {}", e);
|
span.in_scope(|| warn!("Failed to return completed job, {}", e));
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
info!("Worker closing");
|
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
.instrument(span),
|
handle.span("closing").in_scope(|| info!("Worker closing"));
|
||||||
);
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue