Update futures code to futures 0.3

Also clean it up a bit.
This commit is contained in:
Sebastian Dröge 2019-05-02 21:35:12 +03:00
parent 2b122a20c5
commit 5f1a50026d
5 changed files with 83 additions and 64 deletions

View file

@ -19,7 +19,7 @@ gstreamer-rtsp-server-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gs
gtk = { git = "https://github.com/gtk-rs/gtk", optional = true } gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
gdk = { git = "https://github.com/gtk-rs/gdk", optional = true } gdk = { git = "https://github.com/gtk-rs/gdk", optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true } gio = { git = "https://github.com/gtk-rs/gio", optional = true }
futures-preview = { version = "0.2", optional = true } futures-preview = { version = "0.3.0-alpha", optional = true }
byte-slice-cast = "0.2" byte-slice-cast = "0.2"
failure = "0.1" failure = "0.1"
failure_derive = "0.1" failure_derive = "0.1"

View file

@ -7,8 +7,10 @@ extern crate gstreamer as gst;
use gst::prelude::*; use gst::prelude::*;
extern crate futures; extern crate futures;
use futures::executor::block_on; use futures::executor::LocalPool;
use futures::future;
use futures::prelude::*; use futures::prelude::*;
use futures::task::SpawnExt;
use std::env; use std::env;
@ -29,18 +31,23 @@ fn example_main() {
.set_state(gst::State::Playing) .set_state(gst::State::Playing)
.expect("Unable to set the pipeline to the `Playing` state"); .expect("Unable to set the pipeline to the `Playing` state");
// BusStream implements the Stream trait, but Stream::for_each is // Use a LocalPool as executor. This runs single threaded on this very thread.
// calling a closure for each item and returns a Future that resolves let mut pool = LocalPool::new();
// when the stream is done or an error has happened
let messages = gst::BusStream::new(&bus) // We use an AbortHandle for having a Future that runs forever
.for_each(|msg| { // until we call handle.abort() to quit our event loop
let (quit_handle, quit_registration) = future::AbortHandle::new_pair();
let quit_future = future::Abortable::new(future::empty::<()>(), quit_registration).map(|_| ());
// BusStream implements the Stream trait. Stream::for_each is calling a closure for each item
// and returns a Future that resolves when the stream is done
let messages = gst::BusStream::new(&bus).for_each(move |msg| {
use gst::MessageView; use gst::MessageView;
// Determine whether we want to resolve the future, or we still have // Determine whether we want to quit: on EOS or error message
// to wait. The future is resolved when either an error occurs, or the // we quit, otherwise simply continue.
// pipeline succeeded execution (got an EOS event). match msg.view() {
let quit = match msg.view() { MessageView::Eos(..) => quit_handle.abort(),
MessageView::Eos(..) => true,
MessageView::Error(err) => { MessageView::Error(err) => {
println!( println!(
"Error from {:?}: {} ({:?})", "Error from {:?}: {} ({:?})",
@ -48,23 +55,21 @@ fn example_main() {
err.get_error(), err.get_error(),
err.get_debug() err.get_debug()
); );
true quit_handle.abort();
} }
_ => false, _ => (),
}; };
if quit { // New future to resolve for each message: nothing here
Err(()) // This resolves the future that is returned by for_each future::ready(())
// FIXME: At the moment, EOS messages also result in the future to be resolved });
// by an error. This should probably be changed in the future.
} else {
Ok(()) // Continue - do not resolve the future yet.
}
})
.and_then(|_| Ok(()));
// Synchronously wait on the future we created above. // Spawn our message handling stream
let _ = block_on(messages); pool.spawner().spawn(messages).unwrap();
// And run until something is quitting the loop, i.e. an EOS
// or error message is received above
pool.run_until(quit_future);
pipeline pipeline
.set_state(gst::State::Null) .set_state(gst::State::Null)

View file

@ -4,6 +4,7 @@ use gst::prelude::*;
extern crate glib; extern crate glib;
extern crate futures; extern crate futures;
use futures::future;
use futures::prelude::*; use futures::prelude::*;
use std::env; use std::env;
@ -12,12 +13,18 @@ use std::env;
mod examples_common; mod examples_common;
fn example_main() { fn example_main() {
// Get the default main context and make it also the thread default, then create
// a main loop for it
let ctx = glib::MainContext::default(); let ctx = glib::MainContext::default();
ctx.push_thread_default();
let loop_ = glib::MainLoop::new(Some(&ctx), false);
// Read the pipeline to launch from the commandline, using the launch syntax.
let pipeline_str = env::args().collect::<Vec<String>>()[1..].join(" "); let pipeline_str = env::args().collect::<Vec<String>>()[1..].join(" ");
gst::init().unwrap(); gst::init().unwrap();
// Create a pipeline from the launch-syntax given on the cli.
let pipeline = gst::parse_launch(&pipeline_str).unwrap(); let pipeline = gst::parse_launch(&pipeline_str).unwrap();
let bus = pipeline.get_bus().unwrap(); let bus = pipeline.get_bus().unwrap();
@ -25,12 +32,16 @@ fn example_main() {
.set_state(gst::State::Playing) .set_state(gst::State::Playing)
.expect("Unable to set the pipeline to the `Playing` state"); .expect("Unable to set the pipeline to the `Playing` state");
let messages = gst::BusStream::new(&bus) // BusStream implements the Stream trait. Stream::for_each is calling a closure for each item
.for_each(|msg| { // and returns a Future that resolves when the stream is done
let loop_clone = loop_.clone();
let messages = gst::BusStream::new(&bus).for_each(move |msg| {
use gst::MessageView; use gst::MessageView;
let quit = match msg.view() { // Determine whether we want to quit: on EOS or error message
MessageView::Eos(..) => true, // we quit, otherwise simply continue.
match msg.view() {
MessageView::Eos(..) => loop_clone.quit(),
MessageView::Error(err) => { MessageView::Error(err) => {
println!( println!(
"Error from {:?}: {} ({:?})", "Error from {:?}: {} ({:?})",
@ -38,24 +49,27 @@ fn example_main() {
err.get_error(), err.get_error(),
err.get_debug() err.get_debug()
); );
true loop_clone.quit();
} }
_ => false, _ => (),
};
if quit {
Err(())
} else {
Ok(())
} }
})
.and_then(|_| Ok(()));
let _ = ctx.block_on(messages); // New future to resolve for each message: nothing here
future::ready(())
});
// Spawn our message handling stream
ctx.spawn_local(messages);
// And run until something is quitting the loop, i.e. an EOS
// or error message is received above
loop_.run();
pipeline pipeline
.set_state(gst::State::Null) .set_state(gst::State::Null)
.expect("Unable to set the pipeline to the `Null` state"); .expect("Unable to set the pipeline to the `Null` state");
ctx.pop_thread_default();
} }
fn main() { fn main() {

View file

@ -22,7 +22,7 @@ gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-s
glib = { git = "https://github.com/gtk-rs/glib" } glib = { git = "https://github.com/gtk-rs/glib" }
num-rational = { version = "0.2", default-features = false, features = [] } num-rational = { version = "0.2", default-features = false, features = [] }
lazy_static = "1.0" lazy_static = "1.0"
futures-core-preview = { version = "0.2", optional = true } futures-core-preview = { version = "0.3.0-alpha", optional = true }
muldiv = "0.2" muldiv = "0.2"
serde = { version = "1.0", optional = true } serde = { version = "1.0", optional = true }
serde_bytes = { version = "0.11", optional = true } serde_bytes = { version = "0.11", optional = true }

View file

@ -201,7 +201,8 @@ mod futures {
use super::*; use super::*;
use futures_core::stream::Stream; use futures_core::stream::Stream;
use futures_core::task::{Context, Waker}; use futures_core::task::{Context, Waker};
use futures_core::{Async, Poll}; use futures_core::Poll;
use std::pin::Pin;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
#[derive(Debug)] #[derive(Debug)]
@ -236,18 +237,17 @@ mod futures {
impl Stream for BusStream { impl Stream for BusStream {
type Item = Message; type Item = Message;
type Error = ();
fn poll_next(&mut self, ctx: &mut Context) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let BusStream(ref bus, ref waker) = *self; let BusStream(ref bus, ref waker) = *self;
let msg = bus.pop(); let msg = bus.pop();
if let Some(msg) = msg { if let Some(msg) = msg {
Ok(Async::Ready(Some(msg))) Poll::Ready(Some(msg))
} else { } else {
let mut waker = waker.lock().unwrap(); let mut waker = waker.lock().unwrap();
*waker = Some(ctx.waker().clone()); *waker = Some(ctx.waker().clone());
Ok(Async::Pending) Poll::Pending
} }
} }
} }