diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/actix_job.rs index 56e4f8b..70e6aac 100644 --- a/jobs-core/src/actix_job.rs +++ b/jobs-core/src/actix_job.rs @@ -63,6 +63,11 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static { /// an actor in an actix-based system. fn run(self, state: Self::State) -> Self::Future; + /// Generate a Span that the job will be processed within + fn span(&self) -> Option { + None + } + /// If this job should not use it's default queue, this can be overridden in /// user-code. fn queue(&self) -> &str { @@ -125,6 +130,10 @@ where }) } + fn span(&self) -> Option { + ActixJob::span(self) + } + fn queue(&self) -> &str { ActixJob::queue(self) } diff --git a/jobs-core/src/job.rs b/jobs-core/src/job.rs index 1021f5f..da2b0de 100644 --- a/jobs-core/src/job.rs +++ b/jobs-core/src/job.rs @@ -4,6 +4,8 @@ use chrono::{offset::Utc, DateTime}; use serde::{de::DeserializeOwned, ser::Serialize}; use serde_json::Value; use std::{future::Future, pin::Pin}; +use tracing::Span; +use tracing_futures::Instrument; /// The Job trait defines parameters pertaining to an instance of background job /// @@ -95,6 +97,11 @@ pub trait Job: Serialize + DeserializeOwned + 'static { /// an actor in an actix-based system. fn run(self, state: Self::State) -> Self::Future; + /// Generate a Span that the job will be processed within + fn span(&self) -> Option { + None + } + /// If this job should not use it's default queue, this can be overridden in /// user-code. fn queue(&self) -> &str { @@ -159,10 +166,24 @@ pub fn process( where J: Job, { - let res = serde_json::from_value::(args).map(move |job| job.run(state)); + let res = serde_json::from_value::(args).map(move |job| { + if let Some(span) = job.span() { + let fut = span.in_scope(move || job.run(state)); + + (fut, Some(span)) + } else { + (job.run(state), None) + } + }); Box::pin(async move { - res?.await?; + let (fut, span) = res?; + + if let Some(span) = span { + fut.instrument(span).await?; + } else { + fut.await?; + } Ok(()) })