diff --git a/CHANGES.md b/CHANGES.md index d86de70f0..f7e663d63 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Changes +## [0.7.4] - 2018-08-xx + +### Added + +* Added `HttpServer::max_connections()` and `HttpServer::max_sslrate()`, accept backpressure #250 + + ## [0.7.3] - 2018-08-01 ### Added diff --git a/Cargo.toml b/Cargo.toml index 31440eb37..86cb53d10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-web" -version = "0.7.3" +version = "0.7.4" authors = ["Nikolay Kim "] description = "Actix web is a simple, pragmatic and extremely fast web framework for Rust." readme = "README.md" diff --git a/src/server/accept.rs b/src/server/accept.rs index 752805600..f846e4a40 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -10,13 +10,13 @@ use tokio_timer::Delay; use actix::{msgs::Execute, Arbiter, System}; use super::srv::{ServerCommand, Socket}; -use super::worker::Conn; +use super::worker::{Conn, WorkerClient}; pub(crate) enum Command { Pause, Resume, Stop, - Worker(usize, mpsc::UnboundedSender>), + Worker(WorkerClient), } struct ServerSocketInfo { @@ -26,40 +26,133 @@ struct ServerSocketInfo { timeout: Option, } +#[derive(Clone)] +pub(crate) struct AcceptNotify { + ready: mio::SetReadiness, + maxconn: usize, + maxconn_low: usize, + maxsslrate: usize, + maxsslrate_low: usize, +} + +impl AcceptNotify { + pub fn new(ready: mio::SetReadiness, maxconn: usize, maxsslrate: usize) -> Self { + let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 }; + let maxsslrate_low = if maxsslrate > 10 { maxsslrate - 10 } else { 0 }; + AcceptNotify { + ready, + maxconn, + maxconn_low, + maxsslrate, + maxsslrate_low, + } + } + + pub fn notify_maxconn(&self, maxconn: usize) { + if maxconn > self.maxconn_low && maxconn <= self.maxconn { + let _ = self.ready.set_readiness(mio::Ready::readable()); + } + } + pub fn notify_maxsslrate(&self, sslrate: usize) { + if sslrate > self.maxsslrate_low && sslrate <= self.maxsslrate { + let _ = self.ready.set_readiness(mio::Ready::readable()); + } + } +} + +impl Default for AcceptNotify { + fn default() -> Self { + AcceptNotify::new(mio::Registration::new2().1, 0, 0) + } +} + +pub(crate) struct AcceptLoop { + cmd_reg: Option, + cmd_ready: mio::SetReadiness, + notify_reg: Option, + notify_ready: mio::SetReadiness, + tx: sync_mpsc::Sender, + rx: Option>, + srv: Option<( + mpsc::UnboundedSender, + mpsc::UnboundedReceiver, + )>, + maxconn: usize, + maxsslrate: usize, +} + +impl AcceptLoop { + pub fn new() -> AcceptLoop { + let (tx, rx) = sync_mpsc::channel(); + let (cmd_reg, cmd_ready) = mio::Registration::new2(); + let (notify_reg, notify_ready) = mio::Registration::new2(); + + AcceptLoop { + tx, + cmd_ready, + cmd_reg: Some(cmd_reg), + notify_ready, + notify_reg: Some(notify_reg), + maxconn: 102_400, + maxsslrate: 256, + rx: Some(rx), + srv: Some(mpsc::unbounded()), + } + } + + pub fn send(&self, msg: Command) { + let _ = self.tx.send(msg); + let _ = self.cmd_ready.set_readiness(mio::Ready::readable()); + } + + pub fn get_notify(&self) -> AcceptNotify { + AcceptNotify::new(self.notify_ready.clone(), self.maxconn, self.maxsslrate) + } + + pub fn max_connections(&mut self, num: usize) { + self.maxconn = num; + } + + pub fn max_sslrate(&mut self, num: usize) { + self.maxsslrate = num; + } + + pub(crate) fn start( + &mut self, socks: Vec<(usize, Socket)>, workers: Vec, + ) -> mpsc::UnboundedReceiver { + let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo"); + + Accept::start( + self.rx.take().expect("Can not re-use AcceptInfo"), + self.cmd_reg.take().expect("Can not re-use AcceptInfo"), + self.notify_reg.take().expect("Can not re-use AcceptInfo"), + self.maxconn, + self.maxsslrate, + socks, + tx, + workers, + ); + rx + } +} + struct Accept { poll: mio::Poll, rx: sync_mpsc::Receiver, sockets: Slab, - workers: Vec<(usize, mpsc::UnboundedSender>)>, - _reg: mio::Registration, - next: usize, + workers: Vec, srv: mpsc::UnboundedSender, timer: (mio::Registration, mio::SetReadiness), + next: usize, + maxconn: usize, + maxsslrate: usize, + backpressure: bool, } +const DELTA: usize = 100; const CMD: mio::Token = mio::Token(0); const TIMER: mio::Token = mio::Token(1); - -pub(crate) fn start_accept_thread( - socks: Vec<(usize, Socket)>, srv: mpsc::UnboundedSender, - workers: Vec<(usize, mpsc::UnboundedSender>)>, -) -> (mio::SetReadiness, sync_mpsc::Sender) { - let (tx, rx) = sync_mpsc::channel(); - let (reg, readiness) = mio::Registration::new2(); - - let sys = System::current(); - - // start accept thread - #[cfg_attr(feature = "cargo-clippy", allow(cyclomatic_complexity))] - let _ = thread::Builder::new() - .name("actix-web accept loop".to_owned()) - .spawn(move || { - System::set_current(sys); - Accept::new(reg, rx, socks, workers, srv).poll(); - }); - - (readiness, tx) -} +const NOTIFY: mio::Token = mio::Token(2); /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means @@ -75,11 +168,51 @@ fn connection_error(e: &io::Error) -> bool { } impl Accept { + #![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] + pub(crate) fn start( + rx: sync_mpsc::Receiver, cmd_reg: mio::Registration, + notify_reg: mio::Registration, maxconn: usize, maxsslrate: usize, + socks: Vec<(usize, Socket)>, srv: mpsc::UnboundedSender, + workers: Vec, + ) { + let sys = System::current(); + + // start accept thread + let _ = thread::Builder::new() + .name("actix-web accept loop".to_owned()) + .spawn(move || { + System::set_current(sys); + let mut accept = Accept::new(rx, socks, workers, srv); + accept.maxconn = maxconn; + accept.maxsslrate = maxsslrate; + + // Start listening for incoming commands + if let Err(err) = accept.poll.register( + &cmd_reg, + CMD, + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + panic!("Can not register Registration: {}", err); + } + + // Start listening for notify updates + if let Err(err) = accept.poll.register( + ¬ify_reg, + NOTIFY, + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + panic!("Can not register Registration: {}", err); + } + + accept.poll(); + }); + } + fn new( - _reg: mio::Registration, rx: sync_mpsc::Receiver, - socks: Vec<(usize, Socket)>, - workers: Vec<(usize, mpsc::UnboundedSender>)>, - srv: mpsc::UnboundedSender, + rx: sync_mpsc::Receiver, socks: Vec<(usize, Socket)>, + workers: Vec, srv: mpsc::UnboundedSender, ) -> Accept { // Create a poll instance let poll = match mio::Poll::new() { @@ -87,13 +220,6 @@ impl Accept { Err(err) => panic!("Can not create mio::Poll: {}", err), }; - // Start listening for incoming commands - if let Err(err) = - poll.register(&_reg, CMD, mio::Ready::readable(), mio::PollOpt::edge()) - { - panic!("Can not register Registration: {}", err); - } - // Start accept let mut sockets = Slab::new(); for (stoken, sock) in socks { @@ -106,7 +232,7 @@ impl Accept { // Start listening for incoming connections if let Err(err) = poll.register( &server, - mio::Token(token + 1000), + mio::Token(token + DELTA), mio::Ready::readable(), mio::PollOpt::edge(), ) { @@ -132,12 +258,14 @@ impl Accept { Accept { poll, rx, - _reg, sockets, workers, srv, next: 0, timer: (tm, tmr), + maxconn: 102_400, + maxsslrate: 256, + backpressure: false, } } @@ -157,7 +285,14 @@ impl Accept { return; }, TIMER => self.process_timer(), - _ => self.accept(token), + NOTIFY => self.backpressure(false), + _ => { + let token = usize::from(token); + if token < DELTA { + continue; + } + self.accept(token - DELTA); + } } } } @@ -170,7 +305,7 @@ impl Accept { if now > inst { if let Err(err) = self.poll.register( &info.sock, - mio::Token(token + 1000), + mio::Token(token + DELTA), mio::Ready::readable(), mio::PollOpt::edge(), ) { @@ -202,7 +337,7 @@ impl Accept { for (token, info) in self.sockets.iter() { if let Err(err) = self.poll.register( &info.sock, - mio::Token(token + 1000), + mio::Token(token + DELTA), mio::Ready::readable(), mio::PollOpt::edge(), ) { @@ -221,8 +356,9 @@ impl Accept { } return false; } - Command::Worker(idx, addr) => { - self.workers.push((idx, addr)); + Command::Worker(worker) => { + self.backpressure(false); + self.workers.push(worker); } }, Err(err) => match err { @@ -239,48 +375,100 @@ impl Accept { true } - fn accept(&mut self, token: mio::Token) { - let token = usize::from(token); - if token < 1000 { - return; + fn backpressure(&mut self, on: bool) { + if self.backpressure { + if !on { + self.backpressure = false; + for (token, info) in self.sockets.iter() { + if let Err(err) = self.poll.register( + &info.sock, + mio::Token(token + DELTA), + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + error!("Can not resume socket accept process: {}", err); + } else { + info!("Accepting connections on {} has been resumed", info.addr); + } + } + } + } else if on { + self.backpressure = true; + for (_, info) in self.sockets.iter() { + let _ = self.poll.deregister(&info.sock); + } } + } - if let Some(info) = self.sockets.get_mut(token - 1000) { - loop { - match info.sock.accept_std() { - Ok((io, addr)) => { - let mut msg = Conn { - io, - token: info.token, - peer: Some(addr), - http2: false, - }; - while !self.workers.is_empty() { - match self.workers[self.next].1.unbounded_send(msg) { - Ok(_) => (), - Err(err) => { - let _ = self.srv.unbounded_send( - ServerCommand::WorkerDied( - self.workers[self.next].0, - ), - ); - msg = err.into_inner(); - self.workers.swap_remove(self.next); - if self.workers.is_empty() { - error!("No workers"); - thread::sleep(Duration::from_millis(100)); - break; - } else if self.workers.len() <= self.next { - self.next = 0; - } - continue; - } - } + fn accept_one(&mut self, mut msg: Conn) { + if self.backpressure { + while !self.workers.is_empty() { + match self.workers[self.next].send(msg) { + Ok(_) => (), + Err(err) => { + let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( + self.workers[self.next].idx, + )); + msg = err.into_inner(); + self.workers.swap_remove(self.next); + if self.workers.is_empty() { + error!("No workers"); + return; + } else if self.workers.len() <= self.next { + self.next = 0; + } + continue; + } + } + self.next = (self.next + 1) % self.workers.len(); + break; + } + } else { + let mut idx = 0; + while idx < self.workers.len() { + idx += 1; + if self.workers[self.next].available(self.maxconn, self.maxsslrate) { + match self.workers[self.next].send(msg) { + Ok(_) => { self.next = (self.next + 1) % self.workers.len(); - break; + return; + } + Err(err) => { + let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( + self.workers[self.next].idx, + )); + msg = err.into_inner(); + self.workers.swap_remove(self.next); + if self.workers.is_empty() { + error!("No workers"); + self.backpressure(true); + return; + } else if self.workers.len() <= self.next { + self.next = 0; + } + continue; } } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break, + } + self.next = (self.next + 1) % self.workers.len(); + } + // enable backpressure + self.backpressure(true); + self.accept_one(msg); + } + } + + fn accept(&mut self, token: usize) { + loop { + let msg = if let Some(info) = self.sockets.get_mut(token) { + match info.sock.accept_std() { + Ok((io, addr)) => Conn { + io, + token: info.token, + peer: Some(addr), + http2: false, + }, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, Err(e) => { error!("Error accepting connection: {}", e); @@ -307,10 +495,14 @@ impl Accept { Ok(()) }, )); - break; + return; } } - } + } else { + return; + }; + + self.accept_one(msg); } } } diff --git a/src/server/h1.rs b/src/server/h1.rs index 511b32bce..9f3bda28f 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -464,6 +464,7 @@ where #[cfg(test)] mod tests { use std::net::Shutdown; + use std::sync::{atomic::AtomicUsize, Arc}; use std::{cmp, io, time}; use bytes::{Buf, Bytes, BytesMut}; @@ -473,10 +474,22 @@ mod tests { use super::*; use application::HttpApplication; use httpmessage::HttpMessage; + use server::accept::AcceptNotify; use server::h1decoder::Message; use server::settings::{ServerSettings, WorkerSettings}; use server::{KeepAlive, Request}; + fn wrk_settings() -> WorkerSettings { + WorkerSettings::::new( + Vec::new(), + KeepAlive::Os, + ServerSettings::default(), + AcceptNotify::default(), + Arc::new(AtomicUsize::new(0)), + Arc::new(AtomicUsize::new(0)), + ) + } + impl Message { fn message(self) -> Request { match self { @@ -506,8 +519,7 @@ mod tests { macro_rules! parse_ready { ($e:expr) => {{ - let settings: WorkerSettings = - WorkerSettings::new(Vec::new(), KeepAlive::Os, ServerSettings::default()); + let settings = wrk_settings(); match H1Decoder::new().decode($e, &settings) { Ok(Some(msg)) => msg.message(), Ok(_) => unreachable!("Eof during parsing http request"), @@ -518,8 +530,7 @@ mod tests { macro_rules! expect_parse_err { ($e:expr) => {{ - let settings: WorkerSettings = - WorkerSettings::new(Vec::new(), KeepAlive::Os, ServerSettings::default()); + let settings = wrk_settings(); match H1Decoder::new().decode($e, &settings) { Err(err) => match err { @@ -595,11 +606,7 @@ mod tests { fn test_req_parse() { let buf = Buffer::new("GET /test HTTP/1.1\r\n\r\n"); let readbuf = BytesMut::new(); - let settings = Rc::new(WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - )); + let settings = Rc::new(wrk_settings()); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf); h1.poll_io(); @@ -611,11 +618,7 @@ mod tests { fn test_req_parse_err() { let buf = Buffer::new("GET /test HTTP/1\r\n\r\n"); let readbuf = BytesMut::new(); - let settings = Rc::new(WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - )); + let settings = Rc::new(wrk_settings()); let mut h1 = Http1::new(Rc::clone(&settings), buf, None, readbuf); h1.poll_io(); @@ -626,11 +629,7 @@ mod tests { #[test] fn test_parse() { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n\r\n"); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); match reader.decode(&mut buf, &settings) { @@ -647,11 +646,7 @@ mod tests { #[test] fn test_parse_partial() { let mut buf = BytesMut::from("PUT /test HTTP/1"); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); match reader.decode(&mut buf, &settings) { @@ -674,11 +669,7 @@ mod tests { #[test] fn test_parse_post() { let mut buf = BytesMut::from("POST /test2 HTTP/1.0\r\n\r\n"); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); match reader.decode(&mut buf, &settings) { @@ -696,11 +687,7 @@ mod tests { fn test_parse_body() { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); match reader.decode(&mut buf, &settings) { @@ -727,11 +714,7 @@ mod tests { fn test_parse_body_crlf() { let mut buf = BytesMut::from("\r\nGET /test HTTP/1.1\r\nContent-Length: 4\r\n\r\nbody"); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); match reader.decode(&mut buf, &settings) { @@ -757,11 +740,7 @@ mod tests { #[test] fn test_parse_partial_eof() { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); assert!(reader.decode(&mut buf, &settings).unwrap().is_none()); @@ -780,11 +759,7 @@ mod tests { #[test] fn test_headers_split_field() { let mut buf = BytesMut::from("GET /test HTTP/1.1\r\n"); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); assert!{ reader.decode(&mut buf, &settings).unwrap().is_none() } @@ -815,11 +790,7 @@ mod tests { Set-Cookie: c1=cookie1\r\n\ Set-Cookie: c2=cookie2\r\n\r\n", ); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); let req = msg.message(); @@ -1015,11 +986,7 @@ mod tests { #[test] fn test_http_request_upgrade() { - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut buf = BytesMut::from( "GET /test HTTP/1.1\r\n\ connection: upgrade\r\n\ @@ -1085,12 +1052,7 @@ mod tests { "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); - + let settings = wrk_settings(); let mut reader = H1Decoder::new(); let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); assert!(msg.is_payload()); @@ -1125,11 +1087,7 @@ mod tests { "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); assert!(msg.is_payload()); @@ -1163,11 +1121,7 @@ mod tests { "GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n", ); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); @@ -1214,11 +1168,7 @@ mod tests { &"GET /test HTTP/1.1\r\n\ transfer-encoding: chunked\r\n\r\n"[..], ); - let settings = WorkerSettings::::new( - Vec::new(), - KeepAlive::Os, - ServerSettings::default(), - ); + let settings = wrk_settings(); let mut reader = H1Decoder::new(); let msg = reader.decode(&mut buf, &settings).unwrap().unwrap(); diff --git a/src/server/settings.rs b/src/server/settings.rs index cc2e1c06e..8e30646d9 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -1,7 +1,8 @@ -use std::cell::{Cell, RefCell, RefMut, UnsafeCell}; +use std::cell::{RefCell, RefMut, UnsafeCell}; use std::collections::VecDeque; use std::fmt::Write; use std::rc::Rc; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; use std::{env, fmt, net}; use bytes::BytesMut; @@ -11,6 +12,7 @@ use lazycell::LazyCell; use parking_lot::Mutex; use time; +use super::accept::AcceptNotify; use super::channel::Node; use super::message::{Request, RequestPool}; use super::KeepAlive; @@ -93,21 +95,6 @@ impl ServerSettings { } } - pub(crate) fn parts(&self) -> (Option, String, bool) { - (self.addr, self.host.clone(), self.secure) - } - - pub(crate) fn from_parts(parts: (Option, String, bool)) -> Self { - let (addr, host, secure) = parts; - ServerSettings { - addr, - host, - secure, - cpu_pool: LazyCell::new(), - responses: HttpResponsePool::get_pool(), - } - } - /// Returns the socket address of the local half of this TCP connection pub fn local_addr(&self) -> Option { self.addr @@ -150,14 +137,17 @@ pub(crate) struct WorkerSettings { ka_enabled: bool, bytes: Rc, messages: &'static RequestPool, - channels: Cell, + channels: Arc, node: RefCell>, date: UnsafeCell, + sslrate: Arc, + notify: AcceptNotify, } impl WorkerSettings { pub(crate) fn new( h: Vec, keep_alive: KeepAlive, settings: ServerSettings, + notify: AcceptNotify, channels: Arc, sslrate: Arc, ) -> WorkerSettings { let (keep_alive, ka_enabled) = match keep_alive { KeepAlive::Timeout(val) => (val as u64, true), @@ -169,16 +159,18 @@ impl WorkerSettings { h: RefCell::new(h), bytes: Rc::new(SharedBytesPool::new()), messages: RequestPool::pool(settings), - channels: Cell::new(0), node: RefCell::new(Node::head()), date: UnsafeCell::new(Date::new()), keep_alive, ka_enabled, + channels, + sslrate, + notify, } } pub fn num_channels(&self) -> usize { - self.channels.get() + self.channels.load(Ordering::Relaxed) } pub fn head(&self) -> RefMut> { @@ -210,16 +202,12 @@ impl WorkerSettings { } pub fn add_channel(&self) { - self.channels.set(self.channels.get() + 1); + self.channels.fetch_add(1, Ordering::Relaxed); } pub fn remove_channel(&self) { - let num = self.channels.get(); - if num > 0 { - self.channels.set(num - 1); - } else { - error!("Number of removed channels is bigger than added channel. Bug in actix-web"); - } + let val = self.channels.fetch_sub(1, Ordering::Relaxed); + self.notify.notify_maxconn(val); } pub fn update_date(&self) { @@ -240,6 +228,16 @@ impl WorkerSettings { dst.extend_from_slice(date_bytes); } } + + #[allow(dead_code)] + pub(crate) fn ssl_conn_add(&self) { + self.sslrate.fetch_add(1, Ordering::Relaxed); + } + #[allow(dead_code)] + pub(crate) fn ssl_conn_del(&self) { + let val = self.sslrate.fetch_sub(1, Ordering::Relaxed); + self.notify.notify_maxsslrate(val); + } } struct Date { @@ -311,6 +309,9 @@ mod tests { Vec::new(), KeepAlive::Os, ServerSettings::default(), + AcceptNotify::default(), + Arc::new(AtomicUsize::new(0)), + Arc::new(AtomicUsize::new(0)), ); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); settings.set_date(&mut buf1, true); diff --git a/src/server/srv.rs b/src/server/srv.rs index e776f7422..b6bd21967 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -1,5 +1,5 @@ use std::rc::Rc; -use std::sync::{mpsc as sync_mpsc, Arc}; +use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Duration; use std::{io, net}; @@ -10,10 +10,8 @@ use actix::{ use futures::sync::mpsc; use futures::{Future, Sink, Stream}; -use mio; use net2::TcpBuilder; use num_cpus; -use slab::Slab; use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "tls")] @@ -25,10 +23,12 @@ use openssl::ssl::{AlpnError, SslAcceptorBuilder}; #[cfg(feature = "rust-tls")] use rustls::ServerConfig; -use super::accept::{start_accept_thread, Command}; +use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::channel::{HttpChannel, WrapperStream}; use super::settings::{ServerSettings, WorkerSettings}; -use super::worker::{Conn, SocketInfo, StopWorker, StreamHandlerType, Worker}; +use super::worker::{ + Conn, StopWorker, StreamHandlerType, Worker, WorkerClient, WorkersPool, +}; use super::{IntoHttpHandler, IoStream, KeepAlive}; use super::{PauseServer, ResumeServer, StopServer}; @@ -54,17 +54,10 @@ where h: Option>>, threads: usize, backlog: i32, - host: Option, - keep_alive: KeepAlive, - factory: Arc Vec + Send + Sync>, - #[cfg_attr(feature = "cargo-clippy", allow(type_complexity))] - workers: Vec<(usize, Addr>)>, sockets: Vec, - accept: Option<( - mio::SetReadiness, - sync_mpsc::Sender, - Slab, - )>, + pool: WorkersPool, + workers: Vec<(usize, Addr>)>, + accept: AcceptLoop, exit: bool, shutdown_timeout: u16, signals: Option>, @@ -105,12 +98,10 @@ where h: None, threads: num_cpus::get(), backlog: 2048, - host: None, - keep_alive: KeepAlive::Os, - factory: Arc::new(f), + pool: WorkersPool::new(f), workers: Vec::new(), sockets: Vec::new(), - accept: None, + accept: AcceptLoop::new(), exit: false, shutdown_timeout: 30, signals: None, @@ -128,15 +119,6 @@ where self } - #[doc(hidden)] - #[deprecated( - since = "0.6.0", - note = "please use `HttpServer::workers()` instead" - )] - pub fn threads(self, num: usize) -> Self { - self.workers(num) - } - /// Set the maximum number of pending connections. /// /// This refers to the number of clients that can be waiting to be served. @@ -152,11 +134,34 @@ where self } + /// Sets the maximum per-worker number of concurrent connections. + /// + /// All socket listeners will stop accepting connections when this limit is reached + /// for each worker. + /// + /// By default max connections is set to a 100k. + pub fn max_connections(mut self, num: usize) -> Self { + self.accept.max_connections(num); + self + } + + /// Sets the maximum concurrent per-worker number of SSL handshakes. + /// + /// All listeners will stop accepting connections when this limit is reached. It + /// can be used to limit the global SSL CPU usage regardless of each worker + /// capacity. + /// + /// By default max connections is set to a 256. + pub fn max_sslrate(mut self, num: usize) -> Self { + self.accept.max_sslrate(num); + self + } + /// Set server keep-alive setting. /// /// By default keep alive is set to a `Os`. pub fn keep_alive>(mut self, val: T) -> Self { - self.keep_alive = val.into(); + self.pool.keep_alive = val.into(); self } @@ -166,7 +171,7 @@ where /// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo. /// html#method.host) documentation for more information. pub fn server_hostname(mut self, val: String) -> Self { - self.host = Some(val); + self.pool.host = Some(val); self } @@ -395,27 +400,12 @@ where Ok(self) } - fn start_workers( - &mut self, settings: &ServerSettings, sockets: &Slab, - ) -> Vec<(usize, mpsc::UnboundedSender>)> { + fn start_workers(&mut self, notify: &AcceptNotify) -> Vec { // start workers let mut workers = Vec::new(); for idx in 0..self.threads { - let (tx, rx) = mpsc::unbounded::>(); - - let ka = self.keep_alive; - let socks = sockets.clone(); - let factory = Arc::clone(&self.factory); - let parts = settings.parts(); - - let addr = Arbiter::start(move |ctx: &mut Context<_>| { - let s = ServerSettings::from_parts(parts); - let apps: Vec<_> = - (*factory)().into_iter().map(|h| h.into_handler()).collect(); - ctx.add_message_stream(rx); - Worker::new(apps, socks, ka, s) - }); - workers.push((idx, tx)); + let (worker, addr) = self.pool.start(idx, notify.clone()); + workers.push(worker); self.workers.push((idx, addr)); } info!("Starting {} http workers", self.threads); @@ -466,30 +456,20 @@ impl HttpServer { if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called before start()"); } else { - let (tx, rx) = mpsc::unbounded(); - - let mut socks = Slab::new(); let mut addrs: Vec<(usize, Socket)> = Vec::new(); for socket in self.sockets.drain(..) { - let entry = socks.vacant_entry(); - let token = entry.key(); - entry.insert(SocketInfo { - addr: socket.addr, - htype: socket.tp.clone(), - }); + let token = self.pool.insert(socket.addr, socket.tp.clone()); addrs.push((token, socket)); } - - let settings = ServerSettings::new(Some(addrs[0].1.addr), &self.host, false); - let workers = self.start_workers(&settings, &socks); + let notify = self.accept.get_notify(); + let workers = self.start_workers(¬ify); // start accept thread for (_, sock) in &addrs { info!("Starting server on http://{}", sock.addr); } - let (r, cmd) = start_accept_thread(addrs, tx.clone(), workers.clone()); - self.accept = Some((r, cmd, socks)); + let rx = self.accept.start(addrs, workers.clone()); // start http server actor let signals = self.subscribe_to_signals(); @@ -600,15 +580,18 @@ impl HttpServer { { // set server settings let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); - let settings = ServerSettings::new(Some(addr), &self.host, secure); - let apps: Vec<_> = (*self.factory)() + let settings = ServerSettings::new(Some(addr), &self.pool.host, secure); + let apps: Vec<_> = (*self.pool.factory)() .into_iter() .map(|h| h.into_handler()) .collect(); self.h = Some(Rc::new(WorkerSettings::new( apps, - self.keep_alive, + self.pool.keep_alive, settings, + AcceptNotify::default(), + Arc::new(AtomicUsize::new(0)), + Arc::new(AtomicUsize::new(0)), ))); // start server @@ -676,7 +659,6 @@ impl StreamHandler for HttpServer { if found { error!("Worker has died {:?}, restarting", idx); - let (tx, rx) = mpsc::unbounded::>(); let mut new_idx = self.workers.len(); 'found: loop { @@ -689,25 +671,10 @@ impl StreamHandler for HttpServer { break; } - let ka = self.keep_alive; - let factory = Arc::clone(&self.factory); - let host = self.host.clone(); - let socks = self.accept.as_ref().unwrap().2.clone(); - let addr = socks[0].addr; - - let addr = Arbiter::start(move |ctx: &mut Context<_>| { - let settings = ServerSettings::new(Some(addr), &host, false); - let apps: Vec<_> = - (*factory)().into_iter().map(|h| h.into_handler()).collect(); - ctx.add_message_stream(rx); - Worker::new(apps, socks, ka, settings) - }); - if let Some(ref item) = &self.accept { - let _ = item.1.send(Command::Worker(new_idx, tx.clone())); - let _ = item.0.set_readiness(mio::Ready::readable()); - } - + let (worker, addr) = + self.pool.start(new_idx, self.accept.get_notify()); self.workers.push((new_idx, addr)); + self.accept.send(Command::Worker(worker)); } } } @@ -735,10 +702,7 @@ impl Handler for HttpServer { type Result = (); fn handle(&mut self, _: PauseServer, _: &mut Context) { - for item in &self.accept { - let _ = item.1.send(Command::Pause); - let _ = item.0.set_readiness(mio::Ready::readable()); - } + self.accept.send(Command::Pause); } } @@ -746,10 +710,7 @@ impl Handler for HttpServer { type Result = (); fn handle(&mut self, _: ResumeServer, _: &mut Context) { - for item in &self.accept { - let _ = item.1.send(Command::Resume); - let _ = item.0.set_readiness(mio::Ready::readable()); - } + self.accept.send(Command::Resume); } } @@ -758,10 +719,7 @@ impl Handler for HttpServer { fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { // stop accept threads - for item in &self.accept { - let _ = item.1.send(Command::Stop); - let _ = item.0.set_readiness(mio::Ready::readable()); - } + self.accept.send(Command::Stop); // stop workers let (tx, rx) = mpsc::channel(1); diff --git a/src/server/worker.rs b/src/server/worker.rs index 5e753ce58..ed0799563 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -1,9 +1,12 @@ +use std::rc::Rc; +use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; +use std::{net, time}; + +use futures::sync::mpsc::{unbounded, SendError, UnboundedSender}; use futures::sync::oneshot; use futures::Future; use net2::TcpStreamExt; use slab::Slab; -use std::rc::Rc; -use std::{net, time}; use tokio::executor::current_thread; use tokio_reactor::Handle; use tokio_tcp::TcpStream; @@ -24,16 +27,15 @@ use tokio_openssl::SslAcceptorExt; #[cfg(feature = "rust-tls")] use rustls::{ServerConfig, Session}; #[cfg(feature = "rust-tls")] -use std::sync::Arc; -#[cfg(feature = "rust-tls")] use tokio_rustls::ServerConfigExt; use actix::msgs::StopArbiter; -use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Response}; +use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, Message, Response}; -use server::channel::HttpChannel; -use server::settings::{ServerSettings, WorkerSettings}; -use server::{HttpHandler, KeepAlive}; +use super::accept::AcceptNotify; +use super::channel::HttpChannel; +use super::settings::{ServerSettings, WorkerSettings}; +use super::{HttpHandler, IntoHttpHandler, KeepAlive}; #[derive(Message)] pub(crate) struct Conn { @@ -49,6 +51,95 @@ pub(crate) struct SocketInfo { pub htype: StreamHandlerType, } +pub(crate) struct WorkersPool { + sockets: Slab, + pub factory: Arc Vec + Send + Sync>, + pub host: Option, + pub keep_alive: KeepAlive, +} + +impl WorkersPool { + pub fn new(factory: F) -> Self + where + F: Fn() -> Vec + Send + Sync + 'static, + { + WorkersPool { + factory: Arc::new(factory), + host: None, + keep_alive: KeepAlive::Os, + sockets: Slab::new(), + } + } + + pub fn insert(&mut self, addr: net::SocketAddr, htype: StreamHandlerType) -> usize { + let entry = self.sockets.vacant_entry(); + let token = entry.key(); + entry.insert(SocketInfo { addr, htype }); + token + } + + pub fn start( + &mut self, idx: usize, notify: AcceptNotify, + ) -> (WorkerClient, Addr>) { + let host = self.host.clone(); + let addr = self.sockets[0].addr; + let factory = Arc::clone(&self.factory); + let socks = self.sockets.clone(); + let ka = self.keep_alive; + let (tx, rx) = unbounded::>(); + let client = WorkerClient::new(idx, tx, self.sockets.clone()); + let conn = client.conn.clone(); + let sslrate = client.sslrate.clone(); + + let addr = Arbiter::start(move |ctx: &mut Context<_>| { + let s = ServerSettings::new(Some(addr), &host, false); + let apps: Vec<_> = + (*factory)().into_iter().map(|h| h.into_handler()).collect(); + ctx.add_message_stream(rx); + Worker::new(apps, socks, ka, s, conn, sslrate, notify) + }); + + (client, addr) + } +} + +#[derive(Clone)] +pub(crate) struct WorkerClient { + pub idx: usize, + tx: UnboundedSender>, + info: Slab, + pub conn: Arc, + pub sslrate: Arc, +} + +impl WorkerClient { + fn new( + idx: usize, tx: UnboundedSender>, info: Slab, + ) -> Self { + WorkerClient { + idx, + tx, + info, + conn: Arc::new(AtomicUsize::new(0)), + sslrate: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn send( + &self, msg: Conn, + ) -> Result<(), SendError>> { + self.tx.unbounded_send(msg) + } + + pub fn available(&self, maxconn: usize, maxsslrate: usize) -> bool { + if maxsslrate <= self.sslrate.load(Ordering::Relaxed) { + false + } else { + maxconn > self.conn.load(Ordering::Relaxed) + } + } +} + /// Stop worker message. Returns `true` on successful shutdown /// and `false` if some connections still alive. pub(crate) struct StopWorker { @@ -75,7 +166,8 @@ where impl Worker { pub(crate) fn new( h: Vec, socks: Slab, keep_alive: KeepAlive, - settings: ServerSettings, + settings: ServerSettings, conn: Arc, sslrate: Arc, + notify: AcceptNotify, ) -> Worker { let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { Some(time::Duration::new(val as u64, 0)) @@ -84,7 +176,9 @@ impl Worker { }; Worker { - settings: Rc::new(WorkerSettings::new(h, keep_alive, settings)), + settings: Rc::new(WorkerSettings::new( + h, keep_alive, settings, notify, conn, sslrate, + )), socks, tcp_ka, } @@ -182,6 +276,18 @@ pub(crate) enum StreamHandlerType { } impl StreamHandlerType { + pub fn is_ssl(&self) -> bool { + match *self { + StreamHandlerType::Normal => false, + #[cfg(feature = "tls")] + StreamHandlerType::Tls(_) => true, + #[cfg(feature = "alpn")] + StreamHandlerType::Alpn(_) => true, + #[cfg(feature = "rust-tls")] + StreamHandlerType::Rustls(_) => true, + } + } + fn handle( &mut self, h: Rc>, msg: Conn, ) { @@ -201,9 +307,11 @@ impl StreamHandlerType { let _ = io.set_nodelay(true); let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); + self.settings.ssl_conn_add(); current_thread::spawn(TlsAcceptorExt::accept_async(acceptor, io).then( move |res| { + self.settings.ssl_conn_del(); match res { Ok(io) => current_thread::spawn(HttpChannel::new( h, io, peer, http2, @@ -222,9 +330,11 @@ impl StreamHandlerType { let _ = io.set_nodelay(true); let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); + self.settings.ssl_conn_add(); current_thread::spawn(SslAcceptorExt::accept_async(acceptor, io).then( move |res| { + self.settings.ssl_conn_del(); match res { Ok(io) => { let http2 = if let Some(p) = @@ -252,9 +362,11 @@ impl StreamHandlerType { let _ = io.set_nodelay(true); let io = TcpStream::from_std(io, &Handle::default()) .expect("failed to associate TCP stream"); + self.settings.ssl_conn_add(); current_thread::spawn(ServerConfigExt::accept_async(acceptor, io).then( move |res| { + self.settings.ssl_conn_del(); match res { Ok(io) => { let http2 = if let Some(p) =