Move futures based BusStream from examples to the bindings

And hide behind the "futures" feature.

Fixes https://github.com/sdroege/gstreamer-rs/issues/26
This commit is contained in:
Sebastian Dröge 2017-08-17 13:07:32 +03:00
parent 08e2f6d917
commit 165d85646f
6 changed files with 82 additions and 49 deletions

1
Cargo.lock generated
View file

@ -253,6 +253,7 @@ name = "gstreamer"
version = "0.1.0"
dependencies = [
"bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
"glib 0.1.3 (git+https://github.com/gtk-rs/glib)",
"glib-sys 0.3.4 (git+https://github.com/gtk-rs/sys)",
"gobject-sys 0.3.4 (git+https://github.com/gtk-rs/sys)",

View file

@ -12,13 +12,14 @@ gstreamer-video = { path = "../gstreamer-video" }
gstreamer-player = { path = "../gstreamer-player", optional = true }
gtk = { version = "0.1.3", git = "https://github.com/gtk-rs/gtk", features = ["v3_6"] }
gio = { version = "0.1.3", git = "https://github.com/gtk-rs/gio" }
futures = "0.1"
tokio-core = "0.1"
futures = { version = "0.1", optional = true }
tokio-core = { version = "0.1", optional = true }
send-cell = "0.1"
byte-slice-cast = "0.1"
[features]
gst-player = ["gstreamer-player"]
tokio = ["gstreamer/futures", "futures", "tokio-core"]
default-features = []
[badges]

View file

@ -1,60 +1,20 @@
extern crate gstreamer as gst;
#[cfg(feature = "tokio")]
use gst::*;
#[cfg(feature = "tokio")]
extern crate futures;
use futures::{Async, Poll};
use futures::task::Task;
#[cfg(feature = "tokio")]
use futures::stream::Stream;
#[cfg(feature = "tokio")]
extern crate tokio_core;
#[cfg(feature = "tokio")]
use tokio_core::reactor::Core;
#[cfg(feature = "tokio")]
use std::env;
use std::sync::{Arc, Mutex};
struct BusStream(Bus, Arc<Mutex<Option<Task>>>);
impl BusStream {
fn new(bus: &Bus) -> Self {
let task = Arc::new(Mutex::new(None));
let task_clone = task.clone();
bus.set_sync_handler(move |_, _| {
let mut task = task_clone.lock().unwrap();
if let Some(task) = task.take() {
// FIXME: Force type...
let task: Task = task;
task.notify();
}
BusSyncReply::Pass
});
BusStream(bus.clone(), task)
}
}
impl Drop for BusStream {
fn drop(&mut self) {
self.0.unset_sync_handler();
}
}
impl Stream for BusStream {
type Item = Message;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut task = self.1.lock().unwrap();
let msg = self.0.pop();
if let Some(msg) = msg {
Ok(Async::Ready(Some(msg)))
} else {
*task = Some(futures::task::current());
Ok(Async::NotReady)
}
}
}
#[cfg(feature = "tokio")]
fn main() {
let pipeline_str = env::args().collect::<Vec<String>>()[1..].join(" ");
@ -95,3 +55,8 @@ fn main() {
let ret = pipeline.set_state(gst::State::Null);
assert_ne!(ret, gst::StateChangeReturn::Failure);
}
#[cfg(not(feature = "tokio"))]
fn main() {
println!("Please compile with --features tokio");
}

View file

@ -19,6 +19,7 @@ gstreamer-sys = { version = "0.1.1", git = "https://github.com/sdroege/gstreamer
glib = { version = "0.1.3", git = "https://github.com/gtk-rs/glib" }
num-rational = { version = "0.1.38", default-features = false, features = [] }
lazy_static = "0.2"
futures = { version = "0.1", optional = true }
[build-dependencies.rustdoc-stripper]
version = "0.1"

View file

@ -16,6 +16,17 @@ use glib_ffi;
use glib_ffi::{gboolean, gpointer};
use std::ptr;
#[cfg(feature = "futures")]
use std::sync::{Arc, Mutex};
#[cfg(feature = "futures")]
use futures;
#[cfg(feature = "futures")]
use futures::{Async, Poll};
#[cfg(feature = "futures")]
use futures::task::Task;
#[cfg(feature = "futures")]
use futures::stream::Stream;
use Bus;
use BusSyncReply;
use Message;
@ -129,3 +140,52 @@ impl Bus {
unsafe { ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) }
}
}
#[cfg(feature = "futures")]
pub struct BusStream(Bus, Arc<Mutex<Option<Task>>>);
#[cfg(feature = "futures")]
impl BusStream {
pub fn new(bus: &Bus) -> Self {
let task = Arc::new(Mutex::new(None));
let task_clone = task.clone();
bus.set_sync_handler(move |_, _| {
let mut task = task_clone.lock().unwrap();
if let Some(task) = task.take() {
// FIXME: Force type...
let task: Task = task;
task.notify();
}
BusSyncReply::Pass
});
BusStream(bus.clone(), task)
}
}
#[cfg(feature = "futures")]
impl Drop for BusStream {
fn drop(&mut self) {
self.0.unset_sync_handler();
}
}
#[cfg(feature = "futures")]
impl Stream for BusStream {
type Item = Message;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut task = self.1.lock().unwrap();
let msg = self.0.pop();
if let Some(msg) = msg {
Ok(Async::Ready(Some(msg)))
} else {
*task = Some(futures::task::current());
Ok(Async::NotReady)
}
}
}

View file

@ -22,6 +22,9 @@ extern crate glib;
extern crate num_rational;
#[cfg(feature = "futures")]
extern crate futures;
use glib::translate::{from_glib, from_glib_full};
macro_rules! callback_guard {
@ -102,6 +105,8 @@ pub use child_proxy::ChildProxyExtManual;
pub use tag_setter::TagSetterExtManual;
pub use self::iterator::Iterator;
pub use device_provider::DeviceProviderExtManual;
#[cfg(feature = "futures")]
pub use bus::BusStream;
mod value;
pub use value::*;