diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 6f4c09915..a496fd993 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -66,7 +66,7 @@ where U::Error: fmt::Display, { Normal(InnerDispatcher), - Upgrade(U::Future), + Upgrade(Pin>), None, } @@ -114,8 +114,8 @@ where B: MessageBody, { None, - ExpectCall(X::Future), - ServiceCall(S::Future), + ExpectCall(Pin>), + ServiceCall(Pin>), SendPayload(ResponseBody), } @@ -298,7 +298,7 @@ where let len = self.write_buf.len(); let mut written = 0; while written < len { - match unsafe { Pin::new_unchecked(&mut self.io) } + match Pin::new(&mut self.io) .poll_write(cx, &self.write_buf[written..]) { Poll::Ready(Ok(0)) => { @@ -372,10 +372,10 @@ where None => None, }, State::ExpectCall(ref mut fut) => { - match unsafe { Pin::new_unchecked(fut) }.poll(cx) { + match fut.as_mut().poll(cx) { Poll::Ready(Ok(req)) => { self.send_continue(); - self.state = State::ServiceCall(self.service.call(req)); + self.state = State::ServiceCall(Box::pin(self.service.call(req))); continue; } Poll::Ready(Err(e)) => { @@ -387,7 +387,7 @@ where } } State::ServiceCall(ref mut fut) => { - match unsafe { Pin::new_unchecked(fut) }.poll(cx) { + match fut.as_mut().poll(cx) { Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.state = self.send_response(res, body)?; @@ -463,8 +463,8 @@ where ) -> Result, DispatchError> { // Handle `EXPECT: 100-Continue` header let req = if req.head().expect() { - let mut task = self.expect.call(req); - match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { + let mut task = Box::pin(self.expect.call(req)); + match task.as_mut().poll(cx) { Poll::Ready(Ok(req)) => { self.send_continue(); req @@ -482,8 +482,8 @@ where }; // Call service - let mut task = self.service.call(req); - match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { + let mut task = Box::pin(self.service.call(req)); + match task.as_mut().poll(cx) { Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); self.send_response(res, body) @@ -681,20 +681,6 @@ where } } -impl Unpin for Dispatcher -where - T: AsyncRead + AsyncWrite + Unpin, - S: Service, - S::Error: Into, - S::Response: Into>, - B: MessageBody, - X: Service, - X::Error: Into, - U: Service), Response = ()>, - U::Error: fmt::Display, -{ -} - impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, @@ -771,7 +757,7 @@ where parts.write_buf = inner.write_buf; let framed = Framed::from_parts(parts); self.inner = DispatcherState::Upgrade( - inner.upgrade.unwrap().call((req, framed)), + Box::pin(inner.upgrade.unwrap().call((req, framed))), ); return self.poll(cx); } else { @@ -823,7 +809,7 @@ where } } DispatcherState::Upgrade(ref mut fut) => { - unsafe { Pin::new_unchecked(fut) }.poll(cx).map_err(|e| { + fut.as_mut().poll(cx).map_err(|e| { error!("Upgrade handler error: {}", e); DispatchError::Upgrade })