1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-02 21:39:26 +00:00

Remove boxed future from HttpMessage (#1834)

This commit is contained in:
fakeshadow 2020-12-17 19:40:49 +08:00 committed by GitHub
parent 97f615c245
commit 2a5215c1d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -10,11 +10,8 @@ use bytes::{Bytes, BytesMut};
use encoding_rs::{Encoding, UTF_8};
use futures_core::stream::Stream;
use futures_util::{
future::{
err, ok, Either, ErrInto, FutureExt as _, LocalBoxFuture, Ready,
TryFutureExt as _,
},
stream::StreamExt as _,
future::{err, ok, Either, ErrInto, Ready, TryFutureExt as _},
ready,
};
use mime::Mime;
@ -305,10 +302,12 @@ impl PayloadConfig {
// Allow shared refs to default.
const DEFAULT_CONFIG: PayloadConfig = PayloadConfig {
limit: 262_144, // 2^18 bytes (~256kB)
limit: DEFAULT_CONFIG_LIMIT,
mimetype: None,
};
const DEFAULT_CONFIG_LIMIT: usize = 262_144; // 2^18 bytes (~256kB)
impl Default for PayloadConfig {
fn default() -> Self {
DEFAULT_CONFIG.clone()
@ -326,99 +325,83 @@ pub struct HttpMessageBody {
limit: usize,
length: Option<usize>,
#[cfg(feature = "compress")]
stream: Option<dev::Decompress<dev::Payload>>,
stream: dev::Decompress<dev::Payload>,
#[cfg(not(feature = "compress"))]
stream: Option<dev::Payload>,
stream: dev::Payload,
buf: BytesMut,
err: Option<PayloadError>,
fut: Option<LocalBoxFuture<'static, Result<Bytes, PayloadError>>>,
}
impl HttpMessageBody {
/// Create `MessageBody` for request.
#[allow(clippy::borrow_interior_mutable_const)]
pub fn new(req: &HttpRequest, payload: &mut dev::Payload) -> HttpMessageBody {
let mut len = None;
let mut length = None;
let mut err = None;
if let Some(l) = req.headers().get(&header::CONTENT_LENGTH) {
if let Ok(s) = l.to_str() {
if let Ok(l) = s.parse::<usize>() {
len = Some(l)
} else {
return Self::err(PayloadError::UnknownLength);
}
} else {
return Self::err(PayloadError::UnknownLength);
match l.to_str() {
Ok(s) => match s.parse::<usize>() {
Ok(l) if l > DEFAULT_CONFIG_LIMIT => {
err = Some(PayloadError::Overflow)
}
Ok(l) => length = Some(l),
Err(_) => err = Some(PayloadError::UnknownLength),
},
Err(_) => err = Some(PayloadError::UnknownLength),
}
}
#[cfg(feature = "compress")]
let stream = Some(dev::Decompress::from_headers(payload.take(), req.headers()));
let stream = dev::Decompress::from_headers(payload.take(), req.headers());
#[cfg(not(feature = "compress"))]
let stream = Some(payload.take());
let stream = payload.take();
HttpMessageBody {
stream,
limit: 262_144,
length: len,
fut: None,
err: None,
limit: DEFAULT_CONFIG_LIMIT,
length,
buf: BytesMut::with_capacity(8192),
err,
}
}
/// Change max size of payload. By default max size is 256Kb
pub fn limit(mut self, limit: usize) -> Self {
if let Some(l) = self.length {
if l > limit {
self.err = Some(PayloadError::Overflow);
}
}
self.limit = limit;
self
}
fn err(e: PayloadError) -> Self {
HttpMessageBody {
stream: None,
limit: 262_144,
fut: None,
err: Some(e),
length: None,
}
}
}
impl Future for HttpMessageBody {
type Output = Result<Bytes, PayloadError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(ref mut fut) = self.fut {
return Pin::new(fut).poll(cx);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if let Some(e) = this.err.take() {
return Poll::Ready(Err(e));
}
if let Some(err) = self.err.take() {
return Poll::Ready(Err(err));
}
if let Some(len) = self.length.take() {
if len > self.limit {
return Poll::Ready(Err(PayloadError::Overflow));
}
}
// future
let limit = self.limit;
let mut stream = self.stream.take().unwrap();
self.fut = Some(
async move {
let mut body = BytesMut::with_capacity(8192);
while let Some(item) = stream.next().await {
let chunk = item?;
if body.len() + chunk.len() > limit {
return Err(PayloadError::Overflow);
loop {
let res = ready!(Pin::new(&mut this.stream).poll_next(cx));
match res {
Some(chunk) => {
let chunk = chunk?;
if this.buf.len() + chunk.len() > this.limit {
return Poll::Ready(Err(PayloadError::Overflow));
} else {
body.extend_from_slice(&chunk);
this.buf.extend_from_slice(&chunk);
}
}
Ok(body.freeze())
None => return Poll::Ready(Ok(this.buf.split().freeze())),
}
.boxed_local(),
);
self.poll(cx)
}
}
}