1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-01 21:08:43 +00:00

refactor connections counter

This commit is contained in:
Nikolay Kim 2018-09-08 09:36:38 -07:00
parent 5f016bd53c
commit 8b13236d41
6 changed files with 130 additions and 129 deletions

View file

@ -54,7 +54,9 @@ tokio-io = "0.1"
tokio-tcp = "0.1"
tokio-timer = "0.2"
tokio-reactor = "0.1"
tokio-current-thread = "0.1"
tower-service = "0.1"
trust-dns-resolver = "0.10.0-alpha.2"
# native-tls

View file

@ -27,6 +27,7 @@ extern crate net2;
extern crate num_cpus;
extern crate slab;
extern crate tokio;
extern crate tokio_current_thread;
extern crate tokio_io;
extern crate tokio_reactor;
extern crate tokio_tcp;

View file

@ -13,8 +13,8 @@ use actix::{
};
use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::server_service::{self, ServerNewService, ServerServiceFactory};
use super::worker::{Conn, StopWorker, Worker, WorkerAvailability, WorkerClient};
use super::server_service::{ServerNewService, ServerServiceFactory};
use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient};
use super::NewService;
use super::{PauseServer, ResumeServer, StopServer, Token};
@ -73,7 +73,7 @@ impl Server {
///
/// By default max connections is set to a 25k per worker.
pub fn maxconn(self, num: usize) -> Self {
server_service::max_concurrent_connections(num);
worker::max_concurrent_connections(num);
self
}

View file

@ -1,11 +1,7 @@
use std::cell::Cell;
use std::net;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::future::{err, ok};
use futures::task::AtomicTask;
use futures::{Async, Future, Poll};
use futures::{Future, Poll};
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
@ -26,37 +22,13 @@ pub(crate) type BoxedServerService = Box<
>,
>;
const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default max connections is set to a 25k per worker.
pub fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|counter| counter.total())
}
thread_local! {
static MAX_CONNS_COUNTER: Counter = Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
pub(crate) struct ServerService<T> {
service: T,
counter: Counter,
}
impl<T> ServerService<T> {
fn new(service: T) -> Self {
MAX_CONNS_COUNTER.with(|counter| ServerService {
service,
counter: counter.clone(),
})
ServerService { service }
}
}
@ -72,11 +44,7 @@ where
type Future = Box<Future<Item = (), Error = ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.counter.check() {
self.service.poll_ready().map_err(|_| ())
} else {
Ok(Async::NotReady)
}
self.service.poll_ready().map_err(|_| ())
}
fn call(&mut self, req: ServerMessage) -> Self::Future {
@ -87,14 +55,7 @@ where
});
if let Ok(stream) = stream {
let guard = self.counter.get();
Box::new(
self.service
.call(stream)
.map_err(|_| ())
.map(move |_| drop(guard)),
)
Box::new(self.service.call(stream))
} else {
Box::new(err(()))
}
@ -159,71 +120,3 @@ impl ServerServiceFactory for Box<ServerServiceFactory> {
self.as_ref().create()
}
}
#[derive(Clone)]
pub(crate) struct Counter(Rc<CounterInner>);
struct CounterInner {
count: Cell<usize>,
maxconn: usize,
task: AtomicTask,
}
impl Counter {
pub fn new(maxconn: usize) -> Self {
Counter(Rc::new(CounterInner {
maxconn,
count: Cell::new(0),
task: AtomicTask::new(),
}))
}
pub fn get(&self) -> CounterGuard {
CounterGuard::new(self.0.clone())
}
pub fn check(&self) -> bool {
self.0.check()
}
pub fn total(&self) -> usize {
self.0.count.get()
}
}
pub(crate) struct CounterGuard(Rc<CounterInner>);
impl CounterGuard {
fn new(inner: Rc<CounterInner>) -> Self {
inner.inc();
CounterGuard(inner)
}
}
impl Drop for CounterGuard {
fn drop(&mut self) {
self.0.dec();
}
}
impl CounterInner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.maxconn {
self.task.register();
}
}
fn dec(&self) {
let num = self.count.get();
self.count.set(num - 1);
if num == self.maxconn {
self.task.notify();
}
}
fn check(&self) -> bool {
self.count.get() < self.maxconn
}
}

View file

@ -1,7 +1,7 @@
//! SSL Services
use std::sync::atomic::{AtomicUsize, Ordering};
use super::server_service::Counter;
use super::worker::Connections;
#[cfg(feature = "ssl")]
mod openssl;
@ -21,7 +21,7 @@ pub fn max_concurrent_ssl_connect(num: usize) {
}
thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
static MAX_CONN_COUNTER: Connections = Connections::new(MAX_CONN.load(Ordering::Relaxed));
}
// #[cfg(feature = "tls")]

