jobs-actix: spawn with task names

This commit is contained in:
asonix 2024-01-10 21:11:45 -06:00
parent e663dbe62e
commit 495977b8d8
7 changed files with 61 additions and 30 deletions

View file

@ -23,5 +23,6 @@ tokio = { version = "1", default-features = false, features = [
"macros",
"rt",
"sync",
"tracing",
] }
uuid = { version = "1", features = ["v7", "serde"] }

View file

@ -1,13 +1,14 @@
use std::future::Future;
use background_jobs_core::{JoinError, UnsendSpawner};
use tokio::task::JoinHandle;
/// Provide a spawner for actix-based systems for Unsend Jobs
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActixSpawner;
#[doc(hidden)]
pub struct ActixHandle<T>(actix_rt::task::JoinHandle<T>);
pub struct ActixHandle<T>(Option<JoinHandle<T>>);
impl UnsendSpawner for ActixSpawner {
type Handle<T> = ActixHandle<T> where T: Send;
@ -17,7 +18,7 @@ impl UnsendSpawner for ActixSpawner {
Fut: Future + 'static,
Fut::Output: Send + 'static,
{
ActixHandle(actix_rt::spawn(future))
ActixHandle(crate::spawn::spawn("job-task", future).ok())
}
}
@ -30,14 +31,19 @@ impl<T> Future for ActixHandle<T> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let res = std::task::ready!(std::pin::Pin::new(&mut self.0).poll(cx));
if let Some(mut handle) = self.0.as_mut() {
let res = std::task::ready!(std::pin::Pin::new(&mut handle).poll(cx));
std::task::Poll::Ready(res.map_err(|_| JoinError))
} else {
std::task::Poll::Ready(Err(JoinError))
}
}
}
impl<T> Drop for ActixHandle<T> {
fn drop(&mut self) {
self.0.abort();
if let Some(handle) = &self.0 {
handle.abort();
}
}
}

View file

@ -1,7 +1,7 @@
use crate::QueueHandle;
use actix_rt::time::{interval_at, Instant};
use background_jobs_core::Job;
use std::time::Duration;
use tokio::time::{interval_at, Instant};
/// A type used to schedule recurring jobs.
///

View file

@ -131,6 +131,7 @@ use tokio::sync::Notify;
mod actix_job;
mod every;
mod server;
mod spawn;
mod storage;
mod worker;
@ -148,9 +149,7 @@ impl Timer for ActixTimer {
where
F: std::future::Future + Send + Sync,
{
actix_rt::time::timeout(duration, future)
.await
.map_err(|_| ())
tokio::time::timeout(duration, future).await.map_err(|_| ())
}
}
@ -444,12 +443,12 @@ where
let extras_2 = extras.clone();
arbiter.spawn_fn(move || {
actix_rt::spawn(worker::local_worker(
queue,
processors.cached(),
server,
extras_2,
));
if let Err(e) = spawn::spawn(
"local-worker",
worker::local_worker(queue, processors.cached(), server, extras_2),
) {
tracing::error!("Failed to spawn worker {e}");
}
});
}
}
@ -496,10 +495,10 @@ impl QueueHandle {
///
/// This job will be added to it's queue on the server once every `Duration`. It will be
/// processed whenever workers are free to do so.
pub fn every<J>(&self, duration: Duration, job: J)
pub fn every<J>(&self, duration: Duration, job: J) -> std::io::Result<()>
where
J: Job + Clone + Send + 'static,
{
actix_rt::spawn(every(self.clone(), duration, job));
spawn::spawn("every", every(self.clone(), duration, job)).map(|_| ())
}
}

19
jobs-actix/src/spawn.rs Normal file
View file

@ -0,0 +1,19 @@
use std::future::Future;
use tokio::task::JoinHandle;
#[cfg(tokio_unstable)]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
F: Future + 'static,
{
tokio::task::Builder::new().name(name).spawn_local(future)
}
#[cfg(not(tokio_unstable))]
pub(crate) fn spawn<F>(name: &str, future: F) -> std::io::Result<JoinHandle<F::Output>>
where
F: Future + 'static,
{
Ok(tokio::task::spawn_local(future))
}

View file

@ -24,14 +24,21 @@ impl<State: Clone + 'static, Extras: 'static> Drop for LocalWorkerStarter<State,
let extras = self.extras.take().unwrap();
if let Ok(true) = res {
actix_rt::spawn(local_worker(
if let Err(e) = crate::spawn::spawn(
"local-worker",
local_worker(
self.queue.clone(),
self.processors.clone(),
self.server.clone(),
extras,
));
),
) {
tracing::error!("Failed to re-spawn local worker: {e}");
} else {
tracing::warn!("Not restarting worker, Arbiter is dead");
metrics::counter!("background-jobs.actix.worker.restart").increment(1);
}
} else {
tracing::info!("Shutting down worker");
drop(extras);
}
}
@ -57,8 +64,7 @@ async fn heartbeat_job<F: Future>(
runner_id: Uuid,
heartbeat_interval: u64,
) -> F::Output {
let mut interval =
actix_rt::time::interval(std::time::Duration::from_millis(heartbeat_interval));
let mut interval = tokio::time::interval(std::time::Duration::from_millis(heartbeat_interval));
let mut future = std::pin::pin!(future);
@ -86,7 +92,7 @@ async fn heartbeat_job<F: Future>(
}
async fn time_job<F: Future>(future: F, job_id: Uuid) -> F::Output {
let mut interval = actix_rt::time::interval(std::time::Duration::from_secs(5));
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
interval.tick().await;
let mut count = 0;
@ -147,7 +153,7 @@ pub(crate) async fn local_worker<State, Extras>(
let id = Uuid::now_v7();
let log_on_drop = RunOnDrop(|| {
make_span(id, &queue, "closing").in_scope(|| tracing::warn!("Worker closing"));
make_span(id, &queue, "closing").in_scope(|| tracing::info!("Worker closing"));
});
loop {

View file

@ -20,6 +20,6 @@ serde = { version = "1", features = ["derive"] }
serde_cbor = "0.11"
time = { version = "0.3", features = ["serde-human-readable"] }
thiserror = "1.0"
tokio = { version = "1", default-features = false, features = ["rt", "sync"] }
tokio = { version = "1", default-features = false, features = ["rt", "sync", "tracing"] }
tracing = "0.1"
uuid = { version = "1", features = ["v7", "serde"] }