From 77c6741ae037434cb22243e5a869c93fbf15bd6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 11 Nov 2019 11:56:20 +0100 Subject: [PATCH] Update to futures 0.3 and require Rust 1.39 Also use async/await in the futures examples. --- .gitlab-ci.yml | 10 +-- examples/Cargo.toml | 8 +-- examples/src/bin/futures.rs | 66 ++++++++---------- examples/src/bin/glib-futures.rs | 54 +++++++-------- gstreamer-editing-services/Cargo.toml | 1 + gstreamer-editing-services/src/lib.rs | 2 + gstreamer/Cargo.toml | 5 +- gstreamer/src/bus.rs | 97 ++++++++++++--------------- gstreamer/src/lib.rs | 2 - 9 files changed, 110 insertions(+), 135 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 34f0b1ace..d03b837ff 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -52,7 +52,7 @@ stages: for crate in gstreamer*; do if [ -n "$ALL_FEATURES" ]; then if [ $crate = "gstreamer" ]; then - FEATURES=futures,ser_de,v1_16 + FEATURES=ser_de,v1_16 elif [ $crate = "gstreamer-gl" ]; then FEATURES=egl,x11,wayland,v1_16 else @@ -75,10 +75,10 @@ stages: cargo build --color=always --manifest-path tutorials/Cargo.toml --bins --examples --all-features fi -test 1.36: - # 1.36 img +test 1.39: + # 1.39 img # https://hub.docker.com/_/rust/ - image: "rust:1.36-slim-buster" + image: "rust:1.39-slim-buster" extends: '.cargo test' test stable: @@ -130,7 +130,7 @@ clippy: - | for crate in gstreamer*; do if [ $crate = "gstreamer" ]; then - FEATURES=futures,ser_de,v1_16 + FEATURES=ser_de,v1_16 elif [ $crate = "gstreamer-gl" ]; then FEATURES=egl,x11,wayland,v1_16 else diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 634e5cda6..1449395d1 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -21,7 +21,7 @@ gstreamer-rtsp-server-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gs gtk = { git = "https://github.com/gtk-rs/gtk", optional = true } gdk = { git = "https://github.com/gtk-rs/gdk", optional = true } gio = { git = "https://github.com/gtk-rs/gio", optional = true } -futures-preview = { version = "0.3.0-alpha", optional = true } +futures = "0.3" byte-slice-cast = "0.3" failure = "0.1" failure_derive = "0.1" @@ -43,8 +43,6 @@ gtksink = ["gtk", "gio"] gtkvideooverlay = ["gtk", "gdk", "gio"] gtkvideooverlay-x11 = ["gtkvideooverlay"] gtkvideooverlay-quartz = ["gtkvideooverlay"] -generic-futures = ["gstreamer/futures", "futures-preview"] -glib-futures = ["generic-futures", "glib/futures"] gst-rtsp-server = ["gstreamer-rtsp-server"] gst-rtsp-server-record = ["gstreamer-rtsp-server-sys", "gstreamer-rtsp-server", "gstreamer-rtsp", "gio"] v1_10 = ["gstreamer/v1_10"] @@ -120,11 +118,11 @@ name = "toc" [[bin]] name = "futures" -required-features = ["generic-futures"] +edition = "2018" [[bin]] name = "glib-futures" -required-features = ["glib-futures"] +edition = "2018" [[bin]] name = "rtsp-server-record" diff --git a/examples/src/bin/futures.rs b/examples/src/bin/futures.rs index cc0d6275f..62dadac65 100644 --- a/examples/src/bin/futures.rs +++ b/examples/src/bin/futures.rs @@ -8,15 +8,38 @@ use gst::prelude::*; extern crate futures; use futures::executor::LocalPool; -use futures::future; use futures::prelude::*; -use futures::task::SpawnExt; use std::env; #[path = "../examples-common.rs"] mod examples_common; +async fn message_loop(bus: gst::Bus) { + // BusStream implements the Stream trait + let mut messages = gst::BusStream::new(&bus); + + while let Some(msg) = messages.next().await { + use gst::MessageView; + + // Determine whether we want to quit: on EOS or error message + // we quit, otherwise simply continue. + match msg.view() { + MessageView::Eos(..) => break, + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + break; + } + _ => (), + }; + } +} + fn example_main() { // Read the pipeline to launch from the commandline, using the launch syntax. let pipeline_str = env::args().collect::>()[1..].join(" "); @@ -34,43 +57,8 @@ fn example_main() { // Use a LocalPool as executor. This runs single threaded on this very thread. let mut pool = LocalPool::new(); - // We use an AbortHandle for having a Future that runs forever - // until we call handle.abort() to quit our event loop - let (quit_handle, quit_registration) = future::AbortHandle::new_pair(); - let quit_future = - future::Abortable::new(future::pending::<()>(), quit_registration).map(|_| ()); - - // BusStream implements the Stream trait. Stream::for_each is calling a closure for each item - // and returns a Future that resolves when the stream is done - let messages = gst::BusStream::new(&bus).for_each(move |msg| { - use gst::MessageView; - - // Determine whether we want to quit: on EOS or error message - // we quit, otherwise simply continue. - match msg.view() { - MessageView::Eos(..) => quit_handle.abort(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - quit_handle.abort(); - } - _ => (), - }; - - // New future to resolve for each message: nothing here - future::ready(()) - }); - - // Spawn our message handling stream - pool.spawner().spawn(messages).unwrap(); - - // And run until something is quitting the loop, i.e. an EOS - // or error message is received above - pool.run_until(quit_future); + // Run until our message loop finishes, e.g. EOS/error happens + pool.run_until(message_loop(bus)); pipeline .set_state(gst::State::Null) diff --git a/examples/src/bin/glib-futures.rs b/examples/src/bin/glib-futures.rs index 507700b67..493d691b0 100644 --- a/examples/src/bin/glib-futures.rs +++ b/examples/src/bin/glib-futures.rs @@ -4,7 +4,6 @@ use gst::prelude::*; extern crate glib; extern crate futures; -use futures::future; use futures::prelude::*; use std::env; @@ -12,6 +11,31 @@ use std::env; #[path = "../examples-common.rs"] mod examples_common; +async fn message_handler(loop_: glib::MainLoop, bus: gst::Bus) { + // BusStream implements the Stream trait + let mut messages = gst::BusStream::new(&bus); + + while let Some(msg) = messages.next().await { + use gst::MessageView; + + // Determine whether we want to quit: on EOS or error message + // we quit, otherwise simply continue. + match msg.view() { + MessageView::Eos(..) => loop_.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + loop_.quit(); + } + _ => (), + } + } +} + fn example_main() { // Get the default main context and make it also the thread default, then create // a main loop for it @@ -32,34 +56,8 @@ fn example_main() { .set_state(gst::State::Playing) .expect("Unable to set the pipeline to the `Playing` state"); - // BusStream implements the Stream trait. Stream::for_each is calling a closure for each item - // and returns a Future that resolves when the stream is done - let loop_clone = loop_.clone(); - let messages = gst::BusStream::new(&bus).for_each(move |msg| { - use gst::MessageView; - - // Determine whether we want to quit: on EOS or error message - // we quit, otherwise simply continue. - match msg.view() { - MessageView::Eos(..) => loop_clone.quit(), - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - loop_clone.quit(); - } - _ => (), - } - - // New future to resolve for each message: nothing here - future::ready(()) - }); - // Spawn our message handling stream - ctx.spawn_local(messages); + ctx.spawn_local(message_handler(loop_.clone(), bus)); // And run until something is quitting the loop, i.e. an EOS // or error message is received above diff --git a/gstreamer-editing-services/Cargo.toml b/gstreamer-editing-services/Cargo.toml index 58330df0a..bf01df11a 100644 --- a/gstreamer-editing-services/Cargo.toml +++ b/gstreamer-editing-services/Cargo.toml @@ -25,6 +25,7 @@ gio = { git = "https://github.com/gtk-rs/gio" } gstreamer = { path = "../gstreamer" } gstreamer-base = { path = "../gstreamer-base" } gstreamer-pbutils = { path = "../gstreamer-pbutils" } +fragile = "0.3" [build-dependencies] rustdoc-stripper = { version = "0.1", optional = true } diff --git a/gstreamer-editing-services/src/lib.rs b/gstreamer-editing-services/src/lib.rs index b5ffc82a4..8bb9d5f95 100644 --- a/gstreamer-editing-services/src/lib.rs +++ b/gstreamer-editing-services/src/lib.rs @@ -25,6 +25,8 @@ use glib::translate::from_glib; extern crate glib; extern crate gio; +extern crate fragile; + static GES_INIT: Once = Once::new(); pub fn init() -> Result<(), glib::BoolError> { diff --git a/gstreamer/Cargo.toml b/gstreamer/Cargo.toml index a513ce56f..c13d63cc7 100644 --- a/gstreamer/Cargo.toml +++ b/gstreamer/Cargo.toml @@ -22,7 +22,7 @@ gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-s glib = { git = "https://github.com/gtk-rs/glib" } num-rational = { version = "0.2", default-features = false, features = [] } lazy_static = "1.0" -futures-core-preview = { version = "0.3.0-alpha", optional = true } +futures-core = "0.3" muldiv = "0.2" serde = { version = "1.0", optional = true } serde_bytes = { version = "0.11", optional = true } @@ -44,8 +44,7 @@ v1_14 = ["gstreamer-sys/v1_14", "v1_12"] v1_16 = ["gstreamer-sys/v1_16", "v1_14"] embed-lgpl-docs = ["rustdoc-stripper"] purge-lgpl-docs = ["rustdoc-stripper"] -dox = ["v1_16", "gstreamer-sys/dox", "glib/dox", "futures", "ser_de"] -futures = ["futures-core-preview"] +dox = ["v1_16", "gstreamer-sys/dox", "glib/dox", "ser_de"] ser_de = ["num-rational/serde", "serde", "serde_bytes", "serde_derive"] [package.metadata.docs.rs] diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index fefc12f81..20d02c0ac 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -6,6 +6,8 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use futures_core::stream::Stream; +use futures_core::task::{Context, Poll, Waker}; use glib; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; @@ -14,7 +16,9 @@ use glib_sys::{gboolean, gpointer}; use gst_sys; use std::cell::RefCell; use std::mem::transmute; +use std::pin::Pin; use std::ptr; +use std::sync::{Arc, Mutex}; use Bus; use BusSyncReply; @@ -229,65 +233,52 @@ impl<'a> Iterator for Iter<'a> { } } -#[cfg(any(feature = "futures", feature = "dox"))] -mod futures { - use super::*; - use futures_core::stream::Stream; - use futures_core::task::{Context, Waker}; - use futures_core::Poll; - use std::pin::Pin; - use std::sync::{Arc, Mutex}; +#[derive(Debug)] +pub struct BusStream(Bus, Arc>>); - #[derive(Debug)] - pub struct BusStream(Bus, Arc>>); +impl BusStream { + pub fn new(bus: &Bus) -> Self { + skip_assert_initialized!(); + let waker = Arc::new(Mutex::new(None)); + let waker_clone = Arc::clone(&waker); - impl BusStream { - pub fn new(bus: &Bus) -> Self { - skip_assert_initialized!(); - let waker = Arc::new(Mutex::new(None)); - let waker_clone = Arc::clone(&waker); - - bus.set_sync_handler(move |_, _| { - let mut waker = waker_clone.lock().unwrap(); - if let Some(waker) = waker.take() { - // FIXME: Force type... - let waker: Waker = waker; - waker.wake(); - } - - BusSyncReply::Pass - }); - - BusStream(bus.clone(), waker) - } - } - - impl Drop for BusStream { - fn drop(&mut self) { - self.0.unset_sync_handler(); - } - } - - impl Stream for BusStream { - type Item = Message; - - fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - let BusStream(ref bus, ref waker) = *self; - - let msg = bus.pop(); - if let Some(msg) = msg { - Poll::Ready(Some(msg)) - } else { - let mut waker = waker.lock().unwrap(); - *waker = Some(ctx.waker().clone()); - Poll::Pending + bus.set_sync_handler(move |_, _| { + let mut waker = waker_clone.lock().unwrap(); + if let Some(waker) = waker.take() { + // FIXME: Force type... + let waker: Waker = waker; + waker.wake(); } - } + + BusSyncReply::Pass + }); + + BusStream(bus.clone(), waker) } } -#[cfg(any(feature = "futures", feature = "dox"))] -pub use bus::futures::BusStream; +impl Drop for BusStream { + fn drop(&mut self) { + self.0.unset_sync_handler(); + } +} + +impl Stream for BusStream { + type Item = Message; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + let BusStream(ref bus, ref waker) = *self; + + let msg = bus.pop(); + if let Some(msg) = msg { + Poll::Ready(Some(msg)) + } else { + let mut waker = waker.lock().unwrap(); + *waker = Some(ctx.waker().clone()); + Poll::Pending + } + } +} #[cfg(test)] mod tests { diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index 4f06bb7c9..54452d33d 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -32,7 +32,6 @@ pub extern crate glib; extern crate num_rational; -#[cfg(any(feature = "futures", feature = "dox"))] extern crate futures_core; extern crate muldiv; @@ -221,7 +220,6 @@ cfg_if! { } pub use self::iterator::{Iterator, IteratorError, IteratorImpl, StdIterator}; -#[cfg(any(feature = "futures", feature = "dox"))] pub use bus::BusStream; pub use child_proxy::ChildProxyExtManual; pub use clock_time::ClockTime;