From a612b74aeb17bfe5ef822b3c626012a1fbb32eb1 Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Thu, 12 Dec 2019 02:03:44 +0100 Subject: [PATCH] actix-multipart: Fix multipart boundary reading (#1205) * actix-multipart: Fix multipart boundary reading If we're not ready to read the first line after the multipart field (which should be a "\r\n" line) then return Pending instead of Ready(None) so that we will get called again to read that line. Without this I was getting MultipartError::Boundary from read_boundary() because it got the "\r\n" line instead of the boundary. Also tweaks the test_stream test to test partial reads. This is a forward port of #1189 from 1.0 * actix-multipart: Update changes for boundary fix --- actix-multipart/CHANGES.md | 4 ++ actix-multipart/src/server.rs | 72 +++++++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index ae6f5bf14..c0792b84c 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [2.0.0-alpha.4] - 2019-12-xx + +* Multipart handling now handles Pending during read of boundary #1205 + ## [0.2.0-alpha.2] - 2019-12-03 * Migrate to `std::future` diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 7d1bbca46..dca94d7a2 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -610,7 +610,7 @@ impl InnerField { } match payload.readline() { - Ok(None) => Poll::Ready(None), + Ok(None) => Poll::Pending, Ok(Some(line)) => { if line.as_ref() != b"\r\n" { log::warn!("multipart field did not read all the data or it is malformed"); @@ -867,6 +867,42 @@ mod tests { (tx, rx.map(|res| res.map_err(|_| panic!()))) } + // Stream that returns from a Bytes, one char at a time and Pending every other poll() + struct SlowStream { + bytes: Bytes, + pos: usize, + ready: bool, + } + + impl SlowStream { + fn new(bytes: Bytes) -> SlowStream { + return SlowStream { + bytes: bytes, + pos: 0, + ready: false, + } + } + } + + impl Stream for SlowStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); + if !this.ready { + this.ready = true; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if this.pos == this.bytes.len() { + return Poll::Ready(None); + } + let res = Poll::Ready(Some(Ok(this.bytes.slice(this.pos..(this.pos + 1))))); + this.pos += 1; + this.ready = false; + res + } + } fn create_simple_request_with_header() -> (Bytes, HeaderMap) { let bytes = Bytes::from( @@ -969,12 +1005,22 @@ mod tests { } } + // Loops, collecting all bytes until end-of-field + async fn get_whole_field(field: &mut Field) -> BytesMut { + let mut b = BytesMut::new(); + loop { + match field.next().await { + Some(Ok(chunk)) => b.extend_from_slice(&chunk), + None => return b, + _ => unreachable!(), + } + } + } + #[actix_rt::test] async fn test_stream() { - let (sender, payload) = create_stream(); let (bytes, headers) = create_simple_request_with_header(); - - sender.send(Ok(bytes)).unwrap(); + let payload = SlowStream::new(bytes); let mut multipart = Multipart::new(&headers, payload); match multipart.next().await.unwrap() { @@ -986,14 +1032,7 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.next().await.unwrap() { - Ok(chunk) => assert_eq!(chunk, "test"), - _ => unreachable!(), - } - match field.next().await { - None => (), - _ => unreachable!(), - } + assert_eq!(get_whole_field(&mut field).await, "test"); } _ => unreachable!(), } @@ -1003,14 +1042,7 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.next().await { - Some(Ok(chunk)) => assert_eq!(chunk, "data"), - _ => unreachable!(), - } - match field.next().await { - None => (), - _ => unreachable!(), - } + assert_eq!(get_whole_field(&mut field).await, "data"); } _ => unreachable!(), }