From 55a29d37782443ccf7485caf3c7a7dd270bd60f7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 6 Feb 2019 11:44:15 -0800 Subject: [PATCH] add h2 server support --- .travis.yml | 2 +- Cargo.toml | 7 +- src/body.rs | 21 +++ src/client/connector.rs | 11 +- src/client/h2proto.rs | 57 +++++-- src/client/response.rs | 2 +- src/config.rs | 4 + src/error.rs | 10 ++ src/h1/dispatcher.rs | 4 +- src/h2/dispatcher.rs | 325 ++++++++++++++++++++++++++++++++++++++++ src/h2/mod.rs | 42 ++++++ src/h2/service.rs | 136 +++++++++++------ src/httpmessage.rs | 20 +-- src/json.rs | 2 +- src/message.rs | 4 - src/request.rs | 73 ++++++--- src/test.rs | 21 +-- test-server/Cargo.toml | 5 + test-server/src/lib.rs | 25 +++- tests/cert.pem | 43 ++---- tests/key.pem | 79 ++++------ tests/test_server.rs | 85 ++++++++++- 22 files changed, 774 insertions(+), 204 deletions(-) create mode 100644 src/h2/dispatcher.rs diff --git a/.travis.yml b/.travis.yml index b7b43895e..c9c9db14f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,7 @@ script: - | if [[ "$TRAVIS_RUST_VERSION" != "nightly" ]]; then cargo clean - cargo test + cargo test --features="ssl" fi - | if [[ "$TRAVIS_RUST_VERSION" == "nightly" ]]; then diff --git a/Cargo.toml b/Cargo.toml index 37e6a066c..bbb31c161 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,13 +34,13 @@ default = ["session"] session = ["cookie/secure"] # openssl -ssl = ["openssl", "actix-connector/ssl"] +ssl = ["openssl", "actix-connector/ssl", "actix-server/ssl", "actix-http-test/ssl"] [dependencies] actix-service = "0.2.1" actix-codec = "0.1.0" actix-connector = "0.2.0" -actix-utils = "0.2.0" +actix-utils = "0.2.1" base64 = "0.10" backtrace = "0.3" @@ -80,6 +80,3 @@ actix-server = "0.2" actix-http-test = { path="test-server" } env_logger = "0.6" serde_derive = "1.0" - -[patch.crates-io] -actix-utils = { git = "https://github.com/actix/actix-net.git" } diff --git a/src/body.rs b/src/body.rs index 12e2d0345..1c54d4ce7 100644 --- a/src/body.rs +++ b/src/body.rs @@ -20,6 +20,18 @@ pub enum BodyLength { Stream, } +impl BodyLength { + pub fn is_eof(&self) -> bool { + match self { + BodyLength::None + | BodyLength::Empty + | BodyLength::Sized(0) + | BodyLength::Sized64(0) => true, + _ => false, + } + } +} + /// Type that provides this trait can be streamed to a peer. pub trait MessageBody { fn length(&self) -> BodyLength; @@ -42,6 +54,15 @@ pub enum ResponseBody { Other(Body), } +impl ResponseBody { + pub fn into_body(self) -> ResponseBody { + match self { + ResponseBody::Body(b) => ResponseBody::Other(b), + ResponseBody::Other(b) => ResponseBody::Other(b), + } + } +} + impl ResponseBody { pub fn as_ref(&self) -> Option<&B> { if let ResponseBody::Body(ref b) = self { diff --git a/src/client/connector.rs b/src/client/connector.rs index 8e3c4b5ae..32ba50121 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -12,11 +12,7 @@ use super::error::ConnectorError; use super::pool::{ConnectionPool, Protocol}; #[cfg(feature = "ssl")] -use actix_connector::ssl::OpensslConnector; -#[cfg(feature = "ssl")] -use openssl::ssl::{SslConnector, SslMethod}; -#[cfg(feature = "ssl")] -const H2: &[u8] = b"h2"; +use openssl::ssl::SslConnector; #[cfg(not(feature = "ssl"))] type SslConnector = (); @@ -40,6 +36,8 @@ impl Default for Connector { #[cfg(feature = "ssl")] { use log::error; + use openssl::ssl::{SslConnector, SslMethod}; + let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); let _ = ssl .set_alpn_protos(b"\x02h2\x08http/1.1") @@ -167,6 +165,9 @@ impl Connector { } #[cfg(feature = "ssl")] { + const H2: &[u8] = b"h2"; + use actix_connector::ssl::OpensslConnector; + let ssl_service = Apply::new( TimeoutService::new(self.timeout), self.resolver diff --git a/src/client/h2proto.rs b/src/client/h2proto.rs index e3d5be0b0..ecd18cf82 100644 --- a/src/client/h2proto.rs +++ b/src/client/h2proto.rs @@ -4,16 +4,19 @@ use std::time; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; use futures::future::{err, Either}; -use futures::{Async, Future, Poll, Stream}; +use futures::{Async, Future, Poll}; use h2::{client::SendRequest, SendStream}; -use http::{request::Request, Version}; +use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; +use http::{request::Request, HttpTryFrom, Version}; + +use crate::body::{BodyLength, MessageBody}; +use crate::h2::Payload; +use crate::message::{RequestHead, ResponseHead}; use super::connection::{ConnectionType, IoConnection}; use super::error::SendRequestError; use super::pool::Acquired; use super::response::ClientResponse; -use crate::body::{BodyLength, MessageBody}; -use crate::message::{RequestHead, ResponseHead}; pub(crate) fn send_request( io: SendRequest, @@ -27,7 +30,8 @@ where B: MessageBody, { trace!("Sending client request: {:?} {:?}", head, body.length()); - let eof = match body.length() { + let length = body.length(); + let eof = match length { BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => true, _ => false, }; @@ -38,11 +42,44 @@ where let mut req = Request::new(()); *req.uri_mut() = head.uri; *req.method_mut() = head.method; - *req.headers_mut() = head.headers; *req.version_mut() = Version::HTTP_2; + let mut skip_len = true; + let mut has_date = false; + + // Content length + let _ = match length { + BodyLength::Chunked | BodyLength::None => None, + BodyLength::Stream => { + skip_len = false; + None + } + BodyLength::Empty => req + .headers_mut() + .insert(CONTENT_LENGTH, HeaderValue::from_static("0")), + BodyLength::Sized(len) => req.headers_mut().insert( + CONTENT_LENGTH, + HeaderValue::try_from(format!("{}", len)).unwrap(), + ), + BodyLength::Sized64(len) => req.headers_mut().insert( + CONTENT_LENGTH, + HeaderValue::try_from(format!("{}", len)).unwrap(), + ), + }; + + // copy headers + for (key, value) in head.headers.iter() { + match *key { + CONNECTION | TRANSFER_ENCODING => continue, // http2 specific + CONTENT_LENGTH if skip_len => continue, + DATE => has_date = true, + _ => (), + } + req.headers_mut().append(key, value.clone()); + } + match io.send_request(req, eof) { - Ok((resp, send)) => { + Ok((res, send)) => { release(io, pool, created, false); if !eof { @@ -52,10 +89,10 @@ where send, buf: None, } - .and_then(move |_| resp.map_err(SendRequestError::from)), + .and_then(move |_| res.map_err(SendRequestError::from)), )) } else { - Either::B(resp.map_err(SendRequestError::from)) + Either::B(res.map_err(SendRequestError::from)) } } Err(e) => { @@ -74,7 +111,7 @@ where Ok(ClientResponse { head, - payload: RefCell::new(Some(Box::new(body.from_err()))), + payload: RefCell::new(Some(Box::new(Payload::new(body)))), }) }) .from_err() diff --git a/src/client/response.rs b/src/client/response.rs index 9010a3c56..6224d3cb5 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -27,7 +27,7 @@ impl HttpMessage for ClientResponse { } #[inline] - fn payload(&self) -> Self::Stream { + fn payload(self) -> Self::Stream { if let Some(payload) = self.payload.borrow_mut().take() { payload } else { diff --git a/src/config.rs b/src/config.rs index c37601dbe..960f13706 100644 --- a/src/config.rs +++ b/src/config.rs @@ -171,6 +171,10 @@ impl ServiceConfig { buf[35..].copy_from_slice(b"\r\n\r\n"); dst.extend_from_slice(&buf); } + + pub(crate) fn set_date_header(&self, dst: &mut BytesMut) { + dst.extend_from_slice(&self.0.timer.date().bytes); + } } /// A service config builder diff --git a/src/error.rs b/src/error.rs index 43bf7cfb8..03224b558 100644 --- a/src/error.rs +++ b/src/error.rs @@ -389,6 +389,10 @@ pub enum DispatchError { #[display(fmt = "Parse error: {}", _0)] Parse(ParseError), + /// Http/2 error + #[display(fmt = "{}", _0)] + H2(h2::Error), + /// The first request did not complete within the specified timeout. #[display(fmt = "The first request did not complete within the specified timeout")] SlowRequestTimeout, @@ -426,6 +430,12 @@ impl From for DispatchError { } } +impl From for DispatchError { + fn from(err: h2::Error) -> Self { + DispatchError::H2(err) + } +} + /// A set of error that can occure during parsing content type #[derive(PartialEq, Debug, Display)] pub enum ContentTypeError { diff --git a/src/h1/dispatcher.rs b/src/h1/dispatcher.rs index 7780223f2..1295dfddf 100644 --- a/src/h1/dispatcher.rs +++ b/src/h1/dispatcher.rs @@ -309,11 +309,11 @@ where self.flags.insert(Flags::STARTED); match msg { - Message::Item(req) => { + Message::Item(mut req) => { match self.framed.get_codec().message_type() { MessageType::Payload => { let (ps, pl) = Payload::create(false); - *req.inner.payload.borrow_mut() = Some(pl); + req = req.set_payload(pl); self.payload = Some(ps); } MessageType::Stream => { diff --git a/src/h2/dispatcher.rs b/src/h2/dispatcher.rs new file mode 100644 index 000000000..2994d0a3d --- /dev/null +++ b/src/h2/dispatcher.rs @@ -0,0 +1,325 @@ +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::time::Instant; +use std::{fmt, mem}; + +use actix_codec::{AsyncRead, AsyncWrite}; +use actix_service::Service; +use bitflags::bitflags; +use bytes::{Bytes, BytesMut}; +use futures::{try_ready, Async, Future, Poll, Sink, Stream}; +use h2::server::{Connection, SendResponse}; +use h2::{RecvStream, SendStream}; +use http::header::{ + HeaderValue, ACCEPT_ENCODING, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING, +}; +use http::HttpTryFrom; +use log::{debug, error, trace}; +use tokio_timer::Delay; + +use crate::body::{Body, BodyLength, MessageBody, ResponseBody}; +use crate::config::ServiceConfig; +use crate::error::{DispatchError, Error, ParseError, PayloadError, ResponseError}; +use crate::message::ResponseHead; +use crate::request::Request; +use crate::response::Response; + +use super::{H2ServiceResult, Payload}; + +const CHUNK_SIZE: usize = 16_384; + +bitflags! { + struct Flags: u8 { + const DISCONNECTED = 0b0000_0001; + const SHUTDOWN = 0b0000_0010; + } +} + +/// Dispatcher for HTTP/2 protocol +pub struct Dispatcher { + flags: Flags, + service: S, + connection: Connection, + config: ServiceConfig, + ka_expire: Instant, + ka_timer: Option, + _t: PhantomData, +} + +impl Dispatcher +where + T: AsyncRead + AsyncWrite, + S: Service, Response = Response> + 'static, + S::Error: Into + fmt::Debug, + B: MessageBody + 'static, +{ + pub fn new( + service: S, + connection: Connection, + config: ServiceConfig, + timeout: Option, + ) -> Self { + let keepalive = config.keep_alive_enabled(); + // let flags = if keepalive { + // Flags::KEEPALIVE | Flags::KEEPALIVE_ENABLED + // } else { + // Flags::empty() + // }; + + // keep-alive timer + let (ka_expire, ka_timer) = if let Some(delay) = timeout { + (delay.deadline(), Some(delay)) + } else if let Some(delay) = config.keep_alive_timer() { + (delay.deadline(), Some(delay)) + } else { + (config.now(), None) + }; + + Dispatcher { + service, + config, + ka_expire, + ka_timer, + connection, + flags: Flags::empty(), + _t: PhantomData, + } + } +} + +impl Future for Dispatcher +where + T: AsyncRead + AsyncWrite, + S: Service, Response = Response> + 'static, + S::Error: Into + fmt::Debug, + B: MessageBody + 'static, +{ + type Item = (); + type Error = DispatchError<()>; + + #[inline] + fn poll(&mut self) -> Poll { + loop { + match self.connection.poll()? { + Async::Ready(None) => { + self.flags.insert(Flags::DISCONNECTED); + } + Async::Ready(Some((req, res))) => { + // update keep-alive expire + if self.ka_timer.is_some() { + if let Some(expire) = self.config.keep_alive_expire() { + self.ka_expire = expire; + } + } + + let (parts, body) = req.into_parts(); + let mut req = Request::with_payload(Payload::new(body)); + + let head = &mut req.inner_mut().head; + head.uri = parts.uri; + head.method = parts.method; + head.version = parts.version; + head.headers = parts.headers; + tokio_current_thread::spawn(ServiceResponse:: { + state: ServiceResponseState::ServiceCall( + self.service.call(req), + Some(res), + ), + config: self.config.clone(), + buffer: None, + }) + } + Async::NotReady => return Ok(Async::NotReady), + } + } + } +} + +struct ServiceResponse { + state: ServiceResponseState, + config: ServiceConfig, + buffer: Option, +} + +enum ServiceResponseState { + ServiceCall(S::Future, Option>), + SendPayload(SendStream, ResponseBody), +} + +impl ServiceResponse +where + S: Service, Response = Response> + 'static, + S::Error: Into + fmt::Debug, + B: MessageBody + 'static, +{ + fn prepare_response( + &self, + head: &ResponseHead, + length: &mut BodyLength, + ) -> http::Response<()> { + let mut has_date = false; + let mut skip_len = length != &BodyLength::Stream; + + let mut res = http::Response::new(()); + *res.status_mut() = head.status; + *res.version_mut() = http::Version::HTTP_2; + + // Content length + match head.status { + http::StatusCode::NO_CONTENT + | http::StatusCode::CONTINUE + | http::StatusCode::PROCESSING => *length = BodyLength::None, + http::StatusCode::SWITCHING_PROTOCOLS => { + skip_len = true; + *length = BodyLength::Stream; + } + _ => (), + } + let _ = match length { + BodyLength::Chunked | BodyLength::None | BodyLength::Stream => None, + BodyLength::Empty => res + .headers_mut() + .insert(CONTENT_LENGTH, HeaderValue::from_static("0")), + BodyLength::Sized(len) => res.headers_mut().insert( + CONTENT_LENGTH, + HeaderValue::try_from(format!("{}", len)).unwrap(), + ), + BodyLength::Sized64(len) => res.headers_mut().insert( + CONTENT_LENGTH, + HeaderValue::try_from(format!("{}", len)).unwrap(), + ), + }; + + // copy headers + for (key, value) in head.headers.iter() { + match *key { + CONNECTION | TRANSFER_ENCODING => continue, // http2 specific + CONTENT_LENGTH if skip_len => continue, + DATE => has_date = true, + _ => (), + } + res.headers_mut().append(key, value.clone()); + } + + // set date header + if !has_date { + let mut bytes = BytesMut::with_capacity(29); + self.config.set_date_header(&mut bytes); + res.headers_mut() + .insert(DATE, HeaderValue::try_from(bytes.freeze()).unwrap()); + } + + res + } +} + +impl Future for ServiceResponse +where + S: Service, Response = Response> + 'static, + S::Error: Into + fmt::Debug, + B: MessageBody + 'static, +{ + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll { + match self.state { + ServiceResponseState::ServiceCall(ref mut call, ref mut send) => { + match call.poll() { + Ok(Async::Ready(res)) => { + let (res, body) = res.replace_body(()); + + let mut send = send.take().unwrap(); + let mut length = body.length(); + let h2_res = self.prepare_response(res.head(), &mut length); + + let stream = send + .send_response(h2_res, length.is_eof()) + .map_err(|e| { + trace!("Error sending h2 response: {:?}", e); + })?; + + if length.is_eof() { + Ok(Async::Ready(())) + } else { + self.state = ServiceResponseState::SendPayload(stream, body); + self.poll() + } + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => { + let res: Response = e.into().into(); + let (res, body) = res.replace_body(()); + + let mut send = send.take().unwrap(); + let mut length = body.length(); + let h2_res = self.prepare_response(res.head(), &mut length); + + let stream = send + .send_response(h2_res, length.is_eof()) + .map_err(|e| { + trace!("Error sending h2 response: {:?}", e); + })?; + + if length.is_eof() { + Ok(Async::Ready(())) + } else { + self.state = ServiceResponseState::SendPayload( + stream, + body.into_body(), + ); + self.poll() + } + } + } + } + ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop { + loop { + if let Some(ref mut buffer) = self.buffer { + match stream.poll_capacity().map_err(|e| warn!("{:?}", e))? { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(None) => return Ok(Async::Ready(())), + Async::Ready(Some(cap)) => { + let len = buffer.len(); + let bytes = buffer.split_to(std::cmp::min(cap, len)); + + if let Err(e) = stream.send_data(bytes, false) { + warn!("{:?}", e); + return Err(()); + } else if !buffer.is_empty() { + let cap = std::cmp::min(buffer.len(), CHUNK_SIZE); + stream.reserve_capacity(cap); + } else { + self.buffer.take(); + } + } + } + } else { + match body.poll_next() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => { + if let Err(e) = stream.send_data(Bytes::new(), true) { + warn!("{:?}", e); + return Err(()); + } else { + return Ok(Async::Ready(())); + } + } + Ok(Async::Ready(Some(chunk))) => { + stream.reserve_capacity(std::cmp::min( + chunk.len(), + CHUNK_SIZE, + )); + self.buffer = Some(chunk); + } + Err(e) => { + error!("Response payload stream error: {:?}", e); + return Err(()); + } + } + } + } + }, + } + } +} diff --git a/src/h2/mod.rs b/src/h2/mod.rs index 4a54ec9fe..55e057607 100644 --- a/src/h2/mod.rs +++ b/src/h2/mod.rs @@ -1,7 +1,17 @@ +#![allow(dead_code, unused_imports)] + use std::fmt; +use bytes::Bytes; +use futures::{Async, Poll, Stream}; +use h2::RecvStream; + +mod dispatcher; mod service; +pub use self::service::H2Service; +use crate::error::PayloadError; + /// H1 service response type pub enum H2ServiceResult { Disconnected, @@ -18,3 +28,35 @@ impl fmt::Debug for H2ServiceResult { } } } + +/// H2 receive stream +pub struct Payload { + pl: RecvStream, +} + +impl Payload { + pub(crate) fn new(pl: RecvStream) -> Self { + Self { pl } + } +} + +impl Stream for Payload { + type Item = Bytes; + type Error = PayloadError; + + fn poll(&mut self) -> Poll, Self::Error> { + match self.pl.poll() { + Ok(Async::Ready(Some(chunk))) => { + let len = chunk.len(); + if let Err(err) = self.pl.release_capacity().release_capacity(len) { + Err(err.into()) + } else { + Ok(Async::Ready(Some(chunk))) + } + } + Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => Err(err.into()), + } + } +} diff --git a/src/h2/service.rs b/src/h2/service.rs index 827f84488..b598b0a6d 100644 --- a/src/h2/service.rs +++ b/src/h2/service.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; use std::marker::PhantomData; -use std::net; +use std::{io, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_service::{IntoNewService, NewService, Service}; @@ -8,16 +8,17 @@ use bytes::Bytes; use futures::future::{ok, FutureResult}; use futures::{try_ready, Async, Future, Poll, Stream}; use h2::server::{self, Connection, Handshake}; +use h2::RecvStream; use log::error; use crate::body::MessageBody; use crate::config::{KeepAlive, ServiceConfig}; -use crate::error::{DispatchError, ParseError}; +use crate::error::{DispatchError, Error, ParseError, ResponseError}; use crate::request::Request; use crate::response::Response; -// use super::dispatcher::Dispatcher; -use super::H2ServiceResult; +use super::dispatcher::Dispatcher; +use super::{H2ServiceResult, Payload}; /// `NewService` implementation for HTTP2 transport pub struct H2Service { @@ -28,10 +29,10 @@ pub struct H2Service { impl H2Service where - S: NewService> + Clone, - S::Service: Clone, - S::Error: Debug, - B: MessageBody, + S: NewService, Response = Response> + Clone, + S::Service: Clone + 'static, + S::Error: Into + Debug + 'static, + B: MessageBody + 'static, { /// Create new `HttpService` instance. pub fn new>(service: F) -> Self { @@ -53,14 +54,14 @@ where impl NewService for H2Service where T: AsyncRead + AsyncWrite, - S: NewService> + Clone, - S::Service: Clone, - S::Error: Debug, - B: MessageBody, + S: NewService, Response = Response> + Clone, + S::Service: Clone + 'static, + S::Error: Into + Debug, + B: MessageBody + 'static, { type Request = T; - type Response = H2ServiceResult; - type Error = (); //DispatchError; + type Response = (); + type Error = DispatchError<()>; type InitError = S::InitError; type Service = H2ServiceHandler; type Future = H2ServiceResponse; @@ -90,9 +91,9 @@ pub struct H2ServiceBuilder { impl H2ServiceBuilder where - S: NewService, - S::Service: Clone, - S::Error: Debug, + S: NewService>, + S::Service: Clone + 'static, + S::Error: Into + Debug + 'static, { /// Create instance of `H2ServiceBuilder` pub fn new() -> H2ServiceBuilder { @@ -185,6 +186,25 @@ where self } + // #[cfg(feature = "ssl")] + // /// Configure alpn protocols for SslAcceptorBuilder. + // pub fn configure_openssl( + // builder: &mut openssl::ssl::SslAcceptorBuilder, + // ) -> io::Result<()> { + // let protos: &[u8] = b"\x02h2"; + // builder.set_alpn_select_callback(|_, protos| { + // const H2: &[u8] = b"\x02h2"; + // if protos.windows(3).any(|window| window == H2) { + // Ok(b"h2") + // } else { + // Err(openssl::ssl::AlpnError::NOACK) + // } + // }); + // builder.set_alpn_protos(&protos)?; + + // Ok(()) + // } + /// Finish service configuration and create `H1Service` instance. pub fn finish(self, service: F) -> H2Service where @@ -214,10 +234,10 @@ pub struct H2ServiceResponse { impl Future for H2ServiceResponse where T: AsyncRead + AsyncWrite, - S: NewService>, - S::Service: Clone, - S::Error: Debug, - B: MessageBody, + S: NewService, Response = Response>, + S::Service: Clone + 'static, + S::Error: Into + Debug, + B: MessageBody + 'static, { type Item = H2ServiceHandler; type Error = S::InitError; @@ -240,9 +260,9 @@ pub struct H2ServiceHandler { impl H2ServiceHandler where - S: Service> + Clone, - S::Error: Debug, - B: MessageBody, + S: Service, Response = Response> + Clone + 'static, + S::Error: Into + Debug, + B: MessageBody + 'static, { fn new(cfg: ServiceConfig, srv: S) -> H2ServiceHandler { H2ServiceHandler { @@ -256,55 +276,79 @@ where impl Service for H2ServiceHandler where T: AsyncRead + AsyncWrite, - S: Service> + Clone, - S::Error: Debug, - B: MessageBody, + S: Service, Response = Response> + Clone + 'static, + S::Error: Into + Debug, + B: MessageBody + 'static, { type Request = T; - type Response = H2ServiceResult; - type Error = (); // DispatchError; + type Response = (); + type Error = DispatchError<()>; type Future = H2ServiceHandlerResponse; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.srv.poll_ready().map_err(|_| ()) + self.srv.poll_ready().map_err(|e| { + error!("Service readiness error: {:?}", e); + DispatchError::Service(()) + }) } fn call(&mut self, req: T) -> Self::Future { H2ServiceHandlerResponse { - state: State::Handshake(server::handshake(req)), - _t: PhantomData, + state: State::Handshake( + Some(self.srv.clone()), + Some(self.cfg.clone()), + server::handshake(req), + ), } } } -enum State { - Handshake(Handshake), - Connection(Connection), - Empty, +enum State { + Incoming(Dispatcher), + Handshake(Option, Option, Handshake), } pub struct H2ServiceHandlerResponse where T: AsyncRead + AsyncWrite, - S: Service> + Clone, - S::Error: Debug, - B: MessageBody, + S: Service, Response = Response> + Clone + 'static, + S::Error: Into + Debug, + B: MessageBody + 'static, { - state: State, - _t: PhantomData, + state: State, } impl Future for H2ServiceHandlerResponse where T: AsyncRead + AsyncWrite, - S: Service> + Clone, - S::Error: Debug, + S: Service, Response = Response> + Clone, + S::Error: Into + Debug, B: MessageBody, { - type Item = H2ServiceResult; - type Error = (); + type Item = (); + type Error = DispatchError<()>; fn poll(&mut self) -> Poll { - unimplemented!() + match self.state { + State::Incoming(ref mut disp) => disp.poll(), + State::Handshake(ref mut srv, ref mut config, ref mut handshake) => { + match handshake.poll() { + Ok(Async::Ready(conn)) => { + self.state = State::Incoming(Dispatcher::new( + srv.take().unwrap(), + conn, + config.take().unwrap(), + None, + )); + self.poll() + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => { + trace!("H2 handshake error: {}", err); + return Err(err.into()); + } + } + } + } } } diff --git a/src/httpmessage.rs b/src/httpmessage.rs index 589617fc9..c50de2e94 100644 --- a/src/httpmessage.rs +++ b/src/httpmessage.rs @@ -25,7 +25,7 @@ pub trait HttpMessage: Sized { fn headers(&self) -> &HeaderMap; /// Message payload stream - fn payload(&self) -> Self::Stream; + fn payload(self) -> Self::Stream; #[doc(hidden)] /// Get a header @@ -128,7 +128,7 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn body(&self) -> MessageBody { + fn body(self) -> MessageBody { MessageBody::new(self) } @@ -162,7 +162,7 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn urlencoded(&self) -> UrlEncoded { + fn urlencoded(self) -> UrlEncoded { UrlEncoded::new(self) } @@ -198,12 +198,12 @@ pub trait HttpMessage: Sized { /// } /// # fn main() {} /// ``` - fn json(&self) -> JsonBody { + fn json(self) -> JsonBody { JsonBody::new(self) } /// Return stream of lines. - fn readlines(&self) -> Readlines { + fn readlines(self) -> Readlines { Readlines::new(self) } } @@ -220,7 +220,7 @@ pub struct Readlines { impl Readlines { /// Create a new stream to read request line by line. - fn new(req: &T) -> Self { + fn new(req: T) -> Self { let encoding = match req.encoding() { Ok(enc) => enc, Err(err) => return Self::err(req, err.into()), @@ -242,7 +242,7 @@ impl Readlines { self } - fn err(req: &T, err: ReadlinesError) -> Self { + fn err(req: T, err: ReadlinesError) -> Self { Readlines { stream: req.payload(), buff: BytesMut::new(), @@ -362,7 +362,7 @@ pub struct MessageBody { impl MessageBody { /// Create `MessageBody` for request. - pub fn new(req: &T) -> MessageBody { + pub fn new(req: T) -> MessageBody { let mut len = None; if let Some(l) = req.headers().get(header::CONTENT_LENGTH) { if let Ok(s) = l.to_str() { @@ -457,7 +457,7 @@ pub struct UrlEncoded { impl UrlEncoded { /// Create a new future to URL encode a request - pub fn new(req: &T) -> UrlEncoded { + pub fn new(req: T) -> UrlEncoded { // check content type if req.content_type().to_lowercase() != "application/x-www-form-urlencoded" { return Self::err(UrlencodedError::ContentType); @@ -800,7 +800,7 @@ mod tests { Contrary to popular belief, Lorem Ipsum is not simply random text.", )) .finish(); - let mut r = Readlines::new(&req); + let mut r = Readlines::new(req); match r.poll().ok().unwrap() { Async::Ready(Some(s)) => assert_eq!( s, diff --git a/src/json.rs b/src/json.rs index d06449cb0..fc1ab4d25 100644 --- a/src/json.rs +++ b/src/json.rs @@ -50,7 +50,7 @@ pub struct JsonBody { impl JsonBody { /// Create `JsonBody` for request. - pub fn new(req: &T) -> Self { + pub fn new(req: T) -> Self { // check content-type let json = if let Ok(Some(mime)) = req.mime_type() { mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) diff --git a/src/message.rs b/src/message.rs index 31d61f63d..a73392221 100644 --- a/src/message.rs +++ b/src/message.rs @@ -5,7 +5,6 @@ use std::rc::Rc; use http::{HeaderMap, Method, StatusCode, Uri, Version}; use crate::extensions::Extensions; -use crate::payload::Payload; /// Represents various types of connection #[derive(Copy, Clone, PartialEq, Debug)] @@ -149,7 +148,6 @@ impl ResponseHead { pub struct Message { pub head: T, pub extensions: RefCell, - pub payload: RefCell>, pub(crate) pool: &'static MessagePool, } @@ -159,7 +157,6 @@ impl Message { pub fn reset(&mut self) { self.head.clear(); self.extensions.borrow_mut().clear(); - *self.payload.borrow_mut() = None; } } @@ -168,7 +165,6 @@ impl Default for Message { Message { pool: T::pool(), head: T::default(), - payload: RefCell::new(None), extensions: RefCell::new(Extensions::new()), } } diff --git a/src/request.rs b/src/request.rs index 60ddee19b..b60a772e1 100644 --- a/src/request.rs +++ b/src/request.rs @@ -2,43 +2,76 @@ use std::cell::{Ref, RefMut}; use std::fmt; use std::rc::Rc; +use bytes::Bytes; +use futures::Stream; use http::{header, HeaderMap, Method, Uri, Version}; +use crate::error::PayloadError; use crate::extensions::Extensions; use crate::httpmessage::HttpMessage; use crate::message::{Message, MessagePool, RequestHead}; use crate::payload::Payload; /// Request -pub struct Request { +pub struct Request