View file

@ -1,10 +1,14 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::cell::Cell;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::{net, time};
use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot;
use futures::task::AtomicTask;
use futures::{future, Async, Future, Poll};
use tokio_current_thread::spawn;
use actix::msgs::StopArbiter;
use actix::{
@ -13,7 +17,7 @@ use actix::{
};
use super::accept::AcceptNotify;
use super::server_service::{self, BoxedServerService, ServerMessage, ServerServiceFactory};
use super::server_service::{BoxedServerService, ServerMessage, ServerServiceFactory};
use super::Token;
#[derive(Message)]
@ -24,6 +28,27 @@ pub(crate) struct Conn {
pub peer: Option<net::SocketAddr>,
}
const MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default max connections is set to a 25k per worker.
pub fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
thread_local! {
static MAX_CONNS_COUNTER: Connections =
Connections::new(MAX_CONNS.load(Ordering::Relaxed));
}
#[derive(Clone)]
pub(crate) struct WorkerClient {
pub idx: usize,
@ -88,6 +113,7 @@ impl Message for StopWorker {
pub(crate) struct Worker {
services: Vec<BoxedServerService>,
availability: WorkerAvailability,
conns: Connections,
}
impl Actor for Worker {
@ -99,10 +125,11 @@ impl Worker {
ctx: &mut Context<Self>, services: Vec<Box<ServerServiceFactory + Send>>,
availability: WorkerAvailability,
) -> Self {
let wrk = Worker {
let wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
availability,
services: Vec::new(),
};
conns: conns.clone(),
});
ctx.wait(
future::join_all(services.into_iter().map(|s| s.create()))
@ -139,7 +166,7 @@ impl Worker {
) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = server_service::num_connections();
let num = num_connections();
if num == 0 {
let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0));
@ -159,7 +186,15 @@ impl Handler<Conn> for Worker {
type Result = ();
fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
Arbiter::spawn(self.services[msg.handler.0].call(ServerMessage::Connect(msg.io)))
let guard = self.conns.get();
spawn(
self.services[msg.handler.0]
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
)
}
}
@ -168,14 +203,14 @@ impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>;
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
let num = server_service::num_connections();
let num = num_connections();
if num == 0 {
info!("Shutting down http worker, 0 connections");
Response::reply(Ok(true))
} else if let Some(dur) = msg.graceful {
self.shutdown(false);
let (tx, rx) = oneshot::channel();
let num = server_service::num_connections();
let num = num_connections();
if num != 0 {
info!("Graceful http worker shutdown, {} connections", num);
self.shutdown_timeout(ctx, tx, dur);
@ -199,11 +234,13 @@ impl ActorFuture for CheckReadiness {
type Actor = Worker;
fn poll(&mut self, act: &mut Worker, _: &mut Context<Worker>) -> Poll<(), ()> {
let mut val = true;
for service in &mut act.services {
if let Ok(Async::NotReady) = service.poll_ready() {
val = false;
break;
let mut val = act.conns.check();
if val {
for service in &mut act.services {
if let Ok(Async::NotReady) = service.poll_ready() {
val = false;
break;
}
}
}
if self.0 != val {
@ -213,3 +250,71 @@ impl ActorFuture for CheckReadiness {
Ok(Async::NotReady)
}
}
#[derive(Clone)]
pub(crate) struct Connections(Rc<ConnectionsInner>);
struct ConnectionsInner {
count: Cell<usize>,
maxconn: usize,
task: AtomicTask,
}
impl Connections {
pub fn new(maxconn: usize) -> Self {
Connections(Rc::new(ConnectionsInner {
maxconn,
count: Cell::new(0),
task: AtomicTask::new(),
}))
}
pub fn get(&self) -> ConnectionsGuard {
ConnectionsGuard::new(self.0.clone())
}
pub fn check(&self) -> bool {
self.0.check()
}
pub fn total(&self) -> usize {
self.0.count.get()
}
}
pub(crate) struct ConnectionsGuard(Rc<ConnectionsInner>);
impl ConnectionsGuard {
fn new(inner: Rc<ConnectionsInner>) -> Self {
inner.inc();
ConnectionsGuard(inner)
}
}
impl Drop for ConnectionsGuard {
fn drop(&mut self) {
self.0.dec();
}
}
impl ConnectionsInner {
fn inc(&self) {
let num = self.count.get() + 1;
self.count.set(num);
if num == self.maxconn {
self.task.register();
}
}
fn dec(&self) {
let num = self.count.get();
self.count.set(num - 1);
if num == self.maxconn {
self.task.notify();
}
}
fn check(&self) -> bool {
self.count.get() < self.maxconn
}
}