diff --git a/src/h1/codec.rs b/src/h1/codec.rs index a91f5cb34..020466482 100644 --- a/src/h1/codec.rs +++ b/src/h1/codec.rs @@ -103,6 +103,17 @@ impl Codec { self.flags.contains(Flags::KEEPALIVE) } + /// Check last request's message type + pub fn message_type(&self) -> InMessageType { + if self.flags.contains(Flags::UNHANDLED) { + InMessageType::Unhandled + } else if self.payload.is_none() { + InMessageType::None + } else { + InMessageType::Payload + } + } + /// prepare transfer encoding pub fn prepare_te(&mut self, res: &mut Response) { self.te @@ -275,6 +286,7 @@ impl Decoder for Codec { } RequestPayloadType::Unhandled => { self.payload = None; + self.flags.insert(Flags::UNHANDLED); InMessageType::Unhandled } }; diff --git a/src/h1/mod.rs b/src/h1/mod.rs index 4e196ad54..2a276b7a0 100644 --- a/src/h1/mod.rs +++ b/src/h1/mod.rs @@ -10,7 +10,7 @@ mod service; pub use self::codec::{Codec, InMessage, InMessageType, OutMessage}; pub use self::decoder::{PayloadDecoder, RequestDecoder}; pub use self::dispatcher::Dispatcher; -pub use self::service::{H1Service, H1ServiceHandler}; +pub use self::service::{H1Service, H1ServiceHandler, H1SimpleService}; use request::Request; diff --git a/src/h1/service.rs b/src/h1/service.rs index 6e9d7d651..bf92e8d2f 100644 --- a/src/h1/service.rs +++ b/src/h1/service.rs @@ -2,15 +2,18 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::net; +use actix_net::codec::Framed; use actix_net::service::{IntoNewService, NewService, Service}; -use futures::{Async, Future, Poll}; +use futures::future::{ok, FutureResult}; +use futures::{Async, Future, Poll, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; use config::{KeepAlive, ServiceConfig}; -use error::DispatchError; +use error::{DispatchError, ParseError}; use request::Request; use response::Response; +use super::codec::{Codec, InMessage}; use super::dispatcher::Dispatcher; use super::H1ServiceResult; @@ -191,6 +194,7 @@ where } } +#[doc(hidden)] pub struct H1ServiceResponse { fut: S::Future, cfg: Option, @@ -256,3 +260,94 @@ where Dispatcher::new(req, self.cfg.clone(), self.srv.clone()) } } + +/// `NewService` implementation for `H1SimpleServiceHandler` service +pub struct H1SimpleService { + config: ServiceConfig, + _t: PhantomData, +} + +impl H1SimpleService { + /// Create new `H1SimpleService` instance. + pub fn new() -> Self { + H1SimpleService { + config: ServiceConfig::default(), + _t: PhantomData, + } + } +} + +impl NewService for H1SimpleService +where + T: AsyncRead + AsyncWrite, +{ + type Request = T; + type Response = (Request, Framed); + type Error = ParseError; + type InitError = (); + type Service = H1SimpleServiceHandler; + type Future = FutureResult; + + fn new_service(&self) -> Self::Future { + ok(H1SimpleServiceHandler { + config: self.config.clone(), + _t: PhantomData, + }) + } +} + +/// `Service` implementation for HTTP1 transport. Reads one request and returns +/// request and framed object. +pub struct H1SimpleServiceHandler { + config: ServiceConfig, + _t: PhantomData, +} + +impl Service for H1SimpleServiceHandler +where + T: AsyncRead + AsyncWrite, +{ + type Request = T; + type Response = (Request, Framed); + type Error = ParseError; + type Future = H1SimpleServiceHandlerResponse; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + H1SimpleServiceHandlerResponse { + framed: Some(Framed::new(req, Codec::new(self.config.clone()))), + } + } +} + +#[doc(hidden)] +pub struct H1SimpleServiceHandlerResponse +where + T: AsyncRead + AsyncWrite, +{ + framed: Option>, +} + +impl Future for H1SimpleServiceHandlerResponse +where + T: AsyncRead + AsyncWrite, +{ + type Item = (Request, Framed); + type Error = ParseError; + + fn poll(&mut self) -> Poll { + match self.framed.as_mut().unwrap().poll()? { + Async::Ready(Some(req)) => match req { + InMessage::Message(req, _) => { + Ok(Async::Ready((req, self.framed.take().unwrap()))) + } + InMessage::Chunk(_) => unreachable!("Something is wrong"), + }, + Async::Ready(None) => Err(ParseError::Incomplete), + Async::NotReady => Ok(Async::NotReady), + } + } +}