mirror of
https://git.asonix.dog/asonix/background-jobs.git
synced 2024-11-22 03:51:00 +00:00
actix: Improve tracing
This commit is contained in:
parent
1a835863d4
commit
ee91b27933
4 changed files with 15 additions and 13 deletions
|
@ -12,7 +12,6 @@ edition = "2021"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-rt = "2.5.1"
|
actix-rt = "2.5.1"
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
async-mutex = "1.0.1"
|
|
||||||
async-trait = "0.1.24"
|
async-trait = "0.1.24"
|
||||||
background-jobs-core = { version = "0.14.0", path = "../jobs-core", features = [
|
background-jobs-core = { version = "0.14.0", path = "../jobs-core", features = [
|
||||||
"with-actix",
|
"with-actix",
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use crate::{Job, QueueHandle};
|
use crate::{Job, QueueHandle};
|
||||||
use actix_rt::time::{interval_at, Instant};
|
use actix_rt::time::{interval_at, Instant};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
/// A type used to schedule recurring jobs.
|
/// A type used to schedule recurring jobs.
|
||||||
///
|
///
|
||||||
|
@ -20,7 +19,7 @@ where
|
||||||
|
|
||||||
let job = job.clone();
|
let job = job.clone();
|
||||||
if spawner.queue::<J>(job).await.is_err() {
|
if spawner.queue::<J>(job).await.is_err() {
|
||||||
error!("Failed to queue job: {}", J::NAME);
|
tracing::error!("Failed to queue job: {}", J::NAME);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ use crate::storage::{ActixStorage, StorageWrapper};
|
||||||
use anyhow::Error;
|
use anyhow::Error;
|
||||||
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage};
|
use background_jobs_core::{JobInfo, NewJobInfo, ReturnJobInfo, Storage};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tracing::trace;
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// The server Actor
|
/// The server Actor
|
||||||
|
@ -34,7 +33,7 @@ impl Server {
|
||||||
worker_id: Uuid,
|
worker_id: Uuid,
|
||||||
worker_queue: &str,
|
worker_queue: &str,
|
||||||
) -> Result<JobInfo, Error> {
|
) -> Result<JobInfo, Error> {
|
||||||
trace!("Worker {} requested job", worker_id);
|
tracing::trace!("Worker {} requested job", worker_id);
|
||||||
self.storage.request_job(worker_queue, worker_id).await
|
self.storage.request_job(worker_queue, worker_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,16 +34,16 @@ impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct LogOnDrop<F>(F)
|
struct RunOnDrop<F>(F)
|
||||||
where
|
where
|
||||||
F: Fn() -> Span;
|
F: Fn();
|
||||||
|
|
||||||
impl<F> Drop for LogOnDrop<F>
|
impl<F> Drop for RunOnDrop<F>
|
||||||
where
|
where
|
||||||
F: Fn() -> Span,
|
F: Fn(),
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
(self.0)().in_scope(|| tracing::info!("Worker closing"));
|
(self.0)();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,7 +94,9 @@ pub(crate) async fn local_worker<State, Extras>(
|
||||||
|
|
||||||
let id = Uuid::new_v4();
|
let id = Uuid::new_v4();
|
||||||
|
|
||||||
let log_on_drop = LogOnDrop(|| make_span(id, &queue, "closing"));
|
let log_on_drop = RunOnDrop(|| {
|
||||||
|
make_span(id, &queue, "closing").in_scope(|| tracing::warn!("Worker closing"));
|
||||||
|
});
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let request_span = make_span(id, &queue, "request");
|
let request_span = make_span(id, &queue, "request");
|
||||||
|
@ -119,9 +121,11 @@ pub(crate) async fn local_worker<State, Extras>(
|
||||||
};
|
};
|
||||||
drop(request_span);
|
drop(request_span);
|
||||||
|
|
||||||
|
let process_span = make_span(id, &queue, "process");
|
||||||
let job_id = job.id();
|
let job_id = job.id();
|
||||||
let return_job = time_job(Box::pin(processors.process(job)), job_id)
|
let return_job = process_span
|
||||||
.instrument(make_span(id, &queue, "process"))
|
.in_scope(|| time_job(Box::pin(processors.process(job)), job_id))
|
||||||
|
.instrument(process_span)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let return_span = make_span(id, &queue, "return");
|
let return_span = make_span(id, &queue, "return");
|
||||||
|
@ -147,6 +151,7 @@ pub(crate) async fn local_worker<State, Extras>(
|
||||||
|
|
||||||
fn make_span(id: Uuid, queue: &str, operation: &str) -> Span {
|
fn make_span(id: Uuid, queue: &str, operation: &str) -> Span {
|
||||||
tracing::info_span!(
|
tracing::info_span!(
|
||||||
|
parent: None,
|
||||||
"Worker",
|
"Worker",
|
||||||
worker.id = tracing::field::display(id),
|
worker.id = tracing::field::display(id),
|
||||||
worker.queue = tracing::field::display(queue),
|
worker.queue = tracing::field::display(queue),
|
||||||
|
|
Loading…
Reference in a new issue