mirror of
https://github.com/actix/actix-web.git
synced 2025-01-04 14:28:50 +00:00
add write buffer capacity caps for Framed
This commit is contained in:
parent
8f20f69559
commit
8886672ae6
3 changed files with 55 additions and 34 deletions
|
@ -11,6 +11,9 @@ use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
|
use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
|
||||||
use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
|
use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
|
||||||
|
|
||||||
|
const LW: usize = 2 * 1024;
|
||||||
|
const HW: usize = 8 * 1024;
|
||||||
|
|
||||||
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
|
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
|
||||||
/// the `Encoder` and `Decoder` traits to encode and decode frames.
|
/// the `Encoder` and `Decoder` traits to encode and decode frames.
|
||||||
///
|
///
|
||||||
|
@ -45,7 +48,14 @@ where
|
||||||
/// break them into separate objects, allowing them to interact more easily.
|
/// break them into separate objects, allowing them to interact more easily.
|
||||||
pub fn new(inner: T, codec: U) -> Framed<T, U> {
|
pub fn new(inner: T, codec: U) -> Framed<T, U> {
|
||||||
Framed {
|
Framed {
|
||||||
inner: framed_read2(framed_write2(Fuse(inner, codec))),
|
inner: framed_read2(framed_write2(Fuse(inner, codec), LW, HW)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same as `Framed::new()` with ability to specify write buffer low/high capacity watermarks.
|
||||||
|
pub fn new_with_cap(inner: T, codec: U, lw: usize, hw: usize) -> Framed<T, U> {
|
||||||
|
Framed {
|
||||||
|
inner: framed_read2(framed_write2(Fuse(inner, codec), lw, hw)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -75,7 +85,7 @@ impl<T, U> Framed<T, U> {
|
||||||
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
|
pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
|
||||||
Framed {
|
Framed {
|
||||||
inner: framed_read2_with_buffer(
|
inner: framed_read2_with_buffer(
|
||||||
framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf),
|
framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf, parts.write_buf_lw, parts.write_buf_hw),
|
||||||
parts.read_buf,
|
parts.read_buf,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
@ -123,11 +133,11 @@ impl<T, U> Framed<T, U> {
|
||||||
/// Consume the `Frame`, returning `Frame` with different codec.
|
/// Consume the `Frame`, returning `Frame` with different codec.
|
||||||
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
|
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
|
||||||
let (inner, read_buf) = self.inner.into_parts();
|
let (inner, read_buf) = self.inner.into_parts();
|
||||||
let (inner, write_buf) = inner.into_parts();
|
let (inner, write_buf, lw, hw) = inner.into_parts();
|
||||||
|
|
||||||
Framed {
|
Framed {
|
||||||
inner: framed_read2_with_buffer(
|
inner: framed_read2_with_buffer(
|
||||||
framed_write2_with_buffer(Fuse(inner.0, codec), write_buf),
|
framed_write2_with_buffer(Fuse(inner.0, codec), write_buf, lw, hw),
|
||||||
read_buf,
|
read_buf,
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
@ -141,13 +151,15 @@ impl<T, U> Framed<T, U> {
|
||||||
/// being worked with.
|
/// being worked with.
|
||||||
pub fn into_parts(self) -> FramedParts<T, U> {
|
pub fn into_parts(self) -> FramedParts<T, U> {
|
||||||
let (inner, read_buf) = self.inner.into_parts();
|
let (inner, read_buf) = self.inner.into_parts();
|
||||||
let (inner, write_buf) = inner.into_parts();
|
let (inner, write_buf, write_buf_lw, write_buf_hw) = inner.into_parts();
|
||||||
|
|
||||||
FramedParts {
|
FramedParts {
|
||||||
io: inner.0,
|
io: inner.0,
|
||||||
codec: inner.1,
|
codec: inner.1,
|
||||||
read_buf: read_buf,
|
read_buf,
|
||||||
write_buf: write_buf,
|
write_buf,
|
||||||
|
write_buf_lw,
|
||||||
|
write_buf_hw,
|
||||||
_priv: (),
|
_priv: (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -273,6 +285,12 @@ pub struct FramedParts<T, U> {
|
||||||
/// A buffer with unprocessed data which are not written yet.
|
/// A buffer with unprocessed data which are not written yet.
|
||||||
pub write_buf: BytesMut,
|
pub write_buf: BytesMut,
|
||||||
|
|
||||||
|
/// A buffer low watermark capacity
|
||||||
|
pub write_buf_lw: usize,
|
||||||
|
|
||||||
|
/// A buffer high watermark capacity
|
||||||
|
pub write_buf_hw: usize,
|
||||||
|
|
||||||
/// This private field allows us to add additional fields in the future in a
|
/// This private field allows us to add additional fields in the future in a
|
||||||
/// backwards compatible way.
|
/// backwards compatible way.
|
||||||
_priv: (),
|
_priv: (),
|
||||||
|
@ -286,6 +304,8 @@ impl<T, U> FramedParts<T, U> {
|
||||||
codec,
|
codec,
|
||||||
read_buf: BytesMut::new(),
|
read_buf: BytesMut::new(),
|
||||||
write_buf: BytesMut::new(),
|
write_buf: BytesMut::new(),
|
||||||
|
write_buf_lw: LW,
|
||||||
|
write_buf_hw: HW,
|
||||||
_priv: (),
|
_priv: (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,20 +16,19 @@ pub struct FramedWrite<T, E> {
|
||||||
pub struct FramedWrite2<T> {
|
pub struct FramedWrite2<T> {
|
||||||
inner: T,
|
inner: T,
|
||||||
buffer: BytesMut,
|
buffer: BytesMut,
|
||||||
|
low_watermark: usize,
|
||||||
|
high_watermark: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
const INITIAL_CAPACITY: usize = 8 * 1024;
|
|
||||||
const BACKPRESSURE_BOUNDARY: usize = INITIAL_CAPACITY;
|
|
||||||
|
|
||||||
impl<T, E> FramedWrite<T, E>
|
impl<T, E> FramedWrite<T, E>
|
||||||
where
|
where
|
||||||
T: AsyncWrite,
|
T: AsyncWrite,
|
||||||
E: Encoder,
|
E: Encoder,
|
||||||
{
|
{
|
||||||
/// Creates a new `FramedWrite` with the given `encoder`.
|
/// Creates a new `FramedWrite` with the given `encoder`.
|
||||||
pub fn new(inner: T, encoder: E) -> FramedWrite<T, E> {
|
pub fn new(inner: T, encoder: E, lw: usize, hw: usize) -> FramedWrite<T, E> {
|
||||||
FramedWrite {
|
FramedWrite {
|
||||||
inner: framed_write2(Fuse(inner, encoder)),
|
inner: framed_write2(Fuse(inner, encoder), lw, hw),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,21 +123,25 @@ where
|
||||||
|
|
||||||
// ===== impl FramedWrite2 =====
|
// ===== impl FramedWrite2 =====
|
||||||
|
|
||||||
pub fn framed_write2<T>(inner: T) -> FramedWrite2<T> {
|
pub fn framed_write2<T>(inner: T, low_watermark: usize, high_watermark: usize) -> FramedWrite2<T> {
|
||||||
FramedWrite2 {
|
FramedWrite2 {
|
||||||
inner: inner,
|
inner,
|
||||||
buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
|
low_watermark,
|
||||||
|
high_watermark,
|
||||||
|
buffer: BytesMut::with_capacity(high_watermark),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn framed_write2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedWrite2<T> {
|
pub fn framed_write2_with_buffer<T>(inner: T, mut buffer: BytesMut, low_watermark: usize, high_watermark: usize) -> FramedWrite2<T> {
|
||||||
if buf.capacity() < INITIAL_CAPACITY {
|
if buffer.capacity() < high_watermark {
|
||||||
let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
|
let bytes_to_reserve = high_watermark - buffer.capacity();
|
||||||
buf.reserve(bytes_to_reserve);
|
buffer.reserve(bytes_to_reserve);
|
||||||
}
|
}
|
||||||
FramedWrite2 {
|
FramedWrite2 {
|
||||||
inner: inner,
|
inner,
|
||||||
buffer: buf,
|
buffer,
|
||||||
|
low_watermark,
|
||||||
|
high_watermark,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,8 +154,8 @@ impl<T> FramedWrite2<T> {
|
||||||
self.inner
|
self.inner
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn into_parts(self) -> (T, BytesMut) {
|
pub fn into_parts(self) -> (T, BytesMut, usize, usize) {
|
||||||
(self.inner, self.buffer)
|
(self.inner, self.buffer, self.low_watermark, self.high_watermark)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_mut(&mut self) -> &mut T {
|
pub fn get_mut(&mut self) -> &mut T {
|
||||||
|
@ -168,15 +171,13 @@ where
|
||||||
type SinkError = T::Error;
|
type SinkError = T::Error;
|
||||||
|
|
||||||
fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
|
fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
|
||||||
// If the buffer is already over 8KiB, then attempt to flush it. If after
|
// Check the buffer capacity
|
||||||
// flushing it's *still* over 8KiB, then apply backpressure (reject the
|
let len = self.buffer.len();
|
||||||
// send).
|
if len >= self.high_watermark {
|
||||||
if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
|
return Ok(AsyncSink::NotReady(item));
|
||||||
try!(self.poll_complete());
|
}
|
||||||
|
if len < self.low_watermark {
|
||||||
if self.buffer.len() >= BACKPRESSURE_BOUNDARY {
|
self.buffer.reserve(self.high_watermark - len)
|
||||||
return Ok(AsyncSink::NotReady(item));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try!(self.inner.encode(item, &mut self.buffer));
|
try!(self.inner.encode(item, &mut self.buffer));
|
||||||
|
|
|
@ -14,12 +14,12 @@
|
||||||
|
|
||||||
mod bcodec;
|
mod bcodec;
|
||||||
mod framed;
|
mod framed;
|
||||||
mod framed2;
|
// mod framed2;
|
||||||
mod framed_read;
|
mod framed_read;
|
||||||
mod framed_write;
|
mod framed_write;
|
||||||
|
|
||||||
pub use self::bcodec::BytesCodec;
|
pub use self::bcodec::BytesCodec;
|
||||||
pub use self::framed::{Framed, FramedParts};
|
pub use self::framed::{Framed, FramedParts};
|
||||||
pub use self::framed2::{Framed2, FramedParts2};
|
// pub use self::framed2::{Framed2, FramedParts2};
|
||||||
pub use self::framed_read::FramedRead;
|
pub use self::framed_read::FramedRead;
|
||||||
pub use self::framed_write::FramedWrite;
|
pub use self::framed_write::FramedWrite;
|
||||||
|
|
Loading…
Reference in a new issue