Update to futures 0.3 and require Rust 1.39

Also use async/await in the futures examples.
This commit is contained in:
Sebastian Dröge 2019-11-11 11:56:20 +01:00
parent 9e30687d8e
commit 77c6741ae0
9 changed files with 110 additions and 135 deletions

View file

@ -52,7 +52,7 @@ stages:
for crate in gstreamer*; do
if [ -n "$ALL_FEATURES" ]; then
if [ $crate = "gstreamer" ]; then
FEATURES=futures,ser_de,v1_16
FEATURES=ser_de,v1_16
elif [ $crate = "gstreamer-gl" ]; then
FEATURES=egl,x11,wayland,v1_16
else
@ -75,10 +75,10 @@ stages:
cargo build --color=always --manifest-path tutorials/Cargo.toml --bins --examples --all-features
fi
test 1.36:
# 1.36 img
test 1.39:
# 1.39 img
# https://hub.docker.com/_/rust/
image: "rust:1.36-slim-buster"
image: "rust:1.39-slim-buster"
extends: '.cargo test'
test stable:
@ -130,7 +130,7 @@ clippy:
- |
for crate in gstreamer*; do
if [ $crate = "gstreamer" ]; then
FEATURES=futures,ser_de,v1_16
FEATURES=ser_de,v1_16
elif [ $crate = "gstreamer-gl" ]; then
FEATURES=egl,x11,wayland,v1_16
else

View file

@ -21,7 +21,7 @@ gstreamer-rtsp-server-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gs
gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
gdk = { git = "https://github.com/gtk-rs/gdk", optional = true }
gio = { git = "https://github.com/gtk-rs/gio", optional = true }
futures-preview = { version = "0.3.0-alpha", optional = true }
futures = "0.3"
byte-slice-cast = "0.3"
failure = "0.1"
failure_derive = "0.1"
@ -43,8 +43,6 @@ gtksink = ["gtk", "gio"]
gtkvideooverlay = ["gtk", "gdk", "gio"]
gtkvideooverlay-x11 = ["gtkvideooverlay"]
gtkvideooverlay-quartz = ["gtkvideooverlay"]
generic-futures = ["gstreamer/futures", "futures-preview"]
glib-futures = ["generic-futures", "glib/futures"]
gst-rtsp-server = ["gstreamer-rtsp-server"]
gst-rtsp-server-record = ["gstreamer-rtsp-server-sys", "gstreamer-rtsp-server", "gstreamer-rtsp", "gio"]
v1_10 = ["gstreamer/v1_10"]
@ -120,11 +118,11 @@ name = "toc"
[[bin]]
name = "futures"
required-features = ["generic-futures"]
edition = "2018"
[[bin]]
name = "glib-futures"
required-features = ["glib-futures"]
edition = "2018"
[[bin]]
name = "rtsp-server-record"

View file

