1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-21 23:56:35 +00:00

ClientBody is not needed

This commit is contained in:
Nikolay Kim 2018-06-19 10:15:16 +06:00
parent 362b14c2f7
commit 247e8727cb
7 changed files with 88 additions and 147 deletions

View file

@ -1,94 +0,0 @@
use std::fmt;
use bytes::Bytes;
use futures::Stream;
use body::Binary;
use context::ActorHttpContext;
use error::Error;
/// Type represent streaming body
pub type ClientBodyStream = Box<Stream<Item = Bytes, Error = Error> + Send>;
/// Represents various types of http message body.
pub enum ClientBody {
/// Empty response. `Content-Length` header is set to `0`
Empty,
/// Specific response body.
Binary(Binary),
/// Unspecified streaming response. Developer is responsible for setting
/// right `Content-Length` or `Transfer-Encoding` headers.
Streaming(ClientBodyStream),
/// Special body type for actor response.
Actor(Box<ActorHttpContext + Send>),
}
impl ClientBody {
/// Does this body streaming.
#[inline]
pub fn is_streaming(&self) -> bool {
match *self {
ClientBody::Streaming(_) | ClientBody::Actor(_) => true,
_ => false,
}
}
/// Is this binary body.
#[inline]
pub fn is_binary(&self) -> bool {
match *self {
ClientBody::Binary(_) => true,
_ => false,
}
}
/// Is this binary empy.
#[inline]
pub fn is_empty(&self) -> bool {
match *self {
ClientBody::Empty => true,
_ => false,
}
}
/// Create body from slice (copy)
pub fn from_slice(s: &[u8]) -> ClientBody {
ClientBody::Binary(Binary::Bytes(Bytes::from(s)))
}
}
impl PartialEq for ClientBody {
fn eq(&self, other: &ClientBody) -> bool {
match *self {
ClientBody::Empty => match *other {
ClientBody::Empty => true,
_ => false,
},
ClientBody::Binary(ref b) => match *other {
ClientBody::Binary(ref b2) => b == b2,
_ => false,
},
ClientBody::Streaming(_) | ClientBody::Actor(_) => false,
}
}
}
impl fmt::Debug for ClientBody {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
ClientBody::Empty => write!(f, "ClientBody::Empty"),
ClientBody::Binary(ref b) => write!(f, "ClientBody::Binary({:?})", b),
ClientBody::Streaming(_) => write!(f, "ClientBody::Streaming(_)"),
ClientBody::Actor(_) => write!(f, "ClientBody::Actor(_)"),
}
}
}
impl<T> From<T> for ClientBody
where
T: Into<Binary>,
{
fn from(b: T) -> ClientBody {
ClientBody::Binary(b.into())
}
}

View file

