1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-26 11:31:09 +00:00

simplify content-length calculation

This commit is contained in:
Nikolay Kim 2017-12-16 07:29:15 -08:00
parent ed8bd3d6a3
commit b1f33e29ec
7 changed files with 52 additions and 39 deletions

View file

@ -54,7 +54,7 @@ Each result is best of five runs. All measurements are req/sec.
Name | 1 thread | 1 pipeline | 3 thread | 3 pipeline | 8 thread | 8 pipeline Name | 1 thread | 1 pipeline | 3 thread | 3 pipeline | 8 thread | 8 pipeline
---- | -------- | ---------- | -------- | ---------- | -------- | ---------- ---- | -------- | ---------- | -------- | ---------- | -------- | ----------
Actix | 91.200 | 912.000 | 122.100 | 2.083.000 | 107.400 | 2.650.000 Actix | 91.200 | 950.000 | 122.100 | 2.083.000 | 107.400 | 2.730.000
Gotham | 61.000 | 178.000 | | | | Gotham | 61.000 | 178.000 | | | |
Iron | | | | | 94.500 | 78.000 Iron | | | | | 94.500 | 78.000
Rocket | | | | | 95.500 | failed Rocket | | | | | 95.500 | failed

View file

@ -13,10 +13,9 @@ use flate2::write::{GzEncoder, DeflateDecoder, DeflateEncoder};
use brotli2::write::{BrotliDecoder, BrotliEncoder}; use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut, BufMut, Writer}; use bytes::{Bytes, BytesMut, BufMut, Writer};
use helpers;
use helpers::SharedBytes;
use body::{Body, Binary}; use body::{Body, Binary};
use error::PayloadError; use error::PayloadError;
use helpers::SharedBytes;
use httprequest::HttpMessage; use httprequest::HttpMessage;
use httpresponse::HttpResponse; use httpresponse::HttpResponse;
use payload::{PayloadSender, PayloadWriter}; use payload::{PayloadSender, PayloadWriter};
@ -390,7 +389,7 @@ impl PayloadEncoder {
if resp.chunked() { if resp.chunked() {
error!("Chunked transfer is enabled but body is set to Empty"); error!("Chunked transfer is enabled but body is set to Empty");
} }
resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0")); resp.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::eof(buf) TransferEncoding::eof(buf)
}, },
Body::Binary(ref mut bytes) => { Body::Binary(ref mut bytes) => {
@ -409,18 +408,12 @@ impl PayloadEncoder {
// TODO return error! // TODO return error!
let _ = enc.write(bytes.as_ref()); let _ = enc.write(bytes.as_ref());
let _ = enc.write_eof(); let _ = enc.write_eof();
let b = enc.get_mut().take();
resp.headers_mut().insert( *bytes = Binary::from(enc.get_mut().take());
CONTENT_LENGTH, helpers::convert_into_header(b.len()));
*bytes = Binary::from(b);
encoding = ContentEncoding::Identity; encoding = ContentEncoding::Identity;
TransferEncoding::eof(buf)
} else {
resp.headers_mut().insert(
CONTENT_LENGTH, helpers::convert_into_header(bytes.len()));
TransferEncoding::eof(buf)
} }
resp.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::eof(buf)
} }
Body::Streaming(_) | Body::StreamingContext => { Body::Streaming(_) | Body::StreamingContext => {
if resp.chunked() { if resp.chunked() {
@ -734,11 +727,9 @@ impl TransferEncoding {
return *remaining == 0 return *remaining == 0
} }
let max = cmp::min(*remaining, msg.len() as u64); let max = cmp::min(*remaining, msg.len() as u64);
trace!("sized write = {}", max);
self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref()); self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref());
*remaining -= max as u64; *remaining -= max as u64;
trace!("encoded {} bytes, remaining = {}", max, remaining);
*remaining == 0 *remaining == 0
}, },
} }

View file

