diff --git a/jobs-core/Cargo.toml b/jobs-core/Cargo.toml index d107691..be58750 100644 --- a/jobs-core/Cargo.toml +++ b/jobs-core/Cargo.toml @@ -10,13 +10,13 @@ readme = "../README.md" edition = "2018" [features] -default = [] -with-actix = ["actix-rt", "tokio"] +default = ["error-logging"] +with-actix = ["actix-rt"] completion-logging = [] error-logging = [] [dependencies] -actix-rt = { version = "2.2.0", optional = true } +actix-rt = { version = "2.3.0", optional = true } anyhow = "1.0" async-mutex = "1.0.1" async-trait = "0.1.24" @@ -26,5 +26,4 @@ tracing-futures = "0.2.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1", optional = true, default-features = false, features = ["sync"] } uuid = { version = "0.8.1", features = ["serde", "v4"] } diff --git a/jobs-core/src/actix_job.rs b/jobs-core/src/actix_job.rs index 70e6aac..efd3cc6 100644 --- a/jobs-core/src/actix_job.rs +++ b/jobs-core/src/actix_job.rs @@ -1,9 +1,14 @@ use crate::{Backoff, Job, MaxRetries}; +use actix_rt::task::JoinHandle; use anyhow::Error; use serde::{de::DeserializeOwned, ser::Serialize}; -use std::{future::Future, pin::Pin}; -use tokio::sync::oneshot; -use tracing::{error, Span}; +use std::{ + fmt::Debug, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tracing::Span; use tracing_futures::Instrument; /// The ActixJob trait defines parameters pertaining to an instance of background job @@ -96,12 +101,27 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static { } } +#[doc(hidden)] +pub struct UnwrapFuture(F); + +impl Future for UnwrapFuture +where + F: Future> + Unpin, + E: Debug, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx).map(|res| res.unwrap()) + } +} + impl Job for T where T: ActixJob + std::panic::UnwindSafe, { type State = T::State; - type Future = Pin> + Send>>; + type Future = UnwrapFuture>>; const NAME: &'static str = ::NAME; const QUEUE: &'static str = ::QUEUE; @@ -110,24 +130,11 @@ where const TIMEOUT: i64 = ::TIMEOUT; fn run(self, state: Self::State) -> Self::Future { - let (tx, rx) = oneshot::channel(); - let fut = ActixJob::run(self, state); - let handle = actix_rt::spawn( - async move { - let result = fut.await; + let instrumented = fut.instrument(Span::current()); + let handle = actix_rt::spawn(instrumented); - if tx.send(result).is_err() { - error!("Job dropped"); - } - } - .instrument(Span::current()), - ); - - Box::pin(async move { - handle.await.unwrap(); - rx.await? - }) + UnwrapFuture(handle) } fn span(&self) -> Option {