From 42a49da1999d1b3fa716791c1bed12e5af4dd68e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 1 Sep 2018 10:29:56 -0700 Subject: [PATCH] add StreamDispatcher --- src/lib.rs | 1 + src/stream.rs | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 src/stream.rs diff --git a/src/lib.rs b/src/lib.rs index b6ebaf5a0..2f694c7d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,7 @@ mod server; mod server_service; pub mod service; pub mod ssl; +pub mod stream; mod worker; pub use configurable::{IntoNewConfigurableService, NewConfigurableService}; diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 000000000..f0ef120ec --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,90 @@ +use futures::unsync::mpsc; +use futures::{Async, Future, Poll, Stream}; +use tokio::executor::current_thread::spawn; + +use super::{IntoService, Service}; + +pub struct StreamDispatcher { + stream: S, + service: T, + item: Option>, + stop_rx: mpsc::UnboundedReceiver<()>, + stop_tx: mpsc::UnboundedSender<()>, +} + +impl StreamDispatcher +where + S: Stream, + T: Service, Response = (), Error = ()>, + T::Future: 'static, +{ + pub fn new>(stream: S, service: F) -> Self { + let (stop_tx, stop_rx) = mpsc::unbounded(); + StreamDispatcher { + stream, + item: None, + service: service.into_service(), + stop_rx, + stop_tx, + } + } +} + +impl Future for StreamDispatcher +where + S: Stream, + T: Service, Response = (), Error = ()>, + T::Future: 'static, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + if let Ok(Async::Ready(Some(_))) = self.stop_rx.poll() { + return Ok(Async::Ready(())); + } + + let mut item = self.item.take(); + loop { + if item.is_some() { + match self.service.poll_ready()? { + Async::Ready(_) => spawn(StreamDispatcherService { + fut: self.service.call(item.take().unwrap()), + stop: self.stop_tx.clone(), + }), + Async::NotReady => { + self.item = item; + return Ok(Async::NotReady); + } + } + } + match self.stream.poll() { + Ok(Async::Ready(Some(el))) => item = Some(Ok(el)), + Err(err) => item = Some(Err(err)), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + } + } + } +} + +struct StreamDispatcherService { + fut: F, + stop: mpsc::UnboundedSender<()>, +} + +impl Future for StreamDispatcherService { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.fut.poll() { + Ok(Async::Ready(_)) => Ok(Async::Ready(())), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(_) => { + let _ = self.stop.unbounded_send(()); + Ok(Async::Ready(())) + } + } + } +}