1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-03 05:48:45 +00:00

add service readiness to ServerService

This commit is contained in:
Nikolay Kim 2018-09-07 11:35:25 -07:00
parent 467350c9fc
commit db1fe14fd2
5 changed files with 156 additions and 132 deletions

View file

@ -17,7 +17,7 @@ use actix::{
}; };
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::server_service::{ServerNewService, ServerServiceFactory}; use super::server_service::{self, ServerNewService, ServerServiceFactory};
use super::worker::{Conn, StopWorker, Worker, WorkerClient}; use super::worker::{Conn, StopWorker, Worker, WorkerClient};
use super::NewService; use super::NewService;
use super::{PauseServer, ResumeServer, StopServer, Token}; use super::{PauseServer, ResumeServer, StopServer, Token};
@ -37,8 +37,6 @@ pub struct Server {
shutdown_timeout: u16, shutdown_timeout: u16,
signals: Option<Addr<signal::ProcessSignals>>, signals: Option<Addr<signal::ProcessSignals>>,
no_signals: bool, no_signals: bool,
maxconn: usize,
maxconnrate: usize,
} }
impl Default for Server { impl Default for Server {
@ -60,8 +58,6 @@ impl Server {
shutdown_timeout: 30, shutdown_timeout: 30,
signals: None, signals: None,
no_signals: false, no_signals: false,
maxconn: 102_400,
maxconnrate: 256,
} }
} }
@ -79,20 +75,9 @@ impl Server {
/// All socket listeners will stop accepting connections when this limit is /// All socket listeners will stop accepting connections when this limit is
/// reached for each worker. /// reached for each worker.
/// ///
/// By default max connections is set to a 100k. /// By default max connections is set to a 25k per worker.
pub fn maxconn(mut self, num: usize) -> Self { pub fn maxconn(self, num: usize) -> Self {
self.maxconn = num; server_service::max_concurrent_connections(num);
self
}
/// Sets the maximum per-worker concurrent connection establish process.
///
/// All listeners will stop accepting connections when this limit is
/// reached. It can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn maxconnrate(mut self, num: usize) -> Self {
self.maxconnrate = num;
self self
} }
@ -273,7 +258,7 @@ impl Server {
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) {
let (tx, rx) = unbounded::<Conn>(); let (tx, rx) = unbounded::<Conn>();
let conns = Connections::new(notify, self.maxconn, self.maxconnrate); let conns = Connections::new(notify, 0, 0);
let worker = WorkerClient::new(idx, tx, conns.clone()); let worker = WorkerClient::new(idx, tx, conns.clone());
let services: Vec<Box<ServerServiceFactory + Send>> = let services: Vec<Box<ServerServiceFactory + Send>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();

View file

@ -1,10 +1,10 @@
use std::sync::{ use std::cell::Cell;
atomic::{AtomicUsize, Ordering}, use std::rc::Rc;
Arc, use std::sync::atomic::{AtomicUsize, Ordering};
};
use std::{fmt, io, net}; use std::{fmt, io, net};
use futures::{future, Future, Poll}; use futures::task::AtomicTask;
use futures::{future, Async, Future, Poll};
use tokio_reactor::Handle; use tokio_reactor::Handle;
use tokio_tcp::TcpStream; use tokio_tcp::TcpStream;
@ -19,9 +19,38 @@ pub(crate) type BoxedServerService = Box<
>, >,
>; >;
const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default max connections is set to a 25k per worker.
pub fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|counter| counter.total())
}
thread_local! {
static MAX_CONNS_COUNTER: Counter = Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
pub(crate) struct ServerService<T> { pub(crate) struct ServerService<T> {
inner: T, service: T,
counter: Arc<AtomicUsize>, counter: Counter,
}
impl<T> ServerService<T> {
fn new(service: T) -> Self {
MAX_CONNS_COUNTER.with(|counter| ServerService {
service,
counter: counter.clone(),
})
}
} }
impl<T> Service for ServerService<T> impl<T> Service for ServerService<T>
@ -36,7 +65,11 @@ where
type Future = Box<Future<Item = (), Error = ()>>; type Future = Box<Future<Item = (), Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready().map_err(|_| ()) if self.counter.check() {
self.service.poll_ready().map_err(|_| ())
} else {
Ok(Async::NotReady)
}
} }
fn call(&mut self, stream: net::TcpStream) -> Self::Future { fn call(&mut self, stream: net::TcpStream) -> Self::Future {
@ -45,11 +78,14 @@ where
}); });
if let Ok(stream) = stream { if let Ok(stream) = stream {
let counter = self.counter.clone(); let guard = self.counter.get();
let _ = counter.fetch_add(1, Ordering::Relaxed);
Box::new(self.inner.call(stream).map_err(|_| ()).map(move |_| { Box::new(
let _ = counter.fetch_sub(1, Ordering::Relaxed); self.service
})) .call(stream)
.map_err(|_| ())
.map(move |_| drop(guard)),
)
} else { } else {
Box::new(future::err(())) Box::new(future::err(()))
} }
@ -61,7 +97,6 @@ where
F: Fn() -> T + Send + Clone, F: Fn() -> T + Send + Clone,
{ {
inner: F, inner: F,
counter: Arc<AtomicUsize>,
} }
impl<F, T> ServerNewService<F, T> impl<F, T> ServerNewService<F, T>
@ -73,16 +108,11 @@ where
T::Error: fmt::Display, T::Error: fmt::Display,
{ {
pub(crate) fn create(inner: F) -> Box<ServerServiceFactory + Send> { pub(crate) fn create(inner: F) -> Box<ServerServiceFactory + Send> {
Box::new(Self { Box::new(Self { inner })
inner,
counter: Arc::new(AtomicUsize::new(0)),
})
} }
} }
pub trait ServerServiceFactory { pub trait ServerServiceFactory {
fn counter(&self) -> Arc<AtomicUsize>;
fn clone_factory(&self) -> Box<ServerServiceFactory + Send>; fn clone_factory(&self) -> Box<ServerServiceFactory + Send>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>; fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
@ -96,26 +126,19 @@ where
T::Future: 'static, T::Future: 'static,
T::Error: fmt::Display, T::Error: fmt::Display,
{ {
fn counter(&self) -> Arc<AtomicUsize> {
self.counter.clone()
}
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<ServerServiceFactory + Send> {
Box::new(Self { Box::new(Self {
inner: self.inner.clone(), inner: self.inner.clone(),
counter: Arc::new(AtomicUsize::new(0)),
}) })
} }
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> { fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
let counter = self.counter.clone();
Box::new( Box::new(
(self.inner)() (self.inner)()
.new_service() .new_service()
.map_err(|_| ()) .map_err(|_| ())
.map(move |inner| { .map(move |inner| {
let service: BoxedServerService = let service: BoxedServerService = Box::new(ServerService::new(inner));
Box::new(ServerService { inner, counter });
service service
}), }),
) )
@ -123,10 +146,6 @@ where
} }
impl ServerServiceFactory for Box<ServerServiceFactory> { impl ServerServiceFactory for Box<ServerServiceFactory> {
fn counter(&self) -> Arc<AtomicUsize> {
self.as_ref().counter()
}
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<ServerServiceFactory + Send> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }
@ -135,3 +154,71 @@ impl ServerServiceFactory for Box<ServerServiceFactory> {
self.as_ref().create() self.as_ref().create()
} }
} }
#[derive(Clone)]
pub(crate) struct Counter(Rc<CounterInner>);
struct CounterInner {
count: Cell<usize>,
maxconn: usize,
task: AtomicTask,
}
impl Counter {
pub fn new(maxconn: usize) -> Self {
Counter(Rc::new(CounterInner {
maxconn,
count: Cell::new(0),
task: AtomicTask::new(),
}))
}
pub fn get(&self) -> CounterGuard {
CounterGuard::new(self.0.clone())
}
pub fn check(&self) -> bool {
self.0.check()
}
pub fn total(&self) -> usize {
self.0.count.get()
}
}
pub(crate) struct CounterGuard(Rc<CounterInner>);
impl CounterGuard {
fn new(inner: Rc<CounterInner>) -> Self {
inner.inc();
CounterGuard(inner)
}
}
impl Drop for CounterGuard {
fn drop(&mut self) {
self.0.dec();
}
}
impl CounterInner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.maxconn {
self.task.register();
}
}
fn dec(&self) {
let num = self.count.get();
self.count.set(num - 1);
if num == self.maxconn {
self.task.notify();
}
}
fn check(&self) -> bool {
self.count.get() < self.maxconn
}
}