@ -8,15 +8,38 @@ use gst::prelude::*;
extern crate futures;
use futures::executor::LocalPool;
use futures::future;
use futures::prelude::*;
use futures::task::SpawnExt;
use std::env;
#[path = "../examples-common.rs"]
mod examples_common;
async fn message_loop(bus: gst::Bus) {
// BusStream implements the Stream trait
let mut messages = gst::BusStream::new(&bus);
while let Some(msg) = messages.next().await {
use gst::MessageView;
// Determine whether we want to quit: on EOS or error message
// we quit, otherwise simply continue.
match msg.view() {
MessageView::Eos(..) => break,
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.get_src().map(|s| s.get_path_string()),
err.get_error(),
err.get_debug()
);
break;
}
_ => (),
};
}
}
fn example_main() {
// Read the pipeline to launch from the commandline, using the launch syntax.
let pipeline_str = env::args().collect::<Vec<String>>()[1..].join(" ");
@ -34,43 +57,8 @@ fn example_main() {
// Use a LocalPool as executor. This runs single threaded on this very thread.
let mut pool = LocalPool::new();
// We use an AbortHandle for having a Future that runs forever
// until we call handle.abort() to quit our event loop
let (quit_handle, quit_registration) = future::AbortHandle::new_pair();
let quit_future =
future::Abortable::new(future::pending::<()>(), quit_registration).map(|_| ());
// BusStream implements the Stream trait. Stream::for_each is calling a closure for each item
// and returns a Future that resolves when the stream is done
let messages = gst::BusStream::new(&bus).for_each(move |msg| {
use gst::MessageView;
// Determine whether we want to quit: on EOS or error message
// we quit, otherwise simply continue.
match msg.view() {
MessageView::Eos(..) => quit_handle.abort(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.get_src().map(|s| s.get_path_string()),
err.get_error(),
err.get_debug()
);
quit_handle.abort();
}
_ => (),
};
// New future to resolve for each message: nothing here
future::ready(())
});
// Spawn our message handling stream
pool.spawner().spawn(messages).unwrap();
// And run until something is quitting the loop, i.e. an EOS
// or error message is received above
pool.run_until(quit_future);
// Run until our message loop finishes, e.g. EOS/error happens
pool.run_until(message_loop(bus));
pipeline
.set_state(gst::State::Null)

View file

@ -4,7 +4,6 @@ use gst::prelude::*;
extern crate glib;
extern crate futures;
use futures::future;
use futures::prelude::*;
use std::env;
@ -12,6 +11,31 @@ use std::env;
#[path = "../examples-common.rs"]
mod examples_common;
async fn message_handler(loop_: glib::MainLoop, bus: gst::Bus) {
// BusStream implements the Stream trait
let mut messages = gst::BusStream::new(&bus);
while let Some(msg) = messages.next().await {
use gst::MessageView;
// Determine whether we want to quit: on EOS or error message
// we quit, otherwise simply continue.
match msg.view() {
MessageView::Eos(..) => loop_.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.get_src().map(|s| s.get_path_string()),
err.get_error(),
err.get_debug()
);
loop_.quit();
}
_ => (),
}
}
}
fn example_main() {
// Get the default main context and make it also the thread default, then create
// a main loop for it
@ -32,34 +56,8 @@ fn example_main() {
.set_state(gst::State::Playing)
.expect("Unable to set the pipeline to the `Playing` state");
// BusStream implements the Stream trait. Stream::for_each is calling a closure for each item
// and returns a Future that resolves when the stream is done
let loop_clone = loop_.clone();
let messages = gst::BusStream::new(&bus).for_each(move |msg| {
use gst::MessageView;
// Determine whether we want to quit: on EOS or error message
// we quit, otherwise simply continue.
match msg.view() {
MessageView::Eos(..) => loop_clone.quit(),
MessageView::Error(err) => {
println!(
"Error from {:?}: {} ({:?})",
err.get_src().map(|s| s.get_path_string()),
err.get_error(),
err.get_debug()
);
loop_clone.quit();
}
_ => (),
}
// New future to resolve for each message: nothing here
future::ready(())
});
// Spawn our message handling stream
ctx.spawn_local(messages);
ctx.spawn_local(message_handler(loop_.clone(), bus));
// And run until something is quitting the loop, i.e. an EOS
// or error message is received above

View file

@ -25,6 +25,7 @@ gio = { git = "https://github.com/gtk-rs/gio" }
gstreamer = { path = "../gstreamer" }
gstreamer-base = { path = "../gstreamer-base" }
gstreamer-pbutils = { path = "../gstreamer-pbutils" }
fragile = "0.3"
[build-dependencies]
rustdoc-stripper = { version = "0.1", optional = true }

View file

@ -25,6 +25,8 @@ use glib::translate::from_glib;
extern crate glib;
extern crate gio;
extern crate fragile;
static GES_INIT: Once = Once::new();
pub fn init() -> Result<(), glib::BoolError> {

View file

@ -22,7 +22,7 @@ gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-s
glib = { git = "https://github.com/gtk-rs/glib" }
num-rational = { version = "0.2", default-features = false, features = [] }
lazy_static = "1.0"
futures-core-preview = { version = "0.3.0-alpha", optional = true }
futures-core = "0.3"
muldiv = "0.2"
serde = { version = "1.0", optional = true }
serde_bytes = { version = "0.11", optional = true }
@ -44,8 +44,7 @@ v1_14 = ["gstreamer-sys/v1_14", "v1_12"]
v1_16 = ["gstreamer-sys/v1_16", "v1_14"]
embed-lgpl-docs = ["rustdoc-stripper"]
purge-lgpl-docs = ["rustdoc-stripper"]
dox = ["v1_16", "gstreamer-sys/dox", "glib/dox", "futures", "ser_de"]
futures = ["futures-core-preview"]
dox = ["v1_16", "gstreamer-sys/dox", "glib/dox", "ser_de"]
ser_de = ["num-rational/serde", "serde", "serde_bytes", "serde_derive"]
[package.metadata.docs.rs]

View file

@ -6,6 +6,8 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll, Waker};
use glib;
use glib::source::{Continue, Priority, SourceId};
use glib::translate::*;
@ -14,7 +16,9 @@ use glib_sys::{gboolean, gpointer};
use gst_sys;
use std::cell::RefCell;
use std::mem::transmute;
use std::pin::Pin;
use std::ptr;
use std::sync::{Arc, Mutex};
use Bus;
use BusSyncReply;
@ -229,65 +233,52 @@ impl<'a> Iterator for Iter<'a> {
}
}
#[cfg(any(feature = "futures", feature = "dox"))]
mod futures {
use super::*;
use futures_core::stream::Stream;
use futures_core::task::{Context, Waker};
use futures_core::Poll;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub struct BusStream(Bus, Arc<Mutex<Option<Waker>>>);
#[derive(Debug)]
pub struct BusStream(Bus, Arc<Mutex<Option<Waker>>>);
impl BusStream {
pub fn new(bus: &Bus) -> Self {
skip_assert_initialized!();
let waker = Arc::new(Mutex::new(None));
let waker_clone = Arc::clone(&waker);
impl BusStream {
pub fn new(bus: &Bus) -> Self {
skip_assert_initialized!();
let waker = Arc::new(Mutex::new(None));
let waker_clone = Arc::clone(&waker);
bus.set_sync_handler(move |_, _| {
let mut waker = waker_clone.lock().unwrap();
if let Some(waker) = waker.take() {
// FIXME: Force type...
let waker: Waker = waker;
waker.wake();
}
BusSyncReply::Pass
});
BusStream(bus.clone(), waker)
}
}
impl Drop for BusStream {
fn drop(&mut self) {
self.0.unset_sync_handler();
}
}
impl Stream for BusStream {
type Item = Message;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let BusStream(ref bus, ref waker) = *self;
let msg = bus.pop();
if let Some(msg) = msg {
Poll::Ready(Some(msg))
} else {
let mut waker = waker.lock().unwrap();
*waker = Some(ctx.waker().clone());
Poll::Pending
bus.set_sync_handler(move |_, _| {
let mut waker = waker_clone.lock().unwrap();
if let Some(waker) = waker.take() {
// FIXME: Force type...
let waker: Waker = waker;
waker.wake();
}
}
BusSyncReply::Pass
});
BusStream(bus.clone(), waker)
}
}
#[cfg(any(feature = "futures", feature = "dox"))]
pub use bus::futures::BusStream;
impl Drop for BusStream {
fn drop(&mut self) {
self.0.unset_sync_handler();
}
}
impl Stream for BusStream {
type Item = Message;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let BusStream(ref bus, ref waker) = *self;
let msg = bus.pop();
if let Some(msg) = msg {
Poll::Ready(Some(msg))
} else {
let mut waker = waker.lock().unwrap();
*waker = Some(ctx.waker().clone());
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {

View file

@ -32,7 +32,6 @@ pub extern crate glib;
extern crate num_rational;
#[cfg(any(feature = "futures", feature = "dox"))]
extern crate futures_core;
extern crate muldiv;
@ -221,7 +220,6 @@ cfg_if! {
}
pub use self::iterator::{Iterator, IteratorError, IteratorImpl, StdIterator};
#[cfg(any(feature = "futures", feature = "dox"))]
pub use bus::BusStream;
pub use child_proxy::ChildProxyExtManual;
pub use clock_time::ClockTime;