diff --git a/Cargo.toml b/Cargo.toml index ad01691e0..484e1172b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,6 @@ actix = "0.7.0" log = "0.4" num_cpus = "1.0" -failure = "^0.1.2" # io mio = "^0.6.13" diff --git a/src/lib.rs b/src/lib.rs index 324c30841..ca4a65736 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,8 +18,6 @@ #[macro_use] extern crate log; extern crate bytes; -// #[macro_use] -extern crate failure; #[macro_use] extern crate futures; extern crate mio; diff --git a/src/server/accept.rs b/src/server/accept.rs index 681e988c0..09b1f16f7 100644 --- a/src/server/accept.rs +++ b/src/server/accept.rs @@ -367,11 +367,11 @@ impl Accept { while !self.workers.is_empty() { match self.workers[self.next].send(msg) { Ok(_) => (), - Err(err) => { + Err(tmp) => { let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( self.workers[self.next].idx, )); - msg = err.into_inner(); + msg = tmp; self.workers.swap_remove(self.next); if self.workers.is_empty() { error!("No workers"); @@ -395,11 +395,11 @@ impl Accept { self.next = (self.next + 1) % self.workers.len(); return; } - Err(err) => { + Err(tmp) => { let _ = self.srv.unbounded_send(ServerCommand::WorkerDied( self.workers[self.next].idx, )); - msg = err.into_inner(); + msg = tmp; self.workers.swap_remove(self.next); if self.workers.is_empty() { error!("No workers"); diff --git a/src/server/server.rs b/src/server/server.rs index fb3c6fcd6..3afcbe736 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -7,13 +7,13 @@ use net2::TcpBuilder; use num_cpus; use actix::{ - actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, - Response, StreamHandler, System, WrapFuture, + actors::signal, fut, msgs::Execute, Actor, ActorFuture, Addr, Arbiter, AsyncContext, + Context, Handler, Response, StreamHandler, System, WrapFuture, }; use super::accept::{AcceptLoop, AcceptNotify, Command}; use super::services::{InternalServerServiceFactory, ServerNewService, ServerServiceFactory}; -use super::worker::{self, Conn, StopWorker, Worker, WorkerAvailability, WorkerClient}; +use super::worker::{self, Worker, WorkerAvailability, WorkerClient}; use super::{PauseServer, ResumeServer, StopServer, Token}; pub(crate) enum ServerCommand { @@ -23,7 +23,7 @@ pub(crate) enum ServerCommand { /// Server pub struct Server { threads: usize, - workers: Vec<(usize, Addr)>, + workers: Vec<(usize, WorkerClient)>, services: Vec>, sockets: Vec<(Token, net::TcpListener)>, accept: AcceptLoop, @@ -183,9 +183,9 @@ impl Server { // start workers let mut workers = Vec::new(); for idx in 0..self.threads { - let (addr, worker) = self.start_worker(idx, self.accept.get_notify()); - workers.push(worker); - self.workers.push((idx, addr)); + let worker = self.start_worker(idx, self.accept.get_notify()); + workers.push(worker.clone()); + self.workers.push((idx, worker)); } // start accept thread @@ -222,19 +222,19 @@ impl Server { } } - fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr, WorkerClient) { - let (tx, rx) = unbounded::(); + fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient { + let (tx, rx) = unbounded(); let avail = WorkerAvailability::new(notify); let worker = WorkerClient::new(idx, tx, avail.clone()); let services: Vec> = self.services.iter().map(|v| v.clone_factory()).collect(); - let addr = Arbiter::start(move |ctx: &mut Context<_>| { - ctx.add_message_stream(rx); - Worker::new(ctx, services, avail) - }); + Arbiter::new(format!("actix-worker-{}", idx)).do_send(Execute::new(|| { + Worker::start(rx, services, avail); + Ok::<_, ()>(()) + })); - (addr, worker) + worker } } @@ -306,7 +306,7 @@ impl Handler for Server { ctx.spawn( worker .1 - .send(StopWorker { graceful: dur }) + .stop(dur) .into_actor(self) .then(move |_, slf, ctx| { slf.workers.pop(); @@ -370,8 +370,8 @@ impl StreamHandler for Server { break; } - let (addr, worker) = self.start_worker(new_idx, self.accept.get_notify()); - self.workers.push((new_idx, addr)); + let worker = self.start_worker(new_idx, self.accept.get_notify()); + self.workers.push((new_idx, worker.clone())); self.accept.send(Command::Worker(worker)); } } diff --git a/src/server/worker.rs b/src/server/worker.rs index 985f68910..7d5a1fc06 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -2,24 +2,29 @@ use std::cell::Cell; use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; -use std::{net, time}; +use std::{mem, net, time}; -use futures::sync::mpsc::{SendError, UnboundedSender}; +use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use futures::sync::oneshot; use futures::task::AtomicTask; -use futures::{future, Async, Future, Poll}; +use futures::{future, Async, Future, Poll, Stream}; use tokio_current_thread::spawn; +use tokio_timer::{sleep, Delay}; use actix::msgs::StopArbiter; -use actix::{ - fut, Actor, ActorContext, ActorFuture, Arbiter, AsyncContext, Context, Handler, Message, - Response, WrapFuture, -}; +use actix::{Arbiter, Message}; use super::accept::AcceptNotify; use super::services::{BoxedServerService, InternalServerServiceFactory, ServerMessage}; use super::Token; +pub(crate) enum WorkerCommand { + Message(Conn), + /// Stop worker message. Returns `true` on successful shutdown + /// and `false` if some connections still alive. + Stop(Option, oneshot::Sender), +} + #[derive(Debug, Message)] pub(crate) struct Conn { pub io: net::TcpStream, @@ -52,22 +57,35 @@ thread_local! { #[derive(Clone)] pub(crate) struct WorkerClient { pub idx: usize, - tx: UnboundedSender, + tx: UnboundedSender, avail: WorkerAvailability, } impl WorkerClient { - pub fn new(idx: usize, tx: UnboundedSender, avail: WorkerAvailability) -> Self { + pub fn new( + idx: usize, tx: UnboundedSender, avail: WorkerAvailability, + ) -> Self { WorkerClient { idx, tx, avail } } - pub fn send(&self, msg: Conn) -> Result<(), SendError> { - self.tx.unbounded_send(msg) + pub fn send(&self, msg: Conn) -> Result<(), Conn> { + self.tx + .unbounded_send(WorkerCommand::Message(msg)) + .map_err(|e| match e.into_inner() { + WorkerCommand::Message(msg) => msg, + _ => panic!(), + }) } pub fn available(&self) -> bool { self.avail.available() } + + pub fn stop(&self, graceful: Option) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx)); + rx + } } #[derive(Clone)] @@ -96,69 +114,48 @@ impl WorkerAvailability { } } -/// Stop worker message. Returns `true` on successful shutdown -/// and `false` if some connections still alive. -pub(crate) struct StopWorker { - pub graceful: Option, -} - -impl Message for StopWorker { - type Result = Result; -} - /// Http worker /// /// Worker accepts Socket objects via unbounded channel and start requests /// processing. pub(crate) struct Worker { + rx: UnboundedReceiver, services: Vec, availability: WorkerAvailability, conns: Connections, factories: Vec>, -} - -impl Actor for Worker { - type Context = Context; + state: WorkerState, } impl Worker { - pub(crate) fn new( - ctx: &mut Context, factories: Vec>, - availability: WorkerAvailability, - ) -> Self { + pub(crate) fn start( + rx: UnboundedReceiver, + factories: Vec>, availability: WorkerAvailability, + ) { availability.set(false); - let wrk = MAX_CONNS_COUNTER.with(|conns| Worker { + let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker { + rx, availability, factories, services: Vec::new(), conns: conns.clone(), + state: WorkerState::Unavailable(Vec::new()), }); let mut fut = Vec::new(); for factory in &wrk.factories { fut.push(factory.create()); } - ctx.wait( + spawn( future::join_all(fut) - .into_actor(&wrk) - .map_err(|e, _, ctx| { + .map_err(|e| { error!("Can not start worker: {:?}", e); Arbiter::current().do_send(StopArbiter(0)); - ctx.stop(); - }).and_then(|services, act, ctx| { - act.services.extend(services); - let mut readiness = CheckReadiness { - avail: false, - idx: 0, - fut: None, - }; - let _ = readiness.poll(act, ctx); - ctx.spawn(readiness); - fut::ok(()) + }).and_then(move |services| { + wrk.services.extend(services); + wrk }), ); - - wrk } fn shutdown(&mut self, force: bool) { @@ -173,121 +170,203 @@ impl Worker { } } - fn shutdown_timeout( - &mut self, ctx: &mut Context, tx: oneshot::Sender, dur: time::Duration, - ) { - // sleep for 1 second and then check again - ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { - let num = num_connections(); - if num == 0 { - let _ = tx.send(true); - Arbiter::current().do_send(StopArbiter(0)); - } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { - slf.shutdown_timeout(ctx, tx, d); - } else { - info!("Force shutdown http worker, {} connections", num); - slf.shutdown(true); - let _ = tx.send(false); - Arbiter::current().do_send(StopArbiter(0)); - } - }); - } -} - -impl Handler for Worker { - type Result = (); - - fn handle(&mut self, msg: Conn, _: &mut Context) { - let guard = self.conns.get(); - spawn( - self.services[msg.handler.0] - .call(ServerMessage::Connect(msg.io)) - .map(|val| { - drop(guard); - val - }), - ) - } -} - -/// `StopWorker` message handler -impl Handler for Worker { - type Result = Response; - - fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Self::Result { - let num = num_connections(); - if num == 0 { - info!("Shutting down http worker, 0 connections"); - Response::reply(Ok(true)) - } else if let Some(dur) = msg.graceful { - self.shutdown(false); - let (tx, rx) = oneshot::channel(); - let num = num_connections(); - if num != 0 { - info!("Graceful http worker shutdown, {} connections", num); - self.shutdown_timeout(ctx, tx, dur); - Response::reply(Ok(true)) - } else { - Response::async(rx.map_err(|_| ())) + fn check_readiness(&mut self) -> Result { + let mut ready = self.conns.check(); + let mut failed = None; + for (idx, service) in self.services.iter_mut().enumerate() { + match service.poll_ready() { + Ok(Async::Ready(_)) => (), + Ok(Async::NotReady) => ready = false, + Err(_) => { + error!("Service readiness check returned error, restarting"); + failed = Some(idx); + } } + } + if let Some(idx) = failed { + Err(idx) } else { - info!("Force shutdown http worker, {} connections", num); - self.shutdown(true); - Response::reply(Ok(false)) + Ok(ready) } } } -struct CheckReadiness { - avail: bool, - idx: usize, - fut: Option>>, +enum WorkerState { + None, + Available, + Unavailable(Vec), + Restarting(usize, Box>), + Shutdown(Delay, Delay, oneshot::Sender), } -impl ActorFuture for CheckReadiness { +impl Future for Worker { type Item = (); type Error = (); - type Actor = Worker; - fn poll(&mut self, act: &mut Worker, ctx: &mut Context) -> Poll<(), ()> { - if self.fut.is_some() { - match self.fut.as_mut().unwrap().poll() { - Ok(Async::Ready(service)) => { - trace!("Service has been restarted"); - act.services[self.idx] = service; - self.fut.take(); - } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(_) => { - panic!("Can not restart service"); - } - } - } + fn poll(&mut self) -> Poll { + let state = mem::replace(&mut self.state, WorkerState::None); - let mut ready = act.conns.check(); - if ready { - // check if service is restarting - let mut failed = None; - for (idx, service) in act.services.iter_mut().enumerate() { - match service.poll_ready() { - Ok(Async::Ready(_)) => (), - Ok(Async::NotReady) => ready = false, - Err(_) => { - error!("Service readiness check returned error, restarting"); - failed = Some(idx); + match state { + WorkerState::Unavailable(mut conns) => { + match self.check_readiness() { + Ok(true) => { + self.state = WorkerState::Available; + + // process requests from wait queue + while let Some(msg) = conns.pop() { + match self.check_readiness() { + Ok(true) => { + let guard = self.conns.get(); + spawn( + self.services[msg.handler.0] + .call(ServerMessage::Connect(msg.io)) + .map(|val| { + drop(guard); + val + }), + ) + } + Ok(false) => { + self.state = WorkerState::Unavailable(conns); + return self.poll(); + } + Err(idx) => { + self.state = WorkerState::Restarting( + idx, + self.factories[idx].create(), + ); + return self.poll(); + } + } + } + self.availability.set(true); + return self.poll(); + } + Ok(false) => { + self.state = WorkerState::Unavailable(conns); + return Ok(Async::NotReady); + } + Err(idx) => { + self.state = WorkerState::Restarting(idx, self.factories[idx].create()); + return self.poll(); } } } - if let Some(idx) = failed { - self.idx = idx; - self.fut = Some(act.factories[idx].create()); - return self.poll(act, ctx); + WorkerState::Restarting(idx, mut fut) => { + match fut.poll() { + Ok(Async::Ready(service)) => { + trace!("Service has been restarted"); + self.services[idx] = service; + self.state = WorkerState::Unavailable(Vec::new()); + } + Ok(Async::NotReady) => { + self.state = WorkerState::Restarting(idx, fut); + return Ok(Async::NotReady); + } + Err(_) => { + panic!("Can not restart service"); + } + } + return self.poll(); } - } - if self.avail != ready { - self.avail = ready; - act.availability.set(ready); - } + WorkerState::Shutdown(mut t1, mut t2, tx) => { + let num = num_connections(); + if num == 0 { + let _ = tx.send(true); + Arbiter::current().do_send(StopArbiter(0)); + return Ok(Async::Ready(())); + } + + // check graceful timeout + match t2.poll().unwrap() { + Async::NotReady => (), + Async::Ready(_) => { + self.shutdown(true); + let _ = tx.send(false); + Arbiter::current().do_send(StopArbiter(0)); + return Ok(Async::Ready(())); + } + } + + // sleep for 1 second and then check again + match t1.poll().unwrap() { + Async::NotReady => (), + Async::Ready(_) => { + t1 = sleep(time::Duration::from_secs(1)); + let _ = t1.poll(); + } + } + self.state = WorkerState::Shutdown(t1, t2, tx); + return Ok(Async::NotReady); + } + WorkerState::Available => { + loop { + match self.rx.poll() { + // handle incoming tcp stream + Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => match self + .check_readiness() + { + Ok(true) => { + let guard = self.conns.get(); + spawn( + self.services[msg.handler.0] + .call(ServerMessage::Connect(msg.io)) + .map(|val| { + drop(guard); + val + }), + ); + } + Ok(false) => { + self.availability.set(false); + self.state = WorkerState::Unavailable(vec![msg]); + } + Err(idx) => { + self.availability.set(false); + self.state = + WorkerState::Restarting(idx, self.factories[idx].create()); + } + }, + // `StopWorker` message handler + Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => { + self.availability.set(false); + let num = num_connections(); + if num == 0 { + info!("Shutting down http worker, 0 connections"); + let _ = tx.send(true); + return Ok(Async::Ready(())); + } else if let Some(dur) = graceful { + self.shutdown(false); + let num = num_connections(); + if num != 0 { + info!("Graceful http worker shutdown, {} connections", num); + break Some(WorkerState::Shutdown( + sleep(time::Duration::from_secs(1)), + sleep(dur), + tx, + )); + } else { + let _ = tx.send(true); + return Ok(Async::Ready(())); + } + } else { + info!("Force shutdown http worker, {} connections", num); + self.shutdown(true); + let _ = tx.send(false); + return Ok(Async::Ready(())); + } + } + Ok(Async::NotReady) => { + self.state = WorkerState::Available; + return Ok(Async::NotReady); + } + Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), + } + } + } + WorkerState::None => panic!(), + }; + Ok(Async::NotReady) } }