diff --git a/Cargo.toml b/Cargo.toml index 6c6face79..6c0f0bc8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ members = [ "actix-framed", "actix-session", "actix-identity", - #"actix-multipart", + "actix-multipart", "actix-web-actors", "actix-web-codegen", "test-server", @@ -125,9 +125,9 @@ actix-http = { path = "actix-http" } actix-http-test = { path = "test-server" } actix-web-codegen = { path = "actix-web-codegen" } # actix-web-actors = { path = "actix-web-actors" } -# actix-session = { path = "actix-session" } +actix-session = { path = "actix-session" } actix-files = { path = "actix-files" } -# actix-multipart = { path = "actix-multipart" } +actix-multipart = { path = "actix-multipart" } awc = { path = "awc" } actix-codec = { git = "https://github.com/actix/actix-net.git" } diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index aa4e9be20..f5cdc8afd 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -18,17 +18,18 @@ name = "actix_multipart" path = "src/lib.rs" [dependencies] -actix-web = { version = "1.0.9", default-features = false } -actix-service = "0.4.2" +actix-web = { version = "2.0.0-alpha.1", default-features = false } +actix-service = "1.0.0-alpha.1" +actix-utils = "0.5.0-alpha.1" bytes = "0.4" derive_more = "0.15.0" httparse = "1.3" -futures = "0.1.24" +futures = "0.3.1" log = "0.4" mime = "0.3" time = "0.1" twoway = "0.2" [dev-dependencies] -actix-rt = "0.2.2" -actix-http = "0.2.11" \ No newline at end of file +actix-rt = "1.0.0-alpha.1" +actix-http = "0.3.0-alpha.1" \ No newline at end of file diff --git a/actix-multipart/src/extractor.rs b/actix-multipart/src/extractor.rs index 7274ed092..71c815227 100644 --- a/actix-multipart/src/extractor.rs +++ b/actix-multipart/src/extractor.rs @@ -1,5 +1,6 @@ //! Multipart payload support use actix_web::{dev::Payload, Error, FromRequest, HttpRequest}; +use futures::future::{ok, Ready}; use crate::server::Multipart; @@ -10,33 +11,31 @@ use crate::server::Multipart; /// ## Server example /// /// ```rust -/// # use futures::{Future, Stream}; -/// # use futures::future::{ok, result, Either}; +/// use futures::{Stream, StreamExt}; /// use actix_web::{web, HttpResponse, Error}; /// use actix_multipart as mp; /// -/// fn index(payload: mp::Multipart) -> impl Future { -/// payload.from_err() // <- get multipart stream for current request -/// .and_then(|field| { // <- iterate over multipart items +/// async fn index(mut payload: mp::Multipart) -> Result { +/// // iterate over multipart stream +/// while let Some(item) = payload.next().await { +/// let mut field = item?; +/// /// // Field in turn is stream of *Bytes* object -/// field.from_err() -/// .fold((), |_, chunk| { -/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk)); -/// Ok::<_, Error>(()) -/// }) -/// }) -/// .fold((), |_, _| Ok::<_, Error>(())) -/// .map(|_| HttpResponse::Ok().into()) +/// while let Some(chunk) = field.next().await { +/// println!("-- CHUNK: \n{:?}", std::str::from_utf8(&chunk?)); +/// } +/// } +/// Ok(HttpResponse::Ok().into()) /// } /// # fn main() {} /// ``` impl FromRequest for Multipart { type Error = Error; - type Future = Result; + type Future = Ready>; type Config = (); #[inline] fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future { - Ok(Multipart::new(req.headers(), payload.take())) + ok(Multipart::new(req.headers(), payload.take())) } } diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index a7c787f46..dd7852c8e 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -1,15 +1,17 @@ //! Multipart payload support use std::cell::{Cell, RefCell, RefMut}; use std::marker::PhantomData; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use std::{cmp, fmt}; use bytes::{Bytes, BytesMut}; -use futures::task::{current as current_task, Task}; -use futures::{Async, Poll, Stream}; +use futures::stream::{LocalBoxStream, Stream, StreamExt}; use httparse; use mime; +use actix_utils::task::LocalWaker; use actix_web::error::{ParseError, PayloadError}; use actix_web::http::header::{ self, ContentDisposition, HeaderMap, HeaderName, HeaderValue, @@ -60,7 +62,7 @@ impl Multipart { /// Create multipart instance for boundary. pub fn new(headers: &HeaderMap, stream: S) -> Multipart where - S: Stream + 'static, + S: Stream> + Unpin + 'static, { match Self::boundary(headers) { Ok(boundary) => Multipart { @@ -104,22 +106,25 @@ impl Multipart { } impl Stream for Multipart { - type Item = Field; - type Error = MultipartError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { if let Some(err) = self.error.take() { - Err(err) + Poll::Ready(Some(Err(err))) } else if self.safety.current() { - let mut inner = self.inner.as_mut().unwrap().borrow_mut(); - if let Some(mut payload) = inner.payload.get_mut(&self.safety) { - payload.poll_stream()?; + let this = self.get_mut(); + let mut inner = this.inner.as_mut().unwrap().borrow_mut(); + if let Some(mut payload) = inner.payload.get_mut(&this.safety) { + payload.poll_stream(cx)?; } - inner.poll(&self.safety) + inner.poll(&this.safety, cx) } else if !self.safety.is_clean() { - Err(MultipartError::NotConsumed) + Poll::Ready(Some(Err(MultipartError::NotConsumed))) } else { - Ok(Async::NotReady) + Poll::Pending } } } @@ -238,9 +243,13 @@ impl InnerMultipart { Ok(Some(eof)) } - fn poll(&mut self, safety: &Safety) -> Poll, MultipartError> { + fn poll( + &mut self, + safety: &Safety, + cx: &mut Context, + ) -> Poll>> { if self.state == InnerState::Eof { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { // release field loop { @@ -249,10 +258,13 @@ impl InnerMultipart { if safety.current() { let stop = match self.item { InnerMultipartItem::Field(ref mut field) => { - match field.borrow_mut().poll(safety)? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(Some(_)) => continue, - Async::Ready(None) => true, + match field.borrow_mut().poll(safety) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(Ok(_))) => continue, + Poll::Ready(Some(Err(e))) => { + return Poll::Ready(Some(Err(e))) + } + Poll::Ready(None) => true, } } InnerMultipartItem::None => false, @@ -277,12 +289,12 @@ impl InnerMultipart { Some(eof) => { if eof { self.state = InnerState::Eof; - return Ok(Async::Ready(None)); + return Poll::Ready(None); } else { self.state = InnerState::Headers; } } - None => return Ok(Async::NotReady), + None => return Poll::Pending, } } // read boundary @@ -291,11 +303,11 @@ impl InnerMultipart { &mut *payload, &self.boundary, )? { - None => return Ok(Async::NotReady), + None => return Poll::Pending, Some(eof) => { if eof { self.state = InnerState::Eof; - return Ok(Async::Ready(None)); + return Poll::Ready(None); } else { self.state = InnerState::Headers; } @@ -311,14 +323,14 @@ impl InnerMultipart { self.state = InnerState::Boundary; headers } else { - return Ok(Async::NotReady); + return Poll::Pending; } } else { unreachable!() } } else { log::debug!("NotReady: field is in flight"); - return Ok(Async::NotReady); + return Poll::Pending; }; // content type @@ -335,7 +347,7 @@ impl InnerMultipart { // nested multipart stream if mt.type_() == mime::MULTIPART { - Err(MultipartError::Nested) + Poll::Ready(Some(Err(MultipartError::Nested))) } else { let field = Rc::new(RefCell::new(InnerField::new( self.payload.clone(), @@ -344,12 +356,7 @@ impl InnerMultipart { )?)); self.item = InnerMultipartItem::Field(Rc::clone(&field)); - Ok(Async::Ready(Some(Field::new( - safety.clone(), - headers, - mt, - field, - )))) + Poll::Ready(Some(Ok(Field::new(safety.clone(cx), headers, mt, field)))) } } } @@ -409,23 +416,21 @@ impl Field { } impl Stream for Field { - type Item = Bytes; - type Error = MultipartError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if self.safety.current() { let mut inner = self.inner.borrow_mut(); if let Some(mut payload) = inner.payload.as_ref().unwrap().get_mut(&self.safety) { - payload.poll_stream()?; + payload.poll_stream(cx)?; } - inner.poll(&self.safety) } else if !self.safety.is_clean() { - Err(MultipartError::NotConsumed) + Poll::Ready(Some(Err(MultipartError::NotConsumed))) } else { - Ok(Async::NotReady) + Poll::Pending } } } @@ -482,9 +487,9 @@ impl InnerField { fn read_len( payload: &mut PayloadBuffer, size: &mut u64, - ) -> Poll, MultipartError> { + ) -> Poll>> { if *size == 0 { - Ok(Async::Ready(None)) + Poll::Ready(None) } else { match payload.read_max(*size)? { Some(mut chunk) => { @@ -494,13 +499,13 @@ impl InnerField { if !chunk.is_empty() { payload.unprocessed(chunk); } - Ok(Async::Ready(Some(ch))) + Poll::Ready(Some(Ok(ch))) } None => { if payload.eof && (*size != 0) { - Err(MultipartError::Incomplete) + Poll::Ready(Some(Err(MultipartError::Incomplete))) } else { - Ok(Async::NotReady) + Poll::Pending } } } @@ -512,15 +517,15 @@ impl InnerField { fn read_stream( payload: &mut PayloadBuffer, boundary: &str, - ) -> Poll, MultipartError> { + ) -> Poll>> { let mut pos = 0; let len = payload.buf.len(); if len == 0 { return if payload.eof { - Err(MultipartError::Incomplete) + Poll::Ready(Some(Err(MultipartError::Incomplete))) } else { - Ok(Async::NotReady) + Poll::Pending }; } @@ -537,10 +542,10 @@ impl InnerField { if let Some(b_len) = b_len { let b_size = boundary.len() + b_len; if len < b_size { - return Ok(Async::NotReady); + return Poll::Pending; } else if &payload.buf[b_len..b_size] == boundary.as_bytes() { // found boundary - return Ok(Async::Ready(None)); + return Poll::Ready(None); } } } @@ -552,9 +557,9 @@ impl InnerField { // check if we have enough data for boundary detection if cur + 4 > len { if cur > 0 { - Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) + Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze()))) } else { - Ok(Async::NotReady) + Poll::Pending } } else { // check boundary @@ -565,7 +570,7 @@ impl InnerField { { if cur != 0 { // return buffer - Ok(Async::Ready(Some(payload.buf.split_to(cur).freeze()))) + Poll::Ready(Some(Ok(payload.buf.split_to(cur).freeze()))) } else { pos = cur + 1; continue; @@ -577,49 +582,51 @@ impl InnerField { } } } else { - Ok(Async::Ready(Some(payload.buf.take().freeze()))) + Poll::Ready(Some(Ok(payload.buf.take().freeze()))) }; } } - fn poll(&mut self, s: &Safety) -> Poll, MultipartError> { + fn poll(&mut self, s: &Safety) -> Poll>> { if self.payload.is_none() { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } let result = if let Some(mut payload) = self.payload.as_ref().unwrap().get_mut(s) { if !self.eof { let res = if let Some(ref mut len) = self.length { - InnerField::read_len(&mut *payload, len)? + InnerField::read_len(&mut *payload, len) } else { - InnerField::read_stream(&mut *payload, &self.boundary)? + InnerField::read_stream(&mut *payload, &self.boundary) }; match res { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(Some(bytes)) => return Ok(Async::Ready(Some(bytes))), - Async::Ready(None) => self.eof = true, + Poll::Pending => return Poll::Pending, + Poll::Ready(Some(Ok(bytes))) => return Poll::Ready(Some(Ok(bytes))), + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(None) => self.eof = true, } } - match payload.readline()? { - None => Async::Ready(None), - Some(line) => { + match payload.readline() { + Ok(None) => Poll::Ready(None), + Ok(Some(line)) => { if line.as_ref() != b"\r\n" { log::warn!("multipart field did not read all the data or it is malformed"); } - Async::Ready(None) + Poll::Ready(None) } + Err(e) => Poll::Ready(Some(Err(e))), } } else { - Async::NotReady + Poll::Pending }; - if Async::Ready(None) == result { + if let Poll::Ready(None) = result { self.payload.take(); } - Ok(result) + result } } @@ -659,7 +666,7 @@ impl Clone for PayloadRef { /// most task. #[derive(Debug)] struct Safety { - task: Option, + task: LocalWaker, level: usize, payload: Rc>, clean: Rc>, @@ -669,7 +676,7 @@ impl Safety { fn new() -> Safety { let payload = Rc::new(PhantomData); Safety { - task: None, + task: LocalWaker::new(), level: Rc::strong_count(&payload), clean: Rc::new(Cell::new(true)), payload, @@ -683,17 +690,17 @@ impl Safety { fn is_clean(&self) -> bool { self.clean.get() } -} -impl Clone for Safety { - fn clone(&self) -> Safety { + fn clone(&self, cx: &mut Context) -> Safety { let payload = Rc::clone(&self.payload); - Safety { - task: Some(current_task()), + let s = Safety { + task: LocalWaker::new(), level: Rc::strong_count(&payload), clean: self.clean.clone(), payload, - } + }; + s.task.register(cx.waker()); + s } } @@ -704,7 +711,7 @@ impl Drop for Safety { self.clean.set(true); } if let Some(task) = self.task.take() { - task.notify() + task.wake() } } } @@ -713,31 +720,32 @@ impl Drop for Safety { struct PayloadBuffer { eof: bool, buf: BytesMut, - stream: Box>, + stream: LocalBoxStream<'static, Result>, } impl PayloadBuffer { /// Create new `PayloadBuffer` instance fn new(stream: S) -> Self where - S: Stream + 'static, + S: Stream> + 'static, { PayloadBuffer { eof: false, buf: BytesMut::new(), - stream: Box::new(stream), + stream: stream.boxed_local(), } } - fn poll_stream(&mut self) -> Result<(), PayloadError> { + fn poll_stream(&mut self, cx: &mut Context) -> Result<(), PayloadError> { loop { - match self.stream.poll()? { - Async::Ready(Some(data)) => self.buf.extend_from_slice(&data), - Async::Ready(None) => { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data), + Poll::Ready(Some(Err(e))) => return Err(e), + Poll::Ready(None) => { self.eof = true; return Ok(()); } - Async::NotReady => return Ok(()), + Poll::Pending => return Ok(()), } } } @@ -800,13 +808,14 @@ impl PayloadBuffer { #[cfg(test)] mod tests { - use actix_http::h1::Payload; - use bytes::Bytes; - use futures::unsync::mpsc; - use super::*; + + use actix_http::h1::Payload; + use actix_utils::mpsc; use actix_web::http::header::{DispositionParam, DispositionType}; - use actix_web::test::run_on; + use actix_web::test::block_on; + use bytes::Bytes; + use futures::future::lazy; #[test] fn test_boundary() { @@ -852,12 +861,12 @@ mod tests { } fn create_stream() -> ( - mpsc::UnboundedSender>, - impl Stream, + mpsc::Sender>, + impl Stream>, ) { - let (tx, rx) = mpsc::unbounded(); + let (tx, rx) = mpsc::channel(); - (tx, rx.map_err(|_| panic!()).and_then(|res| res)) + (tx, rx.map(|res| res.map_err(|_| panic!()))) } fn create_simple_request_with_header() -> (Bytes, HeaderMap) { @@ -884,28 +893,28 @@ mod tests { #[test] fn test_multipart_no_end_crlf() { - run_on(|| { + block_on(async { let (sender, payload) = create_stream(); let (bytes, headers) = create_simple_request_with_header(); let bytes_stripped = bytes.slice_to(bytes.len()); // strip crlf - sender.unbounded_send(Ok(bytes_stripped)).unwrap(); + sender.send(Ok(bytes_stripped)).unwrap(); drop(sender); // eof let mut multipart = Multipart::new(&headers, payload); - match multipart.poll().unwrap() { - Async::Ready(Some(_)) => (), + match multipart.next().await.unwrap() { + Ok(_) => (), _ => unreachable!(), } - match multipart.poll().unwrap() { - Async::Ready(Some(_)) => (), + match multipart.next().await.unwrap() { + Ok(_) => (), _ => unreachable!(), } - match multipart.poll().unwrap() { - Async::Ready(None) => (), + match multipart.next().await { + None => (), _ => unreachable!(), } }) @@ -913,15 +922,15 @@ mod tests { #[test] fn test_multipart() { - run_on(|| { + block_on(async { let (sender, payload) = create_stream(); let (bytes, headers) = create_simple_request_with_header(); - sender.unbounded_send(Ok(bytes)).unwrap(); + sender.send(Ok(bytes)).unwrap(); let mut multipart = Multipart::new(&headers, payload); - match multipart.poll().unwrap() { - Async::Ready(Some(mut field)) => { + match multipart.next().await { + Some(Ok(mut field)) => { let cd = field.content_disposition().unwrap(); assert_eq!(cd.disposition, DispositionType::FormData); assert_eq!(cd.parameters[0], DispositionParam::Name("file".into())); @@ -929,37 +938,37 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.poll().unwrap() { - Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"), + match field.next().await.unwrap() { + Ok(chunk) => assert_eq!(chunk, "test"), _ => unreachable!(), } - match field.poll().unwrap() { - Async::Ready(None) => (), + match field.next().await { + None => (), _ => unreachable!(), } } _ => unreachable!(), } - match multipart.poll().unwrap() { - Async::Ready(Some(mut field)) => { + match multipart.next().await.unwrap() { + Ok(mut field) => { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.poll() { - Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "data"), + match field.next().await { + Some(Ok(chunk)) => assert_eq!(chunk, "data"), _ => unreachable!(), } - match field.poll() { - Ok(Async::Ready(None)) => (), + match field.next().await { + None => (), _ => unreachable!(), } } _ => unreachable!(), } - match multipart.poll().unwrap() { - Async::Ready(None) => (), + match multipart.next().await { + None => (), _ => unreachable!(), } }); @@ -967,15 +976,15 @@ mod tests { #[test] fn test_stream() { - run_on(|| { + block_on(async { let (sender, payload) = create_stream(); let (bytes, headers) = create_simple_request_with_header(); - sender.unbounded_send(Ok(bytes)).unwrap(); + sender.send(Ok(bytes)).unwrap(); let mut multipart = Multipart::new(&headers, payload); - match multipart.poll().unwrap() { - Async::Ready(Some(mut field)) => { + match multipart.next().await.unwrap() { + Ok(mut field) => { let cd = field.content_disposition().unwrap(); assert_eq!(cd.disposition, DispositionType::FormData); assert_eq!(cd.parameters[0], DispositionParam::Name("file".into())); @@ -983,37 +992,37 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.poll().unwrap() { - Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"), + match field.next().await.unwrap() { + Ok(chunk) => assert_eq!(chunk, "test"), _ => unreachable!(), } - match field.poll().unwrap() { - Async::Ready(None) => (), + match field.next().await { + None => (), _ => unreachable!(), } } _ => unreachable!(), } - match multipart.poll().unwrap() { - Async::Ready(Some(mut field)) => { + match multipart.next().await { + Some(Ok(mut field)) => { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.poll() { - Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "data"), + match field.next().await { + Some(Ok(chunk)) => assert_eq!(chunk, "data"), _ => unreachable!(), } - match field.poll() { - Ok(Async::Ready(None)) => (), + match field.next().await { + None => (), _ => unreachable!(), } } _ => unreachable!(), } - match multipart.poll().unwrap() { - Async::Ready(None) => (), + match multipart.next().await { + None => (), _ => unreachable!(), } }); @@ -1021,26 +1030,26 @@ mod tests { #[test] fn test_basic() { - run_on(|| { + block_on(async { let (_, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); assert_eq!(payload.buf.len(), 0); - payload.poll_stream().unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!(None, payload.read_max(1).unwrap()); }) } #[test] fn test_eof() { - run_on(|| { + block_on(async { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); assert_eq!(None, payload.read_max(4).unwrap()); sender.feed_data(Bytes::from("data")); sender.feed_eof(); - payload.poll_stream().unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!(Some(Bytes::from("data")), payload.read_max(4).unwrap()); assert_eq!(payload.buf.len(), 0); @@ -1051,24 +1060,24 @@ mod tests { #[test] fn test_err() { - run_on(|| { + block_on(async { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); assert_eq!(None, payload.read_max(1).unwrap()); sender.set_error(PayloadError::Incomplete(None)); - payload.poll_stream().err().unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.err().unwrap(); }) } #[test] fn test_readmax() { - run_on(|| { + block_on(async { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); - payload.poll_stream().unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!(payload.buf.len(), 10); assert_eq!(Some(Bytes::from("line1")), payload.read_max(5).unwrap()); @@ -1081,7 +1090,7 @@ mod tests { #[test] fn test_readexactly() { - run_on(|| { + block_on(async { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); @@ -1089,7 +1098,7 @@ mod tests { sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); - payload.poll_stream().unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!(Some(Bytes::from_static(b"li")), payload.read_exact(2)); assert_eq!(payload.buf.len(), 8); @@ -1101,7 +1110,7 @@ mod tests { #[test] fn test_readuntil() { - run_on(|| { + block_on(async { let (mut sender, payload) = Payload::create(false); let mut payload = PayloadBuffer::new(payload); @@ -1109,7 +1118,7 @@ mod tests { sender.feed_data(Bytes::from("line1")); sender.feed_data(Bytes::from("line2")); - payload.poll_stream().unwrap(); + lazy(|cx| payload.poll_stream(cx)).await.unwrap(); assert_eq!( Some(Bytes::from("line")),