diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index ca61176c7..72725bdc0 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.5] - 2019-xx-xx + +* Multipart handling now handles NotReady during read of boundary #1189 + ## [0.1.4] - 2019-09-12 * Multipart handling now parses requests which do not end in CRLF #1038 diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index a7c787f46..960ec8b86 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -604,7 +604,7 @@ impl InnerField { } match payload.readline()? { - None => Async::Ready(None), + None => Async::NotReady, Some(line) => { if line.as_ref() != b"\r\n" { log::warn!("multipart field did not read all the data or it is malformed"); @@ -860,6 +860,42 @@ mod tests { (tx, rx.map_err(|_| panic!()).and_then(|res| res)) } + // Stream that returns from a Bytes, one char at a time and NotReady every other poll() + struct SlowStream { + bytes: Bytes, + pos: usize, + ready: bool, + } + + impl SlowStream { + fn new(bytes: Bytes) -> SlowStream { + return SlowStream { + bytes: bytes, + pos: 0, + ready: false, + } + } + } + + impl Stream for SlowStream { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll, Self::Error> { + if !self.ready { + self.ready = true; + return Ok(Async::NotReady); + } + if self.pos == self.bytes.len() { + return Ok(Async::Ready(None)); + } + let res = Ok(Async::Ready(Some(self.bytes.slice(self.pos, self.pos + 1)))); + self.pos += 1; + self.ready = false; + res + } + } + fn create_simple_request_with_header() -> (Bytes, HeaderMap) { let bytes = Bytes::from( "testasdadsad\r\n\ @@ -965,16 +1001,38 @@ mod tests { }); } + // Retries on NotReady + fn loop_poll(stream: &mut T) -> Poll, T::Error> + where T: Stream { + loop { + let r = stream.poll(); + match r { + Ok(Async::NotReady) => continue, + _ => return r, + } + } + } + + // Loops polling, collecting all bytes until end-of-field + fn get_whole_field(field: &mut Field) -> BytesMut { + let mut b = BytesMut::new(); + loop { + match loop_poll(field) { + Ok(Async::Ready(Some(chunk))) => b.extend_from_slice(&chunk), + Ok(Async::Ready(None)) => return b, + _ => unreachable!(), + } + } + } + #[test] fn test_stream() { run_on(|| { - let (sender, payload) = create_stream(); let (bytes, headers) = create_simple_request_with_header(); - - sender.unbounded_send(Ok(bytes)).unwrap(); + let payload = SlowStream::new(bytes); let mut multipart = Multipart::new(&headers, payload); - match multipart.poll().unwrap() { + match loop_poll(&mut multipart).unwrap() { Async::Ready(Some(mut field)) => { let cd = field.content_disposition().unwrap(); assert_eq!(cd.disposition, DispositionType::FormData); @@ -983,39 +1041,20 @@ mod tests { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.poll().unwrap() { - Async::Ready(Some(chunk)) => assert_eq!(chunk, "test"), - _ => unreachable!(), - } - match field.poll().unwrap() { - Async::Ready(None) => (), - _ => unreachable!(), - } + assert_eq!(get_whole_field(&mut field), "test"); } _ => unreachable!(), } - match multipart.poll().unwrap() { + match loop_poll(&mut multipart).unwrap() { Async::Ready(Some(mut field)) => { assert_eq!(field.content_type().type_(), mime::TEXT); assert_eq!(field.content_type().subtype(), mime::PLAIN); - match field.poll() { - Ok(Async::Ready(Some(chunk))) => assert_eq!(chunk, "data"), - _ => unreachable!(), - } - match field.poll() { - Ok(Async::Ready(None)) => (), - _ => unreachable!(), - } + assert_eq!(get_whole_field(&mut field), "data"); } _ => unreachable!(), } - - match multipart.poll().unwrap() { - Async::Ready(None) => (), - _ => unreachable!(), - } }); }