1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-10 19:01:05 +00:00

Use thread pool for response body comression

This commit is contained in:
Nikolay Kim 2019-04-04 13:17:55 -07:00
parent bc834f6a03
commit d8bc66a18e
3 changed files with 91 additions and 73 deletions

View file

@ -8,10 +8,13 @@
* Render error and return as response body * Render error and return as response body
* Use thread pool for response body comression
### Deleted ### Deleted
* Removed PayloadBuffer * Removed PayloadBuffer
## [0.1.0-alpha.3] - 2019-04-02 ## [0.1.0-alpha.3] - 2019-04-02
### Added ### Added

View file

@ -1,13 +1,13 @@
//! Stream encoder //! Stream encoder
use std::io::{self, Write}; use std::io::{self, Write};
use bytes::Bytes; use actix_threadpool::{run, CpuFuture};
use futures::{Async, Poll};
#[cfg(feature = "brotli")] #[cfg(feature = "brotli")]
use brotli2::write::BrotliEncoder; use brotli2::write::BrotliEncoder;
use bytes::Bytes;
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
use flate2::write::{GzEncoder, ZlibEncoder}; use flate2::write::{GzEncoder, ZlibEncoder};
use futures::{Async, Future, Poll};
use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::body::{Body, BodySize, MessageBody, ResponseBody};
use crate::http::header::{ContentEncoding, CONTENT_ENCODING}; use crate::http::header::{ContentEncoding, CONTENT_ENCODING};
@ -16,9 +16,12 @@ use crate::{Error, ResponseHead};
use super::Writer; use super::Writer;
const INPLACE: usize = 2049;
pub struct Encoder<B> { pub struct Encoder<B> {
body: EncoderBody<B>, body: EncoderBody<B>,
encoder: Option<ContentEncoder>, encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>,
} }
impl<B: MessageBody> Encoder<B> { impl<B: MessageBody> Encoder<B> {
@ -27,73 +30,58 @@ impl<B: MessageBody> Encoder<B> {
head: &mut ResponseHead, head: &mut ResponseHead,
body: ResponseBody<B>, body: ResponseBody<B>,
) -> ResponseBody<Encoder<B>> { ) -> ResponseBody<Encoder<B>> {
let has_ce = head.headers().contains_key(CONTENT_ENCODING); let can_encode = !(head.headers().contains_key(CONTENT_ENCODING)
match body { || head.status == StatusCode::SWITCHING_PROTOCOLS
ResponseBody::Other(b) => match b { || encoding == ContentEncoding::Identity
Body::None => ResponseBody::Other(Body::None), || encoding == ContentEncoding::Auto);
Body::Empty => ResponseBody::Other(Body::Empty),
Body::Bytes(buf) => {
if !(has_ce
|| encoding == ContentEncoding::Identity
|| encoding == ContentEncoding::Auto)
{
let mut enc = ContentEncoder::encoder(encoding).unwrap();
// TODO return error! let body = match body {
let _ = enc.write(buf.as_ref()); ResponseBody::Other(b) => match b {
let body = enc.finish().unwrap(); Body::None => return ResponseBody::Other(Body::None),
update_head(encoding, head); Body::Empty => return ResponseBody::Other(Body::Empty),
ResponseBody::Other(Body::Bytes(body)) Body::Bytes(buf) => {
if can_encode {
EncoderBody::Bytes(buf)
} else { } else {
ResponseBody::Other(Body::Bytes(buf)) return ResponseBody::Other(Body::Bytes(buf));
}
}
Body::Message(stream) => {
if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS {
ResponseBody::Body(Encoder {
body: EncoderBody::Other(stream),
encoder: None,
})
} else {
update_head(encoding, head);
head.no_chunking(false);
ResponseBody::Body(Encoder {
body: EncoderBody::Other(stream),
encoder: ContentEncoder::encoder(encoding),
})
} }
} }
Body::Message(stream) => EncoderBody::BoxedStream(stream),
}, },
ResponseBody::Body(stream) => { ResponseBody::Body(stream) => EncoderBody::Stream(stream),
if has_ce || head.status == StatusCode::SWITCHING_PROTOCOLS { };
ResponseBody::Body(Encoder {
body: EncoderBody::Body(stream), if can_encode {
encoder: None, update_head(encoding, head);
}) head.no_chunking(false);
} else { ResponseBody::Body(Encoder {
update_head(encoding, head); body,
head.no_chunking(false); fut: None,
ResponseBody::Body(Encoder { encoder: ContentEncoder::encoder(encoding),
body: EncoderBody::Body(stream), })
encoder: ContentEncoder::encoder(encoding), } else {
}) ResponseBody::Body(Encoder {
} body,
} fut: None,
encoder: None,
})
} }
} }
} }
enum EncoderBody<B> { enum EncoderBody<B> {
Body(B), Bytes(Bytes),
Other(Box<dyn MessageBody>), Stream(B),
BoxedStream(Box<dyn MessageBody>),
} }
impl<B: MessageBody> MessageBody for Encoder<B> { impl<B: MessageBody> MessageBody for Encoder<B> {
fn length(&self) -> BodySize { fn length(&self) -> BodySize {
if self.encoder.is_none() { if self.encoder.is_none() {
match self.body { match self.body {
EncoderBody::Body(ref b) => b.length(), EncoderBody::Bytes(ref b) => b.length(),
EncoderBody::Other(ref b) => b.length(), EncoderBody::Stream(ref b) => b.length(),
EncoderBody::BoxedStream(ref b) => b.length(),
} }
} else { } else {
BodySize::Stream BodySize::Stream
@ -102,20 +90,47 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> { fn poll_next(&mut self) -> Poll<Option<Bytes>, Error> {
loop { loop {
if let Some(ref mut fut) = self.fut {
let mut encoder = futures::try_ready!(fut.poll());
let chunk = encoder.take();
self.encoder = Some(encoder);
self.fut.take();
if !chunk.is_empty() {
return Ok(Async::Ready(Some(chunk)));
}
}
let result = match self.body { let result = match self.body {
EncoderBody::Body(ref mut b) => b.poll_next()?, EncoderBody::Bytes(ref mut b) => {
EncoderBody::Other(ref mut b) => b.poll_next()?, if b.is_empty() {
Async::Ready(None)
} else {
Async::Ready(Some(std::mem::replace(b, Bytes::new())))
}
}
EncoderBody::Stream(ref mut b) => b.poll_next()?,
EncoderBody::BoxedStream(ref mut b) => b.poll_next()?,
}; };
match result { match result {
Async::NotReady => return Ok(Async::NotReady), Async::NotReady => return Ok(Async::NotReady),
Async::Ready(Some(chunk)) => { Async::Ready(Some(chunk)) => {
if let Some(ref mut encoder) = self.encoder { if let Some(mut encoder) = self.encoder.take() {
if encoder.write(&chunk)? { if chunk.len() < INPLACE {
return Ok(Async::Ready(Some(encoder.take()))); encoder.write(&chunk)?;
let chunk = encoder.take();
self.encoder = Some(encoder);
if !chunk.is_empty() {
return Ok(Async::Ready(Some(chunk)));
}
} else {
self.fut = Some(run(move || {
encoder.write(&chunk)?;
Ok(encoder)
}));
continue;
} }
} else {
return Ok(Async::Ready(Some(chunk)));
} }
return Ok(Async::Ready(Some(chunk)));
} }
Async::Ready(None) => { Async::Ready(None) => {
if let Some(encoder) = self.encoder.take() { if let Some(encoder) = self.encoder.take() {
@ -203,11 +218,11 @@ impl ContentEncoder {
} }
} }
fn write(&mut self, data: &[u8]) -> Result<bool, io::Error> { fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
match *self { match *self {
#[cfg(feature = "brotli")] #[cfg(feature = "brotli")]
ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) { ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
trace!("Error decoding br encoding: {}", err); trace!("Error decoding br encoding: {}", err);
Err(err) Err(err)
@ -215,7 +230,7 @@ impl ContentEncoder {
}, },
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
trace!("Error decoding gzip encoding: {}", err); trace!("Error decoding gzip encoding: {}", err);
Err(err) Err(err)
@ -223,7 +238,7 @@ impl ContentEncoder {
}, },
#[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))] #[cfg(any(feature = "flate2-zlib", feature = "flate2-rust"))]
ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(!encoder.get_ref().buf.is_empty()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
trace!("Error decoding deflate encoding: {}", err); trace!("Error decoding deflate encoding: {}", err);
Err(err) Err(err)

View file

@ -22,8 +22,8 @@ where
impl<F, R> Factory<(), R> for F impl<F, R> Factory<(), R> for F
where where
F: Fn() -> R + Clone + 'static, F: Fn() -> R + Clone,
R: Responder + 'static, R: Responder,
{ {
fn call(&self, _: ()) -> R { fn call(&self, _: ()) -> R {
(self)() (self)()
@ -55,7 +55,7 @@ where
impl<F, T, R> NewService for Handler<F, T, R> impl<F, T, R> NewService for Handler<F, T, R>
where where
F: Factory<T, R>, F: Factory<T, R>,
R: Responder + 'static, R: Responder,
{ {
type Request = (T, HttpRequest); type Request = (T, HttpRequest);
type Response = ServiceResponse; type Response = ServiceResponse;
@ -76,7 +76,7 @@ where
pub struct HandlerService<F, T, R> pub struct HandlerService<F, T, R>
where where
F: Factory<T, R>, F: Factory<T, R>,
R: Responder + 'static, R: Responder,
{ {
hnd: F, hnd: F,
_t: PhantomData<(T, R)>, _t: PhantomData<(T, R)>,
@ -85,7 +85,7 @@ where
impl<F, T, R> Service for HandlerService<F, T, R> impl<F, T, R> Service for HandlerService<F, T, R>
where where
F: Factory<T, R>, F: Factory<T, R>,
R: Responder + 'static, R: Responder,
{ {
type Request = (T, HttpRequest); type Request = (T, HttpRequest);
type Response = ServiceResponse; type Response = ServiceResponse;
@ -355,8 +355,8 @@ impl<P, T: FromRequest<P>> Future for ExtractResponse<P, T> {
/// FromRequest trait impl for tuples /// FromRequest trait impl for tuples
macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => { macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => {
impl<Func, $($T,)+ Res> Factory<($($T,)+), Res> for Func impl<Func, $($T,)+ Res> Factory<($($T,)+), Res> for Func
where Func: Fn($($T,)+) -> Res + Clone + 'static, where Func: Fn($($T,)+) -> Res + Clone,
Res: Responder + 'static, Res: Responder,
{ {
fn call(&self, param: ($($T,)+)) -> Res { fn call(&self, param: ($($T,)+)) -> Res {
(self)($(param.$n,)+) (self)($(param.$n,)+)
@ -365,7 +365,7 @@ macro_rules! factory_tuple ({ $(($n:tt, $T:ident)),+} => {
impl<Func, $($T,)+ Res> AsyncFactory<($($T,)+), Res> for Func impl<Func, $($T,)+ Res> AsyncFactory<($($T,)+), Res> for Func
where Func: Fn($($T,)+) -> Res + Clone + 'static, where Func: Fn($($T,)+) -> Res + Clone + 'static,
Res: IntoFuture + 'static, Res: IntoFuture,
Res::Item: Into<Response>, Res::Item: Into<Response>,
Res::Error: Into<Error>, Res::Error: Into<Error>,
{ {