Use JoinHandle to transfer result

This commit is contained in:
Aode (Lion) 2021-10-11 18:49:39 -05:00
parent 461c9e5ed2
commit e6ecf3aa67
2 changed files with 30 additions and 24 deletions

View file

@ -10,13 +10,13 @@ readme = "../README.md"
edition = "2018" edition = "2018"
[features] [features]
default = [] default = ["error-logging"]
with-actix = ["actix-rt", "tokio"] with-actix = ["actix-rt"]
completion-logging = [] completion-logging = []
error-logging = [] error-logging = []
[dependencies] [dependencies]
actix-rt = { version = "2.2.0", optional = true } actix-rt = { version = "2.3.0", optional = true }
anyhow = "1.0" anyhow = "1.0"
async-mutex = "1.0.1" async-mutex = "1.0.1"
async-trait = "0.1.24" async-trait = "0.1.24"
@ -26,5 +26,4 @@ tracing-futures = "0.2.5"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", optional = true, default-features = false, features = ["sync"] }
uuid = { version = "0.8.1", features = ["serde", "v4"] } uuid = { version = "0.8.1", features = ["serde", "v4"] }

View file

@ -1,9 +1,14 @@
use crate::{Backoff, Job, MaxRetries}; use crate::{Backoff, Job, MaxRetries};
use actix_rt::task::JoinHandle;
use anyhow::Error; use anyhow::Error;
use serde::{de::DeserializeOwned, ser::Serialize}; use serde::{de::DeserializeOwned, ser::Serialize};
use std::{future::Future, pin::Pin}; use std::{
use tokio::sync::oneshot; fmt::Debug,
use tracing::{error, Span}; future::Future,
pin::Pin,
task::{Context, Poll},
};
use tracing::Span;
use tracing_futures::Instrument; use tracing_futures::Instrument;
/// The ActixJob trait defines parameters pertaining to an instance of background job /// The ActixJob trait defines parameters pertaining to an instance of background job
@ -96,12 +101,27 @@ pub trait ActixJob: Serialize + DeserializeOwned + 'static {
} }
} }
#[doc(hidden)]
pub struct UnwrapFuture<F>(F);
impl<F, T, E> Future for UnwrapFuture<F>
where
F: Future<Output = Result<T, E>> + Unpin,
E: Debug,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx).map(|res| res.unwrap())
}
}
impl<T> Job for T impl<T> Job for T
where where
T: ActixJob + std::panic::UnwindSafe, T: ActixJob + std::panic::UnwindSafe,
{ {
type State = T::State; type State = T::State;
type Future = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>; type Future = UnwrapFuture<JoinHandle<Result<(), Error>>>;
const NAME: &'static str = <Self as ActixJob>::NAME; const NAME: &'static str = <Self as ActixJob>::NAME;
const QUEUE: &'static str = <Self as ActixJob>::QUEUE; const QUEUE: &'static str = <Self as ActixJob>::QUEUE;
@ -110,24 +130,11 @@ where
const TIMEOUT: i64 = <Self as ActixJob>::TIMEOUT; const TIMEOUT: i64 = <Self as ActixJob>::TIMEOUT;
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
let (tx, rx) = oneshot::channel();
let fut = ActixJob::run(self, state); let fut = ActixJob::run(self, state);
let handle = actix_rt::spawn( let instrumented = fut.instrument(Span::current());
async move { let handle = actix_rt::spawn(instrumented);
let result = fut.await;
if tx.send(result).is_err() { UnwrapFuture(handle)
error!("Job dropped");
}
}
.instrument(Span::current()),
);
Box::pin(async move {
handle.await.unwrap();
rx.await?
})
} }
fn span(&self) -> Option<Span> { fn span(&self) -> Option<Span> {