1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-11 09:49:29 +00:00
actix-web/actix-http/src/encoding/decoder.rs

221 lines
7 KiB
Rust
Raw Normal View History

use std::future::Future;
2019-03-26 22:14:32 +00:00
use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
2019-03-26 22:14:32 +00:00
use actix_rt::task::{spawn_blocking, JoinHandle};
2019-12-20 07:50:07 +00:00
use brotli2::write::BrotliDecoder;
2019-03-28 18:08:24 +00:00
use bytes::Bytes;
2019-03-26 22:14:32 +00:00
use flate2::write::{GzDecoder, ZlibDecoder};
2019-12-13 05:24:57 +00:00
use futures_core::{ready, Stream};
2019-03-26 22:14:32 +00:00
use super::Writer;
use crate::error::{BlockingError, PayloadError};
2019-03-26 22:14:32 +00:00
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
/// Chunk-length threshold, in bytes, used by `Decoder::poll_next`.
///
/// Chunks shorter than this are decoded directly inside `poll_next`; chunks of
/// this length or longer are handed to `spawn_blocking` instead.
const INPLACE: usize = 2049;
2019-03-28 18:08:24 +00:00
pub struct Decoder<S> {
2019-03-26 22:14:32 +00:00
decoder: Option<ContentDecoder>,
2019-03-28 18:08:24 +00:00
stream: S,
eof: bool,
fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
2019-03-26 22:14:32 +00:00
}
2019-03-28 18:08:24 +00:00
impl<S> Decoder<S>
2019-03-26 22:14:32 +00:00
where
S: Stream<Item = Result<Bytes, PayloadError>>,
2019-03-26 22:14:32 +00:00
{
2019-03-28 18:08:24 +00:00
/// Construct a decoder.
#[inline]
pub fn new(stream: S, encoding: ContentEncoding) -> Decoder<S> {
2019-03-26 22:14:32 +00:00
let decoder = match encoding {
ContentEncoding::Br => Some(ContentDecoder::Br(Box::new(
2019-12-20 07:50:07 +00:00
BrotliDecoder::new(Writer::new()),
2019-03-26 22:14:32 +00:00
))),
ContentEncoding::Deflate => Some(ContentDecoder::Deflate(Box::new(
ZlibDecoder::new(Writer::new()),
))),
ContentEncoding::Gzip => Some(ContentDecoder::Gzip(Box::new(
GzDecoder::new(Writer::new()),
))),
_ => None,
};
2019-03-28 18:08:24 +00:00
Decoder {
decoder,
stream,
fut: None,
eof: false,
}
2019-03-26 22:14:32 +00:00
}
2019-03-28 18:08:24 +00:00
/// Construct decoder based on headers.
#[inline]
pub fn from_headers(stream: S, headers: &HeaderMap) -> Decoder<S> {
2019-03-26 22:14:32 +00:00
// check content-encoding
let encoding = if let Some(enc) = headers.get(&CONTENT_ENCODING) {
2019-03-26 22:14:32 +00:00
if let Ok(enc) = enc.to_str() {
ContentEncoding::from(enc)
} else {
ContentEncoding::Identity
}
} else {
ContentEncoding::Identity
};
Self::new(stream, encoding)
}
}
2019-03-28 18:08:24 +00:00
impl<S> Stream for Decoder<S>
2019-03-26 22:14:32 +00:00
where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
2019-03-26 22:14:32 +00:00
{
type Item = Result<Bytes, PayloadError>;
2019-03-26 22:14:32 +00:00
fn poll_next(
mut self: Pin<&mut Self>,
2019-12-07 18:46:51 +00:00
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
2019-03-26 22:14:32 +00:00
loop {
2019-03-28 18:08:24 +00:00
if let Some(ref mut fut) = self.fut {
let (chunk, decoder) =
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
2019-03-28 18:08:24 +00:00
self.decoder = Some(decoder);
self.fut.take();
if let Some(chunk) = chunk {
return Poll::Ready(Some(Ok(chunk)));
2019-03-28 18:08:24 +00:00
}
}
if self.eof {
return Poll::Ready(None);
2019-03-28 18:08:24 +00:00
}
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
Poll::Ready(Some(Ok(chunk))) => {
2019-03-28 18:08:24 +00:00
if let Some(mut decoder) = self.decoder.take() {
if chunk.len() < INPLACE {
2019-03-28 18:08:24 +00:00
let chunk = decoder.feed_data(chunk)?;
self.decoder = Some(decoder);
if let Some(chunk) = chunk {
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(spawn_blocking(move || {
let chunk = decoder.feed_data(chunk)?;
Ok((chunk, decoder))
}));
}
2019-03-28 18:08:24 +00:00
continue;
2019-03-26 22:14:32 +00:00
} else {
return Poll::Ready(Some(Ok(chunk)));
2019-03-26 22:14:32 +00:00
}
}
Poll::Ready(None) => {
2019-03-28 18:08:24 +00:00
self.eof = true;
return if let Some(mut decoder) = self.decoder.take() {
match decoder.feed_eof() {
Ok(Some(res)) => Poll::Ready(Some(Ok(res))),
Ok(None) => Poll::Ready(None),
Err(err) => Poll::Ready(Some(Err(err.into()))),
}
2019-03-26 22:14:32 +00:00
} else {
Poll::Ready(None)
2019-03-26 22:14:32 +00:00
};
}
Poll::Pending => break,
2019-03-26 22:14:32 +00:00
}
}
Poll::Pending
2019-03-26 22:14:32 +00:00
}
}
enum ContentDecoder {
Deflate(Box<ZlibDecoder<Writer>>),
Gzip(Box<GzDecoder<Writer>>),
2019-12-20 07:50:07 +00:00
Br(Box<BrotliDecoder<Writer>>),
2019-03-26 22:14:32 +00:00
}
impl ContentDecoder {
    /// Signal the end of the input and collect whatever output is left.
    ///
    /// The brotli decoder is flushed; the gzip and deflate decoders are
    /// finished via `try_finish`. Returns `Ok(None)` when no output bytes
    /// were produced.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying decoder.
    fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
        let output = match self {
            ContentDecoder::Br(decoder) => {
                decoder.flush()?;
                decoder.get_mut().take()
            }
            ContentDecoder::Gzip(decoder) => {
                decoder.try_finish()?;
                decoder.get_mut().take()
            }
            ContentDecoder::Deflate(decoder) => {
                decoder.try_finish()?;
                decoder.get_mut().take()
            }
        };

        Ok(Self::non_empty(output))
    }

    /// Feed one chunk of encoded input and collect the output produced so far.
    ///
    /// The whole chunk is written to the decoder, the decoder is flushed, and
    /// the bytes accumulated in its writer are taken out. Returns `Ok(None)`
    /// when no output bytes were produced.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by the underlying decoder while writing
    /// or flushing.
    fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
        let output = match self {
            ContentDecoder::Br(decoder) => {
                decoder.write_all(&data)?;
                decoder.flush()?;
                decoder.get_mut().take()
            }
            ContentDecoder::Gzip(decoder) => {
                decoder.write_all(&data)?;
                decoder.flush()?;
                decoder.get_mut().take()
            }
            ContentDecoder::Deflate(decoder) => {
                decoder.write_all(&data)?;
                decoder.flush()?;
                decoder.get_mut().take()
            }
        };

        Ok(Self::non_empty(output))
    }

    /// Map an empty buffer to `None`, anything else to `Some(buf)`.
    fn non_empty(buf: Bytes) -> Option<Bytes> {
        if buf.is_empty() {
            None
        } else {
            Some(buf)
        }
    }
}