diff --git a/examples/client.rs b/examples/client.rs new file mode 100644 index 000000000..06b708e20 --- /dev/null +++ b/examples/client.rs @@ -0,0 +1,33 @@ +use actix_http::{client, Error}; +use actix_rt::System; +use bytes::BytesMut; +use futures::{future::lazy, Future, Stream}; + +fn main() -> Result<(), Error> { + std::env::set_var("RUST_LOG", "actix_http=trace"); + env_logger::init(); + + System::new("test").block_on(lazy(|| { + let mut connector = client::Connector::default().service(); + + client::ClientRequest::get("https://www.rust-lang.org/") // <- Create request builder + .header("User-Agent", "Actix-web") + .finish() + .unwrap() + .send(&mut connector) // <- Send http request + .from_err() + .and_then(|response| { + // <- server http response + println!("Response: {:?}", response); + + // read response body + response + .from_err() + .fold(BytesMut::new(), move |mut acc, chunk| { + acc.extend_from_slice(&chunk); + Ok::<_, Error>(acc) + }) + .map(|body| println!("Downloaded: {:?} bytes", body.len())) + }) + })) +} diff --git a/src/client/connection.rs b/src/client/connection.rs index ed156bf84..b192caaeb 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -1,11 +1,35 @@ -use std::{fmt, io, time}; +use std::{fmt, time}; use actix_codec::{AsyncRead, AsyncWrite}; -use futures::Poll; +use bytes::Bytes; +use futures::Future; +use h2::client::SendRequest; +use crate::body::MessageBody; +use crate::message::RequestHead; + +use super::error::SendRequestError; use super::pool::Acquired; +use super::response::ClientResponse; +use super::{h1proto, h2proto}; -pub trait Connection: AsyncRead + AsyncWrite + 'static { +pub(crate) enum ConnectionType { + H1(Io), + H2(SendRequest), +} + +pub trait RequestSender { + type Future: Future; + + /// Close connection + fn send_request( + self, + head: RequestHead, + body: B, + ) -> Self::Future; +} + +pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static { /// Close connection fn close(&mut self); @@ -16,7 +40,7 @@ pub trait Connection: AsyncRead + AsyncWrite + 'static { #[doc(hidden)] /// HTTP client connection pub struct IoConnection { - io: Option, + io: Option>, created: time::Instant, pool: Option>, } @@ -26,77 +50,83 @@ where T: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Connection {:?}", self.io) + match self.io { + Some(ConnectionType::H1(ref io)) => write!(f, "H1Connection({:?})", io), + Some(ConnectionType::H2(_)) => write!(f, "H2Connection"), + None => write!(f, "Connection(Empty)"), + } } } impl IoConnection { - pub(crate) fn new(io: T, created: time::Instant, pool: Acquired) -> Self { + pub(crate) fn new( + io: ConnectionType, + created: time::Instant, + pool: Option>, + ) -> Self { IoConnection { + pool, created, io: Some(io), - pool: Some(pool), } } - /// Raw IO stream - pub fn get_mut(&mut self) -> &mut T { - self.io.as_mut().unwrap() - } - - pub(crate) fn into_inner(self) -> (T, time::Instant) { + pub(crate) fn into_inner(self) -> (ConnectionType, time::Instant) { (self.io.unwrap(), self.created) } } -impl Connection for IoConnection { - /// Close connection - fn close(&mut self) { - if let Some(mut pool) = self.pool.take() { - if let Some(io) = self.io.take() { - pool.close(IoConnection { - io: Some(io), - created: self.created, - pool: None, - }) - } - } - } +impl RequestSender for IoConnection +where + T: AsyncRead + AsyncWrite + 'static, +{ + type Future = Box>; - /// Release this connection to the connection pool - fn release(&mut self) { - if let Some(mut pool) = self.pool.take() { - if let Some(io) = self.io.take() { - pool.release(IoConnection { - io: Some(io), - created: self.created, - pool: None, - }) - } + fn send_request( + mut self, + head: RequestHead, + body: B, + ) -> Self::Future { + match self.io.take().unwrap() { + ConnectionType::H1(io) => Box::new(h1proto::send_request( + io, + head, + body, + self.created, + self.pool, + )), + ConnectionType::H2(io) => Box::new(h2proto::send_request( + io, + head, + body, + self.created, + self.pool, + )), } } } -impl io::Read for IoConnection { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.io.as_mut().unwrap().read(buf) - } +#[allow(dead_code)] +pub(crate) enum EitherConnection { + A(IoConnection), + B(IoConnection), } -impl AsyncRead for IoConnection {} +impl RequestSender for EitherConnection +where + A: AsyncRead + AsyncWrite + 'static, + B: AsyncRead + AsyncWrite + 'static, +{ + type Future = Box>; -impl io::Write for IoConnection { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.io.as_mut().unwrap().write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.io.as_mut().unwrap().flush() - } -} - -impl AsyncWrite for IoConnection { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.io.as_mut().unwrap().shutdown() + fn send_request( + self, + head: RequestHead, + body: RB, + ) -> Self::Future { + match self { + EitherConnection::A(con) => con.send_request(head, body), + EitherConnection::B(con) => con.send_request(head, body), + } } } diff --git a/src/client/connector.rs b/src/client/connector.rs index 05caf1ed2..b573181ba 100644 --- a/src/client/connector.rs +++ b/src/client/connector.rs @@ -1,23 +1,22 @@ use std::time::Duration; -use std::{fmt, io}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_connector::{Resolver, TcpConnector}; use actix_service::{Service, ServiceExt}; use actix_utils::timeout::{TimeoutError, TimeoutService}; -use futures::future::Either; -use futures::Poll; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use super::connect::Connect; -use super::connection::{Connection, IoConnection}; +use super::connection::RequestSender; use super::error::ConnectorError; -use super::pool::ConnectionPool; +use super::pool::{ConnectionPool, Protocol}; #[cfg(feature = "ssl")] use actix_connector::ssl::OpensslConnector; #[cfg(feature = "ssl")] use openssl::ssl::{SslConnector, SslMethod}; +#[cfg(feature = "ssl")] +const H2: &[u8] = b"h2"; #[cfg(not(feature = "ssl"))] type SslConnector = (); @@ -40,7 +39,12 @@ impl Default for Connector { let connector = { #[cfg(feature = "ssl")] { - SslConnector::builder(SslMethod::tls()).unwrap().build() + use log::error; + let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap(); + let _ = ssl + .set_alpn_protos(b"\x02h2\x08http/1.1") + .map_err(|e| error!("Can not set alpn protocol: {:?}", e)); + ssl.build() } #[cfg(not(feature = "ssl"))] { @@ -133,15 +137,17 @@ impl Connector { /// Finish configuration process and create connector service. pub fn service( self, - ) -> impl Service + Clone + ) -> impl Service + Clone { #[cfg(not(feature = "ssl"))] { let connector = TimeoutService::new( self.timeout, - self.resolver - .map_err(ConnectorError::from) - .and_then(TcpConnector::default().from_err()), + self.resolver.map_err(ConnectorError::from).and_then( + TcpConnector::default() + .from_err() + .map(|(msg, io)| (msg, io, Protocol::Http1)), + ), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -168,7 +174,20 @@ impl Connector { .and_then(TcpConnector::default().from_err()) .and_then( OpensslConnector::service(self.connector) - .map_err(ConnectorError::from), + .map_err(ConnectorError::from) + .map(|(msg, io)| { + let h2 = io + .get_ref() + .ssl() + .selected_alpn_protocol() + .map(|protos| protos.windows(2).any(|w| w == H2)) + .unwrap_or(false); + if h2 { + (msg, io, Protocol::Http2) + } else { + (msg, io, Protocol::Http1) + } + }), ), ) .map_err(|e| match e { @@ -178,9 +197,11 @@ impl Connector { let tcp_service = TimeoutService::new( self.timeout, - self.resolver - .map_err(ConnectorError::from) - .and_then(TcpConnector::default().from_err()), + self.resolver.map_err(ConnectorError::from).and_then( + TcpConnector::default() + .from_err() + .map(|(msg, io)| (msg, io, Protocol::Http1)), + ), ) .map_err(|e| match e { TimeoutError::Service(e) => e, @@ -209,13 +230,16 @@ impl Connector { #[cfg(not(feature = "ssl"))] mod connect_impl { + use futures::future::{err, Either, FutureResult}; + use futures::Poll; + use super::*; - use futures::future::{err, FutureResult}; + use crate::client::connection::IoConnection; pub(crate) struct InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { pub(crate) tcp_pool: ConnectionPool, } @@ -223,7 +247,8 @@ mod connect_impl { impl Clone for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service + Clone, + T: Service + + Clone, { fn clone(&self) -> Self { InnerConnector { @@ -235,7 +260,7 @@ mod connect_impl { impl Service for InnerConnector where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { type Response = IoConnection; type Error = ConnectorError; @@ -264,17 +289,26 @@ mod connect_impl { mod connect_impl { use std::marker::PhantomData; - use futures::future::{err, FutureResult}; + use futures::future::{err, Either, FutureResult}; use futures::{Async, Future, Poll}; use super::*; + use crate::client::connection::EitherConnection; pub(crate) struct InnerConnector where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service< + Connect, + Response = (Connect, Io1, Protocol), + Error = ConnectorError, + >, + T2: Service< + Connect, + Response = (Connect, Io2, Protocol), + Error = ConnectorError, + >, { pub(crate) tcp_pool: ConnectionPool, pub(crate) ssl_pool: ConnectionPool, @@ -284,8 +318,16 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service + Clone, - T2: Service + Clone, + T1: Service< + Connect, + Response = (Connect, Io1, Protocol), + Error = ConnectorError, + > + Clone, + T2: Service< + Connect, + Response = (Connect, Io2, Protocol), + Error = ConnectorError, + > + Clone, { fn clone(&self) -> Self { InnerConnector { @@ -299,10 +341,18 @@ mod connect_impl { where Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, - T1: Service, - T2: Service, + T1: Service< + Connect, + Response = (Connect, Io1, Protocol), + Error = ConnectorError, + >, + T2: Service< + Connect, + Response = (Connect, Io2, Protocol), + Error = ConnectorError, + >, { - type Response = IoEither, IoConnection>; + type Response = EitherConnection; type Error = ConnectorError; type Future = Either< FutureResult, @@ -336,7 +386,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseA where Io1: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -344,17 +394,17 @@ mod connect_impl { impl Future for InnerConnectorResponseA where - T: Service, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { - type Item = IoEither, IoConnection>; + type Item = EitherConnection; type Error = ConnectorError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), - Async::Ready(res) => Ok(Async::Ready(IoEither::A(res))), + Async::Ready(res) => Ok(Async::Ready(EitherConnection::A(res))), } } } @@ -362,7 +412,7 @@ mod connect_impl { pub(crate) struct InnerConnectorResponseB where Io2: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { fut: as Service>::Future, _t: PhantomData, @@ -370,129 +420,18 @@ mod connect_impl { impl Future for InnerConnectorResponseB where - T: Service, + T: Service, Io1: AsyncRead + AsyncWrite + 'static, Io2: AsyncRead + AsyncWrite + 'static, { - type Item = IoEither, IoConnection>; + type Item = EitherConnection; type Error = ConnectorError; fn poll(&mut self) -> Poll { match self.fut.poll()? { Async::NotReady => Ok(Async::NotReady), - Async::Ready(res) => Ok(Async::Ready(IoEither::B(res))), + Async::Ready(res) => Ok(Async::Ready(EitherConnection::B(res))), } } } } - -pub(crate) enum IoEither { - A(Io1), - B(Io2), -} - -impl Connection for IoEither -where - Io1: Connection, - Io2: Connection, -{ - fn close(&mut self) { - match self { - IoEither::A(ref mut io) => io.close(), - IoEither::B(ref mut io) => io.close(), - } - } - - fn release(&mut self) { - match self { - IoEither::A(ref mut io) => io.release(), - IoEither::B(ref mut io) => io.release(), - } - } -} - -impl io::Read for IoEither -where - Io1: Connection, - Io2: Connection, -{ - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self { - IoEither::A(ref mut io) => io.read(buf), - IoEither::B(ref mut io) => io.read(buf), - } - } -} - -impl AsyncRead for IoEither -where - Io1: Connection, - Io2: Connection, -{ - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - match self { - IoEither::A(ref io) => io.prepare_uninitialized_buffer(buf), - IoEither::B(ref io) => io.prepare_uninitialized_buffer(buf), - } - } -} - -impl AsyncWrite for IoEither -where - Io1: Connection, - Io2: Connection, -{ - fn shutdown(&mut self) -> Poll<(), io::Error> { - match self { - IoEither::A(ref mut io) => io.shutdown(), - IoEither::B(ref mut io) => io.shutdown(), - } - } - - fn poll_write(&mut self, buf: &[u8]) -> Poll { - match self { - IoEither::A(ref mut io) => io.poll_write(buf), - IoEither::B(ref mut io) => io.poll_write(buf), - } - } - - fn poll_flush(&mut self) -> Poll<(), io::Error> { - match self { - IoEither::A(ref mut io) => io.poll_flush(), - IoEither::B(ref mut io) => io.poll_flush(), - } - } -} - -impl io::Write for IoEither -where - Io1: Connection, - Io2: Connection, -{ - fn flush(&mut self) -> io::Result<()> { - match self { - IoEither::A(ref mut io) => io.flush(), - IoEither::B(ref mut io) => io.flush(), - } - } - - fn write(&mut self, buf: &[u8]) -> io::Result { - match self { - IoEither::A(ref mut io) => io.write(buf), - IoEither::B(ref mut io) => io.write(buf), - } - } -} - -impl fmt::Debug for IoEither -where - Io1: fmt::Debug, - Io2: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - IoEither::A(ref io) => io.fmt(fmt), - IoEither::B(ref io) => io.fmt(fmt), - } - } -} diff --git a/src/client/error.rs b/src/client/error.rs index 2a5df9c97..e27a83d85 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -6,7 +6,8 @@ use trust_dns_resolver::error::ResolveError; #[cfg(feature = "ssl")] use openssl::ssl::{Error as SslError, HandshakeError}; -use crate::error::{Error, ParseError}; +use crate::error::{Error, ParseError, ResponseError}; +use crate::response::Response; /// A set of errors that can occur while connecting to an HTTP host #[derive(Debug, Display, From)] @@ -32,6 +33,10 @@ pub enum ConnectorError { #[display(fmt = "No dns records found for the input")] NoRecords, + /// Http2 error + #[display(fmt = "{}", _0)] + H2(h2::Error), + /// Connecting took too long #[display(fmt = "Timeout out while establishing connection")] Timeout, @@ -80,6 +85,23 @@ pub enum SendRequestError { Send(io::Error), /// Error parsing response Response(ParseError), + /// Http2 error + #[display(fmt = "{}", _0)] + H2(h2::Error), /// Error sending request body Body(Error), } + +/// Convert `SendRequestError` to a server `Response` +impl ResponseError for SendRequestError { + fn error_response(&self) -> Response { + match *self { + SendRequestError::Connector(ConnectorError::Timeout) => { + Response::GatewayTimeout() + } + SendRequestError::Connector(_) => Response::BadGateway(), + _ => Response::InternalServerError(), + } + .into() + } +} diff --git a/src/client/pipeline.rs b/src/client/h1proto.rs similarity index 67% rename from src/client/pipeline.rs rename to src/client/h1proto.rs index 8d946d644..ed3c66d62 100644 --- a/src/client/pipeline.rs +++ b/src/client/h1proto.rs @@ -1,38 +1,42 @@ -use std::collections::VecDeque; +use std::{io, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_service::Service; use bytes::Bytes; use futures::future::{err, ok, Either}; use futures::{Async, Future, Poll, Sink, Stream}; +use super::connection::{ConnectionLifetime, ConnectionType, IoConnection}; use super::error::{ConnectorError, SendRequestError}; +use super::pool::Acquired; use super::response::ClientResponse; -use super::{Connect, Connection}; use crate::body::{BodyLength, MessageBody, PayloadStream}; use crate::error::PayloadError; use crate::h1; use crate::message::RequestHead; -pub(crate) fn send_request( +pub(crate) fn send_request( + io: T, head: RequestHead, body: B, - connector: &mut T, + created: time::Instant, + pool: Option>, ) -> impl Future where - T: Service, + T: AsyncRead + AsyncWrite + 'static, B: MessageBody, - I: Connection, { + let io = H1Connection { + io: Some(io), + created: created, + pool: pool, + }; + let len = body.length(); - connector - // connect to the host - .call(Connect::new(head.uri.clone())) + // create Framed and send reqest + Framed::new(io, h1::ClientCodec::default()) + .send((head, len).into()) .from_err() - // create Framed and send reqest - .map(|io| Framed::new(io, h1::ClientCodec::default())) - .and_then(move |framed| framed.send((head, len).into()).from_err()) // send request body .and_then(move |framed| match body.length() { BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => { @@ -64,11 +68,70 @@ where }) } +#[doc(hidden)] +/// HTTP client connection +pub struct H1Connection { + io: Option, + created: time::Instant, + pool: Option>, +} + +impl ConnectionLifetime for H1Connection { + /// Close connection + fn close(&mut self) { + if let Some(mut pool) = self.pool.take() { + if let Some(io) = self.io.take() { + pool.close(IoConnection::new( + ConnectionType::H1(io), + self.created, + None, + )); + } + } + } + + /// Release this connection to the connection pool + fn release(&mut self) { + if let Some(mut pool) = self.pool.take() { + if let Some(io) = self.io.take() { + pool.release(IoConnection::new( + ConnectionType::H1(io), + self.created, + None, + )); + } + } + } +} + +impl io::Read for H1Connection { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.io.as_mut().unwrap().read(buf) + } +} + +impl AsyncRead for H1Connection {} + +impl io::Write for H1Connection { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.io.as_mut().unwrap().write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.io.as_mut().unwrap().flush() + } +} + +impl AsyncWrite for H1Connection { + fn shutdown(&mut self) -> Poll<(), io::Error> { + self.io.as_mut().unwrap().shutdown() + } +} + /// Future responsible for sending request body to the peer -struct SendBody { +pub(crate) struct SendBody { body: Option, framed: Option>, - write_buf: VecDeque>, flushed: bool, } @@ -77,11 +140,10 @@ where I: AsyncRead + AsyncWrite + 'static, B: MessageBody, { - fn new(body: B, framed: Framed) -> Self { + pub(crate) fn new(body: B, framed: Framed) -> Self { SendBody { body: Some(body), framed: Some(framed), - write_buf: VecDeque::new(), flushed: true, } } @@ -89,7 +151,7 @@ where impl Future for SendBody where - I: Connection, + I: ConnectionLifetime, B: MessageBody, { type Item = Framed; @@ -158,15 +220,15 @@ impl Payload<()> { } } -impl Payload { - fn stream(framed: Framed) -> PayloadStream { +impl Payload { + pub fn stream(framed: Framed) -> PayloadStream { Box::new(Payload { framed: Some(framed.map_codec(|codec| codec.into_payload_codec())), }) } } -impl Stream for Payload { +impl Stream for Payload { type Item = Bytes; type Error = PayloadError; @@ -190,7 +252,7 @@ impl Stream for Payload { fn release_connection(framed: Framed, force_close: bool) where - T: Connection, + T: ConnectionLifetime, { let mut parts = framed.into_parts(); if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() { diff --git a/src/client/h2proto.rs b/src/client/h2proto.rs new file mode 100644 index 000000000..fe42b909c --- /dev/null +++ b/src/client/h2proto.rs @@ -0,0 +1,151 @@ +use std::cell::RefCell; +use std::time; + +use actix_codec::{AsyncRead, AsyncWrite}; +use bytes::Bytes; +use futures::future::{err, Either}; +use futures::{Async, Future, Poll, Stream}; +use h2::{client::SendRequest, SendStream}; +use http::{request::Request, Version}; + +use super::connection::{ConnectionType, IoConnection}; +use super::error::SendRequestError; +use super::pool::Acquired; +use super::response::ClientResponse; +use crate::body::{BodyLength, MessageBody}; +use crate::message::{RequestHead, ResponseHead}; + +pub(crate) fn send_request( + io: SendRequest, + head: RequestHead, + body: B, + created: time::Instant, + pool: Option>, +) -> impl Future +where + T: AsyncRead + AsyncWrite + 'static, + B: MessageBody, +{ + trace!("Sending client request: {:?} {:?}", head, body.length()); + let eof = match body.length() { + BodyLength::None | BodyLength::Empty | BodyLength::Sized(0) => true, + _ => false, + }; + + io.ready() + .map_err(SendRequestError::from) + .and_then(move |mut io| { + let mut req = Request::new(()); + *req.uri_mut() = head.uri; + *req.method_mut() = head.method; + *req.headers_mut() = head.headers; + *req.version_mut() = Version::HTTP_2; + + match io.send_request(req, eof) { + Ok((resp, send)) => { + release(io, pool, created, false); + + if !eof { + Either::A(Either::B( + SendBody { + body, + send, + buf: None, + } + .and_then(move |_| resp.map_err(SendRequestError::from)), + )) + } else { + Either::B(resp.map_err(SendRequestError::from)) + } + } + Err(e) => { + release(io, pool, created, e.is_io()); + Either::A(Either::A(err(e.into()))) + } + } + }) + .and_then(|resp| { + let (parts, body) = resp.into_parts(); + + let mut head = ResponseHead::default(); + head.version = parts.version; + head.status = parts.status; + head.headers = parts.headers; + + Ok(ClientResponse { + head, + payload: RefCell::new(Some(Box::new(body.from_err()))), + }) + }) + .from_err() +} + +struct SendBody { + body: B, + send: SendStream, + buf: Option, +} + +impl Future for SendBody { + type Item = (); + type Error = SendRequestError; + + fn poll(&mut self) -> Poll { + if self.buf.is_none() { + match self.body.poll_next() { + Ok(Async::Ready(Some(buf))) => { + self.send.reserve_capacity(buf.len()); + self.buf = Some(buf); + } + Ok(Async::Ready(None)) => { + if let Err(e) = self.send.send_data(Bytes::new(), true) { + return Err(e.into()); + } + self.send.reserve_capacity(0); + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(e) => return Err(e.into()), + } + } + + loop { + match self.send.poll_capacity() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), + Ok(Async::Ready(Some(cap))) => { + let mut buf = self.buf.take().unwrap(); + let len = buf.len(); + let bytes = buf.split_to(std::cmp::min(cap, len)); + + if let Err(e) = self.send.send_data(bytes, false) { + return Err(e.into()); + } else { + if !buf.is_empty() { + self.send.reserve_capacity(buf.len()); + self.buf = Some(buf); + } + return self.poll(); + } + } + Err(e) => return Err(e.into()), + } + } + } +} + +// release SendRequest object +fn release( + io: SendRequest, + pool: Option>, + created: time::Instant, + close: bool, +) { + if let Some(mut pool) = pool { + if close { + pool.close(IoConnection::new(ConnectionType::H2(io), created, None)); + } else { + pool.release(IoConnection::new(ConnectionType::H2(io), created, None)); + } + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 76c3f8b88..c6498f371 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -3,13 +3,14 @@ mod connect; mod connection; mod connector; mod error; -mod pipeline; +mod h1proto; +mod h2proto; mod pool; mod request; mod response; pub use self::connect::Connect; -pub use self::connection::Connection; +pub use self::connection::RequestSender; pub use self::connector::Connector; pub use self::error::{ConnectorError, InvalidUrlKind, SendRequestError}; pub use self::request::{ClientRequest, ClientRequestBuilder}; diff --git a/src/client/pool.rs b/src/client/pool.rs index b577587d8..089c2627a 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -6,10 +6,12 @@ use std::time::{Duration, Instant}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::Service; +use bytes::Bytes; use futures::future::{ok, Either, FutureResult}; use futures::task::AtomicTask; use futures::unsync::oneshot; use futures::{Async, Future, Poll}; +use h2::client::{handshake, Handshake}; use hashbrown::HashMap; use http::uri::Authority; use indexmap::IndexSet; @@ -17,9 +19,15 @@ use slab::Slab; use tokio_timer::{sleep, Delay}; use super::connect::Connect; -use super::connection::IoConnection; +use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectorError; +#[derive(Clone, Copy, PartialEq)] +pub enum Protocol { + Http1, + Http2, +} + #[derive(Hash, Eq, PartialEq, Clone, Debug)] pub(crate) struct Key { authority: Authority, @@ -31,13 +39,6 @@ impl From for Key { } } -#[derive(Debug)] -struct AvailableConnection { - io: T, - used: Instant, - created: Instant, -} - /// Connections pool pub(crate) struct ConnectionPool( T, @@ -47,7 +48,7 @@ pub(crate) struct ConnectionPool( impl ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { pub(crate) fn new( connector: T, @@ -86,7 +87,7 @@ where impl Service for ConnectionPool where Io: AsyncRead + AsyncWrite + 'static, - T: Service, + T: Service, { type Response = IoConnection; type Error = ConnectorError; @@ -109,7 +110,7 @@ where Either::A(ok(IoConnection::new( io, created, - Acquired(key, Some(self.1.clone())), + Some(Acquired(key, Some(self.1.clone()))), ))) } Acquire::NotAvailable => { @@ -190,12 +191,13 @@ where { fut: F, key: Key, + h2: Option>, inner: Option>>>, } impl OpenConnection where - F: Future, + F: Future, Io: AsyncRead + AsyncWrite + 'static, { fn new(key: Key, inner: Rc>>, fut: F) -> Self { @@ -203,6 +205,7 @@ where key, fut, inner: Some(inner), + h2: None, } } } @@ -222,110 +225,165 @@ where impl Future for OpenConnection where - F: Future, + F: Future, Io: AsyncRead + AsyncWrite, { type Item = IoConnection; type Error = ConnectorError; fn poll(&mut self) -> Poll { + if let Some(ref mut h2) = self.h2 { + return match h2.poll() { + Ok(Async::Ready((snd, connection))) => { + tokio_current_thread::spawn(connection.map_err(|_| ())); + Ok(Async::Ready(IoConnection::new( + ConnectionType::H2(snd), + Instant::now(), + Some(Acquired(self.key.clone(), self.inner.clone())), + ))) + } + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(e) => Err(e.into()), + }; + } + match self.fut.poll() { Err(err) => Err(err.into()), - Ok(Async::Ready((_, io))) => { + Ok(Async::Ready((_, io, proto))) => { let _ = self.inner.take(); - Ok(Async::Ready(IoConnection::new( - io, - Instant::now(), - Acquired(self.key.clone(), self.inner.clone()), - ))) - } - Ok(Async::NotReady) => Ok(Async::NotReady), - } - } -} - -struct OpenWaitingConnection -where - Io: AsyncRead + AsyncWrite + 'static, -{ - fut: F, - key: Key, - rx: Option, ConnectorError>>>, - inner: Option>>>, -} - -impl OpenWaitingConnection -where - F: Future + 'static, - Io: AsyncRead + AsyncWrite + 'static, -{ - fn spawn( - key: Key, - rx: oneshot::Sender, ConnectorError>>, - inner: Rc>>, - fut: F, - ) { - tokio_current_thread::spawn(OpenWaitingConnection { - key, - fut, - rx: Some(rx), - inner: Some(inner), - }) - } -} - -impl Drop for OpenWaitingConnection -where - Io: AsyncRead + AsyncWrite + 'static, -{ - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - let mut inner = inner.as_ref().borrow_mut(); - inner.release(); - inner.check_availibility(); - } - } -} - -impl Future for OpenWaitingConnection -where - F: Future, - Io: AsyncRead + AsyncWrite, -{ - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - match self.fut.poll() { - Err(err) => { - let _ = self.inner.take(); - if let Some(rx) = self.rx.take() { - let _ = rx.send(Err(err)); - } - Err(()) - } - Ok(Async::Ready((_, io))) => { - let _ = self.inner.take(); - if let Some(rx) = self.rx.take() { - let _ = rx.send(Ok(IoConnection::new( - io, + if proto == Protocol::Http1 { + Ok(Async::Ready(IoConnection::new( + ConnectionType::H1(io), Instant::now(), - Acquired(self.key.clone(), self.inner.clone()), - ))); + Some(Acquired(self.key.clone(), self.inner.clone())), + ))) + } else { + self.h2 = Some(handshake(io)); + return self.poll(); } - Ok(Async::Ready(())) } Ok(Async::NotReady) => Ok(Async::NotReady), } } } +// struct OpenWaitingConnection +// where +// Io: AsyncRead + AsyncWrite + 'static, +// { +// fut: F, +// key: Key, +// h2: Option>, +// rx: Option, ConnectorError>>>, +// inner: Option>>>, +// } + +// impl OpenWaitingConnection +// where +// F: Future + 'static, +// Io: AsyncRead + AsyncWrite + 'static, +// { +// fn spawn( +// key: Key, +// rx: oneshot::Sender, ConnectorError>>, +// inner: Rc>>, +// fut: F, +// ) { +// tokio_current_thread::spawn(OpenWaitingConnection { +// key, +// fut, +// h2: None, +// rx: Some(rx), +// inner: Some(inner), +// }) +// } +// } + +// impl Drop for OpenWaitingConnection +// where +// Io: AsyncRead + AsyncWrite + 'static, +// { +// fn drop(&mut self) { +// if let Some(inner) = self.inner.take() { +// let mut inner = inner.as_ref().borrow_mut(); +// inner.release(); +// inner.check_availibility(); +// } +// } +// } + +// impl Future for OpenWaitingConnection +// where +// F: Future, +// Io: AsyncRead + AsyncWrite, +// { +// type Item = (); +// type Error = (); + +// fn poll(&mut self) -> Poll { +// if let Some(ref mut h2) = self.h2 { +// return match h2.poll() { +// Ok(Async::Ready((snd, connection))) => { +// tokio_current_thread::spawn(connection.map_err(|_| ())); +// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new( +// ConnectionType::H2(snd), +// Instant::now(), +// Some(Acquired(self.key.clone(), self.inner.clone())), +// ))); +// Ok(Async::Ready(())) +// } +// Ok(Async::NotReady) => Ok(Async::NotReady), +// Err(e) => { +// let _ = self.inner.take(); +// if let Some(rx) = self.rx.take() { +// let _ = rx.send(Err(e.into())); +// } + +// Err(()) +// } +// }; +// } + +// match self.fut.poll() { +// Err(err) => { +// let _ = self.inner.take(); +// if let Some(rx) = self.rx.take() { +// let _ = rx.send(Err(err)); +// } +// Err(()) +// } +// Ok(Async::Ready((_, io, proto))) => { +// let _ = self.inner.take(); +// if proto == Protocol::Http1 { +// let _ = self.rx.take().unwrap().send(Ok(IoConnection::new( +// ConnectionType::H1(io), +// Instant::now(), +// Some(Acquired(self.key.clone(), self.inner.clone())), +// ))); +// } else { +// self.h2 = Some(handshake(io)); +// return self.poll(); +// } +// Ok(Async::Ready(())) +// } +// Ok(Async::NotReady) => Ok(Async::NotReady), +// } +// } +// } + enum Acquire { - Acquired(T, Instant), + Acquired(ConnectionType, Instant), Available, NotAvailable, } +// #[derive(Debug)] +struct AvailableConnection { + io: ConnectionType, + used: Instant, + created: Instant, +} + pub(crate) struct Inner { conn_lifetime: Duration, conn_keep_alive: Duration, @@ -355,7 +413,7 @@ impl Inner { self.waiters_queue.remove(&(key.clone(), token)); } - fn release_conn(&mut self, key: &Key, io: Io, created: Instant) { + fn release_conn(&mut self, key: &Key, io: ConnectionType, created: Instant) { self.acquired -= 1; self.available .entry(key.clone()) @@ -408,24 +466,30 @@ where || (now - conn.created) > self.conn_lifetime { if let Some(timeout) = self.disconnect_timeout { - tokio_current_thread::spawn(CloseConnection::new( - conn.io, timeout, - )) + if let ConnectionType::H1(io) = conn.io { + tokio_current_thread::spawn(CloseConnection::new( + io, timeout, + )) + } } } else { let mut io = conn.io; let mut buf = [0; 2]; - match io.read(&mut buf) { - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), - Ok(n) if n > 0 => { - if let Some(timeout) = self.disconnect_timeout { - tokio_current_thread::spawn(CloseConnection::new( - io, timeout, - )) + if let ConnectionType::H1(ref mut s) = io { + match s.read(&mut buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => (), + Ok(n) if n > 0 => { + if let Some(timeout) = self.disconnect_timeout { + if let ConnectionType::H1(io) = io { + tokio_current_thread::spawn( + CloseConnection::new(io, timeout), + ) + } + } + continue; } - continue; + Ok(_) | Err(_) => continue, } - Ok(_) | Err(_) => continue, } return Acquire::Acquired(io, conn.created); } @@ -434,10 +498,12 @@ where Acquire::Available } - fn release_close(&mut self, io: Io) { + fn release_close(&mut self, io: ConnectionType) { self.acquired -= 1; if let Some(timeout) = self.disconnect_timeout { - tokio_current_thread::spawn(CloseConnection::new(io, timeout)) + if let ConnectionType::H1(io) = io { + tokio_current_thread::spawn(CloseConnection::new(io, timeout)) + } } } @@ -448,65 +514,65 @@ where } } -struct ConnectorPoolSupport -where - Io: AsyncRead + AsyncWrite + 'static, -{ - connector: T, - inner: Rc>>, -} +// struct ConnectorPoolSupport +// where +// Io: AsyncRead + AsyncWrite + 'static, +// { +// connector: T, +// inner: Rc>>, +// } -impl Future for ConnectorPoolSupport -where - Io: AsyncRead + AsyncWrite + 'static, - T: Service, - T::Future: 'static, -{ - type Item = (); - type Error = (); +// impl Future for ConnectorPoolSupport +// where +// Io: AsyncRead + AsyncWrite + 'static, +// T: Service, +// T::Future: 'static, +// { +// type Item = (); +// type Error = (); - fn poll(&mut self) -> Poll { - let mut inner = self.inner.as_ref().borrow_mut(); - inner.task.register(); +// fn poll(&mut self) -> Poll { +// let mut inner = self.inner.as_ref().borrow_mut(); +// inner.task.register(); - // check waiters - loop { - let (key, token) = { - if let Some((key, token)) = inner.waiters_queue.get_index(0) { - (key.clone(), *token) - } else { - break; - } - }; - match inner.acquire(&key) { - Acquire::NotAvailable => break, - Acquire::Acquired(io, created) => { - let (_, tx) = inner.waiters.remove(token); - if let Err(conn) = tx.send(Ok(IoConnection::new( - io, - created, - Acquired(key.clone(), Some(self.inner.clone())), - ))) { - let (io, created) = conn.unwrap().into_inner(); - inner.release_conn(&key, io, created); - } - } - Acquire::Available => { - let (connect, tx) = inner.waiters.remove(token); - OpenWaitingConnection::spawn( - key.clone(), - tx, - self.inner.clone(), - self.connector.call(connect), - ); - } - } - let _ = inner.waiters_queue.swap_remove_index(0); - } +// // check waiters +// loop { +// let (key, token) = { +// if let Some((key, token)) = inner.waiters_queue.get_index(0) { +// (key.clone(), *token) +// } else { +// break; +// } +// }; +// match inner.acquire(&key) { +// Acquire::NotAvailable => break, +// Acquire::Acquired(io, created) => { +// let (_, tx) = inner.waiters.remove(token); +// if let Err(conn) = tx.send(Ok(IoConnection::new( +// io, +// created, +// Some(Acquired(key.clone(), Some(self.inner.clone()))), +// ))) { +// let (io, created) = conn.unwrap().into_inner(); +// inner.release_conn(&key, io, created); +// } +// } +// Acquire::Available => { +// let (connect, tx) = inner.waiters.remove(token); +// OpenWaitingConnection::spawn( +// key.clone(), +// tx, +// self.inner.clone(), +// self.connector.call(connect), +// ); +// } +// } +// let _ = inner.waiters_queue.swap_remove_index(0); +// } - Ok(Async::NotReady) - } -} +// Ok(Async::NotReady) +// } +// } struct CloseConnection { io: T, diff --git a/src/client/request.rs b/src/client/request.rs index fbb1e840b..a2233a2f1 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -17,8 +17,9 @@ use crate::http::{ }; use crate::message::{ConnectionType, Head, RequestHead}; +use super::connection::RequestSender; use super::response::ClientResponse; -use super::{pipeline, Connect, Connection, ConnectorError, SendRequestError}; +use super::{Connect, ConnectorError, SendRequestError}; /// An HTTP Client Request /// @@ -37,7 +38,6 @@ use super::{pipeline, Connect, Connection, ConnectorError, SendRequestError}; /// .map_err(|_| ()) /// .and_then(|response| { // <- server http response /// println!("Response: {:?}", response); -/// # actix_rt::System::current().stop(); /// Ok(()) /// }) /// })); @@ -175,10 +175,18 @@ where connector: &mut T, ) -> impl Future where + B: 'static, T: Service, - I: Connection, + I: RequestSender, { - pipeline::send_request(self.head, self.body, connector) + let Self { head, body } = self; + + connector + // connect to the host + .call(Connect::new(head.uri.clone())) + .from_err() + // send request + .and_then(move |connection| connection.send_request(head, body)) } } @@ -273,7 +281,6 @@ impl ClientRequestBuilder { /// .unwrap(); /// } /// ``` - #[doc(hidden)] pub fn set(&mut self, hdr: H) -> &mut Self { if let Some(parts) = parts(&mut self.head, &self.err) { match hdr.try_into() { diff --git a/src/client/response.rs b/src/client/response.rs index 6bfdfc321..005c0875b 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -10,7 +10,7 @@ use crate::error::PayloadError; use crate::httpmessage::HttpMessage; use crate::message::{Head, ResponseHead}; -use super::pipeline::Payload; +use super::h1proto::Payload; /// Client Response pub struct ClientResponse { diff --git a/src/error.rs b/src/error.rs index 8af422fc8..5470534f7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -327,7 +327,7 @@ impl From for ParseError { } } -#[derive(Display, Debug)] +#[derive(Display, Debug, From)] /// A set of errors that can occur during payload parsing pub enum PayloadError { /// A payload reached EOF, but is not complete. @@ -342,6 +342,9 @@ pub enum PayloadError { /// A payload length is unknown. #[display(fmt = "A payload length is unknown.")] UnknownLength, + /// Http2 payload error + #[display(fmt = "{}", _0)] + H2Payload(h2::Error), } impl From for PayloadError { diff --git a/src/extensions.rs b/src/extensions.rs index 7bb965c96..f7805641b 100644 --- a/src/extensions.rs +++ b/src/extensions.rs @@ -1,6 +1,5 @@ use std::any::{Any, TypeId}; use std::fmt; -use std::hash::{BuildHasherDefault, Hasher}; use hashbrown::HashMap; diff --git a/src/lib.rs b/src/lib.rs index 5adc9236e..0dbaee6aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,9 @@ //! * `session` - enables session support, includes `ring` crate as //! dependency //! +#[macro_use] +extern crate log; + pub mod body; pub mod client; mod config; diff --git a/src/response.rs b/src/response.rs index 49f2b63fb..5e1c0d076 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] //! Http response use std::cell::RefCell; use std::collections::VecDeque; diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index 36c0d7d50..cd74d456b 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -13,8 +13,8 @@ use net2::TcpBuilder; use actix_http::body::MessageBody; use actix_http::client::{ - ClientRequest, ClientRequestBuilder, ClientResponse, Connect, Connection, Connector, - ConnectorError, SendRequestError, + ClientRequest, ClientRequestBuilder, ClientResponse, Connect, Connector, + ConnectorError, RequestSender, SendRequestError, }; use actix_http::ws; @@ -57,7 +57,7 @@ impl TestServer { pub fn with_factory( factory: F, ) -> TestServerRuntime< - impl Service + Clone, + impl Service + Clone, > { let (tx, rx) = mpsc::channel(); @@ -89,7 +89,7 @@ impl TestServer { } fn new_connector( - ) -> impl Service + Clone + ) -> impl Service + Clone { #[cfg(feature = "ssl")] { @@ -192,7 +192,7 @@ impl TestServerRuntime { impl TestServerRuntime where T: Service + Clone, - T::Response: Connection, + T::Response: RequestSender, { /// Connect to websocket server at a given path pub fn ws_at( @@ -212,7 +212,7 @@ where } /// Send request and read response message - pub fn send_request( + pub fn send_request( &mut self, req: ClientRequest, ) -> Result {