From a64205e502b08b39e8f5feb15ae3086156bdb979 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 29 May 2018 16:32:39 -0700 Subject: [PATCH] refactor TransferEncoding; allow to use client api with threaded tokio runtime --- src/body.rs | 41 +---------- src/client/body.rs | 94 ++++++++++++++++++++++++ src/client/mod.rs | 16 ++++- src/client/pipeline.rs | 110 ++++++++++------------------- src/client/request.rs | 29 +++++--- src/client/writer.rs | 111 ++++++++++++++++------------- src/context.rs | 2 +- src/fs.rs | 1 - src/pipeline.rs | 2 +- src/server/encoding.rs | 157 ++++++++++++++++++++++------------------- src/server/h1writer.rs | 8 +-- src/server/h2writer.rs | 8 +-- src/server/shared.rs | 9 --- src/test.rs | 12 +--- src/ws/client.rs | 8 +-- src/ws/context.rs | 3 +- tests/test_client.rs | 5 +- tests/test_server.rs | 18 ++--- 18 files changed, 344 insertions(+), 290 deletions(-) create mode 100644 src/client/body.rs diff --git a/src/body.rs b/src/body.rs index 5ce0d1292..3838d8d07 100644 --- a/src/body.rs +++ b/src/body.rs @@ -1,6 +1,5 @@ use bytes::{Bytes, BytesMut}; use futures::Stream; -use std::rc::Rc; use std::sync::Arc; use std::{fmt, mem}; @@ -35,10 +34,8 @@ pub enum Binary { /// Static slice Slice(&'static [u8]), /// Shared string body - SharedString(Rc), - /// Shared string body #[doc(hidden)] - ArcSharedString(Arc), + SharedString(Arc), /// Shared vec body SharedVec(Arc>), } @@ -140,7 +137,6 @@ impl Binary { Binary::Bytes(ref bytes) => bytes.len(), Binary::Slice(slice) => slice.len(), Binary::SharedString(ref s) => s.len(), - Binary::ArcSharedString(ref s) => s.len(), Binary::SharedVec(ref s) => s.len(), } } @@ -162,7 +158,6 @@ impl Clone for Binary { Binary::Bytes(ref bytes) => Binary::Bytes(bytes.clone()), Binary::Slice(slice) => Binary::Bytes(Bytes::from(slice)), Binary::SharedString(ref s) => Binary::SharedString(s.clone()), - Binary::ArcSharedString(ref s) => Binary::ArcSharedString(s.clone()), Binary::SharedVec(ref s) => Binary::SharedVec(s.clone()), } } @@ -174,7 +169,6 @@ impl Into for Binary { Binary::Bytes(bytes) => bytes, Binary::Slice(slice) => Bytes::from(slice), Binary::SharedString(s) => Bytes::from(s.as_str()), - Binary::ArcSharedString(s) => Bytes::from(s.as_str()), Binary::SharedVec(s) => Bytes::from(AsRef::<[u8]>::as_ref(s.as_ref())), } } @@ -222,27 +216,15 @@ impl From for Binary { } } -impl From> for Binary { - fn from(body: Rc) -> Binary { - Binary::SharedString(body) - } -} - -impl<'a> From<&'a Rc> for Binary { - fn from(body: &'a Rc) -> Binary { - Binary::SharedString(Rc::clone(body)) - } -} - impl From> for Binary { fn from(body: Arc) -> Binary { - Binary::ArcSharedString(body) + Binary::SharedString(body) } } impl<'a> From<&'a Arc> for Binary { fn from(body: &'a Arc) -> Binary { - Binary::ArcSharedString(Arc::clone(body)) + Binary::SharedString(Arc::clone(body)) } } @@ -265,7 +247,6 @@ impl AsRef<[u8]> for Binary { Binary::Bytes(ref bytes) => bytes.as_ref(), Binary::Slice(slice) => slice, Binary::SharedString(ref s) => s.as_bytes(), - Binary::ArcSharedString(ref s) => s.as_bytes(), Binary::SharedVec(ref s) => s.as_ref().as_ref(), } } @@ -324,22 +305,6 @@ mod tests { assert_eq!(Binary::from(Bytes::from("test")).as_ref(), b"test"); } - #[test] - fn test_ref_string() { - let b = Rc::new("test".to_owned()); - assert_eq!(Binary::from(&b).len(), 4); - assert_eq!(Binary::from(&b).as_ref(), b"test"); - } - - #[test] - fn test_rc_string() { - let b = Rc::new("test".to_owned()); - assert_eq!(Binary::from(b.clone()).len(), 4); - assert_eq!(Binary::from(b.clone()).as_ref(), b"test"); - assert_eq!(Binary::from(&b).len(), 4); - assert_eq!(Binary::from(&b).as_ref(), b"test"); - } - #[test] fn test_arc_string() { let b = Arc::new("test".to_owned()); diff --git a/src/client/body.rs b/src/client/body.rs new file mode 100644 index 000000000..c79d7ad5e --- /dev/null +++ b/src/client/body.rs @@ -0,0 +1,94 @@ +use std::fmt; + +use bytes::Bytes; +use futures::Stream; + +use body::Binary; +use context::ActorHttpContext; +use error::Error; + +/// Type represent streaming body +pub type ClientBodyStream = Box + Send>; + +/// Represents various types of http message body. +pub enum ClientBody { + /// Empty response. `Content-Length` header is set to `0` + Empty, + /// Specific response body. + Binary(Binary), + /// Unspecified streaming response. Developer is responsible for setting + /// right `Content-Length` or `Transfer-Encoding` headers. + Streaming(ClientBodyStream), + /// Special body type for actor response. + Actor(Box), +} + +impl ClientBody { + /// Does this body streaming. + #[inline] + pub fn is_streaming(&self) -> bool { + match *self { + ClientBody::Streaming(_) | ClientBody::Actor(_) => true, + _ => false, + } + } + + /// Is this binary body. + #[inline] + pub fn is_binary(&self) -> bool { + match *self { + ClientBody::Binary(_) => true, + _ => false, + } + } + + /// Is this binary empy. + #[inline] + pub fn is_empty(&self) -> bool { + match *self { + ClientBody::Empty => true, + _ => false, + } + } + + /// Create body from slice (copy) + pub fn from_slice(s: &[u8]) -> ClientBody { + ClientBody::Binary(Binary::Bytes(Bytes::from(s))) + } +} + +impl PartialEq for ClientBody { + fn eq(&self, other: &ClientBody) -> bool { + match *self { + ClientBody::Empty => match *other { + ClientBody::Empty => true, + _ => false, + }, + ClientBody::Binary(ref b) => match *other { + ClientBody::Binary(ref b2) => b == b2, + _ => false, + }, + ClientBody::Streaming(_) | ClientBody::Actor(_) => false, + } + } +} + +impl fmt::Debug for ClientBody { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + ClientBody::Empty => write!(f, "ClientBody::Empty"), + ClientBody::Binary(ref b) => write!(f, "ClientBody::Binary({:?})", b), + ClientBody::Streaming(_) => write!(f, "ClientBody::Streaming(_)"), + ClientBody::Actor(_) => write!(f, "ClientBody::Actor(_)"), + } + } +} + +impl From for ClientBody +where + T: Into, +{ + fn from(b: T) -> ClientBody { + ClientBody::Binary(b.into()) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 96033a211..8aded0114 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,11 +1,12 @@ //! Http client api //! -//! ```rust,ignore +//! ```rust //! # extern crate actix; //! # extern crate actix_web; //! # extern crate futures; //! # extern crate tokio; //! # use futures::Future; +//! # use std::process; //! use actix_web::client; //! //! fn main() { @@ -17,11 +18,14 @@ //! .map_err(|_| ()) //! .and_then(|response| { // <- server http response //! println!("Response: {:?}", response); +//! # process::exit(0); //! Ok(()) //! }) //! }); //! } //! ``` + +mod body; mod connector; mod parser; mod pipeline; @@ -29,6 +33,7 @@ mod request; mod response; mod writer; +pub use self::body::{ClientBody, ClientBodyStream}; pub use self::connector::{ ClientConnector, ClientConnectorError, ClientConnectorStats, Connect, Connection, Pause, Resume, @@ -56,11 +61,15 @@ impl ResponseError for SendRequestError { /// Create request builder for `GET` requests /// -/// ```rust,ignore +/// +/// ```rust /// # extern crate actix; /// # extern crate actix_web; /// # extern crate futures; -/// # use futures::{future, Future}; +/// # extern crate tokio; +/// # extern crate env_logger; +/// # use futures::Future; +/// # use std::process; /// use actix_web::client; /// /// fn main() { @@ -72,6 +81,7 @@ impl ResponseError for SendRequestError { /// .map_err(|_| ()) /// .and_then(|response| { // <- server http response /// println!("Response: {:?}", response); +/// # process::exit(0); /// Ok(()) /// })); /// } diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index 5543aa4c3..a2105ecb7 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -1,5 +1,5 @@ use bytes::{Bytes, BytesMut}; -use futures::unsync::oneshot; +use futures::sync::oneshot; use futures::{Async, Future, Poll}; use http::header::CONTENT_ENCODING; use std::time::{Duration, Instant}; @@ -8,18 +8,16 @@ use tokio_timer::Delay; use actix::prelude::*; -use super::HttpClientWriter; -use super::{ClientConnector, ClientConnectorError, Connect, Connection}; -use super::{ClientRequest, ClientResponse}; -use super::{HttpResponseParser, HttpResponseParserError}; -use body::{Body, BodyStream}; -use context::{ActorHttpContext, Frame}; +use super::{ + ClientBody, ClientBodyStream, ClientConnector, ClientConnectorError, ClientRequest, + ClientResponse, Connect, Connection, HttpClientWriter, HttpResponseParser, + HttpResponseParserError, +}; use error::Error; use error::PayloadError; use header::ContentEncoding; use httpmessage::HttpMessage; use server::encoding::PayloadStream; -use server::shared::SharedBytes; use server::WriterState; /// A set of errors that can occur during request sending and response reading @@ -68,7 +66,7 @@ enum State { pub struct SendRequest { req: ClientRequest, state: State, - conn: Addr, + conn: Option>, conn_timeout: Duration, wait_timeout: Duration, timeout: Option, @@ -76,7 +74,14 @@ pub struct SendRequest { impl SendRequest { pub(crate) fn new(req: ClientRequest) -> SendRequest { - SendRequest::with_connector(req, ClientConnector::from_registry()) + SendRequest { + req, + conn: None, + state: State::New, + timeout: None, + wait_timeout: Duration::from_secs(5), + conn_timeout: Duration::from_secs(1), + } } pub(crate) fn with_connector( @@ -84,7 +89,7 @@ impl SendRequest { ) -> SendRequest { SendRequest { req, - conn, + conn: Some(conn), state: State::New, timeout: None, wait_timeout: Duration::from_secs(5), @@ -96,7 +101,7 @@ impl SendRequest { SendRequest { req, state: State::Connection(conn), - conn: ClientConnector::from_registry(), + conn: None, timeout: None, wait_timeout: Duration::from_secs(5), conn_timeout: Duration::from_secs(1), @@ -142,7 +147,12 @@ impl Future for SendRequest { match state { State::New => { - self.state = State::Connect(self.conn.send(Connect { + let conn = if let Some(conn) = self.conn.take() { + conn + } else { + ClientConnector::from_registry() + }; + self.state = State::Connect(conn.send(Connect { uri: self.req.uri().clone(), wait_timeout: self.wait_timeout, conn_timeout: self.conn_timeout, @@ -160,16 +170,16 @@ impl Future for SendRequest { Err(_) => { return Err(SendRequestError::Connector( ClientConnectorError::Disconnected, - )) + )); } }, State::Connection(conn) => { - let mut writer = HttpClientWriter::new(SharedBytes::default()); + let mut writer = HttpClientWriter::new(); writer.start(&mut self.req)?; - let body = match self.req.replace_body(Body::Empty) { - Body::Streaming(stream) => IoBody::Payload(stream), - Body::Actor(ctx) => IoBody::Actor(ctx), + let body = match self.req.replace_body(ClientBody::Empty) { + ClientBody::Streaming(stream) => IoBody::Payload(stream), + ClientBody::Actor(_) => panic!("Client actor is not supported"), _ => IoBody::Done, }; @@ -208,7 +218,9 @@ impl Future for SendRequest { self.state = State::Send(pl); return Ok(Async::NotReady); } - Err(err) => return Err(SendRequestError::ParseError(err)), + Err(err) => { + return Err(SendRequestError::ParseError(err)); + } } } State::None => unreachable!(), @@ -233,8 +245,7 @@ pub(crate) struct Pipeline { } enum IoBody { - Payload(BodyStream), - Actor(Box), + Payload(ClientBodyStream), Done, } @@ -380,10 +391,7 @@ impl Pipeline { match self.timeout.as_mut().unwrap().poll() { Ok(Async::Ready(())) => return Err(SendRequestError::Timeout), Ok(Async::NotReady) => (), - Err(e) => { - println!("err: {:?}", e); - return Err(SendRequestError::Timeout); - } + Err(_) => return Err(SendRequestError::Timeout), } } Ok(()) @@ -397,66 +405,24 @@ impl Pipeline { let mut done = false; if self.drain.is_none() && self.write_state != RunningState::Paused { - 'outter: loop { + loop { let result = match mem::replace(&mut self.body, IoBody::Done) { - IoBody::Payload(mut body) => match body.poll()? { + IoBody::Payload(mut stream) => match stream.poll()? { Async::Ready(None) => { self.writer.write_eof()?; self.body_completed = true; break; } Async::Ready(Some(chunk)) => { - self.body = IoBody::Payload(body); - self.writer.write(chunk.into())? + self.body = IoBody::Payload(stream); + self.writer.write(chunk.as_ref())? } Async::NotReady => { done = true; - self.body = IoBody::Payload(body); + self.body = IoBody::Payload(stream); break; } }, - IoBody::Actor(mut ctx) => { - if self.disconnected { - ctx.disconnected(); - } - match ctx.poll()? { - Async::Ready(Some(vec)) => { - if vec.is_empty() { - self.body = IoBody::Actor(ctx); - break; - } - let mut res = None; - for frame in vec { - match frame { - Frame::Chunk(None) => { - self.body_completed = true; - self.writer.write_eof()?; - break 'outter; - } - Frame::Chunk(Some(chunk)) => { - res = Some(self.writer.write(chunk)?) - } - Frame::Drain(fut) => self.drain = Some(fut), - } - } - self.body = IoBody::Actor(ctx); - if self.drain.is_some() { - self.write_state.resume(); - break; - } - res.unwrap() - } - Async::Ready(None) => { - done = true; - break; - } - Async::NotReady => { - done = true; - self.body = IoBody::Actor(ctx); - break; - } - } - } IoBody::Done => { self.body_completed = true; done = true; diff --git a/src/client/request.rs b/src/client/request.rs index d4baa2546..97b97e01b 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -12,9 +12,9 @@ use serde::Serialize; use serde_json; use url::Url; +use super::body::ClientBody; use super::connector::{ClientConnector, Connection}; use super::pipeline::SendRequest; -use body::Body; use error::Error; use header::{ContentEncoding, Header, IntoHeaderValue}; use http::header::{self, HeaderName, HeaderValue}; @@ -24,11 +24,13 @@ use httprequest::HttpRequest; /// An HTTP Client Request /// -/// ```rust,ignore +/// ```rust /// # extern crate actix; /// # extern crate actix_web; /// # extern crate futures; +/// # extern crate tokio; /// # use futures::Future; +/// # use std::process; /// use actix_web::client::ClientRequest; /// /// fn main() { @@ -40,6 +42,7 @@ use httprequest::HttpRequest; /// .map_err(|_| ()) /// .and_then(|response| { // <- server http response /// println!("Response: {:?}", response); +/// # process::exit(0); /// Ok(()) /// }) /// ); @@ -50,7 +53,7 @@ pub struct ClientRequest { method: Method, version: Version, headers: HeaderMap, - body: Body, + body: ClientBody, chunked: bool, upgrade: bool, timeout: Option, @@ -73,7 +76,7 @@ impl Default for ClientRequest { method: Method::default(), version: Version::HTTP_11, headers: HeaderMap::with_capacity(16), - body: Body::Empty, + body: ClientBody::Empty, chunked: false, upgrade: false, timeout: None, @@ -217,17 +220,17 @@ impl ClientRequest { /// Get body of this response #[inline] - pub fn body(&self) -> &Body { + pub fn body(&self) -> &ClientBody { &self.body } /// Set a body - pub fn set_body>(&mut self, body: B) { + pub fn set_body>(&mut self, body: B) { self.body = body.into(); } /// Extract body, replace it with `Empty` - pub(crate) fn replace_body(&mut self, body: Body) -> Body { + pub(crate) fn replace_body(&mut self, body: ClientBody) -> ClientBody { mem::replace(&mut self.body, body) } @@ -578,7 +581,9 @@ impl ClientRequestBuilder { /// Set a body and generate `ClientRequest`. /// /// `ClientRequestBuilder` can not be used after this call. - pub fn body>(&mut self, body: B) -> Result { + pub fn body>( + &mut self, body: B, + ) -> Result { if let Some(e) = self.err.take() { return Err(e.into()); } @@ -644,17 +649,19 @@ impl ClientRequestBuilder { /// `ClientRequestBuilder` can not be used after this call. pub fn streaming(&mut self, stream: S) -> Result where - S: Stream + 'static, + S: Stream + Send + 'static, E: Into, { - self.body(Body::Streaming(Box::new(stream.map_err(|e| e.into())))) + self.body(ClientBody::Streaming(Box::new( + stream.map_err(|e| e.into()), + ))) } /// Set an empty body and generate `ClientRequest` /// /// `ClientRequestBuilder` can not be used after this call. pub fn finish(&mut self) -> Result { - self.body(Body::Empty) + self.body(ClientBody::Empty) } /// This method construct new `ClientRequestBuilder` diff --git a/src/client/writer.rs b/src/client/writer.rs index addc03240..dfbce1f3e 100644 --- a/src/client/writer.rs +++ b/src/client/writer.rs @@ -19,13 +19,12 @@ use http::{HttpTryFrom, Version}; use time::{self, Duration}; use tokio_io::AsyncWrite; -use body::{Binary, Body}; +use body::Binary; use header::ContentEncoding; use server::encoding::{ContentEncoder, TransferEncoding}; -use server::shared::SharedBytes; use server::WriterState; -use client::ClientRequest; +use client::{ClientBody, ClientRequest}; const AVERAGE_HEADER_SIZE: usize = 30; @@ -42,20 +41,20 @@ pub(crate) struct HttpClientWriter { flags: Flags, written: u64, headers_size: u32, - buffer: SharedBytes, + buffer: Box, buffer_capacity: usize, encoder: ContentEncoder, } impl HttpClientWriter { - pub fn new(buffer: SharedBytes) -> HttpClientWriter { - let encoder = ContentEncoder::Identity(TransferEncoding::eof(buffer.clone())); + pub fn new() -> HttpClientWriter { + let encoder = ContentEncoder::Identity(TransferEncoding::eof()); HttpClientWriter { flags: Flags::empty(), written: 0, headers_size: 0, buffer_capacity: 0, - buffer, + buffer: Box::new(BytesMut::new()), encoder, } } @@ -98,12 +97,23 @@ impl HttpClientWriter { } } +pub struct Writer<'a>(pub &'a mut BytesMut); + +impl<'a> io::Write for Writer<'a> { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + impl HttpClientWriter { pub fn start(&mut self, msg: &mut ClientRequest) -> io::Result<()> { // prepare task self.flags.insert(Flags::STARTED); - self.encoder = content_encoder(self.buffer.clone(), msg); - + self.encoder = content_encoder(self.buffer.as_mut(), msg); if msg.upgrade() { self.flags.insert(Flags::UPGRADE); } @@ -112,7 +122,7 @@ impl HttpClientWriter { { // status line writeln!( - self.buffer, + Writer(&mut self.buffer), "{} {} {:?}\r", msg.method(), msg.uri() @@ -120,40 +130,41 @@ impl HttpClientWriter { .map(|u| u.as_str()) .unwrap_or("/"), msg.version() - )?; + ).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // write headers - let mut buffer = self.buffer.get_mut(); - if let Body::Binary(ref bytes) = *msg.body() { - buffer.reserve(msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); + if let ClientBody::Binary(ref bytes) = *msg.body() { + self.buffer + .reserve(msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); } else { - buffer.reserve(msg.headers().len() * AVERAGE_HEADER_SIZE); + self.buffer + .reserve(msg.headers().len() * AVERAGE_HEADER_SIZE); } for (key, value) in msg.headers() { let v = value.as_ref(); let k = key.as_str().as_bytes(); - buffer.reserve(k.len() + v.len() + 4); - buffer.put_slice(k); - buffer.put_slice(b": "); - buffer.put_slice(v); - buffer.put_slice(b"\r\n"); + self.buffer.reserve(k.len() + v.len() + 4); + self.buffer.put_slice(k); + self.buffer.put_slice(b": "); + self.buffer.put_slice(v); + self.buffer.put_slice(b"\r\n"); } // set date header if !msg.headers().contains_key(DATE) { - buffer.extend_from_slice(b"date: "); - set_date(&mut buffer); - buffer.extend_from_slice(b"\r\n\r\n"); + self.buffer.extend_from_slice(b"date: "); + set_date(&mut self.buffer); + self.buffer.extend_from_slice(b"\r\n\r\n"); } else { - buffer.extend_from_slice(b"\r\n"); + self.buffer.extend_from_slice(b"\r\n"); } - self.headers_size = buffer.len() as u32; + self.headers_size = self.buffer.len() as u32; if msg.body().is_binary() { - if let Body::Binary(bytes) = msg.replace_body(Body::Empty) { + if let ClientBody::Binary(bytes) = msg.replace_body(ClientBody::Empty) { self.written += bytes.len() as u64; - self.encoder.write(bytes)?; + self.encoder.write(bytes.as_ref())?; } } else { self.buffer_capacity = msg.write_buffer_capacity(); @@ -162,7 +173,7 @@ impl HttpClientWriter { Ok(()) } - pub fn write(&mut self, payload: Binary) -> io::Result { + pub fn write(&mut self, payload: &[u8]) -> io::Result { self.written += payload.len() as u64; if !self.flags.contains(Flags::DISCONNECTED) { if self.flags.contains(Flags::UPGRADE) { @@ -210,20 +221,21 @@ impl HttpClientWriter { } } -fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder { +fn content_encoder(buf: &mut BytesMut, req: &mut ClientRequest) -> ContentEncoder { let version = req.version(); - let mut body = req.replace_body(Body::Empty); + let mut body = req.replace_body(ClientBody::Empty); let mut encoding = req.content_encoding(); - let transfer = match body { - Body::Empty => { + let mut transfer = match body { + ClientBody::Empty => { req.headers_mut().remove(CONTENT_LENGTH); - TransferEncoding::length(0, buf) + TransferEncoding::length(0) } - Body::Binary(ref mut bytes) => { + ClientBody::Binary(ref mut bytes) => { if encoding.is_compression() { - let tmp = SharedBytes::default(); - let transfer = TransferEncoding::eof(tmp.clone()); + let mut tmp = BytesMut::new(); + let mut transfer = TransferEncoding::eof(); + transfer.set_buffer(&mut tmp); let mut enc = match encoding { #[cfg(feature = "flate2")] ContentEncoding::Deflate => ContentEncoder::Deflate( @@ -242,7 +254,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder ContentEncoding::Auto => unreachable!(), }; // TODO return error! - let _ = enc.write(bytes.clone()); + let _ = enc.write(bytes.as_ref()); let _ = enc.write_eof(); *bytes = Binary::from(tmp.take()); @@ -256,9 +268,9 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder let _ = write!(b, "{}", bytes.len()); req.headers_mut() .insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap()); - TransferEncoding::eof(buf) + TransferEncoding::eof() } - Body::Streaming(_) | Body::Actor(_) => { + ClientBody::Streaming(_) | ClientBody::Actor(_) => { if req.upgrade() { if version == Version::HTTP_2 { error!("Connection upgrade is forbidden for HTTP/2"); @@ -270,9 +282,9 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder encoding = ContentEncoding::Identity; req.headers_mut().remove(CONTENT_ENCODING); } - TransferEncoding::eof(buf) + TransferEncoding::eof() } else { - streaming_encoding(buf, version, req) + streaming_encoding(version, req) } } }; @@ -283,6 +295,7 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder HeaderValue::from_static(encoding.as_str()), ); } + transfer.set_buffer(buf); req.replace_body(body); match encoding { @@ -303,19 +316,17 @@ fn content_encoder(buf: SharedBytes, req: &mut ClientRequest) -> ContentEncoder } } -fn streaming_encoding( - buf: SharedBytes, version: Version, req: &mut ClientRequest, -) -> TransferEncoding { +fn streaming_encoding(version: Version, req: &mut ClientRequest) -> TransferEncoding { if req.chunked() { // Enable transfer encoding req.headers_mut().remove(CONTENT_LENGTH); if version == Version::HTTP_2 { req.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::eof(buf) + TransferEncoding::eof() } else { req.headers_mut() .insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - TransferEncoding::chunked(buf) + TransferEncoding::chunked() } } else { // if Content-Length is specified, then use it as length hint @@ -338,9 +349,9 @@ fn streaming_encoding( if !chunked { if let Some(len) = len { - TransferEncoding::length(len, buf) + TransferEncoding::length(len) } else { - TransferEncoding::eof(buf) + TransferEncoding::eof() } } else { // Enable transfer encoding @@ -348,11 +359,11 @@ fn streaming_encoding( Version::HTTP_11 => { req.headers_mut() .insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - TransferEncoding::chunked(buf) + TransferEncoding::chunked() } _ => { req.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::eof(buf) + TransferEncoding::eof() } } } diff --git a/src/context.rs b/src/context.rs index 76594e7bf..b40b4bbbc 100644 --- a/src/context.rs +++ b/src/context.rs @@ -1,5 +1,5 @@ +use futures::sync::oneshot; use futures::sync::oneshot::Sender; -use futures::unsync::oneshot; use futures::{Async, Future, Poll}; use smallvec::SmallVec; use std::marker::PhantomData; diff --git a/src/fs.rs b/src/fs.rs index a8f66dede..8a3621072 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -801,7 +801,6 @@ mod tests { .finish() .unwrap(); let response = srv.execute(request.send()).unwrap(); - assert_eq!(response.status(), StatusCode::PARTIAL_CONTENT); // Invalid range header diff --git a/src/pipeline.rs b/src/pipeline.rs index be85dc330..289c5fcbb 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -3,7 +3,7 @@ use std::marker::PhantomData; use std::rc::Rc; use std::{io, mem}; -use futures::unsync::oneshot; +use futures::sync::oneshot; use futures::{Async, Future, Poll, Stream}; use log::Level::Debug; diff --git a/src/server/encoding.rs b/src/server/encoding.rs index 4379a4ba2..6d814482e 100644 --- a/src/server/encoding.rs +++ b/src/server/encoding.rs @@ -1,7 +1,7 @@ use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; use std::str::FromStr; -use std::{cmp, io, mem}; +use std::{cmp, io, mem, ptr}; #[cfg(feature = "brotli")] use brotli2::write::{BrotliDecoder, BrotliEncoder}; @@ -25,8 +25,6 @@ use httprequest::HttpInnerMessage; use httpresponse::HttpResponse; use payload::{PayloadSender, PayloadStatus, PayloadWriter}; -use super::shared::SharedBytes; - pub(crate) enum PayloadType { Sender(PayloadSender), Encoding(Box), @@ -381,12 +379,12 @@ pub(crate) enum ContentEncoder { } impl ContentEncoder { - pub fn empty(bytes: SharedBytes) -> ContentEncoder { - ContentEncoder::Identity(TransferEncoding::eof(bytes)) + pub fn empty() -> ContentEncoder { + ContentEncoder::Identity(TransferEncoding::eof()) } pub fn for_server( - buf: SharedBytes, req: &HttpInnerMessage, resp: &mut HttpResponse, + buf: &mut BytesMut, req: &HttpInnerMessage, resp: &mut HttpResponse, response_encoding: ContentEncoding, ) -> ContentEncoder { let version = resp.version().unwrap_or_else(|| req.version); @@ -441,7 +439,7 @@ impl ContentEncoder { if req.method != Method::HEAD { resp.headers_mut().remove(CONTENT_LENGTH); } - TransferEncoding::length(0, buf) + TransferEncoding::length(0) } &Body::Binary(_) => { #[cfg(any(feature = "brotli", feature = "flate2"))] @@ -449,8 +447,9 @@ impl ContentEncoder { if !(encoding == ContentEncoding::Identity || encoding == ContentEncoding::Auto) { - let tmp = SharedBytes::default(); - let transfer = TransferEncoding::eof(tmp.clone()); + let mut tmp = BytesMut::default(); + let mut transfer = TransferEncoding::eof(); + transfer.set_buffer(&mut tmp); let mut enc = match encoding { #[cfg(feature = "flate2")] ContentEncoding::Deflate => ContentEncoder::Deflate( @@ -472,7 +471,7 @@ impl ContentEncoder { let bin = resp.replace_body(Body::Empty).binary(); // TODO return error! - let _ = enc.write(bin); + let _ = enc.write(bin.as_ref()); let _ = enc.write_eof(); let body = tmp.take(); len = body.len(); @@ -492,7 +491,7 @@ impl ContentEncoder { } else { // resp.headers_mut().remove(CONTENT_LENGTH); } - TransferEncoding::eof(buf) + TransferEncoding::eof() } &Body::Streaming(_) | &Body::Actor(_) => { if resp.upgrade() { @@ -503,14 +502,14 @@ impl ContentEncoder { encoding = ContentEncoding::Identity; resp.headers_mut().remove(CONTENT_ENCODING); } - TransferEncoding::eof(buf) + TransferEncoding::eof() } else { if !(encoding == ContentEncoding::Identity || encoding == ContentEncoding::Auto) { resp.headers_mut().remove(CONTENT_LENGTH); } - ContentEncoder::streaming_encoding(buf, version, resp) + ContentEncoder::streaming_encoding(version, resp) } } }; @@ -519,6 +518,7 @@ impl ContentEncoder { resp.set_body(Body::Empty); transfer.kind = TransferEncodingKind::Length(0); } + transfer.set_buffer(buf); match encoding { #[cfg(feature = "flate2")] @@ -539,7 +539,7 @@ impl ContentEncoder { } fn streaming_encoding( - buf: SharedBytes, version: Version, resp: &mut HttpResponse, + version: Version, resp: &mut HttpResponse, ) -> TransferEncoding { match resp.chunked() { Some(true) => { @@ -547,14 +547,14 @@ impl ContentEncoder { resp.headers_mut().remove(CONTENT_LENGTH); if version == Version::HTTP_2 { resp.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::eof(buf) + TransferEncoding::eof() } else { resp.headers_mut() .insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - TransferEncoding::chunked(buf) + TransferEncoding::chunked() } } - Some(false) => TransferEncoding::eof(buf), + Some(false) => TransferEncoding::eof(), None => { // if Content-Length is specified, then use it as length hint let (len, chunked) = @@ -577,9 +577,9 @@ impl ContentEncoder { if !chunked { if let Some(len) = len { - TransferEncoding::length(len, buf) + TransferEncoding::length(len) } else { - TransferEncoding::eof(buf) + TransferEncoding::eof() } } else { // Enable transfer encoding @@ -589,11 +589,11 @@ impl ContentEncoder { TRANSFER_ENCODING, HeaderValue::from_static("chunked"), ); - TransferEncoding::chunked(buf) + TransferEncoding::chunked() } _ => { resp.headers_mut().remove(TRANSFER_ENCODING); - TransferEncoding::eof(buf) + TransferEncoding::eof() } } } @@ -619,10 +619,8 @@ impl ContentEncoder { #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[inline(always)] pub fn write_eof(&mut self) -> Result { - let encoder = mem::replace( - self, - ContentEncoder::Identity(TransferEncoding::eof(SharedBytes::empty())), - ); + let encoder = + mem::replace(self, ContentEncoder::Identity(TransferEncoding::eof())); match encoder { #[cfg(feature = "brotli")] @@ -662,38 +660,32 @@ impl ContentEncoder { #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] #[inline(always)] - pub fn write(&mut self, data: Binary) -> Result<(), io::Error> { + pub fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { match *self { #[cfg(feature = "brotli")] - ContentEncoder::Br(ref mut encoder) => { - match encoder.write_all(data.as_ref()) { - Ok(_) => Ok(()), - Err(err) => { - trace!("Error decoding br encoding: {}", err); - Err(err) - } + ContentEncoder::Br(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(()), + Err(err) => { + trace!("Error decoding br encoding: {}", err); + Err(err) } - } + }, #[cfg(feature = "flate2")] - ContentEncoder::Gzip(ref mut encoder) => { - match encoder.write_all(data.as_ref()) { - Ok(_) => Ok(()), - Err(err) => { - trace!("Error decoding gzip encoding: {}", err); - Err(err) - } + ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(()), + Err(err) => { + trace!("Error decoding gzip encoding: {}", err); + Err(err) } - } + }, #[cfg(feature = "flate2")] - ContentEncoder::Deflate(ref mut encoder) => { - match encoder.write_all(data.as_ref()) { - Ok(_) => Ok(()), - Err(err) => { - trace!("Error decoding deflate encoding: {}", err); - Err(err) - } + ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { + Ok(_) => Ok(()), + Err(err) => { + trace!("Error decoding deflate encoding: {}", err); + Err(err) } - } + }, ContentEncoder::Identity(ref mut encoder) => { encoder.encode(data)?; Ok(()) @@ -705,10 +697,12 @@ impl ContentEncoder { /// Encoders to handle different Transfer-Encodings. #[derive(Debug, Clone)] pub(crate) struct TransferEncoding { + buf: *mut BytesMut, kind: TransferEncodingKind, - buffer: SharedBytes, } +unsafe impl Send for TransferEncoding {} + #[derive(Debug, PartialEq, Clone)] enum TransferEncodingKind { /// An Encoder for when Transfer-Encoding includes `chunked`. @@ -724,27 +718,31 @@ enum TransferEncodingKind { } impl TransferEncoding { + pub(crate) fn set_buffer(&mut self, buf: *mut BytesMut) { + self.buf = buf; + } + #[inline] - pub fn eof(bytes: SharedBytes) -> TransferEncoding { + pub fn eof() -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Eof, - buffer: bytes, + buf: ptr::null_mut(), } } #[inline] - pub fn chunked(bytes: SharedBytes) -> TransferEncoding { + pub fn chunked() -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Chunked(false), - buffer: bytes, + buf: ptr::null_mut(), } } #[inline] - pub fn length(len: u64, bytes: SharedBytes) -> TransferEncoding { + pub fn length(len: u64) -> TransferEncoding { TransferEncoding { kind: TransferEncodingKind::Length(len), - buffer: bytes, + buf: ptr::null_mut(), } } @@ -759,11 +757,13 @@ impl TransferEncoding { /// Encode message. Return `EOF` state of encoder #[inline] - pub fn encode(&mut self, mut msg: Binary) -> io::Result { + pub fn encode(&mut self, msg: &[u8]) -> io::Result { match self.kind { TransferEncodingKind::Eof => { let eof = msg.is_empty(); - self.buffer.extend(msg); + debug_assert!(!self.buf.is_null()); + let buf = unsafe { &mut *self.buf }; + buf.extend(msg); Ok(eof) } TransferEncodingKind::Chunked(ref mut eof) => { @@ -773,15 +773,20 @@ impl TransferEncoding { if msg.is_empty() { *eof = true; - self.buffer.extend_from_slice(b"0\r\n\r\n"); + debug_assert!(!self.buf.is_null()); + let buf = unsafe { &mut *self.buf }; + buf.extend_from_slice(b"0\r\n\r\n"); } else { let mut buf = BytesMut::new(); writeln!(&mut buf, "{:X}\r", msg.len()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - self.buffer.reserve(buf.len() + msg.len() + 2); - self.buffer.extend(buf.into()); - self.buffer.extend(msg); - self.buffer.extend_from_slice(b"\r\n"); + + debug_assert!(!self.buf.is_null()); + let b = unsafe { &mut *self.buf }; + b.reserve(buf.len() + msg.len() + 2); + b.extend_from_slice(buf.as_ref()); + b.extend_from_slice(msg); + b.extend_from_slice(b"\r\n"); } Ok(*eof) } @@ -791,7 +796,9 @@ impl TransferEncoding { return Ok(*remaining == 0); } let len = cmp::min(*remaining, msg.len() as u64); - self.buffer.extend(msg.take().split_to(len as usize).into()); + + debug_assert!(!self.buf.is_null()); + unsafe { &mut *self.buf }.extend(&msg[..len as usize]); *remaining -= len as u64; Ok(*remaining == 0) @@ -811,7 +818,10 @@ impl TransferEncoding { TransferEncodingKind::Chunked(ref mut eof) => { if !*eof { *eof = true; - self.buffer.extend_from_slice(b"0\r\n\r\n"); + + debug_assert!(!self.buf.is_null()); + let buf = unsafe { &mut *self.buf }; + buf.extend_from_slice(b"0\r\n\r\n"); } true } @@ -822,7 +832,7 @@ impl TransferEncoding { impl io::Write for TransferEncoding { #[inline] fn write(&mut self, buf: &[u8]) -> io::Result { - self.encode(Binary::from_slice(buf))?; + self.encode(buf)?; Ok(buf.len()) } @@ -904,12 +914,15 @@ mod tests { #[test] fn test_chunked_te() { - let bytes = SharedBytes::default(); - let mut enc = TransferEncoding::chunked(bytes.clone()); - assert!(!enc.encode(Binary::from(b"test".as_ref())).ok().unwrap()); - assert!(enc.encode(Binary::from(b"".as_ref())).ok().unwrap()); + let mut bytes = BytesMut::new(); + let mut enc = TransferEncoding::chunked(); + { + enc.set_buffer(&mut bytes); + assert!(!enc.encode(b"test").ok().unwrap()); + assert!(enc.encode(b"").ok().unwrap()); + } assert_eq!( - bytes.get_mut().take().freeze(), + bytes.take().freeze(), Bytes::from_static(b"4\r\ntest\r\n0\r\n\r\n") ); } diff --git a/src/server/h1writer.rs b/src/server/h1writer.rs index 648a164f3..a144a2ff9 100644 --- a/src/server/h1writer.rs +++ b/src/server/h1writer.rs @@ -46,7 +46,7 @@ impl H1Writer { ) -> H1Writer { H1Writer { flags: Flags::KEEPALIVE, - encoder: ContentEncoder::empty(buf.clone()), + encoder: ContentEncoder::empty(), written: 0, headers_size: 0, buffer: buf, @@ -116,7 +116,7 @@ impl Writer for H1Writer { ) -> io::Result { // prepare task self.encoder = - ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding); + ContentEncoder::for_server(self.buffer.get_mut(), req, msg, encoding); if msg.keep_alive().unwrap_or_else(|| req.keep_alive()) { self.flags = Flags::STARTED | Flags::KEEPALIVE; } else { @@ -223,7 +223,7 @@ impl Writer for H1Writer { if let Body::Binary(bytes) = body { self.written = bytes.len() as u64; - self.encoder.write(bytes)?; + self.encoder.write(bytes.as_ref())?; } else { // capacity, makes sense only for streaming or actor self.buffer_capacity = msg.write_buffer_capacity(); @@ -251,7 +251,7 @@ impl Writer for H1Writer { } } else { // TODO: add warning, write after EOF - self.encoder.write(payload)?; + self.encoder.write(payload.as_ref())?; } } else { // could be response to EXCEPT header diff --git a/src/server/h2writer.rs b/src/server/h2writer.rs index 5fc13154a..78a1ce18b 100644 --- a/src/server/h2writer.rs +++ b/src/server/h2writer.rs @@ -51,7 +51,7 @@ impl H2Writer { respond, settings, stream: None, - encoder: ContentEncoder::empty(buf.clone()), + encoder: ContentEncoder::empty(), flags: Flags::empty(), written: 0, buffer: buf, @@ -88,7 +88,7 @@ impl Writer for H2Writer { // prepare response self.flags.insert(Flags::STARTED); self.encoder = - ContentEncoder::for_server(self.buffer.clone(), req, msg, encoding); + ContentEncoder::for_server(self.buffer.get_mut(), req, msg, encoding); if let Body::Empty = *msg.body() { self.flags.insert(Flags::EOF); } @@ -143,7 +143,7 @@ impl Writer for H2Writer { if let Body::Binary(bytes) = body { self.flags.insert(Flags::EOF); self.written = bytes.len() as u64; - self.encoder.write(bytes)?; + self.encoder.write(bytes.as_ref())?; if let Some(ref mut stream) = self.stream { self.flags.insert(Flags::RESERVED); stream.reserve_capacity(cmp::min(self.buffer.len(), CHUNK_SIZE)); @@ -162,7 +162,7 @@ impl Writer for H2Writer { if !self.flags.contains(Flags::DISCONNECTED) { if self.flags.contains(Flags::STARTED) { // TODO: add warning, write after EOF - self.encoder.write(payload)?; + self.encoder.write(payload.as_ref())?; } else { // might be response for EXCEPT self.buffer.extend_from_slice(payload.as_ref()) diff --git a/src/server/shared.rs b/src/server/shared.rs index 2d7e285b3..54f7b1e65 100644 --- a/src/server/shared.rs +++ b/src/server/shared.rs @@ -48,10 +48,6 @@ impl Drop for SharedBytes { } impl SharedBytes { - pub fn empty() -> Self { - SharedBytes(None, None) - } - pub fn new(bytes: Rc, pool: Rc) -> SharedBytes { SharedBytes(Some(bytes), Some(pool)) } @@ -87,11 +83,6 @@ impl SharedBytes { self.get_mut().take() } - #[inline] - pub fn reserve(&self, cnt: usize) { - self.get_mut().reserve(cnt) - } - #[inline] #[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))] pub fn extend(&self, data: Binary) { diff --git a/src/test.rs b/src/test.rs index 28403625b..558695ad7 100644 --- a/src/test.rs +++ b/src/test.rs @@ -60,7 +60,6 @@ use ws; /// ``` pub struct TestServer { addr: net::SocketAddr, - thread: Option>, server_sys: Addr, ssl: bool, conn: Addr, @@ -107,7 +106,7 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - let join = thread::spawn(move || { + thread::spawn(move || { let sys = System::new("actix-test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); @@ -134,7 +133,6 @@ impl TestServer { server_sys, conn, ssl: false, - thread: Some(join), rt: Runtime::new().unwrap(), } } @@ -190,10 +188,7 @@ impl TestServer { /// Stop http server fn stop(&mut self) { - if let Some(handle) = self.thread.take() { - self.server_sys.do_send(msgs::SystemExit(0)); - let _ = handle.join(); - } + self.server_sys.do_send(msgs::SystemExit(0)); } /// Execute future on current core @@ -287,7 +282,7 @@ impl TestServerBuilder { let ssl = false; // run server in separate thread - let join = thread::spawn(move || { + thread::spawn(move || { let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); @@ -332,7 +327,6 @@ impl TestServerBuilder { ssl, conn, server_sys, - thread: Some(join), rt: Runtime::new().unwrap(), } } diff --git a/src/ws/client.rs b/src/ws/client.rs index f8366752e..fcf6ed40a 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -7,7 +7,7 @@ use std::{fmt, io, str}; use base64; use bytes::Bytes; use cookie::Cookie; -use futures::unsync::mpsc::{unbounded, UnboundedSender}; +use futures::sync::mpsc::{unbounded, UnboundedSender}; use futures::{Async, Future, Poll, Stream}; use http::header::{self, HeaderName, HeaderValue}; use http::{Error as HttpError, HttpTryFrom, StatusCode}; @@ -16,14 +16,14 @@ use sha1::Sha1; use actix::prelude::*; -use body::{Binary, Body}; +use body::Binary; use error::{Error, UrlParseError}; use header::IntoHeaderValue; use httpmessage::HttpMessage; use payload::PayloadHelper; use client::{ - ClientConnector, ClientRequest, ClientRequestBuilder, ClientResponse, + ClientBody, ClientConnector, ClientRequest, ClientRequestBuilder, ClientResponse, HttpResponseParserError, SendRequest, SendRequestError, }; @@ -283,7 +283,7 @@ impl ClientHandshake { ); let (tx, rx) = unbounded(); - request.set_body(Body::Streaming(Box::new(rx.map_err(|_| { + request.set_body(ClientBody::Streaming(Box::new(rx.map_err(|_| { io::Error::new(io::ErrorKind::Other, "disconnected").into() })))); diff --git a/src/ws/context.rs b/src/ws/context.rs index 22323f49f..2d7802b0a 100644 --- a/src/ws/context.rs +++ b/src/ws/context.rs @@ -1,5 +1,4 @@ -use futures::sync::oneshot::Sender; -use futures::unsync::oneshot; +use futures::sync::oneshot::{self, Sender}; use futures::{Async, Poll}; use smallvec::SmallVec; diff --git a/tests/test_client.rs b/tests/test_client.rs index 5496e59fd..9b9fb0e82 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -343,7 +343,10 @@ fn test_client_streaming_explicit() { let body = once(Ok(Bytes::from_static(STR.as_ref()))); - let request = srv.get().body(Body::Streaming(Box::new(body))).unwrap(); + let request = srv + .get() + .body(client::ClientBody::Streaming(Box::new(body))) + .unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); diff --git a/tests/test_server.rs b/tests/test_server.rs index 120eef06c..ed2848f79 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -32,7 +32,7 @@ use tokio::executor::current_thread; use tokio::runtime::current_thread::Runtime; use tokio_tcp::TcpStream; -use actix::System; +use actix::{msgs::SystemExit, Arbiter, System}; use actix_web::*; const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ @@ -74,12 +74,11 @@ fn test_start() { let srv = srv.bind("127.0.0.1:0").unwrap(); let addr = srv.addrs()[0]; let srv_addr = srv.start(); - let _ = tx.send((addr, srv_addr)); + let _ = tx.send((addr, srv_addr, Arbiter::system())); }); }); - let (addr, srv_addr) = rx.recv().unwrap(); + let (addr, srv_addr, sys) = rx.recv().unwrap(); - let _sys = System::new("test-server"); let mut rt = Runtime::new().unwrap(); { let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) @@ -102,7 +101,7 @@ fn test_start() { // resume let _ = srv_addr.send(server::ResumeServer).wait(); - thread::sleep(time::Duration::from_millis(400)); + thread::sleep(time::Duration::from_millis(200)); { let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) .finish() @@ -110,6 +109,8 @@ fn test_start() { let response = rt.block_on(req.send()).unwrap(); assert!(response.status().is_success()); } + + let _ = sys.send(SystemExit(0)).wait(); } #[test] @@ -129,12 +130,11 @@ fn test_shutdown() { let srv = srv.bind("127.0.0.1:0").unwrap(); let addr = srv.addrs()[0]; let srv_addr = srv.shutdown_timeout(1).start(); - let _ = tx.send((addr, srv_addr)); + let _ = tx.send((addr, srv_addr, Arbiter::system())); }); }); - let (addr, srv_addr) = rx.recv().unwrap(); + let (addr, srv_addr, sys) = rx.recv().unwrap(); - let _sys = System::new("test-server"); let mut rt = Runtime::new().unwrap(); { let req = client::ClientRequest::get(format!("http://{}/", addr).as_str()) @@ -147,6 +147,8 @@ fn test_shutdown() { thread::sleep(time::Duration::from_millis(1000)); assert!(net::TcpStream::connect(addr).is_err()); + + let _ = sys.send(SystemExit(0)).wait(); } #[test]