diff --git a/Cargo.toml b/Cargo.toml index c31c325ad..ad01691e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ bytes = "0.4" futures = "0.1" slab = "0.4" tokio = "0.1" +tokio-codec = "0.1" tokio-io = "0.1" tokio-tcp = "0.1" tokio-timer = "0.2" diff --git a/src/framed.rs b/src/framed.rs new file mode 100644 index 000000000..93e462bc6 --- /dev/null +++ b/src/framed.rs @@ -0,0 +1,304 @@ +//! Framed dispatcher service and related utilities +use std::fmt; +use std::marker::PhantomData; + +use actix; +use futures::future::{ok, Either, FutureResult}; +use futures::unsync::mpsc; +use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; +use tokio_codec::{Decoder, Encoder, Framed}; +use tokio_io::{AsyncRead, AsyncWrite}; + +use service::{IntoNewService, IntoService, NewService, Service}; + +type Item = ::Item; +type StreamItem = Result<::Item, ::Error>; + +pub struct FramedNewService { + factory: S, + _t: PhantomData<(T, U)>, +} + +impl FramedNewService +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + S: NewService, Response = Option>> + Clone, + <::Service as Service>::Future: 'static, + <::Service as Service>::Error: From<::Error> + 'static, + ::Item: fmt::Debug + 'static, + ::Error: fmt::Debug + 'static, +{ + pub fn new>(factory: F1) -> Self { + Self { + factory: factory.into_new_service(), + _t: PhantomData, + } + } +} + +impl Clone for FramedNewService +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + factory: self.factory.clone(), + _t: PhantomData, + } + } +} + +impl NewService for FramedNewService +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + S: NewService, Response = Option>> + Clone, + <::Service as Service>::Future: 'static, + <::Service as Service>::Error: From<::Error> + 'static, + ::Item: fmt::Debug + 'static, + ::Error: fmt::Debug + 'static, +{ + type Request = Framed; + type Response = FramedDispatcher; + type Error = S::InitError; + type InitError = S::InitError; + type Service = FramedService; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(FramedService { + factory: self.factory.clone(), + _t: PhantomData, + }) + } +} + +pub struct FramedService { + factory: S, + _t: PhantomData<(T, U)>, +} + +impl Clone for FramedService +where + S: Clone, +{ + fn clone(&self) -> Self { + Self { + factory: self.factory.clone(), + _t: PhantomData, + } + } +} + +impl Service for FramedService +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + S: NewService, Response = Option>>, + <::Service as Service>::Future: 'static, + <::Service as Service>::Error: From<::Error> + 'static, + ::Item: fmt::Debug + 'static, + ::Error: fmt::Debug + 'static, +{ + type Request = Framed; + type Response = FramedDispatcher; + type Error = S::InitError; + type Future = FramedServiceResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + FramedServiceResponseFuture { + fut: self.factory.new_service(), + framed: Some(req), + } + } +} + +#[doc(hidden)] +pub struct FramedServiceResponseFuture +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + S: NewService, Response = Option>>, + <::Service as Service>::Future: 'static, + <::Service as Service>::Error: From<::Error> + 'static, + ::Item: fmt::Debug + 'static, + ::Error: fmt::Debug + 'static, +{ + fut: S::Future, + framed: Option>, +} + +impl Future for FramedServiceResponseFuture +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + S: NewService, Response = Option>>, + <::Service as Service>::Future: 'static, + <::Service as Service>::Error: From<::Error> + 'static, + ::Item: fmt::Debug + 'static, + ::Error: fmt::Debug + 'static, +{ + type Item = FramedDispatcher; + type Error = S::InitError; + + fn poll(&mut self) -> Poll { + match self.fut.poll()? { + Async::NotReady => Ok(Async::NotReady), + Async::Ready(service) => Ok(Async::Ready(FramedDispatcher::new( + self.framed.take().unwrap(), + service, + ))), + } + } +} + +/// FramedDispatcher - is a future that reads frames from Framed object +/// and pass then to the service. +pub struct FramedDispatcher +where + S: Service, + T: AsyncRead + AsyncWrite, + U: Encoder + Decoder, +{ + service: S, + framed: Framed, + item: Option>, + write_item: Option>, + write_rx: mpsc::Receiver, S::Error>>, + write_tx: mpsc::Sender, S::Error>>, + flushed: bool, +} + +impl FramedDispatcher +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + S: Service, Response = Option>>, + S::Future: 'static, + S::Error: From<::Error> + 'static, + ::Item: fmt::Debug + 'static, + ::Error: fmt::Debug + 'static, +{ + pub fn new>(framed: Framed, service: F) -> Self { + let (write_tx, write_rx) = mpsc::channel(16); + FramedDispatcher { + framed, + item: None, + service: service.into_service(), + write_rx, + write_tx, + write_item: None, + flushed: true, + } + } +} + +impl Future for FramedDispatcher +where + T: AsyncRead + AsyncWrite, + U: Decoder + Encoder, + S: Service, Response = Option>>, + S::Future: 'static, + S::Error: From<::Error> + 'static, + ::Item: fmt::Debug + 'static, + ::Error: fmt::Debug + 'static, +{ + type Item = (); + type Error = S::Error; + + fn poll(&mut self) -> Poll { + if let Async::Ready(_) = self.service.poll_ready()? { + let mut item = self.item.take(); + loop { + if let Some(item) = item { + match self.service.poll_ready()? { + Async::Ready(_) => { + let sender = self.write_tx.clone(); + actix::Arbiter::spawn(self.service.call(item).then(|item| { + let item = match item { + Ok(item) => { + if let Some(item) = item { + Ok(item) + } else { + return Either::B(ok(())); + } + } + Err(err) => Err(err), + }; + Either::A(sender.send(item).map(|_| ()).map_err(|_| ())) + })); + } + Async::NotReady => { + self.item = Some(item); + break; + } + } + } + match self.framed.poll() { + Ok(Async::Ready(Some(el))) => item = Some(Ok(el)), + Err(err) => item = Some(Err(err)), + Ok(Async::NotReady) => break, + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + } + } + } + + // write + let mut item = self.write_item.take(); + loop { + item = if let Some(msg) = item { + self.flushed = false; + match self.framed.start_send(msg) { + Ok(AsyncSink::Ready) => None, + Ok(AsyncSink::NotReady(item)) => Some(item), + Err(err) => { + trace!("Connection error: {:?}", err); + return Err(err.into()); + } + } + } else { + None + }; + + // flush sink + if !self.flushed { + match self.framed.poll_complete() { + Ok(Async::Ready(_)) => { + self.flushed = true; + } + Ok(Async::NotReady) => break, + Err(err) => { + trace!("Connection flush error: {:?}", err); + return Err(err.into()); + } + } + } + + // check channel + if self.flushed { + if item.is_none() { + match self.write_rx.poll() { + Ok(Async::Ready(Some(msg))) => match msg { + Ok(msg) => item = Some(msg), + Err(err) => return Err(err), + }, + Ok(Async::NotReady) => break, + Err(_) => panic!("Bug in gw code"), + Ok(Async::Ready(None)) => panic!("Bug in gw code"), + } + } else { + continue; + } + } else { + self.write_item = item; + break; + } + } + Ok(Async::NotReady) + } +} diff --git a/src/lib.rs b/src/lib.rs index 7443118b8..324c30841 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ extern crate net2; extern crate num_cpus; extern crate slab; extern crate tokio; +extern crate tokio_codec; extern crate tokio_current_thread; extern crate tokio_io; extern crate tokio_reactor; @@ -57,6 +58,7 @@ extern crate webpki; extern crate webpki_roots; pub mod connector; +pub mod framed; pub mod resolver; pub mod server; pub mod service;