mirror of
https://github.com/actix/actix-web.git
synced 2024-11-25 11:01:14 +00:00
don't hang after dropping mutipart (#2463)
This commit is contained in:
parent
cf54388534
commit
654dc64a09
2 changed files with 51 additions and 4 deletions
|
@ -10,8 +10,10 @@
|
|||
* Added `Field::name` method for getting the field name. [#2451]
|
||||
* `MultipartError` now marks variants with inner errors as the source. [#2451]
|
||||
* `MultipartError` is now marked as non-exhaustive. [#2451]
|
||||
* Polling `Field` after dropping `Multipart` now fails immediately instead of hanging forever. [#2463]
|
||||
|
||||
[#2451]: https://github.com/actix/actix-web/pull/2451
|
||||
[#2463]: https://github.com/actix/actix-web/pull/2463
|
||||
|
||||
|
||||
## 0.4.0-beta.7 - 2021-10-20
|
||||
|
|
|
@ -706,8 +706,11 @@ impl Clone for PayloadRef {
|
|||
}
|
||||
}
|
||||
|
||||
/// Counter. It tracks of number of clones of payloads and give access to payload only to top most
|
||||
/// task panics if Safety get destroyed and it not top most task.
|
||||
/// Counter. It tracks of number of clones of payloads and give access to payload only to top most.
|
||||
/// * When dropped, parent task is awakened. This is to support the case where Field is
|
||||
/// dropped in a separate task than Multipart.
|
||||
/// * Assumes that parent owners don't move to different tasks; only the top-most is allowed to.
|
||||
/// * If dropped and is not top most owner, is_clean flag is set to false.
|
||||
#[derive(Debug)]
|
||||
struct Safety {
|
||||
task: LocalWaker,
|
||||
|
@ -750,9 +753,9 @@ impl Safety {
|
|||
|
||||
impl Drop for Safety {
|
||||
fn drop(&mut self) {
|
||||
// parent task is dead
|
||||
if Rc::strong_count(&self.payload) != self.level {
|
||||
self.clean.set(true);
|
||||
// Multipart dropped leaving a Field
|
||||
self.clean.set(false);
|
||||
}
|
||||
|
||||
self.task.wake();
|
||||
|
@ -853,10 +856,12 @@ mod tests {
|
|||
|
||||
use actix_http::h1::Payload;
|
||||
use actix_web::http::header::{DispositionParam, DispositionType};
|
||||
use actix_web::rt;
|
||||
use actix_web::test::TestRequest;
|
||||
use actix_web::FromRequest;
|
||||
use bytes::Bytes;
|
||||
use futures_util::{future::lazy, StreamExt};
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
|
@ -1286,4 +1291,44 @@ mod tests {
|
|||
MultipartError::NoContentDisposition,
|
||||
));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_drop_multipart_dont_hang() {
|
||||
let (sender, payload) = create_stream();
|
||||
let (bytes, headers) = create_simple_request_with_header();
|
||||
sender.send(Ok(bytes)).unwrap();
|
||||
drop(sender); // eof
|
||||
|
||||
let mut multipart = Multipart::new(&headers, payload);
|
||||
let mut field = multipart.next().await.unwrap().unwrap();
|
||||
|
||||
drop(multipart);
|
||||
|
||||
// should fail immediately
|
||||
match field.next().await {
|
||||
Some(Err(MultipartError::NotConsumed)) => {}
|
||||
_ => panic!(),
|
||||
};
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_drop_field_awaken_multipart() {
|
||||
let (sender, payload) = create_stream();
|
||||
let (bytes, headers) = create_simple_request_with_header();
|
||||
sender.send(Ok(bytes)).unwrap();
|
||||
drop(sender); // eof
|
||||
|
||||
let mut multipart = Multipart::new(&headers, payload);
|
||||
let mut field = multipart.next().await.unwrap().unwrap();
|
||||
|
||||
let task = rt::spawn(async move {
|
||||
rt::time::sleep(Duration::from_secs(1)).await;
|
||||
assert_eq!(field.next().await.unwrap().unwrap(), "test");
|
||||
drop(field);
|
||||
});
|
||||
|
||||
// dropping field should awaken current task
|
||||
let _ = multipart.next().await.unwrap().unwrap();
|
||||
task.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue