From e4ce6dfbdf0c6cd25ec5c1723b6611cf456ed8f5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 9 Aug 2018 11:52:32 -0700 Subject: [PATCH] refactor workers management --- src/server/accept.rs | 131 +++----- src/server/channel.rs | 16 +- src/server/h1.rs | 14 +- src/server/mod.rs | 14 + src/server/server.rs | 504 +++++++++++++++++++++++++++++++ src/server/settings.rs | 68 +++-- src/server/srv.rs | 664 +++++++++++++++++++++++++---------------- src/server/worker.rs | 477 +++-------------------------- src/test.rs | 13 +- 9 files changed, 1061 insertions(+), 840 deletions(-) create mode 100644 src/server/server.rs diff --git a/src/server/accept.rs b/src/server/accept.rs index 61bc72fbe..d642c40f6 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -9,8 +9,9 @@ use tokio_timer::Delay; use actix::{msgs::Execute, Arbiter, System}; -use super::srv::ServerCommand; -use super::worker::{Conn, Socket, Token, WorkerClient}; +use super::server::ServerCommand; +use super::worker::{Conn, WorkerClient}; +use super::Token; pub(crate) enum Command { Pause, @@ -22,51 +23,27 @@ pub(crate) enum Command { struct ServerSocketInfo { addr: net::SocketAddr, token: Token, + handler: Token, sock: mio::net::TcpListener, timeout: Option, } #[derive(Clone)] -pub(crate) struct AcceptNotify { - ready: mio::SetReadiness, - maxconn: usize, - maxconn_low: usize, - maxconnrate: usize, - maxconnrate_low: usize, -} +pub(crate) struct AcceptNotify(mio::SetReadiness); impl AcceptNotify { - pub fn new(ready: mio::SetReadiness, maxconn: usize, maxconnrate: usize) -> Self { - let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 }; - let maxconnrate_low = if maxconnrate > 10 { - maxconnrate - 10 - } else { - 0 - }; - AcceptNotify { - ready, - maxconn, - maxconn_low, - maxconnrate, - maxconnrate_low, - } + pub(crate) fn new(ready: mio::SetReadiness) -> Self { + AcceptNotify(ready) } - pub fn notify_maxconn(&self, maxconn: usize) { - if maxconn > self.maxconn_low && maxconn <= self.maxconn { - let _ = self.ready.set_readiness(mio::Ready::readable()); - } - } - pub fn notify_maxconnrate(&self, connrate: usize) { - if connrate > self.maxconnrate_low && connrate <= self.maxconnrate { - let _ = self.ready.set_readiness(mio::Ready::readable()); - } + pub(crate) fn notify(&self) { + let _ = self.0.set_readiness(mio::Ready::readable()); } } impl Default for AcceptNotify { fn default() -> Self { - AcceptNotify::new(mio::Registration::new2().1, 0, 0) + AcceptNotify::new(mio::Registration::new2().1) } } @@ -81,8 +58,6 @@ pub(crate) struct AcceptLoop { mpsc::UnboundedSender, mpsc::UnboundedReceiver, )>, - maxconn: usize, - maxconnrate: usize, } impl AcceptLoop { @@ -97,8 +72,6 @@ impl AcceptLoop { cmd_reg: Some(cmd_reg), notify_ready, notify_reg: Some(notify_reg), - maxconn: 102_400, - maxconnrate: 256, rx: Some(rx), srv: Some(mpsc::unbounded()), } @@ -110,19 +83,12 @@ impl AcceptLoop { } pub fn get_notify(&self) -> AcceptNotify { - AcceptNotify::new(self.notify_ready.clone(), self.maxconn, self.maxconnrate) - } - - pub fn maxconn(&mut self, num: usize) { - self.maxconn = num; - } - - pub fn maxconnrate(&mut self, num: usize) { - self.maxconnrate = num; + AcceptNotify::new(self.notify_ready.clone()) } pub(crate) fn start( - &mut self, socks: Vec, workers: Vec, + &mut self, socks: Vec>, + workers: Vec, ) -> mpsc::UnboundedReceiver { let (tx, rx) = self.srv.take().expect("Can not re-use AcceptInfo"); @@ -130,8 +96,6 @@ impl AcceptLoop { self.rx.take().expect("Can not re-use AcceptInfo"), self.cmd_reg.take().expect("Can not re-use AcceptInfo"), self.notify_reg.take().expect("Can not re-use AcceptInfo"), - self.maxconn, - self.maxconnrate, socks, tx, workers, @@ -148,8 +112,6 @@ struct Accept { srv: mpsc::UnboundedSender, timer: (mio::Registration, mio::SetReadiness), next: usize, - maxconn: usize, - maxconnrate: usize, backpressure: bool, } @@ -175,9 +137,8 @@ impl Accept { #![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub(crate) fn start( rx: sync_mpsc::Receiver, cmd_reg: mio::Registration, - notify_reg: mio::Registration, maxconn: usize, maxconnrate: usize, - socks: Vec, srv: mpsc::UnboundedSender, - workers: Vec, + notify_reg: mio::Registration, socks: Vec>, + srv: mpsc::UnboundedSender, workers: Vec, ) { let sys = System::current(); @@ -187,8 +148,6 @@ impl Accept { .spawn(move || { System::set_current(sys); let mut accept = Accept::new(rx, socks, workers, srv); - accept.maxconn = maxconn; - accept.maxconnrate = maxconnrate; // Start listening for incoming commands if let Err(err) = accept.poll.register( @@ -215,7 +174,7 @@ impl Accept { } fn new( - rx: sync_mpsc::Receiver, socks: Vec, + rx: sync_mpsc::Receiver, socks: Vec>, workers: Vec, srv: mpsc::UnboundedSender, ) -> Accept { // Create a poll instance @@ -226,29 +185,33 @@ impl Accept { // Start accept let mut sockets = Slab::new(); - for sock in socks { - let server = mio::net::TcpListener::from_std(sock.lst) - .expect("Can not create mio::net::TcpListener"); + for (idx, srv_socks) in socks.into_iter().enumerate() { + for (hnd_token, lst) in srv_socks { + let addr = lst.local_addr().unwrap(); + let server = mio::net::TcpListener::from_std(lst) + .expect("Can not create mio::net::TcpListener"); - let entry = sockets.vacant_entry(); - let token = entry.key(); + let entry = sockets.vacant_entry(); + let token = entry.key(); - // Start listening for incoming connections - if let Err(err) = poll.register( - &server, - mio::Token(token + DELTA), - mio::Ready::readable(), - mio::PollOpt::edge(), - ) { - panic!("Can not register io: {}", err); + // Start listening for incoming connections + if let Err(err) = poll.register( + &server, + mio::Token(token + DELTA), + mio::Ready::readable(), + mio::PollOpt::edge(), + ) { + panic!("Can not register io: {}", err); + } + + entry.insert(ServerSocketInfo { + addr, + token: hnd_token, + handler: Token(idx), + sock: server, + timeout: None, + }); } - - entry.insert(ServerSocketInfo { - token: sock.token, - addr: sock.addr, - sock: server, - timeout: None, - }); } // Timer @@ -267,8 +230,6 @@ impl Accept { srv, next: 0, timer: (tm, tmr), - maxconn: 102_400, - maxconnrate: 256, backpressure: false, } } @@ -431,7 +392,7 @@ impl Accept { let mut idx = 0; while idx < self.workers.len() { idx += 1; - if self.workers[self.next].available(self.maxconn, self.maxconnrate) { + if self.workers[self.next].available() { match self.workers[self.next].send(msg) { Ok(_) => { self.next = (self.next + 1) % self.workers.len(); @@ -469,6 +430,7 @@ impl Accept { Ok((io, addr)) => Conn { io, token: info.token, + handler: info.handler, peer: Some(addr), }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, @@ -489,11 +451,10 @@ impl Accept { Delay::new( Instant::now() + Duration::from_millis(510), ).map_err(|_| ()) - .and_then(move |_| { - let _ = - r.set_readiness(mio::Ready::readable()); - Ok(()) - }), + .and_then(move |_| { + let _ = r.set_readiness(mio::Ready::readable()); + Ok(()) + }), ); Ok(()) }, diff --git a/src/server/channel.rs b/src/server/channel.rs index c158f66b4..7de561c6b 100644 --- a/src/server/channel.rs +++ b/src/server/channel.rs @@ -7,7 +7,7 @@ use futures::{Async, Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; use super::settings::WorkerSettings; -use super::{h1, h2, HttpHandler, IoStream}; +use super::{h1, h2, ConnectionTag, HttpHandler, IoStream}; const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0"; @@ -30,6 +30,7 @@ where { proto: Option>, node: Option>>, + _tag: ConnectionTag, } impl HttpChannel @@ -40,9 +41,10 @@ where pub(crate) fn new( settings: Rc>, io: T, peer: Option, ) -> HttpChannel { - settings.add_channel(); + let _tag = settings.connection(); HttpChannel { + _tag, node: None, proto: Some(HttpProtocol::Unknown( settings, @@ -97,7 +99,6 @@ where let result = h1.poll(); match result { Ok(Async::Ready(())) | Err(_) => { - h1.settings().remove_channel(); if let Some(n) = self.node.as_mut() { n.remove() }; @@ -110,7 +111,6 @@ where let result = h2.poll(); match result { Ok(Async::Ready(())) | Err(_) => { - h2.settings().remove_channel(); if let Some(n) = self.node.as_mut() { n.remove() }; @@ -119,16 +119,10 @@ where } return result; } - Some(HttpProtocol::Unknown( - ref mut settings, - _, - ref mut io, - ref mut buf, - )) => { + Some(HttpProtocol::Unknown(_, _, ref mut io, ref mut buf)) => { match io.read_available(buf) { Ok(Async::Ready(true)) | Err(_) => { debug!("Ignored premature client disconnection"); - settings.remove_channel(); if let Some(n) = self.node.as_mut() { n.remove() }; diff --git a/src/server/h1.rs b/src/server/h1.rs index 2c07f0cf4..808dc11a1 100644 --- a/src/server/h1.rs +++ b/src/server/h1.rs @@ -468,7 +468,6 @@ where #[cfg(test)] mod tests { use std::net::Shutdown; - use std::sync::{atomic::AtomicUsize, Arc}; use std::{cmp, io, time}; use bytes::{Buf, Bytes, BytesMut}; @@ -478,20 +477,17 @@ mod tests { use super::*; use application::HttpApplication; use httpmessage::HttpMessage; - use server::accept::AcceptNotify; use server::h1decoder::Message; use server::settings::{ServerSettings, WorkerSettings}; - use server::{KeepAlive, Request}; + use server::{Connections, KeepAlive, Request}; - fn wrk_settings() -> WorkerSettings { - WorkerSettings::::new( + fn wrk_settings() -> Rc> { + Rc::new(WorkerSettings::::new( Vec::new(), KeepAlive::Os, ServerSettings::default(), - AcceptNotify::default(), - Arc::new(AtomicUsize::new(0)), - Arc::new(AtomicUsize::new(0)), - ) + Connections::default(), + )) } impl Message { diff --git a/src/server/mod.rs b/src/server/mod.rs index baf004926..f34497936 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -21,12 +21,16 @@ pub(crate) mod helpers; pub(crate) mod input; pub(crate) mod message; pub(crate) mod output; +mod server; pub(crate) mod settings; mod srv; mod ssl; mod worker; pub use self::message::Request; +pub use self::server::{ + ConnectionRateTag, ConnectionTag, Connections, Server, Service, ServiceHandler, +}; pub use self::settings::ServerSettings; pub use self::srv::HttpServer; pub use self::ssl::*; @@ -136,6 +140,16 @@ impl Message for StopServer { type Result = Result<(), ()>; } +/// Socket id token +#[derive(Clone, Copy)] +pub struct Token(usize); + +impl Token { + pub(crate) fn new(val: usize) -> Token { + Token(val) + } +} + /// Low level http request handler #[allow(unused_variables)] pub trait HttpHandler: 'static { diff --git a/src/server/server.rs b/src/server/server.rs new file mode 100644 index 000000000..ff88040fe --- /dev/null +++ b/src/server/server.rs @@ -0,0 +1,504 @@ +use std::{mem, net}; +use std::time::Duration; +use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; + +use futures::{Future, Stream, Sink}; +use futures::sync::{mpsc, mpsc::unbounded}; + +use actix::{fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, + Context, Handler, Response, System, StreamHandler, WrapFuture}; + +use super::accept::{AcceptLoop, AcceptNotify, Command}; +use super::worker::{StopWorker, Worker, WorkerClient, Conn}; +use super::{PauseServer, ResumeServer, StopServer, Token}; + +pub trait Service: Send + 'static { + /// Clone service + fn clone(&self) -> Box; + + /// Create service handler for this service + fn create(&self, conn: Connections) -> Box; +} + +impl Service for Box { + fn clone(&self) -> Box { + self.as_ref().clone() + } + + fn create(&self, conn: Connections) -> Box { + self.as_ref().create(conn) + } +} + +pub trait ServiceHandler { + /// Handle incoming stream + fn handle(&mut self, token: Token, io: net::TcpStream, peer: Option); + + /// Shutdown open handlers + fn shutdown(&self, _: bool) {} +} + +pub(crate) enum ServerCommand { + WorkerDied(usize), +} + +pub struct Server { + threads: usize, + workers: Vec<(usize, Addr)>, + services: Vec>, + sockets: Vec>, + accept: AcceptLoop, + exit: bool, + shutdown_timeout: u16, + signals: Option>, + no_signals: bool, + maxconn: usize, + maxconnrate: usize, +} + +impl Default for Server { + fn default() -> Self { + Self::new() + } +} + +impl Server { + /// Create new Server instance + pub fn new() -> Server { + Server { + threads: num_cpus::get(), + workers: Vec::new(), + services: Vec::new(), + sockets: Vec::new(), + accept: AcceptLoop::new(), + exit: false, + shutdown_timeout: 30, + signals: None, + no_signals: false, + maxconn: 102_400, + maxconnrate: 256, + } + } + + /// Set number of workers to start. + /// + /// By default http server uses number of available logical cpu as threads + /// count. + pub fn workers(mut self, num: usize) -> Self { + self.threads = num; + self + } + + /// Sets the maximum per-worker number of concurrent connections. + /// + /// All socket listeners will stop accepting connections when this limit is reached + /// for each worker. + /// + /// By default max connections is set to a 100k. + pub fn maxconn(mut self, num: usize) -> Self { + self.maxconn = num; + self + } + + /// Sets the maximum per-worker concurrent connection establish process. + /// + /// All listeners will stop accepting connections when this limit is reached. It + /// can be used to limit the global SSL CPU usage. + /// + /// By default max connections is set to a 256. + pub fn maxconnrate(mut self, num: usize) -> Self { + self.maxconnrate= num; + self + } + + /// Stop actix system. + /// + /// `SystemExit` message stops currently running system. + pub fn system_exit(mut self) -> Self { + self.exit = true; + self + } + + #[doc(hidden)] + /// Set alternative address for `ProcessSignals` actor. + pub fn signals(mut self, addr: Addr) -> Self { + self.signals = Some(addr); + self + } + + /// Disable signal handling + pub fn disable_signals(mut self) -> Self { + self.no_signals = true; + self + } + + /// Timeout for graceful workers shutdown. + /// + /// After receiving a stop signal, workers have this much time to finish + /// serving requests. Workers still alive after the timeout are force + /// dropped. + /// + /// By default shutdown timeout sets to 30 seconds. + pub fn shutdown_timeout(mut self, sec: u16) -> Self { + self.shutdown_timeout = sec; + self + } + + /// Add new service to server + pub fn service(mut self, srv: T, sockets: Vec<(Token, net::TcpListener)>) -> Self + where + T: Into> + { + self.services.push(srv.into()); + self.sockets.push(sockets); + self + } + + /// Spawn new thread and start listening for incoming connections. + /// + /// This method spawns new thread and starts new actix system. Other than + /// that it is similar to `start()` method. This method blocks. + /// + /// This methods panics if no socket addresses get bound. + /// + /// ```rust,ignore + /// # extern crate futures; + /// # extern crate actix_web; + /// # use futures::Future; + /// use actix_web::*; + /// + /// fn main() { + /// Server::new(). + /// .service( + /// HttpServer::new(|| App::new().resource("/", |r| r.h(|_| HttpResponse::Ok()))) + /// .bind("127.0.0.1:0") + /// .expect("Can not bind to 127.0.0.1:0")) + /// .run(); + /// } + /// ``` + pub fn run(self) { + let sys = System::new("http-server"); + self.start(); + sys.run(); + } + + /// Start + pub fn start(mut self) -> Addr { + if self.sockets.is_empty() { + panic!("Service should have at least one bound socket"); + } else { + info!("Starting {} http workers", self.threads); + + // start workers + let mut workers = Vec::new(); + for idx in 0..self.threads { + let (addr, worker) = self.start_worker(idx, self.accept.get_notify()); + workers.push(worker); + self.workers.push((idx, addr)); + } + + // start accept thread + for sock in &self.sockets { + for s in sock.iter() { + info!("Starting server on http://{:?}", s.1.local_addr().ok()); + } + } + let rx = self.accept.start( + mem::replace(&mut self.sockets, Vec::new()), workers); + + // start http server actor + let signals = self.subscribe_to_signals(); + let addr = Actor::create(move |ctx| { + ctx.add_stream(rx); + self + }); + if let Some(signals) = signals { + signals.do_send(signal::Subscribe(addr.clone().recipient())) + } + addr + } + } + + // subscribe to os signals + fn subscribe_to_signals(&self) -> Option> { + if !self.no_signals { + if let Some(ref signals) = self.signals { + Some(signals.clone()) + } else { + Some(System::current().registry().get::()) + } + } else { + None + } + } + + fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr, WorkerClient) { + let (tx, rx) = unbounded::>(); + let conns = Connections::new(notify, self.maxconn, self.maxconnrate); + let worker = WorkerClient::new(idx, tx, conns.clone()); + let services: Vec<_> = self.services.iter().map(|v| v.clone()).collect(); + + let addr = Arbiter::start(move |ctx: &mut Context<_>| { + ctx.add_message_stream(rx); + let handlers: Vec<_> = services.into_iter().map(|s| s.create(conns.clone())).collect(); + Worker::new(conns, handlers) + }); + + (addr, worker) + } +} + +impl Actor for Server +{ + type Context = Context; +} + +/// Signals support +/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system +/// message to `System` actor. +impl Handler for Server { + type Result = (); + + fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { + match msg.0 { + signal::SignalType::Int => { + info!("SIGINT received, exiting"); + self.exit = true; + Handler::::handle(self, StopServer { graceful: false }, ctx); + } + signal::SignalType::Term => { + info!("SIGTERM received, stopping"); + self.exit = true; + Handler::::handle(self, StopServer { graceful: true }, ctx); + } + signal::SignalType::Quit => { + info!("SIGQUIT received, exiting"); + self.exit = true; + Handler::::handle(self, StopServer { graceful: false }, ctx); + } + _ => (), + } + } +} + +impl Handler for Server { + type Result = (); + + fn handle(&mut self, _: PauseServer, _: &mut Context) { + self.accept.send(Command::Pause); + } +} + +impl Handler for Server { + type Result = (); + + fn handle(&mut self, _: ResumeServer, _: &mut Context) { + self.accept.send(Command::Resume); + } +} + +impl Handler for Server { + type Result = Response<(), ()>; + + fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { + // stop accept thread + self.accept.send(Command::Stop); + + // stop workers + let (tx, rx) = mpsc::channel(1); + + let dur = if msg.graceful { + Some(Duration::new(u64::from(self.shutdown_timeout), 0)) + } else { + None + }; + for worker in &self.workers { + let tx2 = tx.clone(); + ctx.spawn( + worker + .1 + .send(StopWorker { graceful: dur }) + .into_actor(self) + .then(move |_, slf, ctx| { + slf.workers.pop(); + if slf.workers.is_empty() { + let _ = tx2.send(()); + + // we need to stop system if server was spawned + if slf.exit { + ctx.run_later(Duration::from_millis(300), |_, _| { + System::current().stop(); + }); + } + } + + fut::ok(()) + }), + ); + } + + if !self.workers.is_empty() { + Response::async(rx.into_future().map(|_| ()).map_err(|_| ())) + } else { + // we need to stop system if server was spawned + if self.exit { + ctx.run_later(Duration::from_millis(300), |_, _| { + System::current().stop(); + }); + } + Response::reply(Ok(())) + } + } +} + +/// Commands from accept threads +impl StreamHandler for Server { + fn finished(&mut self, _: &mut Context) {} + + fn handle(&mut self, msg: ServerCommand, _: &mut Context) { + match msg { + ServerCommand::WorkerDied(idx) => { + let mut found = false; + for i in 0..self.workers.len() { + if self.workers[i].0 == idx { + self.workers.swap_remove(i); + found = true; + break; + } + } + + if found { + error!("Worker has died {:?}, restarting", idx); + + let mut new_idx = self.workers.len(); + 'found: loop { + for i in 0..self.workers.len() { + if self.workers[i].0 == new_idx { + new_idx += 1; + continue 'found; + } + } + break; + } + + let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify()); + self.workers.push((new_idx, addr)); + self.accept.send(Command::Worker(worker)); + } + } + } + } +} + +#[derive(Clone, Default)] +pub struct Connections (Arc); + +impl Connections { + fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self { + let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 }; + let maxconnrate_low = if maxconnrate > 10 { + maxconnrate - 10 + } else { + 0 + }; + + Connections ( + Arc::new(ConnectionsInner { + notify, + maxconn, maxconnrate, + maxconn_low, maxconnrate_low, + conn: AtomicUsize::new(0), + connrate: AtomicUsize::new(0), + })) + } + + pub(crate) fn available(&self) -> bool { + self.0.available() + } + + pub(crate) fn num_connections(&self) -> usize { + self.0.conn.load(Ordering::Relaxed) + } + + /// Report opened connection + pub fn connection(&self) -> ConnectionTag { + ConnectionTag::new(self.0.clone()) + } + + /// Report rate connection, rate is usually ssl handshake + pub fn connection_rate(&self) -> ConnectionRateTag { + ConnectionRateTag::new(self.0.clone()) + } +} + +#[derive(Default)] +struct ConnectionsInner { + notify: AcceptNotify, + conn: AtomicUsize, + connrate: AtomicUsize, + maxconn: usize, + maxconnrate: usize, + maxconn_low: usize, + maxconnrate_low: usize, +} + +impl ConnectionsInner { + fn available(&self) -> bool { + if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) { + false + } else { + self.maxconn > self.conn.load(Ordering::Relaxed) + } + } + + fn notify_maxconn(&self, maxconn: usize) { + if maxconn > self.maxconn_low && maxconn <= self.maxconn { + self.notify.notify(); + } + } + + fn notify_maxconnrate(&self, connrate: usize) { + if connrate > self.maxconnrate_low && connrate <= self.maxconnrate { + self.notify.notify(); + } + } + +} + +/// Type responsible for max connection stat. +/// +/// Max connections stat get updated on drop. +pub struct ConnectionTag(Arc); + +impl ConnectionTag { + fn new(inner: Arc) -> Self { + inner.conn.fetch_add(1, Ordering::Relaxed); + ConnectionTag(inner) + } +} + +impl Drop for ConnectionTag { + fn drop(&mut self) { + let conn = self.0.conn.fetch_sub(1, Ordering::Relaxed); + self.0.notify_maxconn(conn); + } +} + +/// Type responsible for max connection rate stat. +/// +/// Max connections rate stat get updated on drop. +pub struct ConnectionRateTag (Arc); + +impl ConnectionRateTag { + fn new(inner: Arc) -> Self { + inner.connrate.fetch_add(1, Ordering::Relaxed); + ConnectionRateTag(inner) + } +} + +impl Drop for ConnectionRateTag { + fn drop(&mut self) { + let connrate = self.0.connrate.fetch_sub(1, Ordering::Relaxed); + self.0.notify_maxconnrate(connrate); + } +} diff --git a/src/server/settings.rs b/src/server/settings.rs index 508be67dd..e9ca0f851 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -2,19 +2,22 @@ use std::cell::{RefCell, RefMut, UnsafeCell}; use std::collections::VecDeque; use std::fmt::Write; use std::rc::Rc; -use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; +use std::time::{Duration, Instant}; use std::{env, fmt, net}; +use actix::Arbiter; use bytes::BytesMut; +use futures::Stream; use futures_cpupool::CpuPool; use http::StatusCode; use lazycell::LazyCell; use parking_lot::Mutex; use time; +use tokio_timer::Interval; -use super::accept::AcceptNotify; use super::channel::Node; use super::message::{Request, RequestPool}; +use super::server::{ConnectionRateTag, ConnectionTag, Connections}; use super::KeepAlive; use body::Body; use httpresponse::{HttpResponse, HttpResponseBuilder, HttpResponsePool}; @@ -137,17 +140,36 @@ pub(crate) struct WorkerSettings { ka_enabled: bool, bytes: Rc, messages: &'static RequestPool, - channels: Arc, + conns: Connections, node: RefCell>, date: UnsafeCell, - connrate: Arc, - notify: AcceptNotify, +} + +impl WorkerSettings { + pub(crate) fn create( + apps: Vec, keep_alive: KeepAlive, settings: ServerSettings, + conns: Connections, + ) -> Rc> { + let settings = Rc::new(Self::new(apps, keep_alive, settings, conns)); + + // periodic date update + let s = settings.clone(); + Arbiter::spawn( + Interval::new(Instant::now(), Duration::from_secs(1)) + .map_err(|_| ()) + .and_then(move |_| { + s.update_date(); + Ok(()) + }).fold((), |(), _| Ok(())), + ); + + settings + } } impl WorkerSettings { pub(crate) fn new( - h: Vec, keep_alive: KeepAlive, settings: ServerSettings, - notify: AcceptNotify, channels: Arc, connrate: Arc, + h: Vec, keep_alive: KeepAlive, settings: ServerSettings, conns: Connections, ) -> WorkerSettings { let (keep_alive, ka_enabled) = match keep_alive { KeepAlive::Timeout(val) => (val as u64, true), @@ -163,16 +185,10 @@ impl WorkerSettings { date: UnsafeCell::new(Date::new()), keep_alive, ka_enabled, - channels, - connrate, - notify, + conns, } } - pub fn num_channels(&self) -> usize { - self.channels.load(Ordering::Relaxed) - } - pub fn head(&self) -> RefMut> { self.node.borrow_mut() } @@ -201,16 +217,11 @@ impl WorkerSettings { RequestPool::get(self.messages) } - pub fn add_channel(&self) { - self.channels.fetch_add(1, Ordering::Relaxed); + pub fn connection(&self) -> ConnectionTag { + self.conns.connection() } - pub fn remove_channel(&self) { - let val = self.channels.fetch_sub(1, Ordering::Relaxed); - self.notify.notify_maxconn(val); - } - - pub fn update_date(&self) { + fn update_date(&self) { // Unsafe: WorkerSetting is !Sync and !Send unsafe { &mut *self.date.get() }.update(); } @@ -230,13 +241,8 @@ impl WorkerSettings { } #[allow(dead_code)] - pub(crate) fn conn_rate_add(&self) { - self.connrate.fetch_add(1, Ordering::Relaxed); - } - #[allow(dead_code)] - pub(crate) fn conn_rate_del(&self) { - let val = self.connrate.fetch_sub(1, Ordering::Relaxed); - self.notify.notify_maxconnrate(val); + pub(crate) fn connection_rate(&self) -> ConnectionRateTag { + self.conns.connection_rate() } } @@ -309,9 +315,7 @@ mod tests { Vec::new(), KeepAlive::Os, ServerSettings::default(), - AcceptNotify::default(), - Arc::new(AtomicUsize::new(0)), - Arc::new(AtomicUsize::new(0)), + Connections::default(), ); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); settings.set_date(&mut buf1, true); diff --git a/src/server/srv.rs b/src/server/srv.rs index c2bb6c819..eaf7802c7 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -1,16 +1,14 @@ +use std::marker::PhantomData; use std::rc::Rc; -use std::sync::{atomic::AtomicUsize, Arc}; -use std::time::Duration; -use std::{io, net}; +use std::sync::Arc; +use std::{io, mem, net, time}; -use actix::{ - fut, signal, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, - Response, StreamHandler, System, WrapFuture, -}; +use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, System}; -use futures::sync::mpsc; -use futures::{Future, Sink, Stream}; +use futures::{Future, Stream}; +use net2::{TcpBuilder, TcpStreamExt}; use num_cpus; +use tokio::executor::current_thread; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_tcp::TcpStream; @@ -23,39 +21,33 @@ use openssl::ssl::SslAcceptorBuilder; #[cfg(feature = "rust-tls")] use rustls::ServerConfig; -use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::channel::{HttpChannel, WrapperStream}; +use super::server::{Connections, Server, Service, ServiceHandler}; use super::settings::{ServerSettings, WorkerSettings}; -use super::worker::{Conn, StopWorker, Token, Worker, WorkerClient, WorkerFactory}; -use super::{AcceptorService, IntoHttpHandler, IoStream, KeepAlive}; -use super::{PauseServer, ResumeServer, StopServer}; +use super::worker::{Conn, Socket}; +use super::{ + AcceptorService, HttpHandler, IntoAsyncIo, IntoHttpHandler, IoStream, KeepAlive, + Token, +}; /// An HTTP Server pub struct HttpServer where H: IntoHttpHandler + 'static, { + factory: Arc Vec + Send + Sync>, + host: Option, + keep_alive: KeepAlive, + backlog: i32, threads: usize, - factory: WorkerFactory, - workers: Vec<(usize, Addr)>, - accept: AcceptLoop, exit: bool, shutdown_timeout: u16, - signals: Option>, no_http2: bool, no_signals: bool, - settings: Option>>, -} - -pub(crate) enum ServerCommand { - WorkerDied(usize), -} - -impl Actor for HttpServer -where - H: IntoHttpHandler, -{ - type Context = Context; + maxconn: usize, + maxconnrate: usize, + sockets: Vec, + handlers: Vec>>, } impl HttpServer @@ -72,15 +64,19 @@ where HttpServer { threads: num_cpus::get(), - factory: WorkerFactory::new(f), - workers: Vec::new(), - accept: AcceptLoop::new(), - exit: false, + factory: Arc::new(f), + host: None, + backlog: 2048, + keep_alive: KeepAlive::Os, shutdown_timeout: 30, - signals: None, + exit: true, no_http2: false, no_signals: false, - settings: None, + maxconn: 102_400, + maxconnrate: 256, + // settings: None, + sockets: Vec::new(), + handlers: Vec::new(), } } @@ -104,7 +100,7 @@ where /// /// This method should be called before `bind()` method call. pub fn backlog(mut self, num: i32) -> Self { - self.factory.backlog = num; + self.backlog = num; self } @@ -115,7 +111,7 @@ where /// /// By default max connections is set to a 100k. pub fn maxconn(mut self, num: usize) -> Self { - self.accept.maxconn(num); + self.maxconn = num; self } @@ -126,7 +122,7 @@ where /// /// By default max connections is set to a 256. pub fn maxconnrate(mut self, num: usize) -> Self { - self.accept.maxconnrate(num); + self.maxconnrate = num; self } @@ -134,7 +130,7 @@ where /// /// By default keep alive is set to a `Os`. pub fn keep_alive>(mut self, val: T) -> Self { - self.factory.keep_alive = val.into(); + self.keep_alive = val.into(); self } @@ -144,7 +140,7 @@ where /// generation. Check [ConnectionInfo](./dev/struct.ConnectionInfo. /// html#method.host) documentation for more information. pub fn server_hostname(mut self, val: String) -> Self { - self.factory.host = Some(val); + self.host = Some(val); self } @@ -156,12 +152,6 @@ where self } - /// Set alternative address for `ProcessSignals` actor. - pub fn signals(mut self, addr: Addr) -> Self { - self.signals = Some(addr); - self - } - /// Disable signal handling pub fn disable_signals(mut self) -> Self { self.no_signals = true; @@ -182,7 +172,10 @@ where /// Disable `HTTP/2` support #[doc(hidden)] - #[deprecated(since = "0.7.4", note = "please use acceptor service with proper ServerFlags parama")] + #[deprecated( + since = "0.7.4", + note = "please use acceptor service with proper ServerFlags parama" + )] pub fn no_http2(mut self) -> Self { self.no_http2 = true; self @@ -190,7 +183,7 @@ where /// Get addresses of bound sockets. pub fn addrs(&self) -> Vec { - self.factory.addrs() + self.sockets.iter().map(|s| s.addr).collect() } /// Get addresses of bound sockets and the scheme for it. @@ -200,7 +193,10 @@ where /// and the user should be presented with an enumeration of which /// socket requires which protocol. pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> { - self.factory.addrs_with_scheme() + self.handlers + .iter() + .map(|s| (s.addr(), s.scheme())) + .collect() } /// Use listener for accepting incoming connection requests @@ -208,19 +204,29 @@ where /// HttpServer does not change any configuration for TcpListener, /// it needs to be configured before passing it to listen() method. pub fn listen(mut self, lst: net::TcpListener) -> Self { - self.factory.listen(lst); + let token = Token(self.handlers.len()); + let addr = lst.local_addr().unwrap(); + self.handlers + .push(Box::new(SimpleHandler::new(lst.local_addr().unwrap()))); + self.sockets.push(Socket { lst, addr, token }); + self } /// Use listener for accepting incoming connection requests - pub fn listen_with( - mut self, lst: net::TcpListener, acceptor: A, - ) -> io::Result + pub fn listen_with(mut self, lst: net::TcpListener, acceptor: A) -> Self where A: AcceptorService + Send + 'static, { - self.factory.listen_with(lst, acceptor); - Ok(self) + let token = Token(self.handlers.len()); + let addr = lst.local_addr().unwrap(); + self.handlers.push(Box::new(StreamHandler::new( + lst.local_addr().unwrap(), + acceptor, + ))); + self.sockets.push(Socket { lst, addr, token }); + + self } #[cfg(feature = "tls")] @@ -233,12 +239,10 @@ where /// /// HttpServer does not change any configuration for TcpListener, /// it needs to be configured before passing it to listen() method. - pub fn listen_tls( - self, lst: net::TcpListener, acceptor: TlsAcceptor, - ) -> io::Result { + pub fn listen_tls(self, lst: net::TcpListener, acceptor: TlsAcceptor) -> Self { use super::NativeTlsAcceptor; - self.listen_with(lst, NativeTlsAcceptor::new(acceptor)) + Ok(self.listen_with(lst, NativeTlsAcceptor::new(acceptor))) } #[cfg(feature = "alpn")] @@ -262,7 +266,7 @@ where ServerFlags::HTTP1 | ServerFlags::HTTP2 }; - self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?) + Ok(self.listen_with(lst, OpensslAcceptor::with_flags(builder, flags)?)) } #[cfg(feature = "rust-tls")] @@ -274,9 +278,7 @@ where /// Use listener for accepting incoming tls connection requests /// /// This method sets alpn protocols to "h2" and "http/1.1" - pub fn listen_rustls( - self, lst: net::TcpListener, builder: ServerConfig, - ) -> io::Result { + pub fn listen_rustls(self, lst: net::TcpListener, builder: ServerConfig) -> Self { use super::{RustlsAcceptor, ServerFlags}; // alpn support @@ -293,7 +295,16 @@ where /// /// To bind multiple addresses this method can be called multiple times. pub fn bind(mut self, addr: S) -> io::Result { - self.factory.bind(addr)?; + let sockets = self.bind2(addr)?; + + for lst in sockets { + let token = Token(self.handlers.len()); + let addr = lst.local_addr().unwrap(); + self.handlers + .push(Box::new(SimpleHandler::new(lst.local_addr().unwrap()))); + self.sockets.push(Socket { lst, addr, token }) + } + Ok(self) } @@ -304,10 +315,51 @@ where S: net::ToSocketAddrs, A: AcceptorService + Send + 'static, { - self.factory.bind_with(addr, &acceptor)?; + let sockets = self.bind2(addr)?; + + for lst in sockets { + let token = Token(self.handlers.len()); + let addr = lst.local_addr().unwrap(); + self.handlers.push(Box::new(StreamHandler::new( + lst.local_addr().unwrap(), + acceptor.clone(), + ))); + self.sockets.push(Socket { lst, addr, token }) + } + Ok(self) } + fn bind2( + &self, addr: S, + ) -> io::Result> { + let mut err = None; + let mut succ = false; + let mut sockets = Vec::new(); + for addr in addr.to_socket_addrs()? { + match create_tcp_listener(addr, self.backlog) { + Ok(lst) => { + succ = true; + sockets.push(lst); + } + Err(e) => err = Some(e), + } + } + + if !succ { + if let Some(e) = err.take() { + Err(e) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "Can not bind to address.", + )) + } + } else { + Ok(sockets) + } + } + #[cfg(feature = "tls")] #[doc(hidden)] #[deprecated( @@ -373,37 +425,59 @@ where self.bind_with(addr, RustlsAcceptor::with_flags(builder, flags)) } +} - fn start_workers(&mut self, notify: &AcceptNotify) -> Vec { - // start workers - let mut workers = Vec::new(); - for idx in 0..self.threads { - let (worker, addr) = self.factory.start(idx, notify.clone()); - workers.push(worker); - self.workers.push((idx, addr)); - } - info!("Starting {} http workers", self.threads); - workers +impl Into> for HttpServer { + fn into(self) -> Box { + Box::new(HttpService { + factory: self.factory, + host: self.host, + keep_alive: self.keep_alive, + handlers: self.handlers, + }) + } +} + +struct HttpService { + factory: Arc Vec + Send + Sync>, + host: Option, + keep_alive: KeepAlive, + handlers: Vec>>, +} + +impl Service for HttpService { + fn clone(&self) -> Box { + Box::new(HttpService { + factory: self.factory.clone(), + host: self.host.clone(), + keep_alive: self.keep_alive, + handlers: self.handlers.iter().map(|v| v.clone()).collect(), + }) } - // subscribe to os signals - fn subscribe_to_signals(&self) -> Option> { - if !self.no_signals { - if let Some(ref signals) = self.signals { - Some(signals.clone()) - } else { - Some(System::current().registry().get::()) - } - } else { - None - } + fn create(&self, conns: Connections) -> Box { + let addr = self.handlers[0].addr(); + let s = ServerSettings::new(Some(addr), &self.host, false); + let apps: Vec<_> = (*self.factory)() + .into_iter() + .map(|h| h.into_handler()) + .collect(); + let handlers = self.handlers.iter().map(|h| h.clone()).collect(); + + Box::new(HttpServiceHandler::new( + apps, + handlers, + self.keep_alive, + s, + conns, + )) } } impl HttpServer { /// Start listening for incoming connections. /// - /// This method starts number of http handler workers in separate threads. + /// This method starts number of http workers in separate threads. /// For each address this method starts separate thread which does /// `accept()` in a loop. /// @@ -426,31 +500,25 @@ impl HttpServer { /// sys.run(); // <- Run actix system, this method starts all async processes /// } /// ``` - pub fn start(mut self) -> Addr { - let sockets = self.factory.take_sockets(); - if sockets.is_empty() { - panic!("HttpServer::bind() has to be called before start()"); + pub fn start(mut self) -> Addr { + let mut srv = Server::new() + .workers(self.threads) + .maxconn(self.maxconn) + .maxconnrate(self.maxconnrate) + .shutdown_timeout(self.shutdown_timeout); + + srv = if self.exit { srv.system_exit() } else { srv }; + srv = if self.no_signals { + srv.disable_signals() } else { - let notify = self.accept.get_notify(); - let workers = self.start_workers(¬ify); + srv + }; - // start accept thread - for sock in &sockets { - info!("Starting server on http://{}", sock.addr); - } - let rx = self.accept.start(sockets, workers.clone()); - - // start http server actor - let signals = self.subscribe_to_signals(); - let addr = Actor::create(move |ctx| { - ctx.add_stream(rx); - self - }); - if let Some(signals) = signals { - signals.do_send(signal::Subscribe(addr.clone().recipient())) - } - addr - } + let sockets: Vec<_> = mem::replace(&mut self.sockets, Vec::new()) + .into_iter() + .map(|item| (item.token, item.lst)) + .collect(); + srv.service(self, sockets).start() } /// Spawn new thread and start listening for incoming connections. @@ -484,195 +552,279 @@ impl HttpServer { /// Start listening for incoming connections from a stream. /// /// This method uses only one thread for handling incoming connections. - pub fn start_incoming(mut self, stream: S, secure: bool) -> Addr + pub fn start_incoming(self, stream: S, secure: bool) where S: Stream + Send + 'static, T: AsyncRead + AsyncWrite + Send + 'static, { // set server settings let addr: net::SocketAddr = "127.0.0.1:8080".parse().unwrap(); - let settings = ServerSettings::new(Some(addr), &self.factory.host, secure); - let apps: Vec<_> = (*self.factory.factory)() + let srv_settings = ServerSettings::new(Some(addr), &self.host, secure); + let apps: Vec<_> = (*self.factory)() .into_iter() .map(|h| h.into_handler()) .collect(); - self.settings = Some(Rc::new(WorkerSettings::new( + let settings = WorkerSettings::create( apps, - self.factory.keep_alive, - settings, - AcceptNotify::default(), - Arc::new(AtomicUsize::new(0)), - Arc::new(AtomicUsize::new(0)), - ))); + self.keep_alive, + srv_settings, + Connections::default(), + ); // start server - let signals = self.subscribe_to_signals(); - let addr = HttpServer::create(move |ctx| { + HttpIncoming::create(move |ctx| { ctx.add_message_stream(stream.map_err(|_| ()).map(move |t| Conn { io: WrapperStream::new(t), + handler: Token::new(0), token: Token::new(0), peer: None, })); - self + HttpIncoming { settings } }); - - if let Some(signals) = signals { - signals.do_send(signal::Subscribe(addr.clone().recipient())) - } - addr } } -/// Signals support -/// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system -/// message to `System` actor. -impl Handler for HttpServer { - type Result = (); - - fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { - match msg.0 { - signal::SignalType::Int => { - info!("SIGINT received, exiting"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: false }, ctx); - } - signal::SignalType::Term => { - info!("SIGTERM received, stopping"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: true }, ctx); - } - signal::SignalType::Quit => { - info!("SIGQUIT received, exiting"); - self.exit = true; - Handler::::handle(self, StopServer { graceful: false }, ctx); - } - _ => (), - } - } +struct HttpIncoming { + settings: Rc>, } -/// Commands from accept threads -impl StreamHandler for HttpServer { - fn finished(&mut self, _: &mut Context) {} - - fn handle(&mut self, msg: ServerCommand, _: &mut Context) { - match msg { - ServerCommand::WorkerDied(idx) => { - let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.workers.len(); - 'found: loop { - for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let (worker, addr) = - self.factory.start(new_idx, self.accept.get_notify()); - self.workers.push((new_idx, addr)); - self.accept.send(Command::Worker(worker)); - } - } - } - } +impl Actor for HttpIncoming +where + H: HttpHandler, +{ + type Context = Context; } -impl Handler> for HttpServer +impl Handler> for HttpIncoming where T: IoStream, - H: IntoHttpHandler, + H: HttpHandler, { type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) -> Self::Result { Arbiter::spawn(HttpChannel::new( - Rc::clone(self.settings.as_ref().unwrap()), + Rc::clone(&self.settings), msg.io, msg.peer, )); } } -impl Handler for HttpServer { - type Result = (); - - fn handle(&mut self, _: PauseServer, _: &mut Context) { - self.accept.send(Command::Pause); - } +struct HttpServiceHandler +where + H: HttpHandler + 'static, +{ + settings: Rc>, + handlers: Vec>>, + tcp_ka: Option, } -impl Handler for HttpServer { - type Result = (); - - fn handle(&mut self, _: ResumeServer, _: &mut Context) { - self.accept.send(Command::Resume); - } -} - -impl Handler for HttpServer { - type Result = Response<(), ()>; - - fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { - // stop accept threads - self.accept.send(Command::Stop); - - // stop workers - let (tx, rx) = mpsc::channel(1); - - let dur = if msg.graceful { - Some(Duration::new(u64::from(self.shutdown_timeout), 0)) +impl HttpServiceHandler { + fn new( + apps: Vec, handlers: Vec>>, + keep_alive: KeepAlive, settings: ServerSettings, conns: Connections, + ) -> HttpServiceHandler { + let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { + Some(time::Duration::new(val as u64, 0)) } else { None }; - for worker in &self.workers { - let tx2 = tx.clone(); - ctx.spawn( - worker - .1 - .send(StopWorker { graceful: dur }) - .into_actor(self) - .then(move |_, slf, ctx| { - slf.workers.pop(); - if slf.workers.is_empty() { - let _ = tx2.send(()); + let settings = WorkerSettings::create(apps, keep_alive, settings, conns); - // we need to stop system if server was spawned - if slf.exit { - ctx.run_later(Duration::from_millis(300), |_, _| { - System::current().stop(); - }); - } - } - - fut::ok(()) - }), - ); - } - - if !self.workers.is_empty() { - Response::async(rx.into_future().map(|_| ()).map_err(|_| ())) - } else { - // we need to stop system if server was spawned - if self.exit { - ctx.run_later(Duration::from_millis(300), |_, _| { - System::current().stop(); - }); - } - Response::reply(Ok(())) + HttpServiceHandler { + handlers, + tcp_ka, + settings, } } -} \ No newline at end of file +} + +impl ServiceHandler for HttpServiceHandler +where + H: HttpHandler + 'static, +{ + fn handle( + &mut self, token: Token, io: net::TcpStream, peer: Option, + ) { + if self.tcp_ka.is_some() && io.set_keepalive(self.tcp_ka).is_err() { + error!("Can not set socket keep-alive option"); + } + self.handlers[token.0].handle(Rc::clone(&self.settings), io, peer); + } + + fn shutdown(&self, force: bool) { + if force { + self.settings.head().traverse::(); + } + } +} + +struct SimpleHandler { + addr: net::SocketAddr, + io: PhantomData, +} + +impl Clone for SimpleHandler { + fn clone(&self) -> Self { + SimpleHandler { + addr: self.addr, + io: PhantomData, + } + } +} + +impl SimpleHandler { + fn new(addr: net::SocketAddr) -> Self { + SimpleHandler { + addr, + io: PhantomData, + } + } +} + +impl IoStreamHandler for SimpleHandler +where + H: HttpHandler, + Io: IntoAsyncIo + Send + 'static, + Io::Io: IoStream, +{ + fn addr(&self) -> net::SocketAddr { + self.addr + } + + fn clone(&self) -> Box> { + Box::new(Clone::clone(self)) + } + + fn scheme(&self) -> &'static str { + "http" + } + + fn handle(&self, h: Rc>, io: Io, peer: Option) { + let mut io = match io.into_async_io() { + Ok(io) => io, + Err(err) => { + trace!("Failed to create async io: {}", err); + return; + } + }; + let _ = io.set_nodelay(true); + + current_thread::spawn(HttpChannel::new(h, io, peer)); + } +} + +struct StreamHandler { + acceptor: A, + addr: net::SocketAddr, + io: PhantomData, +} + +impl> StreamHandler { + fn new(addr: net::SocketAddr, acceptor: A) -> Self { + StreamHandler { + addr, + acceptor, + io: PhantomData, + } + } +} + +impl> Clone for StreamHandler { + fn clone(&self) -> Self { + StreamHandler { + addr: self.addr, + acceptor: self.acceptor.clone(), + io: PhantomData, + } + } +} + +impl IoStreamHandler for StreamHandler +where + H: HttpHandler, + Io: IntoAsyncIo + Send + 'static, + Io::Io: IoStream, + A: AcceptorService + Send + 'static, +{ + fn addr(&self) -> net::SocketAddr { + self.addr + } + + fn clone(&self) -> Box> { + Box::new(Clone::clone(self)) + } + + fn scheme(&self) -> &'static str { + self.acceptor.scheme() + } + + fn handle(&self, h: Rc>, io: Io, peer: Option) { + let mut io = match io.into_async_io() { + Ok(io) => io, + Err(err) => { + trace!("Failed to create async io: {}", err); + return; + } + }; + let _ = io.set_nodelay(true); + + let rate = h.connection_rate(); + current_thread::spawn(self.acceptor.accept(io).then(move |res| { + drop(rate); + match res { + Ok(io) => current_thread::spawn(HttpChannel::new(h, io, peer)), + Err(err) => trace!("Can not establish connection: {}", err), + } + Ok(()) + })) + } +} + +impl IoStreamHandler for Box> +where + H: HttpHandler, + Io: IntoAsyncIo, +{ + fn addr(&self) -> net::SocketAddr { + self.as_ref().addr() + } + + fn clone(&self) -> Box> { + self.as_ref().clone() + } + + fn scheme(&self) -> &'static str { + self.as_ref().scheme() + } + + fn handle(&self, h: Rc>, io: Io, peer: Option) { + self.as_ref().handle(h, io, peer) + } +} + +trait IoStreamHandler: Send +where + H: HttpHandler, +{ + fn clone(&self) -> Box>; + + fn addr(&self) -> net::SocketAddr; + + fn scheme(&self) -> &'static str; + + fn handle(&self, h: Rc>, io: Io, peer: Option); +} + +fn create_tcp_listener( + addr: net::SocketAddr, backlog: i32, +) -> io::Result { + let builder = match addr { + net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, + net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, + }; + builder.reuse_address(true)?; + builder.bind(addr)?; + Ok(builder.listen(backlog)?) +} diff --git a/src/server/worker.rs b/src/server/worker.rs index 168382e64..77128adc0 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -1,216 +1,41 @@ -use std::marker::PhantomData; -use std::rc::Rc; -use std::sync::{atomic::AtomicUsize, atomic::Ordering, Arc}; -use std::{io, mem, net, time}; +use std::{net, time}; -use futures::sync::mpsc::{unbounded, SendError, UnboundedSender}; +use futures::sync::mpsc::{SendError, UnboundedSender}; use futures::sync::oneshot; use futures::Future; -use net2::{TcpBuilder, TcpStreamExt}; -use tokio::executor::current_thread; -use tokio_tcp::TcpStream; use actix::msgs::StopArbiter; -use actix::{Actor, Addr, Arbiter, AsyncContext, Context, Handler, Message, Response}; +use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Message, Response}; -use super::accept::AcceptNotify; -use super::channel::HttpChannel; -use super::settings::{ServerSettings, WorkerSettings}; -use super::{ - AcceptorService, HttpHandler, IntoAsyncIo, IntoHttpHandler, IoStream, KeepAlive, -}; +use super::server::{Connections, ServiceHandler}; +use super::Token; #[derive(Message)] pub(crate) struct Conn { pub io: T, + pub handler: Token, pub token: Token, pub peer: Option, } -#[derive(Clone, Copy)] -pub struct Token(usize); - -impl Token { - pub(crate) fn new(val: usize) -> Token { - Token(val) - } -} - pub(crate) struct Socket { pub lst: net::TcpListener, pub addr: net::SocketAddr, pub token: Token, } -pub(crate) struct WorkerFactory { - pub factory: Arc Vec + Send + Sync>, - pub host: Option, - pub keep_alive: KeepAlive, - pub backlog: i32, - sockets: Vec, - handlers: Vec>>, -} - -impl WorkerFactory { - pub fn new(factory: F) -> Self - where - F: Fn() -> Vec + Send + Sync + 'static, - { - WorkerFactory { - factory: Arc::new(factory), - host: None, - backlog: 2048, - keep_alive: KeepAlive::Os, - sockets: Vec::new(), - handlers: Vec::new(), - } - } - - pub fn addrs(&self) -> Vec { - self.sockets.iter().map(|s| s.addr).collect() - } - - pub fn addrs_with_scheme(&self) -> Vec<(net::SocketAddr, &str)> { - self.handlers - .iter() - .map(|s| (s.addr(), s.scheme())) - .collect() - } - - pub fn take_sockets(&mut self) -> Vec { - mem::replace(&mut self.sockets, Vec::new()) - } - - pub fn listen(&mut self, lst: net::TcpListener) { - let token = Token(self.handlers.len()); - let addr = lst.local_addr().unwrap(); - self.handlers - .push(Box::new(SimpleHandler::new(lst.local_addr().unwrap()))); - self.sockets.push(Socket { lst, addr, token }) - } - - pub fn listen_with(&mut self, lst: net::TcpListener, acceptor: A) - where - A: AcceptorService + Send + 'static, - { - let token = Token(self.handlers.len()); - let addr = lst.local_addr().unwrap(); - self.handlers.push(Box::new(StreamHandler::new( - lst.local_addr().unwrap(), - acceptor, - ))); - self.sockets.push(Socket { lst, addr, token }) - } - - pub fn bind(&mut self, addr: S) -> io::Result<()> - where - S: net::ToSocketAddrs, - { - let sockets = self.bind2(addr)?; - - for lst in sockets { - let token = Token(self.handlers.len()); - let addr = lst.local_addr().unwrap(); - self.handlers - .push(Box::new(SimpleHandler::new(lst.local_addr().unwrap()))); - self.sockets.push(Socket { lst, addr, token }) - } - Ok(()) - } - - pub fn bind_with(&mut self, addr: S, acceptor: &A) -> io::Result<()> - where - S: net::ToSocketAddrs, - A: AcceptorService + Send + 'static, - { - let sockets = self.bind2(addr)?; - - for lst in sockets { - let token = Token(self.handlers.len()); - let addr = lst.local_addr().unwrap(); - self.handlers.push(Box::new(StreamHandler::new( - lst.local_addr().unwrap(), - acceptor.clone(), - ))); - self.sockets.push(Socket { lst, addr, token }) - } - Ok(()) - } - - fn bind2( - &self, addr: S, - ) -> io::Result> { - let mut err = None; - let mut succ = false; - let mut sockets = Vec::new(); - for addr in addr.to_socket_addrs()? { - match create_tcp_listener(addr, self.backlog) { - Ok(lst) => { - succ = true; - sockets.push(lst); - } - Err(e) => err = Some(e), - } - } - - if !succ { - if let Some(e) = err.take() { - Err(e) - } else { - Err(io::Error::new( - io::ErrorKind::Other, - "Can not bind to address.", - )) - } - } else { - Ok(sockets) - } - } - - pub fn start( - &mut self, idx: usize, notify: AcceptNotify, - ) -> (WorkerClient, Addr) { - let host = self.host.clone(); - let addr = self.handlers[0].addr(); - let factory = Arc::clone(&self.factory); - let ka = self.keep_alive; - let (tx, rx) = unbounded::>(); - let client = WorkerClient::new(idx, tx); - let conn = client.conn.clone(); - let sslrate = client.sslrate.clone(); - let handlers: Vec<_> = self.handlers.iter().map(|v| v.clone()).collect(); - - let addr = Arbiter::start(move |ctx: &mut Context<_>| { - let s = ServerSettings::new(Some(addr), &host, false); - let apps: Vec<_> = - (*factory)().into_iter().map(|h| h.into_handler()).collect(); - ctx.add_message_stream(rx); - let inner = WorkerInner::new(apps, handlers, ka, s, conn, sslrate, notify); - Worker { - inner: Box::new(inner), - } - }); - - (client, addr) - } -} - #[derive(Clone)] pub(crate) struct WorkerClient { pub idx: usize, tx: UnboundedSender>, - pub conn: Arc, - pub sslrate: Arc, + conns: Connections, } impl WorkerClient { - fn new(idx: usize, tx: UnboundedSender>) -> Self { - WorkerClient { - idx, - tx, - conn: Arc::new(AtomicUsize::new(0)), - sslrate: Arc::new(AtomicUsize::new(0)), - } + pub fn new( + idx: usize, tx: UnboundedSender>, conns: Connections, + ) -> Self { + WorkerClient { idx, tx, conns } } pub fn send( @@ -219,12 +44,8 @@ impl WorkerClient { self.tx.unbounded_send(msg) } - pub fn available(&self, maxconn: usize, maxsslrate: usize) -> bool { - if maxsslrate <= self.sslrate.load(Ordering::Relaxed) { - false - } else { - maxconn > self.conn.load(Ordering::Relaxed) - } + pub fn available(&self) -> bool { + self.conns.available() } } @@ -243,21 +64,21 @@ impl Message for StopWorker { /// Worker accepts Socket objects via unbounded channel and start requests /// processing. pub(crate) struct Worker { - inner: Box, + conns: Connections, + handlers: Vec>, } impl Actor for Worker { type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - self.update_date(ctx); - } } impl Worker { - fn update_date(&self, ctx: &mut Context) { - self.inner.update_date(); - ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_date(ctx)); + pub(crate) fn new(conns: Connections, handlers: Vec>) -> Self { + Worker { conns, handlers } + } + + fn shutdown(&self, force: bool) { + self.handlers.iter().for_each(|h| h.shutdown(force)); } fn shutdown_timeout( @@ -265,7 +86,7 @@ impl Worker { ) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - let num = slf.inner.num_channels(); + let num = slf.conns.num_connections(); if num == 0 { let _ = tx.send(true); Arbiter::current().do_send(StopArbiter(0)); @@ -273,7 +94,7 @@ impl Worker { slf.shutdown_timeout(ctx, tx, d); } else { info!("Force shutdown http worker, {} connections", num); - slf.inner.force_shutdown(); + slf.shutdown(true); let _ = tx.send(false); Arbiter::current().do_send(StopArbiter(0)); } @@ -285,7 +106,7 @@ impl Handler> for Worker { type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) { - self.inner.handle_connect(msg) + self.handlers[msg.handler.0].handle(msg.token, msg.io, msg.peer) } } @@ -294,253 +115,25 @@ impl Handler for Worker { type Result = Response; fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { - let num = self.inner.num_channels(); + let num = self.conns.num_connections(); if num == 0 { info!("Shutting down http worker, 0 connections"); Response::reply(Ok(true)) } else if let Some(dur) = msg.graceful { - info!("Graceful http worker shutdown, {} connections", num); + self.shutdown(false); let (tx, rx) = oneshot::channel(); - self.shutdown_timeout(ctx, tx, dur); - Response::async(rx.map_err(|_| ())) + let num = self.conns.num_connections(); + if num != 0 { + info!("Graceful http worker shutdown, {} connections", num); + self.shutdown_timeout(ctx, tx, dur); + Response::reply(Ok(true)) + } else { + Response::async(rx.map_err(|_| ())) + } } else { info!("Force shutdown http worker, {} connections", num); - self.inner.force_shutdown(); + self.shutdown(true); Response::reply(Ok(false)) } } } - -trait WorkerHandler { - fn update_date(&self); - - fn handle_connect(&mut self, Conn); - - fn force_shutdown(&self); - - fn num_channels(&self) -> usize; -} - -struct WorkerInner -where - H: HttpHandler + 'static, -{ - settings: Rc>, - socks: Vec>>, - tcp_ka: Option, -} - -impl WorkerInner { - pub(crate) fn new( - h: Vec, socks: Vec>>, - keep_alive: KeepAlive, settings: ServerSettings, conn: Arc, - sslrate: Arc, notify: AcceptNotify, - ) -> WorkerInner { - let tcp_ka = if let KeepAlive::Tcp(val) = keep_alive { - Some(time::Duration::new(val as u64, 0)) - } else { - None - }; - - WorkerInner { - settings: Rc::new(WorkerSettings::new( - h, keep_alive, settings, notify, conn, sslrate, - )), - socks, - tcp_ka, - } - } -} - -impl WorkerHandler for WorkerInner -where - H: HttpHandler + 'static, -{ - fn update_date(&self) { - self.settings.update_date(); - } - - fn handle_connect(&mut self, msg: Conn) { - if self.tcp_ka.is_some() && msg.io.set_keepalive(self.tcp_ka).is_err() { - error!("Can not set socket keep-alive option"); - } - self.socks[msg.token.0].handle(Rc::clone(&self.settings), msg.io, msg.peer); - } - - fn num_channels(&self) -> usize { - self.settings.num_channels() - } - - fn force_shutdown(&self) { - self.settings.head().traverse::(); - } -} - -struct SimpleHandler { - addr: net::SocketAddr, - io: PhantomData, -} - -impl Clone for SimpleHandler { - fn clone(&self) -> Self { - SimpleHandler { - addr: self.addr, - io: PhantomData, - } - } -} - -impl SimpleHandler { - fn new(addr: net::SocketAddr) -> Self { - SimpleHandler { - addr, - io: PhantomData, - } - } -} - -impl IoStreamHandler for SimpleHandler -where - H: HttpHandler, - Io: IntoAsyncIo + Send + 'static, - Io::Io: IoStream, -{ - fn addr(&self) -> net::SocketAddr { - self.addr - } - - fn clone(&self) -> Box> { - Box::new(Clone::clone(self)) - } - - fn scheme(&self) -> &'static str { - "http" - } - - fn handle(&self, h: Rc>, io: Io, peer: Option) { - let mut io = match io.into_async_io() { - Ok(io) => io, - Err(err) => { - trace!("Failed to create async io: {}", err); - return; - } - }; - let _ = io.set_nodelay(true); - - current_thread::spawn(HttpChannel::new(h, io, peer)); - } -} - -struct StreamHandler { - acceptor: A, - addr: net::SocketAddr, - io: PhantomData, -} - -impl> StreamHandler { - fn new(addr: net::SocketAddr, acceptor: A) -> Self { - StreamHandler { - addr, - acceptor, - io: PhantomData, - } - } -} - -impl> Clone for StreamHandler { - fn clone(&self) -> Self { - StreamHandler { - addr: self.addr, - acceptor: self.acceptor.clone(), - io: PhantomData, - } - } -} - -impl IoStreamHandler for StreamHandler -where - H: HttpHandler, - Io: IntoAsyncIo + Send + 'static, - Io::Io: IoStream, - A: AcceptorService + Send + 'static, -{ - fn addr(&self) -> net::SocketAddr { - self.addr - } - - fn clone(&self) -> Box> { - Box::new(Clone::clone(self)) - } - - fn scheme(&self) -> &'static str { - self.acceptor.scheme() - } - - fn handle(&self, h: Rc>, io: Io, peer: Option) { - let mut io = match io.into_async_io() { - Ok(io) => io, - Err(err) => { - trace!("Failed to create async io: {}", err); - return; - } - }; - let _ = io.set_nodelay(true); - - h.conn_rate_add(); - current_thread::spawn(self.acceptor.accept(io).then(move |res| { - h.conn_rate_del(); - match res { - Ok(io) => current_thread::spawn(HttpChannel::new(h, io, peer)), - Err(err) => trace!("Can not establish connection: {}", err), - } - Ok(()) - })) - } -} - -impl IoStreamHandler for Box> -where - H: HttpHandler, - Io: IntoAsyncIo, -{ - fn addr(&self) -> net::SocketAddr { - self.as_ref().addr() - } - - fn clone(&self) -> Box> { - self.as_ref().clone() - } - - fn scheme(&self) -> &'static str { - self.as_ref().scheme() - } - - fn handle(&self, h: Rc>, io: Io, peer: Option) { - self.as_ref().handle(h, io, peer) - } -} - -pub(crate) trait IoStreamHandler: Send -where - H: HttpHandler, -{ - fn clone(&self) -> Box>; - - fn addr(&self) -> net::SocketAddr; - - fn scheme(&self) -> &'static str; - - fn handle(&self, h: Rc>, io: Io, peer: Option); -} - -fn create_tcp_listener( - addr: net::SocketAddr, backlog: i32, -) -> io::Result { - let builder = match addr { - net::SocketAddr::V4(_) => TcpBuilder::new_v4()?, - net::SocketAddr::V6(_) => TcpBuilder::new_v6()?, - }; - builder.reuse_address(true)?; - builder.bind(addr)?; - Ok(builder.listen(backlog)?) -} diff --git a/src/test.rs b/src/test.rs index 42f511749..92aa6c8d2 100644 --- a/src/test.rs +++ b/src/test.rs @@ -17,6 +17,8 @@ use tokio::runtime::current_thread::Runtime; use openssl::ssl::SslAcceptorBuilder; #[cfg(feature = "rust-tls")] use rustls::ServerConfig; +#[cfg(feature = "alpn")] +use server::OpensslAcceptor; #[cfg(feature = "rust-tls")] use server::RustlsAcceptor; @@ -326,7 +328,7 @@ impl TestServerBuilder { config(&mut app); vec![app] }).workers(1) - .disable_signals(); + .disable_signals(); tx.send((System::current(), addr, TestServer::get_conn())) .unwrap(); @@ -336,7 +338,7 @@ impl TestServerBuilder { let ssl = self.ssl.take(); if let Some(ssl) = ssl { let tcp = net::TcpListener::bind(addr).unwrap(); - srv = srv.listen_ssl(tcp, ssl).unwrap(); + srv = srv.listen_with(tcp, OpensslAcceptor::new(ssl).unwrap()); } } #[cfg(feature = "rust-tls")] @@ -344,7 +346,7 @@ impl TestServerBuilder { let ssl = self.rust_ssl.take(); if let Some(ssl) = ssl { let tcp = net::TcpListener::bind(addr).unwrap(); - srv = srv.listen_with(tcp, RustlsAcceptor::new(ssl)).unwrap(); + srv = srv.listen_with(tcp, RustlsAcceptor::new(ssl)); } } if !has_ssl { @@ -722,8 +724,9 @@ impl TestRequest { /// This method generates `HttpRequest` instance and executes handler pub fn execute(self, f: F) -> Result - where F: FnOnce(&HttpRequest) -> R, - R: Responder + 'static, + where + F: FnOnce(&HttpRequest) -> R, + R: Responder + 'static, { let req = self.finish(); let resp = f(&req);