{ + pub(crate) payload: Option

, pub(crate) inner: Rc>, } -impl HttpMessage for Request { - type Stream = Payload; +impl

HttpMessage for Request

+where + P: Stream, +{ + type Stream = P; fn headers(&self) -> &HeaderMap { &self.inner.head.headers } #[inline] - fn payload(&self) -> Payload { - if let Some(payload) = self.inner.payload.borrow_mut().take() { - payload - } else { - Payload::empty() + fn payload(mut self) -> P { + self.payload.take().unwrap() + } +} + +impl Request { + /// Create new Request instance + pub fn new() -> Request { + Request { + payload: Some(Payload::empty()), + inner: MessagePool::get_message(), } } } -impl Request { +impl Request { /// Create new Request instance - pub fn new() -> Request { + pub fn with_payload(payload: Payload) -> Request { Request { + payload: Some(payload), inner: MessagePool::get_message(), } } + /// Create new Request instance + pub fn set_payload

(self, payload: P) -> Request

{ + Request { + payload: Some(payload), + inner: self.inner.clone(), + } + } + + /// Take request's payload + pub fn take_payload(mut self) -> (Payload, Request<()>) { + ( + self.payload.take().unwrap(), + Request { + payload: Some(()), + inner: self.inner.clone(), + }, + ) + } + // /// Create new Request instance with pool // pub(crate) fn with_pool(pool: &'static MessagePool) -> Request { // Request { @@ -143,17 +176,17 @@ impl Request { self.inner().head.method == Method::CONNECT } - #[doc(hidden)] - /// Note: this method should be called only as part of clone operation - /// of wrapper type. - pub fn clone_request(&self) -> Self { - Request { - inner: self.inner.clone(), - } - } + // #[doc(hidden)] + // /// Note: this method should be called only as part of clone operation + // /// of wrapper type. + // pub fn clone_request(&self) -> Self { + // Request { + // inner: self.inner.clone(), + // } + // } } -impl Drop for Request { +impl Drop for Request { fn drop(&mut self) { if Rc::strong_count(&self.inner) == 1 { self.inner.pool.release(self.inner.clone()); @@ -161,7 +194,7 @@ impl Drop for Request { } } -impl fmt::Debug for Request { +impl fmt::Debug for Request { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { writeln!( f, diff --git a/src/test.rs b/src/test.rs index a26f31e59..852dd3c0b 100644 --- a/src/test.rs +++ b/src/test.rs @@ -148,15 +148,18 @@ impl TestRequest { .. } = self; - let mut req = Request::new(); - { - let inner = req.inner_mut(); - inner.head.uri = uri; - inner.head.method = method; - inner.head.version = version; - inner.head.headers = headers; - *inner.payload.borrow_mut() = payload; - } + let mut req = if let Some(pl) = payload { + Request::with_payload(pl) + } else { + Request::with_payload(Payload::empty()) + }; + + let inner = req.inner_mut(); + inner.head.uri = uri; + inner.head.method = method; + inner.head.version = version; + inner.head.headers = headers; + // req.set_cookies(cookies); req } diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index 81a3d909c..9c71a25c0 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -28,6 +28,9 @@ default = ["session"] # sessions feature, session require "ring" crate and c compiler session = ["cookie/secure"] +# openssl +ssl = ["openssl", "actix-http/ssl", "actix-server/ssl"] + [dependencies] actix-codec = "0.1" actix-service = "0.2.0" @@ -52,3 +55,5 @@ serde_urlencoded = "0.5.3" time = "0.1" tokio-tcp = "0.1" tokio-timer = "0.2" + +openssl = { version="0.10", optional = true } diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index 8083ebb15..3d6d917e5 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -3,6 +3,12 @@ use std::sync::mpsc; use std::{net, thread}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; +use actix_http::body::MessageBody; +use actix_http::client::{ + ClientRequest, ClientRequestBuilder, ClientResponse, Connect, Connection, Connector, + ConnectorError, SendRequestError, +}; +use actix_http::ws; use actix_rt::{Runtime, System}; use actix_server::{Server, StreamServiceFactory}; use actix_service::Service; @@ -11,13 +17,6 @@ use futures::future::{lazy, Future}; use http::Method; use net2::TcpBuilder; -use actix_http::body::MessageBody; -use actix_http::client::{ - ClientRequest, ClientRequestBuilder, ClientResponse, Connect, Connection, Connector, - ConnectorError, SendRequestError, -}; -use actix_http::ws; - /// The `TestServer` type. /// /// `TestServer` is very simple test server that simplify process of writing @@ -101,6 +100,9 @@ impl TestServer { let mut builder = SslConnector::builder(SslMethod::tls()).unwrap(); builder.set_verify(SslVerifyMode::NONE); + let _ = builder + .set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(|e| log::error!("Can not set alpn protocol: {:?}", e)); Connector::default().ssl(builder.build()).service() } #[cfg(not(feature = "ssl"))] @@ -151,6 +153,15 @@ impl TestServerRuntime { } } + /// Construct test https server url + pub fn surl(&self, uri: &str) -> String { + if uri.starts_with('/') { + format!("https://127.0.0.1:{}{}", self.addr.port(), uri) + } else { + format!("https://127.0.0.1:{}/{}", self.addr.port(), uri) + } + } + /// Create `GET` request pub fn get(&self) -> ClientRequestBuilder { ClientRequest::get(self.url("/").as_str()) diff --git a/tests/cert.pem b/tests/cert.pem index db04fbfae..5e195d98d 100644 --- a/tests/cert.pem +++ b/tests/cert.pem @@ -1,31 +1,16 @@ -----BEGIN CERTIFICATE----- -MIIFXTCCA0WgAwIBAgIJAJ3tqfd0MLLNMA0GCSqGSIb3DQEBCwUAMGExCzAJBgNV -BAYTAlVTMQswCQYDVQQIDAJDRjELMAkGA1UEBwwCU0YxEDAOBgNVBAoMB0NvbXBh -bnkxDDAKBgNVBAsMA09yZzEYMBYGA1UEAwwPd3d3LmV4YW1wbGUuY29tMB4XDTE4 -MDcyOTE4MDgzNFoXDTE5MDcyOTE4MDgzNFowYTELMAkGA1UEBhMCVVMxCzAJBgNV -BAgMAkNGMQswCQYDVQQHDAJTRjEQMA4GA1UECgwHQ29tcGFueTEMMAoGA1UECwwD -T3JnMRgwFgYDVQQDDA93d3cuZXhhbXBsZS5jb20wggIiMA0GCSqGSIb3DQEBAQUA -A4ICDwAwggIKAoICAQDZbMgDYilVH1Nv0QWEhOXG6ETmtjZrdLqrNg3NBWBIWCDF -cQ+fyTWxARx6vkF8A/3zpJyTcfQW8HgG38jw/A61QKaHBxzwq0HlNwY9Hh+Neeuk -L4wgrlQ0uTC7IEMrOJjNN0GPyRQVfVbGa8QcSCpOg85l8GCxLvVwkBH/M5atoMtJ -EzniNfK+gtk3hOL2tBqBCu9NDjhXPnJwNDLtTG1tQaHUJW/r281Wvv9I46H83DkU -05lYtauh0bKh5znCH2KpFmBGqJNRzou3tXZFZzZfaCPBJPZR8j5TjoinehpDtkPh -4CSio0PF2eIFkDKRUbdz/327HgEARJMXx+w1yHpS2JwHFgy5O76i68/Smx8j3DDA -2WIkOYAJFRMH0CBHKdsvUDOGpCgN+xv3whl+N806nCfC4vCkwA+FuB3ko11logng -dvr+y0jIUSU4THF3dMDEXYayF3+WrUlw0cBnUNJdXky85ZP81aBfBsjNSBDx4iL4 -e4NhfZRS5oHpHy1t3nYfuttS/oet+Ke5KUpaqNJguSIoeTBSmgzDzL1TJxFLOzUT -2c/A9M69FdvSY0JB4EJX0W9K01Vd0JRNPwsY+/zvFIPama3suKOUTqYcsbwxx9xa -TMDr26cIQcgUAUOKZO43sQGWNzXX3FYVNwczKhkB8UX6hOrBJsEYiau4LGdokQID -AQABoxgwFjAUBgNVHREEDTALgglsb2NhbGhvc3QwDQYJKoZIhvcNAQELBQADggIB -AIX+Qb4QRBxHl5X2UjRyLfWVkimtGlwI8P+eJZL3DrHBH/TpqAaCvTf0EbRC32nm -ASDMwIghaMvyrW40QN6V/CWRRi25cXUfsIZr1iHAHK0eZJV8SWooYtt4iNrcUs3g -4OTvDxhNmDyNwV9AXhJsBKf80dCW6/84jItqVAj20/OO4Rkd2tEeI8NomiYBc6a1 -hgwvv02myYF5hG/xZ9YSqeroBCZHwGYoJJnSpMPqJsxbCVnx2/U9FzGwcRmNHFCe -0g7EJZd3//8Plza6nkTBjJ/V7JnLqMU+ltx4mAgZO8rfzIr84qZdt0YN33VJQhYq -seuMySxrsuaAoxAmm8IoK9cW4IPzx1JveBQiroNlq5YJGf2UW7BTc3gz6c2tINZi -7ailBVdhlMnDXAf3/9xiiVlRAHOxgZh/7sRrKU7kDEHM4fGoc0YyZBTQKndPYMwO -3Bd82rlQ4sd46XYutTrB+mBYClVrJs+OzbNedTsR61DVNKKsRG4mNPyKSAIgOfM5 -XmSvCMPN5JK9U0DsNIV2/SnVsmcklQczT35FLTxl9ntx8ys7ZYK+SppD7XuLfWMq -GT9YMWhlpw0aRDg/aayeeOcnsNBhzAFMcOpQj1t6Fgv4+zbS9BM2bT0hbX86xjkr -E6wWgkuCslMgQlEJ+TM5RhYrI5/rVZQhvmgcob/9gPZv +MIICljCCAX4CCQDFdWu66640QjANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJ1 +czAeFw0xOTAyMDQyMzEyNTBaFw0yMDAyMDQyMzEyNTBaMA0xCzAJBgNVBAYTAnVz +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzZUXMnS5X8HWxTvHAc82 +Q2d32fiPQGtD+fp3OV90l6RC9jgMdH4yTVUgX5mYYcW0k89RaP8g61H6b76F9gcd +yZ1idqKI1AU9aeBUPV8wkrouhR/6Omv8fA7yr9tVmNo53jPN7WyKoBoU0r7Yj9Ez +g3qjv/808Jlgby3EhduruyyfdvSt5ZFXnOz2D3SF9DS4yrM2jSw4ZTuoVMfZ8vZe +FVzLo/+sV8qokU6wBTEOAmZQ7e/zZV4qAoH2Z3Vj/uD1Zr/MXYyh81RdXpDqIXwV +Z29LEOa2eTGFEdvfG+tdvvuIvSdF3+WbLrwn2ECfwJ8zmKyTauPRV4pj7ks+wkBI +EQIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQB6dmuWBOpFfDdu0mdsDb8XnJY1svjH +4kbztXhjQJ/WuhCUIwvXFyz9dqQCq+TbJUbUEzZJEfaq1uaI3iB5wd35ArSoAGJA +k0lonzyeSM+cmNOe/5BPqWhd1qPwbsfgMoCCkZUoTT5Rvw6yt00XIqZzMqrsvRBX +hAcUW3zBtFQNP6aQqsMdn4ClZE0WHf+LzWy2NQh+Sf46tSYBHELfdUawgR789PB4 +/gNjAeklq06JmE/3gELijwaijVIuUsMC9ua//ITk4YIFpqanPtka+7BpfTegPGNs +HCj1g7Jot97oQMuvDOJeso91aiSA+gutepCClZICT8LxNRkY3ZlXYp92 -----END CERTIFICATE----- diff --git a/tests/key.pem b/tests/key.pem index aac387c64..50ded0ce0 100644 --- a/tests/key.pem +++ b/tests/key.pem @@ -1,51 +1,28 @@ ------BEGIN RSA PRIVATE KEY----- -MIIJKAIBAAKCAgEA2WzIA2IpVR9Tb9EFhITlxuhE5rY2a3S6qzYNzQVgSFggxXEP -n8k1sQEcer5BfAP986Sck3H0FvB4Bt/I8PwOtUCmhwcc8KtB5TcGPR4fjXnrpC+M -IK5UNLkwuyBDKziYzTdBj8kUFX1WxmvEHEgqToPOZfBgsS71cJAR/zOWraDLSRM5 -4jXyvoLZN4Ti9rQagQrvTQ44Vz5ycDQy7UxtbUGh1CVv69vNVr7/SOOh/Nw5FNOZ -WLWrodGyoec5wh9iqRZgRqiTUc6Lt7V2RWc2X2gjwST2UfI+U46Ip3oaQ7ZD4eAk -oqNDxdniBZAykVG3c/99ux4BAESTF8fsNch6UticBxYMuTu+ouvP0psfI9wwwNli -JDmACRUTB9AgRynbL1AzhqQoDfsb98IZfjfNOpwnwuLwpMAPhbgd5KNdZaIJ4Hb6 -/stIyFElOExxd3TAxF2Gshd/lq1JcNHAZ1DSXV5MvOWT/NWgXwbIzUgQ8eIi+HuD -YX2UUuaB6R8tbd52H7rbUv6HrfinuSlKWqjSYLkiKHkwUpoMw8y9UycRSzs1E9nP -wPTOvRXb0mNCQeBCV9FvStNVXdCUTT8LGPv87xSD2pmt7LijlE6mHLG8McfcWkzA -69unCEHIFAFDimTuN7EBljc119xWFTcHMyoZAfFF+oTqwSbBGImruCxnaJECAwEA -AQKCAgAME3aoeXNCPxMrSri7u4Xnnk71YXl0Tm9vwvjRQlMusXZggP8VKN/KjP0/ -9AE/GhmoxqPLrLCZ9ZE1EIjgmZ9Xgde9+C8rTtfCG2RFUL7/5J2p6NonlocmxoJm -YkxYwjP6ce86RTjQWL3RF3s09u0inz9/efJk5O7M6bOWMQ9VZXDlBiRY5BYvbqUR -6FeSzD4MnMbdyMRoVBeXE88gTvZk8xhB6DJnLzYgc0tKiRoeKT0iYv5JZw25VyRM -ycLzfTrFmXCPfB1ylb483d9Ly4fBlM8nkx37PzEnAuukIawDxsPOb9yZC+hfvNJI -7NFiMN+3maEqG2iC00w4Lep4skHY7eHUEUMl+Wjr+koAy2YGLWAwHZQTm7iXn9Ab -L6adL53zyCKelRuEQOzbeosJAqS+5fpMK0ekXyoFIuskj7bWuIoCX7K/kg6q5IW+ -vC2FrlsrbQ79GztWLVmHFO1I4J9M5r666YS0qdh8c+2yyRl4FmSiHfGxb3eOKpxQ -b6uI97iZlkxPF9LYUCSc7wq0V2gGz+6LnGvTHlHrOfVXqw/5pLAKhXqxvnroDTwz -0Ay/xFF6ei/NSxBY5t8ztGCBm45wCU3l8pW0X6dXqwUipw5b4MRy1VFRu6rqlmbL -OPSCuLxqyqsigiEYsBgS/icvXz9DWmCQMPd2XM9YhsHvUq+R4QKCAQEA98EuMMXI -6UKIt1kK2t/3OeJRyDd4iv/fCMUAnuPjLBvFE4cXD/SbqCxcQYqb+pue3PYkiTIC -71rN8OQAc5yKhzmmnCE5N26br/0pG4pwEjIr6mt8kZHmemOCNEzvhhT83nfKmV0g -9lNtuGEQMiwmZrpUOF51JOMC39bzcVjYX2Cmvb7cFbIq3lR0zwM+aZpQ4P8LHCIu -bgHmwbdlkLyIULJcQmHIbo6nPFB3ZZE4mqmjwY+rA6Fh9rgBa8OFCfTtrgeYXrNb -IgZQ5U8GoYRPNC2ot0vpTinraboa/cgm6oG4M7FW1POCJTl+/ktHEnKuO5oroSga -/BSg7hCNFVaOhwKCAQEA4Kkys0HtwEbV5mY/NnvUD5KwfXX7BxoXc9lZ6seVoLEc -KjgPYxqYRVrC7dB2YDwwp3qcRTi/uBAgFNm3iYlDzI4xS5SeaudUWjglj7BSgXE2 -iOEa7EwcvVPluLaTgiWjlzUKeUCNNHWSeQOt+paBOT+IgwRVemGVpAgkqQzNh/nP -tl3p9aNtgzEm1qVlPclY/XUCtf3bcOR+z1f1b4jBdn0leu5OhnxkC+Htik+2fTXD -jt6JGrMkanN25YzsjnD3Sn+v6SO26H99wnYx5oMSdmb8SlWRrKtfJHnihphjG/YY -l1cyorV6M/asSgXNQfGJm4OuJi0I4/FL2wLUHnU+JwKCAQEAzh4WipcRthYXXcoj -gMKRkMOb3GFh1OpYqJgVExtudNTJmZxq8GhFU51MR27Eo7LycMwKy2UjEfTOnplh -Us2qZiPtW7k8O8S2m6yXlYUQBeNdq9IuuYDTaYD94vsazscJNSAeGodjE+uGvb1q -1wLqE87yoE7dUInYa1cOA3+xy2/CaNuviBFJHtzOrSb6tqqenQEyQf6h9/12+DTW -t5pSIiixHrzxHiFqOoCLRKGToQB+71rSINwTf0nITNpGBWmSj5VcC3VV3TG5/XxI -fPlxV2yhD5WFDPVNGBGvwPDSh4jSMZdZMSNBZCy4XWFNSKjGEWoK4DFYed3DoSt9 -5IG1YwKCAQA63ntHl64KJUWlkwNbboU583FF3uWBjee5VqoGKHhf3CkKMxhtGqnt -+oN7t5VdUEhbinhqdx1dyPPvIsHCS3K1pkjqii4cyzNCVNYa2dQ00Qq+QWZBpwwc -3GAkz8rFXsGIPMDa1vxpU6mnBjzPniKMcsZ9tmQDppCEpBGfLpio2eAA5IkK8eEf -cIDB3CM0Vo94EvI76CJZabaE9IJ+0HIJb2+jz9BJ00yQBIqvJIYoNy9gP5Xjpi+T -qV/tdMkD5jwWjHD3AYHLWKUGkNwwkAYFeqT/gX6jpWBP+ZRPOp011X3KInJFSpKU -DT5GQ1Dux7EMTCwVGtXqjO8Ym5wjwwsfAoIBAEcxlhIW1G6BiNfnWbNPWBdh3v/K -5Ln98Rcrz8UIbWyl7qNPjYb13C1KmifVG1Rym9vWMO3KuG5atK3Mz2yLVRtmWAVc -fxzR57zz9MZFDun66xo+Z1wN3fVxQB4CYpOEI4Lb9ioX4v85hm3D6RpFukNtRQEc -Gfr4scTjJX4jFWDp0h6ffMb8mY+quvZoJ0TJqV9L9Yj6Ksdvqez/bdSraev97bHQ -4gbQxaTZ6WjaD4HjpPQefMdWp97Metg0ZQSS8b8EzmNFgyJ3XcjirzwliKTAQtn6 -I2sd0NCIooelrKRD8EJoDUwxoOctY7R97wpZ7/wEHU45cBCbRV3H4JILS5c= ------END RSA PRIVATE KEY----- +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDNlRcydLlfwdbF +O8cBzzZDZ3fZ+I9Aa0P5+nc5X3SXpEL2OAx0fjJNVSBfmZhhxbSTz1Fo/yDrUfpv +voX2Bx3JnWJ2oojUBT1p4FQ9XzCSui6FH/o6a/x8DvKv21WY2jneM83tbIqgGhTS +vtiP0TODeqO//zTwmWBvLcSF26u7LJ929K3lkVec7PYPdIX0NLjKszaNLDhlO6hU +x9ny9l4VXMuj/6xXyqiRTrAFMQ4CZlDt7/NlXioCgfZndWP+4PVmv8xdjKHzVF1e +kOohfBVnb0sQ5rZ5MYUR298b612++4i9J0Xf5ZsuvCfYQJ/AnzOYrJNq49FXimPu +Sz7CQEgRAgMBAAECggEBALC547EaKmko5wmyM4dYq9sRzTPxuqO0EkGIkIkfh8j8 +ChxDXmGeQnu8HBJSpW4XWP5fkCpkd9YTKOh6rgorX+37f7NgUaOBxaOIlqITfFwF +9Qu3y5IBVpEHAJUwRcsaffiILBRX5GtxQElSijRHsLLr8GySZN4X25B3laNEjcJe +NWJrDaxOn0m0MMGRvBpM8PaZu1Mn9NWxt04b/fteVLdN4TAcuY9TgvVZBq92S2FM +qvZcnJCQckNOuMOptVdP45qPkerKUohpOcqBfIiWFaalC378jE3Dm68p7slt3R6y +I1wVqCI4+MZfM3CtKcYJV0fdqklJCvXORvRiT8OZKakCgYEA5YnhgXOu4CO4DR1T +Lacv716DPyHeKVa6TbHhUhWw4bLwNLUsEL98jeU9SZ6VH8enBlDm5pCsp2i//t9n +8hoykN4L0rS4EyAGENouTRkLhtHfjTAKTKDK8cNvEaS8NOBJWrI0DTiHtFbCRBvI +zRx5VhrB5H4DDbqn7QV9g+GBKvMCgYEA5Ug3bN0RNUi3KDoIRcnWc06HsX307su7 +tB4cGqXJqVOJCrkk5sefGF502+W8m3Ldjaakr+Q9BoOdZX6boZnFtVetT8Hyzk1C +Rkiyz3GcwovOkQK//UmljsuRjgHF+PuQGX5ol4YlJtXU21k5bCsi1Tmyp7IufiGV +AQRMVZVbeesCgYA/QBZGwKTgmJcf7gO8ocRAto999wwr4f0maazIHLgICXHNZFsH +JmzhANk5jxxSjIaG5AYsZJNe8ittxQv0l6l1Z+pkHm5Wvs1NGYIGtq8JcI2kbyd3 +ZBtoMU1K1FUUUPWFq3NSbVBfrkSL1ggoFP+ObYMePmcDAntBgfDLRXl9ZwKBgQCt +/dh5l2UIn27Gawt+EkXX6L8WVTQ6xoZhj/vZyPe4tDip14gGTXQQ5RUfDj7LZCZ2 +6P/OrpAU0mnt7F8kCfI7xBY0EUU1gvGJLn/q5heElt2hs4mIJ4woSZjiP7xBTn2y +qveqDNVCnEBUWGg4Cp/7WTaXBaM8ejV9uQpIY/gwEwKBgQCCYnd9fD8L4nGyOLoD +eUzMV7G8TZfinlxCNMVXfpn4Z8OaYHOk5NiujHK55w4ghx06NQw038qhnX0ogbjU +caWOwCIbrYgx2fwYuOZbJFXdjWlpjIK3RFOcbNCNgCRLT6Lgz4uZYZ9RVftADvMi +zR1QsLWnIvARbTtOPfZqizT2gQ== +-----END PRIVATE KEY----- diff --git a/tests/test_server.rs b/tests/test_server.rs index c23840ead..9fa27e71b 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -5,16 +5,16 @@ use std::{net, thread}; use actix_http_test::TestServer; use actix_service::NewService; use bytes::Bytes; -use futures::future::{self, ok}; +use futures::future::{self, ok, Future}; use futures::stream::once; use actix_http::{ - body, client, h1, http, Body, Error, HttpMessage as HttpMessage2, KeepAlive, + body, client, h1, h2, http, Body, Error, HttpMessage as HttpMessage2, KeepAlive, Request, Response, }; #[test] -fn test_h1_v2() { +fn test_h1() { let mut srv = TestServer::with_factory(|| { h1::H1Service::build() .keep_alive(KeepAlive::Disabled) @@ -30,6 +30,85 @@ fn test_h1_v2() { assert!(response.status().is_success()); } +#[cfg(feature = "ssl")] +fn ssl_acceptor() -> std::io::Result> { + use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; + // load ssl keys + let mut builder = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); + builder + .set_private_key_file("tests/key.pem", SslFiletype::PEM) + .unwrap(); + builder + .set_certificate_chain_file("tests/cert.pem") + .unwrap(); + builder.set_alpn_select_callback(|_, protos| { + const H2: &[u8] = b"\x02h2"; + if protos.windows(3).any(|window| window == H2) { + Ok(b"h2") + } else { + Err(openssl::ssl::AlpnError::NOACK) + } + }); + builder.set_alpn_protos(b"\x02h2")?; + Ok(actix_server::ssl::OpensslAcceptor::new(builder.build())) +} + +#[cfg(feature = "ssl")] +#[test] +fn test_h2() -> std::io::Result<()> { + let openssl = ssl_acceptor()?; + let mut srv = TestServer::with_factory(move || { + openssl + .clone() + .map_err(|e| println!("Openssl error: {}", e)) + .and_then( + h2::H2Service::build() + .finish(|_| future::ok::<_, Error>(Response::Ok().finish())) + .map_err(|_| ()), + ) + }); + + let req = client::ClientRequest::get(srv.surl("/")).finish().unwrap(); + let response = srv.send_request(req).unwrap(); + println!("RES: {:?}", response); + assert!(response.status().is_success()); + Ok(()) +} + +#[cfg(feature = "ssl")] +#[test] +fn test_h2_body() -> std::io::Result<()> { + // std::env::set_var("RUST_LOG", "actix_http=trace"); + // env_logger::init(); + + let data = "HELLOWORLD".to_owned().repeat(64 * 1024); + let openssl = ssl_acceptor()?; + let mut srv = TestServer::with_factory(move || { + openssl + .clone() + .map_err(|e| println!("Openssl error: {}", e)) + .and_then( + h2::H2Service::build() + .finish(|req: Request<_>| { + req.body() + .limit(1024 * 1024) + .and_then(|body| Ok(Response::Ok().body(body))) + }) + .map_err(|_| ()), + ) + }); + + let req = client::ClientRequest::get(srv.surl("/")) + .body(data.clone()) + .unwrap(); + let response = srv.send_request(req).unwrap(); + assert!(response.status().is_success()); + + let body = srv.block_on(response.body().limit(1024 * 1024)).unwrap(); + assert_eq!(&body, data.as_bytes()); + Ok(()) +} + #[test] fn test_slow_request() { let srv = TestServer::with_factory(|| {