1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-10 17:29:36 +00:00
actix-web/actix-http/src/h2/mod.rs

101 lines
2.5 KiB
Rust
Raw Normal View History

2021-02-11 22:39:54 +00:00
//! HTTP/2 protocol.
2021-01-04 00:49:02 +00:00
use std::{
future::Future,
2021-01-04 00:49:02 +00:00
pin::Pin,
task::{Context, Poll},
};
2019-02-02 04:18:44 +00:00
use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::Sleep;
2019-02-06 19:44:15 +00:00
use bytes::Bytes;
2021-01-04 00:49:02 +00:00
use futures_core::{ready, Stream};
use h2::{
server::{handshake, Connection, Handshake},
RecvStream,
};
2019-02-06 19:44:15 +00:00
mod dispatcher;
2019-02-02 04:18:44 +00:00
mod service;
2019-03-07 06:56:34 +00:00
pub use self::dispatcher::Dispatcher;
2019-02-06 19:44:15 +00:00
pub use self::service::H2Service;
use crate::{
config::ServiceConfig,
error::{DispatchError, PayloadError},
};
2019-02-06 19:44:15 +00:00
2021-01-04 00:49:02 +00:00
/// HTTP/2 peer stream.
2019-02-06 19:44:15 +00:00
pub struct Payload {
2021-01-04 00:49:02 +00:00
stream: RecvStream,
2019-02-06 19:44:15 +00:00
}
impl Payload {
2021-01-04 00:49:02 +00:00
pub(crate) fn new(stream: RecvStream) -> Self {
Self { stream }
2019-02-06 19:44:15 +00:00
}
}
impl Stream for Payload {
type Item = Result<Bytes, PayloadError>;
2021-12-08 06:01:11 +00:00
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
2019-02-06 19:44:15 +00:00
2021-01-04 00:49:02 +00:00
match ready!(Pin::new(&mut this.stream).poll_data(cx)) {
Some(Ok(chunk)) => {
2019-02-06 19:44:15 +00:00
let len = chunk.len();
2021-01-04 00:49:02 +00:00
match this.stream.flow_control().release_capacity(len) {
Ok(()) => Poll::Ready(Some(Ok(chunk))),
Err(err) => Poll::Ready(Some(Err(err.into()))),
2019-02-06 19:44:15 +00:00
}
}
2021-01-04 00:49:02 +00:00
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
None => Poll::Ready(None),
2019-02-06 19:44:15 +00:00
}
}
}
pub(crate) fn handshake_with_timeout<T>(
io: T,
config: &ServiceConfig,
) -> HandshakeWithTimeout<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
HandshakeWithTimeout {
handshake: handshake(io),
timer: config.client_timer().map(Box::pin),
}
}
pub(crate) struct HandshakeWithTimeout<T: AsyncRead + AsyncWrite + Unpin> {
handshake: Handshake<T>,
timer: Option<Pin<Box<Sleep>>>,
}
impl<T> Future for HandshakeWithTimeout<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Output = Result<(Connection<T, Bytes>, Option<Pin<Box<Sleep>>>), DispatchError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match Pin::new(&mut this.handshake).poll(cx)? {
// return the timer on success handshake. It can be re-used for h2 ping-pong.
Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
Poll::Pending => match this.timer.as_mut() {
Some(timer) => {
ready!(timer.as_mut().poll(cx));
Poll::Ready(Err(DispatchError::SlowRequestTimeout))
}
None => Poll::Pending,
},
}
}
}