diff --git a/gstreamer-app/Cargo.toml b/gstreamer-app/Cargo.toml index c4f75dc0c..54edc5741 100644 --- a/gstreamer-app/Cargo.toml +++ b/gstreamer-app/Cargo.toml @@ -13,6 +13,8 @@ keywords = ["gstreamer", "multimedia", "audio", "video", "gnome"] build = "build.rs" [dependencies] +futures-core = "0.3" +futures-sink = "0.3" bitflags = "1.0" libc = "0.2" glib-sys = { git = "https://github.com/gtk-rs/sys" } diff --git a/gstreamer-app/src/app_sink.rs b/gstreamer-app/src/app_sink.rs index d566466c1..791e1c587 100644 --- a/gstreamer-app/src/app_sink.rs +++ b/gstreamer-app/src/app_sink.rs @@ -20,6 +20,16 @@ use std::mem::transmute; use std::ptr; use AppSink; +#[cfg(any(feature = "v1_10"))] +use { + futures_core::Stream, + std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll, Waker}, + }, +}; + #[allow(clippy::type_complexity)] pub struct AppSinkCallbacks { eos: Option>>, @@ -217,6 +227,11 @@ impl AppSink { ) } } + + #[cfg(any(feature = "v1_10"))] + pub fn stream(&self) -> AppSinkStream { + AppSinkStream::new(self) + } } unsafe extern "C" fn new_sample_trampoline< @@ -240,3 +255,85 @@ unsafe extern "C" fn new_preroll_trampoline< let ret: gst::FlowReturn = f(&from_glib_borrow(this)).into(); ret.to_glib() } + +#[cfg(any(feature = "v1_10"))] +#[derive(Debug)] +pub struct AppSinkStream { + app_sink: AppSink, + waker_reference: Arc>>, +} + +#[cfg(any(feature = "v1_10"))] +impl AppSinkStream { + fn new(app_sink: &AppSink) -> Self { + skip_assert_initialized!(); + + let app_sink = app_sink.clone(); + let waker_reference = Arc::new(Mutex::new(None as Option)); + + app_sink.set_callbacks( + AppSinkCallbacks::new() + .new_sample({ + let waker_reference = Arc::clone(&waker_reference); + + move |_| { + if let Some(waker) = waker_reference.lock().unwrap().take() { + waker.wake(); + } + + Ok(gst::FlowSuccess::Ok) + } + }) + .eos({ + let waker_reference = Arc::clone(&waker_reference); + + move |_| { + if let Some(waker) = waker_reference.lock().unwrap().take() { + waker.wake(); + } + } + }) + .build(), + ); + + Self { + app_sink, + waker_reference, + } + } +} + +#[cfg(any(feature = "v1_10"))] +impl Stream for AppSinkStream { + type Item = gst::Sample; + + fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll> { + self.app_sink + .try_pull_sample(gst::ClockTime::from_mseconds(0)) + .map(|sample| Poll::Ready(Some(sample))) + .unwrap_or_else(|| { + if self.app_sink.is_eos() { + return Poll::Ready(None); + } + + self.waker_reference + .lock() + .unwrap() + .replace(context.waker().to_owned()); + + Poll::Pending + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_app_sink_stream() { + gst::init().unwrap(); + + unimplemented!() + } +} diff --git a/gstreamer-app/src/app_src.rs b/gstreamer-app/src/app_src.rs index 47e29ef5b..9bfee9030 100644 --- a/gstreamer-app/src/app_src.rs +++ b/gstreamer-app/src/app_src.rs @@ -6,13 +6,16 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +use futures_sink::Sink; use glib::translate::*; use glib_sys::{gboolean, gpointer}; use gst; use gst_app_sys; use std::cell::RefCell; use std::mem; +use std::pin::Pin; use std::ptr; +use std::task::{Context, Poll}; use AppSrc; #[allow(clippy::type_complexity)] @@ -227,4 +230,59 @@ impl AppSrc { (from_glib(min.assume_init()), from_glib(max.assume_init())) } } + + pub fn sink(&self) -> AppSrcSink { + AppSrcSink::new(self) + } +} + +#[derive(Debug)] +pub struct AppSrcSink { + app_src: AppSrc, +} + +impl AppSrcSink { + fn new(app_src: &AppSrc) -> Self { + skip_assert_initialized!(); + + let app_src = app_src.clone(); + + Self { app_src } + } +} + +impl Sink for AppSrcSink { + type Error = gst::FlowError; + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> { + self.app_src.push_sample(&sample)?; + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + self.app_src.end_of_stream()?; + + Poll::Ready(Ok(())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_app_src_sink() { + gst::init().unwrap(); + + unimplemented!() + } } diff --git a/gstreamer-app/src/lib.rs b/gstreamer-app/src/lib.rs index 8096ec7f4..26f8da8d6 100644 --- a/gstreamer-app/src/lib.rs +++ b/gstreamer-app/src/lib.rs @@ -8,6 +8,8 @@ extern crate libc; +extern crate futures_core; +extern crate futures_sink; extern crate glib_sys; extern crate gobject_sys; extern crate gstreamer as gst; @@ -28,10 +30,11 @@ macro_rules! skip_assert_initialized { mod auto; pub use auto::*; -mod app_sink; -mod app_src; -pub use app_sink::*; -pub use app_src::*; +pub mod app_sink; +pub use app_sink::AppSinkCallbacks; + +pub mod app_src; +pub use app_src::AppSrcCallbacks; // Re-export all the traits in a prelude module, so that applications // can always "use gst::prelude::*" without getting conflicts diff --git a/gstreamer/src/bus.rs b/gstreamer/src/bus.rs index 31d014421..96b3c8420 100644 --- a/gstreamer/src/bus.rs +++ b/gstreamer/src/bus.rs @@ -6,9 +6,9 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use futures_channel::mpsc; -use futures_core::stream::Stream; -use futures_core::task::{Context, Poll}; +use futures_channel::mpsc::{self, UnboundedReceiver}; +use futures_core::Stream; +use futures_util::{future, StreamExt}; use glib; use glib::source::{Continue, Priority, SourceId}; use glib::translate::*; @@ -19,6 +19,7 @@ use std::cell::RefCell; use std::mem::transmute; use std::pin::Pin; use std::ptr; +use std::task::{Context, Poll}; use Bus; use BusSyncReply; @@ -218,20 +219,18 @@ impl Bus { } } - pub fn stream(&self) -> impl Stream + Unpin + Send + 'static { + pub fn stream(&self) -> BusStream { BusStream::new(self) } pub fn stream_filtered<'a>( &self, - msg_types: &'a [MessageType], + message_types: &'a [MessageType], ) -> impl Stream + Unpin + Send + 'a { - use futures_util::future; - use futures_util::StreamExt; + self.stream().filter(move |message| { + let message_type = message.get_type(); - BusStream::new(self).filter(move |msg| { - let type_ = msg.get_type(); - future::ready(msg_types.contains(&type_)) + future::ready(message_types.contains(&message_type)) }) } } @@ -251,34 +250,39 @@ impl<'a> Iterator for Iter<'a> { } #[derive(Debug)] -struct BusStream(Bus, mpsc::UnboundedReceiver); +pub struct BusStream { + bus: Bus, + receiver: UnboundedReceiver, +} impl BusStream { fn new(bus: &Bus) -> Self { skip_assert_initialized!(); + let bus = bus.clone(); let (sender, receiver) = mpsc::unbounded(); - bus.set_sync_handler(move |_, msg| { - let _ = sender.unbounded_send(msg.to_owned()); + bus.set_sync_handler(move |_, message| { + let _ = sender.unbounded_send(message.to_owned()); + BusSyncReply::Drop }); - BusStream(bus.clone(), receiver) + Self { bus, receiver } } } impl Drop for BusStream { fn drop(&mut self) { - self.0.unset_sync_handler(); + self.bus.unset_sync_handler(); } } impl Stream for BusStream { type Item = Message; - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - Pin::new(&mut self.1).poll_next(ctx) + fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll> { + self.receiver.poll_next_unpin(context) } } diff --git a/gstreamer/src/lib.rs b/gstreamer/src/lib.rs index 3d2eb8e50..e2fd59c59 100644 --- a/gstreamer/src/lib.rs +++ b/gstreamer/src/lib.rs @@ -156,7 +156,7 @@ mod promise; #[cfg(any(feature = "v1_14", feature = "dox"))] pub use promise::*; -mod bus; +pub mod bus; mod element; mod bin;