1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-26 19:41:12 +00:00

add client response timeout

This commit is contained in:
Nikolay Kim 2018-03-06 17:04:48 -08:00
parent 5c88441cd7
commit 57a1d68f89
4 changed files with 70 additions and 30 deletions

View file

@ -11,7 +11,8 @@
* Add `ResponseError` impl for `SendRequestError`. * Add `ResponseError` impl for `SendRequestError`.
This improves ergonomics of http client. This improves ergonomics of http client.
* Allow connection timeout to be set * Allow client connection timeout to be set #108
## 0.4.4 (2018-03-04) ## 0.4.4 (2018-03-04)

View file

@ -28,7 +28,7 @@ use server::IoStream;
/// with connection request. /// with connection request.
pub struct Connect { pub struct Connect {
pub uri: Uri, pub uri: Uri,
pub connection_timeout: Duration pub conn_timeout: Duration,
} }
impl Connect { impl Connect {
@ -36,7 +36,7 @@ impl Connect {
pub fn new<U>(uri: U) -> Result<Connect, HttpError> where Uri: HttpTryFrom<U> { pub fn new<U>(uri: U) -> Result<Connect, HttpError> where Uri: HttpTryFrom<U> {
Ok(Connect { Ok(Connect {
uri: Uri::try_from(uri).map_err(|e| e.into())?, uri: Uri::try_from(uri).map_err(|e| e.into())?,
connection_timeout: Duration::from_secs(1) conn_timeout: Duration::from_secs(1)
}) })
} }
} }
@ -167,7 +167,7 @@ impl Handler<Connect> for ClientConnector {
fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
let uri = &msg.uri; let uri = &msg.uri;
let connection_timeout = msg.connection_timeout; let conn_timeout = msg.conn_timeout;
// host name is required // host name is required
if uri.host().is_none() { if uri.host().is_none() {
@ -193,7 +193,8 @@ impl Handler<Connect> for ClientConnector {
ActorResponse::async( ActorResponse::async(
Connector::from_registry() Connector::from_registry()
.send(ResolveConnect::host_and_port(&host, port).timeout(connection_timeout)) .send(ResolveConnect::host_and_port(&host, port)
.timeout(conn_timeout))
.into_actor(self) .into_actor(self)
.map_err(|_, _, _| ClientConnectorError::Disconnected) .map_err(|_, _, _| ClientConnectorError::Disconnected)
.and_then(move |res, _act, _| { .and_then(move |res, _act, _| {

View file

@ -1,8 +1,10 @@
use std::{io, mem}; use std::{io, mem};
use std::time::Duration;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use http::header::CONTENT_ENCODING; use http::header::CONTENT_ENCODING;
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use futures::unsync::oneshot; use futures::unsync::oneshot;
use tokio_core::reactor::Timeout;
use actix::prelude::*; use actix::prelude::*;
@ -23,6 +25,9 @@ use super::{HttpResponseParser, HttpResponseParserError};
/// A set of errors that can occur during sending request and reading response /// A set of errors that can occur during sending request and reading response
#[derive(Fail, Debug)] #[derive(Fail, Debug)]
pub enum SendRequestError { pub enum SendRequestError {
/// Response took too long
#[fail(display = "Timeout out while waiting for response")]
Timeout,
/// Failed to connect to host /// Failed to connect to host
#[fail(display="Failed to connect to host: {}", _0)] #[fail(display="Failed to connect to host: {}", _0)]
Connector(#[cause] ClientConnectorError), Connector(#[cause] ClientConnectorError),
@ -40,6 +45,15 @@ impl From<io::Error> for SendRequestError {
} }
} }
impl From<ClientConnectorError> for SendRequestError {
fn from(err: ClientConnectorError) -> SendRequestError {
match err {
ClientConnectorError::Timeout => SendRequestError::Timeout,
_ => SendRequestError::Connector(err),
}
}
}
enum State { enum State {
New, New,
Connect(actix::dev::Request<Unsync, ClientConnector, Connect>), Connect(actix::dev::Request<Unsync, ClientConnector, Connect>),
@ -54,6 +68,8 @@ pub struct SendRequest {
req: ClientRequest, req: ClientRequest,
state: State, state: State,
conn: Addr<Unsync, ClientConnector>, conn: Addr<Unsync, ClientConnector>,
conn_timeout: Duration,
timeout: Option<Timeout>,
} }
impl SendRequest { impl SendRequest {
@ -64,15 +80,53 @@ impl SendRequest {
pub(crate) fn with_connector(req: ClientRequest, conn: Addr<Unsync, ClientConnector>) pub(crate) fn with_connector(req: ClientRequest, conn: Addr<Unsync, ClientConnector>)
-> SendRequest -> SendRequest
{ {
SendRequest{req, conn, state: State::New} SendRequest{req, conn,
state: State::New,
timeout: None,
conn_timeout: Duration::from_secs(1)
}
} }
pub(crate) fn with_connection(req: ClientRequest, conn: Connection) -> SendRequest pub(crate) fn with_connection(req: ClientRequest, conn: Connection) -> SendRequest
{ {
SendRequest{ SendRequest{req,
req,
state: State::Connection(conn), state: State::Connection(conn),
conn: ClientConnector::from_registry()} conn: ClientConnector::from_registry(),
timeout: None,
conn_timeout: Duration::from_secs(1),
}
}
/// Set request timeout
///
/// Request timeout is a total time before response should be received.
/// Default value is 5 seconds.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(Timeout::new(timeout, Arbiter::handle()).unwrap());
self
}
/// Set connection timeout
///
/// Connection timeout includes resolving hostname and actual connection to
/// the host.
/// Default value is 1 second.
pub fn conn_timeout(mut self, timeout: Duration) -> Self {
self.conn_timeout = timeout;
self
}
fn poll_timeout(&mut self) -> Poll<(), SendRequestError> {
if self.timeout.is_none() {
self.timeout = Some(Timeout::new(
Duration::from_secs(5), Arbiter::handle()).unwrap());
}
match self.timeout.as_mut().unwrap().poll() {
Ok(Async::Ready(())) => Err(SendRequestError::Timeout),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => unreachable!()
}
} }
} }
@ -81,6 +135,8 @@ impl Future for SendRequest {
type Error = SendRequestError; type Error = SendRequestError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_timeout()?;
loop { loop {
let state = mem::replace(&mut self.state, State::None); let state = mem::replace(&mut self.state, State::None);
@ -88,7 +144,7 @@ impl Future for SendRequest {
State::New => State::New =>
self.state = State::Connect(self.conn.send(Connect { self.state = State::Connect(self.conn.send(Connect {
uri: self.req.uri().clone(), uri: self.req.uri().clone(),
connection_timeout: self.req.connection_timeout() conn_timeout: self.conn_timeout,
})), })),
State::Connect(mut conn) => match conn.poll() { State::Connect(mut conn) => match conn.poll() {
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
@ -99,7 +155,7 @@ impl Future for SendRequest {
Ok(stream) => { Ok(stream) => {
self.state = State::Connection(stream) self.state = State::Connection(stream)
}, },
Err(err) => return Err(SendRequestError::Connector(err)), Err(err) => return Err(err.into()),
}, },
Err(_) => return Err(SendRequestError::Connector( Err(_) => return Err(SendRequestError::Connector(
ClientConnectorError::Disconnected)) ClientConnectorError::Disconnected))

View file

@ -1,6 +1,5 @@
use std::{fmt, mem}; use std::{fmt, mem};
use std::io::Write; use std::io::Write;
use std::time::Duration;
use actix::{Addr, Unsync}; use actix::{Addr, Unsync};
use cookie::{Cookie, CookieJar}; use cookie::{Cookie, CookieJar};
@ -29,8 +28,6 @@ pub struct ClientRequest {
response_decompress: bool, response_decompress: bool,
buffer_capacity: Option<(usize, usize)>, buffer_capacity: Option<(usize, usize)>,
conn: ConnectionType, conn: ConnectionType,
connection_timeout: Duration
} }
enum ConnectionType { enum ConnectionType {
@ -54,7 +51,6 @@ impl Default for ClientRequest {
response_decompress: true, response_decompress: true,
buffer_capacity: None, buffer_capacity: None,
conn: ConnectionType::Default, conn: ConnectionType::Default,
connection_timeout: Duration::from_secs(1)
} }
} }
} }
@ -115,11 +111,6 @@ impl ClientRequest {
&self.uri &self.uri
} }
#[inline]
pub fn connection_timeout(&self) -> Duration {
self.connection_timeout
}
/// Set client request uri /// Set client request uri
#[inline] #[inline]
pub fn set_uri(&mut self, uri: Uri) { pub fn set_uri(&mut self, uri: Uri) {
@ -406,15 +397,6 @@ impl ClientRequestBuilder {
self self
} }
/// Set connection timeout
#[inline]
pub fn connection_timeout(&mut self, connection_timeout: Duration) -> &mut Self {
if let Some(ref mut request) = self.request {
request.connection_timeout = connection_timeout;
}
self
}
/// Set request's content type /// Set request's content type
#[inline] #[inline]
pub fn content_type<V>(&mut self, value: V) -> &mut Self pub fn content_type<V>(&mut self, value: V) -> &mut Self