diff --git a/gstreamer-video/Cargo.toml b/gstreamer-video/Cargo.toml index 24cb5ecaa..f517a5fd2 100644 --- a/gstreamer-video/Cargo.toml +++ b/gstreamer-video/Cargo.toml @@ -23,7 +23,6 @@ gst = { package = "gstreamer", path = "../gstreamer" } gst-base = { package = "gstreamer-base", path = "../gstreamer-base" } once_cell = "1.0" futures-channel = "0.3" -futures-util = "0.3" [build-dependencies] gstreamer-rs-lgpl-docs = { path = "../docs", optional = true } diff --git a/gstreamer-video/src/functions.rs b/gstreamer-video/src/functions.rs index 03f5244d2..a2d1e8820 100644 --- a/gstreamer-video/src/functions.rs +++ b/gstreamer-video/src/functions.rs @@ -109,14 +109,12 @@ pub fn convert_sample_future( skip_assert_initialized!(); use futures_channel::oneshot; - use futures_util::future::lazy; - use futures_util::future::FutureExt; let (sender, receiver) = oneshot::channel(); let sample = sample.clone(); let caps = caps.clone(); - let future = lazy(move |_| { + let future = async move { assert!( glib::MainContext::ref_thread_default().is_owner(), "Spawning futures only allowed if the thread is owning the MainContext" @@ -125,8 +123,11 @@ pub fn convert_sample_future( convert_sample_async(&sample, &caps, timeout, move |res| { let _ = sender.send(res); }); - }) - .then(|_| receiver.map(|res| res.expect("Sender dropped before callback was called"))); + + receiver + .await + .expect("Sender dropped before callback was called") + }; Box::pin(future) } diff --git a/gstreamer/Cargo.toml b/gstreamer/Cargo.toml index d191dd83e..5313525e7 100644 --- a/gstreamer/Cargo.toml +++ b/gstreamer/Cargo.toml @@ -22,8 +22,8 @@ glib = { git = "https://github.com/gtk-rs/gtk-rs" } num-rational = { version = "0.3", default-features = false, features = [] } once_cell = "1.0" futures-core = "0.3" -futures-util = "0.3" futures-channel = "0.3" +futures-util = { version = "0.3", default-features = false } muldiv = "1" serde = { version = "1.0", optional = true } serde_bytes = { version = "0.11", optional = true } diff --git a/gstreamer/src/clock.rs b/gstreamer/src/clock.rs index 61b7ccccf..dfb3ed02a 100644 --- a/gstreamer/src/clock.rs +++ b/gstreamer/src/clock.rs @@ -174,18 +174,10 @@ impl SingleShotClockId { pub fn wait_async_future( &self, ) -> Result< - Pin< - Box< - dyn Future> - + Unpin - + Send - + 'static, - >, - >, + Pin> + Send + 'static>>, ClockError, > { use futures_channel::oneshot; - use futures_util::TryFutureExt; let (sender, receiver) = oneshot::channel(); @@ -196,7 +188,9 @@ impl SingleShotClockId { } })?; - Ok(Box::pin(receiver.map_err(|_| ClockError::Unscheduled))) + Ok(Box::pin(async move { + receiver.await.map_err(|_| ClockError::Unscheduled) + })) } } diff --git a/gstreamer/src/element.rs b/gstreamer/src/element.rs index 2c93369b7..766961607 100644 --- a/gstreamer/src/element.rs +++ b/gstreamer/src/element.rs @@ -36,9 +36,6 @@ use std::ffi::CStr; #[cfg(any(feature = "v1_10", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_10")))] use std::future::Future; -#[cfg(any(feature = "v1_10", feature = "dox"))] -#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_10")))] -use std::marker::Unpin; use std::mem; use std::num::NonZeroU64; #[cfg(any(feature = "v1_10", feature = "dox"))] @@ -262,10 +259,7 @@ pub trait ElementExtManual: 'static { #[cfg(any(feature = "v1_10", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_10")))] - fn call_async_future( - &self, - func: F, - ) -> Pin + Unpin + Send + 'static>> + fn call_async_future(&self, func: F) -> Pin + Send + 'static>> where F: FnOnce(&Self) -> T + Send + 'static, T: Send + 'static; @@ -810,16 +804,12 @@ impl> ElementExtManual for O { #[cfg(any(feature = "v1_10", feature = "dox"))] #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_10")))] - fn call_async_future( - &self, - func: F, - ) -> Pin + Unpin + Send + 'static>> + fn call_async_future(&self, func: F) -> Pin + Send + 'static>> where F: FnOnce(&Self) -> T + Send + 'static, T: Send + 'static, { use futures_channel::oneshot; - use futures_util::future::FutureExt; let (sender, receiver) = oneshot::channel(); @@ -827,7 +817,7 @@ impl> ElementExtManual for O { let _ = sender.send(func(element)); }); - Box::pin(receiver.map(|res| res.expect("sender dropped"))) + Box::pin(async move { receiver.await.expect("sender dropped") }) } fn get_current_running_time(&self) -> crate::ClockTime {