mirror of
https://github.com/actix/actix-web.git
synced 2025-04-09 19:34:05 +00:00
commit
c725427b82
4 changed files with 80 additions and 21 deletions
|
@ -361,10 +361,8 @@ impl MessageBody for String {
|
|||
|
||||
/// Type represent streaming body.
|
||||
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
|
||||
#[pin_project]
|
||||
pub struct BodyStream<S, E> {
|
||||
#[pin]
|
||||
stream: S,
|
||||
stream: Pin<Box<S>>,
|
||||
_t: PhantomData<E>,
|
||||
}
|
||||
|
||||
|
@ -375,7 +373,7 @@ where
|
|||
{
|
||||
pub fn new(stream: S) -> Self {
|
||||
BodyStream {
|
||||
stream,
|
||||
stream: Box::pin(stream),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
@ -396,7 +394,7 @@ where
|
|||
/// ended on a zero-length chunk, but rather proceed until the underlying
|
||||
/// [`Stream`] ends.
|
||||
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
|
||||
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
|
||||
let mut stream = self.stream.as_mut();
|
||||
loop {
|
||||
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
|
||||
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
|
||||
|
@ -408,11 +406,9 @@ where
|
|||
|
||||
/// Type represent streaming body. This body implementation should be used
|
||||
/// if total size of stream is known. Data get sent as is without using transfer encoding.
|
||||
#[pin_project]
|
||||
pub struct SizedStream<S> {
|
||||
size: u64,
|
||||
#[pin]
|
||||
stream: S,
|
||||
stream: Pin<Box<S>>,
|
||||
}
|
||||
|
||||
impl<S> SizedStream<S>
|
||||
|
@ -420,7 +416,7 @@ where
|
|||
S: Stream<Item = Result<Bytes, Error>>,
|
||||
{
|
||||
pub fn new(size: u64, stream: S) -> Self {
|
||||
SizedStream { size, stream }
|
||||
SizedStream { size, stream: Box::pin(stream) }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -438,7 +434,7 @@ where
|
|||
/// ended on a zero-length chunk, but rather proceed until the underlying
|
||||
/// [`Stream`] ends.
|
||||
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
|
||||
let mut stream = unsafe { Pin::new_unchecked(self) }.project().stream;
|
||||
let mut stream = self.stream.as_mut();
|
||||
loop {
|
||||
return Poll::Ready(match ready!(stream.as_mut().poll_next(cx)) {
|
||||
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::cell::UnsafeCell;
|
||||
use std::cell::Cell;
|
||||
use std::fmt::Write;
|
||||
use std::rc::Rc;
|
||||
use std::time::Duration;
|
||||
|
@ -228,24 +228,24 @@ impl fmt::Write for Date {
|
|||
struct DateService(Rc<DateServiceInner>);
|
||||
|
||||
struct DateServiceInner {
|
||||
current: UnsafeCell<Option<(Date, Instant)>>,
|
||||
current: Cell<Option<(Date, Instant)>>,
|
||||
}
|
||||
|
||||
impl DateServiceInner {
|
||||
fn new() -> Self {
|
||||
DateServiceInner {
|
||||
current: UnsafeCell::new(None),
|
||||
current: Cell::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&self) {
|
||||
unsafe { (&mut *self.current.get()).take() };
|
||||
self.current.take();
|
||||
}
|
||||
|
||||
fn update(&self) {
|
||||
let now = Instant::now();
|
||||
let date = Date::new();
|
||||
*(unsafe { &mut *self.current.get() }) = Some((date, now));
|
||||
self.current.set(Some((date, now)));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -255,7 +255,7 @@ impl DateService {
|
|||
}
|
||||
|
||||
fn check_date(&self) {
|
||||
if unsafe { (&*self.0.current.get()).is_none() } {
|
||||
if self.0.current.get().is_none() {
|
||||
self.0.update();
|
||||
|
||||
// periodic date update
|
||||
|
@ -269,12 +269,12 @@ impl DateService {
|
|||
|
||||
fn now(&self) -> Instant {
|
||||
self.check_date();
|
||||
unsafe { (&*self.0.current.get()).as_ref().unwrap().1 }
|
||||
self.0.current.get().unwrap().1
|
||||
}
|
||||
|
||||
fn set_date<F: FnMut(&Date)>(&self, mut f: F) {
|
||||
self.check_date();
|
||||
f(&unsafe { (&*self.0.current.get()).as_ref().unwrap().0 })
|
||||
f(&self.0.current.get().unwrap().0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -282,6 +282,19 @@ impl DateService {
|
|||
mod tests {
|
||||
use super::*;
|
||||
|
||||
|
||||
// Test modifying the date from within the closure
|
||||
// passed to `set_date`
|
||||
#[test]
|
||||
fn test_evil_date() {
|
||||
let service = DateService::new();
|
||||
// Make sure that `check_date` doesn't try to spawn a task
|
||||
service.0.update();
|
||||
service.set_date(|_| {
|
||||
service.0.reset()
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_date_len() {
|
||||
assert_eq!(DATE_VALUE_LENGTH, "Sun, 06 Nov 1994 08:49:37 GMT".len());
|
||||
|
|
|
@ -60,6 +60,12 @@ impl Error {
|
|||
}
|
||||
}
|
||||
|
||||
/// A struct with a private constructor, for use with
|
||||
/// `__private_get_type_id__`. Its single field is private,
|
||||
/// ensuring that it can only be constructed from this module
|
||||
#[doc(hidden)]
|
||||
pub struct PrivateHelper(());
|
||||
|
||||
/// Error that can be converted to `Response`
|
||||
pub trait ResponseError: fmt::Debug + fmt::Display {
|
||||
/// Response's status code
|
||||
|
@ -83,19 +89,37 @@ pub trait ResponseError: fmt::Debug + fmt::Display {
|
|||
resp.set_body(Body::from(buf))
|
||||
}
|
||||
|
||||
/// A helper method to get the type ID of the type
|
||||
/// this trait is implemented on.
|
||||
/// This method is unsafe to *implement*, since `downcast_ref` relies
|
||||
/// on the returned `TypeId` to perform a cast.
|
||||
///
|
||||
/// Unfortunately, Rust has no notion of a trait method that is
|
||||
/// unsafe to implement (marking it as `unsafe` makes it unsafe
|
||||
/// to *call*). As a workaround, we require this method
|
||||
/// to return a private type along with the `TypeId`. This
|
||||
/// private type (`PrivateHelper`) has a private constructor,
|
||||
/// making it impossible for safe code to construct outside of
|
||||
/// this module. This ensures that safe code cannot violate
|
||||
/// type-safety by implementing this method.
|
||||
#[doc(hidden)]
|
||||
fn __private_get_type_id__(&self) -> TypeId
|
||||
fn __private_get_type_id__(&self) -> (TypeId, PrivateHelper)
|
||||
where
|
||||
Self: 'static,
|
||||
{
|
||||
TypeId::of::<Self>()
|
||||
(TypeId::of::<Self>(), PrivateHelper(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl dyn ResponseError + 'static {
|
||||
/// Downcasts a response error to a specific type.
|
||||
pub fn downcast_ref<T: ResponseError + 'static>(&self) -> Option<&T> {
|
||||
if self.__private_get_type_id__() == TypeId::of::<T>() {
|
||||
if self.__private_get_type_id__().0 == TypeId::of::<T>() {
|
||||
// Safety: external crates cannot override the default
|
||||
// implementation of `__private_get_type_id__`, since
|
||||
// it requires returning a private type. We can therefore
|
||||
// rely on the returned `TypeId`, which ensures that this
|
||||
// case is correct.
|
||||
unsafe { Some(&*(self as *const dyn ResponseError as *const T)) }
|
||||
} else {
|
||||
None
|
||||
|
|
26
tests/test_weird_poll.rs
Normal file
26
tests/test_weird_poll.rs
Normal file
|
@ -0,0 +1,26 @@
|
|||
// Regression test for #/1321
|
||||
|
||||
use futures::task::{noop_waker, Context};
|
||||
use futures::stream::once;
|
||||
use actix_http::body::{MessageBody, BodyStream};
|
||||
use bytes::Bytes;
|
||||
|
||||
#[test]
|
||||
fn weird_poll() {
|
||||
let (sender, receiver) = futures::channel::oneshot::channel();
|
||||
let mut body_stream = Ok(BodyStream::new(once(async {
|
||||
let x = Box::new(0);
|
||||
let y = &x;
|
||||
receiver.await.unwrap();
|
||||
let _z = **y;
|
||||
Ok::<_, ()>(Bytes::new())
|
||||
})));
|
||||
|
||||
let waker = noop_waker();
|
||||
let mut context = Context::from_waker(&waker);
|
||||
|
||||
let _ = body_stream.as_mut().unwrap().poll_next(&mut context);
|
||||
sender.send(()).unwrap();
|
||||
let _ = std::mem::replace(&mut body_stream, Err([0; 32])).unwrap().poll_next(&mut context);
|
||||
}
|
||||
|
Loading…
Reference in a new issue