1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-22 15:08:06 +00:00

various server optimizations

This commit is contained in:
Nikolay Kim 2017-12-14 19:34:31 -08:00
parent b61c2a0cf0
commit c37565cc4a
9 changed files with 385 additions and 127 deletions

View file

@ -14,6 +14,7 @@ use brotli2::write::{BrotliDecoder, BrotliEncoder};
use bytes::{Bytes, BytesMut, BufMut, Writer};
use helpers;
use helpers::SharedBytes;
use body::{Body, Binary};
use error::PayloadError;
use httprequest::HttpMessage;
@ -337,15 +338,15 @@ impl PayloadWriter for EncodedPayload {
pub(crate) struct PayloadEncoder(ContentEncoder);
impl Default for PayloadEncoder {
fn default() -> PayloadEncoder {
PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof()))
}
}
impl PayloadEncoder {
pub fn new(req: &HttpMessage, resp: &mut HttpResponse) -> PayloadEncoder {
pub fn empty(bytes: SharedBytes) -> PayloadEncoder {
PayloadEncoder(ContentEncoder::Identity(TransferEncoding::eof(bytes)))
}
pub fn new(buf: SharedBytes, req: &HttpMessage, resp: &mut HttpResponse)
-> PayloadEncoder
{
let version = resp.version().unwrap_or_else(|| req.version);
let mut body = resp.replace_body(Body::Empty);
let has_body = match body {
@ -390,11 +391,11 @@ impl PayloadEncoder {
error!("Chunked transfer is enabled but body is set to Empty");
}
resp.headers_mut().insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
TransferEncoding::eof()
TransferEncoding::eof(buf)
},
Body::Binary(ref mut bytes) => {
if compression {
let transfer = TransferEncoding::eof();
let transfer = TransferEncoding::eof(SharedBytes::default());
let mut enc = match encoding {
ContentEncoding::Deflate => ContentEncoder::Deflate(
DeflateEncoder::new(transfer, Compression::Default)),
@ -414,11 +415,11 @@ impl PayloadEncoder {
CONTENT_LENGTH, helpers::convert_into_header(b.len()));
*bytes = Binary::from(b);
encoding = ContentEncoding::Identity;
TransferEncoding::eof()
TransferEncoding::eof(buf)
} else {
resp.headers_mut().insert(
CONTENT_LENGTH, helpers::convert_into_header(bytes.len()));
TransferEncoding::eof()
TransferEncoding::eof(buf)
}
}
Body::Streaming(_) | Body::StreamingContext => {
@ -429,26 +430,26 @@ impl PayloadEncoder {
}
if version == Version::HTTP_2 {
resp.headers_mut().remove(TRANSFER_ENCODING);
TransferEncoding::eof()
TransferEncoding::eof(buf)
} else {
resp.headers_mut().insert(
TRANSFER_ENCODING, HeaderValue::from_static("chunked"));
TransferEncoding::chunked()
TransferEncoding::chunked(buf)
}
} else if let Some(len) = resp.headers().get(CONTENT_LENGTH) {
// Content-Length
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
TransferEncoding::length(len)
TransferEncoding::length(len, buf)
} else {
debug!("illegal Content-Length: {:?}", len);
TransferEncoding::eof()
TransferEncoding::eof(buf)
}
} else {
TransferEncoding::eof()
TransferEncoding::eof(buf)
}
} else {
TransferEncoding::eof()
TransferEncoding::eof(buf)
}
}
Body::Upgrade(_) | Body::UpgradeContext => {
@ -462,7 +463,7 @@ impl PayloadEncoder {
encoding = ContentEncoding::Identity;
resp.headers_mut().remove(CONTENT_ENCODING);
}
TransferEncoding::eof()
TransferEncoding::eof(buf)
}
};
resp.replace_body(body);
@ -540,13 +541,13 @@ impl ContentEncoder {
pub fn get_ref(&self) -> &BytesMut {
match *self {
ContentEncoder::Br(ref encoder) =>
&encoder.get_ref().buffer,
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Deflate(ref encoder) =>
&encoder.get_ref().buffer,
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Gzip(ref encoder) =>
&encoder.get_ref().buffer,
encoder.get_ref().buffer.get_ref(),
ContentEncoder::Identity(ref encoder) =>
&encoder.buffer,
encoder.buffer.get_ref(),
}
}
@ -554,20 +555,21 @@ impl ContentEncoder {
pub fn get_mut(&mut self) -> &mut BytesMut {
match *self {
ContentEncoder::Br(ref mut encoder) =>
&mut encoder.get_mut().buffer,
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Deflate(ref mut encoder) =>
&mut encoder.get_mut().buffer,
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Gzip(ref mut encoder) =>
&mut encoder.get_mut().buffer,
encoder.get_mut().buffer.get_mut(),
ContentEncoder::Identity(ref mut encoder) =>
&mut encoder.buffer,
encoder.buffer.get_mut(),
}
}
#[cfg_attr(feature = "cargo-clippy", allow(inline_always))]
#[inline(always)]
pub fn write_eof(&mut self) -> Result<(), io::Error> {
let encoder = mem::replace(self, ContentEncoder::Identity(TransferEncoding::eof()));
let encoder = mem::replace(
self, ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::default())));
match encoder {
ContentEncoder::Br(encoder) => {
@ -639,7 +641,7 @@ impl ContentEncoder {
}
}
ContentEncoder::Identity(ref mut encoder) => {
encoder.write_all(data)?;
encoder.encode(data);
Ok(())
}
}
@ -650,7 +652,7 @@ impl ContentEncoder {
#[derive(Debug, Clone)]
pub(crate) struct TransferEncoding {
kind: TransferEncodingKind,
buffer: BytesMut,
buffer: SharedBytes,
}
#[derive(Debug, PartialEq, Clone)]
@ -670,26 +672,26 @@ enum TransferEncodingKind {
impl TransferEncoding {
#[inline]
pub fn eof() -> TransferEncoding {
pub fn eof(bytes: SharedBytes) -> TransferEncoding {
TransferEncoding {
kind: TransferEncodingKind::Eof,
buffer: BytesMut::new(),
buffer: bytes,
}
}
#[inline]
pub fn chunked() -> TransferEncoding {
pub fn chunked(bytes: SharedBytes) -> TransferEncoding {
TransferEncoding {
kind: TransferEncodingKind::Chunked(false),
buffer: BytesMut::new(),
buffer: bytes,
}
}
#[inline]
pub fn length(len: u64) -> TransferEncoding {
pub fn length(len: u64, bytes: SharedBytes) -> TransferEncoding {
TransferEncoding {
kind: TransferEncodingKind::Length(len),
buffer: BytesMut::new(),
buffer: bytes,
}
}
@ -709,7 +711,7 @@ impl TransferEncoding {
pub fn encode(&mut self, msg: &[u8]) -> bool {
match self.kind {
TransferEncodingKind::Eof => {
self.buffer.extend_from_slice(msg);
self.buffer.get_mut().extend_from_slice(msg);
msg.is_empty()
},
TransferEncodingKind::Chunked(ref mut eof) => {
@ -719,11 +721,11 @@ impl TransferEncoding {
if msg.is_empty() {
*eof = true;
self.buffer.extend_from_slice(b"0\r\n\r\n");
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
} else {
write!(self.buffer, "{:X}\r\n", msg.len()).unwrap();
self.buffer.extend_from_slice(msg);
self.buffer.extend_from_slice(b"\r\n");
write!(self.buffer.get_mut(), "{:X}\r\n", msg.len()).unwrap();
self.buffer.get_mut().extend_from_slice(msg);
self.buffer.get_mut().extend_from_slice(b"\r\n");
}
*eof
},
@ -733,7 +735,7 @@ impl TransferEncoding {
}
let max = cmp::min(*remaining, msg.len() as u64);
trace!("sized write = {}", max);
self.buffer.extend_from_slice(msg[..max as usize].as_ref());
self.buffer.get_mut().extend_from_slice(msg[..max as usize].as_ref());
*remaining -= max as u64;
trace!("encoded {} bytes, remaining = {}", max, remaining);
@ -750,7 +752,7 @@ impl TransferEncoding {
TransferEncodingKind::Chunked(ref mut eof) => {
if !*eof {
*eof = true;
self.buffer.extend_from_slice(b"0\r\n\r\n");
self.buffer.get_mut().extend_from_slice(b"0\r\n\r\n");
}
},
}

102
src/h1.rs
View file

@ -1,4 +1,4 @@
use std::{self, io, ptr};
use std::{self, io};
use std::rc::Rc;
use std::net::SocketAddr;
use std::time::Duration;
@ -16,14 +16,15 @@ use tokio_core::reactor::Timeout;
use pipeline::Pipeline;
use encoding::PayloadType;
use channel::{HttpHandler, HttpHandlerTask};
use h1writer::H1Writer;
use h1writer::{Writer, H1Writer};
use server::WorkerSettings;
use httpcodes::HTTPNotFound;
use httprequest::HttpRequest;
use error::{ParseError, PayloadError, ResponseError};
use payload::{Payload, PayloadWriter, DEFAULT_BUFFER_SIZE};
const INIT_BUFFER_SIZE: usize = 8192;
const LW_BUFFER_SIZE: usize = 4096;
const HW_BUFFER_SIZE: usize = 16_384;
const MAX_BUFFER_SIZE: usize = 131_072;
const MAX_HEADERS: usize = 100;
const MAX_PIPELINED_MESSAGES: usize = 16;
@ -78,10 +79,11 @@ impl<T, H> Http1<T, H>
H: HttpHandler + 'static
{
pub fn new(h: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>) -> Self {
let bytes = h.get_shared_bytes();
Http1{ flags: Flags::KEEPALIVE,
settings: h,
addr: addr,
stream: H1Writer::new(stream),
stream: H1Writer::new(stream, bytes),
reader: Reader::new(),
read_buf: BytesMut::new(),
tasks: VecDeque::new(),
@ -92,6 +94,18 @@ impl<T, H> Http1<T, H>
(self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze())
}
fn poll_completed(&mut self) -> Result<bool, ()> {
// check stream state
match self.stream.poll_completed() {
Ok(Async::Ready(_)) => Ok(false),
Ok(Async::NotReady) => Ok(true),
Err(err) => {
debug!("Error sending data: {}", err);
Err(())
}
}
}
pub fn poll(&mut self) -> Poll<Http1Result, ()> {
// keep-alive timer
if self.keepalive_timer.is_some() {
@ -116,6 +130,10 @@ impl<T, H> Http1<T, H>
if !io && !item.flags.contains(EntryFlags::EOF) {
if item.flags.contains(EntryFlags::ERROR) {
// check stream state
if let Ok(Async::NotReady) = self.stream.poll_completed() {
return Ok(Async::NotReady)
}
return Err(())
}
@ -146,6 +164,12 @@ impl<T, H> Http1<T, H>
// it is not possible to recover from error
// during pipe handling, so just drop connection
error!("Unhandled error: {}", err);
item.flags.insert(EntryFlags::ERROR);
// check stream state, we still can have valid data in buffer
if let Ok(Async::NotReady) = self.stream.poll_completed() {
return Ok(Async::NotReady)
}
return Err(())
}
}
@ -178,6 +202,10 @@ impl<T, H> Http1<T, H>
// no keep-alive
if !self.flags.contains(Flags::KEEPALIVE) && self.tasks.is_empty() {
// check stream state
if self.poll_completed()? {
return Ok(Async::NotReady)
}
if self.flags.contains(Flags::H2) {
return Ok(Async::Ready(Http1Result::Switch))
} else {
@ -188,7 +216,9 @@ impl<T, H> Http1<T, H>
// read incoming data
while !self.flags.contains(Flags::ERROR) && !self.flags.contains(Flags::H2) &&
self.tasks.len() < MAX_PIPELINED_MESSAGES {
match self.reader.parse(self.stream.get_mut(), &mut self.read_buf) {
match self.reader.parse(self.stream.get_mut(),
&mut self.read_buf, &self.settings)
{
Ok(Async::Ready(Item::Http1(mut req))) => {
not_ready = false;
@ -264,10 +294,16 @@ impl<T, H> Http1<T, H>
self.keepalive_timer = Some(to);
}
} else {
// check stream state
if self.poll_completed()? {
return Ok(Async::NotReady)
}
// keep-alive disable, drop connection
return Ok(Async::Ready(Http1Result::Done))
}
} else {
// check stream state
self.poll_completed()?;
// keep-alive unset, rely on operating system
return Ok(Async::NotReady)
}
@ -279,6 +315,11 @@ impl<T, H> Http1<T, H>
// check for parse error
if self.tasks.is_empty() {
// check stream state
if self.poll_completed()? {
return Ok(Async::NotReady)
}
if self.flags.contains(Flags::H2) {
return Ok(Async::Ready(Http1Result::Switch))
}
@ -288,6 +329,7 @@ impl<T, H> Http1<T, H>
}
if not_ready {
self.poll_completed()?;
return Ok(Async::NotReady)
}
}
@ -358,7 +400,9 @@ impl Reader {
}
}
pub fn parse<T>(&mut self, io: &mut T, buf: &mut BytesMut) -> Poll<Item, ReaderError>
pub fn parse<T, H>(&mut self, io: &mut T,
buf: &mut BytesMut,
settings: &WorkerSettings<H>) -> Poll<Item, ReaderError>
where T: AsyncRead
{
loop {
@ -394,7 +438,7 @@ impl Reader {
}
loop {
match Reader::parse_message(buf).map_err(ReaderError::Error)? {
match Reader::parse_message(buf, settings).map_err(ReaderError::Error)? {
Message::Http1(msg, decoder) => {
if let Some(payload) = decoder {
self.payload = Some(payload);
@ -465,15 +509,9 @@ impl Reader {
}
fn read_from_io<T: AsyncRead>(&mut self, io: &mut T, buf: &mut BytesMut)
-> Poll<usize, io::Error>
{
if buf.remaining_mut() < INIT_BUFFER_SIZE {
buf.reserve(INIT_BUFFER_SIZE);
unsafe { // Zero out unused memory
let b = buf.bytes_mut();
let len = b.len();
ptr::write_bytes(b.as_mut_ptr(), 0, len);
}
-> Poll<usize, io::Error> {
if buf.remaining_mut() < LW_BUFFER_SIZE {
buf.reserve(HW_BUFFER_SIZE);
}
unsafe {
let n = match io.read(buf.bytes_mut()) {
@ -490,7 +528,9 @@ impl Reader {
}
}
fn parse_message(buf: &mut BytesMut) -> Result<Message, ParseError> {
fn parse_message<H>(buf: &mut BytesMut, settings: &WorkerSettings<H>)
-> Result<Message, ParseError>
{
if buf.is_empty() {
return Ok(Message::NotReady);
}
@ -537,13 +577,14 @@ impl Reader {
let uri = Uri::from_shared(path).map_err(ParseError::Uri)?;
// convert headers
let mut headers = HeaderMap::with_capacity(headers_len);
let msg = settings.get_http_message();
msg.get_mut().headers.reserve(headers_len);
for header in headers_indices[..headers_len].iter() {
if let Ok(name) = HeaderName::try_from(slice.slice(header.name.0, header.name.1)) {
if let Ok(value) = HeaderValue::try_from(
slice.slice(header.value.0, header.value.1))
{
headers.append(name, value);
msg.get_mut().headers.append(name, value);
} else {
return Err(ParseError::Header)
}
@ -552,25 +593,27 @@ impl Reader {
}
}
let decoder = if upgrade(&method, &headers) {
let decoder = if upgrade(&method, &msg.get_mut().headers) {
Decoder::eof()
} else {
let has_len = headers.contains_key(header::CONTENT_LENGTH);
let has_len = msg.get_mut().headers.contains_key(header::CONTENT_LENGTH);
// Chunked encoding
if chunked(&headers)? {
if chunked(&msg.get_mut().headers)? {
if has_len {
return Err(ParseError::Header)
}
Decoder::chunked()
} else {
if !has_len {
let msg = HttpRequest::new(method, uri, version, headers, None);
return Ok(Message::Http1(msg, None))
msg.get_mut().uri = uri;
msg.get_mut().method = method;
msg.get_mut().version = version;
return Ok(Message::Http1(HttpRequest::from_message(msg), None))
}
// Content-Length
let len = headers.get(header::CONTENT_LENGTH).unwrap();
let len = msg.get_mut().headers.get(header::CONTENT_LENGTH).unwrap();
if let Ok(s) = len.to_str() {
if let Ok(len) = s.parse::<u64>() {
Decoder::length(len)
@ -587,11 +630,14 @@ impl Reader {
let (psender, payload) = Payload::new(false);
let info = PayloadInfo {
tx: PayloadType::new(&headers, psender),
tx: PayloadType::new(&msg.get_mut().headers, psender),
decoder: decoder,
};
let msg = HttpRequest::new(method, uri, version, headers, Some(payload));
Ok(Message::Http1(msg, Some(info)))
msg.get_mut().uri = uri;
msg.get_mut().method = method;
msg.get_mut().version = version;
msg.get_mut().payload = Some(payload);
Ok(Message::Http1(HttpRequest::from_message(msg), Some(info)))
}
}

View file

@ -6,6 +6,7 @@ use http::header::{HeaderValue, CONNECTION, DATE};
use helpers;
use body::Body;
use helpers::SharedBytes;
use encoding::PayloadEncoder;
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
@ -31,7 +32,7 @@ pub trait Writer {
fn write_eof(&mut self) -> Result<WriterState, io::Error>;
fn poll_complete(&mut self) -> Poll<(), io::Error>;
fn poll_completed(&mut self) -> Poll<(), io::Error>;
}
bitflags! {
@ -49,17 +50,19 @@ pub(crate) struct H1Writer<T: AsyncWrite> {
encoder: PayloadEncoder,
written: u64,
headers_size: u32,
buffer: SharedBytes,
}
impl<T: AsyncWrite> H1Writer<T> {
pub fn new(stream: T) -> H1Writer<T> {
pub fn new(stream: T, buf: SharedBytes) -> H1Writer<T> {
H1Writer {
flags: Flags::empty(),
stream: stream,
encoder: PayloadEncoder::default(),
encoder: PayloadEncoder::empty(buf.clone()),
written: 0,
headers_size: 0,
buffer: buf,
}
}
@ -125,7 +128,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
// prepare task
self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(req, msg);
self.encoder = PayloadEncoder::new(self.buffer.clone(), req, msg);
if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) {
self.flags.insert(Flags::KEEPALIVE);
}
@ -148,9 +151,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
{
let mut buffer = self.encoder.get_mut();
if let Body::Binary(ref bytes) = *msg.body() {
buffer.reserve(150 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else {
buffer.reserve(150 + msg.headers().len() * AVERAGE_HEADER_SIZE);
buffer.reserve(256 + msg.headers().len() * AVERAGE_HEADER_SIZE);
}
match version {
@ -229,7 +232,7 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
}
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
fn poll_completed(&mut self) -> Poll<(), io::Error> {
match self.write_to_stream() {
Ok(WriterState::Done) => Ok(Async::Ready(())),
Ok(WriterState::Pause) => Ok(Async::NotReady),

View file

@ -8,6 +8,7 @@ use http::header::{HeaderValue, CONNECTION, TRANSFER_ENCODING, DATE};
use helpers;
use body::Body;
use helpers::SharedBytes;
use encoding::PayloadEncoder;
use httprequest::HttpMessage;
use httpresponse::HttpResponse;
@ -38,7 +39,7 @@ impl H2Writer {
H2Writer {
respond: respond,
stream: None,
encoder: PayloadEncoder::default(),
encoder: PayloadEncoder::empty(SharedBytes::default()),
flags: Flags::empty(),
written: 0,
}
@ -115,7 +116,7 @@ impl Writer for H2Writer {
// prepare response
self.flags.insert(Flags::STARTED);
self.encoder = PayloadEncoder::new(req, msg);
self.encoder = PayloadEncoder::new(SharedBytes::default(), req, msg);
if let Body::Empty = *msg.body() {
self.flags.insert(Flags::EOF);
}
@ -193,7 +194,7 @@ impl Writer for H2Writer {
}
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
fn poll_completed(&mut self) -> Poll<(), io::Error> {
match self.write_to_stream() {
Ok(WriterState::Done) => Ok(Async::Ready(())),
Ok(WriterState::Pause) => Ok(Async::NotReady),

View file

@ -1,10 +1,15 @@
use std::{str, mem, ptr, slice};
use std::cell::RefCell;
use std::fmt::{self, Write};
use std::rc::Rc;
use std::ops::{Deref, DerefMut};
use std::collections::VecDeque;
use time;
use bytes::BytesMut;
use http::header::HeaderValue;
use httprequest::HttpMessage;
// "Sun, 06 Nov 1994 08:49:37 GMT".len()
pub const DATE_VALUE_LENGTH: usize = 29;
@ -51,6 +56,171 @@ impl fmt::Write for CachedDate {
}
}
/// Internal use only! unsafe
#[derive(Debug)]
pub(crate) struct SharedBytesPool(RefCell<VecDeque<Rc<BytesMut>>>);
impl SharedBytesPool {
pub fn new() -> SharedBytesPool {
SharedBytesPool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get_bytes(&self) -> Rc<BytesMut> {
if let Some(bytes) = self.0.borrow_mut().pop_front() {
bytes
} else {
Rc::new(BytesMut::new())
}
}
pub fn release_bytes(&self, mut bytes: Rc<BytesMut>) {
if self.0.borrow().len() < 128 {
Rc::get_mut(&mut bytes).unwrap().take();
self.0.borrow_mut().push_front(bytes);
}
}
}
#[derive(Debug)]
pub(crate) struct SharedBytes(
Option<Rc<BytesMut>>, Option<Rc<SharedBytesPool>>);
impl Drop for SharedBytes {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(bytes) = self.0.take() {
if Rc::strong_count(&bytes) == 1 {
pool.release_bytes(bytes);
}
}
}
}
}
impl SharedBytes {
pub fn new(bytes: Rc<BytesMut>, pool: Rc<SharedBytesPool>) -> SharedBytes {
SharedBytes(Some(bytes), Some(pool))
}
#[inline]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))]
pub fn get_mut(&self) -> &mut BytesMut {
let r: &BytesMut = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn get_ref(&self) -> &BytesMut {
self.0.as_ref().unwrap()
}
}
impl Default for SharedBytes {
fn default() -> Self {
SharedBytes(Some(Rc::new(BytesMut::new())), None)
}
}
impl Clone for SharedBytes {
fn clone(&self) -> SharedBytes {
SharedBytes(self.0.clone(), self.1.clone())
}
}
/// Internal use only! unsafe
pub(crate) struct SharedMessagePool(RefCell<VecDeque<Rc<HttpMessage>>>);
impl SharedMessagePool {
pub fn new() -> SharedMessagePool {
SharedMessagePool(RefCell::new(VecDeque::with_capacity(128)))
}
pub fn get(&self) -> Rc<HttpMessage> {
if let Some(msg) = self.0.borrow_mut().pop_front() {
msg
} else {
Rc::new(HttpMessage::default())
}
}
pub fn release(&self, mut msg: Rc<HttpMessage>) {
if self.0.borrow().len() < 128 {
Rc::get_mut(&mut msg).unwrap().reset();
self.0.borrow_mut().push_front(msg);
}
}
}
pub(crate) struct SharedHttpMessage(
Option<Rc<HttpMessage>>, Option<Rc<SharedMessagePool>>);
impl Drop for SharedHttpMessage {
fn drop(&mut self) {
if let Some(ref pool) = self.1 {
if let Some(msg) = self.0.take() {
if Rc::strong_count(&msg) == 1 {
pool.release(msg);
}
}
}
}
}
impl Deref for SharedHttpMessage {
type Target = HttpMessage;
fn deref(&self) -> &HttpMessage {
self.get_ref()
}
}
impl DerefMut for SharedHttpMessage {
fn deref_mut(&mut self) -> &mut HttpMessage {
self.get_mut()
}
}
impl Clone for SharedHttpMessage {
fn clone(&self) -> SharedHttpMessage {
SharedHttpMessage(self.0.clone(), self.1.clone())
}
}
impl Default for SharedHttpMessage {
fn default() -> SharedHttpMessage {
SharedHttpMessage(Some(Rc::new(HttpMessage::default())), None)
}
}
impl SharedHttpMessage {
pub fn from_message(msg: HttpMessage) -> SharedHttpMessage {
SharedHttpMessage(Some(Rc::new(msg)), None)
}
pub fn new(msg: Rc<HttpMessage>, pool: Rc<SharedMessagePool>) -> SharedHttpMessage {
SharedHttpMessage(Some(msg), Some(pool))
}
#[inline(always)]
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref, inline_always))]
pub fn get_mut(&self) -> &mut HttpMessage {
let r: &HttpMessage = self.0.as_ref().unwrap().as_ref();
unsafe{mem::transmute(r)}
}
#[inline]
pub fn get_ref(&self) -> &HttpMessage {
self.0.as_ref().unwrap()
}
}
const DEC_DIGITS_LUT: &[u8] =
b"0001020304050607080910111213141516171819\
2021222324252627282930313233343536373839\

View file

@ -15,6 +15,7 @@ use param::Params;
use router::Router;
use payload::Payload;
use multipart::Multipart;
use helpers::SharedHttpMessage;
use error::{ParseError, PayloadError, UrlGenerationError,
MultipartError, CookieParseError, HttpRangeError, UrlencodedError};
@ -69,10 +70,20 @@ impl HttpMessage {
self.version != Version::HTTP_10
}
}
pub(crate) fn reset(&mut self) {
self.headers.clear();
self.extensions.clear();
self.params.clear();
self.cookies.take();
self.addr.take();
self.payload.take();
self.info.take();
}
}
/// An HTTP Request
pub struct HttpRequest<S=()>(Rc<HttpMessage>, Option<Rc<S>>, Option<Router<S>>);
pub struct HttpRequest<S=()>(SharedHttpMessage, Option<Rc<S>>, Option<Router<S>>);
impl HttpRequest<()> {
/// Construct a new Request.
@ -81,7 +92,7 @@ impl HttpRequest<()> {
version: Version, headers: HeaderMap, payload: Option<Payload>) -> HttpRequest
{
HttpRequest(
Rc::new(HttpMessage {
SharedHttpMessage::from_message(HttpMessage {
method: method,
uri: uri,
version: version,
@ -98,6 +109,10 @@ impl HttpRequest<()> {
)
}
pub(crate) fn from_message(msg: SharedHttpMessage) -> HttpRequest {
HttpRequest(msg, None, None)
}
/// Construct a new Request.
#[inline]
#[cfg(test)]
@ -106,7 +121,7 @@ impl HttpRequest<()> {
use std::str::FromStr;
HttpRequest(
Rc::new(HttpMessage {
SharedHttpMessage::from_message(HttpMessage {
method: Method::GET,
uri: Uri::from_str(path).unwrap(),
version: Version::HTTP_11,
@ -133,7 +148,7 @@ impl<S> HttpRequest<S> {
/// Construct new http request without state.
pub fn clone_without_state(&self) -> HttpRequest {
HttpRequest(Rc::clone(&self.0), None, None)
HttpRequest(self.0.clone(), None, None)
}
// get mutable reference for inner message
@ -142,10 +157,15 @@ impl<S> HttpRequest<S> {
#[allow(mutable_transmutes)]
#[cfg_attr(feature = "cargo-clippy", allow(mut_from_ref))]
fn as_mut(&self) -> &mut HttpMessage {
let r: &HttpMessage = self.0.as_ref();
unsafe{mem::transmute(r)}
self.0.get_mut()
}
#[inline]
fn as_ref(&self) -> &HttpMessage {
self.0.get_ref()
}
#[inline]
pub(crate) fn get_inner(&mut self) -> &mut HttpMessage {
self.as_mut()
}
@ -173,22 +193,22 @@ impl<S> HttpRequest<S> {
/// Read the Request Uri.
#[inline]
pub fn uri(&self) -> &Uri { &self.0.uri }
pub fn uri(&self) -> &Uri { &self.as_ref().uri }
/// Read the Request method.
#[inline]
pub fn method(&self) -> &Method { &self.0.method }
pub fn method(&self) -> &Method { &self.as_ref().method }
/// Read the Request Version.
#[inline]
pub fn version(&self) -> Version {
self.0.version
self.as_ref().version
}
/// Read the Request Headers.
#[inline]
pub fn headers(&self) -> &HeaderMap {
&self.0.headers
&self.as_ref().headers
}
#[doc(hidden)]
@ -200,17 +220,17 @@ impl<S> HttpRequest<S> {
/// The target path of this Request.
#[inline]
pub fn path(&self) -> &str {
self.0.uri.path()
self.as_ref().uri.path()
}
/// Get *ConnectionInfo* for currect request.
pub fn connection_info(&self) -> &ConnectionInfo {
if self.0.info.is_none() {
if self.as_ref().info.is_none() {
let info: ConnectionInfo<'static> = unsafe{
mem::transmute(ConnectionInfo::new(self))};
self.as_mut().info = Some(info);
}
self.0.info.as_ref().unwrap()
self.as_ref().info.as_ref().unwrap()
}
pub fn url_for<U, I>(&self, name: &str, elements: U) -> Result<Url, UrlGenerationError>
@ -237,7 +257,7 @@ impl<S> HttpRequest<S> {
#[inline]
pub fn peer_addr(&self) -> Option<&SocketAddr> {
self.0.addr.as_ref()
self.as_ref().addr.as_ref()
}
#[inline]
@ -248,7 +268,7 @@ impl<S> HttpRequest<S> {
/// Return a new iterator that yields pairs of `Cow<str>` for query parameters
pub fn query(&self) -> HashMap<String, String> {
let mut q: HashMap<String, String> = HashMap::new();
if let Some(query) = self.0.uri.query().as_ref() {
if let Some(query) = self.as_ref().uri.query().as_ref() {
for (key, val) in form_urlencoded::parse(query.as_ref()) {
q.insert(key.to_string(), val.to_string());
}
@ -261,7 +281,7 @@ impl<S> HttpRequest<S> {
/// E.g., id=10
#[inline]
pub fn query_string(&self) -> &str {
if let Some(query) = self.0.uri.query().as_ref() {
if let Some(query) = self.as_ref().uri.query().as_ref() {
query
} else {
""
@ -271,7 +291,7 @@ impl<S> HttpRequest<S> {
/// Load request cookies.
#[inline]
pub fn cookies(&self) -> Result<&Vec<Cookie<'static>>, CookieParseError> {
if self.0.cookies.is_none() {
if self.as_ref().cookies.is_none() {
let msg = self.as_mut();
let mut cookies = Vec::new();
if let Some(val) = msg.headers.get(header::COOKIE) {
@ -283,7 +303,7 @@ impl<S> HttpRequest<S> {
}
msg.cookies = Some(cookies)
}
Ok(self.0.cookies.as_ref().unwrap())
Ok(self.as_ref().cookies.as_ref().unwrap())
}
/// Return request cookie.
@ -304,7 +324,7 @@ impl<S> HttpRequest<S> {
/// for matching storing that segment of the request url in the Params object.
#[inline]
pub fn match_info(&self) -> &Params {
unsafe{ mem::transmute(&self.0.params) }
unsafe{ mem::transmute(&self.as_ref().params) }
}
/// Set request Params.
@ -315,25 +335,25 @@ impl<S> HttpRequest<S> {
/// Checks if a connection should be kept alive.
pub fn keep_alive(&self) -> bool {
if let Some(conn) = self.0.headers.get(header::CONNECTION) {
if let Some(conn) = self.headers().get(header::CONNECTION) {
if let Ok(conn) = conn.to_str() {
if self.0.version == Version::HTTP_10 && conn.contains("keep-alive") {
if self.as_ref().version == Version::HTTP_10 && conn.contains("keep-alive") {
true
} else {
self.0.version == Version::HTTP_11 &&
self.as_ref().version == Version::HTTP_11 &&
!(conn.contains("close") || conn.contains("upgrade"))
}
} else {
false
}
} else {
self.0.version != Version::HTTP_10
self.as_ref().version != Version::HTTP_10
}
}
/// Read the request content type
pub fn content_type(&self) -> &str {
if let Some(content_type) = self.0.headers.get(header::CONTENT_TYPE) {
if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
return content_type
}
@ -343,17 +363,17 @@ impl<S> HttpRequest<S> {
/// Check if request requires connection upgrade
pub(crate) fn upgrade(&self) -> bool {
if let Some(conn) = self.0.headers.get(header::CONNECTION) {
if let Some(conn) = self.as_ref().headers.get(header::CONNECTION) {
if let Ok(s) = conn.to_str() {
return s.to_lowercase().contains("upgrade")
}
}
self.0.method == Method::CONNECT
self.as_ref().method == Method::CONNECT
}
/// Check if request has chunked transfer encoding
pub fn chunked(&self) -> Result<bool, ParseError> {
if let Some(encodings) = self.0.headers.get(header::TRANSFER_ENCODING) {
if let Some(encodings) = self.headers().get(header::TRANSFER_ENCODING) {
if let Ok(s) = encodings.to_str() {
Ok(s.to_lowercase().contains("chunked"))
} else {
@ -367,7 +387,7 @@ impl<S> HttpRequest<S> {
/// Parses Range HTTP header string as per RFC 2616.
/// `size` is full size of response (file).
pub fn range(&self, size: u64) -> Result<Vec<HttpRange>, HttpRangeError> {
if let Some(range) = self.0.headers.get(header::RANGE) {
if let Some(range) = self.headers().get(header::RANGE) {
HttpRange::parse(unsafe{str::from_utf8_unchecked(range.as_bytes())}, size)
.map_err(|e| e.into())
} else {
@ -378,7 +398,7 @@ impl<S> HttpRequest<S> {
/// Returns reference to the associated http payload.
#[inline]
pub fn payload(&self) -> Option<&Payload> {
self.0.payload.as_ref()
self.as_ref().payload.as_ref()
}
/// Returns mutable reference to the associated http payload.
@ -397,7 +417,7 @@ impl<S> HttpRequest<S> {
///
/// Content-type: multipart/form-data;
pub fn multipart(&mut self) -> Result<Multipart, MultipartError> {
let boundary = Multipart::boundary(&self.0.headers)?;
let boundary = Multipart::boundary(self.headers())?;
if let Some(payload) = self.take_payload() {
Ok(Multipart::new(boundary, payload))
} else {
@ -434,7 +454,7 @@ impl<S> HttpRequest<S> {
}
// check content type
let t = if let Some(content_type) = self.0.headers.get(header::CONTENT_TYPE) {
let t = if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
content_type.to_lowercase() == "application/x-www-form-urlencoded"
} else {
@ -460,29 +480,29 @@ impl Default for HttpRequest<()> {
/// Construct default request
fn default() -> HttpRequest {
HttpRequest(Rc::new(HttpMessage::default()), None, None)
HttpRequest(SharedHttpMessage::default(), None, None)
}
}
impl<S> Clone for HttpRequest<S> {
fn clone(&self) -> HttpRequest<S> {
HttpRequest(Rc::clone(&self.0), self.1.clone(), None)
HttpRequest(self.0.clone(), self.1.clone(), None)
}
}
impl<S> fmt::Debug for HttpRequest<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let res = write!(f, "\nHttpRequest {:?} {}:{}\n",
self.0.version, self.0.method, self.0.uri);
self.as_ref().version, self.as_ref().method, self.as_ref().uri);
if !self.query_string().is_empty() {
let _ = write!(f, " query: ?{:?}\n", self.query_string());
}
if !self.match_info().is_empty() {
let _ = write!(f, " params: {:?}\n", self.0.params);
let _ = write!(f, " params: {:?}\n", self.as_ref().params);
}
let _ = write!(f, " headers:\n");
for key in self.0.headers.keys() {
let vals: Vec<_> = self.0.headers.get_all(key).iter().collect();
for key in self.as_ref().headers.keys() {
let vals: Vec<_> = self.as_ref().headers.get_all(key).iter().collect();
if vals.len() > 1 {
let _ = write!(f, " {:?}: {:?}\n", key, vals);
} else {

View file

@ -30,6 +30,10 @@ impl<'a> Default for Params<'a> {
impl<'a> Params<'a> {
pub(crate) fn clear(&mut self) {
self.0.clear();
}
pub(crate) fn add(&mut self, name: &'a str, value: &'a str) {
self.0.push((name, value));
}

View file

@ -755,16 +755,18 @@ impl<S> ProcessResponse<S> {
}
}
// flush io
match io.poll_complete() {
Ok(Async::Ready(_)) =>
self.running.resume(),
Ok(Async::NotReady) =>
return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
// flush io but only if we need to
if self.running == RunningState::Paused || !self.drain.0.is_empty() {
match io.poll_completed() {
Ok(Async::Ready(_)) =>
self.running.resume(),
Ok(Async::NotReady) =>
return Err(PipelineState::Response(self)),
Err(err) => {
debug!("Error sending data: {}", err);
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
}

View file

@ -413,6 +413,8 @@ pub(crate) struct WorkerSettings<H> {
h: Vec<H>,
enabled: bool,
keep_alive: u64,
bytes: Rc<helpers::SharedBytesPool>,
messages: Rc<helpers::SharedMessagePool>,
}
impl<H> WorkerSettings<H> {
@ -421,6 +423,8 @@ impl<H> WorkerSettings<H> {
h: h,
enabled: if let Some(ka) = keep_alive { ka > 0 } else { false },
keep_alive: keep_alive.unwrap_or(0),
bytes: Rc::new(helpers::SharedBytesPool::new()),
messages: Rc::new(helpers::SharedMessagePool::new()),
}
}
@ -433,6 +437,12 @@ impl<H> WorkerSettings<H> {
pub fn keep_alive_enabled(&self) -> bool {
self.enabled
}
pub fn get_shared_bytes(&self) -> helpers::SharedBytes {
helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes))
}
pub fn get_http_message(&self) -> helpers::SharedHttpMessage {
helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages))
}
}
impl<H: 'static> Worker<H> {