From 52bbbd1d7399d37ca587c67f7c130e25b61f89b1 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 25 Nov 2021 04:53:11 +0800 Subject: [PATCH] Mnior cleanup of multipart API. (#2461) --- actix-multipart/Cargo.toml | 2 +- actix-multipart/src/server.rs | 43 +++++++++++++---------------------- 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index b1dea4832..5376ba128 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -20,7 +20,6 @@ actix-utils = "3.0.0" bytes = "1" derive_more = "0.99.5" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } -futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } httparse = "1.3" local-waker = "0.1" log = "0.4" @@ -30,5 +29,6 @@ twoway = "0.2" [dev-dependencies] actix-rt = "2.2" actix-http = "3.0.0-beta.13" +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } tokio = { version = "1", features = ["sync"] } tokio-stream = "0.1" diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 43f9ccf5f..55ac00ff7 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -17,7 +17,6 @@ use actix_web::{ }; use bytes::{Bytes, BytesMut}; use futures_core::stream::{LocalBoxStream, Stream}; -use futures_util::stream::StreamExt as _; use local_waker::LocalWaker; use crate::error::MultipartError; @@ -67,7 +66,7 @@ impl Multipart { /// Create multipart instance for boundary. pub fn new(headers: &HeaderMap, stream: S) -> Multipart where - S: Stream> + Unpin + 'static, + S: Stream> + 'static, { match Self::boundary(headers) { Ok(boundary) => Multipart::from_boundary(boundary, stream), @@ -77,36 +76,29 @@ impl Multipart { /// Extract boundary info from headers. pub(crate) fn boundary(headers: &HeaderMap) -> Result { - if let Some(content_type) = headers.get(&header::CONTENT_TYPE) { - if let Ok(content_type) = content_type.to_str() { - if let Ok(ct) = content_type.parse::() { - if let Some(boundary) = ct.get_param(mime::BOUNDARY) { - Ok(boundary.as_str().to_owned()) - } else { - Err(MultipartError::Boundary) - } - } else { - Err(MultipartError::ParseContentType) - } - } else { - Err(MultipartError::ParseContentType) - } - } else { - Err(MultipartError::NoContentType) - } + headers + .get(&header::CONTENT_TYPE) + .ok_or(MultipartError::NoContentType)? + .to_str() + .ok() + .and_then(|content_type| content_type.parse::().ok()) + .ok_or(MultipartError::ParseContentType)? + .get_param(mime::BOUNDARY) + .map(|boundary| boundary.as_str().to_owned()) + .ok_or(MultipartError::Boundary) } /// Create multipart instance for given boundary and stream pub(crate) fn from_boundary(boundary: String, stream: S) -> Multipart where - S: Stream> + Unpin + 'static, + S: Stream> + 'static, { Multipart { error: None, safety: Safety::new(), inner: Some(Rc::new(RefCell::new(InnerMultipart { boundary, - payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))), + payload: PayloadRef::new(PayloadBuffer::new(stream)), state: InnerState::FirstBoundary, item: InnerMultipartItem::None, }))), @@ -690,10 +682,7 @@ impl PayloadRef { } } - fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option> - where - 'a: 'b, - { + fn get_mut(&self, s: &Safety) -> Option> { if s.current() { Some(self.payload.borrow_mut()) } else { @@ -779,7 +768,7 @@ impl PayloadBuffer { PayloadBuffer { eof: false, buf: BytesMut::new(), - stream: stream.boxed_local(), + stream: Box::pin(stream), } } @@ -860,7 +849,7 @@ mod tests { use actix_web::test::TestRequest; use actix_web::FromRequest; use bytes::Bytes; - use futures_util::future::lazy; + use futures_util::{future::lazy, StreamExt}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream;