1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-01 21:08:43 +00:00

Run rustfmt

This commit is contained in:
Yuki Okushi 2020-02-27 11:10:55 +09:00
parent 903ae47baa
commit 7ba14fd113
19 changed files with 284 additions and 197 deletions

View file

@ -17,23 +17,18 @@ async fn main() -> io::Result<()> {
HttpService::build() HttpService::build()
.client_timeout(1000) .client_timeout(1000)
.client_disconnect(1000) .client_disconnect(1000)
.finish(|mut req: Request| { .finish(|mut req: Request| async move {
async move { let mut body = BytesMut::new();
let mut body = BytesMut::new(); while let Some(item) = req.payload().next().await {
while let Some(item) = req.payload().next().await { body.extend_from_slice(&item?);
body.extend_from_slice(&item?);
}
info!("request body: {:?}", body);
Ok::<_, Error>(
Response::Ok()
.header(
"x-head",
HeaderValue::from_static("dummy value!"),
)
.body(body),
)
} }
info!("request body: {:?}", body);
Ok::<_, Error>(
Response::Ok()
.header("x-head", HeaderValue::from_static("dummy value!"))
.body(body),
)
}) })
.tcp() .tcp()
})? })?

View file

@ -36,7 +36,10 @@ impl BodySize {
pub trait MessageBody { pub trait MessageBody {
fn size(&self) -> BodySize; fn size(&self) -> BodySize;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>>; fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>>;
downcast_get_type_id!(); downcast_get_type_id!();
} }
@ -48,7 +51,10 @@ impl MessageBody for () {
BodySize::Empty BodySize::Empty
} }
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
Poll::Ready(None) Poll::Ready(None)
} }
} }
@ -58,7 +64,10 @@ impl<T: MessageBody + Unpin> MessageBody for Box<T> {
self.as_ref().size() self.as_ref().size()
} }
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
Pin::new(self.get_mut().as_mut()).poll_next(cx) Pin::new(self.get_mut().as_mut()).poll_next(cx)
} }
} }
@ -103,7 +112,10 @@ impl<B: MessageBody> MessageBody for ResponseBody<B> {
} }
#[project] #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
#[project] #[project]
match self.project() { match self.project() {
ResponseBody::Body(body) => body.poll_next(cx), ResponseBody::Body(body) => body.poll_next(cx),
@ -164,7 +176,10 @@ impl MessageBody for Body {
} }
#[project] #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
#[project] #[project]
match self.project() { match self.project() {
Body::None => Poll::Ready(None), Body::None => Poll::Ready(None),
@ -285,7 +300,10 @@ impl MessageBody for Bytes {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
@ -299,11 +317,16 @@ impl MessageBody for BytesMut {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(mem::replace(self.get_mut(), BytesMut::new()).freeze()))) Poll::Ready(Some(Ok(
mem::replace(self.get_mut(), BytesMut::new()).freeze()
)))
} }
} }
} }
@ -313,7 +336,10 @@ impl MessageBody for &'static str {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
@ -329,11 +355,17 @@ impl MessageBody for Vec<u8> {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(Bytes::from(mem::replace(self.get_mut(), Vec::new()))))) Poll::Ready(Some(Ok(Bytes::from(mem::replace(
self.get_mut(),
Vec::new(),
)))))
} }
} }
} }
@ -343,7 +375,10 @@ impl MessageBody for String {
BodySize::Sized(self.len()) BodySize::Sized(self.len())
} }
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
@ -390,7 +425,10 @@ where
/// Empty values are skipped to prevent [`BodyStream`]'s transmission being /// Empty values are skipped to prevent [`BodyStream`]'s transmission being
/// ended on a zero-length chunk, but rather proceed until the underlying /// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends. /// [`Stream`] ends.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream = self.project().stream; let mut stream = self.project().stream;
loop { loop {
let stream = stream.as_mut(); let stream = stream.as_mut();
@ -433,7 +471,10 @@ where
/// Empty values are skipped to prevent [`SizedStream`]'s transmission being /// Empty values are skipped to prevent [`SizedStream`]'s transmission being
/// ended on a zero-length chunk, but rather proceed until the underlying /// ended on a zero-length chunk, but rather proceed until the underlying
/// [`Stream`] ends. /// [`Stream`] ends.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
let mut stream: Pin<&mut S> = self.project().stream; let mut stream: Pin<&mut S> = self.project().stream;
loop { loop {
let stream = stream.as_mut(); let stream = stream.as_mut();
@ -478,7 +519,10 @@ mod tests {
assert_eq!("test".size(), BodySize::Sized(4)); assert_eq!("test".size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| Pin::new(&mut "test").poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| Pin::new(&mut "test").poll_next(cx))
.await
.unwrap()
.ok(),
Some(Bytes::from("test")) Some(Bytes::from("test"))
); );
} }
@ -497,10 +541,7 @@ mod tests {
assert_eq!(sb.size(), BodySize::Sized(4)); assert_eq!(sb.size(), BodySize::Sized(4));
assert_eq!( assert_eq!(
poll_fn(|cx| sb.as_mut().poll_next(cx)) poll_fn(|cx| sb.as_mut().poll_next(cx)).await.unwrap().ok(),
.await
.unwrap()
.ok(),
Some(Bytes::from("test")) Some(Bytes::from("test"))
); );
} }
@ -569,7 +610,9 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn test_unit() { async fn test_unit() {
assert_eq!(().size(), BodySize::Empty); assert_eq!(().size(), BodySize::Empty);
assert!(poll_fn(|cx| Pin::new(&mut ()).poll_next(cx)).await.is_none()); assert!(poll_fn(|cx| Pin::new(&mut ()).poll_next(cx))
.await
.is_none());
} }
#[actix_rt::test] #[actix_rt::test]
@ -628,11 +671,17 @@ mod tests {
pin_mut!(body); pin_mut!(body);
assert_eq!( assert_eq!(
poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx))
.await
.unwrap()
.ok(),
Some(Bytes::from("1")), Some(Bytes::from("1")),
); );
assert_eq!( assert_eq!(
poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx))
.await
.unwrap()
.ok(),
Some(Bytes::from("2")), Some(Bytes::from("2")),
); );
} }
@ -670,11 +719,17 @@ mod tests {
); );
pin_mut!(body); pin_mut!(body);
assert_eq!( assert_eq!(
poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx))
.await
.unwrap()
.ok(),
Some(Bytes::from("1")), Some(Bytes::from("1")),
); );
assert_eq!( assert_eq!(
poll_fn(|cx| body.as_mut().poll_next(cx)).await.unwrap().ok(), poll_fn(|cx| body.as_mut().poll_next(cx))
.await
.unwrap()
.ok(),
Some(Bytes::from("2")), Some(Bytes::from("2")),
); );
} }

