diff --git a/examples/basic.rs b/examples/basic.rs index 43e2bf4cd..0650a078c 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -20,8 +20,7 @@ use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::SslAcceptorExt; -use actix_net::service::{IntoNewService, NewServiceExt}; -use actix_net::Server; +use actix_net::{IntoNewService, NewService, Server}; /// Simple logger service, it just prints fact of the new connections fn logger( @@ -90,7 +89,7 @@ fn main() { Ok::<_, io::Error>(ServiceState { num: num.clone() }) })); - Server::new().bind("0.0.0.0:8443", srv).unwrap().start(); + Server::default().bind("0.0.0.0:8443", srv).unwrap().start(); sys.run(); } diff --git a/examples/ssl.rs b/examples/ssl.rs index 5feb37c3b..5d974f726 100644 --- a/examples/ssl.rs +++ b/examples/ssl.rs @@ -15,8 +15,7 @@ use futures::{future, Future}; use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use tokio_io::{AsyncRead, AsyncWrite}; -use actix_net::service::NewServiceExt; -use actix_net::{ssl, Server}; +use actix_net::{ssl, NewService, Server}; #[derive(Debug)] struct ServiceState { @@ -24,7 +23,7 @@ struct ServiceState { } fn service( - st: &mut ServiceState, stream: T, + st: &mut ServiceState, _: T, ) -> impl Future { let num = st.num.fetch_add(1, Ordering::Relaxed); println!("got ssl connection {:?}", num); @@ -50,7 +49,7 @@ fn main() { Ok::<_, io::Error>(ServiceState { num: num.clone() }) })); - Server::new().bind("0.0.0.0:8443", srv).unwrap().start(); + Server::default().bind("0.0.0.0:8443", srv).unwrap().start(); sys.run(); } diff --git a/src/lib.rs b/src/lib.rs index b59065b31..9ef55abbc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,16 +65,20 @@ use bytes::{BufMut, BytesMut}; use futures::{Async, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; +pub use tower_service::Service; + pub(crate) mod accept; mod extensions; mod server; +pub mod server_config; mod server_service; pub mod service; pub mod ssl; mod worker; pub use self::server::{ConnectionRateTag, ConnectionTag, Connections, Server}; -pub use service::{IntoNewService, IntoService}; +pub use server_config::Config; +pub use service::{IntoNewService, IntoService, NewService}; pub use extensions::Extensions; diff --git a/src/server.rs b/src/server.rs index 1224de115..6795471d0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -10,7 +10,6 @@ use futures::{Future, Sink, Stream}; use net2::TcpBuilder; use num_cpus; use tokio_tcp::TcpStream; -use tower_service::NewService; use actix::{ actors::signal, fut, Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, Handler, @@ -18,8 +17,9 @@ use actix::{ }; use super::accept::{AcceptLoop, AcceptNotify, Command}; +use super::server_config::{Config, ServerConfig}; use super::server_service::{ServerNewService, ServerServiceFactory}; -use super::service::IntoNewService; +use super::service::{IntoNewService, NewService}; use super::worker::{Conn, StopWorker, Worker, WorkerClient}; use super::{PauseServer, ResumeServer, StopServer, Token}; @@ -28,10 +28,11 @@ pub(crate) enum ServerCommand { } /// Server -pub struct Server { +pub struct Server { + config: C, threads: usize, workers: Vec<(usize, Addr)>, - services: Vec>, + services: Vec + Send>>, sockets: Vec<(Token, net::TcpListener)>, accept: AcceptLoop, exit: bool, @@ -42,16 +43,17 @@ pub struct Server { maxconnrate: usize, } -impl Default for Server { +impl Default for Server { fn default() -> Self { - Self::new() + Self::new(ServerConfig::default()) } } -impl Server { +impl Server { /// Create new Server instance - pub fn new() -> Server { + pub fn new(config: C) -> Server { Server { + config, threads: num_cpus::get(), workers: Vec::new(), services: Vec::new(), @@ -81,7 +83,10 @@ impl Server { /// reached for each worker. /// /// By default max connections is set to a 100k. - pub fn maxconn(mut self, num: usize) -> Self { + pub fn maxconn(mut self, num: usize) -> Self + where + C: AsMut, + { self.maxconn = num; self } @@ -135,7 +140,7 @@ impl Server { where U: net::ToSocketAddrs, T: IntoNewService + Clone, - N: NewService + N: NewService + Clone + Send + 'static, @@ -155,7 +160,7 @@ impl Server { pub fn listen(mut self, lst: net::TcpListener, srv: T) -> Self where T: IntoNewService, - N: NewService + N: NewService + Clone + Send + 'static, @@ -164,8 +169,10 @@ impl Server { N::Error: fmt::Display, { let token = Token(self.services.len()); - self.services - .push(ServerNewService::create(srv.into_new_service())); + self.services.push(ServerNewService::create( + srv.into_new_service(), + self.config.clone(), + )); self.sockets.push((token, lst)); self } @@ -199,7 +206,7 @@ impl Server { } /// Starts Server Actor and returns its address - pub fn start(mut self) -> Addr { + pub fn start(mut self) -> Addr> { if self.sockets.is_empty() { panic!("Service should have at least one bound socket"); } else { @@ -251,7 +258,7 @@ impl Server { let (tx, rx) = unbounded::(); let conns = Connections::new(notify, self.maxconn, self.maxconnrate); let worker = WorkerClient::new(idx, tx, conns.clone()); - let services: Vec> = + let services: Vec + Send>> = self.services.iter().map(|v| v.clone_factory()).collect(); let addr = Arbiter::start(move |ctx: &mut Context<_>| { @@ -263,14 +270,14 @@ impl Server { } } -impl Actor for Server { +impl Actor for Server { type Context = Context; } /// Signals support /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system /// message to `System` actor. -impl Handler for Server { +impl Handler for Server { type Result = (); fn handle(&mut self, msg: signal::Signal, ctx: &mut Context) { @@ -295,7 +302,7 @@ impl Handler for Server { } } -impl Handler for Server { +impl Handler for Server { type Result = (); fn handle(&mut self, _: PauseServer, _: &mut Context) { @@ -303,7 +310,7 @@ impl Handler for Server { } } -impl Handler for Server { +impl Handler for Server { type Result = (); fn handle(&mut self, _: ResumeServer, _: &mut Context) { @@ -311,7 +318,7 @@ impl Handler for Server { } } -impl Handler for Server { +impl Handler for Server { type Result = Response<(), ()>; fn handle(&mut self, msg: StopServer, ctx: &mut Context) -> Self::Result { @@ -366,7 +373,7 @@ impl Handler for Server { } /// Commands from accept threads -impl StreamHandler for Server { +impl StreamHandler for Server { fn finished(&mut self, _: &mut Context) {} fn handle(&mut self, msg: ServerCommand, _: &mut Context) { diff --git a/src/server_config.rs b/src/server_config.rs new file mode 100644 index 000000000..199f71232 --- /dev/null +++ b/src/server_config.rs @@ -0,0 +1,81 @@ +//! Default server config +use std::sync::{atomic::AtomicUsize, Arc}; + +pub trait Config: Send + Clone + Default + 'static { + fn fork(&self) -> Self; +} + +#[derive(Clone, Default)] +pub struct ServerConfig { + conn: ConnectionsConfig, + ssl: SslConfig, +} + +impl Config for ServerConfig { + fn fork(&self) -> Self { + ServerConfig { + conn: self.conn.fork(), + ssl: self.ssl.fork(), + } + } +} + +impl AsRef for ServerConfig { + fn as_ref(&self) -> &ConnectionsConfig { + &self.conn + } +} + +impl AsRef for ServerConfig { + fn as_ref(&self) -> &SslConfig { + &self.ssl + } +} + +#[derive(Clone)] +pub struct ConnectionsConfig { + max_connections: usize, + num_connections: Arc, +} + +impl Default for ConnectionsConfig { + fn default() -> Self { + ConnectionsConfig { + max_connections: 102_400, + num_connections: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl Config for ConnectionsConfig { + fn fork(&self) -> Self { + ConnectionsConfig { + max_connections: self.max_connections, + num_connections: Arc::new(AtomicUsize::new(0)), + } + } +} + +#[derive(Clone)] +pub struct SslConfig { + max_handshakes: usize, + num: Arc, +} + +impl Default for SslConfig { + fn default() -> Self { + SslConfig { + max_handshakes: 256, + num: Arc::new(AtomicUsize::new(0)), + } + } +} + +impl Config for SslConfig { + fn fork(&self) -> Self { + SslConfig { + max_handshakes: self.max_handshakes, + num: Arc::new(AtomicUsize::new(0)), + } + } +} diff --git a/src/server_service.rs b/src/server_service.rs index 973a9c3da..15083a6a0 100644 --- a/src/server_service.rs +++ b/src/server_service.rs @@ -7,7 +7,8 @@ use std::{fmt, io, net}; use futures::{future, Future, Poll}; use tokio_reactor::Handle; use tokio_tcp::TcpStream; -use tower_service::{NewService, Service}; + +use super::{Config, NewService, Service}; pub(crate) type BoxedServerService = Box< Service< @@ -55,14 +56,15 @@ where } } -pub(crate) struct ServerNewService { +pub(crate) struct ServerNewService { inner: T, + config: C, counter: Arc, } -impl ServerNewService +impl ServerNewService where - T: NewService + T: NewService + Clone + Send + 'static, @@ -70,25 +72,26 @@ where T::Future: 'static, T::Error: fmt::Display, { - pub(crate) fn create(inner: T) -> Box { + pub(crate) fn create(inner: T, config: C) -> Box + Send> { Box::new(Self { inner, + config, counter: Arc::new(AtomicUsize::new(0)), }) } } -pub trait ServerServiceFactory { +pub trait ServerServiceFactory { fn counter(&self) -> Arc; - fn clone_factory(&self) -> Box; + fn clone_factory(&self) -> Box + Send>; fn create(&self) -> Box>; } -impl ServerServiceFactory for ServerNewService +impl ServerServiceFactory for ServerNewService where - T: NewService + T: NewService + Clone + Send + 'static, @@ -100,28 +103,35 @@ where self.counter.clone() } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box + Send> { Box::new(Self { inner: self.inner.clone(), + config: self.config.fork(), counter: Arc::new(AtomicUsize::new(0)), }) } fn create(&self) -> Box> { let counter = self.counter.clone(); - Box::new(self.inner.new_service().map_err(|_| ()).map(move |inner| { - let service: BoxedServerService = Box::new(ServerService { inner, counter }); - service - })) + Box::new( + self.inner + .new_service(self.config.clone()) + .map_err(|_| ()) + .map(move |inner| { + let service: BoxedServerService = + Box::new(ServerService { inner, counter }); + service + }), + ) } } -impl ServerServiceFactory for Box { +impl ServerServiceFactory for Box> { fn counter(&self) -> Arc { self.as_ref().counter() } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box + Send> { self.as_ref().clone_factory() } diff --git a/src/service.rs b/src/service.rs index b854765a3..930b7bdaa 100644 --- a/src/service.rs +++ b/src/service.rs @@ -3,9 +3,44 @@ use std::marker; use std::rc::Rc; use futures::{future, future::FutureResult, Async, Future, IntoFuture, Poll}; -use tower_service::{NewService, Service}; +use tower_service::Service; + +/// Creates new `Service` values. +/// +/// Acts as a service factory. This is useful for cases where new `Service` +/// values must be produced. One case is a TCP servier listener. The listner +/// accepts new TCP streams, obtains a new `Service` value using the +/// `NewService` trait, and uses that new `Service` value to process inbound +/// requests on that new TCP stream. +pub trait NewService { + /// Requests handled by the service + type Request; + + /// Responses given by the service + type Response; + + /// Errors produced by the service + type Error; + + /// The `Service` value created by this factory + type Service: Service< + Request = Self::Request, + Response = Self::Response, + Error = Self::Error, + >; + + /// Pipeline configuration + type Config: Clone; + + /// Errors produced while building a service. + type InitError; + + /// The future of the `Service` instance. + type Future: Future; + + /// Create and return a new service value asynchronously. + fn new_service(&self, Self::Config) -> Self::Future; -pub trait NewServiceExt: NewService { fn and_then(self, new_service: F) -> AndThenNewService where Self: Sized, @@ -13,30 +48,7 @@ pub trait NewServiceExt: NewService { B: NewService< Request = Self::Response, Error = Self::Error, - InitError = Self::InitError, - >; - - fn map_err(self, f: F) -> MapErrNewService - where - Self: Sized, - F: Fn(Self::Error) -> E; - - fn map_init_err(self, f: F) -> MapInitErr - where - Self: Sized, - F: Fn(Self::InitError) -> E; -} - -impl NewServiceExt for T -where - T: NewService, -{ - fn and_then(self, new_service: F) -> AndThenNewService - where - F: IntoNewService, - B: NewService< - Request = Self::Response, - Error = Self::Error, + Config = Self::Config, InitError = Self::InitError, >, { @@ -45,6 +57,7 @@ where fn map_err(self, f: F) -> MapErrNewService where + Self: Sized, F: Fn(Self::Error) -> E, { MapErrNewService::new(self, f) @@ -150,7 +163,7 @@ where } } -pub struct FnNewService +pub struct FnNewService where F: Fn(Req) -> Fut, Fut: IntoFuture, @@ -159,9 +172,10 @@ where req: marker::PhantomData, resp: marker::PhantomData, err: marker::PhantomData, + cfg: marker::PhantomData, } -impl FnNewService +impl FnNewService where F: Fn(Req) -> Fut + Clone, Fut: IntoFuture, @@ -172,38 +186,43 @@ where req: marker::PhantomData, resp: marker::PhantomData, err: marker::PhantomData, + cfg: marker::PhantomData, } } } -impl NewService for FnNewService +impl NewService for FnNewService where F: Fn(Req) -> Fut + Clone, Fut: IntoFuture, + Cfg: Clone, { type Request = Req; type Response = Resp; type Error = Err; type Service = FnService; + type Config = Cfg; type InitError = (); type Future = FutureResult; - fn new_service(&self) -> Self::Future { + fn new_service(&self, cfg: Cfg) -> Self::Future { future::ok(FnService::new(self.f.clone())) } } -impl IntoNewService> for F +impl IntoNewService> + for F where F: Fn(Req) -> Fut + Clone + 'static, Fut: IntoFuture, + Cfg: Clone, { - fn into_new_service(self) -> FnNewService { + fn into_new_service(self) -> FnNewService { FnNewService::new(self) } } -impl Clone for FnNewService +impl Clone for FnNewService where F: Fn(Req) -> Fut + Clone, Fut: IntoFuture, @@ -261,7 +280,7 @@ where } /// `NewService` for state and handler functions -pub struct FnStateNewService { +pub struct FnStateNewService { f: F1, state: F2, s: marker::PhantomData, @@ -271,10 +290,11 @@ pub struct FnStateNewService { err2: marker::PhantomData, fut1: marker::PhantomData, fut2: marker::PhantomData, + cfg: marker::PhantomData, } -impl - FnStateNewService +impl + FnStateNewService { fn new(f: F1, state: F2) -> Self { FnStateNewService { @@ -287,12 +307,13 @@ impl err2: marker::PhantomData, fut1: marker::PhantomData, fut2: marker::PhantomData, + cfg: marker::PhantomData, } } } -impl NewService - for FnStateNewService +impl NewService + for FnStateNewService where S: 'static, F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, @@ -303,15 +324,17 @@ where Resp: 'static, Err1: 'static, Err2: 'static, + Cfg: Clone, { type Request = Req; type Response = Resp; type Error = Err1; type Service = FnStateService; + type Config = Cfg; type InitError = Err2; type Future = Box>; - fn new_service(&self) -> Self::Future { + fn new_service(&self, cfg: Cfg) -> Self::Future { let f = self.f.clone(); Box::new( (self.state)() @@ -321,8 +344,9 @@ where } } -impl - IntoNewService> for (F1, F2) +impl + IntoNewService> + for (F1, F2) where S: 'static, F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, @@ -333,16 +357,17 @@ where Resp: 'static, Err1: 'static, Err2: 'static, + Cfg: Clone, { fn into_new_service( self, - ) -> FnStateNewService { + ) -> FnStateNewService { FnStateNewService::new(self.0, self.1) } } -impl Clone - for FnStateNewService +impl Clone + for FnStateNewService where F1: Fn(&mut S, Req) -> Fut1 + Clone + 'static, F2: Fn() -> Fut2 + Clone, @@ -467,7 +492,12 @@ where impl NewService for AndThenNewService where - A: NewService, + A: NewService< + Response = B::Request, + Error = B::Error, + Config = B::Config, + InitError = B::InitError, + >, B: NewService, { type Request = A::Request; @@ -475,11 +505,12 @@ where type Error = A::Error; type Service = AndThen; + type Config = A::Config; type InitError = A::InitError; type Future = AndThenNewServiceFuture; - fn new_service(&self) -> Self::Future { - AndThenNewServiceFuture::new(self.a.new_service(), self.b.new_service()) + fn new_service(&self, cfg: A::Config) -> Self::Future { + AndThenNewServiceFuture::new(self.a.new_service(cfg.clone()), self.b.new_service(cfg)) } } @@ -669,11 +700,12 @@ where type Error = E; type Service = MapErr; + type Config = A::Config; type InitError = A::InitError; type Future = MapErrNewServiceFuture; - fn new_service(&self) -> Self::Future { - MapErrNewServiceFuture::new(self.a.new_service(), self.f.clone()) + fn new_service(&self, cfg: Self::Config) -> Self::Future { + MapErrNewServiceFuture::new(self.a.new_service(cfg), self.f.clone()) } } @@ -759,11 +791,12 @@ where type Error = A::Error; type Service = A::Service; + type Config = A::Config; type InitError = E; type Future = MapInitErrFuture; - fn new_service(&self) -> Self::Future { - MapInitErrFuture::new(self.a.new_service(), self.f.clone()) + fn new_service(&self, cfg: Self::Config) -> Self::Future { + MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone()) } } diff --git a/src/ssl/openssl.rs b/src/ssl/openssl.rs index 512dcfb44..b6496b687 100644 --- a/src/ssl/openssl.rs +++ b/src/ssl/openssl.rs @@ -1,29 +1,31 @@ use std::marker::PhantomData; -use std::net::Shutdown; -use std::{io, time}; +// use std::net::Shutdown; +use std::io; use futures::{future, future::FutureResult, Async, Future, Poll}; use openssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_openssl::{AcceptAsync, SslAcceptorExt, SslStream}; -use tower_service::{NewService, Service}; -use {IntoNewService, IoStream}; +use server_config::SslConfig; +use {NewService, Service}; /// Support `SSL` connections via openssl package /// /// `alpn` feature enables `OpensslAcceptor` type -pub struct OpensslService { +pub struct OpensslService { acceptor: SslAcceptor, io: PhantomData, + cfg: PhantomData, } -impl OpensslService { +impl OpensslService { /// Create default `OpensslService` pub fn new(builder: SslAcceptorBuilder) -> Self { OpensslService { acceptor: builder.build(), io: PhantomData, + cfg: PhantomData, } } @@ -44,27 +46,32 @@ impl OpensslService { Ok(OpensslService { acceptor: builder.build(), io: PhantomData, + cfg: PhantomData, }) } } -impl Clone for OpensslService { +impl Clone for OpensslService { fn clone(&self) -> Self { Self { acceptor: self.acceptor.clone(), io: PhantomData, + cfg: PhantomData, } } } -impl NewService for OpensslService { +impl> NewService + for OpensslService +{ type Request = T; type Response = SslStream; type Error = io::Error; type Service = OpensslAcceptor; + type Config = Cfg; type InitError = io::Error; type Future = FutureResult; - fn new_service(&self) -> Self::Future { + fn new_service(&self, _: Self::Config) -> Self::Future { future::ok(OpensslAcceptor { acceptor: self.acceptor.clone(), io: PhantomData, diff --git a/src/worker.rs b/src/worker.rs index 80bf13566..d23972148 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -69,8 +69,8 @@ impl Actor for Worker { } impl Worker { - pub(crate) fn new( - ctx: &mut Context, services: Vec>, + pub(crate) fn new( + ctx: &mut Context, services: Vec + Send>>, ) -> Self { let wrk = Worker { services: Vec::new(),