From cd0773662f30ee60f88e60d6238127afdf4a9343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Laignel?= Date: Thu, 25 Nov 2021 19:26:26 +0100 Subject: [PATCH] ts: migrate most tests so that they don't use tokio --- generic/threadshare/src/runtime/executor.rs | 105 +++++++---- generic/threadshare/src/runtime/task.rs | 190 ++++++++++---------- generic/threadshare/src/runtime/time.rs | 14 +- 3 files changed, 173 insertions(+), 136 deletions(-) diff --git a/generic/threadshare/src/runtime/executor.rs b/generic/threadshare/src/runtime/executor.rs index a4f614fc..215bf23f 100644 --- a/generic/threadshare/src/runtime/executor.rs +++ b/generic/threadshare/src/runtime/executor.rs @@ -706,14 +706,85 @@ mod tests { const SLEEP_DURATION: Duration = Duration::from_millis(SLEEP_DURATION_MS); const DELAY: Duration = Duration::from_millis(SLEEP_DURATION_MS * 10); - #[tokio::test] - async fn drain_sub_tasks() { + #[test] + fn block_on_task_id() { + gst::init().unwrap(); + + crate::runtime::executor::block_on(async { + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(0)); + + /* Adding the sub task fails + let res = Context::add_sub_task(async move { + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(0)); + Ok(()) + }); + assert!(res.is_ok()); + */ + }); + } + + #[test] + fn context_task_id() { + gst::init().unwrap(); + + let context = Context::acquire("context_task_id", SLEEP_DURATION).unwrap(); + let join_handle = context.spawn(async { + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(0)); + }); + futures::executor::block_on(join_handle).unwrap(); + + let ctx_weak = context.downgrade(); + let join_handle = context.spawn(async move { + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(1)); + + let res = Context::add_sub_task(async move { + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(1)); + Ok(()) + }); + assert!(res.is_ok()); + + ctx_weak + .upgrade() + .unwrap() + .spawn(async { + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(2)); + + let res = Context::add_sub_task(async move { + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(2)); + Ok(()) + }); + assert!(res.is_ok()); + assert!(Context::drain_sub_tasks().await.is_ok()); + + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(2)); + }) + .await + .unwrap(); + + assert!(Context::drain_sub_tasks().await.is_ok()); + + let (_ctx, task_id) = Context::current_task().unwrap(); + assert_eq!(task_id, super::TaskId(1)); + }); + futures::executor::block_on(join_handle).unwrap(); + } + + #[test] + fn drain_sub_tasks() { // Setup gst::init().unwrap(); let context = Context::acquire("drain_sub_tasks", SLEEP_DURATION).unwrap(); - let join_handle = context.spawn(async move { + let join_handle = context.spawn(async { let (sender, mut receiver) = mpsc::channel(1); let sender: Arc>> = Arc::new(Mutex::new(sender)); @@ -754,38 +825,12 @@ mod tests { receiver }); - let mut receiver = join_handle.await.unwrap(); + let mut receiver = futures::executor::block_on(join_handle).unwrap(); // The last sub task should be simply dropped at this point assert_eq!(receiver.try_next().unwrap(), None); } - #[tokio::test] - async fn block_on_within_tokio() { - gst::init().unwrap(); - - let context = Context::acquire("block_on_within_tokio", SLEEP_DURATION).unwrap(); - - let bytes_sent = crate::runtime::executor::block_on(context.spawn(async { - let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 5000); - let socket = UdpSocket::bind(saddr).unwrap(); - let mut socket = tokio::net::UdpSocket::from_std(socket).unwrap(); - let saddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4000); - socket.send_to(&[0; 10], saddr).await.unwrap() - })) - .unwrap(); - assert_eq!(bytes_sent, 10); - - let elapsed = crate::runtime::executor::block_on(context.spawn(async { - let now = Instant::now(); - crate::runtime::time::delay_for(DELAY).await; - now.elapsed() - })) - .unwrap(); - // Due to throttling, `Delay` may be fired earlier - assert!(elapsed + SLEEP_DURATION / 2 >= DELAY); - } - #[test] fn block_on_from_sync() { gst::init().unwrap(); diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index 1feade11..528fb25a 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -1150,14 +1150,15 @@ impl StateMachine { #[cfg(test)] mod tests { use futures::channel::{mpsc, oneshot}; + use futures::executor::block_on; use std::time::Duration; use crate::runtime::Context; use super::*; - #[tokio::test] - async fn iterate() { + #[test] + fn iterate() { gst::init().unwrap(); struct TaskTest { @@ -1302,15 +1303,15 @@ mod tests { assert_eq!(task.state(), TaskState::Started); // At this point, prepared must be completed - prepared_receiver.next().await.unwrap(); + block_on(prepared_receiver.next()).unwrap(); // ... and start executed - started_receiver.next().await.unwrap(); + block_on(started_receiver.next()).unwrap(); assert_eq!(task.state(), TaskState::Started); // unlock task loop and keep looping - iterate_receiver.next().await.unwrap(); - complete_iterate_sender.send(Ok(())).await.unwrap(); + block_on(iterate_receiver.next()).unwrap(); + block_on(complete_iterate_sender.send(Ok(()))).unwrap(); gst_debug!(RUNTIME_CAT, "task_iterate: starting (redundant)"); // start will return immediately @@ -1334,16 +1335,16 @@ mod tests { // Pause transition is asynchronous while TaskState::Paused != task.state() { - tokio::time::delay_for(Duration::from_millis(2)).await; + std::thread::sleep(Duration::from_millis(2)); if let Ok(Some(())) = iterate_receiver.try_next() { // unlock iteration - complete_iterate_sender.send(Ok(())).await.unwrap(); + block_on(complete_iterate_sender.send(Ok(()))).unwrap(); } } gst_debug!(RUNTIME_CAT, "task_iterate: awaiting pause ack"); - paused_receiver.next().await.unwrap(); + block_on(paused_receiver.next()).unwrap(); gst_debug!(RUNTIME_CAT, "task_iterate: starting (after pause)"); assert_eq!( @@ -1356,7 +1357,7 @@ mod tests { assert_eq!(task.state(), TaskState::Started); // Paused -> Started - let _ = started_receiver.next().await; + let _ = block_on(started_receiver.next()); gst_debug!(RUNTIME_CAT, "task_iterate: stopping"); assert_eq!( @@ -1368,7 +1369,7 @@ mod tests { ); assert_eq!(task.state(), TaskState::Stopped); - let _ = stopped_receiver.next().await; + let _ = block_on(stopped_receiver.next()); // purge remaining iteration received before stop if any let _ = iterate_receiver.try_next(); @@ -1381,21 +1382,18 @@ mod tests { target: TaskState::Started }, ); - let _ = started_receiver.next().await; + let _ = block_on(started_receiver.next()); gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Eos"); - iterate_receiver.next().await.unwrap(); - complete_iterate_sender - .send(Err(gst::FlowError::Eos)) - .await - .unwrap(); + block_on(iterate_receiver.next()).unwrap(); + block_on(complete_iterate_sender.send(Err(gst::FlowError::Eos))).unwrap(); gst_debug!(RUNTIME_CAT, "task_iterate: awaiting stop ack"); - stopped_receiver.next().await.unwrap(); + block_on(stopped_receiver.next()).unwrap(); // Wait for state machine to reach Stopped while TaskState::Stopped != task.state() { - tokio::time::delay_for(Duration::from_millis(2)).await; + std::thread::sleep(Duration::from_millis(2)); } gst_debug!(RUNTIME_CAT, "task_iterate: starting (after stop)"); @@ -1406,21 +1404,18 @@ mod tests { target: TaskState::Started }, ); - let _ = started_receiver.next().await; + let _ = block_on(started_receiver.next()); gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Flushing"); - iterate_receiver.next().await.unwrap(); - complete_iterate_sender - .send(Err(gst::FlowError::Flushing)) - .await - .unwrap(); + block_on(iterate_receiver.next()).unwrap(); + block_on(complete_iterate_sender.send(Err(gst::FlowError::Flushing))).unwrap(); gst_debug!(RUNTIME_CAT, "task_iterate: awaiting flush_start ack"); - flush_start_receiver.next().await.unwrap(); + block_on(flush_start_receiver.next()).unwrap(); // Wait for state machine to reach Flushing while TaskState::Flushing != task.state() { - tokio::time::delay_for(Duration::from_millis(2)).await; + std::thread::sleep(Duration::from_millis(2)); } gst_debug!(RUNTIME_CAT, "task_iterate: stop flushing"); @@ -1431,18 +1426,15 @@ mod tests { target: TaskState::Started }, ); - let _ = started_receiver.next().await; + let _ = block_on(started_receiver.next()); gst_debug!(RUNTIME_CAT, "task_iterate: req. iterate to return Error"); - iterate_receiver.next().await.unwrap(); - complete_iterate_sender - .send(Err(gst::FlowError::Error)) - .await - .unwrap(); + block_on(iterate_receiver.next()).unwrap(); + block_on(complete_iterate_sender.send(Err(gst::FlowError::Error))).unwrap(); // Wait for state machine to reach Error while TaskState::Error != task.state() { - tokio::time::delay_for(Duration::from_millis(2)).await; + std::thread::sleep(Duration::from_millis(2)); } gst_debug!( @@ -1468,11 +1460,11 @@ mod tests { ); assert_eq!(task.state(), TaskState::Unprepared); - let _ = unprepared_receiver.next().await; + let _ = block_on(unprepared_receiver.next()); } - #[tokio::test] - async fn prepare_error() { + #[test] + fn prepare_error() { gst::init().unwrap(); struct TaskPrepareTest { @@ -1538,11 +1530,11 @@ mod tests { RUNTIME_CAT, "prepare_error: await action error notification" ); - prepare_error_receiver.next().await.unwrap(); + block_on(prepare_error_receiver.next()).unwrap(); // Wait for state machine to reach Error while TaskState::Error != task.state() { - tokio::time::delay_for(Duration::from_millis(2)).await; + std::thread::sleep(Duration::from_millis(2)); } let res = task.start().unwrap_err(); @@ -1558,8 +1550,8 @@ mod tests { task.unprepare().unwrap(); } - #[tokio::test] - async fn prepare_start_ok() { + #[test] + fn prepare_start_ok() { // Hold the preparation function so that it completes after the start request is engaged gst::init().unwrap(); @@ -1651,16 +1643,16 @@ mod tests { }); gst_debug!(RUNTIME_CAT, "prepare_start_ok: awaiting for start_ctx"); - ready_receiver.await.unwrap(); + block_on(ready_receiver).unwrap(); gst_debug!(RUNTIME_CAT, "prepare_start_ok: triggering preparation"); - prepare_sender.send(()).await.unwrap(); + block_on(prepare_sender.send(())).unwrap(); - start_handle.await.unwrap(); + block_on(start_handle).unwrap(); } - #[tokio::test] - async fn prepare_start_error() { + #[test] + fn prepare_start_error() { // Hold the preparation function so that it completes after the start request is engaged gst::init().unwrap(); @@ -1756,25 +1748,25 @@ mod tests { }); gst_debug!(RUNTIME_CAT, "prepare_start_error: awaiting for start_ctx"); - ready_receiver.await.unwrap(); + block_on(ready_receiver).unwrap(); gst_debug!( RUNTIME_CAT, "prepare_start_error: triggering preparation (failure)" ); - prepare_sender.send(()).await.unwrap(); + block_on(prepare_sender.send(())).unwrap(); gst_debug!( RUNTIME_CAT, "prepare_start_error: await prepare error notification" ); - prepare_error_receiver.next().await.unwrap(); + block_on(prepare_error_receiver.next()).unwrap(); - start_handle.await.unwrap(); + block_on(start_handle).unwrap(); } - #[tokio::test] - async fn pause_start() { + #[test] + fn pause_start() { gst::init().unwrap(); struct TaskPauseStartTest { @@ -1836,7 +1828,7 @@ mod tests { assert_eq!(task.state(), TaskState::Started); gst_debug!(RUNTIME_CAT, "pause_start: awaiting 1st iteration"); - iterate_receiver.next().await.unwrap(); + block_on(iterate_receiver.next()).unwrap(); gst_debug!(RUNTIME_CAT, "pause_start: pausing (1)"); assert_eq!( @@ -1852,11 +1844,11 @@ mod tests { // Pause transition is asynchronous while TaskState::Paused != task.state() { - tokio::time::delay_for(Duration::from_millis(5)).await; + std::thread::sleep(Duration::from_millis(5)); } gst_debug!(RUNTIME_CAT, "pause_start: awaiting paused"); - let _ = paused_receiver.next().await; + let _ = block_on(paused_receiver.next()); // Loop held on due to Pause iterate_receiver.try_next().unwrap_err(); @@ -1871,7 +1863,7 @@ mod tests { assert_eq!(task.state(), TaskState::Started); gst_debug!(RUNTIME_CAT, "pause_start: awaiting 2d iteration"); - iterate_receiver.next().await.unwrap(); + block_on(iterate_receiver.next()).unwrap(); gst_debug!(RUNTIME_CAT, "pause_start: sending 2d iteration completion"); complete_sender.try_send(()).unwrap(); @@ -1880,8 +1872,8 @@ mod tests { task.unprepare().unwrap(); } - #[tokio::test] - async fn successive_pause_start() { + #[test] + fn successive_pause_start() { // Purpose: check pause cancellation. gst::init().unwrap(); @@ -1913,7 +1905,7 @@ mod tests { task.start().unwrap(); gst_debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 1"); - iterate_receiver.next().await.unwrap(); + block_on(iterate_receiver.next()).unwrap(); gst_debug!(RUNTIME_CAT, "successive_pause_start: pause and start"); task.pause().unwrap(); @@ -1922,15 +1914,15 @@ mod tests { assert_eq!(task.state(), TaskState::Started); gst_debug!(RUNTIME_CAT, "successive_pause_start: awaiting iteration 2"); - iterate_receiver.next().await.unwrap(); + block_on(iterate_receiver.next()).unwrap(); gst_debug!(RUNTIME_CAT, "successive_pause_start: stopping"); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn flush_regular_sync() { + #[test] + fn flush_regular_sync() { gst::init().unwrap(); struct TaskFlushTest { @@ -1990,7 +1982,7 @@ mod tests { ); assert_eq!(task.state(), TaskState::Flushing); - flush_start_receiver.next().await.unwrap(); + block_on(flush_start_receiver.next()).unwrap(); gst_debug!(RUNTIME_CAT, "flush_regular_sync: stopping flush"); assert_eq!( @@ -2002,15 +1994,15 @@ mod tests { ); assert_eq!(task.state(), TaskState::Started); - flush_stop_receiver.next().await.unwrap(); + block_on(flush_stop_receiver.next()).unwrap(); task.pause().unwrap(); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn flush_regular_different_context() { + #[test] + fn flush_regular_different_context() { // Purpose: make sure a flush sequence triggered from a Context doesn't block. gst::init().unwrap(); @@ -2098,15 +2090,15 @@ mod tests { assert_eq!(task_clone.state(), TaskState::Started); }); - flush_handle.await.unwrap(); - flush_stop_receiver.next().await.unwrap(); + block_on(flush_handle).unwrap(); + block_on(flush_stop_receiver.next()).unwrap(); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn flush_regular_same_context() { + #[test] + fn flush_regular_same_context() { // Purpose: make sure a flush sequence triggered from the same Context doesn't block. gst::init().unwrap(); @@ -2181,15 +2173,15 @@ mod tests { assert_eq!(task_clone.state(), TaskState::Started); }); - flush_handle.await.unwrap(); - flush_stop_receiver.next().await.unwrap(); + block_on(flush_handle).unwrap(); + block_on(flush_stop_receiver.next()).unwrap(); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn flush_from_loop() { + #[test] + fn flush_from_loop() { // Purpose: make sure a flush_start triggered from an iteration doesn't block. gst::init().unwrap(); @@ -2244,7 +2236,7 @@ mod tests { RUNTIME_CAT, "flush_from_loop: awaiting flush_start notification" ); - flush_start_receiver.next().await.unwrap(); + block_on(flush_start_receiver.next()).unwrap(); assert_eq!( task.stop().unwrap(), @@ -2256,8 +2248,8 @@ mod tests { task.unprepare().unwrap(); } - #[tokio::test] - async fn pause_from_loop() { + #[test] + fn pause_from_loop() { // Purpose: make sure a start triggered from an iteration doesn't block. // E.g. an auto pause cancellation after a delay. gst::init().unwrap(); @@ -2314,14 +2306,14 @@ mod tests { task.start().unwrap(); gst_debug!(RUNTIME_CAT, "pause_from_loop: awaiting pause notification"); - pause_receiver.next().await.unwrap(); + block_on(pause_receiver.next()).unwrap(); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn trigger_from_action() { + #[test] + fn trigger_from_action() { // Purpose: make sure an event triggered from a transition action doesn't block. gst::init().unwrap(); @@ -2384,14 +2376,14 @@ mod tests { RUNTIME_CAT, "trigger_from_action: awaiting flush_stop notification" ); - flush_stop_receiver.next().await.unwrap(); + block_on(flush_stop_receiver.next()).unwrap(); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn pause_flush_start() { + #[test] + fn pause_flush_start() { gst::init().unwrap(); struct TaskFlushTest { @@ -2470,7 +2462,7 @@ mod tests { }, ); assert_eq!(task.state(), TaskState::PausedFlushing); - flush_start_receiver.next().await; + block_on(flush_start_receiver.next()); gst_debug!(RUNTIME_CAT, "pause_flush_start: stopping flush"); assert_eq!( @@ -2481,7 +2473,7 @@ mod tests { }, ); assert_eq!(task.state(), TaskState::Paused); - flush_stop_receiver.next().await; + block_on(flush_stop_receiver.next()); // start action not executed started_receiver.try_next().unwrap_err(); @@ -2495,14 +2487,14 @@ mod tests { }, ); assert_eq!(task.state(), TaskState::Started); - started_receiver.next().await; + block_on(started_receiver.next()); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn pause_flushing_start() { + #[test] + fn pause_flushing_start() { gst::init().unwrap(); struct TaskFlushTest { @@ -2569,7 +2561,7 @@ mod tests { gst_debug!(RUNTIME_CAT, "pause_flushing_start: starting flush"); task.flush_start().unwrap(); assert_eq!(task.state(), TaskState::PausedFlushing); - flush_start_receiver.next().await; + block_on(flush_start_receiver.next()); gst_debug!(RUNTIME_CAT, "pause_flushing_start: starting while flushing"); assert_eq!( @@ -2593,15 +2585,15 @@ mod tests { }, ); assert_eq!(task.state(), TaskState::Started); - flush_stop_receiver.next().await; - started_receiver.next().await; + block_on(flush_stop_receiver.next()); + block_on(started_receiver.next()); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn flush_concurrent_start() { + #[test] + fn flush_concurrent_start() { // Purpose: check the racy case of start being triggered in // after flush_start // e.g.: a FlushStart event received on a Pad and the element starting after a Pause gst::init().unwrap(); @@ -2682,7 +2674,7 @@ mod tests { RUNTIME_CAT, "flush_concurrent_start: awaiting for oob_context" ); - ready_receiver.await.unwrap(); + block_on(ready_receiver).unwrap(); gst_debug!(RUNTIME_CAT, "flush_concurrent_start: // start"); let res = task.start().unwrap(); @@ -2698,7 +2690,7 @@ mod tests { other => unreachable!("{:?}", other), } - flush_start_handle.await.unwrap(); + block_on(flush_start_handle).unwrap(); gst_debug!(RUNTIME_CAT, "flush_concurrent_start: requesting flush_stop"); assert_eq!( @@ -2710,14 +2702,14 @@ mod tests { ); assert_eq!(task.state(), TaskState::Started); - flush_stop_receiver.next().await; + block_on(flush_stop_receiver.next()); task.stop().unwrap(); task.unprepare().unwrap(); } - #[tokio::test] - async fn start_timer() { + #[test] + fn start_timer() { // Purpose: make sure a Timer initialized in a transition is // available when iterating in the loop. gst::init().unwrap(); @@ -2770,7 +2762,7 @@ mod tests { gst_debug!(RUNTIME_CAT, "start_timer: start"); task.start().unwrap(); - timer_elapsed_receiver.await.unwrap(); + block_on(timer_elapsed_receiver).unwrap(); gst_debug!(RUNTIME_CAT, "start_timer: timer elapsed received"); task.stop().unwrap(); diff --git a/generic/threadshare/src/runtime/time.rs b/generic/threadshare/src/runtime/time.rs index b085a2a0..8010acf5 100644 --- a/generic/threadshare/src/runtime/time.rs +++ b/generic/threadshare/src/runtime/time.rs @@ -68,18 +68,18 @@ pub fn interval(interval: Duration) -> impl Stream { mod tests { use std::time::{Duration, Instant}; - use crate::runtime::Context; + use crate::runtime::{executor, Context}; const MAX_THROTTLING: Duration = Duration::from_millis(10); const DELAY: Duration = Duration::from_millis(12); - #[tokio::test] - async fn delay_for() { + #[test] + fn delay_for() { gst::init().unwrap(); let context = Context::acquire("delay_for", MAX_THROTTLING).unwrap(); - let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let elapsed = executor::block_on(context.spawn(async { let now = Instant::now(); crate::runtime::time::delay_for(DELAY).await; now.elapsed() @@ -90,13 +90,13 @@ mod tests { assert!(elapsed + MAX_THROTTLING / 2 >= DELAY); } - #[tokio::test] - async fn delay_for_at_least() { + #[test] + fn delay_for_at_least() { gst::init().unwrap(); let context = Context::acquire("delay_for_at_least", MAX_THROTTLING).unwrap(); - let elapsed = crate::runtime::executor::block_on(context.spawn(async { + let elapsed = executor::block_on(context.spawn(async { let now = Instant::now(); crate::runtime::time::delay_for_at_least(DELAY).await; now.elapsed()