diff --git a/Cargo.lock b/Cargo.lock index 0488c7ed6..008b3aeb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -253,6 +253,7 @@ name = "gstreamer" version = "0.1.0" dependencies = [ "bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "glib 0.1.3 (git+https://github.com/gtk-rs/glib)", "glib-sys 0.3.4 (git+https://github.com/gtk-rs/sys)", "gobject-sys 0.3.4 (git+https://github.com/gtk-rs/sys)", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 3357f1fc1..382d5513f 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -12,13 +12,14 @@ gstreamer-video = { path = "../gstreamer-video" } gstreamer-player = { path = "../gstreamer-player", optional = true } gtk = { version = "0.1.3", git = "https://github.com/gtk-rs/gtk", features = ["v3_6"] } gio = { version = "0.1.3", git = "https://github.com/gtk-rs/gio" } -futures = "0.1" -tokio-core = "0.1" +futures = { version = "0.1", optional = true } +tokio-core = { version = "0.1", optional = true } send-cell = "0.1" byte-slice-cast = "0.1" [features] gst-player = ["gstreamer-player"] +tokio = ["gstreamer/futures", "futures", "tokio-core"] default-features = [] [badges] diff --git a/examples/src/bin/tokio.rs b/examples/src/bin/tokio.rs index b10f637e7..15c079afa 100644 --- a/examples/src/bin/tokio.rs +++ b/examples/src/bin/tokio.rs @@ -1,60 +1,20 @@ extern crate gstreamer as gst; +#[cfg(feature = "tokio")] use gst::*; +#[cfg(feature = "tokio")] extern crate futures; -use futures::{Async, Poll}; -use futures::task::Task; +#[cfg(feature = "tokio")] use futures::stream::Stream; +#[cfg(feature = "tokio")] extern crate tokio_core; +#[cfg(feature = "tokio")] use tokio_core::reactor::Core; +#[cfg(feature = "tokio")] use std::env; -use std::sync::{Arc, Mutex}; - -struct BusStream(Bus, Arc>>); - -impl BusStream { - fn new(bus: &Bus) -> Self { - let task = Arc::new(Mutex::new(None)); - let task_clone = task.clone(); - - bus.set_sync_handler(move |_, _| { - let mut task = task_clone.lock().unwrap(); - if let Some(task) = task.take() { - // FIXME: Force type... - let task: Task = task; - task.notify(); - } - - BusSyncReply::Pass - }); - BusStream(bus.clone(), task) - } -} - -impl Drop for BusStream { - fn drop(&mut self) { - self.0.unset_sync_handler(); - } -} - -impl Stream for BusStream { - type Item = Message; - type Error = (); - - fn poll(&mut self) -> Poll, Self::Error> { - let mut task = self.1.lock().unwrap(); - - let msg = self.0.pop(); - if let Some(msg) = msg { - Ok(Async::Ready(Some(msg))) - } else { - *task = Some(futures::task::current()); - Ok(Async::NotReady) - } - } -} +#[cfg(feature = "tokio")] fn main() { let pipeline_str = env::args().collect::>()[1..].join(" "); @@ -95,3 +55,8 @@ fn main() { let ret = pipeline.set_state(gst::State::Null); assert_ne!(ret, gst::StateChangeReturn::Failure); } + +#[cfg(not(feature = "tokio"))] +fn main() { + println!("Please compile with --features tokio"); +} diff --git a/gstreamer/Cargo.toml b/gstreamer/Cargo.toml index 22270f471..7b838a3da 100644 --- a/gstreamer/Cargo.toml +++ b/gstreamer/Cargo.toml @@ -19,6 +19,7 @@ gstreamer-sys = { version = "0.1.1", git = "https://github.com/sdroege/gstreamer glib = { version = "0.1.3", git = "https://github.com/gtk-rs/glib" } num-rational = { version = "0.1.38", default-features = false, features = [] } lazy_static = "0.2" +futures = { version = "0.1", optional = true } [build-dependencies.rustdoc-stripper] version = "0.1" diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index e47c7782d..cc1fd64dc 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -16,6 +16,17 @@ use glib_ffi; use glib_ffi::{gboolean, gpointer}; use std::ptr; +#[cfg(feature = "futures")] +use std::sync::{Arc, Mutex}; +#[cfg(feature = "futures")] +use futures; +#[cfg(feature = "futures")] +use futures::{Async, Poll}; +#[cfg(feature = "futures")] +use futures::task::Task; +#[cfg(feature = "futures")] +use futures::stream::Stream; + use Bus; use BusSyncReply; use Message; @@ -129,3 +140,52 @@ impl Bus { unsafe { ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None) } } } + +#[cfg(feature = "futures")] +pub struct BusStream(Bus, Arc>>); + +#[cfg(feature = "futures")] +impl BusStream { + pub fn new(bus: &Bus) -> Self { + let task = Arc::new(Mutex::new(None)); + let task_clone = task.clone(); + + bus.set_sync_handler(move |_, _| { + let mut task = task_clone.lock().unwrap(); + if let Some(task) = task.take() { + // FIXME: Force type... + let task: Task = task; + task.notify(); + } + + BusSyncReply::Pass + }); + + BusStream(bus.clone(), task) + } +} + +#[cfg(feature = "futures")] +impl Drop for BusStream { + fn drop(&mut self) { + self.0.unset_sync_handler(); + } +} + +#[cfg(feature = "futures")] +impl Stream for BusStream { + type Item = Message; + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + let mut task = self.1.lock().unwrap(); + + let msg = self.0.pop(); + if let Some(msg) = msg { + Ok(Async::Ready(Some(msg))) + } else { + *task = Some(futures::task::current()); + Ok(Async::NotReady) + } + } +} diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index 99a822632..85c53180e 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -22,6 +22,9 @@ extern crate glib; extern crate num_rational; +#[cfg(feature = "futures")] +extern crate futures; + use glib::translate::{from_glib, from_glib_full}; macro_rules! callback_guard { @@ -102,6 +105,8 @@ pub use child_proxy::ChildProxyExtManual; pub use tag_setter::TagSetterExtManual; pub use self::iterator::Iterator; pub use device_provider::DeviceProviderExtManual; +#[cfg(feature = "futures")] +pub use bus::BusStream; mod value; pub use value::*;