1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-02 05:18:44 +00:00

split encoding module

This commit is contained in:
Nikolay Kim 2018-06-24 10:42:20 +06:00
parent 40ca9ba9c5
commit 33260c7b35
9 changed files with 371 additions and 362 deletions

View file

@ -18,7 +18,7 @@ use error::Error;
use error::PayloadError; use error::PayloadError;
use header::ContentEncoding; use header::ContentEncoding;
use httpmessage::HttpMessage; use httpmessage::HttpMessage;
use server::encoding::PayloadStream; use server::input::PayloadStream;
use server::WriterState; use server::WriterState;
/// A set of errors that can occur during request sending and response reading /// A set of errors that can occur during request sending and response reading

View file

@ -21,7 +21,7 @@ use tokio_io::AsyncWrite;
use body::{Binary, Body}; use body::{Binary, Body};
use header::ContentEncoding; use header::ContentEncoding;
use server::encoding::{ContentEncoder, Output, TransferEncoding}; use server::output::{ContentEncoder, Output, TransferEncoding};
use server::WriterState; use server::WriterState;
use client::ClientRequest; use client::ClientRequest;

View file

@ -13,9 +13,9 @@ use httpresponse::HttpResponse;
use payload::{Payload, PayloadStatus, PayloadWriter}; use payload::{Payload, PayloadStatus, PayloadWriter};
use pipeline::Pipeline; use pipeline::Pipeline;
use super::encoding::PayloadType;
use super::h1decoder::{DecoderError, H1Decoder, Message}; use super::h1decoder::{DecoderError, H1Decoder, Message};
use super::h1writer::H1Writer; use super::h1writer::H1Writer;
use super::input::PayloadType;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::Writer; use super::Writer;
use super::{HttpHandler, HttpHandlerTask, IoStream}; use super::{HttpHandler, HttpHandlerTask, IoStream};

View file

@ -6,8 +6,8 @@ use std::io;
use std::rc::Rc; use std::rc::Rc;
use tokio_io::AsyncWrite; use tokio_io::AsyncWrite;
use super::encoding::{ContentEncoder, Output};
use super::helpers; use super::helpers;
use super::output::{ContentEncoder, Output};
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body}; use body::{Binary, Body};

View file

@ -1,5 +1,3 @@
#![cfg_attr(feature = "cargo-clippy", allow(redundant_field_names))]
use std::collections::VecDeque; use std::collections::VecDeque;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::SocketAddr; use std::net::SocketAddr;
@ -23,8 +21,8 @@ use payload::{Payload, PayloadStatus, PayloadWriter};
use pipeline::Pipeline; use pipeline::Pipeline;
use uri::Url; use uri::Url;
use super::encoding::PayloadType;
use super::h2writer::H2Writer; use super::h2writer::H2Writer;
use super::input::PayloadType;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{HttpHandler, HttpHandlerTask, Writer}; use super::{HttpHandler, HttpHandlerTask, Writer};

View file

@ -11,8 +11,8 @@ use std::{cmp, io};
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use http::{HttpTryFrom, Version}; use http::{HttpTryFrom, Version};
use super::encoding::{ContentEncoder, Output};
use super::helpers; use super::helpers;
use super::output::{ContentEncoder, Output};
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE}; use super::{Writer, WriterState, MAX_WRITE_BUFFER_SIZE};
use body::{Binary, Body}; use body::{Binary, Body};

357
src/server/input.rs Normal file
View file

@ -0,0 +1,357 @@
use std::io::{Read, Write};
use std::{cmp, io};
#[cfg(feature = "brotli")]
use brotli2::write::BrotliDecoder;
use bytes::{BufMut, Bytes, BytesMut};
use error::PayloadError;
#[cfg(feature = "flate2")]
use flate2::read::GzDecoder;
#[cfg(feature = "flate2")]
use flate2::write::DeflateDecoder;
use header::ContentEncoding;
use http::header::{HeaderMap, CONTENT_ENCODING};
use payload::{PayloadSender, PayloadStatus, PayloadWriter};
pub(crate) enum PayloadType {
Sender(PayloadSender),
Encoding(Box<EncodedPayload>),
}
impl PayloadType {
#[cfg(any(feature = "brotli", feature = "flate2"))]
pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType {
// check content-encoding
let enc = if let Some(enc) = headers.get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
ContentEncoding::from(enc)
} else {
ContentEncoding::Auto
}
} else {
ContentEncoding::Auto
};
match enc {
ContentEncoding::Auto | ContentEncoding::Identity => {
PayloadType::Sender(sender)
}
_ => PayloadType::Encoding(Box::new(EncodedPayload::new(sender, enc))),
}
}
#[cfg(not(any(feature = "brotli", feature = "flate2")))]
pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType {
PayloadType::Sender(sender)
}
}
impl PayloadWriter for PayloadType {
#[inline]
fn set_error(&mut self, err: PayloadError) {
match *self {
PayloadType::Sender(ref mut sender) => sender.set_error(err),
PayloadType::Encoding(ref mut enc) => enc.set_error(err),
}
}
#[inline]
fn feed_eof(&mut self) {
match *self {
PayloadType::Sender(ref mut sender) => sender.feed_eof(),
PayloadType::Encoding(ref mut enc) => enc.feed_eof(),
}
}
#[inline]
fn feed_data(&mut self, data: Bytes) {
match *self {
PayloadType::Sender(ref mut sender) => sender.feed_data(data),
PayloadType::Encoding(ref mut enc) => enc.feed_data(data),
}
}
#[inline]
fn need_read(&self) -> PayloadStatus {
match *self {
PayloadType::Sender(ref sender) => sender.need_read(),
PayloadType::Encoding(ref enc) => enc.need_read(),
}
}
}
/// Payload wrapper with content decompression support
pub(crate) struct EncodedPayload {
inner: PayloadSender,
error: bool,
payload: PayloadStream,
}
impl EncodedPayload {
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
EncodedPayload {
inner,
error: false,
payload: PayloadStream::new(enc),
}
}
}
impl PayloadWriter for EncodedPayload {
fn set_error(&mut self, err: PayloadError) {
self.inner.set_error(err)
}
fn feed_eof(&mut self) {
if !self.error {
match self.payload.feed_eof() {
Err(err) => {
self.error = true;
self.set_error(PayloadError::Io(err));
}
Ok(value) => {
if let Some(b) = value {
self.inner.feed_data(b);
}
self.inner.feed_eof();
}
}
}
}
fn feed_data(&mut self, data: Bytes) {
if self.error {
return;
}
match self.payload.feed_data(data) {
Ok(Some(b)) => self.inner.feed_data(b),
Ok(None) => (),
Err(e) => {
self.error = true;
self.set_error(e.into());
}
}
}
#[inline]
fn need_read(&self) -> PayloadStatus {
self.inner.need_read()
}
}
pub(crate) enum Decoder {
#[cfg(feature = "flate2")]
Deflate(Box<DeflateDecoder<Writer>>),
#[cfg(feature = "flate2")]
Gzip(Option<Box<GzDecoder<Wrapper>>>),
#[cfg(feature = "brotli")]
Br(Box<BrotliDecoder<Writer>>),
Identity,
}
// should go after write::GzDecoder get implemented
#[derive(Debug)]
pub(crate) struct Wrapper {
pub buf: BytesMut,
pub eof: bool,
}
impl io::Read for Wrapper {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = cmp::min(buf.len(), self.buf.len());
buf[..len].copy_from_slice(&self.buf[..len]);
self.buf.split_to(len);
if len == 0 {
if self.eof {
Ok(0)
} else {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
} else {
Ok(len)
}
}
}
impl io::Write for Wrapper {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub(crate) struct Writer {
buf: BytesMut,
}
impl Writer {
fn new() -> Writer {
Writer {
buf: BytesMut::with_capacity(8192),
}
}
fn take(&mut self) -> Bytes {
self.buf.take().freeze()
}
}
impl io::Write for Writer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
/// Payload stream with decompression support
pub(crate) struct PayloadStream {
decoder: Decoder,
dst: BytesMut,
}
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
#[cfg(feature = "brotli")]
ContentEncoding::Br => {
Decoder::Br(Box::new(BrotliDecoder::new(Writer::new())))
}
#[cfg(feature = "flate2")]
ContentEncoding::Deflate => {
Decoder::Deflate(Box::new(DeflateDecoder::new(Writer::new())))
}
#[cfg(feature = "flate2")]
ContentEncoding::Gzip => Decoder::Gzip(None),
_ => Decoder::Identity,
};
PayloadStream {
decoder: dec,
dst: BytesMut::new(),
}
}
}
impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature = "brotli")]
Decoder::Br(ref mut decoder) => match decoder.finish() {
Ok(mut writer) => {
let b = writer.take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
#[cfg(feature = "flate2")]
Decoder::Gzip(ref mut decoder) => {
if let Some(ref mut decoder) = *decoder {
decoder.as_mut().get_mut().eof = true;
self.dst.reserve(8192);
match decoder.read(unsafe { self.dst.bytes_mut() }) {
Ok(n) => {
unsafe { self.dst.advance_mut(n) };
return Ok(Some(self.dst.take().freeze()));
}
Err(e) => return Err(e),
}
} else {
Ok(None)
}
}
#[cfg(feature = "flate2")]
Decoder::Deflate(ref mut decoder) => match decoder.try_finish() {
Ok(_) => {
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
Decoder::Identity => Ok(None),
}
}
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature = "brotli")]
Decoder::Br(ref mut decoder) => match decoder.write_all(&data) {
Ok(_) => {
decoder.flush()?;
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
#[cfg(feature = "flate2")]
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
*decoder = Some(Box::new(GzDecoder::new(Wrapper {
buf: BytesMut::from(data),
eof: false,
})));
} else {
let _ = decoder.as_mut().unwrap().write(&data);
}
loop {
self.dst.reserve(8192);
match decoder
.as_mut()
.as_mut()
.unwrap()
.read(unsafe { self.dst.bytes_mut() })
{
Ok(n) => {
if n != 0 {
unsafe { self.dst.advance_mut(n) };
}
if n == 0 {
return Ok(Some(self.dst.take().freeze()));
}
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock
&& !self.dst.is_empty()
{
return Ok(Some(self.dst.take().freeze()));
}
return Err(e);
}
}
}
}
#[cfg(feature = "flate2")]
Decoder::Deflate(ref mut decoder) => match decoder.write_all(&data) {
Ok(_) => {
decoder.flush()?;
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
Decoder::Identity => Ok(Some(data)),
}
}
}

