1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-23 08:36:34 +00:00

handle panics in worker threads

This commit is contained in:
Nikolay Kim 2018-03-07 21:10:53 -08:00
parent 824244622f
commit 6c0fb3a7d2
3 changed files with 133 additions and 32 deletions

View file

@ -21,6 +21,8 @@
* Allow to use std::net::TcpListener for HttpServer * Allow to use std::net::TcpListener for HttpServer
* Handle panics in worker threads
## 0.4.4 (2018-03-04) ## 0.4.4 (2018-03-04)

View file

@ -36,7 +36,8 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
host: Option<String>, host: Option<String>,
keep_alive: Option<u64>, keep_alive: Option<u64>,
factory: Arc<Fn() -> Vec<H> + Send + Sync>, factory: Arc<Fn() -> Vec<H> + Send + Sync>,
workers: Vec<Addr<Syn, Worker<H::Handler>>>, #[cfg_attr(feature="cargo-clippy", allow(type_complexity))]
workers: Vec<(usize, Addr<Syn, Worker<H::Handler>>)>,
sockets: HashMap<net::SocketAddr, net::TcpListener>, sockets: HashMap<net::SocketAddr, net::TcpListener>,
accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>, accept: Vec<(mio::SetReadiness, sync_mpsc::Sender<Command>)>,
exit: bool, exit: bool,
@ -48,6 +49,15 @@ pub struct HttpServer<H> where H: IntoHttpHandler + 'static
unsafe impl<H> Sync for HttpServer<H> where H: IntoHttpHandler {} unsafe impl<H> Sync for HttpServer<H> where H: IntoHttpHandler {}
unsafe impl<H> Send for HttpServer<H> where H: IntoHttpHandler {} unsafe impl<H> Send for HttpServer<H> where H: IntoHttpHandler {}
#[derive(Clone)]
struct Info {
addr: net::SocketAddr,
handler: StreamHandlerType,
}
enum ServerCommand {
WorkerDied(usize, Info),
}
impl<H> Actor for HttpServer<H> where H: IntoHttpHandler impl<H> Actor for HttpServer<H> where H: IntoHttpHandler
{ {
@ -210,11 +220,11 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
} }
fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType)
-> Vec<mpsc::UnboundedSender<Conn<net::TcpStream>>> -> Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>
{ {
// start workers // start workers
let mut workers = Vec::new(); let mut workers = Vec::new();
for _ in 0..self.threads { for idx in 0..self.threads {
let s = settings.clone(); let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>(); let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();
@ -228,8 +238,8 @@ impl<H> HttpServer<H> where H: IntoHttpHandler + 'static
ctx.add_message_stream(rx); ctx.add_message_stream(rx);
Worker::new(apps, h, ka) Worker::new(apps, h, ka)
}); });
workers.push(tx); workers.push((idx, tx));
self.workers.push(addr); self.workers.push((idx, addr));
} }
info!("Starting {} http workers", self.threads); info!("Starting {} http workers", self.threads);
workers workers
@ -283,21 +293,28 @@ impl<H: IntoHttpHandler> HttpServer<H>
if self.sockets.is_empty() { if self.sockets.is_empty() {
panic!("HttpServer::bind() has to be called before start()"); panic!("HttpServer::bind() has to be called before start()");
} else { } else {
let (tx, rx) = mpsc::unbounded();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect(); self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal); let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal};
// start acceptors threads // start acceptors threads
for (addr, sock) in addrs { for (addr, sock) in addrs {
info!("Starting server on http://{}", addr); info!("Starting server on http://{}", addr);
self.accept.push( self.accept.push(
start_accept_thread(sock, addr, self.backlog, workers.clone())); start_accept_thread(
sock, addr, self.backlog,
tx.clone(), info.clone(), workers.clone()));
} }
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::start(self); let addr: Addr<Syn, _> = Actor::create(move |ctx| {
ctx.add_stream(rx);
self
});
signals.map(|signals| signals.do_send( signals.map(|signals| signals.do_send(
signal::Subscribe(addr.clone().recipient()))); signal::Subscribe(addr.clone().recipient())));
addr addr
@ -359,7 +376,10 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::start(self); let addr: Addr<Syn, _> = Actor::create(|ctx| {
ctx.add_stream(rx);
self
});
signals.map(|signals| signals.do_send( signals.map(|signals| signals.do_send(
signal::Subscribe(addr.clone().recipient()))); signal::Subscribe(addr.clone().recipient())));
Ok(addr) Ok(addr)
@ -403,7 +423,10 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start http server actor // start http server actor
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = Actor::start(self); let addr: Addr<Syn, _> = Actor::create(|ctx| {
ctx.add_stream(rx);
self
});
signals.map(|signals| signals.do_send( signals.map(|signals| signals.do_send(
signal::Subscribe(addr.clone().recipient()))); signal::Subscribe(addr.clone().recipient())));
Ok(addr) Ok(addr)
@ -421,17 +444,22 @@ impl<H: IntoHttpHandler> HttpServer<H>
T: AsyncRead + AsyncWrite + 'static, T: AsyncRead + AsyncWrite + 'static,
A: 'static A: 'static
{ {
let (tx, rx) = mpsc::unbounded();
if !self.sockets.is_empty() { if !self.sockets.is_empty() {
let addrs: Vec<(net::SocketAddr, net::TcpListener)> = let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect(); self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false); let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal); let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
let info = Info{addr: addrs[0].0, handler: StreamHandlerType::Normal};
// start acceptors threads // start acceptors threads
for (addr, sock) in addrs { for (addr, sock) in addrs {
info!("Starting server on http://{}", addr); info!("Starting server on http://{}", addr);
self.accept.push( self.accept.push(
start_accept_thread(sock, addr, self.backlog, workers.clone())); start_accept_thread(
sock, addr, self.backlog,
tx.clone(), info.clone(), workers.clone()));
} }
} }
@ -445,6 +473,7 @@ impl<H: IntoHttpHandler> HttpServer<H>
// start server // start server
let signals = self.subscribe_to_signals(); let signals = self.subscribe_to_signals();
let addr: Addr<Syn, _> = HttpServer::create(move |ctx| { let addr: Addr<Syn, _> = HttpServer::create(move |ctx| {
ctx.add_stream(rx);
ctx.add_message_stream( ctx.add_message_stream(
stream stream
.map_err(|_| ()) .map_err(|_| ())
@ -486,6 +515,61 @@ impl<H: IntoHttpHandler> Handler<signal::Signal> for HttpServer<H>
} }
} }
/// Commands from accept threads
impl<H: IntoHttpHandler> StreamHandler<ServerCommand, ()> for HttpServer<H>
{
fn finished(&mut self, _: &mut Context<Self>) {}
fn handle(&mut self, msg: ServerCommand, _: &mut Context<Self>) {
match msg {
ServerCommand::WorkerDied(idx, info) => {
let mut found = false;
for i in 0..self.workers.len() {
if self.workers[i].0 == idx {
self.workers.swap_remove(i);
found = true;
break
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let (tx, rx) = mpsc::unbounded::<Conn<net::TcpStream>>();
let mut new_idx = self.workers.len();
'found: loop {
for i in 0..self.workers.len() {
if self.workers[i].0 == new_idx {
new_idx += 1;
continue 'found
}
}
break
}
let h = info.handler;
let ka = self.keep_alive;
let factory = Arc::clone(&self.factory);
let settings = ServerSettings::new(Some(info.addr), &self.host, false);
let addr = Arbiter::start(move |ctx: &mut Context<_>| {
let apps: Vec<_> = (*factory)()
.into_iter()
.map(|h| h.into_handler(settings.clone())).collect();
ctx.add_message_stream(rx);
Worker::new(apps, h, ka)
});
for item in &self.accept {
let _ = item.1.send(Command::Worker(new_idx, tx.clone()));
let _ = item.0.set_readiness(mio::Ready::readable());
}
self.workers.push((new_idx, addr));
}
},
}
}
}
impl<T, H> Handler<Conn<T>> for HttpServer<H> impl<T, H> Handler<Conn<T>> for HttpServer<H>
where T: IoStream, where T: IoStream,
H: IntoHttpHandler, H: IntoHttpHandler,
@ -545,7 +629,7 @@ impl<H: IntoHttpHandler> Handler<StopServer> for HttpServer<H>
}; };
for worker in &self.workers { for worker in &self.workers {
let tx2 = tx.clone(); let tx2 = tx.clone();
let fut = worker.send(StopWorker{graceful: dur}).into_actor(self); let fut = worker.1.send(StopWorker{graceful: dur}).into_actor(self);
ActorFuture::then(fut, move |_, slf, _| { ActorFuture::then(fut, move |_, slf, _| {
slf.workers.pop(); slf.workers.pop();
if slf.workers.is_empty() { if slf.workers.is_empty() {
@ -577,16 +661,20 @@ enum Command {
Pause, Pause,
Resume, Resume,
Stop, Stop,
Worker(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>),
} }
fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, fn start_accept_thread(
workers: Vec<mpsc::UnboundedSender<Conn<net::TcpStream>>>) sock: net::TcpListener, addr: net::SocketAddr, backlog: i32,
-> (mio::SetReadiness, sync_mpsc::Sender<Command>) srv: mpsc::UnboundedSender<ServerCommand>, info: Info,
mut workers: Vec<(usize, mpsc::UnboundedSender<Conn<net::TcpStream>>)>)
-> (mio::SetReadiness, sync_mpsc::Sender<Command>)
{ {
let (tx, rx) = sync_mpsc::channel(); let (tx, rx) = sync_mpsc::channel();
let (reg, readiness) = mio::Registration::new2(); let (reg, readiness) = mio::Registration::new2();
// start accept thread // start accept thread
#[cfg_attr(feature="cargo-clippy", allow(cyclomatic_complexity))]
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || { let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
const SRV: mio::Token = mio::Token(0); const SRV: mio::Token = mio::Token(0);
const CMD: mio::Token = mio::Token(1); const CMD: mio::Token = mio::Token(1);
@ -629,25 +717,35 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
for event in events.iter() { for event in events.iter() {
match event.token() { match event.token() {
SRV => { SRV => if let Some(ref server) = server {
if let Some(ref server) = server { loop {
loop { match server.accept_std() {
match server.accept_std() { Ok((sock, addr)) => {
Ok((sock, addr)) => { let mut msg = Conn{
let msg = Conn{ io: sock, peer: Some(addr), http2: false};
io: sock, peer: Some(addr), http2: false}; while !workers.is_empty() {
workers[next].unbounded_send(msg) match workers[next].1.unbounded_send(msg) {
.expect("worker thread died"); Ok(_) => (),
next = (next + 1) % workers.len(); Err(err) => {
}, let _ = srv.unbounded_send(
Err(err) => { ServerCommand::WorkerDied(
if err.kind() != io::ErrorKind::WouldBlock { workers[next].0, info.clone()));
error!("Error accepting connection: {:?}", err); msg = err.into_inner();
workers.swap_remove(next);
continue
}
} }
// sleep after error next = (next + 1) % workers.len();
thread::sleep(sleep);
break break
} }
},
Err(err) => {
if err.kind() != io::ErrorKind::WouldBlock {
error!("Error accepting connection: {:?}", err);
}
// sleep after error
thread::sleep(sleep);
break
} }
} }
} }
@ -686,6 +784,9 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i
} }
return return
}, },
Command::Worker(idx, addr) => {
workers.push((idx, addr));
},
}, },
Err(err) => match err { Err(err) => match err {
sync_mpsc::TryRecvError::Empty => (), sync_mpsc::TryRecvError::Empty => (),

View file

@ -322,10 +322,8 @@ fn test_body_streaming_implicit() {
assert_eq!(bytes, Bytes::from_static(STR.as_ref())); assert_eq!(bytes, Bytes::from_static(STR.as_ref()));
} }
extern crate env_logger;
#[test] #[test]
fn test_client_cookie_handling() { fn test_client_cookie_handling() {
env_logger::init();
use actix_web::header::Cookie; use actix_web::header::Cookie;
fn err() -> Error { fn err() -> Error {
use std::io::{ErrorKind, Error as IoError}; use std::io::{ErrorKind, Error as IoError};