diff --git a/src/types/either.rs b/src/types/either.rs index 210495e47..d3b003587 100644 --- a/src/types/either.rs +++ b/src/types/either.rs @@ -1,8 +1,14 @@ //! For either helper, see [`Either`]. +use std::{ + future::Future, + mem, + pin::Pin, + task::{Context, Poll}, +}; + use bytes::Bytes; -use futures_core::future::LocalBoxFuture; -use futures_util::{FutureExt as _, TryFutureExt as _}; +use futures_core::ready; use crate::{ dev, @@ -180,41 +186,111 @@ where R: FromRequest + 'static, { type Error = EitherExtractError; - type Future = LocalBoxFuture<'static, Result>; + type Future = EitherExtractFut; type Config = (); fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future { - let req2 = req.clone(); - - Bytes::from_request(req, payload) - .map_err(EitherExtractError::Bytes) - .and_then(|bytes| bytes_to_l_or_r(req2, bytes)) - .boxed_local() + EitherExtractFut { + req: req.clone(), + state: EitherExtractState::Bytes { + bytes: Bytes::from_request(req, payload), + }, + } } } -async fn bytes_to_l_or_r( - req: HttpRequest, - bytes: Bytes, -) -> Result, EitherExtractError> +#[pin_project::pin_project] +pub struct EitherExtractFut where - L: FromRequest + 'static, - R: FromRequest + 'static, + R: FromRequest, + L: FromRequest, { - let fallback = bytes.clone(); - let a_err; + req: HttpRequest, + #[pin] + state: EitherExtractState, +} - let mut pl = payload_from_bytes(bytes); - match L::from_request(&req, &mut pl).await { - Ok(a_data) => return Ok(Either::Left(a_data)), - // store A's error for returning if B also fails - Err(err) => a_err = err, - }; +#[pin_project::pin_project(project = EitherExtractProj)] +pub enum EitherExtractState +where + L: FromRequest, + R: FromRequest, +{ + Bytes { + #[pin] + bytes: ::Future, + }, + Left { + #[pin] + left: L::Future, + fallback: Bytes, + }, + Right { + #[pin] + right: R::Future, + left_err: Option, + }, +} - let mut pl = payload_from_bytes(fallback); - match R::from_request(&req, &mut pl).await { - Ok(b_data) => return Ok(Either::Right(b_data)), - Err(b_err) => Err(EitherExtractError::Extract(a_err, b_err)), +impl Future for EitherExtractFut +where + L: FromRequest, + R: FromRequest, + LF: Future> + 'static, + RF: Future> + 'static, + LE: Into, + RE: Into, +{ + type Output = Result, EitherExtractError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let ready = loop { + let next = match this.state.as_mut().project() { + EitherExtractProj::Bytes { bytes } => { + let res = ready!(bytes.poll(cx)); + match res { + Ok(bytes) => { + let fallback = bytes.clone(); + let left = + L::from_request(&this.req, &mut payload_from_bytes(bytes)); + EitherExtractState::Left { left, fallback } + } + Err(err) => break Err(EitherExtractError::Bytes(err)), + } + } + EitherExtractProj::Left { left, fallback } => { + let res = ready!(left.poll(cx)); + match res { + Ok(extracted) => break Ok(Either::Left(extracted)), + Err(left_err) => { + let right = R::from_request( + &this.req, + &mut payload_from_bytes(mem::take(fallback)), + ); + EitherExtractState::Right { + left_err: Some(left_err), + right, + } + } + } + } + EitherExtractProj::Right { right, left_err } => { + let res = ready!(right.poll(cx)); + match res { + Ok(data) => break Ok(Either::Right(data)), + Err(err) => { + break Err(EitherExtractError::Extract( + left_err.take().unwrap(), + err, + )); + } + } + } + }; + this.state.set(next); + }; + Poll::Ready(ready) } }