From 68a80085dedbe01d2952adfef9c810661e4aa513 Mon Sep 17 00:00:00 2001 From: "Aode (lion)" Date: Fri, 17 Sep 2021 17:09:55 -0500 Subject: [PATCH] Remove lingering .enter calls --- examples/actix-example/src/main.rs | 2 +- jobs-actix/src/worker.rs | 1 - jobs-core/src/actix_job.rs | 21 ++++---- jobs-core/src/processor_map.rs | 83 +++++++++++++++--------------- 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/examples/actix-example/src/main.rs b/examples/actix-example/src/main.rs index fe283ad..adb8d7d 100644 --- a/examples/actix-example/src/main.rs +++ b/examples/actix-example/src/main.rs @@ -119,6 +119,6 @@ impl Job for PanickingJob { const MAX_RETRIES: MaxRetries = MaxRetries::Count(0); fn run(self, _: MyState) -> Self::Future { - panic!("A panicking job does not stop others from running") + panic!("boom") } } diff --git a/jobs-actix/src/worker.rs b/jobs-actix/src/worker.rs index 755edbe..f47b168 100644 --- a/jobs-actix/src/worker.rs +++ b/jobs-actix/src/worker.rs @@ -53,7 +53,6 @@ pub(crate) fn local_worker( let id = Uuid::new_v4(); let span = tracing::info_span!( - parent: None, "Worker", worker.id = tracing::field::display(&id), worker.queue = tracing::field::display(&queue), diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/actix_job.rs index e0101b3..56e4f8b 100644 --- a/jobs-core/src/actix_job.rs +++ b/jobs-core/src/actix_job.rs @@ -107,20 +107,17 @@ where fn run(self, state: Self::State) -> Self::Future { let (tx, rx) = oneshot::channel(); - let span = Span::current(); - let handle = actix_rt::spawn(async move { - let entered = span.enter(); - let fut = ActixJob::run(self, state); - drop(entered); + let fut = ActixJob::run(self, state); + let handle = actix_rt::spawn( + async move { + let result = fut.await; - let result = fut.instrument(span.clone()).await; - - if tx.send(result).is_err() { - let entered = span.enter(); - error!("Job dropped"); - drop(entered); + if tx.send(result).is_err() { + error!("Job dropped"); + } } - }); + .instrument(Span::current()), + ); Box::pin(async move { handle.await.unwrap(); diff --git a/jobs-core/src/processor_map.rs b/jobs-core/src/processor_map.rs index db6674e..841effd 100644 --- a/jobs-core/src/processor_map.rs +++ b/jobs-core/src/processor_map.rs @@ -79,31 +79,33 @@ where /// intended for internal use. pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { let span = job_span(&job); - let opt = self.inner.get(job.name()).map(|name| { - let entered = span.enter(); - let fut = process(Arc::clone(name), (self.state_fn)(), job.clone()); - drop(entered); - fut - }); - let res = if let Some(fut) = opt { - fut.instrument(span.clone()).await - } else { - span.record( - "exception.message", - &tracing::field::display("Not registered"), - ); - span.record( - "exception.details", - &tracing::field::display("Not registered"), - ); - let entered = span.enter(); - error!("Not registered"); - drop(entered); - ReturnJobInfo::unregistered(job.id()) + let fut = async move { + let opt = self + .inner + .get(job.name()) + .map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone())); + + let res = if let Some(fut) = opt { + fut.await + } else { + let span = Span::current(); + span.record( + "exception.message", + &tracing::field::display("Not registered"), + ); + span.record( + "exception.details", + &tracing::field::display("Not registered"), + ); + error!("Not registered"); + ReturnJobInfo::unregistered(job.id()) + }; + + res }; - res + fut.instrument(span).await } } @@ -118,28 +120,27 @@ where pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { let span = job_span(&job); - let res = if let Some(name) = self.inner.get(job.name()) { - let entered = span.enter(); - let fut = process(Arc::clone(name), self.state.clone(), job); - drop(entered); + let fut = async move { + let res = if let Some(name) = self.inner.get(job.name()) { + process(Arc::clone(name), self.state.clone(), job).await + } else { + let span = Span::current(); + span.record( + "exception.message", + &tracing::field::display("Not registered"), + ); + span.record( + "exception.details", + &tracing::field::display("Not registered"), + ); + error!("Not registered"); + ReturnJobInfo::unregistered(job.id()) + }; - fut.instrument(span.clone()).await - } else { - let entered = span.enter(); - span.record( - "exception.message", - &tracing::field::display("Not registered"), - ); - span.record( - "exception.details", - &tracing::field::display("Not registered"), - ); - error!("Not registered"); - drop(entered); - ReturnJobInfo::unregistered(job.id()) + res }; - res + fut.instrument(span).await } }