View file

@ -1,18 +1,29 @@
//! SSL Services //! SSL Services
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use super::server_service::Counter;
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
mod openssl; mod openssl;
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
pub use self::openssl::{OpensslAcceptor, OpensslConnector}; pub use self::openssl::{OpensslAcceptor, OpensslConnector};
pub(crate) const MAX_CONN: AtomicUsize = AtomicUsize::new(0); pub(crate) const MAX_CONN: AtomicUsize = AtomicUsize::new(256);
/// Set max concurrent ssl connect operation per thread /// Sets the maximum per-worker concurrent ssl connection establish process.
///
/// All listeners will stop accepting connections when this limit is
/// reached. It can be used to limit the global SSL CPU usage.
///
/// By default max connections is set to a 256.
pub fn max_concurrent_ssl_connect(num: usize) { pub fn max_concurrent_ssl_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed); MAX_CONN.store(num, Ordering::Relaxed);
} }
thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
}
// #[cfg(feature = "tls")] // #[cfg(feature = "tls")]
// mod nativetls; // mod nativetls;
// #[cfg(feature = "tls")] // #[cfg(feature = "tls")]

View file

@ -1,17 +1,14 @@
use std::cell::Cell;
use std::io; use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::atomic::Ordering;
use futures::task::AtomicTask;
use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use openssl::ssl::{AlpnError, Error, SslAcceptor, SslAcceptorBuilder, SslConnector}; use openssl::ssl::{AlpnError, Error, SslAcceptor, SslAcceptorBuilder, SslConnector};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{AcceptAsync, ConnectAsync, SslAcceptorExt, SslConnectorExt, SslStream}; use tokio_openssl::{AcceptAsync, ConnectAsync, SslAcceptorExt, SslConnectorExt, SslStream};
use super::MAX_CONN; use super::MAX_CONN_COUNTER;
use connector::ConnectionInfo; use connector::ConnectionInfo;
use server_service::{Counter, CounterGuard};
use {NewService, Service}; use {NewService, Service};
/// Support `SSL` connections via openssl package /// Support `SSL` connections via openssl package
@ -70,10 +67,12 @@ impl<T: AsyncRead + AsyncWrite> NewService for OpensslAcceptor<T> {
type Future = FutureResult<Self::Service, io::Error>; type Future = FutureResult<Self::Service, io::Error>;
fn new_service(&self) -> Self::Future { fn new_service(&self) -> Self::Future {
MAX_CONN_COUNTER.with(|counter| {
ok(OpensslAcceptorService { ok(OpensslAcceptorService {
acceptor: self.acceptor.clone(), acceptor: self.acceptor.clone(),
io: PhantomData, io: PhantomData,
inner: Rc::new(Inner::default()), counter: counter.clone(),
})
}) })
} }
} }
@ -81,7 +80,7 @@ impl<T: AsyncRead + AsyncWrite> NewService for OpensslAcceptor<T> {
pub struct OpensslAcceptorService<T> { pub struct OpensslAcceptorService<T> {
acceptor: SslAcceptor, acceptor: SslAcceptor,
io: PhantomData<T>, io: PhantomData<T>,
inner: Rc<Inner>, counter: Counter,
} }
impl<T: AsyncRead + AsyncWrite> Service for OpensslAcceptorService<T> { impl<T: AsyncRead + AsyncWrite> Service for OpensslAcceptorService<T> {
@ -91,7 +90,7 @@ impl<T: AsyncRead + AsyncWrite> Service for OpensslAcceptorService<T> {
type Future = OpensslAcceptorServiceFut<T>; type Future = OpensslAcceptorServiceFut<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.inner.check() { if self.counter.check() {
Ok(Async::Ready(())) Ok(Async::Ready(()))
} else { } else {
Ok(Async::NotReady) Ok(Async::NotReady)
@ -99,59 +98,19 @@ impl<T: AsyncRead + AsyncWrite> Service for OpensslAcceptorService<T> {
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
self.inner.inc();
OpensslAcceptorServiceFut { OpensslAcceptorServiceFut {
inner: self.inner.clone(), _guard: self.counter.get(),
fut: SslAcceptorExt::accept_async(&self.acceptor, req), fut: SslAcceptorExt::accept_async(&self.acceptor, req),
} }
} }
} }
struct Inner {
maxconn: usize,
count: Cell<usize>,
task: AtomicTask,
}
impl Default for Inner {
fn default() -> Inner {
Inner {
maxconn: MAX_CONN.load(Ordering::Relaxed),
count: Cell::new(0),
task: AtomicTask::new(),
}
}
}
impl Inner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.maxconn {
self.task.register();
}
}
fn dec(&self) {
let num = self.count.get();
self.count.set(num - 1);
if num == self.maxconn {
self.task.notify();
}
}
fn check(&self) -> bool {
self.count.get() < self.maxconn
}
}
pub struct OpensslAcceptorServiceFut<T> pub struct OpensslAcceptorServiceFut<T>
where where
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
{ {
fut: AcceptAsync<T>, fut: AcceptAsync<T>,
inner: Rc<Inner>, _guard: CounterGuard,
} }
impl<T: AsyncRead + AsyncWrite> Future for OpensslAcceptorServiceFut<T> { impl<T: AsyncRead + AsyncWrite> Future for OpensslAcceptorServiceFut<T> {
@ -159,14 +118,7 @@ impl<T: AsyncRead + AsyncWrite> Future for OpensslAcceptorServiceFut<T> {
type Error = Error; type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let res = self.fut.poll(); self.fut.poll()
if let Ok(Async::NotReady) = res {
Ok(Async::NotReady)
} else {
self.inner.dec();
res
}
} }
} }