View file

@ -8,7 +8,7 @@ use bytes::buf::BufMutExt;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures_core::Stream; use futures_core::Stream;
use futures_util::future::poll_fn; use futures_util::future::poll_fn;
use futures_util::{SinkExt, StreamExt, pin_mut}; use futures_util::{pin_mut, SinkExt, StreamExt};
use crate::error::PayloadError; use crate::error::PayloadError;
use crate::h1; use crate::h1;

View file

@ -16,8 +16,8 @@ use fxhash::FxHashMap;
use h2::client::{handshake, Connection, SendRequest}; use h2::client::{handshake, Connection, SendRequest};
use http::uri::Authority; use http::uri::Authority;
use indexmap::IndexSet; use indexmap::IndexSet;
use slab::Slab;
use pin_project::pin_project; use pin_project::pin_project;
use slab::Slab;
use super::connection::{ConnectionType, IoConnection}; use super::connection::{ConnectionType, IoConnection};
use super::error::ConnectError; use super::error::ConnectError;

View file

@ -211,7 +211,12 @@ impl Date {
} }
fn update(&mut self) { fn update(&mut self) {
self.pos = 0; self.pos = 0;
write!(self, "{}", OffsetDateTime::now().format("%a, %d %b %Y %H:%M:%S GMT")).unwrap(); write!(
self,
"{}",
OffsetDateTime::now().format("%a, %d %b %Y %H:%M:%S GMT")
)
.unwrap();
} }
} }
@ -282,7 +287,6 @@ impl DateService {
mod tests { mod tests {
use super::*; use super::*;
// Test modifying the date from within the closure // Test modifying the date from within the closure
// passed to `set_date` // passed to `set_date`
#[test] #[test]
@ -290,9 +294,7 @@ mod tests {
let service = DateService::new(); let service = DateService::new();
// Make sure that `check_date` doesn't try to spawn a task // Make sure that `check_date` doesn't try to spawn a task
service.0.update(); service.0.update();
service.set_date(|_| { service.set_date(|_| service.0.reset());
service.0.reset()
});
} }
#[test] #[test]

View file

@ -109,7 +109,8 @@ impl CookieBuilder {
pub fn max_age_time(mut self, value: Duration) -> CookieBuilder { pub fn max_age_time(mut self, value: Duration) -> CookieBuilder {
// Truncate any nanoseconds from the Duration, as they aren't represented within `Max-Age` // Truncate any nanoseconds from the Duration, as they aren't represented within `Max-Age`
// and would cause two otherwise identical `Cookie` instances to not be equivalent to one another. // and would cause two otherwise identical `Cookie` instances to not be equivalent to one another.
self.cookie.set_max_age(Duration::seconds(value.whole_seconds())); self.cookie
.set_max_age(Duration::seconds(value.whole_seconds()));
self self
} }

View file

@ -533,8 +533,8 @@ mod test {
#[test] #[test]
#[cfg(feature = "secure-cookies")] #[cfg(feature = "secure-cookies")]
fn delta() { fn delta() {
use time::Duration;
use std::collections::HashMap; use std::collections::HashMap;
use time::Duration;
let mut c = CookieJar::new(); let mut c = CookieJar::new();

View file

@ -1015,7 +1015,9 @@ mod tests {
assert_eq!(&cookie.to_string(), "foo=bar; Domain=www.rust-lang.org"); assert_eq!(&cookie.to_string(), "foo=bar; Domain=www.rust-lang.org");
let time_str = "Wed, 21 Oct 2015 07:28:00 GMT"; let time_str = "Wed, 21 Oct 2015 07:28:00 GMT";
let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S").unwrap().assume_utc(); let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S")
.unwrap()
.assume_utc();
let cookie = Cookie::build("foo", "bar").expires(expires).finish(); let cookie = Cookie::build("foo", "bar").expires(expires).finish();
assert_eq!( assert_eq!(
&cookie.to_string(), &cookie.to_string(),

View file

@ -376,7 +376,9 @@ mod tests {
); );
let time_str = "Wed, 21 Oct 2015 07:28:00 GMT"; let time_str = "Wed, 21 Oct 2015 07:28:00 GMT";
let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S").unwrap().assume_utc(); let expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%M:%S")
.unwrap()
.assume_utc();
expected.set_expires(expires); expected.set_expires(expires);
assert_eq_parse!( assert_eq_parse!(
" foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \ " foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \
@ -385,7 +387,9 @@ mod tests {
); );
unexpected.set_domain("foo.com"); unexpected.set_domain("foo.com");
let bad_expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%S:%M").unwrap().assume_utc(); let bad_expires = PrimitiveDateTime::parse(time_str, "%a, %d %b %Y %H:%S:%M")
.unwrap()
.assume_utc();
expected.set_expires(bad_expires); expected.set_expires(bad_expires);
assert_ne_parse!( assert_ne_parse!(
" foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \ " foo=bar ;HttpOnly; Secure; Max-Age=4; Path=/foo; \
@ -414,8 +418,15 @@ mod tests {
#[test] #[test]
fn do_not_panic_on_large_max_ages() { fn do_not_panic_on_large_max_ages() {
let max_duration = Duration::max_value(); let max_duration = Duration::max_value();
let expected = Cookie::build("foo", "bar").max_age_time(max_duration).finish(); let expected = Cookie::build("foo", "bar")
let overflow_duration = max_duration.checked_add(Duration::nanoseconds(1)).unwrap_or(max_duration); .max_age_time(max_duration)
assert_eq_parse!(format!(" foo=bar; Max-Age={:?}", overflow_duration.whole_seconds()), expected); .finish();
let overflow_duration = max_duration
.checked_add(Duration::nanoseconds(1))
.unwrap_or(max_duration);
assert_eq_parse!(
format!(" foo=bar; Max-Age={:?}", overflow_duration.whole_seconds()),
expected
);
} }
} }

View file

@ -96,7 +96,10 @@ impl<B: MessageBody> MessageBody for EncoderBody<B> {
} }
#[project] #[project]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
#[project] #[project]
match self.project() { match self.project() {
EncoderBody::Bytes(b) => { EncoderBody::Bytes(b) => {
@ -112,7 +115,6 @@ impl<B: MessageBody> MessageBody for EncoderBody<B> {
} }
} }
impl<B: MessageBody> MessageBody for Encoder<B> { impl<B: MessageBody> MessageBody for Encoder<B> {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
if self.encoder.is_none() { if self.encoder.is_none() {
@ -122,7 +124,10 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
} }
} }
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Error>>> {
let mut this = self.project(); let mut this = self.project();
loop { loop {
if *this.eof { if *this.eof {

View file

@ -297,7 +297,10 @@ where
/// true - got whouldblock /// true - got whouldblock
/// false - didnt get whouldblock /// false - didnt get whouldblock
#[pin_project::project] #[pin_project::project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<bool, DispatchError> { fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<bool, DispatchError> {
if self.write_buf.is_empty() { if self.write_buf.is_empty() {
return Ok(false); return Ok(false);
} }
@ -308,8 +311,7 @@ where
let InnerDispatcher { io, write_buf, .. } = self.project(); let InnerDispatcher { io, write_buf, .. } = self.project();
let mut io = Pin::new(io.as_mut().unwrap()); let mut io = Pin::new(io.as_mut().unwrap());
while written < len { while written < len {
match io.as_mut().poll_write(cx, &write_buf[written..]) match io.as_mut().poll_write(cx, &write_buf[written..]) {
{
Poll::Ready(Ok(0)) => { Poll::Ready(Ok(0)) => {
return Err(DispatchError::Io(io::Error::new( return Err(DispatchError::Io(io::Error::new(
io::ErrorKind::WriteZero, io::ErrorKind::WriteZero,
@ -359,7 +361,8 @@ where
} }
fn send_continue(self: Pin<&mut Self>) { fn send_continue(self: Pin<&mut Self>) {
self.project().write_buf self.project()
.write_buf
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
} }
@ -376,47 +379,44 @@ where
Some(DispatcherMessage::Item(req)) => { Some(DispatcherMessage::Item(req)) => {
Some(self.as_mut().handle_request(req, cx)?) Some(self.as_mut().handle_request(req, cx)?)
} }
Some(DispatcherMessage::Error(res)) => { Some(DispatcherMessage::Error(res)) => Some(
Some(self.as_mut().send_response(res, ResponseBody::Other(Body::Empty))?) self.as_mut()
} .send_response(res, ResponseBody::Other(Body::Empty))?,
),
Some(DispatcherMessage::Upgrade(req)) => { Some(DispatcherMessage::Upgrade(req)) => {
return Ok(PollResponse::Upgrade(req)); return Ok(PollResponse::Upgrade(req));
} }
None => None, None => None,
}, },
State::ExpectCall(fut) => { State::ExpectCall(fut) => match fut.poll(cx) {
match fut.poll(cx) { Poll::Ready(Ok(req)) => {
Poll::Ready(Ok(req)) => { self.as_mut().send_continue();
self.as_mut().send_continue(); this = self.as_mut().project();
this = self.as_mut().project(); this.state.set(State::ServiceCall(this.service.call(req)));
this.state.set(State::ServiceCall(this.service.call(req))); continue;
continue;
}
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
Some(self.as_mut().send_response(res, body.into_body())?)
}
Poll::Pending => None,
} }
} Poll::Ready(Err(e)) => {
State::ServiceCall(fut) => { let res: Response = e.into().into();
match fut.poll(cx) { let (res, body) = res.replace_body(());
Poll::Ready(Ok(res)) => { Some(self.as_mut().send_response(res, body.into_body())?)
let (res, body) = res.into().replace_body(());
let state = self.as_mut().send_response(res, body)?;
this = self.as_mut().project();
this.state.set(state);
continue;
}
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
Some(self.as_mut().send_response(res, body.into_body())?)
}
Poll::Pending => None,
} }
} Poll::Pending => None,
},
State::ServiceCall(fut) => match fut.poll(cx) {
Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(());
let state = self.as_mut().send_response(res, body)?;
this = self.as_mut().project();
this.state.set(state);
continue;
}
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
Some(self.as_mut().send_response(res, body.into_body())?)
}
Poll::Pending => None,
},
State::SendPayload(mut stream) => { State::SendPayload(mut stream) => {
loop { loop {
if this.write_buf.len() < HW_BUFFER_SIZE { if this.write_buf.len() < HW_BUFFER_SIZE {
@ -627,7 +627,10 @@ where
} }
/// keep-alive timer /// keep-alive timer
fn poll_keepalive(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> { fn poll_keepalive(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<(), DispatchError> {
let mut this = self.as_mut().project(); let mut this = self.as_mut().project();
if this.ka_timer.is_none() { if this.ka_timer.is_none() {
// shutdown timeout // shutdown timeout
@ -738,7 +741,11 @@ where
if !inner.write_buf.is_empty() || inner.io.is_none() { if !inner.write_buf.is_empty() || inner.io.is_none() {
Poll::Pending Poll::Pending
} else { } else {
match Pin::new(inner.project().io).as_pin_mut().unwrap().poll_shutdown(cx) { match Pin::new(inner.project().io)
.as_pin_mut()
.unwrap()
.poll_shutdown(cx)
{
Poll::Ready(res) => { Poll::Ready(res) => {
Poll::Ready(res.map_err(DispatchError::from)) Poll::Ready(res.map_err(DispatchError::from))
} }
@ -751,7 +758,11 @@ where
let should_disconnect = let should_disconnect =
if !inner.flags.contains(Flags::READ_DISCONNECT) { if !inner.flags.contains(Flags::READ_DISCONNECT) {
let mut inner_p = inner.as_mut().project(); let mut inner_p = inner.as_mut().project();
read_available(cx, inner_p.io.as_mut().unwrap(), &mut inner_p.read_buf)? read_available(
cx,
inner_p.io.as_mut().unwrap(),
&mut inner_p.read_buf,
)?
} else { } else {
None None
}; };
@ -783,10 +794,17 @@ where
std::mem::replace(inner_p.codec, Codec::default()), std::mem::replace(inner_p.codec, Codec::default()),
std::mem::replace(inner_p.read_buf, BytesMut::default()), std::mem::replace(inner_p.read_buf, BytesMut::default()),
); );
parts.write_buf = std::mem::replace(inner_p.write_buf, BytesMut::default()); parts.write_buf = std::mem::replace(
inner_p.write_buf,
BytesMut::default(),
);
let framed = Framed::from_parts(parts); let framed = Framed::from_parts(parts);
let upgrade = inner_p.upgrade.take().unwrap().call((req, framed)); let upgrade =
self.as_mut().project().inner.set(DispatcherState::Upgrade(upgrade)); inner_p.upgrade.take().unwrap().call((req, framed));
self.as_mut()
.project()
.inner
.set(DispatcherState::Upgrade(upgrade));
return self.poll(cx); return self.poll(cx);
} }
@ -834,12 +852,10 @@ where
} }
} }
} }
DispatcherState::Upgrade(fut) => { DispatcherState::Upgrade(fut) => fut.poll(cx).map_err(|e| {
fut.poll(cx).map_err(|e| { error!("Upgrade handler error: {}", e);
error!("Upgrade handler error: {}", e); DispatchError::Upgrade
DispatchError::Upgrade }),
})
}
} }
} }
} }
@ -931,7 +947,10 @@ mod tests {
if let DispatcherState::Normal(ref mut inner) = h1.inner { if let DispatcherState::Normal(ref mut inner) = h1.inner {
assert!(inner.flags.contains(Flags::READ_DISCONNECT)); assert!(inner.flags.contains(Flags::READ_DISCONNECT));
assert_eq!(&inner.io.take().unwrap().write_buf[..26], b"HTTP/1.1 400 Bad Request\r\n"); assert_eq!(
&inner.io.take().unwrap().write_buf[..26],
b"HTTP/1.1 400 Bad Request\r\n"
);
} }
}) })
.await; .await;

View file

@ -255,63 +255,60 @@ where
#[project] #[project]
match this.state.project() { match this.state.project() {
ServiceResponseState::ServiceCall(call, send) => { ServiceResponseState::ServiceCall(call, send) => match call.poll(cx) {
match call.poll(cx) { Poll::Ready(Ok(res)) => {
Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(());
let (res, body) = res.into().replace_body(());
let mut send = send.take().unwrap(); let mut send = send.take().unwrap();
let mut size = body.size(); let mut size = body.size();
let h2_res = let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
self.as_mut().prepare_response(res.head(), &mut size); this = self.as_mut().project();
this = self.as_mut().project();
let stream = match send.send_response(h2_res, size.is_eof()) { let stream = match send.send_response(h2_res, size.is_eof()) {
Err(e) => { Err(e) => {
trace!("Error sending h2 response: {:?}", e); trace!("Error sending h2 response: {:?}", e);
return Poll::Ready(()); return Poll::Ready(());
}
Ok(stream) => stream,
};
if size.is_eof() {
Poll::Ready(())
} else {
this.state.set(ServiceResponseState::SendPayload(stream, body));
self.poll(cx)
} }
} Ok(stream) => stream,
Poll::Pending => Poll::Pending, };
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
let mut send = send.take().unwrap(); if size.is_eof() {
let mut size = body.size(); Poll::Ready(())
let h2_res = } else {
self.as_mut().prepare_response(res.head(), &mut size); this.state
this = self.as_mut().project(); .set(ServiceResponseState::SendPayload(stream, body));
self.poll(cx)
let stream = match send.send_response(h2_res, size.is_eof()) {
Err(e) => {
trace!("Error sending h2 response: {:?}", e);
return Poll::Ready(());
}
Ok(stream) => stream,
};
if size.is_eof() {
Poll::Ready(())
} else {
this.state.set(ServiceResponseState::SendPayload(
stream,
body.into_body(),
));
self.poll(cx)
}
} }
} }
} Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
let res: Response = e.into().into();
let (res, body) = res.replace_body(());
let mut send = send.take().unwrap();
let mut size = body.size();
let h2_res = self.as_mut().prepare_response(res.head(), &mut size);
this = self.as_mut().project();
let stream = match send.send_response(h2_res, size.is_eof()) {
Err(e) => {
trace!("Error sending h2 response: {:?}", e);
return Poll::Ready(());
}
Ok(stream) => stream,
};
if size.is_eof() {
Poll::Ready(())
} else {
this.state.set(ServiceResponseState::SendPayload(
stream,
body.into_body(),
));
self.poll(cx)
}
}
},
ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop { ServiceResponseState::SendPayload(ref mut stream, ref mut body) => loop {
loop { loop {
if let Some(ref mut buffer) = this.buffer { if let Some(ref mut buffer) = this.buffer {

View file

@ -83,13 +83,11 @@ where
Error = DispatchError, Error = DispatchError,
InitError = S::InitError, InitError = S::InitError,
> { > {
pipeline_factory(fn_factory(|| { pipeline_factory(fn_factory(|| async {
async { Ok::<_, S::InitError>(fn_service(|io: TcpStream| {
Ok::<_, S::InitError>(fn_service(|io: TcpStream| { let peer_addr = io.peer_addr().ok();
let peer_addr = io.peer_addr().ok(); ok::<_, DispatchError>((io, peer_addr))
ok::<_, DispatchError>((io, peer_addr)) }))
}))
}
})) }))
.and_then(self) .and_then(self)
} }

View file

@ -5,7 +5,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use bytes::{buf::BufMutExt, BytesMut}; use bytes::{buf::BufMutExt, BytesMut};
use http::header::{HeaderValue, InvalidHeaderValue}; use http::header::{HeaderValue, InvalidHeaderValue};
use time::{PrimitiveDateTime, OffsetDateTime, offset}; use time::{offset, OffsetDateTime, PrimitiveDateTime};
use crate::error::ParseError; use crate::error::ParseError;
use crate::header::IntoHeaderValue; use crate::header::IntoHeaderValue;
@ -21,7 +21,7 @@ impl FromStr for HttpDate {
fn from_str(s: &str) -> Result<HttpDate, ParseError> { fn from_str(s: &str) -> Result<HttpDate, ParseError> {
match time_parser::parse_http_date(s) { match time_parser::parse_http_date(s) {
Some(t) => Ok(HttpDate(t.assume_utc())), Some(t) => Ok(HttpDate(t.assume_utc())),
None => Err(ParseError::Header) None => Err(ParseError::Header),
} }
} }
} }
@ -49,7 +49,14 @@ impl IntoHeaderValue for HttpDate {
fn try_into(self) -> Result<HeaderValue, Self::Error> { fn try_into(self) -> Result<HeaderValue, Self::Error> {
let mut wrt = BytesMut::with_capacity(29).writer(); let mut wrt = BytesMut::with_capacity(29).writer();
write!(wrt, "{}", self.0.to_offset(offset!(UTC)).format("%a, %d %b %Y %H:%M:%S GMT")).unwrap(); write!(
wrt,
"{}",
self.0
.to_offset(offset!(UTC))
.format("%a, %d %b %Y %H:%M:%S GMT")
)
.unwrap();
HeaderValue::from_maybe_shared(wrt.get_mut().split().freeze()) HeaderValue::from_maybe_shared(wrt.get_mut().split().freeze())
} }
} }
@ -66,14 +73,13 @@ impl From<HttpDate> for SystemTime {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::HttpDate; use super::HttpDate;
use time::{PrimitiveDateTime, date, time}; use time::{date, time, PrimitiveDateTime};
#[test] #[test]
fn test_date() { fn test_date() {
let nov_07 = HttpDate(PrimitiveDateTime::new( let nov_07 = HttpDate(
date!(1994-11-07), PrimitiveDateTime::new(date!(1994 - 11 - 07), time!(8:48:37)).assume_utc(),
time!(8:48:37) );
).assume_utc());
assert_eq!( assert_eq!(
"Sun, 07 Nov 1994 08:48:37 GMT".parse::<HttpDate>().unwrap(), "Sun, 07 Nov 1994 08:48:37 GMT".parse::<HttpDate>().unwrap(),

View file

@ -21,7 +21,7 @@ macro_rules! downcast_get_type_id {
{ {
(std::any::TypeId::of::<Self>(), PrivateHelper(())) (std::any::TypeId::of::<Self>(), PrivateHelper(()))
} }
} };
} }
//Generate implementation for dyn $name //Generate implementation for dyn $name

View file

@ -1,4 +1,4 @@
use time::{OffsetDateTime, PrimitiveDateTime, Date}; use time::{Date, OffsetDateTime, PrimitiveDateTime};
/// Attempt to parse a `time` string as one of either RFC 1123, RFC 850, or asctime. /// Attempt to parse a `time` string as one of either RFC 1123, RFC 850, or asctime.
pub fn parse_http_date(time: &str) -> Option<PrimitiveDateTime> { pub fn parse_http_date(time: &str) -> Option<PrimitiveDateTime> {
@ -29,10 +29,10 @@ fn try_parse_rfc_850(time: &str) -> Option<PrimitiveDateTime> {
match Date::try_from_ymd(expanded_year, dt.month(), dt.day()) { match Date::try_from_ymd(expanded_year, dt.month(), dt.day()) {
Ok(date) => Some(PrimitiveDateTime::new(date, dt.time())), Ok(date) => Some(PrimitiveDateTime::new(date, dt.time())),
Err(_) => None Err(_) => None,
} }
} }
Err(_) => None Err(_) => None,
} }
} }

View file

@ -97,11 +97,9 @@ async fn test_h2_body() -> io::Result<()> {
let data = "HELLOWORLD".to_owned().repeat(64 * 1024); let data = "HELLOWORLD".to_owned().repeat(64 * 1024);
let mut srv = test_server(move || { let mut srv = test_server(move || {
HttpService::build() HttpService::build()
.h2(|mut req: Request<_>| { .h2(|mut req: Request<_>| async move {
async move { let body = load_body(req.take_payload()).await?;
let body = load_body(req.take_payload()).await?; Ok::<_, Error>(Response::Ok().body(body))
Ok::<_, Error>(Response::Ok().body(body))
}
}) })
.openssl(ssl_acceptor()) .openssl(ssl_acceptor())
.map_err(|_| ()) .map_err(|_| ())

View file

@ -104,11 +104,9 @@ async fn test_h2_body1() -> io::Result<()> {
let data = "HELLOWORLD".to_owned().repeat(64 * 1024); let data = "HELLOWORLD".to_owned().repeat(64 * 1024);
let mut srv = test_server(move || { let mut srv = test_server(move || {
HttpService::build() HttpService::build()
.h2(|mut req: Request<_>| { .h2(|mut req: Request<_>| async move {
async move { let body = load_body(req.take_payload()).await?;
let body = load_body(req.take_payload()).await?; Ok::<_, Error>(Response::Ok().body(body))
Ok::<_, Error>(Response::Ok().body(body))
}
}) })
.rustls(ssl_acceptor()) .rustls(ssl_acceptor())
}); });