From 247e8727cbaa12742a95bea56319812888a4bb75 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 19 Jun 2018 10:15:16 +0600 Subject: [PATCH] ClientBody is not needed --- src/client/body.rs | 94 ------------------------------------------ src/client/mod.rs | 7 ++-- src/client/pipeline.rs | 67 +++++++++++++++++++++++++----- src/client/request.rs | 35 +++++++--------- src/client/writer.rs | 16 +++---- src/ws/client.rs | 6 +-- tests/test_client.rs | 10 +---- 7 files changed, 88 insertions(+), 147 deletions(-) delete mode 100644 src/client/body.rs diff --git a/src/client/body.rs b/src/client/body.rs deleted file mode 100644 index c79d7ad5e..000000000 --- a/src/client/body.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::fmt; - -use bytes::Bytes; -use futures::Stream; - -use body::Binary; -use context::ActorHttpContext; -use error::Error; - -/// Type represent streaming body -pub type ClientBodyStream = Box + Send>; - -/// Represents various types of http message body. -pub enum ClientBody { - /// Empty response. `Content-Length` header is set to `0` - Empty, - /// Specific response body. - Binary(Binary), - /// Unspecified streaming response. Developer is responsible for setting - /// right `Content-Length` or `Transfer-Encoding` headers. - Streaming(ClientBodyStream), - /// Special body type for actor response. - Actor(Box), -} - -impl ClientBody { - /// Does this body streaming. - #[inline] - pub fn is_streaming(&self) -> bool { - match *self { - ClientBody::Streaming(_) | ClientBody::Actor(_) => true, - _ => false, - } - } - - /// Is this binary body. - #[inline] - pub fn is_binary(&self) -> bool { - match *self { - ClientBody::Binary(_) => true, - _ => false, - } - } - - /// Is this binary empy. - #[inline] - pub fn is_empty(&self) -> bool { - match *self { - ClientBody::Empty => true, - _ => false, - } - } - - /// Create body from slice (copy) - pub fn from_slice(s: &[u8]) -> ClientBody { - ClientBody::Binary(Binary::Bytes(Bytes::from(s))) - } -} - -impl PartialEq for ClientBody { - fn eq(&self, other: &ClientBody) -> bool { - match *self { - ClientBody::Empty => match *other { - ClientBody::Empty => true, - _ => false, - }, - ClientBody::Binary(ref b) => match *other { - ClientBody::Binary(ref b2) => b == b2, - _ => false, - }, - ClientBody::Streaming(_) | ClientBody::Actor(_) => false, - } - } -} - -impl fmt::Debug for ClientBody { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - ClientBody::Empty => write!(f, "ClientBody::Empty"), - ClientBody::Binary(ref b) => write!(f, "ClientBody::Binary({:?})", b), - ClientBody::Streaming(_) => write!(f, "ClientBody::Streaming(_)"), - ClientBody::Actor(_) => write!(f, "ClientBody::Actor(_)"), - } - } -} - -impl From for ClientBody -where - T: Into, -{ - fn from(b: T) -> ClientBody { - ClientBody::Binary(b.into()) - } -} diff --git a/src/client/mod.rs b/src/client/mod.rs index b40fa2ece..bbc2d5f3f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -24,7 +24,6 @@ //! ); //! } //! ``` -mod body; mod connector; mod parser; mod pipeline; @@ -32,7 +31,6 @@ mod request; mod response; mod writer; -pub use self::body::{ClientBody, ClientBodyStream}; pub use self::connector::{ ClientConnector, ClientConnectorError, ClientConnectorStats, Connect, Connection, Pause, Resume, @@ -71,7 +69,9 @@ impl ResponseError for SendRequestError { /// use actix_web::client; /// /// fn main() { -/// tokio::run( +/// let mut sys = actix_web::actix::System::new("test"); +/// +/// sys.block_on( /// client::get("http://www.rust-lang.org") // <- Create request builder /// .header("User-Agent", "Actix-web") /// .finish().unwrap() @@ -79,7 +79,6 @@ impl ResponseError for SendRequestError { /// .map_err(|_| ()) /// .and_then(|response| { // <- server http response /// println!("Response: {:?}", response); -/// # process::exit(0); /// Ok(()) /// }), /// ); diff --git a/src/client/pipeline.rs b/src/client/pipeline.rs index de1ff1287..76075b52c 100644 --- a/src/client/pipeline.rs +++ b/src/client/pipeline.rs @@ -9,10 +9,11 @@ use tokio_timer::Delay; use actix::{Addr, Request, SystemService}; use super::{ - ClientBody, ClientBodyStream, ClientConnector, ClientConnectorError, ClientRequest, - ClientResponse, Connect, Connection, HttpClientWriter, HttpResponseParser, - HttpResponseParserError, + ClientConnector, ClientConnectorError, ClientRequest, ClientResponse, Connect, + Connection, HttpClientWriter, HttpResponseParser, HttpResponseParserError, }; +use body::{Body, BodyStream}; +use context::{ActorHttpContext, Frame}; use error::Error; use error::PayloadError; use header::ContentEncoding; @@ -177,9 +178,9 @@ impl Future for SendRequest { let mut writer = HttpClientWriter::new(); writer.start(&mut self.req)?; - let body = match self.req.replace_body(ClientBody::Empty) { - ClientBody::Streaming(stream) => IoBody::Payload(stream), - ClientBody::Actor(_) => panic!("Client actor is not supported"), + let body = match self.req.replace_body(Body::Empty) { + Body::Streaming(stream) => IoBody::Payload(stream), + Body::Actor(ctx) => IoBody::Actor(ctx), _ => IoBody::Done, }; @@ -245,7 +246,8 @@ pub(crate) struct Pipeline { } enum IoBody { - Payload(ClientBodyStream), + Payload(BodyStream), + Actor(Box), Done, } @@ -405,24 +407,67 @@ impl Pipeline { let mut done = false; if self.drain.is_none() && self.write_state != RunningState::Paused { - loop { + 'outter: loop { let result = match mem::replace(&mut self.body, IoBody::Done) { - IoBody::Payload(mut stream) => match stream.poll()? { + IoBody::Payload(mut body) => match body.poll()? { Async::Ready(None) => { self.writer.write_eof()?; self.body_completed = true; break; } Async::Ready(Some(chunk)) => { - self.body = IoBody::Payload(stream); + self.body = IoBody::Payload(body); self.writer.write(chunk.as_ref())? } Async::NotReady => { done = true; - self.body = IoBody::Payload(stream); + self.body = IoBody::Payload(body); break; } }, + IoBody::Actor(mut ctx) => { + if self.disconnected { + ctx.disconnected(); + } + match ctx.poll()? { + Async::Ready(Some(vec)) => { + if vec.is_empty() { + self.body = IoBody::Actor(ctx); + break; + } + let mut res = None; + for frame in vec { + match frame { + Frame::Chunk(None) => { + self.body_completed = true; + self.writer.write_eof()?; + break 'outter; + } + Frame::Chunk(Some(chunk)) => { + res = + Some(self.writer.write(chunk.as_ref())?) + } + Frame::Drain(fut) => self.drain = Some(fut), + } + } + self.body = IoBody::Actor(ctx); + if self.drain.is_some() { + self.write_state.resume(); + break; + } + res.unwrap() + } + Async::Ready(None) => { + done = true; + break; + } + Async::NotReady => { + done = true; + self.body = IoBody::Actor(ctx); + break; + } + } + } IoBody::Done => { self.body_completed = true; done = true; diff --git a/src/client/request.rs b/src/client/request.rs index bc8feb3e7..d82aa6063 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -13,9 +13,9 @@ use serde_json; use serde_urlencoded; use url::Url; -use super::body::ClientBody; use super::connector::{ClientConnector, Connection}; use super::pipeline::SendRequest; +use body::Body; use error::Error; use header::{ContentEncoding, Header, IntoHeaderValue}; use http::header::{self, HeaderName, HeaderValue}; @@ -34,7 +34,9 @@ use httprequest::HttpRequest; /// use actix_web::client::ClientRequest; /// /// fn main() { -/// tokio::run( +/// let mut sys = actix_web::actix::System::new("test"); +/// +/// sys.block_on( /// ClientRequest::get("http://www.rust-lang.org") // <- Create request builder /// .header("User-Agent", "Actix-web") /// .finish().unwrap() @@ -42,7 +44,6 @@ use httprequest::HttpRequest; /// .map_err(|_| ()) /// .and_then(|response| { // <- server http response /// println!("Response: {:?}", response); -/// # process::exit(0); /// Ok(()) /// }), /// ); @@ -53,7 +54,7 @@ pub struct ClientRequest { method: Method, version: Version, headers: HeaderMap, - body: ClientBody, + body: Body, chunked: bool, upgrade: bool, timeout: Option, @@ -76,7 +77,7 @@ impl Default for ClientRequest { method: Method::default(), version: Version::HTTP_11, headers: HeaderMap::with_capacity(16), - body: ClientBody::Empty, + body: Body::Empty, chunked: false, upgrade: false, timeout: None, @@ -220,17 +221,17 @@ impl ClientRequest { /// Get body of this response #[inline] - pub fn body(&self) -> &ClientBody { + pub fn body(&self) -> &Body { &self.body } /// Set a body - pub fn set_body>(&mut self, body: B) { + pub fn set_body>(&mut self, body: B) { self.body = body.into(); } /// Extract body, replace it with `Empty` - pub(crate) fn replace_body(&mut self, body: ClientBody) -> ClientBody { + pub(crate) fn replace_body(&mut self, body: Body) -> Body { mem::replace(&mut self.body, body) } @@ -585,9 +586,7 @@ impl ClientRequestBuilder { /// Set a body and generate `ClientRequest`. /// /// `ClientRequestBuilder` can not be used after this call. - pub fn body>( - &mut self, body: B, - ) -> Result { + pub fn body>(&mut self, body: B) -> Result { if let Some(e) = self.err.take() { return Err(e.into()); } @@ -659,13 +658,13 @@ impl ClientRequestBuilder { self.body(body) } - + /// Set a urlencoded body and generate `ClientRequest` /// /// `ClientRequestBuilder` can not be used after this call. pub fn form(&mut self, value: T) -> Result { let body = serde_urlencoded::to_string(&value)?; - + let contains = if let Some(parts) = parts(&mut self.request, &self.err) { parts.headers.contains_key(header::CONTENT_TYPE) } else { @@ -674,7 +673,7 @@ impl ClientRequestBuilder { if !contains { self.header(header::CONTENT_TYPE, "application/x-www-form-urlencoded"); } - + self.body(body) } @@ -683,19 +682,17 @@ impl ClientRequestBuilder { /// `ClientRequestBuilder` can not be used after this call. pub fn streaming(&mut self, stream: S) -> Result where - S: Stream + Send + 'static, + S: Stream + 'static, E: Into, { - self.body(ClientBody::Streaming(Box::new( - stream.map_err(|e| e.into()), - ))) + self.body(Body::Streaming(Box::new(stream.map_err(|e| e.into())))) } /// Set an empty body and generate `ClientRequest` /// /// `ClientRequestBuilder` can not be used after this call. pub fn finish(&mut self) -> Result { - self.body(ClientBody::Empty) + self.body(Body::Empty) } /// This method construct new `ClientRequestBuilder` diff --git a/src/client/writer.rs b/src/client/writer.rs index dfbce1f3e..f9961a79a 100644 --- a/src/client/writer.rs +++ b/src/client/writer.rs @@ -19,12 +19,12 @@ use http::{HttpTryFrom, Version}; use time::{self, Duration}; use tokio_io::AsyncWrite; -use body::Binary; +use body::{Binary, Body}; use header::ContentEncoding; use server::encoding::{ContentEncoder, TransferEncoding}; use server::WriterState; -use client::{ClientBody, ClientRequest}; +use client::ClientRequest; const AVERAGE_HEADER_SIZE: usize = 30; @@ -133,7 +133,7 @@ impl HttpClientWriter { ).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // write headers - if let ClientBody::Binary(ref bytes) = *msg.body() { + if let Body::Binary(ref bytes) = *msg.body() { self.buffer .reserve(msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); } else { @@ -162,7 +162,7 @@ impl HttpClientWriter { self.headers_size = self.buffer.len() as u32; if msg.body().is_binary() { - if let ClientBody::Binary(bytes) = msg.replace_body(ClientBody::Empty) { + if let Body::Binary(bytes) = msg.replace_body(Body::Empty) { self.written += bytes.len() as u64; self.encoder.write(bytes.as_ref())?; } @@ -223,15 +223,15 @@ impl HttpClientWriter { fn content_encoder(buf: &mut BytesMut, req: &mut ClientRequest) -> ContentEncoder { let version = req.version(); - let mut body = req.replace_body(ClientBody::Empty); + let mut body = req.replace_body(Body::Empty); let mut encoding = req.content_encoding(); let mut transfer = match body { - ClientBody::Empty => { + Body::Empty => { req.headers_mut().remove(CONTENT_LENGTH); TransferEncoding::length(0) } - ClientBody::Binary(ref mut bytes) => { + Body::Binary(ref mut bytes) => { if encoding.is_compression() { let mut tmp = BytesMut::new(); let mut transfer = TransferEncoding::eof(); @@ -270,7 +270,7 @@ fn content_encoder(buf: &mut BytesMut, req: &mut ClientRequest) -> ContentEncode .insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap()); TransferEncoding::eof() } - ClientBody::Streaming(_) | ClientBody::Actor(_) => { + Body::Streaming(_) | Body::Actor(_) => { if req.upgrade() { if version == Version::HTTP_2 { error!("Connection upgrade is forbidden for HTTP/2"); diff --git a/src/ws/client.rs b/src/ws/client.rs index 5087c885a..e9b7cf827 100644 --- a/src/ws/client.rs +++ b/src/ws/client.rs @@ -16,14 +16,14 @@ use sha1::Sha1; use actix::{Addr, SystemService}; -use body::Binary; +use body::{Binary, Body}; use error::{Error, UrlParseError}; use header::IntoHeaderValue; use httpmessage::HttpMessage; use payload::PayloadHelper; use client::{ - ClientBody, ClientConnector, ClientRequest, ClientRequestBuilder, ClientResponse, + ClientConnector, ClientRequest, ClientRequestBuilder, ClientResponse, HttpResponseParserError, SendRequest, SendRequestError, }; @@ -297,7 +297,7 @@ impl ClientHandshake { ); let (tx, rx) = unbounded(); - request.set_body(ClientBody::Streaming(Box::new(rx.map_err(|_| { + request.set_body(Body::Streaming(Box::new(rx.map_err(|_| { io::Error::new(io::ErrorKind::Other, "disconnected").into() })))); diff --git a/tests/test_client.rs b/tests/test_client.rs index 3128bb942..0d058c510 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -343,10 +343,7 @@ fn test_client_streaming_explicit() { let body = once(Ok(Bytes::from_static(STR.as_ref()))); - let request = srv - .get() - .body(client::ClientBody::Streaming(Box::new(body))) - .unwrap(); + let request = srv.get().body(Body::Streaming(Box::new(body))).unwrap(); let response = srv.execute(request.send()).unwrap(); assert!(response.status().is_success()); @@ -446,10 +443,7 @@ fn test_default_headers() { "\"" ))); - let request_override = srv.get() - .header("User-Agent", "test") - .finish() - .unwrap(); + let request_override = srv.get().header("User-Agent", "test").finish().unwrap(); let repr_override = format!("{:?}", request_override); assert!(repr_override.contains("\"user-agent\": \"test\"")); assert!(!repr_override.contains(concat!(