1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-17 04:35:57 +00:00

separate stop worker channel

This commit is contained in:
Nikolay Kim 2018-11-01 15:33:35 -07:00
parent 60144a3cb8
commit 0e3d1068da
2 changed files with 64 additions and 51 deletions

View file

@ -249,15 +249,16 @@ impl Server {
} }
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient {
let (tx, rx) = unbounded(); let (tx1, rx1) = unbounded();
let (tx2, rx2) = unbounded();
let timeout = self.shutdown_timeout; let timeout = self.shutdown_timeout;
let avail = WorkerAvailability::new(notify); let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx, avail.clone()); let worker = WorkerClient::new(idx, tx1, tx2, avail.clone());
let services: Vec<Box<InternalServiceFactory>> = let services: Vec<Box<InternalServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || { Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || {
Worker::start(rx, services, avail, timeout.clone()); Worker::start(rx1, rx2, services, avail, timeout.clone());
Ok::<_, ()>(()) Ok::<_, ()>(())
})); }));
@ -317,6 +318,7 @@ impl Handler<StopServer> for Server {
type Result = Response<(), ()>; type Result = Response<(), ()>;
fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
println!("STOP command");
// stop accept thread // stop accept thread
self.accept.send(Command::Stop); self.accept.send(Command::Stop);

View file

@ -16,11 +16,13 @@ use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage}
use super::Token; use super::Token;
use counter::Counter; use counter::Counter;
pub(crate) enum WorkerCommand { pub(crate) struct WorkerCommand(Conn);
Message(Conn),
/// Stop worker message. Returns `true` on successful shutdown /// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive. /// and `false` if some connections still alive.
Stop(bool, oneshot::Sender<bool>), pub(crate) struct StopCommand {
graceful: bool,
result: oneshot::Sender<bool>,
} }
#[derive(Debug, Message)] #[derive(Debug, Message)]
@ -55,26 +57,30 @@ thread_local! {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct WorkerClient { pub(crate) struct WorkerClient {
pub idx: usize, pub idx: usize,
tx: UnboundedSender<WorkerCommand>, tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability,
} }
impl WorkerClient { impl WorkerClient {
pub fn new( pub fn new(
idx: usize, idx: usize,
tx: UnboundedSender<WorkerCommand>, tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability,
) -> Self { ) -> Self {
WorkerClient { idx, tx, avail } WorkerClient {
idx,
tx1,
tx2,
avail,
}
} }
pub fn send(&self, msg: Conn) -> Result<(), Conn> { pub fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx self.tx1
.unbounded_send(WorkerCommand::Message(msg)) .unbounded_send(WorkerCommand(msg))
.map_err(|e| match e.into_inner() { .map_err(|msg| msg.into_inner().0)
WorkerCommand::Message(msg) => msg,
_ => panic!(),
})
} }
pub fn available(&self) -> bool { pub fn available(&self) -> bool {
@ -82,8 +88,8 @@ impl WorkerClient {
} }
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> { pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel(); let (result, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx)); let _ = self.tx2.unbounded_send(StopCommand { graceful, result });
rx rx
} }
} }
@ -120,6 +126,7 @@ impl WorkerAvailability {
/// processing. /// processing.
pub(crate) struct Worker { pub(crate) struct Worker {
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
services: Vec<BoxedServerService>, services: Vec<BoxedServerService>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
@ -131,6 +138,7 @@ pub(crate) struct Worker {
impl Worker { impl Worker {
pub(crate) fn start( pub(crate) fn start(
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
factories: Vec<Box<InternalServiceFactory>>, factories: Vec<Box<InternalServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
shutdown_timeout: time::Duration, shutdown_timeout: time::Duration,
@ -138,6 +146,7 @@ impl Worker {
availability.set(false); availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker { let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
rx, rx,
rx2,
availability, availability,
factories, factories,
shutdown_timeout, shutdown_timeout,
@ -216,6 +225,39 @@ impl Future for Worker {
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// `StopWorker` message handler
match self.rx2.poll() {
Ok(Async::Ready(Some(StopCommand { graceful, result }))) => {
self.availability.set(false);
let num = num_connections();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = result.send(true);
return Ok(Async::Ready(()));
} else if graceful {
self.shutdown(false);
let num = num_connections();
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
sleep(time::Duration::from_secs(1)),
sleep(self.shutdown_timeout),
result,
);
} else {
let _ = result.send(true);
return Ok(Async::Ready(()));
}
} else {
info!("Force shutdown worker, {} connections", num);
self.shutdown(true);
let _ = result.send(false);
return Ok(Async::Ready(()));
}
}
_ => (),
}
let state = mem::replace(&mut self.state, WorkerState::None); let state = mem::replace(&mut self.state, WorkerState::None);
match state { match state {
@ -321,7 +363,7 @@ impl Future for Worker {
loop { loop {
match self.rx.poll() { match self.rx.poll() {
// handle incoming tcp stream // handle incoming tcp stream
Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => { Ok(Async::Ready(Some(WorkerCommand(msg)))) => {
match self.check_readiness(false) { match self.check_readiness(false) {
Ok(true) => { Ok(true) => {
let guard = self.conns.get(); let guard = self.conns.get();
@ -348,35 +390,6 @@ impl Future for Worker {
} }
return self.poll(); return self.poll();
} }
// `StopWorker` message handler
Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => {
self.availability.set(false);
let num = num_connections();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = tx.send(true);
return Ok(Async::Ready(()));
} else if graceful {
self.shutdown(false);
let num = num_connections();
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
break Some(WorkerState::Shutdown(
sleep(time::Duration::from_secs(1)),
sleep(self.shutdown_timeout),
tx,
));
} else {
let _ = tx.send(true);
return Ok(Async::Ready(()));
}
} else {
info!("Force shutdown worker, {} connections", num);
self.shutdown(true);
let _ = tx.send(false);
return Ok(Async::Ready(()));
}
}
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
self.state = WorkerState::Available; self.state = WorkerState::Available;
return Ok(Async::NotReady); return Ok(Async::NotReady);
@ -387,7 +400,5 @@ impl Future for Worker {
} }
WorkerState::None => panic!(), WorkerState::None => panic!(),
}; };
Ok(Async::NotReady)
} }
} }