1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-11 01:39:33 +00:00

use mio for accept loop

This commit is contained in:
Nikolay Kim 2017-12-27 11:22:27 -08:00
parent be1cd2936d
commit da8aa8b988
5 changed files with 109 additions and 72 deletions

View file

@ -46,7 +46,6 @@ regex = "0.2"
sha1 = "0.2"
url = "1.5"
libc = "0.2"
socket2 = "0.2"
serde = "1.0"
serde_json = "1.0"
flate2 = "0.2"
@ -57,7 +56,9 @@ bitflags = "1.0"
num_cpus = "1.0"
cookie = { version="0.10", features=["percent-encode", "secure"] }
# tokio
# io
mio = "0.6"
net2 = "0.2"
bytes = "0.4"
futures = "0.1"
tokio-io = "0.1"

View file

@ -50,6 +50,8 @@ extern crate bitflags;
extern crate futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate mio;
extern crate net2;
extern crate failure;
#[macro_use] extern crate failure_derive;
@ -69,7 +71,6 @@ extern crate brotli2;
extern crate percent_encoding;
extern crate smallvec;
extern crate num_cpus;
extern crate socket2;
extern crate actix;
extern crate h2 as http2;

View file

@ -10,9 +10,11 @@ use actix::dev::*;
use futures::Stream;
use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_core::reactor::Handle;
use tokio_core::net::TcpStream;
use mio;
use num_cpus;
use socket2::{Socket, Domain, Type};
use net2::{TcpBuilder, TcpStreamExt};
#[cfg(feature="tls")]
use futures::{future, Future};
@ -103,7 +105,7 @@ pub struct HttpServer<T, A, H, U>
keep_alive: Option<u64>,
factory: Arc<Fn() -> U + Send + Sync>,
workers: Vec<SyncAddress<Worker<H>>>,
sockets: HashMap<net::SocketAddr, Socket>,
sockets: HashMap<net::SocketAddr, net::TcpListener>,
}
impl<T: 'static, A: 'static, H, U: 'static> Actor for HttpServer<T, A, H, U> {
@ -160,6 +162,8 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
/// attempting to connect. It should only affect servers under significant load.
///
/// Generally set in the 64-2048 range. Default value is 2048.
///
/// This method should be called before `bind()` method call.
pub fn backlog(mut self, num: i32) -> Self {
self.backlog = num;
self
@ -202,34 +206,22 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
let mut succ = false;
if let Ok(iter) = addr.to_socket_addrs() {
for addr in iter {
let socket = match addr {
net::SocketAddr::V4(a) => {
let socket = Socket::new(Domain::ipv4(), Type::stream(), None)?;
match socket.bind(&a.into()) {
Ok(_) => socket,
Err(e) => {
err = Some(e);
continue;
}
}
}
net::SocketAddr::V6(a) => {
let socket = Socket::new(Domain::ipv6(), Type::stream(), None)?;
match socket.bind(&a.into()) {
Ok(_) => socket,
Err(e) => {
err = Some(e);
continue
}
}
}
let builder = match addr {
net::SocketAddr::V4(_) => TcpBuilder::new_v4()?,
net::SocketAddr::V6(_) => TcpBuilder::new_v6()?,
};
succ = true;
socket.listen(self.backlog)
.expect("failed to set socket backlog");
socket.set_reuse_address(true)
.expect("failed to set socket reuse address");
self.sockets.insert(addr, socket);
match builder.bind(addr) {
Ok(builder) => match builder.reuse_address(true) {
Ok(builder) => {
succ = true;
let lst = builder.listen(self.backlog)
.expect("failed to set socket backlog");
self.sockets.insert(lst.local_addr().unwrap(), lst);
},
Err(e) => err = Some(e)
},
Err(e) => err = Some(e),
}
}
}
@ -245,13 +237,13 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
}
fn start_workers(&mut self, settings: &ServerSettings, handler: &StreamHandlerType)
-> Vec<mpsc::UnboundedSender<IoStream<Socket>>>
-> Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>
{
// start workers
let mut workers = Vec::new();
for _ in 0..self.threads {
let s = settings.clone();
let (tx, rx) = mpsc::unbounded::<IoStream<Socket>>();
let (tx, rx) = mpsc::unbounded::<IoStream<net::TcpStream>>();
let h = handler.clone();
let ka = self.keep_alive;
@ -309,7 +301,8 @@ impl<H: HttpHandler, U, V> HttpServer<TcpStream, net::SocketAddr, H, U>
if self.sockets.is_empty() {
panic!("HttpServer::bind() has to be called befor start()");
} else {
let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
@ -413,7 +406,8 @@ impl<T, A, H, U, V> HttpServer<T, A, H, U>
where S: Stream<Item=(T, A), Error=io::Error> + 'static
{
if !self.sockets.is_empty() {
let addrs: Vec<(net::SocketAddr, Socket)> = self.sockets.drain().collect();
let addrs: Vec<(net::SocketAddr, net::TcpListener)> =
self.sockets.drain().collect();
let settings = ServerSettings::new(Some(addrs[0].0), &self.host, false);
let workers = self.start_workers(&settings, &StreamHandlerType::Normal);
@ -484,6 +478,7 @@ impl<T, A, H, U> Handler<IoStream<T>, io::Error> for HttpServer<T, A, H, U>
/// Worker accepts Socket objects via unbounded channel and start requests processing.
struct Worker<H> {
h: Rc<WorkerSettings<H>>,
hnd: Handle,
handler: StreamHandlerType,
}
@ -528,6 +523,7 @@ impl<H: 'static> Worker<H> {
fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>) -> Worker<H> {
Worker {
h: Rc::new(WorkerSettings::new(h, keep_alive)),
hnd: Arbiter::handle().clone(),
handler: handler,
}
}
@ -546,21 +542,21 @@ impl<H: 'static> Actor for Worker<H> {
}
}
impl<H> StreamHandler<IoStream<Socket>> for Worker<H>
impl<H> StreamHandler<IoStream<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static {}
impl<H> Handler<IoStream<Socket>> for Worker<H>
impl<H> Handler<IoStream<net::TcpStream>> for Worker<H>
where H: HttpHandler + 'static,
{
fn handle(&mut self, msg: IoStream<Socket>, _: &mut Context<Self>)
-> Response<Self, IoStream<Socket>>
fn handle(&mut self, msg: IoStream<net::TcpStream>, _: &mut Context<Self>)
-> Response<Self, IoStream<net::TcpStream>>
{
if !self.h.keep_alive_enabled() &&
msg.io.set_keepalive(Some(Duration::new(75, 0))).is_err()
{
error!("Can not set socket keep-alive option");
}
self.handler.handle(Rc::clone(&self.h), msg);
self.handler.handle(Rc::clone(&self.h), &self.hnd, msg);
Self::empty()
}
}
@ -576,25 +572,27 @@ enum StreamHandlerType {
impl StreamHandlerType {
fn handle<H: HttpHandler>(&mut self, h: Rc<WorkerSettings<H>>, msg: IoStream<Socket>) {
fn handle<H: HttpHandler>(&mut self,
h: Rc<WorkerSettings<H>>,
hnd: &Handle,
msg: IoStream<net::TcpStream>) {
match *self {
StreamHandlerType::Normal => {
let io = TcpStream::from_stream(msg.io.into_tcp_stream(), Arbiter::handle())
let io = TcpStream::from_stream(msg.io, hnd)
.expect("failed to associate TCP stream");
Arbiter::handle().spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
}
#[cfg(feature="tls")]
StreamHandlerType::Tls(ref acceptor) => {
let IoStream { io, peer, http2 } = msg;
let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle())
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
Arbiter::handle().spawn(
TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => Arbiter::handle().spawn(
HttpChannel::new(h, io, peer, http2)),
Ok(io) => hnd.spawn(HttpChannel::new(h, io, peer, http2)),
Err(err) =>
trace!("Error during handling tls connection: {}", err),
};
@ -605,10 +603,10 @@ impl StreamHandlerType {
#[cfg(feature="alpn")]
StreamHandlerType::Alpn(ref acceptor) => {
let IoStream { io, peer, .. } = msg;
let io = TcpStream::from_stream(io.into_tcp_stream(), Arbiter::handle())
let io = TcpStream::from_stream(io, hnd)
.expect("failed to associate TCP stream");
Arbiter::handle().spawn(
hnd.spawn(
SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
match res {
Ok(io) => {
@ -631,24 +629,57 @@ impl StreamHandlerType {
}
}
fn start_accept_thread(sock: Socket, addr: net::SocketAddr,
workers: Vec<mpsc::UnboundedSender<IoStream<Socket>>>) {
// start acceptors thread
fn start_accept_thread(sock: net::TcpListener, addr: net::SocketAddr,
workers: Vec<mpsc::UnboundedSender<IoStream<net::TcpStream>>>) {
// start accept thread
let _ = thread::Builder::new().name(format!("Accept on {}", addr)).spawn(move || {
let mut next = 0;
let server = mio::net::TcpListener::from_listener(sock, &addr)
.expect("Can not create mio::net::TcpListener");
const SERVER: mio::Token = mio::Token(0);
// Create a poll instance
let poll = match mio::Poll::new() {
Ok(poll) => poll,
Err(err) => panic!("Can not create mio::Poll: {}", err),
};
// Start listening for incoming connections
if let Err(err) = poll.register(&server, SERVER,
mio::Ready::readable(), mio::PollOpt::edge()) {
panic!("Can not register io: {}", err);
}
// Create storage for events
let mut events = mio::Events::with_capacity(128);
loop {
match sock.accept() {
Ok((socket, addr)) => {
let addr = if let Some(addr) = addr.as_inet() {
net::SocketAddr::V4(addr)
} else {
net::SocketAddr::V6(addr.as_inet6().unwrap())
};
let msg = IoStream{io: socket, peer: Some(addr), http2: false};
workers[next].unbounded_send(msg).expect("worker thread died");
next = (next + 1) % workers.len();
if let Err(err) = poll.poll(&mut events, None) {
panic!("Poll error: {}", err);
}
for event in events.iter() {
match event.token() {
SERVER => {
loop {
match server.accept_std() {
Ok((sock, addr)) => {
let msg = IoStream{io: sock, peer: Some(addr), http2: false};
workers[next]
.unbounded_send(msg).expect("worker thread died");
next = (next + 1) % workers.len();
},
Err(err) => if err.kind() == io::ErrorKind::WouldBlock {
break
} else {
error!("Error accepting connection: {:?}", err);
return
}
}
}
}
_ => unreachable!(),
}
Err(err) => error!("Error accepting connection: {:?}", err),
}
}
});

View file

@ -11,9 +11,9 @@ use cookie::Cookie;
use http::{Uri, Method, Version, HeaderMap, HttpTryFrom};
use http::header::{HeaderName, HeaderValue};
use futures::Future;
use socket2::{Socket, Domain, Type};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use net2::TcpBuilder;
use error::Error;
use server::HttpServer;
@ -139,10 +139,10 @@ impl TestServer {
/// Get firat available unused address
pub fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = Socket::new(Domain::ipv4(), Type::stream(), None).unwrap();
socket.bind(&addr.into()).unwrap();
socket.set_reuse_address(true).unwrap();
let tcp = socket.into_tcp_listener();
let socket = TcpBuilder::new_v4().unwrap();
socket.bind(&addr).unwrap();
socket.reuse_address(true).unwrap();
let tcp = socket.to_tcp_listener().unwrap();
tcp.local_addr().unwrap()
}

View file

@ -4,7 +4,7 @@ extern crate tokio_core;
extern crate reqwest;
use std::thread;
use std::sync::Arc;
use std::sync::{Arc, mpsc};
use std::sync::atomic::{AtomicUsize, Ordering};
use actix_web::*;
@ -12,16 +12,20 @@ use actix::System;
#[test]
fn test_start() {
let addr = test::TestServer::unused_addr();
let srv_addr = addr.clone();
let _ = test::TestServer::unused_addr();
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let sys = System::new("test");
let srv = HttpServer::new(
|| vec![Application::new()
.resource("/", |r| r.method(Method::GET).h(httpcodes::HTTPOk))]);
srv.bind(srv_addr).unwrap().start();
let srv = srv.bind("127.0.0.1:0").unwrap();
let _ = tx.send(srv.addrs()[0].clone());
srv.start();
sys.run();
});
let addr = rx.recv().unwrap();
assert!(reqwest::get(&format!("http://{}/", addr)).unwrap().status().is_success());
}