1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-31 12:28:48 +00:00

restart failed services; fix readiness check

This commit is contained in:
Nikolay Kim 2018-09-12 08:25:14 -07:00
parent 3395512040
commit f66eec00e7

View file

@ -114,6 +114,7 @@ pub(crate) struct Worker {
services: Vec<BoxedServerService>, services: Vec<BoxedServerService>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Connections, conns: Connections,
factories: Vec<Box<InternalServerServiceFactory>>,
} }
impl Actor for Worker { impl Actor for Worker {
@ -122,17 +123,23 @@ impl Actor for Worker {
impl Worker { impl Worker {
pub(crate) fn new( pub(crate) fn new(
ctx: &mut Context<Self>, services: Vec<Box<InternalServerServiceFactory>>, ctx: &mut Context<Self>, factories: Vec<Box<InternalServerServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
) -> Self { ) -> Self {
availability.set(false);
let wrk = MAX_CONNS_COUNTER.with(|conns| Worker { let wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
availability, availability,
factories,
services: Vec::new(), services: Vec::new(),
conns: conns.clone(), conns: conns.clone(),
}); });
let mut fut = Vec::new();
for factory in &wrk.factories {
fut.push(factory.create());
}
ctx.wait( ctx.wait(
future::join_all(services.into_iter().map(|s| s.create())) future::join_all(fut)
.into_actor(&wrk) .into_actor(&wrk)
.map_err(|e, _, ctx| { .map_err(|e, _, ctx| {
error!("Can not start worker: {:?}", e); error!("Can not start worker: {:?}", e);
@ -140,8 +147,13 @@ impl Worker {
ctx.stop(); ctx.stop();
}).and_then(|services, act, ctx| { }).and_then(|services, act, ctx| {
act.services.extend(services); act.services.extend(services);
act.availability.set(true); let mut readiness = CheckReadiness {
ctx.spawn(CheckReadiness(true)); avail: true,
idx: 0,
fut: None,
};
let _ = readiness.poll(act, ctx);
ctx.spawn(readiness);
fut::ok(()) fut::ok(())
}), }),
); );
@ -226,26 +238,55 @@ impl Handler<StopWorker> for Worker {
} }
} }
struct CheckReadiness(bool); struct CheckReadiness {
avail: bool,
idx: usize,
fut: Option<Box<Future<Item = BoxedServerService, Error = ()>>>,
}
impl ActorFuture for CheckReadiness { impl ActorFuture for CheckReadiness {
type Item = (); type Item = ();
type Error = (); type Error = ();
type Actor = Worker; type Actor = Worker;
fn poll(&mut self, act: &mut Worker, _: &mut Context<Worker>) -> Poll<(), ()> { fn poll(&mut self, act: &mut Worker, ctx: &mut Context<Worker>) -> Poll<(), ()> {
let mut val = act.conns.check(); if self.fut.is_some() {
if val { match self.fut.as_mut().unwrap().poll() {
for service in &mut act.services { Ok(Async::Ready(service)) => {
if let Ok(Async::NotReady) = service.poll_ready() { trace!("Service has been restarted");
val = false; act.services[self.idx] = service;
break; self.fut.take();
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(_) => {
panic!("Can not restart service");
} }
} }
} }
if self.0 != val {
self.0 = val; let mut ready = act.conns.check();
act.availability.set(val); if ready {
// check if service is restarting
let mut failed = None;
for (idx, service) in act.services.iter_mut().enumerate() {
match service.poll_ready() {
Ok(Async::Ready(_)) => (),
Ok(Async::NotReady) => ready = false,
Err(_) => {
error!("Service readiness check returned error, restarting");
failed = Some(idx);
}
}
}
if let Some(idx) = failed {
self.idx = idx;
self.fut = Some(act.factories[idx].create());
return self.poll(act, ctx);
}
}
if self.avail != ready {
self.avail = ready;
act.availability.set(ready);
} }
Ok(Async::NotReady) Ok(Async::NotReady)
} }