View file

@ -8,13 +8,14 @@ use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
mod channel; mod channel;
pub(crate) mod encoding;
pub(crate) mod h1; pub(crate) mod h1;
pub(crate) mod h1decoder; pub(crate) mod h1decoder;
mod h1writer; mod h1writer;
mod h2; mod h2;
mod h2writer; mod h2writer;
pub(crate) mod helpers; pub(crate) mod helpers;
pub(crate) mod input;
pub(crate) mod output;
pub(crate) mod settings; pub(crate) mod settings;
mod srv; mod srv;
mod worker; mod worker;

View file

@ -1,372 +1,24 @@
use std::fmt::Write as FmtWrite; use std::fmt::Write as FmtWrite;
use std::io::{Read, Write}; use std::io::Write;
use std::str::FromStr; use std::str::FromStr;
use std::{cmp, fmt, io, mem}; use std::{cmp, fmt, io, mem};
#[cfg(feature = "brotli")] #[cfg(feature = "brotli")]
use brotli2::write::{BrotliDecoder, BrotliEncoder}; use brotli2::write::BrotliEncoder;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::BytesMut;
#[cfg(feature = "flate2")] #[cfg(feature = "flate2")]
use flate2::read::GzDecoder; use flate2::write::{DeflateEncoder, GzEncoder};
#[cfg(feature = "flate2")]
use flate2::write::{DeflateDecoder, DeflateEncoder, GzEncoder};
#[cfg(feature = "flate2")] #[cfg(feature = "flate2")]
use flate2::Compression; use flate2::Compression;
use http::header::{ use http::header::{
HeaderMap, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, HeaderValue, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING,
TRANSFER_ENCODING,
}; };
use http::{HttpTryFrom, Method, Version}; use http::{HttpTryFrom, Method, Version};
use body::{Binary, Body}; use body::{Binary, Body};
use error::PayloadError;
use header::ContentEncoding; use header::ContentEncoding;
use httprequest::HttpInnerMessage; use httprequest::HttpInnerMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadStatus, PayloadWriter};
pub(crate) enum PayloadType {
Sender(PayloadSender),
Encoding(Box<EncodedPayload>),
}
impl PayloadType {
#[cfg(any(feature = "brotli", feature = "flate2"))]
pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType {
// check content-encoding
let enc = if let Some(enc) = headers.get(CONTENT_ENCODING) {
if let Ok(enc) = enc.to_str() {
ContentEncoding::from(enc)
} else {
ContentEncoding::Auto
}
} else {
ContentEncoding::Auto
};
match enc {
ContentEncoding::Auto | ContentEncoding::Identity => {
PayloadType::Sender(sender)
}
_ => PayloadType::Encoding(Box::new(EncodedPayload::new(sender, enc))),
}
}
#[cfg(not(any(feature = "brotli", feature = "flate2")))]
pub fn new(headers: &HeaderMap, sender: PayloadSender) -> PayloadType {
PayloadType::Sender(sender)
}
}
impl PayloadWriter for PayloadType {
#[inline]
fn set_error(&mut self, err: PayloadError) {
match *self {
PayloadType::Sender(ref mut sender) => sender.set_error(err),
PayloadType::Encoding(ref mut enc) => enc.set_error(err),
}
}
#[inline]
fn feed_eof(&mut self) {
match *self {
PayloadType::Sender(ref mut sender) => sender.feed_eof(),
PayloadType::Encoding(ref mut enc) => enc.feed_eof(),
}
}
#[inline]
fn feed_data(&mut self, data: Bytes) {
match *self {
PayloadType::Sender(ref mut sender) => sender.feed_data(data),
PayloadType::Encoding(ref mut enc) => enc.feed_data(data),
}
}
#[inline]
fn need_read(&self) -> PayloadStatus {
match *self {
PayloadType::Sender(ref sender) => sender.need_read(),
PayloadType::Encoding(ref enc) => enc.need_read(),
}
}
}
/// Payload wrapper with content decompression support
pub(crate) struct EncodedPayload {
inner: PayloadSender,
error: bool,
payload: PayloadStream,
}
impl EncodedPayload {
pub fn new(inner: PayloadSender, enc: ContentEncoding) -> EncodedPayload {
EncodedPayload {
inner,
error: false,
payload: PayloadStream::new(enc),
}
}
}
impl PayloadWriter for EncodedPayload {
fn set_error(&mut self, err: PayloadError) {
self.inner.set_error(err)
}
fn feed_eof(&mut self) {
if !self.error {
match self.payload.feed_eof() {
Err(err) => {
self.error = true;
self.set_error(PayloadError::Io(err));
}
Ok(value) => {
if let Some(b) = value {
self.inner.feed_data(b);
}
self.inner.feed_eof();
}
}
}
}
fn feed_data(&mut self, data: Bytes) {
if self.error {
return;
}
match self.payload.feed_data(data) {
Ok(Some(b)) => self.inner.feed_data(b),
Ok(None) => (),
Err(e) => {
self.error = true;
self.set_error(e.into());
}
}
}
#[inline]
fn need_read(&self) -> PayloadStatus {
self.inner.need_read()
}
}
pub(crate) enum Decoder {
#[cfg(feature = "flate2")]
Deflate(Box<DeflateDecoder<Writer>>),
#[cfg(feature = "flate2")]
Gzip(Option<Box<GzDecoder<Wrapper>>>),
#[cfg(feature = "brotli")]
Br(Box<BrotliDecoder<Writer>>),
Identity,
}
// should go after write::GzDecoder get implemented
#[derive(Debug)]
pub(crate) struct Wrapper {
pub buf: BytesMut,
pub eof: bool,
}
impl io::Read for Wrapper {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = cmp::min(buf.len(), self.buf.len());
buf[..len].copy_from_slice(&self.buf[..len]);
self.buf.split_to(len);
if len == 0 {
if self.eof {
Ok(0)
} else {
Err(io::Error::new(io::ErrorKind::WouldBlock, ""))
}
} else {
Ok(len)
}
}
}
impl io::Write for Wrapper {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub(crate) struct Writer {
buf: BytesMut,
}
impl Writer {
fn new() -> Writer {
Writer {
buf: BytesMut::with_capacity(8192),
}
}
fn take(&mut self) -> Bytes {
self.buf.take().freeze()
}
}
impl io::Write for Writer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buf.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
/// Payload stream with decompression support
pub(crate) struct PayloadStream {
decoder: Decoder,
dst: BytesMut,
}
impl PayloadStream {
pub fn new(enc: ContentEncoding) -> PayloadStream {
let dec = match enc {
#[cfg(feature = "brotli")]
ContentEncoding::Br => {
Decoder::Br(Box::new(BrotliDecoder::new(Writer::new())))
}
#[cfg(feature = "flate2")]
ContentEncoding::Deflate => {
Decoder::Deflate(Box::new(DeflateDecoder::new(Writer::new())))
}
#[cfg(feature = "flate2")]
ContentEncoding::Gzip => Decoder::Gzip(None),
_ => Decoder::Identity,
};
PayloadStream {
decoder: dec,
dst: BytesMut::new(),
}
}
}
impl PayloadStream {
pub fn feed_eof(&mut self) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature = "brotli")]
Decoder::Br(ref mut decoder) => match decoder.finish() {
Ok(mut writer) => {
let b = writer.take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
#[cfg(feature = "flate2")]
Decoder::Gzip(ref mut decoder) => {
if let Some(ref mut decoder) = *decoder {
decoder.as_mut().get_mut().eof = true;
self.dst.reserve(8192);
match decoder.read(unsafe { self.dst.bytes_mut() }) {
Ok(n) => {
unsafe { self.dst.advance_mut(n) };
return Ok(Some(self.dst.take().freeze()));
}
Err(e) => return Err(e),
}
} else {
Ok(None)
}
}
#[cfg(feature = "flate2")]
Decoder::Deflate(ref mut decoder) => match decoder.try_finish() {
Ok(_) => {
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
Decoder::Identity => Ok(None),
}
}
pub fn feed_data(&mut self, data: Bytes) -> io::Result<Option<Bytes>> {
match self.decoder {
#[cfg(feature = "brotli")]
Decoder::Br(ref mut decoder) => match decoder.write_all(&data) {
Ok(_) => {
decoder.flush()?;
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
#[cfg(feature = "flate2")]
Decoder::Gzip(ref mut decoder) => {
if decoder.is_none() {
*decoder = Some(Box::new(GzDecoder::new(Wrapper {
buf: BytesMut::from(data),
eof: false,
})));
} else {
let _ = decoder.as_mut().unwrap().write(&data);
}
loop {
self.dst.reserve(8192);
match decoder
.as_mut()
.as_mut()
.unwrap()
.read(unsafe { self.dst.bytes_mut() })
{
Ok(n) => {
if n != 0 {
unsafe { self.dst.advance_mut(n) };
}
if n == 0 {
return Ok(Some(self.dst.take().freeze()));
}
}
Err(e) => {
if e.kind() == io::ErrorKind::WouldBlock
&& !self.dst.is_empty()
{
return Ok(Some(self.dst.take().freeze()));
}
return Err(e);
}
}
}
}
#[cfg(feature = "flate2")]
Decoder::Deflate(ref mut decoder) => match decoder.write_all(&data) {
Ok(_) => {
decoder.flush()?;
let b = decoder.get_mut().take();
if !b.is_empty() {
Ok(Some(b))
} else {
Ok(None)
}
}
Err(e) => Err(e),
},
Decoder::Identity => Ok(Some(data)),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum Output { pub(crate) enum Output {
@ -1071,6 +723,7 @@ impl AcceptEncoding {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use bytes::Bytes;
#[test] #[test]
fn test_chunked_te() { fn test_chunked_te() {