mirror of
https://github.com/actix/actix-web.git
synced 2025-01-08 16:25:29 +00:00
simplify methods of awc::connect::Connect trait (#1941)
This commit is contained in:
parent
deafb7c8b8
commit
4c243cbf89
4 changed files with 49 additions and 165 deletions
|
@ -223,15 +223,3 @@ impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Writer<'a>(pub &'a mut BytesMut);
|
||||
|
||||
impl<'a> io::Write for Writer<'a> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.0.extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
}
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -337,7 +337,7 @@ impl MessageType for RequestHeadType {
|
|||
let head = self.as_ref();
|
||||
dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE);
|
||||
write!(
|
||||
Writer(dst),
|
||||
helpers::Writer(dst),
|
||||
"{} {} {}",
|
||||
head.method,
|
||||
head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"),
|
||||
|
@ -470,7 +470,7 @@ impl TransferEncoding {
|
|||
*eof = true;
|
||||
buf.extend_from_slice(b"0\r\n\r\n");
|
||||
} else {
|
||||
writeln!(Writer(buf), "{:X}\r", msg.len())
|
||||
writeln!(helpers::Writer(buf), "{:X}\r", msg.len())
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
|
||||
buf.reserve(msg.len() + 2);
|
||||
|
@ -520,18 +520,6 @@ impl TransferEncoding {
|
|||
}
|
||||
}
|
||||
|
||||
struct Writer<'a>(pub &'a mut BytesMut);
|
||||
|
||||
impl<'a> io::Write for Writer<'a> {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.0.extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
}
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
/// Callers must ensure that the given length matches given value length.
|
||||
unsafe fn write_data(value: &[u8], buf: *mut u8, len: usize) {
|
||||
|
|
|
@ -1,71 +1,39 @@
|
|||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, io, net};
|
||||
use std::{
|
||||
fmt, io, net,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf};
|
||||
use actix_http::body::Body;
|
||||
use actix_http::client::{
|
||||
Connect as ClientConnect, ConnectError, Connection, SendRequestError,
|
||||
use actix_http::{
|
||||
body::Body,
|
||||
client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError},
|
||||
h1::ClientCodec,
|
||||
RequestHead, RequestHeadType, ResponseHead,
|
||||
};
|
||||
use actix_http::h1::ClientCodec;
|
||||
use actix_http::http::HeaderMap;
|
||||
use actix_http::{RequestHead, RequestHeadType, ResponseHead};
|
||||
use actix_service::Service;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
||||
use crate::response::ClientResponse;
|
||||
|
||||
pub(crate) struct ConnectorWrapper<T>(pub T);
|
||||
|
||||
type TunnelResponse = (ResponseHead, Framed<BoxedSocket, ClientCodec>);
|
||||
|
||||
pub(crate) trait Connect {
|
||||
fn send_request(
|
||||
&self,
|
||||
head: RequestHead,
|
||||
head: RequestHeadType,
|
||||
body: Body,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>;
|
||||
|
||||
fn send_request_extra(
|
||||
&self,
|
||||
head: Rc<RequestHead>,
|
||||
extra_headers: Option<HeaderMap>,
|
||||
body: Body,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>>;
|
||||
) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>>;
|
||||
|
||||
/// Send request, returns Response and Framed
|
||||
fn open_tunnel(
|
||||
&self,
|
||||
head: RequestHead,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<
|
||||
Box<
|
||||
dyn Future<
|
||||
Output = Result<
|
||||
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
|
||||
SendRequestError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Send request and extra headers, returns Response and Framed
|
||||
fn open_tunnel_extra(
|
||||
&self,
|
||||
head: Rc<RequestHead>,
|
||||
extra_headers: Option<HeaderMap>,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<
|
||||
Box<
|
||||
dyn Future<
|
||||
Output = Result<
|
||||
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
|
||||
SendRequestError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>;
|
||||
) -> LocalBoxFuture<'static, Result<TunnelResponse, SendRequestError>>;
|
||||
}
|
||||
|
||||
impl<T> Connect for ConnectorWrapper<T>
|
||||
|
@ -79,13 +47,13 @@ where
|
|||
{
|
||||
fn send_request(
|
||||
&self,
|
||||
head: RequestHead,
|
||||
head: RequestHeadType,
|
||||
body: Body,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>> {
|
||||
) -> LocalBoxFuture<'static, Result<ClientResponse, SendRequestError>> {
|
||||
// connect to the host
|
||||
let fut = self.0.call(ClientConnect {
|
||||
uri: head.uri.clone(),
|
||||
uri: head.as_ref().uri.clone(),
|
||||
addr,
|
||||
});
|
||||
|
||||
|
@ -93,33 +61,7 @@ where
|
|||
let connection = fut.await?;
|
||||
|
||||
// send request
|
||||
connection
|
||||
.send_request(RequestHeadType::from(head), body)
|
||||
.await
|
||||
.map(|(head, payload)| ClientResponse::new(head, payload))
|
||||
})
|
||||
}
|
||||
|
||||
fn send_request_extra(
|
||||
&self,
|
||||
head: Rc<RequestHead>,
|
||||
extra_headers: Option<HeaderMap>,
|
||||
body: Body,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<Box<dyn Future<Output = Result<ClientResponse, SendRequestError>>>> {
|
||||
// connect to the host
|
||||
let fut = self.0.call(ClientConnect {
|
||||
uri: head.uri.clone(),
|
||||
addr,
|
||||
});
|
||||
|
||||
Box::pin(async move {
|
||||
let connection = fut.await?;
|
||||
|
||||
// send request
|
||||
let (head, payload) = connection
|
||||
.send_request(RequestHeadType::Rc(head, extra_headers), body)
|
||||
.await?;
|
||||
let (head, payload) = connection.send_request(head, body).await?;
|
||||
|
||||
Ok(ClientResponse::new(head, payload))
|
||||
})
|
||||
|
@ -129,16 +71,7 @@ where
|
|||
&self,
|
||||
head: RequestHead,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<
|
||||
Box<
|
||||
dyn Future<
|
||||
Output = Result<
|
||||
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
|
||||
SendRequestError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
> {
|
||||
) -> LocalBoxFuture<'static, Result<TunnelResponse, SendRequestError>> {
|
||||
// connect to the host
|
||||
let fut = self.0.call(ClientConnect {
|
||||
uri: head.uri.clone(),
|
||||
|
@ -156,40 +89,6 @@ where
|
|||
Ok((head, framed))
|
||||
})
|
||||
}
|
||||
|
||||
fn open_tunnel_extra(
|
||||
&self,
|
||||
head: Rc<RequestHead>,
|
||||
extra_headers: Option<HeaderMap>,
|
||||
addr: Option<net::SocketAddr>,
|
||||
) -> Pin<
|
||||
Box<
|
||||
dyn Future<
|
||||
Output = Result<
|
||||
(ResponseHead, Framed<BoxedSocket, ClientCodec>),
|
||||
SendRequestError,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
> {
|
||||
// connect to the host
|
||||
let fut = self.0.call(ClientConnect {
|
||||
uri: head.uri.clone(),
|
||||
addr,
|
||||
});
|
||||
|
||||
Box::pin(async move {
|
||||
let connection = fut.await?;
|
||||
|
||||
// send request
|
||||
let (head, framed) = connection
|
||||
.open_tunnel(RequestHeadType::Rc(head, extra_headers))
|
||||
.await?;
|
||||
|
||||
let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io))));
|
||||
Ok((head, framed))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
trait AsyncSocket {
|
||||
|
|
|
@ -1,21 +1,26 @@
|
|||
use std::future::Future;
|
||||
use std::net;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
future::Future,
|
||||
net,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use actix_http::{
|
||||
body::{Body, BodyStream},
|
||||
http::{
|
||||
header::{self, HeaderMap, HeaderName, IntoHeaderValue},
|
||||
Error as HttpError,
|
||||
},
|
||||
Error, RequestHead, RequestHeadType,
|
||||
};
|
||||
use actix_rt::time::{sleep, Sleep};
|
||||
use bytes::Bytes;
|
||||
use derive_more::From;
|
||||
use futures_core::Stream;
|
||||
use serde::Serialize;
|
||||
|
||||
use actix_http::body::{Body, BodyStream};
|
||||
use actix_http::http::header::{self, IntoHeaderValue};
|
||||
use actix_http::http::{Error as HttpError, HeaderMap, HeaderName};
|
||||
use actix_http::{Error, RequestHead};
|
||||
|
||||
#[cfg(feature = "compress")]
|
||||
use actix_http::encoding::Decoder;
|
||||
#[cfg(feature = "compress")]
|
||||
|
@ -184,12 +189,16 @@ impl RequestSender {
|
|||
B: Into<Body>,
|
||||
{
|
||||
let fut = match self {
|
||||
RequestSender::Owned(head) => {
|
||||
config.connector.send_request(head, body.into(), addr)
|
||||
}
|
||||
RequestSender::Rc(head, extra_headers) => config
|
||||
.connector
|
||||
.send_request_extra(head, extra_headers, body.into(), addr),
|
||||
RequestSender::Owned(head) => config.connector.send_request(
|
||||
RequestHeadType::Owned(head),
|
||||
body.into(),
|
||||
addr,
|
||||
),
|
||||
RequestSender::Rc(head, extra_headers) => config.connector.send_request(
|
||||
RequestHeadType::Rc(head, extra_headers),
|
||||
body.into(),
|
||||
addr,
|
||||
),
|
||||
};
|
||||
|
||||
SendClientRequest::new(fut, response_decompress, timeout.or(config.timeout))
|
||||
|
|
Loading…
Reference in a new issue