From 5f1a50026ddeb40c2914fd4827f18d2097ae7aae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Thu, 2 May 2019 21:35:12 +0300 Subject: [PATCH] Update futures code to futures 0.3 Also clean it up a bit. --- examples/Cargo.toml | 2 +- examples/src/bin/futures.rs | 71 +++++++++++++++++--------------- examples/src/bin/glib-futures.rs | 62 +++++++++++++++++----------- gstreamer/Cargo.toml | 2 +- gstreamer/src/bus.rs | 10 ++--- 5 files changed, 83 insertions(+), 64 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 581137a3e..a3b1c458e 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -19,7 +19,7 @@ gstreamer-rtsp-server-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gs gtk = { git = "https://github.com/gtk-rs/gtk", optional = true } gdk = { git = "https://github.com/gtk-rs/gdk", optional = true } gio = { git = "https://github.com/gtk-rs/gio", optional = true } -futures-preview = { version = "0.2", optional = true } +futures-preview = { version = "0.3.0-alpha", optional = true } byte-slice-cast = "0.2" failure = "0.1" failure_derive = "0.1" diff --git a/examples/src/bin/futures.rs b/examples/src/bin/futures.rs index 971516588..6eaac7bac 100644 --- a/examples/src/bin/futures.rs +++ b/examples/src/bin/futures.rs @@ -7,8 +7,10 @@ extern crate gstreamer as gst; use gst::prelude::*; extern crate futures; -use futures::executor::block_on; +use futures::executor::LocalPool; +use futures::future; use futures::prelude::*; +use futures::task::SpawnExt; use std::env; @@ -29,42 +31,45 @@ fn example_main() { .set_state(gst::State::Playing) .expect("Unable to set the pipeline to the `Playing` state"); - // BusStream implements the Stream trait, but Stream::for_each is - // calling a closure for each item and returns a Future that resolves - // when the stream is done or an error has happened - let messages = gst::BusStream::new(&bus) - .for_each(|msg| { - use gst::MessageView; + // Use a LocalPool as executor. This runs single threaded on this very thread. + let mut pool = LocalPool::new(); - // Determine whether we want to resolve the future, or we still have - // to wait. The future is resolved when either an error occurs, or the - // pipeline succeeded execution (got an EOS event). - let quit = match msg.view() { - MessageView::Eos(..) => true, - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - true - } - _ => false, - }; + // We use an AbortHandle for having a Future that runs forever + // until we call handle.abort() to quit our event loop + let (quit_handle, quit_registration) = future::AbortHandle::new_pair(); + let quit_future = future::Abortable::new(future::empty::<()>(), quit_registration).map(|_| ()); - if quit { - Err(()) // This resolves the future that is returned by for_each - // FIXME: At the moment, EOS messages also result in the future to be resolved - // by an error. This should probably be changed in the future. - } else { - Ok(()) // Continue - do not resolve the future yet. + // BusStream implements the Stream trait. Stream::for_each is calling a closure for each item + // and returns a Future that resolves when the stream is done + let messages = gst::BusStream::new(&bus).for_each(move |msg| { + use gst::MessageView; + + // Determine whether we want to quit: on EOS or error message + // we quit, otherwise simply continue. + match msg.view() { + MessageView::Eos(..) => quit_handle.abort(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + quit_handle.abort(); } - }) - .and_then(|_| Ok(())); + _ => (), + }; - // Synchronously wait on the future we created above. - let _ = block_on(messages); + // New future to resolve for each message: nothing here + future::ready(()) + }); + + // Spawn our message handling stream + pool.spawner().spawn(messages).unwrap(); + + // And run until something is quitting the loop, i.e. an EOS + // or error message is received above + pool.run_until(quit_future); pipeline .set_state(gst::State::Null) diff --git a/examples/src/bin/glib-futures.rs b/examples/src/bin/glib-futures.rs index e78a1e0f3..507700b67 100644 --- a/examples/src/bin/glib-futures.rs +++ b/examples/src/bin/glib-futures.rs @@ -4,6 +4,7 @@ use gst::prelude::*; extern crate glib; extern crate futures; +use futures::future; use futures::prelude::*; use std::env; @@ -12,12 +13,18 @@ use std::env; mod examples_common; fn example_main() { + // Get the default main context and make it also the thread default, then create + // a main loop for it let ctx = glib::MainContext::default(); + ctx.push_thread_default(); + let loop_ = glib::MainLoop::new(Some(&ctx), false); + // Read the pipeline to launch from the commandline, using the launch syntax. let pipeline_str = env::args().collect::>()[1..].join(" "); gst::init().unwrap(); + // Create a pipeline from the launch-syntax given on the cli. let pipeline = gst::parse_launch(&pipeline_str).unwrap(); let bus = pipeline.get_bus().unwrap(); @@ -25,37 +32,44 @@ fn example_main() { .set_state(gst::State::Playing) .expect("Unable to set the pipeline to the `Playing` state"); - let messages = gst::BusStream::new(&bus) - .for_each(|msg| { - use gst::MessageView; + // BusStream implements the Stream trait. Stream::for_each is calling a closure for each item + // and returns a Future that resolves when the stream is done + let loop_clone = loop_.clone(); + let messages = gst::BusStream::new(&bus).for_each(move |msg| { + use gst::MessageView; - let quit = match msg.view() { - MessageView::Eos(..) => true, - MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - true - } - _ => false, - }; - - if quit { - Err(()) - } else { - Ok(()) + // Determine whether we want to quit: on EOS or error message + // we quit, otherwise simply continue. + match msg.view() { + MessageView::Eos(..) => loop_clone.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + loop_clone.quit(); } - }) - .and_then(|_| Ok(())); + _ => (), + } - let _ = ctx.block_on(messages); + // New future to resolve for each message: nothing here + future::ready(()) + }); + + // Spawn our message handling stream + ctx.spawn_local(messages); + + // And run until something is quitting the loop, i.e. an EOS + // or error message is received above + loop_.run(); pipeline .set_state(gst::State::Null) .expect("Unable to set the pipeline to the `Null` state"); + + ctx.pop_thread_default(); } fn main() { diff --git a/gstreamer/Cargo.toml b/gstreamer/Cargo.toml index 57e0b3da3..51d0d79cf 100644 --- a/gstreamer/Cargo.toml +++ b/gstreamer/Cargo.toml @@ -22,7 +22,7 @@ gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-s glib = { git = "https://github.com/gtk-rs/glib" } num-rational = { version = "0.2", default-features = false, features = [] } lazy_static = "1.0" -futures-core-preview = { version = "0.2", optional = true } +futures-core-preview = { version = "0.3.0-alpha", optional = true } muldiv = "0.2" serde = { version = "1.0", optional = true } serde_bytes = { version = "0.11", optional = true } diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index cf56209d1..4c53fd6a9 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -201,7 +201,8 @@ mod futures { use super::*; use futures_core::stream::Stream; use futures_core::task::{Context, Waker}; - use futures_core::{Async, Poll}; + use futures_core::Poll; + use std::pin::Pin; use std::sync::{Arc, Mutex}; #[derive(Debug)] @@ -236,18 +237,17 @@ mod futures { impl Stream for BusStream { type Item = Message; - type Error = (); - fn poll_next(&mut self, ctx: &mut Context) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { let BusStream(ref bus, ref waker) = *self; let msg = bus.pop(); if let Some(msg) = msg { - Ok(Async::Ready(Some(msg))) + Poll::Ready(Some(msg)) } else { let mut waker = waker.lock().unwrap(); *waker = Some(ctx.waker().clone()); - Ok(Async::Pending) + Poll::Pending } } }