diff --git a/actix-http/src/h1/client.rs b/actix-http/src/h1/client.rs index 2e0103409..4a6104688 100644 --- a/actix-http/src/h1/client.rs +++ b/actix-http/src/h1/client.rs @@ -223,15 +223,3 @@ impl Encoder> for ClientCodec { Ok(()) } } - -pub struct Writer<'a>(pub &'a mut BytesMut); - -impl<'a> io::Write for Writer<'a> { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index bb89905fb..bd8287b26 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -337,7 +337,7 @@ impl MessageType for RequestHeadType { let head = self.as_ref(); dst.reserve(256 + head.headers.len() * AVERAGE_HEADER_SIZE); write!( - Writer(dst), + helpers::Writer(dst), "{} {} {}", head.method, head.uri.path_and_query().map(|u| u.as_str()).unwrap_or("/"), @@ -470,7 +470,7 @@ impl TransferEncoding { *eof = true; buf.extend_from_slice(b"0\r\n\r\n"); } else { - writeln!(Writer(buf), "{:X}\r", msg.len()) + writeln!(helpers::Writer(buf), "{:X}\r", msg.len()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; buf.reserve(msg.len() + 2); @@ -520,18 +520,6 @@ impl TransferEncoding { } } -struct Writer<'a>(pub &'a mut BytesMut); - -impl<'a> io::Write for Writer<'a> { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - /// # Safety /// Callers must ensure that the given length matches given value length. unsafe fn write_data(value: &[u8], buf: *mut u8, len: usize) { diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 2ffb8ec37..5450550a7 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -1,71 +1,39 @@ -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::{fmt, io, net}; +use std::{ + fmt, io, net, + pin::Pin, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite, Framed, ReadBuf}; -use actix_http::body::Body; -use actix_http::client::{ - Connect as ClientConnect, ConnectError, Connection, SendRequestError, +use actix_http::{ + body::Body, + client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError}, + h1::ClientCodec, + RequestHead, RequestHeadType, ResponseHead, }; -use actix_http::h1::ClientCodec; -use actix_http::http::HeaderMap; -use actix_http::{RequestHead, RequestHeadType, ResponseHead}; use actix_service::Service; +use futures_core::future::LocalBoxFuture; use crate::response::ClientResponse; pub(crate) struct ConnectorWrapper(pub T); +type TunnelResponse = (ResponseHead, Framed); + pub(crate) trait Connect { fn send_request( &self, - head: RequestHead, + head: RequestHeadType, body: Body, addr: Option, - ) -> Pin>>>; - - fn send_request_extra( - &self, - head: Rc, - extra_headers: Option, - body: Body, - addr: Option, - ) -> Pin>>>; + ) -> LocalBoxFuture<'static, Result>; /// Send request, returns Response and Framed fn open_tunnel( &self, head: RequestHead, addr: Option, - ) -> Pin< - Box< - dyn Future< - Output = Result< - (ResponseHead, Framed), - SendRequestError, - >, - >, - >, - >; - - /// Send request and extra headers, returns Response and Framed - fn open_tunnel_extra( - &self, - head: Rc, - extra_headers: Option, - addr: Option, - ) -> Pin< - Box< - dyn Future< - Output = Result< - (ResponseHead, Framed), - SendRequestError, - >, - >, - >, - >; + ) -> LocalBoxFuture<'static, Result>; } impl Connect for ConnectorWrapper @@ -79,13 +47,13 @@ where { fn send_request( &self, - head: RequestHead, + head: RequestHeadType, body: Body, addr: Option, - ) -> Pin>>> { + ) -> LocalBoxFuture<'static, Result> { // connect to the host let fut = self.0.call(ClientConnect { - uri: head.uri.clone(), + uri: head.as_ref().uri.clone(), addr, }); @@ -93,33 +61,7 @@ where let connection = fut.await?; // send request - connection - .send_request(RequestHeadType::from(head), body) - .await - .map(|(head, payload)| ClientResponse::new(head, payload)) - }) - } - - fn send_request_extra( - &self, - head: Rc, - extra_headers: Option, - body: Body, - addr: Option, - ) -> Pin>>> { - // connect to the host - let fut = self.0.call(ClientConnect { - uri: head.uri.clone(), - addr, - }); - - Box::pin(async move { - let connection = fut.await?; - - // send request - let (head, payload) = connection - .send_request(RequestHeadType::Rc(head, extra_headers), body) - .await?; + let (head, payload) = connection.send_request(head, body).await?; Ok(ClientResponse::new(head, payload)) }) @@ -129,16 +71,7 @@ where &self, head: RequestHead, addr: Option, - ) -> Pin< - Box< - dyn Future< - Output = Result< - (ResponseHead, Framed), - SendRequestError, - >, - >, - >, - > { + ) -> LocalBoxFuture<'static, Result> { // connect to the host let fut = self.0.call(ClientConnect { uri: head.uri.clone(), @@ -156,40 +89,6 @@ where Ok((head, framed)) }) } - - fn open_tunnel_extra( - &self, - head: Rc, - extra_headers: Option, - addr: Option, - ) -> Pin< - Box< - dyn Future< - Output = Result< - (ResponseHead, Framed), - SendRequestError, - >, - >, - >, - > { - // connect to the host - let fut = self.0.call(ClientConnect { - uri: head.uri.clone(), - addr, - }); - - Box::pin(async move { - let connection = fut.await?; - - // send request - let (head, framed) = connection - .open_tunnel(RequestHeadType::Rc(head, extra_headers)) - .await?; - - let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); - Ok((head, framed)) - }) - } } trait AsyncSocket { diff --git a/awc/src/sender.rs b/awc/src/sender.rs index 5f790a038..1cf863d96 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -1,21 +1,26 @@ -use std::future::Future; -use std::net; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll}; -use std::time::Duration; +use std::{ + future::Future, + net, + pin::Pin, + rc::Rc, + task::{Context, Poll}, + time::Duration, +}; +use actix_http::{ + body::{Body, BodyStream}, + http::{ + header::{self, HeaderMap, HeaderName, IntoHeaderValue}, + Error as HttpError, + }, + Error, RequestHead, RequestHeadType, +}; use actix_rt::time::{sleep, Sleep}; use bytes::Bytes; use derive_more::From; use futures_core::Stream; use serde::Serialize; -use actix_http::body::{Body, BodyStream}; -use actix_http::http::header::{self, IntoHeaderValue}; -use actix_http::http::{Error as HttpError, HeaderMap, HeaderName}; -use actix_http::{Error, RequestHead}; - #[cfg(feature = "compress")] use actix_http::encoding::Decoder; #[cfg(feature = "compress")] @@ -184,12 +189,16 @@ impl RequestSender { B: Into, { let fut = match self { - RequestSender::Owned(head) => { - config.connector.send_request(head, body.into(), addr) - } - RequestSender::Rc(head, extra_headers) => config - .connector - .send_request_extra(head, extra_headers, body.into(), addr), + RequestSender::Owned(head) => config.connector.send_request( + RequestHeadType::Owned(head), + body.into(), + addr, + ), + RequestSender::Rc(head, extra_headers) => config.connector.send_request( + RequestHeadType::Rc(head, extra_headers), + body.into(), + addr, + ), }; SendClientRequest::new(fut, response_decompress, timeout.or(config.timeout))