diff --git a/gstreamer-app/src/app_sink.rs b/gstreamer-app/src/app_sink.rs index 791e1c587..1c0a0f372 100644 --- a/gstreamer-app/src/app_sink.rs +++ b/gstreamer-app/src/app_sink.rs @@ -303,11 +303,20 @@ impl AppSinkStream { } } +#[cfg(any(feature = "v1_10"))] +impl Drop for AppSinkStream { + fn drop(&mut self) { + self.app_sink.set_callbacks(AppSinkCallbacks::new().build()); + } +} + #[cfg(any(feature = "v1_10"))] impl Stream for AppSinkStream { type Item = gst::Sample; fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll> { + let mut waker = self.waker_reference.lock().unwrap(); + self.app_sink .try_pull_sample(gst::ClockTime::from_mseconds(0)) .map(|sample| Poll::Ready(Some(sample))) @@ -316,24 +325,42 @@ impl Stream for AppSinkStream { return Poll::Ready(None); } - self.waker_reference - .lock() - .unwrap() - .replace(context.waker().to_owned()); + waker.replace(context.waker().to_owned()); Poll::Pending }) } } +#[cfg(any(feature = "v1_10"))] #[cfg(test)] mod tests { use super::*; + use futures_util::StreamExt; + use gst::prelude::*; #[test] fn test_app_sink_stream() { gst::init().unwrap(); - unimplemented!() + let videotestsrc = gst::ElementFactory::make("videotestsrc", None).unwrap(); + let appsink = gst::ElementFactory::make("appsink", None).unwrap(); + + videotestsrc.set_property("num-buffers", &5).unwrap(); + + let pipeline = gst::Pipeline::new(None); + pipeline.add(&videotestsrc).unwrap(); + pipeline.add(&appsink).unwrap(); + + videotestsrc.link(&appsink).unwrap(); + + let app_sink_stream = appsink.dynamic_cast::().unwrap().stream(); + let samples_future = app_sink_stream.collect::>(); + + pipeline.set_state(gst::State::Playing).unwrap(); + let samples = futures_executor::block_on(samples_future); + pipeline.set_state(gst::State::Null).unwrap(); + + assert_eq!(samples.len(), 5); } }