@ -435,7 +435,7 @@ impl Reader {
let read = if buf.is_empty() { let read = if buf.is_empty() {
match self.read_from_io(io, buf) { match self.read_from_io(io, buf) {
Ok(Async::Ready(0)) => { Ok(Async::Ready(0)) => {
debug!("Ignored premature client disconnection"); // debug!("Ignored premature client disconnection");
return Err(ReaderError::Disconnect); return Err(ReaderError::Disconnect);
}, },
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
@ -713,7 +713,6 @@ impl Decoder {
pub fn decode(&mut self, body: &mut BytesMut) -> Poll<Option<Bytes>, io::Error> { pub fn decode(&mut self, body: &mut BytesMut) -> Poll<Option<Bytes>, io::Error> {
match self.kind { match self.kind {
Kind::Length(ref mut remaining) => { Kind::Length(ref mut remaining) => {
trace!("Sized read, remaining={:?}", remaining);
if *remaining == 0 { if *remaining == 0 {
Ok(Async::Ready(None)) Ok(Async::Ready(None))
} else { } else {
@ -794,7 +793,6 @@ impl ChunkedState {
} }
} }
fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll<ChunkedState, io::Error> { fn read_size(rdr: &mut BytesMut, size: &mut u64) -> Poll<ChunkedState, io::Error> {
trace!("Read chunk hex size");
let radix = 16; let radix = 16;
match byte!(rdr) { match byte!(rdr) {
b @ b'0'...b'9' => { b @ b'0'...b'9' => {
@ -833,14 +831,12 @@ impl ChunkedState {
} }
} }
fn read_extension(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> { fn read_extension(rdr: &mut BytesMut) -> Poll<ChunkedState, io::Error> {
trace!("read_extension");
match byte!(rdr) { match byte!(rdr) {
b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)), b'\r' => Ok(Async::Ready(ChunkedState::SizeLf)),
_ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions _ => Ok(Async::Ready(ChunkedState::Extension)), // no supported extensions
} }
} }
fn read_size_lf(rdr: &mut BytesMut, size: &mut u64) -> Poll<ChunkedState, io::Error> { fn read_size_lf(rdr: &mut BytesMut, size: &mut u64) -> Poll<ChunkedState, io::Error> {
trace!("Chunk size is {:?}", size);
match byte!(rdr) { match byte!(rdr) {
b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)), b'\n' if *size > 0 => Ok(Async::Ready(ChunkedState::Body)),
b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)), b'\n' if *size == 0 => Ok(Async::Ready(ChunkedState::EndCr)),

View file

@ -2,7 +2,7 @@ use std::io;
use futures::{Async, Poll}; use futures::{Async, Poll};
use tokio_io::AsyncWrite; use tokio_io::AsyncWrite;
use http::Version; use http::Version;
use http::header::{HeaderValue, CONNECTION, DATE}; use http::header::{HeaderValue, CONNECTION, DATE, CONTENT_LENGTH};
use helpers; use helpers;
use body::Body; use body::Body;
@ -124,7 +124,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error> -> Result<WriterState, io::Error>
{ {
trace!("Prepare response with status: {:?}", msg.status()); //trace!("Prepare response with status: {:?}", msg.status());
// prepare task // prepare task
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
@ -146,11 +146,12 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
} else if version >= Version::HTTP_11 { } else if version >= Version::HTTP_11 {
msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("close")); msg.headers_mut().insert(CONNECTION, HeaderValue::from_static("close"));
} }
let body = msg.replace_body(Body::Empty);
// render message // render message
{ {
let mut buffer = self.encoder.get_mut(); let mut buffer = self.encoder.get_mut();
if let Body::Binary(ref bytes) = *msg.body() { if let Body::Binary(ref bytes) = body {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else { } else {
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE); buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
@ -174,6 +175,20 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
buffer.extend_from_slice(b"\r\n"); buffer.extend_from_slice(b"\r\n");
} }
match body {
Body::Empty => {
buffer.extend_from_slice(CONTENT_LENGTH.as_str().as_bytes());
buffer.extend_from_slice(b": 0\r\n");
}
Body::Binary(ref bytes) => {
buffer.extend_from_slice(CONTENT_LENGTH.as_str().as_bytes());
buffer.extend_from_slice(b": ");
helpers::convert_usize(bytes.len(), &mut buffer);
buffer.extend_from_slice(b"\r\n");
}
_ => ()
}
// using helpers::date is quite a lot faster // using helpers::date is quite a lot faster
if !msg.headers().contains_key(DATE) { if !msg.headers().contains_key(DATE) {
buffer.reserve(helpers::DATE_VALUE_LENGTH + 8); buffer.reserve(helpers::DATE_VALUE_LENGTH + 8);
@ -187,14 +202,13 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.headers_size = buffer.len() as u32; self.headers_size = buffer.len() as u32;
} }
trace!("Response: {:?}", msg); // trace!("Response: {:?}", msg);
if msg.body().is_binary() { if let Body::Binary(bytes) = body {
let body = msg.replace_body(Body::Empty); self.encoder.write(bytes.as_ref())?;
if let Body::Binary(bytes) = body { return Ok(WriterState::Done)
self.encoder.write(bytes.as_ref())?; } else {
return Ok(WriterState::Done) msg.replace_body(body);
}
} }
Ok(WriterState::Done) Ok(WriterState::Done)
} }
@ -221,7 +235,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
self.encoder.write_eof()?; self.encoder.write_eof()?;
if !self.encoder.is_eof() { if !self.encoder.is_eof() {
//debug!("last payload item, but it is not EOF "); // debug!("last payload item, but it is not EOF ");
Err(io::Error::new(io::ErrorKind::Other, Err(io::Error::new(io::ErrorKind::Other,
"Last payload item, but eof is not reached")) "Last payload item, but eof is not reached"))
} else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE { } else if self.encoder.len() > MAX_WRITE_BUFFER_SIZE {

View file

@ -4,7 +4,7 @@ use futures::{Async, Poll};
use http2::{Reason, SendStream}; use http2::{Reason, SendStream};
use http2::server::Respond; use http2::server::Respond;
use http::{Version, HttpTryFrom, Response}; use http::{Version, HttpTryFrom, Response};
use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE}; use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE, CONTENT_LENGTH};
use helpers; use helpers;
use body::Body; use body::Body;
@ -114,7 +114,7 @@ impl Writer for H2Writer {
fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse) fn start(&mut self, req: &mut HttpMessage, msg: &mut HttpResponse)
-> Result<WriterState, io::Error> -> Result<WriterState, io::Error>
{ {
trace!("Prepare response with status: {:?}", msg.status()); // trace!("Prepare response with status: {:?}", msg.status());
// prepare response // prepare response
self.flags.insert(Flags::STARTED); self.flags.insert(Flags::STARTED);
@ -142,6 +142,19 @@ impl Writer for H2Writer {
resp.headers_mut().insert(key, value.clone()); resp.headers_mut().insert(key, value.clone());
} }
match *msg.body() {
Body::Binary(ref bytes) => {
let mut val = BytesMut::new();
helpers::convert_usize(bytes.len(), &mut val);
resp.headers_mut().insert(
CONTENT_LENGTH, HeaderValue::try_from(val.freeze()).unwrap());
}
Body::Empty => {
resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
},
_ => (),
}
match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) { match self.respond.send_response(resp, self.flags.contains(Flags::EOF)) {
Ok(stream) => Ok(stream) =>
self.stream = Some(stream), self.stream = Some(stream),
@ -149,7 +162,7 @@ impl Writer for H2Writer {
return Err(io::Error::new(io::ErrorKind::Other, "err")), return Err(io::Error::new(io::ErrorKind::Other, "err")),
} }
trace!("Response: {:?}", msg); // trace!("Response: {:?}", msg);
if msg.body().is_binary() { if msg.body().is_binary() {
if let Body::Binary(bytes) = msg.replace_body(Body::Empty) { if let Body::Binary(bytes) = msg.replace_body(Body::Empty) {

View file

@ -6,7 +6,6 @@ use std::ops::{Deref, DerefMut};
use std::collections::VecDeque; use std::collections::VecDeque;
use time; use time;
use bytes::BytesMut; use bytes::BytesMut;
use http::header::HeaderValue;
use httprequest::HttpMessage; use httprequest::HttpMessage;
@ -285,7 +284,7 @@ pub(crate) fn convert_u16(mut n: u16, bytes: &mut BytesMut) {
} }
} }
pub(crate) fn convert_into_header(mut n: usize) -> HeaderValue { pub(crate) fn convert_usize(mut n: usize, bytes: &mut BytesMut) {
let mut curr: isize = 39; let mut curr: isize = 39;
let mut buf: [u8; 39] = unsafe { mem::uninitialized() }; let mut buf: [u8; 39] = unsafe { mem::uninitialized() };
let buf_ptr = buf.as_mut_ptr(); let buf_ptr = buf.as_mut_ptr();
@ -330,8 +329,8 @@ pub(crate) fn convert_into_header(mut n: usize) -> HeaderValue {
} }
unsafe { unsafe {
HeaderValue::from_bytes( bytes.extend_from_slice(
slice::from_raw_parts(buf_ptr.offset(curr), buf.len() - curr as usize)).unwrap() slice::from_raw_parts(buf_ptr.offset(curr), buf.len() - curr as usize));
} }
} }

View file

@ -220,7 +220,7 @@ impl Stream for WsStream {
loop { loop {
match wsframe::Frame::parse(&mut self.buf) { match wsframe::Frame::parse(&mut self.buf) {
Ok(Some(frame)) => { Ok(Some(frame)) => {
trace!("WsFrame {}", frame); // trace!("WsFrame {}", frame);
let (_finished, opcode, payload) = frame.unpack(); let (_finished, opcode, payload) = frame.unpack();
match opcode { match opcode {