1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-12 02:09:36 +00:00
actix-web/actix-http/src/h2/service.rs

234 lines
6.5 KiB
Rust
Raw Normal View History

2019-02-02 04:18:44 +00:00
use std::fmt::Debug;
use std::marker::PhantomData;
2019-02-06 19:44:15 +00:00
use std::{io, net};
2019-02-02 04:18:44 +00:00
use actix_codec::{AsyncRead, AsyncWrite, Framed};
2019-03-11 22:09:42 +00:00
use actix_server_config::{Io, ServerConfig as SrvConfig};
2019-02-02 04:18:44 +00:00
use actix_service::{IntoNewService, NewService, Service};
use actix_utils::cloneable::CloneableService;
2019-02-02 04:18:44 +00:00
use bytes::Bytes;
use futures::future::{ok, FutureResult};
2019-03-05 04:46:33 +00:00
use futures::{try_ready, Async, Future, IntoFuture, Poll, Stream};
2019-02-02 04:18:44 +00:00
use h2::server::{self, Connection, Handshake};
2019-02-06 19:44:15 +00:00
use h2::RecvStream;
2019-02-02 04:18:44 +00:00
use log::error;
use crate::body::MessageBody;
use crate::config::{KeepAlive, ServiceConfig};
2019-02-06 19:44:15 +00:00
use crate::error::{DispatchError, Error, ParseError, ResponseError};
2019-02-07 21:41:50 +00:00
use crate::payload::Payload;
2019-02-02 04:18:44 +00:00
use crate::request::Request;
use crate::response::Response;
2019-02-06 19:44:15 +00:00
use super::dispatcher::Dispatcher;
2019-02-02 04:18:44 +00:00
/// `NewService` implementation for HTTP2 transport
2019-03-11 22:09:42 +00:00
pub struct H2Service<T, P, S, B> {
2019-02-02 04:18:44 +00:00
srv: S,
cfg: ServiceConfig,
2019-03-11 22:09:42 +00:00
_t: PhantomData<(T, P, B)>,
2019-02-02 04:18:44 +00:00
}
2019-03-11 22:09:42 +00:00
impl<T, P, S, B> H2Service<T, P, S, B>
2019-02-02 04:18:44 +00:00
where
S: NewService<SrvConfig, Request = Request>,
2019-04-05 23:46:44 +00:00
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
2019-04-04 17:59:34 +00:00
<S::Service as Service>::Future: 'static,
2019-02-06 19:44:15 +00:00
B: MessageBody + 'static,
2019-02-02 04:18:44 +00:00
{
/// Create new `HttpService` instance.
pub fn new<F: IntoNewService<S, SrvConfig>>(service: F) -> Self {
2019-02-02 04:18:44 +00:00
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0);
H2Service {
cfg,
srv: service.into_new_service(),
_t: PhantomData,
}
}
/// Create new `HttpService` instance with config.
pub fn with_config<F: IntoNewService<S, SrvConfig>>(
cfg: ServiceConfig,
service: F,
) -> Self {
H2Service {
cfg,
srv: service.into_new_service(),
_t: PhantomData,
}
}
2019-02-02 04:18:44 +00:00
}
2019-03-11 22:09:42 +00:00
impl<T, P, S, B> NewService<SrvConfig> for H2Service<T, P, S, B>
2019-02-02 04:18:44 +00:00
where
T: AsyncRead + AsyncWrite,
S: NewService<SrvConfig, Request = Request>,
2019-04-05 23:46:44 +00:00
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
2019-04-04 17:59:34 +00:00
<S::Service as Service>::Future: 'static,
2019-02-06 19:44:15 +00:00
B: MessageBody + 'static,
2019-02-02 04:18:44 +00:00
{
2019-03-11 22:09:42 +00:00
type Request = Io<T, P>;
2019-02-06 19:44:15 +00:00
type Response = ();
2019-03-07 06:56:34 +00:00
type Error = DispatchError;
2019-02-02 04:18:44 +00:00
type InitError = S::InitError;
2019-03-11 22:09:42 +00:00
type Service = H2ServiceHandler<T, P, S::Service, B>;
type Future = H2ServiceResponse<T, P, S, B>;
2019-02-02 04:18:44 +00:00
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
2019-02-02 04:18:44 +00:00
H2ServiceResponse {
fut: self.srv.new_service(cfg).into_future(),
2019-02-02 04:18:44 +00:00
cfg: Some(self.cfg.clone()),
_t: PhantomData,
}
}
}
#[doc(hidden)]
2019-03-11 22:09:42 +00:00
pub struct H2ServiceResponse<T, P, S: NewService<SrvConfig, Request = Request>, B> {
2019-03-05 04:46:33 +00:00
fut: <S::Future as IntoFuture>::Future,
2019-02-02 04:18:44 +00:00
cfg: Option<ServiceConfig>,
2019-03-11 22:09:42 +00:00
_t: PhantomData<(T, P, B)>,
2019-02-02 04:18:44 +00:00
}
2019-03-11 22:09:42 +00:00
impl<T, P, S, B> Future for H2ServiceResponse<T, P, S, B>
2019-02-02 04:18:44 +00:00
where
T: AsyncRead + AsyncWrite,
S: NewService<SrvConfig, Request = Request>,
2019-04-05 23:46:44 +00:00
S::Error: Into<Error>,
2019-04-04 17:59:34 +00:00
S::Response: Into<Response<B>>,
<S::Service as Service>::Future: 'static,
2019-02-06 19:44:15 +00:00
B: MessageBody + 'static,
2019-02-02 04:18:44 +00:00
{
2019-03-11 22:09:42 +00:00
type Item = H2ServiceHandler<T, P, S::Service, B>;
2019-02-02 04:18:44 +00:00
type Error = S::InitError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let service = try_ready!(self.fut.poll());
Ok(Async::Ready(H2ServiceHandler::new(
self.cfg.take().unwrap(),
service,
)))
}
}
/// `Service` implementation for http/2 transport
2019-04-04 17:59:34 +00:00
pub struct H2ServiceHandler<T, P, S, B> {
srv: CloneableService<S>,
2019-02-02 04:18:44 +00:00
cfg: ServiceConfig,
2019-03-11 22:09:42 +00:00
_t: PhantomData<(T, P, B)>,
2019-02-02 04:18:44 +00:00
}
2019-03-11 22:09:42 +00:00
impl<T, P, S, B> H2ServiceHandler<T, P, S, B>
2019-02-02 04:18:44 +00:00
where
2019-04-04 17:59:34 +00:00
S: Service<Request = Request>,
2019-04-05 23:46:44 +00:00
S::Error: Into<Error>,
2019-04-04 17:59:34 +00:00
S::Future: 'static,
S::Response: Into<Response<B>>,
2019-02-06 19:44:15 +00:00
B: MessageBody + 'static,
2019-02-02 04:18:44 +00:00
{
2019-03-11 22:09:42 +00:00
fn new(cfg: ServiceConfig, srv: S) -> H2ServiceHandler<T, P, S, B> {
2019-02-02 04:18:44 +00:00
H2ServiceHandler {
cfg,
srv: CloneableService::new(srv),
2019-02-02 04:18:44 +00:00
_t: PhantomData,
}
}
}
2019-03-11 22:09:42 +00:00
impl<T, P, S, B> Service for H2ServiceHandler<T, P, S, B>
2019-02-02 04:18:44 +00:00
where
T: AsyncRead + AsyncWrite,
2019-04-04 17:59:34 +00:00
S: Service<Request = Request>,
2019-04-05 23:46:44 +00:00
S::Error: Into<Error>,
2019-04-04 17:59:34 +00:00
S::Future: 'static,
S::Response: Into<Response<B>>,
2019-02-06 19:44:15 +00:00
B: MessageBody + 'static,
2019-02-02 04:18:44 +00:00
{
2019-03-11 22:09:42 +00:00
type Request = Io<T, P>;
2019-02-06 19:44:15 +00:00
type Response = ();
2019-03-07 06:56:34 +00:00
type Error = DispatchError;
2019-02-02 04:18:44 +00:00
type Future = H2ServiceHandlerResponse<T, S, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
2019-02-06 19:44:15 +00:00
self.srv.poll_ready().map_err(|e| {
2019-04-05 23:46:44 +00:00
let e = e.into();
2019-02-06 19:44:15 +00:00
error!("Service readiness error: {:?}", e);
2019-04-05 23:46:44 +00:00
DispatchError::Service(e)
2019-02-06 19:44:15 +00:00
})
2019-02-02 04:18:44 +00:00
}
2019-03-11 22:09:42 +00:00
fn call(&mut self, req: Self::Request) -> Self::Future {
2019-02-02 04:18:44 +00:00
H2ServiceHandlerResponse {
2019-02-06 19:44:15 +00:00
state: State::Handshake(
Some(self.srv.clone()),
Some(self.cfg.clone()),
2019-03-11 22:09:42 +00:00
server::handshake(req.into_parts().0),
2019-02-06 19:44:15 +00:00
),
2019-02-02 04:18:44 +00:00
}
}
}
2019-04-04 17:59:34 +00:00
enum State<T: AsyncRead + AsyncWrite, S: Service<Request = Request>, B: MessageBody>
where
S::Future: 'static,
{
2019-02-06 19:44:15 +00:00
Incoming(Dispatcher<T, S, B>),
Handshake(
Option<CloneableService<S>>,
Option<ServiceConfig>,
Handshake<T, Bytes>,
),
2019-02-02 04:18:44 +00:00
}
pub struct H2ServiceHandlerResponse<T, S, B>
where
T: AsyncRead + AsyncWrite,
2019-04-04 17:59:34 +00:00
S: Service<Request = Request>,
2019-04-05 23:46:44 +00:00
S::Error: Into<Error>,
2019-04-04 17:59:34 +00:00
S::Future: 'static,
S::Response: Into<Response<B>>,
2019-02-06 19:44:15 +00:00
B: MessageBody + 'static,
2019-02-02 04:18:44 +00:00
{
2019-02-06 19:44:15 +00:00
state: State<T, S, B>,
2019-02-02 04:18:44 +00:00
}
impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B>
where
T: AsyncRead + AsyncWrite,
2019-04-04 17:59:34 +00:00
S: Service<Request = Request>,
2019-04-05 23:46:44 +00:00
S::Error: Into<Error>,
2019-04-04 17:59:34 +00:00
S::Future: 'static,
S::Response: Into<Response<B>>,
2019-02-02 04:18:44 +00:00
B: MessageBody,
{
2019-02-06 19:44:15 +00:00
type Item = ();
2019-03-07 06:56:34 +00:00
type Error = DispatchError;
2019-02-02 04:18:44 +00:00
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
2019-02-06 19:44:15 +00:00
match self.state {
State::Incoming(ref mut disp) => disp.poll(),
State::Handshake(ref mut srv, ref mut config, ref mut handshake) => {
match handshake.poll() {
Ok(Async::Ready(conn)) => {
self.state = State::Incoming(Dispatcher::new(
srv.take().unwrap(),
conn,
config.take().unwrap(),
None,
));
self.poll()
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
trace!("H2 handshake error: {}", err);
2019-03-30 04:13:39 +00:00
Err(err.into())
2019-02-06 19:44:15 +00:00
}
}
}
}
2019-02-02 04:18:44 +00:00
}
}