1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-02-12 09:05:15 +00:00

Remove uses of Pin::new_unchecked in h1 Dispatcher (#1374)

This removes the last uses of unsafe `Pin` functions in actix-web.

This PR adds a `Pin<Box<_>>` wrapper to `DispatcherState::Upgrade`,
`State::ExpectCall`, and `State::ServiceCall`.

The previous uses of the futures `State::ExpectCall` and `State::ServiceCall`
were Undefined Behavior - a future was obtained from `self.expect.call`
or `self.service.call`, pinned on the stack, and then immediately
returned from `handle_request`. The only alternative to using `Box::pin`
would be to refactor `handle_request` to write the futures directly into
their final location, or avoid polling them before they are returned.

The previous use of `DispatcherState::Upgrade` doesn't seem to be
unsound. However, having data pinned inside an enum that we
`std::mem::replace` would require some careful `unsafe` code to ensure
that we never call `std::mem::replace` when the active variant contains
pinned data. By using `Box::pin`, we any possibility of future
refactoring accidentally introducing undefined behavior.

Co-authored-by: Yuki Okushi <huyuumi.dev@gmail.com>
This commit is contained in:
Aaron Hill 2020-02-25 18:21:05 -05:00 committed by GitHub
parent de1d6ad5cb
commit 71c4bd1b30
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -66,7 +66,7 @@ where
U::Error: fmt::Display, U::Error: fmt::Display,
{ {
Normal(InnerDispatcher<T, S, B, X, U>), Normal(InnerDispatcher<T, S, B, X, U>),
Upgrade(U::Future), Upgrade(Pin<Box<U::Future>>),
None, None,
} }
@ -114,8 +114,8 @@ where
B: MessageBody, B: MessageBody,
{ {
None, None,
ExpectCall(X::Future), ExpectCall(Pin<Box<X::Future>>),
ServiceCall(S::Future), ServiceCall(Pin<Box<S::Future>>),
SendPayload(ResponseBody<B>), SendPayload(ResponseBody<B>),
} }
@ -298,7 +298,7 @@ where
let len = self.write_buf.len(); let len = self.write_buf.len();
let mut written = 0; let mut written = 0;
while written < len { while written < len {
match unsafe { Pin::new_unchecked(&mut self.io) } match Pin::new(&mut self.io)
.poll_write(cx, &self.write_buf[written..]) .poll_write(cx, &self.write_buf[written..])
{ {
Poll::Ready(Ok(0)) => { Poll::Ready(Ok(0)) => {
@ -372,10 +372,10 @@ where
None => None, None => None,
}, },
State::ExpectCall(ref mut fut) => { State::ExpectCall(ref mut fut) => {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) { match fut.as_mut().poll(cx) {
Poll::Ready(Ok(req)) => { Poll::Ready(Ok(req)) => {
self.send_continue(); self.send_continue();
self.state = State::ServiceCall(self.service.call(req)); self.state = State::ServiceCall(Box::pin(self.service.call(req)));
continue; continue;
} }
Poll::Ready(Err(e)) => { Poll::Ready(Err(e)) => {
@ -387,7 +387,7 @@ where
} }
} }
State::ServiceCall(ref mut fut) => { State::ServiceCall(ref mut fut) => {
match unsafe { Pin::new_unchecked(fut) }.poll(cx) { match fut.as_mut().poll(cx) {
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.state = self.send_response(res, body)?; self.state = self.send_response(res, body)?;
@ -463,8 +463,8 @@ where
) -> Result<State<S, B, X>, DispatchError> { ) -> Result<State<S, B, X>, DispatchError> {
// Handle `EXPECT: 100-Continue` header // Handle `EXPECT: 100-Continue` header
let req = if req.head().expect() { let req = if req.head().expect() {
let mut task = self.expect.call(req); let mut task = Box::pin(self.expect.call(req));
match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { match task.as_mut().poll(cx) {
Poll::Ready(Ok(req)) => { Poll::Ready(Ok(req)) => {
self.send_continue(); self.send_continue();
req req
@ -482,8 +482,8 @@ where
}; };
// Call service // Call service
let mut task = self.service.call(req); let mut task = Box::pin(self.service.call(req));
match unsafe { Pin::new_unchecked(&mut task) }.poll(cx) { match task.as_mut().poll(cx) {
Poll::Ready(Ok(res)) => { Poll::Ready(Ok(res)) => {
let (res, body) = res.into().replace_body(()); let (res, body) = res.into().replace_body(());
self.send_response(res, body) self.send_response(res, body)
@ -681,20 +681,6 @@ where
} }
} }
impl<T, S, B, X, U> Unpin for Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request = Request>,
S::Error: Into<Error>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>,
U: Service<Request = (Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
}
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
@ -771,7 +757,7 @@ where
parts.write_buf = inner.write_buf; parts.write_buf = inner.write_buf;
let framed = Framed::from_parts(parts); let framed = Framed::from_parts(parts);
self.inner = DispatcherState::Upgrade( self.inner = DispatcherState::Upgrade(
inner.upgrade.unwrap().call((req, framed)), Box::pin(inner.upgrade.unwrap().call((req, framed))),
); );
return self.poll(cx); return self.poll(cx);
} else { } else {
@ -823,7 +809,7 @@ where
} }
} }
DispatcherState::Upgrade(ref mut fut) => { DispatcherState::Upgrade(ref mut fut) => {
unsafe { Pin::new_unchecked(fut) }.poll(cx).map_err(|e| { fut.as_mut().poll(cx).map_err(|e| {
error!("Upgrade handler error: {}", e); error!("Upgrade handler error: {}", e);
DispatchError::Upgrade DispatchError::Upgrade
}) })