1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-02 05:18:44 +00:00

refactor worker availability management

This commit is contained in:
Nikolay Kim 2018-09-13 23:46:01 -07:00
parent 2764323580
commit ec7757f032
5 changed files with 245 additions and 169 deletions

View file

@ -41,7 +41,6 @@ actix = "0.7.0"
log = "0.4" log = "0.4"
num_cpus = "1.0" num_cpus = "1.0"
failure = "^0.1.2"
# io # io
mio = "^0.6.13" mio = "^0.6.13"

View file

@ -18,8 +18,6 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate bytes; extern crate bytes;
// #[macro_use]
extern crate failure;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
extern crate mio; extern crate mio;

View file

@ -367,11 +367,11 @@ impl Accept {
while !self.workers.is_empty() { while !self.workers.is_empty() {
match self.workers[self.next].send(msg) { match self.workers[self.next].send(msg) {
Ok(_) => (), Ok(_) => (),
Err(err) => { Err(tmp) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
self.workers[self.next].idx, self.workers[self.next].idx,
)); ));
msg = err.into_inner(); msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {
error!("No workers"); error!("No workers");
@ -395,11 +395,11 @@ impl Accept {
self.next = (self.next + 1) % self.workers.len(); self.next = (self.next + 1) % self.workers.len();
return; return;
} }
Err(err) => { Err(tmp) => {
let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( let _ = self.srv.unbounded_send(ServerCommand::WorkerDied(
self.workers[self.next].idx, self.workers[self.next].idx,
)); ));
msg = err.into_inner(); msg = tmp;
self.workers.swap_remove(self.next); self.workers.swap_remove(self.next);
if self.workers.is_empty() { if self.workers.is_empty() {
error!("No workers"); error!("No workers");

View file

@ -7,13 +7,13 @@ use net2::TcpBuilder;
use num_cpus; use num_cpus;
use actix::{ use actix::{
actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, actors::signal, fut, msgs::Execute, Actor, ActorFuture, Addr, Arbiter, AsyncContext,
Response, StreamHandler, System, WrapFuture, Context, Handler, Response, StreamHandler, System, WrapFuture,
}; };
use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::accept::{AcceptLoop, AcceptNotify, Command};
use super::services::{InternalServerServiceFactory, ServerNewService, ServerServiceFactory}; use super::services::{InternalServerServiceFactory, ServerNewService, ServerServiceFactory};
use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient}; use super::worker::{self, Worker, WorkerAvailability, WorkerClient};
use super::{PauseServer, ResumeServer, StopServer, Token}; use super::{PauseServer, ResumeServer, StopServer, Token};
pub(crate) enum ServerCommand { pub(crate) enum ServerCommand {
@ -23,7 +23,7 @@ pub(crate) enum ServerCommand {
/// Server /// Server
pub struct Server { pub struct Server {
threads: usize, threads: usize,
workers: Vec<(usize, Addr<Worker>)>, workers: Vec<(usize, WorkerClient)>,
services: Vec<Box<InternalServerServiceFactory>>, services: Vec<Box<InternalServerServiceFactory>>,
sockets: Vec<(Token, net::TcpListener)>, sockets: Vec<(Token, net::TcpListener)>,
accept: AcceptLoop, accept: AcceptLoop,
@ -183,9 +183,9 @@ impl Server {
// start workers // start workers
let mut workers = Vec::new(); let mut workers = Vec::new();
for idx in 0..self.threads { for idx in 0..self.threads {
let (addr, worker) = self.start_worker(idx, self.accept.get_notify()); let worker = self.start_worker(idx, self.accept.get_notify());
workers.push(worker); workers.push(worker.clone());
self.workers.push((idx, addr)); self.workers.push((idx, worker));
} }
// start accept thread // start accept thread
@ -222,19 +222,19 @@ impl Server {
} }
} }
fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) { fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient {
let (tx, rx) = unbounded::<Conn>(); let (tx, rx) = unbounded();
let avail = WorkerAvailability::new(notify); let avail = WorkerAvailability::new(notify);
let worker = WorkerClient::new(idx, tx, avail.clone()); let worker = WorkerClient::new(idx, tx, avail.clone());
let services: Vec<Box<InternalServerServiceFactory>> = let services: Vec<Box<InternalServerServiceFactory>> =
self.services.iter().map(|v| v.clone_factory()).collect(); self.services.iter().map(|v| v.clone_factory()).collect();
let addr = Arbiter::start(move |ctx: &mut Context<_>| { Arbiter::new(format!("actix-worker-{}", idx)).do_send(Execute::new(|| {
ctx.add_message_stream(rx); Worker::start(rx, services, avail);
Worker::new(ctx, services, avail) Ok::<_, ()>(())
}); }));
(addr, worker) worker
} }
} }
@ -306,7 +306,7 @@ impl Handler<StopServer> for Server {
ctx.spawn( ctx.spawn(
worker worker
.1 .1
.send(StopWorker { graceful: dur }) .stop(dur)
.into_actor(self) .into_actor(self)
.then(move |_, slf, ctx| { .then(move |_, slf, ctx| {
slf.workers.pop(); slf.workers.pop();
@ -370,8 +370,8 @@ impl StreamHandler<ServerCommand, ()> for Server {
break; break;
} }
let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify()); let worker = self.start_worker(new_idx, self.accept.get_notify());
self.workers.push((new_idx, addr)); self.workers.push((new_idx, worker.clone()));
self.accept.send(Command::Worker(worker)); self.accept.send(Command::Worker(worker));
} }
} }

View file

@ -2,24 +2,29 @@ use std::cell::Cell;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::{net, time}; use std::{mem, net, time};
use futures::sync::mpsc::{SendError, UnboundedSender}; use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::task::AtomicTask; use futures::task::AtomicTask;
use futures::{future, Async, Future, Poll}; use futures::{future, Async, Future, Poll, Stream};
use tokio_current_thread::spawn; use tokio_current_thread::spawn;
use tokio_timer::{sleep, Delay};
use actix::msgs::StopArbiter; use actix::msgs::StopArbiter;
use actix::{ use actix::{Arbiter, Message};
fut, Actor, ActorContext, ActorFuture, Arbiter, AsyncContext, Context, Handler, Message,
Response, WrapFuture,
};
use super::accept::AcceptNotify; use super::accept::AcceptNotify;
use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage}; use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage};
use super::Token; use super::Token;
pub(crate) enum WorkerCommand {
Message(Conn),
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
Stop(Option<time::Duration>, oneshot::Sender<bool>),
}
#[derive(Debug, Message)] #[derive(Debug, Message)]
pub(crate) struct Conn { pub(crate) struct Conn {
pub io: net::TcpStream, pub io: net::TcpStream,
@ -52,22 +57,35 @@ thread_local! {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct WorkerClient { pub(crate) struct WorkerClient {
pub idx: usize, pub idx: usize,
tx: UnboundedSender<Conn>, tx: UnboundedSender<WorkerCommand>,
avail: WorkerAvailability, avail: WorkerAvailability,
} }
impl WorkerClient { impl WorkerClient {
pub fn new(idx: usize, tx: UnboundedSender<Conn>, avail: WorkerAvailability) -> Self { pub fn new(
idx: usize, tx: UnboundedSender<WorkerCommand>, avail: WorkerAvailability,
) -> Self {
WorkerClient { idx, tx, avail } WorkerClient { idx, tx, avail }
} }
pub fn send(&self, msg: Conn) -> Result<(), SendError<Conn>> { pub fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx.unbounded_send(msg) self.tx
.unbounded_send(WorkerCommand::Message(msg))
.map_err(|e| match e.into_inner() {
WorkerCommand::Message(msg) => msg,
_ => panic!(),
})
} }
pub fn available(&self) -> bool { pub fn available(&self) -> bool {
self.avail.available() self.avail.available()
} }
pub fn stop(&self, graceful: Option<time::Duration>) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx));
rx
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -96,69 +114,48 @@ impl WorkerAvailability {
} }
} }
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
pub(crate) struct StopWorker {
pub graceful: Option<time::Duration>,
}
impl Message for StopWorker {
type Result = Result<bool, ()>;
}
/// Http worker /// Http worker
/// ///
/// Worker accepts Socket objects via unbounded channel and start requests /// Worker accepts Socket objects via unbounded channel and start requests
/// processing. /// processing.
pub(crate) struct Worker { pub(crate) struct Worker {
rx: UnboundedReceiver<WorkerCommand>,
services: Vec<BoxedServerService>, services: Vec<BoxedServerService>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Connections, conns: Connections,
factories: Vec<Box<InternalServerServiceFactory>>, factories: Vec<Box<InternalServerServiceFactory>>,
} state: WorkerState,
impl Actor for Worker {
type Context = Context<Self>;
} }
impl Worker { impl Worker {
pub(crate) fn new( pub(crate) fn start(
ctx: &mut Context<Self>, factories: Vec<Box<InternalServerServiceFactory>>, rx: UnboundedReceiver<WorkerCommand>,
availability: WorkerAvailability, factories: Vec<Box<InternalServerServiceFactory>>, availability: WorkerAvailability,
) -> Self { ) {
availability.set(false); availability.set(false);
let wrk = MAX_CONNS_COUNTER.with(|conns| Worker { let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
rx,
availability, availability,
factories, factories,
services: Vec::new(), services: Vec::new(),
conns: conns.clone(), conns: conns.clone(),
state: WorkerState::Unavailable(Vec::new()),
}); });
let mut fut = Vec::new(); let mut fut = Vec::new();
for factory in &wrk.factories { for factory in &wrk.factories {
fut.push(factory.create()); fut.push(factory.create());
} }
ctx.wait( spawn(
future::join_all(fut) future::join_all(fut)
.into_actor(&wrk) .map_err(|e| {
.map_err(|e, _, ctx| {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
Arbiter::current().do_send(StopArbiter(0)); Arbiter::current().do_send(StopArbiter(0));
ctx.stop(); }).and_then(move |services| {
}).and_then(|services, act, ctx| { wrk.services.extend(services);
act.services.extend(services); wrk
let mut readiness = CheckReadiness {
avail: false,
idx: 0,
fut: None,
};
let _ = readiness.poll(act, ctx);
ctx.spawn(readiness);
fut::ok(())
}), }),
); );
wrk
} }
fn shutdown(&mut self, force: bool) { fn shutdown(&mut self, force: bool) {
@ -173,102 +170,10 @@ impl Worker {
} }
} }
fn shutdown_timeout( fn check_readiness(&mut self) -> Result<bool, usize> {
&mut self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration, let mut ready = self.conns.check();
) {
// sleep for 1 second and then check again
ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
let num = num_connections();
if num == 0 {
let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0));
} else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
slf.shutdown_timeout(ctx, tx, d);
} else {
info!("Force shutdown http worker, {} connections", num);
slf.shutdown(true);
let _ = tx.send(false);
Arbiter::current().do_send(StopArbiter(0));
}
});
}
}
impl Handler<Conn> for Worker {
type Result = ();
fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
let guard = self.conns.get();
spawn(
self.services[msg.handler.0]
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
)
}
}
/// `StopWorker` message handler
impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>;
fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
let num = num_connections();
if num == 0 {
info!("Shutting down http worker, 0 connections");
Response::reply(Ok(true))
} else if let Some(dur) = msg.graceful {
self.shutdown(false);
let (tx, rx) = oneshot::channel();
let num = num_connections();
if num != 0 {
info!("Graceful http worker shutdown, {} connections", num);
self.shutdown_timeout(ctx, tx, dur);
Response::reply(Ok(true))
} else {
Response::async(rx.map_err(|_| ()))
}
} else {
info!("Force shutdown http worker, {} connections", num);
self.shutdown(true);
Response::reply(Ok(false))
}
}
}
struct CheckReadiness {
avail: bool,
idx: usize,
fut: Option<Box<Future<Item = BoxedServerService, Error = ()>>>,
}
impl ActorFuture for CheckReadiness {
type Item = ();
type Error = ();
type Actor = Worker;
fn poll(&mut self, act: &mut Worker, ctx: &mut Context<Worker>) -> Poll<(), ()> {
if self.fut.is_some() {
match self.fut.as_mut().unwrap().poll() {
Ok(Async::Ready(service)) => {
trace!("Service has been restarted");
act.services[self.idx] = service;
self.fut.take();
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => {
panic!("Can not restart service");
}
}
}
let mut ready = act.conns.check();
if ready {
// check if service is restarting
let mut failed = None; let mut failed = None;
for (idx, service) in act.services.iter_mut().enumerate() { for (idx, service) in self.services.iter_mut().enumerate() {
match service.poll_ready() { match service.poll_ready() {
Ok(Async::Ready(_)) => (), Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => ready = false, Ok(Async::NotReady) => ready = false,
@ -279,15 +184,189 @@ impl ActorFuture for CheckReadiness {
} }
} }
if let Some(idx) = failed { if let Some(idx) = failed {
self.idx = idx; Err(idx)
self.fut = Some(act.factories[idx].create()); } else {
return self.poll(act, ctx); Ok(ready)
} }
} }
if self.avail != ready {
self.avail = ready;
act.availability.set(ready);
} }
enum WorkerState {
None,
Available,
Unavailable(Vec<Conn>),
Restarting(usize, Box<Future<Item = BoxedServerService, Error = ()>>),
Shutdown(Delay, Delay, oneshot::Sender<bool>),
}
impl Future for Worker {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let state = mem::replace(&mut self.state, WorkerState::None);
match state {
WorkerState::Unavailable(mut conns) => {
match self.check_readiness() {
Ok(true) => {
self.state = WorkerState::Available;
// process requests from wait queue
while let Some(msg) = conns.pop() {
match self.check_readiness() {
Ok(true) => {
let guard = self.conns.get();
spawn(
self.services[msg.handler.0]
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
)
}
Ok(false) => {
self.state = WorkerState::Unavailable(conns);
return self.poll();
}
Err(idx) => {
self.state = WorkerState::Restarting(
idx,
self.factories[idx].create(),
);
return self.poll();
}
}
}
self.availability.set(true);
return self.poll();
}
Ok(false) => {
self.state = WorkerState::Unavailable(conns);
return Ok(Async::NotReady);
}
Err(idx) => {
self.state = WorkerState::Restarting(idx, self.factories[idx].create());
return self.poll();
}
}
}
WorkerState::Restarting(idx, mut fut) => {
match fut.poll() {
Ok(Async::Ready(service)) => {
trace!("Service has been restarted");
self.services[idx] = service;
self.state = WorkerState::Unavailable(Vec::new());
}
Ok(Async::NotReady) => {
self.state = WorkerState::Restarting(idx, fut);
return Ok(Async::NotReady);
}
Err(_) => {
panic!("Can not restart service");
}
}
return self.poll();
}
WorkerState::Shutdown(mut t1, mut t2, tx) => {
let num = num_connections();
if num == 0 {
let _ = tx.send(true);
Arbiter::current().do_send(StopArbiter(0));
return Ok(Async::Ready(()));
}
// check graceful timeout
match t2.poll().unwrap() {
Async::NotReady => (),
Async::Ready(_) => {
self.shutdown(true);
let _ = tx.send(false);
Arbiter::current().do_send(StopArbiter(0));
return Ok(Async::Ready(()));
}
}
// sleep for 1 second and then check again
match t1.poll().unwrap() {
Async::NotReady => (),
Async::Ready(_) => {
t1 = sleep(time::Duration::from_secs(1));
let _ = t1.poll();
}
}
self.state = WorkerState::Shutdown(t1, t2, tx);
return Ok(Async::NotReady);
}
WorkerState::Available => {
loop {
match self.rx.poll() {
// handle incoming tcp stream
Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => match self
.check_readiness()
{
Ok(true) => {
let guard = self.conns.get();
spawn(
self.services[msg.handler.0]
.call(ServerMessage::Connect(msg.io))
.map(|val| {
drop(guard);
val
}),
);
}
Ok(false) => {
self.availability.set(false);
self.state = WorkerState::Unavailable(vec![msg]);
}
Err(idx) => {
self.availability.set(false);
self.state =
WorkerState::Restarting(idx, self.factories[idx].create());
}
},
// `StopWorker` message handler
Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => {
self.availability.set(false);
let num = num_connections();
if num == 0 {
info!("Shutting down http worker, 0 connections");
let _ = tx.send(true);
return Ok(Async::Ready(()));
} else if let Some(dur) = graceful {
self.shutdown(false);
let num = num_connections();
if num != 0 {
info!("Graceful http worker shutdown, {} connections", num);
break Some(WorkerState::Shutdown(
sleep(time::Duration::from_secs(1)),
sleep(dur),
tx,
));
} else {
let _ = tx.send(true);
return Ok(Async::Ready(()));
}
} else {
info!("Force shutdown http worker, {} connections", num);
self.shutdown(true);
let _ = tx.send(false);
return Ok(Async::Ready(()));
}
}
Ok(Async::NotReady) => {
self.state = WorkerState::Available;
return Ok(Async::NotReady);
}
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
}
}
}
WorkerState::None => panic!(),
};
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }