diff --git a/src/lib.rs b/src/lib.rs index 2f694c7d7..60533ff39 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ extern crate tokio_timer; extern crate tower_service; extern crate trust_dns_resolver; +#[allow(unused_imports)] #[macro_use] extern crate actix; diff --git a/src/server.rs b/src/server.rs index c61a3f56d..c80410054 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,3 @@ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; use std::time::Duration; use std::{fmt, io, mem, net}; @@ -18,7 +14,7 @@ use actix::{ use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::server_service::{self, ServerNewService, ServerServiceFactory}; -use super::worker::{Conn, StopWorker, Worker, WorkerClient}; +use super::worker::{Conn, StopWorker, Worker, WorkerAvailability, WorkerClient}; use super::NewService; use super::{PauseServer, ResumeServer, StopServer, Token}; @@ -258,14 +254,14 @@ impl Server { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr, WorkerClient) { let (tx, rx) = unbounded::(); - let conns = Connections::new(notify, 0, 0); - let worker = WorkerClient::new(idx, tx, conns.clone()); + let avail = WorkerAvailability::new(notify); + let worker = WorkerClient::new(idx, tx, avail.clone()); let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); let addr = Arbiter::start(move |ctx: &mut Context<_>| { ctx.add_message_stream(rx); - Worker::new(ctx, services) + Worker::new(ctx, services, avail) }); (addr, worker) @@ -413,68 +409,6 @@ impl StreamHandler for Server { } } -#[derive(Clone, Default)] -/// Contains information about connection. -pub struct Connections(Arc); - -impl Connections { - fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self { - let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 }; - let maxconnrate_low = if maxconnrate > 10 { - maxconnrate - 10 - } else { - 0 - }; - - Connections(Arc::new(ConnectionsInner { - notify, - maxconn, - maxconnrate, - maxconn_low, - maxconnrate_low, - conn: AtomicUsize::new(0), - connrate: AtomicUsize::new(0), - })) - } - - pub(crate) fn available(&self) -> bool { - self.0.available() - } -} - -#[derive(Default)] -struct ConnectionsInner { - notify: AcceptNotify, - conn: AtomicUsize, - connrate: AtomicUsize, - maxconn: usize, - maxconnrate: usize, - maxconn_low: usize, - maxconnrate_low: usize, -} - -impl ConnectionsInner { - fn available(&self) -> bool { - if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) { - false - } else { - self.maxconn > self.conn.load(Ordering::Relaxed) - } - } - - fn notify_maxconn(&self, maxconn: usize) { - if maxconn > self.maxconn_low && maxconn <= self.maxconn { - self.notify.notify(); - } - } - - fn notify_maxconnrate(&self, connrate: usize) { - if connrate > self.maxconnrate_low && connrate <= self.maxconnrate { - self.notify.notify(); - } - } -} - fn bind_addr(addr: S) -> io::Result> { let mut err = None; let mut succ = false; diff --git a/src/server_service.rs b/src/server_service.rs index e6292f64f..6debdbdb5 100644 --- a/src/server_service.rs +++ b/src/server_service.rs @@ -3,16 +3,23 @@ use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, net}; +use futures::future::{err, ok}; use futures::task::AtomicTask; -use futures::{future, Async, Future, Poll}; +use futures::{Async, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; use super::{NewService, Service}; +pub enum ServerMessage { + Connect(net::TcpStream), + Shutdown, + ForceShutdown, +} + pub(crate) type BoxedServerService = Box< Service< - Request = net::TcpStream, + Request = ServerMessage, Response = (), Error = (), Future = Box>, @@ -59,7 +66,7 @@ where T::Future: 'static, T::Error: fmt::Display + 'static, { - type Request = net::TcpStream; + type Request = ServerMessage; type Response = (); type Error = (); type Future = Box>; @@ -72,22 +79,27 @@ where } } - fn call(&mut self, stream: net::TcpStream) -> Self::Future { - let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { - error!("Can not convert to an async tcp stream: {}", e); - }); + fn call(&mut self, req: ServerMessage) -> Self::Future { + match req { + ServerMessage::Connect(stream) => { + let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| { + error!("Can not convert to an async tcp stream: {}", e); + }); - if let Ok(stream) = stream { - let guard = self.counter.get(); + if let Ok(stream) = stream { + let guard = self.counter.get(); - Box::new( - self.service - .call(stream) - .map_err(|_| ()) - .map(move |_| drop(guard)), - ) - } else { - Box::new(future::err(())) + Box::new( + self.service + .call(stream) + .map_err(|_| ()) + .map(move |_| drop(guard)), + ) + } else { + Box::new(err(())) + } + } + _ => Box::new(ok(())), } } } @@ -133,14 +145,10 @@ where } fn create(&self) -> Box> { - Box::new( - (self.inner)() - .new_service() - .map(move |inner| { - let service: BoxedServerService = Box::new(ServerService::new(inner)); - service - }), - ) + Box::new((self.inner)().new_service().map(move |inner| { + let service: BoxedServerService = Box::new(ServerService::new(inner)); + service + })) } } diff --git a/src/worker.rs b/src/worker.rs index f88f4db8a..9cc054f39 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,8 +1,10 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::{net, time}; use futures::sync::mpsc::{SendError, UnboundedSender}; use futures::sync::oneshot; -use futures::{future, Future}; +use futures::{future, Async, Future, Poll}; use actix::msgs::StopArbiter; use actix::{ @@ -10,8 +12,9 @@ use actix::{ Response, WrapFuture, }; -use super::server_service::{self, BoxedServerService, ServerServiceFactory}; -use super::{server::Connections, Token}; +use super::accept::AcceptNotify; +use super::server_service::{self, BoxedServerService, ServerMessage, ServerServiceFactory}; +use super::Token; #[derive(Message)] pub(crate) struct Conn { @@ -25,12 +28,12 @@ pub(crate) struct Conn { pub(crate) struct WorkerClient { pub idx: usize, tx: UnboundedSender, - conns: Connections, + avail: WorkerAvailability, } impl WorkerClient { - pub fn new(idx: usize, tx: UnboundedSender, conns: Connections) -> Self { - WorkerClient { idx, tx, conns } + pub fn new(idx: usize, tx: UnboundedSender, avail: WorkerAvailability) -> Self { + WorkerClient { idx, tx, avail } } pub fn send(&self, msg: Conn) -> Result<(), SendError> { @@ -38,7 +41,33 @@ impl WorkerClient { } pub fn available(&self) -> bool { - self.conns.available() + self.avail.available() + } +} + +#[derive(Clone)] +pub(crate) struct WorkerAvailability { + notify: AcceptNotify, + available: Arc, +} + +impl WorkerAvailability { + pub fn new(notify: AcceptNotify) -> Self { + WorkerAvailability { + notify, + available: Arc::new(AtomicBool::new(false)), + } + } + + pub fn available(&self) -> bool { + self.available.load(Ordering::Acquire) + } + + pub fn set(&self, val: bool) { + let old = self.available.swap(val, Ordering::Release); + if !old && val { + self.notify.notify() + } } } @@ -57,9 +86,8 @@ impl Message for StopWorker { /// Worker accepts Socket objects via unbounded channel and start requests /// processing. pub(crate) struct Worker { - // conns: Connections, services: Vec, - // counters: Vec>, + availability: WorkerAvailability, } impl Actor for Worker { @@ -69,10 +97,11 @@ impl Actor for Worker { impl Worker { pub(crate) fn new( ctx: &mut Context, services: Vec>, + availability: WorkerAvailability, ) -> Self { let wrk = Worker { + availability, services: Vec::new(), - // counters: services.iter().map(|i| i.counter()).collect(), }; ctx.wait( @@ -82,8 +111,10 @@ impl Worker { error!("Can not start worker: {:?}", e); Arbiter::current().do_send(StopArbiter(0)); ctx.stop(); - }).and_then(|services, act, _| { + }).and_then(|services, act, ctx| { act.services.extend(services); + act.availability.set(true); + ctx.spawn(CheckReadiness(true)); fut::ok(()) }), ); @@ -91,12 +122,20 @@ impl Worker { wrk } - fn shutdown(&self, _force: bool) { - // self.services.iter().for_each(|h| h.shutdown(force)); + fn shutdown(&mut self, force: bool) { + if force { + self.services.iter_mut().for_each(|h| { + h.call(ServerMessage::ForceShutdown); + }); + } else { + self.services.iter_mut().for_each(|h| { + h.call(ServerMessage::Shutdown); + }); + } } fn shutdown_timeout( - &self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration, + &mut self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration, ) { // sleep for 1 second and then check again ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { @@ -120,7 +159,7 @@ impl Handler for Worker { type Result = (); fn handle(&mut self, msg: Conn, _: &mut Context) { - Arbiter::spawn(self.services[msg.handler.0].call(msg.io)) + Arbiter::spawn(self.services[msg.handler.0].call(ServerMessage::Connect(msg.io))) } } @@ -151,3 +190,26 @@ impl Handler for Worker { } } } + +struct CheckReadiness(bool); + +impl ActorFuture for CheckReadiness { + type Item = (); + type Error = (); + type Actor = Worker; + + fn poll(&mut self, act: &mut Worker, _: &mut Context) -> Poll<(), ()> { + let mut val = true; + for service in &mut act.services { + if let Ok(Async::NotReady) = service.poll_ready() { + val = false; + break; + } + } + if self.0 != val { + self.0 = val; + act.availability.set(val); + } + Ok(Async::NotReady) + } +}