1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-04 14:28:50 +00:00

fix worker shutdown

This commit is contained in:
Nikolay Kim 2018-08-20 22:21:23 -07:00
parent ac70f06c4f
commit 2cbcc21168
6 changed files with 83 additions and 50 deletions

View file

@ -1,4 +1,5 @@
//! simple composite service //! simple composite service
//! build: cargo run --example basic --features "ssl"
//! to test: curl https://127.0.0.1:8443/ -k //! to test: curl https://127.0.0.1:8443/ -k
extern crate actix; extern crate actix;
extern crate actix_net; extern crate actix_net;

View file

@ -432,10 +432,6 @@ impl Connections {
self.0.available() self.0.available()
} }
pub(crate) fn num_connections(&self) -> usize {
self.0.conn.load(Ordering::Relaxed)
}
/// Report opened connection /// Report opened connection
pub fn connection(&self) -> ConnectionTag { pub fn connection(&self) -> ConnectionTag {
ConnectionTag::new(self.0.clone()) ConnectionTag::new(self.0.clone())

View file

@ -1,3 +1,7 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::{fmt, io, net}; use std::{fmt, io, net};
use futures::{future, Future, Poll}; use futures::{future, Future, Poll};
@ -16,6 +20,7 @@ pub(crate) type BoxedServerService = Box<
pub(crate) struct ServerService<T> { pub(crate) struct ServerService<T> {
inner: T, inner: T,
counter: Arc<AtomicUsize>,
} }
impl<T> Service for ServerService<T> impl<T> Service for ServerService<T>
@ -39,7 +44,11 @@ where
}); });
if let Ok(stream) = stream { if let Ok(stream) = stream {
Box::new(self.inner.call(stream).map_err(|_| ())) let counter = self.counter.clone();
let _ = counter.fetch_add(1, Ordering::Relaxed);
Box::new(self.inner.call(stream).map_err(|_| ()).map(move |_| {
let _ = counter.fetch_sub(1, Ordering::Relaxed);
}))
} else { } else {
Box::new(future::err(())) Box::new(future::err(()))
} }
@ -48,6 +57,7 @@ where
pub(crate) struct ServerNewService<T> { pub(crate) struct ServerNewService<T> {
inner: T, inner: T,
counter: Arc<AtomicUsize>,
} }
impl<T> ServerNewService<T> impl<T> ServerNewService<T>
@ -61,11 +71,16 @@ where
T::Error: fmt::Display, T::Error: fmt::Display,
{ {
pub(crate) fn create(inner: T) -> Box<ServerServiceFactory + Send> { pub(crate) fn create(inner: T) -> Box<ServerServiceFactory + Send> {
Box::new(Self { inner }) Box::new(Self {
inner,
counter: Arc::new(AtomicUsize::new(0)),
})
} }
} }
pub trait ServerServiceFactory { pub trait ServerServiceFactory {
fn counter(&self) -> Arc<AtomicUsize>;
fn clone_factory(&self) -> Box<ServerServiceFactory + Send>; fn clone_factory(&self) -> Box<ServerServiceFactory + Send>;
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>; fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>>;
@ -81,21 +96,31 @@ where
T::Future: 'static, T::Future: 'static,
T::Error: fmt::Display, T::Error: fmt::Display,
{ {
fn counter(&self) -> Arc<AtomicUsize> {
self.counter.clone()
}
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<ServerServiceFactory + Send> {
Box::new(Self { Box::new(Self {
inner: self.inner.clone(), inner: self.inner.clone(),
counter: Arc::new(AtomicUsize::new(0)),
}) })
} }
fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> { fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
Box::new(self.inner.new_service().map_err(|_| ()).map(|inner| { let counter = self.counter.clone();
let service: BoxedServerService = Box::new(ServerService { inner }); Box::new(self.inner.new_service().map_err(|_| ()).map(move |inner| {
let service: BoxedServerService = Box::new(ServerService { inner, counter });
service service
})) }))
} }
} }
impl ServerServiceFactory for Box<ServerServiceFactory> { impl ServerServiceFactory for Box<ServerServiceFactory> {
fn counter(&self) -> Arc<AtomicUsize> {
self.as_ref().counter()
}
fn clone_factory(&self) -> Box<ServerServiceFactory + Send> { fn clone_factory(&self) -> Box<ServerServiceFactory + Send> {
self.as_ref().clone_factory() self.as_ref().clone_factory()
} }

View file

@ -583,7 +583,7 @@ where
type Future = MapErrFuture<A, F, E>; type Future = MapErrFuture<A, F, E>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.a.poll_ready().map_err(|e| (self.f)(e)) self.a.poll_ready().map_err(&self.f)
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
@ -619,7 +619,7 @@ where
type Error = E; type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(|e| (self.f)(e)) self.fut.poll().map_err(&self.f)
} }
} }
@ -795,6 +795,6 @@ where
type Error = E; type Error = E;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(|e| (self.f)(e)) self.fut.poll().map_err(&self.f)
} }
} }

