1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-02 21:39:26 +00:00

make HttpServer generic over incoming stream

This commit is contained in:
Nikolay Kim 2017-10-16 13:13:32 -07:00
parent 2e96a79969
commit 35107f64e7
4 changed files with 126 additions and 26 deletions

View file

@ -109,12 +109,6 @@ impl HttpRequest {
Ok(&self.cookies)
}
/// Get a mutable reference to the Request headers.
#[inline]
pub fn headers_mut(&mut self) -> &mut HeaderMap {
&mut self.headers
}
/// Get a reference to the Params object.
/// Params is a container for url parameters.
/// Route supports glob patterns: * for a single wildcard segment and :param

View file

@ -1,33 +1,63 @@
use std::{io, mem, net};
use std::rc::Rc;
use std::time::Duration;
use std::marker::PhantomData;
use std::collections::VecDeque;
use actix::dev::*;
use futures::{Future, Poll, Async};
use futures::{Future, Poll, Async, Stream};
use tokio_core::reactor::Timeout;
use tokio_core::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead, AsyncWrite};
use task::{Task, RequestInfo};
use router::Router;
use reader::{Reader, ReaderError};
/// An HTTP Server
pub struct HttpServer {
///
/// `T` - async stream, anything that implements `AsyncRead` + `AsyncWrite`.
///
/// `A` - peer address
pub struct HttpServer<T, A> {
router: Rc<Router>,
io: PhantomData<T>,
addr: PhantomData<A>,
}
impl Actor for HttpServer {
impl<T: 'static, A: 'static> Actor for HttpServer<T, A> {
type Context = Context<Self>;
}
impl HttpServer {
impl<T, A> HttpServer<T, A> {
/// Create new http server with specified `RoutingMap`
pub fn new(router: Router) -> Self {
HttpServer {router: Rc::new(router)}
HttpServer {router: Rc::new(router), io: PhantomData, addr: PhantomData}
}
}
impl<T, A> HttpServer<T, A>
where T: AsyncRead + AsyncWrite + 'static,
A: 'static
{
/// Start listening for incomming connections from stream.
pub fn serve_incoming<S, Addr>(self, stream: S) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>,
S: Stream<Item=(T, A), Error=io::Error> + 'static
{
Ok(HttpServer::create(move |ctx| {
ctx.add_stream(stream);
self
}))
}
}
impl HttpServer<TcpStream, net::SocketAddr> {
/// Start listening for incomming connections.
///
/// This methods converts address to list of `SocketAddr`
/// then binds to all available addresses.
pub fn serve<S, Addr>(self, addr: S) -> io::Result<Addr>
where Self: ActorAddress<Self, Addr>,
S: net::ToSocketAddrs,
@ -59,17 +89,24 @@ impl HttpServer {
}
}
impl ResponseType<(TcpStream, net::SocketAddr)> for HttpServer {
impl<T, A> ResponseType<(T, A)> for HttpServer<T, A>
where T: AsyncRead + AsyncWrite + 'static,
A: 'static
{
type Item = ();
type Error = ();
}
impl StreamHandler<(TcpStream, net::SocketAddr), io::Error> for HttpServer {}
impl<T, A> StreamHandler<(T, A), io::Error> for HttpServer<T, A>
where T: AsyncRead + AsyncWrite + 'static,
A: 'static {
}
impl Handler<(TcpStream, net::SocketAddr), io::Error> for HttpServer {
fn handle(&mut self, msg: (TcpStream, net::SocketAddr), _: &mut Context<Self>)
-> Response<Self, (TcpStream, net::SocketAddr)>
impl<T, A> Handler<(T, A), io::Error> for HttpServer<T, A>
where T: AsyncRead + AsyncWrite + 'static,
A: 'static
{
fn handle(&mut self, msg: (T, A), _: &mut Context<Self>) -> Response<Self, (T, A)>
{
Arbiter::handle().spawn(
HttpChannel{router: Rc::clone(&self.router),
@ -98,11 +135,11 @@ struct Entry {
const KEEPALIVE_PERIOD: u64 = 15; // seconds
const MAX_PIPELINED_MESSAGES: usize = 16;
pub struct HttpChannel {
pub struct HttpChannel<T: 'static, A: 'static> {
router: Rc<Router>,
#[allow(dead_code)]
addr: net::SocketAddr,
stream: TcpStream,
addr: A,
stream: T,
reader: Reader,
error: bool,
items: VecDeque<Entry>,
@ -111,17 +148,21 @@ pub struct HttpChannel {
keepalive_timer: Option<Timeout>,
}
impl Drop for HttpChannel {
impl<T: 'static, A: 'static> Drop for HttpChannel<T, A> {
fn drop(&mut self) {
println!("Drop http channel");
}
}
impl Actor for HttpChannel {
impl<T, A> Actor for HttpChannel<T, A>
where T: AsyncRead + AsyncWrite + 'static, A: 'static
{
type Context = Context<Self>;
}
impl Future for HttpChannel {
impl<T, A> Future for HttpChannel<T, A>
where T: AsyncRead + AsyncWrite + 'static, A: 'static
{
type Item = ();
type Error = ();

View file

@ -1,5 +1,4 @@
use std::{cmp, io};
use std::io::Write as IoWrite;
use std::fmt::Write;
use std::collections::VecDeque;
@ -8,7 +7,7 @@ use http::header::{HeaderValue,
CONNECTION, CONTENT_TYPE, CONTENT_LENGTH, TRANSFER_ENCODING, DATE};
use bytes::BytesMut;
use futures::{Async, Future, Poll, Stream};
use tokio_core::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
use date;
use route::Frame;
@ -225,7 +224,9 @@ impl Task {
msg.replace_body(body);
}
pub(crate) fn poll_io(&mut self, io: &mut TcpStream, info: &RequestInfo) -> Poll<bool, ()> {
pub(crate) fn poll_io<T>(&mut self, io: &mut T, info: &RequestInfo) -> Poll<bool, ()>
where T: AsyncRead + AsyncWrite
{
trace!("POLL-IO frames:{:?}", self.frames.len());
// response is completed
if self.frames.is_empty() && self.iostate.is_done() {

64
tests/test_server.rs Normal file
View file

@ -0,0 +1,64 @@
extern crate actix;
extern crate actix_web;
extern crate futures;
extern crate tokio_core;
use std::net;
use std::str::FromStr;
use std::io::prelude::*;
use actix::*;
use actix_web::*;
use futures::Future;
use tokio_core::net::{TcpStream, TcpListener};
fn create_server<T, A>() -> HttpServer<T, A> {
HttpServer::new(
RoutingMap::default()
.resource("/", |r|
r.handler(Method::GET, |_, _, _| {
httpcodes::HTTPOk
}))
.finish())
}
#[test]
fn test_serve() {
let sys = System::new("test");
let srv = create_server();
srv.serve::<_, ()>("127.0.0.1:58902").unwrap();
let addr = net::SocketAddr::from_str("127.0.0.1:58902").unwrap();
Arbiter::handle().spawn(
TcpStream::connect(&addr, Arbiter::handle()).and_then(|mut stream| {
let _ = stream.write("GET /\r\n\r\n ".as_ref());
Arbiter::system().send(msgs::SystemExit(0));
futures::future::ok(())
}).map_err(|_| panic!("should not happen"))
);
sys.run();
}
#[test]
fn test_serve_incoming() {
let sys = System::new("test");
let srv = create_server();
let addr = net::SocketAddr::from_str("127.0.0.1:58906").unwrap();
let tcp = TcpListener::bind(&addr, Arbiter::handle()).unwrap();
srv.serve_incoming::<_, ()>(tcp.incoming()).unwrap();
let addr = net::SocketAddr::from_str("127.0.0.1:58906").unwrap();
// connect
Arbiter::handle().spawn(
TcpStream::connect(&addr, Arbiter::handle()).and_then(|mut stream| {
let _ = stream.write("GET /\r\n\r\n ".as_ref());
Arbiter::system().send(msgs::SystemExit(0));
futures::future::ok(())
}).map_err(|_| panic!("should not happen"))
);
sys.run();
}