From db1fe14fd2c7e6876617e648f93e25f2adec2c2e Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 7 Sep 2018 11:35:25 -0700 Subject: [PATCH] add service readiness to ServerService --- src/server.rs | 25 ++----- src/server_service.rs | 151 +++++++++++++++++++++++++++++++++--------- src/ssl/mod.rs | 15 ++++- src/ssl/openssl.rs | 74 ++++----------------- src/worker.rs | 23 ++----- 5 files changed, 156 insertions(+), 132 deletions(-) diff --git a/src/server.rs b/src/server.rs index 2541b3a19..a5e23d297 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,7 +17,7 @@ use actix::{ }; use super::accept::{AcceptLoop, AcceptNotify, Command}; -use super::server_service::{ServerNewService, ServerServiceFactory}; +use super::server_service::{self, ServerNewService, ServerServiceFactory}; use super::worker::{Conn, StopWorker, Worker, WorkerClient}; use super::NewService; use super::{PauseServer, ResumeServer, StopServer, Token}; @@ -37,8 +37,6 @@ pub struct Server { shutdown_timeout: u16, signals: Option>, no_signals: bool, - maxconn: usize, - maxconnrate: usize, } impl Default for Server { @@ -60,8 +58,6 @@ impl Server { shutdown_timeout: 30, signals: None, no_signals: false, - maxconn: 102_400, - maxconnrate: 256, } } @@ -79,20 +75,9 @@ impl Server { /// All socket listeners will stop accepting connections when this limit is /// reached for each worker. /// - /// By default max connections is set to a 100k. - pub fn maxconn(mut self, num: usize) -> Self { - self.maxconn = num; - self - } - - /// Sets the maximum per-worker concurrent connection establish process. - /// - /// All listeners will stop accepting connections when this limit is - /// reached. It can be used to limit the global SSL CPU usage. - /// - /// By default max connections is set to a 256. - pub fn maxconnrate(mut self, num: usize) -> Self { - self.maxconnrate = num; + /// By default max connections is set to a 25k per worker. + pub fn maxconn(self, num: usize) -> Self { + server_service::max_concurrent_connections(num); self } @@ -273,7 +258,7 @@ impl Server { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr, WorkerClient) { let (tx, rx) = unbounded::(); - let conns = Connections::new(notify, self.maxconn, self.maxconnrate); + let conns = Connections::new(notify, 0, 0); let worker = WorkerClient::new(idx, tx, conns.clone()); let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); diff --git a/src/server_service.rs b/src/server_service.rs index 8bfe5b556..0afc99d90 100644 --- a/src/server_service.rs +++ b/src/server_service.rs @@ -1,10 +1,10 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; +use std::cell::Cell; +use std::rc::Rc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, io, net}; -use futures::{future, Future, Poll}; +use futures::task::AtomicTask; +use futures::{future, Async, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; @@ -19,9 +19,38 @@ pub(crate) type BoxedServerService = Box< >, >; +const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); + +/// Sets the maximum per-worker number of concurrent connections. +/// +/// All socket listeners will stop accepting connections when this limit is +/// reached for each worker. +/// +/// By default max connections is set to a 25k per worker. +pub fn max_concurrent_connections(num: usize) { + MAX_CONNS.store(num, Ordering::Relaxed); +} + +pub(crate) fn num_connections() -> usize { + MAX_CONNS_COUNTER.with(|counter| counter.total()) +} + +thread_local! { + static MAX_CONNS_COUNTER: Counter = Counter::new(MAX_CONNS.load(Ordering::Relaxed)); +} + pub(crate) struct ServerService { - inner: T, - counter: Arc, + service: T, + counter: Counter, +} + +impl ServerService { + fn new(service: T) -> Self { + MAX_CONNS_COUNTER.with(|counter| ServerService { + service, + counter: counter.clone(), + }) + } } impl Service for ServerService @@ -36,7 +65,11 @@ where type Future = Box>; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(|_| ()) + if self.counter.check() { + self.service.poll_ready().map_err(|_| ()) + } else { + Ok(Async::NotReady) + } } fn call(&mut self, stream: net::TcpStream) -> Self::Future { @@ -45,11 +78,14 @@ where }); if let Ok(stream) = stream { - let counter = self.counter.clone(); - let _ = counter.fetch_add(1, Ordering::Relaxed); - Box::new(self.inner.call(stream).map_err(|_| ()).map(move |_| { - let _ = counter.fetch_sub(1, Ordering::Relaxed); - })) + let guard = self.counter.get(); + + Box::new( + self.service + .call(stream) + .map_err(|_| ()) + .map(move |_| drop(guard)), + ) } else { Box::new(future::err(())) } @@ -61,7 +97,6 @@ where F: Fn() -> T + Send + Clone, { inner: F, - counter: Arc, } impl ServerNewService @@ -73,16 +108,11 @@ where T::Error: fmt::Display, { pub(crate) fn create(inner: F) -> Box { - Box::new(Self { - inner, - counter: Arc::new(AtomicUsize::new(0)), - }) + Box::new(Self { inner }) } } pub trait ServerServiceFactory { - fn counter(&self) -> Arc; - fn clone_factory(&self) -> Box; fn create(&self) -> Box>; @@ -96,26 +126,19 @@ where T::Future: 'static, T::Error: fmt::Display, { - fn counter(&self) -> Arc { - self.counter.clone() - } - fn clone_factory(&self) -> Box { Box::new(Self { inner: self.inner.clone(), - counter: Arc::new(AtomicUsize::new(0)), }) } fn create(&self) -> Box> { - let counter = self.counter.clone(); Box::new( (self.inner)() .new_service() .map_err(|_| ()) .map(move |inner| { - let service: BoxedServerService = - Box::new(ServerService { inner, counter }); + let service: BoxedServerService = Box::new(ServerService::new(inner)); service }), ) @@ -123,10 +146,6 @@ where } impl ServerServiceFactory for Box { - fn counter(&self) -> Arc { - self.as_ref().counter() - } - fn clone_factory(&self) -> Box { self.as_ref().clone_factory() } @@ -135,3 +154,71 @@ impl ServerServiceFactory for Box { self.as_ref().create() } } + +#[derive(Clone)] +pub(crate) struct Counter(Rc); + +struct CounterInner { + count: Cell, + maxconn: usize, + task: AtomicTask, +} + +impl Counter { + pub fn new(maxconn: usize) -> Self { + Counter(Rc::new(CounterInner { + maxconn, + count: Cell::new(0), + task: AtomicTask::new(), + })) + } + + pub fn get(&self) -> CounterGuard { + CounterGuard::new(self.0.clone()) + } + + pub fn check(&self) -> bool { + self.0.check() + } + + pub fn total(&self) -> usize { + self.0.count.get() + } +} + +pub(crate) struct CounterGuard(Rc); + +impl CounterGuard { + fn new(inner: Rc) -> Self { + inner.inc(); + CounterGuard(inner) + } +} + +impl Drop for CounterGuard { + fn drop(&mut self) { + self.0.dec(); + } +} + +impl CounterInner { + fn inc(&self) { + let num = self.count.get() + 1; + self.count.set(num); + if num == self.maxconn { + self.task.register(); + } + } + + fn dec(&self) { + let num = self.count.get(); + self.count.set(num - 1); + if num == self.maxconn { + self.task.notify(); + } + } + + fn check(&self) -> bool { + self.count.get() < self.maxconn + } +} diff --git a/src/ssl/mod.rs b/src/ssl/mod.rs index 5504f15ef..bb4408850 100644 --- a/src/ssl/mod.rs +++ b/src/ssl/mod.rs @@ -1,18 +1,29 @@ //! SSL Services use std::sync::atomic::{AtomicUsize, Ordering}; +use super::server_service::Counter; + #[cfg(feature = "ssl")] mod openssl; #[cfg(feature = "ssl")] pub use self::openssl::{OpensslAcceptor, OpensslConnector}; -pub(crate) const MAX_CONN: AtomicUsize = AtomicUsize::new(0); +pub(crate) const MAX_CONN: AtomicUsize = AtomicUsize::new(256); -/// Set max concurrent ssl connect operation per thread +/// Sets the maximum per-worker concurrent ssl connection establish process. +/// +/// All listeners will stop accepting connections when this limit is +/// reached. It can be used to limit the global SSL CPU usage. +/// +/// By default max connections is set to a 256. pub fn max_concurrent_ssl_connect(num: usize) { MAX_CONN.store(num, Ordering::Relaxed); } +thread_local! { + static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed)); +} + // #[cfg(feature = "tls")] // mod nativetls; // #[cfg(feature = "tls")] diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index b51fadbe9..69690ee27 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -1,17 +1,14 @@ -use std::cell::Cell; use std::io; use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::atomic::Ordering; -use futures::task::AtomicTask; use futures::{future::ok, future::FutureResult, Async, Future, Poll}; use openssl::ssl::{AlpnError, Error, SslAcceptor, SslAcceptorBuilder, SslConnector}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, ConnectAsync, SslAcceptorExt, SslConnectorExt, SslStream}; -use super::MAX_CONN; +use super::MAX_CONN_COUNTER; use connector::ConnectionInfo; +use server_service::{Counter, CounterGuard}; use {NewService, Service}; /// Support `SSL` connections via openssl package @@ -70,10 +67,12 @@ impl NewService for OpensslAcceptor { type Future = FutureResult; fn new_service(&self) -> Self::Future { - ok(OpensslAcceptorService { - acceptor: self.acceptor.clone(), - io: PhantomData, - inner: Rc::new(Inner::default()), + MAX_CONN_COUNTER.with(|counter| { + ok(OpensslAcceptorService { + acceptor: self.acceptor.clone(), + io: PhantomData, + counter: counter.clone(), + }) }) } } @@ -81,7 +80,7 @@ impl NewService for OpensslAcceptor { pub struct OpensslAcceptorService { acceptor: SslAcceptor, io: PhantomData, - inner: Rc, + counter: Counter, } impl Service for OpensslAcceptorService { @@ -91,7 +90,7 @@ impl Service for OpensslAcceptorService { type Future = OpensslAcceptorServiceFut; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if self.inner.check() { + if self.counter.check() { Ok(Async::Ready(())) } else { Ok(Async::NotReady) @@ -99,59 +98,19 @@ impl Service for OpensslAcceptorService { } fn call(&mut self, req: Self::Request) -> Self::Future { - self.inner.inc(); - OpensslAcceptorServiceFut { - inner: self.inner.clone(), + _guard: self.counter.get(), fut: SslAcceptorExt::accept_async(&self.acceptor, req), } } } -struct Inner { - maxconn: usize, - count: Cell, - task: AtomicTask, -} - -impl Default for Inner { - fn default() -> Inner { - Inner { - maxconn: MAX_CONN.load(Ordering::Relaxed), - count: Cell::new(0), - task: AtomicTask::new(), - } - } -} - -impl Inner { - fn inc(&self) { - let num = self.count.get() + 1; - self.count.set(num); - if num == self.maxconn { - self.task.register(); - } - } - - fn dec(&self) { - let num = self.count.get(); - self.count.set(num - 1); - if num == self.maxconn { - self.task.notify(); - } - } - - fn check(&self) -> bool { - self.count.get() < self.maxconn - } -} - pub struct OpensslAcceptorServiceFut where T: AsyncRead + AsyncWrite, { fut: AcceptAsync, - inner: Rc, + _guard: CounterGuard, } impl Future for OpensslAcceptorServiceFut { @@ -159,14 +118,7 @@ impl Future for OpensslAcceptorServiceFut { type Error = Error; fn poll(&mut self) -> Poll { - let res = self.fut.poll(); - - if let Ok(Async::NotReady) = res { - Ok(Async::NotReady) - } else { - self.inner.dec(); - res - } + self.fut.poll() } } diff --git a/src/worker.rs b/src/worker.rs index 80bf13566..f88f4db8a 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,5 +1,3 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; use std::{net, time}; use futures::sync::mpsc::{SendError, UnboundedSender}; @@ -12,7 +10,7 @@ use actix::{ Response, WrapFuture, }; -use super::server_service::{BoxedServerService, ServerServiceFactory}; +use super::server_service::{self, BoxedServerService, ServerServiceFactory}; use super::{server::Connections, Token}; #[derive(Message)] @@ -61,7 +59,7 @@ impl Message for StopWorker { pub(crate) struct Worker { // conns: Connections, services: Vec, - counters: Vec>, + // counters: Vec>, } impl Actor for Worker { @@ -74,7 +72,7 @@ impl Worker { ) -> Self { let wrk = Worker { services: Vec::new(), - counters: services.iter().map(|i| i.counter()).collect(), + // counters: services.iter().map(|i| i.counter()).collect(), }; ctx.wait( @@ -102,10 +100,7 @@ impl Worker { ) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - let num = slf - .counters - .iter() - .fold(0, |i, v| i + v.load(Ordering::Relaxed)); + let num = server_service::num_connections(); if num == 0 { let _ = tx.send(true); Arbiter::current().do_send(StopArbiter(0)); @@ -134,20 +129,14 @@ impl Handler for Worker { type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { - let num = self - .counters - .iter() - .fold(0, |i, v| i + v.load(Ordering::Relaxed)); + let num = server_service::num_connections(); if num == 0 { info!("Shutting down http worker, 0 connections"); Response::reply(Ok(true)) } else if let Some(dur) = msg.graceful { self.shutdown(false); let (tx, rx) = oneshot::channel(); - let num = self - .counters - .iter() - .fold(0, |i, v| i + v.load(Ordering::Relaxed)); + let num = server_service::num_connections(); if num != 0 { info!("Graceful http worker shutdown, {} connections", num); self.shutdown_timeout(ctx, tx, dur);