View file

@ -1,5 +1,3 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{net, time}; use std::{net, time};
use futures::sync::mpsc::{SendError, UnboundedSender}; use futures::sync::mpsc::{SendError, UnboundedSender};
@ -12,7 +10,7 @@ use actix::{
Response, WrapFuture, Response, WrapFuture,
}; };
use super::server_service::{BoxedServerService, ServerServiceFactory}; use super::server_service::{self, BoxedServerService, ServerServiceFactory};
use super::{server::Connections, Token}; use super::{server::Connections, Token};
#[derive(Message)] #[derive(Message)]
@ -61,7 +59,7 @@ impl Message for StopWorker {
pub(crate) struct Worker { pub(crate) struct Worker {
// conns: Connections, // conns: Connections,
services: Vec<BoxedServerService>, services: Vec<BoxedServerService>,
counters: Vec<Arc<AtomicUsize>>, // counters: Vec<Arc<AtomicUsize>>,
} }
impl Actor for Worker { impl Actor for Worker {
@ -74,7 +72,7 @@ impl Worker {
) -> Self { ) -> Self {
let wrk = Worker { let wrk = Worker {
services: Vec::new(), services: Vec::new(),
counters: services.iter().map(|i| i.counter()).collect(), // counters: services.iter().map(|i| i.counter()).collect(),
}; };
ctx.wait( ctx.wait(
@ -102,10 +100,7 @@ impl Worker {
) { ) {
// sleep for 1 second and then check again // sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = slf let num = server_service::num_connections();
.counters
.iter()
.fold(0, |i, v| i + v.load(Ordering::Relaxed));
if num == 0 { if num == 0 {
let _ = tx.send(true); let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0)); Arbiter::current().do_send(StopArbiter(0));
@ -134,20 +129,14 @@ impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>; type Result = Response<bool, ()>;
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
let num = self let num = server_service::num_connections();
.counters
.iter()
.fold(0, |i, v| i + v.load(Ordering::Relaxed));
if num == 0 { if num == 0 {
info!("Shutting down http worker, 0 connections"); info!("Shutting down http worker, 0 connections");
Response::reply(Ok(true)) Response::reply(Ok(true))
} else if let Some(dur) = msg.graceful { } else if let Some(dur) = msg.graceful {
self.shutdown(false); self.shutdown(false);
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let num = self let num = server_service::num_connections();
.counters
.iter()
.fold(0, |i, v| i + v.load(Ordering::Relaxed));
if num != 0 { if num != 0 {
info!("Graceful http worker shutdown, {} connections", num); info!("Graceful http worker shutdown, {} connections", num);
self.shutdown_timeout(ctx, tx, dur); self.shutdown_timeout(ctx, tx, dur);