1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-05 14:58:44 +00:00

use server keep-alive timer as slow request timer

This commit is contained in:
Nikolay Kim 2018-09-15 09:55:38 -07:00
parent c3f8b5cf22
commit d65c72b44d
9 changed files with 81 additions and 30 deletions

View file

@ -1,5 +1,12 @@
# Changes # Changes
## [0.7.8] - 2018-09-xx
### Added
* Use server `Keep-Alive` setting as slow request timeout.
## [0.7.7] - 2018-09-11 ## [0.7.7] - 2018-09-11
### Fixed ### Fixed

View file

@ -821,11 +821,9 @@ mod tests {
scope scope
.route("/path1", Method::GET, |_: HttpRequest<_>| { .route("/path1", Method::GET, |_: HttpRequest<_>| {
HttpResponse::Ok() HttpResponse::Ok()
}).route( }).route("/path1", Method::DELETE, |_: HttpRequest<_>| {
"/path1", HttpResponse::Ok()
Method::DELETE, })
|_: HttpRequest<_>| HttpResponse::Ok(),
)
}).finish(); }).finish();
let req = TestRequest::with_uri("/app/path1").request(); let req = TestRequest::with_uri("/app/path1").request();

View file

@ -451,10 +451,13 @@ impl Accept {
Delay::new( Delay::new(
Instant::now() + Duration::from_millis(510), Instant::now() + Duration::from_millis(510),
).map_err(|_| ()) ).map_err(|_| ())
.and_then(move |_| { .and_then(
let _ = r.set_readiness(mio::Ready::readable()); move |_| {
let _ =
r.set_readiness(mio::Ready::readable());
Ok(()) Ok(())
}), },
),
); );
Ok(()) Ok(())
}, },

View file

