1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-05-20 09:18:26 +00:00

Merge branch 'master' into get-all-order

This commit is contained in:
Rob Ede 2021-11-28 00:20:08 +00:00 committed by GitHub
commit e2a803d278
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 64 deletions

View file

@ -88,7 +88,7 @@ impl Charset {
Iso_8859_8_E => "ISO-8859-8-E", Iso_8859_8_E => "ISO-8859-8-E",
Iso_8859_8_I => "ISO-8859-8-I", Iso_8859_8_I => "ISO-8859-8-I",
Gb2312 => "GB2312", Gb2312 => "GB2312",
Big5 => "big5", Big5 => "Big5",
Koi8_R => "KOI8-R", Koi8_R => "KOI8-R",
Ext(ref s) => s, Ext(ref s) => s,
} }
@ -128,7 +128,7 @@ impl FromStr for Charset {
"ISO-8859-8-E" => Iso_8859_8_E, "ISO-8859-8-E" => Iso_8859_8_E,
"ISO-8859-8-I" => Iso_8859_8_I, "ISO-8859-8-I" => Iso_8859_8_I,
"GB2312" => Gb2312, "GB2312" => Gb2312,
"big5" => Big5, "BIG5" => Big5,
"KOI8-R" => Koi8_R, "KOI8-R" => Koi8_R,
s => Ext(s.to_owned()), s => Ext(s.to_owned()),
}) })

View file

@ -20,7 +20,6 @@ actix-utils = "3.0.0"
bytes = "1" bytes = "1"
derive_more = "0.99.5" derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
httparse = "1.3" httparse = "1.3"
local-waker = "0.1" local-waker = "0.1"
log = "0.4" log = "0.4"
@ -30,5 +29,6 @@ twoway = "0.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2" actix-rt = "2.2"
actix-http = "3.0.0-beta.13" actix-http = "3.0.0-beta.13"
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1" tokio-stream = "0.1"

View file

@ -17,7 +17,6 @@ use actix_web::{
}; };
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::stream::{LocalBoxStream, Stream}; use futures_core::stream::{LocalBoxStream, Stream};
use futures_util::stream::StreamExt as _;
use local_waker::LocalWaker; use local_waker::LocalWaker;
use crate::error::MultipartError; use crate::error::MultipartError;
@ -33,7 +32,7 @@ const MAX_HEADERS: usize = 32;
pub struct Multipart { pub struct Multipart {
safety: Safety, safety: Safety,
error: Option<MultipartError>, error: Option<MultipartError>,
inner: Option<Rc<RefCell<InnerMultipart>>>, inner: Option<InnerMultipart>,
} }
enum InnerMultipartItem { enum InnerMultipartItem {
@ -67,7 +66,7 @@ impl Multipart {
/// Create multipart instance for boundary. /// Create multipart instance for boundary.
pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
where where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static, S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{ {
match Self::boundary(headers) { match Self::boundary(headers) {
Ok(boundary) => Multipart::from_boundary(boundary, stream), Ok(boundary) => Multipart::from_boundary(boundary, stream),
@ -77,39 +76,32 @@ impl Multipart {
/// Extract boundary info from headers. /// Extract boundary info from headers.
pub(crate) fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> { pub(crate) fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {
if let Some(content_type) = headers.get(&header::CONTENT_TYPE) { headers
if let Ok(content_type) = content_type.to_str() { .get(&header::CONTENT_TYPE)
if let Ok(ct) = content_type.parse::<mime::Mime>() { .ok_or(MultipartError::NoContentType)?
if let Some(boundary) = ct.get_param(mime::BOUNDARY) { .to_str()
Ok(boundary.as_str().to_owned()) .ok()
} else { .and_then(|content_type| content_type.parse::<mime::Mime>().ok())
Err(MultipartError::Boundary) .ok_or(MultipartError::ParseContentType)?
} .get_param(mime::BOUNDARY)
} else { .map(|boundary| boundary.as_str().to_owned())
Err(MultipartError::ParseContentType) .ok_or(MultipartError::Boundary)
}
} else {
Err(MultipartError::ParseContentType)
}
} else {
Err(MultipartError::NoContentType)
}
} }
/// Create multipart instance for given boundary and stream /// Create multipart instance for given boundary and stream
pub(crate) fn from_boundary<S>(boundary: String, stream: S) -> Multipart pub(crate) fn from_boundary<S>(boundary: String, stream: S) -> Multipart
where where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static, S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{ {
Multipart { Multipart {
error: None, error: None,
safety: Safety::new(), safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(InnerMultipart { inner: Some(InnerMultipart {
boundary, boundary,
payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))), payload: PayloadRef::new(PayloadBuffer::new(stream)),
state: InnerState::FirstBoundary, state: InnerState::FirstBoundary,
item: InnerMultipartItem::None, item: InnerMultipartItem::None,
}))), }),
} }
} }
@ -126,20 +118,27 @@ impl Multipart {
impl Stream for Multipart { impl Stream for Multipart {
type Item = Result<Field, MultipartError>; type Item = Result<Field, MultipartError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(err) = self.error.take() { let this = self.get_mut();
Poll::Ready(Some(Err(err)))
} else if self.safety.current() { match this.inner.as_mut() {
let this = self.get_mut(); Some(inner) => {
let mut inner = this.inner.as_mut().unwrap().borrow_mut(); if let Some(mut buffer) = inner.payload.get_mut(&this.safety) {
if let Some(mut payload) = inner.payload.get_mut(&this.safety) { // check safety and poll read payload to buffer.
payload.poll_stream(cx)?; buffer.poll_stream(cx)?;
} else if !this.safety.is_clean() {
// safety violation
return Poll::Ready(Some(Err(MultipartError::NotConsumed)));
} else {
return Poll::Pending;
}
inner.poll(&this.safety, cx)
} }
inner.poll(&this.safety, cx) None => Poll::Ready(Some(Err(this
} else if !self.safety.is_clean() { .error
Poll::Ready(Some(Err(MultipartError::NotConsumed))) .take()
} else { .expect("Multipart polled after finish")))),
Poll::Pending
} }
} }
} }
@ -160,17 +159,15 @@ impl InnerMultipart {
Ok(httparse::Status::Complete((_, hdrs))) => { Ok(httparse::Status::Complete((_, hdrs))) => {
// convert headers // convert headers
let mut headers = HeaderMap::with_capacity(hdrs.len()); let mut headers = HeaderMap::with_capacity(hdrs.len());
for h in hdrs { for h in hdrs {
if let Ok(name) = HeaderName::try_from(h.name) { let name =
if let Ok(value) = HeaderValue::try_from(h.value) { HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?;
headers.append(name, value); let value = HeaderValue::try_from(h.value)
} else { .map_err(|_| ParseError::Header)?;
return Err(ParseError::Header.into()); headers.append(name, value);
}
} else {
return Err(ParseError::Header.into());
}
} }
Ok(Some(headers)) Ok(Some(headers))
} }
Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), Ok(httparse::Status::Partial) => Err(ParseError::Header.into()),
@ -466,17 +463,19 @@ impl Stream for Field {
type Item = Result<Bytes, MultipartError>; type Item = Result<Bytes, MultipartError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.safety.current() { let this = self.get_mut();
let mut inner = self.inner.borrow_mut(); let mut inner = this.inner.borrow_mut();
if let Some(mut payload) = inner.payload.as_ref().unwrap().get_mut(&self.safety) { if let Some(mut buffer) = inner.payload.as_ref().unwrap().get_mut(&this.safety) {
payload.poll_stream(cx)?; // check safety and poll read payload to buffer.
} buffer.poll_stream(cx)?;
inner.poll(&self.safety) } else if !this.safety.is_clean() {
} else if !self.safety.is_clean() { // safety violation
Poll::Ready(Some(Err(MultipartError::NotConsumed))) return Poll::Ready(Some(Err(MultipartError::NotConsumed)));
} else { } else {
Poll::Pending return Poll::Pending;
} }
inner.poll(&this.safety)
} }
} }
@ -690,10 +689,7 @@ impl PayloadRef {
} }
} }
fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<RefMut<'a, PayloadBuffer>> fn get_mut(&self, s: &Safety) -> Option<RefMut<'_, PayloadBuffer>> {
where
'a: 'b,
{
if s.current() { if s.current() {
Some(self.payload.borrow_mut()) Some(self.payload.borrow_mut())
} else { } else {
@ -779,7 +775,7 @@ impl PayloadBuffer {
PayloadBuffer { PayloadBuffer {
eof: false, eof: false,
buf: BytesMut::new(), buf: BytesMut::new(),
stream: stream.boxed_local(), stream: Box::pin(stream),
} }
} }
@ -860,7 +856,7 @@ mod tests {
use actix_web::test::TestRequest; use actix_web::test::TestRequest;
use actix_web::FromRequest; use actix_web::FromRequest;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::lazy; use futures_util::{future::lazy, StreamExt};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;