Allow jobs to wrap themselves in their own span

This commit is contained in:
Aode (Lion) 2021-09-21 10:32:48 -05:00
parent fbd614474f
commit 1d30a9472c
2 changed files with 32 additions and 2 deletions

View file

@ -63,6 +63,11 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static {
/// an actor in an actix-based system. /// an actor in an actix-based system.
fn run(self, state: Self::State) -> Self::Future; fn run(self, state: Self::State) -> Self::Future;
/// Generate a Span that the job will be processed within
fn span(&self) -> Option<Span> {
None
}
/// If this job should not use it's default queue, this can be overridden in /// If this job should not use it's default queue, this can be overridden in
/// user-code. /// user-code.
fn queue(&self) -> &str { fn queue(&self) -> &str {
@ -125,6 +130,10 @@ where
}) })
} }
fn span(&self) -> Option<Span> {
ActixJob::span(self)
}
fn queue(&self) -> &str { fn queue(&self) -> &str {
ActixJob::queue(self) ActixJob::queue(self)
} }

View file

@ -4,6 +4,8 @@ use chrono::{offset::Utc, DateTime};
use serde::{de::DeserializeOwned, ser::Serialize}; use serde::{de::DeserializeOwned, ser::Serialize};
use serde_json::Value; use serde_json::Value;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
use tracing::Span;
use tracing_futures::Instrument;
/// The Job trait defines parameters pertaining to an instance of background job /// The Job trait defines parameters pertaining to an instance of background job
/// ///
@ -95,6 +97,11 @@ pub trait Job: Serialize + DeserializeOwned + 'static {
/// an actor in an actix-based system. /// an actor in an actix-based system.
fn run(self, state: Self::State) -> Self::Future; fn run(self, state: Self::State) -> Self::Future;
/// Generate a Span that the job will be processed within
fn span(&self) -> Option<Span> {
None
}
/// If this job should not use it's default queue, this can be overridden in /// If this job should not use it's default queue, this can be overridden in
/// user-code. /// user-code.
fn queue(&self) -> &str { fn queue(&self) -> &str {
@ -159,10 +166,24 @@ pub fn process<J>(
where where
J: Job, J: Job,
{ {
let res = serde_json::from_value::<J>(args).map(move |job| job.run(state)); let res = serde_json::from_value::<J>(args).map(move |job| {
if let Some(span) = job.span() {
let fut = span.in_scope(move || job.run(state));
(fut, Some(span))
} else {
(job.run(state), None)
}
});
Box::pin(async move { Box::pin(async move {
res?.await?; let (fut, span) = res?;
if let Some(span) = span {
fut.instrument(span).await?;
} else {
fut.await?;
}
Ok(()) Ok(())
}) })