diff --git a/CHANGES.md b/CHANGES.md index 542d02516..6bce735c3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,6 +21,8 @@ * Allow to use std::net::TcpListener for HttpServer +* Handle panics in worker threads + ## 0.4.4 (2018-03-04) diff --git a/src/server/srv.rs b/src/server/srv.rs index 82080403e..2b3cbae62 100644 --- a/src/server/srv.rs +++ b/src/server/srv.rs @@ -36,7 +36,8 @@ pub struct HttpServer where H: IntoHttpHandler + 'static host: Option, keep_alive: Option, factory: Arc Vec + Send + Sync>, - workers: Vec>>, + #[cfg_attr(feature="cargo-clippy", allow(type_complexity))] + workers: Vec<(usize, Addr>)>, sockets: HashMap, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender)>, exit: bool, @@ -48,6 +49,15 @@ pub struct HttpServer where H: IntoHttpHandler + 'static unsafe impl Sync for HttpServer where H: IntoHttpHandler {} unsafe impl Send for HttpServer where H: IntoHttpHandler {} +#[derive(Clone)] +struct Info { + addr: net::SocketAddr, + handler: StreamHandlerType, +} + +enum ServerCommand { + WorkerDied(usize, Info), +} impl Actor for HttpServer where H: IntoHttpHandler { @@ -210,11 +220,11 @@ impl HttpServer where H: IntoHttpHandler + 'static } fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) - -> Vec>> + -> Vec<(usize, mpsc::UnboundedSender>)> { // start workers let mut workers = Vec::new(); - for _ in 0..self.threads { + for idx in 0..self.threads { let s = settings.clone(); let (tx, rx) = mpsc::unbounded::>(); @@ -228,8 +238,8 @@ impl HttpServer where H: IntoHttpHandler + 'static ctx.add_message_stream(rx); Worker::new(apps, h, ka) }); - workers.push(tx); - self.workers.push(addr); + workers.push((idx, tx)); + self.workers.push((idx, addr)); } info!("Starting {} http workers", self.threads); workers @@ -283,21 +293,28 @@ impl HttpServer if self.sockets.is_empty() { panic!("HttpServer::bind() has to be called before start()"); } else { + let (tx, rx) = mpsc::unbounded(); let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let workers = self.start_workers(&settings, &StreamHandlerType::Normal); + let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal}; // start acceptors threads for (addr, sock) in addrs { info!("Starting server on http://{}", addr); self.accept.push( - start_accept_thread(sock, addr, self.backlog, workers.clone())); + start_accept_thread( + sock, addr, self.backlog, + tx.clone(), info.clone(), workers.clone())); } // start http server actor let signals = self.subscribe_to_signals(); - let addr: Addr = Actor::start(self); + let addr: Addr = Actor::create(move |ctx| { + ctx.add_stream(rx); + self + }); signals.map(|signals| signals.do_send( signal::Subscribe(addr.clone().recipient()))); addr @@ -359,7 +376,10 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: Addr = Actor::start(self); + let addr: Addr = Actor::create(|ctx| { + ctx.add_stream(rx); + self + }); signals.map(|signals| signals.do_send( signal::Subscribe(addr.clone().recipient()))); Ok(addr) @@ -403,7 +423,10 @@ impl HttpServer // start http server actor let signals = self.subscribe_to_signals(); - let addr: Addr = Actor::start(self); + let addr: Addr = Actor::create(|ctx| { + ctx.add_stream(rx); + self + }); signals.map(|signals| signals.do_send( signal::Subscribe(addr.clone().recipient()))); Ok(addr) @@ -421,17 +444,22 @@ impl HttpServer T: AsyncRead + AsyncWrite + 'static, A: 'static { + let (tx, rx) = mpsc::unbounded(); + if !self.sockets.is_empty() { let addrs: Vec<(net::SocketAddr, net::TcpListener)> = self.sockets.drain().collect(); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let workers = self.start_workers(&settings, &StreamHandlerType::Normal); + let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal}; // start acceptors threads for (addr, sock) in addrs { info!("Starting server on http://{}", addr); self.accept.push( - start_accept_thread(sock, addr, self.backlog, workers.clone())); + start_accept_thread( + sock, addr, self.backlog, + tx.clone(), info.clone(), workers.clone())); } } @@ -445,6 +473,7 @@ impl HttpServer // start server let signals = self.subscribe_to_signals(); let addr: Addr = HttpServer::create(move |ctx| { + ctx.add_stream(rx); ctx.add_message_stream( stream .map_err(|_| ()) @@ -486,6 +515,61 @@ impl Handler for HttpServer } } +/// Commands from accept threads +impl StreamHandler for HttpServer +{ + fn finished(&mut self, _: &mut Context) {} + fn handle(&mut self, msg: ServerCommand, _: &mut Context) { + match msg { + ServerCommand::WorkerDied(idx, info) => { + let mut found = false; + for i in 0..self.workers.len() { + if self.workers[i].0 == idx { + self.workers.swap_remove(i); + found = true; + break + } + } + + if found { + error!("Worker has died {:?}, restarting", idx); + let (tx, rx) = mpsc::unbounded::>(); + + let mut new_idx = self.workers.len(); + 'found: loop { + for i in 0..self.workers.len() { + if self.workers[i].0 == new_idx { + new_idx += 1; + continue 'found + } + } + break + } + + let h = info.handler; + let ka = self.keep_alive; + let factory = Arc::clone(&self.factory); + let settings = ServerSettings::new(Some(info.addr), &self.host, false); + + let addr = Arbiter::start(move |ctx: &mut Context<_>| { + let apps: Vec<_> = (*factory)() + .into_iter() + .map(|h| h.into_handler(settings.clone())).collect(); + ctx.add_message_stream(rx); + Worker::new(apps, h, ka) + }); + for item in &self.accept { + let _ = item.1.send(Command::Worker(new_idx, tx.clone())); + let _ = item.0.set_readiness(mio::Ready::readable()); + } + + self.workers.push((new_idx, addr)); + } + }, + } + } +} + impl Handler> for HttpServer where T: IoStream, H: IntoHttpHandler, @@ -545,7 +629,7 @@ impl Handler for HttpServer }; for worker in &self.workers { let tx2 = tx.clone(); - let fut = worker.send(StopWorker{graceful: dur}).into_actor(self); + let fut = worker.1.send(StopWorker{graceful: dur}).into_actor(self); ActorFuture::then(fut, move |_, slf, _| { slf.workers.pop(); if slf.workers.is_empty() { @@ -577,16 +661,20 @@ enum Command { Pause, Resume, Stop, + Worker(usize, mpsc::UnboundedSender>), } -fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, - workers: Vec>>) - -> (mio::SetReadiness, sync_mpsc::Sender) +fn start_accept_thread( + sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, + srv: mpsc::UnboundedSender, info: Info, + mut workers: Vec<(usize, mpsc::UnboundedSender>)>) + -> (mio::SetReadiness, sync_mpsc::Sender) { let (tx, rx) = sync_mpsc::channel(); let (reg, readiness) = mio::Registration::new2(); // start accept thread + #[cfg_attr(feature="cargo-clippy", allow(cyclomatic_complexity))] let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { const SRV: mio::Token = mio::Token(0); const CMD: mio::Token = mio::Token(1); @@ -629,25 +717,35 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i for event in events.iter() { match event.token() { - SRV => { - if let Some(ref server) = server { - loop { - match server.accept_std() { - Ok((sock, addr)) => { - let msg = Conn{ - io: sock, peer: Some(addr), http2: false}; - workers[next].unbounded_send(msg) - .expect("worker thread died"); - next = (next + 1) % workers.len(); - }, - Err(err) => { - if err.kind() != io::ErrorKind::WouldBlock { - error!("Error accepting connection: {:?}", err); + SRV => if let Some(ref server) = server { + loop { + match server.accept_std() { + Ok((sock, addr)) => { + let mut msg = Conn{ + io: sock, peer: Some(addr), http2: false}; + while !workers.is_empty() { + match workers[next].1.unbounded_send(msg) { + Ok(_) => (), + Err(err) => { + let _ = srv.unbounded_send( + ServerCommand::WorkerDied( + workers[next].0, info.clone())); + msg = err.into_inner(); + workers.swap_remove(next); + continue + } } - // sleep after error - thread::sleep(sleep); + next = (next + 1) % workers.len(); break } + }, + Err(err) => { + if err.kind() != io::ErrorKind::WouldBlock { + error!("Error accepting connection: {:?}", err); + } + // sleep after error + thread::sleep(sleep); + break } } } @@ -686,6 +784,9 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i } return }, + Command::Worker(idx, addr) => { + workers.push((idx, addr)); + }, }, Err(err) => match err { sync_mpsc::TryRecvError::Empty => (), diff --git a/tests/test_client.rs b/tests/test_client.rs index af4a4e50c..ef34d39a5 100644 --- a/tests/test_client.rs +++ b/tests/test_client.rs @@ -322,10 +322,8 @@ fn test_body_streaming_implicit() { assert_eq!(bytes, Bytes::from_static(STR.as_ref())); } -extern crate env_logger; #[test] fn test_client_cookie_handling() { - env_logger::init(); use actix_web::header::Cookie; fn err() -> Error { use std::io::{ErrorKind, Error as IoError};