@ -5,6 +5,7 @@ use std::{io, ptr, time};
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use futures::{Async, Future, Poll}; use futures::{Async, Future, Poll};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use super::settings::WorkerSettings; use super::settings::WorkerSettings;
use super::{h1, h2, ConnectionTag, HttpHandler, IoStream}; use super::{h1, h2, ConnectionTag, HttpHandler, IoStream};
@ -30,6 +31,7 @@ where
{ {
proto: Option<HttpProtocol<T, H>>, proto: Option<HttpProtocol<T, H>>,
node: Option<Node<HttpChannel<T, H>>>, node: Option<Node<HttpChannel<T, H>>>,
ka_timeout: Option<Delay>,
_tag: ConnectionTag, _tag: ConnectionTag,
} }
@ -42,9 +44,11 @@ where
settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>, settings: Rc<WorkerSettings<H>>, io: T, peer: Option<SocketAddr>,
) -> HttpChannel<T, H> { ) -> HttpChannel<T, H> {
let _tag = settings.connection(); let _tag = settings.connection();
let ka_timeout = settings.keep_alive_timer();
HttpChannel { HttpChannel {
_tag, _tag,
ka_timeout,
node: None, node: None,
proto: Some(HttpProtocol::Unknown( proto: Some(HttpProtocol::Unknown(
settings, settings,
@ -77,6 +81,21 @@ where
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// keep-alive timer
if let Some(ref mut timer) = self.ka_timeout {
match timer.poll() {
Ok(Async::Ready(_)) => {
trace!("Slow request timed out, close connection");
if let Some(n) = self.node.as_mut() {
n.remove()
};
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => (),
Err(_) => panic!("Something is really wrong"),
}
}
if self.node.is_none() { if self.node.is_none() {
let el = self as *mut _; let el = self as *mut _;
self.node = Some(Node::new(el)); self.node = Some(Node::new(el));
@ -161,7 +180,12 @@ where
match kind { match kind {
ProtocolKind::Http1 => { ProtocolKind::Http1 => {
self.proto = Some(HttpProtocol::H1(h1::Http1::new( self.proto = Some(HttpProtocol::H1(h1::Http1::new(
settings, io, addr, buf, is_eof, settings,
io,
addr,
buf,
is_eof,
self.ka_timeout.take(),
))); )));
return self.poll(); return self.poll();
} }
@ -171,6 +195,7 @@ where
io, io,
addr, addr,
buf.freeze(), buf.freeze(),
self.ka_timeout.take(),
))); )));
return self.poll(); return self.poll();
} }

View file

@ -91,7 +91,7 @@ where
{ {
pub fn new( pub fn new(
settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>, settings: Rc<WorkerSettings<H>>, stream: T, addr: Option<SocketAddr>,
buf: BytesMut, is_eof: bool, buf: BytesMut, is_eof: bool, keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
Http1 { Http1 {
flags: if is_eof { flags: if is_eof {
@ -103,10 +103,10 @@ where
decoder: H1Decoder::new(), decoder: H1Decoder::new(),
payload: None, payload: None,
tasks: VecDeque::new(), tasks: VecDeque::new(),
keepalive_timer: None,
addr, addr,
buf, buf,
settings, settings,
keepalive_timer,
} }
} }
@ -364,7 +364,7 @@ where
if self.keepalive_timer.is_none() && keep_alive > 0 { if self.keepalive_timer.is_none() && keep_alive > 0 {
trace!("Start keep-alive timer"); trace!("Start keep-alive timer");
let mut timer = let mut timer =
Delay::new(Instant::now() + Duration::new(keep_alive, 0)); Delay::new(Instant::now() + Duration::from_secs(keep_alive));
// register timer // register timer
let _ = timer.poll(); let _ = timer.poll();
self.keepalive_timer = Some(timer); self.keepalive_timer = Some(timer);
@ -632,7 +632,7 @@ mod tests {
let readbuf = BytesMut::new(); let readbuf = BytesMut::new();
let settings = Rc::new(wrk_settings()); let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
h1.poll_io(); h1.poll_io();
h1.poll_io(); h1.poll_io();
assert_eq!(h1.tasks.len(), 1); assert_eq!(h1.tasks.len(), 1);
@ -645,7 +645,7 @@ mod tests {
BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..])); BytesMut::from(Vec::<u8>::from(&b"GET /test HTTP/1.1\r\n\r\n"[..]));
let settings = Rc::new(wrk_settings()); let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, true, None);
h1.poll_io(); h1.poll_io();
assert_eq!(h1.tasks.len(), 1); assert_eq!(h1.tasks.len(), 1);
} }
@ -656,7 +656,7 @@ mod tests {
let readbuf = BytesMut::new(); let readbuf = BytesMut::new();
let settings = Rc::new(wrk_settings()); let settings = Rc::new(wrk_settings());
let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf, false, None);
h1.poll_io(); h1.poll_io();
h1.poll_io(); h1.poll_io();
assert!(h1.flags.contains(Flags::ERROR)); assert!(h1.flags.contains(Flags::ERROR));

View file

@ -59,6 +59,7 @@ where
{ {
pub fn new( pub fn new(
settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes, settings: Rc<WorkerSettings<H>>, io: T, addr: Option<SocketAddr>, buf: Bytes,
keepalive_timer: Option<Delay>,
) -> Self { ) -> Self {
let extensions = io.extensions(); let extensions = io.extensions();
Http2 { Http2 {
@ -68,10 +69,10 @@ where
unread: if buf.is_empty() { None } else { Some(buf) }, unread: if buf.is_empty() { None } else { Some(buf) },
inner: io, inner: io,
})), })),
keepalive_timer: None,
addr, addr,
settings, settings,
extensions, extensions,
keepalive_timer,
} }
} }

View file

@ -13,7 +13,7 @@ use http::StatusCode;
use lazycell::LazyCell; use lazycell::LazyCell;
use parking_lot::Mutex; use parking_lot::Mutex;
use time; use time;
use tokio_timer::Interval; use tokio_timer::{Delay, Interval};
use super::channel::Node; use super::channel::Node;
use super::message::{Request, RequestPool}; use super::message::{Request, RequestPool};
@ -197,6 +197,16 @@ impl<H> WorkerSettings<H> {
&self.h &self.h
} }
pub fn keep_alive_timer(&self) -> Option<Delay> {
if self.keep_alive != 0 {
Some(Delay::new(
Instant::now() + Duration::from_secs(self.keep_alive),
))
} else {
None
}
}
pub fn keep_alive(&self) -> u64 { pub fn keep_alive(&self) -> u64 {
self.keep_alive self.keep_alive
} }

View file

@ -120,6 +120,7 @@ impl TestServer {
HttpServer::new(factory) HttpServer::new(factory)
.disable_signals() .disable_signals()
.listen(tcp) .listen(tcp)
.keep_alive(5)
.start(); .start();
tx.send((System::current(), local_addr, TestServer::get_conn())) tx.send((System::current(), local_addr, TestServer::get_conn()))
@ -328,6 +329,7 @@ impl<S: 'static> TestServerBuilder<S> {
config(&mut app); config(&mut app);
vec![app] vec![app]
}).workers(1) }).workers(1)
.keep_alive(5)
.disable_signals(); .disable_signals();
tx.send((System::current(), addr, TestServer::get_conn())) tx.send((System::current(), addr, TestServer::get_conn()))

View file

@ -407,24 +407,29 @@ fn test_client_cookie_handling() {
let cookie2 = cookie2b.clone(); let cookie2 = cookie2b.clone();
app.handler(move |req: &HttpRequest| { app.handler(move |req: &HttpRequest| {
// Check cookies were sent correctly // Check cookies were sent correctly
req.cookie("cookie1").ok_or_else(err) req.cookie("cookie1")
.and_then(|c1| if c1.value() == "value1" { .ok_or_else(err)
.and_then(|c1| {
if c1.value() == "value1" {
Ok(()) Ok(())
} else { } else {
Err(err()) Err(err())
}) }
.and_then(|()| req.cookie("cookie2").ok_or_else(err)) }).and_then(|()| req.cookie("cookie2").ok_or_else(err))
.and_then(|c2| if c2.value() == "value2" { .and_then(|c2| {
if c2.value() == "value2" {
Ok(()) Ok(())
} else { } else {
Err(err()) Err(err())
}
}) })
// Send some cookies back // Send some cookies back
.map(|_| HttpResponse::Ok() .map(|_| {
HttpResponse::Ok()
.cookie(cookie1.clone()) .cookie(cookie1.clone())
.cookie(cookie2.clone()) .cookie(cookie2.clone())
.finish() .finish()
) })
}) })
}); });