diff --git a/src/channel.rs b/src/channel.rs index 9940a297d..576c043de 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -11,7 +11,8 @@ use h2; use error::Error; use h1writer::Writer; use httprequest::HttpRequest; -use server::{ServerSettings, WorkerSettings}; +use server::ServerSettings; +use worker::WorkerSettings; /// Low level http request handler #[allow(unused_variables)] diff --git a/src/h1.rs b/src/h1.rs index 3f23029b0..b3cfbf2bd 100644 --- a/src/h1.rs +++ b/src/h1.rs @@ -17,7 +17,7 @@ use pipeline::Pipeline; use encoding::PayloadType; use channel::{HttpHandler, HttpHandlerTask}; use h1writer::{Writer, H1Writer}; -use server::WorkerSettings; +use worker::WorkerSettings; use httpcodes::HTTPNotFound; use httprequest::HttpRequest; use error::{ParseError, PayloadError, ResponseError}; @@ -888,7 +888,7 @@ mod tests { use http::{Version, Method}; use super::*; use application::HttpApplication; - use server::WorkerSettings; + use worker::WorkerSettings; struct Buffer { buf: Bytes, diff --git a/src/h2.rs b/src/h2.rs index f0ac8cb77..b3fdc5673 100644 --- a/src/h2.rs +++ b/src/h2.rs @@ -16,7 +16,7 @@ use tokio_core::reactor::Timeout; use pipeline::Pipeline; use h2writer::H2Writer; -use server::WorkerSettings; +use worker::WorkerSettings; use channel::{HttpHandler, HttpHandlerTask}; use error::PayloadError; use encoding::PayloadType; diff --git a/src/lib.rs b/src/lib.rs index f0178178d..b6c6abd58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,6 +103,7 @@ mod resource; mod handler; mod pipeline; mod server; +mod worker; mod channel; mod wsframe; mod wsproto; diff --git a/src/server.rs b/src/server.rs index cda09352f..ffac04b9f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,6 +1,5 @@ use std::{io, net, thread}; use std::rc::Rc; -use std::cell::{RefCell, RefMut}; use std::sync::{Arc, mpsc as sync_mpsc}; use std::time::Duration; use std::marker::PhantomData; @@ -11,11 +10,10 @@ use actix::System; use futures::Stream; use futures::sync::mpsc; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_core::reactor::Handle; use tokio_core::net::TcpStream; use mio; use num_cpus; -use net2::{TcpBuilder, TcpStreamExt}; +use net2::TcpBuilder; #[cfg(feature="tls")] use futures::{future, Future}; @@ -38,6 +36,7 @@ use actix::actors::signal; use helpers; use channel::{HttpChannel, HttpHandler, IntoHttpHandler}; +use worker::{Conn, Worker, WorkerSettings, StreamHandlerType}; /// Various server settings #[derive(Debug, Clone)] @@ -248,13 +247,13 @@ impl HttpServer } fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType) - -> Vec>> + -> Vec>> { // start workers let mut workers = Vec::new(); for _ in 0..self.threads { let s = settings.clone(); - let (tx, rx) = mpsc::unbounded::>(); + let (tx, rx) = mpsc::unbounded::>(); let h = handler.clone(); let ka = self.keep_alive; @@ -483,7 +482,7 @@ impl HttpServer // start server HttpServer::create(move |ctx| { ctx.add_stream(stream.map( - move |(t, _)| IoStream{io: t, peer: None, http2: false})); + move |(t, _)| Conn{io: t, peer: None, http2: false})); self }) } @@ -524,20 +523,13 @@ impl Handler for HttpServer } } -#[derive(Message)] -struct IoStream { - io: T, - peer: Option, - http2: bool, -} - -impl StreamHandler, io::Error> for HttpServer +impl StreamHandler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static, U: 'static, A: 'static {} -impl Handler, io::Error> for HttpServer +impl Handler, io::Error> for HttpServer where T: AsyncRead + AsyncWrite + 'static, H: HttpHandler + 'static, U: 'static, @@ -547,8 +539,7 @@ impl Handler, io::Error> for HttpServer debug!("Error handling request: {}", err) } - fn handle(&mut self, msg: IoStream, _: &mut Context) - -> Response> + fn handle(&mut self, msg: Conn, _: &mut Context) -> Response> { Arbiter::handle().spawn( HttpChannel::new(Rc::clone(self.h.as_ref().unwrap()), msg.io, msg.peer, msg.http2)); @@ -629,163 +620,6 @@ impl Handler for HttpServer } } -/// Http worker -/// -/// Worker accepts Socket objects via unbounded channel and start requests processing. -struct Worker { - h: Rc>, - hnd: Handle, - handler: StreamHandlerType, -} - -pub(crate) struct WorkerSettings { - h: RefCell>, - enabled: bool, - keep_alive: u64, - bytes: Rc, - messages: Rc, -} - -impl WorkerSettings { - pub(crate) fn new(h: Vec, keep_alive: Option) -> WorkerSettings { - WorkerSettings { - h: RefCell::new(h), - enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, - keep_alive: keep_alive.unwrap_or(0), - bytes: Rc::new(helpers::SharedBytesPool::new()), - messages: Rc::new(helpers::SharedMessagePool::new()), - } - } - - pub fn handlers(&self) -> RefMut> { - self.h.borrow_mut() - } - pub fn keep_alive(&self) -> u64 { - self.keep_alive - } - pub fn keep_alive_enabled(&self) -> bool { - self.enabled - } - pub fn get_shared_bytes(&self) -> helpers::SharedBytes { - helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) - } - pub fn get_http_message(&self) -> helpers::SharedHttpMessage { - helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages)) - } -} - -impl Worker { - - fn new(h: Vec, handler: StreamHandlerType, keep_alive: Option) -> Worker { - Worker { - h: Rc::new(WorkerSettings::new(h, keep_alive)), - hnd: Arbiter::handle().clone(), - handler: handler, - } - } - - fn update_time(&self, ctx: &mut Context) { - helpers::update_date(); - ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); - } -} - -impl Actor for Worker { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - self.update_time(ctx); - } -} - -impl StreamHandler> for Worker - where H: HttpHandler + 'static {} - -impl Handler> for Worker - where H: HttpHandler + 'static, -{ - fn handle(&mut self, msg: IoStream, _: &mut Context) - -> Response> - { - if !self.h.keep_alive_enabled() && - msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err() - { - error!("Can not set socket keep-alive option"); - } - self.handler.handle(Rc::clone(&self.h), &self.hnd, msg); - Self::empty() - } -} - -#[derive(Clone)] -enum StreamHandlerType { - Normal, - #[cfg(feature="tls")] - Tls(TlsAcceptor), - #[cfg(feature="alpn")] - Alpn(SslAcceptor), -} - -impl StreamHandlerType { - - fn handle(&mut self, - h: Rc>, - hnd: &Handle, - msg: IoStream) { - match *self { - StreamHandlerType::Normal => { - let io = TcpStream::from_stream(msg.io, hnd) - .expect("failed to associate TCP stream"); - - hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); - } - #[cfg(feature="tls")] - StreamHandlerType::Tls(ref acceptor) => { - let IoStream { io, peer, http2 } = msg; - let io = TcpStream::from_stream(io, hnd) - .expect("failed to associate TCP stream"); - - hnd.spawn( - TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { - match res { - Ok(io) => Arbiter::handle().spawn( - HttpChannel::new(h, io, peer, http2)), - Err(err) => - trace!("Error during handling tls connection: {}", err), - }; - future::result(Ok(())) - }) - ); - } - #[cfg(feature="alpn")] - StreamHandlerType::Alpn(ref acceptor) => { - let IoStream { io, peer, .. } = msg; - let io = TcpStream::from_stream(io, hnd) - .expect("failed to associate TCP stream"); - - hnd.spawn( - SslAcceptorExt::accept_async(acceptor, io).then(move |res| { - match res { - Ok(io) => { - let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol() - { - p.len() == 2 && &p == b"h2" - } else { - false - }; - Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2)); - }, - Err(err) => - trace!("Error during handling tls connection: {}", err), - }; - future::result(Ok(())) - }) - ); - } - } - } -} - enum Command { Pause, Resume, @@ -793,7 +627,7 @@ enum Command { } fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i32, - workers: Vec>>) + workers: Vec>>) -> (mio::SetReadiness, sync_mpsc::Sender) { let (tx, rx) = sync_mpsc::channel(); @@ -844,7 +678,7 @@ fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr, backlog: i loop { match server.accept_std() { Ok((sock, addr)) => { - let msg = IoStream{ + let msg = Conn{ io: sock, peer: Some(addr), http2: false}; workers[next].unbounded_send(msg) .expect("worker thread died"); diff --git a/src/worker.rs b/src/worker.rs new file mode 100644 index 000000000..347d02cc6 --- /dev/null +++ b/src/worker.rs @@ -0,0 +1,177 @@ +use std::{net, time}; +use std::rc::Rc; +use std::cell::{RefCell, RefMut}; +use tokio_core::net::TcpStream; +use tokio_core::reactor::Handle; +use net2::TcpStreamExt; + +use actix::{Actor, Arbiter, AsyncContext, Context, Handler, Response, StreamHandler}; + +use helpers; +use channel::{HttpChannel, HttpHandler}; + + +#[derive(Message)] +pub(crate) struct Conn { + pub io: T, + pub peer: Option, + pub http2: bool, +} + +pub(crate) struct WorkerSettings { + h: RefCell>, + enabled: bool, + keep_alive: u64, + bytes: Rc, + messages: Rc, +} + +impl WorkerSettings { + pub(crate) fn new(h: Vec, keep_alive: Option) -> WorkerSettings { + WorkerSettings { + h: RefCell::new(h), + enabled: if let Some(ka) = keep_alive { ka > 0 } else { false }, + keep_alive: keep_alive.unwrap_or(0), + bytes: Rc::new(helpers::SharedBytesPool::new()), + messages: Rc::new(helpers::SharedMessagePool::new()), + } + } + + pub fn handlers(&self) -> RefMut> { + self.h.borrow_mut() + } + pub fn keep_alive(&self) -> u64 { + self.keep_alive + } + pub fn keep_alive_enabled(&self) -> bool { + self.enabled + } + pub fn get_shared_bytes(&self) -> helpers::SharedBytes { + helpers::SharedBytes::new(self.bytes.get_bytes(), Rc::clone(&self.bytes)) + } + pub fn get_http_message(&self) -> helpers::SharedHttpMessage { + helpers::SharedHttpMessage::new(self.messages.get(), Rc::clone(&self.messages)) + } +} + +/// Http worker +/// +/// Worker accepts Socket objects via unbounded channel and start requests processing. +pub(crate) struct Worker { + h: Rc>, + hnd: Handle, + handler: StreamHandlerType, +} + +impl Worker { + + pub(crate) fn new(h: Vec, handler: StreamHandlerType, keep_alive: Option) + -> Worker + { + Worker { + h: Rc::new(WorkerSettings::new(h, keep_alive)), + hnd: Arbiter::handle().clone(), + handler: handler, + } + } + + fn update_time(&self, ctx: &mut Context) { + helpers::update_date(); + ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); + } +} + +impl Actor for Worker { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + self.update_time(ctx); + } +} + +impl StreamHandler> for Worker + where H: HttpHandler + 'static {} + +impl Handler> for Worker + where H: HttpHandler + 'static, +{ + fn handle(&mut self, msg: Conn, _: &mut Context) + -> Response> + { + if !self.h.keep_alive_enabled() && + msg.io.set_keepalive(Some(time::Duration::new(75, 0))).is_err() + { + error!("Can not set socket keep-alive option"); + } + self.handler.handle(Rc::clone(&self.h), &self.hnd, msg); + Self::empty() + } +} + +#[derive(Clone)] +pub(crate) enum StreamHandlerType { + Normal, + #[cfg(feature="tls")] + Tls(TlsAcceptor), + #[cfg(feature="alpn")] + Alpn(SslAcceptor), +} + +impl StreamHandlerType { + + fn handle(&mut self, + h: Rc>, + hnd: &Handle, msg: Conn) { + match *self { + StreamHandlerType::Normal => { + let io = TcpStream::from_stream(msg.io, hnd) + .expect("failed to associate TCP stream"); + + hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2)); + } + #[cfg(feature="tls")] + StreamHandlerType::Tls(ref acceptor) => { + let Conn { io, peer, http2 } = msg; + let io = TcpStream::from_stream(io, hnd) + .expect("failed to associate TCP stream"); + + hnd.spawn( + TlsAcceptorExt::accept_async(acceptor, io).then(move |res| { + match res { + Ok(io) => Arbiter::handle().spawn( + HttpChannel::new(h, io, peer, http2)), + Err(err) => + trace!("Error during handling tls connection: {}", err), + }; + future::result(Ok(())) + }) + ); + } + #[cfg(feature="alpn")] + StreamHandlerType::Alpn(ref acceptor) => { + let Conn { io, peer, .. } = msg; + let io = TcpStream::from_stream(io, hnd) + .expect("failed to associate TCP stream"); + + hnd.spawn( + SslAcceptorExt::accept_async(acceptor, io).then(move |res| { + match res { + Ok(io) => { + let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol() + { + p.len() == 2 && &p == b"h2" + } else { + false + }; + Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2)); + }, + Err(err) => + trace!("Error during handling tls connection: {}", err), + }; + future::result(Ok(())) + }) + ); + } + } + } +} diff --git a/tests/test_server.rs b/tests/test_server.rs index 0852affbd..032c750f4 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -24,7 +24,7 @@ fn test_start() { .resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]); let srv = srv.bind("127.0.0.1:0").unwrap(); - let addr = srv.addrs()[0].clone(); + let addr = srv.addrs()[0]; let srv_addr = srv.start(); let _ = tx.send((addr, srv_addr)); sys.run();