diff --git a/examples/signals/Cargo.toml b/examples/signals/Cargo.toml index d5a6f9235..869dc66e7 100644 --- a/examples/signals/Cargo.toml +++ b/examples/signals/Cargo.toml @@ -11,7 +11,4 @@ path = "src/main.rs" env_logger = "*" futures = "0.1" actix = "^0.3.5" - -#actix-web = { git = "https://github.com/actix/actix-web.git" } - -actix-web = { path="../../", features=["signal"] } +actix-web = { git = "https://github.com/actix/actix-web.git", features=["signal"] } diff --git a/examples/signals/src/main.rs b/examples/signals/src/main.rs index 500af1a79..77b6b2f74 100644 --- a/examples/signals/src/main.rs +++ b/examples/signals/src/main.rs @@ -3,10 +3,22 @@ extern crate actix_web; extern crate futures; extern crate env_logger; +use actix::*; use actix_web::*; -use actix::Arbiter; use actix::actors::signal::{ProcessSignals, Subscribe}; +struct MyWebSocket; + +impl Actor for MyWebSocket { + type Context = HttpContext; +} + +impl StreamHandler for MyWebSocket {} +impl Handler for MyWebSocket { + fn handle(&mut self, _: ws::Message, _: &mut Self::Context) -> Response { + Self::empty() + } +} fn main() { ::std::env::set_var("RUST_LOG", "actix_web=info"); @@ -17,6 +29,7 @@ fn main() { Application::new() // enable logger .middleware(middleware::Logger::default()) + .resource("/ws/", |r| r.f(|req| ws::start(req, MyWebSocket))) .resource("/", |r| r.h(httpcodes::HTTPOk))}) .bind("127.0.0.1:8080").unwrap() .start(); diff --git a/guide/src/qs_3_5.md b/guide/src/qs_3_5.md index 7ed1ce7da..312668903 100644 --- a/guide/src/qs_3_5.md +++ b/guide/src/qs_3_5.md @@ -164,3 +164,73 @@ fn index(req: HttpRequest) -> HttpResponse { } # fn main() {} ``` + +## Graceful shutdown + +Actix http server support graceful shutdown. After receiving a stop signal, workers +have specific amount of time to finish serving requests. Workers still alive after the +timeout are force dropped. By default shutdown timeout sets to 30 seconds. +You can change this parameter with `HttpServer::shutdown_timeout()` method. + +You can send stop message to server with server address and specify if you what +graceful shutdown or not. `start()` or `spawn()` methods return address of the server. + +```rust +# extern crate futures; +# extern crate actix; +# extern crate actix_web; +# use futures::Future; +use actix_web::*; + +fn main() { + let addr = HttpServer::new( + || Application::new() + .resource("/", |r| r.h(httpcodes::HTTPOk))) + .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") + .shutdown_timeout(60) // <- Set shutdown timeout to 60 seconds + .spawn(); + + let _ = addr.call_fut( + dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. +} +``` + +It is possible to use unix signals on compatible OSs. "signal" feature needs to be enabled +in *Cargo.toml* for *actix-web* dependency. + +```toml +[dependencies] +actix-web = { git = "https://github.com/actix/actix-web", features=["signal"] } +``` + +Then you can subscribe your server to unix signals. Http server handles three signals: + +* *SIGINT* - Force shutdown workers +* *SIGTERM* - Graceful shutdown workers +* *SIGQUIT* - Force shutdown workers + +```rust,ignore +# extern crate futures; +# extern crate actix; +# extern crate actix_web; +use actix_web::*; +use actix::actors::signal::{ProcessSignals, Subscribe}; + +fn main() { + let sys = actix::System::new("signals"); + + let addr = HttpServer::new(|| { + Application::new() + .resource("/", |r| r.h(httpcodes::HTTPOk))}) + .bind("127.0.0.1:8080").unwrap() + .start(); + + // Subscribe to unix signals + let signals = Arbiter::system_registry().get::(); + signals.send(Subscribe(addr.subscriber())); + + println!("Started http server: 127.0.0.1:8080"); + # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); + let _ = sys.run(); +} +``` diff --git a/src/channel.rs b/src/channel.rs index 576c043de..01c18a527 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,7 +1,6 @@ use std::rc::Rc; use std::net::SocketAddr; -use actix::dev::*; use bytes::Bytes; use futures::{Future, Poll, Async}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -71,6 +70,7 @@ impl HttpChannel pub(crate) fn new(h: Rc>, io: T, peer: Option, http2: bool) -> HttpChannel { + h.add_channel(); if http2 { HttpChannel { proto: Some(HttpProtocol::H2( @@ -89,12 +89,6 @@ impl HttpChannel } }*/ -impl Actor for HttpChannel - where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static -{ - type Context = Context; -} - impl Future for HttpChannel where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static { @@ -105,16 +99,27 @@ impl Future for HttpChannel match self.proto { Some(HttpProtocol::H1(ref mut h1)) => { match h1.poll() { - Ok(Async::Ready(h1::Http1Result::Done)) => - return Ok(Async::Ready(())), + Ok(Async::Ready(h1::Http1Result::Done)) => { + h1.settings().remove_channel(); + return Ok(Async::Ready(())) + } Ok(Async::Ready(h1::Http1Result::Switch)) => (), Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(_) => - return Err(()), + Err(_) => { + h1.settings().remove_channel(); + return Err(()) + } } } - Some(HttpProtocol::H2(ref mut h2)) => return h2.poll(), + Some(HttpProtocol::H2(ref mut h2)) => { + let result = h2.poll(); + match result { + Ok(Async::Ready(())) | Err(_) => h2.settings().remove_channel(), + _ => (), + } + return result + } None => unreachable!(), } diff --git a/src/h1.rs b/src/h1.rs index b3cfbf2bd..f49d0a2e7 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -89,6 +89,10 @@ impl Http1 keepalive_timer: None } } + pub fn settings(&self) -> &WorkerSettings { + self.settings.as_ref() + } + pub fn into_inner(self) -> (Rc>, T, Option, Bytes) { (self.settings, self.stream.into_inner(), self.addr, self.read_buf.freeze()) } diff --git a/src/h2.rs b/src/h2.rs index b3fdc5673..be8898038 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -64,6 +64,10 @@ impl Http2 } } + pub fn settings(&self) -> &WorkerSettings { + self.settings.as_ref() + } + pub fn poll(&mut self) -> Poll<(), ()> { // server if let State::Server(ref mut server) = self.state { diff --git a/src/server.rs b/src/server.rs index e57855943..b5c2e8184 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use actix::dev::*; use actix::System; -use futures::Stream; +use futures::{Future, Sink, Stream}; use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; @@ -107,6 +107,7 @@ pub struct HttpServer sockets: HashMap, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, + shutdown_timeout: u16, } unsafe impl Sync for HttpServer where H: 'static {} @@ -151,6 +152,7 @@ impl HttpServer sockets: HashMap::new(), accept: Vec::new(), exit: false, + shutdown_timeout: 30, } } @@ -210,6 +212,17 @@ impl HttpServer self } + /// Timeout for graceful workers shutdown. + /// + /// After receiving a stop signal, workers have this much time to finish serving requests. + /// Workers still alive after the timeout are force dropped. + /// + /// By default shutdown timeout sets to 30 seconds. + pub fn shutdown_timeout(mut self, sec: u16) -> Self { + self.shutdown_timeout = sec; + self + } + /// Get addresses of bound sockets. pub fn addrs(&self) -> Vec { self.sockets.keys().cloned().collect() @@ -607,19 +620,42 @@ impl Handler for HttpServer let _ = item.1.send(Command::Stop); let _ = item.0.set_readiness(mio::Ready::readable()); } - ctx.stop(); // stop workers - let dur = if msg.graceful { Some(Duration::new(30, 0)) } else { None }; + let (tx, rx) = mpsc::channel(1); + + let dur = if msg.graceful { + Some(Duration::new(u64::from(self.shutdown_timeout), 0)) + } else { + None + }; for worker in &self.workers { - worker.send(StopWorker{graceful: dur}) + let tx2 = tx.clone(); + let fut = worker.call(self, StopWorker{graceful: dur}); + ActorFuture::then(fut, move |_, slf, _| { + slf.workers.pop(); + if slf.workers.is_empty() { + let _ = tx2.send(()); + + // we need to stop system if server was spawned + if slf.exit { + Arbiter::system().send(msgs::SystemExit(0)) + } + } + fut::ok(()) + }).spawn(ctx); } - // we need to stop system if server was spawned - if self.exit { - Arbiter::system().send(msgs::SystemExit(0)) + if !self.workers.is_empty() { + Self::async_reply( + rx.into_future().map(|_| ()).map_err(|_| ()).actfuture()) + } else { + // we need to stop system if server was spawned + if self.exit { + Arbiter::system().send(msgs::SystemExit(0)) + } + Self::empty() } - Self::empty() } } diff --git a/src/worker.rs b/src/worker.rs index 3072ccac7..29158924f 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,25 +1,27 @@ use std::{net, time}; use std::rc::Rc; -use std::cell::{RefCell, RefMut}; +use std::cell::{Cell, RefCell, RefMut}; +use futures::Future; +use futures::unsync::oneshot; use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; use net2::TcpStreamExt; #[cfg(feature="tls")] -use futures::{future, Future}; +use futures::future; #[cfg(feature="tls")] use native_tls::TlsAcceptor; #[cfg(feature="tls")] use tokio_tls::TlsAcceptorExt; #[cfg(feature="alpn")] -use futures::{future, Future}; +use futures::future; #[cfg(feature="alpn")] use openssl::ssl::SslAcceptor; #[cfg(feature="alpn")] use tokio_openssl::SslAcceptorExt; -use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Response, StreamHandler}; +use actix::*; use actix::msgs::StopArbiter; use helpers; @@ -33,8 +35,10 @@ pub(crate) struct Conn { pub http2: bool, } -/// Stop worker +/// Stop worker message. Returns `true` on successful shutdown +/// and `false` if some connections still alive. #[derive(Message)] +#[rtype(bool)] pub(crate) struct StopWorker { pub graceful: Option, } @@ -45,6 +49,7 @@ pub(crate) struct WorkerSettings { keep_alive: u64, bytes: Rc, messages: Rc, + channels: Cell, } impl WorkerSettings { @@ -55,6 +60,7 @@ impl WorkerSettings { keep_alive: keep_alive.unwrap_or(0), bytes: Rc::new(helpers::SharedBytesPool::new()), messages: Rc::new(helpers::SharedMessagePool::new()), + channels: Cell::new(0), } } @@ -73,6 +79,17 @@ impl WorkerSettings { pub fn get_http_message(&self) -> helpers::SharedHttpMessage { helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages)) } + pub fn add_channel(&self) { + self.channels.set(self.channels.get()+1); + } + pub fn remove_channel(&self) { + let num = self.channels.get(); + if num > 0 { + self.channels.set(num-1); + } else { + error!("Number of removed channels is bigger than added channel. Bug in actix-web"); + } + } } /// Http worker @@ -100,6 +117,24 @@ impl Worker { helpers::update_date(); ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); } + + fn shutdown_timeout(&self, ctx: &mut Context, + tx: oneshot::Sender, dur: time::Duration) { + // sleep for 1 second and then check again + ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { + let num = slf.h.channels.get(); + if num == 0 { + let _ = tx.send(true); + Arbiter::arbiter().send(StopArbiter(0)); + } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { + slf.shutdown_timeout(ctx, tx, d); + } else { + info!("Force shutdown http worker, {} connections", num); + let _ = tx.send(false); + Arbiter::arbiter().send(StopArbiter(0)); + } + }); + } } impl Actor for Worker { @@ -133,10 +168,21 @@ impl Handler> for Worker impl Handler for Worker where H: HttpHandler + 'static, { - fn handle(&mut self, _: StopWorker, _: &mut Context) -> Response + fn handle(&mut self, msg: StopWorker, ctx: &mut Context) -> Response { - Arbiter::arbiter().send(StopArbiter(0)); - Self::empty() + let num = self.h.channels.get(); + if num == 0 { + info!("Shutting down http worker, 0 connections"); + Self::reply(true) + } else if let Some(dur) = msg.graceful { + info!("Graceful http worker shutdown, {} connections", num); + let (tx, rx) = oneshot::channel(); + self.shutdown_timeout(ctx, tx, dur); + Self::async_reply(rx.map_err(|_| ()).actfuture()) + } else { + info!("Force shutdown http worker, {} connections", num); + Self::reply(false) + } } }