1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-10 17:29:36 +00:00

add keep-alive dispatcher tests

This commit is contained in:
Rob Ede 2022-01-27 06:29:46 +00:00
parent 14a4f325d3
commit 3ae4f0a629
No known key found for this signature in database
GPG key ID: 97C636207D3EF933
2 changed files with 166 additions and 20 deletions

View file

@ -1,5 +1,6 @@
use std::{future::Future, str, task::Poll};
use std::{future::Future, str, task::Poll, time::Duration};
use actix_rt::time::sleep;
use actix_service::fn_service;
use actix_utils::future::{ready, Ready};
use bytes::Bytes;
@ -63,23 +64,22 @@ fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, E
}
#[actix_rt::test]
#[ignore]
async fn test_keep_alive() {
async fn test_keep_alive_timeout() {
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
lazy(|cx| {
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
assert!(
@ -90,7 +90,7 @@ async fn test_keep_alive() {
// polls: initial
assert_eq!(h1.poll_count, 1);
let mut res = buf.write_buf_slice_mut();
let mut res = buf.take_write_buf().to_vec();
stabilize_date_header(&mut res);
let res = &res[..];
@ -105,8 +105,149 @@ async fn test_keep_alive() {
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
})
.await;
// sleep slightly longer than keep-alive timeout
sleep(Duration::from_millis(1100)).await;
lazy(|cx| {
assert!(
h1.as_mut().poll(cx).is_ready(),
"keep-alive should have resolved",
);
// polls: initial => keep-alive wake-up shutdown
assert_eq!(h1.poll_count, 2);
if let DispatcherStateProj::Normal { inner } = h1.project().inner.project() {
// connection closed
assert!(inner.flags.contains(Flags::SHUTDOWN));
assert!(inner.flags.contains(Flags::WRITE_DISCONNECT));
// and nothing added to write buffer
assert!(buf.write_buf_slice().is_empty());
}
})
.await;
}
#[actix_rt::test]
async fn test_keep_alive_follow_up_req() {
let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
assert!(
h1.as_mut().poll(cx).is_pending(),
"keep-alive should prevent poll from resolving"
);
// polls: initial
assert_eq!(h1.poll_count, 1);
let mut res = buf.take_write_buf().to_vec();
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 5\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/abcd\
";
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
})
.await;
// sleep for less than KA timeout
sleep(Duration::from_millis(200)).await;
lazy(|cx| {
assert!(
h1.as_mut().poll(cx).is_pending(),
"keep-alive should not have resolved dispatcher yet",
);
// polls: initial => manual
assert_eq!(h1.poll_count, 2);
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
// connection not closed
assert!(!inner.flags.contains(Flags::SHUTDOWN));
assert!(!inner.flags.contains(Flags::WRITE_DISCONNECT));
// and nothing added to write buffer
assert!(buf.write_buf_slice().is_empty());
}
})
.await;
lazy(|cx| {
buf.extend_read_buf(
"\
GET /efg HTTP/1.1\r\n\
Connection: close\r\n\
\r\n\r\n",
);
assert!(
h1.as_mut().poll(cx).is_ready(),
"connection close header should override keep-alive setting",
);
// polls: initial => manual => follow-up req => shutdown
assert_eq!(h1.poll_count, 4);
if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() {
// connection closed
assert!(inner.flags.contains(Flags::SHUTDOWN));
assert!(!inner.flags.contains(Flags::WRITE_DISCONNECT));
}
let mut res = buf.take_write_buf().to_vec();
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 4\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/efg\
";
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);

View file

@ -212,6 +212,11 @@ impl TestBuffer {
RefMut::map(self.write_buf.borrow_mut(), |b| b.as_mut())
}
#[allow(dead_code)]
pub(crate) fn take_write_buf(&self) -> Bytes {
self.write_buf.borrow_mut().split().freeze()
}
/// Add data to read buffer.
pub fn extend_read_buf<T: AsRef<[u8]>>(&mut self, data: T) {
self.read_buf.borrow_mut().extend_from_slice(data.as_ref())