diff --git a/actix-http/src/header/shared/charset.rs b/actix-http/src/header/shared/charset.rs index b482f6bce..3cc0f3e23 100644 --- a/actix-http/src/header/shared/charset.rs +++ b/actix-http/src/header/shared/charset.rs @@ -88,7 +88,7 @@ impl Charset { Iso_8859_8_E => "ISO-8859-8-E", Iso_8859_8_I => "ISO-8859-8-I", Gb2312 => "GB2312", - Big5 => "big5", + Big5 => "Big5", Koi8_R => "KOI8-R", Ext(ref s) => s, } @@ -128,7 +128,7 @@ impl FromStr for Charset { "ISO-8859-8-E" => Iso_8859_8_E, "ISO-8859-8-I" => Iso_8859_8_I, "GB2312" => Gb2312, - "big5" => Big5, + "BIG5" => Big5, "KOI8-R" => Koi8_R, s => Ext(s.to_owned()), }) diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index b1dea4832..5376ba128 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -20,7 +20,6 @@ actix-utils = "3.0.0" bytes = "1" derive_more = "0.99.5" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } -futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } httparse = "1.3" local-waker = "0.1" log = "0.4" @@ -30,5 +29,6 @@ twoway = "0.2" [dev-dependencies] actix-rt = "2.2" actix-http = "3.0.0-beta.13" +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 43f9ccf5f..c9642cfad 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -17,7 +17,6 @@ use actix_web::{ }; use bytes::{Bytes, BytesMut}; use futures_core::stream::{LocalBoxStream, Stream}; -use futures_util::stream::StreamExt as _; use local_waker::LocalWaker; use crate::error::MultipartError; @@ -33,7 +32,7 @@ const MAX_HEADERS: usize = 32; pub struct Multipart { safety: Safety, error: Option, - inner: Option>>, + inner: Option, } enum InnerMultipartItem { @@ -67,7 +66,7 @@ impl Multipart { /// Create multipart instance for boundary. pub fn new(headers: &HeaderMap, stream: S) -> Multipart where - S: Stream> + Unpin + 'static, + S: Stream> + 'static, { match Self::boundary(headers) { Ok(boundary) => Multipart::from_boundary(boundary, stream), @@ -77,39 +76,32 @@ impl Multipart { /// Extract boundary info from headers. pub(crate) fn boundary(headers: &HeaderMap) -> Result { - if let Some(content_type) = headers.get(&header::CONTENT_TYPE) { - if let Ok(content_type) = content_type.to_str() { - if let Ok(ct) = content_type.parse::() { - if let Some(boundary) = ct.get_param(mime::BOUNDARY) { - Ok(boundary.as_str().to_owned()) - } else { - Err(MultipartError::Boundary) - } - } else { - Err(MultipartError::ParseContentType) - } - } else { - Err(MultipartError::ParseContentType) - } - } else { - Err(MultipartError::NoContentType) - } + headers + .get(&header::CONTENT_TYPE) + .ok_or(MultipartError::NoContentType)? + .to_str() + .ok() + .and_then(|content_type| content_type.parse::().ok()) + .ok_or(MultipartError::ParseContentType)? + .get_param(mime::BOUNDARY) + .map(|boundary| boundary.as_str().to_owned()) + .ok_or(MultipartError::Boundary) } /// Create multipart instance for given boundary and stream pub(crate) fn from_boundary(boundary: String, stream: S) -> Multipart where - S: Stream> + Unpin + 'static, + S: Stream> + 'static, { Multipart { error: None, safety: Safety::new(), - inner: Some(Rc::new(RefCell::new(InnerMultipart { + inner: Some(InnerMultipart { boundary, - payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))), + payload: PayloadRef::new(PayloadBuffer::new(stream)), state: InnerState::FirstBoundary, item: InnerMultipartItem::None, - }))), + }), } } @@ -126,20 +118,27 @@ impl Multipart { impl Stream for Multipart { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(err) = self.error.take() { - Poll::Ready(Some(Err(err))) - } else if self.safety.current() { - let this = self.get_mut(); - let mut inner = this.inner.as_mut().unwrap().borrow_mut(); - if let Some(mut payload) = inner.payload.get_mut(&this.safety) { - payload.poll_stream(cx)?; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.inner.as_mut() { + Some(inner) => { + if let Some(mut buffer) = inner.payload.get_mut(&this.safety) { + // check safety and poll read payload to buffer. + buffer.poll_stream(cx)?; + } else if !this.safety.is_clean() { + // safety violation + return Poll::Ready(Some(Err(MultipartError::NotConsumed))); + } else { + return Poll::Pending; + } + + inner.poll(&this.safety, cx) } - inner.poll(&this.safety, cx) - } else if !self.safety.is_clean() { - Poll::Ready(Some(Err(MultipartError::NotConsumed))) - } else { - Poll::Pending + None => Poll::Ready(Some(Err(this + .error + .take() + .expect("Multipart polled after finish")))), } } } @@ -160,17 +159,15 @@ impl InnerMultipart { Ok(httparse::Status::Complete((_, hdrs))) => { // convert headers let mut headers = HeaderMap::with_capacity(hdrs.len()); + for h in hdrs { - if let Ok(name) = HeaderName::try_from(h.name) { - if let Ok(value) = HeaderValue::try_from(h.value) { - headers.append(name, value); - } else { - return Err(ParseError::Header.into()); - } - } else { - return Err(ParseError::Header.into()); - } + let name = + HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?; + let value = HeaderValue::try_from(h.value) + .map_err(|_| ParseError::Header)?; + headers.append(name, value); } + Ok(Some(headers)) } Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), @@ -466,17 +463,19 @@ impl Stream for Field { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.safety.current() { - let mut inner = self.inner.borrow_mut(); - if let Some(mut payload) = inner.payload.as_ref().unwrap().get_mut(&self.safety) { - payload.poll_stream(cx)?; - } - inner.poll(&self.safety) - } else if !self.safety.is_clean() { - Poll::Ready(Some(Err(MultipartError::NotConsumed))) + let this = self.get_mut(); + let mut inner = this.inner.borrow_mut(); + if let Some(mut buffer) = inner.payload.as_ref().unwrap().get_mut(&this.safety) { + // check safety and poll read payload to buffer. + buffer.poll_stream(cx)?; + } else if !this.safety.is_clean() { + // safety violation + return Poll::Ready(Some(Err(MultipartError::NotConsumed))); } else { - Poll::Pending + return Poll::Pending; } + + inner.poll(&this.safety) } } @@ -690,10 +689,7 @@ impl PayloadRef { } } - fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option> - where - 'a: 'b, - { + fn get_mut(&self, s: &Safety) -> Option> { if s.current() { Some(self.payload.borrow_mut()) } else { @@ -779,7 +775,7 @@ impl PayloadBuffer { PayloadBuffer { eof: false, buf: BytesMut::new(), - stream: stream.boxed_local(), + stream: Box::pin(stream), } } @@ -860,7 +856,7 @@ mod tests { use actix_web::test::TestRequest; use actix_web::FromRequest; use bytes::Bytes; - use futures_util::future::lazy; + use futures_util::{future::lazy, StreamExt}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream;