1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-13 02:39:32 +00:00

clean up multipart and field stream trait impl (#2462)

This commit is contained in:
fakeshadow 2021-11-25 08:10:53 +08:00 committed by GitHub
parent 52bbbd1d73
commit 89c6d62656
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -32,7 +32,7 @@ const MAX_HEADERS: usize = 32;
pub struct Multipart { pub struct Multipart {
safety: Safety, safety: Safety,
error: Option<MultipartError>, error: Option<MultipartError>,
inner: Option<Rc<RefCell<InnerMultipart>>>, inner: Option<InnerMultipart>,
} }
enum InnerMultipartItem { enum InnerMultipartItem {
@ -96,12 +96,12 @@ impl Multipart {
Multipart { Multipart {
error: None, error: None,
safety: Safety::new(), safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(InnerMultipart { inner: Some(InnerMultipart {
boundary, boundary,
payload: PayloadRef::new(PayloadBuffer::new(stream)), payload: PayloadRef::new(PayloadBuffer::new(stream)),
state: InnerState::FirstBoundary, state: InnerState::FirstBoundary,
item: InnerMultipartItem::None, item: InnerMultipartItem::None,
}))), }),
} }
} }
@ -118,20 +118,27 @@ impl Multipart {
impl Stream for Multipart { impl Stream for Multipart {
type Item = Result<Field, MultipartError>; type Item = Result<Field, MultipartError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(err) = self.error.take() { let this = self.get_mut();
Poll::Ready(Some(Err(err)))
} else if self.safety.current() { match this.inner.as_mut() {
let this = self.get_mut(); Some(inner) => {
let mut inner = this.inner.as_mut().unwrap().borrow_mut(); if let Some(mut buffer) = inner.payload.get_mut(&this.safety) {
if let Some(mut payload) = inner.payload.get_mut(&this.safety) { // check safety and poll read payload to buffer.
payload.poll_stream(cx)?; buffer.poll_stream(cx)?;
} else if !this.safety.is_clean() {
// safety violation
return Poll::Ready(Some(Err(MultipartError::NotConsumed)));
} else {
return Poll::Pending;
}
inner.poll(&this.safety, cx)
} }
inner.poll(&this.safety, cx) None => Poll::Ready(Some(Err(this
} else if !self.safety.is_clean() { .error
Poll::Ready(Some(Err(MultipartError::NotConsumed))) .take()
} else { .expect("Multipart polled after finish")))),
Poll::Pending
} }
} }
} }
@ -152,17 +159,15 @@ impl InnerMultipart {
Ok(httparse::Status::Complete((_, hdrs))) => { Ok(httparse::Status::Complete((_, hdrs))) => {
// convert headers // convert headers
let mut headers = HeaderMap::with_capacity(hdrs.len()); let mut headers = HeaderMap::with_capacity(hdrs.len());
for h in hdrs { for h in hdrs {
if let Ok(name) = HeaderName::try_from(h.name) { let name =
if let Ok(value) = HeaderValue::try_from(h.value) { HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?;
headers.append(name, value); let value = HeaderValue::try_from(h.value)
} else { .map_err(|_| ParseError::Header)?;
return Err(ParseError::Header.into()); headers.append(name, value);
}
} else {
return Err(ParseError::Header.into());
}
} }
Ok(Some(headers)) Ok(Some(headers))
} }
Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), Ok(httparse::Status::Partial) => Err(ParseError::Header.into()),
@ -458,17 +463,19 @@ impl Stream for Field {
type Item = Result<Bytes, MultipartError>; type Item = Result<Bytes, MultipartError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.safety.current() { let this = self.get_mut();
let mut inner = self.inner.borrow_mut(); let mut inner = this.inner.borrow_mut();
if let Some(mut payload) = inner.payload.as_ref().unwrap().get_mut(&self.safety) { if let Some(mut buffer) = inner.payload.as_ref().unwrap().get_mut(&this.safety) {
payload.poll_stream(cx)?; // check safety and poll read payload to buffer.
} buffer.poll_stream(cx)?;
inner.poll(&self.safety) } else if !this.safety.is_clean() {
} else if !self.safety.is_clean() { // safety violation
Poll::Ready(Some(Err(MultipartError::NotConsumed))) return Poll::Ready(Some(Err(MultipartError::NotConsumed)));
} else { } else {
Poll::Pending return Poll::Pending;
} }
inner.poll(&this.safety)
} }
} }