From 40982d91bbe527ebae4805c934b99d98c64d7bd6 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Mon, 9 Dec 2019 10:19:59 +0100 Subject: [PATCH] actix-multipart: Fix multipart boundary reading If we're not ready to read the first line after the multipart field (which should be a "\r\n" line) then return Pending instead of Ready(None) so that we will get called again to read that line. Without this I was getting MultipartError::Boundary from read_boundary() because it got the "\r\n" line instead of the boundary. Also tweaks the test_stream test to test partial reads. This is a forward port of #1189 from 1.0 --- actix-multipart/src/server.rs | 72 +++++++++++++++++++++++++---------- 1 file changed, 52 insertions(+), 20 deletions(-) diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 7d1bbca4..dca94d7a 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -610,7 +610,7 @@ impl InnerField { } match payload.readline() { - Ok(None) => Poll::Ready(None), + Ok(None) => Poll::Pending, Ok(Some(line)) => { if line.as_ref() != b"\r\n" { log::warn!("multipart field did not read all the data or it is malformed"); @@ -867,6 +867,42 @@ mod tests { (tx, rx.map(|res| res.map_err(|_| panic!()))) } + // Stream that returns from a Bytes, one char at a time and Pending every other poll() + struct SlowStream { + bytes: Bytes, + pos: usize, + ready: bool, + } + + impl SlowStream { + fn new(bytes: Bytes) -> SlowStream { + return SlowStream { + bytes: bytes, + pos: 0, + ready: false, + } + } + } + + impl Stream for SlowStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + if !this.ready { + this.ready = true; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if this.pos == this.bytes.len() { + return Poll::Ready(None); + } + let res = Poll::Ready(Some(Ok(this.bytes.slice(this.pos..(this.pos + 1))))); + this.pos += 1; + this.ready = false; + res + } + } fn create_simple_request_with_header() -> (Bytes, HeaderMap) { let bytes = Bytes::from( @@ -969,12 +1005,22 @@ mod tests { } } + // Loops, collecting all bytes until end-of-field + async fn get_whole_field(field: &mut Field) -> BytesMut { + let mut b = BytesMut::new(); + loop { + match field.next().await { + Some(Ok(chunk)) => b.extend_from_slice(&chunk), + None => return b, + _ => unreachable!(), + } + } + } + #[actix_rt::test] async fn test_stream() { - let (sender, payload) = create_stream(); let (bytes, headers) = create_simple_request_with_header(); - - sender.send(Ok(bytes)).unwrap(); + let payload = SlowStream::new(bytes); let mut multipart = Multipart::new(&headers, payload); match multipart.next().await.unwrap() { @@ -986,14 +1032,7 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.next().await.unwrap() { - Ok(chunk) => assert_eq!(chunk, "test"), - _ => unreachable!(), - } - match field.next().await { - None => (), - _ => unreachable!(), - } + assert_eq!(get_whole_field(&mut field).await, "test"); } _ => unreachable!(), } @@ -1003,14 +1042,7 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.next().await { - Some(Ok(chunk)) => assert_eq!(chunk, "data"), - _ => unreachable!(), - } - match field.next().await { - None => (), - _ => unreachable!(), - } + assert_eq!(get_whole_field(&mut field).await, "data"); } _ => unreachable!(), }