From 35107f64e712fb4ec92af3ae0ccac23412376c3c Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 16 Oct 2017 13:13:32 -0700 Subject: [PATCH] make HttpServer generic over incoming stream --- src/httprequest.rs | 6 ---- src/server.rs | 75 ++++++++++++++++++++++++++++++++++---------- src/task.rs | 7 +++-- tests/test_server.rs | 64 +++++++++++++++++++++++++++++++++++++ 4 files changed, 126 insertions(+), 26 deletions(-) create mode 100644 tests/test_server.rs diff --git a/src/httprequest.rs b/src/httprequest.rs index 90e2ecd17..51b75f0b2 100644 --- a/src/httprequest.rs +++ b/src/httprequest.rs @@ -109,12 +109,6 @@ impl HttpRequest { Ok(&self.cookies) } - /// Get a mutable reference to the Request headers. - #[inline] - pub fn headers_mut(&mut self) -> &mut HeaderMap { - &mut self.headers - } - /// Get a reference to the Params object. /// Params is a container for url parameters. /// Route supports glob patterns: * for a single wildcard segment and :param diff --git a/src/server.rs b/src/server.rs index ac78de020..88848649c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,33 +1,63 @@ use std::{io, mem, net}; use std::rc::Rc; use std::time::Duration; +use std::marker::PhantomData; use std::collections::VecDeque; use actix::dev::*; -use futures::{Future, Poll, Async}; +use futures::{Future, Poll, Async, Stream}; use tokio_core::reactor::Timeout; use tokio_core::net::{TcpListener, TcpStream}; +use tokio_io::{AsyncRead, AsyncWrite}; use task::{Task, RequestInfo}; use router::Router; use reader::{Reader, ReaderError}; /// An HTTP Server -pub struct HttpServer { +/// +/// `T` - async stream, anything that implements `AsyncRead` + `AsyncWrite`. +/// +/// `A` - peer address +pub struct HttpServer { router: Rc, + io: PhantomData, + addr: PhantomData, } -impl Actor for HttpServer { +impl Actor for HttpServer { type Context = Context; } -impl HttpServer { +impl HttpServer { /// Create new http server with specified `RoutingMap` pub fn new(router: Router) -> Self { - HttpServer {router: Rc::new(router)} + HttpServer {router: Rc::new(router), io: PhantomData, addr: PhantomData} } +} + +impl HttpServer + where T: AsyncRead + AsyncWrite + 'static, + A: 'static +{ + /// Start listening for incomming connections from stream. + pub fn serve_incoming(self, stream: S) -> io::Result + where Self: ActorAddress, + S: Stream + 'static + { + Ok(HttpServer::create(move |ctx| { + ctx.add_stream(stream); + self + })) + } +} + +impl HttpServer { /// Start listening for incomming connections. + /// + /// This methods converts address to list of `SocketAddr` + /// then binds to all available addresses. pub fn serve(self, addr: S) -> io::Result where Self: ActorAddress, S: net::ToSocketAddrs, @@ -59,17 +89,24 @@ impl HttpServer { } } -impl ResponseType<(TcpStream, net::SocketAddr)> for HttpServer { +impl ResponseType<(T, A)> for HttpServer + where T: AsyncRead + AsyncWrite + 'static, + A: 'static +{ type Item = (); type Error = (); } -impl StreamHandler<(TcpStream, net::SocketAddr), io::Error> for HttpServer {} +impl StreamHandler<(T, A), io::Error> for HttpServer + where T: AsyncRead + AsyncWrite + 'static, + A: 'static { +} -impl Handler<(TcpStream, net::SocketAddr), io::Error> for HttpServer { - - fn handle(&mut self, msg: (TcpStream, net::SocketAddr), _: &mut Context) - -> Response +impl Handler<(T, A), io::Error> for HttpServer + where T: AsyncRead + AsyncWrite + 'static, + A: 'static +{ + fn handle(&mut self, msg: (T, A), _: &mut Context) -> Response { Arbiter::handle().spawn( HttpChannel{router: Rc::clone(&self.router), @@ -98,11 +135,11 @@ struct Entry { const KEEPALIVE_PERIOD: u64 = 15; // seconds const MAX_PIPELINED_MESSAGES: usize = 16; -pub struct HttpChannel { +pub struct HttpChannel { router: Rc, #[allow(dead_code)] - addr: net::SocketAddr, - stream: TcpStream, + addr: A, + stream: T, reader: Reader, error: bool, items: VecDeque, @@ -111,17 +148,21 @@ pub struct HttpChannel { keepalive_timer: Option, } -impl Drop for HttpChannel { +impl Drop for HttpChannel { fn drop(&mut self) { println!("Drop http channel"); } } -impl Actor for HttpChannel { +impl Actor for HttpChannel + where T: AsyncRead + AsyncWrite + 'static, A: 'static +{ type Context = Context; } -impl Future for HttpChannel { +impl Future for HttpChannel + where T: AsyncRead + AsyncWrite + 'static, A: 'static +{ type Item = (); type Error = (); diff --git a/src/task.rs b/src/task.rs index fd1523bf7..e7f51be9c 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,5 +1,4 @@ use std::{cmp, io}; -use std::io::Write as IoWrite; use std::fmt::Write; use std::collections::VecDeque; @@ -8,7 +7,7 @@ use http::header::{HeaderValue, CONNECTION, CONTENT_TYPE, CONTENT_LENGTH, TRANSFER_ENCODING, DATE}; use bytes::BytesMut; use futures::{Async, Future, Poll, Stream}; -use tokio_core::net::TcpStream; +use tokio_io::{AsyncRead, AsyncWrite}; use date; use route::Frame; @@ -225,7 +224,9 @@ impl Task { msg.replace_body(body); } - pub(crate) fn poll_io(&mut self, io: &mut TcpStream, info: &RequestInfo) -> Poll { + pub(crate) fn poll_io(&mut self, io: &mut T, info: &RequestInfo) -> Poll + where T: AsyncRead + AsyncWrite + { trace!("POLL-IO frames:{:?}", self.frames.len()); // response is completed if self.frames.is_empty() && self.iostate.is_done() { diff --git a/tests/test_server.rs b/tests/test_server.rs new file mode 100644 index 000000000..5111f3e6d --- /dev/null +++ b/tests/test_server.rs @@ -0,0 +1,64 @@ +extern crate actix; +extern crate actix_web; +extern crate futures; +extern crate tokio_core; + +use std::net; +use std::str::FromStr; +use std::io::prelude::*; +use actix::*; +use actix_web::*; +use futures::Future; +use tokio_core::net::{TcpStream, TcpListener}; + + +fn create_server() -> HttpServer { + HttpServer::new( + RoutingMap::default() + .resource("/", |r| + r.handler(Method::GET, |_, _, _| { + httpcodes::HTTPOk + })) + .finish()) +} + +#[test] +fn test_serve() { + let sys = System::new("test"); + + let srv = create_server(); + srv.serve::<_, ()>("127.0.0.1:58902").unwrap(); + let addr = net::SocketAddr::from_str("127.0.0.1:58902").unwrap(); + + Arbiter::handle().spawn( + TcpStream::connect(&addr, Arbiter::handle()).and_then(|mut stream| { + let _ = stream.write("GET /\r\n\r\n ".as_ref()); + Arbiter::system().send(msgs::SystemExit(0)); + futures::future::ok(()) + }).map_err(|_| panic!("should not happen")) + ); + + sys.run(); +} + +#[test] +fn test_serve_incoming() { + let sys = System::new("test"); + + let srv = create_server(); + let addr = net::SocketAddr::from_str("127.0.0.1:58906").unwrap(); + let tcp = TcpListener::bind(&addr, Arbiter::handle()).unwrap(); + srv.serve_incoming::<_, ()>(tcp.incoming()).unwrap(); + let addr = net::SocketAddr::from_str("127.0.0.1:58906").unwrap(); + + // connect + Arbiter::handle().spawn( + TcpStream::connect(&addr, Arbiter::handle()).and_then(|mut stream| { + let _ = stream.write("GET /\r\n\r\n ".as_ref()); + Arbiter::system().send(msgs::SystemExit(0)); + futures::future::ok(()) + }).map_err(|_| panic!("should not happen")) + ); + + sys.run(); +}