1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-12 18:29:34 +00:00
actix-web/awc/src/connect.rs

198 lines
5.8 KiB
Rust
Raw Normal View History

use std::{
2021-02-28 18:17:08 +00:00
future::Future,
net,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
2019-03-28 01:53:19 +00:00
use actix_codec::Framed;
2021-12-04 19:40:47 +00:00
use actix_http::{h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead};
2019-03-26 04:58:01 +00:00
use actix_service::Service;
2021-02-28 18:17:08 +00:00
use futures_core::{future::LocalBoxFuture, ready};
2019-03-26 04:58:01 +00:00
2021-12-04 19:40:47 +00:00
use crate::{
any_body::AnyBody,
2023-07-17 01:38:12 +00:00
client::{Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError},
ClientResponse,
};
/// Reference-counted, type-erased connector service.
///
/// Accepts a [`ConnectRequest`] and resolves to a [`ConnectResponse`], failing with
/// [`SendRequestError`]. The future is a boxed `LocalBoxFuture` with a `'static` lifetime.
pub type BoxConnectorService = Rc<
dyn Service<
ConnectRequest,
Response = ConnectResponse,
Error = SendRequestError,
Future = LocalBoxFuture<'static, Result<ConnectResponse, SendRequestError>>,
>,
>;
/// Boxed trait object over any [`ConnectionIo`] implementation.
///
/// Used as the I/O type of the framed codec carried by [`ConnectResponse::Tunnel`].
pub type BoxedSocket = Box<dyn ConnectionIo>;
2022-03-08 16:51:40 +00:00
/// Combined HTTP and WebSocket request type received by connection service.
pub enum ConnectRequest {
2022-03-08 16:51:40 +00:00
/// Standard HTTP request.
///
/// Contains the request head, body type, and optional pre-resolved socket address.
2021-11-16 22:10:30 +00:00
Client(RequestHeadType, AnyBody, Option<net::SocketAddr>),
2022-03-08 16:51:40 +00:00
/// Tunnel used by WebSocket connection requests.
///
/// Contains the request head and optional pre-resolved socket address.
Tunnel(RequestHead, Option<net::SocketAddr>),
}
2022-03-08 16:51:40 +00:00
/// Combined HTTP response & WebSocket tunnel type returned from connection service.
pub enum ConnectResponse {
2022-03-08 16:51:40 +00:00
/// Standard HTTP response.
Client(ClientResponse),
2022-03-08 16:51:40 +00:00
/// Tunnel used for WebSocket communication.
///
/// Contains response head and framed HTTP/1.1 codec.
Tunnel(ResponseHead, Framed<BoxedSocket, ClientCodec>),
}
impl ConnectResponse {
2022-03-08 16:51:40 +00:00
/// Unwraps type into HTTP response.
///
/// # Panics
/// Panics if enum variant is not `Client`.
pub fn into_client_response(self) -> ClientResponse {
match self {
ConnectResponse::Client(res) => res,
2023-07-17 01:38:12 +00:00
_ => {
panic!("ClientResponse only reachable with ConnectResponse::ClientResponse variant")
}
}
}
2022-03-08 16:51:40 +00:00
/// Unwraps type into WebSocket tunnel response.
///
/// # Panics
/// Panics if enum variant is not `Tunnel`.
pub fn into_tunnel_response(self) -> (ResponseHead, Framed<BoxedSocket, ClientCodec>) {
match self {
ConnectResponse::Tunnel(head, framed) => (head, framed),
2023-07-17 01:38:12 +00:00
_ => {
panic!("TunnelResponse only reachable with ConnectResponse::TunnelResponse variant")
}
}
}
}
/// Connector service adapter wrapping an inner connection-establishing service `S`.
///
/// The wrapped service is what actually opens connections; this type drives it when it is
/// used as a `Service<ConnectRequest>`.
pub struct DefaultConnector<S> {
    // Inner service invoked to establish the connection for each request.
    connector: S,
}

impl<S> DefaultConnector<S> {
    /// Wraps `connector` without performing any I/O.
    pub(crate) fn new(connector: S) -> Self {
        Self { connector }
    }
}
impl<S, Io> Service<ConnectRequest> for DefaultConnector<S>
2019-03-26 04:58:01 +00:00
where
S: Service<ClientConnect, Error = ConnectError, Response = Connection<Io>>,
Io: ConnectionIo,
2019-03-26 04:58:01 +00:00
{
type Response = ConnectResponse;
type Error = SendRequestError;
type Future = ConnectRequestFuture<S::Future, Io>;
actix_service::forward_ready!(connector);
fn call(&self, req: ConnectRequest) -> Self::Future {
// connect to the host
let fut = match req {
ConnectRequest::Client(ref head, .., addr) => self.connector.call(ClientConnect {
uri: head.as_ref().uri.clone(),
addr,
}),
ConnectRequest::Tunnel(ref head, addr) => self.connector.call(ClientConnect {
uri: head.uri.clone(),
addr,
}),
};
2021-02-28 18:17:08 +00:00
ConnectRequestFuture::Connection {
fut,
req: Some(req),
}
}
}
2021-02-28 18:17:08 +00:00
pin_project_lite::pin_project! {
    #[project = ConnectRequestProj]
    /// Future returned by the default connector service.
    ///
    /// It is a small state machine: it starts in `Connection` while the connection is being
    /// established and then moves to either `Client` or `Tunnel`, depending on the kind of
    /// request that was parked in `req`.
    pub enum ConnectRequestFuture<Fut, Io>
    where
        Io: ConnectionIo
    {
        // Waiting for the connector future; `req` holds the request until it can be sent.
        Connection {
            #[pin]
            fut: Fut,
            req: Option<ConnectRequest>
        },
        // Standard HTTP request in flight; resolves to the response head and payload.
        Client {
            fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
        },
        // Tunnel being opened; resolves to the response head and the framed connection.
        Tunnel {
            fut: LocalBoxFuture<
                'static,
                Result<(ResponseHead, Framed<Connection<Io>, ClientCodec>), SendRequestError>,
            >,
        }
    }
}
impl<Fut, Io> Future for ConnectRequestFuture<Fut, Io>
2021-02-28 18:17:08 +00:00
where
Fut: Future<Output = Result<Connection<Io>, ConnectError>>,
Io: ConnectionIo,
2021-02-28 18:17:08 +00:00
{
type Output = Result<ConnectResponse, SendRequestError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project() {
ConnectRequestProj::Connection { fut, req } => {
let connection = ready!(fut.poll(cx))?;
let req = req.take().unwrap();
2022-03-08 16:51:40 +00:00
2021-02-28 18:17:08 +00:00
match req {
ConnectRequest::Client(head, body, ..) => {
// send request
let fut = ConnectRequestFuture::Client {
fut: connection.send_request(head, body),
};
2022-03-08 16:51:40 +00:00
self.set(fut);
2021-02-28 18:17:08 +00:00
}
2022-03-08 16:51:40 +00:00
2021-02-28 18:17:08 +00:00
ConnectRequest::Tunnel(head, ..) => {
// send request
let fut = ConnectRequestFuture::Tunnel {
fut: connection.open_tunnel(RequestHeadType::from(head)),
};
2022-03-08 16:51:40 +00:00
self.set(fut);
2021-02-28 18:17:08 +00:00
}
}
2022-03-08 16:51:40 +00:00
2021-02-28 18:17:08 +00:00
self.poll(cx)
}
2022-03-08 16:51:40 +00:00
2021-02-28 18:17:08 +00:00
ConnectRequestProj::Client { fut } => {
let (head, payload) = ready!(fut.as_mut().poll(cx))?;
Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new(
head, payload,
))))
}
2022-03-08 16:51:40 +00:00
2021-02-28 18:17:08 +00:00
ConnectRequestProj::Tunnel { fut } => {
let (head, framed) = ready!(fut.as_mut().poll(cx))?;
let framed = framed.into_map_io(|io| Box::new(io) as _);
2021-02-28 18:17:08 +00:00
Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed)))
}
}
}
2019-03-28 01:53:19 +00:00
}