1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-29 19:40:34 +00:00

allow to force write to Framed object

This commit is contained in:
Nikolay Kim 2018-11-14 10:26:49 -08:00
parent c29501fcf3
commit dba86fbbf8
3 changed files with 56 additions and 1 deletions

View file

@ -1,5 +1,11 @@
# Changes
## [0.2.2] - 2018-11-14
* Refactor Connector and Resolver services
* Add low/high caps to Framed
## [0.2.0] - 2018-11-08
### Added

View file

@ -54,10 +54,19 @@ where
/// Same as `Framed::new()` with ability to specify write buffer low/high capacity watermarks.
pub fn new_with_caps(inner: T, codec: U, lw: usize, hw: usize) -> Framed<T, U> {
debug_assert!((lw < hw) && hw != 0);
Framed {
inner: framed_read2(framed_write2(Fuse(inner, codec), lw, hw)),
}
}
/// Force send item
pub fn force_send(
&mut self,
item: <U as Encoder>::Item,
) -> Result<(), <U as Encoder>::Error> {
self.inner.get_mut().force_send(item)
}
}
impl<T, U> Framed<T, U> {
@ -153,6 +162,22 @@ impl<T, U> Framed<T, U> {
}
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
let (inner, read_buf) = self.inner.into_parts();
let (inner, write_buf, lw, hw) = inner.into_parts();
Framed {
inner: framed_read2_with_buffer(
framed_write2_with_buffer(Fuse(inner.0, f(inner.1)), write_buf, lw, hw),
read_buf,
),
}
}
/// Consumes the `Frame`, returning its underlying I/O stream, the buffer
/// with unprocessed data, and the codec.
///

View file

@ -79,6 +79,16 @@ impl<T, E> FramedWrite<T, E> {
}
}
impl<T, E> FramedWrite<T, E>
where
E: Encoder,
{
/// Force send item
pub fn force_send(&mut self, item: E::Item) -> Result<(), E::Error> {
self.inner.force_send(item)
}
}
impl<T, E> Sink for FramedWrite<T, E>
where
T: AsyncWrite,
@ -186,6 +196,20 @@ impl<T> FramedWrite2<T> {
}
}
impl<T> FramedWrite2<T>
where
T: Encoder,
{
pub fn force_send(&mut self, item: T::Item) -> Result<(), T::Error> {
let len = self.buffer.len();
if len < self.low_watermark {
self.buffer.reserve(self.high_watermark - len)
}
self.inner.encode(item, &mut self.buffer)?;
Ok(())
}
}
impl<T> Sink for FramedWrite2<T>
where
T: AsyncWrite + Encoder,
@ -203,7 +227,7 @@ where
self.buffer.reserve(self.high_watermark - len)
}
try!(self.inner.encode(item, &mut self.buffer));
self.inner.encode(item, &mut self.buffer)?;
Ok(AsyncSink::Ready)
}