1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-03 05:48:45 +00:00

refactor DateService (#1983)

This commit is contained in:
fakeshadow 2021-02-12 13:52:58 -08:00 committed by GitHub
parent 95113ad12f
commit 366c032c36
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 78 deletions

View file

@ -4,9 +4,11 @@ use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
use std::{fmt, net}; use std::{fmt, net};
use actix_rt::time::{sleep, sleep_until, Instant, Sleep}; use actix_rt::{
task::JoinHandle,
time::{interval, sleep_until, Instant, Sleep},
};
use bytes::BytesMut; use bytes::BytesMut;
use futures_util::{future, FutureExt};
use time::OffsetDateTime; use time::OffsetDateTime;
/// "Sun, 06 Nov 1994 08:49:37 GMT".len() /// "Sun, 06 Nov 1994 08:49:37 GMT".len()
@ -49,7 +51,7 @@ struct Inner {
ka_enabled: bool, ka_enabled: bool,
secure: bool, secure: bool,
local_addr: Option<std::net::SocketAddr>, local_addr: Option<std::net::SocketAddr>,
timer: DateService, date_service: DateService,
} }
impl Clone for ServiceConfig { impl Clone for ServiceConfig {
@ -91,7 +93,7 @@ impl ServiceConfig {
client_disconnect, client_disconnect,
secure, secure,
local_addr, local_addr,
timer: DateService::new(), date_service: DateService::new(),
})) }))
} }
@ -125,7 +127,7 @@ impl ServiceConfig {
let delay_time = self.0.client_timeout; let delay_time = self.0.client_timeout;
if delay_time != 0 { if delay_time != 0 {
Some(sleep_until( Some(sleep_until(
self.0.timer.now() + Duration::from_millis(delay_time), self.0.date_service.now() + Duration::from_millis(delay_time),
)) ))
} else { } else {
None None
@ -136,7 +138,7 @@ impl ServiceConfig {
pub fn client_timer_expire(&self) -> Option<Instant> { pub fn client_timer_expire(&self) -> Option<Instant> {
let delay = self.0.client_timeout; let delay = self.0.client_timeout;
if delay != 0 { if delay != 0 {
Some(self.0.timer.now() + Duration::from_millis(delay)) Some(self.0.date_service.now() + Duration::from_millis(delay))
} else { } else {
None None
} }
@ -146,7 +148,7 @@ impl ServiceConfig {
pub fn client_disconnect_timer(&self) -> Option<Instant> { pub fn client_disconnect_timer(&self) -> Option<Instant> {
let delay = self.0.client_disconnect; let delay = self.0.client_disconnect;
if delay != 0 { if delay != 0 {
Some(self.0.timer.now() + Duration::from_millis(delay)) Some(self.0.date_service.now() + Duration::from_millis(delay))
} else { } else {
None None
} }
@ -156,7 +158,7 @@ impl ServiceConfig {
/// Return keep-alive timer delay is configured. /// Return keep-alive timer delay is configured.
pub fn keep_alive_timer(&self) -> Option<Sleep> { pub fn keep_alive_timer(&self) -> Option<Sleep> {
if let Some(ka) = self.0.keep_alive { if let Some(ka) = self.0.keep_alive {
Some(sleep_until(self.0.timer.now() + ka)) Some(sleep_until(self.0.date_service.now() + ka))
} else { } else {
None None
} }
@ -165,7 +167,7 @@ impl ServiceConfig {
/// Keep-alive expire time /// Keep-alive expire time
pub fn keep_alive_expire(&self) -> Option<Instant> { pub fn keep_alive_expire(&self) -> Option<Instant> {
if let Some(ka) = self.0.keep_alive { if let Some(ka) = self.0.keep_alive {
Some(self.0.timer.now() + ka) Some(self.0.date_service.now() + ka)
} else { } else {
None None
} }
@ -173,7 +175,7 @@ impl ServiceConfig {
#[inline] #[inline]
pub(crate) fn now(&self) -> Instant { pub(crate) fn now(&self) -> Instant {
self.0.timer.now() self.0.date_service.now()
} }
#[doc(hidden)] #[doc(hidden)]
@ -181,7 +183,7 @@ impl ServiceConfig {
let mut buf: [u8; 39] = [0; 39]; let mut buf: [u8; 39] = [0; 39];
buf[..6].copy_from_slice(b"date: "); buf[..6].copy_from_slice(b"date: ");
self.0 self.0
.timer .date_service
.set_date(|date| buf[6..35].copy_from_slice(&date.bytes)); .set_date(|date| buf[6..35].copy_from_slice(&date.bytes));
buf[35..].copy_from_slice(b"\r\n\r\n"); buf[35..].copy_from_slice(b"\r\n\r\n");
dst.extend_from_slice(&buf); dst.extend_from_slice(&buf);
@ -189,7 +191,7 @@ impl ServiceConfig {
pub(crate) fn set_date_header(&self, dst: &mut BytesMut) { pub(crate) fn set_date_header(&self, dst: &mut BytesMut) {
self.0 self.0
.timer .date_service
.set_date(|date| dst.extend_from_slice(&date.bytes)); .set_date(|date| dst.extend_from_slice(&date.bytes));
} }
} }
@ -230,57 +232,75 @@ impl fmt::Write for Date {
} }
} }
#[derive(Clone)] /// Service for update Date and Instant periodically at 500 millis interval.
struct DateService(Rc<DateServiceInner>); struct DateService {
current: Rc<Cell<(Date, Instant)>>,
struct DateServiceInner { handle: JoinHandle<()>,
current: Cell<Option<(Date, Instant)>>,
} }
impl DateServiceInner { impl Drop for DateService {
fn new() -> Self { fn drop(&mut self) {
DateServiceInner { // stop the timer update async task on drop.
current: Cell::new(None), self.handle.abort();
}
}
fn reset(&self) {
self.current.take();
}
fn update(&self) {
let now = Instant::now();
let date = Date::new();
self.current.set(Some((date, now)));
} }
} }
impl DateService { impl DateService {
fn new() -> Self { fn new() -> Self {
DateService(Rc::new(DateServiceInner::new())) // shared date and timer for DateService and update async task.
} let current = Rc::new(Cell::new((Date::new(), Instant::now())));
let current_clone = Rc::clone(&current);
// spawn an async task sleep for 500 milli and update current date/timer in a loop.
// handle is used to stop the task on DateService drop.
let handle = actix_rt::spawn(async move {
#[cfg(test)]
let _notify = notify_on_drop::NotifyOnDrop::new();
fn check_date(&self) { let mut interval = interval(Duration::from_millis(500));
if self.0.current.get().is_none() { loop {
self.0.update(); let now = interval.tick().await;
let date = Date::new();
current_clone.set((date, now));
}
});
// periodic date update DateService { current, handle }
let s = self.clone();
actix_rt::spawn(sleep(Duration::from_millis(500)).then(move |_| {
s.0.reset();
future::ready(())
}));
}
} }
fn now(&self) -> Instant { fn now(&self) -> Instant {
self.check_date(); self.current.get().1
self.0.current.get().unwrap().1
} }
fn set_date<F: FnMut(&Date)>(&self, mut f: F) { fn set_date<F: FnMut(&Date)>(&self, mut f: F) {
self.check_date(); f(&self.current.get().0);
f(&self.0.current.get().unwrap().0); }
}
// test drop behavior of DateService. only enabled in tests.
// TODO: move to a util module for testing all spawn handle drop style tasks.
#[cfg(test)]
mod notify_on_drop {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
static NOTIFY_DROPPED: AtomicBool = AtomicBool::new(false);
pub(crate) fn is_dropped() -> bool {
NOTIFY_DROPPED.load(SeqCst)
}
pub(crate) struct NotifyOnDrop;
impl NotifyOnDrop {
pub(crate) fn new() -> Self {
NOTIFY_DROPPED.store(false, SeqCst);
NotifyOnDrop
}
}
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
NOTIFY_DROPPED.store(true, SeqCst);
}
} }
} }
@ -288,14 +308,53 @@ impl DateService {
mod tests { mod tests {
use super::*; use super::*;
// Test modifying the date from within the closure use actix_rt::task::yield_now;
// passed to `set_date`
#[test] #[actix_rt::test]
fn test_evil_date() { async fn test_date_service_update() {
let service = DateService::new(); let settings = ServiceConfig::new(KeepAlive::Os, 0, 0, false, None);
// Make sure that `check_date` doesn't try to spawn a task
service.0.update(); yield_now().await;
service.set_date(|_| service.0.reset());
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf1);
let now1 = settings.now();
sleep_until(Instant::now() + Duration::from_secs(2)).await;
yield_now().await;
let now2 = settings.now();
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf2);
assert_ne!(now1, now2);
assert_ne!(buf1, buf2);
drop(settings);
assert!(notify_on_drop::is_dropped());
}
#[actix_rt::test]
async fn test_date_service_drop() {
let service = Rc::new(DateService::new());
// yield so date service have a chance to register the spawned timer update task.
yield_now().await;
let clone1 = service.clone();
let clone2 = service.clone();
let clone3 = service.clone();
drop(clone1);
assert_eq!(false, notify_on_drop::is_dropped());
drop(clone2);
assert_eq!(false, notify_on_drop::is_dropped());
drop(clone3);
assert_eq!(false, notify_on_drop::is_dropped());
drop(service);
assert!(notify_on_drop::is_dropped());
} }
#[test] #[test]

View file

@ -201,8 +201,8 @@ mod tests {
use super::*; use super::*;
use crate::HttpMessage; use crate::HttpMessage;
#[test] #[actix_rt::test]
fn test_http_request_chunked_payload_and_next_message() { async fn test_http_request_chunked_payload_and_next_message() {
let mut codec = Codec::default(); let mut codec = Codec::default();
let mut buf = BytesMut::from( let mut buf = BytesMut::from(

View file

@ -529,8 +529,8 @@ mod tests {
); );
} }
#[test] #[actix_rt::test]
fn test_camel_case() { async fn test_camel_case() {
let mut bytes = BytesMut::with_capacity(2048); let mut bytes = BytesMut::with_capacity(2048);
let mut head = RequestHead::default(); let mut head = RequestHead::default();
head.set_camel_case_headers(true); head.set_camel_case_headers(true);
@ -593,8 +593,8 @@ mod tests {
assert!(data.contains("date: date\r\n")); assert!(data.contains("date: date\r\n"));
} }
#[test] #[actix_rt::test]
fn test_extra_headers() { async fn test_extra_headers() {
let mut bytes = BytesMut::with_capacity(2048); let mut bytes = BytesMut::with_capacity(2048);
let mut head = RequestHead::default(); let mut head = RequestHead::default();
@ -627,8 +627,8 @@ mod tests {
assert!(data.contains("date: date\r\n")); assert!(data.contains("date: date\r\n"));
} }
#[test] #[actix_rt::test]
fn test_no_content_length() { async fn test_no_content_length() {
let mut bytes = BytesMut::with_capacity(2048); let mut bytes = BytesMut::with_capacity(2048);
let mut res: Response<()> = let mut res: Response<()> =

View file

@ -42,24 +42,25 @@ fn bench_async_burst(c: &mut Criterion) {
c.bench_function("get_body_async_burst", move |b| { c.bench_function("get_body_async_burst", move |b| {
b.iter_custom(|iters| { b.iter_custom(|iters| {
let client = rt.block_on(async {
rt.block_on(async { Client::new().get(url.clone()).freeze().unwrap() }); let client = Client::new().get(url.clone()).freeze().unwrap();
let start = std::time::Instant::now();
// benchmark body
let start = std::time::Instant::now();
// benchmark body
let resps = rt.block_on(async move {
let burst = (0..iters).map(|_| client.send()); let burst = (0..iters).map(|_| client.send());
join_all(burst).await let resps = join_all(burst).await;
});
let elapsed = start.elapsed();
// if there are failed requests that might be an issue let elapsed = start.elapsed();
let failed = resps.iter().filter(|r| r.is_err()).count();
if failed > 0 {
eprintln!("failed {} requests (might be bench timeout)", failed);
};
elapsed // if there are failed requests that might be an issue
let failed = resps.iter().filter(|r| r.is_err()).count();
if failed > 0 {
eprintln!("failed {} requests (might be bench timeout)", failed);
};
elapsed
})
}) })
}); });
} }