diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 74ba1aed1..2a8a8360f 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::fmt; use std::marker::PhantomData; use actix_server_config::ServerConfig as SrvConfig; @@ -6,39 +6,52 @@ use actix_service::{IntoNewService, NewService, Service}; use crate::body::MessageBody; use crate::config::{KeepAlive, ServiceConfig}; +use crate::error::Error; +use crate::h1::{ExpectHandler, H1Service}; +use crate::h2::H2Service; use crate::request::Request; use crate::response::Response; - -use crate::h1::H1Service; -use crate::h2::H2Service; use crate::service::HttpService; /// A http service builder /// /// This type can be used to construct an instance of `http service` through a /// builder-like pattern. -pub struct HttpServiceBuilder { +pub struct HttpServiceBuilder { keep_alive: KeepAlive, client_timeout: u64, client_disconnect: u64, + expect: X, _t: PhantomData<(T, S)>, } -impl HttpServiceBuilder +impl HttpServiceBuilder where S: NewService, - S::Error: Debug, + S::Error: Into, + S::InitError: fmt::Debug, { /// Create instance of `ServiceConfigBuilder` - pub fn new() -> HttpServiceBuilder { + pub fn new() -> Self { HttpServiceBuilder { keep_alive: KeepAlive::Timeout(5), client_timeout: 5000, client_disconnect: 0, + expect: ExpectHandler, _t: PhantomData, } } +} +impl HttpServiceBuilder +where + S: NewService, + S::Error: Into, + S::InitError: fmt::Debug, + X: NewService, + X::Error: Into, + X::InitError: fmt::Debug, +{ /// Set server keep-alive setting. /// /// By default keep alive is set to a 5 seconds. @@ -94,10 +107,12 @@ where // } /// Finish service configuration and create *http service* for HTTP/1 protocol. - pub fn h1(self, service: F) -> H1Service + pub fn h1(self, service: F) -> H1Service where B: MessageBody + 'static, F: IntoNewService, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, { let cfg = ServiceConfig::new( @@ -105,7 +120,7 @@ where self.client_timeout, self.client_disconnect, ); - H1Service::with_config(cfg, service.into_new_service()) + H1Service::with_config(cfg, service.into_new_service()).expect(self.expect) } /// Finish service configuration and create *http service* for HTTP/2 protocol. @@ -113,6 +128,8 @@ where where B: MessageBody + 'static, F: IntoNewService, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, ::Future: 'static, { @@ -129,6 +146,8 @@ where where B: MessageBody + 'static, F: IntoNewService, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, ::Future: 'static, { diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index f476ad5fb..1c9a3aab0 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -25,13 +25,13 @@ type SslConnector = (); /// The `Connector` type uses a builder-like combinator pattern for service /// construction that finishes by calling the `.finish()` method. /// -/// ```rust -/// use actix-web::client::Connector; -/// use time::Duration; +/// ```rust,ignore +/// use std::time::Duration; +/// use actix_http::client::Connector; /// /// let connector = Connector::new() -/// .timeout(Duration::from_secs(5)) -/// .finish(); +/// .timeout(Duration::from_secs(5)) +/// .finish(); /// ``` pub struct Connector { connector: T, diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 3e0076b49..fc37d3243 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -71,6 +71,12 @@ impl fmt::Debug for Error { } } +impl From<()> for Error { + fn from(_: ()) -> Self { + Error::from(UnitError) + } +} + impl std::error::Error for Error { fn description(&self) -> &str { "actix-http::Error" @@ -111,6 +117,13 @@ impl ResponseError for TimeoutError { } } +#[derive(Debug, Display)] +#[display(fmt = "UnknownError")] +struct UnitError; + +/// `InternalServerError` for `JsonError` +impl ResponseError for UnitError {} + /// `InternalServerError` for `JsonError` impl ResponseError for JsonError {} @@ -120,6 +133,10 @@ impl ResponseError for FormError {} /// `InternalServerError` for `TimerError` impl ResponseError for TimerError {} +#[cfg(feature = "ssl")] +/// `InternalServerError` for `SslError` +impl ResponseError for openssl::ssl::Error {} + /// Return `BAD_REQUEST` for `de::value::Error` impl ResponseError for DeError { fn error_response(&self) -> Response { @@ -331,7 +348,7 @@ impl ResponseError for crate::cookie::ParseError { /// A set of errors that can occur during dispatching http requests pub enum DispatchError { /// Service error - Service, + Service(Error), /// An `io::Error` that occurred while trying to read or write to a network /// stream. diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index 6e891e7cd..64731ac94 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -154,33 +154,37 @@ impl Encoder for Codec { ) -> Result<(), Self::Error> { match item { Message::Item((mut res, length)) => { - // set response version - res.head_mut().version = self.version; - - // connection status - self.ctype = if let Some(ct) = res.head().ctype() { - if ct == ConnectionType::KeepAlive { - self.ctype - } else { - ct - } + if res.head().status == StatusCode::CONTINUE { + dst.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); } else { - self.ctype - }; + // set response version + res.head_mut().version = self.version; - // encode message - let len = dst.len(); - self.encoder.encode( - dst, - &mut res, - self.flags.contains(Flags::HEAD), - self.flags.contains(Flags::STREAM), - self.version, - length, - self.ctype, - &self.config, - )?; - self.headers_size = (dst.len() - len) as u32; + // connection status + self.ctype = if let Some(ct) = res.head().ctype() { + if ct == ConnectionType::KeepAlive { + self.ctype + } else { + ct + } + } else { + self.ctype + }; + + // encode message + let len = dst.len(); + self.encoder.encode( + dst, + &mut res, + self.flags.contains(Flags::HEAD), + self.flags.contains(Flags::STREAM), + self.version, + length, + self.ctype, + &self.config, + )?; + self.headers_size = (dst.len() - len) as u32; + } } Message::Chunk(Some(bytes)) => { self.encoder.encode_chunk(bytes.as_ref(), dst)?; diff --git a/actix-http/src/h1/decoder.rs b/actix-http/src/h1/decoder.rs index dfd9fe25c..10652d627 100644 --- a/actix-http/src/h1/decoder.rs +++ b/actix-http/src/h1/decoder.rs @@ -51,6 +51,8 @@ pub(crate) enum PayloadLength { pub(crate) trait MessageType: Sized { fn set_connection_type(&mut self, ctype: Option); + fn set_expect(&mut self); + fn headers_mut(&mut self) -> &mut HeaderMap; fn decode(src: &mut BytesMut) -> Result, ParseError>; @@ -62,6 +64,7 @@ pub(crate) trait MessageType: Sized { ) -> Result { let mut ka = None; let mut has_upgrade = false; + let mut expect = false; let mut chunked = false; let mut content_length = None; @@ -126,6 +129,12 @@ pub(crate) trait MessageType: Sized { } } } + header::EXPECT => { + let bytes = value.as_bytes(); + if bytes.len() >= 4 && &bytes[0..4] == b"100-" { + expect = true; + } + } _ => (), } @@ -136,6 +145,9 @@ pub(crate) trait MessageType: Sized { } } self.set_connection_type(ka); + if expect { + self.set_expect() + } // https://tools.ietf.org/html/rfc7230#section-3.3.3 if chunked { @@ -163,6 +175,10 @@ impl MessageType for Request { } } + fn set_expect(&mut self) { + self.head_mut().set_expect(); + } + fn headers_mut(&mut self) -> &mut HeaderMap { &mut self.head_mut().headers } @@ -235,6 +251,8 @@ impl MessageType for ResponseHead { } } + fn set_expect(&mut self) {} + fn headers_mut(&mut self) -> &mut HeaderMap { &mut self.headers } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 0f9b495b3..e2306fde7 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -1,5 +1,4 @@ use std::collections::VecDeque; -use std::fmt::Debug; use std::mem; use std::time::Instant; @@ -13,8 +12,9 @@ use tokio_timer::Delay; use crate::body::{Body, BodySize, MessageBody, ResponseBody}; use crate::config::ServiceConfig; -use crate::error::DispatchError; +use crate::error::{DispatchError, Error}; use crate::error::{ParseError, PayloadError}; +use crate::http::StatusCode; use crate::request::Request; use crate::response::Response; @@ -37,24 +37,33 @@ bitflags! { } /// Dispatcher for HTTP/1.1 protocol -pub struct Dispatcher, B: MessageBody> +pub struct Dispatcher where - S::Error: Debug, + S: Service, + S::Error: Into, + B: MessageBody, + X: Service, + X::Error: Into, { - inner: Option>, + inner: Option>, } -struct InnerDispatcher, B: MessageBody> +struct InnerDispatcher where - S::Error: Debug, + S: Service, + S::Error: Into, + B: MessageBody, + X: Service, + X::Error: Into, { service: CloneableService, + expect: CloneableService, flags: Flags, framed: Framed, error: Option, config: ServiceConfig, - state: State, + state: State, payload: Option, messages: VecDeque, @@ -67,13 +76,24 @@ enum DispatcherMessage { Error(Response<()>), } -enum State, B: MessageBody> { +enum State +where + S: Service, + X: Service, + B: MessageBody, +{ None, + ExpectCall(X::Future), ServiceCall(S::Future), SendPayload(ResponseBody), } -impl, B: MessageBody> State { +impl State +where + S: Service, + X: Service, + B: MessageBody, +{ fn is_empty(&self) -> bool { if let State::None = self { true @@ -83,21 +103,29 @@ impl, B: MessageBody> State { } } -impl Dispatcher +impl Dispatcher where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Response: Into>, B: MessageBody, + X: Service, + X::Error: Into, { /// Create http/1 dispatcher. - pub fn new(stream: T, config: ServiceConfig, service: CloneableService) -> Self { + pub fn new( + stream: T, + config: ServiceConfig, + service: CloneableService, + expect: CloneableService, + ) -> Self { Dispatcher::with_timeout( Framed::new(stream, Codec::new(config.clone())), config, None, service, + expect, ) } @@ -107,6 +135,7 @@ where config: ServiceConfig, timeout: Option, service: CloneableService, + expect: CloneableService, ) -> Self { let keepalive = config.keep_alive_enabled(); let flags = if keepalive { @@ -132,6 +161,7 @@ where error: None, messages: VecDeque::new(), service, + expect, flags, config, ka_expire, @@ -141,13 +171,15 @@ where } } -impl InnerDispatcher +impl InnerDispatcher where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Response: Into>, B: MessageBody, + X: Service, + X::Error: Into, { fn can_read(&self) -> bool { if self.flags.contains(Flags::DISCONNECTED) { @@ -195,7 +227,7 @@ where &mut self, message: Response<()>, body: ResponseBody, - ) -> Result, DispatchError> { + ) -> Result, DispatchError> { self.framed .force_send(Message::Item((message, body.length()))) .map_err(|err| { @@ -213,6 +245,15 @@ where } } + fn send_continue(&mut self) -> Result<(), DispatchError> { + self.framed + .force_send(Message::Item(( + Response::empty(StatusCode::CONTINUE), + BodySize::Empty, + ))) + .map_err(|err| DispatchError::Io(err)) + } + fn poll_response(&mut self) -> Result<(), DispatchError> { let mut retry = self.can_read(); loop { @@ -227,6 +268,22 @@ where } None => None, }, + State::ExpectCall(mut fut) => match fut.poll() { + Ok(Async::Ready(req)) => { + self.send_continue()?; + Some(State::ServiceCall(self.service.call(req))) + } + Ok(Async::NotReady) => { + self.state = State::ExpectCall(fut); + None + } + Err(e) => { + let e = e.into(); + let res: Response = e.into(); + let (res, body) = res.replace_body(()); + Some(self.send_response(res, body.into_body())?) + } + }, State::ServiceCall(mut fut) => match fut.poll() { Ok(Async::Ready(res)) => { let (res, body) = res.into().replace_body(()); @@ -289,7 +346,28 @@ where Ok(()) } - fn handle_request(&mut self, req: Request) -> Result, DispatchError> { + fn handle_request(&mut self, req: Request) -> Result, DispatchError> { + // Handle `EXPECT: 100-Continue` header + let req = if req.head().expect() { + let mut task = self.expect.call(req); + match task.poll() { + Ok(Async::Ready(req)) => { + self.send_continue()?; + req + } + Ok(Async::NotReady) => return Ok(State::ExpectCall(task)), + Err(e) => { + let e = e.into(); + let res: Response = e.into(); + let (res, body) = res.replace_body(()); + return self.send_response(res, body.into_body()); + } + } + } else { + req + }; + + // Call service let mut task = self.service.call(req); match task.poll() { Ok(Async::Ready(res)) => { @@ -329,10 +407,6 @@ where req = req1; self.payload = Some(ps); } - //MessageType::Stream => { - // self.unhandled = Some(req); - // return Ok(updated); - //} _ => (), } @@ -482,13 +556,15 @@ where } } -impl Future for Dispatcher +impl Future for Dispatcher where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Response: Into>, B: MessageBody, + X: Service, + X::Error: Into, { type Item = (); type Error = DispatchError; @@ -558,6 +634,7 @@ mod tests { use super::*; use crate::error::Error; + use crate::h1::ExpectHandler; struct Buffer { buf: Bytes, @@ -620,6 +697,7 @@ mod tests { CloneableService::new( (|_| ok::<_, Error>(Response::Ok().finish())).into_service(), ), + CloneableService::new(ExpectHandler), ); assert!(h1.poll().is_ok()); assert!(h1.poll().is_ok()); diff --git a/actix-http/src/h1/expect.rs b/actix-http/src/h1/expect.rs new file mode 100644 index 000000000..86fcb2cc3 --- /dev/null +++ b/actix-http/src/h1/expect.rs @@ -0,0 +1,36 @@ +use actix_service::{NewService, Service}; +use futures::future::{ok, FutureResult}; +use futures::{Async, Poll}; + +use crate::error::Error; +use crate::request::Request; + +pub struct ExpectHandler; + +impl NewService for ExpectHandler { + type Request = Request; + type Response = Request; + type Error = Error; + type Service = ExpectHandler; + type InitError = Error; + type Future = FutureResult; + + fn new_service(&self, _: &()) -> Self::Future { + ok(ExpectHandler) + } +} + +impl Service for ExpectHandler { + type Request = Request; + type Response = Request; + type Error = Error; + type Future = FutureResult; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + Ok(Async::Ready(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + ok(req) + } +} diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index a05f2800c..dd29547e5 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -6,12 +6,14 @@ mod codec; mod decoder; mod dispatcher; mod encoder; +mod expect; mod payload; mod service; pub use self::client::{ClientCodec, ClientPayloadCodec}; pub use self::codec::Codec; pub use self::dispatcher::Dispatcher; +pub use self::expect::ExpectHandler; pub use self::payload::{Payload, PayloadWriter}; pub use self::service::{H1Service, H1ServiceHandler, OneRequest}; diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index d7ab50626..c3d21b4db 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::fmt; use std::marker::PhantomData; use actix_codec::{AsyncRead, AsyncWrite, Framed}; @@ -10,25 +10,27 @@ use futures::{try_ready, Async, Future, IntoFuture, Poll, Stream}; use crate::body::MessageBody; use crate::config::{KeepAlive, ServiceConfig}; -use crate::error::{DispatchError, ParseError}; +use crate::error::{DispatchError, Error, ParseError}; use crate::request::Request; use crate::response::Response; use super::codec::Codec; use super::dispatcher::Dispatcher; -use super::Message; +use super::{ExpectHandler, Message}; /// `NewService` implementation for HTTP1 transport -pub struct H1Service { +pub struct H1Service { srv: S, cfg: ServiceConfig, + expect: X, _t: PhantomData<(T, P, B)>, } impl H1Service where S: NewService, - S::Error: Debug, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, B: MessageBody, { @@ -39,6 +41,7 @@ where H1Service { cfg, srv: service.into_new_service(), + expect: ExpectHandler, _t: PhantomData, } } @@ -51,29 +54,59 @@ where H1Service { cfg, srv: service.into_new_service(), + expect: ExpectHandler, _t: PhantomData, } } } -impl NewService for H1Service +impl H1Service +where + S: NewService, + S::Error: Into, + S::Response: Into>, + S::InitError: fmt::Debug, + B: MessageBody, +{ + pub fn expect(self, expect: U) -> H1Service + where + U: NewService, + U::Error: Into, + U::InitError: fmt::Debug, + { + H1Service { + expect, + cfg: self.cfg, + srv: self.srv, + _t: PhantomData, + } + } +} + +impl NewService for H1Service where T: AsyncRead + AsyncWrite, S: NewService, - S::Error: Debug, + S::Error: Into, S::Response: Into>, + S::InitError: fmt::Debug, B: MessageBody, + X: NewService, + X::Error: Into, + X::InitError: fmt::Debug, { type Request = Io; type Response = (); type Error = DispatchError; - type InitError = S::InitError; - type Service = H1ServiceHandler; - type Future = H1ServiceResponse; + type InitError = (); + type Service = H1ServiceHandler; + type Future = H1ServiceResponse; fn new_service(&self, cfg: &SrvConfig) -> Self::Future { H1ServiceResponse { fut: self.srv.new_service(cfg).into_future(), + fut_ex: Some(self.expect.new_service(&())), + expect: None, cfg: Some(self.cfg.clone()), _t: PhantomData, } @@ -81,77 +114,136 @@ where } #[doc(hidden)] -pub struct H1ServiceResponse, B> { - fut: ::Future, +pub struct H1ServiceResponse +where + S: NewService, + S::Error: Into, + S::InitError: fmt::Debug, + X: NewService, + X::Error: Into, + X::InitError: fmt::Debug, +{ + fut: S::Future, + fut_ex: Option, + expect: Option, cfg: Option, _t: PhantomData<(T, P, B)>, } -impl Future for H1ServiceResponse +impl Future for H1ServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, - S::Error: Debug, + S::Error: Into, S::Response: Into>, + S::InitError: fmt::Debug, B: MessageBody, + X: NewService, + X::Error: Into, + X::InitError: fmt::Debug, { - type Item = H1ServiceHandler; - type Error = S::InitError; + type Item = H1ServiceHandler; + type Error = (); fn poll(&mut self) -> Poll { - let service = try_ready!(self.fut.poll()); + if let Some(ref mut fut) = self.fut_ex { + let expect = try_ready!(fut + .poll() + .map_err(|e| log::error!("Init http service error: {:?}", e))); + self.expect = Some(expect); + self.fut_ex.take(); + } + + let service = try_ready!(self + .fut + .poll() + .map_err(|e| log::error!("Init http service error: {:?}", e))); Ok(Async::Ready(H1ServiceHandler::new( self.cfg.take().unwrap(), service, + self.expect.take().unwrap(), ))) } } /// `Service` implementation for HTTP1 transport -pub struct H1ServiceHandler { +pub struct H1ServiceHandler { srv: CloneableService, + expect: CloneableService, cfg: ServiceConfig, _t: PhantomData<(T, P, B)>, } -impl H1ServiceHandler +impl H1ServiceHandler where S: Service, - S::Error: Debug, + S::Error: Into, S::Response: Into>, B: MessageBody, + X: Service, + X::Error: Into, { - fn new(cfg: ServiceConfig, srv: S) -> H1ServiceHandler { + fn new(cfg: ServiceConfig, srv: S, expect: X) -> H1ServiceHandler { H1ServiceHandler { srv: CloneableService::new(srv), + expect: CloneableService::new(expect), cfg, _t: PhantomData, } } } -impl Service for H1ServiceHandler +impl Service for H1ServiceHandler where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Response: Into>, B: MessageBody, + X: Service, + X::Error: Into, { type Request = Io; type Response = (); type Error = DispatchError; - type Future = Dispatcher; + type Future = Dispatcher; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.srv.poll_ready().map_err(|e| { - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service - }) + let ready = self + .expect + .poll_ready() + .map_err(|e| { + let e = e.into(); + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(e) + })? + .is_ready(); + + let ready = self + .srv + .poll_ready() + .map_err(|e| { + let e = e.into(); + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(e) + })? + .is_ready() + && ready; + + if ready { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } } fn call(&mut self, req: Self::Request) -> Self::Future { - Dispatcher::new(req.into_parts().0, self.cfg.clone(), self.srv.clone()) + Dispatcher::new( + req.into_parts().0, + self.cfg.clone(), + self.srv.clone(), + self.expect.clone(), + ) } } diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 0ef40fc08..e00996048 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -46,7 +46,7 @@ impl Dispatcher where T: AsyncRead + AsyncWrite, S: Service, - S::Error: fmt::Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, @@ -88,7 +88,7 @@ impl Future for Dispatcher where T: AsyncRead + AsyncWrite, S: Service, - S::Error: fmt::Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, @@ -146,7 +146,7 @@ enum ServiceResponseState { impl ServiceResponse where F: Future, - F::Error: fmt::Debug, + F::Error: Into, F::Item: Into>, B: MessageBody + 'static, { @@ -214,7 +214,7 @@ where impl Future for ServiceResponse where F: Future, - F::Error: fmt::Debug, + F::Error: Into, F::Item: Into>, B: MessageBody + 'static, { diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 16ccd79a5..8ab244b50 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -32,7 +32,7 @@ pub struct H2Service { impl H2Service where S: NewService, - S::Error: Debug, + S::Error: Into, S::Response: Into>, ::Future: 'static, B: MessageBody + 'static, @@ -65,7 +65,7 @@ impl NewService for H2Service where T: AsyncRead + AsyncWrite, S: NewService, - S::Error: Debug, + S::Error: Into, S::Response: Into>, ::Future: 'static, B: MessageBody + 'static, @@ -97,7 +97,7 @@ impl Future for H2ServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, - S::Error: Debug, + S::Error: Into, S::Response: Into>, ::Future: 'static, B: MessageBody + 'static, @@ -124,7 +124,7 @@ pub struct H2ServiceHandler { impl H2ServiceHandler where S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, @@ -142,7 +142,7 @@ impl Service for H2ServiceHandler where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, @@ -154,8 +154,9 @@ where fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.srv.poll_ready().map_err(|e| { + let e = e.into(); error!("Service readiness error: {:?}", e); - DispatchError::Service + DispatchError::Service(e) }) } @@ -186,7 +187,7 @@ pub struct H2ServiceHandlerResponse where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, @@ -198,7 +199,7 @@ impl Future for H2ServiceHandlerResponse where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody, diff --git a/actix-http/src/message.rs b/actix-http/src/message.rs index 3466f66df..2fdb28e40 100644 --- a/actix-http/src/message.rs +++ b/actix-http/src/message.rs @@ -23,7 +23,8 @@ bitflags! { const CLOSE = 0b0000_0001; const KEEP_ALIVE = 0b0000_0010; const UPGRADE = 0b0000_0100; - const NO_CHUNKING = 0b0000_1000; + const EXPECT = 0b0000_1000; + const NO_CHUNKING = 0b0001_0000; } } @@ -145,6 +146,17 @@ impl RequestHead { self.flags.remove(Flags::NO_CHUNKING); } } + + #[inline] + /// Request contains `EXPECT` header + pub fn expect(&self) -> bool { + self.flags.contains(Flags::EXPECT) + } + + #[inline] + pub(crate) fn set_expect(&mut self) { + self.flags.insert(Flags::EXPECT); + } } #[derive(Debug)] diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index c3fed133d..0c8c2eef6 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -51,6 +51,18 @@ impl Response { } } + #[inline] + pub(crate) fn empty(status: StatusCode) -> Response<()> { + let mut head: Message = Message::new(); + head.status = status; + + Response { + head, + body: ResponseBody::Body(()), + error: None, + } + } + /// Constructs an error response #[inline] pub fn from_error(error: Error) -> Response { diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index f97cc0483..f259e3021 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -1,4 +1,3 @@ -use std::fmt::Debug; use std::marker::PhantomData; use std::{fmt, io}; @@ -9,27 +8,28 @@ use actix_utils::cloneable::CloneableService; use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures::{try_ready, Async, Future, IntoFuture, Poll}; use h2::server::{self, Handshake}; -use log::error; use crate::body::MessageBody; use crate::builder::HttpServiceBuilder; use crate::config::{KeepAlive, ServiceConfig}; -use crate::error::DispatchError; +use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; use crate::{h1, h2::Dispatcher}; /// `NewService` HTTP1.1/HTTP2 transport implementation -pub struct HttpService { +pub struct HttpService { srv: S, cfg: ServiceConfig, + expect: X, _t: PhantomData<(T, P, B)>, } impl HttpService where S: NewService, - S::Error: Debug, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, ::Future: 'static, B: MessageBody + 'static, @@ -43,7 +43,8 @@ where impl HttpService where S: NewService, - S::Error: Debug, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, ::Future: 'static, B: MessageBody + 'static, @@ -55,6 +56,7 @@ where HttpService { cfg, srv: service.into_new_service(), + expect: h1::ExpectHandler, _t: PhantomData, } } @@ -67,30 +69,65 @@ where HttpService { cfg, srv: service.into_new_service(), + expect: h1::ExpectHandler, _t: PhantomData, } } } -impl NewService for HttpService +impl HttpService +where + S: NewService, + S::Error: Into, + S::InitError: fmt::Debug, + S::Response: Into>, + B: MessageBody, +{ + /// Provide service for `EXPECT: 100-Continue` support. + /// + /// Service get called with request that contains `EXPECT` header. + /// Service must return request in case of success, in that case + /// request will be forwarded to main service. + pub fn expect(self, expect: U) -> HttpService + where + U: NewService, + U::Error: Into, + U::InitError: fmt::Debug, + { + HttpService { + expect, + cfg: self.cfg, + srv: self.srv, + _t: PhantomData, + } + } +} + +impl NewService for HttpService where T: AsyncRead + AsyncWrite, S: NewService, - S::Error: Debug, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, ::Future: 'static, B: MessageBody + 'static, + X: NewService, + X::Error: Into, + X::InitError: fmt::Debug, { type Request = ServerIo; type Response = (); type Error = DispatchError; - type InitError = S::InitError; - type Service = HttpServiceHandler; - type Future = HttpServiceResponse; + type InitError = (); + type Service = HttpServiceHandler; + type Future = HttpServiceResponse; fn new_service(&self, cfg: &SrvConfig) -> Self::Future { HttpServiceResponse { fut: self.srv.new_service(cfg).into_future(), + fut_ex: Some(self.expect.new_service(&())), + expect: None, cfg: Some(self.cfg.clone()), _t: PhantomData, } @@ -98,76 +135,122 @@ where } #[doc(hidden)] -pub struct HttpServiceResponse, B> { - fut: ::Future, +pub struct HttpServiceResponse, B, X: NewService> { + fut: S::Future, + fut_ex: Option, + expect: Option, cfg: Option, _t: PhantomData<(T, P, B)>, } -impl Future for HttpServiceResponse +impl Future for HttpServiceResponse where T: AsyncRead + AsyncWrite, S: NewService, - S::Error: Debug, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, ::Future: 'static, B: MessageBody + 'static, + X: NewService, + X::Error: Into, + X::InitError: fmt::Debug, { - type Item = HttpServiceHandler; - type Error = S::InitError; + type Item = HttpServiceHandler; + type Error = (); fn poll(&mut self) -> Poll { - let service = try_ready!(self.fut.poll()); + if let Some(ref mut fut) = self.fut_ex { + let expect = try_ready!(fut + .poll() + .map_err(|e| log::error!("Init http service error: {:?}", e))); + self.expect = Some(expect); + self.fut_ex.take(); + } + + let service = try_ready!(self + .fut + .poll() + .map_err(|e| log::error!("Init http service error: {:?}", e))); Ok(Async::Ready(HttpServiceHandler::new( self.cfg.take().unwrap(), service, + self.expect.take().unwrap(), ))) } } /// `Service` implementation for http transport -pub struct HttpServiceHandler { +pub struct HttpServiceHandler { srv: CloneableService, + expect: CloneableService, cfg: ServiceConfig, - _t: PhantomData<(T, P, B)>, + _t: PhantomData<(T, P, B, X)>, } -impl HttpServiceHandler +impl HttpServiceHandler where S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, + X: Service, + X::Error: Into, { - fn new(cfg: ServiceConfig, srv: S) -> HttpServiceHandler { + fn new(cfg: ServiceConfig, srv: S, expect: X) -> HttpServiceHandler { HttpServiceHandler { cfg, srv: CloneableService::new(srv), + expect: CloneableService::new(expect), _t: PhantomData, } } } -impl Service for HttpServiceHandler +impl Service for HttpServiceHandler where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, + X: Service, + X::Error: Into, { type Request = ServerIo; type Response = (); type Error = DispatchError; - type Future = HttpServiceHandlerResponse; + type Future = HttpServiceHandlerResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.srv.poll_ready().map_err(|e| { - error!("Service readiness error: {:?}", e); - DispatchError::Service - }) + let ready = self + .expect + .poll_ready() + .map_err(|e| { + let e = e.into(); + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(e) + })? + .is_ready(); + + let ready = self + .srv + .poll_ready() + .map_err(|e| { + let e = e.into(); + log::error!("Http service readiness error: {:?}", e); + DispatchError::Service(e) + })? + .is_ready() + && ready; + + if ready { + Ok(Async::Ready(())) + } else { + Ok(Async::NotReady) + } } fn call(&mut self, req: Self::Request) -> Self::Future { @@ -191,6 +274,7 @@ where io, self.cfg.clone(), self.srv.clone(), + self.expect.clone(), )), }, _ => HttpServiceHandlerResponse { @@ -199,46 +283,63 @@ where BytesMut::with_capacity(14), self.cfg.clone(), self.srv.clone(), + self.expect.clone(), ))), }, } } } -enum State, B: MessageBody> +enum State where + S: Service, S::Future: 'static, - S::Error: fmt::Debug, + S::Error: Into, T: AsyncRead + AsyncWrite, + B: MessageBody, + X: Service, + X::Error: Into, { - H1(h1::Dispatcher), + H1(h1::Dispatcher), H2(Dispatcher, S, B>), - Unknown(Option<(T, BytesMut, ServiceConfig, CloneableService)>), + Unknown( + Option<( + T, + BytesMut, + ServiceConfig, + CloneableService, + CloneableService, + )>, + ), Handshake(Option<(Handshake, Bytes>, ServiceConfig, CloneableService)>), } -pub struct HttpServiceHandlerResponse +pub struct HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody + 'static, + X: Service, + X::Error: Into, { - state: State, + state: State, } const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; -impl Future for HttpServiceHandlerResponse +impl Future for HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite, S: Service, - S::Error: Debug, + S::Error: Into, S::Future: 'static, S::Response: Into>, B: MessageBody, + X: Service, + X::Error: Into, { type Item = (); type Error = DispatchError; @@ -265,7 +366,7 @@ where } else { panic!() } - let (io, buf, cfg, srv) = data.take().unwrap(); + let (io, buf, cfg, srv, expect) = data.take().unwrap(); if buf[..14] == HTTP2_PREFACE[..] { let io = Io { inner: io, @@ -279,8 +380,9 @@ where h1::Codec::new(cfg.clone()), buf, )); - self.state = - State::H1(h1::Dispatcher::with_timeout(framed, cfg, None, srv)) + self.state = State::H1(h1::Dispatcher::with_timeout( + framed, cfg, None, srv, expect, + )) } self.poll() } diff --git a/actix-http/tests/test_client.rs b/actix-http/tests/test_client.rs index 817164f81..cfe0999fd 100644 --- a/actix-http/tests/test_client.rs +++ b/actix-http/tests/test_client.rs @@ -61,7 +61,9 @@ fn test_connection_close() { .finish(|_| ok::<_, ()>(Response::Ok().body(STR))) .map(|_| ()) }); + println!("REQ: {:?}", srv.get("/").force_close()); let response = srv.block_on(srv.get("/").force_close().send()).unwrap(); + println!("RES: {:?}", response); assert!(response.status().is_success()); } diff --git a/src/server.rs b/src/server.rs index 2817f549c..efc70773f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,7 +2,7 @@ use std::marker::PhantomData; use std::sync::Arc; use std::{fmt, io, net}; -use actix_http::{body::MessageBody, HttpService, KeepAlive, Request, Response}; +use actix_http::{body::MessageBody, Error, HttpService, KeepAlive, Request, Response}; use actix_rt::System; use actix_server::{Server, ServerBuilder}; use actix_server_config::ServerConfig; @@ -53,7 +53,8 @@ where F: Fn() -> I + Send + Clone + 'static, I: IntoNewService, S: NewService, - S::Error: fmt::Debug, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, S::Service: 'static, B: MessageBody, @@ -72,7 +73,8 @@ where F: Fn() -> I + Send + Clone + 'static, I: IntoNewService, S: NewService, - S::Error: fmt::Debug + 'static, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, S::Service: 'static, B: MessageBody + 'static, @@ -442,7 +444,8 @@ where F: Fn() -> I + Send + Clone + 'static, I: IntoNewService, S: NewService, - S::Error: fmt::Debug, + S::Error: Into, + S::InitError: fmt::Debug, S::Response: Into>, S::Service: 'static, B: MessageBody, diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index 98bef99be..3f77f3786 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -90,13 +90,13 @@ impl TestServer { Connector::new() .timeout(time::Duration::from_millis(500)) .ssl(builder.build()) - .service() + .finish() } #[cfg(not(feature = "ssl"))] { Connector::new() .timeout(time::Duration::from_millis(500)) - .service() + .finish() } }; diff --git a/tests/test_httpserver.rs b/tests/test_httpserver.rs index dca3377c9..c0d2e81c4 100644 --- a/tests/test_httpserver.rs +++ b/tests/test_httpserver.rs @@ -61,7 +61,7 @@ fn test_start() { .connector( client::Connector::new() .timeout(Duration::from_millis(100)) - .service(), + .finish(), ) .finish(), ) @@ -136,7 +136,7 @@ fn test_start_ssl() { awc::Connector::new() .ssl(builder.build()) .timeout(Duration::from_millis(100)) - .service(), + .finish(), ) .finish(), )