@ -24,7 +24,6 @@
//! ); //! );
//! } //! }
//! ``` //! ```
mod body;
mod connector; mod connector;
mod parser; mod parser;
mod pipeline; mod pipeline;
@ -32,7 +31,6 @@ mod request;
mod response; mod response;
mod writer; mod writer;
pub use self::body::{ClientBody, ClientBodyStream};
pub use self::connector::{ pub use self::connector::{
ClientConnector, ClientConnectorError, ClientConnectorStats, Connect, Connection, ClientConnector, ClientConnectorError, ClientConnectorStats, Connect, Connection,
Pause, Resume, Pause, Resume,
@ -71,7 +69,9 @@ impl ResponseError for SendRequestError {
/// use actix_web::client; /// use actix_web::client;
/// ///
/// fn main() { /// fn main() {
/// tokio::run( /// let mut sys = actix_web::actix::System::new("test");
///
/// sys.block_on(
/// client::get("http://www.rust-lang.org") // <- Create request builder /// client::get("http://www.rust-lang.org") // <- Create request builder
/// .header("User-Agent", "Actix-web") /// .header("User-Agent", "Actix-web")
/// .finish().unwrap() /// .finish().unwrap()
@ -79,7 +79,6 @@ impl ResponseError for SendRequestError {
/// .map_err(|_| ()) /// .map_err(|_| ())
/// .and_then(|response| { // <- server http response /// .and_then(|response| { // <- server http response
/// println!("Response: {:?}", response); /// println!("Response: {:?}", response);
/// # process::exit(0);
/// Ok(()) /// Ok(())
/// }), /// }),
/// ); /// );

View file

@ -9,10 +9,11 @@ use tokio_timer::Delay;
use actix::{Addr, Request, SystemService}; use actix::{Addr, Request, SystemService};
use super::{ use super::{
ClientBody, ClientBodyStream, ClientConnector, ClientConnectorError, ClientRequest, ClientConnector, ClientConnectorError, ClientRequest, ClientResponse, Connect,
ClientResponse, Connect, Connection, HttpClientWriter, HttpResponseParser, Connection, HttpClientWriter, HttpResponseParser, HttpResponseParserError,
HttpResponseParserError,
}; };
use body::{Body, BodyStream};
use context::{ActorHttpContext, Frame};
use error::Error; use error::Error;
use error::PayloadError; use error::PayloadError;
use header::ContentEncoding; use header::ContentEncoding;
@ -177,9 +178,9 @@ impl Future for SendRequest {
let mut writer = HttpClientWriter::new(); let mut writer = HttpClientWriter::new();
writer.start(&mut self.req)?; writer.start(&mut self.req)?;
let body = match self.req.replace_body(ClientBody::Empty) { let body = match self.req.replace_body(Body::Empty) {
ClientBody::Streaming(stream) => IoBody::Payload(stream), Body::Streaming(stream) => IoBody::Payload(stream),
ClientBody::Actor(_) => panic!("Client actor is not supported"), Body::Actor(ctx) => IoBody::Actor(ctx),
_ => IoBody::Done, _ => IoBody::Done,
}; };
@ -245,7 +246,8 @@ pub(crate) struct Pipeline {
} }
enum IoBody { enum IoBody {
Payload(ClientBodyStream), Payload(BodyStream),
Actor(Box<ActorHttpContext>),
Done, Done,
} }
@ -405,24 +407,67 @@ impl Pipeline {
let mut done = false; let mut done = false;
if self.drain.is_none() && self.write_state != RunningState::Paused { if self.drain.is_none() && self.write_state != RunningState::Paused {
loop { 'outter: loop {
let result = match mem::replace(&mut self.body, IoBody::Done) { let result = match mem::replace(&mut self.body, IoBody::Done) {
IoBody::Payload(mut stream) => match stream.poll()? { IoBody::Payload(mut body) => match body.poll()? {
Async::Ready(None) => { Async::Ready(None) => {
self.writer.write_eof()?; self.writer.write_eof()?;
self.body_completed = true; self.body_completed = true;
break; break;
} }
Async::Ready(Some(chunk)) => { Async::Ready(Some(chunk)) => {
self.body = IoBody::Payload(stream); self.body = IoBody::Payload(body);
self.writer.write(chunk.as_ref())? self.writer.write(chunk.as_ref())?
} }
Async::NotReady => { Async::NotReady => {
done = true; done = true;
self.body = IoBody::Payload(stream); self.body = IoBody::Payload(body);
break; break;
} }
}, },
IoBody::Actor(mut ctx) => {
if self.disconnected {
ctx.disconnected();
}
match ctx.poll()? {
Async::Ready(Some(vec)) => {
if vec.is_empty() {
self.body = IoBody::Actor(ctx);
break;
}
let mut res = None;
for frame in vec {
match frame {
Frame::Chunk(None) => {
self.body_completed = true;
self.writer.write_eof()?;
break 'outter;
}
Frame::Chunk(Some(chunk)) => {
res =
Some(self.writer.write(chunk.as_ref())?)
}
Frame::Drain(fut) => self.drain = Some(fut),
}
}
self.body = IoBody::Actor(ctx);
if self.drain.is_some() {
self.write_state.resume();
break;
}
res.unwrap()
}
Async::Ready(None) => {
done = true;
break;
}
Async::NotReady => {
done = true;
self.body = IoBody::Actor(ctx);
break;
}
}
}
IoBody::Done => { IoBody::Done => {
self.body_completed = true; self.body_completed = true;
done = true; done = true;

View file

@ -13,9 +13,9 @@ use serde_json;
use serde_urlencoded; use serde_urlencoded;
use url::Url; use url::Url;
use super::body::ClientBody;
use super::connector::{ClientConnector, Connection}; use super::connector::{ClientConnector, Connection};
use super::pipeline::SendRequest; use super::pipeline::SendRequest;
use body::Body;
use error::Error; use error::Error;
use header::{ContentEncoding, Header, IntoHeaderValue}; use header::{ContentEncoding, Header, IntoHeaderValue};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
@ -34,7 +34,9 @@ use httprequest::HttpRequest;
/// use actix_web::client::ClientRequest; /// use actix_web::client::ClientRequest;
/// ///
/// fn main() { /// fn main() {
/// tokio::run( /// let mut sys = actix_web::actix::System::new("test");
///
/// sys.block_on(
/// ClientRequest::get("http://www.rust-lang.org") // <- Create request builder /// ClientRequest::get("http://www.rust-lang.org") // <- Create request builder
/// .header("User-Agent", "Actix-web") /// .header("User-Agent", "Actix-web")
/// .finish().unwrap() /// .finish().unwrap()
@ -42,7 +44,6 @@ use httprequest::HttpRequest;
/// .map_err(|_| ()) /// .map_err(|_| ())
/// .and_then(|response| { // <- server http response /// .and_then(|response| { // <- server http response
/// println!("Response: {:?}", response); /// println!("Response: {:?}", response);
/// # process::exit(0);
/// Ok(()) /// Ok(())
/// }), /// }),
/// ); /// );
@ -53,7 +54,7 @@ pub struct ClientRequest {
method: Method, method: Method,
version: Version, version: Version,
headers: HeaderMap, headers: HeaderMap,
body: ClientBody, body: Body,
chunked: bool, chunked: bool,
upgrade: bool, upgrade: bool,
timeout: Option<Duration>, timeout: Option<Duration>,
@ -76,7 +77,7 @@ impl Default for ClientRequest {
method: Method::default(), method: Method::default(),
version: Version::HTTP_11, version: Version::HTTP_11,
headers: HeaderMap::with_capacity(16), headers: HeaderMap::with_capacity(16),
body: ClientBody::Empty, body: Body::Empty,
chunked: false, chunked: false,
upgrade: false, upgrade: false,
timeout: None, timeout: None,
@ -220,17 +221,17 @@ impl ClientRequest {
/// Get body of this response /// Get body of this response
#[inline] #[inline]
pub fn body(&self) -> &ClientBody { pub fn body(&self) -> &Body {
&self.body &self.body
} }
/// Set a body /// Set a body
pub fn set_body<B: Into<ClientBody>>(&mut self, body: B) { pub fn set_body<B: Into<Body>>(&mut self, body: B) {
self.body = body.into(); self.body = body.into();
} }
/// Extract body, replace it with `Empty` /// Extract body, replace it with `Empty`
pub(crate) fn replace_body(&mut self, body: ClientBody) -> ClientBody { pub(crate) fn replace_body(&mut self, body: Body) -> Body {
mem::replace(&mut self.body, body) mem::replace(&mut self.body, body)
} }
@ -585,9 +586,7 @@ impl ClientRequestBuilder {
/// Set a body and generate `ClientRequest`. /// Set a body and generate `ClientRequest`.
/// ///
/// `ClientRequestBuilder` can not be used after this call. /// `ClientRequestBuilder` can not be used after this call.
pub fn body<B: Into<ClientBody>>( pub fn body<B: Into<Body>>(&mut self, body: B) -> Result<ClientRequest, Error> {
&mut self, body: B,
) -> Result<ClientRequest, Error> {
if let Some(e) = self.err.take() { if let Some(e) = self.err.take() {
return Err(e.into()); return Err(e.into());
} }
@ -683,19 +682,17 @@ impl ClientRequestBuilder {
/// `ClientRequestBuilder` can not be used after this call. /// `ClientRequestBuilder` can not be used after this call.
pub fn streaming<S, E>(&mut self, stream: S) -> Result<ClientRequest, Error> pub fn streaming<S, E>(&mut self, stream: S) -> Result<ClientRequest, Error>
where where
S: Stream<Item = Bytes, Error = E> + Send + 'static, S: Stream<Item = Bytes, Error = E> + 'static,
E: Into<Error>, E: Into<Error>,
{ {
self.body(ClientBody::Streaming(Box::new( self.body(Body::Streaming(Box::new(stream.map_err(|e| e.into()))))
stream.map_err(|e| e.into()),
)))
} }
/// Set an empty body and generate `ClientRequest` /// Set an empty body and generate `ClientRequest`
/// ///
/// `ClientRequestBuilder` can not be used after this call. /// `ClientRequestBuilder` can not be used after this call.
pub fn finish(&mut self) -> Result<ClientRequest, Error> { pub fn finish(&mut self) -> Result<ClientRequest, Error> {
self.body(ClientBody::Empty) self.body(Body::Empty)
} }
/// This method construct new `ClientRequestBuilder` /// This method construct new `ClientRequestBuilder`

View file

@ -19,12 +19,12 @@ use http::{HttpTryFrom, Version};
use time::{self, Duration}; use time::{self, Duration};
use tokio_io::AsyncWrite; use tokio_io::AsyncWrite;
use body::Binary; use body::{Binary, Body};
use header::ContentEncoding; use header::ContentEncoding;
use server::encoding::{ContentEncoder, TransferEncoding}; use server::encoding::{ContentEncoder, TransferEncoding};
use server::WriterState; use server::WriterState;
use client::{ClientBody, ClientRequest}; use client::ClientRequest;
const AVERAGE_HEADER_SIZE: usize = 30; const AVERAGE_HEADER_SIZE: usize = 30;
@ -133,7 +133,7 @@ impl HttpClientWriter {
).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; ).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// write headers // write headers
if let ClientBody::Binary(ref bytes) = *msg.body() { if let Body::Binary(ref bytes) = *msg.body() {
self.buffer self.buffer
.reserve(msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len()); .reserve(msg.headers().len() * AVERAGE_HEADER_SIZE + bytes.len());
} else { } else {
@ -162,7 +162,7 @@ impl HttpClientWriter {
self.headers_size = self.buffer.len() as u32; self.headers_size = self.buffer.len() as u32;
if msg.body().is_binary() { if msg.body().is_binary() {
if let ClientBody::Binary(bytes) = msg.replace_body(ClientBody::Empty) { if let Body::Binary(bytes) = msg.replace_body(Body::Empty) {
self.written += bytes.len() as u64; self.written += bytes.len() as u64;
self.encoder.write(bytes.as_ref())?; self.encoder.write(bytes.as_ref())?;
} }
@ -223,15 +223,15 @@ impl HttpClientWriter {
fn content_encoder(buf: &mut BytesMut, req: &mut ClientRequest) -> ContentEncoder { fn content_encoder(buf: &mut BytesMut, req: &mut ClientRequest) -> ContentEncoder {
let version = req.version(); let version = req.version();
let mut body = req.replace_body(ClientBody::Empty); let mut body = req.replace_body(Body::Empty);
let mut encoding = req.content_encoding(); let mut encoding = req.content_encoding();
let mut transfer = match body { let mut transfer = match body {
ClientBody::Empty => { Body::Empty => {
req.headers_mut().remove(CONTENT_LENGTH); req.headers_mut().remove(CONTENT_LENGTH);
TransferEncoding::length(0) TransferEncoding::length(0)
} }
ClientBody::Binary(ref mut bytes) => { Body::Binary(ref mut bytes) => {
if encoding.is_compression() { if encoding.is_compression() {
let mut tmp = BytesMut::new(); let mut tmp = BytesMut::new();
let mut transfer = TransferEncoding::eof(); let mut transfer = TransferEncoding::eof();
@ -270,7 +270,7 @@ fn content_encoder(buf: &mut BytesMut, req: &mut ClientRequest) -> ContentEncode
.insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap()); .insert(CONTENT_LENGTH, HeaderValue::try_from(b.freeze()).unwrap());
TransferEncoding::eof() TransferEncoding::eof()
} }
ClientBody::Streaming(_) | ClientBody::Actor(_) => { Body::Streaming(_) | Body::Actor(_) => {
if req.upgrade() { if req.upgrade() {
if version == Version::HTTP_2 { if version == Version::HTTP_2 {
error!("Connection upgrade is forbidden for HTTP/2"); error!("Connection upgrade is forbidden for HTTP/2");

View file

@ -16,14 +16,14 @@ use sha1::Sha1;
use actix::{Addr, SystemService}; use actix::{Addr, SystemService};
use body::Binary; use body::{Binary, Body};
use error::{Error, UrlParseError}; use error::{Error, UrlParseError};
use header::IntoHeaderValue; use header::IntoHeaderValue;
use httpmessage::HttpMessage; use httpmessage::HttpMessage;
use payload::PayloadHelper; use payload::PayloadHelper;
use client::{ use client::{
ClientBody, ClientConnector, ClientRequest, ClientRequestBuilder, ClientResponse, ClientConnector, ClientRequest, ClientRequestBuilder, ClientResponse,
HttpResponseParserError, SendRequest, SendRequestError, HttpResponseParserError, SendRequest, SendRequestError,
}; };
@ -297,7 +297,7 @@ impl ClientHandshake {
); );
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
request.set_body(ClientBody::Streaming(Box::new(rx.map_err(|_| { request.set_body(Body::Streaming(Box::new(rx.map_err(|_| {
io::Error::new(io::ErrorKind::Other, "disconnected").into() io::Error::new(io::ErrorKind::Other, "disconnected").into()
})))); }))));

View file

@ -343,10 +343,7 @@ fn test_client_streaming_explicit() {
let body = once(Ok(Bytes::from_static(STR.as_ref()))); let body = once(Ok(Bytes::from_static(STR.as_ref())));
let request = srv let request = srv.get().body(Body::Streaming(Box::new(body))).unwrap();
.get()
.body(client::ClientBody::Streaming(Box::new(body)))
.unwrap();
let response = srv.execute(request.send()).unwrap(); let response = srv.execute(request.send()).unwrap();
assert!(response.status().is_success()); assert!(response.status().is_success());
@ -446,10 +443,7 @@ fn test_default_headers() {
"\"" "\""
))); )));
let request_override = srv.get() let request_override = srv.get().header("User-Agent", "test").finish().unwrap();
.header("User-Agent", "test")
.finish()
.unwrap();
let repr_override = format!("{:?}", request_override); let repr_override = format!("{:?}", request_override);
assert!(repr_override.contains("\"user-agent\": \"test\"")); assert!(repr_override.contains("\"user-agent\": \"test\""));
assert!(!repr_override.contains(concat!( assert!(!repr_override.contains(concat!(