View file

@ -6,7 +6,6 @@ use futures::{future, future::FutureResult, Async, Future, Poll};
use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream};
use tokio_tcp::TcpStream;
use tower_service::{NewService, Service}; use tower_service::{NewService, Service};
use {IntoNewService, IoStream}; use {IntoNewService, IoStream};

View file

@ -1,8 +1,10 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{net, time}; use std::{net, time};
use futures::future;
use futures::sync::mpsc::{SendError, UnboundedSender}; use futures::sync::mpsc::{SendError, UnboundedSender};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{future, Future};
use actix::msgs::StopArbiter; use actix::msgs::StopArbiter;
use actix::{ use actix::{
@ -59,6 +61,7 @@ impl Message for StopWorker {
pub(crate) struct Worker { pub(crate) struct Worker {
// conns: Connections, // conns: Connections,
services: Vec<BoxedServerService>, services: Vec<BoxedServerService>,
counters: Vec<Arc<AtomicUsize>>,
} }
impl Actor for Worker { impl Actor for Worker {
@ -71,6 +74,7 @@ impl Worker {
) -> Self { ) -> Self {
let wrk = Worker { let wrk = Worker {
services: Vec::new(), services: Vec::new(),
counters: services.iter().map(|i| i.counter()).collect(),
}; };
ctx.wait( ctx.wait(
@ -94,23 +98,26 @@ impl Worker {
} }
fn shutdown_timeout( fn shutdown_timeout(
&self, _ctx: &mut Context<Worker>, _tx: oneshot::Sender<bool>, _dur: time::Duration, &self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration,
) { ) {
// sleep for 1 second and then check again // sleep for 1 second and then check again
// ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| { ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
// let num = slf.conns.num_connections(); let num = slf
// if num == 0 { .counters
// let _ = tx.send(true); .iter()
// Arbiter::current().do_send(StopArbiter(0)); .fold(0, |i, v| i + v.load(Ordering::Relaxed));
// } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) { if num == 0 {
// slf.shutdown_timeout(ctx, tx, d); let _ = tx.send(true);
// } else { Arbiter::current().do_send(StopArbiter(0));
// info!("Force shutdown http worker, {} connections", num); } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
// slf.shutdown(true); slf.shutdown_timeout(ctx, tx, d);
// let _ = tx.send(false); } else {
// Arbiter::current().do_send(StopArbiter(0)); info!("Force shutdown http worker, {} connections", num);
// } slf.shutdown(true);
// }); let _ = tx.send(false);
Arbiter::current().do_send(StopArbiter(0));
}
});
} }
} }
@ -126,27 +133,32 @@ impl Handler<Conn> for Worker {
impl Handler<StopWorker> for Worker { impl Handler<StopWorker> for Worker {
type Result = Response<bool, ()>; type Result = Response<bool, ()>;
fn handle(&mut self, _msg: StopWorker, _ctx: &mut Context<Self>) -> Self::Result { fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
unimplemented!() let num = self
// let num = self.conns.num_connections(); .counters
// if num == 0 { .iter()
// info!("Shutting down http worker, 0 connections"); .fold(0, |i, v| i + v.load(Ordering::Relaxed));
// Response::reply(Ok(true)) if num == 0 {
// } else if let Some(dur) = msg.graceful { info!("Shutting down http worker, 0 connections");
// self.shutdown(false); Response::reply(Ok(true))
// let (tx, rx) = oneshot::channel(); } else if let Some(dur) = msg.graceful {
// let num = self.conns.num_connections(); self.shutdown(false);
// if num != 0 { let (tx, rx) = oneshot::channel();
// info!("Graceful http worker shutdown, {} connections", num); let num = self
// self.shutdown_timeout(ctx, tx, dur); .counters
// Response::reply(Ok(true)) .iter()
// } else { .fold(0, |i, v| i + v.load(Ordering::Relaxed));
// Response::async(rx.map_err(|_| ())) if num != 0 {
// } info!("Graceful http worker shutdown, {} connections", num);
// } else { self.shutdown_timeout(ctx, tx, dur);
// info!("Force shutdown http worker, {} connections", num); Response::reply(Ok(true))
// self.shutdown(true); } else {
// Response::reply(Ok(false)) Response::async(rx.map_err(|_| ()))
// } }
} else {
info!("Force shutdown http worker, {} connections", num);
self.shutdown(true);
Response::reply(Ok(false))
}
} }
} }