Remove lingering .enter calls

This commit is contained in:
Aode (lion) 2021-09-17 17:09:55 -05:00
parent ba60a5cbe0
commit 68a80085de
4 changed files with 52 additions and 55 deletions

View file

@ -119,6 +119,6 @@ impl Job for PanickingJob {
const MAX_RETRIES: MaxRetries = MaxRetries::Count(0); const MAX_RETRIES: MaxRetries = MaxRetries::Count(0);
fn run(self, _: MyState) -> Self::Future { fn run(self, _: MyState) -> Self::Future {
panic!("A panicking job does not stop others from running") panic!("boom")
} }
} }

View file

@ -53,7 +53,6 @@ pub(crate) fn local_worker<State>(
let id = Uuid::new_v4(); let id = Uuid::new_v4();
let span = tracing::info_span!( let span = tracing::info_span!(
parent: None,
"Worker", "Worker",
worker.id = tracing::field::display(&id), worker.id = tracing::field::display(&id),
worker.queue = tracing::field::display(&queue), worker.queue = tracing::field::display(&queue),

View file

@ -107,20 +107,17 @@ where
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let span = Span::current();
let handle = actix_rt::spawn(async move {
let entered = span.enter();
let fut = ActixJob::run(self, state); let fut = ActixJob::run(self, state);
drop(entered); let handle = actix_rt::spawn(
async move {
let result = fut.instrument(span.clone()).await; let result = fut.await;
if tx.send(result).is_err() { if tx.send(result).is_err() {
let entered = span.enter();
error!("Job dropped"); error!("Job dropped");
drop(entered);
} }
}); }
.instrument(Span::current()),
);
Box::pin(async move { Box::pin(async move {
handle.await.unwrap(); handle.await.unwrap();

View file

@ -79,16 +79,17 @@ where
/// intended for internal use. /// intended for internal use.
pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { pub async fn process(&self, job: JobInfo) -> ReturnJobInfo {
let span = job_span(&job); let span = job_span(&job);
let opt = self.inner.get(job.name()).map(|name| {
let entered = span.enter(); let fut = async move {
let fut = process(Arc::clone(name), (self.state_fn)(), job.clone()); let opt = self
drop(entered); .inner
fut .get(job.name())
}); .map(|name| process(Arc::clone(name), (self.state_fn)(), job.clone()));
let res = if let Some(fut) = opt { let res = if let Some(fut) = opt {
fut.instrument(span.clone()).await fut.await
} else { } else {
let span = Span::current();
span.record( span.record(
"exception.message", "exception.message",
&tracing::field::display("Not registered"), &tracing::field::display("Not registered"),
@ -97,13 +98,14 @@ where
"exception.details", "exception.details",
&tracing::field::display("Not registered"), &tracing::field::display("Not registered"),
); );
let entered = span.enter();
error!("Not registered"); error!("Not registered");
drop(entered);
ReturnJobInfo::unregistered(job.id()) ReturnJobInfo::unregistered(job.id())
}; };
res res
};
fut.instrument(span).await
} }
} }
@ -118,14 +120,11 @@ where
pub async fn process(&self, job: JobInfo) -> ReturnJobInfo { pub async fn process(&self, job: JobInfo) -> ReturnJobInfo {
let span = job_span(&job); let span = job_span(&job);
let fut = async move {
let res = if let Some(name) = self.inner.get(job.name()) { let res = if let Some(name) = self.inner.get(job.name()) {
let entered = span.enter(); process(Arc::clone(name), self.state.clone(), job).await
let fut = process(Arc::clone(name), self.state.clone(), job);
drop(entered);
fut.instrument(span.clone()).await
} else { } else {
let entered = span.enter(); let span = Span::current();
span.record( span.record(
"exception.message", "exception.message",
&tracing::field::display("Not registered"), &tracing::field::display("Not registered"),
@ -135,11 +134,13 @@ where
&tracing::field::display("Not registered"), &tracing::field::display("Not registered"),
); );
error!("Not registered"); error!("Not registered");
drop(entered);
ReturnJobInfo::unregistered(job.id()) ReturnJobInfo::unregistered(job.id())
}; };
res res
};
fut.instrument(span).await
} }
} }