From 3ed9e872ada2eb96a33bf5ef7d43053108209a86 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 5 Jan 2018 16:32:36 -0800 Subject: [PATCH] subscriber to os signals automatically --- Cargo.toml | 1 - README.md | 6 +-- examples/basics/src/main.rs | 6 +-- examples/diesel/src/main.rs | 5 -- examples/json/src/main.rs | 5 -- examples/multipart/src/main.rs | 5 -- examples/signals/Cargo.toml | 15 ------ examples/signals/README.md | 17 ------- examples/signals/src/main.rs | 45 ----------------- examples/state/src/main.rs | 5 -- examples/template_tera/src/main.rs | 5 -- examples/tls/src/main.rs | 6 +-- examples/websocket-chat/src/main.rs | 5 -- examples/websocket/src/main.rs | 5 -- guide/src/qs_2.md | 15 ++---- guide/src/qs_3_5.md | 78 ++++++++--------------------- src/server.rs | 71 +++++++++++++++++++------- 17 files changed, 81 insertions(+), 214 deletions(-) delete mode 100644 examples/signals/Cargo.toml delete mode 100644 examples/signals/README.md delete mode 100644 examples/signals/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 585a32116..f3c83ccc9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,7 +106,6 @@ members = [ "examples/json", "examples/hello-world", "examples/multipart", - "examples/signals", "examples/state", "examples/template_tera", "examples/tls", diff --git a/README.md b/README.md index f861e9601..124fb9e83 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ Actix web is a small, fast, down-to-earth, open source rust web framework. ```rust,ignore -extern crate actix; extern crate actix_web; use actix_web::*; @@ -12,14 +11,11 @@ fn index(req: HttpRequest) -> String { } fn main() { - let sys = actix::System::new("readme"); HttpServer::new( || Application::new() .resource("/{name}", |r| r.f(index))) .bind("127.0.0.1:8080").unwrap() - .start(); - - sys.run(); + .run(); } ``` diff --git a/examples/basics/src/main.rs b/examples/basics/src/main.rs index b0674c8dd..2c7b714f4 100644 --- a/examples/basics/src/main.rs +++ b/examples/basics/src/main.rs @@ -10,7 +10,7 @@ use futures::Stream; use actix_web::*; use actix_web::middleware::RequestSession; use futures::future::{FutureResult, result}; -use actix::actors::signal::{ProcessSignals, Subscribe}; + /// simple handler fn index(mut req: HttpRequest) -> Result { @@ -94,10 +94,6 @@ fn main() { .bind("0.0.0.0:8080").unwrap() .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - println!("Starting http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/diesel/src/main.rs b/examples/diesel/src/main.rs index 6c3170ede..4c4bc4cda 100644 --- a/examples/diesel/src/main.rs +++ b/examples/diesel/src/main.rs @@ -18,7 +18,6 @@ extern crate env_logger; use actix::*; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; use diesel::prelude::*; use futures::future::Future; @@ -69,10 +68,6 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(_addr.subscriber())); - println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/json/src/main.rs b/examples/json/src/main.rs index 462826393..719d74853 100644 --- a/examples/json/src/main.rs +++ b/examples/json/src/main.rs @@ -8,7 +8,6 @@ extern crate serde_json; #[macro_use] extern crate json; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; use bytes::BytesMut; use futures::{Future, Stream}; @@ -95,10 +94,6 @@ fn main() { .shutdown_timeout(1) .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/multipart/src/main.rs b/examples/multipart/src/main.rs index 965cc82c6..7da6145a9 100644 --- a/examples/multipart/src/main.rs +++ b/examples/multipart/src/main.rs @@ -6,7 +6,6 @@ extern crate futures; use actix::*; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; use futures::{Future, Stream}; use futures::future::{result, Either}; @@ -55,10 +54,6 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - println!("Starting http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/signals/Cargo.toml b/examples/signals/Cargo.toml deleted file mode 100644 index 1e4006e35..000000000 --- a/examples/signals/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "signals" -version = "0.1.0" -authors = ["Nikolay Kim "] -workspace = "../.." - -[[bin]] -name = "server" -path = "src/main.rs" - -[dependencies] -env_logger = "*" -futures = "0.1" -actix = "0.4" -actix-web = { path = "../../" } diff --git a/examples/signals/README.md b/examples/signals/README.md deleted file mode 100644 index 368182e7f..000000000 --- a/examples/signals/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Signals - -This example shows how to handle Unix signals and properly stop http server. This example does not work with Windows. - -## Usage - -```bash -cd actix-web/examples/signal -cargo run (or ``cargo watch -x run``) -# Started http server: 127.0.0.1:8080 -# CTRL+C -# INFO:actix_web::server: SIGINT received, exiting -# INFO:actix_web::worker: Shutting down http worker, 0 connections -# INFO:actix_web::worker: Shutting down http worker, 0 connections -# INFO:actix_web::worker: Shutting down http worker, 0 connections -# INFO:actix_web::worker: Shutting down http worker, 0 connections -``` \ No newline at end of file diff --git a/examples/signals/src/main.rs b/examples/signals/src/main.rs deleted file mode 100644 index 92ceb5a8a..000000000 --- a/examples/signals/src/main.rs +++ /dev/null @@ -1,45 +0,0 @@ -extern crate actix; -extern crate actix_web; -extern crate futures; -extern crate env_logger; - -use actix::*; -use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; - -struct MyWebSocket; - -impl Actor for MyWebSocket { - type Context = HttpContext; -} - -impl StreamHandler for MyWebSocket {} -impl Handler for MyWebSocket { - type Result = (); - - fn handle(&mut self, _: ws::Message, _: &mut Self::Context) { - {} - } -} - -fn main() { - ::std::env::set_var("RUST_LOG", "actix_web=info"); - let _ = env_logger::init(); - let sys = actix::System::new("signals-example"); - - let addr = HttpServer::new(|| { - Application::new() - // enable logger - .middleware(middleware::Logger::default()) - .resource("/ws/", |r| r.f(|req| ws::start(req, MyWebSocket))) - .resource("/", |r| r.h(httpcodes::HTTPOk))}) - .bind("127.0.0.1:8080").unwrap() - .start(); - - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - - println!("Started http server: 127.0.0.1:8080"); - let _ = sys.run(); -} diff --git a/examples/state/src/main.rs b/examples/state/src/main.rs index e60e7d706..6c247329c 100644 --- a/examples/state/src/main.rs +++ b/examples/state/src/main.rs @@ -11,7 +11,6 @@ use std::cell::Cell; use actix::*; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; /// Application state struct AppState { @@ -74,10 +73,6 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/template_tera/src/main.rs b/examples/template_tera/src/main.rs index 28706217f..17a14c70c 100644 --- a/examples/template_tera/src/main.rs +++ b/examples/template_tera/src/main.rs @@ -5,7 +5,6 @@ extern crate env_logger; extern crate tera; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; struct State { @@ -43,10 +42,6 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/tls/src/main.rs b/examples/tls/src/main.rs index ad0d4b23b..15cfcc666 100644 --- a/examples/tls/src/main.rs +++ b/examples/tls/src/main.rs @@ -7,7 +7,7 @@ use std::fs::File; use std::io::Read; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; + /// somple handle fn index(req: HttpRequest) -> Result { @@ -46,10 +46,6 @@ fn main() { .bind("127.0.0.1:8443").unwrap() .start_ssl(&pkcs12).unwrap(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - println!("Started http server: 127.0.0.1:8443"); let _ = sys.run(); } diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 39e7978f5..76cc29e99 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -18,7 +18,6 @@ use std::time::Instant; use actix::*; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; mod codec; mod server; @@ -213,10 +212,6 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/examples/websocket/src/main.rs b/examples/websocket/src/main.rs index b7433203f..ba88cf211 100644 --- a/examples/websocket/src/main.rs +++ b/examples/websocket/src/main.rs @@ -10,7 +10,6 @@ extern crate env_logger; use actix::*; use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; /// do websocket handshake and start `MyWebSocket` actor fn ws_index(r: HttpRequest) -> Result { @@ -71,10 +70,6 @@ fn main() { .bind("127.0.0.1:8080").unwrap() .start(); - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(_addr.subscriber())); - println!("Started http server: 127.0.0.1:8080"); let _ = sys.run(); } diff --git a/guide/src/qs_2.md b/guide/src/qs_2.md index c42eaac1c..3c347ce65 100644 --- a/guide/src/qs_2.md +++ b/guide/src/qs_2.md @@ -61,7 +61,7 @@ connections. Server accepts function that should return `HttpHandler` instance: || Application::new() .resource("/", |r| r.f(index))) .bind("127.0.0.1:8088")? - .start(); + .run(); ``` That's it. Now, compile and run the program with cargo run. @@ -69,9 +69,8 @@ Head over to ``http://localhost:8088/`` to see the results. Here is full source of main.rs file: -```rust -extern crate actix; -extern crate actix_web; +```rust,ignore +# extern crate actix_web; use actix_web::*; fn index(req: HttpRequest) -> &'static str { @@ -79,17 +78,11 @@ fn index(req: HttpRequest) -> &'static str { } fn main() { - let sys = actix::System::new("example"); - HttpServer::new( || Application::new() .resource("/", |r| r.f(index))) .bind("127.0.0.1:8088").expect("Can not bind to 127.0.0.1:8088") - .start(); - - println!("Started http server: 127.0.0.1:8088"); -# actix::Arbiter::system().send(actix::msgs::SystemExit(0)); - let _ = sys.run(); + .run(); } ``` diff --git a/guide/src/qs_3_5.md b/guide/src/qs_3_5.md index b9f002c87..580f029d9 100644 --- a/guide/src/qs_3_5.md +++ b/guide/src/qs_3_5.md @@ -47,15 +47,27 @@ address of the started http server. Actix http server accept several messages: # extern crate actix_web; # use futures::Future; use actix_web::*; +use std::thread; +use std::sync::mpsc; fn main() { - let addr = HttpServer::new( - || Application::new() - .resource("/", |r| r.h(httpcodes::HTTPOk))) - .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") - .spawn(); + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || { + let sys = actix::System::new("http-server"); + let addr = HttpServer::new( + || Application::new() + .resource("/", |r| r.h(httpcodes::HTTPOk))) + .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") + .shutdown_timeout(60) // <- Set shutdown timeout to 60 seconds + .start(); + let _ = tx.send(addr); + let _ = sys.run(); + }); - let _ = addr.call_fut(dev::StopServer{graceful: true}).wait(); // <- Send `StopServer` message to server. + let addr = rx.recv().unwrap(); + let _ = addr.call_fut( + dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. } ``` @@ -173,59 +185,13 @@ timeout are force dropped. By default shutdown timeout sets to 30 seconds. You can change this parameter with `HttpServer::shutdown_timeout()` method. You can send stop message to server with server address and specify if you what -graceful shutdown or not. `start()` or `spawn()` methods return address of the server. +graceful shutdown or not. `start()` methods return address of the server. -```rust -# extern crate futures; -# extern crate actix; -# extern crate actix_web; -# use futures::Future; -use actix_web::*; - -fn main() { - let addr = HttpServer::new( - || Application::new() - .resource("/", |r| r.h(httpcodes::HTTPOk))) - .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") - .shutdown_timeout(60) // <- Set shutdown timeout to 60 seconds - .spawn(); - - let _ = addr.call_fut( - dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. -} -``` - -It is possible to use signals. *CTRL-C* is available on all OSs, other signals are -available on unix systems. - -Then you can subscribe your server to unix signals. Http server handles three signals: +Http server handles several OS signals. *CTRL-C* is available on all OSs, +other signals are available on unix systems. * *SIGINT* - Force shutdown workers * *SIGTERM* - Graceful shutdown workers * *SIGQUIT* - Force shutdown workers -```rust,ignore -# extern crate futures; -# extern crate actix; -# extern crate actix_web; -use actix_web::*; -use actix::actors::signal::{ProcessSignals, Subscribe}; - -fn main() { - let sys = actix::System::new("signals"); - - let addr = HttpServer::new(|| { - Application::new() - .resource("/", |r| r.h(httpcodes::HTTPOk))}) - .bind("127.0.0.1:8080").unwrap() - .start(); - - // Subscribe to unix signals - let signals = actix::Arbiter::system_registry().get::(); - signals.send(Subscribe(addr.subscriber())); - - println!("Started http server: 127.0.0.1:8080"); - # actix::Arbiter::system().send(actix::msgs::SystemExit(0)); - let _ = sys.run(); -} -``` +It is possible to disable signals handling with `HttpServer::disable_signals()` method. diff --git a/src/server.rs b/src/server.rs index 45d158c51..a38bb2b1d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,11 +6,11 @@ use std::marker::PhantomData; use std::collections::HashMap; use actix::prelude::*; +use actix::actors::signal; use futures::{Future, Sink, Stream}; use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_core::net::TcpStream; -use actix::actors::signal; use mio; use num_cpus; use net2::TcpBuilder; @@ -105,6 +105,8 @@ pub struct HttpServer accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, shutdown_timeout: u16, + signals: Option>, + no_signals: bool, } unsafe impl Sync for HttpServer where H: HttpHandler + 'static {} @@ -150,6 +152,8 @@ impl HttpServer accept: Vec::new(), exit: false, shutdown_timeout: 30, + signals: None, + no_signals: false, } } @@ -208,6 +212,18 @@ impl HttpServer self } + /// Set alternative address for `ProcessSignals` actor. + pub fn signals(mut self, addr: SyncAddress) -> Self { + self.signals = Some(addr); + self + } + + /// Disable signal handling + pub fn disable_signals(mut self) -> Self { + self.no_signals = true; + self + } + /// Timeout for graceful workers shutdown. /// /// After receiving a stop signal, workers have this much time to finish serving requests. @@ -276,6 +292,18 @@ impl HttpServer info!("Starting {} http workers", self.threads); workers } + + // subscribe to os signals + fn subscribe_to_signals(&self, addr: &SyncAddress>) { + if self.no_signals { + let msg = signal::Subscribe(addr.subscriber()); + if let Some(ref signals) = self.signals { + signals.send(msg); + } else { + Arbiter::system_registry().get::().send(msg); + } + } + } } impl HttpServer @@ -327,18 +355,21 @@ impl HttpServer } // start http server actor - HttpServer::create(|_| {self}) + HttpServer::create(|ctx| { + self.subscribe_to_signals(&ctx.address()); + self + }) } } /// Spawn new thread and start listening for incomming connections. /// /// This method spawns new thread and starts new actix system. Other than that it is - /// similar to `start()` method. This method does not block. + /// similar to `start()` method. This method blocks. /// /// This methods panics if no socket addresses get bound. /// - /// ```rust + /// ```rust,ignore /// # extern crate futures; /// # extern crate actix; /// # extern crate actix_web; @@ -346,27 +377,22 @@ impl HttpServer /// use actix_web::*; /// /// fn main() { - /// let addr = HttpServer::new( + /// HttpServer::new( /// || Application::new() /// .resource("/", |r| r.h(httpcodes::HTTPOk))) /// .bind("127.0.0.1:0").expect("Can not bind to 127.0.0.1:0") - /// .spawn(); - /// - /// let _ = addr.call_fut( - /// dev::StopServer{graceful:true}).wait(); // <- Send `StopServer` message to server. + /// .run(); /// } /// ``` - pub fn spawn(mut self) -> SyncAddress { + pub fn run(mut self) { self.exit = true; + self.no_signals = false; - let (tx, rx) = sync_mpsc::channel(); - thread::spawn(move || { + let _ = thread::spawn(move || { let sys = System::new("http-server"); - let addr = self.start(); - let _ = tx.send(addr); - sys.run(); - }); - rx.recv().unwrap() + self.start(); + let _ = sys.run(); + }).join(); } } @@ -401,7 +427,10 @@ impl HttpServer, net::SocketAddr, H, } // start http server actor - Ok(HttpServer::create(|_| {self})) + Ok(HttpServer::create(|ctx| { + self.subscribe_to_signals(&ctx.address()); + self + })) } } } @@ -441,7 +470,10 @@ impl HttpServer, net::SocketAddr, H, } // start http server actor - Ok(HttpServer::create(|_| {self})) + Ok(HttpServer::create(|ctx| { + self.subscribe_to_signals(&ctx.address()); + self + })) } } } @@ -485,6 +517,7 @@ impl HttpServer, A, H, U> HttpServer::create(move |ctx| { ctx.add_stream(stream.map( move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); + self.subscribe_to_signals(&ctx.address()); self }) }