mirror of
https://github.com/actix/actix-web.git
synced 2025-01-10 17:25:36 +00:00
refactor server impl and add support for alpn http2 negotiation
This commit is contained in:
parent
32cefb8455
commit
d7d3d663e9
14 changed files with 240 additions and 136 deletions
12
Cargo.toml
12
Cargo.toml
|
@ -28,6 +28,9 @@ default = []
|
||||||
# tls
|
# tls
|
||||||
tls = ["native-tls", "tokio-tls"]
|
tls = ["native-tls", "tokio-tls"]
|
||||||
|
|
||||||
|
# openssl
|
||||||
|
alpn = ["openssl", "openssl/v102", "openssl/v110", "tokio-openssl"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
log = "0.3"
|
log = "0.3"
|
||||||
time = "0.1"
|
time = "0.1"
|
||||||
|
@ -50,10 +53,13 @@ tokio-core = "0.1"
|
||||||
|
|
||||||
h2 = { git = 'https://github.com/carllerche/h2' }
|
h2 = { git = 'https://github.com/carllerche/h2' }
|
||||||
|
|
||||||
# tls
|
# native-tls
|
||||||
native-tls = { version="0.1", optional = true }
|
native-tls = { version="0.1", optional = true }
|
||||||
tokio-tls = { version="0.1", optional = true }
|
tokio-tls = { version="0.1", optional = true }
|
||||||
|
|
||||||
|
# openssl
|
||||||
|
tokio-openssl = { version="0.1", optional = true }
|
||||||
|
|
||||||
[dependencies.actix]
|
[dependencies.actix]
|
||||||
version = ">=0.3.1"
|
version = ">=0.3.1"
|
||||||
#path = "../actix"
|
#path = "../actix"
|
||||||
|
@ -61,6 +67,10 @@ version = ">=0.3.1"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = []
|
features = []
|
||||||
|
|
||||||
|
[dependencies.openssl]
|
||||||
|
version = "0.9"
|
||||||
|
optional = true
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
env_logger = "0.4"
|
env_logger = "0.4"
|
||||||
reqwest = "0.8"
|
reqwest = "0.8"
|
||||||
|
|
19
README.md
19
README.md
|
@ -13,14 +13,25 @@ Actix web is licensed under the [Apache-2.0 license](http://opensource.org/licen
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
* HTTP/1 and HTTP/2 support
|
* HTTP/1 and HTTP/2
|
||||||
* Streaming and pipelining support
|
* Streaming and pipelining
|
||||||
* Keep-alive and slow requests support
|
* Keep-alive and slow requests handling
|
||||||
* [WebSockets support](https://actix.github.io/actix-web/actix_web/ws/index.html)
|
* [WebSockets](https://actix.github.io/actix-web/actix_web/ws/index.html)
|
||||||
* Configurable request routing
|
* Configurable request routing
|
||||||
* Multipart streams
|
* Multipart streams
|
||||||
* Middlewares
|
* Middlewares
|
||||||
|
|
||||||
|
## HTTP/2 Negotiation
|
||||||
|
|
||||||
|
To use http/2 protocol over tls without prior knowlage requires
|
||||||
|
[tls alpn]( (https://tools.ietf.org/html/rfc7301). At the moment only
|
||||||
|
rust-openssl supports alpn.
|
||||||
|
|
||||||
|
```toml
|
||||||
|
[dependencies]
|
||||||
|
actix-web = { git = "https://github.com/actix/actix-web", features=["alpn"] }
|
||||||
|
```
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
To use `actix-web`, add this to your `Cargo.toml`:
|
To use `actix-web`, add this to your `Cargo.toml`:
|
||||||
|
|
|
@ -11,4 +11,4 @@ path = "src/main.rs"
|
||||||
env_logger = "0.4"
|
env_logger = "0.4"
|
||||||
|
|
||||||
actix = "0.3.1"
|
actix = "0.3.1"
|
||||||
actix-web = { path = "../../", features=["tls"] }
|
actix-web = { path = "../../", features=["alpn"] }
|
||||||
|
|
|
@ -26,7 +26,7 @@ fn main() {
|
||||||
let mut file = File::open("identity.pfx").unwrap();
|
let mut file = File::open("identity.pfx").unwrap();
|
||||||
let mut pkcs12 = vec![];
|
let mut pkcs12 = vec![];
|
||||||
file.read_to_end(&mut pkcs12).unwrap();
|
file.read_to_end(&mut pkcs12).unwrap();
|
||||||
let pkcs12 = Pkcs12::from_der(&pkcs12, "12345").unwrap();
|
let pkcs12 = Pkcs12::from_der(&pkcs12).unwrap().parse("12345").unwrap();
|
||||||
|
|
||||||
HttpServer::new(
|
HttpServer::new(
|
||||||
Application::default("/")
|
Application::default("/")
|
||||||
|
|
|
@ -9,7 +9,7 @@ use resource::Resource;
|
||||||
use recognizer::{RouteRecognizer, check_pattern};
|
use recognizer::{RouteRecognizer, check_pattern};
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use httpresponse::HttpResponse;
|
use httpresponse::HttpResponse;
|
||||||
use server::HttpHandler;
|
use channel::HttpHandler;
|
||||||
|
|
||||||
|
|
||||||
/// Middleware definition
|
/// Middleware definition
|
||||||
|
|
98
src/channel.rs
Normal file
98
src/channel.rs
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use actix::dev::*;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::{Future, Poll, Async};
|
||||||
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
|
use h1;
|
||||||
|
use h2;
|
||||||
|
use task::Task;
|
||||||
|
use payload::Payload;
|
||||||
|
use httprequest::HttpRequest;
|
||||||
|
|
||||||
|
/// Low level http request handler
|
||||||
|
pub trait HttpHandler: 'static {
|
||||||
|
/// Http handler prefix
|
||||||
|
fn prefix(&self) -> &str;
|
||||||
|
/// Handle request
|
||||||
|
fn handle(&self, req: &mut HttpRequest, payload: Payload) -> Task;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum HttpProtocol<T, A, H>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: 'static
|
||||||
|
{
|
||||||
|
H1(h1::Http1<T, A, H>),
|
||||||
|
H2(h2::Http2<T, A, H>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HttpChannel<T, A, H>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: 'static
|
||||||
|
{
|
||||||
|
proto: Option<HttpProtocol<T, A, H>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, A, H> HttpChannel<T, A, H>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: HttpHandler + 'static
|
||||||
|
{
|
||||||
|
pub fn new(stream: T, addr: A, router: Rc<Vec<H>>, http2: bool) -> HttpChannel<T, A, H> {
|
||||||
|
if http2 {
|
||||||
|
HttpChannel {
|
||||||
|
proto: Some(HttpProtocol::H2(
|
||||||
|
h2::Http2::new(stream, addr, router, Bytes::new()))) }
|
||||||
|
} else {
|
||||||
|
HttpChannel {
|
||||||
|
proto: Some(HttpProtocol::H1(
|
||||||
|
h1::Http1::new(stream, addr, router))) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*impl<T: 'static, A: 'static, H: 'static> Drop for HttpChannel<T, A, H> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
println!("Drop http channel");
|
||||||
|
}
|
||||||
|
}*/
|
||||||
|
|
||||||
|
impl<T, A, H> Actor for HttpChannel<T, A, H>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: HttpHandler + 'static
|
||||||
|
{
|
||||||
|
type Context = Context<Self>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T, A, H> Future for HttpChannel<T, A, H>
|
||||||
|
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: HttpHandler + 'static
|
||||||
|
{
|
||||||
|
type Item = ();
|
||||||
|
type Error = ();
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
match self.proto {
|
||||||
|
Some(HttpProtocol::H1(ref mut h1)) => {
|
||||||
|
match h1.poll() {
|
||||||
|
Ok(Async::Ready(h1::Http1Result::Done)) =>
|
||||||
|
return Ok(Async::Ready(())),
|
||||||
|
Ok(Async::Ready(h1::Http1Result::Upgrade)) => (),
|
||||||
|
Ok(Async::NotReady) =>
|
||||||
|
return Ok(Async::NotReady),
|
||||||
|
Err(_) =>
|
||||||
|
return Err(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(HttpProtocol::H2(ref mut h2)) =>
|
||||||
|
return h2.poll(),
|
||||||
|
None => unreachable!(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// upgrade to h2
|
||||||
|
let proto = self.proto.take().unwrap();
|
||||||
|
match proto {
|
||||||
|
HttpProtocol::H1(h1) => {
|
||||||
|
let (stream, addr, router, buf) = h1.into_inner();
|
||||||
|
self.proto = Some(HttpProtocol::H2(h2::Http2::new(stream, addr, router, buf)));
|
||||||
|
self.poll()
|
||||||
|
}
|
||||||
|
_ => unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
14
src/h1.rs
14
src/h1.rs
|
@ -15,7 +15,7 @@ use tokio_core::reactor::Timeout;
|
||||||
use percent_encoding;
|
use percent_encoding;
|
||||||
|
|
||||||
use task::Task;
|
use task::Task;
|
||||||
use server::HttpHandler;
|
use channel::HttpHandler;
|
||||||
use error::ParseError;
|
use error::ParseError;
|
||||||
use httpcodes::HTTPNotFound;
|
use httpcodes::HTTPNotFound;
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
|
@ -75,7 +75,7 @@ impl<T, A, H> Http1<T, A, H>
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn into_inner(mut self) -> (T, A, Rc<Vec<H>>, Bytes) {
|
pub fn into_inner(mut self) -> (T, A, Rc<Vec<H>>, Bytes) {
|
||||||
(self.stream.into_inner(), self.addr, self.router, self.read_buf.freeze())
|
(self.stream.unwrap(), self.addr, self.router, self.read_buf.freeze())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn poll(&mut self) -> Poll<Http1Result, ()> {
|
pub fn poll(&mut self) -> Poll<Http1Result, ()> {
|
||||||
|
@ -114,7 +114,7 @@ impl<T, A, H> Http1<T, A, H>
|
||||||
if self.keepalive {
|
if self.keepalive {
|
||||||
self.keepalive = self.stream.keepalive();
|
self.keepalive = self.stream.keepalive();
|
||||||
}
|
}
|
||||||
self.stream = H1Writer::new(self.stream.into_inner());
|
self.stream = H1Writer::new(self.stream.unwrap());
|
||||||
|
|
||||||
item.eof = true;
|
item.eof = true;
|
||||||
if ready {
|
if ready {
|
||||||
|
@ -251,12 +251,12 @@ impl<T, A, H> Http1<T, A, H>
|
||||||
|
|
||||||
// check for parse error
|
// check for parse error
|
||||||
if self.tasks.is_empty() {
|
if self.tasks.is_empty() {
|
||||||
|
if self.h2 {
|
||||||
|
return Ok(Async::Ready(Http1Result::Upgrade))
|
||||||
|
}
|
||||||
if self.error || self.keepalive_timer.is_none() {
|
if self.error || self.keepalive_timer.is_none() {
|
||||||
return Ok(Async::Ready(Http1Result::Done))
|
return Ok(Async::Ready(Http1Result::Done))
|
||||||
}
|
}
|
||||||
else if self.h2 {
|
|
||||||
return Ok(Async::Ready(Http1Result::Upgrade))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if not_ready {
|
if not_ready {
|
||||||
|
@ -482,7 +482,7 @@ impl Reader {
|
||||||
if buf.is_empty() {
|
if buf.is_empty() {
|
||||||
return Ok(Message::NotReady);
|
return Ok(Message::NotReady);
|
||||||
}
|
}
|
||||||
if buf.len() >= 14 && &buf[..14] == &HTTP2_PREFACE[..] {
|
if buf.len() >= 14 && buf[..14] == HTTP2_PREFACE[..] {
|
||||||
return Ok(Message::Http2)
|
return Ok(Message::Http2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ impl<T: AsyncWrite> H1Writer<T> {
|
||||||
self.stream.as_mut().unwrap()
|
self.stream.as_mut().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn into_inner(&mut self) -> T {
|
pub fn unwrap(&mut self) -> T {
|
||||||
self.stream.take().unwrap()
|
self.stream.take().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,12 +90,11 @@ impl<T: AsyncWrite> H1Writer<T> {
|
||||||
return Ok(WriterState::Done)
|
return Ok(WriterState::Done)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) =>
|
Err(err) => return Err(err),
|
||||||
return Err(err),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Ok(WriterState::Done)
|
Ok(WriterState::Done)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,9 +224,9 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||||
return Ok(WriterState::Pause)
|
Ok(WriterState::Pause)
|
||||||
} else {
|
} else {
|
||||||
return Ok(WriterState::Done)
|
Ok(WriterState::Done)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,12 +235,10 @@ impl<T: AsyncWrite> Writer for H1Writer<T> {
|
||||||
//debug!("last payload item, but it is not EOF ");
|
//debug!("last payload item, but it is not EOF ");
|
||||||
Err(io::Error::new(io::ErrorKind::Other,
|
Err(io::Error::new(io::ErrorKind::Other,
|
||||||
"Last payload item, but eof is not reached"))
|
"Last payload item, but eof is not reached"))
|
||||||
|
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||||
|
Ok(WriterState::Pause)
|
||||||
} else {
|
} else {
|
||||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
Ok(WriterState::Done)
|
||||||
return Ok(WriterState::Pause)
|
|
||||||
} else {
|
|
||||||
return Ok(WriterState::Done)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +342,7 @@ impl Encoder {
|
||||||
true
|
true
|
||||||
},
|
},
|
||||||
Kind::Length(ref mut remaining) => {
|
Kind::Length(ref mut remaining) => {
|
||||||
return *remaining == 0
|
*remaining == 0
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@ use futures::{Async, Poll, Future, Stream};
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
|
||||||
use task::Task;
|
use task::Task;
|
||||||
use server::HttpHandler;
|
use channel::HttpHandler;
|
||||||
use httpcodes::HTTPNotFound;
|
use httpcodes::HTTPNotFound;
|
||||||
use httprequest::HttpRequest;
|
use httprequest::HttpRequest;
|
||||||
use payload::{Payload, PayloadError, PayloadSender};
|
use payload::{Payload, PayloadError, PayloadSender};
|
||||||
|
|
|
@ -77,15 +77,13 @@ impl H2Writer {
|
||||||
let bytes = self.buffer.split_to(cmp::min(cap, len));
|
let bytes = self.buffer.split_to(cmp::min(cap, len));
|
||||||
let eof = self.buffer.is_empty() && self.eof;
|
let eof = self.buffer.is_empty() && self.eof;
|
||||||
|
|
||||||
if let Err(_) = stream.send_data(bytes.freeze(), eof) {
|
if let Err(err) = stream.send_data(bytes.freeze(), eof) {
|
||||||
return Err(io::Error::new(io::ErrorKind::Other, ""))
|
return Err(io::Error::new(io::ErrorKind::Other, err))
|
||||||
|
} else if !self.buffer.is_empty() {
|
||||||
|
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
|
||||||
|
stream.reserve_capacity(cap);
|
||||||
} else {
|
} else {
|
||||||
if !self.buffer.is_empty() {
|
return Ok(WriterState::Done)
|
||||||
let cap = cmp::min(self.buffer.len(), CHUNK_SIZE);
|
|
||||||
stream.reserve_capacity(cap);
|
|
||||||
} else {
|
|
||||||
return Ok(WriterState::Done)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
@ -94,7 +92,7 @@ impl H2Writer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return Ok(WriterState::Done)
|
Ok(WriterState::Done)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,9 +198,9 @@ impl Writer for H2Writer {
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||||
return Ok(WriterState::Pause)
|
Ok(WriterState::Pause)
|
||||||
} else {
|
} else {
|
||||||
return Ok(WriterState::Done)
|
Ok(WriterState::Done)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -211,12 +209,10 @@ impl Writer for H2Writer {
|
||||||
if !self.encoder.encode_eof(&mut self.buffer) {
|
if !self.encoder.encode_eof(&mut self.buffer) {
|
||||||
Err(io::Error::new(io::ErrorKind::Other,
|
Err(io::Error::new(io::ErrorKind::Other,
|
||||||
"Last payload item, but eof is not reached"))
|
"Last payload item, but eof is not reached"))
|
||||||
|
} else if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
||||||
|
Ok(WriterState::Pause)
|
||||||
} else {
|
} else {
|
||||||
if self.buffer.len() > MAX_WRITE_BUFFER_SIZE {
|
Ok(WriterState::Done)
|
||||||
return Ok(WriterState::Pause)
|
|
||||||
} else {
|
|
||||||
return Ok(WriterState::Done)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,9 +284,7 @@ impl Encoder {
|
||||||
pub fn encode_eof(&mut self, _dst: &mut BytesMut) -> bool {
|
pub fn encode_eof(&mut self, _dst: &mut BytesMut) -> bool {
|
||||||
match self.kind {
|
match self.kind {
|
||||||
Kind::Eof => true,
|
Kind::Eof => true,
|
||||||
Kind::Length(ref mut remaining) => {
|
Kind::Length(ref mut remaining) => *remaining == 0
|
||||||
return *remaining == 0
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
10
src/lib.rs
10
src/lib.rs
|
@ -27,6 +27,11 @@ extern crate native_tls;
|
||||||
#[cfg(feature="tls")]
|
#[cfg(feature="tls")]
|
||||||
extern crate tokio_tls;
|
extern crate tokio_tls;
|
||||||
|
|
||||||
|
#[cfg(feature="openssl")]
|
||||||
|
extern crate openssl;
|
||||||
|
#[cfg(feature="openssl")]
|
||||||
|
extern crate tokio_openssl;
|
||||||
|
|
||||||
mod application;
|
mod application;
|
||||||
mod body;
|
mod body;
|
||||||
mod context;
|
mod context;
|
||||||
|
@ -42,6 +47,7 @@ mod route;
|
||||||
mod task;
|
mod task;
|
||||||
mod staticfiles;
|
mod staticfiles;
|
||||||
mod server;
|
mod server;
|
||||||
|
mod channel;
|
||||||
mod wsframe;
|
mod wsframe;
|
||||||
mod wsproto;
|
mod wsproto;
|
||||||
mod h1;
|
mod h1;
|
||||||
|
@ -65,6 +71,7 @@ pub use recognizer::{Params, RouteRecognizer};
|
||||||
pub use logger::Logger;
|
pub use logger::Logger;
|
||||||
pub use server::HttpServer;
|
pub use server::HttpServer;
|
||||||
pub use context::HttpContext;
|
pub use context::HttpContext;
|
||||||
|
pub use channel::HttpChannel;
|
||||||
pub use staticfiles::StaticFiles;
|
pub use staticfiles::StaticFiles;
|
||||||
|
|
||||||
// re-exports
|
// re-exports
|
||||||
|
@ -75,3 +82,6 @@ pub use http_range::{HttpRange, HttpRangeParseError};
|
||||||
|
|
||||||
#[cfg(feature="tls")]
|
#[cfg(feature="tls")]
|
||||||
pub use native_tls::Pkcs12;
|
pub use native_tls::Pkcs12;
|
||||||
|
|
||||||
|
#[cfg(feature="openssl")]
|
||||||
|
pub use openssl::pkcs12::Pkcs12;
|
||||||
|
|
|
@ -116,7 +116,7 @@ pub(crate) fn check_pattern(path: &str) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse(pattern: &str) -> String {
|
fn parse(pattern: &str) -> String {
|
||||||
const DEFAULT_PATTERN: &'static str = "[^/]+";
|
const DEFAULT_PATTERN: &str = "[^/]+";
|
||||||
|
|
||||||
let mut re = String::from("^/");
|
let mut re = String::from("^/");
|
||||||
let mut in_param = false;
|
let mut in_param = false;
|
||||||
|
|
154
src/server.rs
154
src/server.rs
|
@ -1,30 +1,26 @@
|
||||||
use std::{io, net, mem};
|
use std::{io, net};
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use actix::dev::*;
|
use actix::dev::*;
|
||||||
use futures::{Future, Poll, Async, Stream};
|
use futures::Stream;
|
||||||
use tokio_core::net::{TcpListener, TcpStream};
|
|
||||||
use tokio_io::{AsyncRead, AsyncWrite};
|
use tokio_io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio_core::net::{TcpListener, TcpStream};
|
||||||
|
|
||||||
#[cfg(feature="tls")]
|
#[cfg(feature="tls")]
|
||||||
use native_tls::TlsAcceptor;
|
use native_tls::TlsAcceptor;
|
||||||
#[cfg(feature="tls")]
|
#[cfg(feature="tls")]
|
||||||
use tokio_tls::{TlsStream, TlsAcceptorExt};
|
use tokio_tls::{TlsStream, TlsAcceptorExt};
|
||||||
|
|
||||||
use h1;
|
#[cfg(feature="alpn")]
|
||||||
use h2;
|
use openssl::ssl::{SslMethod, SslAcceptorBuilder};
|
||||||
use task::Task;
|
#[cfg(feature="alpn")]
|
||||||
use payload::Payload;
|
use openssl::pkcs12::ParsedPkcs12;
|
||||||
use httprequest::HttpRequest;
|
#[cfg(feature="alpn")]
|
||||||
|
use tokio_openssl::{SslStream, SslAcceptorExt};
|
||||||
|
|
||||||
|
use channel::{HttpChannel, HttpHandler};
|
||||||
|
|
||||||
/// Low level http request handler
|
|
||||||
pub trait HttpHandler: 'static {
|
|
||||||
/// Http handler prefix
|
|
||||||
fn prefix(&self) -> &str;
|
|
||||||
/// Handle request
|
|
||||||
fn handle(&self, req: &mut HttpRequest, payload: Payload) -> Task;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An HTTP Server
|
/// An HTTP Server
|
||||||
///
|
///
|
||||||
|
@ -66,7 +62,7 @@ impl<T, A, H> HttpServer<T, A, H>
|
||||||
S: Stream<Item=(T, A), Error=io::Error> + 'static
|
S: Stream<Item=(T, A), Error=io::Error> + 'static
|
||||||
{
|
{
|
||||||
Ok(HttpServer::create(move |ctx| {
|
Ok(HttpServer::create(move |ctx| {
|
||||||
ctx.add_stream(stream.map(|(t, a)| IoStream(t, a)));
|
ctx.add_stream(stream.map(|(t, a)| IoStream(t, a, false)));
|
||||||
self
|
self
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@ -111,7 +107,7 @@ impl<H: HttpHandler> HttpServer<TcpStream, net::SocketAddr, H> {
|
||||||
Ok(HttpServer::create(move |ctx| {
|
Ok(HttpServer::create(move |ctx| {
|
||||||
for (addr, tcp) in addrs {
|
for (addr, tcp) in addrs {
|
||||||
info!("Starting http server on {}", addr);
|
info!("Starting http server on {}", addr);
|
||||||
ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, a)));
|
ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, a, false)));
|
||||||
}
|
}
|
||||||
self
|
self
|
||||||
}))
|
}))
|
||||||
|
@ -161,7 +157,61 @@ impl<H: HttpHandler> HttpServer<TlsStream<TcpStream>, net::SocketAddr, H> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct IoStream<T, A>(T, A);
|
#[cfg(feature="alpn")]
|
||||||
|
impl<H: HttpHandler> HttpServer<SslStream<TcpStream>, net::SocketAddr, H> {
|
||||||
|
|
||||||
|
/// Start listening for incomming tls connections.
|
||||||
|
///
|
||||||
|
/// This methods converts address to list of `SocketAddr`
|
||||||
|
/// then binds to all available addresses.
|
||||||
|
pub fn serve_tls<S, Addr>(self, addr: S, identity: ParsedPkcs12) -> io::Result<Addr>
|
||||||
|
where Self: ActorAddress<Self, Addr>,
|
||||||
|
S: net::ToSocketAddrs,
|
||||||
|
{
|
||||||
|
let addrs = self.bind(addr)?;
|
||||||
|
let acceptor = match SslAcceptorBuilder::mozilla_intermediate(SslMethod::tls(),
|
||||||
|
&identity.pkey,
|
||||||
|
&identity.cert,
|
||||||
|
&identity.chain)
|
||||||
|
{
|
||||||
|
Ok(mut builder) => {
|
||||||
|
match builder.builder_mut().set_alpn_protocols(&[b"h2", b"http/1.1"]) {
|
||||||
|
Ok(_) => builder.build(),
|
||||||
|
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err))
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(HttpServer::create(move |ctx| {
|
||||||
|
for (addr, tcp) in addrs {
|
||||||
|
info!("Starting tls http server on {}", addr);
|
||||||
|
|
||||||
|
let acc = acceptor.clone();
|
||||||
|
ctx.add_stream(tcp.incoming().and_then(move |(stream, addr)| {
|
||||||
|
SslAcceptorExt::accept_async(&acc, stream)
|
||||||
|
.map(move |stream| {
|
||||||
|
let http2 = if let Some(p) =
|
||||||
|
stream.get_ref().ssl().selected_alpn_protocol()
|
||||||
|
{
|
||||||
|
p.len() == 2 && &p == b"h2"
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
IoStream(stream, addr, http2)
|
||||||
|
})
|
||||||
|
.map_err(|err| {
|
||||||
|
trace!("Error during handling tls connection: {}", err);
|
||||||
|
io::Error::new(io::ErrorKind::Other, err)
|
||||||
|
})
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct IoStream<T, A>(T, A, bool);
|
||||||
|
|
||||||
impl<T, A> ResponseType for IoStream<T, A>
|
impl<T, A> ResponseType for IoStream<T, A>
|
||||||
where T: AsyncRead + AsyncWrite + 'static,
|
where T: AsyncRead + AsyncWrite + 'static,
|
||||||
|
@ -189,73 +239,7 @@ impl<T, A, H> Handler<IoStream<T, A>, io::Error> for HttpServer<T, A, H>
|
||||||
-> Response<Self, IoStream<T, A>>
|
-> Response<Self, IoStream<T, A>>
|
||||||
{
|
{
|
||||||
Arbiter::handle().spawn(
|
Arbiter::handle().spawn(
|
||||||
HttpChannel{
|
HttpChannel::new(msg.0, msg.1, Rc::clone(&self.h), msg.2));
|
||||||
proto: Protocol::H1(h1::Http1::new(msg.0, msg.1, Rc::clone(&self.h)))
|
|
||||||
});
|
|
||||||
Self::empty()
|
Self::empty()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
enum Protocol<T, A, H>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: 'static
|
|
||||||
{
|
|
||||||
H1(h1::Http1<T, A, H>),
|
|
||||||
H2(h2::Http2<T, A, H>),
|
|
||||||
None,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct HttpChannel<T, A, H>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: 'static
|
|
||||||
{
|
|
||||||
proto: Protocol<T, A, H>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/*impl<T: 'static, A: 'static, H: 'static> Drop for HttpChannel<T, A, H> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
println!("Drop http channel");
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
|
|
||||||
impl<T, A, H> Actor for HttpChannel<T, A, H>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: HttpHandler + 'static
|
|
||||||
{
|
|
||||||
type Context = Context<Self>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T, A, H> Future for HttpChannel<T, A, H>
|
|
||||||
where T: AsyncRead + AsyncWrite + 'static, A: 'static, H: HttpHandler + 'static
|
|
||||||
{
|
|
||||||
type Item = ();
|
|
||||||
type Error = ();
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
match self.proto {
|
|
||||||
Protocol::H1(ref mut h1) => {
|
|
||||||
match h1.poll() {
|
|
||||||
Ok(Async::Ready(h1::Http1Result::Done)) =>
|
|
||||||
return Ok(Async::Ready(())),
|
|
||||||
Ok(Async::Ready(h1::Http1Result::Upgrade)) => (),
|
|
||||||
Ok(Async::NotReady) =>
|
|
||||||
return Ok(Async::NotReady),
|
|
||||||
Err(_) =>
|
|
||||||
return Err(()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Protocol::H2(ref mut h2) =>
|
|
||||||
return h2.poll(),
|
|
||||||
Protocol::None =>
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
|
|
||||||
// upgrade to h2
|
|
||||||
let proto = mem::replace(&mut self.proto, Protocol::None);
|
|
||||||
match proto {
|
|
||||||
Protocol::H1(h1) => {
|
|
||||||
let (stream, addr, router, buf) = h1.into_inner();
|
|
||||||
self.proto = Protocol::H2(h2::Http2::new(stream, addr, router, buf));
|
|
||||||
return self.poll()
|
|
||||||
}
|
|
||||||
_ => unreachable!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -80,11 +80,11 @@ use wsproto::*;
|
||||||
pub use wsproto::CloseCode;
|
pub use wsproto::CloseCode;
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
const SEC_WEBSOCKET_ACCEPT: &'static str = "SEC-WEBSOCKET-ACCEPT";
|
const SEC_WEBSOCKET_ACCEPT: &str = "SEC-WEBSOCKET-ACCEPT";
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
const SEC_WEBSOCKET_KEY: &'static str = "SEC-WEBSOCKET-KEY";
|
const SEC_WEBSOCKET_KEY: &str = "SEC-WEBSOCKET-KEY";
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
const SEC_WEBSOCKET_VERSION: &'static str = "SEC-WEBSOCKET-VERSION";
|
const SEC_WEBSOCKET_VERSION: &str = "SEC-WEBSOCKET-VERSION";
|
||||||
// #[doc(hidden)]
|
// #[doc(hidden)]
|
||||||
// const SEC_WEBSOCKET_PROTOCOL: &'static str = "SEC-WEBSOCKET-PROTOCOL";
|
// const SEC_WEBSOCKET_PROTOCOL: &'static str = "SEC-WEBSOCKET-PROTOCOL";
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue