diff --git a/Cargo.toml b/Cargo.toml index ab812d1b0..0b5c4f3d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ members = [ # "actix-web-codegen", # "test-server", ] -exclude = ["actix-http"] +exclude = ["awc", "actix-http", "test-server"] [features] default = ["brotli", "flate2-zlib", "client", "fail"] diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 7b86bfb14..44e76ae0e 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -253,7 +253,7 @@ where impl From> for Body where S: Stream> + Unpin + 'static, - E: Into + Unpin + 'static, + E: Into + 'static, { fn from(s: BodyStream) -> Body { Body::from_message(s) @@ -368,10 +368,17 @@ where } } +impl Unpin for BodyStream +where + S: Stream> + Unpin, + E: Into, +{ +} + impl MessageBody for BodyStream where S: Stream> + Unpin, - E: Into + Unpin, + E: Into, { fn size(&self) -> BodySize { BodySize::Stream diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 70ffff6c0..0901fdb2d 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -22,7 +22,7 @@ pub(crate) enum ConnectionType { } pub trait Connection { - type Io: AsyncRead + AsyncWrite; + type Io: AsyncRead + AsyncWrite + Unpin; type Future: Future>; fn protocol(&self) -> Protocol; diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 4b0e612b8..ea181237e 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "awc" -version = "0.2.8" +version = "0.3.0-alpha.1" authors = ["Nikolay Kim "] description = "Actix http client." readme = "README.md" @@ -14,23 +14,23 @@ categories = ["network-programming", "asynchronous", license = "MIT/Apache-2.0" exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] edition = "2018" -workspace = ".." +# workspace = ".." [lib] name = "awc" path = "src/lib.rs" [package.metadata.docs.rs] -features = ["ssl", "brotli", "flate2-zlib"] +features = ["openssl", "brotli", "flate2-zlib"] [features] default = ["brotli", "flate2-zlib"] # openssl -ssl = ["openssl", "actix-http/ssl"] +openssl = ["open-ssl", "actix-http/openssl"] # rustls -rust-tls = ["rustls", "actix-http/rust-tls"] +rustls = ["rust-tls", "actix-http/rustls"] # brotli encoding, requires c compiler brotli = ["actix-http/brotli"] @@ -42,13 +42,14 @@ flate2-zlib = ["actix-http/flate2-zlib"] flate2-rust = ["actix-http/flate2-rust"] [dependencies] -actix-codec = "0.1.2" -actix-service = "0.4.1" -actix-http = "0.2.11" +actix-codec = "0.2.0-alpha.1" +actix-service = "1.0.0-alpha.1" +actix-http = "0.3.0-alpha.1" + base64 = "0.10.1" bytes = "0.4" derive_more = "0.15.0" -futures = "0.1.25" +futures = "0.3.1" log =" 0.4" mime = "0.3" percent-encoding = "2.1" @@ -56,21 +57,33 @@ rand = "0.7" serde = "1.0" serde_json = "1.0" serde_urlencoded = "0.6.1" -tokio-timer = "0.2.8" -openssl = { version="0.10", optional = true } -rustls = { version = "0.15.2", optional = true } +tokio-timer = "0.3.0-alpha.6" +open-ssl = { version="0.10", package="openssl", optional = true } +rust-tls = { version = "0.16.0", package="rustls", optional = true } [dev-dependencies] -actix-rt = "0.2.2" -actix-web = { version = "1.0.8", features=["ssl"] } -actix-http = { version = "0.2.11", features=["ssl"] } -actix-http-test = { version = "0.2.0", features=["ssl"] } -actix-utils = "0.4.1" -actix-server = { version = "0.6.0", features=["ssl", "rust-tls"] } +actix-rt = "1.0.0-alpha.1" +#actix-web = { version = "1.0.8", features=["ssl"] } +actix-http = { version = "0.3.0-alpha.1", features=["openssl"] } +#actix-http-test = { version = "0.2.0", features=["ssl"] } +actix-utils = "0.5.0-alpha.1" +actix-server = { version = "0.8.0-alpha.1", features=["openssl", "rustls"] } brotli2 = { version="0.3.2" } flate2 = { version="1.0.2" } env_logger = "0.6" rand = "0.7" tokio-tcp = "0.1" -webpki = "0.19" -rustls = { version = "0.15.2", features = ["dangerous_configuration"] } +webpki = { version = "0.21" } +rus-tls = { version = "0.16.0", package="rustls", features = ["dangerous_configuration"] } + +[patch.crates-io] +actix-http = { path = "../actix-http" } + +actix-codec = { path = "../../actix-net/actix-codec" } +actix-connect = { path = "../../actix-net/actix-connect" } +actix-rt = { path = "../../actix-net/actix-rt" } +actix-server = { path = "../../actix-net/actix-server" } +actix-server-config = { path = "../../actix-net/actix-server-config" } +actix-service = { path = "../../actix-net/actix-service" } +actix-threadpool = { path = "../../actix-net/actix-threadpool" } +actix-utils = { path = "../../actix-net/actix-utils" } diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 97864d300..cc92fdbb6 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -1,4 +1,6 @@ +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use std::{fmt, io, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; @@ -10,7 +12,7 @@ use actix_http::h1::ClientCodec; use actix_http::http::HeaderMap; use actix_http::{RequestHead, RequestHeadType, ResponseHead}; use actix_service::Service; -use futures::{Future, Poll}; +use futures::future::{FutureExt, LocalBoxFuture}; use crate::response::ClientResponse; @@ -22,7 +24,7 @@ pub(crate) trait Connect { head: RequestHead, body: Body, addr: Option, - ) -> Box>; + ) -> LocalBoxFuture<'static, Result>; fn send_request_extra( &mut self, @@ -30,18 +32,16 @@ pub(crate) trait Connect { extra_headers: Option, body: Body, addr: Option, - ) -> Box>; + ) -> LocalBoxFuture<'static, Result>; /// Send request, returns Response and Framed fn open_tunnel( &mut self, head: RequestHead, addr: Option, - ) -> Box< - dyn Future< - Item = (ResponseHead, Framed), - Error = SendRequestError, - >, + ) -> LocalBoxFuture< + 'static, + Result<(ResponseHead, Framed), SendRequestError>, >; /// Send request and extra headers, returns Response and Framed @@ -50,11 +50,9 @@ pub(crate) trait Connect { head: Rc, extra_headers: Option, addr: Option, - ) -> Box< - dyn Future< - Item = (ResponseHead, Framed), - Error = SendRequestError, - >, + ) -> LocalBoxFuture< + 'static, + Result<(ResponseHead, Framed), SendRequestError>, >; } @@ -72,21 +70,23 @@ where head: RequestHead, body: Body, addr: Option, - ) -> Box> { - Box::new( - self.0 - // connect to the host - .call(ClientConnect { - uri: head.uri.clone(), - addr, - }) - .from_err() - // send request - .and_then(move |connection| { - connection.send_request(RequestHeadType::from(head), body) - }) - .map(|(head, payload)| ClientResponse::new(head, payload)), - ) + ) -> LocalBoxFuture<'static, Result> { + // connect to the host + let fut = self.0.call(ClientConnect { + uri: head.uri.clone(), + addr, + }); + + async move { + let connection = fut.await?; + + // send request + connection + .send_request(RequestHeadType::from(head), body) + .await + .map(|(head, payload)| ClientResponse::new(head, payload)) + } + .boxed_local() } fn send_request_extra( @@ -95,51 +95,51 @@ where extra_headers: Option, body: Body, addr: Option, - ) -> Box> { - Box::new( - self.0 - // connect to the host - .call(ClientConnect { - uri: head.uri.clone(), - addr, - }) - .from_err() - // send request - .and_then(move |connection| { - connection - .send_request(RequestHeadType::Rc(head, extra_headers), body) - }) - .map(|(head, payload)| ClientResponse::new(head, payload)), - ) + ) -> LocalBoxFuture<'static, Result> { + // connect to the host + let fut = self.0.call(ClientConnect { + uri: head.uri.clone(), + addr, + }); + + async move { + let connection = fut.await?; + + // send request + let (head, payload) = connection + .send_request(RequestHeadType::Rc(head, extra_headers), body) + .await?; + + Ok(ClientResponse::new(head, payload)) + } + .boxed_local() } fn open_tunnel( &mut self, head: RequestHead, addr: Option, - ) -> Box< - dyn Future< - Item = (ResponseHead, Framed), - Error = SendRequestError, - >, + ) -> LocalBoxFuture< + 'static, + Result<(ResponseHead, Framed), SendRequestError>, > { - Box::new( - self.0 - // connect to the host - .call(ClientConnect { - uri: head.uri.clone(), - addr, - }) - .from_err() - // send request - .and_then(move |connection| { - connection.open_tunnel(RequestHeadType::from(head)) - }) - .map(|(head, framed)| { - let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); - (head, framed) - }), - ) + // connect to the host + let fut = self.0.call(ClientConnect { + uri: head.uri.clone(), + addr, + }); + + async move { + let connection = fut.await?; + + // send request + let (head, framed) = + connection.open_tunnel(RequestHeadType::from(head)).await?; + + let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); + Ok((head, framed)) + } + .boxed_local() } fn open_tunnel_extra( @@ -147,48 +147,47 @@ where head: Rc, extra_headers: Option, addr: Option, - ) -> Box< - dyn Future< - Item = (ResponseHead, Framed), - Error = SendRequestError, - >, + ) -> LocalBoxFuture< + 'static, + Result<(ResponseHead, Framed), SendRequestError>, > { - Box::new( - self.0 - // connect to the host - .call(ClientConnect { - uri: head.uri.clone(), - addr, - }) - .from_err() - // send request - .and_then(move |connection| { - connection.open_tunnel(RequestHeadType::Rc(head, extra_headers)) - }) - .map(|(head, framed)| { - let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); - (head, framed) - }), - ) + // connect to the host + let fut = self.0.call(ClientConnect { + uri: head.uri.clone(), + addr, + }); + + async move { + let connection = fut.await?; + + // send request + let (head, framed) = connection + .open_tunnel(RequestHeadType::Rc(head, extra_headers)) + .await?; + + let framed = framed.map_io(|io| BoxedSocket(Box::new(Socket(io)))); + Ok((head, framed)) + } + .boxed_local() } } trait AsyncSocket { - fn as_read(&self) -> &dyn AsyncRead; - fn as_read_mut(&mut self) -> &mut dyn AsyncRead; - fn as_write(&mut self) -> &mut dyn AsyncWrite; + fn as_read(&self) -> &(dyn AsyncRead + Unpin); + fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin); + fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin); } -struct Socket(T); +struct Socket(T); -impl AsyncSocket for Socket { - fn as_read(&self) -> &dyn AsyncRead { +impl AsyncSocket for Socket { + fn as_read(&self) -> &(dyn AsyncRead + Unpin) { &self.0 } - fn as_read_mut(&mut self) -> &mut dyn AsyncRead { + fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin) { &mut self.0 } - fn as_write(&mut self) -> &mut dyn AsyncWrite { + fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin) { &mut self.0 } } @@ -201,30 +200,37 @@ impl fmt::Debug for BoxedSocket { } } -impl io::Read for BoxedSocket { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.as_read_mut().read(buf) - } -} - impl AsyncRead for BoxedSocket { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.0.as_read().prepare_uninitialized_buffer(buf) } -} -impl io::Write for BoxedSocket { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.as_write().write(buf) - } - - fn flush(&mut self) -> io::Result<()> { - self.0.as_write().flush() + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf) } } impl AsyncWrite for BoxedSocket { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.0.as_write().shutdown() + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(self.get_mut().0.as_write()).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(self.get_mut().0.as_write()).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(self.get_mut().0.as_write()).poll_shutdown(cx) } } diff --git a/awc/src/frozen.rs b/awc/src/frozen.rs index d9f65d431..61ba87aad 100644 --- a/awc/src/frozen.rs +++ b/awc/src/frozen.rs @@ -82,7 +82,7 @@ impl FrozenClientRequest { /// Send a streaming body. pub fn send_stream(&self, stream: S) -> SendClientRequest where - S: Stream + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { RequestSender::Rc(self.head.clone(), None).send_stream( @@ -203,7 +203,7 @@ impl FrozenSendBuilder { /// Complete request construction and send a streaming body. pub fn send_stream(self, stream: S) -> SendClientRequest where - S: Stream + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { if let Some(e) = self.err { diff --git a/awc/src/request.rs b/awc/src/request.rs index 6ff68ae66..831234437 100644 --- a/awc/src/request.rs +++ b/awc/src/request.rs @@ -478,7 +478,7 @@ impl ClientRequest { /// Set an streaming body and generate `ClientRequest`. pub fn send_stream(self, stream: S) -> SendClientRequest where - S: Stream + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { let slf = match self.prep_for_sending() { diff --git a/awc/src/response.rs b/awc/src/response.rs index d186526de..a7b08eedc 100644 --- a/awc/src/response.rs +++ b/awc/src/response.rs @@ -1,9 +1,11 @@ use std::cell::{Ref, RefMut}; use std::fmt; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use bytes::{Bytes, BytesMut}; -use futures::{Async, Future, Poll, Stream}; +use futures::{ready, Future, Stream}; use actix_http::cookie::Cookie; use actix_http::error::{CookieParseError, PayloadError}; @@ -104,7 +106,7 @@ impl ClientResponse { impl ClientResponse where - S: Stream, + S: Stream>, { /// Loads http response's body. pub fn body(&mut self) -> MessageBody { @@ -125,13 +127,12 @@ where impl Stream for ClientResponse where - S: Stream, + S: Stream> + Unpin, { - type Item = Bytes; - type Error = PayloadError; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.payload.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.get_mut().payload).poll_next(cx) } } @@ -155,7 +156,7 @@ pub struct MessageBody { impl MessageBody where - S: Stream, + S: Stream>, { /// Create `MessageBody` for request. pub fn new(res: &mut ClientResponse) -> MessageBody { @@ -198,23 +199,24 @@ where impl Future for MessageBody where - S: Stream, + S: Stream> + Unpin, { - type Item = Bytes; - type Error = PayloadError; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(err) = self.err.take() { - return Err(err); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + if let Some(err) = this.err.take() { + return Poll::Ready(Err(err)); } - if let Some(len) = self.length.take() { - if len > self.fut.as_ref().unwrap().limit { - return Err(PayloadError::Overflow); + if let Some(len) = this.length.take() { + if len > this.fut.as_ref().unwrap().limit { + return Poll::Ready(Err(PayloadError::Overflow)); } } - self.fut.as_mut().unwrap().poll() + Pin::new(&mut this.fut.as_mut().unwrap()).poll(cx) } } @@ -233,7 +235,7 @@ pub struct JsonBody { impl JsonBody where - S: Stream, + S: Stream>, U: DeserializeOwned, { /// Create `JsonBody` for request. @@ -279,27 +281,35 @@ where } } -impl Future for JsonBody +impl Unpin for JsonBody where - T: Stream, + T: Stream> + Unpin, U: DeserializeOwned, { - type Item = U; - type Error = JsonPayloadError; +} - fn poll(&mut self) -> Poll { +impl Future for JsonBody +where + T: Stream> + Unpin, + U: DeserializeOwned, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { if let Some(err) = self.err.take() { - return Err(err); + return Poll::Ready(Err(err)); } if let Some(len) = self.length.take() { if len > self.fut.as_ref().unwrap().limit { - return Err(JsonPayloadError::Payload(PayloadError::Overflow)); + return Poll::Ready(Err(JsonPayloadError::Payload( + PayloadError::Overflow, + ))); } } - let body = futures::try_ready!(self.fut.as_mut().unwrap().poll()); - Ok(Async::Ready(serde_json::from_slice::(&body)?)) + let body = ready!(Pin::new(&mut self.get_mut().fut.as_mut().unwrap()).poll(cx))?; + Poll::Ready(serde_json::from_slice::(&body).map_err(JsonPayloadError::from)) } } @@ -321,24 +331,25 @@ impl ReadBody { impl Future for ReadBody where - S: Stream, + S: Stream> + Unpin, { - type Item = Bytes; - type Error = PayloadError; + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); - fn poll(&mut self) -> Poll { loop { - return match self.stream.poll()? { - Async::Ready(Some(chunk)) => { - if (self.buf.len() + chunk.len()) > self.limit { - Err(PayloadError::Overflow) + return match Pin::new(&mut this.stream).poll_next(cx)? { + Poll::Ready(Some(chunk)) => { + if (this.buf.len() + chunk.len()) > this.limit { + Poll::Ready(Err(PayloadError::Overflow)) } else { - self.buf.extend_from_slice(&chunk); + this.buf.extend_from_slice(&chunk); continue; } } - Async::Ready(None) => Ok(Async::Ready(self.buf.take().freeze())), - Async::NotReady => Ok(Async::NotReady), + Poll::Ready(None) => Poll::Ready(Ok(this.buf.take().freeze())), + Poll::Pending => Poll::Pending, }; } } diff --git a/awc/src/sender.rs b/awc/src/sender.rs index 95109b92d..f7461113a 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -1,13 +1,15 @@ use std::net; +use std::pin::Pin; use std::rc::Rc; -use std::time::{Duration, Instant}; +use std::task::{Context, Poll}; +use std::time::Duration; use bytes::Bytes; use derive_more::From; -use futures::{try_ready, Async, Future, Poll, Stream}; +use futures::{future::LocalBoxFuture, ready, Future, Stream}; use serde::Serialize; use serde_json; -use tokio_timer::Delay; +use tokio_timer::{delay_for, Delay}; use actix_http::body::{Body, BodyStream}; use actix_http::encoding::Decoder; @@ -47,7 +49,7 @@ impl Into for PrepForSendingError { #[must_use = "futures do nothing unless polled"] pub enum SendClientRequest { Fut( - Box>, + LocalBoxFuture<'static, Result>, Option, bool, ), @@ -56,41 +58,51 @@ pub enum SendClientRequest { impl SendClientRequest { pub(crate) fn new( - send: Box>, + send: LocalBoxFuture<'static, Result>, response_decompress: bool, timeout: Option, ) -> SendClientRequest { - let delay = timeout.map(|t| Delay::new(Instant::now() + t)); + let delay = timeout.map(|t| delay_for(t)); SendClientRequest::Fut(send, delay, response_decompress) } } impl Future for SendClientRequest { - type Item = ClientResponse>>; - type Error = SendRequestError; + type Output = + Result>>, SendRequestError>; - fn poll(&mut self) -> Poll { - match self { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + + match this { SendClientRequest::Fut(send, delay, response_decompress) => { if delay.is_some() { - match delay.poll() { - Ok(Async::NotReady) => (), - _ => return Err(SendRequestError::Timeout), + match Pin::new(delay.as_mut().unwrap()).poll(cx) { + Poll::Pending => (), + _ => return Poll::Ready(Err(SendRequestError::Timeout)), } } - let res = try_ready!(send.poll()).map_body(|head, payload| { - if *response_decompress { - Payload::Stream(Decoder::from_headers(payload, &head.headers)) - } else { - Payload::Stream(Decoder::new(payload, ContentEncoding::Identity)) - } + let res = ready!(Pin::new(send).poll(cx)).map(|res| { + res.map_body(|head, payload| { + if *response_decompress { + Payload::Stream(Decoder::from_headers( + payload, + &head.headers, + )) + } else { + Payload::Stream(Decoder::new( + payload, + ContentEncoding::Identity, + )) + } + }) }); - Ok(Async::Ready(res)) + Poll::Ready(res) } SendClientRequest::Err(ref mut e) => match e.take() { - Some(e) => Err(e), + Some(e) => Poll::Ready(Err(e)), None => panic!("Attempting to call completed future"), }, } @@ -223,7 +235,7 @@ impl RequestSender { stream: S, ) -> SendClientRequest where - S: Stream + 'static, + S: Stream> + Unpin + 'static, E: Into + 'static, { self.send_body( diff --git a/awc/src/ws.rs b/awc/src/ws.rs index 77cbc7ca4..979a382af 100644 --- a/awc/src/ws.rs +++ b/awc/src/ws.rs @@ -7,7 +7,6 @@ use std::{fmt, str}; use actix_codec::Framed; use actix_http::cookie::{Cookie, CookieJar}; use actix_http::{ws, Payload, RequestHead}; -use futures::future::{err, Either, Future}; use percent_encoding::percent_encode; use tokio_timer::Timeout; @@ -210,27 +209,26 @@ impl WebsocketsRequest { } /// Complete request construction and connect to a websockets server. - pub fn connect( + pub async fn connect( mut self, - ) -> impl Future), Error = WsClientError> - { + ) -> Result<(ClientResponse, Framed), WsClientError> { if let Some(e) = self.err.take() { - return Either::A(err(e.into())); + return Err(e.into()); } // validate uri let uri = &self.head.uri; if uri.host().is_none() { - return Either::A(err(InvalidUrl::MissingHost.into())); + return Err(InvalidUrl::MissingHost.into()); } else if uri.scheme_part().is_none() { - return Either::A(err(InvalidUrl::MissingScheme.into())); + return Err(InvalidUrl::MissingScheme.into()); } else if let Some(scheme) = uri.scheme_part() { match scheme.as_str() { "http" | "ws" | "https" | "wss" => (), - _ => return Either::A(err(InvalidUrl::UnknownScheme.into())), + _ => return Err(InvalidUrl::UnknownScheme.into()), } } else { - return Either::A(err(InvalidUrl::UnknownScheme.into())); + return Err(InvalidUrl::UnknownScheme.into()); } if !self.head.headers.contains_key(header::HOST) { @@ -294,90 +292,83 @@ impl WebsocketsRequest { .config .connector .borrow_mut() - .open_tunnel(head, self.addr) - .from_err() - .and_then(move |(head, framed)| { - // verify response - if head.status != StatusCode::SWITCHING_PROTOCOLS { - return Err(WsClientError::InvalidResponseStatus(head.status)); - } - // Check for "UPGRADE" to websocket header - let has_hdr = if let Some(hdr) = head.headers.get(&header::UPGRADE) { - if let Ok(s) = hdr.to_str() { - s.to_ascii_lowercase().contains("websocket") - } else { - false - } - } else { - false - }; - if !has_hdr { - log::trace!("Invalid upgrade header"); - return Err(WsClientError::InvalidUpgradeHeader); - } - // Check for "CONNECTION" header - if let Some(conn) = head.headers.get(&header::CONNECTION) { - if let Ok(s) = conn.to_str() { - if !s.to_ascii_lowercase().contains("upgrade") { - log::trace!("Invalid connection header: {}", s); - return Err(WsClientError::InvalidConnectionHeader( - conn.clone(), - )); - } - } else { - log::trace!("Invalid connection header: {:?}", conn); - return Err(WsClientError::InvalidConnectionHeader( - conn.clone(), - )); - } - } else { - log::trace!("Missing connection header"); - return Err(WsClientError::MissingConnectionHeader); - } - - if let Some(hdr_key) = head.headers.get(&header::SEC_WEBSOCKET_ACCEPT) { - let encoded = ws::hash_key(key.as_ref()); - if hdr_key.as_bytes() != encoded.as_bytes() { - log::trace!( - "Invalid challenge response: expected: {} received: {:?}", - encoded, - key - ); - return Err(WsClientError::InvalidChallengeResponse( - encoded, - hdr_key.clone(), - )); - } - } else { - log::trace!("Missing SEC-WEBSOCKET-ACCEPT header"); - return Err(WsClientError::MissingWebSocketAcceptHeader); - }; - - // response and ws framed - Ok(( - ClientResponse::new(head, Payload::None), - framed.map_codec(|_| { - if server_mode { - ws::Codec::new().max_size(max_size) - } else { - ws::Codec::new().max_size(max_size).client_mode() - } - }), - )) - }); + .open_tunnel(head, self.addr); // set request timeout - if let Some(timeout) = self.config.timeout { - Either::B(Either::A(Timeout::new(fut, timeout).map_err(|e| { - if let Some(e) = e.into_inner() { - e - } else { - SendRequestError::Timeout.into() - } - }))) + let (head, framed) = if let Some(timeout) = self.config.timeout { + Timeout::new(fut, timeout) + .await + .map_err(|_| SendRequestError::Timeout.into()) + .and_then(|res| res)? } else { - Either::B(Either::B(fut)) + fut.await? + }; + + // verify response + if head.status != StatusCode::SWITCHING_PROTOCOLS { + return Err(WsClientError::InvalidResponseStatus(head.status)); } + + // Check for "UPGRADE" to websocket header + let has_hdr = if let Some(hdr) = head.headers.get(&header::UPGRADE) { + if let Ok(s) = hdr.to_str() { + s.to_ascii_lowercase().contains("websocket") + } else { + false + } + } else { + false + }; + if !has_hdr { + log::trace!("Invalid upgrade header"); + return Err(WsClientError::InvalidUpgradeHeader); + } + + // Check for "CONNECTION" header + if let Some(conn) = head.headers.get(&header::CONNECTION) { + if let Ok(s) = conn.to_str() { + if !s.to_ascii_lowercase().contains("upgrade") { + log::trace!("Invalid connection header: {}", s); + return Err(WsClientError::InvalidConnectionHeader(conn.clone())); + } + } else { + log::trace!("Invalid connection header: {:?}", conn); + return Err(WsClientError::InvalidConnectionHeader(conn.clone())); + } + } else { + log::trace!("Missing connection header"); + return Err(WsClientError::MissingConnectionHeader); + } + + if let Some(hdr_key) = head.headers.get(&header::SEC_WEBSOCKET_ACCEPT) { + let encoded = ws::hash_key(key.as_ref()); + if hdr_key.as_bytes() != encoded.as_bytes() { + log::trace!( + "Invalid challenge response: expected: {} received: {:?}", + encoded, + key + ); + return Err(WsClientError::InvalidChallengeResponse( + encoded, + hdr_key.clone(), + )); + } + } else { + log::trace!("Missing SEC-WEBSOCKET-ACCEPT header"); + return Err(WsClientError::MissingWebSocketAcceptHeader); + }; + + // response and ws framed + Ok(( + ClientResponse::new(head, Payload::None), + framed.map_codec(|_| { + if server_mode { + ws::Codec::new().max_size(max_size) + } else { + ws::Codec::new().max_size(max_size).client_mode() + } + }), + )) } } diff --git a/test-server/Cargo.toml b/test-server/Cargo.toml index 77301b0a9..35bf1a0ed 100644 --- a/test-server/Cargo.toml +++ b/test-server/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous", license = "MIT/Apache-2.0" exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"] edition = "2018" -workspace = ".." +# workspace = ".." [package.metadata.docs.rs] features = [] @@ -27,20 +27,22 @@ path = "src/lib.rs" default = [] # openssl -ssl = ["openssl", "actix-server/ssl", "awc/ssl"] +openssl = ["open-ssl", "actix-server/openssl", "awc/openssl"] [dependencies] -actix-codec = "0.1.2" -actix-rt = "0.2.2" -actix-service = "0.4.1" -actix-server = "0.6.0" -actix-utils = "0.4.1" -awc = "0.2.6" -actix-connect = "0.2.2" +actix-service = "1.0.0-alpha.1" +actix-codec = "0.2.0-alpha.1" +actix-connect = "1.0.0-alpha.1" +actix-utils = "0.5.0-alpha.1" +actix-rt = "1.0.0-alpha.1" +actix-server = "0.8.0-alpha.1" +actix-server-config = "0.3.0-alpha.1" +actix-testing = "0.3.0-alpha.1" +awc = "0.3.0-alpha.1" base64 = "0.10" bytes = "0.4" -futures = "0.1" +futures = "0.3.1" http = "0.1.8" log = "0.4" env_logger = "0.6" @@ -51,10 +53,25 @@ sha1 = "0.6" slab = "0.4" serde_urlencoded = "0.6.1" time = "0.1" -tokio-tcp = "0.1" -tokio-timer = "0.2" -openssl = { version="0.10", optional = true } +tokio-net = "0.2.0-alpha.6" +tokio-timer = "0.3.0-alpha.6" + +open-ssl = { version="0.10", package="openssl", optional = true } [dev-dependencies] -actix-web = "1.0.7" -actix-http = "0.2.9" +#actix-web = "1.0.7" +actix-http = "0.3.0-alpha.1" + +[patch.crates-io] +actix-http = { path = "../actix-http" } +awc = { path = "../awc" } + +actix-codec = { path = "../../actix-net/actix-codec" } +actix-connect = { path = "../../actix-net/actix-connect" } +actix-rt = { path = "../../actix-net/actix-rt" } +actix-server = { path = "../../actix-net/actix-server" } +actix-server-config = { path = "../../actix-net/actix-server-config" } +actix-service = { path = "../../actix-net/actix-service" } +actix-testing = { path = "../../actix-net/actix-testing" } +actix-threadpool = { path = "../../actix-net/actix-threadpool" } +actix-utils = { path = "../../actix-net/actix-utils" } diff --git a/test-server/src/lib.rs b/test-server/src/lib.rs index ebdec688f..17acfe291 100644 --- a/test-server/src/lib.rs +++ b/test-server/src/lib.rs @@ -1,73 +1,18 @@ //! Various helpers for Actix applications to use during testing. -use std::cell::RefCell; use std::sync::mpsc; use std::{net, thread, time}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use actix_rt::{Runtime, System}; -use actix_server::{Server, StreamServiceFactory}; +use actix_rt::{System}; +use actix_server::{Server, ServiceFactory}; use awc::{error::PayloadError, ws, Client, ClientRequest, ClientResponse, Connector}; use bytes::Bytes; -use futures::future::lazy; -use futures::{Future, IntoFuture, Stream}; +use futures::{Stream, future::lazy}; use http::Method; use net2::TcpBuilder; -use tokio_tcp::TcpStream; +use tokio_net::tcp::TcpStream; -thread_local! { - static RT: RefCell = { - RefCell::new(Inner(Some(Runtime::new().unwrap()))) - }; -} - -struct Inner(Option); - -impl Inner { - fn get_mut(&mut self) -> &mut Runtime { - self.0.as_mut().unwrap() - } -} - -impl Drop for Inner { - fn drop(&mut self) { - std::mem::forget(self.0.take().unwrap()) - } -} - -/// Runs the provided future, blocking the current thread until the future -/// completes. -/// -/// This function can be used to synchronously block the current thread -/// until the provided `future` has resolved either successfully or with an -/// error. The result of the future is then returned from this function -/// call. -/// -/// Note that this function is intended to be used only for testing purpose. -/// This function panics on nested call. -pub fn block_on(f: F) -> Result -where - F: IntoFuture, -{ - RT.with(move |rt| rt.borrow_mut().get_mut().block_on(f.into_future())) -} - -/// Runs the provided function, blocking the current thread until the resul -/// future completes. -/// -/// This function can be used to synchronously block the current thread -/// until the provided `future` has resolved either successfully or with an -/// error. The result of the future is then returned from this function -/// call. -/// -/// Note that this function is intended to be used only for testing purpose. -/// This function panics on nested call. -pub fn block_fn(f: F) -> Result -where - F: FnOnce() -> R, - R: IntoFuture, -{ - RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(f))) -} +pub use actix_testing::*; /// The `TestServer` type. /// @@ -110,7 +55,7 @@ pub struct TestServerRuntime { impl TestServer { #[allow(clippy::new_ret_no_self)] /// Start new test server with application factory - pub fn new>(factory: F) -> TestServerRuntime { + pub fn new>(factory: F) -> TestServerRuntime { let (tx, rx) = mpsc::channel(); // run server in separate thread @@ -131,7 +76,7 @@ impl TestServer { let (system, addr) = rx.recv().unwrap(); - let client = block_on(lazy(move || { + let client = block_on(lazy(move |_| { let connector = { #[cfg(feature = "ssl")] { @@ -161,9 +106,9 @@ impl TestServer { })) .unwrap(); - block_on(lazy( - || Ok::<_, ()>(actix_connect::start_default_resolver()), - )) + block_on(lazy(|_| { + Ok::<_, ()>(actix_connect::start_default_resolver()) + })) .unwrap(); TestServerRuntime { @@ -185,31 +130,6 @@ impl TestServer { } impl TestServerRuntime { - /// Execute future on current core - pub fn block_on(&mut self, fut: F) -> Result - where - F: Future, - { - block_on(fut) - } - - /// Execute future on current core - pub fn block_on_fn(&mut self, f: F) -> Result - where - F: FnOnce() -> R, - R: Future, - { - block_on(lazy(f)) - } - - /// Execute function on current core - pub fn execute(&mut self, fut: F) -> R - where - F: FnOnce() -> R, - { - block_on(lazy(|| Ok::<_, ()>(fut()))).unwrap() - } - /// Construct test server url pub fn addr(&self) -> net::SocketAddr { self.addr @@ -313,9 +233,9 @@ impl TestServerRuntime { mut response: ClientResponse, ) -> Result where - S: Stream + 'static, + S: Stream> + Unpin + 'static, { - self.block_on(response.body().limit(10_485_760)) + block_on(response.body().limit(10_485_760)) } /// Connect to websocket server at a given path @@ -326,7 +246,9 @@ impl TestServerRuntime { { let url = self.url(path); let connect = self.client.ws(url).connect(); - block_on(lazy(move || connect.map(|(_, framed)| framed))) + block_on(async move { + connect.await.map(|(_, framed)| framed) + }) } /// Connect to a websocket server