diff --git a/gstreamer-app/Cargo.toml b/gstreamer-app/Cargo.toml index 536320cda..5c6fbd710 100644 --- a/gstreamer-app/Cargo.toml +++ b/gstreamer-app/Cargo.toml @@ -28,6 +28,10 @@ gstreamer-base = { version = "0.15", path = "../gstreamer-base" } [build-dependencies] rustdoc-stripper = { version = "0.1", optional = true } +[dev-dependencies] +futures-util = { version = "0.3", features = ["sink"] } +futures-executor = "0.3" + [features] default = [] v1_10 = ["gstreamer/v1_10", "gstreamer-base/v1_10", "gstreamer-app-sys/v1_10"] diff --git a/gstreamer-app/src/app_src.rs b/gstreamer-app/src/app_src.rs index 9bfee9030..8b54ba189 100644 --- a/gstreamer-app/src/app_src.rs +++ b/gstreamer-app/src/app_src.rs @@ -15,7 +15,8 @@ use std::cell::RefCell; use std::mem; use std::pin::Pin; use std::ptr; -use std::task::{Context, Poll}; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; use AppSrc; #[allow(clippy::type_complexity)] @@ -239,6 +240,7 @@ impl AppSrc { #[derive(Debug)] pub struct AppSrcSink { app_src: AppSrc, + waker_reference: Arc>>, } impl AppSrcSink { @@ -246,16 +248,51 @@ impl AppSrcSink { skip_assert_initialized!(); let app_src = app_src.clone(); + let waker_reference = Arc::new(Mutex::new(None as Option)); - Self { app_src } + app_src.set_callbacks( + AppSrcCallbacks::new() + .need_data({ + let waker_reference = Arc::clone(&waker_reference); + + move |_, _| { + if let Some(waker) = waker_reference.lock().unwrap().take() { + waker.wake(); + } + } + }) + .build(), + ); + + Self { + app_src, + waker_reference, + } + } +} + +impl Drop for AppSrcSink { + fn drop(&mut self) { + self.app_src.set_callbacks(AppSrcCallbacks::new().build()); } } impl Sink for AppSrcSink { type Error = gst::FlowError; - fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) + fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll> { + let mut waker = self.waker_reference.lock().unwrap(); + + let current_level_bytes = self.app_src.get_current_level_bytes(); + let max_bytes = self.app_src.get_max_bytes(); + + if current_level_bytes >= max_bytes && max_bytes != 0 { + waker.replace(context.waker().to_owned()); + + Poll::Pending + } else { + Poll::Ready(Ok(())) + } } fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> { @@ -278,11 +315,68 @@ impl Sink for AppSrcSink { #[cfg(test)] mod tests { use super::*; + use futures_util::{sink::SinkExt, stream::StreamExt}; + use gst::prelude::*; + use std::sync::atomic::{AtomicUsize, Ordering}; #[test] fn test_app_src_sink() { gst::init().unwrap(); - unimplemented!() + let appsrc = gst::ElementFactory::make("appsrc", None).unwrap(); + let fakesink = gst::ElementFactory::make("fakesink", None).unwrap(); + + fakesink.set_property("signal-handoffs", &true).unwrap(); + + let pipeline = gst::Pipeline::new(None); + pipeline.add(&appsrc).unwrap(); + pipeline.add(&fakesink).unwrap(); + + appsrc.link(&fakesink).unwrap(); + + let mut bus_stream = pipeline.get_bus().unwrap().stream(); + let mut app_src_sink = appsrc.dynamic_cast::().unwrap().sink(); + + let sample_quantity = 5; + + let samples = (0..sample_quantity) + .map(|_| gst::Sample::new().buffer(&gst::Buffer::new()).build()) + .collect::>(); + + let mut sample_stream = futures_util::stream::iter(samples).map(Ok); + + let handoff_count_reference = Arc::new(AtomicUsize::new(0)); + + fakesink + .connect("handoff", false, { + let handoff_count_reference = Arc::clone(&handoff_count_reference); + + move |_| { + handoff_count_reference.fetch_add(1, Ordering::AcqRel); + + None + } + }) + .unwrap(); + + pipeline.set_state(gst::State::Playing).unwrap(); + + futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap(); + futures_executor::block_on(app_src_sink.close()).unwrap(); + + while let Some(message) = futures_executor::block_on(bus_stream.next()) { + match message.view() { + gst::MessageView::Eos(_) => break, + gst::MessageView::Error(_) => unreachable!(), + _ => continue, + } + } + + pipeline.set_state(gst::State::Null).unwrap(); + + assert_eq!( + handoff_count_reference.load(Ordering::Acquire), + sample_quantity + ); } } diff --git a/gstreamer-app/src/lib.rs b/gstreamer-app/src/lib.rs index 26f8da8d6..c7fd82503 100644 --- a/gstreamer-app/src/lib.rs +++ b/gstreamer-app/src/lib.rs @@ -20,6 +20,12 @@ extern crate gstreamer_sys as gst_sys; #[macro_use] extern crate glib; +#[cfg(test)] +extern crate futures_util; + +#[cfg(test)] +extern crate futures_executor; + macro_rules! skip_assert_initialized { () => {}; }