diff --git a/src/server/server.rs b/src/server/server.rs index 09e795d03..834fa747c 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -249,15 +249,16 @@ impl Server { } fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { - let (tx, rx) = unbounded(); + let (tx1, rx1) = unbounded(); + let (tx2, rx2) = unbounded(); let timeout = self.shutdown_timeout; let avail = WorkerAvailability::new(notify); - let worker = WorkerClient::new(idx, tx, avail.clone()); + let worker = WorkerClient::new(idx, tx1, tx2, avail.clone()); let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || { - Worker::start(rx, services, avail, timeout.clone()); + Worker::start(rx1, rx2, services, avail, timeout.clone()); Ok::<_, ()>(()) })); @@ -317,6 +318,7 @@ impl Handler for Server { type Result = Response<(), ()>; fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { + println!("STOP command"); // stop accept thread self.accept.send(Command::Stop); diff --git a/src/server/worker.rs b/src/server/worker.rs index 68e9cbf7a..e9d8f8c8e 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -16,11 +16,13 @@ use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage} use super::Token; use counter::Counter; -pub(crate) enum WorkerCommand { - Message(Conn), - /// Stop worker message. Returns `true` on successful shutdown - /// and `false` if some connections still alive. - Stop(bool, oneshot::Sender), +pub(crate) struct WorkerCommand(Conn); + +/// Stop worker message. Returns `true` on successful shutdown +/// and `false` if some connections still alive. +pub(crate) struct StopCommand { + graceful: bool, + result: oneshot::Sender, } #[derive(Debug, Message)] @@ -55,26 +57,30 @@ thread_local! { #[derive(Clone)] pub(crate) struct WorkerClient { pub idx: usize, - tx: UnboundedSender, + tx1: UnboundedSender, + tx2: UnboundedSender, avail: WorkerAvailability, } impl WorkerClient { pub fn new( idx: usize, - tx: UnboundedSender, + tx1: UnboundedSender, + tx2: UnboundedSender, avail: WorkerAvailability, ) -> Self { - WorkerClient { idx, tx, avail } + WorkerClient { + idx, + tx1, + tx2, + avail, + } } pub fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx - .unbounded_send(WorkerCommand::Message(msg)) - .map_err(|e| match e.into_inner() { - WorkerCommand::Message(msg) => msg, - _ => panic!(), - }) + self.tx1 + .unbounded_send(WorkerCommand(msg)) + .map_err(|msg| msg.into_inner().0) } pub fn available(&self) -> bool { @@ -82,8 +88,8 @@ impl WorkerClient { } pub fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx)); + let (result, rx) = oneshot::channel(); + let _ = self.tx2.unbounded_send(StopCommand { graceful, result }); rx } } @@ -120,6 +126,7 @@ impl WorkerAvailability { /// processing. pub(crate) struct Worker { rx: UnboundedReceiver, + rx2: UnboundedReceiver, services: Vec, availability: WorkerAvailability, conns: Counter, @@ -131,6 +138,7 @@ pub(crate) struct Worker { impl Worker { pub(crate) fn start( rx: UnboundedReceiver, + rx2: UnboundedReceiver, factories: Vec>, availability: WorkerAvailability, shutdown_timeout: time::Duration, @@ -138,6 +146,7 @@ impl Worker { availability.set(false); let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker { rx, + rx2, availability, factories, shutdown_timeout, @@ -216,6 +225,39 @@ impl Future for Worker { type Error = (); fn poll(&mut self) -> Poll { + // `StopWorker` message handler + match self.rx2.poll() { + Ok(Async::Ready(Some(StopCommand { graceful, result }))) => { + self.availability.set(false); + let num = num_connections(); + if num == 0 { + info!("Shutting down worker, 0 connections"); + let _ = result.send(true); + return Ok(Async::Ready(())); + } else if graceful { + self.shutdown(false); + let num = num_connections(); + if num != 0 { + info!("Graceful worker shutdown, {} connections", num); + self.state = WorkerState::Shutdown( + sleep(time::Duration::from_secs(1)), + sleep(self.shutdown_timeout), + result, + ); + } else { + let _ = result.send(true); + return Ok(Async::Ready(())); + } + } else { + info!("Force shutdown worker, {} connections", num); + self.shutdown(true); + let _ = result.send(false); + return Ok(Async::Ready(())); + } + } + _ => (), + } + let state = mem::replace(&mut self.state, WorkerState::None); match state { @@ -321,7 +363,7 @@ impl Future for Worker { loop { match self.rx.poll() { // handle incoming tcp stream - Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => { + Ok(Async::Ready(Some(WorkerCommand(msg)))) => { match self.check_readiness(false) { Ok(true) => { let guard = self.conns.get(); @@ -348,35 +390,6 @@ impl Future for Worker { } return self.poll(); } - // `StopWorker` message handler - Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => { - self.availability.set(false); - let num = num_connections(); - if num == 0 { - info!("Shutting down worker, 0 connections"); - let _ = tx.send(true); - return Ok(Async::Ready(())); - } else if graceful { - self.shutdown(false); - let num = num_connections(); - if num != 0 { - info!("Graceful worker shutdown, {} connections", num); - break Some(WorkerState::Shutdown( - sleep(time::Duration::from_secs(1)), - sleep(self.shutdown_timeout), - tx, - )); - } else { - let _ = tx.send(true); - return Ok(Async::Ready(())); - } - } else { - info!("Force shutdown worker, {} connections", num); - self.shutdown(true); - let _ = tx.send(false); - return Ok(Async::Ready(())); - } - } Ok(Async::NotReady) => { self.state = WorkerState::Available; return Ok(Async::NotReady); @@ -387,7 +400,5 @@ impl Future for Worker { } WorkerState::None => panic!(), }; - - Ok(Async::NotReady) } }